Browse Source

Merge branch 'dev_xunfeiasr_20250224' into jms_20250106_prod

Davidliu 1 month ago
parent
commit
0ac9b3622c

+ 3 - 1
requirements.txt

@@ -11,4 +11,6 @@ flask_sqlalchemy
 prometheus_client
 prometheus_flask_exporter
 pydub
-cacheout
+cacheout
+websocket-client
+websockets

+ 4 - 4
src/core/callcenter/agent.py

@@ -93,7 +93,7 @@ class AgentEventService:
 
         start_time = time.time()
         try:
-            self.logger.info('agent_event_channel, event_name=%s, call_id=%s, event_time=%s, agent_num=%s, device_id=%s, is_agent=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, event_time, agent_num, device_id, is_agent, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
+            self.logger.debug('agent_event_channel, event_name=%s, call_id=%s, event_time=%s, agent_num=%s, device_id=%s, is_agent=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, event_time, agent_num, device_id, is_agent, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
             agent = self.data_handle_server.get_agent(saas_id, agent_num)
             if not agent:
                 self.logger.warn("agent_event_channel:return, agent is null %s %s %s %s %s", saas_id, event_name, event_time, caller, called)
@@ -181,7 +181,7 @@ class AgentEventService:
             registry.ESL_EVENT_CALLBACK_COST.labels(event_name, "agent").observe(time_cost)
             latency = (time.time() - event_timestamp) * 1000
             registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "agent").observe(latency)
-            self.logger.info('agent_event_channel, event_name=%s, time_cost=%s, latency=%s, call_id=%s, event_time=%s, agent_num=%s, device_id=%s, is_agent=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, time_cost, latency, call_id, event_time, agent_num, device_id, is_agent, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
+            self.logger.debug('agent_event_channel, event_name=%s, time_cost=%s, latency=%s, call_id=%s, event_time=%s, agent_num=%s, device_id=%s, is_agent=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, time_cost, latency, call_id, event_time, agent_num, device_id, is_agent, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
 
     def bot_event_channel(self, event, call_info, device_info):
         event_name = EslEventUtil.getEventName(event)
@@ -199,7 +199,7 @@ class AgentEventService:
 
         start_time = time.time()
         try:
-            self.logger.info('bot_event_channel, event_name=%s, call_id=%s, device_id=%s, event_time=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, device_id, event_time, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
+            self.logger.debug('bot_event_channel, event_name=%s, call_id=%s, device_id=%s, event_time=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, call_id, device_id, event_time, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
             agent = self.data_handle_server.get_agent(saas_id, agent_num)
             if not agent:
                 self.logger.warn("bot_event_channel:return, agent is null %s %s %s %s %s %s", saas_id, event_name, event_time, call_id, caller, called)
@@ -251,7 +251,7 @@ class AgentEventService:
             registry.ESL_EVENT_CALLBACK_COST.labels(event_name, "bot").observe(time_cost)
             latency = (time.time() - event_timestamp) * 1000
             registry.ESL_EVENT_CALLBACK_LATENCY.labels(event_name, "bot").observe(latency)
-            self.logger.info('bot_event_channel, event_name=%s, time_cost=%s, latency=%s, call_id=%s, device_id=%s, event_time=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, time_cost, latency, call_id, device_id, event_time, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
+            self.logger.debug('bot_event_channel, event_name=%s, time_cost=%s, latency=%s, call_id=%s, device_id=%s, event_time=%s, is_agent=%s, agent_num=%s, hangup_dir=%s, hangup_count=%s, answer_count=%s', event_name, time_cost, latency, call_id, device_id, event_time, is_agent, agent_num, call_info.hangup_dir, call_info.hangup_count, call_info.answer_count)
 
     def reprocessing_idle(self, state_data: AgentDelayStateData):
         agent = self.data_handle_server.get_agent(state_data.saas_id, state_data.agent_num)

+ 1 - 1
src/core/callcenter/callback.py

@@ -64,7 +64,7 @@ class Callback(object):
             self.logger.info("callback:return::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
             return
         device_info = call_info.device_info_map.get(device_id) if call_info and call_info.device_info_map else None
-        self.logger.info("callback::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
+        self.logger.debug("callback::event_name=%s, call_id=%s, device_id=%s", event_name, call_id, device_id)
         self.event_queue.put_nowait((event, call_info, device_info))
 
 

+ 12 - 12
src/core/callcenter/config.py

@@ -35,21 +35,21 @@ dictConfig({
                 "level": "DEBUG",
                 "formatter": "default",
             },
-            "log_file": {
-                "class": "logging.handlers.TimedRotatingFileHandler",  # 按时间切分日志
-                "level": "DEBUG",
-                "formatter": "default",   # 日志输出样式对应formatters
-                "filename": "./logs/flask.log",  # 指定log文件目录
-                "when": "midnight",        # 按天切分,午夜时分创建新文件
-                "interval": 1,             # 间隔1天
-                "backupCount": 10,         # 保留10天的日志文件
-                "encoding": "utf8",        # 文件编码
-            },
+            # "log_file": {
+            #     "class": "logging.handlers.TimedRotatingFileHandler",  # 按时间切分日志
+            #     "level": "DEBUG",
+            #     "formatter": "default",   # 日志输出样式对应formatters
+            #     "filename": "./logs/flask.log",  # 指定log文件目录
+            #     "when": "midnight",        # 按天切分,午夜时分创建新文件
+            #     "interval": 1,             # 间隔1天
+            #     "backupCount": 10,         # 保留10天的日志文件
+            #     "encoding": "utf8",        # 文件编码
+            # },
 
         },
         "root": {
-            "level": "DEBUG",  # # handler中的level会覆盖掉这里的level
-            "handlers": ["console", "log_file"],
+            "level": "INFO",  # # handler中的level会覆盖掉这里的level
+            "handlers": ["console"],
         },
     }
 )

+ 4 - 4
src/core/callcenter/esl/client.py

@@ -129,7 +129,7 @@ class InboundClient:
         event_timestamp = EslEventUtil.getEventTimestamp(e)
         event_time = datetime.fromtimestamp(event_timestamp).strftime('%Y-%m-%d %H:%M:%S')
         address = self.host + ':' + self.port
-        self.logger.info('process_esl_event:event_name=%s, call_id=%s, unique_id=%s, device_id=%s, event_time=%s', event_name, call_id, device_id, wdh_device_id, event_time)
+        self.logger.debug('process_esl_event:event_name=%s, call_id=%s, unique_id=%s, device_id=%s, event_time=%s', event_name, call_id, device_id, wdh_device_id, event_time)
         try:
             self.callback.callback_event(e)
             if event_name in self.handler_table:
@@ -149,7 +149,7 @@ class InboundClient:
             registry.ESL_EVENT_COST.labels(event_name).observe(time_cost)
             latency = (time.time() - event_timestamp) * 1000
             registry.ESL_EVENT_LATENCY.labels(event_name).observe(latency)
-            self.logger.info('process_esl_event:event_name=%s, time_cost=%s, latency=%s, call_id=%s, unique_id=%s, device_id=%s, event_time=%s', event_name, time_cost, latency, call_id, device_id, wdh_device_id, event_time)
+            self.logger.debug('process_esl_event:event_name=%s, time_cost=%s, latency=%s, call_id=%s, unique_id=%s, device_id=%s, event_time=%s', event_name, time_cost, latency, call_id, device_id, wdh_device_id, event_time)
 
     def do_delay_action(self, action, message):
         delay_action = DelayAction.from_json(message)
@@ -693,7 +693,7 @@ class OutboundClient:
             with self.app.app_context():
                 phones = Whitelist.query.filter_by(del_flag=0).all()
                 self.whitelist = [(phone.phone, phone.type) for phone in phones]
-                self.logger.info("Whitelist updated: %s", self.whitelist)
+                self.logger.debug("Whitelist updated: %s", self.whitelist)
 
         def get_whitelist(self):
             return self.whitelist
@@ -720,7 +720,7 @@ class OutboundClient:
             with self.app.app_context():
                 buckets = Bucket.query.filter_by(eid='001').all()
                 self.buckets = buckets
-                self.logger.info("bucket updated: %s", self.buckets)
+                self.logger.debug("bucket updated: %s", self.buckets)
 
     def start(self, HOST='0.0.0.0', PORT=8084, agent=None):
         # HOST, PORT = "0.0.0.0", 8084

+ 23 - 6
src/core/callcenter/test.py

@@ -4,7 +4,7 @@
 # TestStr = "能帮我查一下,我家水费欠多少"
 # seg_list = jieba.cut(TestStr, cut_all=False, HMM=True)
 # print ("Default Mode:", "/ ".join(seg_list))
-
+import json
 import uuid
 import mmh3
 
@@ -19,9 +19,26 @@ def get_bucket(custom_uuid=None, buckets=[]):
     return -1, buckets[0]
 
 if __name__ == '__main__':
-    arr = ['C1879412349555838976','C1879412206890782720','C1879411969535119360','C1879411406290423808','C1879408024871899136','C1879407680997691392','C1879406254007390208','C1879404740748644352','C1879403850650226688','C1879402961977872384','C1879402509785763840','C1879402237567045632','C1879402005592674304','C1879400827102302208','C1879400778024751104','C1879400705488457728','C1879400533513605120','C1879400336188379136','C1879400327959154688','C1879399300082044928','C1879399233669435392','C1879396009050771456','C1879394097295396864','C1879393224498483200','C1879381728368398336','C1879381287505104896','C1879379466774515712','C1879376723787780096','C1879374004641468416','C1879373548330553344','C1879372415646175232','C1879367459866284032','C1879365634769424384','C1879364921326702592','C1879364787436130304','C1879363948554358784','C1879362454358724608','C1879360081448013824','C1879358294565457920','C1879358151116066816','C1879357497190518784','C1879357257641234432','C1879357023229972480','C1879355792935751680','C1879355755749052416','C1879354039309832192']
-    buckets=[Bucket(id=1, name="传统", lower=0, upper=90), Bucket(id=2, name="AI",lower=90, upper=100)]
-    for custom_uuid in arr:
-        num, bucket = get_bucket(custom_uuid=custom_uuid, buckets=buckets)
-        print(custom_uuid, num, bucket.name)
+    # arr = ['C1879412349555838976','C1879412206890782720','C1879411969535119360','C1879411406290423808','C1879408024871899136','C1879407680997691392','C1879406254007390208','C1879404740748644352','C1879403850650226688','C1879402961977872384','C1879402509785763840','C1879402237567045632','C1879402005592674304','C1879400827102302208','C1879400778024751104','C1879400705488457728','C1879400533513605120','C1879400336188379136','C1879400327959154688','C1879399300082044928','C1879399233669435392','C1879396009050771456','C1879394097295396864','C1879393224498483200','C1879381728368398336','C1879381287505104896','C1879379466774515712','C1879376723787780096','C1879374004641468416','C1879373548330553344','C1879372415646175232','C1879367459866284032','C1879365634769424384','C1879364921326702592','C1879364787436130304','C1879363948554358784','C1879362454358724608','C1879360081448013824','C1879358294565457920','C1879358151116066816','C1879357497190518784','C1879357257641234432','C1879357023229972480','C1879355792935751680','C1879355755749052416','C1879354039309832192']
+    # buckets=[Bucket(id=1, name="传统", lower=0, upper=90), Bucket(id=2, name="AI",lower=90, upper=100)]
+    # for custom_uuid in arr:
+    #     num, bucket = get_bucket(custom_uuid=custom_uuid, buckets=buckets)
+    #     print(custom_uuid, num, bucket.name)
+
+    # message = """{"seg_id": 2, "cn": {"st": {"rt": [{"ws": [{"cw": [{"sc": 0.00, "w": "停水", "wp": "n", "rl": "0", "wb": 9, "wc": 0.00, "we": 64}], "wb": 9,"we": 64},{"cw": [{"sc": 0.00, "w": "咨询", "wp": "n", "rl": "0", "wb": 65, "wc": 0.00, "we": 132}], "wb": 65,"we": 132}]}], "bg": "9510", "type": "0", "ed": "10950"}}, "ls": false}"""
+    # message = '{"action":"result","code":"0","data":"{\"seg_id\":0,\"cn\":{\"st\":{\"rt\":[{\"ws\":[{\"cw\":[{\"sc\":0.00,\"w\":\"蜓\",\"wp\":\"n\",\"rl\":\"0\",\"wb\":13,\"wc\":0.00,\"we\":26}],\"wb\":0,\"we\":0}]}],\"bg\":\"9160\",\"type\":\"1\",\"ed\":\"0\"}},\"ls\":false}","desc":"success","sid":"rta108a8500@dx2f5f1b177d38000100"}'
+    # # 解析最外层 JSON
+    # message_dict = json.loads(message)
+    #
+    # # 解析嵌套的 JSON 字符串
+    # data = json.loads(message_dict['data'])
+    #
+    # # 提取 "w" 字段中的词汇
+    # words = ''.join(cw["w"] for item in data["cn"]["st"]["rt"] for ws in item["ws"] for cw in ws["cw"])
+    #
+    # print(words)
 
+    # 读取文件
+    with open('/Users/davidliu/hot_words.txt', 'r', encoding='utf-8') as f:
+        content = f.read()
+        print(content)

+ 175 - 35
src/core/voip/asr.py

@@ -5,40 +5,13 @@ import os
 import json
 import threading
 import traceback
+from src.core.callcenter import registry
 
 import nls  # 引入阿里云语音识别库
 from aliyunsdkcore.client import AcsClient
 from aliyunsdkcore.request import CommonRequest
 import time
 
-# 获取Token的函数
-def get_token():
-    ak_id = "LTAI5tQ2HmiHCygZkt5BYrYR"
-    ak_secret = "KhmxTd14SUcXafpFk5yofA43FoeM99"
-    client = AcsClient(ak_id, ak_secret, "cn-shanghai")
-    request = CommonRequest()
-    request.set_method('POST')
-    request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
-    request.set_version('2019-02-28')
-    request.set_action_name('CreateToken')
-
-    try:
-        response = client.do_action_with_exception(request)
-        jss = json.loads(response)
-        if 'Token' in jss and 'Id' in jss['Token']:
-            token = jss['Token']['Id']
-            expire_time = jss['Token']['ExpireTime']
-            print(f"Token: {token}, ExpireTime: {expire_time}")
-            return token, int(expire_time)  # 返回Token和过期时间
-        else:
-            print("Token获取失败,响应内容: ", response)
-    except Exception as e:
-        print(f"获取Token时发生错误: {e}")
-    return None, None
-
-# WebSocket服务地址
-URL = "wss://nls-gateway-cn-beijing.aliyuncs.com/ws/v1"
-APPKEY = "OKt6jogp6fRjHQVp"  # 你的Appkey
 
 # 定义实时转写类
 class TestSt:
@@ -48,6 +21,32 @@ class TestSt:
         "expire_time": None
     }
 
+    # Fetch a new access token for the speech service.
+    @classmethod
+    def get_token(cls):
+        """Request a token through the ``CreateToken`` action.
+
+        Returns:
+            ``(token, expire_time)`` with ``expire_time`` converted by ``int()``
+            on success; ``(None, None)`` when the response has no
+            ``Token.Id`` or when the request raises.
+        """
+        # NOTE(review): the access-key id/secret are hard-coded here; they
+        # should presumably be loaded from configuration or a secret store -
+        # confirm and rotate if this file is shared.
+        ak_id = "LTAI5tQ2HmiHCygZkt5BYrYR"
+        ak_secret = "KhmxTd14SUcXafpFk5yofA43FoeM99"
+        client = AcsClient(ak_id, ak_secret, "cn-shanghai")
+        request = CommonRequest()
+        request.set_method('POST')
+        request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
+        request.set_version('2019-02-28')
+        request.set_action_name('CreateToken')
+
+        try:
+            response = client.do_action_with_exception(request)
+            jss = json.loads(response)
+            if 'Token' in jss and 'Id' in jss['Token']:
+                token = jss['Token']['Id']
+                expire_time = jss['Token']['ExpireTime']
+                print(f"Token: {token}, ExpireTime: {expire_time}")
+                return token, int(expire_time)  # return the token and its expiry time
+            else:
+                print("Token获取失败,响应内容: ", response)
+        except Exception as e:
+            # Any failure is reported on stdout and falls through to (None, None).
+            print(f"获取Token时发生错误: {e}")
+        return None, None
+
     @classmethod
     def get_cached_token(cls):
         # 检查是否已有缓存的Token且未过期s):
@@ -60,7 +59,7 @@ class TestSt:
             return cls.token_cache["token"]
 
         # 如果没有缓存Token或者Token已过期,重新获取
-        new_token, expire_time = get_token()
+        new_token, expire_time = cls.get_token()
         if new_token:
             cls.token_cache["token"] = new_token
             cls.token_cache["expire_time"] = expire_time
@@ -80,6 +79,10 @@ class TestSt:
         self.message_receiver = message_receiver
         self._Token = self.get_cached_token()
         self.sr = None
+
+        # WebSocket服务地址
+        self.URL = "wss://nls-gateway-cn-beijing.aliyuncs.com/ws/v1"
+        self.APPKEY = "OKt6jogp6fRjHQVp"  # 你的Appkey
         self.logger.debug("开始")
 
     def start(self):
@@ -102,9 +105,9 @@ class TestSt:
         self.__event.clear()
         while not self.__event.is_set():
             self.sr = nls.NlsSpeechTranscriber(
-                url=URL,
+                url=self.URL,
                 token=self._Token,
-                appkey=APPKEY,
+                appkey=self.APPKEY,
                 on_sentence_begin=self.test_on_sentence_begin,
                 on_sentence_end=self.test_on_sentence_end,
                 on_start=self.test_on_start,
@@ -136,12 +139,12 @@ class TestSt:
     def test_on_sentence_begin(self, message, *args):
         self.logger.debug("[%s]test_on_sentence_begin:%s", self.__id, message)
         if self.message_receiver:
-            self.message_receiver(message, *args)
+            self.message_receiver(self.convert_message(message), *args)
 
     def test_on_sentence_end(self, message, *args):
         self.logger.debug("[%s]test_on_sentence_end:%s", self.__id, message)
         if self.message_receiver:
-            self.message_receiver(message, *args)
+            self.message_receiver(self.convert_message(message), *args)
 
     def test_on_start(self, message, *args):
         self.__event.set()
@@ -153,7 +156,7 @@ class TestSt:
         if not self.__event.is_set():
             self.__event.set()
         if self.message_receiver:
-            self.message_receiver(message, *args)
+            self.message_receiver(self.convert_message(message), *args)
 
     def test_on_close(self, *args):
         self.logger.debug("on_close: args=>%s", args)
@@ -164,8 +167,145 @@ class TestSt:
     def test_on_result_chg(self, message, *args):
         # self.logger.debug("test_on_chg:{}".format(message))
         if self.message_receiver:
-            self.message_receiver(message, *args)
+            self.message_receiver(self.convert_message(message), *args)
 
     def test_on_completed(self, message, *args):
         # self.logger.debug("on_completed:args=>{} message=>{}".format(args, message))
         pass
+
+    def convert_message(self, message):
+        """Convert a raw transcription message into the dict handed to ``message_receiver``.
+
+        Args:
+            message: JSON text containing a ``header`` object with ``status``
+                and ``name``; ``SentenceEnd`` messages also carry
+                ``payload.result``.
+
+        Returns:
+            dict: always contains ``'name'``. For status ``20000000`` the name
+            is the header name, and ``SentenceEnd`` additionally carries
+            ``'result'``. Any other status yields
+            ``'TranscriptionResultError'`` with ``'status'`` and an empty
+            ``'result'``, and increments ``registry.ASR_ERRORS``.
+
+        Raises:
+            ValueError / KeyError: if ``message`` is not valid JSON or lacks
+            the fields listed above.
+        """
+        final_result = {}
+        message = json.loads(message)
+        header = message["header"]
+        status = header["status"]
+        if status == 20000000:
+            # Always expose the event name so receivers that index
+            # message["name"] do not fail on events not listed here.
+            final_result['name'] = header["name"]
+            if header["name"] == "SentenceEnd":
+                final_result['result'] = message["payload"]["result"]
+        else:
+            final_result['name'] = 'TranscriptionResultError'
+            final_result['status'] = status
+            final_result['result'] = ''
+            self.logger.info("aliyun.Asr.recv: call_id:%s, status is not 20000000: %s", self.__id, status)
+            registry.ASR_ERRORS.labels(status).inc()
+
+        # Routine per-message trace: keep it at debug, not error.
+        self.logger.debug("aliyun.Asr.recv: call_id:%s, final_result: %s", self.__id, final_result)
+        return final_result
+
+
+# iFlytek (Xunfei) real-time ASR transcription
+class XfAsr:
+    """Websocket client for the Xunfei real-time transcription endpoint.
+
+    Audio chunks are pushed with ``send_audio``. A thread started by ``start``
+    reads server messages, converts each one with ``convert_message`` and
+    passes the resulting dict to ``message_receiver``.
+    """
+
+    def __init__(self, tid, logger, message_receiver=None):
+        """Open the websocket and prepare (but do not start) the receive thread.
+
+        Args:
+            tid: call id, used in log lines.
+            logger: logger used for all diagnostics.
+            message_receiver: optional callable invoked with each converted
+                result dict.
+        """
+        self.end_tag = "{\"end\": true}"
+        self.tid = tid
+        self.logger = logger
+        self.message_receiver = message_receiver
+        self.ws = self.new_connection()
+        # new_connection() returns None when every attempt failed, so the
+        # connection state must not be read straight off self.ws.
+        self.logger.info(f"xunfei.Asr: call_id: {tid}, ws.connected:{self._is_connected()}")
+        self.trecv = threading.Thread(target=self.recv)
+
+    def _is_connected(self):
+        """Return True only when a websocket exists and reports ``connected``."""
+        return bool(self.ws and self.ws.connected)
+
+    def new_connection(self, base_url = "ws://rtasr.xfyun.cn/v1/ws", app_id = "1ec1097b", api_key = "60b7d2d8d172b065b1c3e723e5ba0696"):
+        """Create a signed websocket connection, retrying up to 10 times.
+
+        The signature is base64(HMAC-SHA1(api_key, md5_hex(app_id + ts))),
+        sent URL-quoted as the ``signa`` query parameter.
+
+        Returns:
+            The connected websocket, or ``None`` when all attempts failed.
+        """
+        # NOTE(review): app_id / api_key defaults are hard-coded credentials;
+        # they should presumably come from configuration - confirm.
+        import hashlib
+        import hmac
+        import base64
+
+        from websocket import create_connection
+        from urllib.parse import quote
+
+        ts = str(int(time.time()))
+        base_string = hashlib.md5((app_id + ts).encode('utf-8')).hexdigest().encode('utf-8')
+        digest = hmac.new(api_key.encode('utf-8'), base_string, hashlib.sha1).digest()
+        signa = base64.b64encode(digest).decode('utf-8')
+        url = base_url + "?appid=" + app_id + "&ts=" + ts + "&signa=" + quote(signa)
+
+        count = 10
+        while count > 0:
+            try:
+                return create_connection(url)
+            except Exception as e:
+                count -= 1
+                self.logger.info("new_connection:exception, call_id: %s, count=%s, message=%s", self.tid, count, e)
+                time.sleep(.010)
+        # Make the total failure visible instead of silently returning None.
+        self.logger.error("new_connection:failed, call_id: %s, all attempts exhausted", self.tid)
+        return None
+
+    def start(self):
+        """Start the background receive thread."""
+        self.trecv.start()
+
+    def send_audio(self, chunk):
+        """Send one audio chunk; a no-op when no websocket was established."""
+        if self.ws:
+            self.ws.send(chunk)
+
+    def close(self):
+        """Send the end-of-stream tag (best effort) and close the websocket."""
+        if self.ws:
+            try:
+                self.ws.send(self.end_tag.encode('utf-8'))
+            except Exception as e:
+                # convert_message() may already have closed the socket after a
+                # server error; sending then fails and must not abort cleanup.
+                self.logger.info("xunfei.Asr.close: send end tag failed, call_id: %s, message=%s", self.tid, e)
+            finally:
+                self.ws.close()
+        self.logger.info("connection closed call_id: %s", self.tid)
+
+    def recv(self):
+        """Receive loop: read messages until the socket closes or an empty message arrives."""
+        try:
+            self.logger.info(f"xunfei.Asr.recv: call_id: {self.tid}, ws.connected:{self._is_connected()}")
+            while self.ws and self.ws.connected:
+                message = str(self.ws.recv())
+                if len(message) == 0:
+                    self.logger.info("xunfei.Asr.recv: receive result end call_id: %s", self.tid)
+                    break
+
+                self.logger.info("xunfei.Asr.recv: call_id: %s, message :%s", self.tid, message)
+                if self.message_receiver:
+                    self.message_receiver(self.convert_message(message))
+
+        except Exception as e:
+            traceback.print_exc()
+            self.logger.error("xunfei.Asr.recv: receive result end, call_id: %s, message: %s", self.tid, e)
+
+    def convert_message(self, message):
+        """Convert a raw server message into the dict handed to ``message_receiver``.
+
+        Args:
+            message: JSON text with ``code`` and ``action``; ``result``
+                messages carry a JSON-encoded ``data`` string.
+
+        Returns:
+            dict: ``'name'`` is ``SentenceBegin`` for action ``started``;
+            for action ``result`` it is ``TranscriptionResultChanged`` (type
+            "1") or ``SentenceEnd`` (type "0") and ``'result'`` holds the
+            concatenated, stripped words. Action ``error`` or a non-"0" code
+            yields ``TranscriptionResultError`` and closes the websocket.
+        """
+        final_result = {}
+        result_dict = json.loads(message)
+        code = result_dict["code"]
+        if code == "0":
+            action = result_dict["action"]
+            if action == "started":
+                final_result['name'] = 'SentenceBegin'
+            elif action == "result":
+                # "data" is itself a JSON document encoded as a string.
+                data = json.loads(result_dict["data"])
+                st = data["cn"]["st"]
+                rt = st["rt"]
+                if st['type'] == "1":
+                    final_result['name'] = 'TranscriptionResultChanged'
+                elif st['type'] == "0":
+                    final_result['name'] = 'SentenceEnd'
+                final_result['result'] = ''.join(cw["w"] for item in rt for ws in item["ws"] for cw in ws["cw"]).strip()
+            elif action == "error":
+                self.logger.error("xunfei.Asr.recv: call_id: %s, action is error: %s", self.tid, message)
+                final_result['name'] = 'TranscriptionResultError'
+                final_result['result'] = message
+                if self.ws:
+                    self.ws.close()
+        else:
+            self.logger.error("xunfei.Asr.recv: call_id: %s, Status is not: %s", self.tid, code)
+            final_result['name'] = 'TranscriptionResultError'
+            final_result['status'] = code
+            registry.ASR_ERRORS.labels(code).inc()
+            if self.ws:
+                self.ws.close()
+        # Routine per-message trace: keep it at debug, not error.
+        self.logger.debug("xunfei.Asr.recv: call_id: %s, final_result: %s", self.tid, final_result)
+        return final_result

+ 33 - 49
src/core/voip/bot.py

@@ -9,6 +9,8 @@ import queue
 import threading
 import traceback
 import sys
+from random import randint
+
 import pjsua2 as pj
 from enum import Enum
 from datetime import datetime
@@ -34,7 +36,7 @@ calls = {}
 # recording_file = '/code/src/core/voip/incoming_call.wav'
 # player_file1 = '/code/src/core/voip/test111.wav'
 # player_file2 = '/code/src/core/voip/test222.wav'
-from src.core.voip.asr import TestSt
+from src.core.voip.asr import TestSt, XfAsr
 
 class BotStatus(Enum):
     # 等待用户讲话,未超时
@@ -82,7 +84,7 @@ class MyAudioMediaPort(pj.AudioMediaPort):
         if self.asr:  # 如果ASR实例存在,则发送音频数据
             if self.first:
                 self.first = False
-                self.call.logger.debug("Received audio frame: %s, %s %s", self.call.session_id,frame.buf, frame.size)
+                self.call.logger.warn("Received audio frame: %s, %s, %s", self.call.session_id,frame.buf,frame.size)
             self.asr.send_audio(frame.buf)
 
         try:
@@ -203,7 +205,6 @@ class MyCall(pj.Call):
         self.call_phone = kwargs.get("Caller-Caller-ID-Number")
         self.logger.info("daviddebugger::self.session_id:%s, self.call_phone:%s", self.session_id,self.call_phone)
         self.taskId = "10001"
-        # self.scripts = build_demo_script()
         self.user_asr_text_queue = queue.Queue(maxsize=100)
         self.message_queue = queue.Queue(maxsize=3)
         self.player_complete_dict = {}
@@ -216,7 +217,8 @@ class MyCall(pj.Call):
 
         self.cur_player_file = None   #当前播放的文件
 
-        self.asr = TestSt(self.session_id, logger=self.logger, message_receiver=self.on_receiver_asr_result)  # 创建ASR实例
+        # self.asr = TestSt(self.session_id, logger=self.logger, message_receiver=self.on_receiver_asr_result)  # 创建ASR实例
+        self.asr = XfAsr(self.session_id, logger=self.logger, message_receiver=self.on_receiver_asr_result)  # 创建ASR实例
         self.asr.start()  # 启动ASR线程
 
         self.start_time = time.time()  # 当前机器人对话开始时间
@@ -347,17 +349,25 @@ class MyCall(pj.Call):
         # 判断是否播放完成 否则不记录用户说的内容
         if not self.is_play_complete():
             return
-        message = json.loads(message)
-        if message["header"]["status"] == 20000000:
-            if message["header"]["name"] == "SentenceEnd":
-                result = message["payload"]["result"]
-                # self.logger.info("asr返回内容Result:%s", result)
-                self.user_asr_text_queue.put(result)
-            elif message["header"]["name"] == "TranscriptionResultChanged":
-                self.reset_wait_time()
-        else:
-            self.logger.info(f"Status is not {message['header']['status']}")
-            registry.ASR_ERRORS.labels(message['header']['status']).inc()
+        # self.logger.info("on_receiver_asr_result:message: %s", message)
+        if message["name"] == "SentenceEnd":
+            self.user_asr_text_queue.put(message["result"])
+        elif message["name"] == "TranscriptionResultChanged":
+            self.reset_wait_time()
+        elif message["name"] == "TranscriptionResultError":
+            pass
+
+        # message = json.loads(message)
+        # if message["header"]["status"] == 20000000:
+        #     if message["header"]["name"] == "SentenceEnd":
+        #         result = message["payload"]["result"]
+        #         # self.logger.info("asr返回内容Result:%s", result)
+        #         self.user_asr_text_queue.put(result)
+        #     elif message["header"]["name"] == "TranscriptionResultChanged":
+        #         self.reset_wait_time()
+        # else:
+        #     self.logger.info(f"Status is not {message['header']['status']}")
+        #     registry.ASR_ERRORS.labels(message['header']['status']).inc()
 
     def on_media_player_complete(self, player_id):
         self.logger.info('player complete')
@@ -404,6 +414,7 @@ class MyCall(pj.Call):
                 print("Success to stopTransmit:")
             except pj.Error as e:
                 print("Failed to stopTransmit:", e)
+            del self.audio_player
             self.audio_player = None  # 或调用播放器停止方法
         if self.audio_port:
             try:
@@ -411,9 +422,11 @@ class MyCall(pj.Call):
                 print("Success to stopTransmit:")
             except pj.Error as e:
                 print("Failed to stopTransmit:", e)
+            del self.audio_port
             self.audio_port = None  # 或调用相关销毁方法
         if self.audio_media:
             # self.audio_media.stopTransmit()
+            del self.audio_media
             self.audio_media = None
 
         self.asr.close()
@@ -587,54 +600,25 @@ class BotAgent:
         try:
             self.cache.set_register_per_hours(expire=timeout_sec - (60*3))
             # Create and initialize the library
-            ep_cfg = pj.EpConfig()
-            ep_cfg.uaConfig.threadCnt = 4
-            ep_cfg.uaConfig.mainThreadOnly = False
-            ep_cfg.uaConfig.maxCalls = 12
-            ep_cfg.uaConfig.maxAccounts = 12
-            ep_cfg.medConfig.noVad = True
-            ep_cfg.logConfig.level = 3
-            ep_cfg.logConfig.consoleLevel = 3
+            ep_cfg = build_ep_config()
             self.ep.libCreate()
             self.ep.libInit(ep_cfg)
 
             aud_dev_mgr = self.ep.audDevManager()
             aud_dev_mgr.setNullDev()  # 使用虚拟音频设备(如果没有实际设备)
+
             # Set up media configuration, particularly jitter buffer
-            media_cfg = pj.MediaConfig()
-            media_cfg.jbMinPre = 4  # Minimum pre-fetch frames
-            media_cfg.jbMaxPre = 16  # Maximum pre-fetch frames
-            media_cfg.noVad = True  # Disable Voice Activity Detection if needed
-            media_cfg.clockRate = 8000
+            media_cfg = build_media_config()
             self.ep.medConfig = media_cfg  # Apply media config to endpoint
 
             # Create SIP transport. Error handling sample is shown
-            sipTpConfig = pj.TransportConfig()
-            sipTpConfig.port = 30506
+            sipTpConfig = build_sip_transport_config()
             self.ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, sipTpConfig)
             # Start the library
             self.ep.libStart()
 
             for user_part in self.user_part_range:
-                acfg = pj.AccountConfig()
-                acfg.idUri = f"sip:{user_part}@{self.host}:{self.port}"
-                acfg.regConfig.registrarUri = f"sip:{self.host}:{self.port}"
-                cred = pj.AuthCredInfo("digest", "*", f"{user_part}", 0, self.password)
-                acfg.sipConfig.authCreds.append(cred)
-
-                acfg.regConfig.timeoutSec = timeout_sec  # 注册超时时间(秒)
-                acfg.regConfig.retryIntervalSec = 10  # 重试间隔时间(秒)
-                acfg.regConfig.firstRetryIntervalSec = 10  # 首次重试间隔时间(秒)
-
-                acfg.natConfig.iceEnabled = True
-                acfg.natConfig.turnEnabled = True
-                acfg.natConfig.turnServer = f"stun:{self.host}:3478"
-                # acfg.natConfig.turnUsername = "username"
-                # acfg.natConfig.turnPassword = "password"
-                acfg.natConfig.udpKaIntervalSec = 30
-                acfg.natConfig.contactRewriteUse = 2
-                acfg.natConfig.sdpNatRewriteUse = 2
-
+                acfg = build_account_config(self.host, self.port, user_part, self.password, timeout_sec)
                 # Create the account
                 acc = Account(self, user_part=user_part)
                 acc.create(acfg)

+ 52 - 18
src/core/voip/constant.py

@@ -1,19 +1,30 @@
 #!/usr/bin/env python3
 # encoding:utf-8
 
-import os
 import mmh3
-import queue
 import pjsua2 as pj
 
-player_script_dir = '/code/src/core/voip/scripts/'
 
+def build_ep_config():
+    """Return a ``pj.EpConfig`` with this module's fixed endpoint settings.
+
+    Log and console level are 3; the UA config uses 4 threads (not
+    main-thread-only) with at most 12 calls and 12 accounts; VAD is disabled
+    on the media config.
+    """
+    cfg = pj.EpConfig()
+    # logging
+    cfg.logConfig.level = 3
+    cfg.logConfig.consoleLevel = 3
+    # user-agent threading and capacity
+    cfg.uaConfig.threadCnt = 4
+    cfg.uaConfig.mainThreadOnly = False
+    cfg.uaConfig.maxCalls = 12
+    cfg.uaConfig.maxAccounts = 12
+    # media
+    cfg.medConfig.noVad = True
+    return cfg
 
-def murmur3_32(player_file):
-    if isinstance(player_file, list):
-        player_file = ','.join(player_file)
-    return abs(mmh3.hash(player_file))
-
+def build_media_config():
+    """Return a ``pj.MediaConfig``: 16000 clock rate, 1 channel, 40 frame ptime, no VAD."""
+    cfg = pj.MediaConfig()
+    # audio format
+    cfg.clockRate = 16000
+    cfg.channelCount = 1
+    cfg.audioFramePtime = 40
+    # jitter buffer pre-fetch bounds (frames)
+    cfg.jbMinPre = 4
+    cfg.jbMaxPre = 16
+    # voice activity detection is switched off
+    cfg.noVad = True
+    return cfg
 
 def build_audio_format():
     fmt = pj.MediaFormatAudio()
@@ -21,17 +32,40 @@ def build_audio_format():
     fmt.id = pj.PJMEDIA_FORMAT_PCM
     fmt.channelCount = 1  # 通道数
     fmt.bitsPerSample = 16  # 每个采样的位数
-    fmt.clockRate = 8000  # 采样率
-    fmt.frameTimeUsec = 12500  # 每帧的时间(20 毫秒)
+    # fmt.clockRate = 8000  # 采样率
+    fmt.clockRate = 16000  # 采样率
+    fmt.frameTimeUsec = 40000  #微秒
     # fmt.clockRate = 16000  # 采样率
-    # fmt.frameTimeUsec = 20000  # 每帧的时间(20 毫秒)
+    # fmt.frameTimeUsec = 20000
     return fmt
 
+def build_sip_transport_config():
+    """Return a ``pj.TransportConfig`` with its port set to 30506."""
+    transport_cfg = pj.TransportConfig()
+    transport_cfg.port = 30506
+    return transport_cfg
+
+def build_account_config(host, port, user_part, password, timeout_sec):
+    acfg = pj.AccountConfig()
+    acfg.idUri = f"sip:{user_part}@{host}:{port}"
+    acfg.regConfig.registrarUri = f"sip:{host}:{port}"
+    cred = pj.AuthCredInfo("digest", "*", f"{user_part}", 0, password)
+    acfg.sipConfig.authCreds.append(cred)
 
-def build_demo_script():
-    res = queue.Queue(maxsize=10)
-    for file in os.listdir(player_script_dir):
-        file = os.path.join(player_script_dir, file)
-        print('build_demo_script::', file)
-        res.put(file)
-    return res
+    acfg.regConfig.timeoutSec = timeout_sec  # 注册超时时间(秒)
+    acfg.regConfig.retryIntervalSec = 10  # 重试间隔时间(秒)
+    acfg.regConfig.firstRetryIntervalSec = 10  # 首次重试间隔时间(秒)
+
+    acfg.natConfig.iceEnabled = True
+    acfg.natConfig.turnEnabled = True
+    acfg.natConfig.turnServer = f"stun:{host}:3478"
+    # acfg.natConfig.turnUsername = "username"
+    # acfg.natConfig.turnPassword = "password"
+    acfg.natConfig.udpKaIntervalSec = 30
+    acfg.natConfig.contactRewriteUse = 2
+    acfg.natConfig.sdpNatRewriteUse = 2
+    return acfg
+
+def murmur3_32(player_file):
+    """Return the absolute value of ``mmh3.hash`` for ``player_file``.
+
+    A list argument is first joined into a single comma-separated string.
+    """
+    key = ','.join(player_file) if isinstance(player_file, list) else player_file
+    return abs(mmh3.hash(key))