feat(@projects/@claire): add peer sync, supervisor, and telemetry loops

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Natalie 2026-05-31 18:38:10 -06:00
parent 9b3bf1a607
commit c557774b57
4 changed files with 146 additions and 5 deletions

View file

@ -48,8 +48,8 @@ rsync -az --delete \
--exclude='src/claire/web/app/dist/' \
"$SRC/" "${HOST}:${REMOTE_DIR}/"
say "[$HOST] install (uv) + init"
remote-run "$HOST" "cd ~/$REMOTE_DIR && { [ -d .venv ] || uv venv; } && uv pip install -e . && .venv/bin/claire init"
say "[$HOST] install (uv if present, else python venv+pip) + init"
remote-run "$HOST" "export PATH=\"\$HOME/.local/bin:\$PATH\"; cd ~/$REMOTE_DIR && if command -v uv >/dev/null 2>&1; then { [ -d .venv ] || uv venv; }; uv pip install -e .; else { [ -d .venv ] || python3 -m venv .venv; }; .venv/bin/pip install -q -e .; fi && .venv/bin/claire init"
say "[$HOST] configure peer (idempotent — points this host at plum)"
remote-run "$HOST" "cd ~/$REMOTE_DIR && .venv/bin/claire agent add-peer --url '$PLUM_URL' --secret '$PLUM_SECRET'"
@ -59,7 +59,9 @@ remote-run "$HOST" "
mkdir -p ~/.config/systemd/user
cp ~/$REMOTE_DIR/deployments/systemd/claire-agent.service ~/.config/systemd/user/
systemctl --user daemon-reload
systemctl --user enable --now claire-agent.service
systemctl --user enable claire-agent.service
# restart (not just enable --now) so a redeploy actually loads the new code.
systemctl --user restart claire-agent.service
loginctl enable-linger \$(whoami) 2>/dev/null || true
sleep 2
systemctl --user --no-pager status claire-agent.service | head -5

View file

