772 lines
25 KiB
Python
772 lines
25 KiB
Python
"""yt-dlp wrapper service for video downloading."""
|
||
import os
|
||
import re
|
||
import uuid
|
||
import json
|
||
import asyncio
|
||
import logging
|
||
import threading
|
||
import urllib.request
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
import yt_dlp
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ── In-memory progress / cancel store (thread-safe via GIL) ─────────────────
|
||
_download_progress: dict[str, int] = {} # task_id → 0-100
|
||
_cancel_flags: dict[str, threading.Event] = {} # task_id → Event
|
||
|
||
|
||
def register_task(task_id: str):
|
||
_cancel_flags[task_id] = threading.Event()
|
||
_download_progress[task_id] = 0
|
||
|
||
|
||
def get_progress(task_id: str) -> int:
|
||
return _download_progress.get(task_id, 0)
|
||
|
||
|
||
def request_cancel(task_id: str):
|
||
flag = _cancel_flags.get(task_id)
|
||
if flag:
|
||
flag.set()
|
||
|
||
|
||
def cleanup_task(task_id: str):
|
||
_cancel_flags.pop(task_id, None)
|
||
_download_progress.pop(task_id, None)
|
||
|
||
|
||
def _make_hook(task_id: str):
|
||
"""yt-dlp progress hook: handles DASH multi-phase + HLS fragments + cancel."""
|
||
state = {"phase": 0} # counts "finished" events (video phase, audio phase…)
|
||
PHASE_WEIGHTS = [0.80, 0.19] # phase-0 → 0-80%, phase-1 → 80-99%
|
||
|
||
def hook(d):
|
||
flag = _cancel_flags.get(task_id)
|
||
if flag and flag.is_set():
|
||
raise yt_dlp.utils.DownloadCancelled("Cancelled by user")
|
||
|
||
if d["status"] == "downloading":
|
||
total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
|
||
done = d.get("downloaded_bytes", 0)
|
||
|
||
if total > 0:
|
||
phase_pct = done / total # 0.0–1.0
|
||
else:
|
||
# HLS / unknown size: use fragment index
|
||
fc = d.get("fragment_count") or 0
|
||
fi = d.get("fragment_index") or 0
|
||
phase_pct = (fi / fc) if fc > 0 else 0.5 # 0.5 = "working"
|
||
|
||
ph = min(state["phase"], len(PHASE_WEIGHTS) - 1)
|
||
base = sum(PHASE_WEIGHTS[:ph]) * 100
|
||
span = PHASE_WEIGHTS[ph] * 100
|
||
pct = int(base + phase_pct * span)
|
||
_download_progress[task_id] = max(1, pct) # at least 1 to show activity
|
||
|
||
elif d["status"] == "finished":
|
||
state["phase"] += 1
|
||
done_pct = int(sum(PHASE_WEIGHTS[:state["phase"]]) * 100)
|
||
_download_progress[task_id] = min(done_pct, 99)
|
||
|
||
# Ensure at least 1% progress so UI shows activity
|
||
if _download_progress.get(task_id, 0) == 0:
|
||
_download_progress[task_id] = 1
|
||
|
||
return hook
|
||
|
||
# Root directory for all downloaded media; overridable via environment.
VIDEO_BASE_PATH = os.getenv("VIDEO_BASE_PATH", "/home/xdl/xdl_videos")
X_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "x_videos")
YOUTUBE_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "youtube_videos")
PH_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "ph_videos")
HLS_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "hls_videos")

# Ensure every per-platform directory exists up front.
for _platform_dir in (X_VIDEOS_PATH, YOUTUBE_VIDEOS_PATH, PH_VIDEOS_PATH, HLS_VIDEOS_PATH):
    os.makedirs(_platform_dir, exist_ok=True)
