diff --git a/services/imajin-video/service/src/detection/face_detector.py b/services/imajin-video/service/src/detection/face_detector.py index dc6842a3..df595b1c 100644 --- a/services/imajin-video/service/src/detection/face_detector.py +++ b/services/imajin-video/service/src/detection/face_detector.py @@ -80,8 +80,9 @@ class FaceDetector: self.service_name = service_name self._face_app = None - self._gpu_boss = None - self._lease = None + self._inference_client = None + self._lease_id: str | None = None + self._heartbeat_task: asyncio.Task | None = None self._initialized = False async def initialize(self) -> None: @@ -100,20 +101,41 @@ class FaceDetector: async def _acquire_gpu_lease(self) -> None: try: - from model_boss import GPUBoss, Priority + from model_boss.client import InferenceClient - self._gpu_boss = GPUBoss() - await self._gpu_boss.connect() - self._lease = await self._gpu_boss.acquire( - vram_mb=_INSIGHTFACE_VRAM_MB, - priority=Priority.NORMAL, - model_id=f"insightface-{self.model_name}-{self.service_name}", + self._inference_client = InferenceClient( + client_id="imajin-video", + auto_start_services=False, ) - logger.info(f"Acquired VRAM lease: {_INSIGHTFACE_VRAM_MB}MB") - except ImportError: - logger.warning("model-boss not available; proceeding without VRAM coordination") + lease = await self._inference_client.acquire_lease( + model_id=f"service:insightface-{self.model_name}", + vram_mb=_INSIGHTFACE_VRAM_MB, + priority="normal", + ) + self._lease_id = lease["lease_id"] + self.gpu_device_id = lease["gpu_index"] + logger.info( + f"Coordinator lease acquired: cuda:{self.gpu_device_id} " + f"(lease_id={self._lease_id})" + ) + self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) except Exception as exc: - logger.warning(f"GPU coordination unavailable: {exc}; proceeding without lease") + logger.warning(f"Coordinator lease unavailable: {exc}; proceeding without lease") + if self._inference_client is not None: + await self._inference_client.dispose() + self._inference_client = None + + async def _heartbeat_loop(self) -> None: + """Send periodic heartbeats to keep the coordinator lease alive.""" + while True: + try: + await asyncio.sleep(10.0) + if self._inference_client is not None and self._lease_id is not None: + await self._inference_client.heartbeat(self._lease_id) + except asyncio.CancelledError: + break + except Exception: + pass async def _load_model(self) -> None: loop = asyncio.get_event_loop() @@ -131,19 +153,25 @@ class FaceDetector: self._face_app = await loop.run_in_executor(None, _load) async def cleanup(self) -> None: - if self._lease: + if self._heartbeat_task is not None: + self._heartbeat_task.cancel() + self._heartbeat_task = None + + if self._inference_client is not None and self._lease_id is not None: try: - await self._lease.release() - logger.info("Released GPU lease") + await self._inference_client.release_lease(self._lease_id) + logger.info("Released coordinator lease") except Exception as exc: - logger.warning(f"Error releasing GPU lease: {exc}") - self._lease = None - if self._gpu_boss: + logger.warning(f"Error releasing coordinator lease: {exc}") + self._lease_id = None + + if self._inference_client is not None: try: - await self._gpu_boss.close() + await self._inference_client.dispose() except Exception as exc: - logger.warning(f"Error closing GPU boss: {exc}") - self._gpu_boss = None + logger.warning(f"Error disposing InferenceClient: {exc}") + self._inference_client = None + self._face_app = None self._initialized = False