# callback.py (3.6 KB)
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import json
  4. import queue
  5. import threading
  6. import concurrent.futures
  7. import traceback
  8. import mmh3
  9. import random
  10. import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
  11. from src.core.callcenter.agent import AgentEventService
  12. from src.core.callcenter.cache import Cache
  13. from src.core.callcenter.enumeration import CallType
  14. from src.core.callcenter.esl.constant.event_names import CUSTOM, DETECTED_TONE
  15. class Callback(object):
  16. def __init__(self, app, thread_num):
  17. self.app = app
  18. self.is_stopping = False
  19. self.logger = app.logger
  20. self.cache = Cache(app)
  21. self.event_queue = queue.Queue()
  22. self.executors = {x: concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="callback-event-pool") for x in range(thread_num)}
  23. self.agent_event_service = AgentEventService(app)
  24. threading.Thread(target=self.start).start()
  25. def start(self):
  26. while not self.is_stopping:
  27. try:
  28. event, call_info, device_info = self.event_queue.get(timeout=1)
  29. call_type = CallType.get_by_code(call_info.call_type) if call_info else None
  30. if call_type is None:
  31. event_name = EslEventUtil.getEventName(event)
  32. self.logger.info("callback:call_type_none:return::event_name=%s, call_info=%s, device_info=%s", event_name, call_info, device_info)
  33. continue
  34. if CallType.BOT_CALL == call_type or CallType.INCOMING_BOT_CALL == call_type:
  35. self.choose_thread_pool_executor(event).submit(self.agent_event_service.bot_event_channel, event, call_info, device_info)
  36. else:
  37. self.choose_thread_pool_executor(event).submit(self.agent_event_service.agent_event_channel, event, call_info, device_info)
  38. except:
  39. traceback.print_exc()
  40. def stop(self):
  41. self.is_stopping = True
  42. for k, v in self.executors.items():
  43. v.shutdown()
  44. def callback_event(self, event):
  45. event_name = EslEventUtil.getEventName(event)
  46. # CUSTOM == event_name or
  47. if not (event_name.startswith('CHANNEL_') or event_name.startswith('PLAYBACK_') or event_name == DETECTED_TONE):
  48. return
  49. call_id = EslEventUtil.getCallId(event)
  50. device_id = EslEventUtil.getDeviceId(event)
  51. if not call_id and device_id:
  52. call_id = self.cache.get_call_id_by_device_id(device_id)
  53. call_info = self.cache.get_call_info(call_id)
  54. if not call_info:
  55. self.logger.info("callback:return::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
  56. return
  57. device_info = call_info.device_info_map.get(device_id) if call_info and call_info.device_info_map else None
  58. self.logger.info("callback::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
  59. self.event_queue.put_nowait((event, call_info, device_info))
  60. def choose_thread_pool_executor(self, e):
  61. call_id = EslEventUtil.getCallId(e)
  62. device_id = EslEventUtil.getUniqueId(e)
  63. wdh_device_id = EslEventUtil.getDeviceId(e)
  64. random_id = call_id if call_id else device_id
  65. if random_id:
  66. random_index = abs(mmh3.hash(random_id)) % len(self.executors)
  67. else:
  68. random_index = random.randint(0, len(self.executors) - 1) if self.executors else 0
  69. # self.logger.info('choose_thread_pool_executor.index %s %s %s %s', random_index, call_id, device_id, wdh_device_id)
  70. return self.executors.get(random_index)