agent.py 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import threading
  4. import time
  5. import traceback
  6. from collections import defaultdict
  7. from concurrent.futures import ThreadPoolExecutor
  8. from typing import List
  9. from apscheduler.schedulers.background import BackgroundScheduler
  10. from sqlalchemy import or_
  11. from src.core.callcenter import registry
  12. import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
  13. from src.core import with_app_context
  14. from src.core.callcenter.api import AgentActionRequest, AgentQueryRequest, AgentRequest, AgentEventData, \
  15. AgentStateData, HumanServiceQueryRequest, AgentMonitorData, CallInfo, DeviceInfo, AgentDelayStateData
  16. from src.core.callcenter.cache import Cache
  17. from src.core.callcenter.constant import CENTER_AGENT_HEARTBEAT, SAAS_ID, CENTER_AGENT_LIVE_CNT
  18. from src.core.callcenter.dao import *
  19. from src.core.callcenter.data_handler import DataHandleServer
  20. from src.core.callcenter.enumeration import AgentState, AgentCheck, AgentHeartState, AgentServiceState, AgentLogState, \
  21. AgentScene, BizErrorCode, WorkStatus, DownEvent, HumanState, DeviceType, ServiceDirect
  22. from src.core.callcenter.esl.constant.event_names import *
  23. from src.core.callcenter.exception import BizException
  24. from src.core.callcenter.push import PushHandler
  25. from src.core.datasource import RedisHandler
  26. class AgentEventService:
  27. def __init__(self, app):
  28. self.app = app
  29. self.logger = app.logger
  30. self.cache = Cache(app)
  31. self.push_handler = PushHandler(app.logger)
  32. self.data_handle_server = DataHandleServer(app)
  33. self.agent_monitor_service = AgentMonitorService(app)
  34. self.agent_state_service = AgentStateService(app)
  35. self.agent_actionlog_service = AgentActionLogService(app)
  36. def delay_state(self, state_data: AgentDelayStateData):
  37. agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num)
  38. if not agent:
  39. return
  40. agent_monitor = self.data_handle_server.get_agent_monitor(state_data.saas_id, state_data.agent_num)
  41. if not agent_monitor:
  42. return
  43. #TODO 非最新通话的延迟事件,忽略.
  44. agent_scene = AgentScene.get_by_code(state_data.scene)
  45. self.logger.info("agent event delay state %s %s %s %s", state_data.saas_id, state_data.agent_num, state_data.service_state, agent_monitor.service_state)
  46. if AgentServiceState.REPROCESSING.code == state_data.service_state and AgentServiceState.REPROCESSING.code == agent_monitor.service_state:
  47. self.reprocessing_idle(state_data)
  48. if AgentServiceState.DIALING.code == state_data.service_state and AgentServiceState.DIALING.code == agent_monitor.service_state:
  49. if self.cache.get_call_is_answer(state_data.saas_id, state_data.agent_num):
  50. return
  51. self.agent_monitor_service.update_idle(agent_monitor)
  52. self.push_handler.push_on_call_end(state_data.saas_id, state_data.flow_id, state_data.agent_num, agent_scene, ServiceDirect.MANUAL_CALL.service_direct, '0')
  53. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_REPROCESSING)
  54. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_IDLE)
  55. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.BIZ_DIALING_IDLE)
  56. if AgentServiceState.HANGING.code == state_data.service_state \
  57. and (AgentServiceState.DIALING.code == agent_monitor.service_state
  58. or AgentServiceState.CALLING.code == agent_monitor.service_state):
  59. self.agent_monitor_service.update_idle(agent_monitor)
  60. if AgentServiceState.DIALING.code == agent_monitor.service_state:
  61. self.push_handler.push_on_call_end(state_data.saas_id, state_data.flow_id, state_data.agent_num, agent_scene, ServiceDirect.MANUAL_CALL.service_direct, '0')
  62. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_REPROCESSING)
  63. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, '', agent_scene, WorkStatus.AGENT_HANG_IDLE)
  64. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.MANUAL_HANG_UP)
  65. def agent_event_channel(self, event, call_info: CallInfo, device_info: DeviceInfo):
  66. event_name = EslEventUtil.getEventName(event)
  67. saas_id = call_info.saas_id if call_info else None
  68. flow_id = call_info.cti_flow_id if call_info else None
  69. call_id = call_info.call_id if call_info else None
  70. device_id = device_info.device_id if device_info else None
  71. agent_num = device_info.agent_key if device_info else None
  72. caller = device_info.caller if device_info else None
  73. called = device_info.called if device_info else None
  74. is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False
  75. start_time = time.time()
  76. try:
  77. self.logger.info('agent_event_channel, event_name=%s, agent_num=%s, device_id=%s, is_agent=%s', event_name, agent_num, device_id, is_agent)
  78. agent = self.data_handle_server.get_agent(saas_id, agent_num)
  79. if not agent:
  80. # self.logger.warn("event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json')))
  81. return
  82. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
  83. if not agent_monitor:
  84. # self.logger.warn("event service channel agentMonitor is null %s %s %s %s %s", saas_id, event_name, caller, called, json.loads(event.serialize('json')))
  85. return
  86. # 信道发起事件,触发完成发起(或桥)&& 坐席侧
  87. if CHANNEL_ORIGINATE == event_name and is_agent:
  88. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_RINGING,phone=call_info.caller)
  89. # 进度事件,外呼时对方提醒。或者入呼时提醒 && 坐席侧
  90. if CHANNEL_PROGRESS == event_name and is_agent:
  91. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.DIALING)
  92. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING)
  93. # 媒体进度事件,外呼时对方提醒。或者入呼时提醒 && 用户侧
  94. if CHANNEL_PROGRESS_MEDIA == event_name and not is_agent:
  95. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING_RINGING)
  96. #应答
  97. if CHANNEL_ANSWER == event_name:
  98. if call_id:
  99. if self.cache.get_call_is_end(call_id):
  100. # self.logger.warn("event service channel call is end {} {} {} {} {} {}", saas_id, event_name, caller, called, call_id, json.dumps(event.serialize('json')))
  101. return
  102. self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num)
  103. self.logger.info('agent接通:%s',agent )
  104. self.data_handle_server.update_record(call_id, status=1, user_id=agent.user_id,user_name=agent.agent_name)
  105. if is_agent:
  106. # 坐席接起
  107. self.cache.set_call_is_answer(saas_id, flow_id)
  108. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.ANSWER_COMPENSATE)
  109. self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING)
  110. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON)
  111. else:
  112. # 用户侧接起
  113. self.agent_monitor_service.update_calling(agent_monitor)
  114. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_ANSWER_OUTGOING)
  115. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.CALLING)
  116. #挂断
  117. if CHANNEL_HANGUP == event_name:
  118. # 坐席侧挂断
  119. if is_agent:
  120. if call_id:
  121. self.cache.set_call_is_end(call_id)
  122. self.agent_monitor_service.update_processing(agent_monitor)
  123. self.logger.info('挂断更新:%s', agent)
  124. # self.data_handle_server.update_record(call_id, user_id=agent.user_id, user_name=agent.agent_name)
  125. self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.MANUAL))
  126. self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, '0')
  127. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_HANG_REPROCESSING)
  128. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.REPROCESSING)
  129. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING, AgentLogState.CHANNEL_HANG_UP)
  130. # self.data_handle_server.update_record(call_id, time_end=datetime.now())
  131. # 同步处理后处理置闲
  132. # reprocessingIdle(statusDto);
  133. # agentProducer.pushDelayedStatus(statusDto, reprocessingTimeout);
  134. if (CHANNEL_BRIDGE == event_name or PLAYBACK_START == event_name) and is_agent:
  135. self.push_handler.push_on_ring_start(saas_id, flow_id, agent_num, AgentScene.MANUAL, call_id)
  136. # self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_ANSWER_OUTGOING)
  137. # self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.CALLING)
  138. if DETECTED_TONE == event_name and not is_agent:
  139. self.push_handler.push_on_detected_tone(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id)
  140. if (CHANNEL_UNBRIDGE == event_name or PLAYBACK_STOP == event_name) and is_agent:
  141. self.push_handler.push_on_ring_end(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id)
  142. except:
  143. traceback.print_exc()
  144. finally:
  145. latency = (time.time() - start_time)
  146. registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
  147. def bot_event_channel(self, event, call_info, device_info):
  148. event_name = EslEventUtil.getEventName(event)
  149. saas_id = call_info.saas_id if call_info else None
  150. flow_id = call_info.cti_flow_id if call_info else None
  151. call_id = call_info.call_id if call_info else None
  152. device_id = device_info.device_id if device_info else None
  153. agent_num = device_info.agent_key if device_info else None
  154. is_agent = (device_info and DeviceType.AGENT.code == device_info.device_type) if device_info else False
  155. caller = (device_info.called if is_agent else device_info.caller) if device_info else None
  156. called = (device_info.caller if is_agent else device_info.called) if device_info else None
  157. human_service_id = '00000000000000000'
  158. start_time = time.time()
  159. try:
  160. self.logger.info('bot_event_channel, event_name=%s, call_id=%s, device_id=%s, is_agent=%s, agent_num=%s', event_name, call_id, device_id, is_agent, agent_num)
  161. agent = self.data_handle_server.get_agent(saas_id, agent_num)
  162. if not agent:
  163. # self.logger.warn("bot event service channel agent is null %s %s %s %s %s", saas_id, event_name, caller, called,
  164. # json.dumps(event.serialize('json')))
  165. return
  166. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
  167. if not agent_monitor:
  168. # self.logger.warn("bot event service channel agentMonitor is null %s %s %s %s %s", saas_id, event_name, caller,
  169. # called, json.dumps(event.serialize('json')))
  170. return
  171. # 信道发起事件,触发完成发起(或桥)&& 坐席侧
  172. if CHANNEL_ORIGINATE == event_name and is_agent:
  173. self.push_handler.push_on_call_ring(saas_id, flow_id, agent_num, AgentScene.ROBOT, call_id, ServiceDirect.ROBOT_CALL.service_direct, called, caller, human_service_id)
  174. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_RINGING,phone=call_info.caller)
  175. self.data_handle_server.update_record(call_id, transfer_user_id=agent.user_id,transfer_user_name=agent.agent_name) #转接给客服以后更新转接人
  176. if CHANNEL_ANSWER == event_name:
  177. self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num)
  178. # self.data_handle_server.update_record(call_id, status=1, transfer_user_id=agent.user_id,transfer_user_name=agent.agent_name)
  179. if is_agent:
  180. self.agent_monitor_service.update_calling(agent_monitor)
  181. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! internal")
  182. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.CALLING)
  183. self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING)
  184. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON, service_id=human_service_id)
  185. else:
  186. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! external")
  187. if CHANNEL_HANGUP == event_name and is_agent:
  188. self.agent_monitor_service.update_processing(agent_monitor)
  189. self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.ROBOT))
  190. self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, "0")
  191. self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_HANG_REPROCESSING)
  192. self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.REPROCESSING)
  193. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING,
  194. AgentLogState.CHANNEL_HANG_UP, service_id=human_service_id)
  195. # self.data_handle_server.update_record(call_id, time_end=datetime.now())
  196. except:
  197. traceback.print_exc()
  198. finally:
  199. latency = (time.time() - start_time)
  200. registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
  201. def reprocessing_idle(self, state_data: AgentDelayStateData):
  202. agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num)
  203. if not agent:
  204. return
  205. agent_monitor = self.data_handle_server.get_agent_monitor(state_data.saas_id, state_data.agent_num)
  206. if not agent_monitor:
  207. return
  208. self.logger.info('reprocessing_idle %s %s %s', state_data.saas_id, state_data.agent_num, agent_monitor)
  209. self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
  210. self.logger.info('reprocessing_idle_end')
  211. self.agent_monitor_service.update_idle(agent_monitor)
  212. self.push_handler.push_on_agent_work_report(state_data.saas_id, state_data.flow_id, state_data.agent_num, "", state_data.scene, WorkStatus.AGENT_HANG_IDLE)
  213. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.REPROCESSING_IDLE)
  214. class AgentOperService:
  215. def __init__(self, app):
  216. self.app = app
  217. self.logger = app.logger
  218. self.redis_handler = RedisHandler()
  219. self.push_handler = PushHandler(app.logger)
  220. self.data_handle_server = DataHandleServer(app)
  221. self.agent_monitor_service = AgentMonitorService(app)
  222. self.agent_actionlog_service = AgentActionLogService(app)
  223. self.agent_state_service = AgentStateService(app)
  224. self.agent_heartbeat_expire = 30
  225. self.agent_serial_live_expire = 60*10
  226. self.agent_heartbeat_job_scheduler = BackgroundScheduler()
  227. self.agent_heartbeat_job_scheduler.add_job(self.agent_heartbeat_daemon, 'interval', seconds=1, max_instances=1, name='agent_heartbeat_daemon')
  228. self.agent_heartbeat_job_scheduler.start()
  229. def agent_heartbeat_daemon(self):
  230. def check_out_daemon(_name, key, value):
  231. try:
  232. sec = datetime.now().timestamp() - float(value)
  233. if sec > self.agent_heartbeat_expire:
  234. self.redis_handler.redis.hdel(_name, key)
  235. self.logger.error("agent heartbeat expired, will checkout %s %s", key, value)
  236. # self.checkout(AgentActionRequest(saas_id=SAAS_ID, agent_id=key, agent_number=key))
  237. except:
  238. traceback.print_exc()
  239. def check_agent_live_daemon(_members):
  240. key = CENTER_AGENT_LIVE_CNT % SAAS_ID
  241. pre_val = self.redis_handler.redis.get(key)
  242. if not pre_val:
  243. if not _members or len(_members) == 0:
  244. value = datetime.now().timestamp()
  245. self.redis_handler.redis.set(key, value, ex=60*60, nx=True)
  246. else:
  247. diff = datetime.now().timestamp() - float(pre_val)
  248. # self.logger.info('check_agent_live_daemon, members=%s, diff=%s, pre_val=%s', (len(_members) if _members else 0), diff, pre_val)
  249. if diff > self.agent_serial_live_expire:
  250. self.logger.warn('WARING::live agent count less than 1 serial ten minutes')
  251. self.data_handle_server.create_warning_record(1, '10分钟空岗报警')
  252. self.redis_handler.redis.delete(key)
  253. if _members and len(_members) > 0:
  254. self.redis_handler.redis.delete(key)
  255. name = CENTER_AGENT_HEARTBEAT % SAAS_ID
  256. members = self.redis_handler.redis.hgetall(name)
  257. check_agent_live_daemon(members)
  258. if not members:
  259. return
  260. registry.MANUAL_AGENT_LIVES.set(len(members))
  261. for k,v in members.items():
  262. check_out_daemon(name, k, v)
  263. def __del__(self):
  264. self.agent_heartbeat_job_scheduler.shutdown()
  265. @with_app_context
  266. def enable(self, req: AgentActionRequest):
  267. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id)
  268. if agent.agent_state == AgentState.ENABLE.code:
  269. return
  270. agent.agent_state = AgentState.ENABLE.code
  271. db.session.commit()
  272. @with_app_context
  273. def disable(self, req: AgentActionRequest):
  274. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  275. if agent.agent_state == AgentState.DISABLE.code:
  276. return
  277. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_number)
  278. if agent_monitor.check_state == AgentCheck.IN.code and \
  279. agent_monitor.service_state == AgentServiceState.CALLING.code:
  280. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  281. agent.phone_num = ''
  282. agent.agent_state = AgentState.DISABLE.code
  283. db.session.commit()
  284. phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
  285. phone.is_delete = 1
  286. db.session.commit()
  287. @with_app_context
  288. def checkin(self, req: AgentActionRequest):
  289. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  290. if not agent or agent.agent_state == AgentState.DISABLE.code:
  291. raise BizException(BizErrorCode.ERROR_NOT_FOLLOW_CHECK_IN)
  292. phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
  293. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  294. # agent_monitor.check_scene = req.scene
  295. self.agent_monitor_service.update_checkin(agent_monitor,req.scene)
  296. self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.IN, AgentLogState.CHECKIN)
  297. self.agent_state_service.checkin(agent.saas_id, agent.out_id, agent.phone_num)
  298. print("checkin", agent_monitor,agent_monitor.check_state)
  299. if req.scene == AgentScene.MANUAL.code:
  300. # 如果是手动外呼增加置忙
  301. self._handle_idle(req.scene, agent)
  302. return self._push_event_for_checkin(agent, agent_monitor, phone, req.scene)
  303. @with_app_context
  304. def checkout(self, req: AgentActionRequest):
  305. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  306. if not agent or agent.agent_state == AgentState.DISABLE.code:
  307. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  308. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  309. if not agent_monitor or agent_monitor.service_state == AgentServiceState.CALLING.code:
  310. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  311. if agent_monitor.check_state == AgentCheck.OUT.code:
  312. return self._push_event_for_checkout(agent, req.scene)
  313. self.agent_monitor_service.update_checkout(agent_monitor)
  314. self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.OUT, AgentLogState.CHECKOUT)
  315. self.agent_state_service.checkout(agent.saas_id, agent.out_id, agent.phone_num)
  316. return self._push_event_for_checkout(agent, req.scene)
  317. @with_app_context
  318. def busy(self, req: AgentActionRequest):
  319. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  320. if not agent or agent.agent_state == AgentState.DISABLE.code:
  321. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  322. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  323. if not agent_monitor or agent_monitor.check_state == AgentCheck.OUT.code:
  324. raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
  325. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  326. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  327. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  328. if agent_monitor.service_state == AgentServiceState.BUSY.code:
  329. self._push_event_for_busy(agent, req.scene)
  330. return
  331. self.agent_monitor_service.update_busy(agent_monitor)
  332. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY, AgentLogState.BUSY)
  333. self._push_event_for_busy(agent, req.scene)
  334. @with_app_context
  335. def idle(self, req: AgentActionRequest):
  336. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  337. if not agent or agent.agent_state == AgentState.DISABLE.code:
  338. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  339. self._handle_idle(req.scene, agent)
  340. @with_app_context
  341. def assign(self, req: AgentActionRequest):
  342. return self.agent_state_service.assign_agent(req.saas_id, req.service_id)
  343. def idle_agent_exist(self, request: AgentActionRequest):
  344. pass
  345. @with_app_context
  346. def agent_state(self,req: AgentActionRequest):
  347. # agent = _get_agent(req.saas_id, req.agent_id)
  348. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, req.agent_id)
  349. return agent_monitor.service_state
  350. @with_app_context
  351. def turn_on(self, req: AgentActionRequest):
  352. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  353. agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
  354. agent_scene = AgentScene.get_by_code(req.scene)
  355. if not agent_monitor:
  356. raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
  357. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  358. self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
  359. return
  360. self.agent_monitor_service.update_calling(agent_monitor)
  361. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  362. self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
  363. @with_app_context
  364. def _handle_idle(self, scene, agent):
  365. agent_monitor = self.data_handle_server.get_agent_monitor(agent.saas_id, agent.agent_num)
  366. print('agent_monitor:', agent_monitor, agent)
  367. if agent_monitor.check_state == AgentCheck.OUT.code:
  368. raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
  369. if agent_monitor.service_state == AgentServiceState.CALLING.code or agent_monitor.service_state == AgentServiceState.DIALING.code:
  370. raise BizException(BizErrorCode.AGENT_CALLING_NOT_HANG)
  371. if scene == AgentScene.ROBOT.code:
  372. self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
  373. else:
  374. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  375. if agent_monitor.service_state == AgentServiceState.IDLE.code:
  376. self._push_event_for_idle(agent, scene)
  377. return
  378. self.agent_monitor_service.update_idle(agent_monitor)
  379. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.IDLE)
  380. self._push_event_for_idle(agent, scene)
  381. def _push_event_for_checkin(self, agent, agent_monitor, phone, scene):
  382. """坐席签入事件推送"""
  383. agent_scene = AgentScene.get_by_code(scene)
  384. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.LOGIN_SUCCESS)
  385. event_data = AgentEventData(agent.saas_id, agent.out_id)
  386. event_data.data = {'eventName': DownEvent.ON_INITAL_SUCCESS.code,
  387. 'ext': {'saasId': agent.saas_id,
  388. 'agentId': agent.out_id,
  389. 'phoneNum': phone.phone_num,
  390. 'phonePwd': phone.phone_pwd,
  391. 'sipServer': phone.sip_server
  392. }
  393. }
  394. return event_data.__dict__
  395. def _push_event_for_checkout(self,agent,scene):
  396. """签出事件推送"""
  397. agent_scene = AgentScene.get_by_code(scene)
  398. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.NO_INIT, '签出成功')
  399. event_data = AgentEventData(agent.saas_id, agent.out_id)
  400. event_data.data = {'eventName': DownEvent.ON_INITAL_FAILURE.code,
  401. 'ext': {'saasId': agent.saas_id,
  402. 'agentId': agent.out_id,
  403. }
  404. }
  405. return event_data.__dict__
  406. def _push_event_for_busy(self,agent,scene):
  407. """置忙事件推送"""
  408. agent_scene = AgentScene.get_by_code(scene)
  409. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_BUSY)
  410. pass
  411. def _push_event_for_idle(self, agent, scene):
  412. """坐席置闲事件推送"""
  413. agent_scene = AgentScene.get_by_code(scene)
  414. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_READY)
  415. class AgentService:
  416. def __init__(self, app):
  417. self.app = app
  418. self.logger = app.logger
  419. self.data_handle_server = DataHandleServer(app)
  420. self.agent_monitor_service = AgentMonitorService(app)
  421. @with_app_context
  422. def get_and_check(self, req: AgentActionRequest):
  423. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
  424. if not agent:
  425. return {}
  426. phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
  427. return phone.to_dict()
  428. @with_app_context
  429. def watch_agent_state(self, req: HumanServiceQueryRequest):
  430. pos = HumanServiceMap.query.filter(HumanServiceMap.is_delete == 0, HumanServiceMap.saas_id == req.saas_id,
  431. HumanServiceMap.service_id == req.serviceId).all()
  432. agent_ids = [x.agent_id for x in pos]
  433. monitors = self.agent_monitor_service.detail_monitor_out_ids(req.saas_id, agent_ids)
  434. return monitors
  435. @with_app_context
  436. def add(self, req: AgentRequest):
  437. new_agent_num = self.data_handle_server.get_newest_agent_number(req.saas_id)
  438. agent = Agent(saas_id=req.saas_id, agent_num=new_agent_num, agent_name=req.agent_name, out_id=req.out_id,
  439. agent_pwd=req.agent_password, agent_type=req.agent_type, phone_num=req.phone_number,
  440. distribute=req.distribute, agent_state=req.agent_state)
  441. db.session.add(agent)
  442. db.session.commit()
  443. agent_monitor = AgentMonitor(saas_id=req.saas_id, agent_num=new_agent_num, out_id=req.out_id)
  444. db.session.add(agent_monitor)
  445. db.session.commit()
  446. return new_agent_num
  447. @with_app_context
  448. def update(self, req: AgentRequest):
  449. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id)
  450. if not agent:
  451. return
  452. phone_num = agent.phone_num
  453. state_change = agent.agent_state != req.agent_state
  454. disable = req.agent_state == AgentState.DISABLE
  455. agent.agent_name = req.agent_name
  456. agent.agent_pwd = req.agent_password
  457. agent.agent_type = req.agent_type
  458. agent.phone_num = req.phone_number if not disable else ''
  459. agent.distribute = req.distribute
  460. agent.agent_state = req.agent_state
  461. db.session.commit()
  462. if state_change and disable:
  463. phone = Phone.query.filter(Phone.saas_id == req.saas_id, Phone.phone_num == phone_num).first()
  464. if phone:
  465. db.session.delete(phone)
  466. @with_app_context
  467. def detail(self, saas_id, agent_number):
  468. agent = self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='')
  469. return agent
  470. @with_app_context
  471. def count(self, req: AgentQueryRequest):
  472. cnt = Agent.query.filter(Agent.saas_id == req.saas_id,
  473. or_(Agent.agent_num == req.agent_number,
  474. Agent.agent_name.contains(req.agent_name),
  475. Agent.out_id == req.out_id,
  476. Agent.agent_type == req.agent_type
  477. )
  478. ).count()
  479. return cnt
  480. @with_app_context
  481. def query_page(self, req: AgentQueryRequest):
  482. pagination = Agent.query.filter(Agent.saas_id == req.saas_id,
  483. or_(Agent.agent_num == req.agent_number,
  484. Agent.agent_name.contains(req.agent_name),
  485. Agent.out_id == req.out_id,
  486. Agent.agent_type == req.agent_type
  487. )
  488. ).paginate(req.page, req.size)
  489. # data = {
  490. # "page": pagination.page, # 当前页码
  491. # "pages": pagination.pages, # 总页码
  492. # "has_prev": pagination.has_prev, # 是否有上一页
  493. # "prev_num": pagination.prev_num, # 上一页页码
  494. # "has_next": pagination.has_next, # 是否有下一页
  495. # "next_num": pagination.next_num, # 下一页页码
  496. # "items": [{
  497. # "id": item.id,
  498. # "name": item.name,
  499. # "age": item.age,
  500. # "sex": item.sex,
  501. # "money": item.money,
  502. # } for item in pagination.items]
  503. # }
  504. return pagination
  505. @with_app_context
  506. def delete(self, saas_id, agent_number):
  507. agent = self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='')
  508. if not agent:
  509. return
  510. agent.is_delete = 1
  511. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number)
  512. agent_monitor.is_delete = 1
  513. db.session.commit()
  514. phone = self.data_handle_server.get_phone(saas_id, agent.phone_num)
  515. phone.is_delete = 1
  516. db.session.commit()
  517. class AgentMonitorService:
  518. def __init__(self, app):
  519. self.app = app
  520. self.logger = app.logger
  521. self.data_handle_server = DataHandleServer(app)
  522. @with_app_context
  523. def detail_monitor_out_ids(self, saas_id, out_ids, check_scene=None):
  524. if not out_ids:
  525. return []
  526. agents = Agent.query.filter(Agent.is_delete == 0, Agent.saas_id == saas_id, Agent.out_id.in_(out_ids)).all()
  527. if not agents:
  528. raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
  529. agent_num_map = {x.agent_num: x for x in agents}
  530. agent_nums = [x.agent_num for x in agents]
  531. agent_monitors = AgentMonitor.query.filter(AgentMonitor.is_delete == 0, AgentMonitor.agent_num.in_(agent_nums)).all()
  532. res = []
  533. for agent_monitor in agent_monitors:
  534. agent = agent_num_map.get(agent_monitor.agent_num)
  535. data = AgentMonitorData()
  536. data.saas_id = saas_id
  537. data.agent_num = agent_monitor.agent_num
  538. data.agent_name = agent.agent_name
  539. data.out_id = agent_monitor.out_id
  540. data.check_state = agent_monitor.check_state
  541. data.check_scene = agent_monitor.check_scene
  542. data.online_state = 1 if agent_monitor.check_state == AgentCheck.IN.code else 0
  543. if agent_monitor.check_in_time:
  544. data.check_in_time = agent_monitor.check_in_time.timestamp()
  545. day_start = self._get_day_start()
  546. if agent_monitor.check_in_time.timestamp() > day_start and \
  547. agent_monitor.check_state == AgentCheck.OUT.code:
  548. data.online_state = 2
  549. data.check_out_time = agent_monitor.check_out_time.timestamp() if agent_monitor.check_out_time else None
  550. data.service_state = agent_monitor.service_state
  551. data.busy_time = agent_monitor.busy_time.timestamp() if agent_monitor.busy_time else None
  552. data.idle_time = agent_monitor.idle_time.timestamp() if agent_monitor.idle_time else None
  553. data.call_time = agent_monitor.call_time.timestamp() if agent_monitor.call_time else None
  554. data.hang_time = agent_monitor.hang_time.timestamp() if agent_monitor.hang_time else None
  555. data.heart_state = agent_monitor.heart_state
  556. data.heart_time = agent_monitor.heart_time.timestamp() if agent_monitor.heart_time else None
  557. data.update_time = agent_monitor.update_time.timestamp() if agent_monitor.update_time else None
  558. res.append(data.__dict__)
  559. return res
  560. @with_app_context
  561. def update_checkin(self, agent_monitor,scene=None):
  562. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  563. agent_monitor.check_state = AgentCheck.IN.code
  564. agent_monitor.check_in_time = datetime.now()
  565. agent_monitor.heart_state = AgentHeartState.NORMAL.code
  566. agent_monitor.heart_time = datetime.now()
  567. agent_monitor.check_scene = scene
  568. self.logger.info("update_checkin %s", agent_monitor.check_state)
  569. db.session.commit()
  570. @with_app_context
  571. def update_checkout(self, agent_monitor):
  572. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  573. agent_monitor.check_state = AgentCheck.OUT.code
  574. agent_monitor.check_out_time = datetime.now()
  575. agent_monitor.service_state = AgentServiceState.LOGOUT.code
  576. agent_monitor.heart_state = AgentHeartState.DEFAULT.code
  577. agent_monitor.heart_time = datetime.now()
  578. self.logger.info("update_checkout %s", agent_monitor.check_out_time)
  579. db.session.commit()
  580. @with_app_context
  581. def update_idle(self, agent_monitor):
  582. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  583. agent_monitor.service_state = AgentServiceState.IDLE.code
  584. agent_monitor.idle_time = datetime.now()
  585. db.session.commit()
  586. @with_app_context
  587. def update_busy(self, agent_monitor):
  588. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  589. agent_monitor.service_state = AgentServiceState.BUSY.code
  590. agent_monitor.busy_time = datetime.now()
  591. db.session.commit()
  592. @with_app_context
  593. def update_dialing(self, agent_monitor):
  594. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  595. agent_monitor.service_state = AgentServiceState.DIALING.code
  596. db.session.commit()
  597. @with_app_context
  598. def update_calling(self, agent_monitor):
  599. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  600. agent_monitor.service_state = AgentServiceState.CALLING.code
  601. agent_monitor.call_time = datetime.now()
  602. db.session.commit()
  603. @with_app_context
  604. def update_processing(self, agent_monitor):
  605. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  606. agent_monitor.service_state = AgentServiceState.REPROCESSING.code
  607. agent_monitor.hang_time = datetime.now()
  608. db.session.commit()
  609. @with_app_context
  610. def update_session_id(self, agent_monitor, session_id):
  611. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  612. agent_monitor.session_id = session_id
  613. db.session.commit()
  614. @with_app_context
  615. def update_heart_error(self, agent_monitor):
  616. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  617. agent_monitor.heart_state = AgentHeartState.ABNORMAL.code
  618. agent_monitor.heart_time = datetime.now()
  619. db.session.commit()
  620. @with_app_context
  621. def _get_day_start(self):
  622. today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  623. return int(today_start.timestamp() * 1000) # Return milliseconds
  624. class AgentActionLogService:
  625. def __init__(self, app):
  626. self.app = app
  627. self.logger = app.logger
  628. self.data_handle_server = DataHandleServer(app)
  629. @with_app_context
  630. def insert_check_state(self, agent_monitor, agent_check_enum: AgentCheck, agent_log_enum: AgentLogState):
  631. action_log = AgentActionLog()
  632. action_log.saas_id = agent_monitor.saas_id
  633. action_log.agent_num = agent_monitor.agent_num
  634. action_log.out_id = agent_monitor.out_id
  635. action_log.action_type = 0
  636. action_log.check_state = agent_check_enum.code
  637. action_log.pre_check_state = agent_monitor.check_state
  638. if agent_log_enum:
  639. action_log.event_type = agent_log_enum.code
  640. action_log.event_desc = agent_log_enum.description
  641. now = datetime.now()
  642. pre_date = None
  643. if agent_monitor.check_state == AgentCheck.IN.code:
  644. pre_date = agent_monitor.check_in_time
  645. if agent_monitor.check_state == AgentCheck.OUT.code:
  646. pre_date = agent_monitor.check_out_time
  647. if pre_date is None:
  648. pre_date = agent_monitor.update_time
  649. action_log.check_state_time = now
  650. action_log.pre_check_state_time = pre_date
  651. action_log.check_state_duration = now.timestamp() - pre_date.timestamp()
  652. db.session.add(action_log)
  653. db.session.commit()
  654. @with_app_context
  655. def insert_service_state(self, agent_monitor, agent_service_state: AgentServiceState, agent_log_enum: AgentLogState,
  656. task_id=None, service_id=None):
  657. if agent_monitor.service_state == agent_service_state.code:
  658. self.logger.info("agent action log insert service state same %s %s", agent_monitor.service_state, agent_service_state.code)
  659. action_log = AgentActionLog()
  660. action_log.saas_id = agent_monitor.saas_id
  661. action_log.agent_num = agent_monitor.agent_num
  662. action_log.out_id = agent_monitor.out_id
  663. action_log.action_type = 1
  664. action_log.service_state = agent_service_state.code
  665. action_log.pre_service_state = agent_monitor.service_state
  666. action_log.task_id = task_id
  667. action_log.service_id = service_id
  668. if agent_log_enum:
  669. action_log.event_type = agent_log_enum.code
  670. action_log.event_desc = agent_log_enum.description
  671. now = datetime.now()
  672. pre_date = None
  673. if agent_monitor.service_state == AgentServiceState.IDLE.code:
  674. pre_date = agent_monitor.idle_time
  675. if agent_monitor.service_state == AgentServiceState.BUSY.code:
  676. pre_date = agent_monitor.busy_time
  677. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  678. pre_date = agent_monitor.call_time
  679. if agent_monitor.service_state == AgentServiceState.LOGOUT.code:
  680. pre_date = agent_monitor.check_out_time
  681. if agent_monitor.service_state == AgentServiceState.REPROCESSING.code:
  682. pre_date = agent_monitor.hang_time
  683. action_log.service_state_time = now
  684. if pre_date is None:
  685. pre_date = agent_monitor.update_time
  686. action_log.pre_service_state_time = pre_date
  687. action_log.service_state_duration = now.timestamp() - pre_date.timestamp()
  688. db.session.add(action_log)
  689. db.session.commit()
  690. class AgentStateService:
  691. def __init__(self, app):
  692. self.app = app
  693. self.logger = app.logger
  694. self.redis_handler = RedisHandler()
  695. self.assigned_recycle_millisecond = 30 * 1000
  696. # self.state_service_id_data_map = defaultdict(dict)
  697. # self.executor = ThreadPoolExecutor(max_workers=10)
  698. self.data_handle_server = DataHandleServer(app)
  699. self.agent_monitor_service = AgentMonitorService(app)
  700. self.agent_actionlog_service = AgentActionLogService(app)
  701. def idle(self, saas_id, agent_id, phone_num):
  702. human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id)
  703. if human_service is None:
  704. self.logger.info(f"agent engine idle not have human service {saas_id} {agent_id}") # 使用print替代log
  705. return
  706. self.idle_hash(saas_id, agent_id, phone_num, human_service.service_id)
  707. def busy(self, saas_id, agent_id, phone_num):
  708. human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id)
  709. if human_service is None:
  710. self.logger.info(f"agent engine busy not hava human service {saas_id} {agent_id}") # 使用print替代log
  711. return
  712. self.busy_hash(saas_id, agent_id, phone_num, human_service.service_id)
  713. def idle_by_human(self, saas_id, agent_id, service_id):
  714. agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id)
  715. if not agent:
  716. return
  717. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num)
  718. if not agent_monitor:
  719. return
  720. if agent_monitor.check_state == AgentCheck.IN.code:
  721. self.agent_monitor_service.update_idle(agent_monitor)
  722. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.ACTIVE_HUMAN_SERVICE)
  723. self.idle_hash(saas_id, agent_id, agent.phone_num, service_id)
  724. def busy_by_human(self, saas_id, service_id, agent_id=None):
  725. agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id)
  726. if not agent:
  727. return
  728. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num)
  729. if not agent_monitor:
  730. return
  731. if agent_monitor.check_state == AgentCheck.IN.code:
  732. self.agent_monitor_service.update_busy(agent_monitor)
  733. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY,
  734. AgentLogState.DEL_HUMAN_SERVICE)
  735. self.busy_hash(saas_id, agent_id, agent.phone_num, service_id)
  736. def checkin(self, saas_id, agent_id, phone_num):
  737. key = self._check_in_key(saas_id)
  738. state_data = AgentStateData()
  739. state_data.status = HumanState.DEFAULT.code
  740. state_data.time = datetime.now().timestamp()
  741. self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
  742. self.redis_handler.redis.expire(key, self._get_expire_time())
  743. def checkout(self, saas_id, agent_id, phone_num):
  744. key = self._check_in_key(saas_id)
  745. self.redis_handler.redis.hdel(key, phone_num)
  746. self.busy(saas_id, agent_id, phone_num)
  747. def assign_agent(self, saas_id, service_id, called=None, ivr_id=None, task_id=None, cbp=None):
  748. choose_phone_num = ''
  749. lock = threading.Lock()
  750. try:
  751. lock.acquire()
  752. self.logger.info("assignAgent %s %s %s"% (saas_id, service_id, called))
  753. idle_agents = self.idle_agents(saas_id, service_id)
  754. if len(idle_agents) <= 0:
  755. return choose_phone_num
  756. choose_phone_num = self._choose_max_idle_time(idle_agents)
  757. self.handle_assign_time(saas_id, service_id, choose_phone_num)
  758. self.handle_lock_agent(choose_phone_num, saas_id, service_id)
  759. finally:
  760. lock.release()
  761. return choose_phone_num
  762. def handle_check_agent_lock(self, choose_phone_num, saas_id, service_id='00000000000000000'):
  763. key = self._lock_key(saas_id, service_id, choose_phone_num)
  764. res = self.redis_handler.redis.get(key)
  765. self.logger.info('checkAgent %s %s %s %s'% (saas_id, service_id, choose_phone_num, res))
  766. return False if res else True
  767. def handle_lock_agent(self, choose_phone_num, saas_id, service_id='00000000000000000'):
  768. key = self._lock_key(saas_id, service_id, choose_phone_num)
  769. expire = self._get_expire_time()
  770. self.redis_handler.redis.set(key, 1, nx=True, ex=expire)
  771. res = self.redis_handler.redis.get(key)
  772. self.logger.info('lockAgent %s %s %s %s %s'% (saas_id, service_id, choose_phone_num, expire, res))
  773. def handle_release_agent_lock(self, choose_phone_num, saas_id, service_id='00000000000000000'):
  774. key = self._lock_key(saas_id, service_id, choose_phone_num)
  775. self.redis_handler.redis.delete(key)
  776. self.logger.info('releaseAgent %s %s %s'% (saas_id, service_id, choose_phone_num))
  777. def handle_assign_time(self, saas_id, service_id, choose_phone_num):
  778. key = self._key(saas_id, service_id)
  779. cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
  780. if cache_agent_map and choose_phone_num in cache_agent_map:
  781. state_data = cache_agent_map[choose_phone_num]
  782. state_data.assign_time = datetime.now().timestamp() * 1000
  783. self.redis_handler.redis.hset(key, choose_phone_num, state_data.to_json_string())
  784. self.redis_handler.redis.expire(key, self._get_expire_time())
  785. # self.update_report_state(saas_id, service_id)
  786. def idle_agents(self, saas_id, service_id):
  787. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  788. if not cache_agent_list:
  789. return []
  790. agent_str = '\n'.join(["%s-%s-%s-%s"%(x.phone_num, x.status, x.time, x.assign_time) for x in cache_agent_list])
  791. self.logger.info("assignAgent %s %s idleAgents:%s "% (saas_id, service_id, agent_str))
  792. return self.get_idle_agents(cache_agent_list)
  793. def idle_hash(self, saas_id, agent_id, phone_num, service_id):
  794. key = self._key(saas_id, service_id)
  795. state_data = AgentStateData()
  796. # self.logger.info("idle_hash, key=%s, saas_id=%s, phone_num=%s", key, saas_id, phone_num)
  797. cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
  798. if cache_agent_map and phone_num in cache_agent_map:
  799. state_data = cache_agent_map[phone_num]
  800. state_data.status = HumanState.IDLE.code
  801. state_data.time = datetime.now().timestamp() * 1000
  802. self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
  803. self.redis_handler.redis.expire(key, self._get_expire_time())
  804. # self.update_report_state(saas_id, service_id)
  805. self.logger.info("idle_hash, key=%s, saas_id=%s, phone_num=%s, state_data=%s"%(key, saas_id, phone_num, state_data))
  806. def busy_hash(self, saas_id, agent_id, phone_num, service_id):
  807. cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
  808. state_data = cache_agent_map.get(phone_num)
  809. key = self._key(saas_id, service_id)
  810. if state_data is None:
  811. return
  812. state_data.status = HumanState.BUSY.code
  813. self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
  814. self.redis_handler.redis.expire(key, self._get_expire_time())
  815. # self.update_report_state(saas_id, service_id)
  816. def get_cache_agent_map(self, saas_id, service_id):
  817. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  818. # 检查列表是否为空,如果为空返回空字典
  819. if not cache_agent_list:
  820. return {}
  821. # 使用字典推导式将 cache_agent_list 转换为字典
  822. return {agent.phone_num: agent for agent in cache_agent_list}
  823. def get_cache_agent_list(self, saas_id, service_id):
  824. redis_key = self._key(saas_id, service_id)
  825. map_cache_by_key = self.redis_handler.redis.hgetall(redis_key)
  826. self.logger.info("get_cache_agent_list, redis_key=%s, map_cache_by_key=%s"%(redis_key, map_cache_by_key))
  827. if not map_cache_by_key: # 检查字典是否为空
  828. return [] # 返回空列表
  829. free_agents = []
  830. for phone_num, json_value in map_cache_by_key.items():
  831. agent_status_data_dict = json.loads(json_value)
  832. agent_status_data = AgentStateData(**agent_status_data_dict) # 解析 JSON 为 AgentStateData 对象
  833. agent_status_data.phone_num = phone_num.decode('utf-8') # 设置电话号码
  834. free_agents.append(agent_status_data)
  835. return free_agents
  836. def get_idle_agents(self,cache_agent_list):
  837. # current_time =int(datetime.now().timestamp() * 1000) # 获取当前时间的毫秒级时间戳
  838. idle_agents = [
  839. agent for agent in cache_agent_list
  840. if agent.status == 1 and self.handle_check_agent_lock(agent.phone_num, SAAS_ID)
  841. #and (
  842. # agent.assign_time == 0 or
  843. # agent.assign_time + self.assigned_recycle_millisecond < current_time
  844. #)
  845. ]
  846. return idle_agents
  847. def get_agent_service_idle_size(self, saas_id, service_id):
  848. idle_agents_size = 0
  849. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  850. if cache_agent_list: # 检查列表是否非空
  851. idle_agents = self.get_idle_agents(cache_agent_list)
  852. idle_agents_size = len(idle_agents) # 获取空闲代理的数量
  853. return idle_agents_size
  854. def get_agent_service_busy_size(self, saas_id, service_id):
  855. busy_agents_size = 0
  856. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  857. if cache_agent_list: # 检查列表是否非空
  858. idle_agents = self.get_idle_agents(cache_agent_list)
  859. busy_agents = [agent for agent in cache_agent_list if agent not in idle_agents] # 计算忙碌代理
  860. busy_agents_size = len(busy_agents) # 获取忙碌代理的数量
  861. return busy_agents_size
  862. # def update_report_state(self, saas_id, service_id):
  863. # key = self._key(saas_id, service_id)
  864. # # data_map 这个地方有疑问
  865. # data_map = self.state_service_id_data_map[key]
  866. # idle = HumanState.IDLE
  867. # if idle.value not in data_map:
  868. # data_map[idle.code] = threading.Lock()
  869. # self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, idle)
  870. # # data_map[idle.code] = self.do_report_real_time_human_service_id(saas_id, service_id, idle)
  871. # busy = HumanState.BUSY
  872. # if busy.value not in data_map:
  873. # data_map[busy.code] = threading.Lock()
  874. # self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, busy)
  875. # # data_map[busy.code] = self.do_report_real_time_human_service_id(saas_id, service_id, busy)
  876. #
  877. # def do_report_real_time_human_service_id(self, saas_id, service_id, human_state):
  878. # name = "cti_center_real_time_human_service_state"
  879. # tag_list = {
  880. # "vcc_id": saas_id,
  881. # "service_id": service_id,
  882. # "state": human_state.code,
  883. # }
  884. # if human_state == HumanState.IDLE:
  885. # # meter_registry 这块疑问
  886. # self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id))
  887. # elif human_state == HumanState.BUSY:
  888. # self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id))
  889. # return 0
  890. def _check_in_key(self, saas_id):
  891. return "CTI:%s:HUMAN:AGENT"%(saas_id.upper())
  892. def _key(self, saas_id, service_id):
  893. return "CTI:%s:HUMAN:%s"%(saas_id.upper(), service_id)
  894. def _lock_key(self, saas_id, service_id, choose_phone_num):
  895. return "CTI:%s:HUMAN:%s:%s:LOCK"%(saas_id.upper(), service_id, choose_phone_num)
  896. def _get_expire_time(self):
  897. now = datetime.now()
  898. end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=0)
  899. expire_time = (end_of_day - now).total_seconds() # Convert to milliseconds
  900. return int(expire_time)
  901. def _choose_max_idle_time(self, idle_agents: List[AgentStateData]) -> str:
  902. idle_agents = sorted(idle_agents, key=lambda agent: agent.assign_time, reverse=False)
  903. return idle_agents[0].phone_num