"""Reap pass: a dead worker's assignment is deactivated so its task re-surfaces.

When a worker dies its tmux pane vanishes and the liveness pass marks the
session `closed`, but nothing ever flipped the assignment inactive — so the
task kept *looking* assigned and the scheduler never re-offered it. The reap
pass deactivates active assignments whose session is closed-and-stale, gated
on a grace window so a momentarily-misflagged live worker is never reaped.
"""

from __future__ import annotations

from datetime import datetime, timezone
from uuid import UUID

from claire.domain import TaskStatus
from claire.pull import pull
from claire.rclaude import SessionRow, TmuxRow
from claire.web import service

_CWD = "/var/home/lilith/Code/@projects/@claire"
_SLUG = "var-home-lilith-Code--projects--claire"


class _FakeRclaude:
    """Canned stand-in passed to ``pull`` as its ``rclaude`` argument.

    It serves the session and tmux rows given at construction time and never
    reports any triage entries.
    """

    def __init__(self, sessions: list[SessionRow], tmux: list[TmuxRow]):
        self._sessions = sessions
        self._tmux = tmux

    def list_sessions(self) -> list[SessionRow]:
        """Return a fresh list holding the configured session rows."""
        return [*self._sessions]

    def list_tmux(self) -> list[TmuxRow]:
        """Return a fresh list holding the configured tmux rows."""
        return [*self._tmux]

    def triage(self) -> list:
        """Return an empty list: the fake has nothing to triage."""
        return []


def _session(uuid_hex: str, mtime: int) -> SessionRow:
    """Build a ``SessionRow`` on host "apricot" at ``_CWD`` with the given id and mtime."""
    session_id = UUID(uuid_hex)
    return SessionRow(
        host="apricot",
        uuid=session_id,
        snippet="x",
        cwd=_CWD,
        mtime_epoch=mtime,
    )


def _active(conn, assignment_id: UUID) -> bool:
    """Read the ``active`` column of one assignment row and coerce it to ``bool``."""
    query = "SELECT active FROM assignments WHERE id = ?"
    params = (str(assignment_id),)
    record = conn.execute(query, params).fetchone()
    return bool(record["active"])


def _setup_in_progress_task(conn, gen) -> UUID:
    """Create project "proj" with one task, move it to IN_PROGRESS, return the task id."""
    service.create_project(conn, gen, name="proj")
    created = service.add_task(conn, gen, project="proj", title="t", priority=1)
    service.transition_task_state(
        conn,
        gen,
        task_id=created.id,
        to_state=TaskStatus.IN_PROGRESS,
    )
    return created.id


def test_reaps_only_closed_and_stale_assignment(conn, gen) -> None:
    """Of three workers on one in-progress task, only the dead-and-stale one is reaped.

    The closed-but-fresh worker and the live worker must both keep their
    assignments active.
    """
    utc_now = datetime.now(timezone.utc)
    now = int(utc_now.timestamp())

    dead = "11111111-1111-1111-1111-111111111111"  # old → closed + stale
    fresh_closed = "22222222-2222-2222-2222-222222222222"  # closed, still within grace
    alive = "33333333-3333-3333-3333-333333333333"  # owns the live pane → alive

    task_id = _setup_in_progress_task(conn, gen)

    # Assignments are created lazily by the unpacking, in dead / fresh / alive order.
    a_dead, a_fresh, a_alive = (
        service.create_assignment(conn, gen, task_id=task_id, session_uuid=UUID(hex_id))
        for hex_id in (dead, fresh_closed, alive)
    )

    # A single live pane at the workspace, so only the freshest session is alive.
    live_pane = TmuxRow(
        host="apricot",
        session_name=f"claude-natalie-{_SLUG}-1779320000",
        detail="",
    )
    sessions = [
        _session(dead, 1_700_000_000),  # years old
        _session(fresh_closed, now - 300),  # 5 min ago — inside the 30-min grace
        _session(alive, now - 60),  # freshest → claims the one live pane
    ]
    fake = _FakeRclaude(sessions=sessions, tmux=[live_pane])

    stats = pull(conn, gen, rclaude=fake)

    assert stats.assignments_reaped == 1
    assert not _active(conn, a_dead.id)  # dead + stale → reaped
    assert _active(conn, a_fresh.id)  # closed but fresh → protected
    assert _active(conn, a_alive.id)  # alive → protected


def test_reap_is_idempotent(conn, gen) -> None:
    """Pulling twice over an unchanged fleet reaps once, then nothing.

    After the first pull the dead assignment is already inactive, so the
    second pull has no work left.
    """
    dead = "44444444-4444-4444-4444-444444444444"
    task_id = _setup_in_progress_task(conn, gen)
    service.create_assignment(conn, gen, task_id=task_id, session_uuid=UUID(dead))

    stale_only = [_session(dead, 1_700_000_000)]
    fake = _FakeRclaude(sessions=stale_only, tmux=[])

    initial = pull(conn, gen, rclaude=fake)
    repeat = pull(conn, gen, rclaude=fake)

    assert initial.assignments_reaped == 1
    assert repeat.assignments_reaped == 0


def test_reaped_task_becomes_dispatchable_again(conn, gen) -> None:
    """Once reaped, the in-progress task is left with no active assignment.

    That is exactly the state the scheduler treats as unassigned open work.
    """
    dead = "55555555-5555-5555-5555-555555555555"
    task_id = _setup_in_progress_task(conn, gen)
    service.create_assignment(conn, gen, task_id=task_id, session_uuid=UUID(dead))

    stale_only = [_session(dead, 1_700_000_000)]
    fake = _FakeRclaude(sessions=stale_only, tmux=[])
    pull(conn, gen, rclaude=fake)

    remaining = service.read.list_assignments(conn, task_id=task_id, active_only=True)
    assert remaining == []