def start_agent_loops(
    *, config_path: Path | None, db_path: Path | None
) -> list[asyncio.Task]:
    """Spawn the agent's background loops as asyncio tasks.

    Both keyword arguments are forwarded unchanged to every loop. The tasks
    created here are returned so the caller (the app lifespan) can cancel
    them on shutdown.

    Phase 1 starts the scheduled peer-sync loop only; supervisor and
    telemetry loops are to be added here in a later phase.
    """
    # Imported at call time rather than at package import.
    from .sync_loop import sync_loop

    loop_coros = (sync_loop(config_path=config_path, db_path=db_path),)
    return [asyncio.create_task(coro) for coro in loop_coros]
async def sync_loop(*, config_path: Path | None, db_path: Path | None) -> None:
    """Converge with every configured peer on a fixed schedule, forever.

    Each cycle sleeps for the current interval, reloads the config, then
    calls `sync_peer` once per entry in `cfg.peers` on a worker thread.
    A failure for one peer is logged and does not stop the remaining peers
    or later cycles.

    Args:
        config_path: Passed to `load_or_init` at startup and on every cycle.
        db_path: Accepted for signature parity with the other agent loops;
            this loop does not read it.

    Returns:
        None, and only when the task is cancelled (cancellation is absorbed
        so the task finishes normally instead of raising).
    """
    from ..config import load_or_init
    from ..sync import sync_peer

    cfg = load_or_init(config_path)
    interval = cfg.agent.sync_interval_s
    # The agent serves its own API on loopback:agent.port; sync_peer POSTs
    # ingested events back to ourselves and reads our own cursor there.
    # Deliberately computed once: the listening socket is already bound.
    # NOTE(review): derived from the config value, not from the port the
    # server was actually bound to — confirm they cannot diverge.
    local_base = f"http://127.0.0.1:{cfg.agent.port}"
    if not cfg.peers:
        logger.warning(
            "agent sync loop: no [[peers]] configured — nothing to sync (idle)"
        )
    logger.info(
        "agent sync loop enabled (every %ds, %d peer(s))", interval, len(cfg.peers)
    )

    while True:
        try:
            await asyncio.sleep(interval)
            # Reload so peer/secret/interval edits apply without a restart.
            # A broken edit must not kill the loop: keep the last good
            # config and try again next cycle.
            try:
                cfg = load_or_init(config_path)
            except Exception as exc:  # noqa: BLE001
                logger.warning(
                    "agent sync loop: config reload failed, "
                    "keeping previous settings: %s",
                    exc,
                )
            else:
                # Pick up interval edits; takes effect from the next sleep.
                interval = cfg.agent.sync_interval_s
            for peer in cfg.peers:
                try:
                    result = await asyncio.to_thread(
                        sync_peer,
                        peer,
                        local_base=local_base,
                        local_secret=cfg.sync_secret,
                    )
                    logger.info(
                        "sync %s: pulled=%d pushed=%d",
                        peer.url, result.pulled, result.pushed,
                    )
                except Exception as exc:  # noqa: BLE001
                    # A 401 here almost always means clock skew vs the HMAC
                    # window (SIG_MAX_SKEW_SEC) — log local time so it's
                    # diagnosable. Other failures (peer down) are transient.
                    logger.warning(
                        "sync %s failed (local time %s, epoch %.0f): %s",
                        peer.url,
                        datetime.now(timezone.utc).isoformat(),
                        time.time(),
                        exc,
                    )
        except asyncio.CancelledError:
            # Shutdown path: the lifespan cancels this task; exit quietly.
            return
class AgentConfig(_Strict):
    """Configuration for the headless peer-node daemon (`claire agent run`).

    Read only on the peer hosts (apricot/black); on plum the agent is not
    run, so the defaults are harmless there. The daemon binds loopback on
    `port` and syncs with each configured peer every `sync_interval_s`.
    It also samples host telemetry every `telemetry_interval_s` and
    supervises local worker sessions: one with a live tmux pane whose JSONL
    mtime is older than `wedge_threshold_s` counts as "wedged". Wedged
    sessions are not respawned unless that is explicitly switched on.
    """

    # Loopback port the daemon listens on.
    port: int = Field(8766, ge=1, le=65535)
    # Seconds between peer-sync cycles.
    sync_interval_s: int = Field(120, ge=10, le=86400)
    # Seconds between host telemetry samples.
    telemetry_interval_s: int = Field(60, ge=10, le=3600)
    # JSONL staleness, in seconds, after which a live session is "wedged".
    wedge_threshold_s: int = Field(300, ge=60, le=7200)
    # Supervision is on by default.
    supervisor_enable: bool = True
    # Respawning wedged sessions is opt-in.
    supervisor_allow_respawn: bool = False
@@ -153,6 +172,7 @@ class ClaireConfig(_Strict): orchestrator: OrchestratorConfig = Field(default_factory=OrchestratorConfig) budget: BudgetConfig = Field(default_factory=BudgetConfig) limits: LimitsConfig = Field(default_factory=LimitsConfig) + agent: AgentConfig = Field(default_factory=AgentConfig) # The canonical label this machine identifies as in dispatch + session # records. `None` falls back to the short form of `socket.gethostname()` # (first dot-segment, lowercased). Set explicitly to override (e.g. on a @@ -287,6 +307,19 @@ def _serialize(cfg: ClaireConfig) -> str: f'"{h}" = {c}' for h, c in sorted(lim.per_host.items()) ) lines.append(f"per_host = {{ {inner} }}") + # Emit [agent] only when it differs from defaults (peer hosts only). + ag = cfg.agent + if ag != AgentConfig(): + lines.append("") + lines.append("[agent]") + lines.append(f"port = {ag.port}") + lines.append(f"sync_interval_s = {ag.sync_interval_s}") + lines.append(f"telemetry_interval_s = {ag.telemetry_interval_s}") + lines.append(f"wedge_threshold_s = {ag.wedge_threshold_s}") + lines.append(f"supervisor_enable = {str(ag.supervisor_enable).lower()}") + lines.append( + f"supervisor_allow_respawn = {str(ag.supervisor_allow_respawn).lower()}" + ) for h in cfg.known_hosts: lines.append("") lines.append("[[known_hosts]]") diff --git a/src/claire/web/app.py b/src/claire/web/app.py index 704d26e..b86bb2f 100644 --- a/src/claire/web/app.py +++ b/src/claire/web/app.py @@ -38,6 +38,7 @@ def create_app( config_path: Path | None = None, db_path: Path | None = None, sync_secret: Any = _USE_CONFIG, + peer_mode: bool = False, ) -> FastAPI: """Build a FastAPI app. @@ -48,12 +49,22 @@ def create_app( `sync_secret` overrides the per-config value; default sentinel means "read from config at request-validation time" (production behavior). Pass an explicit `None` or string to pin for tests. 
+ + `peer_mode` builds a headless PEER NODE (the Linux `claire agent`): it + serves the `/api/v1/*` API (notably `/sync/*`) and runs the agent loops + (sync + supervisor + telemetry), but NEVER spawns/supervises the + orchestrator, runs rounds, or mounts the MCP server / SPA. Making it a + factory argument (not a config key) structurally guarantees a peer can't + accidentally become a second orchestrator. """ # Build the MCP sub-app eagerly so we can chain its lifespan into the # parent's — streamable HTTP relies on a task group started in that # lifespan, and Starlette's `mount()` does not invoke sub-app lifespans. - from ..orchestrator.mcp_server import asgi_app as mcp_asgi_app - mcp_app = mcp_asgi_app() + # Peer nodes don't serve MCP, so skip it entirely. + mcp_app = None + if not peer_mode: + from ..orchestrator.mcp_server import asgi_app as mcp_asgi_app + mcp_app = mcp_asgi_app() @asynccontextmanager async def lifespan(_app: FastAPI): @@ -67,7 +78,22 @@ def create_app( tmux dies mid-session (e.g. apricot tmux segfault). - Both are fire-and-forget; failures log a warning but never abort uvicorn — the chat UI surfaces "not configured" until recovery. + + In `peer_mode`, none of the above runs: a peer node only drives the + agent loops (sync + supervisor + telemetry) and never touches the + orchestrator lifecycle or the MCP sub-app. """ + if peer_mode: + from ..agent import start_agent_loops + + tasks = start_agent_loops(config_path=config_path, db_path=db_path) + try: + yield + finally: + for t in tasks: + t.cancel() + return + from ..orchestrator.bootstrap import ensure_running HEARTBEAT_SECONDS = 60 @@ -153,11 +179,12 @@ def create_app( from .chat import stream as chat_stream app.include_router(api.router) app.include_router(chat_stream.router) - # Claire MCP server — orchestrator's .mcp.json points here. - app.mount("/mcp", mcp_app, name="mcp") - # React SPA (catch-all). 
def test_agent_section_omitted_when_default() -> None:
    """A default AgentConfig must not produce an `[agent]` table."""
    cfg = ClaireConfig(machine_id="m")
    assert "[agent]" not in _serialize(cfg)


def test_agent_config_roundtrip(tmp_path: Path) -> None:
    """A non-default `[agent]` section survives serialize -> load unchanged."""
    p = tmp_path / "claire.toml"
    cfg = load_or_init(p)  # fresh: mints machine_id + sync_secret, writes file
    mutated = cfg.model_copy(
        update={"agent": AgentConfig(sync_interval_s=30, supervisor_allow_respawn=True)}
    )
    p.write_text(_serialize(mutated), encoding="utf-8")

    reloaded = load_or_init(p)
    # Read back with the same encoding the file was written in.
    assert "[agent]" in p.read_text(encoding="utf-8")
    assert reloaded.agent.sync_interval_s == 30
    assert reloaded.agent.supervisor_allow_respawn is True
    assert reloaded.agent.port == 8766  # default preserved across round-trip
    # Whole-section comparison covers the fields not spot-checked above
    # (telemetry_interval_s, wedge_threshold_s, supervisor_enable).
    assert reloaded.agent == mutated.agent
def test_peer_mode_serves_sync_but_not_orchestrator(tmp_path: Path, monkeypatch) -> None:
    """Peer-mode app answers the API and sync routes, has no MCP mount, and
    never calls the orchestrator bootstrap."""
    import claire.orchestrator.bootstrap as boot

    # Every call to the patched bootstrap entry point is recorded here.
    bootstrap_calls: list[dict] = []

    def _record(**kwargs):
        bootstrap_calls.append(kwargs)
        return None

    monkeypatch.setattr(boot, "ensure_running", _record)

    peer_app = create_app(
        config_path=tmp_path / "claire.toml",
        db_path=tmp_path / "claire.db",
        sync_secret=None,  # disable HMAC so the route returns 200, not 401
        peer_mode=True,
    )
    # Entering the context manager runs the app lifespan.
    with TestClient(peer_app) as http:
        health = http.get("/api/v1/health")
        assert health.status_code == 200
        # /sync/* IS served in peer mode (200 with auth disabled).
        cursor = http.get("/api/v1/sync/cursor")
        assert cursor.status_code == 200
        # MCP is NOT mounted in peer mode.
        mcp = http.get("/mcp/")
        assert mcp.status_code == 404

    assert bootstrap_calls == []  # orchestrator never bootstrapped