123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- #!/usr/bin/env python3
- # encoding:utf-8
- import json
- import time
- from datetime import datetime
- from queue import Queue
- from typing import Dict, Any, Optional
- from src.core.callcenter.cache import Cache
- from src.core.callcenter.agent import AgentOperService
- from src.core.callcenter.call import CallService
- from src.core.callcenter.api import CallInfo, AgentActionRequest, DelayAction
- from apscheduler.schedulers.background import BackgroundScheduler
- from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
- from src.core.callcenter.constant import saasId
- from src.core.callcenter.enumeration import AnswerFlag, DelayActionEnum
class AcdService:
    """Transfer held calls to agents, queueing them per service when none is assignable.

    Calls that cannot be handed to an agent immediately have their call id
    parked in ``holdsQueue`` (one bounded queue per service id).  A background
    scheduler job (``try_transfer_agent``) revisits those queues every two
    seconds and retries the assignment.
    """

    def __init__(self, client, app):
        """Build the collaborating services and start the periodic retry job.

        Args:
            client: Passed through to ``CallService``.
            app: Application object; provides ``app.logger`` and is passed to
                ``Cache``, ``CallService`` and ``AgentOperService``.
        """
        self.client = client
        self.app = app
        self.logger = app.logger
        self.cache = Cache(app)
        self.call_service = CallService(client, app)
        self.agent_service = AgentOperService(app)
        # service_id -> queue of call ids still waiting for an agent.
        self.holdsQueue: Dict[str, Queue] = {}
        self.pool = ThreadPoolExecutor(max_workers=4)
        # Retry the waiting queues every 2 seconds; max_instances=1 asks the
        # scheduler not to run the job concurrently with itself.
        self.checkIdleScheduler = BackgroundScheduler()
        self.checkIdleScheduler.add_job(self.try_transfer_agent, 'interval', seconds=2, max_instances=1)
        self.checkIdleScheduler.start()

    def transfer_to_agent(self, call_id, device_id, service_id='00000000000000000'):
        """Hold a call, then transfer it to an agent or park it in the waiting queue.

        Args:
            call_id: Id used to look the call up in the cache.
            device_id: Device handed to ``CallService.hold``.
            service_id: Service for which an agent is requested; also the key
                of the waiting queue used when no agent is returned.

        Returns:
            None.  When the call is not found in the cache nothing happens
            beyond the entry log line.

        Raises:
            queue.Full: Propagated from ``add_acd_queue`` when the waiting
                queue for ``service_id`` is already full.
        """
        call_info = self.cache.get_call_info(call_id)
        self.logger.info('transfer_to_agent, come in, call_id:%s, call_info:%s', call_id, call_info)
        if not call_info:
            return
        call_info.answer_flag = AnswerFlag.TRANSFER_TO_AGENT.code
        # 1. Hold the call (per the original comment this also plays the
        #    waiting tone) and register the ACD timeout action.
        self.call_service.hold(call_info, device_id)
        self.wait_timeout(call_id)
        # 2. Ask for an idle agent.
        agent_number = self.agent_service.assign(AgentActionRequest(saas_id=saasId, service_id=service_id))
        if not agent_number:
            # No idle agent: park the call until the scheduler job finds one.
            self.logger.info(
                "AcdService transferToAgent agentNumber is empty serviceId:%s,called:%s,callId:%s",
                service_id, call_info.called, call_info.call_id,
            )
            self.add_acd_queue(call_info, service_id)
        else:
            # An idle agent is available: transfer straight away.
            self.logger.info(
                "AcdService transferToAgent agentNumber not empty %s, serviceId:%s,called:%s,callId:%s",
                agent_number, service_id, call_info.called, call_info.call_id,
            )
            self.call_service.transfer(call_info, agent_number, service_id)
        # NOTE(review): persisted on both branches so the answer_flag set above
        # is saved either way - confirm this matches the intended placement.
        self.cache.add_call_info(call_info)

    def try_transfer_agent(self):
        """Scheduler job: drop empty waiting queues and retry the non-empty ones.

        Each non-empty queue is processed by ``holds_one_queue`` on the thread
        pool; this method then waits up to 6 seconds for those tasks.
        """
        all_task = []
        # Iterate over a snapshot: entries are removed inside the loop, and
        # removing from a dict while iterating it directly raises
        # "RuntimeError: dictionary changed size during iteration".
        for service_id, waiting in list(self.holdsQueue.items()):
            self.logger.info("AcdService tryTransferAgent start, queue.k:%s, queue.v:%s", service_id, waiting.qsize())
            if waiting.qsize() <= 0:
                self.holdsQueue.pop(service_id, None)
                continue
            all_task.append(self.pool.submit(self.holds_one_queue, service_id, waiting))
        wait(all_task, timeout=6, return_when=ALL_COMPLETED)

    def add_acd_queue(self, call_info: "CallInfo", service_id):
        """Append ``call_info.call_id`` to the waiting queue of ``service_id``.

        The queue is created on first use with room for 10 call ids.

        Raises:
            queue.Full: If the queue already holds 10 call ids.
        """
        call_info_queue = self.holdsQueue.get(service_id)
        if call_info_queue is None:
            call_info_queue = Queue(maxsize=10)
            self.holdsQueue[service_id] = call_info_queue
        call_info_queue.put_nowait(call_info.call_id)

    def holds_one_queue(self, task_service_id, call_info_queue):
        """Drain one waiting queue, transferring every call an agent can be found for.

        Args:
            task_service_id: Service id the queue belongs to; used when asking
                for an agent and when transferring.
            call_info_queue: Queue of call ids waiting for that service.

        Call ids whose call is no longer cached (or has no ``device_list``)
        are dropped.  Call ids for which no agent is returned are put back on
        the queue after the drain, in their original order.
        """
        still_waiting = []
        while not call_info_queue.empty():
            call_id = call_info_queue.get_nowait()
            call_info = self.cache.get_call_info(call_id)
            if not call_info or not call_info.device_list:
                self.logger.info("AcdService tryTransferAgent callInfoCache is null %s", call_id)
                continue
            agent_number = self.agent_service.assign(AgentActionRequest(saas_id=saasId, service_id=task_service_id))
            if not agent_number:
                self.logger.info(
                    "AcdService tryTransferAgent agentNumber is Empty %s %s",
                    call_id, json.dumps(call_info.device_list),
                )
                still_waiting.append(call_id)
                continue
            self.logger.info(
                "AcdService tryTransferAgent agentNumber not Empty %s, serviceId:%s, called:%s, callId:%s",
                agent_number, task_service_id, call_info.called, call_id,
            )
            self.call_service.transfer(call_info, agent_number, task_service_id)
        # Re-queue the calls that are still waiting so the next run retries them.
        for call_id in still_waiting:
            call_info_queue.put_nowait(call_id)

    def wait_timeout(self, call_id, timeouts=30):
        """Register an ``ACD_TIMEOUT_PLAY`` delayed action for the call in the cache.

        Args:
            call_id: Call the delayed action refers to.
            timeouts: Delay handed to ``Cache.add_delay_message``
                (presumably seconds - confirm against the cache implementation).
        """
        delay_action = DelayAction(call_id=call_id)
        self.cache.add_delay_message(DelayActionEnum.ACD_TIMEOUT_PLAY.name, delay_action, timeouts)
|