|
- #!/usr/bin/env python3
- # encoding:utf-8
- import os
- import time
- import json
- import wave
- import queue
- import threading
- import traceback
- import sys
- import pjsua2 as pj
- from enum import Enum
- from datetime import datetime
- from src.core.callcenter import registry
- from src.core.callcenter.cache import Cache
- from src.core.datasource import SIP_SERVER, SERVE_HOST
- from src.core.voip.constant import *
- import requests
- from src.core.callcenter.api import BotChatRequest,ChatMessage
- from src.core import singleton_keys
- from src.core.callcenter.snowflake import Snowflake
- from src.core.callcenter.data_handler import *
# Module-level call registry; empty at import time.
# NOTE(review): nothing in the visible part of this file reads or writes it —
# confirm whether other modules import it before removing.
calls: dict = {}
# recording_file = '/code/src/core/voip/incoming_call.wav'
# player_file1 = '/code/src/core/voip/test111.wav'
# player_file2 = '/code/src/core/voip/test222.wav'
class BotStatus(Enum):
    """States of the voice bot during a conversation."""

    # Waiting for the user to speak; the wait has not timed out yet.
    wait_speaking_not_timeout = 1

    # Waiting for the user to speak; the wait timed out but the user is
    # still talking.
    wait_speaking_timeout_user_speaking = 2

    # The bot itself is speaking.
    speaking = 3

    # Waiting for the user to finish speaking.
    wait_user_speaking_end = 4

    # Waiting for the large language model to return its output.
    wait_gpt_return = 5
class UserStatus(Enum):
    """Speaking state of the caller, as derived from ASR events."""

    # Set once ASR reports sentenceBegin.
    speaking = 1

    # Set once ASR reports sentenceEnd.
    silence = 2
