#!/usr/bin/env python3
# encoding:utf-8
"""Call-center agent services.

This module groups the services that manage agents ("seats"):

* ``AgentOperService``      - agent actions (enable/disable, check in/out, busy/idle).
* ``AgentService``          - CRUD and queries on the ``Agent`` table.
* ``AgentMonitorService``   - reads/updates of the ``AgentMonitor`` row of an agent.
* ``AgentActionLogService`` - writes ``AgentActionLog`` rows for state transitions.
* ``AgentStateService``     - per-service agent state kept in Redis hashes.

NOTE(review): timestamps are produced with ``datetime.utcnow()`` (naive) and
converted with ``.timestamp()``, which interprets naive values as local time.
All producers/consumers in this module use the same expression, so values are
mutually consistent, but they are only true epoch values on hosts running in
UTC - confirm before sharing them with other systems.
"""

# NOTE: the relative order of the original imports is preserved on purpose:
# ``from src.core.callcenter.dao import *`` can shadow or be shadowed by its
# neighbours, so reordering could change which object a name resolves to.
import traceback
from typing import List
from collections import defaultdict
from src.core.callcenter.constant import START_AGENT_NUM
from src.core.callcenter.enumeration import AgentState, AgentCheck, AgentHeartState, AgentServiceState, AgentLogState, \
    AgentScene, BizErrorCode, WorkStatus, DownEvent, HumanState
from sqlalchemy import or_
from src.core.callcenter.dao import *
from src.core.callcenter.exception import BizException
from src.core.callcenter.api import AgentActionRequest, AgentInfo, AgentQueryRequest, AgentRequest, AgentEventData, \
    AgentStateData, HumanServiceQueryRequest, AgentMonitorData
from src.core.callcenter.push import PushHandler
from src.core.datasource import RedisHandler
from concurrent.futures import ThreadPoolExecutor
import threading
# Previously these two names were only available through the star import above.
import json
import logging
from datetime import datetime

_module_logger = logging.getLogger(__name__)


