#!/usr/bin/env python3 # encoding:utf-8 import time from datetime import datetime from queue import Queue from typing import Dict, Any, Optional import src.core.callcenter.cache as Cache from src.core.callcenter.agent import AgentOperService from src.core.callcenter.call import CallService from src.core.callcenter.api import CallInfo, AgentActionRequest from apscheduler.schedulers.background import BackgroundScheduler from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED from src.core.callcenter.constant import saasId class AcdService: def __init__(self, client, logger): self.client = client self.logger = logger self.call_service = CallService(client, logger) self.agent_service = AgentOperService(client, logger) self.holdsQueue: Dict[str, Queue] = {} self.pool = ThreadPoolExecutor(max_workers=4) # checkIdleScheduler = BackgroundScheduler() # checkIdleScheduler.add_job(self.try_transfer_agent, 'interval', seconds=2, max_instances=1) # checkIdleScheduler.start() def transfer_to_agent(self, call_info: CallInfo, device_id, service_id): print('debugger::transfer_to_agent, come in ', end="", flush=True) # 1. hold住并且播放等待音 self.call_service.hold(call_info, device_id) print('debugger::transfer_to_agent, 1111111 ', end="", flush=True) # 获得空闲坐席 agent_number = self.agent_service.assign(AgentActionRequest(saas_id=saasId, service_id=service_id)) print('debugger::transfer_to_agent, 222222 %s'%agent_number, end="", flush=True) if not agent_number: # 如果没有空闲坐席,播放等待音 self.logger.info("AcdService transferToAgent agentNumber is empty serviceId:%s,called:%s,callId:%s", service_id, call_info.called, call_info.call_id) self.add_acd_queue(call_info, service_id) else: # 有空闲坐席,直接转接 self.logger.info("AcdService transferToAgent agentNumber not empty %s, serviceId:%s,called:%s,callId:%s", agent_number, service_id, call_info.called, call_info.call_id) self.call_service.transfer(call_info, agent_number, service_id) def try_transfer_agent(self): self.logger.info("AcdService tryTransferAgent start") all_task = [] for k, v in self.holdsQueue.items(): all_task.append(self.pool.submit(self.holds_one_queue, k, v)) wait(all_task, timeout=6, return_when=ALL_COMPLETED) def add_acd_queue(self, call_info: CallInfo, service_id): call_info_queue = self.holdsQueue.get(service_id) if not call_info_queue: call_info_queue = Queue(maxsize=10) self.holdsQueue[service_id] = call_info_queue call_info_queue.put_nowait(call_info.call_id) def holds_one_queue(self, task_service_id, call_info_queue): tmp_arr = [] while not call_info_queue.empty(): call_id = call_info_queue.get_nowait() call_info = Cache.get_call_info(call_id) if not call_info or not call_info.device_list: continue agent_number = self.agent_service.assign(AgentActionRequest(saas_id=saasId, service_id=task_service_id)) if not agent_number: self.logger.info("AcdService tryTransferAgent agentNumber is Empty %s", call_id) tmp_arr.append(call_id) continue self.logger.info( "AcdService tryTransferAgent agentNumber not Empty %s, serviceId:%s, called:%s, callId:%s", agent_number, task_service_id, call_info.called, call_id) self.call_service.transfer(call_info, agent_number, task_service_id) for call_id in tmp_arr: call_info_queue.put_nowait(call_id)