#!/usr/bin/env python3
# encoding:utf-8
"""SIP voice-bot agent built on pjsua2.

A ``BotAgent`` registers a range of SIP extensions, answers incoming calls
(``Account`` / ``MyCall``), streams the caller's audio to an ASR worker
(``MyAudioMediaPort``), asks a text bot what to say (``ToTextBotAgent``) and
plays the returned audio files back to the caller (``MyAudioMediaPlayer``).
"""
import os
import time
import json
import wave
import queue
import threading
import traceback
import pjsua2 as pj
from enum import Enum
from src.core.voip.constant import *
import requests
from src.core.callcenter.model import BotChatRequest, ChatResponse,ChatMessage

calls = {}

# recording_file = '/code/src/core/voip/incoming_call.wav'
# player_file1 = '/code/src/core/voip/test111.wav'
# player_file2 = '/code/src/core/voip/test222.wav'


class BotStatus(Enum):
    """States of the bot side of the conversation."""

    # Waiting for the user to speak, not timed out yet.
    wait_speaking_not_timeout = 1
    # Waiting for the user to speak; timed out but the user is still talking.
    wait_speaking_timeout_user_speaking = 2
    # The bot is currently speaking.
    speaking = 3
    # Waiting for the user to finish speaking.
    wait_user_speaking_end = 4
    # Waiting for the large language model to return its output.
    wait_gpt_return = 5


class UserStatus(Enum):
    """States of the user side of the conversation."""

    # Set once ASR reports sentenceBegin.
    speaking = 1
    # Set once ASR reports sentenceEnd.
    silence = 2