class AgentOperService:
    """Agent actions: enable/disable, check in/out, busy/idle, call pickup."""

    def __init__(self, client, logger):
        self.inbound_client = client
        self.logger = logger
        self.push_handler = PushHandler(logger)
        self.agent_monitor_service = AgentMonitorService(client, logger)
        self.agent_actionlog_service = AgentActionLogService(client, logger)
        self.agent_state_service = AgentStateService(client, logger)

    def enable(self, req: AgentActionRequest):
        """Mark the agent as enabled; no-op when it already is.

        Raises:
            BizException: ``RECORD_NOT_EXIST_ERROR`` when no agent matches.
        """
        # Keyword arguments: passing these positionally would bind
        # ``agent_number`` to ``agent_id`` and ``out_id`` to ``agent_number``.
        agent = _get_agent(req.saas_id, agent_number=req.agent_number, out_id=req.out_id)
        if not agent:
            raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
        if agent.agent_state == AgentState.ENABLE.code:
            return
        agent.agent_state = AgentState.ENABLE.code
        db.session.commit()

    def disable(self, req: AgentActionRequest):
        """Disable the agent, unbind its phone and soft-delete that phone.

        Raises:
            BizException: ``RECORD_NOT_EXIST_ERROR`` when no agent matches;
                ``AGENT_CALLING_NOT_ALLOW_OPERATE`` when the agent is checked
                in and currently on a call.
        """
        agent = _get_agent(req.saas_id, req.agent_id)
        if not agent:
            raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
        if agent.agent_state == AgentState.DISABLE.code:
            return

        agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_num)
        if agent_monitor and agent_monitor.check_state == AgentCheck.IN.code and \
                agent_monitor.service_state == AgentServiceState.CALLING.code:
            raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)

        # Resolve the phone BEFORE clearing ``agent.phone_num``; looking it up
        # afterwards would search for the empty string.
        phone = _get_phone(req.saas_id, agent.phone_num)
        agent.phone_num = ''
        agent.agent_state = AgentState.DISABLE.code
        if phone:
            phone.is_delete = 1
        db.session.commit()

    def checkin(self, req: AgentActionRequest):
        """Check the agent in and return the check-in event payload.

        For the manual scene the agent is additionally switched to idle.

        Raises:
            BizException: ``ERROR_NOT_FOLLOW_CHECK_IN`` when the agent is
                missing or disabled; ``RECORD_NOT_EXIST_ERROR`` when its phone
                or monitor row is missing.
        """
        try:
            agent = _get_agent(req.saas_id, req.agent_id)
            if not agent or agent.agent_state == AgentState.DISABLE.code:
                raise BizException(BizErrorCode.ERROR_NOT_FOLLOW_CHECK_IN)
            phone = _get_phone(req.saas_id, agent.phone_num)
            agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_num)
            # Fail before any state is changed instead of crashing half-way.
            if not phone or not agent_monitor:
                raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)

            agent_monitor.check_scene = req.scene
            # Log first: the log reads the *previous* state/time from the
            # monitor, which ``update_checkin`` overwrites.
            self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.IN, AgentLogState.CHECKIN)
            self.agent_monitor_service.update_checkin(agent_monitor)
            self.agent_state_service.checkin(agent.saas_id, agent.out_id, agent.phone_num)

            if req.scene == AgentScene.MANUAL.code:
                # Manual outbound calling: also switch the agent to idle.
                self._handle_idle(req.scene, agent)
            return self._push_event_for_checkin(agent, agent_monitor, phone, req.scene)
        except Exception:
            self.logger.error(traceback.format_exc())
            raise

    def checkout(self, req: AgentActionRequest):
        """Check the agent out and return the check-out event payload.

        Raises:
            BizException: ``AGENT_DISABLE_NOT_ALLOW_OPERATE`` when the agent is
                missing or disabled; ``AGENT_CALLING_NOT_ALLOW_OPERATE`` when
                the monitor row is missing or the agent is on a call.
        """
        agent = _get_agent(req.saas_id, req.agent_id)
        if not agent or agent.agent_state == AgentState.DISABLE.code:
            raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)

        agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_num)
        if not agent_monitor or agent_monitor.service_state == AgentServiceState.CALLING.code:
            raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)

        if agent_monitor.check_state != AgentCheck.OUT.code:
            # Log before the monitor is mutated so the previous state is kept.
            self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.OUT, AgentLogState.CHECKOUT)
            self.agent_monitor_service.update_checkout(agent_monitor)
            self.agent_state_service.checkout(agent.saas_id, agent.out_id, agent.phone_num)
        return self._push_event_for_checkout(agent, req.scene)

    def busy(self, req: AgentActionRequest):
        """Switch the agent to busy and push the busy event.

        Raises:
            BizException: when the agent is missing/disabled, checked out (or
                has no monitor row), or currently on a call.
        """
        agent = _get_agent(req.saas_id, req.agent_id)
        if not agent or agent.agent_state == AgentState.DISABLE.code:
            raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)

        agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_num)
        if not agent_monitor or agent_monitor.check_state == AgentCheck.OUT.code:
            raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
        if agent_monitor.service_state == AgentServiceState.CALLING.code:
            raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)

        self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
        if agent_monitor.service_state != AgentServiceState.BUSY.code:
            self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY,
                                                              AgentLogState.BUSY)
            self.agent_monitor_service.update_busy(agent_monitor)
        self._push_event_for_busy(agent, req.scene)

    def idle(self, req: AgentActionRequest):
        """Switch the agent to idle.

        Raises:
            BizException: ``AGENT_DISABLE_NOT_ALLOW_OPERATE`` when the agent is
                missing or disabled; see ``_handle_idle`` for the rest.
        """
        agent = _get_agent(req.saas_id, req.agent_id)
        if not agent or agent.agent_state == AgentState.DISABLE.code:
            raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
        self._handle_idle(req.scene, agent)

    def assign(self, req: AgentActionRequest):
        """Pick an idle agent phone number for ``req.service_id`` ('' if none)."""
        return self.agent_state_service.assign_agent(req.saas_id, req.service_id)

    def idle_agent_exist(self, request: AgentActionRequest):
        """Not implemented; always returns ``None``."""
        pass

    def agent_state(self, req: AgentActionRequest):
        """Return the ``service_state`` stored on the agent's monitor row.

        NOTE(review): ``req.agent_id`` is used directly as the monitor's
        ``agent_num``; everywhere else the agent is resolved first. Confirm
        that callers really pass an agent number here.

        Raises:
            BizException: ``RECORD_NOT_EXIST_ERROR`` when no monitor row exists.
        """
        agent_monitor = _get_agent_monitor(req.saas_id, req.agent_id)
        if not agent_monitor:
            raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
        return agent_monitor.service_state

    def turn_on(self, req: AgentActionRequest):
        """Mark the agent as being on a call and report it as busy.

        Raises:
            BizException: ``RECORD_NOT_EXIST_ERROR`` when the agent or its
                monitor row is missing.
        """
        agent = _get_agent(req.saas_id, req.agent_id)
        if not agent:
            raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
        agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_num)
        agent_scene = AgentScene.get_by_code(req.scene)
        if not agent_monitor:
            raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)

        if agent_monitor.service_state != AgentServiceState.CALLING.code:
            self.agent_monitor_service.update_calling(agent_monitor)
            self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
        self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)

    def _handle_idle(self, scene, agent):
        """Switch ``agent`` to idle in the DB and update its Redis state.

        Only the robot scene marks the agent idle in Redis; any other scene
        marks it busy there.

        Raises:
            BizException: ``AGENT_CHECK_OUT_NOT_ALLOW_OPERATE`` when the agent
                is checked out or has no monitor row;
                ``AGENT_CALLING_NOT_HANG`` when it is calling or dialing.
        """
        agent_monitor = _get_agent_monitor(agent.saas_id, agent.agent_num)
        if not agent_monitor or agent_monitor.check_state == AgentCheck.OUT.code:
            raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
        if agent_monitor.service_state in (AgentServiceState.CALLING.code, AgentServiceState.DIALING.code):
            raise BizException(BizErrorCode.AGENT_CALLING_NOT_HANG)

        if scene == AgentScene.ROBOT.code:
            self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
        else:
            self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)

        if agent_monitor.service_state != AgentServiceState.IDLE.code:
            # Log before the monitor is mutated so the previous state is kept.
            self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE,
                                                              AgentLogState.IDLE)
            self.agent_monitor_service.update_idle(agent_monitor)
        self._push_event_for_idle(agent, scene)

    def _push_event_for_checkin(self, agent, agent_monitor, phone, scene):
        """Push the agent check-in event and build the event payload dict."""
        agent_scene = AgentScene.get_by_code(scene)
        self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene,
                                                    WorkStatus.LOGIN_SUCCESS)
        event_data = AgentEventData(agent.saas_id, agent.out_id)
        event_data.data = {
            'eventName': DownEvent.ON_INITAL_SUCCESS.code,
            'ext': {
                'saasId': agent.saas_id,
                'agentId': agent.out_id,
                'phoneNum': phone.phone_num,
                'phonePwd': phone.phone_pwd,
                'sipServer': phone.sip_server,
            },
        }
        return event_data.__dict__

    def _push_event_for_checkout(self, agent, scene):
        """Push the agent check-out event and build the event payload dict."""
        agent_scene = AgentScene.get_by_code(scene)
        self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene,
                                                    WorkStatus.NO_INIT, '签出成功')
        event_data = AgentEventData(agent.saas_id, agent.out_id)
        event_data.data = {
            'eventName': DownEvent.ON_INITAL_FAILURE.code,
            'ext': {
                'saasId': agent.saas_id,
                'agentId': agent.out_id,
            },
        }
        return event_data.__dict__

    def _push_event_for_busy(self, agent, scene):
        """Push the agent busy event."""
        agent_scene = AgentScene.get_by_code(scene)
        self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene,
                                                    WorkStatus.AGENT_BUSY)

    def _push_event_for_idle(self, agent, scene):
        """Push the agent idle (ready) event."""
        agent_scene = AgentScene.get_by_code(scene)
        self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene,
                                                    WorkStatus.AGENT_READY)


class AgentService:
    """CRUD and query operations on agents."""

    def __init__(self, client, logger):
        self.inbound_client = client
        self.logger = logger
        self.agent_monitor_service = AgentMonitorService(client, logger)

    def get_and_check(self, req: AgentActionRequest):
        """Return the agent's phone as a dict, or ``{}`` if agent/phone is missing."""
        agent = _get_agent(req.saas_id, req.agent_id)
        if not agent:
            return {}
        phone = _get_phone(req.saas_id, agent.phone_num)
        return phone.to_dict() if phone else {}

    def watch_agent_state(self, req: HumanServiceQueryRequest):
        """Return monitor snapshots of all agents mapped to ``req.serviceId``."""
        mappings = HumanServiceMap.query.filter(HumanServiceMap.is_delete == 0,
                                                HumanServiceMap.saas_id == req.saas_id,
                                                HumanServiceMap.service_id == req.serviceId).all()
        agent_ids = [mapping.agent_id for mapping in mappings]
        return self.agent_monitor_service.detail_monitor_out_ids(req.saas_id, agent_ids)

    def add(self, req: AgentRequest):
        """Create an agent plus its monitor row; return the new agent number."""
        new_agent_num = _get_newest_agent_number(req.saas_id)
        agent = Agent(saas_id=req.saas_id, agent_num=new_agent_num, agent_name=req.agent_name,
                      out_id=req.out_id, agent_pwd=req.agent_password, agent_type=req.agent_type,
                      phone_num=req.phone_number, distribute=req.distribute, agent_state=req.agent_state)
        db.session.add(agent)
        db.session.commit()

        agent_monitor = AgentMonitor(saas_id=req.saas_id, agent_num=new_agent_num, out_id=req.out_id)
        db.session.add(agent_monitor)
        db.session.commit()
        return new_agent_num

    def update(self, req: AgentRequest):
        """Update an agent; when it becomes disabled, unbind and delete its phone."""
        agent = _get_agent(req.saas_id, agent_number=req.agent_number, out_id=req.out_id)
        if not agent:
            return

        previous_phone_num = agent.phone_num
        state_change = agent.agent_state != req.agent_state
        # Accept both the enum member and its code: stored states are codes
        # (see the ``AgentState.DISABLE.code`` comparisons elsewhere).
        disable = req.agent_state in (AgentState.DISABLE, AgentState.DISABLE.code)

        agent.agent_name = req.agent_name
        agent.agent_pwd = req.agent_password
        agent.agent_type = req.agent_type
        agent.phone_num = req.phone_number if not disable else ''
        agent.distribute = req.distribute
        agent.agent_state = req.agent_state
        db.session.commit()

        if state_change and disable:
            phone = Phone.query.filter(Phone.saas_id == req.saas_id,
                                       Phone.phone_num == previous_phone_num).first()
            if phone:
                db.session.delete(phone)
                # Without this commit the delete is never persisted.
                db.session.commit()

    def detail(self, saas_id, agent_number):
        """Return the agent with ``agent_number`` (or ``None``)."""
        return _get_agent(saas_id, agent_number=agent_number, out_id='')

    def _filtered_query(self, req: AgentQueryRequest):
        """Build the agent query shared by ``count`` and ``query_page``.

        NOTE(review): criteria are OR-ed and unset (``None``) criteria are not
        skipped - confirm this is the intended search semantics.
        """
        return Agent.query.filter(
            Agent.saas_id == req.saas_id,
            or_(Agent.agent_num == req.agent_number,
                Agent.agent_name.contains(req.agent_name),
                Agent.out_id == req.out_id,
                Agent.agent_type == req.agent_type),
        )

    def count(self, req: AgentQueryRequest):
        """Return the number of agents matching ``req``."""
        return self._filtered_query(req).count()

    def query_page(self, req: AgentQueryRequest):
        """Return the pagination object for the agents matching ``req``.

        The returned object exposes ``page``, ``pages``, ``has_prev``,
        ``prev_num``, ``has_next``, ``next_num`` and ``items``.
        """
        # Keyword arguments work on Flask-SQLAlchemy 2.x and 3.x alike.
        return self._filtered_query(req).paginate(page=req.page, per_page=req.size)

    def delete(self, saas_id, agent_number):
        """Soft-delete the agent, its monitor row and its phone (when present)."""
        agent = _get_agent(saas_id, agent_number=agent_number, out_id='')
        if not agent:
            return
        agent.is_delete = 1
        agent_monitor = _get_agent_monitor(saas_id, agent_number)
        if agent_monitor:
            agent_monitor.is_delete = 1
        phone = _get_phone(saas_id, agent.phone_num)
        if phone:
            phone.is_delete = 1
        db.session.commit()


