123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522 |
- #!/usr/bin/env python3
- # encoding:utf-8
- import os
- import time
- import json
- import wave
- import queue
- import threading
- import traceback
- import pjsua2 as pj
- from enum import Enum
- from src.core.voip.constant import *
- import requests
- from src.core.callcenter.model import BotChatRequest, ChatResponse,ChatMessage
- calls = {}
- # recording_file = '/code/src/core/voip/incoming_call.wav'
- # player_file1 = '/code/src/core/voip/test111.wav'
- # player_file2 = '/code/src/core/voip/test222.wav'
class BotStatus(Enum):
    """Enumerates the states the bot side of a conversation can be in."""

    # Waiting for the user to speak; the wait has not timed out yet.
    wait_speaking_not_timeout = 1

    # Waiting for the user to speak; the wait timed out but the user is
    # still talking.
    wait_speaking_timeout_user_speaking = 2

    # The bot is currently speaking.
    speaking = 3

    # Waiting for the user to finish speaking.
    wait_user_speaking_end = 4

    # Waiting for the large model to return its output.
    wait_gpt_return = 5
class UserStatus(Enum):
    """Enumerates whether the user is currently talking, as reported by ASR."""

    # Set once ASR reports sentenceBegin.
    speaking = 1

    # Set once ASR reports sentenceEnd.
    silence = 2
class MyAudioMediaPort(pj.AudioMediaPort):
    """Audio port that forwards received audio to ASR and drives the dialog.

    On every received frame the port:
      1. forwards the raw audio to the ASR instance (when one is attached);
      2. drains at most one recognised text from the call's ASR queue and,
         once the current prompt has finished playing, sends the collected
         user text to the text bot via ``call.chat``;
      3. fires the "no answer" timeout when the user stays silent for longer
         than the prompt's ``wait_time``;
      4. starts playing the next queued bot message when nothing is playing.
    """

    def __init__(self, call, aud_med=None, asr=None):
        """Bind the port to *call*, its audio media and an optional ASR client."""
        pj.AudioMediaPort.__init__(self)
        print("MyAudioMediaPort::come in, debugger")
        self.call = call
        self.aud_med = aud_med
        self.asr = asr
        # ASR texts received while a prompt is still playing; flushed together.
        self.user_asr_texts = []
        # List of audio files of the prompt currently (or last) played.
        self.cur_player_file = None
        # Silence timeout of the current prompt, as delivered by the bot message.
        self.wait_time = None
        # Timestamp from which the silence timeout is measured.
        self.play_start_time = None
        # True once the silence timer has been armed for the finished prompt.
        self.play_complete_flag = False

    def onFrameRequested(self, frame):
        """Log a frame request; this port does not produce audio itself."""
        print("Request audio frame:", frame)

    def onFrameReceived(self, frame):
        """Forward *frame* to ASR, then advance the dialog state machine."""
        if self.asr:
            self.asr.send_audio(frame.buf)
        try:
            self._advance_dialog()
        except Exception:
            # Best effort: an error must not escape into the media callback,
            # but it must not vanish silently either.
            traceback.print_exc()

    def _advance_dialog(self):
        """Run one step of ASR collection, timeout checking and prompt playback."""
        asr_text = self.get_asr_text()
        play_complete = self.is_play_complete()

        if asr_text:
            self.user_asr_texts.append(asr_text)
            if play_complete:
                # The prompt is over: hand everything the user said to the bot.
                self.cur_player_file = None
                if len(self.user_asr_texts) == 1:
                    user_asr_text = asr_text
                else:
                    user_asr_text = '###'.join(self.user_asr_texts)
                self.user_asr_texts.clear()
                self.call.chat("2", user_asr_text)

        # Silence timeout handling.
        current_time = time.time()
        if self.wait_time and self.wait_time != "0" and play_complete and not asr_text:
            print('ssdsdsd', play_complete, asr_text)
            self.wait_time_check(current_time)

        message_queue_size = self.call.message_queue.qsize()
        if message_queue_size > 0 and (not self.cur_player_file or play_complete):
            print('onFrameReceived:message_queue_size=', message_queue_size, 'play_complete=', play_complete, asr_text)
            self.cur_player_file, self.wait_time = self.get_player_file()
            self.call.send_bot_speaker(self.cur_player_file)
            # A new prompt starts: reset the completion flag and restart timing.
            self.play_complete_flag = False
            self.play_start_time = time.time()

    def is_play_complete(self):
        """Return the completion state of the current prompt.

        Returns ``None`` when no prompt is set or its player id is unknown to
        ``call.player_complete_dict``; otherwise the stored flag.
        """
        if not self.cur_player_file:
            return None
        player_id = murmur3_32(self.cur_player_file)
        return self.call.player_complete_dict.get(player_id)

    def get_asr_text(self):
        """Pop one ASR result without blocking; return ``None`` if none is queued."""
        try:
            asr_text = self.call.user_asr_text_queue.get(block=False)
        except queue.Empty:
            return None
        print('get_asr_text:', asr_text)
        return asr_text

    def get_player_file(self):
        """Pop the next bot message and return ``(voice_files, wait_time)``.

        Always returns a 2-tuple so the caller can unpack it; on any failure
        the error is printed and ``(None, None)`` is returned.
        """
        try:
            message = self.call.message_queue.get(block=False)
            player_file = [item.voice_url for item in message.contents if item.content_type == 'voice']
            return player_file, message.wait_time
        except Exception as e:
            print(f"sdsd: {e}")
            traceback.print_exc()
            return None, None

    def wait_time_check(self, current_time):
        """Ask the bot again when the user stayed silent longer than ``wait_time``.

        The timer is armed on the first call after a prompt finished and the
        bot is re-queried (event type ``"6"``) once it expires.
        """
        try:
            timeout_seconds = int(self.wait_time)
        except (TypeError, ValueError):
            # An unparsable timeout cannot be enforced; disable it for this
            # prompt instead of failing on every frame.
            print(f"wait_time_check: invalid wait_time {self.wait_time!r}")
            self.wait_time = None
            return

        # Start timing once playback has completed.
        if not self.play_complete_flag:
            self.play_complete_flag = True
            self.play_start_time = current_time

        # Check whether the timeout has elapsed.
        if current_time - self.play_start_time > timeout_seconds:
            # Reset the flag so the timeout does not fire repeatedly.
            self.play_complete_flag = False
            self.call.chat("6", "重新请求文本机器人")
class MyAudioMediaPlayer(pj.AudioMediaPlayer):
    """Audio player that reports end-of-playback through a callback."""

    def __init__(self, player_id, sink, on_complete=None):
        """Remember the player id, the sink media and the optional callback."""
        pj.AudioMediaPlayer.__init__(self)
        self.player_id, self.sink, self.on_complete = player_id, sink, on_complete

    def onEof2(self):
        """Invoke ``on_complete(player_id)`` when a callback was supplied."""
        callback = self.on_complete
        if not callback:
            return
        callback(self.player_id)
# Account subclass used to receive registration and incoming-call notifications.
class Account(pj.Account):
    """SIP account bound to one extension (``user_part``) of a bot agent."""

    # AccountInfo attributes dumped by the debug prints, in output order.
    _INFO_FIELDS = (
        "id", "isDefault", "uri", "regIsConfigured", "regIsActive",
        "regExpiresSec", "regStatus", "regStatusText", "regLastErr",
        "onlineStatus", "onlineStatusText",
    )

    def __init__(self, agent, user_part, **kwargs):
        """Store the owning agent, the extension and extra per-call options."""
        pj.Account.__init__(self)
        self.agent, self.user_part = agent, user_part
        # Calls answered on this account, keyed by pjsua call id.
        self.calls = {}
        self.kwargs = kwargs

    @classmethod
    def _info_values(cls, info):
        """Return the account-info values that the debug prints show."""
        return tuple(getattr(info, field) for field in cls._INFO_FIELDS)

    def onRegState(self, prm):
        """Print the registration result together with the account info."""
        info = self.getInfo()
        print("***OnRegState: " + prm.reason, *self._info_values(info))

    def onIncomingCall(self, prm):
        """Answer the incoming call with 200 OK and keep a reference to it."""
        info = self.getInfo()
        print("***onIncomingCall: ", prm.callId, *self._info_values(info))

        incoming = MyCall(self.agent, self, self.user_part, prm.callId, **self.kwargs)
        answer_param = pj.CallOpParam(True)
        answer_param.statusCode = pj.PJSIP_SC_OK
        incoming.answer(answer_param)
        self.calls[prm.callId] = incoming
class MyCall(pj.Call):
    """A single answered call: wires audio, ASR, playback and the text bot."""

    def __init__(self, agent, acc, user_part, call_id, **kwargs):
        """Set up per-call state and start the ASR client for this call."""
        pj.Call.__init__(self, acc, call_id)
        self.agent = agent
        self.user_part = user_part
        self.call_id = call_id
        self.kwargs = kwargs

        self.aud_med = None
        self.audio_port = None
        self.player = None
        self.asr = None

        # Recognised user texts (and DTMF markers) waiting to be consumed.
        self.user_asr_text_queue = queue.Queue(maxsize=100)
        # Bot messages waiting to be played.
        self.message_queue = queue.Queue(maxsize=3)
        # player_id -> True once that playlist has finished playing.
        self.player_complete_dict = {}

        from src.core.voip.asr import TestSt
        # Create the ASR instance and start its thread.
        self.asr = TestSt(call_id, message_receiver=self.on_receiver_asr_result)
        self.asr.start()

    def onDtmfDigit(self, prm):
        """Queue a received DTMF digit as a pseudo ASR text."""
        pressed = prm.digit
        print(f"Received DTMF digit: {pressed}")
        self.user_asr_text_queue.put(f"DTMF({pressed})DTMF")

    def onCallState(self, prm):
        """Greet on connect; give the extension back to the pool on disconnect."""
        info = self.getInfo()
        print("Call state: ", info.stateText)
        state = info.state
        if state == pj.PJSIP_INV_STATE_CONFIRMED:
            # The call has been confirmed (i.e. answered).
            self.bot_say_hello()
        if state == pj.PJSIP_INV_STATE_DISCONNECTED:
            print("通话结束")
            # The remote side hung up: recycle the extension number.
            self.agent.release(self.user_part)

    def onCallMediaState(self, prm):
        """Attach the receiving audio port to every active audio media."""
        info = self.getInfo()
        print("Call Media state: ", info.stateText)
        for media in info.media:
            is_audio = media.type == pj.PJMEDIA_TYPE_AUDIO
            if not (is_audio and media.status == pj.PJSUA_CALL_MEDIA_ACTIVE):
                continue
            print("Call Media state 111: ", info.stateText)
            self.aud_med = self.getAudioMedia(media.index)
            try:
                # Establish the channel that receives the user's audio.
                self.receive_user_speaker()
            except Exception:
                traceback.print_exc()

    def receive_user_speaker(self):
        """Create the audio port and start transmitting call audio into it."""
        port = MyAudioMediaPort(self, self.aud_med, self.asr)
        self.audio_port = port
        port.createPort("Incoming Call Port", build_audio_format())
        self.aud_med.startTransmit(port)

    def send_bot_speaker(self, player_file):
        """Play the given list of audio files to the caller as one playlist."""
        if not player_file :
            return
        player_id = murmur3_32(player_file)
        completed = self.player_complete_dict
        completed[player_id] = False
        print('self.player_complete_dict[player_id]D:', player_id, player_file, completed[player_id])
        self.player = MyAudioMediaPlayer(player_id, self.aud_med, on_complete=self.on_media_player_complete)
        playlist_label = f'my_hello_playlist{player_id}'
        self.player.createPlaylist(player_file, playlist_label, pj.PJMEDIA_FILE_NO_LOOP)
        self.player.startTransmit(self.aud_med)

    def on_receiver_asr_result(self, message, *args):
        """ASR callback: queue the recognised message for the audio port."""
        print('asr返回内容:',message)
        self.user_asr_text_queue.put(message)

    def on_media_player_complete(self, player_id):
        """Player callback: mark the playlist identified by *player_id* as done."""
        print('player complete')
        self.player_complete_dict[player_id] = True

    def bot_say_hello(self):
        """Request the opening prompt from the text bot."""
        print('bot_say_hello, come in ')
        self.chat('1', user_asr_text="SAY_HELLO")

    def chat(self,event_type, user_asr_text=None):
        """Send *user_asr_text* to the text bot for the given event type."""
        ToTextBotAgent(event_type, user_asr_text, self)
class ToTextBotAgent:
    """One-shot bridge between a call and the text bot.

    Constructing an instance builds the chat request and immediately runs it
    (currently through the simulated ``test_request``); the resulting message
    is dispatched by ``handling_message``.

    ``call_agent`` is the call object that issued the request. It is expected
    to expose ``message_queue`` plus ``agent`` (the owning bot agent) and
    ``user_part`` (the extension the call runs on).
    """

    # Seconds to wait for the text-bot HTTP endpoint before giving up.
    REQUEST_TIMEOUT_SECONDS = 10

    def __init__(self, event_type, user_asr_text, call_agent):
        """Send *user_asr_text* to the bot; do nothing when the text is empty."""
        # Assigned before the early return so the instance is always consistent.
        self.call_agent = call_agent
        self.request_data = None
        if not user_asr_text:
            print("ASR文本为空,终止执行。")
            return
        self.request_data = BotChatRequest(
            node_id="1",
            user_id="139311",
            session_id="1",
            record_id="2",
            task_id="ceshi",
            asr_text=user_asr_text
        )
        # Send the request and handle the response.
        self.test_request(self.request_data, event_type)

    def to_hang(self, action_content):
        """Hang up the call this request belongs to.

        The hang-up is delegated to the bot agent that owns the call, using
        the call's own extension. Taking an extension from the free pool (as
        the previous implementation did) would address an idle extension, and
        the call object itself offers neither the pool nor an agent-style
        ``hangup(user_part, reason)``.
        """
        owner = getattr(self.call_agent, "agent", None)
        user_part = getattr(self.call_agent, "user_part", None)
        if owner is None or user_part is None:
            print("to_hang: call has no agent/user_part, cannot hang up")
            return
        owner.hangup(user_part, action_content)

    def handling_message(self, message: "ChatMessage"):
        """Dispatch a bot message according to its action code."""
        print('handling_release', message)
        if not message:
            print("文本机器人接口调用失败")
            return
        action = message.action
        action_code = action.action_code
        if action_code == 'hang':  # hang up
            self.to_hang(action.action_content)
        elif action_code == 'transfer':  # transfer to a human agent
            print('todo 转人工')
        elif action_code == 'normal':  # normal conversation
            self.call_agent.message_queue.put(message)
            print(f"成功获取响应: ActionCode: {message.wait_time}")

    def to_quest(self, request: "BotChatRequest"):
        """POST *request* to the text bot and return the parsed response.

        Returns ``None`` on a non-200 status, a transport error, a timeout or
        a body that is not valid JSON.
        """
        headers = {'Content-Type': 'application/json'}
        # Serialise the request entity to a JSON string.
        request_json = request.to_json_string()
        try:
            # Placeholder URL. The timeout keeps a stalled endpoint from
            # blocking the caller indefinitely.
            response = requests.post(
                'http://example.com/api',
                data=request_json,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            if response.status_code != 200:
                print(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
                return None
            # Success: parse the JSON body and re-serialise it for from_json.
            response_data = response.json()
            return ChatResponse.from_json(json.dumps(response_data))
        except (requests.RequestException, ValueError):
            # ValueError covers an undecodable JSON body on requests versions
            # whose decode error is not a RequestException.
            traceback.print_exc()
            return None

    # Simulates the response of the text-bot endpoint.
    def test_request(self, params: "BotChatRequest", event_type):
        """Build a canned bot response for *event_type* and dispatch it."""
        print("test_request::params=", params)
        response_data = {
            "node_id": "1.0",
            "contents": [],
            "interruptable": False,
            "wait_time": "6",
            "action": {
                "action_code": "normal",
                "action_content": "正常通话"
            },
            "talk_time_out": ""
        }
        print("event_type:", event_type)
        if event_type == '1':
            response_data['contents'].append({
                "content_type": "voice",
                "content": "",
                "voice_url": '/code/src/core/voip/scripts/1_00.wav',
                "voice_content": "五一北京到上海的高铁票还有吗?"
            })
        elif event_type == '2':
            response_data['contents'] = [
                {
                    "content_type": "voice",
                    "content": "",
                    "voice_url": '/code/src/core/voip/test111.wav',
                    "voice_content": "测试第二个录音文件"
                },
                {
                    "content_type": "voice",
                    "content": "",
                    "voice_url": '/code/src/core/voip/test222.wav',
                    "voice_content": "五一北京到上海的高铁票还有吗?"
                }
            ]
        elif event_type == '6':
            response_data['contents'].append({
                "content_type": "voice",
                "content": "",
                "voice_url": '/code/src/core/voip/scripts/4_00.wav',
                "voice_content": "sds"
            })
        try:
            print(json.dumps(response_data['contents']))
            parsed_response = ChatMessage.from_json(response_data)
            self.handling_message(parsed_response)
        except Exception as e:
            print(f"Error in test_request: {e}")
            traceback.print_exc()  # print the full error details
        return None
class BotAgent:
    """Owns the pjsua2 endpoint, one SIP account per extension and the pool
    of extensions that are currently free to take a call."""

    # NOTE(review): the default host and password are hard-coded credentials;
    # consider supplying them from configuration instead.
    def __init__(self, logger, user_part_range=range(1001, 1019), host="192.168.100.195", port="5060", password="slibra@#123456"):
        """Store the connection settings and start the pjsua2 worker thread.

        Args:
            logger: logger used for pool bookkeeping messages.
            user_part_range: extensions to register, one account each.
            host: SIP registrar host.
            port: SIP registrar port.
            password: digest password shared by all extensions.
        """
        self.logger = logger
        self.user_part_range, self.host, self.port, self.password = user_part_range, host, port, password
        self.user_part_pool = queue.Queue(maxsize=len(user_part_range))
        self.accounts = {}
        self.calls = {}
        # Set before the endpoint exists so destroy()/__del__ are always safe.
        self.is_stopping = False
        self._destroyed = False
        self.ep = pj.Endpoint()
        threading.Thread(target=self.create_pjsua2, args=()).start()

    def create_pjsua2(self):
        """Initialise pjsua2, register every extension and pump events until
        ``is_stopping`` is set."""
        # Create and initialize the library.
        ep_cfg = pj.EpConfig()
        ep_cfg.uaConfig.threadCnt = 32
        ep_cfg.uaConfig.mainThreadOnly = False
        ep_cfg.uaConfig.maxCalls = 20
        ep_cfg.uaConfig.maxAccounts = 20
        ep_cfg.medConfig.noVad = True
        ep_cfg.logConfig.level = 3
        ep_cfg.logConfig.consoleLevel = 3
        self.ep.libCreate()
        self.ep.libInit(ep_cfg)

        aud_dev_mgr = self.ep.audDevManager()
        # Use the null audio device (there may be no real sound device).
        aud_dev_mgr.setNullDev()

        # Media configuration, in particular the jitter buffer.
        # NOTE(review): this is assigned after libInit() already consumed
        # ep_cfg, so these values are presumably not applied; if they are
        # wanted they likely belong on ep_cfg.medConfig before libInit() -
        # confirm against the pjsua2 documentation before changing.
        media_cfg = pj.MediaConfig()
        media_cfg.jbMinPre = 4   # minimum jitter-buffer prefetch
        media_cfg.jbMaxPre = 16  # maximum jitter-buffer prefetch
        media_cfg.noVad = True   # disable voice activity detection
        self.ep.medConfig = media_cfg

        # Create the SIP transport.
        sipTpConfig = pj.TransportConfig()
        sipTpConfig.port = 30506
        self.ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, sipTpConfig)

        # Start the library.
        self.ep.libStart()

        for user_part in self.user_part_range:
            acfg = pj.AccountConfig()
            acfg.idUri = f"sip:{user_part}@{self.host}:{self.port}"
            acfg.regConfig.registrarUri = f"sip:{self.host}:{self.port}"
            cred = pj.AuthCredInfo("digest", "*", f"{user_part}", 0, self.password)
            acfg.sipConfig.authCreds.append(cred)
            acfg.regConfig.timeoutSec = 86400  # registration timeout (seconds)
            acfg.regConfig.retryIntervalSec = 10  # retry interval (seconds)
            acfg.regConfig.firstRetryIntervalSec = 10  # first retry interval (seconds)

            # Create the account and make its extension available.
            acc = Account(self, user_part)
            acc.create(acfg)
            self.user_part_pool.put(user_part)
            self.accounts[user_part] = acc

        while not self.is_stopping:
            self.ep.libHandleEvents(100)

    def hangup(self, user_part, reason="NORMAL_CLEARING", **sip_headers):
        """Hang up every call on *user_part*'s account and free the extension.

        Args:
            user_part: extension whose calls are to be ended.
            reason: reason text placed on the hang-up request.
            **sip_headers: extra headers; each ``key=value`` is sent as the
                SIP header ``sip_h_<key>: <value>``.
        """
        call_op_param = pj.CallOpParam(True)
        call_op_param.statusCode = pj.PJSIP_SC_OK
        call_op_param.reason = reason
        # Iterate items(): iterating the dict itself yields only the keys.
        for name, value in sip_headers.items():
            header = pj.SipHeader()
            header.hName = f"sip_h_{name}"
            header.hValue = str(value)
            # Outgoing headers are carried by the call parameter's txOption.
            call_op_param.txOption.headers.append(header)

        acc = self.accounts.get(user_part)
        if acc:
            # Snapshot the values so callbacks mutating the dict cannot break
            # the iteration.
            for call in list(acc.calls.values()):
                call.hangup(call_op_param)
        # The bot hung up on its own initiative: recycle the extension.
        self.release(user_part)

    def register(self, **kwargs):
        """Take a free extension from the pool (blocking until one is
        available), attach *kwargs* to its account and return the extension."""
        user_part = self.user_part_pool.get()
        acc = self.accounts.get(user_part)
        self.logger.info('register, user_part :%d, pool.size :%d', user_part, self.user_part_pool.qsize())
        if acc:
            print('register==========>', acc.getId())
            acc.kwargs = kwargs
        return user_part

    def unregister(self, user_part):
        """Stop registration of *user_part*'s account and free the extension."""
        acc = self.accounts.get(user_part)
        if acc:
            acc.setRegistration(renew=False)
        # Recycle the extension number.
        self.release(user_part)

    def release(self, user_part):
        """Put *user_part* back into the free pool unless it is already there."""
        if not user_part:
            self.logger.info("release, user_part is None")
            return

        pool = self.user_part_pool
        # Inspect the underlying deque under the queue's own lock.
        with pool.mutex:
            already_pooled = user_part in pool.queue
        if already_pooled:
            self.logger.info("release, already exists, user_part :%d, pool.size :%d", user_part, pool.qsize())
            return

        pool.put(user_part)
        self.logger.info("release, user_part :%d, pool.size :%d", user_part, pool.qsize())

    def destroy(self):
        """Stop the event loop and destroy the library; later calls are no-ops."""
        if getattr(self, "_destroyed", False):
            return
        self._destroyed = True
        self.is_stopping = True
        # Destroy the library.
        self.ep.libDestroy()

    def __del__(self):
        # __init__ may have failed before the endpoint was created; in that
        # case there is nothing to tear down.
        if hasattr(self, "ep"):
            self.destroy()
- # if __name__ == '__main__':
- # import logging
- # logger = logging.getLogger('voip')
- # bot = BotAgent(logger)
|