123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- @Time : 2024/10/15 16:19
- @File : utils.py
- @Desc :
- """
- from datetime import datetime
- from functools import wraps
- from typing import (Any,
- List,
- Text,
- Dict
- )
- from config import (
- get_logger,
- GENERATED,
- FIXED,
- MOUDLES
- )
- from threading import Thread
- import json
- logger = get_logger()
- from database import Mysql
def get_speech_status(bid: Text = None, options: List[Dict[Text, Text]] = None):
    """Pick the speech template that matches the caller's latest selection.

    Looks up the module configuration for ``bid`` in ``MOUDLES`` and scans
    its dict-valued entries for one whose ``content`` contains the ``title``
    of the last element of ``options``.

    @param bid: key into ``MOUDLES``; an unknown key raises ``KeyError``.
    @param options: selections made so far; only the last one is consulted.
    @return: an empty dict when there is no selection or nothing matches,
        otherwise a dict with ``action``, ``speech_id``, ``speech_type``,
        ``speech_interrupt`` and ``asr``.
    """
    res = dict()
    module = MOUDLES[bid]
    if not options:
        # Without a selection there is nothing to match. Scanning anyway
        # with an empty title would match any string ``content`` and then
        # fail when reading ``options[-1]``.
        return res

    chosen = options[-1]  # the outbound-call robot is single-choice
    option = chosen['title']
    for key, value in module.items():
        if isinstance(value, dict) and option in value['content']:
            res['action'] = key
            res['speech_id'] = value['speech_id']
            res['speech_type'] = value['speech_type']
            res['speech_interrupt'] = value['speech_interrupt']
            res['asr'] = chosen['asr']
            break
    return res
def get_robot_speeches(msg, bid, uid, questions: Dict[Any, Any] = None):
    """Return the main speech for the current node together with its status.

    @param msg: request object; its ``option`` and ``code`` attributes are read.
    @param bid: passed through to ``get_speech_status`` / ``speech_main_contents``.
    @param uid: passed through to ``speech_main_contents``.
    @param questions: node definitions keyed by node code.
    @return: ``(robot_speech, choose_speech, interrupt)`` when no dedicated
        speech status matches the selection, otherwise ``(None, None, None)``.
    """
    from entity import Status

    if get_speech_status(bid, msg.option):
        # A dedicated template matched the selection; nothing is produced here.
        return None, None, None

    base_status = Status.base.value
    speech_text, _ = speech_main_contents(uid, bid, msg.code, questions)
    # The interrupt flag is taken directly from the node definition.
    interruptible = questions[msg.code]['mainInterrupt']
    return speech_text, base_status, interruptible
def speech_main_contents(uid: Text = None,
                         bid: Text = None,
                         code: Text = None,
                         questions: Dict[Any, Any] = None,
                         options: List[Dict[Any, Any]] = None):
    """Build the robot's main speech for node ``code``.

    The node's ``mainType`` decides where the main text comes from:
    ``GENERATED`` asks ``nlg_service`` for it, ``FIXED`` uses the node's
    ``mainContent``. Any other type yields an empty speech. If the last
    element of ``options`` carries FAQ or business content, that text is
    placed in front of the main text, separated by ``&``.

    @param uid: forwarded to ``nlg_service``.
    @param bid: forwarded to ``nlg_service``.
    @param code: key of the current node in ``questions``.
    @param questions: node definitions keyed by node code.
    @param options: selections made so far; only the last one is consulted.
    @return: ``(robot_speech, interrupt)`` where ``interrupt`` is the node's
        ``mainInterrupt`` value.
    """
    from util import nlg_service
    from entity import Status

    def parse_options():
        """Return the prefix text carried by the latest option, if any."""
        if not options:
            return None
        latest = options[-1]
        is_faq = latest['isFaq']
        faq_content = latest['faqContent']
        if is_faq:
            return faq_content
        return latest.get('businessContent')

    def merge(prefix, main):
        """Join prefix and main text with '&', skipping whichever is empty."""
        if prefix and main:
            return "{}&{}".format(prefix, main)
        # Only one side (or neither) has text: never emit "prefix&None".
        return prefix or main or ''

    node = questions[code]
    speech_type = node['mainType']
    node_name = node['nodeName']
    interrupt = node['mainInterrupt']
    choose_speech = Status.base.value
    _faq_content = parse_options()

    _robot_speech = ''
    if speech_type == GENERATED:
        # TODO: generated speech
        resp = nlg_service(uid, bid, node_name, choose_speech)
        _robot_speech = merge(_faq_content, resp)
    elif speech_type == FIXED:
        _robot_speech = merge(_faq_content, node['mainContent'])
    return _robot_speech, interrupt
def get_next_code_with_track(uid: Text = None,
                             code: Text = None,
                             option: Text = None,
                             questions: Dict[Any, Any] = None
                             ):
    """
    Resolve the node code that follows ``code`` for the chosen ``option``.

    @param uid: user id, only used in the log line.
    @param code: code of the current node in ``questions``.
    @param option: title of the option the user selected.
    @param questions: node definitions keyed by node code; each node's
        ``options`` entries carry ``title`` and ``next``.
    @return: the ``next`` value of the first option whose title equals
        ``option``; ``code`` itself when no option matches.
    """
    candidates = questions[code]['options']
    # Lazy search: stops at the first matching title, like the plain loop.
    matched = next((cell for cell in candidates if option == cell['title']), None)
    if matched is None:
        return code

    target = matched['next']
    logger.info(f"uid:{uid}:code from {code} to {target}")
    return target
- def _async(f):
- def wrapper(*args, **kwargs):
- thr = Thread(target=f, args=args, kwargs=kwargs)
- thr.start()
- return wrapper
# Keypad input -> intent label recorded for node "1.20".
_MENU_INTENT_TAGS = {
    "1": "1_停水咨询",
    "2": "1_漏水保修",
    "3": "1_户号查询",
    "4": "1_水费查询",
    "5": "1_水价咨询",
    "6": "1_水质水价保修",
    "7": "1_投诉建议",
    "0": "1_转人工",
}
# Node codes whose answers determine the recorded intent.
_INTENT_NODE_CODES = ("1.20", "1.10", "1.00")
# Node codes left out of the stored question/answer list.
_SKIPPED_NODE_CODES = ("2.00", "3.00", "4.00")


def _last_option(answer):
    """Return the last option dict of ``answer``, or ``{}`` when it has none."""
    options = answer.get("option") or [{}]
    return options[-1]


def _resolve_intent(answers):
    """Derive the intent label from the answers; later answers override earlier ones."""
    intent = ''
    for answer in answers:
        code = answer.get("code")
        if code not in _INTENT_NODE_CODES:
            continue
        last = _last_option(answer)
        asr = last.get("asr", '')
        if (code == "1.10" and asr != "1") or (code == "1.00" and asr not in ["2", "1"]):
            # Free-form answer: prefer the most specific classification present.
            if answer.get("option"):
                intent = last.get("subclass") or last.get("firstclass") or last.get("title", '')
        elif code == "1.20":
            intent = _MENU_INTENT_TAGS.get(asr, "1_其他")
        elif code == "1.00" and asr == "2":
            intent = "不体验AI服务"
        elif code == "1.00" and asr == "1":
            intent = asr
    return intent


@_async
def insert_log(bid, uid, session_id, scene):
    """
    Store one finished conversation as a row of ``botrecords``.

    Table layout:

    CREATE TABLE botrecords (
    id INT AUTO_INCREMENT PRIMARY KEY,
    session VARCHAR(50) unique not null COMMENT 'request id',
    req_time DATETIME COMMENT 'incoming call time',
    uid VARCHAR(20) COMMENT 'caller phone number',
    bid VARCHAR(20) COMMENT 'speech script id',
    intent VARCHAR(20) COMMENT 'intent',
    contents TEXT comment 'contents',
    dialog TEXT COMMENT 'dialog'
    );

    @param bid: value stored in the ``bid`` column.
    @param uid: value stored in the ``uid`` column.
    @param session_id: value stored in the ``session`` column.
    @param scene: object whose ``case`` attribute holds the conversation;
        it is serialised via each nested object's ``__dict__``.

    Nothing is written when the serialised case is empty. The database
    handle is closed even if the insert fails.
    """
    # Round-trip through JSON to turn nested objects into plain dicts/lists.
    tmp = json.dumps(scene.case, ensure_ascii=False, default=lambda obj: obj.__dict__)
    res = json.loads(tmp)
    if not res:
        return

    answers = res.get("answer") or []
    intent = _resolve_intent(answers)

    # [question, recognised answer] pairs, followed by the robot's last speech.
    content = [
        [answer.get("question"), _last_option(answer).get("asr", '')]
        for answer in answers
        if answer.get("code") not in _SKIPPED_NODE_CODES
    ]
    content.append((res.get("robot_speech"), ''))
    contents = json.dumps({"data": content}, ensure_ascii=False)
    req_time = datetime.now().strftime("%Y/%m/%d %H:%M:%S")

    mysql = Mysql()
    try:
        mysql.insert_records(
            [session_id, req_time, uid, bid, intent, contents, json.dumps(res, ensure_ascii=False)]
        )
    finally:
        mysql.close_mysql()
def timetic(func):
    """Decorator: log how many seconds each call of ``func`` takes.

    For the function whose qualified name is ``botservice`` the log line
    also carries ``sessionId``, ``userId`` and ``nodeId`` read from the
    ``reqbot`` keyword argument. When ``reqbot`` was not passed by keyword
    the generic log line is used instead, so the result is still returned.

    @param func: the callable to time.
    @return: a wrapper that returns whatever ``func`` returns.
    """
    from time import perf_counter  # monotonic; unaffected by wall-clock changes

    name = func.__qualname__

    @wraps(func)
    def wrapper(*args, **kwargs):
        start = perf_counter()
        results = func(*args, **kwargs)
        cost = perf_counter() - start
        bot = kwargs.get("reqbot") if name == "botservice" else None
        if bot is not None:
            logger.info("{},session:{},uid:{},nodeid:{} ==> {}s".format(
                name, bot.sessionId, bot.userId, bot.nodeId, cost))
        else:
            logger.info("{} ==> {}s".format(name, cost))
        return results
    return wrapper
|