#!/usr/bin/env python3
# encoding:utf-8
"""FreeSWITCH ESL clients for the call-center module.

``InboundClient`` connects to the FreeSWITCH event socket, dispatches the
events it receives to the registered handlers and exposes helpers that send
call-control commands.  ``OutboundClient`` runs a TCP server that FreeSWITCH
connects to for incoming calls and bridges each call either to a human agent
(white-listed callers) or to a robot.
"""
import json
import random
import socketserver
import sys
import ESL
import time
import mmh3
import threading
import traceback
import concurrent.futures
from apscheduler.schedulers.background import BackgroundScheduler
from src.core.callcenter.cache import Cache
from src.core.callcenter.api import MakeCallContext, DelayAction, CallInfo, DeviceInfo, NextCommand
from src.core.callcenter.callback import Callback
from src.core.callcenter.constant import SK, EMPTY, CTI_ENGINE_DELAY_ACTION_LOCK, HOLD_MUSIC_PATH, saasId
from src.core.callcenter.esl.constant.esl_constant import (
    BRIDGE_VARIABLES, BRIDGE, HANGUP, NORMAL_CLEARING, SIP_HEADER,
    SPACE, SPLIT, SOFIA,
    ORIGINATE, PARK, SET, EAVESDROP, SMF_ALEG, EXECUTE, PLAYBACK, PAUSE, TRANSFER, UUID_TRANSFER, UUID_BROADCAST,
    UUID_BREAK, UUID_HOLD,
    UUID_RECORD, UUID_SETVAR, UUID_GETVAR, UUID_KILL,
)
import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
import src.core.callcenter.esl.handler as event_handler
from src.core.callcenter.esl.constant.sip_header_constant import sipHeaderHoldMusic, profile1, profile2, sipHeaderCallId
from src.core.callcenter.enumeration import CallCause, DeviceType, DelayActionEnum, HangupDir, CallType, NextType, \
    Direction, CdrType
from src.core.callcenter.esl.handler.default_esl_event_handler import DefaultEslEventHandler
from src.core.callcenter.snowflake import Snowflake
from src.core.datasource import SERVE_HOST
# The star imports stay last so that name shadowing is the same as before.
from src.core.voip.constant import *
from src.core.callcenter.data_handler import *