class MyAudioMediaPort(pj.AudioMediaPort):
    """Audio port that receives the caller's frames and drives the dialogue.

    Every received frame is forwarded to the ASR worker; the same callback
    also polls the ASR result queue, the reply-timeout timer and the pending
    bot-message queue of the owning call.
    """

    def __init__(self, call, aud_med=None, asr=None):
        """Bind the port to its call, the call's audio media and the ASR worker."""
        pj.AudioMediaPort.__init__(self)
        # # Open a .pcm file to save the audio stream (optionally as .wav)
        # self.wav = wave.open(f"{recording_file}", "wb")
        # self.wav.setnchannels(1)
        # self.wav.setsampwidth(2)  # assume 16-bit (2-byte) samples
        # self.wav.setframerate(16000)
        print("MyAudioMediaPort::come in, debugger")
        self.call = call
        self.aud_med = aud_med
        self.asr = asr
        self.user_asr_texts = []
        self.cur_player_file = None
        self.wait_time = 0
        self.play_start_time = None  # when the reply timer was (re)started
        self.play_complete_flag = False  # True while the reply timer is running

    def onFrameRequested(self, frame):
        """Log frame requests; this port does not produce audio itself."""
        print("Request audio frame:", frame)

    def onFrameReceived(self, frame):
        """Handle one frame of caller audio and advance the dialogue.

        Exceptions raised while advancing the dialogue are logged instead of
        propagated, so a failure in one frame does not escape the callback.
        """
        # self.wav.writeframes(bytes(frame.buf))
        # print("Received audio frame:", frame.buf, frame.size)
        if self.asr:  # forward the audio only when an ASR instance exists
            self.asr.send_audio(frame.buf)
        try:
            asr_text = self.get_asr_text()
            play_complete = self.is_play_complete()

            if asr_text and not play_complete:
                # The bot is still playing (or nothing was played yet):
                # buffer the text until playback finishes.
                self.user_asr_texts.append(asr_text)
            if asr_text and play_complete:
                self.cur_player_file = None
                self.user_asr_texts.append(asr_text)
                # A single buffered text joins to itself, so no special case
                # is needed for len == 1.
                user_asr_text = '###'.join(self.user_asr_texts)
                self.user_asr_texts.clear()
                self.call.chat("2",user_asr_text)

            # Timeout handling: playback finished and the user said nothing.
            current_time = time.time()
            if play_complete and not asr_text:
                print('ssdsdsd',play_complete,asr_text)
                self.wait_time_check(current_time)

            message_queue_size = self.call.message_queue.qsize()
            if message_queue_size > 0 and (not self.cur_player_file or play_complete):
                print('onFrameReceived:message_queue_size=', message_queue_size, 'play_complete=', play_complete, asr_text)
                next_message = self.get_player_file()
                if next_message is None:
                    # get_player_file already logged the failure; keep the
                    # current playback state untouched.
                    return
                self.cur_player_file, self.wait_time = next_message
                self.call.send_bot_speaker(self.cur_player_file)
                # Reset the completion flag and the timer so the new playback
                # is timed from the start.
                self.play_complete_flag = False
                self.play_start_time = time.time()
        except Exception:
            # Previously swallowed silently; log so failures are visible.
            traceback.print_exc()

    def is_play_complete(self):
        """Return the completion flag of the current playlist.

        Returns ``None`` when nothing is being played or the playlist is
        unknown to the call.
        """
        if self.cur_player_file:
            player_id = murmur3_32(self.cur_player_file)
            return self.call.player_complete_dict.get(player_id)
        return None

    def get_asr_text(self):
        """Pop one ASR result without blocking; return ``None`` if none is queued."""
        try:
            asr_text = self.call.user_asr_text_queue.get(block=False)
        except queue.Empty:
            return None
        print('get_asr_text:', asr_text)
        return asr_text

    def get_player_file(self):
        """Pop the next bot message and return ``(voice_urls, wait_time)``.

        Returns ``None`` (after logging) when no message is available or the
        message cannot be read.
        """
        try:
            message = self.call.message_queue.get(block=False)
            player_file = [item.voice_url for item in message.contents if item.content_type == 'voice']
            wait_time = message.wait_time
            return player_file, wait_time
        except Exception as e:
            print(f"sdsd: {e}")
            traceback.print_exc()
            return None

    def wait_time_check(self,current_time):
        """Start or check the reply timer; re-query the text bot on timeout."""
        if not hasattr(self, 'play_complete_flag'):
            self.play_complete_flag = False
            self.play_start_time = 0
        # Start timing once playback has completed.
        if not self.play_complete_flag:
            self.play_complete_flag = True
            self.play_start_time = current_time
        # Check whether the allowed waiting time has elapsed.
        if current_time - self.play_start_time > int(self.wait_time):
            self.play_complete_flag = False  # reset to avoid repeated timeouts
            self.call.chat("6", "重新请求文本机器人")


class MyAudioMediaPlayer(pj.AudioMediaPlayer):
    """Audio player that reports end-of-file through a callback."""

    def __init__(self, player_id, sink, on_complete=None):
        """Remember the player id, its sink and the completion callback."""
        pj.AudioMediaPlayer.__init__(self)
        self.player_id = player_id
        self.sink = sink
        self.on_complete = on_complete

    def onEof2(self):
        """Invoke ``on_complete(player_id)`` when playback reaches EOF."""
        # self.stopTransmit(self.sink)
        if self.on_complete:
            self.on_complete(self.player_id)


# Subclass to extend the Account and get notifications etc.
class Account(pj.Account):
    """SIP account for one extension; answers every incoming call."""

    def __init__(self, agent, user_part, **kwargs):
        """Store the owning agent, the extension and extra call kwargs."""
        pj.Account.__init__(self)
        self.agent = agent
        self.user_part = user_part
        self.calls = {}
        self.kwargs = kwargs

    @staticmethod
    def _info_fields(ai):
        """Return the account-info fields printed by the logging callbacks."""
        return (ai.id, ai.isDefault, ai.uri, ai.regIsConfigured, ai.regIsActive,
                ai.regExpiresSec, ai.regStatus, ai.regStatusText, ai.regLastErr,
                ai.onlineStatus, ai.onlineStatusText)

    def onRegState(self, prm):
        """Log registration state changes."""
        ai = self.getInfo()
        print("***OnRegState: " + prm.reason, *self._info_fields(ai))

    def onIncomingCall(self, prm):
        """Create a ``MyCall`` for the incoming call and answer it with 200 OK."""
        ai = self.getInfo()
        print("***onIncomingCall: ", prm.callId, *self._info_fields(ai))
        call = MyCall(self.agent, self, self.user_part, prm.callId, **self.kwargs)
        # Keep a reference before answering so the Python call object stays
        # alive even if answer() raises.
        self.calls[prm.callId] = call
        call_op_param = pj.CallOpParam(True)
        call_op_param.statusCode = pj.PJSIP_SC_OK
        call.answer(call_op_param)


