DavidLiu 5 сар өмнө
parent
commit
a800e0894e

+ 46 - 9
src/core/callcenter/callback.py

@@ -1,7 +1,11 @@
 #!/usr/bin/env python3
 # encoding:utf-8
+import queue
 import threading
+import concurrent.futures
 
+import mmh3
+import random
 import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
 from src.core.callcenter.agent import AgentEventService
 from src.core.callcenter.cache import Cache
@@ -11,11 +15,31 @@ from src.core.callcenter.esl.constant.event_names import CUSTOM, DETECTED_TONE
 
 class Callback(object):
 
-    def __init__(self, app):
+    def __init__(self, app, thread_num):
         self.app = app
+        self.is_stopping = False
         self.logger = app.logger
         self.cache = Cache(app)
+        self.event_queue = queue.Queue()
+        self.executors = {x: concurrent.futures.ThreadPoolExecutor(max_workers=1) for x in range(thread_num)}
         self.agent_event_service = AgentEventService(app)
+        threading.Thread(target=self.start).start()
+
+    def start(self):
+        while not self.is_stopping:
+            event, call_info, device_info = self.event_queue.get(timeout=1)
+            call_type = CallType.get_by_code(call_info.call_type) if call_info else None
+            if call_type is None:
+                continue
+            if CallType.BOT_CALL == call_type:
+                self.choose_thread_pool_executor(event).submit(self.agent_event_service.bot_event_channel, event, call_info, device_info)
+            else:
+                self.choose_thread_pool_executor(event).submit(self.agent_event_service.agent_event_channel, event, call_info, device_info)
+
+    def stop(self):
+        self.is_stopping = True
+        for k, v in self.executors.items():
+            v.shutdown()
 
     def callback_event(self, event):
         event_name = EslEventUtil.getEventName(event)
@@ -30,13 +54,26 @@ class Callback(object):
         if not call_info:
             # self.logger.info("callback:return::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
             return
-
-        call_type = CallType.get_by_code(call_info.call_type) if call_info else None
         device_info = call_info.device_info_map.get(device_id) if call_info and call_info.device_info_map else None
-        # self.logger.info("callback::event_name=%s, call_type=%s, call_id=%s, device_id=%s, call_info=%s", event_name, call_type, call_id, device_id, call_info)
-        if CallType.BOT_CALL == call_type:
-            threading.Thread(target=self.agent_event_service.bot_event_channel, args=(event, call_info, device_info)).start()
-            # self.agent_event_service.bot_event_channel(event, call_info, device_info)
+        self.event_queue.put_nowait((event, call_info, device_info))
+
+        # call_type = CallType.get_by_code(call_info.call_type) if call_info else None
+        # # self.logger.info("callback::event_name=%s, call_type=%s, call_id=%s, device_id=%s, call_info=%s", event_name, call_type, call_id, device_id, call_info)
+        # if CallType.BOT_CALL == call_type:
+        #     threading.Thread(target=self.agent_event_service.bot_event_channel, args=(event, call_info, device_info)).start()
+        #     # self.agent_event_service.bot_event_channel(event, call_info, device_info)
+        # else:
+        #     threading.Thread(target=self.agent_event_service.agent_event_channel, args=(event, call_info, device_info)).start()
+        #     # self.agent_event_service.agent_event_channel(event, call_info, device_info)
+
+    def choose_thread_pool_executor(self, e):
+        call_id = EslEventUtil.getCallId(e)
+        device_id = EslEventUtil.getUniqueId(e)
+        wdh_device_id = EslEventUtil.getDeviceId(e)
+        random_id = call_id if call_id else device_id
+        if random_id:
+            random_index = abs(mmh3.hash(random_id)) % len(self.executors)
         else:
-            threading.Thread(target=self.agent_event_service.agent_event_channel, args=(event, call_info, device_info)).start()
-            # self.agent_event_service.agent_event_channel(event, call_info, device_info)
+            random_index = random.randint(0, len(self.executors) - 1) if self.executors else 0
+        # self.logger.info('choose_thread_pool_executor.index=', random_index, call_id, device_id, wdh_device_id)
+        return self.executors.get(random_index)

+ 2 - 1
src/core/callcenter/esl/client.py

@@ -45,7 +45,7 @@ class InboundClient:
         self.logger = app.logger
         self.bot_agent = agent
         self.cache = Cache(app)
-        self.callback = Callback(app)
+        self.callback = Callback(app, self.thread_num)
         self.dataHandleServer = DataHandleServer(app)
         self.handler_table = self.scan_esl_event_handlers()
         self.default_event_handler = DefaultEslEventHandler(self, self.bot_agent)
@@ -491,6 +491,7 @@ class InboundClient:
             v.shutdown()
         self.con.disconnect()
         self.is_stopping = True
+        self.callback.stop()
 
 
 class OutboundClient: