agent.py 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import threading
  4. import time
  5. import traceback
  6. from collections import defaultdict
  7. from concurrent.futures import ThreadPoolExecutor
  8. from typing import List
  9. from apscheduler.schedulers.background import BackgroundScheduler
  10. from sqlalchemy import or_
  11. from src.core.callcenter import registry
  12. import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
  13. from src.core import with_app_context
  14. from src.core.callcenter.api import AgentActionRequest, AgentQueryRequest, AgentRequest, AgentEventData, \
  15. AgentStateData, HumanServiceQueryRequest, AgentMonitorData, CallInfo, DeviceInfo, AgentDelayStateData
  16. from src.core.callcenter.cache import Cache
  17. from src.core.callcenter.constant import CENTER_AGENT_HEARTBEAT, SAAS_ID, CENTER_AGENT_LIVE_CNT
  18. from src.core.callcenter.dao import *
  19. from src.core.callcenter.data_handler import DataHandleServer
  20. from src.core.callcenter.enumeration import AgentState, AgentCheck, AgentHeartState, AgentServiceState, AgentLogState, \
  21. AgentScene, BizErrorCode, WorkStatus, DownEvent, HumanState, DeviceType, ServiceDirect
  22. from src.core.callcenter.esl.constant.event_names import *
  23. from src.core.callcenter.exception import BizException
  24. from src.core.callcenter.push import PushHandler
  25. from src.core.datasource import RedisHandler
  26. class AgentEventService:
  27. def __init__(self, app):
  28. self.app = app
  29. self.logger = app.logger
  30. self.cache = Cache(app)
  31. self.push_handler = PushHandler(app.logger)
  32. self.data_handle_server = DataHandleServer(app)
  33. self.agent_monitor_service = AgentMonitorService(app)
  34. self.agent_state_service = AgentStateService(app)
  35. self.agent_actionlog_service = AgentActionLogService(app)
  36. def delay_state(self, state_data: AgentDelayStateData):
  37. agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num)
  38. if not agent:
  39. return
  40. agent_monitor = self.data_handle_server.get_agent_monitor(state_data.saas_id, state_data.agent_num)
  41. if not agent_monitor:
  42. return
  43. #TODO 非最新通话的延迟事件,忽略.
  44. agent_scene = AgentScene.get_by_code(state_data.scene)
  45. self.logger.info("agent event delay state %s %s %s %s", state_data.saas_id, state_data.agent_num, state_data.service_state, agent_monitor.service_state)
  46. if AgentServiceState.REPROCESSING.code == state_data.service_state and AgentServiceState.REPROCESSING.code == agent_monitor.service_state:
  47. self.reprocessing_idle(state_data)
  48. if AgentServiceState.DIALING.code == state_data.service_state and AgentServiceState.DIALING.code == agent_monitor.service_state:
  49. if self.cache.get_call_is_answer(state_data.saas_id, state_data.agent_num):
  50. return
  51. self.agent_monitor_service.update_idle(agent_monitor)
  52. self.push_handler.push_on_call_end(state_data.saas_id, state_data.flow_id, state_data.agent_num, agent_scene, ServiceDirect.MANUAL_CALL.service_direct, '0')
  53. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_REPROCESSING)
  54. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_IDLE)
  55. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.BIZ_DIALING_IDLE)
  56. if AgentServiceState.HANGING.code == state_data.service_state \
  57. and (AgentServiceState.DIALING.code == agent_monitor.service_state
  58. or AgentServiceState.CALLING.code == agent_monitor.service_state):
  59. self.agent_monitor_service.update_idle(agent_monitor)
  60. if AgentServiceState.DIALING.code == agent_monitor.service_state:
  61. self.push_handler.push_on_call_end(state_data.saas_id, state_data.flow_id, state_data.agent_num, agent_scene, ServiceDirect.MANUAL_CALL.service_direct, '0')
  62. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_REPROCESSING)
  63. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_IDLE)
  64. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.MANUAL_HANG_UP)
  65. def agent_event_channel(self, event, call_info: CallInfo, device_info: DeviceInfo):
  66. event_name = EslEventUtil.getEventName(event)
  67. saas_id = call_info.saas_id if call_info else None
  68. flow_id = call_info.cti_flow_id if call_info else None
  69. call_id = call_info.call_id if call_info else None
  70. device_id = device_info.device_id if device_info else None
  71. agent_num = device_info.agent_key if device_info else None
  72. caller = device_info.caller if device_info else None
  73. called = device_info.called if device_info else None
  74. is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False
  75. start_time = time.time()
  76. try:
  77. self.logger.info('agent_event_channel, event_name=%s, agent_num=%s, device_id=%s, is_agent=%s', event_name, agent_num, device_id, is_agent)
  78. agent = self.data_handle_server.get_agent(saas_id, agent_num)
  79. if not agent:
  80. # self.logger.warn("event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json')))
  81. return
  82. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
  83. if not agent_monitor:
  84. # self.logger.warn("event service channel agentMonitor is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json')))
  85. return
  86. # 信道发起事件,触发完成发起(或桥)&& 坐席侧
  87. if CHANNEL_ORIGINATE == event_name and is_agent:
  88. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_RINGING,phone=call_info.caller)
  89. # 进度事件,外呼时对方提醒。或者入呼时提醒 && 坐席侧
  90. if CHANNEL_PROGRESS == event_name and is_agent:
  91. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.DIALING)
  92. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING)
  93. # 媒体进度事件,外呼时对方提醒。或者入呼时提醒 && 用户侧
  94. if CHANNEL_PROGRESS_MEDIA == event_name and not is_agent:
  95. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING_RINGING)
  96. #应答
  97. if CHANNEL_ANSWER == event_name:
  98. if call_id:
  99. if self.cache.get_call_is_end(call_id):
  100. # self.logger.warn("event service channel call is end {} {} {} {} {} {}", saas_id, event_name, caller, called, call_id, json.dumps(event.serialize('json')))
  101. return
  102. self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num)
  103. self.logger.info('agent接通:%s',agent )
  104. self.data_handle_server.update_record(call_id, status=1, user_id=agent.user_id,user_name=agent.agent_name)
  105. if is_agent:
  106. # 坐席接起
  107. self.cache.set_call_is_answer(saas_id, flow_id)
  108. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.ANSWER_COMPENSATE)
  109. self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING)
  110. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON)
  111. else:
  112. # 用户侧接起
  113. self.agent_monitor_service.update_calling(agent_monitor)
  114. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_ANSWER_OUTGOING)
  115. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.CALLING)
  116. #挂断
  117. if CHANNEL_HANGUP == event_name:
  118. # 坐席侧挂断
  119. if is_agent:
  120. if call_id:
  121. self.cache.set_call_is_end(call_id)
  122. self.agent_monitor_service.update_processing(agent_monitor)
  123. self.logger.info('挂断更新:%s', agent)
  124. # self.data_handle_server.update_record(call_id, user_id=agent.user_id, user_name=agent.agent_name)
  125. self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.MANUAL))
  126. self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, '0')
  127. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_HANG_REPROCESSING)
  128. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.REPROCESSING)
  129. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING, AgentLogState.CHANNEL_HANG_UP)
  130. # self.data_handle_server.update_record(call_id, time_end=datetime.now())
  131. # 同步处理后处理置闲
  132. # reprocessingIdle(statusDto);
  133. # agentProducer.pushDelayedStatus(statusDto, reprocessingTimeout);
  134. if (CHANNEL_BRIDGE == event_name or PLAYBACK_START == event_name) and is_agent:
  135. self.push_handler.push_on_ring_start(saas_id, flow_id, agent_num, AgentScene.MANUAL, call_id)
  136. # self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_ANSWER_OUTGOING)
  137. # self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.CALLING)
  138. if DETECTED_TONE == event_name and not is_agent:
  139. self.push_handler.push_on_detected_tone(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id)
  140. if (CHANNEL_UNBRIDGE == event_name or PLAYBACK_STOP == event_name) and is_agent:
  141. self.push_handler.push_on_ring_end(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id)
  142. except:
  143. traceback.print_exc()
  144. finally:
  145. latency = (time.time() - start_time)
  146. registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
  147. def bot_event_channel(self, event, call_info, device_info):
  148. event_name = EslEventUtil.getEventName(event)
  149. saas_id = call_info.saas_id if call_info else None
  150. flow_id = call_info.cti_flow_id if call_info else None
  151. call_id = call_info.call_id if call_info else None
  152. device_id = device_info.device_id if device_info else None
  153. agent_num = device_info.agent_key if device_info else None
  154. is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False
  155. caller = (device_info.called if is_agent else device_info.caller) if device_info else None
  156. called = (device_info.caller if is_agent else device_info.called) if device_info else None
  157. human_service_id = '00000000000000000'
  158. start_time = time.time()
  159. try:
  160. self.logger.info('bot_event_channel, event_name=%s, call_id=%s, device_id=%s, is_agent=%s, agent_num=%s', event_name, call_id, device_id, is_agent, agent_num)
  161. agent = self.data_handle_server.get_agent(saas_id, agent_num)
  162. if not agent:
  163. # self.logger.warn("bot event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called,
  164. # json.dumps(event.serialize('json')))
  165. return
  166. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
  167. if not agent_monitor:
  168. # self.logger.warn("bot event service channel agentMonitor is null %s %s %s %s %s", saas_id, event_name, caller,
  169. # called, json.dumps(event.serialize('json')))
  170. return
  171. # 信道发起事件,触发完成发起(或桥)&& 坐席侧
  172. if CHANNEL_ORIGINATE == event_name and is_agent:
  173. self.push_handler.push_on_call_ring(saas_id, flow_id, agent_num, AgentScene.ROBOT, call_id, ServiceDirect.ROBOT_CALL.service_direct, called, caller, human_service_id)
  174. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_RINGING,phone=call_info.caller)
  175. self.data_handle_server.update_record(call_id, transfer_user_id=agent.user_id,transfer_user_name=agent.agent_name) #转接给客服以后更新转接人
  176. if CHANNEL_ANSWER == event_name:
  177. self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num)
  178. # self.data_handle_server.update_record(call_id, status=1, transfer_user_id=agent.user_id,transfer_user_name=agent.agent_name)
  179. if is_agent:
  180. self.agent_monitor_service.update_calling(agent_monitor)
  181. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! internal")
  182. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.CALLING)
  183. self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING)
  184. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON, service_id=human_service_id)
  185. else:
  186. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! external")
  187. if CHANNEL_HANGUP == event_name and is_agent:
  188. self.agent_monitor_service.update_processing(agent_monitor)
  189. self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.ROBOT))
  190. self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, "0")
  191. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_HANG_REPROCESSING)
  192. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.REPROCESSING)
  193. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING,
  194. AgentLogState.CHANNEL_HANG_UP, service_id=human_service_id)
  195. # self.data_handle_server.update_record(call_id, time_end=datetime.now())
  196. except:
  197. traceback.print_exc()
  198. finally:
  199. latency = (time.time() - start_time)
  200. registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
  201. def reprocessing_idle(self, state_data: AgentDelayStateData):
  202. agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num)
  203. if not agent:
  204. return
  205. agent_monitor = self.data_handle_server.get_agent_monitor(state_data.saas_id, state_data.agent_num)
  206. if not agent_monitor:
  207. return
  208. self.logger.info('reprocessing_idle %s %s %s', state_data.saas_id, state_data.agent_num, agent_monitor)
  209. self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
  210. self.logger.info('reprocessing_idle_end')
  211. self.agent_monitor_service.update_idle(agent_monitor)
  212. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, "", state_data.scene, WorkStatus.AGENT_HANG_IDLE)
  213. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.REPROCESSING_IDLE)
  214. class AgentOperService:
  215. def __init__(self, app):
  216. self.app = app
  217. self.logger = app.logger
  218. self.redis_handler = RedisHandler()
  219. self.push_handler = PushHandler(app.logger)
  220. self.data_handle_server = DataHandleServer(app)
  221. self.agent_monitor_service = AgentMonitorService(app)
  222. self.agent_actionlog_service = AgentActionLogService(app)
  223. self.agent_state_service = AgentStateService(app)
  224. self.agent_heartbeat_expire = 30
  225. self.agent_serial_live_expire = 60*10
  226. self.agent_heartbeat_job_scheduler = BackgroundScheduler()
  227. self.agent_heartbeat_job_scheduler.add_job(self.agent_heartbeat_daemon, 'interval', seconds=1, max_instances=1, name='agent_heartbeat_daemon')
  228. self.agent_heartbeat_job_scheduler.start()
  229. def agent_heartbeat_daemon(self):
  230. def check_out_daemon(_name, key, value):
  231. try:
  232. sec = datetime.now().timestamp() - float(value)
  233. if sec > self.agent_heartbeat_expire:
  234. self.redis_handler.redis.hdel(_name, key)
  235. self.logger.error("agent heartbeat expired, will checkout %s %s", key, value)
  236. # self.checkout(AgentActionRequest(saas_id=SAAS_ID, agent_id=key, agent_number=key))
  237. except:
  238. traceback.print_exc()
  239. def check_agent_live_daemon(_members):
  240. key = CENTER_AGENT_LIVE_CNT % SAAS_ID
  241. pre_val = self.redis_handler.redis.get(key)
  242. if not pre_val:
  243. if not _members or len(_members) == 0:
  244. value = datetime.now().timestamp()
  245. self.redis_handler.redis.set(key, value, ex=60*60, nx=True)
  246. else:
  247. diff = datetime.now().timestamp() - float(pre_val)
  248. # self.logger.info('check_agent_live_daemon, members=%s, diff=%s, pre_val=%s', (len(_members) if _members else 0), diff, pre_val)
  249. if diff > self.agent_serial_live_expire:
  250. self.logger.warn('WARING::live agent count less than 1 serial ten minutes')
  251. self.redis_handler.redis.delete(key)
  252. if _members and len(_members) > 0:
  253. self.redis_handler.redis.delete(key)
  254. name = CENTER_AGENT_HEARTBEAT % SAAS_ID
  255. members = self.redis_handler.redis.hgetall(name)
  256. check_agent_live_daemon(members)
  257. if not members:
  258. return
  259. registry.MANUAL_AGENT_LIVES.set(len(members))
  260. for k,v in members.items():
  261. check_out_daemon(name, k, v)
  262. def __del__(self):
  263. self.agent_heartbeat_job_scheduler.shutdown()
  264. @with_app_context
  265. def enable(self, req: AgentActionRequest):
  266. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id)
  267. if agent.agent_state == AgentState.ENABLE.code:
  268. return
  269. agent.agent_state = AgentState.ENABLE.code
  270. db.session.commit()
  271. @with_app_context
  272. def disable(self, req: AgentActionRequest):
  273. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  274. if agent.agent_state == AgentState.DISABLE.code:
  275. return
  276. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_number)
  277. if agent_monitor.check_state == AgentCheck.IN.code and \
  278. agent_monitor.service_state == AgentServiceState.CALLING.code:
  279. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  280. agent.phone_num = ''
  281. agent.agent_state = AgentState.DISABLE.code
  282. db.session.commit()
  283. phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
  284. phone.is_delete = 1
  285. db.session.commit()
  286. @with_app_context
  287. def checkin(self, req: AgentActionRequest):
  288. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  289. if not agent or agent.agent_state == AgentState.DISABLE.code:
  290. raise BizException(BizErrorCode.ERROR_NOT_FOLLOW_CHECK_IN)
  291. phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
  292. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  293. # agent_monitor.check_scene = req.scene
  294. self.agent_monitor_service.update_checkin(agent_monitor,req.scene)
  295. self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.IN, AgentLogState.CHECKIN)
  296. self.agent_state_service.checkin(agent.saas_id, agent.out_id, agent.phone_num)
  297. print("checkin", agent_monitor,agent_monitor.check_state)
  298. if req.scene == AgentScene.MANUAL.code:
  299. # 如果是手动外呼增加置忙
  300. self._handle_idle(req.scene, agent)
  301. return self._push_event_for_checkin(agent, agent_monitor, phone, req.scene)
  302. @with_app_context
  303. def checkout(self, req: AgentActionRequest):
  304. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  305. if not agent or agent.agent_state == AgentState.DISABLE.code:
  306. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  307. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  308. if not agent_monitor or agent_monitor.service_state == AgentServiceState.CALLING.code:
  309. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  310. if agent_monitor.check_state == AgentCheck.OUT.code:
  311. return self._push_event_for_checkout(agent, req.scene)
  312. self.agent_monitor_service.update_checkout(agent_monitor)
  313. self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.OUT, AgentLogState.CHECKOUT)
  314. self.agent_state_service.checkout(agent.saas_id, agent.out_id, agent.phone_num)
  315. return self._push_event_for_checkout(agent, req.scene)
  316. @with_app_context
  317. def busy(self, req: AgentActionRequest):
  318. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  319. if not agent or agent.agent_state == AgentState.DISABLE.code:
  320. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  321. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  322. if not agent_monitor or agent_monitor.check_state == AgentCheck.OUT.code:
  323. raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
  324. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  325. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  326. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  327. if agent_monitor.service_state == AgentServiceState.BUSY.code:
  328. self._push_event_for_busy(agent, req.scene)
  329. return
  330. self.agent_monitor_service.update_busy(agent_monitor)
  331. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY, AgentLogState.BUSY)
  332. self._push_event_for_busy(agent, req.scene)
  333. @with_app_context
  334. def idle(self, req: AgentActionRequest):
  335. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  336. if not agent or agent.agent_state == AgentState.DISABLE.code:
  337. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  338. self._handle_idle(req.scene, agent)
  339. @with_app_context
  340. def assign(self, req: AgentActionRequest):
  341. return self.agent_state_service.assign_agent(req.saas_id, req.service_id)
  342. def idle_agent_exist(self, request: AgentActionRequest):
  343. pass
  344. @with_app_context
  345. def agent_state(self,req: AgentActionRequest):
  346. # agent = _get_agent(req.saas_id, req.agent_id)
  347. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, req.agent_id)
  348. return agent_monitor.service_state
  349. @with_app_context
  350. def turn_on(self, req: AgentActionRequest):
  351. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  352. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  353. agent_scene = AgentScene.get_by_code(req.scene)
  354. if not agent_monitor:
  355. raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
  356. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  357. self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
  358. return
  359. self.agent_monitor_service.update_calling(agent_monitor)
  360. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  361. self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
  362. @with_app_context
  363. def _handle_idle(self, scene, agent):
  364. agent_monitor = self.data_handle_server.get_agent_monitor(agent.saas_id, agent.agent_num)
  365. print('agent_monitor:', agent_monitor, agent)
  366. if agent_monitor.check_state == AgentCheck.OUT.code:
  367. raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
  368. if agent_monitor.service_state == AgentServiceState.CALLING.code or agent_monitor.service_state == AgentServiceState.DIALING.code:
  369. raise BizException(BizErrorCode.AGENT_CALLING_NOT_HANG)
  370. if scene == AgentScene.ROBOT.code:
  371. self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
  372. else:
  373. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  374. if agent_monitor.service_state == AgentServiceState.IDLE.code:
  375. self._push_event_for_idle(agent, scene)
  376. return
  377. self.agent_monitor_service.update_idle(agent_monitor)
  378. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.IDLE)
  379. self._push_event_for_idle(agent, scene)
  380. def _push_event_for_checkin(self, agent, agent_monitor, phone, scene):
  381. """坐席签入事件推送"""
  382. agent_scene = AgentScene.get_by_code(scene)
  383. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.LOGIN_SUCCESS)
  384. event_data = AgentEventData(agent.saas_id, agent.out_id)
  385. event_data.data = {'eventName': DownEvent.ON_INITAL_SUCCESS.code,
  386. 'ext': {'saasId': agent.saas_id,
  387. 'agentId': agent.out_id,
  388. 'phoneNum': phone.phone_num,
  389. 'phonePwd': phone.phone_pwd,
  390. 'sipServer': phone.sip_server
  391. }
  392. }
  393. return event_data.__dict__
  394. def _push_event_for_checkout(self,agent,scene):
  395. """签出事件推送"""
  396. agent_scene = AgentScene.get_by_code(scene)
  397. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.NO_INIT, '签出成功')
  398. event_data = AgentEventData(agent.saas_id, agent.out_id)
  399. event_data.data = {'eventName': DownEvent.ON_INITAL_FAILURE.code,
  400. 'ext': {'saasId': agent.saas_id,
  401. 'agentId': agent.out_id,
  402. }
  403. }
  404. return event_data.__dict__
  405. def _push_event_for_busy(self,agent,scene):
  406. """置忙事件推送"""
  407. agent_scene = AgentScene.get_by_code(scene)
  408. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_BUSY)
  409. pass
  410. def _push_event_for_idle(self, agent, scene):
  411. """坐席置闲事件推送"""
  412. agent_scene = AgentScene.get_by_code(scene)
  413. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_READY)
  414. class AgentService:
  415. def __init__(self, app):
  416. self.app = app
  417. self.logger = app.logger
  418. self.data_handle_server = DataHandleServer(app)
  419. self.agent_monitor_service = AgentMonitorService(app)
  420. @with_app_context
  421. def get_and_check(self, req: AgentActionRequest):
  422. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  423. if not agent:
  424. return {}
  425. phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
  426. return phone.to_dict()
  427. @with_app_context
  428. def watch_agent_state(self, req: HumanServiceQueryRequest):
  429. pos = HumanServiceMap.query.filter(HumanServiceMap.is_delete == 0, HumanServiceMap.saas_id == req.saas_id,
  430. HumanServiceMap.service_id == req.serviceId).all()
  431. agent_ids = [x.agent_id for x in pos]
  432. monitors = self.agent_monitor_service.detail_monitor_out_ids(req.saas_id, agent_ids)
  433. return monitors
  434. @with_app_context
  435. def add(self, req: AgentRequest):
  436. new_agent_num = self.data_handle_server.get_newest_agent_number(req.saas_id)
  437. agent = Agent(saas_id=req.saas_id, agent_num=new_agent_num, agent_name=req.agent_name, out_id=req.out_id,
  438. agent_pwd=req.agent_password, agent_type=req.agent_type, phone_num=req.phone_number,
  439. distribute=req.distribute, agent_state=req.agent_state)
  440. db.session.add(agent)
  441. db.session.commit()
  442. agent_monitor = AgentMonitor(saas_id=req.saas_id, agent_num=new_agent_num, out_id=req.out_id)
  443. db.session.add(agent_monitor)
  444. db.session.commit()
  445. return new_agent_num
  @with_app_context
  def update(self, req: AgentRequest):
      """Apply the editable fields of ``req`` to an existing agent.

      Does nothing when the agent cannot be found. When the request disables
      the agent, its ``phone_num`` is blanked; if this request is the one that
      changes the state to disabled, the ``Phone`` row registered under the
      agent's previous number is deleted as well.
      """
      agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id)
      if not agent:
          return
      # Capture the pre-update values before they are overwritten below.
      previous_phone_num = agent.phone_num
      disable = req.agent_state == AgentState.DISABLE
      newly_disabled = disable and agent.agent_state != req.agent_state

      agent.agent_name = req.agent_name
      agent.agent_pwd = req.agent_password
      agent.agent_type = req.agent_type
      agent.phone_num = '' if disable else req.phone_number
      agent.distribute = req.distribute
      agent.agent_state = req.agent_state

      if newly_disabled:
          phone = Phone.query.filter(
              Phone.saas_id == req.saas_id,
              Phone.phone_num == previous_phone_num,
          ).first()
          if phone:
              db.session.delete(phone)
      # Single commit at the end: previously the phone deletion was issued
      # after the only commit and therefore never persisted.
      db.session.commit()
  @with_app_context
  def detail(self, saas_id, agent_number):
      """Return whatever ``data_handle_server.get_agent`` finds for the agent number.

      The lookup is made with an empty ``out_id``.
      """
      return self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='')
  @with_app_context
  def count(self, req: AgentQueryRequest):
      """Count agents of ``req.saas_id`` that match any of the query criteria.

      A row matches when its number, out_id or type equals the requested
      value, or its name contains ``req.agent_name``.
      """
      any_criterion = or_(
          Agent.agent_num == req.agent_number,
          Agent.agent_name.contains(req.agent_name),
          Agent.out_id == req.out_id,
          Agent.agent_type == req.agent_type,
      )
      matching = Agent.query.filter(Agent.saas_id == req.saas_id, any_criterion)
      return matching.count()
  @with_app_context
  def query_page(self, req: AgentQueryRequest):
      """Return one page of agents of ``req.saas_id`` matching any query criterion.

      A row matches when its number, out_id or type equals the requested
      value, or its name contains ``req.agent_name``.

      Returns:
          The pagination object produced by ``paginate`` for page
          ``req.page`` with ``req.size`` rows per page.
      """
      any_criterion = or_(
          Agent.agent_num == req.agent_number,
          Agent.agent_name.contains(req.agent_name),
          Agent.out_id == req.out_id,
          Agent.agent_type == req.agent_type,
      )
      query = Agent.query.filter(Agent.saas_id == req.saas_id, any_criterion)
      # Keyword arguments work on both Flask-SQLAlchemy 2.x and 3.x; the
      # positional form is rejected by 3.x, where they are keyword-only.
      return query.paginate(page=req.page, per_page=req.size)
  @with_app_context
  def delete(self, saas_id, agent_number):
      """Soft-delete an agent and, when present, its monitor and phone rows.

      Each record is flagged with ``is_delete = 1``. Does nothing when the
      agent cannot be found.
      """
      agent = self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='')
      if not agent:
          return
      agent.is_delete = 1

      # The related rows may be absent (a disabled agent has no phone, for
      # example); skip them instead of failing part-way through.
      agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number)
      if agent_monitor:
          agent_monitor.is_delete = 1
      phone = self.data_handle_server.get_phone(saas_id, agent.phone_num)
      if phone:
          phone.is_delete = 1

      # One commit so the three flags are written together.
      db.session.commit()
  class AgentMonitorService:
      """Reads ``AgentMonitor`` rows for display and records agent state transitions."""

      def __init__(self, app):
          self.app = app
          self.logger = app.logger
          self.data_handle_server = DataHandleServer(app)

      @staticmethod
      def _to_timestamp(moment):
          """Return ``moment.timestamp()`` or ``None`` when ``moment`` is unset."""
          return moment.timestamp() if moment else None

      def _reload(self, agent_monitor):
          """Re-fetch the monitor row by primary key through ``db.session``."""
          return db.session.query(AgentMonitor).get(agent_monitor.id)

      @with_app_context
      def detail_monitor_out_ids(self, saas_id, out_ids, check_scene=None):
          """Return monitor snapshots for the agents identified by ``out_ids``.

          Args:
              saas_id: Tenant whose agents are queried.
              out_ids: ``Agent.out_id`` values to look up.
              check_scene: Accepted for interface compatibility; not used.

          Returns:
              A list with one ``AgentMonitorData.__dict__`` per matching,
              non-deleted monitor row; ``[]`` when ``out_ids`` is empty.
              ``online_state`` is 1 while checked in, 2 when checked out after
              a check-in made today, and 0 otherwise. Time fields are epoch
              seconds.

          Raises:
              BizException: ``RECORD_NOT_EXIST_ERROR`` when no agent matches.
          """
          if not out_ids:
              return []
          agents = Agent.query.filter(
              Agent.is_delete == 0,
              Agent.saas_id == saas_id,
              Agent.out_id.in_(out_ids),
          ).all()
          if not agents:
              raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)

          agent_by_num = {agent.agent_num: agent for agent in agents}
          # Restrict to the same tenant: agent numbers are allocated per
          # saas_id, so the number alone could match another tenant's row.
          agent_monitors = AgentMonitor.query.filter(
              AgentMonitor.is_delete == 0,
              AgentMonitor.saas_id == saas_id,
              AgentMonitor.agent_num.in_(list(agent_by_num)),
          ).all()

          day_start_ms = self._get_day_start()
          res = []
          for agent_monitor in agent_monitors:
              agent = agent_by_num.get(agent_monitor.agent_num)
              checked_in = agent_monitor.check_state == AgentCheck.IN.code

              data = AgentMonitorData()
              data.saas_id = saas_id
              data.agent_num = agent_monitor.agent_num
              data.agent_name = agent.agent_name
              data.out_id = agent_monitor.out_id
              data.check_state = agent_monitor.check_state
              data.check_scene = agent_monitor.check_scene
              data.online_state = 1 if checked_in else 0
              if agent_monitor.check_in_time:
                  check_in_seconds = agent_monitor.check_in_time.timestamp()
                  data.check_in_time = check_in_seconds
                  # _get_day_start() is in milliseconds; convert before
                  # comparing, otherwise the test can never succeed.
                  if check_in_seconds * 1000 > day_start_ms and \
                          agent_monitor.check_state == AgentCheck.OUT.code:
                      data.online_state = 2
              data.check_out_time = self._to_timestamp(agent_monitor.check_out_time)
              data.service_state = agent_monitor.service_state
              data.busy_time = self._to_timestamp(agent_monitor.busy_time)
              data.idle_time = self._to_timestamp(agent_monitor.idle_time)
              data.call_time = self._to_timestamp(agent_monitor.call_time)
              data.hang_time = self._to_timestamp(agent_monitor.hang_time)
              data.heart_state = agent_monitor.heart_state
              data.heart_time = self._to_timestamp(agent_monitor.heart_time)
              data.update_time = self._to_timestamp(agent_monitor.update_time)
              res.append(data.__dict__)
          return res

      @with_app_context
      def update_checkin(self, agent_monitor, scene=None):
          """Mark the agent as checked in with a normal heartbeat and store ``scene``."""
          agent_monitor = self._reload(agent_monitor)
          now = datetime.now()
          agent_monitor.check_state = AgentCheck.IN.code
          agent_monitor.check_in_time = now
          agent_monitor.heart_state = AgentHeartState.NORMAL.code
          agent_monitor.heart_time = now
          agent_monitor.check_scene = scene
          self.logger.info("update_checkin %s", agent_monitor.check_state)
          db.session.commit()

      @with_app_context
      def update_checkout(self, agent_monitor):
          """Mark the agent as checked out, logged out and with a default heartbeat state."""
          agent_monitor = self._reload(agent_monitor)
          now = datetime.now()
          agent_monitor.check_state = AgentCheck.OUT.code
          agent_monitor.check_out_time = now
          agent_monitor.service_state = AgentServiceState.LOGOUT.code
          agent_monitor.heart_state = AgentHeartState.DEFAULT.code
          agent_monitor.heart_time = now
          self.logger.info("update_checkout %s", agent_monitor.check_out_time)
          db.session.commit()

      @with_app_context
      def update_idle(self, agent_monitor):
          """Set the service state to IDLE and stamp ``idle_time``."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.service_state = AgentServiceState.IDLE.code
          agent_monitor.idle_time = datetime.now()
          db.session.commit()

      @with_app_context
      def update_busy(self, agent_monitor):
          """Set the service state to BUSY and stamp ``busy_time``."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.service_state = AgentServiceState.BUSY.code
          agent_monitor.busy_time = datetime.now()
          db.session.commit()

      @with_app_context
      def update_dialing(self, agent_monitor):
          """Set the service state to DIALING (no timestamp is recorded)."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.service_state = AgentServiceState.DIALING.code
          db.session.commit()

      @with_app_context
      def update_calling(self, agent_monitor):
          """Set the service state to CALLING and stamp ``call_time``."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.service_state = AgentServiceState.CALLING.code
          agent_monitor.call_time = datetime.now()
          db.session.commit()

      @with_app_context
      def update_processing(self, agent_monitor):
          """Set the service state to REPROCESSING and stamp ``hang_time``."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.service_state = AgentServiceState.REPROCESSING.code
          agent_monitor.hang_time = datetime.now()
          db.session.commit()

      @with_app_context
      def update_session_id(self, agent_monitor, session_id):
          """Store ``session_id`` on the monitor row."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.session_id = session_id
          db.session.commit()

      @with_app_context
      def update_heart_error(self, agent_monitor):
          """Flag the heartbeat as ABNORMAL and stamp ``heart_time``."""
          agent_monitor = self._reload(agent_monitor)
          agent_monitor.heart_state = AgentHeartState.ABNORMAL.code
          agent_monitor.heart_time = datetime.now()
          db.session.commit()

      @with_app_context
      def _get_day_start(self):
          """Return today's local midnight as an epoch timestamp in milliseconds."""
          today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
          return int(today_start.timestamp() * 1000)
  class AgentActionLogService:
      """Writes ``AgentActionLog`` rows describing agent state transitions."""

      def __init__(self, app):
          self.app = app
          self.logger = app.logger
          self.data_handle_server = DataHandleServer(app)

      @staticmethod
      def _previous_moment(agent_monitor, attr_name, now):
          """Pick the timestamp at which the previous state began.

          Tries the state-specific attribute first, then the monitor's
          ``update_time``, and finally ``now`` so that a row with no usable
          timestamp yields a zero duration instead of an ``AttributeError``.
          """
          moment = getattr(agent_monitor, attr_name) if attr_name else None
          if moment is None:
              moment = agent_monitor.update_time
          if moment is None:
              moment = now
          return moment

      @with_app_context
      def insert_check_state(self, agent_monitor, agent_check_enum: AgentCheck, agent_log_enum: AgentLogState):
          """Log a check-in/check-out transition (``action_type`` 0).

          Args:
              agent_monitor: Monitor row holding the state *before* the change.
              agent_check_enum: The new check state.
              agent_log_enum: Optional event whose code/description is stored.
          """
          action_log = AgentActionLog()
          action_log.saas_id = agent_monitor.saas_id
          action_log.agent_num = agent_monitor.agent_num
          action_log.out_id = agent_monitor.out_id
          action_log.action_type = 0
          action_log.check_state = agent_check_enum.code
          action_log.pre_check_state = agent_monitor.check_state
          if agent_log_enum:
              action_log.event_type = agent_log_enum.code
              action_log.event_desc = agent_log_enum.description

          # Which monitor timestamp marks the start of the previous state.
          started_attr = {
              AgentCheck.IN.code: "check_in_time",
              AgentCheck.OUT.code: "check_out_time",
          }.get(agent_monitor.check_state)
          now = datetime.now()
          pre_date = self._previous_moment(agent_monitor, started_attr, now)

          action_log.check_state_time = now
          action_log.pre_check_state_time = pre_date
          action_log.check_state_duration = now.timestamp() - pre_date.timestamp()
          db.session.add(action_log)
          db.session.commit()

      @with_app_context
      def insert_service_state(self, agent_monitor, agent_service_state: AgentServiceState, agent_log_enum: AgentLogState,
                               task_id=None, service_id=None):
          """Log a service-state transition (``action_type`` 1).

          Args:
              agent_monitor: Monitor row holding the state *before* the change.
              agent_service_state: The new service state.
              agent_log_enum: Optional event whose code/description is stored.
              task_id: Stored on the log row as given.
              service_id: Stored on the log row as given.
          """
          if agent_monitor.service_state == agent_service_state.code:
              # NOTE(review): an unchanged state is only logged; the row is
              # still written. Confirm that no early return is intended.
              self.logger.info("agent action log insert service state same %s %s", agent_monitor.service_state, agent_service_state.code)

          action_log = AgentActionLog()
          action_log.saas_id = agent_monitor.saas_id
          action_log.agent_num = agent_monitor.agent_num
          action_log.out_id = agent_monitor.out_id
          action_log.action_type = 1
          action_log.service_state = agent_service_state.code
          action_log.pre_service_state = agent_monitor.service_state
          action_log.task_id = task_id
          action_log.service_id = service_id
          if agent_log_enum:
              action_log.event_type = agent_log_enum.code
              action_log.event_desc = agent_log_enum.description

          # Which monitor timestamp marks the start of the previous state;
          # states without an entry fall back to update_time.
          started_attr = {
              AgentServiceState.IDLE.code: "idle_time",
              AgentServiceState.BUSY.code: "busy_time",
              AgentServiceState.CALLING.code: "call_time",
              AgentServiceState.LOGOUT.code: "check_out_time",
              AgentServiceState.REPROCESSING.code: "hang_time",
          }.get(agent_monitor.service_state)
          now = datetime.now()
          pre_date = self._previous_moment(agent_monitor, started_attr, now)

          action_log.service_state_time = now
          action_log.pre_service_state_time = pre_date
          action_log.service_state_duration = now.timestamp() - pre_date.timestamp()
          db.session.add(action_log)
          db.session.commit()
  class AgentStateService:
      """Keeps per-service agent availability in Redis hashes and assigns agents.

      Each human service has one hash (see ``_key``) mapping a phone number to
      a JSON-serialised ``AgentStateData``; a separate per-tenant hash (see
      ``_check_in_key``) records check-ins.
      """

      # Shared by every call (and instance) in this process. A lock created
      # inside assign_agent would be private to that call and exclude nobody.
      # NOTE(review): this does not coordinate separate processes sharing the
      # same Redis.
      _assign_lock = threading.Lock()

      def __init__(self, app):
          self.app = app
          self.logger = app.logger
          self.redis_handler = RedisHandler()
          # An assigned agent is offered again once this many ms have passed.
          self.assigned_recycle_millisecond = 30 * 1000
          self.state_service_id_data_map = defaultdict(dict)
          self.executor = ThreadPoolExecutor(max_workers=10)
          self.data_handle_server = DataHandleServer(app)
          self.agent_monitor_service = AgentMonitorService(app)
          self.agent_actionlog_service = AgentActionLogService(app)

      def idle(self, saas_id, agent_id, phone_num):
          """Mark the phone idle in the hash of the agent's human service, if it has one."""
          human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id)
          if human_service is None:
              self.logger.info(f"agent engine idle not have human service {saas_id} {agent_id}")
              return
          self.idle_hash(saas_id, agent_id, phone_num, human_service.service_id)

      def busy(self, saas_id, agent_id, phone_num):
          """Mark the phone busy in the hash of the agent's human service, if it has one."""
          human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id)
          if human_service is None:
              self.logger.info(f"agent engine busy not hava human service {saas_id} {agent_id}")
              return
          self.busy_hash(saas_id, agent_id, phone_num, human_service.service_id)

      def idle_by_human(self, saas_id, agent_id, service_id):
          """Mark an agent idle for ``service_id``.

          For a checked-in agent the monitor row is also set to IDLE and an
          ``ACTIVE_HUMAN_SERVICE`` action-log row is written. Does nothing
          when the agent or its monitor row is missing.
          """
          agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id)
          if not agent:
              return
          agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num)
          if not agent_monitor:
              return
          if agent_monitor.check_state == AgentCheck.IN.code:
              self.agent_monitor_service.update_idle(agent_monitor)
              self.agent_actionlog_service.insert_service_state(
                  agent_monitor, AgentServiceState.IDLE, AgentLogState.ACTIVE_HUMAN_SERVICE)
          self.idle_hash(saas_id, agent_id, agent.phone_num, service_id)

      def busy_by_human(self, saas_id, service_id, agent_id=None):
          """Mark an agent busy for ``service_id``.

          For a checked-in agent the monitor row is also set to BUSY and a
          ``DEL_HUMAN_SERVICE`` action-log row is written. Does nothing when
          the agent or its monitor row is missing.
          """
          agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id)
          if not agent:
              return
          agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num)
          if not agent_monitor:
              return
          if agent_monitor.check_state == AgentCheck.IN.code:
              self.agent_monitor_service.update_busy(agent_monitor)
              self.agent_actionlog_service.insert_service_state(
                  agent_monitor, AgentServiceState.BUSY, AgentLogState.DEL_HUMAN_SERVICE)
          self.busy_hash(saas_id, agent_id, agent.phone_num, service_id)

      def checkin(self, saas_id, agent_id, phone_num):
          """Record a check-in for ``phone_num`` in the tenant's check-in hash."""
          key = self._check_in_key(saas_id)
          state_data = AgentStateData()
          state_data.status = HumanState.DEFAULT.code
          # NOTE(review): stored in seconds, whereas idle_hash stores
          # milliseconds. Confirm which unit readers of this hash expect.
          state_data.time = datetime.now().timestamp()
          self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
          self._refresh_expiry(key)

      def checkout(self, saas_id, agent_id, phone_num):
          """Remove the check-in entry for ``phone_num`` and mark the phone busy."""
          key = self._check_in_key(saas_id)
          self.redis_handler.redis.hdel(key, phone_num)
          self.busy(saas_id, agent_id, phone_num)

      def assign_agent(self, saas_id, service_id, called=None, ivr_id=None, task_id=None, cbp=None):
          """Pick an idle agent for ``service_id`` and stamp its assignment time.

          Returns:
              The chosen phone number, or ``''`` when no agent is idle.
          """
          # Selection and stamping happen under one lock so two concurrent
          # calls in this process cannot pick the same agent.
          with self._assign_lock:
              self.logger.info("assignAgent %s %s %s", saas_id, service_id, called)
              idle_agents = self.idle_agents(saas_id, service_id)
              if not idle_agents:
                  return ''
              choose_phone_num = self._choose_max_idle_time(idle_agents)
              self.handle_assign_time(saas_id, service_id, choose_phone_num)
              return choose_phone_num

      def handle_assign_time(self, saas_id, service_id, choose_phone_num):
          """Set ``assign_time`` (ms) on the cached entry of ``choose_phone_num``, if present."""
          key = self._key(saas_id, service_id)
          cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
          if choose_phone_num not in cache_agent_map:
              return
          state_data = cache_agent_map[choose_phone_num]
          state_data.assign_time = datetime.now().timestamp() * 1000
          self.redis_handler.redis.hset(key, choose_phone_num, state_data.to_json_string())
          self._refresh_expiry(key)

      def idle_agents(self, saas_id, service_id):
          """Return the cached agents of the service that are currently assignable."""
          cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
          if not cache_agent_list:
              return []
          agent_str = '\n'.join(
              "%s-%s-%s-%s" % (x.phone_num, x.status, x.time, x.assign_time) for x in cache_agent_list)
          self.logger.info("assignAgent %s %s idleAgents:%s ", saas_id, service_id, agent_str)
          return self.get_idle_agents(cache_agent_list)

      def idle_hash(self, saas_id, agent_id, phone_num, service_id):
          """Write an IDLE entry for ``phone_num``, reusing the cached entry when one exists."""
          key = self._key(saas_id, service_id)
          cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
          # Reusing the cached entry keeps its other fields (e.g. assign_time).
          state_data = cache_agent_map.get(phone_num)
          if state_data is None:
              state_data = AgentStateData()
          state_data.status = HumanState.IDLE.code
          state_data.time = datetime.now().timestamp() * 1000
          self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
          self._refresh_expiry(key)
          self.logger.info("idle_hash, key=%s, saas_id=%s, phone_num=%s, state_data=%s",
                           key, saas_id, phone_num, state_data)

      def busy_hash(self, saas_id, agent_id, phone_num, service_id):
          """Flip the cached entry of ``phone_num`` to BUSY; no-op when it is not cached."""
          state_data = self.get_cache_agent_map(saas_id, service_id).get(phone_num)
          if state_data is None:
              return
          key = self._key(saas_id, service_id)
          state_data.status = HumanState.BUSY.code
          self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
          self._refresh_expiry(key)

      def get_cache_agent_map(self, saas_id, service_id):
          """Return the cached agents of the service keyed by phone number (``{}`` if none)."""
          return {agent.phone_num: agent for agent in self.get_cache_agent_list(saas_id, service_id)}

      def get_cache_agent_list(self, saas_id, service_id):
          """Load every entry of the service's hash as ``AgentStateData`` objects.

          Returns:
              One object per hash field with ``phone_num`` set from the field
              name; ``[]`` when the hash is missing or empty.
          """
          redis_key = self._key(saas_id, service_id)
          map_cache_by_key = self.redis_handler.redis.hgetall(redis_key)
          self.logger.info("get_cache_agent_list, redis_key=%s, map_cache_by_key=%s",
                           redis_key, map_cache_by_key)
          if not map_cache_by_key:
              return []
          free_agents = []
          for phone_num, json_value in map_cache_by_key.items():
              agent_status_data = AgentStateData(**json.loads(json_value))
              # Field names are bytes unless the client decodes responses.
              if isinstance(phone_num, bytes):
                  phone_num = phone_num.decode('utf-8')
              agent_status_data.phone_num = phone_num
              free_agents.append(agent_status_data)
          return free_agents

      def get_idle_agents(self, cache_agent_list):
          """Filter ``cache_agent_list`` down to agents that may be assigned now.

          An agent qualifies when its status is 1 and it has either never been
          assigned (``assign_time == 0``) or was assigned more than
          ``assigned_recycle_millisecond`` ago.
          """
          current_time = int(datetime.now().timestamp() * 1000)
          # NOTE(review): status 1 is presumably HumanState.IDLE.code; confirm.
          return [
              agent for agent in cache_agent_list
              if agent.status == 1 and (
                  agent.assign_time == 0
                  or agent.assign_time + self.assigned_recycle_millisecond < current_time
              )
          ]

      def get_agent_service_idle_size(self, saas_id, service_id):
          """Return how many cached agents of the service are assignable now."""
          cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
          return len(self.get_idle_agents(cache_agent_list)) if cache_agent_list else 0

      def get_agent_service_busy_size(self, saas_id, service_id):
          """Return how many cached agents of the service are not assignable now."""
          cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
          if not cache_agent_list:
              return 0
          # The idle agents are a subset of the cached ones, so the rest are busy.
          return len(cache_agent_list) - len(self.get_idle_agents(cache_agent_list))

      def update_report_state(self, saas_id, service_id):
          """Submit the idle and busy gauge registrations once per service hash."""
          key = self._key(saas_id, service_id)
          data_map = self.state_service_id_data_map[key]
          for human_state in (HumanState.IDLE, HumanState.BUSY):
              # Check the same key that is stored; testing ``.value`` while
              # storing ``.code`` re-submitted the registration on every call.
              if human_state.code not in data_map:
                  data_map[human_state.code] = threading.Lock()
                  self.executor.submit(
                      self.do_report_real_time_human_service_id, saas_id, service_id, human_state)

      def do_report_real_time_human_service_id(self, saas_id, service_id, human_state):
          """Register a gauge reporting the idle or busy agent count of a service.

          Returns 0 in every case. Nothing is registered when the instance has
          no ``meter_registry`` attribute.
          """
          name = "cti_center_real_time_human_service_state"
          tag_list = {
              "vcc_id": saas_id,
              "service_id": service_id,
              "state": human_state.code,
          }
          # __init__ does not create a registry, so look it up defensively.
          meter_registry = getattr(self, "meter_registry", None)
          if meter_registry is None:
              self.logger.warning("meter_registry is not configured, skip gauge %s %s", name, tag_list)
              return 0
          # Each state reports its own count (these were previously swapped).
          if human_state == HumanState.IDLE:
              meter_registry.gauge(name, tag_list, self,
                                   lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id))
          elif human_state == HumanState.BUSY:
              meter_registry.gauge(name, tag_list, self,
                                   lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id))
          return 0

      def _check_in_key(self, saas_id):
          """Return the Redis key of the tenant's check-in hash."""
          return "CTI:%s:HUMAN:AGENT" % (saas_id.upper())

      def _key(self, saas_id, service_id):
          """Return the Redis key of the agent-state hash of one human service."""
          return "CTI:%s:HUMAN:%s" % (saas_id.upper(), service_id)

      def _get_expire_time(self):
          """Return the milliseconds left until 23:59:59 today (local time)."""
          now = datetime.now()
          end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=0)
          return int((end_of_day - now).total_seconds() * 1000)

      def _expire_seconds(self):
          """Return the time left until 23:59:59 today in whole seconds, at least 1."""
          return max(1, self._get_expire_time() // 1000)

      def _refresh_expiry(self, key):
          """Make ``key`` expire at the end of the day.

          ``expire`` takes seconds; passing the millisecond value made the
          keys live roughly a thousand times longer than intended.
          """
          self.redis_handler.redis.expire(key, self._expire_seconds())

      def _choose_max_idle_time(self, idle_agents: List["AgentStateData"]) -> str:
          """Return the phone number of the agent with the smallest ``assign_time``.

          Ties go to the agent that appears first in ``idle_agents``.
          """
          return min(idle_agents, key=lambda agent: agent.assign_time).phone_num