feat: add YouTube video download support

This commit is contained in:
mini
2026-02-18 21:00:17 +08:00
parent 958bdcd100
commit d3eed07e44
4 changed files with 144 additions and 20 deletions

View File

@@ -16,6 +16,7 @@ async def parse_url(req: ParseRequest):
duration=info["duration"], duration=info["duration"],
formats=[FormatInfo(**f) for f in info["formats"]], formats=[FormatInfo(**f) for f in info["formats"]],
url=info["url"], url=info["url"],
platform=info.get("platform", "twitter"),
) )
except Exception as e: except Exception as e:
raise HTTPException(status_code=400, detail=f"Failed to parse URL: {str(e)}") raise HTTPException(status_code=400, detail=f"Failed to parse URL: {str(e)}")

View File

@@ -22,6 +22,7 @@ class ParseResponse(BaseModel):
duration: int duration: int
formats: list[FormatInfo] formats: list[FormatInfo]
url: str url: str
platform: str = ""
class DownloadRequest(BaseModel): class DownloadRequest(BaseModel):

View File

@@ -14,9 +14,16 @@ logger = logging.getLogger(__name__)
VIDEO_BASE_PATH = os.getenv("VIDEO_BASE_PATH", "/home/xdl/xdl_videos") VIDEO_BASE_PATH = os.getenv("VIDEO_BASE_PATH", "/home/xdl/xdl_videos")
X_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "x_videos") X_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "x_videos")
YOUTUBE_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "youtube_videos")
# Ensure directories exist # Ensure directories exist
os.makedirs(X_VIDEOS_PATH, exist_ok=True) os.makedirs(X_VIDEOS_PATH, exist_ok=True)
os.makedirs(YOUTUBE_VIDEOS_PATH, exist_ok=True)
# Pattern to match YouTube URLs
YOUTUBE_URL_RE = re.compile(
r'https?://(?:(?:www\.|m\.)?youtube\.com/(?:watch\?.*v=|shorts/|embed/|v/)|youtu\.be/)[\w-]+'
)
# Pattern to match Twitter/X URLs and extract tweet ID # Pattern to match Twitter/X URLs and extract tweet ID
TWITTER_URL_RE = re.compile( TWITTER_URL_RE = re.compile(
@@ -24,10 +31,16 @@ TWITTER_URL_RE = re.compile(
) )
def get_video_path(filename: str) -> str: def get_video_path(filename: str, platform: str = "twitter") -> str:
if platform == "youtube":
return os.path.join(YOUTUBE_VIDEOS_PATH, filename)
return os.path.join(X_VIDEOS_PATH, filename) return os.path.join(X_VIDEOS_PATH, filename)
def _is_youtube_url(url: str) -> bool:
return bool(YOUTUBE_URL_RE.match(url))
def _is_twitter_url(url: str) -> bool: def _is_twitter_url(url: str) -> bool:
return bool(TWITTER_URL_RE.match(url)) return bool(TWITTER_URL_RE.match(url))
@@ -184,19 +197,126 @@ def _download_twitter_video(url: str, format_id: str = "best", progress_callback
} }
def parse_video_url(url: str) -> dict: def _parse_youtube_video(url: str) -> dict:
"""Extract video info without downloading.""" """Parse YouTube video info using yt-dlp."""
# Use syndication API for Twitter/X URLs ydl_opts = {
if _is_twitter_url(url): "quiet": True,
logger.info(f"Using Twitter syndication API for: {url}") "no_warnings": True,
try: "extract_flat": False,
result = _parse_twitter_video(url) "skip_download": True,
# Remove internal keys before returning }
result.pop('_formats_full', None) with yt_dlp.YoutubeDL(ydl_opts) as ydl:
return result info = ydl.extract_info(url, download=False)
except Exception as e:
logger.warning(f"Twitter syndication failed, falling back to yt-dlp: {e}") formats = []
seen = set()
for f in info.get("formats", []):
if f.get("vcodec", "none") == "none":
continue
height = f.get("height", 0)
if not height:
continue
ext = f.get("ext", "mp4")
fmt_id = f.get("format_id", "")
quality = f"{height}p"
key = f"{quality}"
if key in seen:
continue
seen.add(key)
formats.append({
"format_id": fmt_id,
"quality": quality,
"ext": ext,
"filesize": f.get("filesize") or f.get("filesize_approx") or 0,
"note": f.get("format_note", ""),
})
formats.sort(key=lambda x: int(x["quality"].replace("p", "")), reverse=True)
formats.insert(0, {
"format_id": "best",
"quality": "best",
"ext": "mp4",
"filesize": 0,
"note": "Best available quality",
})
return {
"title": info.get("title", "Untitled"),
"thumbnail": info.get("thumbnail", ""),
"duration": info.get("duration", 0) or 0,
"formats": formats,
"url": url,
"platform": "youtube",
}
def _download_youtube_video(url: str, format_id: str = "best", progress_callback=None) -> dict:
"""Download YouTube video using yt-dlp."""
task_id = str(uuid.uuid4())[:8]
output_template = os.path.join(YOUTUBE_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s")
if format_id == "best":
format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"
else:
format_spec = f"{format_id}+bestaudio/best"
def hook(d):
if d["status"] == "downloading" and progress_callback:
total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
downloaded = d.get("downloaded_bytes", 0)
pct = int(downloaded * 100 / total) if total > 0 else 0
progress_callback(pct)
elif d["status"] == "finished" and progress_callback:
progress_callback(100)
ydl_opts = {
"format": format_spec,
"outtmpl": output_template,
"merge_output_format": "mp4",
"quiet": True,
"no_warnings": True,
"progress_hooks": [hook],
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
filename = ydl.prepare_filename(info)
if not os.path.exists(filename):
base = os.path.splitext(filename)[0]
filename = base + ".mp4"
file_size = os.path.getsize(filename) if os.path.exists(filename) else 0
return {
"title": info.get("title", "Untitled"),
"thumbnail": info.get("thumbnail", ""),
"duration": info.get("duration", 0) or 0,
"filename": os.path.basename(filename),
"file_path": filename,
"file_size": file_size,
"platform": "youtube",
}
def parse_video_url(url: str) -> dict:
"""Extract video info without downloading."""
# Use syndication API for Twitter/X URLs
if _is_twitter_url(url):
logger.info(f"Using Twitter syndication API for: {url}")
try:
result = _parse_twitter_video(url)
result.pop('_formats_full', None)
return result
except Exception as e:
logger.warning(f"Twitter syndication failed, falling back to yt-dlp: {e}")
# YouTube URLs
if _is_youtube_url(url):
logger.info(f"Parsing YouTube video: {url}")
return _parse_youtube_video(url)
# Fallback to generic yt-dlp
ydl_opts = { ydl_opts = {
"quiet": True, "quiet": True,
"no_warnings": True, "no_warnings": True,
@@ -209,7 +329,6 @@ def parse_video_url(url: str) -> dict:
formats = [] formats = []
seen = set() seen = set()
for f in info.get("formats", []): for f in info.get("formats", []):
# Only video formats with both video and audio, or video-only
if f.get("vcodec", "none") == "none": if f.get("vcodec", "none") == "none":
continue continue
height = f.get("height", 0) height = f.get("height", 0)
@@ -228,10 +347,8 @@ def parse_video_url(url: str) -> dict:
"note": f.get("format_note", ""), "note": f.get("format_note", ""),
}) })
# Sort by resolution descending
formats.sort(key=lambda x: int(x["quality"].replace("p", "")) if x["quality"].endswith("p") else 0, reverse=True) formats.sort(key=lambda x: int(x["quality"].replace("p", "")) if x["quality"].endswith("p") else 0, reverse=True)
# Add a "best" option
formats.insert(0, { formats.insert(0, {
"format_id": "best", "format_id": "best",
"quality": "best", "quality": "best",
@@ -259,6 +376,11 @@ def download_video(url: str, format_id: str = "best", progress_callback=None) ->
except Exception as e: except Exception as e:
logger.warning(f"Twitter syndication download failed, falling back to yt-dlp: {e}") logger.warning(f"Twitter syndication download failed, falling back to yt-dlp: {e}")
# YouTube URLs
if _is_youtube_url(url):
logger.info(f"Downloading YouTube video: {url}")
return _download_youtube_video(url, format_id, progress_callback)
task_id = str(uuid.uuid4())[:8] task_id = str(uuid.uuid4())[:8]
output_template = os.path.join(X_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s") output_template = os.path.join(X_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s")

View File

@@ -1,10 +1,10 @@
<template> <template>
<div class="home"> <div class="home">
<h1>Twitter/X Video Downloader</h1> <h1>Video Downloader</h1>
<p class="subtitle">Paste a Twitter/X video link to download</p> <p class="subtitle">Paste a Twitter/X or YouTube video link to download</p>
<div class="input-group"> <div class="input-group">
<input v-model="url" placeholder="https://x.com/user/status/123..." @keyup.enter="parseUrl" :disabled="loading" /> <input v-model="url" placeholder="https://x.com/... or https://youtube.com/watch?v=..." @keyup.enter="parseUrl" :disabled="loading" />
<button @click="parseUrl" :disabled="loading || !url.trim()"> <button @click="parseUrl" :disabled="loading || !url.trim()">
{{ loading ? '⏳ Parsing...' : '🔍 Parse' }} {{ loading ? '⏳ Parsing...' : '🔍 Parse' }}
</button> </button>