# Copyright (c) Alibaba, Inc. and its affiliates. import logging import uuid import json import threading from enum import IntEnum from nls.core import NlsCore from . import logging from .exception import StartTimeoutException, WrongStateException, InvalidParameter __STREAM_INPUT_TTS_NAMESPACE__ = "FlowingSpeechSynthesizer" __STREAM_INPUT_TTS_REQUEST_CMD__ = { "start": "StartSynthesis", "send": "RunSynthesis", "stop": "StopSynthesis", } __STREAM_INPUT_TTS_REQUEST_NAME__ = { "started": "SynthesisStarted", "sentence_begin": "SentenceBegin", "sentence_synthesis": "SentenceSynthesis", "sentence_end": "SentenceEnd", "completed": "SynthesisCompleted", "task_failed": "TaskFailed", } __URL__ = "wss://nls-gateway.cn-shanghai.aliyuncs.com/ws/v1" __all__ = ["NlsStreamInputTtsSynthesizer"] class NlsStreamInputTtsRequest: def __init__(self, task_id, session_id, appkey): self.task_id = task_id self.appkey = appkey self.session_id = session_id def getStartCMD(self, voice, format, sample_rate, volumn, speech_rate, pitch_rate, ex): self.voice = voice self.format = format self.sample_rate = sample_rate self.volumn = volumn self.speech_rate = speech_rate self.pitch_rate = pitch_rate cmd = { "header": { "message_id": uuid.uuid4().hex, "task_id": self.task_id, "name": __STREAM_INPUT_TTS_REQUEST_CMD__["start"], "namespace": __STREAM_INPUT_TTS_NAMESPACE__, "appkey": self.appkey, }, "payload": { "session_id": self.session_id, "voice": self.voice, "format": self.format, "sample_rate": self.sample_rate, "volumn": self.volumn, "speech_rate": self.speech_rate, "pitch_rate": self.pitch_rate, }, } if ex: cmd["payload"].update(ex) return json.dumps(cmd) def getSendCMD(self, text): cmd = { "header": { "message_id": uuid.uuid4().hex, "task_id": self.task_id, "name": __STREAM_INPUT_TTS_REQUEST_CMD__["send"], "namespace": __STREAM_INPUT_TTS_NAMESPACE__, "appkey": self.appkey, }, "payload": {"text": text}, } return json.dumps(cmd) def getStopCMD(self): cmd = { "header": { "message_id": uuid.uuid4().hex, "task_id": self.task_id, "name": __STREAM_INPUT_TTS_REQUEST_CMD__["stop"], "namespace": __STREAM_INPUT_TTS_NAMESPACE__, "appkey": self.appkey, }, } return json.dumps(cmd) class NlsStreamInputTtsStatus(IntEnum): Begin = 1 Start = 2 Started = 3 WaitingComplete = 3 Completed = 4 Failed = 5 Closed = 6 class ThreadSafeStatus: def __init__(self, state: NlsStreamInputTtsStatus): self._state = state self._lock = threading.Lock() def get(self) -> NlsStreamInputTtsStatus: with self._lock: return self._state def set(self, state: NlsStreamInputTtsStatus): with self._lock: self._state = state class NlsStreamInputTtsSynthesizer: """ Api for text-to-speech """ def __init__( self, url=__URL__, token=None, appkey=None, session_id=None, on_data=None, on_sentence_begin=None, on_sentence_synthesis=None, on_sentence_end=None, on_completed=None, on_error=None, on_close=None, callback_args=[], ): """ NlsSpeechSynthesizer initialization Parameters: ----------- url: str websocket url. akid: str access id from aliyun. if you provide a token, ignore this argument. appkey: str appkey from aliyun session_id: str 32-character string, if empty, sdk will generate a random string. on_data: function Callback object which is called when partial synthesis result arrived arrived. on_result_changed has two arguments. The 1st argument is binary data corresponding to aformat in start method. The 2nd argument is *args which is callback_args. on_sentence_begin: function Callback object which is called when detected sentence start. on_start has two arguments. The 1st argument is message which is a json format string. The 2nd argument is *args which is callback_args. on_sentence_synthesis: function Callback object which is called when detected sentence synthesis. The incremental timestamp is returned within payload. on_start has two arguments. The 1st argument is message which is a json format string. The 2nd argument is *args which is callback_args. on_sentence_end: function Callback object which is called when detected sentence end. The timestamp of the whole sentence is returned within payload. on_start has two arguments. The 1st argument is message which is a json format string. The 2nd argument is *args which is callback_args. on_completed: function Callback object which is called when recognition is completed. on_completed has two arguments. The 1st argument is message which is a json format string. The 2nd argument is *args which is callback_args. on_error: function Callback object which is called when any error occurs. on_error has two arguments. The 1st argument is message which is a json format string. The 2nd argument is *args which is callback_args. on_close: function Callback object which is called when connection closed. on_close has one arguments. The 1st argument is *args which is callback_args. callback_args: list callback_args will return in callbacks above for *args. """ if not token or not appkey: raise InvalidParameter("Must provide token and appkey") self.__response_handler__ = { __STREAM_INPUT_TTS_REQUEST_NAME__["started"]: self.__synthesis_started, __STREAM_INPUT_TTS_REQUEST_NAME__["sentence_begin"]: self.__sentence_begin, __STREAM_INPUT_TTS_REQUEST_NAME__[ "sentence_synthesis" ]: self.__sentence_synthesis, __STREAM_INPUT_TTS_REQUEST_NAME__["sentence_end"]: self.__sentence_end, __STREAM_INPUT_TTS_REQUEST_NAME__["completed"]: self.__synthesis_completed, __STREAM_INPUT_TTS_REQUEST_NAME__["task_failed"]: self.__task_failed, } self.__callback_args = callback_args self.__url = url self.__appkey = appkey self.__token = token self.__session_id = session_id self.start_sended = threading.Event() self.started_event = threading.Event() self.complete_event = threading.Event() self.__on_sentence_begin = on_sentence_begin self.__on_sentence_synthesis = on_sentence_synthesis self.__on_sentence_end = on_sentence_end self.__on_data = on_data self.__on_completed = on_completed self.__on_error = on_error self.__on_close = on_close self.__allow_aformat = ("pcm", "wav", "mp3") self.__allow_sample_rate = ( 8000, 11025, 16000, 22050, 24000, 32000, 44100, 48000, ) self.state = ThreadSafeStatus(NlsStreamInputTtsStatus.Begin) if not self.__session_id: self.__session_id = uuid.uuid4().hex self.request = NlsStreamInputTtsRequest( uuid.uuid4().hex, self.__session_id, self.__appkey ) def __handle_message(self, message): logging.debug("__handle_message") try: __result = json.loads(message) if __result["header"]["name"] in self.__response_handler__: __handler = self.__response_handler__[__result["header"]["name"]] __handler(message) else: logging.error("cannot handle cmd{}".format(__result["header"]["name"])) return except json.JSONDecodeError: logging.error("cannot parse message:{}".format(message)) return def __syn_core_on_open(self): logging.debug("__syn_core_on_open") self.start_sended.set() def __syn_core_on_data(self, data, opcode, flag): logging.debug("__syn_core_on_data") if self.__on_data: self.__on_data(data, *self.__callback_args) def __syn_core_on_msg(self, msg, *args): logging.debug("__syn_core_on_msg:msg={} args={}".format(msg, args)) self.__handle_message(msg) def __syn_core_on_error(self, msg, *args): logging.debug("__sr_core_on_error:msg={} args={}".format(msg, args)) def __syn_core_on_close(self): logging.debug("__sr_core_on_close") if self.__on_close: self.__on_close(*self.__callback_args) self.state.set(NlsStreamInputTtsStatus.Closed) self.start_sended.set() self.started_event.set() self.complete_event.set() def __synthesis_started(self, message): logging.debug("__synthesis_started") self.started_event.set() def __sentence_begin(self, message): logging.debug("__sentence_begin") if self.__on_sentence_begin: self.__on_sentence_begin(message, *self.__callback_args) def __sentence_synthesis(self, message): logging.debug("__sentence_synthesis") if self.__on_sentence_synthesis: self.__on_sentence_synthesis(message, *self.__callback_args) def __sentence_end(self, message): logging.debug("__sentence_end") if self.__on_sentence_end: self.__on_sentence_end(message, *self.__callback_args) def __synthesis_completed(self, message): logging.debug("__synthesis_completed") if self.__on_completed: self.__on_completed(message, *self.__callback_args) self.__nls.shutdown() logging.debug("__synthesis_completed shutdown done") self.complete_event.set() def __task_failed(self, message): logging.debug("__task_failed") self.start_sended.set() self.started_event.set() self.complete_event.set() if self.__on_error: self.__on_error(message, *self.__callback_args) self.state.set(NlsStreamInputTtsStatus.Failed) def startStreamInputTts( self, voice="longxiaochun", aformat="pcm", sample_rate=24000, volume=50, speech_rate=0, pitch_rate=0, ex:dict=None, ): """ Synthesis start Parameters: ----------- voice: str voice for text-to-speech, default is xiaoyun aformat: str audio binary format, support: 'pcm', 'wav', 'mp3', default is 'pcm' sample_rate: int audio sample rate, default is 24000, support:8000, 11025, 16000, 22050, 24000, 32000, 44100, 48000 volume: int audio volume, from 0~100, default is 50 speech_rate: int speech rate from -500~500, default is 0 pitch_rate: int pitch for voice from -500~500, default is 0 ex: dict dict which will merge into 'payload' field in request """ self.__nls = NlsCore( url=self.__url, token=self.__token, on_open=self.__syn_core_on_open, on_message=self.__syn_core_on_msg, on_data=self.__syn_core_on_data, on_close=self.__syn_core_on_close, on_error=self.__syn_core_on_error, callback_args=[], ) if aformat not in self.__allow_aformat: raise InvalidParameter("format {} not support".format(aformat)) if sample_rate not in self.__allow_sample_rate: raise InvalidParameter("samplerate {} not support".format(sample_rate)) if volume < 0 or volume > 100: raise InvalidParameter("volume {} not support".format(volume)) if speech_rate < -500 or speech_rate > 500: raise InvalidParameter("speech_rate {} not support".format(speech_rate)) if pitch_rate < -500 or pitch_rate > 500: raise InvalidParameter("pitch rate {} not support".format(pitch_rate)) request = self.request.getStartCMD( voice, aformat, sample_rate, volume, speech_rate, pitch_rate, ex ) last_state = self.state.get() if last_state != NlsStreamInputTtsStatus.Begin: logging.debug("start with wrong state {}".format(last_state)) self.state.set(NlsStreamInputTtsStatus.Failed) raise WrongStateException("start with wrong state {}".format(last_state)) logging.debug("start with request: {}".format(request)) self.__nls.start(request, ping_interval=0, ping_timeout=None) self.state.set(NlsStreamInputTtsStatus.Start) if not self.start_sended.wait(timeout=10): logging.debug("syn start timeout") raise StartTimeoutException(f"Waiting Connection before Start over 10s") if last_state != NlsStreamInputTtsStatus.Begin: logging.debug("start with wrong state {}".format(last_state)) self.state.set(NlsStreamInputTtsStatus.Failed) raise WrongStateException("start with wrong state {}".format(last_state)) if not self.started_event.wait(timeout=10): logging.debug("syn started timeout") self.state.set(NlsStreamInputTtsStatus.Failed) raise StartTimeoutException(f"Waiting Started over 10s") self.state.set(NlsStreamInputTtsStatus.Started) def sendStreamInputTts(self, text): """ send text to server Parameters: ----------- text: str utf-8 text """ last_state = self.state.get() if last_state != NlsStreamInputTtsStatus.Started: logging.debug("send with wrong state {}".format(last_state)) self.state.set(NlsStreamInputTtsStatus.Failed) raise WrongStateException("send with wrong state {}".format(last_state)) request = self.request.getSendCMD(text) logging.debug("send with request: {}".format(request)) self.__nls.send(request, None) def stopStreamInputTts(self): """ Synthesis end """ last_state = self.state.get() if last_state != NlsStreamInputTtsStatus.Started: logging.debug("send with wrong state {}".format(last_state)) self.state.set(NlsStreamInputTtsStatus.Failed) raise WrongStateException("stop with wrong state {}".format(last_state)) request = self.request.getStopCMD() logging.debug("stop with request: {}".format(request)) self.__nls.send(request, None) self.state.set(NlsStreamInputTtsStatus.WaitingComplete) self.complete_event.wait() self.state.set(NlsStreamInputTtsStatus.Completed) self.shutdown() def shutdown(self): """ Shutdown connection immediately """ self.__nls.shutdown()