#!/usr/bin/env python # -*- coding: utf-8 -*- # import sys # import json from typing import (Any, List, Text, Dict ) from scene import Kernel, Msg, Scene from config import (get_logger, FILE_PATH, START, END, HANGUP, TRANSFER, EMPTY, UNK, CONTINUE, MAX_TIMES_ANSWER, GENERATED, FIXED, # MOUDLES, FAQ, PURSUE) # from importlib import reload from botloader import Speech from util import (get_robot_speeches ,speech_main_contents ,get_next_code_with_track ,business_service ,intent_service ,nlg_service ) import re # # # from io import StringIO # from func import get_next_code_with_track logger = get_logger("taskbased") # 预加载话术 speeches = Speech.get_speech(FILE_PATH) class RobotService(Kernel): def __init__(self, **kwargs): self.code = kwargs.get("code", "start") self.times = kwargs.get("times", 1) self.answer = kwargs.get("answer", []) self.robot_speech_status = kwargs.get("robot_speech_status", None) self.robot_speech_interrupt = kwargs.get("robot_speech_interrupt", None) self.robot_speech = kwargs.get("robot_speech", '') # 机器人话术 self.status = kwargs.get("status", None) # 电话场景通话状态 def next(self, bid, uid, session_id, asr) -> Msg: # 进入下一节点 questions = speeches[bid] if self.code == END: # others bot 跳转结果页 return Msg(END) elif self.code == START: code = list(questions)[0] topic = questions[code] else: topic = questions[self.code] self.code = topic['id'] timeout = topic["waitTime"] inputType = topic['inputType'] if not topic['tools']: return Msg(code=self.code, question=self.robot_speech, interruptable=self.robot_speech_interrupt, wait_time=timeout, action=self.status, inputType= inputType ) else: tools = topic["tools"] option = business_service(session_id, uid, self.code, tools, asr) msg = Msg(code=self.code, option=option, session=session_id, asr=asr ) self.parse(msg, bid, uid, session_id) return self.next(bid, uid, session_id, msg.asr) def parse(self, msg, bid, uid, session_id) -> None: # 根据用户反馈解析数据 questions = speeches[bid] self.answer.append(dict(code=msg.code, question=self.robot_speech, option=msg.option, nodeName=questions[msg.code]['nodeName']) ) logger.info("userId: {}, code: {}, question:{}, user_asr:{}, answer:{}" \ .format(uid, msg.code, self.robot_speech, msg.option[-1]['asr'], msg.option[-1]['title'], ) ) # 当前节点话术状态选择 robot_speech, robot_speech_status, robot_speech_interrupt = get_robot_speeches(msg, bid, uid, questions) if msg.code == "101.00" and self.robot_speech and msg.option[-1]['title'] == "未听清": self.robot_speech = self.robot_speech else: self.robot_speech = robot_speech if robot_speech else self.robot_speech self.robot_speech_status = robot_speech_status self.robot_speech_interrupt = robot_speech_interrupt self.code = msg.code self.judge_over(questions) if msg.code != END: self.code = get_next_code_with_track(uid, msg.code, msg.option[-1]['title'], questions) if len(questions[self.code]['options']) <= 1 or not \ questions[self.code]['options'][0]['title']: if questions[self.code]['nodeName'] in ["转人工", "操作故障转人工"] or self.code in ["99.01", "99.00"]: self.status = TRANSFER else: self.status = HANGUP robot_speech, self.robot_speech_interrupt = speech_main_contents(uid, bid, self.code, questions, msg.option) else: self.status = CONTINUE if self.code != msg.code or msg.option[-1]['title'] == "FAQ": self.times = 1 else: self.times +=1 robot_speech, self.robot_speech_interrupt = speech_main_contents(uid, bid, self.code, questions, msg.option) if msg.code == "101.00" and self.robot_speech and msg.option[-1]['title'] == "未听清": self.robot_speech = self.robot_speech elif robot_speech: self.robot_speech = robot_speech @staticmethod def get_next_node(code: Text = None, option: Text = None, _questions: Dict[Any, Any] = None): """ @param code: node id @param option: 响应结果 @param _questions: 话术对象 @return: """ options = _questions[code]['options'] for cell in options: if option == cell['title']: return cell['next'] return code def judge_over(self, _questions: Dict[Any, Any] = None): # 判断结束(根据目前话术版本,以【节点跳转逻辑】为依据) options = _questions[self.code]['options'] if len(options) == 1 and not options[0]['title']: self.code = END class Dialog(Scene): def __init__(self, _case=None): if _case: self.case=_case else: self.case = RobotService() def dialogue(self, msg, bid, uid, session_id) -> Msg: # 驱动对话机器人跳转 # 会话开始 logger.info(f"节点:{msg.code}, 话术:{msg.question}, 用户asr:{msg.asr}, 用户意图:{msg.option}") if msg.code == "start": self.case.parse(msg, bid, uid, session_id) return self.case.next(bid, uid, session_id, msg.asr) # 触发下一条问答 self.case.parse(msg, bid, uid, session_id) return self.case.next(bid, uid, session_id, msg.asr) def get_current_code(self, bid=None): # 当前节点code if bid not in speeches: logger.info("({}) is not exist".format(bid)) questions = speeches[bid] if self.case.code == START: return list(questions)[0] return self.case.code def get_current_content(self, bid=None, uid=None): # 当前播报话术 from util import nlg_service questions = speeches[bid] cur_code = self.get_current_code(bid) topic = questions[cur_code] node_name = topic['nodeName'] main_question = topic['mainContent'] main_type = topic['mainType'] main_interrupt = topic['mainInterrupt'] timeout = topic["waitTime"] inputType = topic["inputType"] next_act = HANGUP if self.case.code == END else CONTINUE if main_type == GENERATED: # TODO 生成话术 self.case.robot_speech = nlg_service(uid, bid, node_name, 0) # self.case.robot_speech = main_question elif main_type == FIXED: self.case.robot_speech = main_question cur = self.case.robot_speech return Msg(code=cur_code, question=self.case.robot_speech, interruptable=main_interrupt, wait_time=timeout, action=next_act, inputType=inputType ) def mapping_semantics(self, code: Text = None, asr: Text = None, bid: Text = None, uid: Text = None, sessionid: Text = None): """ """ pattern = r'DTMF(.*?)DTMF' matches = re.findall(pattern, asr, re.DOTALL) if matches: asr = re.sub("[()]", "", matches[-1]) else: asr = asr.split("###")[-1] asr = asr.strip(r""""$%&'()*+,,-./:;<=>?@[\]^_`{|}~。??!""") # 有按键,提取按键内容 questions = speeches[bid] res = [] options = questions[code]['options'] node_name = questions[code]['nodeName'] max_times = questions[code]['maxTimes'] main_type = questions[code]['mainType'] if not self.case.robot_speech: if main_type == GENERATED: self.case.robot_speech = nlg_service(uid, bid, node_name, 0) elif main_type == FIXED: self.case.robot_speech = questions[code]['mainContent'] for cell in options: if asr in cell['commonReply']: tmp = dict(title=cell['title'], asr=asr, isFaq=False, faqContent='') res.append(tmp) if not res and code not in ["2.10", "3.10", "4.10", "2.40", "3.40", "4.40", "5.10", "5.40"]: intent_res = intent_service(node_name, asr, bid, code, uid, sessionid) logger.info(f"intent_res:{intent_res}") if intent_res: for cell in options: for intent in intent_res: if cell['title'] == intent['intent'] or (cell['title'] == 'FAQ' and intent['isFaq']): res.append(dict(title=cell['title'], asr=asr, isFaq=True if intent['isFaq'] else False, faqContent=intent['ans'] if intent['isFaq'] else '', firstclass = intent.get('class') if intent.get('class') else '', subclass = intent.get('subclass') if intent.get('subclass') else '', ) ) logger.info("语义模型映射结果:{}".format(res)) return res if not res: res.append(dict(title=UNK, asr=asr, isFaq=False, faqContent='')) else: return res if (self.case.times >= int(max_times) + 1) and int(max_times) != 0: res.append(dict(title=MAX_TIMES_ANSWER, asr=asr, isFaq=False, faqContent='')) return res