123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507 |
- #!/usr/bin/env python3
- # encoding:utf-8
- import os
- import json
- import base64
- import threading
- import traceback
- from datetime import datetime
- from src.core.callcenter import registry
- import nls # 引入阿里云语音识别库
- from aliyunsdkcore.client import AcsClient
- from aliyunsdkcore.request import CommonRequest
- import time
- # 定义实时转写类
class TestSt:
    """Real-time speech transcription session built on the Aliyun ``nls`` SDK.

    Each instance owns one ``nls.NlsSpeechTranscriber`` that is created and
    started on a background thread (see ``start``).  SDK callbacks are
    normalised by ``convert_message`` and forwarded to ``message_receiver``.
    The access token is cached on the class and shared by all instances.
    """

    # Class-level token cache: the token string and its expiry (epoch seconds).
    token_cache = {
        "token": None,
        "expire_time": None
    }

    @classmethod
    def get_token(cls):
        """Request a new access token through the ``CreateToken`` action.

        Returns:
            ``(token, expire_time)`` on success, where ``expire_time`` is
            converted to ``int``; ``(None, None)`` when the request fails or
            the response carries no ``Token.Id``.
        """
        # SECURITY NOTE(review): credentials are hard-coded in source. They are
        # kept so behaviour is unchanged, but should be rotated and loaded from
        # configuration or a secret store.
        ak_id = "LTAI5tQ2HmiHCygZkt5BYrYR"
        ak_secret = "KhmxTd14SUcXafpFk5yofA43FoeM99"
        client = AcsClient(ak_id, ak_secret, "cn-shanghai")

        request = CommonRequest()
        request.set_method('POST')
        request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
        request.set_version('2019-02-28')
        request.set_action_name('CreateToken')

        try:
            response = client.do_action_with_exception(request)
            jss = json.loads(response)
            if 'Token' in jss and 'Id' in jss['Token']:
                token = jss['Token']['Id']
                expire_time = jss['Token']['ExpireTime']
                # NOTE(review): this prints the token itself to stdout.
                print(f"Token: {token}, ExpireTime: {expire_time}")
                return token, int(expire_time)
            print("Token获取失败,响应内容: ", response)
        except Exception as e:
            # Best effort: any SDK/parsing failure is reported as "no token".
            print(f"获取Token时发生错误: {e}")
        return None, None

    @classmethod
    def get_cached_token(cls):
        """Return the cached token, fetching a new one when needed.

        The cached token is reused only while more than 60 seconds remain
        before its recorded expiry.  Returns ``None`` when no token could be
        obtained.
        """
        current_time = int(time.time())
        cached = cls.token_cache
        # Guard against a token stored without an expiry: the subtraction
        # below would otherwise raise TypeError.
        if (cached["token"] and cached["expire_time"] is not None
                and cached["expire_time"] - current_time > 60):
            return cached["token"]

        # No usable cached token: request a fresh one.
        new_token, expire_time = cls.get_token()
        if new_token:
            cached["token"] = new_token
            cached["expire_time"] = expire_time
            print("获取新的Token")
            return new_token
        print("无法获取Token")
        return None

    def __init__(self, tid, logger, message_receiver=None):
        """Prepare a session; the worker thread is not started here.

        Args:
            tid: Identifier used in log lines and passed to the SDK as
                ``callback_args``.
            logger: Logger-like object (``debug``/``info``/``error`` are used).
            message_receiver: Optional callable invoked as
                ``message_receiver(converted_message, *callback_args)``.
        """
        self.logger = logger
        self.__event = threading.Event()
        self.__th = threading.Thread(target=self.__test_run)
        self.__id = tid
        self.message_receiver = message_receiver
        self._Token = self.get_cached_token()
        self.sr = None
        # WebSocket gateway address.
        self.URL = "wss://nls-gateway-cn-beijing.aliyuncs.com/ws/v1"
        self.APPKEY = "OKt6jogp6fRjHQVp"  # application key
        self.logger.debug("开始")

    def start(self):
        """Start the background thread that opens the transcription session."""
        self.__th.start()

    def send_audio(self, audio_data):
        """Forward an audio chunk to the transcriber, if one exists yet."""
        if self.sr:
            self.sr.send_audio(audio_data)

    def close(self):
        """Stop the transcriber; failures are logged, never raised."""
        if self.sr is None:
            # Nothing was ever created, so there is nothing to stop.
            return
        try:
            self.sr.stop()
        except Exception as e:
            self.logger.debug(f"[{self.__id}]Error stopping ASR: {e}")

    def __test_run(self):
        """Thread body: create and start a transcriber until an SDK callback fires.

        The loop ends once ``test_on_start``, ``test_on_error`` or
        ``test_on_close`` has set the internal event.
        """
        self.logger.debug("Thread:%s start..", self.__id)
        nls.enableTrace(True)
        count = 0
        self.__event.clear()
        while not self.__event.is_set():
            if not self._Token:
                # The token request in __init__ failed; try again rather than
                # retrying forever with token=None.
                self._Token = self.get_cached_token()
            self.sr = nls.NlsSpeechTranscriber(
                url=self.URL,
                token=self._Token,
                appkey=self.APPKEY,
                on_sentence_begin=self.test_on_sentence_begin,
                on_sentence_end=self.test_on_sentence_end,
                on_start=self.test_on_start,
                on_result_changed=self.test_on_result_chg,
                on_completed=self.test_on_completed,
                on_error=self.test_on_error,
                on_close=self.test_on_close,
                callback_args=[self.__id]
            )
            try:
                self.sr.start(
                    aformat="pcm",
                    sample_rate=8000,
                    enable_intermediate_result=True,
                    enable_punctuation_prediction=True,
                    enable_inverse_text_normalization=True,
                    ex={'max_sentence_silence': 2000, 'disfluency': True, 'enable_words': True}
                )
                self.logger.debug(f"[{self.__id}]ASR session started. {count}")
                # Give the callbacks up to 0.5 s to signal before re-checking.
                self.__event.wait(timeout=.5)
                self.logger.debug(f"[{self.__id}]ASR session started. {count}")
            except Exception as e:
                traceback.print_exc()
                self.logger.debug(f"[{self.__id}]ASR session start exception. {e}")
            count = count + 1

    def test_on_sentence_begin(self, message, *args):
        """SDK callback: log and forward a sentence-begin event."""
        self.logger.debug("[%s]test_on_sentence_begin:%s", self.__id, message)
        if self.message_receiver:
            self.message_receiver(self.convert_message(message), *args)

    def test_on_sentence_end(self, message, *args):
        """SDK callback: log and forward a sentence-end event."""
        self.logger.debug("[%s]test_on_sentence_end:%s", self.__id, message)
        if self.message_receiver:
            self.message_receiver(self.convert_message(message), *args)

    def test_on_start(self, message, *args):
        """SDK callback: mark the session as started, ending the start loop."""
        self.__event.set()
        self.logger.debug("[%s]test_on_start:%s", self.__id, message)

    def test_on_error(self, message, *args):
        """SDK callback: end the start loop and forward the error."""
        self.logger.debug("on_error args=>%s", args)
        if not self.__event.is_set():
            self.__event.set()
        if self.message_receiver:
            self.message_receiver(self.convert_message(message), *args)

    def test_on_close(self, *args):
        """SDK callback: end the start loop when the connection closes."""
        self.logger.debug("on_close: args=>%s", args)
        if not self.__event.is_set():
            self.__event.set()

    def test_on_result_chg(self, message, *args):
        """SDK callback: forward an intermediate result."""
        if self.message_receiver:
            self.message_receiver(self.convert_message(message), *args)

    def test_on_completed(self, message, *args):
        """SDK callback: intentionally ignored."""

    def convert_message(self, message):
        """Convert a raw SDK JSON message into the receiver's dict format.

        Args:
            message: JSON text containing ``header.status`` and ``header.name``.

        Returns:
            A dict with ``name`` set to ``SentenceBegin``, ``SentenceEnd``
            (plus ``result`` taken from ``payload.result``) or
            ``TranscriptionResultChanged`` when the status is 20000000.  Any
            other status yields ``TranscriptionResultError`` with ``status``
            and an empty ``result``.  An unrecognised event name with status
            20000000 yields an empty dict.
        """
        final_result = {}
        header = json.loads(message)["header"]
        if header["status"] == 20000000:
            event_name = header["name"]
            if event_name == "SentenceBegin":
                final_result['name'] = 'SentenceBegin'
            elif event_name == "SentenceEnd":
                final_result['name'] = 'SentenceEnd'
                final_result['result'] = json.loads(message)["payload"]["result"]
            elif event_name == "TranscriptionResultChanged":
                final_result['name'] = 'TranscriptionResultChanged'
        else:
            final_result['name'] = 'TranscriptionResultError'
            final_result['status'] = header['status']
            final_result['result'] = ''
            self.logger.info(f"Status is not {header['status']}")
            registry.ASR_ERRORS.labels(header['status']).inc()
            # NOTE(review): the original indentation was lost; this error-level
            # log is assumed to belong to the failure branch only - confirm.
            self.logger.error("aliyun.Asr.recv: call_id:%s, final_result: %s", self.__id, final_result)
        return final_result
