real_time_voice.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. #! /usr/bin/env python
  2. # coding=utf-8
  3. import os
  4. import time
  5. import json
  6. from aliyunsdkcore.client import AcsClient
  7. from aliyunsdkcore.request import CommonRequest
  8. import threading
  9. import sys
  10. import nls # 引入阿里云语音识别库
  11. def gettoken ():
  12. ak_id = os.getenv('ALIYUN_AK_ID')
  13. ak_secret = os.getenv('ALIYUN_AK_SECRET')
  14. print(f"AK_ID: {ak_id}, AK_SECRET: {ak_secret}")
  15. # 创建AcsClient实例
  16. client = AcsClient(
  17. ak_id,
  18. ak_secret,
  19. "cn-shanghai"
  20. );
  21. # 创建request,并设置参数。
  22. request = CommonRequest()
  23. request.set_method('POST')
  24. request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
  25. request.set_version('2019-02-28')
  26. request.set_action_name('CreateToken')
  27. try:
  28. response = client.do_action_with_exception(request)
  29. print(response)
  30. jss = json.loads(response)
  31. if 'Token' in jss and 'Id' in jss['Token']:
  32. token = jss['Token']['Id']
  33. expireTime = jss['Token']['ExpireTime']
  34. print("token = " + token)
  35. print("expireTime = " + str(expireTime))
  36. return token
  37. except Exception as e:
  38. print(e)
  39. # WebSocket服务地址,提供语音转写服务
  40. URL = "wss://nls-gateway-cn-shanghai.aliyuncs.com/ws/v1"
  41. # 在实际使用中,需替换成你自己的TOKEN和APPKEY
  42. TOKEN = gettoken() # 参考https://help.aliyun.com/document_detail/450255.html获取token
  43. print("token=" + TOKEN)
  44. APPKEY = "OKt6jogp6fRjHQVp" # 获取Appkey请前往控制台:https://nls-portal.console.aliyun.com/applist
  45. # 定义测试类,用于处理音频实时转写
  46. class TestSt:
  47. def __init__(self, tid, test_file):
  48. # 初始化测试线程
  49. self.__th = threading.Thread(target=self.__test_run)
  50. self.__id = tid # 线程ID
  51. self.__test_file = test_file # 测试音频文件路径
  52. def loadfile(self, filename):
  53. # 加载音频文件,读取成二进制数据
  54. with open(filename, "rb") as f:
  55. self.__data = f.read()
  56. def start(self):
  57. # 加载文件并启动线程
  58. self.loadfile(self.__test_file)
  59. self.__th.start()
  60. # 回调函数:句子开始时调用
  61. def test_on_sentence_begin(self, message, *args):
  62. print("test_on_sentence_begin:{}".format(message))
  63. # 回调函数:句子结束时调用
  64. def test_on_sentence_end(self, message, *args):
  65. print("test_on_sentence_end:{}".format(message))
  66. # 回调函数:任务开始时调用
  67. def test_on_start(self, message, *args):
  68. print("test_on_start:{}".format(message))
  69. # 回调函数:发生错误时调用
  70. def test_on_error(self, message, *args):
  71. print("on_error args=>{}".format(args))
  72. # 回调函数:任务关闭时调用
  73. def test_on_close(self, *args):
  74. print("on_close: args=>{}".format(args))
  75. # 回调函数:中间结果变化时调用
  76. def test_on_result_chg(self, message, *args):
  77. print("test_on_chg:{}".format(message))
  78. # 回调函数:任务完成时调用
  79. def test_on_completed(self, message, *args):
  80. print("on_completed:args=>{} message=>{}".format(args, message))
  81. # 私有方法:运行测试
  82. def __test_run(self):
  83. print("thread:{} start..".format(self.__id))
  84. # 初始化语音转写对象,绑定回调函数
  85. sr = nls.NlsSpeechTranscriber(
  86. url=URL,
  87. token=TOKEN,
  88. appkey=APPKEY,
  89. on_sentence_begin=self.test_on_sentence_begin,
  90. on_sentence_end=self.test_on_sentence_end,
  91. on_start=self.test_on_start,
  92. on_result_changed=self.test_on_result_chg,
  93. on_completed=self.test_on_completed,
  94. on_error=self.test_on_error,
  95. on_close=self.test_on_close,
  96. callback_args=[self.__id]
  97. )
  98. print("{}: session start".format(self.__id))
  99. # 启动转写会话,设置音频格式和相关选项
  100. r = sr.start(
  101. aformat="pcm",
  102. enable_intermediate_result=True,
  103. enable_punctuation_prediction=True,
  104. enable_inverse_text_normalization=True
  105. )
  106. # 每次读取640字节的音频数据并发送
  107. self.__slices = zip(*(iter(self.__data),) * 640)
  108. for i in self.__slices:
  109. sr.send_audio(bytes(i))
  110. time.sleep(0.01) # 模拟实时发送音频数据,避免数据过快发送
  111. # 控制传输,发送自定义的控制数据
  112. sr.ctrl(ex={"test": "tttt"})
  113. time.sleep(1) # 等待一段时间
  114. # 停止转写会话
  115. r = sr.stop()
  116. print("{}: sr stopped:{}".format(self.__id, r))
  117. time.sleep(1)
  118. # 多线程测试函数,默认创建500个线程
  119. def multiruntest(num=500):
  120. for i in range(0, num):
  121. name = "thread" + str(i) # 每个线程的名称
  122. t = TestSt(name, "./test111.wav") # 创建TestSt实例,指定音频文件路径
  123. t.start() # 启动线程
  124. # 禁用详细日志
  125. nls.enableTrace(False)
  126. # 运行测试,启动1个线程进行测试
  127. multiruntest(1)