#!/usr/bin/env python3
# encoding:utf-8
"""Real-time speech transcription helper built on the Alibaba Cloud NLS SDK.

The module exposes :func:`get_token`, which requests an NLS access token, and
:class:`TestSt`, which runs one ``NlsSpeechTranscriber`` session on a
background thread and forwards recognition events to an optional callback.
"""
import os
import json
import threading
import traceback
import nls  # Alibaba Cloud speech recognition SDK
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
import time


def get_token():
    """Request a fresh NLS access token through the ``CreateToken`` action.

    The access key pair is read from the ``NLS_ACCESS_KEY_ID`` and
    ``NLS_ACCESS_KEY_SECRET`` environment variables when they are set and
    non-empty; otherwise the values embedded below are used.

    Returns:
        A ``(token, expire_time)`` tuple where ``expire_time`` is an ``int``
        taken from the response, or ``(None, None)`` when the request fails
        or the response does not contain a token.
    """
    # SECURITY NOTE(review): credentials committed to source control should be
    # rotated and removed. The literals are kept only as a backward-compatible
    # fallback for deployments that do not set the environment variables yet.
    ak_id = os.environ.get("NLS_ACCESS_KEY_ID") or "LTAI5tQ2HmiHCygZkt5BYrYR"
    ak_secret = (
        os.environ.get("NLS_ACCESS_KEY_SECRET") or "KhmxTd14SUcXafpFk5yofA43FoeM99"
    )

    client = AcsClient(ak_id, ak_secret, "cn-shanghai")
    request = CommonRequest()
    request.set_method('POST')
    request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
    request.set_version('2019-02-28')
    request.set_action_name('CreateToken')

    try:
        response = client.do_action_with_exception(request)
        jss = json.loads(response)
        if 'Token' in jss and 'Id' in jss['Token']:
            token = jss['Token']['Id']
            expire_time = jss['Token']['ExpireTime']
            print(f"Token: {token}, ExpireTime: {expire_time}")
            return token, int(expire_time)
        print("Token获取失败,响应内容: ", response)
    except Exception as e:
        # Best effort: any SDK, network or parsing error is reported and the
        # caller receives (None, None).
        print(f"获取Token时发生错误: {e}")
    return None, None


# WebSocket service endpoint.
# NOTE(review): the token is requested from a cn-shanghai endpoint while this
# gateway is cn-beijing - confirm that this pairing is intended.
URL = "wss://nls-gateway-cn-beijing.aliyuncs.com/ws/v1"
APPKEY = "OKt6jogp6fRjHQVp"  # Your Appkey