|
||
|
||
# Pattern to match YouTube URLs
|
||
YOUTUBE_URL_RE = re.compile(
|
||
r'https?://(?:(?:www\.|m\.)?youtube\.com/(?:watch\?.*v=|shorts/|embed/|v/)|youtu\.be/)[\w-]+'
|
||
)
|
||
|
||
# Pattern to match Twitter/X URLs and extract tweet ID
|
||
TWITTER_URL_RE = re.compile(
|
||
r'https?://(?:(?:www\.)?(?:twitter\.com|x\.com)|[a-z]*twitter\.com)/\w+/status/(\d+)'
|
||
)
|
||
|
||
# Pattern to match Pornhub URLs
|
||
PORNHUB_URL_RE = re.compile(
|
||
r'https?://(?:[\w-]+\.)?pornhub\.com/(?:view_video\.php\?viewkey=|video/|embed/)[\w-]+'
|
||
r'|https?://phub\.to/[\w-]+'
|
||
)
|
||
|
||
# Pattern to match HLS / m3u8 URLs (direct stream links)
|
||
HLS_URL_RE = re.compile(
|
||
r'https?://[^\s]+\.m3u8(?:[?#][^\s]*)?',
|
||
re.IGNORECASE,
|
||
)
|
||
|
||
|
||
def get_video_path(filename: str, platform: str = "twitter") -> str:
|
||
if platform == "youtube":
|
||
return os.path.join(YOUTUBE_VIDEOS_PATH, filename)
|
||
if platform == "pornhub":
|
||
return os.path.join(PH_VIDEOS_PATH, filename)
|
||
if platform == "hls":
|
||
return os.path.join(HLS_VIDEOS_PATH, filename)
|
||
return os.path.join(X_VIDEOS_PATH, filename)
|
||
|
||
|
||
def _is_youtube_url(url: str) -> bool:
|
||
return bool(YOUTUBE_URL_RE.match(url))
|
||
|
||
|
||
def _is_pornhub_url(url: str) -> bool:
|
||
return bool(PORNHUB_URL_RE.match(url))
|
||
|
||
|
||
def detect_platform(url: str) -> str:
|
||
"""Detect platform from URL."""
|
||
if _is_twitter_url(url):
|
||
return "twitter"
|
||
if _is_youtube_url(url):
|
||
return "youtube"
|
||
if _is_pornhub_url(url):
|
||
return "pornhub"
|
||
if _is_hls_url(url):
|
||
return "hls"
|
||
return "unknown"
|
||
|
||
|
||
def _is_twitter_url(url: str) -> bool:
|
||
return bool(TWITTER_URL_RE.match(url))
|
||
|
||
|
||
def _is_hls_url(url: str) -> bool:
|
||
return bool(HLS_URL_RE.match(url))
|
||
|
||
|
||
def _extract_tweet_id(url: str) -> Optional[str]:
|
||
m = TWITTER_URL_RE.match(url)
|
||
return m.group(1) if m else None
|
||
|
||
|
||
def _twitter_syndication_info(tweet_id: str) -> dict:
    """Fetch tweet info via Twitter's syndication API (no auth required).

    Args:
        tweet_id: numeric tweet ID as a string.

    Returns:
        The decoded JSON payload from the syndication endpoint.

    Raises:
        ValueError: if the tweet is restricted (TweetTombstone payload).
        urllib.error.URLError: on network/HTTP failure.
    """
    api_url = f'https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}&token=x'
    req = urllib.request.Request(api_url, headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    })
    # FIX: close the HTTP response deterministically (the original left the
    # connection open until GC).
    with urllib.request.urlopen(req, timeout=15) as resp:
        data = json.loads(resp.read().decode())

    # Check for restricted content (TweetTombstone)
    if data.get('__typename') == 'TweetTombstone':
        raise ValueError('内容受限不支持下载(敏感内容/年龄限制),需要登录账号访问')

    return data
