|
- #!/usr/bin/env python3
- # encoding:utf-8
- import ctypes
- import os
- import time
- import json
- import wave
- import queue
- import threading
- import traceback
- import sys
- import pjsua2 as pj
- from enum import Enum
- from datetime import datetime
- from multiprocessing import Process
- from apscheduler.schedulers.background import BackgroundScheduler
- from src.core.callcenter import registry
- from src.core.callcenter.cache import Cache
- from src.core.datasource import SIP_SERVER, SERVE_HOST
- from src.core.voip.constant import *
- import requests
- from src.core.callcenter.api import BotChatRequest,ChatMessage
- from src.core import singleton_keys
- from src.core.callcenter.snowflake import Snowflake
- from src.core.callcenter.data_handler import *
# Module-level dict; nothing in the visible part of this file reads or writes it
# (Account and BotAgent keep their own ``self.calls`` instead).
calls = {}
- # recording_file = '/code/src/core/voip/incoming_call.wav'
- # player_file1 = '/code/src/core/voip/test111.wav'
- # player_file2 = '/code/src/core/voip/test222.wav'
- from src.core.voip.asr import TestSt
class BotStatus(Enum):
    """Dialogue states of the voice bot (not referenced elsewhere in the visible file)."""

    # Waiting for the user to speak; the wait has not timed out yet
    wait_speaking_not_timeout = 1
    # Waiting for the user to speak; timed out, but the user is still talking
    wait_speaking_timeout_user_speaking = 2
    # The bot itself is speaking
    speaking = 3
    # Waiting for the user to finish speaking
    wait_user_speaking_end = 4
    # Waiting for the large language model to return its output
    wait_gpt_return = 5
class UserStatus(Enum):
    """Speaking state of the caller (not referenced elsewhere in the visible file)."""

    # Set to speaking once the ASR returns sentenceBegin
    speaking = 1
    # Set to silence once the ASR returns sentenceEnd
    silence = 2
class MyAudioMediaPort(pj.AudioMediaPort):
    """Audio port that receives the caller's audio and drives the dialogue loop.

    Every received frame is forwarded to the ASR client.  The same callback
    then collects recognised text, hands it to the text bot via ``call.chat``,
    runs the silence-timeout check and starts the next queued prompt once the
    current one has finished playing.

    NOTE(review): the source this class was recovered from had lost its
    indentation; the nesting inside ``_drive_dialogue`` is the most plausible
    reading and should be confirmed against the original file.
    """

    # In long-DTMF mode the collected digits are submitted after this many seconds.
    LONG_INPUT_TIMEOUT_SEC = 35
    # Separator placed between utterances that are sent to the bot together.
    TEXT_SEPARATOR = '###'

    def __init__(self, call, audio_media=None, asr=None):
        """Bind the port to *call*; *asr* is the client that receives raw frames."""
        pj.AudioMediaPort.__init__(self)
        print("MyAudioMediaPort::come in, debugger")
        self.call = call
        self.audio_media = audio_media
        self.asr = asr
        self.first = True
        self.user_asr_texts = []
        # repr() of the last exception logged by onFrameReceived; used to avoid
        # writing the same traceback once per audio frame.
        self._last_error = None

    def onFrameRequested(self, frame):
        """pjsua2 callback: a frame was requested from this port (only printed)."""
        print("Request audio frame:", frame)

    def onFrameReceived(self, frame):
        """pjsua2 callback: forward the frame to ASR, then advance the dialogue."""
        if self.asr:  # only send audio when an ASR client is attached
            if self.first:
                self.first = False
                self.call.logger.info("Received audio frame: %s %s", frame.buf, frame.size)
            self.asr.send_audio(frame.buf)
        try:
            self._drive_dialogue()
        except Exception as exc:
            # This runs for every audio frame, so it must never raise; but
            # failures are logged (once per distinct error) instead of being
            # silently discarded.
            signature = repr(exc)
            if signature != self._last_error:
                self._last_error = signature
                self.call.logger.exception("onFrameReceived failed")

    def _drive_dialogue(self):
        """Collect user input, talk to the text bot and start the next prompt."""
        call = self.call
        asr_text = self.get_asr_text()
        play_complete = call.is_play_complete()
        current_time = time.time()

        if call.inputType == '1.0':
            # Long key-press mode: digits are accumulated by MyCall.onDtmfDigit.
            time_difference = int(current_time - call.inputLongStart)
            if time_difference > self.LONG_INPUT_TIMEOUT_SEC and play_complete:
                self.user_asr_texts.append(f"DTMF({call.digit})DTMF")
                self._flush_user_texts()
            elif asr_text:
                self.user_asr_texts.append(asr_text)
            wait_limit = self._as_int(call.wait_time)
            # A missing/unparseable wait_time used to raise here and abort the
            # rest of the frame (including the prompt queue below).
            if wait_limit is not None and time_difference > wait_limit:
                call.reset_wait_time()
        else:
            if asr_text and not play_complete:
                # The prompt is still playing: buffer the text for later.
                self.user_asr_texts.append(asr_text)
            if (asr_text and play_complete) or (play_complete and self.user_asr_texts):
                if asr_text:
                    self.user_asr_texts.append(asr_text)
                self._flush_user_texts()

        if call.wait_time and call.wait_time != "0" and play_complete and not asr_text:
            call.wait_time_check(current_time, call.wait_time)

        message_queue_size = call.message_queue.qsize()
        if message_queue_size > 0 and (not call.cur_player_file or play_complete):
            next_prompt = self.get_player_file()
            if next_prompt is None:
                # Dequeuing failed (already reported by get_player_file).
                return
            (call.cur_player_file, call.wait_time, call.inputType,
             call.action, call.node_id) = next_prompt
            # Reset the completion flag and the timeout timer so that the new
            # prompt is timed from its own start.
            call.reset_wait_time()
            call.send_bot_speaker(call.cur_player_file)

    def _flush_user_texts(self):
        """Send all buffered user texts to the text bot as one request."""
        texts = self.user_asr_texts
        user_asr_text = texts[0] if len(texts) == 1 else self.TEXT_SEPARATOR.join(texts)
        texts.clear()
        self.call.chat(user_asr_text)

    @staticmethod
    def _as_int(value):
        """Return ``int(value)``, or None when *value* cannot be converted."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    def get_asr_text(self):
        """Return the next recognised text without blocking, or None if there is none."""
        try:
            asr_text = self.call.user_asr_text_queue.get(block=False)
        except queue.Empty:
            return None
        self.call.logger.info('get_asr_text: %s', asr_text)
        return asr_text

    def get_player_file(self):
        """Pop the next bot message without blocking.

        Returns a ``(voice_files, wait_time, inputType, action, node_id)``
        tuple, or None when no message could be read (the traceback is
        printed).
        """
        try:
            message = self.call.message_queue.get(block=False)
            player_file = [item.voice_url for item in message.contents if item.content_type == 'voice']
            return player_file, message.wait_time, message.inputType, message.action, message.node_id
        except Exception:
            traceback.print_exc()
            return None
class MyAudioMediaPlayer(pj.AudioMediaPlayer):
    """Audio player that reports end-of-file to an optional callback."""

    def __init__(self, player_id, sink, on_complete=None):
        """Remember the player id, the sink and the completion callback."""
        pj.AudioMediaPlayer.__init__(self)
        self.player_id, self.sink, self.on_complete = player_id, sink, on_complete

    def onEof2(self):
        """pjsua2 callback at end of playback: call ``on_complete(player_id)`` if set."""
        notify = self.on_complete
        if not notify:
            return
        notify(self.player_id)
- # Subclass to extend the Account and get notifications etc.
class Account(pj.Account):
    """SIP account for one extension; answers every incoming call with a MyCall."""

    def __init__(self, agent, user_part, **kwargs):
        """Keep the owning agent, the extension number and the per-call kwargs."""
        pj.Account.__init__(self)
        self.agent = agent
        self.user_part = user_part
        self.kwargs = kwargs
        self.calls = {}

    @staticmethod
    def _info_fields(info):
        """Return the account-info fields that the debug prints show, in order."""
        return (
            info.id, info.isDefault, info.uri, info.regIsConfigured,
            info.regIsActive, info.regExpiresSec, info.regStatus,
            info.regStatusText, info.regLastErr, info.onlineStatus,
            info.onlineStatusText,
        )

    def onRegState(self, prm):
        """pjsua2 callback: print the registration state of this account."""
        info = self.getInfo()
        print("***OnRegState: " + prm.reason, *self._info_fields(info))

    def onIncomingCall(self, prm):
        """pjsua2 callback: answer the call with 200 OK and keep a reference to it."""
        info = self.getInfo()
        print("***onIncomingCall: ", prm.callId, *self._info_fields(info))
        incoming = MyCall(self.agent, self, self.user_part, prm.callId, **self.kwargs)
        answer_prm = pj.CallOpParam(True)
        answer_prm.statusCode = pj.PJSIP_SC_OK
        incoming.answer(answer_prm)
        self.calls[prm.callId] = incoming
class MyCall(pj.Call):
    """One answered SIP call handled by the voice bot.

    The call owns the ASR client, the queue of recognised user texts, the
    queue of bot messages waiting to be played and the bookkeeping used for
    the silence timeout and the call statistics.

    NOTE(review): the source this class was recovered from had lost its
    indentation; nesting was reconstructed from the code's logic and should be
    confirmed against the original file.
    """

    def __init__(self, agent, acc, user_part, call_id, **kwargs):
        """Create the call, read the SIP variables from *kwargs* and start the ASR client."""
        pj.Call.__init__(self, acc, call_id)
        self.agent = agent
        self.logger = agent.logger
        self.user_part = user_part
        self.call_id = call_id
        self.kwargs = kwargs

        self.audio_media = None
        self.audio_port = None
        self.audio_player = None
        self.asr = None

        self.session_id = kwargs.get('variable_sip_h_P-LIBRA-CallId')
        self.device_id = kwargs.get('variable_sip_h_P-LIBRA-DeviceId')
        self.call_phone = kwargs.get("Caller-Caller-ID-Number")
        self.logger.info("daviddebugger::self.session_id:%s, self.call_phone:%s", self.session_id, self.call_phone)
        self.taskId = "10001"

        self.user_asr_text_queue = queue.Queue(maxsize=100)
        self.message_queue = queue.Queue(maxsize=3)
        self.player_complete_dict = {}
        self.wait_time = None
        self.inputType = None  # key-input type; '1.0' selects the long key-press mode
        self.digit = ''  # digits collected so far in long key-press mode
        self.action = None
        self.node_id = 'start'
        self.cur_player_file = None  # file list of the prompt currently being played

        self.asr = TestSt(self.session_id, message_receiver=self.on_receiver_asr_result)  # ASR client
        self.asr.start()  # start the ASR thread
        self.start_time = time.time()  # start of this bot conversation

        # Silence-timeout bookkeeping
        self.play_start_time = time.time()  # moment the countdown started
        self.play_complete_flag = False  # True while the countdown is running
        self.txtLock = False
        self.inputLongStart = time.time()  # start of the long key-press window
        self.inter_action_total = 0
        self.statistics_lock = False

    def wait_time_check(self, current_time, wait_time):
        """Start or continue the silence countdown; report a timeout to the bot.

        When more than *wait_time* seconds have elapsed since the countdown
        started, ``'ASR408error'`` is sent to the text bot, the 408 counter is
        incremented and the countdown is reset.  An unparseable *wait_time*
        is logged and also resets the countdown.
        """
        try:
            # Make sure wait_time is an integer
            wait_time = int(wait_time)
            # First call after a reset: start the countdown now
            if not self.play_complete_flag:
                self.play_complete_flag = True
                self.play_start_time = current_time
            elapsed_time = int(current_time - self.play_start_time)
            if elapsed_time > wait_time:
                self.chat('ASR408error')
                registry.BOT_ASR_408.labels(self.taskId).inc()
                self.reset_wait_time()
        except (TypeError, ValueError) as e:
            # TypeError covers values int() cannot take at all (e.g. a list).
            self.logger.info(f"无效的等待时间参数: {wait_time}, 错误: {e}")
            self.reset_wait_time()

    def reset_wait_time(self):
        """Stop the silence countdown so that the next check starts a new one."""
        self.play_complete_flag = False
        self.play_start_time = None

    def is_play_complete(self):
        """Return the completion flag of the current prompt (None if there is none)."""
        if self.cur_player_file:
            player_id = murmur3_32(self.cur_player_file)
            return self.player_complete_dict.get(player_id)

    def onDtmfDigit(self, prm):
        """pjsua2 callback: handle one DTMF digit pressed by the caller."""
        # Key presses are ignored until the prompt has finished playing
        if not self.is_play_complete():
            return
        digit = prm.digit
        self.reset_wait_time()
        if self.inputType == '1.0':
            # Long key-press mode: collect digits until '#', then send them all
            if digit != '#':
                self.digit += digit
            else:
                self.chat(f"DTMF({self.digit})DTMF")
        else:
            # Normal mode: every digit is queued as its own user input
            self.user_asr_text_queue.put(f"DTMF({digit})DTMF")

    def onCallState(self, prm):
        """pjsua2 callback: greet when the call is confirmed, clean up when it ends."""
        call_info = self.getInfo()
        self.logger.info("daviddebugger::onCallState::[%s,%s] %s", pj.PJSIP_INV_STATE_CONFIRMED, pj.PJSIP_INV_STATE_DISCONNECTED, call_info.state)
        if call_info.state == pj.PJSIP_INV_STATE_CONFIRMED:
            # The call has been answered
            self.bot_say_hello()
        if call_info.state == pj.PJSIP_INV_STATE_DISCONNECTED:
            self.logger.info("通话结束:%s", self.user_part)
            self.release()

    def onCallMediaState(self, prm):
        """pjsua2 callback: attach the receiving port to every active audio stream."""
        call_info = self.getInfo()
        for media in call_info.media:
            if media.type == pj.PJMEDIA_TYPE_AUDIO and \
                    (media.status == pj.PJSUA_CALL_MEDIA_ACTIVE):
                self.logger.info("Call Media state 111: %s", call_info.stateText)
                self.audio_media = self.getAudioMedia(media.index)
                try:
                    # Set up the receiving direction of the audio channel
                    self.receive_user_speaker()
                except Exception:
                    traceback.print_exc()

    def receive_user_speaker(self):
        """Create the receiving audio port and route the call's audio into it."""
        self.audio_port = MyAudioMediaPort(self, self.audio_media, self.asr)
        self.audio_port.createPort("Incoming Call Port", build_audio_format())
        self.audio_media.startTransmit(self.audio_port)

    def send_bot_speaker(self, player_file):
        """Play the file(s) in *player_file* to the caller.

        Does nothing when the list is empty, the call is no longer active or
        one of the files does not exist.  The start and end of the player
        set-up are recorded in ``agent.call_players`` for the agent's
        stall check.
        """
        if not player_file:
            return
        if not self.isActive():
            return
        player_id = murmur3_32(player_file)
        self.player_complete_dict[player_id] = False
        for f in player_file:
            if not os.path.isfile(f):
                # NOTE(review): the completion flag stays False here, so
                # is_play_complete() never becomes true for this prompt.
                self.logger.info(f"Sending bot speaker, not exists, player_file: {player_file}, player_id: {player_id}, isActive: {self.isActive()}")
                return
        key = murmur3_32(str(datetime.now().timestamp()))
        self.agent.call_players[key] = [datetime.now().timestamp()]
        self.logger.info(f"Sending bot speaker, 111, player_file: {player_file}, player_id: {player_id}, isActive: {self.isActive()}")
        self.audio_player = MyAudioMediaPlayer(player_id, self.audio_media, on_complete=self.on_media_player_complete)
        if len(player_file) == 1:
            self.logger.info(f"Sending bot speaker, 222, player_file: {player_file}, player_id: {player_id}, isActive: {self.isActive()}")
            self.audio_player.createPlayer(player_file[0], pj.PJMEDIA_FILE_NO_LOOP)
        else:
            self.logger.info(f"Sending bot speaker, 333, player_file: {player_file}, player_id: {player_id}, isActive: {self.isActive()}")
            self.audio_player.createPlaylist(player_file, f'my_hello_playlist{player_id}', pj.PJMEDIA_FILE_NO_LOOP)
        self.logger.info(f"Sending bot speaker, 444, player_file: {player_file}, player_id: {player_id}, isActive: {self.isActive()}")
        self.audio_player.startTransmit(self.audio_media)
        self.agent.call_players[key].append(datetime.now().timestamp())

    def on_receiver_asr_result(self, message, *args):
        """ASR callback: queue finished sentences, reset the timeout on partial results."""
        # Results are ignored until the prompt has finished playing
        if not self.is_play_complete():
            return
        message = json.loads(message)
        header = message["header"]
        if header["status"] == 20000000:
            if header["name"] == "SentenceEnd":
                result = message["payload"]["result"]
                self.user_asr_text_queue.put(result)
            elif header["name"] == "TranscriptionResultChanged":
                self.reset_wait_time()
        else:
            self.logger.info(f"Status is not {message['header']['status']}")
            registry.ASR_ERRORS.labels(message['header']['status']).inc()

    def on_media_player_complete(self, player_id):
        """Player callback: mark the prompt as finished and run its end action."""
        self.logger.info('player complete')
        self.player_complete_dict[player_id] = True
        self.digit = ''
        self.inputLongStart = time.time()
        # Action to perform once playback has finished
        self.say_end_action(self.action)

    def bot_say_hello(self):
        """Open the dialogue by sending ``"start"`` to the text bot."""
        self.chat(user_asr_text="start")

    def chat(self, user_asr_text=None):
        """Send *user_asr_text* to the text-bot service."""
        ToTextBotAgent(user_asr_text, self)

    def say_end_action(self, action):
        """Perform the action attached to the prompt that just finished playing.

        ``'hang'`` hangs up, ``'transfer'`` hands the call to a human agent and
        updates the call record; any other code does nothing.  A missing
        action is tolerated, matching the ``call_agent.action and ...`` check
        in ToTextBotAgent.
        """
        if action is None:
            self.logger.info('handling_release skipped, no action')
            return
        self.logger.info('handling_release %s', action.action_code)
        action_code = action.action_code
        if action_code == 'hang':  # hang up
            self.agent.hangup(self.user_part)
            self.end_statistics()
        elif action_code == 'transfer':  # transfer to a human agent
            self.agent.transfer(user_part=self.user_part, call_id=self.session_id, device_id=self.device_id)
            self.end_statistics()
            # Update the call record
            self.agent.dataHandleServer.update_record(self.session_id, service_category=2)

    def end_statistics(self):
        """Record call duration and interaction rounds; runs at most once per call."""
        if not self.statistics_lock:
            self.statistics_lock = True
            self.logger.info(f"self.inter_action_total:{self.inter_action_total}")
            latency = (time.time() - self.start_time)
            registry.BOT_CALL_DURATION.labels(self.taskId).observe(latency)
            registry.BOT_INTERACTION_ROUNDS.labels(self.taskId).observe(self.inter_action_total)

    def release(self):
        """Tear down the media objects, close ASR and give the extension back."""
        self.logger.info('liuwei::debugger::release:: come in ')
        if self.audio_player:
            try:
                self.audio_player.stopTransmit(self.audio_media)
                print("Success to stopTransmit:")
            except pj.Error as e:
                print("Failed to stopTransmit:", e)
            self.audio_player = None
        if self.audio_port:
            try:
                self.audio_media.stopTransmit(self.audio_port)
                print("Success to stopTransmit:")
            except pj.Error as e:
                print("Failed to stopTransmit:", e)
            self.audio_port = None
        if self.audio_media:
            self.audio_media = None
        self.asr.close()
        # After a remote hang-up the extension number must be recycled
        self.agent.hangup(self.user_part)
        self.end_statistics()
