#!/usr/bin/env python3
# encoding:utf-8
"""Streaming speech-recognition clients.

Three classes share the same informal interface (``start``, ``send_audio``,
``close``) and report results through an optional ``message_receiver``
callback:

* ``TestSt``    - Aliyun NLS real-time transcription (``nls`` SDK).
* ``XfAsr``     - iFlytek RTASR endpoint over a blocking websocket.
* ``XunfeiAsr`` - iFlytek IAT v2 endpoint over ``websocket.WebSocketApp``.
"""
import os
import json
import base64
import threading
import traceback
from datetime import datetime
from src.core.callcenter import registry
import nls  # Alibaba Cloud speech recognition SDK
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
import time


class TestSt:
    """Aliyun real-time transcription session driven by a worker thread.

    Results delivered by the SDK callbacks are normalised by
    ``convert_message`` and handed to ``message_receiver``.
    """

    # Class-level cache shared by every instance: the last token obtained and
    # its expiry time (compared against ``time.time()`` seconds below).
    token_cache = {
        "token": None,
        "expire_time": None
    }

    @classmethod
    def get_token(cls):
        """Request a new NLS token via the ``CreateToken`` action.

        Returns:
            ``(token, expire_time)`` on success, ``(None, None)`` when the
            response lacks a token or the request raised.
        """
        # SECURITY NOTE(review): access keys are hard-coded in source. They
        # should be rotated and loaded from configuration / a secret store.
        ak_id = "LTAI5tQ2HmiHCygZkt5BYrYR"
        ak_secret = "KhmxTd14SUcXafpFk5yofA43FoeM99"
        client = AcsClient(ak_id, ak_secret, "cn-shanghai")

        request = CommonRequest()
        request.set_method('POST')
        request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
        request.set_version('2019-02-28')
        request.set_action_name('CreateToken')

        try:
            response = client.do_action_with_exception(request)
            jss = json.loads(response)
            if 'Token' in jss and 'Id' in jss['Token']:
                token = jss['Token']['Id']
                expire_time = jss['Token']['ExpireTime']
                print(f"Token: {token}, ExpireTime: {expire_time}")
                return token, int(expire_time)
            print("Token获取失败,响应内容: ", response)
        except Exception as e:
            # Best effort: callers treat (None, None) as "no token available".
            print(f"获取Token时发生错误: {e}")
        return None, None

    @classmethod
    def get_cached_token(cls):
        """Return the cached token, fetching a new one when missing or stale.

        A cached token is reused only while it has more than 60 seconds left.

        Returns:
            The token string, or ``None`` when no token could be obtained.
        """
        current_time = int(time.time())
        if cls.token_cache["token"] and cls.token_cache["expire_time"] - current_time > 60:
            return cls.token_cache["token"]

        # No cached token, or it is about to expire: request a fresh one.
        new_token, expire_time = cls.get_token()
        if new_token:
            cls.token_cache["token"] = new_token
            cls.token_cache["expire_time"] = expire_time
            print("获取新的Token")
            return new_token
        print("无法获取Token")
        return None

    def __init__(self, tid, logger, message_receiver=None):
        """Prepare a session; the connection is opened by ``start()``.

        Args:
            tid: Identifier used in log lines and passed to the SDK as
                ``callback_args``.
            logger: Logger-like object (``debug``/``info``/``error`` are used).
            message_receiver: Optional callable invoked as
                ``message_receiver(converted_message, *callback_args)``.
        """
        self.logger = logger
        self.__event = threading.Event()
        self.__th = threading.Thread(target=self.__test_run)
        self.__id = tid
        self.message_receiver = message_receiver
        self._Token = self.get_cached_token()
        self.sr = None
        # WebSocket gateway address.
        self.URL = "wss://nls-gateway-cn-beijing.aliyuncs.com/ws/v1"
        # SECURITY NOTE(review): app key is hard-coded in source.
        self.APPKEY = "OKt6jogp6fRjHQVp"
        self.logger.debug("开始")

    def start(self):
        """Start the worker thread that opens the transcription session."""
        self.__th.start()

    def send_audio(self, audio_data):
        """Forward an audio chunk to the transcriber; dropped if none exists yet."""
        if self.sr:
            self.sr.send_audio(audio_data)

    def close(self):
        """Stop the transcriber and end the worker's retry loop.

        Errors from ``stop()`` (including "no transcriber yet") are logged,
        not raised.
        """
        # Make sure the worker loop terminates even when stop() fails below.
        self.__event.set()
        try:
            self.sr.stop()
        except Exception as e:
            self.logger.debug(f"[{self.__id}]Error stopping ASR: {e}")

    def __test_run(self):
        """Worker body: create and start a transcriber until a callback fires.

        The loop ends once ``on_start``, ``on_error`` or ``on_close`` (or
        ``close()``) sets the internal event.
        """
        self.logger.debug("Thread:%s start..", self.__id)
        nls.enableTrace(True)
        count = 0
        self.__event.clear()
        while not self.__event.is_set():
            # The token requested in __init__ may be missing; try again here
            # rather than retrying forever with None.
            if not self._Token:
                self._Token = self.get_cached_token()
            self.sr = nls.NlsSpeechTranscriber(
                url=self.URL,
                token=self._Token,
                appkey=self.APPKEY,
                on_sentence_begin=self.test_on_sentence_begin,
                on_sentence_end=self.test_on_sentence_end,
                on_start=self.test_on_start,
                on_result_changed=self.test_on_result_chg,
                on_completed=self.test_on_completed,
                on_error=self.test_on_error,
                on_close=self.test_on_close,
                callback_args=[self.__id]
            )
            try:
                self.sr.start(
                    aformat="pcm",
                    sample_rate=8000,
                    enable_intermediate_result=True,
                    enable_punctuation_prediction=True,
                    enable_inverse_text_normalization=True,
                    ex={'max_sentence_silence': 2000, 'disfluency': True, 'enable_words': True}
                )
                self.logger.debug(f"[{self.__id}]ASR session started. {count}")
                self.__event.wait(timeout=.5)
                self.logger.debug(f"[{self.__id}]ASR session started. {count}")
            except Exception as e:
                traceback.print_exc()
                self.logger.debug(f"[{self.__id}]ASR session start exception. {e}")
                # Back off before retrying so a persistent failure does not
                # turn into a busy loop; returns early if the event is set.
                self.__event.wait(timeout=.5)
            count = count + 1

    def test_on_sentence_begin(self, message, *args):
        """SDK callback: forward the converted sentence-begin message."""
        self.logger.debug("[%s]test_on_sentence_begin:%s", self.__id, message)
        if self.message_receiver:
            self.message_receiver(self.convert_message(message), *args)

    def test_on_sentence_end(self, message, *args):
        """SDK callback: forward the converted sentence-end message."""
        self.logger.debug("[%s]test_on_sentence_end:%s", self.__id, message)
        if self.message_receiver:
            self.message_receiver(self.convert_message(message), *args)

    def test_on_start(self, message, *args):
        """SDK callback: mark the session as started (ends the retry loop)."""
        self.__event.set()
        self.logger.debug("[%s]test_on_start:%s", self.__id, message)

    def test_on_error(self, message, *args):
        """SDK callback: end the retry loop and forward the converted error."""
        self.logger.debug("on_error args=>%s", args)
        if not self.__event.is_set():
            self.__event.set()
        if self.message_receiver:
            self.message_receiver(self.convert_message(message), *args)

    def test_on_close(self, *args):
        """SDK callback: end the retry loop."""
        self.logger.debug("on_close: args=>%s", args)
        if not self.__event.is_set():
            self.__event.set()

    def test_on_result_chg(self, message, *args):
        """SDK callback: forward the converted intermediate-result message."""
        if self.message_receiver:
            self.message_receiver(self.convert_message(message), *args)

    def test_on_completed(self, message, *args):
        """SDK callback: intentionally a no-op."""
        pass

    def convert_message(self, message):
        """Convert a raw NLS JSON message into the common result dict.

        Args:
            message: JSON text with ``header.status`` and ``header.name``.

        Returns:
            A dict with ``name`` (and ``result`` for ``SentenceEnd``); on a
            status other than 20000000, ``name`` is
            ``TranscriptionResultError`` with ``status`` and an empty
            ``result``. Unrecognised successful events yield an empty dict.
        """
        final_result = {}
        message = json.loads(message)
        header = message["header"]
        if header["status"] == 20000000:
            name = header["name"]
            if name == "SentenceBegin":
                final_result['name'] = 'SentenceBegin'
            elif name == "SentenceEnd":
                final_result['name'] = 'SentenceEnd'
                final_result['result'] = message["payload"]["result"]
            elif name == "TranscriptionResultChanged":
                final_result['name'] = 'TranscriptionResultChanged'
        else:
            final_result['name'] = 'TranscriptionResultError'
            final_result['status'] = header['status']
            final_result['result'] = ''
            self.logger.info(f"Status is not {message['header']['status']}")
            registry.ASR_ERRORS.labels(header['status']).inc()
        self.logger.error("aliyun.Asr.recv: call_id:%s, final_result: %s", self.__id, final_result)
        return final_result