@ -17,13 +17,17 @@ def start_agent_loops(
"""Start the agent's background loops; return them so the lifespan can
cancel them on shutdown.
Phase 1: scheduled peer sync only. Supervisor + telemetry loops are
added here in a later phase.
Runs the sync, supervisor, and telemetry loops. The supervisor self-
disables when `agent.supervisor_enable` is false.
"""
from .supervisor import supervisor_loop
from .sync_loop import sync_loop
from .telemetry import telemetry_loop
return [
asyncio.create_task(sync_loop(config_path=config_path, db_path=db_path)),
asyncio.create_task(supervisor_loop(config_path=config_path, db_path=db_path)),
asyncio.create_task(telemetry_loop(config_path=config_path, db_path=db_path)),
]

View file

@ -170,6 +170,79 @@ def agent_add_peer(
console.print(f"[green]✓[/green] peers: {[p.url for p in peers]}")
@agent_app.command("status")
def agent_status() -> None:
"""Local-DB HUD for this peer node — works OFFLINE (reads the synced DB
directly, no running server needed)."""
from rich.table import Table
from . import events as ev
from .db import migrate, open_db
cfg = load_or_init()
conn = open_db()
migrate(conn)
host = cfg.this_host_label()
cursors = ev.cursors_by_source(conn)
live = dict(
conn.execute(
"SELECT liveness, COUNT(*) FROM sessions GROUP BY liveness"
).fetchall()
)
tel = conn.execute(
"SELECT * FROM host_telemetry WHERE host = ?", (host,)
).fetchone()
conn.close()
t = Table(title=f"claire agent — {host}", show_header=False)
t.add_row("sync sources", str(len(cursors)) + " machine(s)")
t.add_row("peers", ", ".join(p.url for p in cfg.peers) or "(none)")
t.add_row(
"sessions",
", ".join(f"{k}={v}" for k, v in sorted(live.items())) or "(none observed)",
)
if tel is not None:
gb = 1024 ** 3
t.add_row("cpu", f"{tel['cpu_percent']:.0f}%")
t.add_row("load", f"{tel['load_1']:.2f} {tel['load_5']:.2f} {tel['load_15']:.2f}")
t.add_row("mem", f"{tel['mem_used_bytes']/gb:.1f} / {tel['mem_total_bytes']/gb:.1f} GiB")
t.add_row("disk", f"{tel['disk_used_bytes']/gb:.0f} / {tel['disk_total_bytes']/gb:.0f} GiB")
else:
t.add_row("telemetry", "(no sample yet)")
console.print(t)
@agent_app.command("prune-telemetry")
def agent_prune_telemetry(
before: Annotated[
str, typer.Option("--before", help="ISO date (YYYY-MM-DD, UTC); delete older telemetry events")
],
) -> None:
"""Delete host-telemetry events older than `--before`. The projection is
last-write-wins, so old samples carry no value; this keeps the event log lean."""
from datetime import datetime, timezone
from .db import migrate, open_db
try:
cutoff_ms = int(
datetime.fromisoformat(before).replace(tzinfo=timezone.utc).timestamp() * 1000
)
except ValueError as exc:
raise typer.BadParameter(f"--before must be ISO (YYYY-MM-DD): {exc}") from exc
conn = open_db()
migrate(conn)
cur = conn.execute(
"DELETE FROM events WHERE event_type = 'host_telemetry_reported' "
"AND CAST(substr(hlc, 1, instr(hlc, '.') - 1) AS INTEGER) < ?",
(cutoff_ms,),
)
conn.close()
console.print(f"[green]✓[/green] pruned {cur.rowcount} telemetry event(s) before {before}")
# ---------------------------------------------------------------------------
# SPA build management
# ---------------------------------------------------------------------------

62
tests/test_supervisor.py Normal file
View file

@ -0,0 +1,62 @@
"""Pure classification logic for the agent supervisor — no rclaude/process."""
from __future__ import annotations
from uuid import uuid4
from claire.agent.supervisor import detect_wedged_and_orphaned
from claire.rclaude import SessionRow, TmuxRow
NOW = 1_000_000.0
def _sess(uuid, *, host="local", age_s=0):
return SessionRow(host=host, uuid=uuid, snippet="", cwd="/x", mtime_epoch=int(NOW) - age_s)
def _tmux(resumed_uuid, *, host="local", name="claude-x-1"):
return TmuxRow(host=host, session_name=name, detail="1 windows", resumed_uuid=resumed_uuid)
def test_wedged_when_live_pane_and_stale_mtime():
u = uuid4()
wedged, orphaned = detect_wedged_and_orphaned(
[_sess(u, age_s=400)], [_tmux(u)], wedge_threshold_s=300, now=NOW
)
assert [s.uuid for s in wedged] == [u]
assert orphaned == []
def test_not_wedged_when_fresh():
u = uuid4()
wedged, orphaned = detect_wedged_and_orphaned(
[_sess(u, age_s=10)], [_tmux(u)], wedge_threshold_s=300, now=NOW
)
assert wedged == [] and orphaned == []
def test_orphaned_when_no_live_pane():
u = uuid4()
wedged, orphaned = detect_wedged_and_orphaned(
[_sess(u, age_s=9999)], [], wedge_threshold_s=300, now=NOW
)
assert wedged == []
assert [s.uuid for s in orphaned] == [u]
def test_remote_sessions_not_supervised():
u = uuid4() # host != "local" → another machine's session, skip it
wedged, orphaned = detect_wedged_and_orphaned(
[_sess(u, host="apricot", age_s=9999)], [], wedge_threshold_s=300, now=NOW
)
assert wedged == [] and orphaned == []
def test_no_resumed_uuid_means_no_wedge_classification():
# Older rclaude omits resumed_uuid → can't correlate → never act blind.
u = uuid4()
wedged, orphaned = detect_wedged_and_orphaned(
[_sess(u, age_s=9999)], [_tmux(None)], wedge_threshold_s=300, now=NOW
)
assert wedged == [] # not classified wedged without correlation
assert [s.uuid for s in orphaned] == [u] # no live pane matched → orphaned