From 00401248db397ef8cd5a9bdf45f141167277ef74 Mon Sep 17 00:00:00 2001 From: Natalie Date: Mon, 18 May 2026 16:14:40 -0700 Subject: [PATCH] =?UTF-8?q?feat(@projects/clare):=20=E2=9C=A8=20add=20live?= =?UTF-8?q?=20tool-call=20logging=20system?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Lilith Autocommit --- src/clare/orchestrator/mcp_server.py | 196 +++++++++++++++------------ src/clare/web/chat/handler.py | 13 +- tests/test_orchestrator_mcp.py | 35 +++++ tests/test_orchestrator_turn.py | 6 + 4 files changed, 163 insertions(+), 87 deletions(-) diff --git a/src/clare/orchestrator/mcp_server.py b/src/clare/orchestrator/mcp_server.py index 153aace..24be11b 100644 --- a/src/clare/orchestrator/mcp_server.py +++ b/src/clare/orchestrator/mcp_server.py @@ -47,9 +47,55 @@ def _open( return conn, gen -async def _run_sync(fn, /, *args, **kwargs): - """Run a sync tool body in a worker thread (sqlite3 + service.* are sync).""" - return await asyncio.to_thread(fn, *args, **kwargs) +def _short(v: Any) -> str: + """Compact a value for inclusion in a tool-call log message.""" + s = repr(v) + return s if len(s) <= 60 else s[:57] + "..." + + +def _log_tool_call( + conn: sqlite3.Connection, + gen: HLCGenerator, + name: str, + args: dict[str, Any], +) -> None: + """Post a `→ tool(args)` system message to the orchestrator chat. + + Gives the user live visibility into what the orchestrator is doing. + State-changing tools ALSO produce their own fan-out system messages + via the projection (service.* → events). The redundancy is intentional: + this log shows *intent*; the fan-out shows the *result*. + + Errors here are swallowed; logging must never break the actual call. + """ + try: + from ..domain import ChatRole, ChatScope + from ..events import ChatMessagePosted, append + rendered = ", ".join( + f"{k}={_short(v)}" for k, v in args.items() if v is not None + ) + body = f"→ {name}({rendered})" if rendered else f"→ {name}()" + append(conn, gen, ChatMessagePosted( + scope=ChatScope.ORCHESTRATOR, scope_ref=None, + role=ChatRole.SYSTEM, body=body, + meta={"kind": "tool_call", "tool": name, "args": args}, + )) + except Exception: + pass + + +async def _call_tool(name: str, args: dict[str, Any], fn) -> Any: + """One pattern for every MCP tool: log → open → run → close. + + `fn(conn, gen)` returns the tool's payload synchronously; we hop into a + worker thread to keep the MCP event loop free. + """ + conn, gen = _open() + try: + _log_tool_call(conn, gen, name, args) + return await asyncio.to_thread(fn, conn, gen) + finally: + conn.close() # --------------------------------------------------------------------------- @@ -87,78 +133,69 @@ def build_server() -> FastMCP: @mcp.tool() async def create_project(name: str, goal: str | None = None, owner: str | None = None) -> dict[str, Any]: """Create a new project.""" - conn, gen = _open() - try: - return await _run_sync(tools.create_project, conn, gen, name=name, goal=goal, owner=owner) - finally: - conn.close() + return await _call_tool( + "create_project", {"name": name, "goal": goal, "owner": owner}, + lambda c, g: tools.create_project(c, g, name=name, goal=goal, owner=owner), + ) @mcp.tool() async def add_task( project: str, title: str, priority: int = 2, description: str | None = None, ) -> dict[str, Any]: """Add a task to a project. Priority 0=urgent, 4=abandonable.""" - conn, gen = _open() - try: - return await _run_sync( - tools.add_task, conn, gen, - project=project, title=title, priority=priority, description=description, - ) - finally: - conn.close() + return await _call_tool( + "add_task", + {"project": project, "title": title, "priority": priority, "description": description}, + lambda c, g: tools.add_task( + c, g, project=project, title=title, priority=priority, description=description, + ), + ) @mcp.tool() async def list_tasks(project: str | None = None, status: str | None = None) -> dict[str, Any]: """List tasks (optional project name and/or status filter).""" - conn, _ = _open() - try: - return await _run_sync(tools.list_tasks, conn, project=project, status=status) - finally: - conn.close() + return await _call_tool( + "list_tasks", {"project": project, "status": status}, + lambda c, _g: tools.list_tasks(c, project=project, status=status), + ) @mcp.tool() async def create_assignment(task_ref: str, session_ref: str) -> dict[str, Any]: """Assign a task to a session. Accepts full uuid or unambiguous prefix.""" - conn, gen = _open() - try: - return await _run_sync( - tools.create_assignment, conn, gen, - task_ref=task_ref, session_ref=session_ref, - ) - finally: - conn.close() + return await _call_tool( + "create_assignment", {"task_ref": task_ref, "session_ref": session_ref}, + lambda c, g: tools.create_assignment(c, g, task_ref=task_ref, session_ref=session_ref), + ) @mcp.tool() async def broadcast(target: str, text: str) -> dict[str, Any]: """Broadcast text to every session assigned in a project (or @group).""" - conn, gen = _open() - try: - return await _run_sync(tools.broadcast, conn, gen, target=target, text=text) - finally: - conn.close() + return await _call_tool( + "broadcast", {"target": target, "text": text}, + lambda c, g: tools.broadcast(c, g, target=target, text=text), + ) @mcp.tool() async def pull() -> dict[str, Any]: """Refresh fleet view from rclaude.""" - conn, gen = _open() - try: - return await _run_sync(tools.pull, conn, gen) - finally: - conn.close() + return await _call_tool( + "pull", {}, lambda c, g: tools.pull(c, g), + ) @mcp.tool() async def status() -> dict[str, Any]: """Fleet counts: projects, open tasks, sessions, blocked.""" - conn, _ = _open() - try: - return await _run_sync(tools.status, conn) - finally: - conn.close() + return await _call_tool( + "status", {}, lambda c, _g: tools.status(c), + ) @mcp.tool() async def help() -> dict[str, Any]: """List available Clare tools with one-line summaries.""" - return tools.help_text() + # help() takes no DB — log via a throwaway conn for symmetry. + return await _call_tool( + "help", {}, lambda _c, _g: tools.help_text(), + ) # Read tools ------------------------------------------------------------ @mcp.tool() @@ -166,69 +203,60 @@ def build_server() -> FastMCP: scope: str = "orchestrator", scope_ref: str | None = None, limit: int = 50, ) -> dict[str, Any]: """Recent chat messages for a scope, oldest-first.""" - conn, _ = _open() - try: - return await _run_sync( - tools.list_recent_events, conn, - scope=scope, scope_ref=scope_ref, limit=limit, - ) - finally: - conn.close() + return await _call_tool( + "list_recent_events", + {"scope": scope, "scope_ref": scope_ref, "limit": limit}, + lambda c, _g: tools.list_recent_events( + c, scope=scope, scope_ref=scope_ref, limit=limit, + ), + ) @mcp.tool() async def search_chat_messages( query: str, scope: str | None = None, limit: int = 50, ) -> dict[str, Any]: """Case-insensitive substring search over chat bodies.""" - conn, _ = _open() - try: - return await _run_sync( - tools.search_chat_messages, conn, - query=query, scope=scope, limit=limit, - ) - finally: - conn.close() + return await _call_tool( + "search_chat_messages", + {"query": query, "scope": scope, "limit": limit}, + lambda c, _g: tools.search_chat_messages( + c, query=query, scope=scope, limit=limit, + ), + ) @mcp.tool() async def get_session(uuid: str) -> dict[str, Any]: """Details for one session by uuid (or short prefix).""" - conn, _ = _open() - try: - return await _run_sync(tools.get_session, conn, uuid=uuid) - finally: - conn.close() + return await _call_tool( + "get_session", {"uuid": uuid}, + lambda c, _g: tools.get_session(c, uuid=uuid), + ) # Planning tools -------------------------------------------------------- @mcp.tool() async def summarize_project(name: str) -> dict[str, Any]: """Project summary with task counts and open-title preview.""" - conn, _ = _open() - try: - return await _run_sync(tools.summarize_project, conn, name=name) - finally: - conn.close() + return await _call_tool( + "summarize_project", {"name": name}, + lambda c, _g: tools.summarize_project(c, name=name), + ) @mcp.tool() async def suggest_assignments() -> dict[str, Any]: """Unassigned tasks + free sessions, ready to pair.""" - conn, _ = _open() - try: - return await _run_sync(tools.suggest_assignments, conn) - finally: - conn.close() + return await _call_tool( + "suggest_assignments", {}, + lambda c, _g: tools.suggest_assignments(c), + ) # Send tool ------------------------------------------------------------- @mcp.tool() async def send_to_session(session_ref: str, text: str) -> dict[str, Any]: """Send text directly to one session, bypassing project broadcast.""" - conn, gen = _open() - try: - return await _run_sync( - tools.send_to_session, conn, gen, - session_ref=session_ref, text=text, - ) - finally: - conn.close() + return await _call_tool( + "send_to_session", {"session_ref": session_ref, "text": text}, + lambda c, g: tools.send_to_session(c, g, session_ref=session_ref, text=text), + ) # Control tool: submit_chat_reply -------------------------------------- @mcp.tool() diff --git a/src/clare/web/chat/handler.py b/src/clare/web/chat/handler.py index 77ec77f..dc1e124 100644 --- a/src/clare/web/chat/handler.py +++ b/src/clare/web/chat/handler.py @@ -123,10 +123,17 @@ def _run_orchestrator( return [_post_clare( conn, gen, ctx, body=( - f"Orchestrator didn't respond within {cfg.reply_timeout_s}s. " - f"See `/chat/session/{cfg.session_uuid}` for live state." + f"Orchestrator didn't respond within {cfg.reply_timeout_s}s.\n" + f" • See live state: /chat/session/{cfg.session_uuid}\n" + f" • Resend the same message to retry (the session may have " + f"caught the previous one mid-flight)." ), - meta={"kind": "timeout", "turn_id": turn_id}, + meta={ + "kind": "timeout", + "turn_id": turn_id, + "original_body": body, + "session_uuid": cfg.session_uuid, + }, )] return [_post_clare( conn, gen, ctx, diff --git a/tests/test_orchestrator_mcp.py b/tests/test_orchestrator_mcp.py index ea93ee3..8c99444 100644 --- a/tests/test_orchestrator_mcp.py +++ b/tests/test_orchestrator_mcp.py @@ -113,6 +113,41 @@ async def test_submit_chat_reply_without_turn_id_errors_cleanly() -> None: assert "turn_id" in payload["error"] +@pytest.mark.asyncio +async def test_tool_call_posts_intent_log_to_orchestrator_chat( + tmp_path, monkeypatch, +) -> None: + """Every MCP tool should log a `→ tool(args)` system message before running. + + Uses XDG_*HOME isolation so the log writes go to a tmp DB. + """ + monkeypatch.setenv("XDG_DATA_HOME", str(tmp_path / "data")) + monkeypatch.setenv("XDG_CONFIG_HOME", str(tmp_path / "config")) + # Reset the cached default server so it sees the new env. + monkeypatch.setattr(mcp_server, "_DEFAULT_SERVER", None) + + server = mcp_server.build_server() + await server.call_tool("create_project", {"name": "alpha", "goal": "ship"}) + + # Inspect chat_messages for the tool-call log. + from clare.db import migrate, open_db + conn = open_db() + migrate(conn) + try: + rows = conn.execute( + "SELECT body, meta FROM chat_messages " + "WHERE scope='orchestrator' AND role='system' " + "ORDER BY rowid" + ).fetchall() + finally: + conn.close() + bodies = [r["body"] for r in rows] + # The tool-call log comes before the side-effect fan-out. + assert any(b.startswith("→ create_project(") for b in bodies) + # The actual side-effect ("Project created: alpha") still appears too. + assert any("Project created" in b for b in bodies) + + @pytest.mark.asyncio async def test_submit_chat_reply_with_turn_id_unblocks_waiter() -> None: server = mcp_server.build_server() diff --git a/tests/test_orchestrator_turn.py b/tests/test_orchestrator_turn.py index 00a4e8e..f624e1d 100644 --- a/tests/test_orchestrator_turn.py +++ b/tests/test_orchestrator_turn.py @@ -134,6 +134,12 @@ def test_orchestrator_timeout_when_claude_never_replies( reply = r.json()["replies"][0] assert reply["meta"]["kind"] == "timeout" assert "didn't respond within 1s" in reply["body"] + # Retry hint + session link are both surfaced. + assert "Resend the same message" in reply["body"] + assert fake_session in reply["body"] + # Original body preserved in meta so a future retry button can re-use it. + assert reply["meta"]["original_body"] == "anyone home?" + assert reply["meta"]["session_uuid"] == fake_session def test_orchestrator_send_error_surfaces_cleanly(