Sfoglia il codice sorgente

voip语音机器人模块调试

刘威 6 mesi fa
parent
commit
1947ab59b1

+ 4 - 4
src/core/callcenter/acd.py

@@ -11,10 +11,10 @@ class AcdService:
     def __init__(self, client, logger):
         self.client = client
         self.logger = logger
-        self.call_service = CallService(client, logger)
-        scheduler = BackgroundScheduler()
-        scheduler.add_job(self.try_transfer_agent, 'interval', seconds=5, max_instances=1)
-        scheduler.start()
+        # self.call_service = CallService(client, logger)
+        # scheduler = BackgroundScheduler()
+        # scheduler.add_job(self.try_transfer_agent, 'interval', seconds=5, max_instances=1)
+        # scheduler.start()
 
     def transfer_to_agent(self, call_info: CallInfo, device_id, service_id):
         pass

+ 8 - 7
src/core/callcenter/esl/client.py

@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 # encoding:utf-8
 
+import json
 import random
 import ESL
 import time
@@ -28,7 +29,7 @@ class InboundClient:
         self.logger = logger
         self.handler_table = self.scan_esl_event_handlers()
         self.default_event_handler = DefaultEslEventHandler(self, self.logger)
-        self.host, self.port, self.password = 'localhost', '8021', '4918257983818884358'
+        self.host, self.port, self.password = '192.168.124.6', '8021', '4918257983818884358'
         self.executors = {x: concurrent.futures.ThreadPoolExecutor(max_workers=1) for x in range(self.thread_num)}
 
         threading.Thread(target=self.start, args=()).start()
@@ -56,24 +57,24 @@ class InboundClient:
         return handlers
 
     def start(self):
-        print('inbound.start')
+        self.logger.info('inbound.start')
         self.con = ESL.ESLconnection(self.host, self.port, self.password)
         if self.con.connected():
-            print('inbound esl connected ... ')
+            self.logger.info('inbound esl connected ... ')
             self.con.events('plain', 'all') #CHANNEL_ORIGINATE,CHANNEL_PROGRESS,CHANNEL_PROGRESS_MEDIA,CHANNEL_ANSWER,CHANNEL_HANGUP,CUSTOM,PLAYBACK_START,PLAYBACK_STOP,DETECTED_TONE
             while not self.is_stopping:
                 e = self.con.recvEvent()
                 # if e:
-                #     print(json.loads(e.serialize('json')))
-                    # print(e.serialize('json'))
+                #     self.logger.info(json.loads(e.serialize('json')))
                 event_name = e.getHeader("Event-Name")
                 if event_name == "SERVER_DISCONNECTED":
-                    print('come in SERVER_DISCONNECTED case')
+                    self.logger.info('come in SERVER_DISCONNECTED case')
                     self.con.disconnect()
                     time.sleep(3)
                     self.start()
                 else:
-                    self.choose_thread_pool_executor(e).submit(self.process_esl_event, e)
+                    threading.Thread(target=self.process_esl_event, args=(e,)).start()
+                    # self.choose_thread_pool_executor(e).submit(self.process_esl_event, e)
 
     def choose_thread_pool_executor(self, e):
         call_id = EslEventUtil.getCallId(e)

+ 1 - 1
src/core/callcenter/esl/handler/channel_park_handler.py

@@ -27,7 +27,7 @@ class ChannelParkHandler(EslEventHandler):
         print(f"Incoming call with UUID: {call_uuid}  {service} is transfer to {destination}")
         self.inbound_client.con.execute("transfer", "1001 XML default", call_uuid)
         # self.con.execute("answer", "", call_uuid)
-        # self.con.execute("bridge", "user/1001", call_uuid)
+        # self.inbound_client.con.execute("bridge", "user/1001", call_uuid)
 
         # destination = "user/1001"
         # msg = ESL.ESLevent("sendmsg", call_uuid)

+ 1 - 0
src/core/callcenter/esl/handler/default_esl_event_handler.py

@@ -13,4 +13,5 @@ class DefaultEslEventHandler(EslEventHandler):
 
     def handle(self, address, event, coreUUID):
         self.logger.info(json.loads(event.serialize('json')))
+        pass
 

+ 3 - 1
src/core/callcenter/web.py