- # 讯飞ASR实时转写
class XfAsr:
    """Xunfei (iFlytek) real-time transcription client over a raw WebSocket.

    The connection is opened in the constructor; ``start`` launches a thread
    that reads server messages, converts them with ``convert_message`` and
    passes them to ``message_receiver``.
    """

    def __init__(self, tid, logger, message_receiver=None):
        """Open the connection and prepare (but do not start) the reader thread.

        Args:
            tid: Call identifier used in log lines.
            logger: Logger-like object (``info``/``error`` are used).
            message_receiver: Optional callable invoked with each converted
                message dict.
        """
        self.end_tag = "{\"end\": true}"
        self.tid = tid
        self.logger = logger
        self.message_receiver = message_receiver
        self.ws = self.new_connection()
        # new_connection() returns None when every attempt failed; do not
        # dereference it here, so the object can still be built and closed.
        connected = self.ws.connected if self.ws else False
        self.logger.info(f"xunfei.Asr: call_id: {tid}, ws.connected:{connected}")
        self.trecv = threading.Thread(target=self.recv)

    def new_connection(self, base_url="ws://rtasr.xfyun.cn/v1/ws", app_id="1ec1097b",
                       api_key="60b7d2d8d172b065b1c3e723e5ba0696"):
        """Create a signed WebSocket connection, trying up to 10 times.

        The signature is ``base64(HMAC-SHA1(api_key, md5hex(app_id + ts)))``
        where ``ts`` is the current epoch time in seconds; the same URL is
        reused for every attempt.

        Returns:
            The connected socket, or ``None`` if all attempts raised.
        """
        # SECURITY NOTE(review): the default app_id/api_key are hard-coded
        # credentials; they are kept because they are part of the signature.
        import hashlib
        import hmac
        import base64
        from websocket import create_connection
        from urllib.parse import quote

        ts = str(int(time.time()))
        base_string = hashlib.md5((app_id + ts).encode('utf-8')).hexdigest().encode('utf-8')
        signa = hmac.new(api_key.encode('utf-8'), base_string, hashlib.sha1).digest()
        signa = base64.b64encode(signa).decode('utf-8')
        url = base_url + "?appid=" + app_id + "&ts=" + ts + "&signa=" + quote(signa)

        # ``remaining`` reproduces the countdown value logged by the original.
        for remaining in range(9, -1, -1):
            try:
                return create_connection(url)
            except Exception as e:
                self.logger.info("new_connection:exception, call_id: %s, count=%s, message=%s", self.tid, remaining, e)
                time.sleep(.010)
        return None

    def start(self):
        """Start the reader thread."""
        self.trecv.start()

    def send_audio(self, chunk):
        """Send one audio chunk; a no-op when there is no connection."""
        if self.ws:
            self.ws.send(chunk)

    def close(self):
        """Send the end tag and close the socket.

        The socket is closed even if sending the end tag fails (for example
        when ``convert_message`` already closed it after a server error).
        """
        if not self.ws:
            return
        try:
            self.ws.send(bytes(self.end_tag.encode('utf-8')))
        except Exception as e:
            self.logger.info("xunfei.Asr.close: send end tag failed, call_id: %s, message: %s", self.tid, e)
        finally:
            self.ws.close()
            self.logger.info("connection closed call_id: %s", self.tid)

    def recv(self):
        """Reader-thread body: forward messages until the socket ends.

        Stops on an empty message, when the socket is no longer connected, or
        on any exception (which is logged, not raised).
        """
        try:
            connected = self.ws.connected if self.ws else False
            self.logger.info(f"xunfei.Asr.recv: call_id: {self.tid}, ws.connected:{connected}")
            while self.ws and self.ws.connected:
                message = str(self.ws.recv())
                if len(message) == 0:
                    self.logger.info("xunfei.Asr.recv: receive result end call_id: %s", self.tid)
                    break
                self.logger.info("xunfei.Asr.recv: call_id: %s, message :%s", self.tid, message)
                if self.message_receiver:
                    self.message_receiver(self.convert_message(message))
        except Exception as e:
            traceback.print_exc()
            self.logger.error("xunfei.Asr.recv: receive result end, call_id: %s, message: %s", self.tid, e)

    def convert_message(self, message):
        """Convert a server JSON message into the receiver's dict format.

        With ``code == "0"``:
          * ``action == "started"`` -> ``{'name': 'SentenceBegin'}``
          * ``action == "result"``  -> ``name`` is ``TranscriptionResultChanged``
            for segment type ``"1"`` and ``SentenceEnd`` for type ``"0"``;
            ``result`` is the concatenation of every ``w`` in the segment.
          * ``action == "error"``   -> ``TranscriptionResultError`` with the raw
            message as ``result``; the socket is closed.
        Any other ``code`` yields ``TranscriptionResultError`` with ``status``,
        increments the error counter and closes the socket.
        """
        final_result = {}
        result_dict = json.loads(message)
        if result_dict["code"] == "0":
            action = result_dict["action"]
            if action == "started":
                final_result['name'] = 'SentenceBegin'
            elif action == "result":
                # ``data`` is itself a JSON-encoded string.
                st = json.loads(result_dict["data"])["cn"]["st"]
                rt = st["rt"]
                if st['type'] == "1":
                    final_result['name'] = 'TranscriptionResultChanged'
                elif st['type'] == "0":
                    final_result['name'] = 'SentenceEnd'
                # NOTE(review): original indentation was lost; the text is
                # assumed to be attached for both segment types - confirm.
                final_result['result'] = ''.join(
                    cw["w"] for item in rt for ws in item["ws"] for cw in ws["cw"]
                ).strip()
            elif action == "error":
                self.logger.error("xunfei.Asr.recv: call_id: %s, action is error: %s", self.tid, message)
                final_result['name'] = 'TranscriptionResultError'
                final_result['result'] = message
                if self.ws:
                    self.ws.close()
        else:
            self.logger.error("xunfei.Asr.recv: call_id: %s, Status is not: %s", self.tid, result_dict["code"])
            final_result['name'] = 'TranscriptionResultError'
            final_result['status'] = result_dict["code"]
            registry.ASR_ERRORS.labels(result_dict["code"]).inc()
            if self.ws:
                self.ws.close()
            # NOTE(review): assumed to belong to the failure branch only
            # (original indentation was lost) - confirm.
            self.logger.error("xunfei.Asr.recv: call_id: %s, final_result: %s", self.tid, final_result)
        return final_result
