# -*- coding: utf8 -*-
"""Batch-transcribe recordings through the ``nls-filetrans`` service.

Each recording URL is submitted as a transcription task, the task is polled
until it leaves the running/queueing states, and the recognized sentences of
all successful recordings are written to one JSON file.
"""
import json
import os
import re
import time

from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest

# Region ID (fixed value).
_REGION_ID = "cn-beijing"
_PRODUCT = "nls-filetrans"
_DOMAIN = "filetrans.cn-beijing.aliyuncs.com"
_API_VERSION = "2018-08-17"
_POST_REQUEST_ACTION = "SubmitTask"
_GET_REQUEST_ACTION = "GetTaskResult"

# Request parameters.
_KEY_APP_KEY = "appkey"
_KEY_FILE_LINK = "file_link"
_KEY_VERSION = "version"
_KEY_ENABLE_WORDS = "enable_words"
# Whether to enable intelligent track splitting.
_KEY_AUTO_SPLIT = "auto_split"

# Response parameters.
_KEY_TASK = "Task"
_KEY_TASK_ID = "TaskId"
_KEY_STATUS_TEXT = "StatusText"
_KEY_RESULT = "Result"

# Status values.
_STATUS_SUCCESS = "SUCCESS"
_STATUS_RUNNING = "RUNNING"
_STATUS_QUEUEING = "QUEUEING"
# Statuses that mean "not finished yet, keep polling".
_PENDING_STATUSES = (_STATUS_RUNNING, _STATUS_QUEUEING)


def _newRequest(actionName, method):
    """Return a CommonRequest preconfigured for the file-transcription API."""
    request = CommonRequest()
    request.set_domain(_DOMAIN)
    request.set_version(_API_VERSION)
    request.set_product(_PRODUCT)
    request.set_action_name(actionName)
    request.set_method(method)
    return request


def _callApi(client, request):
    """Send *request* and return the decoded JSON response.

    SDK errors are printed and reported as ``None`` so callers can treat
    them like any other failed recognition.
    """
    try:
        return json.loads(client.do_action_with_exception(request))
    except (ServerException, ClientException) as e:
        print(e)
        return None


def fileTrans(akId, akSecret, appKey, fileLink, *, pollInterval=10, maxWait=None):
    """Transcribe one recording and block until the task finishes.

    Args:
        akId: AccessKey ID passed to ``AcsClient``.
        akSecret: AccessKey secret passed to ``AcsClient``.
        appKey: Value sent as the ``appkey`` task field.
        fileLink: URL of the recording, sent as the ``file_link`` task field.
        pollInterval: Seconds to sleep between result queries while the task
            is ``RUNNING`` or ``QUEUEING``.
        maxWait: Optional upper bound, in seconds, on the total polling time.
            ``None`` (the default) polls until the task leaves the pending
            states.

    Returns:
        The ``Result`` field of the final query response when the task ends
        with status ``SUCCESS``; ``None`` on any failure (SDK exception,
        rejected submission, non-success final status, malformed response,
        or ``maxWait`` exceeded).
    """
    # Create the AcsClient instance.
    client = AcsClient(akId, akSecret, _REGION_ID)

    # Submit the recording-file recognition request.
    postRequest = _newRequest(_POST_REQUEST_ACTION, 'POST')
    task = {
        _KEY_APP_KEY: appKey,
        _KEY_FILE_LINK: fileLink,
        _KEY_VERSION: "4.0",
        _KEY_ENABLE_WORDS: False,
        _KEY_AUTO_SPLIT: True,
    }
    postRequest.add_body_params(_KEY_TASK, json.dumps(task))

    postResponse = _callApi(client, postRequest)
    if postResponse is None:
        return None
    # Use .get() so a response missing a field is reported as a failed
    # submission instead of escaping as a KeyError.
    taskId = None
    if postResponse.get(_KEY_STATUS_TEXT) == _STATUS_SUCCESS:
        taskId = postResponse.get(_KEY_TASK_ID)
    if not taskId:
        print(f"录音文件 {fileLink} 识别请求失败!")
        return None

    # Create the query request and attach the task ID.
    getRequest = _newRequest(_GET_REQUEST_ACTION, 'GET')
    getRequest.add_query_param(_KEY_TASK_ID, taskId)

    # Poll the recognition result until the task is no longer pending.
    deadline = None if maxWait is None else time.monotonic() + maxWait
    while True:
        getResponse = _callApi(client, getRequest)
        if getResponse is None:
            return None
        statusText = getResponse.get(_KEY_STATUS_TEXT)
        if statusText not in _PENDING_STATUSES:
            break
        if deadline is not None and time.monotonic() >= deadline:
            print(f"录音文件 {fileLink} 识别超时!")
            return None
        time.sleep(pollInterval)

    if statusText == _STATUS_SUCCESS:
        return getResponse.get(_KEY_RESULT)
    print(f"录音文件 {fileLink} 识别失败!")
    return None


