ws.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. #!/usr/bin/env python3
  2. # encoding:utf-8
  3. # from src.core.callcenter.views import app
  4. from . import app
  5. from datetime import datetime
  6. from flask import request, session
  7. from flask_socketio import SocketIO, Namespace, join_room, leave_room, emit
  8. from threading import Thread
  9. from src.core.datasource import RedisHandler
  10. import json
  11. from .constant import CENTER_AGENT_HEARTBEAT
  12. socketio = SocketIO(app, async_mode='gevent', cors_allowed_origins="*")
  13. logger = app.logger
  14. redis_handler = RedisHandler()
  15. def listen_to_redis():
  16. """监听 Redis 频道并将消息广播到 SocketIO 客户端"""
  17. pubsub = redis_handler.redis.pubsub()
  18. pubsub.subscribe('socket_channel')
  19. for message in pubsub.listen():
  20. if message['type'] == 'message':
  21. data = json.loads(message['data'])
  22. user_id = data.get('user_id')
  23. event = data.get('event')
  24. payload = data.get('data')
  25. # 通过 emit 广播消息到用户所在的房间
  26. # emit(event, payload, room=user_id, namespace='/ws/cs-im')
  27. socketio.emit(event, payload, room=user_id, namespace='/ws/cs-im')
  28. # 启动 Redis 监听线程
  29. Thread(target=listen_to_redis).start()
  30. # def common_down_data(user_id, data, namespace='/ws/cs-im'):
  31. # emit('common_down_data', data, room=user_id, namespace=namespace)
  32. def common_down_data(user_id, data, namespace='/ws/cs-im'):
  33. """推送 common_down_data 消息"""
  34. # 发布消息到 Redis 频道
  35. redis_handler.publish('socket_channel', json.dumps({
  36. 'event': 'common_down_data',
  37. 'user_id': user_id,
  38. 'data': data
  39. }))
  40. def common_down_cmd(user_id, data):
  41. """推送 common_down_cmd 消息"""
  42. # 发布消息到 Redis 频道
  43. redis_handler.publish('socket_channel', json.dumps({
  44. 'event': 'common_down_cmd',
  45. 'user_id': user_id,
  46. 'data': data
  47. }))
  48. # def common_down_cmd(user_id, data):
  49. # emit('common_down_cmd', data, room=user_id)
  50. class MyNamespace(Namespace):
  51. def on_connect(self):
  52. logger.info('Client connected, %s', request.sid)
  53. def on_login(self, msg):
  54. logger.info('login.Received message: %s, %s', str(msg), request.sid)
  55. user_id = msg.get('userId')
  56. session['user_id'] = user_id
  57. join_room(user_id)
  58. emit('login', request.sid)
  59. def on_heartbeat(self, heartbeat):
  60. if not heartbeat:
  61. return
  62. data = json.loads(heartbeat)
  63. saas_id = data.get('saas_id')
  64. agent_id = data.get('agent_id')
  65. now = datetime.now().timestamp()
  66. key = CENTER_AGENT_HEARTBEAT % saas_id
  67. redis_handler.redis.hset(key, agent_id, now)
  68. # logger.info("on_heartbeat %s %s", agent_id, now)
  69. def on_OnPrompt(self, event):
  70. pass
  71. def on_OnSocketEvent(self, event):
  72. pass
  73. def on_OnSipEvent(self, event):
  74. pass
  75. def on_OnInitalFailure(self, event):
  76. pass
  77. def on_OnCallAnswer(self, msg):
  78. pass
  79. def on_disconnect(self):
  80. user_id = session.get('user_id')
  81. leave_room(user_id)
  82. logger.info('Client disconnected %s', user_id)
  83. socketio.on_namespace(MyNamespace('/ws/cs-im'))