#!/usr/bin/env python3
# encoding:utf-8

import json
import sys
import time
import uuid
from datetime import datetime
from src.core import singleton_keys
from src.core.callcenter.constant import *
from src.core.callcenter.api import AgentInfo, CallInfo, RouteGateway
from src.core.callcenter.dao import Agent, Phone
from src.core.callcenter.data_handler import DataHandleServer
from src.core.datasource import RedisHandler
from cacheout import CacheManager


@singleton_keys
class Cache:
    """Two-level cache for call-center state.

    Hot objects (calls, agents, device -> call and device -> user-part
    mappings) live in bounded in-process ``cacheout`` caches; agent records,
    persistent calls, tokens, delayed actions and the various flag/lock keys
    live in Redis via ``RedisHandler``.
    """

    def __init__(self, app):
        """Build the cache layers.

        Args:
            app: application object; only ``app.logger`` is read here, and
                ``app`` itself is handed to ``DataHandleServer``.
        """
        # Retention, in days, for agent info, agent tokens and persisted calls.
        self.cacheDay = 7
        self.logger = app.logger
        self.redis_handler = RedisHandler()
        self.dataHandleServer = DataHandleServer(app)
        # One named in-process cache per object family, each capped at 600
        # entries with a one hour TTL.
        self.cacheman = CacheManager({
            'call': {'maxsize': 600, 'ttl': 60 * 60 * 1},
            'agent': {'maxsize': 600, 'ttl': 60 * 60 * 1},
            'deviceCall': {'maxsize': 600, 'ttl': 60 * 60 * 1},
            'deviceUserPart': {'maxsize': 600, 'ttl': 60 * 60 * 1},
        })

    def _ttl_seconds(self):
        """Return the Redis retention period (``cacheDay`` days) in seconds."""
        return self.cacheDay * 24 * 60 * 60

    def get_agent_info(self, saas_id, agent_number):
        """Look up an agent: local cache, then Redis, then the data server.

        On a Redis hit the parsed ``AgentInfo`` is stored in the local cache.
        On a full miss a new ``AgentInfo`` is built from the agent's phone
        record and written to both layers through ``add_agent_info``.

        Returns:
            The ``AgentInfo``, or ``None`` when no phone record is available
            to build one from.
        """
        key = AGENT_INFO + saas_id + ":" + agent_number
        agent_info = self.cacheman['agent'].get(key)
        if agent_info:
            return agent_info

        text = self.redis_handler.get(key)
        self.logger.info('get_agent_info %s %s %s', saas_id, agent_number, text)
        if text:
            agent_info = AgentInfo.from_json(text)
            self.cacheman['agent'].set(key, agent_info)
            return agent_info

        phone = self.dataHandleServer.get_agent_phone(saas_id, agent_number)
        if phone is None:
            # NOTE(review): assumes get_agent_phone signals "not found" with
            # None. Returning None here matches the `if not agent` guard in
            # add_agent_info instead of failing on `phone.sip_server`.
            self.logger.info('get_agent_info %s %s phone not found', saas_id, agent_number)
            return None

        agent_info = AgentInfo(saas_id=saas_id, sip_server=phone.sip_server, agent_number=agent_number)
        self.logger.info('get_agent_info %s %s %s', saas_id, agent_number, agent_info)
        self.add_agent_info(agent=agent_info)
        return agent_info

    def refresh_agent_token(self, agent_number, token):
        """Store ``token -> agent_number`` in Redis for ``cacheDay`` days."""
        self.redis_handler.set(AGENT_TOKEN + token, agent_number, self._ttl_seconds())

    def delete_key(self, key):
        """Delete ``key`` from Redis."""
        self.redis_handler.redis.delete(key)

    def get_agent_number(self, token):
        """Return the value stored in Redis for ``token`` (see ``refresh_agent_token``)."""
        return self.redis_handler.get(AGENT_TOKEN + token)

    # Cache an agent
    def add_agent_info(self, call_info: CallInfo = None, agent: AgentInfo = None, call_id=None,
                       device_id=None):
        """Write an agent to the local cache and to Redis.

        Args:
            call_info: used only when ``agent`` is not given; the agent is
                then resolved from ``call_info.saas_id`` / ``call_info.agent_key``.
            agent: the agent to store.
            call_id: when truthy, assigned to ``agent.call_id`` before storing.
            device_id: when truthy, assigned to ``agent.device_id`` before storing.

        Does nothing when no agent is given or can be resolved.
        """
        if call_info and not agent:
            agent = self.get_agent_info(call_info.saas_id, call_info.agent_key)
        if not agent:
            return
        if call_id:
            agent.call_id = call_id
        if device_id:
            agent.device_id = device_id

        key = AGENT_INFO + agent.saas_id + ":" + agent.agent_number
        self.cacheman['agent'].set(key, agent)
        self.redis_handler.set(key, agent.to_json_string(), self._ttl_seconds())

    # Cache CALL_INFO
    def add_call_info(self, call: CallInfo, persistent=False):
        """Cache a call locally and index each of its devices to the call id.

        Args:
            call: the call to cache; every key of ``call.device_info_map`` is
                registered as a device of ``call.call_id``.
            persistent: when true, the call is also serialized to Redis for
                ``cacheDay`` days.
        """
        for device_id in call.device_info_map:
            self.add_device(device_id, call.call_id)
        self.cacheman['call'].set(call.call_id, call)
        if persistent:
            self.redis_handler.set(CALL_INFO + call.call_id, call.to_json_string(), self._ttl_seconds())

    def get_call_id_by_device_id(self, device_id):
        """Return the call id registered for ``device_id`` in the local cache."""
        return self.cacheman['deviceCall'].get(device_id)

    # Get callInfo
    def get_call_info(self, call_id):
        """Return the ``CallInfo`` for ``call_id``, or ``None`` if unknown.

        Checks the local cache first, then Redis; a Redis hit is parsed and
        stored back into the local cache.
        """
        if not call_id:
            return None
        call_info = self.cacheman['call'].get(call_id)
        if call_info:
            return call_info

        text = self.redis_handler.get(CALL_INFO + call_id)
        if text:
            call_info = CallInfo.from_json(text)
            self.cacheman['call'].set(call_id, call_info)
            return call_info
        return None

    def remove_call_info(self, call_id):
        """Drop a call, its device -> call entries and its Redis record."""
        if not call_id:
            return None
        call_info = self.get_call_info(call_id)
        if call_info and call_info.device_info_map:
            for device_id in call_info.device_info_map:
                self.cacheman['deviceCall'].delete(device_id)
        self.cacheman['call'].delete(call_id)
        self.delete_key(CALL_INFO + call_id)

    def add_device(self, device_id, call_id):
        """Register ``device_id -> call_id`` locally; ignored if either is falsy."""
        if not device_id or not call_id:
            return None
        self.cacheman['deviceCall'].set(device_id, call_id)

    def get_user_part(self, device_id):
        """Return the user part registered for ``device_id`` in the local cache."""
        return self.cacheman['deviceUserPart'].get(device_id)

    def add_device_user_part(self, device_id, user_part):
        """Register ``device_id -> user_part`` locally; ignored if either is falsy."""
        if not device_id or not user_part:
            return
        self.cacheman['deviceUserPart'].set(device_id, user_part)

    def get_route_gateway(self, saas_id):
        """Return the route gateway for ``saas_id``.

        NOTE(review): every field except ``saas_id`` is hard-coded here; this
        looks like a placeholder for a real lookup — confirm.
        """
        return RouteGateway(id=1, saas_id=saas_id, name='63366692', media_host='192.168.20.99',
                            media_port=5060, caller_prefix='', called_prefix='', status=0)

    def add_delay_message(self, action, delay_action, timeouts):
        """Schedule ``delay_action`` in the Redis sorted set for ``action``.

        A fresh UUID is assigned to ``delay_action.uuid`` before it is
        serialized. The sorted-set score is the due time in epoch
        milliseconds.

        Args:
            action: value substituted into the ``CTI_ENGINE_DELAY_ACTION`` key.
            delay_action: object exposing ``uuid`` and ``to_json_string()``.
            timeouts: delay before the action becomes due, in seconds.
        """
        delay_action.uuid = str(uuid.uuid4())
        key = CTI_ENGINE_DELAY_ACTION % action
        msg = delay_action.to_json_string()
        action_time = datetime.now().timestamp() * 1000 + timeouts * 1000
        self.redis_handler.redis.zadd(key, {msg: action_time})

    def get_delay_message(self, action):
        """Fetch and remove the delayed messages for ``action`` that are due.

        At most ``DELAY_ACTION_BATCH_SIZE`` members whose score is not later
        than the current time are read, then removed from the sorted set.

        Returns:
            The due messages as a list of strings (empty when none are due).
        """
        key = CTI_ENGINE_DELAY_ACTION % action
        current_time = int(datetime.now().timestamp() * 1000)  # millisecond timestamp
        members = self.redis_handler.redis.zrangebyscore(
            key, 0, current_time, start=0, num=DELAY_ACTION_BATCH_SIZE, withscores=True)
        if not members:
            return []
        self.logger.info('get_delay_message %s %s %s', key, action, members)

        # Members arrive as bytes unless the client decodes responses itself;
        # accept both so a client configuration change cannot break this.
        action_list = [
            member.decode('utf-8') if isinstance(member, bytes) else member
            for member, _score in members
        ]
        self.logger.info('get_delay_message %s %s %s', key, action, json.dumps(action_list))

        # NOTE(review): read-then-remove is two separate commands, so two
        # concurrent callers can both receive the same message.
        removed = self.redis_handler.redis.zrem(key, *action_list)
        self.logger.info('get_delay_message %s %s %s %s', key, action, json.dumps(action_list), removed)
        return action_list

    def lock_delay_action(self, val):
        """Set the delay-action lock key for ``val`` if absent (10 minute expiry).

        Returns the result of the underlying ``SET ... NX`` call.
        """
        key = CTI_ENGINE_DELAY_ACTION_LOCK % val
        return self.redis_handler.redis.set(key, "1", ex=60 * 10, nx=True)

    def set_need_play_hold_music(self, call_id):
        """Set the "need hold music" flag for ``call_id`` if absent (60 second expiry)."""
        key = NEED_PLAY_HOLD_MUSIC % call_id
        return self.redis_handler.redis.set(key, "1", ex=60 * 1, nx=True)

    def get_need_play_hold_music(self, call_id):
        """Return the raw "need hold music" flag value for ``call_id``."""
        key = NEED_PLAY_HOLD_MUSIC % call_id
        return self.redis_handler.redis.get(key)

    def del_need_play_hold_music(self, call_id):
        """Delete the "need hold music" flag for ``call_id``."""
        key = NEED_PLAY_HOLD_MUSIC % call_id
        self.delete_key(key)

    def set_after_play_hold_music(self, call_id):
        """Set the "after hold music" flag for ``call_id`` if absent (60 second expiry)."""
        key = AFTER_PLAY_HOLD_MUSIC % call_id
        return self.redis_handler.redis.set(key, "1", ex=60 * 1, nx=True)

    def get_after_play_hold_music(self, call_id):
        """Return the raw "after hold music" flag value for ``call_id``."""
        key = AFTER_PLAY_HOLD_MUSIC % call_id
        return self.redis_handler.redis.get(key)

    def get_call_is_end(self, call_id):
        """Return the "call ended" flag for ``call_id`` as read by ``RedisHandler.get``."""
        key = CTI_MANAGE_CENTER_CALL_END_KEY % call_id
        return self.redis_handler.get(key)

    def set_call_is_end(self, call_id):
        """Set the "call ended" flag for ``call_id`` if absent (10 minute expiry)."""
        key = CTI_MANAGE_CENTER_CALL_END_KEY % call_id
        return self.redis_handler.redis.set(key, "1", ex=60 * 10, nx=True)

    def get_call_is_answer(self, saas_id, flow_id):
        """Return the "manually answered" flag for ``(saas_id, flow_id)``."""
        key = CTI_AGENT_MANUAL_ANSWER % (saas_id, flow_id)
        return self.redis_handler.get(key)

    def set_call_is_answer(self, saas_id, flow_id):
        """Set the "manually answered" flag for ``(saas_id, flow_id)`` if absent (60 second expiry)."""
        key = CTI_AGENT_MANUAL_ANSWER % (saas_id, flow_id)
        return self.redis_handler.redis.set(key, "1", ex=60, nx=True)

    def lock_register_per_hours(self):
        """Return the raw register flag stored for the current hour.

        Despite its name this method only reads the key; it is written by
        ``set_register_per_hours``.
        """
        hour = datetime.now().strftime('%Y%m%d%H')
        key = BOT_REGISTER_PER_HOURS % hour
        return self.redis_handler.redis.get(key)

    def set_register_per_hours(self, expire=86400):
        """Set the register flag for the current hour if absent.

        Args:
            expire: key expiry in seconds.
        """
        hour = datetime.now().strftime('%Y%m%d%H')
        key = BOT_REGISTER_PER_HOURS % hour
        return self.redis_handler.redis.set(key, "1", ex=expire, nx=True)

    def get_pjsua_thread_lock(self):
        """Return the raw pjsua thread lock value stored for the current minute."""
        minute = datetime.now().strftime('%Y%m%d%H%M')
        key = BOT_PJSUA_THREAD_LOCK % minute
        _lock = self.redis_handler.redis.get(key)
        return _lock

    def del_pjsua_thread_lock(self):
        """Delete the pjsua thread lock key for the current minute.

        The key embeds the current minute, so this does not touch a lock
        that was set during an earlier minute.
        """
        minute = datetime.now().strftime('%Y%m%d%H%M')
        key = BOT_PJSUA_THREAD_LOCK % minute
        return self.redis_handler.redis.delete(key)

    def set_pjsua_thread_lock(self, expire=60*3):
        """Set the pjsua thread lock for the current minute if absent.

        Args:
            expire: key expiry in seconds.
        """
        minute = datetime.now().strftime('%Y%m%d%H%M')
        key = BOT_PJSUA_THREAD_LOCK % minute
        self.logger.info("set_pjsua_thread_lock")
        return self.redis_handler.redis.set(key, "1", ex=expire, nx=True)