feat(@projects/@claire): ✨ add rounds skip logic and heartbeat control
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
parent
39ccf5a9f5
commit
8d82bb0abc
3 changed files with 102 additions and 9 deletions
|
|
@ -130,6 +130,19 @@ class OrchestratorConfig(_Strict):
|
|||
# the fleet, identifies blockers, and surfaces next-step recommendations
|
||||
# — closing the loop on "reduce required user interaction." 0 disables.
|
||||
rounds_interval_s: int = Field(default=0, ge=0, le=86400)
|
||||
# Cost gate for the rounds timer. When True (default), the loop computes a
|
||||
# cheap pure-Python fingerprint of *worker* fleet state (live sessions minus
|
||||
# the orchestrator's own, + open task states) each tick and SKIPS posting a
|
||||
# DO-ROUNDS turn — i.e. never wakes the (expensive) orchestrator model —
|
||||
# while nothing has changed. The orchestrator's own session/chat is excluded
|
||||
# from the fingerprint so a round's own side effects can't keep it "changed"
|
||||
# forever. Set False to post every tick unconditionally (legacy behavior).
|
||||
rounds_skip_unchanged: bool = True
|
||||
# Max-staleness floor for the skip gate: force a real round at least once
|
||||
# every N ticks even when the fingerprint is unchanged, so the HUD/heartbeat
|
||||
# never goes fully silent. With rounds_interval_s=900 (15m), 6 ⇒ a forced
|
||||
# round at least every 90m. Ignored when rounds_skip_unchanged is False.
|
||||
rounds_heartbeat_every: int = Field(default=6, ge=1, le=1000)
|
||||
# When False (default), Claire's rounds loop only *proposes* dispatch —
|
||||
# spawning a session is a real external action, so it waits for the
|
||||
# user. Set True to let Claire dispatch eligible work autonomously
|
||||
|
|
|
|||
|
|
@ -170,23 +170,53 @@ def create_app(
|
|||
from ..config import load_or_init
|
||||
from ..db import migrate, open_db
|
||||
from ..hlc import HLCGenerator
|
||||
from .rounds import post_rounds_turn
|
||||
from .rounds import fleet_fingerprint, post_rounds_turn
|
||||
|
||||
cfg = load_or_init(config_path)
|
||||
interval = cfg.orchestrator.rounds_interval_s
|
||||
interval = load_or_init(config_path).orchestrator.rounds_interval_s
|
||||
if interval <= 0:
|
||||
return # disabled
|
||||
logger.info("orchestrator rounds loop enabled (every %ds)", interval)
|
||||
last_fp: str | None = None
|
||||
skipped = 0
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(interval)
|
||||
def _post() -> None:
|
||||
|
||||
def _tick(prev_fp: str | None, skips: int) -> tuple[bool, str]:
|
||||
# Reload cfg each tick so a newly-discovered orchestrator
|
||||
# session_uuid (excluded from the fingerprint) and any
|
||||
# gate retune take effect without a server restart.
|
||||
cfg = load_or_init(config_path)
|
||||
conn = open_db(db_path)
|
||||
migrate(conn)
|
||||
gen = HLCGenerator(cfg.machine_id)
|
||||
post_rounds_turn(conn, gen, cfg)
|
||||
conn.close()
|
||||
await asyncio.to_thread(_post)
|
||||
try:
|
||||
migrate(conn)
|
||||
fp = fleet_fingerprint(
|
||||
conn, cfg.orchestrator.session_uuid
|
||||
)
|
||||
gate = cfg.orchestrator.rounds_skip_unchanged
|
||||
floor = cfg.orchestrator.rounds_heartbeat_every
|
||||
# Skip (don't wake the model) only while unchanged
|
||||
# AND under the max-staleness floor.
|
||||
if gate and fp == prev_fp and skips + 1 < floor:
|
||||
return (False, fp)
|
||||
gen = HLCGenerator(cfg.machine_id)
|
||||
post_rounds_turn(conn, gen, cfg)
|
||||
return (True, fp)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
posted, last_fp = await asyncio.to_thread(
|
||||
_tick, last_fp, skipped
|
||||
)
|
||||
if posted:
|
||||
skipped = 0
|
||||
else:
|
||||
skipped += 1
|
||||
logger.debug(
|
||||
"rounds tick skipped (fleet unchanged, %d/%d)",
|
||||
skipped,
|
||||
load_or_init(config_path).orchestrator.rounds_heartbeat_every,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
except Exception as exc: # noqa: BLE001
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ prompt lives here once rather than being duplicated.
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import sqlite3
|
||||
|
||||
from ..config import ClaireConfig
|
||||
|
|
@ -15,6 +16,55 @@ from ..domain import ChatScope
|
|||
from ..hlc import HLCGenerator
|
||||
|
||||
|
||||
def fleet_fingerprint(
    conn: sqlite3.Connection, orchestrator_uuid: str | None
) -> str:
    """Return a SHA-256 hex digest of the *worker* fleet state a round reacts to.

    The rounds timer compares successive digests so it can skip waking the
    (expensive) orchestrator model while nothing has changed. Two inputs are
    hashed, in this order:

      1. every session with ``liveness = 'alive'`` other than the
         orchestrator's, as (uuid, last_triage_status, state, summary, source,
         task_id), LEFT JOINed to ``agent_status`` and ordered by uuid;
      2. every task whose status is not ``'done'``, as (id, status), ordered
         by id.

    The orchestrator's own session (and therefore its agent_status row) is
    left out on purpose, and the orchestrator chat scope is never read. A
    round mutates exactly those (it calls `report_status` for itself and
    writes its prompt+reply into the orchestrator scope every tick), so
    including them would change the digest on every round and the skip gate
    would never engage. A user turn wakes the orchestrator on its own
    `[turn:…]` path, independent of this loop, so user chat is not hashed
    either.

    Args:
        conn: Open SQLite connection exposing the ``sessions``,
            ``agent_status`` and ``tasks`` tables.
        orchestrator_uuid: UUID of the orchestrator session to exclude.
            ``None`` (or an empty string) is bound as ``""`` in the query.

    Returns:
        The hexadecimal SHA-256 digest of the collected rows.
    """
    # A missing uuid is bound as "" rather than NULL: `uuid != NULL` is never
    # true in SQL and would filter out every session.
    excluded_uuid = orchestrator_uuid or ""

    worker_sessions = conn.execute(
        """
        SELECT s.uuid, s.last_triage_status,
        ag.state, ag.summary, ag.source, ag.task_id
        FROM sessions s
        LEFT JOIN agent_status ag ON ag.session_uuid = s.uuid
        WHERE s.liveness = 'alive' AND s.uuid != ?
        ORDER BY s.uuid
        """,
        (excluded_uuid,),
    ).fetchall()
    open_tasks = conn.execute(
        """
        SELECT id, status FROM tasks
        WHERE status != 'done'
        ORDER BY id
        """
    ).fetchall()

    # Session rows, then a fixed separator, then task rows. The separator
    # keeps the two sections from running together in the hashed byte stream.
    payload = b"".join(
        (
            *(repr(tuple(row)).encode() for row in worker_sessions),
            b"\x00tasks\x00",
            *(repr(tuple(row)).encode() for row in open_tasks),
        )
    )
    return hashlib.sha256(payload).hexdigest()
|
||||
|
||||
|
||||
def build_rounds_prompt(cfg: ClaireConfig) -> str:
|
||||
"""The structured DO-ROUNDS prompt posted to the orchestrator.
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue