feat(chat): Add ChatPage component with SSE support and backend SSE endpoint

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
autocommit 2026-05-20 20:07:40 -07:00
parent 29da6b434f
commit 5367c2cd43
3 changed files with 0 additions and 650 deletions

View file

@ -1,296 +0,0 @@
"""FastAPI factory for the Clare web layer.
Hosts:
- JSON API under `/api/v1/*` (api.router)
- SSE chat channel at `/chat/stream` (chat.stream.router)
- MCP server at `/mcp/*` (orchestrator.mcp_server.asgi_app)
- React SPA at every other path (_mount_spa)
On startup, the app supervises the orchestrator Claude session: if it's
not running, spawn it via rclaude in detached mode and persist the new
session uuid to clare.toml. `clare web` thus brings up *everything*
API, SPA, MCP, and the live orchestrator in one command.
"""
from __future__ import annotations
import asyncio
import logging
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
logger = logging.getLogger(__name__)
_PKG_DIR = Path(__file__).parent
_SPA_DIST = _PKG_DIR / "app" / "dist"
_USE_CONFIG: Any = object()
def create_app(
*,
config_path: Path | None = None,
db_path: Path | None = None,
sync_secret: Any = _USE_CONFIG,
) -> FastAPI:
"""Build a FastAPI app.
`config_path` + `db_path` override the XDG defaults used by tests to
stand up multiple isolated Clares in one process. Production callers
pass nothing and Clare uses ~/.config and ~/.local/share.
`sync_secret` overrides the per-config value; default sentinel means
"read from config at request-validation time" (production behavior).
Pass an explicit `None` or string to pin for tests.
"""
# Build the MCP sub-app eagerly so we can chain its lifespan into the
# parent's — streamable HTTP relies on a task group started in that
# lifespan, and Starlette's `mount()` does not invoke sub-app lifespans.
from ..orchestrator.mcp_server import asgi_app as mcp_asgi_app
mcp_app = mcp_asgi_app()
@asynccontextmanager
async def lifespan(_app: FastAPI):
"""Bring up the orchestrator session after uvicorn is serving, then
keep it alive with a periodic heartbeat.
- Initial bootstrap runs in a worker thread so blocking subprocess
calls (rclaude, ssh, rsync) don't stall the event loop.
- A background heartbeat loop re-runs `ensure_running` every
HEARTBEAT_SECONDS so the supervisor recovers if the orchestrator
tmux dies mid-session (e.g. apricot tmux segfault).
- Both are fire-and-forget; failures log a warning but never abort
uvicorn the chat UI surfaces "not configured" until recovery.
"""
from ..orchestrator.bootstrap import ensure_running
HEARTBEAT_SECONDS = 60
async def _bootstrap_once() -> None:
try:
uuid = await asyncio.to_thread(ensure_running, config_path=config_path)
except Exception as exc: # noqa: BLE001 — supervisor must never crash startup
logger.warning("orchestrator bootstrap raised: %s", exc)
return
if uuid is None:
logger.warning(
"orchestrator bootstrap completed without a live session "
"(rclaude unavailable or discovery timed out); "
"chat will show 'not configured' until the session appears."
)
else:
logger.info("orchestrator session live: %s", uuid)
async def _heartbeat() -> None:
"""Re-check liveness on a fixed cadence; respawn on death."""
while True:
try:
await asyncio.sleep(HEARTBEAT_SECONDS)
await _bootstrap_once()
except asyncio.CancelledError:
return
except Exception as exc: # noqa: BLE001
logger.warning("orchestrator heartbeat raised: %s", exc)
async def _rounds_loop() -> None:
"""Periodic fleet-rounds. Posts a `DO ROUNDS` prompt to the
orchestrator on a cadence so Clare proactively scans the fleet,
flags blockers, and surfaces next-step recommendations without
waiting on a user turn. Disabled when rounds_interval_s == 0.
"""
from ..config import load_or_init
from ..db import migrate, open_db
from ..hlc import HLCGenerator
from ..domain import ChatScope
from .chat.handler import handle_input
cfg = load_or_init(config_path)
interval = cfg.orchestrator.rounds_interval_s
if interval <= 0:
return # disabled
logger.info("orchestrator rounds loop enabled (every %ds)", interval)
prompt = (
"DO ROUNDS. REDUCE the user's burden — act when safe, "
"escalate only when judgment is needed.\n"
"\n"
"Workflow:\n"
"1. `list_fleet` → see every alive session + host, cwd, project, "
"state, summary, source.\n"
"2. `list_recent_events` (limit 30) for project context.\n"
"3. **Take safe actions YOURSELF** — never ask the user to do "
"what you can:\n"
" • Stale (source=triage / no recent push) and idle? "
"`send_to_session session_ref=<uuid_prefix> "
"text='[probe] one-line status?'`\n"
" • Session reports done work? `transition_task_state` it.\n"
" • Pattern across sessions (same error / blocked on same "
"thing)? ONE broadcast with the fix.\n"
"4. RECONCILE ORGS & PROJECTS — the hierarchy must mirror "
"reality:\n"
" • A standalone repo `~/Code/@projects/@X` → project `X`.\n"
" • A monorepo PLATFORM with many concurrent workstreams "
"is an ORG, and each workstream is a project UNDER it. "
"`lilith-platform` is an org; `lilith-platform.live` is its "
"v2 codebase and there is also a v4 — model v2 and v4 as "
"projects under the `lilith-platform` org, and the distinct "
"workstreams inside (coworker-agent, messenger-pilot, "
"mcp-prospector, hotel-stays, FanimeCon-ads, "
"events-locations…) as their own projects under that org "
"too.\n"
" • Identify a session's workstream from its cwd subpath "
"when the path is distinctive (…/users/x/agents/coworker-"
"agent → `coworker-agent`); when sessions share the repo "
"root, read their triage summary to classify the workstream "
"(FanimeCon ads vs hotel-stays vs events-schema).\n"
" • Use `create_org`, then `create_project(..., org=<org>)`. "
"For projects that already exist flat, `set_project_org` to "
"re-parent them. `list_projects` shows the current org→"
"project tree; `list_fleet` shows live workstreams. Conflict "
"on create = already exists, fine.\n"
" • `set_project_status name_or_id=<x> status=archived` for "
"anything that's clearly a sync/demo/test artifact — never "
"delete, just archive.\n"
"5. CONSIDER DISPATCH — call `budget_status` and "
"`fleet_load`. If there is unassigned open work, pick the "
"top-priority task that fits the envelope (under the daily "
"token cap; P0/P1 only if past the low-priority floor; a "
"host with capacity). "
+ (
"Autonomous dispatch is ON — `dispatch_task(task_id, "
"host, cwd)` it directly.\n"
if cfg.orchestrator.autonomous_dispatch
else "Autonomous dispatch is OFF — do NOT dispatch; "
"instead surface the single best dispatch in NEEDS YOU "
"as a proposal (task, host, cwd, why) for the user to "
"approve.\n"
)
+ "6. ALWAYS `report_status` with your own one-line summary.\n"
"7. ALWAYS Write the formatted output below to "
"`.claude/plans/clare-mobile-app-plan.md` in your cwd. This "
"file IS the user's HUD when they connect via "
"/remote-control clare from claude.ai/code or mobile.\n"
"\n"
"Then reply (and Write to the plan file) using EXACTLY this "
"ASCII layout (no markdown). Skip any section with no live "
"items. Cap total reply at 30 lines.\n"
"\n"
"```\n"
"═══ CLARE ROUNDS ═══════════════════════════════════════════\n"
"\n"
"▸ PROJECTS\n"
" ┌─ <project-name> [<n alive agents>] state: <one-word>\n"
" │ next:\n"
" │ 1. <concrete next step>\n"
" │ 2. <concrete next step>\n"
" │ 3. <concrete next step>\n"
" └─\n"
" (repeat per project with live work)\n"
"\n"
"▸ RECENT ACCOMPLISHMENTS (since last rounds)\n"
" • <agent-handle or session-prefix> — <what shipped/closed>\n"
" • ...\n"
"\n"
"▸ AUTO-ACTIONS TAKEN THIS ROUND\n"
" • probed <N> sessions (results: <hits/misses>)\n"
" • broadcast: <N> (summary: <topic>)\n"
" • state transitions: <N>\n"
"\n"
"▸ NEEDS YOU\n"
" ⚠ <one-line question or decision> — context: <8-word why>\n"
" (omit section entirely when nothing needs the user)\n"
"════════════════════════════════════════════════════════════\n"
"```"
)
while True:
try:
await asyncio.sleep(interval)
def _post() -> None:
conn = open_db(db_path)
migrate(conn)
gen = HLCGenerator(cfg.machine_id)
handle_input(
conn, gen,
scope=ChatScope.ORCHESTRATOR, scope_ref=None,
body=prompt,
)
conn.close()
await asyncio.to_thread(_post)
except asyncio.CancelledError:
return
except Exception as exc: # noqa: BLE001
logger.warning("rounds loop tick raised: %s", exc)
# Enter the MCP sub-app's lifespan so its streamable HTTP task
# group is started before any request lands.
async with mcp_app.router.lifespan_context(mcp_app):
initial = asyncio.create_task(_bootstrap_once())
heartbeat = asyncio.create_task(_heartbeat())
rounds = asyncio.create_task(_rounds_loop())
try:
yield
finally:
rounds.cancel()
heartbeat.cancel()
initial.cancel()
app = FastAPI(title="Clare", version="0.1.0", lifespan=lifespan)
app.state.config_path = config_path
app.state.db_path = db_path
app.state.sync_secret_override = sync_secret
from . import api
from .chat import stream as chat_stream
app.include_router(api.router)
app.include_router(chat_stream.router)
# Clare MCP server — orchestrator's .mcp.json points here.
app.mount("/mcp", mcp_app, name="mcp")
# React SPA (catch-all). Order matters: mounted LAST so the explicit
# routers + /assets static mount take precedence on matching paths.
_mount_spa(app)
return app
# ---------------------------------------------------------------------------
# Single-page-app mount
# ---------------------------------------------------------------------------
_API_PREFIXES: tuple[str, ...] = (
"/api/", "/chat/stream", "/mcp/", "/assets/", "/docs", "/openapi",
)
def _mount_spa(app: FastAPI) -> None:
"""Serve the built React SPA.
Contract:
- If `web/app/dist/` doesn't exist, the catch-all is a no-op and any
non-API GET returns 404. (Tests don't build the SPA; they hit the
JSON API only.)
- If it exists, `/assets/*` serves Vite's hashed bundle; any GET that
isn't an API/SSE/MCP path returns `dist/index.html` so the React
router can take over.
"""
if not _SPA_DIST.exists():
return
assets = _SPA_DIST / "assets"
if assets.exists():
app.mount("/assets", StaticFiles(directory=str(assets)), name="spa-assets")
index = _SPA_DIST / "index.html"
@app.get("/{full_path:path}", include_in_schema=False)
def _spa_catchall(full_path: str, request: Request) -> FileResponse:
path = "/" + full_path
if any(path.startswith(p) for p in _API_PREFIXES):
# Should never reach here for those (routers + StaticFiles win),
# but guard explicitly so a stray miss returns a real 404 rather
# than the SPA shell.
raise HTTPException(404)
return FileResponse(str(index))

