client.py 23 KB


  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import json
  4. import random
  5. import socketserver
  6. import sys
  7. import ESL
  8. import time
  9. import mmh3
  10. import threading
  11. import traceback
  12. import concurrent.futures
  13. from apscheduler.schedulers.background import BackgroundScheduler
  14. import src.core.callcenter.cache as Cache
  15. from src.core.callcenter.api import MakeCallContext, DelayAction
  16. from src.core.callcenter.constant import SK, EMPTY, CTI_ENGINE_DELAY_ACTION_LOCK, HOLD_MUSIC_PATH
  17. from src.core.callcenter.esl.constant.esl_constant import BRIDGE_VARIABLES, BRIDGE, HANGUP, NORMAL_CLEARING, SIP_HEADER, SPACE, SPLIT, SOFIA, \
  18. ORIGINATE, PARK, SET, EAVESDROP, SMF_ALEG, EXECUTE, PLAYBACK, PAUSE, TRANSFER, UUID_TRANSFER, UUID_BROADCAST, UUID_BREAK, UUID_HOLD, \
  19. UUID_RECORD, UUID_SETVAR, UUID_GETVAR
  20. import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
  21. import src.core.callcenter.esl.handler as event_handler
  22. from src.core.callcenter.esl.constant.sip_header_constant import sipHeaderHoldMusic, profile1, profile2
  23. from src.core.callcenter.enumeration import CallCause, DeviceType, DelayActionEnum, HangupDir, CallType, NextType
  24. from src.core.callcenter.esl.handler.default_esl_event_handler import DefaultEslEventHandler
  25. from src.core.datasource import SERVE_HOST
  26. from src.core.voip.constant import *
  27. from src.core.callcenter.dao import *
  28. class InboundClient:
  29. def __init__(self, agent, logger):
  30. self.con = None
  31. self.thread_num = 12
  32. self.is_stopping = False
  33. self.logger = logger
  34. self.bot_agent = agent
  35. self.handler_table = self.scan_esl_event_handlers()
  36. self.default_event_handler = DefaultEslEventHandler(self, self.bot_agent, self.logger)
  37. self.host, self.port, self.password = SERVE_HOST, '8021', '4918257983818884358'
  38. self.executors = {x: concurrent.futures.ThreadPoolExecutor(max_workers=1) for x in range(self.thread_num)}
  39. self.delay_action_executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
  40. self.delay_action_scheduler = BackgroundScheduler()
  41. self.delay_action_scheduler.add_job(self.submit_delay_action, 'interval', seconds=1, max_instances=1)
  42. self.delay_action_scheduler.start()
  43. threading.Thread(target=self.start, args=()).start()
  44. def submit_delay_action(self):
  45. for name, member in DelayActionEnum.__members__.items():
  46. action_messages = Cache.get_delay_message(name)
  47. for action_message in action_messages:
  48. self.delay_action_executor.submit(self.do_delay_action, name, action_message)
  49. def scan_esl_event_handlers(self):
  50. import inspect
  51. import importlib
  52. import pkgutil
  53. classes = []
  54. # 遍历包中的模块
  55. for module_info in pkgutil.iter_modules(event_handler.__path__):
  56. module = importlib.import_module(f"{event_handler.__name__}.{module_info.name}")
  57. for _name, _cls in inspect.getmembers(module, inspect.isclass):
  58. if hasattr(_cls, '_esl_event_name'):
  59. classes.append(_cls)
  60. handlers = {}
  61. for _cls in classes:
  62. items = handlers.get(_cls._esl_event_name, [])
  63. items.append(_cls(self, self.bot_agent, self.logger))
  64. handlers[_cls._esl_event_name] = items
  65. return handlers
  66. def start(self):
  67. self.logger.info('inbound.start')
  68. self.con = ESL.ESLconnection(self.host, self.port, self.password)
  69. if self.con.connected():
  70. self.logger.info('inbound esl connected ... ')
  71. self.con.events('plain', 'all') #CHANNEL_ORIGINATE,CHANNEL_PROGRESS,CHANNEL_PROGRESS_MEDIA,CHANNEL_ANSWER,CHANNEL_HANGUP,CUSTOM,PLAYBACK_START,PLAYBACK_STOP,DETECTED_TONE
  72. while not self.is_stopping:
  73. e = self.con.recvEvent()
  74. # if e:
  75. # self.logger.info(json.loads(e.serialize('json')))
  76. event_name = e.getHeader("Event-Name")
  77. if event_name == "SERVER_DISCONNECTED":
  78. self.logger.info('come in SERVER_DISCONNECTED case')
  79. self.con.disconnect()
  80. time.sleep(3)
  81. self.start()
  82. else:
  83. # threading.Thread(target=self.process_esl_event, args=(e,)).start()
  84. self.choose_thread_pool_executor(e).submit(self.process_esl_event, e)
  85. def choose_thread_pool_executor(self, e):
  86. call_id = EslEventUtil.getCallId(e)
  87. device_id = EslEventUtil.getUniqueId(e)
  88. wdh_device_id = EslEventUtil.getDeviceId(e)
  89. random_id = call_id if call_id else device_id
  90. if random_id:
  91. random_index = abs(mmh3.hash(random_id)) % len(self.executors)
  92. else:
  93. random_index = random.randint(0, len(self.executors) - 1) if self.executors else 0
  94. # print('choose_thread_pool_executor.index=', random_index, call_id, device_id, wdh_device_id)
  95. return self.executors.get(random_index)
  96. def process_esl_event(self, e):
  97. # print(json.loads(e.serialize('json')))
  98. event_name = EslEventUtil.getEventName(e)
  99. coreUUID = EslEventUtil.getCoreUuid(e)
  100. address = self.host + ':' + self.port
  101. # self.logger.info("process_esl_event.event_name=%s,coreUUID=%s", event_name, coreUUID)
  102. try:
  103. if event_name in self.handler_table:
  104. items = self.handler_table.get(event_name)
  105. for x in items:
  106. try:
  107. x.handle(address, e, coreUUID)
  108. except:
  109. traceback.print_exc()
  110. else:
  111. self.default_event_handler.handle(address, e, coreUUID)
  112. except:
  113. traceback.print_exc()
  114. def do_delay_action(self, action, message):
  115. delay_action = DelayAction.from_json(message)
  116. flag = Cache.lock_delay_action(delay_action.uuid)
  117. if not flag:
  118. self.logger.info("异步延迟执行操作重复 action:%s msg:%s", action, message)
  119. return
  120. delay_action_enum = DelayActionEnum.get_by_code(action)
  121. if not delay_action_enum:
  122. self.logger.info("异步延迟执行 delayActionEnum为空 action:%s msg:%s", action, message)
  123. return
  124. if DelayActionEnum.CALL_TIMEOUT_HANGUP == delay_action_enum:
  125. self.exec_when_call_timeout(delay_action.call_id, delay_action.device_id)
  126. elif DelayActionEnum.PLAY_TIMEOUT_HANGUP == delay_action_enum:
  127. self.exec_when_play_timeout(delay_action.call_id)
  128. elif DelayActionEnum.ACD_TIMEOUT_PLAY == delay_action_enum:
  129. self.exec_when_acd_timeout(delay_action.call_id)
  130. def exec_when_call_timeout(self, call_id, device_id):
  131. call_info = Cache.get_call_info(call_id)
  132. if not call_info or not (device_id in call_info.device_list):
  133. return
  134. device_info = call_info.device_info_map.get(device_id)
  135. if device_info and device_info.answer_time is None:
  136. self.logger.info("call:%s deviceId:%s execWhenCallTimeOut", call_id, device_id)
  137. device_info.hangup_cause = CallCause.CALL_TIMEOUT.name
  138. call_info.next_commands = []
  139. if device_info.device_type <= DeviceType.ROBOT.code:
  140. call_info.hangup_dir = HangupDir.PLATFORM_HANGUP.code
  141. call_info.hangup_code = CallCause.CALL_TIMEOUT.code
  142. # if device_info.device_type.code == DeviceType.CUSTOMER.code:
  143. # call_info.user_no_answer_end_call = True
  144. if not device_info.end_time and device_info.device_type.code == DeviceType.CUSTOMER.code:
  145. channel = self.show_channel(device_id)
  146. if channel:
  147. delay_action = DelayAction(call_id=call_id, device_id=device_id)
  148. Cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_DECR, delay_action, timeouts=20)
  149. Cache.add_call_info(call_info)
  150. self.hangup_call(call_id, device_id, CallCause.CALL_TIMEOUT)
  151. def exec_when_play_timeout(self, call_id):
  152. call_info = Cache.get_call_info(call_id)
  153. if not call_info or not call_info.next_commands:
  154. return
  155. self.logger.debug("播放结束音乐失败,进行挂机 callId:%s", call_id)
  156. next_types = [x.next_type for x in call_info.next_commands]
  157. if NextType.NEXT_HANGUP.code in next_types:
  158. for device_id in call_info.device_list:
  159. self.hangup_call(call_id, device_id, CallCause.PLAY_TIMEOUT)
  160. def exec_when_acd_timeout(self, call_id):
  161. call_info = Cache.get_call_info(call_id)
  162. if not call_info:
  163. self.logger.info("exec_when_acd_timeout callInfo为空 callId: {}", call_id)
  164. return
  165. device_list = [v for k,v in call_info.device_info_map.items() if v.device_type == DeviceType.CUSTOMER]
  166. if device_list and len(device_list) == 1:
  167. device_id = device_list[0].device_id
  168. self.bridge_break(device_id)
  169. self.hold_play(device_id, HOLD_MUSIC_PATH)
  170. self.play_timeout(call_id, timeout=30)
  171. Cache.add_call_info(call_info)
  172. self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id,
  173. device_id, HOLD_MUSIC_PATH)
  174. def make_call_new(self, context: MakeCallContext):
  175. called = context.get_called()
  176. params = {'gateway': context.route_gateway_name, 'called': called, 'realm': context.get_realm()}
  177. builder = [
  178. '{', context.get_sip_header(), '}'
  179. ]
  180. if context.device_type == DeviceType.CUSTOMER.code:
  181. profile = self.expression(profile1, params)
  182. builder.append(f"{SOFIA}{SK}{profile}{SK}{called}{PARK}")
  183. else:
  184. profile = self.expression(profile2, params)
  185. builder.append(f"{profile}{PARK}")
  186. cmd = "".join(builder)
  187. print(cmd)
  188. # self.logger.info(cmd)
  189. sys.stdout.flush() # 强制刷新输出缓冲区
  190. self.con.bgapi(ORIGINATE, cmd)
  191. def make_call(self, route_gateway, display, called, call_id, device_id, timeout=30, originate_timeout=30, *sip_headers):
  192. # called = f"{called}{AT}{route_gateway.media_host}{CO}{route_gateway.media_port}"
  193. if route_gateway.caller_prefix:
  194. display = f"{route_gateway.caller_prefix}{display}"
  195. if route_gateway.called_prefix:
  196. called = f"{route_gateway.called_prefix}{called}"
  197. sip_buffer = []
  198. if sip_headers:
  199. sip_buffer = [f"{SIP_HEADER}{header}" for header in sip_headers]
  200. params = {
  201. "callId": call_id,
  202. "deviceId": device_id,
  203. "caller": display,
  204. "called": called,
  205. }
  206. if route_gateway.sip_header1:
  207. sip_header1 = self.expression(route_gateway.sip_header1, params)
  208. sip_buffer.append(f"{SIP_HEADER}{sip_header1}")
  209. if route_gateway.sip_header2:
  210. sip_header2 = self.expression(route_gateway.sip_header2, params)
  211. sip_buffer.append(f"{SIP_HEADER}{sip_header2}")
  212. if route_gateway.sip_header3:
  213. sip_buffer.append(f"{SIP_HEADER}{route_gateway.sip_header3}")
  214. builder = [
  215. "{return_ring_ready=true,",
  216. f"sip_contact_user={display},",
  217. "ring_asr=true,",
  218. "absolute_codec_string=^^:PCMU:PCMA,", # Assuming codecs is defined somewhere
  219. f"origination_caller_id_number={display},",
  220. f"origination_caller_id_name={display},",
  221. f"origination_uuid={device_id},",
  222. ]
  223. if originate_timeout:
  224. builder.append(f"originate_timeout={originate_timeout},")
  225. if sip_buffer:
  226. builder.append(f"{SPLIT}".join(sip_buffer))
  227. builder.append("}")
  228. builder.append(f"{SOFIA}{SK}{route_gateway.profile}{SK}{called}{PARK}")
  229. cmd = "".join(builder)
  230. print(cmd)
  231. self.con.bgapi(ORIGINATE, cmd)
  232. def call_timeout(self, call_id, device_id, timeout):
  233. """呼叫超时主动挂机"""
  234. delay_action = DelayAction(call_id=call_id, device_id=device_id)
  235. Cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout)
  236. def send_args(self, device_id, name, arg):
  237. msg = ESL.ESLevent("sendmsg", device_id)
  238. msg.addHeader("call-command", EXECUTE)
  239. msg.addHeader("execute-app-name", name)
  240. msg.addHeader("execute-app-arg", arg)
  241. self.con.sendEvent(msg)
  242. def bridge_call(self, call_id, device_id1, device_id2):
  243. """桥接电话"""
  244. self.multi_set_var(device_id1, BRIDGE_VARIABLES)
  245. self.multi_set_var(device_id2, BRIDGE_VARIABLES)
  246. self.con.bgapi(BRIDGE, device_id1 + SPACE + device_id2)
  247. def transfer_call(self, _from, _to):
  248. """转接"""
  249. builder = [
  250. _from,
  251. " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline "
  252. ]
  253. arg = ''.join(builder)
  254. self.con.bgapi(TRANSFER, arg)
  255. def answer(self, device_id):
  256. """应答"""
  257. self.con.bgapi('uuid_phone_event', device_id + ' talk')
  258. def hangup_call(self, call_id, device_id, case_enum=CallCause.DEFAULT):
  259. """挂机"""
  260. msg = ESL.ESLevent("sendmsg", device_id)
  261. msg.addHeader("call-command", EXECUTE)
  262. msg.addHeader("execute-app-name", HANGUP)
  263. msg.addHeader("execute-app-arg", NORMAL_CLEARING)
  264. self.logger.info("hangup_call挂机 hangup call: %s, device: %s, ctiCauseEnum: %s", call_id, device_id, case_enum)
  265. self.send_args(device_id, SET, EslEventUtil.SIP_H_P_LIBRA_HANGUP_CAUSE + "=" + case_enum.description)
  266. self.con.sendEvent(msg)
  267. def broadcast(self, uuid, path, smf):
  268. builder = [
  269. UUID_BROADCAST,
  270. uuid,
  271. path,
  272. smf
  273. ]
  274. command = ' '.join(builder)
  275. self.con.bgapi(command, EMPTY)
  276. def break0(self, uuid, all=False, sync=True):
  277. builder = [
  278. UUID_BREAK,
  279. uuid
  280. ]
  281. if all:
  282. builder.append("all")
  283. command = ' '.join(builder)
  284. if sync:
  285. self.con.api(command, EMPTY)
  286. else:
  287. self.con.bgapi(command, EMPTY)
  288. def hold(self, smf, uuid, display):
  289. builder = [
  290. UUID_HOLD,
  291. smf,
  292. uuid,
  293. display
  294. ]
  295. if display:
  296. builder.append("all")
  297. else:
  298. builder.append(EMPTY)
  299. command = ' '.join(builder)
  300. self.con.bgapi(command, EMPTY)
  301. def get_var(self, uuid, var):
  302. builder = [
  303. UUID_GETVAR,
  304. uuid,
  305. var
  306. ]
  307. command = ' '.join(builder)
  308. self.con.bgapi(command, EMPTY)
  309. def set_var(self, uuid, var, val):
  310. builder = [
  311. UUID_SETVAR,
  312. uuid,
  313. var,
  314. val
  315. ]
  316. command = ' '.join(builder)
  317. self.con.bgapi(command, EMPTY)
  318. def multi_set_var(self, uuid, params):
  319. builder = [
  320. "uuid_setvar_multi " + uuid + " "
  321. ]
  322. builder1 = []
  323. for k, v in params.items():
  324. builder1.append(k + "="+v)
  325. builder.append(';'.join(builder1))
  326. command = ''.join(builder)
  327. self.con.bgapi(command, EMPTY)
  328. def record(self, uuid, action, path, limit):
  329. builder = [
  330. UUID_RECORD,
  331. uuid,
  332. action,
  333. path,
  334. str(limit)
  335. ]
  336. command = ' '.join(builder)
  337. self.con.bgapi(command, EMPTY)
  338. def transfer(self, uuid, smf, dest, dialplan, context):
  339. builder = [
  340. UUID_TRANSFER,
  341. uuid,
  342. smf,
  343. dest,
  344. dialplan,
  345. context
  346. ]
  347. command = ' '.join(builder)
  348. self.con.bgapi(command, EMPTY)
  349. def insert(self, device_id):
  350. """强插"""
  351. builder = [
  352. device_id,
  353. " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,park:' inline "
  354. ]
  355. arg = ''.join(builder)
  356. self.con.api(TRANSFER, arg)
  357. def bridge_break(self, device_id):
  358. """拆线"""
  359. builder = [
  360. device_id,
  361. " -both 'set:hangup_after_bridge=false,set:park_after_bridge=true,set:" + SIP_HEADER + sipHeaderHoldMusic + "=true,park:' inline "
  362. ]
  363. arg = ''.join(builder)
  364. self.con.api(TRANSFER, arg)
  365. def play_file(self, call_id, device_id, file, sync):
  366. """放音"""
  367. if sync:
  368. return self.hold_play(device_id, file)
  369. else:
  370. msg = ESL.ESLevent("sendmsg", device_id)
  371. msg.addHeader("call-command", EXECUTE)
  372. msg.addHeader("execute-app-name", PLAYBACK)
  373. msg.addHeader("execute-app-arg", file)
  374. msg.addHeader("async", "true")
  375. self.con.sendEvent(msg)
  376. def stop_play(self, device_id):
  377. """关闭播放音乐"""
  378. builder = [
  379. device_id,
  380. " on"
  381. ]
  382. arg = "".join(builder)
  383. return self.con.api(PAUSE, arg)
  384. def hold_play(self, device_id, play):
  385. """向a-leg插播tts音乐(无限播放)"""
  386. builder = [
  387. device_id,
  388. " playback::",
  389. play,
  390. " ",
  391. SMF_ALEG
  392. ]
  393. arg = "".join(builder)
  394. return self.con.api(UUID_BROADCAST, arg)
  395. def play_timeout(self, call_id, timeout):
  396. """播放超时主动挂机"""
  397. delay_action = DelayAction(call_id=call_id)
  398. Cache.add_delay_message(DelayActionEnum.PLAY_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout)
  399. def listen(self, device_id1, device_id2, aleg=True, bleg=True):
  400. """监听"""
  401. if aleg:
  402. self.send_args(device_id1, SET, "eavesdrop_bridge_aleg=true")
  403. if bleg:
  404. self.send_args(device_id1, SET, "eavesdrop_bridge_bleg=true")
  405. self.send_args(device_id1, EAVESDROP, device_id2)
  406. def show_channel(self, device_id):
  407. msg = self.con.api("show", " channels like " + device_id + " as json")
  408. print('show_channel::', msg)
  409. return msg
  410. def expression(self, template, params):
  411. for key, value in params.items():
  412. template = template.replace("#{["+key+"]}", str(value))
  413. return template
  414. def stop(self):
  415. for k, v in self.executors.items():
  416. v.shutdown()
  417. self.con.disconnect()
  418. self.is_stopping = True
  419. class OutboundClient:
  420. def __init__(self, agent, logger,app):
  421. self.logger = logger
  422. self.app = app
  423. self.whitelist = []
  424. self.update_whitelist() # 初始化加载白名单
  425. # 定时更新白名单
  426. threading.Thread(target=self.refresh_whitelist, daemon=True).start()
  427. #threading.Thread(target=self.start, args=('0.0.0.0', 8084, agent, logger)).start()
  428. server_thread = threading.Thread(target=self.start, args=('0.0.0.0', 8084, agent, logger))
  429. server_thread.daemon = True # 设置守护线程
  430. server_thread.start()
  431. def update_whitelist(self):
  432. with self.app.app_context():
  433. phones = Whitelist.query.filter_by(del_flag=0).all()
  434. self.whitelist = [phone.phone for phone in phones]
  435. self.logger.info("Whitelist updated: %s", self.whitelist)
  436. def refresh_whitelist(self):
  437. while True:
  438. time.sleep(3600) # 每 1小时 更新一次
  439. self.update_whitelist()
  440. def load_whitelist(self):
  441. return self.whitelist
  442. def load_agent_monitor(self):
  443. with self.app.app_context():
  444. agents = AgentMonitor.query.filter_by(check_state=0,service_state=2).all()
  445. agent_nums = [agent.agent_num for agent in agents]
  446. return agent_nums
  447. class ESLRequestHandler(socketserver.BaseRequestHandler):
  448. def setup(self):
  449. try:
  450. self.server.logger.info('%s connected!', self.client_address)
  451. fd = self.request.fileno()
  452. con = ESL.ESLconnection(fd)
  453. self.server.logger.info('Connected: %s', con.connected())
  454. if con.connected():
  455. info = con.getInfo()
  456. # print(json.loads(info.serialize('json')))
  457. event_name = info.getHeader("Event-Name")
  458. device_id = info.getHeader("unique-id")
  459. caller_number = info.getHeader("Caller-Caller-ID-Number") # 获取来电号码
  460. whitelist = self.server.load_whitelist()
  461. agents = self.server.load_agent_monitor()
  462. # 直接转接到人工坐席
  463. # 检查白名单
  464. if caller_number in whitelist:
  465. # 随机取一个坐席号
  466. destination = random.choice(agents)
  467. # 直接转接到人工坐席
  468. self.server.logger.info("Caller %s is in whitelist, directly transferring call, agents: %s, destination: %s", caller_number, agents,destination)
  469. return
  470. kwargs = json.loads(info.serialize('json'))
  471. destination = self.server.agent.register(**kwargs)
  472. self.server.logger.info("device_id=%s, destination=%s", device_id, destination)
  473. Cache.add_device_user_part(device_id, destination)
  474. con.execute("bridge", f"user/{destination}", device_id)
  475. # destination = "user/1001"
  476. # msg = ESL.ESLevent("sendmsg", uuid)
  477. # msg.addHeader("call-command", "execute")
  478. # msg.addHeader("execute-app-name", "bridge")
  479. # msg.addHeader("execute-app-arg", destination)
  480. # # 发送消息以执行 bridge 操作
  481. # con.sendEvent(msg)
  482. # print(f"Call {uuid} is bridged to {destination}")
  483. # con.execute("answer", "", uuid)
  484. # con.execute("transfer", "1001 XML default", uuid)
  485. # try:
  486. # con.disconnect()
  487. # except:
  488. # print('come in ')
  489. # traceback.print_exc()
  490. else:
  491. self.server.logger.info("Failed to connect to FreeSWITCH")
  492. except:
  493. traceback.print_exc()
  494. class CustomTCPServer(socketserver.TCPServer):
  495. def __init__(self, server_address, RequestHandlerClass, agent, logger,whitelist_loader,load_agent_monitor):
  496. self.agent = agent
  497. self.logger = logger
  498. self.load_whitelist = whitelist_loader
  499. self.load_agent_monitor = load_agent_monitor
  500. super().__init__(server_address, RequestHandlerClass)
  501. def start(self, HOST='0.0.0.0', PORT=8084, agent=None, logger=None):
  502. # HOST, PORT = "0.0.0.0", 8084
  503. # 创建一个 TCP 服务器
  504. with self.CustomTCPServer((HOST, PORT), self.ESLRequestHandler, agent, logger, self.load_whitelist, self.load_agent_monitor) as server:
  505. self.logger.info(f"ESL server listening on {HOST}:{PORT}")
  506. server.serve_forever()