agent.py 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import threading
  4. import time
  5. import traceback
  6. from collections import defaultdict
  7. from concurrent.futures import ThreadPoolExecutor
  8. from typing import List
  9. from apscheduler.schedulers.background import BackgroundScheduler
  10. from sqlalchemy import or_
  11. from src.core.callcenter import registry
  12. import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
  13. from src.core import with_app_context
  14. from src.core.callcenter.api import AgentActionRequest, AgentQueryRequest, AgentRequest, AgentEventData, \
  15. AgentStateData, HumanServiceQueryRequest, AgentMonitorData, CallInfo, DeviceInfo, AgentDelayStateData
  16. from src.core.callcenter.cache import Cache
  17. from src.core.callcenter.constant import CENTER_AGENT_HEARTBEAT, SAAS_ID
  18. from src.core.callcenter.dao import *
  19. from src.core.callcenter.data_handler import DataHandleServer
  20. from src.core.callcenter.enumeration import AgentState, AgentCheck, AgentHeartState, AgentServiceState, AgentLogState, \
  21. AgentScene, BizErrorCode, WorkStatus, DownEvent, HumanState, DeviceType, ServiceDirect
  22. from src.core.callcenter.esl.constant.event_names import *
  23. from src.core.callcenter.exception import BizException
  24. from src.core.callcenter.push import PushHandler
  25. from src.core.datasource import RedisHandler
  26. class AgentEventService:
  27. def __init__(self, app):
  28. self.app = app
  29. self.logger = app.logger
  30. self.cache = Cache(app)
  31. self.push_handler = PushHandler(app.logger)
  32. self.data_handle_server = DataHandleServer(app)
  33. self.agent_monitor_service = AgentMonitorService(app)
  34. self.agent_state_service = AgentStateService(app)
  35. self.agent_actionlog_service = AgentActionLogService(app)
  36. def delay_state(self, state_data: AgentDelayStateData):
  37. agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num)
  38. if not agent:
  39. return
  40. agent_monitor = self.data_handle_server.get_agent_monitor(state_data.saas_id, state_data.agent_num)
  41. if not agent_monitor:
  42. return
  43. #TODO 非最新通话的延迟事件,忽略.
  44. agent_scene = AgentScene.get_by_code(state_data.scene)
  45. self.logger.info("agent event delay state %s %s %s %s", state_data.saas_id, state_data.agent_num, state_data.service_state, agent_monitor.service_state)
  46. if AgentServiceState.REPROCESSING.code == state_data.service_state and AgentServiceState.REPROCESSING.code == agent_monitor.service_state:
  47. self.reprocessing_idle(state_data)
  48. if AgentServiceState.DIALING.code == state_data.service_state and AgentServiceState.DIALING.code == agent_monitor.service_state:
  49. if self.cache.get_call_is_answer(state_data.saas_id, state_data.agent_num):
  50. return
  51. self.agent_monitor_service.update_idle(agent_monitor)
  52. self.push_handler.push_on_call_end(state_data.saas_id, state_data.flow_id, state_data.agent_num, agent_scene, ServiceDirect.MANUAL_CALL.service_direct, '0')
  53. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_REPROCESSING)
  54. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_IDLE)
  55. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.BIZ_DIALING_IDLE)
  56. if AgentServiceState.HANGING.code == state_data.service_state \
  57. and (AgentServiceState.DIALING.code == agent_monitor.service_state
  58. or AgentServiceState.CALLING.code == agent_monitor.service_state):
  59. self.agent_monitor_service.update_idle(agent_monitor)
  60. if AgentServiceState.DIALING.code == agent_monitor.service_state:
  61. self.push_handler.push_on_call_end(state_data.saas_id, state_data.flow_id, state_data.agent_num, agent_scene, ServiceDirect.MANUAL_CALL.service_direct, '0')
  62. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_REPROCESSING)
  63. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_IDLE)
  64. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.MANUAL_HANG_UP)
  65. def agent_event_channel(self, event, call_info: CallInfo, device_info: DeviceInfo):
  66. event_name = EslEventUtil.getEventName(event)
  67. saas_id = call_info.saas_id if call_info else None
  68. flow_id = call_info.cti_flow_id if call_info else None
  69. call_id = call_info.call_id if call_info else None
  70. device_id = device_info.device_id if device_info else None
  71. agent_num = device_info.agent_key if device_info else None
  72. caller = device_info.caller if device_info else None
  73. called = device_info.called if device_info else None
  74. is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False
  75. start_time = time.time()
  76. try:
  77. self.logger.info('agent_event_channel, event_name=%s, agent_num=%s, device_id=%s, is_agent=%s', event_name, agent_num, device_id, is_agent)
  78. agent = self.data_handle_server.get_agent(saas_id, agent_num)
  79. if not agent:
  80. # self.logger.warn("event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json')))
  81. return
  82. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
  83. if not agent_monitor:
  84. # self.logger.warn("event service channel agentMonitor is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json')))
  85. return
  86. # 信道发起事件,触发完成发起(或桥)&& 坐席侧
  87. if CHANNEL_ORIGINATE == event_name and is_agent:
  88. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_RINGING,phone=call_info.caller)
  89. self.logger.info('信道发起事件:%s', agent)
  90. self.data_handle_server.update_record(call_id, user_id=agent.user_id, user_name=agent.agent_name)
  91. # 进度事件,外呼时对方提醒。或者入呼时提醒 && 坐席侧
  92. if CHANNEL_PROGRESS == event_name and is_agent:
  93. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.DIALING)
  94. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING)
  95. # 媒体进度事件,外呼时对方提醒。或者入呼时提醒 && 用户侧
  96. if CHANNEL_PROGRESS_MEDIA == event_name and not is_agent:
  97. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING_RINGING)
  98. #应答
  99. if CHANNEL_ANSWER == event_name:
  100. if call_id:
  101. if self.cache.get_call_is_end(call_id):
  102. # self.logger.warn("event service channel call is end {} {} {} {} {} {}", saas_id, event_name, caller, called, call_id, json.dumps(event.serialize('json')))
  103. return
  104. self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num)
  105. self.logger.info('agent接通:%s',agent )
  106. self.data_handle_server.update_record(call_id, status=1, user_id=agent.user_id,user_name=agent.agent_name)
  107. if is_agent:
  108. # 坐席接起
  109. self.cache.set_call_is_answer(saas_id, flow_id)
  110. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.ANSWER_COMPENSATE)
  111. self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING)
  112. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON)
  113. else:
  114. # 用户侧接起
  115. self.agent_monitor_service.update_calling(agent_monitor)
  116. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_ANSWER_OUTGOING)
  117. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.CALLING)
  118. #挂断
  119. if CHANNEL_HANGUP == event_name:
  120. # 坐席侧挂断
  121. if is_agent:
  122. if call_id:
  123. self.cache.set_call_is_end(call_id)
  124. self.agent_monitor_service.update_processing(agent_monitor)
  125. self.logger.info('挂断更新:%s', agent)
  126. # self.data_handle_server.update_record(call_id, user_id=agent.user_id, user_name=agent.agent_name)
  127. self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.MANUAL))
  128. self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, '0')
  129. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_HANG_REPROCESSING)
  130. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.REPROCESSING)
  131. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING, AgentLogState.CHANNEL_HANG_UP)
  132. # self.data_handle_server.update_record(call_id, time_end=datetime.now())
  133. # 同步处理后处理置闲
  134. # reprocessingIdle(statusDto);
  135. # agentProducer.pushDelayedStatus(statusDto, reprocessingTimeout);
  136. if (CHANNEL_BRIDGE == event_name or PLAYBACK_START == event_name) and is_agent:
  137. self.push_handler.push_on_ring_start(saas_id, flow_id, agent_num, AgentScene.MANUAL, call_id)
  138. # self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_ANSWER_OUTGOING)
  139. # self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.CALLING)
  140. if DETECTED_TONE == event_name and not is_agent:
  141. self.push_handler.push_on_detected_tone(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id)
  142. if (CHANNEL_UNBRIDGE == event_name or PLAYBACK_STOP == event_name) and is_agent:
  143. self.push_handler.push_on_ring_end(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id)
  144. except:
  145. traceback.print_exc()
  146. finally:
  147. latency = (time.time() - start_time)
  148. registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
  149. def bot_event_channel(self, event, call_info, device_info):
  150. event_name = EslEventUtil.getEventName(event)
  151. saas_id = call_info.saas_id if call_info else None
  152. flow_id = call_info.cti_flow_id if call_info else None
  153. call_id = call_info.call_id if call_info else None
  154. agent_num = device_info.agent_key if device_info else None
  155. is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False
  156. caller = (device_info.called if is_agent else device_info.caller) if device_info else None
  157. called = (device_info.caller if is_agent else device_info.called) if device_info else None
  158. human_service_id = '00000000000000000'
  159. start_time = time.time()
  160. try:
  161. self.logger.info('bot_event_channel, event_name=%s, call_id=%s, is_agent=%s, agent_num=%s', event_name, call_id, is_agent, agent_num)
  162. agent = self.data_handle_server.get_agent(saas_id, agent_num)
  163. if not agent:
  164. # self.logger.warn("bot event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called,
  165. # json.dumps(event.serialize('json')))
  166. return
  167. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
  168. if not agent_monitor:
  169. # self.logger.warn("bot event service channel agentMonitor is null %s %s %s %s %s", saas_id, event_name, caller,
  170. # called, json.dumps(event.serialize('json')))
  171. return
  172. # 信道发起事件,触发完成发起(或桥)&& 坐席侧
  173. if CHANNEL_ORIGINATE == event_name and is_agent:
  174. self.push_handler.push_on_call_ring(saas_id, flow_id, agent_num, AgentScene.ROBOT, call_id, ServiceDirect.ROBOT_CALL.service_direct, called, caller, human_service_id)
  175. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_RINGING,phone=call_info.caller)
  176. self.data_handle_server.update_record(call_id, transfer_user_id=agent.user_id,transfer_user_name=agent.agent_name) #转接给客服以后更新转接人
  177. if CHANNEL_ANSWER == event_name:
  178. self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num)
  179. # self.data_handle_server.update_record(call_id, status=1, transfer_user_id=agent.user_id,transfer_user_name=agent.agent_name)
  180. if is_agent:
  181. self.agent_monitor_service.update_calling(agent_monitor)
  182. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! internal")
  183. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.CALLING)
  184. self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING)
  185. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON, service_id=human_service_id)
  186. else:
  187. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! external")
  188. if CHANNEL_HANGUP == event_name and is_agent:
  189. self.agent_monitor_service.update_processing(agent_monitor)
  190. self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.ROBOT))
  191. self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, "0")
  192. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_HANG_REPROCESSING)
  193. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.REPROCESSING)
  194. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING,
  195. AgentLogState.CHANNEL_HANG_UP, service_id=human_service_id)
  196. # self.data_handle_server.update_record(call_id, time_end=datetime.now())
  197. except:
  198. traceback.print_exc()
  199. finally:
  200. latency = (time.time() - start_time)
  201. registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
  202. def reprocessing_idle(self, state_data: AgentDelayStateData):
  203. agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num)
  204. if not agent:
  205. return
  206. agent_monitor = self.data_handle_server.get_agent_monitor(state_data.saas_id, state_data.agent_num)
  207. if not agent_monitor:
  208. return
  209. self.logger.info('reprocessing_idle %s %s %s', state_data.saas_id, state_data.agent_num, agent_monitor)
  210. self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
  211. self.logger.info('reprocessing_idle_end')
  212. self.agent_monitor_service.update_idle(agent_monitor)
  213. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, "", state_data.scene, WorkStatus.AGENT_HANG_IDLE)
  214. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.REPROCESSING_IDLE)
  215. class AgentOperService:
  216. def __init__(self, app):
  217. self.app = app
  218. self.logger = app.logger
  219. self.redis_handler = RedisHandler()
  220. self.push_handler = PushHandler(app.logger)
  221. self.data_handle_server = DataHandleServer(app)
  222. self.agent_monitor_service = AgentMonitorService(app)
  223. self.agent_actionlog_service = AgentActionLogService(app)
  224. self.agent_state_service = AgentStateService(app)
  225. # self.daemon_stopping = False
  226. self.agent_heartbeat_expire = 30
  227. self.agent_heartbeat_job_scheduler = BackgroundScheduler()
  228. self.agent_heartbeat_job_scheduler.add_job(self.agent_heartbeat_daemon, 'interval', seconds=1, max_instances=1, name='agent_heartbeat_daemon')
  229. self.agent_heartbeat_job_scheduler.start()
  230. # threading.Thread(target=self.agent_heartbeat_daemon).start()
  231. def agent_heartbeat_daemon(self):
  232. def check_out_daemon(_name, key, value):
  233. try:
  234. sec = datetime.now().timestamp() - float(value)
  235. if sec > self.agent_heartbeat_expire:
  236. self.redis_handler.redis.hdel(_name, key)
  237. self.logger.error("agent heartbeat expired, will checkout %s %s", key, value)
  238. # self.checkout(AgentActionRequest(saas_id=SAAS_ID, agent_id=key, agent_number=key))
  239. except:
  240. traceback.print_exc()
  241. # while not self.daemon_stopping:
  242. name = CENTER_AGENT_HEARTBEAT % SAAS_ID
  243. members = self.redis_handler.redis.hgetall(name)
  244. if not members:
  245. return
  246. registry.MANUAL_AGENT_LIVES.set(len(members))
  247. for k,v in members.items():
  248. check_out_daemon(name, k, v)
  249. # time.sleep(1)
  250. def __del__(self):
  251. self.agent_heartbeat_job_scheduler.shutdown()
  252. # self.daemon_stopping = True
  253. @with_app_context
  254. def enable(self, req: AgentActionRequest):
  255. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id)
  256. if agent.agent_state == AgentState.ENABLE.code:
  257. return
  258. agent.agent_state = AgentState.ENABLE.code
  259. db.session.commit()
  260. @with_app_context
  261. def disable(self, req: AgentActionRequest):
  262. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  263. if agent.agent_state == AgentState.DISABLE.code:
  264. return
  265. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_number)
  266. if agent_monitor.check_state == AgentCheck.IN.code and \
  267. agent_monitor.service_state == AgentServiceState.CALLING.code:
  268. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  269. agent.phone_num = ''
  270. agent.agent_state = AgentState.DISABLE.code
  271. db.session.commit()
  272. phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
  273. phone.is_delete = 1
  274. db.session.commit()
  275. @with_app_context
  276. def checkin(self, req: AgentActionRequest):
  277. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  278. if not agent or agent.agent_state == AgentState.DISABLE.code:
  279. raise BizException(BizErrorCode.ERROR_NOT_FOLLOW_CHECK_IN)
  280. phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
  281. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  282. # agent_monitor.check_scene = req.scene
  283. self.agent_monitor_service.update_checkin(agent_monitor,req.scene)
  284. self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.IN, AgentLogState.CHECKIN)
  285. self.agent_state_service.checkin(agent.saas_id, agent.out_id, agent.phone_num)
  286. print("checkin", agent_monitor,agent_monitor.check_state)
  287. if req.scene == AgentScene.MANUAL.code:
  288. # 如果是手动外呼增加置忙
  289. self._handle_idle(req.scene, agent)
  290. return self._push_event_for_checkin(agent, agent_monitor, phone, req.scene)
  291. @with_app_context
  292. def checkout(self, req: AgentActionRequest):
  293. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  294. if not agent or agent.agent_state == AgentState.DISABLE.code:
  295. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  296. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  297. if not agent_monitor or agent_monitor.service_state == AgentServiceState.CALLING.code:
  298. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  299. if agent_monitor.check_state == AgentCheck.OUT.code:
  300. return self._push_event_for_checkout(agent, req.scene)
  301. self.agent_monitor_service.update_checkout(agent_monitor)
  302. self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.OUT, AgentLogState.CHECKOUT)
  303. self.agent_state_service.checkout(agent.saas_id, agent.out_id, agent.phone_num)
  304. return self._push_event_for_checkout(agent, req.scene)
  305. @with_app_context
  306. def busy(self, req: AgentActionRequest):
  307. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  308. if not agent or agent.agent_state == AgentState.DISABLE.code:
  309. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  310. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  311. if not agent_monitor or agent_monitor.check_state == AgentCheck.OUT.code:
  312. raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
  313. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  314. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  315. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  316. if agent_monitor.service_state == AgentServiceState.BUSY.code:
  317. self._push_event_for_busy(agent, req.scene)
  318. return
  319. self.agent_monitor_service.update_busy(agent_monitor)
  320. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY, AgentLogState.BUSY)
  321. self._push_event_for_busy(agent, req.scene)
  322. @with_app_context
  323. def idle(self, req: AgentActionRequest):
  324. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  325. if not agent or agent.agent_state == AgentState.DISABLE.code:
  326. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  327. self._handle_idle(req.scene, agent)
  328. @with_app_context
  329. def assign(self, req: AgentActionRequest):
  330. return self.agent_state_service.assign_agent(req.saas_id, req.service_id)
  331. def idle_agent_exist(self, request: AgentActionRequest):
  332. pass
  333. @with_app_context
  334. def agent_state(self,req: AgentActionRequest):
  335. # agent = _get_agent(req.saas_id, req.agent_id)
  336. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, req.agent_id)
  337. return agent_monitor.service_state
  338. @with_app_context
  339. def turn_on(self, req: AgentActionRequest):
  340. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  341. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  342. agent_scene = AgentScene.get_by_code(req.scene)
  343. if not agent_monitor:
  344. raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
  345. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  346. self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
  347. return
  348. self.agent_monitor_service.update_calling(agent_monitor)
  349. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  350. self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
  351. @with_app_context
  352. def _handle_idle(self, scene, agent):
  353. agent_monitor = self.data_handle_server.get_agent_monitor(agent.saas_id, agent.agent_num)
  354. print('agent_monitor:', agent_monitor, agent)
  355. if agent_monitor.check_state == AgentCheck.OUT.code:
  356. raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
  357. if agent_monitor.service_state == AgentServiceState.CALLING.code or agent_monitor.service_state == AgentServiceState.DIALING.code:
  358. raise BizException(BizErrorCode.AGENT_CALLING_NOT_HANG)
  359. if scene == AgentScene.ROBOT.code:
  360. self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
  361. else:
  362. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  363. if agent_monitor.service_state == AgentServiceState.IDLE.code:
  364. self._push_event_for_idle(agent, scene)
  365. return
  366. self.agent_monitor_service.update_idle(agent_monitor)
  367. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.IDLE)
  368. self._push_event_for_idle(agent, scene)
  369. def _push_event_for_checkin(self, agent, agent_monitor, phone, scene):
  370. """坐席签入事件推送"""
  371. agent_scene = AgentScene.get_by_code(scene)
  372. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.LOGIN_SUCCESS)
  373. event_data = AgentEventData(agent.saas_id, agent.out_id)
  374. event_data.data = {'eventName': DownEvent.ON_INITAL_SUCCESS.code,
  375. 'ext': {'saasId': agent.saas_id,
  376. 'agentId': agent.out_id,
  377. 'phoneNum': phone.phone_num,
  378. 'phonePwd': phone.phone_pwd,
  379. 'sipServer': phone.sip_server
  380. }
  381. }
  382. return event_data.__dict__
  383. def _push_event_for_checkout(self,agent,scene):
  384. """签出事件推送"""
  385. agent_scene = AgentScene.get_by_code(scene)
  386. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.NO_INIT, '签出成功')
  387. event_data = AgentEventData(agent.saas_id, agent.out_id)
  388. event_data.data = {'eventName': DownEvent.ON_INITAL_FAILURE.code,
  389. 'ext': {'saasId': agent.saas_id,
  390. 'agentId': agent.out_id,
  391. }
  392. }
  393. return event_data.__dict__
  394. def _push_event_for_busy(self,agent,scene):
  395. """置忙事件推送"""
  396. agent_scene = AgentScene.get_by_code(scene)
  397. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_BUSY)
  398. pass
  399. def _push_event_for_idle(self, agent, scene):
  400. """坐席置闲事件推送"""
  401. agent_scene = AgentScene.get_by_code(scene)
  402. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_READY)
  403. class AgentService:
  404. def __init__(self, app):
  405. self.app = app
  406. self.logger = app.logger
  407. self.data_handle_server = DataHandleServer(app)
  408. self.agent_monitor_service = AgentMonitorService(app)
  409. @with_app_context
  410. def get_and_check(self, req: AgentActionRequest):
  411. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  412. if not agent:
  413. return {}
  414. phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
  415. return phone.to_dict()
  416. @with_app_context
  417. def watch_agent_state(self, req: HumanServiceQueryRequest):
  418. pos = HumanServiceMap.query.filter(HumanServiceMap.is_delete == 0, HumanServiceMap.saas_id == req.saas_id,
  419. HumanServiceMap.service_id == req.serviceId).all()
  420. agent_ids = [x.agent_id for x in pos]
  421. monitors = self.agent_monitor_service.detail_monitor_out_ids(req.saas_id, agent_ids)
  422. return monitors
  423. @with_app_context
  424. def add(self, req: AgentRequest):
  425. new_agent_num = self.data_handle_server.get_newest_agent_number(req.saas_id)
  426. agent = Agent(saas_id=req.saas_id, agent_num=new_agent_num, agent_name=req.agent_name, out_id=req.out_id,
  427. agent_pwd=req.agent_password, agent_type=req.agent_type, phone_num=req.phone_number,
  428. distribute=req.distribute, agent_state=req.agent_state)
  429. db.session.add(agent)
  430. db.session.commit()
  431. agent_monitor = AgentMonitor(saas_id=req.saas_id, agent_num=new_agent_num, out_id=req.out_id)
  432. db.session.add(agent_monitor)
  433. db.session.commit()
  434. return new_agent_num
  435. @with_app_context
  436. def update(self, req: AgentRequest):
  437. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id)
  438. if not agent:
  439. return
  440. phone_num = agent.phone_num
  441. state_change = agent.agent_state != req.agent_state
  442. disable = req.agent_state == AgentState.DISABLE
  443. agent.agent_name = req.agent_name
  444. agent.agent_pwd = req.agent_password
  445. agent.agent_type = req.agent_type
  446. agent.phone_num = req.phone_number if not disable else ''
  447. agent.distribute = req.distribute
  448. agent.agent_state = req.agent_state
  449. db.session.commit()
  450. if state_change and disable:
  451. phone = Phone.query.filter(Phone.saas_id == req.saas_id, Phone.phone_num == phone_num).first()
  452. if phone:
  453. db.session.delete(phone)
  454. @with_app_context
  455. def detail(self, saas_id, agent_number):
  456. agent = self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='')
  457. return agent
  458. @with_app_context
  459. def count(self, req: AgentQueryRequest):
  460. cnt = Agent.query.filter(Agent.saas_id == req.saas_id,
  461. or_(Agent.agent_num == req.agent_number,
  462. Agent.agent_name.contains(req.agent_name),
  463. Agent.out_id == req.out_id,
  464. Agent.agent_type == req.agent_type
  465. )
  466. ).count()
  467. return cnt
  468. @with_app_context
  469. def query_page(self, req: AgentQueryRequest):
  470. pagination = Agent.query.filter(Agent.saas_id == req.saas_id,
  471. or_(Agent.agent_num == req.agent_number,
  472. Agent.agent_name.contains(req.agent_name),
  473. Agent.out_id == req.out_id,
  474. Agent.agent_type == req.agent_type
  475. )
  476. ).paginate(req.page, req.size)
  477. # data = {
  478. # "page": pagination.page, # 当前页码
  479. # "pages": pagination.pages, # 总页码
  480. # "has_prev": pagination.has_prev, # 是否有上一页
  481. # "prev_num": pagination.prev_num, # 上一页页码
  482. # "has_next": pagination.has_next, # 是否有下一页
  483. # "next_num": pagination.next_num, # 下一页页码
  484. # "items": [{
  485. # "id": item.id,
  486. # "name": item.name,
  487. # "age": item.age,
  488. # "sex": item.sex,
  489. # "money": item.money,
  490. # } for item in pagination.items]
  491. # }
  492. return pagination
  493. @with_app_context
  494. def delete(self, saas_id, agent_number):
  495. agent = self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='')
  496. if not agent:
  497. return
  498. agent.is_delete = 1
  499. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number)
  500. agent_monitor.is_delete = 1
  501. db.session.commit()
  502. phone = self.data_handle_server.get_phone(saas_id, agent.phone_num)
  503. phone.is_delete = 1
  504. db.session.commit()
  505. class AgentMonitorService:
  506. def __init__(self, app):
  507. self.app = app
  508. self.logger = app.logger
  509. self.data_handle_server = DataHandleServer(app)
  510. @with_app_context
  511. def detail_monitor_out_ids(self, saas_id, out_ids, check_scene=None):
  512. if not out_ids:
  513. return []
  514. agents = Agent.query.filter(Agent.is_delete == 0, Agent.saas_id == saas_id, Agent.out_id.in_(out_ids)).all()
  515. if not agents:
  516. raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
  517. agent_num_map = {x.agent_num: x for x in agents}
  518. agent_nums = [x.agent_num for x in agents]
  519. agent_monitors = AgentMonitor.query.filter(AgentMonitor.is_delete == 0, AgentMonitor.agent_num.in_(agent_nums)).all()
  520. res = []
  521. for agent_monitor in agent_monitors:
  522. agent = agent_num_map.get(agent_monitor.agent_num)
  523. data = AgentMonitorData()
  524. data.saas_id = saas_id
  525. data.agent_num = agent_monitor.agent_num
  526. data.agent_name = agent.agent_name
  527. data.out_id = agent_monitor.out_id
  528. data.check_state = agent_monitor.check_state
  529. data.check_scene = agent_monitor.check_scene
  530. data.online_state = 1 if agent_monitor.check_state == AgentCheck.IN.code else 0
  531. if agent_monitor.check_in_time:
  532. data.check_in_time = agent_monitor.check_in_time.timestamp()
  533. day_start = self._get_day_start()
  534. if agent_monitor.check_in_time.timestamp() > day_start and \
  535. agent_monitor.check_state == AgentCheck.OUT.code:
  536. data.online_state = 2
  537. data.check_out_time = agent_monitor.check_out_time.timestamp() if agent_monitor.check_out_time else None
  538. data.service_state = agent_monitor.service_state
  539. data.busy_time = agent_monitor.busy_time.timestamp() if agent_monitor.busy_time else None
  540. data.idle_time = agent_monitor.idle_time.timestamp() if agent_monitor.idle_time else None
  541. data.call_time = agent_monitor.call_time.timestamp() if agent_monitor.call_time else None
  542. data.hang_time = agent_monitor.hang_time.timestamp() if agent_monitor.hang_time else None
  543. data.heart_state = agent_monitor.heart_state
  544. data.heart_time = agent_monitor.heart_time.timestamp() if agent_monitor.heart_time else None
  545. data.update_time = agent_monitor.update_time.timestamp() if agent_monitor.update_time else None
  546. res.append(data.__dict__)
  547. return res
  548. @with_app_context
  549. def update_checkin(self, agent_monitor,scene=None):
  550. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  551. agent_monitor.check_state = AgentCheck.IN.code
  552. agent_monitor.check_in_time = datetime.now()
  553. agent_monitor.heart_state = AgentHeartState.NORMAL.code
  554. agent_monitor.heart_time = datetime.now()
  555. agent_monitor.check_scene = scene
  556. self.logger.info("update_checkin %s", agent_monitor.check_state)
  557. db.session.commit()
  558. @with_app_context
  559. def update_checkout(self, agent_monitor):
  560. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  561. agent_monitor.check_state = AgentCheck.OUT.code
  562. agent_monitor.check_out_time = datetime.now()
  563. agent_monitor.service_state = AgentServiceState.LOGOUT.code
  564. agent_monitor.heart_state = AgentHeartState.DEFAULT.code
  565. agent_monitor.heart_time = datetime.now()
  566. self.logger.info("update_checkout %s", agent_monitor.check_out_time)
  567. db.session.commit()
  568. @with_app_context
  569. def update_idle(self, agent_monitor):
  570. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  571. agent_monitor.service_state = AgentServiceState.IDLE.code
  572. agent_monitor.idle_time = datetime.now()
  573. db.session.commit()
  574. @with_app_context
  575. def update_busy(self, agent_monitor):
  576. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  577. agent_monitor.service_state = AgentServiceState.BUSY.code
  578. agent_monitor.busy_time = datetime.now()
  579. db.session.commit()
  580. @with_app_context
  581. def update_dialing(self, agent_monitor):
  582. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  583. agent_monitor.service_state = AgentServiceState.DIALING.code
  584. db.session.commit()
  585. @with_app_context
  586. def update_calling(self, agent_monitor):
  587. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  588. agent_monitor.service_state = AgentServiceState.CALLING.code
  589. agent_monitor.call_time = datetime.now()
  590. db.session.commit()
  591. @with_app_context
  592. def update_processing(self, agent_monitor):
  593. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  594. agent_monitor.service_state = AgentServiceState.REPROCESSING.code
  595. agent_monitor.hang_time = datetime.now()
  596. db.session.commit()
  597. @with_app_context
  598. def update_session_id(self, agent_monitor, session_id):
  599. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  600. agent_monitor.session_id = session_id
  601. db.session.commit()
  602. @with_app_context
  603. def update_heart_error(self, agent_monitor):
  604. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  605. agent_monitor.heart_state = AgentHeartState.ABNORMAL.code
  606. agent_monitor.heart_time = datetime.now()
  607. db.session.commit()
  608. @with_app_context
  609. def _get_day_start(self):
  610. today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  611. return int(today_start.timestamp() * 1000) # Return milliseconds
  612. class AgentActionLogService:
  613. def __init__(self, app):
  614. self.app = app
  615. self.logger = app.logger
  616. self.data_handle_server = DataHandleServer(app)
  617. @with_app_context
  618. def insert_check_state(self, agent_monitor, agent_check_enum: AgentCheck, agent_log_enum: AgentLogState):
  619. action_log = AgentActionLog()
  620. action_log.saas_id = agent_monitor.saas_id
  621. action_log.agent_num = agent_monitor.agent_num
  622. action_log.out_id = agent_monitor.out_id
  623. action_log.action_type = 0
  624. action_log.check_state = agent_check_enum.code
  625. action_log.pre_check_state = agent_monitor.check_state
  626. if agent_log_enum:
  627. action_log.event_type = agent_log_enum.code
  628. action_log.event_desc = agent_log_enum.description
  629. now = datetime.now()
  630. pre_date = None
  631. if agent_monitor.check_state == AgentCheck.IN.code:
  632. pre_date = agent_monitor.check_in_time
  633. if agent_monitor.check_state == AgentCheck.OUT.code:
  634. pre_date = agent_monitor.check_out_time
  635. if pre_date is None:
  636. pre_date = agent_monitor.update_time
  637. action_log.check_state_time = now
  638. action_log.pre_check_state_time = pre_date
  639. action_log.check_state_duration = now.timestamp() - pre_date.timestamp()
  640. db.session.add(action_log)
  641. db.session.commit()
  642. @with_app_context
  643. def insert_service_state(self, agent_monitor, agent_service_state: AgentServiceState, agent_log_enum: AgentLogState,
  644. task_id=None, service_id=None):
  645. if agent_monitor.service_state == agent_service_state.code:
  646. self.logger.info("agent action log insert service state same %s %s", agent_monitor.service_state, agent_service_state.code)
  647. action_log = AgentActionLog()
  648. action_log.saas_id = agent_monitor.saas_id
  649. action_log.agent_num = agent_monitor.agent_num
  650. action_log.out_id = agent_monitor.out_id
  651. action_log.action_type = 1
  652. action_log.service_state = agent_service_state.code
  653. action_log.pre_service_state = agent_monitor.service_state
  654. action_log.task_id = task_id
  655. action_log.service_id = service_id
  656. if agent_log_enum:
  657. action_log.event_type = agent_log_enum.code
  658. action_log.event_desc = agent_log_enum.description
  659. now = datetime.now()
  660. pre_date = None
  661. if agent_monitor.service_state == AgentServiceState.IDLE.code:
  662. pre_date = agent_monitor.idle_time
  663. if agent_monitor.service_state == AgentServiceState.BUSY.code:
  664. pre_date = agent_monitor.busy_time
  665. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  666. pre_date = agent_monitor.call_time
  667. if agent_monitor.service_state == AgentServiceState.LOGOUT.code:
  668. pre_date = agent_monitor.check_out_time
  669. if agent_monitor.service_state == AgentServiceState.REPROCESSING.code:
  670. pre_date = agent_monitor.hang_time
  671. action_log.service_state_time = now
  672. if pre_date is None:
  673. pre_date = agent_monitor.update_time
  674. action_log.pre_service_state_time = pre_date
  675. action_log.service_state_duration = now.timestamp() - pre_date.timestamp()
  676. db.session.add(action_log)
  677. db.session.commit()
  678. class AgentStateService:
  679. def __init__(self, app):
  680. self.app = app
  681. self.logger = app.logger
  682. self.redis_handler = RedisHandler()
  683. self.assigned_recycle_millisecond = 30 * 1000
  684. self.state_service_id_data_map = defaultdict(dict)
  685. self.executor = ThreadPoolExecutor(max_workers=10)
  686. self.data_handle_server = DataHandleServer(app)
  687. self.agent_monitor_service = AgentMonitorService(app)
  688. self.agent_actionlog_service = AgentActionLogService(app)
  689. def idle(self, saas_id, agent_id, phone_num):
  690. human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id)
  691. if human_service is None:
  692. self.logger.info(f"agent engine idle not have human service {saas_id} {agent_id}") # 使用print替代log
  693. return
  694. self.idle_hash(saas_id, agent_id, phone_num, human_service.service_id)
  695. def busy(self, saas_id, agent_id, phone_num):
  696. human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id)
  697. if human_service is None:
  698. self.logger.info(f"agent engine busy not hava human service {saas_id} {agent_id}") # 使用print替代log
  699. return
  700. self.busy_hash(saas_id, agent_id, phone_num, human_service.service_id)
  701. def idle_by_human(self, saas_id, agent_id, service_id):
  702. agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id)
  703. if not agent:
  704. return
  705. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num)
  706. if not agent_monitor:
  707. return
  708. if agent_monitor.check_state == AgentCheck.IN.code:
  709. self.agent_monitor_service.update_idle(agent_monitor)
  710. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.ACTIVE_HUMAN_SERVICE)
  711. self.idle_hash(saas_id, agent_id, agent.phone_num, service_id)
  712. def busy_by_human(self, saas_id, service_id, agent_id=None):
  713. agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id)
  714. if not agent:
  715. return
  716. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num)
  717. if not agent_monitor:
  718. return
  719. if agent_monitor.check_state == AgentCheck.IN.code:
  720. self.agent_monitor_service.update_busy(agent_monitor)
  721. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY,
  722. AgentLogState.DEL_HUMAN_SERVICE)
  723. self.busy_hash(saas_id, agent_id, agent.phone_num, service_id)
  724. def checkin(self, saas_id, agent_id, phone_num):
  725. key = self._check_in_key(saas_id)
  726. state_data = AgentStateData()
  727. state_data.status = HumanState.DEFAULT.code
  728. state_data.time = datetime.now().timestamp()
  729. self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
  730. self.redis_handler.redis.expire(key, self._get_expire_time())
  731. def checkout(self, saas_id, agent_id, phone_num):
  732. key = self._check_in_key(saas_id)
  733. self.redis_handler.redis.hdel(key, phone_num)
  734. self.busy(saas_id, agent_id, phone_num)
  735. def assign_agent(self, saas_id, service_id, called=None, ivr_id=None, task_id=None, cbp=None):
  736. choose_phone_num = ''
  737. self.logger.info("assignAgent %s %s %s"% (saas_id, service_id, called))
  738. idle_agents = self.idle_agents(saas_id, service_id)
  739. if len(idle_agents) <= 0:
  740. return choose_phone_num
  741. choose_phone_num = self._choose_max_idle_time(idle_agents)
  742. self.handle_assign_time(saas_id, service_id, choose_phone_num)
  743. return choose_phone_num
  744. def handle_assign_time(self, saas_id, service_id, choose_phone_num):
  745. key = self._key(saas_id, service_id)
  746. cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
  747. if cache_agent_map and choose_phone_num in cache_agent_map:
  748. state_data = cache_agent_map[choose_phone_num]
  749. state_data.assign_time = datetime.now().timestamp() * 1000
  750. self.redis_handler.redis.hset(key, choose_phone_num, state_data.to_json_string())
  751. self.redis_handler.redis.expire(key, self._get_expire_time())
  752. # self.update_report_state(saas_id, service_id)
  753. def idle_agents(self, saas_id, service_id):
  754. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  755. if not cache_agent_list:
  756. return []
  757. agent_str = '\n'.join(["%s-%s-%s-%s"%(x.phone_num, x.status, x.time, x.assign_time) for x in cache_agent_list])
  758. self.logger.info("assignAgent %s %s idleAgents:%s "% (saas_id, service_id, agent_str))
  759. return self.get_idle_agents(cache_agent_list)
  760. def idle_hash(self, saas_id, agent_id, phone_num, service_id):
  761. key = self._key(saas_id, service_id)
  762. state_data = AgentStateData()
  763. # self.logger.info("idle_hash, key=%s, saas_id=%s, phone_num=%s", key, saas_id, phone_num)
  764. cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
  765. if cache_agent_map and phone_num in cache_agent_map:
  766. state_data = cache_agent_map[phone_num]
  767. state_data.status = HumanState.IDLE.code
  768. state_data.time = datetime.now().timestamp() * 1000
  769. self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
  770. self.redis_handler.redis.expire(key, self._get_expire_time())
  771. # self.update_report_state(saas_id, service_id)
  772. self.logger.info("idle_hash, key=%s, saas_id=%s, phone_num=%s, state_data=%s"%(key, saas_id, phone_num, state_data))
  773. def busy_hash(self, saas_id, agent_id, phone_num, service_id):
  774. cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
  775. state_data = cache_agent_map.get(phone_num)
  776. key = self._key(saas_id, service_id)
  777. if state_data is None:
  778. return
  779. state_data.status = HumanState.BUSY.code
  780. self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
  781. self.redis_handler.redis.expire(key, self._get_expire_time())
  782. # self.update_report_state(saas_id, service_id)
  783. def get_cache_agent_map(self, saas_id, service_id):
  784. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  785. # 检查列表是否为空,如果为空返回空字典
  786. if not cache_agent_list:
  787. return {}
  788. # 使用字典推导式将 cache_agent_list 转换为字典
  789. return {agent.phone_num: agent for agent in cache_agent_list}
  790. def get_cache_agent_list(self, saas_id, service_id):
  791. redis_key = self._key(saas_id, service_id)
  792. map_cache_by_key = self.redis_handler.redis.hgetall(redis_key)
  793. self.logger.info("get_cache_agent_list, redis_key=%s, map_cache_by_key=%s"%(redis_key, map_cache_by_key))
  794. if not map_cache_by_key: # 检查字典是否为空
  795. return [] # 返回空列表
  796. free_agents = []
  797. for phone_num, json_value in map_cache_by_key.items():
  798. agent_status_data_dict = json.loads(json_value)
  799. agent_status_data = AgentStateData(**agent_status_data_dict) # 解析 JSON 为 AgentStateData 对象
  800. agent_status_data.phone_num = phone_num.decode('utf-8') # 设置电话号码
  801. free_agents.append(agent_status_data)
  802. return free_agents
  803. def get_idle_agents(self,cache_agent_list):
  804. current_time =int(datetime.now().timestamp() * 1000) # 获取当前时间的毫秒级时间戳
  805. idle_agents = [
  806. agent for agent in cache_agent_list
  807. if agent.status == 1 and (
  808. agent.assign_time == 0 or
  809. agent.assign_time + self.assigned_recycle_millisecond < current_time
  810. )
  811. ]
  812. return idle_agents
  813. def get_agent_service_idle_size(self, saas_id, service_id):
  814. idle_agents_size = 0
  815. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  816. if cache_agent_list: # 检查列表是否非空
  817. idle_agents = self.get_idle_agents(cache_agent_list)
  818. idle_agents_size = len(idle_agents) # 获取空闲代理的数量
  819. return idle_agents_size
  820. def get_agent_service_busy_size(self, saas_id, service_id):
  821. busy_agents_size = 0
  822. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  823. if cache_agent_list: # 检查列表是否非空
  824. idle_agents = self.get_idle_agents(cache_agent_list)
  825. busy_agents = [agent for agent in cache_agent_list if agent not in idle_agents] # 计算忙碌代理
  826. busy_agents_size = len(busy_agents) # 获取忙碌代理的数量
  827. return busy_agents_size
  828. def update_report_state(self, saas_id, service_id):
  829. key = self._key(saas_id, service_id)
  830. # data_map 这个地方有疑问
  831. data_map = self.state_service_id_data_map[key]
  832. idle = HumanState.IDLE
  833. if idle.value not in data_map:
  834. data_map[idle.code] = threading.Lock()
  835. self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, idle)
  836. # data_map[idle.code] = self.do_report_real_time_human_service_id(saas_id, service_id, idle)
  837. busy = HumanState.BUSY
  838. if busy.value not in data_map:
  839. data_map[busy.code] = threading.Lock()
  840. self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, busy)
  841. # data_map[busy.code] = self.do_report_real_time_human_service_id(saas_id, service_id, busy)
  842. def do_report_real_time_human_service_id(self, saas_id, service_id, human_state):
  843. name = "cti_center_real_time_human_service_state"
  844. tag_list = {
  845. "vcc_id": saas_id,
  846. "service_id": service_id,
  847. "state": human_state.code,
  848. }
  849. if human_state == HumanState.IDLE:
  850. # meter_registry 这块疑问
  851. self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id))
  852. elif human_state == HumanState.BUSY:
  853. self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id))
  854. return 0
  855. def _check_in_key(self, saas_id):
  856. return "CTI:%s:HUMAN:AGENT"%(saas_id.upper())
  857. def _key(self, saas_id, service_id):
  858. return "CTI:%s:HUMAN:%s"%(saas_id.upper(), service_id)
  859. def _get_expire_time(self):
  860. now = datetime.now()
  861. end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=0)
  862. expire_time = (end_of_day - now).total_seconds() * 1000 # Convert to milliseconds
  863. return int(expire_time)
  864. def _choose_max_idle_time(self, idle_agents: List[AgentStateData]) -> str:
  865. idle_agents = sorted(idle_agents, key=lambda agent: agent.assign_time, reverse=False)
  866. return idle_agents[0].phone_num