Browse Source

Merge branch 'develop' into jms_20250106_prod

Davidliu 3 months ago
parent
commit
e934494a7f
3 changed files with 99 additions and 107 deletions
  1. 2 2
      src/core/callcenter/cache.py
  2. 41 50
      src/core/voip/asr.py
  3. 56 55
      src/core/voip/bot.py

+ 2 - 2
src/core/callcenter/cache.py

@@ -211,12 +211,12 @@ class Cache:
         return self.redis_handler.redis.set(key, "1", ex=60, nx=True)
 
     def lock_register_per_hours(self):
-        hour = datetime.now().strftime('%Y%m%d%H')
+        hour = datetime.now().strftime('%Y%m%d')
         key = BOT_REGISTER_PER_HOURS %hour
         return self.redis_handler.redis.get(key)
 
     def set_register_per_hours(self, expire=86400):
-        hour = datetime.now().strftime('%Y%m%d%H')
+        hour = datetime.now().strftime('%Y%m%d')
         key = BOT_REGISTER_PER_HOURS %hour
         return self.redis_handler.redis.set(key, "1", ex=expire, nx=True)
 

+ 41 - 50
src/core/voip/asr.py

@@ -68,101 +68,92 @@ class TestSt:
             print("无法获取Token")
             return None
 
-    def __init__(self, tid, message_receiver=None):
+    def __init__(self, tid, logger, message_receiver=None):
         # self.is_closed = False
         # self.lock = threading.Lock()
-
+        self.logger = logger
+        self.__event = threading.Event()
         self.__th = threading.Thread(target=self.__test_run)
         self.__id = tid
         self.message_receiver = message_receiver
         self._Token = self.get_cached_token()
         self.sr = None
-        print("开始")
+        self.logger.debug("开始")
 
     def start(self):
         self.__th.start()
 
     def send_audio(self, audio_data):
         if self.sr:
-            # print("Sending audio data of length:", len(audio_data))
             self.sr.send_audio(audio_data)
-            # print("Audio data sent.")
-        # if self.sr and not self.is_closed:
-        #     with self.lock:
-        #         try:
-        #             self.sr.send_audio(audio_data)
-        #         except Exception as e:
-        #             print(f"Error sending audio: {e}")
-        #             self.close()
-    # def close(self):
-    #     with self.lock:
-    #         if not self.is_closed:
-    #             self.is_closed = True
-    #             try:
-    #                 self.sr.stop()
-    #             except Exception as e:
-    #                 print(f"Error stopping ASR: {e}")
 
     def close(self):
         try:
             self.sr.stop()
         except Exception as e:
-            print(f"Error stopping ASR: {e}")
+            self.logger.debug(f"[{self.__id}]Error stopping ASR: {e}")
 
     def __test_run(self):
-        print("Thread:{} start..".format(self.__id))
+        self.logger.debug("Thread:%s start..",self.__id)
         nls.enableTrace(True)
-        self.sr = nls.NlsSpeechTranscriber(
-            url=URL,
-            token=self._Token,
-            appkey=APPKEY,
-            on_sentence_begin=self.test_on_sentence_begin,
-            on_sentence_end=self.test_on_sentence_end,
-            on_start=self.test_on_start,
-            on_result_changed=self.test_on_result_chg,
-            on_completed=self.test_on_completed,
-            on_error=self.test_on_error,
-            on_close=self.test_on_close,
-            callback_args=[self.__id]
-        )
-        self.sr.start(
-            aformat="pcm",
-            enable_intermediate_result=True,
-            enable_punctuation_prediction=True,
-            enable_inverse_text_normalization=True
-        )
-        self.sr.ctrl(ex={'max_sentence_silence': '1200ms', 'disfluency': True,'enable_words': True })
-        print("ASR session started.")
+        count = 0
+        self.__event.clear()
+        while not self.__event.is_set():
+            self.sr = nls.NlsSpeechTranscriber(
+                url=URL,
+                token=self._Token,
+                appkey=APPKEY,
+                on_sentence_begin=self.test_on_sentence_begin,
+                on_sentence_end=self.test_on_sentence_end,
+                on_start=self.test_on_start,
+                on_result_changed=self.test_on_result_chg,
+                on_completed=self.test_on_completed,
+                on_error=self.test_on_error,
+                on_close=self.test_on_close,
+                callback_args=[self.__id]
+            )
+            self.sr.start(
+                aformat="pcm",
+                enable_intermediate_result=True,
+                enable_punctuation_prediction=True,
+                enable_inverse_text_normalization=True
+            )
+            self.sr.ctrl(ex={'max_sentence_silence': '1200ms', 'disfluency': True,'enable_words': True })
+            self.logger.debug(f"[{self.__id}]ASR session started. {count}")
+            self.__event.wait(timeout=.5)
+            self.logger.debug(f"[{self.__id}]ASR session started. {count}")
+            count = count + 1
 
     def test_on_sentence_begin(self, message, *args):
         # pass
-        print("test_on_sentence_begin:{}".format(message))
+        self.logger.debug("[%s]test_on_sentence_begin:%s", self.__id, message)
         if self.message_receiver:
             self.message_receiver(message, *args)
 
     def test_on_sentence_end(self, message, *args):
-        print("test_on_sentence_end:{}".format(message))
+        self.logger.debug("[%s]test_on_sentence_end:%s", self.__id, message)
         if self.message_receiver:
             self.message_receiver(message, *args)
 
     def test_on_start(self, message, *args):
-        # print("test_on_start:{}".format(message))
+        self.__event.set()
+        self.logger.debug("[%s]test_on_start:%s", self.__id, message)
         pass
 
     def test_on_error(self, message, *args):
-        # print("on_error args=>{}".format(args))
+        # self.logger.debug("on_error args=>{}".format(args))
         if self.message_receiver:
             self.message_receiver(message, *args)
 
     def test_on_close(self, *args):
-        # print("on_close: args=>{}".format(args))
+        # self.logger.debug("on_close: args=>{}".format(args))
         pass
 
     def test_on_result_chg(self, message, *args):
-        # print("test_on_chg:{}".format(message))
+        # self.logger.debug("test_on_chg:{}".format(message))
         if self.message_receiver:
             self.message_receiver(message, *args)
 
     def test_on_completed(self, message, *args):
-        # print("on_completed:args=>{} message=>{}".format(args, message))
+        # self.logger.debug("on_completed:args=>{} message=>{}".format(args, message))
         pass

+ 56 - 55
src/core/voip/bot.py

@@ -81,7 +81,7 @@ class MyAudioMediaPort(pj.AudioMediaPort):
         if self.asr:  # 如果ASR实例存在,则发送音频数据
             if self.first:
                 self.first = False
-                self.call.logger.info("Received audio frame: %s %s", frame.buf, frame.size)
+                self.call.logger.debug("Received audio frame: %s, %s %s", self.call.session_id,frame.buf, frame.size)
             self.asr.send_audio(frame.buf)
 
         try:
@@ -212,7 +212,7 @@ class MyCall(pj.Call):
 
         self.cur_player_file = None   #当前播放的文件
 
-        self.asr = TestSt(self.session_id, message_receiver=self.on_receiver_asr_result)  # 创建ASR实例
+        self.asr = TestSt(self.session_id, logger=self.logger, message_receiver=self.on_receiver_asr_result)  # 创建ASR实例
         self.asr.start()  # 启动ASR线程
 
         self.start_time = time.time()  # 当前机器人对话开始时间
@@ -458,12 +458,12 @@ class ToTextBotAgent:
                             self.call_agent.message_queue.put(message)
                             break
                         else:
-                            self.call_agent.logger.info("响应中没有 'data' 字段")
+                            self.call_agent.logger.info(f"to_request::failed, sessionId={request.sessionId}, response_data:{response_data}")
                     else:
-                        self.call_agent.logger.info(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
+                        self.call_agent.logger.info(f"to_request::请求失败,sessionId:{request.sessionId}, 状态码: {response.status_code if response else None}, 响应内容: {response.text if response else None}")
                 except Exception as e:
                     traceback.print_exc()
-                    self.call_agent.logger.error(f"TaskId={request.taskId}, 请求发生异常 {e}, URL: {url}")
+                    self.call_agent.logger.error(f"to_request::exception, TaskId={request.taskId}, sessionId={request.sessionId}, 请求发生异常: {e}")
                 finally:
                     try_count = try_count - 1
                     latency = (time.time() - once_start)
@@ -476,56 +476,57 @@ class ToTextBotAgent:
             latency = (time.time() - start_time)
             registry.BOT_REQUEST_COUNT.inc()
             registry.BOT_REQUEST_LATENCY.labels(request.taskId).observe(latency)
-            self.call_agent.logger.info(f"sessionId={ self.call_agent.session_id}, timeCost={latency}, chat::request:{request_data}, response:{response_data if response_data else None}")
-
-
-    def to_quest(self, request: BotChatRequest, try_count = 3):
-        start_time = time.time()
-        request_data = request.to_json_string()
-        response = None
-        try:
-            url = f"http://{SERVE_HOST}:40072/botservice"
-            # payload = request.to_json_string()
-            # self.call_agent.logger.info(f"请求数据:{request_data},url:{url}")
-            with requests.Session() as session:
-                message = None
-                # try:
-                session.headers.update({'Content-Type': 'application/json'})
-                while try_count > 0:
-                    once_start = time.time()
-                    try:
-                        response = session.post(url=url, json=request_data, timeout=3)
-                        # response = requests.post(url=url,  json=json.loads(request_data), headers=headers, timeout=10)  # 使用占位URL
-                        self.call_agent.logger.info("to_request come in , try_count=%s", try_count)
-                        if response.status_code == 200:
-                            response_data = response.json()
-                            if "data" in response_data and response_data["code"]==0:
-                                data = response_data["data"]
-                                message = ChatMessage.from_json(data)
-                                self.call_agent.message_queue.put(message)
-                                break
-                            else:
-                                self.call_agent.logger.info("响应中没有 'data' 字段")
-                        else:
-                            self.call_agent.logger.info(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
-                    except Exception as e:
-                        traceback.print_exc()
-                        self.call_agent.logger.error(f"TaskId={request.taskId}, 请求发生异常: {e}, URL: {url}")
-                    finally:
-                        try_count = try_count - 1
-                        latency = (time.time() - once_start)
-                        registry.BOT_REQUEST_ONCE_LATENCY.labels(request.taskId).observe(latency)
-
-                if not message:
-                    message = self.get_default_response()
-                    self.call_agent.message_queue.put(message)
-                # finally:
-                #     session.close()
-        finally:
-            latency = (time.time() - start_time)
-            registry.BOT_REQUEST_COUNT.inc()
-            registry.BOT_REQUEST_LATENCY.labels(request.taskId).observe(latency)
-            self.call_agent.logger.info(f"sessionId={ self.call_agent.session_id}, timeCost={latency}, chat::request:{request_data}, response:{response.text if response else None}")
+            self.call_agent.logger.info(f"to_request sessionId={self.call_agent.session_id}, timeCost={latency}, request:{request_data}, response:{response_data if response_data else None}")
+
+
+    # def to_quest(self, request: BotChatRequest, try_count = 3):
+    #     start_time = time.time()
+    #     request_data = request.to_json_string()
+    #     response = None
+    #     try:
+    #         url = f"http://{SERVE_HOST}:40072/botservice"
+    #         # payload = request.to_json_string()
+    #         # self.call_agent.logger.info(f"请求数据:{request_data},url:{url}")
+    #         with requests.Session() as session:
+    #             message = None
+    #             # try:
+    #             session.headers.update({'Content-Type': 'application/json'})
+    #             while try_count > 0:
+    #                 once_start = time.time()
+    #                 try:
+    #                     response = session.post(url=url, json=request_data, timeout=3)
+    #                     # response = requests.post(url=url,  json=json.loads(request_data), headers=headers, timeout=10)  # 使用占位URL
+    #                     # self.call_agent.logger.info("to_request come in , try_count=%s", try_count)
+    #                     if response.status_code == 200:
+    #                         response_data = response.json()
+    #                         if "data" in response_data and response_data["code"]==0:
+    #                             data = response_data["data"]
+    #                             message = ChatMessage.from_json(data)
+    #                             self.call_agent.message_queue.put(message)
+    #                             break
+    #                         else:
+    #                             self.call_agent.logger.info(f"to_request::sessionId:{request.sessionId}, 响应中没有 'data' 字段")
+    #                     else:
+    #                         self.call_agent.logger.info(f"to_request::请求失败,sessionId:{request.sessionId}, 状态码: {response.status_code}, 响应内容: {response.text}")
+    #                 except Exception as e:
+    #                     traceback.print_exc()
+    #                     self.call_agent.logger.error(f"to_request::TaskId={request.taskId}, sessionId={request.sessionId}, 请求发生异常: {e}, URL: {url}")
+    #                 finally:
+    #                     try_count = try_count - 1
+    #                     latency = (time.time() - once_start)
+    #                     registry.BOT_REQUEST_ONCE_LATENCY.labels(request.taskId).observe(latency)
+    #
+    #             self.call_agent.logger.info(f"to_request::sessionId:{request.sessionId}, message:{message.to_json_string() if message else None}")
+    #             if not message:
+    #                 message = self.get_default_response()
+    #                 self.call_agent.message_queue.put(message)
+    #             # finally:
+    #             #     session.close()
+    #     finally:
+    #         latency = (time.time() - start_time)
+    #         registry.BOT_REQUEST_COUNT.inc()
+    #         registry.BOT_REQUEST_LATENCY.labels(request.taskId).observe(latency)
+    #         self.call_agent.logger.info(f"to_request::sessionId={ self.call_agent.session_id}, timeCost={latency}, request:{request_data}, response:{response.text if response else None}")
 
     def get_default_response(self):
         response=   {