#!/usr/bin/env python3
# encoding:utf-8
"""Real-time speech-to-text clients.

``TestSt`` streams audio to the Alibaba Cloud speech SDK (``nls``) and
``XfAsr`` streams audio to the iFlytek RTASR WebSocket endpoint.  Both expose
the same small surface: ``start()``, ``send_audio(chunk)`` and ``close()``.
"""
import base64
import hashlib
import hmac
import json
import os
import threading
import time
import traceback
from urllib.parse import quote

import nls  # Alibaba Cloud speech recognition SDK
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest


class TestSt:
    """Real-time transcription session built on ``nls.NlsSpeechTranscriber``.

    A background thread creates the transcriber and keeps retrying until the
    SDK reports start, error or close.  Recognition events are forwarded to
    ``message_receiver(message, *args)`` when one is supplied.
    """

    # Class-level token cache shared by every instance.
    # "expire_time" is compared against int(time.time()), i.e. it is treated
    # as a Unix timestamp in seconds.
    token_cache = {
        "token": None,
        "expire_time": None,
    }

    @classmethod
    def get_token(cls):
        """Request a fresh access token through the ``CreateToken`` action.

        Credentials are read from ``ALIBABA_CLOUD_ACCESS_KEY_ID`` and
        ``ALIBABA_CLOUD_ACCESS_KEY_SECRET`` when those variables are set.

        Returns:
            ``(token, expire_time)`` on success, ``(None, None)`` when the
            request fails or the response carries no token.
        """
        # SECURITY NOTE(review): the fallback literals below are credentials
        # committed to source control.  They are kept only so existing
        # deployments keep working; rotate them and rely on the environment.
        ak_id = os.environ.get(
            "ALIBABA_CLOUD_ACCESS_KEY_ID", "LTAI5tQ2HmiHCygZkt5BYrYR"
        )
        ak_secret = os.environ.get(
            "ALIBABA_CLOUD_ACCESS_KEY_SECRET", "KhmxTd14SUcXafpFk5yofA43FoeM99"
        )

        client = AcsClient(ak_id, ak_secret, "cn-shanghai")
        request = CommonRequest()
        request.set_method('POST')
        request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
        request.set_version('2019-02-28')
        request.set_action_name('CreateToken')

        try:
            response = client.do_action_with_exception(request)
            jss = json.loads(response)
            if 'Token' in jss and 'Id' in jss['Token']:
                token = jss['Token']['Id']
                expire_time = jss['Token']['ExpireTime']
                print(f"Token: {token}, ExpireTime: {expire_time}")
                return token, int(expire_time)
            print("Token获取失败,响应内容: ", response)
        except Exception as e:
            # Best effort: any SDK/network/parsing failure is reported and
            # turned into the (None, None) result below.
            print(f"获取Token时发生错误: {e}")
        return None, None

    @classmethod
    def get_cached_token(cls):
        """Return a usable token, fetching a new one only when necessary.

        The cached token is reused while it has more than 60 seconds left
        before ``expire_time``; otherwise ``get_token`` is called and the
        cache is updated.

        Returns:
            The token string, or ``None`` when no token could be obtained.
        """
        current_time = int(time.time())
        cached_token = cls.token_cache["token"]
        cached_expiry = cls.token_cache["expire_time"]
        if cached_token and cached_expiry - current_time > 60:
            return cached_token

        # Nothing cached, or the cached token is (about to be) expired.
        new_token, expire_time = cls.get_token()
        if new_token:
            cls.token_cache["token"] = new_token
            cls.token_cache["expire_time"] = expire_time
            print("获取新的Token")
            return new_token

        print("无法获取Token")
        return None

    def __init__(self, tid, logger, message_receiver=None):
        """Prepare a session; no connection is opened until ``start()``.

        Args:
            tid: Identifier used in log lines and passed to the SDK as
                ``callback_args``.
            logger: Object providing ``debug(...)`` in the ``logging`` style.
            message_receiver: Optional callable invoked as
                ``message_receiver(message, *args)`` for recognition events
                and errors.
        """
        self.logger = logger
        # Set once the SDK reports start, error or close (or close() is
        # called); it terminates the connect/retry loop in __test_run.
        self.__event = threading.Event()
        self.__th = threading.Thread(target=self.__test_run)
        self.__id = tid
        self.message_receiver = message_receiver
        self._Token = self.get_cached_token()
        self.sr = None
        # WebSocket gateway address.
        # NOTE(review): the token is requested from a cn-shanghai endpoint
        # while this gateway is cn-beijing - confirm this pairing is intended.
        self.URL = "wss://nls-gateway-cn-beijing.aliyuncs.com/ws/v1"
        self.APPKEY = "OKt6jogp6fRjHQVp"  # project Appkey
        self.logger.debug("开始")

    def start(self):
        """Start the background thread that opens the transcription session."""
        self.__th.start()

    def send_audio(self, audio_data):
        """Forward one audio chunk to the transcriber, if one exists yet.

        Chunks sent before the transcriber has been created are dropped.
        """
        if self.sr:
            self.sr.send_audio(audio_data)

    def close(self):
        """Stop the session and end the connect/retry loop.

        Safe to call before the transcriber exists.  Errors raised by the
        SDK while stopping are logged, not propagated.
        """
        # Make sure the worker thread stops creating new sessions.
        self.__event.set()
        if self.sr is None:
            self.logger.debug(f"[{self.__id}]ASR session not created; nothing to stop")
            return
        try:
            self.sr.stop()
        except Exception as e:
            self.logger.debug(f"[{self.__id}]Error stopping ASR: {e}")

    def __test_run(self):
        """Thread body: create and start a transcriber until one connects.

        Each attempt builds a new ``NlsSpeechTranscriber``, starts it and
        waits up to 0.5 s for a callback to set the event.  The loop ends as
        soon as the event is set.
        """
        self.logger.debug("Thread:%s start..", self.__id)
        nls.enableTrace(True)
        count = 0
        self.__event.clear()
        while not self.__event.is_set():
            # On retries (or when the first fetch failed) re-read the token
            # so an expired or missing token is not reused forever.
            if count > 0 or not self._Token:
                self._Token = self.get_cached_token()

            self.sr = nls.NlsSpeechTranscriber(
                url=self.URL,
                token=self._Token,
                appkey=self.APPKEY,
                on_sentence_begin=self.test_on_sentence_begin,
                on_sentence_end=self.test_on_sentence_end,
                on_start=self.test_on_start,
                on_result_changed=self.test_on_result_chg,
                on_completed=self.test_on_completed,
                on_error=self.test_on_error,
                on_close=self.test_on_close,
                callback_args=[self.__id],
            )
            try:
                self.sr.start(
                    aformat="pcm",
                    sample_rate=8000,
                    enable_intermediate_result=True,
                    enable_punctuation_prediction=True,
                    enable_inverse_text_normalization=True,
                    ex={
                        'max_sentence_silence': 2000,
                        'disfluency': True,
                        'enable_words': True,
                    },
                )
                self.logger.debug(f"[{self.__id}]ASR session started. {count}")
                self.__event.wait(timeout=.5)
                self.logger.debug(f"[{self.__id}]ASR session started. {count}")
            except Exception as e:
                traceback.print_exc()
                self.logger.debug(f"[{self.__id}]ASR session start exception. {e}")
            count = count + 1

    def test_on_sentence_begin(self, message, *args):
        """SDK callback: log a sentence-begin event and forward it."""
        self.logger.debug("[%s]test_on_sentence_begin:%s", self.__id, message)
        if self.message_receiver:
            self.message_receiver(message, *args)

    def test_on_sentence_end(self, message, *args):
        """SDK callback: log a sentence-end event and forward it."""
        self.logger.debug("[%s]test_on_sentence_end:%s", self.__id, message)
        if self.message_receiver:
            self.message_receiver(message, *args)

    def test_on_start(self, message, *args):
        """SDK callback: mark the session as started, ending the retry loop."""
        self.__event.set()
        self.logger.debug("[%s]test_on_start:%s", self.__id, message)

    def test_on_error(self, message, *args):
        """SDK callback: log the error, end the retry loop and forward it."""
        # The message is logged as well so the error is not lost when no
        # receiver is attached.
        self.logger.debug("on_error args=>%s message=>%s", args, message)
        if not self.__event.is_set():
            self.__event.set()
        if self.message_receiver:
            self.message_receiver(message, *args)

    def test_on_close(self, *args):
        """SDK callback: log the close and end the retry loop."""
        self.logger.debug("on_close: args=>%s", args)
        if not self.__event.is_set():
            self.__event.set()

    def test_on_result_chg(self, message, *args):
        """SDK callback: forward an intermediate result (not logged)."""
        if self.message_receiver:
            self.message_receiver(message, *args)

    def test_on_completed(self, message, *args):
        """SDK callback: transcription completed; intentionally a no-op."""


