"""Tests for migration 0004_fleet — agent_status push/pull hybrid.""" from __future__ import annotations import time import uuid as _uuid from claire import events as ev from claire.db import migrate, open_db from claire.domain import TaskStatus from claire.hlc import HLCGenerator from claire.web import service def _setup() -> tuple: conn = open_db(":memory:") migrate(conn) gen = HLCGenerator("test-machine") return conn, gen def test_report_status_push_creates_row() -> None: conn, gen = _setup() service.create_project(conn, gen, name="p") task = service.add_task(conn, gen, project="p", title="t") sid = _uuid.uuid4() status = service.report_agent_status( conn, gen, session_uuid=sid, summary="working on it", task_ref=str(task.id), state=TaskStatus.IN_PROGRESS, ) assert status.task_id == task.id assert status.project_id == task.project_id assert status.state == TaskStatus.IN_PROGRESS assert status.source == "push" def test_report_status_upserts_existing() -> None: conn, gen = _setup() sid = _uuid.uuid4() first = service.report_agent_status(conn, gen, session_uuid=sid, summary="A") second = service.report_agent_status(conn, gen, session_uuid=sid, summary="B") assert second.summary == "B" rows = conn.execute("SELECT COUNT(*) FROM agent_status WHERE session_uuid = ?", (str(sid),)).fetchone() assert rows[0] == 1 # upsert, not insert def test_triage_synthesizes_status_when_none_exists() -> None: """A TriageRecorded event with no prior agent_status fills the gap.""" conn, gen = _setup() sid = _uuid.uuid4() ev.append(conn, gen, ev.TriageRecorded( session_uuid=sid, priority=2, status="working", summary="from triage", next_action="x", )) rows = conn.execute("SELECT source, summary FROM agent_status WHERE session_uuid = ?", (str(sid),)).fetchone() assert rows is not None assert rows["source"] == "triage" assert rows["summary"] == "from triage" def test_recent_push_blocks_triage_override() -> None: """A fresh push (<5min ago) should NOT be overwritten by triage.""" conn, gen = _setup() sid = _uuid.uuid4() service.report_agent_status(conn, gen, session_uuid=sid, summary="pushed!") # Immediate triage — must not overwrite the push row. ev.append(conn, gen, ev.TriageRecorded( session_uuid=sid, priority=2, status="working", summary="triage interference", next_action="x", )) row = conn.execute( "SELECT source, summary FROM agent_status WHERE session_uuid = ?", (str(sid),) ).fetchone() assert row["source"] == "push" assert row["summary"] == "pushed!" def test_stale_push_yields_to_triage() -> None: """A push older than the freshness window should be replaced by triage.""" conn, gen = _setup() sid = _uuid.uuid4() # Insert a push row directly with an HLC 10 minutes in the past. past_ms = int(time.time() * 1000) - 10 * 60 * 1000 past_hlc = f"{past_ms}.000000.test-machine" conn.execute( "INSERT INTO agent_status (session_uuid, task_id, project_id, state, summary, source, hlc) " "VALUES (?, NULL, NULL, NULL, 'old push', 'push', ?)", (str(sid), past_hlc), ) ev.append(conn, gen, ev.TriageRecorded( session_uuid=sid, priority=2, status="working", summary="newer triage", next_action="x", )) row = conn.execute( "SELECT source, summary FROM agent_status WHERE session_uuid = ?", (str(sid),) ).fetchone() assert row["source"] == "triage" assert row["summary"] == "newer triage" def test_list_agent_status() -> None: conn, gen = _setup() a, b = _uuid.uuid4(), _uuid.uuid4() service.report_agent_status(conn, gen, session_uuid=a, summary="A") service.report_agent_status(conn, gen, session_uuid=b, summary="B") fleet = service.list_agent_status(conn) assert len(fleet) == 2