Files
xdl/backend/app/services/downloader.py
mini 7d71ba2986
All checks were successful
continuous-integration/drone/push Build is passing
feat: support magnet/torrent download via aria2c
2026-03-17 20:29:18 +08:00

880 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""yt-dlp wrapper service for video downloading."""
import os
import re
import uuid
import json
import asyncio
import logging
import threading
import subprocess
import urllib.request
from pathlib import Path
from typing import Optional
import yt_dlp
logger = logging.getLogger(__name__)
# ── In-memory progress / cancel store (thread-safe via GIL) ─────────────────
_download_progress: dict[str, int] = {}         # task_id → 0-100
_cancel_flags: dict[str, threading.Event] = {}  # task_id → Event


def register_task(task_id: str):
    """Create a cancel flag and a zeroed progress entry for *task_id*."""
    _cancel_flags[task_id] = threading.Event()
    _download_progress[task_id] = 0


def get_progress(task_id: str) -> int:
    """Return current progress (0-100) for *task_id*; 0 when unknown."""
    return _download_progress.get(task_id, 0)


def request_cancel(task_id: str):
    """Ask the worker running *task_id* to abort; no-op for unknown ids."""
    flag = _cancel_flags.get(task_id)
    if flag is not None:
        flag.set()


def cleanup_task(task_id: str):
    """Drop all bookkeeping for *task_id* (safe to call repeatedly)."""
    _cancel_flags.pop(task_id, None)
    _download_progress.pop(task_id, None)
def _make_hook(task_id: str):
    """yt-dlp progress hook: handles DASH multi-phase + HLS fragments + cancel.

    Returns a closure suitable for yt-dlp's ``progress_hooks`` option.
    Progress is written into the module-level ``_download_progress`` store;
    a pending cancel request (``_cancel_flags``) aborts the download by
    raising ``DownloadCancelled`` from inside the hook.
    """
    state = {"phase": 0} # counts "finished" events (video phase, audio phase…)
    PHASE_WEIGHTS = [0.80, 0.19] # phase-0 → 0-80%, phase-1 → 80-99%
    def hook(d):
        # Cooperative cancellation: yt-dlp propagates this exception outward.
        flag = _cancel_flags.get(task_id)
        if flag and flag.is_set():
            raise yt_dlp.utils.DownloadCancelled("Cancelled by user")
        if d["status"] == "downloading":
            total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
            done = d.get("downloaded_bytes", 0)
            if total > 0:
                phase_pct = done / total # 0.0–1.0 within the current phase
            else:
                # HLS / unknown size: use fragment index
                fc = d.get("fragment_count") or 0
                fi = d.get("fragment_index") or 0
                phase_pct = (fi / fc) if fc > 0 else 0.5 # 0.5 = "working"
            # Map the per-phase fraction into that phase's slice of 0-100.
            ph = min(state["phase"], len(PHASE_WEIGHTS) - 1)
            base = sum(PHASE_WEIGHTS[:ph]) * 100
            span = PHASE_WEIGHTS[ph] * 100
            pct = int(base + phase_pct * span)
            _download_progress[task_id] = max(1, pct) # at least 1 to show activity
        elif d["status"] == "finished":
            state["phase"] += 1
            done_pct = int(sum(PHASE_WEIGHTS[:state["phase"]]) * 100)
            # Cap at 99: callers report 100 only once post-processing is done.
            _download_progress[task_id] = min(done_pct, 99)
        # Ensure at least 1% progress so UI shows activity
        if _download_progress.get(task_id, 0) == 0:
            _download_progress[task_id] = 1
    return hook
# Root directory for all saved videos; overridable via env for deployments.
VIDEO_BASE_PATH = os.getenv("VIDEO_BASE_PATH", "/home/xdl/xdl_videos")
# Per-platform subdirectories under the base path.
X_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "x_videos")
YOUTUBE_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "youtube_videos")
PH_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "ph_videos")
HLS_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "hls_videos")
TORRENT_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "torrent_videos")
# Ensure directories exist (import-time side effect: creates the tree on first import)
os.makedirs(X_VIDEOS_PATH, exist_ok=True)
os.makedirs(YOUTUBE_VIDEOS_PATH, exist_ok=True)
os.makedirs(PH_VIDEOS_PATH, exist_ok=True)
os.makedirs(HLS_VIDEOS_PATH, exist_ok=True)
os.makedirs(TORRENT_VIDEOS_PATH, exist_ok=True)
# Pattern to match YouTube URLs (watch / shorts / embed / v pages and youtu.be)
YOUTUBE_URL_RE = re.compile(
    r'https?://(?:(?:www\.|m\.)?youtube\.com/(?:watch\?.*v=|shorts/|embed/|v/)|youtu\.be/)[\w-]+'
)
# Pattern to match Twitter/X URLs and extract tweet ID (group 1 = numeric status id)
TWITTER_URL_RE = re.compile(
    r'https?://(?:(?:www\.)?(?:twitter\.com|x\.com)|[a-z]*twitter\.com)/\w+/status/(\d+)'
)
# Pattern to match Pornhub URLs (view pages, /video/, embeds, and phub.to short links)
PORNHUB_URL_RE = re.compile(
    r'https?://(?:[\w-]+\.)?pornhub\.com/(?:view_video\.php\?viewkey=|video/|embed/)[\w-]+'
    r'|https?://phub\.to/[\w-]+'
)
# Pattern to match HLS / m3u8 URLs (direct stream links)
HLS_URL_RE = re.compile(
    r'https?://[^\s]+\.m3u8(?:[?#][^\s]*)?',
    re.IGNORECASE,
)
# Pattern to match Magnet links
MAGNET_RE = re.compile(r'^magnet:\?xt=urn:[a-z0-9]+:[a-zA-Z0-9]+', re.IGNORECASE)
# Pattern to match .torrent file URLs
TORRENT_URL_RE = re.compile(r'https?://[^\s]+\.torrent(?:[?#][^\s]*)?', re.IGNORECASE)
def get_video_path(filename: str, platform: str = "twitter") -> str:
    """Absolute path for *filename* inside the platform's storage directory.

    Unrecognized platforms (including the default "twitter") resolve to the
    X/Twitter directory.
    """
    platform_dirs = {
        "youtube": YOUTUBE_VIDEOS_PATH,
        "pornhub": PH_VIDEOS_PATH,
        "hls": HLS_VIDEOS_PATH,
        "torrent": TORRENT_VIDEOS_PATH,
    }
    return os.path.join(platform_dirs.get(platform, X_VIDEOS_PATH), filename)
def _is_youtube_url(url: str) -> bool:
    """True when *url* looks like a YouTube watch/shorts/embed link."""
    return YOUTUBE_URL_RE.match(url) is not None
def _is_pornhub_url(url: str) -> bool:
    """True when *url* looks like a Pornhub video page or phub.to short link."""
    return PORNHUB_URL_RE.match(url) is not None
def detect_platform(url: str) -> str:
    """Detect platform from URL.

    Checks are ordered; the first matching predicate wins.  Returns
    "unknown" when no known platform matches.
    """
    checks = (
        (_is_twitter_url, "twitter"),
        (_is_youtube_url, "youtube"),
        (_is_pornhub_url, "pornhub"),
        (_is_hls_url, "hls"),
        (_is_torrent, "torrent"),
    )
    for predicate, name in checks:
        if predicate(url):
            return name
    return "unknown"
def _is_twitter_url(url: str) -> bool:
    """True when *url* is a Twitter/X status link."""
    return TWITTER_URL_RE.match(url) is not None
def _is_hls_url(url: str) -> bool:
    """True when *url* is a direct .m3u8 (HLS) stream link."""
    return HLS_URL_RE.match(url) is not None
def _is_torrent(url: str) -> bool:
    """True for magnet: URIs or direct .torrent file URLs."""
    return any(pattern.match(url) for pattern in (MAGNET_RE, TORRENT_URL_RE))
def _extract_tweet_id(url: str) -> Optional[str]:
    """Return the numeric status id from a Twitter/X URL, or None."""
    match = TWITTER_URL_RE.match(url)
    if match is None:
        return None
    return match.group(1)
def _twitter_syndication_info(tweet_id: str) -> dict:
    """Fetch tweet info via Twitter's syndication API (no auth required).

    Args:
        tweet_id: numeric status id as a string.

    Returns:
        The decoded JSON payload from the syndication endpoint.

    Raises:
        ValueError: when the tweet is a "TweetTombstone" (restricted /
            age-gated content the anonymous API will not serve).
        urllib.error.URLError: on network failure (propagated from urllib).
    """
    api_url = f'https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}&token=x'
    req = urllib.request.Request(api_url, headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    })
    # FIX: close the HTTP response deterministically; the original leaked it.
    with urllib.request.urlopen(req, timeout=15) as resp:
        data = json.loads(resp.read().decode())
    # Check for restricted content (TweetTombstone)
    if data.get('__typename') == 'TweetTombstone':
        raise ValueError('内容受限不支持下载(敏感内容/年龄限制),需要登录账号访问')
    return data
def _parse_twitter_video(url: str) -> dict:
    """Parse Twitter video using syndication API.

    Returns title/thumbnail/duration plus a sanitized formats list; the
    full variant info (with private ``_url``/``_bitrate`` keys) is kept
    under ``_formats_full`` for the download path.
    """
    tweet_id = _extract_tweet_id(url)
    if not tweet_id:
        raise ValueError(f"Could not extract tweet ID from URL: {url}")
    data = _twitter_syndication_info(tweet_id)
    # Truncate title to the first line, capped at 100 chars.
    title = data.get('text', 'Untitled').split('\n')[0][:100]
    thumbnail = ''
    duration = 0
    formats = []
    for media in data.get('mediaDetails', []):
        if media.get('type') != 'video':
            continue
        thumbnail = media.get('media_url_https', '')
        video_info = media.get('video_info', {})
        duration = (video_info.get('duration_millis', 0) or 0) // 1000
        for idx, variant in enumerate(video_info.get('variants', [])):
            if variant.get('content_type', '') == 'application/x-mpegURL':
                continue  # Skip HLS
            bitrate = variant.get('bitrate', 0)
            vid_url = variant.get('url', '')
            # Resolution is encoded in the URL path as /<width>x<height>/
            dims = re.search(r'/(\d+)x(\d+)/', vid_url)
            height = int(dims.group(2)) if dims else 0
            quality = f"{height}p" if height else f"{bitrate // 1000}k"
            formats.append({
                "format_id": f"tw-{idx}",
                "quality": quality,
                "ext": "mp4",
                "filesize": 0,
                "note": f"{bitrate // 1000}kbps" if bitrate else "",
                "_url": vid_url,
                "_bitrate": bitrate,
            })
    # Highest bitrate first, then prepend the synthetic "best" entry.
    formats.sort(key=lambda fmt: fmt.get('_bitrate', 0), reverse=True)
    formats.insert(0, {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    })
    public_formats = [
        {k: v for k, v in fmt.items() if not k.startswith('_')}
        for fmt in formats
    ]
    return {
        "title": title,
        "thumbnail": thumbnail,
        "duration": duration,
        "formats": public_formats,
        "url": url,
        "_formats_full": formats,  # Keep full info for download
    }
def _download_twitter_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict:
    """Download Twitter video using syndication API.

    Args:
        url: Twitter/X status URL.
        format_id: "best" or a specific "tw-<n>" variant id.
        progress_callback: optional callable receiving percent ints.
        task_id: key for the module progress/cancel store (optional).

    Returns:
        dict with title/thumbnail/duration and the saved file's metadata.

    Raises:
        ValueError: when no tweet id / no video variant is found.
        yt_dlp.utils.DownloadCancelled: when the task is cancelled.
    """
    tweet_id = _extract_tweet_id(url)
    if not tweet_id:
        raise ValueError(f"Could not extract tweet ID from URL: {url}")
    data = _twitter_syndication_info(tweet_id)
    title = data.get('text', 'Untitled').split('\n')[0][:100]
    thumbnail = ''
    duration = 0
    best_url = None
    best_bitrate = 0
    for media in data.get('mediaDetails', []):
        if media.get('type') != 'video':
            continue
        thumbnail = media.get('media_url_https', '')
        video_info = media.get('video_info', {})
        duration = (video_info.get('duration_millis', 0) or 0) // 1000
        for i, variant in enumerate(video_info.get('variants', [])):
            if variant.get('content_type') == 'application/x-mpegURL':
                continue
            vid_url = variant.get('url', '')
            bitrate = variant.get('bitrate', 0)
            if format_id == "best" or format_id == f"tw-{i}":
                if format_id != "best" or bitrate > best_bitrate:
                    best_url = vid_url
                    best_bitrate = bitrate
                if format_id != "best":
                    break
    if not best_url:
        raise ValueError("No video found in tweet")
    # BUG FIX: the original overwrote the caller-supplied task_id with a fresh
    # uuid, so progress updates and cancel checks keyed off an id nobody
    # polls.  Use a separate uid for filename uniqueness only.
    uid = str(uuid.uuid4())[:8]
    filename = os.path.join(X_VIDEOS_PATH, f"{tweet_id}_{uid}.mp4")
    req = urllib.request.Request(best_url, headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    })
    # Resolve the cancel flag once (the original built a throwaway Event per chunk).
    cancel_flag = _cancel_flags.get(task_id) if task_id else None
    # Context managers close both the HTTP response and the output file.
    with urllib.request.urlopen(req, timeout=120) as resp, open(filename, 'wb') as f:
        total = int(resp.headers.get('Content-Length', 0))
        downloaded = 0
        while True:
            if cancel_flag and cancel_flag.is_set():
                raise yt_dlp.utils.DownloadCancelled("Cancelled by user")
            chunk = resp.read(65536)
            if not chunk:
                break
            f.write(chunk)
            downloaded += len(chunk)
            pct = int(downloaded * 100 / total) if total > 0 else 0
            if task_id:
                _download_progress[task_id] = pct
            if progress_callback and total > 0:
                progress_callback(pct)
    if task_id:
        _download_progress[task_id] = 99
    if progress_callback:
        progress_callback(100)
    file_size = os.path.getsize(filename)
    return {
        "title": title,
        "thumbnail": thumbnail,
        "duration": duration,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        "platform": "twitter",
    }
def _parse_youtube_video(url: str) -> dict:
    """Parse YouTube video info using yt-dlp (metadata only, no download)."""
    opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": False,
        "skip_download": True,
    }
    with yt_dlp.YoutubeDL(opts) as ydl:
        info = ydl.extract_info(url, download=False)
    seen_qualities = set()
    formats = []
    for fmt in info.get("formats", []):
        if fmt.get("vcodec", "none") == "none":
            continue  # audio-only entry
        height = fmt.get("height", 0)
        if not height:
            continue
        quality = f"{height}p"
        if quality in seen_qualities:
            continue  # keep only the first format per resolution
        seen_qualities.add(quality)
        formats.append({
            "format_id": fmt.get("format_id", ""),
            "quality": quality,
            "ext": fmt.get("ext", "mp4"),
            "filesize": fmt.get("filesize") or fmt.get("filesize_approx") or 0,
            "note": fmt.get("format_note", ""),
        })
    # Highest resolution first, synthetic "best" entry on top.
    formats.sort(key=lambda fmt: int(fmt["quality"].replace("p", "")), reverse=True)
    formats.insert(0, {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    })
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": formats,
        "url": url,
        "platform": "youtube",
    }
def _download_youtube_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict:
    """Download YouTube video using yt-dlp.

    Args:
        url: YouTube video URL.
        format_id: "best" or a yt-dlp format id.
        progress_callback: unused here; progress goes via the module store.
        task_id: key for the module progress/cancel store (optional).

    Returns:
        dict with title/thumbnail/duration and the saved file's metadata.
    """
    # BUG FIX: the original replaced the caller's task_id with a fresh uuid,
    # which broke progress reporting and cancellation (both key off the
    # caller's id).  A separate uid provides filename uniqueness instead —
    # matching the pattern _download_hls_video already uses.
    uid = str(uuid.uuid4())[:8]
    output_template = os.path.join(YOUTUBE_VIDEOS_PATH, f"%(id)s_{uid}.%(ext)s")
    if format_id == "best":
        format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"
    else:
        format_spec = f"{format_id}+bestaudio/best"
    hooks = [_make_hook(task_id)] if task_id else []
    ydl_opts = {
        "format": format_spec,
        "outtmpl": output_template,
        "merge_output_format": "mp4",
        "quiet": True,
        "no_warnings": True,
        "progress_hooks": hooks,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
    # yt-dlp may have merged streams into .mp4, changing the extension.
    if not os.path.exists(filename):
        filename = os.path.splitext(filename)[0] + ".mp4"
    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        "platform": "youtube",
    }
# Browser-like request headers sent with all Pornhub requests.
# NOTE(review): presumably needed to avoid UA/referer-based blocking — confirm.
_PH_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
    "Referer": "https://www.pornhub.com/",
}
def _parse_pornhub_video(url: str) -> dict:
    """Parse Pornhub video info using yt-dlp (metadata only, no download)."""
    opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": False,
        "skip_download": True,
        "http_headers": _PH_HEADERS,
    }
    with yt_dlp.YoutubeDL(opts) as ydl:
        info = ydl.extract_info(url, download=False)
    seen_qualities = set()
    formats = []
    for fmt in info.get("formats", []):
        if fmt.get("vcodec", "none") == "none":
            continue  # audio-only entry
        height = fmt.get("height", 0)
        if not height:
            continue
        quality = f"{height}p"
        if quality in seen_qualities:
            continue  # keep only the first format per resolution
        seen_qualities.add(quality)
        formats.append({
            "format_id": fmt.get("format_id", ""),
            "quality": quality,
            "ext": fmt.get("ext", "mp4"),
            "filesize": fmt.get("filesize") or fmt.get("filesize_approx") or 0,
            "note": fmt.get("format_note", ""),
        })
    # Highest resolution first, synthetic "best" entry on top.
    formats.sort(key=lambda fmt: int(fmt["quality"].replace("p", "")), reverse=True)
    formats.insert(0, {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    })
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": formats,
        "url": url,
        "platform": "pornhub",
    }
def _download_pornhub_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict:
    """Download Pornhub video using yt-dlp.

    Args:
        url: Pornhub video URL.
        format_id: "best" or a yt-dlp format id.
        progress_callback: unused here; progress goes via the module store.
        task_id: key for the module progress/cancel store (optional).

    Returns:
        dict with title/thumbnail/duration and the saved file's metadata.
    """
    # BUG FIX: the original replaced the caller's task_id with a fresh uuid,
    # which broke progress reporting and cancellation.  Use a separate uid
    # only for filename uniqueness (same pattern as _download_hls_video).
    uid = str(uuid.uuid4())[:8]
    output_template = os.path.join(PH_VIDEOS_PATH, f"%(id)s_{uid}.%(ext)s")
    if format_id == "best":
        # Prefer mp4 with audio; fall back to best available
        format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio/best[ext=mp4]/best"
    else:
        # The format may already contain audio (merged); try with audio fallback gracefully
        format_spec = f"{format_id}+bestaudio/{format_id}/best"
    hooks = [_make_hook(task_id)] if task_id else []
    ydl_opts = {
        "format": format_spec,
        "outtmpl": output_template,
        "merge_output_format": "mp4",
        "quiet": True,
        "no_warnings": True,
        "http_headers": _PH_HEADERS,
        "progress_hooks": hooks,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
    # yt-dlp may have merged streams into .mp4, changing the extension.
    if not os.path.exists(filename):
        filename = os.path.splitext(filename)[0] + ".mp4"
    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        "platform": "pornhub",
    }
def _parse_torrent(url: str) -> dict:
"""Return minimal info for magnet/torrent (metadata only available after download starts)."""
return {
"title": url[:80] if url.startswith("magnet:") else os.path.basename(url.split("?")[0]),
"thumbnail": "",
"duration": 0,
"formats": [{"format_id": "best", "quality": "original", "ext": "*", "filesize": 0, "note": "原始文件(不转码)"}],
"url": url,
"platform": "torrent",
}
def _download_torrent(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict:
    """Download magnet link or .torrent file via aria2c.

    Args:
        url: magnet: URI or http(s) URL of a .torrent file.
        format_id: ignored — torrent payloads are stored as-is (no transcode).
        progress_callback: ignored; progress goes via the module store.
        task_id: key for progress reporting / cancellation (optional).

    Returns:
        dict describing the main downloaded file: the largest file with a
        known video extension, else the largest file overall.

    Raises:
        RuntimeError: when aria2c fails or produces no file.
        yt_dlp.utils.DownloadCancelled: when the task is cancelled.
    """
    out_dir = os.path.join(TORRENT_VIDEOS_PATH, task_id or str(uuid.uuid4())[:8])
    os.makedirs(out_dir, exist_ok=True)
    cmd = [
        "aria2c",
        "--dir", out_dir,
        "--seed-time=0",                  # stop when done, do not seed
        "--max-connection-per-server=4",
        "--split=4",
        "--bt-stop-timeout=300",          # give up after 5 min with no speed
        "--summary-interval=5",
        "--console-log-level=warn",
        "--file-allocation=none",
        url,
    ]
    flag = _cancel_flags.get(task_id) if task_id else None
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
    )
    # Parse aria2c stdout for progress.
    # aria2c prints lines like: [#abcd 100MiB/500MiB(20%) CN:4 DL:2.0MiB]
    progress_re = re.compile(r'\((\d+)%\)')
    try:
        for line in proc.stdout:
            if flag and flag.is_set():
                # BUG FIX: reap the child after terminating it — the original
                # raised immediately and left a zombie process behind.
                proc.terminate()
                try:
                    proc.wait(timeout=10)
                except subprocess.TimeoutExpired:
                    proc.kill()
                raise yt_dlp.utils.DownloadCancelled("Cancelled by user")
            m = progress_re.search(line)
            if m and task_id:
                pct = int(m.group(1))
                _download_progress[task_id] = max(1, min(pct, 99))
        proc.wait()
    finally:
        # BUG FIX: close the pipe so the fd is not leaked on any exit path.
        if proc.stdout:
            proc.stdout.close()
    if proc.returncode != 0:
        raise RuntimeError(f"aria2c exited with code {proc.returncode}")
    if task_id:
        _download_progress[task_id] = 99
    # Find the largest file in out_dir (most likely the main video),
    # preferring files with a known video extension.
    files = sorted(Path(out_dir).rglob("*"), key=lambda p: p.stat().st_size if p.is_file() else 0, reverse=True)
    video_exts = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm", ".ts", ".m2ts"}
    target = next((f for f in files if f.is_file() and f.suffix.lower() in video_exts), None)
    if not target:
        target = next((f for f in files if f.is_file()), None)
    if not target:
        raise RuntimeError("aria2c finished but no file found")
    return {
        "title": target.stem,
        "thumbnail": "",
        "duration": 0,
        "filename": target.name,
        "file_path": str(target),
        "file_size": target.stat().st_size,
        "platform": "torrent",
    }
def _parse_hls_video(url: str) -> dict:
    """Parse HLS/m3u8 stream info using yt-dlp."""
    opts = {
        "quiet": True,
        "no_warnings": True,
        "skip_download": True,
        "allowed_extractors": ["generic"],
    }
    try:
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=False)
    except Exception:
        # yt-dlp could not probe the stream — return minimal info so the
        # client can still trigger a direct download.
        return {
            "title": "HLS Stream",
            "thumbnail": "",
            "duration": 0,
            "formats": [{"format_id": "best", "quality": "best", "ext": "mp4", "filesize": 0, "note": "HLS stream (auto-merge)"}],
            "url": url,
            "platform": "hls",
        }
    seen = set()
    formats = []
    for fmt in (info.get("formats") or []):
        if fmt.get("vcodec", "none") == "none":
            continue  # audio-only entry
        height = fmt.get("height", 0)
        quality = f"{height}p" if height else fmt.get("format_note", "HLS")
        if quality in seen:
            continue  # one entry per quality label
        seen.add(quality)
        formats.append({
            "format_id": fmt.get("format_id", ""),
            "quality": quality,
            "ext": "mp4",
            "filesize": fmt.get("filesize") or fmt.get("filesize_approx") or 0,
            "note": fmt.get("format_note", "HLS"),
        })

    def _height_key(entry):
        # "<N>p" labels sort by N; anything else sorts as 0 (last).
        q = entry["quality"]
        return int(q.replace("p", "")) if q.endswith("p") else 0

    formats.sort(key=_height_key, reverse=True)
    formats.insert(0, {"format_id": "best", "quality": "best", "ext": "mp4", "filesize": 0, "note": "Best available quality"})
    return {
        "title": info.get("title") or "HLS Stream",
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": formats,
        "url": url,
        "platform": "hls",
    }
def _download_hls_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict:
    """Download HLS/m3u8 stream using yt-dlp (handles segment merge automatically)."""
    uid = str(uuid.uuid4())[:8]
    outtmpl = os.path.join(HLS_VIDEOS_PATH, f"hls_{uid}.%(ext)s")
    if format_id == "best":
        spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio/best[ext=mp4]/best"
    else:
        spec = f"{format_id}+bestaudio/{format_id}/best"
    ydl_opts = {
        "format": spec,
        "outtmpl": outtmpl,
        "merge_output_format": "mp4",
        "quiet": True,
        "no_warnings": True,
        "progress_hooks": [_make_hook(task_id)] if task_id else [],
        "allowed_extractors": ["generic", "m3u8"],
        # HLS-specific: concurrent fragment download for speed
        "concurrent_fragment_downloads": 5,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
    # Merge step may have changed the extension to .mp4.
    if not os.path.exists(filename):
        filename = os.path.splitext(filename)[0] + ".mp4"
    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0
    return {
        "title": info.get("title") or "HLS Stream",
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        "platform": "hls",
    }
def parse_video_url(url: str) -> dict:
    """Extract video info without downloading.

    Dispatches to a platform-specific parser based on the URL shape;
    anything unrecognized falls through to a generic yt-dlp extraction.

    Args:
        url: page / stream / magnet URL to inspect.

    Returns:
        dict with title, thumbnail, duration, a formats list and the input
        url (platform-specific parsers also add a "platform" key).

    Raises:
        ValueError: for restricted Twitter content (no yt-dlp fallback).
    """
    # Use syndication API for Twitter/X URLs
    if _is_twitter_url(url):
        logger.info(f"Using Twitter syndication API for: {url}")
        try:
            result = _parse_twitter_video(url)
            # _formats_full is internal to the download path — never expose it.
            result.pop('_formats_full', None)
            return result
        except ValueError as e:
            error_msg = str(e)
            # If it's restricted content error, don't fallback to yt-dlp
            if '内容受限不支持下载' in error_msg:
                logger.error(f"Twitter content restricted: {error_msg}")
                raise
            # For other errors, fallback to yt-dlp
            logger.warning(f"Twitter syndication failed, falling back to yt-dlp: {e}")
        except Exception as e:
            logger.warning(f"Twitter syndication failed, falling back to yt-dlp: {e}")
    # YouTube URLs
    if _is_youtube_url(url):
        logger.info(f"Parsing YouTube video: {url}")
        return _parse_youtube_video(url)
    # Pornhub URLs
    if _is_pornhub_url(url):
        logger.info(f"Parsing Pornhub video: {url}")
        return _parse_pornhub_video(url)
    # HLS / m3u8 direct stream URLs
    if _is_hls_url(url):
        logger.info(f"Parsing HLS stream: {url}")
        return _parse_hls_video(url)
    # Magnet / torrent
    if _is_torrent(url):
        logger.info(f"Parsing torrent/magnet: {url}")
        return _parse_torrent(url)
    # Fallback to generic yt-dlp
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": False,
        "skip_download": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
    formats = []
    seen = set()
    for f in info.get("formats", []):
        if f.get("vcodec", "none") == "none":
            continue  # skip audio-only formats
        height = f.get("height", 0)
        ext = f.get("ext", "mp4")
        fmt_id = f.get("format_id", "")
        quality = f"{height}p" if height else f.get("format_note", "unknown")
        # Dedupe on resolution + container pair.
        key = f"{quality}-{ext}"
        if key in seen:
            continue
        seen.add(key)
        formats.append({
            "format_id": fmt_id,
            "quality": quality,
            "ext": ext,
            "filesize": f.get("filesize") or f.get("filesize_approx") or 0,
            "note": f.get("format_note", ""),
        })
    # Highest resolution first; non-"<N>p" quality labels sort last.
    formats.sort(key=lambda x: int(x["quality"].replace("p", "")) if x["quality"].endswith("p") else 0, reverse=True)
    formats.insert(0, {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    })
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": formats,
        "url": url,
    }
def download_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict:
    """Download video and return file info.

    Dispatches to a platform-specific downloader; unrecognized URLs fall
    through to a generic yt-dlp download into the X/Twitter directory.

    Args:
        url: page / stream / magnet URL.
        format_id: "best" or a platform-specific format id.
        progress_callback: optional percent callback (Twitter path only).
        task_id: key for the module progress/cancel store (optional).

    Returns:
        dict with title/thumbnail/duration, saved-file metadata and platform.

    Raises:
        ValueError: for restricted Twitter content (no yt-dlp fallback).
    """
    # Use syndication API for Twitter/X URLs
    if _is_twitter_url(url):
        logger.info(f"Using Twitter syndication API for download: {url}")
        try:
            return _download_twitter_video(url, format_id, progress_callback, task_id=task_id)
        except ValueError as e:
            error_msg = str(e)
            # If it's restricted content error, don't fallback to yt-dlp
            if '内容受限不支持下载' in error_msg:
                logger.error(f"Twitter content restricted: {error_msg}")
                raise
            # For other errors, fallback to yt-dlp
            logger.warning(f"Twitter syndication download failed, falling back to yt-dlp: {e}")
        except Exception as e:
            logger.warning(f"Twitter syndication download failed, falling back to yt-dlp: {e}")
    # YouTube URLs
    if _is_youtube_url(url):
        logger.info(f"Downloading YouTube video: {url}")
        return _download_youtube_video(url, format_id, progress_callback, task_id=task_id)
    # Pornhub URLs
    if _is_pornhub_url(url):
        logger.info(f"Downloading Pornhub video: {url}")
        return _download_pornhub_video(url, format_id, progress_callback, task_id=task_id)
    # HLS / m3u8 direct stream URLs
    if _is_hls_url(url):
        logger.info(f"Downloading HLS stream: {url}")
        return _download_hls_video(url, format_id, progress_callback, task_id=task_id)
    # Magnet / torrent
    if _is_torrent(url):
        logger.info(f"Downloading torrent/magnet: {url}")
        return _download_torrent(url, format_id, progress_callback, task_id=task_id)
    # Generic yt-dlp fallback.
    # BUG FIX: the original overwrote the caller's task_id with a fresh uuid
    # here, which broke progress reporting and cancellation for generic
    # downloads.  Use a separate uid only for filename uniqueness.
    uid = str(uuid.uuid4())[:8]
    output_template = os.path.join(X_VIDEOS_PATH, f"%(id)s_{uid}.%(ext)s")
    format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best" if format_id == "best" else f"{format_id}+bestaudio/best"
    hooks = [_make_hook(task_id)] if task_id else []
    ydl_opts = {
        "format": format_spec,
        "outtmpl": output_template,
        "merge_output_format": "mp4",
        "quiet": True,
        "no_warnings": True,
        "progress_hooks": hooks,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
    # yt-dlp may change extension after merge
    if not os.path.exists(filename):
        filename = os.path.splitext(filename)[0] + ".mp4"
    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        "platform": detect_platform(url),
    }