3 Star 5 Fork 2

新媒体网络营销/针对cosyvoice开发的大文本转语音处理工具_听书狂人处理机

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
merge_audioD.py 10.34 KB
一键复制 编辑 原始数据 按行查看 历史
import os
import re
import sqlite3
from pydub import AudioSegment
import logging
from logging.handlers import RotatingFileHandler
import time
# Logging setup: the log file is kept under ./output.
log_dir = './output'
# exist_ok=True avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, 'merge_audioD.log')
# Single format string shared by the root (console) configuration and the file handler.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
# Root level is INFO so that INFO-and-above records are emitted.
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)
# Attach a size-rotated file handler so records are also written to log_file.
handler = RotatingFileHandler(log_file, maxBytes=10000, backupCount=1, encoding='utf-8')
handler.setLevel(logging.INFO)
formatter = logging.Formatter(_LOG_FORMAT)
handler.setFormatter(formatter)
logger.addHandler(handler)
def sanitize_filename(filename):
    """Return ``filename`` with characters that file systems commonly reject replaced.

    Every backslash, slash, asterisk, question mark, colon, double quote, angle bracket
    and pipe becomes an underscore; leading and trailing underscores are then removed.
    """
    forbidden_chars = r'[\\/*?:"<>|]'
    replaced = re.sub(forbidden_chars, "_", filename)
    return replaced.strip("_")
def convert_milliseconds_to_lrc_time(ms):
    """Format a millisecond offset as the body of an LRC timestamp.

    The result carries no surrounding brackets; the caller adds them.

    Args:
        ms: Offset in milliseconds. Non-integer values are truncated to whole
            milliseconds and negative values are treated as 0.

    Returns:
        ``"mm:ss.xx"`` when the offset is below one hour, otherwise ``"hh:mm:ss.xx"``.
        ``xx`` is hundredths of a second, truncated rather than rounded.
    """
    # Floats would otherwise leak into the format fields (e.g. "0.0:1.0.50.0").
    total_ms = max(int(ms), 0)
    total_seconds, milliseconds = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    centiseconds = milliseconds // 10
    if hours > 0:
        return f"{hours:02}:{minutes:02}:{seconds:02}.{centiseconds:02}"
    return f"{minutes:02}:{seconds:02}.{centiseconds:02}"
def convert_milliseconds_to_srt_time(ms):
    """Format a millisecond offset as an SRT timestamp ``hh:mm:ss,mmm``.

    Args:
        ms: Offset in milliseconds. Non-integer values are truncated to whole
            milliseconds and negative values are treated as 0.

    Returns:
        The timestamp string, with a comma before the three-digit millisecond field.
    """
    # Floats would otherwise leak into the format fields and corrupt the timestamp.
    total_ms = max(int(ms), 0)
    total_seconds, milliseconds = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}"
def generate_lrc_file(output_filename, text_list):
    """Write an LRC file with one timestamped line per sentence.

    Each line's timestamp is the running total of the durations of all earlier
    entries, so the first sentence starts at 0.

    Args:
        output_filename: Path of the ``.lrc`` file to write.
        text_list: Iterable of ``(duration_ms, sentence)`` pairs in playback order.

    Returns:
        None. Failures are logged and not raised (best effort).
    """
    lrc_filename = os.path.normpath(output_filename)  # normalise path separators
    try:
        logger.info(f"尝试生成 LRC 文件: {lrc_filename}")
        target_dir = os.path.dirname(lrc_filename)
        # os.makedirs('') raises, so only create a directory when the path names one.
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        current_time = 0
        with open(lrc_filename, 'w', encoding='utf-8') as lrc_file:
            for duration, sentence in text_list:
                timestamp = convert_milliseconds_to_lrc_time(current_time)
                lrc_file.write(f"[{timestamp}]{sentence}\n")
                if duration is None:
                    # A missing duration must not abort the file half-written.
                    logger.warning(f"句子缺少时长,按 0 处理: {sentence}")
                    duration = 0
                current_time += duration  # advance by this sentence's duration
                logger.info(f"时间戳: {timestamp}, 写入句子: {sentence}")
        logger.info(f"LRC 文件生成成功: {lrc_filename}")
    except Exception as e:
        logger.error(f"生成 LRC 文件时出错: {str(e)}")
