#!/usr/bin/env python3 # encoding:utf-8 import threading import time import traceback from collections import defaultdict from concurrent.futures import ThreadPoolExecutor from typing import List from apscheduler.schedulers.background import BackgroundScheduler from sqlalchemy import or_ from src.core.callcenter import registry import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil from src.core import with_app_context from src.core.callcenter.api import AgentActionRequest, AgentQueryRequest, AgentRequest, AgentEventData, \ AgentStateData, HumanServiceQueryRequest, AgentMonitorData, CallInfo, DeviceInfo, AgentDelayStateData from src.core.callcenter.cache import Cache from src.core.callcenter.constant import CENTER_AGENT_HEARTBEAT, SAAS_ID, CENTER_AGENT_LIVE_CNT from src.core.callcenter.dao import * from src.core.callcenter.data_handler import DataHandleServer from src.core.callcenter.enumeration import AgentState, AgentCheck, AgentHeartState, AgentServiceState, AgentLogState, \ AgentScene, BizErrorCode, WorkStatus, DownEvent, HumanState, DeviceType, ServiceDirect, HangupDir from src.core.callcenter.esl.constant.event_names import * from src.core.callcenter.exception import BizException from src.core.callcenter.push import PushHandler from src.core.datasource import RedisHandler class AgentEventService: def __init__(self, app): self.app = app self.logger = app.logger self.cache = Cache(app) self.push_handler = PushHandler(app.logger) self.data_handle_server = DataHandleServer(app) self.agent_monitor_service = AgentMonitorService(app) self.agent_state_service = AgentStateService(app) self.agent_actionlog_service = AgentActionLogService(app) def delay_state(self, state_data: AgentDelayStateData): agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num) if not agent: return agent_monitor = self.data_handle_server.get_agent_monitor(state_data.saas_id, state_data.agent_num) if not agent_monitor: return #TODO 非最新通话的延迟事件,忽略. agent_scene = AgentScene.get_by_code(state_data.scene) self.logger.info("agent event delay state %s %s %s %s", state_data.saas_id, state_data.agent_num, state_data.service_state, agent_monitor.service_state) if AgentServiceState.REPROCESSING.code == state_data.service_state and AgentServiceState.REPROCESSING.code == agent_monitor.service_state: self.reprocessing_idle(state_data) if AgentServiceState.DIALING.code == state_data.service_state and AgentServiceState.DIALING.code == agent_monitor.service_state: if self.cache.get_call_is_answer(state_data.saas_id, state_data.agent_num): return self.agent_monitor_service.update_idle(agent_monitor) self.push_handler.push_on_call_end(state_data.saas_id, state_data.flow_id, state_data.agent_num, agent_scene, ServiceDirect.MANUAL_CALL.service_direct, '0') self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_REPROCESSING) self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_IDLE) self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.BIZ_DIALING_IDLE) if AgentServiceState.HANGING.code == state_data.service_state \ and (AgentServiceState.DIALING.code == agent_monitor.service_state or AgentServiceState.CALLING.code == agent_monitor.service_state): self.agent_monitor_service.update_idle(agent_monitor) if AgentServiceState.DIALING.code == agent_monitor.service_state: self.push_handler.push_on_call_end(state_data.saas_id, state_data.flow_id, state_data.agent_num, agent_scene, ServiceDirect.MANUAL_CALL.service_direct, '0') self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_REPROCESSING) self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_IDLE) self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.MANUAL_HANG_UP) def agent_event_channel(self, event, call_info: CallInfo, device_info: DeviceInfo): event_name = EslEventUtil.getEventName(event) saas_id = call_info.saas_id if call_info else None flow_id = call_info.cti_flow_id if call_info else None call_id = call_info.call_id if call_info else None device_id = device_info.device_id if device_info else None agent_num = device_info.agent_key if device_info else None caller = device_info.caller if device_info else None called = device_info.called if device_info else None is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False start_time = time.time() try: self.logger.info('agent_event_channel, event_name=%s, call_id=%s, agent_num=%s, device_id=%s, is_agent=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, agent_num, device_id, is_agent, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count) agent = self.data_handle_server.get_agent(saas_id, agent_num) if not agent: # self.logger.warn("event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json'))) return agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num) if not agent_monitor: # self.logger.warn("event service channel agentMonitor is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json'))) return # 信道发起事件,触发完成发起(或桥)&& 坐席侧 if CHANNEL_ORIGINATE == event_name and is_agent: # if call_info.hangup_dir and call_info.hangup_dir == HangupDir.CUSTOMER_HANGUP.code: # self.logger.info('agent_event_channel:already:hangup, event_name=%s, call_id=%s, device_id=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, device_id, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count) # return self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_RINGING,phone=call_info.caller) # 进度事件,外呼时对方提醒。或者入呼时提醒 && 坐席侧 if CHANNEL_PROGRESS == event_name and is_agent: self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.DIALING) self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING) # 媒体进度事件,外呼时对方提醒。或者入呼时提醒 && 用户侧 if CHANNEL_PROGRESS_MEDIA == event_name and not is_agent: self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING_RINGING) #应答 if CHANNEL_ANSWER == event_name: if call_id: if self.cache.get_call_is_end(call_id): # self.logger.warn("event service channel call is end {} {} {} {} {} {}", saas_id, event_name, caller, called, call_id, json.dumps(event.serialize('json'))) return self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num) self.logger.info('agent接通:%s',agent ) self.data_handle_server.update_record(call_id, status=1, user_id=agent.user_id,user_name=agent.agent_name) if is_agent: # 坐席接起 self.cache.set_call_is_answer(saas_id, flow_id) self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.ANSWER_COMPENSATE) self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING) self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON) else: # 用户侧接起 self.agent_monitor_service.update_calling(agent_monitor) self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_ANSWER_OUTGOING) self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.CALLING) #挂断 if CHANNEL_HANGUP == event_name: # 坐席侧挂断 if is_agent: if call_id: self.cache.set_call_is_end(call_id) self.agent_monitor_service.update_processing(agent_monitor) self.logger.info('挂断更新:%s', agent) # self.data_handle_server.update_record(call_id, user_id=agent.user_id, user_name=agent.agent_name) self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, call_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.MANUAL)) self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, '0') self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_HANG_REPROCESSING) self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.REPROCESSING) self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING, AgentLogState.CHANNEL_HANG_UP) # self.data_handle_server.update_record(call_id, time_end=datetime.now()) # 同步处理后处理置闲 # reprocessingIdle(statusDto); # agentProducer.pushDelayedStatus(statusDto, reprocessingTimeout); if (CHANNEL_BRIDGE == event_name or PLAYBACK_START == event_name) and is_agent: self.push_handler.push_on_ring_start(saas_id, flow_id, agent_num, AgentScene.MANUAL, call_id) # self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_ANSWER_OUTGOING) # self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.CALLING) if DETECTED_TONE == event_name and not is_agent: self.push_handler.push_on_detected_tone(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id) if (CHANNEL_UNBRIDGE == event_name or PLAYBACK_STOP == event_name) and is_agent: self.push_handler.push_on_ring_end(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id) except: traceback.print_exc() finally: latency = (time.time() - start_time) registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency) def bot_event_channel(self, event, call_info, device_info): event_name = EslEventUtil.getEventName(event) saas_id = call_info.saas_id if call_info else None flow_id = call_info.cti_flow_id if call_info else None call_id = call_info.call_id if call_info else None device_id = device_info.device_id if device_info else None agent_num = device_info.agent_key if device_info else None is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False caller = (device_info.called if is_agent else device_info.caller) if device_info else None called = (device_info.caller if is_agent else device_info.called) if device_info else None human_service_id = '00000000000000000' start_time = time.time() try: self.logger.info('bot_event_channel, event_name=%s, call_id=%s, device_id=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, device_id, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count) agent = self.data_handle_server.get_agent(saas_id, agent_num) if not agent: self.logger.warn("bot event service channel agent is null %s %s %s %s %s %s", saas_id, event_name, call_id, caller, called, json.dumps(event.serialize('json'))) return agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num) if not agent_monitor: self.logger.warn("bot event service channel agentMonitor is null %s %s %s %s %s %s", saas_id, event_name, call_id, caller, called, json.dumps(event.serialize('json'))) return # 信道发起事件,触发完成发起(或桥)&& 坐席侧 if CHANNEL_ORIGINATE == event_name and is_agent: self.data_handle_server.update_record(call_id, transfer_user_id=agent.user_id,transfer_user_name=agent.agent_name) # 转接给客服以后更新转接人 # if call_info.hangup_dir and call_info.hangup_dir == HangupDir.CUSTOMER_HANGUP.code: # self.logger.info('bot_event_channel:already:hangup, event_name=%s, call_id=%s, device_id=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, device_id, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count) # return self.push_handler.push_on_call_ring(saas_id, flow_id, agent_num, AgentScene.ROBOT, call_id, ServiceDirect.ROBOT_CALL.service_direct, called, caller, human_service_id) self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_RINGING,phone=call_info.caller) if CHANNEL_ANSWER == event_name: self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num) # self.data_handle_server.update_record(call_id, status=1, transfer_user_id=agent.user_id,transfer_user_name=agent.agent_name) if is_agent: self.agent_monitor_service.update_calling(agent_monitor) self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! internal") self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.CALLING) self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING) self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON, service_id=human_service_id) else: self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! external") if CHANNEL_HANGUP == event_name and is_agent: self.agent_monitor_service.update_processing(agent_monitor) self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, call_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.ROBOT)) self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, "0") self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_HANG_REPROCESSING) self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.REPROCESSING) self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING, AgentLogState.CHANNEL_HANG_UP, service_id=human_service_id) # self.data_handle_server.update_record(call_id, time_end=datetime.now()) except: traceback.print_exc() finally: latency = (time.time() - start_time) registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency) def reprocessing_idle(self, state_data: AgentDelayStateData): agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num) if not agent: return agent_monitor = self.data_handle_server.get_agent_monitor(state_data.saas_id, state_data.agent_num) if not agent_monitor: return self.logger.info('reprocessing_idle %s %s %s', state_data.saas_id, state_data.agent_num, agent_monitor) self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num) self.logger.info('reprocessing_idle_end') self.agent_monitor_service.update_idle(agent_monitor) self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, state_data.call_id, state_data.scene, WorkStatus.AGENT_HANG_IDLE) self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.REPROCESSING_IDLE) class AgentOperService: def __init__(self, app): self.app = app self.logger = app.logger self.redis_handler = RedisHandler() self.push_handler = PushHandler(app.logger) self.data_handle_server = DataHandleServer(app) self.agent_monitor_service = AgentMonitorService(app) self.agent_actionlog_service = AgentActionLogService(app) self.agent_state_service = AgentStateService(app) self.agent_heartbeat_expire = 30 self.agent_serial_live_expire = 60*10 self.agent_heartbeat_job_scheduler = BackgroundScheduler() self.agent_heartbeat_job_scheduler.add_job(self.agent_heartbeat_daemon, 'interval', seconds=1, max_instances=1, name='agent_heartbeat_daemon') self.agent_heartbeat_job_scheduler.start() def agent_heartbeat_daemon(self): def check_out_daemon(_name, key, value): try: sec = datetime.now().timestamp() - float(value) if sec > self.agent_heartbeat_expire: self.redis_handler.redis.hdel(_name, key) self.logger.error("agent heartbeat expired, will checkout %s %s", key, value) # self.checkout(AgentActionRequest(saas_id=SAAS_ID, agent_id=key, agent_number=key)) except: traceback.print_exc() def check_agent_live_daemon(_members): key = CENTER_AGENT_LIVE_CNT % SAAS_ID pre_val = self.redis_handler.redis.get(key) if not pre_val: if not _members or len(_members) == 0: value = datetime.now().timestamp() self.redis_handler.redis.set(key, value, ex=60*60, nx=True) else: diff = datetime.now().timestamp() - float(pre_val) # self.logger.info('check_agent_live_daemon, members=%s, diff=%s, pre_val=%s', (len(_members) if _members else 0), diff, pre_val) if diff > self.agent_serial_live_expire: self.logger.warn('WARING::live agent count less than 1 serial ten minutes') self.data_handle_server.create_warning_record(1, '10分钟空岗报警') self.redis_handler.redis.delete(key) if _members and len(_members) > 0: self.redis_handler.redis.delete(key) name = CENTER_AGENT_HEARTBEAT % SAAS_ID members = self.redis_handler.redis.hgetall(name) check_agent_live_daemon(members) if not members: return registry.MANUAL_AGENT_LIVES.set(len(members)) for k,v in members.items(): check_out_daemon(name, k, v) def __del__(self): self.agent_heartbeat_job_scheduler.shutdown() @with_app_context def enable(self, req: AgentActionRequest): agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id) if agent.agent_state == AgentState.ENABLE.code: return agent.agent_state = AgentState.ENABLE.code db.session.commit() @with_app_context def disable(self, req: AgentActionRequest): agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id) if agent.agent_state == AgentState.DISABLE.code: return agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_number) if agent_monitor.check_state == AgentCheck.IN.code and \ agent_monitor.service_state == AgentServiceState.CALLING.code: raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE) agent.phone_num = '' agent.agent_state = AgentState.DISABLE.code db.session.commit() phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num) phone.is_delete = 1 db.session.commit() @with_app_context def checkin(self, req: AgentActionRequest): agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id) if not agent or agent.agent_state == AgentState.DISABLE.code: raise BizException(BizErrorCode.ERROR_NOT_FOLLOW_CHECK_IN) phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num) agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num) # agent_monitor.check_scene = req.scene self.agent_monitor_service.update_checkin(agent_monitor,req.scene) self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.IN, AgentLogState.CHECKIN) self.agent_state_service.checkin(agent.saas_id, agent.out_id, agent.phone_num) print("checkin", agent_monitor,agent_monitor.check_state) if req.scene == AgentScene.MANUAL.code: # 如果是手动外呼增加置忙 self._handle_idle(req.scene, agent) return self._push_event_for_checkin(agent, agent_monitor, phone, req.scene) @with_app_context def checkout(self, req: AgentActionRequest): agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id) if not agent or agent.agent_state == AgentState.DISABLE.code: raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE) agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num) if not agent_monitor or agent_monitor.service_state == AgentServiceState.CALLING.code: raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE) if agent_monitor.check_state == AgentCheck.OUT.code: return self._push_event_for_checkout(agent, req.scene) self.agent_monitor_service.update_checkout(agent_monitor) self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.OUT, AgentLogState.CHECKOUT) self.agent_state_service.checkout(agent.saas_id, agent.out_id, agent.phone_num) return self._push_event_for_checkout(agent, req.scene) @with_app_context def busy(self, req: AgentActionRequest): agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id) if not agent or agent.agent_state == AgentState.DISABLE.code: raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE) agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num) if not agent_monitor or agent_monitor.check_state == AgentCheck.OUT.code: raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE) if agent_monitor.service_state == AgentServiceState.CALLING.code: raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE) self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num) if agent_monitor.service_state == AgentServiceState.BUSY.code: self._push_event_for_busy(agent, req.scene) return self.agent_monitor_service.update_busy(agent_monitor) self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY, AgentLogState.BUSY) self._push_event_for_busy(agent, req.scene) @with_app_context def idle(self, req: AgentActionRequest): agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id) if not agent or agent.agent_state == AgentState.DISABLE.code: raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE) self._handle_idle(req.scene, agent) @with_app_context def assign(self, req: AgentActionRequest): return self.agent_state_service.assign_agent(req.saas_id, req.service_id) def idle_agent_exist(self, request: AgentActionRequest): pass @with_app_context def agent_state(self,req: AgentActionRequest): # agent = _get_agent(req.saas_id, req.agent_id) agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, req.agent_id) return agent_monitor.service_state @with_app_context def turn_on(self, req: AgentActionRequest): agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id) agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num) agent_scene = AgentScene.get_by_code(req.scene) if not agent_monitor: raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR) if agent_monitor.service_state == AgentServiceState.CALLING.code: self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY) return self.agent_monitor_service.update_calling(agent_monitor) self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num) self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY) @with_app_context def _handle_idle(self, scene, agent): agent_monitor = self.data_handle_server.get_agent_monitor(agent.saas_id, agent.agent_num) print('agent_monitor:', agent_monitor, agent) if agent_monitor.check_state == AgentCheck.OUT.code: raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE) if agent_monitor.service_state == AgentServiceState.CALLING.code or agent_monitor.service_state == AgentServiceState.DIALING.code: raise BizException(BizErrorCode.AGENT_CALLING_NOT_HANG) if scene == AgentScene.ROBOT.code: self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num) else: self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num) if agent_monitor.service_state == AgentServiceState.IDLE.code: self._push_event_for_idle(agent, scene) return self.agent_monitor_service.update_idle(agent_monitor) self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.IDLE) self._push_event_for_idle(agent, scene) def _push_event_for_checkin(self, agent, agent_monitor, phone, scene): """坐席签入事件推送""" agent_scene = AgentScene.get_by_code(scene) self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.LOGIN_SUCCESS) event_data = AgentEventData(agent.saas_id, agent.out_id) event_data.data = {'eventName': DownEvent.ON_INITAL_SUCCESS.code, 'ext': {'saasId': agent.saas_id, 'agentId': agent.out_id, 'phoneNum': phone.phone_num, 'phonePwd': phone.phone_pwd, 'sipServer': phone.sip_server } } return event_data.__dict__ def _push_event_for_checkout(self,agent,scene): """签出事件推送""" agent_scene = AgentScene.get_by_code(scene) self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.NO_INIT, '签出成功') event_data = AgentEventData(agent.saas_id, agent.out_id) event_data.data = {'eventName': DownEvent.ON_INITAL_FAILURE.code, 'ext': {'saasId': agent.saas_id, 'agentId': agent.out_id, } } return event_data.__dict__ def _push_event_for_busy(self,agent,scene): """置忙事件推送""" agent_scene = AgentScene.get_by_code(scene) self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_BUSY) pass def _push_event_for_idle(self, agent, scene): """坐席置闲事件推送""" agent_scene = AgentScene.get_by_code(scene) self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_READY) class AgentService: def __init__(self, app): self.app = app self.logger = app.logger self.data_handle_server = DataHandleServer(app) self.agent_monitor_service = AgentMonitorService(app) @with_app_context def get_and_check(self, req: AgentActionRequest): agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id) if not agent: return {} phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num) return phone.to_dict() @with_app_context def watch_agent_state(self, req: HumanServiceQueryRequest): pos = HumanServiceMap.query.filter(HumanServiceMap.is_delete == 0, HumanServiceMap.saas_id == req.saas_id, HumanServiceMap.service_id == req.serviceId).all() agent_ids = [x.agent_id for x in pos] monitors = self.agent_monitor_service.detail_monitor_out_ids(req.saas_id, agent_ids) return monitors @with_app_context def add(self, req: AgentRequest): new_agent_num = self.data_handle_server.get_newest_agent_number(req.saas_id) agent = Agent(saas_id=req.saas_id, agent_num=new_agent_num, agent_name=req.agent_name, out_id=req.out_id, agent_pwd=req.agent_password, agent_type=req.agent_type, phone_num=req.phone_number, distribute=req.distribute, agent_state=req.agent_state) db.session.add(agent) db.session.commit() agent_monitor = AgentMonitor(saas_id=req.saas_id, agent_num=new_agent_num, out_id=req.out_id) db.session.add(agent_monitor) db.session.commit() return new_agent_num @with_app_context def update(self, req: AgentRequest): agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id) if not agent: return phone_num = agent.phone_num state_change = agent.agent_state != req.agent_state disable = req.agent_state == AgentState.DISABLE agent.agent_name = req.agent_name agent.agent_pwd = req.agent_password agent.agent_type = req.agent_type agent.phone_num = req.phone_number if not disable else '' agent.distribute = req.distribute agent.agent_state = req.agent_state db.session.commit() if state_change and disable: phone = Phone.query.filter(Phone.saas_id == req.saas_id, Phone.phone_num == phone_num).first() if phone: db.session.delete(phone) @with_app_context def detail(self, saas_id, agent_number): agent = self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='') return agent @with_app_context def count(self, req: AgentQueryRequest): cnt = Agent.query.filter(Agent.saas_id == req.saas_id, or_(Agent.agent_num == req.agent_number, Agent.agent_name.contains(req.agent_name), Agent.out_id == req.out_id, Agent.agent_type == req.agent_type ) ).count() return cnt @with_app_context def query_page(self, req: AgentQueryRequest): pagination = Agent.query.filter(Agent.saas_id == req.saas_id, or_(Agent.agent_num == req.agent_number, Agent.agent_name.contains(req.agent_name), Agent.out_id == req.out_id, Agent.agent_type == req.agent_type ) ).paginate(req.page, req.size) # data = { # "page": pagination.page, # 当前页码 # "pages": pagination.pages, # 总页码 # "has_prev": pagination.has_prev, # 是否有上一页 # "prev_num": pagination.prev_num, # 上一页页码 # "has_next": pagination.has_next, # 是否有下一页 # "next_num": pagination.next_num, # 下一页页码 # "items": [{ # "id": item.id, # "name": item.name, # "age": item.age, # "sex": item.sex, # "money": item.money, # } for item in pagination.items] # } return pagination @with_app_context def delete(self, saas_id, agent_number): agent = self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='') if not agent: return agent.is_delete = 1 agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number) agent_monitor.is_delete = 1 db.session.commit() phone = self.data_handle_server.get_phone(saas_id, agent.phone_num) phone.is_delete = 1 db.session.commit() class AgentMonitorService: def __init__(self, app): self.app = app self.logger = app.logger self.data_handle_server = DataHandleServer(app) @with_app_context def detail_monitor_out_ids(self, saas_id, out_ids, check_scene=None): if not out_ids: return [] agents = Agent.query.filter(Agent.is_delete == 0, Agent.saas_id == saas_id, Agent.out_id.in_(out_ids)).all() if not agents: raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR) agent_num_map = {x.agent_num: x for x in agents} agent_nums = [x.agent_num for x in agents] agent_monitors = AgentMonitor.query.filter(AgentMonitor.is_delete == 0, AgentMonitor.agent_num.in_(agent_nums)).all() res = [] for agent_monitor in agent_monitors: agent = agent_num_map.get(agent_monitor.agent_num) data = AgentMonitorData() data.saas_id = saas_id data.agent_num = agent_monitor.agent_num data.agent_name = agent.agent_name data.out_id = agent_monitor.out_id data.check_state = agent_monitor.check_state data.check_scene = agent_monitor.check_scene data.online_state = 1 if agent_monitor.check_state == AgentCheck.IN.code else 0 if agent_monitor.check_in_time: data.check_in_time = agent_monitor.check_in_time.timestamp() day_start = self._get_day_start() if agent_monitor.check_in_time.timestamp() > day_start and \ agent_monitor.check_state == AgentCheck.OUT.code: data.online_state = 2 data.check_out_time = agent_monitor.check_out_time.timestamp() if agent_monitor.check_out_time else None data.service_state = agent_monitor.service_state data.busy_time = agent_monitor.busy_time.timestamp() if agent_monitor.busy_time else None data.idle_time = agent_monitor.idle_time.timestamp() if agent_monitor.idle_time else None data.call_time = agent_monitor.call_time.timestamp() if agent_monitor.call_time else None data.hang_time = agent_monitor.hang_time.timestamp() if agent_monitor.hang_time else None data.heart_state = agent_monitor.heart_state data.heart_time = agent_monitor.heart_time.timestamp() if agent_monitor.heart_time else None data.update_time = agent_monitor.update_time.timestamp() if agent_monitor.update_time else None res.append(data.__dict__) return res @with_app_context def update_checkin(self, agent_monitor,scene=None): agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id) agent_monitor.check_state = AgentCheck.IN.code agent_monitor.check_in_time = datetime.now() agent_monitor.heart_state = AgentHeartState.NORMAL.code agent_monitor.heart_time = datetime.now() agent_monitor.check_scene = scene self.logger.info("update_checkin %s", agent_monitor.check_state) db.session.commit() @with_app_context def update_checkout(self, agent_monitor): agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id) agent_monitor.check_state = AgentCheck.OUT.code agent_monitor.check_out_time = datetime.now() agent_monitor.service_state = AgentServiceState.LOGOUT.code agent_monitor.heart_state = AgentHeartState.DEFAULT.code agent_monitor.heart_time = datetime.now() self.logger.info("update_checkout %s", agent_monitor.check_out_time) db.session.commit() @with_app_context def update_idle(self, agent_monitor): agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id) agent_monitor.service_state = AgentServiceState.IDLE.code agent_monitor.idle_time = datetime.now() db.session.commit() @with_app_context def update_busy(self, agent_monitor): agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id) agent_monitor.service_state = AgentServiceState.BUSY.code agent_monitor.busy_time = datetime.now() db.session.commit() @with_app_context def update_dialing(self, agent_monitor): agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id) agent_monitor.service_state = AgentServiceState.DIALING.code db.session.commit() @with_app_context def update_calling(self, agent_monitor): agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id) agent_monitor.service_state = AgentServiceState.CALLING.code agent_monitor.call_time = datetime.now() db.session.commit() @with_app_context def update_processing(self, agent_monitor): agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id) agent_monitor.service_state = AgentServiceState.REPROCESSING.code agent_monitor.hang_time = datetime.now() db.session.commit() @with_app_context def update_session_id(self, agent_monitor, session_id): agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id) agent_monitor.session_id = session_id db.session.commit() @with_app_context def update_heart_error(self, agent_monitor): agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id) agent_monitor.heart_state = AgentHeartState.ABNORMAL.code agent_monitor.heart_time = datetime.now() db.session.commit() @with_app_context def _get_day_start(self): today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) return int(today_start.timestamp() * 1000) # Return milliseconds class AgentActionLogService: def __init__(self, app): self.app = app self.logger = app.logger self.data_handle_server = DataHandleServer(app) @with_app_context def insert_check_state(self, agent_monitor, agent_check_enum: AgentCheck, agent_log_enum: AgentLogState): action_log = AgentActionLog() action_log.saas_id = agent_monitor.saas_id action_log.agent_num = agent_monitor.agent_num action_log.out_id = agent_monitor.out_id action_log.action_type = 0 action_log.check_state = agent_check_enum.code action_log.pre_check_state = agent_monitor.check_state if agent_log_enum: action_log.event_type = agent_log_enum.code action_log.event_desc = agent_log_enum.description now = datetime.now() pre_date = None if agent_monitor.check_state == AgentCheck.IN.code: pre_date = agent_monitor.check_in_time if agent_monitor.check_state == AgentCheck.OUT.code: pre_date = agent_monitor.check_out_time if pre_date is None: pre_date = agent_monitor.update_time action_log.check_state_time = now action_log.pre_check_state_time = pre_date action_log.check_state_duration = now.timestamp() - pre_date.timestamp() db.session.add(action_log) db.session.commit() @with_app_context def insert_service_state(self, agent_monitor, agent_service_state: AgentServiceState, agent_log_enum: AgentLogState, task_id=None, service_id=None): if agent_monitor.service_state == agent_service_state.code: self.logger.info("agent action log insert service state same %s %s", agent_monitor.service_state, agent_service_state.code) action_log = AgentActionLog() action_log.saas_id = agent_monitor.saas_id action_log.agent_num = agent_monitor.agent_num action_log.out_id = agent_monitor.out_id action_log.action_type = 1 action_log.service_state = agent_service_state.code action_log.pre_service_state = agent_monitor.service_state action_log.task_id = task_id action_log.service_id = service_id if agent_log_enum: action_log.event_type = agent_log_enum.code action_log.event_desc = agent_log_enum.description now = datetime.now() pre_date = None if agent_monitor.service_state == AgentServiceState.IDLE.code: pre_date = agent_monitor.idle_time if agent_monitor.service_state == AgentServiceState.BUSY.code: pre_date = agent_monitor.busy_time if agent_monitor.service_state == AgentServiceState.CALLING.code: pre_date = agent_monitor.call_time if agent_monitor.service_state == AgentServiceState.LOGOUT.code: pre_date = agent_monitor.check_out_time if agent_monitor.service_state == AgentServiceState.REPROCESSING.code: pre_date = agent_monitor.hang_time action_log.service_state_time = now if pre_date is None: pre_date = agent_monitor.update_time action_log.pre_service_state_time = pre_date action_log.service_state_duration = now.timestamp() - pre_date.timestamp() db.session.add(action_log) db.session.commit() class AgentStateService: def __init__(self, app): self.app = app self.logger = app.logger self.redis_handler = RedisHandler() self.assigned_recycle_millisecond = 30 * 1000 # self.state_service_id_data_map = defaultdict(dict) # self.executor = ThreadPoolExecutor(max_workers=10) self.data_handle_server = DataHandleServer(app) self.agent_monitor_service = AgentMonitorService(app) self.agent_actionlog_service = AgentActionLogService(app) def idle(self, saas_id, agent_id, phone_num): human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id) if human_service is None: self.logger.info(f"agent engine idle not have human service {saas_id} {agent_id}") # 使用print替代log return self.idle_hash(saas_id, agent_id, phone_num, human_service.service_id) def busy(self, saas_id, agent_id, phone_num): human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id) if human_service is None: self.logger.info(f"agent engine busy not hava human service {saas_id} {agent_id}") # 使用print替代log return self.busy_hash(saas_id, agent_id, phone_num, human_service.service_id) def idle_by_human(self, saas_id, agent_id, service_id): agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id) if not agent: return agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num) if not agent_monitor: return if agent_monitor.check_state == AgentCheck.IN.code: self.agent_monitor_service.update_idle(agent_monitor) self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.ACTIVE_HUMAN_SERVICE) self.idle_hash(saas_id, agent_id, agent.phone_num, service_id) def busy_by_human(self, saas_id, service_id, agent_id=None): agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id) if not agent: return agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num) if not agent_monitor: return if agent_monitor.check_state == AgentCheck.IN.code: self.agent_monitor_service.update_busy(agent_monitor) self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY, AgentLogState.DEL_HUMAN_SERVICE) self.busy_hash(saas_id, agent_id, agent.phone_num, service_id) def checkin(self, saas_id, agent_id, phone_num): key = self._check_in_key(saas_id) state_data = AgentStateData() state_data.status = HumanState.DEFAULT.code state_data.time = datetime.now().timestamp() self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string()) self.redis_handler.redis.expire(key, self._get_expire_time()) def checkout(self, saas_id, agent_id, phone_num): key = self._check_in_key(saas_id) self.redis_handler.redis.hdel(key, phone_num) self.busy(saas_id, agent_id, phone_num) def assign_agent(self, saas_id, service_id, called=None, ivr_id=None, task_id=None, cbp=None): choose_phone_num = '' lock = threading.Lock() try: lock.acquire() self.logger.info("assignAgent %s %s %s"% (saas_id, service_id, called)) idle_agents = self.idle_agents(saas_id, service_id) if len(idle_agents) <= 0: return choose_phone_num choose_phone_num = self._choose_max_idle_time(idle_agents) self.handle_assign_time(saas_id, service_id, choose_phone_num) self.handle_lock_agent(choose_phone_num, saas_id, service_id) finally: lock.release() return choose_phone_num def handle_check_agent_lock(self, choose_phone_num, saas_id, service_id='00000000000000000'): key = self._lock_key(saas_id, service_id, choose_phone_num) res = self.redis_handler.redis.get(key) self.logger.info('checkAgent %s %s %s %s'% (saas_id, service_id, choose_phone_num, res)) return False if res else True def handle_lock_agent(self, choose_phone_num, saas_id, service_id='00000000000000000'): key = self._lock_key(saas_id, service_id, choose_phone_num) expire = self._get_expire_time() self.redis_handler.redis.set(key, 1, nx=True, ex=expire) res = self.redis_handler.redis.get(key) self.logger.info('lockAgent %s %s %s %s %s'% (saas_id, service_id, choose_phone_num, expire, res)) def handle_release_agent_lock(self, choose_phone_num, saas_id, service_id='00000000000000000'): key = self._lock_key(saas_id, service_id, choose_phone_num) self.redis_handler.redis.delete(key) # self.redis_handler.redis.expire(key, 3) self.logger.info('releaseAgent %s %s %s'% (saas_id, service_id, choose_phone_num)) def handle_assign_time(self, saas_id, service_id, choose_phone_num): key = self._key(saas_id, service_id) cache_agent_map = self.get_cache_agent_map(saas_id, service_id) if cache_agent_map and choose_phone_num in cache_agent_map: state_data = cache_agent_map[choose_phone_num] state_data.assign_time = datetime.now().timestamp() * 1000 self.redis_handler.redis.hset(key, choose_phone_num, state_data.to_json_string()) self.redis_handler.redis.expire(key, self._get_expire_time()) # self.update_report_state(saas_id, service_id) def idle_agents(self, saas_id, service_id): cache_agent_list = self.get_cache_agent_list(saas_id, service_id) if not cache_agent_list: return [] agent_str = '\n'.join(["%s-%s-%s-%s"%(x.phone_num, x.status, x.time, x.assign_time) for x in cache_agent_list]) self.logger.info("assignAgent %s %s idleAgents:%s "% (saas_id, service_id, agent_str)) return self.get_idle_agents(cache_agent_list) def idle_hash(self, saas_id, agent_id, phone_num, service_id): key = self._key(saas_id, service_id) state_data = AgentStateData() # self.logger.info("idle_hash, key=%s, saas_id=%s, phone_num=%s", key, saas_id, phone_num) cache_agent_map = self.get_cache_agent_map(saas_id, service_id) if cache_agent_map and phone_num in cache_agent_map: state_data = cache_agent_map[phone_num] state_data.status = HumanState.IDLE.code state_data.time = datetime.now().timestamp() * 1000 self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string()) self.redis_handler.redis.expire(key, self._get_expire_time()) # self.update_report_state(saas_id, service_id) self.logger.info("idle_hash, key=%s, saas_id=%s, phone_num=%s, state_data=%s"%(key, saas_id, phone_num, state_data)) def busy_hash(self, saas_id, agent_id, phone_num, service_id): cache_agent_map = self.get_cache_agent_map(saas_id, service_id) state_data = cache_agent_map.get(phone_num) key = self._key(saas_id, service_id) if state_data is None: return state_data.status = HumanState.BUSY.code self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string()) self.redis_handler.redis.expire(key, self._get_expire_time()) # self.update_report_state(saas_id, service_id) def get_cache_agent_map(self, saas_id, service_id): cache_agent_list = self.get_cache_agent_list(saas_id, service_id) # 检查列表是否为空,如果为空返回空字典 if not cache_agent_list: return {} # 使用字典推导式将 cache_agent_list 转换为字典 return {agent.phone_num: agent for agent in cache_agent_list} def get_cache_agent_list(self, saas_id, service_id): redis_key = self._key(saas_id, service_id) map_cache_by_key = self.redis_handler.redis.hgetall(redis_key) self.logger.info("get_cache_agent_list, redis_key=%s, map_cache_by_key=%s"%(redis_key, map_cache_by_key)) if not map_cache_by_key: # 检查字典是否为空 return [] # 返回空列表 free_agents = [] for phone_num, json_value in map_cache_by_key.items(): agent_status_data_dict = json.loads(json_value) agent_status_data = AgentStateData(**agent_status_data_dict) # 解析 JSON 为 AgentStateData 对象 agent_status_data.phone_num = phone_num.decode('utf-8') # 设置电话号码 free_agents.append(agent_status_data) return free_agents def get_idle_agents(self,cache_agent_list): # current_time =int(datetime.now().timestamp() * 1000) # 获取当前时间的毫秒级时间戳 idle_agents = [ agent for agent in cache_agent_list if agent.status == 1 and self.handle_check_agent_lock(agent.phone_num, SAAS_ID) #and ( # agent.assign_time == 0 or # agent.assign_time + self.assigned_recycle_millisecond < current_time #) ] return idle_agents def get_agent_service_idle_size(self, saas_id, service_id): idle_agents_size = 0 cache_agent_list = self.get_cache_agent_list(saas_id, service_id) if cache_agent_list: # 检查列表是否非空 idle_agents = self.get_idle_agents(cache_agent_list) idle_agents_size = len(idle_agents) # 获取空闲代理的数量 return idle_agents_size def get_agent_service_busy_size(self, saas_id, service_id): busy_agents_size = 0 cache_agent_list = self.get_cache_agent_list(saas_id, service_id) if cache_agent_list: # 检查列表是否非空 idle_agents = self.get_idle_agents(cache_agent_list) busy_agents = [agent for agent in cache_agent_list if agent not in idle_agents] # 计算忙碌代理 busy_agents_size = len(busy_agents) # 获取忙碌代理的数量 return busy_agents_size # def update_report_state(self, saas_id, service_id): # key = self._key(saas_id, service_id) # # data_map 这个地方有疑问 # data_map = self.state_service_id_data_map[key] # idle = HumanState.IDLE # if idle.value not in data_map: # data_map[idle.code] = threading.Lock() # self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, idle) # # data_map[idle.code] = self.do_report_real_time_human_service_id(saas_id, service_id, idle) # busy = HumanState.BUSY # if busy.value not in data_map: # data_map[busy.code] = threading.Lock() # self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, busy) # # data_map[busy.code] = self.do_report_real_time_human_service_id(saas_id, service_id, busy) # # def do_report_real_time_human_service_id(self, saas_id, service_id, human_state): # name = "cti_center_real_time_human_service_state" # tag_list = { # "vcc_id": saas_id, # "service_id": service_id, # "state": human_state.code, # } # if human_state == HumanState.IDLE: # # meter_registry 这块疑问 # self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id)) # elif human_state == HumanState.BUSY: # self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id)) # return 0 def _check_in_key(self, saas_id): return "CTI:%s:HUMAN:AGENT"%(saas_id.upper()) def _key(self, saas_id, service_id): return "CTI:%s:HUMAN:%s"%(saas_id.upper(), service_id) def _lock_key(self, saas_id, service_id, choose_phone_num): return "CTI:%s:HUMAN:%s:%s:LOCK"%(saas_id.upper(), service_id, choose_phone_num) def _get_expire_time(self): now = datetime.now() end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=0) expire_time = (end_of_day - now).total_seconds() # Convert to milliseconds return int(expire_time) def _choose_max_idle_time(self, idle_agents: List[AgentStateData]) -> str: idle_agents = sorted(idle_agents, key=lambda agent: agent.assign_time, reverse=False) return idle_agents[0].phone_num