core.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. # Copyright (c) Alibaba, Inc. and its affiliates.
  2. import logging
  3. import threading
  4. from enum import Enum, unique
  5. from queue import Queue
  6. from . import logging, token, websocket
  7. from .exception import InvalidParameter, ConnectionTimeout, ConnectionUnavailable
  8. __URL__ = 'wss://nls-gateway.cn-shanghai.aliyuncs.com/ws/v1'
  9. __HEADER__ = [
  10. 'Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==',
  11. 'Sec-WebSocket-Version: 13',
  12. ]
  13. __FORMAT__ = '%(asctime)s - %(levelname)s - %(message)s'
  14. #__all__ = ['NlsCore']
  15. def core_on_msg(ws, message, args):
  16. logging.debug('core_on_msg:{}'.format(message))
  17. if not args:
  18. logging.error('callback core_on_msg with null args')
  19. return
  20. nls = args[0]
  21. nls._NlsCore__issue_callback('on_message', [message])
  22. def core_on_error(ws, message, args):
  23. logging.debug('core_on_error:{}'.format(message))
  24. if not args:
  25. logging.error('callback core_on_error with null args')
  26. return
  27. nls = args[0]
  28. nls._NlsCore__issue_callback('on_error', [message])
  29. def core_on_close(ws, close_status_code, close_msg, args):
  30. logging.debug('core_on_close')
  31. if not args:
  32. logging.error('callback core_on_close with null args')
  33. return
  34. nls = args[0]
  35. nls._NlsCore__issue_callback('on_close')
  36. def core_on_open(ws, args):
  37. logging.debug('core_on_open:{}'.format(args))
  38. if not args:
  39. logging.debug('callback with null args')
  40. ws.close()
  41. elif len(args) != 2:
  42. logging.debug('callback args not 2')
  43. ws.close()
  44. nls = args[0]
  45. nls._NlsCore__notify_on_open()
  46. nls.start(args[1], nls._NlsCore__ping_interval, nls._NlsCore__ping_timeout)
  47. nls._NlsCore__issue_callback('on_open')
  48. def core_on_data(ws, data, opcode, flag, args):
  49. logging.debug('core_on_data opcode={}'.format(opcode))
  50. if not args:
  51. logging.error('callback core_on_data with null args')
  52. return
  53. nls = args[0]
  54. nls._NlsCore__issue_callback('on_data', [data, opcode, flag])
  55. @unique
  56. class NlsConnectionStatus(Enum):
  57. Disconnected = 0
  58. Connected = 1
  59. class NlsCore:
  60. """
  61. NlsCore
  62. """
  63. def __init__(self,
  64. url=__URL__,
  65. token=None,
  66. on_open=None, on_message=None, on_close=None,
  67. on_error=None, on_data=None, asynch=False, callback_args=[]):
  68. self.__url = url
  69. self.__async = asynch
  70. if not token:
  71. raise InvalidParameter('Must provide a valid token!')
  72. else:
  73. self.__token = token
  74. self.__callbacks = {}
  75. if on_open:
  76. self.__callbacks['on_open'] = on_open
  77. if on_message:
  78. self.__callbacks['on_message'] = on_message
  79. if on_close:
  80. self.__callbacks['on_close'] = on_close
  81. if on_error:
  82. self.__callbacks['on_error'] = on_error
  83. if on_data:
  84. self.__callbacks['on_data'] = on_data
  85. if not on_open and not on_message and not on_close and not on_error:
  86. raise InvalidParameter('Must provide at least one callback')
  87. logging.debug('callback args:{}'.format(callback_args))
  88. self.__callback_args = callback_args
  89. self.__header = __HEADER__ + ['X-NLS-Token: {}'.format(self.__token)]
  90. websocket.enableTrace(True)
  91. self.__ws = websocket.WebSocketApp(self.__url,
  92. self.__header,
  93. on_message=core_on_msg,
  94. on_data=core_on_data,
  95. on_error=core_on_error,
  96. on_close=core_on_close,
  97. callback_args=[self])
  98. self.__ws.on_open = core_on_open
  99. self.__lock = threading.Lock()
  100. self.__cond = threading.Condition()
  101. self.__connection_status = NlsConnectionStatus.Disconnected
  102. def start(self, msg, ping_interval, ping_timeout):
  103. self.__lock.acquire()
  104. self.__ping_interval = ping_interval
  105. self.__ping_timeout = ping_timeout
  106. if self.__connection_status == NlsConnectionStatus.Disconnected:
  107. self.__ws.update_args(self, msg)
  108. self.__lock.release()
  109. self.__connect_before_start(ping_interval, ping_timeout)
  110. else:
  111. self.__lock.release()
  112. self.__ws.send(msg)
  113. def __notify_on_open(self):
  114. logging.debug('notify on open')
  115. with self.__cond:
  116. self.__connection_status = NlsConnectionStatus.Connected
  117. self.__cond.notify()
  118. def __issue_callback(self, which, exargs=[]):
  119. if which not in self.__callbacks:
  120. logging.error('no such callback:{}'.format(which))
  121. return
  122. if which is 'on_close':
  123. with self.__cond:
  124. self.__connection_status = NlsConnectionStatus.Disconnected
  125. self.__cond.notify()
  126. args = exargs+self.__callback_args
  127. self.__callbacks[which](*args)
  128. def send(self, msg, binary):
  129. self.__lock.acquire()
  130. if self.__connection_status == NlsConnectionStatus.Disconnected:
  131. self.__lock.release()
  132. logging.error('start before send')
  133. raise ConnectionUnavailable('Must call start before send!')
  134. else:
  135. self.__lock.release()
  136. if binary:
  137. self.__ws.send(msg, opcode=websocket.ABNF.OPCODE_BINARY)
  138. else:
  139. logging.debug('send {}'.format(msg))
  140. self.__ws.send(msg)
  141. def shutdown(self):
  142. self.__ws.close()
  143. def __run(self, ping_interval, ping_timeout):
  144. logging.debug('ws run...')
  145. self.__ws.run_forever(ping_interval=ping_interval,
  146. ping_timeout=ping_timeout)
  147. with self.__lock:
  148. self.__connection_status = NlsConnectionStatus.Disconnected
  149. logging.debug('ws exit...')
  150. def __connect_before_start(self, ping_interval, ping_timeout):
  151. with self.__cond:
  152. self.__th = threading.Thread(target=self.__run,
  153. args=[ping_interval, ping_timeout])
  154. self.__th.start()
  155. if self.__connection_status == NlsConnectionStatus.Disconnected:
  156. logging.debug('wait cond wakeup')
  157. if not self.__async:
  158. if self.__cond.wait(timeout=10):
  159. logging.debug('wakeup without timeout')
  160. return self.__connection_status == NlsConnectionStatus.Connected
  161. else:
  162. logging.debug('wakeup with timeout')
  163. raise ConnectionTimeout('Wait response timeout! Please check local network!')