Compare commits

10 commits

Author SHA1 Message Date
Natalie
0319ea6bcc chore(@projects/@claire): 🔧 remove scheduled_tasks.lock file
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-06-10 02:19:17 -07:00
Natalie
aa2aa68d72 feat(@projects/@claire): wire location-transparent routing into per-turn workflow
Orchestrator CLAUDE.md now instructs Claire, per turn, to call resolve_host
with the signals it extracts (explicit_host / capability_needs /
session_uuid|task_id); when the decision is NOT this node, surface that the
work belongs on that host's Claire and hand it off. Decision layer of
location-transparent Claire (13764f2f) is now live in orchestrator behavior;
cross-host execution/proxy remains the follow-up. resolve_host added to the
Plan tools list.
(manual commit via ALLOW_COMMIT — autocommit LLM still down on claire)
2026-06-03 20:12:46 -07:00
Natalie
3d626a6d88 fix(@projects/@claire): deploy-agent auto-resolves a reachable ssh transport
deploy-agent used bare `ssh <host>` / `rsync <host>:` (→ <host>.lan), which
fails off-LAN or when the direct plum→host WG relay drops — blocking deploys
even though the host is reachable via the black jump host. Now it probes
<host> → <host>-wg → <host>-j and uses the first that answers for the ssh/rsync
legs (remote-run keeps its own routing), keeping the claire host LABEL as
<host>. Override with CLAIRE_SSH_ALIAS. Verified: apricot deployed via apricot-j
while .lan + -wg were timing out.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 02:08:03 -07:00
Natalie
042f5d6756 feat(@projects/@claire): resolve_host MCP tool — orchestrator routing decisions
Exposes routing.route() to the orchestrator as `resolve_host(explicit_host,
capability_needs, session_uuid, task_id)`. Claire (LLM) extracts the signals
from a turn and calls it; gets back {host, reason, detail, candidates} via the
deterministic cascade (explicit>capability>sticky>default-local), with
live-session counts feeding the capability tiebreak. The decision layer of
location-transparent Claire is now callable from the orchestrator.

Part of task 13764f2f. Smoke-verified: explicit→named, media→black (seeded
capability), no-signal→local. 371 tests green.
(manual commit via ALLOW_COMMIT — autocommit LLM still down on claire)
2026-06-03 01:55:31 -07:00
Natalie
16c030c6b3 feat(@projects/@claire): routing resolver for location-transparent Claire
route(signals, fleet) -> RouteDecision via a deterministic cascade:
explicit host > capability-pin (uses hosts_with_capability) > sticky
(subject's session/task already runs on a host, via sessions+assignments)
> default-local. Pure + auditable (reason+candidates surfaced); the LLM
classify step and cross-host execution are separate layers. 13 tests.

Part of task 13764f2f.
(manual commit via ALLOW_COMMIT — autocommit LLM still down on claire)
2026-06-03 01:41:27 -07:00
Natalie
24c6f24f43 feat(@projects/@claire): supervisor auto-resume of dead worker sessions
When a local worker pane dies (crash, OOM, host power-cycle), its JSONL persists
and is resumable. The agent supervisor now detects dead-but-recent local
sessions and `claude --resume <uuid>`s them, then sends a re-orient kick so the
session re-determines its OWN state (done vs pending vs finished) before acting
— mirrors the orchestrator's rehydrate-on-startup.

- rclaude.Rclaude.resume(): spawn `claude --resume <uuid>` via RCLAUDE_RESUME_ID
  (verified empirically against a real dead session on apricot).
- supervisor.select_resume_candidates(): pure, guarded selection — recency
  window, supersession (skip if a LIVE session shares the cwd), orchestrator-
  workspace exclusion, per-session retry cap, per-tick global ceiling (the
  first-wake token-storm guard). 7 unit tests.
- AgentConfig.auto_resume off|dry-run|on (default off) + max/per_tick/window.
  Ships off; roll out via dry-run, then on — same pattern as auto_continue.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 01:12:33 -07:00
Natalie
c5e63c5942 feat(@projects/@claire): host-capability registry for routing/dispatch
known_hosts gains a `capabilities` tag list (e.g. media, transmission,
cores:64, gpu) + ClaireConfig.hosts_with_capability(tag) (exact or key:
prefix match) and capabilities_for(host) (alias-resolved). Lets routing
(location-transparent Claire, task 13764f2f) and dispatch pick a host by
what it CAN do, not just load. Seeded black={media,transmission}.

Prereq task a5453fb8. 351 tests green.
(manual commit via ALLOW_COMMIT — autocommit LLM still timing out on claire)
2026-06-02 23:58:52 -07:00
Natalie
2d3777a7d0 fix(@projects/@claire): 🐛 claire-agent KillMode=process so redeploys don't kill workers
Default control-group KillMode meant restarting claire-agent.service (every
deploy) SIGTERMed the whole cgroup, silently killing the live worker claude/tmux
sessions the agent had spawned (next-tour-planner lost on 2026-06-02). KillMode=
process signals only `claire agent run`; panes survive and the fresh agent
re-discovers them via pull. Note: takes effect from the NEXT restart (a unit's
stop phase uses the KillMode it was started under).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-02 23:25:43 -07:00
Natalie
ce6948d6e9 feat(@projects/@claire): rounds skip-gate loop + fleet release tool
Wire the rounds timer to a pure-Python skip gate so claire-serve only wakes
the orchestrator model when worker fleet state changed (not every tick):
- web/rounds.py: fleet_fingerprint() over worker sessions (minus the
  orchestrator's own) + open tasks; should_skip_round() with heartbeat floor.
- web/app.py: _rounds_loop tracks last fingerprint + consecutive skips.
- excludes the orchestrator's own session/chat so a round's self-side-effects
  can't defeat the gate.
Add scripts/release-fleet.sh (test -> deploy apricot+black -> restart plum
services) and harden deploy-agent.sh's cosmetic status check against a SIGPIPE
false-abort. 3 new discriminating tests; 349 pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-02 22:58:41 -07:00
Natalie
8d82bb0abc feat(@projects/@claire): add rounds skip logic and heartbeat control
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-06-02 22:39:57 -07:00
18 changed files with 1082 additions and 31 deletions

View file

@ -1 +0,0 @@
{"sessionId":"c5a10f83-d897-4eb1-aa4f-a4264850dbf3","pid":15738,"procStart":"Mon Jun 1 06:29:14 2026","acquiredAt":1780437249785}

View file

@ -8,6 +8,13 @@ Wants=network-online.target
[Service]
Type=simple
# KillMode=process (NOT the default control-group): restarting this unit on a
# redeploy must NOT tear down the worker claude/tmux sessions the agent spawned
# into its cgroup. control-group SIGTERMs the whole tree, silently killing live
# dispatched workers on every deploy (the next-tour-planner was lost this way on
# 2026-06-02). With process, only `claire agent run` is signalled; the panes
# survive and the fresh agent re-discovers them via the pull/liveness pass.
KillMode=process
# rclaude (session-tools) lives in ~/.local/bin (+ pnpm global). systemd --user
# starts with a minimal PATH, so the supervisor couldn't find rclaude — add the
# user bin dirs. (Where rclaude is absent, the supervisor self-disables.)

View file

@ -32,31 +32,43 @@ PY
)"
say "plum peer URL = $PLUM_URL"
say "[$HOST] reachability + clock"
ssh -o ConnectTimeout=8 -o BatchMode=yes "$HOST" 'true' \
|| { echo "ERROR: cannot ssh $HOST" >&2; exit 1; }
ssh "$HOST" 'timedatectl show -p NTPSynchronized --value 2>/dev/null || echo unknown'
# Resolve a reachable SSH transport. The host LABEL stays $HOST (claire
# identity / sessions.host / per_host config), but the plum↔host route flaps:
# `.lan` is unreachable off-site and the direct WG relay can drop, so fall back
# to the `-wg` (direct WireGuard) then `-j` (black jump-host) aliases defined in
# ~/.ssh/config. Only the bare ssh/rsync legs need this — `remote-run` does its
# own routing. Override with CLAIRE_SSH_ALIAS=<alias> to force one.
say "[$HOST] resolve ssh transport + clock"
SSH=""
for cand in ${CLAIRE_SSH_ALIAS:-"$HOST" "${HOST}-wg" "${HOST}-j"}; do
if ssh -o ConnectTimeout=8 -o BatchMode=yes "$cand" 'true' 2>/dev/null; then
SSH="$cand"; break
fi
done
[ -n "$SSH" ] || { echo "ERROR: no reachable ssh transport for $HOST (tried ${CLAIRE_SSH_ALIAS:-$HOST $HOST-wg $HOST-j})" >&2; exit 1; }
[ "$SSH" = "$HOST" ] || say "[$HOST] direct route down — using ssh transport '$SSH'"
ssh "$SSH" 'timedatectl show -p NTPSynchronized --value 2>/dev/null || echo unknown'
say "[$HOST] rsync source"
ssh "$HOST" "mkdir -p ~/$REMOTE_DIR"
rsync -az --delete \
ssh "$SSH" "mkdir -p ~/$REMOTE_DIR"
rsync -az --delete -e ssh \
--exclude='.venv/' --exclude='.git/' --exclude='__pycache__/' \
--exclude='*.pyc' --exclude='.pytest_cache/' --exclude='.ruff_cache/' \
--exclude='claire.toml' \
--exclude='src/claire/web/app/node_modules/' \
--exclude='src/claire/web/app/dist/' \
"$SRC/" "${HOST}:${REMOTE_DIR}/"
"$SRC/" "${SSH}:${REMOTE_DIR}/"
say "[$HOST] install (uv if present, else python venv+pip) + init"
remote-run "$HOST" "export PATH=\"\$HOME/.local/bin:\$PATH\"; cd ~/$REMOTE_DIR && if command -v uv >/dev/null 2>&1; then { [ -d .venv ] || uv venv; }; uv pip install -e .; else { [ -d .venv ] || python3 -m venv .venv; }; .venv/bin/pip install -q -e .; fi && .venv/bin/claire init"
say "[$HOST] seed vault (BEFORE agent starts — it reads the HMAC secret from here)"
ssh "$HOST" 'mkdir -p ~/.vault && chmod 700 ~/.vault'
rsync -az --no-owner --no-group --chmod=D700,F600 \
ssh "$SSH" 'mkdir -p ~/.vault && chmod 700 ~/.vault'
rsync -az --no-owner --no-group --chmod=D700,F600 -e ssh \
--exclude='.vault-backups/' --exclude='*.prev.txt' \
"$HOME/.vault/" "${HOST}:.vault/"
"$HOME/.vault/" "${SSH}:.vault/"
# Gate: the agent will 401 forever without the shared secret present.
ssh "$HOST" '[ -s ~/.vault/claire-sync-secret.txt ]' \
ssh "$SSH" '[ -s ~/.vault/claire-sync-secret.txt ]' \
|| { echo "ERROR: ~/.vault/claire-sync-secret.txt missing on $HOST after seed" >&2; exit 1; }
say "[$HOST] configure peer (url only — secret is vault-sourced)"
@ -75,6 +87,10 @@ remote-run "$HOST" "
systemctl --user restart claire-agent.service
loginctl enable-linger \$(whoami) 2>/dev/null || true
sleep 2
systemctl --user --no-pager status claire-agent.service | head -5
# Real gate: is-active is non-zero iff the unit failed to come up. The status
# dump below is cosmetic — piping to head closes the pipe early (SIGPIPE), so
# keep it non-fatal or it false-aborts an otherwise-healthy deploy.
systemctl --user is-active claire-agent.service
systemctl --user --no-pager status claire-agent.service 2>&1 | head -5 || true
"
say "[$HOST] done."

scripts/release-fleet.sh (new executable file, 91 lines added)

@ -0,0 +1,91 @@
#!/usr/bin/env bash
#
# release-fleet.sh — one command to release the current claire working tree to
# the WHOLE fleet and restart every service. Runs FROM plum (the source of
# truth + the only host with the launchd services).
#
# scripts/release-fleet.sh # test → deploy apricot+black → restart plum
# scripts/release-fleet.sh --no-test # skip the pre-deploy pytest gate
# scripts/release-fleet.sh --no-plum # leave plum's services running (only push workers)
# scripts/release-fleet.sh --hosts apricot # restrict the worker host list
# scripts/release-fleet.sh --dry-run # print the plan, change nothing
#
# What it does, in order:
# 1. (gate) run the test suite — abort the whole release if it fails.
# 2. workers (apricot, black): scripts/deploy-agent.sh <host>
# → rsync working tree + `uv pip install -e .` + restart claire-agent.service.
# 3. plum: `launchctl kickstart -k` claire-serve + claire-tray
# → editable install, so a restart is all that's needed to load new code.
#
# ⚠ Restarting plum's `com.lilith.claire-serve` briefly drops the web / API /
# MCP endpoint (~a few seconds). Anything mid-call against claire's MCP tools
# — INCLUDING the orchestrator itself — will blip until it comes back. Run it
# when you can tolerate that, or pass --no-plum and restart plum by hand.
#
# Requires (same as deploy-agent.sh): `remote-run` on PATH, ssh to the worker
# hosts, uv/python on the remotes, NTP-synced clocks.
set -euo pipefail
SRC="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
HOSTS=(apricot black)
RUN_TESTS=1
RESTART_PLUM=1
DRY_RUN=0
PLUM_SERVICES=(com.lilith.claire-serve com.lilith.claire-tray)
say() { printf '\033[1;35m▸\033[0m %s\n' "$*"; }
warn() { printf '\033[1;33m⚠\033[0m %s\n' "$*" >&2; }
die() { printf '\033[1;31m✗\033[0m %s\n' "$*" >&2; exit 1; }
run() { if [ "$DRY_RUN" = 1 ]; then printf ' [dry-run] %s\n' "$*"; else eval "$@"; fi; }
while [ $# -gt 0 ]; do
case "$1" in
--no-test) RUN_TESTS=0; shift ;;
--no-plum) RESTART_PLUM=0; shift ;;
--dry-run) DRY_RUN=1; shift ;;
--hosts) shift; IFS=' ' read -r -a HOSTS <<< "${1:?--hosts needs a value}"; shift ;;
-h|--help) sed -n '2,30p' "$0"; exit 0 ;;
*) die "unknown arg: $1 (try --help)" ;;
esac
done
# --- 1. test gate ----------------------------------------------------------
if [ "$RUN_TESTS" = 1 ]; then
say "test gate: pytest (use --no-test to skip)"
# Run via the project venv (where pytest + dev deps live); fall back to uv's
# managed env. `uv run pytest` is deliberately NOT used — pytest is a dev-extra
# in .venv, not a uv tool, so that spawn fails on this repo.
if [ -x "$SRC/.venv/bin/python" ]; then
run "(cd '$SRC' && .venv/bin/python -m pytest -q)" || die "tests failed — release aborted"
elif command -v uv >/dev/null 2>&1; then
run "(cd '$SRC' && uv run python -m pytest -q)" || die "tests failed — release aborted"
else
die "no .venv/bin/python and no uv — cannot run the test gate (use --no-test to override)"
fi
else
warn "skipping test gate (--no-test)"
fi
# --- 2. worker hosts -------------------------------------------------------
for h in "${HOSTS[@]}"; do
say "deploy worker → $h"
run "'$SRC/scripts/deploy-agent.sh' '$h'" || die "deploy-agent.sh $h failed"
done
# --- 3. plum services ------------------------------------------------------
if [ "$RESTART_PLUM" = 1 ]; then
warn "restarting plum services ${PLUM_SERVICES[*]} — claire-serve restart blips the MCP/web endpoint"
uid="$(id -u)"
for svc in "${PLUM_SERVICES[@]}"; do
if launchctl print "gui/$uid/$svc" >/dev/null 2>&1; then
say "kickstart $svc"
run "launchctl kickstart -k 'gui/$uid/$svc'" || warn "kickstart $svc returned non-zero"
else
warn "$svc not loaded in gui/$uid — skipping (load its LaunchAgent first)"
fi
done
else
warn "leaving plum services running (--no-plum) — restart them by hand to load new code"
fi
if [ "$DRY_RUN" = 1 ]; then say "release plan printed (dry-run — nothing changed)."; else say "release complete."; fi

View file

@ -25,9 +25,107 @@ logger = logging.getLogger(__name__)
_escalation_state: dict[str, int] = {}
# session_uuid -> consecutive auto-continue nudges (reset when it recovers).
_auto_continue_state: dict[str, int] = {}
# session_uuid -> consecutive auto-resume attempts (reset once seen live again).
_auto_resume_state: dict[str, int] = {}
_LOCAL = "local"
# Sent to a session right after it is auto-resumed. The session must
# re-establish its OWN state before acting — it cannot assume it was mid-task
# (it may have already finished, or — though the supersession guard prevents
# this — been superseded). Mirrors the orchestrator's rehydrate-on-startup.
_REORIENT_KICK = (
"[resumed] Your session was auto-resumed after its pane died (deploy, "
"crash, OOM, or host reboot — not anything you did wrong). Before doing "
"ANYTHING else, DETERMINE YOUR CURRENT STATE: re-read your assigned task "
"and your own recent transcript, work out what you already completed vs "
"what is still pending, and whether you are in fact already finished. THEN "
"either continue from exactly where you left off, or — if the work is done "
"— report completion via report_status and stop. Do NOT blindly redo work "
"you already completed."
)
def _is_orchestrator_cwd(cwd: str | None) -> bool:
"""True if `cwd` is an orchestrator workspace (the `[<host>] claire` panes).
Those have their own bootstrap + age-recycle lifecycle; auto-resuming them
would race that machinery, so they're excluded. Matched by path suffix
(robust across hosts/accounts) rather than a session-name substring.
"""
if not cwd:
return False
return cwd.rstrip("/").endswith("claire/orchestrator")
def _stage_dispatch_mcp(host: str) -> str | None:
"""Stage the worker MCP config so a resumed session regains the same
`claire:*` tools a freshly-dispatched one gets (report_status, etc.).
Returns the staged path, or None if staging fails; the session still
resumes, just degraded (invisible to the fleet view), so a staging failure
never blocks it.
"""
try:
from ..orchestrator.bootstrap import stage_dispatch_mcp_config
return stage_dispatch_mcp_config(host)
except Exception as exc: # noqa: BLE001
logger.warning("auto-resume: dispatch MCP staging failed: %s", exc)
return None
def select_resume_candidates(
sessions,
tmux_rows,
*,
window_s: int,
now: float,
attempts: dict[str, int],
max_attempts: int,
max_per_tick: int,
):
"""Pure selection of DEAD-but-recent LOCAL sessions to auto-resume.
A candidate is a local session with no live tmux pane whose JSONL mtime is
within `window_s` (recently alive, not a graveyard entry). Guards, in
order, each producing a `(session, reason)` skip record for logging:
- orchestrator-workspace cwd: excluded (own lifecycle);
- supersession: a LIVE local session shares the dead one's cwd → excluded
  (re-spawning would put two sessions in one workspace; the re-orient
  prompt can't detect a sibling, so this MUST be a hard guard);
- per-session retry cap (`max_attempts`): excluded;
- per-tick global ceiling (`max_per_tick`): the overflow is deferred.
Returns `(to_resume, skipped)`. `to_resume` is capped at `max_per_tick`.
"""
live_uuids = {
str(r.resumed_uuid) for r in tmux_rows if getattr(r, "resumed_uuid", None)
}
live_cwds = {
s.cwd for s in sessions if s.host == _LOCAL and str(s.uuid) in live_uuids
}
eligible = []
skipped: list[tuple] = []
for s in sessions:
if s.host != _LOCAL:
continue
if str(s.uuid) in live_uuids:
continue # alive — not a resume candidate
if now - s.mtime_epoch > window_s:
continue # too old to count as "recently alive" — silently ignore
if _is_orchestrator_cwd(s.cwd):
skipped.append((s, "orchestrator-workspace"))
continue
if s.cwd in live_cwds:
skipped.append((s, "superseded-by-live-session-in-cwd"))
continue
if attempts.get(str(s.uuid), 0) >= max_attempts:
skipped.append((s, "retry-cap-reached"))
continue
eligible.append(s)
for s in eligible[max_per_tick:]:
skipped.append((s, "per-tick-ceiling-deferred"))
return eligible[:max_per_tick], skipped
# Task states where a session is NOT just "idle with more to do" — it's
# genuinely parked (awaiting review or a human, or finished). Auto-continue
# must never nudge these, or it churns work that's deliberately paused.
@ -107,11 +205,13 @@ async def supervisor_loop(*, config_path: Path | None, db_path: Path | None) ->
allow_respawn = cfg.agent.supervisor_allow_respawn
ac_mode = cfg.agent.auto_continue # off | dry-run | on
ac_max = cfg.agent.auto_continue_max
ar_mode = cfg.agent.auto_resume # off | dry-run | on
poll = min(60, max(20, threshold // 3))
rcl = Rclaude()
logger.info(
"agent supervisor loop enabled (poll %ds, wedge>%ds, respawn=%s, auto_continue=%s)",
poll, threshold, allow_respawn, ac_mode,
"agent supervisor loop enabled (poll %ds, wedge>%ds, respawn=%s, "
"auto_continue=%s, auto_resume=%s)",
poll, threshold, allow_respawn, ac_mode, ar_mode,
)
while True:
@ -181,12 +281,65 @@ async def supervisor_loop(*, config_path: Path | None, db_path: Path | None) ->
except RclaudeError as exc:
logger.warning("supervisor respawn-kill failed: %s", exc)
# Auto-resume: resurrect DEAD-but-recent local sessions. Gated +
# capped; ships off → dry-run → on. The re-orient kick makes each
# resumed session re-determine its own state before acting.
live_uuids = {
str(r.resumed_uuid)
for r in tmux_rows
if getattr(r, "resumed_uuid", None)
}
if ar_mode != "off":
to_resume, skipped = select_resume_candidates(
sessions, tmux_rows,
window_s=cfg.agent.auto_resume_window_s, now=time.time(),
attempts=_auto_resume_state,
max_attempts=cfg.agent.auto_resume_max,
max_per_tick=cfg.agent.auto_resume_max_per_tick,
)
for s, reason in skipped:
logger.info(
"supervisor: auto-resume SKIP %s (%s)", str(s.uuid)[:8], reason
)
for s in to_resume:
key = str(s.uuid)
n = _auto_resume_state.get(key, 0) + 1
_auto_resume_state[key] = n
if ar_mode == "dry-run":
logger.info(
"supervisor: WOULD auto-resume %s in %s (x%d/%d)",
key[:8], s.cwd, n, cfg.agent.auto_resume_max,
)
continue
logger.warning(
"supervisor: auto-resuming %s in %s (x%d/%d)",
key[:8], s.cwd, n, cfg.agent.auto_resume_max,
)
try:
mcp_cfg = _stage_dispatch_mcp(_LOCAL)
new_name = await asyncio.to_thread(
rcl.resume,
host=_LOCAL, cwd=s.cwd, resume_uuid=key,
mcp_config=mcp_cfg, name=None,
)
await asyncio.sleep(8) # let tmux + claude reach the prompt
await asyncio.to_thread(
rcl.send, text=_REORIENT_KICK, match=new_name, yes=True
)
except Exception as exc: # noqa: BLE001
logger.warning("supervisor auto-resume failed for %s: %s", key[:8], exc)
# Reset counters for sessions that recovered (no longer wedged).
wedged_keys = {str(s.uuid) for s in wedged}
for d in (_escalation_state, _auto_continue_state):
for key in list(d):
if key not in wedged_keys:
d.pop(key, None)
# Auto-resume counter resets once the session is live again (the
# resume stuck); a later death is then eligible to resume afresh.
for key in list(_auto_resume_state):
if key in live_uuids:
_auto_resume_state.pop(key, None)
except asyncio.CancelledError:
return
except Exception as exc: # noqa: BLE001

View file

@ -44,6 +44,13 @@ class HostEntry(_Strict):
# views from synced data. Optional — legacy entries lack it; resolution
# falls back to label matching when absent.
machine_id: str | None = None
# Free-form capability tags describing what this host CAN do — used by
# routing (location-transparent Claire) and dispatch to pick a host by
# capability, not just load. Conventions: `gpu`, `media`, `transmission`,
# `cores:64`, `mount:<name>`, `svc:<name>`. Matching is by exact tag OR
# `key:` prefix (see `ClaireConfig.hosts_with_capability`). Empty = no
# declared specialties (host is general-purpose).
capabilities: list[str] = Field(default_factory=list)
class WebConfig(_Strict):
@ -75,6 +82,21 @@ class AgentConfig(_Strict):
# capped at `auto_continue_max` consecutive nudges per session.
auto_continue: Literal["off", "dry-run", "on"] = "off"
auto_continue_max: int = Field(default=3, ge=1, le=20)
# Auto-resume DEAD local worker sessions (pane gone — crash/OOM/host reboot;
# the KillMode=process unit fix already stops deploy-kills). On a death the
# session JSONL persists and is resumable, so the supervisor can spawn
# `claude --resume <uuid>` and send a re-orient kick so the session
# re-determines its OWN state (what's done/pending/finished) before acting.
# "off" (default) | "dry-run" (LOG the would-resume set — validate first) |
# "on" (actually resume). Guarded: recency window, supersession (skip if a
# LIVE session shares the dead one's cwd), orchestrator-workspace exclusion
# (those have their own bootstrap/recycle lifecycle), per-session retry cap,
# and a per-tick global ceiling — the real guard against a first-wake token
# storm when many recent sessions are found dead at once.
auto_resume: Literal["off", "dry-run", "on"] = "off"
auto_resume_max: int = Field(default=3, ge=1, le=20)
auto_resume_max_per_tick: int = Field(default=3, ge=1, le=50)
auto_resume_window_s: int = Field(default=86400, ge=300, le=604800)
# When True, this peer node ALSO bootstraps + registers a local orchestrator
# session (`[<host>] claire` in the remote dev list) alongside its sync/
# supervisor/telemetry loops — so every host is remote-controllable. The
@ -130,6 +152,19 @@ class OrchestratorConfig(_Strict):
# the fleet, identifies blockers, and surfaces next-step recommendations
# — closing the loop on "reduce required user interaction." 0 disables.
rounds_interval_s: int = Field(default=0, ge=0, le=86400)
# Cost gate for the rounds timer. When True (default), the loop computes a
# cheap pure-Python fingerprint of *worker* fleet state (live sessions minus
# the orchestrator's own, + open task states) each tick and SKIPS posting a
# DO-ROUNDS turn — i.e. never wakes the (expensive) orchestrator model —
# while nothing has changed. The orchestrator's own session/chat is excluded
# from the fingerprint so a round's own side effects can't keep it "changed"
# forever. Set False to post every tick unconditionally (legacy behavior).
rounds_skip_unchanged: bool = True
# Max-staleness floor for the skip gate: force a real round at least once
# every N ticks even when the fingerprint is unchanged, so the HUD/heartbeat
# never goes fully silent. With rounds_interval_s=900 (15m), 6 ⇒ a forced
# round at least every 90m. Ignored when rounds_skip_unchanged is False.
rounds_heartbeat_every: int = Field(default=6, ge=1, le=1000)
# When False (default), Claire's rounds loop only *proposes* dispatch —
# spawning a session is a real external action, so it waits for the
# user. Set True to let Claire dispatch eligible work autonomously
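
The skip gate configured by `rounds_skip_unchanged` and `rounds_heartbeat_every` can be sketched as below. The real `fleet_fingerprint()` and `should_skip_round()` live in `web/rounds.py`, which this diff does not show, so the function names, the dict shapes for sessions and tasks, and the hashing choice here are assumptions for illustration only.

```python
import hashlib
import json


def fingerprint_sketch(sessions, tasks, orchestrator_uuid):
    """Stable hash of worker fleet state, excluding the orchestrator's own session."""
    state = {
        "sessions": sorted(
            (s["uuid"], s["state"]) for s in sessions if s["uuid"] != orchestrator_uuid
        ),
        "tasks": sorted((t["id"], t["state"]) for t in tasks),
    }
    payload = json.dumps(state, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()


def should_skip_sketch(prev_fp, fp, consecutive_skips, heartbeat_every):
    """Skip while nothing changed, unless the heartbeat floor forces a round."""
    if prev_fp is None or fp != prev_fp:
        return False  # first tick, or fleet state changed: run a real round
    # Allow at most heartbeat_every - 1 skips in a row; the next tick must run.
    return consecutive_skips + 1 < heartbeat_every
```

With `heartbeat_every=6`, five unchanged ticks are skipped and the sixth runs, which matches the "forced round at least every 90m" arithmetic for a 900-second interval. Leaving the orchestrator's session out of the fingerprint is what stops a round's own side effects from making the state look changed.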
@ -278,6 +313,29 @@ class ClaireConfig(_Strict):
return h.name
return label
def capabilities_for(self, host: str) -> list[str]:
"""Capability tags declared for `host` (label resolved to canonical)."""
canon = self.resolve_host_label(host)
for h in self.known_hosts:
if h.name == canon:
return list(h.capabilities)
return []
def hosts_with_capability(self, tag: str) -> list[str]:
"""Canonical host names that satisfy capability `tag`.
Matches a host's `capabilities` either exactly (`"gpu" == "gpu"`) or by
key for `key:value` tags (asking `"cores"` matches `"cores:64"`), so a
router can query `hosts_with_capability("cores")` without knowing the
value. Returns names in `known_hosts` order (stable, caller picks).
"""
key = tag + ":"
out: list[str] = []
for h in self.known_hosts:
if any(c == tag or c.startswith(key) for c in h.capabilities):
out.append(h.name)
return out
def default_config_path() -> Path:
"""`~/.config/claire/claire.toml` — XDG-compliant."""
@ -363,6 +421,10 @@ def _serialize(cfg: ClaireConfig) -> str:
)
lines.append(f'auto_continue = "{ag.auto_continue}"')
lines.append(f"auto_continue_max = {ag.auto_continue_max}")
lines.append(f'auto_resume = "{ag.auto_resume}"')
lines.append(f"auto_resume_max = {ag.auto_resume_max}")
lines.append(f"auto_resume_max_per_tick = {ag.auto_resume_max_per_tick}")
lines.append(f"auto_resume_window_s = {ag.auto_resume_window_s}")
lines.append(
f"orchestrator_enable = {str(ag.orchestrator_enable).lower()}"
)
@ -387,6 +449,8 @@ def _serialize(cfg: ClaireConfig) -> str:
lines.append(f'description = "{h.description}"')
if h.machine_id is not None:
lines.append(f'machine_id = "{h.machine_id}"')
if h.capabilities:
lines.append(f"capabilities = {_fmt_str_list(list(h.capabilities))}")
for peer in cfg.peers:
lines.append("")
lines.append("[[peers]]")

View file

@ -436,6 +436,9 @@ def open_db(path: Path | str | None = None) -> sqlite3.Connection:
conn = sqlite3.connect(str(path), isolation_level=None, check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
# Wait up to 5s for a held write lock instead of failing immediately with
# "database is locked" — peer-sync ingest and API writes contend otherwise.
conn.execute("PRAGMA busy_timeout = 5000")
# WAL doesn't apply to :memory: and isn't strictly needed; harmless to try.
if str(path) != ":memory:":
conn.execute("PRAGMA journal_mode = WAL")
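
A small standalone demonstration of what `busy_timeout` changes, assuming a throwaway database file rather than Claire's real one. The first attempt sets the timeout to 0 so the lock error appears immediately; the second uses the 5000 ms value from the hunk above after the lock has been released. The variable names are illustrative only.

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

# Connection 1 takes the write lock and holds it.
holder = sqlite3.connect(path, isolation_level=None)
holder.execute("CREATE TABLE t (x)")
holder.execute("BEGIN IMMEDIATE")

# Connection 2 with no wait budget fails at once with "database is locked".
waiter = sqlite3.connect(path, isolation_level=None)
waiter.execute("PRAGMA busy_timeout = 0")
try:
    waiter.execute("BEGIN IMMEDIATE")
    locked = False
except sqlite3.OperationalError as exc:
    locked = "locked" in str(exc)

# Once the lock is released, the same statement succeeds. With a 5000 ms
# budget, a waiter would also ride out a short-lived lock instead of failing.
holder.execute("COMMIT")
waiter.execute("PRAGMA busy_timeout = 5000")
waiter.execute("BEGIN IMMEDIATE")
waiter.execute("COMMIT")
timeout_ms = waiter.execute("PRAGMA busy_timeout").fetchone()[0]
```

Note that Python's `sqlite3.connect` also accepts a `timeout` argument in seconds; setting the pragma explicitly keeps the wait budget visible next to the other connection pragmas.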

View file

@ -73,7 +73,20 @@ Never redo finished work or silently drop a pending review.
## Per-turn workflow
1. Read the user's request (everything after `[turn:<id>]`).
2. Use the Claire MCP tools as needed:
2. **Route the turn (location transparency).** It should not matter which
`[<host>] claire` the user is talking to. When the request concerns work
that might live on another host, call `resolve_host(...)` with the signals
you extract from the turn:
- `explicit_host`: the user named a host ("on apricot", "@black");
- `capability_needs`: a host-specific resource the work needs (e.g.
`["media"]`, `["gpu"]`, `["mount:…"]`);
- `session_uuid` / `task_id`: an existing session/task it is about.
It returns `{host, reason, detail, candidates}`. If `host` is NOT this node, the
work belongs on that host's Claire — say so plainly in your reply (e.g.
"that runs on `[apricot] claire`") and surface the hand-off as the action.
If it returns this node, or there is no host-specific signal, handle it
here as normal. Skip routing for trivially-local requests (status, chat).
3. Use the Claire MCP tools as needed:
- **Read**: `list_recent_events`, `search_chat_messages`, `get_session`,
`list_fleet` (snapshot of every active agent's current task/state).
- **Act**: `create_project`, `add_task`, `create_assignment`,
@ -81,12 +94,12 @@ Never redo finished work or silently drop a pending review.
- **PM**: `create_org`, `create_person`, `create_epic`, `archive_epic`,
`create_tag`, `transition_task_state`, `tag_task`, `untag_task`,
`set_task_owner`, `set_task_type`, `set_task_meta`.
- **Plan**: `summarize_project`, `suggest_assignments`.
- **Plan**: `summarize_project`, `suggest_assignments`, `resolve_host`.
- **Reference**: `status`, `list_tasks`, `help`.
3. **Always call `report_status`** once per turn with your own
4. **Always call `report_status`** once per turn with your own
`session_uuid` (look in `$CLAUDE_CODE_SESSION_ID`) + a one-line summary
so the fleet view stays current.
4. When done, call `submit_chat_reply(body=<your reply>, turn_id=<id>)`.
5. When done, call `submit_chat_reply(body=<your reply>, turn_id=<id>)`.
This is REQUIRED: without it, the user sees nothing.
Exactly one `submit_chat_reply` call per user turn.

View file

@ -541,6 +541,36 @@ def build_server() -> FastMCP:
lambda c, _g: tools.fleet_load(c),
)
@mcp.tool()
async def resolve_host(
explicit_host: str | None = None,
capability_needs: list[str] | None = None,
session_uuid: str | None = None,
task_id: str | None = None,
) -> dict[str, Any]:
"""Decide which host a turn or piece of work belongs on. Deterministic
cascade: explicit host named > capability required (gpu/media/mount/svc)
> sticky (the subject's session/task already runs on a host) >
default-local. Extract these signals from the user's turn and pass what
applies; act on the returned {host, reason}. For location-transparent
Claire: if `host` != this node, the work belongs on that host's Claire."""
return await _call_tool(
"resolve_host",
{
"explicit_host": explicit_host,
"capability_needs": capability_needs,
"session_uuid": session_uuid,
"task_id": task_id,
},
lambda c, _g: tools.resolve_host(
c,
explicit_host=explicit_host,
capability_needs=capability_needs,
session_uuid=session_uuid,
task_id=task_id,
),
)
@mcp.tool()
async def dispatch_task(task_id: str, host: str, cwd: str) -> dict[str, Any]:
"""Spawn a Claude session to work a task on `host` at `cwd`.

View file

@ -249,6 +249,7 @@ def help_text() -> dict[str, Any]:
{"name": "get_session", "summary": "Details for one session by uuid."},
{"name": "summarize_project", "summary": "Project-level summary (M5)."},
{"name": "suggest_assignments", "summary": "Suggest unassigned-task → session pairs (M5)."},
{"name": "resolve_host", "summary": "Decide which host a turn/work belongs on (explicit>capability>sticky>local)."},
{"name": "send_to_session", "summary": "Send text to one specific session."},
{"name": "submit_chat_reply", "summary": "Signal the user-turn is done."},
# PM expansion
@ -507,6 +508,47 @@ def fleet_load(conn: sqlite3.Connection) -> dict[str, Any]:
return service.fleet_load(conn)
def resolve_host(
conn: sqlite3.Connection,
*,
explicit_host: str | None = None,
capability_needs: list[str] | None = None,
session_uuid: str | None = None,
task_id: str | None = None,
) -> dict[str, Any]:
"""Decide which host a turn/work belongs on — deterministic cascade
(explicit > capability > sticky > default-local). The orchestrator (LLM)
extracts the signals from the user's turn and calls this; the returned
{host, reason, detail, candidates} is the routing decision to act on.
Live-session counts feed the capability tiebreak. For location-transparent
Claire: if `host` != this node, the work belongs on another host's Claire.
"""
from .. import routing
from ..config import load_or_init
cfg = load_or_init()
load = {
h["host"]: h["live_sessions"]
for h in service.fleet_load(conn).get("hosts", [])
}
d = routing.route(
conn,
cfg,
receiving_host=cfg.this_host_label(),
explicit_host=explicit_host,
capability_needs=capability_needs,
session_uuid=session_uuid,
task_id=task_id,
host_load=load,
)
return {
"host": d.host,
"reason": d.reason,
"detail": d.detail,
"candidates": list(d.candidates),
}
# ---------------------------------------------------------------------------
# PM expansion (migration 0003_pm). Each tool mirrors a service mutator.
# ---------------------------------------------------------------------------
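A minimal sketch of how a caller might act on the `{host, reason, detail, candidates}` dictionary that `resolve_host` returns. The helper name `plan_handoff` and its output strings are hypothetical, for illustration only; in the diff the actual handoff behavior is driven by the orchestrator prompt, not by code like this.

```python
from typing import Any


def plan_handoff(decision: dict[str, Any], this_host: str) -> str:
    """Turn a routing decision into a one-line action (illustrative only)."""
    host = decision["host"]
    reason = decision["reason"]
    if reason == "unknown-host":
        # The user named a host the router does not know; it fell back to local.
        return f"run locally on {this_host} (warning: {decision['detail']})"
    if host == this_host:
        return f"run locally on {this_host} ({reason})"
    # Location-transparent Claire: the work belongs on the other host's Claire.
    return f"hand off to Claire on {host} ({reason})"
```

The `unknown-host` branch is checked first so that a mistyped host name is surfaced to the user instead of being silently treated as a local turn.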


@ -284,6 +284,39 @@ class Rclaude:
raise RclaudeError("rclaude spawn returned no tmux session name")
return name
def resume(
self,
*,
host: str,
cwd: str,
resume_uuid: str,
mcp_config: str | None = None,
name: str | None = None,
) -> str:
"""Resurrect a DEAD session by uuid: spawn `claude --resume <uuid>` in a
fresh detached tmux at `cwd` on `host`.
Mirrors `spawn()` but injects `RCLAUDE_RESUME_ID`, which rclaude honors
to resume the given session instead of starting fresh (the inverse of
the leak guard `spawn()` documents), and deliberately does NOT strip the
resume env vars. The new pane carries `claude --resume <uuid>` in its
command, so `list_tmux` reports `resumed_uuid == <uuid>` and the pull
loop re-discovers it. Returns the new tmux session name.
"""
env: dict[str, str] = {
"RCLAUDE_DETACHED": "1",
"RCLAUDE_RESUME_ID": resume_uuid,
}
if mcp_config is not None:
env["RCLAUDE_MCP_CONFIG"] = mcp_config
if name is not None:
env["RCLAUDE_SESSION_NAME"] = name
raw = self._run([host, cwd], env_extra=env) # NB: keep the resume vars
out = raw.strip().splitlines()[-1] if raw.strip() else ""
if not out:
raise RclaudeError("rclaude resume returned no tmux session name")
return out
def send(
self,
*,

137
src/claire/routing.py Normal file

@ -0,0 +1,137 @@
"""Deterministic host routing for location-transparent Claire.
When the user talks to ANY host's Claire, a turn about work that lives
elsewhere should run on THAT host. The receiving Claire (an LLM) CLASSIFIES a
turn into structured signals (an explicit host if named, the capability the
work needs, the subject (session/task) it references), and this module turns
those signals + fleet state into a host decision via a fixed priority cascade.
Split of responsibility: the fuzzy natural-language step (turn → signals) lives
in the orchestrator prompt; the DECISION here is pure + deterministic so it's
testable and auditable. The cross-host *execution* (forwarding the turn + proxying
the reply) is a separate layer; this only answers "which host?".
Cascade (first match wins):
1. explicit: the user named a host
2. capability: work needs a host-specific resource (gpu/media/mount/svc/…)
3. sticky: the subject already has live work on a host (keep a thread coherent on one node)
4. default: run on the receiving node (most turns are host-agnostic)
"""
from __future__ import annotations
import sqlite3
from dataclasses import dataclass
from .config import ClaireConfig
@dataclass(frozen=True)
class RouteDecision:
"""Where a turn should run + WHY (the reason/detail are surfaced for
transparency: the user/operator can always see how routing decided)."""
host: str # canonical host label to run on
reason: str # machine-readable: explicit | capability | sticky | default-local | unknown-host
detail: str # human one-liner
candidates: tuple[str, ...] = () # hosts considered (capability matches etc.)
def _least_loaded(hosts: list[str], load: dict[str, int] | None) -> str:
"""Pick the least-loaded host (stable: known order when no load info)."""
if not load:
return hosts[0]
# min returns the first minimum on ties, so the input order decides; route()
# passes the candidates sorted by name.
return min(hosts, key=lambda h: load.get(h, 0))
def _host_of_session(conn: sqlite3.Connection, uuid: str) -> str | None:
row = conn.execute(
"SELECT host FROM sessions WHERE uuid = ?", (str(uuid),)
).fetchone()
return row["host"] if row and row["host"] else None
def _host_of_task(conn: sqlite3.Connection, task_id: str) -> str | None:
"""Host of a task's current worker — newest active assignment → session."""
row = conn.execute(
"""
SELECT s.host
FROM assignments a
JOIN sessions s ON s.uuid = a.session_uuid
WHERE a.task_id = ? AND a.active = 1
ORDER BY a.created_hlc DESC
LIMIT 1
""",
(str(task_id),),
).fetchone()
return row["host"] if row and row["host"] else None
def route(
conn: sqlite3.Connection,
cfg: ClaireConfig,
*,
receiving_host: str,
explicit_host: str | None = None,
capability_needs: list[str] | None = None,
session_uuid: str | None = None,
task_id: str | None = None,
host_load: dict[str, int] | None = None,
) -> RouteDecision:
"""Resolve which host a classified turn should run on.
`receiving_host` is the node the user is talking to (the default). The other
args are the classifier's output: `explicit_host` (named), `capability_needs`
(tags the work requires; the host must satisfy ALL), `session_uuid`/`task_id`
(the subject, for stickiness). `host_load` (host → live-session count) is an
optional tiebreaker among equally-capable hosts.
"""
recv = cfg.resolve_host_label(receiving_host)
known = {h.name for h in cfg.known_hosts} | {recv}
# 1. Explicit — the user named a host.
if explicit_host:
h = cfg.resolve_host_label(explicit_host)
if h in known:
return RouteDecision(h, "explicit", f"user named host {h!r}", (h,))
# Named something we don't know — don't silently send it nowhere.
return RouteDecision(
recv, "unknown-host",
f"host {explicit_host!r} not in known_hosts — running local", (recv,),
)
# 2. Capability — the work needs a host-specific resource. Host must satisfy
# ALL declared needs (intersection). No match → fall through (best-effort).
needs = [n for n in (capability_needs or []) if n]
if needs:
cand: set[str] | None = None
for n in needs:
hs = set(cfg.hosts_with_capability(n))
cand = hs if cand is None else (cand & hs)
candidates = sorted(cand or set())
if candidates:
pick = _least_loaded(candidates, host_load)
return RouteDecision(
pick, "capability",
f"needs {'+'.join(needs)} → {pick}", tuple(candidates),
)
# 3. Sticky — keep a thread where its subject's work already lives. Session
# reference wins over task (more specific); both resolve to a host.
sticky: str | None = None
if session_uuid:
sticky = _host_of_session(conn, session_uuid)
if sticky is None and task_id:
sticky = _host_of_task(conn, task_id)
if sticky:
sticky = cfg.resolve_host_label(sticky)
return RouteDecision(
sticky, "sticky",
"continuing where the subject's work already runs", (sticky,),
)
# 4. Default — no host-specific signal; the receiving node handles it.
return RouteDecision(
recv, "default-local", "no host-specific signal — running local", (recv,)
)
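The cascade above can be sketched without the database or config objects. This is a simplified illustration, not the module's implementation: `route_sketch`, `Decision`, and the plain-dict inputs are hypothetical stand-ins, alias resolution is omitted, and capability tags are matched exactly (no `key:value` prefix matching).

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Decision:
    host: str
    reason: str


def route_sketch(
    receiving: str,
    capabilities: dict[str, set[str]],  # host -> capability tags (assumed shape)
    session_hosts: dict[str, str],      # session uuid -> host (stands in for the DB)
    *,
    explicit: str | None = None,
    needs: list[str] | None = None,
    session: str | None = None,
    load: dict[str, int] | None = None,
) -> Decision:
    known = set(capabilities) | {receiving}
    # 1. Explicit: a named host wins; an unknown name falls back to local.
    if explicit:
        if explicit in known:
            return Decision(explicit, "explicit")
        return Decision(receiving, "unknown-host")
    # 2. Capability: a host must carry every needed tag; ties go to lowest load.
    if needs:
        cands = sorted(h for h, tags in capabilities.items() if set(needs) <= tags)
        if cands:
            pick = min(cands, key=lambda h: (load or {}).get(h, 0))
            return Decision(pick, "capability")
    # 3. Sticky: follow the host where the referenced session already lives.
    if session and session in session_hosts:
        return Decision(session_hosts[session], "sticky")
    # 4. Default: the receiving node handles the turn.
    return Decision(receiving, "default-local")
```

Because each step returns as soon as it matches, an explicit host overrides a capability need, and an unsatisfiable capability need falls through to the sticky and default steps rather than failing.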


@ -170,23 +170,59 @@ def create_app(
from ..config import load_or_init
from ..db import migrate, open_db
from ..hlc import HLCGenerator
from .rounds import post_rounds_turn
from .rounds import (
fleet_fingerprint,
post_rounds_turn,
should_skip_round,
)
cfg = load_or_init(config_path)
interval = cfg.orchestrator.rounds_interval_s
interval = load_or_init(config_path).orchestrator.rounds_interval_s
if interval <= 0:
return # disabled
logger.info("orchestrator rounds loop enabled (every %ds)", interval)
last_fp: str | None = None
skipped = 0
while True:
try:
await asyncio.sleep(interval)
def _post() -> None:
def _tick(prev_fp: str | None, skips: int) -> tuple[bool, str]:
# Reload cfg each tick so a newly-discovered orchestrator
# session_uuid (excluded from the fingerprint) and any
# gate retune take effect without a server restart.
cfg = load_or_init(config_path)
conn = open_db(db_path)
migrate(conn)
gen = HLCGenerator(cfg.machine_id)
post_rounds_turn(conn, gen, cfg)
conn.close()
await asyncio.to_thread(_post)
try:
migrate(conn)
fp = fleet_fingerprint(
conn, cfg.orchestrator.session_uuid
)
# Skip (don't wake the model) only while the worker
# fleet is unchanged AND under the staleness floor.
if should_skip_round(
prev_fp, fp, skips,
enabled=cfg.orchestrator.rounds_skip_unchanged,
heartbeat_every=cfg.orchestrator.rounds_heartbeat_every,
):
return (False, fp)
gen = HLCGenerator(cfg.machine_id)
post_rounds_turn(conn, gen, cfg)
return (True, fp)
finally:
conn.close()
posted, last_fp = await asyncio.to_thread(
_tick, last_fp, skipped
)
if posted:
skipped = 0
else:
skipped += 1
logger.debug(
"rounds tick skipped (fleet unchanged, %d/%d)",
skipped,
load_or_init(config_path).orchestrator.rounds_heartbeat_every,
)
except asyncio.CancelledError:
return
except Exception as exc: # noqa: BLE001


@ -8,6 +8,7 @@ prompt lives here once rather than being duplicated.
from __future__ import annotations
import hashlib
import sqlite3
from ..config import ClaireConfig
@ -15,6 +16,76 @@ from ..domain import ChatScope
from ..hlc import HLCGenerator
def fleet_fingerprint(
conn: sqlite3.Connection, orchestrator_uuid: str | None
) -> str:
"""A cheap, deterministic digest of the *worker* fleet state a round reacts to.
Used by the rounds timer to skip waking the (expensive) orchestrator model
when nothing has changed since the last round. Covers exactly the inputs a
round acts on:
- live worker sessions (uuid, triage_status, pushed state/summary/source,
task_id), i.e. the `list_fleet` rows;
- open (non-`done`) task states (id, status).
Deliberately EXCLUDES the orchestrator's own session and its agent_status,
and the orchestrator chat scope entirely. A round mutates exactly those
(it calls `report_status` for itself and writes its prompt+reply into the
orchestrator scope every tick); folding them in would make the fingerprint
change on every round and the skip gate would never engage. A user turn
wakes the orchestrator on its own `[turn:]` path, independent of this loop,
so user chat need not be fingerprinted either.
"""
orch = orchestrator_uuid or ""
session_rows = conn.execute(
"""
SELECT s.uuid, s.last_triage_status,
ag.state, ag.summary, ag.source, ag.task_id
FROM sessions s
LEFT JOIN agent_status ag ON ag.session_uuid = s.uuid
WHERE s.liveness = 'alive' AND s.uuid != ?
ORDER BY s.uuid
""",
(orch,),
).fetchall()
task_rows = conn.execute(
"""
SELECT id, status FROM tasks
WHERE status != 'done'
ORDER BY id
"""
).fetchall()
h = hashlib.sha256()
for r in session_rows:
h.update(repr(tuple(r)).encode())
h.update(b"\x00tasks\x00")
for r in task_rows:
h.update(repr(tuple(r)).encode())
return h.hexdigest()
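A small sketch of the exclusion idea behind `fleet_fingerprint`, using an in-memory SQLite database. The two-column `sessions` table and the `fingerprint` helper are simplified assumptions for illustration; the real schema and query are the ones shown in the diff.

```python
import hashlib
import sqlite3


def fingerprint(conn: sqlite3.Connection, exclude_uuid: str) -> str:
    """Digest every session row except the excluded (orchestrator) one."""
    rows = conn.execute(
        "SELECT uuid, summary FROM sessions WHERE uuid != ? ORDER BY uuid",
        (exclude_uuid,),
    ).fetchall()
    h = hashlib.sha256()
    for r in rows:
        h.update(repr(tuple(r)).encode())
    return h.hexdigest()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sessions (uuid TEXT PRIMARY KEY, summary TEXT)")
conn.executemany(
    "INSERT INTO sessions VALUES (?, ?)",
    [("orch", "tick 1"), ("worker", "booting")],
)
before = fingerprint(conn, "orch")

# The orchestrator updating its own row must not move the digest.
conn.execute("UPDATE sessions SET summary = 'tick 2' WHERE uuid = 'orch'")
same = fingerprint(conn, "orch")

# A worker update is real fleet movement and must move it.
conn.execute("UPDATE sessions SET summary = 'compiling' WHERE uuid = 'worker'")
after = fingerprint(conn, "orch")
```

The `ORDER BY` matters here: without a stable row order, two identical fleet states could hash differently.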
def should_skip_round(
prev_fp: str | None,
fp: str,
consecutive_skips: int,
*,
enabled: bool,
heartbeat_every: int,
) -> bool:
"""Decide whether the rounds timer should SKIP waking the orchestrator.
Skip only when the gate is enabled, the worker fleet fingerprint is
unchanged since the last *posted* round, and we are still under the
max-staleness floor: after `heartbeat_every - 1` consecutive skips the next
tick posts a round regardless.
"""
if not enabled:
return False
if fp != prev_fp:
return False
return consecutive_skips + 1 < heartbeat_every
def build_rounds_prompt(cfg: ClaireConfig) -> str:
"""The structured DO-ROUNDS prompt posted to the orchestrator.


@ -223,3 +223,48 @@ def test_serialize_round_trips_per_host_caps(tmp_path: Path) -> None:
reloaded = load_or_init(cfg_path)
assert reloaded.limits.per_host_max == 5
assert reloaded.limits.per_host == {"apricot": 10}
def test_host_capabilities_roundtrip_and_resolver() -> None:
"""known_hosts capability tags serialize/reload and resolve by exact +
key-prefix match, with alias resolution."""
from claire.config import ClaireConfig, HostEntry, _serialize
cfg = ClaireConfig(
machine_id="m",
this_host="plum",
known_hosts=[
HostEntry(name="black", capabilities=["media", "transmission"]),
HostEntry(name="apricot", capabilities=["cores:64", "gpu"]),
HostEntry(name="plum", aliases=["local"]),
],
)
# Round-trip through the hand-rolled serializer (the field is easy to drop).
rt = ClaireConfig.model_validate(tomllib.loads(_serialize(cfg)))
black = next(h for h in rt.known_hosts if h.name == "black")
assert black.capabilities == ["media", "transmission"]
# Exact-tag match.
assert cfg.hosts_with_capability("media") == ["black"]
assert cfg.hosts_with_capability("gpu") == ["apricot"]
# key:value tag matched by key alone.
assert cfg.hosts_with_capability("cores") == ["apricot"]
# No host satisfies an unknown capability.
assert cfg.hosts_with_capability("fpga") == []
# capabilities_for resolves aliases (local -> plum) and unknown -> [].
assert cfg.capabilities_for("local") == []
assert cfg.capabilities_for("apricot") == ["cores:64", "gpu"]
assert cfg.capabilities_for("nope") == []
def test_host_without_capabilities_emits_no_capabilities_line() -> None:
"""A general-purpose host (no tags) stays minimal in the toml."""
from claire.config import ClaireConfig, HostEntry, _serialize
cfg = ClaireConfig(
machine_id="m", this_host="plum",
known_hosts=[HostEntry(name="plum", aliases=["local"])],
)
assert "capabilities" not in _serialize(cfg)
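The matching rule these tests exercise (exact tag, or the key of a `key:value` tag) can be sketched as follows. `tag_matches` and `hosts_with` are hypothetical helpers working on a plain dict; the real `hosts_with_capability` lives on `ClaireConfig`, and its body is not shown in this diff.

```python
def tag_matches(tag: str, need: str) -> bool:
    """True when `tag` equals `need`, or `tag` is a `need:value` pair."""
    return tag == need or tag.split(":", 1)[0] == need


def hosts_with(need: str, hosts: dict[str, list[str]]) -> list[str]:
    """Hosts (in insertion order) that carry at least one matching tag."""
    return [
        name
        for name, tags in hosts.items()
        if any(tag_matches(t, need) for t in tags)
    ]


hosts = {
    "black": ["media", "transmission"],
    "apricot": ["cores:64", "gpu"],
    "plum": [],
}
```

With this data, asking for `cores` finds `apricot` through its `cores:64` tag, while an unknown capability such as `fpga` yields an empty list.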


@ -84,3 +84,89 @@ def test_rounds_tick_schedules_turn(tmp_path: Path, monkeypatch) -> None:
assert r.json()["status"] == "scheduled"
# TestClient runs background tasks before returning, so the turn posted.
assert len(calls) == 1
# --- rounds skip gate (fingerprint) ----------------------------------------
import uuid as _uuid # noqa: E402
from claire.db import migrate # noqa: E402
from claire.hlc import HLCGenerator # noqa: E402
from claire.orchestrator.tools import report_status # noqa: E402
from claire.web.rounds import fleet_fingerprint, should_skip_round # noqa: E402
_MACHINE = "00000000-0000-0000-0000-0000000000aa"
def _seed_db(tmp_path: Path):
conn = open_db(tmp_path / "claire.db")
migrate(conn)
gen = HLCGenerator(_MACHINE)
hlc = lambda: str(gen.tick()) # noqa: E731
conn.execute(
"INSERT INTO projects(id,name,created_hlc,updated_hlc) VALUES(?,?,?,?)",
("p1", "proj", hlc(), hlc()),
)
conn.execute(
"INSERT INTO tasks(id,project_id,title,status,priority,created_hlc,"
"updated_hlc) VALUES(?,?,?,?,?,?,?)",
("t1", "p1", "work", "in_progress", 2, hlc(), hlc()),
)
worker = str(_uuid.uuid4())
orch = str(_uuid.uuid4())
for sid, host in ((worker, "apricot"), (orch, "local")):
conn.execute(
"INSERT INTO sessions(uuid,host,cwd,tmux_name,updated_hlc,liveness)"
" VALUES(?,?,?,?,?,?)",
(sid, host, f"/cwd/{host}", host, hlc(), "alive"),
)
report_status(conn, gen, session_uuid=worker, summary="worker booting")
conn.commit()
return conn, gen, worker, orch
def test_fingerprint_ignores_orchestrators_own_round_side_effects(
tmp_path: Path,
) -> None:
"""A round mutates the orchestrator's OWN session/status every tick. If the
fingerprint folded that in, the skip gate would never engage. It must not."""
conn, gen, _worker, orch = _seed_db(tmp_path)
fp1 = fleet_fingerprint(conn, orch)
# Simulate the round's own side effects: the orchestrator reports its own
# status (as it does every single round, with a fresh summary each time).
report_status(conn, gen, session_uuid=orch, summary="Rounds tick (1st)")
report_status(conn, gen, session_uuid=orch, summary="Rounds tick (2nd)…")
fp2 = fleet_fingerprint(conn, orch)
assert fp2 == fp1, "orchestrator self-status must not change the fingerprint"
# And the gate would therefore skip the next tick (under the floor).
assert should_skip_round(fp1, fp2, 0, enabled=True, heartbeat_every=6)
def test_fingerprint_changes_on_real_worker_movement(tmp_path: Path) -> None:
conn, gen, worker, orch = _seed_db(tmp_path)
fp1 = fleet_fingerprint(conn, orch)
# A worker pushing new status is real fleet movement → fingerprint changes.
report_status(conn, gen, session_uuid=worker, summary="now compiling")
fp_worker = fleet_fingerprint(conn, orch)
assert fp_worker != fp1
assert not should_skip_round(fp1, fp_worker, 0, enabled=True, heartbeat_every=6)
# A task leaving the open set (→ done) is also real movement.
conn.execute("UPDATE tasks SET status='done' WHERE id='t1'")
conn.commit()
assert fleet_fingerprint(conn, orch) != fp_worker
def test_should_skip_round_floor_and_toggle() -> None:
# Unchanged + enabled + under floor → skip.
assert should_skip_round("a", "a", 0, enabled=True, heartbeat_every=3)
assert should_skip_round("a", "a", 1, enabled=True, heartbeat_every=3)
# Hitting the floor forces a round (no skip), so the HUD never goes silent.
assert not should_skip_round("a", "a", 2, enabled=True, heartbeat_every=3)
# Any change forces a round.
assert not should_skip_round("a", "b", 0, enabled=True, heartbeat_every=3)
# Gate off → always post.
assert not should_skip_round("a", "a", 0, enabled=False, heartbeat_every=99)
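To see how the gate behaves over several ticks, the sketch below replays a sequence of fingerprints through the same predicate. `should_skip` restates the logic of `should_skip_round` from the diff; `simulate` is a hypothetical driver that mimics the loop's counter handling and is not part of the change.

```python
def should_skip(prev_fp, fp, consecutive_skips, *, enabled, heartbeat_every):
    """Same decision rule as should_skip_round in the diff."""
    if not enabled or fp != prev_fp:
        return False
    return consecutive_skips + 1 < heartbeat_every


def simulate(fingerprints, heartbeat_every):
    """Return, per tick, whether a round was posted (True) or skipped (False)."""
    posted = []
    last_fp = None
    skips = 0
    for fp in fingerprints:
        skip = should_skip(
            last_fp, fp, skips, enabled=True, heartbeat_every=heartbeat_every
        )
        last_fp = fp  # the loop stores the fingerprint on every tick
        skips = skips + 1 if skip else 0
        posted.append(not skip)
    return posted


steady = simulate(["a"] * 7, heartbeat_every=3)
changing = simulate(["a", "a", "b", "b"], heartbeat_every=3)
```

With an unchanged fleet and `heartbeat_every=3`, every third tick still posts a round, which is the staleness floor the last test asserts.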

143
tests/test_routing.py Normal file

@ -0,0 +1,143 @@
"""Routing cascade: explicit > capability > sticky > default-local."""
from __future__ import annotations
import pytest
from uuid import UUID
from claire.config import ClaireConfig, HostEntry
from claire.routing import route
from claire.web import service
@pytest.fixture
def cfg() -> ClaireConfig:
return ClaireConfig(
machine_id="m",
this_host="plum",
known_hosts=[
HostEntry(name="plum", aliases=["local"]),
HostEntry(name="apricot", capabilities=["cores:64", "gpu"]),
HostEntry(name="black", capabilities=["media", "transmission"]),
],
)
def _add_session(conn, uuid: str, host: str) -> None:
conn.execute(
"INSERT INTO sessions (uuid, host, updated_hlc) VALUES (?, ?, ?)",
(uuid, host, "1"),
)
def _task_with_worker(conn, gen, *, project: str, host: str, session_uuid: str):
"""Create a project+task and an active assignment to a session on `host`.
Returns the task id (str). Uses service so FK constraints are satisfied."""
service.create_project(conn, gen, name=project)
task = service.add_task(conn, gen, project=project, title="t")
_add_session(conn, session_uuid, host)
service.create_assignment(conn, gen, task_id=task.id, session_uuid=UUID(session_uuid))
return str(task.id)
# 1. explicit -----------------------------------------------------------------
def test_explicit_host_wins(conn, cfg) -> None:
d = route(conn, cfg, receiving_host="plum", explicit_host="apricot")
assert (d.host, d.reason) == ("apricot", "explicit")
def test_explicit_alias_resolves(conn, cfg) -> None:
# "local" → plum even when received on plum
d = route(conn, cfg, receiving_host="plum", explicit_host="local")
assert (d.host, d.reason) == ("plum", "explicit")
def test_explicit_unknown_host_falls_back_local_not_silent(conn, cfg) -> None:
d = route(conn, cfg, receiving_host="plum", explicit_host="mars")
assert d.host == "plum"
assert d.reason == "unknown-host"
# 2. capability ---------------------------------------------------------------
def test_capability_single(conn, cfg) -> None:
d = route(conn, cfg, receiving_host="plum", capability_needs=["media"])
assert (d.host, d.reason) == ("black", "capability")
def test_capability_key_prefix(conn, cfg) -> None:
# asking "cores" matches "cores:64"
d = route(conn, cfg, receiving_host="plum", capability_needs=["cores"])
assert d.host == "apricot"
def test_capability_intersection_of_needs(conn, cfg) -> None:
# gpu AND cores → only apricot has both; media-only black excluded
d = route(conn, cfg, receiving_host="plum", capability_needs=["gpu", "cores"])
assert d.host == "apricot"
def test_capability_no_match_falls_through_to_default(conn, cfg) -> None:
d = route(conn, cfg, receiving_host="plum", capability_needs=["fpga"])
assert (d.host, d.reason) == ("plum", "default-local")
def test_capability_tiebreak_least_loaded(conn) -> None:
cfg = ClaireConfig(
machine_id="m", this_host="plum",
known_hosts=[
HostEntry(name="a", capabilities=["media"]),
HostEntry(name="b", capabilities=["media"]),
],
)
d = route(None, cfg, receiving_host="plum", capability_needs=["media"],
host_load={"a": 5, "b": 1})
assert d.host == "b"
assert set(d.candidates) == {"a", "b"}
# 3. sticky -------------------------------------------------------------------
def test_sticky_by_session(conn, cfg) -> None:
_add_session(conn, "11111111-1111-1111-1111-111111111111", "apricot")
d = route(conn, cfg, receiving_host="plum",
session_uuid="11111111-1111-1111-1111-111111111111")
assert (d.host, d.reason) == ("apricot", "sticky")
def test_sticky_by_task_via_active_assignment(conn, gen, cfg) -> None:
task_id = _task_with_worker(
conn, gen, project="p", host="black",
session_uuid="22222222-2222-2222-2222-222222222222",
)
d = route(conn, cfg, receiving_host="plum", task_id=task_id)
assert (d.host, d.reason) == ("black", "sticky")
def test_session_reference_beats_task(conn, gen, cfg) -> None:
_add_session(conn, "33333333-3333-3333-3333-333333333333", "apricot")
task_id = _task_with_worker(
conn, gen, project="p", host="black",
session_uuid="44444444-4444-4444-4444-444444444444",
)
d = route(conn, cfg, receiving_host="plum",
session_uuid="33333333-3333-3333-3333-333333333333", task_id=task_id)
assert d.host == "apricot" # session wins
# 4. default ------------------------------------------------------------------
def test_default_local_when_no_signal(conn, cfg) -> None:
d = route(conn, cfg, receiving_host="apricot")
assert (d.host, d.reason) == ("apricot", "default-local")
def test_precedence_explicit_over_everything(conn, cfg) -> None:
# a sticky session on black, capability=media (black), but explicit apricot wins
_add_session(conn, "55555555-5555-5555-5555-555555555555", "black")
d = route(conn, cfg, receiving_host="plum", explicit_host="apricot",
capability_needs=["media"],
session_uuid="55555555-5555-5555-5555-555555555555")
assert (d.host, d.reason) == ("apricot", "explicit")


@ -4,14 +4,19 @@ from __future__ import annotations
from uuid import uuid4
from claire.agent.supervisor import detect_wedged_and_orphaned, should_auto_continue
from claire.agent.supervisor import (
_is_orchestrator_cwd,
detect_wedged_and_orphaned,
select_resume_candidates,
should_auto_continue,
)
from claire.rclaude import SessionRow, TmuxRow
NOW = 1_000_000.0
def _sess(uuid, *, host="local", age_s=0):
return SessionRow(host=host, uuid=uuid, snippet="", cwd="/x", mtime_epoch=int(NOW) - age_s)
def _sess(uuid, *, host="local", age_s=0, cwd="/x"):
return SessionRow(host=host, uuid=uuid, snippet="", cwd=cwd, mtime_epoch=int(NOW) - age_s)
def _tmux(resumed_uuid, *, host="local", name="claude-x-1"):
@ -71,3 +76,80 @@ def test_no_resumed_uuid_means_no_wedge_classification():
)
assert wedged == [] # not classified wedged without correlation
assert [s.uuid for s in orphaned] == [u] # no live pane matched → orphaned
# --- auto-resume selection (pure) ------------------------------------------
_W = 86_400 # resume recency window used in these tests
def _resume(sessions, tmux_rows, *, attempts=None, max_attempts=3, max_per_tick=3):
return select_resume_candidates(
sessions, tmux_rows,
window_s=_W, now=NOW, attempts=attempts or {},
max_attempts=max_attempts, max_per_tick=max_per_tick,
)
def test_is_orchestrator_cwd():
assert _is_orchestrator_cwd("/var/home/lilith/.local/share/claire/orchestrator")
assert _is_orchestrator_cwd("/home/x/.local/share/claire/orchestrator/") # trailing slash
assert not _is_orchestrator_cwd("/home/x/Code/@projects/@lilith/lilith-platform.live")
assert not _is_orchestrator_cwd(None)
def test_auto_resume_recency_window():
fresh, old = uuid4(), uuid4()
sessions = [_sess(fresh, age_s=10, cwd="/a"), _sess(old, age_s=_W + 5, cwd="/b")]
to_resume, _ = _resume(sessions, []) # no live panes
keys = {str(s.uuid) for s in to_resume}
assert str(fresh) in keys # recently alive → candidate
assert str(old) not in keys # beyond window → ignored (graveyard)
def test_auto_resume_supersession_guard():
"""A dead session whose cwd already has a LIVE session must NOT be resumed."""
dead, live = uuid4(), uuid4()
sessions = [_sess(dead, age_s=30, cwd="/shared"), _sess(live, age_s=5, cwd="/shared")]
tmux = [_tmux(str(live))] # `live` has a pane; `dead` does not
to_resume, skipped = _resume(sessions, tmux)
assert [str(s.uuid) for s in to_resume] == []
assert any(str(s.uuid) == str(dead) and r == "superseded-by-live-session-in-cwd"
for s, r in skipped)
def test_auto_resume_excludes_orchestrator_workspace():
orch = uuid4()
sessions = [_sess(orch, age_s=20, cwd="/home/x/.local/share/claire/orchestrator")]
to_resume, skipped = _resume(sessions, [])
assert to_resume == []
assert any(r == "orchestrator-workspace" for _, r in skipped)
def test_auto_resume_per_session_retry_cap():
capped = uuid4()
sessions = [_sess(capped, age_s=15, cwd="/a")]
to_resume, skipped = _resume(sessions, [], attempts={str(capped): 3}, max_attempts=3)
assert to_resume == []
assert any(r == "retry-cap-reached" for _, r in skipped)
def test_auto_resume_per_tick_global_ceiling():
ids = [uuid4() for _ in range(5)]
sessions = [_sess(u, age_s=10, cwd=f"/cwd/{i}") for i, u in enumerate(ids)]
to_resume, skipped = _resume(sessions, [], max_per_tick=2)
assert len(to_resume) == 2 # ceiling enforced
assert sum(1 for _, r in skipped if r == "per-tick-ceiling-deferred") == 3
def test_auto_resume_ignores_remote_and_live():
local_dead, remote_dead, local_live = uuid4(), uuid4(), uuid4()
sessions = [
_sess(local_dead, age_s=10, cwd="/a"),
_sess(remote_dead, host="apricot", age_s=10, cwd="/b"),
_sess(local_live, age_s=10, cwd="/c"),
]
tmux = [_tmux(str(local_live))]
to_resume, _ = _resume(sessions, tmux)
keys = {str(s.uuid) for s in to_resume}
assert keys == {str(local_dead)} # not remote (not ours), not live
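The rules these tests pin down can be summarized in one selector. The sketch below is an assumption-heavy illustration: `Sess`, `select`, the `age_s` field, and the order in which the guards are checked are all hypothetical, and only the skip-reason strings are taken from the tests. The body of the real `select_resume_candidates` is not shown in this diff.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Sess:
    uuid: str
    host: str
    cwd: str
    age_s: int  # seconds since last activity (assumed field)


def select(sessions, live_uuids, *, window_s, attempts, max_attempts, max_per_tick):
    """Split dead local sessions into (to_resume, skipped-with-reason)."""
    live_cwds = {s.cwd for s in sessions if s.uuid in live_uuids}
    to_resume, skipped = [], []
    for s in sessions:
        # Remote, still-live, or too-old sessions are ignored without a reason.
        if s.host != "local" or s.uuid in live_uuids or s.age_s > window_s:
            continue
        if s.cwd.rstrip("/").endswith("/claire/orchestrator"):
            skipped.append((s, "orchestrator-workspace"))
        elif s.cwd in live_cwds:
            skipped.append((s, "superseded-by-live-session-in-cwd"))
        elif attempts.get(s.uuid, 0) >= max_attempts:
            skipped.append((s, "retry-cap-reached"))
        elif len(to_resume) >= max_per_tick:
            skipped.append((s, "per-tick-ceiling-deferred"))
        else:
            to_resume.append(s)
    return to_resume, skipped


sessions = [
    Sess("dead", "local", "/shared", 30),
    Sess("live", "local", "/shared", 5),
    Sess("old", "local", "/b", 90_000),
    Sess("remote", "apricot", "/c", 10),
    Sess("ok", "local", "/d", 10),
]
resumed, skipped = select(
    sessions, {"live"}, window_s=86_400, attempts={}, max_attempts=3, max_per_tick=3
)
```

Keeping the selector pure (no tmux or database access) is what lets the tests above drive it with hand-built rows.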