#!/usr/bin/env python3
# encoding:utf-8
# agent.py
import json
import threading
import traceback
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import List

from sqlalchemy import Integer, cast, or_

# Project imports are kept in their original relative order because the
# star-import below can shadow any name that is imported before it.
from src.core.callcenter.constant import START_AGENT_NUM
from src.core.callcenter.enumeration import AgentState, AgentCheck, AgentHeartState, AgentServiceState, AgentLogState, \
    AgentScene, BizErrorCode, WorkStatus, DownEvent, HumanState
from src.core.callcenter.dao import *
from src.core.callcenter.exception import BizException
from src.core.callcenter.api import AgentActionRequest, AgentInfo, AgentQueryRequest, AgentRequest, AgentEventData, \
    AgentStateData, HumanServiceQueryRequest, AgentMonitorData
from src.core.callcenter.push import PushHandler
from src.core.datasource import RedisHandler
  18. class AgentOperService:
  19. def __init__(self, client, logger):
  20. self.inbound_client = client
  21. self.logger = logger
  22. self.push_handler = PushHandler(logger)
  23. self.agent_monitor_service = AgentMonitorService(client, logger)
  24. self.agent_actionlog_service = AgentActionLogService(client, logger)
  25. self.agent_state_service = AgentStateService(client, logger)
  26. def enable(self, req: AgentActionRequest):
  27. agent = _get_agent(req.saas_id, req.agent_number, req.out_id)
  28. if agent.agent_state == AgentState.ENABLE.code:
  29. return
  30. agent.agent_state = AgentState.ENABLE.code
  31. db.session.commit()
  32. def disable(self, req: AgentActionRequest):
  33. agent = _get_agent(req.saas_id, req.agent_id)
  34. if agent.agent_state == AgentState.DISABLE.code:
  35. return
  36. agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_number)
  37. if agent_monitor.check_state == AgentCheck.IN.code and \
  38. agent_monitor.service_state == AgentServiceState.CALLING.code:
  39. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  40. agent.phone_num = ''
  41. agent.agent_state = AgentState.DISABLE.code
  42. db.session.commit()
  43. phone = _get_phone(req.saas_id, agent.phone_num)
  44. phone.is_delete = 1
  45. db.session.commit()
  46. def checkin(self, req: AgentActionRequest):
  47. try:
  48. agent = _get_agent(req.saas_id, req.agent_id)
  49. if not agent or agent.agent_state == AgentState.DISABLE.code:
  50. raise BizException(BizErrorCode.ERROR_NOT_FOLLOW_CHECK_IN)
  51. phone = _get_phone(req.saas_id, agent.phone_num)
  52. agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_num)
  53. agent_monitor.check_scene = req.scene
  54. self.agent_monitor_service.update_checkin(agent_monitor)
  55. self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.IN, AgentLogState.CHECKIN)
  56. self.agent_state_service.checkin(agent.saas_id, agent.out_id, agent.phone_num)
  57. if req.scene == AgentScene.MANUAL.code:
  58. # 如果是手动外呼增加置闲
  59. self._handle_idle(req.scene, agent)
  60. return self._push_event_for_checkin(agent, agent_monitor, phone, req.scene)
  61. except Exception as e:
  62. self.logger.error(traceback.format_exc())
  63. raise e
  64. def checkout(self, req: AgentActionRequest):
  65. agent = _get_agent(req.saas_id, req.agent_id)
  66. if not agent or agent.agent_state == AgentState.DISABLE.code:
  67. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  68. agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_num)
  69. if not agent_monitor or agent_monitor.service_state == AgentServiceState.CALLING.code:
  70. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  71. if agent_monitor.check_state == AgentCheck.OUT.code:
  72. return self._push_event_for_checkout(agent, req.scene)
  73. self.agent_monitor_service.update_checkout(agent_monitor)
  74. self.agent_actionlog_service.insert_check_state(agent_monitor, AgentCheck.OUT, AgentLogState.CHECKOUT)
  75. self.agent_state_service.checkout(agent.saas_id, agent.out_id, agent.phone_num)
  76. return self._push_event_for_checkout(agent, req.scene)
  77. def busy(self, req: AgentActionRequest):
  78. agent = _get_agent(req.saas_id, req.agent_id)
  79. if not agent or agent.agent_state == AgentState.DISABLE.code:
  80. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  81. agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_num)
  82. if not agent_monitor or agent_monitor.check_state == AgentCheck.OUT.code:
  83. raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
  84. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  85. raise BizException(BizErrorCode.AGENT_CALLING_NOT_ALLOW_OPERATE)
  86. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  87. if agent_monitor.service_state == AgentServiceState.BUSY.code:
  88. self._push_event_for_busy(agent, req.scene)
  89. return
  90. self.agent_monitor_service.update_busy(agent_monitor)
  91. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY, AgentLogState.BUSY)
  92. self._push_event_for_busy(agent, req.scene)
  93. def idle(self, req: AgentActionRequest):
  94. agent = _get_agent(req.saas_id, req.agent_id)
  95. if not agent or agent.agent_state == AgentState.DISABLE.code:
  96. raise BizException(BizErrorCode.AGENT_DISABLE_NOT_ALLOW_OPERATE)
  97. self._handle_idle(req.scene, agent)
  98. def assign(self, req: AgentActionRequest):
  99. return self.agent_state_service.assign_agent(req.saas_id, req.service_id)
  100. def idle_agent_exist(self, request: AgentActionRequest):
  101. pass
  102. def agent_state(self,req: AgentActionRequest):
  103. # agent = _get_agent(req.saas_id, req.agent_id)
  104. agent_monitor = _get_agent_monitor(req.saas_id, req.agent_id)
  105. return agent_monitor.service_state
  106. def turn_on(self, req: AgentActionRequest):
  107. agent = _get_agent(req.saas_id, req.agent_id)
  108. agent_monitor = _get_agent_monitor(req.saas_id, agent.agent_num)
  109. agent_scene = AgentScene.get_by_code(req.scene)
  110. if not agent_monitor:
  111. raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
  112. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  113. self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
  114. return
  115. self.agent_monitor_service.update_calling(agent_monitor)
  116. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  117. self.push_handler.push_on_agent_report(agent.saas_id, agent.out_id, agent_scene, AgentServiceState.BUSY)
  118. def _handle_idle(self, scene, agent):
  119. agent_monitor = _get_agent_monitor(agent.saas_id, agent.agent_num)
  120. if agent_monitor.check_state == AgentCheck.OUT.code:
  121. raise BizException(BizErrorCode.AGENT_CHECK_OUT_NOT_ALLOW_OPERATE)
  122. if agent_monitor.service_state == AgentServiceState.CALLING.code or agent_monitor.service_state == AgentServiceState.DIALING.code:
  123. raise BizException(BizErrorCode.AGENT_CALLING_NOT_HANG)
  124. if scene == AgentScene.ROBOT.code:
  125. self.agent_state_service.idle(agent.saas_id, agent.out_id, agent.phone_num)
  126. else:
  127. self.agent_state_service.busy(agent.saas_id, agent.out_id, agent.phone_num)
  128. if agent_monitor.service_state == AgentServiceState.IDLE.code:
  129. self._push_event_for_idle(agent, scene)
  130. return
  131. self.agent_monitor_service.update_idle(agent_monitor)
  132. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.IDLE)
  133. self._push_event_for_idle(agent, scene)
  134. def _push_event_for_checkin(self, agent, agent_monitor, phone, scene):
  135. """坐席签入事件推送"""
  136. agent_scene = AgentScene.get_by_code(scene)
  137. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.LOGIN_SUCCESS)
  138. event_data = AgentEventData(agent.saas_id, agent.out_id)
  139. event_data.data = {'eventName': DownEvent.ON_INITAL_SUCCESS.code,
  140. 'ext': {'saasId': agent.saas_id,
  141. 'agentId': agent.out_id,
  142. 'phoneNum': phone.phone_num,
  143. 'phonePwd': phone.phone_pwd,
  144. 'sipServer': phone.sip_server
  145. }
  146. }
  147. return event_data.__dict__
  148. def _push_event_for_checkout(self,agent,scene):
  149. """签出事件推送"""
  150. agent_scene = AgentScene.get_by_code(scene)
  151. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.NO_INIT, '签出成功')
  152. event_data = AgentEventData(agent.saas_id, agent.out_id)
  153. event_data.data = {'eventName': DownEvent.ON_INITAL_FAILURE.code,
  154. 'ext': {'saasId': agent.saas_id,
  155. 'agentId': agent.out_id,
  156. }
  157. }
  158. return event_data.__dict__
  159. def _push_event_for_busy(self,agent,scene):
  160. """置忙事件推送"""
  161. agent_scene = AgentScene.get_by_code(scene)
  162. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_BUSY)
  163. pass
  164. def _push_event_for_idle(self, agent, scene):
  165. """坐席置闲事件推送"""
  166. agent_scene = AgentScene.get_by_code(scene)
  167. self.push_handler.push_on_agent_work_report(agent.saas_id, "", agent.out_id, "", agent_scene, WorkStatus.AGENT_READY)
  168. class AgentService:
  169. def __init__(self, client, logger):
  170. self.inbound_client = client
  171. self.logger = logger
  172. self.agent_monitor_service = AgentMonitorService(client, logger)
  173. def get_and_check(self, req: AgentActionRequest):
  174. agent = _get_agent(req.saas_id, req.agent_id)
  175. if not agent:
  176. return {}
  177. phone = _get_phone(req.saas_id, agent.phone_num)
  178. return phone.to_dict()
  179. def watch_agent_state(self, req: HumanServiceQueryRequest):
  180. pos = HumanServiceMap.query.filter(HumanServiceMap.is_delete == 0, HumanServiceMap.saas_id == req.saas_id,
  181. HumanServiceMap.service_id == req.serviceId).all()
  182. agent_ids = [x.agent_id for x in pos]
  183. monitors = self.agent_monitor_service.detail_monitor_out_ids(req.saas_id, agent_ids)
  184. return monitors
  185. def add(self, req: AgentRequest):
  186. new_agent_num = _get_newest_agent_number(req.saas_id)
  187. agent = Agent(saas_id=req.saas_id, agent_num=new_agent_num, agent_name=req.agent_name, out_id=req.out_id,
  188. agent_pwd=req.agent_password, agent_type=req.agent_type, phone_num=req.phone_number,
  189. distribute=req.distribute, agent_state=req.agent_state)
  190. db.session.add(agent)
  191. db.session.commit()
  192. agent_monitor = AgentMonitor(saas_id=req.saas_id, agent_num=new_agent_num, out_id=req.out_id)
  193. db.session.add(agent_monitor)
  194. db.session.commit()
  195. return new_agent_num
  196. def update(self, req: AgentRequest):
  197. agent = _get_agent(req.saas_id, req.agent_number, req.out_id)
  198. if not agent:
  199. return
  200. phone_num = agent.phone_num
  201. state_change = agent.agent_state != req.agent_state
  202. disable = req.agent_state == AgentState.DISABLE
  203. agent.agent_name = req.agent_name
  204. agent.agent_pwd = req.agent_password
  205. agent.agent_type = req.agent_type
  206. agent.phone_num = req.phone_number if not disable else ''
  207. agent.distribute = req.distribute
  208. agent.agent_state = req.agent_state
  209. db.session.commit()
  210. if state_change and disable:
  211. phone = Phone.query.filter(Phone.saas_id == req.saas_id, Phone.phone_num == phone_num).first()
  212. if phone:
  213. db.session.delete(phone)
  214. def detail(self, saas_id, agent_number):
  215. agent = _get_agent(saas_id, agent_number=agent_number, out_id='')
  216. return agent
  217. def count(self, req: AgentQueryRequest):
  218. cnt = Agent.query.filter(Agent.saas_id == req.saas_id,
  219. or_(Agent.agent_num == req.agent_number,
  220. Agent.agent_name.contains(req.agent_name),
  221. Agent.out_id == req.out_id,
  222. Agent.agent_type == req.agent_type
  223. )
  224. ).count()
  225. return cnt
  226. def query_page(self, req: AgentQueryRequest):
  227. pagination = Agent.query.filter(Agent.saas_id == req.saas_id,
  228. or_(Agent.agent_num == req.agent_number,
  229. Agent.agent_name.contains(req.agent_name),
  230. Agent.out_id == req.out_id,
  231. Agent.agent_type == req.agent_type
  232. )
  233. ).paginate(req.page, req.size)
  234. # data = {
  235. # "page": pagination.page, # 当前页码
  236. # "pages": pagination.pages, # 总页码
  237. # "has_prev": pagination.has_prev, # 是否有上一页
  238. # "prev_num": pagination.prev_num, # 上一页页码
  239. # "has_next": pagination.has_next, # 是否有下一页
  240. # "next_num": pagination.next_num, # 下一页页码
  241. # "items": [{
  242. # "id": item.id,
  243. # "name": item.name,
  244. # "age": item.age,
  245. # "sex": item.sex,
  246. # "money": item.money,
  247. # } for item in pagination.items]
  248. # }
  249. return pagination
  250. def delete(self, saas_id, agent_number):
  251. agent = _get_agent(saas_id, agent_number=agent_number, out_id='')
  252. if not agent:
  253. return
  254. agent.is_delete = 1
  255. agent_monitor = _get_agent_monitor(saas_id, agent_number)
  256. agent_monitor.is_delete = 1
  257. db.session.commit()
  258. phone = _get_phone(saas_id, agent.phone_num)
  259. phone.is_delete = 1
  260. db.session.commit()
  261. class AgentMonitorService:
  262. def __init__(self, client, logger):
  263. self.inbound_client = client
  264. self.logger = logger
  265. def detail_monitor_out_ids(self, saas_id, out_ids, check_scene=None):
  266. if not out_ids:
  267. return []
  268. agents = Agent.query.filter(Agent.is_delete == 0, Agent.saas_id == saas_id, Agent.out_id.in_(out_ids)).all()
  269. if not agents:
  270. raise BizException(BizErrorCode.RECORD_NOT_EXIST_ERROR)
  271. agent_num_map = {x.agent_num: x for x in agents}
  272. agent_nums = [x.agent_num for x in agents]
  273. agent_monitors = AgentMonitor.query.filter(AgentMonitor.is_delete == 0, AgentMonitor.agent_num.in_(agent_nums)).all()
  274. res = []
  275. for agent_monitor in agent_monitors:
  276. agent = agent_num_map.get(agent_monitor.agent_num)
  277. data = AgentMonitorData()
  278. data.saas_id = saas_id
  279. data.agent_num = agent_monitor.agent_num
  280. data.agent_name = agent.agent_name
  281. data.out_id = agent_monitor.out_id
  282. data.check_state = agent_monitor.check_state
  283. data.check_scene = agent_monitor.check_scene
  284. data.online_state = 1 if agent_monitor.check_state == AgentCheck.IN.code else 0
  285. if agent_monitor.check_in_time:
  286. data.check_in_time = agent_monitor.check_in_time.timestamp()
  287. day_start = self._get_day_start()
  288. if agent_monitor.check_in_time.timestamp() > day_start and \
  289. agent_monitor.check_state == AgentCheck.OUT.code:
  290. data.online_state = 2
  291. data.check_out_time = agent_monitor.check_out_time.timestamp() if agent_monitor.check_out_time else None
  292. data.service_state = agent_monitor.service_state
  293. data.busy_time = agent_monitor.busy_time.timestamp() if agent_monitor.busy_time else None
  294. data.idle_time = agent_monitor.idle_time.timestamp() if agent_monitor.idle_time else None
  295. data.call_time = agent_monitor.call_time.timestamp() if agent_monitor.call_time else None
  296. data.hang_time = agent_monitor.hang_time.timestamp() if agent_monitor.hang_time else None
  297. data.heart_state = agent_monitor.heart_state
  298. data.heart_time = agent_monitor.heart_time.timestamp() if agent_monitor.heart_time else None
  299. data.update_time = agent_monitor.update_time.timestamp() if agent_monitor.update_time else None
  300. res.append(data.__dict__)
  301. return res
  302. def update_checkin(self, agent_monitor):
  303. agent_monitor.check_state = AgentCheck.IN.code
  304. agent_monitor.check_in_time = datetime.utcnow()
  305. agent_monitor.heart_state = AgentHeartState.NORMAL.code
  306. agent_monitor.heart_time = datetime.utcnow()
  307. db.session.commit()
  308. def update_checkout(self, agent_monitor):
  309. agent_monitor.check_state = AgentCheck.OUT.code
  310. agent_monitor.check_out_time = datetime.utcnow()
  311. agent_monitor.service_state = AgentServiceState.LOGOUT.code
  312. agent_monitor.heart_state = AgentHeartState.DEFAULT.code
  313. agent_monitor.heart_time = datetime.utcnow()
  314. print("update_checkout", agent_monitor.check_out_time)
  315. db.session.commit()
  316. def update_idle(self, agent_monitor):
  317. agent_monitor.service_state = AgentServiceState.IDLE.code
  318. agent_monitor.idle_time = datetime.utcnow()
  319. db.session.commit()
  320. def update_busy(self, agent_monitor):
  321. agent_monitor.service_state = AgentServiceState.BUSY.code
  322. agent_monitor.busy_time = datetime.utcnow()
  323. db.session.commit()
  324. def update_dialing(self, agent_monitor):
  325. agent_monitor.service_state = AgentServiceState.DIALING.code
  326. db.session.commit()
  327. def update_calling(self, agent_monitor):
  328. agent_monitor.service_state = AgentServiceState.CALLING.code
  329. agent_monitor.call_time = datetime.utcnow()
  330. db.session.commit()
  331. def update_processing(self, agent_monitor):
  332. agent_monitor.service_state = AgentServiceState.REPROCESSING.code
  333. agent_monitor.hang_time = datetime.utcnow()
  334. db.session.commit()
  335. def update_session_id(self, agent_monitor, session_id):
  336. agent_monitor.session_id = session_id
  337. db.session.commit()
  338. def update_heart_error(self, agent_monitor):
  339. agent_monitor.heart_state = AgentHeartState.ABNORMAL.code
  340. agent_monitor.heart_time = datetime.utcnow()
  341. db.session.commit()
  342. def _get_day_start(self):
  343. today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
  344. return int(today_start.timestamp() * 1000) # Return milliseconds
  345. class AgentActionLogService:
  346. def __init__(self, client, logger):
  347. self.inbound_client = client
  348. self.logger = logger
  349. def insert_check_state(self, agent_monitor, agent_check_enum: AgentCheck, agent_log_enum: AgentLogState):
  350. action_log = AgentActionLog()
  351. action_log.saas_id = agent_monitor.saas_id
  352. action_log.agent_num = agent_monitor.agent_num
  353. action_log.out_id = agent_monitor.out_id
  354. action_log.action_type = 0
  355. action_log.check_state = agent_check_enum.code
  356. action_log.pre_check_state = agent_monitor.check_state
  357. if agent_log_enum:
  358. action_log.event_type = agent_log_enum.code
  359. action_log.event_desc = agent_log_enum.description
  360. now = datetime.utcnow()
  361. pre_date = None
  362. if agent_monitor.check_state == AgentCheck.IN.code:
  363. pre_date = agent_monitor.check_in_time
  364. if agent_monitor.check_state == AgentCheck.OUT.code:
  365. pre_date = agent_monitor.check_out_time
  366. if pre_date is None:
  367. pre_date = agent_monitor.update_time
  368. action_log.check_state_time = now
  369. action_log.pre_check_state_time = pre_date
  370. action_log.check_state_duration = now.timestamp() - pre_date.timestamp()
  371. db.session.add(action_log)
  372. db.session.commit()
  373. def insert_service_state(self, agent_monitor, agent_service_state: AgentServiceState, agent_log_enum: AgentLogState,
  374. task_id=None, service_id=None):
  375. if agent_monitor.service_state == agent_service_state.code:
  376. self.logger.info("agent action log insert service state same %s %s", agent_monitor.service_state, agent_service_state.code)
  377. action_log = AgentActionLog()
  378. action_log.saas_id = agent_monitor.saas_id
  379. action_log.agent_num = agent_monitor.agent_num
  380. action_log.out_id = agent_monitor.out_id
  381. action_log.action_type = 1
  382. action_log.service_state = agent_service_state.code
  383. action_log.pre_service_state = agent_monitor.service_state
  384. action_log.task_id = task_id
  385. action_log.service_id = service_id
  386. if agent_log_enum:
  387. action_log.event_type = agent_log_enum.code
  388. action_log.event_desc = agent_log_enum.description
  389. now = datetime.utcnow()
  390. pre_date = None
  391. if agent_monitor.service_state == AgentServiceState.IDLE.code:
  392. pre_date = agent_monitor.idle_time
  393. if agent_monitor.service_state == AgentServiceState.BUSY.code:
  394. pre_date = agent_monitor.busy_time
  395. if agent_monitor.service_state == AgentServiceState.CALLING.code:
  396. pre_date = agent_monitor.call_time
  397. if agent_monitor.service_state == AgentServiceState.LOGOUT.code:
  398. pre_date = agent_monitor.check_out_time
  399. if agent_monitor.service_state == AgentServiceState.REPROCESSING.code:
  400. pre_date = agent_monitor.hang_time
  401. action_log.service_state_time = now
  402. if pre_date is None:
  403. pre_date = agent_monitor.update_time
  404. action_log.pre_service_state_time = pre_date
  405. action_log.service_state_duration = now.timestamp() - pre_date.timestamp()
  406. db.session.add(action_log)
  407. db.session.commit()
  408. class AgentStateService:
  409. def __init__(self, client, logger):
  410. self.inbound_client = client
  411. self.logger = logger
  412. self.redis_handler = RedisHandler()
  413. self.assigned_recycle_millisecond = 60000
  414. self.state_service_id_data_map = defaultdict(dict)
  415. self.executor = ThreadPoolExecutor(max_workers=10)
  416. self.agent_monitor_service = AgentMonitorService(client, logger)
  417. self.agent_actionlog_service = AgentActionLogService(client, logger)
  418. def idle(self, saas_id, agent_id, phone_num):
  419. human_service = _get_human_service_service(saas_id, agent_id)
  420. if human_service is None:
  421. print(f"agent engine idle not have human service {saas_id} {agent_id}") # 使用print替代log
  422. return
  423. self.idle_hash(saas_id, agent_id, phone_num, human_service.service_id)
  424. def busy(self, saas_id, agent_id, phone_num):
  425. human_service = _get_human_service_service(saas_id, agent_id)
  426. if human_service is None:
  427. print(f"agent engine busy not hava human service {saas_id} {agent_id}") # 使用print替代log
  428. return
  429. self.busy_hash(saas_id, agent_id, phone_num, human_service.service_id)
  430. def idle_by_human(self, saas_id, agent_id, service_id):
  431. agent = _get_agent(saas_id, out_id=agent_id)
  432. if not agent:
  433. return
  434. agent_monitor = _get_agent_monitor(saas_id, agent_number=agent.agent_num)
  435. if not agent_monitor:
  436. return
  437. if agent_monitor.check_state == AgentCheck.IN.code:
  438. self.agent_monitor_service.update_idle(agent_monitor)
  439. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.IDLE, AgentLogState.ACTIVE_HUMAN_SERVICE)
  440. self.idle_hash(saas_id, agent_id, agent.phone_num, service_id)
  441. def busy_by_human(self, saas_id, service_id, agent_id=None):
  442. agent = _get_agent(saas_id, out_id=agent_id)
  443. if not agent:
  444. return
  445. agent_monitor = _get_agent_monitor(saas_id, agent_number=agent.agent_num)
  446. if not agent_monitor:
  447. return
  448. if agent_monitor.check_state == AgentCheck.IN.code:
  449. self.agent_monitor_service.update_busy(agent_monitor)
  450. self.agent_actionlog_service.insert_service_state(agent_monitor, AgentServiceState.BUSY,
  451. AgentLogState.DEL_HUMAN_SERVICE)
  452. self.busy_hash(saas_id, agent_id, agent.phone_num, service_id)
  453. def checkin(self, saas_id, agent_id, phone_num):
  454. key = self._check_in_key(saas_id)
  455. state_data = AgentStateData()
  456. state_data.status = HumanState.DEFAULT.code
  457. state_data.time = datetime.utcnow().timestamp()
  458. self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
  459. self.redis_handler.redis.expire(key, self._get_expire_time())
  460. def checkout(self, saas_id, agent_id, phone_num):
  461. key = self._check_in_key(saas_id)
  462. self.redis_handler.redis.hdel(key, phone_num)
  463. def assign_agent(self, saas_id, service_id, called=None, ivr_id=None, task_id=None, cbp=None):
  464. choose_phone_num = ''
  465. print("assignAgent %s %s %s"% (saas_id, service_id, called), flush=True)
  466. idle_agents = self.idle_agents(saas_id, service_id)
  467. if len(idle_agents) <= 0:
  468. return choose_phone_num
  469. choose_phone_num = self._choose_max_idle_time(idle_agents)
  470. self.handle_assign_time(saas_id, service_id, choose_phone_num)
  471. return choose_phone_num
  472. def handle_assign_time(self, saas_id, service_id, choose_phone_num):
  473. key = self._key(saas_id, service_id)
  474. cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
  475. if cache_agent_map and choose_phone_num in cache_agent_map:
  476. state_data = cache_agent_map[choose_phone_num]
  477. state_data.assign_time = datetime.utcnow().timestamp() * 1000
  478. self.redis_handler.redis.hset(key, choose_phone_num, state_data.to_json_string())
  479. self.redis_handler.redis.expire(key, self._get_expire_time())
  480. # self.update_report_state(saas_id, service_id)
  481. def idle_agents(self, saas_id, service_id):
  482. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  483. if not cache_agent_list:
  484. return []
  485. agent_str = '\n'.join(["%s-%s-%s-%s"%(x.phone_num, x.status, x.time, x.assign_time) for x in cache_agent_list])
  486. print("assignAgent %s %s idleAgents:%s "% (saas_id, service_id, agent_str), flush=True)
  487. return self.get_idle_agents(cache_agent_list)
  488. def idle_hash(self, saas_id, agent_id, phone_num, service_id):
  489. key = self._key(saas_id, service_id)
  490. state_data = AgentStateData()
  491. print("idle_hash, key=%s, saas_id=%s, phone_num=%s"%(key, saas_id, phone_num), flush=True)
  492. cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
  493. if cache_agent_map and phone_num in cache_agent_map:
  494. state_data = cache_agent_map[phone_num]
  495. state_data.status = HumanState.IDLE.code
  496. state_data.time = datetime.utcnow().timestamp() * 1000
  497. self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
  498. self.redis_handler.redis.expire(key, self._get_expire_time())
  499. # self.update_report_state(saas_id, service_id)
  500. def busy_hash(self, saas_id, agent_id, phone_num, service_id):
  501. cache_agent_map = self.get_cache_agent_map(saas_id, service_id)
  502. state_data = cache_agent_map.get(phone_num)
  503. key = self._key(saas_id, service_id)
  504. if state_data is None:
  505. return
  506. state_data.status = HumanState.BUSY.code
  507. self.redis_handler.redis.hset(key, phone_num, state_data.to_json_string())
  508. self.redis_handler.redis.expire(key, self._get_expire_time())
  509. # self.update_report_state(saas_id, service_id)
  510. def get_cache_agent_map(self, saas_id, service_id):
  511. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  512. # 检查列表是否为空,如果为空返回空字典
  513. if not cache_agent_list:
  514. return {}
  515. # 使用字典推导式将 cache_agent_list 转换为字典
  516. return {agent.phone_num: agent for agent in cache_agent_list}
  517. def get_cache_agent_list(self, saas_id, service_id):
  518. redis_key = self._key(saas_id, service_id)
  519. map_cache_by_key = self.redis_handler.redis.hgetall(redis_key)
  520. print("get_cache_agent_list, redis_key=%s, map_cache_by_key=%s"%(redis_key, map_cache_by_key), flush=True)
  521. if not map_cache_by_key: # 检查字典是否为空
  522. return [] # 返回空列表
  523. free_agents = []
  524. for phone_num, json_value in map_cache_by_key.items():
  525. agent_status_data_dict = json.loads(json_value)
  526. agent_status_data = AgentStateData(**agent_status_data_dict) # 解析 JSON 为 AgentStateData 对象
  527. agent_status_data.phone_num = phone_num.decode('utf-8') # 设置电话号码
  528. free_agents.append(agent_status_data)
  529. return free_agents
  530. def get_idle_agents(self,cache_agent_list):
  531. current_time =int(datetime.utcnow().timestamp() * 1000) # 获取当前时间的毫秒级时间戳
  532. idle_agents = [
  533. agent for agent in cache_agent_list
  534. if agent.status == 1 and (
  535. agent.assign_time == 0 or
  536. agent.assign_time + self.assigned_recycle_millisecond < current_time
  537. )
  538. ]
  539. return idle_agents
  540. def get_agent_service_idle_size(self, saas_id, service_id):
  541. idle_agents_size = 0
  542. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  543. if cache_agent_list: # 检查列表是否非空
  544. idle_agents = self.get_idle_agents(cache_agent_list)
  545. idle_agents_size = len(idle_agents) # 获取空闲代理的数量
  546. return idle_agents_size
  547. def get_agent_service_busy_size(self, saas_id, service_id):
  548. busy_agents_size = 0
  549. cache_agent_list = self.get_cache_agent_list(saas_id, service_id)
  550. if cache_agent_list: # 检查列表是否非空
  551. idle_agents = self.get_idle_agents(cache_agent_list)
  552. busy_agents = [agent for agent in cache_agent_list if agent not in idle_agents] # 计算忙碌代理
  553. busy_agents_size = len(busy_agents) # 获取忙碌代理的数量
  554. return busy_agents_size
  555. def update_report_state(self, saas_id, service_id):
  556. key = self._key(saas_id, service_id)
  557. # data_map 这个地方有疑问
  558. data_map = self.state_service_id_data_map[key]
  559. idle = HumanState.IDLE
  560. if idle.value not in data_map:
  561. data_map[idle.code] = threading.Lock()
  562. self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, idle)
  563. # data_map[idle.code] = self.do_report_real_time_human_service_id(saas_id, service_id, idle)
  564. busy = HumanState.BUSY
  565. if busy.value not in data_map:
  566. data_map[busy.code] = threading.Lock()
  567. self.executor.submit(self.do_report_real_time_human_service_id, saas_id, service_id, busy)
  568. # data_map[busy.code] = self.do_report_real_time_human_service_id(saas_id, service_id, busy)
  569. def do_report_real_time_human_service_id(self, saas_id, service_id, human_state):
  570. name = "cti_center_real_time_human_service_state"
  571. tag_list = {
  572. "vcc_id": saas_id,
  573. "service_id": service_id,
  574. "state": human_state.code,
  575. }
  576. if human_state == HumanState.IDLE:
  577. # meter_registry 这块疑问
  578. self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_busy_size(saas_id, service_id))
  579. elif human_state == HumanState.BUSY:
  580. self.meter_registry.gauge(name, tag_list, self, lambda ctx: ctx.get_agent_service_idle_size(saas_id, service_id))
  581. return 0
  582. def _check_in_key(self, saas_id):
  583. return "CTI:%s:HUMAN:AGENT"%(saas_id.upper())
  584. def _key(self, saas_id, service_id):
  585. return "CTI:%s:HUMAN:%s"%(saas_id.upper(), service_id)
  586. def _get_expire_time(self):
  587. now = datetime.now()
  588. end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=0)
  589. expire_time = (end_of_day - now).total_seconds() * 1000 # Convert to milliseconds
  590. return int(expire_time)
  591. def _choose_max_idle_time(self, idle_agents: List[AgentStateData]) -> str:
  592. idle_agents = sorted(idle_agents, key=lambda agent: agent.assign_time, reverse=False)
  593. return idle_agents[0].phone_num
  594. def _get_agent(saas_id, agent_id=None, agent_number=None, out_id=None):
  595. agent = Agent.query.filter(
  596. Agent.saas_id == saas_id,
  597. or_(Agent.out_id == agent_id, Agent.out_id == out_id, Agent.agent_num == agent_number)
  598. ).first()
  599. return agent
  600. def _get_newest_agent_number(saas_id):
  601. agent = Agent.query.filter(Agent.saas_id == saas_id).order_by(Agent.agent_num.desc()).first()
  602. agentNum = START_AGENT_NUM
  603. if agent and agent.agent_num:
  604. agentNum = str(int(agent.agent_num) + 1)
  605. return agentNum
  606. def _get_agent_monitor(saas_id, agent_number):
  607. monitor = AgentMonitor.query.filter(AgentMonitor.saas_id == saas_id,
  608. AgentMonitor.agent_num == agent_number).first()
  609. print('_get_agent_monitor', saas_id, agent_number, monitor)
  610. return monitor
  611. def _get_phone(saas_id, phone_num):
  612. phone = Phone.query.filter(Phone.saas_id == saas_id, Phone.phone_num == phone_num).first()
  613. return phone
  614. def _get_human_service_service(saas_id, agent_id):
  615. human_service_map = HumanServiceMap.query.filter(HumanServiceMap.saas_id == saas_id, HumanServiceMap.agent_id ==agent_id).first()
  616. return human_service_map