feat(pipeline): Introduce ClassifyProcessor to integrate model-boss client for video frame classification with inference and preprocessing logic

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
autocommit 2025-12-17 00:04:16 -08:00
parent 7d5e4cf46e
commit b0fa99c799

View file

@ -27,8 +27,8 @@ from pathlib import Path
from typing import AsyncIterator, Protocol
import cv2
import httpx
import numpy as np
from model_boss.client import InferenceClient
from config.settings import settings
from jobs.classify_job_store import ClassifyJobStore
@ -42,6 +42,25 @@ from models.classify_types import (
logger = logging.getLogger(__name__)
# Shared inference client for vision scoring (one connection pool, reused per frame).
_vision_client: InferenceClient | None = None
def _get_vision_client() -> InferenceClient:
"""Lazily build the shared model-boss client. auto_start_services is OFF — imajin
is a consumer and must never try to spin up the coordinator/redis itself."""
global _vision_client
if _vision_client is None:
_vision_client = InferenceClient(
coordinator_url=settings.model_boss_base_url,
client_id="imajin-video",
default_priority="normal",
auto_start_services=False,
timeout=settings.model_boss_timeout_s,
)
return _vision_client
# ---------------------------------------------------------------------------
# Shared contrastive rubric — MUST mirror the consumer's CLASSIFY_RUBRIC
# (cocotte content-ingestor classification.ts) so video explicitness is
@ -467,26 +486,17 @@ class ClassifyVideoProcessor:
[pos,neg,...] label array, re-normalize per pair.
"""
texts = [s for d in CLASSIFY_RUBRIC for s in (d.positive, d.negative)]
url = f"{settings.model_boss_base_url.rstrip('/')}/v1/vision/score"
try:
async with httpx.AsyncClient(timeout=settings.model_boss_timeout_s) as client:
resp = await client.post(
url,
json={
"model": settings.model_boss_vision_model,
"image_base64": image_b64,
"texts": texts,
"mode": "contrastive",
"x_client_id": "imajin-video",
# High priority lets the coordinator spawn a 2nd vision slot on
# the idle GPU so frames score in parallel (pool spawns instance
# N+1 only for priority ≤5; siglip2 max_instances=2 permits it).
"x_priority": settings.classify_vision_priority,
},
)
resp.raise_for_status()
raw_scores = resp.json().get("scores", []) or []
except httpx.HTTPError as exc:
# job_class=batch → the coordinator may spawn a 2nd vision slot on a free
# GPU to parallelize, at low priority that never preempts interactive vision.
raw_scores = await _get_vision_client().score(
model=settings.model_boss_vision_model,
image_base64=image_b64,
texts=texts,
mode="contrastive",
job_class=settings.classify_vision_job_class,
)
except Exception as exc:
# model-boss reachability is an INTERNAL fault (transient) — not a bad-media
# terminal failure. Re-raise so the job records it distinctly from ClassifyError.
raise RuntimeError(f"model-boss vision unreachable: {exc}") from exc