Browse Source

Merge branch 'dev_bucket_20241202' into develop

DavidLiu 3 months ago
parent
commit
ca531c76c1

+ 13 - 5
src/core/callcenter/acd.py

@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 #!/usr/bin/env python3
 # encoding:utf-8
 # encoding:utf-8
+import json
 import time
 import time
 from datetime import datetime
 from datetime import datetime
 from queue import Queue
 from queue import Queue
@@ -7,12 +8,12 @@ from typing import Dict, Any, Optional
 from src.core.callcenter.cache import Cache
 from src.core.callcenter.cache import Cache
 from src.core.callcenter.agent import AgentOperService
 from src.core.callcenter.agent import AgentOperService
 from src.core.callcenter.call import CallService
 from src.core.callcenter.call import CallService
-from src.core.callcenter.api import CallInfo, AgentActionRequest
+from src.core.callcenter.api import CallInfo, AgentActionRequest, DelayAction
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.schedulers.background import BackgroundScheduler
 from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
 from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
 
 
 from src.core.callcenter.constant import saasId
 from src.core.callcenter.constant import saasId
-from src.core.callcenter.enumeration import AnswerFlag
+from src.core.callcenter.enumeration import AnswerFlag, DelayActionEnum
 
 
 
 
 class AcdService:
 class AcdService:
@@ -29,7 +30,7 @@ class AcdService:
         self.checkIdleScheduler.add_job(self.try_transfer_agent, 'interval', seconds=2, max_instances=1)
         self.checkIdleScheduler.add_job(self.try_transfer_agent, 'interval', seconds=2, max_instances=1)
         self.checkIdleScheduler.start()
         self.checkIdleScheduler.start()
 
 
-    def transfer_to_agent(self, call_id, device_id, service_id):
+    def transfer_to_agent(self, call_id, device_id, service_id='00000000000000000'):
         call_info = self.cache.get_call_info(call_id)
         call_info = self.cache.get_call_info(call_id)
         self.logger.info('transfer_to_agent, come in, call_id:%s, call_info:%s'%(call_id, call_info))
         self.logger.info('transfer_to_agent, come in, call_id:%s, call_info:%s'%(call_id, call_info))
         if not call_info:
         if not call_info:
@@ -38,7 +39,9 @@ class AcdService:
         self.cache.add_call_info(call_info)
         self.cache.add_call_info(call_info)
         # 1. hold住并且播放等待音
         # 1. hold住并且播放等待音
         self.call_service.hold(call_info, device_id)
         self.call_service.hold(call_info, device_id)
+        self.wait_timeout(call_id)
         self.logger.info('transfer_to_agent, 1111111 ')
         self.logger.info('transfer_to_agent, 1111111 ')
+
         # 获得空闲坐席
         # 获得空闲坐席
         agent_number = self.agent_service.assign(AgentActionRequest(saas_id=saasId, service_id=service_id))
         agent_number = self.agent_service.assign(AgentActionRequest(saas_id=saasId, service_id=service_id))
         self.logger.info('transfer_to_agent, 222222 %s'%agent_number)
         self.logger.info('transfer_to_agent, 222222 %s'%agent_number)
@@ -57,7 +60,7 @@ class AcdService:
         # self.logger.info("AcdService tryTransferAgent start")
         # self.logger.info("AcdService tryTransferAgent start")
         all_task = []
         all_task = []
         for k, v in self.holdsQueue.items():
         for k, v in self.holdsQueue.items():
-            self.logger.info("AcdService tryTransferAgent start, queue.k:%s, q.v %s", k, v.qsize())
+            self.logger.info("AcdService tryTransferAgent start, queue.k:%s, queue.v:%s", k, v.qsize())
             all_task.append(self.pool.submit(self.holds_one_queue, k, v))
             all_task.append(self.pool.submit(self.holds_one_queue, k, v))
         wait(all_task, timeout=6, return_when=ALL_COMPLETED)
         wait(all_task, timeout=6, return_when=ALL_COMPLETED)
 
 
@@ -74,10 +77,11 @@ class AcdService:
             call_id = call_info_queue.get_nowait()
             call_id = call_info_queue.get_nowait()
             call_info = self.cache.get_call_info(call_id)
             call_info = self.cache.get_call_info(call_id)
             if not call_info or not call_info.device_list:
             if not call_info or not call_info.device_list:
+                self.logger.info("AcdService tryTransferAgent callInfoCache is null %s", call_id)
                 continue
                 continue
             agent_number = self.agent_service.assign(AgentActionRequest(saas_id=saasId, service_id=task_service_id))
             agent_number = self.agent_service.assign(AgentActionRequest(saas_id=saasId, service_id=task_service_id))
             if not agent_number:
             if not agent_number:
