浏览代码

超时逻辑开发

774056846 5 月之前
父节点
当前提交
c8b1625850

+ 8 - 0
src/core/callcenter/api.py

@@ -635,6 +635,14 @@ class NextCommand(BaseApi):
         self.next_value = next_value
 
 
+class DelayAction(BaseApi):
+
+    def __init__(self, uuid=None, call_id=None, device_id=None):
+        self.uuid=uuid
+        self.call_id = call_id
+        self.device_id = device_id
+
+
 class CallDetail(BaseApi):
     def __init__(self, id=None, cts=None, start_time=None, end_time=None, call_id=None,
                  detail_index=None, transfer_type=None, transfer_id=None, reason=None,

+ 26 - 1
src/core/callcenter/cache.py

@@ -2,7 +2,9 @@
 # encoding:utf-8
 
 import json
-
+import time
+import uuid
+from datetime import datetime
 from src.core.callcenter.constant import *
 from src.core.callcenter.api import AgentInfo, CallInfo, RouteGateway
 from src.core.callcenter.dao import Agent, Phone
@@ -112,3 +114,26 @@ def get_route_gateway(saas_id):
 
 def get_agent_phone(saas_id, agent_num):
     return Phone.query.filter(Phone.saas_id == saas_id, Phone.phone_num == agent_num).first()
+
+def add_delay_message(action, delay_action, timeouts):
+    delay_action.uuid = uuid.uuid4()
+    key = CTI_ENGINE_DELAY_ACTION % action
+    msg = delay_action.to_json_string()
+    action_time = datetime.utcnow().timestamp() + timeouts * 1000
+    redis_handler.redis.zadd(key, {msg : action_time})
+
+def get_delay_message(action):
+    key = CTI_ENGINE_DELAY_ACTION % action
+    current_time = int(time.time() * 1000)  # 毫秒级时间戳
+    members = redis_handler.redis.zrangebyscore(key, 0, current_time, start=0, num=DELAY_ACTION_BATCH_SIZE, withscores=True)
+    if not members:
+        return []
+    # scored_entries = [{"member": entry[0].decode('utf-8'), "score": entry[1]} for entry in members]
+    action_list = [entry[0].decode('utf-8') for entry in members]
+    if action_list:
+        redis_handler.redis.zrem(key, *action_list)
+    return action_list
+
+def lock_delay_action(val):
+    key = CTI_ENGINE_DELAY_ACTION_LOCK % val
+    return redis_handler.redis.set(key, "1", ex=1000*10, nx=True)

+ 4 - 0
src/core/callcenter/constant.py

@@ -77,6 +77,10 @@ ADMIN_INFO = "ADMIN_INFO:"
 CALL_INFO = "CALL_INFO:"
 START_AGENT_NUM = "1000"
 
+DELAY_ACTION_BATCH_SIZE = 10
+CTI_ENGINE_DELAY_ACTION = "DELAY:ACTION:%s"
+CTI_ENGINE_DELAY_ACTION_LOCK = "DELAY:ACTION:LOCK:%s"
+
 
 def get_json_dict(json_string):
     data = json_string

+ 16 - 0
src/core/callcenter/enumeration.py

@@ -4,6 +4,22 @@
 from enum import Enum
 
 
+class DelayActionEnum(Enum):
+    CALL_TIMEOUT_HANGUP = ('CALL_TIMEOUT_HANGUP', "超时挂机")
+    PLAY_TIMEOUT_HANGUP = ('PLAY_TIMEOUT_HANGUP','播放超时挂机')
+    NOTICE_TIMEOUT_HANGUP = ('NOTICE_TIMEOUT_HANGUP', '')
+    CALL_TIMEOUT_DECR = ('CALL_TIMEOUT_DECR', ''),
+    ACD_TIMEOUT_PLAY = ('ACD_TIMEOUT_PLAY','')
+
+    def __init__(self, code=None, description=None):
+        self.code = code
+        self.description = description
+
+    @classmethod
+    def get_by_code(cls, code):
+        return next((member for member in cls if member.code == code), None)
+
+
 class HumanState(Enum):
     DEFAULT = (0, "默认")
     IDLE = (1, "空闲")

+ 87 - 6
src/core/callcenter/esl/client.py

@@ -11,16 +11,19 @@ import mmh3
 import threading
 import traceback
 import concurrent.futures
+
+from apscheduler.schedulers.background import BackgroundScheduler
+
 import src.core.callcenter.cache as Cache
-from src.core.callcenter.api import MakeCallContext
-from src.core.callcenter.constant import SK, EMPTY
+from src.core.callcenter.api import MakeCallContext, DelayAction
+from src.core.callcenter.constant import SK, EMPTY, CTI_ENGINE_DELAY_ACTION_LOCK, HOLD_MUSIC_PATH
 from src.core.callcenter.esl.constant.esl_constant import BRIDGE_VARIABLES, BRIDGE, HANGUP, NORMAL_CLEARING, SIP_HEADER, SPACE, SPLIT, SOFIA, \
     ORIGINATE, PARK, SET, EAVESDROP, SMF_ALEG, EXECUTE, PLAYBACK, PAUSE, TRANSFER, UUID_TRANSFER, UUID_BROADCAST, UUID_BREAK, UUID_HOLD, \
     UUID_RECORD, UUID_SETVAR, UUID_GETVAR
 import src.core.callcenter.esl.utils.esl_event_util as EslEventUtil
 import src.core.callcenter.esl.handler as event_handler
 from src.core.callcenter.esl.constant.sip_header_constant import sipHeaderHoldMusic, profile1, profile2
-from src.core.callcenter.enumeration import CallCause, DeviceType
+from src.core.callcenter.enumeration import CallCause, DeviceType, DelayActionEnum, HangupDir, CallType, NextType
 from src.core.callcenter.esl.handler.default_esl_event_handler import DefaultEslEventHandler
 from src.core.datasource import SERVE_HOST
 from src.core.voip.constant import *
@@ -38,9 +41,19 @@ class InboundClient:
         self.default_event_handler = DefaultEslEventHandler(self, self.bot_agent, self.logger)
         self.host, self.port, self.password = SERVE_HOST, '8021', '4918257983818884358'
         self.executors = {x: concurrent.futures.ThreadPoolExecutor(max_workers=1) for x in range(self.thread_num)}
-
         threading.Thread(target=self.start, args=()).start()
 
+        self.delay_action_executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
+        self.delay_action_scheduler = BackgroundScheduler()
+        self.delay_action_scheduler.add_job(self.do_delay_action, 'interval', seconds=1, max_instances=1)
+        self.delay_action_scheduler.start()
+
+    def submit_delay_action(self):
+        for name, member in DelayActionEnum.__members__.items():
+            action_messages = Cache.get_delay_message(name)
+            for action_message in action_messages:
+                self.delay_action_executor.submit(self.do_delay_action, name, action_message)
+
     def scan_esl_event_handlers(self):
         import inspect
         import importlib
@@ -111,6 +124,72 @@ class InboundClient:
         except:
             traceback.print_exc()
 
+    def do_delay_action(self, action, message):
+        delay_action = DelayAction.from_json(message)
+        flag = Cache.lock_delay_action(delay_action.uuid)
+        if not flag:
+            self.logger.info("异步延迟执行操作重复 action:%s msg:%s", action, message)
+            return
+        delay_action_enum = DelayActionEnum.get_by_code(action)
+        if not delay_action_enum:
+            self.logger.info("异步延迟执行 delayActionEnum为空 action:%s msg:%s", action, message)
+            return
+
+        if DelayActionEnum.CALL_TIMEOUT_HANGUP == delay_action_enum:
+            self.exec_when_call_timeout(delay_action.call_id, delay_action.device_id)
+        elif DelayActionEnum.PLAY_TIMEOUT_HANGUP == delay_action_enum:
+            self.exec_when_play_timeout(delay_action.call_id)
+        elif DelayActionEnum.ACD_TIMEOUT_PLAY == delay_action_enum:
+            self.exec_when_acd_timeout(delay_action.call_id)
+
+    def exec_when_call_timeout(self, call_id, device_id):
+        call_info = Cache.get_call_info(call_id)
+        if not call_info or not (device_id in call_info.device_list):
+            return
+        device_info = call_info.device_info_map.get(device_id)
+        if device_info and device_info.answer_time is None:
+            self.logger.info("call:%s deviceId:%s execWhenCallTimeOut", call_id, device_id)
+            device_info.hangup_cause = CallCause.CALL_TIMEOUT.name
+            call_info.next_commands = []
+            if device_info.device_type <= DeviceType.ROBOT.code:
+                call_info.hangup_dir = HangupDir.PLATFORM_HANGUP.code
+                call_info.hangup_code = CallCause.CALL_TIMEOUT.code
+            # if device_info.device_type.code == DeviceType.CUSTOMER.code:
+                # call_info.user_no_answer_end_call = True
+
+            if not device_info.end_time and device_info.device_type.code == DeviceType.CUSTOMER.code:
+                channel = self.show_channel(device_id)
+                if channel:
+                    delay_action = DelayAction(call_id=call_id, device_id=device_id)
+                    Cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_DECR, delay_action, timeouts=20)
+            Cache.add_call_info(call_info)
+            self.hangup_call(call_id, device_id, CallCause.CALL_TIMEOUT)
+
+    def exec_when_play_timeout(self, call_id):
+        call_info = Cache.get_call_info(call_id)
+        if not call_info or not call_info.next_commands:
+            return
+        self.logger.debug("播放结束音乐失败,进行挂机 callId:%s", call_id)
+        next_types = [x.next_type for x in call_info.next_commands]
+        if NextType.NEXT_HANGUP.code in next_types:
+            for device_id in call_info.device_list:
+                self.hangup_call(call_id, device_id, CallCause.PLAY_TIMEOUT)
+
+    def exec_when_acd_timeout(self, call_id):
+        call_info = Cache.get_call_info(call_id)
+        if not call_info:
+            self.logger.info("exec_when_acd_timeout callInfo为空 callId: {}", call_id)
+            return
+        device_list = [v for k,v in call_info.device_info_map.items() if v.device_type == DeviceType.CUSTOMER]
+        if device_list and len(device_list) == 1:
+            device_id = device_list[0].device_id
+            self.bridge_break(device_id)
+            self.hold_play(device_id, HOLD_MUSIC_PATH)
+            self.play_timeout(call_id, timeout=30)
+            Cache.add_call_info(call_info)
+            self.logger.info("waitingTimeOut 开始播放结束音乐 callId:%s customerDeviceId:%s playFile:%s", call_id,
+                             device_id, HOLD_MUSIC_PATH)
+
     def make_call_new(self, context: MakeCallContext):
         called = context.get_called()
         params = {'gateway': context.route_gateway_name, 'called': called, 'realm': context.get_realm()}
@@ -184,7 +263,8 @@ class InboundClient:
 
     def call_timeout(self, call_id, device_id, timeout):
         """呼叫超时主动挂机"""
-        pass
+        delay_action = DelayAction(call_id=call_id, device_id=device_id)
+        Cache.add_delay_message(DelayActionEnum.CALL_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout)
 
     def send_args(self, device_id, name, arg):
         msg = ESL.ESLevent("sendmsg", device_id)
@@ -365,7 +445,8 @@ class InboundClient:
 
     def play_timeout(self, call_id, timeout):
         """播放超时主动挂机"""
-        pass
+        delay_action = DelayAction(call_id=call_id)
+        Cache.add_delay_message(DelayActionEnum.PLAY_TIMEOUT_HANGUP.code, delay_action, timeouts=timeout)
 
     def listen(self, device_id1, device_id2, aleg=True, bleg=True):
         """监听"""