Explorar el Código

Merge branch 'develop' into jms_20250106_prod

Davidliu hace 2 meses
padre
commit
dedc55cd1d

BIN
src.tar.gz


+ 1 - 1
src/core/callcenter/acd.py

@@ -104,6 +104,6 @@ class AcdService:
         for call_id in tmp_arr:
             call_info_queue.put_nowait(call_id)
 
-    def wait_timeout(self, call_id, timeouts=30):
+    def wait_timeout(self, call_id, timeouts=120):
         delay_action = DelayAction(call_id=call_id)
         self.cache.add_delay_message(DelayActionEnum.ACD_TIMEOUT_PLAY.name, delay_action, timeouts)

+ 22 - 7
src/core/callcenter/agent.py

@@ -17,7 +17,7 @@ from src.core import with_app_context
 from src.core.callcenter.api import AgentActionRequest, AgentQueryRequest, AgentRequest, AgentEventData, \
     AgentStateData, HumanServiceQueryRequest, AgentMonitorData, CallInfo, DeviceInfo, AgentDelayStateData
 from src.core.callcenter.cache import Cache
-from src.core.callcenter.constant import CENTER_AGENT_HEARTBEAT, SAAS_ID
+from src.core.callcenter.constant import CENTER_AGENT_HEARTBEAT, SAAS_ID, CENTER_AGENT_LIVE_CNT
 from src.core.callcenter.dao import *
 from src.core.callcenter.data_handler import DataHandleServer
 from src.core.callcenter.enumeration import AgentState, AgentCheck, AgentHeartState, AgentServiceState, AgentLogState, \
@@ -180,6 +180,7 @@ class AgentEventService:
         saas_id = call_info.saas_id if call_info else None
         flow_id = call_info.cti_flow_id if call_info else None
         call_id = call_info.call_id if call_info else None
+        device_id = device_info.device_id if device_info else None
         agent_num = device_info.agent_key if device_info else None
         is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False
         caller = (device_info.called if is_agent else device_info.caller) if device_info else None
@@ -188,7 +189,7 @@ class AgentEventService:
 
         start_time = time.time()
         try:
-            self.logger.info('bot_event_channel, event_name=%s, call_id=%s, is_agent=%s, agent_num=%s', event_name, call_id, is_agent, agent_num)
+            self.logger.info('bot_event_channel, event_name=%s, call_id=%s, device_id=%s, is_agent=%s, agent_num=%s', event_name, call_id, device_id, is_agent, agent_num)
             agent = self.data_handle_server.get_agent(saas_id, agent_num)
             if not agent:
                 # self.logger.warn("bot event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called,
@@ -268,12 +269,11 @@ class AgentOperService:
         self.agent_actionlog_service = AgentActionLogService(app)
         self.agent_state_service = AgentStateService(app)
 
-        # self.daemon_stopping = False
         self.agent_heartbeat_expire = 30
+        self.agent_serial_live_expire = 60*10
         self.agent_heartbeat_job_scheduler = BackgroundScheduler()
         self.agent_heartbeat_job_scheduler.add_job(self.agent_heartbeat_daemon, 'interval', seconds=1, max_instances=1, name='agent_heartbeat_daemon')
         self.agent_heartbeat_job_scheduler.start()
-        # threading.Thread(target=self.agent_heartbeat_daemon).start()
 
     def agent_heartbeat_daemon(self):
         def check_out_daemon(_name, key, value):
@@ -286,20 +286,35 @@ class AgentOperService:
             except:
                 traceback.print_exc()
 
