"""yt-dlp wrapper service for video downloading.""" import os import re import uuid import json import asyncio import logging import threading import urllib.request from pathlib import Path from typing import Optional import yt_dlp logger = logging.getLogger(__name__) # ── In-memory progress / cancel store (thread-safe via GIL) ───────────────── _download_progress: dict[str, int] = {} # task_id → 0-100 _cancel_flags: dict[str, threading.Event] = {} # task_id → Event def register_task(task_id: str): _cancel_flags[task_id] = threading.Event() _download_progress[task_id] = 0 def get_progress(task_id: str) -> int: return _download_progress.get(task_id, 0) def request_cancel(task_id: str): flag = _cancel_flags.get(task_id) if flag: flag.set() def cleanup_task(task_id: str): _cancel_flags.pop(task_id, None) _download_progress.pop(task_id, None) def _make_hook(task_id: str): """yt-dlp progress hook: handles DASH multi-phase + HLS fragments + cancel.""" state = {"phase": 0} # counts "finished" events (video phase, audio phase…) PHASE_WEIGHTS = [0.80, 0.19] # phase-0 → 0-80%, phase-1 → 80-99% def hook(d): flag = _cancel_flags.get(task_id) if flag and flag.is_set(): raise yt_dlp.utils.DownloadCancelled("Cancelled by user") if d["status"] == "downloading": total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 done = d.get("downloaded_bytes", 0) if total > 0: phase_pct = done / total # 0.0–1.0 else: # HLS / unknown size: use fragment index fc = d.get("fragment_count") or 0 fi = d.get("fragment_index") or 0 phase_pct = (fi / fc) if fc > 0 else 0.5 # 0.5 = "working" ph = min(state["phase"], len(PHASE_WEIGHTS) - 1) base = sum(PHASE_WEIGHTS[:ph]) * 100 span = PHASE_WEIGHTS[ph] * 100 pct = int(base + phase_pct * span) _download_progress[task_id] = max(1, pct) # at least 1 to show activity elif d["status"] == "finished": state["phase"] += 1 done_pct = int(sum(PHASE_WEIGHTS[:state["phase"]]) * 100) _download_progress[task_id] = min(done_pct, 99) return hook 
VIDEO_BASE_PATH = os.getenv("VIDEO_BASE_PATH", "/home/xdl/xdl_videos")
X_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "x_videos")
YOUTUBE_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "youtube_videos")
PH_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "ph_videos")

# Ensure directories exist
for _videos_dir in (X_VIDEOS_PATH, YOUTUBE_VIDEOS_PATH, PH_VIDEOS_PATH):
    os.makedirs(_videos_dir, exist_ok=True)

# Pattern to match YouTube URLs
YOUTUBE_URL_RE = re.compile(
    r'https?://(?:(?:www\.|m\.)?youtube\.com/(?:watch\?.*v=|shorts/|embed/|v/)|youtu\.be/)[\w-]+'
)

# Pattern to match Twitter/X URLs and extract tweet ID
TWITTER_URL_RE = re.compile(
    r'https?://(?:(?:www\.)?(?:twitter\.com|x\.com)|[a-z]*twitter\.com)/\w+/status/(\d+)'
)

# Pattern to match Pornhub URLs
PORNHUB_URL_RE = re.compile(
    r'https?://(?:[\w-]+\.)?pornhub\.com/(?:view_video\.php\?viewkey=|video/|embed/)[\w-]+'
    r'|https?://phub\.to/[\w-]+'
)

# Headers sent with every direct request to Twitter's syndication/CDN hosts.
_TWITTER_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}

_PH_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
    "Referer": "https://www.pornhub.com/",
}

# Prefix of the "restricted content" error message. parse_video_url and
# download_video look for it to decide that a yt-dlp fallback is pointless.
_RESTRICTED_MARKER = '内容受限不支持下载'


def get_video_path(filename: str, platform: str = "twitter") -> str:
    """Return the storage path of *filename* for *platform*.

    Any platform other than "youtube" or "pornhub" maps to the X/Twitter
    directory.
    """
    if platform == "youtube":
        return os.path.join(YOUTUBE_VIDEOS_PATH, filename)
    if platform == "pornhub":
        return os.path.join(PH_VIDEOS_PATH, filename)
    return os.path.join(X_VIDEOS_PATH, filename)


def _is_youtube_url(url: str) -> bool:
    """Return True when *url* starts with a recognised YouTube video URL."""
    return bool(YOUTUBE_URL_RE.match(url))


def _is_pornhub_url(url: str) -> bool:
    """Return True when *url* starts with a recognised Pornhub video URL."""
    return bool(PORNHUB_URL_RE.match(url))


def _is_twitter_url(url: str) -> bool:
    """Return True when *url* starts with a Twitter/X status URL."""
    return bool(TWITTER_URL_RE.match(url))


def detect_platform(url: str) -> str:
    """Detect platform from URL.

    Returns "twitter", "youtube", "pornhub" or "unknown".
    """
    if _is_twitter_url(url):
        return "twitter"
    if _is_youtube_url(url):
        return "youtube"
    if _is_pornhub_url(url):
        return "pornhub"
    return "unknown"


def _extract_tweet_id(url: str) -> Optional[str]:
    """Return the numeric tweet id captured from *url*, or None."""
    m = TWITTER_URL_RE.match(url)
    return m.group(1) if m else None


def _best_format_entry() -> dict:
    """Return a fresh synthetic "best" entry placed first in format lists."""
    return {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    }


def _quality_height(quality) -> int:
    """Return N for a label of the exact form "<N>p", otherwise 0.

    Used as a sort key; labels such as "1080p60" or free-text format notes
    rank as 0 instead of raising.
    """
    if isinstance(quality, str) and quality.endswith("p") and quality[:-1].isdigit():
        return int(quality[:-1])
    return 0


def _collect_formats(info: dict, *, require_height: bool, key_with_ext: bool) -> list:
    """Build the user-facing format list from a yt-dlp info dict.

    Audio-only formats (``vcodec == "none"``) are skipped. Entries are
    de-duplicated by quality label (plus extension when *key_with_ext* is
    true), keeping the first occurrence, then sorted by height descending
    with the synthetic "best" entry inserted in front.

    When *require_height* is true, formats without a height are dropped;
    otherwise their ``format_note`` (or "unknown") is used as the label.
    """
    formats = []
    seen = set()
    for f in info.get("formats") or []:
        if f.get("vcodec", "none") == "none":
            continue
        height = f.get("height") or 0
        if require_height and not height:
            continue
        ext = f.get("ext", "mp4")
        quality = f"{height}p" if height else (f.get("format_note") or "unknown")
        key = f"{quality}-{ext}" if key_with_ext else quality
        if key in seen:
            continue
        seen.add(key)
        formats.append({
            "format_id": f.get("format_id", ""),
            "quality": quality,
            "ext": ext,
            "filesize": f.get("filesize") or f.get("filesize_approx") or 0,
            "note": f.get("format_note", ""),
        })
    formats.sort(key=lambda x: _quality_height(x["quality"]), reverse=True)
    formats.insert(0, _best_format_entry())
    return formats


def _extract_info_only(url: str, extra_opts: Optional[dict] = None) -> dict:
    """Run yt-dlp metadata extraction for *url* without downloading."""
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": False,
        "skip_download": True,
    }
    if extra_opts:
        ydl_opts.update(extra_opts)
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        return ydl.extract_info(url, download=False)


def _download_with_ytdlp(url: str, format_spec: str, output_dir: str,
                         platform: str, task_id: Optional[str] = None,
                         extra_opts: Optional[dict] = None) -> dict:
    """Download *url* with yt-dlp into *output_dir* and describe the result.

    The output name gets a random 8-character tag so repeated downloads of
    the same video do not collide. *task_id* is used only to wire the
    progress/cancel hook and is never replaced, so progress stays readable
    under the id the caller registered.
    """
    file_tag = str(uuid.uuid4())[:8]
    output_template = os.path.join(output_dir, f"%(id)s_{file_tag}.%(ext)s")
    ydl_opts = {
        "format": format_spec,
        "outtmpl": output_template,
        "merge_output_format": "mp4",
        "quiet": True,
        "no_warnings": True,
        "progress_hooks": [_make_hook(task_id)] if task_id else [],
    }
    if extra_opts:
        ydl_opts.update(extra_opts)

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)

    # yt-dlp may change extension after merge
    if not os.path.exists(filename):
        filename = os.path.splitext(filename)[0] + ".mp4"
    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0

    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        "platform": platform,
    }


