Wire the rounds timer to a pure-Python skip gate so claire-serve only wakes the orchestrator model when worker fleet state changed (not every tick): - web/rounds.py: fleet_fingerprint() over worker sessions (minus the orchestrator's own) + open tasks; should_skip_round() with heartbeat floor. - web/app.py: _rounds_loop tracks last fingerprint + consecutive skips. - excludes the orchestrator's own session/chat so a round's self-side-effects can't defeat the gate. Add scripts/release-fleet.sh (test -> deploy apricot+black -> restart plum services) and harden deploy-agent.sh's cosmetic status check against a SIGPIPE false-abort. 3 new discriminating tests; 349 pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
172 lines
6.3 KiB
Python
172 lines
6.3 KiB
Python
"""Deep health probe + on-demand rounds tick.
|
|
|
|
Covers the two endpoints the menu-bar watchdog/controls rely on:
|
|
1. `GET /api/v1/health/deep` — 200 on a writable DB, 503 when unwritable.
|
|
2. `POST /api/v1/rounds/tick` — 202 and schedules a real rounds turn.
|
|
|
|
The deep-health 503 response is driven by patching `_db_write_probe` to raise;
the probe itself is exercised against a genuinely read-only SQLite connection
(not a mock), since detecting an unwritable DB is the whole point.
|
|
The rounds tick patches `post_rounds_turn` so the test stays hermetic — the
|
|
real one shells out to rclaude.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
from claire.db import open_db
|
|
from claire.web.api import _db_write_probe
|
|
from claire.web.app import create_app
|
|
|
|
|
|
def _client(tmp_path: Path) -> TestClient:
    """Return a TestClient for an app whose config and DB paths sit under *tmp_path*."""
    app = create_app(
        config_path=tmp_path / "claire.toml",
        db_path=tmp_path / "claire.db",
    )
    return TestClient(app)
|
|
|
|
|
|
# --- deep health -----------------------------------------------------------
|
|
|
|
|
|
def test_health_deep_ok(tmp_path: Path) -> None:
    """Deep health answers 200 with status "ok", and stays 200 on a repeat call."""
    client = _client(tmp_path)
    url = "/api/v1/health/deep"

    first = client.get(url)
    assert first.status_code == 200
    assert first.json()["status"] == "ok"

    # Idempotent: the probe upserts a single row, so repeat calls stay 200.
    second = client.get(url)
    assert second.status_code == 200
|
|
|
|
|
|
def test_health_deep_503_when_unwritable(tmp_path: Path, monkeypatch) -> None:
    """When the write probe raises, the endpoint must answer 503 "db unwritable"."""
    client = _client(tmp_path)
    url = "/api/v1/health/deep"

    warmup = client.get(url)  # create the DB
    assert warmup.status_code == 200

    def _failing_probe(_conn: sqlite3.Connection) -> None:
        # Stand-in for the real probe: behave as if the write hit a dead disk.
        raise sqlite3.OperationalError("disk I/O error")

    monkeypatch.setattr("claire.web.api._db_write_probe", _failing_probe)

    resp = client.get(url)
    assert resp.status_code == 503
    assert "db unwritable" in resp.json()["detail"]
|
|
|
|
|
|
def test_db_write_probe_raises_on_readonly_db(tmp_path: Path) -> None:
    """The probe must fail on a genuinely unwritable connection.

    The database file is created through ``open_db`` and then reopened with
    SQLite's ``mode=ro`` URI parameter, so the failure comes from a real
    read-only handle rather than from a patched function.
    """
    db = tmp_path / "claire.db"
    open_db(db).close()  # create the file

    # Build the URI from Path.as_uri() instead of interpolating the raw path:
    # characters that are special inside a URI (space, '?', '#', '%') are then
    # percent-encoded and cannot be misread as the start of the query string.
    ro_uri = f"{db.resolve().as_uri()}?mode=ro"
    ro = sqlite3.connect(ro_uri, uri=True)
    try:
        with pytest.raises(sqlite3.Error):
            _db_write_probe(ro)
    finally:
        # Always release the handle, even if the expectation above fails.
        ro.close()
|
|
|
|
|
|
# --- rounds tick -----------------------------------------------------------
|
|
|
|
|
|
def test_rounds_tick_schedules_turn(tmp_path: Path, monkeypatch) -> None:
    """POST /api/v1/rounds/tick answers 202 "scheduled" and posts exactly one turn."""
    calls: list[tuple] = []

    def _record_turn(conn, gen, cfg):
        # Replaces post_rounds_turn for this test; just remember the arguments.
        calls.append((conn, gen, cfg))

    monkeypatch.setattr("claire.web.rounds.post_rounds_turn", _record_turn)

    client = _client(tmp_path)
    resp = client.post("/api/v1/rounds/tick")
    assert resp.status_code == 202
    assert resp.json()["status"] == "scheduled"
    # TestClient runs background tasks before returning, so the turn posted.
    assert len(calls) == 1
|
|
|
|
|
|
# --- rounds skip gate (fingerprint) ----------------------------------------
|
|
|
|
import uuid as _uuid # noqa: E402
|
|
|
|
from claire.db import migrate # noqa: E402
|
|
from claire.hlc import HLCGenerator # noqa: E402
|
|
from claire.orchestrator.tools import report_status # noqa: E402
|
|
from claire.web.rounds import fleet_fingerprint, should_skip_round # noqa: E402
|
|
|
|
# Fixed machine id handed to HLCGenerator when seeding the skip-gate test DB.
_MACHINE = "00000000-0000-0000-0000-0000000000aa"
|
|
|
|
|
|
def _seed_db(tmp_path: Path):
    """Create a migrated DB with one project, one in-progress task and two sessions.

    Returns ``(conn, gen, worker, orch)``: the open connection, the HLC
    generator used for every timestamp column, and the UUIDs of the worker
    session (host "apricot") and the orchestrator session (host "local").
    Only the worker gets a status reported during seeding.
    """
    conn = open_db(tmp_path / "claire.db")
    migrate(conn)
    gen = HLCGenerator(_MACHINE)

    def _stamp() -> str:
        # Each call asks the generator for a new tick and stringifies it.
        return str(gen.tick())

    project_row = ("p1", "proj", _stamp(), _stamp())
    conn.execute(
        "INSERT INTO projects(id,name,created_hlc,updated_hlc) VALUES(?,?,?,?)",
        project_row,
    )

    task_row = ("t1", "p1", "work", "in_progress", 2, _stamp(), _stamp())
    conn.execute(
        "INSERT INTO tasks(id,project_id,title,status,priority,created_hlc,"
        "updated_hlc) VALUES(?,?,?,?,?,?,?)",
        task_row,
    )

    worker = str(_uuid.uuid4())
    orch = str(_uuid.uuid4())
    session_hosts = [(worker, "apricot"), (orch, "local")]
    for sid, host in session_hosts:
        session_row = (sid, host, f"/cwd/{host}", host, _stamp(), "alive")
        conn.execute(
            "INSERT INTO sessions(uuid,host,cwd,tmux_name,updated_hlc,liveness)"
            " VALUES(?,?,?,?,?,?)",
            session_row,
        )

    report_status(conn, gen, session_uuid=worker, summary="worker booting")
    conn.commit()
    return conn, gen, worker, orch
|
|
|
|
|
|
def test_fingerprint_ignores_orchestrators_own_round_side_effects(
    tmp_path: Path,
) -> None:
    """A round mutates the orchestrator's OWN session/status every tick. If the
    fingerprint folded that in, the skip gate would never engage. It must not."""
    conn, gen, _worker, orch = _seed_db(tmp_path)
    try:
        fp1 = fleet_fingerprint(conn, orch)

        # Simulate the round's own side effects: the orchestrator reports its
        # own status (as it does every single round, with a fresh summary each
        # time).
        report_status(conn, gen, session_uuid=orch, summary="Rounds tick (1st)")
        report_status(conn, gen, session_uuid=orch, summary="Rounds tick (2nd)…")
        fp2 = fleet_fingerprint(conn, orch)

        assert fp2 == fp1, "orchestrator self-status must not change the fingerprint"
        # And the gate would therefore skip the next tick (under the floor).
        assert should_skip_round(fp1, fp2, 0, enabled=True, heartbeat_every=6)
    finally:
        # The seeded connection was previously left open; close it even when
        # an assertion fails so the SQLite handle is not leaked.
        conn.close()
|
|
|
|
|
|
def test_fingerprint_changes_on_real_worker_movement(tmp_path: Path) -> None:
    """Worker status updates and task status changes must alter the fingerprint.

    This is the counterpart to the orchestrator-self-status test: a gate that
    never changed its fingerprint would skip rounds that are actually needed.
    """
    conn, gen, worker, orch = _seed_db(tmp_path)
    try:
        fp1 = fleet_fingerprint(conn, orch)

        # A worker pushing new status is real fleet movement → fingerprint
        # changes.
        report_status(conn, gen, session_uuid=worker, summary="now compiling")
        fp_worker = fleet_fingerprint(conn, orch)
        assert fp_worker != fp1
        assert not should_skip_round(
            fp1, fp_worker, 0, enabled=True, heartbeat_every=6
        )

        # A task leaving the open set (→ done) is also real movement.
        conn.execute("UPDATE tasks SET status='done' WHERE id='t1'")
        conn.commit()
        assert fleet_fingerprint(conn, orch) != fp_worker
    finally:
        # The seeded connection was previously left open; close it even when
        # an assertion fails so the SQLite handle is not leaked.
        conn.close()
|
|
|
|
|
|
def test_should_skip_round_floor_and_toggle() -> None:
    """Walk the gate's decision table: fingerprints, skip count, toggle, floor."""
    # Columns: earlier fingerprint, later fingerprint, skip count so far,
    # enabled flag, heartbeat_every, and whether a skip is expected.
    cases = [
        # Unchanged + enabled + under floor → skip.
        ("a", "a", 0, True, 3, True),
        ("a", "a", 1, True, 3, True),
        # Hitting the floor forces a round (no skip), so the HUD never goes silent.
        ("a", "a", 2, True, 3, False),
        # Any change forces a round.
        ("a", "b", 0, True, 3, False),
        # Gate off → always post.
        ("a", "a", 0, False, 99, False),
    ]
    for earlier, later, skips, enabled, every, expect_skip in cases:
        decision = should_skip_round(
            earlier, later, skips, enabled=enabled, heartbeat_every=every
        )
        assert bool(decision) is expect_skip
|