asr.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import os
  4. import json
  5. import threading
  6. import traceback
  7. import nls # 引入阿里云语音识别库
  8. from aliyunsdkcore.client import AcsClient
  9. from aliyunsdkcore.request import CommonRequest
  10. import time
  11. # 定义实时转写类
  12. class TestSt:
  13. # 静态变量用于缓存Token
  14. token_cache = {
  15. "token": None,
  16. "expire_time": None
  17. }
  18. # 获取Token的函数
  19. @classmethod
  20. def get_token(cls):
  21. ak_id = "LTAI5tQ2HmiHCygZkt5BYrYR"
  22. ak_secret = "KhmxTd14SUcXafpFk5yofA43FoeM99"
  23. client = AcsClient(ak_id, ak_secret, "cn-shanghai")
  24. request = CommonRequest()
  25. request.set_method('POST')
  26. request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
  27. request.set_version('2019-02-28')
  28. request.set_action_name('CreateToken')
  29. try:
  30. response = client.do_action_with_exception(request)
  31. jss = json.loads(response)
  32. if 'Token' in jss and 'Id' in jss['Token']:
  33. token = jss['Token']['Id']
  34. expire_time = jss['Token']['ExpireTime']
  35. print(f"Token: {token}, ExpireTime: {expire_time}")
  36. return token, int(expire_time) # 返回Token和过期时间
  37. else:
  38. print("Token获取失败,响应内容: ", response)
  39. except Exception as e:
  40. print(f"获取Token时发生错误: {e}")
  41. return None, None
  42. @classmethod
  43. def get_cached_token(cls):
  44. # 检查是否已有缓存的Token且未过期s):
  45. # # 检查是否已有缓存的Token且未
  46. current_time = int(time.time())
  47. # if cls.token_cache["token"] and cls.token_cache["expire_time"]:
  48. if cls.token_cache["token"] and cls.token_cache["expire_time"] - current_time > 60:
  49. # if current_time < cls.token_cache["expire_time"]:
  50. # print("使用缓存的Token")
  51. return cls.token_cache["token"]
  52. # 如果没有缓存Token或者Token已过期,重新获取
  53. new_token, expire_time = cls.get_token()
  54. if new_token:
  55. cls.token_cache["token"] = new_token
  56. cls.token_cache["expire_time"] = expire_time
  57. print("获取新的Token")
  58. return new_token
  59. else:
  60. print("无法获取Token")
  61. return None
  62. def __init__(self, tid, logger, message_receiver=None):
  63. # self.is_closed = False
  64. # self.lock = threading.Lock()
  65. self.logger = logger
  66. self.__event = threading.Event()
  67. self.__th = threading.Thread(target=self.__test_run)
  68. self.__id = tid
  69. self.message_receiver = message_receiver
  70. self._Token = self.get_cached_token()
  71. self.sr = None
  72. # WebSocket服务地址
  73. self.URL = "wss://nls-gateway-cn-beijing.aliyuncs.com/ws/v1"
  74. self.APPKEY = "OKt6jogp6fRjHQVp" # 你的Appkey
  75. self.logger.debug("开始")
  76. def start(self):
  77. self.__th.start()
  78. def send_audio(self, audio_data):
  79. if self.sr:
  80. self.sr.send_audio(audio_data)
  81. def close(self):
  82. try:
  83. self.sr.stop()
  84. except Exception as e:
  85. self.logger.debug(f"[{self.__id}]Error stopping ASR: {e}")
  86. def __test_run(self):
  87. self.logger.debug("Thread:%s start..",self.__id)
  88. nls.enableTrace(True)
  89. count = 0
  90. self.__event.clear()
  91. while not self.__event.is_set():
  92. self.sr = nls.NlsSpeechTranscriber(
  93. url=self.URL,
  94. token=self._Token,
  95. appkey=self.APPKEY,
  96. on_sentence_begin=self.test_on_sentence_begin,
  97. on_sentence_end=self.test_on_sentence_end,
  98. on_start=self.test_on_start,
  99. on_result_changed=self.test_on_result_chg,
  100. on_completed=self.test_on_completed,
  101. on_error=self.test_on_error,
  102. on_close=self.test_on_close,
  103. callback_args=[self.__id]
  104. )
  105. try:
  106. self.sr.start(
  107. aformat="pcm",
  108. sample_rate=8000,
  109. enable_intermediate_result=True,
  110. enable_punctuation_prediction=True,
  111. enable_inverse_text_normalization=True,
  112. ex={'max_sentence_silence': 2000, 'disfluency': True, 'enable_words': True}
  113. )
  114. # _res = self.sr.ctrl(ex={'max_sentence_silence': 6000, 'disfluency': True,'enable_words': True })
  115. self.logger.debug(f"[{self.__id}]ASR session started. {count}")
  116. self.__event.wait(timeout=.5)
  117. self.logger.debug(f"[{self.__id}]ASR session started. {count}")
  118. except Exception as e:
  119. traceback.print_exc()
  120. self.logger.debug(f"[{self.__id}]ASR session start exception. {e}")
  121. count = count + 1
  122. def test_on_sentence_begin(self, message, *args):
  123. self.logger.debug("[%s]test_on_sentence_begin:%s", self.__id, message)
  124. if self.message_receiver:
  125. self.message_receiver(message, *args)
  126. def test_on_sentence_end(self, message, *args):
  127. self.logger.debug("[%s]test_on_sentence_end:%s", self.__id, message)
  128. if self.message_receiver:
  129. self.message_receiver(message, *args)
  130. def test_on_start(self, message, *args):
  131. self.__event.set()
  132. self.logger.debug("[%s]test_on_start:%s", self.__id, message)
  133. pass
  134. def test_on_error(self, message, *args):
  135. self.logger.debug("on_error args=>%s", args)
  136. if not self.__event.is_set():
  137. self.__event.set()
  138. if self.message_receiver:
  139. self.message_receiver(message, *args)
  140. def test_on_close(self, *args):
  141. self.logger.debug("on_close: args=>%s", args)
  142. if not self.__event.is_set():
  143. self.__event.set()
  144. pass
  145. def test_on_result_chg(self, message, *args):
  146. # self.logger.debug("test_on_chg:{}".format(message))
  147. if self.message_receiver:
  148. self.message_receiver(message, *args)
  149. def test_on_completed(self, message, *args):
  150. # self.logger.debug("on_completed:args=>{} message=>{}".format(args, message))
  151. pass
  152. # 讯飞ASR实时转写
  153. class XfAsr:
  154. def __init__(self, tid, logger, message_receiver=None):
  155. import hashlib
  156. import hmac
  157. import base64
  158. # from socket import *
  159. # import json, time, threading
  160. from websocket import create_connection
  161. # import websocket
  162. from urllib.parse import quote
  163. app_id = "1ec1097b"
  164. api_key = "60b7d2d8d172b065b1c3e723e5ba0696"
  165. base_url = "ws://rtasr.xfyun.cn/v1/ws"
  166. ts = str(int(time.time()))
  167. tt = (app_id + ts).encode('utf-8')
  168. md5 = hashlib.md5()
  169. md5.update(tt)
  170. baseString = md5.hexdigest()
  171. baseString = bytes(baseString, encoding='utf-8')
  172. apiKey = api_key.encode('utf-8')
  173. signa = hmac.new(apiKey, baseString, hashlib.sha1).digest()
  174. signa = base64.b64encode(signa)
  175. signa = str(signa, 'utf-8')
  176. self.end_tag = "{\"end\": true}"
  177. self.tid = tid
  178. self.logger = logger
  179. self.message_receiver = message_receiver
  180. self.ws = create_connection(base_url + "?appid=" + app_id + "&ts=" + ts + "&signa=" + quote(signa))
  181. self.trecv = threading.Thread(target=self.recv)
  182. def start(self):
  183. self.trecv.start()
  184. # def send(self, file_path):
  185. # file_object = open(file_path, 'rb')
  186. # try:
  187. # index = 1
  188. # while True:
  189. # chunk = file_object.read(1280)
  190. # if not chunk:
  191. # break
  192. # self.ws.send(chunk)
  193. #
  194. # index += 1
  195. # time.sleep(0.04)
  196. # finally:
  197. # file_object.close()
  198. #
  199. # self.ws.send(bytes(self.end_tag.encode('utf-8')))
  200. # print("send end tag success")
  201. def send_audio(self, chunk):
  202. self.ws.send(chunk)
  203. def close(self):
  204. self.ws.send(bytes(self.end_tag.encode('utf-8')))
  205. self.ws.close()
  206. print("connection closed")
  207. def recv(self):
  208. try:
  209. while self.ws.connected:
  210. result = str(self.ws.recv())
  211. if len(result) == 0:
  212. print("receive result end")
  213. break
  214. result_dict = json.loads(result)
  215. # 解析结果
  216. if result_dict["action"] == "started":
  217. print("handshake success, result: " + result)
  218. if result_dict["action"] == "result":
  219. result_1 = result_dict
  220. # result_2 = json.loads(result_1["cn"])
  221. # result_3 = json.loads(result_2["st"])
  222. # result_4 = json.loads(result_3["rt"])
  223. print("rtasr result: " + result_1["data"])
  224. if result_dict["action"] == "error":
  225. print("rtasr error: " + result)
  226. self.ws.close()
  227. return
  228. except Exception as e:
  229. traceback.print_exc()
  230. print("receive result end", e)