claire/tests/test_pm_events.py
autocommit 6d212b7dbe refactor(testing-test): ♻️ Update test imports to use claire instead of clare in package references
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-05-20 19:54:05 -07:00

162 lines
7.3 KiB
Python

"""Projection tests for PM-expansion events.
Verifies that each new event payload writes to its projection table and that
a full `replay()` reconstructs identical state from the event log.
"""
from __future__ import annotations
import uuid as _uuid
from claire import events as ev
from claire.db import migrate, open_db
from claire.domain import EpicStatus, TaskStatus, TaskType
from claire.hlc import HLCGenerator
def _setup() -> tuple:
    """Create a fresh in-memory database and an HLC generator for one test.

    Returns:
        A ``(conn, gen)`` pair: the connection obtained from
        ``open_db(":memory:")`` after ``migrate`` has been run on it, and an
        ``HLCGenerator`` constructed with the id ``"test-machine"``.
    """
    db = open_db(":memory:")
    migrate(db)
    return db, HLCGenerator("test-machine")
def test_org_created_projects() -> None:
    """An ``OrgCreated`` event is readable back as a row of the ``orgs`` table."""
    conn, gen = _setup()
    org_id = _uuid.uuid4()
    created = ev.OrgCreated(org_id=org_id, name="acme", description="d")
    ev.append(conn, gen, created)

    cursor = conn.execute("SELECT * FROM orgs WHERE id = ?", (str(org_id),))
    stored = cursor.fetchone()
    # Both fields carried by the event must be present on the projected row.
    assert stored["name"] == "acme"
    assert stored["description"] == "d"
def test_person_created_unique_handle() -> None:
    """A second ``PersonCreated`` that reuses an existing handle must raise.

    The first append registers ``@quinn``; the second append uses a different
    ``person_id`` but the same handle and is expected to fail.
    """
    conn, gen = _setup()
    p1 = _uuid.uuid4()
    p2 = _uuid.uuid4()
    ev.append(conn, gen, ev.PersonCreated(person_id=p1, handle="@quinn", display_name="Quinn"))
    # Second insert with the same handle violates UNIQUE — surfaces as an error
    # at the projection layer (events.append uses a single transaction).
    try:
        ev.append(conn, gen, ev.PersonCreated(person_id=p2, handle="@quinn", display_name="Quinn 2"))
    except Exception:  # noqa: BLE001
        # NOTE(review): the concrete exception type raised by the projection
        # layer is not visible from this file, so the catch stays broad.
        pass
    else:
        # The failure must be raised OUTSIDE the try body: AssertionError is an
        # Exception subclass, so raising it inside the try would be swallowed
        # by the handler above and the test could never fail.
        raise AssertionError("expected UNIQUE constraint violation")
def test_epic_archive_nulls_task_epic_id() -> None:
    """After an epic is archived, no task row references it any more."""
    conn, gen = _setup()
    proj_id = _uuid.uuid4()
    ev.append(conn, gen, ev.ProjectCreated(project_id=proj_id, name="proj"))
    epic_id = _uuid.uuid4()
    ev.append(conn, gen, ev.EpicCreated(epic_id=epic_id, project_id=proj_id, title="E"))
    epic_key = str(epic_id)

    def linked_tasks() -> int:
        # Number of task rows whose epic_id currently points at the epic.
        cursor = conn.execute("SELECT COUNT(*) FROM tasks WHERE epic_id = ?", (epic_key,))
        return cursor.fetchone()[0]

    # There is no "set epic_id" event yet, so each task is attached to the
    # epic by writing the column directly for the purposes of this test.
    for _ in range(3):
        task_id = _uuid.uuid4()
        ev.append(conn, gen, ev.TaskAdded(task_id=task_id, project_id=proj_id, title="t"))
        conn.execute("UPDATE tasks SET epic_id = ? WHERE id = ?", (epic_key, str(task_id)))
    assert linked_tasks() == 3

    ev.append(conn, gen, ev.EpicUpdated(epic_id=epic_id, status=EpicStatus.ARCHIVED))
    assert linked_tasks() == 0
def test_task_state_transition_writes_history() -> None:
    """A state transition updates ``tasks.status`` and adds one history row."""
    conn, gen = _setup()
    proj_id = _uuid.uuid4()
    task_id = _uuid.uuid4()
    task_key = (str(task_id),)

    def emit(event) -> None:
        # Append one event through the shared connection and HLC generator.
        ev.append(conn, gen, event)

    emit(ev.ProjectCreated(project_id=proj_id, name="proj"))
    emit(ev.TaskAdded(task_id=task_id, project_id=proj_id, title="t"))
    emit(
        ev.TaskStateTransitioned(
            task_id=task_id,
            from_state=TaskStatus.TODO,
            to_state=TaskStatus.IN_PROGRESS,
        )
    )

    # The task row reflects the new state ...
    task_row = conn.execute("SELECT status FROM tasks WHERE id = ?", task_key).fetchone()
    assert task_row["status"] == "in_progress"

    # ... and exactly one transition was recorded for it.
    transitions = conn.execute(
        "SELECT from_state, to_state FROM task_state_history WHERE task_id = ?", task_key
    ).fetchall()
    assert len(transitions) == 1
    only = transitions[0]
    assert only["from_state"] == "todo"
    assert only["to_state"] == "in_progress"
