From 16c030c6b3ac6af4fe2acf5cd05f4c98b0bd68f5 Mon Sep 17 00:00:00 2001 From: Natalie Date: Wed, 3 Jun 2026 01:41:27 -0700 Subject: [PATCH] feat(@projects/@claire): routing resolver for location-transparent Claire MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit route(signals, fleet) -> RouteDecision via a deterministic cascade: explicit host > capability-pin (uses hosts_with_capability) > sticky (subject's session/task already runs on a host, via sessions+assignments) > default-local. Pure + auditable (reason+candidates surfaced); the LLM classify step and cross-host execution are separate layers. 13 tests. Part of task 13764f2f. (manual commit via ALLOW_COMMIT — autocommit LLM still down on claire) --- src/claire/routing.py | 137 ++++++++++++++++++++++++++++++++++++++++ tests/test_routing.py | 143 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 280 insertions(+) create mode 100644 src/claire/routing.py create mode 100644 tests/test_routing.py diff --git a/src/claire/routing.py b/src/claire/routing.py new file mode 100644 index 0000000..46593cd --- /dev/null +++ b/src/claire/routing.py @@ -0,0 +1,137 @@ +"""Deterministic host routing for location-transparent Claire. + +When the user talks to ANY host's Claire, a turn about work that lives +elsewhere should run on THAT host. The receiving Claire (an LLM) CLASSIFIES a +turn into structured signals — an explicit host if named, the capability the +work needs, the subject (session/task) it references — and this module turns +those signals + fleet state into a host decision via a fixed priority cascade. + +Split of responsibility: the fuzzy natural-language step (turn → signals) lives +in the orchestrator prompt; the DECISION here is pure + deterministic so it's +testable and auditable. The cross-host *execution* (forwarding the turn + proxying +the reply) is a separate layer — this only answers "which host?". + +Cascade (first match wins): + 1. 
@dataclass(frozen=True)
class RouteDecision:
    """Outcome of routing one turn: the chosen host plus the reason for it.

    The reason and detail travel with the decision so the user/operator can
    always see how routing decided.
    """

    host: str  # canonical host label to run on
    # machine-readable: explicit | capability | sticky | default-local | unknown-host
    reason: str
    detail: str  # human one-liner
    candidates: tuple[str, ...] = ()  # hosts considered (capability matches etc.)


def _least_loaded(hosts: list[str], load: dict[str, int] | None) -> str:
    """Pick the least-loaded host from a non-empty list.

    Without load info the first host wins, so the caller's ordering decides.
    Hosts missing from `load` count as load 0.
    """
    if not load:
        return hosts[0]
    # min() returns the first minimum it meets, so ties keep the input order.
    return min(hosts, key=lambda h: load.get(h, 0))


def _host_from_row(row) -> str | None:
    """Return the first column of a fetched row; None for no row or an empty host.

    Positional access is deliberate: it works for plain tuple rows and for
    sqlite3.Row alike, so the helpers do not depend on the connection's
    row_factory.
    """
    if row is None:
        return None
    return row[0] or None


def _host_of_session(conn: sqlite3.Connection, uuid: str) -> str | None:
    """Host recorded for the session `uuid`, or None when unknown/empty."""
    row = conn.execute(
        "SELECT host FROM sessions WHERE uuid = ?", (str(uuid),)
    ).fetchone()
    return _host_from_row(row)


def _host_of_task(conn: sqlite3.Connection, task_id: str) -> str | None:
    """Host of a task's current worker — newest active assignment → session."""
    row = conn.execute(
        """
        SELECT s.host
        FROM assignments a
        JOIN sessions s ON s.uuid = a.session_uuid
        WHERE a.task_id = ? AND a.active = 1
        ORDER BY a.created_hlc DESC
        LIMIT 1
        """,
        (str(task_id),),
    ).fetchone()
    return _host_from_row(row)


def route(
    conn: sqlite3.Connection,
    cfg: ClaireConfig,
    *,
    receiving_host: str,
    explicit_host: str | None = None,
    capability_needs: list[str] | None = None,
    session_uuid: str | None = None,
    task_id: str | None = None,
    host_load: dict[str, int] | None = None,
) -> RouteDecision:
    """Resolve which host a classified turn should run on.

    Cascade, first match wins: explicit > capability > sticky > default-local.

    Args:
        conn: database connection; only queried in the sticky step.
        cfg: fleet configuration (known hosts, label resolution, capabilities).
        receiving_host: the node the user is talking to (the default target).
        explicit_host: a host the user named, if any.
        capability_needs: tags the work requires — a host must satisfy ALL.
        session_uuid: session the turn refers to (for stickiness).
        task_id: task the turn refers to (for stickiness; session wins).
        host_load: optional host → live-session count, used as a tiebreaker
            among equally-capable hosts.

    Returns:
        A RouteDecision. Whenever a named or recorded host is not part of the
        known fleet, the decision falls back to the receiving host with
        reason "unknown-host" instead of targeting a node nobody can reach.
    """
    recv = cfg.resolve_host_label(receiving_host)
    known = {h.name for h in cfg.known_hosts} | {recv}

    # 1. Explicit — the user named a host.
    if explicit_host:
        named = cfg.resolve_host_label(explicit_host)
        if named in known:
            return RouteDecision(
                named, "explicit", f"user named host {named!r}", (named,)
            )
        # Named something we don't know — don't silently send it nowhere.
        return RouteDecision(
            recv, "unknown-host",
            f"host {explicit_host!r} not in known_hosts — running local", (recv,),
        )

    # 2. Capability — the work needs a host-specific resource. Host must satisfy
    #    ALL declared needs (intersection). No match → fall through (best-effort).
    needs = [n for n in (capability_needs or []) if n]
    if needs:
        matching = set.intersection(
            *(set(cfg.hosts_with_capability(n)) for n in needs)
        )
        candidates = sorted(matching)
        if candidates:
            pick = _least_loaded(candidates, host_load)
            return RouteDecision(
                pick, "capability",
                f"needs {'+'.join(needs)} → {pick}", tuple(candidates),
            )

    # 3. Sticky — keep a thread where its subject's work already lives. Session
    #    reference wins over task (more specific); both resolve to a host.
    recorded: str | None = None
    if session_uuid:
        recorded = _host_of_session(conn, session_uuid)
    if recorded is None and task_id:
        recorded = _host_of_task(conn, task_id)
    if recorded:
        sticky = cfg.resolve_host_label(recorded)
        if sticky not in known:
            # Same rule as the explicit branch: a host recorded in the DB that
            # is not in the fleet must not become a routing target.
            return RouteDecision(
                recv, "unknown-host",
                f"sticky host {recorded!r} not in known_hosts — running local",
                (recv,),
            )
        return RouteDecision(
            sticky, "sticky",
            "continuing where the subject's work already runs", (sticky,),
        )

    # 4. Default — no host-specific signal; the receiving node handles it.
    return RouteDecision(
        recv, "default-local", "no host-specific signal — running local", (recv,)
    )
def _task_with_worker(conn, gen, *, project: str, host: str, session_uuid: str):
    """Build a project with one task whose active worker session is on `host`.

    Everything is created through `service` so FK constraints are satisfied.
    Returns the new task's id as a string.
    """
    service.create_project(conn, gen, name=project)
    created = service.add_task(conn, gen, project=project, title="t")
    _add_session(conn, session_uuid, host)
    service.create_assignment(
        conn, gen, task_id=created.id, session_uuid=UUID(session_uuid)
    )
    return str(created.id)


# 1. explicit -----------------------------------------------------------------

def test_explicit_host_wins(conn, cfg) -> None:
    """A named, known host is chosen with reason "explicit"."""
    decision = route(conn, cfg, receiving_host="plum", explicit_host="apricot")
    assert (decision.host, decision.reason) == ("apricot", "explicit")


def test_explicit_alias_resolves(conn, cfg) -> None:
    """The alias "local" resolves to plum, even when plum is the receiver."""
    decision = route(conn, cfg, receiving_host="plum", explicit_host="local")
    assert (decision.host, decision.reason) == ("plum", "explicit")


def test_explicit_unknown_host_falls_back_local_not_silent(conn, cfg) -> None:
    """An unknown named host runs locally and says so via the reason."""
    decision = route(conn, cfg, receiving_host="plum", explicit_host="mars")
    assert decision.host == "plum"
    assert decision.reason == "unknown-host"
# 2. capability ---------------------------------------------------------------

def test_capability_single(conn, cfg) -> None:
    """One capability need selects the only host that declares it."""
    decision = route(conn, cfg, receiving_host="plum", capability_needs=["media"])
    assert (decision.host, decision.reason) == ("black", "capability")


def test_capability_key_prefix(conn, cfg) -> None:
    """Asking for "cores" matches a host declaring "cores:64"."""
    decision = route(conn, cfg, receiving_host="plum", capability_needs=["cores"])
    assert decision.host == "apricot"


def test_capability_intersection_of_needs(conn, cfg) -> None:
    """gpu AND cores → only apricot has both; media-only black is excluded."""
    wanted = ["gpu", "cores"]
    decision = route(conn, cfg, receiving_host="plum", capability_needs=wanted)
    assert decision.host == "apricot"


def test_capability_no_match_falls_through_to_default(conn, cfg) -> None:
    """A need no host satisfies falls through to the receiving host."""
    decision = route(conn, cfg, receiving_host="plum", capability_needs=["fpga"])
    assert (decision.host, decision.reason) == ("plum", "default-local")


def test_capability_tiebreak_least_loaded(conn) -> None:
    """Among equally capable hosts, the one with the lower load is picked."""
    media_hosts = [
        HostEntry(name="a", capabilities=["media"]),
        HostEntry(name="b", capabilities=["media"]),
    ]
    cfg = ClaireConfig(machine_id="m", this_host="plum", known_hosts=media_hosts)
    decision = route(
        None,
        cfg,
        receiving_host="plum",
        capability_needs=["media"],
        host_load={"a": 5, "b": 1},
    )
    assert decision.host == "b"
    assert set(decision.candidates) == {"a", "b"}
# 3. sticky -------------------------------------------------------------------

def test_sticky_by_session(conn, cfg) -> None:
    """A referenced session pins the turn to the session's host."""
    session_id = "11111111-1111-1111-1111-111111111111"
    _add_session(conn, session_id, "apricot")
    decision = route(conn, cfg, receiving_host="plum", session_uuid=session_id)
    assert (decision.host, decision.reason) == ("apricot", "sticky")


def test_sticky_by_task_via_active_assignment(conn, gen, cfg) -> None:
    """A referenced task pins the turn to its active worker's host."""
    task_id = _task_with_worker(
        conn,
        gen,
        project="p",
        host="black",
        session_uuid="22222222-2222-2222-2222-222222222222",
    )
    decision = route(conn, cfg, receiving_host="plum", task_id=task_id)
    assert (decision.host, decision.reason) == ("black", "sticky")


def test_session_reference_beats_task(conn, gen, cfg) -> None:
    """With both a session and a task referenced, the session's host wins."""
    session_id = "33333333-3333-3333-3333-333333333333"
    _add_session(conn, session_id, "apricot")
    task_id = _task_with_worker(
        conn,
        gen,
        project="p",
        host="black",
        session_uuid="44444444-4444-4444-4444-444444444444",
    )
    decision = route(
        conn, cfg, receiving_host="plum", session_uuid=session_id, task_id=task_id
    )
    assert decision.host == "apricot"  # session wins


# 4. default ------------------------------------------------------------------

def test_default_local_when_no_signal(conn, cfg) -> None:
    """No signals at all → the receiving host handles the turn."""
    decision = route(conn, cfg, receiving_host="apricot")
    assert (decision.host, decision.reason) == ("apricot", "default-local")


def test_precedence_explicit_over_everything(conn, cfg) -> None:
    """Sticky session on black and capability=media (black), yet explicit apricot wins."""
    session_id = "55555555-5555-5555-5555-555555555555"
    _add_session(conn, session_id, "black")
    decision = route(
        conn,
        cfg,
        receiving_host="plum",
        explicit_host="apricot",
        capability_needs=["media"],
        session_uuid=session_id,
    )
    assert (decision.host, decision.reason) == ("apricot", "explicit")