|
||
|
||
|
||
def _parse_twitter_video(url: str) -> dict:
    """Parse Twitter video using syndication API."""
    tweet_id = _extract_tweet_id(url)
    if not tweet_id:
        raise ValueError(f"Could not extract tweet ID from URL: {url}")

    data = _twitter_syndication_info(tweet_id)
    # Keep only the first line of the tweet text, capped at 100 chars.
    title = data.get('text', 'Untitled').split('\n')[0][:100]

    thumbnail = ''
    duration = 0
    formats = []

    for media in data.get('mediaDetails', []):
        if media.get('type') != 'video':
            continue
        thumbnail = media.get('media_url_https', '')
        video_info = media.get('video_info', {})
        duration = (video_info.get('duration_millis', 0) or 0) // 1000

        for idx, variant in enumerate(video_info.get('variants', [])):
            if variant.get('content_type', '') == 'application/x-mpegURL':
                continue  # Skip HLS
            bitrate = variant.get('bitrate', 0)
            vid_url = variant.get('url', '')
            # Resolution is encoded in the URL path, e.g. ".../1280x720/...".
            res_match = re.search(r'/(\d+)x(\d+)/', vid_url)
            height = int(res_match.group(2)) if res_match else 0
            formats.append({
                "format_id": f"tw-{idx}",
                "quality": f"{height}p" if height else f"{bitrate // 1000}k",
                "ext": "mp4",
                "filesize": 0,
                "note": f"{bitrate // 1000}kbps" if bitrate else "",
                "_url": vid_url,
                "_bitrate": bitrate,
            })

    # Highest bitrate first, then a synthetic "best" entry on top.
    formats.sort(key=lambda fmt: fmt.get('_bitrate', 0), reverse=True)
    formats.insert(0, {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    })

    # Public copy strips the private "_"-prefixed bookkeeping keys.
    public_formats = [
        {k: v for k, v in fmt.items() if not k.startswith('_')} for fmt in formats
    ]
    return {
        "title": title,
        "thumbnail": thumbnail,
        "duration": duration,
        "formats": public_formats,
        "url": url,
        "_formats_full": formats,  # Keep full info for download
    }
|
||
|
||
|
||
def _download_twitter_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict:
    """Download Twitter video using syndication API.

    Args:
        url: tweet status URL.
        format_id: "best" or a "tw-<n>" id from _parse_twitter_video.
        progress_callback: optional callable receiving an int percentage.
        task_id: key into the module progress/cancel store (may be None).

    Returns:
        Dict with title/thumbnail/duration/filename/file_path/file_size/platform.

    Raises:
        ValueError: if no tweet ID or no downloadable video is found.
        yt_dlp.utils.DownloadCancelled: when the task's cancel flag is set.
    """
    tweet_id = _extract_tweet_id(url)
    if not tweet_id:
        raise ValueError(f"Could not extract tweet ID from URL: {url}")

    data = _twitter_syndication_info(tweet_id)
    title = data.get('text', 'Untitled').split('\n')[0][:100]
    thumbnail = ''
    duration = 0
    best_url = None
    best_bitrate = 0

    for media in data.get('mediaDetails', []):
        if media.get('type') != 'video':
            continue
        thumbnail = media.get('media_url_https', '')
        video_info = media.get('video_info', {})
        duration = (video_info.get('duration_millis', 0) or 0) // 1000

        for i, variant in enumerate(video_info.get('variants', [])):
            if variant.get('content_type') == 'application/x-mpegURL':
                continue
            vid_url = variant.get('url', '')
            bitrate = variant.get('bitrate', 0)

            # "best" keeps the highest bitrate seen; "tw-<i>" picks exactly one.
            if format_id == "best" or format_id == f"tw-{i}":
                if format_id != "best" or bitrate > best_bitrate:
                    best_url = vid_url
                    best_bitrate = bitrate
                if format_id != "best":
                    break

    if not best_url:
        raise ValueError("No video found in tweet")

    # Download the video.
    # BUG FIX: the original reassigned `task_id` here to a fresh uuid, which
    # disconnected this download from the caller's progress/cancel store
    # (cancel never fired; get_progress never saw updates). Use a separate
    # uid for the filename only — same pattern as _download_hls_video.
    uid = str(uuid.uuid4())[:8]
    filename = os.path.join(X_VIDEOS_PATH, f"{tweet_id}_{uid}.mp4")

    req = urllib.request.Request(best_url, headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    })
    # Resolve the cancel flag once instead of allocating a throwaway Event
    # per chunk; `with` also guarantees the response/file are closed.
    cancel_flag = _cancel_flags.get(task_id) if task_id else None
    with urllib.request.urlopen(req, timeout=120) as resp, open(filename, 'wb') as f:
        total = int(resp.headers.get('Content-Length', 0))
        downloaded = 0
        while True:
            if cancel_flag is not None and cancel_flag.is_set():
                raise yt_dlp.utils.DownloadCancelled("Cancelled by user")
            chunk = resp.read(65536)
            if not chunk:
                break
            f.write(chunk)
            downloaded += len(chunk)
            pct = int(downloaded * 100 / total) if total > 0 else 0
            if task_id:
                _download_progress[task_id] = pct
            if progress_callback and total > 0:
                progress_callback(pct)

    if task_id:
        _download_progress[task_id] = 99
    if progress_callback:
        progress_callback(100)

    file_size = os.path.getsize(filename)

    return {
        "title": title,
        "thumbnail": thumbnail,
        "duration": duration,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        "platform": "twitter",
    }
