#!/usr/bin/env python3 # encoding:utf-8 import json import time from datetime import datetime from queue import Queue from typing import Dict, Any, Optional from src.core.callcenter.cache import Cache from src.core.callcenter.agent import AgentOperService from src.core.callcenter.call import CallService from src.core.callcenter.api import CallInfo, AgentActionRequest, DelayAction from apscheduler.schedulers.background import BackgroundScheduler from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED from src.core.callcenter.constant import saasId from src.core.callcenter.enumeration import AnswerFlag, DelayActionEnum class AcdService: def __init__(self, client, app): self.client = client self.app = app self.logger = app.logger self.cache = Cache(app) self.call_service = CallService(client, app) self.agent_service = AgentOperService(app) # self.agent_state_service = AgentStateService(app) self.holdsQueue: Dict[str, Queue] = {} self.pool = ThreadPoolExecutor(max_workers=4) self.checkIdleScheduler = BackgroundScheduler() self.checkIdleScheduler.add_job(self.try_transfer_agent, 'interval', seconds=2, max_instances=1) self.checkIdleScheduler.start() def transfer_to_agent(self, call_id, device_id, service_id='00000000000000000'): call_info = self.cache.get_call_info(call_id) self.logger.info('transfer_to_agent, come in, call_id:%s, call_info:%s'%(call_id, call_info)) if not call_info: return call_info.answer_flag = AnswerFlag.TRANSFER_TO_AGENT.code # 1. hold住并且播放等待音 self.call_service.hold(call_info, device_id) self.wait_timeout(call_id) # 获得空闲坐席 agent_number = self.agent_service.assign(AgentActionRequest(saas_id=saasId, service_id=service_id)) if not agent_number: # 如果没有空闲坐席,播放等待音 text = "AcdService transferToAgent agentNumber is empty serviceId:%s,called:%s,callId:%s"%(service_id, call_info.called, call_info.call_id) # print(text, flush=True) self.logger.info(text) self.add_acd_queue(call_info, service_id) else: # 有空闲坐席,直接转接 text = "AcdService transferToAgent agentNumber not empty %s, serviceId:%s,called:%s,callId:%s"%(agent_number, service_id, call_info.called, call_info.call_id) # print(text, flush=True) self.logger.info(text) self.call_service.transfer(call_info, agent_number, service_id) # self.agent_state_service.busy(call_info.saas_id, agent_number, agent_number) self.cache.add_call_info(call_info) def try_transfer_agent(self): # print("AcdService tryTransferAgent start", len(self.holdsQueue), flush=True) # self.logger.info("AcdService tryTransferAgent start") all_task = [] for k, v in self.holdsQueue.items(): # print("AcdService tryTransferAgent start, queue.k:%s, queue.v:%s"% (k, v.qsize()), flush=True) self.logger.info("AcdService tryTransferAgent start, queue.k:%s, queue.v:%s", k, v.qsize()) if v.qsize() <= 0: self.holdsQueue.pop(k, None) continue all_task.append(self.pool.submit(self.holds_one_queue, k, v)) wait(all_task, timeout=6, return_when=ALL_COMPLETED) def add_acd_queue(self, call_info: CallInfo, service_id): call_info_queue = self.holdsQueue.get(service_id) if not call_info_queue: call_info_queue = Queue(maxsize=10) self.holdsQueue[service_id] = call_info_queue call_info_queue.put_nowait(call_info.call_id) def holds_one_queue(self, task_service_id, call_info_queue): tmp_arr = [] while not call_info_queue.empty(): call_id = call_info_queue.get_nowait() call_info = self.cache.get_call_info(call_id) if not call_info or not call_info.device_list: # print("AcdService tryTransferAgent callInfoCache is null ", call_id) self.logger.info("AcdService tryTransferAgent callInfoCache is null %s", call_id) continue agent_number = self.agent_service.assign(AgentActionRequest(saas_id=saasId, service_id=task_service_id)) if not agent_number: text = "AcdService tryTransferAgent agentNumber is Empty %s %s"% (call_id, json.dumps(call_info.device_list)) # print(text, flush=True) self.logger.info(text) tmp_arr.append(call_id) continue text = "AcdService tryTransferAgent agentNumber not Empty %s, serviceId:%s, called:%s, callId:%s"%(agent_number, task_service_id, call_info.called, call_id) # print(text, flush=True) self.logger.info(text) self.call_service.transfer(call_info, agent_number, task_service_id) for call_id in tmp_arr: call_info_queue.put_nowait(call_id) def wait_timeout(self, call_id, timeouts=30): delay_action = DelayAction(call_id=call_id) self.cache.add_delay_message(DelayActionEnum.ACD_TIMEOUT_PLAY.name, delay_action, timeouts)