client.py 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724
#!/usr/bin/env python3
# encoding:utf-8
import concurrent.futures
import json
import random
import socketserver
import threading
import time
import traceback
import uuid
from datetime import datetime

import ESL
from apscheduler.schedulers.background import BackgroundScheduler

from src.core.callcenter import BizException
from src.core.callcenter.cache import Cache
from src.core.callcenter.api import MakeCallContext, DelayAction, CallInfo, DeviceInfo, NextCommand
from src.core.callcenter.callback import Callback
from src.core.callcenter.constant import SK, EMPTY, WaitingHangupMusicPath, SAAS_ID, HOLD_MUSIC_PATH
from src.core.callcenter.esl.constant.esl_constant import BRIDGE_VARIABLES, BRIDGE, HANGUP, NORMAL_CLEARING, SIP_HEADER, \
    SPACE, SOFIA, \
    ORIGINATE, PARK, SET, EAVESDROP, SMF_ALEG, EXECUTE, PLAYBACK, PAUSE, TRANSFER, UUID_TRANSFER, UUID_BROADCAST, \
    UUID_BREAK, UUID_HOLD, \
    UUID_RECORD, UUID_SETVAR, UUID_GETVAR, UUID_KILL, ANSWER
import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
import src.core.callcenter.esl.handler as event_handler
from src.core.callcenter.esl.constant.sip_header_constant import sipHeaderHoldMusic, profile1, profile2, sipHeaderCallId
from src.core.callcenter.enumeration import CallCause, DeviceType, DelayActionEnum, HangupDir, CallType, NextType, \
    Direction, CdrType, BizErrorCode, WhiteTypeEnum
