acd.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import json
  4. import time
  5. from datetime import datetime
  6. from queue import Queue
  7. from typing import Dict, Any, Optional
  8. from src.core.callcenter.cache import Cache
  9. from src.core.callcenter.agent import AgentOperService
  10. from src.core.callcenter.call import CallService
  11. from src.core.callcenter.api import CallInfo, AgentActionRequest, DelayAction
  12. from apscheduler.schedulers.background import BackgroundScheduler
  13. from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
  14. from src.core.callcenter.constant import saasId
  15. from src.core.callcenter.enumeration import AnswerFlag, DelayActionEnum
  16. class AcdService:
  17. def __init__(self, client, app):
  18. self.client = client
  19. self.app = app
  20. self.logger = app.logger
  21. self.cache = Cache(app)
  22. self.call_service = CallService(client, app)
  23. self.agent_service = AgentOperService(app)
  24. # self.agent_state_service = AgentStateService(app)
  25. self.holdsQueue: Dict[str, Queue] = {}
  26. self.pool = ThreadPoolExecutor(max_workers=4)
  27. self.checkIdleScheduler = BackgroundScheduler()
  28. self.checkIdleScheduler.add_job(self.try_transfer_agent, 'interval', seconds=2, max_instances=1)
  29. self.checkIdleScheduler.start()
  30. def transfer_to_agent(self, call_id, device_id, service_id='00000000000000000'):
  31. call_info = self.cache.get_call_info(call_id)
  32. self.logger.info('transfer_to_agent, come in, call_id:%s, call_info:%s'%(call_id, call_info))
  33. if not call_info:
  34. return
  35. call_info.answer_flag = AnswerFlag.TRANSFER_TO_AGENT.code
  36. # 1. hold住并且播放等待音
  37. self.call_service.hold(call_info, device_id)
  38. self.wait_timeout(call_id)
  39. # 获得空闲坐席
  40. agent_number = self.agent_service.assign(AgentActionRequest(saas_id=saasId, service_id=service_id))
  41. if not agent_number:
  42. # 如果没有空闲坐席,播放等待音
  43. text = "AcdService transferToAgent agentNumber is empty serviceId:%s,called:%s,callId:%s"%(service_id, call_info.called, call_info.call_id)
  44. # print(text, flush=True)
  45. self.logger.info(text)
  46. self.add_acd_queue(call_info, service_id)
  47. else:
  48. # 有空闲坐席,直接转接
  49. text = "AcdService transferToAgent agentNumber not empty %s, serviceId:%s,called:%s,callId:%s"%(agent_number, service_id, call_info.called, call_info.call_id)
  50. # print(text, flush=True)
  51. self.logger.info(text)
  52. self.call_service.transfer(call_info, agent_number, service_id)
  53. # self.agent_state_service.busy(call_info.saas_id, agent_number, agent_number)
  54. self.cache.add_call_info(call_info)
  55. def try_transfer_agent(self):
  56. # print("AcdService tryTransferAgent start", len(self.holdsQueue), flush=True)
  57. # self.logger.info("AcdService tryTransferAgent start")
  58. all_task = []
  59. for k, v in self.holdsQueue.items():
  60. # print("AcdService tryTransferAgent start, queue.k:%s, queue.v:%s"% (k, v.qsize()), flush=True)
  61. self.logger.info("AcdService tryTransferAgent start, queue.k:%s, queue.v:%s", k, v.qsize())
  62. if v.qsize() <= 0:
  63. self.holdsQueue.pop(k, None)
  64. continue
  65. all_task.append(self.pool.submit(self.holds_one_queue, k, v))
  66. wait(all_task, timeout=6, return_when=ALL_COMPLETED)
  67. def add_acd_queue(self, call_info: CallInfo, service_id):
  68. call_info_queue = self.holdsQueue.get(service_id)
  69. if not call_info_queue:
  70. call_info_queue = Queue(maxsize=10)
  71. self.holdsQueue[service_id] = call_info_queue
  72. call_info_queue.put_nowait(call_info.call_id)
  73. def holds_one_queue(self, task_service_id, call_info_queue):
  74. tmp_arr = []
  75. while not call_info_queue.empty():
  76. call_id = call_info_queue.get_nowait()
  77. call_info = self.cache.get_call_info(call_id)
  78. if not call_info or not call_info.device_list:
  79. # print("AcdService tryTransferAgent callInfoCache is null ", call_id)
  80. self.logger.info("AcdService tryTransferAgent callInfoCache is null %s", call_id)
  81. continue
  82. agent_number = self.agent_service.assign(AgentActionRequest(saas_id=saasId, service_id=task_service_id))
  83. if not agent_number:
  84. text = "AcdService tryTransferAgent agentNumber is Empty %s %s"% (call_id, json.dumps(call_info.device_list))
  85. # print(text, flush=True)
  86. self.logger.info(text)
  87. tmp_arr.append(call_id)
  88. continue
  89. text = "AcdService tryTransferAgent agentNumber not Empty %s, serviceId:%s, called:%s, callId:%s"%(agent_number, task_service_id, call_info.called, call_id)
  90. # print(text, flush=True)
  91. self.logger.info(text)
  92. self.call_service.transfer(call_info, agent_number, task_service_id)
  93. for call_id in tmp_arr:
  94. call_info_queue.put_nowait(call_id)
  95. def wait_timeout(self, call_id, timeouts=30):
  96. delay_action = DelayAction(call_id=call_id)
  97. self.cache.add_delay_message(DelayActionEnum.ACD_TIMEOUT_PLAY.name, delay_action, timeouts)