class InboundClient:
    """ESL inbound client: receives FreeSWITCH events and sends call commands."""

    # Seconds to wait before trying to reconnect to the event socket.
    RECONNECT_DELAY_SECONDS = 3

    def __init__(self, agent, app):
        """Create the client, start the delay-action scheduler and the event thread.

        :param agent: bot agent handed to every event handler.
        :param app: application object; ``app.logger`` is used for logging and
            ``app`` itself is passed to ``Cache``, ``Callback`` and
            ``DataHandleServer``.
        """
        self.con = None
        self.thread_num = 12
        self.is_stopping = False
        self.app = app
        self.logger = app.logger
        self.bot_agent = agent
        self.cache = Cache(app)
        self.callback = Callback(app)
        self.dataHandleServer = DataHandleServer(app)
        self.handler_table = self.scan_esl_event_handlers()
        self.default_event_handler = DefaultEslEventHandler(self, self.bot_agent)
        # NOTE(review): the ESL port and password are hard-coded; consider
        # loading them from configuration instead of source code.
        self.host, self.port, self.password = SERVE_HOST, '8021', '4918257983818884358'
        self.executors = {
            index: concurrent.futures.ThreadPoolExecutor(max_workers=1)
            for index in range(self.thread_num)
        }
        self.delay_action_executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
        self.delay_action_scheduler = BackgroundScheduler()
        self.delay_action_scheduler.add_job(self.submit_delay_action, 'interval', seconds=1, max_instances=1)
        self.delay_action_scheduler.start()
        threading.Thread(target=self.start, args=()).start()

    @staticmethod
    def _device_type_code(device_type):
        """Return the numeric code of *device_type*.

        Accepts either a bare code or an object exposing ``.code``, because
        this file builds ``DeviceInfo`` with ``DeviceType.X.code`` while older
        comparisons assumed an enum member.
        """
        return getattr(device_type, 'code', device_type)

    def submit_delay_action(self):
        """Scheduler job: submit every pending delay message to the executor."""
        for name in DelayActionEnum.__members__:
            # Treat a missing message list as empty so the job never crashes.
            for action_message in self.cache.get_delay_message(name) or []:
                future = self.delay_action_executor.submit(self.do_delay_action, name, action_message)
                # Without a callback, exceptions raised inside the worker are lost.
                future.add_done_callback(self._log_delay_action_failure)

    def _log_delay_action_failure(self, future):
        """Log the exception, if any, raised by a finished delay-action future."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            self.logger.error("do_delay_action failed: %s", error, exc_info=error)

    def scan_esl_event_handlers(self):
        """Instantiate every handler class found in the ``event_handler`` package.

        A class is a handler when it has an ``_esl_event_name`` attribute.

        :return: dict mapping event name to a list of handler instances.
        """
        import inspect
        import importlib
        import pkgutil

        handlers = {}
        # Walk the modules of the handler package.
        for module_info in pkgutil.iter_modules(event_handler.__path__):
            module = importlib.import_module(f"{event_handler.__name__}.{module_info.name}")
            for _name, _cls in inspect.getmembers(module, inspect.isclass):
                if hasattr(_cls, '_esl_event_name'):
                    handlers.setdefault(_cls._esl_event_name, []).append(_cls(self, self.bot_agent))
        return handlers

    def start(self):
        """Connect to the event socket and dispatch events until ``stop()``.

        Reconnects in a loop (never recursively) after a disconnect or a
        failed connection attempt, waiting ``RECONNECT_DELAY_SECONDS`` between
        attempts.
        """
        while not self.is_stopping:
            self.logger.info('inbound.start')
            self.con = ESL.ESLconnection(self.host, self.port, self.password)
            if self.con.connected():
                self.logger.info('inbound esl connected ... ')
                # Events of interest: CHANNEL_ORIGINATE,CHANNEL_PROGRESS,CHANNEL_PROGRESS_MEDIA,
                # CHANNEL_ANSWER,CHANNEL_HANGUP,CUSTOM,PLAYBACK_START,PLAYBACK_STOP,DETECTED_TONE
                self.con.events('plain', 'all')
                self._receive_events()
            else:
                self.logger.error('inbound esl connect failed, host=%s, port=%s', self.host, self.port)
            if self.is_stopping:
                break
            time.sleep(self.RECONNECT_DELAY_SECONDS)

    def _receive_events(self):
        """Dispatch events from ``self.con`` until it disconnects or we stop."""
        while not self.is_stopping:
            e = self.con.recvEvent()
            if e is None:
                if self.con.connected():
                    continue
                break
            if e.getHeader("Event-Name") == "SERVER_DISCONNECTED":
                self.logger.info('come in SERVER_DISCONNECTED case')
                break
            # One thread per event, as before (the executor-based dispatch
            # via choose_thread_pool_executor is not used).
            threading.Thread(target=self.process_esl_event, args=(e,)).start()
        self.con.disconnect()

    def choose_thread_pool_executor(self, e):
        """Pick a single-thread executor for event *e*.

        Events with the same call id (or, failing that, unique id) hash to the
        same executor; events with neither get a random one.
        """
        call_id = EslEventUtil.getCallId(e)
        device_id = EslEventUtil.getUniqueId(e)
        routing_key = call_id if call_id else device_id
        if routing_key:
            index = abs(mmh3.hash(routing_key)) % len(self.executors)
        else:
            index = random.randint(0, len(self.executors) - 1) if self.executors else 0
        return self.executors.get(index)

    def process_esl_event(self, e):
        """Run the callback and the registered (or default) handlers for *e*."""
        event_name = EslEventUtil.getEventName(e)
        coreUUID = EslEventUtil.getCoreUuid(e)
        address = self.host + ':' + self.port
        try:
            self.callback.callback_event(e)
            if event_name in self.handler_table:
                for handler in self.handler_table.get(event_name):
                    # One failing handler must not stop the others.
                    try:
                        handler.handle(address, e, coreUUID)
                    except Exception:
                        self.logger.exception("esl event handler %s failed, event_name=%s",
                                              handler.__class__.__name__, event_name)
            else:
                self.default_event_handler.handle(address, e, coreUUID)
        except Exception:
            self.logger.exception("process_esl_event failed, event_name=%s", event_name)

    def do_delay_action(self, action, message):
        """Execute one delayed action.

        :param action: delay-action code (a ``DelayActionEnum`` member name).
        :param message: JSON payload parsed with ``DelayAction.from_json``.
        """
        delay_action = DelayAction.from_json(message)
        flag = self.cache.lock_delay_action(delay_action.uuid)
        self.logger.info("do_delay_action::action=%s, flag=%s", action, flag)
        if not flag:
            self.logger.info("异步延迟执行操作重复 action:%s msg:%s", action, message)
            return
        delay_action_enum = DelayActionEnum.get_by_code(action)
        if not delay_action_enum:
            self.logger.info("异步延迟执行 delayActionEnum为空 action:%s msg:%s", action, message)
            return
        if DelayActionEnum.CALL_TIMEOUT_HANGUP == delay_action_enum:
            self.exec_when_call_timeout(delay_action.call_id, delay_action.device_id)
        elif DelayActionEnum.PLAY_TIMEOUT_HANGUP == delay_action_enum:
            self.exec_when_play_timeout(delay_action.call_id)
        elif DelayActionEnum.ACD_TIMEOUT_PLAY == delay_action_enum:
            self.exec_when_acd_timeout(delay_action.call_id)

    def exec_when_call_timeout(self, call_id, device_id):
        """Hang up *device_id* if it has not answered when the call timeout fires."""
        call_info = self.cache.get_call_info(call_id)
        if not call_info or device_id not in call_info.device_list:
            return
        device_info = call_info.device_info_map.get(device_id)
        if not device_info or device_info.answer_time is not None:
            return
        # NOTE(review): the source indentation was lost; every statement below
        # is assumed to belong to the "device has not answered" branch.
        self.logger.info("call:%s deviceId:%s execWhenCallTimeOut", call_id, device_id)
        device_info.hangup_cause = CallCause.CALL_TIMEOUT.name
        call_info.next_commands = []
        device_type_code = self._device_type_code(device_info.device_type)
        if device_type_code <= DeviceType.ROBOT.code:
            call_info.hangup_dir = HangupDir.PLATFORM_HANGUP.code
            call_info.hangup_code = CallCause.CALL_TIMEOUT.code
        if not device_info.end_time and device_type_code == DeviceType.CUSTOMER.code:
            channel = self.show_channel(device_id)
            if channel:
                delay_action = DelayAction(call_id=call_id, device_id=device_id)
                # Pass the code, like every other add_delay_message call here.
                self.cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_DECR.code, delay_action, timeouts=20)
        self.cache.add_call_info(call_info)
        self.hangup_call(call_id, device_id, CallCause.CALL_TIMEOUT)
        self.dataHandleServer.update_record(call_id, {"status": 0})

    def exec_when_play_timeout(self, call_id):
        """Hang up every device of the call when a hangup command is still pending."""
        call_info = self.cache.get_call_info(call_id)
        if not call_info or not call_info.next_commands:
            return
        self.logger.debug("播放结束音乐失败,进行挂机 callId:%s", call_id)
        pending_types = [command.next_type for command in call_info.next_commands]
        if NextType.NEXT_HANGUP.code in pending_types:
            for device_id in call_info.device_list:
                self.hangup_call(call_id, device_id, CallCause.PLAY_TIMEOUT)

    def exec_when_acd_timeout(self, call_id):
        """On ACD timeout, un-bridge the single customer leg and play hold music."""
        call_info = self.cache.get_call_info(call_id)
        if not call_info:
            self.logger.info("exec_when_acd_timeout callInfo为空 callId: %s", call_id)
            return
        customer_code = DeviceType.CUSTOMER.code
        customers = [
            device for device in call_info.device_info_map.values()
            if self._device_type_code(device.device_type) == customer_code
        ]
        if len(customers) != 1:
            return
        device_id = customers[0].device_id
        self.bridge_break(call_id, device_id)
        self.hold_play(device_id, HOLD_MUSIC_PATH)
        self.play_timeout(call_id, timeout=30)
        self.cache.add_call_info(call_info)
        self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s",
                         call_id, device_id, HOLD_MUSIC_PATH)

    def make_call_new(self, context: MakeCallContext):
        """Originate a call described by *context* with a background ``originate``."""
        called = context.get_called()
        params = {'gateway': context.route_gateway_name, 'called': called, 'realm': context.get_realm()}
        builder = ['{', context.get_sip_header(), '}']
        if context.device_type == DeviceType.CUSTOMER.code:
            profile = self.expression(profile1, params)
            builder.append(f"{SOFIA}{SK}{profile}{SK}{called}{PARK}")
        else:
            profile = self.expression(profile2, params)
            builder.append(f"{profile}{PARK}")
        cmd = "".join(builder)
        self.logger.info(cmd)
        self.con.bgapi(ORIGINATE, cmd)

    def call_timeout(self, call_id, device_id, timeout):
        """Schedule a hangup of the device if the call times out."""
        delay_action = DelayAction(call_id=call_id, device_id=device_id)
        self.cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout)

    def send_args(self, device_id, name, arg, con=None):
        """Execute dialplan application *name* with *arg* on *device_id*.

        :param con: connection to send on; defaults to the shared ``self.con``.
        """
        msg = ESL.ESLevent("sendmsg", device_id)
        msg.addHeader("call-command", EXECUTE)
        msg.addHeader("execute-app-name", name)
        msg.addHeader("execute-app-arg", arg)
        (con or self.con).sendEvent(msg)

    def bridge_call(self, call_id, device_id1, device_id2):
        """Bridge two legs after setting the bridge variables on both."""
        self.multi_set_var(device_id1, BRIDGE_VARIABLES)
        self.multi_set_var(device_id2, BRIDGE_VARIABLES)
        self.bgapi(BRIDGE, arg=device_id1 + SPACE + device_id2, desc="bridge_call")

    def transfer_call(self, _from, _to):
        """Transfer (park both legs of) *_from*; *_to* is currently unused."""
        arg = _from + " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline "
        # The command name was previously missing, so only the argument string
        # was sent; use TRANSFER like insert() and bridge_break() do.
        self.bgapi(TRANSFER, arg=arg, desc="transfer_call")

    def answer(self, device_id):
        """Answer the call on *device_id* (``uuid_phone_event ... talk``)."""
        self.con.bgapi('uuid_phone_event', device_id + ' talk')

    def hangup_call(self, call_id, device_id, case_enum=CallCause.DEFAULT):
        """Hang up *device_id* over a dedicated connection.

        The hangup cause description of *case_enum* is stored in a channel
        variable before the hangup application is executed.
        """
        msg = ESL.ESLevent("sendmsg", device_id)
        msg.addHeader("call-command", EXECUTE)
        msg.addHeader("execute-app-name", HANGUP)
        msg.addHeader("execute-app-arg", NORMAL_CLEARING)
        _con = None
        try:
            _con = ESL.ESLconnection(self.host, self.port, self.password)
            if not _con.connected():
                self.logger.warning("hangup_call skipped, esl not connected, call: %s, device: %s",
                                    call_id, device_id)
                return
            arg = ''.join([EslEventUtil.SIP_H_P_LIBRA_HANGUP_CAUSE, "=", case_enum.description])
            self.send_args(device_id, SET, arg, con=_con)
            e = _con.sendMSG(msg)
            self.logger.info("hangup_call挂机 hangup call: %s, device: %s, arg: %s, res:%s",
                             call_id, device_id, arg, e)
        except Exception:
            self.logger.exception("hangup_call failed, call: %s, device: %s", call_id, device_id)
        finally:
            if _con:
                _con.disconnect()

    def kill_call(self, call_id, device_id, case_enum=CallCause.DEFAULT):
        """Kill the channel *device_id* with ``uuid_kill``; no-op without a device id."""
        if not device_id:
            return
        self.logger.info("kill_call:%s, device:%s, case_enum:%s", call_id, device_id, case_enum)
        self.api(UUID_KILL, device_id, desc="kill_call")

    def broadcast(self, uuid, path, smf):
        """Send ``uuid_broadcast <uuid> <path> <smf>`` in the background."""
        self.con.bgapi(' '.join([UUID_BROADCAST, uuid, path, smf]), EMPTY)

    def break0(self, uuid, all=False, sync=True):
        """Send ``uuid_break`` for *uuid*.

        :param all: append ``all`` to the command.
        :param sync: use the blocking ``api`` call instead of ``bgapi``.
        """
        parts = [UUID_BREAK, uuid]
        if all:
            parts.append("all")
        command = ' '.join(parts)
        send = self.con.api if sync else self.con.bgapi
        send(command, EMPTY)

    def hold(self, smf, uuid, display):
        """Send ``uuid_hold`` built from *smf*, *uuid* and *display*."""
        # NOTE(review): ' '.join requires every element, including *display*,
        # to be a string - confirm against callers.
        parts = [UUID_HOLD, smf, uuid, display]
        parts.append("all" if display else EMPTY)
        self.con.bgapi(' '.join(parts), EMPTY)

    def get_var(self, uuid, var):
        """Send ``uuid_getvar`` in the background; the result is not returned."""
        self.con.bgapi(' '.join([UUID_GETVAR, uuid, var]), EMPTY)

    def set_var(self, uuid, var, val):
        """Set channel variable *var* to *val* on *uuid*."""
        self.bgapi(' '.join([UUID_SETVAR, uuid, var, val]), desc="set_var")

    def multi_set_var(self, uuid, params):
        """Set several channel variables at once with ``uuid_setvar_multi``."""
        assignments = ';'.join(k + "=" + v for k, v in params.items())
        command = "uuid_setvar_multi " + uuid + " " + assignments
        self.bgapi(command, desc="multi_set_var")

    def record(self, uuid, action, path, limit):
        """Send ``uuid_record <uuid> <action> <path> <limit>``."""
        self.bgapi(' '.join([UUID_RECORD, uuid, action, path, str(limit)]), desc="record")

    def transfer(self, uuid, smf, dest, dialplan, context):
        """Send ``uuid_transfer`` with the given destination, dialplan and context."""
        self.con.bgapi(' '.join([UUID_TRANSFER, uuid, smf, dest, dialplan, context]), EMPTY)

    def insert(self, device_id):
        """Barge in: park both legs of the call on *device_id*."""
        arg = device_id + " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline "
        self.con.api(TRANSFER, arg)

    def bridge_break(self, call_id, device_id):
        """Un-bridge *device_id*, parking both legs and tagging the hold-music header."""
        arg = (
            device_id
            + f" -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,"
              f"set:{SIP_HEADER}{sipHeaderHoldMusic}={call_id},park:' inline "
        )
        self.api(TRANSFER, arg, "bridge_break")

    def play_file(self, call_id, device_id, file, sync):
        """Play *file* on *device_id*.

        With *sync* the file is broadcast through ``hold_play``; otherwise an
        asynchronous ``playback`` application is executed on the shared
        connection.
        """
        if sync:
            return self.hold_play(device_id, file)
        msg = ESL.ESLevent("sendmsg", device_id)
        msg.addHeader("call-command", EXECUTE)
        msg.addHeader("execute-app-name", PLAYBACK)
        msg.addHeader("execute-app-arg", file)
        msg.addHeader("async", "true")
        self.con.sendEvent(msg)

    def stop_play(self, device_id):
        """Stop the music playing on *device_id* (``PAUSE <device_id> on``)."""
        self.con.api(PAUSE, device_id + " on")

    def hold_play(self, device_id, play):
        """Broadcast playback of *play* to *device_id* with the ``SMF_ALEG`` flag."""
        arg = ''.join([device_id, " playback::", play, " ", SMF_ALEG])
        self.api(UUID_BROADCAST, arg, "hold_play")

    def _run_on_fresh_connection(self, method_name, command, arg, desc):
        """Run ``api``/``bgapi`` on a short-lived connection and log the result.

        :param method_name: ``"api"`` or ``"bgapi"`` - the ESLconnection method to call.
        """
        _con = None
        try:
            _con = ESL.ESLconnection(self.host, self.port, self.password)
            if not _con.connected():
                self.logger.warning('%s skipped, esl not connected, arg=%s', desc, command + ' ' + arg)
                return
            e = getattr(_con, method_name)(command, arg)
            self.logger.info('%s success, arg=%s, event=%s', desc, command + ' ' + arg,
                             e.serialize('json') if e else None)
        except Exception:
            self.logger.exception('%s failed, command=%s', desc, command)
        finally:
            if _con:
                _con.disconnect()

    def api(self, command, arg=EMPTY, desc=""):
        """Run a blocking ESL ``api`` command on a dedicated connection."""
        self._run_on_fresh_connection('api', command, arg, desc)

    def bgapi(self, command, arg=EMPTY, desc=""):
        """Run a background ESL ``bgapi`` command on a dedicated connection."""
        self._run_on_fresh_connection('bgapi', command, arg, desc)

    def play_timeout(self, call_id, timeout):
        """Schedule a hangup of the call if playback times out."""
        delay_action = DelayAction(call_id=call_id)
        self.cache.add_delay_message(DelayActionEnum.PLAY_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout)

    def listen(self, device_id1, device_id2, aleg=True, bleg=True):
        """Make *device_id1* eavesdrop on *device_id2*."""
        if aleg:
            self.send_args(device_id1, SET, "eavesdrop_bridge_aleg=true")
        if bleg:
            self.send_args(device_id1, SET, "eavesdrop_bridge_bleg=true")
        self.send_args(device_id1, EAVESDROP, device_id2)

    def show_channel(self, device_id):
        """Return the result of ``show channels like <device_id> as json``."""
        msg = self.con.api("show", " channels like " + device_id + " as json")
        self.logger.info('show_channel::%s', msg)
        return msg

    def expression(self, template, params):
        """Replace every ``#{[key]}`` placeholder in *template* with its value."""
        for key, value in params.items():
            template = template.replace("#{[" + key + "]}", str(value))
        return template

    def stop(self):
        """Stop the event loop and release executors, scheduler and connection."""
        # Flag first so the receive loop does not reconnect after the disconnect.
        self.is_stopping = True
        for executor in self.executors.values():
            executor.shutdown()
        if self.delay_action_scheduler.running:
            self.delay_action_scheduler.shutdown(wait=False)
        self.delay_action_executor.shutdown(wait=False)
        if self.con:
            self.con.disconnect()


