feat: add Pornhub video download support
This commit is contained in:
@@ -15,10 +15,12 @@ logger = logging.getLogger(__name__)
|
||||
VIDEO_BASE_PATH = os.getenv("VIDEO_BASE_PATH", "/home/xdl/xdl_videos")
|
||||
X_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "x_videos")
|
||||
YOUTUBE_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "youtube_videos")
|
||||
PH_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "ph_videos")
|
||||
|
||||
# Ensure directories exist
|
||||
os.makedirs(X_VIDEOS_PATH, exist_ok=True)
|
||||
os.makedirs(YOUTUBE_VIDEOS_PATH, exist_ok=True)
|
||||
os.makedirs(PH_VIDEOS_PATH, exist_ok=True)
|
||||
|
||||
# Pattern to match YouTube URLs
|
||||
YOUTUBE_URL_RE = re.compile(
|
||||
@@ -30,10 +32,18 @@ TWITTER_URL_RE = re.compile(
|
||||
r'https?://(?:(?:www\.)?(?:twitter\.com|x\.com)|[a-z]*twitter\.com)/\w+/status/(\d+)'
|
||||
)
|
||||
|
||||
# Pattern to match Pornhub URLs
|
||||
PORNHUB_URL_RE = re.compile(
|
||||
r'https?://(?:[\w-]+\.)?pornhub\.com/(?:view_video\.php\?viewkey=|video/|embed/)[\w-]+'
|
||||
r'|https?://phub\.to/[\w-]+'
|
||||
)
|
||||
|
||||
|
||||
def get_video_path(filename: str, platform: str = "twitter") -> str:
|
||||
if platform == "youtube":
|
||||
return os.path.join(YOUTUBE_VIDEOS_PATH, filename)
|
||||
if platform == "pornhub":
|
||||
return os.path.join(PH_VIDEOS_PATH, filename)
|
||||
return os.path.join(X_VIDEOS_PATH, filename)
|
||||
|
||||
|
||||
@@ -41,6 +51,10 @@ def _is_youtube_url(url: str) -> bool:
|
||||
return bool(YOUTUBE_URL_RE.match(url))
|
||||
|
||||
|
||||
def _is_pornhub_url(url: str) -> bool:
|
||||
return bool(PORNHUB_URL_RE.match(url))
|
||||
|
||||
|
||||
def _is_twitter_url(url: str) -> bool:
|
||||
return bool(TWITTER_URL_RE.match(url))
|
||||
|
||||
@@ -299,6 +313,107 @@ def _download_youtube_video(url: str, format_id: str = "best", progress_callback
|
||||
}
|
||||
|
||||
|
||||
def _parse_pornhub_video(url: str) -> dict:
|
||||
"""Parse Pornhub video info using yt-dlp."""
|
||||
ydl_opts = {
|
||||
"quiet": True,
|
||||
"no_warnings": True,
|
||||
"extract_flat": False,
|
||||
"skip_download": True,
|
||||
}
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
info = ydl.extract_info(url, download=False)
|
||||
|
||||
formats = []
|
||||
seen = set()
|
||||
for f in info.get("formats", []):
|
||||
if f.get("vcodec", "none") == "none":
|
||||
continue
|
||||
height = f.get("height", 0)
|
||||
if not height:
|
||||
continue
|
||||
ext = f.get("ext", "mp4")
|
||||
fmt_id = f.get("format_id", "")
|
||||
quality = f"{height}p"
|
||||
if quality in seen:
|
||||
continue
|
||||
seen.add(quality)
|
||||
formats.append({
|
||||
"format_id": fmt_id,
|
||||
"quality": quality,
|
||||
"ext": ext,
|
||||
"filesize": f.get("filesize") or f.get("filesize_approx") or 0,
|
||||
"note": f.get("format_note", ""),
|
||||
})
|
||||
|
||||
formats.sort(key=lambda x: int(x["quality"].replace("p", "")), reverse=True)
|
||||
|
||||
formats.insert(0, {
|
||||
"format_id": "best",
|
||||
"quality": "best",
|
||||
"ext": "mp4",
|
||||
"filesize": 0,
|
||||
"note": "Best available quality",
|
||||
})
|
||||
|
||||
return {
|
||||
"title": info.get("title", "Untitled"),
|
||||
"thumbnail": info.get("thumbnail", ""),
|
||||
"duration": info.get("duration", 0) or 0,
|
||||
"formats": formats,
|
||||
"url": url,
|
||||
"platform": "pornhub",
|
||||
}
|
||||
|
||||
|
||||
def _download_pornhub_video(url: str, format_id: str = "best", progress_callback=None) -> dict:
|
||||
"""Download Pornhub video using yt-dlp."""
|
||||
task_id = str(uuid.uuid4())[:8]
|
||||
output_template = os.path.join(PH_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s")
|
||||
|
||||
if format_id == "best":
|
||||
format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best"
|
||||
else:
|
||||
format_spec = f"{format_id}+bestaudio/best"
|
||||
|
||||
def hook(d):
|
||||
if d["status"] == "downloading" and progress_callback:
|
||||
total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
|
||||
downloaded = d.get("downloaded_bytes", 0)
|
||||
pct = int(downloaded * 100 / total) if total > 0 else 0
|
||||
progress_callback(pct)
|
||||
elif d["status"] == "finished" and progress_callback:
|
||||
progress_callback(100)
|
||||
|
||||
ydl_opts = {
|
||||
"format": format_spec,
|
||||
"outtmpl": output_template,
|
||||
"merge_output_format": "mp4",
|
||||
"quiet": True,
|
||||
"no_warnings": True,
|
||||
"progress_hooks": [hook],
|
||||
}
|
||||
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
info = ydl.extract_info(url, download=True)
|
||||
filename = ydl.prepare_filename(info)
|
||||
if not os.path.exists(filename):
|
||||
base = os.path.splitext(filename)[0]
|
||||
filename = base + ".mp4"
|
||||
|
||||
file_size = os.path.getsize(filename) if os.path.exists(filename) else 0
|
||||
|
||||
return {
|
||||
"title": info.get("title", "Untitled"),
|
||||
"thumbnail": info.get("thumbnail", ""),
|
||||
"duration": info.get("duration", 0) or 0,
|
||||
"filename": os.path.basename(filename),
|
||||
"file_path": filename,
|
||||
"file_size": file_size,
|
||||
"platform": "pornhub",
|
||||
}
|
||||
|
||||
|
||||
def parse_video_url(url: str) -> dict:
|
||||
"""Extract video info without downloading."""
|
||||
# Use syndication API for Twitter/X URLs
|
||||
@@ -316,6 +431,11 @@ def parse_video_url(url: str) -> dict:
|
||||
logger.info(f"Parsing YouTube video: {url}")
|
||||
return _parse_youtube_video(url)
|
||||
|
||||
# Pornhub URLs
|
||||
if _is_pornhub_url(url):
|
||||
logger.info(f"Parsing Pornhub video: {url}")
|
||||
return _parse_pornhub_video(url)
|
||||
|
||||
# Fallback to generic yt-dlp
|
||||
ydl_opts = {
|
||||
"quiet": True,
|
||||
@@ -381,6 +501,11 @@ def download_video(url: str, format_id: str = "best", progress_callback=None) ->
|
||||
logger.info(f"Downloading YouTube video: {url}")
|
||||
return _download_youtube_video(url, format_id, progress_callback)
|
||||
|
||||
# Pornhub URLs
|
||||
if _is_pornhub_url(url):
|
||||
logger.info(f"Downloading Pornhub video: {url}")
|
||||
return _download_pornhub_video(url, format_id, progress_callback)
|
||||
|
||||
task_id = str(uuid.uuid4())[:8]
|
||||
output_template = os.path.join(X_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user