Browse Source

Merge branch 'develop'

Davidliu 3 weeks ago
parent
commit
089bdf954c

+ 1 - 0
.gitignore

@@ -35,3 +35,4 @@ config/*.xlsx
 **.log
 **.ipynb
 *.pyc
+src.tar.gz

+ 1 - 1
docker-compose.yml

@@ -1,7 +1,7 @@
 services:
   pjsua:
     image: pjsua2:v2.14.1214
-    container_name: pjsua
+    container_name: voip
     restart: always
 #    network_mode: host
     volumes:

+ 5 - 13
src/core/callcenter/acd.py

@@ -45,15 +45,11 @@ class AcdService:
         agent_number = self.agent_service.assign(AgentActionRequest(saas_id=SAAS_ID, service_id=service_id))
         if not agent_number:
             # 如果没有空闲坐席,播放等待音
-            text = "AcdService transferToAgent agentNumber is empty serviceId:%s,called:%s,callId:%s"%(service_id, call_info.called, call_info.call_id)
-            # print(text, flush=True)
-            self.logger.info(text)
+            self.logger.info("AcdService transferToAgent agentNumber is empty serviceId:%s,caller:%s,called:%s,callId:%s"%(service_id, call_info.caller, call_info.called, call_info.call_id))
             self.add_acd_queue(call_info, service_id)
         else:
             # 有空闲坐席,直接转接
-            text = "AcdService transferToAgent agentNumber not empty %s, serviceId:%s,called:%s,callId:%s"%(agent_number, service_id, call_info.called, call_info.call_id)
-            # print(text, flush=True)
-            self.logger.info(text)
+            self.logger.info("AcdService transferToAgent agentNumber is %s, serviceId:%s,caller:%s,called:%s,callId:%s"%(agent_number, service_id, call_info.caller,call_info.called, call_info.call_id))
             self.call_service.transfer(call_info, agent_number, service_id)
             # self.agent_state_service.busy(call_info.saas_id, agent_number, agent_number)
         self.cache.add_call_info(call_info)
@@ -90,20 +86,16 @@ class AcdService:
                 continue
             agent_number = self.agent_service.assign(AgentActionRequest(saas_id=SAAS_ID, service_id=task_service_id))
             if not agent_number:
-                text = "AcdService tryTransferAgent agentNumber is Empty %s %s"% (call_id, json.dumps(call_info.device_list))
-                # print(text, flush=True)
-                self.logger.info(text)
+                self.logger.info("AcdService tryTransferAgent agentNumber is Empty %s %s"% (call_id, json.dumps(call_info.device_list)))
                 tmp_arr.append(call_id)
                 continue
 
-            text = "AcdService tryTransferAgent agentNumber not Empty %s, serviceId:%s, called:%s, callId:%s"%(agent_number, task_service_id, call_info.called, call_id)
-            # print(text, flush=True)
-            self.logger.info(text)
+            self.logger.info("AcdService tryTransferAgent agentNumber not Empty %s, serviceId:%s, called:%s, callId:%s"%(agent_number, task_service_id, call_info.called, call_id))
             self.call_service.transfer(call_info, agent_number, task_service_id)
 
         for call_id in tmp_arr:
             call_info_queue.put_nowait(call_id)
 
-    def wait_timeout(self, call_id, timeouts=30):
+    def wait_timeout(self, call_id, timeouts=55):
         delay_action = DelayAction(call_id=call_id)
         self.cache.add_delay_message(DelayActionEnum.ACD_TIMEOUT_PLAY.name, delay_action, timeouts)

+ 133 - 70
src/core/callcenter/agent.py

@@ -7,7 +7,7 @@ import traceback
 from collections import defaultdict
 from concurrent.futures import ThreadPoolExecutor
 from typing import List
-
+from datetime import datetime
 from apscheduler.schedulers.background import BackgroundScheduler
 from sqlalchemy import or_
 
@@ -17,11 +17,11 @@ from src.core import with_app_context
 from src.core.callcenter.api import AgentActionRequest, AgentQueryRequest, AgentRequest, AgentEventData, \
     AgentStateData, HumanServiceQueryRequest, AgentMonitorData, CallInfo, DeviceInfo, AgentDelayStateData
 from src.core.callcenter.cache import Cache
-from src.core.callcenter.constant import CENTER_AGENT_HEARTBEAT, SAAS_ID
+from src.core.callcenter.constant import CENTER_AGENT_HEARTBEAT, SAAS_ID, CENTER_AGENT_LIVE_CNT
 from src.core.callcenter.dao import *
 from src.core.callcenter.data_handler import DataHandleServer
 from src.core.callcenter.enumeration import AgentState, AgentCheck, AgentHeartState, AgentServiceState, AgentLogState, \
-    AgentScene, BizErrorCode, WorkStatus, DownEvent, HumanState, DeviceType, ServiceDirect
+    AgentScene, BizErrorCode, WorkStatus, DownEvent, HumanState, DeviceType, ServiceDirect, HangupDir
 from src.core.callcenter.esl.constant.event_names import *
 from src.core.callcenter.exception import BizException
 from src.core.callcenter.push import PushHandler
@@ -80,6 +80,8 @@ class AgentEventService:
 
     def agent_event_channel(self, event, call_info: CallInfo, device_info: DeviceInfo):
         event_name = EslEventUtil.getEventName(event)
+        event_timestamp = EslEventUtil.getEventTimestamp(event)
+        event_time = datetime.fromtimestamp(event_timestamp).strftime('%Y-%m-%d %H:%M:%S')
         saas_id = call_info.saas_id if call_info else None
         flow_id = call_info.cti_flow_id if call_info else None
         call_id = call_info.call_id if call_info else None
@@ -91,18 +93,21 @@ class AgentEventService:
 
         start_time = time.time()
         try:
-            self.logger.info('agent_event_channel, event_name=%s, agent_num=%s, device_id=%s, is_agent=%s', event_name, agent_num, device_id, is_agent)
+            self.logger.info('agent_event_channel, event_name=%s, call_id=%s, event_time=%s, agent_num=%s, device_id=%s, is_agent=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, event_time, agent_num, device_id, is_agent, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
             agent = self.data_handle_server.get_agent(saas_id, agent_num)
             if not agent:
-                # self.logger.warn("event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json')))
+                self.logger.warn("agent_event_channel:return, agent is null %s %s %s %s %s", saas_id, event_name, event_time, caller, called)
                 return
             agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
             if not agent_monitor:
-                # self.logger.warn("event service channel agentMonitor is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json')))
+                self.logger.warn("agent_event_channel:return, agentMonitor is null %s %s %s %s %s", saas_id, event_name, event_time, caller, called)
                 return
 
             # 信道发起事件,触发完成发起(或桥)&& 坐席侧
             if CHANNEL_ORIGINATE == event_name and is_agent:
+                # if call_info.hangup_dir and call_info.hangup_dir == HangupDir.CUSTOMER_HANGUP.code:
+                #     self.logger.info('agent_event_channel:already:hangup, event_name=%s, call_id=%s, device_id=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, device_id, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
+                    # return
                 self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_RINGING,phone=call_info.caller)
 
             # 进度事件,外呼时对方提醒。或者入呼时提醒 && 坐席侧
@@ -144,8 +149,9 @@ class AgentEventService:
                     if call_id:
                         self.cache.set_call_is_end(call_id)
                 self.agent_monitor_service.update_processing(agent_monitor)
-                self.logger.info('挂断更新')
-                self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.MANUAL))
+                self.logger.info('挂断更新:%s', agent)
+                # self.data_handle_server.update_record(call_id, user_id=agent.user_id, user_name=agent.agent_name)
+                self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, call_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.MANUAL))
                 self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, '0')
                 self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_HANG_REPROCESSING)
                 self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.REPROCESSING)
@@ -171,14 +177,20 @@ class AgentEventService:
         except:
             traceback.print_exc()
         finally:
-            latency = (time.time() - start_time)
+            time_cost = (time.time() - start_time) * 1000
+            registry.ESL_EVENT_CALLBACK_COST.labels(event_name, "agent").observe(time_cost)
+            latency = (time.time() - event_timestamp) * 1000
             registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
+            self.logger.info('agent_event_channel, event_name=%s, time_cost=%s, latency=%s, call_id=%s, event_time=%s, agent_num=%s, device_id=%s, is_agent=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, time_cost, latency, call_id, event_time, agent_num, device_id, is_agent, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
 
     def bot_event_channel(self, event, call_info, device_info):
         event_name = EslEventUtil.getEventName(event)
+        event_timestamp = EslEventUtil.getEventTimestamp(event)
+        event_time = datetime.fromtimestamp(event_timestamp).strftime('%Y-%m-%d %H:%M:%S')
         saas_id = call_info.saas_id if call_info else None
         flow_id = call_info.cti_flow_id if call_info else None
         call_id = call_info.call_id if call_info else None
+        device_id = device_info.device_id if device_info else None
         agent_num = device_info.agent_key if device_info else None
         is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False
         caller = (device_info.called if is_agent else device_info.caller) if device_info else None
@@ -187,27 +199,29 @@ class AgentEventService:
 
         start_time = time.time()
         try:
-            self.logger.info('bot_event_channel, event_name=%s, call_id=%s, is_agent=%s, agent_num=%s', event_name, call_id, is_agent, agent_num)
+            self.logger.info('bot_event_channel, event_name=%s, call_id=%s, device_id=%s, event_time=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, device_id, event_time, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
             agent = self.data_handle_server.get_agent(saas_id, agent_num)
             if not agent:
-                # self.logger.warn("bot event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called,
-                #                  json.dumps(event.serialize('json')))
+                self.logger.warn("bot_event_channel:return, agent is null %s %s %s %s %s %s", saas_id, event_name, event_time, call_id, caller, called)
                 return
             agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
             if not agent_monitor:
-                # self.logger.warn("bot event service channel agentMonitor is null %s %s %s %s %s", saas_id, event_name, caller,
-                #                  called, json.dumps(event.serialize('json')))
+                self.logger.warn("bot_event_channel:return, agentMonitor is null %s %s %s %s %s %s ", saas_id, event_name, event_time, call_id, caller, called)
                 return
 
             # 信道发起事件,触发完成发起(或桥)&& 坐席侧
             if CHANNEL_ORIGINATE == event_name and is_agent:
+                # if call_info.hangup_dir and call_info.hangup_dir == HangupDir.CUSTOMER_HANGUP.code:
+                #     self.logger.info('bot_event_channel:already:hangup, event_name=%s, call_id=%s, device_id=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, device_id, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
+                    # return
                 self.push_handler.push_on_call_ring(saas_id, flow_id, agent_num, AgentScene.ROBOT, call_id, ServiceDirect.ROBOT_CALL.service_direct, called, caller, human_service_id)
                 self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_RINGING,phone=call_info.caller)
+                self.data_handle_server.update_record(call_id, transfer_user_id=agent.user_id,transfer_user_name=agent.agent_name)  # 转接给客服以后更新转接人
 
 
             if CHANNEL_ANSWER == event_name:
                 self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num)
-                self.data_handle_server.update_record(call_id, status=1, transfer_user_id=agent.user_id,transfer_user_name=agent.agent_name)
+                # self.data_handle_server.update_record(call_id, status=1, transfer_user_id=agent.user_id,transfer_user_name=agent.agent_name)
                 if is_agent:
                     self.agent_monitor_service.update_calling(agent_monitor)
                     self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! internal")
@@ -220,7 +234,7 @@ class AgentEventService:
 
             if CHANNEL_HANGUP == event_name and is_agent:
                 self.agent_monitor_service.update_processing(agent_monitor)
-                self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.ROBOT))
+                self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, call_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.ROBOT))
                 self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, "0")
                 self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_HANG_REPROCESSING)
                 self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.REPROCESSING)
@@ -233,8 +247,11 @@ class AgentEventService:
         except:
             traceback.print_exc()
         finally:
-            latency = (time.time() - start_time)
-            registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
+            time_cost = (time.time() - start_time) * 1000
+            registry.ESL_EVENT_CALLBACK_COST.labels(event_name, "bot").observe(time_cost)
+            latency = (time.time() - event_timestamp) * 1000
+            registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "bot").observe(latency)
+            self.logger.info('bot_event_channel, event_name=%s, time_cost=%s, latency=%s, call_id=%s, device_id=%s, event_time=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, time_cost, latency, call_id, device_id, event_time, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
 
     def reprocessing_idle(self, state_data: AgentDelayStateData):
         agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num)
@@ -247,7 +264,7 @@ class AgentEventService:
         self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
         self.logger.info('reprocessing_idle_end')
         self.agent_monitor_service.update_idle(agent_monitor)
-        self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, "", state_data.scene, WorkStatus.AGENT_HANG_IDLE)
+        self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, state_data.call_id, state_data.scene, WorkStatus.AGENT_HANG_IDLE)
         self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.REPROCESSING_IDLE)
 
 
@@ -266,12 +283,11 @@ class AgentOperService:
         self.agent_actionlog_service = AgentActionLogService(app)
         self.agent_state_service = AgentStateService(app)
 
-        # self.daemon_stopping = False
         self.agent_heartbeat_expire = 30
+        self.agent_serial_live_expire = 60*10
         self.agent_heartbeat_job_scheduler = BackgroundScheduler()
         self.agent_heartbeat_job_scheduler.add_job(self.agent_heartbeat_daemon, 'interval', seconds=1, max_instances=1, name='agent_heartbeat_daemon')
         self.agent_heartbeat_job_scheduler.start()
-        # threading.Thread(target=self.agent_heartbeat_daemon).start()
 
     def agent_heartbeat_daemon(self):
         def check_out_daemon(_name, key, value):
@@ -280,22 +296,39 @@ class AgentOperService:
                 if sec > self.agent_heartbeat_expire:
                     self.redis_handler.redis.hdel(_name, key)
                     self.logger.error("agent heartbeat expired, will checkout %s %s", key, value)
-                    self.checkout(AgentActionRequest(saas_id=SAAS_ID, agent_id=key, agent_number=key))
+                    # self.checkout(AgentActionRequest(saas_id=SAAS_ID, agent_id=key, agent_number=key))
             except:
                 traceback.print_exc()
 
-        # while not self.daemon_stopping:
+        def check_agent_live_daemon(_members):
+            _key = CENTER_AGENT_LIVE_CNT % SAAS_ID
+            pre_time = self.redis_handler.redis.get(_key)
+            if not pre_time or not _members or len(_members) == 0:
+                _value = datetime.now().timestamp()
+                self.redis_handler.redis.set(_key, _value, ex=60*60, nx=True)
+            else:
+                _diff = datetime.now().timestamp() - float(pre_time)
+                if _diff > self.agent_serial_live_expire:
+                    self.logger.info('check_agent_live_daemon, members=%s, diff=%s, pre_time=%s', (len(_members) if _members else 0), _diff, pre_time)
+                    self.logger.warn('WARING::live agent count less than 1 serial ten minutes')
+                    self.data_handle_server.create_warning_record(1, '10分钟空岗报警')
+                    self.redis_handler.redis.delete(_key)
+
+                if _members and len(_members) > 0:
+                    self.redis_handler.redis.delete(_key)
+
         name = CENTER_AGENT_HEARTBEAT % SAAS_ID
         members = self.redis_handler.redis.hgetall(name)
+        check_agent_live_daemon(members)
         if not members:
             return
+
+        registry.MANUAL_AGENT_LIVES.set(len(members))
         for k,v in members.items():
             check_out_daemon(name, k, v)
-        # time.sleep(1)
 
     def __del__(self):
         self.agent_heartbeat_job_scheduler.shutdown()
-        # self.daemon_stopping = True
 
     @with_app_context
     def enable(self, req: AgentActionRequest):
@@ -801,8 +834,8 @@ class AgentStateService:
         self.logger = app.logger
         self.redis_handler = RedisHandler()
         self.assigned_recycle_millisecond = 30 * 1000
-        self.state_service_id_data_map = defaultdict(dict)
-        self.executor = ThreadPoolExecutor(max_workers=10)
+        # self.state_service_id_data_map = defaultdict(dict)
+        # self.executor = ThreadPoolExecutor(max_workers=10)
         self.data_handle_server = DataHandleServer(app)
         self.agent_monitor_service = AgentMonitorService(app)
         self.agent_actionlog_service = AgentActionLogService(app)
@@ -861,14 +894,39 @@ class AgentStateService:
 
     def assign_agent(self, saas_id, service_id, called=None, ivr_id=None, task_id=None, cbp=None):
         choose_phone_num = ''
-        self.logger.info("assignAgent %s %s %s"% (saas_id, service_id, called))
-        idle_agents = self.idle_agents(saas_id, service_id)
-        if len(idle_agents) <= 0:
-            return choose_phone_num
-        choose_phone_num = self._choose_max_idle_time(idle_agents)
-        self.handle_assign_time(saas_id, service_id, choose_phone_num)
+        lock = threading.Lock()
+        try:
+            lock.acquire()
+            self.logger.info("assignAgent %s %s %s"% (saas_id, service_id, called))
+            idle_agents = self.idle_agents(saas_id, service_id)
+            if len(idle_agents) <= 0:
+                return choose_phone_num
+            choose_phone_num = self._choose_max_idle_time(idle_agents)
+            self.handle_assign_time(saas_id, service_id, choose_phone_num)
+            self.handle_lock_agent(choose_phone_num, saas_id, service_id)
+        finally:
+            lock.release()
         return choose_phone_num
 
+    def handle_check_agent_lock(self, choose_phone_num, saas_id, service_id='00000000000000000'):
+        key = self._lock_key(saas_id, service_id, choose_phone_num)
+        res = self.redis_handler.redis.get(key)
+        self.logger.info('checkAgent %s %s %s %s'% (saas_id, service_id, choose_phone_num, res))
+        return False if res else True
+
+    def handle_lock_agent(self, choose_phone_num, saas_id, service_id='00000000000000000'):
+        key = self._lock_key(saas_id, service_id, choose_phone_num)
+        expire = self._get_expire_time()
+        self.redis_handler.redis.set(key, 1, nx=True, ex=expire)
+        res = self.redis_handler.redis.get(key)
+        self.logger.info('lockAgent %s %s %s %s %s'% (saas_id, service_id, choose_phone_num, expire, res))
+
+    def handle_release_agent_lock(self, choose_phone_num, saas_id, service_id='00000000000000000'):
+        key = self._lock_key(saas_id, service_id, choose_phone_num)
+        self.redis_handler.redis.delete(key)
+        # self.redis_handler.redis.expire(key, 3)
+        self.logger.info('releaseAgent %s %s %s'% (saas_id, service_id, choose_phone_num))
+
     def handle_assign_time(self, saas_id, service_id, choose_phone_num):
         key = self._key(saas_id, service_id)
         cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
@@ -939,13 +997,14 @@ class AgentStateService:
         return free_agents
 
     def get_idle_agents(self,cache_agent_list):
-        current_time =int(datetime.now().timestamp() * 1000)  # 获取当前时间的毫秒级时间戳
+        # current_time =int(datetime.now().timestamp() * 1000)  # 获取当前时间的毫秒级时间戳
         idle_agents = [
             agent for agent in cache_agent_list
-            if agent.status == 1 and (
-                    agent.assign_time == 0 or
-                    agent.assign_time + self.assigned_recycle_millisecond < current_time
-            )
+            if agent.status == 1 and self.handle_check_agent_lock(agent.phone_num, SAAS_ID)
+               #and (
+               #     agent.assign_time == 0 or
+               #     agent.assign_time + self.assigned_recycle_millisecond < current_time
+            #)
         ]
         return idle_agents
 
@@ -966,34 +1025,34 @@ class AgentStateService:
             busy_agents_size = len(busy_agents)  # 获取忙碌代理的数量
         return busy_agents_size
 
-    def update_report_state(self, saas_id, service_id):
-        key = self._key(saas_id, service_id)
-        # data_map 这个地方有疑问
-        data_map = self.state_service_id_data_map[key]
-        idle = HumanState.IDLE
-        if idle.value not in data_map:
-            data_map[idle.code] = threading.Lock()
-            self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, idle)
-            # data_map[idle.code] = self.do_report_real_time_human_service_id(saas_id, service_id, idle)
-        busy = HumanState.BUSY
-        if busy.value not in data_map:
-            data_map[busy.code] = threading.Lock()
-            self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, busy)
-            # data_map[busy.code] = self.do_report_real_time_human_service_id(saas_id, service_id, busy)
-
-    def do_report_real_time_human_service_id(self, saas_id, service_id, human_state):
-        name = "cti_center_real_time_human_service_state"
-        tag_list = {
-            "vcc_id": saas_id,
-            "service_id": service_id,
-            "state": human_state.code,
-        }
-        if human_state == HumanState.IDLE:
-            # meter_registry 这块疑问
-            self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id))
-        elif human_state == HumanState.BUSY:
-            self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id))
-        return 0
+    # def update_report_state(self, saas_id, service_id):
+    #     key = self._key(saas_id, service_id)
+    #     # data_map 这个地方有疑问
+    #     data_map = self.state_service_id_data_map[key]
+    #     idle = HumanState.IDLE
+    #     if idle.value not in data_map:
+    #         data_map[idle.code] = threading.Lock()
+    #         self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, idle)
+    #         # data_map[idle.code] = self.do_report_real_time_human_service_id(saas_id, service_id, idle)
+    #     busy = HumanState.BUSY
+    #     if busy.value not in data_map:
+    #         data_map[busy.code] = threading.Lock()
+    #         self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, busy)
+    #         # data_map[busy.code] = self.do_report_real_time_human_service_id(saas_id, service_id, busy)
+    #
+    # def do_report_real_time_human_service_id(self, saas_id, service_id, human_state):
+    #     name = "cti_center_real_time_human_service_state"
+    #     tag_list = {
+    #         "vcc_id": saas_id,
+    #         "service_id": service_id,
+    #         "state": human_state.code,
+    #     }
+    #     if human_state == HumanState.IDLE:
+    #         # meter_registry 这块疑问
+    #         self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id))
+    #     elif human_state == HumanState.BUSY:
+    #         self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id))
+    #     return 0
 
     def _check_in_key(self, saas_id):
         return "CTI:%s:HUMAN:AGENT"%(saas_id.upper())
@@ -1001,11 +1060,15 @@ class AgentStateService:
     def _key(self, saas_id, service_id):
         return "CTI:%s:HUMAN:%s"%(saas_id.upper(), service_id)
 
+    def _lock_key(self, saas_id, service_id, choose_phone_num):
+        return "CTI:%s:HUMAN:%s:%s:LOCK"%(saas_id.upper(), service_id, choose_phone_num)
+
     def _get_expire_time(self):
-        now = datetime.now()
-        end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=0)
-        expire_time = (end_of_day - now).total_seconds() * 1000  # Convert to milliseconds
-        return int(expire_time)
+        # now = datetime.now()
+        # end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=0)
+        # expire_time = (end_of_day - now).total_seconds()  # Convert to milliseconds
+        # return int(expire_time)
+        return 60*60*24*30
 
     def _choose_max_idle_time(self, idle_agents: List[AgentStateData]) -> str:
         idle_agents = sorted(idle_agents, key=lambda agent: agent.assign_time, reverse=False)

+ 12 - 7
src/core/callcenter/api.py

@@ -177,9 +177,10 @@ class AgentStateData(BaseApi):
         self.phone_num = phone_num
 
 class AgentDelayStateData(BaseApi):
-    def __init__(self, saas_id=None, flow_id=None, agent_num=None, service_state=None, scene=None):
+    def __init__(self, saas_id=None, flow_id=None, call_id=None, agent_num=None, service_state=None, scene=None):
         self.saas_id = saas_id
         self.flow_id = flow_id
+        self.call_id = call_id
         self.agent_num = agent_num
         self.service_state = service_state
         self.scene = scene
@@ -217,8 +218,8 @@ class MakeCallContext(BaseApi):
                  device_id: Optional[str] = None,
                  eavesdrop: Optional[str] = None,
                  device_type: Optional[int] = None,
-                 timeout: Optional[int] = 60*2,
-                 originate_timeout: Optional[int] = 60*2,
+                 timeout: Optional[int] = 90,
+                 originate_timeout: Optional[int] = 90,
                  sip_header_map: Optional[Dict[str, str]] = {},
                  called_prefix: Optional[str] = "",
                  service_id: Optional[str] = None,
@@ -284,12 +285,14 @@ class MakeCallContext(BaseApi):
 
         if self.device_type == DeviceType.CUSTOMER.code:
             headers += [
-                "RECORD_STEREO_SWAP=true"
+                "RECORD_STEREO_SWAP=true",
+                "rx_gain=30"
             ]
         else:
             headers += [
                 "RECORD_STEREO_SWAP=false",
-                "continue_on_fail=true"
+                "continue_on_fail=true",
+                "rx_gain=30"
             ]
 
         if self.sip_header_map:
@@ -455,7 +458,7 @@ class CallInfo(BaseApi):
                  hangup_dir=None, sdk_hangup=0, hangup_code=None, answer_time=None, end_time=None, talk_time=None,
                  first_queue_time=None, queue_start_time=None, queue_end_time=None, overflow_count=0,
                  uuid1=None, uuid2=None, cdr_notify_url=None, queue_level=None, transfer_agent=None, bucket_type=None,
-                 user_no_answer_end_call=False, device_list=[], device_info_map: Dict[str, Any] = {}, follow_data: Dict[str, Any] = {},
+                 user_no_answer_end_call=False, hangup_count=0, device_list=[], device_info_map: Dict[str, Any] = {}, follow_data: Dict[str, Any] = {},
                  process_data: Dict[str, Any] = {}, next_commands=[], call_details=[]):
         self.core_uuid = core_uuid  # 通话唯一标识
         self.cti_flow_id = cti_flow_id
@@ -503,6 +506,7 @@ class CallInfo(BaseApi):
         self.transfer_agent = transfer_agent #是否转人工
         self.bucket_type = bucket_type #呼入流量类型
         self.user_no_answer_end_call = user_no_answer_end_call #用户未接听挂机
+        self.hangup_count = hangup_count
         self.device_list = device_list  # 当前通话的设备
         self.device_info_map = device_info_map
         self.follow_data = follow_data  # 呼叫随路数据(作为落单数据)
@@ -537,7 +541,7 @@ class CallInfo(BaseApi):
                    uuid1=data.get('uuid1'), uuid2=data.get('uuid2'), cdr_notify_url=data.get('cdr_notify_url'),
                    queue_level=data.get('queue_level'), transfer_agent=data.get('transfer_agent'),
                    bucket_type=data.get('bucket_type'), user_no_answer_end_call=data.get('user_no_answer_end_call'),
-                   device_list=data.get('device_list', []),device_info_map=device_info_map,
+                   hangup_count=data.get('hangup_count'), device_list=data.get('device_list', []),device_info_map=device_info_map,
                    follow_data=follow_data, process_data=process_data,
                    next_commands=next_commands, call_details=call_details)
 
@@ -592,6 +596,7 @@ class CallInfo(BaseApi):
             "transfer_agent": self.transfer_agent,
             "bucket_type": self.bucket_type,
             "user_no_answer_end_call": self.user_no_answer_end_call,
+            "hangup_count": self.hangup_count,
             "device_list": [x for x in self.device_list],
             "device_info_map": {key: vars(value) for key, value in self.device_info_map.items()},
             "follow_data": {key: vars(value) for key, value in self.follow_data.items()},

+ 35 - 3
src/core/callcenter/cache.py

@@ -4,6 +4,7 @@
 import json
 import sys
 import time
+import traceback
 import uuid
 from datetime import datetime
 
@@ -76,7 +77,7 @@ class Cache:
 
 
     # 缓存CALL_INFO
-    def add_call_info(self, call: CallInfo, persistent=False):
+    def add_call_info(self, call: CallInfo, persistent=True):
         for k, v in call.device_info_map.items():
             self.add_device(k, call.call_id)
         # print('add_call_info call_id:%s, call=%s'% (call.call_id, call))
@@ -211,12 +212,12 @@ class Cache:
         return self.redis_handler.redis.set(key, "1", ex=60, nx=True)
 
     def lock_register_per_hours(self):
-        hour = datetime.now().strftime('%Y%m%d%H')
+        hour = datetime.now().strftime('%Y%m%d')
         key = BOT_REGISTER_PER_HOURS %hour
         return self.redis_handler.redis.get(key)
 
     def set_register_per_hours(self, expire=86400):
-        hour = datetime.now().strftime('%Y%m%d%H')
+        hour = datetime.now().strftime('%Y%m%d')
         key = BOT_REGISTER_PER_HOURS %hour
         return self.redis_handler.redis.set(key, "1", ex=expire, nx=True)
 
@@ -238,3 +239,34 @@ class Cache:
         self.logger.info("set_pjsua_thread_lock")
         return self.redis_handler.redis.set(key, "1", ex=expire, nx=True)
 
+    def get_serial_no_answer_cnt(self, call_info:CallInfo):
+        try:
+            if call_info.hangup_count >= call_info.answer_count:
+                key = CALL_SERIAL_NO_ANSWER % SAAS_ID
+                self.logger.info('get_serial_no_answer_cnt:call_id=%s, call_time_length=%s', call_info.call_id, call_info.answer_count)
+                if call_info.answer_count <= 0:
+                    self.redis_handler.redis.sadd(key, call_info.call_id)
+                    return self.redis_handler.redis.scard(key)
+                else:
+                    self.redis_handler.redis.delete(key)
+        except Exception as e:
+            traceback.print_exc()
+            self.logger.info('get_serial_no_answer_cnt:exception, msg:%s',e )
+        return 0
+
+    def get_serial_speed_hangup_cnt(self, call_info:CallInfo):
+        try:
+            if call_info.hangup_count >= call_info.answer_count:
+                now = datetime.now().timestamp()
+                key = CALL_SERIAL_SPEED_HANGUP % SAAS_ID
+                call_len = now - call_info.call_time
+                self.logger.info('get_serial_speed_hangup_cnt:call_id=%s, call_time_length=%s', call_info.call_id, call_len)
+                if call_len <= CALL_SERIAL_SPEED_HANGUP_OFFSET:
+                    self.redis_handler.redis.sadd(key, call_info.call_id)
+                    return self.redis_handler.redis.scard(key)
+                else:
+                    self.redis_handler.redis.delete(key)
+        except Exception as e:
+            traceback.print_exc()
+            self.logger.info('get_serial_speed_hangup_cnt:exception, msg:%s',e)
+        return 0

+ 1 - 1
src/core/callcenter/call.py

@@ -69,7 +69,7 @@ class CallService:
         self.agent_monitor_service.update_dialing(agent_monitor)
         self.push_handler.push_on_call_ring(call_info.saas_id, flow_id=call_info.cti_flow_id, user_id=agent_id, scene=AgentScene.MANUAL, call_id=call_info.call_id, service_direct=ServiceDirect.MANUAL_CALL.service_direct)
         self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.DIALING, AgentLogState.DIALING)
-        self.data_handle_server.create_record(call_info.call_id, call_info.called, call_info.call_type, service_category=0, category=1 , user_id=agent.user_id, user_name=agent.agent_name)
+        self.data_handle_server.create_record(call_info.call_id, call_info.called, 2, service_category=0, category=1 , user_id=agent.user_id, user_name=agent.agent_name)
 
     def hold(self, call_id, device_id):
         self.logger.info('hold, custom_device_id=%s'%device_id)

+ 14 - 9
src/core/callcenter/callback.py

@@ -1,17 +1,17 @@
 #!/usr/bin/env python3
 # encoding:utf-8
-import json
+import concurrent.futures
 import queue
+import random
 import threading
-import concurrent.futures
 
 import mmh3
-import random
+
 import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
 from src.core.callcenter.agent import AgentEventService
 from src.core.callcenter.cache import Cache
 from src.core.callcenter.enumeration import CallType
-from src.core.callcenter.esl.constant.event_names import CUSTOM, DETECTED_TONE
+from src.core.callcenter.esl.constant.event_names import DETECTED_TONE
 
 
 class Callback(object):
@@ -22,7 +22,7 @@ class Callback(object):
         self.logger = app.logger
         self.cache = Cache(app)
         self.event_queue = queue.Queue()
-        self.executors = {x: concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="callback-event-pool") for x in range(thread_num)}
+        self.executors = {x: concurrent.futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="callback-event-pool") for x in range(1)}
         self.agent_event_service = AgentEventService(app)
         threading.Thread(target=self.start).start()
 
@@ -32,11 +32,16 @@ class Callback(object):
                 event, call_info, device_info = self.event_queue.get(timeout=1)
                 call_type = CallType.get_by_code(call_info.call_type) if call_info else None
                 if call_type is None:
+                    event_name = EslEventUtil.getEventName(event)
+                    self.logger.info("callback:call_type_none:return::event_name=%s, call_info=%s, device_info=%s", event_name, call_info, device_info)
                     continue
+
                 if CallType.BOT_CALL == call_type or CallType.INCOMING_BOT_CALL == call_type:
-                    self.choose_thread_pool_executor(event).submit(self.agent_event_service.bot_event_channel, event, call_info, device_info)
+                    threading.Thread(target=self.agent_event_service.bot_event_channel, args=(event, call_info, device_info)).start()
+                    # self.choose_thread_pool_executor(event).submit(self.agent_event_service.bot_event_channel, event, call_info, device_info)
                 else:
-                    self.choose_thread_pool_executor(event).submit(self.agent_event_service.agent_event_channel, event, call_info, device_info)
+                    threading.Thread(target=self.agent_event_service.agent_event_channel, args=(event, call_info, device_info)).start()
+                    # self.choose_thread_pool_executor(event).submit(self.agent_event_service.agent_event_channel, event, call_info, device_info)
             except:
                 pass
 
@@ -58,10 +63,10 @@ class Callback(object):
             call_id = self.cache.get_call_id_by_device_id(device_id)
         call_info = self.cache.get_call_info(call_id)
         if not call_info:
-            # self.logger.info("callback:return::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
+            self.logger.info("callback:return::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
             return
         device_info = call_info.device_info_map.get(device_id) if call_info and call_info.device_info_map else None
-        # self.logger.info("callback::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
+        self.logger.info("callback::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
         self.event_queue.put_nowait((event, call_info, device_info))
 
 

+ 1 - 1
src/core/callcenter/config.py

@@ -48,7 +48,7 @@ dictConfig({
 
         },
         "root": {
-            "level": "DEBUG",  # # handler中的level会覆盖掉这里的level
+            "level": "INFO",  # # handler中的level会覆盖掉这里的level
             "handlers": ["console", "log_file"],
         },
     }

+ 5 - 0
src/core/callcenter/constant.py

@@ -43,6 +43,7 @@ READY_TIMES = "readyTimes"
 SEREVICE_TIMES = "serviceTimes"
 
 HOLD_MUSIC_PATH = '/freeswitch/music/hold.wav'
+AGENT_BUSY_MUSIC_PATH = '/freeswitch/music/agent_busy.wav'
 WaitingHangupMusicPath = '/freeswitch/music/sorry.wav'
 
 BASE_RECORD_PATH = '/freeswitch/record/'
@@ -82,6 +83,7 @@ CALL_INFO = "CALL_INFO:"
 START_AGENT_NUM = "1000"
 
 DELAY_ACTION_BATCH_SIZE = 10
+CALL_SERIAL_SPEED_HANGUP_OFFSET = 5
 CENTER_AGENT_HEARTBEAT = "CENTER:AGENT:HEARTBEAT:%s"
 CTI_ENGINE_DELAY_ACTION = "DELAY:ACTION:%s"
 CTI_ENGINE_DELAY_ACTION_LOCK = "DELAY:ACTION:LOCK:%s"
@@ -91,6 +93,9 @@ CTI_MANAGE_CENTER_CALL_END_KEY = "CTI:MANAGE:CENTER:CALL:END:KEY:%s"
 CTI_AGENT_MANUAL_ANSWER = "AGENT:MANUAL:ANSWER:%s:%s"
 BOT_REGISTER_PER_HOURS = "BOT:REGISTER:PER_HOURS:%s"
 BOT_PJSUA_THREAD_LOCK = "BOT:PJSUA:THREAD:LOCK:%s"
+CALL_SERIAL_NO_ANSWER = "CALL:SERIAL:NO_ANSWER:%s"
+CALL_SERIAL_SPEED_HANGUP = "CALL:SERIAL:SPEED:HANGUP:%s"
+CENTER_AGENT_LIVE_CNT = "CENTER:AGENT:LIVE:CNT:%s"
 
 def get_json_dict(json_text=None):
     if isinstance(json_text, str):

+ 42 - 4
src/core/callcenter/dao.py

@@ -111,10 +111,10 @@ class AgentActionLog(db.Model):
     pre_check_state = db.Column(db.SmallInteger, nullable=False, default=-1, comment='上一次签入或签出')
     service_state = db.Column(db.SmallInteger, nullable=False, default=-1, comment='坐席状态')
     pre_service_state = db.Column(db.SmallInteger, nullable=False, default=-1, comment='上一次坐席状态')
-    check_state_time = db.Column(db.TIMESTAMP, nullable=False, default=datetime(2000, 1, 1), comment='签入或签出时间')
-    pre_check_state_time = db.Column(db.TIMESTAMP, nullable=False, default=datetime(2000, 1, 1), comment='上一次签入或签出时间')
-    service_state_time = db.Column(db.TIMESTAMP, nullable=False, default=datetime(2000, 1, 1), comment='坐席状态变更时间')
-    pre_service_state_time = db.Column(db.TIMESTAMP, nullable=False, default=datetime(2000, 1, 1), comment='上一次坐席状态变更时间')
+    check_state_time = db.Column(db.TIMESTAMP, nullable=False, default=datetime.now, comment='签入或签出时间')
+    pre_check_state_time = db.Column(db.TIMESTAMP, nullable=False, default=datetime.now, comment='上一次签入或签出时间')
+    service_state_time = db.Column(db.TIMESTAMP, nullable=False, default=datetime.now, comment='坐席状态变更时间')
+    pre_service_state_time = db.Column(db.TIMESTAMP, nullable=False, default=datetime.now, comment='上一次坐席状态变更时间')
     check_state_duration = db.Column(db.BigInteger, nullable=False, default=0, comment='行为持续时间')
     service_state_duration = db.Column(db.BigInteger, nullable=False, default=0, comment='状态持续时间')
     task_id = db.Column(db.String(32), nullable=False, default='', comment='任务Id')
@@ -412,6 +412,7 @@ class Whitelist(db.Model):
     id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment='主键')
     phone = db.Column(db.String(20), nullable=False, comment='电话号码')
     description = db.Column(db.String(255), nullable=True, comment='描述说明(备注)')
+    type = db.Column(db.SmallInteger, nullable=False, default='0', comment='类型(0人工白名单  1机器人白名单)')
     del_flag = db.Column(db.SmallInteger, nullable=False, default=False, comment='删除标志(0代表存在 2代表删除)')
     revision = db.Column(db.Integer, nullable=True, comment='乐观锁')
     create_by = db.Column(db.String(32), nullable=True, comment='创建人')
@@ -425,6 +426,7 @@ class Whitelist(db.Model):
             'id': self.id,
             'phone': self.phone,
             'description': self.description,
+            'type': self.type,
             'del_flag': self.del_flag,
             'revision': self.revision,
             'create_by': self.create_by,
@@ -452,6 +454,8 @@ class CallRecord(db.Model):
     status = db.Column(db.SmallInteger, nullable=True, comment='通话状态(0未接听 1已接通)')
     sip_status = db.Column(db.String(32), nullable=True, comment='sip码')
     sip_hangup_cause = db.Column(db.String(32), nullable=True, comment='sip挂机描述')
+    hangup_dir = db.Column(db.SmallInteger, nullable=True, comment='挂机方向:1 主叫 2 被叫 3 机器人')
+    hangup_reason = db.Column(db.String(255), nullable=True, comment='挂机原因')
     phone = db.Column(db.String(20), nullable=True, comment='电话号码')
     bussiness_type = db.Column(db.String(50), nullable=True, comment='业务类型(创个返回字符串)')
     url = db.Column(db.String(255), nullable=True, comment='录音的地址')
@@ -526,4 +530,38 @@ class BotRecords(db.Model):
             'intent': self.intent,
             'contents': self.contents,
             'dialog': self.dialog,
+        }
+
+class WarningRecord(db.Model):
+    __tablename__ = 't_warning_record'
+    __table_args__ = {'comment': '告警记录表'}
+
+    id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment='主键')
+    type = db.Column(db.SmallInteger, nullable=True, comment='类型(0:连续5个未接  1:10分钟空岗报警  2:连续3个5s内挂断)')
+    desc = db.Column(db.String(500), nullable=True, comment='报警描述')
+    read = db.Column(db.SmallInteger, nullable=False, default=0, comment='是否已读(0否 1是)')
+    del_flag = db.Column(db.SmallInteger, nullable=False, default=0, comment='删除标志(0代表存在 2代表删除)')
+    revision = db.Column(db.Integer, nullable=True, comment='乐观锁')
+    create_by = db.Column(db.String(100), nullable=True, comment='创建人')
+    create_time = db.Column(db.DateTime, nullable=True, comment='创建时间')
+    update_by = db.Column(db.String(100), nullable=True, comment='更新人')
+    update_time = db.Column(db.DateTime, nullable=True, comment='更新时间')
+    remark = db.Column(db.String(500), nullable=True, comment='备注')
+
+    def __repr__(self):
+        return json.dumps(self.to_dict())
+
+    def to_dict(self):
+        return {
+            'id': self.id,
+            'type': self.type,
+            'desc': self.desc,
+            'read': self.read,
+            'del_flag': self.del_flag,
+            'revision': self.revision,
+            'create_by': self.create_by,
+            'create_time': self.create_time.isoformat() if self.create_time else None,
+            'update_by': self.update_by,
+            'update_time': self.update_time.isoformat() if self.update_time else None,
+            'remark': self.remark,
         }

+ 29 - 10
src/core/callcenter/data_handler.py

@@ -1,3 +1,5 @@
+
+import time
 from src.core import with_app_context
 from src.core.callcenter.constant import START_AGENT_NUM
 from src.core.callcenter.dao import *
@@ -7,6 +9,7 @@ class DataHandleServer:
     """通话记录服务"""
     def __init__(self,app):
         self.app = app
+        self.logger = app.logger
 
     @with_app_context
     def create_record(self, call_id, caller_number, call_type, service_category=None, category=0, user_id=None,user_name=None):
@@ -19,7 +22,7 @@ class DataHandleServer:
             "service_category": service_category,
             "user_id":user_id,
             "user_name": user_name,
-            'status': 0
+            'status': 1 if category==0 and service_category == 1 else 0  # 如果通话类型啥呼入并且走到ai 是接通状态 否则默认是未接通
         }
         call_record = CallRecord()
         self.app.logger.info(f"创建通话记录: {call_info}")
@@ -35,30 +38,46 @@ class DataHandleServer:
             db.session.rollback()
             raise ValueError(f"创建记录失败: {e}")
 
+    @with_app_context
+    def create_warning_record(self, warning_type, warning_desc):
+        try:
+            warn_record = WarningRecord(type=warning_type, desc=warning_desc, create_time=datetime.now())
+            db.session.add(warn_record)
+            db.session.commit()
+            print("报警记录创建成功")
+        except Exception as e:
+            db.session.rollback()
+            raise ValueError(f"报警创建记录失败: {e}")
+
     @with_app_context
     def update_record(self, session_id, **kwargs):
+        start_time = time.time()
+        self.logger.info(f"update_record::session_id:{session_id}")
         call_record = CallRecord.query.filter(CallRecord.session_id == session_id).first()
-
+        if not call_record:
+            self.logger.info("update_record::call_recard is empty !!!")
+            return
         time_end = kwargs.get('time_end')
         user_id =  kwargs.get('user_id')
         user_name = kwargs.get('user_name')
         if time_end and call_record.type==1:
             bot_record = BotRecords.query.filter(BotRecords.session == session_id).first()
-            call_record.bussiness_type = bot_record.intent if bot_record else ''
+            call_record.bussiness_type = bot_record.intent if bot_record else '未知'
         # 如果记录是转人工并且有客服接通把客服更新到转接字段
-        self.app.logger.debug(f"Received kwargs: {kwargs} user_id:{user_id},user_name:{user_name}")
-        # if call_record.service_category==2 and user_id:
-        #    call_record.transfer_user_id = user_id
-        #    call_record.transfer_user_name =user_name
-        #    kwargs.pop('user_id', None)
-        #    kwargs.pop('user_name', None)
+        self.logger.debug(f"Received kwargs: {kwargs} user_id:{user_id},user_name:{user_name}, call_record:{call_record}")
+        #如果记录已经有user_id不再更新 删除参数里面的user_id
+        if call_record.user_id and 'user_id' in kwargs:
+           kwargs.pop('user_id', None)
+           kwargs.pop('user_name', None)
         # 动态更新字段
         for key, value in kwargs.items():
             if hasattr(call_record, key):
                 setattr(call_record, key, value)
-        self.app.logger.info(f"更新通话记录: {kwargs}")
         db.session.commit()
 
+        time_cost = (time.time() - start_time) * 1000
+        self.logger.info(f"更新通话记录::session_id:{session_id}, time_cost:{time_cost},  {kwargs}, {call_record.to_dict()}")
+
     @with_app_context
     def get_user_name(self,agent_num):
         agent = Agent.query.filter(Agent.agent_num == agent_num).first()

+ 13 - 0
src/core/callcenter/enumeration.py

@@ -3,6 +3,18 @@
 
 from enum import Enum
 
+class WhiteTypeEnum(Enum):
+    DEFAULT = (0, "传统")
+    AI = (1, "AI")
+
+    def __init__(self, code=None, description=None):
+        self.code = code
+        self.description = description
+
+    @classmethod
+    def get_by_code(cls, code):
+        return next((member for member in cls if member.code == code), None)
+
 
 class DelayActionEnum(Enum):
     CALL_TIMEOUT_HANGUP = ('CALL_TIMEOUT_HANGUP', "超时挂机")
@@ -406,6 +418,7 @@ class HangupDir(Enum):
     HOST_HANGUP = (1, "主叫挂断")
     CUSTOMER_HANGUP = (2, "被叫挂断")
     PLATFORM_HANGUP = (3, "平台挂机")
+    ROBOT_HANGUP = (4, "机器人挂机")
 
     def __init__(self, code=None, description=None):
         self.code = code

+ 86 - 47
src/core/callcenter/esl/client.py

@@ -17,7 +17,8 @@ from src.core.callcenter import BizException
 from src.core.callcenter.cache import Cache
 from src.core.callcenter.api import MakeCallContext, DelayAction, CallInfo, DeviceInfo, NextCommand
 from src.core.callcenter.callback import Callback
-from src.core.callcenter.constant import SK, EMPTY, WaitingHangupMusicPath, SAAS_ID, HOLD_MUSIC_PATH
+from src.core.callcenter.constant import SK, EMPTY, WaitingHangupMusicPath, SAAS_ID, HOLD_MUSIC_PATH, \
+    AGENT_BUSY_MUSIC_PATH
 from src.core.callcenter.esl.constant.esl_constant import BRIDGE_VARIABLES, BRIDGE, HANGUP, NORMAL_CLEARING, SIP_HEADER, \
     SPACE, SOFIA, \
     ORIGINATE, PARK, SET, EAVESDROP, SMF_ALEG, EXECUTE, PLAYBACK, PAUSE, TRANSFER, UUID_TRANSFER, UUID_BROADCAST, \
@@ -27,7 +28,7 @@ import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
 import src.core.callcenter.esl.handler as event_handler
 from src.core.callcenter.esl.constant.sip_header_constant import sipHeaderHoldMusic, profile1, profile2, sipHeaderCallId
 from src.core.callcenter.enumeration import CallCause, DeviceType, DelayActionEnum, HangupDir, CallType, NextType, \
-    Direction, CdrType, BizErrorCode
+    Direction, CdrType, BizErrorCode, WhiteTypeEnum
 from src.core.callcenter.esl.handler.default_esl_event_handler import DefaultEslEventHandler
 from src.core.callcenter.snowflake import Snowflake
 from src.core.datasource import SERVE_HOST
@@ -105,6 +106,7 @@ class InboundClient:
                     self.choose_thread_pool_executor(e).submit(self.process_esl_event, e)
 
     def choose_thread_pool_executor(self, e):
+        event_name = EslEventUtil.getEventName(e)
         call_id = EslEventUtil.getCallId(e)
         device_id = EslEventUtil.getUniqueId(e)
         wdh_device_id = EslEventUtil.getDeviceId(e)
@@ -113,7 +115,7 @@ class InboundClient:
             random_index = abs(mmh3.hash(random_id)) % len(self.executors)
         else:
             random_index = random.randint(0, len(self.executors) - 1) if self.executors else 0
-        # self.logger.info('choose_thread_pool_executor.index=', random_index, call_id, device_id, wdh_device_id)
+        # self.logger.info('choose_thread_pool_executor:event_name=%s, random_index=%s, call_id=%s, unique_id=%s, device_id=%s, event_time=%s', event_name, random_index, call_id, device_id, wdh_device_id, event_time)
         return self.executors.get(random_index)
 
     def process_esl_event(self, e):
@@ -121,8 +123,13 @@ class InboundClient:
         start_time = time.time()
         event_name = EslEventUtil.getEventName(e)
         coreUUID = EslEventUtil.getCoreUuid(e)
+        call_id = EslEventUtil.getCallId(e)
+        device_id = EslEventUtil.getUniqueId(e)
+        wdh_device_id = EslEventUtil.getDeviceId(e)
+        event_timestamp = EslEventUtil.getEventTimestamp(e)
+        event_time = datetime.fromtimestamp(event_timestamp).strftime('%Y-%m-%d %H:%M:%S')
         address = self.host + ':' + self.port
-        # self.logger.info("process_esl_event.event_name=%s,coreUUID=%s", event_name, coreUUID)
+        self.logger.info('process_esl_event:event_name=%s, call_id=%s, unique_id=%s, device_id=%s, event_time=%s', event_name, call_id, device_id, wdh_device_id, event_time)
         try:
             self.callback.callback_event(e)
             if event_name in self.handler_table:
@@ -138,8 +145,11 @@ class InboundClient:
         except:
             traceback.print_exc()
         finally:
-            latency = (time.time() - start_time)
+            time_cost = (time.time() - start_time) * 1000
+            registry.ESL_EVENT_COST.labels(event_name).observe(time_cost)
+            latency = (time.time() - event_timestamp) * 1000
             registry.ESL_EVENT_LATENCY.labels(event_name).observe(latency)
+            self.logger.info('process_esl_event:event_name=%s, time_cost=%s, latency=%s, call_id=%s, unique_id=%s, device_id=%s, event_time=%s', event_name, time_cost, latency, call_id, device_id, wdh_device_id, event_time)
 
     def do_delay_action(self, action, message):
         delay_action = DelayAction.from_json(message)
@@ -163,6 +173,7 @@ class InboundClient:
     def exec_when_call_timeout(self, call_id, device_id):
         call_info = self.cache.get_call_info(call_id)
         if not call_info or not (device_id in call_info.device_list):
+            self.logger.info("do_delay_action:exec_when_call_timeout:return, device_id=%s, call_info=%s", device_id, call_info)
             return
         device_info = call_info.device_info_map.get(device_id)
         if device_info and device_info.answer_time is None:
@@ -184,7 +195,7 @@ class InboundClient:
 
             self.cache.add_call_info(call_info)
             self.hangup_call(call_id, device_id, CallCause.CALL_TIMEOUT)
-            self.dataHandleServer.update_record(call_id, status= 0)
+            # self.dataHandleServer.update_record(call_id, status= 0)
 
     def exec_when_play_timeout(self, call_id):
         call_info = self.cache.get_call_info(call_id)
@@ -201,21 +212,33 @@ class InboundClient:
         if not call_info:
             self.logger.info("exec_when_acd_timeout callInfo为空 callId: %s", call_id)
             return
+
+        def play_sorry(music_file):
+            self.hold_play(device_id, music_file)
+            self.play_timeout(call_id, timeout=30)
+            next_command = NextCommand(device_id = device_id, next_type=NextType.NEXT_HANGUP.code)
+            call_info.next_commands = [next_command]
+            self.cache.add_call_info(call_info)
+            self.dataHandleServer.update_record(call_id, hangup_reason= '当前坐席忙,请稍后在播。')
+            self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id,
+                             device_id, music_file)
+
+        next_cmd = call_info.next_commands[0] if call_info.next_commands and len(call_info.next_commands) >0 else None
+        self.logger.info("do_delay_action:exec_when_acd_timeout:next_cmd=%s, call_info=%s", next_cmd, call_info)
+        if next_cmd and next_cmd.next_type == NextType.NEXT_TRANSFER_CALL.code:
+            device_id = next_cmd.next_value
+            self.break0(device_id)
+            play_sorry(AGENT_BUSY_MUSIC_PATH)
+
         device_list = [v for k,v in call_info.device_info_map.items() if v.device_type == DeviceType.CUSTOMER]
+        self.logger.info("do_delay_action:exec_when_acd_timeout:device_list=%s, call_info=%s", device_list, call_info)
         if device_list and len(device_list) == 1:
             device_id = device_list[0].device_id
             self.break0(device_id)
             if not WaitingHangupMusicPath:
                 self.hangup_call(call_id, device_id, CallCause.WAITING_TIMEOUT)
                 return
-            self.hold_play(device_id, WaitingHangupMusicPath)
-            self.play_timeout(call_id, timeout=30)
-            next_command = NextCommand(device_id = device_id, next_type=NextType.NEXT_HANGUP.code)
-            call_info.next_commands = [next_command]
-            self.cache.add_call_info(call_info)
-            self.dataHandleServer.update_record(call_id, status= 0)
-            self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id,
-                             device_id, WaitingHangupMusicPath)
+            play_sorry(WaitingHangupMusicPath)
 
     def make_call(self, context: MakeCallContext):
         # self.logger.info("拨打测试context:%s", context.__dict__)
@@ -287,6 +310,8 @@ class InboundClient:
                 self.send_args(device_id, SET, arg, con=_con)
                 e = _con.sendMSG(msg)
                 # e = _con.api(UUID_KILL, device_id)
+                # 更新通话记录
+                self.dataHandleServer.update_record(call_id, hangup_dir=HangupDir.PLATFORM_HANGUP.code)
                 self.logger.info("hangup_call挂机 hangup call: %s, device: %s, arg: %s, res:%s" % (call_id, device_id, arg, e))
                 # self.logger.info("hangup_call挂机 hangup call: %s, device: %s, arg: %s, res:%s" % (call_id, device_id, arg, e.serialize('json')))
         except:
@@ -519,7 +544,7 @@ class OutboundClient:
                 if con.connected():
                     info = con.getInfo()
 
-                    # self.logger.info(json.loads(info.serialize('json')))
+                    self.server.logger.info(json.loads(info.serialize('json')))
                     event_name = info.getHeader("Event-Name")
                     self.server.logger.info('Event-Name: %s', event_name)
                     device_id = info.getHeader("unique-id")
@@ -532,37 +557,36 @@ class OutboundClient:
                     kwargs['variable_sip_h_P-LIBRA-CallId'] = call_id
                     kwargs['variable_sip_h_P-LIBRA-DeviceId'] = device_id
 
-                    _bucket_call_type = 2
+                    bucket_call_type = 2
                     destination = None
                     service_category = 0
                     bucket = self.server.get_bucket(call_id)
                     whitelist = self.server.get_whitelist()
-                    self.server.logger.info('call incoming call_id=%s, device_id=%s, bucket=%s', call_id, device_id, bucket.name)
-                    self.server.logger.info('call incoming caller_number=%s, whitelist=%s', caller_number, json.dumps(whitelist))
-
-                    if self.in_whitelist(caller_number, whitelist):
-                        # 检查白名单
-                        _bucket_call_type = 0
-                        self.answer(con, call_id, device_id)
-                        self.build_call_info(CallType.INCOMING_AGENT_CALL.code, call_id, device_id, new_device_id, destination=None, bucket_type=_bucket_call_type, **kwargs)
-                        self.server.agent.acd_service.transfer_to_agent(call_id, device_id)
-                    elif bucket and bucket.name == 'AI':
-                        #转到ai机器人
-                        _bucket_call_type = 1
-                        service_category = 1
-                        destination = self.server.agent.register(**kwargs)
-                        self.server.logger.info("device_id=%s, destination=%s, new_device_id=%s" % (device_id, destination, new_device_id))
-                        self.build_call_info(CallType.INCOMING_BOT_CALL.code, call_id, device_id, new_device_id, str(destination), bucket_type=_bucket_call_type, **kwargs)
-                        self.server.cache.add_device_user_part(device_id, destination)
-                        con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s" % (call_id, new_device_id, new_device_id, destination), device_id)
+                    in_whitelist_type = self.in_whitelist(caller_number, whitelist)
+                    user_name = None
+                    self.server.logger.info('call incoming call_id=%s, caller_number=%s, device_id=%s, new_device_id=%s, in_whitelist=%s, bucket=%s', call_id, caller_number, device_id, new_device_id, in_whitelist_type, bucket.name)
+                    # 检查白名单
+                    if in_whitelist_type:
+                        if WhiteTypeEnum.AI == in_whitelist_type:
+                            bucket_call_type = 0
+                            service_category = 1
+                            destination = self.bridge_ai(con, bucket_call_type, call_id, device_id, new_device_id, **kwargs)
+                            user_name = f"机器人{destination}"
+                        else:
+                            bucket_call_type = 0
+                            self.transfer_custom(con, bucket_call_type, call_id, device_id, new_device_id, **kwargs)
                     else:
-                        # 传统服务
-                        self.answer(con, call_id, device_id)
-                        self.build_call_info(CallType.INCOMING_AGENT_CALL.code, call_id, device_id, new_device_id, destination=None, bucket_type=_bucket_call_type,  **kwargs)
-                        self.server.agent.acd_service.transfer_to_agent(call_id, device_id)
-
-                    registry.CALL_INCOMING_REQUESTS.labels(f"{_bucket_call_type}").inc()
-                    self.server.dataHandleServer.create_record(call_id, caller_number, _bucket_call_type, service_category=service_category, user_id=destination if _bucket_call_type == 1 else None , user_name=  f"机器人{destination}" if _bucket_call_type ==1 else None)
+                        # 自然分流
+                        if bucket and bucket.name == 'AI':
+                            bucket_call_type = 1
+                            service_category = 1
+                            destination = self.bridge_ai(con, bucket_call_type, call_id,  device_id, new_device_id, **kwargs)
+                            user_name = f"机器人{destination}"
+                        else:
+                            self.transfer_custom(con, bucket_call_type, call_id, device_id, new_device_id, **kwargs)
+
+                    registry.CALL_INCOMING_REQUESTS.labels(f"{bucket_call_type}").inc()
+                    self.server.dataHandleServer.create_record(call_id, caller_number, bucket_call_type, service_category=service_category, user_id=destination if user_name else None, user_name= user_name)
 
                     try:
                         con.disconnect()
@@ -581,13 +605,28 @@ class OutboundClient:
                     # Ignore the error if socket is already closed
                     pass
 
+        def transfer_custom(self, con, bucket_call_type, call_id, device_id, new_device_id, **kwargs):
+            self.answer(con, call_id, device_id)
+            self.build_call_info(CallType.INCOMING_AGENT_CALL.code, call_id, device_id, new_device_id, destination=None, bucket_type=bucket_call_type, **kwargs)
+            self.server.agent.acd_service.transfer_to_agent(call_id, device_id)
+
+        def bridge_ai(self, con, bucket_call_type, call_id, device_id, new_device_id, **kwargs):
+            destination = self.server.agent.register(**kwargs)
+            self.server.logger.info("call_id=%s, device_id=%s, destination=%s, new_device_id=%s" % (call_id, device_id, destination, new_device_id))
+            self.build_call_info(CallType.INCOMING_BOT_CALL.code, call_id, device_id, new_device_id, str(destination), bucket_type=bucket_call_type, **kwargs)
+            self.server.cache.add_device_user_part(device_id, destination)
+            con.execute("bridge", "{sip_h_P-LIBRA-CallId=%s,sip_h_P-LIBRA-DeviceId=%s,origination_uuid=%s}user/%s" % (call_id, new_device_id, new_device_id, destination), device_id)
+            return destination
+
         def in_whitelist(self, caller_number, whitelist):
             for x in whitelist:
-                if caller_number in x or caller_number in x:
-                    return True
-            return False
+                phone, _type = x
+                if caller_number in phone or phone in caller_number:
+                    return WhiteTypeEnum.get_by_code(_type)
+
+            return None
 
-        def answer(self, con, call_id, device_id, timeouts=30):
+        def answer(self, con, call_id, device_id, timeouts=55):
             con.execute("answer", "", device_id)
             # con.execute("bgapi", f"uuid_setvar {device_id} {sipHeaderCallId} {call_id}", device_id)
             con.execute("playback", HOLD_MUSIC_PATH, device_id)
@@ -641,7 +680,7 @@ class OutboundClient:
             self.cache_job_scheduler.add_job(self.update_cache_job, run_date=datetime.now())
             self.cache_job_scheduler.add_job(self.update_cache_job, 'interval', seconds=60, max_instances=1, name='cache_job_daemon')
             self.cache_job_scheduler.add_job(self.update_whitelist, run_date=datetime.now())
-            self.cache_job_scheduler.add_job(self.update_whitelist, 'interval', seconds=600, max_instances=1,name='cache_job_whiteList')
+            self.cache_job_scheduler.add_job(self.update_whitelist, 'interval', seconds=60, max_instances=1,name='cache_job_whiteList')
             self.cache_job_scheduler.start()
 
         def update_cache_job(self):
@@ -653,7 +692,7 @@ class OutboundClient:
         def update_whitelist(self):
             with self.app.app_context():
                 phones = Whitelist.query.filter_by(del_flag=0).all()
-                self.whitelist = [phone.phone for phone in phones]
+                self.whitelist = [(phone.phone, phone.type) for phone in phones]
                 self.logger.info("Whitelist updated: %s", self.whitelist)
 
         def get_whitelist(self):

+ 8 - 8
src/core/callcenter/esl/handler/channel_answer_handler.py

@@ -23,24 +23,24 @@ class ChannelAnswerHandler(EslEventHandler):
 
     def handle(self, address, event, coreUUID):
         call_id = EslEventUtil.getCallId(event)
+        device_id = EslEventUtil.getDeviceId(event)
         call_info = self.cache.get_call_info(call_id)
-        self.logger.info("answer call_id:%s, call_info:%s", call_id, call_info)
+        self.logger.info("answer call_id:%s, device_id:%s, call_info:%s", call_id, device_id, call_info)
         if not call_info:
             self.logger.info("answer call_info is null, call_id:%s", call_id)
             return
 
         registry.CALL_ANSWER_REQUESTS.labels(f"{call_info.bucket_type}").inc()
-        device_id = EslEventUtil.getDeviceId(event)
         device_info = call_info.device_info_map.get(device_id)
         if not device_info:
             self.logger.info("answer device_info is null, call_id:%s, call_info:%s", call_id, call_info)
             return
 
-        if CallType.AGENT_CALL.code == call_info.call_type and device_info.device_type == DeviceType.CUSTOMER.code:
-            self.record(event, device_id)
+        self.record(event, device_id)
+        # if CallType.AGENT_CALL.code == call_info.call_type and device_info.device_type == DeviceType.CUSTOMER.code:
 
         if (CallType.BOT_CALL.code == call_info.call_type or CallType.INCOMING_BOT_CALL.code == call_info.call_type) and device_info.device_type == DeviceType.ROBOT.code:
-            self.record(event, device_id)
+            # self.record(event, device_id)
             call_info.answer_flag = AnswerFlag.ROBOT_ANSWER.code
             registry.CALL_BOT_ANSWER_REQUESTS.labels(f"{call_info.bucket_type}").inc()
 
@@ -50,13 +50,13 @@ class ChannelAnswerHandler(EslEventHandler):
 
         next_command = call_info.next_commands[0] if len(call_info.next_commands) > 0 else None
         device_type = DeviceType.get_by_code(device_info.device_type)
-        self.logger.info("ChannelAnswerHandler call_id:%s, device_id:%s, device_type:%s, next_command:%s"%(call_id, device_id, device_type, next_command))
+        self.logger.info("answer call_id:%s, device_id:%s, device_type:%s, next_command:%s"%(call_id, device_id, device_type, next_command))
         if not next_command:
             self.cache.add_call_info(call_info)
             return
 
         call_info.next_commands.remove(next_command)
-        self.logger.info("ChannelAnswerHandler call_info.answer_time::%s,time:%s", call_info.answer_time, EslEventUtil.getEventDateTimestamp(event))
+        self.logger.info("answer call_id:%s, device_id:%s, call_info.answer_time::%s,time:%s", call_id, device_id, call_info.answer_time, EslEventUtil.getEventDateTimestamp(event))
 
         if NextType.NEXT_CALL_OTHER.code == next_command.next_type:
             self.call_other(call_info, device_info, event)
@@ -167,7 +167,7 @@ class ChannelAnswerHandler(EslEventHandler):
         else:
             _record_url = self.start_recording(device_id, get_record_file_name(call_id, CallStage.ALL), call)
         device.record = _record_url
-        self.logger.info("luyincall:%s, device:%s" % (call, device))
+        self.logger.info("luyin::call_id:%s, device_id:%s, call:%s, device:%s" % (call_id, device_id, call, device))
         self.cache.add_call_info(call)
         return _record_url
 

+ 88 - 39
src/core/callcenter/esl/handler/channel_hangup_handler.py

@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 # encoding:utf-8
-
+import subprocess
 import json
 import sys
 import os
@@ -8,7 +8,9 @@ import traceback
 from pydub import AudioSegment
 import threading
 from src.core.callcenter.acd import AcdService
+from src.core.callcenter.agent import AgentStateService
 from src.core.callcenter.call import CallService
+from src.core.callcenter.constant import SAAS_ID
 from src.core.callcenter.enumeration import CallType, DeviceType, AnswerFlag, NextType, CdrType, HangupDir, \
     CallCause,AgentServiceState,AgentScene, WorkStatus
 from src.core.callcenter.esl.annotation import EslEventName
@@ -29,25 +31,27 @@ class ChannelHangupHandler(EslEventHandler):
         self.call_service = CallService(inbound_client,inbound_client.app)
         self.push_handler = PushHandler(inbound_client.logger)
         self.dataHandleServer=DataHandleServer(inbound_client.app)
+        self.agent_state_service = AgentStateService(inbound_client.app)
+
 
     def handle(self, address, event, coreUUID):
         # self.logger.info(json.loads(event.serialize('json')))
         try:
             call_id = self.get_call_id(event)
-            self.logger.info('call_id is %s', call_id)
+            device_id = EslEventUtil.getDeviceId(event)
+            self.logger.info('hangup call_id:%s, device_id:%s', call_id, device_id)
             if not call_id:
                 self.release(event)
                 self.logger.info("call_id is null, event=%s", json.loads(event.serialize('json')))
                 return
 
             call_info = self.cache.get_call_info(call_id)
-            self.logger.info('call_info is %s', call_info)
+            self.logger.info('hangup call_id:%s, device_id:%s, call_info: %s', call_id, device_id, call_info)
             if not call_info:
                 self.release(event)
                 self.logger.info("call_info:%s is null", call_id)
                 return
 
-            device_id = EslEventUtil.getDeviceId(event)
             device_info = call_info.device_info_map.get(device_id)
             if not device_info:
                 self.release(event)
@@ -58,7 +62,7 @@ class ChannelHangupHandler(EslEventHandler):
                 self.release(event)
 
             count = len(call_info.device_list)
-            self.logger.info('ChannelHangupHandler, call_id=%s, device_id=%s, count=%s'% (call_id, device_id, count))
+            self.logger.info('hangup, call_id=%s, device_id=%s, count=%s'% (call_id, device_id, count))
             try:
                 call_info.device_list.remove(device_id)
             except:
@@ -90,12 +94,13 @@ class ChannelHangupHandler(EslEventHandler):
             # 计算录音时长
             if device_info.record_start_time:
                 device_info.record_time = int(device_info.end_time) - int(device_info.record_start_time)
+            call_info.hangup_count = call_info.hangup_count + 1
             call_info.device_info_map[device_info.device_id] = device_info
             skip_hangup_all = device_info.device_type == DeviceType.ROBOT.code
             bucket_type = call_info.bucket_type if call_info.bucket_type else "EMPTY"
             registry.CALL_HANGUP_REQUESTS.labels(f"{bucket_type}", f"{device_info.sip_status}").inc()
 
-            self.logger.info('ChannelHangupHandler, hangup_reason=%s, device_type=%s, cdr_type=%s, end_time=%s, skip_hangup_all=%s' % (hangup_reason, device_info.device_type, device_info.cdr_type, call_info.end_time, skip_hangup_all))
+            self.logger.info('ChannelHangupHandler, call_id=%s, device_id=%s, hangup_reason=%s, device_type=%s, cdr_type=%s, end_time=%s, skip_hangup_all=%s, answer_count=%s, hangup_count=%s' % (call_id, device_id, hangup_reason, device_info.device_type, device_info.cdr_type, call_info.end_time, skip_hangup_all, call_info.answer_count, call_info.hangup_count))
             # 如果是转人工
             # if 'transferToAgent' == hangup_reason and DeviceType.ROBOT.code == device_info.device_type:
             #     call_info.answer_flag = AnswerFlag.TRANSFER_TO_AGENT.code
@@ -128,16 +133,33 @@ class ChannelHangupHandler(EslEventHandler):
 
             # 全部挂机以后推送挂机状态
             # self.logger.info('yushanghui::call_info.device_list %s', call_info.device_list)
-            if len(call_info.device_list) == 0:
-                self.get_call_info_record(call_info)
-
             # 判断挂机方向 && 更新缓存
             self.hangup_dir(call_info, device_info, cause)
+            if call_info.hangup_count >= call_info.answer_count:
+                # if len(call_info.device_list) == 0:
+                self.get_call_info_record(call_info)
+
+            # 连续报警判断
+            self.hook_serial_warn(call_info)
             self.cache.add_call_info(call_info, persistent=True)
+            if device_info.device_type != DeviceType.ROBOT.code:
+                # 释放坐席接听锁
+                self.agent_state_service.handle_release_agent_lock(call_info.agent_key, SAAS_ID)
 
         except:
             traceback.print_exc()
 
+    def hook_serial_warn(self, call_info:CallInfo):
+        no_answer_cnt = self.cache.get_serial_no_answer_cnt(call_info)
+        if no_answer_cnt >=5:
+            self.logger.warn('WARING::serial_no_answer_cnt greater than 5')
+            self.dataHandleServer.create_warning_record(0, '连续5个未接')
+
+        no_speed_hangup_cnt = self.cache.get_serial_speed_hangup_cnt(call_info)
+        if no_speed_hangup_cnt >=3:
+            self.logger.warn('WARING::get_serial_speed_hangup_cnt greater than 3')
+            self.dataHandleServer.create_warning_record(2, '连续3个5s内挂断')
+
     def get_call_id(self, event):
         call_id = EslEventUtil.getCallId(event)
         device_id = EslEventUtil.getDeviceId(event)
@@ -152,47 +174,72 @@ class ChannelHangupHandler(EslEventHandler):
         records = []
         sip_status = []
         hangup_cause = []
-        for value in call_info.device_info_map.values():
-            records.append(value.record) if value.record else None
-            sip_status.append(value.sip_status)
-            hangup_cause.append(value.hangup_cause)
-        self.logger.info("get_call_info_record: %s", records)
-        threading.Thread(target=self._update_record_in_thread, args=(call_info.call_id, list(set(records)), ",".join(sip_status), ",".join(hangup_cause),call_info.answer_count)).start()
-
-    def _update_record_in_thread(self, call_id, records, sip_status, hangup_cause, answer_count):
+        agent_name = ''
+        try:
+            self.logger.info("get_call_info_record: %s", call_info)
+            for value in call_info.device_info_map.values():
+                records.append(value.record) if value.record else None
+                sip_status.append(value.sip_status if value.sip_status else 'EMPTY')
+                hangup_cause.append(value.hangup_cause if value.hangup_cause else 'EMPTY')
+                if value.device_type == DeviceType.AGENT.code :
+                    agent_name = value.agent_key
+            self.logger.info("get_call_info_record: %s,agent_name:%s, sip_status:%s, hangup_cause:%s", records, agent_name, sip_status, hangup_cause)
+            threading.Thread(target=self._update_record_in_thread, args=(call_info, list(dict.fromkeys(records)), ",".join(sip_status), ",".join(hangup_cause), agent_name)).start()
+        except Exception as e:
+            self.logger.info("get_call_info_record:exception %s", e)
+            traceback.print_exc()
+
+    def update_name(self,call_id, agent_name):
+        try:
+            agent = self.dataHandleServer.get_user_name(agent_name)
+            return agent
+        except Exception as e:
+            self.logger.error("update_name error: %s", str(e))
+    def _update_record_in_thread(self, call_info, records, sip_status, hangup_cause, agent_name):
         """用于在独立线程中执行 update_record"""
         try:
-            status = 0 if answer_count <= 0 else 1
+            call_id = call_info.call_id
+            hangup_dir = call_info.hangup_dir
+            agent = self.update_name(call_id, agent_name)
+            # status = 0 if answer_count <= 0 else 1
             if len(records) == 0:
-                self.logger.warning("没有录音文件,直接更新记录: call_id=%s, sip_status=%s, hangup_cause=%s", call_id, sip_status, hangup_cause)
-                self.dataHandleServer.update_record(call_id, time_end=datetime.now(), sip_status=sip_status, sip_hangup_cause=hangup_cause, status=status)
+                self.logger.warning("没有录音文件,直接更新记录: call_id=%s, sip_status=%s, hangup_cause=%s, agent_name=%s, hangup_dir=%s", call_id, sip_status, hangup_cause,agent_name, hangup_dir)
+                self.dataHandleServer.update_record(call_id, time_end=datetime.now(), sip_status=sip_status, sip_hangup_cause=hangup_cause, hangup_dir=hangup_dir, user_id=agent.user_id if agent else None, user_name=agent.agent_name if agent else None)
                 return
-            merge_record = self.merge_audio_files(records) if len(records) > 1 else records[0]
-            # try:
-            #     self._ensure_path_permissions(merge_record)
-            #     os.chmod(merge_record, 0o755)  # 设置文件权限为 755
-            #     self.logger.info("成功设置文件权限: %s -> 755", merge_record)
-            # except Exception as chmod_error:
-            #     self.logger.error("设置文件权限失败: %s, error: %s", merge_record, str(chmod_error))
-
-            self.dataHandleServer.update_record(call_id, time_end=datetime.now(), url=merge_record, sip_status=sip_status, sip_hangup_cause=hangup_cause, status=status)
-            self.logger.info("更新录音记录完成: call_id=%s", call_id)
+            merge_record = records[0]
+            # merge_record = self.merge_audio_files(records) if len(records) > 1 else records[0]
+            # 计算录音时长
+            duration = self.get_audio_duration(merge_record) or 0
+            self.dataHandleServer.update_record(call_id, times=int(duration), time_end=datetime.now(), url=merge_record, sip_status=sip_status, sip_hangup_cause=hangup_cause, hangup_dir=hangup_dir, user_id=agent.user_id if agent else None, user_name=agent.agent_name if agent else None)
+            self.logger.info("更新录音记录完成: call_id=%s, duration=%s, hangup_dir=%s", call_id, int(duration), hangup_dir)
         except Exception as e:
             self.logger.error("更新录音记录失败: call_id=%s, error=%s", call_id, str(e))
 
-    def _ensure_path_permissions(self, file_path):
-        """确保文件及其父级目录的权限为 755"""
-        current_path = os.path.abspath(file_path)
-        self.logger.info("_ensure_path_permissions::%s", current_path)
-        # while current_path != "/":  # 遍历到根目录为止
-        #     if os.path.exists(current_path):
-        #         os.chmod(current_path, 0o755)  # 设置当前路径权限为 755
-        #     current_path = os.path.dirname(current_path)  # 获取父目录路径
+    def get_audio_duration(self, audio_path):
+        """使用 ffmpeg 计算音频时长"""
+        try:
+            result = subprocess.run(
+                ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "json", audio_path],
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                text=True
+            )
+            info = json.loads(result.stdout)
+            duration = float(info["format"]["duration"])
+            return duration
+        except Exception as e:
+            print(f"获取音频时长失败: {e}")
+            return 0
+
     def merge_audio_files(self,audio_files):
         if not audio_files:
             self.logger.info("没有可合并的音频文件")
             return
-            # 初始化第一个音频文件
+        if not os.path.exists(audio_files[0]):
+            self.logger.info("没有可合并的音频文件")
+            return
+
+        # 初始化第一个音频文件
         combined = AudioSegment.from_file(audio_files[0])
         # 循环添加其余的音频文件
         for audio_file in audio_files[1:]:
@@ -238,6 +285,8 @@ class ChannelHangupHandler(EslEventHandler):
             call_info.hangup_dir = HangupDir.HOST_HANGUP.code
         elif DeviceType.CUSTOMER.code == device_info.device_type:
             call_info.hangup_dir = HangupDir.CUSTOMER_HANGUP.code
+        # elif DeviceType.ROBOT.code == device_info.device_type:
+        #     call_info.hangup_dir = HangupDir.ROBOT_HANGUP.code
 
         # if not call_info.end_time:
         #     call_info.end_time = device_info.end_time

+ 3 - 1
src/core/callcenter/esl/handler/dtmf_handler.py

@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 # encoding:utf-8
+import json
 
 from src.core.callcenter.esl.annotation import EslEventName
 from src.core.callcenter.esl.constant.event_names import DTMF
@@ -13,4 +14,5 @@ class DTMFHandler(EslEventHandler):
         super().__init__(inbound_client, bot_agent)
 
     def handle(self, address, event, coreUUID):
-        pass
+        self.logger.info("DTMFHandler, event=%s", json.loads(event.serialize('json')))
+

+ 13 - 1
src/core/callcenter/esl/handler/playback_stop_handler.py

@@ -2,7 +2,8 @@
 # encoding:utf-8
 
 import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
-from src.core.callcenter.constant import HOLD_MUSIC_PATH
+from src.core.callcenter.api import NextCommand
+from src.core.callcenter.constant import HOLD_MUSIC_PATH, WaitingHangupMusicPath
 from src.core.callcenter.data_handler import *
 from src.core.callcenter.enumeration import NextType, CallCause
 from src.core.callcenter.esl.annotation import EslEventName
@@ -40,8 +41,19 @@ class PlaybackStopHandler(EslEventHandler):
             self.logger.info("PLAYBACK_STOP next_command is null, call_info:%s", call_info)
             return
 
+        # self.logger.info("PLAYBACK_STOP next_command:%s", next_command)
+        # if next_command and next_command.next_type == NextType.NEXT_TRANSFER_CALL.code:
+        #     device_id = next_command.next_value
+        #     self.inbound_client.break0(device_id)
+        #     self.inbound_client.hold_play(device_id, WaitingHangupMusicPath)
+        #     self.inbound_client.play_timeout(call_id, timeout=30)
+        #     next_command = NextCommand(device_id = device_id, next_type=NextType.NEXT_HANGUP.code)
+        #     call_info.next_commands = [next_command]
+        #     self.logger.info("PLAYBACK_STOP 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id, device_id, WaitingHangupMusicPath)
+
         if NextType.NEXT_HANGUP == next_command.next_type:
             call_info.end_time = device_info.end_time
             for _device_id in call_info.device_list:
                 self.inbound_client.hangup_call(call_id, _device_id, CallCause.PLAYBACK_STOP)
+
         self.cache.add_call_info(call_info)

+ 5 - 0
src/core/callcenter/esl/utils/esl_event_util.py

@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 # encoding:utf-8
 
+import time
 import random
 import re
 import string
@@ -231,6 +232,10 @@ def getFreeswitchSwitchname(e):
 def getEventDateTimestamp(e):
     return e.getHeader(EVENT_DATE_TIMESTAMP)
 
+def getEventTimestamp(e):
+    event_timestamp = getEventDateTimestamp(e)
+    event_timestamp_sec = float(event_timestamp) / 1_000_000 if event_timestamp else time.time()
+    return event_timestamp_sec
 
 def getEventSubClass(e):
     return e.getHeader(EVENT_SUB_CLASS)

+ 3 - 0
src/core/callcenter/registry.py

@@ -28,7 +28,9 @@ ASR_ERRORS = Counter( 'asr_error_codes', 'Count of ASR error codes', ['errorCode
 
 
 # esl时间耗时
+ESL_EVENT_COST = Histogram('esl_event_cost', 'Esl Event process cost in seconds', ['eventName'])
 ESL_EVENT_LATENCY = Histogram('esl_event_latency', 'Esl Event latency in seconds', ['eventName'])
+ESL_EVENT_CALLBACK_COST = Histogram('esl_event_callback_cost', 'Esl Event callback process cost in seconds', ['eventName','callType'])
 ESL_EVENT_CALLBACK_LATENCY = Histogram('esl_event_callback_latency', 'Esl Event callback latency in seconds', ['eventName','callType'])
 
 
@@ -42,5 +44,6 @@ BOT_CREATE_ACCOUNT_LATENCY = Histogram('bot_create_account_latency',  '创建虚
 
 FLASK_ACTIVE_THREADS = Gauge('flask_active_threads', 'Number of active threads')
 BOT_AGENT_LIVES = Gauge('bot_agent_lives', 'Number of active agents')
+MANUAL_AGENT_LIVES = Gauge('manual_agent_lives', 'Number of active agents')
 
 

+ 26 - 2
src/core/callcenter/test.py

@@ -1,3 +1,27 @@
-import json
-from src.core.callcenter.api import CallInfo, DeviceInfo, NextCommand
+#
+# import jieba
+#
+# TestStr = "能帮我查一下,我家水费欠多少"
+# seg_list = jieba.cut(TestStr, cut_all=False, HMM=True)
+# print ("Default Mode:", "/ ".join(seg_list))
+
+import uuid
+import mmh3
+
+from src.core.callcenter.dao import Bucket
+
+def get_bucket(custom_uuid=None, buckets=[]):
+    random_id = abs(mmh3.hash(custom_uuid))
+    for bucket in buckets:
+        num = (random_id % 100 + 100) % 100
+        if bucket.lower <= num < bucket.upper:
+            return num, bucket
+    return -1, buckets[0]
+
+if __name__ == '__main__':
+    arr = ['C1879412349555838976','C1879412206890782720','C1879411969535119360','C1879411406290423808','C1879408024871899136','C1879407680997691392','C1879406254007390208','C1879404740748644352','C1879403850650226688','C1879402961977872384','C1879402509785763840','C1879402237567045632','C1879402005592674304','C1879400827102302208','C1879400778024751104','C1879400705488457728','C1879400533513605120','C1879400336188379136','C1879400327959154688','C1879399300082044928','C1879399233669435392','C1879396009050771456','C1879394097295396864','C1879393224498483200','C1879381728368398336','C1879381287505104896','C1879379466774515712','C1879376723787780096','C1879374004641468416','C1879373548330553344','C1879372415646175232','C1879367459866284032','C1879365634769424384','C1879364921326702592','C1879364787436130304','C1879363948554358784','C1879362454358724608','C1879360081448013824','C1879358294565457920','C1879358151116066816','C1879357497190518784','C1879357257641234432','C1879357023229972480','C1879355792935751680','C1879355755749052416','C1879354039309832192']
+    buckets=[Bucket(id=1, name="传统", lower=0, upper=90), Bucket(id=2, name="AI",lower=90, upper=100)]
+    for custom_uuid in arr:
+        num, bucket = get_bucket(custom_uuid=custom_uuid, buckets=buckets)
+        print(custom_uuid, num, bucket.name)
 

+ 15 - 2
src/core/callcenter/views.py

@@ -1,8 +1,9 @@
 #!/usr/bin/env python3
 # encoding:utf-8
+import traceback
 
 from flask import request, render_template_string
-
+import json
 from src.core.callcenter.agent import AgentService, AgentOperService
 from src.core.callcenter.api import AgentCallRequest, AgentActionRequest, HangupCallRequest
 from src.core.callcenter.call import CallService
@@ -188,4 +189,16 @@ def member_active():
 def num_generate():
     """获取 cti 流程 ID"""
     flow_id = call_service.snowflake.next_id()
-    return success_response(flow_id)
+    return success_response(flow_id)
+
+
+@app.route('/open/agent/sdkAnalytics', methods=['POST'])
+def track_event():
+    try:
+        data = request.get_json()
+        # 存入日志文件
+        # app.logger.info(json.dumps(data))
+    except Exception as e:
+        traceback.print_exc()
+        app.logger.error('track_event:exception', e)
+    return success_response('ok')

+ 16 - 16
src/core/callcenter/ws.py

@@ -37,22 +37,22 @@ Thread(target=listen_to_redis).start()
 
 # def common_down_data(user_id, data, namespace='/ws/cs-im'):
 #     emit('common_down_data', data, room=user_id, namespace=namespace)
-def common_down_data(user_id, data, namespace='/ws/cs-im'):
-    """推送 common_down_data 消息"""
-    # 发布消息到 Redis 频道
-    redis_handler.publish('socket_channel', json.dumps({
-        'event': 'common_down_data',
-        'user_id': user_id,
-        'data': data
-    }))
-def common_down_cmd(user_id, data):
-    """推送 common_down_cmd 消息"""
-    # 发布消息到 Redis 频道
-    redis_handler.publish('socket_channel', json.dumps({
-        'event': 'common_down_cmd',
-        'user_id': user_id,
-        'data': data
-    }))
+# def common_down_data(user_id, data, namespace='/ws/cs-im'):
+#     """推送 common_down_data 消息"""
+#     # 发布消息到 Redis 频道
+#     redis_handler.publish('socket_channel', json.dumps({
+#         'event': 'common_down_data',
+#         'user_id': user_id,
+#         'data': data
+#     }))
+# def common_down_cmd(user_id, data):
+#     """推送 common_down_cmd 消息"""
+#     # 发布消息到 Redis 频道
+#     redis_handler.publish('socket_channel', json.dumps({
+#         'event': 'common_down_cmd',
+#         'user_id': user_id,
+#         'data': data
+#     }))
 
 # def common_down_cmd(user_id, data):
 #     emit('common_down_cmd', data, room=user_id)

+ 54 - 51
src/core/voip/asr.py

@@ -4,6 +4,8 @@
 import os
 import json
 import threading
+import traceback
+
 import nls  # 引入阿里云语音识别库
 from aliyunsdkcore.client import AcsClient
 from aliyunsdkcore.request import CommonRequest
@@ -68,101 +70,102 @@ class TestSt:
             print("无法获取Token")
             return None
 
-    def __init__(self, tid, message_receiver=None):
+    def __init__(self, tid, logger, message_receiver=None):
         # self.is_closed = False
         # self.lock = threading.Lock()
-
+        self.logger = logger
+        self.__event = threading.Event()
         self.__th = threading.Thread(target=self.__test_run)
         self.__id = tid
         self.message_receiver = message_receiver
         self._Token = self.get_cached_token()
         self.sr = None
-        print("开始")
+        self.logger.debug("开始")
 
     def start(self):
         self.__th.start()
 
     def send_audio(self, audio_data):
         if self.sr:
-            # print("Sending audio data of length:", len(audio_data))
             self.sr.send_audio(audio_data)
-            # print("Audio data sent.")
-        # if self.sr and not self.is_closed:
-        #     with self.lock:
-        #         try:
-        #             self.sr.send_audio(audio_data)
-        #         except Exception as e:
-        #             print(f"Error sending audio: {e}")
-        #             self.close()
-    # def close(self):
-    #     with self.lock:
-    #         if not self.is_closed:
-    #             self.is_closed = True
-    #             try:
-    #                 self.sr.stop()
-    #             except Exception as e:
-    #                 print(f"Error stopping ASR: {e}")
 
     def close(self):
         try:
             self.sr.stop()
         except Exception as e:
-            print(f"Error stopping ASR: {e}")
+            self.logger.debug(f"[{self.__id}]Error stopping ASR: {e}")
 
     def __test_run(self):
-        print("Thread:{} start..".format(self.__id))
+        self.logger.debug("Thread:%s start..",self.__id)
         nls.enableTrace(True)
-        self.sr = nls.NlsSpeechTranscriber(
-            url=URL,
-            token=self._Token,
-            appkey=APPKEY,
-            on_sentence_begin=self.test_on_sentence_begin,
-            on_sentence_end=self.test_on_sentence_end,
-            on_start=self.test_on_start,
-            on_result_changed=self.test_on_result_chg,
-            on_completed=self.test_on_completed,
-            on_error=self.test_on_error,
-            on_close=self.test_on_close,
-            callback_args=[self.__id]
-        )
-        self.sr.start(
-            aformat="pcm",
-            enable_intermediate_result=True,
-            enable_punctuation_prediction=True,
-            enable_inverse_text_normalization=True
-        )
-        self.sr.ctrl(ex={'max_sentence_silence': '1200ms', 'disfluency': True,'enable_words': True })
-        print("ASR session started.")
+        count = 0
+        self.__event.clear()
+        while not self.__event.is_set():
+            self.sr = nls.NlsSpeechTranscriber(
+                url=URL,
+                token=self._Token,
+                appkey=APPKEY,
+                on_sentence_begin=self.test_on_sentence_begin,
+                on_sentence_end=self.test_on_sentence_end,
+                on_start=self.test_on_start,
+                on_result_changed=self.test_on_result_chg,
+                on_completed=self.test_on_completed,
+                on_error=self.test_on_error,
+                on_close=self.test_on_close,
+                callback_args=[self.__id]
+            )
+            try:
+                self.sr.start(
+                    aformat="pcm",
+                    sample_rate=8000,
+                    enable_intermediate_result=True,
+                    enable_punctuation_prediction=True,
+                    enable_inverse_text_normalization=True,
+                    ex={'max_sentence_silence': 2000, 'disfluency': True, 'enable_words': True}
+                )
+                # _res = self.sr.ctrl(ex={'max_sentence_silence': 6000, 'disfluency': True,'enable_words': True })
+                self.logger.debug(f"[{self.__id}]ASR session started. {count}")
+                self.__event.wait(timeout=.5)
+                self.logger.debug(f"[{self.__id}]ASR session started. {count}")
+            except Exception as e:
+                traceback.print_exc()
+                self.logger.debug(f"[{self.__id}]ASR session start exception. {e}")
+
+            count = count + 1
 
     def test_on_sentence_begin(self, message, *args):
-        # pass
-        print("test_on_sentence_begin:{}".format(message))
+        self.logger.debug("[%s]test_on_sentence_begin:%s", self.__id, message)
         if self.message_receiver:
             self.message_receiver(message, *args)
 
     def test_on_sentence_end(self, message, *args):
-        print("test_on_sentence_end:{}".format(message))
+        self.logger.debug("[%s]test_on_sentence_end:%s", self.__id, message)
         if self.message_receiver:
             self.message_receiver(message, *args)
 
     def test_on_start(self, message, *args):
-        # print("test_on_start:{}".format(message))
+        self.__event.set()
+        self.logger.debug("[%s]test_on_start:%s", self.__id, message)
         pass
 
     def test_on_error(self, message, *args):
-        # print("on_error args=>{}".format(args))
+        self.logger.debug("on_error args=>%s", args)
+        if not self.__event.is_set():
+            self.__event.set()
         if self.message_receiver:
             self.message_receiver(message, *args)
 
     def test_on_close(self, *args):
-        # print("on_close: args=>{}".format(args))
+        self.logger.debug("on_close: args=>%s", args)
+        if not self.__event.is_set():
+            self.__event.set()
         pass
 
     def test_on_result_chg(self, message, *args):
-        # print("test_on_chg:{}".format(message))
+        # self.logger.debug("test_on_chg:{}".format(message))
         if self.message_receiver:
             self.message_receiver(message, *args)
 
     def test_on_completed(self, message, *args):
-        # print("on_completed:args=>{} message=>{}".format(args, message))
+        # self.logger.debug("on_completed:args=>{} message=>{}".format(args, message))
         pass

+ 69 - 55
src/core/voip/bot.py

@@ -18,6 +18,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
 
 from src.core.callcenter import registry
 from src.core.callcenter.cache import Cache
+from src.core.callcenter.enumeration import HangupDir
 from src.core.datasource import SIP_SERVER, SERVE_HOST
 from src.core.voip.constant import *
 
@@ -81,7 +82,7 @@ class MyAudioMediaPort(pj.AudioMediaPort):
         if self.asr:  # 如果ASR实例存在,则发送音频数据
             if self.first:
                 self.first = False
-                self.call.logger.info("Received audio frame: %s %s", frame.buf, frame.size)
+                self.call.logger.debug("Received audio frame: %s, %s %s", self.call.session_id,frame.buf, frame.size)
             self.asr.send_audio(frame.buf)
 
         try:
@@ -99,6 +100,9 @@ class MyAudioMediaPort(pj.AudioMediaPort):
                     # print("测试超长", user_asr_text)
                 elif asr_text:
                     self.user_asr_texts.append(asr_text)
+                    user_asr_text = self.user_asr_texts[0] if len(self.user_asr_texts) == 1 else '###'.join(self.user_asr_texts)
+                    self.user_asr_texts.clear()
+                    self.call.chat(user_asr_text)
                 if time_difference > int(self.call.wait_time):
                     self.call.reset_wait_time()
             else:
@@ -212,7 +216,7 @@ class MyCall(pj.Call):
 
         self.cur_player_file = None   #当前播放的文件
 
-        self.asr = TestSt(self.session_id, message_receiver=self.on_receiver_asr_result)  # 创建ASR实例
+        self.asr = TestSt(self.session_id, logger=self.logger, message_receiver=self.on_receiver_asr_result)  # 创建ASR实例
         self.asr.start()  # 启动ASR线程
 
         self.start_time = time.time()  # 当前机器人对话开始时间
@@ -376,6 +380,8 @@ class MyCall(pj.Call):
         if action_code == 'hang':  # 挂断
             self.agent.hangup(self.user_part)
             self.end_statistics()
+            # 更新通话记录
+            self.agent.dataHandleServer.update_record(self.session_id, hangup_dir=HangupDir.ROBOT_HANGUP.code)
         elif action_code == 'transfer':  # 转人工
             self.agent.transfer(user_part=self.user_part, call_id=self.session_id, device_id=self.device_id)
             self.end_statistics()
@@ -449,6 +455,7 @@ class ToTextBotAgent:
             while try_count > 0:
                 once_start = time.time()
                 try:
+                    # message.strip()
                     response = requests.post(url, data=request_data, headers=headers, timeout=3)
                     if response and response.ok:
                         response_data = response.json()
@@ -458,74 +465,77 @@ class ToTextBotAgent:
                             self.call_agent.message_queue.put(message)
                             break
                         else:
-                            self.call_agent.logger.info("响应中没有 'data' 字段")
+                            self.call_agent.logger.info(f"to_request::failed, sessionId={request.sessionId}, response_data:{response_data}")
                     else:
-                        self.call_agent.logger.info(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
+                        self.call_agent.logger.info(f"to_request::请求失败,sessionId:{request.sessionId}, 状态码: {response.status_code if response else None}, 响应内容: {response.text if response else None}")
                 except Exception as e:
                     traceback.print_exc()
-                    self.call_agent.logger.error(f"TaskId={request.taskId}, 请求发生异常 {e}, URL: {url}")
+                    self.call_agent.logger.error(f"to_request::exception, TaskId={request.taskId}, sessionId={request.sessionId}, 请求发生异常: {e}")
                 finally:
                     try_count = try_count - 1
                     latency = (time.time() - once_start)
                     registry.BOT_REQUEST_ONCE_LATENCY.labels(request.taskId).observe(latency)
 
+            self.call_agent.logger.info(f"to_request::sessionId:{request.sessionId}, message:{message}")
             if not message:
+                self.call_agent.logger.info(f"to_request::sessionId:{request.sessionId}, come in default response")
                 massage = self.get_default_response()
                 self.call_agent.message_queue.put(massage)
         finally:
             latency = (time.time() - start_time)
             registry.BOT_REQUEST_COUNT.inc()
             registry.BOT_REQUEST_LATENCY.labels(request.taskId).observe(latency)
-            self.call_agent.logger.info(f"sessionId={ self.call_agent.session_id}, timeCost={latency}, chat::request:{request_data}, response:{response_data if response_data else None}")
-
-
-    def to_quest(self, request: BotChatRequest, try_count = 3):
-        start_time = time.time()
-        request_data = request.to_json_string()
-        response = None
-        try:
-            url = f"http://{SERVE_HOST}:40072/botservice"
-            # payload = request.to_json_string()
-            # self.call_agent.logger.info(f"请求数据:{request_data},url:{url}")
-            with requests.Session() as session:
-                message = None
-                # try:
-                session.headers.update({'Content-Type': 'application/json'})
-                while try_count > 0:
-                    once_start = time.time()
-                    try:
-                        response = session.post(url=url, json=request_data, timeout=3)
-                        # response = requests.post(url=url,  json=json.loads(request_data), headers=headers, timeout=10)  # 使用占位URL
-                        self.call_agent.logger.info("to_request come in , try_count=%s", try_count)
-                        if response.status_code == 200:
-                            response_data = response.json()
-                            if "data" in response_data and response_data["code"]==0:
-                                data = response_data["data"]
-                                message = ChatMessage.from_json(data)
-                                self.call_agent.message_queue.put(message)
-                                break
-                            else:
-                                self.call_agent.logger.info("响应中没有 'data' 字段")
-                        else:
-                            self.call_agent.logger.info(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
-                    except Exception as e:
-                        traceback.print_exc()
-                        self.call_agent.logger.error(f"TaskId={request.taskId}, 请求发生异常: {e}, URL: {url}")
-                    finally:
-                        try_count = try_count - 1
-                        latency = (time.time() - once_start)
-                        registry.BOT_REQUEST_ONCE_LATENCY.labels(request.taskId).observe(latency)
-
-                if not message:
-                    message = self.get_default_response()
-                    self.call_agent.message_queue.put(message)
-                # finally:
-                #     session.close()
-        finally:
-            latency = (time.time() - start_time)
-            registry.BOT_REQUEST_COUNT.inc()
-            registry.BOT_REQUEST_LATENCY.labels(request.taskId).observe(latency)
-            self.call_agent.logger.info(f"sessionId={ self.call_agent.session_id}, timeCost={latency}, chat::request:{request_data}, response:{response.text if response else None}")
+            self.call_agent.logger.info(f"to_request sessionId={self.call_agent.session_id}, timeCost={latency}, request:{request_data}, response:{response_data if response_data else None}")
+
+
+    # def to_quest(self, request: BotChatRequest, try_count = 3):
+    #     start_time = time.time()
+    #     request_data = request.to_json_string()
+    #     response = None
+    #     try:
+    #         url = f"http://{SERVE_HOST}:40072/botservice"
+    #         # payload = request.to_json_string()
+    #         # self.call_agent.logger.info(f"请求数据:{request_data},url:{url}")
+    #         with requests.Session() as session:
+    #             message = None
+    #             # try:
+    #             session.headers.update({'Content-Type': 'application/json'})
+    #             while try_count > 0:
+    #                 once_start = time.time()
+    #                 try:
+    #                     response = session.post(url=url, json=request_data, timeout=3)
+    #                     # response = requests.post(url=url,  json=json.loads(request_data), headers=headers, timeout=10)  # 使用占位URL
+    #                     # self.call_agent.logger.info("to_request come in , try_count=%s", try_count)
+    #                     if response.status_code == 200:
+    #                         response_data = response.json()
+    #                         if "data" in response_data and response_data["code"]==0:
+    #                             data = response_data["data"]
+    #                             message = ChatMessage.from_json(data)
+    #                             self.call_agent.message_queue.put(message)
+    #                             break
+    #                         else:
+    #                             self.call_agent.logger.info(f"to_request::sessionId:{request.sessionId}, 响应中没有 'data' 字段")
+    #                     else:
+    #                         self.call_agent.logger.info(f"to_request::请求失败,sessionId:{request.sessionId}, 状态码: {response.status_code}, 响应内容: {response.text}")
+    #                 except Exception as e:
+    #                     traceback.print_exc()
+    #                     self.call_agent.logger.error(f"to_request::TaskId={request.taskId}, sessionId={request.sessionId}, 请求发生异常: {e}, URL: {url}")
+    #                 finally:
+    #                     try_count = try_count - 1
+    #                     latency = (time.time() - once_start)
+    #                     registry.BOT_REQUEST_ONCE_LATENCY.labels(request.taskId).observe(latency)
+    #
+    #             self.call_agent.logger.info(f"to_request::sessionId:{request.sessionId}, message:{message.to_json_string() if message else None}")
+    #             if not message:
+    #                 message = self.get_default_response()
+    #                 self.call_agent.message_queue.put(message)
+    #             # finally:
+    #             #     session.close()
+    #     finally:
+    #         latency = (time.time() - start_time)
+    #         registry.BOT_REQUEST_COUNT.inc()
+    #         registry.BOT_REQUEST_LATENCY.labels(request.taskId).observe(latency)
+    #         self.call_agent.logger.info(f"to_request::sessionId={ self.call_agent.session_id}, timeCost={latency}, request:{request_data}, response:{response.text if response else None}")
 
     def get_default_response(self):
         response=   {
@@ -598,6 +608,7 @@ class BotAgent:
             media_cfg.jbMinPre = 4  # Minimum pre-fetch frames
             media_cfg.jbMaxPre = 16  # Maximum pre-fetch frames
             media_cfg.noVad = True  # Disable Voice Activity Detection if needed
+            media_cfg.clockRate = 8000
             self.ep.medConfig = media_cfg  # Apply media config to endpoint
 
             # Create SIP transport. Error handling sample is shown
@@ -623,6 +634,9 @@ class BotAgent:
                 acfg.natConfig.turnServer = f"stun:{self.host}:3478"
                 # acfg.natConfig.turnUsername = "username"
                 # acfg.natConfig.turnPassword = "password"
+                acfg.natConfig.udpKaIntervalSec = 30
+                acfg.natConfig.contactRewriteUse = 2
+                acfg.natConfig.sdpNatRewriteUse = 2
 
                 # Create the account
                 acc = Account(self, user_part=user_part)

+ 4 - 2
src/core/voip/constant.py

@@ -19,10 +19,12 @@ def build_audio_format():
     fmt = pj.MediaFormatAudio()
     fmt.type = pj.PJMEDIA_TYPE_AUDIO
     fmt.id = pj.PJMEDIA_FORMAT_PCM
-    fmt.clockRate = 16000  # 采样率
     fmt.channelCount = 1  # 通道数
-    fmt.frameTimeUsec = 20000  # 每帧的时间(20 毫秒)
     fmt.bitsPerSample = 16  # 每个采样的位数
+    fmt.clockRate = 8000  # 采样率
+    fmt.frameTimeUsec = 12500  # 每帧的时间(20 毫秒)
+    # fmt.clockRate = 16000  # 采样率
+    # fmt.frameTimeUsec = 20000  # 每帧的时间(20 毫秒)
     return fmt