class ToTextBotAgent:
    """One request to the text-bot service, issued from the constructor.

    The bot's answer (or a default "transfer to a human agent" message when
    every attempt fails) is put on ``call_agent.message_queue``.
    """

    def __init__(self, user_asr_text, call_agent):
        """Send *user_asr_text* for *call_agent* unless there is nothing to send.

        Nothing is sent when the text is empty or when the call's current
        action exists and is not ``'normal'``.
        """
        if not user_asr_text or (call_agent.action and call_agent.action.action_code != 'normal'):
            return
        self.call_agent = call_agent
        self.request_data = BotChatRequest(
            nodeId=self.call_agent.node_id,
            userId=self.call_agent.call_phone,
            sessionId=self.call_agent.session_id,
            recordId="",
            taskId=self.call_agent.taskId,
            asrText=user_asr_text,
            ext=None
        )
        self.call_agent.logger.info("user_asr_text发送结果: %s", user_asr_text)
        if user_asr_text != 'ASR408error':
            # Timeout notifications do not count as an interaction round
            self.call_agent.inter_action_total += 1
        # Send the request and handle the response
        self.to_request(self.request_data)

    def to_request(self, request: BotChatRequest, try_count = 3):
        """POST *request* to the bot service, trying up to *try_count* times.

        The first response with ``code == 0`` and a ``data`` field is parsed
        and queued; if no attempt succeeds the default response is queued.
        Per-attempt and total latencies are recorded in the registry.
        """
        start_time = time.time()
        request_data = request.to_json_string()
        response_data = None
        try:
            message = None
            url = f"http://{SERVE_HOST}:40072/botservice"
            headers = {"Content-Type": "application/json"}
            # NOTE(review): request_data looks like it is already a JSON string,
            # so this encodes it a second time — confirm the service expects that.
            payload = json.dumps(request_data)
            while try_count > 0:
                once_start = time.time()
                try:
                    response = requests.post(url, data=payload, headers=headers, timeout=3)
                    if response.ok:
                        response_data = response.json()
                        if "data" in response_data and response_data["code"] == 0:
                            data = response_data["data"]
                            message = ChatMessage.from_json(data)
                            self.call_agent.message_queue.put(message)
                            break
                        else:
                            self.call_agent.logger.info("响应中没有 'data' 字段")
                    else:
                        self.call_agent.logger.info(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
                except Exception as e:
                    traceback.print_exc()
                    self.call_agent.logger.error(f"TaskId={request.taskId}, 请求发生异常 {e}, URL: {url}")
                finally:
                    try_count = try_count - 1
                    latency = (time.time() - once_start)
                    registry.BOT_REQUEST_ONCE_LATENCY.labels(request.taskId).observe(latency)
            if not message:
                message = self.get_default_response()
                self.call_agent.message_queue.put(message)
        finally:
            latency = (time.time() - start_time)
            registry.BOT_REQUEST_COUNT.inc()
            registry.BOT_REQUEST_LATENCY.labels(request.taskId).observe(latency)
            self.call_agent.logger.info(f"sessionId={ self.call_agent.session_id}, timeCost={latency}, chat::request:{request_data}, response:{response_data if response_data else None}")

    def to_quest(self, request: BotChatRequest, try_count = 3):
        """Session-based variant of :meth:`to_request` (not called in the visible file).

        The body is sent with ``data=`` so that it has the same wire format as
        :meth:`to_request`; the previous ``json=`` argument encoded the already
        encoded payload once more.
        """
        start_time = time.time()
        request_data = request.to_json_string()
        response = None
        try:
            url = f"http://{SERVE_HOST}:40072/botservice"
            payload = json.dumps(request_data)
            with requests.Session() as session:
                message = None
                session.headers.update({'Content-Type': 'application/json'})
                while try_count > 0:
                    once_start = time.time()
                    try:
                        response = session.post(url=url, data=payload, timeout=3)
                        self.call_agent.logger.info("to_request come in , try_count=%s", try_count)
                        if response.status_code == 200:
                            response_data = response.json()
                            if "data" in response_data and response_data["code"] == 0:
                                data = response_data["data"]
                                message = ChatMessage.from_json(data)
                                self.call_agent.message_queue.put(message)
                                break
                            else:
                                self.call_agent.logger.info("响应中没有 'data' 字段")
                        else:
                            self.call_agent.logger.info(f"请求失败,状态码: {response.status_code}, 响应内容: {response.text}")
                    except Exception as e:
                        traceback.print_exc()
                        self.call_agent.logger.error(f"TaskId={request.taskId}, 请求发生异常: {e}, URL: {url}")
                    finally:
                        try_count = try_count - 1
                        latency = (time.time() - once_start)
                        registry.BOT_REQUEST_ONCE_LATENCY.labels(request.taskId).observe(latency)
                if not message:
                    message = self.get_default_response()
                    self.call_agent.message_queue.put(message)
        finally:
            latency = (time.time() - start_time)
            registry.BOT_REQUEST_COUNT.inc()
            registry.BOT_REQUEST_LATENCY.labels(request.taskId).observe(latency)
            # A requests.Response is falsy for 4xx/5xx statuses, so test for None
            # explicitly; otherwise the body of a failed response is not logged.
            self.call_agent.logger.info(f"sessionId={ self.call_agent.session_id}, timeCost={latency}, chat::request:{request_data}, response:{response.text if response is not None else None}")

    def get_default_response(self):
        """Build the fallback message: play the transfer prompt, then transfer to a human."""
        response = {
            "node_id": "99.00",
            "contents": [
                {
                    "content_type": "voice",
                    "content": "正在为您转接人工服务,请稍后",
                    "voice_url": "/root/aibot/dm/voice/transfer.wav",
                    "voice_content": ""
                }
            ],
            "wait_time": 1,
            "action": {
                "action_code": "transfer",
                "action_content": "转人工"
            },
            "inputType": "0"
        }
        parsed_response = ChatMessage.from_json(response)
        return parsed_response
@singleton_keys
class BotAgent:
    """Owns the pjsua2 endpoint and a pool of SIP extensions used by the voice bot.

    The endpoint runs in a dedicated thread (``_create_pjsua2``).  A background
    scheduler job (``_main_thread_daemon``) runs every second and restarts the
    endpoint when playback stalls, when a thread lock is flagged in the cache
    or when the registration period has expired.

    NOTE(review): the source this class was recovered from had lost its
    indentation; nesting was reconstructed from the code's logic and should be
    confirmed against the original file.
    """

    # A player whose set-up has not finished after this many seconds is
    # treated as stalled and triggers a restart.
    PLAY_STALL_TIMEOUT_SEC = 10

    # NOTE(review): a default SIP password is hard-coded in the signature below;
    # consider moving it to configuration.
    def __init__(self, app, user_part_range=range(1001, 1011), host=SIP_SERVER, port="5060", password="slibra@#123456"):
        """Start the pjsua2 thread and the one-second supervision job."""
        self.app = app
        self.logger = app.logger
        self.user_part_range, self.host, self.port, self.password = user_part_range, host, port, password
        self.user_part_pool = queue.Queue(maxsize=len(user_part_range))
        self.accounts = {}
        self.calls = {}
        self.call_players = {}
        self.ep = pj.Endpoint()
        self.daemon_stopping = False
        self.is_stopping = False
        self.counter = 0
        self.acd_service = None
        self.cache = Cache(app)
        self.dataHandleServer = DataHandleServer(app)
        self.pjsua_thread = None
        self._start()
        self.daemon_job_scheduler = BackgroundScheduler()
        self.daemon_job_scheduler.add_job(self._main_thread_daemon, 'interval', seconds=1, max_instances=1, name='bot_agent_daemon')
        self.daemon_job_scheduler.start()

    def _create_pjsua2(self, timeout_sec=86400):
        """Thread body: set up pjsua2, register all extensions and pump events.

        Runs until ``is_stopping`` is set, then clears the bookkeeping dicts
        and destroys the library.
        """
        start_time = time.time()
        try:
            self.cache.set_register_per_hours(expire=timeout_sec - (60*3))
            # Create and initialize the library
            ep_cfg = pj.EpConfig()
            ep_cfg.uaConfig.threadCnt = 4
            ep_cfg.uaConfig.mainThreadOnly = False
            ep_cfg.uaConfig.maxCalls = 12
            ep_cfg.uaConfig.maxAccounts = 12
            ep_cfg.medConfig.noVad = True
            ep_cfg.logConfig.level = 3
            ep_cfg.logConfig.consoleLevel = 3
            self.ep.libCreate()
            self.ep.libInit(ep_cfg)
            aud_dev_mgr = self.ep.audDevManager()
            aud_dev_mgr.setNullDev()  # use the null audio device (no real sound card)
            # Media configuration, in particular the jitter buffer.
            # NOTE(review): this is assigned after libInit(); verify that pjsua2
            # actually picks it up (it may need to go into ep_cfg.medConfig).
            media_cfg = pj.MediaConfig()
            media_cfg.jbMinPre = 4
            media_cfg.jbMaxPre = 16
            media_cfg.noVad = True
            self.ep.medConfig = media_cfg
            # Create the SIP transport
            sipTpConfig = pj.TransportConfig()
            sipTpConfig.port = 30506
            self.ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, sipTpConfig)
            # Start the library
            self.ep.libStart()
            for user_part in self.user_part_range:
                acfg = pj.AccountConfig()
                acfg.idUri = f"sip:{user_part}@{self.host}:{self.port}"
                acfg.regConfig.registrarUri = f"sip:{self.host}:{self.port}"
                cred = pj.AuthCredInfo("digest", "*", f"{user_part}", 0, self.password)
                acfg.sipConfig.authCreds.append(cred)
                acfg.regConfig.timeoutSec = timeout_sec  # registration expiry (seconds)
                acfg.regConfig.retryIntervalSec = 10  # retry interval (seconds)
                acfg.regConfig.firstRetryIntervalSec = 10  # first retry interval (seconds)
                acfg.natConfig.iceEnabled = True
                acfg.natConfig.turnEnabled = True
                # NOTE(review): the "stun:" prefix in a TURN server setting looks
                # unusual — TODO confirm against the pjsua2 documentation.
                acfg.natConfig.turnServer = f"stun:{self.host}:3478"
                # Create the account
                acc = Account(self, user_part=user_part)
                acc.create(acfg)
                self.user_part_pool.put(user_part)
                self.accounts[user_part] = acc
        except Exception:
            traceback.print_exc()
        finally:
            latency = time.time() - start_time
            registry.BOT_CREATE_ACCOUNT_LATENCY.observe(latency)
            self.logger.info("create pjsua latency: %.3fs", latency)
        while not self.is_stopping:
            registry.BOT_AGENT_LIVES.set(self.user_part_pool.qsize())
            self.ep.libHandleEvents(200)
        self.call_players.clear()
        self.accounts.clear()
        self.calls.clear()
        # Destroy the library
        self.ep.libDestroy()
        self.logger.info("create pjsua already shutdown")

    def _main_thread_daemon(self):
        """Scheduler job: restart the endpoint if one of three conditions holds."""
        _lock = self._play_complete_degree_check()
        if _lock:
            self.logger.error("daviddebugger::play time greater than %ss, will restart", self.PLAY_STALL_TIMEOUT_SEC)
            self.restart()
            return
        _lock = self.cache.get_pjsua_thread_lock()
        if _lock:
            self.cache.del_pjsua_thread_lock()
            self.logger.error("daviddebugger::thread is lock, will restart")
            self.restart()
            return
        _lock = self.cache.lock_register_per_hours()
        if not _lock and len(self.accounts) == len(self.user_part_range):
            self.logger.error("daviddebugger::register expire, will restart")
            self.restart()
            return

    def _play_complete_degree_check(self):
        """Return True if a player set-up has been pending for too long.

        Entries with two timestamps are finished and are dropped.  A stalled
        entry is dropped as well before True is returned: if it stayed in
        ``call_players`` (which is only cleared when the pjsua2 thread exits
        normally) every following tick would trigger another restart.
        """
        for k, v in list(self.call_players.items()):
            if len(v) == 2:
                self.call_players.pop(k)
                continue
            if len(v) == 1:
                sec = datetime.now().timestamp() - v[0]
                self.logger.info("daviddebugger::play_complete_degree_check, sec=%s", sec)
                if sec > self.PLAY_STALL_TIMEOUT_SEC:
                    self.call_players.pop(k, None)
                    return True
        return False

    def transfer(self, user_part, call_id, device_id, service_id='00000000000000000'):
        """Ask the ACD service to transfer the call, then hang up the bot leg.

        Polls the cache for up to 30 x 0.1 s and hangs up as soon as
        ``get_after_play_hold_music(call_id)`` is truthy.
        """
        if self.acd_service:
            self.acd_service.transfer_to_agent(call_id, device_id, service_id, hold=True)
        try_count = 30
        while try_count > 0:
            if self.cache.get_after_play_hold_music(call_id):
                self.hangup(user_part)
                break
            try_count = try_count - 1
            time.sleep(0.1)

    def hangup(self, user_part, reason="NORMAL_CLEARING", **sip_headers):
        """Hang up every active call of *user_part* and return the extension to the pool.

        Extra keyword arguments are sent as SIP headers with the hang-up.
        """
        call_op_param = pj.CallOpParam(True)
        call_op_param.statusCode = pj.PJSIP_SC_OK
        call_op_param.reason = reason
        call_op_param.txOption = pj.SipTxOption()
        sip_header_vector = pj.SipHeaderVector()
        for k, v in sip_headers.items():
            _sip_header = pj.SipHeader()
            _sip_header.hName = str(k)
            _sip_header.hValue = str(v)
            self.logger.info('hangup, header_name=%s, header_value=%s', k, v)
            sip_header_vector.push_back(_sip_header)
        call_op_param.txOption.headers = sip_header_vector
        try:
            acc = self.accounts.get(user_part)
            if acc:
                for k, v in list(acc.calls.items()):
                    self.logger.info('hangup, call_idx=%s, call_active=%s', k, v.isActive())
                    if v.isActive():
                        v.hangup(call_op_param)
                acc.calls.clear()
        except Exception:
            traceback.print_exc()
        finally:
            # The bot hung up: recycle the extension number
            self.release(user_part)

    def register(self, **kwargs):
        """Take a free extension from the pool (blocking) and attach *kwargs* to its account."""
        self.logger.info('register,come in, pool.size :%d', self.user_part_pool.qsize())
        user_part = self.user_part_pool.get()
        acc = self.accounts.get(user_part)
        self.logger.info('register, user_part :%d, pool.size :%d', user_part, self.user_part_pool.qsize())
        if acc:
            self.logger.info('register==========> %s', acc.getId())
            acc.kwargs = kwargs
        return user_part

    def unregister(self, user_part):
        """Unregister the account of *user_part* and return the extension to the pool."""
        acc = self.accounts.get(user_part)
        if acc:
            acc.setRegistration(renew=False)
        # The remote side hung up: recycle the extension number
        self.release(user_part)

    def release(self, user_part):
        """Put *user_part* back into the pool unless it is already there."""
        if not user_part:
            self.logger.info("release, user_part is None")
            return

        def element_in_queue(q, element):
            # Hold the queue's own lock while inspecting its contents
            with q.mutex:
                return element in q.queue

        if element_in_queue(self.user_part_pool, user_part):
            self.logger.info("release, already exists, user_part :%d, pool.size :%d", user_part, self.user_part_pool.qsize())
            return
        self.user_part_pool.put(user_part)
        self.logger.info("release, user_part :%d, pool.size :%d", user_part, self.user_part_pool.qsize())

    def _start(self):
        """Start a new daemon thread running ``_create_pjsua2``."""
        self.is_stopping = False
        self.counter += 1
        self.pjsua_thread = threading.Thread(name=f"PJSUA-THREAD-{self.counter}", target=self._create_pjsua2, daemon=True)
        self.pjsua_thread.start()
        self.logger.info("bot agent starting ...")

    def restart(self):
        """Stop the current pjsua2 thread and start a fresh one."""
        self.destroy()
        self.logger.info('restart, 22222')
        self._start()

    def destroy(self):
        """Ask the pjsua2 thread to stop; force it with an async SystemExit if needed.

        The extension pool is drained first.  After one second, if the thread
        is still alive, ``PyThreadState_SetAsyncExc`` is used to raise
        ``SystemExit`` in it.
        """
        self.is_stopping = True
        self.logger.info("destroy, come in 11111")
        try:
            while not self.user_part_pool.empty():
                self.user_part_pool.get_nowait()
        except queue.Empty:
            # Another consumer emptied the pool in between; nothing left to drain
            pass
        self.logger.info("destroy, come in 22222")
        time.sleep(1)
        self.logger.info("destroy, come in 33333")
        # pjsua_thread is None until _start() has run
        if self.pjsua_thread is None or not self.pjsua_thread.is_alive():
            self.logger.info("destroy, pre thread already stopped")
            return
        ident = self.pjsua_thread.ident
        thread_id = ctypes.pythonapi.PyThreadState_SetAsyncExc
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_long(ident), ctypes.py_object(SystemExit)
        )
        self.logger.info("destroy, ident=%s, thread_id=%s, res=%s", ident, thread_id, res)

    def __del__(self):
        self.destroy()
        self.daemon_stopping = True
- # if __name__ == '__main__':
- # import logging
- # logger = logging.getLogger('voip')
- # bot = BotAgent(logger)
|