class AgentMonitorService:
    """Reads and updates of ``AgentMonitor`` rows."""

    def __init__(self, client, logger):
        self.inbound_client = client
        self.logger = logger

    def detail_monitor_out_ids(self, saas_id, out_ids, check_scene=None):
        """Return monitor snapshots (dicts) for the agents with the given out ids.

        Args:
            saas_id: tenant id.
            out_ids: external agent ids; an empty value yields ``[]``.
            check_scene: unused, kept for interface compatibility.

        Raises:
            BizException: ``RECORD_NOT_EXIST_ERROR`` when no agent matches.
        """
        if not out_ids:
            return []
        agents = Agent.query.filter(Agent.is_delete == 0, Agent.saas_id == saas_id,
                                    Agent.out_id.in_(out_ids)).all()
        if not agents:
            raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)

        agents_by_num = {agent.agent_num: agent for agent in agents}
        # Restrict to this tenant: agent numbers are only unique per saas_id.
        agent_monitors = AgentMonitor.query.filter(AgentMonitor.is_delete == 0,
                                                   AgentMonitor.saas_id == saas_id,
                                                   AgentMonitor.agent_num.in_(list(agents_by_num))).all()
        day_start_ms = self._get_day_start()

        def to_timestamp(value):
            return value.timestamp() if value else None

        res = []
        for monitor in agent_monitors:
            agent = agents_by_num.get(monitor.agent_num)
            data = AgentMonitorData()
            data.saas_id = saas_id
            data.agent_num = monitor.agent_num
            data.agent_name = agent.agent_name
            data.out_id = monitor.out_id
            data.check_state = monitor.check_state
            data.check_scene = monitor.check_scene
            data.online_state = 1 if monitor.check_state == AgentCheck.IN.code else 0
            if monitor.check_in_time:
                data.check_in_time = monitor.check_in_time.timestamp()
                # ``_get_day_start`` is in milliseconds, ``timestamp()`` in
                # seconds; compare in the same unit. State 2 = checked in
                # earlier today but currently checked out.
                if data.check_in_time * 1000 > day_start_ms and \
                        monitor.check_state == AgentCheck.OUT.code:
                    data.online_state = 2
            data.check_out_time = to_timestamp(monitor.check_out_time)
            data.service_state = monitor.service_state
            data.busy_time = to_timestamp(monitor.busy_time)
            data.idle_time = to_timestamp(monitor.idle_time)
            data.call_time = to_timestamp(monitor.call_time)
            data.hang_time = to_timestamp(monitor.hang_time)
            data.heart_state = monitor.heart_state
            data.heart_time = to_timestamp(monitor.heart_time)
            data.update_time = to_timestamp(monitor.update_time)
            res.append(data.__dict__)
        return res

    def update_checkin(self, agent_monitor):
        """Mark the monitor as checked in with a normal heartbeat; commits."""
        now = datetime.utcnow()
        agent_monitor.check_state = AgentCheck.IN.code
        agent_monitor.check_in_time = now
        agent_monitor.heart_state = AgentHeartState.NORMAL.code
        agent_monitor.heart_time = now
        db.session.commit()

    def update_checkout(self, agent_monitor):
        """Mark the monitor as checked out / logged out; commits."""
        now = datetime.utcnow()
        agent_monitor.check_state = AgentCheck.OUT.code
        agent_monitor.check_out_time = now
        agent_monitor.service_state = AgentServiceState.LOGOUT.code
        agent_monitor.heart_state = AgentHeartState.DEFAULT.code
        agent_monitor.heart_time = now
        self.logger.info("update_checkout %s", agent_monitor.check_out_time)
        db.session.commit()

    def update_idle(self, agent_monitor):
        """Set service state IDLE and stamp ``idle_time``; commits."""
        agent_monitor.service_state = AgentServiceState.IDLE.code
        agent_monitor.idle_time = datetime.utcnow()
        db.session.commit()

    def update_busy(self, agent_monitor):
        """Set service state BUSY and stamp ``busy_time``; commits."""
        agent_monitor.service_state = AgentServiceState.BUSY.code
        agent_monitor.busy_time = datetime.utcnow()
        db.session.commit()

    def update_dialing(self, agent_monitor):
        """Set service state DIALING; commits."""
        agent_monitor.service_state = AgentServiceState.DIALING.code
        db.session.commit()

    def update_calling(self, agent_monitor):
        """Set service state CALLING and stamp ``call_time``; commits."""
        agent_monitor.service_state = AgentServiceState.CALLING.code
        agent_monitor.call_time = datetime.utcnow()
        db.session.commit()

    def update_processing(self, agent_monitor):
        """Set service state REPROCESSING and stamp ``hang_time``; commits."""
        agent_monitor.service_state = AgentServiceState.REPROCESSING.code
        agent_monitor.hang_time = datetime.utcnow()
        db.session.commit()

    def update_session_id(self, agent_monitor, session_id):
        """Store ``session_id`` on the monitor; commits."""
        agent_monitor.session_id = session_id
        db.session.commit()

    def update_heart_error(self, agent_monitor):
        """Mark the heartbeat as abnormal and stamp ``heart_time``; commits."""
        agent_monitor.heart_state = AgentHeartState.ABNORMAL.code
        agent_monitor.heart_time = datetime.utcnow()
        db.session.commit()

    def _get_day_start(self):
        """Return today's local midnight as an epoch timestamp in milliseconds."""
        today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        return int(today_start.timestamp() * 1000)


