fix: use Twitter syndication API to bypass broken guest token API

Twitter's guest token GraphQL API returns 'Dependency: Unspecified' error.
Use cdn.syndication.twimg.com API instead, which works without auth.
Falls back to yt-dlp if syndication fails.
This commit is contained in:
mini
2026-02-18 17:31:49 +08:00
parent 694e7e7fb4
commit 958bdcd100

View File

@@ -1,8 +1,11 @@
"""yt-dlp wrapper service for video downloading.""" """yt-dlp wrapper service for video downloading."""
import os import os
import re
import uuid import uuid
import json
import asyncio import asyncio
import logging import logging
import urllib.request
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
import yt_dlp import yt_dlp
@@ -15,13 +18,185 @@ X_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "x_videos")
# Ensure directories exist # Ensure directories exist
os.makedirs(X_VIDEOS_PATH, exist_ok=True) os.makedirs(X_VIDEOS_PATH, exist_ok=True)
# Pattern to match Twitter/X URLs and extract tweet ID
TWITTER_URL_RE = re.compile(
r'https?://(?:(?:www\.)?(?:twitter\.com|x\.com)|[a-z]*twitter\.com)/\w+/status/(\d+)'
)
def get_video_path(filename: str) -> str: def get_video_path(filename: str) -> str:
return os.path.join(X_VIDEOS_PATH, filename) return os.path.join(X_VIDEOS_PATH, filename)
def _is_twitter_url(url: str) -> bool:
return bool(TWITTER_URL_RE.match(url))
def _extract_tweet_id(url: str) -> Optional[str]:
m = TWITTER_URL_RE.match(url)
return m.group(1) if m else None
def _twitter_syndication_info(tweet_id: str) -> dict:
    """Fetch tweet info via Twitter's syndication API (no auth required).

    Args:
        tweet_id: Numeric tweet ID as a string.

    Returns:
        The decoded JSON payload from cdn.syndication.twimg.com.

    Raises:
        urllib.error.URLError / HTTPError on network or API failure;
        json.JSONDecodeError on a malformed response.
    """
    # NOTE(review): token=x appears to be a placeholder the endpoint accepts —
    # confirm if the API ever starts validating it.
    api_url = f'https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}&token=x'
    req = urllib.request.Request(api_url, headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    })
    # Use a context manager so the connection is always closed (the original
    # never closed the response, leaking the socket).
    with urllib.request.urlopen(req, timeout=15) as resp:
        return json.loads(resp.read().decode())
def _parse_twitter_video(url: str) -> dict:
    """Parse Twitter video metadata using the syndication API.

    Args:
        url: A Twitter/X status URL.

    Returns:
        Dict with title, thumbnail, duration (seconds), selectable formats
        (a synthetic "best" entry first, then real variants sorted by bitrate
        descending), the original URL, and a "_formats_full" key retaining
        internal fields (variant URL / bitrate) for the downloader.

    Raises:
        ValueError: if no tweet ID can be extracted from *url*.
    """
    tweet_id = _extract_tweet_id(url)
    if not tweet_id:
        raise ValueError(f"Could not extract tweet ID from URL: {url}")
    data = _twitter_syndication_info(tweet_id)

    title = data.get('text', 'Untitled')
    # Truncate title to first line or 100 chars
    title = title.split('\n')[0][:100]

    thumbnail = ''
    duration = 0
    formats = []
    for media in data.get('mediaDetails', []):
        if media.get('type') != 'video':
            continue
        thumbnail = media.get('media_url_https', '')
        video_info = media.get('video_info', {})
        duration = (video_info.get('duration_millis', 0) or 0) // 1000
        for i, variant in enumerate(video_info.get('variants', [])):
            content_type = variant.get('content_type', '')
            if content_type == 'application/x-mpegURL':
                continue  # Skip HLS playlists; only offer progressive MP4s
            # The API can send "bitrate": null; .get('bitrate', 0) would then
            # return None and crash `bitrate // 1000` — normalize to 0.
            bitrate = variant.get('bitrate') or 0
            vid_url = variant.get('url', '')
            # Resolution is embedded in the CDN path, e.g. .../720x1280/...
            height_match = re.search(r'/(\d+)x(\d+)/', vid_url)
            height = int(height_match.group(2)) if height_match else 0
            quality = f"{height}p" if height else f"{bitrate // 1000}k"
            formats.append({
                "format_id": f"tw-{i}",
                "quality": quality,
                "ext": "mp4",
                "filesize": 0,  # syndication API does not report file sizes
                "note": f"{bitrate // 1000}kbps" if bitrate else "",
                "_url": vid_url,
                "_bitrate": bitrate,
            })
    # Sort by bitrate descending so the highest quality is listed first
    formats.sort(key=lambda fmt: fmt.get('_bitrate', 0), reverse=True)
    # Synthetic "best" option, resolved to the top variant at download time
    formats.insert(0, {
        "format_id": "best",
        "quality": "best",
        "ext": "mp4",
        "filesize": 0,
        "note": "Best available quality",
    })
    return {
        "title": title,
        "thumbnail": thumbnail,
        "duration": duration,
        "formats": [{k: v for k, v in f.items() if not k.startswith('_')} for f in formats],
        "url": url,
        "_formats_full": formats,  # Keep full info for download
    }
