feat(@projects/clare): add live tool-call logging system

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Natalie 2026-05-18 16:14:40 -07:00
parent 507575bbad
commit 00401248db
4 changed files with 163 additions and 87 deletions

View file

@ -47,9 +47,55 @@ def _open(
return conn, gen
async def _run_sync(fn, /, *args, **kwargs):
"""Run a sync tool body in a worker thread (sqlite3 + service.* are sync)."""
return await asyncio.to_thread(fn, *args, **kwargs)
def _short(v: Any) -> str:
"""Compact a value for inclusion in a tool-call log message."""
s = repr(v)
return s if len(s) <= 60 else s[:57] + "..."
def _log_tool_call(
conn: sqlite3.Connection,
gen: HLCGenerator,
name: str,
args: dict[str, Any],
) -> None:
"""Post a `→ tool(args)` system message to the orchestrator chat.
Gives the user live visibility into what the orchestrator is doing.
State-changing tools ALSO produce their own fan-out system messages
via the projection (service.* events). The redundancy is intentional:
this log shows *intent*; the fan-out shows the *result*.
Errors here are swallowed; logging must never break the actual call.
"""
try:
from ..domain import ChatRole, ChatScope
from ..events import ChatMessagePosted, append
rendered = ", ".join(
f"{k}={_short(v)}" for k, v in args.items() if v is not None
)
body = f"{name}({rendered})" if rendered else f"{name}()"
append(conn, gen, ChatMessagePosted(
scope=ChatScope.ORCHESTRATOR, scope_ref=None,
role=ChatRole.SYSTEM, body=body,
meta={"kind": "tool_call", "tool": name, "args": args},
))
except Exception:
pass
async def _call_tool(name: str, args: dict[str, Any], fn) -> Any:
"""One pattern for every MCP tool: log → open → run → close.
`fn(conn, gen)` returns the tool's payload synchronously; we hop into a
worker thread to keep the MCP event loop free.
"""
conn, gen = _open()
try:
_log_tool_call(conn, gen, name, args)
return await asyncio.to_thread(fn, conn, gen)
finally:
conn.close()
# ---------------------------------------------------------------------------
@ -87,78 +133,69 @@ def build_server() -> FastMCP:
@mcp.tool()
async def create_project(name: str, goal: str | None = None, owner: str | None = None) -> dict[str, Any]:
"""Create a new project."""
conn, gen = _open()
try:
return await _run_sync(tools.create_project, conn, gen, name=name, goal=goal, owner=owner)
finally:
conn.close()
return await _call_tool(
"create_project", {"name": name, "goal": goal, "owner": owner},
lambda c, g: tools.create_project(c, g, name=name, goal=goal, owner=owner),
)
@mcp.tool()
async def add_task(
project: str, title: str, priority: int = 2, description: str | None = None,
) -> dict[str, Any]:
"""Add a task to a project. Priority 0=urgent, 4=abandonable."""
conn, gen = _open()
try:
return await _run_sync(
tools.add_task, conn, gen,
project=project, title=title, priority=priority, description=description,
)
finally:
conn.close()
return await _call_tool(
"add_task",
{"project": project, "title": title, "priority": priority, "description": description},
lambda c, g: tools.add_task(
c, g, project=project, title=title, priority=priority, description=description,
),
)
@mcp.tool()
async def list_tasks(project: str | None = None, status: str | None = None) -> dict[str, Any]:
"""List tasks (optional project name and/or status filter)."""
conn, _ = _open()
try:
return await _run_sync(tools.list_tasks, conn, project=project, status=status)
finally:
conn.close()
return await _call_tool(
"list_tasks", {"project": project, "status": status},
lambda c, _g: tools.list_tasks(c, project=project, status=status),
)
@mcp.tool()
async def create_assignment(task_ref: str, session_ref: str) -> dict[str, Any]:
"""Assign a task to a session. Accepts full uuid or unambiguous prefix."""
conn, gen = _open()
try:
return await _run_sync(
tools.create_assignment, conn, gen,
task_ref=task_ref, session_ref=session_ref,
)
finally:
conn.close()
return await _call_tool(
"create_assignment", {"task_ref": task_ref, "session_ref": session_ref},
lambda c, g: tools.create_assignment(c, g, task_ref=task_ref, session_ref=session_ref),
)
@mcp.tool()
async def broadcast(target: str, text: str) -> dict[str, Any]:
"""Broadcast text to every session assigned in a project (or @group)."""
conn, gen = _open()
try:
return await _run_sync(tools.broadcast, conn, gen, target=target, text=text)
finally:
conn.close()
return await _call_tool(
"broadcast", {"target": target, "text": text},
lambda c, g: tools.broadcast(c, g, target=target, text=text),
)
@mcp.tool()
async def pull() -> dict[str, Any]:
"""Refresh fleet view from rclaude."""
conn, gen = _open()
try:
return await _run_sync(tools.pull, conn, gen)
finally:
conn.close()
return await _call_tool(
"pull", {}, lambda c, g: tools.pull(c, g),
)
@mcp.tool()
async def status() -> dict[str, Any]:
"""Fleet counts: projects, open tasks, sessions, blocked."""
conn, _ = _open()
try:
return await _run_sync(tools.status, conn)
finally:
conn.close()
return await _call_tool(
"status", {}, lambda c, _g: tools.status(c),
)
@mcp.tool()
async def help() -> dict[str, Any]:
"""List available Clare tools with one-line summaries."""
return tools.help_text()
# help() takes no DB — log via a throwaway conn for symmetry.
return await _call_tool(
"help", {}, lambda _c, _g: tools.help_text(),
)
# Read tools ------------------------------------------------------------
@mcp.tool()
@ -166,69 +203,60 @@ def build_server() -> FastMCP:
scope: str = "orchestrator", scope_ref: str | None = None, limit: int = 50,
) -> dict[str, Any]:
"""Recent chat messages for a scope, oldest-first."""
conn, _ = _open()
try:
return await _run_sync(
tools.list_recent_events, conn,
scope=scope, scope_ref=scope_ref, limit=limit,
)
finally:
conn.close()
return await _call_tool(
"list_recent_events",
{"scope": scope, "scope_ref": scope_ref, "limit": limit},
lambda c, _g: tools.list_recent_events(
c, scope=scope, scope_ref=scope_ref, limit=limit,
),
)
@mcp.tool()
async def search_chat_messages(
query: str, scope: str | None = None, limit: int = 50,
) -> dict[str, Any]:
"""Case-insensitive substring search over chat bodies."""
conn, _ = _open()
try:
return await _run_sync(
tools.search_chat_messages, conn,
query=query, scope=scope, limit=limit,
)
finally:
conn.close()
return await _call_tool(
"search_chat_messages",
{"query": query, "scope": scope, "limit": limit},
lambda c, _g: tools.search_chat_messages(
c, query=query, scope=scope, limit=limit,
),
)
@mcp.tool()
async def get_session(uuid: str) -> dict[str, Any]:
"""Details for one session by uuid (or short prefix)."""
conn, _ = _open()
try:
return await _run_sync(tools.get_session, conn, uuid=uuid)
finally:
conn.close()
return await _call_tool(
"get_session", {"uuid": uuid},
lambda c, _g: tools.get_session(c, uuid=uuid),
)
# Planning tools --------------------------------------------------------
@mcp.tool()
async def summarize_project(name: str) -> dict[str, Any]:
"""Project summary with task counts and open-title preview."""
conn, _ = _open()
try:
return await _run_sync(tools.summarize_project, conn, name=name)
finally:
conn.close()
return await _call_tool(
"summarize_project", {"name": name},
lambda c, _g: tools.summarize_project(c, name=name),
)
@mcp.tool()
async def suggest_assignments() -> dict[str, Any]:
"""Unassigned tasks + free sessions, ready to pair."""
conn, _ = _open()
try:
return await _run_sync(tools.suggest_assignments, conn)
finally:
conn.close()
return await _call_tool(
"suggest_assignments", {},
lambda c, _g: tools.suggest_assignments(c),
)
# Send tool -------------------------------------------------------------
@mcp.tool()
async def send_to_session(session_ref: str, text: str) -> dict[str, Any]:
"""Send text directly to one session, bypassing project broadcast."""
conn, gen = _open()
try:
return await _run_sync(
tools.send_to_session, conn, gen,
session_ref=session_ref, text=text,
)
finally:
conn.close()
return await _call_tool(
"send_to_session", {"session_ref": session_ref, "text": text},
lambda c, g: tools.send_to_session(c, g, session_ref=session_ref, text=text),
)
# Control tool: submit_chat_reply --------------------------------------
@mcp.tool()

