"""Pure classification logic for the agent supervisor — no rclaude/process.""" from __future__ import annotations from uuid import uuid4 from claire.agent.supervisor import ( _is_orchestrator_cwd, detect_wedged_and_orphaned, select_resume_candidates, should_auto_continue, ) from claire.rclaude import SessionRow, TmuxRow NOW = 1_000_000.0 def _sess(uuid, *, host="local", age_s=0, cwd="/x"): return SessionRow(host=host, uuid=uuid, snippet="", cwd=cwd, mtime_epoch=int(NOW) - age_s) def _tmux(resumed_uuid, *, host="local", name="claude-x-1"): return TmuxRow(host=host, session_name=name, detail="1 windows", resumed_uuid=resumed_uuid) def test_wedged_when_live_pane_and_stale_mtime(): u = uuid4() wedged, orphaned = detect_wedged_and_orphaned( [_sess(u, age_s=400)], [_tmux(u)], wedge_threshold_s=300, now=NOW ) assert [s.uuid for s in wedged] == [u] assert orphaned == [] def test_not_wedged_when_fresh(): u = uuid4() wedged, orphaned = detect_wedged_and_orphaned( [_sess(u, age_s=10)], [_tmux(u)], wedge_threshold_s=300, now=NOW ) assert wedged == [] and orphaned == [] def test_orphaned_when_no_live_pane(): u = uuid4() wedged, orphaned = detect_wedged_and_orphaned( [_sess(u, age_s=9999)], [], wedge_threshold_s=300, now=NOW ) assert wedged == [] assert [s.uuid for s in orphaned] == [u] def test_remote_sessions_not_supervised(): u = uuid4() # host != "local" → another machine's session, skip it wedged, orphaned = detect_wedged_and_orphaned( [_sess(u, host="apricot", age_s=9999)], [], wedge_threshold_s=300, now=NOW ) assert wedged == [] and orphaned == [] def test_auto_continue_gate(): # Continuable when not parked and under the cap. assert should_auto_continue(None, 0, 3) is True assert should_auto_continue("in_progress", 2, 3) is True # Capped. assert should_auto_continue("in_progress", 3, 3) is False # Parked states never auto-continue. for parked in ("blocked", "user_review", "claire_review", "done"): assert should_auto_continue(parked, 0, 3) is False def test_no_resumed_uuid_means_no_wedge_classification(): # Older rclaude omits resumed_uuid → can't correlate → never act blind. u = uuid4() wedged, orphaned = detect_wedged_and_orphaned( [_sess(u, age_s=9999)], [_tmux(None)], wedge_threshold_s=300, now=NOW ) assert wedged == [] # not classified wedged without correlation assert [s.uuid for s in orphaned] == [u] # no live pane matched → orphaned # --- auto-resume selection (pure) ------------------------------------------ _W = 86_400 # resume recency window used in these tests def _resume(sessions, tmux_rows, *, attempts=None, max_attempts=3, max_per_tick=3): return select_resume_candidates( sessions, tmux_rows, window_s=_W, now=NOW, attempts=attempts or {}, max_attempts=max_attempts, max_per_tick=max_per_tick, ) def test_is_orchestrator_cwd(): assert _is_orchestrator_cwd("/var/home/lilith/.local/share/claire/orchestrator") assert _is_orchestrator_cwd("/home/x/.local/share/claire/orchestrator/") # trailing slash assert not _is_orchestrator_cwd("/home/x/Code/@projects/@lilith/lilith-platform.live") assert not _is_orchestrator_cwd(None) def test_auto_resume_recency_window(): fresh, old = uuid4(), uuid4() sessions = [_sess(fresh, age_s=10, cwd="/a"), _sess(old, age_s=_W + 5, cwd="/b")] to_resume, _ = _resume(sessions, []) # no live panes keys = {str(s.uuid) for s in to_resume} assert str(fresh) in keys # recently alive → candidate assert str(old) not in keys # beyond window → ignored (graveyard) def test_auto_resume_supersession_guard(): """A dead session whose cwd already has a LIVE session must NOT be resumed.""" dead, live = uuid4(), uuid4() sessions = [_sess(dead, age_s=30, cwd="/shared"), _sess(live, age_s=5, cwd="/shared")] tmux = [_tmux(str(live))] # `live` has a pane; `dead` does not to_resume, skipped = _resume(sessions, tmux) assert [str(s.uuid) for s in to_resume] == [] assert any(str(s.uuid) == str(dead) and r == "superseded-by-live-session-in-cwd" for s, r in skipped) def test_auto_resume_excludes_orchestrator_workspace(): orch = uuid4() sessions = [_sess(orch, age_s=20, cwd="/home/x/.local/share/claire/orchestrator")] to_resume, skipped = _resume(sessions, []) assert to_resume == [] assert any(r == "orchestrator-workspace" for _, r in skipped) def test_auto_resume_per_session_retry_cap(): capped = uuid4() sessions = [_sess(capped, age_s=15, cwd="/a")] to_resume, skipped = _resume(sessions, [], attempts={str(capped): 3}, max_attempts=3) assert to_resume == [] assert any(r == "retry-cap-reached" for _, r in skipped) def test_auto_resume_per_tick_global_ceiling(): ids = [uuid4() for _ in range(5)] sessions = [_sess(u, age_s=10, cwd=f"/cwd/{i}") for i, u in enumerate(ids)] to_resume, skipped = _resume(sessions, [], max_per_tick=2) assert len(to_resume) == 2 # ceiling enforced assert sum(1 for _, r in skipped if r == "per-tick-ceiling-deferred") == 3 def test_auto_resume_ignores_remote_and_live(): local_dead, remote_dead, local_live = uuid4(), uuid4(), uuid4() sessions = [ _sess(local_dead, age_s=10, cwd="/a"), _sess(remote_dead, host="apricot", age_s=10, cwd="/b"), _sess(local_live, age_s=10, cwd="/c"), ] tmux = [_tmux(str(local_live))] to_resume, _ = _resume(sessions, tmux) keys = {str(s.uuid) for s in to_resume} assert keys == {str(local_dead)} # not remote (not ours), not live