claire/tests/test_events.py
autocommit 6d212b7dbe refactor(testing-test): ♻️ Update test imports to use claire instead of clare in package references
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-05-20 19:54:05 -07:00

87 lines
2.8 KiB
Python

from __future__ import annotations
import uuid as _uuid
from claire import events as ev
from claire import read
from claire.domain import ProjectStatus, TaskStatus
def test_project_created_creates_projection_row(conn, gen) -> None:
    """A ``ProjectCreated`` event makes the project readable by name.

    The project returned by ``read.get_project`` must carry the id and name
    supplied in the event and report ``ProjectStatus.ACTIVE``.
    """
    project_id = _uuid.uuid4()
    created = ev.ProjectCreated(project_id=project_id, name="alpha")
    ev.append(conn, gen, created)

    project = read.get_project(conn, "alpha")

    assert project is not None
    assert project.id == project_id
    assert project.name == "alpha"
    assert project.status == ProjectStatus.ACTIVE
def test_task_added_creates_task_with_priority(conn, gen) -> None:
    """A ``TaskAdded`` event yields a task with the requested priority.

    The task is attached to a freshly created project; once read back by id
    it must have priority 1 and status ``TaskStatus.TODO``.
    """
    project_id = _uuid.uuid4()
    project_created = ev.ProjectCreated(project_id=project_id, name="alpha")
    ev.append(conn, gen, project_created)

    task_id = _uuid.uuid4()
    task_added = ev.TaskAdded(
        task_id=task_id,
        project_id=project_id,
        title="t1",
        priority=1,
    )
    ev.append(conn, gen, task_added)

    stored = read.get_task(conn, task_id)

    assert stored is not None
    assert stored.priority == 1
    assert stored.status == TaskStatus.TODO
def test_task_updated_changes_status(conn, gen) -> None:
    """A ``TaskUpdated`` event carrying a status changes the stored task.

    After project creation, task addition and an update to
    ``TaskStatus.DONE``, the task read back by id must report ``DONE``.
    """
    project_id = _uuid.uuid4()
    task_id = _uuid.uuid4()

    project_created = ev.ProjectCreated(project_id=project_id, name="alpha")
    ev.append(conn, gen, project_created)

    task_added = ev.TaskAdded(task_id=task_id, project_id=project_id, title="t1")
    ev.append(conn, gen, task_added)

    task_done = ev.TaskUpdated(task_id=task_id, status=TaskStatus.DONE)
    ev.append(conn, gen, task_done)

    stored = read.get_task(conn, task_id)

    assert stored is not None
    assert stored.status == TaskStatus.DONE
def test_replay_rebuilds_projections(conn, gen) -> None:
    """Replaying the event log reproduces the project projection.

    Two events are appended, so ``ev.replay`` is expected to return 2. The
    project read after the replay must equal the one read before it, and the
    task must still be retrievable by id. (Per the original author's note,
    replay wipes the projections before rebuilding them from the events.)
    """
    project_id = _uuid.uuid4()
    project_created = ev.ProjectCreated(
        project_id=project_id,
        name="alpha",
        goal="g",
    )
    ev.append(conn, gen, project_created)

    task_id = _uuid.uuid4()
    task_added = ev.TaskAdded(
        task_id=task_id,
        project_id=project_id,
        title="t",
        priority=3,
    )
    ev.append(conn, gen, task_added)

    # Snapshot the projection, replay, then snapshot it again.
    project_before = read.get_project(conn, "alpha")
    replayed = ev.replay(conn)
    project_after = read.get_project(conn, "alpha")

    assert replayed == 2
    assert project_after == project_before
    assert read.get_task(conn, task_id) is not None
def test_session_observed_upserts(conn, gen) -> None:
    """Observing the same session twice leaves a single session row.

    Two ``SessionObserved`` events with the same session UUID, host and cwd
    are appended; ``read.list_sessions`` must then return exactly one entry
    whose ``cwd`` is ``"/tmp"``.
    """
    session_id = _uuid.uuid4()

    # Build a fresh event object for each append, as two separate calls would.
    for _ in range(2):
        observed = ev.SessionObserved(
            session_uuid=session_id,
            host="local",
            cwd="/tmp",
        )
        ev.append(conn, gen, observed)

    all_sessions = read.list_sessions(conn)

    assert len(all_sessions) == 1
    assert all_sessions[0].cwd == "/tmp"
def test_triage_recorded_updates_session(conn, gen) -> None:
    """A ``TriageRecorded`` event is reflected on the observed session.

    After the session is observed and a triage with priority 0 and status
    ``"active"`` is recorded, the session read back by UUID must expose those
    values as ``last_triage_priority`` and ``last_triage_status``.
    """
    session_id = _uuid.uuid4()

    observed = ev.SessionObserved(session_uuid=session_id, host="local")
    ev.append(conn, gen, observed)

    triage = ev.TriageRecorded(
        session_uuid=session_id,
        priority=0,
        status="active",
        summary="busy",
    )
    ev.append(conn, gen, triage)

    session = read.get_session(conn, session_id)

    assert session is not None
    assert session.last_triage_priority == 0
    assert session.last_triage_status == "active"