cache.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import json
  4. import sys
  5. import time
  6. import uuid
  7. from datetime import datetime
  8. from src.core import singleton_keys
  9. from src.core.callcenter.constant import *
  10. from src.core.callcenter.api import AgentInfo, CallInfo, RouteGateway
  11. from src.core.callcenter.dao import Agent, Phone
  12. from src.core.callcenter.data_handler import DataHandleServer
  13. from src.core.datasource import RedisHandler
  14. from cacheout import CacheManager
  15. @singleton_keys
  16. class Cache:
  17. def __init__(self, app):
  18. self.cacheDay = 7
  19. # self.deviceCall = {}
  20. # self.deviceUserPart = {}
  21. self.logger = app.logger
  22. self.redis_handler = RedisHandler()
  23. self.dataHandleServer = DataHandleServer(app)
  24. self.cacheman = CacheManager({'call':{'maxsize': 600, 'ttl': 60*60*1},
  25. 'agent': {'maxsize': 600, 'ttl': 60*60*1},
  26. 'deviceCall': {'maxsize': 600, 'ttl': 60*60*1},
  27. 'deviceUserPart': {'maxsize': 600, 'ttl': 60*60*1},
  28. })
  29. def get_agent_info(self, saas_id, agent_number):
  30. key = AGENT_INFO + saas_id + ":" + agent_number
  31. agent_info = self.cacheman['agent'].get(key)
  32. if agent_info:
  33. return agent_info
  34. text = self.redis_handler.get(key)
  35. self.logger.info('get_agent_info %s %s %s'%(saas_id, agent_number, text))
  36. if text:
  37. agent_info = AgentInfo.from_json(text)
  38. self.cacheman['agent'].set(key, agent_info)
  39. return agent_info
  40. phone = self.dataHandleServer.get_agent_phone(saas_id, agent_number)
  41. agent_info = AgentInfo(saas_id=saas_id, sip_server=phone.sip_server, agent_number=agent_number)
  42. self.logger.info('get_agent_info %s %s %s'% (saas_id, agent_number, agent_info))
  43. self.add_agent_info(agent=agent_info)
  44. return agent_info
  45. def refresh_agent_token(self, agent_number, token):
  46. self.redis_handler.set(AGENT_TOKEN + token, agent_number, self.cacheDay * 24 * 60 * 60)
  47. def delete_key(self, key):
  48. self.redis_handler.redis.delete(key)
  49. def get_agent_number(self, token):
  50. return self.redis_handler.get(AGENT_TOKEN + token)
  51. # 缓存坐席
  52. def add_agent_info(self, call_info: CallInfo = None, agent: AgentInfo = None, call_id=None, device_id=None):
  53. if call_info and not agent:
  54. agent = self.get_agent_info(call_info.saas_id, call_info.agent_key)
  55. if not agent:
  56. return
  57. if call_id:
  58. agent.call_id = call_id
  59. if device_id:
  60. agent.device_id = device_id
  61. key = AGENT_INFO + agent.saas_id + ":" + agent.agent_number
  62. self.cacheman['agent'].set(key, agent)
  63. self.redis_handler.set(key, agent.to_json_string(), self.cacheDay * 24 * 60 * 60)
  64. # 缓存CALL_INFO
  65. def add_call_info(self, call: CallInfo, persistent=False):
  66. for k, v in call.device_info_map.items():
  67. self.add_device(k, call.call_id)
  68. # print('add_call_info call_id:%s, call=%s'% (call.call_id, call))
  69. self.cacheman['call'].set(call.call_id, call)
  70. if persistent:
  71. self.redis_handler.set(CALL_INFO + call.call_id, call.to_json_string(), self.cacheDay * 24 * 60 * 60)
  72. def get_call_id_by_device_id(self, device_id):
  73. return self.cacheman['deviceCall'].get(device_id)
  74. # call_id = self.deviceCall.get(device_id)
  75. # return call_id
  76. # 获取callInfo
  77. def get_call_info(self, call_id):
  78. if not call_id:
  79. return None
  80. call_info = self.cacheman['call'].get(call_id)
  81. if call_info:
  82. return call_info
  83. text = self.redis_handler.get(CALL_INFO + call_id)
  84. if text:
  85. call_info = CallInfo.from_json(text)
  86. self.cacheman['call'].set(call_id, call_info)
  87. return call_info
  88. # print('get_call_info call_id:%s, text:%s'% (call_id, text))
  89. def remove_call_info(self, call_id):
  90. if not call_id:
  91. return None
  92. call_info = self.get_call_info(call_id)
  93. if call_info and call_info.device_info_map:
  94. for k, v in call_info.device_info_map.items():
  95. self.cacheman['deviceCall'].delete(k)
  96. # self.deviceCall.pop(k)
  97. self.cacheman['call'].delete(call_id)
  98. self.delete_key(CALL_INFO + call_id)
  99. def add_device(self, device_id, call_id):
  100. if not device_id or not call_id:
  101. return None
  102. self.cacheman['deviceCall'].set(device_id, call_id)
  103. # self.deviceCall[device_id] = call_id
  104. def get_user_part(self, device_id):
  105. return self.cacheman['deviceUserPart'].get(device_id)
  106. # return self.deviceUserPart.get(device_id)
  107. def add_device_user_part(self, device_id, user_part):
  108. if not device_id or not user_part:
  109. return
  110. self.cacheman['deviceUserPart'].set(device_id, user_part)
  111. # self.deviceUserPart[device_id] = user_part
  112. def get_route_gateway(self, saas_id):
  113. return RouteGateway(id=1,
  114. saas_id=saas_id,
  115. name='63366692',
  116. media_host='192.168.20.99',
  117. media_port=5060,
  118. caller_prefix='',
  119. called_prefix='',
  120. status=0)
  121. def add_delay_message(self, action, delay_action, timeouts):
  122. delay_action.uuid = str(uuid.uuid4())
  123. key = CTI_ENGINE_DELAY_ACTION % action
  124. msg = delay_action.to_json_string()
  125. action_time = datetime.now().timestamp()*1000 + timeouts * 1000
  126. self.redis_handler.redis.zadd(key, {msg : action_time})
  127. def get_delay_message(self, action):
  128. key = CTI_ENGINE_DELAY_ACTION % action
  129. current_time = int(datetime.now().timestamp() * 1000) # 毫秒级时间戳
  130. members = self.redis_handler.redis.zrangebyscore(key, 0, current_time, start=0, num=DELAY_ACTION_BATCH_SIZE, withscores=True)
  131. if not members:
  132. return []
  133. self.logger.info('get_delay_message %s %s %s'%(key, action, members))
  134. # scored_entries = [{"member": entry[0].decode('utf-8'), "score": entry[1]} for entry in members]
  135. action_list = [entry[0].decode('utf-8') for entry in members]
  136. self.logger.info('get_delay_message %s %s %s'%(key, action, json.dumps(action_list)))
  137. if action_list:
  138. a = self.redis_handler.redis.zrem(key, *action_list)
  139. self.logger.info('get_delay_message %s %s %s %s'%(key, action, json.dumps(action_list), a))
  140. return action_list
  141. def lock_delay_action(self, val):
  142. key = CTI_ENGINE_DELAY_ACTION_LOCK % val
  143. return self.redis_handler.redis.set(key, "1", ex=60*10, nx=True)
  144. def set_need_play_hold_music(self, call_id):
  145. key = NEED_PLAY_HOLD_MUSIC % call_id
  146. return self.redis_handler.redis.set(key, "1", ex=60 * 1, nx=True)
  147. def get_need_play_hold_music(self, call_id):
  148. key = NEED_PLAY_HOLD_MUSIC % call_id
  149. return self.redis_handler.redis.get(key)
  150. def del_need_play_hold_music(self, call_id):
  151. key = NEED_PLAY_HOLD_MUSIC % call_id
  152. self.delete_key(key)
  153. def set_after_play_hold_music(self, call_id):
  154. key = AFTER_PLAY_HOLD_MUSIC % call_id
  155. return self.redis_handler.redis.set(key, "1", ex=60 * 1, nx=True)
  156. def get_after_play_hold_music(self, call_id):
  157. key = AFTER_PLAY_HOLD_MUSIC % call_id
  158. return self.redis_handler.redis.get(key)
  159. def get_call_is_end(self, call_id):
  160. key = CTI_MANAGE_CENTER_CALL_END_KEY % call_id
  161. return self.redis_handler.get(key)
  162. def set_call_is_end(self, call_id):
  163. key = CTI_MANAGE_CENTER_CALL_END_KEY % call_id
  164. return self.redis_handler.redis.set(key, "1", ex=60 * 10, nx=True)
  165. def get_call_is_answer(self, saas_id, flow_id):
  166. key = CTI_AGENT_MANUAL_ANSWER%(saas_id, flow_id)
  167. return self.redis_handler.get(key)
  168. def set_call_is_answer(self, saas_id, flow_id):
  169. key = CTI_AGENT_MANUAL_ANSWER%(saas_id, flow_id)
  170. return self.redis_handler.redis.set(key, "1", ex=60, nx=True)
  171. def lock_register_per_hours(self):
  172. hour = datetime.now().strftime('%Y%m%d%H')
  173. key = BOT_REGISTER_PER_HOURS %hour
  174. return self.redis_handler.redis.get(key)
  175. def set_register_per_hours(self, expire=86400):
  176. hour = datetime.now().strftime('%Y%m%d%H')
  177. key = BOT_REGISTER_PER_HOURS %hour
  178. return self.redis_handler.redis.set(key, "1", ex=expire, nx=True)
  179. def get_pjsua_thread_lock(self):
  180. minute = datetime.now().strftime('%Y%m%d%H%M')
  181. key = BOT_PJSUA_THREAD_LOCK % minute
  182. _lock = self.redis_handler.redis.get(key)
  183. # self.logger.info("get_pjsua_thread_lock %s", _lock)
  184. return _lock
  185. def del_pjsua_thread_lock(self):
  186. minute = datetime.now().strftime('%Y%m%d%H%M')
  187. key = BOT_PJSUA_THREAD_LOCK % minute
  188. return self.redis_handler.redis.delete(key)
  189. def set_pjsua_thread_lock(self, expire=60*3):
  190. minute = datetime.now().strftime('%Y%m%d%H%M')
  191. key = BOT_PJSUA_THREAD_LOCK % minute
  192. self.logger.info("set_pjsua_thread_lock")
  193. return self.redis_handler.redis.set(key, "1", ex=expire, nx=True)