-        # while not self.daemon_stopping:
+        def check_agent_live_daemon(_members):
+            key = CENTER_AGENT_LIVE_CNT % SAAS_ID
+            pre_val = self.redis_handler.redis.get(key)
+            if not pre_val:
+                if not _members or len(_members) == 0:
+                    value = datetime.now().timestamp()
+                    self.redis_handler.redis.set(key, value, ex=60*60, nx=True)
+            else:
+                diff = datetime.now().timestamp() - float(pre_val)
+                # self.logger.info('check_agent_live_daemon, members=%s, diff=%s, pre_val=%s', (len(_members) if _members else 0), diff, pre_val)
+                if diff > self.agent_serial_live_expire:
+                    self.logger.warn('WARING::live agent count less than 1 serial ten minutes')
+                    self.redis_handler.redis.delete(key)
+
+                if _members and len(_members) > 0:
+                    self.redis_handler.redis.delete(key)
+
         name = CENTER_AGENT_HEARTBEAT % SAAS_ID
         members = self.redis_handler.redis.hgetall(name)
+        check_agent_live_daemon(members)
         if not members:
             return
 
         registry.MANUAL_AGENT_LIVES.set(len(members))
         for k,v in members.items():
             check_out_daemon(name, k, v)
-        # time.sleep(1)
 
     def __del__(self):
         self.agent_heartbeat_job_scheduler.shutdown()
-        # self.daemon_stopping = True
 
     @with_app_context
     def enable(self, req: AgentActionRequest):

+ 32 - 0
src/core/callcenter/cache.py

@@ -4,6 +4,7 @@
 import json
 import sys
 import time
+import traceback
 import uuid
 from datetime import datetime
 
@@ -238,3 +239,34 @@ class Cache:
         self.logger.info("set_pjsua_thread_lock")
         return self.redis_handler.redis.set(key, "1", ex=expire, nx=True)
 
+    def get_serial_no_answer_cnt(self, call_info:CallInfo):
+        try:
+            if call_info.hangup_count >= call_info.answer_count:
+                key = CALL_SERIAL_NO_ANSWER % SAAS_ID
+                self.logger.info('get_serial_no_answer_cnt:call_id=%s, call_time_length=%s', call_info.call_id, call_info.answer_count)
+                if call_info.answer_count <= 0:
+                    self.redis_handler.redis.sadd(key, call_info.call_id)
+                    return self.redis_handler.redis.scard(key)
+                else:
+                    self.redis_handler.redis.delete(key)
+        except Exception as e:
+            traceback.print_exc()
+            self.logger.info('get_serial_no_answer_cnt:exception, msg:%s',e )
+        return 0
+
+    def get_serial_speed_hangup_cnt(self, call_info:CallInfo):
+        try:
+            if call_info.hangup_count >= call_info.answer_count:
+                now = datetime.now().timestamp()
+                key = CALL_SERIAL_SPEED_HANGUP % SAAS_ID
+                call_len = now - call_info.call_time
+                self.logger.info('get_serial_speed_hangup_cnt:call_id=%s, call_time_length=%s', call_info.call_id, call_len)
+                if call_len <= CALL_SERIAL_SPEED_HANGUP_OFFSET:
+                    self.redis_handler.redis.sadd(key, call_info.call_id)
+                    return self.redis_handler.redis.scard(key)
+                else:
+                    self.redis_handler.redis.delete(key)
+        except Exception as e:
+            traceback.print_exc()
+            self.logger.info('get_serial_speed_hangup_cnt:exception, msg:%s',e)
+        return 0

+ 1 - 1
src/core/callcenter/callback.py

@@ -58,7 +58,7 @@ class Callback(object):
             call_id = self.cache.get_call_id_by_device_id(device_id)
         call_info = self.cache.get_call_info(call_id)
         if not call_info:
-            # self.logger.info("callback:return::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
+            self.logger.info("callback:return::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
             return
         device_info = call_info.device_info_map.get(device_id) if call_info and call_info.device_info_map else None
         # self.logger.info("callback::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)

+ 4 - 0
src/core/callcenter/constant.py

@@ -82,6 +82,7 @@ CALL_INFO = "CALL_INFO:"
 START_AGENT_NUM = "1000"
 
 DELAY_ACTION_BATCH_SIZE = 10
+CALL_SERIAL_SPEED_HANGUP_OFFSET = 5
 CENTER_AGENT_HEARTBEAT = "CENTER:AGENT:HEARTBEAT:%s"
 CTI_ENGINE_DELAY_ACTION = "DELAY:ACTION:%s"
 CTI_ENGINE_DELAY_ACTION_LOCK = "DELAY:ACTION:LOCK:%s"
@@ -91,6 +92,9 @@ CTI_MANAGE_CENTER_CALL_END_KEY = "CTI:MANAGE:CENTER:CALL:END:KEY:%s"
 CTI_AGENT_MANUAL_ANSWER = "AGENT:MANUAL:ANSWER:%s:%s"
 BOT_REGISTER_PER_HOURS = "BOT:REGISTER:PER_HOURS:%s"
 BOT_PJSUA_THREAD_LOCK = "BOT:PJSUA:THREAD:LOCK:%s"
+CALL_SERIAL_NO_ANSWER = "CALL:SERIAL:NO_ANSWER:%s"
+CALL_SERIAL_SPEED_HANGUP = "CALL:SERIAL:SPEED:HANGUP:%s"
+CENTER_AGENT_LIVE_CNT = "CENTER:AGENT:LIVE:CNT:%s"
 
 def get_json_dict(json_text=None):
     if isinstance(json_text, str):

+ 1 - 1
src/core/callcenter/esl/client.py

@@ -601,7 +601,7 @@ class OutboundClient:
 
             return None
 
-        def answer(self, con, call_id, device_id, timeouts=30):
+        def answer(self, con, call_id, device_id, timeouts=120):
             con.execute("answer", "", device_id)
             # con.execute("bgapi", f"uuid_setvar {device_id} {sipHeaderCallId} {call_id}", device_id)
             con.execute("playback", HOLD_MUSIC_PATH, device_id)

+ 47 - 28
src/core/callcenter/esl/handler/channel_hangup_handler.py

@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
 # encoding:utf-8
-
+import subprocess
 import json
 import sys
 import os
@@ -96,7 +96,7 @@ class ChannelHangupHandler(EslEventHandler):
             bucket_type = call_info.bucket_type if call_info.bucket_type else "EMPTY"
             registry.CALL_HANGUP_REQUESTS.labels(f"{bucket_type}", f"{device_info.sip_status}").inc()
 
-            self.logger.info('ChannelHangupHandler, device_id=%s, hangup_reason=%s, device_type=%s, cdr_type=%s, end_time=%s, skip_hangup_all=%s, answer_count=%s, hangup_count=%s' % (device_id, hangup_reason, device_info.device_type, device_info.cdr_type, call_info.end_time, skip_hangup_all, call_info.answer_count, call_info.hangup_count))
+            self.logger.info('ChannelHangupHandler, call_id=%s, device_id=%s, hangup_reason=%s, device_type=%s, cdr_type=%s, end_time=%s, skip_hangup_all=%s, answer_count=%s, hangup_count=%s' % (call_id, device_id, hangup_reason, device_info.device_type, device_info.cdr_type, call_info.end_time, skip_hangup_all, call_info.answer_count, call_info.hangup_count))
             # 如果是转人工
             # if 'transferToAgent' == hangup_reason and DeviceType.ROBOT.code == device_info.device_type:
             #     call_info.answer_flag = AnswerFlag.TRANSFER_TO_AGENT.code
@@ -129,10 +129,12 @@ class ChannelHangupHandler(EslEventHandler):
 
             # 全部挂机以后推送挂机状态
             # self.logger.info('yushanghui::call_info.device_list %s', call_info.device_list)
-            if call_info.hangup_count == call_info.answer_count:
+            if call_info.hangup_count >= call_info.answer_count:
             # if len(call_info.device_list) == 0:
                 self.get_call_info_record(call_info)
 
+            # 连续报警判断
+            self.hook_serial_warn(call_info)
             # 判断挂机方向 && 更新缓存
             self.hangup_dir(call_info, device_info, cause)
             self.cache.add_call_info(call_info, persistent=True)
@@ -140,6 +142,15 @@ class ChannelHangupHandler(EslEventHandler):
         except:
             traceback.print_exc()
 
+    def hook_serial_warn(self, call_info:CallInfo):
+        no_answer_cnt = self.cache.get_serial_no_answer_cnt(call_info)
+        if no_answer_cnt >=5:
+            self.logger.warn('WARING::serial_no_answer_cnt greater than 5')
+
+        no_speed_hangup_cnt = self.cache.get_serial_speed_hangup_cnt(call_info)
+        if no_speed_hangup_cnt >=3:
+            self.logger.warn('WARING::get_serial_speed_hangup_cnt greater than 3')
+
     def get_call_id(self, event):
         call_id = EslEventUtil.getCallId(event)
         device_id = EslEventUtil.getDeviceId(event)
@@ -155,14 +166,19 @@ class ChannelHangupHandler(EslEventHandler):
         sip_status = []
         hangup_cause = []
         agent_name = ''
-        for value in call_info.device_info_map.values():
-            records.append(value.record) if value.record else None
-            sip_status.append(value.sip_status if value.sip_status else 'EMPTY')
-            hangup_cause.append(value.hangup_cause if value.hangup_cause else 'EMPTY')
-            if value.device_type == DeviceType.AGENT.code :
-                agent_name = value.agent_key
-        self.logger.info("get_call_info_record: %s,agent_name:%s, sip_status:%s, hangup_cause:%s", records, agent_name, sip_status, hangup_cause)
-        threading.Thread(target=self._update_record_in_thread, args=(call_info.call_id, list(dict.fromkeys(records)), ",".join(sip_status), ",".join(hangup_cause), agent_name)).start()
+        try:
+            self.logger.info("get_call_info_record: %s", call_info)
+            for value in call_info.device_info_map.values():
+                records.append(value.record) if value.record else None
+                sip_status.append(value.sip_status if value.sip_status else 'EMPTY')
+                hangup_cause.append(value.hangup_cause if value.hangup_cause else 'EMPTY')
+                if value.device_type == DeviceType.AGENT.code :
+                    agent_name = value.agent_key
+            self.logger.info("get_call_info_record: %s,agent_name:%s, sip_status:%s, hangup_cause:%s", records, agent_name, sip_status, hangup_cause)
+            threading.Thread(target=self._update_record_in_thread, args=(call_info.call_id, list(dict.fromkeys(records)), ",".join(sip_status), ",".join(hangup_cause), agent_name)).start()
+        except Exception as e:
+            self.logger.info("get_call_info_record:exception %s", e)
+            traceback.print_exc()
 
     def update_name(self,call_id, agent_name):
         try:
@@ -180,26 +196,29 @@ class ChannelHangupHandler(EslEventHandler):
                 self.dataHandleServer.update_record(call_id, time_end=datetime.now(), sip_status=sip_status, sip_hangup_cause=hangup_cause,user_id=agent.user_id if agent else None, user_name=agent.agent_name if agent else None)
                 return
             merge_record = self.merge_audio_files(records) if len(records) > 1 else records[0]
-            # try:
-            #     self._ensure_path_permissions(merge_record)
-            #     os.chmod(merge_record, 0o755)  # 设置文件权限为 755
-            #     self.logger.info("成功设置文件权限: %s -> 755", merge_record)
-            # except Exception as chmod_error:
-            #     self.logger.error("设置文件权限失败: %s, error: %s", merge_record, str(chmod_error))
-
-            self.dataHandleServer.update_record(call_id, time_end=datetime.now(), url=merge_record, sip_status=sip_status, sip_hangup_cause=hangup_cause,user_id=agent.user_id if agent else None, user_name=agent.agent_name if agent else None)
-            self.logger.info("更新录音记录完成: call_id=%s", call_id)
+            # 计算录音时长
+            duration = self.get_audio_duration(merge_record)
+            self.dataHandleServer.update_record(call_id, times=int(duration), time_end=datetime.now(), url=merge_record, sip_status=sip_status, sip_hangup_cause=hangup_cause,user_id=agent.user_id if agent else None, user_name=agent.agent_name if agent else None)
+            self.logger.info("更新录音记录完成: call_id=%s, duration=%s", call_id, int(duration))
         except Exception as e:
             self.logger.error("更新录音记录失败: call_id=%s, error=%s", call_id, str(e))
 
-    def _ensure_path_permissions(self, file_path):
-        """确保文件及其父级目录的权限为 755"""
-        current_path = os.path.abspath(file_path)
-        self.logger.info("_ensure_path_permissions::%s", current_path)
-        # while current_path != "/":  # 遍历到根目录为止
-        #     if os.path.exists(current_path):
-        #         os.chmod(current_path, 0o755)  # 设置当前路径权限为 755
-        #     current_path = os.path.dirname(current_path)  # 获取父目录路径
+    def get_audio_duration(self, audio_path):
+        """使用 ffmpeg 计算音频时长"""
+        try:
+            result = subprocess.run(
+                ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "json", audio_path],
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                text=True
+            )
+            info = json.loads(result.stdout)
+            duration = float(info["format"]["duration"])
+            return duration
+        except Exception as e:
+            print(f"获取音频时长失败: {e}")
+            return None
+
     def merge_audio_files(self,audio_files):
         if not audio_files:
             self.logger.info("没有可合并的音频文件")

+ 17 - 11
src/core/voip/asr.py

@@ -114,17 +114,23 @@ class TestSt:
                 on_close=self.test_on_close,
                 callback_args=[self.__id]
             )
-            self.sr.start(
-                aformat="pcm",
-                sample_rate=8000,
-                enable_intermediate_result=True,
-                enable_punctuation_prediction=True,
-                enable_inverse_text_normalization=True
-            )
-            self.sr.ctrl(ex={'max_sentence_silence': '1200ms', 'disfluency': True,'enable_words': True })
-            self.logger.debug(f"[{self.__id}]ASR session started. {count}")
-            self.__event.wait(timeout=.5)
-            self.logger.debug(f"[{self.__id}]ASR session started. {count}")
+            try:
+                self.sr.start(
+                    aformat="pcm",
+                    sample_rate=8000,
+                    enable_intermediate_result=True,
+                    enable_punctuation_prediction=True,
+                    enable_inverse_text_normalization=True,
+                    ex={'max_sentence_silence': 2000, 'disfluency': True, 'enable_words': True}
+                )
+                # _res = self.sr.ctrl(ex={'max_sentence_silence': 6000, 'disfluency': True,'enable_words': True })
+                self.logger.debug(f"[{self.__id}]ASR session started. {count}")
+                self.__event.wait(timeout=.5)
+                self.logger.debug(f"[{self.__id}]ASR session started. {count}")
+            except Exception as e:
+                traceback.print_exc()
+                self.logger.debug(f"[{self.__id}]ASR session start exception. {e}")
+
             count = count + 1
 
     def test_on_sentence_begin(self, message, *args):

+ 3 - 0
src/core/voip/bot.py

@@ -99,6 +99,9 @@ class MyAudioMediaPort(pj.AudioMediaPort):
                     # print("测试超长", user_asr_text)
                 elif asr_text:
                     self.user_asr_texts.append(asr_text)
+                    user_asr_text = self.user_asr_texts[0] if len(self.user_asr_texts) == 1 else '###'.join(self.user_asr_texts)
+                    self.user_asr_texts.clear()
+                    self.call.chat(user_asr_text)
                 if time_difference > int(self.call.wait_time):
                     self.call.reset_wait_time()
             else: