diff --git a/src/claire/scheduler.py b/src/claire/scheduler.py index 8bad17c..f745bb1 100644 --- a/src/claire/scheduler.py +++ b/src/claire/scheduler.py @@ -9,6 +9,7 @@ ranked ordering. This makes the logic trivially unit-testable. from __future__ import annotations +import logging import sqlite3 from collections.abc import Iterable from datetime import datetime, timedelta, timezone @@ -19,6 +20,37 @@ from .domain import ProjectStatus, Session, Task, TaskStatus from .hlc import HLC +_log = logging.getLogger(__name__) + + +def unfinished_blockers(conn: sqlite3.Connection, task: Task) -> list[Task]: + """Return the tasks in `task.blocked_by` that are NOT yet `done`. + + A task is "effectively blocked" iff this list is non-empty. A blocker + id that resolves to no existing task is treated as satisfied (it does + NOT block) so a stale/deleted id can't permanently wedge a task — such + ids are logged at WARNING level. + """ + unfinished: list[Task] = [] + for blocker_id in task.blocked_by: + blocker = read.get_task(conn, blocker_id) + if blocker is None: + _log.warning( + "task %s lists blocker %s which no longer exists — " + "treating as satisfied", + task.id, blocker_id, + ) + continue + if blocker.status != TaskStatus.DONE: + unfinished.append(blocker) + return unfinished + + +def is_blocked(conn: sqlite3.Connection, task: Task) -> bool: + """True iff `task` has at least one unfinished blocker.""" + return bool(unfinished_blockers(conn, task)) + + def rank_open_tasks(tasks: Iterable[Task]) -> list[Task]: """Order tasks by (priority asc, created_hlc asc). @@ -285,9 +317,12 @@ def suggest_assignments( assigned_task_ids = {a.task_id for a in active} assigned_session_ids = {a.session_uuid for a in active} - unassigned_tasks = rank_open_tasks( - [t for t in open_tasks if t.id not in assigned_task_ids] - ) + # Exclude effectively-blocked tasks: a task with an unfinished blocker + # is not workable, so Clare must not propose it for pairing. + unassigned_tasks = rank_open_tasks([ + t for t in open_tasks + if t.id not in assigned_task_ids and not is_blocked(conn, t) + ]) free_sessions = sorted( [s for s in read.list_sessions(conn) if s.uuid not in assigned_session_ids], key=lambda s: (-session_attention_score(s), s.host, str(s.uuid)), diff --git a/src/claire/web/service.py b/src/claire/web/service.py index ca0521f..6760682 100644 --- a/src/claire/web/service.py +++ b/src/claire/web/service.py @@ -1326,6 +1326,20 @@ def dispatch_task( if task.status in {TaskStatus.DONE}: return DispatchResult(False, str(task_id), "task already done") + from .. import scheduler as _sched + + # Dependency gate — checked FIRST: being blocked is intrinsic to the + # task, unlike the transient budget/host gates below, so it's the + # truest reason to surface. Refuse a task whose `blocked_by` list + # still has an unfinished blocker — its prerequisites aren't met. + blockers = _sched.unfinished_blockers(conn, task) + if blockers: + titles = ", ".join(b.title for b in blockers) + return DispatchResult( + False, str(task_id), + f"blocked by {len(blockers)} unfinished task(s): {titles}", + ) + # Budget gate. bs = budget_status(conn) if bs["over_cap"]: @@ -1341,7 +1355,6 @@ def dispatch_task( # Host-load gate. cfg = load_or_init() - from .. import scheduler as _sched if not _sched.host_has_capacity(conn, host, per_host_max=cfg.limits.per_host_max): return DispatchResult( False, str(task_id), diff --git a/tests/test_task_deps.py b/tests/test_task_deps.py index 8e7d043..8dc2ac4 100644 --- a/tests/test_task_deps.py +++ b/tests/test_task_deps.py @@ -10,11 +10,15 @@ the projection + read layer. from __future__ import annotations import sqlite3 +import uuid as _uuid +from dataclasses import dataclass +from uuid import UUID import pytest from claire import events as ev -from claire import read +from claire import read, scheduler +from claire.domain import TaskStatus from claire.hlc import HLCGenerator from claire.web import service @@ -200,3 +204,186 @@ def test_list_tasks_returns_blocked_by( assert by_id[task.id].blocked_by == [blocker.id] # A task with no blockers projects to an empty list. assert by_id[blocker.id].blocked_by == [] + + +# --- blocked-check helper -------------------------------------------------- + + +def test_unfinished_blockers_excludes_done( + conn: sqlite3.Connection, gen: HLCGenerator +) -> None: + service.create_project(conn, gen, name="alpha") + task = service.add_task(conn, gen, project="alpha", title="ship it") + b_open = service.add_task(conn, gen, project="alpha", title="open blocker") + b_done = service.add_task(conn, gen, project="alpha", title="done blocker") + service.transition_task_state( + conn, gen, task_id=b_done.id, to_state=TaskStatus.DONE + ) + service.set_task_blockers( + conn, gen, task_ref=str(task.id), + blocker_refs=[str(b_open.id), str(b_done.id)], + ) + + fresh = read.get_task(conn, task.id) + assert fresh is not None + unfinished = scheduler.unfinished_blockers(conn, fresh) + + # Only the still-open blocker counts. + assert [b.id for b in unfinished] == [b_open.id] + assert scheduler.is_blocked(conn, fresh) is True + + +def test_unfinished_blockers_nonexistent_does_not_block( + conn: sqlite3.Connection, gen: HLCGenerator +) -> None: + """A blocker id with no matching task is treated as satisfied.""" + service.create_project(conn, gen, name="alpha") + task = service.add_task(conn, gen, project="alpha", title="ship it") + blocker = service.add_task(conn, gen, project="alpha", title="prereq") + service.set_task_blockers( + conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)] + ) + # Hard-delete the blocker row directly so the id is now stale. The + # service layer never deletes tasks, so this simulates a stale id. + conn.execute("DELETE FROM tasks WHERE id = ?", (str(blocker.id),)) + + fresh = read.get_task(conn, task.id) + assert fresh is not None + assert scheduler.unfinished_blockers(conn, fresh) == [] + assert scheduler.is_blocked(conn, fresh) is False + + +# --- dispatch enforcement -------------------------------------------------- + + +@dataclass +class _FakeSessionRow: + host: str + uuid: UUID + cwd: str + mtime_epoch: int + snippet: str = "" + + +class _FakeRclaude: + """spawn() records the call and makes a new session discoverable.""" + + def __init__(self) -> None: + self._rows: list[_FakeSessionRow] = [] + self.spawn_calls: list[dict] = [] + self.send_calls: list[dict] = [] + + def list_sessions(self) -> list[_FakeSessionRow]: + return list(self._rows) + + def list_tmux(self) -> list: + return [] + + def spawn( + self, *, host: str, cwd: str, + mcp_config: str | None = None, name: str | None = None, + ) -> str: + self.spawn_calls.append( + {"host": host, "cwd": cwd, "mcp_config": mcp_config, "name": name} + ) + self._rows.append(_FakeSessionRow( + host=host, uuid=_uuid.uuid4(), cwd=cwd, mtime_epoch=999, + )) + return f"claude-tester-{len(self.spawn_calls)}" + + def send(self, *, text: str, match: str, yes: bool = False, dry_run: bool = False): # noqa: ARG002 + self.send_calls.append({"text": text, "match": match, "yes": yes}) + return None + + +def _no_mcp_stager(host: str) -> str | None: # noqa: ARG001 + return None + + +def test_dispatch_refuses_blocked_task( + conn: sqlite3.Connection, gen: HLCGenerator +) -> None: + service.create_project(conn, gen, name="alpha") + task = service.add_task(conn, gen, project="alpha", title="ship it") + blocker = service.add_task(conn, gen, project="alpha", title="prereq work") + service.set_task_blockers( + conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)] + ) + rcl = _FakeRclaude() + + result = service.dispatch_task( + conn, gen, task_id=task.id, host="plum", cwd="/work", + rclaude=rcl, discover_timeout_s=2, mcp_stager=_no_mcp_stager, + ) + + assert result.dispatched is False + assert "blocked by 1 unfinished task" in result.reason + assert "prereq work" in result.reason + assert rcl.spawn_calls == [] # never spawned + + +def test_dispatch_allows_task_once_blockers_done( + conn: sqlite3.Connection, gen: HLCGenerator +) -> None: + service.create_project(conn, gen, name="alpha") + task = service.add_task(conn, gen, project="alpha", title="ship it") + blocker = service.add_task(conn, gen, project="alpha", title="prereq work") + service.set_task_blockers( + conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)] + ) + # Complete the blocker. + service.transition_task_state( + conn, gen, task_id=blocker.id, to_state=TaskStatus.DONE + ) + rcl = _FakeRclaude() + + result = service.dispatch_task( + conn, gen, task_id=task.id, host="plum", cwd="/work", + rclaude=rcl, discover_timeout_s=2, mcp_stager=_no_mcp_stager, + ) + + assert result.dispatched is True + assert result.reason == "ok" + assert len(rcl.spawn_calls) == 1 + + +# --- scheduler enforcement ------------------------------------------------- + + +def _alive_session(conn: sqlite3.Connection, gen: HLCGenerator, host: str) -> UUID: + sid = _uuid.uuid4() + ev.append(conn, gen, ev.SessionObserved(session_uuid=sid, host=host, cwd="/w")) + conn.execute("UPDATE sessions SET liveness = 'alive' WHERE uuid = ?", (str(sid),)) + return sid + + +def test_suggest_assignments_excludes_blocked_task( + conn: sqlite3.Connection, gen: HLCGenerator +) -> None: + service.create_project(conn, gen, name="alpha") + task = service.add_task(conn, gen, project="alpha", title="ship it") + blocker = service.add_task(conn, gen, project="alpha", title="prereq work") + service.set_task_blockers( + conn, gen, task_ref=str(task.id), blocker_refs=[str(blocker.id)] + ) + _alive_session(conn, gen, "plum") + + # While the blocker is open, the blocked task must not be paired or + # surfaced as a remaining (workable) task. The blocker itself is open + # and unassigned, so it CAN be surfaced. + out = scheduler.suggest_assignments(conn) + surfaced = {p["task_id"] for p in out["pairings"]} | { + t["id"] for t in out["remaining_tasks"] + } + assert str(task.id) not in surfaced + assert str(blocker.id) in surfaced + + # Once the blocker is done, the previously-blocked task becomes eligible. + service.transition_task_state( + conn, gen, task_id=blocker.id, to_state=TaskStatus.DONE + ) + out2 = scheduler.suggest_assignments(conn) + surfaced2 = {p["task_id"] for p in out2["pairings"]} | { + t["id"] for t in out2["remaining_tasks"] + } + assert str(task.id) in surfaced2