12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013 |
- #!/usr/bin/env python3
- # encoding:utf-8
- import threading
- import time
- import traceback
- from collections import defaultdict
- from concurrent.futures import ThreadPoolExecutor
- from typing import List
- from apscheduler.schedulers.background import BackgroundScheduler
- from sqlalchemy import or_
- from src.core.callcenter import registry
- import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
- from src.core import with_app_context
- from src.core.callcenter.api import AgentActionRequest, AgentQueryRequest, AgentRequest, AgentEventData, \
- AgentStateData, HumanServiceQueryRequest, AgentMonitorData, CallInfo, DeviceInfo, AgentDelayStateData
- from src.core.callcenter.cache import Cache
- from src.core.callcenter.constant import CENTER_AGENT_HEARTBEAT, SAAS_ID
- from src.core.callcenter.dao import *
- from src.core.callcenter.data_handler import DataHandleServer
- from src.core.callcenter.enumeration import AgentState, AgentCheck, AgentHeartState, AgentServiceState, AgentLogState, \
- AgentScene, BizErrorCode, WorkStatus, DownEvent, HumanState, DeviceType, ServiceDirect
- from src.core.callcenter.esl.constant.event_names import *
- from src.core.callcenter.exception import BizException
- from src.core.callcenter.push import PushHandler
- from src.core.datasource import RedisHandler
- class AgentEventService:
- def __init__(self, app):
- self.app = app
- self.logger = app.logger
- self.cache = Cache(app)
- self.push_handler = PushHandler(app.logger)
- self.data_handle_server = DataHandleServer(app)
- self.agent_monitor_service = AgentMonitorService(app)
- self.agent_state_service = AgentStateService(app)
- self.agent_actionlog_service = AgentActionLogService(app)
- def delay_state(self, state_data: AgentDelayStateData):
- agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num)
- if not agent:
- return
- agent_monitor = self.data_handle_server.get_agent_monitor(state_data.saas_id, state_data.agent_num)
- if not agent_monitor:
- return
- #TODO 非最新通话的延迟事件,忽略.
- agent_scene = AgentScene.get_by_code(state_data.scene)
- self.logger.info("agent event delay state %s %s %s %s", state_data.saas_id, state_data.agent_num, state_data.service_state, agent_monitor.service_state)
- if AgentServiceState.REPROCESSING.code == state_data.service_state and AgentServiceState.REPROCESSING.code == agent_monitor.service_state:
- self.reprocessing_idle(state_data)
- if AgentServiceState.DIALING.code == state_data.service_state and AgentServiceState.DIALING.code == agent_monitor.service_state:
- if self.cache.get_call_is_answer(state_data.saas_id, state_data.agent_num):
- return
- self.agent_monitor_service.update_idle(agent_monitor)
- self.push_handler.push_on_call_end(state_data.saas_id, state_data.flow_id, state_data.agent_num, agent_scene, ServiceDirect.MANUAL_CALL.service_direct, '0')
- self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_REPROCESSING)
- self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_IDLE)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.BIZ_DIALING_IDLE)
- if AgentServiceState.HANGING.code == state_data.service_state \
- and (AgentServiceState.DIALING.code == agent_monitor.service_state
- or AgentServiceState.CALLING.code == agent_monitor.service_state):
- self.agent_monitor_service.update_idle(agent_monitor)
- if AgentServiceState.DIALING.code == agent_monitor.service_state:
- self.push_handler.push_on_call_end(state_data.saas_id, state_data.flow_id, state_data.agent_num, agent_scene, ServiceDirect.MANUAL_CALL.service_direct, '0')
- self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_REPROCESSING)
- self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_IDLE)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.MANUAL_HANG_UP)
- def agent_event_channel(self, event, call_info: CallInfo, device_info: DeviceInfo):
- event_name = EslEventUtil.getEventName(event)
- saas_id = call_info.saas_id if call_info else None
- flow_id = call_info.cti_flow_id if call_info else None
- call_id = call_info.call_id if call_info else None
- device_id = device_info.device_id if device_info else None
- agent_num = device_info.agent_key if device_info else None
- caller = device_info.caller if device_info else None
- called = device_info.called if device_info else None
- is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False
- start_time = time.time()
- try:
- self.logger.info('agent_event_channel, event_name=%s, agent_num=%s, device_id=%s, is_agent=%s', event_name, agent_num, device_id, is_agent)
- agent = self.data_handle_server.get_agent(saas_id, agent_num)
- if not agent:
- # self.logger.warn("event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json')))
- return
- agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
- if not agent_monitor:
- # self.logger.warn("event service channel agentMonitor is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json')))
- return
- # 信道发起事件,触发完成发起(或桥)&& 坐席侧
- if CHANNEL_ORIGINATE == event_name and is_agent:
- self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_RINGING,phone=call_info.caller)
- # 进度事件,外呼时对方提醒。或者入呼时提醒 && 坐席侧
- if CHANNEL_PROGRESS == event_name and is_agent:
- self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.DIALING)
- self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING)
- # 媒体进度事件,外呼时对方提醒。或者入呼时提醒 && 用户侧
- if CHANNEL_PROGRESS_MEDIA == event_name and not is_agent:
- self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING_RINGING)
- #应答
- if CHANNEL_ANSWER == event_name:
- if call_id:
- if self.cache.get_call_is_end(call_id):
- # self.logger.warn("event service channel call is end {} {} {} {} {} {}", saas_id, event_name, caller, called, call_id, json.dumps(event.serialize('json')))
- return
- self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num)
- self.logger.info('agent接通:%s',agent )
- self.data_handle_server.update_record(call_id, status=1, user_id=agent.user_id,user_name=agent.agent_name)
- if is_agent:
- # 坐席接起
- self.cache.set_call_is_answer(saas_id, flow_id)
- self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.ANSWER_COMPENSATE)
- self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON)
- else:
- # 用户侧接起
- self.agent_monitor_service.update_calling(agent_monitor)
- self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_ANSWER_OUTGOING)
- self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.CALLING)
- #挂断
- if CHANNEL_HANGUP == event_name:
- # 坐席侧挂断
- if is_agent:
- if call_id:
- self.cache.set_call_is_end(call_id)
- self.agent_monitor_service.update_processing(agent_monitor)
- self.logger.info('挂断更新')
- self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.MANUAL))
- self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, '0')
- self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_HANG_REPROCESSING)
- self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.REPROCESSING)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING, AgentLogState.CHANNEL_HANG_UP)
- # self.data_handle_server.update_record(call_id, time_end=datetime.now())
- # 同步处理后处理置闲
- # reprocessingIdle(statusDto);
- # agentProducer.pushDelayedStatus(statusDto, reprocessingTimeout);
- if (CHANNEL_BRIDGE == event_name or PLAYBACK_START == event_name) and is_agent:
- self.push_handler.push_on_ring_start(saas_id, flow_id, agent_num, AgentScene.MANUAL, call_id)
- # self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_ANSWER_OUTGOING)
- # self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.CALLING)
- if DETECTED_TONE == event_name and not is_agent:
- self.push_handler.push_on_detected_tone(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id)
- if (CHANNEL_UNBRIDGE == event_name or PLAYBACK_STOP == event_name) and is_agent:
- self.push_handler.push_on_ring_end(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id)
- except:
- traceback.print_exc()
- finally:
- latency = (time.time() - start_time)
- registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
- def bot_event_channel(self, event, call_info, device_info):
- event_name = EslEventUtil.getEventName(event)
- saas_id = call_info.saas_id if call_info else None
- flow_id = call_info.cti_flow_id if call_info else None
- call_id = call_info.call_id if call_info else None
- agent_num = device_info.agent_key if device_info else None
- is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False
- caller = (device_info.called if is_agent else device_info.caller) if device_info else None
- called = (device_info.caller if is_agent else device_info.called) if device_info else None
- human_service_id = '00000000000000000'
- start_time = time.time()
- try:
- self.logger.info('bot_event_channel, event_name=%s, call_id=%s, is_agent=%s, agent_num=%s', event_name, call_id, is_agent, agent_num)
- agent = self.data_handle_server.get_agent(saas_id, agent_num)
- if not agent:
- # self.logger.warn("bot event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called,
- # json.dumps(event.serialize('json')))
- return
- agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
- if not agent_monitor:
- # self.logger.warn("bot event service channel agentMonitor is null %s %s %s %s %s", saas_id, event_name, caller,
- # called, json.dumps(event.serialize('json')))
- return
- # 信道发起事件,触发完成发起(或桥)&& 坐席侧
- if CHANNEL_ORIGINATE == event_name and is_agent:
- self.push_handler.push_on_call_ring(saas_id, flow_id, agent_num, AgentScene.ROBOT, call_id, ServiceDirect.ROBOT_CALL.service_direct, called, caller, human_service_id)
- self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_RINGING,phone=call_info.caller)
- if CHANNEL_ANSWER == event_name:
- self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num)
- self.data_handle_server.update_record(call_id, status=1, transfer_user_id=agent.user_id,transfer_user_name=agent.agent_name)
- if is_agent:
- self.agent_monitor_service.update_calling(agent_monitor)
- self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! internal")
- self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.CALLING)
- self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON, service_id=human_service_id)
- else:
- self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! external")
- if CHANNEL_HANGUP == event_name and is_agent:
- self.agent_monitor_service.update_processing(agent_monitor)
- self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.ROBOT))
- self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, "0")
- self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_HANG_REPROCESSING)
- self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.REPROCESSING)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING,
- AgentLogState.CHANNEL_HANG_UP, service_id=human_service_id)
- # self.data_handle_server.update_record(call_id, time_end=datetime.now())
- except:
- traceback.print_exc()
- finally:
- latency = (time.time() - start_time)
- registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
- def reprocessing_idle(self, state_data: AgentDelayStateData):
- agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num)
- if not agent:
- return
- agent_monitor = self.data_handle_server.get_agent_monitor(state_data.saas_id, state_data.agent_num)
- if not agent_monitor:
- return
- self.logger.info('reprocessing_idle %s %s %s', state_data.saas_id, state_data.agent_num, agent_monitor)
- self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
- self.logger.info('reprocessing_idle_end')
- self.agent_monitor_service.update_idle(agent_monitor)
- self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, "", state_data.scene, WorkStatus.AGENT_HANG_IDLE)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.REPROCESSING_IDLE)
- class AgentOperService:
- def __init__(self, app):
- self.app = app
- self.logger = app.logger
- self.redis_handler = RedisHandler()
- self.push_handler = PushHandler(app.logger)
- self.data_handle_server = DataHandleServer(app)
- self.agent_monitor_service = AgentMonitorService(app)
- self.agent_actionlog_service = AgentActionLogService(app)
- self.agent_state_service = AgentStateService(app)
- # self.daemon_stopping = False
- self.agent_heartbeat_expire = 30
- self.agent_heartbeat_job_scheduler = BackgroundScheduler()
- self.agent_heartbeat_job_scheduler.add_job(self.agent_heartbeat_daemon, 'interval', seconds=1, max_instances=1, name='agent_heartbeat_daemon')
- self.agent_heartbeat_job_scheduler.start()
- # threading.Thread(target=self.agent_heartbeat_daemon).start()
- def agent_heartbeat_daemon(self):
- def check_out_daemon(_name, key, value):
- try:
- sec = datetime.now().timestamp() - float(value)
- if sec > self.agent_heartbeat_expire:
- self.redis_handler.redis.hdel(_name, key)
- self.logger.error("agent heartbeat expired, will checkout %s %s", key, value)
- self.checkout(AgentActionRequest(saas_id=SAAS_ID, agent_id=key, agent_number=key))
- except:
- traceback.print_exc()
- # while not self.daemon_stopping:
- name = CENTER_AGENT_HEARTBEAT % SAAS_ID
- members = self.redis_handler.redis.hgetall(name)
- if not members:
- return
- for k,v in members.items():
- check_out_daemon(name, k, v)
- # time.sleep(1)
- def __del__(self):
- self.agent_heartbeat_job_scheduler.shutdown()
- # self.daemon_stopping = True
- @with_app_context
- def enable(self, req: AgentActionRequest):
- agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id)
- if agent.agent_state == AgentState.ENABLE.code:
- return
- agent.agent_state = AgentState.ENABLE.code
- db.session.commit()
- @with_app_context
- def disable(self, req: AgentActionRequest):
- agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
- if agent.agent_state == AgentState.DISABLE.code:
- return
- agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_number)
- if agent_monitor.check_state == AgentCheck.IN.code and \
- agent_monitor.service_state == AgentServiceState.CALLING.code:
- raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
- agent.phone_num = ''
- agent.agent_state = AgentState.DISABLE.code
- db.session.commit()
- phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
- phone.is_delete = 1
- db.session.commit()
- @with_app_context
- def checkin(self, req: AgentActionRequest):
- agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
- if not agent or agent.agent_state == AgentState.DISABLE.code:
- raise BizException(BizErrorCode.ERROR_NOT_FOLLOW_CHECK_IN)
- phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
- agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
- # agent_monitor.check_scene = req.scene
- self.agent_monitor_service.update_checkin(agent_monitor,req.scene)
- self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.IN, AgentLogState.CHECKIN)
- self.agent_state_service.checkin(agent.saas_id, agent.out_id, agent.phone_num)
- print("checkin", agent_monitor,agent_monitor.check_state)
- if req.scene == AgentScene.MANUAL.code:
- # 如果是手动外呼增加置忙
- self._handle_idle(req.scene, agent)
- return self._push_event_for_checkin(agent, agent_monitor, phone, req.scene)
- @with_app_context
- def checkout(self, req: AgentActionRequest):
- agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
- if not agent or agent.agent_state == AgentState.DISABLE.code:
- raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
- agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
- if not agent_monitor or agent_monitor.service_state == AgentServiceState.CALLING.code:
- raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
- if agent_monitor.check_state == AgentCheck.OUT.code:
- return self._push_event_for_checkout(agent, req.scene)
- self.agent_monitor_service.update_checkout(agent_monitor)
- self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.OUT, AgentLogState.CHECKOUT)
- self.agent_state_service.checkout(agent.saas_id, agent.out_id, agent.phone_num)
- return self._push_event_for_checkout(agent, req.scene)
- @with_app_context
- def busy(self, req: AgentActionRequest):
- agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
- if not agent or agent.agent_state == AgentState.DISABLE.code:
- raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
- agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
- if not agent_monitor or agent_monitor.check_state == AgentCheck.OUT.code:
- raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
- if agent_monitor.service_state == AgentServiceState.CALLING.code:
- raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
- self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
- if agent_monitor.service_state == AgentServiceState.BUSY.code:
- self._push_event_for_busy(agent, req.scene)
- return
- self.agent_monitor_service.update_busy(agent_monitor)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY, AgentLogState.BUSY)
- self._push_event_for_busy(agent, req.scene)
- @with_app_context
- def idle(self, req: AgentActionRequest):
- agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
- if not agent or agent.agent_state == AgentState.DISABLE.code:
- raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
- self._handle_idle(req.scene, agent)
- @with_app_context
- def assign(self, req: AgentActionRequest):
- return self.agent_state_service.assign_agent(req.saas_id, req.service_id)
- def idle_agent_exist(self, request: AgentActionRequest):
- pass
- @with_app_context
- def agent_state(self,req: AgentActionRequest):
- # agent = _get_agent(req.saas_id, req.agent_id)
- agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, req.agent_id)
- return agent_monitor.service_state
- @with_app_context
- def turn_on(self, req: AgentActionRequest):
- agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
- agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
- agent_scene = AgentScene.get_by_code(req.scene)
- if not agent_monitor:
- raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
- if agent_monitor.service_state == AgentServiceState.CALLING.code:
- self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
- return
- self.agent_monitor_service.update_calling(agent_monitor)
- self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
- self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
- @with_app_context
- def _handle_idle(self, scene, agent):
- agent_monitor = self.data_handle_server.get_agent_monitor(agent.saas_id, agent.agent_num)
- print('agent_monitor:', agent_monitor, agent)
- if agent_monitor.check_state == AgentCheck.OUT.code:
- raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
- if agent_monitor.service_state == AgentServiceState.CALLING.code or agent_monitor.service_state == AgentServiceState.DIALING.code:
- raise BizException(BizErrorCode.AGENT_CALLING_NOT_HANG)
- if scene == AgentScene.ROBOT.code:
- self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
- else:
- self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
- if agent_monitor.service_state == AgentServiceState.IDLE.code:
- self._push_event_for_idle(agent, scene)
- return
- self.agent_monitor_service.update_idle(agent_monitor)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.IDLE)
- self._push_event_for_idle(agent, scene)
- def _push_event_for_checkin(self, agent, agent_monitor, phone, scene):
- """坐席签入事件推送"""
- agent_scene = AgentScene.get_by_code(scene)
- self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.LOGIN_SUCCESS)
- event_data = AgentEventData(agent.saas_id, agent.out_id)
- event_data.data = {'eventName': DownEvent.ON_INITAL_SUCCESS.code,
- 'ext': {'saasId': agent.saas_id,
- 'agentId': agent.out_id,
- 'phoneNum': phone.phone_num,
- 'phonePwd': phone.phone_pwd,
- 'sipServer': phone.sip_server
- }
- }
- return event_data.__dict__
- def _push_event_for_checkout(self,agent,scene):
- """签出事件推送"""
- agent_scene = AgentScene.get_by_code(scene)
- self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.NO_INIT, '签出成功')
- event_data = AgentEventData(agent.saas_id, agent.out_id)
- event_data.data = {'eventName': DownEvent.ON_INITAL_FAILURE.code,
- 'ext': {'saasId': agent.saas_id,
- 'agentId': agent.out_id,
- }
- }
- return event_data.__dict__
- def _push_event_for_busy(self,agent,scene):
- """置忙事件推送"""
- agent_scene = AgentScene.get_by_code(scene)
- self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_BUSY)
- pass
- def _push_event_for_idle(self, agent, scene):
- """坐席置闲事件推送"""
- agent_scene = AgentScene.get_by_code(scene)
- self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_READY)
- class AgentService:
- def __init__(self, app):
- self.app = app
- self.logger = app.logger
- self.data_handle_server = DataHandleServer(app)
- self.agent_monitor_service = AgentMonitorService(app)
- @with_app_context
- def get_and_check(self, req: AgentActionRequest):
- agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
- if not agent:
- return {}
- phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
- return phone.to_dict()
- @with_app_context
- def watch_agent_state(self, req: HumanServiceQueryRequest):
- pos = HumanServiceMap.query.filter(HumanServiceMap.is_delete == 0, HumanServiceMap.saas_id == req.saas_id,
- HumanServiceMap.service_id == req.serviceId).all()
- agent_ids = [x.agent_id for x in pos]
- monitors = self.agent_monitor_service.detail_monitor_out_ids(req.saas_id, agent_ids)
- return monitors
- @with_app_context
- def add(self, req: AgentRequest):
- new_agent_num = self.data_handle_server.get_newest_agent_number(req.saas_id)
- agent = Agent(saas_id=req.saas_id, agent_num=new_agent_num, agent_name=req.agent_name, out_id=req.out_id,
- agent_pwd=req.agent_password, agent_type=req.agent_type, phone_num=req.phone_number,
- distribute=req.distribute, agent_state=req.agent_state)
- db.session.add(agent)
- db.session.commit()
- agent_monitor = AgentMonitor(saas_id=req.saas_id, agent_num=new_agent_num, out_id=req.out_id)
- db.session.add(agent_monitor)
- db.session.commit()
- return new_agent_num
- @with_app_context
- def update(self, req: AgentRequest):
- agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id)
- if not agent:
- return
- phone_num = agent.phone_num
- state_change = agent.agent_state != req.agent_state
- disable = req.agent_state == AgentState.DISABLE
- agent.agent_name = req.agent_name
- agent.agent_pwd = req.agent_password
- agent.agent_type = req.agent_type
- agent.phone_num = req.phone_number if not disable else ''
- agent.distribute = req.distribute
- agent.agent_state = req.agent_state
- db.session.commit()
- if state_change and disable:
- phone = Phone.query.filter(Phone.saas_id == req.saas_id, Phone.phone_num == phone_num).first()
- if phone:
- db.session.delete(phone)
- @with_app_context
- def detail(self, saas_id, agent_number):
- agent = self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='')
- return agent
- @with_app_context
- def count(self, req: AgentQueryRequest):
- cnt = Agent.query.filter(Agent.saas_id == req.saas_id,
- or_(Agent.agent_num == req.agent_number,
- Agent.agent_name.contains(req.agent_name),
- Agent.out_id == req.out_id,
- Agent.agent_type == req.agent_type
- )
- ).count()
- return cnt
- @with_app_context
- def query_page(self, req: AgentQueryRequest):
- pagination = Agent.query.filter(Agent.saas_id == req.saas_id,
- or_(Agent.agent_num == req.agent_number,
- Agent.agent_name.contains(req.agent_name),
- Agent.out_id == req.out_id,
- Agent.agent_type == req.agent_type
- )
- ).paginate(req.page, req.size)
- # data = {
- # "page": pagination.page, # 当前页码
- # "pages": pagination.pages, # 总页码
- # "has_prev": pagination.has_prev, # 是否有上一页
- # "prev_num": pagination.prev_num, # 上一页页码
- # "has_next": pagination.has_next, # 是否有下一页
- # "next_num": pagination.next_num, # 下一页页码
- # "items": [{
- # "id": item.id,
- # "name": item.name,
- # "age": item.age,
- # "sex": item.sex,
- # "money": item.money,
- # } for item in pagination.items]
- # }
- return pagination
- @with_app_context
- def delete(self, saas_id, agent_number):
- agent = self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='')
- if not agent:
- return
- agent.is_delete = 1
- agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number)
- agent_monitor.is_delete = 1
- db.session.commit()
- phone = self.data_handle_server.get_phone(saas_id, agent.phone_num)
- phone.is_delete = 1
- db.session.commit()
- class AgentMonitorService:
- def __init__(self, app):
- self.app = app
- self.logger = app.logger
- self.data_handle_server = DataHandleServer(app)
- @with_app_context
- def detail_monitor_out_ids(self, saas_id, out_ids, check_scene=None):
- if not out_ids:
- return []
- agents = Agent.query.filter(Agent.is_delete == 0, Agent.saas_id == saas_id, Agent.out_id.in_(out_ids)).all()
- if not agents:
- raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
- agent_num_map = {x.agent_num: x for x in agents}
- agent_nums = [x.agent_num for x in agents]
- agent_monitors = AgentMonitor.query.filter(AgentMonitor.is_delete == 0, AgentMonitor.agent_num.in_(agent_nums)).all()
- res = []
- for agent_monitor in agent_monitors:
- agent = agent_num_map.get(agent_monitor.agent_num)
- data = AgentMonitorData()
- data.saas_id = saas_id
- data.agent_num = agent_monitor.agent_num
- data.agent_name = agent.agent_name
- data.out_id = agent_monitor.out_id
- data.check_state = agent_monitor.check_state
- data.check_scene = agent_monitor.check_scene
- data.online_state = 1 if agent_monitor.check_state == AgentCheck.IN.code else 0
- if agent_monitor.check_in_time:
- data.check_in_time = agent_monitor.check_in_time.timestamp()
- day_start = self._get_day_start()
- if agent_monitor.check_in_time.timestamp() > day_start and \
- agent_monitor.check_state == AgentCheck.OUT.code:
- data.online_state = 2
- data.check_out_time = agent_monitor.check_out_time.timestamp() if agent_monitor.check_out_time else None
- data.service_state = agent_monitor.service_state
- data.busy_time = agent_monitor.busy_time.timestamp() if agent_monitor.busy_time else None
- data.idle_time = agent_monitor.idle_time.timestamp() if agent_monitor.idle_time else None
- data.call_time = agent_monitor.call_time.timestamp() if agent_monitor.call_time else None
- data.hang_time = agent_monitor.hang_time.timestamp() if agent_monitor.hang_time else None
- data.heart_state = agent_monitor.heart_state
- data.heart_time = agent_monitor.heart_time.timestamp() if agent_monitor.heart_time else None
- data.update_time = agent_monitor.update_time.timestamp() if agent_monitor.update_time else None
- res.append(data.__dict__)
- return res
- @with_app_context
- def update_checkin(self, agent_monitor,scene=None):
- agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
- agent_monitor.check_state = AgentCheck.IN.code
- agent_monitor.check_in_time = datetime.now()
- agent_monitor.heart_state = AgentHeartState.NORMAL.code
- agent_monitor.heart_time = datetime.now()
- agent_monitor.check_scene = scene
- self.logger.info("update_checkin %s", agent_monitor.check_state)
- db.session.commit()
- @with_app_context
- def update_checkout(self, agent_monitor):
- agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
- agent_monitor.check_state = AgentCheck.OUT.code
- agent_monitor.check_out_time = datetime.now()
- agent_monitor.service_state = AgentServiceState.LOGOUT.code
- agent_monitor.heart_state = AgentHeartState.DEFAULT.code
- agent_monitor.heart_time = datetime.now()
- self.logger.info("update_checkout %s", agent_monitor.check_out_time)
- db.session.commit()
- @with_app_context
- def update_idle(self, agent_monitor):
- agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
- agent_monitor.service_state = AgentServiceState.IDLE.code
- agent_monitor.idle_time = datetime.now()
- db.session.commit()
- @with_app_context
- def update_busy(self, agent_monitor):
- agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
- agent_monitor.service_state = AgentServiceState.BUSY.code
- agent_monitor.busy_time = datetime.now()
- db.session.commit()
- @with_app_context
- def update_dialing(self, agent_monitor):
- agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
- agent_monitor.service_state = AgentServiceState.DIALING.code
- db.session.commit()
- @with_app_context
- def update_calling(self, agent_monitor):
- agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
- agent_monitor.service_state = AgentServiceState.CALLING.code
- agent_monitor.call_time = datetime.now()
- db.session.commit()
- @with_app_context
- def update_processing(self, agent_monitor):
- agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
- agent_monitor.service_state = AgentServiceState.REPROCESSING.code
- agent_monitor.hang_time = datetime.now()
- db.session.commit()
- @with_app_context
- def update_session_id(self, agent_monitor, session_id):
- agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
- agent_monitor.session_id = session_id
- db.session.commit()
- @with_app_context
- def update_heart_error(self, agent_monitor):
- agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
- agent_monitor.heart_state = AgentHeartState.ABNORMAL.code
- agent_monitor.heart_time = datetime.now()
- db.session.commit()
- @with_app_context
- def _get_day_start(self):
- today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
- return int(today_start.timestamp() * 1000) # Return milliseconds
- class AgentActionLogService:
- def __init__(self, app):
- self.app = app
- self.logger = app.logger
- self.data_handle_server = DataHandleServer(app)
- @with_app_context
- def insert_check_state(self, agent_monitor, agent_check_enum: AgentCheck, agent_log_enum: AgentLogState):
- action_log = AgentActionLog()
- action_log.saas_id = agent_monitor.saas_id
- action_log.agent_num = agent_monitor.agent_num
- action_log.out_id = agent_monitor.out_id
- action_log.action_type = 0
- action_log.check_state = agent_check_enum.code
- action_log.pre_check_state = agent_monitor.check_state
- if agent_log_enum:
- action_log.event_type = agent_log_enum.code
- action_log.event_desc = agent_log_enum.description
- now = datetime.now()
- pre_date = None
- if agent_monitor.check_state == AgentCheck.IN.code:
- pre_date = agent_monitor.check_in_time
- if agent_monitor.check_state == AgentCheck.OUT.code:
- pre_date = agent_monitor.check_out_time
- if pre_date is None:
- pre_date = agent_monitor.update_time
- action_log.check_state_time = now
- action_log.pre_check_state_time = pre_date
- action_log.check_state_duration = now.timestamp() - pre_date.timestamp()
- db.session.add(action_log)
- db.session.commit()
- @with_app_context
- def insert_service_state(self, agent_monitor, agent_service_state: AgentServiceState, agent_log_enum: AgentLogState,
- task_id=None, service_id=None):
- if agent_monitor.service_state == agent_service_state.code:
- self.logger.info("agent action log insert service state same %s %s", agent_monitor.service_state, agent_service_state.code)
- action_log = AgentActionLog()
- action_log.saas_id = agent_monitor.saas_id
- action_log.agent_num = agent_monitor.agent_num
- action_log.out_id = agent_monitor.out_id
- action_log.action_type = 1
- action_log.service_state = agent_service_state.code
- action_log.pre_service_state = agent_monitor.service_state
- action_log.task_id = task_id
- action_log.service_id = service_id
- if agent_log_enum:
- action_log.event_type = agent_log_enum.code
- action_log.event_desc = agent_log_enum.description
- now = datetime.now()
- pre_date = None
- if agent_monitor.service_state == AgentServiceState.IDLE.code:
- pre_date = agent_monitor.idle_time
- if agent_monitor.service_state == AgentServiceState.BUSY.code:
- pre_date = agent_monitor.busy_time
- if agent_monitor.service_state == AgentServiceState.CALLING.code:
- pre_date = agent_monitor.call_time
- if agent_monitor.service_state == AgentServiceState.LOGOUT.code:
- pre_date = agent_monitor.check_out_time
- if agent_monitor.service_state == AgentServiceState.REPROCESSING.code:
- pre_date = agent_monitor.hang_time
- action_log.service_state_time = now
- if pre_date is None:
- pre_date = agent_monitor.update_time
- action_log.pre_service_state_time = pre_date
- action_log.service_state_duration = now.timestamp() - pre_date.timestamp()
- db.session.add(action_log)
- db.session.commit()
- class AgentStateService:
- def __init__(self, app):
- self.app = app
- self.logger = app.logger
- self.redis_handler = RedisHandler()
- self.assigned_recycle_millisecond = 30 * 1000
- self.state_service_id_data_map = defaultdict(dict)
- self.executor = ThreadPoolExecutor(max_workers=10)
- self.data_handle_server = DataHandleServer(app)
- self.agent_monitor_service = AgentMonitorService(app)
- self.agent_actionlog_service = AgentActionLogService(app)
- def idle(self, saas_id, agent_id, phone_num):
- human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id)
- if human_service is None:
- self.logger.info(f"agent engine idle not have human service {saas_id} {agent_id}") # 使用print替代log
- return
- self.idle_hash(saas_id, agent_id, phone_num, human_service.service_id)
- def busy(self, saas_id, agent_id, phone_num):
- human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id)
- if human_service is None:
- self.logger.info(f"agent engine busy not hava human service {saas_id} {agent_id}") # 使用print替代log
- return
- self.busy_hash(saas_id, agent_id, phone_num, human_service.service_id)
- def idle_by_human(self, saas_id, agent_id, service_id):
- agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id)
- if not agent:
- return
- agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num)
- if not agent_monitor:
- return
- if agent_monitor.check_state == AgentCheck.IN.code:
- self.agent_monitor_service.update_idle(agent_monitor)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.ACTIVE_HUMAN_SERVICE)
- self.idle_hash(saas_id, agent_id, agent.phone_num, service_id)
- def busy_by_human(self, saas_id, service_id, agent_id=None):
- agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id)
- if not agent:
- return
- agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num)
- if not agent_monitor:
- return
- if agent_monitor.check_state == AgentCheck.IN.code:
- self.agent_monitor_service.update_busy(agent_monitor)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY,
- AgentLogState.DEL_HUMAN_SERVICE)
- self.busy_hash(saas_id, agent_id, agent.phone_num, service_id)
- def checkin(self, saas_id, agent_id, phone_num):
- key = self._check_in_key(saas_id)
- state_data = AgentStateData()
- state_data.status = HumanState.DEFAULT.code
- state_data.time = datetime.now().timestamp()
- self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
- self.redis_handler.redis.expire(key, self._get_expire_time())
- def checkout(self, saas_id, agent_id, phone_num):
- key = self._check_in_key(saas_id)
- self.redis_handler.redis.hdel(key, phone_num)
- self.busy(saas_id, agent_id, phone_num)
- def assign_agent(self, saas_id, service_id, called=None, ivr_id=None, task_id=None, cbp=None):
- choose_phone_num = ''
- self.logger.info("assignAgent %s %s %s"% (saas_id, service_id, called))
- idle_agents = self.idle_agents(saas_id, service_id)
- if len(idle_agents) <= 0:
- return choose_phone_num
- choose_phone_num = self._choose_max_idle_time(idle_agents)
- self.handle_assign_time(saas_id, service_id, choose_phone_num)
- return choose_phone_num
- def handle_assign_time(self, saas_id, service_id, choose_phone_num):
- key = self._key(saas_id, service_id)
- cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
- if cache_agent_map and choose_phone_num in cache_agent_map:
- state_data = cache_agent_map[choose_phone_num]
- state_data.assign_time = datetime.now().timestamp() * 1000
- self.redis_handler.redis.hset(key, choose_phone_num, state_data.to_json_string())
- self.redis_handler.redis.expire(key, self._get_expire_time())
- # self.update_report_state(saas_id, service_id)
- def idle_agents(self, saas_id, service_id):
- cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
- if not cache_agent_list:
- return []
- agent_str = '\n'.join(["%s-%s-%s-%s"%(x.phone_num, x.status, x.time, x.assign_time) for x in cache_agent_list])
- self.logger.info("assignAgent %s %s idleAgents:%s "% (saas_id, service_id, agent_str))
- return self.get_idle_agents(cache_agent_list)
- def idle_hash(self, saas_id, agent_id, phone_num, service_id):
- key = self._key(saas_id, service_id)
- state_data = AgentStateData()
- # self.logger.info("idle_hash, key=%s, saas_id=%s, phone_num=%s", key, saas_id, phone_num)
- cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
- if cache_agent_map and phone_num in cache_agent_map:
- state_data = cache_agent_map[phone_num]
- state_data.status = HumanState.IDLE.code
- state_data.time = datetime.now().timestamp() * 1000
- self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
- self.redis_handler.redis.expire(key, self._get_expire_time())
- # self.update_report_state(saas_id, service_id)
- self.logger.info("idle_hash, key=%s, saas_id=%s, phone_num=%s, state_data=%s"%(key, saas_id, phone_num, state_data))
- def busy_hash(self, saas_id, agent_id, phone_num, service_id):
- cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
- state_data = cache_agent_map.get(phone_num)
- key = self._key(saas_id, service_id)
- if state_data is None:
- return
- state_data.status = HumanState.BUSY.code
- self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
- self.redis_handler.redis.expire(key, self._get_expire_time())
- # self.update_report_state(saas_id, service_id)
- def get_cache_agent_map(self, saas_id, service_id):
- cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
- # 检查列表是否为空,如果为空返回空字典
- if not cache_agent_list:
- return {}
- # 使用字典推导式将 cache_agent_list 转换为字典
- return {agent.phone_num: agent for agent in cache_agent_list}
- def get_cache_agent_list(self, saas_id, service_id):
- redis_key = self._key(saas_id, service_id)
- map_cache_by_key = self.redis_handler.redis.hgetall(redis_key)
- self.logger.info("get_cache_agent_list, redis_key=%s, map_cache_by_key=%s"%(redis_key, map_cache_by_key))
- if not map_cache_by_key: # 检查字典是否为空
- return [] # 返回空列表
- free_agents = []
- for phone_num, json_value in map_cache_by_key.items():
- agent_status_data_dict = json.loads(json_value)
- agent_status_data = AgentStateData(**agent_status_data_dict) # 解析 JSON 为 AgentStateData 对象
- agent_status_data.phone_num = phone_num.decode('utf-8') # 设置电话号码
- free_agents.append(agent_status_data)
- return free_agents
- def get_idle_agents(self,cache_agent_list):
- current_time =int(datetime.now().timestamp() * 1000) # 获取当前时间的毫秒级时间戳
- idle_agents = [
- agent for agent in cache_agent_list
- if agent.status == 1 and (
- agent.assign_time == 0 or
- agent.assign_time + self.assigned_recycle_millisecond < current_time
- )
- ]
- return idle_agents
- def get_agent_service_idle_size(self, saas_id, service_id):
- idle_agents_size = 0
- cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
- if cache_agent_list: # 检查列表是否非空
- idle_agents = self.get_idle_agents(cache_agent_list)
- idle_agents_size = len(idle_agents) # 获取空闲代理的数量
- return idle_agents_size
- def get_agent_service_busy_size(self, saas_id, service_id):
- busy_agents_size = 0
- cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
- if cache_agent_list: # 检查列表是否非空
- idle_agents = self.get_idle_agents(cache_agent_list)
- busy_agents = [agent for agent in cache_agent_list if agent not in idle_agents] # 计算忙碌代理
- busy_agents_size = len(busy_agents) # 获取忙碌代理的数量
- return busy_agents_size
- def update_report_state(self, saas_id, service_id):
- key = self._key(saas_id, service_id)
- # data_map 这个地方有疑问
- data_map = self.state_service_id_data_map[key]
- idle = HumanState.IDLE
- if idle.value not in data_map:
- data_map[idle.code] = threading.Lock()
- self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, idle)
- # data_map[idle.code] = self.do_report_real_time_human_service_id(saas_id, service_id, idle)
- busy = HumanState.BUSY
- if busy.value not in data_map:
- data_map[busy.code] = threading.Lock()
- self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, busy)
- # data_map[busy.code] = self.do_report_real_time_human_service_id(saas_id, service_id, busy)
- def do_report_real_time_human_service_id(self, saas_id, service_id, human_state):
- name = "cti_center_real_time_human_service_state"
- tag_list = {
- "vcc_id": saas_id,
- "service_id": service_id,
- "state": human_state.code,
- }
- if human_state == HumanState.IDLE:
- # meter_registry 这块疑问
- self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id))
- elif human_state == HumanState.BUSY:
- self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id))
- return 0
- def _check_in_key(self, saas_id):
- return "CTI:%s:HUMAN:AGENT"%(saas_id.upper())
- def _key(self, saas_id, service_id):
- return "CTI:%s:HUMAN:%s"%(saas_id.upper(), service_id)
- def _get_expire_time(self):
- now = datetime.now()
- end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=0)
- expire_time = (end_of_day - now).total_seconds() * 1000 # Convert to milliseconds
- return int(expire_time)
- def _choose_max_idle_time(self, idle_agents: List[AgentStateData]) -> str:
- idle_agents = sorted(idle_agents, key=lambda agent: agent.assign_time, reverse=False)
- return idle_agents[0].phone_num
|