asr.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. import os
  2. import json
  3. import threading
  4. import nls # 引入阿里云语音识别库
  5. from aliyunsdkcore.client import AcsClient
  6. from aliyunsdkcore.request import CommonRequest
  7. import time
  8. # 获取Token的函数
  9. def get_token():
  10. ak_id = "LTAI5tQ2HmiHCygZkt5BYrYR"
  11. ak_secret = "KhmxTd14SUcXafpFk5yofA43FoeM99"
  12. client = AcsClient(ak_id, ak_secret, "cn-shanghai")
  13. request = CommonRequest()
  14. request.set_method('POST')
  15. request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
  16. request.set_version('2019-02-28')
  17. request.set_action_name('CreateToken')
  18. try:
  19. response = client.do_action_with_exception(request)
  20. jss = json.loads(response)
  21. if 'Token' in jss and 'Id' in jss['Token']:
  22. token = jss['Token']['Id']
  23. expire_time = jss['Token']['ExpireTime']
  24. print(f"Token: {token}, ExpireTime: {expire_time}")
  25. return token, int(expire_time) # 返回Token和过期时间
  26. else:
  27. print("Token获取失败,响应内容: ", response)
  28. except Exception as e:
  29. print(f"获取Token时发生错误: {e}")
  30. return None, None
  31. # WebSocket服务地址
  32. URL = "wss://nls-gateway-cn-beijing.aliyuncs.com/ws/v1"
  33. APPKEY = "OKt6jogp6fRjHQVp" # 你的Appkey
  34. # 定义实时转写类
  35. class TestSt:
  36. # 静态变量用于缓存Token
  37. token_cache = {
  38. "token": None,
  39. "expire_time": None
  40. }
  41. @classmethod
  42. def get_cached_token(cls):
  43. # 检查是否已有缓存的Token且未过期
  44. if cls.token_cache["token"] and cls.token_cache["expire_time"]:
  45. current_time = int(time.time())
  46. if current_time < cls.token_cache["expire_time"]:
  47. print("使用缓存的Token")
  48. return cls.token_cache["token"]
  49. # 如果没有缓存Token或者Token已过期,重新获取
  50. new_token, expire_time = get_token()
  51. if new_token:
  52. cls.token_cache["token"] = new_token
  53. cls.token_cache["expire_time"] = expire_time
  54. print("获取新的Token")
  55. return new_token
  56. else:
  57. print("无法获取Token")
  58. return None
  59. def __init__(self, tid):
  60. self.__th = threading.Thread(target=self.__test_run)
  61. self.__id = tid
  62. self._Token = self.get_cached_token()
  63. self.sr = None
  64. print("开始")
  65. def start(self):
  66. self.__th.start()
  67. def send_audio(self, audio_data):
  68. if self.sr:
  69. print("Sending audio data of length:", len(audio_data))
  70. self.sr.send_audio(audio_data)
  71. print("Audio data sent.")
  72. def __test_run(self):
  73. print("Thread:{} start..".format(self.__id))
  74. self.sr = nls.NlsSpeechTranscriber(
  75. url=URL,
  76. token=self._Token,
  77. appkey=APPKEY,
  78. on_error=self.test_on_error,
  79. on_result_changed=self.test_on_result_chg,
  80. on_completed=self.test_on_completed
  81. )
  82. self.sr.start(
  83. aformat="pcm",
  84. enable_intermediate_result=True,
  85. enable_punctuation_prediction=True,
  86. enable_inverse_text_normalization=True
  87. )
  88. print("ASR session started.")
  89. def test_on_error(self, message, *args):
  90. print(f"Error occurred: {message}")
  91. def test_on_result_chg(self, message, *args):
  92. print(f"Result changed: {message}")
  93. def test_on_completed(self, message, *args):
  94. print(f"Recognition completed: {message}")