asr.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. import os
  4. import json
  5. import threading
  6. import nls # 引入阿里云语音识别库
  7. from aliyunsdkcore.client import AcsClient
  8. from aliyunsdkcore.request import CommonRequest
  9. import time
  10. # 获取Token的函数
  11. def get_token():
  12. ak_id = "LTAI5tQ2HmiHCygZkt5BYrYR"
  13. ak_secret = "KhmxTd14SUcXafpFk5yofA43FoeM99"
  14. client = AcsClient(ak_id, ak_secret, "cn-shanghai")
  15. request = CommonRequest()
  16. request.set_method('POST')
  17. request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
  18. request.set_version('2019-02-28')
  19. request.set_action_name('CreateToken')
  20. try:
  21. response = client.do_action_with_exception(request)
  22. jss = json.loads(response)
  23. if 'Token' in jss and 'Id' in jss['Token']:
  24. token = jss['Token']['Id']
  25. expire_time = jss['Token']['ExpireTime']
  26. print(f"Token: {token}, ExpireTime: {expire_time}")
  27. return token, int(expire_time) # 返回Token和过期时间
  28. else:
  29. print("Token获取失败,响应内容: ", response)
  30. except Exception as e:
  31. print(f"获取Token时发生错误: {e}")
  32. return None, None
  33. # WebSocket服务地址
  34. URL = "wss://nls-gateway-cn-beijing.aliyuncs.com/ws/v1"
  35. APPKEY = "OKt6jogp6fRjHQVp" # 你的Appkey
  36. # 定义实时转写类
  37. class TestSt:
  38. # 静态变量用于缓存Token
  39. token_cache = {
  40. "token": None,
  41. "expire_time": None
  42. }
  43. @classmethod
  44. def get_cached_token(cls):
  45. # 检查是否已有缓存的Token且未过期s):
  46. # # 检查是否已有缓存的Token且未
  47. current_time = int(time.time())
  48. # if cls.token_cache["token"] and cls.token_cache["expire_time"]:
  49. if cls.token_cache["token"] and cls.token_cache["expire_time"] - current_time > 60:
  50. # if current_time < cls.token_cache["expire_time"]:
  51. # print("使用缓存的Token")
  52. return cls.token_cache["token"]
  53. # 如果没有缓存Token或者Token已过期,重新获取
  54. new_token, expire_time = get_token()
  55. if new_token:
  56. cls.token_cache["token"] = new_token
  57. cls.token_cache["expire_time"] = expire_time
  58. print("获取新的Token")
  59. return new_token
  60. else:
  61. print("无法获取Token")
  62. return None
  63. def __init__(self, tid, message_receiver=None):
  64. self.is_closed = False
  65. self.lock = threading.Lock()
  66. self.__th = threading.Thread(target=self.__test_run)
  67. self.__id = tid
  68. self.message_receiver = message_receiver
  69. self._Token = self.get_cached_token()
  70. self.sr = None
  71. print("开始")
  72. def start(self):
  73. self.__th.start()
  74. def send_audio(self, audio_data):
  75. # if self.sr:
  76. # # print("Sending audio data of length:", len(audio_data))
  77. # self.sr.send_audio(audio_data)
  78. # print("Audio data sent.")
  79. if self.sr and not self.is_closed:
  80. with self.lock:
  81. try:
  82. self.sr.send_audio(audio_data)
  83. except Exception as e:
  84. print(f"Error sending audio: {e}")
  85. self.close()
  86. def close(self):
  87. with self.lock:
  88. if not self.is_closed:
  89. self.is_closed = True
  90. try:
  91. self.sr.stop()
  92. except Exception as e:
  93. print(f"Error stopping ASR: {e}")
  94. def __test_run(self):
  95. print("Thread:{} start..".format(self.__id))
  96. self.sr = nls.NlsSpeechTranscriber(
  97. url=URL,
  98. token=self._Token,
  99. appkey=APPKEY,
  100. on_sentence_begin=self.test_on_sentence_begin,
  101. on_sentence_end=self.test_on_sentence_end,
  102. on_start=self.test_on_start,
  103. on_result_changed=self.test_on_result_chg,
  104. on_completed=self.test_on_completed,
  105. on_error=self.test_on_error,
  106. on_close=self.test_on_close,
  107. callback_args=[self.__id]
  108. )
  109. self.sr.__setattr__("max_sentence_silence", 1200)
  110. self.sr.start(
  111. aformat="pcm",
  112. enable_intermediate_result=True,
  113. enable_punctuation_prediction=True,
  114. enable_inverse_text_normalization=True
  115. )
  116. print("ASR session started.")
  117. def test_on_sentence_begin(self, message, *args):
  118. # pass
  119. # print("test_on_sentence_begin:{}".format(message))
  120. if self.message_receiver:
  121. self.message_receiver(message, *args)
  122. def test_on_sentence_end(self, message, *args):
  123. # print("test_on_sentence_end:{}".format(message))
  124. if self.message_receiver:
  125. self.message_receiver(message, *args)
  126. def test_on_start(self, message, *args):
  127. # print("test_on_start:{}".format(message))
  128. pass
  129. def test_on_error(self, message, *args):
  130. # print("on_error args=>{}".format(args))
  131. if self.message_receiver:
  132. self.message_receiver(message, *args)
  133. def test_on_close(self, *args):
  134. # print("on_close: args=>{}".format(args))
  135. pass
  136. def test_on_result_chg(self, message, *args):
  137. # print("test_on_chg:{}".format(message))
  138. if self.message_receiver:
  139. self.message_receiver(message, *args)
  140. def test_on_completed(self, message, *args):
  141. # print("on_completed:args=>{} message=>{}".format(args, message))
  142. pass