asr.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import os
  4. import json
  5. import threading
  6. import traceback
  7. import nls # 引入阿里云语音识别库
  8. from aliyunsdkcore.client import AcsClient
  9. from aliyunsdkcore.request import CommonRequest
  10. import time
  def get_token():
      """Request a new access token from the NLS ``CreateToken`` API.

      Credentials are taken from the ``ALIYUN_AK_ID`` and ``ALIYUN_AK_SECRET``
      environment variables when they are set; otherwise the legacy in-file
      values are used so existing deployments keep working.

      Returns:
          tuple: ``(token, expire_time)`` on success, where ``token`` is the
          ``Token.Id`` field of the response and ``expire_time`` is the
          ``Token.ExpireTime`` field converted to ``int`` (callers compare it
          against ``time.time()``, so it is presumably a Unix timestamp in
          seconds -- TODO confirm). ``(None, None)`` when the request fails
          or the response carries no token.
      """
      # SECURITY: the fallback literals below are long-lived AccessKey
      # credentials committed to source control. They are kept only for
      # backward compatibility; rotate them and supply the values through the
      # environment, then delete the fallbacks.
      ak_id = os.environ.get("ALIYUN_AK_ID") or "LTAI5tQ2HmiHCygZkt5BYrYR"
      ak_secret = os.environ.get("ALIYUN_AK_SECRET") or "KhmxTd14SUcXafpFk5yofA43FoeM99"

      try:
          # Client construction is inside the try so that any failure, not
          # only a failed request, yields the documented (None, None) result.
          client = AcsClient(ak_id, ak_secret, "cn-shanghai")

          request = CommonRequest()
          request.set_method('POST')
          request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
          request.set_version('2019-02-28')
          request.set_action_name('CreateToken')

          response = client.do_action_with_exception(request)
          jss = json.loads(response)
          token_info = jss.get('Token') or {}
          if 'Id' in token_info:
              token = token_info['Id']
              expire_time = token_info['ExpireTime']
              # The token itself is a credential, so only its expiry is printed.
              print(f"Token obtained, ExpireTime: {expire_time}")
              return token, int(expire_time)
          print("Token获取失败,响应内容: ", response)
      except Exception as e:
          print(f"获取Token时发生错误: {e}")
      return None, None
  # WebSocket service address, passed as ``url`` when a transcriber is created.
  # NOTE(review): get_token() requests its token from the cn-shanghai meta
  # endpoint while this gateway is cn-beijing -- confirm the token is accepted
  # across regions.
  URL = "wss://nls-gateway-cn-beijing.aliyuncs.com/ws/v1"
  # Appkey passed as ``appkey`` when a transcriber is created.
  APPKEY = "OKt6jogp6fRjHQVp"
  class TestSt:
      """Real-time transcription session started from a background thread.

      An instance creates an ``nls.NlsSpeechTranscriber`` on its worker thread,
      keeps retrying until the session reports a start, an error or a close,
      and forwards sentence/result/error messages to ``message_receiver``.
      """

      # Token cache shared by every instance of the class. ``expire_time`` is
      # the integer returned by get_token() and is compared with time.time().
      token_cache = {
          "token": None,
          "expire_time": None,
      }

      @classmethod
      def get_cached_token(cls):
          """Return a usable token, fetching a new one when the cache is stale.

          The cached token is reused only while more than 60 seconds remain
          before its recorded expiry. Otherwise ``get_token()`` is called and,
          on success, its result replaces the cache.

          Returns:
              The token, or ``None`` when no token could be obtained.
          """
          current_time = int(time.time())
          cached_token = cls.token_cache["token"]
          cached_expiry = cls.token_cache["expire_time"]
          # A token cached without an expiry is treated as stale instead of
          # raising TypeError on the subtraction.
          if cached_token and cached_expiry is not None and cached_expiry - current_time > 60:
              return cached_token

          new_token, expire_time = get_token()
          if new_token:
              cls.token_cache["token"] = new_token
              cls.token_cache["expire_time"] = expire_time
              print("获取新的Token")
              return new_token
          print("无法获取Token")
          return None

      def __init__(self, tid, logger, message_receiver=None):
          """Prepare a session; the worker thread is not started until start().

          Args:
              tid: Identifier used in log lines and passed to the transcriber
                  as its single callback argument.
              logger: Object exposing ``debug(msg, *args)``.
              message_receiver: Optional callable invoked as
                  ``message_receiver(message, *args)`` for sentence begin/end,
                  intermediate result and error events.
          """
          self.logger = logger
          # Set once the session has started, failed or closed, or close() was
          # called; the worker loop stops retrying as soon as it is set.
          self.__event = threading.Event()
          self.__th = threading.Thread(target=self.__test_run)
          self.__id = tid
          self.message_receiver = message_receiver
          self._Token = self.get_cached_token()
          self.sr = None
          self.logger.debug("开始")

      def start(self):
          """Start the worker thread that opens the transcription session."""
          self.__th.start()

      def send_audio(self, audio_data):
          """Send ``audio_data`` to the current session; no-op before one exists."""
          # Snapshot the attribute: the worker thread may rebind self.sr.
          session = self.sr
          if session:
              session.send_audio(audio_data)

      def close(self):
          """Stop the current session and make the worker stop retrying.

          Errors raised by the underlying ``stop()`` are logged, not raised.
          """
          # Signal first so a worker that is still retrying does not open
          # another session after the caller has asked to close.
          self.__event.set()
          session = self.sr
          if session is None:
              self.logger.debug(f"[{self.__id}]close() called before any ASR session was created")
              return
          try:
              session.stop()
          except Exception as e:
              self.logger.debug(f"[{self.__id}]Error stopping ASR: {e}")

      def __test_run(self):
          """Worker loop: create and start a transcriber until the event is set.

          The event is set by the start/error/close callbacks and by close().
          It is deliberately not cleared here: it is created unset in
          ``__init__`` and a thread can only be started once, so clearing would
          only discard a close() issued before the thread got to run.
          """
          self.logger.debug("Thread:%s start..", self.__id)
          nls.enableTrace(True)
          count = 0
          while not self.__event.is_set():
              # Re-read the cache on every attempt so a retry does not keep
              # using an expired (or never obtained) token; keep the previous
              # value if no token can be fetched right now.
              self._Token = self.get_cached_token() or self._Token
              self.sr = nls.NlsSpeechTranscriber(
                  url=URL,
                  token=self._Token,
                  appkey=APPKEY,
                  on_sentence_begin=self.test_on_sentence_begin,
                  on_sentence_end=self.test_on_sentence_end,
                  on_start=self.test_on_start,
                  on_result_changed=self.test_on_result_chg,
                  on_completed=self.test_on_completed,
                  on_error=self.test_on_error,
                  on_close=self.test_on_close,
                  callback_args=[self.__id]
              )
              try:
                  self.sr.start(
                      aformat="pcm",
                      sample_rate=8000,
                      enable_intermediate_result=True,
                      enable_punctuation_prediction=True,
                      enable_inverse_text_normalization=True,
                      ex={'max_sentence_silence': 2000, 'disfluency': True, 'enable_words': True}
                  )
                  self.logger.debug(f"[{self.__id}]ASR session started. {count}")
              except Exception as e:
                  traceback.print_exc()
                  self.logger.debug(f"[{self.__id}]ASR session start exception. {e}")
              # Wait up to 0.5 s for a callback (or close()) to set the event.
              # Doing this on the failure path as well keeps a start() that
              # raises from turning the loop into a busy spin.
              self.__event.wait(timeout=.5)
              count = count + 1

      def __forward(self, message, *args):
          """Pass an event on to ``message_receiver`` when one was supplied."""
          if self.message_receiver:
              self.message_receiver(message, *args)

      def test_on_sentence_begin(self, message, *args):
          """Log a sentence-begin event and forward it to the receiver."""
          self.logger.debug("[%s]test_on_sentence_begin:%s", self.__id, message)
          self.__forward(message, *args)

      def test_on_sentence_end(self, message, *args):
          """Log a sentence-end event and forward it to the receiver."""
          self.logger.debug("[%s]test_on_sentence_end:%s", self.__id, message)
          self.__forward(message, *args)

      def test_on_start(self, message, *args):
          """Mark the session as started so the worker loop stops retrying."""
          self.__event.set()
          self.logger.debug("[%s]test_on_start:%s", self.__id, message)

      def test_on_error(self, message, *args):
          """Log an error event, end the retry loop and forward the message."""
          self.logger.debug("on_error args=>%s", args)
          self.__event.set()
          self.__forward(message, *args)

      def test_on_close(self, *args):
          """Log the close event and end the retry loop."""
          self.logger.debug("on_close: args=>%s", args)
          self.__event.set()

      def test_on_result_chg(self, message, *args):
          """Forward an intermediate-result event to the receiver."""
          self.__forward(message, *args)

      def test_on_completed(self, message, *args):
          """Completion callback; intentionally does nothing."""
          pass