# File: xdl/backend/app/services/cleanup.py
# (file viewer metadata: 147 lines, 5.6 KiB, Python)

"""Scheduled video cleanup service."""
import asyncio
import json
import logging
import os
import shutil
from datetime import datetime, timedelta
from sqlalchemy import select
from app.database import async_session
from app.models import AppSetting, Video
# Logger named after this module.
logger = logging.getLogger(__name__)

# Pause between two cleanup passes of the background loop: 10 minutes, in seconds.
CHECK_INTERVAL_SECONDS = 10 * 60
# ── Setting helpers ──────────────────────────────────────────────────────────
async def get_setting(db, key: str, default: str = "") -> str:
    """Look up the ``AppSetting`` row for ``key`` and return its value.

    Args:
        db: Session object used to execute the query.
        key: Setting key to look up.
        default: Value returned when no row matches ``key``.

    Returns:
        The ``value`` attribute of the matching row, otherwise ``default``.
    """
    query = select(AppSetting).where(AppSetting.key == key)
    outcome = await db.execute(query)
    setting = outcome.scalar_one_or_none()
    if not setting:
        return default
    return setting.value
async def set_setting(db, key: str, value: str):
    """Create or update the ``AppSetting`` row for ``key`` and commit.

    Args:
        db: Session object used to query, add and commit.
        key: Setting key to write.
        value: New value stored for ``key``.
    """
    query = select(AppSetting).where(AppSetting.key == key)
    existing = (await db.execute(query)).scalar_one_or_none()
    if not existing:
        # No row yet for this key: insert a fresh one.
        db.add(AppSetting(key=key, value=value))
    else:
        # Row exists: overwrite the value and stamp the modification time.
        existing.value = value
        existing.updated_at = datetime.utcnow()
    await db.commit()
# ── Disk helpers ─────────────────────────────────────────────────────────────
def disk_stats(path: str) -> dict:
    """Return disk usage stats for the filesystem containing ``path``.

    Args:
        path: Any path on the filesystem to inspect.

    Returns:
        A dict with ``total``, ``used`` and ``free`` (bytes) and ``used_pct``
        (percentage of the disk in use, rounded to one decimal place).
        All four values are 0 when the path cannot be inspected (missing,
        inaccessible or malformed path) or when the reported total is 0;
        the reason for a failed lookup is logged as a warning.
    """
    unavailable = {"total": 0, "used": 0, "free": 0, "used_pct": 0}
    try:
        usage = shutil.disk_usage(path)
    except (OSError, ValueError) as exc:
        # Stay best-effort for callers, but leave a trace: all-zero stats make
        # any "used_pct > limit" comparison evaluate to False.
        logging.getLogger(__name__).warning(
            "Cleanup: cannot read disk usage for %r: %s", path, exc
        )
        return unavailable
    if usage.total <= 0:
        # Avoid dividing by zero on a filesystem that reports no capacity.
        return unavailable
    return {
        "total": usage.total,
        "used": usage.used,
        "free": usage.free,
        "used_pct": round(usage.used / usage.total * 100, 1),
    }
def _delete_video_file(video: Video) -> int:
    """Delete the video's file from disk and report how many bytes were freed.

    Args:
        video: Object whose ``file_path`` names the file to remove and whose
            ``file_size`` is the recorded size in bytes.

    Returns:
        ``video.file_size`` (0 if unset) when the file was actually removed.
        0 when ``file_path`` is empty, the file does not exist, or the
        removal failed; a failed removal is logged as a warning.
    """
    path = video.file_path
    if not path:
        return 0
    try:
        # Attempt the removal directly instead of checking existence first,
        # so a file vanishing in between cannot cause a spurious error.
        os.remove(path)
    except FileNotFoundError:
        return 0
    except OSError as exc:
        # Nothing was freed, so do not report the recorded size.
        logger.warning("Cleanup: could not delete %s: %s", path, exc)
        return 0
    return video.file_size or 0
# ── Main cleanup logic ───────────────────────────────────────────────────────
def _purge_video(video: Video) -> tuple:
    """Remove a video's file and, once it is gone, mark the row as deleted.

    Returns:
        ``(purged, freed_bytes)``. ``purged`` is False (and ``freed_bytes`` 0)
        when the file is still on disk after the delete attempt; the row is
        then left untouched so a later run can retry it.
    """
    path = video.file_path
    freed = _delete_video_file(video)
    if path and os.path.exists(path):
        logger.warning("Cleanup: %s is still on disk, keeping its video record", path)
        return False, 0
    video.status = "deleted"
    video.file_path = ""
    return True, freed


