claire/tests/test_task_deps.py
Natalie c78ca31fa8 feat(@projects/@claire): add blocker status checks in scheduler
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-05-21 23:32:16 -07:00

389 lines
13 KiB
Python

"""Tests for set_task_description / set_task_blockers — event-sourced.
Both mutations go through the `TaskUpdated` event (`description` /
`blocked_by`) so a `replay()` reconstructs the new state; a direct
projection UPDATE would be lost. These tests assert the projection changed,
an event was logged, and the structured `blocked_by` round-trips through
the projection + read layer.
"""
from __future__ import annotations
import sqlite3
import uuid as _uuid
from dataclasses import dataclass
from uuid import UUID
import pytest
from claire import events as ev
from claire import read, scheduler
from claire.domain import TaskStatus
from claire.hlc import HLCGenerator
from claire.web import service
def _event_count(conn: sqlite3.Connection) -> int:
return conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
def _task_description(conn: sqlite3.Connection, task_id: str) -> str | None:
return conn.execute(
"SELECT description FROM tasks WHERE id = ?", (task_id,)
).fetchone()["description"]
# --- set_task_description --------------------------------------------------
def test_set_task_description(conn: sqlite3.Connection, gen: HLCGenerator) -> None:
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
before = _event_count(conn)
updated = service.set_task_description(
conn, gen, task_ref=str(task.id), description="now with details"
)
assert updated.id == task.id
assert updated.description == "now with details"
assert _event_count(conn) == before + 1
assert _task_description(conn, str(task.id)) == "now with details"
def test_set_task_description_strips(conn: sqlite3.Connection, gen: HLCGenerator) -> None:
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
updated = service.set_task_description(
conn, gen, task_ref=str(task.id), description=" trimmed "
)
assert updated.description == "trimmed"
def test_set_task_description_allows_empty(
conn: sqlite3.Connection, gen: HLCGenerator
) -> None:
service.create_project(conn, gen, name="alpha")
task = service.add_task(
conn, gen, project="alpha", title="ship it", description="old text"
)
updated = service.set_task_description(conn, gen, task_ref=str(task.id), description="")
assert updated.description == ""
def test_set_task_description_unknown_task(
conn: sqlite3.Connection, gen: HLCGenerator
) -> None:
with pytest.raises(service.NotFound):
service.set_task_description(
conn, gen,
task_ref="00000000-0000-0000-0000-000000000000",
description="x",
)
# --- set_task_blockers -----------------------------------------------------
def test_set_task_blockers_by_id(conn: sqlite3.Connection, gen: HLCGenerator) -> None:
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
b1 = service.add_task(conn, gen, project="alpha", title="blocker one")
b2 = service.add_task(conn, gen, project="alpha", title="blocker two")
before = _event_count(conn)
updated = service.set_task_blockers(
conn, gen, task_ref=str(task.id), blocker_refs=[str(b1.id), str(b2.id)]
)
assert updated.id == task.id
assert set(updated.blocked_by) == {b1.id, b2.id}
assert _event_count(conn) == before + 1
# Round-trips through the read layer.
reread = read.get_task(conn, task.id)
assert reread is not None
assert set(reread.blocked_by) == {b1.id, b2.id}
def test_set_task_blockers_by_name_prefix(
conn: sqlite3.Connection, gen: HLCGenerator
) -> None:
"""Blockers resolve via task uuid prefixes, like move_task's task_ref."""
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
blocker = service.add_task(conn, gen, project="alpha", title="prereq")
updated = service.set_task_blockers(
conn, gen, task_ref=str(task.id)[:8], blocker_refs=[str(blocker.id)[:8]]
)
assert updated.blocked_by == [blocker.id]
def test_set_task_blockers_clears_with_empty_list(
conn: sqlite3.Connection, gen: HLCGenerator
) -> None:
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
blocker = service.add_task(conn, gen, project="alpha", title="prereq")
service.set_task_blockers(
conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)]
)
cleared = service.set_task_blockers(
conn, gen, task_ref=str(task.id), blocker_refs=[]
)
assert cleared.blocked_by == []
reread = read.get_task(conn, task.id)
assert reread is not None
assert reread.blocked_by == []
def test_set_task_blockers_rejects_self_block(
conn: sqlite3.Connection, gen: HLCGenerator
) -> None:
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
with pytest.raises(service.InvalidInput):
service.set_task_blockers(
conn, gen, task_ref=str(task.id), blocker_refs=[str(task.id)]
)
def test_set_task_blockers_rejects_unknown_blocker(
conn: sqlite3.Connection, gen: HLCGenerator
) -> None:
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
with pytest.raises(service.NotFound):
service.set_task_blockers(
conn, gen,
task_ref=str(task.id),
blocker_refs=["00000000-0000-0000-0000-000000000000"],
)
def test_blocked_by_survives_replay(
conn: sqlite3.Connection, gen: HLCGenerator
) -> None:
"""The structured dependency must be rebuilt from the event log."""
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
blocker = service.add_task(conn, gen, project="alpha", title="prereq")
service.set_task_blockers(
conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)]
)
ev.replay(conn)
reread = read.get_task(conn, task.id)
assert reread is not None
assert reread.blocked_by == [blocker.id]
def test_list_tasks_returns_blocked_by(
conn: sqlite3.Connection, gen: HLCGenerator
) -> None:
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
blocker = service.add_task(conn, gen, project="alpha", title="prereq")
service.set_task_blockers(
conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)]
)
tasks = service.list_tasks(conn, project="alpha")
by_id = {t.id: t for t in tasks}
assert by_id[task.id].blocked_by == [blocker.id]
# A task with no blockers projects to an empty list.
assert by_id[blocker.id].blocked_by == []
# --- blocked-check helper --------------------------------------------------
def test_unfinished_blockers_excludes_done(
conn: sqlite3.Connection, gen: HLCGenerator
) -> None:
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
b_open = service.add_task(conn, gen, project="alpha", title="open blocker")
b_done = service.add_task(conn, gen, project="alpha", title="done blocker")
service.transition_task_state(
conn, gen, task_id=b_done.id, to_state=TaskStatus.DONE
)
service.set_task_blockers(
conn, gen, task_ref=str(task.id),
blocker_refs=[str(b_open.id), str(b_done.id)],
)
fresh = read.get_task(conn, task.id)
assert fresh is not None
unfinished = scheduler.unfinished_blockers(conn, fresh)
# Only the still-open blocker counts.
assert [b.id for b in unfinished] == [b_open.id]
assert scheduler.is_blocked(conn, fresh) is True
def test_unfinished_blockers_nonexistent_does_not_block(
conn: sqlite3.Connection, gen: HLCGenerator
) -> None:
"""A blocker id with no matching task is treated as satisfied."""
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
blocker = service.add_task(conn, gen, project="alpha", title="prereq")
service.set_task_blockers(
conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)]
)
# Hard-delete the blocker row directly so the id is now stale. The
# service layer never deletes tasks, so this simulates a stale id.
conn.execute("DELETE FROM tasks WHERE id = ?", (str(blocker.id),))
fresh = read.get_task(conn, task.id)
assert fresh is not None
assert scheduler.unfinished_blockers(conn, fresh) == []
assert scheduler.is_blocked(conn, fresh) is False
# --- dispatch enforcement --------------------------------------------------
@dataclass
class _FakeSessionRow:
host: str
uuid: UUID
cwd: str
mtime_epoch: int
snippet: str = ""
class _FakeRclaude:
"""spawn() records the call and makes a new session discoverable."""
def __init__(self) -> None:
self._rows: list[_FakeSessionRow] = []
self.spawn_calls: list[dict] = []
self.send_calls: list[dict] = []
def list_sessions(self) -> list[_FakeSessionRow]:
return list(self._rows)
def list_tmux(self) -> list:
return []
def spawn(
self, *, host: str, cwd: str,
mcp_config: str | None = None, name: str | None = None,
) -> str:
self.spawn_calls.append(
{"host": host, "cwd": cwd, "mcp_config": mcp_config, "name": name}
)
self._rows.append(_FakeSessionRow(
host=host, uuid=_uuid.uuid4(), cwd=cwd, mtime_epoch=999,
))
return f"claude-tester-{len(self.spawn_calls)}"
def send(self, *, text: str, match: str, yes: bool = False, dry_run: bool = False): # noqa: ARG002
self.send_calls.append({"text": text, "match": match, "yes": yes})
return None
def _no_mcp_stager(host: str) -> str | None: # noqa: ARG001
return None
def test_dispatch_refuses_blocked_task(
conn: sqlite3.Connection, gen: HLCGenerator
) -> None:
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
blocker = service.add_task(conn, gen, project="alpha", title="prereq work")
service.set_task_blockers(
conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)]
)
rcl = _FakeRclaude()
result = service.dispatch_task(
conn, gen, task_id=task.id, host="plum", cwd="/work",
rclaude=rcl, discover_timeout_s=2, mcp_stager=_no_mcp_stager,
)
assert result.dispatched is False
assert "blocked by 1 unfinished task" in result.reason
assert "prereq work" in result.reason
assert rcl.spawn_calls == [] # never spawned
def test_dispatch_allows_task_once_blockers_done(
conn: sqlite3.Connection, gen: HLCGenerator
) -> None:
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
blocker = service.add_task(conn, gen, project="alpha", title="prereq work")
service.set_task_blockers(
conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)]
)
# Complete the blocker.
service.transition_task_state(
conn, gen, task_id=blocker.id, to_state=TaskStatus.DONE
)
rcl = _FakeRclaude()
result = service.dispatch_task(
conn, gen, task_id=task.id, host="plum", cwd="/work",
rclaude=rcl, discover_timeout_s=2, mcp_stager=_no_mcp_stager,
)
assert result.dispatched is True
assert result.reason == "ok"
assert len(rcl.spawn_calls) == 1
# --- scheduler enforcement -------------------------------------------------
def _alive_session(conn: sqlite3.Connection, gen: HLCGenerator, host: str) -> UUID:
sid = _uuid.uuid4()
ev.append(conn, gen, ev.SessionObserved(session_uuid=sid, host=host, cwd="/w"))
conn.execute("UPDATE sessions SET liveness = 'alive' WHERE uuid = ?", (str(sid),))
return sid
def test_suggest_assignments_excludes_blocked_task(
conn: sqlite3.Connection, gen: HLCGenerator
) -> None:
service.create_project(conn, gen, name="alpha")
task = service.add_task(conn, gen, project="alpha", title="ship it")
blocker = service.add_task(conn, gen, project="alpha", title="prereq work")
service.set_task_blockers(
conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)]
)
_alive_session(conn, gen, "plum")
# While the blocker is open, the blocked task must not be paired or
# surfaced as a remaining (workable) task. The blocker itself is open
# and unassigned, so it CAN be surfaced.
out = scheduler.suggest_assignments(conn)
surfaced = {p["task_id"] for p in out["pairings"]} | {
t["id"] for t in out["remaining_tasks"]
}
assert str(task.id) not in surfaced
assert str(blocker.id) in surfaced
# Once the blocker is done, the previously-blocked task becomes eligible.
service.transition_task_state(
conn, gen, task_id=blocker.id, to_state=TaskStatus.DONE
)
out2 = scheduler.suggest_assignments(conn)
surfaced2 = {p["task_id"] for p in out2["pairings"]} | {
t["id"] for t in out2["remaining_tasks"]
}
assert str(task.id) in surfaced2