代码拉取完成,页面将自动刷新
from flask import Flask, request, render_template, jsonify, send_file
from pymongo import MongoClient
import yt_dlp
import os
import json
import threading
from datetime import datetime
import humanize
import time
# Initialize Flask app
app = Flask(__name__)
# MongoDB connection
try:
client = MongoClient('mongodb://localhost:2641/')
db = client['youtube_downloader']
videos_collection = db['videos']
except Exception as e:
print(f"MongoDB connection error: {e}")
videos_collection = None
# 确保下载目录存在
DOWNLOAD_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'downloads')
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
class VideoDownloader:
def __init__(self):
self.current_downloads = 0
self.max_concurrent = 2 # 最大并发下载数
self.progress = {}
self.download_queue = []
def process_queue(self):
while self.download_queue and self.current_downloads < self.max_concurrent:
url, video_id = self.download_queue.pop(0)
self.current_downloads += 1
thread = threading.Thread(
target=self._download_video,
args=(url, video_id)
)
thread.start()
def queue_download(self, url):
video_id = str(datetime.now().timestamp())
self.progress[video_id] = {'status': 'queued'}
self.download_queue.append((url, video_id))
self.process_queue()
return video_id
def _download_video(self, url, video_id):
try:
def progress_hook(d):
if d['status'] == 'downloading':
total_bytes = d.get('total_bytes', 0)
downloaded_bytes = d.get('downloaded_bytes', 0)
speed = d.get('speed', 0)
self.progress[video_id] = {
'status': 'downloading',
'downloaded_bytes': downloaded_bytes,
'total_bytes': total_bytes,
'speed': speed,
'percentage': (downloaded_bytes / total_bytes * 100) if total_bytes else 0,
'file_size': humanize.naturalsize(total_bytes) if total_bytes else 'Unknown',
'download_speed': humanize.naturalsize(speed) + '/s' if speed else 'Unknown'
}
elif d['status'] == 'finished':
self.progress[video_id]['status'] = 'processing'
# 添加cookies文件路径
COOKIES_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'youtube.cookies')
ydl_opts = {
'format': 'bestvideo[ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/best[ext=mp4]/best', # 指定兼容的编码格式
'outtmpl': os.path.join(DOWNLOAD_DIR, '%(title)s.%(ext)s'),
'progress_hooks': [progress_hook],
'no_warnings': True,
'quiet': True,
'extract_flat': False,
'no_check_certificates': True,
'ignoreerrors': True,
'nocheckcertificate': True,
'prefer_insecure': True,
'geo_bypass': True,
'geo_bypass_country': 'US',
'retries': 10,
'fragment_retries': 10,
'skip_unavailable_fragments': True,
'force_generic_extractor': False,
'merge_output_format': 'mp4',
'http_chunk_size': 10485760,
'cookiefile': COOKIES_FILE,
'keepvideo': False, # 合并后删除原始文件
'writethumbnail': True, # 下载缩略图
'headers': {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-us,en;q=0.5',
'Sec-Fetch-Mode': 'navigate'
}
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try:
# 先获取视频信息
info_dict = ydl.extract_info(url, download=False)
if info_dict is None:
raise Exception("Could not extract video information")
# 确保是单个视频而不是播放列表
if 'entries' in info_dict:
info = info_dict['entries'][0]
else:
info = info_dict
# 预先检查文件是否存在
filename = ydl.prepare_filename(info)
# 确保使用绝对路径
if not os.path.isabs(filename):
filename = os.path.join(DOWNLOAD_DIR, filename)
base, ext = os.path.splitext(filename)
existing_file = None
# 检查所有可能的扩展名
for possible_ext in [ext, '.mp4', '.webm', '.mkv']:
possible_file = base + possible_ext
if os.path.exists(possible_file):
print(f"Found existing file: {possible_file}") # 调试信息
existing_file = possible_file
break
if existing_file:
# 文件已存在,直接使用
filename = existing_file
print(f"Using existing file: {filename}")
# 获取文件大小
file_size = os.path.getsize(filename)
# 首先更新为下载中状态
self.progress[video_id] = {
'status': 'downloading',
'downloaded_bytes': file_size,
'total_bytes': file_size,
'speed': 0,
'percentage': 100,
'file_size': humanize.naturalsize(file_size),
'download_speed': '0 MB/s'
}
# 短暂延迟以确保前端能看到状态变化
time.sleep(0.5)
# 更新为处理中状态
self.progress[video_id]['status'] = 'processing'
time.sleep(0.5)
# 构建完整的video_data
video_data = {
'_id': video_id,
'title': info.get('title', 'Unknown Title'),
'duration': info.get('duration', 0),
'uploader': info.get('uploader', 'Unknown Uploader'),
'description': info.get('description', ''),
'filename': os.path.basename(filename),
'file_path': filename,
'file_size': humanize.naturalsize(file_size),
'download_date': datetime.now(),
'thumbnail': info.get('thumbnail', ''),
'webpage_url': info.get('webpage_url', url)
}
# 更数据库
if videos_collection is not None:
videos_collection.update_one(
{'_id': video_id},
{'$set': video_data},
upsert=True
)
# 最后更新为完成状态
self.progress[video_id] = {
'status': 'completed',
'video_info': video_data,
'downloaded_bytes': file_size,
'total_bytes': file_size,
'percentage': 100,
'file_size': humanize.naturalsize(file_size)
}
else:
# 文件不存在,进行下载
print(f"No existing file found, downloading: {url}") # 调试信息
info = ydl.extract_info(url, download=True)
filename = ydl.prepare_filename(info)
if not os.path.isabs(filename):
filename = os.path.join(DOWNLOAD_DIR, filename)
# 再次检查文件否存在(以防扩展名变化)
if not os.path.exists(filename):
base, _ = os.path.splitext(filename)
for ext in ['.mp4', '.webm', '.mkv']:
alt_filename = base + ext
if os.path.exists(alt_filename):
filename = alt_filename
break
if not os.path.exists(filename):
raise Exception("Downloaded file not found")
# Get file size
file_size = os.path.getsize(filename)
video_data = {
'_id': video_id,
'title': info.get('title', 'Unknown Title'),
'duration': info.get('duration', 0),
'uploader': info.get('uploader', 'Unknown Uploader'),
'description': info.get('description', ''),
'filename': os.path.basename(filename),
'file_path': filename,
'file_size': humanize.naturalsize(file_size),
'download_date': datetime.now(),
'thumbnail': info.get('thumbnail', ''),
'webpage_url': info.get('webpage_url', url)
}
if videos_collection is not None:
videos_collection.update_one(
{'_id': video_id},
{'$set': video_data},
upsert=True
)
self.progress[video_id] = {
'status': 'completed',
'video_info': video_data
}
except Exception as e:
raise Exception(f"Download failed: {str(e)}")
except Exception as e:
self.progress[video_id] = {
'status': 'error',
'error': str(e)
}
finally:
self.current_downloads -= 1
self.process_queue()
downloader = VideoDownloader()
@app.route('/')
def index():
videos = []
if videos_collection is not None:
videos = list(videos_collection.find().sort('download_date', -1))
return render_template('index.html', videos=videos)
@app.route('/download', methods=['POST'])
def download():
url = request.json.get('url')
if not url:
return jsonify({'error': 'No URL provided'}), 400
video_id = downloader.queue_download(url)
return jsonify({'video_id': video_id})
@app.route('/progress/<video_id>')
def get_progress(video_id):
return jsonify(downloader.progress.get(video_id, {'status': 'not_found'}))
@app.route('/video/<path:filename>')
def serve_video(filename):
return send_file(
os.path.join(DOWNLOAD_DIR, filename),
mimetype='video/mp4'
)
if __name__ == '__main__':
app.run(debug=True)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。