"""Data models and a local test agent for the outbound-call text bot.

The module defines the request/response payload classes exchanged with the
text-bot service and ``ToTextBotAgent``, a small harness that feeds a
simulated service response through the same handling path.
"""

import json
import queue
import threading
import traceback
from typing import Any, Dict, Optional
from urllib.parse import urlparse

try:
    import mmh3
except ImportError:  # Only murmur3_32 needs it; the data models do not.
    mmh3 = None


def murmur3_32(player_file):
    """Return the absolute value of the 32-bit MurmurHash3 of *player_file*.

    Args:
        player_file: A string, or a list of strings which is joined with
            ``","`` before hashing.

    Returns:
        A non-negative integer.

    Raises:
        ImportError: If the ``mmh3`` package is not installed.
    """
    if mmh3 is None:
        raise ImportError("murmur3_32 requires the 'mmh3' package")
    if isinstance(player_file, list):
        player_file = ','.join(player_file)
    return abs(mmh3.hash(player_file))


# Outbound robot call
class BotChatRequest:
    """Request payload sent to the text bot for one call event."""

    def __init__(self, node_id=None, user_id=None, session_id=None,
                 record_id=None, task_id=None, asr_text=None, key_input=None,
                 ext: Optional[Dict] = None):
        self.node_id = node_id  # node id
        self.user_id = user_id  # user id
        self.session_id = session_id  # session id
        self.record_id = record_id  # unique identifier
        self.task_id = task_id  # robot task id
        # Event type:
        #   1: call connected
        #   2: user speech ASR result uploaded
        #   3: user key input
        #   4: user hung up
        #   5: read outbound-call result (may be called during the call,
        #      not necessarily after it ends)
        #   6: user silence timeout
        #   7: TTS or recording playback finished or was interrupted
        #   8: hang-up caused by an ASR error
        #   9: hang-up caused by a TTS error
        #   10: hang-up caused by another system error
        self.event_type = None
        self.asr_text = asr_text  # text recognised by ASR
        # Previously accepted but silently dropped.
        self.key_input = key_input  # key pressed by the user
        # A fresh dict per instance; a shared ``{}`` default would leak
        # state between requests.
        self.ext = {} if ext is None else ext

    def to_json_string(self):
        """Serialize every instance attribute to a JSON string."""
        return json.dumps(self.__dict__, ensure_ascii=False)

    @classmethod
    def from_json(cls, json_string):
        """Build a request from a JSON string produced by ``to_json_string``.

        ``event_type`` is not a constructor argument, so it is split off and
        assigned afterwards; otherwise a serialized request could not be
        loaded back. Unknown keys still raise ``TypeError``.
        """
        data = json.loads(json_string)
        event_type = data.pop("event_type", None)
        request = cls(**data)
        request.event_type = event_type
        return request


class ChatAction:
    """Action the bot asks the call flow to take."""

    def __init__(self, action_code=None, action_content=None):
        # normal: continue the call; hang: hang up; transfer: transfer to a human
        self.action_code = action_code
        self.action_content = action_content  # action content

    def to_json_string(self):
        """Serialize the action to a JSON string."""
        return json.dumps(self.__dict__, ensure_ascii=False)

    @classmethod
    def from_json(cls, json_data):
        """Build an action from an already-parsed JSON mapping."""
        return cls(
            action_code=json_data.get("action_code"),
            action_content=json_data.get("action_content"),
        )


class ChatContent:
    """One piece of content to play back to the user."""

    def __init__(self, content_type=None, content=None, voice_url=None,
                 voice_content=None):
        self.content_type = content_type  # playback type
        self.content = content  # playback content
        self.voice_url = voice_url  # voice file address
        self.voice_content = voice_content  # voice text

    def to_json_string(self):
        """Serialize the content to a JSON string."""
        return json.dumps(self.__dict__, ensure_ascii=False)

    @classmethod
    def from_json(cls, json_data):
        """Build a content item from an already-parsed JSON mapping."""
        return cls(
            content_type=json_data.get("content_type"),
            content=json_data.get("content"),
            voice_url=json_data.get("voice_url"),
            voice_content=json_data.get("voice_content"),
        )


class ChatMessage:
    """Bot reply for one dialogue node: contents to play plus an action."""

    def __init__(self, node_id=None, contents=None, interruptable=None,
                 wait_time=None, action=None, talk_time_out=None):
        self.node_id = node_id  # node id
        self.contents = contents if contents is not None else []  # content list
        self.interruptable = interruptable  # whether playback can be interrupted
        self.wait_time = wait_time  # user silence duration
        self.action = action  # action (ChatAction or None)
        self.talk_time_out = talk_time_out  # user talk duration

    def to_json_string(self):
        """Serialize the message, including nested contents and action."""
        return json.dumps({
            "node_id": self.node_id,
            "contents": [content.__dict__ for content in self.contents],
            "interruptable": self.interruptable,
            "wait_time": self.wait_time,
            "action": self.action.__dict__ if self.action else None,
            "talk_time_out": self.talk_time_out,
        }, ensure_ascii=False)

    @classmethod
    def from_json(cls, json_data):
        """Build a message from an already-parsed JSON mapping.

        A missing or ``null`` ``contents`` yields an empty list and a
        missing, ``null`` or empty ``action`` yields ``None``.
        """
        raw_contents = json_data.get("contents") or []
        raw_action = json_data.get("action")
        return cls(
            node_id=json_data.get("node_id"),
            contents=[ChatContent.from_json(item) for item in raw_contents],
            interruptable=json_data.get("interruptable"),
            wait_time=json_data.get("wait_time"),
            action=ChatAction.from_json(raw_action) if raw_action else None,
            talk_time_out=json_data.get("talk_time_out"),
        )