def generate_srt_file(output_filename, text_list):
    """Write an SRT subtitle file with one numbered cue per sentence.

    Cue ``i`` starts at the summed duration of all earlier entries and ends one
    duration later, so cues are contiguous.

    Args:
        output_filename: Target path. A trailing ``.ogg`` extension is swapped for
            ``.srt``; any other name is used as given.
        text_list: Iterable of ``(duration_ms, sentence)`` pairs in playback order.

    Returns:
        None. Failures are logged and not raised (best effort).
    """
    # Swap only a real trailing extension; str.replace would also rewrite ".ogg"
    # occurring elsewhere in the path.
    if output_filename.endswith('.ogg'):
        output_filename = output_filename[:-len('.ogg')] + '.srt'
    srt_filename = os.path.normpath(output_filename)
    try:
        logger.info(f"尝试生成 SRT 文件: {srt_filename}")
        target_dir = os.path.dirname(srt_filename)
        # os.makedirs('') raises, so only create a directory when the path names one.
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        current_time = 0
        with open(srt_filename, 'w', encoding='utf-8') as srt_file:
            for index, (duration, sentence) in enumerate(text_list, start=1):
                if duration is None:
                    # A missing duration must not abort the file half-written.
                    logger.warning(f"句子缺少时长,按 0 处理: {sentence}")
                    duration = 0
                start_time = convert_milliseconds_to_srt_time(current_time)
                end_time = convert_milliseconds_to_srt_time(current_time + duration)
                srt_file.write(f"{index}\n{start_time} --> {end_time}\n{sentence}\n\n")
                current_time += duration
                logger.info(f"写入字幕: {sentence}, 时间范围: {start_time} --> {end_time}")
        logger.info(f"SRT 文件生成成功: {srt_filename}")
    except Exception as e:
        logger.error(f"生成 SRT 文件时出错: {str(e)}")
def generate_merged_filename(base_filename, ext="ogg"):
    """Build the normalised path ``./output/<base_filename>.<ext>`` for the merged file.

    If building or logging the name raises, the error is logged and the path
    ``./output/default_output.<ext>`` is returned instead.
    """
    try:
        candidate = os.path.join('./output', f'{base_filename}.{ext}')
        merged_filename = os.path.normpath(candidate)
        logger.info(f"生成文件名: {merged_filename}")
    except Exception as e:
        logger.error(f"生成文件名时出错: {e}")
        fallback = os.path.join('./output', f'default_output.{ext}')
        return os.path.normpath(fallback)
    else:
        return merged_filename
def send_progress_to_comm(current_part, total_parts, filename, progress_message):
    """POST a merge-progress message to the local feedback endpoint.

    The message is sent as the form field ``feedback`` to
    ``http://127.0.0.1:5000/feedback``. Delivery is best effort: every failure is
    logged and nothing is raised.

    Args:
        current_part: Index of the part just processed.
        total_parts: Total number of parts.
        filename: Name of the file being processed.
        progress_message: Free-text prefix of the feedback message.
    """
    # The file never imports ``requests``, so the original call always failed with a
    # NameError; the standard library is used instead.
    from urllib import error as urllib_error
    from urllib import parse as urllib_parse
    from urllib import request as urllib_request

    feedback = f"{progress_message} - {current_part}/{total_parts}, 文件: {filename}"
    payload = urllib_parse.urlencode({"feedback": feedback}).encode("utf-8")
    try:
        # A timeout keeps an unresponsive endpoint from stalling the merge.
        with urllib_request.urlopen(
            "http://127.0.0.1:5000/feedback", data=payload, timeout=5
        ) as response:
            status_code = response.status
    except urllib_error.HTTPError as e:
        # urllib raises on 4xx/5xx; report it like any other non-200 status.
        status_code = e.code
    except Exception as e:
        logger.error(f"发送进度时发生错误: {e}")
        return
    if status_code == 200:
        logger.info(f"进度发送成功: {feedback}")
    else:
        logger.error(f"进度发送失败,状态码: {status_code}")
