Browse Source

merge develop, acd upgrade and warning record

Davidliu 2 months ago
parent
commit
a4a4e9981d

BIN
src.tar.gz


+ 4 - 12
src/core/callcenter/acd.py

@@ -45,15 +45,11 @@ class AcdService:
         agent_number = self.agent_service.assign(AgentActionRequest(saas_id=SAAS_ID, service_id=service_id))
         if not agent_number:
             # 如果没有空闲坐席,播放等待音
-            text = "AcdService transferToAgent agentNumber is empty serviceId:%s,caller:%s,called:%s,callId:%s"%(service_id, call_info.caller, call_info.called, call_info.call_id)
-            # print(text, flush=True)
-            self.logger.info(text)
+            self.logger.info("AcdService transferToAgent agentNumber is empty serviceId:%s,caller:%s,called:%s,callId:%s"%(service_id, call_info.caller, call_info.called, call_info.call_id))
             self.add_acd_queue(call_info, service_id)
         else:
             # 有空闲坐席,直接转接
-            text = "AcdService transferToAgent agentNumber not empty %s, serviceId:%s,caller:%s,called:%s,callId:%s"%(agent_number, service_id, call_info.caller,call_info.called, call_info.call_id)
-            # print(text, flush=True)
-            self.logger.info(text)
+            self.logger.info("AcdService transferToAgent agentNumber is %s, serviceId:%s,caller:%s,called:%s,callId:%s"%(agent_number, service_id, call_info.caller,call_info.called, call_info.call_id))
             self.call_service.transfer(call_info, agent_number, service_id)
             # self.agent_state_service.busy(call_info.saas_id, agent_number, agent_number)
         self.cache.add_call_info(call_info)
@@ -90,15 +86,11 @@ class AcdService:
                 continue
             agent_number = self.agent_service.assign(AgentActionRequest(saas_id=SAAS_ID, service_id=task_service_id))
             if not agent_number:
-                text = "AcdService tryTransferAgent agentNumber is Empty %s %s"% (call_id, json.dumps(call_info.device_list))
-                # print(text, flush=True)
-                self.logger.info(text)
+                self.logger.info("AcdService tryTransferAgent agentNumber is Empty %s %s"% (call_id, json.dumps(call_info.device_list)))
                 tmp_arr.append(call_id)
                 continue
 
-            text = "AcdService tryTransferAgent agentNumber not Empty %s, serviceId:%s, called:%s, callId:%s"%(agent_number, task_service_id, call_info.called, call_id)
-            # print(text, flush=True)
-            self.logger.info(text)
+            self.logger.info("AcdService tryTransferAgent agentNumber not Empty %s, serviceId:%s, called:%s, callId:%s"%(agent_number, task_service_id, call_info.called, call_id))
             self.call_service.transfer(call_info, agent_number, task_service_id)
 
         for call_id in tmp_arr:

+ 61 - 37
src/core/callcenter/agent.py

@@ -298,6 +298,7 @@ class AgentOperService:
                 # self.logger.info('check_agent_live_daemon, members=%s, diff=%s, pre_val=%s', (len(_members) if _members else 0), diff, pre_val)
                 if diff > self.agent_serial_live_expire:
                     self.logger.warn('WARING::live agent count less than 1 serial ten minutes')
+                    self.data_handle_server.create_warning_record(1, '10分钟空岗报警')
                     self.redis_handler.redis.delete(key)
 
                 if _members and len(_members) > 0:
@@ -819,9 +820,9 @@ class AgentStateService:
         self.app = app
         self.logger = app.logger
         self.redis_handler = RedisHandler()
-        self.assigned_recycle_millisecond = 3 * 1000
-        self.state_service_id_data_map = defaultdict(dict)
-        self.executor = ThreadPoolExecutor(max_workers=10)
+        self.assigned_recycle_millisecond = 30 * 1000
+        # self.state_service_id_data_map = defaultdict(dict)
+        # self.executor = ThreadPoolExecutor(max_workers=10)
         self.data_handle_server = DataHandleServer(app)
         self.agent_monitor_service = AgentMonitorService(app)
         self.agent_actionlog_service = AgentActionLogService(app)