def batchProcessFiles(akId, akSecret, appKey, baseUrl, fileNames, outputFile):
    """Transcribe every file in *fileNames* and dump the results as JSON.

    Args:
        akId: AccessKey ID forwarded to ``fileTrans``.
        akSecret: AccessKey secret forwarded to ``fileTrans``.
        appKey: App key forwarded to ``fileTrans``.
        baseUrl: Prefix concatenated directly with each file name to form the
            recording URL (no separator is inserted).
        fileNames: Iterable of recording file names.
        outputFile: Path of the JSON file to write. Its parent directory is
            created when missing.

    The output maps each file name (with a trailing ``.wav`` removed) to the
    ``Sentences`` entry of its result. Files whose recognition failed are
    omitted.
    """
    results = {}
    for fileName in fileNames:
        fileLink = f"{baseUrl}{fileName}"
        print(f"处理文件:{fileLink}")
        result = fileTrans(akId, akSecret, appKey, fileLink)
        if result:
            resultKey = re.sub(r"\.wav$", "", fileName)
            results[resultKey] = result['Sentences']
        print("-" * 40)

    # Make sure the output directory exists. dirname() is "" for a bare file
    # name, and os.makedirs("") raises, so only create it when there is one.
    outputDir = os.path.dirname(outputFile)
    if outputDir:
        os.makedirs(outputDir, exist_ok=True)

    # Write the results to the JSON file.
    with open(outputFile, 'w', encoding='utf8') as f:
        json.dump(results, f, ensure_ascii=False, indent=4)
    print(f"所有结果已保存到 {outputFile}")


def main():
    """Run the batch transcription with the script's built-in settings."""
    # SECURITY NOTE(review): credentials are committed in source. They can be
    # overridden through the environment variables below; the literals are
    # kept only so the script keeps running as before. Rotate these keys and
    # remove the fallbacks.
    accessKeyId = os.environ.get("ALIYUN_AK_ID", "LTAI5tQ2HmiHCygZkt5BYrYR")
    accessKeySecret = os.environ.get("ALIYUN_AK_SECRET", "KhmxTd14SUcXafpFk5yofA43FoeM99")
    appKey = os.environ.get("NLS_APP_KEY", "OKt6jogp6fRjHQVp")

    # Base URL of the recording files.
    baseUrl = "https://static.fuxicarbon.com/fullVoice/"

    # List of recording file names (assumes they are already known or
    # generated).
    fileNames = [
        "230027-I-0155-18804546916-S.wav",
        # Add more file names here.
    ]

    # Output JSON file name.
    outputFile = "/Users/yushanghui/hongshantianping/git/dataTools/voice/results.json"

    # Run the batch processing.
    batchProcessFiles(accessKeyId, accessKeySecret, appKey, baseUrl, fileNames, outputFile)


if __name__ == "__main__":
    main()