"""yt-dlp wrapper service for video downloading.""" import os import re import uuid import json import asyncio import logging import urllib.request from pathlib import Path from typing import Optional import yt_dlp logger = logging.getLogger(__name__) VIDEO_BASE_PATH = os.getenv("VIDEO_BASE_PATH", "/home/xdl/xdl_videos") X_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "x_videos") YOUTUBE_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "youtube_videos") # Ensure directories exist os.makedirs(X_VIDEOS_PATH, exist_ok=True) os.makedirs(YOUTUBE_VIDEOS_PATH, exist_ok=True) # Pattern to match YouTube URLs YOUTUBE_URL_RE = re.compile( r'https?://(?:(?:www\.|m\.)?youtube\.com/(?:watch\?.*v=|shorts/|embed/|v/)|youtu\.be/)[\w-]+' ) # Pattern to match Twitter/X URLs and extract tweet ID TWITTER_URL_RE = re.compile( r'https?://(?:(?:www\.)?(?:twitter\.com|x\.com)|[a-z]*twitter\.com)/\w+/status/(\d+)' ) def get_video_path(filename: str, platform: str = "twitter") -> str: if platform == "youtube": return os.path.join(YOUTUBE_VIDEOS_PATH, filename) return os.path.join(X_VIDEOS_PATH, filename) def _is_youtube_url(url: str) -> bool: return bool(YOUTUBE_URL_RE.match(url)) def _is_twitter_url(url: str) -> bool: return bool(TWITTER_URL_RE.match(url)) def _extract_tweet_id(url: str) -> Optional[str]: m = TWITTER_URL_RE.match(url) return m.group(1) if m else None def _twitter_syndication_info(tweet_id: str) -> dict: """Fetch tweet info via Twitter's syndication API (no auth required).""" api_url = f'https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}&token=x' req = urllib.request.Request(api_url, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) resp = urllib.request.urlopen(req, timeout=15) return json.loads(resp.read().decode()) def _parse_twitter_video(url: str) -> dict: """Parse Twitter video using syndication API.""" tweet_id = _extract_tweet_id(url) if not tweet_id: raise ValueError(f"Could not extract tweet ID from URL: {url}") data = _twitter_syndication_info(tweet_id) title = data.get('text', 'Untitled') # Truncate title to first line or 100 chars title = title.split('\n')[0][:100] thumbnail = '' duration = 0 formats = [] for media in data.get('mediaDetails', []): if media.get('type') != 'video': continue thumbnail = media.get('media_url_https', '') video_info = media.get('video_info', {}) duration = (video_info.get('duration_millis', 0) or 0) // 1000 for i, variant in enumerate(video_info.get('variants', [])): content_type = variant.get('content_type', '') if content_type == 'application/x-mpegURL': continue # Skip HLS bitrate = variant.get('bitrate', 0) vid_url = variant.get('url', '') # Extract resolution from URL height_match = re.search(r'/(\d+)x(\d+)/', vid_url) height = int(height_match.group(2)) if height_match else 0 quality = f"{height}p" if height else f"{bitrate // 1000}k" formats.append({ "format_id": f"tw-{i}", "quality": quality, "ext": "mp4", "filesize": 0, "note": f"{bitrate // 1000}kbps" if bitrate else "", "_url": vid_url, "_bitrate": bitrate, }) # Sort by bitrate descending formats.sort(key=lambda x: x.get('_bitrate', 0), reverse=True) # Add best option formats.insert(0, { "format_id": "best", "quality": "best", "ext": "mp4", "filesize": 0, "note": "Best available quality", }) return { "title": title, "thumbnail": thumbnail, "duration": duration, "formats": [{k: v for k, v in f.items() if not k.startswith('_')} for f in formats], "url": url, "_formats_full": formats, # Keep full info for download } def _download_twitter_video(url: str, format_id: str = "best", progress_callback=None) -> dict: """Download Twitter video using syndication API.""" tweet_id = _extract_tweet_id(url) if not tweet_id: raise ValueError(f"Could not extract tweet ID from URL: {url}") data = _twitter_syndication_info(tweet_id) title = data.get('text', 'Untitled').split('\n')[0][:100] thumbnail = '' duration = 0 best_url = None best_bitrate = 0 for media in data.get('mediaDetails', []): if media.get('type') != 'video': continue thumbnail = media.get('media_url_https', '') video_info = media.get('video_info', {}) duration = (video_info.get('duration_millis', 0) or 0) // 1000 for i, variant in enumerate(video_info.get('variants', [])): if variant.get('content_type') == 'application/x-mpegURL': continue vid_url = variant.get('url', '') bitrate = variant.get('bitrate', 0) if format_id == "best" or format_id == f"tw-{i}": if format_id != "best" or bitrate > best_bitrate: best_url = vid_url best_bitrate = bitrate if format_id != "best": break if not best_url: raise ValueError("No video found in tweet") # Download the video task_id = str(uuid.uuid4())[:8] filename = os.path.join(X_VIDEOS_PATH, f"{tweet_id}_{task_id}.mp4") req = urllib.request.Request(best_url, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) resp = urllib.request.urlopen(req, timeout=120) total = int(resp.headers.get('Content-Length', 0)) downloaded = 0 with open(filename, 'wb') as f: while True: chunk = resp.read(65536) if not chunk: break f.write(chunk) downloaded += len(chunk) if progress_callback and total > 0: progress_callback(int(downloaded * 100 / total)) if progress_callback: progress_callback(100) file_size = os.path.getsize(filename) return { "title": title, "thumbnail": thumbnail, "duration": duration, "filename": os.path.basename(filename), "file_path": filename, "file_size": file_size, "platform": "twitter", } def _parse_youtube_video(url: str) -> dict: """Parse YouTube video info using yt-dlp.""" ydl_opts = { "quiet": True, "no_warnings": True, "extract_flat": False, "skip_download": True, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=False) formats = [] seen = set() for f in info.get("formats", []): if f.get("vcodec", "none") == "none": continue height = f.get("height", 0) if not height: continue ext = f.get("ext", "mp4") fmt_id = f.get("format_id", "") quality = f"{height}p" key = f"{quality}" if key in seen: continue seen.add(key) formats.append({ "format_id": fmt_id, "quality": quality, "ext": ext, "filesize": f.get("filesize") or f.get("filesize_approx") or 0, "note": f.get("format_note", ""), }) formats.sort(key=lambda x: int(x["quality"].replace("p", "")), reverse=True) formats.insert(0, { "format_id": "best", "quality": "best", "ext": "mp4", "filesize": 0, "note": "Best available quality", }) return { "title": info.get("title", "Untitled"), "thumbnail": info.get("thumbnail", ""), "duration": info.get("duration", 0) or 0, "formats": formats, "url": url, "platform": "youtube", } def _download_youtube_video(url: str, format_id: str = "best", progress_callback=None) -> dict: """Download YouTube video using yt-dlp.""" task_id = str(uuid.uuid4())[:8] output_template = os.path.join(YOUTUBE_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s") if format_id == "best": format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best" else: format_spec = f"{format_id}+bestaudio/best" def hook(d): if d["status"] == "downloading" and progress_callback: total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 downloaded = d.get("downloaded_bytes", 0) pct = int(downloaded * 100 / total) if total > 0 else 0 progress_callback(pct) elif d["status"] == "finished" and progress_callback: progress_callback(100) ydl_opts = { "format": format_spec, "outtmpl": output_template, "merge_output_format": "mp4", "quiet": True, "no_warnings": True, "progress_hooks": [hook], } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=True) filename = ydl.prepare_filename(info) if not os.path.exists(filename): base = os.path.splitext(filename)[0] filename = base + ".mp4" file_size = os.path.getsize(filename) if os.path.exists(filename) else 0 return { "title": info.get("title", "Untitled"), "thumbnail": info.get("thumbnail", ""), "duration": info.get("duration", 0) or 0, "filename": os.path.basename(filename), "file_path": filename, "file_size": file_size, "platform": "youtube", } def parse_video_url(url: str) -> dict: """Extract video info without downloading.""" # Use syndication API for Twitter/X URLs if _is_twitter_url(url): logger.info(f"Using Twitter syndication API for: {url}") try: result = _parse_twitter_video(url) result.pop('_formats_full', None) return result except Exception as e: logger.warning(f"Twitter syndication failed, falling back to yt-dlp: {e}") # YouTube URLs if _is_youtube_url(url): logger.info(f"Parsing YouTube video: {url}") return _parse_youtube_video(url) # Fallback to generic yt-dlp ydl_opts = { "quiet": True, "no_warnings": True, "extract_flat": False, "skip_download": True, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=False) formats = [] seen = set() for f in info.get("formats", []): if f.get("vcodec", "none") == "none": continue height = f.get("height", 0) ext = f.get("ext", "mp4") fmt_id = f.get("format_id", "") quality = f"{height}p" if height else f.get("format_note", "unknown") key = f"{quality}-{ext}" if key in seen: continue seen.add(key) formats.append({ "format_id": fmt_id, "quality": quality, "ext": ext, "filesize": f.get("filesize") or f.get("filesize_approx") or 0, "note": f.get("format_note", ""), }) formats.sort(key=lambda x: int(x["quality"].replace("p", "")) if x["quality"].endswith("p") else 0, reverse=True) formats.insert(0, { "format_id": "best", "quality": "best", "ext": "mp4", "filesize": 0, "note": "Best available quality", }) return { "title": info.get("title", "Untitled"), "thumbnail": info.get("thumbnail", ""), "duration": info.get("duration", 0) or 0, "formats": formats, "url": url, } def download_video(url: str, format_id: str = "best", progress_callback=None) -> dict: """Download video and return file info.""" # Use syndication API for Twitter/X URLs if _is_twitter_url(url): logger.info(f"Using Twitter syndication API for download: {url}") try: return _download_twitter_video(url, format_id, progress_callback) except Exception as e: logger.warning(f"Twitter syndication download failed, falling back to yt-dlp: {e}") # YouTube URLs if _is_youtube_url(url): logger.info(f"Downloading YouTube video: {url}") return _download_youtube_video(url, format_id, progress_callback) task_id = str(uuid.uuid4())[:8] output_template = os.path.join(X_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s") format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best" if format_id == "best" else f"{format_id}+bestaudio/best" def hook(d): if d["status"] == "downloading" and progress_callback: total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 downloaded = d.get("downloaded_bytes", 0) pct = int(downloaded * 100 / total) if total > 0 else 0 progress_callback(pct) elif d["status"] == "finished" and progress_callback: progress_callback(100) ydl_opts = { "format": format_spec, "outtmpl": output_template, "merge_output_format": "mp4", "quiet": True, "no_warnings": True, "progress_hooks": [hook], } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=True) filename = ydl.prepare_filename(info) # yt-dlp may change extension after merge if not os.path.exists(filename): base = os.path.splitext(filename)[0] filename = base + ".mp4" file_size = os.path.getsize(filename) if os.path.exists(filename) else 0 return { "title": info.get("title", "Untitled"), "thumbnail": info.get("thumbnail", ""), "duration": info.get("duration", 0) or 0, "filename": os.path.basename(filename), "file_path": filename, "file_size": file_size, "platform": "twitter", }