@@ -889,10 +890,29 @@ class AgentStateService:
                 return choose_phone_num
             choose_phone_num = self._choose_max_idle_time(idle_agents)
             self.handle_assign_time(saas_id, service_id, choose_phone_num)
+            self.handle_lock_agent(choose_phone_num, saas_id, service_id)
         finally:
             lock.release()
         return choose_phone_num
 
+    def handle_check_agent_lock(self, choose_phone_num, saas_id, service_id='00000000000000000'):
+        key = self._lock_key(saas_id, service_id, choose_phone_num)
+        res = self.redis_handler.redis.get(key)
+        self.logger.info('checkAgent %s %s %s %s'% (saas_id, service_id, choose_phone_num, res))
+        return False if res else True
+
+    def handle_lock_agent(self, choose_phone_num, saas_id, service_id='00000000000000000'):
+        key = self._lock_key(saas_id, service_id, choose_phone_num)
+        expire = self._get_expire_time()
+        self.redis_handler.redis.set(key, 1, nx=True, ex=expire)
+        res = self.redis_handler.redis.get(key)
+        self.logger.info('lockAgent %s %s %s %s %s'% (saas_id, service_id, choose_phone_num, expire, res))
+
+    def handle_release_agent_lock(self, choose_phone_num, saas_id, service_id='00000000000000000'):
+        key = self._lock_key(saas_id, service_id, choose_phone_num)
+        self.redis_handler.redis.delete(key)
+        self.logger.info('releaseAgent %s %s %s'% (saas_id, service_id, choose_phone_num))
+
     def handle_assign_time(self, saas_id, service_id, choose_phone_num):
         key = self._key(saas_id, service_id)
         cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
@@ -963,13 +983,14 @@ class AgentStateService:
         return free_agents
 
     def get_idle_agents(self,cache_agent_list):
-        current_time =int(datetime.now().timestamp() * 1000)  # 获取当前时间的毫秒级时间戳
+        # current_time =int(datetime.now().timestamp() * 1000)  # 获取当前时间的毫秒级时间戳
         idle_agents = [
             agent for agent in cache_agent_list
-            if agent.status == 1 and (
-                    agent.assign_time == 0 or
-                    agent.assign_time + self.assigned_recycle_millisecond < current_time
-            )
+            if agent.status == 1 and self.handle_check_agent_lock(agent.phone_num, SAAS_ID)
+               #and (
+               #     agent.assign_time == 0 or
+               #     agent.assign_time + self.assigned_recycle_millisecond < current_time
+            #)
         ]
         return idle_agents
 
@@ -990,34 +1011,34 @@ class AgentStateService:
             busy_agents_size = len(busy_agents)  # 获取忙碌代理的数量
         return busy_agents_size
 
