diff --git a/src/clare/pull.py b/src/clare/pull.py index 2148578..517d388 100644 --- a/src/clare/pull.py +++ b/src/clare/pull.py @@ -1,14 +1,16 @@ -"""Pull loop — refresh fleet view from rclaude/_claude-projects → events. +"""Pull loop — refresh fleet view from rclaude → events. Each `pull()` invocation: - 1. Enumerates local claude sessions via `_claude-projects --sessions`. - 2. Emits a `SessionObserved` event for each — projection updaters handle - UPSERT so re-running is idempotent (a session already known with the - same data results in a no-op event-row but a SQL UPSERT that's a noop). + 1. Enumerates Claude sessions across every host rclaude knows about + (`rclaude list sessions --tsv`) and emits `SessionObserved` events + for any (uuid, host, mtime, cwd) tuple that differs from what's + already projected in the `sessions` table. + 2. Pulls triage signal (`rclaude triage --tsv`) and emits + `TriageRecorded` events for any session whose (priority, status, + summary, next_action) differs from the projection. -Multi-host enumeration is deferred until rclaude exposes a `--tsv` mode for -`list sessions`. The HLC + event substrate is in place so adding more event -sources later doesn't require schema changes. +Both passes are idempotent — re-running `clare pull` against an +unchanged fleet results in zero new events because nothing diffs. """ from __future__ import annotations @@ -16,6 +18,7 @@ from __future__ import annotations import sqlite3 from dataclasses import dataclass from datetime import datetime, timezone +from uuid import UUID from . import events from .hlc import HLCGenerator @@ -25,9 +28,45 @@ from .rclaude import Rclaude, RclaudeError @dataclass(frozen=True) class PullStats: sessions_observed: int + triage_recorded: int errors: list[str] +# --------------------------------------------------------------------------- +# Projection snapshots (used purely for diff/idempotency — never written from) +# --------------------------------------------------------------------------- + + +def _session_snapshot( + conn: sqlite3.Connection, +) -> dict[UUID, tuple[str, str | None, str | None]]: + """Return {uuid: (host, cwd, last_seen_mtime_iso)} from the sessions table.""" + rows = conn.execute( + "SELECT uuid, host, cwd, last_seen_mtime FROM sessions" + ).fetchall() + return {UUID(r["uuid"]): (r["host"], r["cwd"], r["last_seen_mtime"]) for r in rows} + + +def _triage_snapshot( + conn: sqlite3.Connection, +) -> dict[UUID, tuple[int | None, str | None, str | None]]: + """Return {uuid: (priority, status, summary)} from the sessions table.""" + rows = conn.execute( + """ + SELECT uuid, last_triage_priority, last_triage_status, last_triage_summary + FROM sessions + """ + ).fetchall() + return { + UUID(r["uuid"]): ( + r["last_triage_priority"], + r["last_triage_status"], + r["last_triage_summary"], + ) + for r in rows + } + + def pull( conn: sqlite3.Connection, generator: HLCGenerator, @@ -37,15 +76,22 @@ def pull( rclaude = rclaude or Rclaude() errors: list[str] = [] sessions_observed = 0 + triage_recorded = 0 + # --- Sessions --------------------------------------------------------- try: - rows = rclaude.local_sessions() + session_rows = rclaude.list_sessions() except RclaudeError as exc: - errors.append(f"local_sessions: {exc}") - rows = [] + errors.append(f"list_sessions: {exc}") + session_rows = [] - for row in rows: + seen_snapshot = _session_snapshot(conn) + for row in session_rows: mtime = datetime.fromtimestamp(row.mtime_epoch, tz=timezone.utc) + mtime_iso = mtime.isoformat() + prev = seen_snapshot.get(row.uuid) + if prev is not None and prev == (row.host, row.cwd, mtime_iso): + continue events.append( conn, generator, @@ -59,4 +105,33 @@ def pull( ) sessions_observed += 1 - return PullStats(sessions_observed=sessions_observed, errors=errors) + # --- Triage ----------------------------------------------------------- + try: + triage_rows = rclaude.triage() + except RclaudeError as exc: + errors.append(f"triage: {exc}") + triage_rows = [] + + triage_snap = _triage_snapshot(conn) + for trow in triage_rows: + prev_t = triage_snap.get(trow.uuid) + if prev_t is not None and prev_t == (trow.priority, trow.status, trow.summary): + continue + events.append( + conn, + generator, + events.TriageRecorded( + session_uuid=trow.uuid, + priority=trow.priority, + status=trow.status, + summary=trow.summary, + next_action=trow.next_action or None, + ), + ) + triage_recorded += 1 + + return PullStats( + sessions_observed=sessions_observed, + triage_recorded=triage_recorded, + errors=errors, + ) diff --git a/src/clare/rclaude.py b/src/clare/rclaude.py index 98e4758..c43e6d5 100644 --- a/src/clare/rclaude.py +++ b/src/clare/rclaude.py @@ -14,11 +14,9 @@ signature. from __future__ import annotations import shutil -import socket import subprocess from collections.abc import Callable, Sequence from dataclasses import dataclass -from pathlib import Path from typing import Any from uuid import UUID @@ -116,80 +114,65 @@ class Rclaude: return self._run(["--version"]).strip() def list_tmux(self) -> list[TmuxRow]: - # `rclaude list tmux` prints a header row, then `host\tkind\tname\tdetail` - # after column-aligned formatting. To get raw TSV we'd need rclaude to - # expose a --tsv mode; in its absence we parse the column-aligned form. - # Header lines: "HOST KIND SESSION/CWD/UUID DETAIL" - raw = self._run(["list", "tmux"]) + """Cross-host live tmux sessions via `rclaude list tmux --tsv`. + + Raw TSV row shape: `host\\ttmux\\tsession_name\\tdetail`. + """ + raw = self._run(["list", "tmux", "--tsv"]) rows: list[TmuxRow] = [] for line in raw.splitlines(): - if not line.strip() or line.startswith("HOST"): + if not line.strip(): continue - # Split into at most 4 whitespace-delimited columns. rclaude uses - # at-least-two-space gaps for column alignment. - parts = [p for p in line.split(" ") if p.strip()] - if len(parts) < 3: + parts = line.split("\t") + if len(parts) < 3 or parts[1] != "tmux": continue - host = parts[0].strip() - kind = parts[1].strip() - if kind != "tmux": - continue - session = parts[2].strip() - detail = parts[3].strip() if len(parts) >= 4 else "" - rows.append(TmuxRow(host=host, session_name=session, detail=detail)) + rows.append( + TmuxRow( + host=parts[0], + session_name=parts[2], + detail=parts[3] if len(parts) >= 4 else "", + ) + ) return rows - def local_sessions(self, *, helper_path: Path | None = None) -> list[SessionRow]: - """Enumerate local claude sessions via `_claude-projects --sessions`. + def list_sessions(self) -> list[SessionRow]: + """Cross-host on-disk Claude sessions via `rclaude list sessions --tsv`. - Multi-host enumeration is deferred until rclaude grows a `--tsv` flag; - for Push A we work off the local machine's `~/.claude/projects/` only. - Output TSV: `mtime_epoch\\tuuid\\tcwd\\tsnippet`. + Raw TSV row shape: `host\\tsession\\tuuid\\tsnippet\\tcwd\\tmtime_epoch`. + Rows with kind != "session" (e.g. tmux header rows that rclaude + interleaves under `list sessions`) are skipped. """ - if helper_path is None: - # Live next to rclaude under session-tools/bin/. - helper_path = Path.home() / "Code" / "@scripts" / "session-tools" / "bin" / "_claude-projects" - if not helper_path.exists(): - raise RclaudeError(f"_claude-projects helper not found at {helper_path}") - try: - result = self._runner( - [str(helper_path), "--sessions"], - capture_output=True, - text=True, - check=False, - timeout=30, - ) - except subprocess.TimeoutExpired as exc: - raise RclaudeError("_claude-projects --sessions: timed out") from exc - if result.returncode != 0: - raise RclaudeError( - f"_claude-projects --sessions exited {result.returncode}: " - f"{(result.stderr or '').strip()}" - ) - host = socket.gethostname().split(".")[0] or "local" + raw = self._run(["list", "sessions", "--tsv"]) rows: list[SessionRow] = [] - for line in result.stdout.splitlines(): + for line in raw.splitlines(): + if not line.strip(): + continue parts = line.split("\t") - if len(parts) < 4: + if len(parts) < 6 or parts[1] != "session": continue try: - mtime = int(parts[0]) - uuid_val = UUID(parts[1]) + uuid_val = UUID(parts[2]) + mtime = int(parts[5]) except (ValueError, IndexError): continue rows.append( SessionRow( - host=host, + host=parts[0], uuid=uuid_val, snippet=parts[3], - cwd=parts[2], + cwd=parts[4], mtime_epoch=mtime, ) ) return rows def triage(self, *, refresh: bool = False, limit: int | None = None) -> list[TriageRow]: - args = ["triage"] + """Cross-host triage via `rclaude triage --tsv`. + + Raw TSV row shape: + `host\\ttriage\\tuuid\\tpriority\\tstatus\\tsummary\\tnext_action\\tcwd\\tmtime_epoch`. + """ + args = ["triage", "--tsv"] if refresh: args.append("--refresh") if limit is not None: @@ -197,30 +180,27 @@ class Rclaude: raw = self._run(args) rows: list[TriageRow] = [] for line in raw.splitlines(): - if not line.strip() or line.startswith("HOST"): + if not line.strip(): continue - # rclaude triage uses fixed-width column output: - # HOST(8) UUID8(8) PRI(3) STATUS(15) SUMMARY(50) NEXT_ACTION(...) - # We split on >=2 spaces. With uuid8 we cannot reconstruct full uuid - # so triage rows are surfaced for human consumption but `pull.py` - # uses a JSON mode to be added in a follow-up rclaude change. - parts = [p for p in line.split(" ") if p.strip()] - if len(parts) < 5: + parts = line.split("\t") + if len(parts) < 9 or parts[1] != "triage": continue try: - priority = int(parts[2].strip()) - except ValueError: + uuid_val = UUID(parts[2]) + priority = int(parts[3]) + mtime = int(parts[8]) + except (ValueError, IndexError): continue rows.append( TriageRow( - host=parts[0].strip(), - uuid=UUID(int=0), + host=parts[0], + uuid=uuid_val, priority=priority, - status=parts[3].strip(), - summary=parts[4].strip(), - next_action=parts[5].strip() if len(parts) >= 6 else "", - cwd="", - mtime_epoch=0, + status=parts[4], + summary=parts[5], + next_action=parts[6], + cwd=parts[7], + mtime_epoch=mtime, ) ) return rows diff --git a/tests/test_rclaude_wrapper.py b/tests/test_rclaude_wrapper.py index 0587e7c..225cfca 100644 --- a/tests/test_rclaude_wrapper.py +++ b/tests/test_rclaude_wrapper.py @@ -1,7 +1,6 @@ from __future__ import annotations import subprocess -from unittest.mock import MagicMock import pytest @@ -34,17 +33,73 @@ def test_rclaude_raises_on_nonzero_exit(monkeypatch: pytest.MonkeyPatch) -> None rcl.version() -def test_rclaude_list_tmux_parses_column_aligned(monkeypatch: pytest.MonkeyPatch) -> None: +def test_rclaude_list_tmux_parses_tsv(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr("shutil.which", lambda _: "/usr/local/bin/rclaude") output = ( - "HOST KIND SESSION/CWD/UUID DETAIL\n" - "local tmux claude-natalie-foo-1715964800 1 windows (created ...)\n" - "apricot tmux claude-natalie-bar-1715964900 2 windows (created ...)\n" - "apricot disk /home/natalie/Code/whatever sessions=3\n" + "local\ttmux\tclaude-natalie-foo-1715964800\t1 windows (created ...)\n" + "apricot\ttmux\tclaude-natalie-bar-1715964900\t2 windows (created ...)\n" ) rcl = Rclaude(runner=_fake_runner(stdout=output)) rows = rcl.list_tmux() assert len(rows) == 2 assert rows[0].host == "local" assert rows[0].session_name == "claude-natalie-foo-1715964800" + assert rows[0].detail.startswith("1 windows") assert rows[1].host == "apricot" + + +def test_rclaude_list_sessions_parses_tsv(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr("shutil.which", lambda _: "/usr/local/bin/rclaude") + # `rclaude list sessions --tsv` interleaves tmux rows with session rows; + # the wrapper should skip the tmux ones. + output = ( + "local\ttmux\tclaude-foo\t1 windows\n" + "local\tsession\t94fc4f45-0160-4fb8-9c23-475a0c63c983\thello world\t/Users/x/Code\t1779098754\n" + "apricot\tsession\tbd306333-eb2d-4b65-8a50-2179b2697756\tworking on rclaude\t/home/u/proj\t1779098693\n" + ) + rcl = Rclaude(runner=_fake_runner(stdout=output)) + rows = rcl.list_sessions() + assert len(rows) == 2 + assert rows[0].host == "local" + assert str(rows[0].uuid) == "94fc4f45-0160-4fb8-9c23-475a0c63c983" + assert rows[0].cwd == "/Users/x/Code" + assert rows[0].mtime_epoch == 1779098754 + assert rows[0].snippet == "hello world" + assert rows[1].host == "apricot" + assert rows[1].cwd == "/home/u/proj" + + +def test_rclaude_triage_parses_tsv(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr("shutil.which", lambda _: "/usr/local/bin/rclaude") + output = ( + "local\ttriage\t94fc4f45-0160-4fb8-9c23-475a0c63c983\t1\tin_progress" + "\tMulti-agent setup\tResume #31\t/Users/x/Code\t1779098754\n" + "apricot\ttriage\t41daf63a-245b-4194-b9ee-b704f833400f\t2\tblocked" + "\tWrong branch\tUpdate config\t/home/u/proj\t1779098796\n" + ) + rcl = Rclaude(runner=_fake_runner(stdout=output)) + rows = rcl.triage() + assert len(rows) == 2 + assert rows[0].host == "local" + assert str(rows[0].uuid) == "94fc4f45-0160-4fb8-9c23-475a0c63c983" + assert rows[0].priority == 1 + assert rows[0].status == "in_progress" + assert rows[0].summary == "Multi-agent setup" + assert rows[0].next_action == "Resume #31" + assert rows[0].cwd == "/Users/x/Code" + assert rows[0].mtime_epoch == 1779098754 + assert rows[1].host == "apricot" + assert rows[1].priority == 2 + + +def test_rclaude_triage_passes_flags(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr("shutil.which", lambda _: "/usr/local/bin/rclaude") + captured_args: list[list[str]] = [] + + def runner(args, **kwargs): + captured_args.append(list(args)) + return subprocess.CompletedProcess(args=args, returncode=0, stdout="", stderr="") + + rcl = Rclaude(runner=runner) + rcl.triage(refresh=True, limit=5) + assert captured_args == [["rclaude", "triage", "--tsv", "--refresh", "--limit", "5"]]