agent.py 56 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import threading
  4. import time
  5. import traceback
  6. from collections import defaultdict
  7. from concurrent.futures import ThreadPoolExecutor
  8. from typing import List
  9. from apscheduler.schedulers.background import BackgroundScheduler
  10. from sqlalchemy import or_
  11. from src.core.callcenter import registry
  12. import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
  13. from src.core import with_app_context
  14. from src.core.callcenter.api import AgentActionRequest, AgentQueryRequest, AgentRequest, AgentEventData, \
  15. AgentStateData, HumanServiceQueryRequest, AgentMonitorData, CallInfo, DeviceInfo, AgentDelayStateData
  16. from src.core.callcenter.cache import Cache
  17. from src.core.callcenter.constant import CENTER_AGENT_HEARTBEAT, SAAS_ID, CENTER_AGENT_LIVE_CNT
  18. from src.core.callcenter.dao import *
  19. from src.core.callcenter.data_handler import DataHandleServer
  20. from src.core.callcenter.enumeration import AgentState, AgentCheck, AgentHeartState, AgentServiceState, AgentLogState, \
  21. AgentScene, BizErrorCode, WorkStatus, DownEvent, HumanState, DeviceType, ServiceDirect, HangupDir
  22. from src.core.callcenter.esl.constant.event_names import *
  23. from src.core.callcenter.exception import BizException
  24. from src.core.callcenter.push import PushHandler
  25. from src.core.datasource import RedisHandler
  26. class AgentEventService:
  def __init__(self, app):
      """Create the event service together with the helpers it delegates to.

      :param app: application object. Its ``logger`` attribute is reused for
          logging and the object itself is passed to every collaborator.
      """
      app_logger = app.logger
      self.app = app
      self.logger = app_logger
      # Collaborators are built in their original order in case any of the
      # constructors has side effects.
      self.cache = Cache(app)
      self.push_handler = PushHandler(app_logger)
      self.data_handle_server = DataHandleServer(app)
      self.agent_monitor_service = AgentMonitorService(app)
      self.agent_state_service = AgentStateService(app)
      self.agent_actionlog_service = AgentActionLogService(app)
  def delay_state(self, state_data: AgentDelayStateData):
      """Apply a delayed state transition when it still matches the monitor.

      Nothing happens when the agent or its monitor record cannot be found.
      Otherwise the requested ``state_data.service_state`` is compared with
      the monitor's current ``service_state``:

      * REPROCESSING requested while REPROCESSING: delegate to
        :meth:`reprocessing_idle`.
      * DIALING requested while DIALING: unless the cache reports the call
        as answered, set the monitor idle and push call-end / work reports.
      * HANGING requested while DIALING or CALLING: set the monitor idle and
        push the hang-up work reports.

      :param state_data: delayed state payload (saas_id, flow_id, agent_num,
          service_state and scene are read here).
      """
      saas_id = state_data.saas_id
      agent_num = state_data.agent_num
      flow_id = state_data.flow_id
      lookup = self.data_handle_server

      if not lookup.get_agent(saas_id, agent_num):
          return
      agent_monitor = lookup.get_agent_monitor(saas_id, agent_num)
      if not agent_monitor:
          return

      # TODO: ignore delayed events that do not belong to the latest call.
      agent_scene = AgentScene.get_by_code(state_data.scene)
      requested = state_data.service_state
      self.logger.info("agent event delay state %s %s %s %s", saas_id, agent_num, requested, agent_monitor.service_state)

      def push_call_end():
          self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, agent_scene, ServiceDirect.MANUAL_CALL.service_direct, '0')

      def push_hang_reports():
          # The "reprocessing" report is always followed by the "idle" one.
          for status in (WorkStatus.AGENT_HANG_REPROCESSING, WorkStatus.AGENT_HANG_IDLE):
              self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, '', agent_scene, status)

      reprocessing_code = AgentServiceState.REPROCESSING.code
      dialing_code = AgentServiceState.DIALING.code

      if requested == reprocessing_code and agent_monitor.service_state == reprocessing_code:
          self.reprocessing_idle(state_data)

      if requested == dialing_code and agent_monitor.service_state == dialing_code:
          # NOTE(review): the answered flag is written with (saas_id, flow_id)
          # in agent_event_channel but read here with (saas_id, agent_num) -
          # confirm against Cache that both use the same key.
          if self.cache.get_call_is_answer(saas_id, agent_num):
              return
          self.agent_monitor_service.update_idle(agent_monitor)
          push_call_end()
          push_hang_reports()
          self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.BIZ_DIALING_IDLE)

      if requested == AgentServiceState.HANGING.code \
              and (agent_monitor.service_state == dialing_code
                   or agent_monitor.service_state == AgentServiceState.CALLING.code):
          self.agent_monitor_service.update_idle(agent_monitor)
          # The monitor state is re-read after update_idle, as before.
          if agent_monitor.service_state == dialing_code:
              push_call_end()
          push_hang_reports()
          self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.MANUAL_HANG_UP)
  def agent_event_channel(self, event, call_info: CallInfo, device_info: DeviceInfo):
      """Handle one ESL channel event for a call in the MANUAL agent scene.

      The event is ignored when ``call_info`` is missing or when no agent /
      agent monitor can be found for the device. Otherwise notifications are
      pushed and agent state is updated depending on the event name and on
      whether the device is the agent leg or the customer leg.

      Failures are logged through ``self.logger`` and never propagate to the
      caller; the callback latency metric is observed in every case.

      :param event: raw ESL event; only its name is read here.
      :param call_info: call context (may be None).
      :param device_info: device/leg context (may be None).
      """
      event_name = EslEventUtil.getEventName(event)
      saas_id = call_info.saas_id if call_info else None
      flow_id = call_info.cti_flow_id if call_info else None
      call_id = call_info.call_id if call_info else None
      device_id = device_info.device_id if device_info else None
      agent_num = device_info.agent_key if device_info else None
      is_agent = bool(device_info) and DeviceType.AGENT.code == device_info.device_type
      start_time = time.time()
      try:
          if not call_info:
              # Previously this case surfaced as a swallowed AttributeError on
              # the first log line; make the early exit explicit instead.
              self.logger.warning('agent_event_channel: call_info is missing, event_name=%s, agent_num=%s, device_id=%s', event_name, agent_num, device_id)
              return
          self.logger.info('agent_event_channel, event_name=%s, agent_num=%s, device_id=%s, is_agent=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, agent_num, device_id, is_agent, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
          agent = self.data_handle_server.get_agent(saas_id, agent_num)
          if not agent:
              return
          agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
          if not agent_monitor:
              return

          # Channel originate on the agent leg.
          if CHANNEL_ORIGINATE == event_name and is_agent:
              if call_info.hangup_dir and call_info.hangup_dir == HangupDir.CUSTOMER_HANGUP.code:
                  # The customer already hung up; do not ring the agent.
                  self.logger.info('agent_event_channel:already:hangup, event_name=%s, call_id=%s, device_id=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, device_id, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
                  return
              self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_RINGING, phone=call_info.caller)

          # Progress event on the agent leg.
          if CHANNEL_PROGRESS == event_name and is_agent:
              self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.DIALING)
              self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING)

          # Media-progress event on the customer leg.
          if CHANNEL_PROGRESS_MEDIA == event_name and not is_agent:
              self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_CALLING_RINGING)

          # Answer.
          if CHANNEL_ANSWER == event_name:
              if call_id and self.cache.get_call_is_end(call_id):
                  # The call was already marked as ended; ignore a late answer.
                  return
              self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num)
              self.logger.info('agent接通:%s', agent)
              self.data_handle_server.update_record(call_id, status=1, user_id=agent.user_id, user_name=agent.agent_name)
              if is_agent:
                  # Agent leg picked up.
                  self.cache.set_call_is_answer(saas_id, flow_id)
                  self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.ANSWER_COMPENSATE)
                  self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING)
                  self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON)
              else:
                  # Customer leg picked up.
                  self.agent_monitor_service.update_calling(agent_monitor)
                  self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_ANSWER_OUTGOING)
                  self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.CALLING)

          # Hang-up on the agent leg: move to reprocessing and then idle.
          if CHANNEL_HANGUP == event_name and is_agent:
              if call_id:
                  self.cache.set_call_is_end(call_id)
              self.agent_monitor_service.update_processing(agent_monitor)
              self.logger.info('挂断更新:%s', agent)
              # Reprocessing-to-idle is done synchronously here rather than
              # through a delayed status message.
              self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, call_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.MANUAL))
              self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.MANUAL, ServiceDirect.MANUAL_CALL.service_direct, '0')
              self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.MANUAL, WorkStatus.AGENT_HANG_REPROCESSING)
              self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.MANUAL, AgentServiceState.REPROCESSING)
              self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING, AgentLogState.CHANNEL_HANG_UP)

          if (CHANNEL_BRIDGE == event_name or PLAYBACK_START == event_name) and is_agent:
              self.push_handler.push_on_ring_start(saas_id, flow_id, agent_num, AgentScene.MANUAL, call_id)

          # NOTE(review): the next two pushes pass call_id in the position
          # where push_on_ring_start receives agent_num - confirm against the
          # PushHandler signatures.
          if DETECTED_TONE == event_name and not is_agent:
              self.push_handler.push_on_detected_tone(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id)

          if (CHANNEL_UNBRIDGE == event_name or PLAYBACK_STOP == event_name) and is_agent:
              self.push_handler.push_on_ring_end(saas_id, flow_id, call_id, AgentScene.MANUAL, call_id)
      except Exception:
          # Event callbacks must not raise into the ESL dispatcher, but
          # SystemExit / KeyboardInterrupt are no longer swallowed.
          self.logger.exception('agent_event_channel failed, event_name=%s, call_id=%s, agent_num=%s', event_name, call_id, agent_num)
      finally:
          latency = time.time() - start_time
          registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
  def bot_event_channel(self, event, call_info, device_info):
      """Handle one ESL channel event for a call in the ROBOT agent scene.

      The event is ignored when ``call_info`` is missing or when no agent /
      agent monitor can be found for the device. Failures are logged through
      ``self.logger`` and never propagate; the callback latency metric is
      observed in every case.

      :param event: raw ESL event; only its name is read here.
      :param call_info: call context (may be None).
      :param device_info: device/leg context (may be None).
      """
      event_name = EslEventUtil.getEventName(event)
      saas_id = call_info.saas_id if call_info else None
      flow_id = call_info.cti_flow_id if call_info else None
      call_id = call_info.call_id if call_info else None
      device_id = device_info.device_id if device_info else None
      agent_num = device_info.agent_key if device_info else None
      is_agent = bool(device_info) and DeviceType.AGENT.code == device_info.device_type
      # On the agent leg the device's caller/called numbers are swapped.
      caller = (device_info.called if is_agent else device_info.caller) if device_info else None
      called = (device_info.caller if is_agent else device_info.called) if device_info else None
      human_service_id = '00000000000000000'
      start_time = time.time()
      try:
          if not call_info:
              # Previously this case surfaced as a swallowed AttributeError on
              # the first log line; make the early exit explicit instead.
              self.logger.warning('bot_event_channel: call_info is missing, event_name=%s, agent_num=%s, device_id=%s', event_name, agent_num, device_id)
              return
          self.logger.info('bot_event_channel, event_name=%s, call_id=%s, device_id=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, device_id, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
          agent = self.data_handle_server.get_agent(saas_id, agent_num)
          if not agent:
              return
          agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_num)
          if not agent_monitor:
              return

          # Channel originate on the agent leg.
          if CHANNEL_ORIGINATE == event_name and is_agent:
              # Record which agent the call was transferred to.
              self.data_handle_server.update_record(call_id, transfer_user_id=agent.user_id, transfer_user_name=agent.agent_name)
              if call_info.hangup_dir and call_info.hangup_dir == HangupDir.CUSTOMER_HANGUP.code:
                  # The customer already hung up; do not ring the agent.
                  self.logger.info('bot_event_channel:already:hangup, event_name=%s, call_id=%s, device_id=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, device_id, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
                  return
              self.push_handler.push_on_call_ring(saas_id, flow_id, agent_num, AgentScene.ROBOT, call_id, ServiceDirect.ROBOT_CALL.service_direct, called, caller, human_service_id)
              self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_RINGING, phone=call_info.caller)

          # Answer.
          if CHANNEL_ANSWER == event_name:
              self.agent_state_service.busy(saas_id, agent.agent_num, agent.phone_num)
              if is_agent:
                  self.agent_monitor_service.update_calling(agent_monitor)
                  self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! internal")
                  self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.CALLING)
                  self.push_handler.push_answer_call(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, WorkStatus.AGENT_HANG_REPROCESSING)
                  self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.CALLING, AgentLogState.CHANNEL_TURN_ON, service_id=human_service_id)
              else:
                  self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_ANSWER_INCOMING, "座席接通呼入电话! external")

          # Hang-up on the agent leg: move to reprocessing and then idle.
          if CHANNEL_HANGUP == event_name and is_agent:
              self.agent_monitor_service.update_processing(agent_monitor)
              self.reprocessing_idle(AgentDelayStateData(saas_id, flow_id, call_id, agent_num, AgentServiceState.REPROCESSING, AgentScene.ROBOT))
              self.push_handler.push_on_call_end(saas_id, flow_id, agent_num, AgentScene.ROBOT, ServiceDirect.ROBOT_CALL.service_direct, "0")
              self.push_handler.push_on_agent_work_report(saas_id, flow_id, agent_num, call_id, AgentScene.ROBOT, WorkStatus.AGENT_HANG_REPROCESSING)
              self.push_handler.push_on_agent_report(saas_id, agent_num, AgentScene.ROBOT, AgentServiceState.REPROCESSING)
              self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.REPROCESSING,
                                                                AgentLogState.CHANNEL_HANG_UP, service_id=human_service_id)
      except Exception:
          # Event callbacks must not raise into the ESL dispatcher, but
          # SystemExit / KeyboardInterrupt are no longer swallowed.
          self.logger.exception('bot_event_channel failed, event_name=%s, call_id=%s, agent_num=%s', event_name, call_id, agent_num)
      finally:
          latency = time.time() - start_time
          # NOTE(review): the label is "agent", same as agent_event_channel;
          # confirm whether bot events were meant to be labelled separately.
          registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
  def reprocessing_idle(self, state_data: AgentDelayStateData):
      """Return an agent to the idle state after post-call processing.

      Does nothing when the agent or its monitor record cannot be found.
      Otherwise the agent state service and the monitor are set to idle, an
      AGENT_HANG_IDLE work report is pushed and the transition is logged.

      :param state_data: carries saas_id, agent_num, flow_id, call_id and
          scene of the call that just finished.
      """
      tenant = state_data.saas_id
      number = state_data.agent_num

      agent = self.data_handle_server.get_agent(tenant, number)
      if not agent:
          return
      monitor = self.data_handle_server.get_agent_monitor(tenant, number)
      if not monitor:
          return

      log = self.logger
      log.info('reprocessing_idle %s %s %s', tenant, number, monitor)
      self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
      log.info('reprocessing_idle_end')
      self.agent_monitor_service.update_idle(monitor)
      self.push_handler.push_on_agent_work_report(
          tenant,
          state_data.flow_id,
          number,
          state_data.call_id,
          state_data.scene,
          WorkStatus.AGENT_HANG_IDLE,
      )
      self.agent_actionlog_service.insert_service_state(
          monitor, AgentServiceState.IDLE, AgentLogState.REPROCESSING_IDLE
      )
  220. class AgentOperService:
  def __init__(self, app):
      """Create the operation service and start the heartbeat watchdog.

      A background scheduler is started that runs
      :meth:`agent_heartbeat_daemon` every second (one instance at a time).

      :param app: application object; its ``logger`` is reused and the object
          itself is handed to the collaborating services.
      """
      app_logger = app.logger
      self.app = app
      self.logger = app_logger
      self.redis_handler = RedisHandler()
      self.push_handler = PushHandler(app_logger)
      self.data_handle_server = DataHandleServer(app)
      self.agent_monitor_service = AgentMonitorService(app)
      self.agent_actionlog_service = AgentActionLogService(app)
      self.agent_state_service = AgentStateService(app)

      # Expiry windows, in seconds.
      self.agent_heartbeat_expire = 30
      self.agent_serial_live_expire = 60*10

      self.agent_heartbeat_job_scheduler = BackgroundScheduler()
      watchdog = self.agent_heartbeat_job_scheduler
      watchdog.add_job(
          self.agent_heartbeat_daemon,
          'interval',
          seconds=1,
          max_instances=1,
          name='agent_heartbeat_daemon',
      )
      watchdog.start()
  def agent_heartbeat_daemon(self):
      """Periodic watchdog over the agent heartbeat hash in Redis.

      On every run it:

      * tracks how long the heartbeat hash has been empty and, once that
        exceeds ``self.agent_serial_live_expire`` seconds, writes a warning
        record;
      * publishes the number of heartbeat entries to the
        ``MANUAL_AGENT_LIVES`` gauge (including 0 when there are none);
      * removes entries whose timestamp is older than
        ``self.agent_heartbeat_expire`` seconds.
      """
      client = self.redis_handler.redis

      def check_out_daemon(_name, key, value):
          """Remove one heartbeat entry if its timestamp has expired."""
          try:
              age = time.time() - float(value)
              if age > self.agent_heartbeat_expire:
                  client.hdel(_name, key)
                  # Only the heartbeat entry is removed here; the automatic
                  # checkout call itself is currently disabled.
                  self.logger.error("agent heartbeat expired, will checkout %s %s", key, value)
          except Exception:
              # One bad entry must not stop the scan of the remaining ones.
              self.logger.exception("agent heartbeat check failed %s %s", key, value)

      def check_agent_live_daemon(_members):
          """Raise a warning record when no agent has been live for too long."""
          key = CENTER_AGENT_LIVE_CNT % SAAS_ID
          pre_val = client.get(key)
          if not pre_val:
              if not _members:
                  # Remember when the "no live agent" period started.
                  client.set(key, time.time(), ex=60*60, nx=True)
              return
          diff = time.time() - float(pre_val)
          if diff > self.agent_serial_live_expire:
              self.logger.warning('WARING::live agent count less than 1 serial ten minutes')
              self.data_handle_server.create_warning_record(1, '10分钟空岗报警')
              client.delete(key)
          if _members:
              # Somebody is live again: reset the empty-period marker.
              client.delete(key)

      name = CENTER_AGENT_HEARTBEAT % SAAS_ID
      members = client.hgetall(name)
      check_agent_live_daemon(members)
      # Publish the gauge before the early return so it drops to 0 instead of
      # keeping its last non-zero value once every agent has gone.
      registry.MANUAL_AGENT_LIVES.set(len(members) if members else 0)
      if not members:
          return
      for k, v in members.items():
          check_out_daemon(name, k, v)
  def __del__(self):
      """Stop the heartbeat scheduler when the service is garbage-collected.

      The scheduler attribute may be absent (``__init__`` failed early) or the
      scheduler may already be stopped; both cases are tolerated so that the
      finalizer itself never raises.
      """
      scheduler = getattr(self, 'agent_heartbeat_job_scheduler', None)
      if scheduler is not None and scheduler.running:
          scheduler.shutdown()
  @with_app_context
  def enable(self, req: AgentActionRequest):
      """Mark an agent as enabled and commit the change.

      No-op when the agent is already enabled.

      :param req: request carrying saas_id, agent_number and out_id.
      :raises BizException: RECORD_NOT_EXIST_ERROR when the agent is unknown
          (previously this surfaced as an AttributeError).
      """
      agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id)
      if not agent:
          raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
      if agent.agent_state == AgentState.ENABLE.code:
          return
      agent.agent_state = AgentState.ENABLE.code
      db.session.commit()
  @with_app_context
  def disable(self, req: AgentActionRequest):
      """Disable an agent, unbind its phone and soft-delete that phone.

      No-op when the agent is already disabled.

      :param req: request carrying saas_id and agent_id.
      :raises BizException: RECORD_NOT_EXIST_ERROR when the agent is unknown;
          AGENT_CALLING_NOT_ALLOW_OPERATE when the agent is checked in and
          currently on a call.
      """
      agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
      if not agent:
          raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
      if agent.agent_state == AgentState.DISABLE.code:
          return

      # The agent attribute is ``agent_num`` (see every other lookup in this
      # class); ``agent_number`` raised AttributeError.
      agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
      if agent_monitor \
              and agent_monitor.check_state == AgentCheck.IN.code \
              and agent_monitor.service_state == AgentServiceState.CALLING.code:
          raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)

      # Resolve the phone BEFORE clearing phone_num; looking it up afterwards
      # searched for the empty string and never found the bound phone.
      bound_phone_num = agent.phone_num
      phone = self.data_handle_server.get_phone(req.saas_id, bound_phone_num) if bound_phone_num else None

      agent.phone_num = ''
      agent.agent_state = AgentState.DISABLE.code
      if phone:
          phone.is_delete = 1
      # Single commit so the agent and phone changes land together.
      db.session.commit()
  @with_app_context
  def checkin(self, req: AgentActionRequest):
      """Check an agent in and return the initial-success event payload.

      :param req: request carrying saas_id, agent_id and scene.
      :returns: the ``__dict__`` of the event built by
          :meth:`_push_event_for_checkin`.
      :raises BizException: ERROR_NOT_FOLLOW_CHECK_IN when the agent is
          unknown or disabled; RECORD_NOT_EXIST_ERROR when its phone or
          monitor record is missing.
      """
      agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
      if not agent or agent.agent_state == AgentState.DISABLE.code:
          raise BizException(BizErrorCode.ERROR_NOT_FOLLOW_CHECK_IN)
      phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
      agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
      # Fail before any state is changed; previously a missing record crashed
      # with AttributeError only after the check-in had been half applied.
      if not phone or not agent_monitor:
          raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)

      self.agent_monitor_service.update_checkin(agent_monitor, req.scene)
      self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.IN, AgentLogState.CHECKIN)
      self.agent_state_service.checkin(agent.saas_id, agent.out_id, agent.phone_num)
      self.logger.info("checkin %s %s", agent_monitor, agent_monitor.check_state)
      if req.scene == AgentScene.MANUAL.code:
          # For the manual-dial scene, also run the idle handling right away.
          self._handle_idle(req.scene, agent)
      return self._push_event_for_checkin(agent, agent_monitor, phone, req.scene)
  @with_app_context
  def checkout(self, req: AgentActionRequest):
      """Check an agent out and return the check-out event payload.

      When the monitor already reports the agent as checked out, only the
      event is pushed again.

      :param req: request carrying saas_id, agent_id and scene.
      :returns: the value of :meth:`_push_event_for_checkout`.
      :raises BizException: AGENT_DISABLE_NOT_ALLOW_OPERATE when the agent is
          unknown or disabled; AGENT_CALLING_NOT_ALLOW_OPERATE when the
          monitor is missing or the agent is on a call.
      """
      store = self.data_handle_server
      agent = store.get_agent(req.saas_id, req.agent_id)
      if not agent or agent.agent_state == AgentState.DISABLE.code:
          raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)

      monitor = store.get_agent_monitor(req.saas_id, agent.agent_num)
      if not monitor or monitor.service_state == AgentServiceState.CALLING.code:
          raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)

      already_out = monitor.check_state == AgentCheck.OUT.code
      if not already_out:
          self.agent_monitor_service.update_checkout(monitor)
          self.agent_actionlog_service.insert_check_state(monitor, AgentCheck.OUT, AgentLogState.CHECKOUT)
          self.agent_state_service.checkout(agent.saas_id, agent.out_id, agent.phone_num)
      return self._push_event_for_checkout(agent, req.scene)
  @with_app_context
  def busy(self, req: AgentActionRequest):
      """Put an agent into the busy state and push the busy event.

      The monitor is only updated (and the transition logged) when it is not
      already busy; the busy event is pushed either way.

      :param req: request carrying saas_id, agent_id and scene.
      :raises BizException: AGENT_DISABLE_NOT_ALLOW_OPERATE,
          AGENT_CHECK_OUT_NOT_ALLOW_OPERATE or
          AGENT_CALLING_NOT_ALLOW_OPERATE when the agent is unknown/disabled,
          checked out (or has no monitor), or on a call respectively.
      """
      store = self.data_handle_server
      agent = store.get_agent(req.saas_id, req.agent_id)
      if not agent or agent.agent_state == AgentState.DISABLE.code:
          raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)

      monitor = store.get_agent_monitor(req.saas_id, agent.agent_num)
      if not monitor or monitor.check_state == AgentCheck.OUT.code:
          raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
      if monitor.service_state == AgentServiceState.CALLING.code:
          raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)

      self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
      already_busy = monitor.service_state == AgentServiceState.BUSY.code
      if not already_busy:
          self.agent_monitor_service.update_busy(monitor)
          self.agent_actionlog_service.insert_service_state(monitor, AgentServiceState.BUSY, AgentLogState.BUSY)
      self._push_event_for_busy(agent, req.scene)
  @with_app_context
  def idle(self, req: AgentActionRequest):
      """Put an agent into the idle state via :meth:`_handle_idle`.

      :param req: request carrying saas_id, agent_id and scene.
      :raises BizException: AGENT_DISABLE_NOT_ALLOW_OPERATE when the agent is
          unknown or disabled.
      """
      agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
      usable = bool(agent) and agent.agent_state != AgentState.DISABLE.code
      if not usable:
          raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
      self._handle_idle(req.scene, agent)
  @with_app_context
  def assign(self, req: AgentActionRequest):
      """Ask the agent state service to assign an agent for a service.

      :param req: request carrying saas_id and service_id.
      :returns: whatever ``assign_agent`` returns.
      """
      assigned = self.agent_state_service.assign_agent(req.saas_id, req.service_id)
      return assigned
  def idle_agent_exist(self, request: AgentActionRequest):
      """Placeholder: not implemented yet, always returns ``None``.

      :param request: currently unused.
      """
      return None
  @with_app_context
  def agent_state(self, req: AgentActionRequest):
      """Return the current service state stored on the agent's monitor.

      :param req: request carrying saas_id and agent_id.
      :returns: ``service_state`` of the monitor record.
      :raises BizException: RECORD_NOT_EXIST_ERROR when no monitor record
          exists (previously an AttributeError), matching :meth:`turn_on`.
      """
      agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, req.agent_id)
      if not agent_monitor:
          raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
      return agent_monitor.service_state
  @with_app_context
  def turn_on(self, req: AgentActionRequest):
      """Mark an agent as being on a call and push a BUSY agent report.

      When the monitor already reports CALLING only the report is pushed.

      :param req: request carrying saas_id, agent_id and scene.
      :raises BizException: RECORD_NOT_EXIST_ERROR when the agent or its
          monitor record is missing (a missing agent used to crash with an
          AttributeError before the monitor check was reached).
      """
      agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
      if not agent:
          raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
      agent_monitor = self.data_handle_server.get_agent_monitor(req.saas_id, agent.agent_num)
      agent_scene = AgentScene.get_by_code(req.scene)
      if not agent_monitor:
          raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)

      if agent_monitor.service_state != AgentServiceState.CALLING.code:
          self.agent_monitor_service.update_calling(agent_monitor)
          self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
      self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
  @with_app_context
  def _handle_idle(self, scene, agent):
      """Move an agent to idle and push the ready event.

      For the ROBOT scene the agent state service is set to idle; for any
      other scene it is set to busy. The monitor is only updated (and the
      transition logged) when it is not already idle.

      :param scene: scene code of the request.
      :param agent: agent record (saas_id, agent_num, out_id, phone_num).
      :raises BizException: AGENT_CHECK_OUT_NOT_ALLOW_OPERATE when the agent
          has no monitor record or is checked out; AGENT_CALLING_NOT_HANG
          when the agent is dialing or on a call.
      """
      agent_monitor = self.data_handle_server.get_agent_monitor(agent.saas_id, agent.agent_num)
      # Logged instead of printed so the message reaches the application log.
      self.logger.info('agent_monitor: %s %s', agent_monitor, agent)
      # A missing monitor is treated like a checked-out agent, as in busy().
      if not agent_monitor or agent_monitor.check_state == AgentCheck.OUT.code:
          raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
      if agent_monitor.service_state == AgentServiceState.CALLING.code \
              or agent_monitor.service_state == AgentServiceState.DIALING.code:
          raise BizException(BizErrorCode.AGENT_CALLING_NOT_HANG)

      if scene == AgentScene.ROBOT.code:
          self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
      else:
          self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)

      if agent_monitor.service_state != AgentServiceState.IDLE.code:
          self.agent_monitor_service.update_idle(agent_monitor)
          self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.IDLE)
      self._push_event_for_idle(agent, scene)
  def _push_event_for_checkin(self, agent, agent_monitor, phone, scene):
      """Push the check-in work report and build the check-in event payload.

      :param agent: agent record (saas_id and out_id are read).
      :param agent_monitor: unused here.
      :param phone: phone record (phone_num, phone_pwd, sip_server are read).
      :param scene: scene code, resolved through ``AgentScene.get_by_code``.
      :returns: attribute dictionary of the ``AgentEventData`` that was built.
      """
      scene_enum = AgentScene.get_by_code(scene)
      self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", scene_enum, WorkStatus.LOGIN_SUCCESS)

      payload = AgentEventData(agent.saas_id, agent.out_id)
      event_name = DownEvent.ON_INITAL_SUCCESS.code
      ext = {
          'saasId': agent.saas_id,
          'agentId': agent.out_id,
          'phoneNum': phone.phone_num,
          'phonePwd': phone.phone_pwd,
          'sipServer': phone.sip_server,
      }
      payload.data = {'eventName': event_name, 'ext': ext}
      return vars(payload)
  def _push_event_for_checkout(self, agent, scene):
      """Push the check-out work report and build the check-out event payload.

      :param agent: agent record (saas_id and out_id are read).
      :param scene: scene code, resolved through ``AgentScene.get_by_code``.
      :returns: attribute dictionary of the ``AgentEventData`` that was built.
      """
      scene_enum = AgentScene.get_by_code(scene)
      self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", scene_enum, WorkStatus.NO_INIT, '签出成功')

      payload = AgentEventData(agent.saas_id, agent.out_id)
      event_name = DownEvent.ON_INITAL_FAILURE.code
      ext = {'saasId': agent.saas_id, 'agentId': agent.out_id}
      payload.data = {'eventName': event_name, 'ext': ext}
      return vars(payload)
  def _push_event_for_busy(self, agent, scene):
      """Push the AGENT_BUSY work report for ``agent``.

      :param agent: agent record (saas_id and out_id are read).
      :param scene: scene code, resolved through ``AgentScene.get_by_code``.
      """
      scene_enum = AgentScene.get_by_code(scene)
      push = self.push_handler.push_on_agent_work_report
      push(agent.saas_id, "", agent.out_id, "", scene_enum, WorkStatus.AGENT_BUSY)
  def _push_event_for_idle(self, agent, scene):
      """Push the AGENT_READY work report for ``agent``.

      :param agent: agent record (saas_id and out_id are read).
      :param scene: scene code, resolved through ``AgentScene.get_by_code``.
      """
      scene_enum = AgentScene.get_by_code(scene)
      push = self.push_handler.push_on_agent_work_report
      push(agent.saas_id, "", agent.out_id, "", scene_enum, WorkStatus.AGENT_READY)
  421. class AgentService:
  def __init__(self, app):
      """Create the agent CRUD service.

      :param app: application object; its ``logger`` is reused and the object
          itself is handed to the collaborating services.
      """
      self.app, self.logger = app, app.logger
      self.data_handle_server = DataHandleServer(app)
      self.agent_monitor_service = AgentMonitorService(app)
  @with_app_context
  def get_and_check(self, req: AgentActionRequest):
      """Return the phone bound to an agent as a dictionary.

      :param req: request carrying saas_id and agent_id.
      :returns: ``phone.to_dict()``; an empty dict when the agent is unknown
          or has no phone record (the latter used to raise AttributeError).
      """
      agent = self.data_handle_server.get_agent(req.saas_id, req.agent_id)
      if not agent:
          return {}
      phone = self.data_handle_server.get_phone(req.saas_id, agent.phone_num)
      if not phone:
          return {}
      return phone.to_dict()
  @with_app_context
  def watch_agent_state(self, req: HumanServiceQueryRequest):
      """Return monitor details for every agent mapped to a human service.

      :param req: query carrying saas_id and serviceId.
      :returns: the result of ``detail_monitor_out_ids`` for the mapped agents.
      """
      mappings = HumanServiceMap.query.filter(
          HumanServiceMap.is_delete == 0,
          HumanServiceMap.saas_id == req.saas_id,
          HumanServiceMap.service_id == req.serviceId,
      ).all()
      agent_ids = []
      for mapping in mappings:
          agent_ids.append(mapping.agent_id)
      return self.agent_monitor_service.detail_monitor_out_ids(req.saas_id, agent_ids)
  441. @with_app_context
  442. def add(self, req: AgentRequest):
  443. new_agent_num = self.data_handle_server.get_newest_agent_number(req.saas_id)
  444. agent = Agent(saas_id=req.saas_id, agent_num=new_agent_num, agent_name=req.agent_name, out_id=req.out_id,
  445. agent_pwd=req.agent_password, agent_type=req.agent_type, phone_num=req.phone_number,
  446. distribute=req.distribute, agent_state=req.agent_state)
  447. db.session.add(agent)
  448. db.session.commit()
  449. agent_monitor = AgentMonitor(saas_id=req.saas_id, agent_num=new_agent_num, out_id=req.out_id)
  450. db.session.add(agent_monitor)
  451. db.session.commit()
  452. return new_agent_num
  453. @with_app_context
  454. def update(self, req: AgentRequest):
  455. agent = self.data_handle_server.get_agent(req.saas_id, req.agent_number, req.out_id)
  456. if not agent:
  457. return
  458. phone_num = agent.phone_num
  459. state_change = agent.agent_state != req.agent_state
  460. disable = req.agent_state == AgentState.DISABLE
  461. agent.agent_name = req.agent_name
  462. agent.agent_pwd = req.agent_password
  463. agent.agent_type = req.agent_type
  464. agent.phone_num = req.phone_number if not disable else ''
  465. agent.distribute = req.distribute
  466. agent.agent_state = req.agent_state
  467. db.session.commit()
  468. if state_change and disable:
  469. phone = Phone.query.filter(Phone.saas_id == req.saas_id, Phone.phone_num == phone_num).first()
  470. if phone:
  471. db.session.delete(phone)
  472. @with_app_context
  473. def detail(self, saas_id, agent_number):
  474. agent = self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='')
  475. return agent
  476. @with_app_context
  477. def count(self, req: AgentQueryRequest):
  478. cnt = Agent.query.filter(Agent.saas_id == req.saas_id,
  479. or_(Agent.agent_num == req.agent_number,
  480. Agent.agent_name.contains(req.agent_name),
  481. Agent.out_id == req.out_id,
  482. Agent.agent_type == req.agent_type
  483. )
  484. ).count()
  485. return cnt
  486. @with_app_context
  487. def query_page(self, req: AgentQueryRequest):
  488. pagination = Agent.query.filter(Agent.saas_id == req.saas_id,
  489. or_(Agent.agent_num == req.agent_number,
  490. Agent.agent_name.contains(req.agent_name),
  491. Agent.out_id == req.out_id,
  492. Agent.agent_type == req.agent_type
  493. )
  494. ).paginate(req.page, req.size)
  495. # data = {
  496. # "page": pagination.page, # 当前页码
  497. # "pages": pagination.pages, # 总页码
  498. # "has_prev": pagination.has_prev, # 是否有上一页
  499. # "prev_num": pagination.prev_num, # 上一页页码
  500. # "has_next": pagination.has_next, # 是否有下一页
  501. # "next_num": pagination.next_num, # 下一页页码
  502. # "items": [{
  503. # "id": item.id,
  504. # "name": item.name,
  505. # "age": item.age,
  506. # "sex": item.sex,
  507. # "money": item.money,
  508. # } for item in pagination.items]
  509. # }
  510. return pagination
  511. @with_app_context
  512. def delete(self, saas_id, agent_number):
  513. agent = self.data_handle_server.get_agent(saas_id, agent_number=agent_number, out_id='')
  514. if not agent:
  515. return
  516. agent.is_delete = 1
  517. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number)
  518. agent_monitor.is_delete = 1
  519. db.session.commit()
  520. phone = self.data_handle_server.get_phone(saas_id, agent.phone_num)
  521. phone.is_delete = 1
  522. db.session.commit()
  523. class AgentMonitorService:
  524. def __init__(self, app):
  525. self.app = app
  526. self.logger = app.logger
  527. self.data_handle_server = DataHandleServer(app)
  528. @with_app_context
  529. def detail_monitor_out_ids(self, saas_id, out_ids, check_scene=None):
  530. if not out_ids:
  531. return []
  532. agents = Agent.query.filter(Agent.is_delete == 0, Agent.saas_id == saas_id, Agent.out_id.in_(out_ids)).all()
  533. if not agents:
  534. raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
  535. agent_num_map = {x.agent_num: x for x in agents}
  536. agent_nums = [x.agent_num for x in agents]
  537. agent_monitors = AgentMonitor.query.filter(AgentMonitor.is_delete == 0, AgentMonitor.agent_num.in_(agent_nums)).all()
  538. res = []
  539. for agent_monitor in agent_monitors:
  540. agent = agent_num_map.get(agent_monitor.agent_num)
  541. data = AgentMonitorData()
  542. data.saas_id = saas_id
  543. data.agent_num = agent_monitor.agent_num
  544. data.agent_name = agent.agent_name
  545. data.out_id = agent_monitor.out_id
  546. data.check_state = agent_monitor.check_state
  547. data.check_scene = agent_monitor.check_scene
  548. data.online_state = 1 if agent_monitor.check_state == AgentCheck.IN.code else 0
  549. if agent_monitor.check_in_time:
  550. data.check_in_time = agent_monitor.check_in_time.timestamp()
  551. day_start = self._get_day_start()
  552. if agent_monitor.check_in_time.timestamp() > day_start and \
  553. agent_monitor.check_state == AgentCheck.OUT.code:
  554. data.online_state = 2
  555. data.check_out_time = agent_monitor.check_out_time.timestamp() if agent_monitor.check_out_time else None
  556. data.service_state = agent_monitor.service_state
  557. data.busy_time = agent_monitor.busy_time.timestamp() if agent_monitor.busy_time else None
  558. data.idle_time = agent_monitor.idle_time.timestamp() if agent_monitor.idle_time else None
  559. data.call_time = agent_monitor.call_time.timestamp() if agent_monitor.call_time else None
  560. data.hang_time = agent_monitor.hang_time.timestamp() if agent_monitor.hang_time else None
  561. data.heart_state = agent_monitor.heart_state
  562. data.heart_time = agent_monitor.heart_time.timestamp() if agent_monitor.heart_time else None
  563. data.update_time = agent_monitor.update_time.timestamp() if agent_monitor.update_time else None
  564. res.append(data.__dict__)
  565. return res
  566. @with_app_context
  567. def update_checkin(self, agent_monitor,scene=None):
  568. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  569. agent_monitor.check_state = AgentCheck.IN.code
  570. agent_monitor.check_in_time = datetime.now()
  571. agent_monitor.heart_state = AgentHeartState.NORMAL.code
  572. agent_monitor.heart_time = datetime.now()
  573. agent_monitor.check_scene = scene
  574. self.logger.info("update_checkin %s", agent_monitor.check_state)
  575. db.session.commit()
  576. @with_app_context
  577. def update_checkout(self, agent_monitor):
  578. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  579. agent_monitor.check_state = AgentCheck.OUT.code
  580. agent_monitor.check_out_time = datetime.now()
  581. agent_monitor.service_state = AgentServiceState.LOGOUT.code
  582. agent_monitor.heart_state = AgentHeartState.DEFAULT.code
  583. agent_monitor.heart_time = datetime.now()
  584. self.logger.info("update_checkout %s", agent_monitor.check_out_time)
  585. db.session.commit()
  586. @with_app_context
  587. def update_idle(self, agent_monitor):
  588. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  589. agent_monitor.service_state = AgentServiceState.IDLE.code
  590. agent_monitor.idle_time = datetime.now()
  591. db.session.commit()
  592. @with_app_context
  593. def update_busy(self, agent_monitor):
  594. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  595. agent_monitor.service_state = AgentServiceState.BUSY.code
  596. agent_monitor.busy_time = datetime.now()
  597. db.session.commit()
  598. @with_app_context
  599. def update_dialing(self, agent_monitor):
  600. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  601. agent_monitor.service_state = AgentServiceState.DIALING.code
  602. db.session.commit()
  603. @with_app_context
  604. def update_calling(self, agent_monitor):
  605. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  606. agent_monitor.service_state = AgentServiceState.CALLING.code
  607. agent_monitor.call_time = datetime.now()
  608. db.session.commit()
  609. @with_app_context
  610. def update_processing(self, agent_monitor):
  611. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  612. agent_monitor.service_state = AgentServiceState.REPROCESSING.code
  613. agent_monitor.hang_time = datetime.now()
  614. db.session.commit()
  615. @with_app_context
  616. def update_session_id(self, agent_monitor, session_id):
  617. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  618. agent_monitor.session_id = session_id
  619. db.session.commit()
  620. @with_app_context
  621. def update_heart_error(self, agent_monitor):
  622. agent_monitor = db.session.query(AgentMonitor).get(agent_monitor.id)
  623. agent_monitor.heart_state = AgentHeartState.ABNORMAL.code
  624. agent_monitor.heart_time = datetime.now()
  625. db.session.commit()
  626. @with_app_context
  627. def _get_day_start(self):
  628. today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  629. return int(today_start.timestamp() * 1000) # Return milliseconds
  630. class AgentActionLogService:
  631. def __init__(self, app):
  632. self.app = app
  633. self.logger = app.logger
  634. self.data_handle_server = DataHandleServer(app)
  635. @with_app_context
  636. def insert_check_state(self, agent_monitor, agent_check_enum: AgentCheck, agent_log_enum: AgentLogState):
  637. action_log = AgentActionLog()
  638. action_log.saas_id = agent_monitor.saas_id
  639. action_log.agent_num = agent_monitor.agent_num
  640. action_log.out_id = agent_monitor.out_id
  641. action_log.action_type = 0
  642. action_log.check_state = agent_check_enum.code
  643. action_log.pre_check_state = agent_monitor.check_state
  644. if agent_log_enum:
  645. action_log.event_type = agent_log_enum.code
  646. action_log.event_desc = agent_log_enum.description
  647. now = datetime.now()
  648. pre_date = None
  649. if agent_monitor.check_state == AgentCheck.IN.code:
  650. pre_date = agent_monitor.check_in_time
  651. if agent_monitor.check_state == AgentCheck.OUT.code:
  652. pre_date = agent_monitor.check_out_time
  653. if pre_date is None:
  654. pre_date = agent_monitor.update_time
  655. action_log.check_state_time = now
  656. action_log.pre_check_state_time = pre_date
  657. action_log.check_state_duration = now.timestamp() - pre_date.timestamp()
  658. db.session.add(action_log)
  659. db.session.commit()
  660. @with_app_context
  661. def insert_service_state(self, agent_monitor, agent_service_state: AgentServiceState, agent_log_enum: AgentLogState,
  662. task_id=None, service_id=None):
  663. if agent_monitor.service_state == agent_service_state.code:
  664. self.logger.info("agent action log insert service state same %s %s", agent_monitor.service_state, agent_service_state.code)
  665. action_log = AgentActionLog()
  666. action_log.saas_id = agent_monitor.saas_id
  667. action_log.agent_num = agent_monitor.agent_num
  668. action_log.out_id = agent_monitor.out_id
  669. action_log.action_type = 1
  670. action_log.service_state = agent_service_state.code
  671. action_log.pre_service_state = agent_monitor.service_state
  672. action_log.task_id = task_id
  673. action_log.service_id = service_id
  674. if agent_log_enum:
  675. action_log.event_type = agent_log_enum.code
  676. action_log.event_desc = agent_log_enum.description
  677. now = datetime.now()
  678. pre_date = None
  679. if agent_monitor.service_state == AgentServiceState.IDLE.code:
  680. pre_date = agent_monitor.idle_time
  681. if agent_monitor.service_state == AgentServiceState.BUSY.code:
  682. pre_date = agent_monitor.busy_time
  683. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  684. pre_date = agent_monitor.call_time
  685. if agent_monitor.service_state == AgentServiceState.LOGOUT.code:
  686. pre_date = agent_monitor.check_out_time
  687. if agent_monitor.service_state == AgentServiceState.REPROCESSING.code:
  688. pre_date = agent_monitor.hang_time
  689. action_log.service_state_time = now
  690. if pre_date is None:
  691. pre_date = agent_monitor.update_time
  692. action_log.pre_service_state_time = pre_date
  693. action_log.service_state_duration = now.timestamp() - pre_date.timestamp()
  694. db.session.add(action_log)
  695. db.session.commit()
  696. class AgentStateService:
  697. def __init__(self, app):
  698. self.app = app
  699. self.logger = app.logger
  700. self.redis_handler = RedisHandler()
  701. self.assigned_recycle_millisecond = 30 * 1000
  702. # self.state_service_id_data_map = defaultdict(dict)
  703. # self.executor = ThreadPoolExecutor(max_workers=10)
  704. self.data_handle_server = DataHandleServer(app)
  705. self.agent_monitor_service = AgentMonitorService(app)
  706. self.agent_actionlog_service = AgentActionLogService(app)
  707. def idle(self, saas_id, agent_id, phone_num):
  708. human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id)
  709. if human_service is None:
  710. self.logger.info(f"agent engine idle not have human service {saas_id} {agent_id}") # 使用print替代log
  711. return
  712. self.idle_hash(saas_id, agent_id, phone_num, human_service.service_id)
  713. def busy(self, saas_id, agent_id, phone_num):
  714. human_service = self.data_handle_server.get_human_service_service(saas_id, agent_id)
  715. if human_service is None:
  716. self.logger.info(f"agent engine busy not hava human service {saas_id} {agent_id}") # 使用print替代log
  717. return
  718. self.busy_hash(saas_id, agent_id, phone_num, human_service.service_id)
  719. def idle_by_human(self, saas_id, agent_id, service_id):
  720. agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id)
  721. if not agent:
  722. return
  723. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num)
  724. if not agent_monitor:
  725. return
  726. if agent_monitor.check_state == AgentCheck.IN.code:
  727. self.agent_monitor_service.update_idle(agent_monitor)
  728. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.ACTIVE_HUMAN_SERVICE)
  729. self.idle_hash(saas_id, agent_id, agent.phone_num, service_id)
  730. def busy_by_human(self, saas_id, service_id, agent_id=None):
  731. agent = self.data_handle_server.get_agent(saas_id, out_id=agent_id)
  732. if not agent:
  733. return
  734. agent_monitor = self.data_handle_server.get_agent_monitor(saas_id, agent_number=agent.agent_num)
  735. if not agent_monitor:
  736. return
  737. if agent_monitor.check_state == AgentCheck.IN.code:
  738. self.agent_monitor_service.update_busy(agent_monitor)
  739. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY,
  740. AgentLogState.DEL_HUMAN_SERVICE)
  741. self.busy_hash(saas_id, agent_id, agent.phone_num, service_id)
  742. def checkin(self, saas_id, agent_id, phone_num):
  743. key = self._check_in_key(saas_id)
  744. state_data = AgentStateData()
  745. state_data.status = HumanState.DEFAULT.code
  746. state_data.time = datetime.now().timestamp()
  747. self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
  748. self.redis_handler.redis.expire(key, self._get_expire_time())
  749. def checkout(self, saas_id, agent_id, phone_num):
  750. key = self._check_in_key(saas_id)
  751. self.redis_handler.redis.hdel(key, phone_num)
  752. self.busy(saas_id, agent_id, phone_num)
  753. def assign_agent(self, saas_id, service_id, called=None, ivr_id=None, task_id=None, cbp=None):
  754. choose_phone_num = ''
  755. lock = threading.Lock()
  756. try:
  757. lock.acquire()
  758. self.logger.info("assignAgent %s %s %s"% (saas_id, service_id, called))
  759. idle_agents = self.idle_agents(saas_id, service_id)
  760. if len(idle_agents) <= 0:
  761. return choose_phone_num
  762. choose_phone_num = self._choose_max_idle_time(idle_agents)
  763. self.handle_assign_time(saas_id, service_id, choose_phone_num)
  764. self.handle_lock_agent(choose_phone_num, saas_id, service_id)
  765. finally:
  766. lock.release()
  767. return choose_phone_num
  768. def handle_check_agent_lock(self, choose_phone_num, saas_id, service_id='00000000000000000'):
  769. key = self._lock_key(saas_id, service_id, choose_phone_num)
  770. res = self.redis_handler.redis.get(key)
  771. self.logger.info('checkAgent %s %s %s %s'% (saas_id, service_id, choose_phone_num, res))
  772. return False if res else True
  773. def handle_lock_agent(self, choose_phone_num, saas_id, service_id='00000000000000000'):
  774. key = self._lock_key(saas_id, service_id, choose_phone_num)
  775. expire = self._get_expire_time()
  776. self.redis_handler.redis.set(key, 1, nx=True, ex=expire)
  777. res = self.redis_handler.redis.get(key)
  778. self.logger.info('lockAgent %s %s %s %s %s'% (saas_id, service_id, choose_phone_num, expire, res))
  779. def handle_release_agent_lock(self, choose_phone_num, saas_id, service_id='00000000000000000'):
  780. key = self._lock_key(saas_id, service_id, choose_phone_num)
  781. # self.redis_handler.redis.delete(key)
  782. self.redis_handler.redis.expire(key, 3)
  783. self.logger.info('releaseAgent %s %s %s'% (saas_id, service_id, choose_phone_num))
  784. def handle_assign_time(self, saas_id, service_id, choose_phone_num):
  785. key = self._key(saas_id, service_id)
  786. cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
  787. if cache_agent_map and choose_phone_num in cache_agent_map:
  788. state_data = cache_agent_map[choose_phone_num]
  789. state_data.assign_time = datetime.now().timestamp() * 1000
  790. self.redis_handler.redis.hset(key, choose_phone_num, state_data.to_json_string())
  791. self.redis_handler.redis.expire(key, self._get_expire_time())
  792. # self.update_report_state(saas_id, service_id)
  793. def idle_agents(self, saas_id, service_id):
  794. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  795. if not cache_agent_list:
  796. return []
  797. agent_str = '\n'.join(["%s-%s-%s-%s"%(x.phone_num, x.status, x.time, x.assign_time) for x in cache_agent_list])
  798. self.logger.info("assignAgent %s %s idleAgents:%s "% (saas_id, service_id, agent_str))
  799. return self.get_idle_agents(cache_agent_list)
  800. def idle_hash(self, saas_id, agent_id, phone_num, service_id):
  801. key = self._key(saas_id, service_id)
  802. state_data = AgentStateData()
  803. # self.logger.info("idle_hash, key=%s, saas_id=%s, phone_num=%s", key, saas_id, phone_num)
  804. cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
  805. if cache_agent_map and phone_num in cache_agent_map:
  806. state_data = cache_agent_map[phone_num]
  807. state_data.status = HumanState.IDLE.code
  808. state_data.time = datetime.now().timestamp() * 1000
  809. self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
  810. self.redis_handler.redis.expire(key, self._get_expire_time())
  811. # self.update_report_state(saas_id, service_id)
  812. self.logger.info("idle_hash, key=%s, saas_id=%s, phone_num=%s, state_data=%s"%(key, saas_id, phone_num, state_data))
  813. def busy_hash(self, saas_id, agent_id, phone_num, service_id):
  814. cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
  815. state_data = cache_agent_map.get(phone_num)
  816. key = self._key(saas_id, service_id)
  817. if state_data is None:
  818. return
  819. state_data.status = HumanState.BUSY.code
  820. self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
  821. self.redis_handler.redis.expire(key, self._get_expire_time())
  822. # self.update_report_state(saas_id, service_id)
  823. def get_cache_agent_map(self, saas_id, service_id):
  824. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  825. # 检查列表是否为空,如果为空返回空字典
  826. if not cache_agent_list:
  827. return {}
  828. # 使用字典推导式将 cache_agent_list 转换为字典
  829. return {agent.phone_num: agent for agent in cache_agent_list}
  830. def get_cache_agent_list(self, saas_id, service_id):
  831. redis_key = self._key(saas_id, service_id)
  832. map_cache_by_key = self.redis_handler.redis.hgetall(redis_key)
  833. self.logger.info("get_cache_agent_list, redis_key=%s, map_cache_by_key=%s"%(redis_key, map_cache_by_key))
  834. if not map_cache_by_key: # 检查字典是否为空
  835. return [] # 返回空列表
  836. free_agents = []
  837. for phone_num, json_value in map_cache_by_key.items():
  838. agent_status_data_dict = json.loads(json_value)
  839. agent_status_data = AgentStateData(**agent_status_data_dict) # 解析 JSON 为 AgentStateData 对象
  840. agent_status_data.phone_num = phone_num.decode('utf-8') # 设置电话号码
  841. free_agents.append(agent_status_data)
  842. return free_agents
  843. def get_idle_agents(self,cache_agent_list):
  844. # current_time =int(datetime.now().timestamp() * 1000) # 获取当前时间的毫秒级时间戳
  845. idle_agents = [
  846. agent for agent in cache_agent_list
  847. if agent.status == 1 and self.handle_check_agent_lock(agent.phone_num, SAAS_ID)
  848. #and (
  849. # agent.assign_time == 0 or
  850. # agent.assign_time + self.assigned_recycle_millisecond < current_time
  851. #)
  852. ]
  853. return idle_agents
  854. def get_agent_service_idle_size(self, saas_id, service_id):
  855. idle_agents_size = 0
  856. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  857. if cache_agent_list: # 检查列表是否非空
  858. idle_agents = self.get_idle_agents(cache_agent_list)
  859. idle_agents_size = len(idle_agents) # 获取空闲代理的数量
  860. return idle_agents_size
  861. def get_agent_service_busy_size(self, saas_id, service_id):
  862. busy_agents_size = 0
  863. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  864. if cache_agent_list: # 检查列表是否非空
  865. idle_agents = self.get_idle_agents(cache_agent_list)
  866. busy_agents = [agent for agent in cache_agent_list if agent not in idle_agents] # 计算忙碌代理
  867. busy_agents_size = len(busy_agents) # 获取忙碌代理的数量
  868. return busy_agents_size
  869. # def update_report_state(self, saas_id, service_id):
  870. # key = self._key(saas_id, service_id)
  871. # # data_map 这个地方有疑问
  872. # data_map = self.state_service_id_data_map[key]
  873. # idle = HumanState.IDLE
  874. # if idle.value not in data_map:
  875. # data_map[idle.code] = threading.Lock()
  876. # self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, idle)
  877. # # data_map[idle.code] = self.do_report_real_time_human_service_id(saas_id, service_id, idle)
  878. # busy = HumanState.BUSY
  879. # if busy.value not in data_map:
  880. # data_map[busy.code] = threading.Lock()
  881. # self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, busy)
  882. # # data_map[busy.code] = self.do_report_real_time_human_service_id(saas_id, service_id, busy)
  883. #
  884. # def do_report_real_time_human_service_id(self, saas_id, service_id, human_state):
  885. # name = "cti_center_real_time_human_service_state"
  886. # tag_list = {
  887. # "vcc_id": saas_id,
  888. # "service_id": service_id,
  889. # "state": human_state.code,
  890. # }
  891. # if human_state == HumanState.IDLE:
  892. # # meter_registry 这块疑问
  893. # self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id))
  894. # elif human_state == HumanState.BUSY:
  895. # self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id))
  896. # return 0
  897. def _check_in_key(self, saas_id):
  898. return "CTI:%s:HUMAN:AGENT"%(saas_id.upper())
  899. def _key(self, saas_id, service_id):
  900. return "CTI:%s:HUMAN:%s"%(saas_id.upper(), service_id)
  901. def _lock_key(self, saas_id, service_id, choose_phone_num):
  902. return "CTI:%s:HUMAN:%s:%s:LOCK"%(saas_id.upper(), service_id, choose_phone_num)
  903. def _get_expire_time(self):
  904. now = datetime.now()
  905. end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=0)
  906. expire_time = (end_of_day - now).total_seconds() # Convert to milliseconds
  907. return int(expire_time)
  908. def _choose_max_idle_time(self, idle_agents: List[AgentStateData]) -> str:
  909. idle_agents = sorted(idle_agents, key=lambda agent: agent.assign_time, reverse=False)
  910. return idle_agents[0].phone_num