Procházet zdrojové kódy

Merge branch 'dev_20241205' of ssh://gitlab.fuxicarbon.com:1111/client_service/voice-gateway-service into dev_20241205

余尚辉 před 3 měsíci
rodič
revize
dc32d686bd

+ 45 - 9
src/core/callcenter/callback.py

@@ -1,7 +1,11 @@
 #!/usr/bin/env python3
 # encoding:utf-8
+import queue
 import threading
+import concurrent.futures
 
+import mmh3
+import random
 import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
 from src.core.callcenter.agent import AgentEventService
 from src.core.callcenter.cache import Cache
@@ -11,11 +15,37 @@ from src.core.callcenter.esl.constant.event_names import CUSTOM, DETECTED_TONE
 
 class Callback(object):
 
-    def __init__(self, app):
+    def __init__(self, app, thread_num):
         self.app = app
+        self.is_stopping = False
         self.logger = app.logger
         self.cache = Cache(app)
+        self.event_queue = queue.Queue()
+        self.executors = {x: concurrent.futures.ThreadPoolExecutor(max_workers=1) for x in range(thread_num)}
         self.agent_event_service = AgentEventService(app)
+        threading.Thread(target=self.start).start()
+
+    def start(self):
+        while not self.is_stopping:
+            try:
+                event, call_info, device_info = self.event_queue.get(timeout=1)
+                call_type = CallType.get_by_code(call_info.call_type) if call_info else None
+                # self.logger.info("callback::call_type=%s, call_info=%s", call_type, call_info)
+                if call_type is None:
+                    continue
+                if CallType.BOT_CALL == call_type:
+                    self.choose_thread_pool_executor(event).submit(self.agent_event_service.bot_event_channel, event, call_info, device_info)
+                else:
+                    self.choose_thread_pool_executor(event).submit(self.agent_event_service.agent_event_channel, event, call_info, device_info)
+            except:
+                pass
+
+
+    def stop(self):
+        self.is_stopping = True
+        for k, v in self.executors.items():
+            v.shutdown()
+
 
     def callback_event(self, event):
         event_name = EslEventUtil.getEventName(event)
@@ -30,13 +60,19 @@ class Callback(object):
         if not call_info:
             # self.logger.info("callback:return::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
             return
-
-        call_type = CallType.get_by_code(call_info.call_type) if call_info else None
         device_info = call_info.device_info_map.get(device_id) if call_info and call_info.device_info_map else None
-        # self.logger.info("callback::event_name=%s, call_type=%s, call_id=%s, device_id=%s, call_info=%s", event_name, call_type, call_id, device_id, call_info)
-        if CallType.BOT_CALL == call_type:
-            threading.Thread(target=self.agent_event_service.bot_event_channel, args=(event, call_info, device_info)).start()
-            # self.agent_event_service.bot_event_channel(event, call_info, device_info)
+        self.logger.info("callback::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
+        self.event_queue.put_nowait((event, call_info, device_info))
+
+
+    def choose_thread_pool_executor(self, e):
+        call_id = EslEventUtil.getCallId(e)
+        device_id = EslEventUtil.getUniqueId(e)
+        wdh_device_id = EslEventUtil.getDeviceId(e)
+        random_id = call_id if call_id else device_id
+        if random_id:
+            random_index = abs(mmh3.hash(random_id)) % len(self.executors)
         else:
-            threading.Thread(target=self.agent_event_service.agent_event_channel, args=(event, call_info, device_info)).start()
-            # self.agent_event_service.agent_event_channel(event, call_info, device_info)
+            random_index = random.randint(0, len(self.executors) - 1) if self.executors else 0
+        # self.logger.info('choose_thread_pool_executor.index %s %s %s %s', random_index, call_id, device_id, wdh_device_id)
+        return self.executors.get(random_index)

+ 5 - 25
src/core/callcenter/esl/client.py

@@ -45,7 +45,7 @@ class InboundClient:
         self.logger = app.logger
         self.bot_agent = agent
         self.cache = Cache(app)
-        self.callback = Callback(app)
+        self.callback = Callback(app, self.thread_num)
         self.dataHandleServer = DataHandleServer(app)
         self.handler_table = self.scan_esl_event_handlers()
         self.default_event_handler = DefaultEslEventHandler(self, self.bot_agent)
@@ -100,7 +100,7 @@ class InboundClient:
                     time.sleep(3)
                     self.start()
                 else:
-                    # threading.Thread(target=self.process_esl_event, args=(e,)).start()
+                    registry.FLASK_ACTIVE_THREADS.set(threading.active_count())
                     self.choose_thread_pool_executor(e).submit(self.process_esl_event, e)
 
     def choose_thread_pool_executor(self, e):
@@ -183,6 +183,7 @@ class InboundClient:
             self.cache.add_call_info(call_info)
             self.hangup_call(call_id, device_id, CallCause.CALL_TIMEOUT)
             self.dataHandleServer.update_record(call_id, status= 0)
+
     def exec_when_play_timeout(self, call_id):
         call_info = self.cache.get_call_info(call_id)
         if not call_info or not call_info.next_commands:
@@ -210,6 +211,7 @@ class InboundClient:
             next_command = NextCommand(device_id = device_id, next_type=NextType.NEXT_HANGUP.code)
             call_info.next_commands = [next_command]
             self.cache.add_call_info(call_info)
+            self.dataHandleServer.update_record(call_id, status= 0)
             self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id,
                              device_id, WaitingHangupMusicPath)
 
