realtime_meeting.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. # Copyright (c) Alibaba, Inc. and its affiliates.
  2. import logging
  3. import uuid
  4. import json
  5. import threading
  6. from nls.core import NlsCore
  7. from . import logging
  8. from . import util
  9. from nls.exception import (StartTimeoutException,
  10. StopTimeoutException,
  11. InvalidParameter)
  12. __REALTIME_MEETING_NAMESPACE__ = 'SpeechTranscriber'
  13. __REALTIME_MEETING_REQUEST_CMD__ = {
  14. 'start': 'StartTranscription',
  15. 'stop': 'StopTranscription'
  16. }
  17. __all__ = ['NlsRealtimeMeeting']
  18. class NlsRealtimeMeeting:
  19. """
  20. Api for realtime meeting
  21. """
  22. def __init__(self,
  23. url=None,
  24. on_start=None,
  25. on_sentence_begin=None,
  26. on_sentence_end=None,
  27. on_result_changed=None,
  28. on_result_translated=None,
  29. on_completed=None,
  30. on_error=None,
  31. on_close=None,
  32. callback_args=[]):
  33. '''
  34. NlsRealtimeMeeting initialization
  35. Parameters:
  36. -----------
  37. url: str
  38. meeting join url.
  39. on_start: function
  40. Callback object which is called when recognition started.
  41. on_start has two arguments.
  42. The 1st argument is message which is a json format string.
  43. The 2nd argument is *args which is callback_args.
  44. on_sentence_begin: function
  45. Callback object which is called when one sentence started.
  46. on_sentence_begin has two arguments.
  47. The 1st argument is message which is a json format string.
  48. The 2nd argument is *args which is callback_args.
  49. on_sentence_end: function
  50. Callback object which is called when sentence is end.
  51. on_sentence_end has two arguments.
  52. The 1st argument is message which is a json format string.
  53. The 2nd argument is *args which is callback_args.
  54. on_result_changed: function
  55. Callback object which is called when partial recognition result
  56. arrived.
  57. on_result_changed has two arguments.
  58. The 1st argument is message which is a json format string.
  59. The 2nd argument is *args which is callback_args.
  60. on_result_translated: function
  61. Callback object which is called when partial translation result
  62. arrived.
  63. on_result_translated has two arguments.
  64. The 1st argument is message which is a json format string.
  65. The 2nd argument is *args which is callback_args.
  66. on_completed: function
  67. Callback object which is called when recognition is completed.
  68. on_completed has two arguments.
  69. The 1st argument is message which is a json format string.
  70. The 2nd argument is *args which is callback_args.
  71. on_error: function
  72. Callback object which is called when any error occurs.
  73. on_error has two arguments.
  74. The 1st argument is message which is a json format string.
  75. The 2nd argument is *args which is callback_args.
  76. on_close: function
  77. Callback object which is called when connection closed.
  78. on_close has one arguments.
  79. The 1st argument is *args which is callback_args.
  80. callback_args: list
  81. callback_args will return in callbacks above for *args.
  82. '''
  83. if not url:
  84. raise InvalidParameter('Must provide url')
  85. self.__response_handler__ = {
  86. 'SentenceBegin': self.__sentence_begin,
  87. 'SentenceEnd': self.__sentence_end,
  88. 'TranscriptionStarted': self.__transcription_started,
  89. 'TranscriptionResultChanged': self.__transcription_result_changed,
  90. 'ResultTranslated': self.__transcription_result_translated,
  91. 'TranscriptionCompleted': self.__transcription_completed,
  92. 'TaskFailed': self.__task_failed
  93. }
  94. self.__callback_args = callback_args
  95. self.__url = url
  96. self.__start_cond = threading.Condition()
  97. self.__start_flag = False
  98. self.__on_start = on_start
  99. self.__on_sentence_begin = on_sentence_begin
  100. self.__on_sentence_end = on_sentence_end
  101. self.__on_result_changed = on_result_changed
  102. self.__on_result_translated = on_result_translated
  103. self.__on_completed = on_completed
  104. self.__on_error = on_error
  105. self.__on_close = on_close
  106. def __handle_message(self, message):
  107. logging.debug('__handle_message {}'.format(message))
  108. try:
  109. __result = json.loads(message)
  110. if __result['header']['name'] in self.__response_handler__:
  111. __handler = self.__response_handler__[
  112. __result['header']['name']]
  113. __handler(message)
  114. else:
  115. logging.error('cannot handle cmd{}'.format(
  116. __result['header']['name']))
  117. return
  118. except json.JSONDecodeError:
  119. logging.error('cannot parse message:{}'.format(message))
  120. return
  121. def __tr_core_on_open(self):
  122. logging.debug('__tr_core_on_open')
  123. def __tr_core_on_msg(self, msg, *args):
  124. logging.debug('__tr_core_on_msg:msg={} args={}'.format(msg, args))
  125. self.__handle_message(msg)
  126. def __tr_core_on_error(self, msg, *args):
  127. logging.debug('__tr_core_on_error:msg={} args={}'.format(msg, args))
  128. with self.__start_cond:
  129. self.__start_flag = False
  130. self.__start_cond.notify()
  131. if self.__on_error:
  132. self.__on_error(msg, *self.__callback_args)
  133. def __tr_core_on_close(self):
  134. logging.debug('__tr_core_on_close')
  135. if self.__on_close:
  136. self.__on_close(*self.__callback_args)
  137. with self.__start_cond:
  138. self.__start_flag = False
  139. self.__start_cond.notify()
  140. def __sentence_begin(self, message):
  141. logging.debug('__sentence_begin')
  142. if self.__on_sentence_begin:
  143. self.__on_sentence_begin(message, *self.__callback_args)
  144. def __sentence_end(self, message):
  145. logging.debug('__sentence_end')
  146. if self.__on_sentence_end:
  147. self.__on_sentence_end(message, *self.__callback_args)
  148. def __transcription_started(self, message):
  149. logging.debug('__transcription_started')
  150. if self.__on_start:
  151. self.__on_start(message, *self.__callback_args)
  152. with self.__start_cond:
  153. self.__start_flag = True
  154. self.__start_cond.notify()
  155. def __transcription_result_changed(self, message):
  156. logging.debug('__transcription_result_changed')
  157. if self.__on_result_changed:
  158. self.__on_result_changed(message, *self.__callback_args)
  159. def __transcription_result_translated(self, message):
  160. logging.debug('__transcription_result_translated')
  161. if self.__on_result_translated:
  162. self.__on_result_translated(message, *self.__callback_args)
  163. def __transcription_completed(self, message):
  164. logging.debug('__transcription_completed')
  165. self.__nls.shutdown()
  166. logging.debug('__transcription_completed shutdown done')
  167. if self.__on_completed:
  168. self.__on_completed(message, *self.__callback_args)
  169. with self.__start_cond:
  170. self.__start_flag = False
  171. self.__start_cond.notify()
  172. def __task_failed(self, message):
  173. logging.debug('__task_failed')
  174. with self.__start_cond:
  175. self.__start_flag = False
  176. self.__start_cond.notify()
  177. if self.__on_error:
  178. self.__on_error(message, *self.__callback_args)
  179. def start(self,
  180. timeout=10,
  181. ping_interval=8,
  182. ping_timeout=None,
  183. ex:dict=None):
  184. """
  185. Realtime meeting start
  186. Parameters:
  187. -----------
  188. timeout: int
  189. wait timeout for connection setup
  190. ping_interval: int
  191. send ping interval, 0 for disable ping send, default is 8
  192. ping_timeout: int
  193. timeout after send ping and recive pong, set None for disable timeout check and default is None
  194. ex: dict
  195. dict which will merge into 'payload' field in request
  196. """
  197. self.__nls = NlsCore(
  198. url=self.__url,
  199. token='default',
  200. on_open=self.__tr_core_on_open,
  201. on_message=self.__tr_core_on_msg,
  202. on_close=self.__tr_core_on_close,
  203. on_error=self.__tr_core_on_error,
  204. callback_args=[])
  205. __id4 = uuid.uuid4().hex
  206. self.__task_id = uuid.uuid4().hex
  207. __header = {
  208. 'message_id': __id4,
  209. 'task_id': self.__task_id,
  210. 'namespace': __REALTIME_MEETING_NAMESPACE__,
  211. 'name': __REALTIME_MEETING_REQUEST_CMD__['start'],
  212. 'appkey': 'default'
  213. }
  214. __payload = {
  215. }
  216. if ex:
  217. __payload.update(ex)
  218. __msg = {
  219. 'header': __header,
  220. 'payload': __payload,
  221. 'context': util.GetDefaultContext()
  222. }
  223. __jmsg = json.dumps(__msg)
  224. with self.__start_cond:
  225. if self.__start_flag:
  226. logging.debug('already start...')
  227. return
  228. self.__nls.start(__jmsg, ping_interval, ping_timeout)
  229. if self.__start_flag == False:
  230. if self.__start_cond.wait(timeout):
  231. return
  232. else:
  233. raise StartTimeoutException(f'Waiting Start over {timeout}s')
  234. def stop(self, timeout=10):
  235. """
  236. Stop realtime meeting and mark session finished
  237. Parameters:
  238. -----------
  239. timeout: int
  240. timeout for waiting completed message from cloud
  241. """
  242. __id4 = uuid.uuid4().hex
  243. __header = {
  244. 'message_id': __id4,
  245. 'task_id': self.__task_id,
  246. 'namespace': __REALTIME_MEETING_NAMESPACE__,
  247. 'name': __REALTIME_MEETING_REQUEST_CMD__['stop'],
  248. 'appkey': 'default'
  249. }
  250. __msg = {
  251. 'header': __header,
  252. 'context': util.GetDefaultContext()
  253. }
  254. __jmsg = json.dumps(__msg)
  255. with self.__start_cond:
  256. if not self.__start_flag:
  257. logging.debug('not start yet...')
  258. return
  259. self.__nls.send(__jmsg, False)
  260. if self.__start_flag == True:
  261. logging.debug('stop wait..')
  262. if self.__start_cond.wait(timeout):
  263. return
  264. else:
  265. raise StopTimeoutException(f'Waiting stop over {timeout}s')
  266. def shutdown(self):
  267. """
  268. Shutdown connection immediately
  269. """
  270. self.__nls.shutdown()
  271. def send_audio(self, pcm_data):
  272. """
  273. Send audio binary, audio size prefer 20ms length
  274. Parameters:
  275. -----------
  276. pcm_data: bytes
  277. audio binary which format created by CreateTask
  278. """
  279. __data = pcm_data
  280. with self.__start_cond:
  281. if not self.__start_flag:
  282. return
  283. try:
  284. self.__nls.send(__data, True)
  285. except ConnectionResetError as __e:
  286. logging.error('connection reset')
  287. self.__start_flag = False
  288. self.__nls.shutdown()
  289. raise __e