claire/tests/test_fleet.py
autocommit 6d212b7dbe refactor(testing-test): ♻️ Update test imports to use claire instead of clare in package references
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-05-20 19:54:05 -07:00

107 lines
3.9 KiB
Python

"""Tests for migration 0004_fleet — agent_status push/pull hybrid."""
from __future__ import annotations
import time
import uuid as _uuid
from claire import events as ev
from claire.db import migrate, open_db
from claire.domain import TaskStatus
from claire.hlc import HLCGenerator
from claire.web import service
def _setup() -> tuple:
conn = open_db(":memory:")
migrate(conn)
gen = HLCGenerator("test-machine")
return conn, gen
def test_report_status_push_creates_row() -> None:
conn, gen = _setup()
service.create_project(conn, gen, name="p")
task = service.add_task(conn, gen, project="p", title="t")
sid = _uuid.uuid4()
status = service.report_agent_status(
conn, gen, session_uuid=sid, summary="working on it",
task_ref=str(task.id), state=TaskStatus.IN_PROGRESS,
)
assert status.task_id == task.id
assert status.project_id == task.project_id
assert status.state == TaskStatus.IN_PROGRESS
assert status.source == "push"
def test_report_status_upserts_existing() -> None:
conn, gen = _setup()
sid = _uuid.uuid4()
first = service.report_agent_status(conn, gen, session_uuid=sid, summary="A")
second = service.report_agent_status(conn, gen, session_uuid=sid, summary="B")
assert second.summary == "B"
rows = conn.execute("SELECT COUNT(*) FROM agent_status WHERE session_uuid = ?", (str(sid),)).fetchone()
assert rows[0] == 1 # upsert, not insert
def test_triage_synthesizes_status_when_none_exists() -> None:
"""A TriageRecorded event with no prior agent_status fills the gap."""
conn, gen = _setup()
sid = _uuid.uuid4()
ev.append(conn, gen, ev.TriageRecorded(
session_uuid=sid, priority=2, status="working",
summary="from triage", next_action="x",
))
rows = conn.execute("SELECT source, summary FROM agent_status WHERE session_uuid = ?", (str(sid),)).fetchone()
assert rows is not None
assert rows["source"] == "triage"
assert rows["summary"] == "from triage"
def test_recent_push_blocks_triage_override() -> None:
"""A fresh push (<5min ago) should NOT be overwritten by triage."""
conn, gen = _setup()
sid = _uuid.uuid4()
service.report_agent_status(conn, gen, session_uuid=sid, summary="pushed!")
# Immediate triage — must not overwrite the push row.
ev.append(conn, gen, ev.TriageRecorded(
session_uuid=sid, priority=2, status="working",
summary="triage interference", next_action="x",
))
row = conn.execute(
"SELECT source, summary FROM agent_status WHERE session_uuid = ?", (str(sid),)
).fetchone()
assert row["source"] == "push"
assert row["summary"] == "pushed!"
def test_stale_push_yields_to_triage() -> None:
"""A push older than the freshness window should be replaced by triage."""
conn, gen = _setup()
sid = _uuid.uuid4()
# Insert a push row directly with an HLC 10 minutes in the past.
past_ms = int(time.time() * 1000) - 10 * 60 * 1000
past_hlc = f"{past_ms}.000000.test-machine"
conn.execute(
"INSERT INTO agent_status (session_uuid, task_id, project_id, state, summary, source, hlc) "
"VALUES (?, NULL, NULL, NULL, 'old push', 'push', ?)",
(str(sid), past_hlc),
)
ev.append(conn, gen, ev.TriageRecorded(
session_uuid=sid, priority=2, status="working",
summary="newer triage", next_action="x",
))
row = conn.execute(
"SELECT source, summary FROM agent_status WHERE session_uuid = ?", (str(sid),)
).fetchone()
assert row["source"] == "triage"
assert row["summary"] == "newer triage"
def test_list_agent_status() -> None:
conn, gen = _setup()
a, b = _uuid.uuid4(), _uuid.uuid4()
service.report_agent_status(conn, gen, session_uuid=a, summary="A")
service.report_agent_status(conn, gen, session_uuid=b, summary="B")
fleet = service.list_agent_status(conn)
assert len(fleet) == 2