-    def update_report_state(self, saas_id, service_id):
-        key = self._key(saas_id, service_id)
-        # data_map 这个地方有疑问
-        data_map = self.state_service_id_data_map[key]
-        idle = HumanState.IDLE
-        if idle.value not in data_map:
-            data_map[idle.code] = threading.Lock()
-            self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, idle)
-            # data_map[idle.code] = self.do_report_real_time_human_service_id(saas_id, service_id, idle)
-        busy = HumanState.BUSY
-        if busy.value not in data_map:
-            data_map[busy.code] = threading.Lock()
-            self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, busy)
-            # data_map[busy.code] = self.do_report_real_time_human_service_id(saas_id, service_id, busy)
-
-    def do_report_real_time_human_service_id(self, saas_id, service_id, human_state):
-        name = "cti_center_real_time_human_service_state"
-        tag_list = {
-            "vcc_id": saas_id,
-            "service_id": service_id,
-            "state": human_state.code,
-        }
-        if human_state == HumanState.IDLE:
-            # meter_registry 这块疑问
-            self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id))
-        elif human_state == HumanState.BUSY:
-            self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id))
-        return 0
+    # def update_report_state(self, saas_id, service_id):
+    #     key = self._key(saas_id, service_id)
+    #     # data_map 这个地方有疑问
+    #     data_map = self.state_service_id_data_map[key]
+    #     idle = HumanState.IDLE
+    #     if idle.value not in data_map:
+    #         data_map[idle.code] = threading.Lock()
+    #         self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, idle)
+    #         # data_map[idle.code] = self.do_report_real_time_human_service_id(saas_id, service_id, idle)
+    #     busy = HumanState.BUSY
+    #     if busy.value not in data_map:
+    #         data_map[busy.code] = threading.Lock()
+    #         self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, busy)
+    #         # data_map[busy.code] = self.do_report_real_time_human_service_id(saas_id, service_id, busy)
+    #
+    # def do_report_real_time_human_service_id(self, saas_id, service_id, human_state):
+    #     name = "cti_center_real_time_human_service_state"
+    #     tag_list = {
+    #         "vcc_id": saas_id,
+    #         "service_id": service_id,
+    #         "state": human_state.code,
+    #     }
+    #     if human_state == HumanState.IDLE:
+    #         # meter_registry 这块疑问
+    #         self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id))
+    #     elif human_state == HumanState.BUSY:
+    #         self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id))
+    #     return 0
 
     def _check_in_key(self, saas_id):
         return "CTI:%s:HUMAN:AGENT"%(saas_id.upper())
@@ -1025,10 +1046,13 @@ class AgentStateService:
     def _key(self, saas_id, service_id):
         return "CTI:%s:HUMAN:%s"%(saas_id.upper(), service_id)
 
+    def _lock_key(self, saas_id, service_id, choose_phone_num):
+        return "CTI:%s:HUMAN:%s:%s:LOCK"%(saas_id.upper(), service_id, choose_phone_num)
+
     def _get_expire_time(self):
         now = datetime.now()
         end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=0)
-        expire_time = (end_of_day - now).total_seconds() * 1000  # Convert to milliseconds
+        expire_time = (end_of_day - now).total_seconds()  # Convert to milliseconds
         return int(expire_time)
 
     def _choose_max_idle_time(self, idle_agents: List[AgentStateData]) -> str:

+ 36 - 0
src/core/callcenter/dao.py

@@ -454,6 +454,8 @@ class CallRecord(db.Model):
     status = db.Column(db.SmallInteger, nullable=True, comment='通话状态(0未接听 1已接通)')
     sip_status = db.Column(db.String(32), nullable=True, comment='sip码')
     sip_hangup_cause = db.Column(db.String(32), nullable=True, comment='sip挂机描述')
+    hangup_dir = db.Column(db.SmallInteger, nullable=True, comment='挂机方向:1 主叫 2 被叫 3 机器人')
+    hangup_reason = db.Column(db.String(255), nullable=True, comment='挂机原因')
     phone = db.Column(db.String(20), nullable=True, comment='电话号码')
     bussiness_type = db.Column(db.String(50), nullable=True, comment='业务类型(创个返回字符串)')
     url = db.Column(db.String(255), nullable=True, comment='录音的地址')
@@ -528,4 +530,38 @@ class BotRecords(db.Model):
             'intent': self.intent,
             'contents': self.contents,
             'dialog': self.dialog,