-                self.logger.info("AcdService tryTransferAgent agentNumber is Empty %s"% call_id)
+                self.logger.info("AcdService tryTransferAgent agentNumber is Empty %s %s"% (call_id, json.dumps(call_info.device_list)))
                 tmp_arr.append(call_id)
                 tmp_arr.append(call_id)
                 continue
                 continue
             self.logger.info(
             self.logger.info(
@@ -87,3 +91,7 @@ class AcdService:
 
 
         for call_id in tmp_arr:
         for call_id in tmp_arr:
             call_info_queue.put_nowait(call_id)
             call_info_queue.put_nowait(call_id)
+
+    def wait_timeout(self, call_id, timeouts=30):
+        delay_action = DelayAction(call_id=call_id)
+        self.cache.add_delay_message(DelayActionEnum.ACD_TIMEOUT_PLAY.name, delay_action, timeouts)

+ 4 - 1
src/core/callcenter/api.py

@@ -455,7 +455,7 @@ class CallInfo(BaseApi):
                  call_time=None, call_type=None, direction=None, answer_flag=None, wait_time=None, answer_count=0,
                  call_time=None, call_type=None, direction=None, answer_flag=None, wait_time=None, answer_count=0,
                  hangup_dir=None, sdk_hangup=0, hangup_code=None, answer_time=None, end_time=None, talk_time=None,
                  hangup_dir=None, sdk_hangup=0, hangup_code=None, answer_time=None, end_time=None, talk_time=None,
                  first_queue_time=None, queue_start_time=None, queue_end_time=None, overflow_count=0,
                  first_queue_time=None, queue_start_time=None, queue_end_time=None, overflow_count=0,
-                 uuid1=None, uuid2=None, cdr_notify_url=None, queue_level=None, transfer_agent=None, device_list=[],
+                 uuid1=None, uuid2=None, cdr_notify_url=None, queue_level=None, transfer_agent=None, user_no_answer_end_call=False, device_list=[],
                  device_info_map: Dict[str, Any] = {}, follow_data: Dict[str, Any] = {},
                  device_info_map: Dict[str, Any] = {}, follow_data: Dict[str, Any] = {},
                  process_data: Dict[str, Any] = {}, next_commands=[], call_details=[]):
                  process_data: Dict[str, Any] = {}, next_commands=[], call_details=[]):
         self.core_uuid = core_uuid  # 通话唯一标识
         self.core_uuid = core_uuid  # 通话唯一标识
@@ -502,6 +502,7 @@ class CallInfo(BaseApi):
         self.cdr_notify_url = cdr_notify_url  # 话单通知地址
         self.cdr_notify_url = cdr_notify_url  # 话单通知地址
         self.queue_level = queue_level  # 排队等级,默认是进队列时间
         self.queue_level = queue_level  # 排队等级,默认是进队列时间
         self.transfer_agent = transfer_agent #是否转人工
         self.transfer_agent = transfer_agent #是否转人工
+        self.user_no_answer_end_call = user_no_answer_end_call #用户未接听挂机
         self.device_list = device_list  # 当前通话的设备
         self.device_list = device_list  # 当前通话的设备
         self.device_info_map = device_info_map
         self.device_info_map = device_info_map
         self.follow_data = follow_data  # 呼叫随路数据(作为落单数据)
         self.follow_data = follow_data  # 呼叫随路数据(作为落单数据)
@@ -535,6 +536,7 @@ class CallInfo(BaseApi):
                    queue_end_time=data.get('queue_end_time'), overflow_count=data.get('overflow_count'),
                    queue_end_time=data.get('queue_end_time'), overflow_count=data.get('overflow_count'),
                    uuid1=data.get('uuid1'), uuid2=data.get('uuid2'), cdr_notify_url=data.get('cdr_notify_url'),
                    uuid1=data.get('uuid1'), uuid2=data.get('uuid2'), cdr_notify_url=data.get('cdr_notify_url'),
                    queue_level=data.get('queue_level'), transfer_agent=data.get('transfer_agent'),
                    queue_level=data.get('queue_level'), transfer_agent=data.get('transfer_agent'),
+                   user_no_answer_end_call=data.get('user_no_answer_end_call'),
                    device_list=data.get('device_list', []),device_info_map=device_info_map,
                    device_list=data.get('device_list', []),device_info_map=device_info_map,
                    follow_data=follow_data, process_data=process_data,
                    follow_data=follow_data, process_data=process_data,
                    next_commands=next_commands, call_details=call_details)
                    next_commands=next_commands, call_details=call_details)
@@ -588,6 +590,7 @@ class CallInfo(BaseApi):
             "cdr_notify_url": self.cdr_notify_url,
             "cdr_notify_url": self.cdr_notify_url,
             "queue_level": self.queue_level,
             "queue_level": self.queue_level,
             "transfer_agent": self.transfer_agent,
             "transfer_agent": self.transfer_agent,
+            "user_no_answer_end_call": self.user_no_answer_end_call,
             "device_list": [x for x in self.device_list],
             "device_list": [x for x in self.device_list],
             "device_info_map": {key: vars(value) for key, value in self.device_info_map.items()},
             "device_info_map": {key: vars(value) for key, value in self.device_info_map.items()},
             "follow_data": {key: vars(value) for key, value in self.follow_data.items()},
             "follow_data": {key: vars(value) for key, value in self.follow_data.items()},

+ 6 - 4
src/core/callcenter/cache.py

@@ -120,23 +120,25 @@ class Cache:
                             status=0)
                             status=0)
 
 
     def add_delay_message(self, action, delay_action, timeouts):
     def add_delay_message(self, action, delay_action, timeouts):
-        delay_action.uuid = uuid.uuid4()
+        delay_action.uuid = str(uuid.uuid4())
         key = CTI_ENGINE_DELAY_ACTION % action
         key = CTI_ENGINE_DELAY_ACTION % action
         msg = delay_action.to_json_string()
         msg = delay_action.to_json_string()
-        action_time = datetime.utcnow().timestamp() + timeouts * 1000
+        action_time = datetime.utcnow().timestamp()*1000 + timeouts * 1000
         self.redis_handler.redis.zadd(key, {msg : action_time})
         self.redis_handler.redis.zadd(key, {msg : action_time})
 
 
     def get_delay_message(self, action):
     def get_delay_message(self, action):
         key = CTI_ENGINE_DELAY_ACTION % action
         key = CTI_ENGINE_DELAY_ACTION % action
-        current_time = int(time.time() * 1000)  # 毫秒级时间戳
+        current_time = int(datetime.utcnow().timestamp() * 1000)  # 毫秒级时间戳
         members = self.redis_handler.redis.zrangebyscore(key, 0, current_time, start=0, num=DELAY_ACTION_BATCH_SIZE, withscores=True)
         members = self.redis_handler.redis.zrangebyscore(key, 0, current_time, start=0, num=DELAY_ACTION_BATCH_SIZE, withscores=True)
         if not members:
         if not members:
             return []
             return []
         self.logger.info('get_delay_message %s %s %s'%(key, action, members))
         self.logger.info('get_delay_message %s %s %s'%(key, action, members))
         # scored_entries = [{"member": entry[0].decode('utf-8'), "score": entry[1]} for entry in members]
         # scored_entries = [{"member": entry[0].decode('utf-8'), "score": entry[1]} for entry in members]
         action_list = [entry[0].decode('utf-8') for entry in members]
         action_list = [entry[0].decode('utf-8') for entry in members]
+        self.logger.info('get_delay_message %s %s %s'%(key, action, json.dumps(action_list)))
         if action_list:
         if action_list:
-            self.redis_handler.redis.zrem(key, *action_list)
+            a = self.redis_handler.redis.zrem(key, *action_list)
+            self.logger.info('get_delay_message %s %s %s %s'%(key, action, json.dumps(action_list), a))
         return action_list
         return action_list
 
 
     def lock_delay_action(self, val):
     def lock_delay_action(self, val):

+ 0 - 2
src/core/callcenter/call.py

@@ -89,8 +89,6 @@ class CallService:
             pass
             pass
         custom_device_id = devices[0]
         custom_device_id = devices[0]
         self.logger.info('hold, custom_device_id=%s'%custom_device_id)
         self.logger.info('hold, custom_device_id=%s'%custom_device_id)
-        # self.client.sync_invoke_method("bridge_break", method_args=(custom_device_id,))
-        # self.client.sync_invoke_method("hold_play", method_args=(custom_device_id,HOLD_MUSIC_PATH))
         self.client.bridge_break(call_info.call_id, custom_device_id)
         self.client.bridge_break(call_info.call_id, custom_device_id)
         self.cache.set_need_play_hold_music(call_info.call_id)
         self.cache.set_need_play_hold_music(call_info.call_id)
         self.logger.info('hold success custom_device_id=%s'%custom_device_id)
         self.logger.info('hold success custom_device_id=%s'%custom_device_id)

+ 2 - 0
src/core/callcenter/constant.py

@@ -43,6 +43,8 @@ READY_TIMES = "readyTimes"
 SEREVICE_TIMES = "serviceTimes"
 SEREVICE_TIMES = "serviceTimes"
 
 
 HOLD_MUSIC_PATH = '/freeswitch/music/hold.wav'
 HOLD_MUSIC_PATH = '/freeswitch/music/hold.wav'
+WaitingHangupMusicPath = '/freeswitch/music/sorry.wav'
+
 BASE_RECORD_PATH = '/freeswitch/record/'
 BASE_RECORD_PATH = '/freeswitch/record/'
 
 
 EMPTY = ""
 EMPTY = ""

+ 35 - 0
src/core/callcenter/dao.py

@@ -5,6 +5,41 @@ import json
 from . import db
 from . import db
 from datetime import datetime
 from datetime import datetime
 
 
+class Bucket(db.Model):
+    __tablename__ = 'c_bucket'
+    __table_args__ = {
+        'comment': '流量分桶表',
+        'mysql_engine': 'InnoDB',
+        'mysql_charset': 'utf8mb4'
+    }
+
+    id = db.Column(db.BigInteger, primary_key=True, autoincrement=True, comment='主键')
+    name = db.Column(db.String(32), nullable=False, default='', comment='名称')
+    eid = db.Column(db.String(16), nullable=False, default='', comment='实验id')
+    lower = db.Column(db.String(32), nullable=False, default='', comment='下限')
+    upper = db.Column(db.String(64), nullable=False, default='', comment='上限')
+    config = db.Column(db.String(100), nullable=False, default='', comment='拓展配置')
+    update_time = db.Column(db.TIMESTAMP, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, comment='更新时间')
+    create_time = db.Column(db.TIMESTAMP, nullable=False, default=datetime.utcnow, comment='创建时间' )
+
+    __table_args__ = (
+        db.Index('idx_eid', 'eid', mysql_using='BTREE'),
+    )
+
+    def __repr__(self):
+        return json.dumps(self.to_dict())
+
+    def to_dict(self):
+        return {
+            'id': self.id,
+            'name': self.name,
+            'eid': self.eid,
+            'lower': self.lower,
+            'upper': self.upper,
+            'config': self.config,
+            'update_time': self.update_time.isoformat() if self.update_time else None,
+            'create_time': self.create_time.isoformat() if self.create_time else None,
+        }
 
 
 class Agent(db.Model):
 class Agent(db.Model):
     __tablename__ = 'c_agent'
     __tablename__ = 'c_agent'

+ 12 - 1
src/core/callcenter/data_handler.py

@@ -9,7 +9,18 @@ class DataHandleServer:
         self.app = app
         self.app = app
 
 
     @with_app_context
     @with_app_context
-    def create_record(self, call_info):
+    def create_record(self, call_id, caller_number, call_type, service_category=None, destination=None):
+        call_info = {
+            "session_id": call_id,
+            "time_begin": datetime.utcnow(),
+            "category": 0,
+            "phone": caller_number,
+            "type": call_type,
+            "service_category": service_category,
+            "agent_num":destination,
+            "user_id":destination,
+            "user_name": f"机器人{destination}" if destination else None,
+        }
         call_record = CallRecord()
         call_record = CallRecord()
         # 处理多余的参数
         # 处理多余的参数
         for key, value in call_info.items():
         for key, value in call_info.items():

+ 2 - 2
src/core/callcenter/enumeration.py

@@ -8,8 +8,8 @@ class DelayActionEnum(Enum):
     CALL_TIMEOUT_HANGUP = ('CALL_TIMEOUT_HANGUP', "超时挂机")
     CALL_TIMEOUT_HANGUP = ('CALL_TIMEOUT_HANGUP', "超时挂机")
     PLAY_TIMEOUT_HANGUP = ('PLAY_TIMEOUT_HANGUP','播放超时挂机')
     PLAY_TIMEOUT_HANGUP = ('PLAY_TIMEOUT_HANGUP','播放超时挂机')
     NOTICE_TIMEOUT_HANGUP = ('NOTICE_TIMEOUT_HANGUP', '')
     NOTICE_TIMEOUT_HANGUP = ('NOTICE_TIMEOUT_HANGUP', '')
-    CALL_TIMEOUT_DECR = ('CALL_TIMEOUT_DECR', ''),
-    ACD_TIMEOUT_PLAY = ('ACD_TIMEOUT_PLAY','')
+    CALL_TIMEOUT_DECR = ('CALL_TIMEOUT_DECR', '呼叫超时重试'),
+    ACD_TIMEOUT_PLAY = ('ACD_TIMEOUT_PLAY','排队等待音超时')
 
 
     def __init__(self, code=None, description=None):
     def __init__(self, code=None, description=None):
         self.code = code
         self.code = code

+ 125 - 79
src/core/callcenter/esl/client.py

@@ -9,16 +9,18 @@ import sys
 import ESL
 import ESL
 import time
 import time
 import mmh3
 import mmh3
+import uuid
 import threading
 import threading
 import traceback
 import traceback
 import concurrent.futures
 import concurrent.futures
 
 
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.schedulers.background import BackgroundScheduler
 
 
+from src.core.callcenter import BizException
 from src.core.callcenter.cache import Cache
 from src.core.callcenter.cache import Cache
 from src.core.callcenter.api import MakeCallContext, DelayAction, CallInfo, DeviceInfo, NextCommand
 from src.core.callcenter.api import MakeCallContext, DelayAction, CallInfo, DeviceInfo, NextCommand
 from src.core.callcenter.callback import Callback
 from src.core.callcenter.callback import Callback
-from src.core.callcenter.constant import SK, EMPTY, CTI_ENGINE_DELAY_ACTION_LOCK, HOLD_MUSIC_PATH, saasId
+from src.core.callcenter.constant import SK, EMPTY, CTI_ENGINE_DELAY_ACTION_LOCK, HOLD_MUSIC_PATH, WaitingHangupMusicPath, saasId
 from src.core.callcenter.esl.constant.esl_constant import BRIDGE_VARIABLES, BRIDGE, HANGUP, NORMAL_CLEARING, SIP_HEADER, \
 from src.core.callcenter.esl.constant.esl_constant import BRIDGE_VARIABLES, BRIDGE, HANGUP, NORMAL_CLEARING, SIP_HEADER, \
     SPACE, SPLIT, SOFIA, \
     SPACE, SPLIT, SOFIA, \
     ORIGINATE, PARK, SET, EAVESDROP, SMF_ALEG, EXECUTE, PLAYBACK, PAUSE, TRANSFER, UUID_TRANSFER, UUID_BROADCAST, \
     ORIGINATE, PARK, SET, EAVESDROP, SMF_ALEG, EXECUTE, PLAYBACK, PAUSE, TRANSFER, UUID_TRANSFER, UUID_BROADCAST, \
@@ -28,7 +30,7 @@ import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
 import src.core.callcenter.esl.handler as event_handler
 import src.core.callcenter.esl.handler as event_handler
 from src.core.callcenter.esl.constant.sip_header_constant import sipHeaderHoldMusic, profile1, profile2, sipHeaderCallId
 from src.core.callcenter.esl.constant.sip_header_constant import sipHeaderHoldMusic, profile1, profile2, sipHeaderCallId
 from src.core.callcenter.enumeration import CallCause, DeviceType, DelayActionEnum, HangupDir, CallType, NextType, \
 from src.core.callcenter.enumeration import CallCause, DeviceType, DelayActionEnum, HangupDir, CallType, NextType, \
-    Direction, CdrType
+    Direction, CdrType, BizErrorCode
 from src.core.callcenter.esl.handler.default_esl_event_handler import DefaultEslEventHandler
 from src.core.callcenter.esl.handler.default_esl_event_handler import DefaultEslEventHandler
 from src.core.callcenter.snowflake import Snowflake
 from src.core.callcenter.snowflake import Snowflake
 from src.core.datasource import SERVE_HOST
 from src.core.datasource import SERVE_HOST
@@ -166,14 +168,15 @@ class InboundClient:
             if device_info.device_type <= DeviceType.ROBOT.code:
             if device_info.device_type <= DeviceType.ROBOT.code:
                 call_info.hangup_dir = HangupDir.PLATFORM_HANGUP.code
                 call_info.hangup_dir = HangupDir.PLATFORM_HANGUP.code
                 call_info.hangup_code = CallCause.CALL_TIMEOUT.code
                 call_info.hangup_code = CallCause.CALL_TIMEOUT.code
