claire/tests/test_chat_projection.py
autocommit 6d212b7dbe refactor(testing-test): ♻️ Update test imports to use claire instead of clare in package references
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-05-20 19:54:05 -07:00

235 lines
9.3 KiB
Python

"""Chat projection fan-out tests.
System messages are projection side-effects of other events — they're written
to `chat_messages` when the originating event is applied. These tests pin the
routing rules from the design doc.
"""
from __future__ import annotations
import uuid as _uuid
from claire import events as ev
from claire import read
from claire.domain import ChatRole, ChatScope, TaskStatus
def _orch(conn):
    """Return the chat messages listed under the orchestrator scope."""
    orchestrator_messages = read.list_chat_messages(
        conn,
        scope=ChatScope.ORCHESTRATOR,
        scope_ref=None,
    )
    return orchestrator_messages
def _proj(conn, name: str):
    """Return the chat messages listed under the project scope for ``name``."""
    return read.list_chat_messages(
        conn,
        scope=ChatScope.PROJECT,
        scope_ref=name,
    )
def _sess(conn, sid):
    """Return the chat messages listed under the session scope for ``sid``.

    ``sid`` is passed through ``str()`` before being used as the scope ref.
    """
    session_ref = str(sid)
    return read.list_chat_messages(
        conn, scope=ChatScope.SESSION, scope_ref=session_ref
    )
# ---------------------------------------------------------------------------
# ChatMessagePosted — explicit posts go to exactly one scope, no fan-out.
# ---------------------------------------------------------------------------
def test_chat_message_posted_orchestrator(conn, gen) -> None:
    """An orchestrator-scoped post yields exactly one message, body and role intact."""
    posted = ev.ChatMessagePosted(
        scope=ChatScope.ORCHESTRATOR, role=ChatRole.USER, body="hello claire"
    )
    ev.append(conn, gen, posted)

    orchestrator_msgs = _orch(conn)
    assert len(orchestrator_msgs) == 1

    only_message = orchestrator_msgs[0]
    assert only_message.body == "hello claire"
    assert only_message.role == ChatRole.USER
def test_chat_message_posted_project_only(conn, gen) -> None:
    """A project-scoped post appears in that project's chat and not in the orchestrator's."""
    project_id = _uuid.uuid4()
    ev.append(conn, gen, ev.ProjectCreated(project_id=project_id, name="alpha"))

    user_post = ev.ChatMessagePosted(
        scope=ChatScope.PROJECT,
        scope_ref="alpha",
        role=ChatRole.USER,
        body="user note",
    )
    ev.append(conn, gen, user_post)

    # The project chat may hold other entries as well; only the presence of
    # the user's message is asserted here.
    project_bodies = [message.body for message in _proj(conn, "alpha")]
    assert "user note" in project_bodies

    # The user's message must not be mirrored into the orchestrator chat.
    orchestrator_bodies = [message.body for message in _orch(conn)]
    assert "user note" not in orchestrator_bodies
# ---------------------------------------------------------------------------
# ProjectCreated → orchestrator system note
# ---------------------------------------------------------------------------
def test_project_created_fans_out_to_orchestrator(conn, gen) -> None:
    """Creating a project leaves a single SYSTEM message naming it in the orchestrator chat."""
    project_id = _uuid.uuid4()
    created = ev.ProjectCreated(project_id=project_id, name="alpha")
    ev.append(conn, gen, created)

    orchestrator_msgs = _orch(conn)
    assert len(orchestrator_msgs) == 1

    note = orchestrator_msgs[0]
    assert "alpha" in note.body
    assert note.role == ChatRole.SYSTEM
# ---------------------------------------------------------------------------
# TaskAdded — project scope always, orchestrator only for P0/P1
# ---------------------------------------------------------------------------
def test_task_added_fans_out_to_project_only_for_p2(conn, gen) -> None:
    """A priority-2 task is mentioned in the project chat but not in the orchestrator chat."""
    project_id = _uuid.uuid4()
    task_id = _uuid.uuid4()
    ev.append(conn, gen, ev.ProjectCreated(project_id=project_id, name="alpha"))
    ev.append(
        conn,
        gen,
        ev.TaskAdded(task_id=task_id, project_id=project_id, title="t1", priority=2),
    )

    project_bodies = [message.body for message in _proj(conn, "alpha")]
    assert any("t1" in body for body in project_bodies)

    # Whatever else the orchestrator chat holds, the P2 task title is absent.
    orchestrator_bodies = [message.body for message in _orch(conn)]
    assert not any("t1" in body for body in orchestrator_bodies)