class MyAudioMediaPort(pj.AudioMediaPort):
    """Audio port that receives the caller's audio and drives the dialog.

    Every received frame is forwarded to the ASR client (when one is set),
    and then one step of the dialog loop runs: recognised text is collected
    and sent to the text bot, the silence timeout is checked, and the next
    queued bot message is played.

    NOTE(review): the source this class was recovered from had lost its
    indentation; the nesting of the branches in ``_advance_dialog`` was
    reconstructed from the control flow and should be confirmed.
    """

    def __init__(self, call, aud_media=None, asr=None):
        """Bind the port to its owning call.

        Args:
            call: the owning call object; its queues, logger and playback
                state are read and updated on every frame.
            aud_media: audio media of the call (stored, not used here).
            asr: ASR client exposing ``send_audio``; may be ``None``.
        """
        pj.AudioMediaPort.__init__(self)
        print("MyAudioMediaPort::come in, debugger")
        self.call = call
        self.aud_media = aud_media
        self.asr = asr
        # True until the first frame has been logged.
        self.first = True
        # Recognised texts collected while waiting to be sent to the bot.
        self.user_asr_texts = []

    def onFrameRequested(self, frame):
        """Log that an audio frame was requested from this port."""
        print("Request audio frame:", frame)

    def onFrameReceived(self, frame):
        """Forward the frame to ASR, then run one step of the dialog loop."""
        if self.asr:  # only forward audio when an ASR client exists
            if self.first:
                self.first = False
                # This class has no logger of its own; use the call's.
                self.call.logger.info("Received audio frame: %s %s", frame.buf, frame.size)
            self.asr.send_audio(frame.buf)
        try:
            self._advance_dialog()
        except Exception:
            # Keep the audio callback alive, but do not hide the failure.
            self.call.logger.exception("onFrameReceived: dialog step failed")

    def _drain_user_text(self):
        """Join the collected texts with '###' and clear the buffer."""
        joined = '###'.join(self.user_asr_texts)
        self.user_asr_texts.clear()
        return joined

    def _advance_dialog(self):
        """Run one dialog step: collect input, check timeout, start playback."""
        call = self.call
        asr_text = self.get_asr_text()
        play_complete = call.is_play_complete()
        current_time = time.time()

        if call.inputType == '1.0':
            # Long key-input mode: submit the collected digits once more than
            # 35 seconds have passed since inputLongStart.
            time_difference = int(current_time - call.inputLongStart)
            if time_difference > 35 and play_complete:
                self.user_asr_texts.append(f"DTMF({call.digit})DTMF")
                call.chat(self._drain_user_text())
            elif asr_text:
                self.user_asr_texts.append(asr_text)
            # NOTE(review): assumed to run on every frame of this branch
            # (not only under the elif) — confirm against the original.
            # The wait_time guard avoids int(None) aborting the whole step.
            if call.wait_time and time_difference > int(call.wait_time):
                call.reset_wait_time()
        else:
            if asr_text and not play_complete:
                self.user_asr_texts.append(asr_text)
            if play_complete and (asr_text or self.user_asr_texts):
                if asr_text:
                    self.user_asr_texts.append(asr_text)
                call.chat(self._drain_user_text())

        if call.wait_time and call.wait_time != "0" and play_complete and not asr_text:
            call.wait_time_check(current_time, call.wait_time)

        if call.message_queue.qsize() > 0 and (not call.cur_player_file or play_complete):
            playback = self.get_player_file()
            if playback is not None:
                (call.cur_player_file, call.wait_time, call.inputType,
                 call.action, call.node_id) = playback
                # Reset the timeout state so the new playback starts its
                # countdown from scratch.
                call.reset_wait_time()
                call.send_bot_speaker(call.cur_player_file)

    def get_asr_text(self):
        """Return the next queued ASR text, or ``None`` when the queue is empty."""
        try:
            asr_text = self.call.user_asr_text_queue.get(block=False)
        except queue.Empty:
            return None
        self.call.logger.info('get_asr_text: %s', asr_text)
        return asr_text

    def get_player_file(self):
        """Pop the next bot message and describe what to play.

        Returns:
            A tuple ``(voice_urls, wait_time, inputType, action, node_id)``
            taken from the message, or ``None`` when no message could be
            read or it could not be interpreted.
        """
        try:
            message = self.call.message_queue.get(block=False)
            player_file = [item.voice_url for item in message.contents if item.content_type == 'voice']
            return player_file, message.wait_time, message.inputType, message.action, message.node_id
        except Exception:
            traceback.print_exc()
            return None
class MyAudioMediaPlayer(pj.AudioMediaPlayer):
    """Audio player that reports end-of-file to an optional callback."""

    def __init__(self, player_id, sink, on_complete=None):
        """Remember the player id, the sink and the completion callback."""
        pj.AudioMediaPlayer.__init__(self)
        self.player_id, self.sink, self.on_complete = player_id, sink, on_complete

    def onEof2(self):
        """Invoke ``on_complete(player_id)`` when a callback was supplied."""
        notify = self.on_complete
        if not notify:
            return
        notify(self.player_id)
# Subclass to extend the Account and get notifications etc.
class Account(pj.Account):
    """SIP account that answers every incoming call with a ``MyCall``."""

    def __init__(self, agent, user_part, **kwargs):
        """Store the owning agent, the extension and per-call keyword data."""
        pj.Account.__init__(self)
        self.agent = agent
        self.user_part = user_part
        # Calls created by this account, keyed by pjsua call id.
        self.calls = {}
        self.kwargs = kwargs

    def _info_fields(self):
        """Return the account-info values printed by the callbacks, in order."""
        info = self.getInfo()
        return (
            info.id, info.isDefault, info.uri, info.regIsConfigured,
            info.regIsActive, info.regExpiresSec, info.regStatus,
            info.regStatusText, info.regLastErr, info.onlineStatus,
            info.onlineStatusText,
        )

    def onRegState(self, prm):
        """Print the registration state change together with the account info."""
        fields = self._info_fields()
        print("***OnRegState: " + prm.reason, *fields)

    def onIncomingCall(self, prm):
        """Create a ``MyCall`` for the incoming call, answer it and keep it."""
        fields = self._info_fields()
        print("***onIncomingCall: ", prm.callId, *fields)
        incoming = MyCall(self.agent, self, self.user_part, prm.callId, **self.kwargs)
        answer_param = pj.CallOpParam(True)
        answer_param.statusCode = pj.PJSIP_SC_OK
        incoming.answer(answer_param)
        self.calls[prm.callId] = incoming
class MyCall(pj.Call):
    """One bot-handled call: audio wiring, ASR input, playback and timeouts.

    NOTE(review): the source this class was recovered from had lost its
    indentation; statement nesting was reconstructed from the control flow.
    """

    def __init__(self, agent, acc, user_part, call_id, **kwargs):
        """Initialise call state and start the ASR client.

        Args:
            agent: owning agent; provides ``logger``, ``hangup``, ``transfer``
                and ``dataHandleServer``.
            acc: the pjsua2 account the call belongs to.
            user_part: extension number handling this call.
            call_id: pjsua call id.
            **kwargs: call variables; the session id, device id and caller
                number are read from it.
        """
        pj.Call.__init__(self, acc, call_id)
        self.agent = agent
        self.logger = agent.logger
        self.user_part = user_part
        self.call_id = call_id
        self.kwargs = kwargs
        self.aud_media = None
        self.audio_port = None
        self.audio_player = None
        self.asr = None
        self.session_id = kwargs.get('variable_sip_h_P-LIBRA-CallId')
        self.device_id = kwargs.get('variable_sip_h_P-LIBRA-DeviceId')
        self.call_phone = kwargs.get("Caller-Caller-ID-Number")
        self.logger.info("self.session_id:%s, self.call_phone:%s", self.session_id,self.call_phone)
        self.taskId = "10001"
        self.user_asr_text_queue = queue.Queue(maxsize=100)
        self.message_queue = queue.Queue(maxsize=3)
        # player_id -> True once that playlist has finished playing.
        self.player_complete_dict = {}
        self.wait_time = None
        self.inputType = None  # key-input type; '1.0' means long key input
        self.digit = ''  # digits collected so far in long key-input mode
        self.action = None
        self.node_id = 'start'
        self.cur_player_file = None  # playlist currently being played
        from src.core.voip.asr import TestSt
        self.asr = TestSt(call_id, message_receiver=self.on_receiver_asr_result)  # create the ASR client
        self.asr.start()  # start the ASR thread
        self.start_time = time.time()  # start of this bot conversation
        # Timeout bookkeeping.
        self.play_start_time = time.time()  # start of the countdown
        self.play_complete_flag = False  # True while the countdown is running
        self.txtLock = False
        self.inputLongStart = time.time()  # start of the long key-input window
        self.inter_action_total = 0
        self.statistics_lock = False

    def wait_time_check(self, current_time, wait_time):
        """Start or advance the silence countdown and fire the 408 on expiry.

        Args:
            current_time: current timestamp in seconds.
            wait_time: maximum wait in seconds; anything ``int()`` accepts.
        """
        try:
            wait_time = int(wait_time)
            if not self.play_complete_flag:
                # First check after a reset: start counting from now.
                self.play_complete_flag = True
                self.play_start_time = current_time
            elapsed_time = int(current_time - self.play_start_time)
            if elapsed_time > wait_time:
                # Restart the countdown before notifying the bot; otherwise a
                # failed or ignored request would be re-sent on every frame.
                self.reset_wait_time()
                self.chat('ASR408error')
                registry.BOT_ASR_408.labels(self.taskId).inc()
        except (TypeError, ValueError) as e:
            self.logger.info(f"无效的等待时间参数: {wait_time}, 错误: {e}")
            self.reset_wait_time()

    def reset_wait_time(self):
        """Stop the silence countdown; the next check starts a new one."""
        self.play_complete_flag = False
        self.play_start_time = None

    def is_play_complete(self):
        """Return the completion flag of the current playlist.

        Returns ``None`` when nothing has been played yet or the playlist
        has no recorded flag.
        """
        if self.cur_player_file:
            player_id = murmur3_32(self.cur_player_file)
            return self.player_complete_dict.get(player_id)

    def onDtmfDigit(self, prm):
        """Handle a DTMF key press; ignored until playback has finished."""
        if not self.is_play_complete():
            return
        digit = prm.digit
        self.reset_wait_time()
        if self.inputType == '1.0':
            # Long key input: accumulate digits and submit them on '#'.
            if digit != '#':
                self.digit += digit
            else:
                self.chat(f"DTMF({self.digit})DTMF")
        else:
            # Single key input: queue the digit like a recognised text.
            self.user_asr_text_queue.put(f"DTMF({digit})DTMF")

    def onCallState(self, prm):
        """Greet on CONFIRMED and release resources on DISCONNECTED."""
        call_info = self.getInfo()
        if call_info.state == pj.PJSIP_INV_STATE_CONFIRMED:
            # The call has been answered.
            self.bot_say_hello()
        if call_info.state == pj.PJSIP_INV_STATE_DISCONNECTED:
            self.logger.info("通话结束:%s", self.user_part)
            self.release()

    def onCallMediaState(self, prm):
        """Attach the receiving audio port to every active audio media."""
        call_info = self.getInfo()
        for media in call_info.media:
            if media.type == pj.PJMEDIA_TYPE_AUDIO and media.status == pj.PJSUA_CALL_MEDIA_ACTIVE:
                self.logger.info("Call Media state 111: %s", call_info.stateText)
                self.aud_media = self.getAudioMedia(media.index)
                try:
                    self.receive_user_speaker()
                except Exception:
                    traceback.print_exc()

    def receive_user_speaker(self):
        """Create the receiving port and route the call's audio into it."""
        self.audio_port = MyAudioMediaPort(self, self.aud_media, self.asr)
        self.audio_port.createPort("Incoming Call Port", build_audio_format())
        self.aud_media.startTransmit(self.audio_port)

    def send_bot_speaker(self, player_file):
        """Play ``player_file`` (a playlist of audio files) to the caller."""
        if not player_file:
            return
        player_id = murmur3_32(player_file)
        self.player_complete_dict[player_id] = False
        self.logger.info(f"[DEBUG] Sending bot speaker, player_file: {player_file}, player_id: {player_id}")
        self.audio_player = MyAudioMediaPlayer(player_id, self.aud_media, on_complete=self.on_media_player_complete)
        self.audio_player.createPlaylist(player_file, f'my_hello_playlist{player_id}', pj.PJMEDIA_FILE_NO_LOOP)
        self.audio_player.startTransmit(self.aud_media)

    def on_receiver_asr_result(self, message, *args):
        """Handle an ASR message; ignored until playback has finished.

        Args:
            message: JSON text with ``header`` (``status``, ``name``) and,
                for ``SentenceEnd``, ``payload.result``.
        """
        if not self.is_play_complete():
            return
        message = json.loads(message)
        header = message["header"]
        if header["status"] == 20000000:
            if header["name"] == "SentenceEnd":
                self.user_asr_text_queue.put(message["payload"]["result"])
            elif header["name"] == "TranscriptionResultChanged":
                self.reset_wait_time()
        else:
            self.logger.info(f"Status is not {message['header']['status']}")
            registry.ASR_ERRORS.labels(header['status']).inc()

    def on_media_player_complete(self, player_id):
        """Mark the playlist finished, reset key input, run the end action."""
        self.logger.info('player complete')
        self.player_complete_dict[player_id] = True
        self.digit = ''
        self.inputLongStart = time.time()
        self.say_end_action(self.action)

    def bot_say_hello(self):
        """Ask the text bot for the opening message."""
        self.chat(user_asr_text="start")

    def chat(self, user_asr_text=None):
        """Send ``user_asr_text`` to the text bot."""
        ToTextBotAgent(user_asr_text, self)

    def say_end_action(self, action):
        """Run the action attached to the message that just finished playing.

        ``'hang'`` hangs up; ``'transfer'`` hands the call to a human agent.
        Nothing happens for other action codes or when ``action`` is ``None``.
        """
        if action is None:
            # No action yet (the initial value); there is nothing to run.
            return
        self.logger.info('handling_release %s', action.action_code)
        action_code = action.action_code
        if action_code == 'hang':
            self.agent.hangup(self.user_part)
            self.end_statistics()
        elif action_code == 'transfer':
            self.agent.transfer(user_part=self.user_part, call_id=self.session_id, device_id=self.device_id)
            self.end_statistics()
            # Update the call record.
            # NOTE(review): assumed to belong to the transfer branch — confirm.
            self.agent.dataHandleServer.update_record(self.session_id, service_category=2)

    def end_statistics(self):
        """Record call duration and interaction count, once per call."""
        if self.statistics_lock:
            return
        self.statistics_lock = True
        self.logger.info(f"self.inter_action_total:{self.inter_action_total}")
        latency = time.time() - self.start_time
        registry.BOT_CALL_DURATION.labels(self.taskId).observe(latency)
        registry.BOT_INTERACTION_ROUNDS.labels(self.taskId).observe(self.inter_action_total)

    def release(self):
        """Drop media references, close ASR and hand the extension back."""
        self.logger.info('liuwei::debugger::release:: come in ')
        self.audio_port = None
        self.audio_player = None
        self.aud_media = None
        try:
            if self.asr:
                self.asr.close()
        except Exception:
            self.logger.exception("release: closing ASR failed")
        finally:
            # After a remote hangup the extension must still be recycled and
            # the statistics recorded, even if closing ASR failed.
            self.agent.hangup(self.user_part)
            self.end_statistics()
class ToTextBotAgent:
    """Send one user utterance to the text-bot service and queue its reply.

    Constructing an instance performs the request. The reply is placed on
    ``call_agent.message_queue``.
    """

    def __init__(self, user_asr_text, call_agent):
        """Build the request and send it.

        Nothing is sent when ``user_asr_text`` is empty, or when the call's
        current action exists and its code is not ``'normal'``.

        Args:
            user_asr_text: text to send (recognised speech, a DTMF marker,
                ``'start'`` or ``'ASR408error'``).
            call_agent: the call object; its ids, logger, counters and
                ``message_queue`` are used.
        """
        if not user_asr_text or (call_agent.action and call_agent.action.action_code != 'normal'):
            return
        self.call_agent = call_agent
        self.request_data = BotChatRequest(
            nodeId=self.call_agent.node_id,
            userId=self.call_agent.call_phone,
            sessionId=self.call_agent.session_id,
            recordId="",
            taskId=self.call_agent.taskId,
            asrText=user_asr_text,
            ext=None
        )
        self.call_agent.logger.info("user_asr_text发送结果: %s", user_asr_text)
        # Timeout notifications do not count as an interaction round.
        if user_asr_text != 'ASR408error':
            self.call_agent.inter_action_total += 1
        self.to_quest(self.request_data)

    def _enqueue_reply(self, message):
        """Queue a bot reply without blocking; return ``False`` if it was dropped.

        The queue is bounded, and a blocking ``put`` could stall the calling
        callback indefinitely when the queue is full.
        """
        try:
            self.call_agent.message_queue.put_nowait(message)
        except queue.Full:
            self.call_agent.logger.warning(
                "sessionId=%s, message queue is full, dropping bot reply",
                self.call_agent.session_id,
            )
            return False
        return True

    def to_quest(self, request: "BotChatRequest"):
        """POST ``request`` to the bot service and queue the parsed reply.

        Network failures and malformed JSON are logged, not raised. Request
        count and latency metrics are always recorded.
        """
        logger = self.call_agent.logger
        request_data = request.to_json_string()
        url = f"http://{SERVE_HOST}:40072/botservice"
        start_time = time.time()
        logger.info(f"请求数据:{request_data},url:{url}")
        try:
            # The context manager closes the session; no explicit close needed.
            with requests.Session() as session:
                session.headers.update({'Content-Type': 'application/json'})
                response = session.post(url=url, json=json.loads(request_data), timeout=10)
                logger.info(f"sessionId={ self.call_agent.session_id}, chat::request:{request_data}, response:{response.text}")
                if response.status_code != 200:
                    logger.info(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
                    return
                response_data = response.json()
                if "data" in response_data and response_data.get("code") == 0:
                    self._enqueue_reply(ChatMessage.from_json(response_data["data"]))
                else:
                    logger.info("响应中没有 'data' 字段")
        except requests.RequestException as e:
            traceback.print_exc()
            logger.error(f"TaskId={request.taskId}, 请求发生异常: {e}, URL: {url}")
        except ValueError as e:
            # Malformed JSON in the request payload or in the response body.
            traceback.print_exc()
            logger.error(f"TaskId={request.taskId}, invalid JSON: {e}, URL: {url}")
        finally:
            latency = time.time() - start_time
            registry.BOT_REQUEST_COUNT.inc()
            registry.BOT_REQUEST_LATENCY.labels(request.taskId).observe(latency)

    @staticmethod
    def _voice(voice_url, voice_content):
        """Build one simulated voice content entry."""
        return {
            "content_type": "voice",
            "content": "",
            "voice_url": voice_url,
            "voice_content": voice_content,
        }

    # Simulated bot response, for testing without the real service.
    def test_request(self, params: "BotChatRequest"):
        """Queue a canned reply chosen from ``params.asrText``; returns ``None``."""
        self.call_agent.logger.info("test_request::params= %s", params)
        response_data = {
            "node_id": "1.0",
            "contents": [],
            "wait_time": "6",
            "action": {
                "action_code": "normal",
                "action_content": "正常通话"
            },
            "inputType": "0"
        }
        self.call_agent.logger.info("asrText: %s", params.asrText)
        if params.asrText == 'start':  # greeting
            response_data['contents'].append(
                self._voice('/code/src/core/voip/scripts/1_00.wav', "五一北京到上海的高铁票还有吗?"))
            response_data['inputType'] = '1.0'
        elif params.asrText == 'ASR408error':  # silence timeout
            response_data['contents'].append(
                self._voice('/code/src/core/voip/scripts/4_00.wav', "waitTime超时"))
        elif "DTMF" in params.asrText and self.call_agent.inputType == '1.0':  # long key input
            response_data['contents'].append(
                self._voice('/code/src/core/voip/scripts/2_00.wav', "sds"))
        else:
            response_data['contents'] = [
                self._voice('/code/src/core/voip/test111.wav', "测试第二个录音文件"),
                self._voice('/code/src/core/voip/test222.wav', "五一北京到上海的高铁票还有吗?"),
            ]
        try:
            self._enqueue_reply(ChatMessage.from_json(response_data))
        except Exception:
            traceback.print_exc()  # print the full error
        return None
@singleton_keys
class BotAgent:
    """Owns the pjsua2 endpoint and a pool of SIP extensions for bot calls.

    NOTE(review): the source this class was recovered from had lost its
    indentation; statement nesting was reconstructed from the control flow.
    """

    # NOTE(review): the default password is a hard-coded credential; consider
    # loading it from configuration. Kept so existing callers are unaffected.
    def __init__(self, app, user_part_range=range(1001, 1011), host="192.168.100.159", port="5060", password="slibra@#123456"):
        """Store the configuration and start the pjsua2 worker thread.

        Args:
            app: application object; provides ``logger`` and is passed to
                ``Cache`` and ``DataHandleServer``.
            user_part_range: extension numbers to register.
            host: SIP server host.
            port: SIP server port.
            password: SIP password shared by all extensions.
        """
        self.app = app
        self.logger = app.logger
        self.user_part_range, self.host, self.port, self.password = user_part_range, host, port, password
        # Extensions currently free to take a call.
        self.user_part_pool = queue.Queue(maxsize=len(user_part_range))
        self.accounts = {}
        self.calls = {}
        self.ep = pj.Endpoint()
        self.is_stopping = False
        self.acd_service = None
        self.cache = Cache(app)
        self.dataHandleServer = DataHandleServer(app)
        # Bumped by restart() so a superseded worker loop knows to exit.
        self._generation = 0
        threading.Thread(target=self.create_pjsua2, daemon=True).start()

    def create_pjsua2(self, timeout_sec=86400):
        """Initialise pjsua2, register every extension, then run the event loop.

        Args:
            timeout_sec: registration timeout in seconds for every account.
        """
        generation = self._generation
        start_time = time.time()
        try:
            self.cache.set_register_per_hours(expire=timeout_sec - (60 * 3))
            # Create and initialize the library
            ep_cfg = pj.EpConfig()
            ep_cfg.uaConfig.threadCnt = 12
            ep_cfg.uaConfig.mainThreadOnly = False
            ep_cfg.uaConfig.maxCalls = 20
            ep_cfg.uaConfig.maxAccounts = 20
            ep_cfg.medConfig.noVad = True
            ep_cfg.logConfig.level = 3
            ep_cfg.logConfig.consoleLevel = 3
            self.ep.libCreate()
            self.ep.libInit(ep_cfg)
            aud_dev_mgr = self.ep.audDevManager()
            aud_dev_mgr.setNullDev()  # use the null audio device (no real sound hardware)
            # Set up media configuration, particularly jitter buffer
            # NOTE(review): this is assigned after libInit(); presumably it
            # has no effect unless moved into ep_cfg.medConfig — confirm.
            media_cfg = pj.MediaConfig()
            media_cfg.jbMinPre = 4  # Minimum pre-fetch frames
            media_cfg.jbMaxPre = 16  # Maximum pre-fetch frames
            media_cfg.noVad = True  # Disable Voice Activity Detection if needed
            self.ep.medConfig = media_cfg
            # Create SIP transport
            sipTpConfig = pj.TransportConfig()
            sipTpConfig.port = 30506
            self.ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, sipTpConfig)
            # Start the library
            self.ep.libStart()
            for user_part in self.user_part_range:
                acfg = pj.AccountConfig()
                acfg.idUri = f"sip:{user_part}@{self.host}:{self.port}"
                acfg.regConfig.registrarUri = f"sip:{self.host}:{self.port}"
                cred = pj.AuthCredInfo("digest", "*", f"{user_part}", 0, self.password)
                acfg.sipConfig.authCreds.append(cred)
                acfg.regConfig.timeoutSec = timeout_sec  # registration timeout (seconds)
                acfg.regConfig.retryIntervalSec = 10  # retry interval (seconds)
                acfg.regConfig.firstRetryIntervalSec = 10  # first retry interval (seconds)
                # Create the account
                acc = Account(self, user_part=user_part)
                acc.create(acfg)
                self.user_part_pool.put(user_part)
                self.accounts[user_part] = acc
        except Exception:
            traceback.print_exc()
        finally:
            latency = time.time() - start_time
            registry.BOT_CREATE_ACCOUNT_LATENCY.observe(latency)
            self.logger.info("create pjsua latency: %.3fs", latency)

        # Event loop. It ends when the agent is stopping, or when restart()
        # has started a newer worker; without the generation check this loop
        # would keep running next to the new one after every restart.
        while not self.is_stopping and generation == self._generation:
            registry.BOT_AGENT_LIVES.set(self.user_part_pool.qsize())
            self.register_per_hours()
            if generation != self._generation:
                break
            self.ep.libHandleEvents(100)

    def register_per_hours(self):
        """Restart the endpoint when the cache lock is absent and all accounts exist."""
        _lock = self.cache.lock_register_per_hours()
        if not _lock and len(self.accounts) == len(self.user_part_range):
            self.restart()

    def transfer(self, user_part, call_id, device_id, service_id='00000000000000000'):
        """Transfer the call to a human agent, then hang up the bot leg.

        Polls the cache every 0.1 s, at most 100 times, for the flag returned
        by ``get_after_play_hold_music(call_id)`` and hangs up once it is set.
        """
        if self.acd_service:
            self.acd_service.transfer_to_agent(call_id, device_id, service_id, hold=True)
        # NOTE(review): the polling loop is assumed to run regardless of
        # acd_service — confirm against the original nesting.
        for _ in range(100):
            if self.cache.get_after_play_hold_music(call_id):
                self.hangup(user_part)
                return
            time.sleep(0.1)
        self.logger.warning("transfer, hold music flag not set in time, call_id=%s", call_id)

    def hangup(self, user_part, reason="NORMAL_CLEARING", **sip_headers):
        """Hang up every call of ``user_part`` and return it to the pool.

        Args:
            user_part: extension whose calls are hung up.
            reason: reason text placed on the hangup request.
            **sip_headers: extra SIP headers to attach to the request.
        """
        call_op_param = pj.CallOpParam(True)
        call_op_param.statusCode = pj.PJSIP_SC_OK
        call_op_param.reason = reason
        call_op_param.txOption = pj.SipTxOption()
        sip_header_vector = pj.SipHeaderVector()
        for k, v in sip_headers.items():
            _sip_header = pj.SipHeader()
            _sip_header.hName = str(k)
            _sip_header.hValue = str(v)
            self.logger.info('hangup, header_name=%s, header_value=%s', k, v)
            sip_header_vector.push_back(_sip_header)
        call_op_param.txOption.headers = sip_header_vector
        try:
            acc = self.accounts.get(user_part)
            if acc:
                for call in acc.calls.values():
                    try:
                        call.hangup(call_op_param)
                    except Exception:
                        traceback.print_exc()
        except Exception:
            traceback.print_exc()
        finally:
            # The bot hung up: recycle the extension.
            self.release(user_part)

    def register(self, **kwargs):
        """Take a free extension from the pool and attach ``kwargs`` to its account.

        Blocks until an extension is available.

        Returns:
            The extension number taken from the pool.
        """
        self.logger.info('register,come in, pool.size :%d', self.user_part_pool.qsize())
        user_part = self.user_part_pool.get()
        acc = self.accounts.get(user_part)
        self.logger.info('register, user_part :%d, pool.size :%d', user_part, self.user_part_pool.qsize())
        if acc:
            self.logger.info('register==========> %s', acc.getId())
            acc.kwargs = kwargs
        return user_part

    def unregister(self, user_part):
        """Unregister the extension's account and return it to the pool."""
        acc = self.accounts.get(user_part)
        if acc:
            acc.setRegistration(renew=False)
        # The user hung up remotely: recycle the extension.
        self.release(user_part)

    def release(self, user_part):
        """Put ``user_part`` back into the pool unless it is already there."""
        if not user_part:
            self.logger.info("release, user_part is None")
            return
        pool = self.user_part_pool
        with pool.mutex:  # inspect the underlying deque under the queue's lock
            already_pooled = user_part in pool.queue
        if already_pooled:
            self.logger.info("release, already exists, user_part :%d, pool.size :%d", user_part, pool.qsize())
            return
        pool.put(user_part)
        self.logger.info("release, user_part :%d, pool.size :%d", user_part, pool.qsize())

    def restart(self):
        """Destroy the endpoint and start a fresh worker thread."""
        self.destroy()
        # Invalidate any running worker loop before clearing the stop flag.
        self._generation += 1
        self.is_stopping = False
        threading.Thread(target=self.create_pjsua2, daemon=True).start()

    def destroy(self):
        """Stop the worker loop, empty the pool and destroy the library."""
        self.is_stopping = True
        try:
            while not self.user_part_pool.empty():
                self.user_part_pool.get_nowait()
        except queue.Empty:
            pass  # another thread emptied the pool first
        self.accounts.clear()
        self.calls.clear()
        # Destroy the library
        self.ep.libDestroy()

    def __del__(self):
        self.destroy()
- # if __name__ == '__main__':
- # import logging
- # logger = logging.getLogger('voip')
- # bot = BotAgent(logger)
|