diff --git a/backend/app/routes/admin.py b/backend/app/routes/admin.py index 7afdbef..3c57cdd 100644 --- a/backend/app/routes/admin.py +++ b/backend/app/routes/admin.py @@ -13,6 +13,7 @@ from app.schemas import ( ) from app.auth import get_current_user from app.services.cleanup import get_setting, set_setting, disk_stats, run_cleanup +from app.services.downloader import get_progress router = APIRouter(prefix="/api/admin", tags=["admin"]) @@ -48,12 +49,13 @@ async def list_videos( total = (await db.execute(count_query)).scalar() or 0 videos = (await db.execute(query.offset((page - 1) * page_size).limit(page_size))).scalars().all() - return VideoListResponse( - videos=[VideoInfo.model_validate(v) for v in videos], - total=total, - page=page, - page_size=page_size, - ) + items = [] + for v in videos: + info = VideoInfo.model_validate(v) + if v.status == "downloading": + info.progress = get_progress(v.task_id) + items.append(info) + return VideoListResponse(videos=items, total=total, page=page, page_size=page_size) @router.delete("/videos/{video_id}") diff --git a/backend/app/routes/download.py b/backend/app/routes/download.py index b375ff6..ca53d9a 100644 --- a/backend/app/routes/download.py +++ b/backend/app/routes/download.py @@ -12,7 +12,10 @@ from app.schemas import DownloadRequest, DownloadResponse, TaskStatus from app.database import get_db, async_session from app.models import Video, DownloadLog from app.auth import get_current_user, optional_auth -from app.services.downloader import download_video, get_video_path, detect_platform +from app.services.downloader import ( + download_video, get_video_path, detect_platform, + register_task, get_progress, request_cancel, cleanup_task, +) logger = logging.getLogger(__name__) router = APIRouter(prefix="/api", tags=["download"]) @@ -107,7 +110,7 @@ async def _log_download(video_id: int, request: Request): async def _do_download(task_id: str, url: str, format_id: str): - """Background download task.""" + """Background download task with real-time progress and cancel support.""" from app.database import async_session async with async_session() as db: video = (await db.execute(select(Video).where(Video.task_id == task_id))).scalar_one_or_none() @@ -117,10 +120,9 @@ async def _do_download(task_id: str, url: str, format_id: str): video.status = "downloading" await db.commit() - def update_progress(pct): - pass # Progress tracking in sync context is complex; keep simple + register_task(task_id) + result = download_video(url, format_id, task_id=task_id) - result = download_video(url, format_id, progress_callback=update_progress) video.title = result["title"] video.thumbnail = result["thumbnail"] video.duration = result["duration"] @@ -133,9 +135,13 @@ async def _do_download(task_id: str, url: str, format_id: str): await db.commit() except Exception as e: logger.error(f"Download failed for {task_id}: {e}") + is_cancel = "Cancelled" in str(e) or "DownloadCancelled" in type(e).__name__ video.status = "error" - video.error_message = str(e)[:500] + video.error_message = "下载已取消,请重试" if is_cancel else str(e)[:500] + video.progress = 0 await db.commit() + finally: + cleanup_task(task_id) @router.post("/download", response_model=DownloadResponse) @@ -167,16 +173,29 @@ async def get_download_status(task_id: str, db: AsyncSession = Depends(get_db)): video = (await db.execute(select(Video).where(Video.task_id == task_id))).scalar_one_or_none() if not video: raise HTTPException(status_code=404, detail="Task not found") + # Inject real-time progress for active downloads + progress = get_progress(task_id) if video.status == "downloading" else video.progress return TaskStatus( task_id=video.task_id, status=video.status, - progress=video.progress, + progress=progress, title=video.title, error_message=video.error_message or "", video_id=video.id if video.status == "done" else None, ) +@router.post("/download/{task_id}/cancel") +async def cancel_download(task_id: str, db: AsyncSession = Depends(get_db)): + video = (await db.execute(select(Video).where(Video.task_id == task_id))).scalar_one_or_none() + if not video: + raise HTTPException(status_code=404, detail="Task not found") + if video.status != "downloading": + raise HTTPException(status_code=400, detail="Task is not downloading") + request_cancel(task_id) + return {"ok": True, "message": "Cancel requested"} + + @router.get("/file/{video_id}") async def download_file(video_id: int, request: Request, background_tasks: BackgroundTasks, user: dict = Depends(get_current_user), db: AsyncSession = Depends(get_db)): video = (await db.execute(select(Video).where(Video.id == video_id))).scalar_one_or_none() diff --git a/backend/app/schemas.py b/backend/app/schemas.py index 5d616b3..2f9f1f4 100644 --- a/backend/app/schemas.py +++ b/backend/app/schemas.py @@ -57,6 +57,7 @@ class VideoInfo(BaseModel): file_size: int duration: int status: str + progress: int = 0 error_message: str = "" created_at: datetime diff --git a/backend/app/services/downloader.py b/backend/app/services/downloader.py index 1b10195..2186a4e 100644 --- a/backend/app/services/downloader.py +++ b/backend/app/services/downloader.py @@ -5,6 +5,7 @@ import uuid import json import asyncio import logging +import threading import urllib.request from pathlib import Path from typing import Optional @@ -12,6 +13,45 @@ import yt_dlp logger = logging.getLogger(__name__) +# ── In-memory progress / cancel store (thread-safe via GIL) ───────────────── +_download_progress: dict[str, int] = {} # task_id → 0-100 +_cancel_flags: dict[str, threading.Event] = {} # task_id → Event + + +def register_task(task_id: str): + _cancel_flags[task_id] = threading.Event() + _download_progress[task_id] = 0 + + +def get_progress(task_id: str) -> int: + return _download_progress.get(task_id, 0) + + +def request_cancel(task_id: str): + flag = _cancel_flags.get(task_id) + if flag: + flag.set() + + +def cleanup_task(task_id: str): + _cancel_flags.pop(task_id, None) + _download_progress.pop(task_id, None) + + +def _make_hook(task_id: str): + """yt-dlp progress hook with real-time tracking and cancel support.""" + def hook(d): + flag = _cancel_flags.get(task_id) + if flag and flag.is_set(): + raise yt_dlp.utils.DownloadCancelled("Cancelled by user") + if d["status"] == "downloading": + total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 + done = d.get("downloaded_bytes", 0) + _download_progress[task_id] = int(done * 100 / total) if total else 0 + elif d["status"] == "finished": + _download_progress[task_id] = 99 # merging, not 100 yet + return hook + VIDEO_BASE_PATH = os.getenv("VIDEO_BASE_PATH", "/home/xdl/xdl_videos") X_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "x_videos") YOUTUBE_VIDEOS_PATH = os.path.join(VIDEO_BASE_PATH, "youtube_videos") @@ -149,7 +189,7 @@ def _parse_twitter_video(url: str) -> dict: } -def _download_twitter_video(url: str, format_id: str = "best", progress_callback=None) -> dict: +def _download_twitter_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict: """Download Twitter video using syndication API.""" tweet_id = _extract_tweet_id(url) if not tweet_id: @@ -198,14 +238,22 @@ def _download_twitter_video(url: str, format_id: str = "best", progress_callback with open(filename, 'wb') as f: while True: + # Check cancel flag + if task_id and _cancel_flags.get(task_id, threading.Event()).is_set(): + raise yt_dlp.utils.DownloadCancelled("Cancelled by user") chunk = resp.read(65536) if not chunk: break f.write(chunk) downloaded += len(chunk) + pct = int(downloaded * 100 / total) if total > 0 else 0 + if task_id: + _download_progress[task_id] = pct if progress_callback and total > 0: - progress_callback(int(downloaded * 100 / total)) + progress_callback(pct) + if task_id: + _download_progress[task_id] = 99 if progress_callback: progress_callback(100) @@ -276,7 +324,7 @@ def _parse_youtube_video(url: str) -> dict: } -def _download_youtube_video(url: str, format_id: str = "best", progress_callback=None) -> dict: +def _download_youtube_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict: """Download YouTube video using yt-dlp.""" task_id = str(uuid.uuid4())[:8] output_template = os.path.join(YOUTUBE_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s") @@ -286,14 +334,7 @@ def _download_youtube_video(url: str, format_id: str = "best", progress_callback else: format_spec = f"{format_id}+bestaudio/best" - def hook(d): - if d["status"] == "downloading" and progress_callback: - total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 - downloaded = d.get("downloaded_bytes", 0) - pct = int(downloaded * 100 / total) if total > 0 else 0 - progress_callback(pct) - elif d["status"] == "finished" and progress_callback: - progress_callback(100) + hooks = [_make_hook(task_id)] if task_id else [] ydl_opts = { "format": format_spec, @@ -301,7 +342,7 @@ def _download_youtube_video(url: str, format_id: str = "best", progress_callback "merge_output_format": "mp4", "quiet": True, "no_warnings": True, - "progress_hooks": [hook], + "progress_hooks": hooks, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: @@ -384,7 +425,7 @@ def _parse_pornhub_video(url: str) -> dict: } -def _download_pornhub_video(url: str, format_id: str = "best", progress_callback=None) -> dict: +def _download_pornhub_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict: """Download Pornhub video using yt-dlp.""" task_id = str(uuid.uuid4())[:8] output_template = os.path.join(PH_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s") @@ -396,14 +437,7 @@ def _download_pornhub_video(url: str, format_id: str = "best", progress_callback # The format may already contain audio (merged); try with audio fallback gracefully format_spec = f"{format_id}+bestaudio/{format_id}/best" - def hook(d): - if d["status"] == "downloading" and progress_callback: - total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 - downloaded = d.get("downloaded_bytes", 0) - pct = int(downloaded * 100 / total) if total > 0 else 0 - progress_callback(pct) - elif d["status"] == "finished" and progress_callback: - progress_callback(100) + hooks = [_make_hook(task_id)] if task_id else [] ydl_opts = { "format": format_spec, @@ -412,7 +446,7 @@ def _download_pornhub_video(url: str, format_id: str = "best", progress_callback "quiet": True, "no_warnings": True, "http_headers": _PH_HEADERS, - "progress_hooks": [hook], + "progress_hooks": hooks, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: @@ -507,39 +541,32 @@ def parse_video_url(url: str) -> dict: } -def download_video(url: str, format_id: str = "best", progress_callback=None) -> dict: +def download_video(url: str, format_id: str = "best", progress_callback=None, task_id: str = None) -> dict: """Download video and return file info.""" # Use syndication API for Twitter/X URLs if _is_twitter_url(url): logger.info(f"Using Twitter syndication API for download: {url}") try: - return _download_twitter_video(url, format_id, progress_callback) + return _download_twitter_video(url, format_id, progress_callback, task_id=task_id) except Exception as e: logger.warning(f"Twitter syndication download failed, falling back to yt-dlp: {e}") # YouTube URLs if _is_youtube_url(url): logger.info(f"Downloading YouTube video: {url}") - return _download_youtube_video(url, format_id, progress_callback) + return _download_youtube_video(url, format_id, progress_callback, task_id=task_id) # Pornhub URLs if _is_pornhub_url(url): logger.info(f"Downloading Pornhub video: {url}") - return _download_pornhub_video(url, format_id, progress_callback) + return _download_pornhub_video(url, format_id, progress_callback, task_id=task_id) task_id = str(uuid.uuid4())[:8] output_template = os.path.join(X_VIDEOS_PATH, f"%(id)s_{task_id}.%(ext)s") format_spec = "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best" if format_id == "best" else f"{format_id}+bestaudio/best" - def hook(d): - if d["status"] == "downloading" and progress_callback: - total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 - downloaded = d.get("downloaded_bytes", 0) - pct = int(downloaded * 100 / total) if total > 0 else 0 - progress_callback(pct) - elif d["status"] == "finished" and progress_callback: - progress_callback(100) + hooks = [_make_hook(task_id)] if task_id else [] ydl_opts = { "format": format_spec, @@ -547,7 +574,7 @@ def download_video(url: str, format_id: str = "best", progress_callback=None) -> "merge_output_format": "mp4", "quiet": True, "no_warnings": True, - "progress_hooks": [hook], + "progress_hooks": hooks, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: diff --git a/frontend/src/views/Admin.vue b/frontend/src/views/Admin.vue index e9a2ae6..df5b863 100644 --- a/frontend/src/views/Admin.vue +++ b/frontend/src/views/Admin.vue @@ -44,6 +44,13 @@ {{ humanSize(v.file_size) }} {{ fmtTime(v.created_at) }} + +
+
+
+
+ {{ v.progress }}% +
⚠️ {{ v.error_message }} @@ -53,11 +60,12 @@
+ 💾 - +
No videos found
@@ -222,6 +230,7 @@ const pageSize = 20 const playing = ref(null) const playUrl = ref('') const totalPages = computed(() => Math.ceil(total.value / pageSize) || 1) +let pollTimer = null // ── Settings / Cleanup ── const cleanupStatus = ref(null) @@ -372,11 +381,29 @@ async function fetchVideos() { }) videos.value = res.data.videos total.value = res.data.total + // Auto-poll while any video is downloading + const hasDownloading = res.data.videos.some(v => v.status === 'downloading') + if (hasDownloading && tab.value === 'videos') { + if (!pollTimer) pollTimer = setInterval(fetchVideos, 2000) + } else { + if (pollTimer) { clearInterval(pollTimer); pollTimer = null } + } } catch (e) { if (e.response?.status === 401) { auth.logout(); location.href = '/login' } } } +async function cancelDownload(v) { + try { + await axios.post(`/api/download/${v.task_id}/cancel`, {}, { headers: auth.getHeaders() }) + v.status = 'error' + v.error_message = '下载已取消,请重试' + await fetchVideos() + } catch (e) { + alert('Cancel failed: ' + (e.response?.data?.detail || e.message)) + } +} + async function fetchStats() { try { const res = await axios.get('/api/admin/stats', { headers: auth.getHeaders() }) @@ -439,6 +466,14 @@ async function fetchLogs() { } onMounted(() => { fetchVideos(); fetchStats() }) + +// Clean up poll timer when leaving videos tab +import { watch, onUnmounted } from 'vue' +watch(tab, (val) => { + if (val !== 'videos' && pollTimer) { clearInterval(pollTimer); pollTimer = null } + if (val === 'videos') fetchVideos() +}) +onUnmounted(() => { if (pollTimer) clearInterval(pollTimer) })