class ChatResponse:
    """Envelope returned by the text bot: ``data``, ``message`` and ``code``."""

    def __init__(self, data=None, message=None, code=None):
        self.data = data if data is not None else ChatMessage()
        self.message = message
        self.code = code

    def to_json_string(self):
        """Serialize the envelope with ``data`` embedded as a JSON object."""
        return json.dumps({
            "data": json.loads(self.data.to_json_string()),
            "message": self.message,
            "code": self.code,
        }, ensure_ascii=False)

    @classmethod
    def from_json(cls, json_string):
        """Build a response from a JSON string.

        A missing or ``null`` ``data`` field yields an empty ``ChatMessage``.
        """
        data = json.loads(json_string)
        return cls(
            data=ChatMessage.from_json(data.get("data") or {}),
            message=data.get("message"),
            code=data.get("code"),
        )


class ToTextBotAgent:
    """Test harness that pushes a simulated bot response through the call flow."""

    def __init__(self, event_type, user_asr_text):
        """Create the agent and immediately process one simulated response.

        Args:
            event_type: Event code as a string; ``'1'`` and ``'2'`` select
                the canned voice content used by ``test_request``.
            user_asr_text: ASR text of the user. When empty, nothing is
                sent and the agent is left idle.
        """
        # Initialise all state first so the instance is usable even when
        # the ASR text is empty and we bail out below.
        self.request_data = None
        self.cur_player_file = None
        self.wait_time = None
        self.message_queue = queue.Queue(maxsize=3)

        if not user_asr_text:
            print("ASR文本为空,终止执行。")
            return

        self.request_data = BotChatRequest(
            node_id="1",
            user_id="139311",
            session_id="1",
            record_id="2",
            task_id="ceshi",
            asr_text=user_asr_text,
        )
        # Send the request and handle the response.
        self.test_request(self.request_data, event_type)

    def to_hang(self, action_content):
        """Placeholder for hanging up the call."""
        print('todo ')

    def transfer_people(self):
        """Placeholder for transferring the call to a human agent."""
        print('todo 转人工')

    def handling_release(self, message: ChatMessage):
        """Dispatch on the action code of *message*.

        ``hang`` and ``transfer`` call their placeholders; ``normal``
        enqueues the message for playback. Other codes, or a message
        without an action, are ignored.
        """
        print('handling_release', message)
        if not message:
            print("文本机器人接口调用失败")
            return

        action = message.action
        # A response may carry no action at all; treat that as "no code".
        action_code = action.action_code if action is not None else None
        print(f"ActionCode: {action_code}", message)
        if action_code == 'hang':  # hang up
            self.to_hang(action.action_content)
        elif action_code == 'transfer':  # transfer to a human agent
            self.transfer_people()
        elif action_code == 'normal':  # normal call
            # NOTE: Queue.put blocks while the queue is full (maxsize=3).
            self.message_queue.put(message)
            print(f"成功获取响应: ActionCode: {message}")

    # Simulates the response of the bot API.
    def test_request(self, params: BotChatRequest, event_type):
        """Build a canned response for *event_type* and handle it.

        Errors raised while parsing or handling are printed together with
        the traceback and not propagated. Always returns ``None``.
        """
        print("test_request::params=", params)
        response_data = {
            "node_id": "1.0",
            "contents": [],
            "interruptable": False,
            "wait_time": "6",
            "action": {
                "action_code": "normal",
                "action_content": "正常通话"
            },
            "talk_time_out": ""
        }
        print("event_type:", event_type)
        if event_type == '1':
            response_data['contents'].append({
                "content_type": "voice",
                "content": "",
                "voice_url": '/code/src/core/voip/test111.wav',
                "voice_content": "五一北京到上海的高铁票还有吗?"
            })
        elif event_type == '2':
            response_data['contents'].append({
                "content_type": "voice",
                "content": "",
                "voice_url": '/code/src/core/voip/test222.wav',
                "voice_content": "测试第二个录音文件"
            })

        try:
            parsed_response = ChatMessage.from_json(response_data)
            print(f"Response in test_request: {parsed_response}")
            self.handling_release(parsed_response)
        except Exception as e:
            print(f"Error in test_request: {e}")
            traceback.print_exc()  # print the full error information
        return None

    def get_player_file(self):
        """Pop the next queued message without blocking.

        Returns:
            ``(voice_urls, wait_time)`` where ``voice_urls`` lists the
            ``voice_url`` of every ``voice`` content item, or
            ``(None, None)`` when the queue is empty so callers can always
            unpack the result.
        """
        try:
            message = self.message_queue.get(block=False)
        except queue.Empty:
            return None, None
        player_file = [
            item.voice_url
            for item in message.contents
            if item.content_type == 'voice'
        ]
        return player_file, message.wait_time

    def send_bot_speaker(self, player_file):
        """Print the hash id of *player_file*; do nothing when it is empty."""
        if not player_file:
            return
        print(type(player_file))
        player_id = murmur3_32(player_file)
        print('send_bot_speaker:', player_id)

    def ceshi(self):
        """Start playback of the next queued message if nothing is current."""
        message_queue_size = self.message_queue.qsize()
        if message_queue_size > 0 and not self.cur_player_file:
            self.cur_player_file, self.wait_time = self.get_player_file()
            print('cur_player_file:', self.cur_player_file , 'wait_time=', self.wait_time)
            self.send_bot_speaker(self.cur_player_file)


if __name__ == "__main__":
    # Example usage:
    #   bot = ToTextBotAgent("1", 'sddsd')
    #   bot.ceshi()
    wait_time = None
    if wait_time and wait_time != "0":
        print('wait_time:', wait_time)