class XunfeiAsr:
    """Xunfei (iFlytek) IAT dictation client using ``websocket.WebSocketApp``.

    ``start`` launches a background thread that opens the connection;
    ``send_audio`` streams base64-encoded frames and ``close`` sends the final
    frame before closing the socket.
    """

    STATUS_FIRST_FRAME = 0  # marks the first audio frame
    STATUS_CONTINUE_FRAME = 1  # marks an intermediate frame
    STATUS_LAST_FRAME = 2  # marks the last frame

    def __init__(self, tid, logger, message_receiver=None):
        """Store configuration and prepare (but do not start) the worker thread.

        Args:
            tid: Call identifier used in log lines.
            logger: Logger-like object (``info``/``error`` are used).
            message_receiver: Stored on the instance.  NOTE(review): nothing in
                this class currently calls it - confirm whether ``on_message``
                is supposed to forward results.
        """
        self.__id = tid
        self.logger = logger
        self.message_receiver = message_receiver
        # SECURITY NOTE(review): hard-coded credentials, kept for behaviour.
        self.APPID = '1ec1097b'
        self.APIKey = '7237c29d862daa6fd805f788ed70c409'
        self.APISecret = 'YTk1YzAyMmQwYjQ3ZDJkYWQyZGQwMDFm'
        # Common parameters (sent with the first frame only).
        self.CommonArgs = {"app_id": self.APPID}
        # Business parameters (sent with the first frame only).
        self.BusinessArgs = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo": 1, "vad_eos": 2000}
        # Defined up front so send_audio()/close() are safe before connecting.
        self.ws = None
        self.__event = threading.Event()
        # Pass the bound method itself.  The original wrote ``self.__run()``,
        # which ran the blocking connection loop inside the constructor and
        # handed ``None`` to the thread.
        self.__th = threading.Thread(target=self.__run)

    def create_url(self):
        """Build the authenticated ``wss://ws-api.xfyun.cn/v2/iat`` URL.

        The signature is HMAC-SHA256 (keyed with ``APISecret``) over the
        ``host``, ``date`` and request line; it is base64-encoded and embedded
        in a base64-encoded ``authorization`` query parameter.
        """
        import hashlib
        import hmac
        from urllib.parse import urlencode
        from wsgiref.handlers import format_date_time
        from time import mktime

        host = "ws-api.xfyun.cn"
        url = 'wss://ws-api.xfyun.cn/v2/iat'
        # RFC 1123 formatted timestamp.
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))

        # String to sign.
        signature_origin = "host: " + host + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"
        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')

        authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
            self.APIKey, "hmac-sha256", "host date request-line", signature_sha)
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')

        # Authentication parameters appended as the query string.
        v = {
            "authorization": authorization,
            "date": date,
            "host": host
        }
        return url + '?' + urlencode(v)

    def __run(self):
        """Thread body: open the WebSocket and block in ``run_forever``.

        The loop repeats until ``on_open``, ``on_error`` or ``on_close`` has
        set the internal event.
        """
        import ssl
        import websocket
        count = 0
        self.__event.clear()
        websocket.enableTrace(True)
        while not self.__event.is_set():
            try:
                time1 = datetime.now()
                ws_url = self.create_url()
                self.logger.info("xunfei.Asr.call_id:%s, ws_url:%s", self.__id, ws_url)
                self.ws = websocket.WebSocketApp(ws_url, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
                self.logger.info("xunfei.Asr.call_id:%s, 111111111", self.__id)
                self.ws.on_open = self.on_open
                self.logger.info("xunfei.Asr.call_id:%s, 2222222222", self.__id)
                # SECURITY NOTE(review): CERT_NONE disables TLS certificate
                # verification; kept to preserve behaviour - confirm intent.
                self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
                self.logger.info("xunfei.Asr.call_id:%s, 3333333333", self.__id)
                time_cost = (datetime.now() - time1)
                self.logger.info(f"xunfei.Asr.started. call_id:{self.__id}, timeCost:{time_cost}, count:{count}")
                self.__event.wait(timeout=.5)
                self.logger.info(f"xunfei.Asr.started. call_id:{self.__id}, {count}")
            except Exception as e:
                traceback.print_exc()
                self.logger.info(f"[{self.__id}]ASR session start exception. {e}")
            count += 1

    def start(self):
        """Start the background connection thread."""
        self.__th.start()

    def send_audio(self, audio_data=None, first=False):
        """Send one audio frame.

        Args:
            audio_data: Raw audio bytes.  ``None`` or empty data sends the
                last-frame marker with an empty payload.
            first: When true (and data is present) the frame also carries the
                ``common`` and ``business`` parameters.

        After the last frame this method sleeps for one second, as the
        original did.
        """
        # Normalise so len()/b64encode never see None (the original raised
        # TypeError here, which made close() skip both the last frame and the
        # socket close).
        chunk = audio_data if audio_data else b""
        if not self.ws:
            self.logger.info('xunfei.Asr.send_audio:ws_is_None: call_id: %s, chunk:%s, %s', self.__id, len(chunk), chunk)
            return

        if not audio_data:
            status = XunfeiAsr.STATUS_LAST_FRAME
        elif first:
            status = XunfeiAsr.STATUS_FIRST_FRAME
        else:
            status = XunfeiAsr.STATUS_CONTINUE_FRAME
        self.logger.info('xunfei.Asr.send_audio: call_id: %s, status:%s, chunk:%s, %s', self.__id, status, len(chunk), chunk)

        frame = {}
        if status == XunfeiAsr.STATUS_FIRST_FRAME:
            # app_id and business parameters are only sent with the first frame.
            frame["common"] = self.CommonArgs
            frame["business"] = self.BusinessArgs
        frame["data"] = {"status": status, "format": "audio/L16;rate=16000",
                         "audio": str(base64.b64encode(chunk), 'utf-8'),
                         "encoding": "raw"}
        self.ws.send(json.dumps(frame))
        if status == XunfeiAsr.STATUS_LAST_FRAME:
            time.sleep(1)

    def close(self):
        """Send the last frame and close the socket; errors are logged only."""
        try:
            self.send_audio()
        except Exception as e:
            self.logger.info(f"[{self.__id}]Error stopping ASR: {e}")
        # Close even when the last frame could not be sent.
        if self.ws:
            try:
                self.ws.close()
            except Exception as e:
                self.logger.info(f"[{self.__id}]Error stopping ASR: {e}")

    def on_open(self, ws):
        """WebSocket callback: connection established."""
        self.logger.info("xunfei.Asr.open: call_id: %s", self.__id)
        self.__event.set()

    def on_message(self, ws, message):
        """WebSocket callback: log the outcome of a server message.

        A non-zero ``code`` is logged as an error; otherwise the ``ws`` word
        list from ``data.result`` is logged.  Parsing failures are logged and
        swallowed.
        """
        try:
            self.logger.info("xunfei.Asr.recv: call_id: %s, message :%s", self.__id, message)
            payload = json.loads(message)  # parse once instead of per field
            code = payload["code"]
            sid = payload["sid"]
            if code != 0:
                errMsg = payload["message"]
                self.logger.info("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
            else:
                data = payload["data"]["result"]["ws"]
                self.logger.info("sid:%s call success!,data is:%s" % (sid, json.dumps(data, ensure_ascii=False)))
        except Exception as e:
            self.logger.error("receive msg,but parse exception:%s", e)

    def on_error(self, ws, error):
        """WebSocket callback: log the error and end the connection loop."""
        self.logger.error("xunfei.Asr.recv::error, call_id:%s, msg:%s", self.__id, error)
        if not self.__event.is_set():
            self.__event.set()

    def on_close(self, ws, a, b):
        """WebSocket callback: log the close and end the connection loop."""
        self.logger.error("xunfei.Asr.recv::close, call_id:%s", self.__id)
        if not self.__event.is_set():
            self.__event.set()

    def convert_message(self, message):
        """Convert a ``header``/``payload`` style JSON message into a result dict.

        NOTE(review): this expects ``header.status``/``header.name``, which is
        not the shape ``on_message`` parses, and nothing in this class calls
        it - confirm whether it is still needed.
        """
        final_result = {}
        header = json.loads(message)["header"]
        if header["status"] == 20000000:
            event_name = header["name"]
            if event_name == "SentenceBegin":
                final_result['name'] = 'SentenceBegin'
            elif event_name == "SentenceEnd":
                final_result['name'] = 'SentenceEnd'
                final_result['result'] = json.loads(message)["payload"]["result"]
            elif event_name == "TranscriptionResultChanged":
                final_result['name'] = 'TranscriptionResultChanged'
        else:
            final_result['name'] = 'TranscriptionResultError'
            final_result['status'] = header['status']
            final_result['result'] = ''
            self.logger.info(f"Status is not {header['status']}")
            registry.ASR_ERRORS.labels(header['status']).inc()
            # NOTE(review): assumed to belong to the failure branch only
            # (original indentation was lost) - confirm.
            self.logger.error("xunfei.Asr.recv: call_id:%s, final_result: %s", self.__id, final_result)
        return final_result
|