View file

@ -123,10 +123,17 @@ def _run_orchestrator(
return [_post_clare(
conn, gen, ctx,
body=(
f"Orchestrator didn't respond within {cfg.reply_timeout_s}s. "
f"See `/chat/session/{cfg.session_uuid}` for live state."
f"Orchestrator didn't respond within {cfg.reply_timeout_s}s.\n"
f" • See live state: /chat/session/{cfg.session_uuid}\n"
f" • Resend the same message to retry (the session may have "
f"caught the previous one mid-flight)."
),
meta={"kind": "timeout", "turn_id": turn_id},
meta={
"kind": "timeout",
"turn_id": turn_id,
"original_body": body,
"session_uuid": cfg.session_uuid,
},
)]
return [_post_clare(
conn, gen, ctx,

View file

@ -113,6 +113,41 @@ async def test_submit_chat_reply_without_turn_id_errors_cleanly() -> None:
assert "turn_id" in payload["error"]
@pytest.mark.asyncio
async def test_tool_call_posts_intent_log_to_orchestrator_chat(
tmp_path, monkeypatch,
) -> None:
"""Every MCP tool should log a `→ tool(args)` system message before running.
Uses XDG_*HOME isolation so the log writes go to a tmp DB.
"""
monkeypatch.setenv("XDG_DATA_HOME", str(tmp_path / "data"))
monkeypatch.setenv("XDG_CONFIG_HOME", str(tmp_path / "config"))
# Reset the cached default server so it sees the new env.
monkeypatch.setattr(mcp_server, "_DEFAULT_SERVER", None)
server = mcp_server.build_server()
await server.call_tool("create_project", {"name": "alpha", "goal": "ship"})
# Inspect chat_messages for the tool-call log.
from clare.db import migrate, open_db
conn = open_db()
migrate(conn)
try:
rows = conn.execute(
"SELECT body, meta FROM chat_messages "
"WHERE scope='orchestrator' AND role='system' "
"ORDER BY rowid"
).fetchall()
finally:
conn.close()
bodies = [r["body"] for r in rows]
# The tool-call log comes before the side-effect fan-out.
assert any(b.startswith("→ create_project(") for b in bodies)
# The actual side-effect ("Project created: alpha") still appears too.
assert any("Project created" in b for b in bodies)
@pytest.mark.asyncio
async def test_submit_chat_reply_with_turn_id_unblocks_waiter() -> None:
server = mcp_server.build_server()

View file

@ -134,6 +134,12 @@ def test_orchestrator_timeout_when_claude_never_replies(
reply = r.json()["replies"][0]
assert reply["meta"]["kind"] == "timeout"
assert "didn't respond within 1s" in reply["body"]
# Retry hint + session link are both surfaced.
assert "Resend the same message" in reply["body"]
assert fake_session in reply["body"]
# Original body preserved in meta so a future retry button can re-use it.
assert reply["meta"]["original_body"] == "anyone home?"
assert reply["meta"]["session_uuid"] == fake_session
def test_orchestrator_send_error_surfaces_cleanly(