def test_task_tagged_and_untagged_idempotent() -> None:
    """Repeating ``TaskTagged`` / ``TaskUntagged`` does not change the link count."""
    conn, gen = _setup()
    proj_id = _uuid.uuid4()
    task_id = _uuid.uuid4()
    tag_id = _uuid.uuid4()

    def emit(event) -> None:
        # Append one event through the shared connection and HLC generator.
        ev.append(conn, gen, event)

    def tag_links() -> int:
        # Number of task_tags rows currently attached to the task.
        cursor = conn.execute(
            "SELECT COUNT(*) FROM task_tags WHERE task_id = ?", (str(task_id),)
        )
        return cursor.fetchone()[0]

    emit(ev.ProjectCreated(project_id=proj_id, name="p"))
    emit(ev.TaskAdded(task_id=task_id, project_id=proj_id, title="t"))
    emit(ev.TagCreated(tag_id=tag_id, name="urgent"))

    # Tagging twice with the same pair must leave a single link row.
    emit(ev.TaskTagged(task_id=task_id, tag_id=tag_id))
    emit(ev.TaskTagged(task_id=task_id, tag_id=tag_id))
    assert tag_links() == 1

    # Untagging twice removes the link; the second event is a no-op.
    emit(ev.TaskUntagged(task_id=task_id, tag_id=tag_id))
    emit(ev.TaskUntagged(task_id=task_id, tag_id=tag_id))
    assert tag_links() == 0
def test_task_meta_partial_update() -> None:
    """``TaskMetaSet`` changes only the fields supplied on the event."""
    conn, gen = _setup()
    proj_id = _uuid.uuid4()
    task_id = _uuid.uuid4()
    task_key = (str(task_id),)

    def emit(event) -> None:
        # Append one event through the shared connection and HLC generator.
        ev.append(conn, gen, event)

    emit(ev.ProjectCreated(project_id=proj_id, name="p"))
    emit(ev.TaskAdded(task_id=task_id, project_id=proj_id, title="t"))

    # First update carries only an emoji.
    emit(ev.TaskMetaSet(task_id=task_id, emoji="🐛"))
    after_emoji = conn.execute(
        "SELECT color, emoji, apply_color_rule FROM tasks WHERE id = ?", task_key
    ).fetchone()
    assert after_emoji["emoji"] == "🐛"
    assert after_emoji["color"] is None  # colour was not part of the event
    assert after_emoji["apply_color_rule"] == 1

    # Second update carries only a colour; the emoji set earlier must survive.
    emit(ev.TaskMetaSet(task_id=task_id, color="#ff0000"))
    after_color = conn.execute(
        "SELECT color, emoji FROM tasks WHERE id = ?", task_key
    ).fetchone()
    assert after_color["color"] == "#ff0000"
    assert after_color["emoji"] == "🐛"
def test_replay_reproduces_pm_projection() -> None:
    """Replaying a PM-rich event log leaves the projection tables unchanged.

    Ten events covering orgs, people, projects, epics, tags and tasks are
    appended; the projection is captured, ``ev.replay`` is run, and the
    projection captured afterwards must equal the first capture.
    """
    conn, gen = _setup()
    org_id = _uuid.uuid4()
    person_id = _uuid.uuid4()
    proj_id = _uuid.uuid4()
    epic_id = _uuid.uuid4()
    tag_id = _uuid.uuid4()
    task_id = _uuid.uuid4()

    def emit(event) -> None:
        # Append one event through the shared connection and HLC generator.
        ev.append(conn, gen, event)

    emit(ev.OrgCreated(org_id=org_id, name="acme"))
    emit(ev.PersonCreated(person_id=person_id, handle="@q", display_name="Q"))
    emit(ev.ProjectCreated(project_id=proj_id, name="p"))
    emit(ev.EpicCreated(epic_id=epic_id, project_id=proj_id, title="E"))
    emit(ev.TagCreated(tag_id=tag_id, name="urgent", color="#f00"))
    emit(ev.TaskAdded(task_id=task_id, project_id=proj_id, title="t"))
    emit(ev.TaskTagged(task_id=task_id, tag_id=tag_id))
    emit(ev.TaskOwnerSet(task_id=task_id, owner_person_id=person_id))
    emit(ev.TaskTypeSet(task_id=task_id, task_type=TaskType.BUG))
    emit(
        ev.TaskStateTransitioned(
            task_id=task_id,
            from_state=TaskStatus.TODO,
            to_state=TaskStatus.IN_PROGRESS,
        )
    )

    # One query per projection table, keyed by the label used in the snapshot.
    queries = {
        "orgs": "SELECT name FROM orgs ORDER BY name",
        "people": "SELECT handle FROM people ORDER BY handle",
        "projects": "SELECT name FROM projects ORDER BY name",
        "epics": "SELECT title, status FROM epics ORDER BY title",
        "tags": "SELECT name, color FROM tags ORDER BY name",
        "tasks": "SELECT status, task_type, owner_person_id FROM tasks",
        "tags_links": "SELECT * FROM task_tags",
        "history": "SELECT from_state, to_state FROM task_state_history",
    }

    def snapshot() -> dict:
        # Rows are converted to plain tuples so two snapshots compare by value.
        return {
            label: [tuple(row) for row in conn.execute(sql).fetchall()]
            for label, sql in queries.items()
        }

    before = snapshot()
    n = ev.replay(conn)
    assert n >= 10
    after = snapshot()
    assert before == after