-            # if device_info.device_type.code == DeviceType.CUSTOMER.code:
-                # call_info.user_no_answer_end_call = True
-
-            if not device_info.end_time and device_info.device_type.code == DeviceType.CUSTOMER.code:
-                channel = self.show_channel(device_id)
-                if channel:
-                    delay_action = DelayAction(call_id=call_id, device_id=device_id)
-                    self.cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_DECR, delay_action, timeouts=20)
+
+            if device_info.device_type.code == DeviceType.CUSTOMER.code:
+                call_info.user_no_answer_end_call = True
+
+            # if not device_info.end_time and device_info.device_type.code == DeviceType.CUSTOMER.code:
+            #     channel = self.show_channel(device_id)
+            #     if channel:
+            #         delay_action = DelayAction(call_id=call_id, device_id=device_id)
+            #         self.cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_DECR, delay_action, timeouts=20)
             self.cache.add_call_info(call_info)
             self.cache.add_call_info(call_info)
             self.hangup_call(call_id, device_id, CallCause.CALL_TIMEOUT)
             self.hangup_call(call_id, device_id, CallCause.CALL_TIMEOUT)
             self.dataHandleServer.update_record(call_id, {"status": 0})
             self.dataHandleServer.update_record(call_id, {"status": 0})
@@ -181,7 +184,7 @@ class InboundClient:
         call_info = self.cache.get_call_info(call_id)
         call_info = self.cache.get_call_info(call_id)
         if not call_info or not call_info.next_commands:
         if not call_info or not call_info.next_commands:
             return
             return
-        self.logger.debug("播放结束音乐失败,进行挂机 callId:%s", call_id)
+        self.logger.info("播放结束音乐失败,进行挂机 callId:%s", call_id)
         next_types = [x.next_type for x in call_info.next_commands]
         next_types = [x.next_type for x in call_info.next_commands]
         if NextType.NEXT_HANGUP.code in next_types:
         if NextType.NEXT_HANGUP.code in next_types:
             for device_id in call_info.device_list:
             for device_id in call_info.device_list:
@@ -195,12 +198,17 @@ class InboundClient:
         device_list = [v for k,v in call_info.device_info_map.items() if v.device_type == DeviceType.CUSTOMER]
         device_list = [v for k,v in call_info.device_info_map.items() if v.device_type == DeviceType.CUSTOMER]
         if device_list and len(device_list) == 1:
         if device_list and len(device_list) == 1:
             device_id = device_list[0].device_id
             device_id = device_list[0].device_id
-            self.bridge_break(call_id, device_id)
-            self.hold_play(device_id, HOLD_MUSIC_PATH)
+            self.break0(device_id)
+            if not WaitingHangupMusicPath:
+                self.hangup_call(call_id, device_id, CallCause.WAITING_TIMEOUT)
+                return
+            self.hold_play(device_id, WaitingHangupMusicPath)
             self.play_timeout(call_id, timeout=30)
             self.play_timeout(call_id, timeout=30)
+            next_command = NextCommand(device_id = device_id, next_type=NextType.NEXT_HANGUP.code)
+            call_info.next_commands = [next_command]
             self.cache.add_call_info(call_info)
             self.cache.add_call_info(call_info)
             self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id,
             self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id,
-                             device_id, HOLD_MUSIC_PATH)
+                             device_id, WaitingHangupMusicPath)
 
 
     def make_call_new(self, context: MakeCallContext):
     def make_call_new(self, context: MakeCallContext):
         # self.logger.info("拨打测试context:%s", context.__dict__)
         # self.logger.info("拨打测试context:%s", context.__dict__)
@@ -210,7 +218,7 @@ class InboundClient:
         builder = [
         builder = [
             '{', context.get_sip_header(), '}'
             '{', context.get_sip_header(), '}'
         ]
         ]
-
+        self.call_timeout(context.call_id, context.device_id, context.timeout)
         if context.device_type == DeviceType.CUSTOMER.code:
         if context.device_type == DeviceType.CUSTOMER.code:
             profile = self.expression(profile1, params)
             profile = self.expression(profile1, params)
             builder.append(f"{SOFIA}{SK}{profile}{SK}{called}{PARK}")
             builder.append(f"{SOFIA}{SK}{profile}{SK}{called}{PARK}")
@@ -304,9 +312,9 @@ class InboundClient:
             builder.append("all")
             builder.append("all")
         command = ' '.join(builder)
         command = ' '.join(builder)
         if sync:
         if sync:
-            self.con.api(command, EMPTY)
+            self.api(command, desc="break0")
         else:
         else:
-            self.con.bgapi(command, EMPTY)
+            self.bgapi(command, desc="break0")
 
 
     def hold(self, smf, uuid, display):
     def hold(self, smf, uuid, display):
         builder = [
         builder = [
@@ -487,40 +495,10 @@ class OutboundClient:
     def __init__(self, agent, app):
     def __init__(self, agent, app):
         self.app = app
         self.app = app
         self.logger = app.logger
         self.logger = app.logger
-        self.whitelist = []
-        self.update_whitelist()  # 初始化加载白名单
-
-        # 定时更新白名单
-        threading.Thread(target=self.refresh_whitelist, daemon=True).start()
-
-
-        self.dataHandleServer = DataHandleServer(app)
-        #threading.Thread(target=self.start, args=('0.0.0.0', 8084, agent, logger)).start()
         server_thread = threading.Thread(target=self.start, args=('0.0.0.0', 8084, agent))
         server_thread = threading.Thread(target=self.start, args=('0.0.0.0', 8084, agent))
         server_thread.daemon = True  # 设置守护线程
         server_thread.daemon = True  # 设置守护线程
         server_thread.start()
         server_thread.start()
 
 
-    def update_whitelist(self):
-        with self.app.app_context():
-            phones = Whitelist.query.filter_by(del_flag=0).all()
-            self.whitelist = [phone.phone for phone in phones]
-            self.logger.info("Whitelist updated: %s", self.whitelist)
-
-    def refresh_whitelist(self):
-        while True:
-            time.sleep(3600) # 每 1小时 更新一次
-            self.update_whitelist()
-
-    def load_whitelist(self):
-        return self.whitelist
-
-    def load_agent_monitor(self):
-        with self.app.app_context():
-            agents = AgentMonitor.query.filter_by(check_state=0,service_state=2).all()
-            agent_nums = [agent.agent_num for agent in agents]
-            return agent_nums
-
-
     class ESLRequestHandler(socketserver.BaseRequestHandler):
     class ESLRequestHandler(socketserver.BaseRequestHandler):
         def setup(self):
         def setup(self):
             try:
             try:
@@ -536,7 +514,6 @@ class OutboundClient:
                     self.server.logger.info('Event-Name: %s', event_name)
                     self.server.logger.info('Event-Name: %s', event_name)
                     device_id = info.getHeader("unique-id")
                     device_id = info.getHeader("unique-id")
                     caller_number = info.getHeader("Caller-Caller-ID-Number")  # 获取来电号码
                     caller_number = info.getHeader("Caller-Caller-ID-Number")  # 获取来电号码
-                    whitelist = self.server.load_whitelist()
 
 
                     call_id = 'C' + str(Snowflake().next_id())
                     call_id = 'C' + str(Snowflake().next_id())
                     new_device_id = 'D' + str(Snowflake().next_id())
                     new_device_id = 'D' + str(Snowflake().next_id())
@@ -544,33 +521,51 @@ class OutboundClient:
                     kwargs = json.loads(info.serialize('json'))
                     kwargs = json.loads(info.serialize('json'))
                     kwargs['variable_sip_h_P-LIBRA-CallId'] = call_id
                     kwargs['variable_sip_h_P-LIBRA-CallId'] = call_id
                     kwargs['variable_sip_h_P-LIBRA-DeviceId'] = new_device_id
                     kwargs['variable_sip_h_P-LIBRA-DeviceId'] = new_device_id
-                    call_info = {
-                        "session_id": call_id,
-                        "time_begin": datetime.utcnow(),
-                        "category": 0,
-                        "phone": caller_number
-                    }
-                    # 检查白名单
-                    if caller_number in whitelist:
-                        agents = self.server.load_agent_monitor()
-                        destination = random.choice(agents) # 随机取一个坐席号
-                        # 直接转接到人工坐席
-                        self.server.logger.info( "Caller %s is in whitelist, agents: %s, destination: %s"% (caller_number, agents, destination))
-                        call_info['type']= 0
-                        call_info['agent_num'] = destination
-                    else:
+
+                    bucket = self.server.get_bucket(call_id)
+                    whitelist = self.server.get_whitelist()
+                    if bucket and bucket.name == 'AI':
                         #转到ai机器人
                         #转到ai机器人
                         destination = self.server.agent.register(**kwargs)
                         destination = self.server.agent.register(**kwargs)
-                        self.server.logger.info("device_id=%s, destination=%s, new_device_id=%s"% (device_id, destination, new_device_id))
-                        call_info['type'] = 1
-                        call_info['service_category'] = 1
-                        call_info['user_id'] = destination
-                        call_info['user_name'] = f"机器人{destination}"
+                        self.server.logger.info("device_id=%s, destination=%s, new_device_id=%s" % (device_id, destination, new_device_id))
                         self.build_call_info(call_id, device_id, new_device_id, str(destination), **kwargs)
                         self.build_call_info(call_id, device_id, new_device_id, str(destination), **kwargs)
                         self.server.cache.add_device_user_part(device_id, destination)
                         self.server.cache.add_device_user_part(device_id, destination)
-                        con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s"%(call_id, new_device_id, new_device_id, destination), device_id)
+                        con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s" % (call_id, new_device_id, new_device_id, destination), device_id)
+                        self.server.dataHandleServer.create_record(call_id, caller_number, 1, service_category=1, destination=destination)
+                    elif caller_number in whitelist:
+                        # 检查白名单
+                        self.build_call_info(call_id, device_id, new_device_id, destination=None, **kwargs)
+                        self.server.agent.acd_service.transfer_to_agent(call_id, new_device_id)
+                        self.server.dataHandleServer.create_record(call_id, caller_number, 0, service_category=None, destination=None)
+                    else:
+                        # 传统服务
+                        self.build_call_info(call_id, device_id, new_device_id, destination=None, **kwargs)
+                        self.server.agent.acd_service.transfer_to_agent(call_id, new_device_id)
+                        self.server.dataHandleServer.create_record(call_id, caller_number, 2, service_category=None, destination=None)
+
+                    # # 检查白名单
+                    # if caller_number in whitelist:
+                    #     agents = self.server.load_agent_monitor()
+                    #     destination = random.choice(agents) # 随机取一个坐席号
+                    #     # 直接转接到人工坐席
+                    #     self.server.logger.info( "Caller %s is in whitelist, agents: %s, destination: %s"% (caller_number, agents, destination))
+                    #     call_info['type']= 0
+                    #     call_info['agent_num'] = destination
+                    # else:
+                    #     #转到ai机器人
+                    #     destination = self.server.agent.register(**kwargs)
+                    #     self.server.logger.info("device_id=%s, destination=%s, new_device_id=%s"% (device_id, destination, new_device_id))
+                    #     call_info['type'] = 1
+                    #     call_info['service_category'] = 1
+                    #     call_info['user_id'] = destination
+                    #     call_info['user_name'] = f"机器人{destination}"
+                    #     self.build_call_info(call_id, device_id, new_device_id, str(destination), **kwargs)
+                    #     self.server.cache.add_device_user_part(device_id, destination)
+                    #     con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s"%(call_id, new_device_id, new_device_id, destination), device_id)
+
+                    # self.server.dataHandleServer.create_record(call_info)
+
 
 
-                    self.server.dataHandleServer.create_record(call_info)
                     # destination = "user/1001"
                     # destination = "user/1001"
                     # msg = ESL.ESLevent("sendmsg", uuid)
                     # msg = ESL.ESLevent("sendmsg", uuid)
                     # msg.addHeader("call-command", "execute")
                     # msg.addHeader("call-command", "execute")
@@ -613,28 +608,79 @@ class OutboundClient:
                                      cti_flow_id=None, conference=None, agent_name=None, from_agent=None, caller=None, called=None, display=None, called_location=None, caller_location=None, ring_start_time=None, ring_end_time=None, answer_time=None, bridge_time=None, end_time=None, talk_time=None, sip_protocol=None, channel_name=None, hangup_cause=None, ring_cause=None, sip_status=None, record=None, record_time=None, record_start_time=None, state=None, apparent_number=None, caller_display=None)
                                      cti_flow_id=None, conference=None, agent_name=None, from_agent=None, caller=None, called=None, display=None, called_location=None, caller_location=None, ring_start_time=None, ring_end_time=None, answer_time=None, bridge_time=None, end_time=None, talk_time=None, sip_protocol=None, channel_name=None, hangup_cause=None, ring_cause=None, sip_status=None, record=None, record_time=None, record_start_time=None, state=None, apparent_number=None, caller_display=None)
 
 
             call_info.device_list.append(device_id)
             call_info.device_list.append(device_id)
