123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # import sys
- # import json
- from typing import (Any,
- List,
- Text,
- Dict
- )
- from scene import Kernel, Msg, Scene
- from config import (get_logger,
- FILE_PATH,
- START,
- END,
- HANGUP,
- TRANSFER,
- EMPTY,
- UNK,
- CONTINUE,
- MAX_TIMES_ANSWER,
- GENERATED,
- FIXED,
- # MOUDLES,
- FAQ,
- PURSUE)
- # from importlib import reload
- from botloader import Speech
- from util import (get_robot_speeches
- ,speech_main_contents
- ,get_next_code_with_track
- ,business_service
- ,intent_service
- ,nlg_service
- )
- import re
- #
- # # from io import StringIO
- # from func import get_next_code_with_track
logger = get_logger("taskbased")

# Preload the speech scripts once, when this module is imported.
speeches = Speech.get_speech(FILE_PATH)
class RobotService(Kernel):
    """Conversation state that walks the nodes of a preloaded speech script.

    Attributes:
        code: id of the current script node (``"start"`` by default).
        times: turn counter for the current node; ``parse`` resets it to 1
            when the node changes or the reply title is ``"FAQ"`` and
            increments it otherwise.
        answer: list of per-turn records appended by ``parse``.
        robot_speech_status: speech status returned by ``get_robot_speeches``.
        robot_speech_interrupt: interrupt flag sent back as ``interruptable``.
        robot_speech: text the robot is going to say.
        status: call status for the telephone scenario (``TRANSFER``,
            ``HANGUP`` or ``CONTINUE``), sent back as the ``Msg`` action.

    NOTE(review): this class was restored from a listing that had lost its
    indentation; the nesting below is the most plausible reading and should
    be confirmed against the upstream file.
    """

    def __init__(self, **kwargs):
        """Initialise the state from optional keyword overrides."""
        self.code = kwargs.get("code", "start")
        self.times = kwargs.get("times", 1)
        self.answer = kwargs.get("answer", [])
        self.robot_speech_status = kwargs.get("robot_speech_status", None)
        self.robot_speech_interrupt = kwargs.get("robot_speech_interrupt", None)
        self.robot_speech = kwargs.get("robot_speech", '')  # robot speech text
        self.status = kwargs.get("status", None)  # call status (telephone scenario)

    def next(self, bid, uid, session_id, asr) -> Msg:
        """Move to the next node and build the message to send back.

        @param bid: key of the speech script inside ``speeches``
        @param uid: user id, forwarded to ``business_service``
        @param session_id: session id, forwarded to ``business_service``
        @param asr: recognised user text
        @return: ``Msg(END)`` when the conversation is over, otherwise the
            message describing the node that was reached
        """
        questions = speeches[bid]
        if self.code == END:
            # others bot: jump to the result page
            return Msg(END)
        if self.code == START:
            # The first key of the script is the entry node.
            topic = questions[list(questions)[0]]
        else:
            topic = questions[self.code]

        self.code = topic['id']
        timeout = topic["waitTime"]
        input_type = topic['inputType']
        tools = topic['tools']
        if not tools:
            return Msg(code=self.code,
                       question=self.robot_speech,
                       interruptable=self.robot_speech_interrupt,
                       wait_time=timeout,
                       action=self.status,
                       inputType=input_type)

        # The node declares tools: the option comes from the business
        # service instead of the user, and the walk continues recursively.
        option = business_service(session_id, uid, self.code, tools, asr)
        msg = Msg(code=self.code,
                  option=option,
                  session=session_id,
                  asr=asr)
        self.parse(msg, bid, uid, session_id)
        return self.next(bid, uid, session_id, msg.asr)

    def _keeps_current_speech(self, msg):
        """Tell whether the current speech must be kept unchanged.

        Truthy when the node is "101.00", a speech is already set and the
        latest option title is "未听清". Evaluated on every call because
        ``self.robot_speech`` may change between the two checks in ``parse``.
        """
        return (msg.code == "101.00"
                and self.robot_speech
                and msg.option[-1]['title'] == "未听清")

    def parse(self, msg, bid, uid, session_id) -> None:
        """Record the user's reply and update node, status and speech.

        @param msg: message carrying ``code`` and ``option`` (a list whose
            last element provides ``asr`` and ``title``)
        @param bid: key of the speech script inside ``speeches``
        @param uid: user id, used for logging and forwarded to helpers
        @param session_id: session id (not used by this method)
        """
        questions = speeches[bid]
        self.answer.append(dict(code=msg.code,
                                question=self.robot_speech,
                                option=msg.option,
                                nodeName=questions[msg.code]['nodeName']))
        logger.info("userId: {}, code: {}, question:{}, user_asr:{}, answer:{}"
                    .format(uid,
                            msg.code,
                            self.robot_speech,
                            msg.option[-1]['asr'],
                            msg.option[-1]['title']))

        # Select the speech state for the current node.
        robot_speech, robot_speech_status, robot_speech_interrupt = \
            get_robot_speeches(msg, bid, uid, questions)
        if not self._keeps_current_speech(msg) and robot_speech:
            self.robot_speech = robot_speech
        self.robot_speech_status = robot_speech_status
        self.robot_speech_interrupt = robot_speech_interrupt
        self.code = msg.code

        # NOTE(review): judge_over may set self.code to END, but the check
        # below looks at msg.code, so that value is overwritten whenever
        # msg.code != END. Confirm this is intended.
        self.judge_over(questions)
        if msg.code == END:
            return

        self.code = get_next_code_with_track(uid, msg.code,
                                             msg.option[-1]['title'], questions)
        target = questions[self.code]
        target_options = target['options']
        if len(target_options) <= 1 or not target_options[0]['title']:
            # The next node has no real option left: the call ends there,
            # either by transferring to a human agent or by hanging up.
            if (target['nodeName'] in ["转人工", "操作故障转人工"]
                    or self.code in ["99.01", "99.00"]):
                self.status = TRANSFER
            else:
                self.status = HANGUP
        else:
            self.status = CONTINUE
            if self.code != msg.code or msg.option[-1]['title'] == "FAQ":
                self.times = 1
            else:
                self.times += 1

        # NOTE(review): the listing called speech_main_contents once in each
        # branch above; it is assumed to run for every status.
        robot_speech, self.robot_speech_interrupt = speech_main_contents(
            uid, bid, self.code, questions, msg.option)
        if not self._keeps_current_speech(msg) and robot_speech:
            self.robot_speech = robot_speech

    @staticmethod
    def get_next_node(code: Text = None,
                      option: Text = None,
                      _questions: Dict[Any, Any] = None):
        """Look up the node that follows ``code`` for a given reply.

        @param code: node id
        @param option: response result (matched against each option title)
        @param _questions: speech script object
        @return: ``next`` of the first option whose title equals ``option``,
            or ``code`` itself when no option matches
        """
        for cell in _questions[code]['options']:
            if cell['title'] == option:
                return cell['next']
        return code

    def judge_over(self, _questions: Dict[Any, Any] = None):
        """Mark the conversation as finished when the node is terminal.

        Per the current script version the decision follows the node jump
        logic: a node whose single option has an empty title ends the flow.

        @param _questions: speech script object
        """
        options = _questions[self.code]['options']
        if len(options) == 1 and not options[0]['title']:
            self.code = END
class Dialog(Scene):
    """Scene that drives a ``RobotService`` through one conversation.

    NOTE(review): this class was restored from a listing that had lost its
    indentation; ambiguous nesting is flagged where it affects behaviour.
    """

    # Nodes for which the intent service is never consulted.
    _NO_INTENT_CODES = ("2.10", "3.10", "4.10", "2.40",
                        "3.40", "4.40", "5.10", "5.40")

    def __init__(self, _case=None):
        """Wrap ``_case`` or, when it is falsy, a fresh ``RobotService``."""
        self.case = _case if _case else RobotService()

    def dialogue(self, msg, bid, uid, session_id) -> Msg:
        """Drive the bot one step: parse the reply, then move on.

        The listing handled ``msg.code == "start"`` in a separate branch
        that ran exactly the same two calls, so both cases share one path.

        @param msg: incoming message (``code``, ``question``, ``asr``, ``option``)
        @param bid: key of the speech script inside ``speeches``
        @param uid: user id
        @param session_id: session id
        @return: the message produced by ``RobotService.next``
        """
        logger.info(f"节点:{msg.code}, 话术:{msg.question}, 用户asr:{msg.asr}, 用户意图:{msg.option}")
        self.case.parse(msg, bid, uid, session_id)
        return self.case.next(bid, uid, session_id, msg.asr)

    def get_current_code(self, bid=None):
        """Return the id of the current node.

        An unknown ``bid`` is logged; the lookup that follows still raises
        ``KeyError``, exactly as before.

        @param bid: key of the speech script inside ``speeches``
        @return: the first key of the script while the state is at
            ``START``, otherwise the state's current code
        """
        if bid not in speeches:
            logger.info("({}) is not exist".format(bid))
        questions = speeches[bid]
        if self.case.code == START:
            return list(questions)[0]
        return self.case.code

    def get_current_content(self, bid=None, uid=None):
        """Build the message carrying the speech to play for the current node.

        @param bid: key of the speech script inside ``speeches``
        @param uid: user id, forwarded to ``nlg_service``
        @return: ``Msg`` with the node's speech, interrupt flag, wait time,
            next action and input type
        """
        questions = speeches[bid]
        cur_code = self.get_current_code(bid)
        topic = questions[cur_code]
        # Read every field up front so a missing key fails before any
        # service call is made.
        node_name = topic['nodeName']
        main_question = topic['mainContent']
        main_type = topic['mainType']
        main_interrupt = topic['mainInterrupt']
        timeout = topic["waitTime"]
        input_type = topic["inputType"]
        next_act = HANGUP if self.case.code == END else CONTINUE

        if main_type == GENERATED:
            # TODO: generated speech
            self.case.robot_speech = nlg_service(uid, bid, node_name, 0)
        elif main_type == FIXED:
            self.case.robot_speech = main_question
        # Any other main type leaves the stored speech untouched.

        return Msg(code=cur_code,
                   question=self.case.robot_speech,
                   interruptable=main_interrupt,
                   wait_time=timeout,
                   action=next_act,
                   inputType=input_type)

    def mapping_semantics(self,
                          code: Text = None,
                          asr: Text = None,
                          bid: Text = None,
                          uid: Text = None,
                          sessionid: Text = None):
        """Map the recognised user input onto the options of a node.

        @param code: node id
        @param asr: raw recognised text, possibly containing a
            ``DTMF...DTMF`` key-press segment or ``###`` separators
        @param bid: key of the speech script inside ``speeches``
        @param uid: user id
        @param sessionid: session id, forwarded to ``intent_service``
        @return: list of option dicts (``title``, ``asr``, ``isFaq``,
            ``faqContent`` and, for intent matches, ``firstclass`` /
            ``subclass``); never empty
        """
        # A key press is present: use the content of the last DTMF segment.
        matches = re.findall(r'DTMF(.*?)DTMF', asr, re.DOTALL)
        if matches:
            asr = re.sub("[()]", "", matches[-1])
        else:
            asr = asr.split("###")[-1]
            # NOTE(review): the punctuation strip is assumed to apply only
            # to spoken input; the listing does not show which branch owns it.
            asr = asr.strip(r""""$%&'()*+,,-./:;<=>?@[\]^_`{|}~。??!""")

        questions = speeches[bid]
        node = questions[code]
        options = node['options']
        node_name = node['nodeName']
        max_times = node['maxTimes']
        main_type = node['mainType']

        # Make sure a speech is recorded for this node before matching.
        if not self.case.robot_speech:
            if main_type == GENERATED:
                self.case.robot_speech = nlg_service(uid, bid, node_name, 0)
            elif main_type == FIXED:
                self.case.robot_speech = node['mainContent']

        # 1) Direct match against the configured common replies.
        res = [dict(title=cell['title'], asr=asr, isFaq=False, faqContent='')
               for cell in options
               if asr in cell['commonReply']]

        # 2) Fall back to the intent service, except for the excluded nodes.
        if not res and code not in self._NO_INTENT_CODES:
            intent_res = intent_service(node_name, asr, bid, code, uid, sessionid)
            logger.info(f"intent_res:{intent_res}")
            if intent_res:
                for cell in options:
                    for intent in intent_res:
                        if (cell['title'] == intent['intent']
                                or (cell['title'] == 'FAQ' and intent['isFaq'])):
                            res.append(dict(
                                title=cell['title'],
                                asr=asr,
                                isFaq=bool(intent['isFaq']),
                                faqContent=intent['ans'] if intent['isFaq'] else '',
                                firstclass=intent.get('class') or '',
                                subclass=intent.get('subclass') or '',
                            ))
                            # NOTE(review): assumed to return on the first
                            # match; the listing does not show the nesting
                            # of this log/return pair.
                            logger.info("语义模型映射结果:{}".format(res))
                            return res

        if res:
            return res

        # 3) Nothing matched: report the unknown option, followed by the
        #    max-times option once the node has been repeated too often.
        res.append(dict(title=UNK, asr=asr, isFaq=False, faqContent=''))
        limit = int(max_times)
        if self.case.times >= limit + 1 and limit != 0:
            res.append(dict(title=MAX_TIMES_ANSWER, asr=asr, isFaq=False, faqContent=''))
        return res
|