From 94f88c4e1da124db6d653a0151527cbccdd5e4ee Mon Sep 17 00:00:00 2001 From: Claude Code Date: Wed, 25 Mar 2026 23:46:40 -0700 Subject: [PATCH] =?UTF-8?q?feat(api):=20=E2=9C=A8=20Update=20main=20API=20?= =?UTF-8?q?routes=20or=20add=20new=20endpoints=20for=20the=20imajin-aesthe?= =?UTF-8?q?tic=20service?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Lilith Autocommit --- .../imajin-aesthetic/service/src/api/main.py | 152 +++++++++--------- 1 file changed, 73 insertions(+), 79 deletions(-) diff --git a/services/imajin-aesthetic/service/src/api/main.py b/services/imajin-aesthetic/service/src/api/main.py index 9ca8db06..d7fe02cc 100644 --- a/services/imajin-aesthetic/service/src/api/main.py +++ b/services/imajin-aesthetic/service/src/api/main.py @@ -3,6 +3,7 @@ Uses lilith-fastapi-service-base for standardized service patterns. """ +import asyncio import base64 import io import logging @@ -70,94 +71,90 @@ class RequestIDMiddleware(BaseHTTPMiddleware): logger = get_logger(__name__) -# Global GPU coordination (initialized at startup) -_boss = None +# Global coordinator state (initialized at startup) +_inference_client = None +_lease_id: str | None = None +_heartbeat_task: asyncio.Task | None = None -async def init_gpu_boss() -> None: - """Initialize GPU coordination. - - Must be called at service startup before loading any models. - """ - global _boss - - if _boss is not None: - logger.warning("GPU boss already initialized") - return - - if not settings.gpu_enabled: - logger.info("GPU coordination disabled, using fallback device") - return - - try: - from model_boss import GPUBoss - - _boss = GPUBoss(redis_url=settings.gpu_redis_url) - await _boss.connect() - - logger.info("GPU coordination initialized") - - except ImportError: - logger.warning( - "model-boss not installed - running without GPU coordination. " - "Install with: pip install model-boss" - ) - except Exception as e: - logger.warning(f"Failed to initialize GPU boss: {e}") - logger.warning("Running without GPU coordination") +async def _heartbeat_loop() -> None: + """Send periodic heartbeats to the coordinator to keep the lease alive.""" + while True: + try: + await asyncio.sleep(10.0) + if _inference_client is not None and _lease_id is not None: + await _inference_client.heartbeat(_lease_id) + except asyncio.CancelledError: + break + except Exception: + pass -async def shutdown_gpu_boss() -> None: - """Shutdown GPU coordination and release all leases. - - Call at service shutdown. - """ - global _boss - - if _boss is not None: - await _boss.close() - _boss = None - - logger.info("GPU coordination shutdown complete") - - -async def get_device() -> str: - """Get device for model loading via GPU boss or fallback. +async def acquire_coordinator_lease() -> str: + """Acquire a VRAM lease from the model-boss coordinator. Returns: Device string (e.g., 'cuda:0', 'cuda:1', 'cpu') """ - if _boss is not None: - try: - from model_boss import Priority + global _inference_client, _lease_id, _heartbeat_task - priority_map = { - "low": Priority.LOW, - "normal": Priority.NORMAL, - "high": Priority.HIGH, - } - priority = priority_map.get(settings.gpu_priority, Priority.NORMAL) + try: + from model_boss.client import InferenceClient - # Request ~2GB for ImageReward model - lease = await _boss.acquire( - model_id="image-reward", - vram_mb=2048, - priority=priority, - ) - device = f"cuda:{lease.device_id}" - logger.info(f"GPU boss allocated device: {device}") - return device + _inference_client = InferenceClient( + client_id="imajin-aesthetic", + auto_start_services=False, + ) + lease = await _inference_client.acquire_lease( + model_id="service:image-reward", + vram_mb=2048, + priority="normal", + ) + _lease_id = lease["lease_id"] + gpu_index = lease["gpu_index"] + device = f"cuda:{gpu_index}" + logger.info(f"Coordinator lease acquired: {device} (lease_id={_lease_id})") - except Exception as e: - logger.warning(f"GPU boss allocation failed: {e}, using fallback") + _heartbeat_task = asyncio.create_task(_heartbeat_loop()) + return device + + except Exception as e: + logger.warning(f"Coordinator lease failed: {e}, using fallback device") + if _inference_client is not None: + await _inference_client.dispose() + _inference_client = None - # Fallback when GPU boss unavailable import torch if torch.cuda.is_available(): return settings.fallback_device return "cpu" +async def release_coordinator_lease() -> None: + """Cancel heartbeat, release the coordinator lease, and dispose the client.""" + global _inference_client, _lease_id, _heartbeat_task + + if _heartbeat_task is not None: + _heartbeat_task.cancel() + _heartbeat_task = None + + if _inference_client is not None and _lease_id is not None: + try: + await _inference_client.release_lease(_lease_id) + except Exception as e: + logger.warning(f"Error releasing coordinator lease: {e}") + _lease_id = None + + if _inference_client is not None: + try: + await _inference_client.dispose() + except Exception as e: + logger.warning(f"Error disposing InferenceClient: {e}") + _inference_client = None + + logger.info("Coordinator lease released") + + # ============================================================================= # Lifespan Manager (using lilith-fastapi-service-base) # ============================================================================= @@ -167,14 +164,11 @@ lifespan = LifespanManager() @lifespan.on_startup # type: ignore[untyped-decorator] async def init_service() -> None: - """Initialize aesthetic scorer and GPU coordination.""" + """Initialize aesthetic scorer and coordinator lease.""" logger.info("Initializing aesthetic scorer...") - # Initialize GPU coordination - await init_gpu_boss() - - # Get device allocation - device = await get_device() + # Acquire coordinator lease + device = await acquire_coordinator_lease() # Create scorer with allocated device scorer = ImageRewardScorer( @@ -192,14 +186,14 @@ async def init_service() -> None: @lifespan.on_shutdown # type: ignore[untyped-decorator] async def cleanup_service() -> None: - """Cleanup scorer and GPU resources.""" + """Cleanup scorer and coordinator lease.""" logger.info("Shutting down aesthetic scoring service") scorer = lifespan.get_state("scorer") if scorer is not None: scorer.unload_model() - await shutdown_gpu_boss() + await release_coordinator_lease() # ============================================================================= @@ -279,7 +273,7 @@ async def health() -> dict[str, object]: "model_loaded": scorer.is_loaded if scorer else False, "gpu_enabled": scorer.is_gpu_enabled if scorer else False, "device": scorer.device if scorer else None, - "gpu_coordination": _boss is not None, + "gpu_coordination": _inference_client is not None and _lease_id is not None, }