123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640 |
- #!/usr/bin/env python3
- # encoding:utf-8
- import json
- import random
- import socketserver
- import sys
- import ESL
- import time
- import mmh3
- import threading
- import traceback
- import concurrent.futures
- from apscheduler.schedulers.background import BackgroundScheduler
- from src.core.callcenter.cache import Cache
- from src.core.callcenter.api import MakeCallContext, DelayAction, CallInfo, DeviceInfo, NextCommand
- from src.core.callcenter.callback import Callback
- from src.core.callcenter.constant import SK, EMPTY, CTI_ENGINE_DELAY_ACTION_LOCK, HOLD_MUSIC_PATH, saasId
- from src.core.callcenter.esl.constant.esl_constant import BRIDGE_VARIABLES, BRIDGE, HANGUP, NORMAL_CLEARING, SIP_HEADER, \
- SPACE, SPLIT, SOFIA, \
- ORIGINATE, PARK, SET, EAVESDROP, SMF_ALEG, EXECUTE, PLAYBACK, PAUSE, TRANSFER, UUID_TRANSFER, UUID_BROADCAST, \
- UUID_BREAK, UUID_HOLD, \
- UUID_RECORD, UUID_SETVAR, UUID_GETVAR, UUID_KILL
- import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
- import src.core.callcenter.esl.handler as event_handler
- from src.core.callcenter.esl.constant.sip_header_constant import sipHeaderHoldMusic, profile1, profile2, sipHeaderCallId
- from src.core.callcenter.enumeration import CallCause, DeviceType, DelayActionEnum, HangupDir, CallType, NextType, \
- Direction, CdrType
- from src.core.callcenter.esl.handler.default_esl_event_handler import DefaultEslEventHandler
- from src.core.callcenter.snowflake import Snowflake
- from src.core.datasource import SERVE_HOST
- from src.core.voip.constant import *
- from src.core.callcenter.data_handler import *
class InboundClient:
    """Inbound ESL client for FreeSWITCH.

    Opens a long-lived event connection to the FreeSWITCH event socket,
    dispatches every received event to the handlers discovered under the
    ``event_handler`` package, and exposes helper methods that send call
    control commands (originate, bridge, hangup, playback, ...).

    A background scheduler polls the cache once per second for delayed
    actions (call timeout, play timeout, ACD timeout) and runs them on a
    small thread pool.
    """

    def __init__(self, agent, app):
        """Build the client and immediately start its background threads.

        Args:
            agent: bot agent object handed to every event handler.
            app: application object; must expose ``logger`` and is passed to
                ``Cache``, ``Callback`` and ``DataHandleServer``.
        """
        self.con = None
        self.thread_num = 12
        self.is_stopping = False
        self.app = app
        self.logger = app.logger
        self.bot_agent = agent
        self.cache = Cache(app)
        self.callback = Callback(app)
        self.dataHandleServer = DataHandleServer(app)
        self.handler_table = self.scan_esl_event_handlers()
        self.default_event_handler = DefaultEslEventHandler(self, self.bot_agent)
        # NOTE(review): the event-socket password is hard-coded here; consider
        # moving it to configuration.
        self.host, self.port, self.password = SERVE_HOST, '8021', '4918257983818884358'
        # One single-threaded executor per slot, see choose_thread_pool_executor().
        self.executors = {x: concurrent.futures.ThreadPoolExecutor(max_workers=1) for x in range(self.thread_num)}
        self.delay_action_executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
        self.delay_action_scheduler = BackgroundScheduler()
        self.delay_action_scheduler.add_job(self.submit_delay_action, 'interval', seconds=1, max_instances=1)
        self.delay_action_scheduler.start()
        threading.Thread(target=self.start, args=()).start()

    @staticmethod
    def _type_code(device_type):
        """Return the code of a device type given as an enum member or a raw code."""
        return getattr(device_type, 'code', device_type)

    def submit_delay_action(self):
        """Fetch due delayed-action messages for every action type and queue them."""
        for name in DelayActionEnum.__members__:
            for action_message in self.cache.get_delay_message(name):
                self.delay_action_executor.submit(self.do_delay_action, name, action_message)

    def scan_esl_event_handlers(self):
        """Discover and instantiate event handlers in the ``event_handler`` package.

        Every class that carries an ``_esl_event_name`` attribute is treated as
        a handler for that event name.

        Returns:
            dict mapping event name to a list of handler instances.
        """
        import inspect
        import importlib
        import pkgutil

        handlers = {}
        # inspect.getmembers() also returns classes a module merely imported,
        # so remember what was registered to avoid duplicate handler instances.
        seen = set()
        # Walk the modules of the handler package.
        for module_info in pkgutil.iter_modules(event_handler.__path__):
            module = importlib.import_module(f"{event_handler.__name__}.{module_info.name}")
            for _name, _cls in inspect.getmembers(module, inspect.isclass):
                if not hasattr(_cls, '_esl_event_name') or _cls in seen:
                    continue
                seen.add(_cls)
                handlers.setdefault(_cls._esl_event_name, []).append(_cls(self, self.bot_agent))
        return handlers

    def _connect(self):
        """Open the event connection and subscribe to all events.

        Returns:
            True when the connection is established, False otherwise.
        """
        self.logger.info('inbound.start')
        self.con = ESL.ESLconnection(self.host, self.port, self.password)
        if not self.con.connected():
            self.logger.warning('inbound esl connect failed: %s:%s', self.host, self.port)
            return False
        self.logger.info('inbound esl connected ... ')
        # Alternative narrower subscription: CHANNEL_ORIGINATE,CHANNEL_PROGRESS,
        # CHANNEL_PROGRESS_MEDIA,CHANNEL_ANSWER,CHANNEL_HANGUP,CUSTOM,
        # PLAYBACK_START,PLAYBACK_STOP,DETECTED_TONE
        self.con.events('plain', 'all')
        return True

    def start(self):
        """Run the event loop until ``stop()`` is called.

        Each event is processed on its own thread. When FreeSWITCH reports
        ``SERVER_DISCONNECTED`` the connection is dropped and re-opened after
        three seconds. Reconnection is iterative (not recursive) so repeated
        disconnects cannot exhaust the call stack. If the very first
        connection attempt fails the method returns.
        """
        if not self._connect():
            return
        while not self.is_stopping:
            e = self.con.recvEvent()
            if e is None:
                continue
            event_name = e.getHeader("Event-Name")
            if event_name == "SERVER_DISCONNECTED":
                self.logger.info('come in SERVER_DISCONNECTED case')
                self.con.disconnect()
                if self.is_stopping:
                    # stop() closed the socket on purpose; do not reconnect.
                    break
                time.sleep(3)
                self._connect()
            else:
                threading.Thread(target=self.process_esl_event, args=(e,)).start()
                # self.choose_thread_pool_executor(e).submit(self.process_esl_event, e)

    def choose_thread_pool_executor(self, e):
        """Pick the executor for an event, keyed by call id (or channel unique id).

        Events with the same key hash to the same single-threaded executor;
        events without any key get a random executor.
        """
        routing_key = EslEventUtil.getCallId(e) or EslEventUtil.getUniqueId(e)
        if routing_key:
            index = abs(mmh3.hash(routing_key)) % len(self.executors)
        else:
            index = random.randint(0, len(self.executors) - 1) if self.executors else 0
        return self.executors.get(index)

    def process_esl_event(self, e):
        """Forward one event to the callback and to its registered handlers.

        Events without a registered handler go to the default handler. A
        failure in one handler is printed and does not stop the others.
        """
        event_name = EslEventUtil.getEventName(e)
        coreUUID = EslEventUtil.getCoreUuid(e)
        address = self.host + ':' + self.port
        try:
            self.callback.callback_event(e)
            handlers = self.handler_table.get(event_name)
            if handlers is None:
                self.default_event_handler.handle(address, e, coreUUID)
                return
            for handler in handlers:
                try:
                    handler.handle(address, e, coreUUID)
                except Exception:
                    traceback.print_exc()
        except Exception:
            traceback.print_exc()

    def do_delay_action(self, action, message):
        """Execute one delayed action after taking its de-duplication lock.

        Args:
            action: action identifier as passed by ``submit_delay_action``.
            message: JSON payload understood by ``DelayAction.from_json``.
        """
        delay_action = DelayAction.from_json(message)
        flag = self.cache.lock_delay_action(delay_action.uuid)
        self.logger.info("do_delay_action::action=%s, flag=%s", action, flag)
        if not flag:
            self.logger.info("异步延迟执行操作重复 action:%s msg:%s", action, message)
            return
        delay_action_enum = DelayActionEnum.get_by_code(action)
        if not delay_action_enum:
            self.logger.info("异步延迟执行 delayActionEnum为空 action:%s msg:%s", action, message)
            return
        if DelayActionEnum.CALL_TIMEOUT_HANGUP == delay_action_enum:
            self.exec_when_call_timeout(delay_action.call_id, delay_action.device_id)
        elif DelayActionEnum.PLAY_TIMEOUT_HANGUP == delay_action_enum:
            self.exec_when_play_timeout(delay_action.call_id)
        elif DelayActionEnum.ACD_TIMEOUT_PLAY == delay_action_enum:
            self.exec_when_acd_timeout(delay_action.call_id)

    def exec_when_call_timeout(self, call_id, device_id):
        """Hang up a device that has not answered when its call timeout fires."""
        call_info = self.cache.get_call_info(call_id)
        if not call_info or device_id not in call_info.device_list:
            return
        device_info = call_info.device_info_map.get(device_id)
        if not device_info or device_info.answer_time is not None:
            return
        self.logger.info("call:%s deviceId:%s execWhenCallTimeOut", call_id, device_id)
        device_info.hangup_cause = CallCause.CALL_TIMEOUT.name
        call_info.next_commands = []
        # device_type was compared both as a raw code and via ``.code`` in the
        # original; normalise once so either representation works.
        type_code = self._type_code(device_info.device_type)
        if type_code <= DeviceType.ROBOT.code:
            call_info.hangup_dir = HangupDir.PLATFORM_HANGUP.code
            call_info.hangup_code = CallCause.CALL_TIMEOUT.code
        if not device_info.end_time and type_code == DeviceType.CUSTOMER.code:
            channel = self.show_channel(device_id)
            if channel:
                delay_action = DelayAction(call_id=call_id, device_id=device_id)
                # NOTE(review): the enum member is passed here while other
                # callers pass ``.code`` - confirm which one the cache expects.
                self.cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_DECR, delay_action, timeouts=20)
        self.cache.add_call_info(call_info)
        self.hangup_call(call_id, device_id, CallCause.CALL_TIMEOUT)
        self.dataHandleServer.update_record(call_id, {"status": 0})

    def exec_when_play_timeout(self, call_id):
        """Hang up every device of a call whose pending commands include a hangup."""
        call_info = self.cache.get_call_info(call_id)
        if not call_info or not call_info.next_commands:
            return
        self.logger.debug("播放结束音乐失败,进行挂机 callId:%s", call_id)
        next_types = [x.next_type for x in call_info.next_commands]
        if NextType.NEXT_HANGUP.code in next_types:
            for device_id in call_info.device_list:
                self.hangup_call(call_id, device_id, CallCause.PLAY_TIMEOUT)

    def exec_when_acd_timeout(self, call_id):
        """On ACD timeout, un-bridge the single customer leg and play hold music."""
        call_info = self.cache.get_call_info(call_id)
        if not call_info:
            # logging uses %-style placeholders; "{}" would raise a format error.
            self.logger.info("exec_when_acd_timeout callInfo为空 callId: %s", call_id)
            return
        customers = [
            device for device in call_info.device_info_map.values()
            if self._type_code(device.device_type) == DeviceType.CUSTOMER.code
        ]
        if len(customers) != 1:
            return
        device_id = customers[0].device_id
        self.bridge_break(call_id, device_id)
        self.hold_play(device_id, HOLD_MUSIC_PATH)
        self.play_timeout(call_id, timeout=30)
        self.cache.add_call_info(call_info)
        self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id,
                         device_id, HOLD_MUSIC_PATH)

    def make_call_new(self, context: MakeCallContext):
        """Originate a call described by *context* and park it."""
        called = context.get_called()
        params = {'gateway': context.route_gateway_name, 'called': called, 'realm': context.get_realm()}
        builder = ['{', context.get_sip_header(), '}']
        if context.device_type == DeviceType.CUSTOMER.code:
            profile = self.expression(profile1, params)
            builder.append(f"{SOFIA}{SK}{profile}{SK}{called}{PARK}")
        else:
            profile = self.expression(profile2, params)
            builder.append(f"{profile}{PARK}")
        cmd = "".join(builder)
        self.logger.info(cmd)
        self.con.bgapi(ORIGINATE, cmd)

    def call_timeout(self, call_id, device_id, timeout):
        """Schedule a hangup of *device_id* if the call is not answered within *timeout*."""
        delay_action = DelayAction(call_id=call_id, device_id=device_id)
        self.cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout)

    def send_args(self, device_id, name, arg, con=None):
        """Execute dialplan application *name* with *arg* on a channel.

        Args:
            device_id: channel UUID.
            name: application name.
            arg: application argument string.
            con: connection to use; defaults to the shared event connection.
        """
        msg = ESL.ESLevent("sendmsg", device_id)
        msg.addHeader("call-command", EXECUTE)
        msg.addHeader("execute-app-name", name)
        msg.addHeader("execute-app-arg", arg)
        (con if con else self.con).sendEvent(msg)

    def bridge_call(self, call_id, device_id1, device_id2):
        """Bridge two channels after applying the bridge variables to both."""
        self.multi_set_var(device_id1, BRIDGE_VARIABLES)
        self.multi_set_var(device_id2, BRIDGE_VARIABLES)
        self.bgapi(BRIDGE, arg=device_id1 + SPACE + device_id2, desc="bridge_call")

    def transfer_call(self, _from, _to):
        """Transfer: park both legs of *_from* so they can be re-bridged.

        The original sent the channel UUID as the command name; the TRANSFER
        command is now supplied, matching ``insert`` and ``bridge_break``.
        ``_to`` is accepted for interface compatibility and is not used.
        """
        arg = ''.join([
            _from,
            " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline "
        ])
        self.bgapi(TRANSFER, arg, desc="transfer_call")

    def answer(self, device_id):
        """Answer: send a ``talk`` phone event to the channel."""
        self.con.bgapi('uuid_phone_event', device_id + ' talk')

    def hangup_call(self, call_id, device_id, case_enum=CallCause.DEFAULT):
        """Hang up a channel, first recording the hangup cause as a channel variable.

        Uses a dedicated short-lived connection, which is always closed.
        """
        msg = ESL.ESLevent("sendmsg", device_id)
        msg.addHeader("call-command", EXECUTE)
        msg.addHeader("execute-app-name", HANGUP)
        msg.addHeader("execute-app-arg", NORMAL_CLEARING)
        _con = None
        try:
            _con = ESL.ESLconnection(self.host, self.port, self.password)
            if _con.connected():
                arg = ''.join([EslEventUtil.SIP_H_P_LIBRA_HANGUP_CAUSE, "=", case_enum.description])
                self.send_args(device_id, SET, arg, con=_con)
                e = _con.sendMSG(msg)
                self.logger.info("hangup_call挂机 hangup call: %s, device: %s, arg: %s, res:%s" % (call_id, device_id, arg, e))
        except Exception:
            traceback.print_exc()
        finally:
            if _con:
                _con.disconnect()

    def kill_call(self, call_id, device_id, case_enum=CallCause.DEFAULT):
        """Kill a channel outright; does nothing when *device_id* is empty."""
        if not device_id:
            return
        self.logger.info("kill_call:%s, device:%s, case_enum:%s", call_id, device_id, case_enum)
        self.api(UUID_KILL, device_id, desc="kill_call")

    def broadcast(self, uuid, path, smf):
        """Send the broadcast command for *path* on channel *uuid* with leg flag *smf*."""
        self.con.bgapi(' '.join([UUID_BROADCAST, uuid, path, smf]), EMPTY)

    def break0(self, uuid, all=False, sync=True):
        """Send the break command for channel *uuid*.

        Args:
            uuid: channel UUID.
            all: append the ``all`` flag to the command.
            sync: use the blocking ``api`` call instead of ``bgapi``.
        """
        parts = [UUID_BREAK, uuid]
        if all:
            parts.append("all")
        command = ' '.join(parts)
        if sync:
            self.con.api(command, EMPTY)
        else:
            self.con.bgapi(command, EMPTY)

    def hold(self, smf, uuid, display):
        """Send the hold command for channel *uuid*.

        NOTE(review): *display* is joined into the command as-is and then
        "all" is appended when it is truthy; confirm the intended syntax.
        """
        parts = [UUID_HOLD, smf, uuid, display]
        parts.append("all" if display else EMPTY)
        self.con.bgapi(' '.join(parts), EMPTY)

    def get_var(self, uuid, var):
        """Issue a background getvar command; the result is not returned."""
        self.con.bgapi(' '.join([UUID_GETVAR, uuid, var]), EMPTY)

    def set_var(self, uuid, var, val):
        """Set channel variable *var* to *val* on channel *uuid*."""
        self.bgapi(' '.join([UUID_SETVAR, uuid, var, val]), desc ="set_var")

    def multi_set_var(self, uuid, params):
        """Set several channel variables at once.

        Args:
            uuid: channel UUID.
            params: mapping of variable name to value; values are converted
                with ``str()`` so non-string values no longer raise.
        """
        assignments = ';'.join(f"{k}={v}" for k, v in params.items())
        command = "uuid_setvar_multi " + uuid + " " + assignments
        self.bgapi(command, desc="multi_set_var")

    def record(self, uuid, action, path, limit):
        """Send the record command (*action*, *path*, *limit*) for channel *uuid*."""
        self.bgapi(' '.join([UUID_RECORD, uuid, action, path, str(limit)]), desc="record")

    def transfer(self, uuid, smf, dest, dialplan, context):
        """Send the uuid-transfer command for channel *uuid*."""
        self.con.bgapi(' '.join([UUID_TRANSFER, uuid, smf, dest, dialplan, context]), EMPTY)

    def insert(self, device_id):
        """Barge-in: park both legs of the channel's bridge."""
        arg = ''.join([
            device_id,
            " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline "
        ])
        self.con.api(TRANSFER, arg)

    def bridge_break(self, call_id, device_id):
        """Tear down the bridge: park both legs and tag them with the hold-music header."""
        arg = ''.join([
            device_id,
            f" -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,set:{SIP_HEADER}{sipHeaderHoldMusic}={call_id},park:' inline "
        ])
        self.api(TRANSFER, arg, "bridge_break")

    def play_file(self, call_id, device_id, file, sync):
        """Play *file* on a channel.

        When *sync* is true the file is played through ``hold_play``;
        otherwise an asynchronous playback message is sent.
        """
        if sync:
            return self.hold_play(device_id, file)
        msg = ESL.ESLevent("sendmsg", device_id)
        msg.addHeader("call-command", EXECUTE)
        msg.addHeader("execute-app-name", PLAYBACK)
        msg.addHeader("execute-app-arg", file)
        msg.addHeader("async", "true")
        self.con.sendEvent(msg)

    def stop_play(self, device_id):
        """Stop playing music on the channel."""
        self.con.api(PAUSE, device_id + " on")

    def hold_play(self, device_id, play):
        """Broadcast a TTS/music file to the a-leg (played indefinitely)."""
        arg = ''.join([device_id, " playback::", play, " ", SMF_ALEG])
        self.api(UUID_BROADCAST, arg, "hold_play")

    def _one_shot(self, method, command, arg, desc):
        """Run ``api``/``bgapi`` on a dedicated connection that is always closed."""
        _con = None
        try:
            _con = ESL.ESLconnection(self.host, self.port, self.password)
            if _con.connected():
                e = getattr(_con, method)(command, arg)
                self.logger.info('%s success, arg=%s, event=%s' % (desc, command + ' ' + arg, e.serialize('json')))
        except Exception:
            traceback.print_exc()
        finally:
            if _con:
                _con.disconnect()

    def api(self, command, arg=EMPTY, desc=""):
        """Send a blocking API command over a fresh connection and log the reply."""
        self._one_shot('api', command, arg, desc)

    def bgapi(self, command, arg=EMPTY, desc=""):
        """Send a background API command over a fresh connection and log the reply."""
        self._one_shot('bgapi', command, arg, desc)

    def play_timeout(self, call_id, timeout):
        """Schedule a hangup in case playback does not finish within *timeout*."""
        delay_action = DelayAction(call_id=call_id)
        self.cache.add_delay_message(DelayActionEnum.PLAY_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout)

    def listen(self, device_id1, device_id2, aleg=True, bleg=True):
        """Monitor: let *device_id1* eavesdrop on *device_id2*."""
        if aleg:
            self.send_args(device_id1, SET, "eavesdrop_bridge_aleg=true")
        if bleg:
            self.send_args(device_id1, SET, "eavesdrop_bridge_bleg=true")
        self.send_args(device_id1, EAVESDROP, device_id2)

    def show_channel(self, device_id):
        """Query channels matching *device_id* and return the raw reply."""
        msg = self.con.api("show", " channels like " + device_id + " as json")
        # A %s placeholder is required, otherwise logging fails to format the record.
        self.logger.info('show_channel::%s', msg)
        return msg

    def expression(self, template, params):
        """Replace every ``#{[key]}`` placeholder in *template* with its value."""
        for key, value in params.items():
            template = template.replace("#{["+key+"]}", str(value))
        return template

    def stop(self):
        """Stop the event loop and release background resources.

        The stop flag is raised before the socket is closed so the event loop
        does not try to reconnect; the delayed-action scheduler and executor
        are shut down as well.
        """
        self.is_stopping = True
        if self.delay_action_scheduler.running:
            self.delay_action_scheduler.shutdown(wait=False)
        self.delay_action_executor.shutdown(wait=False)
        for executor in self.executors.values():
            executor.shutdown()
        if self.con:
            self.con.disconnect()
