feat(api): Update main API routes or add new endpoints for the imajin-aesthetic service

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Claude Code 2026-03-25 23:46:40 -07:00
parent 3421d99d04
commit 94f88c4e1d

View file

@ -3,6 +3,7 @@
Uses lilith-fastapi-service-base for standardized service patterns.
"""
import asyncio
import base64
import io
import logging
@ -70,94 +71,90 @@ class RequestIDMiddleware(BaseHTTPMiddleware):
logger = get_logger(__name__)
# Global GPU coordination (initialized at startup)
_boss = None
# Global coordinator state (initialized at startup)
_inference_client = None
_lease_id: str | None = None
_heartbeat_task: asyncio.Task | None = None
async def init_gpu_boss() -> None:
"""Initialize GPU coordination.
Must be called at service startup before loading any models.
"""
global _boss
if _boss is not None:
logger.warning("GPU boss already initialized")
return
if not settings.gpu_enabled:
logger.info("GPU coordination disabled, using fallback device")
return
try:
from model_boss import GPUBoss
_boss = GPUBoss(redis_url=settings.gpu_redis_url)
await _boss.connect()
logger.info("GPU coordination initialized")
except ImportError:
logger.warning(
"model-boss not installed - running without GPU coordination. "
"Install with: pip install model-boss"
)
except Exception as e:
logger.warning(f"Failed to initialize GPU boss: {e}")
logger.warning("Running without GPU coordination")
async def _heartbeat_loop() -> None:
"""Send periodic heartbeats to the coordinator to keep the lease alive."""
while True:
try:
await asyncio.sleep(10.0)
if _inference_client is not None and _lease_id is not None:
await _inference_client.heartbeat(_lease_id)
except asyncio.CancelledError:
break
except Exception:
pass
async def shutdown_gpu_boss() -> None:
"""Shutdown GPU coordination and release all leases.
Call at service shutdown.
"""
global _boss
if _boss is not None:
await _boss.close()
_boss = None
logger.info("GPU coordination shutdown complete")
async def get_device() -> str:
"""Get device for model loading via GPU boss or fallback.
async def acquire_coordinator_lease() -> str:
"""Acquire a VRAM lease from the model-boss coordinator.
Returns:
Device string (e.g., 'cuda:0', 'cuda:1', 'cpu')
"""
if _boss is not None:
try:
from model_boss import Priority
global _inference_client, _lease_id, _heartbeat_task
priority_map = {
"low": Priority.LOW,
"normal": Priority.NORMAL,
"high": Priority.HIGH,
}
priority = priority_map.get(settings.gpu_priority, Priority.NORMAL)
try:
from model_boss.client import InferenceClient
# Request ~2GB for ImageReward model
lease = await _boss.acquire(
model_id="image-reward",
vram_mb=2048,
priority=priority,
)
device = f"cuda:{lease.device_id}"
logger.info(f"GPU boss allocated device: {device}")
return device
_inference_client = InferenceClient(
client_id="imajin-aesthetic",
auto_start_services=False,
)
lease = await _inference_client.acquire_lease(
model_id="service:image-reward",
vram_mb=2048,
priority="normal",
)
_lease_id = lease["lease_id"]
gpu_index = lease["gpu_index"]
device = f"cuda:{gpu_index}"
logger.info(f"Coordinator lease acquired: {device} (lease_id={_lease_id})")
except Exception as e:
logger.warning(f"GPU boss allocation failed: {e}, using fallback")
_heartbeat_task = asyncio.create_task(_heartbeat_loop())
return device
except Exception as e:
logger.warning(f"Coordinator lease failed: {e}, using fallback device")
if _inference_client is not None:
await _inference_client.dispose()
_inference_client = None
# Fallback when GPU boss unavailable
import torch
if torch.cuda.is_available():
return settings.fallback_device
return "cpu"
async def release_coordinator_lease() -> None:
"""Cancel heartbeat, release the coordinator lease, and dispose the client."""
global _inference_client, _lease_id, _heartbeat_task
if _heartbeat_task is not None:
_heartbeat_task.cancel()
_heartbeat_task = None
if _inference_client is not None and _lease_id is not None:
try:
await _inference_client.release_lease(_lease_id)
except Exception as e:
logger.warning(f"Error releasing coordinator lease: {e}")
_lease_id = None
if _inference_client is not None:
try:
await _inference_client.dispose()
except Exception as e:
logger.warning(f"Error disposing InferenceClient: {e}")
_inference_client = None
logger.info("Coordinator lease released")
# =============================================================================
# Lifespan Manager (using lilith-fastapi-service-base)
# =============================================================================
@ -167,14 +164,11 @@ lifespan = LifespanManager()
@lifespan.on_startup # type: ignore[untyped-decorator]
async def init_service() -> None:
"""Initialize aesthetic scorer and GPU coordination."""
"""Initialize aesthetic scorer and coordinator lease."""
logger.info("Initializing aesthetic scorer...")
# Initialize GPU coordination
await init_gpu_boss()
# Get device allocation
device = await get_device()
# Acquire coordinator lease
device = await acquire_coordinator_lease()
# Create scorer with allocated device
scorer = ImageRewardScorer(
@ -192,14 +186,14 @@ async def init_service() -> None:
@lifespan.on_shutdown # type: ignore[untyped-decorator]
async def cleanup_service() -> None:
"""Cleanup scorer and GPU resources."""
"""Cleanup scorer and coordinator lease."""
logger.info("Shutting down aesthetic scoring service")
scorer = lifespan.get_state("scorer")
if scorer is not None:
scorer.unload_model()
await shutdown_gpu_boss()
await release_coordinator_lease()
# =============================================================================
@ -279,7 +273,7 @@ async def health() -> dict[str, object]:
"model_loaded": scorer.is_loaded if scorer else False,
"gpu_enabled": scorer.is_gpu_enabled if scorer else False,
"device": scorer.device if scorer else None,
"gpu_coordination": _boss is not None,
"gpu_coordination": _inference_client is not None and _lease_id is not None,
}