#!/usr/bin/env python3 # encoding:utf-8 import os import time import json import wave import queue import threading import traceback import sys import pjsua2 as pj from enum import Enum from src.core.callcenter.cache import Cache from src.core.datasource import SIP_SERVER, SERVE_HOST from src.core.voip.constant import * import requests from src.core.callcenter.api import BotChatRequest,ChatMessage from src.core import singleton_keys from src.core.callcenter.snowflake import Snowflake calls = {} # recording_file = '/code/src/core/voip/incoming_call.wav' # player_file1 = '/code/src/core/voip/test111.wav' # player_file2 = '/code/src/core/voip/test222.wav' class BotStatus(Enum): # 等待用户讲话,未超时 wait_speaking_not_timeout = 1 # 等待用户讲话,已超时但用户仍在持续讲话 wait_speaking_timeout_user_speaking = 2 # 机器人讲话过程中 speaking = 3 # 等待用户讲话结束 wait_user_speaking_end = 4 # 等待大模型输出 wait_gpt_return = 5 class UserStatus(Enum): # ASR返回值为sentenceBegin后将状态置为Speaking speaking = 1 # ASR返回值为sentenceEnd后,将状态置为Silence silence = 2 class MyAudioMediaPort(pj.AudioMediaPort): def __init__(self, call, aud_med=None, asr=None): pj.AudioMediaPort.__init__(self) # # 打开一个 .pcm 文件来保存音频流(可选:保存为 .wav) # self.wav = wave.open(f"{recording_file}", "wb") # self.wav.setnchannels(1) # self.wav.setsampwidth(2) # 假设每个样本是 16 位(2 字节) # self.wav.setframerate(16000) print("MyAudioMediaPort::come in, debugger") self.call = call self.aud_med = aud_med self.asr = asr self.user_asr_texts = [] def onFrameRequested(self, frame): print("Request audio frame:", frame) def onFrameReceived(self, frame): # self.wav.writeframes(bytes(frame.buf)) # print("Received audio frame:", frame.buf, frame.size) if self.asr: # 如果ASR实例存在,则发送音频数据 self.asr.send_audio(frame.buf) try: asr_text = self.get_asr_text() play_complete = self.call.is_play_complete() current_time = time.time() # 实时当前时间 if self.call.inputType == '1.0': time_difference = int(current_time - self.call.inputLongStart) # print('current_time - self.call.inputLongStart:',time_difference > 35, self.call.txtLock , play_complete) if time_difference > 35 and play_complete: self.user_asr_texts.append(f"DTMF({self.call.digit})DTMF") user_asr_text = self.user_asr_texts[0] if len(self.user_asr_texts) == 1 else '###'.join(self.user_asr_texts) self.user_asr_texts.clear() self.call.chat(user_asr_text) # print("测试超长", user_asr_text) elif asr_text: self.user_asr_texts.append(asr_text) if time_difference > int(self.call.wait_time): self.call.reset_wait_time() else: if asr_text and not play_complete: self.user_asr_texts.append(asr_text) if (asr_text and play_complete) or (play_complete and self.user_asr_texts): if asr_text: self.user_asr_texts.append(asr_text) user_asr_text = self.user_asr_texts[0] if len(self.user_asr_texts) == 1 else '###'.join(self.user_asr_texts) self.user_asr_texts.clear() self.call.chat(user_asr_text) if self.call.wait_time and self.call.wait_time != "0" and play_complete and not asr_text: self.call.wait_time_check(current_time, self.call.wait_time) message_queue_size = self.call.message_queue.qsize() if (message_queue_size > 0 and not self.call.cur_player_file) or (message_queue_size > 0 and play_complete): print('onFrameReceived:message_queue_size=', message_queue_size, 'play_complete=', play_complete, asr_text) self.call.cur_player_file, self.call.wait_time, self.call.inputType,self.call.action, self.call.node_id = self.get_player_file() # 重置播放完成标志和超时计时器,确保新的播放从头开始计时 self.call.reset_wait_time() self.call.send_bot_speaker(self.call.cur_player_file) except: pass def get_asr_text(self): try: asr_text = self.call.user_asr_text_queue.get(block=False) print('get_asr_text:', asr_text) return asr_text except: pass def get_player_file(self): try: message = self.call.message_queue.get(block=False) player_file = [item.voice_url for item in message.contents if item.content_type == 'voice'] print('get_player_file:', player_file,message.wait_time, message.inputType, message.action, message.node_id) return player_file, message.wait_time, message.inputType, message.action, message.node_id except Exception as e: traceback.print_exc() class MyAudioMediaPlayer(pj.AudioMediaPlayer): def __init__(self, player_id, sink, on_complete=None): pj.AudioMediaPlayer.__init__(self) self.player_id = player_id self.sink = sink self.on_complete = on_complete def onEof2(self): # self.stopTransmit(self.sink) if self.on_complete: self.on_complete(self.player_id) # Subclass to extend the Account and get notifications etc. class Account(pj.Account): def __init__(self, agent, user_part, **kwargs): pj.Account.__init__(self) self.agent = agent self.user_part = user_part self.calls = {} self.kwargs = kwargs def onRegState(self, prm): ai = self.getInfo() print("***OnRegState: " + prm.reason, ai.id,ai.isDefault,ai.uri,ai.regIsConfigured,ai.regIsActive,ai.regExpiresSec,ai.regStatus,ai.regStatusText,ai.regLastErr,ai.onlineStatus,ai.onlineStatusText) def onIncomingCall(self, prm): ai = self.getInfo() print("***onIncomingCall: ", prm.callId, ai.id,ai.isDefault,ai.uri,ai.regIsConfigured,ai.regIsActive,ai.regExpiresSec,ai.regStatus,ai.regStatusText,ai.regLastErr,ai.onlineStatus,ai.onlineStatusText) call = MyCall(self.agent, self, self.user_part, prm.callId, **self.kwargs) call_op_param = pj.CallOpParam(True) call_op_param.statusCode = pj.PJSIP_SC_OK call.answer(call_op_param) self.calls[prm.callId] = call class MyCall(pj.Call): def __init__(self, agent, acc, user_part, call_id, **kwargs): pj.Call.__init__(self, acc, call_id) self.agent = agent self.user_part = user_part self.call_id = call_id self.kwargs = kwargs self.aud_med = None self.audio_port = None self.player = None self.asr = None self.session_id = kwargs.get('variable_sip_h_P-LIBRA-CallId') self.device_id = kwargs.get('variable_sip_h_P-LIBRA-DeviceId') print("self.session_id:", self.session_id) # self.scripts = build_demo_script() self.user_asr_text_queue = queue.Queue(maxsize=100) self.message_queue = queue.Queue(maxsize=3) self.player_complete_dict = {} self.wait_time = None self.inputType = None #记录按键类型 1为长按键类型 self.digit = '' # 存储长按键内容 self.action = None self.node_id= 'start' self.cur_player_file = None #当前播放的文件 from src.core.voip.asr import TestSt self.asr = TestSt(call_id, message_receiver=self.on_receiver_asr_result) # 创建ASR实例 self.asr.start() # 启动ASR线程 self.call_phone, self.callIdString = self.get_phone() # 超时设置 self.play_start_time = time.time() # 倒计时开始时间 self.play_complete_flag = False # 倒计时开始标志 self.txtLock = False self.inputLongStart = time.time() #长按键开始时间 def wait_time_check(self, current_time, wait_time): try: # 确保 wait_time 是整数类型 wait_time = int(wait_time) # 如果播放尚未完成,重置标志并返回 if not self.play_complete_flag: self.play_complete_flag = True self.play_start_time = current_time # print(f"开始计时: {self.play_start_time}") elapsed_time = int(current_time - self.play_start_time) # print(f"当前时间: {current_time}, 已过时间: {elapsed_time}, 最大等待时间: {wait_time}") if elapsed_time > wait_time: # self.user_asr_text_queue.put("ASR408error") self.chat('ASR408error') except ValueError as e: print(f"无效的等待时间参数: {wait_time}, 错误: {e}") self.reset_wait_time() def reset_wait_time(self): self.play_complete_flag = False # 重置播放完成标志 self.play_start_time = None # 重置开始计时时间 def get_phone(self): import re call_info = self.getInfo() match = re.match(r'"(\d+)" 0: if self.cache.get_need_play_hold_music(call_id): self.hangup(user_part) self.cache.del_need_play_hold_music(call_id) break time.sleep(1) def hangup(self, user_part, reason="NORMAL_CLEARING", **sip_headers): call_op_param = pj.CallOpParam(True) call_op_param.statusCode = pj.PJSIP_SC_OK call_op_param.reason = reason call_op_param.txOption = pj.SipTxOption() sip_header_vector = pj.SipHeaderVector() for k, v in sip_headers.items(): _sip_header = pj.SipHeader() _sip_header.hName = str(k) _sip_header.hValue = str(v) print('hangup, header_name=%s, header_value=%s'%(k, v)) sip_header_vector.push_back(_sip_header) call_op_param.txOption.headers = sip_header_vector acc = self.accounts.get(user_part) if acc: for k, v in acc.calls.items(): v.hangup(call_op_param) # 机器人主动挂机回收分机号 self.release(user_part) def register(self, **kwargs): user_part = self.user_part_pool.get() acc = self.accounts.get(user_part) self.logger.info('register, user_part :%d, pool.size :%d', user_part, self.user_part_pool.qsize()) if acc: print('register==========>', acc.getId()) # ps = pj.PresenceStatus() # ps.status = pj.PJSUA_BUDDY_STATUS_ONLINE # ps.activity = pj.PJRPID_ACTIVITY_AWAY # ps.note = "Away" # acc.setOnlineStatus(ps) # acc.setRegistration(renew=True) acc.kwargs = kwargs return user_part def unregister(self, user_part): acc = self.accounts.get(user_part) if acc: acc.setRegistration(renew=False) # 用户远程挂机回收分机号 self.release(user_part) def release(self, user_part): if not user_part: self.logger.info("release, user_part is None") return def element_in_queue(q, element): with q.mutex: # 确保线程安全 for item in list(q.queue): # 将队列转换为列表进行遍历 if item == element: return True return False if element_in_queue(self.user_part_pool, user_part): self.logger.info("release, already exists, user_part :%d, pool.size :%d", user_part, self.user_part_pool.qsize()) return self.user_part_pool.put(user_part) self.logger.info("release, user_part :%d, pool.size :%d", user_part, self.user_part_pool.qsize()) def destroy(self): self.is_stopping = True # Destroy the library self.ep.libDestroy() def __del__(self): self.destroy() # if __name__ == '__main__': # import logging # logger = logging.getLogger('voip') # bot = BotAgent(logger)