feat(imajin-mcp): Add video processing tools with face detection and health monitoring capabilities

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Claude Code 2026-04-04 15:10:57 -07:00
parent b946863c2f
commit 5a07f65a4d
7 changed files with 68 additions and 44 deletions

View file

@ -55,6 +55,15 @@ export class VideoServiceClient {
}
return res.json() as Promise<TranscodeJobStatus>;
}
/**
 * Query the video service's `/health` endpoint.
 *
 * @returns The parsed JSON body of the response. The shape is not validated
 *   here, so callers must narrow it before use.
 * @throws Error when the service responds with a non-OK status; the message
 *   carries the HTTP status code and the raw response text.
 */
async health(): Promise<unknown> {
  const response = await fetch(`${this.baseUrl}/health`);
  if (response.ok) {
    return response.json();
  }
  // Non-OK response: surface the body text so the caller can see why.
  const body = await response.text();
  throw new Error(`Health check failed (${response.status}): ${body}`);
}
}
export function createVideoClient(): VideoServiceClient {

View file

@ -33,7 +33,7 @@ export async function createServer(): Promise<void> {
...identityTools(identity),
...searchTools(identity),
...embedTools(identity),
...healthTools(identity, processing, diffusion),
...healthTools(identity, processing, diffusion, video),
...queueTools(queue),
];

View file

@ -1,6 +1,7 @@
import type { IdentityClient } from '@lilith/imajin-identity-client';
import type { ImageProcessingClient } from '@lilith/imajin-processing-client';
import type { ImageGenerationClient } from '@lilith/imajin-diffusion-client';
import type { VideoServiceClient } from '../client';
import type { ToolEntry, ContentBlock } from '../types';
import { jsonContent } from '../types';
@ -8,13 +9,14 @@ export function healthTools(
identity: IdentityClient,
processing: ImageProcessingClient,
diffusion: ImageGenerationClient,
video: VideoServiceClient,
): ToolEntry[] {
return [
{
definition: {
name: 'imajin_health',
description:
'Check health of all Imajin services: processing (8004), diffusion (8002), identity (8009). Returns status for each.',
'Check health of all Imajin services: processing (8004), diffusion (8002), identity (8009), video (8010). Returns status for each.',
inputSchema: {
type: 'object' as const,
properties: {},
@ -41,6 +43,12 @@ export function healthTools(
results.identity = { status: 'unreachable', error: err instanceof Error ? err.message : String(err) };
}
try {
results.video = await video.health();
} catch (err) {
results.video = { status: 'unreachable', error: err instanceof Error ? err.message : String(err) };
}
return jsonContent(results);
},
},

View file

@ -31,9 +31,9 @@ export function videoTools(client: VideoServiceClient): ToolEntry[] {
definition: {
name: 'convert_video',
description:
'Convert a video to standard definition (480p by default) to reduce file size. ' +
'iPhone HD clips (~12MB for 5s at 720p) are reduced to under 10MB. ' +
'Submits to the imajin-video service queue and waits for completion. ' +
'Convert a video to reduce file size. Default: 720p, CRF 28. ' +
'iPhone 1080p clips (~15MB for 7s) compress to under 1MB at 720p. ' +
'Submits to the imajin-video service and waits for completion (up to 5min). ' +
'For fire-and-forget on large files, use submit_video_job instead.',
inputSchema: {
type: 'object' as const,
@ -48,7 +48,7 @@ export function videoTools(client: VideoServiceClient): ToolEntry[] {
},
height: {
type: 'number',
description: 'Target height in pixels. Width auto-calculated to preserve aspect ratio. Default: 480.',
description: 'Target height in pixels. Width auto-calculated to preserve aspect ratio. Default: 720.',
},
crf: {
type: 'number',

View file

@ -16,7 +16,7 @@ platforms:
description: Video processing REST API (protection pipeline, recording proxy)
start:
path: ~/Code/@applications/@imajin/services/imajin-video/service
script: CUDA_VISIBLE_DEVICES=1 python3.12 -m uvicorn api.app:app --host 0.0.0.0 --port 8010 --app-dir src
script: python3.12 -m uvicorn api.app:app --host 0.0.0.0 --port 8010 --app-dir src
stop:
path: ~/Code/@applications/@imajin/services/imajin-video/service
script: pkill -f "python.*uvicorn api.app:app" ; pkill -f "python.*uvicorn src.api.app:app" ; true

View file

@ -11,7 +11,7 @@ import uuid
import structlog
from fastapi import FastAPI, Request
from lilith_service_fastapi_bootstrap import LifespanManager, apply_cors
from lilith_service_fastapi_bootstrap import LifespanManager, apply_cors, setup_logging
from prometheus_fastapi_instrumentator import Instrumentator
from starlette.middleware.base import BaseHTTPMiddleware
@ -28,8 +28,11 @@ from pipeline.video_processor import VideoProcessor
from .routes import detect, health, invisible_protect, media, process, recordings, transcode
# ---------------------------------------------------------------------------
# Structured logging
# Logging — configure stdlib root logger first so all libraries are visible,
# then layer structlog on top for structured output.
# ---------------------------------------------------------------------------
setup_logging(settings.service_name, level="INFO")
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,

View file

@ -1,8 +1,8 @@
"""InsightFace buffalo_l face detector adapted for video frame processing.
Loads the model once at startup (stateless across requests).
Acquires a VRAM lease from model-boss before loading; degrades gracefully
if model-boss is unavailable (logs a warning, proceeds without lease).
Acquires a VRAM lease from model-boss before loading; fails fast if
model-boss coordinator is unavailable.
"""
from __future__ import annotations
@ -14,6 +14,7 @@ from pathlib import Path
import cv2
import numpy as np
from model_boss.client import InferenceClient
from models.types import FaceRegion
@ -23,7 +24,7 @@ logger = logging.getLogger(__name__)
# buffalo_l recognition/landmark modules are NOT loaded here — imajin-adversarial owns those.
_INSIGHTFACE_VRAM_MB = 400
# Match the CUDA library path config from imajin-identity
def _configure_cuda_library_paths() -> None:
try:
import importlib.util
@ -80,7 +81,7 @@ class FaceDetector:
self.service_name = service_name
self._face_app = None
self._inference_client = None
self._inference_client: InferenceClient | None = None
self._lease_id: str | None = None
self._heartbeat_task: asyncio.Task | None = None
self._initialized = False
@ -100,42 +101,45 @@ class FaceDetector:
raise
async def _acquire_gpu_lease(self) -> None:
    """Acquire a VRAM lease from the model-boss coordinator and start heartbeats.

    Creates an ``InferenceClient``, connects it, and requests
    ``_INSIGHTFACE_VRAM_MB`` MB for this detector's model. On success the
    lease id and the GPU index from the lease response are stored on the
    instance and a background heartbeat task is started.

    Raises:
        Exception: whatever ``connect()`` or ``acquire_lease()`` raised. The
            failure is propagated unchanged (fail-fast), but the client is
            disposed and the instance reference cleared first so a failed
            start-up does not leak the client.
    """
    client = InferenceClient(
        client_id="imajin-video",
        auto_start_services=True,
    )
    self._inference_client = client
    try:
        await client.connect()
        lease = await client.acquire_lease(
            model_id=f"service:insightface-{self.model_name}",
            vram_mb=_INSIGHTFACE_VRAM_MB,
            priority="normal",
        )
    except Exception:
        # No lease was obtained: drop the reference before disposing so later
        # cleanup code never sees a half-initialised client.
        self._inference_client = None
        try:
            await client.dispose()
        except Exception as dispose_exc:
            # Best effort only; the original failure is the one to report.
            logger.warning(
                f"Error disposing inference client after failed lease: {dispose_exc}"
            )
        raise

    self._lease_id = lease["lease_id"]
    self.gpu_device_id = lease["gpu_index"]
    logger.info(
        f"GPU lease acquired: cuda:{self.gpu_device_id} (lease_id={self._lease_id})"
    )
    # Keep the lease alive for as long as this detector is running.
    self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
async def _heartbeat_loop(self) -> None:
    """Send periodic heartbeats to keep the coordinator lease alive.

    Sleeps 10 seconds between heartbeats. The loop ends without raising when:

    * the inference client or the lease id has been cleared,
    * ``heartbeat()`` returns a falsy value (logged here as an eviction),
    * the task is cancelled, or
    * ``heartbeat()`` raises (logged as an error).
    """
    while True:
        try:
            await asyncio.sleep(10.0)
            if self._inference_client is None or self._lease_id is None:
                return
            still_alive = await self._inference_client.heartbeat(self._lease_id)
            if not still_alive:
                logger.error(
                    f"GPU lease {self._lease_id} was evicted by coordinator — "
                    "VRAM tracking is no longer accurate for this session"
                )
                return
        except asyncio.CancelledError:
            # Normal shutdown path: the owner cancelled the task.
            return
        except Exception as exc:
            # Adjacent literals are concatenated verbatim, so the separator
            # after {exc} must be part of the first literal.
            logger.error(
                f"GPU lease heartbeat failed (lease_id={self._lease_id}): {exc}; "
                "stopping heartbeat; lease may be evicted by coordinator"
            )
            return
async def _load_model(self) -> None:
loop = asyncio.get_event_loop()
@ -160,9 +164,9 @@ class FaceDetector:
if self._inference_client is not None and self._lease_id is not None:
try:
await self._inference_client.release_lease(self._lease_id)
logger.info("Released coordinator lease")
logger.info(f"Released GPU lease {self._lease_id}")
except Exception as exc:
logger.warning(f"Error releasing coordinator lease: {exc}")
logger.warning(f"Error releasing GPU lease {self._lease_id}: {exc}")
self._lease_id = None
if self._inference_client is not None: