✅ test(@projects/@clare): 🧪 add pm-command parser test suite
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
parent
bc894b550e
commit
d2e37ea751
2 changed files with 282 additions and 0 deletions
128
tests/test_pm_commands.py
Normal file
128
tests/test_pm_commands.py
Normal file
|
|
@ -0,0 +1,128 @@
|
|||
"""Parser/dispatcher tests for the PM-expansion chat commands."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from clare.db import migrate, open_db
|
||||
from clare.domain import ChatScope, TaskStatus, TaskType
|
||||
from clare.hlc import HLCGenerator
|
||||
from clare.web import service
|
||||
from clare.web.chat import commands as cmds
|
||||
|
||||
|
||||
def _setup() -> tuple:
|
||||
conn = open_db(":memory:")
|
||||
migrate(conn)
|
||||
gen = HLCGenerator("test-machine")
|
||||
return conn, gen
|
||||
|
||||
|
||||
def _orch_ctx() -> cmds.ScopeCtx:
|
||||
return cmds.ScopeCtx(scope=ChatScope.ORCHESTRATOR, scope_ref=None)
|
||||
|
||||
|
||||
def test_parse_org_new() -> None:
|
||||
cmd = cmds.parse("/org new acme --description big-corp", _orch_ctx())
|
||||
assert isinstance(cmd, cmds.OrgNewCmd)
|
||||
assert cmd.name == "acme" and cmd.description == "big-corp"
|
||||
|
||||
|
||||
def test_parse_person_new_requires_at_handle() -> None:
|
||||
with pytest.raises(Exception): # ValidationError from pydantic
|
||||
cmds.parse("/person new quinn Quinn", _orch_ctx())
|
||||
cmd = cmds.parse("/person new @quinn Quinn", _orch_ctx())
|
||||
assert isinstance(cmd, cmds.PersonNewCmd)
|
||||
assert cmd.handle == "@quinn" and cmd.display_name == "Quinn"
|
||||
|
||||
|
||||
def test_parse_state_invalid_value() -> None:
|
||||
with pytest.raises(cmds.ParseError):
|
||||
cmds.parse("/state abcd1234 flying", _orch_ctx())
|
||||
|
||||
|
||||
def test_parse_state_valid() -> None:
|
||||
cmd = cmds.parse("/state abcd1234 in_progress --reason 'started work'", _orch_ctx())
|
||||
assert isinstance(cmd, cmds.TaskStateCmd)
|
||||
assert cmd.to_state == TaskStatus.IN_PROGRESS
|
||||
assert cmd.reason == "started work"
|
||||
|
||||
|
||||
def test_parse_tag_and_untag() -> None:
|
||||
tag_cmd = cmds.parse("/tag abcd1234 urgent", _orch_ctx())
|
||||
assert isinstance(tag_cmd, cmds.TaskTagCmd) and not tag_cmd.remove
|
||||
untag_cmd = cmds.parse("/untag abcd1234 urgent", _orch_ctx())
|
||||
assert isinstance(untag_cmd, cmds.TaskTagCmd) and untag_cmd.remove
|
||||
|
||||
|
||||
def test_parse_own_with_and_without_handle() -> None:
|
||||
with_handle = cmds.parse("/own abcd1234 @quinn", _orch_ctx())
|
||||
assert isinstance(with_handle, cmds.TaskOwnCmd)
|
||||
assert with_handle.person_handle == "@quinn"
|
||||
without = cmds.parse("/own abcd1234", _orch_ctx())
|
||||
assert without.person_handle is None
|
||||
|
||||
|
||||
def test_parse_type_clear() -> None:
|
||||
typed = cmds.parse("/type abcd1234 bug", _orch_ctx())
|
||||
assert isinstance(typed, cmds.TaskTypeCmd) and typed.task_type == TaskType.BUG
|
||||
cleared = cmds.parse("/type abcd1234", _orch_ctx())
|
||||
assert cleared.task_type is None
|
||||
|
||||
|
||||
def test_parse_meta_flags() -> None:
|
||||
cmd = cmds.parse(
|
||||
"/meta abcd1234 --color #fff --emoji 🐛 --apply-color-rule off",
|
||||
_orch_ctx(),
|
||||
)
|
||||
assert isinstance(cmd, cmds.TaskMetaCmd)
|
||||
assert cmd.color == "#fff" and cmd.emoji == "🐛"
|
||||
assert cmd.apply_color_rule is False
|
||||
|
||||
|
||||
def test_dispatch_epic_new() -> None:
|
||||
conn, gen = _setup()
|
||||
service.create_project(conn, gen, name="proj")
|
||||
cmd = cmds.parse("/epic new 'auth rewrite' --project proj", _orch_ctx())
|
||||
result = cmds.dispatch(conn, gen, cmd, _orch_ctx())
|
||||
assert "auth rewrite" in result.body
|
||||
assert result.meta["kind"] == "epic_new"
|
||||
|
||||
|
||||
def test_dispatch_tag_creates_on_demand_then_untag() -> None:
|
||||
conn, gen = _setup()
|
||||
service.create_project(conn, gen, name="p")
|
||||
task = service.add_task(conn, gen, project="p", title="t")
|
||||
short = str(task.id)[:8]
|
||||
res1 = cmds.dispatch(
|
||||
conn, gen,
|
||||
cmds.parse(f"/tag {short} brand-new", _orch_ctx()),
|
||||
_orch_ctx(),
|
||||
)
|
||||
assert "tagged" in res1.body
|
||||
res2 = cmds.dispatch(
|
||||
conn, gen,
|
||||
cmds.parse(f"/untag {short} brand-new", _orch_ctx()),
|
||||
_orch_ctx(),
|
||||
)
|
||||
assert "untagged" in res2.body
|
||||
|
||||
|
||||
def test_dispatch_state_transition_legal_and_illegal() -> None:
|
||||
conn, gen = _setup()
|
||||
service.create_project(conn, gen, name="p")
|
||||
task = service.add_task(conn, gen, project="p", title="t")
|
||||
short = str(task.id)[:8]
|
||||
res = cmds.dispatch(
|
||||
conn, gen,
|
||||
cmds.parse(f"/state {short} in_progress", _orch_ctx()),
|
||||
_orch_ctx(),
|
||||
)
|
||||
assert "in_progress" in res.body
|
||||
# Illegal: in_progress → todo
|
||||
with pytest.raises(service.InvalidInput):
|
||||
cmds.dispatch(
|
||||
conn, gen,
|
||||
cmds.parse(f"/state {short} todo", _orch_ctx()),
|
||||
_orch_ctx(),
|
||||
)
|
||||
154
tests/test_pm_service.py
Normal file
154
tests/test_pm_service.py
Normal file
|
|
@ -0,0 +1,154 @@
|
|||
"""Service-layer tests for PM expansion.
|
||||
|
||||
Covers state-machine enforcement, identity uniqueness, idempotent tagging,
|
||||
and the color-resolution chain.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid as _uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from clare.db import migrate, open_db
|
||||
from clare.domain import EpicStatus, TASK_TRANSITIONS, TaskStatus, TaskType
|
||||
from clare.hlc import HLCGenerator
|
||||
from clare.read import resolve_task_color
|
||||
from clare.web import service
|
||||
|
||||
|
||||
def _setup() -> tuple:
|
||||
conn = open_db(":memory:")
|
||||
migrate(conn)
|
||||
gen = HLCGenerator("test-machine")
|
||||
return conn, gen
|
||||
|
||||
|
||||
def test_create_org_unique() -> None:
|
||||
conn, gen = _setup()
|
||||
service.create_org(conn, gen, name="acme")
|
||||
with pytest.raises(service.Conflict):
|
||||
service.create_org(conn, gen, name="acme")
|
||||
|
||||
|
||||
def test_create_person_validates_handle_prefix() -> None:
|
||||
conn, gen = _setup()
|
||||
with pytest.raises(service.InvalidInput):
|
||||
service.create_person(conn, gen, handle="quinn", display_name="Q")
|
||||
service.create_person(conn, gen, handle="@quinn", display_name="Q")
|
||||
with pytest.raises(service.Conflict):
|
||||
service.create_person(conn, gen, handle="@quinn", display_name="Q2")
|
||||
|
||||
|
||||
def test_create_epic_requires_project() -> None:
|
||||
conn, gen = _setup()
|
||||
with pytest.raises(service.NotFound):
|
||||
service.create_epic(conn, gen, project="nope", title="t")
|
||||
service.create_project(conn, gen, name="proj")
|
||||
epic = service.create_epic(conn, gen, project="proj", title="E1")
|
||||
assert epic.title == "E1" and epic.status == EpicStatus.OPEN
|
||||
|
||||
|
||||
def test_archive_epic_already_archived() -> None:
|
||||
conn, gen = _setup()
|
||||
service.create_project(conn, gen, name="p")
|
||||
epic = service.create_epic(conn, gen, project="p", title="e")
|
||||
service.archive_epic(conn, gen, epic_id=epic.id)
|
||||
with pytest.raises(service.Conflict):
|
||||
service.archive_epic(conn, gen, epic_id=epic.id)
|
||||
|
||||
|
||||
def test_transition_rejects_illegal() -> None:
|
||||
conn, gen = _setup()
|
||||
service.create_project(conn, gen, name="p")
|
||||
task = service.add_task(conn, gen, project="p", title="t")
|
||||
# todo → review is illegal — must go through in_progress
|
||||
with pytest.raises(service.InvalidInput):
|
||||
service.transition_task_state(
|
||||
conn, gen, task_id=task.id, to_state=TaskStatus.REVIEW,
|
||||
)
|
||||
|
||||
|
||||
def test_transition_all_legal_pairs() -> None:
|
||||
"""Iterate every TASK_TRANSITIONS pair: each must be accepted."""
|
||||
for from_state, to_state in TASK_TRANSITIONS:
|
||||
conn, gen = _setup()
|
||||
service.create_project(conn, gen, name="p")
|
||||
task = service.add_task(conn, gen, project="p", title="t")
|
||||
# Manually seed the task into `from_state` via direct UPDATE — we're
|
||||
# testing the transition_task_state validator, not the path to reach it.
|
||||
conn.execute(
|
||||
"UPDATE tasks SET status = ? WHERE id = ?",
|
||||
(from_state.value, str(task.id)),
|
||||
)
|
||||
updated = service.transition_task_state(
|
||||
conn, gen, task_id=task.id, to_state=to_state,
|
||||
)
|
||||
assert updated.status == to_state
|
||||
|
||||
|
||||
def test_tag_task_idempotent_and_untag() -> None:
|
||||
conn, gen = _setup()
|
||||
service.create_project(conn, gen, name="p")
|
||||
task = service.add_task(conn, gen, project="p", title="t")
|
||||
tag = service.create_tag(conn, gen, name="urgent")
|
||||
service.tag_task(conn, gen, task_id=task.id, tag_id=tag.id)
|
||||
# Second tag: idempotent (no Conflict)
|
||||
service.tag_task(conn, gen, task_id=task.id, tag_id=tag.id)
|
||||
# Untag, untag again: also a no-op.
|
||||
service.untag_task(conn, gen, task_id=task.id, tag_id=tag.id)
|
||||
service.untag_task(conn, gen, task_id=task.id, tag_id=tag.id)
|
||||
|
||||
|
||||
def test_get_or_create_tag() -> None:
|
||||
conn, gen = _setup()
|
||||
t1 = service.get_or_create_tag(conn, gen, name="bug")
|
||||
t2 = service.get_or_create_tag(conn, gen, name="bug")
|
||||
assert t1.id == t2.id
|
||||
|
||||
|
||||
def test_set_task_owner_unknown_person() -> None:
|
||||
conn, gen = _setup()
|
||||
service.create_project(conn, gen, name="p")
|
||||
task = service.add_task(conn, gen, project="p", title="t")
|
||||
with pytest.raises(service.NotFound):
|
||||
service.set_task_owner(conn, gen, task_id=task.id, owner_person_id=_uuid.uuid4())
|
||||
|
||||
|
||||
def test_set_task_meta_noop_when_all_none() -> None:
|
||||
conn, gen = _setup()
|
||||
service.create_project(conn, gen, name="p")
|
||||
task = service.add_task(conn, gen, project="p", title="t")
|
||||
# All-None call returns the task unchanged and emits no event.
|
||||
before_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
|
||||
service.set_task_meta(conn, gen, task_id=task.id)
|
||||
after_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
|
||||
assert before_count == after_count
|
||||
|
||||
|
||||
def test_resolve_task_color_priority_chain() -> None:
|
||||
conn, gen = _setup()
|
||||
service.create_project(conn, gen, name="p")
|
||||
task = service.add_task(conn, gen, project="p", title="t")
|
||||
# Default: state color for "todo"
|
||||
assert resolve_task_color(conn, task) == "#94a3b8"
|
||||
# Type wins over state
|
||||
task = service.set_task_type(conn, gen, task_id=task.id, task_type=TaskType.BUG)
|
||||
assert resolve_task_color(conn, task) == "#ef4444"
|
||||
# Category wins over type
|
||||
cat = service.create_category(conn, gen, name="urgent", color="#abcdef")
|
||||
task = service.set_task_category(conn, gen, task_id=task.id, category_id=cat.id)
|
||||
assert resolve_task_color(conn, task) == "#abcdef"
|
||||
# Tag wins over category
|
||||
tag = service.create_tag(conn, gen, name="critical", color="#123456")
|
||||
service.tag_task(conn, gen, task_id=task.id, tag_id=tag.id)
|
||||
from clare.read import get_task as _get
|
||||
task = _get(conn, task.id)
|
||||
assert task is not None
|
||||
assert resolve_task_color(conn, task) == "#123456"
|
||||
# Explicit color wins over everything
|
||||
task = service.set_task_meta(conn, gen, task_id=task.id, color="#deadbe")
|
||||
assert resolve_task_color(conn, task) == "#deadbe"
|
||||
# apply_color_rule=False with no explicit color returns None
|
||||
task = service.set_task_meta(conn, gen, task_id=task.id, color="", apply_color_rule=False)
|
||||
assert resolve_task_color(conn, task) is None
|
||||
Loading…
Add table
Reference in a new issue