class XfAsr:
    """iFlytek real-time ASR client over a WebSocket connection.

    The connection is opened in the constructor; ``start()`` launches a
    thread that reads and prints server messages.

    NOTE(review): ``message_receiver`` is stored but never invoked by this
    class - results are only printed.  Confirm whether they should be
    forwarded the way ``TestSt`` does.
    """

    def __init__(self, tid, logger, message_receiver=None):
        """Sign the request and open the WebSocket connection.

        The signature is ``base64(HMAC-SHA1(api_key, md5_hex(app_id + ts)))``
        and is sent URL-quoted as the ``signa`` query parameter.

        Args:
            tid: Caller-supplied identifier, stored as ``self.tid``.
            logger: Logger object, stored as ``self.logger``.
            message_receiver: Optional callable, stored but currently unused.

        Raises:
            Whatever ``websocket.create_connection`` raises when the
            connection cannot be established.
        """
        # Third-party dependency kept function-local, as in the original.
        from websocket import create_connection

        # SECURITY NOTE(review): the fallback literals are credentials
        # committed to source control; rotate them and use the environment.
        app_id = os.environ.get("XFYUN_APP_ID", "1ec1097b")
        api_key = os.environ.get(
            "XFYUN_API_KEY", "60b7d2d8d172b065b1c3e723e5ba0696"
        )
        # NOTE(review): plain ws:// is unencrypted - confirm whether the
        # service's TLS endpoint should be used instead.
        base_url = "ws://rtasr.xfyun.cn/v1/ws"

        ts = str(int(time.time()))
        md5_hex = hashlib.md5((app_id + ts).encode('utf-8')).hexdigest()
        raw_signature = hmac.new(
            api_key.encode('utf-8'), md5_hex.encode('utf-8'), hashlib.sha1
        ).digest()
        signa = base64.b64encode(raw_signature).decode('utf-8')

        self.end_tag = "{\"end\": true}"
        self.tid = tid
        self.logger = logger
        self.message_receiver = message_receiver
        self.ws = create_connection(
            base_url + "?appid=" + app_id + "&ts=" + ts + "&signa=" + quote(signa)
        )
        self.trecv = threading.Thread(target=self.recv)

    def start(self):
        """Start the background thread that reads server messages."""
        self.trecv.start()

    def send_audio(self, chunk):
        """Send one audio chunk over the WebSocket.

        A previously commented-out file sender in this module streamed
        1280-byte chunks with a 0.04 s pause between them; presumably that
        is the expected pacing - confirm against the service documentation.
        """
        self.ws.send(chunk)

    def close(self):
        """Send the end-of-stream tag and close the connection.

        The tag is skipped when the socket is already closed (for example
        after the receive thread handled an error), and the socket is closed
        even if sending the tag fails.
        """
        try:
            if self.ws.connected:
                self.ws.send(self.end_tag.encode('utf-8'))
        finally:
            self.ws.close()
        print("connection closed")

    def recv(self):
        """Thread body: read server messages until the connection ends.

        Prints handshake, result and error messages.  An ``error`` action
        closes the socket and ends the thread; any exception is printed and
        also ends the thread.
        """
        try:
            while self.ws.connected:
                result = str(self.ws.recv())
                if not result:
                    print("receive result end")
                    break

                result_dict = json.loads(result)
                # .get() so a frame without "action" is ignored instead of
                # killing the receive thread with a KeyError.
                action = result_dict.get("action")
                if action == "started":
                    print("handshake success, result: " + result)
                elif action == "result":
                    print(f"rtasr result: {result_dict.get('data')}")
                elif action == "error":
                    print("rtasr error: " + result)
                    self.ws.close()
                    return
        except Exception as e:
            traceback.print_exc()
            print("receive result end", e)