# iFlytek real-time ASR transcription
class XfAsr:
    """iFlytek RTASR client using a blocking websocket and a receive thread."""

    def __init__(self, tid, logger, message_receiver=None):
        """Open the websocket immediately and prepare the receive thread.

        Args:
            tid: Call identifier used in log lines.
            logger: Logger-like object.
            message_receiver: Optional callable invoked with each converted
                message.
        """
        self.end_tag = "{\"end\": true}"
        self.tid = tid
        self.logger = logger
        self.message_receiver = message_receiver
        self.ws = self.new_connection()
        # new_connection() returns None when every attempt failed; report
        # that as "not connected" instead of crashing in the constructor.
        connected = self.ws.connected if self.ws else False
        self.logger.info(f"xunfei.Asr: call_id: {tid}, ws.connected:{connected}")
        self.trecv = threading.Thread(target=self.recv)

    def new_connection(self, base_url = "ws://rtasr.xfyun.cn/v1/ws", app_id = "1ec1097b", api_key = "60b7d2d8d172b065b1c3e723e5ba0696"):
        """Create a signed websocket connection, retrying up to 10 times.

        The signature is base64(HMAC-SHA1(api_key, md5(app_id + ts))).

        Returns:
            The connected websocket, or ``None`` if all attempts failed.
        """
        # SECURITY NOTE(review): default credentials are hard-coded and the
        # default URL is plain ``ws://``.
        import hashlib
        import hmac
        from websocket import create_connection
        from urllib.parse import quote

        ts = str(int(time.time()))
        base_string = hashlib.md5((app_id + ts).encode('utf-8')).hexdigest().encode('utf-8')
        digest = hmac.new(api_key.encode('utf-8'), base_string, hashlib.sha1).digest()
        signa = str(base64.b64encode(digest), 'utf-8')
        url = base_url + "?appid=" + app_id + "&ts=" + ts + "&signa=" + quote(signa)

        count = 10
        _ws = None
        while count > 0:
            try:
                _ws = create_connection(url)
                break
            except Exception as e:
                count -= 1
                self.logger.info("new_connection:exception, call_id: %s, count=%s, message=%s", self.tid, count, e)
                time.sleep(.010)
        return _ws

    def start(self):
        """Start the receive thread."""
        self.trecv.start()

    def send_audio(self, chunk):
        """Send an audio chunk; dropped when there is no connection."""
        if self.ws:
            self.ws.send(chunk)

    def close(self):
        """Send the end tag and close the websocket.

        The socket is closed even when sending the end tag raises; the
        exception still propagates to the caller as before.
        """
        if self.ws:
            try:
                self.ws.send(bytes(self.end_tag.encode('utf-8')))
            finally:
                self.ws.close()
            self.logger.info("connection closed call_id: %s", self.tid)

    def recv(self):
        """Receive loop: convert each message and hand it to the receiver.

        Ends on an empty message, a closed connection, or any exception
        (which is logged).
        """
        try:
            connected = self.ws.connected if self.ws else False
            self.logger.info(f"xunfei.Asr.recv: call_id: {self.tid}, ws.connected:{connected}")
            while self.ws and self.ws.connected:
                message = str(self.ws.recv())
                if len(message) == 0:
                    self.logger.info("xunfei.Asr.recv: receive result end call_id: %s", self.tid)
                    break
                self.logger.info("xunfei.Asr.recv: call_id: %s, message :%s", self.tid, message)
                if self.message_receiver:
                    self.message_receiver(self.convert_message(message))
        except Exception as e:
            traceback.print_exc()
            self.logger.error("xunfei.Asr.recv: receive result end, call_id: %s, message: %s", self.tid, e)

    def convert_message(self, message):
        """Convert a raw RTASR JSON message into the common result dict.

        Side effect: closes the websocket on an ``error`` action or a
        non-"0" code.

        Returns:
            A dict whose ``name`` is one of ``SentenceBegin``,
            ``TranscriptionResultChanged``, ``SentenceEnd`` or
            ``TranscriptionResultError``.
        """
        final_result = {}
        result_dict = json.loads(message)
        if result_dict["code"] == "0":
            action = result_dict["action"]
            if action == "started":
                final_result['name'] = 'SentenceBegin'
            elif action == "result":
                # "data" is itself a JSON document embedded as a string.
                st = json.loads(result_dict["data"])["cn"]["st"]
                rt = st["rt"]
                if st['type'] == "1":
                    final_result['name'] = 'TranscriptionResultChanged'
                if st['type'] == "0":
                    final_result['name'] = 'SentenceEnd'
                # NOTE(review): the original indentation was lost; the text is
                # attached for both result types here - confirm it was not
                # meant for SentenceEnd only.
                final_result['result'] = ''.join(
                    cw["w"] for item in rt for ws in item["ws"] for cw in ws["cw"]
                ).strip()
            elif action == "error":
                self.logger.error("xunfei.Asr.recv: call_id: %s, action is error: %s", self.tid, message)
                final_result['name'] = 'TranscriptionResultError'
                final_result['result'] = message
                if self.ws:
                    self.ws.close()
        else:
            self.logger.error("xunfei.Asr.recv: call_id: %s, Status is not: %s", self.tid, result_dict["code"])
            final_result['name'] = 'TranscriptionResultError'
            final_result['status'] = result_dict["code"]
            registry.ASR_ERRORS.labels(result_dict["code"]).inc()
            if self.ws:
                self.ws.close()
        self.logger.error("xunfei.Asr.recv: call_id: %s, final_result: %s", self.tid, final_result)
        return final_result