def _twitter_syndication_info(tweet_id: str) -> dict:
    """Fetch tweet info via Twitter's syndication API (no auth required).

    Raises:
        ValueError: when the API answers with a ``TweetTombstone``. The
            message (kept in Chinese because callers match on its prefix)
            says the content is restricted (sensitive / age-limited) and
            needs a logged-in account.
    """
    api_url = f'https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}&token=x'
    req = urllib.request.Request(api_url, headers=dict(_TWITTER_HEADERS))
    # The context manager closes the HTTP response even if decoding fails.
    with urllib.request.urlopen(req, timeout=15) as resp:
        data = json.loads(resp.read().decode())

    # Check for restricted content (TweetTombstone)
    if data.get('__typename') == 'TweetTombstone':
        raise ValueError('内容受限不支持下载(敏感内容/年龄限制),需要登录账号访问')
    return data


def _tweet_title(data: dict) -> str:
    """Return the first line of the tweet text, truncated to 100 chars."""
    return (data.get('text') or 'Untitled').split('\n')[0][:100]


def _twitter_video_variants(data: dict):
    """Collect the downloadable variants of a syndication payload.

    Returns ``(thumbnail, duration_seconds, variants)``. Each variant is a
    dict with ``index`` (position inside its media entry, as used by the
    ``tw-<index>`` format ids), ``url`` and ``bitrate`` (0 when missing).
    HLS playlists are skipped. Thumbnail and duration come from the last
    video media entry.
    """
    thumbnail = ''
    duration = 0
    variants = []
    for media in data.get('mediaDetails') or []:
        if media.get('type') != 'video':
            continue
        thumbnail = media.get('media_url_https', '')
        video_info = media.get('video_info') or {}
        duration = (video_info.get('duration_millis', 0) or 0) // 1000
        for index, variant in enumerate(video_info.get('variants') or []):
            if variant.get('content_type', '') == 'application/x-mpegURL':
                continue  # Skip HLS
            variants.append({
                "index": index,
                "url": variant.get('url', ''),
                "bitrate": variant.get('bitrate') or 0,
            })
    return thumbnail, duration, variants


def _parse_twitter_video(url: str) -> dict:
    """Parse Twitter video using syndication API.

    Returns title, thumbnail, duration, the public ``formats`` list and
    ``_formats_full`` (the same list including the private ``_url`` and
    ``_bitrate`` keys).

    Raises:
        ValueError: if no tweet id can be extracted or the tweet is restricted.
    """
    tweet_id = _extract_tweet_id(url)
    if not tweet_id:
        raise ValueError(f"Could not extract tweet ID from URL: {url}")

    data = _twitter_syndication_info(tweet_id)
    thumbnail, duration, variants = _twitter_video_variants(data)

    formats = []
    for variant in variants:
        bitrate = variant["bitrate"]
        vid_url = variant["url"]
        # Extract resolution from URL (…/<width>x<height>/…)
        size_match = re.search(r'/(\d+)x(\d+)/', vid_url)
        height = int(size_match.group(2)) if size_match else 0
        formats.append({
            "format_id": f"tw-{variant['index']}",
            "quality": f"{height}p" if height else f"{bitrate // 1000}k",
            "ext": "mp4",
            "filesize": 0,
            "note": f"{bitrate // 1000}kbps" if bitrate else "",
            "_url": vid_url,
            "_bitrate": bitrate,
        })

    # Sort by bitrate descending
    formats.sort(key=lambda x: x.get('_bitrate', 0), reverse=True)
    # Add best option
    formats.insert(0, _best_format_entry())

    return {
        "title": _tweet_title(data),
        "thumbnail": thumbnail,
        "duration": duration,
        "formats": [
            {k: v for k, v in f.items() if not k.startswith('_')} for f in formats
        ],
        "url": url,
        "_formats_full": formats,  # Keep full info for download
    }