from src.core.callcenter.esl.handler.default_esl_event_handler import DefaultEslEventHandler
from src.core.callcenter.snowflake import Snowflake
from src.core.datasource import SERVE_HOST
# NOTE(review): the star imports below are kept in their original position so
# that any name they shadow keeps resolving exactly as before.
from src.core.voip.constant import *
from src.core.callcenter.data_handler import *
from src.core.callcenter import registry
  33. class InboundClient:
  34. def __init__(self, agent, app):
  35. self.con = None
  36. self.thread_num = 12
  37. self.is_stopping = False
  38. self.app = app
  39. self.logger = app.logger
  40. self.bot_agent = agent
  41. self.cache = Cache(app)
  42. self.callback = Callback(app, self.thread_num)
  43. self.dataHandleServer = DataHandleServer(app)
  44. self.handler_table = self.scan_esl_event_handlers()
  45. self.default_event_handler = DefaultEslEventHandler(self, self.bot_agent)
  46. self.host, self.port, self.password = SERVE_HOST, '8021', '4918257983818884358'
  47. self.executors = {x: concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="client-event-pool") for x in range(self.thread_num)}
  48. self.delay_action_executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
  49. self.delay_action_scheduler = BackgroundScheduler()
  50. self.delay_action_scheduler.add_job(self.submit_delay_action, 'interval', seconds=1, max_instances=1, name='delay_action_daemon')
  51. self.delay_action_scheduler.start()
  52. threading.Thread(target=self.start, args=()).start()
  53. def submit_delay_action(self):
  54. for name, member in DelayActionEnum.__members__.items():
  55. action_messages = self.cache.get_delay_message(name)
  56. for action_message in action_messages:
  57. self.delay_action_executor.submit(self.do_delay_action, name, action_message)
  58. def scan_esl_event_handlers(self):
  59. import inspect
  60. import importlib
  61. import pkgutil
  62. classes = []
  63. # 遍历包中的模块
  64. for module_info in pkgutil.iter_modules(event_handler.__path__):
  65. module = importlib.import_module(f"{event_handler.__name__}.{module_info.name}")
  66. for _name, _cls in inspect.getmembers(module, inspect.isclass):
  67. if hasattr(_cls, '_esl_event_name'):
  68. classes.append(_cls)
  69. handlers = {}
  70. for _cls in classes:
  71. items = handlers.get(_cls._esl_event_name, [])
  72. items.append(_cls(self, self.bot_agent))
  73. handlers[_cls._esl_event_name] = items
  74. return handlers
  75. def start(self):
  76. self.logger.info('inbound.start')
  77. self.con = ESL.ESLconnection(self.host, self.port, self.password)
  78. if self.con.connected():
  79. self.logger.info('inbound esl connected ... ')
  80. self.con.events('plain', 'all') #CHANNEL_ORIGINATE,CHANNEL_PROGRESS,CHANNEL_PROGRESS_MEDIA,CHANNEL_ANSWER,CHANNEL_HANGUP,CUSTOM,PLAYBACK_START,PLAYBACK_STOP,DETECTED_TONE
  81. while not self.is_stopping:
  82. e = self.con.recvEvent()
  83. # if e:
  84. # self.logger.info(json.loads(e.serialize('json')))
  85. event_name = e.getHeader("Event-Name")
  86. if event_name == "SERVER_DISCONNECTED":
  87. self.logger.info('come in SERVER_DISCONNECTED case')
  88. self.con.disconnect()
  89. time.sleep(3)
  90. self.start()
  91. else:
  92. registry.FLASK_ACTIVE_THREADS.set(threading.active_count())
  93. # threading.Thread(target=self.process_esl_event, args=(e,)).start()
  94. self.choose_thread_pool_executor(e).submit(self.process_esl_event, e)
  95. def choose_thread_pool_executor(self, e):
  96. call_id = EslEventUtil.getCallId(e)
  97. device_id = EslEventUtil.getUniqueId(e)
  98. wdh_device_id = EslEventUtil.getDeviceId(e)
  99. random_id = call_id if call_id else device_id
  100. if random_id:
  101. random_index = abs(mmh3.hash(random_id)) % len(self.executors)
  102. else:
  103. random_index = random.randint(0, len(self.executors) - 1) if self.executors else 0
  104. # self.logger.info('choose_thread_pool_executor.index=', random_index, call_id, device_id, wdh_device_id)
  105. return self.executors.get(random_index)
  106. def process_esl_event(self, e):
  107. # self.logger.info(json.loads(e.serialize('json')))
  108. start_time = time.time()
  109. event_name = EslEventUtil.getEventName(e)
  110. coreUUID = EslEventUtil.getCoreUuid(e)
  111. address = self.host + ':' + self.port
  112. # self.logger.info("process_esl_event.event_name=%s,coreUUID=%s", event_name, coreUUID)
  113. try:
  114. self.callback.callback_event(e)
  115. if event_name in self.handler_table:
  116. items = self.handler_table.get(event_name)
  117. for x in items:
  118. try:
  119. # self.logger.info("process_esl_event.handle.%s", x.__class__.__name__)
  120. x.handle(address, e, coreUUID)
  121. except:
  122. traceback.print_exc()
  123. else:
  124. self.default_event_handler.handle(address, e, coreUUID)
  125. except:
  126. traceback.print_exc()
  127. finally:
  128. latency = (time.time() - start_time)
  129. registry.ESL_EVENT_LATENCY.labels(event_name).observe(latency)
  130. def do_delay_action(self, action, message):
  131. delay_action = DelayAction.from_json(message)
  132. flag = self.cache.lock_delay_action(delay_action.uuid)
  133. self.logger.info("do_delay_action::action=%s, flag=%s", action, flag)
  134. if not flag:
  135. self.logger.info("异步延迟执行操作重复 action:%s msg:%s", action, message)
  136. return
  137. delay_action_enum = DelayActionEnum.get_by_code(action)
  138. if not delay_action_enum:
  139. self.logger.info("异步延迟执行 delayActionEnum为空 action:%s msg:%s", action, message)
  140. return
  141. if DelayActionEnum.CALL_TIMEOUT_HANGUP == delay_action_enum:
  142. self.exec_when_call_timeout(delay_action.call_id, delay_action.device_id)
  143. elif DelayActionEnum.PLAY_TIMEOUT_HANGUP == delay_action_enum:
  144. self.exec_when_play_timeout(delay_action.call_id)
  145. elif DelayActionEnum.ACD_TIMEOUT_PLAY == delay_action_enum:
  146. self.exec_when_acd_timeout(delay_action.call_id)
  147. def exec_when_call_timeout(self, call_id, device_id):
  148. call_info = self.cache.get_call_info(call_id)
  149. if not call_info or not (device_id in call_info.device_list):
  150. self.logger.info("do_delay_action:exec_when_call_timeout:return, device_id=%s, call_info=%s", device_id, call_info)
  151. return
  152. device_info = call_info.device_info_map.get(device_id)
  153. if device_info and device_info.answer_time is None:
  154. self.logger.info("call:%s deviceId:%s execWhenCallTimeOut", call_id, device_id)
  155. device_info.hangup_cause = CallCause.CALL_TIMEOUT.name
  156. call_info.next_commands = []
  157. if device_info.device_type <= DeviceType.ROBOT.code:
  158. call_info.hangup_dir = HangupDir.PLATFORM_HANGUP.code
  159. call_info.hangup_code = CallCause.CALL_TIMEOUT.code
  160. if device_info.device_type.code == DeviceType.CUSTOMER.code:
  161. call_info.user_no_answer_end_call = True
  162. # if not device_info.end_time and device_info.device_type.code == DeviceType.CUSTOMER.code:
  163. # channel = self.show_channel(device_id)
  164. # if channel:
  165. # delay_action = DelayAction(call_id=call_id, device_id=device_id)
  166. # self.cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_DECR, delay_action, timeouts=20)
  167. self.cache.add_call_info(call_info)
  168. self.hangup_call(call_id, device_id, CallCause.CALL_TIMEOUT)
  169. # self.dataHandleServer.update_record(call_id, status= 0)
  170. def exec_when_play_timeout(self, call_id):
  171. call_info = self.cache.get_call_info(call_id)
  172. if not call_info or not call_info.next_commands:
  173. return
  174. self.logger.info("播放结束音乐失败,进行挂机 callId:%s", call_id)
  175. next_types = [x.next_type for x in call_info.next_commands]
  176. if NextType.NEXT_HANGUP.code in next_types:
  177. for device_id in call_info.device_list:
  178. self.hangup_call(call_id, device_id, CallCause.PLAY_TIMEOUT)
  179. def exec_when_acd_timeout(self, call_id):
  180. call_info = self.cache.get_call_info(call_id)
  181. if not call_info:
  182. self.logger.info("exec_when_acd_timeout callInfo为空 callId: %s", call_id)
  183. return
  184. next_cmd = call_info.next_commands[0] if call_info.next_commands and len(call_info.next_commands) >0 else None
  185. self.logger.info("do_delay_action:exec_when_acd_timeout:next_cmd=%s, call_info=%s", next_cmd, call_info)
  186. if next_cmd and next_cmd.next_type == NextType.NEXT_TRANSFER_CALL.code:
  187. device_id = next_cmd.next_value
  188. self.break0(device_id)
  189. self.hold_play(device_id, WaitingHangupMusicPath)
  190. self.play_timeout(call_id, timeout=30)
  191. next_command = NextCommand(device_id = device_id, next_type=NextType.NEXT_HANGUP.code)
  192. call_info.next_commands = [next_command]
  193. self.cache.add_call_info(call_info)
  194. # self.dataHandleServer.update_record(call_id, status= 0)
  195. self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id,
  196. device_id, WaitingHangupMusicPath)
  197. device_list = [v for k,v in call_info.device_info_map.items() if v.device_type == DeviceType.CUSTOMER]
  198. self.logger.info("do_delay_action:exec_when_acd_timeout:device_list=%s, call_info=%s", device_list, call_info)
  199. if device_list and len(device_list) == 1:
  200. device_id = device_list[0].device_id
  201. self.break0(device_id)
  202. if not WaitingHangupMusicPath:
  203. self.hangup_call(call_id, device_id, CallCause.WAITING_TIMEOUT)
  204. return
  205. self.hold_play(device_id, WaitingHangupMusicPath)
  206. self.play_timeout(call_id, timeout=30)
  207. next_command = NextCommand(device_id = device_id, next_type=NextType.NEXT_HANGUP.code)
  208. call_info.next_commands = [next_command]
  209. self.cache.add_call_info(call_info)
  210. # self.dataHandleServer.update_record(call_id, status= 0)
  211. self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id,
  212. device_id, WaitingHangupMusicPath)
  213. def make_call(self, context: MakeCallContext):
  214. # self.logger.info("拨打测试context:%s", context.__dict__)
  215. called = context.get_called()
  216. params = {'gateway': context.route_gateway_name, 'called': called, 'realm': context.get_realm()}
  217. builder = [
  218. '{', context.get_sip_header(), '}'
  219. ]
  220. self.call_timeout(context.call_id, context.device_id, context.timeout)
  221. if context.device_type == DeviceType.CUSTOMER.code:
  222. profile = self.expression(profile1, params)
  223. builder.append(f"{SOFIA}{SK}{profile}{SK}{called}{PARK}")
  224. else:
  225. profile = self.expression(profile2, params)
  226. builder.append(f"{profile}{PARK}")
  227. # self.logger.info("拨打测试builder:%s", builder)
  228. cmd = "".join(builder)
  229. # self.logger.info("makeCall::%s", cmd)
  230. self.bgapi(ORIGINATE, arg=cmd, desc="make_call")
  231. # self.con.bgapi(ORIGINATE, cmd)
  232. def call_timeout(self, call_id, device_id, timeout):
  233. """呼叫超时主动挂机"""
  234. delay_action = DelayAction(call_id=call_id, device_id=device_id)
  235. self.cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout)
  236. def send_args(self, device_id, name, arg, con=None):
  237. msg = ESL.ESLevent("sendmsg", device_id)
  238. msg.addHeader("call-command", EXECUTE)
  239. msg.addHeader("execute-app-name", name)
  240. msg.addHeader("execute-app-arg", arg)
  241. if con:
  242. con.sendEvent(msg)
  243. else:
  244. self.con.sendEvent(msg)
  245. def bridge_call(self, call_id, device_id1, device_id2):
  246. """桥接电话"""
  247. self.multi_set_var(device_id1, BRIDGE_VARIABLES)
  248. self.multi_set_var(device_id2, BRIDGE_VARIABLES)
  249. self.bgapi(BRIDGE, arg=device_id1 + SPACE + device_id2, desc="bridge_call")
  250. def transfer_call(self, _from, _to):
  251. """转接"""
  252. builder = [
  253. _from,
  254. " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline "
  255. ]
  256. command = ''.join(builder)
  257. self.bgapi(command, desc="transfer_call")
  258. def answer(self, device_id):
  259. """应答"""
  260. self.con.bgapi('uuid_phone_event', device_id + ' talk')
  261. def hangup_call(self, call_id, device_id, case_enum=CallCause.DEFAULT):
  262. """挂机"""
  263. msg = ESL.ESLevent("sendmsg", device_id)
  264. msg.addHeader("call-command", EXECUTE)
  265. msg.addHeader("execute-app-name", HANGUP)
  266. msg.addHeader("execute-app-arg", NORMAL_CLEARING)
  267. # msg.addHeader("async", "true")
  268. _con = None
  269. try:
  270. _con = ESL.ESLconnection(self.host, self.port, self.password)
  271. if _con.connected():
  272. arg = ''.join([EslEventUtil.SIP_H_P_LIBRA_HANGUP_CAUSE, "=", case_enum.description])
  273. self.send_args(device_id, SET, arg, con=_con)
  274. e = _con.sendMSG(msg)
  275. # e = _con.api(UUID_KILL, device_id)
  276. # 更新通话记录
  277. self.dataHandleServer.update_record(call_id, hangup_dir=HangupDir.PLATFORM_HANGUP.code)
  278. self.logger.info("hangup_call挂机 hangup call: %s, device: %s, arg: %s, res:%s" % (call_id, device_id, arg, e))
  279. # self.logger.info("hangup_call挂机 hangup call: %s, device: %s, arg: %s, res:%s" % (call_id, device_id, arg, e.serialize('json')))
  280. except:
  281. traceback.print_exc()
  282. finally:
  283. if _con:
  284. _con.disconnect()
  285. def kill_call(self, call_id, device_id, case_enum=CallCause.DEFAULT):
  286. if not device_id:
  287. return
  288. self.logger.info("kill_call:%s, device:%s, case_enum:%s", call_id, device_id, case_enum)
  289. self.api(UUID_KILL, device_id, desc="kill_call")
  290. def broadcast(self, uuid, path, smf):
  291. builder = [
  292. UUID_BROADCAST,
  293. uuid,
  294. path,
  295. smf
  296. ]
  297. command = ' '.join(builder)
  298. self.bgapi(command, desc="broadcast")
  299. # self.con.bgapi(command, EMPTY)
  300. def break0(self, uuid, all=False, sync=True):
  301. builder = [
  302. UUID_BREAK,
  303. uuid
  304. ]
  305. if all:
  306. builder.append("all")
  307. command = ' '.join(builder)
  308. if sync:
  309. self.api(command, desc="break0")
  310. else:
  311. self.bgapi(command, desc="break0")
  312. def hold(self, smf, uuid, display):
  313. builder = [
  314. UUID_HOLD,
  315. smf,
  316. uuid,
  317. display
  318. ]
  319. if display:
  320. builder.append("all")
  321. else:
  322. builder.append(EMPTY)
  323. command = ' '.join(builder)
  324. self.con.bgapi(command, EMPTY)
  325. def get_var(self, uuid, var):
  326. builder = [
  327. UUID_GETVAR,
  328. uuid,
  329. var
  330. ]
  331. command = ' '.join(builder)
  332. self.con.bgapi(command, EMPTY)
  333. def set_var(self, uuid, var, val):
  334. builder = [
  335. UUID_SETVAR,
  336. uuid,
  337. var,
  338. val
  339. ]
  340. command = ' '.join(builder)
  341. self.bgapi(command, desc ="set_var")
  342. def multi_set_var(self, uuid, params):
  343. builder = [
  344. "uuid_setvar_multi " + uuid + " "
  345. ]
  346. builder1 = []
  347. for k, v in params.items():
  348. builder1.append(k + "="+v)
  349. builder.append(';'.join(builder1))
  350. command = ''.join(builder)
  351. self.bgapi(command, desc="multi_set_var")
  352. def record(self, uuid, action, path, limit):
  353. builder = [
  354. UUID_RECORD,
  355. uuid,
  356. action,
  357. path,
  358. str(limit)
  359. ]
  360. command = ' '.join(builder)
  361. self.bgapi(command, desc="record")
  362. def transfer(self, uuid, smf, dest, dialplan, context):
  363. builder = [
  364. UUID_TRANSFER,
  365. uuid,
  366. smf,
  367. dest,
  368. dialplan,
  369. context
  370. ]
  371. command = ' '.join(builder)
  372. self.con.bgapi(command, EMPTY)
  373. def insert(self, device_id):
  374. """强插"""
  375. builder = [
  376. device_id,
  377. " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline "
  378. ]
  379. arg = ''.join(builder)
  380. self.con.api(TRANSFER, arg)
  381. def bridge_break(self, call_id, device_id):
  382. """拆线"""
  383. builder = [
  384. device_id,
  385. f" -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,set:{SIP_HEADER}{sipHeaderHoldMusic}={call_id},park:' inline "
  386. ]
  387. arg = ''.join(builder)
  388. self.api(TRANSFER, arg, "bridge_break")
  389. def play_file(self, call_id, device_id, file, sync):
  390. """放音"""
  391. if sync:
  392. return self.hold_play(device_id, file)
  393. else:
  394. msg = ESL.ESLevent("sendmsg", device_id)
  395. msg.addHeader("call-command", EXECUTE)
  396. msg.addHeader("execute-app-name", PLAYBACK)
  397. msg.addHeader("execute-app-arg", file)
  398. msg.addHeader("async", "true")
  399. self.con.sendEvent(msg)
  400. def stop_play(self, device_id):
  401. """关闭播放音乐"""
  402. builder = [
  403. device_id,
  404. " on"
  405. ]
  406. arg = ''.join(builder)
  407. self.con.api(PAUSE, arg)
  408. def hold_play(self, device_id, play):
  409. """向a-leg插播tts音乐(无限播放)"""
  410. builder = [
  411. device_id,
  412. " playback::",
  413. play,
  414. " ",
  415. SMF_ALEG
  416. ]
  417. arg = ''.join(builder)
  418. self.api(UUID_BROADCAST, arg, "hold_play")
  419. def api(self, command, arg=EMPTY, desc=""):
  420. _con = None
  421. try:
  422. _con = ESL.ESLconnection(self.host, self.port, self.password)
  423. if _con.connected():
  424. e = _con.api(command, arg)
  425. self.logger.info('%s success, arg=%s, event=%s' % (desc, command + ' ' + arg, e.serialize('json')))
  426. except:
  427. traceback.print_exc()
  428. finally:
  429. if _con:
  430. _con.disconnect()
  431. def bgapi(self, command, arg=EMPTY, desc=""):
  432. _con = None
  433. try:
  434. _con = ESL.ESLconnection(self.host, self.port, self.password)
  435. if _con.connected():
  436. e = _con.bgapi(command, arg)
  437. self.logger.info('%s success, arg=%s, event=%s' % (desc, command + ' '+arg, e.serialize('json')))
  438. except:
  439. traceback.print_exc()
  440. finally:
  441. if _con:
  442. _con.disconnect()
  443. def play_timeout(self, call_id, timeout):
  444. """播放超时主动挂机"""
  445. delay_action = DelayAction(call_id=call_id)
  446. self.cache.add_delay_message(DelayActionEnum.PLAY_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout)
  447. def listen(self, device_id1, device_id2, aleg=True, bleg=True):
  448. """监听"""
  449. if aleg:
  450. self.send_args(device_id1, SET, "eavesdrop_bridge_aleg=true")
  451. if bleg:
  452. self.send_args(device_id1, SET, "eavesdrop_bridge_bleg=true")
  453. self.send_args(device_id1, EAVESDROP, device_id2)
  454. def show_channel(self, device_id):
  455. msg = self.con.api("show", " channels like " + device_id + " as json")
  456. self.logger.info('show_channel::', msg)
  457. return msg
  458. def expression(self, template, params):
  459. for key, value in params.items():
  460. template = template.replace("#{["+key+"]}", str(value))
  461. return template
  462. def stop(self):
  463. for k, v in self.executors.items():
  464. v.shutdown()
  465. self.con.disconnect()
  466. self.is_stopping = True
  467. self.callback.stop()
  468. class OutboundClient:
  469. def __init__(self, agent, app):
  470. self.app = app
  471. self.logger = app.logger
  472. server_thread = threading.Thread(target=self.start, args=('0.0.0.0', 8084, agent))
  473. server_thread.daemon = True # 设置守护线程
  474. server_thread.start()
  475. class ESLRequestHandler(socketserver.BaseRequestHandler):
  476. def setup(self):
  477. try:
  478. self.server.logger.info('%s connected!', self.client_address)
  479. fd = self.request.fileno()
  480. con = ESL.ESLconnection(fd)
  481. self.server.logger.info('Connected: %s', con.connected())
  482. if con.connected():
  483. info = con.getInfo()
  484. self.server.logger.info(json.loads(info.serialize('json')))
  485. event_name = info.getHeader("Event-Name")
  486. self.server.logger.info('Event-Name: %s', event_name)
  487. device_id = info.getHeader("unique-id")
  488. caller_number = info.getHeader("Caller-Caller-ID-Number") # 获取来电号码
  489. call_id = 'C' + str(Snowflake().next_id())
  490. new_device_id = 'D' + str(Snowflake().next_id())
  491. kwargs = json.loads(info.serialize('json'))
  492. kwargs['variable_sip_h_P-LIBRA-CallId'] = call_id
  493. kwargs['variable_sip_h_P-LIBRA-DeviceId'] = device_id
  494. bucket_call_type = 2
  495. destination = None
  496. service_category = 0
  497. bucket = self.server.get_bucket(call_id)
  498. whitelist = self.server.get_whitelist()
  499. in_whitelist_type = self.in_whitelist(caller_number, whitelist)
  500. user_name = None
  501. self.server.logger.info('call incoming call_id=%s, caller_number=%s, device_id=%s, new_device_id=%s, in_whitelist=%s, bucket=%s', call_id, caller_number, device_id, new_device_id, in_whitelist_type, bucket.name)
  502. # 检查白名单
  503. if in_whitelist_type:
  504. if WhiteTypeEnum.AI == in_whitelist_type:
  505. bucket_call_type = 0
  506. service_category = 1
  507. destination = self.bridge_ai(con, bucket_call_type, call_id, device_id, new_device_id, **kwargs)
  508. user_name = f"机器人{destination}"
  509. else:
  510. bucket_call_type = 0
  511. self.transfer_custom(con, bucket_call_type, call_id, device_id, new_device_id, **kwargs)
  512. else:
  513. # 自然分流
  514. if bucket and bucket.name == 'AI':
  515. bucket_call_type = 1
  516. service_category = 1
  517. destination = self.bridge_ai(con, bucket_call_type, call_id, device_id, new_device_id, **kwargs)
  518. user_name = f"机器人{destination}"
  519. else:
  520. self.transfer_custom(con, bucket_call_type, call_id, device_id, new_device_id, **kwargs)
  521. registry.CALL_INCOMING_REQUESTS.labels(f"{bucket_call_type}").inc()
  522. self.server.dataHandleServer.create_record(call_id, caller_number, bucket_call_type, service_category=service_category, user_id=destination if user_name else None, user_name= user_name)
  523. try:
  524. con.disconnect()
  525. self.server.logger.info("connection disconnected !!!")
  526. except:
  527. traceback.print_exc()
  528. else:
  529. self.server.logger.info("Failed to connect to FreeSWITCH")
  530. except:
  531. traceback.print_exc()
  532. finally:
  533. try:
  534. if self.request.fileno() != -1:
  535. self.request.close()
  536. except OSError:
  537. # Ignore the error if socket is already closed
  538. pass
  539. def transfer_custom(self, con, bucket_call_type, call_id, device_id, new_device_id, **kwargs):
  540. self.answer(con, call_id, device_id)
  541. self.build_call_info(CallType.INCOMING_AGENT_CALL.code, call_id, device_id, new_device_id, destination=None, bucket_type=bucket_call_type, **kwargs)
  542. self.server.agent.acd_service.transfer_to_agent(call_id, device_id)
  543. def bridge_ai(self, con, bucket_call_type, call_id, device_id, new_device_id, **kwargs):
  544. destination = self.server.agent.register(**kwargs)
  545. self.server.logger.info("call_id=%s, device_id=%s, destination=%s, new_device_id=%s" % (call_id, device_id, destination, new_device_id))
  546. self.build_call_info(CallType.INCOMING_BOT_CALL.code, call_id, device_id, new_device_id, str(destination), bucket_type=bucket_call_type, **kwargs)
  547. self.server.cache.add_device_user_part(device_id, destination)
  548. con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s" % (call_id, new_device_id, new_device_id, destination), device_id)
  549. return destination
  550. def in_whitelist(self, caller_number, whitelist):
  551. for x in whitelist:
  552. phone, _type = x
  553. if caller_number in phone or phone in caller_number:
  554. return WhiteTypeEnum.get_by_code(_type)
  555. return None
  556. def answer(self, con, call_id, device_id, timeouts=120):
  557. con.execute("answer", "", device_id)
  558. # con.execute("bgapi", f"uuid_setvar {device_id} {sipHeaderCallId} {call_id}", device_id)
  559. con.execute("playback", HOLD_MUSIC_PATH, device_id)
  560. delay_action = DelayAction(call_id=call_id)
  561. self.server.cache.add_delay_message(DelayActionEnum.ACD_TIMEOUT_PLAY.name, delay_action, timeouts)
  562. def build_call_info(self, call_type, call_id, device_id, new_device_id, destination, bucket_type, **kwargs):
  563. caller = kwargs.get('Channel-Caller-ID-Number')
  564. called = destination
  565. now = datetime.now().timestamp()
  566. call_info = CallInfo(call_id=call_id, agent_key=destination,
  567. caller=caller, called=called, direction=Direction.INBOUND.code,
  568. call_type=call_type, call_time=now,
  569. uuid1=call_id, uuid2=device_id, saas_id=SAAS_ID, bucket_type=bucket_type,
  570. core_uuid=None, cti_flow_id=None, conference=None, group_id=None, hidden_customer=0, caller_display=None, called_display=None, number_location=None, agent_name=None, login_type=None, ivr_id=None, task_id=None, media_host=None, sip_server=None, client_host=None, record=None, record2=None, record_time=None, answer_flag=None, wait_time=None, answer_count=0, hangup_dir=None, sdk_hangup=0, hangup_code=None, answer_time=None, end_time=None, talk_time=None, first_queue_time=None, queue_start_time=None, queue_end_time=None, overflow_count=0, cdr_notify_url=None, queue_level=None, transfer_agent=None,device_list=[], device_info_map = {}, follow_data = {}, process_data = {}, next_commands=[], call_details=[])
  571. device_custom = DeviceInfo(device_id=device_id, call_time=now,
  572. call_id=call_id, device_type=DeviceType.CUSTOMER.code,
  573. agent_key=destination, cdr_type=CdrType.OUTBOUND.code,
  574. cti_flow_id=None, conference=None, agent_name=None, from_agent=None, caller=None, called=None, display=None, called_location=None, caller_location=None, ring_start_time=None, ring_end_time=None, answer_time=None, bridge_time=None, end_time=None, talk_time=None, sip_protocol=None, channel_name=None, hangup_cause=None, ring_cause=None, sip_status=None, record=None, record_time=None, record_start_time=None, state=None, apparent_number=None, caller_display=None)
  575. call_info.device_list.append(device_id)
  576. # call_info.next_commands.append(NextCommand(device_id, NextType.NEXT_CALL_BRIDGE.code, new_device_id))
  577. call_info.device_info_map = {device_id: device_custom}
  578. if CallType.INCOMING_BOT_CALL.code == call_type:
  579. device_bot = DeviceInfo(device_id=new_device_id, call_time=now,
  580. call_id=call_id, device_type=DeviceType.ROBOT.code,
  581. agent_key=destination, cdr_type=CdrType.INBOUND.code,
  582. cti_flow_id=None, conference=None, agent_name=None, from_agent=None, caller=None, called=None, display=None, called_location=None, caller_location=None, ring_start_time=None, ring_end_time=None, answer_time=None, bridge_time=None, end_time=None, talk_time=None, sip_protocol=None, channel_name=None, hangup_cause=None, ring_cause=None, sip_status=None, record=None, record_time=None, record_start_time=None, state=None, apparent_number=None, caller_display=None)
  583. call_info.device_list.append(new_device_id)
  584. call_info.device_info_map[new_device_id] = device_bot
  585. self.server.cache.add_call_info(call_info)
  586. return call_info
  587. class CustomTCPServer(socketserver.TCPServer):
  588. def __init__(self, server_address, RequestHandlerClass, agent, app):
  589. super().__init__(server_address, RequestHandlerClass)
  590. self.agent = agent
  591. self.app = app
  592. self.cache = Cache(app)
  593. self.logger = app.logger
  594. self.whitelist = []
  595. self.buckets = []
  596. # self.update_whitelist() # 初始化加载白名单
  597. # self.update_bucket() #初始化分流
  598. self.dataHandleServer = DataHandleServer(app)
  599. self.cache_job_scheduler = BackgroundScheduler()
  600. self.cache_job_scheduler.add_job(self.update_cache_job, run_date=datetime.now())
  601. self.cache_job_scheduler.add_job(self.update_cache_job, 'interval', seconds=60, max_instances=1, name='cache_job_daemon')
  602. self.cache_job_scheduler.add_job(self.update_whitelist, run_date=datetime.now())
  603. self.cache_job_scheduler.add_job(self.update_whitelist, 'interval', seconds=60, max_instances=1,name='cache_job_whiteList')
  604. self.cache_job_scheduler.start()
  605. def update_cache_job(self):
  606. # self.logger.info("begin flush cache job")
  607. # self.update_whitelist()
  608. self.update_bucket()
  609. # self.logger.info("end flush cache job")
  610. def update_whitelist(self):
  611. with self.app.app_context():
  612. phones = Whitelist.query.filter_by(del_flag=0).all()
  613. self.whitelist = [(phone.phone, phone.type) for phone in phones]
  614. self.logger.info("Whitelist updated: %s", self.whitelist)
  615. def get_whitelist(self):
  616. return self.whitelist
  617. def load_agent_monitor(self):
  618. with self.app.app_context():
  619. agents = AgentMonitor.query.filter_by(check_state=0, service_state=2).all()
  620. agent_nums = [agent.agent_num for agent in agents]
  621. return agent_nums
  622. def get_bucket(self, custom_uuid=None):
  623. custom_uuid == custom_uuid if custom_uuid else str(uuid.uuid4())
  624. random_id = abs(mmh3.hash(custom_uuid))
  625. if len(self.buckets) <= 0:
  626. raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
  627. for bucket in self.buckets:
  628. num = (random_id % 100 + 100) %100
  629. if bucket.lower <= num < bucket.upper:
  630. return bucket
  631. return self.buckets[0]
  632. def update_bucket(self):
  633. with self.app.app_context():
  634. buckets = Bucket.query.filter_by(eid='001').all()
  635. self.buckets = buckets
  636. self.logger.info("bucket updated: %s", self.buckets)
  637. def start(self, HOST='0.0.0.0', PORT=8084, agent=None):
  638. # HOST, PORT = "0.0.0.0", 8084
  639. # 创建一个 TCP 服务器
  640. with self.CustomTCPServer((HOST, PORT), self.ESLRequestHandler, agent, self.app) as server:
  641. print(f"ESL server listening on {HOST}:{PORT}")
  642. server.serve_forever()