#!/usr/bin/env python3 # encoding:utf-8 import random import socketserver import ESL import time import uuid import threading import traceback import concurrent.futures from apscheduler.schedulers.background import BackgroundScheduler from src.core.callcenter import BizException from src.core.callcenter.cache import Cache from src.core.callcenter.api import MakeCallContext, DelayAction, CallInfo, DeviceInfo, NextCommand from src.core.callcenter.callback import Callback from src.core.callcenter.constant import SK, EMPTY, WaitingHangupMusicPath, SAAS_ID, HOLD_MUSIC_PATH from src.core.callcenter.esl.constant.esl_constant import BRIDGE_VARIABLES, BRIDGE, HANGUP, NORMAL_CLEARING, SIP_HEADER, \ SPACE, SOFIA, \ ORIGINATE, PARK, SET, EAVESDROP, SMF_ALEG, EXECUTE, PLAYBACK, PAUSE, TRANSFER, UUID_TRANSFER, UUID_BROADCAST, \ UUID_BREAK, UUID_HOLD, \ UUID_RECORD, UUID_SETVAR, UUID_GETVAR, UUID_KILL, ANSWER import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil import src.core.callcenter.esl.handler as event_handler from src.core.callcenter.esl.constant.sip_header_constant import sipHeaderHoldMusic, profile1, profile2, sipHeaderCallId from src.core.callcenter.enumeration import CallCause, DeviceType, DelayActionEnum, HangupDir, CallType, NextType, \ Direction, CdrType, BizErrorCode, WhiteTypeEnum from src.core.callcenter.esl.handler.default_esl_event_handler import DefaultEslEventHandler from src.core.callcenter.snowflake import Snowflake from src.core.datasource import SERVE_HOST from src.core.voip.constant import * from src.core.callcenter.data_handler import * from src.core.callcenter import registry class InboundClient: def __init__(self, agent, app): self.con = None self.thread_num = 12 self.is_stopping = False self.app = app self.logger = app.logger self.bot_agent = agent self.cache = Cache(app) self.callback = Callback(app, self.thread_num) self.dataHandleServer = DataHandleServer(app) self.handler_table = self.scan_esl_event_handlers() self.default_event_handler = DefaultEslEventHandler(self, self.bot_agent) self.host, self.port, self.password = SERVE_HOST, '8021', '4918257983818884358' self.executors = {x: concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="client-event-pool") for x in range(self.thread_num)} self.delay_action_executor = concurrent.futures.ThreadPoolExecutor(max_workers=5) self.delay_action_scheduler = BackgroundScheduler() self.delay_action_scheduler.add_job(self.submit_delay_action, 'interval', seconds=1, max_instances=1, name='delay_action_daemon') self.delay_action_scheduler.start() threading.Thread(target=self.start, args=()).start() def submit_delay_action(self): for name, member in DelayActionEnum.__members__.items(): action_messages = self.cache.get_delay_message(name) for action_message in action_messages: self.delay_action_executor.submit(self.do_delay_action, name, action_message) def scan_esl_event_handlers(self): import inspect import importlib import pkgutil classes = [] # 遍历包中的模块 for module_info in pkgutil.iter_modules(event_handler.__path__): module = importlib.import_module(f"{event_handler.__name__}.{module_info.name}") for _name, _cls in inspect.getmembers(module, inspect.isclass): if hasattr(_cls, '_esl_event_name'): classes.append(_cls) handlers = {} for _cls in classes: items = handlers.get(_cls._esl_event_name, []) items.append(_cls(self, self.bot_agent)) handlers[_cls._esl_event_name] = items return handlers def start(self): self.logger.info('inbound.start') self.con = ESL.ESLconnection(self.host, self.port, self.password) if self.con.connected(): self.logger.info('inbound esl connected ... ') self.con.events('plain', 'all') #CHANNEL_ORIGINATE,CHANNEL_PROGRESS,CHANNEL_PROGRESS_MEDIA,CHANNEL_ANSWER,CHANNEL_HANGUP,CUSTOM,PLAYBACK_START,PLAYBACK_STOP,DETECTED_TONE while not self.is_stopping: e = self.con.recvEvent() # if e: # self.logger.info(json.loads(e.serialize('json'))) event_name = e.getHeader("Event-Name") if event_name == "SERVER_DISCONNECTED": self.logger.info('come in SERVER_DISCONNECTED case') self.con.disconnect() time.sleep(3) self.start() else: registry.FLASK_ACTIVE_THREADS.set(threading.active_count()) # threading.Thread(target=self.process_esl_event, args=(e,)).start() self.choose_thread_pool_executor(e).submit(self.process_esl_event, e) def choose_thread_pool_executor(self, e): call_id = EslEventUtil.getCallId(e) device_id = EslEventUtil.getUniqueId(e) wdh_device_id = EslEventUtil.getDeviceId(e) random_id = call_id if call_id else device_id if random_id: random_index = abs(mmh3.hash(random_id)) % len(self.executors) else: random_index = random.randint(0, len(self.executors) - 1) if self.executors else 0 # self.logger.info('choose_thread_pool_executor.index=', random_index, call_id, device_id, wdh_device_id) return self.executors.get(random_index) def process_esl_event(self, e): # self.logger.info(json.loads(e.serialize('json'))) start_time = time.time() event_name = EslEventUtil.getEventName(e) coreUUID = EslEventUtil.getCoreUuid(e) address = self.host + ':' + self.port # self.logger.info("process_esl_event.event_name=%s,coreUUID=%s", event_name, coreUUID) try: self.callback.callback_event(e) if event_name in self.handler_table: items = self.handler_table.get(event_name) for x in items: try: # self.logger.info("process_esl_event.handle.%s", x.__class__.__name__) x.handle(address, e, coreUUID) except: traceback.print_exc() else: self.default_event_handler.handle(address, e, coreUUID) except: traceback.print_exc() finally: latency = (time.time() - start_time) registry.ESL_EVENT_LATENCY.labels(event_name).observe(latency) def do_delay_action(self, action, message): delay_action = DelayAction.from_json(message) flag = self.cache.lock_delay_action(delay_action.uuid) self.logger.info("do_delay_action::action=%s, flag=%s", action, flag) if not flag: self.logger.info("异步延迟执行操作重复 action:%s msg:%s", action, message) return delay_action_enum = DelayActionEnum.get_by_code(action) if not delay_action_enum: self.logger.info("异步延迟执行 delayActionEnum为空 action:%s msg:%s", action, message) return if DelayActionEnum.CALL_TIMEOUT_HANGUP == delay_action_enum: self.exec_when_call_timeout(delay_action.call_id, delay_action.device_id) elif DelayActionEnum.PLAY_TIMEOUT_HANGUP == delay_action_enum: self.exec_when_play_timeout(delay_action.call_id) elif DelayActionEnum.ACD_TIMEOUT_PLAY == delay_action_enum: self.exec_when_acd_timeout(delay_action.call_id) def exec_when_call_timeout(self, call_id, device_id): call_info = self.cache.get_call_info(call_id) if not call_info or not (device_id in call_info.device_list): return device_info = call_info.device_info_map.get(device_id) if device_info and device_info.answer_time is None: self.logger.info("call:%s deviceId:%s execWhenCallTimeOut", call_id, device_id) device_info.hangup_cause = CallCause.CALL_TIMEOUT.name call_info.next_commands = [] if device_info.device_type <= DeviceType.ROBOT.code: call_info.hangup_dir = HangupDir.PLATFORM_HANGUP.code call_info.hangup_code = CallCause.CALL_TIMEOUT.code if device_info.device_type.code == DeviceType.CUSTOMER.code: call_info.user_no_answer_end_call = True # if not device_info.end_time and device_info.device_type.code == DeviceType.CUSTOMER.code: # channel = self.show_channel(device_id) # if channel: # delay_action = DelayAction(call_id=call_id, device_id=device_id) # self.cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_DECR, delay_action, timeouts=20) self.cache.add_call_info(call_info) self.hangup_call(call_id, device_id, CallCause.CALL_TIMEOUT) # self.dataHandleServer.update_record(call_id, status= 0) def exec_when_play_timeout(self, call_id): call_info = self.cache.get_call_info(call_id) if not call_info or not call_info.next_commands: return self.logger.info("播放结束音乐失败,进行挂机 callId:%s", call_id) next_types = [x.next_type for x in call_info.next_commands] if NextType.NEXT_HANGUP.code in next_types: for device_id in call_info.device_list: self.hangup_call(call_id, device_id, CallCause.PLAY_TIMEOUT) def exec_when_acd_timeout(self, call_id): call_info = self.cache.get_call_info(call_id) if not call_info: self.logger.info("exec_when_acd_timeout callInfo为空 callId: %s", call_id) return device_list = [v for k,v in call_info.device_info_map.items() if v.device_type == DeviceType.CUSTOMER] if device_list and len(device_list) == 1: device_id = device_list[0].device_id self.break0(device_id) if not WaitingHangupMusicPath: self.hangup_call(call_id, device_id, CallCause.WAITING_TIMEOUT) return self.hold_play(device_id, WaitingHangupMusicPath) self.play_timeout(call_id, timeout=30) next_command = NextCommand(device_id = device_id, next_type=NextType.NEXT_HANGUP.code) call_info.next_commands = [next_command] self.cache.add_call_info(call_info) # self.dataHandleServer.update_record(call_id, status= 0) self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id, device_id, WaitingHangupMusicPath) def make_call(self, context: MakeCallContext): # self.logger.info("拨打测试context:%s", context.__dict__) called = context.get_called() params = {'gateway': context.route_gateway_name, 'called': called, 'realm': context.get_realm()} builder = [ '{', context.get_sip_header(), '}' ] self.call_timeout(context.call_id, context.device_id, context.timeout) if context.device_type == DeviceType.CUSTOMER.code: profile = self.expression(profile1, params) builder.append(f"{SOFIA}{SK}{profile}{SK}{called}{PARK}") else: profile = self.expression(profile2, params) builder.append(f"{profile}{PARK}") # self.logger.info("拨打测试builder:%s", builder) cmd = "".join(builder) # self.logger.info("makeCall::%s", cmd) self.bgapi(ORIGINATE, arg=cmd, desc="make_call") # self.con.bgapi(ORIGINATE, cmd) def call_timeout(self, call_id, device_id, timeout): """呼叫超时主动挂机""" delay_action = DelayAction(call_id=call_id, device_id=device_id) self.cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout) def send_args(self, device_id, name, arg, con=None): msg = ESL.ESLevent("sendmsg", device_id) msg.addHeader("call-command", EXECUTE) msg.addHeader("execute-app-name", name) msg.addHeader("execute-app-arg", arg) if con: con.sendEvent(msg) else: self.con.sendEvent(msg) def bridge_call(self, call_id, device_id1, device_id2): """桥接电话""" self.multi_set_var(device_id1, BRIDGE_VARIABLES) self.multi_set_var(device_id2, BRIDGE_VARIABLES) self.bgapi(BRIDGE, arg=device_id1 + SPACE + device_id2, desc="bridge_call") def transfer_call(self, _from, _to): """转接""" builder = [ _from, " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline " ] command = ''.join(builder) self.bgapi(command, desc="transfer_call") def answer(self, device_id): """应答""" self.con.bgapi('uuid_phone_event', device_id + ' talk') def hangup_call(self, call_id, device_id, case_enum=CallCause.DEFAULT): """挂机""" msg = ESL.ESLevent("sendmsg", device_id) msg.addHeader("call-command", EXECUTE) msg.addHeader("execute-app-name", HANGUP) msg.addHeader("execute-app-arg", NORMAL_CLEARING) # msg.addHeader("async", "true") _con = None try: _con = ESL.ESLconnection(self.host, self.port, self.password) if _con.connected(): arg = ''.join([EslEventUtil.SIP_H_P_LIBRA_HANGUP_CAUSE, "=", case_enum.description]) self.send_args(device_id, SET, arg, con=_con) e = _con.sendMSG(msg) # e = _con.api(UUID_KILL, device_id) self.logger.info("hangup_call挂机 hangup call: %s, device: %s, arg: %s, res:%s" % (call_id, device_id, arg, e)) # self.logger.info("hangup_call挂机 hangup call: %s, device: %s, arg: %s, res:%s" % (call_id, device_id, arg, e.serialize('json'))) except: traceback.print_exc() finally: if _con: _con.disconnect() def kill_call(self, call_id, device_id, case_enum=CallCause.DEFAULT): if not device_id: return self.logger.info("kill_call:%s, device:%s, case_enum:%s", call_id, device_id, case_enum) self.api(UUID_KILL, device_id, desc="kill_call") def broadcast(self, uuid, path, smf): builder = [ UUID_BROADCAST, uuid, path, smf ] command = ' '.join(builder) self.bgapi(command, desc="broadcast") # self.con.bgapi(command, EMPTY) def break0(self, uuid, all=False, sync=True): builder = [ UUID_BREAK, uuid ] if all: builder.append("all") command = ' '.join(builder) if sync: self.api(command, desc="break0") else: self.bgapi(command, desc="break0") def hold(self, smf, uuid, display): builder = [ UUID_HOLD, smf, uuid, display ] if display: builder.append("all") else: builder.append(EMPTY) command = ' '.join(builder) self.con.bgapi(command, EMPTY) def get_var(self, uuid, var): builder = [ UUID_GETVAR, uuid, var ] command = ' '.join(builder) self.con.bgapi(command, EMPTY) def set_var(self, uuid, var, val): builder = [ UUID_SETVAR, uuid, var, val ] command = ' '.join(builder) self.bgapi(command, desc ="set_var") def multi_set_var(self, uuid, params): builder = [ "uuid_setvar_multi " + uuid + " " ] builder1 = [] for k, v in params.items(): builder1.append(k + "="+v) builder.append(';'.join(builder1)) command = ''.join(builder) self.bgapi(command, desc="multi_set_var") def record(self, uuid, action, path, limit): builder = [ UUID_RECORD, uuid, action, path, str(limit) ] command = ' '.join(builder) self.bgapi(command, desc="record") def transfer(self, uuid, smf, dest, dialplan, context): builder = [ UUID_TRANSFER, uuid, smf, dest, dialplan, context ] command = ' '.join(builder) self.con.bgapi(command, EMPTY) def insert(self, device_id): """强插""" builder = [ device_id, " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline " ] arg = ''.join(builder) self.con.api(TRANSFER, arg) def bridge_break(self, call_id, device_id): """拆线""" builder = [ device_id, f" -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,set:{SIP_HEADER}{sipHeaderHoldMusic}={call_id},park:' inline " ] arg = ''.join(builder) self.api(TRANSFER, arg, "bridge_break") def play_file(self, call_id, device_id, file, sync): """放音""" if sync: return self.hold_play(device_id, file) else: msg = ESL.ESLevent("sendmsg", device_id) msg.addHeader("call-command", EXECUTE) msg.addHeader("execute-app-name", PLAYBACK) msg.addHeader("execute-app-arg", file) msg.addHeader("async", "true") self.con.sendEvent(msg) def stop_play(self, device_id): """关闭播放音乐""" builder = [ device_id, " on" ] arg = ''.join(builder) self.con.api(PAUSE, arg) def hold_play(self, device_id, play): """向a-leg插播tts音乐(无限播放)""" builder = [ device_id, " playback::", play, " ", SMF_ALEG ] arg = ''.join(builder) self.api(UUID_BROADCAST, arg, "hold_play") def api(self, command, arg=EMPTY, desc=""): _con = None try: _con = ESL.ESLconnection(self.host, self.port, self.password) if _con.connected(): e = _con.api(command, arg) self.logger.info('%s success, arg=%s, event=%s' % (desc, command + ' ' + arg, e.serialize('json'))) except: traceback.print_exc() finally: if _con: _con.disconnect() def bgapi(self, command, arg=EMPTY, desc=""): _con = None try: _con = ESL.ESLconnection(self.host, self.port, self.password) if _con.connected(): e = _con.bgapi(command, arg) self.logger.info('%s success, arg=%s, event=%s' % (desc, command + ' '+arg, e.serialize('json'))) except: traceback.print_exc() finally: if _con: _con.disconnect() def play_timeout(self, call_id, timeout): """播放超时主动挂机""" delay_action = DelayAction(call_id=call_id) self.cache.add_delay_message(DelayActionEnum.PLAY_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout) def listen(self, device_id1, device_id2, aleg=True, bleg=True): """监听""" if aleg: self.send_args(device_id1, SET, "eavesdrop_bridge_aleg=true") if bleg: self.send_args(device_id1, SET, "eavesdrop_bridge_bleg=true") self.send_args(device_id1, EAVESDROP, device_id2) def show_channel(self, device_id): msg = self.con.api("show", " channels like " + device_id + " as json") self.logger.info('show_channel::', msg) return msg def expression(self, template, params): for key, value in params.items(): template = template.replace("#{["+key+"]}", str(value)) return template def stop(self): for k, v in self.executors.items(): v.shutdown() self.con.disconnect() self.is_stopping = True self.callback.stop() class OutboundClient: def __init__(self, agent, app): self.app = app self.logger = app.logger server_thread = threading.Thread(target=self.start, args=('0.0.0.0', 8084, agent)) server_thread.daemon = True # 设置守护线程 server_thread.start() class ESLRequestHandler(socketserver.BaseRequestHandler): def setup(self): try: self.server.logger.info('%s connected!', self.client_address) fd = self.request.fileno() con = ESL.ESLconnection(fd) self.server.logger.info('Connected: %s', con.connected()) if con.connected(): info = con.getInfo() self.server.logger.info(json.loads(info.serialize('json'))) event_name = info.getHeader("Event-Name") self.server.logger.info('Event-Name: %s', event_name) device_id = info.getHeader("unique-id") caller_number = info.getHeader("Caller-Caller-ID-Number") # 获取来电号码 call_id = 'C' + str(Snowflake().next_id()) new_device_id = 'D' + str(Snowflake().next_id()) kwargs = json.loads(info.serialize('json')) kwargs['variable_sip_h_P-LIBRA-CallId'] = call_id kwargs['variable_sip_h_P-LIBRA-DeviceId'] = device_id bucket_call_type = 2 destination = None service_category = 0 bucket = self.server.get_bucket(call_id) whitelist = self.server.get_whitelist() in_whitelist_type = self.in_whitelist(caller_number, whitelist) self.server.logger.info('call incoming call_id=%s, caller_number=%s, device_id=%s, new_device_id=%s, in_whitelist=%s, bucket=%s', call_id, caller_number, device_id, new_device_id, in_whitelist_type, bucket.name) # 检查白名单 if in_whitelist_type: if WhiteTypeEnum.AI == in_whitelist_type: bucket_call_type = 0 service_category = 1 destination = self.bridge_ai(con, bucket_call_type, call_id, device_id, new_device_id, **kwargs) else: bucket_call_type = 0 self.transfer_custom(con, bucket_call_type, call_id, device_id, new_device_id, **kwargs) else: # 自然分流 if bucket and bucket.name == 'AI': bucket_call_type = 1 service_category = 1 destination = self.bridge_ai(con, bucket_call_type, call_id, device_id, new_device_id, **kwargs) else: self.transfer_custom(con, bucket_call_type, call_id, device_id, new_device_id, **kwargs) registry.CALL_INCOMING_REQUESTS.labels(f"{bucket_call_type}").inc() self.server.dataHandleServer.create_record(call_id, caller_number, bucket_call_type, service_category=service_category, user_id=destination if bucket_call_type == 1 else None , user_name= f"机器人{destination}" if bucket_call_type ==1 else None) try: con.disconnect() self.server.logger.info("connection disconnected !!!") except: traceback.print_exc() else: self.server.logger.info("Failed to connect to FreeSWITCH") except: traceback.print_exc() finally: try: if self.request.fileno() != -1: self.request.close() except OSError: # Ignore the error if socket is already closed pass def transfer_custom(self, con, bucket_call_type, call_id, device_id, new_device_id, **kwargs): self.answer(con, call_id, device_id) self.build_call_info(CallType.INCOMING_AGENT_CALL.code, call_id, device_id, new_device_id, destination=None, bucket_type=bucket_call_type, **kwargs) self.server.agent.acd_service.transfer_to_agent(call_id, device_id) def bridge_ai(self, con, bucket_call_type, call_id, device_id, new_device_id, **kwargs): destination = self.server.agent.register(**kwargs) self.server.logger.info("call_id=%s, device_id=%s, destination=%s, new_device_id=%s" % (call_id, device_id, destination, new_device_id)) self.build_call_info(CallType.INCOMING_BOT_CALL.code, call_id, device_id, new_device_id, str(destination), bucket_type=bucket_call_type, **kwargs) self.server.cache.add_device_user_part(device_id, destination) con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s" % (call_id, new_device_id, new_device_id, destination), device_id) return destination def in_whitelist(self, caller_number, whitelist): for x in whitelist: phone, _type = x if caller_number in phone or phone in caller_number: return WhiteTypeEnum.get_by_code(_type) return None def answer(self, con, call_id, device_id, timeouts=30): con.execute("answer", "", device_id) # con.execute("bgapi", f"uuid_setvar {device_id} {sipHeaderCallId} {call_id}", device_id) con.execute("playback", HOLD_MUSIC_PATH, device_id) delay_action = DelayAction(call_id=call_id) self.server.cache.add_delay_message(DelayActionEnum.ACD_TIMEOUT_PLAY.name, delay_action, timeouts) def build_call_info(self, call_type, call_id, device_id, new_device_id, destination, bucket_type, **kwargs): caller = kwargs.get('Channel-Caller-ID-Number') called = destination now = datetime.now().timestamp() call_info = CallInfo(call_id=call_id, agent_key=destination, caller=caller, called=called, direction=Direction.INBOUND.code, call_type=call_type, call_time=now, uuid1=call_id, uuid2=device_id, saas_id=SAAS_ID, bucket_type=bucket_type, core_uuid=None, cti_flow_id=None, conference=None, group_id=None, hidden_customer=0, caller_display=None, called_display=None, number_location=None, agent_name=None, login_type=None, ivr_id=None, task_id=None, media_host=None, sip_server=None, client_host=None, record=None, record2=None, record_time=None, answer_flag=None, wait_time=None, answer_count=0, hangup_dir=None, sdk_hangup=0, hangup_code=None, answer_time=None, end_time=None, talk_time=None, first_queue_time=None, queue_start_time=None, queue_end_time=None, overflow_count=0, cdr_notify_url=None, queue_level=None, transfer_agent=None,device_list=[], device_info_map = {}, follow_data = {}, process_data = {}, next_commands=[], call_details=[]) device_custom = DeviceInfo(device_id=device_id, call_time=now, call_id=call_id, device_type=DeviceType.CUSTOMER.code, agent_key=destination, cdr_type=CdrType.OUTBOUND.code, cti_flow_id=None, conference=None, agent_name=None, from_agent=None, caller=None, called=None, display=None, called_location=None, caller_location=None, ring_start_time=None, ring_end_time=None, answer_time=None, bridge_time=None, end_time=None, talk_time=None, sip_protocol=None, channel_name=None, hangup_cause=None, ring_cause=None, sip_status=None, record=None, record_time=None, record_start_time=None, state=None, apparent_number=None, caller_display=None) call_info.device_list.append(device_id) # call_info.next_commands.append(NextCommand(device_id, NextType.NEXT_CALL_BRIDGE.code, new_device_id)) call_info.device_info_map = {device_id: device_custom} if CallType.INCOMING_BOT_CALL.code == call_type: device_bot = DeviceInfo(device_id=new_device_id, call_time=now, call_id=call_id, device_type=DeviceType.ROBOT.code, agent_key=destination, cdr_type=CdrType.INBOUND.code, cti_flow_id=None, conference=None, agent_name=None, from_agent=None, caller=None, called=None, display=None, called_location=None, caller_location=None, ring_start_time=None, ring_end_time=None, answer_time=None, bridge_time=None, end_time=None, talk_time=None, sip_protocol=None, channel_name=None, hangup_cause=None, ring_cause=None, sip_status=None, record=None, record_time=None, record_start_time=None, state=None, apparent_number=None, caller_display=None) call_info.device_list.append(new_device_id) call_info.device_info_map[new_device_id] = device_bot self.server.cache.add_call_info(call_info) return call_info class CustomTCPServer(socketserver.TCPServer): def __init__(self, server_address, RequestHandlerClass, agent, app): super().__init__(server_address, RequestHandlerClass) self.agent = agent self.app = app self.cache = Cache(app) self.logger = app.logger self.whitelist = [] self.buckets = [] # self.update_whitelist() # 初始化加载白名单 # self.update_bucket() #初始化分流 self.dataHandleServer = DataHandleServer(app) self.cache_job_scheduler = BackgroundScheduler() self.cache_job_scheduler.add_job(self.update_cache_job, run_date=datetime.now()) self.cache_job_scheduler.add_job(self.update_cache_job, 'interval', seconds=60, max_instances=1, name='cache_job_daemon') self.cache_job_scheduler.add_job(self.update_whitelist, run_date=datetime.now()) self.cache_job_scheduler.add_job(self.update_whitelist, 'interval', seconds=600, max_instances=1,name='cache_job_whiteList') self.cache_job_scheduler.start() def update_cache_job(self): # self.logger.info("begin flush cache job") # self.update_whitelist() self.update_bucket() # self.logger.info("end flush cache job") def update_whitelist(self): with self.app.app_context(): phones = Whitelist.query.filter_by(del_flag=0).all() self.whitelist = [(phone.phone, phone.type) for phone in phones] self.logger.info("Whitelist updated: %s", self.whitelist) def get_whitelist(self): return self.whitelist def load_agent_monitor(self): with self.app.app_context(): agents = AgentMonitor.query.filter_by(check_state=0, service_state=2).all() agent_nums = [agent.agent_num for agent in agents] return agent_nums def get_bucket(self, custom_uuid=None): custom_uuid == custom_uuid if custom_uuid else str(uuid.uuid4()) random_id = abs(mmh3.hash(custom_uuid)) if len(self.buckets) <= 0: raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR) for bucket in self.buckets: num = (random_id % 100 + 100) %100 if bucket.lower <= num < bucket.upper: return bucket return self.buckets[0] def update_bucket(self): with self.app.app_context(): buckets = Bucket.query.filter_by(eid='001').all() self.buckets = buckets self.logger.info("bucket updated: %s", self.buckets) def start(self, HOST='0.0.0.0', PORT=8084, agent=None): # HOST, PORT = "0.0.0.0", 8084 # 创建一个 TCP 服务器 with self.CustomTCPServer((HOST, PORT), self.ESLRequestHandler, agent, self.app) as server: print(f"ESL server listening on {HOST}:{PORT}") server.serve_forever()