def _download_twitter_video(url: str, format_id: str = "best",
                            progress_callback=None,
                            task_id: Optional[str] = None) -> dict:
    """Download Twitter video using syndication API.

    Args:
        url: Tweet URL.
        format_id: "best" (highest bitrate) or a ``tw-<index>`` id.
        progress_callback: optional callable receiving an integer percentage;
            only called while the size is known, plus a final ``100``.
        task_id: optional id under which progress is published and the
            cancel flag is looked up.

    Raises:
        ValueError: no tweet id, restricted tweet, or no matching video.
        yt_dlp.utils.DownloadCancelled: the task's cancel flag was set.
    """
    tweet_id = _extract_tweet_id(url)
    if not tweet_id:
        raise ValueError(f"Could not extract tweet ID from URL: {url}")

    data = _twitter_syndication_info(tweet_id)
    thumbnail, duration, variants = _twitter_video_variants(data)

    chosen_url = None
    if format_id == "best":
        # Start below zero so a variant without bitrate is still selectable.
        best_bitrate = -1
        for variant in variants:
            if variant["url"] and variant["bitrate"] > best_bitrate:
                chosen_url = variant["url"]
                best_bitrate = variant["bitrate"]
    else:
        # When several media entries share an index, the last one wins.
        for variant in variants:
            if format_id == f"tw-{variant['index']}":
                chosen_url = variant["url"]

    if not chosen_url:
        raise ValueError("No video found in tweet")

    # Random tag for the file name only; task_id must stay the caller's id,
    # otherwise progress and cancellation would use a key nobody knows.
    file_tag = str(uuid.uuid4())[:8]
    filename = os.path.join(X_VIDEOS_PATH, f"{tweet_id}_{file_tag}.mp4")

    req = urllib.request.Request(chosen_url, headers=dict(_TWITTER_HEADERS))
    try:
        with urllib.request.urlopen(req, timeout=120) as resp, \
                open(filename, 'wb') as out:
            total = int(resp.headers.get('Content-Length', 0))
            downloaded = 0
            while True:
                # Check cancel flag
                flag = _cancel_flags.get(task_id) if task_id else None
                if flag is not None and flag.is_set():
                    raise yt_dlp.utils.DownloadCancelled("Cancelled by user")
                chunk = resp.read(65536)
                if not chunk:
                    break
                out.write(chunk)
                downloaded += len(chunk)
                pct = int(downloaded * 100 / total) if total > 0 else 0
                if task_id:
                    # Stored progress is capped at 99, the same value this
                    # function publishes once the download has finished.
                    _download_progress[task_id] = min(pct, 99)
                if progress_callback and total > 0:
                    progress_callback(pct)
    except BaseException:
        # Do not leave a truncated file behind on cancel or network errors.
        try:
            os.remove(filename)
        except OSError:
            pass
        raise

    if task_id:
        _download_progress[task_id] = 99
    if progress_callback:
        progress_callback(100)

    return {
        "title": _tweet_title(data),
        "thumbnail": thumbnail,
        "duration": duration,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": os.path.getsize(filename),
        "platform": "twitter",
    }


def _parse_youtube_video(url: str) -> dict:
    """Parse YouTube video info using yt-dlp.

    Lists one entry per distinct height, preceded by the "best" entry.
    """
    info = _extract_info_only(url)
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": _collect_formats(info, require_height=True, key_with_ext=False),
        "url": url,
        "platform": "youtube",
    }


def _download_youtube_video(url: str, format_id: str = "best",
                            progress_callback=None,
                            task_id: Optional[str] = None) -> dict:
    """Download YouTube video using yt-dlp.

    *progress_callback* is accepted for signature parity with the Twitter
    downloader but is not invoked; progress is published through *task_id*.
    """
    if format_id == "best":
        format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"
    else:
        format_spec = f"{format_id}+bestaudio/best"
    return _download_with_ytdlp(
        url, format_spec, YOUTUBE_VIDEOS_PATH, "youtube", task_id=task_id
    )


def _parse_pornhub_video(url: str) -> dict:
    """Parse Pornhub video info using yt-dlp.

    Lists one entry per distinct height, preceded by the "best" entry.
    """
    info = _extract_info_only(url, {"http_headers": _PH_HEADERS})
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": _collect_formats(info, require_height=True, key_with_ext=False),
        "url": url,
        "platform": "pornhub",
    }


