Browse Source

fix: 修改socketio和pjsua阻塞问题

余尚辉 3 months ago
parent
commit
91e44dbbf5

+ 2 - 1
Dockerfile

@@ -18,6 +18,7 @@ RUN cd /code/alibabacloud-nls-python-sdk-1.0.2 && pip install -r requirements.tx
 RUN pip install -r requirements.txt
 
 EXPOSE 8000
-CMD ["gunicorn", "-w 1", "-b 0.0.0.0:8000", "src.core.server:app"]
+#CMD ["gunicorn", "-w 1", "-b 0.0.0.0:8000", "src.core.server:app"]
 #CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "--worker-class", "eventlet", "src.core.server:app"]
+CMD ["sh", "-c", "gunicorn -w 1 -b 0.0.0.0:8000 src.core.server:app & gunicorn -w 1 -b 0.0.0.0:8001 --worker-class gevent src.core.socket_server:app & wait"]
 #CMD ["tail", "-f", "/dev/null"]

+ 10 - 8
docker-compose.yml

@@ -1,6 +1,6 @@
 services:
   pjsua:
-    image: pjsua2:v2.14.1110
+    image: pjsua2:v2.14.1114
     container_name: pjsua
     restart: always
     network_mode: host
@@ -8,13 +8,15 @@ services:
       - /home/hongshan/voice-gateway-service:/code
     environment:
       - SERVE_HOST=192.168.100.159
-        #ports:
-        #- 8084:8084/udp
-        #- 8084:8084/tcp
-        #- 5000:5000/udp
-        #- 5000:5000/tcp
-        #- 8000:8000/udp
-        #- 8000:8000/tcp
+#    ports:
+#        - 8084:8084/udp
+#        - 8084:8084/tcp
+#        - 5008:5000/udp
+#        - 5008:5000/tcp
+#        - 8000:8000/udp
+#        - 8000:8000/tcp
+#        - 8001:8001/udp
+#        - 8001:8001/tcp
           #        - 5060:5060/udp
           #        - 5060:5060/tcp
           #        - 5080:5080/tcp

+ 1 - 1
requirements.txt

@@ -6,5 +6,5 @@ apscheduler
 gunicorn
 pymysql
 redis
-eventlet
+gevent
 flask_sqlalchemy

+ 11 - 1
src/core/__init__.py

@@ -8,4 +8,14 @@ def singleton(cls):
             _instance[cls] = cls()
         return _instance[cls]
 
-    return inner
+    return inner
+
+def singleton_keys(cls):
+    _instances = {}
+
+    def wrapper(*args, **kwargs):
+        if cls not in _instances:
+            _instances[cls] = cls(*args, **kwargs)  # Pass the arguments to the class constructor
+        return _instances[cls]
+
+    return wrapper

+ 1 - 1
src/core/callcenter/esl/client.py

@@ -30,7 +30,7 @@ class InboundClient:
 
     def __init__(self, agent, logger):
         self.con = None
-        self.thread_num = 32
+        self.thread_num = 12
         self.is_stopping = False
         self.logger = logger
         self.bot_agent = agent

+ 24 - 17
src/core/callcenter/push.py

@@ -1,17 +1,23 @@
 #!/usr/bin/env python3
 # encoding:utf-8
-import json
-from src.core.callcenter.enumeration import AgentScene, AgentServiceState, WorkStatus, DownEvent
-from src.core.callcenter.ws import common_down_data, common_down_cmd
 
+from src.core.callcenter.enumeration import AgentScene, AgentServiceState, WorkStatus, DownEvent
 from src.core.callcenter.dao import *
+from src.core.datasource import RedisHandler
+
 class PushHandler:
     def __init__(self, logger):
         self.logger = logger
-
-    # def push_on_agent_report(self, saas_id, out_id, scene: AgentScene, service_state: AgentServiceState):
-    #     pass
-
+    def push_to_socket_service(self,user_id, data, event='common_down_data'):
+        # 创建发布的消息
+        message = json.dumps({
+            'event': event,
+            'user_id': user_id,
+            'data': data
+        })
+        # 获取 RedisHandler 实例并发布消息到 Redis 频道
+        redis_handler = RedisHandler()
+        redis_handler.publish('socket_channel', message)
     def push_on_agent_work_report(self, saas_id, flow_id, user_id, call_id, scene: AgentScene, work_status: WorkStatus, description=""):
         data = {
             'eventName': DownEvent.ON_AGENT_WORK_REPORT.code,
@@ -23,23 +29,24 @@ class PushHandler:
                     }
         }
         self.logger.info("flowId:[%s] OnAgentWorkReport push:[%s].", flow_id, json.dumps(data))
-        new_data = {'data':json.dumps(data)}
-        common_down_data(user_id, json.dumps(new_data))
+        new_data = {'data': json.dumps(data)}
+        self.push_to_socket_service(user_id, json.dumps(new_data))
     def push_on_agent_report(self, saas_id, out_id, scene: AgentScene, service_state: AgentServiceState):
        pass
 
-    def do_push_on_agent_report(self, saas_id, out_id, scene: AgentScene, service_state: AgentServiceState):
+    def do_push_on_agent_report(self, saas_id, user_id, scene: AgentScene, service_state: AgentServiceState):
         data = {
             'eventName': DownEvent.ON_AGENT_REPORT.code,
             'ext': {
                 'scene': scene.code,
-                '_shortagentid': out_id,
+                '_shortagentid': user_id,
                 '_astate': service_state.code,
                 '_atime': datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
             }
         }
         self.logger.info("OnAgentReport event triger:", json.dumps(data))
-        common_down_data(out_id, data)
+        new_data = {'data': json.dumps(data)}
+        self.push_to_socket_service(user_id, json.dumps(new_data))
 
     def push_on_call_ring(self, saas_id, flow_id, user_id,scene, call_id, file_name, calling_no, called_no, human_service_id):
         data = {
@@ -57,7 +64,7 @@ class PushHandler:
         }
         self.logger.info("flowId:[%s] OnAgentWorkReport push:[%s].", flow_id, json.dumps(data))
         new_data = {'data': json.dumps(data)}
-        common_down_data(user_id, json.dumps(new_data))
+        self.push_to_socket_service(user_id, json.dumps(new_data))
 
     def push_on_call_end(self, saas_id, flow_id, user_id,  scene, service_direct=None, disconnect_type=0):
         data = {
@@ -71,7 +78,7 @@ class PushHandler:
         }
         self.logger.info("flowId:[%s] OnAgentWorkReport push:[%s].", flow_id, json.dumps(data))
         new_data = {'data': json.dumps(data)}
-        common_down_data(user_id, json.dumps(new_data))
+        self.push_to_socket_service(user_id, json.dumps(new_data))
 
     def push_on_ring_start(self, saas_id, flow_id, user_id, scene, call_id=None):
         data = {
@@ -84,7 +91,7 @@ class PushHandler:
         }
         self.logger.info("flowId:[%s] push_on_ring_start push:[%s].", flow_id, json.dumps(data))
         new_data = {'data': json.dumps(data)}
-        common_down_data(user_id, json.dumps(new_data))
+        self.push_to_socket_service(user_id, json.dumps(new_data))
 
     def push_on_ring_end(self, saas_id, flow_id, user_id, scene, call_id):
         data = {
@@ -97,7 +104,7 @@ class PushHandler:
         }
         self.logger.info("flowId:[%s] push_on_ring_end push:[%s].", flow_id, json.dumps(data))
         new_data = {'data': json.dumps(data)}
-        common_down_data(user_id, json.dumps(new_data))
+        self.push_to_socket_service(user_id, json.dumps(new_data))
 
     def push_answer_call(self, saas_id,flow_id, out_id, call_id, scene, service_direct,work_status,user_id ):
         data = {
@@ -112,4 +119,4 @@ class PushHandler:
         }
         self.logger.info("flowId:[%s] push_on_ring_end push:[%s].", flow_id, json.dumps(data))
         new_data = {'data': json.dumps(data)}
-        common_down_data(user_id, json.dumps(new_data))
+        self.push_to_socket_service(user_id, json.dumps(new_data))

+ 4 - 4
src/core/callcenter/views.py

@@ -10,13 +10,13 @@ from flask import Flask, request, render_template_string
 from src.core.callcenter.api import AgentCallRequest, AgentActionRequest, HangupCallRequest, \
     HumanServiceQueryRequest
 
-# from src.core.voip.bot import BotAgent
+from src.core.voip.bot import BotAgent
 from src.core.callcenter.call import CallService
 from src.core.callcenter.agent import AgentService, AgentOperService
 from src.core.callcenter.esl.client import InboundClient, OutboundClient
 
-# agent = BotAgent(app.logger)
-agent, _outbound_client = None, None
+agent = BotAgent(app.logger)
+# agent, inbound_client = None, None
 inbound_client = InboundClient(agent, app.logger)
 # outbound_client = OutboundClient(agent, app.logger)
 call_service = CallService(inbound_client, app.logger)
@@ -37,7 +37,7 @@ def index():
 <body>
     <h1>SocketIO Test</h1>
     <script>
-        var socket = io('/ws/cs-im');
+        var socket = io('http://localhost:8001/ws/cs-im');
         socket.on('response', function(msg) {
             alert(msg);
         });

+ 47 - 9
src/core/callcenter/ws.py

@@ -1,19 +1,57 @@
 #!/usr/bin/env python3
 # encoding:utf-8
 
-from src.core.callcenter.views import app
+# from src.core.callcenter.views import app
+from . import app
 from flask import request, session
 from flask_socketio import SocketIO, Namespace, join_room, leave_room, emit
-
-socketio = SocketIO(app,  async_mode='eventlet', cors_allowed_origins="*")
-
-
+from threading import Thread
+from src.core.datasource import RedisHandler
+import json
+
+socketio = SocketIO(app,  async_mode='gevent', cors_allowed_origins="*")
+
+redis_handler = RedisHandler()
+
+def listen_to_redis():
+    """监听 Redis 频道并将消息广播到 SocketIO 客户端"""
+    pubsub = redis_handler.redis.pubsub()
+    pubsub.subscribe('socket_channel')
+
+    for message in pubsub.listen():
+        if message['type'] == 'message':
+            data = json.loads(message['data'])
+            user_id = data.get('user_id')
+            event = data.get('event')
+            payload = data.get('data')
+            # 通过 emit 广播消息到用户所在的房间
+            # emit(event, payload, room=user_id, namespace='/ws/cs-im')
+            socketio.emit(event, payload, room=user_id, namespace='/ws/cs-im')
+
+# 启动 Redis 监听线程
+Thread(target=listen_to_redis).start()
+
+# def common_down_data(user_id, data, namespace='/ws/cs-im'):
+#     emit('common_down_data', data, room=user_id, namespace=namespace)
 def common_down_data(user_id, data, namespace='/ws/cs-im'):
-    emit('common_down_data', data, room=user_id, namespace=namespace)
-
-
+    """推送 common_down_data 消息"""
+    # 发布消息到 Redis 频道
+    redis_handler.publish('socket_channel', json.dumps({
+        'event': 'common_down_data',
+        'user_id': user_id,
+        'data': data
+    }))
 def common_down_cmd(user_id, data):
-    emit('common_down_cmd', data, room=user_id)
+    """推送 common_down_cmd 消息"""
+    # 发布消息到 Redis 频道
+    redis_handler.publish('socket_channel', json.dumps({
+        'event': 'common_down_cmd',
+        'user_id': user_id,
+        'data': data
+    }))
+
+# def common_down_cmd(user_id, data):
+#     emit('common_down_cmd', data, room=user_id)
 
 
 class MyNamespace(Namespace):

+ 3 - 0
src/core/datasource.py

@@ -209,6 +209,9 @@ class RedisHandler:
         if expire:
             self.redis.expire(newkey, expire)
 
+    def publish(self, channel, message):
+        """发布消息到指定频道"""
+        self.redis.publish(channel, message)
     def __del__(self):
         self.redis.close()
 

+ 3 - 3
src/core/server.py

@@ -2,10 +2,10 @@
 # encoding:utf-8
 
 from src.core.callcenter.views import app
-from src.core.callcenter.ws import socketio
+# from src.core.callcenter.ws import socketio
 # import multiprocessing
 
 if __name__ == '__main__':
     # multiprocessing.set_start_method('spawn')
-    socketio.run(app, host='0.0.0.0', port=8000, allow_unsafe_werkzeug=True, debug=True)
-    # app.run(host='0.0.0.0', port=8000, debug=True)
+    # socketio.run(app, host='0.0.0.0', port=8000, allow_unsafe_werkzeug=True, debug=True)
+    app.run(host='0.0.0.0', port=8000, debug=True)

+ 12 - 0
src/core/socket_server.py

@@ -0,0 +1,12 @@
+#!/usr/bin/env python3
+# encoding:utf-8
+
+# from src.core.callcenter.views import app
+from src.core.callcenter.ws import socketio, app
+
+
+if __name__ == '__main__':
+    socketio.run(app, host='0.0.0.0', port=8001, allow_unsafe_werkzeug=True, debug=True)
+    # app.run(host='0.0.0.0', port=8000, debug=True)
+
+

+ 2 - 1
src/core/voip/bot.py

@@ -17,6 +17,7 @@ from src.core.voip.constant import *
 import requests
 from src.core.callcenter.api import BotChatRequest,ChatMessage
 
+from src.core import singleton_keys
 
 calls = {}
 # recording_file = '/code/src/core/voip/incoming_call.wav'
@@ -437,7 +438,7 @@ class ToTextBotAgent:
             traceback.print_exc()  # 打印完整的错误信息
             return None
 
-
+@singleton_keys
 class BotAgent:
 
     def __init__(self, logger, user_part_range=range(1001, 1011), host=SIP_SERVER, port="5060", password="slibra@#123456"):