Compare commits
10 commits
39ccf5a9f5
...
0319ea6bcc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0319ea6bcc | ||
|
|
aa2aa68d72 | ||
|
|
3d626a6d88 | ||
|
|
042f5d6756 | ||
|
|
16c030c6b3 | ||
|
|
24c6f24f43 | ||
|
|
c5e63c5942 | ||
|
|
2d3777a7d0 | ||
|
|
ce6948d6e9 | ||
|
|
8d82bb0abc |
18 changed files with 1082 additions and 31 deletions
|
|
@ -1 +0,0 @@
|
|||
{"sessionId":"c5a10f83-d897-4eb1-aa4f-a4264850dbf3","pid":15738,"procStart":"Mon Jun 1 06:29:14 2026","acquiredAt":1780437249785}
|
||||
|
|
@ -8,6 +8,13 @@ Wants=network-online.target
|
|||
|
||||
[Service]
|
||||
Type=simple
|
||||
# KillMode=process (NOT the default control-group): restarting this unit on a
|
||||
# redeploy must NOT tear down the worker claude/tmux sessions the agent spawned
|
||||
# into its cgroup. control-group SIGTERMs the whole tree, silently killing live
|
||||
# dispatched workers on every deploy (the next-tour-planner was lost this way on
|
||||
# 2026-06-02). With process, only `claire agent run` is signalled; the panes
|
||||
# survive and the fresh agent re-discovers them via the pull/liveness pass.
|
||||
KillMode=process
|
||||
# rclaude (session-tools) lives in ~/.local/bin (+ pnpm global). systemd --user
|
||||
# starts with a minimal PATH, so the supervisor couldn't find rclaude — add the
|
||||
# user bin dirs. (Where rclaude is absent, the supervisor self-disables.)
|
||||
|
|
|
|||
|
|
@ -32,31 +32,43 @@ PY
|
|||
)"
|
||||
say "plum peer URL = $PLUM_URL"
|
||||
|
||||
say "[$HOST] reachability + clock"
|
||||
ssh -o ConnectTimeout=8 -o BatchMode=yes "$HOST" 'true' \
|
||||
|| { echo "ERROR: cannot ssh $HOST" >&2; exit 1; }
|
||||
ssh "$HOST" 'timedatectl show -p NTPSynchronized --value 2>/dev/null || echo unknown'
|
||||
# Resolve a reachable SSH transport. The host LABEL stays $HOST (claire
|
||||
# identity / sessions.host / per_host config), but the plum↔host route flaps:
|
||||
# `.lan` is unreachable off-site and the direct WG relay can drop, so fall back
|
||||
# to the `-wg` (direct WireGuard) then `-j` (black jump-host) aliases defined in
|
||||
# ~/.ssh/config. Only the bare ssh/rsync legs need this — `remote-run` does its
|
||||
# own routing. Override with CLAIRE_SSH_ALIAS=<alias> to force one.
|
||||
say "[$HOST] resolve ssh transport + clock"
|
||||
SSH=""
|
||||
for cand in ${CLAIRE_SSH_ALIAS:-"$HOST" "${HOST}-wg" "${HOST}-j"}; do
|
||||
if ssh -o ConnectTimeout=8 -o BatchMode=yes "$cand" 'true' 2>/dev/null; then
|
||||
SSH="$cand"; break
|
||||
fi
|
||||
done
|
||||
[ -n "$SSH" ] || { echo "ERROR: no reachable ssh transport for $HOST (tried ${CLAIRE_SSH_ALIAS:-$HOST $HOST-wg $HOST-j})" >&2; exit 1; }
|
||||
[ "$SSH" = "$HOST" ] || say "[$HOST] direct route down — using ssh transport '$SSH'"
|
||||
ssh "$SSH" 'timedatectl show -p NTPSynchronized --value 2>/dev/null || echo unknown'
|
||||
|
||||
say "[$HOST] rsync source"
|
||||
ssh "$HOST" "mkdir -p ~/$REMOTE_DIR"
|
||||
rsync -az --delete \
|
||||
ssh "$SSH" "mkdir -p ~/$REMOTE_DIR"
|
||||
rsync -az --delete -e ssh \
|
||||
--exclude='.venv/' --exclude='.git/' --exclude='__pycache__/' \
|
||||
--exclude='*.pyc' --exclude='.pytest_cache/' --exclude='.ruff_cache/' \
|
||||
--exclude='claire.toml' \
|
||||
--exclude='src/claire/web/app/node_modules/' \
|
||||
--exclude='src/claire/web/app/dist/' \
|
||||
"$SRC/" "${HOST}:${REMOTE_DIR}/"
|
||||
"$SRC/" "${SSH}:${REMOTE_DIR}/"
|
||||
|
||||
say "[$HOST] install (uv if present, else python venv+pip) + init"
|
||||
remote-run "$HOST" "export PATH=\"\$HOME/.local/bin:\$PATH\"; cd ~/$REMOTE_DIR && if command -v uv >/dev/null 2>&1; then { [ -d .venv ] || uv venv; }; uv pip install -e .; else { [ -d .venv ] || python3 -m venv .venv; }; .venv/bin/pip install -q -e .; fi && .venv/bin/claire init"
|
||||
|
||||
say "[$HOST] seed vault (BEFORE agent starts — it reads the HMAC secret from here)"
|
||||
ssh "$HOST" 'mkdir -p ~/.vault && chmod 700 ~/.vault'
|
||||
rsync -az --no-owner --no-group --chmod=D700,F600 \
|
||||
ssh "$SSH" 'mkdir -p ~/.vault && chmod 700 ~/.vault'
|
||||
rsync -az --no-owner --no-group --chmod=D700,F600 -e ssh \
|
||||
--exclude='.vault-backups/' --exclude='*.prev.txt' \
|
||||
"$HOME/.vault/" "${HOST}:.vault/"
|
||||
"$HOME/.vault/" "${SSH}:.vault/"
|
||||
# Gate: the agent will 401 forever without the shared secret present.
|
||||
ssh "$HOST" '[ -s ~/.vault/claire-sync-secret.txt ]' \
|
||||
ssh "$SSH" '[ -s ~/.vault/claire-sync-secret.txt ]' \
|
||||
|| { echo "ERROR: ~/.vault/claire-sync-secret.txt missing on $HOST after seed" >&2; exit 1; }
|
||||
|
||||
say "[$HOST] configure peer (url only — secret is vault-sourced)"
|
||||
|
|
@ -75,6 +87,10 @@ remote-run "$HOST" "
|
|||
systemctl --user restart claire-agent.service
|
||||
loginctl enable-linger \$(whoami) 2>/dev/null || true
|
||||
sleep 2
|
||||
systemctl --user --no-pager status claire-agent.service | head -5
|
||||
# Real gate: is-active is non-zero iff the unit failed to come up. The status
|
||||
# dump below is cosmetic — piping to head closes the pipe early (SIGPIPE), so
|
||||
# keep it non-fatal or it false-aborts an otherwise-healthy deploy.
|
||||
systemctl --user is-active claire-agent.service
|
||||
systemctl --user --no-pager status claire-agent.service 2>&1 | head -5 || true
|
||||
"
|
||||
say "[$HOST] done."
|
||||
|
|
|
|||
91
scripts/release-fleet.sh
Executable file
91
scripts/release-fleet.sh
Executable file
|
|
@ -0,0 +1,91 @@
|
|||
#!/usr/bin/env bash
|
||||
#
|
||||
# release-fleet.sh — one command to release the current claire working tree to
|
||||
# the WHOLE fleet and restart every service. Runs FROM plum (the source of
|
||||
# truth + the only host with the launchd services).
|
||||
#
|
||||
# scripts/release-fleet.sh # test → deploy apricot+black → restart plum
|
||||
# scripts/release-fleet.sh --no-test # skip the pre-deploy pytest gate
|
||||
# scripts/release-fleet.sh --no-plum # leave plum's services running (only push workers)
|
||||
# scripts/release-fleet.sh --hosts apricot # restrict the worker host list
|
||||
# scripts/release-fleet.sh --dry-run # print the plan, change nothing
|
||||
#
|
||||
# What it does, in order:
|
||||
# 1. (gate) run the test suite — abort the whole release if it fails.
|
||||
# 2. workers (apricot, black): scripts/deploy-agent.sh <host>
|
||||
# → rsync working tree + `uv pip install -e .` + restart claire-agent.service.
|
||||
# 3. plum: `launchctl kickstart -k` claire-serve + claire-tray
|
||||
# → editable install, so a restart is all that's needed to load new code.
|
||||
#
|
||||
# ⚠ Restarting plum's `com.lilith.claire-serve` briefly drops the web / API /
|
||||
# MCP endpoint (~a few seconds). Anything mid-call against claire's MCP tools
|
||||
# — INCLUDING the orchestrator itself — will blip until it comes back. Run it
|
||||
# when you can tolerate that, or pass --no-plum and restart plum by hand.
|
||||
#
|
||||
# Requires (same as deploy-agent.sh): `remote-run` on PATH, ssh to the worker
|
||||
# hosts, uv/python on the remotes, NTP-synced clocks.
|
||||
set -euo pipefail
|
||||
|
||||
SRC="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
|
||||
HOSTS=(apricot black)
|
||||
RUN_TESTS=1
|
||||
RESTART_PLUM=1
|
||||
DRY_RUN=0
|
||||
PLUM_SERVICES=(com.lilith.claire-serve com.lilith.claire-tray)
|
||||
|
||||
say() { printf '\033[1;35m▸\033[0m %s\n' "$*"; }
|
||||
warn() { printf '\033[1;33m⚠\033[0m %s\n' "$*" >&2; }
|
||||
die() { printf '\033[1;31m✗\033[0m %s\n' "$*" >&2; exit 1; }
|
||||
run() { if [ "$DRY_RUN" = 1 ]; then printf ' [dry-run] %s\n' "$*"; else eval "$@"; fi; }
|
||||
|
||||
while [ $# -gt 0 ]; do
|
||||
case "$1" in
|
||||
--no-test) RUN_TESTS=0; shift ;;
|
||||
--no-plum) RESTART_PLUM=0; shift ;;
|
||||
--dry-run) DRY_RUN=1; shift ;;
|
||||
--hosts) shift; IFS=' ' read -r -a HOSTS <<< "${1:?--hosts needs a value}"; shift ;;
|
||||
-h|--help) sed -n '2,30p' "$0"; exit 0 ;;
|
||||
*) die "unknown arg: $1 (try --help)" ;;
|
||||
esac
|
||||
done
|
||||
|
||||
# --- 1. test gate ----------------------------------------------------------
|
||||
if [ "$RUN_TESTS" = 1 ]; then
|
||||
say "test gate: pytest (use --no-test to skip)"
|
||||
# Run via the project venv (where pytest + dev deps live); fall back to uv's
|
||||
# managed env. `uv run pytest` is deliberately NOT used — pytest is a dev-extra
|
||||
# in .venv, not a uv tool, so that spawn fails on this repo.
|
||||
if [ -x "$SRC/.venv/bin/python" ]; then
|
||||
run "(cd '$SRC' && .venv/bin/python -m pytest -q)" || die "tests failed — release aborted"
|
||||
elif command -v uv >/dev/null 2>&1; then
|
||||
run "(cd '$SRC' && uv run python -m pytest -q)" || die "tests failed — release aborted"
|
||||
else
|
||||
die "no .venv/bin/python and no uv — cannot run the test gate (use --no-test to override)"
|
||||
fi
|
||||
else
|
||||
warn "skipping test gate (--no-test)"
|
||||
fi
|
||||
|
||||
# --- 2. worker hosts -------------------------------------------------------
|
||||
for h in "${HOSTS[@]}"; do
|
||||
say "deploy worker → $h"
|
||||
run "'$SRC/scripts/deploy-agent.sh' '$h'" || die "deploy-agent.sh $h failed"
|
||||
done
|
||||
|
||||
# --- 3. plum services ------------------------------------------------------
|
||||
if [ "$RESTART_PLUM" = 1 ]; then
|
||||
warn "restarting plum services ${PLUM_SERVICES[*]} — claire-serve restart blips the MCP/web endpoint"
|
||||
uid="$(id -u)"
|
||||
for svc in "${PLUM_SERVICES[@]}"; do
|
||||
if launchctl print "gui/$uid/$svc" >/dev/null 2>&1; then
|
||||
say "kickstart $svc"
|
||||
run "launchctl kickstart -k 'gui/$uid/$svc'" || warn "kickstart $svc returned non-zero"
|
||||
else
|
||||
warn "$svc not loaded in gui/$uid — skipping (load its LaunchAgent first)"
|
||||
fi
|
||||
done
|
||||
else
|
||||
warn "leaving plum services running (--no-plum) — restart them by hand to load new code"
|
||||
fi
|
||||
|
||||
if [ "$DRY_RUN" = 1 ]; then say "release plan printed (dry-run — nothing changed)."; else say "release complete."; fi
|
||||
|
|
@ -25,9 +25,107 @@ logger = logging.getLogger(__name__)
|
|||
_escalation_state: dict[str, int] = {}
|
||||
# session_uuid -> consecutive auto-continue nudges (reset when it recovers).
|
||||
_auto_continue_state: dict[str, int] = {}
|
||||
# session_uuid -> consecutive auto-resume attempts (reset once seen live again).
|
||||
_auto_resume_state: dict[str, int] = {}
|
||||
|
||||
_LOCAL = "local"
|
||||
|
||||
# Sent to a session right after it is auto-resumed. The session must
|
||||
# re-establish its OWN state before acting — it cannot assume it was mid-task
|
||||
# (it may have already finished, or — though the supersession guard prevents
|
||||
# this — been superseded). Mirrors the orchestrator's rehydrate-on-startup.
|
||||
_REORIENT_KICK = (
|
||||
"[resumed] Your session was auto-resumed after its pane died (deploy, "
|
||||
"crash, OOM, or host reboot — not anything you did wrong). Before doing "
|
||||
"ANYTHING else, DETERMINE YOUR CURRENT STATE: re-read your assigned task "
|
||||
"and your own recent transcript, work out what you already completed vs "
|
||||
"what is still pending, and whether you are in fact already finished. THEN "
|
||||
"either continue from exactly where you left off, or — if the work is done "
|
||||
"— report completion via report_status and stop. Do NOT blindly redo work "
|
||||
"you already completed."
|
||||
)
|
||||
|
||||
|
||||
def _is_orchestrator_cwd(cwd: str | None) -> bool:
|
||||
"""True if `cwd` is an orchestrator workspace (the `[<host>] claire` panes).
|
||||
|
||||
Those have their own bootstrap + age-recycle lifecycle; auto-resuming them
|
||||
would race that machinery, so they're excluded. Matched by path suffix
|
||||
(robust across hosts/accounts) rather than a session-name substring.
|
||||
"""
|
||||
if not cwd:
|
||||
return False
|
||||
return cwd.rstrip("/").endswith("claire/orchestrator")
|
||||
|
||||
|
||||
def _stage_dispatch_mcp(host: str) -> str | None:
|
||||
"""Stage the worker MCP config so a resumed session regains the same
|
||||
`claire:*` tools a freshly-dispatched one gets (report_status, etc.).
|
||||
Returns the staged path, or None if staging fails — the session still
|
||||
resumes, just degraded (invisible to the fleet view), never blocking it.
|
||||
"""
|
||||
try:
|
||||
from ..orchestrator.bootstrap import stage_dispatch_mcp_config
|
||||
|
||||
return stage_dispatch_mcp_config(host)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("auto-resume: dispatch MCP staging failed: %s", exc)
|
||||
return None
|
||||
|
||||
|
||||
def select_resume_candidates(
|
||||
sessions,
|
||||
tmux_rows,
|
||||
*,
|
||||
window_s: int,
|
||||
now: float,
|
||||
attempts: dict[str, int],
|
||||
max_attempts: int,
|
||||
max_per_tick: int,
|
||||
):
|
||||
"""Pure selection of DEAD-but-recent LOCAL sessions to auto-resume.
|
||||
|
||||
A candidate is a local session with no live tmux pane whose JSONL mtime is
|
||||
within `window_s` (recently alive — not a graveyard entry). Guards, in
|
||||
order, each producing a `(session, reason)` skip record for logging:
|
||||
• orchestrator-workspace cwd → excluded (own lifecycle);
|
||||
• supersession: a LIVE local session shares the dead one's cwd → excluded
|
||||
(re-spawning would put two sessions in one workspace — the re-orient
|
||||
prompt can't detect a sibling, so this MUST be a hard guard);
|
||||
• per-session retry cap (`max_attempts`) → excluded;
|
||||
• per-tick global ceiling (`max_per_tick`) → the overflow is deferred.
|
||||
|
||||
Returns `(to_resume, skipped)`. `to_resume` is capped at `max_per_tick`.
|
||||
"""
|
||||
live_uuids = {
|
||||
str(r.resumed_uuid) for r in tmux_rows if getattr(r, "resumed_uuid", None)
|
||||
}
|
||||
live_cwds = {
|
||||
s.cwd for s in sessions if s.host == _LOCAL and str(s.uuid) in live_uuids
|
||||
}
|
||||
eligible = []
|
||||
skipped: list[tuple] = []
|
||||
for s in sessions:
|
||||
if s.host != _LOCAL:
|
||||
continue
|
||||
if str(s.uuid) in live_uuids:
|
||||
continue # alive — not a resume candidate
|
||||
if now - s.mtime_epoch > window_s:
|
||||
continue # too old to count as "recently alive" — silently ignore
|
||||
if _is_orchestrator_cwd(s.cwd):
|
||||
skipped.append((s, "orchestrator-workspace"))
|
||||
continue
|
||||
if s.cwd in live_cwds:
|
||||
skipped.append((s, "superseded-by-live-session-in-cwd"))
|
||||
continue
|
||||
if attempts.get(str(s.uuid), 0) >= max_attempts:
|
||||
skipped.append((s, "retry-cap-reached"))
|
||||
continue
|
||||
eligible.append(s)
|
||||
for s in eligible[max_per_tick:]:
|
||||
skipped.append((s, "per-tick-ceiling-deferred"))
|
||||
return eligible[:max_per_tick], skipped
|
||||
|
||||
# Task states where a session is NOT just "idle with more to do" — it's
|
||||
# genuinely parked (awaiting review or a human, or finished). Auto-continue
|
||||
# must never nudge these, or it churns work that's deliberately paused.
|
||||
|
|
@ -107,11 +205,13 @@ async def supervisor_loop(*, config_path: Path | None, db_path: Path | None) ->
|
|||
allow_respawn = cfg.agent.supervisor_allow_respawn
|
||||
ac_mode = cfg.agent.auto_continue # off | dry-run | on
|
||||
ac_max = cfg.agent.auto_continue_max
|
||||
ar_mode = cfg.agent.auto_resume # off | dry-run | on
|
||||
poll = min(60, max(20, threshold // 3))
|
||||
rcl = Rclaude()
|
||||
logger.info(
|
||||
"agent supervisor loop enabled (poll %ds, wedge>%ds, respawn=%s, auto_continue=%s)",
|
||||
poll, threshold, allow_respawn, ac_mode,
|
||||
"agent supervisor loop enabled (poll %ds, wedge>%ds, respawn=%s, "
|
||||
"auto_continue=%s, auto_resume=%s)",
|
||||
poll, threshold, allow_respawn, ac_mode, ar_mode,
|
||||
)
|
||||
|
||||
while True:
|
||||
|
|
@ -181,12 +281,65 @@ async def supervisor_loop(*, config_path: Path | None, db_path: Path | None) ->
|
|||
except RclaudeError as exc:
|
||||
logger.warning("supervisor respawn-kill failed: %s", exc)
|
||||
|
||||
# Auto-resume: resurrect DEAD-but-recent local sessions. Gated +
|
||||
# capped; ships off → dry-run → on. The re-orient kick makes each
|
||||
# resumed session re-determine its own state before acting.
|
||||
live_uuids = {
|
||||
str(r.resumed_uuid)
|
||||
for r in tmux_rows
|
||||
if getattr(r, "resumed_uuid", None)
|
||||
}
|
||||
if ar_mode != "off":
|
||||
to_resume, skipped = select_resume_candidates(
|
||||
sessions, tmux_rows,
|
||||
window_s=cfg.agent.auto_resume_window_s, now=time.time(),
|
||||
attempts=_auto_resume_state,
|
||||
max_attempts=cfg.agent.auto_resume_max,
|
||||
max_per_tick=cfg.agent.auto_resume_max_per_tick,
|
||||
)
|
||||
for s, reason in skipped:
|
||||
logger.info(
|
||||
"supervisor: auto-resume SKIP %s (%s)", str(s.uuid)[:8], reason
|
||||
)
|
||||
for s in to_resume:
|
||||
key = str(s.uuid)
|
||||
n = _auto_resume_state.get(key, 0) + 1
|
||||
_auto_resume_state[key] = n
|
||||
if ar_mode == "dry-run":
|
||||
logger.info(
|
||||
"supervisor: WOULD auto-resume %s in %s (x%d/%d)",
|
||||
key[:8], s.cwd, n, cfg.agent.auto_resume_max,
|
||||
)
|
||||
continue
|
||||
logger.warning(
|
||||
"supervisor: auto-resuming %s in %s (x%d/%d)",
|
||||
key[:8], s.cwd, n, cfg.agent.auto_resume_max,
|
||||
)
|
||||
try:
|
||||
mcp_cfg = _stage_dispatch_mcp(_LOCAL)
|
||||
new_name = await asyncio.to_thread(
|
||||
rcl.resume,
|
||||
host=_LOCAL, cwd=s.cwd, resume_uuid=key,
|
||||
mcp_config=mcp_cfg, name=None,
|
||||
)
|
||||
await asyncio.sleep(8) # let tmux + claude reach the prompt
|
||||
await asyncio.to_thread(
|
||||
rcl.send, text=_REORIENT_KICK, match=new_name, yes=True
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("supervisor auto-resume failed for %s: %s", key[:8], exc)
|
||||
|
||||
# Reset counters for sessions that recovered (no longer wedged).
|
||||
wedged_keys = {str(s.uuid) for s in wedged}
|
||||
for d in (_escalation_state, _auto_continue_state):
|
||||
for key in list(d):
|
||||
if key not in wedged_keys:
|
||||
d.pop(key, None)
|
||||
# Auto-resume counter resets once the session is live again (the
|
||||
# resume stuck); a later death is then eligible to resume afresh.
|
||||
for key in list(_auto_resume_state):
|
||||
if key in live_uuids:
|
||||
_auto_resume_state.pop(key, None)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
except Exception as exc: # noqa: BLE001
|
||||
|
|
|
|||
|
|
@ -44,6 +44,13 @@ class HostEntry(_Strict):
|
|||
# views from synced data. Optional — legacy entries lack it; resolution
|
||||
# falls back to label matching when absent.
|
||||
machine_id: str | None = None
|
||||
# Free-form capability tags describing what this host CAN do — used by
|
||||
# routing (location-transparent Claire) and dispatch to pick a host by
|
||||
# capability, not just load. Conventions: `gpu`, `media`, `transmission`,
|
||||
# `cores:64`, `mount:<name>`, `svc:<name>`. Matching is by exact tag OR
|
||||
# `key:` prefix (see `ClaireConfig.hosts_with_capability`). Empty = no
|
||||
# declared specialties (host is general-purpose).
|
||||
capabilities: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
class WebConfig(_Strict):
|
||||
|
|
@ -75,6 +82,21 @@ class AgentConfig(_Strict):
|
|||
# capped at `auto_continue_max` consecutive nudges per session.
|
||||
auto_continue: Literal["off", "dry-run", "on"] = "off"
|
||||
auto_continue_max: int = Field(default=3, ge=1, le=20)
|
||||
# Auto-resume DEAD local worker sessions (pane gone — crash/OOM/host reboot;
|
||||
# the KillMode=process unit fix already stops deploy-kills). On a death the
|
||||
# session JSONL persists and is resumable, so the supervisor can spawn
|
||||
# `claude --resume <uuid>` and send a re-orient kick so the session
|
||||
# re-determines its OWN state (what's done/pending/finished) before acting.
|
||||
# "off" (default) | "dry-run" (LOG the would-resume set — validate first) |
|
||||
# "on" (actually resume). Guarded: recency window, supersession (skip if a
|
||||
# LIVE session shares the dead one's cwd), orchestrator-workspace exclusion
|
||||
# (those have their own bootstrap/recycle lifecycle), per-session retry cap,
|
||||
# and a per-tick global ceiling — the real guard against a first-wake token
|
||||
# storm when many recent sessions are found dead at once.
|
||||
auto_resume: Literal["off", "dry-run", "on"] = "off"
|
||||
auto_resume_max: int = Field(default=3, ge=1, le=20)
|
||||
auto_resume_max_per_tick: int = Field(default=3, ge=1, le=50)
|
||||
auto_resume_window_s: int = Field(default=86400, ge=300, le=604800)
|
||||
# When True, this peer node ALSO bootstraps + registers a local orchestrator
|
||||
# session (`[<host>] claire` in the remote dev list) alongside its sync/
|
||||
# supervisor/telemetry loops — so every host is remote-controllable. The
|
||||
|
|
@ -130,6 +152,19 @@ class OrchestratorConfig(_Strict):
|
|||
# the fleet, identifies blockers, and surfaces next-step recommendations
|
||||
# — closing the loop on "reduce required user interaction." 0 disables.
|
||||
rounds_interval_s: int = Field(default=0, ge=0, le=86400)
|
||||
# Cost gate for the rounds timer. When True (default), the loop computes a
|
||||
# cheap pure-Python fingerprint of *worker* fleet state (live sessions minus
|
||||
# the orchestrator's own, + open task states) each tick and SKIPS posting a
|
||||
# DO-ROUNDS turn — i.e. never wakes the (expensive) orchestrator model —
|
||||
# while nothing has changed. The orchestrator's own session/chat is excluded
|
||||
# from the fingerprint so a round's own side effects can't keep it "changed"
|
||||
# forever. Set False to post every tick unconditionally (legacy behavior).
|
||||
rounds_skip_unchanged: bool = True
|
||||
# Max-staleness floor for the skip gate: force a real round at least once
|
||||
# every N ticks even when the fingerprint is unchanged, so the HUD/heartbeat
|
||||
# never goes fully silent. With rounds_interval_s=900 (15m), 6 ⇒ a forced
|
||||
# round at least every 90m. Ignored when rounds_skip_unchanged is False.
|
||||
rounds_heartbeat_every: int = Field(default=6, ge=1, le=1000)
|
||||
# When False (default), Claire's rounds loop only *proposes* dispatch —
|
||||
# spawning a session is a real external action, so it waits for the
|
||||
# user. Set True to let Claire dispatch eligible work autonomously
|
||||
|
|
@ -278,6 +313,29 @@ class ClaireConfig(_Strict):
|
|||
return h.name
|
||||
return label
|
||||
|
||||
def capabilities_for(self, host: str) -> list[str]:
|
||||
"""Capability tags declared for `host` (label resolved to canonical)."""
|
||||
canon = self.resolve_host_label(host)
|
||||
for h in self.known_hosts:
|
||||
if h.name == canon:
|
||||
return list(h.capabilities)
|
||||
return []
|
||||
|
||||
def hosts_with_capability(self, tag: str) -> list[str]:
|
||||
"""Canonical host names that satisfy capability `tag`.
|
||||
|
||||
Matches a host's `capabilities` either exactly (`"gpu" == "gpu"`) or by
|
||||
key for `key:value` tags (asking `"cores"` matches `"cores:64"`), so a
|
||||
router can query `hosts_with_capability("cores")` without knowing the
|
||||
value. Returns names in `known_hosts` order (stable, caller picks).
|
||||
"""
|
||||
key = tag + ":"
|
||||
out: list[str] = []
|
||||
for h in self.known_hosts:
|
||||
if any(c == tag or c.startswith(key) for c in h.capabilities):
|
||||
out.append(h.name)
|
||||
return out
|
||||
|
||||
|
||||
def default_config_path() -> Path:
|
||||
"""`~/.config/claire/claire.toml` — XDG-compliant."""
|
||||
|
|
@ -363,6 +421,10 @@ def _serialize(cfg: ClaireConfig) -> str:
|
|||
)
|
||||
lines.append(f'auto_continue = "{ag.auto_continue}"')
|
||||
lines.append(f"auto_continue_max = {ag.auto_continue_max}")
|
||||
lines.append(f'auto_resume = "{ag.auto_resume}"')
|
||||
lines.append(f"auto_resume_max = {ag.auto_resume_max}")
|
||||
lines.append(f"auto_resume_max_per_tick = {ag.auto_resume_max_per_tick}")
|
||||
lines.append(f"auto_resume_window_s = {ag.auto_resume_window_s}")
|
||||
lines.append(
|
||||
f"orchestrator_enable = {str(ag.orchestrator_enable).lower()}"
|
||||
)
|
||||
|
|
@ -387,6 +449,8 @@ def _serialize(cfg: ClaireConfig) -> str:
|
|||
lines.append(f'description = "{h.description}"')
|
||||
if h.machine_id is not None:
|
||||
lines.append(f'machine_id = "{h.machine_id}"')
|
||||
if h.capabilities:
|
||||
lines.append(f"capabilities = {_fmt_str_list(list(h.capabilities))}")
|
||||
for peer in cfg.peers:
|
||||
lines.append("")
|
||||
lines.append("[[peers]]")
|
||||
|
|
|
|||
|
|
@ -436,6 +436,9 @@ def open_db(path: Path | str | None = None) -> sqlite3.Connection:
|
|||
conn = sqlite3.connect(str(path), isolation_level=None, check_same_thread=False)
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA foreign_keys = ON")
|
||||
# Wait up to 5s for a held write lock instead of failing immediately with
|
||||
# "database is locked" — peer-sync ingest and API writes contend otherwise.
|
||||
conn.execute("PRAGMA busy_timeout = 5000")
|
||||
# WAL doesn't apply to :memory: and isn't strictly needed; harmless to try.
|
||||
if str(path) != ":memory:":
|
||||
conn.execute("PRAGMA journal_mode = WAL")
|
||||
|
|
|
|||
|
|
@ -73,7 +73,20 @@ Never redo finished work or silently drop a pending review.
|
|||
## Per-turn workflow
|
||||
|
||||
1. Read the user's request (everything after `[turn:<id>]`).
|
||||
2. Use the Claire MCP tools as needed:
|
||||
2. **Route the turn (location transparency).** It should not matter which
|
||||
`[<host>] claire` the user is talking to. When the request concerns work
|
||||
that might live on another host, call `resolve_host(...)` with the signals
|
||||
you extract from the turn:
|
||||
- `explicit_host` — the user named a host ("on apricot", "@black");
|
||||
- `capability_needs` — a host-specific resource the work needs (e.g.
|
||||
`["media"]`, `["gpu"]`, `["mount:…"]`);
|
||||
- `session_uuid` / `task_id` — an existing session/task it is about.
|
||||
It returns `{host, reason, candidates}`. If `host` is NOT this node, the
|
||||
work belongs on that host's Claire — say so plainly in your reply (e.g.
|
||||
"that runs on `[apricot] claire`") and surface the hand-off as the action.
|
||||
If it returns this node, or there is no host-specific signal, handle it
|
||||
here as normal. Skip routing for trivially-local requests (status, chat).
|
||||
3. Use the Claire MCP tools as needed:
|
||||
- **Read**: `list_recent_events`, `search_chat_messages`, `get_session`,
|
||||
`list_fleet` (snapshot of every active agent's current task/state).
|
||||
- **Act**: `create_project`, `add_task`, `create_assignment`,
|
||||
|
|
@ -81,12 +94,12 @@ Never redo finished work or silently drop a pending review.
|
|||
- **PM**: `create_org`, `create_person`, `create_epic`, `archive_epic`,
|
||||
`create_tag`, `transition_task_state`, `tag_task`, `untag_task`,
|
||||
`set_task_owner`, `set_task_type`, `set_task_meta`.
|
||||
- **Plan**: `summarize_project`, `suggest_assignments`.
|
||||
- **Plan**: `summarize_project`, `suggest_assignments`, `resolve_host`.
|
||||
- **Reference**: `status`, `list_tasks`, `help`.
|
||||
3. **Always call `report_status`** once per turn with your own
|
||||
4. **Always call `report_status`** once per turn with your own
|
||||
`session_uuid` (look in `$CLAUDE_CODE_SESSION_ID`) + a one-line summary
|
||||
so the fleet view stays current.
|
||||
4. When done, call `submit_chat_reply(body=<your reply>, turn_id=<id>)`.
|
||||
5. When done, call `submit_chat_reply(body=<your reply>, turn_id=<id>)`.
|
||||
This is REQUIRED — without it, the user sees nothing.
|
||||
|
||||
Exactly one `submit_chat_reply` call per user turn.
|
||||
|
|
|
|||
|
|
@ -541,6 +541,36 @@ def build_server() -> FastMCP:
|
|||
lambda c, _g: tools.fleet_load(c),
|
||||
)
|
||||
|
||||
@mcp.tool()
|
||||
async def resolve_host(
|
||||
explicit_host: str | None = None,
|
||||
capability_needs: list[str] | None = None,
|
||||
session_uuid: str | None = None,
|
||||
task_id: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Decide which host a turn or piece of work belongs on. Deterministic
|
||||
cascade: explicit host named > capability required (gpu/media/mount/svc)
|
||||
> sticky (the subject's session/task already runs on a host) >
|
||||
default-local. Extract these signals from the user's turn and pass what
|
||||
applies; act on the returned {host, reason}. For location-transparent
|
||||
Claire: if `host` != this node, the work belongs on that host's Claire."""
|
||||
return await _call_tool(
|
||||
"resolve_host",
|
||||
{
|
||||
"explicit_host": explicit_host,
|
||||
"capability_needs": capability_needs,
|
||||
"session_uuid": session_uuid,
|
||||
"task_id": task_id,
|
||||
},
|
||||
lambda c, _g: tools.resolve_host(
|
||||
c,
|
||||
explicit_host=explicit_host,
|
||||
capability_needs=capability_needs,
|
||||
session_uuid=session_uuid,
|
||||
task_id=task_id,
|
||||
),
|
||||
)
|
||||
|
||||
@mcp.tool()
|
||||
async def dispatch_task(task_id: str, host: str, cwd: str) -> dict[str, Any]:
|
||||
"""Spawn a Claude session to work a task on `host` at `cwd`.
|
||||
|
|
|
|||
|
|
@ -249,6 +249,7 @@ def help_text() -> dict[str, Any]:
|
|||
{"name": "get_session", "summary": "Details for one session by uuid."},
|
||||
{"name": "summarize_project", "summary": "Project-level summary (M5)."},
|
||||
{"name": "suggest_assignments", "summary": "Suggest unassigned-task → session pairs (M5)."},
|
||||
{"name": "resolve_host", "summary": "Decide which host a turn/work belongs on (explicit>capability>sticky>local)."},
|
||||
{"name": "send_to_session", "summary": "Send text to one specific session."},
|
||||
{"name": "submit_chat_reply", "summary": "Signal the user-turn is done."},
|
||||
# PM expansion
|
||||
|
|
@ -507,6 +508,47 @@ def fleet_load(conn: sqlite3.Connection) -> dict[str, Any]:
|
|||
return service.fleet_load(conn)
|
||||
|
||||
|
||||
def resolve_host(
|
||||
conn: sqlite3.Connection,
|
||||
*,
|
||||
explicit_host: str | None = None,
|
||||
capability_needs: list[str] | None = None,
|
||||
session_uuid: str | None = None,
|
||||
task_id: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Decide which host a turn/work belongs on — deterministic cascade
|
||||
(explicit > capability > sticky > default-local). The orchestrator (LLM)
|
||||
extracts the signals from the user's turn and calls this; the returned
|
||||
{host, reason, detail, candidates} is the routing decision to act on.
|
||||
Live-session counts feed the capability tiebreak. For location-transparent
|
||||
Claire: if `host` != this node, the work belongs on another host's Claire.
|
||||
"""
|
||||
from .. import routing
|
||||
from ..config import load_or_init
|
||||
|
||||
cfg = load_or_init()
|
||||
load = {
|
||||
h["host"]: h["live_sessions"]
|
||||
for h in service.fleet_load(conn).get("hosts", [])
|
||||
}
|
||||
d = routing.route(
|
||||
conn,
|
||||
cfg,
|
||||
receiving_host=cfg.this_host_label(),
|
||||
explicit_host=explicit_host,
|
||||
capability_needs=capability_needs,
|
||||
session_uuid=session_uuid,
|
||||
task_id=task_id,
|
||||
host_load=load,
|
||||
)
|
||||
return {
|
||||
"host": d.host,
|
||||
"reason": d.reason,
|
||||
"detail": d.detail,
|
||||
"candidates": list(d.candidates),
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PM expansion (migration 0003_pm). Each tool mirrors a service mutator.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -284,6 +284,39 @@ class Rclaude:
|
|||
raise RclaudeError("rclaude spawn returned no tmux session name")
|
||||
return name
|
||||
|
||||
def resume(
|
||||
self,
|
||||
*,
|
||||
host: str,
|
||||
cwd: str,
|
||||
resume_uuid: str,
|
||||
mcp_config: str | None = None,
|
||||
name: str | None = None,
|
||||
) -> str:
|
||||
"""Resurrect a DEAD session by uuid: spawn `claude --resume <uuid>` in a
|
||||
fresh detached tmux at `cwd` on `host`.
|
||||
|
||||
Mirrors `spawn()` but injects `RCLAUDE_RESUME_ID` — which rclaude honors
|
||||
to resume the given session instead of starting fresh (the inverse of
|
||||
the leak guard `spawn()` documents) — and deliberately does NOT strip the
|
||||
resume env vars. The new pane carries `claude --resume <uuid>` in its
|
||||
command, so `list_tmux` reports `resumed_uuid == <uuid>` and the pull
|
||||
loop re-discovers it. Returns the new tmux session name.
|
||||
"""
|
||||
env: dict[str, str] = {
|
||||
"RCLAUDE_DETACHED": "1",
|
||||
"RCLAUDE_RESUME_ID": resume_uuid,
|
||||
}
|
||||
if mcp_config is not None:
|
||||
env["RCLAUDE_MCP_CONFIG"] = mcp_config
|
||||
if name is not None:
|
||||
env["RCLAUDE_SESSION_NAME"] = name
|
||||
raw = self._run([host, cwd], env_extra=env) # NB: keep the resume vars
|
||||
out = raw.strip().splitlines()[-1] if raw.strip() else ""
|
||||
if not out:
|
||||
raise RclaudeError("rclaude resume returned no tmux session name")
|
||||
return out
|
||||
|
||||
def send(
|
||||
self,
|
||||
*,
|
||||
|
|
|
|||
137
src/claire/routing.py
Normal file
137
src/claire/routing.py
Normal file
|
|
@ -0,0 +1,137 @@
|
|||
"""Deterministic host routing for location-transparent Claire.
|
||||
|
||||
When the user talks to ANY host's Claire, a turn about work that lives
|
||||
elsewhere should run on THAT host. The receiving Claire (an LLM) CLASSIFIES a
|
||||
turn into structured signals — an explicit host if named, the capability the
|
||||
work needs, the subject (session/task) it references — and this module turns
|
||||
those signals + fleet state into a host decision via a fixed priority cascade.
|
||||
|
||||
Split of responsibility: the fuzzy natural-language step (turn → signals) lives
|
||||
in the orchestrator prompt; the DECISION here is pure + deterministic so it's
|
||||
testable and auditable. The cross-host *execution* (forwarding the turn + proxying
|
||||
the reply) is a separate layer — this only answers "which host?".
|
||||
|
||||
Cascade (first match wins):
|
||||
1. explicit — the user named a host
|
||||
2. capability — work needs a host-specific resource (gpu/media/mount/svc/…)
|
||||
3. sticky — the subject already has live work on a host (keep a thread coherent on one node)
|
||||
4. default — run on the receiving node (most turns are host-agnostic)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
from dataclasses import dataclass
|
||||
|
||||
from .config import ClaireConfig
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RouteDecision:
|
||||
"""Where a turn should run + WHY (the reason/detail are surfaced for
|
||||
transparency — the user/operator can always see how routing decided)."""
|
||||
|
||||
host: str # canonical host label to run on
|
||||
reason: str # machine-readable: explicit | capability | sticky | default-local | unknown-host
|
||||
detail: str # human one-liner
|
||||
candidates: tuple[str, ...] = () # hosts considered (capability matches etc.)
|
||||
|
||||
|
||||
def _least_loaded(hosts: list[str], load: dict[str, int] | None) -> str:
|
||||
"""Pick the least-loaded host (stable: known order when no load info)."""
|
||||
if not load:
|
||||
return hosts[0]
|
||||
# min is stable on ties → preserves the input (known_hosts) order.
|
||||
return min(hosts, key=lambda h: load.get(h, 0))
|
||||
|
||||
|
||||
def _host_of_session(conn: sqlite3.Connection, uuid: str) -> str | None:
|
||||
row = conn.execute(
|
||||
"SELECT host FROM sessions WHERE uuid = ?", (str(uuid),)
|
||||
).fetchone()
|
||||
return row["host"] if row and row["host"] else None
|
||||
|
||||
|
||||
def _host_of_task(conn: sqlite3.Connection, task_id: str) -> str | None:
|
||||
"""Host of a task's current worker — newest active assignment → session."""
|
||||
row = conn.execute(
|
||||
"""
|
||||
SELECT s.host
|
||||
FROM assignments a
|
||||
JOIN sessions s ON s.uuid = a.session_uuid
|
||||
WHERE a.task_id = ? AND a.active = 1
|
||||
ORDER BY a.created_hlc DESC
|
||||
LIMIT 1
|
||||
""",
|
||||
(str(task_id),),
|
||||
).fetchone()
|
||||
return row["host"] if row and row["host"] else None
|
||||
|
||||
|
||||
def route(
|
||||
conn: sqlite3.Connection,
|
||||
cfg: ClaireConfig,
|
||||
*,
|
||||
receiving_host: str,
|
||||
explicit_host: str | None = None,
|
||||
capability_needs: list[str] | None = None,
|
||||
session_uuid: str | None = None,
|
||||
task_id: str | None = None,
|
||||
host_load: dict[str, int] | None = None,
|
||||
) -> RouteDecision:
|
||||
"""Resolve which host a classified turn should run on.
|
||||
|
||||
`receiving_host` is the node the user is talking to (the default). The other
|
||||
args are the classifier's output: `explicit_host` (named), `capability_needs`
|
||||
(tags the work requires — host must satisfy ALL), `session_uuid`/`task_id`
|
||||
(the subject, for stickiness). `host_load` (host → live-session count) is an
|
||||
optional tiebreaker among equally-capable hosts.
|
||||
"""
|
||||
recv = cfg.resolve_host_label(receiving_host)
|
||||
known = {h.name for h in cfg.known_hosts} | {recv}
|
||||
|
||||
# 1. Explicit — the user named a host.
|
||||
if explicit_host:
|
||||
h = cfg.resolve_host_label(explicit_host)
|
||||
if h in known:
|
||||
return RouteDecision(h, "explicit", f"user named host {h!r}", (h,))
|
||||
# Named something we don't know — don't silently send it nowhere.
|
||||
return RouteDecision(
|
||||
recv, "unknown-host",
|
||||
f"host {explicit_host!r} not in known_hosts — running local", (recv,),
|
||||
)
|
||||
|
||||
# 2. Capability — the work needs a host-specific resource. Host must satisfy
|
||||
# ALL declared needs (intersection). No match → fall through (best-effort).
|
||||
needs = [n for n in (capability_needs or []) if n]
|
||||
if needs:
|
||||
cand: set[str] | None = None
|
||||
for n in needs:
|
||||
hs = set(cfg.hosts_with_capability(n))
|
||||
cand = hs if cand is None else (cand & hs)
|
||||
candidates = sorted(cand or set())
|
||||
if candidates:
|
||||
pick = _least_loaded(candidates, host_load)
|
||||
return RouteDecision(
|
||||
pick, "capability",
|
||||
f"needs {'+'.join(needs)} → {pick}", tuple(candidates),
|
||||
)
|
||||
|
||||
# 3. Sticky — keep a thread where its subject's work already lives. Session
|
||||
# reference wins over task (more specific); both resolve to a host.
|
||||
sticky: str | None = None
|
||||
if session_uuid:
|
||||
sticky = _host_of_session(conn, session_uuid)
|
||||
if sticky is None and task_id:
|
||||
sticky = _host_of_task(conn, task_id)
|
||||
if sticky:
|
||||
sticky = cfg.resolve_host_label(sticky)
|
||||
return RouteDecision(
|
||||
sticky, "sticky",
|
||||
"continuing where the subject's work already runs", (sticky,),
|
||||
)
|
||||
|
||||
# 4. Default — no host-specific signal; the receiving node handles it.
|
||||
return RouteDecision(
|
||||
recv, "default-local", "no host-specific signal — running local", (recv,)
|
||||
)
|
||||
|
|
@ -170,23 +170,59 @@ def create_app(
|
|||
from ..config import load_or_init
|
||||
from ..db import migrate, open_db
|
||||
from ..hlc import HLCGenerator
|
||||
from .rounds import post_rounds_turn
|
||||
from .rounds import (
|
||||
fleet_fingerprint,
|
||||
post_rounds_turn,
|
||||
should_skip_round,
|
||||
)
|
||||
|
||||
cfg = load_or_init(config_path)
|
||||
interval = cfg.orchestrator.rounds_interval_s
|
||||
interval = load_or_init(config_path).orchestrator.rounds_interval_s
|
||||
if interval <= 0:
|
||||
return # disabled
|
||||
logger.info("orchestrator rounds loop enabled (every %ds)", interval)
|
||||
last_fp: str | None = None
|
||||
skipped = 0
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(interval)
|
||||
def _post() -> None:
|
||||
|
||||
def _tick(prev_fp: str | None, skips: int) -> tuple[bool, str]:
|
||||
# Reload cfg each tick so a newly-discovered orchestrator
|
||||
# session_uuid (excluded from the fingerprint) and any
|
||||
# gate retune take effect without a server restart.
|
||||
cfg = load_or_init(config_path)
|
||||
conn = open_db(db_path)
|
||||
migrate(conn)
|
||||
gen = HLCGenerator(cfg.machine_id)
|
||||
post_rounds_turn(conn, gen, cfg)
|
||||
conn.close()
|
||||
await asyncio.to_thread(_post)
|
||||
try:
|
||||
migrate(conn)
|
||||
fp = fleet_fingerprint(
|
||||
conn, cfg.orchestrator.session_uuid
|
||||
)
|
||||
# Skip (don't wake the model) only while the worker
|
||||
# fleet is unchanged AND under the staleness floor.
|
||||
if should_skip_round(
|
||||
prev_fp, fp, skips,
|
||||
enabled=cfg.orchestrator.rounds_skip_unchanged,
|
||||
heartbeat_every=cfg.orchestrator.rounds_heartbeat_every,
|
||||
):
|
||||
return (False, fp)
|
||||
gen = HLCGenerator(cfg.machine_id)
|
||||
post_rounds_turn(conn, gen, cfg)
|
||||
return (True, fp)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
posted, last_fp = await asyncio.to_thread(
|
||||
_tick, last_fp, skipped
|
||||
)
|
||||
if posted:
|
||||
skipped = 0
|
||||
else:
|
||||
skipped += 1
|
||||
logger.debug(
|
||||
"rounds tick skipped (fleet unchanged, %d/%d)",
|
||||
skipped,
|
||||
load_or_init(config_path).orchestrator.rounds_heartbeat_every,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
except Exception as exc: # noqa: BLE001
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ prompt lives here once rather than being duplicated.
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import sqlite3
|
||||
|
||||
from ..config import ClaireConfig
|
||||
|
|
@ -15,6 +16,76 @@ from ..domain import ChatScope
|
|||
from ..hlc import HLCGenerator
|
||||
|
||||
|
||||
def fleet_fingerprint(
|
||||
conn: sqlite3.Connection, orchestrator_uuid: str | None
|
||||
) -> str:
|
||||
"""A cheap, deterministic digest of the *worker* fleet state a round reacts to.
|
||||
|
||||
Used by the rounds timer to skip waking the (expensive) orchestrator model
|
||||
when nothing has changed since the last round. Covers exactly the inputs a
|
||||
round acts on:
|
||||
• live worker sessions — (uuid, triage_status, pushed state/summary/source,
|
||||
task_id) — i.e. the `list_fleet` rows;
|
||||
• open (non-`done`) task states — (id, status).
|
||||
|
||||
Deliberately EXCLUDES the orchestrator's own session and its agent_status,
|
||||
and the orchestrator chat scope entirely. A round mutates exactly those
|
||||
(it calls `report_status` for itself and writes its prompt+reply into the
|
||||
orchestrator scope every tick); folding them in would make the fingerprint
|
||||
change on every round and the skip gate would never engage. A user turn
|
||||
wakes the orchestrator on its own `[turn:…]` path, independent of this loop,
|
||||
so user chat need not be fingerprinted either.
|
||||
"""
|
||||
orch = orchestrator_uuid or ""
|
||||
session_rows = conn.execute(
|
||||
"""
|
||||
SELECT s.uuid, s.last_triage_status,
|
||||
ag.state, ag.summary, ag.source, ag.task_id
|
||||
FROM sessions s
|
||||
LEFT JOIN agent_status ag ON ag.session_uuid = s.uuid
|
||||
WHERE s.liveness = 'alive' AND s.uuid != ?
|
||||
ORDER BY s.uuid
|
||||
""",
|
||||
(orch,),
|
||||
).fetchall()
|
||||
task_rows = conn.execute(
|
||||
"""
|
||||
SELECT id, status FROM tasks
|
||||
WHERE status != 'done'
|
||||
ORDER BY id
|
||||
"""
|
||||
).fetchall()
|
||||
|
||||
h = hashlib.sha256()
|
||||
for r in session_rows:
|
||||
h.update(repr(tuple(r)).encode())
|
||||
h.update(b"\x00tasks\x00")
|
||||
for r in task_rows:
|
||||
h.update(repr(tuple(r)).encode())
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def should_skip_round(
|
||||
prev_fp: str | None,
|
||||
fp: str,
|
||||
consecutive_skips: int,
|
||||
*,
|
||||
enabled: bool,
|
||||
heartbeat_every: int,
|
||||
) -> bool:
|
||||
"""Decide whether the rounds timer should SKIP waking the orchestrator.
|
||||
|
||||
Skip only when the gate is enabled, the worker fleet fingerprint is
|
||||
unchanged since the last *posted* round, and we are still under the
|
||||
max-staleness floor (`heartbeat_every` consecutive skips force a round).
|
||||
"""
|
||||
if not enabled:
|
||||
return False
|
||||
if fp != prev_fp:
|
||||
return False
|
||||
return consecutive_skips + 1 < heartbeat_every
|
||||
|
||||
|
||||
def build_rounds_prompt(cfg: ClaireConfig) -> str:
|
||||
"""The structured DO-ROUNDS prompt posted to the orchestrator.
|
||||
|
||||
|
|
|
|||
|
|
@ -223,3 +223,48 @@ def test_serialize_round_trips_per_host_caps(tmp_path: Path) -> None:
|
|||
reloaded = load_or_init(cfg_path)
|
||||
assert reloaded.limits.per_host_max == 5
|
||||
assert reloaded.limits.per_host == {"apricot": 10}
|
||||
|
||||
|
||||
def test_host_capabilities_roundtrip_and_resolver() -> None:
|
||||
"""known_hosts capability tags serialize/reload and resolve by exact +
|
||||
key-prefix match, with alias resolution."""
|
||||
from claire.config import ClaireConfig, HostEntry, _serialize
|
||||
|
||||
cfg = ClaireConfig(
|
||||
machine_id="m",
|
||||
this_host="plum",
|
||||
known_hosts=[
|
||||
HostEntry(name="black", capabilities=["media", "transmission"]),
|
||||
HostEntry(name="apricot", capabilities=["cores:64", "gpu"]),
|
||||
HostEntry(name="plum", aliases=["local"]),
|
||||
],
|
||||
)
|
||||
|
||||
# Round-trip through the hand-rolled serializer (the field is easy to drop).
|
||||
rt = ClaireConfig.model_validate(tomllib.loads(_serialize(cfg)))
|
||||
black = next(h for h in rt.known_hosts if h.name == "black")
|
||||
assert black.capabilities == ["media", "transmission"]
|
||||
|
||||
# Exact-tag match.
|
||||
assert cfg.hosts_with_capability("media") == ["black"]
|
||||
assert cfg.hosts_with_capability("gpu") == ["apricot"]
|
||||
# key:value tag matched by key alone.
|
||||
assert cfg.hosts_with_capability("cores") == ["apricot"]
|
||||
# No host satisfies an unknown capability.
|
||||
assert cfg.hosts_with_capability("fpga") == []
|
||||
|
||||
# capabilities_for resolves aliases (local -> plum) and unknown -> [].
|
||||
assert cfg.capabilities_for("local") == []
|
||||
assert cfg.capabilities_for("apricot") == ["cores:64", "gpu"]
|
||||
assert cfg.capabilities_for("nope") == []
|
||||
|
||||
|
||||
def test_host_without_capabilities_emits_no_capabilities_line() -> None:
|
||||
"""A general-purpose host (no tags) stays minimal in the toml."""
|
||||
from claire.config import ClaireConfig, HostEntry, _serialize
|
||||
|
||||
cfg = ClaireConfig(
|
||||
machine_id="m", this_host="plum",
|
||||
known_hosts=[HostEntry(name="plum", aliases=["local"])],
|
||||
)
|
||||
assert "capabilities" not in _serialize(cfg)
|
||||
|
|
|
|||
|
|
@ -84,3 +84,89 @@ def test_rounds_tick_schedules_turn(tmp_path: Path, monkeypatch) -> None:
|
|||
assert r.json()["status"] == "scheduled"
|
||||
# TestClient runs background tasks before returning, so the turn posted.
|
||||
assert len(calls) == 1
|
||||
|
||||
|
||||
# --- rounds skip gate (fingerprint) ----------------------------------------
|
||||
|
||||
import uuid as _uuid # noqa: E402
|
||||
|
||||
from claire.db import migrate # noqa: E402
|
||||
from claire.hlc import HLCGenerator # noqa: E402
|
||||
from claire.orchestrator.tools import report_status # noqa: E402
|
||||
from claire.web.rounds import fleet_fingerprint, should_skip_round # noqa: E402
|
||||
|
||||
_MACHINE = "00000000-0000-0000-0000-0000000000aa"
|
||||
|
||||
|
||||
def _seed_db(tmp_path: Path):
|
||||
conn = open_db(tmp_path / "claire.db")
|
||||
migrate(conn)
|
||||
gen = HLCGenerator(_MACHINE)
|
||||
hlc = lambda: str(gen.tick()) # noqa: E731
|
||||
conn.execute(
|
||||
"INSERT INTO projects(id,name,created_hlc,updated_hlc) VALUES(?,?,?,?)",
|
||||
("p1", "proj", hlc(), hlc()),
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO tasks(id,project_id,title,status,priority,created_hlc,"
|
||||
"updated_hlc) VALUES(?,?,?,?,?,?,?)",
|
||||
("t1", "p1", "work", "in_progress", 2, hlc(), hlc()),
|
||||
)
|
||||
worker = str(_uuid.uuid4())
|
||||
orch = str(_uuid.uuid4())
|
||||
for sid, host in ((worker, "apricot"), (orch, "local")):
|
||||
conn.execute(
|
||||
"INSERT INTO sessions(uuid,host,cwd,tmux_name,updated_hlc,liveness)"
|
||||
" VALUES(?,?,?,?,?,?)",
|
||||
(sid, host, f"/cwd/{host}", host, hlc(), "alive"),
|
||||
)
|
||||
report_status(conn, gen, session_uuid=worker, summary="worker booting")
|
||||
conn.commit()
|
||||
return conn, gen, worker, orch
|
||||
|
||||
|
||||
def test_fingerprint_ignores_orchestrators_own_round_side_effects(
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
"""A round mutates the orchestrator's OWN session/status every tick. If the
|
||||
fingerprint folded that in, the skip gate would never engage. It must not."""
|
||||
conn, gen, _worker, orch = _seed_db(tmp_path)
|
||||
fp1 = fleet_fingerprint(conn, orch)
|
||||
|
||||
# Simulate the round's own side effects: the orchestrator reports its own
|
||||
# status (as it does every single round, with a fresh summary each time).
|
||||
report_status(conn, gen, session_uuid=orch, summary="Rounds tick (1st)")
|
||||
report_status(conn, gen, session_uuid=orch, summary="Rounds tick (2nd)…")
|
||||
fp2 = fleet_fingerprint(conn, orch)
|
||||
|
||||
assert fp2 == fp1, "orchestrator self-status must not change the fingerprint"
|
||||
# And the gate would therefore skip the next tick (under the floor).
|
||||
assert should_skip_round(fp1, fp2, 0, enabled=True, heartbeat_every=6)
|
||||
|
||||
|
||||
def test_fingerprint_changes_on_real_worker_movement(tmp_path: Path) -> None:
|
||||
conn, gen, worker, orch = _seed_db(tmp_path)
|
||||
fp1 = fleet_fingerprint(conn, orch)
|
||||
|
||||
# A worker pushing new status is real fleet movement → fingerprint changes.
|
||||
report_status(conn, gen, session_uuid=worker, summary="now compiling")
|
||||
fp_worker = fleet_fingerprint(conn, orch)
|
||||
assert fp_worker != fp1
|
||||
assert not should_skip_round(fp1, fp_worker, 0, enabled=True, heartbeat_every=6)
|
||||
|
||||
# A task leaving the open set (→ done) is also real movement.
|
||||
conn.execute("UPDATE tasks SET status='done' WHERE id='t1'")
|
||||
conn.commit()
|
||||
assert fleet_fingerprint(conn, orch) != fp_worker
|
||||
|
||||
|
||||
def test_should_skip_round_floor_and_toggle() -> None:
|
||||
# Unchanged + enabled + under floor → skip.
|
||||
assert should_skip_round("a", "a", 0, enabled=True, heartbeat_every=3)
|
||||
assert should_skip_round("a", "a", 1, enabled=True, heartbeat_every=3)
|
||||
# Hitting the floor forces a round (no skip), so the HUD never goes silent.
|
||||
assert not should_skip_round("a", "a", 2, enabled=True, heartbeat_every=3)
|
||||
# Any change forces a round.
|
||||
assert not should_skip_round("a", "b", 0, enabled=True, heartbeat_every=3)
|
||||
# Gate off → always post.
|
||||
assert not should_skip_round("a", "a", 0, enabled=False, heartbeat_every=99)
|
||||
|
|
|
|||
143
tests/test_routing.py
Normal file
143
tests/test_routing.py
Normal file
|
|
@ -0,0 +1,143 @@
|
|||
"""Routing cascade: explicit > capability > sticky > default-local."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from uuid import UUID
|
||||
|
||||
from claire.config import ClaireConfig, HostEntry
|
||||
from claire.routing import route
|
||||
from claire.web import service
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cfg() -> ClaireConfig:
|
||||
return ClaireConfig(
|
||||
machine_id="m",
|
||||
this_host="plum",
|
||||
known_hosts=[
|
||||
HostEntry(name="plum", aliases=["local"]),
|
||||
HostEntry(name="apricot", capabilities=["cores:64", "gpu"]),
|
||||
HostEntry(name="black", capabilities=["media", "transmission"]),
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
def _add_session(conn, uuid: str, host: str) -> None:
|
||||
conn.execute(
|
||||
"INSERT INTO sessions (uuid, host, updated_hlc) VALUES (?, ?, ?)",
|
||||
(uuid, host, "1"),
|
||||
)
|
||||
|
||||
|
||||
def _task_with_worker(conn, gen, *, project: str, host: str, session_uuid: str):
|
||||
"""Create a project+task and an active assignment to a session on `host`.
|
||||
Returns the task id (str). Uses service so FK constraints are satisfied."""
|
||||
service.create_project(conn, gen, name=project)
|
||||
task = service.add_task(conn, gen, project=project, title="t")
|
||||
_add_session(conn, session_uuid, host)
|
||||
service.create_assignment(conn, gen, task_id=task.id, session_uuid=UUID(session_uuid))
|
||||
return str(task.id)
|
||||
|
||||
|
||||
# 1. explicit -----------------------------------------------------------------
|
||||
|
||||
def test_explicit_host_wins(conn, cfg) -> None:
|
||||
d = route(conn, cfg, receiving_host="plum", explicit_host="apricot")
|
||||
assert (d.host, d.reason) == ("apricot", "explicit")
|
||||
|
||||
|
||||
def test_explicit_alias_resolves(conn, cfg) -> None:
|
||||
# "local" → plum even when received on plum
|
||||
d = route(conn, cfg, receiving_host="plum", explicit_host="local")
|
||||
assert (d.host, d.reason) == ("plum", "explicit")
|
||||
|
||||
|
||||
def test_explicit_unknown_host_falls_back_local_not_silent(conn, cfg) -> None:
|
||||
d = route(conn, cfg, receiving_host="plum", explicit_host="mars")
|
||||
assert d.host == "plum"
|
||||
assert d.reason == "unknown-host"
|
||||
|
||||
|
||||
# 2. capability ---------------------------------------------------------------
|
||||
|
||||
def test_capability_single(conn, cfg) -> None:
|
||||
d = route(conn, cfg, receiving_host="plum", capability_needs=["media"])
|
||||
assert (d.host, d.reason) == ("black", "capability")
|
||||
|
||||
|
||||
def test_capability_key_prefix(conn, cfg) -> None:
|
||||
# asking "cores" matches "cores:64"
|
||||
d = route(conn, cfg, receiving_host="plum", capability_needs=["cores"])
|
||||
assert d.host == "apricot"
|
||||
|
||||
|
||||
def test_capability_intersection_of_needs(conn, cfg) -> None:
|
||||
# gpu AND cores → only apricot has both; media-only black excluded
|
||||
d = route(conn, cfg, receiving_host="plum", capability_needs=["gpu", "cores"])
|
||||
assert d.host == "apricot"
|
||||
|
||||
|
||||
def test_capability_no_match_falls_through_to_default(conn, cfg) -> None:
|
||||
d = route(conn, cfg, receiving_host="plum", capability_needs=["fpga"])
|
||||
assert (d.host, d.reason) == ("plum", "default-local")
|
||||
|
||||
|
||||
def test_capability_tiebreak_least_loaded(conn) -> None:
|
||||
cfg = ClaireConfig(
|
||||
machine_id="m", this_host="plum",
|
||||
known_hosts=[
|
||||
HostEntry(name="a", capabilities=["media"]),
|
||||
HostEntry(name="b", capabilities=["media"]),
|
||||
],
|
||||
)
|
||||
d = route(None, cfg, receiving_host="plum", capability_needs=["media"],
|
||||
host_load={"a": 5, "b": 1})
|
||||
assert d.host == "b"
|
||||
assert set(d.candidates) == {"a", "b"}
|
||||
|
||||
|
||||
# 3. sticky -------------------------------------------------------------------
|
||||
|
||||
def test_sticky_by_session(conn, cfg) -> None:
|
||||
_add_session(conn, "11111111-1111-1111-1111-111111111111", "apricot")
|
||||
d = route(conn, cfg, receiving_host="plum",
|
||||
session_uuid="11111111-1111-1111-1111-111111111111")
|
||||
assert (d.host, d.reason) == ("apricot", "sticky")
|
||||
|
||||
|
||||
def test_sticky_by_task_via_active_assignment(conn, gen, cfg) -> None:
|
||||
task_id = _task_with_worker(
|
||||
conn, gen, project="p", host="black",
|
||||
session_uuid="22222222-2222-2222-2222-222222222222",
|
||||
)
|
||||
d = route(conn, cfg, receiving_host="plum", task_id=task_id)
|
||||
assert (d.host, d.reason) == ("black", "sticky")
|
||||
|
||||
|
||||
def test_session_reference_beats_task(conn, gen, cfg) -> None:
|
||||
_add_session(conn, "33333333-3333-3333-3333-333333333333", "apricot")
|
||||
task_id = _task_with_worker(
|
||||
conn, gen, project="p", host="black",
|
||||
session_uuid="44444444-4444-4444-4444-444444444444",
|
||||
)
|
||||
d = route(conn, cfg, receiving_host="plum",
|
||||
session_uuid="33333333-3333-3333-3333-333333333333", task_id=task_id)
|
||||
assert d.host == "apricot" # session wins
|
||||
|
||||
|
||||
# 4. default ------------------------------------------------------------------
|
||||
|
||||
def test_default_local_when_no_signal(conn, cfg) -> None:
|
||||
d = route(conn, cfg, receiving_host="apricot")
|
||||
assert (d.host, d.reason) == ("apricot", "default-local")
|
||||
|
||||
|
||||
def test_precedence_explicit_over_everything(conn, cfg) -> None:
|
||||
# a sticky session on black, capability=media (black), but explicit apricot wins
|
||||
_add_session(conn, "55555555-5555-5555-5555-555555555555", "black")
|
||||
d = route(conn, cfg, receiving_host="plum", explicit_host="apricot",
|
||||
capability_needs=["media"],
|
||||
session_uuid="55555555-5555-5555-5555-555555555555")
|
||||
assert (d.host, d.reason) == ("apricot", "explicit")
|
||||
|
|
@ -4,14 +4,19 @@ from __future__ import annotations
|
|||
|
||||
from uuid import uuid4
|
||||
|
||||
from claire.agent.supervisor import detect_wedged_and_orphaned, should_auto_continue
|
||||
from claire.agent.supervisor import (
|
||||
_is_orchestrator_cwd,
|
||||
detect_wedged_and_orphaned,
|
||||
select_resume_candidates,
|
||||
should_auto_continue,
|
||||
)
|
||||
from claire.rclaude import SessionRow, TmuxRow
|
||||
|
||||
NOW = 1_000_000.0
|
||||
|
||||
|
||||
def _sess(uuid, *, host="local", age_s=0):
|
||||
return SessionRow(host=host, uuid=uuid, snippet="", cwd="/x", mtime_epoch=int(NOW) - age_s)
|
||||
def _sess(uuid, *, host="local", age_s=0, cwd="/x"):
|
||||
return SessionRow(host=host, uuid=uuid, snippet="", cwd=cwd, mtime_epoch=int(NOW) - age_s)
|
||||
|
||||
|
||||
def _tmux(resumed_uuid, *, host="local", name="claude-x-1"):
|
||||
|
|
@ -71,3 +76,80 @@ def test_no_resumed_uuid_means_no_wedge_classification():
|
|||
)
|
||||
assert wedged == [] # not classified wedged without correlation
|
||||
assert [s.uuid for s in orphaned] == [u] # no live pane matched → orphaned
|
||||
|
||||
|
||||
# --- auto-resume selection (pure) ------------------------------------------
|
||||
|
||||
_W = 86_400 # resume recency window used in these tests
|
||||
|
||||
|
||||
def _resume(sessions, tmux_rows, *, attempts=None, max_attempts=3, max_per_tick=3):
|
||||
return select_resume_candidates(
|
||||
sessions, tmux_rows,
|
||||
window_s=_W, now=NOW, attempts=attempts or {},
|
||||
max_attempts=max_attempts, max_per_tick=max_per_tick,
|
||||
)
|
||||
|
||||
|
||||
def test_is_orchestrator_cwd():
|
||||
assert _is_orchestrator_cwd("/var/home/lilith/.local/share/claire/orchestrator")
|
||||
assert _is_orchestrator_cwd("/home/x/.local/share/claire/orchestrator/") # trailing slash
|
||||
assert not _is_orchestrator_cwd("/home/x/Code/@projects/@lilith/lilith-platform.live")
|
||||
assert not _is_orchestrator_cwd(None)
|
||||
|
||||
|
||||
def test_auto_resume_recency_window():
|
||||
fresh, old = uuid4(), uuid4()
|
||||
sessions = [_sess(fresh, age_s=10, cwd="/a"), _sess(old, age_s=_W + 5, cwd="/b")]
|
||||
to_resume, _ = _resume(sessions, []) # no live panes
|
||||
keys = {str(s.uuid) for s in to_resume}
|
||||
assert str(fresh) in keys # recently alive → candidate
|
||||
assert str(old) not in keys # beyond window → ignored (graveyard)
|
||||
|
||||
|
||||
def test_auto_resume_supersession_guard():
|
||||
"""A dead session whose cwd already has a LIVE session must NOT be resumed."""
|
||||
dead, live = uuid4(), uuid4()
|
||||
sessions = [_sess(dead, age_s=30, cwd="/shared"), _sess(live, age_s=5, cwd="/shared")]
|
||||
tmux = [_tmux(str(live))] # `live` has a pane; `dead` does not
|
||||
to_resume, skipped = _resume(sessions, tmux)
|
||||
assert [str(s.uuid) for s in to_resume] == []
|
||||
assert any(str(s.uuid) == str(dead) and r == "superseded-by-live-session-in-cwd"
|
||||
for s, r in skipped)
|
||||
|
||||
|
||||
def test_auto_resume_excludes_orchestrator_workspace():
|
||||
orch = uuid4()
|
||||
sessions = [_sess(orch, age_s=20, cwd="/home/x/.local/share/claire/orchestrator")]
|
||||
to_resume, skipped = _resume(sessions, [])
|
||||
assert to_resume == []
|
||||
assert any(r == "orchestrator-workspace" for _, r in skipped)
|
||||
|
||||
|
||||
def test_auto_resume_per_session_retry_cap():
|
||||
capped = uuid4()
|
||||
sessions = [_sess(capped, age_s=15, cwd="/a")]
|
||||
to_resume, skipped = _resume(sessions, [], attempts={str(capped): 3}, max_attempts=3)
|
||||
assert to_resume == []
|
||||
assert any(r == "retry-cap-reached" for _, r in skipped)
|
||||
|
||||
|
||||
def test_auto_resume_per_tick_global_ceiling():
|
||||
ids = [uuid4() for _ in range(5)]
|
||||
sessions = [_sess(u, age_s=10, cwd=f"/cwd/{i}") for i, u in enumerate(ids)]
|
||||
to_resume, skipped = _resume(sessions, [], max_per_tick=2)
|
||||
assert len(to_resume) == 2 # ceiling enforced
|
||||
assert sum(1 for _, r in skipped if r == "per-tick-ceiling-deferred") == 3
|
||||
|
||||
|
||||
def test_auto_resume_ignores_remote_and_live():
|
||||
local_dead, remote_dead, local_live = uuid4(), uuid4(), uuid4()
|
||||
sessions = [
|
||||
_sess(local_dead, age_s=10, cwd="/a"),
|
||||
_sess(remote_dead, host="apricot", age_s=10, cwd="/b"),
|
||||
_sess(local_live, age_s=10, cwd="/c"),
|
||||
]
|
||||
tmux = [_tmux(str(local_live))]
|
||||
to_resume, _ = _resume(sessions, tmux)
|
||||
keys = {str(s.uuid) for s in to_resume}
|
||||
assert keys == {str(local_dead)} # not remote (not ours), not live
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue