123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- #!/usr/bin/env python3
- # encoding:utf-8
- import os
- import json
- import threading
- import traceback
- import nls # 引入阿里云语音识别库
- from aliyunsdkcore.client import AcsClient
- from aliyunsdkcore.request import CommonRequest
- import time
- # 定义实时转写类
class TestSt:
    """Real-time speech transcription session using the Alibaba Cloud ``nls`` SDK.

    Each instance owns one worker thread that opens an
    ``nls.NlsSpeechTranscriber`` session. Audio is pushed in through
    :meth:`send_audio`; sentence-begin, sentence-end, intermediate-result and
    error messages are forwarded to the optional ``message_receiver``
    callable as ``message_receiver(message, *args)``.

    SECURITY NOTE(review): the access key pair and appkey literals below are
    kept only as backward-compatible fallbacks. They are committed in source
    and should be rotated and supplied via the environment instead.
    """

    # Token cache shared by every instance of the class.
    # "expire_time" holds the value reported by the token service,
    # compared against ``time.time()`` (i.e. treated as epoch seconds).
    token_cache = {
        "token": None,
        "expire_time": None
    }

    @classmethod
    def get_token(cls):
        """Request a fresh access token from the ``CreateToken`` action.

        Returns:
            ``(token, expire_time)`` on success, where ``expire_time`` is an
            ``int``; ``(None, None)`` if the request fails or the response
            does not contain a token.
        """
        # Environment variables take precedence over the legacy literals.
        ak_id = os.environ.get(
            "ALIBABA_CLOUD_ACCESS_KEY_ID", "LTAI5tQ2HmiHCygZkt5BYrYR")
        ak_secret = os.environ.get(
            "ALIBABA_CLOUD_ACCESS_KEY_SECRET", "KhmxTd14SUcXafpFk5yofA43FoeM99")
        client = AcsClient(ak_id, ak_secret, "cn-shanghai")
        request = CommonRequest()
        request.set_method('POST')
        request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
        request.set_version('2019-02-28')
        request.set_action_name('CreateToken')
        try:
            response = client.do_action_with_exception(request)
            payload = json.loads(response)
            if 'Token' in payload and 'Id' in payload['Token']:
                token = payload['Token']['Id']
                expire_time = int(payload['Token']['ExpireTime'])
                # Deliberately do not print the token itself: it is a
                # bearer credential and must not end up in stdout/logs.
                print(f"ExpireTime: {expire_time}")
                return token, expire_time
            print("Token获取失败,响应内容: ", response)
        except Exception as e:
            print(f"获取Token时发生错误: {e}")
        return None, None

    @classmethod
    def get_cached_token(cls):
        """Return a usable token, fetching a new one when necessary.

        The cached token is reused only while it has more than 60 seconds of
        remaining lifetime. Otherwise :meth:`get_token` is called and, on
        success, the cache is updated.

        Returns:
            The token string, or ``None`` if no token could be obtained.
        """
        current_time = int(time.time())
        cached_token = cls.token_cache["token"]
        cached_expiry = cls.token_cache["expire_time"]
        # Guard against a half-populated cache before doing arithmetic on it.
        if (cached_token and cached_expiry is not None
                and cached_expiry - current_time > 60):
            return cached_token

        # No cached token, or it is (about to be) expired: fetch a new one.
        new_token, expire_time = cls.get_token()
        if not new_token:
            print("无法获取Token")
            return None
        cls.token_cache["token"] = new_token
        cls.token_cache["expire_time"] = expire_time
        print("获取新的Token")
        return new_token

    def __init__(self, tid, logger, message_receiver=None):
        """Prepare a transcription session; call :meth:`start` to run it.

        Args:
            tid: Identifier of this session; used in log lines and passed to
                the SDK as ``callback_args``.
            logger: Object providing a ``debug`` method (``logging.Logger``
                style, accepting ``%``-format arguments).
            message_receiver: Optional callable invoked as
                ``message_receiver(message, *args)`` for recognition events.
        """
        self.logger = logger
        # Set by the start/error/close callbacks (and by close()) to end
        # the worker loop in __test_run.
        self.__event = threading.Event()
        self.__th = threading.Thread(target=self.__test_run)
        self.__id = tid
        self.message_receiver = message_receiver
        self._Token = self.get_cached_token()
        self.sr = None
        # WebSocket gateway address.
        # NOTE(review): the token is requested from a cn-shanghai endpoint
        # while this gateway is cn-beijing -- confirm this pairing is intended.
        self.URL = "wss://nls-gateway-cn-beijing.aliyuncs.com/ws/v1"
        self.APPKEY = os.environ.get("ALIYUN_NLS_APPKEY", "OKt6jogp6fRjHQVp")
        self.logger.debug("开始")

    def start(self):
        """Start the worker thread that opens the transcription session."""
        self.__th.start()

    def send_audio(self, audio_data):
        """Forward ``audio_data`` to the transcriber; no-op before it exists."""
        if self.sr:
            self.sr.send_audio(audio_data)

    def close(self):
        """Stop the session and prevent the worker loop from reconnecting.

        Errors raised while stopping the transcriber are logged, not raised.
        """
        # Signal the worker first so a retry loop in progress terminates
        # instead of opening a new session after we have stopped this one.
        self.__event.set()
        if self.sr is None:
            self.logger.debug(f"[{self.__id}]Error stopping ASR: not started")
            return
        try:
            self.sr.stop()
        except Exception as e:
            self.logger.debug(f"[{self.__id}]Error stopping ASR: {e}")

    def __test_run(self):
        """Worker body: (re)open a transcriber session until signalled.

        The loop ends once the private event is set, which happens in the
        ``on_start``, ``on_error`` and ``on_close`` callbacks and in
        :meth:`close`. Failed attempts are retried after a short pause.
        """
        self.logger.debug("Thread:%s start..", self.__id)
        nls.enableTrace(True)
        count = 0
        self.__event.clear()
        while not self.__event.is_set():
            # Re-check the token on every attempt so that a token that was
            # missing or has expired since __init__ does not fail forever.
            refreshed = self.get_cached_token()
            if refreshed:
                self._Token = refreshed
            self.sr = nls.NlsSpeechTranscriber(
                url=self.URL,
                token=self._Token,
                appkey=self.APPKEY,
                on_sentence_begin=self.test_on_sentence_begin,
                on_sentence_end=self.test_on_sentence_end,
                on_start=self.test_on_start,
                on_result_changed=self.test_on_result_chg,
                on_completed=self.test_on_completed,
                on_error=self.test_on_error,
                on_close=self.test_on_close,
                callback_args=[self.__id]
            )
            try:
                self.sr.start(
                    aformat="pcm",
                    sample_rate=8000,
                    enable_intermediate_result=True,
                    enable_punctuation_prediction=True,
                    enable_inverse_text_normalization=True,
                    ex={'max_sentence_silence': 2000, 'disfluency': True, 'enable_words': True}
                )
                self.logger.debug(f"[{self.__id}]ASR session started. {count}")
            except Exception as e:
                traceback.print_exc()
                self.logger.debug(f"[{self.__id}]ASR session start exception. {e}")
            count += 1
            # Pause before re-checking; returns immediately once the event is
            # set, and acts as a back-off so failures do not spin the CPU.
            self.__event.wait(timeout=.5)

    def test_on_sentence_begin(self, message, *args):
        """SDK callback: log and forward a sentence-begin message."""
        self.logger.debug("[%s]test_on_sentence_begin:%s", self.__id, message)
        if self.message_receiver:
            self.message_receiver(message, *args)

    def test_on_sentence_end(self, message, *args):
        """SDK callback: log and forward a sentence-end message."""
        self.logger.debug("[%s]test_on_sentence_end:%s", self.__id, message)
        if self.message_receiver:
            self.message_receiver(message, *args)

    def test_on_start(self, message, *args):
        """SDK callback: mark the session as started, ending the retry loop."""
        self.__event.set()
        self.logger.debug("[%s]test_on_start:%s", self.__id, message)

    def test_on_error(self, message, *args):
        """SDK callback: end the retry loop and forward the error message."""
        self.logger.debug("on_error args=>%s", args)
        self.__event.set()
        if self.message_receiver:
            self.message_receiver(message, *args)

    def test_on_close(self, *args):
        """SDK callback: end the retry loop when the connection closes."""
        self.logger.debug("on_close: args=>%s", args)
        self.__event.set()

    def test_on_result_chg(self, message, *args):
        """SDK callback: forward an intermediate-result message."""
        if self.message_receiver:
            self.message_receiver(message, *args)

    def test_on_completed(self, message, *args):
        """SDK callback: transcription completed; intentionally a no-op."""
        pass
