1 Star 0 Fork 0

Lowell/txt2audio

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
main.py 6.84 KB
一键复制 编辑 原始数据 按行查看 历史
Lowell 提交于 2024-08-14 06:10 . 初版完成
import hashlib
import os
import re
from time import sleep
import requests
import json
# 导入腾讯云 SDK
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.tts.v20190823 import tts_client, models
#初始文本路径
text_path = 'txt/Java面试题'
#段落分隔符
paragraph_delimiter = "\n\n"
import requests
# 腾讯云
secret_id = ""
secret_key = ""
task_list = []
temp_list=[]
download_list = []
def createTtsTask(text,output_file_path):
try:
cred = credential.Credential(secret_id, secret_key)
httpProfile = HttpProfile()
httpProfile.endpoint = "tts.tencentcloudapi.com"
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
client = tts_client.TtsClient(cred, "ap-guangzhou", clientProfile)
#req = models.TextToVoiceRequest()
req = models.CreateTtsTaskRequest()
params={
"Text": text,
"Volume": 0,
"Speed": 0,
"ProjectId": 0,
"ModelType": 1,
"VoiceType": 301034,
"PrimaryLanguage": 1,
"SampleRate": 16000,
"Codec": "mp3",
"EmotionCategory": "neutral",
"EmotionIntensity": 100
}
req.from_json_string(json.dumps(params))
resp = client.CreateTtsTask(req)
data = resp.to_json_string()
taskId=json.loads(data)['Data'].get('TaskId')
print("已经创建任务",taskId)
task_list.append({
'taskId':taskId,
'path':output_file_path
})
return True
except Exception as e:
print("发生异常:" + str(e))
return False
def queryTaskStatus(task):
try:
cred = credential.Credential(secret_id, secret_key)
httpProfile = HttpProfile()
httpProfile.endpoint = "tts.tencentcloudapi.com"
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
client = tts_client.TtsClient(cred, "ap-guangzhou", clientProfile)
req = models.DescribeTtsTaskStatusRequest()
params={
"TaskId": task['taskId']
}
req.from_json_string(json.dumps(params))
resp = client.DescribeTtsTaskStatus(req)
data = resp.to_json_string()
resultUrl=json.loads(data)['Data'].get('ResultUrl')
if not resultUrl:
temp_list.append(task)
print("暂时还没有",data)
return
download_list.append({'resultUrl':resultUrl,'path':task['path']})
except Exception as e:
print("发生异常:" + str(e))
temp_list.append(task)
return False
def download_mp3(download):
try:
mp3_url = download['resultUrl']
output_file_path = download['path']
if mp3_url:
response = requests.get(mp3_url)
with open(output_file_path, 'wb') as f:
f.write(response.content)
print("MP3文件已生成",output_file_path)
else:
print("未能生成MP3文件")
temp_list.append(download)
print("下载未完成",download)
except Exception as e:
print("发生异常:" + str(e))
temp_list.append(download)
def sha256(content=None):
if content is None:
return ''
sha256gen = hashlib.sha256()
sha256gen.update(content.encode())
sha256code = sha256gen.hexdigest()
sha256gen = None
return sha256code
def camel_to_spaces(text):
# 使用正则表达式将驼峰命名法的单词分割成多个单词
result = re.sub(r'(?<!\b)(?=[A-Z][a-z])', ' ', text)
# 去除开头可能的空格
#print(result)
#return result.strip()
return result
def main():
global task_list,download_list,temp_list
audio_files = []
# 打开文件
with open(text_path, 'r', encoding='utf-8') as file:
# 读取文件内容并存储为字符串
file_content = file.read()
# 待处理的文字数组
text_list=file_content.split(paragraph_delimiter)
# # 打开保存失败文字的文件
# with open('failed_texts_'+datetime.now().strftime("%Y-%m-%d %H:%M"), 'w', encoding='utf-8') as failed_file:
print('===================开始创建任务======================')
for i, text in enumerate(text_list):
sleep(1)
if not text:
continue
newText=camel_to_spaces(text=text) #有驼峰命名的,必须要打开,否则有概率跳过
#newText=text
audio_file_path=f"./target/mp3/{sha256(newText)}.mp3"
if os.path.exists(audio_file_path):
audio_files.append(audio_file_path)
print('文件已经存在,已跳过')
continue
# 创建tts任务
createTtsTaskResult=createTtsTask(text=newText,output_file_path=audio_file_path)
if createTtsTaskResult:
audio_files.append(audio_file_path)
else:
# 将失败的文本写入文件
print("\033[0;31;40m已写入失败文件\033[0m")
#failed_file.write(text + '\n\n')
print('===================开始查询任务状态======================')
#查询任务
total = len(task_list)
while len(task_list)>0:
print("===========查询中===============")
sleep(5)
for i, task in enumerate(task_list):
sleep(1)
print("正在查询任务",task['taskId'])
queryTaskStatus(task=task)
task_list=temp_list
temp_list=[]
print('目前下载任务队列数量 ',str(len(download_list))+'/'+str(total))
print('===================查询任务状态完成======================')
print('最终下载任务队列数量',str(len(download_list))+'/'+str(total))
if len(task_list)>0:
print("\033[0;31;还有未完成的任务\033[0m")
print(task_list)
print('===================开始下载MP3======================')
total = len(download_list)
while len(download_list)>0:
print("===========下载中===============")
sleep(5)
for i, download in enumerate(download_list):
sleep(1)
print("正在下载mp3",download['path'])
download_mp3(download=download)
download_list=temp_list
temp_list=[]
print('目前下载任务队列数量 ',str(total-len(download_list))+'/'+str(total))
print('===================开始查询下载状态======================')
print('最终下载队列数量',str(total-len(download_list))+'/'+str(total))
if len(download_list)>0:
print("\033[0;31;还有未完成的下载\033[0m")
print(download_list)
else:
print('全部下载已完成')
if __name__ == "__main__":
main()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/mxywds/txt2audio.git
git@gitee.com:mxywds/txt2audio.git
mxywds
txt2audio
txt2audio
master

搜索帮助