+        }
+
+class WarningRecord(db.Model):
+    __tablename__ = 't_warning_record'
+    __table_args__ = {'comment': '告警记录表'}
+
+    id = db.Column(db.Integer, primary_key=True, autoincrement=True, comment='主键')
+    type = db.Column(db.SmallInteger, nullable=True, comment='类型(0:连续5个未接  1:10分钟空岗报警  2:连续3个5s内挂断)')
+    desc = db.Column(db.String(500), nullable=True, comment='报警描述')
+    read = db.Column(db.SmallInteger, nullable=False, default=0, comment='是否已读(0否 1是)')
+    del_flag = db.Column(db.SmallInteger, nullable=False, default=0, comment='删除标志(0代表存在 2代表删除)')
+    revision = db.Column(db.Integer, nullable=True, comment='乐观锁')
+    create_by = db.Column(db.String(100), nullable=True, comment='创建人')
+    create_time = db.Column(db.DateTime, nullable=True, comment='创建时间')
+    update_by = db.Column(db.String(100), nullable=True, comment='更新人')
+    update_time = db.Column(db.DateTime, nullable=True, comment='更新时间')
+    remark = db.Column(db.String(500), nullable=True, comment='备注')
+
+    def __repr__(self):
+        return json.dumps(self.to_dict())
+
+    def to_dict(self):
+        return {
+            'id': self.id,
+            'type': self.type,
+            'desc': self.desc,
+            'read': self.read,
+            'del_flag': self.del_flag,
+            'revision': self.revision,
+            'create_by': self.create_by,
+            'create_time': self.create_time.isoformat() if self.create_time else None,
+            'update_by': self.update_by,
+            'update_time': self.update_time.isoformat() if self.update_time else None,
+            'remark': self.remark,
         }

+ 12 - 1
src/core/callcenter/data_handler.py

@@ -35,6 +35,17 @@ class DataHandleServer:
             db.session.rollback()
             raise ValueError(f"创建记录失败: {e}")
 
+    @with_app_context
+    def create_warning_record(self, warning_type, warning_desc):
+        try:
+            warn_record = WarningRecord(type=warning_type, desc=warning_desc, create_time=datetime.now())
+            db.session.add(warn_record)
+            db.session.commit()
+            print("报警记录创建成功")
+        except Exception as e:
+            db.session.rollback()
+            raise ValueError(f"报警创建记录失败: {e}")
+
     @with_app_context
     def update_record(self, session_id, **kwargs):
         self.app.logger.info(f"update_record::session_id:{session_id}")
@@ -47,7 +58,7 @@ class DataHandleServer:
         user_name = kwargs.get('user_name')
         if time_end and call_record.type==1:
             bot_record = BotRecords.query.filter(BotRecords.session == session_id).first()
-            call_record.bussiness_type = bot_record.intent if bot_record else ''
+            call_record.bussiness_type = bot_record.intent if bot_record else '未知'
         # 如果记录是转人工并且有客服接通把客服更新到转接字段
         self.app.logger.debug(f"Received kwargs: {kwargs} user_id:{user_id},user_name:{user_name}, call_record:{call_record}")
         #如果记录已经有user_id不再更新 删除参数里面的user_id

+ 1 - 0
src/core/callcenter/enumeration.py

@@ -418,6 +418,7 @@ class HangupDir(Enum):
     HOST_HANGUP = (1, "主叫挂断")
     CUSTOMER_HANGUP = (2, "被叫挂断")
     PLATFORM_HANGUP = (3, "平台挂机")
+    ROBOT_HANGUP = (4, "机器人挂机")
 
     def __init__(self, code=None, description=None):
         self.code = code

+ 2 - 0
src/core/callcenter/esl/client.py

@@ -287,6 +287,8 @@ class InboundClient:
                 self.send_args(device_id, SET, arg, con=_con)
                 e = _con.sendMSG(msg)
                 # e = _con.api(UUID_KILL, device_id)
+                # 更新通话记录
+                self.dataHandleServer.update_record(call_id, hangup_dir=HangupDir.PLATFORM_HANGUP.code)
                 self.logger.info("hangup_call挂机 hangup call: %s, device: %s, arg: %s, res:%s" % (call_id, device_id, arg, e))
                 # self.logger.info("hangup_call挂机 hangup call: %s, device: %s, arg: %s, res:%s" % (call_id, device_id, arg, e.serialize('json')))
         except:

+ 29 - 12
src/core/callcenter/esl/handler/channel_hangup_handler.py

@@ -8,7 +8,9 @@ import traceback
 from pydub import AudioSegment
 import threading
 from src.core.callcenter.acd import AcdService
+from src.core.callcenter.agent import AgentStateService
 from src.core.callcenter.call import CallService
+from src.core.callcenter.constant import SAAS_ID
 from src.core.callcenter.enumeration import CallType, DeviceType, AnswerFlag, NextType, CdrType, HangupDir, \
     CallCause,AgentServiceState,AgentScene, WorkStatus
 from src.core.callcenter.esl.annotation import EslEventName