class AgentActionLogService:
    """Writes ``AgentActionLog`` rows for check and service state transitions.

    Both insert methods read the *previous* state and its start time from the
    monitor, so they must be called BEFORE the monitor row is updated.
    """

    def __init__(self, client, logger):
        self.inbound_client = client
        self.logger = logger

    @staticmethod
    def _elapsed_seconds(now, pre_date):
        """Seconds between ``pre_date`` and ``now``; 0 when ``pre_date`` is unknown."""
        if pre_date is None:
            return 0
        return now.timestamp() - pre_date.timestamp()

    def insert_check_state(self, agent_monitor, agent_check_enum: AgentCheck, agent_log_enum: AgentLogState):
        """Log a check-in/check-out transition (``action_type`` 0); commits.

        Args:
            agent_monitor: monitor row still holding the previous check state.
            agent_check_enum: the new check state.
            agent_log_enum: optional event type/description for the log row.
        """
        action_log = AgentActionLog()
        action_log.saas_id = agent_monitor.saas_id
        action_log.agent_num = agent_monitor.agent_num
        action_log.out_id = agent_monitor.out_id
        action_log.action_type = 0
        action_log.check_state = agent_check_enum.code
        action_log.pre_check_state = agent_monitor.check_state
        if agent_log_enum:
            action_log.event_type = agent_log_enum.code
            action_log.event_desc = agent_log_enum.description

        now = datetime.utcnow()
        # Start time of the previous state; fall back to the last update time.
        pre_date = None
        if agent_monitor.check_state == AgentCheck.IN.code:
            pre_date = agent_monitor.check_in_time
        if agent_monitor.check_state == AgentCheck.OUT.code:
            pre_date = agent_monitor.check_out_time
        if pre_date is None:
            pre_date = agent_monitor.update_time

        action_log.check_state_time = now
        action_log.pre_check_state_time = pre_date
        action_log.check_state_duration = self._elapsed_seconds(now, pre_date)
        db.session.add(action_log)
        db.session.commit()

    def insert_service_state(self, agent_monitor, agent_service_state: AgentServiceState,
                             agent_log_enum: AgentLogState, task_id=None, service_id=None):
        """Log a service state transition (``action_type`` 1); commits.

        A transition to the state the monitor already has is reported via the
        logger but is still recorded.

        Args:
            agent_monitor: monitor row still holding the previous service state.
            agent_service_state: the new service state.
            agent_log_enum: optional event type/description for the log row.
            task_id: optional task id stored on the log row.
            service_id: optional service id stored on the log row.
        """
        previous_state = agent_monitor.service_state
        if previous_state == agent_service_state.code:
            self.logger.info("agent action log insert service state same %s %s",
                             previous_state, agent_service_state.code)

        action_log = AgentActionLog()
        action_log.saas_id = agent_monitor.saas_id
        action_log.agent_num = agent_monitor.agent_num
        action_log.out_id = agent_monitor.out_id
        action_log.action_type = 1
        action_log.service_state = agent_service_state.code
        action_log.pre_service_state = previous_state
        action_log.task_id = task_id
        action_log.service_id = service_id
        if agent_log_enum:
            action_log.event_type = agent_log_enum.code
            action_log.event_desc = agent_log_enum.description

        now = datetime.utcnow()
        # Start time of the previous state; fall back to the last update time.
        pre_date = None
        if previous_state == AgentServiceState.IDLE.code:
            pre_date = agent_monitor.idle_time
        if previous_state == AgentServiceState.BUSY.code:
            pre_date = agent_monitor.busy_time
        if previous_state == AgentServiceState.CALLING.code:
            pre_date = agent_monitor.call_time
        if previous_state == AgentServiceState.LOGOUT.code:
            pre_date = agent_monitor.check_out_time
        if previous_state == AgentServiceState.REPROCESSING.code:
            pre_date = agent_monitor.hang_time
        if pre_date is None:
            pre_date = agent_monitor.update_time

        action_log.service_state_time = now
        action_log.pre_service_state_time = pre_date
        action_log.service_state_duration = self._elapsed_seconds(now, pre_date)
        db.session.add(action_log)
        db.session.commit()


class AgentStateService:
    """Agent availability state kept in Redis hashes.

    Two hash families are used: one per tenant for checked-in phones
    (``_check_in_key``) and one per tenant+service for idle/busy state
    (``_key``). Hash fields are phone numbers, values are JSON-serialized
    ``AgentStateData``.
    """

    def __init__(self, client, logger):
        self.inbound_client = client
        self.logger = logger
        self.redis_handler = RedisHandler()
        # An assigned agent becomes assignable again after this many ms.
        self.assigned_recycle_millisecond = 60000
        self.state_service_id_data_map = defaultdict(dict)
        self.executor = ThreadPoolExecutor(max_workers=10)
        self.agent_monitor_service = AgentMonitorService(client, logger)
        self.agent_actionlog_service = AgentActionLogService(client, logger)

    def idle(self, saas_id, agent_id, phone_num):
        """Mark the phone idle in the hash of the agent's human service (if any)."""
        human_service = _get_human_service_service(saas_id, agent_id)
        if human_service is None:
            self.logger.info("agent engine idle not have human service %s %s", saas_id, agent_id)
            return
        self.idle_hash(saas_id, agent_id, phone_num, human_service.service_id)

    def busy(self, saas_id, agent_id, phone_num):
        """Mark the phone busy in the hash of the agent's human service (if any)."""
        human_service = _get_human_service_service(saas_id, agent_id)
        if human_service is None:
            self.logger.info("agent engine busy not hava human service %s %s", saas_id, agent_id)
            return
        self.busy_hash(saas_id, agent_id, phone_num, human_service.service_id)

    def idle_by_human(self, saas_id, agent_id, service_id):
        """Set a checked-in agent idle for ``service_id`` (DB, action log, Redis)."""
        agent = _get_agent(saas_id, out_id=agent_id)
        if not agent:
            return
        agent_monitor = _get_agent_monitor(saas_id, agent_number=agent.agent_num)
        if not agent_monitor:
            return
        if agent_monitor.check_state == AgentCheck.IN.code:
            # Log before the monitor is mutated so the previous state is kept.
            self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE,
                                                              AgentLogState.ACTIVE_HUMAN_SERVICE)
            self.agent_monitor_service.update_idle(agent_monitor)
            self.idle_hash(saas_id, agent_id, agent.phone_num, service_id)

    def busy_by_human(self, saas_id, service_id, agent_id=None):
        """Set a checked-in agent busy for ``service_id`` (DB, action log, Redis)."""
        agent = _get_agent(saas_id, out_id=agent_id)
        if not agent:
            return
        agent_monitor = _get_agent_monitor(saas_id, agent_number=agent.agent_num)
        if not agent_monitor:
            return
        if agent_monitor.check_state == AgentCheck.IN.code:
            # Log before the monitor is mutated so the previous state is kept.
            self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY,
                                                              AgentLogState.DEL_HUMAN_SERVICE)
            self.agent_monitor_service.update_busy(agent_monitor)
            self.busy_hash(saas_id, agent_id, agent.phone_num, service_id)

    def checkin(self, saas_id, agent_id, phone_num):
        """Register the phone in the tenant's check-in hash with default status.

        NOTE(review): ``time`` is stored in seconds here while ``idle_hash``
        stores milliseconds - confirm which unit readers expect.
        """
        key = self._check_in_key(saas_id)
        state_data = AgentStateData()
        state_data.status = HumanState.DEFAULT.code
        state_data.time = datetime.utcnow().timestamp()
        self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
        self._refresh_expire(key)

    def checkout(self, saas_id, agent_id, phone_num):
        """Remove the phone from the tenant's check-in hash."""
        key = self._check_in_key(saas_id)
        self.redis_handler.redis.hdel(key, phone_num)

    def assign_agent(self, saas_id, service_id, called=None, ivr_id=None, task_id=None, cbp=None):
        """Pick an assignable agent for the service and stamp its assign time.

        Returns:
            The chosen phone number, or ``''`` when no agent is available.
        """
        self.logger.info("assignAgent %s %s %s", saas_id, service_id, called)
        idle_agents = self.idle_agents(saas_id, service_id)
        if not idle_agents:
            return ''
        choose_phone_num = self._choose_max_idle_time(idle_agents)
        self.handle_assign_time(saas_id, service_id, choose_phone_num)
        return choose_phone_num

    def handle_assign_time(self, saas_id, service_id, choose_phone_num):
        """Stamp ``assign_time`` (ms) on the cached state of the chosen phone."""
        key = self._key(saas_id, service_id)
        cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
        if cache_agent_map and choose_phone_num in cache_agent_map:
            state_data = cache_agent_map[choose_phone_num]
            state_data.assign_time = datetime.utcnow().timestamp() * 1000
            self.redis_handler.redis.hset(key, choose_phone_num, state_data.to_json_string())
            self._refresh_expire(key)

    def idle_agents(self, saas_id, service_id):
        """Return the cached agents of the service that are currently assignable."""
        cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
        if not cache_agent_list:
            return []
        agent_str = '\n'.join("%s-%s-%s-%s" % (x.phone_num, x.status, x.time, x.assign_time)
                              for x in cache_agent_list)
        self.logger.info("assignAgent %s %s idleAgents:%s ", saas_id, service_id, agent_str)
        return self.get_idle_agents(cache_agent_list)

    def idle_hash(self, saas_id, agent_id, phone_num, service_id):
        """Write IDLE status and the current time (ms) for the phone; creates the entry if absent."""
        key = self._key(saas_id, service_id)
        self.logger.info("idle_hash, key=%s, saas_id=%s, phone_num=%s", key, saas_id, phone_num)
        cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
        # Reuse the cached entry (keeps e.g. assign_time) or start a new one.
        state_data = cache_agent_map.get(phone_num) or AgentStateData()
        state_data.status = HumanState.IDLE.code
        state_data.time = datetime.utcnow().timestamp() * 1000
        self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
        self._refresh_expire(key)

    def busy_hash(self, saas_id, agent_id, phone_num, service_id):
        """Write BUSY status for the phone; does nothing when it has no cached entry."""
        state_data = self.get_cache_agent_map(saas_id, service_id).get(phone_num)
        if state_data is None:
            return
        key = self._key(saas_id, service_id)
        state_data.status = HumanState.BUSY.code
        self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
        self._refresh_expire(key)

    def get_cache_agent_map(self, saas_id, service_id):
        """Return the service's cached agent states keyed by phone number (``{}`` if none)."""
        return {agent.phone_num: agent for agent in self.get_cache_agent_list(saas_id, service_id)}

    def get_cache_agent_list(self, saas_id, service_id):
        """Load and deserialize all cached agent states of the service from Redis."""
        redis_key = self._key(saas_id, service_id)
        map_cache_by_key = self.redis_handler.redis.hgetall(redis_key)
        self.logger.info("get_cache_agent_list, redis_key=%s, map_cache_by_key=%s", redis_key, map_cache_by_key)
        if not map_cache_by_key:
            return []

        cached_agents = []
        for phone_num, json_value in map_cache_by_key.items():
            agent_status_data = AgentStateData(**json.loads(json_value))
            # Field names are bytes unless the client decodes responses.
            if isinstance(phone_num, bytes):
                phone_num = phone_num.decode('utf-8')
            agent_status_data.phone_num = phone_num
            cached_agents.append(agent_status_data)
        return cached_agents

    def get_idle_agents(self, cache_agent_list):
        """Filter agents with status 1 that were never assigned or whose assignment was recycled.

        NOTE(review): the literal ``1`` presumably equals
        ``HumanState.IDLE.code`` - confirm against the enum.
        """
        current_time = int(datetime.utcnow().timestamp() * 1000)
        recycle_ms = self.assigned_recycle_millisecond

        def assignable(agent):
            assigned_at = agent.assign_time or 0  # treat a missing value as "never assigned"
            return agent.status == 1 and (assigned_at == 0 or assigned_at + recycle_ms < current_time)

        return [agent for agent in cache_agent_list if assignable(agent)]

    def get_agent_service_idle_size(self, saas_id, service_id):
        """Return how many cached agents of the service are assignable."""
        cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
        return len(self.get_idle_agents(cache_agent_list)) if cache_agent_list else 0

    def get_agent_service_busy_size(self, saas_id, service_id):
        """Return how many cached agents of the service are not assignable."""
        cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
        if not cache_agent_list:
            return 0
        return len(cache_agent_list) - len(self.get_idle_agents(cache_agent_list))

    def update_report_state(self, saas_id, service_id):
        """Submit the idle/busy gauge registration for the service once per state."""
        key = self._key(saas_id, service_id)
        data_map = self.state_service_id_data_map[key]
        for human_state in (HumanState.IDLE, HumanState.BUSY):
            # Check and store under the same key (``code``) so a state is
            # registered only once.
            if human_state.code not in data_map:
                data_map[human_state.code] = threading.Lock()
                self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, human_state)

    def do_report_real_time_human_service_id(self, saas_id, service_id, human_state):
        """Register a gauge reporting the live idle or busy agent count.

        NOTE(review): nothing in this class sets ``meter_registry``; when it is
        absent the registration is skipped. The ``gauge`` call shape is kept as
        found - confirm it against the actual registry API.
        """
        meter_registry = getattr(self, 'meter_registry', None)
        if meter_registry is None:
            self.logger.info("meter_registry not configured, skip report %s %s", saas_id, service_id)
            return 0

        name = "cti_center_real_time_human_service_state"
        tag_list = {
            "vcc_id": saas_id,
            "service_id": service_id,
            "state": human_state.code,
        }
        if human_state == HumanState.IDLE:
            meter_registry.gauge(name, tag_list, self,
                                 lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id))
        elif human_state == HumanState.BUSY:
            meter_registry.gauge(name, tag_list, self,
                                 lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id))
        return 0

    def _refresh_expire(self, key):
        """Let ``key`` expire at the end of the current day."""
        # ``_get_expire_time`` is in milliseconds, hence PEXPIRE, not EXPIRE.
        self.redis_handler.redis.pexpire(key, self._get_expire_time())

    def _check_in_key(self, saas_id):
        """Redis key of the tenant's check-in hash."""
        return "CTI:%s:HUMAN:AGENT" % (saas_id.upper())

    def _key(self, saas_id, service_id):
        """Redis key of the tenant+service agent state hash."""
        return "CTI:%s:HUMAN:%s" % (saas_id.upper(), service_id)

    def _get_expire_time(self):
        """Milliseconds from now until 23:59:59 local time (at least 1000)."""
        now = datetime.now()
        end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=0)
        expire_time = int((end_of_day - now).total_seconds() * 1000)
        # Within the last second of the day the difference is <= 0, which
        # Redis would treat as "delete now"; keep a small positive TTL.
        return max(expire_time, 1000)

    def _choose_max_idle_time(self, idle_agents: List[AgentStateData]) -> str:
        """Return the phone number of the agent with the smallest ``assign_time``."""
        chosen = min(idle_agents, key=lambda agent: agent.assign_time or 0)
        return chosen.phone_num


def _get_agent(saas_id, agent_id=None, agent_number=None, out_id=None):
    """Return the first agent of the tenant matching any given identifier.

    ``agent_id`` and ``out_id`` are both matched against ``Agent.out_id``;
    ``agent_number`` against ``Agent.agent_num``. Identifiers that are ``None``
    or ``''`` are ignored; with no usable identifier the result is ``None``.
    """
    candidates = (
        (Agent.out_id, agent_id),
        (Agent.out_id, out_id),
        (Agent.agent_num, agent_number),
    )
    # Skipping unset values avoids ``IS NULL`` / empty-string predicates that
    # could match an unrelated agent.
    conditions = [column == value for column, value in candidates if value is not None and value != '']
    if not conditions:
        return None
    return Agent.query.filter(Agent.saas_id == saas_id, or_(*conditions)).first()


def _get_newest_agent_number(saas_id):
    """Return the next agent number of the tenant (``START_AGENT_NUM`` if none exist).

    NOTE(review): relies on ``ORDER BY agent_num DESC``; if the column is
    textual the ordering is lexicographic - confirm the column type.
    """
    latest = Agent.query.filter(Agent.saas_id == saas_id).order_by(Agent.agent_num.desc()).first()
    if latest and latest.agent_num:
        return str(int(latest.agent_num) + 1)
    return START_AGENT_NUM


def _get_agent_monitor(saas_id, agent_number):
    """Return the monitor row of the agent number within the tenant (or ``None``)."""
    monitor = AgentMonitor.query.filter(AgentMonitor.saas_id == saas_id,
                                        AgentMonitor.agent_num == agent_number).first()
    _module_logger.debug('_get_agent_monitor %s %s %s', saas_id, agent_number, monitor)
    return monitor


def _get_phone(saas_id, phone_num):
    """Return the phone row with ``phone_num`` within the tenant (or ``None``)."""
    return Phone.query.filter(Phone.saas_id == saas_id, Phone.phone_num == phone_num).first()


def _get_human_service_service(saas_id, agent_id):
    """Return the first non-deleted human-service mapping of the agent (or ``None``)."""
    return HumanServiceMap.query.filter(HumanServiceMap.is_delete == 0,
                                        HumanServiceMap.saas_id == saas_id,
                                        HumanServiceMap.agent_id == agent_id).first()