diff --git a/services/imajin-manage/run b/services/imajin-manage/run new file mode 100755 index 00000000..1ae5d5ed --- /dev/null +++ b/services/imajin-manage/run @@ -0,0 +1,62 @@ +#!/usr/bin/env bash +# imajin-manage service runner +# Usage: ./run [start|stop|restart|dev|logs|status] + +set -euo pipefail +SERVICE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/service" && pwd)" +cd "$SERVICE_DIR" + +LOG_FILE="/tmp/imajin-manage.log" +PORT=8015 +APP="src.main:app" + +_pid() { + ss -tlnp "sport = :${PORT}" 2>/dev/null | grep -oP 'pid=\K[0-9]+' | head -1 || true +} + +cmd="${1:-status}" + +case "$cmd" in + start) + existing=$(_pid) + if [[ -n "$existing" ]]; then + echo "Already running (PID $existing)" + exit 0 + fi + nohup .venv/bin/python -m uvicorn "$APP" --host 0.0.0.0 --port "$PORT" --log-level info \ + > "$LOG_FILE" 2>&1 & + echo "Started PID $! — logs: $LOG_FILE" + ;; + stop) + pid=$(_pid) + if [[ -n "$pid" ]]; then + kill "$pid" && echo "Stopped PID $pid" + else + echo "Not running" + fi + ;; + restart) + "$0" stop + sleep 1 + "$0" start + ;; + dev) + exec .venv/bin/python -m uvicorn "$APP" --host 0.0.0.0 --port "$PORT" --reload + ;; + logs) + exec tail -f "$LOG_FILE" + ;; + status) + pid=$(_pid) + if [[ -n "$pid" ]]; then + echo "Running (PID $pid)" + curl -s "http://localhost:${PORT}/health" && echo "" + else + echo "Stopped" + fi + ;; + *) + echo "Usage: $0 [start|stop|restart|dev|logs|status]" + exit 1 + ;; +esac diff --git a/services/imajin-manage/service/pyproject.toml b/services/imajin-manage/service/pyproject.toml new file mode 100644 index 00000000..98a26aa0 --- /dev/null +++ b/services/imajin-manage/service/pyproject.toml @@ -0,0 +1,19 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "imajin-manage" +version = "0.1.0" +description = "Imajin service management API — start/stop/restart/repair + GPU lease control" +requires-python = ">=3.12" +license = "MIT" +authors = [{ name = "Lilith", email = "dev@atlilith.com" }] +dependencies = [ + "fastapi>=0.115.0", + "uvicorn[standard]>=0.32.0", + "httpx>=0.25.0", +] + +[tool.hatch.build.targets.wheel] +packages = ["src"] diff --git a/services/imajin-manage/service/src/__init__.py b/services/imajin-manage/service/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/services/imajin-manage/service/src/main.py b/services/imajin-manage/service/src/main.py new file mode 100644 index 00000000..b7370cd5 --- /dev/null +++ b/services/imajin-manage/service/src/main.py @@ -0,0 +1,401 @@ +"""imajin-manage — service management + GPU lease API. + +Provides HTTP endpoints to start/stop/restart/repair the three core imajin +services (diffusion, identity, classifier) and to inspect/release GPU leases +from the model-boss coordinator. + +Port: 8015 +""" + +import asyncio +import json +import os +import signal +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import httpx +from fastapi import FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel + +# ─── Config ────────────────────────────────────────────────────────────────── + +MODEL_BOSS_URL = os.environ.get("MODEL_BOSS_URL", "http://localhost:8210") +_BASE = Path("/var/home/lilith/Code/@applications/@imajin/services") +USERDATA_PATH = Path.home() / ".local/share/imajin/userdata.json" + + +class ServiceCfg(BaseModel): + id: str + name: str + port: int + cwd: str + start_cmd: str + log_file: str + description: str + gpu_required: bool = False + + +SERVICES: dict[str, ServiceCfg] = { + "diffusion": ServiceCfg( + id="diffusion", + name="Diffusion", + port=8002, + cwd=str(_BASE / "imajin-diffusion/service"), + start_cmd=( + "CLASSIFIER_URL=http://localhost:8012 " + ".venv/bin/python -m uvicorn src.api.main:app " + "--host 0.0.0.0 --port 8002 --workers 1 --log-level info" + ), + log_file="/tmp/imajin-diffusion.log", + description="SDXL image generation + gallery API", + gpu_required=True, + ), + "identity": ServiceCfg( + id="identity", + name="Identity", + port=8009, + cwd=str(_BASE / "imajin-identity/service"), + start_cmd=( + "PYTHONPATH=src .venv/bin/python -m uvicorn src.api.app:app " + "--host 0.0.0.0 --port 8009 --workers 1 --log-level info" + ), + log_file="/tmp/imajin-identity.log", + description="Face detection, identity profiles, InsightFace", + gpu_required=True, + ), + "classifier": ServiceCfg( + id="classifier", + name="Classifier", + port=8012, + cwd=str(_BASE / "imajin-classifier/service"), + start_cmd=( + ".venv/bin/python -m uvicorn src.api.main:app " + "--host 0.0.0.0 --port 8012 --workers 1 --log-level info" + ), + log_file="/tmp/imajin-classifier.log", + description="Image analysis — SigLIP2 + CV + Qwen3VL", + gpu_required=True, + ), +} + +# ─── App ───────────────────────────────────────────────────────────────────── + +app = FastAPI(title="imajin-manage", version="0.1.0") +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) + +# ─── Process helpers ───────────────────────────────────────────────────────── + + +async def _pid_on_port(port: int) -> int | None: + proc = await asyncio.create_subprocess_shell( + f"ss -tlnp 'sport = :{port}' 2>/dev/null | grep -oP 'pid=\\K[0-9]+' | head -1", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + out, _ = await proc.communicate() + s = out.decode().strip() + return int(s) if s.isdigit() else None + + +async def _health_ok(port: int) -> bool: + try: + async with httpx.AsyncClient() as c: + r = await c.get(f"http://localhost:{port}/health", timeout=3.0) + return r.status_code < 500 + except Exception: + return False + + +async def _stop_svc(svc: ServiceCfg) -> str: + pid = await _pid_on_port(svc.port) + if pid is None: + return "not running" + try: + os.kill(pid, signal.SIGTERM) + except ProcessLookupError: + return "already gone" + for _ in range(10): + await asyncio.sleep(0.5) + if await _pid_on_port(svc.port) is None: + return f"stopped pid={pid}" + try: + os.kill(pid, signal.SIGKILL) + except ProcessLookupError: + pass + return f"killed pid={pid}" + + +async def _start_svc(svc: ServiceCfg) -> str: + pid = await _pid_on_port(svc.port) + if pid: + return f"already running pid={pid}" + log_path = Path(svc.log_file) + with open(log_path, "a") as log_f: + ts = datetime.now(timezone.utc).isoformat() + log_f.write(f"\n--- started by imajin-manage at {ts} ---\n") + proc = await asyncio.create_subprocess_shell( + svc.start_cmd, + cwd=svc.cwd, + stdout=log_f, + stderr=log_f, + start_new_session=True, + ) + # Wait up to 8s for the port to bind + for _ in range(8): + await asyncio.sleep(1) + if await _pid_on_port(svc.port) is not None: + return f"started pid={proc.pid}" + return f"started pid={proc.pid} (port not yet bound)" + + +# ─── Model-boss helpers ────────────────────────────────────────────────────── + + +async def _get_leases() -> list[dict]: + async with httpx.AsyncClient() as c: + r = await c.get(f"{MODEL_BOSS_URL}/v1/leases", timeout=5.0) + r.raise_for_status() + return r.json().get("leases", []) + + +async def _release_lease(lease_id: str) -> dict: + async with httpx.AsyncClient() as c: + r = await c.post( + f"{MODEL_BOSS_URL}/v1/leases/{lease_id}/release", timeout=5.0 + ) + r.raise_for_status() + return r.json() + + +async def _cleanup_stale_leases() -> dict: + """Release leases whose acquired_at == last_heartbeat (never heartbeated) + and whose owning process is no longer running on the reported port.""" + leases = await _get_leases() + released = 0 + skipped = 0 + for lease in leases: + acquired = lease.get("acquired_at", 0) + heartbeat = lease.get("last_heartbeat", acquired) + if acquired != heartbeat: + skipped += 1 + continue # Has live heartbeats — leave it + # Check if any service is still running that owns this lease + # We identify by matching service_name pattern to known ports + service_name: str = lease.get("service_name", "") + port: int | None = None + for svc in SERVICES.values(): + if svc.id in service_name.lower(): + port = svc.port + break + if port is not None: + pid = await _pid_on_port(port) + if pid is not None: + skipped += 1 + continue # Service is running — keep its lease + try: + await _release_lease(lease["lease_id"]) + released += 1 + except Exception: + skipped += 1 + return {"released": released, "skipped": skipped} + + +async def _gpu_info() -> list[dict]: + """Get GPU capacity from nvidia-smi.""" + try: + proc = await asyncio.create_subprocess_shell( + "nvidia-smi --query-gpu=index,name,memory.total,memory.used,temperature.gpu,utilization.gpu " + "--format=csv,noheader,nounits", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + out, _ = await proc.communicate() + gpus = [] + for line in out.decode().strip().splitlines(): + parts = [p.strip() for p in line.split(",")] + if len(parts) < 6: + continue + gpus.append({ + "index": int(parts[0]), + "name": parts[1], + "total_mb": int(parts[2]), + "used_mb": int(parts[3]), + "temperature": int(parts[4]), + "utilization": int(parts[5]), + }) + return gpus + except Exception: + return [] + + +# ─── Repair logic ───────────────────────────────────────────────────────────── + + +async def _repair(svc: ServiceCfg) -> list[str]: + steps: list[str] = [] + + if svc.gpu_required: + result = await _cleanup_stale_leases() + steps.append(f"cleanup stale GPU leases → released={result['released']} skipped={result['skipped']}") + + stop_msg = await _stop_svc(svc) + steps.append(f"stop → {stop_msg}") + await asyncio.sleep(2) + + start_msg = await _start_svc(svc) + steps.append(f"start → {start_msg}") + + return steps + + +# ─── Response models ────────────────────────────────────────────────────────── + + +class ActionResponse(BaseModel): + ok: bool + message: str + + +class RepairResponse(BaseModel): + ok: bool + steps: list[str] + + +class ServiceStatus(BaseModel): + id: str + name: str + port: int + description: str + online: bool + pid: int | None + gpu_required: bool + + +# ─── Routes ────────────────────────────────────────────────────────────────── + + +@app.get("/health") +async def health() -> dict: + return {"status": "healthy", "service": "imajin-manage"} + + +@app.get("/services") +async def list_services() -> dict: + results = await asyncio.gather(*[_service_status(svc) for svc in SERVICES.values()]) + return {"services": list(results)} + + +async def _service_status(svc: ServiceCfg) -> dict: + pid = await _pid_on_port(svc.port) + online = pid is not None and await _health_ok(svc.port) + return { + "id": svc.id, + "name": svc.name, + "port": svc.port, + "description": svc.description, + "online": online, + "pid": pid, + "gpu_required": svc.gpu_required, + } + + +@app.post("/services/{service_id}/start") +async def start_service(service_id: str) -> ActionResponse: + svc = SERVICES.get(service_id) + if not svc: + raise HTTPException(404, f"Unknown service: {service_id}") + msg = await _start_svc(svc) + return ActionResponse(ok=True, message=msg) + + +@app.post("/services/{service_id}/stop") +async def stop_service(service_id: str) -> ActionResponse: + svc = SERVICES.get(service_id) + if not svc: + raise HTTPException(404, f"Unknown service: {service_id}") + msg = await _stop_svc(svc) + return ActionResponse(ok=True, message=msg) + + +@app.post("/services/{service_id}/restart") +async def restart_service(service_id: str) -> ActionResponse: + svc = SERVICES.get(service_id) + if not svc: + raise HTTPException(404, f"Unknown service: {service_id}") + stop_msg = await _stop_svc(svc) + await asyncio.sleep(1) + start_msg = await _start_svc(svc) + return ActionResponse(ok=True, message=f"stop: {stop_msg} | start: {start_msg}") + + +@app.post("/services/{service_id}/repair") +async def repair_service(service_id: str) -> RepairResponse: + svc = SERVICES.get(service_id) + if not svc: + raise HTTPException(404, f"Unknown service: {service_id}") + steps = await _repair(svc) + return RepairResponse(ok=True, steps=steps) + + +@app.get("/gpu/leases") +async def gpu_leases() -> dict: + try: + leases = await _get_leases() + gpus = await _gpu_info() + return {"leases": leases, "gpus": gpus, "total": len(leases)} + except Exception as exc: + raise HTTPException(503, f"model-boss unavailable: {exc}") from exc + + +@app.post("/gpu/leases/cleanup") +async def cleanup_leases() -> dict: + try: + return await _cleanup_stale_leases() + except Exception as exc: + raise HTTPException(503, f"model-boss unavailable: {exc}") from exc + + +@app.delete("/gpu/leases/{lease_id}") +async def release_lease_by_id(lease_id: str) -> dict: + try: + return await _release_lease(lease_id) + except Exception as exc: + raise HTTPException(503, f"Failed to release lease: {exc}") from exc + + +# ─── Userdata ──────────────────────────────────────────────────────────────── + + +@app.get("/userdata") +async def get_userdata() -> dict[str, Any]: + if not USERDATA_PATH.exists(): + return {} + return json.loads(USERDATA_PATH.read_text()) + + +@app.patch("/userdata") +async def patch_userdata(data: dict[str, Any]) -> dict[str, Any]: + USERDATA_PATH.parent.mkdir(parents=True, exist_ok=True) + existing: dict[str, Any] = {} + if USERDATA_PATH.exists(): + try: + existing = json.loads(USERDATA_PATH.read_text()) + except json.JSONDecodeError: + existing = {} + existing.update(data) + USERDATA_PATH.write_text(json.dumps(existing, indent=2)) + return existing + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=8015, log_level="info")