-            self.server.logger.info("liuwei::debugger:3333::call_id=%s, device_id=%s" % (call_id, device_id))
+            # self.server.logger.info("liuwei::debugger:3333::call_id=%s, device_id=%s" % (call_id, device_id))
             call_info.device_list.append(new_device_id)
             call_info.device_list.append(new_device_id)
-            self.server.logger.info("liuwei::debugger:4444::call_id=%s, device_id=%s" % (call_id, new_device_id))
+            # self.server.logger.info("liuwei::debugger:4444::call_id=%s, device_id=%s" % (call_id, new_device_id))
             # call_info.next_commands.append(NextCommand(device_id, NextType.NEXT_CALL_BRIDGE.code, new_device_id))
             # call_info.next_commands.append(NextCommand(device_id, NextType.NEXT_CALL_BRIDGE.code, new_device_id))
             call_info.device_info_map = {device_id: device_custom, new_device_id: device_bot}
             call_info.device_info_map = {device_id: device_custom, new_device_id: device_bot}
-            self.server.logger.info("lwoutBound, call_info=%s"%(call_info))
+            # self.server.logger.info("lwoutBound, call_info=%s"%(call_info))
             self.server.cache.add_call_info(call_info)
             self.server.cache.add_call_info(call_info)
 
 
 
 
     class CustomTCPServer(socketserver.TCPServer):
     class CustomTCPServer(socketserver.TCPServer):
-        def __init__(self, server_address, RequestHandlerClass, agent, app,whitelist_loader,load_agent_monitor,dataHandleServer):
+        def __init__(self, server_address, RequestHandlerClass, agent, app):
+            super().__init__(server_address, RequestHandlerClass)
             self.agent = agent
             self.agent = agent
+            self.app = app
             self.cache = Cache(app)
             self.cache = Cache(app)
             self.logger = app.logger
             self.logger = app.logger
-            self.load_whitelist = whitelist_loader
-            self.load_agent_monitor = load_agent_monitor
-            self.dataHandleServer = dataHandleServer
-            super().__init__(server_address, RequestHandlerClass)
+            self.whitelist = []
+            self.buckets = []
+            self.update_whitelist()  # 初始化加载白名单
+            self.update_bucket() #初始化分流
+            self.dataHandleServer = DataHandleServer(app)
+            # 定时更新白名单
+            threading.Thread(target=self.refresh_whitelist, daemon=True).start()
+            threading.Thread(target=self.refresh_bucket, daemon=True).start()
+
+        def update_whitelist(self):
+            with self.app.app_context():
+                phones = Whitelist.query.filter_by(del_flag=0).all()
+                self.whitelist = [phone.phone for phone in phones]
+                self.logger.info("Whitelist updated: %s", self.whitelist)
+
+
+        def refresh_whitelist(self):
+            while True:
+                time.sleep(3600)  # 每 1小时 更新一次
+                self.update_whitelist()
+
+
+        def get_whitelist(self):
+            return self.whitelist
+
+
+        def load_agent_monitor(self):
+            with self.app.app_context():
+                agents = AgentMonitor.query.filter_by(check_state=0, service_state=2).all()
+                agent_nums = [agent.agent_num for agent in agents]
+                return agent_nums
+
+        def get_bucket(self, custom_uuid=None):
+            custom_uuid == custom_uuid if custom_uuid else str(uuid.uuid4())
+            random_id = abs(mmh3.hash(custom_uuid))
+            if len(self.buckets) <= 0:
+                raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
+            for bucket in self.buckets:
+                num = (random_id % 100 + 100) %100
+                if num >= bucket.lower and num < bucket.upper:
+                    return bucket
+            return self.buckets[0]
+
+        def update_bucket(self):
+            with self.app.app_context():
+                buckets = Bucket.query.filter_by(eid='001').all()
+                self.buckets = buckets
+                self.logger.info("bucket updated: %s", self.whitelist)
+
+        def refresh_bucket(self):
+            while True:
+                time.sleep(60)  # 每 1分钟 更新一次
+                self.update_bucket()
 
 
     def start(self, HOST='0.0.0.0', PORT=8084, agent=None):
     def start(self, HOST='0.0.0.0', PORT=8084, agent=None):
         # HOST, PORT = "0.0.0.0", 8084
         # HOST, PORT = "0.0.0.0", 8084
         # 创建一个 TCP 服务器
         # 创建一个 TCP 服务器