class XunfeiAsr:
    """iFlytek IAT v2 client built on ``websocket.WebSocketApp``."""

    STATUS_FIRST_FRAME = 0  # marks the first frame
    STATUS_CONTINUE_FRAME = 1  # marks an intermediate frame
    STATUS_LAST_FRAME = 2  # marks the last frame

    def __init__(self, tid, logger, message_receiver=None):
        """Prepare a session; the connection is opened by ``start()``.

        Args:
            tid: Call identifier used in log lines.
            logger: Logger-like object.
            message_receiver: Stored, but not invoked anywhere in this class.
        """
        self.__id = tid
        self.logger = logger
        self.message_receiver = message_receiver
        # SECURITY NOTE(review): credentials are hard-coded in source.
        self.APPID = '1ec1097b'
        self.APIKey = '7237c29d862daa6fd805f788ed70c409'
        self.APISecret = 'YTk1YzAyMmQwYjQ3ZDJkYWQyZGQwMDFm'

        # Common parameters (sent with the first frame).
        self.CommonArgs = {"app_id": self.APPID}
        # Business parameters (sent with the first frame).
        self.BusinessArgs = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo":1,"vad_eos":2000}
        # Defined up front so send_audio()/close() can test it before the
        # worker thread has created the WebSocketApp.
        self.ws = None
        self.__event = threading.Event()
        # Pass the bound method itself: calling it here would run the
        # blocking connection loop inside the constructor and give the
        # thread ``None`` as its target.
        self.__th = threading.Thread(target=self.__run)

    def create_url(self):
        """Build the authenticated IAT websocket URL.

        The signature is an HMAC-SHA256 over the host, date and request line,
        wrapped in a base64-encoded ``authorization`` query parameter.
        """
        import hashlib
        import hmac
        from urllib.parse import urlencode
        from wsgiref.handlers import format_date_time
        from time import mktime

        url = 'wss://ws-api.xfyun.cn/v2/iat'
        # RFC 1123 formatted timestamp.
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # String to sign.
        signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
        # Sign with HMAC-SHA256.
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
            self.APIKey, "hmac-sha256", "host date request-line", signature_sha)
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
        # Authentication parameters for the query string.
        v = {
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn"
        }
        return url + '?' + urlencode(v)

    def __run(self):
        """Worker body: connect and run the websocket until a callback fires.

        ``run_forever`` blocks while the connection is alive; the loop ends
        once ``on_open``, ``on_error`` or ``on_close`` (or ``close()``) sets
        the internal event.
        """
        import ssl
        import websocket

        count = 0
        self.__event.clear()
        websocket.enableTrace(True)
        while not self.__event.is_set():
            try:
                time1 = datetime.now()
                ws_url = self.create_url()
                self.logger.info("xunfei.Asr.call_id:%s, ws_url:%s", self.__id, ws_url)
                self.ws = websocket.WebSocketApp(ws_url, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
                self.logger.info("xunfei.Asr.call_id:%s, 111111111", self.__id)
                self.ws.on_open = self.on_open
                self.logger.info("xunfei.Asr.call_id:%s, 2222222222", self.__id)
                # SECURITY NOTE(review): certificate verification is disabled.
                self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
                self.logger.info("xunfei.Asr.call_id:%s, 3333333333", self.__id)
                time_cost = (datetime.now() - time1)
                self.logger.info(f"xunfei.Asr.started. call_id:{self.__id}, timeCost:{time_cost}, count:{count}")
                self.__event.wait(timeout=1)
                self.logger.info(f"xunfei.Asr.started. call_id:{self.__id}, {count}")
            except Exception as e:
                traceback.print_exc()
                self.logger.info(f"[{self.__id}]ASR session start exception. {e}")
                # Back off before retrying so a persistent failure does not
                # turn into a busy loop; returns early if the event is set.
                self.__event.wait(timeout=1)
            count += 1

    def start(self):
        """Start the worker thread that opens the websocket."""
        self.__th.start()

    def send_audio(self, audio_data=None, first=False):
        """Send one audio frame.

        Args:
            audio_data: Raw audio bytes. ``None`` or empty marks the last
                frame, which is sent with an empty payload.
            first: ``True`` for the first frame, which also carries the
                common and business parameters.
        """
        # close() calls this with no data; use an empty payload so len() and
        # base64 encoding do not fail on None.
        chunk = audio_data if audio_data else b""
        if not self.ws:
            self.logger.info('xunfei.Asr.send_audio:ws_is_None: call_id: %s, chunk:%s, %s', self.__id, len(chunk), audio_data)
            return

        # Frame status: first, intermediate, or last frame.
        if not audio_data:
            status = XunfeiAsr.STATUS_LAST_FRAME
        elif first:
            status = XunfeiAsr.STATUS_FIRST_FRAME
        else:
            status = XunfeiAsr.STATUS_CONTINUE_FRAME
        self.logger.info('xunfei.Asr.send_audio: call_id: %s, status:%s, chunk:%s, %s', self.__id, status, len(chunk), audio_data)

        frame_data = {"status": status, "format": "audio/L16;rate=16000",
                      "audio": str(base64.b64encode(chunk), 'utf-8'),
                      "encoding": "raw"}
        if status == XunfeiAsr.STATUS_FIRST_FRAME:
            # Only the first frame carries the app id and business parameters.
            frame = {"common": self.CommonArgs,
                     "business": self.BusinessArgs,
                     "data": frame_data}
        else:
            frame = {"data": frame_data}
        self.ws.send(json.dumps(frame))
        if status == XunfeiAsr.STATUS_LAST_FRAME:
            time.sleep(1)

    def close(self):
        """Send the last frame, close the websocket and stop the worker loop.

        Errors are logged, not raised; the socket is closed even when
        sending the last frame fails.
        """
        # Make sure the worker loop does not reconnect after close().
        self.__event.set()
        try:
            try:
                self.send_audio()
            finally:
                if self.ws:
                    self.ws.close()
        except Exception as e:
            self.logger.info(f"[{self.__id}]Error stopping ASR: {e}")

    # Handler: websocket connection established.
    def on_open(self, ws):
        """Mark the session as connected (ends the retry loop)."""
        self.logger.info("xunfei.Asr.open: call_id: %s", self.__id)
        self.__event.set()

    # Handler: websocket message received.
    def on_message(self, ws, message):
        """Log the outcome of a received message; parse errors are logged."""
        try:
            self.logger.info("xunfei.Asr.recv: call_id: %s, message :%s", self.__id, message)
            payload = json.loads(message)
            code = payload["code"]
            sid = payload["sid"]
            if code != 0:
                errMsg = payload["message"]
                self.logger.info("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
            else:
                data = payload["data"]["result"]["ws"]
                # NOTE(review): the recognised text is assembled but never
                # passed to message_receiver - confirm whether it should be.
                result = "".join(w["w"] for i in data for w in i["cw"])
                self.logger.info("sid:%s call success!,data is:%s" % (sid, json.dumps(data, ensure_ascii=False)))
        except Exception as e:
            self.logger.error("receive msg,but parse exception:%s", e)

    # Handler: websocket error.
    def on_error(self, ws, error):
        """Log the error and end the retry loop."""
        self.logger.error("xunfei.Asr.recv::error, call_id:%s, msg:%s", self.__id, error)
        if not self.__event.is_set():
            self.__event.set()

    # Handler: websocket closed.
    def on_close(self, ws, a, b):
        """Log the close and end the retry loop."""
        self.logger.error("xunfei.Asr.recv::close, call_id:%s", self.__id)
        if not self.__event.is_set():
            self.__event.set()

    def convert_message(self, message):
        """Convert a JSON message with ``header.status``/``header.name``.

        NOTE(review): this expects the same message shape as
        ``TestSt.convert_message`` and is not called within this class;
        ``on_message`` reads ``code``/``data`` instead - confirm it is needed.

        Returns:
            A dict with ``name`` (and ``result`` for ``SentenceEnd``); on a
            status other than 20000000, ``name`` is
            ``TranscriptionResultError`` with ``status`` and an empty
            ``result``.
        """
        final_result = {}
        message = json.loads(message)
        header = message["header"]
        if header["status"] == 20000000:
            name = header["name"]
            if name == "SentenceBegin":
                final_result['name'] = 'SentenceBegin'
            elif name == "SentenceEnd":
                final_result['name'] = 'SentenceEnd'
                final_result['result'] = message["payload"]["result"]
            elif name == "TranscriptionResultChanged":
                final_result['name'] = 'TranscriptionResultChanged'
        else:
            final_result['name'] = 'TranscriptionResultError'
            final_result['status'] = header['status']
            final_result['result'] = ''
            self.logger.info(f"Status is not {message['header']['status']}")
            registry.ASR_ERRORS.labels(header['status']).inc()
        self.logger.error("xunfei.Asr.recv: call_id:%s, final_result: %s", self.__id, final_result)
        return final_result