#!/usr/bin/env python3 # encoding:utf-8 import os import time import json import wave import queue import threading import traceback import sys import pjsua2 as pj from enum import Enum from src.core.datasource import SIP_SERVER, SERVE_HOST from src.core.voip.constant import * import requests from src.core.callcenter.api import BotChatRequest,ChatMessage from src.core import singleton_keys from src.core.callcenter.snowflake import Snowflake calls = {} # recording_file = '/code/src/core/voip/incoming_call.wav' # player_file1 = '/code/src/core/voip/test111.wav' # player_file2 = '/code/src/core/voip/test222.wav' class BotStatus(Enum): # 等待用户讲话,未超时 wait_speaking_not_timeout = 1 # 等待用户讲话,已超时但用户仍在持续讲话 wait_speaking_timeout_user_speaking = 2 # 机器人讲话过程中 speaking = 3 # 等待用户讲话结束 wait_user_speaking_end = 4 # 等待大模型输出 wait_gpt_return = 5 class UserStatus(Enum): # ASR返回值为sentenceBegin后将状态置为Speaking speaking = 1 # ASR返回值为sentenceEnd后,将状态置为Silence silence = 2 class MyAudioMediaPort(pj.AudioMediaPort): def __init__(self, call, aud_med=None, asr=None): pj.AudioMediaPort.__init__(self) # # 打开一个 .pcm 文件来保存音频流(可选:保存为 .wav) # self.wav = wave.open(f"{recording_file}", "wb") # self.wav.setnchannels(1) # self.wav.setsampwidth(2) # 假设每个样本是 16 位(2 字节) # self.wav.setframerate(16000) print("MyAudioMediaPort::come in, debugger") self.call = call self.aud_med = aud_med self.asr = asr self.user_asr_texts = [] def onFrameRequested(self, frame): print("Request audio frame:", frame) def onFrameReceived(self, frame): # self.wav.writeframes(bytes(frame.buf)) # print("Received audio frame:", frame.buf, frame.size) if self.asr: # 如果ASR实例存在,则发送音频数据 self.asr.send_audio(frame.buf) try: asr_text = self.get_asr_text() play_complete = self.call.is_play_complete() current_time = time.time() # 实时当前时间 if self.call.inputType == '1.0': time_difference = int(current_time - self.call.inputLongStart) # print('current_time - self.call.inputLongStart:',time_difference > 35, self.call.txtLock , play_complete) if int(current_time - self.call.inputLongStart) > 35 and play_complete and asr_text: self.user_asr_texts.append(f"DTMF({self.call.digit})DTMF") user_asr_text = asr_text if len(self.user_asr_texts) == 1 else '###'.join(self.user_asr_texts) self.user_asr_texts.clear() self.call.chat(user_asr_text) # print("测试超长", user_asr_text) elif asr_text: self.user_asr_texts.append(asr_text) if time_difference > int(self.call.wait_time): self.call.reset_wait_time() else: if asr_text and not play_complete: self.user_asr_texts.append(asr_text) if (asr_text and play_complete) or (play_complete and self.user_asr_texts): if asr_text: # 等价于 if asr_text is not None and asr_text != "" self.user_asr_texts.append(asr_text) # self.user_asr_texts.append(asr_text) user_asr_text = asr_text if len(self.user_asr_texts) == 1 else '###'.join(self.user_asr_texts) self.user_asr_texts.clear() self.call.chat(user_asr_text) if self.call.wait_time and self.call.wait_time != "0" and play_complete and not asr_text: self.call.wait_time_check(current_time, self.call.wait_time) message_queue_size = self.call.message_queue.qsize() if (message_queue_size > 0 and not self.call.cur_player_file) or (message_queue_size > 0 and play_complete): print('onFrameReceived:message_queue_size=', message_queue_size, 'play_complete=', play_complete, asr_text) self.call.cur_player_file, self.call.wait_time, self.call.inputType,self.call.action, self.call.node_id = self.get_player_file() # 重置播放完成标志和超时计时器,确保新的播放从头开始计时 self.call.reset_wait_time() # self.call.txtLock = False self.call.send_bot_speaker(self.call.cur_player_file) except: pass def get_asr_text(self): try: asr_text = self.call.user_asr_text_queue.get(block=False) print('get_asr_text:', asr_text) return asr_text except: pass def get_player_file(self): try: message = self.call.message_queue.get(block=False) player_file = [item.voice_url for item in message.contents if item.content_type == 'voice'] print('get_player_file:', player_file,message.wait_time, message.inputType, message.action, message.node_id) return player_file, message.wait_time, message.inputType, message.action, message.node_id except Exception as e: traceback.print_exc() class MyAudioMediaPlayer(pj.AudioMediaPlayer): def __init__(self, player_id, sink, on_complete=None): pj.AudioMediaPlayer.__init__(self) self.player_id = player_id self.sink = sink self.on_complete = on_complete def onEof2(self): # self.stopTransmit(self.sink) if self.on_complete: self.on_complete(self.player_id) # Subclass to extend the Account and get notifications etc. class Account(pj.Account): def __init__(self, agent, user_part, **kwargs): pj.Account.__init__(self) self.agent = agent self.user_part = user_part self.calls = {} self.kwargs = kwargs def onRegState(self, prm): ai = self.getInfo() print("***OnRegState: " + prm.reason, ai.id,ai.isDefault,ai.uri,ai.regIsConfigured,ai.regIsActive,ai.regExpiresSec,ai.regStatus,ai.regStatusText,ai.regLastErr,ai.onlineStatus,ai.onlineStatusText) def onIncomingCall(self, prm): ai = self.getInfo() print("***onIncomingCall: ", prm.callId, ai.id,ai.isDefault,ai.uri,ai.regIsConfigured,ai.regIsActive,ai.regExpiresSec,ai.regStatus,ai.regStatusText,ai.regLastErr,ai.onlineStatus,ai.onlineStatusText) call = MyCall(self.agent, self, self.user_part, prm.callId, **self.kwargs) call_op_param = pj.CallOpParam(True) call_op_param.statusCode = pj.PJSIP_SC_OK call.answer(call_op_param) self.calls[prm.callId] = call class MyCall(pj.Call): def __init__(self, agent, acc, user_part, call_id, **kwargs): pj.Call.__init__(self, acc, call_id) self.agent = agent self.user_part = user_part self.call_id = call_id self.kwargs = kwargs self.aud_med = None self.audio_port = None self.player = None self.asr = None self.session_id = kwargs.get('variable_sip_h_P-LIBRA-CallId') print("self.session_id:", self.session_id) # self.scripts = build_demo_script() self.user_asr_text_queue = queue.Queue(maxsize=100) self.message_queue = queue.Queue(maxsize=3) self.player_complete_dict = {} self.wait_time = None self.inputType = None #记录按键类型 1为长按键类型 self.digit = '' # 存储长按键内容 self.action = None self.node_id= 'start' self.cur_player_file = None #当前播放的文件 from src.core.voip.asr import TestSt self.asr = TestSt(call_id, message_receiver=self.on_receiver_asr_result) # 创建ASR实例 self.asr.start() # 启动ASR线程 self.call_phone, self.callIdString = self.get_phone() # 超时设置 self.play_start_time = time.time() # 倒计时开始时间 self.play_complete_flag = False # 倒计时开始标志 self.txtLock = False self.inputLongStart = time.time() #长按键开始时间 def wait_time_check(self, current_time, wait_time): try: # 确保 wait_time 是整数类型 wait_time = int(wait_time) # 如果播放尚未完成,重置标志并返回 if not self.play_complete_flag: self.play_complete_flag = True self.play_start_time = current_time # print(f"开始计时: {self.play_start_time}") elapsed_time = int(current_time - self.play_start_time) print(f"当前时间: {current_time}, 已过时间: {elapsed_time}, 最大等待时间: {wait_time}") if elapsed_time >= wait_time: # self.user_asr_text_queue.put("ASR408error") self.chat('ASR408error') except ValueError as e: print(f"无效的等待时间参数: {wait_time}, 错误: {e}") self.reset_wait_time() def reset_wait_time(self): self.play_complete_flag = False # 重置播放完成标志 self.play_start_time = None # 重置开始计时时间 self.digit = '' def get_phone(self): import re call_info = self.getInfo() match = re.match(r'"(\d+)" ', acc.getId()) # ps = pj.PresenceStatus() # ps.status = pj.PJSUA_BUDDY_STATUS_ONLINE # ps.activity = pj.PJRPID_ACTIVITY_AWAY # ps.note = "Away" # acc.setOnlineStatus(ps) # acc.setRegistration(renew=True) acc.kwargs = kwargs return user_part def unregister(self, user_part): acc = self.accounts.get(user_part) if acc: acc.setRegistration(renew=False) # 用户远程挂机回收分机号 self.release(user_part) def release(self, user_part): if not user_part: self.logger.info("release, user_part is None") return def element_in_queue(q, element): with q.mutex: # 确保线程安全 for item in list(q.queue): # 将队列转换为列表进行遍历 if item == element: return True return False if element_in_queue(self.user_part_pool, user_part): self.logger.info("release, already exists, user_part :%d, pool.size :%d", user_part, self.user_part_pool.qsize()) return self.user_part_pool.put(user_part) self.logger.info("release, user_part :%d, pool.size :%d", user_part, self.user_part_pool.qsize()) def destroy(self): self.is_stopping = True # Destroy the library self.ep.libDestroy() def __del__(self): self.destroy() # if __name__ == '__main__': # import logging # logger = logging.getLogger('voip') # bot = BotAgent(logger)