#!/usr/bin/env python3
# encoding:utf-8
"""Minimal pjsua2 auto-answer demo.

Registers a SIP account, answers every incoming call with 200 OK, writes the
audio received from the caller to ``recording_file`` and plays
``player_file`` back to the caller.
"""
import json
import threading
import time
import traceback
import wave

import pjsua2 as pj

# Live calls keyed by pjsua call id.  Holding a reference here keeps the
# Python-side call object alive for as long as the call lasts.
calls = {}

recording_file = '/code/pjsip-apps/src/swig/python/incoming_call.wav'
player_file = '/code/pjsip-apps/src/swig/python/test111.wav'

# Audio format shared by the WAV header and the pjsua2 media port so the two
# can never disagree.
SAMPLE_RATE = 16000       # samples per second
CHANNEL_COUNT = 1         # mono
BITS_PER_SAMPLE = 16      # 16-bit PCM (2 bytes per sample)
FRAME_TIME_USEC = 20000   # 20 ms per frame


class Account(pj.Account):
    """SIP account that answers every incoming call and tracks it in ``calls``."""

    def __init__(self):
        pj.Account.__init__(self)

    def onRegState(self, prm):
        """Print the reason text of every registration state change."""
        print("***OnRegState: " + prm.reason)

    def onIncomingCall(self, prm):
        """Answer the incoming call with 200 OK and register it in ``calls``."""
        print("***onIncomingCall: ", prm.callId)
        call = MyCall(self, prm.callId)
        call_op_param = pj.CallOpParam(True)
        call_op_param.statusCode = pj.PJSIP_SC_OK
        call.answer(call_op_param)
        calls[prm.callId] = call


class MyAudioMediaPort(pj.AudioMediaPort):
    """Media port that appends every received audio frame to ``recording_file``."""

    def __init__(self):
        pj.AudioMediaPort.__init__(self)
        # Frames may be delivered from a pjsua2 media thread while the call is
        # being torn down on another thread (NOTE(review): confirm the thread
        # model of the installed pjsip); the lock keeps a late frame from
        # being written to an already-closed file.
        self._wav_lock = threading.Lock()
        self.wav = wave.open(recording_file, "wb")
        self.wav.setnchannels(CHANNEL_COUNT)
        self.wav.setsampwidth(BITS_PER_SAMPLE // 8)
        self.wav.setframerate(SAMPLE_RATE)

    def onFrameRequested(self, frame):
        """Log a request for an outgoing frame; this port produces no audio."""
        print("Request audio frame:", frame)

    def onFrameReceived(self, frame):
        """Append the frame payload to the WAV file unless it was closed."""
        with self._wav_lock:
            if self.wav is not None:
                self.wav.writeframes(bytes(frame.buf))

    def close_recording(self):
        """Close the WAV file.  Safe to call more than once."""
        with self._wav_lock:
            wav, self.wav = self.wav, None
            if wav is not None:
                wav.close()


class MyCall(pj.Call):
    """Call that records the remote party and plays ``player_file`` to it."""

    def __init__(self, acc, call_id):
        pj.Call.__init__(self, acc, call_id)
        self.call_id = call_id
        self.audio_port = None   # MyAudioMediaPort receiving the caller's audio
        self.aud_med = None      # audio media of the call (conference bridge side)
        self.recorder = None
        self.player = None       # media player used to play the prompt file

    def onDtmfDigit(self, prm):
        """Print each DTMF digit received from the remote party."""
        digit = prm.digit
        print(f"Received DTMF digit: {digit}")

    def onCallState(self, prm):
        """Log every call state change and release resources on disconnect."""
        call_info = self.getInfo()
        print("Call state: ", call_info.stateText, dir(call_info))
        if call_info.state == pj.PJSIP_INV_STATE_DISCONNECTED:
            print("通话结束")
            self.delete()

    def onCallMediaState(self, prm):
        """Connect the recording port and the player to each active audio media.

        This callback can fire more than once per call, so the port and the
        player are created only the first time and merely re-connected
        afterwards.  Recreating them would truncate the recording (it is
        opened with ``"wb"``) and leak the previous objects.
        """
        call_info = self.getInfo()
        print("Call Media state: ", call_info.stateText)
        for media in call_info.media:
            if media.type != pj.PJMEDIA_TYPE_AUDIO:
                continue
            if media.status != pj.PJSUA_CALL_MEDIA_ACTIVE:
                continue
            self.aud_med = self.getAudioMedia(media.index)
            try:
                self._start_recording()
                self._start_playback()
            except Exception:
                # Callback boundary: report the failure but keep the call up.
                traceback.print_exc()

    def _start_recording(self):
        """Create the recording port on first use and feed call audio into it."""
        if self.audio_port is None:
            port = MyAudioMediaPort()
            try:
                port.createPort("Incoming Call Port", self.build_audio_format())
            except Exception:
                # Do not leave the WAV file open if the port cannot be created.
                port.close_recording()
                raise
            self.audio_port = port
        self.aud_med.startTransmit(self.audio_port)

    def _start_playback(self):
        """Create the file player on first use and play it towards the caller."""
        if self.player is None:
            player = pj.AudioMediaPlayer()
            player.createPlayer(player_file)
            self.player = player
        self.player.startTransmit(self.aud_med)

    def build_audio_format(self):
        """Return the PCM format description used for the recording port."""
        fmt = pj.MediaFormatAudio()
        fmt.type = pj.PJMEDIA_TYPE_AUDIO
        fmt.id = pj.PJMEDIA_FORMAT_PCM
        fmt.clockRate = SAMPLE_RATE
        fmt.channelCount = CHANNEL_COUNT
        fmt.frameTimeUsec = FRAME_TIME_USEC
        fmt.bitsPerSample = BITS_PER_SAMPLE
        return fmt

    def display_playback_progress(self):
        """Print the playback progress once per second until the player is gone.

        Blocks the calling thread for as long as ``self.player`` is set.
        """
        while True:
            player = self.player
            if player is None:
                break
            current_pos = player.getPos()
            player_info = player.getInfo()
            print(current_pos, player_info.sizeBytes, player_info.sizeSamples)
            # NOTE(review): the position is divided by sizeBytes, i.e. it is
            # treated as a byte offset, while the pjsua2 docs describe
            # getPos() in samples - confirm the unit on the installed pjsip.
            total_duration = player_info.sizeBytes
            if total_duration > 0:
                progress = (current_pos / total_duration) * 100
                print(f"播放进度: {progress:.2f}%")
            time.sleep(1)  # refresh once per second

    def stopMedia(self):
        """Release the player and the recording port and close the WAV file.

        Safe to call repeatedly.  No ``stopTransmit`` is issued because the
        call's audio media may no longer be valid once the call has ended.
        Dropping the last reference is expected to let pjsua2 unregister the
        objects from the conference bridge (NOTE(review): relies on the
        pjsua2 destructors - confirm).
        """
        self.player = None
        self.recorder = None
        port, self.audio_port = self.audio_port, None
        if port is not None:
            port.close_recording()
        self.aud_med = None

    def delete(self):
        """Stop all media and remove this call from the ``calls`` registry."""
        self.stopMedia()
        # Call ids can be reused; only remove the entry if it is still ours.
        if calls.get(self.call_id) is self:
            del calls[self.call_id]


# pjsua2 test function
def pjsua2_test():
    """Start pjsua2, register the account and pump events until Ctrl-C."""
    # Create and initialize the library
    ep = pj.Endpoint()
    ep_cfg = pj.EpConfig()
    try:
        ep.libCreate()
        ep.libInit(ep_cfg)

        # Use the null audio device so no real sound hardware is required.
        aud_dev_mgr = ep.audDevManager()
        aud_dev_mgr.setNullDev()

        # Create SIP transport.
        sipTpConfig = pj.TransportConfig()
        sipTpConfig.port = 30506
        ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, sipTpConfig)

        # Start the library
        ep.libStart()

        # NOTE(review): server address and credentials are hard-coded in the
        # source; consider loading them from configuration instead.
        acfg = pj.AccountConfig()
        acfg.idUri = "sip:1001@172.16.12.16:5060"
        acfg.regConfig.registrarUri = "sip:172.16.12.16:5060"
        cred = pj.AuthCredInfo("digest", "*", "1001", 0, "slibra@#123456")
        acfg.sipConfig.authCreds.append(cred)

        # Create the account
        acc = Account()
        acc.create(acfg)

        try:
            while True:
                ep.libHandleEvents(100)
        except KeyboardInterrupt:
            print('Exiting...')
    finally:
        # Release remaining calls (closing their recordings) while the
        # library is still alive, then destroy the library.
        for call in list(calls.values()):
            call.delete()
        ep.libDestroy()


#
# main()
#
if __name__ == "__main__":
    pjsua2_test()