Jelajahi Sumber

讯飞实时听写接口

Davidliu 1 bulan lalu
induk
melakukan
9c2571f3d0
2 mengubah file dengan 196 tambahan dan 4 penghapusan
  1. 192 0
      src/core/voip/asr.py
  2. 4 4
      src/core/voip/bot.py

+ 192 - 0
src/core/voip/asr.py

@@ -3,8 +3,10 @@
 
 import os
 import json
+import base64
 import threading
 import traceback
+from datetime import datetime
 from src.core.callcenter import registry
 
 import nls  # 引入阿里云语音识别库
@@ -308,4 +310,194 @@ class XfAsr:
             if self.ws:
                 self.ws.close()
         self.logger.error("xunfei.Asr.recv: call_id: %s, final_result: %s", self.tid, final_result)
+        return final_result
+
+
+class XunfeiAsr:
+
+    STATUS_FIRST_FRAME = 0  # 第一帧的标识
+    STATUS_CONTINUE_FRAME = 1  # 中间帧标识
+    STATUS_LAST_FRAME = 2  # 最后一帧的标识
+
+    def __init__(self, tid, logger, message_receiver=None):
+        self.__id = tid
+        self.logger = logger
+        self.message_receiver = message_receiver
+
+        self.APPID = '1ec1097b'
+        self.APIKey = '7237c29d862daa6fd805f788ed70c409'
+        self.APISecret = 'YTk1YzAyMmQwYjQ3ZDJkYWQyZGQwMDFm'
+        # 公共参数(common)
+        self.CommonArgs = {"app_id": self.APPID}
+        # 业务参数(business),更多个性化参数可在官网查看
+        self.BusinessArgs = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo":1,"vad_eos":2000}
+
+        self.__event = threading.Event()
+        self.__th = threading.Thread(target=self.__run())
+
+    def create_url(self):
+        import hashlib
+        import hmac
+        from urllib.parse import urlencode
+        from wsgiref.handlers import format_date_time
+        from time import mktime
+
+        url = 'wss://ws-api.xfyun.cn/v2/iat'
+        # 生成RFC1123格式的时间戳
+        now = datetime.now()
+        date = format_date_time(mktime(now.timetuple()))
+
+        # 拼接字符串
+        signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
+        signature_origin += "date: " + date + "\n"
+        signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
+        # 进行hmac-sha256进行加密
+        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
+                                 digestmod=hashlib.sha256).digest()
+        signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
+
+        authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
+            self.APIKey, "hmac-sha256", "host date request-line", signature_sha)
+        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
+        # 将请求的鉴权参数组合为字典
+        v = {
+            "authorization": authorization,
+            "date": date,
+            "host": "ws-api.xfyun.cn"
+        }
+        # 拼接鉴权参数,生成url
+        url = url + '?' + urlencode(v)
+        # print("date: ",date)
+        # print("v: ",v)
+        # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
+        # print('websocket url :', url)
+        return url
+
+    def __run(self):
+        import ssl
+        import websocket
+
+        count = 0
+        self.__event.clear()
+        websocket.enableTrace(True)
+        while not self.__event.is_set():
+            try:
+                # 测试时候在此处正确填写相关信息即可运行
+                time1 = datetime.now()
+                ws_url = self.create_url()
+                self.ws = websocket.WebSocketApp(ws_url, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
+                self.ws.on_open = self.on_open
+                self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
+                time_cost = (datetime.now() - time1)
+                self.logger.debug(f"[{self.__id}]ASR session started. timeCost:{time_cost}, count:{count}")
+                self.__event.wait(timeout=.5)
+                self.logger.debug(f"[{self.__id}]ASR session started. {count}")
+            except Exception as e:
+                traceback.print_exc()
+                self.logger.debug(f"[{self.__id}]ASR session start exception. {e}")
+            count += 1
+
+    def start(self):
+        self.__th.start()
+
+    def send_audio(self, audio_data=None, first=False):
+        if not self.ws:
+            self.logger.debug('xunfei.Asr.send_audio:ws_is_None: call_id: %s, chunk:%s, %s', self.__id, len(audio_data), audio_data)
+            return
+        status = XunfeiAsr.STATUS_FIRST_FRAME if first else XunfeiAsr.STATUS_CONTINUE_FRAME # 音频的状态信息,标识音频是第一帧,还是中间帧、最后一帧
+        if not audio_data:
+            status = XunfeiAsr.STATUS_LAST_FRAME
+        self.logger.debug('xunfei.Asr.send_audio: call_id: %s, status:%s, chunk:%s, %s', self.__id, status, len(audio_data), audio_data)
+
+        # 第一帧处理
+        # 发送第一帧音频,带business 参数
+        # appid 必须带上,只需第一帧发送
+        if status == XunfeiAsr.STATUS_FIRST_FRAME:
+            d = {"common": self.CommonArgs,
+                 "business": self.BusinessArgs,
+                 "data": {"status": 0, "format": "audio/L16;rate=16000",
+                          "audio": str(base64.b64encode(audio_data), 'utf-8'),
+                          "encoding": "raw"}}
+            d = json.dumps(d)
+            self.ws.send(d)
+        # 中间帧处理
+        elif status == XunfeiAsr.STATUS_CONTINUE_FRAME:
+            d = {"data": {"status": 1, "format": "audio/L16;rate=16000",
+                          "audio": str(base64.b64encode(audio_data), 'utf-8'),
+                          "encoding": "raw"}}
+            self.ws.send(json.dumps(d))
+        # 最后一帧处理
+        elif status == XunfeiAsr.STATUS_LAST_FRAME:
+            d = {"data": {"status": 2, "format": "audio/L16;rate=16000",
+                          "audio": str(base64.b64encode(audio_data), 'utf-8'),
+                          "encoding": "raw"}}
+            self.ws.send(json.dumps(d))
+            time.sleep(1)
+
+    def close(self):
+        try:
+            self.send_audio()
+            self.ws.close()
+        except Exception as e:
+            self.logger.debug(f"[{self.__id}]Error stopping ASR: {e}")
+
+    # 收到websocket连接建立的处理
+    def on_open(self, ws):
+        self.logger.info("xunfei.Asr.open: call_id: %s", self.__id)
+        self.__event.set()
+
+    # 收到websocket消息的处理
+    def on_message(self, ws, message):
+        try:
+            self.logger.info("xunfei.Asr.recv: call_id: %s, message :%s", self.__id, message)
+            code = json.loads(message)["code"]
+            sid = json.loads(message)["sid"]
+            if code != 0:
+                errMsg = json.loads(message)["message"]
+                self.logger.info("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
+
+            else:
+                data = json.loads(message)["data"]["result"]["ws"]
+                # print(json.loads(message))
+                result = ""
+                for i in data:
+                    for w in i["cw"]:
+                        result += w["w"]
+                self.logger.info("sid:%s call success!,data is:%s" % (sid, json.dumps(data, ensure_ascii=False)))
+        except Exception as e:
+            self.logger.error("receive msg,but parse exception:%s", e)
+
+    # 收到websocket错误的处理
+    def on_error(self, ws, error):
+        self.logger.error("xunfei.Asr.recv::error, call_id:%s, msg:%s", self.__id, error)
+        if not self.__event.is_set():
+            self.__event.set()
+
+    # 收到websocket关闭的处理
+    def on_close(self, ws, a, b):
+        self.logger.error("xunfei.Asr.recv::close, call_id:%s", self.__id)
+        if not self.__event.is_set():
+            self.__event.set()
+
+    def convert_message(self, message):
+        final_result = {}
+        message = json.loads(message)
+        if message["header"]["status"] == 20000000:
+            if message["header"]["name"] == "SentenceBegin":
+                final_result['name'] = 'SentenceBegin'
+            if message["header"]["name"] == "SentenceEnd":
+                result = message["payload"]["result"]
+                # self.logger.info("asr返回内容Result:%s", result)
+                final_result['name'] = 'SentenceEnd'
+                final_result['result'] = result
+            elif message["header"]["name"] == "TranscriptionResultChanged":
+                final_result['name'] = 'TranscriptionResultChanged'
+        else:
+            final_result['name'] = 'TranscriptionResultError'
+            final_result['status'] = message['header']['status']
+            final_result['result'] = ''
+            self.logger.info(f"Status is not {message['header']['status']}")
+            registry.ASR_ERRORS.labels(message['header']['status']).inc()
+
+        self.logger.error("xunfei.Asr.recv: call_id:%s, final_result: %s", self.__id, final_result)
         return final_result

+ 4 - 4
src/core/voip/bot.py

@@ -36,7 +36,7 @@ calls = {}
 # recording_file = '/code/src/core/voip/incoming_call.wav'
 # player_file1 = '/code/src/core/voip/test111.wav'
 # player_file2 = '/code/src/core/voip/test222.wav'
-from src.core.voip.asr import TestSt, XfAsr
+from src.core.voip.asr import TestSt, XunfeiAsr
 
 class BotStatus(Enum):
     # 等待用户讲话,未超时
@@ -83,9 +83,9 @@ class MyAudioMediaPort(pj.AudioMediaPort):
         # self.wav.writeframes(bytes(frame.buf))
         if self.asr:  # 如果ASR实例存在,则发送音频数据
             if self.first:
-                self.first = False
                 self.call.logger.warn("Received audio frame: %s, %s, %s", self.call.session_id,frame.buf,frame.size)
-            self.asr.send_audio(frame.buf)
+            self.asr.send_audio(frame.buf, first=self.first)
+            self.first = False
 
         try:
             asr_text = self.get_asr_text()
@@ -218,7 +218,7 @@ class MyCall(pj.Call):
         self.cur_player_file = None   #当前播放的文件
 
         # self.asr = TestSt(self.session_id, logger=self.logger, message_receiver=self.on_receiver_asr_result)  # 创建ASR实例
-        self.asr = XfAsr(self.session_id, logger=self.logger, message_receiver=self.on_receiver_asr_result)  # 创建ASR实例
+        self.asr = XunfeiAsr(self.session_id, logger=self.logger, message_receiver=self.on_receiver_asr_result)  # 创建ASR实例
         self.asr.start()  # 启动ASR线程
 
         self.start_time = time.time()  # 当前机器人对话开始时间