diff --git a/src/claire/config.py b/src/claire/config.py index f083bf3..a1c9d85 100644 --- a/src/claire/config.py +++ b/src/claire/config.py @@ -130,6 +130,19 @@ class OrchestratorConfig(_Strict): # the fleet, identifies blockers, and surfaces next-step recommendations # — closing the loop on "reduce required user interaction." 0 disables. rounds_interval_s: int = Field(default=0, ge=0, le=86400) + # Cost gate for the rounds timer. When True (default), the loop computes a + # cheap pure-Python fingerprint of *worker* fleet state (live sessions minus + # the orchestrator's own, + open task states) each tick and SKIPS posting a + # DO-ROUNDS turn — i.e. never wakes the (expensive) orchestrator model — + # while nothing has changed. The orchestrator's own session/chat is excluded + # from the fingerprint so a round's own side effects can't keep it "changed" + # forever. Set False to post every tick unconditionally (legacy behavior). + rounds_skip_unchanged: bool = True + # Max-staleness floor for the skip gate: force a real round at least once + # every N ticks even when the fingerprint is unchanged, so the HUD/heartbeat + # never goes fully silent. With rounds_interval_s=900 (15m), 6 ⇒ a forced + # round at least every 90m. Ignored when rounds_skip_unchanged is False. + rounds_heartbeat_every: int = Field(default=6, ge=1, le=1000) # When False (default), Claire's rounds loop only *proposes* dispatch — # spawning a session is a real external action, so it waits for the # user. Set True to let Claire dispatch eligible work autonomously diff --git a/src/claire/web/app.py b/src/claire/web/app.py index 45db448..af31656 100644 --- a/src/claire/web/app.py +++ b/src/claire/web/app.py @@ -170,23 +170,53 @@ def create_app( from ..config import load_or_init from ..db import migrate, open_db from ..hlc import HLCGenerator - from .rounds import post_rounds_turn + from .rounds import fleet_fingerprint, post_rounds_turn - cfg = load_or_init(config_path) - interval = cfg.orchestrator.rounds_interval_s + interval = load_or_init(config_path).orchestrator.rounds_interval_s if interval <= 0: return # disabled logger.info("orchestrator rounds loop enabled (every %ds)", interval) + last_fp: str | None = None + skipped = 0 while True: try: await asyncio.sleep(interval) - def _post() -> None: + + def _tick(prev_fp: str | None, skips: int) -> tuple[bool, str]: + # Reload cfg each tick so a newly-discovered orchestrator + # session_uuid (excluded from the fingerprint) and any + # gate retune take effect without a server restart. + cfg = load_or_init(config_path) conn = open_db(db_path) - migrate(conn) - gen = HLCGenerator(cfg.machine_id) - post_rounds_turn(conn, gen, cfg) - conn.close() - await asyncio.to_thread(_post) + try: + migrate(conn) + fp = fleet_fingerprint( + conn, cfg.orchestrator.session_uuid + ) + gate = cfg.orchestrator.rounds_skip_unchanged + floor = cfg.orchestrator.rounds_heartbeat_every + # Skip (don't wake the model) only while unchanged + # AND under the max-staleness floor. + if gate and fp == prev_fp and skips + 1 < floor: + return (False, fp) + gen = HLCGenerator(cfg.machine_id) + post_rounds_turn(conn, gen, cfg) + return (True, fp) + finally: + conn.close() + + posted, last_fp = await asyncio.to_thread( + _tick, last_fp, skipped + ) + if posted: + skipped = 0 + else: + skipped += 1 + logger.debug( + "rounds tick skipped (fleet unchanged, %d/%d)", + skipped, + load_or_init(config_path).orchestrator.rounds_heartbeat_every, + ) except asyncio.CancelledError: return except Exception as exc: # noqa: BLE001 diff --git a/src/claire/web/rounds.py b/src/claire/web/rounds.py index c216926..de9c3c1 100644 --- a/src/claire/web/rounds.py +++ b/src/claire/web/rounds.py @@ -8,6 +8,7 @@ prompt lives here once rather than being duplicated. from __future__ import annotations +import hashlib import sqlite3 from ..config import ClaireConfig @@ -15,6 +16,55 @@ from ..domain import ChatScope from ..hlc import HLCGenerator +def fleet_fingerprint( + conn: sqlite3.Connection, orchestrator_uuid: str | None +) -> str: + """A cheap, deterministic digest of the *worker* fleet state a round reacts to. + + Used by the rounds timer to skip waking the (expensive) orchestrator model + when nothing has changed since the last round. Covers exactly the inputs a + round acts on: + • live worker sessions — (uuid, triage_status, pushed state/summary/source, + task_id) — i.e. the `list_fleet` rows; + • open (non-`done`) task states — (id, status). + + Deliberately EXCLUDES the orchestrator's own session and its agent_status, + and the orchestrator chat scope entirely. A round mutates exactly those + (it calls `report_status` for itself and writes its prompt+reply into the + orchestrator scope every tick); folding them in would make the fingerprint + change on every round and the skip gate would never engage. A user turn + wakes the orchestrator on its own `[turn:…]` path, independent of this loop, + so user chat need not be fingerprinted either. + """ + orch = orchestrator_uuid or "" + session_rows = conn.execute( + """ + SELECT s.uuid, s.last_triage_status, + ag.state, ag.summary, ag.source, ag.task_id + FROM sessions s + LEFT JOIN agent_status ag ON ag.session_uuid = s.uuid + WHERE s.liveness = 'alive' AND s.uuid != ? + ORDER BY s.uuid + """, + (orch,), + ).fetchall() + task_rows = conn.execute( + """ + SELECT id, status FROM tasks + WHERE status != 'done' + ORDER BY id + """ + ).fetchall() + + h = hashlib.sha256() + for r in session_rows: + h.update(repr(tuple(r)).encode()) + h.update(b"\x00tasks\x00") + for r in task_rows: + h.update(repr(tuple(r)).encode()) + return h.hexdigest() + + def build_rounds_prompt(cfg: ClaireConfig) -> str: """The structured DO-ROUNDS prompt posted to the orchestrator.