feat(@projects/@claire): add headless peer sync agent loops

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Natalie 2026-05-31 18:15:59 -06:00
parent bda5dad83d
commit 3057182717
8 changed files with 261 additions and 8 deletions

View file

@ -0,0 +1,30 @@
"""Headless per-host agent — the Linux counterpart to the macOS tray.
Runs inside `create_app(peer_mode=True)` on apricot/black as a `systemd --user`
service. It drives background loops (sync now; supervisor + telemetry in a
later phase) but never touches the orchestrator lifecycle.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
def start_agent_loops(
*, config_path: Path | None, db_path: Path | None
) -> list[asyncio.Task]:
"""Start the agent's background loops; return them so the lifespan can
cancel them on shutdown.
Phase 1: scheduled peer sync only. Supervisor + telemetry loops are
added here in a later phase.
"""
from .sync_loop import sync_loop
return [
asyncio.create_task(sync_loop(config_path=config_path, db_path=db_path)),
]
__all__ = ["start_agent_loops"]

View file

@ -0,0 +1,67 @@
"""Scheduled peer-sync loop for the headless `claire agent`.
Push B's substrate (`sync.sync_peer`, `/api/v1/sync/*`) is one-shot via the
`claire sync` CLI. The agent adds the missing *scheduled* loop: every
`agent.sync_interval_s`, converge with each configured peer. Star topology
peer hosts (apricot/black) sync only to plum, which fans changes out; so each
peer's `[[peers]]` lists plum, and apricot↔black converge in two hops.
"""
from __future__ import annotations
import asyncio
import logging
import time
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger(__name__)
async def sync_loop(*, config_path: Path | None, db_path: Path | None) -> None:
from ..config import load_or_init
from ..sync import sync_peer
cfg = load_or_init(config_path)
interval = cfg.agent.sync_interval_s
# The agent serves its own API on loopback:agent.port; sync_peer POSTs
# ingested events back to ourselves and reads our own cursor there.
local_base = f"http://127.0.0.1:{cfg.agent.port}"
if not cfg.peers:
logger.warning(
"agent sync loop: no [[peers]] configured — nothing to sync (idle)"
)
logger.info(
"agent sync loop enabled (every %ds, %d peer(s))", interval, len(cfg.peers)
)
while True:
try:
await asyncio.sleep(interval)
# Reload so peer/secret/interval edits apply without a restart.
cfg = load_or_init(config_path)
for peer in cfg.peers:
try:
result = await asyncio.to_thread(
sync_peer,
peer,
local_base=local_base,
local_secret=cfg.sync_secret,
)
logger.info(
"sync %s: pulled=%d pushed=%d",
peer.url, result.pulled, result.pushed,
)
except Exception as exc: # noqa: BLE001
# A 401 here almost always means clock skew vs the HMAC
# window (SIG_MAX_SKEW_SEC) — log local time so it's
# diagnosable. Other failures (peer down) are transient.
logger.warning(
"sync %s failed (local time %s, epoch %.0f): %s",
peer.url,
datetime.now(timezone.utc).isoformat(),
time.time(),
exc,
)
except asyncio.CancelledError:
return

View file

@ -27,9 +27,11 @@ app = typer.Typer(help="Claire — project manager for the Claude agent fleet.",
project_app = typer.Typer(help="Manage projects.")
task_app = typer.Typer(help="Manage tasks.")
orchestrator_app = typer.Typer(help="Manage the orchestrator Claude session.")
agent_app = typer.Typer(help="Headless peer-node daemon (Linux worker hosts).")
app.add_typer(project_app, name="project")
app.add_typer(task_app, name="task")
app.add_typer(orchestrator_app, name="orchestrator")
app.add_typer(agent_app, name="agent")
console = Console()
@ -122,6 +124,32 @@ def web(
serve(host=host, port=port, no_build=no_build)
@agent_app.command("run")
def agent_run(
host: Annotated[str, typer.Option("--host")] = "127.0.0.1",
port: Annotated[int | None, typer.Option("--port")] = None,
) -> None:
"""Run the headless peer-node daemon (peer mode — no orchestrator/SPA/MCP).
Serves /api/v1/* (incl. /sync/*) on loopback + drives the agent loops
(sync now; supervisor + telemetry later). Deployed as a systemd --user
unit on apricot/black.
"""
import uvicorn
from .web.app import create_app
cfg = load_or_init()
bind_port = port or cfg.agent.port
console.print(
f"[bold]claire agent[/bold] (peer mode) on http://{host}:{bind_port}"
f"{len(cfg.peers)} peer(s), sync every {cfg.agent.sync_interval_s}s"
)
uvicorn.run(
create_app(peer_mode=True), host=host, port=bind_port, log_level="info"
)
# ---------------------------------------------------------------------------
# SPA build management
# ---------------------------------------------------------------------------

View file

@ -51,6 +51,25 @@ class WebConfig(_Strict):
port: int = Field(default=8765, ge=1, le=65535)
class AgentConfig(_Strict):
"""Settings for the headless Linux peer-node daemon (`claire agent run`).
Only consumed on apricot/black; harmless defaults on plum (the agent
isn't run there). The daemon binds loopback on `port`, syncs with each
configured peer every `sync_interval_s`, samples host telemetry every
`telemetry_interval_s`, and supervises local worker sessions (a session
with a live tmux pane but JSONL mtime older than `wedge_threshold_s` is
"wedged"). Respawn of wedged sessions stays OFF unless explicitly enabled.
"""
port: int = Field(default=8766, ge=1, le=65535)
sync_interval_s: int = Field(default=120, ge=10, le=86400)
telemetry_interval_s: int = Field(default=60, ge=10, le=3600)
wedge_threshold_s: int = Field(default=300, ge=60, le=7200)
supervisor_enable: bool = True
supervisor_allow_respawn: bool = False
class OrchestratorConfig(_Strict):
"""Settings for the live Claude session backing the orchestrator chat.
@ -153,6 +172,7 @@ class ClaireConfig(_Strict):
orchestrator: OrchestratorConfig = Field(default_factory=OrchestratorConfig)
budget: BudgetConfig = Field(default_factory=BudgetConfig)
limits: LimitsConfig = Field(default_factory=LimitsConfig)
agent: AgentConfig = Field(default_factory=AgentConfig)
# The canonical label this machine identifies as in dispatch + session
# records. `None` falls back to the short form of `socket.gethostname()`
# (first dot-segment, lowercased). Set explicitly to override (e.g. on a
@ -287,6 +307,19 @@ def _serialize(cfg: ClaireConfig) -> str:
f'"{h}" = {c}' for h, c in sorted(lim.per_host.items())
)
lines.append(f"per_host = {{ {inner} }}")
# Emit [agent] only when it differs from defaults (peer hosts only).
ag = cfg.agent
if ag != AgentConfig():
lines.append("")
lines.append("[agent]")
lines.append(f"port = {ag.port}")
lines.append(f"sync_interval_s = {ag.sync_interval_s}")
lines.append(f"telemetry_interval_s = {ag.telemetry_interval_s}")
lines.append(f"wedge_threshold_s = {ag.wedge_threshold_s}")
lines.append(f"supervisor_enable = {str(ag.supervisor_enable).lower()}")
lines.append(
f"supervisor_allow_respawn = {str(ag.supervisor_allow_respawn).lower()}"
)
for h in cfg.known_hosts:
lines.append("")
lines.append("[[known_hosts]]")

View file

@ -38,6 +38,7 @@ def create_app(
config_path: Path | None = None,
db_path: Path | None = None,
sync_secret: Any = _USE_CONFIG,
peer_mode: bool = False,
) -> FastAPI:
"""Build a FastAPI app.
@ -48,12 +49,22 @@ def create_app(
`sync_secret` overrides the per-config value; default sentinel means
"read from config at request-validation time" (production behavior).
Pass an explicit `None` or string to pin for tests.
`peer_mode` builds a headless PEER NODE (the Linux `claire agent`): it
serves the `/api/v1/*` API (notably `/sync/*`) and runs the agent loops
(sync + supervisor + telemetry), but NEVER spawns/supervises the
orchestrator, runs rounds, or mounts the MCP server / SPA. Making it a
factory argument (not a config key) structurally guarantees a peer can't
accidentally become a second orchestrator.
"""
# Build the MCP sub-app eagerly so we can chain its lifespan into the
# parent's — streamable HTTP relies on a task group started in that
# lifespan, and Starlette's `mount()` does not invoke sub-app lifespans.
from ..orchestrator.mcp_server import asgi_app as mcp_asgi_app
mcp_app = mcp_asgi_app()
# Peer nodes don't serve MCP, so skip it entirely.
mcp_app = None
if not peer_mode:
from ..orchestrator.mcp_server import asgi_app as mcp_asgi_app
mcp_app = mcp_asgi_app()
@asynccontextmanager
async def lifespan(_app: FastAPI):
@ -67,7 +78,22 @@ def create_app(
tmux dies mid-session (e.g. apricot tmux segfault).
- Both are fire-and-forget; failures log a warning but never abort
uvicorn the chat UI surfaces "not configured" until recovery.
In `peer_mode`, none of the above runs: a peer node only drives the
agent loops (sync + supervisor + telemetry) and never touches the
orchestrator lifecycle or the MCP sub-app.
"""
if peer_mode:
from ..agent import start_agent_loops
tasks = start_agent_loops(config_path=config_path, db_path=db_path)
try:
yield
finally:
for t in tasks:
t.cancel()
return
from ..orchestrator.bootstrap import ensure_running
HEARTBEAT_SECONDS = 60
@ -153,11 +179,12 @@ def create_app(
from .chat import stream as chat_stream
app.include_router(api.router)
app.include_router(chat_stream.router)
# Claire MCP server — orchestrator's .mcp.json points here.
app.mount("/mcp", mcp_app, name="mcp")
# React SPA (catch-all). Order matters: mounted LAST so the explicit
# routers + /assets static mount take precedence on matching paths.
_mount_spa(app)
if not peer_mode:
# Claire MCP server — orchestrator's .mcp.json points here.
app.mount("/mcp", mcp_app, name="mcp")
# React SPA (catch-all). Order matters: mounted LAST so the explicit
# routers + /assets static mount take precedence on matching paths.
_mount_spa(app)
return app

View file

@ -0,0 +1,27 @@
"""AgentConfig serialize/deserialize round-trip via the hand-rolled _serialize."""
from __future__ import annotations
from pathlib import Path
from claire.config import AgentConfig, ClaireConfig, _serialize, load_or_init
def test_agent_section_omitted_when_default() -> None:
cfg = ClaireConfig(machine_id="m")
assert "[agent]" not in _serialize(cfg)
def test_agent_config_roundtrip(tmp_path: Path) -> None:
p = tmp_path / "claire.toml"
cfg = load_or_init(p) # fresh: mints machine_id + sync_secret, writes file
mutated = cfg.model_copy(
update={"agent": AgentConfig(sync_interval_s=30, supervisor_allow_respawn=True)}
)
p.write_text(_serialize(mutated), encoding="utf-8")
reloaded = load_or_init(p)
assert "[agent]" in p.read_text()
assert reloaded.agent.sync_interval_s == 30
assert reloaded.agent.supervisor_allow_respawn is True
assert reloaded.agent.port == 8766 # default preserved across round-trip

View file

@ -0,0 +1,41 @@
"""Peer-mode app: serves the sync API, never becomes a second orchestrator.
Structural guarantee `create_app(peer_mode=True)` must serve /api/v1/* (incl.
/sync/*) but NOT mount MCP/SPA and NOT bootstrap the orchestrator.
"""
from __future__ import annotations
from pathlib import Path
from fastapi.testclient import TestClient
from claire.web.app import create_app
def test_peer_mode_serves_sync_but_not_orchestrator(tmp_path: Path, monkeypatch) -> None:
# Spy: if peer mode ever bootstrapped the orchestrator, this trips.
import claire.orchestrator.bootstrap as boot
called = {"v": False}
def _spy(**_kw):
called["v"] = True
return None
monkeypatch.setattr(boot, "ensure_running", _spy)
app = create_app(
config_path=tmp_path / "claire.toml",
db_path=tmp_path / "claire.db",
sync_secret=None, # disable HMAC so the route returns 200, not 401
peer_mode=True,
)
with TestClient(app) as client: # `with` runs the lifespan
assert client.get("/api/v1/health").status_code == 200
# /sync/* IS served in peer mode (200 with auth disabled).
assert client.get("/api/v1/sync/cursor").status_code == 200
# MCP + SPA are NOT mounted in peer mode.
assert client.get("/mcp/").status_code == 404
assert called["v"] is False # orchestrator never bootstrapped

View file

@ -11,7 +11,7 @@ def test_migrate_is_idempotent() -> None:
"0001_initial", "0002_chat", "0003_pm", "0004_fleet",
"0003_pm_alter", "0005_session_liveness", "0006_project_org",
"0007_usage", "0008_task_blocked_by", "0009_decisions",
"0010_role_clare_to_claire",
"0010_role_clare_to_claire", "0011_host_telemetry",
]
assert second == [] # already applied