123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- #!/usr/bin/env python3
- # encoding:utf-8
- import json
- import sys
- import time
- import uuid
- from datetime import datetime
- from src.core.callcenter.constant import *
- from src.core.callcenter.api import AgentInfo, CallInfo, RouteGateway
- from src.core.callcenter.dao import Agent, Phone
- from src.core.callcenter.data_handler import DataHandleServer
- from src.core.datasource import RedisHandler
class Cache:
    """Cache for call-center agents, calls, tokens and delayed actions.

    Agent info, call info, agent tokens, delayed actions and hold-music
    flags are stored through ``RedisHandler``. The device -> call id and
    device -> user part mappings live only in this instance's memory.
    """

    def __init__(self, app):
        """Build the cache.

        Args:
            app: Application object; its ``logger`` attribute is used for
                logging and the object itself is handed to
                ``DataHandleServer``.
        """
        self.cacheDay = 7  # retention in days for agent / call / token keys
        self.deviceCall = {}  # device_id -> call_id (in-process only)
        self.deviceUserPart = {}  # device_id -> user part (in-process only)
        self.logger = app.logger
        self.redis_handler = RedisHandler()
        self.dataHandleServer = DataHandleServer(app)

    def _ttl_seconds(self):
        """Return the retention period (``cacheDay``) expressed in seconds."""
        return self.cacheDay * 24 * 60 * 60

    def get_agent_info(self, saas_id, agent_number):
        """Return the ``AgentInfo`` for an agent, loading it on a cache miss.

        On a miss the agent's phone record is fetched through
        ``DataHandleServer.get_agent_phone`` and the resulting ``AgentInfo``
        is written back to the cache.

        Returns:
            The ``AgentInfo``, or ``None`` when no phone record is found.
        """
        text = self.redis_handler.get(AGENT_INFO + saas_id + ":" + agent_number)
        self.logger.info('get_agent_info %s %s %s', saas_id, agent_number, text)
        if text:
            return AgentInfo.from_json(text)

        phone = self.dataHandleServer.get_agent_phone(saas_id, agent_number)
        if phone is None:
            # Without a phone record there is no sip_server to build from;
            # add_agent_info() already copes with a missing agent.
            self.logger.warning('get_agent_info no phone record %s %s', saas_id, agent_number)
            return None

        agent_info = AgentInfo(saas_id=saas_id, sip_server=phone.sip_server, agent_number=agent_number)
        self.logger.info('get_agent_info %s %s %s', saas_id, agent_number, agent_info)
        self.add_agent_info(agent=agent_info)
        sys.stdout.flush()  # force-flush the output buffer
        return agent_info

    def refresh_agent_token(self, agent_number, token):
        """Store ``token -> agent_number`` with the standard retention."""
        self.redis_handler.set(AGENT_TOKEN + token, agent_number, self._ttl_seconds())

    def delete_key(self, key):
        """Delete ``key`` from Redis."""
        self.redis_handler.redis.delete(key)

    def get_agent_number(self, token):
        """Return the value stored for ``token`` (see ``refresh_agent_token``)."""
        return self.redis_handler.get(AGENT_TOKEN + token)

    def add_agent_info(self, call_info: "CallInfo" = None, agent: "AgentInfo" = None, call_id=None, device_id=None):
        """Cache an agent, optionally stamping a call id / device id on it.

        Args:
            call_info: Used to look the agent up (``saas_id`` and
                ``agent_key``) when ``agent`` is not given.
            agent: The agent to store; takes precedence over ``call_info``.
            call_id: When truthy, written to ``agent.call_id`` before storing.
            device_id: When truthy, written to ``agent.device_id`` before
                storing.

        Nothing is stored when no agent can be resolved.
        """
        if call_info and not agent:
            agent = self.get_agent_info(call_info.saas_id, call_info.agent_key)
        if not agent:
            return
        if call_id:
            agent.call_id = call_id
        if device_id:
            agent.device_id = device_id
        self.redis_handler.set(
            AGENT_INFO + agent.saas_id + ":" + agent.agent_number,
            agent.to_json_string(),
            self._ttl_seconds(),
        )

    def add_call_info(self, call: "CallInfo"):
        """Cache a call and index each of its devices to the call id."""
        for device_id in call.device_info_map:
            self.add_device(device_id, call.call_id)
        self.redis_handler.set(CALL_INFO + call.call_id, call.to_json_string(), self._ttl_seconds())

    def get_call_info_by_device_id(self, device_id):
        """Return the cached call that ``device_id`` is indexed to.

        Returns:
            The result of ``get_call_info`` for the indexed call id, or
            ``None`` when the device is not in the in-memory index.
        """
        call_id = self.deviceCall.get(device_id)
        if not call_id:
            return None
        return self.get_call_info(call_id)

    def get_call_info(self, call_id):
        """Return the cached ``CallInfo`` for ``call_id``, or ``None``."""
        if not call_id:
            return None
        text = self.redis_handler.get(CALL_INFO + call_id)
        if text:
            return CallInfo.from_json(text)
        return None

    def remove_call_info(self, call_id):
        """Drop a call from the cache together with its device index entries."""
        if not call_id:
            return None
        call_info = self.get_call_info(call_id)
        if call_info and call_info.device_info_map:
            for device_id in call_info.device_info_map:
                # The index is per-process, so a device stored in Redis may
                # be absent here; a missing entry must not abort the cleanup.
                self.deviceCall.pop(device_id, None)
        self.delete_key(CALL_INFO + call_id)

    def add_device(self, device_id, call_id):
        """Index ``device_id`` to ``call_id``; ignored if either is falsy."""
        if not device_id or not call_id:
            return None
        self.deviceCall[device_id] = call_id

    def get_user_part(self, device_id):
        """Return the user part recorded for ``device_id``, or ``None``."""
        return self.deviceUserPart.get(device_id)

    def add_device_user_part(self, device_id, user_part):
        """Record ``user_part`` for ``device_id``; ignored if either is falsy."""
        if not device_id or not user_part:
            return
        self.deviceUserPart[device_id] = user_part

    def get_route_gateway(self, saas_id):
        """Return the route gateway for ``saas_id``.

        NOTE(review): every field except ``saas_id`` is a hard-coded
        literal; presumably a placeholder until gateways are looked up
        from storage - confirm.
        """
        return RouteGateway(id=1,
                            saas_id=saas_id,
                            name='63366692',
                            media_host='192.168.20.99',
                            media_port=5060,
                            caller_prefix='',
                            called_prefix='',
                            status=0)

    def add_delay_message(self, action, delay_action, timeouts):
        """Queue ``delay_action`` in the sorted set for ``action``.

        Args:
            action: Substituted into ``CTI_ENGINE_DELAY_ACTION`` to form
                the sorted-set key.
            delay_action: Object to queue; receives a fresh ``uuid`` and is
                serialized with ``to_json_string()``.
            timeouts: Delay before the action is due. It is multiplied by
                1000, i.e. treated as seconds - TODO confirm with callers.
        """
        delay_action.uuid = uuid.uuid4()
        key = CTI_ENGINE_DELAY_ACTION % action
        msg = delay_action.to_json_string()
        # The score must be epoch *milliseconds* because get_delay_message()
        # queries with int(time.time() * 1000).
        due_at_ms = int(time.time() * 1000) + timeouts * 1000
        self.redis_handler.redis.zadd(key, {msg: due_at_ms})

    def get_delay_message(self, action):
        """Pop the due delayed actions for ``action``.

        Reads at most ``DELAY_ACTION_BATCH_SIZE`` members whose score is
        not later than the current epoch-millisecond time, removes them
        from the sorted set and returns them as a list of ``str``.

        NOTE(review): the read and the removal are two separate Redis
        commands, so concurrent callers may receive the same members.
        """
        key = CTI_ENGINE_DELAY_ACTION % action
        current_time = int(time.time() * 1000)  # millisecond timestamp
        members = self.redis_handler.redis.zrangebyscore(
            key, 0, current_time, start=0, num=DELAY_ACTION_BATCH_SIZE)
        if not members:
            return []
        # Members arrive as bytes unless the client decodes responses.
        action_list = [
            member.decode('utf-8') if isinstance(member, bytes) else member
            for member in members
        ]
        self.redis_handler.redis.zrem(key, *action_list)
        return action_list

    def lock_delay_action(self, val):
        """Try to take the 10-minute lock for ``val``.

        Returns:
            The result of Redis ``SET`` with ``nx=True`` (falsy when the
            key already exists).
        """
        key = CTI_ENGINE_DELAY_ACTION_LOCK % val
        return self.redis_handler.redis.set(key, "1", ex=60 * 10, nx=True)

    def set_need_play_hold_music(self, call_id):
        """Set the 60-second "need play hold music" flag if not already set."""
        key = NEED_PLAY_HOLD_MUSIC % call_id
        return self.redis_handler.redis.set(key, "1", ex=60 * 1, nx=True)

    def get_need_play_hold_music(self, call_id):
        """Return the raw "need play hold music" flag value for the call."""
        key = NEED_PLAY_HOLD_MUSIC % call_id
        return self.redis_handler.redis.get(key)

    def del_need_play_hold_music(self, call_id):
        """Clear the "need play hold music" flag for the call."""
        key = NEED_PLAY_HOLD_MUSIC % call_id
        self.delete_key(key)

    def set_after_play_hold_music(self, call_id):
        """Set the 60-second "after play hold music" flag if not already set."""
        key = AFTER_PLAY_HOLD_MUSIC % call_id
        return self.redis_handler.redis.set(key, "1", ex=60 * 1, nx=True)

    def get_after_play_hold_music(self, call_id):
        """Return the raw "after play hold music" flag value for the call."""
        key = AFTER_PLAY_HOLD_MUSIC % call_id
        return self.redis_handler.redis.get(key)
|