diff --git a/apps/streams/package.json b/apps/streams/package.json index 512bcc2ca5a..f518670068e 100644 --- a/apps/streams/package.json +++ b/apps/streams/package.json @@ -13,9 +13,11 @@ "test:conformance": "npx @durable-streams/server-conformance-tests --run http://localhost:8080" }, "dependencies": { + "@anthropic-ai/claude-agent-sdk": "^0.2.19", "@durable-streams/client": "^0.2.0", "@durable-streams/server": "^0.2.0", "@superset/durable-session": "workspace:*", + "@tanstack/ai": "^0.3.0", "@tanstack/db": "^0.5.22", "@hono/node-server": "^1.13.0", "hono": "^4.4.0", diff --git a/apps/streams/src/claude-agent.ts b/apps/streams/src/claude-agent.ts new file mode 100644 index 00000000000..64f8caf0b8d --- /dev/null +++ b/apps/streams/src/claude-agent.ts @@ -0,0 +1,259 @@ +/** + * Claude Agent Endpoint + * + * Hono app that acts as an AI agent the proxy can invoke. + * The proxy's `invokeAgent()` POSTs to this endpoint and parses the SSE response. + * + * Flow: + * 1. Proxy sends { messages, stream, sessionId, cwd, env } + * 2. Agent extracts latest user message as the prompt + * 3. Runs `query()` from @anthropic-ai/claude-agent-sdk + * 4. Converts each SDKMessage to TanStack AI AG-UI chunks + * 5. Returns SSE response with `data: {chunk}\n\n` lines + * + * Session state: Maintains Map for multi-turn resume. + * Binary path: From CLAUDE_BINARY_PATH env var. + * Auth: From environment (ANTHROPIC_API_KEY or OAuth via ~/.claude/.credentials.json). + */ + +import { query } from "@anthropic-ai/claude-agent-sdk"; +import { Hono } from "hono"; +import { z } from "zod"; +import { createConverter } from "./sdk-to-ai-chunks"; + +// ============================================================================ +// Constants +// ============================================================================ + +const DEFAULT_MODEL = "claude-sonnet-4-5-20250929"; +const MAX_AGENT_TURNS = 25; +const SESSION_MAX_SIZE = 1000; +const SESSION_TTL_MS = 24 * 60 * 60 * 1000; // 24 hours + +// ============================================================================ +// Request Validation +// ============================================================================ + +const agentRequestSchema = z.object({ + messages: z + .array(z.object({ role: z.string(), content: z.string() })) + .optional(), + stream: z.boolean().optional(), + sessionId: z.string().optional(), + cwd: z.string().optional(), + env: z.record(z.string(), z.string()).optional(), +}); + +// ============================================================================ +// Session State +// ============================================================================ + +interface SessionEntry { + claudeSessionId: string; + lastAccessedAt: number; +} + +const claudeSessions = new Map(); + +function evictStaleSessions(): void { + const now = Date.now(); + for (const [key, entry] of claudeSessions) { + if (now - entry.lastAccessedAt > SESSION_TTL_MS) { + claudeSessions.delete(key); + } + } + + // If still over capacity, evict oldest entries + if (claudeSessions.size > SESSION_MAX_SIZE) { + const sorted = [...claudeSessions.entries()].sort( + (a, b) => a[1].lastAccessedAt - b[1].lastAccessedAt, + ); + const toRemove = sorted.slice(0, claudeSessions.size - SESSION_MAX_SIZE); + for (const [key] of toRemove) { + claudeSessions.delete(key); + } + } +} + +function getClaudeSessionId(sessionId: string): string | undefined { + const entry = claudeSessions.get(sessionId); + if (entry) { + entry.lastAccessedAt = Date.now(); + } + return entry?.claudeSessionId; +} + +function setClaudeSessionId(sessionId: string, claudeSessionId: string): void { + evictStaleSessions(); + claudeSessions.set(sessionId, { + claudeSessionId, + lastAccessedAt: Date.now(), + }); +} + +// ============================================================================ +// App +// ============================================================================ + +const app = new Hono(); + +app.post("/", async (c) => { + let rawBody: unknown; + try { + rawBody = await c.req.json(); + } catch { + return c.json({ error: "Invalid JSON body" }, 400); + } + + const parsed = agentRequestSchema.safeParse(rawBody); + + if (!parsed.success) { + return c.json( + { error: "Invalid request body", details: parsed.error.message }, + 400, + ); + } + + const { messages, sessionId, cwd, env: agentEnv } = parsed.data; + + // Extract prompt from latest user message + const latestUserMessage = messages?.filter((m) => m.role === "user").pop(); + + if (!latestUserMessage) { + return c.json({ error: "No user message found" }, 400); + } + + const prompt = latestUserMessage.content; + const claudeSessionId = sessionId ? getClaudeSessionId(sessionId) : undefined; + + // Build environment for Claude binary + const baseEnv = + agentEnv ?? (process.env as unknown as Record); + const queryEnv: Record = { ...baseEnv }; + + // Ensure CLAUDE_CODE_ENTRYPOINT is set + queryEnv.CLAUDE_CODE_ENTRYPOINT = "sdk-ts"; + + const binaryPath = process.env.CLAUDE_BINARY_PATH; + + // Run Claude query + const abortController = new AbortController(); + const result = query({ + prompt, + options: { + ...(claudeSessionId && { resume: claudeSessionId }), + ...(cwd && { cwd }), + model: process.env.CLAUDE_MODEL ?? DEFAULT_MODEL, + maxTurns: MAX_AGENT_TURNS, + includePartialMessages: true, + permissionMode: "bypassPermissions" as const, + ...(binaryPath && { pathToClaudeCodeExecutable: binaryPath }), + env: queryEnv, + abortController, + }, + }); + + // Create stateful converter + const converter = createConverter(); + + // Abort handling: when the fetch is aborted, interrupt the query + const requestSignal = c.req.raw.signal; + const abortHandler = () => { + abortController.abort(); + result.interrupt().catch(() => {}); + result.close(); + }; + requestSignal.addEventListener("abort", abortHandler, { once: true }); + + // Return SSE response + const encoder = new TextEncoder(); + const readable = new ReadableStream({ + async start(controller) { + try { + for await (const message of result) { + if (requestSignal.aborted) break; + + // Extract claudeSessionId from system init + const msg = message as Record; + if (msg.type === "system" && msg.subtype === "init") { + const sdkSessionId = msg.session_id as string | undefined; + if (sdkSessionId && sessionId) { + setClaudeSessionId(sessionId, sdkSessionId); + } + continue; + } + + // Convert SDKMessage to AG-UI chunks + const chunks = converter.convert(message); + for (const chunk of chunks) { + controller.enqueue( + encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`), + ); + } + } + + controller.enqueue(encoder.encode("data: [DONE]\n\n")); + controller.close(); + } catch (err) { + if ((err as Error).name !== "AbortError") { + console.error("[claude-agent] Stream error:", err); + const errorChunk = { + type: "RUN_ERROR", + runId: converter.state.runId, + error: { + message: (err as Error).message ?? "Unknown error", + }, + timestamp: Date.now(), + }; + try { + controller.enqueue( + encoder.encode(`data: ${JSON.stringify(errorChunk)}\n\n`), + ); + controller.enqueue(encoder.encode("data: [DONE]\n\n")); + } catch (enqueueErr) { + console.debug( + "[claude-agent] Controller already closed, could not write error event:", + enqueueErr, + ); + } + } + + try { + controller.close(); + } catch (closeErr) { + console.debug("[claude-agent] Controller already closed:", closeErr); + } + } finally { + requestSignal.removeEventListener("abort", abortHandler); + try { + result.close(); + } catch (resultCloseErr) { + console.debug( + "[claude-agent] Result already closed:", + resultCloseErr, + ); + } + } + }, + }); + + return new Response(readable, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }, + }); +}); + +// Health check for the agent +app.get("/health", (c) => { + return c.json({ + status: "ok", + agent: "claude", + hasBinary: !!process.env.CLAUDE_BINARY_PATH, + activeSessions: claudeSessions.size, + }); +}); + +export { app as claudeAgentApp }; diff --git a/apps/streams/src/index.ts b/apps/streams/src/index.ts index affce4c3a31..eeb2b58b21a 100644 --- a/apps/streams/src/index.ts +++ b/apps/streams/src/index.ts @@ -1,9 +1,11 @@ import { DurableStreamTestServer } from "@durable-streams/server"; import { serve } from "@hono/node-server"; +import { claudeAgentApp } from "./claude-agent"; import { createServer } from "./server"; const PORT = parseInt(process.env.PORT ?? "8080", 10); const INTERNAL_PORT = parseInt(process.env.INTERNAL_PORT ?? "8081", 10); +const AGENT_PORT = parseInt(process.env.CLAUDE_AGENT_PORT ?? "9090", 10); const DURABLE_STREAMS_URL = process.env.DURABLE_STREAMS_URL ?? `http://127.0.0.1:${INTERNAL_PORT}`; @@ -21,12 +23,24 @@ const { app } = createServer({ logging: true, }); -serve({ fetch: app.fetch, port: PORT }, (info) => { +const proxyServer = serve({ fetch: app.fetch, port: PORT }, (info) => { console.log(`[streams] Proxy running on http://localhost:${info.port}`); }); +// Start Claude agent endpoint +const agentServer = serve( + { fetch: claudeAgentApp.fetch, port: AGENT_PORT }, + (info) => { + console.log( + `[streams] Claude agent endpoint on http://localhost:${info.port}`, + ); + }, +); + // Graceful shutdown process.on("SIGINT", async () => { + proxyServer.close(); + agentServer.close(); await durableStreamServer.stop(); process.exit(0); }); diff --git a/apps/streams/src/sdk-to-ai-chunks.ts b/apps/streams/src/sdk-to-ai-chunks.ts new file mode 100644 index 00000000000..d83d0dac463 --- /dev/null +++ b/apps/streams/src/sdk-to-ai-chunks.ts @@ -0,0 +1,446 @@ +/** + * SDK-to-AI Chunk Converter + * + * Converts Claude Agent SDK `SDKMessage` objects to TanStack AI `StreamChunk` (AG-UI events). + * The proxy expects SSE with JSON AG-UI events. The `StreamProcessor` on the client + * materializes these into `MessagePart[]` for the UI. + * + * Handled AG-UI events: + * - TEXT_MESSAGE_CONTENT — text token streaming + * - TOOL_CALL_START — tool invocation begins + * - TOOL_CALL_ARGS — streaming tool arguments + * - TOOL_CALL_END — tool call complete (with optional result) + * - STEP_FINISHED — thinking/reasoning content (incremental delta) + * - RUN_FINISHED — agent turn complete + * - RUN_ERROR — error during execution + */ + +import type { StreamChunk } from "@tanstack/ai"; + +// ============================================================================ +// Claude SDK Types (subset used for conversion) +// ============================================================================ + +interface SDKPartialAssistantMessage { + type: "stream_event"; + event: BetaRawMessageStreamEvent; + parent_tool_use_id: string | null; + uuid: string; + session_id: string; +} + +interface SDKUserMessage { + type: "user"; + message: { + content: string | Array<{ type: string; [key: string]: unknown }>; + }; + parent_tool_use_id: string | null; + session_id: string; +} + +interface SDKResultMessage { + type: "result"; + subtype: string; + duration_ms: number; + num_turns: number; + stop_reason: string | null; + total_cost_usd: number; + usage: { input_tokens: number; output_tokens: number }; + session_id: string; +} + +interface SDKSystemMessage { + type: "system"; + subtype: string; + session_id: string; +} + +type SDKMessage = + | SDKPartialAssistantMessage + | SDKUserMessage + | SDKResultMessage + | SDKSystemMessage + | { type: string; [key: string]: unknown }; + +// ============================================================================ +// Beta Raw Message Stream Events (from Anthropic SDK) +// ============================================================================ + +type BetaRawMessageStreamEvent = + | { type: "message_start"; message: { id: string; model: string } } + | { + type: "message_delta"; + delta: { stop_reason?: string }; + usage?: { output_tokens: number }; + } + | { type: "message_stop" } + | { + type: "content_block_start"; + index: number; + content_block: ContentBlock; + } + | { + type: "content_block_delta"; + index: number; + delta: ContentBlockDelta; + } + | { type: "content_block_stop"; index: number }; + +type ContentBlock = + | { type: "text"; text: string } + | { type: "tool_use"; id: string; name: string; input: unknown } + | { type: "thinking"; thinking: string; signature: string } + | { type: "redacted_thinking"; data: string } + | { type: string; [key: string]: unknown }; + +type ContentBlockDelta = + | { type: "text_delta"; text: string } + | { type: "input_json_delta"; partial_json: string } + | { type: "thinking_delta"; thinking: string } + | { type: "signature_delta"; signature: string } + | { type: "citations_delta"; [key: string]: unknown }; + +// ============================================================================ +// Conversion State +// ============================================================================ + +interface ActiveBlock { + type: "text" | "tool_use" | "thinking" | "other"; + toolCallId?: string; + toolName?: string; + argsAccumulator?: string; +} + +export interface ConversionState { + activeBlocks: Map; + messageId: string; + runId: string; +} + +// ============================================================================ +// Converter Factory +// ============================================================================ + +export function createConverter(): { + state: ConversionState; + convert: (message: SDKMessage) => StreamChunk[]; +} { + const state: ConversionState = { + activeBlocks: new Map(), + messageId: crypto.randomUUID(), + runId: crypto.randomUUID(), + }; + + return { + state, + convert(message: SDKMessage): StreamChunk[] { + return convertMessage(state, message); + }, + }; +} + +// ============================================================================ +// Main Conversion Logic +// ============================================================================ + +function convertMessage( + state: ConversionState, + message: SDKMessage, +): StreamChunk[] { + switch (message.type) { + case "stream_event": + return handleStreamEvent( + state, + (message as SDKPartialAssistantMessage).event, + ); + + case "user": + return handleUserMessage(message as SDKUserMessage); + + case "result": + return handleResultMessage(state, message as SDKResultMessage); + + default: + // Skip system, assistant, status, hook, and other message types + return []; + } +} + +// ============================================================================ +// Stream Event Handlers +// ============================================================================ + +function handleStreamEvent( + state: ConversionState, + event: BetaRawMessageStreamEvent, +): StreamChunk[] { + const now = Date.now(); + + switch (event.type) { + case "content_block_start": + return handleContentBlockStart(state, event.index, event.content_block); + + case "content_block_delta": + return handleContentBlockDelta(state, event.index, event.delta, now); + + case "content_block_stop": + return handleContentBlockStop(state, event.index, now); + + default: + // message_start, message_delta, message_stop — not needed for chunk conversion + return []; + } +} + +function handleContentBlockStart( + state: ConversionState, + index: number, + block: ContentBlock, +): StreamChunk[] { + const now = Date.now(); + + switch (block.type) { + case "text": { + state.activeBlocks.set(index, { type: "text" }); + return []; + } + + case "tool_use": { + const toolBlock = block as { + type: "tool_use"; + id: string; + name: string; + }; + state.activeBlocks.set(index, { + type: "tool_use", + toolCallId: toolBlock.id, + toolName: toolBlock.name, + argsAccumulator: "", + }); + return [ + { + type: "TOOL_CALL_START", + toolCallId: toolBlock.id, + toolName: toolBlock.name, + index, + timestamp: now, + } satisfies StreamChunk, + ]; + } + + case "thinking": { + const stepId = `thinking-${index}`; + state.activeBlocks.set(index, { type: "thinking" }); + return [ + { + type: "STEP_STARTED", + stepId, + stepType: "thinking", + timestamp: now, + } satisfies StreamChunk, + ]; + } + + default: { + state.activeBlocks.set(index, { type: "other" }); + return []; + } + } +} + +function handleContentBlockDelta( + state: ConversionState, + index: number, + delta: ContentBlockDelta, + now: number, +): StreamChunk[] { + const block = state.activeBlocks.get(index); + + switch (delta.type) { + case "text_delta": { + return [ + { + type: "TEXT_MESSAGE_CONTENT", + messageId: state.messageId, + delta: delta.text, + timestamp: now, + } satisfies StreamChunk, + ]; + } + + case "input_json_delta": { + if (!block || block.type !== "tool_use" || !block.toolCallId) { + return []; + } + block.argsAccumulator = + (block.argsAccumulator ?? "") + delta.partial_json; + return [ + { + type: "TOOL_CALL_ARGS", + toolCallId: block.toolCallId, + delta: delta.partial_json, + args: block.argsAccumulator, + timestamp: now, + } satisfies StreamChunk, + ]; + } + + case "thinking_delta": { + const stepId = `thinking-${index}`; + return [ + { + type: "STEP_FINISHED", + stepId, + delta: delta.thinking, + timestamp: now, + } satisfies StreamChunk, + ]; + } + + default: + // signature_delta, citations_delta — skip + return []; + } +} + +function handleContentBlockStop( + state: ConversionState, + index: number, + now: number, +): StreamChunk[] { + const block = state.activeBlocks.get(index); + state.activeBlocks.delete(index); + + if (!block) return []; + + if (block.type === "tool_use" && block.toolCallId && block.toolName) { + let parsedInput: unknown; + try { + parsedInput = JSON.parse(block.argsAccumulator || "{}"); + } catch (parseErr) { + console.warn( + "[sdk-to-ai-chunks] Failed to parse tool args for", + block.toolName, + ":", + parseErr, + ); + parsedInput = {}; + } + + return [ + { + type: "TOOL_CALL_END", + toolCallId: block.toolCallId, + toolName: block.toolName, + input: parsedInput, + timestamp: now, + } satisfies StreamChunk, + ]; + } + + return []; +} + +// ============================================================================ +// User Message Handler (Tool Results) +// ============================================================================ + +function handleUserMessage(message: SDKUserMessage): StreamChunk[] { + if (!message.message) return []; + const content = message.message.content; + if (typeof content === "string" || !Array.isArray(content)) { + return []; + } + + const now = Date.now(); + const chunks: StreamChunk[] = []; + + for (const block of content) { + if (block.type === "tool_result") { + const toolResult = block as { + type: "tool_result"; + tool_use_id: string; + content?: + | string + | Array<{ type: string; text?: string; [key: string]: unknown }>; + is_error?: boolean; + }; + + let resultText: string; + if (typeof toolResult.content === "string") { + resultText = toolResult.content; + } else if (Array.isArray(toolResult.content)) { + resultText = toolResult.content + .filter( + (b): b is { type: string; text: string } => + b.type === "text" && typeof b.text === "string", + ) + .map((b) => b.text) + .join("\n"); + } else { + resultText = ""; + } + + chunks.push({ + type: "TOOL_CALL_END", + toolCallId: toolResult.tool_use_id, + toolName: "", + result: resultText, + timestamp: now, + } satisfies StreamChunk); + } + } + + return chunks; +} + +// ============================================================================ +// Result Message Handler +// ============================================================================ + +function handleResultMessage( + state: ConversionState, + message: SDKResultMessage, +): StreamChunk[] { + const now = Date.now(); + const chunks: StreamChunk[] = []; + + if (message.subtype?.startsWith("error")) { + chunks.push({ + type: "RUN_ERROR", + runId: state.runId, + error: { + message: `Claude agent error: ${message.subtype}`, + code: message.subtype, + }, + timestamp: now, + } satisfies StreamChunk); + return chunks; + } + + const finishReason = + message.stop_reason === "end_turn" || + message.stop_reason === "stop_sequence" + ? "stop" + : message.stop_reason === "max_tokens" + ? "length" + : message.stop_reason === "tool_use" + ? "tool_calls" + : "stop"; + + chunks.push({ + type: "RUN_FINISHED", + runId: state.runId, + finishReason: finishReason as "stop" | "length" | "tool_calls", + usage: message.usage + ? { + promptTokens: message.usage.input_tokens, + completionTokens: message.usage.output_tokens, + totalTokens: message.usage.input_tokens + message.usage.output_tokens, + } + : undefined, + timestamp: now, + } satisfies StreamChunk); + + return chunks; +} diff --git a/bun.lock b/bun.lock index ec277ce7d42..17d52a5e9a2 100644 --- a/bun.lock +++ b/bun.lock @@ -454,16 +454,18 @@ "name": "@superset/streams", "version": "0.0.1", "dependencies": { + "@anthropic-ai/claude-agent-sdk": "^0.2.19", "@durable-streams/client": "^0.2.0", "@durable-streams/server": "^0.2.0", + "@hono/node-server": "^1.13.0", "@superset/durable-session": "workspace:*", + "@tanstack/ai": "^0.3.0", "@tanstack/db": "^0.5.22", "hono": "^4.4.0", "zod": "^4.1.12", }, "devDependencies": { "@durable-streams/server-conformance-tests": "^0.2.0", - "@hono/node-server": "^1.13.0", "@superset/typescript": "workspace:*", "@types/node": "^24.9.1", "fast-check": "^4.5.3", diff --git a/docs/ai-chat-plan.md b/docs/ai-chat-plan.md index a459352f357..8f3c94b4f40 100644 --- a/docs/ai-chat-plan.md +++ b/docs/ai-chat-plan.md @@ -118,7 +118,7 @@ The agent endpoint converts these to TanStack AI `StreamChunk` format before wri | PresenceBar component | DONE — `packages/durable-session/src/react/components/PresenceBar/` | | Old ai-chat package | REMOVED — replaced by `@superset/durable-session` | | Vendored proxy (A2) | DONE — `apps/streams/src/` (vendored from electric-sql/transport, JSON.stringify fix for DurableStream.append) | -| Claude agent endpoint (B) | NOT BUILT | +| Claude agent endpoint (B) | DONE — `apps/streams/src/claude-agent.ts` + `apps/streams/src/sdk-to-ai-chunks.ts` | | Database schema | NOT BUILT | | API chat router | NOT BUILT | | Desktop chat UI (renderer) | NOT BUILT | @@ -892,12 +892,12 @@ All files below are created and typechecking. Compatibility fixes applied for un | `packages/durable-session/src/react/components/ChatInput/` | Migrated from `packages/ai-chat` | ✅ | | `packages/durable-session/src/react/components/PresenceBar/` | Migrated from `packages/ai-chat` | ✅ | -### Files to CREATE (new code) +### Files CREATED (Phase B — Claude Agent Endpoint) ✅ -| File | Description | Lines (est) | +| File | Description | Status | |---|---|---| -| `apps/streams/src/claude-agent.ts` | Claude agent HTTP endpoint | ~120 | -| `apps/streams/src/sdk-to-ai-chunks.ts` | SDKMessage → TanStack AI chunk converter | ~200 | +| `apps/streams/src/claude-agent.ts` | Claude agent HTTP endpoint (Hono, SSE response) | ✅ | +| `apps/streams/src/sdk-to-ai-chunks.ts` | SDKMessage → TanStack AI AG-UI chunk converter | ✅ | ### Files CREATED (vendored proxy — Phase A2) ✅ @@ -954,6 +954,13 @@ All files below are created and typechecking. Compatibility fix: `DurableStream. | `apps/streams/package.json` | Added: hono, @hono/node-server, @durable-streams/client, @superset/durable-session, @tanstack/db, zod | ✅ | | `packages/durable-session/src/client.ts` | Fixed: `response.json()` return type assertion for `ForkResult` | ✅ | +### Files MODIFIED (Phase B) ✅ + +| File | Changes | Status | +|---|---|---| +| `apps/streams/package.json` | Added: @anthropic-ai/claude-agent-sdk, @tanstack/ai | ✅ | +| `apps/streams/src/index.ts` | Added: Claude agent endpoint on CLAUDE_AGENT_PORT (default 9090) | ✅ | + --- ## Implementation Order @@ -961,7 +968,7 @@ All files below are created and typechecking. Compatibility fix: `DurableStream. 1. ~~**Phase A1** — Vendor `@superset/durable-session` package~~ ✅ DONE 2. ~~**Phase C1** — Remove old `packages/ai-chat`, migrate UI components~~ ✅ DONE 3. ~~**Phase A2** — Vendor proxy into `apps/streams` (copy 17 files, adjust 3 import paths)~~ ✅ DONE -4. **Phase B** — Claude agent endpoint + SDK-to-AI chunk converter (2 new files) +4. ~~**Phase B** — Claude agent endpoint + SDK-to-AI chunk converter (2 new files)~~ ✅ DONE 5. **Phase C2** — Simplify desktop session manager 6. **Phase C3** — Handle drafts 7. **Phase D** — Database schema + migration @@ -978,6 +985,7 @@ All files below are created and typechecking. Compatibility fix: `DurableStream. | `@tanstack/ai` API mismatch with vendored code | Build breaks | Vendored code uses `workspace:*` — pin to compatible published versions, fix API differences | ✅ Resolved — `DoneStreamChunk` → `RUN_FINISHED`, `LiveMode` removed | | `@tanstack/db` unreleased aggregates | Build breaks | Rewrite collection pipelines with `groupBy + count + fn.select` workaround | ✅ Resolved — `collect`/`minStr` replaced | | SDKMessage → AI chunk conversion errors | Broken rendering | Comprehensive unit tests with real Claude output fixtures | Pending (Phase B) | +| Dual `StreamChunk` types | Type confusion, silent mismatches at module boundaries | `sdk-to-ai-chunks.ts` imports strict `StreamChunk` from `@tanstack/ai` (union of 14 AG-UI events). `types.ts` defines a loose `{ type: string; [key: string]: unknown }` used by `protocol.ts` and `stream-writer.ts`. Works at runtime because JSON serialization is the boundary, but `protocol.ts` gets zero type safety when constructing/consuming chunks. **Fix:** delete local `StreamChunk` from `types.ts`, use `@tanstack/ai`'s everywhere, replace `as StreamChunk` casts in `protocol.ts` with typed construction (~10 call sites). | Deferred — cleanup PR | | Claude binary path outside Electron | Agent can't start | `CLAUDE_BINARY_PATH` env var set by desktop at streams startup | Pending | | Multi-turn resume state lost on restart | Context lost | In-memory map + optional file-based persistence in data dir | Pending | | Interrupt via HTTP abort | Claude subprocess continues | Agent detects fetch abort → calls `query.interrupt()` + `abortController.abort()` | Pending |