client.py 32 KB


  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import json
  4. import random
  5. import socketserver
  6. import sys
  7. import ESL
  8. import time
  9. import mmh3
  10. import uuid
  11. import threading
  12. import traceback
  13. import concurrent.futures
  14. from apscheduler.schedulers.background import BackgroundScheduler
  15. from src.core.callcenter import BizException
  16. from src.core.callcenter.cache import Cache
  17. from src.core.callcenter.api import MakeCallContext, DelayAction, CallInfo, DeviceInfo, NextCommand
  18. from src.core.callcenter.callback import Callback
  19. from src.core.callcenter.constant import SK, EMPTY, CTI_ENGINE_DELAY_ACTION_LOCK, HOLD_MUSIC_PATH, WaitingHangupMusicPath, saasId
  20. from src.core.callcenter.esl.constant.esl_constant import BRIDGE_VARIABLES, BRIDGE, HANGUP, NORMAL_CLEARING, SIP_HEADER, \
  21. SPACE, SPLIT, SOFIA, \
  22. ORIGINATE, PARK, SET, EAVESDROP, SMF_ALEG, EXECUTE, PLAYBACK, PAUSE, TRANSFER, UUID_TRANSFER, UUID_BROADCAST, \
  23. UUID_BREAK, UUID_HOLD, \
  24. UUID_RECORD, UUID_SETVAR, UUID_GETVAR, UUID_KILL
  25. import src.core.registry as Registry
  26. import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
  27. import src.core.callcenter.esl.handler as event_handler
  28. from src.core.callcenter.esl.constant.sip_header_constant import sipHeaderHoldMusic, profile1, profile2, sipHeaderCallId
  29. from src.core.callcenter.enumeration import CallCause, DeviceType, DelayActionEnum, HangupDir, CallType, NextType, \
  30. Direction, CdrType, BizErrorCode
  31. from src.core.callcenter.esl.handler.default_esl_event_handler import DefaultEslEventHandler
  32. from src.core.callcenter.snowflake import Snowflake
  33. from src.core.datasource import SERVE_HOST
  34. from src.core.voip.constant import *
  35. from src.core.callcenter.data_handler import *
  36. class InboundClient:
  def __init__(self, agent, app):
      """Build the inbound ESL client and start its background machinery.

      Creates the cache/callback/data helpers, discovers the ESL event
      handlers, prepares the worker pools, starts the once-per-second
      delay-action poller and finally launches ``start`` on a new thread.

      :param agent: bot agent passed to every ESL event handler.
      :param app: application object; provides ``logger`` and is handed to
          ``Cache``, ``Callback`` and ``DataHandleServer``.
      """
      # Connection state; ``start`` assigns the live connection to ``con``.
      self.con = None
      self.is_stopping = False
      self.host, self.port, self.password = SERVE_HOST, '8021', '4918257983818884358'
      self.thread_num = 12

      # Application-level collaborators.
      self.app = app
      self.logger = app.logger
      self.bot_agent = agent
      self.cache = Cache(app)
      self.callback = Callback(app)
      self.dataHandleServer = DataHandleServer(app)

      # Event dispatch table plus the fallback for events without a handler.
      self.handler_table = self.scan_esl_event_handlers()
      self.default_event_handler = DefaultEslEventHandler(self, self.bot_agent)

      # One single-threaded executor per slot; see choose_thread_pool_executor.
      self.executors = {
          slot: concurrent.futures.ThreadPoolExecutor(max_workers=1)
          for slot in range(self.thread_num)
      }

      # Delay actions are polled every second and executed on a small pool.
      self.delay_action_executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
      self.delay_action_scheduler = BackgroundScheduler()
      self.delay_action_scheduler.add_job(self.submit_delay_action, 'interval', seconds=1, max_instances=1)
      self.delay_action_scheduler.start()

      receiver = threading.Thread(target=self.start, args=())
      receiver.start()
  def submit_delay_action(self):
      """Fetch pending delay-action messages and queue them for execution.

      Runs as the scheduler job registered in ``__init__``. For every
      ``DelayActionEnum`` member name the cache is asked for messages, and
      each message is submitted to ``delay_action_executor`` to be handled
      by ``do_delay_action``.
      """
      for name in DelayActionEnum.__members__:
          # Treat a missing/empty result as "nothing to do" so one empty
          # queue cannot abort the whole polling pass.
          for action_message in self.cache.get_delay_message(name) or ():
              self.delay_action_executor.submit(self.do_delay_action, name, action_message)
  def scan_esl_event_handlers(self):
      """Discover and instantiate the ESL event handlers.

      Imports every module of the ``event_handler`` package, collects the
      classes that expose an ``_esl_event_name`` attribute and instantiates
      each one with ``(self, self.bot_agent)``.

      :return: dict mapping an ESL event name to the list of handler
          instances registered for it.
      """
      import importlib
      import inspect
      import pkgutil

      package_name = event_handler.__name__
      discovered = []
      # Walk the modules of the handler package.
      for module_info in pkgutil.iter_modules(event_handler.__path__):
          module = importlib.import_module(f"{package_name}.{module_info.name}")
          discovered.extend(
              candidate
              for _, candidate in inspect.getmembers(module, inspect.isclass)
              if hasattr(candidate, '_esl_event_name')
          )

      table = {}
      for handler_cls in discovered:
          table.setdefault(handler_cls._esl_event_name, []).append(handler_cls(self, self.bot_agent))
      return table
  def start(self):
      """Run the inbound ESL receive loop until ``is_stopping`` is set.

      Connects to FreeSWITCH, subscribes to all events in plain format and
      hands every received event to ``process_esl_event`` on a new thread.
      When a ``SERVER_DISCONNECTED`` event arrives the connection is closed
      and, after a 3 second pause, re-established.

      Reconnection is iterative: the original implementation re-entered
      ``start`` recursively on every disconnect, growing the call stack
      without bound. A failed connection attempt is now retried after the
      same 3 second pause instead of silently ending the loop.
      """
      while not self.is_stopping:
          self.logger.info('inbound.start')
          self.con = ESL.ESLconnection(self.host, self.port, self.password)
          if not self.con.connected():
              self.logger.warning('inbound esl connect failed, retry in 3s')
              time.sleep(3)
              continue

          self.logger.info('inbound esl connected ... ')
          # Subscribes to every event. A narrower candidate list left by the
          # original author: CHANNEL_ORIGINATE, CHANNEL_PROGRESS,
          # CHANNEL_PROGRESS_MEDIA, CHANNEL_ANSWER, CHANNEL_HANGUP, CUSTOM,
          # PLAYBACK_START, PLAYBACK_STOP, DETECTED_TONE.
          self.con.events('plain', 'all')

          while not self.is_stopping:
              e = self.con.recvEvent()
              if e is None:
                  # Nothing usable was received; never dereference None.
                  continue
              if e.getHeader("Event-Name") == "SERVER_DISCONNECTED":
                  self.logger.info('come in SERVER_DISCONNECTED case')
                  self.con.disconnect()
                  time.sleep(3)
                  break  # back to the outer loop to reconnect
              threading.Thread(target=self.process_esl_event, args=(e,)).start()
  def choose_thread_pool_executor(self, e):
      """Select the executor that should process event ``e``.

      The call id (or, when absent, the channel unique id) is hashed with
      murmur3 so that all events sharing that id map to the same
      single-threaded executor. Events carrying neither id get a random
      executor.

      :param e: ESL event.
      :return: the chosen executor, or ``None`` when no executors exist.
      """
      pool_size = len(self.executors)
      if not pool_size:
          # Guard both branches; the original only protected the random one
          # and would have divided by zero in the hashing branch.
          return None

      routing_key = EslEventUtil.getCallId(e) or EslEventUtil.getUniqueId(e)
      if routing_key:
          index = abs(mmh3.hash(routing_key)) % pool_size
      else:
          index = random.randint(0, pool_size - 1)
      return self.executors.get(index)
  def process_esl_event(self, e):
      """Dispatch one ESL event to the callback and its handlers.

      ``callback.callback_event`` is invoked first. Then every handler
      registered for the event name is called; events without a registered
      handler go to ``default_event_handler``. A failing handler is logged
      and does not prevent the remaining handlers from running.

      :param e: ESL event received from FreeSWITCH.
      """
      event_name = EslEventUtil.getEventName(e)
      core_uuid = EslEventUtil.getCoreUuid(e)
      address = self.host + ':' + self.port
      try:
          self.callback.callback_event(e)
          handlers = self.handler_table.get(event_name)
          if handlers is None:
              self.default_event_handler.handle(address, e, core_uuid)
              return
          for handler in handlers:
              try:
                  handler.handle(address, e, core_uuid)
              except Exception:
                  # Exception (not a bare except) so that interpreter-exit
                  # signals are not swallowed; traceback goes to the logger.
                  self.logger.exception("process_esl_event handler %s failed, event_name=%s",
                                        type(handler).__name__, event_name)
      except Exception:
          self.logger.exception("process_esl_event failed, event_name=%s", event_name)
  def do_delay_action(self, action, message):
      """Execute one delayed action taken from the cache.

      The message is decoded into a ``DelayAction``; a lock keyed by its
      uuid ensures the action is executed only once. The action code is
      then resolved to a ``DelayActionEnum`` member and routed to the
      matching ``exec_when_*`` method.

      :param action: action code (the enum member name used when polling).
      :param message: JSON payload of the delay action.
      """
      delay_action = DelayAction.from_json(message)
      acquired = self.cache.lock_delay_action(delay_action.uuid)
      self.logger.info("do_delay_action::action=%s, flag=%s", action, acquired)
      if not acquired:
          # Someone else already took this action.
          self.logger.info("异步延迟执行操作重复 action:%s msg:%s", action, message)
          return

      kind = DelayActionEnum.get_by_code(action)
      if not kind:
          self.logger.info("异步延迟执行 delayActionEnum为空 action:%s msg:%s", action, message)
          return

      if DelayActionEnum.CALL_TIMEOUT_HANGUP == kind:
          self.exec_when_call_timeout(delay_action.call_id, delay_action.device_id)
          return
      if DelayActionEnum.PLAY_TIMEOUT_HANGUP == kind:
          self.exec_when_play_timeout(delay_action.call_id)
          return
      if DelayActionEnum.ACD_TIMEOUT_PLAY == kind:
          self.exec_when_acd_timeout(delay_action.call_id)
  def exec_when_call_timeout(self, call_id, device_id):
      """Hang up a leg that is still unanswered when its call timeout fires.

      Does nothing when the call is unknown, the device is not part of the
      call, or the device already has an ``answer_time``. Otherwise the
      timeout cause is recorded on the device and the call, pending next
      commands are dropped, the call info is saved, the leg is hung up and
      the call record is updated with ``status`` 0.

      :param call_id: id of the call the timeout belongs to.
      :param device_id: id of the leg that was being called.
      """
      # NOTE(review): the pasted source lost its indentation; the nesting
      # below is reconstructed - confirm against the repository copy.
      call_info = self.cache.get_call_info(call_id)
      if not call_info or device_id not in call_info.device_list:
          return
      device_info = call_info.device_info_map.get(device_id)
      if not device_info or device_info.answer_time is not None:
          return

      self.logger.info("call:%s deviceId:%s execWhenCallTimeOut", call_id, device_id)
      device_info.hangup_cause = CallCause.CALL_TIMEOUT.name
      call_info.next_commands = []

      # The original compared device_type once as a raw code (``<=``) and
      # once through ``.code``; accept either representation so neither
      # comparison can raise.
      device_type = getattr(device_info.device_type, 'code', device_info.device_type)
      if device_type <= DeviceType.ROBOT.code:
          call_info.hangup_dir = HangupDir.PLATFORM_HANGUP.code
          call_info.hangup_code = CallCause.CALL_TIMEOUT.code
      if device_type == DeviceType.CUSTOMER.code:
          call_info.user_no_answer_end_call = True

      self.cache.add_call_info(call_info)
      self.hangup_call(call_id, device_id, CallCause.CALL_TIMEOUT)
      self.dataHandleServer.update_record(call_id, {"status": 0})
  def exec_when_play_timeout(self, call_id):
      """Hang up every leg when the closing prompt did not finish in time.

      Acts only if the call still has pending next commands and one of
      them is a ``NEXT_HANGUP``.

      :param call_id: id of the call whose playback timed out.
      """
      call_info = self.cache.get_call_info(call_id)
      if not call_info or not call_info.next_commands:
          return

      self.logger.info("播放结束音乐失败,进行挂机 callId:%s", call_id)
      hangup_code = NextType.NEXT_HANGUP.code
      pending_types = [command.next_type for command in call_info.next_commands]
      if hangup_code not in pending_types:
          return
      for device_id in call_info.device_list:
          self.hangup_call(call_id, device_id, CallCause.PLAY_TIMEOUT)
  def exec_when_acd_timeout(self, call_id):
      """Handle a customer whose wait in the ACD queue timed out.

      When the call has exactly one customer leg, its current playback is
      interrupted. If no ``WaitingHangupMusicPath`` is configured the leg
      is hung up immediately; otherwise the closing prompt is played, a
      30 second play timeout is armed and a ``NEXT_HANGUP`` command is
      stored on the call.

      :param call_id: id of the queued call.
      """
      call_info = self.cache.get_call_info(call_id)
      if not call_info:
          # logging uses %-style placeholders; the original "{}" made the
          # logging module report a formatting error instead of the id.
          self.logger.info("exec_when_acd_timeout callInfo为空 callId: %s", call_id)
          return

      customer_code = DeviceType.CUSTOMER.code
      # Devices are created with ``device_type=DeviceType.CUSTOMER.code``
      # elsewhere in this file, so compare on the code and accept an enum
      # member as well.
      customers = [
          device for device in call_info.device_info_map.values()
          if getattr(device.device_type, 'code', device.device_type) == customer_code
      ]
      if len(customers) != 1:
          return

      device_id = customers[0].device_id
      self.break0(device_id)
      if not WaitingHangupMusicPath:
          self.hangup_call(call_id, device_id, CallCause.WAITING_TIMEOUT)
          return

      self.hold_play(device_id, WaitingHangupMusicPath)
      self.play_timeout(call_id, timeout=30)
      next_command = NextCommand(device_id=device_id, next_type=NextType.NEXT_HANGUP.code)
      call_info.next_commands = [next_command]
      self.cache.add_call_info(call_info)
      self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id,
                       device_id, WaitingHangupMusicPath)
  def make_call_new(self, context: MakeCallContext):
      """Originate a new leg described by ``context``.

      Registers the call timeout, builds the originate string (SIP headers
      in braces, then the dial target followed by ``PARK``) and sends it
      with ``bgapi`` on the shared connection.

      :param context: call parameters (gateway, called number, realm,
          SIP headers, device type, timeout, ids).
      """
      called = context.get_called()
      params = {'gateway': context.route_gateway_name, 'called': called, 'realm': context.get_realm()}
      sip_header = context.get_sip_header()

      self.call_timeout(context.call_id, context.device_id, context.timeout)

      if context.device_type == DeviceType.CUSTOMER.code:
          profile = self.expression(profile1, params)
          target = f"{SOFIA}{SK}{profile}{SK}{called}{PARK}"
      else:
          profile = self.expression(profile2, params)
          target = f"{profile}{PARK}"

      cmd = "".join(('{', sip_header, '}', target))
      self.logger.info(cmd)
      self.con.bgapi(ORIGINATE, cmd)
  def call_timeout(self, call_id, device_id, timeout):
      """Schedule an automatic hang-up if the call is not answered in time.

      :param call_id: id of the call.
      :param device_id: id of the leg being called.
      :param timeout: delay passed to the cache as ``timeouts``.
      """
      pending = DelayAction(call_id=call_id, device_id=device_id)
      action_code = DelayActionEnum.CALL_TIMEOUT_HANGUP.code
      self.cache.add_delay_message(action_code, pending, timeouts=timeout)
  def send_args(self, device_id, name, arg, con=None):
      """Execute a dialplan application on a channel via ``sendmsg``.

      :param device_id: channel uuid the message is addressed to.
      :param name: application name.
      :param arg: application argument.
      :param con: connection to send on; the shared connection is used
          when omitted.
      """
      message = ESL.ESLevent("sendmsg", device_id)
      for header, value in (
          ("call-command", EXECUTE),
          ("execute-app-name", name),
          ("execute-app-arg", arg),
      ):
          message.addHeader(header, value)
      channel = con if con else self.con
      channel.sendEvent(message)
  def bridge_call(self, call_id, device_id1, device_id2):
      """Bridge two legs together.

      ``BRIDGE_VARIABLES`` are set on both legs before the bridge command
      is issued.

      :param call_id: id of the call (not used by the command itself).
      :param device_id1: uuid of the first leg.
      :param device_id2: uuid of the second leg.
      """
      for leg in (device_id1, device_id2):
          self.multi_set_var(leg, BRIDGE_VARIABLES)
      legs = device_id1 + SPACE + device_id2
      self.bgapi(BRIDGE, arg=legs, desc="bridge_call")
  def transfer_call(self, _from, _to):
      """Transfer: park both sides of ``_from`` so they can be re-bridged.

      The original sent only the argument string (channel uuid plus the
      inline dialplan) with no command verb. ``insert`` and
      ``bridge_break`` send the same argument with ``TRANSFER``, so the
      same command is used here.

      :param _from: uuid of the leg to transfer.
      :param _to: transfer target; currently not used by the command.
      """
      # NOTE(review): assumes TRANSFER is the verb intended here, by
      # analogy with insert/bridge_break - confirm against the constant.
      arg = _from + " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline "
      self.bgapi(TRANSFER, arg=arg, desc="transfer_call")
  def answer(self, device_id):
      """Answer the call on ``device_id`` by sending the phone event ``talk``.

      :param device_id: uuid of the leg to answer.
      """
      phone_event = f"{device_id} talk"
      self.con.bgapi('uuid_phone_event', phone_event)
  def hangup_call(self, call_id, device_id, case_enum=CallCause.DEFAULT):
      """Hang up one leg, tagging it with the platform hang-up cause.

      Opens a dedicated ESL connection, sets the hang-up cause channel
      variable from ``case_enum.description``, executes the hangup
      application and always closes the connection afterwards. Failures
      are logged and never raised to the caller.

      :param call_id: id of the call (used for logging).
      :param device_id: uuid of the leg to hang up.
      :param case_enum: ``CallCause`` describing why the leg is hung up.
      """
      hangup_msg = ESL.ESLevent("sendmsg", device_id)
      hangup_msg.addHeader("call-command", EXECUTE)
      hangup_msg.addHeader("execute-app-name", HANGUP)
      hangup_msg.addHeader("execute-app-arg", NORMAL_CLEARING)

      _con = None
      try:
          _con = ESL.ESLconnection(self.host, self.port, self.password)
          if not _con.connected():
              # Previously silent: the leg stayed up without any trace.
              self.logger.warning("hangup_call could not connect, call: %s, device: %s", call_id, device_id)
              return
          arg = ''.join([EslEventUtil.SIP_H_P_LIBRA_HANGUP_CAUSE, "=", case_enum.description])
          self.send_args(device_id, SET, arg, con=_con)
          e = _con.sendMSG(hangup_msg)
          self.logger.info("hangup_call挂机 hangup call: %s, device: %s, arg: %s, res:%s",
                           call_id, device_id, arg, e)
      except Exception:
          # Exception instead of a bare except; traceback goes to the logger.
          self.logger.exception("hangup_call failed, call: %s, device: %s", call_id, device_id)
      finally:
          if _con:
              _con.disconnect()
  def kill_call(self, call_id, device_id, case_enum=CallCause.DEFAULT):
      """Kill the channel ``device_id`` with ``UUID_KILL``.

      :param call_id: id of the call (logged only).
      :param device_id: uuid of the leg; nothing happens when it is empty.
      :param case_enum: cause, logged only.
      """
      if device_id:
          self.logger.info("kill_call:%s, device:%s, case_enum:%s", call_id, device_id, case_enum)
          self.api(UUID_KILL, device_id, desc="kill_call")
  def broadcast(self, uuid, path, smf):
      """Send ``UUID_BROADCAST`` for ``path`` on a channel.

      :param uuid: channel uuid.
      :param path: value placed after the uuid in the command.
      :param smf: value placed last in the command.
      """
      command = ' '.join((UUID_BROADCAST, uuid, path, smf))
      self.con.bgapi(command, EMPTY)
  def break0(self, uuid, all=False, sync=True):
      """Send ``UUID_BREAK`` for a channel.

      :param uuid: channel uuid.
      :param all: append ``all`` to the command.
      :param sync: use the blocking ``api`` call when true, ``bgapi``
          otherwise.
      """
      parts = [UUID_BREAK, uuid] + (["all"] if all else [])
      command = ' '.join(parts)
      sender = self.api if sync else self.bgapi
      sender(command, desc="break0")
  def hold(self, smf, uuid, display):
      """Send ``UUID_HOLD`` for a channel.

      The command is built from ``smf``, ``uuid`` and ``display``,
      followed by ``all`` when ``display`` is truthy and by ``EMPTY``
      otherwise.

      :param smf: value placed right after the command name.
      :param uuid: channel uuid.
      :param display: display value.
      """
      suffix = "all" if display else EMPTY
      command = ' '.join([UUID_HOLD, smf, uuid, display, suffix])
      self.con.bgapi(command, EMPTY)
  def get_var(self, uuid, var):
      """Issue ``UUID_GETVAR`` for a channel variable (fire and forget).

      :param uuid: channel uuid.
      :param var: variable name.
      """
      command = ' '.join((UUID_GETVAR, uuid, var))
      self.con.bgapi(command, EMPTY)
  def set_var(self, uuid, var, val):
      """Set one channel variable with ``UUID_SETVAR``.

      :param uuid: channel uuid.
      :param var: variable name.
      :param val: variable value.
      """
      command = ' '.join((UUID_SETVAR, uuid, var, val))
      self.bgapi(command, desc="set_var")
  def multi_set_var(self, uuid, params):
      """Set several channel variables at once with ``uuid_setvar_multi``.

      :param uuid: channel uuid.
      :param params: mapping of variable name to string value; pairs are
          joined as ``name=value`` separated by ``;``.
      """
      assignments = ';'.join([name + "=" + value for name, value in params.items()])
      command = "uuid_setvar_multi " + uuid + " " + assignments
      self.bgapi(command, desc="multi_set_var")
  def record(self, uuid, action, path, limit):
      """Send ``UUID_RECORD`` for a channel.

      :param uuid: channel uuid.
      :param action: record action.
      :param path: recording file path.
      :param limit: limit value; converted with ``str``.
      """
      command = ' '.join((UUID_RECORD, uuid, action, path, str(limit)))
      self.bgapi(command, desc="record")
  def transfer(self, uuid, smf, dest, dialplan, context):
      """Send ``UUID_TRANSFER`` for a channel.

      :param uuid: channel uuid.
      :param smf: value placed after the uuid.
      :param dest: destination.
      :param dialplan: dialplan name.
      :param context: dialplan context.
      """
      command = ' '.join((UUID_TRANSFER, uuid, smf, dest, dialplan, context))
      self.con.bgapi(command, EMPTY)
  def insert(self, device_id):
      """Barge in: park both sides of the bridge that ``device_id`` is in.

      :param device_id: uuid of the leg.
      """
      inline_plan = " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline "
      self.con.api(TRANSFER, device_id + inline_plan)
  def bridge_break(self, call_id, device_id):
      """Tear down a bridge, parking both sides.

      The hold-music SIP header is set to ``call_id`` as part of the
      inline dialplan.

      :param call_id: id of the call.
      :param device_id: uuid of the leg.
      """
      inline_plan = f" -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,set:{SIP_HEADER}{sipHeaderHoldMusic}={call_id},park:' inline "
      arg = device_id + inline_plan
      self.api(TRANSFER, arg, "bridge_break")
  def play_file(self, call_id, device_id, file, sync):
      """Play ``file`` on a channel.

      :param call_id: id of the call (unused).
      :param device_id: uuid of the leg.
      :param file: file to play.
      :param sync: when true delegate to ``hold_play``; otherwise send an
          asynchronous playback ``sendmsg`` on the shared connection.
      :return: the result of ``hold_play`` when ``sync`` is true, else
          ``None``.
      """
      if sync:
          return self.hold_play(device_id, file)

      playback = ESL.ESLevent("sendmsg", device_id)
      for header, value in (
          ("call-command", EXECUTE),
          ("execute-app-name", PLAYBACK),
          ("execute-app-arg", file),
          ("async", "true"),
      ):
          playback.addHeader(header, value)
      self.con.sendEvent(playback)
  def stop_play(self, device_id):
      """Stop the music playing on ``device_id`` by sending ``PAUSE ... on``.

      :param device_id: uuid of the leg.
      """
      self.con.api(PAUSE, device_id + " on")
  def hold_play(self, device_id, play):
      """Broadcast a playback of ``play`` to the a-leg of ``device_id``.

      :param device_id: uuid of the leg.
      :param play: file to play.
      """
      arg = device_id + " playback::" + play + " " + SMF_ALEG
      self.api(UUID_BROADCAST, arg, "hold_play")
  def api(self, command, arg=EMPTY, desc=""):
      """Run a blocking ESL ``api`` command on a short-lived connection.

      A new connection is opened for the command and always closed
      afterwards. Errors are logged, never raised.

      :param command: command name (or full command line).
      :param arg: command argument.
      :param desc: label used in log messages.
      """
      _con = None
      try:
          _con = ESL.ESLconnection(self.host, self.port, self.password)
          if not _con.connected():
              # Previously silent: the command was dropped without a trace.
              self.logger.warning('%s skipped, esl not connected, arg=%s', desc, command + ' ' + arg)
              return
          e = _con.api(command, arg)
          self.logger.info('%s success, arg=%s, event=%s', desc, command + ' ' + arg, e.serialize('json'))
      except Exception:
          # Exception instead of a bare except; traceback goes to the logger.
          self.logger.exception('%s failed, command=%s', desc, command)
      finally:
          if _con:
              _con.disconnect()
  def bgapi(self, command, arg=EMPTY, desc=""):
      """Run a background ESL ``bgapi`` command on a short-lived connection.

      A new connection is opened for the command and always closed
      afterwards. Errors are logged, never raised.

      :param command: command name (or full command line).
      :param arg: command argument.
      :param desc: label used in log messages.
      """
      _con = None
      try:
          _con = ESL.ESLconnection(self.host, self.port, self.password)
          if not _con.connected():
              # Previously silent: the command was dropped without a trace.
              self.logger.warning('%s skipped, esl not connected, arg=%s', desc, command + ' ' + arg)
              return
          e = _con.bgapi(command, arg)
          self.logger.info('%s success, arg=%s, event=%s', desc, command + ' ' + arg, e.serialize('json'))
      except Exception:
          # Exception instead of a bare except; traceback goes to the logger.
          self.logger.exception('%s failed, command=%s', desc, command)
      finally:
          if _con:
              _con.disconnect()
  def play_timeout(self, call_id, timeout):
      """Schedule an automatic hang-up if playback does not end in time.

      :param call_id: id of the call.
      :param timeout: delay passed to the cache as ``timeouts``.
      """
      pending = DelayAction(call_id=call_id)
      action_code = DelayActionEnum.PLAY_TIMEOUT_HANGUP.code
      self.cache.add_delay_message(action_code, pending, timeouts=timeout)
  def listen(self, device_id1, device_id2, aleg=True, bleg=True):
      """Monitor: let ``device_id1`` eavesdrop on ``device_id2``.

      :param device_id1: uuid of the listening leg.
      :param device_id2: uuid of the leg being listened to.
      :param aleg: set ``eavesdrop_bridge_aleg=true`` first.
      :param bleg: set ``eavesdrop_bridge_bleg=true`` first.
      """
      switches = (
          (aleg, "eavesdrop_bridge_aleg=true"),
          (bleg, "eavesdrop_bridge_bleg=true"),
      )
      for enabled, assignment in switches:
          if enabled:
              self.send_args(device_id1, SET, assignment)
      self.send_args(device_id1, EAVESDROP, device_id2)
  def show_channel(self, device_id):
      """Query FreeSWITCH for channels matching ``device_id``.

      :param device_id: uuid (fragment) to look for.
      :return: the object returned by the ``show channels ... as json``
          api call.
      """
      msg = self.con.api("show", " channels like " + device_id + " as json")
      # The original passed ``msg`` without a placeholder, which makes the
      # logging module report a formatting error instead of the message.
      self.logger.info('show_channel::%s', msg)
      return msg
  def expression(self, template, params):
      """Fill ``#{[key]}`` placeholders in ``template`` from ``params``.

      :param template: text containing zero or more ``#{[key]}`` markers.
      :param params: mapping of key to value; values are converted with
          ``str``.
      :return: the template with every known placeholder substituted.
      """
      rendered = template
      for key in params:
          placeholder = "#{[" + key + "]}"
          rendered = rendered.replace(placeholder, str(params[key]))
      return rendered
  def stop(self):
      """Stop the client and release its background resources.

      Sets ``is_stopping``, stops the delay-action scheduler and its
      executor, shuts down the per-slot executors and closes the shared
      ESL connection if one exists.
      """
      # Set the flag before disconnecting so the receive loop's
      # ``while not self.is_stopping`` check already sees it when the
      # disconnect event arrives.
      self.is_stopping = True

      # The original left the poller and its pool running after stop().
      if self.delay_action_scheduler.running:
          self.delay_action_scheduler.shutdown(wait=False)
      self.delay_action_executor.shutdown(wait=False)

      for executor in self.executors.values():
          executor.shutdown()

      # ``con`` stays None until ``start`` has created a connection.
      if self.con is not None:
          self.con.disconnect()
  445. class OutboundClient:
  def __init__(self, agent, app):
      """Start the outbound ESL server on a daemon thread.

      :param agent: bot agent forwarded to the TCP server.
      :param app: application object; provides ``logger``.
      """
      self.app = app
      self.logger = app.logger
      # Daemon thread: it must not keep the process alive on exit.
      listener = threading.Thread(
          target=self.start,
          args=('0.0.0.0', 8084, agent),
          daemon=True,
      )
      listener.start()
  class ESLRequestHandler(socketserver.BaseRequestHandler):
      """Handles one inbound call delivered over the outbound ESL socket.

      For every connection the call is assigned new call/device ids, routed
      either to the AI robot (bucket named ``AI``) or to a human agent via
      the ACD service, counted in the metrics registry and recorded.
      """

      # DeviceInfo fields that are unset (None) when a leg is first created.
      _DEVICE_UNSET_FIELDS = (
          'cti_flow_id', 'conference', 'agent_name', 'from_agent', 'caller', 'called', 'display',
          'called_location', 'caller_location', 'ring_start_time', 'ring_end_time', 'answer_time',
          'bridge_time', 'end_time', 'talk_time', 'sip_protocol', 'channel_name', 'hangup_cause',
          'ring_cause', 'sip_status', 'record', 'record_time', 'record_start_time', 'state',
          'apparent_number', 'caller_display',
      )

      # CallInfo fields that are unset (None) when a call is first created.
      _CALL_UNSET_FIELDS = (
          'core_uuid', 'cti_flow_id', 'conference', 'group_id', 'caller_display', 'called_display',
          'number_location', 'agent_name', 'login_type', 'ivr_id', 'task_id', 'media_host',
          'sip_server', 'client_host', 'record', 'record2', 'record_time', 'answer_flag', 'wait_time',
          'hangup_dir', 'hangup_code', 'answer_time', 'end_time', 'talk_time', 'first_queue_time',
          'queue_start_time', 'queue_end_time', 'cdr_notify_url', 'queue_level', 'transfer_agent',
      )

      def setup(self):
          """Wrap the accepted socket in an ESL connection and route the call.

          Any error is logged and swallowed so one bad call cannot take the
          server down.
          """
          try:
              self.server.logger.info('%s connected!', self.client_address)
              con = ESL.ESLconnection(self.request.fileno())
              self.server.logger.info('Connected: %s', con.connected())
              if not con.connected():
                  self.server.logger.info("Failed to connect to FreeSWITCH")
                  return
              self._route_call(con)
          except Exception:
              # Exception instead of a bare except; traceback goes to the logger.
              self.server.logger.exception('outbound call setup failed for %s', self.client_address)

      def _route_call(self, con):
          """Decide where the incoming call goes and register it."""
          info = con.getInfo()
          self.server.logger.info('Event-Name: %s', info.getHeader("Event-Name"))
          device_id = info.getHeader("unique-id")
          caller_number = info.getHeader("Caller-Caller-ID-Number")  # calling number

          call_id = 'C' + str(Snowflake().next_id())
          new_device_id = 'D' + str(Snowflake().next_id())
          kwargs = json.loads(info.serialize('json'))
          kwargs['variable_sip_h_P-LIBRA-CallId'] = call_id
          kwargs['variable_sip_h_P-LIBRA-DeviceId'] = new_device_id

          destination = None
          bucket = self.server.get_bucket(call_id)
          whitelist = self.server.get_whitelist()

          if bucket and bucket.name == 'AI':
              # Route to the AI robot.
              bucket_call_type = 1
              destination = self.server.agent.register(**kwargs)
              self.server.logger.info("device_id=%s, destination=%s, new_device_id=%s",
                                      device_id, destination, new_device_id)
              self.build_call_info(call_id, device_id, new_device_id, str(destination), bucket_call_type, **kwargs)
              self.server.cache.add_device_user_part(device_id, destination)
              con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s" % (call_id, new_device_id, new_device_id, destination), device_id)
          else:
              # Whitelisted callers (type 0) and the traditional service
              # (type 2) both go to a human agent; only the type differs.
              bucket_call_type = 0 if caller_number in whitelist else 2
              self.build_call_info(call_id, device_id, new_device_id, destination=None,
                                   bucket_type=bucket_call_type, **kwargs)
              self.server.agent.acd_service.transfer_to_agent(call_id, new_device_id)

          Registry.counter('call_incoming_requests', '呼入流量统计', ['bucket_type', str(bucket_call_type)]).inc()
          self.server.dataHandleServer.create_record(call_id, caller_number, bucket_call_type,
                                                     service_category=None, destination=destination)

      def build_call_info(self, call_id, device_id, new_device_id, destination, bucket_type, **kwargs):
          """Create the CallInfo with its customer and robot legs and cache it.

          :param call_id: newly generated call id.
          :param device_id: uuid of the customer leg.
          :param new_device_id: uuid reserved for the second leg.
          :param destination: agent key / called party, may be ``None``.
          :param bucket_type: routing type stored on the call.
          :param kwargs: channel info; ``Channel-Caller-ID-Number`` is read
              as the caller.
          """
          caller = kwargs.get('Channel-Caller-ID-Number')
          # NOTE(review): utcnow() is naive, so timestamp() interprets it as
          # local time; kept as-is because other code may rely on the same
          # convention - confirm before changing.
          now = datetime.utcnow().timestamp()

          call_info = CallInfo(
              call_id=call_id, agent_key=destination,
              caller=caller, called=destination, direction=Direction.INBOUND.code,
              call_type=CallType.BOT_CALL.code, call_time=now,
              uuid1=call_id, uuid2=device_id, saas_id=saasId, bucket_type=bucket_type,
              hidden_customer=0, answer_count=0, sdk_hangup=0, overflow_count=0,
              device_list=[], device_info_map={}, follow_data={}, process_data={},
              next_commands=[], call_details=[],
              **dict.fromkeys(self._CALL_UNSET_FIELDS),
          )
          device_custom = DeviceInfo(
              device_id=device_id, call_time=now,
              call_id=call_id, device_type=DeviceType.CUSTOMER.code,
              agent_key=destination, cdr_type=CdrType.OUTBOUND.code,
              **dict.fromkeys(self._DEVICE_UNSET_FIELDS),
          )
          device_bot = DeviceInfo(
              device_id=new_device_id, call_time=now,
              call_id=call_id, device_type=DeviceType.ROBOT.code,
              agent_key=destination, cdr_type=CdrType.INBOUND.code,
              **dict.fromkeys(self._DEVICE_UNSET_FIELDS),
          )

          call_info.device_list.append(device_id)
          call_info.device_list.append(new_device_id)
          call_info.device_info_map = {device_id: device_custom, new_device_id: device_bot}
          self.server.cache.add_call_info(call_info)
  class CustomTCPServer(socketserver.TCPServer):
      """TCP server for outbound ESL connections with routing data attached.

      Holds the caller whitelist and the traffic buckets, both loaded from
      the database at start-up and refreshed by daemon threads (whitelist
      hourly, buckets every minute).
      """

      def __init__(self, server_address, RequestHandlerClass, agent, app):
          """
          :param server_address: ``(host, port)`` to bind.
          :param RequestHandlerClass: handler class for each connection.
          :param agent: bot agent made available to handlers.
          :param app: application object; provides ``logger`` and
              ``app_context``.
          """
          super().__init__(server_address, RequestHandlerClass)
          self.agent = agent
          self.app = app
          self.cache = Cache(app)
          self.logger = app.logger
          self.whitelist = []
          self.buckets = []
          self.update_whitelist()  # initial whitelist load
          self.update_bucket()  # initial bucket load
          self.dataHandleServer = DataHandleServer(app)
          # Periodic refresh of whitelist and buckets.
          threading.Thread(target=self.refresh_whitelist, daemon=True).start()
          threading.Thread(target=self.refresh_bucket, daemon=True).start()

      def update_whitelist(self):
          """Reload the non-deleted whitelist phone numbers from the database."""
          with self.app.app_context():
              phones = Whitelist.query.filter_by(del_flag=0).all()
              self.whitelist = [phone.phone for phone in phones]
              self.logger.info("Whitelist updated: %s", self.whitelist)

      def refresh_whitelist(self):
          """Reload the whitelist every hour, forever."""
          while True:
              time.sleep(3600)
              try:
                  self.update_whitelist()
              except Exception:
                  # Keep the thread alive; a transient failure must not
                  # freeze the whitelist for the rest of the process life.
                  self.logger.exception("whitelist refresh failed; keeping previous whitelist")

      def get_whitelist(self):
          """Return the currently loaded whitelist."""
          return self.whitelist

      def load_agent_monitor(self):
          """Return the agent numbers with ``check_state=0`` and ``service_state=2``."""
          with self.app.app_context():
              agents = AgentMonitor.query.filter_by(check_state=0, service_state=2).all()
              return [agent.agent_num for agent in agents]

      def get_bucket(self, custom_uuid=None):
          """Pick the traffic bucket for ``custom_uuid``.

          The id is hashed with murmur3 into the range 0-99 and the first
          bucket whose ``[lower, upper)`` interval contains that number is
          returned; the first bucket is the fallback.

          :param custom_uuid: id to hash; a random uuid is used if omitted.
          :raises BizException: when no buckets are loaded.
          """
          # Was ``==`` (a no-op comparison), so a missing id was never
          # replaced and hashing ``None`` raised TypeError.
          custom_uuid = custom_uuid if custom_uuid else str(uuid.uuid4())
          buckets = self.buckets  # snapshot: the refresh thread may rebind it
          if not buckets:
              raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
          num = abs(mmh3.hash(custom_uuid)) % 100
          for bucket in buckets:
              if bucket.lower <= num < bucket.upper:
                  return bucket
          return buckets[0]

      def update_bucket(self):
          """Reload the buckets with ``eid='001'`` from the database."""
          with self.app.app_context():
              self.buckets = Bucket.query.filter_by(eid='001').all()
              # The original logged the whitelist here by mistake.
              self.logger.info("bucket updated: %s", self.buckets)

      def refresh_bucket(self):
          """Reload the buckets every minute, forever."""
          while True:
              time.sleep(60)
              try:
                  self.update_bucket()
              except Exception:
                  # Keep the thread alive across transient failures.
                  self.logger.exception("bucket refresh failed; keeping previous buckets")
  def start(self, HOST='0.0.0.0', PORT=8084, agent=None):
      """Create the TCP server and serve ESL connections until shut down.

      :param HOST: interface to bind.
      :param PORT: port to bind.
      :param agent: bot agent passed to the server.
      """
      address = (HOST, PORT)
      with self.CustomTCPServer(address, self.ESLRequestHandler, agent, self.app) as server:
          print(f"ESL server listening on {HOST}:{PORT}")
          server.serve_forever()