async def run_cleanup() -> dict:
    """Execute one cleanup pass and return a stats dict.

    The pass is skipped unless the ``cleanup_enabled`` setting equals
    ``"true"`` (the default). Otherwise it runs two phases:

    1. Time-based: every video with status ``"done"`` created before
       ``cleanup_retention_minutes`` ago (default 10080, i.e. 7 days) is purged.
    2. Storage limit: while disk usage of ``VIDEO_BASE_PATH`` exceeds
       ``cleanup_storage_limit_pct`` (default 80), the oldest remaining
       ``"done"`` videos are purged one at a time.

    A video is only marked ``"deleted"`` when its file is no longer on disk.
    The result is also stored in the ``cleanup_last_run`` and
    ``cleanup_last_result`` settings.

    Returns:
        ``{"skipped": True, "reason": "disabled", "ran_at": ...}`` when
        disabled, otherwise a dict with ``time_deleted``, ``storage_deleted``,
        ``freed_mb``, ``disk_used_pct`` and ``ran_at``.

    Raises:
        ValueError: If the retention or storage-limit setting is not an integer.
    """
    async with async_session() as db:
        enabled = await get_setting(db, "cleanup_enabled", "true")
        if enabled != "true":
            return {"skipped": True, "reason": "disabled", "ran_at": datetime.utcnow().isoformat()}

        retention_min = int(await get_setting(db, "cleanup_retention_minutes", "10080"))
        storage_limit_pct = int(await get_setting(db, "cleanup_storage_limit_pct", "80"))
        video_base = os.getenv("VIDEO_BASE_PATH", "/home/xdl/xdl_videos")
        cutoff = datetime.utcnow() - timedelta(minutes=retention_min)

        time_deleted = 0
        storage_deleted = 0
        freed_bytes = 0

        # ── Phase 1: time-based cleanup ──────────────────────────────────────
        old_videos = (await db.execute(
            select(Video)
            .where(Video.status == "done", Video.created_at < cutoff)
            .order_by(Video.created_at.asc())
        )).scalars().all()
        for v in old_videos:
            purged, freed = _purge_video(v)
            if purged:
                time_deleted += 1
                freed_bytes += freed
        if time_deleted:
            await db.commit()
            logger.info(
                "Cleanup: deleted %d expired videos, freed %d MB",
                time_deleted,
                freed_bytes // 1024 // 1024,
            )

        # ── Phase 2: storage limit enforcement ───────────────────────────────
        if disk_stats(video_base)["used_pct"] > storage_limit_pct:
            remaining = (await db.execute(
                select(Video)
                .where(Video.status == "done")
                .order_by(Video.created_at.asc())
            )).scalars().all()
            for v in remaining:
                # Re-measure before every deletion so we stop as soon as the
                # disk is back under the limit.
                if disk_stats(video_base)["used_pct"] <= storage_limit_pct:
                    break
                purged, freed = _purge_video(v)
                if purged:
                    storage_deleted += 1
                    freed_bytes += freed
            if storage_deleted:
                await db.commit()
                logger.info(
                    "Cleanup: storage limit reached, deleted %d extra videos",
                    storage_deleted,
                )

        ran_at = datetime.utcnow().isoformat()
        result = {
            "time_deleted": time_deleted,
            "storage_deleted": storage_deleted,
            "freed_mb": round(freed_bytes / 1024 / 1024, 1),
            "disk_used_pct": disk_stats(video_base)["used_pct"],
            "ran_at": ran_at,
        }
        await set_setting(db, "cleanup_last_run", ran_at)
        await set_setting(db, "cleanup_last_result", json.dumps(result))
        return result
# ── Background loop ──────────────────────────────────────────────────────────
async def cleanup_loop():
    """Background task that never returns.

    Waits 60 seconds, then repeatedly awaits ``run_cleanup`` and sleeps
    ``CHECK_INTERVAL_SECONDS`` between passes. An ``Exception`` raised by a
    pass is logged with its traceback and does not stop the loop.
    """
    startup_delay_seconds = 60
    await asyncio.sleep(startup_delay_seconds)

    while True:
        try:
            result = await run_cleanup()
            logger.info(f"Cleanup finished: {result}")
        except Exception as e:
            # Keep the loop alive: one failed pass must not end the task.
            logger.error(f"Cleanup loop error: {e}", exc_info=True)
        await asyncio.sleep(CHECK_INTERVAL_SECONDS)