#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2024/10/15 16:19
@File : utils.py
@Desc : Dialogue-bot helpers: speech selection, call logging, timing and
        community-name normalisation.
"""
import sys

# Extends the import path with the parent directory; presumably needed so the
# project-local imports below (config, database, ...) resolve.
sys.path.append("..")
from datetime import datetime
from functools import wraps
from typing import (Any, List, Text, Dict
                    )
from config import (
    get_logger, GENERATED, FIXED, MOUDLES
)
import pandas as pd
from threading import Thread
import json

logger = get_logger()
from database import Mysql
from pypinyin import pinyin, Style
import jieba
import re
import itertools
from concurrent.futures import ThreadPoolExecutor, as_completed

# Kept as a module-level attribute for backward compatibility; nothing in this
# module submits work to it any more (see ``norm_community``).
executor = ThreadPoolExecutor(max_workers=20)

# Answer codes whose last option contributes to the logged intent.
_INTENT_CODES = ("1.20", "1.10", "1.00")
# Answer codes that are left out of the logged question/answer transcript.
_SKIPPED_CONTENT_CODES = ("2.00", "3.00", "4.00")
# ``asr`` value received at node 1.20 -> intent label stored with the record.
_MENU_INTENTS = {"1": "1_停水咨询", "2": "1_漏水保修", "3": "1_户号查询", "4": "1_水费查询",
                 "5": "1_水价咨询", "6": "1_水质水价保修", "7": "1_投诉建议", "0": "1_转人工"}
_DEFAULT_MENU_INTENT = "1_其他"
# Sentinel distinguishing "no dictionary entry" from a stored falsy value.
_NO_MATCH = object()


def get_speech_status(bid: Text = None, options: List[Dict[Text, Text]] = None):
    """Pick the speech template that matches the user's latest option.

    Scans the module configured for ``bid`` in ``MOUDLES`` and returns the
    first dict-valued entry whose ``content`` contains the title of the last
    element of ``options``.

    @param bid: key into ``MOUDLES``; a failed lookup propagates its error.
    @param options: options chosen so far; only the last one is inspected.
    @return: dict with ``action``, ``speech_id``, ``speech_type``,
        ``speech_interrupt`` and ``asr``; empty when nothing matches or when
        no option was supplied.
    """
    module = MOUDLES[bid]
    if not options:
        # No option means no ``asr`` to report. Previously a match on the
        # empty title crashed on ``options[-1]``; report "no template" instead.
        return {}
    latest = options[-1]
    title = latest['title']
    # Outbound-call robot: single choice, so the first match wins.
    for key, value in module.items():
        if isinstance(value, dict) and title in value['content']:
            return {
                'action': key,
                'speech_id': value['speech_id'],
                'speech_type': value['speech_type'],
                'speech_interrupt': value['speech_interrupt'],
                'asr': latest['asr'],
            }
    return {}


def get_robot_speeches(msg, bid, uid, questions: Dict[Any, Any] = None):
    """Return the robot speech, its status and the interrupt flag for ``msg``.

    @param msg: request object; ``msg.option`` and ``msg.code`` are read.
    @param bid: key into ``MOUDLES`` (see ``get_speech_status``).
    @param uid: user id forwarded to ``speech_main_contents``.
    @param questions: mapping from node code to node configuration.
    @return: ``(speech, Status.base.value, interrupt)`` when no special
        template matches the latest option, otherwise ``(None, None, None)``.
    """
    from entity import Status

    if get_speech_status(bid, msg.option):
        return None, None, None
    # ``speech_main_contents`` already returns questions[code]['mainInterrupt'],
    # so it is not looked up a second time here.
    robot_speech, interrupt = speech_main_contents(uid, bid, msg.code, questions)
    return robot_speech, Status.base.value, interrupt


def _parse_faq_content(options):
    """Return the FAQ (or business) text carried by the last option, if any."""
    if not options:
        return None
    option = options[-1]
    is_faq = option['isFaq']
    faq_content = option['faqContent']
    if is_faq:
        return faq_content
    if "businessContent" in option:
        return option['businessContent']
    return None


def _join_speech(faq_content, body):
    """Combine the optional FAQ prefix and the main body into one speech."""
    if faq_content:
        # A missing body used to be rendered as the literal text "None";
        # render it as empty while keeping the "&" separator.
        return "{}&{}".format(faq_content, body or '')
    return body or ''


def speech_main_contents(uid: Text = None, bid: Text = None, code: Text = None, questions: Dict[Any, Any] = None,
                         options: List[Dict[Any, Any]] = None):
    """Build the main speech for node ``code``.

    @param uid: user id, forwarded to the NLG service.
    @param bid: business id, forwarded to the NLG service.
    @param code: node code; ``questions[code]`` must provide ``mainType``,
        ``nodeName`` and ``mainInterrupt`` (and ``mainContent`` for fixed
        speeches).
    @param questions: mapping from node code to node configuration.
    @param options: options chosen so far; the last one may carry FAQ or
        business text that is prefixed to the speech with ``&``.
    @return: ``(speech, interrupt)``; ``speech`` is ``''`` when the node type
        is neither ``GENERATED`` nor ``FIXED`` or when there is nothing to say.
    """
    from util import nlg_service
    from entity import Status

    node = questions[code]
    speech_type = node['mainType']
    node_name = node['nodeName']
    interrupt = node['mainInterrupt']
    faq_content = _parse_faq_content(options)

    if speech_type == GENERATED:
        # TODO: generated speech
        body = nlg_service(uid, bid, node_name, Status.base.value)
    elif speech_type == FIXED:
        body = node['mainContent']
    else:
        return '', interrupt
    return _join_speech(faq_content, body), interrupt


def get_next_code_with_track(uid: Text = None,
                             code: Text = None,
                             option: Text = None,
                             questions: Dict[Any, Any] = None
                             ):
    """
    Resolve the next node code from the option the user picked.

    @param uid: user id, used only for logging.
    @param code: current node code.
    @param option: title of the chosen option.
    @param questions: mapping from node code to node configuration; the
        ``options`` list of the current node is searched.
    @return: the ``next`` code of the first option whose title equals
        ``option``, or ``code`` unchanged when none matches.
    """
    for cell in questions[code]['options']:
        if option == cell['title']:
            next_code = cell['next']
            logger.info(f"uid:{uid}:code from {code} to {next_code}")
            return next_code
    return code


def _async(f):
    """Decorator: run ``f`` in a new thread on each call; the call returns None."""

    @wraps(f)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        thr = Thread(target=f, args=args, kwargs=kwargs)
        thr.start()

    return wrapper


def _last_option(answer):
    """Return ``(options, last_option)`` for an answer; ``last_option`` is {} when absent."""
    options = answer.get("option") or []
    return options, (options[-1] if options else {})


def _resolve_intent(answers):
    """Derive the logged intent from the answers; later answers override earlier ones."""
    intent = ''
    for answer in answers:
        code = answer.get("code")
        if code not in _INTENT_CODES:
            continue
        options, last = _last_option(answer)
        asr = last.get("asr", '')
        if (code == "1.10" and asr != "1") or (code == "1.00" and asr not in ("2", "1")):
            if options:
                # Most specific label available: subclass, then firstclass, then title.
                intent = last.get("subclass") or last.get("firstclass") or last.get("title", '')
        elif code == "1.20":
            intent = _MENU_INTENTS.get(asr, _DEFAULT_MENU_INTENT)
        elif code == "1.00" and asr == "2":
            intent = "不体验AI服务"
        elif code == "1.00" and asr == "1":
            intent = asr
    return intent


def _build_transcript(answers, robot_speech):
    """Return [question, asr] rows for the logged answers plus the final robot speech."""
    rows = [
        [answer.get("question"), _last_option(answer)[1].get("asr", '')]
        for answer in answers
        if answer.get("code") not in _SKIPPED_CONTENT_CODES
    ]
    rows.append((robot_speech, ''))
    return rows


@_async
def insert_log(bid, uid, session_id, scene):
    """
    Persist one dialogue record in the background (runs in its own thread).

    ``scene.case`` is serialised to plain JSON data; the intent and a
    question/answer transcript are derived from its ``answer`` list and the
    row is written through ``Mysql.insert_records``. Nothing is written when
    the serialised case is empty.

    Target table:

    CREATE TABLE botrecords (
        id INT AUTO_INCREMENT PRIMARY KEY,
        session VARCHAR(50) unique not null COMMENT 'request id',
        req_time DATETIME COMMENT 'incoming call time',
        uid VARCHAR(20)  COMMENT 'caller phone number',
        bid VARCHAR(20) COMMENT 'speech script id',
        intent VARCHAR(20) COMMENT 'intent',
        contents TEXT comment 'contents',
        dialog TEXT COMMENT 'dialog'
    );

    @param bid: speech script id.
    @param uid: caller id.
    @param session_id: request/session id.
    @param scene: object whose ``case`` attribute holds the dialogue state.
    """
    res = json.loads(json.dumps(scene.case, ensure_ascii=False, default=lambda obj: obj.__dict__))
    if not res:
        return
    # Tolerate a missing/None answer list instead of failing inside the thread.
    answers = res.get("answer") or []
    intent = _resolve_intent(answers)
    contents = json.dumps({"data": _build_transcript(answers, res.get("robot_speech"))}, ensure_ascii=False)
    req_time = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
    mysql = Mysql()
    try:
        mysql.insert_records([session_id, req_time, uid, bid, intent, contents,
                              json.dumps(res, ensure_ascii=False)])
    finally:
        # Release the connection even when the insert fails.
        mysql.close_mysql()


def timetic(func):
    """Decorator: log how long each call to ``func`` takes, in seconds.

    For a function whose qualified name is ``botservice`` and which received
    a ``reqbot`` keyword argument, the session, user and node ids are logged
    too; every other call gets the generic log line.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        start = datetime.now()
        results = func(*args, **kwargs)
        cost = (datetime.now() - start).total_seconds()
        name = func.__qualname__
        # ``reqbot`` may have been passed positionally; in that case fall back
        # to the generic line rather than failing after the call succeeded.
        bot = kwargs.get("reqbot") if name == "botservice" else None
        if bot is not None:
            logger.info("{},session:{},uid:{},nodeid:{} ==> {}s".format(
                name, bot.sessionId, bot.userId, bot.nodeId, cost))
        else:
            logger.info("{} ==> {}s".format(name, cost))
        return results

    return wrapper


def loaddict():
    """Load the location dictionaries from ``../data/location.xlsx``.

    @return: dict with three mappings:
        ``zh``     - ``norm_name``   -> ``name``
        ``pinyin`` - ``name_pinyin`` -> ``name``
        ``total``  - word -> weight (100 for ``norm_name`` entries, 80 for
                     ``short_name`` entries; a short name overrides an equal
                     norm name because it is inserted last).
    """
    df = pd.read_excel("../data/location.xlsx", header=0)
    weighted = [(name, 100) for name in df['norm_name'].dropna().tolist()]
    weighted.extend((name, 80) for name in df['short_name'].dropna().tolist())
    return {
        'zh': dict(df[['norm_name', 'name']].values),
        'pinyin': dict(df[['name_pinyin', 'name']].values),
        'total': dict(weighted),
    }


user_dict = loaddict()
# NOTE(review): jieba documents load_userdict as taking a file path or
# file-like object. Given a dict it appears to iterate only the keys, so the
# 80/100 weights above are presumably ignored - confirm, and consider
# jieba.add_word(word, freq) per entry if the weights are meant to apply.
jieba.load_userdict(user_dict['total'])


def _to_pinyin_key(text):
    """Return the '|'-joined toneless pinyin syllables of ``text``."""
    return "|".join([syllable[0] for syllable in pinyin(text, style=Style.NORMAL)])


def _lookup_location(text):
    """Look ``text`` up by exact name, then by pinyin; ``_NO_MATCH`` if absent."""
    by_name = user_dict['zh']
    if text in by_name:
        return by_name[text]
    by_pinyin = user_dict['pinyin']
    key = _to_pinyin_key(text)
    if key in by_pinyin:
        return by_pinyin[key]
    return _NO_MATCH


def norm_community(asr):
    """Normalise a community name mentioned in an ASR transcript.

    Tries, in order: the whole text by name; the whole text (parentheses
    removed) by pinyin; each jieba token by name then pinyin; and finally
    every ordered combination of two or more tokens whose concatenation
    occurs in ``asr``.

    The combination search runs sequentially and stops at the first hit, so
    the result is deterministic (the first match in combination order).
    NOTE: the number of combinations grows exponentially with the token
    count, so very long transcripts are expensive.

    @param asr: recognised text.
    @return: the matched dictionary value, or ``asr`` unchanged when nothing
        matches.
    """
    if asr in user_dict['zh']:
        return user_dict['zh'][asr]

    stripped = re.sub(r'[(())]', '', asr)
    whole_key = _to_pinyin_key(stripped)
    if whole_key in user_dict['pinyin']:
        return user_dict['pinyin'][whole_key]

    words = jieba.lcut(asr)
    for word in words:
        hit = _lookup_location(word)
        if hit is not _NO_MATCH:
            return hit

    # Single tokens were all rejected above, so start with pairs.
    for size in range(2, len(words) + 1):
        for combo in itertools.combinations(words, size):
            candidate = ''.join(combo)
            if candidate not in asr:
                continue
            hit = _lookup_location(candidate)
            if hit is not _NO_MATCH and hit is not None:
                return hit
    return asr


if __name__ == "__main__":
    print(norm_community("嗯,那个我们家是碧水兰庭的"))