#!/usr/bin/env python3
# encoding:utf-8
import ctypes
import os
import time
import json
import wave
import queue
import threading
import traceback
import sys
from random import randint

import pjsua2 as pj
from enum import Enum
from datetime import datetime
from multiprocessing import Process

from apscheduler.schedulers.background import BackgroundScheduler

from src.core.callcenter import registry
from src.core.callcenter.cache import Cache
from src.core.callcenter.enumeration import HangupDir
from src.core.datasource import SIP_SERVER, SERVE_HOST
from src.core.voip.constant import *

import requests
from src.core.callcenter.api import BotChatRequest,ChatMessage

from src.core import singleton_keys
from src.core.callcenter.snowflake import Snowflake

from src.core.callcenter.data_handler import *

# Module-level dict; nothing in the visible part of this file reads or writes it
# (Account and BotAgent keep their own ``calls`` dicts).
# NOTE(review): presumably legacy — confirm no other module imports it before removing.
calls = {}
# recording_file = '/code/src/core/voip/incoming_call.wav'
# player_file1 = '/code/src/core/voip/test111.wav'
# player_file2 = '/code/src/core/voip/test222.wav'
from src.core.voip.asr import TestSt, XfAsr

class BotStatus(Enum):
    """States of the bot side of the dialogue."""

    # Waiting for the user to speak; the wait has not timed out yet.
    wait_speaking_not_timeout = 1
    # Waiting for the user to speak; the wait timed out but the user is still speaking.
    wait_speaking_timeout_user_speaking = 2
    # The bot is currently speaking.
    speaking = 3
    # Waiting for the user to finish speaking.
    wait_user_speaking_end = 4
    # Waiting for the large language model to return its output.
    wait_gpt_return = 5


class UserStatus(Enum):
    """States of the user side of the dialogue, as described by the ASR events."""

    # Set to speaking once the ASR returns sentenceBegin.
    speaking = 1
    # Set to silence once the ASR returns sentenceEnd.
    silence = 2


class MyAudioMediaPort(pj.AudioMediaPort):
    """Audio port that forwards received call audio to the ASR and drives the dialogue.

    For every received frame the audio is pushed to the ASR (when one is
    attached). The same callback then polls the call's ASR-text queue and
    bot-message queue to decide whether to send user input to the text bot,
    run the silence timeout check, or start playing the next bot prompt.
    """

    def __init__(self, call, audio_media=None, asr=None):
        """Create the port.

        Args:
            call: the owning call object; its queues, logger and dialogue
                state are read and updated from ``onFrameReceived``.
            audio_media: the call's audio media (stored only).
            asr: ASR client exposing ``send_audio``; may be None.
        """
        pj.AudioMediaPort.__init__(self)
        print("MyAudioMediaPort::come in, debugger")
        self.call = call
        self.audio_media = audio_media
        self.asr = asr
        # True until the first received frame has been logged.
        self.first = True
        # ASR/DTMF fragments buffered until they can be sent to the text bot.
        self.user_asr_texts = []
        # repr() of the last error logged by onFrameReceived, so a failure that
        # repeats on every frame is logged once instead of flooding the log.
        self._last_frame_error = None

    def onFrameRequested(self, frame):
        print("Request audio frame:", frame)

    def onFrameReceived(self, frame):
        """Feed one received frame to the ASR, then advance the dialogue."""
        if self.asr:  # only forward audio when an ASR instance is attached
            if self.first:
                self.first = False
                self.call.logger.warn("Received audio frame: %s, %s, %s", self.call.session_id,frame.buf,frame.size)
            self.asr.send_audio(frame.buf)

        try:
            asr_text = self.get_asr_text()
            play_complete = self.call.is_play_complete()
            current_time = time.time()

            self._handle_user_input(asr_text, play_complete, current_time)

            if self.call.wait_time and self.call.wait_time != "0" and play_complete and not asr_text:
                self.call.wait_time_check(current_time, self.call.wait_time)

            self._play_next_message(play_complete)
        except BaseException as exc:
            # Deliberately swallow everything, as the original bare ``except`` did:
            # BotAgent.destroy() injects SystemExit into the thread running this
            # callback, and nothing is allowed to escape from here. Unlike the
            # original, the failure is made visible (once per distinct error).
            signature = repr(exc)
            if signature != self._last_frame_error:
                self._last_frame_error = signature
                self.call.logger.error(
                    "onFrameReceived failed, session_id=%s: %s",
                    self.call.session_id, traceback.format_exc())

    def _drain_user_texts(self):
        """Return the buffered fragments joined with '###' and clear the buffer."""
        texts = self.user_asr_texts
        merged = texts[0] if len(texts) == 1 else '###'.join(texts)
        texts.clear()
        return merged

    def _handle_user_input(self, asr_text, play_complete, current_time):
        """Buffer or send the user's input depending on the current input type."""
        if self.call.inputType == '1.0':
            # Long key-input mode: digits are collected by the call's DTMF handler.
            time_difference = int(current_time - self.call.inputLongStart)
            if time_difference > 35 and play_complete:
                # No '#' within 35 seconds: send whatever digits were collected.
                self.user_asr_texts.append(f"DTMF({self.call.digit})DTMF")
                self.call.chat(self._drain_user_texts())
            elif asr_text:
                self.user_asr_texts.append(asr_text)
                self.call.chat(self._drain_user_texts())
            if time_difference > int(self.call.wait_time):
                self.call.reset_wait_time()
            return

        if asr_text and not play_complete:
            # The prompt is still playing: keep the text until playback ends.
            self.user_asr_texts.append(asr_text)
        if play_complete and (asr_text or self.user_asr_texts):
            if asr_text:
                self.user_asr_texts.append(asr_text)
            self.call.chat(self._drain_user_texts())

    def _play_next_message(self, play_complete):
        """Start the next queued bot message when nothing is (still) playing."""
        if self.call.message_queue.qsize() <= 0:
            return
        if self.call.cur_player_file and not play_complete:
            return
        next_prompt = self.get_player_file()
        if next_prompt is None:
            # get_player_file already reported the problem; the original code
            # failed here by trying to unpack None.
            return
        (self.call.cur_player_file, self.call.wait_time, self.call.inputType,
         self.call.action, self.call.node_id) = next_prompt
        # Reset the timeout state so the new playback is timed from scratch.
        self.call.reset_wait_time()
        self.call.send_bot_speaker(self.call.cur_player_file)

    def get_asr_text(self):
        """Return the next queued ASR text without blocking, or None when the queue is empty."""
        try:
            asr_text = self.call.user_asr_text_queue.get(block=False)
        except queue.Empty:
            return None
        self.call.logger.info('get_asr_text: %s', asr_text)
        return asr_text

    def get_player_file(self):
        """Pop the next bot message without blocking.

        Returns:
            A tuple ``(voice_urls, wait_time, inputType, action, node_id)``,
            or None when no message could be taken or it could not be read
            (the traceback is printed in that case).
        """
        try:
            message = self.call.message_queue.get(block=False)
            player_file = [item.voice_url for item in message.contents if item.content_type == 'voice']
            return player_file, message.wait_time, message.inputType, message.action, message.node_id
        except Exception:
            traceback.print_exc()
            return None



class MyAudioMediaPlayer(pj.AudioMediaPlayer):
    """Audio player that reports end-of-file to an optional callback."""

    def __init__(self, player_id, sink, on_complete=None):
        """Remember the player id, the sink and the completion callback.

        Args:
            player_id: identifier handed back to ``on_complete``.
            sink: media the player is meant to transmit to (stored only).
            on_complete: optional callable invoked as ``on_complete(player_id)``.
        """
        pj.AudioMediaPlayer.__init__(self)
        self.player_id, self.sink, self.on_complete = player_id, sink, on_complete

    def onEof2(self):
        """Invoke the completion callback, if one was given."""
        callback = self.on_complete
        if not callback:
            return
        callback(self.player_id)


# Subclass to extend the Account and get notifications etc.
class Account(pj.Account):
    """pjsua2 account that answers every incoming call with a ``MyCall``."""

    def __init__(self, agent, user_part, **kwargs):
        """Store the owning agent, the extension and the per-call variables."""
        pj.Account.__init__(self)
        self.agent = agent
        self.user_part = user_part
        # Calls created by onIncomingCall, keyed by pjsua2 call id.
        self.calls = {}
        self.kwargs = kwargs

    def _info_fields(self):
        """Return the account-info values printed by the callbacks, in print order."""
        ai = self.getInfo()
        return (
            ai.id, ai.isDefault, ai.uri, ai.regIsConfigured, ai.regIsActive,
            ai.regExpiresSec, ai.regStatus, ai.regStatusText, ai.regLastErr,
            ai.onlineStatus, ai.onlineStatusText,
        )

    def onRegState(self, prm):
        """Print the registration state change together with the account info."""
        fields = self._info_fields()
        print("***OnRegState: " + prm.reason, *fields)

    def onIncomingCall(self, prm):
        """Create a ``MyCall`` for the incoming call, answer it with 200 and keep it."""
        fields = self._info_fields()
        print("***onIncomingCall: ", prm.callId, *fields)

        incoming = MyCall(self.agent, self, self.user_part, prm.callId, **self.kwargs)
        answer_param = pj.CallOpParam(True)
        answer_param.statusCode = pj.PJSIP_SC_OK
        incoming.answer(answer_param)
        self.calls[prm.callId] = incoming