@@ -9,6 +9,7 @@ from flask import Flask, render_template_string
 
 from src.core.callcenter.call import CallService
 from src.core.callcenter.model import MakeCallRequest, AgentInfo
+from src.core.voip.hello import Voip
 
 dictConfig({
         "version": 1,
@@ -21,7 +22,7 @@ dictConfig({
         "handlers": {
             "console": {
                 "class": "logging.StreamHandler",  # 控制台输出
-                "level": "INFO",
+                "level": "DEBUG",
                 "formatter": "default",
             },
             "log_file": {
@@ -45,6 +46,7 @@ dictConfig({
 app = Flask(__name__)
 app.config['SECRET_KEY'] = ''
 
+voip = Voip(app.logger)
 client = InboundClient(app.logger)
 call_service = CallService(client, app.logger)
 

+ 35 - 35
src/core/datasource.py

@@ -139,38 +139,38 @@ class RedisHandler:
         self.redis.close()
 
 
-def main():
-    redis_handler = RedisHandler()
-    print(redis_handler.redis.info())
-    redis_handler.redis.xadd('stop_gen_stream', {'session_id': 3333333})
-
-    stream_name = 'stop_gen_stream'
-    consumer_group = 'stop_gen_group'
-    consumer_name = 'stop_gen_consumer'
-    # # redis_handler.redis.xgroup_create(stream_name, consumer_group, id='0', mkstream=True)
-    # # 从流中读取消息
-    # while True:
-    #     # 读取数据
-    #     data = redis_handler.redis.xreadgroup(consumer_group, consumer_name, {stream_name: '>'},
-    #                                                count=1, block=1000)
-    #     # 处理收到的消息
-    #     for stream, messages in data:
-    #         for message in messages:
-    #             message_id = message[0].decode('utf-8')
-    #             message_data = message[1]
-    #             print(f"Received message {message_id}: {message_data}")
-    #             # 在这里可以添加任何自定义的处理逻辑
-
-
-def main1():
-    mysql = MysqlHandler()
-    count = mysql.query("select count(0) as totals from t_xinyi_industry where TEST_TIME like '%00'")
-    print(count)
-
-    rows = mysql.query("select * from t_xinyi_industry where TEST_TIME like '%00' limit 10")
-    for idx, cols in enumerate(rows):
-        print(cols.get('TEST_TIME'), cols.get('JS_COD'))
-
-
-if __name__ == "__main__":
-    main()
+# def main():
+#     redis_handler = RedisHandler()
+#     print(redis_handler.redis.info())
+#     redis_handler.redis.xadd('stop_gen_stream', {'session_id': 3333333})
+#
+#     stream_name = 'stop_gen_stream'
+#     consumer_group = 'stop_gen_group'
+#     consumer_name = 'stop_gen_consumer'
+#     # # redis_handler.redis.xgroup_create(stream_name, consumer_group, id='0', mkstream=True)
+#     # # 从流中读取消息
+#     # while True:
+#     #     # 读取数据
+#     #     data = redis_handler.redis.xreadgroup(consumer_group, consumer_name, {stream_name: '>'},
+#     #                                                count=1, block=1000)
+#     #     # 处理收到的消息
+#     #     for stream, messages in data:
+#     #         for message in messages:
+#     #             message_id = message[0].decode('utf-8')
+#     #             message_data = message[1]
+#     #             print(f"Received message {message_id}: {message_data}")
+#     #             # 在这里可以添加任何自定义的处理逻辑
+#
+#
+# def main1():
+#     mysql = MysqlHandler()
+#     count = mysql.query("select count(0) as totals from t_xinyi_industry where TEST_TIME like '%00'")
+#     print(count)
+#
+#     rows = mysql.query("select * from t_xinyi_industry where TEST_TIME like '%00' limit 10")
+#     for idx, cols in enumerate(rows):
+#         print(cols.get('TEST_TIME'), cols.get('JS_COD'))
+#
+#
+# if __name__ == "__main__":
+#     main()

+ 1 - 163
src/core/server.py

@@ -2,174 +2,12 @@
 # encoding:utf-8
 
 import threading
-# from logging.config import dictConfig
-# from flask import Flask, render_template_string
-#
-#
-# dictConfig({
-#         "version": 1,
-#         "disable_existing_loggers": False,  # 不覆盖默认配置
-#         "formatters": {  # 日志输出样式
-#             "default": {
-#                 "format": "%(asctime)s - %(module)s.%(lineno)d - %(levelname)s - %(threadName)s: %(message)s"
-#             }
-#         },
-#         "handlers": {
-#             "console": {
-#                 "class": "logging.StreamHandler",  # 控制台输出
-#                 "level": "DEBUG",
-#                 "formatter": "default",
-#             },
-#             "log_file": {
-#                 "class": "logging.handlers.RotatingFileHandler",
-#                 "level": "INFO",
-#                 "formatter": "default",   # 日志输出样式对应formatters
-#                 "filename": "./logs/flask.log",  # 指定log文件目录
-#                 "maxBytes": 20*1024*1024,   # 文件最大20M
-#                 "backupCount": 10,          # 最多10个文件
-#                 "encoding": "utf8",         # 文件编码
-#             },
-#
-#         },
-#         "root": {
-#             "level": "DEBUG",  # # handler中的level会覆盖掉这里的level
-#             "handlers": ["console", "log_file"],
-#         },
-#     }
-# )
-#
-#
-# app = Flask(__name__)
-# app.config['SECRET_KEY'] = ''
-#
-#
-# @app.route('/')
-# def index():
-#     return render_template_string("""<!DOCTYPE html>
-# <html lang="en">
-# <head>
-#     <meta charset="UTF-8">
-#     <title>SocketIO Example</title>
-#     <script src="https://cdn.socket.io/4.0.0/socket.io.min.js"></script>
-# </head>
-# <body>
-#     <h1>SocketIO Test</h1>
-#     <script>
-#         var socket = io('/ws/cs-im');
-#         socket.on('response', function(msg) {
-#             alert(msg);
-#         });
-#
-#         socket.on('login', function(msg) {
-#             alert('Received from server: ' + msg);
-#         });
-#         socket.emit('login', {'appCode':'1111','userId':'1111','token':'1111'});
-#     </script>
-# </body>
-# </html>""")
-#
-#
-# @app.route('/open/agent/get-cdn-url', methods=['POST'])
-# def get_cdn_url():
-#     """获取cdn地址"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/agent/get-init-config', methods=['POST'])
-# def get_init_config():
-#     """获取初始化配置"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/agent/check-in', methods=['POST'])
-# def check_in():
-#     """坐席签入"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/agent/check-out', methods=['POST'])
-# def check_out():
-#     """坐席签出"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/agent/busy', methods=['POST'])
-# def busy():
-#     """坐席置忙"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/agent/idle', methods=['POST'])
-# def idle():
-#     """坐席置闲"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/agent/turn-on', methods=['POST'])
-# def turn_on():
-#     """接通"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/agent/hang-up', methods=['POST'])
-# def hang_up():
-#     """挂断"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/agent/agent-state', methods=['POST'])
-# def agent_state():
-#     """获取坐席状态"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/agent/manual-call', methods=['POST'])
-# def manual_call():
-#     """外呼"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/agent/manual-hang', methods=['POST'])
-# def manual_hang():
-#     """挂断"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/agent/listen', methods=['POST'])
-# def listen():
-#     """发起监听"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/agent/reload-phone', methods=['POST'])
-# def reload_phone():
-#     """重新获取分机信息"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/monitor/load-agent-group-data', methods=['POST'])
-# def load_agent_group_data():
-#     """获取监控组成员信息"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/human-service/member-active', methods=['POST'])
-# def member_active():
-#     """机器人外呼-签入人工组"""
-#     return 'Hello World!'
-#
-#
-# @app.route('/open/num/generate', methods=['POST'])
-# def num_generate():
-#     """获取 cti 流程 ID"""
-#     return 'Hello World!'
+import traceback
 
 from src.core.callcenter.web import app
 from src.core.callcenter.ws import socketio
-# from src.core.voip.hello import pjsua2_test
 
 if __name__ == '__main__':
-    # threading.Thread(target=pjsua2_test, args=()).start()
     # out = OutboundClient()
     # threading.Thread(target=out.start, args=()).start()
     # threading.Thread(target=on.start, args=()).start()

+ 100 - 57
src/core/voip/hello.py

@@ -4,15 +4,15 @@
 import time
 import json
 import wave
+import queue
 import threading
 import traceback
 import pjsua2 as pj
 from enum import Enum
-from src.core.voip.asr import TestSt
 
 calls = {}
-recording_file = '/code/voip/incoming_call.wav'
-player_file = '/code/voip/test111.wav'
+recording_file = '/code/src/core/voip/incoming_call.wav'
+player_file = '/code/src/core/voip/test111.wav'
 
 
 class BotStatus(Enum):
@@ -37,8 +37,9 @@ class UserStatus(Enum):
 
 # Subclass to extend the Account and get notifications etc.
 class Account(pj.Account):
-    def __init__(self):
+    def __init__(self, user_agent):
         pj.Account.__init__(self)
+        self.user_agent = user_agent
 
     def onRegState(self, prm):
         print("***OnRegState: " + prm.reason)
@@ -50,7 +51,7 @@ class Account(pj.Account):
         call_op_param = pj.CallOpParam(True)
         call_op_param.statusCode = pj.PJSIP_SC_OK
         call.answer(call_op_param)
-        calls[prm.callId] = call
+        self.user_agent.calls[prm.callId] = call
 
 
 class MyAudioMediaPort(pj.AudioMediaPort):
@@ -83,6 +84,7 @@ class MyAudioMediaPlayer(pj.AudioMediaPlayer):
         print('player complete')
         self.stopTransmit(self.sink)
 
+
 class MyCall(pj.Call):
 
     def __init__(self, acc, call_id):
@@ -92,6 +94,7 @@ class MyCall(pj.Call):
         self.aud_med = None
         self.recorder = None
         self.player = None  # 用于播放录音的媒体播放器
+        from src.core.voip.asr import TestSt
         self.asr = TestSt(call_id)  # 创建ASR实例
         self.asr.start()  # 启动ASR线程
 
@@ -102,6 +105,15 @@ class MyCall(pj.Call):
     def onCallState(self, prm):
         call_info = self.getInfo()
         print("Call state: ", call_info.stateText)
+
+        # pj.PJSIP_INV_STATE_NULL
+        # pj.PJSIP_INV_STATE_CALLING
+        # pj.PJSIP_INV_STATE_INCOMING
+        # pj.PJSIP_INV_STATE_EARLY
+        # pj.PJSIP_INV_STATE_CONNECTING
+        # pj.PJSIP_INV_STATE_CONFIRMED
+        # pj.PJSIP_INV_STATE_DISCONNECTED
+
         # if call_info.state == pj.PJSIP_INV_STATE_CONFIRMED:
         #     # 当呼叫状态为已确认(即接通)
         #     audio_media = self.getAudioMedia()  # 获取音频媒体对象
@@ -109,7 +121,6 @@ class MyCall(pj.Call):
         #     print ('11111111111')
         if call_info.state == pj.PJSIP_INV_STATE_DISCONNECTED:
             print("通话结束")
-            self.delete()
 
     def onCallMediaState(self, prm):
         call_info = self.getInfo()
@@ -169,66 +180,98 @@ class MyCall(pj.Call):
 
             # time.sleep(0.1)  # 每隔1秒更新一次进度
 
-    def stopMedia(self):
-        pass
-        # if self.player:
-        #     self.player.stopTransmit(self.aud_med)
-        #     self.player = None
-        # if self.recorder:
-        #     self.recorder.stopTransmit(self.aud_med)
-        #     self.recorder = None
-
-    def delete(self):
-        self.stopMedia()
-
-
-# pjsua2 test function
-def pjsua2_test():
-    # Create and initialize the library
-    ep = pj.Endpoint()
-    ep_cfg = pj.EpConfig()
-    try:
-        ep.libCreate()
-        ep.libInit(ep_cfg)
-
-        aud_dev_mgr = ep.audDevManager()
-        aud_dev_mgr.setNullDev()  # 使用虚拟音频设备(如果没有实际设备)
 
+class UserAgent:
+    def __init__(self, user_part, host="192.168.124.6", port="5060", password="slibra@#123456"):
+        self.user_part, self.host, self.port, self.password = user_part, host, port, password
+        self.calls = {}
+        self.ep = pj.Endpoint()
+        self.acc = Account(self)
+        self.is_stopping = False
+
+    def start(self):
+        threading.Thread(target=self.create_pjsua2, args=()).start()
+
+    # pjsua2 test function
+    def create_pjsua2(self):
+        # Create and initialize the library
+        ep_cfg = pj.EpConfig()
+        self.ep.libCreate()
+        self.ep.libInit(ep_cfg)
+
+        aud_dev_mgr = self.ep.audDevManager()
+        aud_dev_mgr.setNullDev()  # 使用虚拟音频设备(如果没有实际设备)
 
         # Create SIP transport. Error handling sample is shown
         sipTpConfig = pj.TransportConfig()
         sipTpConfig.port = 30506
-        ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, sipTpConfig)
-        # Start the library
-        ep.libStart()
+        self.ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, sipTpConfig)
 
+        # Start the library
+        self.ep.libStart()
         acfg = pj.AccountConfig()
-        acfg.idUri = "sip:1001@192.168.124.6:5060"
-        acfg.regConfig.registrarUri = "sip:192.168.124.6:5060"
-        cred = pj.AuthCredInfo("digest", "*", "1001", 0, "slibra@#123456")
-        # acfg.idUri = "sip:1013@test-bot.shuiditech.com:30506"
-        # acfg.regConfig.registrarUri = "sip:test-bot.shuiditech.com:30506"
-        # cred = pj.AuthCredInfo("digest", "*", "1013", 0, "000000")
+        acfg.idUri = f"sip:{self.user_part}@{self.host}:{self.port}"
+        acfg.regConfig.registrarUri = f"sip:{self.host}:{self.port}"
+        cred = pj.AuthCredInfo("digest", "*", f"{self.user_part}", 0, self.password)
         acfg.sipConfig.authCreds.append(cred)
         # Create the account
-        acc = Account()
-        acc.create(acfg)
-
-        try:
-            while True:
-                ep.libHandleEvents(100)
-        except KeyboardInterrupt:
-            print('Existing...')
-        # while True:
-        #     # Here we don't have anything else to do..
-        #     time.sleep(10);
-    finally:
+        self.acc.create(acfg)
+
+        while not self.is_stopping:
+            self.ep.libHandleEvents(100)
+
+    def hangup(self, reason="NORMAL_CLEARING", **sip_headers):
+        call_op_param = pj.CallOpParam(True)
+        call_op_param.statusCode = pj.PJSIP_SC_OK
+        call_op_param.reason = reason
+        for k, v in sip_headers:
+            call_op_param.headers.append(pj.SipHeader(f"sip_h_{k}", v))
+        for k, v in self.calls.items():
+            v.hangup(call_op_param)
+
+    def register(self, renew=False):
+        self.acc.setRegistration(renew=renew)
+
+    def destroy(self):
+        self.is_stopping = True
         # Destroy the library
-        ep.libDestroy()
+        self.ep.libDestroy()
+
+
+class UserAgentPool:
+    def __init__(self, pool_size, user_part_range=range(1000, 1019)):
+        self.pool_size = pool_size
+        self.pool = queue.Queue(maxsize=pool_size)
+
+        # 初始化连接池
+        for i in range(pool_size):
+            conn = self.create(user_part_range[i])
+            self.pool.put(conn)
+
+    def create(self, user_part, host='192.168.124.6', port='6379', password="slibra@#123456"):
+        return UserAgent(user_part, host=host, port=port, password=password)
+
+    def get(self):
+        return self.pool.get()
+
+    def release(self, conn):
+        self.pool.put(conn)
+
+    def destroy(self):
+        while not self.pool.empty():
+            agent = self.pool.get()
+            agent.destroy()
+
+
+class Voip:
+
+    def __init__(self, logger):
+        self.logger = logger
+        # self.pool = UserAgentPool(pool_size=2)
+        agent = UserAgent('1001')
+        agent.start()
 
 
-#
-# main()
-#
-if __name__ == "__main__":
-    pjsua2_test()
+# if __name__ == "__main__":
+#     voip = Voip()
+#     voip.create_pjsua2()

BIN
src/core/voip/incoming_call.wav