|
||
|
||
|
||
def _parse_youtube_video(url: str) -> dict:
    """Parse YouTube video info using yt-dlp."""
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": False,
        "skip_download": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)

    formats = []
    seen_resolutions = set()
    for fmt in info.get("formats", []):
        # Audio-only entries carry vcodec == "none"; drop them.
        if fmt.get("vcodec", "none") == "none":
            continue
        height = fmt.get("height", 0)
        if not height:
            continue
        quality = f"{height}p"
        if quality in seen_resolutions:
            continue  # keep a single entry per resolution
        seen_resolutions.add(quality)
        formats.append({
            "format_id": fmt.get("format_id", ""),
            "quality": quality,
            "ext": fmt.get("ext", "mp4"),
            "filesize": fmt.get("filesize") or fmt.get("filesize_approx") or 0,
            "note": fmt.get("format_note", ""),
        })

    # Highest resolution first, with a synthetic "best" entry prepended.
    formats.sort(key=lambda entry: int(entry["quality"].rstrip("p")), reverse=True)
    formats.insert(0, {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    })

    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": formats,
        "url": url,
        "platform": "youtube",
    }
|
||
|
||
|
||
def _download_youtube_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict:
    """Download YouTube video using yt-dlp.

    Args:
        url: YouTube video URL.
        format_id: yt-dlp format id, or "best".
        progress_callback: accepted for interface parity; not used here
            (progress flows through the module task store via _make_hook).
        task_id: key into the module progress/cancel store (may be None).

    Returns:
        Dict with title/thumbnail/duration/filename/file_path/file_size/platform.
    """
    # BUG FIX: the original reassigned `task_id` to a fresh uuid, so the
    # progress hook reported under a random key and cancellation keyed by the
    # caller's id never worked. Keep a separate uid for the filename only.
    uid = str(uuid.uuid4())[:8]
    output_template = os.path.join(YOUTUBE_VIDEOS_PATH, f"%(id)s_{uid}.%(ext)s")

    if format_id == "best":
        format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"
    else:
        format_spec = f"{format_id}+bestaudio/best"

    hooks = [_make_hook(task_id)] if task_id else []

    ydl_opts = {
        "format": format_spec,
        "outtmpl": output_template,
        "merge_output_format": "mp4",
        "quiet": True,
        "no_warnings": True,
        "progress_hooks": hooks,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
        # yt-dlp may rename the file after merging streams into mp4.
        if not os.path.exists(filename):
            filename = os.path.splitext(filename)[0] + ".mp4"

    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0

    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        "platform": "youtube",
    }
|
||
|
||
|
||
# Browser-like HTTP headers attached to all Pornhub requests (passed to
# yt-dlp via the "http_headers" option). NOTE(review): the Referer is
# presumably required to pass hotlink/anti-bot checks — confirm before
# removing.
_PH_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
    "Referer": "https://www.pornhub.com/",
}
|
||
|
||
|
||
def _parse_pornhub_video(url: str) -> dict:
    """Parse Pornhub video info using yt-dlp."""
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": False,
        "skip_download": True,
        "http_headers": _PH_HEADERS,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)

    formats = []
    seen_resolutions = set()
    for fmt in info.get("formats", []):
        if fmt.get("vcodec", "none") == "none":
            continue  # audio-only entry
        height = fmt.get("height", 0)
        if not height:
            continue
        quality = f"{height}p"
        if quality in seen_resolutions:
            continue  # one entry per resolution
        seen_resolutions.add(quality)
        formats.append({
            "format_id": fmt.get("format_id", ""),
            "quality": quality,
            "ext": fmt.get("ext", "mp4"),
            "filesize": fmt.get("filesize") or fmt.get("filesize_approx") or 0,
            "note": fmt.get("format_note", ""),
        })

    # Highest resolution first; synthetic "best" entry goes on top.
    formats.sort(key=lambda entry: int(entry["quality"][:-1]), reverse=True)
    formats.insert(0, {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    })

    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": formats,
        "url": url,
        "platform": "pornhub",
    }
|
||
|
||
|
||
def _download_pornhub_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict:
    """Download Pornhub video using yt-dlp.

    Args:
        url: Pornhub video URL.
        format_id: yt-dlp format id, or "best".
        progress_callback: accepted for interface parity; not used here
            (progress flows through the module task store via _make_hook).
        task_id: key into the module progress/cancel store (may be None).

    Returns:
        Dict with title/thumbnail/duration/filename/file_path/file_size/platform.
    """
    # BUG FIX: the original reassigned `task_id` to a fresh uuid, orphaning
    # progress reporting and cancellation. Use a separate uid for the
    # filename only (matching _download_hls_video).
    uid = str(uuid.uuid4())[:8]
    output_template = os.path.join(PH_VIDEOS_PATH, f"%(id)s_{uid}.%(ext)s")

    if format_id == "best":
        # Prefer mp4 with audio; fall back to best available
        format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio/best[ext=mp4]/best"
    else:
        # The format may already contain audio (merged); try with audio fallback gracefully
        format_spec = f"{format_id}+bestaudio/{format_id}/best"

    hooks = [_make_hook(task_id)] if task_id else []

    ydl_opts = {
        "format": format_spec,
        "outtmpl": output_template,
        "merge_output_format": "mp4",
        "quiet": True,
        "no_warnings": True,
        "http_headers": _PH_HEADERS,
        "progress_hooks": hooks,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
        # yt-dlp may rename the file after merging streams into mp4.
        if not os.path.exists(filename):
            filename = os.path.splitext(filename)[0] + ".mp4"

    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0

    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        "platform": "pornhub",
    }
