123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- #!/usr/bin/env python3
- # encoding:utf-8
- # from src.core.callcenter.views import app
- from . import app
- from datetime import datetime
- from flask import request, session
- from flask_socketio import SocketIO, Namespace, join_room, leave_room, emit
- from threading import Thread
- from src.core.datasource import RedisHandler
- import json
- from .constant import CENTER_AGENT_HEARTBEAT
- socketio = SocketIO(app, async_mode='gevent', cors_allowed_origins="*")
- logger = app.logger
- redis_handler = RedisHandler()
- def listen_to_redis():
- """监听 Redis 频道并将消息广播到 SocketIO 客户端"""
- pubsub = redis_handler.redis.pubsub()
- pubsub.subscribe('socket_channel')
- for message in pubsub.listen():
- if message['type'] == 'message':
- data = json.loads(message['data'])
- user_id = data.get('user_id')
- event = data.get('event')
- payload = data.get('data')
- # 通过 emit 广播消息到用户所在的房间
- # emit(event, payload, room=user_id, namespace='/ws/cs-im')
- socketio.emit(event, payload, room=user_id, namespace='/ws/cs-im')
- # 启动 Redis 监听线程
- Thread(target=listen_to_redis).start()
- # def common_down_data(user_id, data, namespace='/ws/cs-im'):
- # emit('common_down_data', data, room=user_id, namespace=namespace)
- def common_down_data(user_id, data, namespace='/ws/cs-im'):
- """推送 common_down_data 消息"""
- # 发布消息到 Redis 频道
- redis_handler.publish('socket_channel', json.dumps({
- 'event': 'common_down_data',
- 'user_id': user_id,
- 'data': data
- }))
- def common_down_cmd(user_id, data):
- """推送 common_down_cmd 消息"""
- # 发布消息到 Redis 频道
- redis_handler.publish('socket_channel', json.dumps({
- 'event': 'common_down_cmd',
- 'user_id': user_id,
- 'data': data
- }))
- # def common_down_cmd(user_id, data):
- # emit('common_down_cmd', data, room=user_id)
- class MyNamespace(Namespace):
- def on_connect(self):
- logger.info('Client connected, %s', request.sid)
- def on_login(self, msg):
- logger.info('login.Received message: %s, %s', str(msg), request.sid)
- user_id = msg.get('userId')
- session['user_id'] = user_id
- join_room(user_id)
- emit('login', request.sid)
- def on_heartbeat(self, heartbeat):
- if not heartbeat:
- return
- data = json.loads(heartbeat)
- saas_id = data.get('saas_id')
- agent_id = data.get('agent_id')
- now = datetime.now().timestamp()
- key = CENTER_AGENT_HEARTBEAT % saas_id
- redis_handler.redis.hset(key, agent_id, now)
- # logger.info("on_heartbeat %s %s", agent_id, now)
- def on_OnPrompt(self, event):
- pass
- def on_OnSocketEvent(self, event):
- pass
- def on_OnSipEvent(self, event):
- pass
- def on_OnInitalFailure(self, event):
- pass
- def on_OnCallAnswer(self, msg):
- pass
- def on_disconnect(self):
- user_id = session.get('user_id')
- leave_room(user_id)
- logger.info('Client disconnected %s', user_id)
- socketio.on_namespace(MyNamespace('/ws/cs-im'))
|