View file

@ -1,261 +0,0 @@
/**
* R3 chat surface.
*
* Three scopes from one component (orchestrator/project/session), driven by
* the `scope` prop the router passes and the `scope_ref` we resolve from
* `useParams`. Backfill + live updates come from `useChatStream`; submission
* goes through `postChatMessage` and we `ingest()` the echo so the user sees
* their message and Clare's reply without waiting for SSE.
*/
import { useEffect, useMemo, useRef, useState } from "react";
import type { ReactElement } from "react";
import { useParams } from "react-router-dom";
import styled from "styled-components";
import { MessageBubble } from "@lilith/ui-messaging";
import { ApiError, postChatMessage } from "../lib/api";
import { useChatStream } from "../lib/sse";
import type { ChatMessage, ChatScope } from "../lib/types";
import { ChatSidebar } from "./Sidebar";
import { Composer } from "./Composer";
import { SENDER_IDS, toMessageBubble } from "./messageAdapter";
const Layout = styled.div`
display: grid;
grid-template-columns: 220px 1fr;
flex: 1 1 auto;
min-height: 0;
width: 100%;
`;
const Main = styled.section`
display: flex;
flex-direction: column;
min-height: 0;
min-width: 0;
`;
const Header = styled.header`
display: flex;
align-items: center;
gap: 0.75rem;
padding: 0.6rem 1rem;
border-bottom: 1px solid ${({ theme }): string => theme.colors.border};
background: ${({ theme }): string => theme.colors.bgAlt};
`;
const Title = styled.h2`
margin: 0;
font-size: 0.95rem;
font-weight: 600;
color: ${({ theme }): string => theme.colors.fg};
`;
const SubTitle = styled.span`
color: ${({ theme }): string => theme.colors.dim};
font-size: 0.85rem;
`;
const StatusDot = styled.span<{ $on: boolean }>`
width: 8px;
height: 8px;
border-radius: 50%;
background: ${({ theme, $on }): string => ($on ? theme.colors.good : theme.colors.dim)};
margin-left: auto;
box-shadow: 0 0 6px
${({ theme, $on }): string => ($on ? theme.colors.good : "transparent")};
`;
const Feed = styled.div`
flex: 1 1 auto;
overflow-y: auto;
padding: 1rem;
display: flex;
flex-direction: column;
gap: 0.5rem;
`;
const Empty = styled.div`
color: ${({ theme }): string => theme.colors.dim};
font-style: italic;
margin: auto;
`;
const Row = styled.div<{ $role: "user" | "clare" | "system" }>`
display: flex;
justify-content: ${({ $role }): string =>
$role === "user" ? "flex-end" : "flex-start"};
opacity: ${({ $role }): number => ($role === "system" ? 0.75 : 1)};
`;
/**
* Role-specific frame around the library bubble. We don't override the bubble
* internals just wrap it so each role has a visually distinct accent.
*/
const Frame = styled.div<{ $role: "user" | "clare" | "system" }>`
max-width: 80%;
border-left: 3px solid
${({ theme, $role }): string =>
$role === "user"
? theme.colors.accent
: $role === "clare"
? theme.colors.good
: theme.colors.warn};
padding-left: 0.5rem;
`;
const RoleTag = styled.div<{ $role: "user" | "clare" | "system" }>`
font-size: 0.7rem;
text-transform: uppercase;
letter-spacing: 0.08em;
margin-bottom: 0.15rem;
color: ${({ theme, $role }): string =>
$role === "user"
? theme.colors.accent
: $role === "clare"
? theme.colors.good
: theme.colors.warn};
`;
const ErrorBar = styled.div`
display: flex;
align-items: center;
gap: 0.75rem;
padding: 0.5rem 1rem;
background: ${({ theme }): string => theme.colors.bgAlt};
border-top: 1px solid ${({ theme }): string => theme.colors.bad};
color: ${({ theme }): string => theme.colors.bad};
font-size: 0.85rem;
`;
const Dismiss = styled.button`
background: transparent;
border: 1px solid ${({ theme }): string => theme.colors.border};
color: ${({ theme }): string => theme.colors.dim};
border-radius: 3px;
padding: 0.1rem 0.5rem;
margin-left: auto;
cursor: pointer;
font: inherit;
&:hover {
color: ${({ theme }): string => theme.colors.fg};
}
`;
export interface ChatPageProps {
scope: ChatScope;
}
function placeholderFor(scope: ChatScope): string {
if (scope === "orchestrator") {
return "/help · /pull · /status · /project new <name> · or natural language";
}
if (scope === "project") {
return "/task new <title> · /broadcast <text> · /task list";
}
return "/broadcast <text> to send to this session";
}
function titleFor(scope: ChatScope, ref: string | null): { title: string; sub: string } {
if (scope === "orchestrator") return { title: "clare", sub: "orchestrator" };
if (scope === "project") return { title: ref ?? "(project)", sub: "project" };
return { title: ref ?? "(session)", sub: "session" };
}
export function ChatPage({ scope }: ChatPageProps): ReactElement {
const params = useParams<{ name?: string; uuid?: string }>();
const scopeRef: string | null =
scope === "project" ? params.name ?? null : scope === "session" ? params.uuid ?? null : null;
const { messages, connected, ingest } = useChatStream({ scope, scopeRef });
const [error, setError] = useState<string | null>(null);
const [sending, setSending] = useState<boolean>(false);
const feedRef = useRef<HTMLDivElement | null>(null);
// Auto-scroll on new messages.
useEffect((): void => {
const el = feedRef.current;
if (el === null) return;
el.scrollTop = el.scrollHeight;
}, [messages]);
// Clear inline error when the user changes scope.
useEffect((): void => {
setError(null);
}, [scope, scopeRef]);
async function handleSubmit(body: string): Promise<void> {
setSending(true);
setError(null);
try {
const resp = await postChatMessage({ scope, scope_ref: scopeRef, body });
ingest([resp.user_message, ...resp.replies]);
} catch (e: unknown) {
if (e instanceof ApiError) {
setError(e.detail);
} else if (e instanceof Error) {
setError(e.message);
} else {
setError("send failed");
}
} finally {
setSending(false);
}
}
const header = useMemo((): { title: string; sub: string } => titleFor(scope, scopeRef), [
scope,
scopeRef,
]);
return (
<Layout>
<ChatSidebar />
<Main>
<Header>
<Title>{header.title}</Title>
<SubTitle>· {header.sub}</SubTitle>
<StatusDot $on={connected} title={connected ? "live" : "disconnected"} />
</Header>
<Feed ref={feedRef}>
{messages.length === 0 ? (
<Empty>(empty chat say something)</Empty>
) : (
messages.map((m: ChatMessage): ReactElement => (
<Row key={m.rowid} $role={m.role}>
<Frame $role={m.role}>
<RoleTag $role={m.role}>{m.role}</RoleTag>
<MessageBubble
message={toMessageBubble(m)}
currentUserId={SENDER_IDS.user}
showTimestamp={false}
/>
</Frame>
</Row>
))
)}
</Feed>
{error !== null && (
<ErrorBar role="alert">
<span>{error}</span>
<Dismiss type="button" onClick={(): void => setError(null)}>
dismiss
</Dismiss>
</ErrorBar>
)}
<Composer
placeholder={placeholderFor(scope)}
disabled={sending}
onSubmit={handleSubmit}
/>
</Main>
</Layout>
);
}

