diff --git a/services/imajin-video/service/src/pipeline/classify_processor.py b/services/imajin-video/service/src/pipeline/classify_processor.py index b55b0c29..12b74f7a 100644 --- a/services/imajin-video/service/src/pipeline/classify_processor.py +++ b/services/imajin-video/service/src/pipeline/classify_processor.py @@ -27,8 +27,8 @@ from pathlib import Path from typing import AsyncIterator, Protocol import cv2 -import httpx import numpy as np +from model_boss.client import InferenceClient from config.settings import settings from jobs.classify_job_store import ClassifyJobStore @@ -42,6 +42,25 @@ from models.classify_types import ( logger = logging.getLogger(__name__) +# Shared inference client for vision scoring (one connection pool, reused per frame). +_vision_client: InferenceClient | None = None + + +def _get_vision_client() -> InferenceClient: + """Lazily build the shared model-boss client. auto_start_services is OFF — imajin + is a consumer and must never try to spin up the coordinator/redis itself.""" + global _vision_client + if _vision_client is None: + _vision_client = InferenceClient( + coordinator_url=settings.model_boss_base_url, + client_id="imajin-video", + default_priority="normal", + auto_start_services=False, + timeout=settings.model_boss_timeout_s, + ) + return _vision_client + + # --------------------------------------------------------------------------- # Shared contrastive rubric — MUST mirror the consumer's CLASSIFY_RUBRIC # (cocotte content-ingestor classification.ts) so video explicitness is @@ -467,26 +486,17 @@ class ClassifyVideoProcessor: [pos,neg,...] label array, re-normalize per pair. """ texts = [s for d in CLASSIFY_RUBRIC for s in (d.positive, d.negative)] - url = f"{settings.model_boss_base_url.rstrip('/')}/v1/vision/score" try: - async with httpx.AsyncClient(timeout=settings.model_boss_timeout_s) as client: - resp = await client.post( - url, - json={ - "model": settings.model_boss_vision_model, - "image_base64": image_b64, - "texts": texts, - "mode": "contrastive", - "x_client_id": "imajin-video", - # High priority lets the coordinator spawn a 2nd vision slot on - # the idle GPU so frames score in parallel (pool spawns instance - # N+1 only for priority ≤5; siglip2 max_instances=2 permits it). - "x_priority": settings.classify_vision_priority, - }, - ) - resp.raise_for_status() - raw_scores = resp.json().get("scores", []) or [] - except httpx.HTTPError as exc: + # job_class=batch → the coordinator may spawn a 2nd vision slot on a free + # GPU to parallelize, at low priority that never preempts interactive vision. + raw_scores = await _get_vision_client().score( + model=settings.model_boss_vision_model, + image_base64=image_b64, + texts=texts, + mode="contrastive", + job_class=settings.classify_vision_job_class, + ) + except Exception as exc: # model-boss reachability is an INTERNAL fault (transient) — not a bad-media # terminal failure. Re-raise so the job records it distinctly from ClassifyError. raise RuntimeError(f"model-boss vision unreachable: {exc}") from exc