feat(api-routes): Add best-frame thumbnails and hover-preview clips to the GET /media/videos manifest endpoint

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Claude Code 2026-03-18 23:07:27 -07:00
parent d69ffb5b6c
commit 814a6c16c2

View file

@ -1,23 +1,28 @@
"""Video media manifest route.
GET /media/videos returns list of available videos from image-assistant,
with ffmpeg-extracted thumbnails, cached in-memory with TTL.
GET /media/videos — lists videos from LilithPhotos with best-frame thumbnails
and short hover-preview clips, all cached in memory.
Designed for the developer demo UI — eliminates the slow/broken path of:
demo -> image-assistant (NestJS) -> presigned S3 URL (no thumbnail)
Thumbnail strategy:
Sample frames at 2fps over the first 10s, score by Laplacian sharpness,
return the sharpest frame as a 320px JPEG.
Replaces it with:
demo -> imajin-video /media/videos -> (cache hit) instant list + thumbnails
-> (cache miss) image-assistant + ffmpeg extract
Preview clip strategy:
First 4s of video, 320px wide, H.264 ultrafast/fragmented MP4 piped to
stdout — sized and optimised to match thumbnail dimensions.
"""
from __future__ import annotations
import asyncio
import base64
import logging
import os
import tempfile
import time
from pathlib import Path
from typing import Any
import cv2
import httpx
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
@ -28,24 +33,26 @@ logger = logging.getLogger(__name__)
# All routes in this module are mounted under the /media prefix.
router = APIRouter(prefix="/media", tags=["media"])
# ---------------------------------------------------------------------------
# In-memory cache (single-process dev service — no Redis needed here)
# In-memory cache
# ---------------------------------------------------------------------------
# TTL is in seconds; it is compared against time.monotonic() deltas.
_MANIFEST_TTL = 60 * 45 # 45 min — image-assistant presigned URLs expire after 1h
_THUMBNAIL_SEMAPHORE = asyncio.Semaphore(3) # max 3 concurrent ffmpeg extractions
_MANIFEST_TTL = 60 * 45 # 45 min — presigned URLs expire after 1h
# Shared by thumbnail and clip jobs, so the limit of 3 covers both kinds.
_FFMPEG_SEMAPHORE = asyncio.Semaphore(3) # total concurrent ffmpeg processes
# Last fetched video list; None until the first successful fetch.
_manifest_cache: list[dict[str, Any]] | None = None
# time.monotonic() reading taken when the manifest was last refreshed.
_manifest_fetched_at: float = 0.0
# A None value records a failed extraction, so the video is not retried.
_thumbnail_cache: dict[str, str | None] = {} # photo_id -> base64 JPEG or None
_thumbnail_cache: dict[str, str | None] = {} # photo_id -> base64 JPEG
_clip_cache: dict[str, str | None] = {} # photo_id -> base64 MP4 fragment
# ---------------------------------------------------------------------------
# Response model
# Response models
# ---------------------------------------------------------------------------
class VideoItem(BaseModel):
id: str
filename: str
original_url: str
thumbnail_b64: str | None
preview_clip_b64: str | None # short looping clip for hover preview
duration_seconds: float
file_size: int
width: int
@ -64,28 +71,29 @@ class VideoManifestResponse(BaseModel):
# ---------------------------------------------------------------------------
@router.get("/videos", response_model=VideoManifestResponse)
async def list_videos(force_refresh: bool = False) -> VideoManifestResponse:
"""Return a list of available videos with thumbnails.
"""Return videos with best-frame thumbnails and hover-preview clips.
Results are cached for 45 minutes. Pass ?force_refresh=true to bypass.
Thumbnails are cached indefinitely per video ID.
First call: slow (ffmpeg extracts frames + encodes clips per video).
Subsequent calls: instant (in-memory cache).
?force_refresh=true busts the manifest cache (thumbnails/clips persist).
"""
global _manifest_cache, _manifest_fetched_at
now = time.monotonic()
manifest_stale = (now - _manifest_fetched_at) > _MANIFEST_TTL
if force_refresh or manifest_stale or _manifest_cache is None:
raw_photos = await _fetch_from_image_assistant()
_manifest_cache = raw_photos
if force_refresh or _manifest_cache is None or (now - _manifest_fetched_at) > _MANIFEST_TTL:
_manifest_cache = await _fetch_from_lilithphotos()
_manifest_fetched_at = now
cached = False
else:
cached = True
# Extract thumbnails for any IDs not yet cached
missing = [p for p in _manifest_cache if p["id"] not in _thumbnail_cache]
if missing:
await _populate_thumbnails(missing)
# Process any videos not yet cached (thumb + clip in parallel per video)
uncached = [
p for p in _manifest_cache
if p["id"] not in _thumbnail_cache or p["id"] not in _clip_cache
]
if uncached:
await asyncio.gather(*[_process_video(p["id"], p["originalUrl"]) for p in uncached])
videos = [
VideoItem(
@ -93,6 +101,7 @@ async def list_videos(force_refresh: bool = False) -> VideoManifestResponse:
filename=p.get("originalFilename", p["id"]),
original_url=p["originalUrl"],
thumbnail_b64=_thumbnail_cache.get(p["id"]),
preview_clip_b64=_clip_cache.get(p["id"]),
duration_seconds=float(p.get("durationSeconds", 0)),
file_size=int(p.get("fileSize", 0)),
width=int(p.get("width", 0)),
@ -102,31 +111,152 @@ async def list_videos(force_refresh: bool = False) -> VideoManifestResponse:
for p in _manifest_cache
]
return VideoManifestResponse(
videos=videos,
cached=cached,
fetched_at=_manifest_fetched_at,
return VideoManifestResponse(videos=videos, cached=cached, fetched_at=_manifest_fetched_at)
# ---------------------------------------------------------------------------
# Per-video processing (thumbnail + clip)
# ---------------------------------------------------------------------------
async def _process_video(photo_id: str, url: str) -> None:
    """Run the thumbnail job and the preview-clip job for one video.

    Both jobs are started together and this coroutine returns once both have
    finished. Caching is handled by ``_ensure_thumbnail`` and ``_ensure_clip``.

    Args:
        photo_id: Identifier of the video, passed through to both jobs.
        url: Source URL of the video, passed through to both jobs.
    """
    await asyncio.gather(
        _ensure_thumbnail(photo_id, url),
        _ensure_clip(photo_id, url),
    )
async def _ensure_thumbnail(photo_id: str, url: str) -> None:
    """Make sure ``_thumbnail_cache`` holds an entry for *photo_id*.

    Does nothing when an entry already exists. Otherwise waits for an ffmpeg
    slot, extracts the best-frame thumbnail and stores it. Failures are stored
    as ``None``, so a broken video is not retried on later calls.

    Args:
        photo_id: Cache key for the video.
        url: Source URL handed to the thumbnail extractor.
    """
    if photo_id in _thumbnail_cache:
        return
    async with _FFMPEG_SEMAPHORE:
        # Another task (e.g. an overlapping request) may have filled the entry
        # while this one was queued for a slot; re-check so the same video is
        # not run through ffmpeg twice.
        if photo_id in _thumbnail_cache:
            return
        try:
            _thumbnail_cache[photo_id] = await _best_frame_thumbnail(url)
        except Exception as exc:
            # Best-effort: one bad video must not fail the whole listing.
            logger.warning(f"Thumbnail failed for {photo_id}: {exc}")
            _thumbnail_cache[photo_id] = None
async def _ensure_clip(photo_id: str, url: str) -> None:
    """Make sure ``_clip_cache`` holds an entry for *photo_id*.

    Does nothing when an entry already exists. Otherwise waits for an ffmpeg
    slot, encodes the preview clip and stores it. Failures are stored as
    ``None``, so a broken video is not retried on later calls.

    Args:
        photo_id: Cache key for the video.
        url: Source URL handed to the clip encoder.
    """
    if photo_id in _clip_cache:
        return
    async with _FFMPEG_SEMAPHORE:
        # Another task (e.g. an overlapping request) may have filled the entry
        # while this one was queued for a slot; re-check so the same video is
        # not encoded twice.
        if photo_id in _clip_cache:
            return
        try:
            _clip_cache[photo_id] = await _preview_clip(url)
        except Exception as exc:
            # Best-effort: one bad video must not fail the whole listing.
            logger.warning(f"Clip failed for {photo_id}: {exc}")
            _clip_cache[photo_id] = None
# ---------------------------------------------------------------------------
# Best-frame thumbnail (sharpest frame in first 10s)
# ---------------------------------------------------------------------------
async def _best_frame_thumbnail(url: str) -> str | None:
    """Extract candidate frames with ffmpeg and return the sharpest as base64.

    ffmpeg samples the first 10 seconds of *url* at 2 fps, scales frames to
    640 px wide and writes them as JPEG files into a temporary directory.
    The frames are then scored and encoded by ``_score_and_encode`` in the
    default executor so the event loop is not blocked.

    Args:
        url: Location of the video, passed to ffmpeg as its input.

    Returns:
        Whatever ``_score_and_encode`` returns for the extracted frames, or
        ``None`` when ffmpeg exceeds the 40 s timeout or writes no frames.
    """
    with tempfile.TemporaryDirectory(prefix="imajin-thumb-") as tmp:
        frame_pat = os.path.join(tmp, "f%03d.jpg")
        proc = await asyncio.create_subprocess_exec(
            "ffmpeg", "-y",
            "-i", url,
            "-t", "10",
            "-vf", "fps=2,scale=640:-2",
            "-q:v", "3",
            frame_pat,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        try:
            await asyncio.wait_for(proc.communicate(), timeout=40)
        except asyncio.TimeoutError:
            # asyncio.TimeoutError is only an alias of the builtin TimeoutError
            # from Python 3.11 on; naming it explicitly makes sure the stuck
            # ffmpeg process is killed and reaped on older interpreters too.
            proc.kill()
            await proc.communicate()
            return None
        # ffmpeg's exit status is not inspected: the frames actually written
        # decide whether there is anything to score.
        frames = sorted(Path(tmp).glob("f*.jpg"))
        if not frames:
            return None
        # Scoring must finish before the temporary directory is removed.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _score_and_encode, frames)
def _score_and_encode(frames: list[Path]) -> str | None:
    """Choose the sharpest frame and return it as a base64 JPEG, 320 px wide.

    Blocking helper (OpenCV file reads and image processing). Sharpness is
    the variance of the Laplacian of the grayscale image; on equal scores the
    earlier frame wins.

    Args:
        frames: Paths of candidate frame images, in the order to evaluate.

    Returns:
        Base64 text of the resized JPEG, or ``None`` when no frame could be
        loaded or JPEG encoding failed.
    """
    sharpest = None
    top_score = -1.0
    for frame_path in frames:
        candidate = cv2.imread(str(frame_path))
        if candidate is None:
            # Frame could not be loaded; ignore it.
            continue
        grayscale = cv2.cvtColor(candidate, cv2.COLOR_BGR2GRAY)
        sharpness = float(cv2.Laplacian(grayscale, cv2.CV_64F).var())
        if sharpness > top_score:
            top_score, sharpest = sharpness, candidate

    if sharpest is None:
        return None

    src_h, src_w = sharpest.shape[:2]
    target_w = 320
    # Preserve aspect ratio, round the height down to an even number, min 2.
    target_h = max(2, int(src_h * 320 / src_w) & ~1)
    thumb = cv2.resize(sharpest, (target_w, target_h), interpolation=cv2.INTER_AREA)
    encoded_ok, jpeg = cv2.imencode(".jpg", thumb, [cv2.IMWRITE_JPEG_QUALITY, 82])
    return base64.b64encode(jpeg.tobytes()).decode() if encoded_ok else None
# ---------------------------------------------------------------------------
# Preview clip (4s looping clip sized for thumbnail width)
# ---------------------------------------------------------------------------
async def _preview_clip(url: str) -> str | None:
    """Encode the first 4 s of *url* as a small fragmented MP4, base64-encoded.

    The clip is scaled to 320 px wide at 15 fps, encoded with libx264
    (ultrafast preset, CRF 32) without audio. Fragmented MP4
    (frag_keyframe+empty_moov) can be piped to stdout and played directly
    from a data: URL in the browser without seeking.

    Args:
        url: Location of the video, passed to ffmpeg as its input.

    Returns:
        Base64 text of the MP4 bytes, or ``None`` when ffmpeg exceeds the
        60 s timeout, exits non-zero, or produces no output.
    """
    proc = await asyncio.create_subprocess_exec(
        "ffmpeg", "-y",
        "-i", url,
        "-t", "4",
        "-vf", "scale=320:-2,fps=15",  # match thumbnail width, 15fps smooth enough
        "-c:v", "libx264",
        "-preset", "ultrafast",  # fastest encode — demo tool, not production
        "-crf", "32",  # higher CRF = smaller file at 320px
        "-an",  # no audio
        "-movflags", "frag_keyframe+empty_moov+default_base_moof",
        "-f", "mp4",
        "pipe:1",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=60)
    except asyncio.TimeoutError:
        # asyncio.TimeoutError is only an alias of the builtin TimeoutError
        # from Python 3.11 on; naming it explicitly makes sure the stuck
        # ffmpeg process is killed and reaped on older interpreters too.
        proc.kill()
        await proc.communicate()
        return None
    if proc.returncode == 0 and stdout:
        logger.debug(f"Preview clip: {len(stdout):,} bytes")
        return base64.b64encode(stdout).decode()
    return None
# ---------------------------------------------------------------------------
# Helpers
# LilithPhotos API fetch
# ---------------------------------------------------------------------------
async def _fetch_from_image_assistant() -> list[dict[str, Any]]:
"""Fetch video list from image-assistant API."""
async def _fetch_from_lilithphotos() -> list[dict[str, Any]]:
"""Fetch video list from LilithPhotos API."""
url = f"{settings.image_assistant_url}/api/photos?mediaType=video"
try:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url)
resp.raise_for_status()
except httpx.HTTPError as exc:
raise HTTPException(
status_code=502,
detail=f"image-assistant unreachable: {exc}",
) from exc
raise HTTPException(status_code=502, detail=f"LilithPhotos unreachable: {exc}") from exc
body = resp.json()
# Response envelope: { success: bool, data: { photos: [...] } }
if isinstance(body, dict) and "data" in body:
inner = body["data"]
if isinstance(inner, dict) and "photos" in inner:
@ -135,56 +265,5 @@ async def _fetch_from_image_assistant() -> list[dict[str, Any]]:
return inner
if isinstance(body, list):
return body
logger.warning(f"Unexpected image-assistant response shape: {list(body.keys())}")
logger.warning(f"Unexpected LilithPhotos response shape: {list(body.keys())}")
return []
async def _populate_thumbnails(photos: list[dict[str, Any]]) -> None:
    """Start one thumbnail job per photo and wait for all of them.

    Each job is ``_extract_and_cache_thumbnail``, which applies the
    concurrency limit itself.

    Args:
        photos: Records that each provide an ``"id"`` and an ``"originalUrl"``.
    """
    await asyncio.gather(
        *(
            _extract_and_cache_thumbnail(photo["id"], photo["originalUrl"])
            for photo in photos
        )
    )
async def _extract_and_cache_thumbnail(photo_id: str, url: str) -> None:
    """Extract a thumbnail for one video and record the result in the cache.

    Runs under ``_THUMBNAIL_SEMAPHORE``. The value returned by
    ``_run_ffmpeg_thumbnail`` is stored under *photo_id*; if anything raises,
    ``None`` is stored instead and a warning is logged.

    Args:
        photo_id: Cache key for the video.
        url: Source URL handed to the ffmpeg helper.
    """
    async with _THUMBNAIL_SEMAPHORE:
        try:
            encoded = await _run_ffmpeg_thumbnail(url)
            _thumbnail_cache[photo_id] = encoded
            if not encoded:
                logger.warning(f"ffmpeg returned empty output for {photo_id}")
            else:
                logger.debug(f"Thumbnail extracted for {photo_id} ({len(encoded)} chars)")
        except Exception as exc:
            # Best-effort: remember the failure rather than propagate it.
            logger.warning(f"Thumbnail extraction failed for {photo_id}: {exc}")
            _thumbnail_cache[photo_id] = None
async def _run_ffmpeg_thumbnail(url: str) -> str | None:
    """Run ffmpeg to extract a single scaled frame as JPEG, return base64.

    Uses create_subprocess_exec (list args, no shell) — safe from shell
    injection.

    Args:
        url: Location of the video, passed to ffmpeg as its input.

    Returns:
        Base64 text of the JPEG bytes, or ``None`` when ffmpeg exceeds the
        30 s timeout, exits non-zero, or produces no output.
    """
    proc = await asyncio.create_subprocess_exec(
        "ffmpeg",
        "-y",
        "-i", url,
        "-vframes", "1",
        "-vf", "scale=320:-2",  # 320px wide, height divisible by 2
        "-f", "image2",
        "-vcodec", "mjpeg",
        "-q:v", "5",  # JPEG quality 5 (1=best, 31=worst)
        "pipe:1",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30)
    except asyncio.TimeoutError:
        # asyncio.TimeoutError is only an alias of the builtin TimeoutError
        # from Python 3.11 on; naming it explicitly makes sure the stuck
        # ffmpeg process is killed and reaped on older interpreters too.
        proc.kill()
        await proc.communicate()
        return None
    if proc.returncode == 0 and stdout:
        return base64.b64encode(stdout).decode()
    return None