View file

@ -1,93 +0,0 @@
/**
* useChatStream subscribe to the SSE channel for a chat scope.
*
* The FastAPI endpoint at `/chat/stream?scope=...&scope_ref=...&after_rowid=N`
* emits `event: chat\ndata: <html>` frames whenever new chat_messages rows
* appear. The HTML payload was for HTMX; we need parsed ChatMessage objects.
*
* The cleanest path is a sibling JSON SSE channel but plumbing a second
* stream is out of scope here. Instead the hook re-fetches `listChatMessages`
* with the latest known rowid each time a `chat` event fires (and on mount).
* The SSE channel is used only as a wake-up signal.
*
* Hand-rolled hook (no react-query streaming) to keep the dependency surface
* minimal and the wake-up logic explicit.
*/
import { useEffect, useState, useRef } from "react";
import { listChatMessages } from "./api";
import type { ChatMessage, ChatScope } from "./types";
export interface UseChatStreamArgs {
scope: ChatScope;
scopeRef: string | null;
}
export interface UseChatStreamResult {
messages: ChatMessage[];
connected: boolean;
/** Append a message locally (e.g. echoes from a POST) without a refetch. */
ingest: (msgs: ChatMessage[]) => void;
}
export function useChatStream({ scope, scopeRef }: UseChatStreamArgs): UseChatStreamResult {
const [messages, setMessages] = useState<ChatMessage[]>([]);
const [connected, setConnected] = useState<boolean>(false);
const lastRowidRef = useRef<number>(0);
useEffect((): (() => void) => {
let cancelled = false;
const ctrl = new AbortController();
async function refresh(): Promise<void> {
try {
const rows = await listChatMessages(
{ scope, scope_ref: scopeRef, after_rowid: lastRowidRef.current, limit: 200 },
ctrl.signal,
);
if (cancelled || rows.length === 0) return;
lastRowidRef.current = rows[rows.length - 1]!.rowid;
setMessages((prev: ChatMessage[]): ChatMessage[] => mergeById(prev, rows));
} catch (e: unknown) {
if ((e as { name?: string }).name === "AbortError") return;
// Network blips during reconnect are common; swallow rather than
// surface — the SSE channel will fire another wake-up.
}
}
// Initial backfill (rowid 0 → everything).
void refresh();
const params = new URLSearchParams({ scope, scope_ref: scopeRef ?? "", after_rowid: "0" });
const es = new EventSource(`/chat/stream?${params.toString()}`);
es.addEventListener("open", (): void => setConnected(true));
es.addEventListener("error", (): void => setConnected(false));
es.addEventListener("chat", (): void => void refresh());
return (): void => {
cancelled = true;
ctrl.abort();
es.close();
};
}, [scope, scopeRef]);
function ingest(msgs: ChatMessage[]): void {
if (msgs.length === 0) return;
lastRowidRef.current = Math.max(
lastRowidRef.current,
msgs.reduce((m: number, r: ChatMessage): number => Math.max(m, r.rowid), 0),
);
setMessages((prev: ChatMessage[]): ChatMessage[] => mergeById(prev, msgs));
}
return { messages, connected, ingest };
}
function mergeById(prev: ChatMessage[], next: ChatMessage[]): ChatMessage[] {
const seen = new Set(prev.map((m: ChatMessage): number => m.rowid));
const additions = next.filter((m: ChatMessage): boolean => !seen.has(m.rowid));
if (additions.length === 0) return prev;
return [...prev, ...additions].sort(
(a: ChatMessage, b: ChatMessage): number => a.rowid - b.rowid,
);
}