class OutboundClient:
    """Outbound ESL server: FreeSWITCH connects here for each incoming call.

    Callers on the whitelist are bridged to a randomly chosen human agent;
    everyone else is bridged to a robot user registered through the bot
    agent. The whitelist is loaded at start-up and refreshed hourly.
    """

    def __init__(self, agent, app):
        """Load the whitelist and start the refresh and server threads.

        Args:
            agent: bot agent used to register a robot for a call.
            app: application object exposing ``logger`` and ``app_context()``.
        """
        self.app = app
        self.logger = app.logger
        self.whitelist = []
        self.update_whitelist()  # initial whitelist load
        # Periodically refresh the whitelist.
        threading.Thread(target=self.refresh_whitelist, daemon=True).start()
        self.dataHandleServer = DataHandleServer(app)
        server_thread = threading.Thread(target=self.start, args=('0.0.0.0', 8084, agent))
        server_thread.daemon = True  # daemon thread
        server_thread.start()

    def update_whitelist(self):
        """Reload the non-deleted whitelist phone numbers from the database."""
        with self.app.app_context():
            phones = Whitelist.query.filter_by(del_flag=0).all()
            self.whitelist = [phone.phone for phone in phones]
            self.logger.info("Whitelist updated: %s", self.whitelist)

    def refresh_whitelist(self):
        """Refresh the whitelist once per hour, forever.

        A failed refresh is logged and retried on the next cycle instead of
        terminating the refresh thread.
        """
        while True:
            time.sleep(3600)  # refresh every hour
            try:
                self.update_whitelist()
            except Exception:
                self.logger.exception("Whitelist refresh failed")

    def load_whitelist(self):
        """Return the currently cached whitelist."""
        return self.whitelist

    def load_agent_monitor(self):
        """Return agent numbers with ``check_state=0`` and ``service_state=2``."""
        with self.app.app_context():
            agents = AgentMonitor.query.filter_by(check_state=0,service_state=2).all()
            return [agent.agent_num for agent in agents]

    class ESLRequestHandler(socketserver.BaseRequestHandler):
        """Handles one outbound-socket connection, i.e. one incoming call."""

        def setup(self):
            """Route the incoming call and bridge it to its destination."""
            try:
                self.server.logger.info('%s connected!', self.client_address)
                fd = self.request.fileno()
                con = ESL.ESLconnection(fd)
                self.server.logger.info('Connected: %s', con.connected())
                if not con.connected():
                    self.server.logger.info("Failed to connect to FreeSWITCH")
                    return

                info = con.getInfo()
                event_name = info.getHeader("Event-Name")
                self.server.logger.info('Event-Name: %s', event_name)
                device_id = info.getHeader("unique-id")
                caller_number = info.getHeader("Caller-Caller-ID-Number")  # caller's number
                whitelist = self.server.load_whitelist()

                # One generator for both ids instead of two throwaway instances.
                snowflake = Snowflake()
                call_id = 'C' + str(snowflake.next_id())
                new_device_id = 'D' + str(snowflake.next_id())

                kwargs = json.loads(info.serialize('json'))
                kwargs['variable_sip_h_P-LIBRA-CallId'] = call_id
                kwargs['variable_sip_h_P-LIBRA-DeviceId'] = new_device_id
                # NOTE(review): utcnow() is naive; confirm the record layer expects UTC.
                call_info = {
                    "session_id": call_id,
                    "time_begin": datetime.utcnow(),
                    "category": 0,
                    "phone": caller_number
                }

                # Check the whitelist.
                agents = self.server.load_agent_monitor() if caller_number in whitelist else []
                if agents:
                    destination = random.choice(agents)  # pick a random agent number
                    # Bridge straight to a human agent.
                    self.server.logger.info( "Caller %s is in whitelist, agents: %s, destination: %s"% (caller_number, agents, destination))
                    call_info['type']= 0
                    call_info['agent_num'] = destination
                else:
                    if caller_number in whitelist:
                        # random.choice([]) used to raise here and drop the call;
                        # fall back to the robot instead.
                        self.server.logger.warning("Caller %s is in whitelist but no agent is available", caller_number)
                    # Route to the AI robot.
                    destination = self.server.agent.register(**kwargs)
                    self.server.logger.info("device_id=%s, destination=%s, new_device_id=%s"% (device_id, destination, new_device_id))
                    call_info['type'] = 1
                    call_info['service_category'] = 1
                    call_info['user_id'] = destination
                    call_info['user_name'] = f"机器人{destination}"

                # NOTE(review): nesting of the next two calls was reconstructed
                # from a source without indentation; confirm they apply to both
                # routes and not only to the robot route.
                self.build_call_info(call_id, device_id, new_device_id, str(destination), **kwargs)
                self.server.cache.add_device_user_part(device_id, destination)
                con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s"%(call_id, new_device_id, new_device_id, destination), device_id)
                self.server.dataHandleServer.create_record(call_info)
            except Exception:
                traceback.print_exc()

        def build_call_info(self, call_id, device_id, new_device_id, destination, **kwargs):
            """Create the CallInfo with its two DeviceInfo legs and cache it.

            Args:
                call_id: generated call id.
                device_id: channel UUID of the incoming (customer) leg.
                new_device_id: id assigned to the leg that will be originated.
                destination: user the call is bridged to; also used as agent key.
                **kwargs: serialized channel data; only
                    ``Channel-Caller-ID-Number`` is read.
            """
            caller = kwargs.get('Channel-Caller-ID-Number')
            called = destination
            # NOTE(review): .timestamp() on a naive utcnow() value is interpreted
            # as local time; left unchanged because other code may rely on the
            # same convention - confirm.
            now = datetime.utcnow().timestamp()
            call_info = CallInfo(
                call_id=call_id, agent_key=destination,
                caller=caller, called=called, direction=Direction.INBOUND.code,
                call_type=CallType.BOT_CALL.code, call_time=now,
                uuid1=call_id, uuid2=device_id, saas_id=saasId,
                core_uuid=None, cti_flow_id=None, conference=None, group_id=None,
                hidden_customer=0, caller_display=None, called_display=None,
                number_location=None, agent_name=None, login_type=None, ivr_id=None,
                task_id=None, media_host=None, sip_server=None, client_host=None,
                record=None, record2=None, record_time=None, answer_flag=None,
                wait_time=None, answer_count=0, hangup_dir=None, sdk_hangup=0,
                hangup_code=None, answer_time=None, end_time=None, talk_time=None,
                first_queue_time=None, queue_start_time=None, queue_end_time=None,
                overflow_count=0, cdr_notify_url=None, queue_level=None,
                transfer_agent=None, device_list=[], device_info_map={},
                follow_data={}, process_data={}, next_commands=[], call_details=[])
            device_custom = DeviceInfo(
                device_id=device_id, call_time=now,
                call_id=call_id, device_type=DeviceType.CUSTOMER.code,
                agent_key=destination, cdr_type=CdrType.OUTBOUND.code,
                cti_flow_id=None, conference=None, agent_name=None, from_agent=None,
                caller=None, called=None, display=None, called_location=None,
                caller_location=None, ring_start_time=None, ring_end_time=None,
                answer_time=None, bridge_time=None, end_time=None, talk_time=None,
                sip_protocol=None, channel_name=None, hangup_cause=None,
                ring_cause=None, sip_status=None, record=None, record_time=None,
                record_start_time=None, state=None, apparent_number=None,
                caller_display=None)
            device_bot = DeviceInfo(
                device_id=new_device_id, call_time=now,
                call_id=call_id, device_type=DeviceType.ROBOT.code,
                agent_key=destination, cdr_type=CdrType.INBOUND.code,
                cti_flow_id=None, conference=None, agent_name=None, from_agent=None,
                caller=None, called=None, display=None, called_location=None,
                caller_location=None, ring_start_time=None, ring_end_time=None,
                answer_time=None, bridge_time=None, end_time=None, talk_time=None,
                sip_protocol=None, channel_name=None, hangup_cause=None,
                ring_cause=None, sip_status=None, record=None, record_time=None,
                record_start_time=None, state=None, apparent_number=None,
                caller_display=None)

            call_info.device_list.append(device_id)
            self.server.logger.info("liuwei::debugger:3333::call_id=%s, device_id=%s" % (call_id, device_id))
            call_info.device_list.append(new_device_id)
            self.server.logger.info("liuwei::debugger:4444::call_id=%s, device_id=%s" % (call_id, new_device_id))
            call_info.device_info_map = {device_id: device_custom, new_device_id: device_bot}
            self.server.logger.info("lwoutBound, call_info=%s"%(call_info))
            self.server.cache.add_call_info(call_info)

    class CustomTCPServer(socketserver.TCPServer):
        """TCP server carrying the shared objects the request handler needs."""

        def __init__(self, server_address, RequestHandlerClass, agent, app,whitelist_loader,load_agent_monitor,dataHandleServer):
            """Store the collaborators, then bind and listen.

            The attributes are assigned before ``super().__init__`` so they
            exist by the time the first request is handled.
            """
            self.agent = agent
            self.cache = Cache(app)
            self.logger = app.logger
            self.load_whitelist = whitelist_loader
            self.load_agent_monitor = load_agent_monitor
            self.dataHandleServer = dataHandleServer
            super().__init__(server_address, RequestHandlerClass)

    def start(self, HOST='0.0.0.0', PORT=8084, agent=None):
        """Create the TCP server on ``HOST:PORT`` and serve until shut down."""
        with self.CustomTCPServer((HOST, PORT), self.ESLRequestHandler, agent, self.app, self.load_whitelist, self.load_agent_monitor,self.dataHandleServer) as server:
            print(f"ESL server listening on {HOST}:{PORT}")
            server.serve_forever()
|