#!/usr/bin/env python3 # encoding:utf-8 import json import random import socketserver import sys import ESL import time import mmh3 import threading import traceback import concurrent.futures from apscheduler.schedulers.background import BackgroundScheduler import src.core.callcenter.cache as Cache from src.core.callcenter.api import MakeCallContext, DelayAction, CallInfo, DeviceInfo, NextCommand from src.core.callcenter.constant import SK, EMPTY, CTI_ENGINE_DELAY_ACTION_LOCK, HOLD_MUSIC_PATH, saasId from src.core.callcenter.esl.constant.esl_constant import BRIDGE_VARIABLES, BRIDGE, HANGUP, NORMAL_CLEARING, SIP_HEADER, SPACE, SPLIT, SOFIA, \ ORIGINATE, PARK, SET, EAVESDROP, SMF_ALEG, EXECUTE, PLAYBACK, PAUSE, TRANSFER, UUID_TRANSFER, UUID_BROADCAST, UUID_BREAK, UUID_HOLD, \ UUID_RECORD, UUID_SETVAR, UUID_GETVAR import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil import src.core.callcenter.esl.handler as event_handler from src.core.callcenter.esl.constant.sip_header_constant import sipHeaderHoldMusic, profile1, profile2 from src.core.callcenter.enumeration import CallCause, DeviceType, DelayActionEnum, HangupDir, CallType, NextType, \ Direction, CdrType from src.core.callcenter.esl.handler.default_esl_event_handler import DefaultEslEventHandler from src.core.callcenter.snowflake import Snowflake from src.core.datasource import SERVE_HOST from src.core.voip.constant import * from src.core.callcenter.dao import * class InboundClient: def __init__(self, agent, logger): self.con = None self.thread_num = 12 self.is_stopping = False self.logger = logger self.bot_agent = agent self.handler_table = self.scan_esl_event_handlers() self.default_event_handler = DefaultEslEventHandler(self, self.bot_agent, self.logger) self.host, self.port, self.password = SERVE_HOST, '8021', '4918257983818884358' self.executors = {x: concurrent.futures.ThreadPoolExecutor(max_workers=1) for x in range(self.thread_num)} self.delay_action_executor = concurrent.futures.ThreadPoolExecutor(max_workers=5) self.delay_action_scheduler = BackgroundScheduler() self.delay_action_scheduler.add_job(self.submit_delay_action, 'interval', seconds=1, max_instances=1) self.delay_action_scheduler.start() threading.Thread(target=self.start, args=()).start() def submit_delay_action(self): for name, member in DelayActionEnum.__members__.items(): action_messages = Cache.get_delay_message(name) for action_message in action_messages: self.delay_action_executor.submit(self.do_delay_action, name, action_message) def scan_esl_event_handlers(self): import inspect import importlib import pkgutil classes = [] # 遍历包中的模块 for module_info in pkgutil.iter_modules(event_handler.__path__): module = importlib.import_module(f"{event_handler.__name__}.{module_info.name}") for _name, _cls in inspect.getmembers(module, inspect.isclass): if hasattr(_cls, '_esl_event_name'): classes.append(_cls) handlers = {} for _cls in classes: items = handlers.get(_cls._esl_event_name, []) items.append(_cls(self, self.bot_agent, self.logger)) handlers[_cls._esl_event_name] = items return handlers def start(self): self.logger.info('inbound.start') self.con = ESL.ESLconnection(self.host, self.port, self.password) if self.con.connected(): self.logger.info('inbound esl connected ... ') self.con.events('plain', 'all') #CHANNEL_ORIGINATE,CHANNEL_PROGRESS,CHANNEL_PROGRESS_MEDIA,CHANNEL_ANSWER,CHANNEL_HANGUP,CUSTOM,PLAYBACK_START,PLAYBACK_STOP,DETECTED_TONE while not self.is_stopping: e = self.con.recvEvent() # if e: # self.logger.info(json.loads(e.serialize('json'))) event_name = e.getHeader("Event-Name") if event_name == "SERVER_DISCONNECTED": self.logger.info('come in SERVER_DISCONNECTED case') self.con.disconnect() time.sleep(3) self.start() else: # threading.Thread(target=self.process_esl_event, args=(e,)).start() self.choose_thread_pool_executor(e).submit(self.process_esl_event, e) def choose_thread_pool_executor(self, e): call_id = EslEventUtil.getCallId(e) device_id = EslEventUtil.getUniqueId(e) wdh_device_id = EslEventUtil.getDeviceId(e) random_id = call_id if call_id else device_id if random_id: random_index = abs(mmh3.hash(random_id)) % len(self.executors) else: random_index = random.randint(0, len(self.executors) - 1) if self.executors else 0 # print('choose_thread_pool_executor.index=', random_index, call_id, device_id, wdh_device_id) return self.executors.get(random_index) def process_esl_event(self, e): # print(json.loads(e.serialize('json'))) event_name = EslEventUtil.getEventName(e) coreUUID = EslEventUtil.getCoreUuid(e) address = self.host + ':' + self.port # self.logger.info("process_esl_event.event_name=%s,coreUUID=%s", event_name, coreUUID) try: if event_name in self.handler_table: items = self.handler_table.get(event_name) for x in items: try: x.handle(address, e, coreUUID) except: traceback.print_exc() else: self.default_event_handler.handle(address, e, coreUUID) except: traceback.print_exc() def do_delay_action(self, action, message): delay_action = DelayAction.from_json(message) flag = Cache.lock_delay_action(delay_action.uuid) if not flag: self.logger.info("异步延迟执行操作重复 action:%s msg:%s", action, message) return delay_action_enum = DelayActionEnum.get_by_code(action) if not delay_action_enum: self.logger.info("异步延迟执行 delayActionEnum为空 action:%s msg:%s", action, message) return if DelayActionEnum.CALL_TIMEOUT_HANGUP == delay_action_enum: self.exec_when_call_timeout(delay_action.call_id, delay_action.device_id) elif DelayActionEnum.PLAY_TIMEOUT_HANGUP == delay_action_enum: self.exec_when_play_timeout(delay_action.call_id) elif DelayActionEnum.ACD_TIMEOUT_PLAY == delay_action_enum: self.exec_when_acd_timeout(delay_action.call_id) def exec_when_call_timeout(self, call_id, device_id): call_info = Cache.get_call_info(call_id) if not call_info or not (device_id in call_info.device_list): return device_info = call_info.device_info_map.get(device_id) if device_info and device_info.answer_time is None: self.logger.info("call:%s deviceId:%s execWhenCallTimeOut", call_id, device_id) device_info.hangup_cause = CallCause.CALL_TIMEOUT.name call_info.next_commands = [] if device_info.device_type <= DeviceType.ROBOT.code: call_info.hangup_dir = HangupDir.PLATFORM_HANGUP.code call_info.hangup_code = CallCause.CALL_TIMEOUT.code # if device_info.device_type.code == DeviceType.CUSTOMER.code: # call_info.user_no_answer_end_call = True if not device_info.end_time and device_info.device_type.code == DeviceType.CUSTOMER.code: channel = self.show_channel(device_id) if channel: delay_action = DelayAction(call_id=call_id, device_id=device_id) Cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_DECR, delay_action, timeouts=20) Cache.add_call_info(call_info) self.hangup_call(call_id, device_id, CallCause.CALL_TIMEOUT) def exec_when_play_timeout(self, call_id): call_info = Cache.get_call_info(call_id) if not call_info or not call_info.next_commands: return self.logger.debug("播放结束音乐失败,进行挂机 callId:%s", call_id) next_types = [x.next_type for x in call_info.next_commands] if NextType.NEXT_HANGUP.code in next_types: for device_id in call_info.device_list: self.hangup_call(call_id, device_id, CallCause.PLAY_TIMEOUT) def exec_when_acd_timeout(self, call_id): call_info = Cache.get_call_info(call_id) if not call_info: self.logger.info("exec_when_acd_timeout callInfo为空 callId: {}", call_id) return device_list = [v for k,v in call_info.device_info_map.items() if v.device_type == DeviceType.CUSTOMER] if device_list and len(device_list) == 1: device_id = device_list[0].device_id self.bridge_break(device_id) self.hold_play(device_id, HOLD_MUSIC_PATH) self.play_timeout(call_id, timeout=30) Cache.add_call_info(call_info) self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id, device_id, HOLD_MUSIC_PATH) def make_call_new(self, context: MakeCallContext): called = context.get_called() params = {'gateway': context.route_gateway_name, 'called': called, 'realm': context.get_realm()} builder = [ '{', context.get_sip_header(), '}' ] if context.device_type == DeviceType.CUSTOMER.code: profile = self.expression(profile1, params) builder.append(f"{SOFIA}{SK}{profile}{SK}{called}{PARK}") else: profile = self.expression(profile2, params) builder.append(f"{profile}{PARK}") cmd = "".join(builder) print(cmd) # self.logger.info(cmd) sys.stdout.flush() # 强制刷新输出缓冲区 self.con.bgapi(ORIGINATE, cmd) def make_call(self, route_gateway, display, called, call_id, device_id, timeout=30, originate_timeout=30, *sip_headers): # called = f"{called}{AT}{route_gateway.media_host}{CO}{route_gateway.media_port}" if route_gateway.caller_prefix: display = f"{route_gateway.caller_prefix}{display}" if route_gateway.called_prefix: called = f"{route_gateway.called_prefix}{called}" sip_buffer = [] if sip_headers: sip_buffer = [f"{SIP_HEADER}{header}" for header in sip_headers] params = { "callId": call_id, "deviceId": device_id, "caller": display, "called": called, } if route_gateway.sip_header1: sip_header1 = self.expression(route_gateway.sip_header1, params) sip_buffer.append(f"{SIP_HEADER}{sip_header1}") if route_gateway.sip_header2: sip_header2 = self.expression(route_gateway.sip_header2, params) sip_buffer.append(f"{SIP_HEADER}{sip_header2}") if route_gateway.sip_header3: sip_buffer.append(f"{SIP_HEADER}{route_gateway.sip_header3}") builder = [ "{return_ring_ready=true,", f"sip_contact_user={display},", "ring_asr=true,", "absolute_codec_string=^^:PCMU:PCMA,", # Assuming codecs is defined somewhere f"origination_caller_id_number={display},", f"origination_caller_id_name={display},", f"origination_uuid={device_id},", ] if originate_timeout: builder.append(f"originate_timeout={originate_timeout},") if sip_buffer: builder.append(f"{SPLIT}".join(sip_buffer)) builder.append("}") builder.append(f"{SOFIA}{SK}{route_gateway.profile}{SK}{called}{PARK}") cmd = "".join(builder) print(cmd) self.con.bgapi(ORIGINATE, cmd) def call_timeout(self, call_id, device_id, timeout): """呼叫超时主动挂机""" delay_action = DelayAction(call_id=call_id, device_id=device_id) Cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout) def send_args(self, device_id, name, arg): msg = ESL.ESLevent("sendmsg", device_id) msg.addHeader("call-command", EXECUTE) msg.addHeader("execute-app-name", name) msg.addHeader("execute-app-arg", arg) self.con.sendEvent(msg) def bridge_call(self, call_id, device_id1, device_id2): """桥接电话""" self.multi_set_var(device_id1, BRIDGE_VARIABLES) self.multi_set_var(device_id2, BRIDGE_VARIABLES) self.con.bgapi(BRIDGE, device_id1 + SPACE + device_id2) def transfer_call(self, _from, _to): """转接""" builder = [ _from, " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline " ] arg = ''.join(builder) self.con.bgapi(TRANSFER, arg) def answer(self, device_id): """应答""" self.con.bgapi('uuid_phone_event', device_id + ' talk') def hangup_call(self, call_id, device_id, case_enum=CallCause.DEFAULT): """挂机""" msg = ESL.ESLevent("sendmsg", device_id) msg.addHeader("call-command", EXECUTE) msg.addHeader("execute-app-name", HANGUP) msg.addHeader("execute-app-arg", NORMAL_CLEARING) print("hangup_call挂机 hangup call: %s, device: %s, ctiCauseEnum: %s"% (call_id, device_id, case_enum), flush=True) self.send_args(device_id, SET, EslEventUtil.SIP_H_P_LIBRA_HANGUP_CAUSE + "=" + case_enum.description) self.con.sendEvent(msg) def broadcast(self, uuid, path, smf): builder = [ UUID_BROADCAST, uuid, path, smf ] command = ' '.join(builder) self.con.bgapi(command, EMPTY) def break0(self, uuid, all=False, sync=True): builder = [ UUID_BREAK, uuid ] if all: builder.append("all") command = ' '.join(builder) if sync: self.con.api(command, EMPTY) else: self.con.bgapi(command, EMPTY) def hold(self, smf, uuid, display): builder = [ UUID_HOLD, smf, uuid, display ] if display: builder.append("all") else: builder.append(EMPTY) command = ' '.join(builder) self.con.bgapi(command, EMPTY) def get_var(self, uuid, var): builder = [ UUID_GETVAR, uuid, var ] command = ' '.join(builder) self.con.bgapi(command, EMPTY) def set_var(self, uuid, var, val): builder = [ UUID_SETVAR, uuid, var, val ] command = ' '.join(builder) self.con.bgapi(command, EMPTY) def multi_set_var(self, uuid, params): builder = [ "uuid_setvar_multi " + uuid + " " ] builder1 = [] for k, v in params.items(): builder1.append(k + "="+v) builder.append(';'.join(builder1)) command = ''.join(builder) self.con.bgapi(command, EMPTY) def record(self, uuid, action, path, limit): builder = [ UUID_RECORD, uuid, action, path, str(limit) ] command = ' '.join(builder) self.con.bgapi(command, EMPTY) def transfer(self, uuid, smf, dest, dialplan, context): builder = [ UUID_TRANSFER, uuid, smf, dest, dialplan, context ] command = ' '.join(builder) self.con.bgapi(command, EMPTY) def insert(self, device_id): """强插""" builder = [ device_id, " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline " ] arg = ''.join(builder) self.con.api(TRANSFER, arg) def bridge_break(self, device_id): """拆线""" builder = [ device_id, " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,set:" + SIP_HEADER + sipHeaderHoldMusic + "=true,park:' inline " ] arg = ''.join(builder) print('debugger::bridge_break, arg=%s'%arg, flush=True) self.con.api(TRANSFER, arg) def play_file(self, call_id, device_id, file, sync): """放音""" if sync: return self.hold_play(device_id, file) else: msg = ESL.ESLevent("sendmsg", device_id) msg.addHeader("call-command", EXECUTE) msg.addHeader("execute-app-name", PLAYBACK) msg.addHeader("execute-app-arg", file) msg.addHeader("async", "true") self.con.sendEvent(msg) def stop_play(self, device_id): """关闭播放音乐""" builder = [ device_id, " on" ] arg = ''.join(builder) return self.con.api(PAUSE, arg) def hold_play(self, device_id, play): """向a-leg插播tts音乐(无限播放)""" builder = [ device_id, " playback::", play, " ", SMF_ALEG ] print('debugger::hold_play, device_id=%s, play=%s' % (device_id, play), flush=True) arg = ''.join(builder) print('debugger::hold_play, arg=%s' % arg, flush=True) return self.con.api(UUID_BROADCAST, arg) def play_timeout(self, call_id, timeout): """播放超时主动挂机""" delay_action = DelayAction(call_id=call_id) Cache.add_delay_message(DelayActionEnum.PLAY_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout) def listen(self, device_id1, device_id2, aleg=True, bleg=True): """监听""" if aleg: self.send_args(device_id1, SET, "eavesdrop_bridge_aleg=true") if bleg: self.send_args(device_id1, SET, "eavesdrop_bridge_bleg=true") self.send_args(device_id1, EAVESDROP, device_id2) def show_channel(self, device_id): msg = self.con.api("show", " channels like " + device_id + " as json") print('show_channel::', msg) return msg def expression(self, template, params): for key, value in params.items(): template = template.replace("#{["+key+"]}", str(value)) return template def stop(self): for k, v in self.executors.items(): v.shutdown() self.con.disconnect() self.is_stopping = True class OutboundClient: def __init__(self, agent, logger,app): self.logger = logger self.app = app self.whitelist = [] self.update_whitelist() # 初始化加载白名单 # 定时更新白名单 threading.Thread(target=self.refresh_whitelist, daemon=True).start() #threading.Thread(target=self.start, args=('0.0.0.0', 8084, agent, logger)).start() server_thread = threading.Thread(target=self.start, args=('0.0.0.0', 8084, agent, logger)) server_thread.daemon = True # 设置守护线程 server_thread.start() def update_whitelist(self): with self.app.app_context(): phones = Whitelist.query.filter_by(del_flag=0).all() self.whitelist = [phone.phone for phone in phones] self.logger.info("Whitelist updated: %s", self.whitelist) def refresh_whitelist(self): while True: time.sleep(3600) # 每 1小时 更新一次 self.update_whitelist() def load_whitelist(self): return self.whitelist def load_agent_monitor(self): with self.app.app_context(): agents = AgentMonitor.query.filter_by(check_state=0,service_state=2).all() agent_nums = [agent.agent_num for agent in agents] return agent_nums class ESLRequestHandler(socketserver.BaseRequestHandler): def setup(self): try: self.server.logger.info('%s connected!', self.client_address) fd = self.request.fileno() con = ESL.ESLconnection(fd) self.server.logger.info('Connected: %s', con.connected()) if con.connected(): info = con.getInfo() # print(json.loads(info.serialize('json'))) event_name = info.getHeader("Event-Name") device_id = info.getHeader("unique-id") caller_number = info.getHeader("Caller-Caller-ID-Number") # 获取来电号码 whitelist = self.server.load_whitelist() # 检查白名单 if caller_number in whitelist: # 直接转接到人工坐席 agents = self.server.load_agent_monitor() # 随机取一个坐席号 destination = random.choice(agents) # 直接转接到人工坐席 self.server.logger.info("Caller %s is in whitelist, directly transferring call, agents: %s, destination: %s", caller_number, agents,destination) return call_id = 'C' + str(Snowflake().next_id()) new_device_id = 'D' + str(Snowflake().next_id()) kwargs = json.loads(info.serialize('json')) kwargs['variable_sip_h_P-LIBRA-CallId'] = call_id destination = self.server.agent.register(**kwargs) self.server.logger.info("debugger::device_id=%s, destination=%s, new_device_id=%s", device_id, destination, new_device_id) self.build_call_info(call_id, device_id, new_device_id, str(destination), **kwargs) Cache.add_device_user_part(device_id, destination) con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s"%(call_id, new_device_id, new_device_id, destination), device_id) # destination = "user/1001" # msg = ESL.ESLevent("sendmsg", uuid) # msg.addHeader("call-command", "execute") # msg.addHeader("execute-app-name", "bridge") # msg.addHeader("execute-app-arg", destination) # # 发送消息以执行 bridge 操作 # con.sendEvent(msg) # print(f"Call {uuid} is bridged to {destination}") # con.execute("answer", "", uuid) # con.execute("transfer", "1001 XML default", uuid) # try: # con.disconnect() # except: # print('come in ') # traceback.print_exc() else: self.server.logger.info("Failed to connect to FreeSWITCH") except: traceback.print_exc() def build_call_info(self, call_id, device_id, new_device_id, destination, **kwargs): caller = kwargs.get('Channel-Caller-ID-Number') called = destination now = datetime.utcnow().timestamp() call_info = CallInfo(call_id=call_id, agent_key=destination, caller=caller, called=called, direction=Direction.INBOUND.code, call_type=CallType.BOOT_CALL.code, call_time=now, uuid1=call_id, uuid2=device_id, saas_id=saasId) device_custom = DeviceInfo(device_id=device_id, call_time=now, call_id=call_id, device_type=DeviceType.CUSTOMER.code, agent_key=destination, cdr_type=CdrType.INBOUND.code) device_bot = DeviceInfo(device_id=new_device_id, call_time=now, call_id=call_id, device_type=DeviceType.ROBOT.code, agent_key=destination, cdr_type=CdrType.INBOUND.code) call_info.device_list.append(device_id) call_info.device_list.append(new_device_id) # call_info.next_commands.append(NextCommand(device_id, NextType.NEXT_CALL_BRIDGE.code, new_device_id)) call_info.device_info_map = {device_id: device_custom, new_device_id: device_bot} Cache.add_call_info(call_info) class CustomTCPServer(socketserver.TCPServer): def __init__(self, server_address, RequestHandlerClass, agent, logger,whitelist_loader,load_agent_monitor): self.agent = agent self.logger = logger self.load_whitelist = whitelist_loader self.load_agent_monitor = load_agent_monitor super().__init__(server_address, RequestHandlerClass) def start(self, HOST='0.0.0.0', PORT=8084, agent=None, logger=None): # HOST, PORT = "0.0.0.0", 8084 # 创建一个 TCP 服务器 with self.CustomTCPServer((HOST, PORT), self.ESLRequestHandler, agent, logger, self.load_whitelist, self.load_agent_monitor) as server: self.logger.info(f"ESL server listening on {HOST}:{PORT}") server.serve_forever()