Procházet zdrojové kódy

bucket dev, 入口分流逻辑开发

DavidLiu před 4 měsíci
rodič
revize
3c44d0d4f1

+ 1 - 1
src/core/callcenter/acd.py

@@ -29,7 +29,7 @@ class AcdService:
         self.checkIdleScheduler.add_job(self.try_transfer_agent, 'interval', seconds=2, max_instances=1)
         self.checkIdleScheduler.start()
 
-    def transfer_to_agent(self, call_id, device_id, service_id):
+    def transfer_to_agent(self, call_id, device_id, service_id='00000000000000000'):
         call_info = self.cache.get_call_info(call_id)
         self.logger.info('transfer_to_agent, come in, call_id:%s, call_info:%s'%(call_id, call_info))
         if not call_info:

+ 12 - 1
src/core/callcenter/data_handler.py

@@ -13,7 +13,18 @@ class DataHandleServer:
         self.app = app
 
     @with_app_context
-    def create_record(self, call_info):
+    def create_record(self, call_id, caller_number, call_type, service_category=None, destination=None):
+        call_info = {
+            "session_id": call_id,
+            "time_begin": datetime.utcnow(),
+            "category": 0,
+            "phone": caller_number,
+            "type": call_type,
+            "service_category": service_category,
+            "agent_num":destination,
+            "user_id":destination,
+            "user_name": f"机器人{destination}" if destination else None,
+        }
         call_record = CallRecord()
         # 处理多余的参数
         for key, value in call_info.items():

+ 70 - 25
src/core/callcenter/esl/client.py

@@ -9,12 +9,14 @@ import sys
 import ESL
 import time
 import mmh3
+import uuid
 import threading
 import traceback
 import concurrent.futures
 
 from apscheduler.schedulers.background import BackgroundScheduler
 
+from src.core.callcenter import BizException
 from src.core.callcenter.cache import Cache
 from src.core.callcenter.api import MakeCallContext, DelayAction, CallInfo, DeviceInfo, NextCommand
 from src.core.callcenter.constant import SK, EMPTY, CTI_ENGINE_DELAY_ACTION_LOCK, HOLD_MUSIC_PATH, saasId
@@ -27,7 +29,7 @@ import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
 import src.core.callcenter.esl.handler as event_handler
 from src.core.callcenter.esl.constant.sip_header_constant import sipHeaderHoldMusic, profile1, profile2, sipHeaderCallId
 from src.core.callcenter.enumeration import CallCause, DeviceType, DelayActionEnum, HangupDir, CallType, NextType, \
-    Direction, CdrType
+    Direction, CdrType, BizErrorCode
 from src.core.callcenter.esl.handler.default_esl_event_handler import DefaultEslEventHandler
 from src.core.callcenter.snowflake import Snowflake
 from src.core.datasource import SERVE_HOST
@@ -501,7 +503,6 @@ class OutboundClient:
                     self.server.logger.info('Event-Name: %s', event_name)
                     device_id = info.getHeader("unique-id")
                     caller_number = info.getHeader("Caller-Caller-ID-Number")  # 获取来电号码
-                    whitelist = self.server.load_whitelist()
 
                     call_id = 'C' + str(Snowflake().next_id())
                     new_device_id = 'D' + str(Snowflake().next_id())
@@ -509,33 +510,52 @@ class OutboundClient:
                     kwargs = json.loads(info.serialize('json'))
                     kwargs['variable_sip_h_P-LIBRA-CallId'] = call_id
                     kwargs['variable_sip_h_P-LIBRA-DeviceId'] = new_device_id
-                    call_info = {
-                        "session_id": call_id,
-                        "time_begin": datetime.utcnow(),
-                        "category": 0,
-                        "phone": caller_number
-                    }
-                    # 检查白名单
-                    if caller_number in whitelist:
-                        agents = self.server.load_agent_monitor()
-                        destination = random.choice(agents) # 随机取一个坐席号
-                        # 直接转接到人工坐席
-                        self.server.logger.info( "Caller %s is in whitelist, agents: %s, destination: %s"% (caller_number, agents, destination))
-                        call_info['type']= 0
-                        call_info['agent_num'] = destination
-                    else:
+
+                    bucket = self.server.get_bucket(call_id)
+                    whitelist = self.server.get_whitelist()
+                    if bucket and bucket.name == 'AI':
                         #转到ai机器人
                         destination = self.server.agent.register(**kwargs)