class OutboundClient:
    """ESL outbound server: routes incoming calls to a human agent or a robot."""

    # Seconds between two white-list refreshes.
    WHITELIST_REFRESH_SECONDS = 3600

    def __init__(self, agent, app):
        """Load the white list and start the refresh and server threads.

        :param agent: bot agent used to register robot destinations.
        :param app: application object providing ``logger`` and ``app_context()``.
        """
        self.app = app
        self.logger = app.logger
        self.whitelist = []
        self.update_whitelist()  # initial load of the white list
        # Refresh the white list periodically.
        threading.Thread(target=self.refresh_whitelist, daemon=True).start()
        self.dataHandleServer = DataHandleServer(app)
        server_thread = threading.Thread(target=self.start, args=('0.0.0.0', 8084, agent))
        server_thread.daemon = True  # do not keep the process alive
        server_thread.start()

    def update_whitelist(self):
        """Reload the phone numbers of all non-deleted white-list rows."""
        with self.app.app_context():
            phones = Whitelist.query.filter_by(del_flag=0).all()
            self.whitelist = [phone.phone for phone in phones]
            self.logger.info("Whitelist updated: %s", self.whitelist)

    def refresh_whitelist(self):
        """Refresh the white list forever, once per ``WHITELIST_REFRESH_SECONDS``."""
        while True:
            time.sleep(self.WHITELIST_REFRESH_SECONDS)
            # A failed refresh must not kill the thread; keep the previous list.
            try:
                self.update_whitelist()
            except Exception:
                self.logger.exception("refresh_whitelist failed, keeping previous whitelist")

    def load_whitelist(self):
        """Return the cached white list."""
        return self.whitelist

    def load_agent_monitor(self):
        """Return the agent numbers with ``check_state=0`` and ``service_state=2``."""
        with self.app.app_context():
            agents = AgentMonitor.query.filter_by(check_state=0, service_state=2).all()
            return [agent.agent_num for agent in agents]

    class ESLRequestHandler(socketserver.BaseRequestHandler):
        """Handles one outbound-socket connection from FreeSWITCH."""

        def setup(self):
            """Pick a destination for the incoming call, bridge it and record it.

            White-listed callers go to a random available agent; everyone else
            (and white-listed callers when no agent is available) goes to a
            robot registered through ``server.agent.register``.
            """
            try:
                self.server.logger.info('%s connected!', self.client_address)
                fd = self.request.fileno()
                con = ESL.ESLconnection(fd)
                self.server.logger.info('Connected: %s', con.connected())
                if not con.connected():
                    self.server.logger.info("Failed to connect to FreeSWITCH")
                    return

                info = con.getInfo()
                event_name = info.getHeader("Event-Name")
                self.server.logger.info('Event-Name: %s', event_name)
                device_id = info.getHeader("unique-id")
                caller_number = info.getHeader("Caller-Caller-ID-Number")  # calling number
                whitelist = self.server.load_whitelist()

                call_id = 'C' + str(Snowflake().next_id())
                new_device_id = 'D' + str(Snowflake().next_id())
                kwargs = json.loads(info.serialize('json'))
                kwargs['variable_sip_h_P-LIBRA-CallId'] = call_id
                kwargs['variable_sip_h_P-LIBRA-DeviceId'] = new_device_id
                call_info = {
                    "session_id": call_id,
                    "time_begin": datetime.utcnow(),
                    "category": 0,
                    "phone": caller_number
                }

                # Check the white list; an empty agent list falls back to the robot
                # (random.choice on an empty list used to abort the whole call).
                agents = self.server.load_agent_monitor() if caller_number in whitelist else []
                if caller_number in whitelist and not agents:
                    self.server.logger.warning(
                        "Caller %s is in whitelist but no agent is available, fallback to robot", caller_number)
                if agents:
                    # Transfer straight to a randomly chosen human agent.
                    destination = random.choice(agents)
                    self.server.logger.info("Caller %s is in whitelist, agents: %s, destination: %s",
                                            caller_number, agents, destination)
                    call_info['type'] = 0
                    call_info['agent_num'] = destination
                else:
                    # Route to the AI robot.
                    destination = self.server.agent.register(**kwargs)
                    self.server.logger.info("device_id=%s, destination=%s, new_device_id=%s",
                                            device_id, destination, new_device_id)
                    call_info['type'] = 1
                    call_info['service_category'] = 1
                    call_info['user_id'] = destination
                    call_info['user_name'] = f"机器人{destination}"

                # NOTE(review): the source indentation was lost; the calls below
                # are assumed to run for both branches - confirm against history.
                self.build_call_info(call_id, device_id, new_device_id, str(destination), **kwargs)
                self.server.cache.add_device_user_part(device_id, destination)
                con.execute(
                    "bridge",
                    "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s"
                    % (call_id, new_device_id, new_device_id, destination),
                    device_id)
                self.server.dataHandleServer.create_record(call_info)
            except Exception:
                self.server.logger.exception("outbound esl request handling failed")

        @staticmethod
        def _new_device_info(device_id, call_id, call_time, device_type, cdr_type, agent_key):
            """Build a ``DeviceInfo`` with every optional field left empty."""
            return DeviceInfo(
                device_id=device_id, call_time=call_time, call_id=call_id, device_type=device_type,
                agent_key=agent_key, cdr_type=cdr_type, cti_flow_id=None, conference=None,
                agent_name=None, from_agent=None, caller=None, called=None, display=None,
                called_location=None, caller_location=None, ring_start_time=None, ring_end_time=None,
                answer_time=None, bridge_time=None, end_time=None, talk_time=None, sip_protocol=None,
                channel_name=None, hangup_cause=None, ring_cause=None, sip_status=None, record=None,
                record_time=None, record_start_time=None, state=None, apparent_number=None,
                caller_display=None)

        def build_call_info(self, call_id, device_id, new_device_id, destination, **kwargs):
            """Create the ``CallInfo`` with both device legs and store it in the cache.

            :param device_id: id of the incoming (customer) leg.
            :param new_device_id: id reserved for the leg that will be originated.
            :param destination: agent key / called party, as a string.
            :param kwargs: channel data of the incoming leg.
            """
            # NOTE(review): setup() reads "Caller-Caller-ID-Number" while this reads
            # "Channel-Caller-ID-Number" - confirm which key the channel data has.
            caller = kwargs.get('Channel-Caller-ID-Number')
            called = destination
            # NOTE(review): utcnow() is naive, so timestamp() interprets it as
            # local time; kept as-is because other code may rely on this value.
            now = datetime.utcnow().timestamp()
            call_info = CallInfo(
                call_id=call_id, agent_key=destination, caller=caller, called=called,
                direction=Direction.INBOUND.code, call_type=CallType.BOT_CALL.code, call_time=now,
                uuid1=call_id, uuid2=device_id, saas_id=saasId, core_uuid=None, cti_flow_id=None,
                conference=None, group_id=None, hidden_customer=0, caller_display=None,
                called_display=None, number_location=None, agent_name=None, login_type=None,
                ivr_id=None, task_id=None, media_host=None, sip_server=None, client_host=None,
                record=None, record2=None, record_time=None, answer_flag=None, wait_time=None,
                answer_count=0, hangup_dir=None, sdk_hangup=0, hangup_code=None, answer_time=None,
                end_time=None, talk_time=None, first_queue_time=None, queue_start_time=None,
                queue_end_time=None, overflow_count=0, cdr_notify_url=None, queue_level=None,
                transfer_agent=None, device_list=[], device_info_map={}, follow_data={},
                process_data={}, next_commands=[], call_details=[])
            device_custom = self._new_device_info(
                device_id, call_id, now, DeviceType.CUSTOMER.code, CdrType.OUTBOUND.code, destination)
            device_bot = self._new_device_info(
                new_device_id, call_id, now, DeviceType.ROBOT.code, CdrType.INBOUND.code, destination)

            call_info.device_list.append(device_id)
            self.server.logger.info("liuwei::debugger:3333::call_id=%s, device_id=%s", call_id, device_id)
            call_info.device_list.append(new_device_id)
            self.server.logger.info("liuwei::debugger:4444::call_id=%s, device_id=%s", call_id, new_device_id)
            call_info.device_info_map = {device_id: device_custom, new_device_id: device_bot}
            self.server.logger.info("lwoutBound, call_info=%s", call_info)
            self.server.cache.add_call_info(call_info)

    class CustomTCPServer(socketserver.TCPServer):
        """TCP server carrying the collaborators the request handler needs."""

        def __init__(self, server_address, RequestHandlerClass, agent, app, whitelist_loader,
                     load_agent_monitor, dataHandleServer):
            """Store the collaborators, then bind and listen on *server_address*.

            :param whitelist_loader: callable returning the current white list.
            :param load_agent_monitor: callable returning available agent numbers.
            :param dataHandleServer: object used to persist call records.
            """
            self.agent = agent
            self.cache = Cache(app)
            self.logger = app.logger
            self.load_whitelist = whitelist_loader
            self.load_agent_monitor = load_agent_monitor
            self.dataHandleServer = dataHandleServer
            super().__init__(server_address, RequestHandlerClass)

    def start(self, HOST='0.0.0.0', PORT=8084, agent=None):
        """Serve outbound-socket connections on ``HOST:PORT`` until shut down."""
        with self.CustomTCPServer((HOST, PORT), self.ESLRequestHandler, agent, self.app,
                                  self.load_whitelist, self.load_agent_monitor,
                                  self.dataHandleServer) as server:
            self.logger.info("ESL server listening on %s:%s", HOST, PORT)
            server.serve_forever()