Files
xdl/backend/app/services/downloader.py

561 lines
18 KiB
Python

"""yt-dlp wrapper service for video downloading."""
import os
import re
import uuid
import json
import asyncio
import logging
import urllib.request
from pathlib import Path
from typing import Optional
import yt_dlp
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Root directory for downloaded files; overridable through the
# VIDEO_BASE_PATH environment variable.
VIDEO_BASE_PATH = os.getenv("VIDEO_BASE_PATH", "/home/xdl/xdl_videos")
# One sub-directory per supported platform under the base path.
X_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "x_videos")
YOUTUBE_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "youtube_videos")
PH_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "ph_videos")
# Ensure directories exist. NOTE: this runs at import time, so importing the
# module touches the filesystem.
os.makedirs(X_VIDEOS_PATH, exist_ok=True)
os.makedirs(YOUTUBE_VIDEOS_PATH, exist_ok=True)
os.makedirs(PH_VIDEOS_PATH, exist_ok=True)
# Pattern to match YouTube URLs: watch / shorts / embed / v pages on
# youtube.com (optionally prefixed with www. or m.) and youtu.be short links.
# The watch form accepts any query text before "v=", so URLs that also carry
# other parameters still match.
YOUTUBE_URL_RE = re.compile(
r'https?://(?:(?:www\.|m\.)?youtube\.com/(?:watch\?.*v=|shorts/|embed/|v/)|youtu\.be/)[\w-]+'
)
# Pattern to match Twitter/X URLs and extract tweet ID.
# Accepts twitter.com / x.com (optionally www.) as well as any host of the form
# "<lowercase letters>twitter.com"; group 1 is the numeric status ID.
TWITTER_URL_RE = re.compile(
r'https?://(?:(?:www\.)?(?:twitter\.com|x\.com)|[a-z]*twitter\.com)/\w+/status/(\d+)'
)
# Pattern to match Pornhub URLs: view_video.php?viewkey=, video/ and embed/
# pages on pornhub.com (with at most one subdomain label) and phub.to links.
PORNHUB_URL_RE = re.compile(
r'https?://(?:[\w-]+\.)?pornhub\.com/(?:view_video\.php\?viewkey=|video/|embed/)[\w-]+'
r'|https?://phub\.to/[\w-]+'
)
def get_video_path(filename: str, platform: str = "twitter") -> str:
    """Return the storage path of ``filename`` for the given platform.

    ``"youtube"`` and ``"pornhub"`` map to their dedicated directories; any
    other platform value (including the default ``"twitter"``) maps to the
    X/Twitter directory.

    NOTE(review): ``filename`` is joined as-is; presumably callers pass a bare
    file name rather than a path — confirm.
    """
    base_dir = X_VIDEOS_PATH
    if platform == "youtube":
        base_dir = YOUTUBE_VIDEOS_PATH
    elif platform == "pornhub":
        base_dir = PH_VIDEOS_PATH
    return os.path.join(base_dir, filename)
def _is_youtube_url(url: str) -> bool:
    """Tell whether ``url`` starts with a recognised YouTube video URL."""
    return YOUTUBE_URL_RE.match(url) is not None
def _is_pornhub_url(url: str) -> bool:
    """Tell whether ``url`` starts with a recognised Pornhub video URL."""
    return PORNHUB_URL_RE.match(url) is not None
def _is_twitter_url(url: str) -> bool:
    """Tell whether ``url`` starts with a recognised Twitter/X status URL."""
    return TWITTER_URL_RE.match(url) is not None
def _extract_tweet_id(url: str) -> Optional[str]:
    """Return the numeric status ID captured from a Twitter/X URL.

    Returns ``None`` when ``url`` does not match ``TWITTER_URL_RE``.
    """
    matched = TWITTER_URL_RE.match(url)
    if matched is None:
        return None
    return matched.group(1)