def trigger_merge_logic(db_path='./db/data.db', notification=None):
    """Merge all completed sentence clips listed in the database and write subtitles.

    Reads the rows of ``split_sentences`` whose status is ``completed`` (ordered by
    ``original_order``), merges the corresponding clips under ``./temp`` through
    ``merge_files``, and writes ``.lrc`` and ``.srt`` files next to the merged audio.

    Args:
        db_path: Path to the SQLite database.
        notification: Optional label of whatever triggered the merge; only logged.

    Returns:
        ``[]`` when the query yields no rows, otherwise ``None``. Errors are logged
        and not raised.
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            # File-name prefix comes from the submission_info table.
            base_filename = get_filename_prefix_from_submission_info(db_path)
            # Bind the status as a parameter: a double-quoted "completed" is an
            # identifier in standard SQL and only works via a legacy SQLite quirk.
            cursor.execute(
                "SELECT audio_filename, audio_duration, sentence FROM split_sentences "
                "WHERE status = ? ORDER BY original_order",
                ("completed",),
            )
            rows = cursor.fetchall()
        finally:
            # Close the connection even when the query raises.
            conn.close()
        if not rows:
            logger.error("没有从数据库中找到任何记录。")
            return []
        total_sentences = len(rows)
        current_files = []
        text_list = []
        # Collect each clip path together with its (duration, sentence) pair.
        for index, row in enumerate(rows, start=1):
            audio_filename, audio_duration, sentence = row
            if not audio_filename:
                logger.error(f"未找到文件名: {row}")
                continue
            current_files.append(os.path.normpath(os.path.join('./temp', audio_filename)))
            text_list.append((audio_duration, sentence))
            logger.info(f"文件添加到当前集合: {os.path.normpath(audio_filename)}")
            # Progress report.
            progress = (index / total_sentences) * 100
            print(f"合并进展: {index}/{total_sentences} ({progress:.2f}%)")
            logger.info(f"合并进展: {index}/{total_sentences} ({progress:.2f}%)")
        if notification:
            logger.info(f"收到来自 {notification} 的通知,跳转到 D 方案")
        # Merge everything that was collected.
        if current_files:
            merged_filename = generate_merged_filename(base_filename)
            merged_path = merge_files(current_files, merged_filename)
            # Swap only the trailing extension; str.replace would also rewrite
            # ".ogg" occurring inside the base name.
            if merged_filename.endswith('.ogg'):
                stem = merged_filename[:-len('.ogg')]
            else:
                stem = merged_filename
            # Subtitles depend only on the database durations, so they are written
            # whether or not the audio merge succeeded.
            generate_lrc_file(stem + '.lrc', text_list)
            generate_srt_file(stem + '.srt', text_list)
            if merged_path is None:
                # merge_files signals failure with None; do not report success.
                logger.error(f"音频合并失败,未生成最终文件: {merged_filename}")
            else:
                logger.info(f"最终文件已合并为: {merged_filename}")
    except Exception as e:
        logger.error(f"触发合并逻辑时出错: {e}")
def merge_files(audio_files, output_filename):
    """Concatenate audio files in order and export the result as OGG and WAV.

    Args:
        audio_files: Paths of the clips to join, in playback order.
        output_filename: Path of the OGG file to write. The WAV copy is written next
            to it with the extension replaced by ``.wav``.

    Returns:
        The normalised ``output_filename`` on success, ``None`` on failure (the
        error is logged, not raised).
    """
    output_filename = os.path.normpath(output_filename)  # normalise path separators
    logger.info(f"开始合并文件: {audio_files} -> {output_filename}")
    try:
        # Start from an empty segment so no state carries over between calls.
        merged_audio = AudioSegment.empty()
        for file in audio_files:
            clip_path = os.path.normpath(file)
            logger.info(f"正在处理文件: {clip_path}")
            audio_segment = AudioSegment.from_file(clip_path)
            logger.info(f"音频时长: {len(audio_segment)} 毫秒")
            merged_audio += audio_segment
        merged_audio.export(output_filename, format="ogg")
        logger.info(f"文件合并成功: {output_filename}")
        # Derive the WAV name from the extension only. With str.replace a name that
        # does not contain ".ogg" stayed unchanged and the WAV overwrote the OGG.
        if output_filename.endswith('.ogg'):
            wav_output_filename = output_filename[:-len('.ogg')] + '.wav'
        else:
            wav_output_filename = os.path.splitext(output_filename)[0] + '.wav'
        merged_audio.export(wav_output_filename, format="wav")
        logger.info(f"WAV 文件生成成功: {wav_output_filename}")
        return output_filename
    except Exception as e:
        logger.error(f"音频合并失败: {e}")
        return None
def get_filename_prefix_from_submission_info(db_path):
    """Return the output file-name prefix derived from the ``submission_info`` table.

    The first row's ``filename`` is used when it yields a non-empty sanitised name;
    otherwise ``first_slice`` is used, cut to its first 10 characters when it
    contains a CJK character. When neither yields a name, or on any error, the
    default prefix ``【聆听】`` is returned.

    Args:
        db_path: Path to the SQLite database.

    Returns:
        The prefix string; never empty.
    """
    default_prefix = "【聆听】"
    try:
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''SELECT filename, first_slice FROM submission_info LIMIT 1''')
            result = cursor.fetchone()
        finally:
            # Close the connection even when the query raises.
            conn.close()
        prefix = ""
        if result:
            filename, first_slice = result
            if filename:
                prefix = sanitize_filename(filename)
            # Also reached when filename consisted only of illegal characters and
            # sanitised to an empty string.
            if not prefix and first_slice:
                first_slice = sanitize_filename(first_slice)
                # Text containing CJK characters is cut to its first 10 characters.
                if re.search(r'[\u4e00-\u9fff]', first_slice):
                    prefix = first_slice[:10]
                else:
                    prefix = first_slice
        if not prefix:
            # An empty prefix would produce an output file named just ".ogg".
            prefix = default_prefix
        logger.info(f"获取文件名前缀成功: {prefix}")
        return prefix
    except Exception as e:
        logger.error(f"获取文件名前缀信息时出错: {e}")
        return default_prefix
def _run_cli():
    """Parse the command line, run the merge, then wait for Enter before exiting."""
    import argparse

    cli = argparse.ArgumentParser(description="从数据库中生成 .lrc 文件并合并音频文件。")
    cli.add_argument("--db_path", type=str, default='./db/data.db', help="SQLite 数据库的路径。")
    cli.add_argument("--notification", type=str, help="接收到的通知,例如来自时间选择器或其他模块的通知。")
    options = cli.parse_args()
    try:
        trigger_merge_logic(options.db_path, options.notification)
    except Exception as e:
        logger.error(f"程序运行时出错: {e}")
        print(f"程序运行时出错: {e}")
    else:
        print("所有文件生成完成")
    # Keep the console window open until the user confirms.
    input("按回车键退出...")
if __name__ == "__main__":
    _run_cli()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/xinmeitiyingxiao/lingting.git
git@gitee.com:xinmeitiyingxiao/lingting.git
xinmeitiyingxiao
lingting
针对cosyvoice开发的大文本转语音处理工具_听书狂人处理机
master

搜索帮助