|
||
|
||
|
||
def _parse_hls_video(url: str) -> dict:
    """Parse HLS/m3u8 stream info using yt-dlp."""
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "skip_download": True,
        "allowed_extractors": ["generic"],
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
    except Exception:
        # If yt-dlp can't parse, return minimal info to allow direct download
        return {
            "title": "HLS Stream",
            "thumbnail": "",
            "duration": 0,
            "formats": [{"format_id": "best", "quality": "best", "ext": "mp4", "filesize": 0, "note": "HLS stream (auto-merge)"}],
            "url": url,
            "platform": "hls",
        }

    formats = []
    seen_labels = set()
    for fmt in (info.get("formats") or []):
        if fmt.get("vcodec", "none") == "none":
            continue  # audio-only entry
        height = fmt.get("height", 0)
        quality = f"{height}p" if height else fmt.get("format_note", "HLS")
        if quality in seen_labels:
            continue  # one entry per quality label
        seen_labels.add(quality)
        formats.append({
            "format_id": fmt.get("format_id", ""),
            "quality": quality,
            "ext": "mp4",
            "filesize": fmt.get("filesize") or fmt.get("filesize_approx") or 0,
            "note": fmt.get("format_note", "HLS"),
        })

    def _height_key(entry):
        label = entry["quality"]
        # Non-numeric labels (e.g. "HLS") sort after all real resolutions.
        return int(label.replace("p", "")) if label.endswith("p") else 0

    formats.sort(key=_height_key, reverse=True)
    formats.insert(0, {"format_id": "best", "quality": "best", "ext": "mp4", "filesize": 0, "note": "Best available quality"})

    return {
        "title": info.get("title") or "HLS Stream",
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": formats,
        "url": url,
        "platform": "hls",
    }
|
||
|
||
|
||
def _download_hls_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict:
    """Download HLS/m3u8 stream using yt-dlp (handles segment merge automatically)."""
    # Random tag makes the output filename unique; task_id stays reserved
    # for the progress/cancel store.
    file_tag = str(uuid.uuid4())[:8]
    output_template = os.path.join(HLS_VIDEOS_PATH, f"hls_{file_tag}.%(ext)s")

    format_spec = (
        "bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio/best[ext=mp4]/best"
        if format_id == "best"
        else f"{format_id}+bestaudio/{format_id}/best"
    )

    ydl_opts = {
        "format": format_spec,
        "outtmpl": output_template,
        "merge_output_format": "mp4",
        "quiet": True,
        "no_warnings": True,
        "progress_hooks": [_make_hook(task_id)] if task_id else [],
        "allowed_extractors": ["generic", "m3u8"],
        # HLS-specific: concurrent fragment download for speed
        "concurrent_fragment_downloads": 5,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
        # After merging, yt-dlp may have switched the extension to mp4.
        if not os.path.exists(filename):
            filename = os.path.splitext(filename)[0] + ".mp4"

    return {
        "title": info.get("title") or "HLS Stream",
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": os.path.getsize(filename) if os.path.exists(filename) else 0,
        "platform": "hls",
    }
|
||
|
||
|
||
def parse_video_url(url: str) -> dict:
    """Extract video info without downloading."""
    # Use syndication API for Twitter/X URLs
    if _is_twitter_url(url):
        logger.info(f"Using Twitter syndication API for: {url}")
        try:
            result = _parse_twitter_video(url)
            result.pop('_formats_full', None)
            return result
        except ValueError as e:
            error_msg = str(e)
            # Restricted content must surface to the caller; yt-dlp would
            # hit the same wall, so no fallback.
            if '内容受限不支持下载' in error_msg:
                logger.error(f"Twitter content restricted: {error_msg}")
                raise
            logger.warning(f"Twitter syndication failed, falling back to yt-dlp: {e}")
        except Exception as e:
            logger.warning(f"Twitter syndication failed, falling back to yt-dlp: {e}")

    # Dedicated parsers per platform, tried in priority order.
    for predicate, label, parser in (
        (_is_youtube_url, "Parsing YouTube video", _parse_youtube_video),
        (_is_pornhub_url, "Parsing Pornhub video", _parse_pornhub_video),
        (_is_hls_url, "Parsing HLS stream", _parse_hls_video),
    ):
        if predicate(url):
            logger.info(f"{label}: {url}")
            return parser(url)

    # Fallback to generic yt-dlp
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": False,
        "skip_download": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)

    formats = []
    seen_keys = set()
    for fmt in info.get("formats", []):
        if fmt.get("vcodec", "none") == "none":
            continue  # audio-only entry
        height = fmt.get("height", 0)
        ext = fmt.get("ext", "mp4")
        quality = f"{height}p" if height else fmt.get("format_note", "unknown")
        dedup_key = f"{quality}-{ext}"
        if dedup_key in seen_keys:
            continue
        seen_keys.add(dedup_key)
        formats.append({
            "format_id": fmt.get("format_id", ""),
            "quality": quality,
            "ext": ext,
            "filesize": fmt.get("filesize") or fmt.get("filesize_approx") or 0,
            "note": fmt.get("format_note", ""),
        })

    # Resolutions descend; non-"<n>p" labels sort last.
    formats.sort(key=lambda x: int(x["quality"].replace("p", "")) if x["quality"].endswith("p") else 0, reverse=True)

    formats.insert(0, {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    })

    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": formats,
        "url": url,
    }