-                        self.server.logger.info("device_id=%s, destination=%s, new_device_id=%s"% (device_id, destination, new_device_id))
-                        call_info['type'] = 1
-                        call_info['service_category'] = 1
-                        call_info['user_id'] = destination
-                        call_info['user_name'] = f"机器人{destination}"
+                        self.server.logger.info("device_id=%s, destination=%s, new_device_id=%s" % (device_id, destination, new_device_id))
                         self.build_call_info(call_id, device_id, new_device_id, str(destination), **kwargs)
                         self.server.cache.add_device_user_part(device_id, destination)
-                        con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s"%(call_id, new_device_id, new_device_id, destination), device_id)
+                        con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s" % (call_id, new_device_id, new_device_id, destination), device_id)
+                        self.server.dataHandleServer.create_record(call_id, caller_number, 1, service_category=1, destination=destination)
+                    elif caller_number in whitelist:
+                        # 检查白名单
+                        self.build_call_info(call_id, device_id, new_device_id, destination=None, **kwargs)
+                        # 传统服务
+                        self.server.agent.acd_service.transfer_to_agent(self, call_id, device_id)
+                        self.server.dataHandleServer.create_record(call_id, caller_number, 0, service_category=None, destination=None)
+                    else:
+                        self.build_call_info(call_id, device_id, new_device_id, destination=None, **kwargs)
+                        # 传统服务
+                        self.server.agent.acd_service.transfer_to_agent(self, call_id, device_id)
+                        self.server.dataHandleServer.create_record(call_id, caller_number, 0, service_category=None, destination=None)
+
+                    # # 检查白名单
+                    # if caller_number in whitelist:
+                    #     agents = self.server.load_agent_monitor()
+                    #     destination = random.choice(agents) # 随机取一个坐席号
+                    #     # 直接转接到人工坐席
+                    #     self.server.logger.info( "Caller %s is in whitelist, agents: %s, destination: %s"% (caller_number, agents, destination))
+                    #     call_info['type']= 0
+                    #     call_info['agent_num'] = destination
+                    # else:
+                    #     #转到ai机器人
+                    #     destination = self.server.agent.register(**kwargs)
+                    #     self.server.logger.info("device_id=%s, destination=%s, new_device_id=%s"% (device_id, destination, new_device_id))
+                    #     call_info['type'] = 1
+                    #     call_info['service_category'] = 1
+                    #     call_info['user_id'] = destination
+                    #     call_info['user_name'] = f"机器人{destination}"
+                    #     self.build_call_info(call_id, device_id, new_device_id, str(destination), **kwargs)
+                    #     self.server.cache.add_device_user_part(device_id, destination)
+                    #     con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s"%(call_id, new_device_id, new_device_id, destination), device_id)
+
+                    # self.server.dataHandleServer.create_record(call_info)
+
 
-                    self.server.dataHandleServer.create_record(call_info)
                     # destination = "user/1001"
                     # msg = ESL.ESLevent("sendmsg", uuid)
                     # msg.addHeader("call-command", "execute")
@@ -595,10 +615,13 @@ class OutboundClient:
             self.cache = Cache(app)
             self.logger = app.logger
             self.whitelist = []
+            self.buckets = []
             self.update_whitelist()  # 初始化加载白名单
+            self.update_bucket() #初始化分流
             self.dataHandleServer = DataHandleServer(app)
             # 定时更新白名单
             threading.Thread(target=self.refresh_whitelist, daemon=True).start()
+            threading.Thread(target=self.refresh_bucket, daemon=True).start()
 
         def update_whitelist(self):
             with self.app.app_context():
@@ -613,7 +636,7 @@ class OutboundClient:
                 self.update_whitelist()
 
 
-        def load_whitelist(self):
+        def get_whitelist(self):
             return self.whitelist
 
 
@@ -623,6 +646,28 @@ class OutboundClient:
                 agent_nums = [agent.agent_num for agent in agents]
                 return agent_nums
 
+        def get_bucket(self, custom_uuid=None):
+            custom_uuid == custom_uuid if custom_uuid else str(uuid.uuid4())
+            random_id = abs(mmh3.hash(custom_uuid))
+            if len(self.buckets) <= 0:
+                raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
+            for bucket in self.buckets:
+                num = (random_id % 100 + 100) %100
+                if num >= bucket.lower and num < bucket.upper:
+                    return bucket
+            return self.buckets[0]
+
+        def update_bucket(self):
+            with self.app.app_context():
+                buckets = Bucket.query.filter_by(eid='001').all()
+                self.buckets = buckets
+                self.logger.info("bucket updated: %s", self.whitelist)
+
+        def refresh_bucket(self):
+            while True:
+                time.sleep(60)  # 每 1分钟 更新一次
+                self.update_bucket()
+
     def start(self, HOST='0.0.0.0', PORT=8084, agent=None):
         # HOST, PORT = "0.0.0.0", 8084
         # 创建一个 TCP 服务器