3 Star 5 Fork 2

新媒体网络营销/针对cosyvoice开发的大文本转语音处理工具_听书狂人处理机

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
merge_audioA.py 12.13 KB
一键复制 编辑 原始数据 按行查看 历史
import os
import re
import sqlite3
from pydub import AudioSegment
import logging
from logging.handlers import RotatingFileHandler
import time
# 设置日志记录,将日志文件放在 ./output 目录下
log_dir = './output'
if not os.path.exists(log_dir):
os.makedirs(log_dir)
log_file = os.path.join(log_dir, 'merge_audio.log')
# 设置日志级别为 INFO,以确保所有日志都被记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 配置日志处理器,确保日志写入文件
handler = RotatingFileHandler(log_file, maxBytes=10000, backupCount=1, encoding='utf-8')
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
def sanitize_filename(filename):
"""清理文件名,使其适应文件系统要求。"""
return re.sub(r'[\\/*?:"<>|]', "_", filename).strip("_")
def convert_milliseconds_to_lrc_time(ms):
"""将毫秒转换为LRC格式的时间戳 [hh:mm:ss.xx]"""
seconds = ms // 1000
minutes = seconds // 60
hours = minutes // 60
minutes = minutes % 60
seconds = seconds % 60
milliseconds = ms % 1000
if hours > 0:
return f"{hours:02}:{minutes:02}:{seconds:02}.{milliseconds//10:02}"
else:
return f"{minutes:02}:{seconds:02}.{milliseconds//10:02}"
def calculate_average_section_duration(db_path):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 计算全文的总时长和节的总数
cursor.execute("SELECT SUM(audio_duration) FROM split_sentences WHERE status = 'completed'")
total_duration = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(DISTINCT section) FROM split_sentences WHERE status = 'completed'")
total_sections = cursor.fetchone()[0]
conn.close()
# 如果节的总数为0,避免除以0的错误
if total_sections == 0:
return None
# 计算平均节的时间长度
average_section_duration = total_duration / total_sections
return average_section_duration
# 计算平均节的时间长度
avg_section_duration = calculate_average_section_duration(db_path)
# 如果平均节的时间长度不大于1分钟,跳转到merge_audioB.py
if avg_section_duration is not None and avg_section_duration <= 1 * 60 * 1000:
print("跳转到merge_audioB.py处理")
os.system("python merge_audioB.py")
else:
print("执行merge_audioA.py逻辑")
merge_audio_A_logic()
def generate_lrc_file(output_filename, text_list):
"""生成对应的 .lrc 文件,包含时间戳"""
lrc_filename = os.path.normpath(output_filename) # 确保路径格式统一
try:
logger.info(f"尝试生成 LRC 文件: {lrc_filename}")
# 检查并创建目录
os.makedirs(os.path.dirname(lrc_filename), exist_ok=True)
current_time = 0
with open(lrc_filename, 'w', encoding='utf-8') as lrc_file:
for duration, sentence in text_list:
timestamp = convert_milliseconds_to_lrc_time(current_time)
lrc_file.write(f"[{timestamp}]{sentence}\n")
current_time += duration # 累加当前句子的时长
logger.info(f"时间戳: {timestamp}, 写入句子: {sentence}")
logger.info(f"LRC 文件生成成功: {lrc_filename}")
except Exception as e:
logger.error(f"生成 LRC 文件时出错: {str(e)}")
def generate_merged_filename(base_filename, chapter=None, section=None, part=None, is_preface=False, ext="ogg"):
"""根据章节、节和部分生成合并后的文件名,处理前言部分。"""
try:
if is_preface:
merged_filename = f'{base_filename}_前言.{ext}'
else:
if chapter:
chapter = re.sub(r'第(\d+)章', r'\1', chapter)
if section:
section = re.sub(r'第(\d+)节', r'\1', section)
merged_filename = f'{base_filename}_第{chapter}章_第{section}节.{ext}'
else:
merged_filename = f'{base_filename}_第{chapter}章.{ext}'
else:
merged_filename = f'{base_filename}.{ext}'
if part:
merged_filename = f'{merged_filename.rsplit(".", 1)[0]}_{part}.{ext}'
merged_filename = os.path.normpath(os.path.join('./output', merged_filename))
logger.info(f"生成文件名: {merged_filename}")
return merged_filename
except Exception as e:
logger.error(f"生成文件名时出错: {e}")
return os.path.normpath(os.path.join('./output', f'default_output.{ext}'))
def calculate_average_section_duration(db_path):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 计算全文的总时长和节的总数
cursor.execute("SELECT SUM(audio_duration) FROM split_sentences WHERE status = 'completed'")
total_duration = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(DISTINCT section) FROM split_sentences WHERE status = 'completed'")
total_sections = cursor.fetchone()[0]
conn.close()
# 如果节的总数为0,避免除以0的错误
if total_sections == 0:
return None
# 计算平均节的时间长度
average_section_duration = total_duration / total_sections
return average_section_duration
def trigger_merge_logic(db_path='./db/data.db'):
"""基于数据库中的信息触发合并过程,并详细记录合并过程。"""
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 从submission_info表获取文件名前缀信息
base_filename = get_filename_prefix_from_submission_info(db_path)
# 获取所有已处理的音频文件信息
cursor.execute('''SELECT audio_filename, chapter, section, audio_duration, sentence, original_order
FROM split_sentences WHERE status="completed" ORDER BY chapter, section, original_order''')
rows = cursor.fetchall()
conn.close()
if not rows:
logger.error("没有从数据库中找到任何记录。")
return []
merged_filenames = []
current_files = []
text_list = []
current_duration = 0
max_duration = 55 * 60 * 1000 # 55分钟
chapter_mode = False
current_chapter = None
current_section = None
for index, row in enumerate(rows):
audio_filename, chapter, section, audio_duration, sentence, original_order = row
if not audio_filename:
logger.error(f"未找到文件名: {row}")
continue
# 处理新节的逻辑
if section != current_section and current_files:
merged_filename = generate_merged_filename(base_filename, chapter=current_chapter, section=current_section)
merge_files(current_files, merged_filename)
generate_lrc_file(merged_filename.replace('.ogg', '.lrc'), text_list)
merged_filenames.append(merged_filename)
logger.info(f"文件已合并为: {merged_filename}")
current_files = []
text_list = []
current_duration = 0
# 累加当前段落的时长和文件
current_files.append(os.path.normpath(os.path.join('./temp', audio_filename)))
text_list.append((audio_duration, sentence))
current_duration += audio_duration
logger.info(f"文件添加到当前集合: {os.path.normpath(audio_filename)}")
# 更新当前章节和节
current_chapter = chapter
current_section = section
# 如果是最后一行或下一行的节不同,合并当前节
if index == len(rows) - 1 or (section != rows[index + 1][2]):
merged_filename = generate_merged_filename(base_filename, chapter=current_chapter, section=current_section)
merge_files(current_files, merged_filename)
generate_lrc_file(merged_filename.replace('.ogg', '.lrc'), text_list)
merged_filenames.append(merged_filename)
logger.info(f"文件已合并为: {merged_filename}")
current_files = []
text_list = []
current_duration = 0
# 合并最后的文件段落
if current_files:
merged_filename = generate_merged_filename(base_filename, chapter=current_chapter, section=current_section)
merge_files(current_files, merged_filename)
generate_lrc_file(merged_filename.replace('.ogg', '.lrc'), text_list)
merged_filenames.append(merged_filename)
logger.info(f"最终文件已合并为: {merged_filename}")
# 记录所有生成的文件
logger.info(f"生成的文件列表: {merged_filenames}")
return merged_filenames
except Exception as e:
logger.error(f"触发合并逻辑时出错: {e}")
return []
def merge_files(audio_files, output_filename):
"""将多个音频文件合并为一个,并记录合并过程。"""
output_filename = os.path.normpath(output_filename) # 确保路径格式统一
logger.info(f"开始合并文件: {audio_files}{output_filename}")
try:
# 这里初始化音频合并所需的变量,确保没有遗留的状态
merged_audio = AudioSegment.empty()
for file in audio_files:
file = os.path.normpath(file) # 确保路径格式统一
logger.info(f"正在处理文件: {file}")
audio_segment = AudioSegment.from_file(file)
logger.info(f"音频时长: {len(audio_segment)} 毫秒")
merged_audio += audio_segment
# 导出音频文件,检查是否成功
merged_audio.export(output_filename, format="ogg")
logger.info(f"文件合并成功: {output_filename}")
return output_filename
except Exception as e:
logger.error(f"音频合并失败: {e}")
return None
def get_filename_prefix_from_submission_info(db_path):
"""从submission_info表获取文件名前缀信息"""
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('''SELECT filename, first_slice FROM submission_info LIMIT 1''')
result = cursor.fetchone()
conn.close()
if result:
filename, first_slice = result
if filename:
prefix = sanitize_filename(filename)
elif first_slice:
first_slice = sanitize_filename(first_slice)
# 检查是否包含中文
if re.search(r'[\u4e00-\u9fff]', first_slice):
# 截取前10个字符(只保留中文开头的前10个字符)
prefix = first_slice[:10]
else:
prefix = first_slice
else:
prefix = "【聆听】"
else:
prefix = "【聆听】"
logger.info(f"获取文件名前缀成功: {prefix}")
return prefix
except Exception as e:
logger.error(f"获取文件名前缀信息时出错: {e}")
return "【聆听】"
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="从数据库中生成 .lrc 文件并合并音频文件。")
parser.add_argument("--db_path", type=str, default='./db/data.db', help="SQLite 数据库的路径。")
args = parser.parse_args()
try:
trigger_merge_logic(args.db_path)
print("所有文件生成完成")
except Exception as e:
logger.error(f"程序运行时出错: {e}")
print(f"程序运行时出错: {e}")
# 防止程序自动退出,等待用户输入
input("按回车键退出...")
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/xinmeitiyingxiao/lingting.git
git@gitee.com:xinmeitiyingxiao/lingting.git
xinmeitiyingxiao
lingting
针对cosyvoice开发的大文本转语音处理工具_听书狂人处理机
master

搜索帮助