#!/usr/bin/env python3
# encoding:utf-8

import concurrent.futures
import queue
import random
import threading

import mmh3

import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
from src.core.callcenter.agent import AgentEventService
from src.core.callcenter.cache import Cache
from src.core.callcenter.enumeration import CallType
from src.core.callcenter.esl.constant.event_names import DETECTED_TONE


class Callback(object):
    """Queue ESL events and dispatch them to agent/bot handlers.

    Events accepted by ``callback_event`` are placed on an in-memory queue.
    A dispatcher thread (started in ``__init__``) drains that queue and submits
    each event to one of ``thread_num`` single-worker executors. The executor
    is chosen by hashing the call id (or the unique id), so events carrying the
    same id are always handed to the same single worker.
    """

    def __init__(self, app, thread_num):
        """Create the queue, the executors, and start the dispatcher thread.

        Args:
            app: Application object; must expose ``logger`` and is passed on
                to ``Cache`` and ``AgentEventService``.
            thread_num: Number of single-worker executors to create.
        """
        self.app = app
        self.is_stopping = False
        self.logger = app.logger
        self.cache = Cache(app)
        self.event_queue = queue.Queue()
        # One worker per executor: tasks submitted to the same executor run
        # one at a time.
        self.executors = {
            x: concurrent.futures.ThreadPoolExecutor(
                max_workers=1, thread_name_prefix="callback-event-pool"
            )
            for x in range(thread_num)
        }
        self.agent_event_service = AgentEventService(app)
        threading.Thread(target=self.start).start()

    def start(self):
        """Run the dispatcher loop until ``stop`` is called.

        Polls the queue with a one second timeout so the ``is_stopping`` flag
        is re-checked regularly. A failure while dispatching one event is
        logged and does not terminate the loop.
        """
        while not self.is_stopping:
            try:
                event, call_info, device_info = self.event_queue.get(timeout=1)
            except queue.Empty:
                # Nothing arrived within the timeout; re-check the stop flag.
                continue

            try:
                self._dispatch(event, call_info, device_info)
            except Exception:
                # Keep the dispatcher alive, but never lose the error silently.
                self.logger.exception("callback:dispatch_error")

    def _dispatch(self, event, call_info, device_info):
        """Submit one queued event to the handler matching its call type."""
        call_type = CallType.get_by_code(call_info.call_type) if call_info else None
        if call_type is None:
            event_name = EslEventUtil.getEventName(event)
            self.logger.info(
                "callback:call_type_none:return::event_name=%s, call_info=%s, device_info=%s",
                event_name, call_info, device_info)
            return

        if CallType.BOT_CALL == call_type or CallType.INCOMING_BOT_CALL == call_type:
            handler = self.agent_event_service.bot_event_channel
        else:
            handler = self.agent_event_service.agent_event_channel

        executor = self.choose_thread_pool_executor(event)
        if executor is None:
            self.logger.error(
                "callback:no_executor:drop::event_name=%s", EslEventUtil.getEventName(event))
            return
        executor.submit(handler, event, call_info, device_info)

    def stop(self):
        """Signal the dispatcher loop to exit and shut down every executor."""
        self.is_stopping = True
        for executor in self.executors.values():
            executor.shutdown()

    def callback_event(self, event):
        """Filter an incoming ESL event and enqueue it for dispatching.

        Only events whose name starts with ``CHANNEL_`` or ``PLAYBACK_``, or
        equals ``DETECTED_TONE``, are considered. Events for which no call
        info can be found in the cache are logged and dropped.

        Args:
            event: The ESL event object as understood by ``EslEventUtil``.
        """
        event_name = EslEventUtil.getEventName(event)
        # Defensive: assumes getEventName may return None/empty for a
        # malformed event -- such events cannot match the filter below.
        if not event_name:
            return
        # NOTE: CUSTOM events are not forwarded (a check for them was
        # commented out in an earlier revision).
        if not (event_name.startswith(('CHANNEL_', 'PLAYBACK_')) or event_name == DETECTED_TONE):
            return

        call_id = EslEventUtil.getCallId(event)
        device_id = EslEventUtil.getDeviceId(event)
        if not call_id and device_id:
            # Fall back to the device -> call mapping held in the cache.
            call_id = self.cache.get_call_id_by_device_id(device_id)

        call_info = self.cache.get_call_info(call_id)
        if not call_info:
            self.logger.info(
                "callback:return::event_name=%s, call_id=%s, device_id=%s",
                event_name, call_id, device_id)
            return

        device_info = call_info.device_info_map.get(device_id) if call_info.device_info_map else None
        self.logger.info(
            "callback::event_name=%s, call_id=%s, device_id=%s",
            event_name, call_id, device_id)
        self.event_queue.put_nowait((event, call_info, device_info))

    def choose_thread_pool_executor(self, e):
        """Pick the executor that should process event ``e``.

        The call id (or, when absent, the unique id) is hashed with
        ``mmh3.hash`` so the same id always maps to the same executor. When
        the event carries neither id, an executor is picked at random.

        Args:
            e: The ESL event object as understood by ``EslEventUtil``.

        Returns:
            The selected ``ThreadPoolExecutor``, or ``None`` when no executors
            are configured.
        """
        if not self.executors:
            # Avoids a modulo-by-zero below when thread_num was 0.
            return None

        call_id = EslEventUtil.getCallId(e)
        unique_id = EslEventUtil.getUniqueId(e)
        routing_id = call_id if call_id else unique_id

        if routing_id:
            index = abs(mmh3.hash(routing_id)) % len(self.executors)
        else:
            index = random.randint(0, len(self.executors) - 1)
        return self.executors.get(index)