class MyCall(pj.Call):

    def __init__(self, agent, acc, user_part, call_id, **kwargs):
        """Set up the per-call dialogue state and start the ASR instance.

        Args:
            agent: owning agent; its ``logger`` is reused for this call.
            acc: account passed on to ``pj.Call``.
            user_part: extension (user part) that handles this call.
            call_id: call id passed on to ``pj.Call``.
            **kwargs: call variables; the session id, device id and caller
                number are read from them below.
        """
        pj.Call.__init__(self, acc, call_id)
        self.agent = agent
        self.logger = agent.logger
        self.user_part = user_part
        self.call_id = call_id
        self.kwargs = kwargs
        self.audio_media = None
        self.audio_port = None
        self.audio_player = None
        self.asr = None
        self.session_id = kwargs.get('variable_sip_h_P-LIBRA-CallId')
        self.device_id = kwargs.get('variable_sip_h_P-LIBRA-DeviceId')
        self.call_phone = kwargs.get("Caller-Caller-ID-Number")
        self.logger.info("daviddebugger::self.session_id:%s, self.call_phone:%s", self.session_id,self.call_phone)
        # Fixed task id; used as the metrics label and sent with every bot request.
        self.taskId = "10001"
        # Recognized user texts / DTMF input waiting to be consumed by the audio port.
        self.user_asr_text_queue = queue.Queue(maxsize=100)
        # Bot replies waiting to be played.
        self.message_queue = queue.Queue(maxsize=3)
        # player_id -> whether that playback has finished.
        self.player_complete_dict = {}

        self.wait_time = None
        self.inputType = None  # key-input type; per the original note 1 means long key input (the code compares against '1.0')
        self.digit = ''  # digits collected so far for a long key input
        self.action = None
        self.node_id= 'start'

        self.cur_player_file = None   # file(s) currently being played

        # self.asr = TestSt(self.session_id, logger=self.logger, message_receiver=self.on_receiver_asr_result)  # create the ASR instance
        self.asr = XfAsr(self.session_id, logger=self.logger, message_receiver=self.on_receiver_asr_result)  # create the ASR instance
        self.asr.start()  # start the ASR (a thread, per the original comment)

        self.start_time = time.time()  # start time of this bot conversation

        # Timeout settings
        self.play_start_time = time.time()  # countdown start time
        self.play_complete_flag = False  # whether the countdown has started

        self.txtLock = False
        self.inputLongStart = time.time()    # start time of the long key input

        self.inter_action_total = 0
        # Set once the end-of-call statistics have been recorded.
        self.statistics_lock = False


    def wait_time_check(self, current_time, wait_time):
        """Run the silence countdown and report a timeout to the text bot.

        The first call after a reset starts the countdown at ``current_time``.
        Once more than ``wait_time`` whole seconds have elapsed, the text
        'ASR408error' is sent to the bot, the 408 counter is incremented and
        the countdown is reset. A ``ValueError`` (e.g. a non-numeric
        ``wait_time``) is logged and also resets the countdown.
        """
        try:
            # Make sure the limit is an integer.
            wait_time = int(wait_time)
            countdown_running = self.play_complete_flag
            if not countdown_running:
                # Countdown not started yet: start it now.
                self.play_complete_flag, self.play_start_time = True, current_time
            timed_out = int(current_time - self.play_start_time) > wait_time
            if timed_out:
                self.chat('ASR408error')
                registry.BOT_ASR_408.labels(self.taskId).inc()
                self.reset_wait_time()
        except ValueError as e:
            self.logger.info(f"无效的等待时间参数: {wait_time}, 错误: {e}")
            self.reset_wait_time()

    def reset_wait_time(self):
        """Stop the silence countdown: clear the started flag, then the start time."""
        self.play_complete_flag, self.play_start_time = False, None

    def is_play_complete(self):
        """Return the completion flag of the current prompt.

        Returns None when nothing has been played yet or when no flag is
        recorded for the current file(s).
        """
        current = self.cur_player_file
        if not current:
            return None
        return self.player_complete_dict.get(murmur3_32(current))

    def onDtmfDigit(self, prm):
        """Handle one DTMF digit pressed by the user.

        Digits are ignored while the prompt is still playing. In long
        key-input mode (``inputType == '1.0'``) digits are accumulated until
        '#' is pressed, which sends the collected digits to the text bot.
        In any other mode each digit is queued on its own.
        """
        # Ignore user input until playback has finished.
        if not self.is_play_complete():
            return
        pressed = prm.digit
        self.reset_wait_time()
        if self.inputType != '1.0':
            self.user_asr_text_queue.put(f"DTMF({pressed})DTMF")
            return
        if pressed == '#':
            self.chat(f"DTMF({self.digit})DTMF")
        else:
            self.digit += pressed

    def onCallState(self, prm):
        """Greet the caller once the call is confirmed; clean up when it disconnects."""
        state = self.getInfo().state
        confirmed = pj.PJSIP_INV_STATE_CONFIRMED
        disconnected = pj.PJSIP_INV_STATE_DISCONNECTED

        self.logger.info("daviddebugger::onCallState::[%s,%s] %s", confirmed, disconnected, state)
        if state == confirmed:
            # The call has been answered.
            self.bot_say_hello()

        if state == disconnected:
            self.logger.info("通话结束:%s", self.user_part)
            self.release()

    def onCallMediaState(self, prm):
        """Attach the receiving audio port to every active audio media of the call."""
        info = self.getInfo()
        for media in info.media:
            is_active_audio = (
                media.type == pj.PJMEDIA_TYPE_AUDIO
                and media.status == pj.PJSUA_CALL_MEDIA_ACTIVE
            )
            if not is_active_audio:
                continue
            self.logger.info("Call Media state 111: %s", info.stateText)
            self.audio_media = self.getAudioMedia(media.index)
            try:
                # Start receiving the caller's audio.
                self.receive_user_speaker()
            except Exception:
                traceback.print_exc()

    def receive_user_speaker(self):
        """Create the receiving audio port and start transmitting the call audio to it."""
        port = MyAudioMediaPort(self, self.audio_media, self.asr)
        self.audio_port = port
        port.createPort("Incoming Call Port", build_audio_format())
        self.audio_media.startTransmit(port)

    def send_bot_speaker(self, player_file):
        """Play the given prompt file(s) to the caller.

        Args:
            player_file: list of audio file paths; one file is played with a
                plain player, several with a playlist.

        Nothing is played when the list is empty, the call is not active, or
        any of the files does not exist. While the player is being created
        and connected, a one-element entry is kept in
        ``agent.call_players``; the agent's daemon treats an entry that stays
        in that state for too long as a hang and restarts the agent.
        """
        if not player_file:
            return
        if not self.isActive():
            return
        player_id = murmur3_32(player_file)
        self.player_complete_dict[player_id] = False
        for f in player_file:
            if not os.path.isfile(f):
                # NOTE(review): the completion flag stays False on this path, so
                # is_play_complete() keeps reporting "not finished" for this prompt.
                self.logger.info(f"Sending bot speaker, not exists, player_file: {player_file}, player_id: {player_id}, isActive: {self.isActive()}")
                return
        key = murmur3_32(str(datetime.now().timestamp()))
        self.agent.call_players[key] = [datetime.now().timestamp()]
        try:
            self.logger.info(f"Sending bot speaker, 111, player_file: {player_file}, player_id: {player_id}, isActive: {self.isActive()}")
            self.audio_player = MyAudioMediaPlayer(player_id, self.audio_media, on_complete=self.on_media_player_complete)
            if len(player_file) == 1:
                self.logger.info(f"Sending bot speaker, 222, player_file: {player_file}, player_id: {player_id}, isActive: {self.isActive()}")
                self.audio_player.createPlayer(player_file[0], pj.PJMEDIA_FILE_NO_LOOP)
            else:
                self.logger.info(f"Sending bot speaker, 333, player_file: {player_file}, player_id: {player_id}, isActive: {self.isActive()}")
                self.audio_player.createPlaylist(player_file, f'my_hello_playlist{player_id}', pj.PJMEDIA_FILE_NO_LOOP)
            self.logger.info(f"Sending bot speaker, 444, player_file: {player_file}, player_id: {player_id}, isActive: {self.isActive()}")
            self.audio_player.startTransmit(self.audio_media)
        except Exception:
            # A raised error is a failure, not a hang: drop the watchdog entry so
            # one bad file does not make the daemon restart the whole agent, then
            # let the caller see the error exactly as before.
            self.agent.call_players.pop(key, None)
            raise
        # The entry may already be gone if the agent was restarted meanwhile.
        started = self.agent.call_players.get(key)
        if started is not None:
            started.append(datetime.now().timestamp())

    def on_receiver_asr_result(self, message, *args):
        """Handle one ASR event.

        Events are ignored while the prompt is still playing. A finished
        sentence is queued for the dialogue loop; an intermediate result only
        resets the silence countdown; any other event is ignored.
        """
        # Ignore what the user says until playback has finished.
        if not self.is_play_complete():
            return
        event = message["name"]
        if event == "SentenceEnd":
            self.user_asr_text_queue.put(message["result"])
        elif event == "TranscriptionResultChanged":
            self.reset_wait_time()

        # message = json.loads(message)
        # if message["header"]["status"] == 20000000:
        #     if message["header"]["name"] == "SentenceEnd":
        #         result = message["payload"]["result"]
        #         # self.logger.info("asr返回内容Result:%s", result)
        #         self.user_asr_text_queue.put(result)
        #     elif message["header"]["name"] == "TranscriptionResultChanged":
        #         self.reset_wait_time()
        # else:
        #     self.logger.info(f"Status is not {message['header']['status']}")
        #     registry.ASR_ERRORS.labels(message['header']['status']).inc()

    def on_media_player_complete(self, player_id):
        """Mark the playback as finished, restart long-input collection and run the end action."""
        self.logger.info('player complete')
        self.player_complete_dict[player_id] = True
        # Start a fresh long key-input window.
        self.digit, self.inputLongStart = '', time.time()
        # Action to perform once playback has finished.
        self.say_end_action(self.action)

    def bot_say_hello(self):
        """Open the dialogue by sending the text "start" to the text bot."""
        self.chat("start")

    def chat(self, user_asr_text=None):
        """Send ``user_asr_text`` to the text bot by constructing a ``ToTextBotAgent``."""
        ToTextBotAgent(user_asr_text, self)

    def say_end_action(self, action):
        """Perform the action attached to the prompt that just finished playing.

        Args:
            action: object with an ``action_code`` attribute, or None when the
                message carried no action. 'hang' hangs up the call,
                'transfer' hands it to a human agent; any other code (or no
                action at all) does nothing.
        """
        # The action may legitimately be missing; reading action.action_code
        # directly would raise inside the player's end-of-file callback.
        action_code = getattr(action, 'action_code', None)
        self.logger.info('handling_release %s', action_code)
        if action_code == 'hang':  # hang up
            self.agent.hangup(self.user_part)
            self.end_statistics()
            # Update the call record.
            self.agent.dataHandleServer.update_record(self.session_id, hangup_dir=HangupDir.ROBOT_HANGUP.code)
        elif action_code == 'transfer':  # transfer to a human agent
            self.agent.transfer(user_part=self.user_part, call_id=self.session_id, device_id=self.device_id)
            self.end_statistics()
            # Update the call record.
            self.agent.dataHandleServer.update_record(self.session_id, service_category=2)

    def end_statistics(self):
        """Record call duration and interaction rounds; only the first call has an effect."""
        if self.statistics_lock:
            return
        self.statistics_lock = True
        self.logger.info(f"self.inter_action_total:{self.inter_action_total}")
        duration = time.time() - self.start_time
        registry.BOT_CALL_DURATION.labels(self.taskId).observe(duration)
        registry.BOT_INTERACTION_ROUNDS.labels(self.taskId).observe(self.inter_action_total)

    def release(self):
        """Tear down the call's media and ASR and give the extension back to the agent.

        Stops the player and the receiving port (pjsua2 errors are printed
        and ignored), drops the media references, closes the ASR, then asks
        the agent to hang up / recycle the extension and records the
        end-of-call statistics. The last two steps run even when closing the
        ASR fails, so the extension is never lost from the pool.
        """
        self.logger.info('liuwei::debugger::release:: come in ')
        if self.audio_player:
            try:
                self.audio_player.stopTransmit(self.audio_media)
                print("Success to stopTransmit:")
            except pj.Error as e:
                print("Failed to stopTransmit:", e)
            self.audio_player = None
        if self.audio_port:
            try:
                self.audio_media.stopTransmit(self.audio_port)
                print("Success to stopTransmit:")
            except pj.Error as e:
                print("Failed to stopTransmit:", e)
            self.audio_port = None
        if self.audio_media:
            self.audio_media = None

        try:
            self.asr.close()
        except Exception as e:
            self.logger.error("release, asr close failed, session_id=%s, error=%s", self.session_id, e)
        finally:
            # After a remote hang-up the extension must be returned to the pool.
            self.agent.hangup(self.user_part)
            self.end_statistics()

class ToTextBotAgent:
    def __init__(self, user_asr_text, call_agent):
        """Build the bot request for ``user_asr_text`` and send it immediately.

        Nothing is sent (and no attributes are set) when the text is empty or
        when the call's pending action exists and is not 'normal'.
        """
        if not user_asr_text:
            return
        pending_action = call_agent.action
        if pending_action and pending_action.action_code != 'normal':
            return

        self.call_agent = call_agent
        self.request_data = BotChatRequest(
            nodeId=call_agent.node_id,
            userId=call_agent.call_phone,
            sessionId=call_agent.session_id,
            recordId="",
            taskId=call_agent.taskId,
            asrText=user_asr_text,
            ext=None,
        )
        call_agent.logger.info("user_asr_text发送结果: %s", user_asr_text)
        # Timeout notifications do not count as an interaction round.
        if user_asr_text != 'ASR408error':
            call_agent.inter_action_total += 1
        # Send the request and handle the response.
        self.to_request(self.request_data)

    def to_request(self, request: BotChatRequest, try_count = 3):
        """POST the request to the bot service and queue the reply for playback.

        The request is attempted up to ``try_count`` times with a 3 second
        timeout each. The first reply that has ``code == 0`` and a ``data``
        field is parsed and put on the call's message queue. If every attempt
        fails, the default "transfer to a human agent" reply is queued
        instead. Latency and request-count metrics are always recorded.

        Args:
            request: the request to send.
            try_count: maximum number of attempts.
        """
        start_time = time.time()
        request_data = request.to_json_string()
        response_data = None
        try:
            message = None
            url = f"http://{SERVE_HOST}:40072/botservice"
            headers = {"Content-Type": "application/json"}
            while try_count > 0:
                once_start = time.time()
                try:
                    response = requests.post(url, data=request_data, headers=headers, timeout=3)
                    # A requests.Response is falsy for error statuses, so it must be
                    # compared against None; otherwise the failure branch below can
                    # never report the real status code and body.
                    if response is not None and response.ok:
                        response_data = response.json()
                        if "data" in response_data and response_data["code"] == 0:
                            data = response_data["data"]
                            message = ChatMessage.from_json(data)
                            self.call_agent.message_queue.put(message)
                            break
                        else:
                            self.call_agent.logger.info(f"to_request::failed, sessionId={request.sessionId}, response_data:{response_data}")
                    else:
                        status_code = response.status_code if response is not None else None
                        response_text = response.text if response is not None else None
                        self.call_agent.logger.info(f"to_request::请求失败,sessionId:{request.sessionId}, 状态码: {status_code}, 响应内容: {response_text}")
                except Exception as e:
                    traceback.print_exc()
                    self.call_agent.logger.error(f"to_request::exception, TaskId={request.taskId}, sessionId={request.sessionId}, 请求发生异常: {e}")
                finally:
                    try_count = try_count - 1
                    latency = (time.time() - once_start)
                    registry.BOT_REQUEST_ONCE_LATENCY.labels(request.taskId).observe(latency)

            if not message:
                # Every attempt failed: fall back to the default reply.
                message = self.get_default_response()
                self.call_agent.message_queue.put(message)
        finally:
            latency = (time.time() - start_time)
            registry.BOT_REQUEST_COUNT.inc()
            registry.BOT_REQUEST_LATENCY.labels(request.taskId).observe(latency)
            self.call_agent.logger.info(f"to_request sessionId={self.call_agent.session_id}, timeCost={latency}, request:{request_data}, response:{response_data if response_data else None}")


    # def to_quest(self, request: BotChatRequest, try_count = 3):
    #     start_time = time.time()
    #     request_data = request.to_json_string()
    #     response = None
    #     try:
    #         url = f"http://{SERVE_HOST}:40072/botservice"
    #         # payload = request.to_json_string()
    #         # self.call_agent.logger.info(f"请求数据:{request_data},url:{url}")
    #         with requests.Session() as session:
    #             message = None
    #             # try:
    #             session.headers.update({'Content-Type': 'application/json'})
    #             while try_count > 0:
    #                 once_start = time.time()
    #                 try:
    #                     response = session.post(url=url, json=request_data, timeout=3)
    #                     # response = requests.post(url=url,  json=json.loads(request_data), headers=headers, timeout=10)  # 使用占位URL
    #                     # self.call_agent.logger.info("to_request come in , try_count=%s", try_count)
    #                     if response.status_code == 200:
    #                         response_data = response.json()
    #                         if "data" in response_data and response_data["code"]==0:
    #                             data = response_data["data"]
    #                             message = ChatMessage.from_json(data)
    #                             self.call_agent.message_queue.put(message)
    #                             break
    #                         else:
    #                             self.call_agent.logger.info(f"to_request::sessionId:{request.sessionId}, 响应中没有 'data' 字段")
    #                     else:
    #                         self.call_agent.logger.info(f"to_request::请求失败,sessionId:{request.sessionId}, 状态码: {response.status_code}, 响应内容: {response.text}")
    #                 except Exception as e:
    #                     traceback.print_exc()
    #                     self.call_agent.logger.error(f"to_request::TaskId={request.taskId}, sessionId={request.sessionId}, 请求发生异常: {e}, URL: {url}")
    #                 finally:
    #                     try_count = try_count - 1
    #                     latency = (time.time() - once_start)
    #                     registry.BOT_REQUEST_ONCE_LATENCY.labels(request.taskId).observe(latency)
    #
    #             self.call_agent.logger.info(f"to_request::sessionId:{request.sessionId}, message:{message.to_json_string() if message else None}")
    #             if not message:
    #                 message = self.get_default_response()
    #                 self.call_agent.message_queue.put(message)
    #             # finally:
    #             #     session.close()
    #     finally:
    #         latency = (time.time() - start_time)
    #         registry.BOT_REQUEST_COUNT.inc()
    #         registry.BOT_REQUEST_LATENCY.labels(request.taskId).observe(latency)
    #         self.call_agent.logger.info(f"to_request::sessionId={ self.call_agent.session_id}, timeCost={latency}, request:{request_data}, response:{response.text if response else None}")

    def get_default_response(self):
        """Return the fallback reply: play the transfer prompt, then transfer to a human agent."""
        transfer_prompt = {
            "content_type": "voice",
            "content": "正在为您转接人工服务,请稍后",
            "voice_url": "/root/aibot/dm/voice/transfer.wav",
            "voice_content": "",
        }
        transfer_action = {
            "action_code": "transfer",
            "action_content": "转人工",
        }
        fallback = {
            "node_id": "99.00",
            "contents": [transfer_prompt],
            "wait_time": 1,
            "action": transfer_action,
            "inputType": "0",
        }
        return ChatMessage.from_json(fallback)


@singleton_keys
class BotAgent:

    # NOTE(review): the default SIP password is hard-coded in source; consider
    # loading it from configuration. The default is left unchanged here because
    # callers may rely on it.
    def __init__(self, app, user_part_range=range(1001, 1011), host=SIP_SERVER, port="5060", password="slibra@#123456"):
        """Create the agent, start the pjsua2 worker thread and the watchdog job.

        Args:
            app: application object; its ``logger`` is used for all logging.
            user_part_range: extensions (user parts) to create accounts for.
            host: SIP server host.
            port: SIP server port.
            password: password used for every account.
        """
        self.app = app
        self.logger = app.logger
        self.user_part_range, self.host, self.port, self.password = user_part_range, host, port, password
        # Extensions that are currently free to take a call.
        self.user_part_pool = queue.Queue(maxsize=len(user_part_range))
        # user_part -> Account
        self.accounts = {}
        self.calls = {}
        # Watchdog entries written by MyCall.send_bot_speaker:
        # key -> [start timestamp] or [start timestamp, end timestamp].
        self.call_players={}
        self.ep = pj.Endpoint()
        self.daemon_stopping = False
        # Makes the event loop in _create_pjsua2 exit when set.
        self.is_stopping = False
        # Number of worker threads started so far; used in the thread name.
        self.counter = 0
        self.acd_service = None
        self.cache = Cache(app)
        self.dataHandleServer = DataHandleServer(app)
        self.pjsua_thread = None
        self._start()
        # threading.Thread(target=self.main_thread_daemon).start()

        # Run the watchdog once per second, never more than one instance at a time.
        self.daemon_job_scheduler = BackgroundScheduler()
        self.daemon_job_scheduler.add_job(self._main_thread_daemon, 'interval', seconds=1, max_instances=1, name='bot_agent_daemon')
        self.daemon_job_scheduler.start()

    class AsyncJob(pj.PendingJob):
        """Pending job that sleeps one second and then re-queues itself on the endpoint."""

        def __init__(self, agent):
            # The agent is stored before the base initializer runs.
            self.agent = agent
            super().__init__()
            agent.logger.warn("Job created id: %s", id(self))

        def execute(self, is_pending):
            """Log the invocation, sleep one second and add this job to the endpoint again."""
            self.agent.logger.warn("Executing job value: %s", is_pending)

            # NOTE(review): this sleep blocks whichever thread executes the job —
            # presumably the pjsua2 event thread; confirm this is intended.
            time.sleep(1)
            self.agent.ep.utilAddPendingJob(self)

        def __del__(self):
            self.agent.logger.warn("Job deleted id:%s", id(self))

    def _add_new_job(self):
        """Create one ``AsyncJob`` and hand it to the endpoint as a pending job."""
        log = self.logger
        log.warn("Creating job 1")
        pending = self.AsyncJob(self)
        log.warn("Adding job 1")
        self.ep.utilAddPendingJob(pending)

    def _create_pjsua2(self, timeout_sec=86400):
        """Worker-thread body: bring up pjsua2, create the accounts and run the event loop.

        Args:
            timeout_sec: passed to ``build_account_config`` for every account;
                the registration marker in the cache expires three minutes
                earlier.

        Setup errors are printed and swallowed, so the event loop below is
        entered even when setup failed. The loop runs until ``is_stopping``
        is set; afterwards the bookkeeping dicts are cleared and the library
        is destroyed.
        """
        start_time = time.time()
        # NOTE(review): the bare ``except`` below also catches the SystemExit that
        # destroy() injects into this thread, if it arrives during setup.
        try:
            self.cache.set_register_per_hours(expire=timeout_sec - (60*3))
            # Create and initialize the library
            ep_cfg = build_ep_config()
            self.ep.libCreate()
            self.ep.libInit(ep_cfg)

            aud_dev_mgr = self.ep.audDevManager()
            aud_dev_mgr.setNullDev()  # use the null audio device (in case there is no real device)

            # Set up media configuration, particularly jitter buffer
            media_cfg = build_media_config()
            self.ep.medConfig = media_cfg  # Apply media config to endpoint

            # Create SIP transport. Error handling sample is shown
            sipTpConfig = build_sip_transport_config()
            self.ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, sipTpConfig)
            # Start the library
            self.ep.libStart()
            self._add_new_job()

            for user_part in self.user_part_range:
                acfg = build_account_config(self.host, self.port, user_part, self.password, timeout_sec)
                # Create the account
                acc = Account(self, user_part=user_part)
                acc.create(acfg)

                # The extension becomes available as soon as its account exists.
                self.user_part_pool.put(user_part)
                self.accounts[user_part] = acc
        except:
            traceback.print_exc()
        finally:
            latency = time.time() - start_time
            registry.BOT_CREATE_ACCOUNT_LATENCY.observe(latency)
            self.logger.info("create pjsua latency: %.3fs", latency)

        # Event loop: publish the number of free extensions and poll pjsua2
        # (the argument 200 is presumably a timeout in milliseconds — TODO confirm).
        while not self.is_stopping:
            registry.BOT_AGENT_LIVES.set(self.user_part_pool.qsize())
            self.ep.libHandleEvents(200)

        self.call_players.clear()
        self.accounts.clear()
        self.calls.clear()
        # Destroy the library
        self.ep.libDestroy()
        self.logger.info("create pjsua already shutdown")


    def _main_thread_daemon(self):
        """Watchdog tick: restart the agent when one of three conditions holds.

        Checked in order, at most one restart per tick:
        1. a playback setup has been pending for too long;
        2. the "pjsua thread lock" marker is present in the cache (it is
           deleted before restarting);
        3. the registration marker has expired while all accounts exist.
        """
        # NOTE(review): the message below says 60s, but the threshold used by
        # _play_complete_degree_check is 10 seconds.
        if self._play_complete_degree_check():
            self.logger.error("daviddebugger::play time greater than 60s, will restart")
            self.restart()
            return

        if self.cache.get_pjsua_thread_lock():
            self.cache.del_pjsua_thread_lock()
            self.logger.error("daviddebugger::thread is lock, will restart")
            self.restart()
            return

        still_registered = self.cache.lock_register_per_hours()
        all_accounts_created = len(self.accounts) == len(self.user_part_range)
        if not still_registered and all_accounts_created:
            self.logger.error("daviddebugger::register expire, will restart")
            self.restart()

    def _play_complete_degree_check(self):
        """Report whether any playback setup has been pending for more than 10 seconds.

        ``call_players`` maps a key to ``[start]`` while a player is being set
        up and to ``[start, end]`` once setup has finished. Finished entries
        are discarded. Entries pending for more than 10 seconds are discarded
        too and make the method return True.

        Stale entries must be removed here: the worker thread only clears
        ``call_players`` when it leaves its event loop normally, so after a
        hang a stale entry would otherwise be found again on every tick and
        trigger a restart every second.

        Returns:
            True when at least one stale entry was found, else False.
        """
        now = datetime.now().timestamp()
        stuck = False
        # Iterate over a snapshot: call threads add entries concurrently.
        for key, stamps in list(self.call_players.items()):
            if len(stamps) == 2:
                self.call_players.pop(key, None)
                continue
            if len(stamps) == 1:
                sec = now - stamps[0]
                self.logger.info("daviddebugger::play_complete_degree_check, sec=%s", sec)
                if sec > 10:
                    self.call_players.pop(key, None)
                    stuck = True
        return stuck

    def transfer(self, user_part, call_id, device_id, service_id='00000000000000000'):
        """Hand the call to a human agent, then hang up the bot's leg.

        The ACD service (when configured) is asked to transfer the call on
        hold. The cache is then polled up to 30 times, 0.1 s apart, for the
        "after play hold music" marker; the bot's extension is hung up as
        soon as the marker is seen, and not at all if it never appears.
        """
        if self.acd_service:
            self.acd_service.transfer_to_agent(call_id, device_id, service_id, hold=True)
        # sip_headers = {'P-LIBRA-HangUpReason': 'transferToAgent', 'P-LIBRA-ServiceId': service_id}
        for _ in range(30):
            if self.cache.get_after_play_hold_music(call_id):
                self.hangup(user_part)
                break
            time.sleep(0.1)

    def hangup(self, user_part, reason="NORMAL_CLEARING", **sip_headers):
        """Hang up every active call of ``user_part`` and return the extension to the pool.

        Args:
            user_part: extension whose calls are hung up.
            reason: reason text put on the hang-up request.
            **sip_headers: extra SIP headers to attach, as name=value pairs.

        Errors while hanging up are printed and swallowed; the extension is
        released in every case.
        """
        op_param = self._build_hangup_param(reason, sip_headers)
        try:
            account = self.accounts.get(user_part)
            if account:
                for call_idx, call in list(account.calls.items()):
                    self.logger.info('hangup, call_idx=%s, call_active=%s'%(call_idx, call.isActive()))
                    if call.isActive():
                        call.hangup(op_param)
                account.calls.clear()
        except:
            traceback.print_exc()
        finally:
            # The bot hung up: recycle the extension.
            self.release(user_part)

    def _build_hangup_param(self, reason, sip_headers):
        """Build the ``CallOpParam`` (status 200, reason, extra headers) used by ``hangup``."""
        op_param = pj.CallOpParam(True)
        op_param.statusCode = pj.PJSIP_SC_OK
        op_param.reason = reason
        op_param.txOption = pj.SipTxOption()
        header_vector = pj.SipHeaderVector()
        for name, value in sip_headers.items():
            header = pj.SipHeader()
            header.hName = str(name)
            header.hValue = str(value)
            self.logger.info('hangup, header_name=%s, header_value=%s'%(name, value))
            header_vector.push_back(header)
        op_param.txOption.headers = header_vector
        return op_param

    def register(self, **kwargs):
        """Take a free extension from the pool and attach the call variables to its account.

        Blocks until an extension is available.

        Returns:
            The extension (user part) taken from the pool.
        """
        pool = self.user_part_pool
        self.logger.info('register,come in, pool.size :%d', pool.qsize())
        user_part = pool.get()
        account = self.accounts.get(user_part)
        self.logger.info('register, user_part :%d, pool.size :%d', user_part, pool.qsize())
        if account:
            self.logger.info('register==========> %s', account.getId())
            # The account passes these variables to the next call it creates.
            account.kwargs = kwargs
        return user_part

    def unregister(self, user_part):
        """Call ``setRegistration(renew=False)`` on the extension's account and return it to the pool."""
        account = self.accounts.get(user_part)
        if account:
            account.setRegistration(renew=False)
        # The user hung up remotely: recycle the extension.
        self.release(user_part)

    def release(self, user_part):
        """Return ``user_part`` to the pool of free extensions unless it is already there."""
        if not user_part:
            self.logger.info("release, user_part is None")
            return

        pool = self.user_part_pool
        # Inspect the queue's contents under its own mutex.
        with pool.mutex:
            already_pooled = any(item == user_part for item in list(pool.queue))

        if already_pooled:
            self.logger.info("release, already exists, user_part :%d, pool.size :%d", user_part, pool.qsize())
            return
        pool.put(user_part)
        self.logger.info("release, user_part :%d, pool.size :%d", user_part, pool.qsize())

    def _start(self):
        """Start a new daemon worker thread running ``_create_pjsua2``."""
        self.is_stopping = False
        self.counter += 1
        worker = threading.Thread(
            name=f"PJSUA-THREAD-{self.counter}",
            target=self._create_pjsua2,
            daemon=True,
        )
        self.pjsua_thread = worker
        worker.start()
        self.logger.info("bot agent starting ...")

    def restart(self):
        """Stop the current worker thread, then start a new one."""
        self.destroy()
        self.logger.info('restart, 22222')
        self._start()

    def destroy(self):
        """Stop the pjsua2 worker thread.

        Sets ``is_stopping`` so the worker's event loop exits, empties the
        pool of free extensions and waits one second. If the worker is still
        alive after that, ``SystemExit`` is raised asynchronously in it.
        """
        self.is_stopping = True
        self.logger.info("destroy, come in 11111")
        # Drain the pool; queue.Empty is the only expected way out of the loop.
        try:
            while True:
                self.user_part_pool.get_nowait()
        except queue.Empty:
            pass
        self.logger.info("destroy, come in 22222")
        # Give the worker time to notice is_stopping and shut down by itself.
        time.sleep(1)
        self.logger.info("destroy, come in 33333")

        worker = self.pjsua_thread
        # The worker may not exist yet when __del__ runs after a failed __init__.
        if worker is None or not worker.is_alive():
            self.logger.info("destroy, pre thread already stopped")
            return
        ident = worker.ident
        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
        # The C API takes the thread id as an unsigned long.
        res = set_async_exc(ctypes.c_ulong(ident), ctypes.py_object(SystemExit))
        self.logger.info("destroy, ident=%s, thread_id=%s, res=%s", ident, set_async_exc, res)
        if res > 1:
            # More than one thread state was modified: undo the request, as the
            # CPython documentation requires.
            set_async_exc(ctypes.c_ulong(ident), None)
            self.logger.error("destroy, PyThreadState_SetAsyncExc affected %s thread states, reverted", res)


    def __del__(self):
        """Stop the worker thread when the agent is garbage-collected."""
        self.destroy()
        # Flag kept for the (currently commented-out) daemon loop.
        self.daemon_stopping = True


# if __name__ == '__main__':
#     import logging
#     logger = logging.getLogger('voip')
#     bot = BotAgent(logger)