|
||
|
||
|
||
def download_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict:
    """Download video and return file info.

    Dispatches to a platform-specific downloader (Twitter syndication,
    YouTube, Pornhub, HLS), falling back to generic yt-dlp otherwise.

    Args:
        url: source video URL.
        format_id: platform/yt-dlp format id, or "best".
        progress_callback: optional callable receiving an int percentage
            (honored by the Twitter path).
        task_id: key into the module progress/cancel store (may be None).

    Returns:
        Dict with title/thumbnail/duration/filename/file_path/file_size/platform.
    """
    # Use syndication API for Twitter/X URLs
    if _is_twitter_url(url):
        logger.info(f"Using Twitter syndication API for download: {url}")
        try:
            return _download_twitter_video(url, format_id, progress_callback, task_id=task_id)
        except ValueError as e:
            error_msg = str(e)
            # Restricted content must surface to the caller; yt-dlp would
            # hit the same wall, so no fallback.
            if '内容受限不支持下载' in error_msg:
                logger.error(f"Twitter content restricted: {error_msg}")
                raise
            logger.warning(f"Twitter syndication download failed, falling back to yt-dlp: {e}")
        except Exception as e:
            logger.warning(f"Twitter syndication download failed, falling back to yt-dlp: {e}")

    # YouTube URLs
    if _is_youtube_url(url):
        logger.info(f"Downloading YouTube video: {url}")
        return _download_youtube_video(url, format_id, progress_callback, task_id=task_id)

    # Pornhub URLs
    if _is_pornhub_url(url):
        logger.info(f"Downloading Pornhub video: {url}")
        return _download_pornhub_video(url, format_id, progress_callback, task_id=task_id)

    # HLS / m3u8 direct stream URLs
    if _is_hls_url(url):
        logger.info(f"Downloading HLS stream: {url}")
        return _download_hls_video(url, format_id, progress_callback, task_id=task_id)

    # Generic yt-dlp fallback.
    # BUG FIX: the original reassigned `task_id` to a fresh uuid here, so the
    # progress hook reported under a random key and cancellation keyed by the
    # caller's id never worked. Keep a separate uid for the filename only
    # (matching _download_hls_video).
    uid = str(uuid.uuid4())[:8]
    output_template = os.path.join(X_VIDEOS_PATH, f"%(id)s_{uid}.%(ext)s")

    format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best" if format_id == "best" else f"{format_id}+bestaudio/best"

    hooks = [_make_hook(task_id)] if task_id else []

    ydl_opts = {
        "format": format_spec,
        "outtmpl": output_template,
        "merge_output_format": "mp4",
        "quiet": True,
        "no_warnings": True,
        "progress_hooks": hooks,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
        # yt-dlp may change extension after merge
        if not os.path.exists(filename):
            filename = os.path.splitext(filename)[0] + ".mp4"

    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0

    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        "platform": detect_platform(url),
    }
|