feat(@projects/@claire): routing resolver for location-transparent Claire
route(signals, fleet) -> RouteDecision via a deterministic cascade: explicit host > capability-pin (uses hosts_with_capability) > sticky (subject's session/task already runs on a host, via sessions+assignments) > default-local. Pure + auditable (reason+candidates surfaced); the LLM classify step and cross-host execution are separate layers. 13 tests. Part of task 13764f2f. (manual commit via ALLOW_COMMIT — autocommit LLM still down on claire)
This commit is contained in:
parent
24c6f24f43
commit
16c030c6b3
2 changed files with 280 additions and 0 deletions
137
src/claire/routing.py
Normal file
137
src/claire/routing.py
Normal file
|
|
@ -0,0 +1,137 @@
|
|||
"""Deterministic host routing for location-transparent Claire.
|
||||
|
||||
When the user talks to ANY host's Claire, a turn about work that lives
|
||||
elsewhere should run on THAT host. The receiving Claire (an LLM) CLASSIFIES a
|
||||
turn into structured signals — an explicit host if named, the capability the
|
||||
work needs, the subject (session/task) it references — and this module turns
|
||||
those signals + fleet state into a host decision via a fixed priority cascade.
|
||||
|
||||
Split of responsibility: the fuzzy natural-language step (turn → signals) lives
|
||||
in the orchestrator prompt; the DECISION here is pure + deterministic so it's
|
||||
testable and auditable. The cross-host *execution* (forwarding the turn + proxying
|
||||
the reply) is a separate layer — this only answers "which host?".
|
||||
|
||||
Cascade (first match wins):
|
||||
1. explicit — the user named a host
|
||||
2. capability — work needs a host-specific resource (gpu/media/mount/svc/…)
|
||||
3. sticky — the subject already has live work on a host (keep a thread coherent on one node)
|
||||
4. default — run on the receiving node (most turns are host-agnostic)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
from dataclasses import dataclass
|
||||
|
||||
from .config import ClaireConfig
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class RouteDecision:
    """Outcome of routing one classified turn: the chosen host and the reasoning.

    The reason and detail are carried along for transparency, so a user or
    operator can always see how the routing decision was reached.

    Attributes:
        host: canonical label of the host the turn should run on.
        reason: machine-readable tag — explicit | capability | sticky |
            default-local | unknown-host.
        detail: one-line, human-readable explanation.
        candidates: hosts that were considered (capability matches etc.).
    """

    host: str
    reason: str
    detail: str
    candidates: tuple[str, ...] = ()
|
||||
|
||||
|
||||
def _least_loaded(hosts: list[str], load: dict[str, int] | None) -> str:
    """Return the entry of `hosts` with the smallest load figure.

    Without load information (``None`` or an empty mapping) the first host is
    returned as-is. A host missing from `load` counts as load 0. Ties go to
    the earliest host in `hosts`, so the caller's ordering decides among
    equally loaded hosts.
    """
    if load:
        # min() returns the first of several equal minima → input order wins.
        return min(hosts, key=lambda name: load.get(name, 0))
    return hosts[0]
|
||||
|
||||
|
||||
def _host_of_session(conn: sqlite3.Connection, uuid: str) -> str | None:
    """Look up the host recorded for session `uuid`.

    Returns None when there is no such session row or its host is NULL/empty.
    The row is read by column name, so `conn` presumably uses ``sqlite3.Row``
    (or another mapping-style row factory) — confirm where it is opened.
    """
    query = "SELECT host FROM sessions WHERE uuid = ?"
    found = conn.execute(query, (str(uuid),)).fetchone()
    if not found:
        return None
    return found["host"] or None
|
||||
|
||||
|
||||
def _host_of_task(conn: sqlite3.Connection, task_id: str) -> str | None:
    """Find the host where `task_id` is currently being worked on.

    Picks the task's most recent active assignment (highest ``created_hlc``)
    and returns the host of the session it points at. Returns None when the
    task has no active assignment, or when that session has no host recorded.
    """
    query = """
        SELECT s.host
        FROM assignments a
        JOIN sessions s ON s.uuid = a.session_uuid
        WHERE a.task_id = ? AND a.active = 1
        ORDER BY a.created_hlc DESC
        LIMIT 1
    """
    newest = conn.execute(query, (str(task_id),)).fetchone()
    if not newest:
        return None
    return newest["host"] or None
|
||||
|
||||
|
||||
def route(
    conn: sqlite3.Connection,
    cfg: ClaireConfig,
    *,
    receiving_host: str,
    explicit_host: str | None = None,
    capability_needs: list[str] | None = None,
    session_uuid: str | None = None,
    task_id: str | None = None,
    host_load: dict[str, int] | None = None,
) -> RouteDecision:
    """Resolve which host a classified turn should run on.

    `receiving_host` is the node the user is talking to (the default). The other
    args are the classifier's output: `explicit_host` (named), `capability_needs`
    (tags the work requires — host must satisfy ALL), `session_uuid`/`task_id`
    (the subject, for stickiness). `host_load` (host → live-session count) is an
    optional tiebreaker among equally-capable hosts.

    The cascade is: explicit > capability > sticky > default-local; the first
    step that yields a host wins. A host label that does not resolve to a known
    host — whether the user named it or it came from the subject's session
    record — is never routed to: the turn runs on the receiving node and the
    decision carries reason ``unknown-host`` so the fallback is visible.

    `conn` is only queried in the sticky step, i.e. when neither the explicit
    nor the capability step has already produced a decision.
    """
    recv = cfg.resolve_host_label(receiving_host)
    # The receiving node is always a valid target, even if it is not listed.
    known = {h.name for h in cfg.known_hosts} | {recv}

    # 1. Explicit — the user named a host.
    if explicit_host:
        h = cfg.resolve_host_label(explicit_host)
        if h in known:
            return RouteDecision(h, "explicit", f"user named host {h!r}", (h,))
        # Named something we don't know — don't silently send it nowhere.
        return RouteDecision(
            recv, "unknown-host",
            f"host {explicit_host!r} not in known_hosts — running local", (recv,),
        )

    # 2. Capability — the work needs a host-specific resource. Host must satisfy
    #    ALL declared needs (intersection). No match → fall through (best-effort).
    needs = [n for n in (capability_needs or []) if n]
    if needs:
        cand: set[str] | None = None
        for n in needs:
            hs = set(cfg.hosts_with_capability(n))
            cand = hs if cand is None else (cand & hs)
            if not cand:
                # An empty intersection cannot grow back; skip remaining needs.
                break
        # Sorted so the candidate order (and the no-load pick) is deterministic.
        candidates = sorted(cand or set())
        if candidates:
            pick = _least_loaded(candidates, host_load)
            return RouteDecision(
                pick, "capability",
                f"needs {'+'.join(needs)} → {pick}", tuple(candidates),
            )

    # 3. Sticky — keep a thread where its subject's work already lives. Session
    #    reference wins over task (more specific); both resolve to a host.
    sticky: str | None = None
    if session_uuid:
        sticky = _host_of_session(conn, session_uuid)
    if sticky is None and task_id:
        sticky = _host_of_task(conn, task_id)
    if sticky:
        sticky = cfg.resolve_host_label(sticky)
        if sticky in known:
            return RouteDecision(
                sticky, "sticky",
                "continuing where the subject's work already runs", (sticky,),
            )
        # The recorded host is not part of the fleet (e.g. a stale session
        # row). Same rule as for an explicit host: never route to a host we
        # don't know — run locally and say why.
        return RouteDecision(
            recv, "unknown-host",
            f"sticky host {sticky!r} not in known_hosts — running local", (recv,),
        )

    # 4. Default — no host-specific signal; the receiving node handles it.
    return RouteDecision(
        recv, "default-local", "no host-specific signal — running local", (recv,)
    )
|
||||
143
tests/test_routing.py
Normal file
143
tests/test_routing.py
Normal file
|
|
@ -0,0 +1,143 @@
|
|||
"""Routing cascade: explicit > capability > sticky > default-local."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from uuid import UUID
|
||||
|
||||
from claire.config import ClaireConfig, HostEntry
|
||||
from claire.routing import route
|
||||
from claire.web import service
|
||||
|
||||
|
||||
@pytest.fixture
def cfg() -> ClaireConfig:
    """Three-host fleet shared by most tests.

    plum is this host (alias "local"), apricot offers cores:64 + gpu, and
    black offers media + transmission.
    """
    fleet = [
        HostEntry(name="plum", aliases=["local"]),
        HostEntry(name="apricot", capabilities=["cores:64", "gpu"]),
        HostEntry(name="black", capabilities=["media", "transmission"]),
    ]
    return ClaireConfig(machine_id="m", this_host="plum", known_hosts=fleet)
|
||||
|
||||
|
||||
def _add_session(conn, uuid: str, host: str) -> None:
    """Insert a minimal sessions row that places session `uuid` on `host`."""
    values = (uuid, host, "1")  # "1" is a fixed updated_hlc value for the row
    conn.execute(
        "INSERT INTO sessions (uuid, host, updated_hlc) VALUES (?, ?, ?)",
        values,
    )
|
||||
|
||||
|
||||
def _task_with_worker(conn, gen, *, project: str, host: str, session_uuid: str):
    """Set up a task that is actively assigned to a session living on `host`.

    Creates the project and a task in it, inserts the session row, then links
    the two with an assignment. Everything except the raw session insert goes
    through `service` so FK constraints are satisfied. Returns the task id as
    a string.
    """
    service.create_project(conn, gen, name=project)
    created = service.add_task(conn, gen, project=project, title="t")
    # The session row must exist before the assignment that references it.
    _add_session(conn, session_uuid, host)
    service.create_assignment(
        conn,
        gen,
        task_id=created.id,
        session_uuid=UUID(session_uuid),
    )
    return str(created.id)
|
||||
|
||||
|
||||
# 1. explicit -----------------------------------------------------------------
|
||||
|
||||
def test_explicit_host_wins(conn, cfg) -> None:
    """A named, known host is chosen with reason "explicit"."""
    decision = route(conn, cfg, receiving_host="plum", explicit_host="apricot")
    assert decision.host == "apricot"
    assert decision.reason == "explicit"
|
||||
|
||||
|
||||
def test_explicit_alias_resolves(conn, cfg) -> None:
    """An alias given as the explicit host resolves to its canonical label."""
    # The alias "local" maps to plum, even though plum is also the receiver.
    decision = route(conn, cfg, receiving_host="plum", explicit_host="local")
    assert decision.host == "plum"
    assert decision.reason == "explicit"
|
||||
|
||||
|
||||
def test_explicit_unknown_host_falls_back_local_not_silent(conn, cfg) -> None:
    """An unknown named host runs locally and is flagged as "unknown-host"."""
    decision = route(conn, cfg, receiving_host="plum", explicit_host="mars")
    assert (decision.host, decision.reason) == ("plum", "unknown-host")
|
||||
|
||||
|
||||
# 2. capability ---------------------------------------------------------------
|
||||
|
||||
def test_capability_single(conn, cfg) -> None:
    """A single capability need routes to the one host that declares it."""
    decision = route(conn, cfg, receiving_host="plum", capability_needs=["media"])
    assert decision.host == "black"
    assert decision.reason == "capability"
|
||||
|
||||
|
||||
def test_capability_key_prefix(conn, cfg) -> None:
    """A bare capability key matches a host declaring it with a value."""
    # Need "cores" is satisfied by apricot's "cores:64".
    decision = route(conn, cfg, receiving_host="plum", capability_needs=["cores"])
    assert decision.host == "apricot"
|
||||
|
||||
|
||||
def test_capability_intersection_of_needs(conn, cfg) -> None:
    """With several needs, only a host satisfying all of them qualifies."""
    # Only apricot has both gpu and cores; black (media only) is excluded.
    wanted = ["gpu", "cores"]
    decision = route(conn, cfg, receiving_host="plum", capability_needs=wanted)
    assert decision.host == "apricot"
|
||||
|
||||
|
||||
def test_capability_no_match_falls_through_to_default(conn, cfg) -> None:
    """A need no host satisfies falls through to the default-local decision."""
    decision = route(conn, cfg, receiving_host="plum", capability_needs=["fpga"])
    assert decision.host == "plum"
    assert decision.reason == "default-local"
|
||||
|
||||
|
||||
def test_capability_tiebreak_least_loaded(conn) -> None:
    """Among equally capable hosts, the one with the lower load is picked."""
    two_media_hosts = [
        HostEntry(name="a", capabilities=["media"]),
        HostEntry(name="b", capabilities=["media"]),
    ]
    cfg = ClaireConfig(
        machine_id="m", this_host="plum", known_hosts=two_media_hosts
    )
    load = {"a": 5, "b": 1}
    # No DB handle is passed here: the call is made with None as the connection.
    decision = route(
        None,
        cfg,
        receiving_host="plum",
        capability_needs=["media"],
        host_load=load,
    )
    assert decision.host == "b"
    assert set(decision.candidates) == {"a", "b"}
|
||||
|
||||
|
||||
# 3. sticky -------------------------------------------------------------------
|
||||
|
||||
def test_sticky_by_session(conn, cfg) -> None:
    """A referenced session pulls the turn to the host that session runs on."""
    sid = "11111111-1111-1111-1111-111111111111"
    _add_session(conn, sid, "apricot")
    decision = route(conn, cfg, receiving_host="plum", session_uuid=sid)
    assert decision.host == "apricot"
    assert decision.reason == "sticky"
|
||||
|
||||
|
||||
def test_sticky_by_task_via_active_assignment(conn, gen, cfg) -> None:
    """A referenced task routes to the host of its active assignment's session."""
    worker_session = "22222222-2222-2222-2222-222222222222"
    task_id = _task_with_worker(
        conn, gen, project="p", host="black", session_uuid=worker_session
    )
    decision = route(conn, cfg, receiving_host="plum", task_id=task_id)
    assert decision.host == "black"
    assert decision.reason == "sticky"
|
||||
|
||||
|
||||
def test_session_reference_beats_task(conn, gen, cfg) -> None:
    """When both a session and a task are referenced, the session's host wins."""
    referenced_session = "33333333-3333-3333-3333-333333333333"
    _add_session(conn, referenced_session, "apricot")
    # The task's worker lives on a different host (black) than the session.
    task_id = _task_with_worker(
        conn,
        gen,
        project="p",
        host="black",
        session_uuid="44444444-4444-4444-4444-444444444444",
    )
    decision = route(
        conn,
        cfg,
        receiving_host="plum",
        session_uuid=referenced_session,
        task_id=task_id,
    )
    assert decision.host == "apricot"  # session wins
|
||||
|
||||
|
||||
# 4. default ------------------------------------------------------------------
|
||||
|
||||
def test_default_local_when_no_signal(conn, cfg) -> None:
    """With no routing signal at all, the receiving host handles the turn."""
    decision = route(conn, cfg, receiving_host="apricot")
    assert decision.host == "apricot"
    assert decision.reason == "default-local"
|
||||
|
||||
|
||||
def test_precedence_explicit_over_everything(conn, cfg) -> None:
    """An explicit host outranks both a capability match and a sticky session."""
    # Sticky session on black and capability "media" (also black) both point
    # elsewhere, yet the explicitly named apricot must win.
    sid = "55555555-5555-5555-5555-555555555555"
    _add_session(conn, sid, "black")
    decision = route(
        conn,
        cfg,
        receiving_host="plum",
        explicit_host="apricot",
        capability_needs=["media"],
        session_uuid=sid,
    )
    assert decision.host == "apricot"
    assert decision.reason == "explicit"
|
||||
Loading…
Add table
Reference in a new issue