feat(classify): Add video classification validation and async processing pipeline

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
autocommit 2026-06-09 02:37:56 -07:00
parent 3d78cde35d
commit d87f0718d1
7 changed files with 318 additions and 18 deletions

View file

@ -6,9 +6,11 @@ Uses lilith-service-fastapi-bootstrap for lifecycle management.
from __future__ import annotations
import asyncio
import logging
import uuid
import psutil
import structlog
from fastapi import FastAPI, Request
from lilith_service_fastapi_bootstrap import LifespanManager, apply_cors, setup_logging
@ -21,7 +23,9 @@ from detection.face_detector import FaceDetector
from jobs.classify_job_store import ClassifyJobStore
from jobs.job_store import JobStore
from jobs.protect_job_store import ProtectJobStore
from pipeline.capability import derive_video_concurrency
from pipeline.classify_processor import ClassifyVideoProcessor
from pipeline.minio_fetch import MinioVideoFetcher
from pipeline.protection_processor import ProtectionProcessor
from pipeline.transcode_processor import TranscodeProcessor
from pipeline.video_processor import VideoProcessor
@ -104,7 +108,33 @@ async def initialize_components() -> None:
protect_job_store=protect_job_store,
)
transcode_processor = TranscodeProcessor(job_store=job_store)
classify_processor = ClassifyVideoProcessor(classify_job_store=classify_job_store)
# /classify-video: pull video bytes from MinIO by storage_key (no base64 size cap)
# and bound concurrent decodes to host RAM — the throttle lives on the server.
video_fetcher = MinioVideoFetcher(
endpoint=settings.minio_endpoint,
access_key=settings.minio_access_key,
secret_key=settings.minio_secret_key,
bucket=settings.minio_bucket,
secure=settings.minio_secure,
)
available_ram = psutil.virtual_memory().available
video_concurrency = derive_video_concurrency(
available_ram,
settings.classify_video_ram_budget_bytes,
settings.classify_video_concurrency,
)
logger.info(
f"classify-video: {video_concurrency} concurrent decodes "
f"(available RAM {available_ram // 1_000_000}MB, ceiling "
f"{settings.classify_video_concurrency}, MinIO {settings.minio_endpoint}/"
f"{settings.minio_bucket})"
)
classify_processor = ClassifyVideoProcessor(
classify_job_store=classify_job_store,
video_fetcher=video_fetcher,
video_semaphore=asyncio.Semaphore(video_concurrency),
)
lifespan.set_state("detector", detector)
lifespan.set_state("job_store", job_store)

View file

