claire/tests/test_orchestrator_supervisor.py
Natalie 9f9c1f0cd5 feat(@projects/@clare): enhance orchestrator heartbeat with periodic checks
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-05-20 03:15:55 -07:00

154 lines
5.4 KiB
Python

"""Supervisor tests: `clare web` brings up the orchestrator on startup."""
from __future__ import annotations
import uuid as _uuid
from dataclasses import dataclass
from pathlib import Path
from uuid import UUID
import pytest
from clare.orchestrator import bootstrap
@dataclass
class _FakeSessionRow:
    """Stand-in for one row returned by ``_FakeRclaude.list_sessions``.

    NOTE(review): presumably mirrors the row type of the production rclaude
    client -- confirm against ``clare.rclaude`` when that type changes.
    """

    # Host label; every test in this file uses "local".
    host: str
    # Session identifier; tests compare ``str(uuid)`` with the value
    # returned by ``bootstrap.ensure_running``.
    uuid: UUID
    # Working directory of the session, as a string path.
    cwd: str
    # Presumably the modification time in epoch seconds -- TODO confirm.
    mtime_epoch: int
    # Optional free-form text; no test in this file sets it.
    snippet: str = ""
@dataclass
class _FakeTmuxRow:
    """Stand-in for one row returned by ``_FakeRclaude.list_tmux``."""

    # Host label; every test in this file uses "local".
    host: str
    # tmux session name; the "already alive" test embeds the cwd slug in it.
    session_name: str
    # Optional free-form text; no test in this file sets it.
    detail: str = ""
class _FakeRclaude:
    """Test double passed as ``rclaude=`` to ``bootstrap.ensure_running``.

    Captures ``spawn`` and ``send`` calls and feeds ``list_sessions`` a
    scripted response: the "initial" rows until ``spawn`` has been called,
    the "post-spawn" rows afterwards. ``list_tmux`` returns whatever rows
    were installed with ``with_tmux_rows`` (empty by default).
    """

    def __init__(self, *, rows_after_spawn: list[_FakeSessionRow] | None = None) -> None:
        """Create the fake.

        Args:
            rows_after_spawn: rows ``list_sessions`` returns once ``spawn``
                has been called; ``None`` (or an empty list) means the new
                session is never reported.
        """
        self._initial_rows: list[_FakeSessionRow] = []
        self._post_spawn_rows: list[_FakeSessionRow] = rows_after_spawn or []
        self._tmux_rows: list[_FakeTmuxRow] = []
        # One dict per spawn() call, in call order.
        self.spawn_calls: list[dict[str, str]] = []
        # One dict per send() call, in call order.
        self.send_calls: list[dict[str, object]] = []
        self._spawned = False

    def list_sessions(self) -> list[_FakeSessionRow]:
        """Return the post-spawn rows after a spawn, the initial rows before."""
        return self._post_spawn_rows if self._spawned else self._initial_rows

    def list_tmux(self) -> list[_FakeTmuxRow]:
        """Mirror the production Rclaude.list_tmux — defaults to empty."""
        return self._tmux_rows

    def with_tmux_rows(self, rows: list[_FakeTmuxRow]) -> "_FakeRclaude":
        """Install the rows ``list_tmux`` returns; returns ``self`` for chaining."""
        self._tmux_rows = rows
        return self

    def spawn(self, *, host: str, cwd: str, mcp_config: str | None = None) -> str:
        """Record the call, flip to the post-spawn rows, return a fake tmux name.

        A missing ``mcp_config`` is recorded as ``""`` so every recorded
        value is a string. The returned name is ``claude-tester-<n>`` where
        ``n`` is the number of spawn calls so far.
        """
        self.spawn_calls.append({"host": host, "cwd": cwd, "mcp_config": mcp_config or ""})
        self._spawned = True
        return f"claude-tester-{len(self.spawn_calls)}"

    # Bootstrap also calls `.send()` to kick the new session so Claude
    # flushes its JSONL — fake it as a recorded no-op.
    def send(self, *, text: str, match: str, yes: bool = False, dry_run: bool = False) -> None:
        """Record the call in ``send_calls`` and do nothing else."""
        self.send_calls.append(
            {"text": text, "match": match, "yes": yes, "dry_run": dry_run}
        )
        return None

    def with_initial_rows(self, rows: list[_FakeSessionRow]) -> "_FakeRclaude":
        """Install the rows ``list_sessions`` returns before any spawn; returns ``self``."""
        self._initial_rows = rows
        return self
