asr.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import os
  4. import json
  5. import threading
  6. import traceback
  7. import nls # 引入阿里云语音识别库
  8. from aliyunsdkcore.client import AcsClient
  9. from aliyunsdkcore.request import CommonRequest
  10. import time
  11. # 获取Token的函数
  def get_token():
      """Request a new NLS access token through the ``CreateToken`` action.

      Credentials are read from the ``ALIYUN_AK_ID`` / ``ALIYUN_AK_SECRET``
      environment variables when they are set; otherwise the legacy built-in
      values are used so existing deployments keep working.

      Returns:
          tuple: ``(token, expire_time)`` on success, where ``expire_time`` is
          the service's ``ExpireTime`` converted to ``int`` (callers compare it
          against ``time.time()``, so presumably epoch seconds). On any
          failure ``(None, None)`` is returned; this function never raises and
          never returns a bare ``None``.
      """
      # NOTE(security): the literal fallbacks below are secrets committed to
      # source control. They are kept only for backward compatibility and
      # should be rotated and removed once the environment variables are
      # provisioned everywhere.
      ak_id = os.environ.get("ALIYUN_AK_ID", "LTAI5tQ2HmiHCygZkt5BYrYR")
      ak_secret = os.environ.get("ALIYUN_AK_SECRET", "KhmxTd14SUcXafpFk5yofA43FoeM99")

      client = AcsClient(ak_id, ak_secret, "cn-shanghai")
      request = CommonRequest()
      request.set_method('POST')
      request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
      request.set_version('2019-02-28')
      request.set_action_name('CreateToken')

      try:
          response = client.do_action_with_exception(request)
          token_info = json.loads(response).get('Token') or {}
          token = token_info.get('Id')
          expire_time = token_info.get('ExpireTime')
          if token and expire_time is not None:
              print(f"Token: {token}, ExpireTime: {expire_time}")
              return token, int(expire_time)
          # The call succeeded but the payload lacks a usable token.
          print("Token获取失败,响应内容: ", response)
      except Exception as e:
          # Best-effort: callers only check for a falsy token, so report the
          # problem and fall through to the uniform failure result.
          print(f"获取Token时发生错误: {e}")
      return None, None
  34. # WebSocket service endpoint
  35. URL = "wss://nls-gateway-cn-beijing.aliyuncs.com/ws/v1"
  36. APPKEY = "OKt6jogp6fRjHQVp" # your Appkey
  37. # 定义实时转写类
  class TestSt:
      """Real-time transcription session built on ``nls.NlsSpeechTranscriber``.

      ``start()`` launches a worker thread that opens the session and keeps
      retrying until the service confirms the start, reports an error, closes
      the connection, or ``close()`` is called. Sentence and intermediate
      result events are forwarded to ``message_receiver``.
      """

      # Class-level cache shared by every instance, so sessions reuse one
      # token until shortly before it expires.
      token_cache = {
          "token": None,
          "expire_time": None,
      }

      # Fetch a new token when the cached one has this many seconds or fewer left.
      _TOKEN_REFRESH_MARGIN = 60
      # Pause between start attempts, in seconds.
      _RETRY_DELAY = 0.5

      @classmethod
      def get_cached_token(cls):
          """Return a usable token, fetching a new one when the cache is stale.

          Returns:
              The token string, or ``None`` when no new token could be obtained.
          """
          cached = cls.token_cache["token"]
          expires_at = cls.token_cache["expire_time"]
          now = int(time.time())
          if cached and expires_at is not None and expires_at - now > cls._TOKEN_REFRESH_MARGIN:
              return cached

          # No cached token, or it is about to expire: request a new one.
          # Tolerate a bare None result so a failed fetch cannot break the unpack.
          new_token, expire_time = get_token() or (None, None)
          if not new_token:
              print("无法获取Token")
              return None
          cls.token_cache["token"] = new_token
          cls.token_cache["expire_time"] = expire_time
          print("获取新的Token")
          return new_token

      def __init__(self, tid, logger, message_receiver=None):
          """Prepare a session; nothing is sent until ``start()`` is called.

          Args:
              tid: Identifier used in log lines and passed to the SDK as the
                  callback argument.
              logger: Logger-like object; only ``debug`` is used.
              message_receiver: Optional callable invoked as
                  ``message_receiver(message, *args)`` for sentence-begin,
                  sentence-end, result-changed and error events.
          """
          self.logger = logger
          # Set once the worker should stop (re)trying: on start confirmation,
          # error, connection close, or an explicit close().
          self.__event = threading.Event()
          self.__th = threading.Thread(target=self.__test_run)
          self.__id = tid
          self.message_receiver = message_receiver
          self._Token = self.get_cached_token()
          self.sr = None
          self.logger.debug("开始")

      def start(self):
          """Start the worker thread that opens the transcription session."""
          self.__th.start()

      def send_audio(self, audio_data):
          """Forward ``audio_data`` to the session; dropped if none exists yet."""
          session = self.sr
          if session:
              session.send_audio(audio_data)

      def close(self):
          """Stop the session and tell the worker to give up retrying.

          Safe to call before a session exists. Errors raised while stopping
          are logged, not propagated.
          """
          # Signal first so a worker that is between attempts does not open
          # another session after the caller has asked to close.
          self.__event.set()
          session = self.sr
          if session is None:
              return
          try:
              session.stop()
          except Exception as e:
              self.logger.debug(f"[{self.__id}]Error stopping ASR: {e}")

      def __test_run(self):
          """Worker body: open the session, retrying until the event is set."""
          self.logger.debug("Thread:%s start..", self.__id)
          nls.enableTrace(True)
          count = 0
          # The event is deliberately not cleared here: it is created unset,
          # and clearing it would discard a close() issued before this ran.
          while not self.__event.is_set():
              if count or not self._Token:
                  # A failed attempt may be caused by a missing or expired
                  # token; keep the old value if no new one can be fetched.
                  self._Token = self.get_cached_token() or self._Token
              self.sr = nls.NlsSpeechTranscriber(
                  url=URL,
                  token=self._Token,
                  appkey=APPKEY,
                  on_sentence_begin=self.test_on_sentence_begin,
                  on_sentence_end=self.test_on_sentence_end,
                  on_start=self.test_on_start,
                  on_result_changed=self.test_on_result_chg,
                  on_completed=self.test_on_completed,
                  on_error=self.test_on_error,
                  on_close=self.test_on_close,
                  callback_args=[self.__id],
              )
              try:
                  self.sr.start(
                      aformat="pcm",
                      sample_rate=8000,
                      enable_intermediate_result=True,
                      enable_punctuation_prediction=True,
                      enable_inverse_text_normalization=True,
                  )
                  self.sr.ctrl(ex={'max_sentence_silence': '2000ms', 'disfluency': True, 'enable_words': True})
                  self.logger.debug(f"[{self.__id}]ASR session started. {count}")
              except Exception as e:
                  traceback.print_exc()
                  self.logger.debug(f"[{self.__id}]ASR session start exception. {e}")
              count += 1
              # Returns at once when a callback or close() has set the event;
              # otherwise it paces the next attempt instead of spinning.
              self.__event.wait(timeout=self._RETRY_DELAY)

      def _forward(self, message, *args):
          """Pass an event to ``message_receiver`` when one is configured."""
          if self.message_receiver:
              self.message_receiver(message, *args)

      def test_on_sentence_begin(self, message, *args):
          """Log a sentence-begin event and forward it to the receiver."""
          self.logger.debug("[%s]test_on_sentence_begin:%s", self.__id, message)
          self._forward(message, *args)

      def test_on_sentence_end(self, message, *args):
          """Log a sentence-end event and forward it to the receiver."""
          self.logger.debug("[%s]test_on_sentence_end:%s", self.__id, message)
          self._forward(message, *args)

      def test_on_start(self, message, *args):
          """Mark the session as started so the worker stops retrying."""
          self.__event.set()
          self.logger.debug("[%s]test_on_start:%s", self.__id, message)

      def test_on_error(self, message, *args):
          """Stop the retry loop and forward the error to the receiver."""
          self.logger.debug("on_error args=>%s", args)
          self.__event.set()
          self._forward(message, *args)

      def test_on_close(self, *args):
          """Stop the retry loop when the connection is closed."""
          self.logger.debug("on_close: args=>%s", args)
          self.__event.set()

      def test_on_result_chg(self, message, *args):
          """Forward an intermediate result to the receiver (not logged)."""
          self._forward(message, *args)

      def test_on_completed(self, message, *args):
          """Completion callback; intentionally does nothing."""