From d3eed07e448adc89258e4d6b9a8890e7fd191e75 Mon Sep 17 00:00:00 2001
From: mini
Date: Wed, 18 Feb 2026 21:00:17 +0800
Subject: [PATCH] feat: add YouTube video download support

Add a YouTube code path next to the existing Twitter/X one.

downloader.py:
- YOUTUBE_URL_RE recognises youtube.com watch/shorts/embed/v links
  (with optional www./m. prefix) and youtu.be short links.
- _parse_youtube_video() extracts metadata with yt-dlp and returns one
  format entry per video height plus a synthetic "best" entry. The
  yt-dlp format list is walked best-first so that the per-height
  de-duplication keeps the best variant of each resolution.
- _download_youtube_video() downloads into VIDEO_BASE_PATH/youtube_videos
  and merges to mp4. The reported progress percentage is clamped to 100
  because the size estimate can be smaller than the bytes downloaded.
- parse_video_url() and download_video() try Twitter/X first, then
  YouTube, then the existing generic yt-dlp path.
- get_video_path() takes an optional platform argument (default
  "twitter") to select the storage directory.

schemas.py / routes/parse.py:
- ParseResponse gains a platform field; the parse route fills it from
  the parser result and falls back to "twitter" when the key is absent.
---
 backend/app/routes/parse.py        |   1 +
 backend/app/schemas.py             |   1 +
 backend/app/services/downloader.py | 156 +++++++++++++++++++++++++----
 frontend/src/views/Home.vue        |   6 +-
 4 files changed, 144 insertions(+), 20 deletions(-)

diff --git a/backend/app/routes/parse.py b/backend/app/routes/parse.py
index aeabd60..730e53e 100644
--- a/backend/app/routes/parse.py
+++ b/backend/app/routes/parse.py
@@ -16,6 +16,7 @@ async def parse_url(req: ParseRequest):
             duration=info["duration"],
             formats=[FormatInfo(**f) for f in info["formats"]],
             url=info["url"],
+            platform=info.get("platform", "twitter"),
         )
     except Exception as e:
         raise HTTPException(status_code=400, detail=f"Failed to parse URL: {str(e)}")
diff --git a/backend/app/schemas.py b/backend/app/schemas.py
index 1b991e0..8152d4b 100644
--- a/backend/app/schemas.py
+++ b/backend/app/schemas.py
@@ -22,6 +22,7 @@ class ParseResponse(BaseModel):
     duration: int
     formats: list[FormatInfo]
     url: str
+    platform: str = ""
 
 
 class DownloadRequest(BaseModel):
diff --git a/backend/app/services/downloader.py b/backend/app/services/downloader.py
index 72a9ea0..a291426 100644
--- a/backend/app/services/downloader.py
+++ b/backend/app/services/downloader.py
@@ -14,9 +14,16 @@ logger = logging.getLogger(__name__)
 
 VIDEO_BASE_PATH = os.getenv("VIDEO_BASE_PATH", "/home/xdl/xdl_videos")
 X_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "x_videos")
+YOUTUBE_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "youtube_videos")
 
 # Ensure directories exist
 os.makedirs(X_VIDEOS_PATH, exist_ok=True)