@@ -29,6 +31,8 @@ class ChannelHangupHandler(EslEventHandler):
         self.call_service = CallService(inbound_client,inbound_client.app)
         self.push_handler = PushHandler(inbound_client.logger)
         self.dataHandleServer=DataHandleServer(inbound_client.app)
+        self.agent_state_service = AgentStateService(inbound_client.app)
+
 
     def handle(self, address, event, coreUUID):
         # self.logger.info(json.loads(event.serialize('json')))
@@ -129,15 +133,18 @@ class ChannelHangupHandler(EslEventHandler):
 
             # 全部挂机以后推送挂机状态
             # self.logger.info('yushanghui::call_info.device_list %s', call_info.device_list)
+            # 判断挂机方向 && 更新缓存
+            self.hangup_dir(call_info, device_info, cause)
             if call_info.hangup_count >= call_info.answer_count:
-            # if len(call_info.device_list) == 0:
+                # if len(call_info.device_list) == 0:
                 self.get_call_info_record(call_info)
 
             # 连续报警判断
             self.hook_serial_warn(call_info)
-            # 判断挂机方向 && 更新缓存
-            self.hangup_dir(call_info, device_info, cause)
             self.cache.add_call_info(call_info, persistent=True)
+            if device_info.device_type != DeviceType.ROBOT.code:
+                # 释放坐席接听锁
+                self.agent_state_service.handle_release_agent_lock(call_info.agent_key, SAAS_ID)
 
         except:
             traceback.print_exc()
@@ -146,10 +153,12 @@ class ChannelHangupHandler(EslEventHandler):
         no_answer_cnt = self.cache.get_serial_no_answer_cnt(call_info)
         if no_answer_cnt >=5:
             self.logger.warn('WARING::serial_no_answer_cnt greater than 5')
+            self.dataHandleServer.create_warning_record(0, '连续5个未接')
 
         no_speed_hangup_cnt = self.cache.get_serial_speed_hangup_cnt(call_info)
         if no_speed_hangup_cnt >=3:
             self.logger.warn('WARING::get_serial_speed_hangup_cnt greater than 3')
+            self.dataHandleServer.create_warning_record(2, '连续3个5s内挂断')
 
     def get_call_id(self, event):
         call_id = EslEventUtil.getCallId(event)
@@ -175,7 +184,7 @@ class ChannelHangupHandler(EslEventHandler):
                 if value.device_type == DeviceType.AGENT.code :
                     agent_name = value.agent_key
             self.logger.info("get_call_info_record: %s,agent_name:%s, sip_status:%s, hangup_cause:%s", records, agent_name, sip_status, hangup_cause)
-            threading.Thread(target=self._update_record_in_thread, args=(call_info.call_id, list(dict.fromkeys(records)), ",".join(sip_status), ",".join(hangup_cause), agent_name)).start()
+            threading.Thread(target=self._update_record_in_thread, args=(call_info, list(dict.fromkeys(records)), ",".join(sip_status), ",".join(hangup_cause), agent_name)).start()
         except Exception as e:
             self.logger.info("get_call_info_record:exception %s", e)
             traceback.print_exc()
@@ -186,20 +195,22 @@ class ChannelHangupHandler(EslEventHandler):
             return agent
         except Exception as e:
             self.logger.error("update_name error: %s", str(e))
-    def _update_record_in_thread(self, call_id, records, sip_status, hangup_cause, agent_name):
+    def _update_record_in_thread(self, call_info, records, sip_status, hangup_cause, agent_name):
         """用于在独立线程中执行 update_record"""
         try:
+            call_id = call_info.call_id
+            hangup_dir = call_info.hangup_dir
             agent = self.update_name(call_id, agent_name)
             # status = 0 if answer_count <= 0 else 1
             if len(records) == 0:
-                self.logger.warning("没有录音文件,直接更新记录: call_id=%s, sip_status=%s, hangup_cause=%s, agent_name=%s", call_id, sip_status, hangup_cause,agent_name)
-                self.dataHandleServer.update_record(call_id, time_end=datetime.now(), sip_status=sip_status, sip_hangup_cause=hangup_cause,user_id=agent.user_id if agent else None, user_name=agent.agent_name if agent else None)
+                self.logger.warning("没有录音文件,直接更新记录: call_id=%s, sip_status=%s, hangup_cause=%s, agent_name=%s, hangup_dir=%s", call_id, sip_status, hangup_cause,agent_name, hangup_dir)
+                self.dataHandleServer.update_record(call_id, time_end=datetime.now(), sip_status=sip_status, sip_hangup_cause=hangup_cause, hangup_dir=hangup_dir, user_id=agent.user_id if agent else None, user_name=agent.agent_name if agent else None)
                 return
             merge_record = self.merge_audio_files(records) if len(records) > 1 else records[0]
             # 计算录音时长
-            duration = self.get_audio_duration(merge_record)
-            self.dataHandleServer.update_record(call_id, times=int(duration), time_end=datetime.now(), url=merge_record, sip_status=sip_status, sip_hangup_cause=hangup_cause,user_id=agent.user_id if agent else None, user_name=agent.agent_name if agent else None)
-            self.logger.info("更新录音记录完成: call_id=%s, duration=%s", call_id, int(duration))
+            duration = self.get_audio_duration(merge_record) or 0
+            self.dataHandleServer.update_record(call_id, times=int(duration), time_end=datetime.now(), url=merge_record, sip_status=sip_status, sip_hangup_cause=hangup_cause, hangup_dir=hangup_dir, user_id=agent.user_id if agent else None, user_name=agent.agent_name if agent else None)
+            self.logger.info("更新录音记录完成: call_id=%s, duration=%s, hangup_dir=%s", call_id, int(duration), hangup_dir)
         except Exception as e:
             self.logger.error("更新录音记录失败: call_id=%s, error=%s", call_id, str(e))
 
@@ -217,13 +228,17 @@ class ChannelHangupHandler(EslEventHandler):
             return duration
         except Exception as e:
             print(f"获取音频时长失败: {e}")
-            return None
+            return 0
 
     def merge_audio_files(self,audio_files):
         if not audio_files:
             self.logger.info("没有可合并的音频文件")
             return
-            # 初始化第一个音频文件
+        if not os.path.exists(audio_files[0]):
+            self.logger.info("没有可合并的音频文件")
+            return
+
+        # 初始化第一个音频文件
         combined = AudioSegment.from_file(audio_files[0])
         # 循环添加其余的音频文件
         for audio_file in audio_files[1:]:
@@ -269,6 +284,8 @@ class ChannelHangupHandler(EslEventHandler):
             call_info.hangup_dir = HangupDir.HOST_HANGUP.code
         elif DeviceType.CUSTOMER.code == device_info.device_type:
             call_info.hangup_dir = HangupDir.CUSTOMER_HANGUP.code
+        # elif DeviceType.ROBOT.code == device_info.device_type:
+        #     call_info.hangup_dir = HangupDir.ROBOT_HANGUP.code
 
         # if not call_info.end_time:
         #     call_info.end_time = device_info.end_time

+ 3 - 0
src/core/voip/bot.py

@@ -18,6 +18,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
 
 from src.core.callcenter import registry
 from src.core.callcenter.cache import Cache
+from src.core.callcenter.enumeration import HangupDir
 from src.core.datasource import SIP_SERVER, SERVE_HOST
 from src.core.voip.constant import *
 
@@ -379,6 +380,8 @@ class MyCall(pj.Call):
         if action_code == 'hang':  # 挂断
             self.agent.hangup(self.user_part)
             self.end_statistics()
+            # 更新通话记录
+            self.agent.dataHandleServer.update_record(self.session_id, hangup_dir=HangupDir.ROBOT_HANGUP.code)
         elif action_code == 'transfer':  # 转人工
             self.agent.transfer(user_part=self.user_part, call_id=self.session_id, device_id=self.device_id)
             self.end_statistics()