class TestSt:
    """Run one real-time transcription session on a background thread.

    Args:
        tid: Identifier used in log messages and passed to the SDK as
            ``callback_args``.
        logger: Object providing a ``debug`` method (used for all logging).
        message_receiver: Optional callable invoked as
            ``message_receiver(message, *args)`` for sentence-begin,
            sentence-end, result-changed and error events.
    """

    # Class-level cache so that all instances share one token.
    token_cache = {
        "token": None,
        "expire_time": None
    }

    @classmethod
    def get_cached_token(cls):
        """Return a usable token, requesting a new one when necessary.

        The cached token is reused while it has more than 60 seconds left
        before ``expire_time``.

        Returns:
            The token string, or ``None`` when no token could be obtained.
        """
        current_time = int(time.time())
        cached = cls.token_cache
        if cached["token"] and cached["expire_time"] - current_time > 60:
            return cached["token"]

        # No cached token, or it is about to expire: request a new one.
        new_token, expire_time = get_token()
        if not new_token:
            print("无法获取Token")
            return None

        cached["token"] = new_token
        cached["expire_time"] = expire_time
        print("获取新的Token")
        return new_token

    def __init__(self, tid, logger, message_receiver=None):
        self.logger = logger
        # Set once the session has started, failed or been closed by the SDK.
        self.__event = threading.Event()
        # Set by close() so that the worker stops retrying.
        self.__stop = threading.Event()
        self.__th = threading.Thread(target=self.__test_run)
        self.__id = tid
        self.message_receiver = message_receiver
        self._Token = self.get_cached_token()
        self.sr = None
        self.logger.debug("开始")

    def start(self):
        """Start the background thread that opens the transcription session."""
        self.__th.start()

    def send_audio(self, audio_data):
        """Forward ``audio_data`` to the transcriber; a no-op before it exists."""
        if self.sr:
            self.sr.send_audio(audio_data)

    def close(self):
        """Stop the transcriber and prevent the worker from retrying."""
        # Signal the worker first so that it does not open another session
        # after this one has been stopped; setting __event also wakes a
        # worker that is pausing between attempts.
        self.__stop.set()
        self.__event.set()
        if self.sr is None:
            return
        try:
            self.sr.stop()
        except Exception as e:
            self.logger.debug(f"[{self.__id}]Error stopping ASR: {e}")

    def __test_run(self):
        """Worker loop: create and start a transcriber, retrying on failure.

        The loop ends when a start/error/close callback sets the internal
        event or when :meth:`close` has been called.
        """
        self.logger.debug("Thread:%s start..",self.__id)
        nls.enableTrace(True)
        count = 0
        self.__event.clear()
        while not (self.__event.is_set() or self.__stop.is_set()):
            # Refresh through the cache so that a token which was missing or
            # has expired since __init__ is replaced before this attempt.
            self._Token = self.get_cached_token() or self._Token
            self.sr = nls.NlsSpeechTranscriber(
                url=URL,
                token=self._Token,
                appkey=APPKEY,
                on_sentence_begin=self.test_on_sentence_begin,
                on_sentence_end=self.test_on_sentence_end,
                on_start=self.test_on_start,
                on_result_changed=self.test_on_result_chg,
                on_completed=self.test_on_completed,
                on_error=self.test_on_error,
                on_close=self.test_on_close,
                callback_args=[self.__id]
            )
            try:
                self.sr.start(
                    aformat="pcm",
                    sample_rate=8000,
                    enable_intermediate_result=True,
                    enable_punctuation_prediction=True,
                    enable_inverse_text_normalization=True
                )
                # NOTE(review): the service documentation appears to describe
                # max_sentence_silence as an integer number of milliseconds;
                # confirm that the string '2000ms' is accepted.
                self.sr.ctrl(ex={'max_sentence_silence': '2000ms', 'disfluency': True,'enable_words': True })
                self.logger.debug(f"[{self.__id}]ASR session started. {count}")
            except Exception as e:
                traceback.print_exc()
                self.logger.debug(f"[{self.__id}]ASR session start exception. {e}")
            # Pause after every attempt (not only successful ones) so that a
            # start() call which fails immediately cannot turn this loop into
            # a busy spin. Returns at once if the event is already set.
            self.__event.wait(timeout=.5)
            count = count + 1

    def test_on_sentence_begin(self, message, *args):
        """SDK callback: log the sentence-begin event and forward it."""
        self.logger.debug("[%s]test_on_sentence_begin:%s", self.__id, message)
        if self.message_receiver:
            self.message_receiver(message, *args)

    def test_on_sentence_end(self, message, *args):
        """SDK callback: log the sentence-end event and forward it."""
        self.logger.debug("[%s]test_on_sentence_end:%s", self.__id, message)
        if self.message_receiver:
            self.message_receiver(message, *args)

    def test_on_start(self, message, *args):
        """SDK callback: mark the session as started so the worker loop ends."""
        self.__event.set()
        self.logger.debug("[%s]test_on_start:%s", self.__id, message)

    def test_on_error(self, message, *args):
        """SDK callback: end the worker loop and forward the error message."""
        self.logger.debug("on_error args=>%s", args)
        if not self.__event.is_set():
            self.__event.set()
        if self.message_receiver:
            self.message_receiver(message, *args)

    def test_on_close(self, *args):
        """SDK callback: end the worker loop when the connection closes."""
        self.logger.debug("on_close: args=>%s", args)
        if not self.__event.is_set():
            self.__event.set()

    def test_on_result_chg(self, message, *args):
        """SDK callback: forward an intermediate result without logging it."""
        if self.message_receiver:
            self.message_receiver(message, *args)

    def test_on_completed(self, message, *args):
        """SDK callback: transcription completed; intentionally a no-op."""
        pass