class MyCall(pj.Call):
    """One answered call: owns the ASR worker, audio port and player."""

    def __init__(self, agent, acc, user_part, call_id, **kwargs):
        """Set up queues and start the ASR worker for this call."""
        pj.Call.__init__(self, acc, call_id)
        self.agent = agent
        self.user_part = user_part
        self.call_id = call_id
        self.kwargs = kwargs
        self.aud_med = None
        self.audio_port = None
        self.player = None
        self.asr = None
        # self.scripts = build_demo_script()
        self.user_asr_text_queue = queue.Queue(maxsize=100)
        self.message_queue = queue.Queue(maxsize=3)
        self.player_complete_dict = {}
        from src.core.voip.asr import TestSt
        self.asr = TestSt(call_id, message_receiver=self.on_receiver_asr_result)  # create the ASR instance
        self.asr.start()  # start the ASR worker
        # NOTE(review): nothing visible here stops the ASR worker when the
        # call ends — confirm whether TestSt needs an explicit stop.

    def onDtmfDigit(self, prm):
        """Queue a received DTMF digit as if it were an ASR text."""
        digit = prm.digit
        print(f"Received DTMF digit: {digit}")
        self.user_asr_text_queue.put(f"DTMF({digit})DTMF")

    def onCallState(self, prm):
        """Greet the caller on connect; release the extension on disconnect."""
        call_info = self.getInfo()
        print("Call state: ", call_info.stateText)
        # pj.PJSIP_INV_STATE_NULL
        # pj.PJSIP_INV_STATE_CALLING
        # pj.PJSIP_INV_STATE_INCOMING
        # pj.PJSIP_INV_STATE_EARLY
        # pj.PJSIP_INV_STATE_CONNECTING
        # pj.PJSIP_INV_STATE_CONFIRMED
        # pj.PJSIP_INV_STATE_DISCONNECTED
        if call_info.state == pj.PJSIP_INV_STATE_CONFIRMED:
            # The call is confirmed (connected).
            self.bot_say_hello()
        if call_info.state == pj.PJSIP_INV_STATE_DISCONNECTED:
            print("通话结束")
            # Recycle the extension after the remote side hangs up.
            self.agent.release(self.user_part)

    def onCallMediaState(self, prm):
        """Attach the receiving audio port once an audio media becomes active."""
        call_info = self.getInfo()
        print("Call Media state: ", call_info.stateText)
        for media in call_info.media:
            if media.type == pj.PJMEDIA_TYPE_AUDIO and \
                    (media.status == pj.PJSUA_CALL_MEDIA_ACTIVE):
                print("Call Media state 111: ", call_info.stateText)
                self.aud_med = self.getAudioMedia(media.index)
                try:
                    # Set up the audio channel from the caller to the bot.
                    self.receive_user_speaker()
                    # self.bot_say_hello()
                except Exception:
                    traceback.print_exc()

    def receive_user_speaker(self):
        """Create the receiving port and route the caller's audio into it."""
        self.audio_port = MyAudioMediaPort(self, self.aud_med, self.asr)
        self.audio_port.createPort("Incoming Call Port", build_audio_format())
        self.aud_med.startTransmit(self.audio_port)

    def send_bot_speaker(self, player_file):
        """Play ``player_file`` (a playlist of audio files) to the caller.

        Does nothing when ``player_file`` is empty or ``None``.
        """
        if not player_file :
            return
        player_id = murmur3_32(player_file)
        self.player_complete_dict[player_id] = False
        print('self.player_complete_dict[player_id]D:', player_id, player_file, self.player_complete_dict[player_id])
        self.player = MyAudioMediaPlayer(player_id, self.aud_med, on_complete=self.on_media_player_complete)
        # self.player.createPlayer(player_file, pj.PJMEDIA_FILE_NO_LOOP)
        self.player.createPlaylist(player_file, f'my_hello_playlist{player_id}', pj.PJMEDIA_FILE_NO_LOOP)
        self.player.startTransmit(self.aud_med)

    def on_receiver_asr_result(self, message, *args):
        """ASR callback: queue the recognised text for the audio port."""
        print('asr返回内容:',message)
        self.user_asr_text_queue.put(message)

    def on_media_player_complete(self, player_id):
        """Player callback: mark the playlist ``player_id`` as finished."""
        print('player complete')
        self.player_complete_dict[player_id] = True

    def bot_say_hello(self):
        """Ask the text bot for the opening greeting."""
        print('bot_say_hello, come in ')
        self.chat('1', user_asr_text="SAY_HELLO")

    def chat(self,event_type, user_asr_text=None):
        """Send ``user_asr_text`` to the text bot for the given event type."""
        # Call the text-bot interface.
        ToTextBotAgent(event_type,user_asr_text,self)


