diff --git a/services/imajin-video/service/src/api/app.py b/services/imajin-video/service/src/api/app.py index e13927e3..172f7b0b 100644 --- a/services/imajin-video/service/src/api/app.py +++ b/services/imajin-video/service/src/api/app.py @@ -6,9 +6,11 @@ Uses lilith-service-fastapi-bootstrap for lifecycle management. from __future__ import annotations +import asyncio import logging import uuid +import psutil import structlog from fastapi import FastAPI, Request from lilith_service_fastapi_bootstrap import LifespanManager, apply_cors, setup_logging @@ -21,7 +23,9 @@ from detection.face_detector import FaceDetector from jobs.classify_job_store import ClassifyJobStore from jobs.job_store import JobStore from jobs.protect_job_store import ProtectJobStore +from pipeline.capability import derive_video_concurrency from pipeline.classify_processor import ClassifyVideoProcessor +from pipeline.minio_fetch import MinioVideoFetcher from pipeline.protection_processor import ProtectionProcessor from pipeline.transcode_processor import TranscodeProcessor from pipeline.video_processor import VideoProcessor @@ -104,7 +108,33 @@ async def initialize_components() -> None: protect_job_store=protect_job_store, ) transcode_processor = TranscodeProcessor(job_store=job_store) - classify_processor = ClassifyVideoProcessor(classify_job_store=classify_job_store) + + # /classify-video: pull video bytes from MinIO by storage_key (no base64 size cap) + # and bound concurrent decodes to host RAM — the throttle lives on the server. 
+ video_fetcher = MinioVideoFetcher( + endpoint=settings.minio_endpoint, + access_key=settings.minio_access_key, + secret_key=settings.minio_secret_key, + bucket=settings.minio_bucket, + secure=settings.minio_secure, + ) + available_ram = psutil.virtual_memory().available + video_concurrency = derive_video_concurrency( + available_ram, + settings.classify_video_ram_budget_bytes, + settings.classify_video_concurrency, + ) + logger.info( + f"classify-video: {video_concurrency} concurrent decodes " + f"(available RAM {available_ram // 1_000_000}MB, ceiling " + f"{settings.classify_video_concurrency}, MinIO {settings.minio_endpoint}/" + f"{settings.minio_bucket})" + ) + classify_processor = ClassifyVideoProcessor( + classify_job_store=classify_job_store, + video_fetcher=video_fetcher, + video_semaphore=asyncio.Semaphore(video_concurrency), + ) lifespan.set_state("detector", detector) lifespan.set_state("job_store", job_store) diff --git a/services/imajin-video/service/src/api/routes/classify_video.py b/services/imajin-video/service/src/api/routes/classify_video.py index 432ab59c..d3dababc 100644 --- a/services/imajin-video/service/src/api/routes/classify_video.py +++ b/services/imajin-video/service/src/api/routes/classify_video.py @@ -78,6 +78,13 @@ async def classify_video_sync( """ processor: ClassifyVideoProcessor = request.state.classify_processor + if body.video_base64 is None: + raise HTTPException( + status_code=400, + detail="sync classification requires inline video_base64; use POST " + "/classify-video for the storage_key (object-store) path.", + ) + duration = _probe_duration_seconds(body.video_base64) if duration > settings.classify_sync_max_seconds: raise HTTPException( diff --git a/services/imajin-video/service/src/models/classify_types.py b/services/imajin-video/service/src/models/classify_types.py index d0a697fd..557b792f 100644 --- a/services/imajin-video/service/src/models/classify_types.py +++ 
b/services/imajin-video/service/src/models/classify_types.py @@ -15,7 +15,7 @@ from __future__ import annotations from enum import Enum from typing import Literal -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator class Explicitness(str, Enum): @@ -25,16 +25,24 @@ class Explicitness(str, Enum): class ClassifyVideoRequest(BaseModel): - """Request to classify a video. + """Request to classify a video. Provide EXACTLY ONE source: - The platform streams the video bytes in (decision 5); imajin-video does not - read the mac-sync object store. + - ``storage_key`` — imajin streams the object from MinIO to disk (the backfill + path; no size cap, bytes never enter a heap). Decision 5 reversed for large + videos. + - ``video_base64`` — the platform inlines the bytes (legacy / short sync clips). """ - video_base64: str = Field( - ..., + video_base64: str | None = Field( + None, max_length=400_000_000, - description="Base64-encoded video bytes (raw or data-URL). The platform streams these in.", + description="Base64-encoded video bytes (raw or data-URL). Mutually exclusive " + "with storage_key.", + ) + storage_key: str | None = Field( + None, + description="MinIO object key (in the configured bucket) that imajin streams " + "to disk. Mutually exclusive with video_base64. 
No size cap.", ) keyframes: int | None = Field( None, @@ -49,6 +57,12 @@ class ClassifyVideoRequest(BaseModel): "contrastive rubric in one pass; scene is reserved for semantic enrichment (Phase 3).", ) + @model_validator(mode="after") + def _exactly_one_source(self) -> "ClassifyVideoRequest": + if (self.video_base64 is None) == (self.storage_key is None): + raise ValueError("provide exactly one of video_base64 or storage_key") + return self + class FrameScore(BaseModel): """Per-keyframe scores from the contrastive rubric (re-normalized per pair).""" diff --git a/services/imajin-video/service/src/pipeline/classify_processor.py b/services/imajin-video/service/src/pipeline/classify_processor.py index 8076eafe..b55b0c29 100644 --- a/services/imajin-video/service/src/pipeline/classify_processor.py +++ b/services/imajin-video/service/src/pipeline/classify_processor.py @@ -19,9 +19,12 @@ import asyncio import base64 import binascii import logging +import shutil import tempfile +from contextlib import asynccontextmanager from dataclasses import dataclass from pathlib import Path +from typing import AsyncIterator, Protocol import cv2 import httpx @@ -336,11 +339,57 @@ def decode_video_b64(video_base64: str) -> bytes: return raw -class ClassifyVideoProcessor: - """Orchestrates one /classify-video job: decode → sample → score → aggregate.""" +class VideoFetcher(Protocol): + """Streams a video object from object storage to a local path (duck-typed so + the processor doesn't import the MinIO client).""" - def __init__(self, classify_job_store: ClassifyJobStore) -> None: + async def fetch_to_path(self, storage_key: str, dest_path: str) -> None: ... + + +class ClassifyVideoProcessor: + """Orchestrates one /classify-video job: materialize → sample → score → aggregate. + + The source is either an inlined ``video_base64`` or a ``storage_key`` that the + injected ``video_fetcher`` streams from MinIO to disk. 
The memory/CPU-heavy + materialize+sample step runs under ``video_semaphore`` — the SERVER-side bound on + how many videos decode at once (sized from host RAM at startup), so a backfill + consumer firing many jobs can't swamp the host. + """ + + def __init__( + self, + classify_job_store: ClassifyJobStore, + *, + video_fetcher: VideoFetcher | None = None, + video_semaphore: asyncio.Semaphore | None = None, + ) -> None: self._store = classify_job_store + self._fetcher = video_fetcher + self._sem = video_semaphore + + @asynccontextmanager + async def _decode_slot(self) -> AsyncIterator[None]: + """Hold the server-capability semaphore (no-op when unbounded, e.g. tests).""" + if self._sem is None: + yield + else: + async with self._sem: + yield + + async def _materialize( + self, request: ClassifyVideoRequest, tmp_path: Path, loop: asyncio.AbstractEventLoop + ) -> None: + """Put the source bytes on disk at ``tmp_path`` from whichever source the + request carries. The model validator guarantees exactly one is set.""" + if request.storage_key is not None: + if self._fetcher is None: + raise ClassifyError( + "storage_key requested but no object-store fetcher is configured" + ) + await self._fetcher.fetch_to_path(request.storage_key, str(tmp_path)) + return + raw = decode_video_b64(request.video_base64 or "") + await loop.run_in_executor(None, tmp_path.write_bytes, raw) async def process(self, job_id: str, request: ClassifyVideoRequest) -> None: """Async job entrypoint. Never raises — failures land as a terminal @@ -364,16 +413,18 @@ class ClassifyVideoProcessor: Raises ClassifyError on terminal (caller-attributable) failure; propagates other exceptions only for genuinely internal faults. 
""" - raw = decode_video_b64(request.video_base64) loop = asyncio.get_event_loop() tmp_dir = Path(tempfile.mkdtemp(prefix="imajin-classify-")) tmp_path = tmp_dir / "input.bin" - tmp_path.write_bytes(raw) try: - frames, duration = await loop.run_in_executor( - None, sample_keyframes, str(tmp_path), request.keyframes - ) + # Source acquisition + decode are the heavy steps — hold the server + # capacity slot across both so concurrent videos stay within host RAM. + async with self._decode_slot(): + await self._materialize(request, tmp_path, loop) + frames, duration = await loop.run_in_executor( + None, sample_keyframes, str(tmp_path), request.keyframes + ) if not frames: raise ClassifyError( "No decodable frames — unsupported or corrupt codec" @@ -394,8 +445,6 @@ class ClassifyVideoProcessor: ) return result finally: - import shutil - shutil.rmtree(tmp_dir, ignore_errors=True) async def _score_frames( @@ -429,6 +478,10 @@ class ClassifyVideoProcessor: "texts": texts, "mode": "contrastive", "x_client_id": "imajin-video", + # High priority lets the coordinator spawn a 2nd vision slot on + # the idle GPU so frames score in parallel (pool spawns instance + # N+1 only for priority ≤5; siglip2 max_instances=2 permits it). + "x_priority": settings.classify_vision_priority, }, ) resp.raise_for_status() diff --git a/services/imajin-video/service/src/pipeline/minio_fetch.py b/services/imajin-video/service/src/pipeline/minio_fetch.py new file mode 100644 index 00000000..6c767169 --- /dev/null +++ b/services/imajin-video/service/src/pipeline/minio_fetch.py @@ -0,0 +1,68 @@ +"""MinIO object fetcher for /classify-video. + +Reverses the original decision-5 ("imajin reads no object store"): for the video +backfill, the platform sends a ``storage_key`` instead of inlining the whole file +as base64. 
imajin streams the object straight from MinIO to a temp file on disk — +so the bytes never sit in the platform's (or imajin's) heap, there is no base64 +size cap, and large videos stop being a special case. + +The stream-to-disk is deliberate: cv2 decodes from the file on demand, so a 4K clip +costs disk, not RAM. Failures raise ``ClassifyError`` (terminal) — a missing or +unreadable object is caller-attributable, surfaced as a ``failed`` job, not a 5xx. +""" +from __future__ import annotations + +import asyncio +import logging + +from minio import Minio + +from pipeline.classify_processor import ClassifyError + +logger = logging.getLogger(__name__) + +# 8 MiB stream chunks — large enough to amortize syscalls, small enough to keep the +# per-fetch heap footprint trivial regardless of object size. +_CHUNK_BYTES = 8 * 1024 * 1024 + + +class MinioVideoFetcher: + """Streams a video object from MinIO to a local path. Thread-safe per call + (the blocking S3 I/O runs in a worker thread).""" + + def __init__( + self, + *, + endpoint: str, + access_key: str, + secret_key: str, + bucket: str, + secure: bool = False, + ) -> None: + self._client = Minio( + endpoint, access_key=access_key, secret_key=secret_key, secure=secure + ) + self._bucket = bucket + + async def fetch_to_path(self, storage_key: str, dest_path: str) -> None: + """Stream ``bucket/storage_key`` into ``dest_path``. 
Raises ClassifyError + (terminal) if the object can't be read.""" + key = storage_key.lstrip("/") + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self._fetch_sync, key, dest_path) + + def _fetch_sync(self, key: str, dest_path: str) -> None: + response = None + try: + response = self._client.get_object(self._bucket, key) + with open(dest_path, "wb") as out: + for chunk in response.stream(_CHUNK_BYTES): + out.write(chunk) + except Exception as exc: # noqa: BLE001 — any S3 fault is terminal for this job + raise ClassifyError( + f"object fetch failed for {self._bucket}/{key}: {exc}" + ) from exc + finally: + if response is not None: + response.close() + response.release_conn() diff --git a/services/imajin-video/service/tests/test_capability.py b/services/imajin-video/service/tests/test_capability.py new file mode 100644 index 00000000..c4276d1c --- /dev/null +++ b/services/imajin-video/service/tests/test_capability.py @@ -0,0 +1,33 @@ +"""Unit tests for server-capability concurrency sizing.""" +from __future__ import annotations + +from pipeline.capability import derive_video_concurrency + +GB = 1_000_000_000 + + +def test_ram_clamps_below_ceiling(): + # 3 GB available, 1.5 GB budget → 2 fit, below the ceiling of 4. + assert derive_video_concurrency(3 * GB, 1_500_000_000, ceiling=4) == 2 + + +def test_ceiling_caps_when_ram_is_plentiful(): + # 64 GB available would allow far more, but the ceiling holds the line. + assert derive_video_concurrency(64 * GB, 1_500_000_000, ceiling=4) == 4 + + +def test_never_below_floor_even_when_starved(): + # Less than one budget's worth of RAM still yields the floor (never 0). 
+ assert derive_video_concurrency(100_000_000, 1_500_000_000, ceiling=4) == 1 + + +def test_custom_floor(): + assert derive_video_concurrency(100_000_000, 1_500_000_000, ceiling=8, floor=2) == 2 + + +def test_nonpositive_budget_disables_ram_clamp(): + assert derive_video_concurrency(1, 0, ceiling=4) == 4 + + +def test_ceiling_below_floor_is_lifted_to_floor(): + assert derive_video_concurrency(64 * GB, 1, ceiling=0, floor=1) == 1 diff --git a/services/imajin-video/service/tests/test_classify_video.py b/services/imajin-video/service/tests/test_classify_video.py index 50bc3608..0439bd08 100644 --- a/services/imajin-video/service/tests/test_classify_video.py +++ b/services/imajin-video/service/tests/test_classify_video.py @@ -286,3 +286,98 @@ async def test_process_job_failed_is_not_5xx_on_modelboss_down(multi_scene_video req = ClassifyVideoRequest(video_base64=_video_b64(multi_scene_video), keyframes=2) await proc.process("job-3", req) # must not raise assert store.records["job-3"]["status"] == "failed" + + +# --------------------------------------------------------------------------- +# Request source validation — exactly one of {video_base64, storage_key} +# --------------------------------------------------------------------------- +def test_request_rejects_both_sources(): + import pytest as _pytest + + with _pytest.raises(ValueError, match="exactly one"): + ClassifyVideoRequest(video_base64="AAAA", storage_key="some/key") + + +def test_request_rejects_neither_source(): + import pytest as _pytest + + with _pytest.raises(ValueError, match="exactly one"): + ClassifyVideoRequest(keyframes=4) + + +def test_request_accepts_storage_key_only(): + req = ClassifyVideoRequest(storage_key="device/2026/clip.mov") + assert req.storage_key == "device/2026/clip.mov" + assert req.video_base64 is None + + +# --------------------------------------------------------------------------- +# storage_key path — imajin streams the object to disk via the injected fetcher +# 
async def test_decode_slot_bounds_concurrency(multi_scene_video, explicit_scorer):
    """With a one-permit semaphore, two overlapping classify() calls must never
    be inside the materialize step at the same moment."""
    import asyncio

    sem = asyncio.Semaphore(1)
    processor = ClassifyVideoProcessor(
        classify_job_store=_FakeStore(),  # type: ignore[arg-type]
        video_fetcher=_FakeFetcher(multi_scene_video),
        video_semaphore=sem,
    )

    wrapped = processor._materialize
    active = 0
    peak = 0

    async def counting_materialize(request, tmp_path, loop):
        nonlocal active, peak
        active += 1
        if active > peak:
            peak = active
        # Pause so a second, unthrottled call would have time to enter.
        await asyncio.sleep(0.02)  # widen the window where overlap could happen
        try:
            await wrapped(request, tmp_path, loop)
        finally:
            active -= 1

    processor._materialize = counting_materialize  # type: ignore[method-assign]
    request = ClassifyVideoRequest(storage_key="device/clip.avi", keyframes=2)
    await asyncio.gather(processor.classify(request), processor.classify(request))
    assert peak == 1  # never two inside the decode slot at once