def test_task_added_fans_out_to_orchestrator_for_p0(conn, gen) -> None:
    """A priority-0 task is mentioned in the orchestrator chat."""
    project_id = _uuid.uuid4()
    task_id = _uuid.uuid4()
    ev.append(conn, gen, ev.ProjectCreated(project_id=project_id, name="alpha"))

    urgent_task = ev.TaskAdded(
        task_id=task_id, project_id=project_id, title="urgent", priority=0
    )
    ev.append(conn, gen, urgent_task)

    orchestrator_bodies = [message.body for message in _orch(conn)]
    assert any("urgent" in body for body in orchestrator_bodies)
# ---------------------------------------------------------------------------
# TriageRecorded — session always, project if assigned, orchestrator if P0/P1
# ---------------------------------------------------------------------------
def test_triage_fans_out_to_session_always(conn, gen) -> None:
    """A priority-3 triage on a session with no assignment still reaches the session chat."""
    session_id = _uuid.uuid4()
    ev.append(conn, gen, ev.SessionObserved(session_uuid=session_id, host="local"))

    triage = ev.TriageRecorded(
        session_uuid=session_id, priority=3, status="working", summary="ok"
    )
    ev.append(conn, gen, triage)

    session_msgs = _sess(conn, session_id)
    # One message must carry both the priority tag and the status text.
    assert any(
        "P3" in message.body and "working" in message.body
        for message in session_msgs
    )
def test_triage_fans_out_to_project_when_assigned(conn, gen) -> None:
    """Triage on a session assigned to a project's task is echoed into that project's chat."""
    project_id = _uuid.uuid4()
    task_id = _uuid.uuid4()
    session_id = _uuid.uuid4()
    assignment_id = _uuid.uuid4()

    # Build the chain project -> task -> session -> assignment before triaging.
    ev.append(conn, gen, ev.ProjectCreated(project_id=project_id, name="alpha"))
    ev.append(conn, gen, ev.TaskAdded(task_id=task_id, project_id=project_id, title="t"))
    ev.append(conn, gen, ev.SessionObserved(session_uuid=session_id, host="local"))
    ev.append(
        conn,
        gen,
        ev.AssignmentCreated(
            assignment_id=assignment_id, task_id=task_id, session_uuid=session_id
        ),
    )

    triage = ev.TriageRecorded(
        session_uuid=session_id, priority=2, status="working", summary="busy"
    )
    ev.append(conn, gen, triage)

    project_bodies = [message.body for message in _proj(conn, "alpha")]
    assert any("busy" in body for body in project_bodies)
def test_triage_p1_fans_out_to_orchestrator(conn, gen) -> None:
    """A priority-1 triage record is surfaced in the orchestrator chat."""
    session_id = _uuid.uuid4()
    ev.append(conn, gen, ev.SessionObserved(session_uuid=session_id, host="local"))

    triage = ev.TriageRecorded(
        session_uuid=session_id, priority=1, status="blocked", summary="needs key"
    )
    ev.append(conn, gen, triage)

    orchestrator_bodies = [message.body for message in _orch(conn)]
    # One message must carry both the priority tag and the triage summary.
    assert any(
        "P1" in body and "needs key" in body for body in orchestrator_bodies
    )
# ---------------------------------------------------------------------------
# AssignmentCreated → both project and session
# ---------------------------------------------------------------------------
def test_assignment_fans_out_to_project_and_session(conn, gen) -> None:
    """Creating an assignment writes a note to both the project chat and the session chat."""
    project_id = _uuid.uuid4()
    task_id = _uuid.uuid4()
    session_id = _uuid.uuid4()
    assignment_id = _uuid.uuid4()

    ev.append(conn, gen, ev.ProjectCreated(project_id=project_id, name="alpha"))
    ev.append(conn, gen, ev.TaskAdded(task_id=task_id, project_id=project_id, title="job"))
    ev.append(conn, gen, ev.SessionObserved(session_uuid=session_id, host="local"))

    assignment = ev.AssignmentCreated(
        assignment_id=assignment_id, task_id=task_id, session_uuid=session_id
    )
    ev.append(conn, gen, assignment)

    # Project side: the note names the task and says it was assigned.
    project_bodies = [message.body for message in _proj(conn, "alpha")]
    assert any("Assigned" in body and "job" in body for body in project_bodies)

    # Session side: the note names the task.
    session_bodies = [message.body for message in _sess(conn, session_id)]
    assert any("job" in body for body in session_bodies)
# ---------------------------------------------------------------------------
# TaskUpdated → project system note when fields changed
# ---------------------------------------------------------------------------
def test_task_updated_status_fans_out(conn, gen) -> None:
    """Updating a task's status to DONE produces a project chat message containing ``status=done``."""
    project_id = _uuid.uuid4()
    task_id = _uuid.uuid4()

    ev.append(conn, gen, ev.ProjectCreated(project_id=project_id, name="alpha"))
    ev.append(conn, gen, ev.TaskAdded(task_id=task_id, project_id=project_id, title="t"))

    status_change = ev.TaskUpdated(task_id=task_id, status=TaskStatus.DONE)
    ev.append(conn, gen, status_change)

    project_bodies = [message.body for message in _proj(conn, "alpha")]
    assert any("status=done" in body for body in project_bodies)
# ---------------------------------------------------------------------------
# TaskSplit — records link + project system note
# ---------------------------------------------------------------------------
def test_task_split_records_link_and_chat(conn, gen) -> None:
    """Splitting a task stores parent/child rows in ``task_splits`` and posts a project note."""
    project_id = _uuid.uuid4()
    parent_id = _uuid.uuid4()
    first_child = _uuid.uuid4()
    second_child = _uuid.uuid4()

    ev.append(conn, gen, ev.ProjectCreated(project_id=project_id, name="alpha"))
    ev.append(conn, gen, ev.TaskAdded(task_id=parent_id, project_id=project_id, title="big"))
    ev.append(conn, gen, ev.TaskAdded(task_id=first_child, project_id=project_id, title="small-1"))
    ev.append(conn, gen, ev.TaskAdded(task_id=second_child, project_id=project_id, title="small-2"))

    split = ev.TaskSplit(
        parent_task_id=parent_id, child_task_ids=(first_child, second_child)
    )
    ev.append(conn, gen, split)

    # Link rows: the query and the comparison both use the string form of the ids.
    link_rows = conn.execute(
        "SELECT child_task_id FROM task_splits WHERE parent_task_id = ?",
        (str(parent_id),),
    ).fetchall()
    recorded_children = {row["child_task_id"] for row in link_rows}
    assert recorded_children == {str(first_child), str(second_child)}

    # Chat note: mentions the split and the number of subtasks.
    project_bodies = [message.body for message in _proj(conn, "alpha")]
    assert any("Split" in body and "2 subtasks" in body for body in project_bodies)
# ---------------------------------------------------------------------------
# Replay reconstructs identical chat history from the event log.
# ---------------------------------------------------------------------------
def test_replay_reconstructs_chat_messages(conn, gen) -> None:
    """``ev.replay`` leaves the same (scope, scope_ref, role, body) rows in ``chat_messages``.

    Rows are compared in ``rowid`` order, so both content and ordering must
    match between the pre-replay and post-replay snapshots.
    """

    def snapshot_chat_rows():
        # Same query before and after the replay; rows become plain tuples
        # so the two snapshots compare by value.
        rows = conn.execute(
            "SELECT scope, scope_ref, role, body FROM chat_messages ORDER BY rowid"
        ).fetchall()
        return [tuple(row) for row in rows]

    project_id = _uuid.uuid4()
    task_id = _uuid.uuid4()
    session_id = _uuid.uuid4()
    assignment_id = _uuid.uuid4()

    # Append a mix of events, ending with an explicit user post.
    ev.append(conn, gen, ev.ProjectCreated(project_id=project_id, name="alpha"))
    ev.append(
        conn,
        gen,
        ev.TaskAdded(task_id=task_id, project_id=project_id, title="t", priority=0),
    )
    ev.append(conn, gen, ev.SessionObserved(session_uuid=session_id, host="local"))
    ev.append(
        conn,
        gen,
        ev.AssignmentCreated(
            assignment_id=assignment_id, task_id=task_id, session_uuid=session_id
        ),
    )
    ev.append(
        conn,
        gen,
        ev.TriageRecorded(
            session_uuid=session_id, priority=1, status="working", summary="going"
        ),
    )
    ev.append(
        conn,
        gen,
        ev.ChatMessagePosted(
            scope=ChatScope.ORCHESTRATOR, role=ChatRole.USER, body="hi"
        ),
    )

    rows_before = snapshot_chat_rows()
    ev.replay(conn)
    rows_after = snapshot_chat_rows()

    assert rows_before == rows_after