agent.py 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import threading
  4. import time
  5. import traceback
  6. from collections import defaultdict
  7. from concurrent.futures import ThreadPoolExecutor
  8. from typing import List
  9. from apscheduler.schedulers.background import BackgroundScheduler
  10. from sqlalchemy import or_
  11. from src.core.callcenter import registry
  12. import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
  13. from src.core import with_app_context
  14. from src.core.callcenter.api import AgentActionRequest, AgentQueryRequest, AgentRequest, AgentEventData, \
  15. AgentStateData, HumanServiceQueryRequest, AgentMonitorData, CallInfo, DeviceInfo, AgentDelayStateData
  16. from src.core.callcenter.cache import Cache
  17. from src.core.callcenter.constant import CENTER_AGENT_HEARTBEAT, SAAS_ID
  18. from src.core.callcenter.dao import *
  19. from src.core.callcenter.data_handler import DataHandleServer
  20. from src.core.callcenter.enumeration import AgentState, AgentCheck, AgentHeartState, AgentServiceState, AgentLogState, \
  21. AgentScene, BizErrorCode, WorkStatus, DownEvent, HumanState, DeviceType, ServiceDirect
  22. from src.core.callcenter.esl.constant.event_names import *
  23. from src.core.callcenter.exception import BizException
  24. from src.core.callcenter.push import PushHandler
  25. from src.core.datasource import RedisHandler
  26. class AgentEventService:
  27. def __init__(self, app):
  28. self.app = app
  29. self.logger = app.logger
  30. self.cache = Cache(app)
  31. self.push_handler = PushHandler(app.logger)
  32. self.data_handle_server = DataHandleServer(app)
  33. self.agent_monitor_service = AgentMonitorService(app)
  34. self.agent_state_service = AgentStateService(app)
  35. self.agent_actionlog_service = AgentActionLogService(app)
  36. def delay_state(self, state_data: AgentDelayStateData):
  37. agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num)
  38. if not agent:
  39. return
  40. agent_monitor = self.data_handle_server.get_agent_monitor(state_data.saas_id, state_data.agent_num)
  41. if not agent_monitor:
  42. return
  43. #TODO 非最新通话的延迟事件,忽略.
  44. agent_scene = AgentScene.get_by_code(state_data.scene)
  45. self.logger.info("agent event delay state %s %s %s %s", state_data.saas_id, state_data.agent_num, state_data.service_state, agent_monitor.service_state)
  46. if AgentServiceState.REPROCESSING.code == state_data.service_state and AgentServiceState.REPROCESSING.code == agent_monitor.service_state:
  47. self.reprocessing_idle(state_data)
  48. if AgentServiceState.DIALING.code == state_data.service_state and AgentServiceState.DIALING.code == agent_monitor.service_state:
  49. if self.cache.get_call_is_answer(state_data.saas_id, state_data.agent_num):
  50. return
  51. self.agent_monitor_service.update_idle(agent_monitor)
  52. self.push_handler.push_on_call_end(state_data.saas_id, state_data.flow_id, state_data.agent_num, agent_scene, ServiceDirect.MANUAL_CALL.service_direct, '0')
  53. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_REPROCESSING)
  54. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_IDLE)
  55. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.BIZ_DIALING_IDLE)
  56. if AgentServiceState.HANGING.code == state_data.service_state \
  57. and (AgentServiceState.DIALING.code == agent_monitor.service_state
  58. or AgentServiceState.CALLING.code == agent_monitor.service_state):
  59. self.agent_monitor_service.update_idle(agent_monitor)
  60. if AgentServiceState.DIALING.code == agent_monitor.service_state:
  61. self.push_handler.push_on_call_end(state_data.saas_id, state_data.flow_id, state_data.agent_num, agent_scene, ServiceDirect.MANUAL_CALL.service_direct, '0')
  62. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_REPROCESSING)
  63. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_IDLE)
  64. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.MANUAL_HANG_UP)
  65. def agent_event_channel(self, event, call_info: CallInfo, device_info: DeviceInfo):
  66. event_name = EslEventUtil.getEventName(event)
  67. saas_id = call_info.saas_id if call_info else None
  68. flow_id = call_info.cti_flow_id if call_info else None
  69. call_id = call_info.call_id if call_info else None
  70. device_id = device_info.device_id if device_info else None
  71. agent_num = device_info.agent_key if device_info else None
  72. caller = device_info.caller if device_info else None
  73. called = device_info.called if device_info else None
  74. is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False
  75. start_time = time.time()
  76. try:
  77. self.logger.info('agent_event_channel, event_name=%s, agent_num=%s, device_id=%s, is_agent=%s', event_name, agent_num, device_id, is_agent)
  78. agent = self.data_handle_server.get_agent(saas_id, agent_num)
  79. if not agent:
  80. # self.logger.warn("event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json')))
  81. return
  82. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
  83. if not agent_monitor:
  84. # self.logger.warn("event service channel agentMonitor is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json')))
  85. return
  86. # 信道发起事件,触发完成发起(或桥)&& 坐席侧
  87. if CHANNEL_ORIGINATE == event_name and is_agent:
  88. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_RINGING,phone=call_info.caller)
  89. # 进度事件,外呼时对方提醒。或者入呼时提醒 && 坐席侧
  90. if CHANNEL_PROGRESS == event_name and is_agent:
  91. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.DIALING)
  92. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING)
  93. # 媒体进度事件,外呼时对方提醒。或者入呼时提醒 && 用户侧
  94. if CHANNEL_PROGRESS_MEDIA == event_name and not is_agent:
  95. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING_RINGING)
  96. #应答
  97. if CHANNEL_ANSWER == event_name:
  98. if call_id:
  99. if self.cache.get_call_is_end(call_id):
  100. # self.logger.warn("event service channel call is end {} {} {} {} {} {}", saas_id, event_name, caller, called, call_id, json.dumps(event.serialize('json')))
  101. return
  102. self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num)
  103. self.logger.info('agent接通:%s',agent )
  104. self.data_handle_server.update_record(call_id, status=1, user_id=agent.user_id,user_name=agent.agent_name)
  105. if is_agent:
  106. # 坐席接起
  107. self.cache.set_call_is_answer(saas_id, flow_id)
  108. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.ANSWER_COMPENSATE)
  109. self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING)
  110. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON)
  111. else:
  112. # 用户侧接起
  113. self.agent_monitor_service.update_calling(agent_monitor)
  114. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_ANSWER_OUTGOING)
  115. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.CALLING)
  116. #挂断
  117. if CHANNEL_HANGUP == event_name:
  118. # 坐席侧挂断
  119. if is_agent:
  120. if call_id:
  121. self.cache.set_call_is_end(call_id)
  122. self.agent_monitor_service.update_processing(agent_monitor)
  123. self.logger.info('挂断更新')
  124. self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.MANUAL))
  125. self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, '0')
  126. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_HANG_REPROCESSING)
  127. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.REPROCESSING)
  128. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING, AgentLogState.CHANNEL_HANG_UP)
  129. # self.data_handle_server.update_record(call_id, time_end=datetime.now())
  130. # 同步处理后处理置闲
  131. # reprocessingIdle(statusDto);
  132. # agentProducer.pushDelayedStatus(statusDto, reprocessingTimeout);
  133. if (CHANNEL_BRIDGE == event_name or PLAYBACK_START == event_name) and is_agent:
  134. self.push_handler.push_on_ring_start(saas_id, flow_id, agent_num, AgentScene.MANUAL, call_id)
  135. # self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_ANSWER_OUTGOING)
  136. # self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.CALLING)
  137. if DETECTED_TONE == event_name and not is_agent:
  138. self.push_handler.push_on_detected_tone(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id)
  139. if (CHANNEL_UNBRIDGE == event_name or PLAYBACK_STOP == event_name) and is_agent:
  140. self.push_handler.push_on_ring_end(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id)
  141. except:
  142. traceback.print_exc()
  143. finally:
  144. latency = (time.time() - start_time)
  145. registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
  146. def bot_event_channel(self, event, call_info, device_info):
  147. event_name = EslEventUtil.getEventName(event)
  148. saas_id = call_info.saas_id if call_info else None
  149. flow_id = call_info.cti_flow_id if call_info else None
  150. call_id = call_info.call_id if call_info else None
  151. agent_num = device_info.agent_key if device_info else None
  152. is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False
  153. caller = (device_info.called if is_agent else device_info.caller) if device_info else None
  154. called = (device_info.caller if is_agent else device_info.called) if device_info else None
  155. human_service_id = '00000000000000000'
  156. start_time = time.time()
  157. try:
  158. self.logger.info('bot_event_channel, event_name=%s, call_id=%s, is_agent=%s, agent_num=%s', event_name, call_id, is_agent, agent_num)
  159. agent = self.data_handle_server.get_agent(saas_id, agent_num)
  160. if not agent:
  161. # self.logger.warn("bot event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called,
  162. # json.dumps(event.serialize('json')))
  163. return
  164. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
  165. if not agent_monitor:
  166. # self.logger.warn("bot event service channel agentMonitor is null %s %s %s %s %s", saas_id, event_name, caller,
  167. # called, json.dumps(event.serialize('json')))
  168. return
  169. # 信道发起事件,触发完成发起(或桥)&& 坐席侧
  170. if CHANNEL_ORIGINATE == event_name and is_agent:
  171. self.push_handler.push_on_call_ring(saas_id, flow_id, agent_num, AgentScene.ROBOT, call_id, ServiceDirect.ROBOT_CALL.service_direct, called, caller, human_service_id)
  172. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_RINGING,phone=call_info.caller)
  173. if CHANNEL_ANSWER == event_name:
  174. self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num)
  175. self.data_handle_server.update_record(call_id, status=1, transfer_user_id=agent.user_id,transfer_user_name=agent.agent_name)
  176. if is_agent:
  177. self.agent_monitor_service.update_calling(agent_monitor)
  178. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! internal")
  179. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.CALLING)
  180. self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING)
  181. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON, service_id=human_service_id)
  182. else:
  183. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! external")
  184. if CHANNEL_HANGUP == event_name and is_agent:
  185. self.agent_monitor_service.update_processing(agent_monitor)
  186. self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.ROBOT))
  187. self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, "0")
  188. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_HANG_REPROCESSING)
  189. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.REPROCESSING)
  190. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING,
  191. AgentLogState.CHANNEL_HANG_UP, service_id=human_service_id)
  192. # self.data_handle_server.update_record(call_id, time_end=datetime.now())
  193. except:
  194. traceback.print_exc()
  195. finally:
  196. latency = (time.time() - start_time)
  197. registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
  198. def reprocessing_idle(self, state_data: AgentDelayStateData):
  199. agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num)
  200. if not agent:
  201. return
  202. agent_monitor = self.data_handle_server.get_agent_monitor(state_data.saas_id, state_data.agent_num)
  203. if not agent_monitor:
  204. return
  205. self.logger.info('reprocessing_idle %s %s %s', state_data.saas_id, state_data.agent_num, agent_monitor)
  206. self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
  207. self.logger.info('reprocessing_idle_end')
  208. self.agent_monitor_service.update_idle(agent_monitor)
  209. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, "", state_data.scene, WorkStatus.AGENT_HANG_IDLE)
  210. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.REPROCESSING_IDLE)
  211. class AgentOperService:
  212. def __init__(self, app):
  213. self.app = app
  214. self.logger = app.logger
  215. self.redis_handler = RedisHandler()
  216. self.push_handler = PushHandler(app.logger)
  217. self.data_handle_server = DataHandleServer(app)
  218. self.agent_monitor_service = AgentMonitorService(app)
  219. self.agent_actionlog_service = AgentActionLogService(app)
  220. self.agent_state_service = AgentStateService(app)
  221. # self.daemon_stopping = False
  222. self.agent_heartbeat_expire = 30
  223. self.agent_heartbeat_job_scheduler = BackgroundScheduler()
  224. self.agent_heartbeat_job_scheduler.add_job(self.agent_heartbeat_daemon, 'interval', seconds=1, max_instances=1, name='agent_heartbeat_daemon')
  225. self.agent_heartbeat_job_scheduler.start()
  226. # threading.Thread(target=self.agent_heartbeat_daemon).start()
  227. def agent_heartbeat_daemon(self):
  228. def check_out_daemon(_name, key, value):
  229. try:
  230. sec = datetime.now().timestamp() - float(value)
  231. if sec > self.agent_heartbeat_expire:
  232. self.redis_handler.redis.hdel(_name, key)
  233. self.logger.error("agent heartbeat expired, will checkout %s %s", key, value)
  234. self.checkout(AgentActionRequest(saas_id=SAAS_ID, agent_id=key, agent_number=key))
  235. except:
  236. traceback.print_exc()
  237. # while not self.daemon_stopping:
  238. name = CENTER_AGENT_HEARTBEAT % SAAS_ID
  239. members = self.redis_handler.redis.hgetall(name)
  240. if not members:
  241. return
  242. for k,v in members.items():
  243. check_out_daemon(name, k, v)
  244. # time.sleep(1)
  245. def __del__(self):
  246. self.agent_heartbeat_job_scheduler.shutdown()
  247. # self.daemon_stopping = True
  248. @with_app_context
  249. def enable(self, req: AgentActionRequest):
  250. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id)
  251. if agent.agent_state == AgentState.ENABLE.code:
  252. return
  253. agent.agent_state = AgentState.ENABLE.code
  254. db.session.commit()
  255. @with_app_context
  256. def disable(self, req: AgentActionRequest):
  257. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  258. if agent.agent_state == AgentState.DISABLE.code:
  259. return
  260. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_number)
  261. if agent_monitor.check_state == AgentCheck.IN.code and \
  262. agent_monitor.service_state == AgentServiceState.CALLING.code:
  263. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  264. agent.phone_num = ''
  265. agent.agent_state = AgentState.DISABLE.code
  266. db.session.commit()
  267. phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
  268. phone.is_delete = 1
  269. db.session.commit()
  270. @with_app_context
  271. def checkin(self, req: AgentActionRequest):
  272. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  273. if not agent or agent.agent_state == AgentState.DISABLE.code:
  274. raise BizException(BizErrorCode.ERROR_NOT_FOLLOW_CHECK_IN)
  275. phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
  276. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  277. # agent_monitor.check_scene = req.scene
  278. self.agent_monitor_service.update_checkin(agent_monitor,req.scene)
  279. self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.IN, AgentLogState.CHECKIN)
  280. self.agent_state_service.checkin(agent.saas_id, agent.out_id, agent.phone_num)
  281. print("checkin", agent_monitor,agent_monitor.check_state)
  282. if req.scene == AgentScene.MANUAL.code:
  283. # 如果是手动外呼增加置忙
  284. self._handle_idle(req.scene, agent)
  285. return self._push_event_for_checkin(agent, agent_monitor, phone, req.scene)
  286. @with_app_context
  287. def checkout(self, req: AgentActionRequest):
  288. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  289. if not agent or agent.agent_state == AgentState.DISABLE.code:
  290. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  291. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  292. if not agent_monitor or agent_monitor.service_state == AgentServiceState.CALLING.code:
  293. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  294. if agent_monitor.check_state == AgentCheck.OUT.code:
  295. return self._push_event_for_checkout(agent, req.scene)
  296. self.agent_monitor_service.update_checkout(agent_monitor)
  297. self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.OUT, AgentLogState.CHECKOUT)
  298. self.agent_state_service.checkout(agent.saas_id, agent.out_id, agent.phone_num)
  299. return self._push_event_for_checkout(agent, req.scene)
  300. @with_app_context
  301. def busy(self, req: AgentActionRequest):
  302. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  303. if not agent or agent.agent_state == AgentState.DISABLE.code:
  304. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  305. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  306. if not agent_monitor or agent_monitor.check_state == AgentCheck.OUT.code:
  307. raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
  308. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  309. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  310. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  311. if agent_monitor.service_state == AgentServiceState.BUSY.code:
  312. self._push_event_for_busy(agent, req.scene)
  313. return
  314. self.agent_monitor_service.update_busy(agent_monitor)
  315. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY, AgentLogState.BUSY)
  316. self._push_event_for_busy(agent, req.scene)
  317. @with_app_context
  318. def idle(self, req: AgentActionRequest):
  319. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  320. if not agent or agent.agent_state == AgentState.DISABLE.code:
  321. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  322. self._handle_idle(req.scene, agent)
  323. @with_app_context
  324. def assign(self, req: AgentActionRequest):
  325. return self.agent_state_service.assign_agent(req.saas_id, req.service_id)
  326. def idle_agent_exist(self, request: AgentActionRequest):
  327. pass
  328. @with_app_context
  329. def agent_state(self,req: AgentActionRequest):
  330. # agent = _get_agent(req.saas_id, req.agent_id)
  331. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, req.agent_id)
  332. return agent_monitor.service_state
  333. @with_app_context
  334. def turn_on(self, req: AgentActionRequest):
  335. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  336. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  337. agent_scene = AgentScene.get_by_code(req.scene)
  338. if not agent_monitor:
  339. raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
  340. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  341. self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
  342. return
  343. self.agent_monitor_service.update_calling(agent_monitor)
  344. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  345. self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
  346. @with_app_context
  347. def _handle_idle(self, scene, agent):
  348. agent_monitor = self.data_handle_server.get_agent_monitor(agent.saas_id, agent.agent_num)
  349. print('agent_monitor:', agent_monitor, agent)
  350. if agent_monitor.check_state == AgentCheck.OUT.code:
  351. raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
  352. if agent_monitor.service_state == AgentServiceState.CALLING.code or agent_monitor.service_state == AgentServiceState.DIALING.code:
  353. raise BizException(BizErrorCode.AGENT_CALLING_NOT_HANG)
  354. if scene == AgentScene.ROBOT.code:
  355. self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
  356. else:
  357. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  358. if agent_monitor.service_state == AgentServiceState.IDLE.code:
  359. self._push_event_for_idle(agent, scene)
  360. return
  361. self.agent_monitor_service.update_idle(agent_monitor)
  362. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.IDLE)
  363. self._push_event_for_idle(agent, scene)
  364. def _push_event_for_checkin(self, agent, agent_monitor, phone, scene):
  365. """坐席签入事件推送"""
  366. agent_scene = AgentScene.get_by_code(scene)
  367. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.LOGIN_SUCCESS)
  368. event_data = AgentEventData(agent.saas_id, agent.out_id)
  369. event_data.data = {'eventName': DownEvent.ON_INITAL_SUCCESS.code,
  370. 'ext': {'saasId': agent.saas_id,
  371. 'agentId': agent.out_id,
  372. 'phoneNum': phone.phone_num,
  373. 'phonePwd': phone.phone_pwd,
  374. 'sipServer': phone.sip_server
  375. }
  376. }
  377. return event_data.__dict__
  378. def _push_event_for_checkout(self,agent,scene):
  379. """签出事件推送"""
  380. agent_scene = AgentScene.get_by_code(scene)
  381. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.NO_INIT, '签出成功')
  382. event_data = AgentEventData(agent.saas_id, agent.out_id)
  383. event_data.data = {'eventName': DownEvent.ON_INITAL_FAILURE.code,
  384. 'ext': {'saasId': agent.saas_id,
  385. 'agentId': agent.out_id,
  386. }
  387. }
  388. return event_data.__dict__
  389. def _push_event_for_busy(self,agent,scene):
  390. """置忙事件推送"""
  391. agent_scene = AgentScene.get_by_code(scene)
  392. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_BUSY)
  393. pass
  394. def _push_event_for_idle(self, agent, scene):
  395. """坐席置闲事件推送"""
  396. agent_scene = AgentScene.get_by_code(scene)
  397. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_READY)
  398. class AgentService:
  399. def __init__(self, app):
  400. self.app = app
  401. self.logger = app.logger
  402. self.data_handle_server = DataHandleServer(app)
  403. self.agent_monitor_service = AgentMonitorService(app)
  404. @with_app_context
  405. def get_and_check(self, req: AgentActionRequest):
  406. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  407. if not agent:
  408. return {}
  409. phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
  410. return phone.to_dict()
  411. @with_app_context
  412. def watch_agent_state(self, req: HumanServiceQueryRequest):
  413. pos = HumanServiceMap.query.filter(HumanServiceMap.is_delete == 0, HumanServiceMap.saas_id == req.saas_id,
  414. HumanServiceMap.service_id == req.serviceId).all()
  415. agent_ids = [x.agent_id for x in pos]
  416. monitors = self.agent_monitor_service.detail_monitor_out_ids(req.saas_id, agent_ids)
  417. return monitors
  418. @with_app_context
  419. def add(self, req: AgentRequest):
  420. new_agent_num = self.data_handle_server.get_newest_agent_number(req.saas_id)
  421. agent = Agent(saas_id=req.saas_id, agent_num=new_agent_num, agent_name=req.agent_name, out_id=req.out_id,
  422. agent_pwd=req.agent_password, agent_type=req.agent_type, phone_num=req.phone_number,
  423. distribute=req.distribute, agent_state=req.agent_state)
  424. db.session.add(agent)
  425. db.session.commit()
  426. agent_monitor = AgentMonitor(saas_id=req.saas_id, agent_num=new_agent_num, out_id=req.out_id)
  427. db.session.add(agent_monitor)
  428. db.session.commit()
  429. return new_agent_num
  430. @with_app_context
  431. def update(self, req: AgentRequest):
  432. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id)
  433. if not agent:
  434. return
  435. phone_num = agent.phone_num
  436. state_change = agent.agent_state != req.agent_state
  437. disable = req.agent_state == AgentState.DISABLE
  438. agent.agent_name = req.agent_name
  439. agent.agent_pwd = req.agent_password
  440. agent.agent_type = req.agent_type
  441. agent.phone_num = req.phone_number if not disable else ''
  442. agent.distribute = req.distribute
  443. agent.agent_state = req.agent_state
  444. db.session.commit()
  445. if state_change and disable:
  446. phone = Phone.query.filter(Phone.saas_id == req.saas_id, Phone.phone_num == phone_num).first()
  447. if phone:
  448. db.session.delete(phone)
  449. @with_app_context
  450. def detail(self, saas_id, agent_number):
  451. agent = self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='')
  452. return agent
  453. @with_app_context
  454. def count(self, req: AgentQueryRequest):
  455. cnt = Agent.query.filter(Agent.saas_id == req.saas_id,
  456. or_(Agent.agent_num == req.agent_number,
  457. Agent.agent_name.contains(req.agent_name),
  458. Agent.out_id == req.out_id,
  459. Agent.agent_type == req.agent_type
  460. )
  461. ).count()
  462. return cnt
  @with_app_context
  def query_page(self, req: AgentQueryRequest):
      """Return one page of the agents of ``req.saas_id`` matching any search field.

      A row matches when its agent number, out_id or agent type equals the
      requested value, or when its name contains ``req.agent_name``.

      :param req: query request; ``req.page`` is the page number and
          ``req.size`` the page size.
      :return: the pagination object produced by ``paginate`` (callers read
          e.g. ``page``, ``pages``, ``has_prev``, ``has_next`` and ``items``
          from it).
      """
      any_field_matches = or_(
          Agent.agent_num == req.agent_number,
          Agent.agent_name.contains(req.agent_name),
          Agent.out_id == req.out_id,
          Agent.agent_type == req.agent_type,
      )
      query = Agent.query.filter(Agent.saas_id == req.saas_id, any_field_matches)
      # Keyword arguments: Flask-SQLAlchemy 3.x made paginate() keyword-only,
      # and the keyword form is accepted by 2.x as well.
      return query.paginate(page=req.page, per_page=req.size)
  @with_app_context
  def delete(self, saas_id, agent_number):
      """Soft-delete an agent together with its monitor row and phone row.

      Each row is flagged with ``is_delete = 1`` rather than removed.  A
      missing agent makes the call a no-op; a missing monitor or phone row
      is skipped instead of raising ``AttributeError`` (an agent that was
      disabled, for instance, has an empty phone number and so may have no
      phone row).

      :param saas_id: tenant identifier.
      :param agent_number: number of the agent to delete.
      :return: ``None``.
      """
      agent = self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='')
      if not agent:
          return

      agent.is_delete = 1
      agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number)
      if agent_monitor:
          agent_monitor.is_delete = 1
      db.session.commit()

      phone = self.data_handle_server.get_phone(saas_id, agent.phone_num)
      if phone:
          phone.is_delete = 1
          db.session.commit()
  class AgentMonitorService:
      """Query and update ``AgentMonitor`` rows (check-in, service and heartbeat state)."""

      def __init__(self, app):
          self.app = app
          self.logger = app.logger
          self.data_handle_server = DataHandleServer(app)

      @staticmethod
      def _epoch_seconds(moment):
          """Return ``moment.timestamp()``, or ``None`` when ``moment`` is empty."""
          return moment.timestamp() if moment else None

      def _reload(self, agent_monitor):
          """Fetch the monitor row again by primary key through ``db.session``."""
          return db.session.query(AgentMonitor).get(agent_monitor.id)

      @with_app_context
      def detail_monitor_out_ids(self, saas_id, out_ids, check_scene=None):
          """Build monitor snapshots for the agents identified by ``out_ids``.

          :param saas_id: tenant identifier.
          :param out_ids: external agent ids to look up; an empty value
              returns ``[]`` without querying.
          :param check_scene: accepted for interface compatibility; not used.
          :return: list of ``AgentMonitorData.__dict__`` dictionaries, one per
              non-deleted monitor row.  Time fields are epoch seconds or
              ``None``.  ``online_state`` is 1 when checked in, 2 when checked
              out after having checked in today, else 0.
          :raises BizException: ``RECORD_NOT_EXIST_ERROR`` when none of the
              ``out_ids`` matches a non-deleted agent of the tenant.
          """
          if not out_ids:
              return []

          agents = Agent.query.filter(
              Agent.is_delete == 0,
              Agent.saas_id == saas_id,
              Agent.out_id.in_(out_ids),
          ).all()
          if not agents:
              raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)

          agent_num_map = {x.agent_num: x for x in agents}
          agent_nums = [x.agent_num for x in agents]
          # NOTE(review): this query is not restricted to saas_id; confirm that
          # agent numbers cannot collide across tenants.
          agent_monitors = AgentMonitor.query.filter(
              AgentMonitor.is_delete == 0,
              AgentMonitor.agent_num.in_(agent_nums),
          ).all()

          # Loop-invariant, so computed once.  The value is in milliseconds.
          day_start_ms = self._get_day_start()

          res = []
          for agent_monitor in agent_monitors:
              agent = agent_num_map.get(agent_monitor.agent_num)
              checked_out = agent_monitor.check_state == AgentCheck.OUT.code

              data = AgentMonitorData()
              data.saas_id = saas_id
              data.agent_num = agent_monitor.agent_num
              data.agent_name = agent.agent_name
              data.out_id = agent_monitor.out_id
              data.check_state = agent_monitor.check_state
              data.check_scene = agent_monitor.check_scene
              data.online_state = 1 if agent_monitor.check_state == AgentCheck.IN.code else 0
              if agent_monitor.check_in_time:
                  check_in_seconds = agent_monitor.check_in_time.timestamp()
                  data.check_in_time = check_in_seconds
                  # timestamp() is in seconds while the day start is in
                  # milliseconds; compare both in milliseconds, otherwise the
                  # condition can never hold.
                  if check_in_seconds * 1000 > day_start_ms and checked_out:
                      data.online_state = 2
              data.check_out_time = self._epoch_seconds(agent_monitor.check_out_time)
              data.service_state = agent_monitor.service_state
              data.busy_time = self._epoch_seconds(agent_monitor.busy_time)
              data.idle_time = self._epoch_seconds(agent_monitor.idle_time)
              data.call_time = self._epoch_seconds(agent_monitor.call_time)
              data.hang_time = self._epoch_seconds(agent_monitor.hang_time)
              data.heart_state = agent_monitor.heart_state
              data.heart_time = self._epoch_seconds(agent_monitor.heart_time)
              data.update_time = self._epoch_seconds(agent_monitor.update_time)
              res.append(data.__dict__)
          return res

      @with_app_context
      def update_checkin(self, agent_monitor, scene=None):
          """Mark the monitor as checked in with a normal heartbeat and store ``scene``."""
          agent_monitor = self._reload(agent_monitor)
          now = datetime.now()
          agent_monitor.check_state = AgentCheck.IN.code
          agent_monitor.check_in_time = now
          agent_monitor.heart_state = AgentHeartState.NORMAL.code
          agent_monitor.heart_time = now
          agent_monitor.check_scene = scene
          self.logger.info("update_checkin %s", agent_monitor.check_state)
          db.session.commit()

      @with_app_context
      def update_checkout(self, agent_monitor):
          """Mark the monitor as checked out, logged out of service, heartbeat reset."""
          agent_monitor = self._reload(agent_monitor)
          now = datetime.now()
          agent_monitor.check_state = AgentCheck.OUT.code
          agent_monitor.check_out_time = now
          agent_monitor.service_state = AgentServiceState.LOGOUT.code
          agent_monitor.heart_state = AgentHeartState.DEFAULT.code
          agent_monitor.heart_time = now
          self.logger.info("update_checkout %s", agent_monitor.check_out_time)
          db.session.commit()

      @with_app_context
      def update_idle(self, agent_monitor):
          """Set the service state to IDLE and stamp ``idle_time``."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.service_state = AgentServiceState.IDLE.code
          agent_monitor.idle_time = datetime.now()
          db.session.commit()

      @with_app_context
      def update_busy(self, agent_monitor):
          """Set the service state to BUSY and stamp ``busy_time``."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.service_state = AgentServiceState.BUSY.code
          agent_monitor.busy_time = datetime.now()
          db.session.commit()

      @with_app_context
      def update_dialing(self, agent_monitor):
          """Set the service state to DIALING (no timestamp is recorded)."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.service_state = AgentServiceState.DIALING.code
          db.session.commit()

      @with_app_context
      def update_calling(self, agent_monitor):
          """Set the service state to CALLING and stamp ``call_time``."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.service_state = AgentServiceState.CALLING.code
          agent_monitor.call_time = datetime.now()
          db.session.commit()

      @with_app_context
      def update_processing(self, agent_monitor):
          """Set the service state to REPROCESSING and stamp ``hang_time``."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.service_state = AgentServiceState.REPROCESSING.code
          agent_monitor.hang_time = datetime.now()
          db.session.commit()

      @with_app_context
      def update_session_id(self, agent_monitor, session_id):
          """Store ``session_id`` on the monitor row."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.session_id = session_id
          db.session.commit()

      @with_app_context
      def update_heart_error(self, agent_monitor):
          """Flag the heartbeat as ABNORMAL and stamp ``heart_time``."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.heart_state = AgentHeartState.ABNORMAL.code
          agent_monitor.heart_time = datetime.now()
          db.session.commit()

      @with_app_context
      def _get_day_start(self):
          """Return today's local midnight as an integer epoch in milliseconds."""
          today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
          return int(today_start.timestamp() * 1000)
  class AgentActionLogService:
      """Persists ``AgentActionLog`` rows describing agent state transitions."""

      def __init__(self, app):
          self.app = app
          self.logger = app.logger
          self.data_handle_server = DataHandleServer(app)

      @with_app_context
      def insert_check_state(self, agent_monitor, agent_check_enum: AgentCheck, agent_log_enum: AgentLogState):
          """Record a check-in/check-out transition (``action_type`` 0).

          The previous state is read from ``agent_monitor``.  Its start time is
          ``check_in_time`` or ``check_out_time`` depending on that state,
          falling back to ``update_time``.  The stored duration is the
          difference between now and that start time, in seconds.

          :param agent_monitor: monitor row still holding the previous state.
          :param agent_check_enum: the check state being entered.
          :param agent_log_enum: optional event; when truthy its code and
              description are copied onto the log row.
          """
          previous_state = agent_monitor.check_state

          entry = AgentActionLog()
          entry.saas_id = agent_monitor.saas_id
          entry.agent_num = agent_monitor.agent_num
          entry.out_id = agent_monitor.out_id
          entry.action_type = 0
          entry.check_state = agent_check_enum.code
          entry.pre_check_state = previous_state
          if agent_log_enum:
              entry.event_type = agent_log_enum.code
              entry.event_desc = agent_log_enum.description

          moment = datetime.now()

          # OUT is tested first so that it wins if both codes were to match,
          # exactly like two consecutive independent checks would.
          if previous_state == AgentCheck.OUT.code:
              started = agent_monitor.check_out_time
          elif previous_state == AgentCheck.IN.code:
              started = agent_monitor.check_in_time
          else:
              started = None
          if started is None:
              started = agent_monitor.update_time

          entry.check_state_time = moment
          entry.pre_check_state_time = started
          entry.check_state_duration = moment.timestamp() - started.timestamp()

          db.session.add(entry)
          db.session.commit()

      @with_app_context
      def insert_service_state(self, agent_monitor, agent_service_state: AgentServiceState, agent_log_enum: AgentLogState,
                               task_id=None, service_id=None):
          """Record a service-state transition (``action_type`` 1).

          A transition to the state the monitor is already in is logged at
          info level but still written.  The previous state's start time is
          taken from the monitor timestamp that belongs to that state, falling
          back to ``update_time``.  The stored duration is in seconds.

          :param agent_monitor: monitor row still holding the previous state.
          :param agent_service_state: the service state being entered.
          :param agent_log_enum: optional event; when truthy its code and
              description are copied onto the log row.
          :param task_id: optional task id stored on the log row.
          :param service_id: optional service id stored on the log row.
          """
          previous_state = agent_monitor.service_state
          if previous_state == agent_service_state.code:
              self.logger.info("agent action log insert service state same %s %s", agent_monitor.service_state, agent_service_state.code)

          entry = AgentActionLog()
          entry.saas_id = agent_monitor.saas_id
          entry.agent_num = agent_monitor.agent_num
          entry.out_id = agent_monitor.out_id
          entry.action_type = 1
          entry.service_state = agent_service_state.code
          entry.pre_service_state = previous_state
          entry.task_id = task_id
          entry.service_id = service_id
          if agent_log_enum:
              entry.event_type = agent_log_enum.code
              entry.event_desc = agent_log_enum.description

          moment = datetime.now()

          # State -> monitor attribute holding the time that state began.
          # Every pair is checked in order, so a later match overrides an
          # earlier one.
          state_time_attrs = (
              (AgentServiceState.IDLE, "idle_time"),
              (AgentServiceState.BUSY, "busy_time"),
              (AgentServiceState.CALLING, "call_time"),
              (AgentServiceState.LOGOUT, "check_out_time"),
              (AgentServiceState.REPROCESSING, "hang_time"),
          )
          started = None
          for state, attr_name in state_time_attrs:
              if previous_state == state.code:
                  started = getattr(agent_monitor, attr_name)
          if started is None:
              started = agent_monitor.update_time

          entry.service_state_time = moment
          entry.pre_service_state_time = started
          entry.service_state_duration = moment.timestamp() - started.timestamp()

          db.session.add(entry)
          db.session.commit()
  class AgentStateService:
      """Tracks per-service agent availability in Redis hashes and assigns idle agents.

      Each human service has one hash (see ``_key``) mapping an agent's phone
      number to a JSON-encoded ``AgentStateData``.  A separate per-tenant hash
      (see ``_check_in_key``) records check-ins.  Hashes are given a TTL that
      ends shortly before local midnight.
      """

      def __init__(self, app):
          self.app = app
          self.logger = app.logger
          self.redis_handler = RedisHandler()
          # An agent that was just assigned is not offered again for this long.
          self.assigned_recycle_millisecond = 30 * 1000
          self.state_service_id_data_map = defaultdict(dict)
          self.executor = ThreadPoolExecutor(max_workers=10)
          self.data_handle_server = DataHandleServer(app)
          self.agent_monitor_service = AgentMonitorService(app)
          self.agent_actionlog_service = AgentActionLogService(app)

      def idle(self, saas_id, agent_id, phone_num):
          """Mark the agent idle in the hash of the human service it belongs to."""
          human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id)
          if human_service is None:
              self.logger.info(f"agent engine idle not have human service {saas_id} {agent_id}")
              return
          self.idle_hash(saas_id, agent_id, phone_num, human_service.service_id)

      def busy(self, saas_id, agent_id, phone_num):
          """Mark the agent busy in the hash of the human service it belongs to."""
          human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id)
          if human_service is None:
              self.logger.info(f"agent engine busy not hava human service {saas_id} {agent_id}")
              return
          self.busy_hash(saas_id, agent_id, phone_num, human_service.service_id)

      def idle_by_human(self, saas_id, agent_id, service_id):
          """Set a checked-in agent idle for ``service_id`` (monitor row, action log, Redis).

          ``agent_id`` is the agent's ``out_id``.  Nothing happens when the
          agent or its monitor row cannot be found, or when the agent is not
          checked in.
          """
          agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id)
          if not agent:
              return
          agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num)
          if not agent_monitor:
              return
          # NOTE(review): all three updates are assumed to apply only to a
          # checked-in agent - confirm against the intended behavior.
          if agent_monitor.check_state == AgentCheck.IN.code:
              self.agent_monitor_service.update_idle(agent_monitor)
              self.agent_actionlog_service.insert_service_state(
                  agent_monitor, AgentServiceState.IDLE, AgentLogState.ACTIVE_HUMAN_SERVICE)
              self.idle_hash(saas_id, agent_id, agent.phone_num, service_id)

      def busy_by_human(self, saas_id, service_id, agent_id=None):
          """Set a checked-in agent busy for ``service_id`` (monitor row, action log, Redis).

          ``agent_id`` is the agent's ``out_id``.  Nothing happens when the
          agent or its monitor row cannot be found, or when the agent is not
          checked in.
          """
          agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id)
          if not agent:
              return
          agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num)
          if not agent_monitor:
              return
          # NOTE(review): all three updates are assumed to apply only to a
          # checked-in agent - confirm against the intended behavior.
          if agent_monitor.check_state == AgentCheck.IN.code:
              self.agent_monitor_service.update_busy(agent_monitor)
              self.agent_actionlog_service.insert_service_state(
                  agent_monitor, AgentServiceState.BUSY, AgentLogState.DEL_HUMAN_SERVICE)
              self.busy_hash(saas_id, agent_id, agent.phone_num, service_id)

      def checkin(self, saas_id, agent_id, phone_num):
          """Record a check-in for ``phone_num`` in the tenant's check-in hash."""
          key = self._check_in_key(saas_id)
          state_data = AgentStateData()
          state_data.status = HumanState.DEFAULT.code
          # NOTE(review): stored in seconds here, whereas idle_hash stores
          # milliseconds - confirm which unit readers of this hash expect.
          state_data.time = datetime.now().timestamp()
          self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
          self._expire_at_day_end(key)

      def checkout(self, saas_id, agent_id, phone_num):
          """Remove the check-in entry for ``phone_num`` and mark the agent busy."""
          key = self._check_in_key(saas_id)
          self.redis_handler.redis.hdel(key, phone_num)
          self.busy(saas_id, agent_id, phone_num)

      def assign_agent(self, saas_id, service_id, called=None, ivr_id=None, task_id=None, cbp=None):
          """Pick an idle agent of ``service_id`` and stamp its assignment time.

          :return: the chosen agent's phone number, or ``''`` when no agent is
              currently idle.  ``called`` is only logged; ``ivr_id``,
              ``task_id`` and ``cbp`` are accepted but unused.
          """
          self.logger.info("assignAgent %s %s %s", saas_id, service_id, called)
          idle_agents = self.idle_agents(saas_id, service_id)
          if not idle_agents:
              return ''
          choose_phone_num = self._choose_max_idle_time(idle_agents)
          self.handle_assign_time(saas_id, service_id, choose_phone_num)
          return choose_phone_num

      def handle_assign_time(self, saas_id, service_id, choose_phone_num):
          """Store "now" (epoch milliseconds) as the assignment time of the chosen agent."""
          key = self._key(saas_id, service_id)
          cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
          if cache_agent_map and choose_phone_num in cache_agent_map:
              state_data = cache_agent_map[choose_phone_num]
              state_data.assign_time = datetime.now().timestamp() * 1000
              self.redis_handler.redis.hset(key, choose_phone_num, state_data.to_json_string())
              self._expire_at_day_end(key)

      def idle_agents(self, saas_id, service_id):
          """Return the cached agents of ``service_id`` that can be assigned right now."""
          cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
          if not cache_agent_list:
              return []
          agent_str = '\n'.join(
              "%s-%s-%s-%s" % (x.phone_num, x.status, x.time, x.assign_time) for x in cache_agent_list)
          self.logger.info("assignAgent %s %s idleAgents:%s ", saas_id, service_id, agent_str)
          return self.get_idle_agents(cache_agent_list)

      def idle_hash(self, saas_id, agent_id, phone_num, service_id):
          """Write an IDLE entry for ``phone_num`` into the service hash.

          An existing entry is updated in place (keeping its other fields);
          otherwise a fresh ``AgentStateData`` is created.
          """
          key = self._key(saas_id, service_id)
          cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
          if phone_num in cache_agent_map:
              state_data = cache_agent_map[phone_num]
          else:
              state_data = AgentStateData()
          state_data.status = HumanState.IDLE.code
          state_data.time = datetime.now().timestamp() * 1000
          self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
          self._expire_at_day_end(key)
          self.logger.info("idle_hash, key=%s, saas_id=%s, phone_num=%s, state_data=%s",
                           key, saas_id, phone_num, state_data)

      def busy_hash(self, saas_id, agent_id, phone_num, service_id):
          """Switch an existing entry for ``phone_num`` to BUSY; unknown numbers are ignored."""
          cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
          state_data = cache_agent_map.get(phone_num)
          if state_data is None:
              return
          key = self._key(saas_id, service_id)
          state_data.status = HumanState.BUSY.code
          self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
          self._expire_at_day_end(key)

      def get_cache_agent_map(self, saas_id, service_id):
          """Return the service's cached agents keyed by phone number (``{}`` when empty)."""
          cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
          if not cache_agent_list:
              return {}
          return {agent.phone_num: agent for agent in cache_agent_list}

      def get_cache_agent_list(self, saas_id, service_id):
          """Load every entry of the service hash as ``AgentStateData`` objects.

          Each JSON value is expanded into ``AgentStateData(**fields)`` and the
          hash field name is stored on it as ``phone_num``.
          """
          redis_key = self._key(saas_id, service_id)
          map_cache_by_key = self.redis_handler.redis.hgetall(redis_key)
          self.logger.info("get_cache_agent_list, redis_key=%s, map_cache_by_key=%s", redis_key, map_cache_by_key)
          if not map_cache_by_key:
              return []
          free_agents = []
          for phone_num, json_value in map_cache_by_key.items():
              agent_status_data = AgentStateData(**json.loads(json_value))
              # Field names arrive as bytes by default, but as str when the
              # client was created with decode_responses=True.
              if isinstance(phone_num, bytes):
                  phone_num = phone_num.decode('utf-8')
              agent_status_data.phone_num = phone_num
              free_agents.append(agent_status_data)
          return free_agents

      def get_idle_agents(self, cache_agent_list):
          """Filter ``cache_agent_list`` down to agents that may be assigned now.

          An agent qualifies when its status is 1 and it either was never
          assigned (``assign_time == 0``) or its last assignment is older than
          ``assigned_recycle_millisecond``.
          """
          current_time = int(datetime.now().timestamp() * 1000)
          # NOTE(review): 1 is presumably HumanState.IDLE.code - confirm.
          return [
              agent for agent in cache_agent_list
              if agent.status == 1 and (
                  agent.assign_time == 0
                  or agent.assign_time + self.assigned_recycle_millisecond < current_time
              )
          ]

      def get_agent_service_idle_size(self, saas_id, service_id):
          """Return how many cached agents of the service are assignable right now."""
          cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
          if not cache_agent_list:
              return 0
          return len(self.get_idle_agents(cache_agent_list))

      def get_agent_service_busy_size(self, saas_id, service_id):
          """Return how many cached agents of the service are NOT assignable right now."""
          cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
          if not cache_agent_list:
              return 0
          # The idle agents are a subset of the cached list, so the remainder
          # is a simple difference; no pairwise membership scan is needed.
          return len(cache_agent_list) - len(self.get_idle_agents(cache_agent_list))

      def update_report_state(self, saas_id, service_id):
          """Submit the idle and busy gauge registration once per service hash key."""
          key = self._key(saas_id, service_id)
          # NOTE(review): the original author flagged this registry map as uncertain.
          data_map = self.state_service_id_data_map[key]
          for human_state in (HumanState.IDLE, HumanState.BUSY):
              # Test the same key that is stored (.code); testing .value while
              # storing .code could resubmit the registration on every call.
              # NOTE(review): the submit is assumed to run only on first
              # registration - confirm.
              if human_state.code not in data_map:
                  data_map[human_state.code] = threading.Lock()
                  self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, human_state)

      def do_report_real_time_human_service_id(self, saas_id, service_id, human_state):
          """Register a gauge reporting the live agent count for ``human_state``.

          :return: always 0.  When no ``meter_registry`` attribute is present
              on the instance, a warning is logged and nothing is registered.
          """
          # NOTE(review): the original author flagged meter_registry as
          # uncertain; nothing in this class assigns it.
          registry = getattr(self, "meter_registry", None)
          if registry is None:
              self.logger.warning("meter_registry is not configured, skip gauge for %s %s", saas_id, service_id)
              return 0

          name = "cti_center_real_time_human_service_state"
          tag_list = {
              "vcc_id": saas_id,
              "service_id": service_id,
              "state": human_state.code,
          }
          # Each state reports its own count (they were previously swapped).
          if human_state == HumanState.IDLE:
              registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id))
          elif human_state == HumanState.BUSY:
              registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id))
          return 0

      def _expire_at_day_end(self, key):
          """Give ``key`` a TTL ending at today's 23:59:59.

          ``_get_expire_time`` yields milliseconds, so PEXPIRE is used;
          EXPIRE would read the same number as seconds.
          """
          self.redis_handler.redis.pexpire(key, self._get_expire_time())

      def _check_in_key(self, saas_id):
          """Return the Redis key of the tenant's check-in hash."""
          return "CTI:%s:HUMAN:AGENT" % (saas_id.upper())

      def _key(self, saas_id, service_id):
          """Return the Redis key of the hash holding one service's agents."""
          return "CTI:%s:HUMAN:%s" % (saas_id.upper(), service_id)

      def _get_expire_time(self):
          """Return the whole milliseconds left until today's 23:59:59 (local time)."""
          now = datetime.now()
          end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=0)
          return int((end_of_day - now).total_seconds() * 1000)

      def _choose_max_idle_time(self, idle_agents: "List[AgentStateData]") -> str:
          """Return the phone number of the agent with the smallest ``assign_time``.

          Ties go to the agent that comes first in ``idle_agents``.  The list
          must not be empty.
          """
          return min(idle_agents, key=lambda agent: agent.assign_time).phone_num