@@ -491,6 +493,7 @@ class InboundClient:
             v.shutdown()
         self.con.disconnect()
         self.is_stopping = True
+        self.callback.stop()
 
 
 class OutboundClient:
@@ -555,29 +558,6 @@ class OutboundClient:
                     registry.CALL_INCOMING_REQUESTS.labels(f"{_bucket_call_type}").inc()
                     self.server.dataHandleServer.create_record(call_id, caller_number, _bucket_call_type, service_category=service_category, destination=destination)
 
-                    # # 检查白名单
-                    # if caller_number in whitelist:
-                    #     agents = self.server.load_agent_monitor()
-                    #     destination = random.choice(agents) # 随机取一个坐席号
-                    #     # 直接转接到人工坐席
-                    #     self.server.logger.info( "Caller %s is in whitelist, agents: %s, destination: %s"% (caller_number, agents, destination))
-                    #     call_info['type']= 0
-                    #     call_info['agent_num'] = destination
-                    # else:
-                    #     #转到ai机器人
-                    #     destination = self.server.agent.register(**kwargs)
-                    #     self.server.logger.info("device_id=%s, destination=%s, new_device_id=%s"% (device_id, destination, new_device_id))
-                    #     call_info['type'] = 1
-                    #     call_info['service_category'] = 1
-                    #     call_info['user_id'] = destination
-                    #     call_info['user_name'] = f"机器人{destination}"
-                    #     self.build_call_info(call_id, device_id, new_device_id, str(destination), **kwargs)
-                    #     self.server.cache.add_device_user_part(device_id, destination)
-                    #     con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s"%(call_id, new_device_id, new_device_id, destination), device_id)
-
-                    # self.server.dataHandleServer.create_record(call_info)
-
-
                     # destination = "user/1001"
                     # msg = ESL.ESLevent("sendmsg", uuid)
                     # msg.addHeader("call-command", "execute")

+ 5 - 0
src/core/callcenter/esl/handler/channel_hangup_handler.py

@@ -15,6 +15,7 @@ from src.core.callcenter.esl.constant.event_names import CHANNEL_HANGUP
 from src.core.callcenter.esl.handler.esl_event_handler import EslEventHandler
 from src.core.callcenter.api import CallInfo, DeviceInfo, NextCommand
 from src.core.callcenter.push import PushHandler
+from src.core.callcenter import registry
 from src.core.callcenter.data_handler import *
 
 @EslEventName(CHANNEL_HANGUP)
@@ -99,6 +100,7 @@ class ChannelHangupHandler(EslEventHandler):
             # if next_command:
             #     self.next_cmd(call_info, device_info, next_command, cause)
             #     return
+
             # 一般情况下,挂断其他所有设备
             if device_info.cdr_type <= 4 and not call_info.end_time and not skip_hangup_all:
                 call_info.end_time = device_info.end_time
@@ -106,6 +108,9 @@ class ChannelHangupHandler(EslEventHandler):
                 self.call_service.hangup_all(call_info, CallCause.HANGUP_EVENT)
                 # self.inbound_client.hangup_call(call_id, device_id, CallCause.HANGUP_EVENT)
 
+            if CallType.BOT_CALL.code == call_info.call_type and device_info.device_type == DeviceType.ROBOT.code:
+                registry.CALL_BOT_HANGUP_REQUESTS.labels(f"{call_info.bucket_type}").inc()
+
             # 全部挂机以后推送挂机状态
             # self.logger.info('yushanghui::call_info.device_list %s', call_info.device_list)
             # if len(call_info.device_list) == 0:

+ 4 - 1
src/core/callcenter/registry.py

@@ -14,7 +14,10 @@ metrics = PrometheusMetrics(app)
 CALL_INCOMING_REQUESTS = Counter('call_incoming_requests', '呼入总流量', ['bucket'])
 CALL_BOT_ANSWER_REQUESTS = Counter('call_bot_answer_requests', '机器人接听量', ['bucket'])
 CALL_BOT_TRANSFER_REQUESTS = Counter('call_bot_transfer_requests', '机器转人量', ['bucket'])
+CALL_BOT_HANGUP_REQUESTS = Counter('call_bot_hangup_requests', '机器挂机量', ['bucket'])
 
 # esl时间耗时
 ESL_EVENT_LATENCY = Histogram('esl_event_latency', 'Esl Event latency in seconds', ['eventName'])
-ESL_EVENT_CALLBACK_LATENCY = Histogram('esl_event_callback_latency', 'Esl Event callback latency in seconds', ['eventName','callType'])
+ESL_EVENT_CALLBACK_LATENCY = Histogram('esl_event_callback_latency', 'Esl Event callback latency in seconds', ['eventName','callType'])
+
+FLASK_ACTIVE_THREADS = Gauge('flask_active_threads', 'Number of active threads')

+ 2 - 0
src/core/callcenter/views.py

@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 # encoding:utf-8
+
 from flask import request, render_template_string
 
 from src.core.callcenter.agent import AgentService, AgentOperService
@@ -12,6 +13,7 @@ from src.core.voip.bot import BotAgent
 from . import app
 from .acd import AcdService
 
+
 agent = BotAgent(app)
 inbound_client = InboundClient(agent,app)
 outbound_client = OutboundClient(agent,app)