@ -78,6 +78,13 @@ async def classify_video_sync(
"""
processor: ClassifyVideoProcessor = request.state.classify_processor
if body.video_base64 is None:
raise HTTPException(
status_code=400,
detail="sync classification requires inline video_base64; use POST "
"/classify-video for the storage_key (object-store) path.",
)
duration = _probe_duration_seconds(body.video_base64)
if duration > settings.classify_sync_max_seconds:
raise HTTPException(

View file

@ -15,7 +15,7 @@ from __future__ import annotations
from enum import Enum
from typing import Literal
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, model_validator
class Explicitness(str, Enum):
@ -25,16 +25,24 @@ class Explicitness(str, Enum):
class ClassifyVideoRequest(BaseModel):
"""Request to classify a video.
"""Request to classify a video. Provide EXACTLY ONE source:
The platform streams the video bytes in (decision 5); imajin-video does not
read the mac-sync object store.
- ``storage_key`` — imajin streams the object from MinIO to disk (the backfill
path; no size cap, bytes never enter a heap). Decision 5 reversed for large
videos.
- ``video_base64`` — the platform inlines the bytes (legacy / short sync clips).
"""
video_base64: str = Field(
...,
video_base64: str | None = Field(
None,
max_length=400_000_000,
description="Base64-encoded video bytes (raw or data-URL). The platform streams these in.",
description="Base64-encoded video bytes (raw or data-URL). Mutually exclusive "
"with storage_key.",
)
storage_key: str | None = Field(
None,
description="MinIO object key (in the configured bucket) that imajin streams "
"to disk. Mutually exclusive with video_base64. No size cap.",
)
keyframes: int | None = Field(
None,
@ -49,6 +57,12 @@ class ClassifyVideoRequest(BaseModel):
"contrastive rubric in one pass; scene is reserved for semantic enrichment (Phase 3).",
)
@model_validator(mode="after")
def _exactly_one_source(self) -> "ClassifyVideoRequest":
    """Enforce that the request names exactly one video source.

    Returns:
        The model itself, unchanged, when exactly one of ``video_base64`` /
        ``storage_key`` is set.

    Raises:
        ValueError: if both sources are set, or if neither is.
    """
    has_inline = self.video_base64 is not None
    has_key = self.storage_key is not None
    # Equal flags mean "both present" or "both absent" — either is invalid.
    if has_inline == has_key:
        raise ValueError("provide exactly one of video_base64 or storage_key")
    return self
class FrameScore(BaseModel):
"""Per-keyframe scores from the contrastive rubric (re-normalized per pair)."""

View file

@ -19,9 +19,12 @@ import asyncio
import base64
import binascii
import logging
import shutil
import tempfile
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import AsyncIterator, Protocol
import cv2
import httpx
@ -336,11 +339,57 @@ def decode_video_b64(video_base64: str) -> bytes:
return raw
class ClassifyVideoProcessor:
"""Orchestrates one /classify-video job: decode → sample → score → aggregate."""
class VideoFetcher(Protocol):
"""Streams a video object from object storage to a local path (duck-typed so
the processor doesn't import the MinIO client)."""
def __init__(self, classify_job_store: ClassifyJobStore) -> None:
async def fetch_to_path(self, storage_key: str, dest_path: str) -> None: ...
class ClassifyVideoProcessor:
"""Orchestrates one /classify-video job: materialize → sample → score → aggregate.
The source is either an inlined ``video_base64`` or a ``storage_key`` that the
injected ``video_fetcher`` streams from MinIO to disk. The memory/CPU-heavy
materialize+sample step runs under ``video_semaphore`` — the SERVER-side bound on
how many videos decode at once (sized from host RAM at startup), so a backfill
consumer firing many jobs can't swamp the host.
"""
def __init__(
self,
classify_job_store: ClassifyJobStore,
*,
video_fetcher: VideoFetcher | None = None,
video_semaphore: asyncio.Semaphore | None = None,
) -> None:
self._store = classify_job_store
self._fetcher = video_fetcher
self._sem = video_semaphore
@asynccontextmanager
async def _decode_slot(self) -> AsyncIterator[None]:
"""Hold the server-capability semaphore (no-op when unbounded, e.g. tests)."""
if self._sem is None:
yield
else:
async with self._sem:
yield
async def _materialize(
self, request: ClassifyVideoRequest, tmp_path: Path, loop: asyncio.AbstractEventLoop
) -> None:
"""Put the source bytes on disk at ``tmp_path`` from whichever source the
request carries. The model validator guarantees exactly one is set."""
if request.storage_key is not None:
if self._fetcher is None:
raise ClassifyError(
"storage_key requested but no object-store fetcher is configured"
)
await self._fetcher.fetch_to_path(request.storage_key, str(tmp_path))
return
raw = decode_video_b64(request.video_base64 or "")
await loop.run_in_executor(None, tmp_path.write_bytes, raw)
async def process(self, job_id: str, request: ClassifyVideoRequest) -> None:
"""Async job entrypoint. Never raises — failures land as a terminal
@ -364,16 +413,18 @@ class ClassifyVideoProcessor:
Raises ClassifyError on terminal (caller-attributable) failure;
propagates other exceptions only for genuinely internal faults.
"""
raw = decode_video_b64(request.video_base64)
loop = asyncio.get_event_loop()
tmp_dir = Path(tempfile.mkdtemp(prefix="imajin-classify-"))
tmp_path = tmp_dir / "input.bin"
tmp_path.write_bytes(raw)
try:
frames, duration = await loop.run_in_executor(
None, sample_keyframes, str(tmp_path), request.keyframes
)
# Source acquisition + decode are the heavy steps — hold the server
# capacity slot across both so concurrent videos stay within host RAM.
async with self._decode_slot():
await self._materialize(request, tmp_path, loop)
frames, duration = await loop.run_in_executor(
None, sample_keyframes, str(tmp_path), request.keyframes
)
if not frames:
raise ClassifyError(
"No decodable frames — unsupported or corrupt codec"
@ -394,8 +445,6 @@ class ClassifyVideoProcessor:
)
return result
finally:
import shutil
shutil.rmtree(tmp_dir, ignore_errors=True)
async def _score_frames(
@ -429,6 +478,10 @@ class ClassifyVideoProcessor:
"texts": texts,
"mode": "contrastive",
"x_client_id": "imajin-video",
# High priority lets the coordinator spawn a 2nd vision slot on
# the idle GPU so frames score in parallel (pool spawns instance
# N+1 only for priority ≤5; siglip2 max_instances=2 permits it).
"x_priority": settings.classify_vision_priority,
},
)
resp.raise_for_status()

View file

@ -0,0 +1,68 @@
"""MinIO object fetcher for /classify-video.
Reverses the original decision-5 ("imajin reads no object store"): for the video
backfill, the platform sends a ``storage_key`` instead of inlining the whole file
as base64. imajin streams the object straight from MinIO to a temp file on disk
so the bytes never sit in the platform's (or imajin's) heap, there is no base64
size cap, and large videos stop being a special case.
The stream-to-disk is deliberate: cv2 decodes from the file on demand, so a 4K clip
costs disk, not RAM. Failures raise ``ClassifyError`` (terminal) — a missing or
unreadable object is caller-attributable, surfaced as a ``failed`` job, not a 5xx.
"""
from __future__ import annotations
import asyncio
import logging
from minio import Minio
from pipeline.classify_processor import ClassifyError
logger = logging.getLogger(__name__)
# 8 MiB stream chunks — large enough to amortize syscalls, small enough to keep the
# per-fetch heap footprint trivial regardless of object size.
_CHUNK_BYTES = 8 * 1024 * 1024


class MinioVideoFetcher:
    """Streams a video object from MinIO to a local path.

    Each call runs the blocking S3 I/O in a worker thread of the event loop's
    default executor, so the loop itself is never blocked by the download.
    """

    def __init__(
        self,
        *,
        endpoint: str,
        access_key: str,
        secret_key: str,
        bucket: str,
        secure: bool = False,
    ) -> None:
        """Build the MinIO client and remember the bucket to read from.

        Args:
            endpoint: MinIO endpoint passed straight to ``Minio``.
            access_key: Access key for the client.
            secret_key: Secret key for the client.
            bucket: Bucket every ``storage_key`` is resolved against.
            secure: Forwarded to ``Minio(secure=...)``.
        """
        self._client = Minio(
            endpoint, access_key=access_key, secret_key=secret_key, secure=secure
        )
        self._bucket = bucket

    async def fetch_to_path(self, storage_key: str, dest_path: str) -> None:
        """Stream ``bucket/storage_key`` into ``dest_path``.

        Leading slashes are stripped from ``storage_key`` before the lookup.

        Raises:
            ClassifyError: (terminal) if the object can't be read or written.
        """
        key = storage_key.lstrip("/")
        # Always called from a coroutine, so bind to the running loop
        # explicitly instead of relying on the policy-dependent lookup.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._fetch_sync, key, dest_path)

    def _fetch_sync(self, key: str, dest_path: str) -> None:
        """Blocking download of ``key`` into ``dest_path``, chunk by chunk.

        Raises:
            ClassifyError: wrapping any exception from the get/stream/write.
        """
        response = None
        try:
            response = self._client.get_object(self._bucket, key)
            with open(dest_path, "wb") as out:
                for chunk in response.stream(_CHUNK_BYTES):
                    out.write(chunk)
        except Exception as exc:  # noqa: BLE001 — any S3 fault is terminal for this job
            raise ClassifyError(
                f"object fetch failed for {self._bucket}/{key}: {exc}"
            ) from exc
        finally:
            if response is not None:
                # release_conn() must run even if close() raises, otherwise
                # the pooled connection is never handed back.
                try:
                    response.close()
                finally:
                    response.release_conn()

View file

@ -0,0 +1,33 @@
"""Unit tests for server-capability concurrency sizing."""
from __future__ import annotations
from pipeline.capability import derive_video_concurrency
GB = 1_000_000_000
def test_ram_clamps_below_ceiling():
    """Available RAM is the binding limit when it allows fewer than the ceiling."""
    available, per_video_budget = 3 * GB, 1_500_000_000
    # 3 GB / 1.5 GB → two videos fit, which is under the ceiling of 4.
    slots = derive_video_concurrency(available, per_video_budget, ceiling=4)
    assert slots == 2
def test_ceiling_caps_when_ram_is_plentiful():
    """The configured ceiling wins when RAM would permit more slots."""
    available, per_video_budget = 64 * GB, 1_500_000_000
    # 64 GB would fit many more videos; the result must stop at the ceiling.
    slots = derive_video_concurrency(available, per_video_budget, ceiling=4)
    assert slots == 4
def test_never_below_floor_even_when_starved():
    """With less RAM than a single budget, the result is still 1, never 0."""
    starved_ram, per_video_budget = 100_000_000, 1_500_000_000
    slots = derive_video_concurrency(starved_ram, per_video_budget, ceiling=4)
    assert slots == 1
def test_custom_floor():
    """An explicit ``floor`` is returned when RAM allows fewer slots than it."""
    starved_ram, per_video_budget = 100_000_000, 1_500_000_000
    slots = derive_video_concurrency(starved_ram, per_video_budget, ceiling=8, floor=2)
    assert slots == 2
def test_nonpositive_budget_disables_ram_clamp():
    """A budget of 0 yields the ceiling even with almost no RAM available."""
    slots = derive_video_concurrency(1, 0, ceiling=4)
    assert slots == 4
def test_ceiling_below_floor_is_lifted_to_floor():
    """A ceiling smaller than the floor does not push the result under the floor."""
    slots = derive_video_concurrency(64 * GB, 1, ceiling=0, floor=1)
    assert slots == 1

View file

@ -286,3 +286,98 @@ async def test_process_job_failed_is_not_5xx_on_modelboss_down(multi_scene_video
req = ClassifyVideoRequest(video_base64=_video_b64(multi_scene_video), keyframes=2)
await proc.process("job-3", req) # must not raise
assert store.records["job-3"]["status"] == "failed"
# ---------------------------------------------------------------------------
# Request source validation — exactly one of {video_base64, storage_key}
# ---------------------------------------------------------------------------
def test_request_rejects_both_sources():
    """Supplying video_base64 and storage_key together fails validation."""
    import pytest as _pytest

    both_sources = {"video_base64": "AAAA", "storage_key": "some/key"}
    with _pytest.raises(ValueError, match="exactly one"):
        ClassifyVideoRequest(**both_sources)
def test_request_rejects_neither_source():
    """A request carrying no source at all fails validation."""
    import pytest as _pytest

    no_source = {"keyframes": 4}
    with _pytest.raises(ValueError, match="exactly one"):
        ClassifyVideoRequest(**no_source)
def test_request_accepts_storage_key_only():
    """storage_key alone is a valid request; video_base64 stays None."""
    key = "device/2026/clip.mov"
    req = ClassifyVideoRequest(storage_key=key)
    assert req.storage_key == "device/2026/clip.mov"
    assert req.video_base64 is None
# ---------------------------------------------------------------------------
# storage_key path — imajin streams the object to disk via the injected fetcher
# ---------------------------------------------------------------------------
class _FakeFetcher:
"""Copies a pre-written local video into the processor's tmp path, standing in
for a MinIO stream-to-disk."""
def __init__(self, src_path: str) -> None:
self._src = src_path
self.calls: list[str] = []
async def fetch_to_path(self, storage_key: str, dest_path: str) -> None:
import shutil
self.calls.append(storage_key)
shutil.copyfile(self._src, dest_path)
async def test_classify_via_storage_key_streams_and_scores(multi_scene_video, explicit_scorer):
    """A storage_key request goes through the injected fetcher and yields a scored result."""
    fake_fetcher = _FakeFetcher(multi_scene_video)
    processor = ClassifyVideoProcessor(
        classify_job_store=_FakeStore(),  # type: ignore[arg-type]
        video_fetcher=fake_fetcher,
    )

    outcome = await processor.classify(
        ClassifyVideoRequest(storage_key="device/clip.avi", keyframes=4)
    )

    # The fetcher must have been asked for exactly the requested key.
    assert fake_fetcher.calls == ["device/clip.avi"]
    assert outcome.frame_count >= 1
    assert outcome.is_explicit is True
    assert outcome.poster_b64 is not None
async def test_storage_key_without_fetcher_is_terminal():
    """With no fetcher injected, a storage_key request raises ClassifyError."""
    processor = ClassifyVideoProcessor(classify_job_store=_FakeStore())  # type: ignore[arg-type]
    keyed_request = ClassifyVideoRequest(storage_key="device/clip.avi")
    with pytest.raises(ClassifyError):
        await processor.classify(keyed_request)
async def test_decode_slot_bounds_concurrency(multi_scene_video, explicit_scorer):
    """With a one-slot semaphore, two overlapping classify() calls never
    materialize at the same time."""
    import asyncio

    processor = ClassifyVideoProcessor(
        classify_job_store=_FakeStore(),  # type: ignore[arg-type]
        video_fetcher=_FakeFetcher(multi_scene_video),
        video_semaphore=asyncio.Semaphore(1),
    )

    active = 0
    peak = 0
    wrapped = processor._materialize

    async def tracking_materialize(request, tmp_path, loop):
        """Count concurrent entries around the real materialize step."""
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.02)  # widen the window where overlap could happen
        try:
            await wrapped(request, tmp_path, loop)
        finally:
            active -= 1

    processor._materialize = tracking_materialize  # type: ignore[method-assign]

    keyed_request = ClassifyVideoRequest(storage_key="device/clip.avi", keyframes=2)
    await asyncio.gather(
        processor.classify(keyed_request), processor.classify(keyed_request)
    )

    assert peak == 1  # never two inside the decode slot at once