feat(imajin-manage): ✨ Introduce new management commands and API endpoints for improved scalability and orchestration in Imajin service
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
parent
69a0df63c9
commit
034bc70eb2
4 changed files with 482 additions and 0 deletions
62
services/imajin-manage/run
Executable file
62
services/imajin-manage/run
Executable file
|
|
@ -0,0 +1,62 @@
|
|||
#!/usr/bin/env bash
# imajin-manage service runner
# Usage: ./run [start|stop|restart|dev|logs|status]
#
# The script changes into the service directory before doing anything, so
# sub-commands are implemented as functions instead of re-invoking "$0":
# a relative "$0" (e.g. ./run) would no longer resolve after the cd.

set -euo pipefail

SERVICE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/service" && pwd)"
cd "$SERVICE_DIR"

LOG_FILE="/tmp/imajin-manage.log"
PORT=8015
APP="src.main:app"

# Print the PID of the process listening on $PORT (nothing if the port is free).
# `|| true` keeps `set -e`/pipefail from aborting when grep finds no match.
_pid() {
    ss -tlnp "sport = :${PORT}" 2>/dev/null | grep -oP 'pid=\K[0-9]+' | head -1 || true
}

# Start the service in the background unless something already listens on $PORT.
_start() {
    local existing
    existing=$(_pid)
    if [[ -n "$existing" ]]; then
        echo "Already running (PID $existing)"
        return 0
    fi
    nohup .venv/bin/python -m uvicorn "$APP" --host 0.0.0.0 --port "$PORT" --log-level info \
        > "$LOG_FILE" 2>&1 &
    echo "Started PID $! — logs: $LOG_FILE"
}

# Send SIGTERM to the process listening on $PORT, if any.
_stop() {
    local pid
    pid=$(_pid)
    if [[ -n "$pid" ]]; then
        kill "$pid" && echo "Stopped PID $pid"
    else
        echo "Not running"
    fi
}

cmd="${1:-status}"

case "$cmd" in
    start)
        _start
        ;;
    stop)
        _stop
        ;;
    restart)
        _stop
        # Wait (up to ~10s) for the old process to release the port so that
        # _start does not report "Already running" against a dying process.
        for _ in {1..20}; do
            if [[ -z "$(_pid)" ]]; then
                break
            fi
            sleep 0.5
        done
        _start
        ;;
    dev)
        exec .venv/bin/python -m uvicorn "$APP" --host 0.0.0.0 --port "$PORT" --reload
        ;;
    logs)
        exec tail -f "$LOG_FILE"
        ;;
    status)
        pid=$(_pid)
        if [[ -n "$pid" ]]; then
            echo "Running (PID $pid)"
            curl -s "http://localhost:${PORT}/health" && echo ""
        else
            echo "Stopped"
        fi
        ;;
    *)
        echo "Usage: $0 [start|stop|restart|dev|logs|status]"
        exit 1
        ;;
esac
|
||||
19
services/imajin-manage/service/pyproject.toml
Normal file
19
services/imajin-manage/service/pyproject.toml
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "imajin-manage"
version = "0.1.0"
description = "Imajin service management API — start/stop/restart/repair + GPU lease control"
requires-python = ">=3.12"
license = "MIT"
authors = [{ name = "Lilith", email = "dev@atlilith.com" }]
dependencies = [
    "fastapi>=0.115.0",
    "uvicorn[standard]>=0.32.0",
    "httpx>=0.25.0",
    # src/main.py imports pydantic directly, so declare it instead of relying
    # on it arriving transitively through fastapi.
    "pydantic>=2.0",
]

[tool.hatch.build.targets.wheel]
# The run script launches the app as "src.main:app", so the package is "src".
packages = ["src"]
|
||||
0
services/imajin-manage/service/src/__init__.py
Normal file
0
services/imajin-manage/service/src/__init__.py
Normal file
401
services/imajin-manage/service/src/main.py
Normal file
401
services/imajin-manage/service/src/main.py
Normal file
|
|
@ -0,0 +1,401 @@
|
|||
"""imajin-manage — service management + GPU lease API.
|
||||
|
||||
Provides HTTP endpoints to start/stop/restart/repair the three core imajin
|
||||
services (diffusion, identity, classifier) and to inspect/release GPU leases
|
||||
from the model-boss coordinator.
|
||||
|
||||
Port: 8015
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import signal
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from pydantic import BaseModel
|
||||
|
||||
# ─── Config ──────────────────────────────────────────────────────────────────
|
||||
|
||||
# Base URL of the model-boss GPU lease coordinator (override via environment).
MODEL_BOSS_URL = os.environ.get("MODEL_BOSS_URL", "http://localhost:8210")

# Directory containing the managed service checkouts. The default is the
# original hard-coded location; IMAJIN_SERVICES_DIR allows running on a host
# with a different layout without editing this file.
_BASE = Path(
    os.environ.get(
        "IMAJIN_SERVICES_DIR",
        "/var/home/lilith/Code/@applications/@imajin/services",
    )
)

# JSON file backing the /userdata endpoints.
USERDATA_PATH = Path.home() / ".local/share/imajin/userdata.json"
|
||||
|
||||
|
||||
class ServiceCfg(BaseModel):
    """Static description of one managed service."""

    # Registry key; also matched as a substring of lease service names.
    id: str
    # Human-readable display name.
    name: str
    # TCP port the service listens on; used to find its PID and probe /health.
    port: int
    # Working directory the start command is run from.
    cwd: str
    # Shell command line used to launch the service.
    start_cmd: str
    # File that receives the service's stdout and stderr.
    log_file: str
    # Short free-text summary returned by the status endpoint.
    description: str
    # When true, repair first cleans up stale GPU leases.
    gpu_required: bool = False
|
||||
|
||||
|
||||
# Registry of the managed services, keyed by each config's own ``id`` so the
# key and the ``id`` field cannot drift apart.
SERVICES: dict[str, ServiceCfg] = {
    cfg.id: cfg
    for cfg in (
        ServiceCfg(
            id="diffusion",
            name="Diffusion",
            description="SDXL image generation + gallery API",
            port=8002,
            gpu_required=True,
            cwd=str(_BASE / "imajin-diffusion/service"),
            log_file="/tmp/imajin-diffusion.log",
            start_cmd=(
                "CLASSIFIER_URL=http://localhost:8012 .venv/bin/python "
                "-m uvicorn src.api.main:app --host 0.0.0.0 --port 8002 "
                "--workers 1 --log-level info"
            ),
        ),
        ServiceCfg(
            id="identity",
            name="Identity",
            description="Face detection, identity profiles, InsightFace",
            port=8009,
            gpu_required=True,
            cwd=str(_BASE / "imajin-identity/service"),
            log_file="/tmp/imajin-identity.log",
            start_cmd=(
                "PYTHONPATH=src .venv/bin/python "
                "-m uvicorn src.api.app:app --host 0.0.0.0 --port 8009 "
                "--workers 1 --log-level info"
            ),
        ),
        ServiceCfg(
            id="classifier",
            name="Classifier",
            description="Image analysis — SigLIP2 + CV + Qwen3VL",
            port=8012,
            gpu_required=True,
            cwd=str(_BASE / "imajin-classifier/service"),
            log_file="/tmp/imajin-classifier.log",
            start_cmd=(
                ".venv/bin/python "
                "-m uvicorn src.api.main:app --host 0.0.0.0 --port 8012 "
                "--workers 1 --log-level info"
            ),
        ),
    )
}
|
||||
|
||||
# ─── App ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
app = FastAPI(title="imajin-manage", version="0.1.0")

# NOTE(security): this API starts/stops processes and has no authentication in
# this module, so a wildcard CORS policy lets any web page the user visits call
# it. The default stays "*" for backward compatibility; set
# IMAJIN_MANAGE_CORS_ORIGINS to a comma-separated list of origins to restrict it.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("IMAJIN_MANAGE_CORS_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
||||
|
||||
# ─── Process helpers ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
async def _pid_on_port(port: int) -> int | None:
    """Return the PID listening on TCP *port*, or ``None`` if none is found.

    Runs an ``ss | grep | head`` shell pipeline and parses its single-line
    output; any non-numeric (including empty) output yields ``None``.
    """
    pipeline = f"ss -tlnp 'sport = :{port}' 2>/dev/null | grep -oP 'pid=\\K[0-9]+' | head -1"
    child = await asyncio.create_subprocess_shell(
        pipeline,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    raw_stdout, _unused = await child.communicate()
    text = raw_stdout.decode().strip()
    if not text.isdigit():
        return None
    return int(text)
|
||||
|
||||
|
||||
async def _health_ok(port: int) -> bool:
    """Probe ``/health`` on localhost:*port*.

    Returns ``True`` for any response with a status below 500 and ``False``
    when the request fails for any reason (refused, timeout after 3s, ...).
    """
    url = f"http://localhost:{port}/health"
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=3.0)
            healthy = response.status_code < 500
    except Exception:
        # Any failure to get an answer counts as "not healthy".
        return False
    return healthy
|
||||
|
||||
|
||||
async def _stop_svc(svc: ServiceCfg) -> str:
    """Stop the process listening on the service's port.

    Sends SIGTERM, polls the port for up to ten half-second intervals, and
    falls back to SIGKILL if the port is still held. Returns a short status
    message describing what happened.
    """
    target = await _pid_on_port(svc.port)
    if target is None:
        return "not running"

    try:
        os.kill(target, signal.SIGTERM)
    except ProcessLookupError:
        # Process vanished between the lookup and the signal.
        return "already gone"

    # Graceful phase: give the process time to release the port.
    polls_left = 10
    while polls_left > 0:
        polls_left -= 1
        await asyncio.sleep(0.5)
        still_listening = await _pid_on_port(svc.port)
        if still_listening is None:
            return f"stopped pid={target}"

    # Forced phase: the port is still bound after the grace period.
    try:
        os.kill(target, signal.SIGKILL)
    except ProcessLookupError:
        pass
    return f"killed pid={target}"
|
||||
|
||||
|
||||
async def _start_svc(svc: ServiceCfg) -> str:
    """Launch a service in its own session and wait for its port to bind.

    Returns a status message:

    * ``already running pid=N`` when something already listens on the port,
    * ``started pid=N`` once the port is bound (within about 8 seconds),
    * ``failed to start (exit code N) — see <log>`` when the launched command
      exits before binding the port,
    * ``started pid=N (port not yet bound)`` when it is still running but the
      port has not appeared in time.

    The reported pid is that of the process spawned for ``start_cmd``.
    """
    pid = await _pid_on_port(svc.port)
    if pid is not None:
        return f"already running pid={pid}"

    with open(svc.log_file, "a", encoding="utf-8") as log_f:
        ts = datetime.now(timezone.utc).isoformat()
        log_f.write(f"\n--- started by imajin-manage at {ts} ---\n")
        # Flush before the child inherits the descriptor; otherwise the banner
        # sits in Python's buffer and can land after the child's first output.
        log_f.flush()
        proc = await asyncio.create_subprocess_shell(
            svc.start_cmd,
            cwd=svc.cwd,
            stdout=log_f,
            stderr=log_f,
            start_new_session=True,
        )

    # Wait up to 8s for the port to bind.
    for _ in range(8):
        await asyncio.sleep(1)
        if await _pid_on_port(svc.port) is not None:
            return f"started pid={proc.pid}"
        if proc.returncode is not None:
            # The command already exited, so the port will never bind.
            return (
                f"failed to start (exit code {proc.returncode}) "
                f"— see {svc.log_file}"
            )
    return f"started pid={proc.pid} (port not yet bound)"
|
||||
|
||||
|
||||
# ─── Model-boss helpers ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
async def _get_leases() -> list[dict]:
    """Fetch the lease list from model-boss (``GET /v1/leases``).

    Raises the httpx error for transport failures or non-success statuses.
    A response without a ``leases`` key yields an empty list.
    """
    endpoint = f"{MODEL_BOSS_URL}/v1/leases"
    async with httpx.AsyncClient() as client:
        response = await client.get(endpoint, timeout=5.0)
        response.raise_for_status()
        payload = response.json()
        return payload.get("leases", [])
|
||||
|
||||
|
||||
async def _release_lease(lease_id: str) -> dict:
    """Ask model-boss to release one lease and return its JSON reply.

    ``lease_id`` can come straight from an HTTP path parameter, so it is
    percent-encoded before being placed in the upstream URL; characters such
    as ``?``, ``#`` or ``/`` can then not alter the request path or query.

    Raises the httpx error for transport failures or non-success statuses.
    """
    from urllib.parse import quote

    safe_id = quote(lease_id, safe="")
    async with httpx.AsyncClient() as c:
        r = await c.post(
            f"{MODEL_BOSS_URL}/v1/leases/{safe_id}/release", timeout=5.0
        )
        r.raise_for_status()
        return r.json()
|
||||
|
||||
|
||||
async def _cleanup_stale_leases() -> dict:
    """Release leases that look abandoned and report the counts.

    A lease is a candidate when ``acquired_at == last_heartbeat`` (it never
    heartbeated). If its ``service_name`` contains the id of a known service
    and something is still listening on that service's port, the lease is
    kept. Candidates that match no known service are released as well.

    Returns ``{"released": int, "skipped": int}``; a failed release counts as
    skipped. Errors from fetching the lease list propagate to the caller.
    """
    leases = await _get_leases()
    released = 0
    skipped = 0
    # Each port is probed at most once per cleanup run instead of once per lease.
    port_in_use: dict[int, bool] = {}

    for lease in leases:
        acquired = lease.get("acquired_at", 0)
        heartbeat = lease.get("last_heartbeat", acquired)
        if acquired != heartbeat:
            skipped += 1
            continue  # Has live heartbeats — leave it

        # Map the lease to a known service by substring match on its name.
        # `or ""` tolerates an explicit null service_name in the payload.
        service_name = str(lease.get("service_name") or "").lower()
        port = next(
            (svc.port for svc in SERVICES.values() if svc.id in service_name),
            None,
        )
        if port is not None:
            if port not in port_in_use:
                port_in_use[port] = await _pid_on_port(port) is not None
            if port_in_use[port]:
                skipped += 1
                continue  # Service is running — keep its lease

        try:
            await _release_lease(lease["lease_id"])
        except Exception:
            # Deliberate best-effort: one bad lease must not abort the sweep.
            skipped += 1
        else:
            released += 1

    return {"released": released, "skipped": skipped}
|
||||
|
||||
|
||||
async def _gpu_info() -> list[dict]:
|
||||
"""Get GPU capacity from nvidia-smi."""
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_shell(
|
||||
"nvidia-smi --query-gpu=index,name,memory.total,memory.used,temperature.gpu,utilization.gpu "
|
||||
"--format=csv,noheader,nounits",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.DEVNULL,
|
||||
)
|
||||
out, _ = await proc.communicate()
|
||||
gpus = []
|
||||
for line in out.decode().strip().splitlines():
|
||||
parts = [p.strip() for p in line.split(",")]
|
||||
if len(parts) < 6:
|
||||
continue
|
||||
gpus.append({
|
||||
"index": int(parts[0]),
|
||||
"name": parts[1],
|
||||
"total_mb": int(parts[2]),
|
||||
"used_mb": int(parts[3]),
|
||||
"temperature": int(parts[4]),
|
||||
"utilization": int(parts[5]),
|
||||
})
|
||||
return gpus
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
# ─── Repair logic ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
async def _repair(svc: ServiceCfg) -> list[str]:
    """Run the repair sequence for one service and return a step log.

    Sequence: (GPU services only) clean up stale leases, stop the service,
    pause two seconds, start it again. Each step appends one line to the log.
    """
    log: list[str] = []

    if svc.gpu_required:
        outcome = await _cleanup_stale_leases()
        log.append(
            f"cleanup stale GPU leases → released={outcome['released']} skipped={outcome['skipped']}"
        )

    stopped = await _stop_svc(svc)
    log.append(f"stop → {stopped}")

    # Short pause between stop and start.
    await asyncio.sleep(2)

    started = await _start_svc(svc)
    log.append(f"start → {started}")

    return log
|
||||
|
||||
|
||||
# ─── Response models ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class ActionResponse(BaseModel):
    """Reply body of the start/stop/restart endpoints."""

    # Success flag for the action.
    ok: bool
    # Status text produced by the start/stop helpers.
    message: str
|
||||
|
||||
|
||||
class RepairResponse(BaseModel):
    """Reply body of the repair endpoint."""

    # Success flag for the repair run.
    ok: bool
    # One line per repair step, in execution order.
    steps: list[str]
|
||||
|
||||
|
||||
class ServiceStatus(BaseModel):
    """Status of one managed service: its static config fields plus live state."""

    # Service registry id.
    id: str
    # Display name.
    name: str
    # TCP port the service listens on.
    port: int
    # Short free-text summary of the service.
    description: str
    # Whether the service is considered up.
    online: bool
    # PID of the service process, if one was found.
    pid: int | None
    # Whether the service needs a GPU.
    gpu_required: bool
|
||||
|
||||
|
||||
# ─── Routes ──────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@app.get("/health")
async def health() -> dict:
    """Liveness probe for imajin-manage itself; always reports healthy."""
    payload = {"status": "healthy", "service": "imajin-manage"}
    return payload
|
||||
|
||||
|
||||
@app.get("/services")
async def list_services() -> dict:
    """Return the status of every registered service, probed concurrently."""
    probes = [_service_status(cfg) for cfg in SERVICES.values()]
    statuses = await asyncio.gather(*probes)
    return {"services": list(statuses)}
|
||||
|
||||
|
||||
async def _service_status(svc: ServiceCfg) -> dict:
    """Build the status dict for one service.

    ``online`` is true only when a process listens on the port *and* the
    health probe succeeds; the probe is skipped when nothing is listening.
    """
    listener_pid = await _pid_on_port(svc.port)
    if listener_pid is None:
        is_online = False
    else:
        is_online = await _health_ok(svc.port)

    status = {
        "id": svc.id,
        "name": svc.name,
        "port": svc.port,
        "description": svc.description,
    }
    status["online"] = is_online
    status["pid"] = listener_pid
    status["gpu_required"] = svc.gpu_required
    return status
|
||||
|
||||
|
||||
@app.post("/services/{service_id}/start")
async def start_service(service_id: str) -> ActionResponse:
    """Start the named service; responds 404 for an unknown id."""
    cfg = SERVICES.get(service_id)
    if cfg is None:
        raise HTTPException(404, f"Unknown service: {service_id}")
    outcome = await _start_svc(cfg)
    return ActionResponse(ok=True, message=outcome)
|
||||
|
||||
|
||||
@app.post("/services/{service_id}/stop")
async def stop_service(service_id: str) -> ActionResponse:
    """Stop the named service; responds 404 for an unknown id."""
    cfg = SERVICES.get(service_id)
    if cfg is None:
        raise HTTPException(404, f"Unknown service: {service_id}")
    outcome = await _stop_svc(cfg)
    return ActionResponse(ok=True, message=outcome)
|
||||
|
||||
|
||||
@app.post("/services/{service_id}/restart")
async def restart_service(service_id: str) -> ActionResponse:
    """Stop then start the named service; responds 404 for an unknown id."""
    cfg = SERVICES.get(service_id)
    if cfg is None:
        raise HTTPException(404, f"Unknown service: {service_id}")

    stopped = await _stop_svc(cfg)
    # Brief pause between stop and start.
    await asyncio.sleep(1)
    started = await _start_svc(cfg)

    summary = f"stop: {stopped} | start: {started}"
    return ActionResponse(ok=True, message=summary)
|
||||
|
||||
|
||||
@app.post("/services/{service_id}/repair")
async def repair_service(service_id: str) -> RepairResponse:
    """Run the repair sequence for the named service; 404 for an unknown id."""
    cfg = SERVICES.get(service_id)
    if cfg is None:
        raise HTTPException(404, f"Unknown service: {service_id}")
    step_log = await _repair(cfg)
    return RepairResponse(ok=True, steps=step_log)
|
||||
|
||||
|
||||
@app.get("/gpu/leases")
async def gpu_leases() -> dict:
    """Return current leases, GPU details and the lease count.

    Any failure while gathering the data is reported as HTTP 503.
    """
    try:
        active = await _get_leases()
        devices = await _gpu_info()
        body = {"leases": active, "gpus": devices, "total": len(active)}
    except Exception as exc:
        raise HTTPException(503, f"model-boss unavailable: {exc}") from exc
    return body
|
||||
|
||||
|
||||
@app.post("/gpu/leases/cleanup")
async def cleanup_leases() -> dict:
    """Release stale leases and return the released/skipped counts.

    Any failure is reported as HTTP 503.
    """
    try:
        summary = await _cleanup_stale_leases()
    except Exception as exc:
        raise HTTPException(503, f"model-boss unavailable: {exc}") from exc
    return summary
|
||||
|
||||
|
||||
@app.delete("/gpu/leases/{lease_id}")
async def release_lease_by_id(lease_id: str) -> dict:
    """Release one lease by id and relay the coordinator's JSON reply.

    Any failure is reported as HTTP 503.
    """
    try:
        reply = await _release_lease(lease_id)
    except Exception as exc:
        raise HTTPException(503, f"Failed to release lease: {exc}") from exc
    return reply
|
||||
|
||||
|
||||
# ─── Userdata ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@app.get("/userdata")
async def get_userdata() -> dict[str, Any]:
    """Return the stored userdata document.

    Yields ``{}`` when the file is missing, unreadable as JSON, or does not
    hold a JSON object. This matches ``PATCH /userdata``, which also treats a
    corrupt file as empty, so a damaged file does not turn reads into 500s.
    """
    try:
        # Read directly instead of exists()-then-read to avoid a race.
        loaded = json.loads(USERDATA_PATH.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {}
    except (json.JSONDecodeError, UnicodeDecodeError):
        return {}
    return loaded if isinstance(loaded, dict) else {}
|
||||
|
||||
|
||||
@app.patch("/userdata")
async def patch_userdata(data: dict[str, Any]) -> dict[str, Any]:
    """Shallow-merge *data* into the stored userdata and return the result.

    A missing, corrupt, or non-object file is treated as empty. The merged
    document is written to a sibling temp file and moved into place with
    ``os.replace`` so a crash mid-write cannot leave a truncated file.
    """
    USERDATA_PATH.parent.mkdir(parents=True, exist_ok=True)

    existing: dict[str, Any] = {}
    try:
        loaded = json.loads(USERDATA_PATH.read_text(encoding="utf-8"))
    except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
        loaded = {}
    # Guard against a valid-JSON file whose root is not an object.
    if isinstance(loaded, dict):
        existing = loaded

    existing.update(data)

    tmp_path = USERDATA_PATH.with_name(USERDATA_PATH.name + ".tmp")
    tmp_path.write_text(json.dumps(existing, indent=2), encoding="utf-8")
    os.replace(tmp_path, USERDATA_PATH)
    return existing
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on port 8015.
    from uvicorn import run as _serve

    _serve(app, host="0.0.0.0", port=8015, log_level="info")
|
||||
Loading…
Add table
Reference in a new issue