@pytest.fixture
def isolated_cfg(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
    """Point the XDG config and data homes at per-test temporary directories.

    Returns ``<tmp_path>/config/clare/clare.toml``. The file itself is not
    created here -- presumably it is where clare keeps its config once
    ``XDG_CONFIG_HOME`` is redirected (verify against ``clare.config``).
    """
    config_home = tmp_path / "config"
    data_home = tmp_path / "data"
    monkeypatch.setenv("XDG_CONFIG_HOME", str(config_home))
    monkeypatch.setenv("XDG_DATA_HOME", str(data_home))
    return config_home / "clare" / "clare.toml"
def test_ensure_running_spawns_when_unconfigured(isolated_cfg: Path) -> None:
    """With no stored session UUID, one spawn happens and its UUID is persisted."""
    fresh_uuid = _uuid.uuid4()
    orchestrator_dir = (Path.home() / ".local/share/clare/orchestrator").resolve()
    # The row the fake reports only after spawn() has been called.
    discovered = _FakeSessionRow(
        host="local",
        uuid=fresh_uuid,
        cwd=str(orchestrator_dir),
        mtime_epoch=200,
    )
    fake = _FakeRclaude(rows_after_spawn=[discovered])

    returned = bootstrap.ensure_running(rclaude=fake, discover_timeout_s=2)

    assert returned == str(fresh_uuid)
    assert len(fake.spawn_calls) == 1

    # The discovered UUID must also have been written to the config.
    from clare.config import load_or_init

    assert load_or_init().orchestrator.session_uuid == str(fresh_uuid)
def test_ensure_running_noop_when_session_already_alive(isolated_cfg: Path) -> None:
    """A live tmux pane at the configured cwd is the source of truth.

    The on-disk JSONL alone is no longer sufficient evidence: apricot tmux
    can crash and leave the JSONL behind, and the supervisor has to notice
    that and respawn. Liveness is therefore checked against `list_tmux()`.
    """
    stored_uuid = _uuid.uuid4()
    bootstrap.write_session_uuid(str(stored_uuid))

    # Build a tmux session name that contains the slug of the default cwd
    # (something like `Users-natalie--local-share-clare-orchestrator`), which
    # is the substring the liveness check is expected to look for.
    from clare.orchestrator.bootstrap import _DEFAULT_CWD, _cwd_slug

    default_cwd = _DEFAULT_CWD.expanduser().resolve()
    cwd_slug = _cwd_slug(str(default_cwd))
    live_pane = _FakeTmuxRow(host="local", session_name=f"claude-tester-{cwd_slug}-1")
    fake = _FakeRclaude().with_tmux_rows([live_pane])

    returned = bootstrap.ensure_running(rclaude=fake, discover_timeout_s=1)

    assert returned == str(stored_uuid)
    # Already alive, so nothing may have been spawned.
    assert fake.spawn_calls == []
def test_ensure_running_respawns_when_session_died(isolated_cfg: Path) -> None:
    """A stored UUID with nothing live behind it triggers a respawn and a config update."""
    stale_uuid = _uuid.uuid4()
    bootstrap.write_session_uuid(str(stale_uuid))

    replacement_uuid = _uuid.uuid4()
    orchestrator_dir = (Path.home() / ".local/share/clare/orchestrator").resolve()
    replacement_row = _FakeSessionRow(
        host="local",
        uuid=replacement_uuid,
        cwd=str(orchestrator_dir),
        mtime_epoch=300,
    )
    # The fake reports no tmux rows, and list_sessions stays empty until
    # spawn() is called, so the stored session looks dead.
    fake = _FakeRclaude(rows_after_spawn=[replacement_row])

    returned = bootstrap.ensure_running(rclaude=fake, discover_timeout_s=2)

    assert returned == str(replacement_uuid)
    assert len(fake.spawn_calls) == 1

    # The stale UUID in the config must have been replaced.
    from clare.config import load_or_init

    assert load_or_init().orchestrator.session_uuid == str(replacement_uuid)
def test_ensure_running_returns_none_when_discovery_times_out(
    isolated_cfg: Path,
) -> None:
    """If the spawned session never shows up in list_sessions, the result is None."""
    # spawn() succeeds, but the fake keeps reporting no sessions afterwards.
    fake = _FakeRclaude(rows_after_spawn=[])

    outcome = bootstrap.ensure_running(
        rclaude=fake,
        discover_timeout_s=0.1,  # type: ignore[arg-type]
    )

    assert outcome is None
    assert len(fake.spawn_calls) == 1
def test_ensure_running_handles_rclaude_failure(isolated_cfg: Path) -> None:
    """A RclaudeError raised by spawn() yields None instead of propagating."""
    from clare.rclaude import RclaudeError

    class _FailingRcl(_FakeRclaude):
        """Fake whose spawn() always fails."""

        def spawn(self, *, host: str, cwd: str, mcp_config: str | None = None) -> str:
            raise RclaudeError("rclaude not on PATH")

    broken = _FailingRcl()

    outcome = bootstrap.ensure_running(rclaude=broken, discover_timeout_s=1)

    assert outcome is None