+os.makedirs(YOUTUBE_VIDEOS_PATH, exist_ok=True)
+
+# Pattern to match YouTube URLs
+YOUTUBE_URL_RE = re.compile(
+    r'https?://(?:(?:www\.|m\.)?youtube\.com/(?:watch\?.*v=|shorts/|embed/|v/)|youtu\.be/)[\w-]+'
+)
 
 # Pattern to match Twitter/X URLs and extract tweet ID
 TWITTER_URL_RE = re.compile(
@@ -24,10 +31,16 @@ TWITTER_URL_RE = re.compile(
 )
 
 
-def get_video_path(filename: str) -> str:
+def get_video_path(filename: str, platform: str = "twitter") -> str:
+    if platform == "youtube":
+        return os.path.join(YOUTUBE_VIDEOS_PATH, filename)
     return os.path.join(X_VIDEOS_PATH, filename)
 
 
+def _is_youtube_url(url: str) -> bool:
+    return bool(YOUTUBE_URL_RE.match(url))
+
+
 def _is_twitter_url(url: str) -> bool:
     return bool(TWITTER_URL_RE.match(url))
 
@@ -184,19 +197,126 @@ def _download_twitter_video(url: str, format_id: str = "best", progress_callback
     }
 
 
-def parse_video_url(url: str) -> dict:
-    """Extract video info without downloading."""
-    # Use syndication API for Twitter/X URLs
-    if _is_twitter_url(url):
-        logger.info(f"Using Twitter syndication API for: {url}")
-        try:
-            result = _parse_twitter_video(url)
-            # Remove internal keys before returning
-            result.pop('_formats_full', None)
-            return result
-        except Exception as e:
-            logger.warning(f"Twitter syndication failed, falling back to yt-dlp: {e}")
-
+def _parse_youtube_video(url: str) -> dict:
+    """Parse YouTube video info using yt-dlp."""
+    ydl_opts = {
+        "quiet": True,
+        "no_warnings": True,
+        "extract_flat": False,
+        "skip_download": True,
+    }
+    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+        info = ydl.extract_info(url, download=False)
+
+    formats = []
+    seen = set()
+    for f in reversed(info.get("formats") or []):  # yt-dlp orders worst -> best; keep the best per height
+        if f.get("vcodec", "none") == "none":
+            continue
+        height = f.get("height", 0)
+        if not height:
+            continue
+        ext = f.get("ext", "mp4")
+        fmt_id = f.get("format_id", "")
+        quality = f"{height}p"
+        key = f"{quality}"
+        if key in seen:
+            continue
+        seen.add(key)
+        formats.append({
+            "format_id": fmt_id,
+            "quality": quality,
+            "ext": ext,
+            "filesize": f.get("filesize") or f.get("filesize_approx") or 0,
+            "note": f.get("format_note", ""),
+        })
+
+    formats.sort(key=lambda x: int(x["quality"].replace("p", "")), reverse=True)
+
+    formats.insert(0, {
+        "format_id": "best",
+        "quality": "best",
+        "ext": "mp4",
+        "filesize": 0,
+        "note": "Best available quality",
+    })
+
+    return {
+        "title": info.get("title", "Untitled"),
+        "thumbnail": info.get("thumbnail", ""),
+        "duration": info.get("duration", 0) or 0,
+        "formats": formats,
+        "url": url,
+        "platform": "youtube",
+    }
+
+
+def _download_youtube_video(url: str, format_id: str = "best", progress_callback=None) -> dict:
+    """Download YouTube video using yt-dlp."""
+    task_id = str(uuid.uuid4())[:8]
+    output_template = os.path.join(YOUTUBE_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s")
+
+    if format_id == "best":
+        format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"
+    else:
+        format_spec = f"{format_id}+bestaudio/best"
+
+    def hook(d):
+        if d["status"] == "downloading" and progress_callback:
+            total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
+            downloaded = d.get("downloaded_bytes", 0)
+            pct = min(100, int(downloaded * 100 / total)) if total > 0 else 0  # estimate may undershoot
+            progress_callback(pct)
+        elif d["status"] == "finished" and progress_callback:
+            progress_callback(100)
+
+    ydl_opts = {
+        "format": format_spec,
+        "outtmpl": output_template,
+        "merge_output_format": "mp4",
+        "quiet": True,
+        "no_warnings": True,
+        "progress_hooks": [hook],
+    }
+
+    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+        info = ydl.extract_info(url, download=True)
+        filename = ydl.prepare_filename(info)
+        if not os.path.exists(filename):
+            base = os.path.splitext(filename)[0]
+            filename = base + ".mp4"
+
+    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0
+
+    return {
+        "title": info.get("title", "Untitled"),
+        "thumbnail": info.get("thumbnail", ""),
+        "duration": info.get("duration", 0) or 0,
+        "filename": os.path.basename(filename),
+        "file_path": filename,
+        "file_size": file_size,
+        "platform": "youtube",
+    }
+
+
+def parse_video_url(url: str) -> dict:
+    """Extract video info without downloading."""
+    # Use syndication API for Twitter/X URLs
+    if _is_twitter_url(url):
+        logger.info(f"Using Twitter syndication API for: {url}")
+        try:
+            result = _parse_twitter_video(url)
+            result.pop('_formats_full', None)
+            return result
+        except Exception as e:
+            logger.warning(f"Twitter syndication failed, falling back to yt-dlp: {e}")
+
+    # YouTube URLs
+    if _is_youtube_url(url):
+        logger.info(f"Parsing YouTube video: {url}")
+        return _parse_youtube_video(url)
+
+    # Fallback to generic yt-dlp
     ydl_opts = {
         "quiet": True,
         "no_warnings": True,
@@ -209,7 +329,6 @@ def parse_video_url(url: str) -> dict:
     formats = []
     seen = set()
     for f in info.get("formats", []):
-        # Only video formats with both video and audio, or video-only
         if f.get("vcodec", "none") == "none":
             continue
         height = f.get("height", 0)
@@ -228,10 +347,8 @@ def parse_video_url(url: str) -> dict:
             "note": f.get("format_note", ""),
         })
 
-    # Sort by resolution descending
     formats.sort(key=lambda x: int(x["quality"].replace("p", "")) if x["quality"].endswith("p") else 0, reverse=True)
 
-    # Add a "best" option
     formats.insert(0, {
         "format_id": "best",
         "quality": "best",
@@ -259,6 +376,11 @@ def download_video(url: str, format_id: str = "best", progress_callback=None) ->
         except Exception as e:
             logger.warning(f"Twitter syndication download failed, falling back to yt-dlp: {e}")
 
+    # YouTube URLs
+    if _is_youtube_url(url):
+        logger.info(f"Downloading YouTube video: {url}")
+        return _download_youtube_video(url, format_id, progress_callback)
+
     task_id = str(uuid.uuid4())[:8]
     output_template = os.path.join(X_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s")
 
diff --git a/frontend/src/views/Home.vue b/frontend/src/views/Home.vue
index 2145cdf..6f8e879 100644
--- a/frontend/src/views/Home.vue
+++ b/frontend/src/views/Home.vue
@@ -1,10 +1,10 @@