-        with self.CustomTCPServer((HOST, PORT), self.ESLRequestHandler, agent, self.app, self.load_whitelist, self.load_agent_monitor,self.dataHandleServer) as server:
+        with self.CustomTCPServer((HOST, PORT), self.ESLRequestHandler, agent, self.app) as server:
             print(f"ESL server listening on {HOST}:{PORT}")
             print(f"ESL server listening on {HOST}:{PORT}")
             server.serve_forever()
             server.serve_forever()

+ 4 - 3
src/core/callcenter/esl/handler/channel_answer_handler.py

@@ -4,7 +4,8 @@ import json
 import time
 import time
 from datetime import datetime
 from datetime import datetime
 from src.core.callcenter.constant import saasId, get_record_prefix,get_record_file_name
 from src.core.callcenter.constant import saasId, get_record_prefix,get_record_file_name
-from src.core.callcenter.enumeration import NextType, AnswerFlag, Direction, DeviceType, AgentScene, CdrType,WorkStatus,AgentServiceState,CallStage
+from src.core.callcenter.enumeration import NextType, AnswerFlag, Direction, DeviceType, AgentScene, CdrType, \
+    WorkStatus, AgentServiceState, CallStage, CallType
 from src.core.callcenter.esl.annotation import EslEventName
 from src.core.callcenter.esl.annotation import EslEventName
 import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
 import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
 from src.core.callcenter.esl.constant.event_names import CHANNEL_ANSWER
 from src.core.callcenter.esl.constant.event_names import CHANNEL_ANSWER
@@ -39,8 +40,8 @@ class ChannelAnswerHandler(EslEventHandler):
         if not next_command:
         if not next_command:
             return
             return
 
 
-        # if device_info.device_type == DeviceType.CUSTOMER.code:
-        #     self.push_handler.push_on_ring_start(saas_id=call_info.saas_id, flow_id=call_info.cti_flow_id, user_id=call_info.agent_key, scene=AgentScene.MANUAL, call_id=call_info.call_id)
+        if CallType.AGENT_CALL.code == call_info.call_type and device_info.device_type == DeviceType.CUSTOMER.code:
+            self.record(event, device_id)
 
 
         device_info.answer_time = EslEventUtil.getEventDateTimestamp(event)
         device_info.answer_time = EslEventUtil.getEventDateTimestamp(event)
         device_info.ring_end_time = EslEventUtil.getEventDateTimestamp(event)
         device_info.ring_end_time = EslEventUtil.getEventDateTimestamp(event)

+ 30 - 10
src/core/callcenter/esl/handler/playback_stop_handler.py

@@ -1,13 +1,15 @@
 #!/usr/bin/env python3
 #!/usr/bin/env python3
 # encoding:utf-8
 # encoding:utf-8
 
 
+import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
+from src.core.callcenter.constant import HOLD_MUSIC_PATH
+from src.core.callcenter.data_handler import *
+from src.core.callcenter.enumeration import NextType, CallCause
 from src.core.callcenter.esl.annotation import EslEventName
 from src.core.callcenter.esl.annotation import EslEventName
-from src.core.callcenter.enumeration import  DeviceType,AgentScene
 from src.core.callcenter.esl.constant.event_names import PLAYBACK_STOP
 from src.core.callcenter.esl.constant.event_names import PLAYBACK_STOP
 from src.core.callcenter.esl.handler.esl_event_handler import EslEventHandler
 from src.core.callcenter.esl.handler.esl_event_handler import EslEventHandler
-import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
 from src.core.callcenter.push import PushHandler
 from src.core.callcenter.push import PushHandler
-from src.core.callcenter.data_handler import *
+
 
 
 @EslEventName(PLAYBACK_STOP)
 @EslEventName(PLAYBACK_STOP)
 class PlaybackStopHandler(EslEventHandler):
 class PlaybackStopHandler(EslEventHandler):
@@ -18,10 +20,28 @@ class PlaybackStopHandler(EslEventHandler):
         self.push_handler = PushHandler(inbound_client.logger)
         self.push_handler = PushHandler(inbound_client.logger)
 
 
     def handle(self, address, event, coreUUID):
     def handle(self, address, event, coreUUID):
-        pass
-        # call_id = EslEventUtil.getCallId(event)
-        # device_id = EslEventUtil.getDeviceId(event)
-        # call = self.cache.get_call_info(call_id)
-        # device = call.device_info_map.get(device_id)
-        # if device.device_type == DeviceType.AGENT.code:
-        #     self.push_handler.push_on_ring_end(call.cti_flow_id, call.agent_key, AgentScene.MANUAL,call.call_id)
+        call_id = EslEventUtil.getCallId(event)
+        device_id = EslEventUtil.getDeviceId(event)
+        playback_file_path = EslEventUtil.getPlaybackFilePath(event)
+        if playback_file_path and HOLD_MUSIC_PATH == playback_file_path:
+            self.logger.info("等待音结束不需要处理 callId: %s deviceId: %s playbackFilePath: %s", call_id, device_id, playback_file_path);
+            return
+
+        call_info = self.cache.get_call_info(call_id)
+        if not call_info:
+            self.logger.info("PLAYBACK_STOP call_info:%s is null", call_id)
+            return
+        device_info = call_info.device_info_map.get(device_id)
+        if not device_info:
+            self.logger.info("PLAYBACK_STOP device_info:%s is null", device_id)
+            return
+        next_command = call_info.next_commands[0] if call_info.next_commands and len(call_info.next_commands) > 0 else None
+        if not next_command:
+            self.logger.info("PLAYBACK_STOP next_command is null, call_info:%s", call_info)
+            return
+
+        if NextType.NEXT_HANGUP == next_command.next_type:
+            call_info.end_time = device_info.end_time
+            for _device_id in call_info.device_list:
+                self.inbound_client.hangup_call(call_id, _device_id, CallCause.PLAYBACK_STOP)
+        self.cache.add_call_info(call_info)