#!/usr/bin/env python3 # encoding:utf-8 # from src.core.callcenter.views import app from . import app from datetime import datetime from flask import request, session from flask_socketio import SocketIO, Namespace, join_room, leave_room, emit from threading import Thread from src.core.datasource import RedisHandler import json from .constant import CENTER_AGENT_HEARTBEAT socketio = SocketIO(app, async_mode='gevent', cors_allowed_origins="*") logger = app.logger redis_handler = RedisHandler() def listen_to_redis(): """监听 Redis 频道并将消息广播到 SocketIO 客户端""" pubsub = redis_handler.redis.pubsub() pubsub.subscribe('socket_channel') for message in pubsub.listen(): if message['type'] == 'message': data = json.loads(message['data']) user_id = data.get('user_id') event = data.get('event') payload = data.get('data') # 通过 emit 广播消息到用户所在的房间 # emit(event, payload, room=user_id, namespace='/ws/cs-im') socketio.emit(event, payload, room=user_id, namespace='/ws/cs-im') # 启动 Redis 监听线程 Thread(target=listen_to_redis).start() # def common_down_data(user_id, data, namespace='/ws/cs-im'): # emit('common_down_data', data, room=user_id, namespace=namespace) def common_down_data(user_id, data, namespace='/ws/cs-im'): """推送 common_down_data 消息""" # 发布消息到 Redis 频道 redis_handler.publish('socket_channel', json.dumps({ 'event': 'common_down_data', 'user_id': user_id, 'data': data })) def common_down_cmd(user_id, data): """推送 common_down_cmd 消息""" # 发布消息到 Redis 频道 redis_handler.publish('socket_channel', json.dumps({ 'event': 'common_down_cmd', 'user_id': user_id, 'data': data })) # def common_down_cmd(user_id, data): # emit('common_down_cmd', data, room=user_id) class MyNamespace(Namespace): def on_connect(self): logger.info('Client connected, %s', request.sid) def on_login(self, msg): logger.info('login.Received message: %s, %s', str(msg), request.sid) user_id = msg.get('userId') session['user_id'] = user_id join_room(user_id) emit('login', request.sid) def on_heartbeat(self, heartbeat): if not heartbeat: return data = json.loads(heartbeat) saas_id = data.get('saas_id') agent_id = data.get('agent_id') now = datetime.now().timestamp() key = CENTER_AGENT_HEARTBEAT % saas_id redis_handler.redis.hset(key, agent_id, now) # logger.info("on_heartbeat %s %s", agent_id, now) def on_OnPrompt(self, event): pass def on_OnSocketEvent(self, event): pass def on_OnSipEvent(self, event): pass def on_OnInitalFailure(self, event): pass def on_OnCallAnswer(self, msg): pass def on_disconnect(self): user_id = session.get('user_id') leave_room(user_id) logger.info('Client disconnected %s', user_id) socketio.on_namespace(MyNamespace('/ws/cs-im'))