def _twitter_syndication_info(tweet_id: str) -> dict:
    """Fetch tweet info via Twitter's syndication API (no auth required).

    Args:
        tweet_id: Numeric tweet/status ID as a string.

    Returns:
        The decoded JSON payload returned by the endpoint.

    Raises:
        urllib.error.URLError: On network or HTTP failure (15 s timeout).
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    api_url = f'https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}&token=x'
    req = urllib.request.Request(api_url, headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    })
    # Use the response as a context manager so the connection is always
    # released, even when reading or decoding fails.
    with urllib.request.urlopen(req, timeout=15) as resp:
        payload = resp.read()
    return json.loads(payload.decode())
def _parse_twitter_video(url: str) -> dict:
    """Parse Twitter video using syndication API.

    Args:
        url: Tweet URL; the tweet ID is extracted with ``_extract_tweet_id``.

    Returns:
        Dict with ``title`` (first line of the tweet text, max 100 chars),
        ``thumbnail``, ``duration`` (seconds), ``url``, ``formats`` (public
        entries: a synthetic "best" entry followed by the MP4 variants sorted
        by bitrate, highest first) and ``_formats_full`` (the same entries
        including the private ``_url`` / ``_bitrate`` keys).

    Raises:
        ValueError: If no tweet ID can be extracted from ``url``.
    """
    tweet_id = _extract_tweet_id(url)
    if not tweet_id:
        raise ValueError(f"Could not extract tweet ID from URL: {url}")
    data = _twitter_syndication_info(tweet_id)
    # ``or`` (rather than a .get default) also covers a null or empty text.
    # Truncate title to first line or 100 chars.
    title = (data.get('text') or 'Untitled').split('\n')[0][:100]
    thumbnail = ''
    duration = 0
    formats = []
    for media in data.get('mediaDetails') or []:
        if media.get('type') != 'video':
            continue
        thumbnail = media.get('media_url_https', '')
        video_info = media.get('video_info') or {}
        duration = (video_info.get('duration_millis', 0) or 0) // 1000
        # NOTE(review): the index restarts for every media entry, so a tweet
        # with several videos can yield duplicate "tw-<i>" ids.
        for i, variant in enumerate(video_info.get('variants') or []):
            if variant.get('content_type', '') == 'application/x-mpegURL':
                continue  # Skip HLS
            vid_url = variant.get('url') or ''
            if not vid_url:
                continue  # Nothing to download for this variant
            # A missing or null bitrate must not break the arithmetic below.
            bitrate = variant.get('bitrate') or 0
            # Extract resolution from URL (".../<width>x<height>/...")
            height_match = re.search(r'/(\d+)x(\d+)/', vid_url)
            height = int(height_match.group(2)) if height_match else 0
            quality = f"{height}p" if height else f"{bitrate // 1000}k"
            formats.append({
                "format_id": f"tw-{i}",
                "quality": quality,
                "ext": "mp4",
                "filesize": 0,
                "note": f"{bitrate // 1000}kbps" if bitrate else "",
                "_url": vid_url,
                "_bitrate": bitrate,
            })
    # Sort by bitrate descending
    formats.sort(key=lambda x: x.get('_bitrate', 0), reverse=True)
    # Add best option
    formats.insert(0, {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    })
    return {
        "title": title,
        "thumbnail": thumbnail,
        "duration": duration,
        # Public view: strip the underscore-prefixed private keys.
        "formats": [{k: v for k, v in f.items() if not k.startswith('_')} for f in formats],
        "url": url,
        "_formats_full": formats,  # Keep full info for download
    }
def _download_twitter_video(url: str, format_id: str = "best", progress_callback=None) -> dict:
    """Download Twitter video using syndication API.

    Args:
        url: Tweet URL; the tweet ID is extracted with ``_extract_tweet_id``.
        format_id: ``"best"`` for the highest-bitrate MP4 variant, or a
            ``"tw-<index>"`` id as produced by ``_parse_twitter_video``.
        progress_callback: Optional callable receiving an integer percentage;
            always called with 100 once the download completed.

    Returns:
        Dict with ``title``, ``thumbnail``, ``duration``, ``filename``,
        ``file_path``, ``file_size`` and ``platform`` ("twitter").

    Raises:
        ValueError: If the tweet ID cannot be extracted or no matching video
            variant exists.
        OSError: If the transfer fails or ends before ``Content-Length`` bytes
            were received (``urllib.error.URLError`` is a subclass).
    """
    tweet_id = _extract_tweet_id(url)
    if not tweet_id:
        raise ValueError(f"Could not extract tweet ID from URL: {url}")
    data = _twitter_syndication_info(tweet_id)
    # ``or`` also covers a null or empty text field.
    title = (data.get('text') or 'Untitled').split('\n')[0][:100]
    thumbnail = ''
    duration = 0
    best_url = None
    best_bitrate = 0
    for media in data.get('mediaDetails') or []:
        if media.get('type') != 'video':
            continue
        thumbnail = media.get('media_url_https', '')
        video_info = media.get('video_info') or {}
        duration = (video_info.get('duration_millis', 0) or 0) // 1000
        for i, variant in enumerate(video_info.get('variants') or []):
            if variant.get('content_type') == 'application/x-mpegURL':
                continue  # Skip HLS
            vid_url = variant.get('url') or ''
            if not vid_url:
                continue
            # A missing or null bitrate counts as 0 instead of raising.
            bitrate = variant.get('bitrate') or 0
            if format_id == "best":
                # Accept the first usable variant even when its bitrate is 0;
                # otherwise only a strictly higher bitrate replaces it.
                if best_url is None or bitrate > best_bitrate:
                    best_url = vid_url
                    best_bitrate = bitrate
            elif format_id == f"tw-{i}":
                best_url = vid_url
                best_bitrate = bitrate
                break
    if not best_url:
        raise ValueError("No video found in tweet")
    # Download the video
    task_id = str(uuid.uuid4())[:8]
    filename = os.path.join(X_VIDEOS_PATH, f"{tweet_id}_{task_id}.mp4")
    req = urllib.request.Request(best_url, headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    })
    try:
        # Both the HTTP response and the output file are closed on any exit.
        with urllib.request.urlopen(req, timeout=120) as resp, open(filename, 'wb') as f:
            total = int(resp.headers.get('Content-Length', 0))
            downloaded = 0
            while True:
                chunk = resp.read(65536)
                if not chunk:
                    break
                f.write(chunk)
                downloaded += len(chunk)
                if progress_callback and total > 0:
                    progress_callback(int(downloaded * 100 / total))
        # Chunked reads may end silently on a dropped connection, so compare
        # against the announced size when the server provided one.
        if total > 0 and downloaded < total:
            raise OSError(
                f"Incomplete download: received {downloaded} of {total} bytes"
            )
    except BaseException:
        # Do not leave a partial file behind; the caller may retry elsewhere.
        try:
            os.remove(filename)
        except OSError:
            pass
        raise
    if progress_callback:
        progress_callback(100)
    file_size = os.path.getsize(filename)
    return {
        "title": title,
        "thumbnail": thumbnail,
        "duration": duration,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        "platform": "twitter",
    }
def _parse_youtube_video(url: str) -> dict:
    """Parse YouTube video info using yt-dlp.

    Args:
        url: YouTube video URL.

    Returns:
        Dict with ``title``, ``thumbnail``, ``duration``, ``formats``, ``url``
        and ``platform`` ("youtube"). ``formats`` starts with a synthetic
        "best" entry followed by one entry per distinct video height, tallest
        first (the first format reported for each height is kept).
    """
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": False,
        "skip_download": True,
        # A watch URL may also carry a ``list=`` parameter. Without this
        # option yt-dlp resolves the whole playlist and the result carries no
        # "formats" for the requested video.
        "noplaylist": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
    formats = []
    seen = set()
    for f in info.get("formats") or []:
        # Skip audio-only formats.
        if f.get("vcodec", "none") == "none":
            continue
        height = f.get("height", 0)
        if not height:
            continue
        quality = f"{height}p"
        # Offer each resolution only once.
        if quality in seen:
            continue
        seen.add(quality)
        formats.append({
            "format_id": f.get("format_id", ""),
            "quality": quality,
            "ext": f.get("ext", "mp4"),
            "filesize": f.get("filesize") or f.get("filesize_approx") or 0,
            "note": f.get("format_note", ""),
        })
    formats.sort(key=lambda x: int(x["quality"].replace("p", "")), reverse=True)
    formats.insert(0, {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    })
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": formats,
        "url": url,
        "platform": "youtube",
    }
def _download_youtube_video(url: str, format_id: str = "best", progress_callback=None) -> dict:
    """Download YouTube video using yt-dlp.

    Args:
        url: YouTube video URL.
        format_id: ``"best"`` or a yt-dlp format id as listed by
            ``_parse_youtube_video``.
        progress_callback: Optional callable receiving an integer percentage.

    Returns:
        Dict with ``title``, ``thumbnail``, ``duration``, ``filename``,
        ``file_path``, ``file_size`` (0 if the file cannot be found) and
        ``platform`` ("youtube").
    """
    task_id = str(uuid.uuid4())[:8]
    output_template = os.path.join(YOUTUBE_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s")
    if format_id == "best":
        format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"
    else:
        # Try the requested stream merged with audio, then the requested
        # stream on its own, and only then give up and take "best".
        format_spec = f"{format_id}+bestaudio/{format_id}/best"

    def hook(d):
        """Translate yt-dlp progress events into percentage callbacks."""
        if d["status"] == "downloading" and progress_callback:
            total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
            downloaded = d.get("downloaded_bytes", 0)
            pct = int(downloaded * 100 / total) if total > 0 else 0
            progress_callback(pct)
        elif d["status"] == "finished" and progress_callback:
            progress_callback(100)

    ydl_opts = {
        "format": format_spec,
        "outtmpl": output_template,
        "merge_output_format": "mp4",
        "quiet": True,
        "no_warnings": True,
        # A watch URL may carry a ``list=`` parameter; download only the
        # requested video, not the whole playlist.
        "noplaylist": True,
        "progress_hooks": [hook],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
    # yt-dlp may change the extension after merging; fall back to ".mp4".
    if not os.path.exists(filename):
        base = os.path.splitext(filename)[0]
        filename = base + ".mp4"
    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        "platform": "youtube",
    }
# HTTP headers passed to yt-dlp (as "http_headers") by the Pornhub parse and
# download helpers: a desktop-browser User-Agent plus a site Referer.
_PH_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Referer": "https://www.pornhub.com/",
}
def _parse_pornhub_video(url: str) -> dict:
    """Collect metadata and selectable formats for a Pornhub video via yt-dlp.

    The returned ``formats`` list starts with a synthetic "best" entry and is
    followed by one entry per distinct video height, tallest first; the first
    format reported for a given height is the one kept.
    """
    options = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": False,
        "skip_download": True,
        "http_headers": _PH_HEADERS,
    }
    with yt_dlp.YoutubeDL(options) as ydl:
        info = ydl.extract_info(url, download=False)

    # Insertion-ordered mapping: quality label -> first format seen for it.
    by_quality = {}
    for fmt in info.get("formats", []):
        has_video = fmt.get("vcodec", "none") != "none"
        height = fmt.get("height", 0)
        if not (has_video and height):
            continue
        label = f"{height}p"
        if label not in by_quality:
            by_quality[label] = {
                "format_id": fmt.get("format_id", ""),
                "quality": label,
                "ext": fmt.get("ext", "mp4"),
                "filesize": fmt.get("filesize") or fmt.get("filesize_approx") or 0,
                "note": fmt.get("format_note", ""),
            }

    ranked = sorted(
        by_quality.values(),
        key=lambda entry: int(entry["quality"].replace("p", "")),
        reverse=True,
    )
    best_entry = {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    }
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": [best_entry] + ranked,
        "url": url,
        "platform": "pornhub",
    }
def _download_pornhub_video(url: str, format_id: str = "best", progress_callback=None) -> dict:
    """Fetch a Pornhub video with yt-dlp and describe the resulting file.

    ``progress_callback``, when given, receives integer percentages while the
    download runs and 100 when yt-dlp reports a finished file. The returned
    dict carries ``title``, ``thumbnail``, ``duration``, ``filename``,
    ``file_path``, ``file_size`` (0 when the file is not found) and
    ``platform`` ("pornhub").
    """
    suffix = str(uuid.uuid4())[:8]
    out_pattern = os.path.join(PH_VIDEOS_PATH, f"%(id)s_{suffix}.%(ext)s")
    # "best": prefer mp4 with audio, then fall back to whatever is available.
    # Otherwise the chosen format may already contain audio, so try it merged
    # with audio first, then on its own, then "best".
    selector = (
        "bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio/best[ext=mp4]/best"
        if format_id == "best"
        else f"{format_id}+bestaudio/{format_id}/best"
    )

    def report(event):
        """Forward yt-dlp progress events as percentages."""
        status = event["status"]
        if not progress_callback:
            return
        if status == "finished":
            progress_callback(100)
            return
        if status != "downloading":
            return
        expected = event.get("total_bytes") or event.get("total_bytes_estimate") or 0
        received = event.get("downloaded_bytes", 0)
        if expected > 0:
            progress_callback(int(received * 100 / expected))
        else:
            progress_callback(0)

    options = {
        "format": selector,
        "outtmpl": out_pattern,
        "merge_output_format": "mp4",
        "quiet": True,
        "no_warnings": True,
        "http_headers": _PH_HEADERS,
        "progress_hooks": [report],
    }
    with yt_dlp.YoutubeDL(options) as ydl:
        info = ydl.extract_info(url, download=True)
        target = ydl.prepare_filename(info)

    # The extension can differ after a merge; try the ".mp4" sibling.
    if not os.path.exists(target):
        stem, _ = os.path.splitext(target)
        target = stem + ".mp4"
    size = os.path.getsize(target) if os.path.exists(target) else 0
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "filename": os.path.basename(target),
        "file_path": target,
        "file_size": size,
        "platform": "pornhub",
    }
def parse_video_url(url: str) -> dict:
    """Extract video info without downloading.

    Dispatch order: Twitter/X syndication API (falling through to the generic
    path on any failure), YouTube, Pornhub, then generic yt-dlp extraction.

    Args:
        url: Page URL of the video.

    Returns:
        Dict with ``title``, ``thumbnail``, ``duration``, ``formats`` and
        ``url``; the YouTube and Pornhub paths also set ``platform``.
        ``formats`` always starts with a synthetic "best" entry.
    """
    # Use syndication API for Twitter/X URLs
    if _is_twitter_url(url):
        logger.info("Using Twitter syndication API for: %s", url)
        try:
            result = _parse_twitter_video(url)
            result.pop('_formats_full', None)
            return result
        except Exception as e:
            # Deliberate best-effort: any failure falls back to yt-dlp below.
            logger.warning("Twitter syndication failed, falling back to yt-dlp: %s", e)
    # YouTube URLs
    if _is_youtube_url(url):
        logger.info("Parsing YouTube video: %s", url)
        return _parse_youtube_video(url)
    # Pornhub URLs
    if _is_pornhub_url(url):
        logger.info("Parsing Pornhub video: %s", url)
        return _parse_pornhub_video(url)
    # Fallback to generic yt-dlp
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": False,
        "skip_download": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)

    def quality_rank(entry):
        """Numeric sort key: "<N>p" -> N, anything else -> 0."""
        label = entry["quality"]
        if not label.endswith("p"):
            return 0
        try:
            return int(label.replace("p", ""))
        except ValueError:
            # Free-text notes that merely end in "p" are not resolutions.
            return 0

    formats = []
    seen = set()
    for f in info.get("formats") or []:
        # Skip audio-only formats.
        if f.get("vcodec", "none") == "none":
            continue
        height = f.get("height", 0)
        ext = f.get("ext", "mp4")
        fmt_id = f.get("format_id", "")
        # ``or`` keeps the label a string even when format_note is null.
        quality = f"{height}p" if height else (f.get("format_note") or "unknown")
        key = f"{quality}-{ext}"
        if key in seen:
            continue
        seen.add(key)
        formats.append({
            "format_id": fmt_id,
            "quality": quality,
            "ext": ext,
            "filesize": f.get("filesize") or f.get("filesize_approx") or 0,
            "note": f.get("format_note", ""),
        })
    formats.sort(key=quality_rank, reverse=True)
    formats.insert(0, {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    })
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "formats": formats,
        "url": url,
    }
def download_video(url: str, format_id: str = "best", progress_callback=None) -> dict:
    """Download video and return file info.

    Dispatch order: Twitter/X syndication API (falling through to the generic
    path on any failure), YouTube, Pornhub, then generic yt-dlp download into
    the X/Twitter directory.

    Args:
        url: Page URL of the video.
        format_id: ``"best"`` or a format id returned by ``parse_video_url``.
        progress_callback: Optional callable receiving an integer percentage.

    Returns:
        Dict with ``title``, ``thumbnail``, ``duration``, ``filename``,
        ``file_path``, ``file_size`` (0 if the file cannot be found) and
        ``platform``.
    """
    # Use syndication API for Twitter/X URLs
    if _is_twitter_url(url):
        logger.info("Using Twitter syndication API for download: %s", url)
        try:
            return _download_twitter_video(url, format_id, progress_callback)
        except Exception as e:
            # Deliberate best-effort: any failure falls back to yt-dlp below.
            logger.warning("Twitter syndication download failed, falling back to yt-dlp: %s", e)
    # YouTube URLs
    if _is_youtube_url(url):
        logger.info("Downloading YouTube video: %s", url)
        return _download_youtube_video(url, format_id, progress_callback)
    # Pornhub URLs
    if _is_pornhub_url(url):
        logger.info("Downloading Pornhub video: %s", url)
        return _download_pornhub_video(url, format_id, progress_callback)
    task_id = str(uuid.uuid4())[:8]
    output_template = os.path.join(X_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s")
    if format_id == "best":
        format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"
    else:
        # Many sites only offer muxed formats with no separate audio stream.
        # Try the requested format merged with audio, then on its own, and
        # only then fall back to "best", so the user's choice is honoured.
        format_spec = f"{format_id}+bestaudio/{format_id}/best"

    def hook(d):
        """Translate yt-dlp progress events into percentage callbacks."""
        if d["status"] == "downloading" and progress_callback:
            total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
            downloaded = d.get("downloaded_bytes", 0)
            pct = int(downloaded * 100 / total) if total > 0 else 0
            progress_callback(pct)
        elif d["status"] == "finished" and progress_callback:
            progress_callback(100)

    ydl_opts = {
        "format": format_spec,
        "outtmpl": output_template,
        "merge_output_format": "mp4",
        "quiet": True,
        "no_warnings": True,
        "progress_hooks": [hook],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filename = ydl.prepare_filename(info)
    # yt-dlp may change extension after merge
    if not os.path.exists(filename):
        base = os.path.splitext(filename)[0]
        filename = base + ".mp4"
    file_size = os.path.getsize(filename) if os.path.exists(filename) else 0
    return {
        "title": info.get("title", "Untitled"),
        "thumbnail": info.get("thumbnail", ""),
        "duration": info.get("duration", 0) or 0,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        # Generic downloads are stored in the X/Twitter directory.
        "platform": "twitter",
    }