feat(pull): multi-host enumeration via rclaude --tsv

Replace local-only _claude-projects --sessions path with
'rclaude list sessions --tsv' (full uuids, separate cwd/mtime
columns) and add 'rclaude triage --tsv' ingestion emitting
TriageRecorded events. Diff against the sessions projection
to skip no-op events for unchanged (host, cwd, mtime) and
(priority, status, summary) tuples.
This commit is contained in:
Natalie 2026-05-18 03:23:55 -07:00
parent 5f316cde24
commit 87099fd99b
3 changed files with 198 additions and 88 deletions

View file

@ -1,14 +1,16 @@
"""Pull loop — refresh fleet view from rclaude/_claude-projects → events.
"""Pull loop — refresh fleet view from rclaude → events.
Each `pull()` invocation:
1. Enumerates local claude sessions via `_claude-projects --sessions`.
2. Emits a `SessionObserved` event for each projection updaters handle
UPSERT so re-running is idempotent (a session already known with the
same data results in a no-op event-row but a SQL UPSERT that's a noop).
1. Enumerates Claude sessions across every host rclaude knows about
(`rclaude list sessions --tsv`) and emits `SessionObserved` events
for any (uuid, host, mtime, cwd) tuple that differs from what's
already projected in the `sessions` table.
2. Pulls triage signal (`rclaude triage --tsv`) and emits
`TriageRecorded` events for any session whose (priority, status,
summary, next_action) differs from the projection.
Multi-host enumeration is deferred until rclaude exposes a `--tsv` mode for
`list sessions`. The HLC + event substrate is in place so adding more event
sources later doesn't require schema changes.
Both passes are idempotent re-running `clare pull` against an
unchanged fleet results in zero new events because nothing diffs.
"""
from __future__ import annotations
@ -16,6 +18,7 @@ from __future__ import annotations
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID
from . import events
from .hlc import HLCGenerator
@ -25,9 +28,45 @@ from .rclaude import Rclaude, RclaudeError
@dataclass(frozen=True)
class PullStats:
sessions_observed: int
triage_recorded: int
errors: list[str]
# ---------------------------------------------------------------------------
# Projection snapshots (used purely for diff/idempotency — never written from)
# ---------------------------------------------------------------------------
def _session_snapshot(
conn: sqlite3.Connection,
) -> dict[UUID, tuple[str, str | None, str | None]]:
"""Return {uuid: (host, cwd, last_seen_mtime_iso)} from the sessions table."""
rows = conn.execute(
"SELECT uuid, host, cwd, last_seen_mtime FROM sessions"
).fetchall()
return {UUID(r["uuid"]): (r["host"], r["cwd"], r["last_seen_mtime"]) for r in rows}
def _triage_snapshot(
conn: sqlite3.Connection,
) -> dict[UUID, tuple[int | None, str | None, str | None]]:
"""Return {uuid: (priority, status, summary)} from the sessions table."""
rows = conn.execute(
"""
SELECT uuid, last_triage_priority, last_triage_status, last_triage_summary
FROM sessions
"""
).fetchall()
return {
UUID(r["uuid"]): (
r["last_triage_priority"],
r["last_triage_status"],
r["last_triage_summary"],
)
for r in rows
}
def pull(
conn: sqlite3.Connection,
generator: HLCGenerator,
@ -37,15 +76,22 @@ def pull(
rclaude = rclaude or Rclaude()
errors: list[str] = []
sessions_observed = 0
triage_recorded = 0
# --- Sessions ---------------------------------------------------------
try:
rows = rclaude.local_sessions()
session_rows = rclaude.list_sessions()
except RclaudeError as exc:
errors.append(f"local_sessions: {exc}")
rows = []
errors.append(f"list_sessions: {exc}")
session_rows = []
for row in rows:
seen_snapshot = _session_snapshot(conn)
for row in session_rows:
mtime = datetime.fromtimestamp(row.mtime_epoch, tz=timezone.utc)
mtime_iso = mtime.isoformat()
prev = seen_snapshot.get(row.uuid)
if prev is not None and prev == (row.host, row.cwd, mtime_iso):
continue
events.append(
conn,
generator,
@ -59,4 +105,33 @@ def pull(
)
sessions_observed += 1
return PullStats(sessions_observed=sessions_observed, errors=errors)
# --- Triage -----------------------------------------------------------
try:
triage_rows = rclaude.triage()
except RclaudeError as exc:
errors.append(f"triage: {exc}")
triage_rows = []
triage_snap = _triage_snapshot(conn)
for trow in triage_rows:
prev_t = triage_snap.get(trow.uuid)
if prev_t is not None and prev_t == (trow.priority, trow.status, trow.summary):
continue
events.append(
conn,
generator,
events.TriageRecorded(
session_uuid=trow.uuid,
priority=trow.priority,
status=trow.status,
summary=trow.summary,
next_action=trow.next_action or None,
),
)
triage_recorded += 1
return PullStats(
sessions_observed=sessions_observed,
triage_recorded=triage_recorded,
errors=errors,
)

View file

@ -14,11 +14,9 @@ signature.
from __future__ import annotations
import shutil
import socket
import subprocess
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from uuid import UUID
@ -116,80 +114,65 @@ class Rclaude:
return self._run(["--version"]).strip()
def list_tmux(self) -> list[TmuxRow]:
# `rclaude list tmux` prints a header row, then `host\tkind\tname\tdetail`
# after column-aligned formatting. To get raw TSV we'd need rclaude to
# expose a --tsv mode; in its absence we parse the column-aligned form.
# Header lines: "HOST KIND SESSION/CWD/UUID DETAIL"
raw = self._run(["list", "tmux"])
"""Cross-host live tmux sessions via `rclaude list tmux --tsv`.
Raw TSV row shape: `host\\ttmux\\tsession_name\\tdetail`.
"""
raw = self._run(["list", "tmux", "--tsv"])
rows: list[TmuxRow] = []
for line in raw.splitlines():
if not line.strip() or line.startswith("HOST"):
if not line.strip():
continue
# Split into at most 4 whitespace-delimited columns. rclaude uses
# at-least-two-space gaps for column alignment.
parts = [p for p in line.split(" ") if p.strip()]
if len(parts) < 3:
parts = line.split("\t")
if len(parts) < 3 or parts[1] != "tmux":
continue
host = parts[0].strip()
kind = parts[1].strip()
if kind != "tmux":
continue
session = parts[2].strip()
detail = parts[3].strip() if len(parts) >= 4 else ""
rows.append(TmuxRow(host=host, session_name=session, detail=detail))
rows.append(
TmuxRow(
host=parts[0],
session_name=parts[2],
detail=parts[3] if len(parts) >= 4 else "",
)
)
return rows
def local_sessions(self, *, helper_path: Path | None = None) -> list[SessionRow]:
"""Enumerate local claude sessions via `_claude-projects --sessions`.
def list_sessions(self) -> list[SessionRow]:
"""Cross-host on-disk Claude sessions via `rclaude list sessions --tsv`.
Multi-host enumeration is deferred until rclaude grows a `--tsv` flag;
for Push A we work off the local machine's `~/.claude/projects/` only.
Output TSV: `mtime_epoch\\tuuid\\tcwd\\tsnippet`.
Raw TSV row shape: `host\\tsession\\tuuid\\tsnippet\\tcwd\\tmtime_epoch`.
Rows with kind != "session" (e.g. tmux header rows that rclaude
interleaves under `list sessions`) are skipped.
"""
if helper_path is None:
# Live next to rclaude under session-tools/bin/.
helper_path = Path.home() / "Code" / "@scripts" / "session-tools" / "bin" / "_claude-projects"
if not helper_path.exists():
raise RclaudeError(f"_claude-projects helper not found at {helper_path}")
try:
result = self._runner(
[str(helper_path), "--sessions"],
capture_output=True,
text=True,
check=False,
timeout=30,
)
except subprocess.TimeoutExpired as exc:
raise RclaudeError("_claude-projects --sessions: timed out") from exc
if result.returncode != 0:
raise RclaudeError(
f"_claude-projects --sessions exited {result.returncode}: "
f"{(result.stderr or '').strip()}"
)
host = socket.gethostname().split(".")[0] or "local"
raw = self._run(["list", "sessions", "--tsv"])
rows: list[SessionRow] = []
for line in result.stdout.splitlines():
for line in raw.splitlines():
if not line.strip():
continue
parts = line.split("\t")
if len(parts) < 4:
if len(parts) < 6 or parts[1] != "session":
continue
try:
mtime = int(parts[0])
uuid_val = UUID(parts[1])
uuid_val = UUID(parts[2])
mtime = int(parts[5])
except (ValueError, IndexError):
continue
rows.append(
SessionRow(
host=host,
host=parts[0],
uuid=uuid_val,
snippet=parts[3],
cwd=parts[2],
cwd=parts[4],
mtime_epoch=mtime,
)
)
return rows
def triage(self, *, refresh: bool = False, limit: int | None = None) -> list[TriageRow]:
args = ["triage"]
"""Cross-host triage via `rclaude triage --tsv`.
Raw TSV row shape:
`host\\ttriage\\tuuid\\tpriority\\tstatus\\tsummary\\tnext_action\\tcwd\\tmtime_epoch`.
"""
args = ["triage", "--tsv"]
if refresh:
args.append("--refresh")
if limit is not None:
@ -197,30 +180,27 @@ class Rclaude:
raw = self._run(args)
rows: list[TriageRow] = []
for line in raw.splitlines():
if not line.strip() or line.startswith("HOST"):
if not line.strip():
continue
# rclaude triage uses fixed-width column output:
# HOST(8) UUID8(8) PRI(3) STATUS(15) SUMMARY(50) NEXT_ACTION(...)
# We split on >=2 spaces. With uuid8 we cannot reconstruct full uuid
# so triage rows are surfaced for human consumption but `pull.py`
# uses a JSON mode to be added in a follow-up rclaude change.
parts = [p for p in line.split(" ") if p.strip()]
if len(parts) < 5:
parts = line.split("\t")
if len(parts) < 9 or parts[1] != "triage":
continue
try:
priority = int(parts[2].strip())
except ValueError:
uuid_val = UUID(parts[2])
priority = int(parts[3])
mtime = int(parts[8])
except (ValueError, IndexError):
continue
rows.append(
TriageRow(
host=parts[0].strip(),
uuid=UUID(int=0),
host=parts[0],
uuid=uuid_val,
priority=priority,
status=parts[3].strip(),
summary=parts[4].strip(),
next_action=parts[5].strip() if len(parts) >= 6 else "",
cwd="",
mtime_epoch=0,
status=parts[4],
summary=parts[5],
next_action=parts[6],
cwd=parts[7],
mtime_epoch=mtime,
)
)
return rows

View file

@ -1,7 +1,6 @@
from __future__ import annotations
import subprocess
from unittest.mock import MagicMock
import pytest
@ -34,17 +33,73 @@ def test_rclaude_raises_on_nonzero_exit(monkeypatch: pytest.MonkeyPatch) -> None
rcl.version()
def test_rclaude_list_tmux_parses_column_aligned(monkeypatch: pytest.MonkeyPatch) -> None:
def test_rclaude_list_tmux_parses_tsv(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr("shutil.which", lambda _: "/usr/local/bin/rclaude")
output = (
"HOST KIND SESSION/CWD/UUID DETAIL\n"
"local tmux claude-natalie-foo-1715964800 1 windows (created ...)\n"
"apricot tmux claude-natalie-bar-1715964900 2 windows (created ...)\n"
"apricot disk /home/natalie/Code/whatever sessions=3\n"
"local\ttmux\tclaude-natalie-foo-1715964800\t1 windows (created ...)\n"
"apricot\ttmux\tclaude-natalie-bar-1715964900\t2 windows (created ...)\n"
)
rcl = Rclaude(runner=_fake_runner(stdout=output))
rows = rcl.list_tmux()
assert len(rows) == 2
assert rows[0].host == "local"
assert rows[0].session_name == "claude-natalie-foo-1715964800"
assert rows[0].detail.startswith("1 windows")
assert rows[1].host == "apricot"
def test_rclaude_list_sessions_parses_tsv(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr("shutil.which", lambda _: "/usr/local/bin/rclaude")
# `rclaude list sessions --tsv` interleaves tmux rows with session rows;
# the wrapper should skip the tmux ones.
output = (
"local\ttmux\tclaude-foo\t1 windows\n"
"local\tsession\t94fc4f45-0160-4fb8-9c23-475a0c63c983\thello world\t/Users/x/Code\t1779098754\n"
"apricot\tsession\tbd306333-eb2d-4b65-8a50-2179b2697756\tworking on rclaude\t/home/u/proj\t1779098693\n"
)
rcl = Rclaude(runner=_fake_runner(stdout=output))
rows = rcl.list_sessions()
assert len(rows) == 2
assert rows[0].host == "local"
assert str(rows[0].uuid) == "94fc4f45-0160-4fb8-9c23-475a0c63c983"
assert rows[0].cwd == "/Users/x/Code"
assert rows[0].mtime_epoch == 1779098754
assert rows[0].snippet == "hello world"
assert rows[1].host == "apricot"
assert rows[1].cwd == "/home/u/proj"
def test_rclaude_triage_parses_tsv(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr("shutil.which", lambda _: "/usr/local/bin/rclaude")
output = (
"local\ttriage\t94fc4f45-0160-4fb8-9c23-475a0c63c983\t1\tin_progress"
"\tMulti-agent setup\tResume #31\t/Users/x/Code\t1779098754\n"
"apricot\ttriage\t41daf63a-245b-4194-b9ee-b704f833400f\t2\tblocked"
"\tWrong branch\tUpdate config\t/home/u/proj\t1779098796\n"
)
rcl = Rclaude(runner=_fake_runner(stdout=output))
rows = rcl.triage()
assert len(rows) == 2
assert rows[0].host == "local"
assert str(rows[0].uuid) == "94fc4f45-0160-4fb8-9c23-475a0c63c983"
assert rows[0].priority == 1
assert rows[0].status == "in_progress"
assert rows[0].summary == "Multi-agent setup"
assert rows[0].next_action == "Resume #31"
assert rows[0].cwd == "/Users/x/Code"
assert rows[0].mtime_epoch == 1779098754
assert rows[1].host == "apricot"
assert rows[1].priority == 2
def test_rclaude_triage_passes_flags(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr("shutil.which", lambda _: "/usr/local/bin/rclaude")
captured_args: list[list[str]] = []
def runner(args, **kwargs):
captured_args.append(list(args))
return subprocess.CompletedProcess(args=args, returncode=0, stdout="", stderr="")
rcl = Rclaude(runner=runner)
rcl.triage(refresh=True, limit=5)
assert captured_args == [["rclaude", "triage", "--tsv", "--refresh", "--limit", "5"]]