def _download_twitter_video(url: str, format_id: str = "best", progress_callback=None) -> dict:
    """Download a Twitter video using the syndication API.

    Args:
        url: Twitter/X status URL.
        format_id: "best" selects the highest-bitrate progressive MP4;
            "tw-<i>" selects the variant at index <i> as enumerated by
            _parse_twitter_video (indices must stay consistent between the
            two functions — both enumerate the full variants list).
        progress_callback: optional callable receiving an int percent (0-100).

    Returns:
        Dict describing the saved file (title, thumbnail, duration, filename,
        file_path, file_size, platform).

    Raises:
        ValueError: if no tweet ID can be extracted or the tweet has no video.
    """
    tweet_id = _extract_tweet_id(url)
    if not tweet_id:
        raise ValueError(f"Could not extract tweet ID from URL: {url}")
    data = _twitter_syndication_info(tweet_id)

    title = data.get('text', 'Untitled').split('\n')[0][:100]
    thumbnail = ''
    duration = 0
    best_url = None
    best_bitrate = 0
    for media in data.get('mediaDetails', []):
        if media.get('type') != 'video':
            continue
        thumbnail = media.get('media_url_https', '')
        video_info = media.get('video_info', {})
        duration = (video_info.get('duration_millis', 0) or 0) // 1000
        for i, variant in enumerate(video_info.get('variants', [])):
            if variant.get('content_type') == 'application/x-mpegURL':
                continue  # HLS playlists cannot be saved as a single MP4
            vid_url = variant.get('url', '')
            # "bitrate" may be null in the API response; the original compared
            # None > int below, which raises TypeError — normalize to 0.
            bitrate = variant.get('bitrate') or 0
            if format_id == "best" or format_id == f"tw-{i}":
                # For "best", keep the highest bitrate seen; for an explicit
                # format, take the matching variant and stop scanning.
                if format_id != "best" or bitrate > best_bitrate:
                    best_url = vid_url
                    best_bitrate = bitrate
                if format_id != "best":
                    break
    if not best_url:
        raise ValueError("No video found in tweet")

    # Download the selected variant to a uniquely named file
    task_id = str(uuid.uuid4())[:8]
    filename = os.path.join(X_VIDEOS_PATH, f"{tweet_id}_{task_id}.mp4")
    req = urllib.request.Request(best_url, headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    })
    # Context manager closes the connection even on mid-download errors
    # (the original never closed the response).
    with urllib.request.urlopen(req, timeout=120) as resp:
        total = int(resp.headers.get('Content-Length', 0))
        downloaded = 0
        with open(filename, 'wb') as f:
            while True:
                chunk = resp.read(65536)
                if not chunk:
                    break
                f.write(chunk)
                downloaded += len(chunk)
                if progress_callback and total > 0:
                    progress_callback(int(downloaded * 100 / total))
    if progress_callback:
        progress_callback(100)
    file_size = os.path.getsize(filename)
    return {
        "title": title,
        "thumbnail": thumbnail,
        "duration": duration,
        "filename": os.path.basename(filename),
        "file_path": filename,
        "file_size": file_size,
        "platform": "twitter",
    }
def parse_video_url(url: str) -> dict: def parse_video_url(url: str) -> dict:
"""Extract video info without downloading.""" """Extract video info without downloading."""
# Use syndication API for Twitter/X URLs
if _is_twitter_url(url):
logger.info(f"Using Twitter syndication API for: {url}")
try:
result = _parse_twitter_video(url)
# Remove internal keys before returning
result.pop('_formats_full', None)
return result
except Exception as e:
logger.warning(f"Twitter syndication failed, falling back to yt-dlp: {e}")
ydl_opts = { ydl_opts = {
"quiet": True, "quiet": True,
"no_warnings": True, "no_warnings": True,
@@ -76,6 +251,14 @@ def parse_video_url(url: str) -> dict:
def download_video(url: str, format_id: str = "best", progress_callback=None) -> dict: def download_video(url: str, format_id: str = "best", progress_callback=None) -> dict:
"""Download video and return file info.""" """Download video and return file info."""
# Use syndication API for Twitter/X URLs
if _is_twitter_url(url):
logger.info(f"Using Twitter syndication API for download: {url}")
try:
return _download_twitter_video(url, format_id, progress_callback)
except Exception as e:
logger.warning(f"Twitter syndication download failed, falling back to yt-dlp: {e}")
task_id = str(uuid.uuid4())[:8] task_id = str(uuid.uuid4())[:8]
output_template = os.path.join(X_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s") output_template = os.path.join(X_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s")