新媒体网络营销 (xinmeitiyingxiao) / lingting — 针对cosyvoice开发的大文本转语音处理工具_听书狂人处理机 (a large-text-to-speech processing tool built for CosyVoice, the "audiobook maniac" processor)
https://gitee.com/xinmeitiyingxiao/lingting (branch: master)

merge_audioB.py 21.01 KB
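"""merge_audioB.py — plan-B merger for CosyVoice-generated audiobook clips.

Reads completed sentence records from the split_sentences table of a SQLite
database, concatenates the corresponding per-sentence audio clips with pydub,
exports chapter-level OGG/WAV files (splitting any chapter longer than one
hour at roughly the 45-minute mark), and writes matching .lrc and .srt
subtitle files. Short books (under two hours of audio) or books with two or
fewer chapters are handed off to merge_audioC.py ("plan C") instead.
"""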
import os
import re
import sqlite3
from pydub import AudioSegment
import logging
from logging.handlers import RotatingFileHandler
import time
from datetime import datetime
# Global state for tracking merge progress
merge_status = {
    "current_file": None,
    "current_strategy": None,
    "error_message": None,
    "is_processing": False,
    "progress": 0,
    "completed_files": 0,
    "total_files": 0
}
# Configure logging; log files go to the ./output directory
log_dir = './output'
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

# Build a timestamped log file name
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = os.path.join(log_dir, f'merge_audio_{timestamp}.log')

# Log at INFO level so every progress message is recorded
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Add a rotating file handler so logs are also written to disk
handler = RotatingFileHandler(log_file, maxBytes=10000, backupCount=1, encoding='utf-8')
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
def update_merge_status_in_db(conn):
    """Write the current merge_status dictionary to the merge_status table
    (assumes a single row keyed by task_id = 1, per the WHERE clause below)."""
    cursor = conn.cursor()
    cursor.execute('''
        UPDATE merge_status
        SET current_file = ?,
            current_strategy = ?,
            error_message = ?,
            is_processing = ?,
            progress = ?,
            completed_files = ?,
            total_files = ?
        WHERE task_id = 1
    ''', (
        merge_status["current_file"],
        merge_status["current_strategy"],
        merge_status["error_message"],
        int(merge_status["is_processing"]),
        merge_status["progress"],
        merge_status["completed_files"],
        merge_status["total_files"]
    ))
    conn.commit()
def trigger_merge_logic(db_path='./db/data.db'):
    """Trigger the merge process based on the database contents and track its status.

    NOTE: this definition is shadowed by the full implementation of
    trigger_merge_logic further down in this file, so only the later
    definition is active at runtime.
    """
    global merge_status
    merge_status = {
        "current_file": None,
        "current_strategy": None,
        "error_message": None,
        "is_processing": True,  # processing has started
        "progress": 0,
        "completed_files": 0,
        "total_files": 0
    }
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute('''SELECT audio_filename, chapter, section, audio_duration, sentence, original_order
                      FROM split_sentences WHERE status="completed" ORDER BY original_order''')
    rows = cursor.fetchall()
    conn.close()

    if not rows:
        merge_status["error_message"] = "No records found in the database."
        merge_status["is_processing"] = False
        return []

    merged_filenames = []
    current_files = []
    total_duration = sum(row[3] for row in rows)
    merge_status["total_files"] = len(rows)

    # Record which merge strategy applies
    if total_duration < 2 * 60 * 60 * 1000 or len(rows) <= 2:
        merge_status["current_strategy"] = "C"  # plan C
    else:
        merge_status["current_strategy"] = "B"  # plan B

    for index, row in enumerate(rows):
        audio_filename, chapter, section, audio_duration, sentence, original_order = row
        merge_status["current_file"] = audio_filename              # file currently being processed
        merge_status["completed_files"] = index + 1                # files handled so far
        merge_status["progress"] = (index + 1) / len(rows) * 100   # progress percentage
        if not audio_filename:
            merge_status["error_message"] = f"Missing audio filename: {row}"
            merge_status["is_processing"] = False
            return []
        # Merge handling would go here; more status updates can be added as
        # specific output files are produced.

    merge_status["is_processing"] = False  # processing finished
    return merged_filenames
def get_merge_status():
    """Return the current merge_status dictionary."""
    return merge_status
def sanitize_filename(filename):
    """Strip characters that are not allowed in file names."""
    return re.sub(r'[\\/*?:"<>|]', "_", filename).strip("_")
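# Example: sanitize_filename('a/b:c?.txt') returns 'a_b_c_.txt'.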
def convert_milliseconds_to_lrc_time(ms):
    """Convert milliseconds to an LRC timestamp: [mm:ss.xx], or [hh:mm:ss.xx] when hours are present."""
    seconds = ms // 1000
    minutes = seconds // 60
    hours = minutes // 60
    minutes = minutes % 60
    seconds = seconds % 60
    milliseconds = ms % 1000
    if hours > 0:
        return f"{hours:02}:{minutes:02}:{seconds:02}.{milliseconds//10:02}"
    else:
        return f"{minutes:02}:{seconds:02}.{milliseconds//10:02}"
def convert_milliseconds_to_srt_time(ms):
    """Convert milliseconds to an SRT timestamp: hh:mm:ss,ms."""
    seconds = ms // 1000
    minutes = seconds // 60
    hours = minutes // 60
    minutes = minutes % 60
    seconds = seconds % 60
    milliseconds = ms % 1000
    return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}"
def generate_lrc_file(output_filename, text_list):
    """Generate the corresponding .lrc file with per-sentence timestamps."""
    lrc_filename = os.path.normpath(output_filename)  # normalise the path
    try:
        logger.info(f"Creating output file: {lrc_filename} (0%)")
        # Make sure the target directory exists
        os.makedirs(os.path.dirname(lrc_filename), exist_ok=True)
        current_time = 0
        with open(lrc_filename, 'w', encoding='utf-8') as lrc_file:
            for duration, sentence in text_list:
                timestamp = convert_milliseconds_to_lrc_time(current_time)
                lrc_file.write(f"[{timestamp}]{sentence}\n")
                current_time += duration  # accumulate this sentence's duration
                logger.info(f"Timestamp: {timestamp}, wrote sentence: {sentence}")
        # Also generate the matching SRT file
        generate_srt_file(lrc_filename.replace('.lrc', '.srt'), text_list)
        logger.info(f"{lrc_filename} (100%) written successfully")
    except Exception as e:
        logger.error(f"Error generating LRC file: {str(e)}")
def generate_srt_file(output_filename, text_list):
    """Generate an SRT subtitle file."""
    srt_filename = output_filename
    try:
        logger.info(f"Creating output file: {srt_filename} (0%)")
        os.makedirs(os.path.dirname(srt_filename), exist_ok=True)
        current_time = 0
        with open(srt_filename, 'w', encoding='utf-8') as srt_file:
            for index, (duration, sentence) in enumerate(text_list):
                start_time = convert_milliseconds_to_srt_time(current_time)
                end_time = convert_milliseconds_to_srt_time(current_time + duration)
                srt_file.write(f"{index + 1}\n{start_time} --> {end_time}\n{sentence}\n\n")
                current_time += duration
                logger.info(f"Wrote SRT cue: {sentence}, time range: {start_time} --> {end_time}")
        logger.info(f"{srt_filename} (100%) written successfully")
    except Exception as e:
        logger.error(f"Error generating SRT file: {str(e)}")
def generate_merged_filename(base_filename, chapter=None, section=None, part=None, is_preface=False, ext="ogg"):
    """Build the merged output file name from chapter/section/part, handling the
    preface case and appending a _partX suffix for split chapters."""
    try:
        if is_preface:
            merged_filename = f'{base_filename}_前言.{ext}'
        else:
            if chapter:
                chapter = re.sub(r'第(\d+)章', r'\1', chapter)  # keep only the chapter number
                if section:
                    merged_filename = f'{base_filename}_第{chapter}章_第{section}节.{ext}'
                else:
                    merged_filename = f'{base_filename}_第{chapter}章.{ext}'
            else:
                merged_filename = f'{base_filename}.{ext}'
        if part:
            merged_filename = f'{merged_filename.rsplit(".", 1)[0]}_part{part}.{ext}'
        # Make sure the extension is only appended once
        merged_filename = os.path.normpath(os.path.join('./output', merged_filename))
        if not merged_filename.endswith(f'.{ext}'):
            merged_filename = f'{merged_filename}.{ext}'
        logger.info(f"Generated filename: {merged_filename}")
        return merged_filename
    except Exception as e:
        logger.error(f"Error generating filename: {e}")
        return os.path.normpath(os.path.join('./output', f'default_output.{ext}'))
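# Example (hypothetical base name): generate_merged_filename("mybook", chapter="第3章", section=2, part=1)
# returns a path like output/mybook_第3章_第2节_part1.ogg.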
def merge_files(audio_files, output_filename):
    """Merge multiple audio clips into one file, logging progress, and export
    both OGG and WAV versions."""
    output_filename_ogg = os.path.normpath(output_filename)            # normalise the path
    output_filename_wav = output_filename_ogg.replace('.ogg', '.wav')  # same name with a .wav extension
    logger.info(f"Creating output file: {output_filename_ogg} (0%)")
    try:
        # Start from an empty segment and append each clip in order
        merged_audio = AudioSegment.empty()
        for idx, file in enumerate(audio_files):
            file = os.path.normpath(file)
            logger.info(f"Processing file: {file}")
            audio_segment = AudioSegment.from_file(file)
            logger.info(f"Clip duration: {len(audio_segment)} ms")
            merged_audio += audio_segment
            # Log progress as a percentage
            progress = (idx + 1) / len(audio_files) * 100
            logger.info(f"Creating output file: {output_filename_ogg} ({int(progress)}%)")
        # Export the OGG version
        merged_audio.export(output_filename_ogg, format="ogg")
        logger.info(f"{output_filename_ogg} (100%) written successfully")
        # Export the WAV version
        merged_audio.export(output_filename_wav, format="wav")
        logger.info(f"{output_filename_wav} (100%) written successfully")
        return output_filename_ogg, output_filename_wav
    except Exception as e:
        logger.error(f"Audio merge failed: {e}")
        return None, None
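# NOTE: pydub's AudioSegment.from_file/export rely on an ffmpeg (or libav)
# binary being available on PATH for non-WAV formats such as OGG.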
def split_chapter_if_needed(current_files, text_list, current_duration, base_filename, current_chapter):
    """Split a chapter around the 45-minute mark when it runs longer than one hour."""
    max_duration = 60 * 60 * 1000     # only chapters longer than 60 minutes are split
    target_duration = 45 * 60 * 1000  # aim to cut each part at about 45 minutes
    min_duration = 15 * 60 * 1000     # a leftover of 15 minutes or less is merged into the previous part
    parts = []
    part_files = []
    part_text = []
    part_duration = 0

    # Only split when the chapter exceeds one hour
    if current_duration <= max_duration:
        return [(current_files, text_list)]

    for audio_file, (duration, sentence) in zip(current_files, text_list):
        if part_duration + duration > target_duration:
            # Adding this clip would push the current part past the target length
            if part_duration >= min_duration:
                # The current part is long enough; close it and start a new one
                parts.append((part_files, part_text))
                part_files = []
                part_text = []
                part_duration = 0
        # Add the clip to the current part
        part_duration += duration
        part_files.append(audio_file)
        part_text.append((duration, sentence))

    # Handle whatever is left over
    if part_files:
        if part_duration <= min_duration and parts:
            # A short tail (<= 15 minutes) is appended to the previous part
            last_part_files, last_part_text = parts[-1]
            parts[-1] = (last_part_files + part_files, last_part_text + part_text)
        else:
            parts.append((part_files, part_text))
    return parts
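# Example: with the default thresholds, a 100-minute chapter comes back as two
# parts of roughly 45 and 55 minutes (the ~10-minute tail left after the second
# cut is folded into the preceding part).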
def trigger_merge_logic(db_path='./db/data.db'):
    """Trigger the merge process based on the database contents and log it in detail."""
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        # Get the filename prefix from the submission_info table
        base_filename = get_filename_prefix_from_submission_info(db_path)

        # Fetch all completed audio clip records
        cursor.execute('''SELECT audio_filename, chapter, section, audio_duration, sentence, original_order
                          FROM split_sentences WHERE status="completed" ORDER BY original_order''')
        rows = cursor.fetchall()

        # Check whether chapters exist and fetch the largest original_order value
        cursor.execute("SELECT MAX(original_order) FROM split_sentences")
        max_order = cursor.fetchone()[0]
        conn.close()

        if not rows:
            logger.error("No records found in the database.")
            return []

        merged_filenames = []
        current_files = []
        text_list = []
        current_duration = 0
        max_duration = 55 * 60 * 1000  # 55 minutes
        min_duration = 30 * 60 * 1000  # 30 minutes
        chapter_mode = False
        start_time = time.time()

        total_duration = sum(row[3] for row in rows)
        chapter_count = sum(1 for row in rows if row[1])

        # Plan C: short books or books with at most two chapters are handled by merge_audioC.py
        if total_duration < 2 * 60 * 60 * 1000 or chapter_count <= 2:
            logger.info("Falling back to plan C to process the audio and LRC files...")
            os.system("python merge_audioC.py")
            return []  # skip the rest of the plan-B logic

        # More than two chapters: enable chapter mode
        if chapter_count > 2:
            chapter_mode = True

        current_chapter = None
        preface_files = []
        preface_texts = []
        for index, row in enumerate(rows):
            audio_filename, chapter, section, audio_duration, sentence, original_order = row
            if not audio_filename:
                logger.error(f"Missing audio filename: {row}")
                continue

            # Handle the preface (content that appears before the first chapter)
            if current_chapter is None and chapter and preface_files:
                merged_filename = generate_merged_filename(base_filename, is_preface=True)
                output_ogg, output_wav = merge_files(preface_files, merged_filename)
                generate_lrc_file(output_ogg.replace('.ogg', '.lrc'), preface_texts)
                merged_filenames.append(merged_filename)
                logger.info(f"Preface merged into: {merged_filename}")
                preface_files = []
                preface_texts = []

            # Detect the start of a new chapter
            if chapter_mode:
                if current_chapter is not None:
                    # Chapter with sections
                    if section is not None and section == 0 and chapter != current_chapter:
                        if current_files:
                            merged_filename = generate_merged_filename(base_filename, chapter=current_chapter)
                            output_ogg, output_wav = merge_files(current_files, merged_filename)
                            generate_lrc_file(output_ogg.replace('.ogg', '.lrc'), text_list)
                            merged_filenames.append(merged_filename)
                            logger.info(f"Files merged into: {merged_filename}")
                            current_files = []
                            text_list = []
                            current_duration = 0
                    # Chapter without sections
                    elif section is None and chapter != current_chapter:
                        if current_files:
                            merged_filename = generate_merged_filename(base_filename, chapter=current_chapter)
                            output_ogg, output_wav = merge_files(current_files, merged_filename)
                            generate_lrc_file(output_ogg.replace('.ogg', '.lrc'), text_list)
                            merged_filenames.append(merged_filename)
                            logger.info(f"Files merged into: {merged_filename}")
                            current_files = []
                            text_list = []
                            current_duration = 0

            # Accumulate the current clip's duration and file path
            current_files.append(os.path.normpath(os.path.join('./temp', audio_filename)))
            text_list.append((audio_duration, sentence))
            current_duration += audio_duration
            logger.info(f"Added file to current batch: {os.path.normpath(audio_filename)}")

            # Update the current chapter
            current_chapter = chapter

            # Split the chapter if it has grown past one hour
            if current_duration > 60 * 60 * 1000:
                parts = split_chapter_if_needed(current_files, text_list, current_duration, base_filename, current_chapter)
                for i, (part_files, part_text) in enumerate(parts):
                    merged_filename = generate_merged_filename(base_filename, chapter=current_chapter, part=i+1)
                    output_ogg, output_wav = merge_files(part_files, merged_filename)
                    generate_lrc_file(output_ogg.replace('.ogg', '.lrc'), part_text)
                    merged_filenames.append(merged_filename)
                    logger.info(f"Chapter split and merged into: {merged_filename}")
                current_files = []
                text_list = []
                current_duration = 0

            # If this is the last row, or the next row starts a different chapter, merge this chapter
            if index == len(rows) - 1 or (chapter_mode and chapter != rows[index + 1][1]):
                if current_files:  # may be empty if the chapter was just split above
                    merged_filename = generate_merged_filename(base_filename, chapter=current_chapter)
                    output_ogg, output_wav = merge_files(current_files, merged_filename)
                    generate_lrc_file(output_ogg.replace('.ogg', '.lrc'), text_list)
                    merged_filenames.append(merged_filename)
                    logger.info(f"Final files merged into: {merged_filename}")
                    current_files = []
                    text_list = []
                    current_duration = 0

        # Merge whatever remains after the loop
        if current_files:
            if chapter_mode and current_chapter:
                merged_filename = generate_merged_filename(base_filename, chapter=current_chapter)
            else:
                merged_filename = generate_merged_filename(base_filename)
            output_ogg, output_wav = merge_files(current_files, merged_filename)
            generate_lrc_file(output_ogg.replace('.ogg', '.lrc'), text_list)
            merged_filenames.append(merged_filename)
            logger.info(f"Final files merged into: {merged_filename}")

        # Log every generated file
        logger.info(f"Generated files: {merged_filenames}")
        end_time = time.time()
        total_time = end_time - start_time
        logger.info(f"Total merge time: {total_time:.2f} seconds")
        return merged_filenames
    except Exception as e:
        logger.error(f"Error while running merge logic: {e}")
        return []
def get_filename_prefix_from_submission_info(db_path):
    """Fetch the filename prefix from the submission_info table."""
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        cursor.execute('''SELECT filename, first_slice FROM submission_info LIMIT 1''')
        result = cursor.fetchone()
        conn.close()
        if result:
            filename, first_slice = result
            if filename:
                prefix = sanitize_filename(filename)
            elif first_slice:
                first_slice = sanitize_filename(first_slice)
                # If the first slice contains Chinese characters,
                # keep only its first 10 characters as the prefix
                if re.search(r'[\u4e00-\u9fff]', first_slice):
                    prefix = first_slice[:10]
                else:
                    prefix = first_slice
            else:
                prefix = "【聆听】"
        else:
            prefix = "【聆听】"
        logger.info(f"Filename prefix resolved: {prefix}")
        return prefix
    except Exception as e:
        logger.error(f"Error fetching the filename prefix: {e}")
        return "【聆听】"
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Merge audio files and generate .lrc files from the database.")
    parser.add_argument("--db_path", type=str, default='./db/data.db', help="Path to the SQLite database.")
    args = parser.parse_args()
    try:
        trigger_merge_logic(args.db_path)
        print("All files generated.")
    except Exception as e:
        logger.error(f"Runtime error: {e}")
        print(f"Runtime error: {e}")
    # Keep the console window open until the user presses Enter
    input("Press Enter to exit...")