def _download_pornhub_video(url: str, format_id: str = "best",
                            progress_callback=None,
                            task_id: Optional[str] = None) -> dict:
    """Download Pornhub video using yt-dlp.

    *progress_callback* is accepted for signature parity with the Twitter
    downloader but is not invoked; progress is published through *task_id*.
    """
    if format_id == "best":
        # Prefer mp4 with audio; fall back to best available
        format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio/best[ext=mp4]/best"
    else:
        # The format may already contain audio (merged); try with audio
        # first, then the bare format, then anything.
        format_spec = f"{format_id}+bestaudio/{format_id}/best"
    return _download_with_ytdlp(
        url, format_spec, PH_VIDEOS_PATH, "pornhub", task_id=task_id,
        extra_opts={"http_headers": _PH_HEADERS},
    )


def parse_video_url(url: str) -> dict:
    """Extract video info without downloading.

    Twitter/X URLs go through the syndication API first and fall back to
    generic yt-dlp extraction on any failure except restricted content,
    which is re-raised as ``ValueError``. YouTube and Pornhub URLs use their
    dedicated parsers; everything else uses generic yt-dlp extraction.
    """
    # Use syndication API for Twitter/X URLs
    if _is_twitter_url(url):
        logger.info("Using Twitter syndication API for: %s", url)
        try:
            result = _parse_twitter_video(url)
        except ValueError as e:
            error_msg = str(e)
            # If it's restricted content error, don't fallback to yt-dlp
            if _RESTRICTED_MARKER in error_msg:
                logger.error("Twitter content restricted: %s", error_msg)
                raise
            # For other errors, fallback to yt-dlp
            logger.warning("Twitter syndication failed, falling back to yt-dlp: %s", e)
        except Exception as e:
            logger.warning("Twitter syndication failed, falling back to yt-dlp: %s", e)
        else:
            result.pop('_formats_full', None)
            return result

    # YouTube URLs
    if _is_youtube_url(url):
        logger.info("Parsing YouTube video: %s", url)
        return _parse_youtube_video(url)

    # Pornhub URLs
    if _is_pornhub_url(url):
        logger.info("Parsing Pornhub video: %s", url)
        return _parse_pornhub_video(url)

    # Fallback to generic yt-dlp
    info = _extract_info_only(url)
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": _collect_formats(info, require_height=False, key_with_ext=True),
        "url": url,
    }


def download_video(url: str, format_id: str = "best", progress_callback=None,
                   task_id: Optional[str] = None) -> dict:
    """Download video and return file info.

    Routing mirrors ``parse_video_url``. A cancellation raised by the Twitter
    downloader is propagated instead of being treated as a failure that
    triggers the yt-dlp fallback.

    Returns a dict with title, thumbnail, duration, filename, file_path,
    file_size and platform.
    """
    # Use syndication API for Twitter/X URLs
    if _is_twitter_url(url):
        logger.info("Using Twitter syndication API for download: %s", url)
        try:
            return _download_twitter_video(
                url, format_id, progress_callback, task_id=task_id
            )
        except yt_dlp.utils.DownloadCancelled:
            # A user cancel must not start a second download via yt-dlp.
            raise
        except ValueError as e:
            error_msg = str(e)
            # If it's restricted content error, don't fallback to yt-dlp
            if _RESTRICTED_MARKER in error_msg:
                logger.error("Twitter content restricted: %s", error_msg)
                raise
            # For other errors, fallback to yt-dlp
            logger.warning(
                "Twitter syndication download failed, falling back to yt-dlp: %s", e
            )
        except Exception as e:
            logger.warning(
                "Twitter syndication download failed, falling back to yt-dlp: %s", e
            )

    # YouTube URLs
    if _is_youtube_url(url):
        logger.info("Downloading YouTube video: %s", url)
        return _download_youtube_video(url, format_id, progress_callback, task_id=task_id)

    # Pornhub URLs
    if _is_pornhub_url(url):
        logger.info("Downloading Pornhub video: %s", url)
        return _download_pornhub_video(url, format_id, progress_callback, task_id=task_id)

    # Generic yt-dlp download; files land in the X/Twitter directory.
    if format_id == "best":
        format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"
    else:
        format_spec = f"{format_id}+bestaudio/best"
    return _download_with_ytdlp(
        url, format_spec, X_VIDEOS_PATH, detect_platform(url), task_id=task_id
    )