diff --git a/src/claire/agent/supervisor.py b/src/claire/agent/supervisor.py index a5e8ec7..8e290e8 100644 --- a/src/claire/agent/supervisor.py +++ b/src/claire/agent/supervisor.py @@ -25,9 +25,107 @@ logger = logging.getLogger(__name__) _escalation_state: dict[str, int] = {} # session_uuid -> consecutive auto-continue nudges (reset when it recovers). _auto_continue_state: dict[str, int] = {} +# session_uuid -> consecutive auto-resume attempts (reset once seen live again). +_auto_resume_state: dict[str, int] = {} _LOCAL = "local" +# Sent to a session right after it is auto-resumed. The session must +# re-establish its OWN state before acting — it cannot assume it was mid-task +# (it may have already finished, or — though the supersession guard prevents +# this — been superseded). Mirrors the orchestrator's rehydrate-on-startup. +_REORIENT_KICK = ( + "[resumed] Your session was auto-resumed after its pane died (deploy, " + "crash, OOM, or host reboot — not anything you did wrong). Before doing " + "ANYTHING else, DETERMINE YOUR CURRENT STATE: re-read your assigned task " + "and your own recent transcript, work out what you already completed vs " + "what is still pending, and whether you are in fact already finished. THEN " + "either continue from exactly where you left off, or — if the work is done " + "— report completion via report_status and stop. Do NOT blindly redo work " + "you already completed." +) + + +def _is_orchestrator_cwd(cwd: str | None) -> bool: + """True if `cwd` is an orchestrator workspace (the `[] claire` panes). + + Those have their own bootstrap + age-recycle lifecycle; auto-resuming them + would race that machinery, so they're excluded. Matched by path suffix + (robust across hosts/accounts) rather than a session-name substring. + """ + if not cwd: + return False + return cwd.rstrip("/").endswith("claire/orchestrator") + + +def _stage_dispatch_mcp(host: str) -> str | None: + """Stage the worker MCP config so a resumed session regains the same + `claire:*` tools a freshly-dispatched one gets (report_status, etc.). + Returns the staged path, or None if staging fails — the session still + resumes, just degraded (invisible to the fleet view), never blocking it. + """ + try: + from ..orchestrator.bootstrap import stage_dispatch_mcp_config + + return stage_dispatch_mcp_config(host) + except Exception as exc: # noqa: BLE001 + logger.warning("auto-resume: dispatch MCP staging failed: %s", exc) + return None + + +def select_resume_candidates( + sessions, + tmux_rows, + *, + window_s: int, + now: float, + attempts: dict[str, int], + max_attempts: int, + max_per_tick: int, +): + """Pure selection of DEAD-but-recent LOCAL sessions to auto-resume. + + A candidate is a local session with no live tmux pane whose JSONL mtime is + within `window_s` (recently alive — not a graveyard entry). Guards, in + order, each producing a `(session, reason)` skip record for logging: + • orchestrator-workspace cwd → excluded (own lifecycle); + • supersession: a LIVE local session shares the dead one's cwd → excluded + (re-spawning would put two sessions in one workspace — the re-orient + prompt can't detect a sibling, so this MUST be a hard guard); + • per-session retry cap (`max_attempts`) → excluded; + • per-tick global ceiling (`max_per_tick`) → the overflow is deferred. + + Returns `(to_resume, skipped)`. `to_resume` is capped at `max_per_tick`. + """ + live_uuids = { + str(r.resumed_uuid) for r in tmux_rows if getattr(r, "resumed_uuid", None) + } + live_cwds = { + s.cwd for s in sessions if s.host == _LOCAL and str(s.uuid) in live_uuids + } + eligible = [] + skipped: list[tuple] = [] + for s in sessions: + if s.host != _LOCAL: + continue + if str(s.uuid) in live_uuids: + continue # alive — not a resume candidate + if now - s.mtime_epoch > window_s: + continue # too old to count as "recently alive" — silently ignore + if _is_orchestrator_cwd(s.cwd): + skipped.append((s, "orchestrator-workspace")) + continue + if s.cwd in live_cwds: + skipped.append((s, "superseded-by-live-session-in-cwd")) + continue + if attempts.get(str(s.uuid), 0) >= max_attempts: + skipped.append((s, "retry-cap-reached")) + continue + eligible.append(s) + for s in eligible[max_per_tick:]: + skipped.append((s, "per-tick-ceiling-deferred")) + return eligible[:max_per_tick], skipped + # Task states where a session is NOT just "idle with more to do" — it's # genuinely parked (awaiting review or a human, or finished). Auto-continue # must never nudge these, or it churns work that's deliberately paused. @@ -107,11 +205,13 @@ async def supervisor_loop(*, config_path: Path | None, db_path: Path | None) -> allow_respawn = cfg.agent.supervisor_allow_respawn ac_mode = cfg.agent.auto_continue # off | dry-run | on ac_max = cfg.agent.auto_continue_max + ar_mode = cfg.agent.auto_resume # off | dry-run | on poll = min(60, max(20, threshold // 3)) rcl = Rclaude() logger.info( - "agent supervisor loop enabled (poll %ds, wedge>%ds, respawn=%s, auto_continue=%s)", - poll, threshold, allow_respawn, ac_mode, + "agent supervisor loop enabled (poll %ds, wedge>%ds, respawn=%s, " + "auto_continue=%s, auto_resume=%s)", + poll, threshold, allow_respawn, ac_mode, ar_mode, ) while True: @@ -181,12 +281,65 @@ async def supervisor_loop(*, config_path: Path | None, db_path: Path | None) -> except RclaudeError as exc: logger.warning("supervisor respawn-kill failed: %s", exc) + # Auto-resume: resurrect DEAD-but-recent local sessions. Gated + + # capped; ships off → dry-run → on. The re-orient kick makes each + # resumed session re-determine its own state before acting. + live_uuids = { + str(r.resumed_uuid) + for r in tmux_rows + if getattr(r, "resumed_uuid", None) + } + if ar_mode != "off": + to_resume, skipped = select_resume_candidates( + sessions, tmux_rows, + window_s=cfg.agent.auto_resume_window_s, now=time.time(), + attempts=_auto_resume_state, + max_attempts=cfg.agent.auto_resume_max, + max_per_tick=cfg.agent.auto_resume_max_per_tick, + ) + for s, reason in skipped: + logger.info( + "supervisor: auto-resume SKIP %s (%s)", str(s.uuid)[:8], reason + ) + for s in to_resume: + key = str(s.uuid) + n = _auto_resume_state.get(key, 0) + 1 + _auto_resume_state[key] = n + if ar_mode == "dry-run": + logger.info( + "supervisor: WOULD auto-resume %s in %s (x%d/%d)", + key[:8], s.cwd, n, cfg.agent.auto_resume_max, + ) + continue + logger.warning( + "supervisor: auto-resuming %s in %s (x%d/%d)", + key[:8], s.cwd, n, cfg.agent.auto_resume_max, + ) + try: + mcp_cfg = _stage_dispatch_mcp(_LOCAL) + new_name = await asyncio.to_thread( + rcl.resume, + host=_LOCAL, cwd=s.cwd, resume_uuid=key, + mcp_config=mcp_cfg, name=None, + ) + await asyncio.sleep(8) # let tmux + claude reach the prompt + await asyncio.to_thread( + rcl.send, text=_REORIENT_KICK, match=new_name, yes=True + ) + except Exception as exc: # noqa: BLE001 + logger.warning("supervisor auto-resume failed for %s: %s", key[:8], exc) + # Reset counters for sessions that recovered (no longer wedged). wedged_keys = {str(s.uuid) for s in wedged} for d in (_escalation_state, _auto_continue_state): for key in list(d): if key not in wedged_keys: d.pop(key, None) + # Auto-resume counter resets once the session is live again (the + # resume stuck); a later death is then eligible to resume afresh. + for key in list(_auto_resume_state): + if key in live_uuids: + _auto_resume_state.pop(key, None) except asyncio.CancelledError: return except Exception as exc: # noqa: BLE001 diff --git a/src/claire/config.py b/src/claire/config.py index fa92f0b..1223122 100644 --- a/src/claire/config.py +++ b/src/claire/config.py @@ -82,6 +82,21 @@ class AgentConfig(_Strict): # capped at `auto_continue_max` consecutive nudges per session. auto_continue: Literal["off", "dry-run", "on"] = "off" auto_continue_max: int = Field(default=3, ge=1, le=20) + # Auto-resume DEAD local worker sessions (pane gone — crash/OOM/host reboot; + # the KillMode=process unit fix already stops deploy-kills). On a death the + # session JSONL persists and is resumable, so the supervisor can spawn + # `claude --resume ` and send a re-orient kick so the session + # re-determines its OWN state (what's done/pending/finished) before acting. + # "off" (default) | "dry-run" (LOG the would-resume set — validate first) | + # "on" (actually resume). Guarded: recency window, supersession (skip if a + # LIVE session shares the dead one's cwd), orchestrator-workspace exclusion + # (those have their own bootstrap/recycle lifecycle), per-session retry cap, + # and a per-tick global ceiling — the real guard against a first-wake token + # storm when many recent sessions are found dead at once. + auto_resume: Literal["off", "dry-run", "on"] = "off" + auto_resume_max: int = Field(default=3, ge=1, le=20) + auto_resume_max_per_tick: int = Field(default=3, ge=1, le=50) + auto_resume_window_s: int = Field(default=86400, ge=300, le=604800) # When True, this peer node ALSO bootstraps + registers a local orchestrator # session (`[] claire` in the remote dev list) alongside its sync/ # supervisor/telemetry loops — so every host is remote-controllable. The @@ -406,6 +421,10 @@ def _serialize(cfg: ClaireConfig) -> str: ) lines.append(f'auto_continue = "{ag.auto_continue}"') lines.append(f"auto_continue_max = {ag.auto_continue_max}") + lines.append(f'auto_resume = "{ag.auto_resume}"') + lines.append(f"auto_resume_max = {ag.auto_resume_max}") + lines.append(f"auto_resume_max_per_tick = {ag.auto_resume_max_per_tick}") + lines.append(f"auto_resume_window_s = {ag.auto_resume_window_s}") lines.append( f"orchestrator_enable = {str(ag.orchestrator_enable).lower()}" ) diff --git a/src/claire/rclaude.py b/src/claire/rclaude.py index 7a7e1ab..eaf191c 100644 --- a/src/claire/rclaude.py +++ b/src/claire/rclaude.py @@ -284,6 +284,39 @@ class Rclaude: raise RclaudeError("rclaude spawn returned no tmux session name") return name + def resume( + self, + *, + host: str, + cwd: str, + resume_uuid: str, + mcp_config: str | None = None, + name: str | None = None, + ) -> str: + """Resurrect a DEAD session by uuid: spawn `claude --resume ` in a + fresh detached tmux at `cwd` on `host`. + + Mirrors `spawn()` but injects `RCLAUDE_RESUME_ID` — which rclaude honors + to resume the given session instead of starting fresh (the inverse of + the leak guard `spawn()` documents) — and deliberately does NOT strip the + resume env vars. The new pane carries `claude --resume ` in its + command, so `list_tmux` reports `resumed_uuid == ` and the pull + loop re-discovers it. Returns the new tmux session name. + """ + env: dict[str, str] = { + "RCLAUDE_DETACHED": "1", + "RCLAUDE_RESUME_ID": resume_uuid, + } + if mcp_config is not None: + env["RCLAUDE_MCP_CONFIG"] = mcp_config + if name is not None: + env["RCLAUDE_SESSION_NAME"] = name + raw = self._run([host, cwd], env_extra=env) # NB: keep the resume vars + out = raw.strip().splitlines()[-1] if raw.strip() else "" + if not out: + raise RclaudeError("rclaude resume returned no tmux session name") + return out + def send( self, *, diff --git a/tests/test_supervisor.py b/tests/test_supervisor.py index f100638..43dacd2 100644 --- a/tests/test_supervisor.py +++ b/tests/test_supervisor.py @@ -4,14 +4,19 @@ from __future__ import annotations from uuid import uuid4 -from claire.agent.supervisor import detect_wedged_and_orphaned, should_auto_continue +from claire.agent.supervisor import ( + _is_orchestrator_cwd, + detect_wedged_and_orphaned, + select_resume_candidates, + should_auto_continue, +) from claire.rclaude import SessionRow, TmuxRow NOW = 1_000_000.0 -def _sess(uuid, *, host="local", age_s=0): - return SessionRow(host=host, uuid=uuid, snippet="", cwd="/x", mtime_epoch=int(NOW) - age_s) +def _sess(uuid, *, host="local", age_s=0, cwd="/x"): + return SessionRow(host=host, uuid=uuid, snippet="", cwd=cwd, mtime_epoch=int(NOW) - age_s) def _tmux(resumed_uuid, *, host="local", name="claude-x-1"): @@ -71,3 +76,80 @@ def test_no_resumed_uuid_means_no_wedge_classification(): ) assert wedged == [] # not classified wedged without correlation assert [s.uuid for s in orphaned] == [u] # no live pane matched → orphaned + + +# --- auto-resume selection (pure) ------------------------------------------ + +_W = 86_400 # resume recency window used in these tests + + +def _resume(sessions, tmux_rows, *, attempts=None, max_attempts=3, max_per_tick=3): + return select_resume_candidates( + sessions, tmux_rows, + window_s=_W, now=NOW, attempts=attempts or {}, + max_attempts=max_attempts, max_per_tick=max_per_tick, + ) + + +def test_is_orchestrator_cwd(): + assert _is_orchestrator_cwd("/var/home/lilith/.local/share/claire/orchestrator") + assert _is_orchestrator_cwd("/home/x/.local/share/claire/orchestrator/") # trailing slash + assert not _is_orchestrator_cwd("/home/x/Code/@projects/@lilith/lilith-platform.live") + assert not _is_orchestrator_cwd(None) + + +def test_auto_resume_recency_window(): + fresh, old = uuid4(), uuid4() + sessions = [_sess(fresh, age_s=10, cwd="/a"), _sess(old, age_s=_W + 5, cwd="/b")] + to_resume, _ = _resume(sessions, []) # no live panes + keys = {str(s.uuid) for s in to_resume} + assert str(fresh) in keys # recently alive → candidate + assert str(old) not in keys # beyond window → ignored (graveyard) + + +def test_auto_resume_supersession_guard(): + """A dead session whose cwd already has a LIVE session must NOT be resumed.""" + dead, live = uuid4(), uuid4() + sessions = [_sess(dead, age_s=30, cwd="/shared"), _sess(live, age_s=5, cwd="/shared")] + tmux = [_tmux(str(live))] # `live` has a pane; `dead` does not + to_resume, skipped = _resume(sessions, tmux) + assert [str(s.uuid) for s in to_resume] == [] + assert any(str(s.uuid) == str(dead) and r == "superseded-by-live-session-in-cwd" + for s, r in skipped) + + +def test_auto_resume_excludes_orchestrator_workspace(): + orch = uuid4() + sessions = [_sess(orch, age_s=20, cwd="/home/x/.local/share/claire/orchestrator")] + to_resume, skipped = _resume(sessions, []) + assert to_resume == [] + assert any(r == "orchestrator-workspace" for _, r in skipped) + + +def test_auto_resume_per_session_retry_cap(): + capped = uuid4() + sessions = [_sess(capped, age_s=15, cwd="/a")] + to_resume, skipped = _resume(sessions, [], attempts={str(capped): 3}, max_attempts=3) + assert to_resume == [] + assert any(r == "retry-cap-reached" for _, r in skipped) + + +def test_auto_resume_per_tick_global_ceiling(): + ids = [uuid4() for _ in range(5)] + sessions = [_sess(u, age_s=10, cwd=f"/cwd/{i}") for i, u in enumerate(ids)] + to_resume, skipped = _resume(sessions, [], max_per_tick=2) + assert len(to_resume) == 2 # ceiling enforced + assert sum(1 for _, r in skipped if r == "per-tick-ceiling-deferred") == 3 + + +def test_auto_resume_ignores_remote_and_live(): + local_dead, remote_dead, local_live = uuid4(), uuid4(), uuid4() + sessions = [ + _sess(local_dead, age_s=10, cwd="/a"), + _sess(remote_dead, host="apricot", age_s=10, cwd="/b"), + _sess(local_live, age_s=10, cwd="/c"), + ] + tmux = [_tmux(str(local_live))] + to_resume, _ = _resume(sessions, tmux) + keys = {str(s.uuid) for s in to_resume} + assert keys == {str(local_dead)} # not remote (not ours), not live