claire/tests/test_pull_reap.py
Natalie c22cae1b74 feat(@projects/@claire): add session reaping logic
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-06-01 01:16:42 -06:00

118 lines
4.6 KiB
Python

"""Reap pass: a dead worker's assignment is deactivated so its task re-surfaces.
When a worker dies its tmux pane vanishes and the liveness pass marks the
session `closed`, but nothing ever flipped the assignment inactive — so the
task kept *looking* assigned and the scheduler never re-offered it. The reap
pass deactivates active assignments whose session is closed-and-stale, gated
on a grace window so a momentarily-misflagged live worker is never reaped.
"""
from __future__ import annotations
from datetime import datetime, timezone
from uuid import UUID
from claire.domain import TaskStatus
from claire.pull import pull
from claire.rclaude import SessionRow, TmuxRow
from claire.web import service
# Working directory stamped onto every fake session row built by `_session`.
_CWD = "/var/home/lilith/Code/@projects/@claire"
# Workspace slug embedded in the fake tmux session name used by the reap test.
# NOTE(review): looks like `_CWD` with the leading "/" dropped and every "/"
# and "@" mapped to "-" — confirm against the real slug derivation.
_SLUG = "var-home-lilith-Code--projects--claire"
class _FakeRclaude:
    """In-memory fake passed to ``pull`` as its ``rclaude`` argument.

    Serves the canned session and tmux rows it was constructed with. Each
    listing call hands back a new list, so callers cannot mutate the stored
    rows through the returned value.
    """

    def __init__(self, sessions: list[SessionRow], tmux: list[TmuxRow]):
        # References to the caller's lists are kept as-is (no copy on the way in).
        self._tmux = tmux
        self._sessions = sessions

    def list_sessions(self) -> list[SessionRow]:
        """Return a fresh list holding the canned session rows."""
        return [*self._sessions]

    def list_tmux(self) -> list[TmuxRow]:
        """Return a fresh list holding the canned tmux rows."""
        return [*self._tmux]

    def triage(self) -> list:
        """Return an empty list; this fake never reports triage items."""
        nothing: list = []
        return nothing
def _session(uuid_hex: str, mtime: int) -> SessionRow:
    """Build a fake session row for the shared test workspace.

    Args:
        uuid_hex: Session UUID in any textual form ``uuid.UUID`` accepts.
        mtime: Value stored as the row's ``mtime_epoch``.

    Returns:
        A ``SessionRow`` on host ``"apricot"`` with ``cwd`` set to ``_CWD``
        and a placeholder snippet.
    """
    session_id = UUID(uuid_hex)
    return SessionRow(
        host="apricot",
        uuid=session_id,
        snippet="x",
        cwd=_CWD,
        mtime_epoch=mtime,
    )
def _active(conn, assignment_id: UUID) -> bool:
    """Return whether the assignment row is currently flagged active.

    Args:
        conn: Database connection whose rows support lookup by column name.
        assignment_id: Id of the assignment to inspect.

    Returns:
        The truthiness of the row's ``active`` column.

    Raises:
        LookupError: If no assignment row with that id exists. Without this
            guard the lookup would die with an opaque ``TypeError`` on
            ``None``, hiding which assignment was missing.
    """
    row = conn.execute(
        "SELECT active FROM assignments WHERE id = ?", (str(assignment_id),)
    ).fetchone()
    if row is None:
        raise LookupError(f"no assignment row with id {assignment_id}")
    return bool(row["active"])
def _setup_in_progress_task(conn, gen) -> UUID:
    """Create project ``proj`` with one task and move that task to in-progress.

    Args:
        conn: Database connection handed through to the service layer.
        gen: Generator object handed through to the service layer.

    Returns:
        The id of the newly created task.
    """
    project_name = "proj"
    service.create_project(conn, gen, name=project_name)
    created = service.add_task(
        conn, gen, project=project_name, title="t", priority=1
    )
    service.transition_task_state(
        conn,
        gen,
        task_id=created.id,
        to_state=TaskStatus.IN_PROGRESS,
    )
    return created.id
def test_reaps_only_closed_and_stale_assignment(conn, gen) -> None:
    """One in-progress task, three workers: only the dead, stale one is reaped.

    The closed-but-fresh session (still inside the grace window) and the
    session that claims the single live pane must both stay active.
    """
    now_epoch = int(datetime.now(timezone.utc).timestamp())
    dead_hex = "11111111-1111-1111-1111-111111111111"  # old → closed + stale
    fresh_closed_hex = "22222222-2222-2222-2222-222222222222"  # closed but within grace
    alive_hex = "33333333-3333-3333-3333-333333333333"  # live pane → alive

    task_id = _setup_in_progress_task(conn, gen)
    # Assignments are created in a fixed order: dead, fresh-closed, alive.
    dead_assignment, fresh_assignment, alive_assignment = [
        service.create_assignment(
            conn, gen, task_id=task_id, session_uuid=UUID(hex_id)
        )
        for hex_id in (dead_hex, fresh_closed_hex, alive_hex)
    ]

    session_rows = [
        _session(dead_hex, 1_700_000_000),  # years old
        _session(fresh_closed_hex, now_epoch - 300),  # 5 min ago — inside the 30-min grace
        _session(alive_hex, now_epoch - 60),  # freshest → claims the one live pane
    ]
    # One live pane at the workspace → only the freshest session is alive.
    live_pane = TmuxRow(
        host="apricot",
        session_name=f"claude-natalie-{_SLUG}-1779320000",
        detail="",
    )
    fleet = _FakeRclaude(sessions=session_rows, tmux=[live_pane])

    stats = pull(conn, gen, rclaude=fleet)

    assert stats.assignments_reaped == 1
    assert _active(conn, dead_assignment.id) is False  # dead + stale → reaped
    assert _active(conn, fresh_assignment.id) is True  # closed but fresh → protected
    assert _active(conn, alive_assignment.id) is True  # alive → protected
def test_reap_is_idempotent(conn, gen) -> None:
    """Pulling twice over an unchanged fleet reaps once, then nothing.

    The first pass deactivates the dead assignment; the second finds it
    already inactive and leaves the count at zero.
    """
    dead_hex = "44444444-4444-4444-4444-444444444444"
    task_id = _setup_in_progress_task(conn, gen)
    service.create_assignment(
        conn, gen, task_id=task_id, session_uuid=UUID(dead_hex)
    )
    # A single years-old session and no tmux panes at all.
    stale_session = _session(dead_hex, 1_700_000_000)
    fleet = _FakeRclaude(sessions=[stale_session], tmux=[])

    # Run both passes before asserting, so the second always executes.
    first_pass = pull(conn, gen, rclaude=fleet)
    second_pass = pull(conn, gen, rclaude=fleet)

    assert first_pass.assignments_reaped == 1
    assert second_pass.assignments_reaped == 0
def test_reaped_task_becomes_dispatchable_again(conn, gen) -> None:
    """Once reaped, the in-progress task is left with no active assignment.

    That is exactly the state the scheduler treats as unassigned open work.
    """
    dead_hex = "55555555-5555-5555-5555-555555555555"
    task_id = _setup_in_progress_task(conn, gen)
    service.create_assignment(
        conn, gen, task_id=task_id, session_uuid=UUID(dead_hex)
    )
    # A single years-old session and no tmux panes at all.
    stale_session = _session(dead_hex, 1_700_000_000)
    fleet = _FakeRclaude(sessions=[stale_session], tmux=[])

    pull(conn, gen, rclaude=fleet)

    still_active = service.read.list_assignments(
        conn, task_id=task_id, active_only=True
    )
    assert still_active == []