123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731 |
- #!/usr/bin/env python3
- # encoding:utf-8
- import traceback
- from typing import List
- from collections import defaultdict
- from src.core.callcenter.constant import START_AGENT_NUM
- from src.core.callcenter.enumeration import AgentState, AgentCheck, AgentHeartState, AgentServiceState, AgentLogState, \
- AgentScene, BizErrorCode, WorkStatus, DownEvent, HumanState
- from sqlalchemy import or_
- from src.core.callcenter.dao import *
- from src.core.callcenter.exception import BizException
- from src.core.callcenter.api import AgentActionRequest, AgentInfo, AgentQueryRequest, AgentRequest, AgentEventData, \
- AgentStateData, HumanServiceQueryRequest, AgentMonitorData
- from src.core.callcenter.push import PushHandler
- from src.core.datasource import RedisHandler
- from concurrent.futures import ThreadPoolExecutor
- import threading
- class AgentOperService:
- def __init__(self, client, logger):
- self.inbound_client = client
- self.logger = logger
- self.push_handler = PushHandler(logger)
- self.agent_monitor_service = AgentMonitorService(client, logger)
- self.agent_actionlog_service = AgentActionLogService(client, logger)
- self.agent_state_service = AgentStateService(client, logger)
- def enable(self, req: AgentActionRequest):
- agent = _get_agent(req.saas_id, req.agent_number, req.out_id)
- if agent.agent_state == AgentState.ENABLE.code:
- return
- agent.agent_state = AgentState.ENABLE.code
- db.session.commit()
- def disable(self, req: AgentActionRequest):
- agent = _get_agent(req.saas_id, req.agent_id)
- if agent.agent_state == AgentState.DISABLE.code:
- return
- agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_number)
- if agent_monitor.check_state == AgentCheck.IN.code and \
- agent_monitor.service_state == AgentServiceState.CALLING.code:
- raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
- agent.phone_num = ''
- agent.agent_state = AgentState.DISABLE.code
- db.session.commit()
- phone = _get_phone(req.saas_id, agent.phone_num)
- phone.is_delete = 1
- db.session.commit()
- def checkin(self, req: AgentActionRequest):
- try:
- agent = _get_agent(req.saas_id, req.agent_id)
- if not agent or agent.agent_state == AgentState.DISABLE.code:
- raise BizException(BizErrorCode.ERROR_NOT_FOLLOW_CHECK_IN)
- phone = _get_phone(req.saas_id, agent.phone_num)
- agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_num)
- agent_monitor.check_scene = req.scene
- self.agent_monitor_service.update_checkin(agent_monitor)
- self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.IN, AgentLogState.CHECKIN)
- self.agent_state_service.checkin(agent.saas_id, agent.out_id, agent.phone_num)
- if req.scene == AgentScene.MANUAL.code:
- # 如果是手动外呼增加置闲
- self._handle_idle(req.scene, agent)
- return self._push_event_for_checkin(agent, agent_monitor, phone, req.scene)
- except Exception as e:
- self.logger.error(traceback.format_exc())
- raise e
- def checkout(self, req: AgentActionRequest):
- agent = _get_agent(req.saas_id, req.agent_id)
- if not agent or agent.agent_state == AgentState.DISABLE.code:
- raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
- agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_num)
- if not agent_monitor or agent_monitor.service_state == AgentServiceState.CALLING.code:
- raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
- if agent_monitor.check_state == AgentCheck.OUT.code:
- return self._push_event_for_checkout(agent, req.scene)
- self.agent_monitor_service.update_checkout(agent_monitor)
- self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.OUT, AgentLogState.CHECKOUT)
- self.agent_state_service.checkout(agent.saas_id, agent.out_id, agent.phone_num)
- return self._push_event_for_checkout(agent, req.scene)
- def busy(self, req: AgentActionRequest):
- agent = _get_agent(req.saas_id, req.agent_id)
- if not agent or agent.agent_state == AgentState.DISABLE.code:
- raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
- agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_num)
- if not agent_monitor or agent_monitor.check_state == AgentCheck.OUT.code:
- raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
- if agent_monitor.service_state == AgentServiceState.CALLING.code:
- raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
- self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
- if agent_monitor.service_state == AgentServiceState.BUSY.code:
- self._push_event_for_busy(agent, req.scene)
- return
- self.agent_monitor_service.update_busy(agent_monitor)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY, AgentLogState.BUSY)
- self._push_event_for_busy(agent, req.scene)
- def idle(self, req: AgentActionRequest):
- agent = _get_agent(req.saas_id, req.agent_id)
- if not agent or agent.agent_state == AgentState.DISABLE.code:
- raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
- self._handle_idle(req.scene, agent)
- def assign(self, req: AgentActionRequest):
- return self.agent_state_service.assign_agent(req.saas_id, req.service_id)
- def idle_agent_exist(self, request: AgentActionRequest):
- pass
- def agent_state(self,req: AgentActionRequest):
- # agent = _get_agent(req.saas_id, req.agent_id)
- agent_monitor = _get_agent_monitor(req.saas_id, req.agent_id)
- return agent_monitor.service_state
- def turn_on(self, req: AgentActionRequest):
- agent = _get_agent(req.saas_id, req.agent_id)
- agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_num)
- agent_scene = AgentScene.get_by_code(req.scene)
- if not agent_monitor:
- raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
- if agent_monitor.service_state == AgentServiceState.CALLING.code:
- self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
- return
- self.agent_monitor_service.update_calling(agent_monitor)
- self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
- self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
- def _handle_idle(self, scene, agent):
- agent_monitor = _get_agent_monitor(agent.saas_id, agent.agent_num)
- if agent_monitor.check_state == AgentCheck.OUT.code:
- raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
- if agent_monitor.service_state == AgentServiceState.CALLING.code or agent_monitor.service_state == AgentServiceState.DIALING.code:
- raise BizException(BizErrorCode.AGENT_CALLING_NOT_HANG)
- if scene == AgentScene.ROBOT.code:
- self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
- else:
- self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
- if agent_monitor.service_state == AgentServiceState.IDLE.code:
- self._push_event_for_idle(agent, scene)
- return
- self.agent_monitor_service.update_idle(agent_monitor)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.IDLE)
- self._push_event_for_idle(agent, scene)
- def _push_event_for_checkin(self, agent, agent_monitor, phone, scene):
- """坐席签入事件推送"""
- agent_scene = AgentScene.get_by_code(scene)
- self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.LOGIN_SUCCESS)
- event_data = AgentEventData(agent.saas_id, agent.out_id)
- event_data.data = {'eventName': DownEvent.ON_INITAL_SUCCESS.code,
- 'ext': {'saasId': agent.saas_id,
- 'agentId': agent.out_id,
- 'phoneNum': phone.phone_num,
- 'phonePwd': phone.phone_pwd,
- 'sipServer': phone.sip_server
- }
- }
- return event_data.__dict__
- def _push_event_for_checkout(self,agent,scene):
- """签出事件推送"""
- agent_scene = AgentScene.get_by_code(scene)
- self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.NO_INIT, '签出成功')
- event_data = AgentEventData(agent.saas_id, agent.out_id)
- event_data.data = {'eventName': DownEvent.ON_INITAL_FAILURE.code,
- 'ext': {'saasId': agent.saas_id,
- 'agentId': agent.out_id,
- }
- }
- return event_data.__dict__
- def _push_event_for_busy(self,agent,scene):
- """置忙事件推送"""
- agent_scene = AgentScene.get_by_code(scene)
- self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_BUSY)
- pass
- def _push_event_for_idle(self, agent, scene):
- """坐席置闲事件推送"""
- agent_scene = AgentScene.get_by_code(scene)
- self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_READY)
- class AgentService:
- def __init__(self, client, logger):
- self.inbound_client = client
- self.logger = logger
- self.agent_monitor_service = AgentMonitorService(client, logger)
- def get_and_check(self, req: AgentActionRequest):
- agent = _get_agent(req.saas_id, req.agent_id)
- if not agent:
- return {}
- phone = _get_phone(req.saas_id, agent.phone_num)
- return phone.to_dict()
- def watch_agent_state(self, req: HumanServiceQueryRequest):
- pos = HumanServiceMap.query.filter(HumanServiceMap.is_delete == 0, HumanServiceMap.saas_id == req.saas_id,
- HumanServiceMap.service_id == req.serviceId).all()
- agent_ids = [x.agent_id for x in pos]
- monitors = self.agent_monitor_service.detail_monitor_out_ids(req.saas_id, agent_ids)
- return monitors
- def add(self, req: AgentRequest):
- new_agent_num = _get_newest_agent_number(req.saas_id)
- agent = Agent(saas_id=req.saas_id, agent_num=new_agent_num, agent_name=req.agent_name, out_id=req.out_id,
- agent_pwd=req.agent_password, agent_type=req.agent_type, phone_num=req.phone_number,
- distribute=req.distribute, agent_state=req.agent_state)
- db.session.add(agent)
- db.session.commit()
- agent_monitor = AgentMonitor(saas_id=req.saas_id, agent_num=new_agent_num, out_id=req.out_id)
- db.session.add(agent_monitor)
- db.session.commit()
- return new_agent_num
- def update(self, req: AgentRequest):
- agent = _get_agent(req.saas_id, req.agent_number, req.out_id)
- if not agent:
- return
- phone_num = agent.phone_num
- state_change = agent.agent_state != req.agent_state
- disable = req.agent_state == AgentState.DISABLE
- agent.agent_name = req.agent_name
- agent.agent_pwd = req.agent_password
- agent.agent_type = req.agent_type
- agent.phone_num = req.phone_number if not disable else ''
- agent.distribute = req.distribute
- agent.agent_state = req.agent_state
- db.session.commit()
- if state_change and disable:
- phone = Phone.query.filter(Phone.saas_id == req.saas_id, Phone.phone_num == phone_num).first()
- if phone:
- db.session.delete(phone)
- def detail(self, saas_id, agent_number):
- agent = _get_agent(saas_id, agent_number=agent_number, out_id='')
- return agent
- def count(self, req: AgentQueryRequest):
- cnt = Agent.query.filter(Agent.saas_id == req.saas_id,
- or_(Agent.agent_num == req.agent_number,
- Agent.agent_name.contains(req.agent_name),
- Agent.out_id == req.out_id,
- Agent.agent_type == req.agent_type
- )
- ).count()
- return cnt
- def query_page(self, req: AgentQueryRequest):
- pagination = Agent.query.filter(Agent.saas_id == req.saas_id,
- or_(Agent.agent_num == req.agent_number,
- Agent.agent_name.contains(req.agent_name),
- Agent.out_id == req.out_id,
- Agent.agent_type == req.agent_type
- )
- ).paginate(req.page, req.size)
- # data = {
- # "page": pagination.page, # 当前页码
- # "pages": pagination.pages, # 总页码
- # "has_prev": pagination.has_prev, # 是否有上一页
- # "prev_num": pagination.prev_num, # 上一页页码
- # "has_next": pagination.has_next, # 是否有下一页
- # "next_num": pagination.next_num, # 下一页页码
- # "items": [{
- # "id": item.id,
- # "name": item.name,
- # "age": item.age,
- # "sex": item.sex,
- # "money": item.money,
- # } for item in pagination.items]
- # }
- return pagination
- def delete(self, saas_id, agent_number):
- agent = _get_agent(saas_id, agent_number=agent_number, out_id='')
- if not agent:
- return
- agent.is_delete = 1
- agent_monitor = _get_agent_monitor(saas_id, agent_number)
- agent_monitor.is_delete = 1
- db.session.commit()
- phone = _get_phone(saas_id, agent.phone_num)
- phone.is_delete = 1
- db.session.commit()
- class AgentMonitorService:
- def __init__(self, client, logger):
- self.inbound_client = client
- self.logger = logger
- def detail_monitor_out_ids(self, saas_id, out_ids, check_scene=None):
- if not out_ids:
- return []
- agents = Agent.query.filter(Agent.is_delete == 0, Agent.saas_id == saas_id, Agent.out_id.in_(out_ids)).all()
- if not agents:
- raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
- agent_num_map = {x.agent_num: x for x in agents}
- agent_nums = [x.agent_num for x in agents]
- agent_monitors = AgentMonitor.query.filter(AgentMonitor.is_delete == 0, AgentMonitor.agent_num.in_(agent_nums)).all()
- res = []
- for agent_monitor in agent_monitors:
- agent = agent_num_map.get(agent_monitor.agent_num)
- data = AgentMonitorData()
- data.saas_id = saas_id
- data.agent_num = agent_monitor.agent_num
- data.agent_name = agent.agent_name
- data.out_id = agent_monitor.out_id
- data.check_state = agent_monitor.check_state
- data.check_scene = agent_monitor.check_scene
- data.online_state = 1 if agent_monitor.check_state == AgentCheck.IN.code else 0
- if agent_monitor.check_in_time:
- data.check_in_time = agent_monitor.check_in_time.timestamp()
- day_start = self._get_day_start()
- if agent_monitor.check_in_time.timestamp() > day_start and \
- agent_monitor.check_state == AgentCheck.OUT.code:
- data.online_state = 2
- data.check_out_time = agent_monitor.check_out_time.timestamp() if agent_monitor.check_out_time else None
- data.service_state = agent_monitor.service_state
- data.busy_time = agent_monitor.busy_time.timestamp() if agent_monitor.busy_time else None
- data.idle_time = agent_monitor.idle_time.timestamp() if agent_monitor.idle_time else None
- data.call_time = agent_monitor.call_time.timestamp() if agent_monitor.call_time else None
- data.hang_time = agent_monitor.hang_time.timestamp() if agent_monitor.hang_time else None
- data.heart_state = agent_monitor.heart_state
- data.heart_time = agent_monitor.heart_time.timestamp() if agent_monitor.heart_time else None
- data.update_time = agent_monitor.update_time.timestamp() if agent_monitor.update_time else None
- res.append(data.__dict__)
- return res
- def update_checkin(self, agent_monitor):
- agent_monitor.check_state = AgentCheck.IN.code
- agent_monitor.check_in_time = datetime.utcnow()
- agent_monitor.heart_state = AgentHeartState.NORMAL.code
- agent_monitor.heart_time = datetime.utcnow()
- db.session.commit()
- def update_checkout(self, agent_monitor):
- agent_monitor.check_state = AgentCheck.OUT.code
- agent_monitor.check_out_time = datetime.utcnow()
- agent_monitor.service_state = AgentServiceState.LOGOUT.code
- agent_monitor.heart_state = AgentHeartState.DEFAULT.code
- agent_monitor.heart_time = datetime.utcnow()
- print("update_checkout", agent_monitor.check_out_time)
- db.session.commit()
- def update_idle(self, agent_monitor):
- agent_monitor.service_state = AgentServiceState.IDLE.code
- agent_monitor.idle_time = datetime.utcnow()
- db.session.commit()
- def update_busy(self, agent_monitor):
- agent_monitor.service_state = AgentServiceState.BUSY.code
- agent_monitor.busy_time = datetime.utcnow()
- db.session.commit()
- def update_dialing(self, agent_monitor):
- agent_monitor.service_state = AgentServiceState.DIALING.code
- db.session.commit()
- def update_calling(self, agent_monitor):
- agent_monitor.service_state = AgentServiceState.CALLING.code
- agent_monitor.call_time = datetime.utcnow()
- db.session.commit()
- def update_processing(self, agent_monitor):
- agent_monitor.service_state = AgentServiceState.REPROCESSING.code
- agent_monitor.hang_time = datetime.utcnow()
- db.session.commit()
- def update_session_id(self, agent_monitor, session_id):
- agent_monitor.session_id = session_id
- db.session.commit()
- def update_heart_error(self, agent_monitor):
- agent_monitor.heart_state = AgentHeartState.ABNORMAL.code
- agent_monitor.heart_time = datetime.utcnow()
- db.session.commit()
- def _get_day_start(self):
- today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
- return int(today_start.timestamp() * 1000) # Return milliseconds
- class AgentActionLogService:
- def __init__(self, client, logger):
- self.inbound_client = client
- self.logger = logger
- def insert_check_state(self, agent_monitor, agent_check_enum: AgentCheck, agent_log_enum: AgentLogState):
- action_log = AgentActionLog()
- action_log.saas_id = agent_monitor.saas_id
- action_log.agent_num = agent_monitor.agent_num
- action_log.out_id = agent_monitor.out_id
- action_log.action_type = 0
- action_log.check_state = agent_check_enum.code
- action_log.pre_check_state = agent_monitor.check_state
- if agent_log_enum:
- action_log.event_type = agent_log_enum.code
- action_log.event_desc = agent_log_enum.description
- now = datetime.utcnow()
- pre_date = None
- if agent_monitor.check_state == AgentCheck.IN.code:
- pre_date = agent_monitor.check_in_time
- if agent_monitor.check_state == AgentCheck.OUT.code:
- pre_date = agent_monitor.check_out_time
- if pre_date is None:
- pre_date = agent_monitor.update_time
- action_log.check_state_time = now
- action_log.pre_check_state_time = pre_date
- action_log.check_state_duration = now.timestamp() - pre_date.timestamp()
- db.session.add(action_log)
- db.session.commit()
- def insert_service_state(self, agent_monitor, agent_service_state: AgentServiceState, agent_log_enum: AgentLogState,
- task_id=None, service_id=None):
- if agent_monitor.service_state == agent_service_state.code:
- self.logger.info("agent action log insert service state same %s %s", agent_monitor.service_state, agent_service_state.code)
- action_log = AgentActionLog()
- action_log.saas_id = agent_monitor.saas_id
- action_log.agent_num = agent_monitor.agent_num
- action_log.out_id = agent_monitor.out_id
- action_log.action_type = 1
- action_log.service_state = agent_service_state.code
- action_log.pre_service_state = agent_monitor.service_state
- action_log.task_id = task_id
- action_log.service_id = service_id
- if agent_log_enum:
- action_log.event_type = agent_log_enum.code
- action_log.event_desc = agent_log_enum.description
- now = datetime.utcnow()
- pre_date = None
- if agent_monitor.service_state == AgentServiceState.IDLE.code:
- pre_date = agent_monitor.idle_time
- if agent_monitor.service_state == AgentServiceState.BUSY.code:
- pre_date = agent_monitor.busy_time
- if agent_monitor.service_state == AgentServiceState.CALLING.code:
- pre_date = agent_monitor.call_time
- if agent_monitor.service_state == AgentServiceState.LOGOUT.code:
- pre_date = agent_monitor.check_out_time
- if agent_monitor.service_state == AgentServiceState.REPROCESSING.code:
- pre_date = agent_monitor.hang_time
- action_log.service_state_time = now
- if pre_date is None:
- pre_date = agent_monitor.update_time
- action_log.pre_service_state_time = pre_date
- action_log.service_state_duration = now.timestamp() - pre_date.timestamp()
- db.session.add(action_log)
- db.session.commit()
- class AgentStateService:
- def __init__(self, client, logger):
- self.inbound_client = client
- self.logger = logger
- self.redis_handler = RedisHandler()
- self.assigned_recycle_millisecond = 60000
- self.state_service_id_data_map = defaultdict(dict)
- self.executor = ThreadPoolExecutor(max_workers=10)
- self.agent_monitor_service = AgentMonitorService(client, logger)
- self.agent_actionlog_service = AgentActionLogService(client, logger)
- def idle(self, saas_id, agent_id, phone_num):
- human_service = _get_human_service_service(saas_id, agent_id)
- if human_service is None:
- print(f"agent engine idle not have human service {saas_id} {agent_id}") # 使用print替代log
- return
- self.idle_hash(saas_id, agent_id, phone_num, human_service.service_id)
- def busy(self, saas_id, agent_id, phone_num):
- human_service = _get_human_service_service(saas_id, agent_id)
- if human_service is None:
- print(f"agent engine busy not hava human service {saas_id} {agent_id}") # 使用print替代log
- return
- self.busy_hash(saas_id, agent_id, phone_num, human_service.service_id)
- def idle_by_human(self, saas_id, agent_id, service_id):
- agent = _get_agent(saas_id, out_id=agent_id)
- if not agent:
- return
- agent_monitor = _get_agent_monitor(saas_id, agent_number=agent.agent_num)
- if not agent_monitor:
- return
- if agent_monitor.check_state == AgentCheck.IN.code:
- self.agent_monitor_service.update_idle(agent_monitor)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.ACTIVE_HUMAN_SERVICE)
- self.idle_hash(saas_id, agent_id, agent.phone_num, service_id)
- def busy_by_human(self, saas_id, service_id, agent_id=None):
- agent = _get_agent(saas_id, out_id=agent_id)
- if not agent:
- return
- agent_monitor = _get_agent_monitor(saas_id, agent_number=agent.agent_num)
- if not agent_monitor:
- return
- if agent_monitor.check_state == AgentCheck.IN.code:
- self.agent_monitor_service.update_busy(agent_monitor)
- self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY,
- AgentLogState.DEL_HUMAN_SERVICE)
- self.busy_hash(saas_id, agent_id, agent.phone_num, service_id)
- def checkin(self, saas_id, agent_id, phone_num):
- key = self._check_in_key(saas_id)
- state_data = AgentStateData()
- state_data.status = HumanState.DEFAULT.code
- state_data.time = datetime.utcnow().timestamp()
- self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
- self.redis_handler.redis.expire(key, self._get_expire_time())
- def checkout(self, saas_id, agent_id, phone_num):
- key = self._check_in_key(saas_id)
- self.redis_handler.redis.hdel(key, phone_num)
- def assign_agent(self, saas_id, service_id, called=None, ivr_id=None, task_id=None, cbp=None):
- choose_phone_num = ''
- print("assignAgent %s %s %s"% (saas_id, service_id, called), flush=True)
- idle_agents = self.idle_agents(saas_id, service_id)
- if len(idle_agents) <= 0:
- return choose_phone_num
- choose_phone_num = self._choose_max_idle_time(idle_agents)
- self.handle_assign_time(saas_id, service_id, choose_phone_num)
- return choose_phone_num
- def handle_assign_time(self, saas_id, service_id, choose_phone_num):
- key = self._key(saas_id, service_id)
- cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
- if cache_agent_map and choose_phone_num in cache_agent_map:
- state_data = cache_agent_map[choose_phone_num]
- state_data.assign_time = datetime.utcnow().timestamp() * 1000
- self.redis_handler.redis.hset(key, choose_phone_num, state_data.to_json_string())
- self.redis_handler.redis.expire(key, self._get_expire_time())
- # self.update_report_state(saas_id, service_id)
- def idle_agents(self, saas_id, service_id):
- cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
- if not cache_agent_list:
- return []
- agent_str = '\n'.join(["%s-%s-%s-%s"%(x.phone_num, x.status, x.time, x.assign_time) for x in cache_agent_list])
- print("assignAgent %s %s idleAgents:%s "% (saas_id, service_id, agent_str), flush=True)
- return self.get_idle_agents(cache_agent_list)
- def idle_hash(self, saas_id, agent_id, phone_num, service_id):
- key = self._key(saas_id, service_id)
- state_data = AgentStateData()
- print("idle_hash, key=%s, saas_id=%s, phone_num=%s"%(key, saas_id, phone_num), flush=True)
- cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
- if cache_agent_map and phone_num in cache_agent_map:
- state_data = cache_agent_map[phone_num]
- state_data.status = HumanState.IDLE.code
- state_data.time = datetime.utcnow().timestamp() * 1000
- self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
- self.redis_handler.redis.expire(key, self._get_expire_time())
- # self.update_report_state(saas_id, service_id)
- def busy_hash(self, saas_id, agent_id, phone_num, service_id):
- cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
- state_data = cache_agent_map.get(phone_num)
- key = self._key(saas_id, service_id)
- if state_data is None:
- return
- state_data.status = HumanState.BUSY.code
- self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
- self.redis_handler.redis.expire(key, self._get_expire_time())
- # self.update_report_state(saas_id, service_id)
- def get_cache_agent_map(self, saas_id, service_id):
- cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
- # 检查列表是否为空,如果为空返回空字典
- if not cache_agent_list:
- return {}
- # 使用字典推导式将 cache_agent_list 转换为字典
- return {agent.phone_num: agent for agent in cache_agent_list}
- def get_cache_agent_list(self, saas_id, service_id):
- redis_key = self._key(saas_id, service_id)
- map_cache_by_key = self.redis_handler.redis.hgetall(redis_key)
- print("get_cache_agent_list, redis_key=%s, map_cache_by_key=%s"%(redis_key, map_cache_by_key), flush=True)
- if not map_cache_by_key: # 检查字典是否为空
- return [] # 返回空列表
- free_agents = []
- for phone_num, json_value in map_cache_by_key.items():
- agent_status_data_dict = json.loads(json_value)
- agent_status_data = AgentStateData(**agent_status_data_dict) # 解析 JSON 为 AgentStateData 对象
- agent_status_data.phone_num = phone_num.decode('utf-8') # 设置电话号码
- free_agents.append(agent_status_data)
- return free_agents
- def get_idle_agents(self,cache_agent_list):
- current_time =int(datetime.utcnow().timestamp() * 1000) # 获取当前时间的毫秒级时间戳
- idle_agents = [
- agent for agent in cache_agent_list
- if agent.status == 1 and (
- agent.assign_time == 0 or
- agent.assign_time + self.assigned_recycle_millisecond < current_time
- )
- ]
- return idle_agents
- def get_agent_service_idle_size(self, saas_id, service_id):
- idle_agents_size = 0
- cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
- if cache_agent_list: # 检查列表是否非空
- idle_agents = self.get_idle_agents(cache_agent_list)
- idle_agents_size = len(idle_agents) # 获取空闲代理的数量
- return idle_agents_size
- def get_agent_service_busy_size(self, saas_id, service_id):
- busy_agents_size = 0
- cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
- if cache_agent_list: # 检查列表是否非空
- idle_agents = self.get_idle_agents(cache_agent_list)
- busy_agents = [agent for agent in cache_agent_list if agent not in idle_agents] # 计算忙碌代理
- busy_agents_size = len(busy_agents) # 获取忙碌代理的数量
- return busy_agents_size
- def update_report_state(self, saas_id, service_id):
- key = self._key(saas_id, service_id)
- # data_map 这个地方有疑问
- data_map = self.state_service_id_data_map[key]
- idle = HumanState.IDLE
- if idle.value not in data_map:
- data_map[idle.code] = threading.Lock()
- self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, idle)
- # data_map[idle.code] = self.do_report_real_time_human_service_id(saas_id, service_id, idle)
- busy = HumanState.BUSY
- if busy.value not in data_map:
- data_map[busy.code] = threading.Lock()
- self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, busy)
- # data_map[busy.code] = self.do_report_real_time_human_service_id(saas_id, service_id, busy)
- def do_report_real_time_human_service_id(self, saas_id, service_id, human_state):
- name = "cti_center_real_time_human_service_state"
- tag_list = {
- "vcc_id": saas_id,
- "service_id": service_id,
- "state": human_state.code,
- }
- if human_state == HumanState.IDLE:
- # meter_registry 这块疑问
- self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id))
- elif human_state == HumanState.BUSY:
- self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id))
- return 0
- def _check_in_key(self, saas_id):
- return "CTI:%s:HUMAN:AGENT"%(saas_id.upper())
- def _key(self, saas_id, service_id):
- return "CTI:%s:HUMAN:%s"%(saas_id.upper(), service_id)
- def _get_expire_time(self):
- now = datetime.now()
- end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=0)
- expire_time = (end_of_day - now).total_seconds() * 1000 # Convert to milliseconds
- return int(expire_time)
- def _choose_max_idle_time(self, idle_agents: List[AgentStateData]) -> str:
- idle_agents = sorted(idle_agents, key=lambda agent: agent.assign_time, reverse=False)
- return idle_agents[0].phone_num
- def _get_agent(saas_id, agent_id=None, agent_number=None, out_id=None):
- agent = Agent.query.filter(
- Agent.saas_id == saas_id,
- or_(Agent.out_id == agent_id, Agent.out_id == out_id, Agent.agent_num == agent_number)
- ).first()
- return agent
- def _get_newest_agent_number(saas_id):
- agent = Agent.query.filter(Agent.saas_id == saas_id).order_by(Agent.agent_num.desc()).first()
- agentNum = START_AGENT_NUM
- if agent and agent.agent_num:
- agentNum = str(int(agent.agent_num) + 1)
- return agentNum
- def _get_agent_monitor(saas_id, agent_number):
- monitor = AgentMonitor.query.filter(AgentMonitor.saas_id == saas_id,
- AgentMonitor.agent_num == agent_number).first()
- print('_get_agent_monitor', saas_id, agent_number, monitor)
- return monitor
- def _get_phone(saas_id, phone_num):
- phone = Phone.query.filter(Phone.saas_id == saas_id, Phone.phone_num == phone_num).first()
- return phone
- def _get_human_service_service(saas_id, agent_id):
- human_service_map = HumanServiceMap.query.filter(HumanServiceMap.saas_id == saas_id, HumanServiceMap.agent_id ==agent_id).first()
- return human_service_map
|