class ToTextBotAgent:
    """One text-bot round trip; all work happens in the constructor."""

    def __init__(self, event_type, user_asr_text, call_agent):
        """Build the request and process the (currently simulated) response.

        ``call_agent`` is the ``MyCall`` that issued the request. Nothing is
        done when ``user_asr_text`` is empty.
        """
        if not user_asr_text:
            print("ASR文本为空,终止执行。")
            return
        self.call_agent = call_agent
        self.request_data = BotChatRequest(
            node_id="1",
            user_id="139311",
            session_id="1",
            record_id="2",
            task_id="ceshi",
            asr_text=user_asr_text
        )
        # Send the request and handle the response.
        self.test_request(self.request_data,event_type)

    def to_hang(self, action_content):
        """Hang up the call that issued this request.

        ``call_agent`` is a ``MyCall``: the extension pool and the
        account-level hangup live on its ``agent`` (the ``BotAgent``), and
        the extension to hang up is the call's own ``user_part`` rather than
        an arbitrary free extension taken from the pool.
        """
        self.call_agent.agent.hangup(self.call_agent.user_part, action_content)

    def handling_message(self, message: ChatMessage):
        """Dispatch on the action code of a text-bot response."""
        print('handling_release', message)
        if not message:
            print("文本机器人接口调用失败")
            return
        action = message.action
        action_code = action.action_code
        if action_code == 'hang':
            # Hang up.
            action_content = action.action_content
            self.to_hang(action_content)
        elif action_code == 'transfer':
            # Transfer to a human agent.
            print('todo 转人工')
        elif action_code == 'normal':
            # Normal conversation: queue the message for playback.
            self.call_agent.message_queue.put(message)
            print(f"成功获取响应: ActionCode: {message.wait_time}")

    def to_quest(self, request: BotChatRequest):
        """POST ``request`` to the text bot and return the parsed response.

        Returns ``None`` on a non-200 status, a transport error, a timeout or
        a body that is not valid JSON.
        """
        # Serialise the request entity to a JSON string.
        headers = {'Content-Type': 'application/json'}
        request_json = request.to_json_string()
        # Send the POST request; the timeout keeps a stalled server from
        # blocking the caller forever.
        try:
            response = requests.post('http://example.com/api', data=request_json, headers=headers, timeout=10)  # placeholder URL
            if response.status_code == 200:
                # Success: parse the response JSON.
                response_data = response.json()
                return ChatResponse.from_json(json.dumps(response_data))  # re-serialise and parse
            # Error handling.
            print(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
            return None
        except (requests.RequestException, ValueError):
            # ValueError covers a non-JSON body on older requests versions.
            traceback.print_exc()
            return None

    # Simulates the response of the text-bot interface.
    def test_request(self, params: BotChatRequest,event_type):
        """Build a canned response for ``event_type`` and handle it."""
        print("test_request::params=", params)
        response_data = {
            "node_id": "1.0",
            "contents": [],
            "interruptable": False,
            "wait_time": "6",
            "action": {
                "action_code": "normal",
                "action_content": "正常通话"
            },
            "talk_time_out": ""
        }
        print("event_type:", event_type)
        if event_type == '1':
            response_data['contents'].append({
                "content_type": "voice",
                "content": "",
                "voice_url": '/code/src/core/voip/scripts/1_00.wav',
                "voice_content": "五一北京到上海的高铁票还有吗?"
            })
        elif event_type == '2':
            new_contents = [
                {
                    "content_type": "voice",
                    "content": "",
                    "voice_url": '/code/src/core/voip/test111.wav',
                    "voice_content": "测试第二个录音文件"
                },
                {
                    "content_type": "voice",
                    "content": "",
                    "voice_url": '/code/src/core/voip/test222.wav',
                    "voice_content": "五一北京到上海的高铁票还有吗?"
                }
            ]
            response_data['contents']= new_contents
        elif event_type == '6':
            response_data['contents'].append({
                "content_type": "voice",
                "content": "",
                "voice_url": '/code/src/core/voip/scripts/4_00.wav',
                "voice_content": "sds"
            })
        try:
            print(json.dumps(response_data['contents']))
            parsed_response = ChatMessage.from_json(response_data)
            self.handling_message(parsed_response)
        except Exception as e:
            print(f"Error in test_request: {e}")
            traceback.print_exc()  # print the full error information
            return None


class BotAgent:
    """Owns the pjsua2 endpoint, the SIP accounts and the extension pool."""

    # NOTE(review): the default password is hard-coded in the signature;
    # consider supplying it from configuration instead.
    def __init__(self, logger, user_part_range=range(1001, 1011), host=SERVE_HOST, port="5060", password="slibra@#123456"):
        """Create the endpoint and start the pjsua2 event thread."""
        self.logger = logger
        self.user_part_range, self.host, self.port, self.password = user_part_range, host, port, password
        self.user_part_pool = queue.Queue(maxsize=len(user_part_range))
        self.accounts = {}
        self.calls = {}
        self.ep = pj.Endpoint()
        self.is_stopping = False
        self._destroyed = False
        self._event_thread = threading.Thread(target=self.create_pjsua2, args=())
        self._event_thread.start()

    def create_pjsua2(self):
        """Initialise pjsua2, register every extension and run the event loop.

        Runs in the thread started by ``__init__`` and returns once
        ``is_stopping`` becomes true.
        """
        # Create and initialize the library
        ep_cfg = pj.EpConfig()
        ep_cfg.uaConfig.threadCnt = 0
        ep_cfg.uaConfig.mainThreadOnly = True
        ep_cfg.uaConfig.maxCalls = 10
        ep_cfg.uaConfig.maxAccounts = 10
        ep_cfg.medConfig.noVad = True
        ep_cfg.logConfig.level = 4
        ep_cfg.logConfig.consoleLevel = 4
        self.ep.libCreate()
        self.ep.libInit(ep_cfg)
        aud_dev_mgr = self.ep.audDevManager()
        aud_dev_mgr.setNullDev()  # use the null audio device (no real sound device needed)

        # Create SIP transport. Error handling sample is shown
        sipTpConfig = pj.TransportConfig()
        sipTpConfig.port = 30506
        self.ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, sipTpConfig)
        # Start the library
        self.ep.libStart()

        for user_part in self.user_part_range:
            acfg = pj.AccountConfig()
            acfg.idUri = f"sip:{user_part}@{self.host}:{self.port}"
            acfg.regConfig.registrarUri = f"sip:{self.host}:{self.port}"
            cred = pj.AuthCredInfo("digest", "*", f"{user_part}", 0, self.password)
            acfg.sipConfig.authCreds.append(cred)
            acfg.regConfig.timeoutSec = 86400  # registration timeout (seconds)
            acfg.regConfig.retryIntervalSec = 10  # retry interval (seconds)
            acfg.regConfig.firstRetryIntervalSec = 10  # first retry interval (seconds)
            acfg.natConfig.iceEnabled = True
            acfg.natConfig.turnEnabled = True
            acfg.natConfig.turnServer = "stun:192.168.100.159:3478"
            # acfg.natConfig.turnUsername = "username"
            # acfg.natConfig.turnPassword = "password"

            # Create the account
            acc = Account(self, user_part)
            acc.create(acfg)
            # Publish the account before the extension becomes available in
            # the pool, so register() can never pop an extension whose
            # account is not stored yet.
            self.accounts[user_part] = acc
            self.user_part_pool.put(user_part)

        while not self.is_stopping:
            self.ep.libHandleEvents(100)

    def hangup(self, user_part, reason="NORMAL_CLEARING", **sip_headers):
        """Hang up every active call of ``user_part`` and release the extension.

        Each keyword in ``sip_headers`` is sent as a ``sip_h_<name>`` SIP
        header. The extension is released even if a hangup raises.
        """
        call_op_param = pj.CallOpParam(True)
        call_op_param.statusCode = pj.PJSIP_SC_OK
        call_op_param.reason = reason
        for name, value in sip_headers.items():
            header = pj.SipHeader()
            header.hName = f"sip_h_{name}"
            header.hValue = str(value)
            call_op_param.txOption.headers.append(header)
        try:
            acc = self.accounts.get(user_part)
            if acc:
                for call in list(acc.calls.values()):
                    # Finished calls stay in acc.calls; hanging them up again
                    # would raise.
                    if call.isActive():
                        call.hangup(call_op_param)
        finally:
            # The bot hung up: recycle the extension.
            self.release(user_part)

    def register(self, **kwargs):
        """Take a free extension from the pool and attach ``kwargs`` to it.

        Blocks until an extension is available and returns it.
        """
        user_part = self.user_part_pool.get()
        acc = self.accounts.get(user_part)
        self.logger.info('register, user_part :%d, pool.size :%d', user_part, self.user_part_pool.qsize())
        if acc:
            print('register==========>', acc.getId())
            # ps = pj.PresenceStatus()
            # ps.status = pj.PJSUA_BUDDY_STATUS_ONLINE
            # ps.activity = pj.PJRPID_ACTIVITY_AWAY
            # ps.note = "Away"
            # acc.setOnlineStatus(ps)
            # acc.setRegistration(renew=True)
            acc.kwargs = kwargs
        return user_part

    def unregister(self, user_part):
        """Unregister the account of ``user_part`` and release the extension."""
        acc = self.accounts.get(user_part)
        if acc:
            acc.setRegistration(renew=False)
        # Recycle the extension.
        self.release(user_part)

    def release(self, user_part):
        """Return ``user_part`` to the pool unless it is already there."""
        if not user_part:
            self.logger.info("release, user_part is None")
            return

        # Inspect the underlying deque under the queue's own lock.
        with self.user_part_pool.mutex:
            already_pooled = user_part in self.user_part_pool.queue
        if already_pooled:
            self.logger.info("release, already exists, user_part :%d, pool.size :%d", user_part, self.user_part_pool.qsize())
            return
        self.user_part_pool.put(user_part)
        self.logger.info("release, user_part :%d, pool.size :%d", user_part, self.user_part_pool.qsize())

    def destroy(self):
        """Stop the event loop and destroy the pjsua2 library (idempotent)."""
        self.is_stopping = True
        if getattr(self, "_destroyed", False):
            return
        self._destroyed = True
        # Let the event loop finish its current poll so it is not running
        # while the library is torn down.
        worker = getattr(self, "_event_thread", None)
        if worker is not None and worker.is_alive() and worker is not threading.current_thread():
            worker.join(timeout=5)
        # Destroy the library
        ep = getattr(self, "ep", None)
        if ep is not None:
            ep.libDestroy()

    def __del__(self):
        self.destroy()


# if __name__ == '__main__':
#     import logging
#     logger = logging.getLogger('voip')
#     bot = BotAgent(logger)