diff --git a/backend/app/services/downloader.py b/backend/app/services/downloader.py
index 5474eb8..72a9ea0 100644
--- a/backend/app/services/downloader.py
+++ b/backend/app/services/downloader.py
@@ -1,8 +1,11 @@
 """yt-dlp wrapper service for video downloading."""
 import os
+import re
 import uuid
+import json
 import asyncio
 import logging
+import urllib.request
 from pathlib import Path
 from typing import Optional
 import yt_dlp
@@ -15,13 +18,195 @@ X_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "x_videos")
 # Ensure directories exist
 os.makedirs(X_VIDEOS_PATH, exist_ok=True)
 
+# Pattern to match Twitter/X URLs and extract tweet ID
+TWITTER_URL_RE = re.compile(
+    r'https?://(?:(?:www\.)?(?:twitter\.com|x\.com)|[a-z]*twitter\.com)/\w+/status/(\d+)'
+)
+
 
 def get_video_path(filename: str) -> str:
     return os.path.join(X_VIDEOS_PATH, filename)
 
 
+def _is_twitter_url(url: str) -> bool:
+    """Return True for Twitter/X status URLs the syndication path can handle."""
+    return bool(TWITTER_URL_RE.match(url))
+
+
+def _extract_tweet_id(url: str) -> Optional[str]:
+    """Return the numeric tweet ID from a status URL, or None if no match."""
+    m = TWITTER_URL_RE.match(url)
+    return m.group(1) if m else None
+
+
+def _twitter_syndication_info(tweet_id: str) -> dict:
+    """Fetch tweet info via Twitter's syndication API (no auth required)."""
+    api_url = f'https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}&token=x'
+    req = urllib.request.Request(api_url, headers={
+        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
+    })
+    # Close the HTTP response deterministically instead of leaking the socket.
+    with urllib.request.urlopen(req, timeout=15) as resp:
+        return json.loads(resp.read().decode())
+
+
+def _parse_twitter_video(url: str) -> dict:
+    """Parse Twitter video using syndication API."""
+    tweet_id = _extract_tweet_id(url)
+    if not tweet_id:
+        raise ValueError(f"Could not extract tweet ID from URL: {url}")
+
+    data = _twitter_syndication_info(tweet_id)
+    title = data.get('text', 'Untitled')
+    # Truncate title to first line or 100 chars
+    title = title.split('\n')[0][:100]
+
+    thumbnail = ''
+    duration = 0
+    formats = []
+
+    for media in data.get('mediaDetails', []):
+        if media.get('type') != 'video':
+            continue
+        thumbnail = media.get('media_url_https', '')
+        video_info = media.get('video_info', {})
+        duration = (video_info.get('duration_millis', 0) or 0) // 1000
+
+        for i, variant in enumerate(video_info.get('variants', [])):
+            content_type = variant.get('content_type', '')
+            if content_type == 'application/x-mpegURL':
+                continue  # Skip HLS
+            bitrate = variant.get('bitrate') or 0  # API may send an explicit null
+            vid_url = variant.get('url', '')
+            # Extract resolution from URL
+            height_match = re.search(r'/(\d+)x(\d+)/', vid_url)
+            height = int(height_match.group(2)) if height_match else 0
+            quality = f"{height}p" if height else f"{bitrate // 1000}k"
+            formats.append({
+                "format_id": f"tw-{i}",
+                "quality": quality,
+                "ext": "mp4",
+                "filesize": 0,
+                "note": f"{bitrate // 1000}kbps" if bitrate else "",
+                "_url": vid_url,
+                "_bitrate": bitrate,
+            })
+
+    # Sort by bitrate descending
+    formats.sort(key=lambda x: x.get('_bitrate', 0), reverse=True)
+
+    # Add best option
+    formats.insert(0, {
+        "format_id": "best",
+        "quality": "best",
+        "ext": "mp4",
+        "filesize": 0,
+        "note": "Best available quality",
+    })
+
+    return {
+        "title": title,
+        "thumbnail": thumbnail,
+        "duration": duration,
+        "formats": [{k: v for k, v in f.items() if not k.startswith('_')} for f in formats],
+        "url": url,
+        "_formats_full": formats,  # Keep full info for download
+    }
+
+
+def _download_twitter_video(url: str, format_id: str = "best", progress_callback=None) -> dict:
+    """Download Twitter video using syndication API."""
+    tweet_id = _extract_tweet_id(url)
+    if not tweet_id:
+        raise ValueError(f"Could not extract tweet ID from URL: {url}")
+
+    data = _twitter_syndication_info(tweet_id)
+    title = data.get('text', 'Untitled').split('\n')[0][:100]
+    thumbnail = ''
+    duration = 0
+    best_url = None
+    best_bitrate = 0
+
+    for media in data.get('mediaDetails', []):
+        if media.get('type') != 'video':
+            continue
+        thumbnail = media.get('media_url_https', '')
+        video_info = media.get('video_info', {})
+        duration = (video_info.get('duration_millis', 0) or 0) // 1000
+
+        for i, variant in enumerate(video_info.get('variants', [])):
+            if variant.get('content_type') == 'application/x-mpegURL':
+                continue
+            vid_url = variant.get('url', '')
+            bitrate = variant.get('bitrate') or 0  # API may send an explicit null
+
+            if format_id == "best" or format_id == f"tw-{i}":
+                # Also accept a bitrate-less variant when nothing is selected yet.
+                if format_id != "best" or best_url is None or bitrate > best_bitrate:
+                    best_url = vid_url
+                    best_bitrate = bitrate
+                if format_id != "best":
+                    break
+
+    if not best_url:
+        raise ValueError("No video found in tweet")
+
+    # Download the video
+    task_id = str(uuid.uuid4())[:8]
+    filename = os.path.join(X_VIDEOS_PATH, f"{tweet_id}_{task_id}.mp4")
+
+    req = urllib.request.Request(best_url, headers={
+        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
+    })
+    try:
+        # Context managers close the socket and file even if the transfer aborts.
+        with urllib.request.urlopen(req, timeout=120) as resp:
+            total = int(resp.headers.get('Content-Length', 0))
+            downloaded = 0
+            with open(filename, 'wb') as f:
+                while True:
+                    chunk = resp.read(65536)
+                    if not chunk:
+                        break
+                    f.write(chunk)
+                    downloaded += len(chunk)
+                    if progress_callback and total > 0:
+                        progress_callback(int(downloaded * 100 / total))
+    except Exception:
+        # Don't leave a truncated file behind on a failed transfer.
+        if os.path.exists(filename):
+            os.remove(filename)
+        raise
+
+    if progress_callback:
+        progress_callback(100)
+
+    file_size = os.path.getsize(filename)
+
+    return {
+        "title": title,
+        "thumbnail": thumbnail,
+        "duration": duration,
+        "filename": os.path.basename(filename),
+        "file_path": filename,
+        "file_size": file_size,
+        "platform": "twitter",
+    }
+
+
 def parse_video_url(url: str) -> dict:
     """Extract video info without downloading."""
+    # Use syndication API for Twitter/X URLs
+    if _is_twitter_url(url):
+        logger.info(f"Using Twitter syndication API for: {url}")
+        try:
+            result = _parse_twitter_video(url)
+            # Remove internal keys before returning
+            result.pop('_formats_full', None)
+            return result
+        except Exception as e:
+            logger.warning(f"Twitter syndication failed, falling back to yt-dlp: {e}")
+
     ydl_opts = {
         "quiet": True,
         "no_warnings": True,
@@ -76,6 +261,14 @@ def parse_video_url(url: str) -> dict:
 
 def download_video(url: str, format_id: str = "best", progress_callback=None) -> dict:
     """Download video and return file info."""
+    # Use syndication API for Twitter/X URLs
+    if _is_twitter_url(url):
+        logger.info(f"Using Twitter syndication API for download: {url}")
+        try:
+            return _download_twitter_video(url, format_id, progress_callback)
+        except Exception as e:
+            logger.warning(f"Twitter syndication download failed, falling back to yt-dlp: {e}")
+
     task_id = str(uuid.uuid4())[:8]
     output_template = os.path.join(X_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s")