- # 讯飞ASR实时转写
class XfAsr:
    """Real-time speech transcription client for the iFLYTEK RTASR WebSocket API.

    The constructor signs the request and opens the WebSocket connection.
    :meth:`start` launches a thread running :meth:`recv`, which prints the
    frames received from the server.

    NOTE(review): ``message_receiver`` is stored but :meth:`recv` does not
    call it; results are only printed. Confirm whether they should be
    forwarded to the receiver.

    SECURITY NOTE(review): the app id / API key literals are kept only as
    backward-compatible fallbacks and should be rotated and supplied via the
    environment instead.
    """

    def __init__(self, tid, logger, message_receiver=None):
        """Sign the handshake URL and connect to the service.

        Args:
            tid: Identifier of this session (stored as ``self.tid``).
            logger: Object providing a ``debug`` method.
            message_receiver: Optional callable; stored on the instance.
        """
        # Imported locally so the module can be loaded without the
        # third-party ``websocket`` package when this class is not used.
        import base64
        import hashlib
        import hmac
        from urllib.parse import quote

        from websocket import create_connection

        app_id = os.environ.get("XFYUN_APP_ID", "1ec1097b")
        api_key = os.environ.get(
            "XFYUN_API_KEY", "60b7d2d8d172b065b1c3e723e5ba0696")
        base_url = "ws://rtasr.xfyun.cn/v1/ws"
        ts = str(int(time.time()))

        # signa = base64(HMAC-SHA1(key=api_key, msg=hex(md5(app_id + ts))))
        base_string = hashlib.md5((app_id + ts).encode('utf-8')).hexdigest()
        digest = hmac.new(
            api_key.encode('utf-8'), base_string.encode('utf-8'), hashlib.sha1
        ).digest()
        signa = base64.b64encode(digest).decode('utf-8')

        # Frame sent by close() to tell the server the audio stream is over.
        self.end_tag = "{\"end\": true}"
        self.tid = tid
        self.logger = logger
        self.message_receiver = message_receiver
        self.ws = create_connection(
            base_url + "?appid=" + app_id + "&ts=" + ts + "&signa=" + quote(signa))
        self.trecv = threading.Thread(target=self.recv)

    def start(self):
        """Start the background thread that reads frames from the server."""
        self.trecv.start()

    def send_audio(self, chunk):
        """Send one chunk of audio data over the WebSocket."""
        self.ws.send(chunk)

    def close(self):
        """Send the end-of-stream tag (best effort) and close the socket.

        The socket is always closed, even if sending the end tag fails --
        for example because ``recv`` already closed the connection after an
        error frame.
        """
        try:
            if self.ws.connected:
                self.ws.send(self.end_tag.encode('utf-8'))
        except Exception as e:
            self.logger.debug(f"[{self.tid}]Error sending end tag: {e}")
        finally:
            self.ws.close()
        print("connection closed")

    def recv(self):
        """Read frames until the connection ends, printing what is received.

        Handles the ``started``, ``result`` and ``error`` actions; an
        ``error`` frame closes the socket and ends the loop. Frames without a
        recognised ``action`` are ignored. Any exception ends the loop and is
        printed rather than raised.
        """
        try:
            while self.ws.connected:
                raw = self.ws.recv()
                # Decode binary frames explicitly; str(bytes) would yield
                # the "b'...'" repr, which is not valid JSON.
                if isinstance(raw, (bytes, bytearray)):
                    result = bytes(raw).decode('utf-8')
                else:
                    result = str(raw)
                if not result:
                    print("receive result end")
                    break
                result_dict = json.loads(result)
                # Dispatch on the frame's action; a missing key must not
                # abort the whole receive loop.
                action = result_dict.get("action")
                if action == "started":
                    print("handshake success, result: " + result)
                elif action == "result":
                    print(f"rtasr result: {result_dict.get('data')}")
                elif action == "error":
                    print("rtasr error: " + result)
                    self.ws.close()
                    return
        except Exception as e:
            traceback.print_exc()
            print("receive result end", e)
|