Parcourir la source

Merge branch 'dev_20241205' of ssh://gitlab.fuxicarbon.com:1111/client_service/voice-gateway-service into dev_20241205

shanghui il y a 3 mois
Parent
commit
fc560eff20

+ 8 - 0
src/core/callcenter/cache.py

@@ -189,3 +189,11 @@ class Cache:
     def set_call_is_answer(self, saas_id, flow_id):
         key = CTI_AGENT_MANUAL_ANSWER%(saas_id, flow_id)
         return self.redis_handler.redis.set(key, "1", ex=60, nx=True)
+
+    def lock_register_per_hours(self, hour):
+        key = BOT_REGISTER_PER_HOURS %hour
+        return self.redis_handler.redis.get(key)
+
+    def set_register_per_hours(self, hour):
+        key = BOT_REGISTER_PER_HOURS %hour
+        return self.redis_handler.redis.set(key, "1", ex=60*60, nx=True)

+ 2 - 1
src/core/callcenter/constant.py

@@ -88,6 +88,7 @@ NEED_PLAY_HOLD_MUSIC = "CTI:ENGINE:NEED:HOLD:%s"
 AFTER_PLAY_HOLD_MUSIC = "CTI:ENGINE:AFTER:HOLD:%s"
 CTI_MANAGE_CENTER_CALL_END_KEY = "CTI:MANAGE:CENTER:CALL:END:KEY:%s"
 CTI_AGENT_MANUAL_ANSWER = "AGENT:MANUAL:ANSWER:%s:%s"
+BOT_REGISTER_PER_HOURS = "BOT:REGISTER:PER_HOURS:%s"
 
 def get_json_dict(json_text=None):
     if isinstance(json_text, str):
@@ -113,7 +114,7 @@ def error_response(msg, data=None, code=1, http_code=200):
 
 def format_time_millis(time_millis, pattern='%Y%m%d'):
     from datetime import datetime
-    dt = datetime.utcfromtimestamp(time_millis)
+    dt = datetime.fromtimestamp(time_millis)
     return dt.strftime(pattern)
 
 

+ 1 - 0
src/core/callcenter/registry.py

@@ -36,6 +36,7 @@ ESL_EVENT_CALLBACK_LATENCY = Histogram('esl_event_callback_latency', 'Esl Event
 BOT_CALL_DURATION = Histogram('bot_call_duration', '通话时长',['taskId'])
 BOT_REQUEST_LATENCY = Histogram('bot_request_latency','请求机器人耗时',['taskId'])
 BOT_INTERACTION_ROUNDS = Histogram('bot_interaction_rounds',  '机器人交互轮次',['taskId'] )
+BOT_CREATE_ACCOUNT_LATENCY = Histogram('bot_create_account_latency',  '创建虚拟机器人耗时')
 
 
 FLASK_ACTIVE_THREADS = Gauge('flask_active_threads', 'Number of active threads')

+ 56 - 30
src/core/voip/bot.py

@@ -10,6 +10,7 @@ import traceback
 import sys
 import pjsua2 as pj
 from enum import Enum
+from datetime import datetime
 
 from src.core.callcenter import registry
 from src.core.callcenter.cache import Cache
@@ -540,41 +541,66 @@ class BotAgent:
         self.ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, sipTpConfig)
         # Start the library
         self.ep.libStart()
-
-        for user_part in self.user_part_range:
-            acfg = pj.AccountConfig()
-            acfg.idUri = f"sip:{user_part}@{self.host}:{self.port}"
-            acfg.regConfig.registrarUri = f"sip:{self.host}:{self.port}"
-            cred = pj.AuthCredInfo("digest", "*", f"{user_part}", 0, self.password)
-            acfg.sipConfig.authCreds.append(cred)
-
-            acfg.regConfig.timeoutSec = 86400  # 注册超时时间(秒)
-            acfg.regConfig.retryIntervalSec = 10  # 重试间隔时间(秒)
-            acfg.regConfig.firstRetryIntervalSec = 10  # 首次重试间隔时间(秒)
-
-            # acfg.natConfig.iceEnabled = True
-            # acfg.natConfig.turnEnabled = True
-            # acfg.natConfig.turnServer = "stun:pbx.fuxicarbon.com:3478"
-            # acfg.natConfig.turnUsername = "username"
-            # acfg.natConfig.turnPassword = "password"
-
-            # Create the account
-            acc = Account(self, user_part=user_part)
-            acc.create(acfg)
-
-            self.user_part_pool.put(user_part)
-            self.accounts[user_part] = acc
+        self.create_account()
 
         while not self.is_stopping:
             registry.BOT_AGENT_LIVES.set(self.user_part_pool.qsize())
-            self.flush_register_expire()
+            self.register_per_hours()
             self.ep.libHandleEvents(100)
 
-    def flush_register_expire(self):
-        if len(self.accounts) == self.user_part_pool.qsize():
-            self.logger.info("------------------>flush register ::%s", len(self.accounts))
-            user_part = self.user_part_range[0]
-            self.accounts[user_part].setRegistration(renew=True)
+    def create_account(self, recreate = False):
+        start_time = time.time()
+        try:
+            _hour = datetime.now().strftime('%Y%m%d%H')
+            self.cache.set_register_per_hours(_hour)
+            if recreate:
+                for k, v in self.accounts.items():
+                    try:
+                        v.shutdown()
+                    except:
+                        traceback.print_exc()
+                while not self.user_part_pool.empty():
+                    try:
+                        self.user_part_pool.get_nowait()
+                    except:
+                        pass
+
+            for user_part in self.user_part_range:
+                acfg = pj.AccountConfig()
+                acfg.idUri = f"sip:{user_part}@{self.host}:{self.port}"
+                acfg.regConfig.registrarUri = f"sip:{self.host}:{self.port}"
+                cred = pj.AuthCredInfo("digest", "*", f"{user_part}", 0, self.password)
+                acfg.sipConfig.authCreds.append(cred)
+
+                acfg.regConfig.timeoutSec = 86400  # 注册超时时间(秒)
+                acfg.regConfig.retryIntervalSec = 10  # 重试间隔时间(秒)
+                acfg.regConfig.firstRetryIntervalSec = 10  # 首次重试间隔时间(秒)
+
+                # acfg.natConfig.iceEnabled = True
+                # acfg.natConfig.turnEnabled = True
+                # acfg.natConfig.turnServer = f"stun:{self.host}:3478"
+                # acfg.natConfig.turnUsername = "username"
+                # acfg.natConfig.turnPassword = "password"
+
+                # Create the account
+                acc = Account(self, user_part=user_part)
+                acc.create(acfg)
+
+                self.user_part_pool.put(user_part)
+                self.accounts[user_part] = acc
+        except:
+            traceback.print_exc()
+        finally:
+            latency = time.time() - start_time
+            registry.BOT_CREATE_ACCOUNT_LATENCY.observe(latency)
+            self.logger.info("create account latency: %.3fs", latency)
+
+    def register_per_hours(self):
+        _hour = datetime.now().strftime('%Y%m%d%H')
+        _lock = self.cache.lock_register_per_hours(_hour)
+        if _lock and len(self.accounts) == len(self.user_part_range):
+            self.cache.set_register_per_hours(_hour)
+            self.create_account(recreate=True)
 
     def transfer(self, user_part, call_id, device_id, service_id='00000000000000000'):
         if self.acd_service: