diff --git a/.env.example b/.env.example index a2c17071d80..464107ea1a4 100644 --- a/.env.example +++ b/.env.example @@ -65,18 +65,6 @@ POSTHOG_PROJECT_ID= # ----------------------------------------------------------------------------- FREESTYLE_API_KEY= -# ----------------------------------------------------------------------------- -# Streams (AI Chat Server) -# ----------------------------------------------------------------------------- -# Clients (Desktop Web Mobile) -NEXT_PUBLIC_STREAMS_URL=http://localhost:8080 - -# Streams server internals (optional) -STREAMS_PORT=8080 -STREAMS_INTERNAL_PORT=8081 -STREAMS_INTERNAL_URL=http://localhost:8081 -STREAMS_DATA_DIR=./data - # ----------------------------------------------------------------------------- # Sentry Error Tracking # ----------------------------------------------------------------------------- diff --git a/AGENTS.md b/AGENTS.md index 04397cdf780..2cf26e90842 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -13,8 +13,6 @@ Bun + Turbo monorepo with: - `apps/desktop` - Electron desktop application - `apps/docs` - Documentation site - `apps/mobile` - React Native mobile app (Expo) - - `apps/streams` - Streams service - - `apps/electric-proxy` - Electric proxy service - **Packages**: - `packages/ui` - Shared UI components (shadcn/ui + TailwindCSS v4). - Add components: `npx shadcn@latest add ` (run in `packages/ui/`) diff --git a/apps/electric-proxy/package.json b/apps/electric-proxy/package.json deleted file mode 100644 index eceb528dbd1..00000000000 --- a/apps/electric-proxy/package.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "name": "electric-proxy", - "version": "0.0.0", - "private": true -} diff --git a/bun.lock b/bun.lock index 28131e25fd8..a94d3b1e927 100644 --- a/bun.lock +++ b/bun.lock @@ -318,10 +318,6 @@ "typescript": "^5.9.3", }, }, - "apps/electric-proxy": { - "name": "electric-proxy", - "version": "0.0.0", - }, "apps/marketing": { "name": "@superset/marketing", "version": "0.1.0", @@ -2979,8 +2975,6 @@ "ejs": ["ejs@3.1.10", "", { "dependencies": { "jake": "^10.8.5" }, "bin": { "ejs": "bin/cli.js" } }, "sha512-UeJmFfOrAQS8OJWPZ4qtgHyWExa088/MtK5UEyoJGFH67cDEXkZSviOiKRCZ4Xij0zxI3JECgYs3oKx+AizQBA=="], - "electric-proxy": ["electric-proxy@workspace:apps/electric-proxy"], - "electron": ["electron@40.2.1", "", { "dependencies": { "@electron/get": "^2.0.0", "@types/node": "^24.9.0", "extract-zip": "^2.0.1" }, "bin": { "electron": "cli.js" } }, "sha512-0zOeyN8LB1KHIjVV5jyMmQmkqx3J8OkkVlab3p7vOM28jI46blxW7M52Tcdi6X2m5o2jj8ejOlAh5+boL3w8aQ=="], "electron-builder": ["electron-builder@26.7.0", "", { "dependencies": { "app-builder-lib": "26.7.0", "builder-util": "26.4.1", "builder-util-runtime": "9.5.1", "chalk": "^4.1.2", "ci-info": "^4.2.0", "dmg-builder": "26.7.0", "fs-extra": "^10.1.0", "lazy-val": "^1.0.5", "simple-update-notifier": "2.0.0", "yargs": "^17.6.2" }, "bin": { "electron-builder": "cli.js", "install-app-deps": "install-app-deps.js" } }, "sha512-LoXbCvSFxLesPneQ/fM7FB4OheIDA2tjqCdUkKlObV5ZKGhYgi5VHPHO/6UUOUodAlg7SrkPx7BZJPby+Vrtbg=="], diff --git a/docs/ai-chat-plan.md b/docs/ai-chat-plan.md deleted file mode 100644 index 00b6175b806..00000000000 --- a/docs/ai-chat-plan.md +++ /dev/null @@ -1,1195 +0,0 @@ -# Multiplayer AI Chat with Claude Code - -Build a real-time multiplayer AI chat powered by Claude Code SDK with Durable Streams for token streaming. - -## Architecture - -``` -Any Client (Web/Desktop/Mobile) -┌──────────────────────────────────────────────────────────┐ -│ useDurableChat() │ -│ @superset/durable-session (vendored) │ -│ │ -│ DurableChatClient │ -│ → collections.messages (reactive, materialized) │ -│ → collections.presence │ -│ → collections.activeGenerations │ -│ → sendMessage() (optimistic insert + POST to proxy) │ -└───────────┬──────────────────────────────────────────────┘ - │ HTTP - ▼ -┌──────────────────────────────────────────────────────────┐ -│ Durable Session Proxy (apps/streams, port 8080) │ -│ @superset/durable-session-proxy (vendored from │ -│ electric-sql/transport) │ -│ │ -│ Hono routes: │ -│ PUT /v1/sessions/:id Create session │ -│ POST /v1/sessions/:id/messages Send message │ -│ POST /v1/sessions/:id/agents Register agent │ -│ POST /v1/sessions/:id/stop Stop generation │ -│ GET /v1/stream/sessions/:id SSE stream proxy │ -│ │ -│ AIDBSessionProtocol │ -│ → writeUserMessage() to durable stream │ -│ → notifyRegisteredAgents() on new user message │ -│ → writeChunk() for each agent SSE chunk │ -│ → stopGeneration() via AbortController │ -│ │ -│ ┌────────────────────────────────────────────────┐ │ -│ │ DurableStreamTestServer (internal port 8081) │ │ -│ │ @durable-streams/server │ │ -│ │ LMDB + append-only logs │ │ -│ └────────────────────────────────────────────────┘ │ -└───────────┬──────────────────────────────────────────────┘ - │ HTTP (agent invocation) - ▼ -┌──────────────────────────────────────────────────────────┐ -│ Claude Agent Endpoint (apps/streams/src/claude-agent.ts)│ -│ │ -│ POST / receives { messages } from proxy │ -│ → Extracts latest user message │ -│ → Runs query() from @anthropic-ai/claude-agent-sdk │ -│ → Converts SDKMessage → TanStack AI SSE chunks │ -│ → Returns SSE response │ -│ → Manages multi-turn resume via claudeSessionId │ -│ │ -│ SDK Message Conversion (sdk-to-ai-chunks.ts): │ -│ stream_event (text_delta) → text-delta chunk │ -│ stream_event (tool_use start) → tool-call-start │ -│ stream_event (input_json_delta)→ tool-call-delta │ -│ stream_event (thinking_delta) → reasoning chunk │ -│ user (tool_result) → tool-result chunk │ -│ result → finish chunk │ -└──────────────────────────────────────────────────────────┘ -``` - -### Message Flow - -1. Client calls `sendMessage("fix the bug")` via `useDurableChat` -2. Optimistic insert into local `chunks` collection (instant UI update) -3. POST to proxy `/v1/sessions/:id/messages` -4. Proxy writes user message chunk to durable stream -5. Proxy detects new user message, calls registered Claude agent endpoint -6. Agent runs `query()` with Claude SDK, streams SSE chunks back -7. Proxy writes each chunk to durable stream with `messageId` + `seq` -8. Client's `SessionDB` syncs new chunks via SSE -9. `messages` collection auto-rematerializes → UI updates reactively - -## Key Design Decisions - -1. **Vendor `@electric-sql/durable-session`** — Not published to npm. Vendored from [electric-sql/transport](https://github.com/electric-sql/transport) into `packages/durable-session/` (~20 files). Required compatibility fixes for unreleased `@tanstack/db` aggregates (`collect`, `minStr`) and `@tanstack/ai` types. Gives us reactive collections, optimistic mutations, TanStack AI compatibility. -2. **Proxy pattern** — Proxy handles message writing, agent invocation, stream fan-out. Clients never write to durable stream directly. -3. **Agent endpoint** — Claude SDK runs as an "agent" the proxy calls via HTTP. Agent handles entire tool loop server-side. Returns standard TanStack AI SSE chunks. -4. **TanStack AI message format** — Messages use `parts: MessagePart[]` (TextPart, ToolCallPart, ToolResultPart, ThinkingPart) not Anthropic-specific `BetaContentBlock[]`. SDK output converted at the agent boundary. -5. **Postgres for completed messages** — Single write on completion, Electric syncs history. Durable stream is the live source of truth during streaming. -6. **`@tanstack/ai` for materialization** — Official `StreamProcessor` handles chunk accumulation. No custom materialization needed. - -## Claude SDK Streaming Format - -The Claude Agent SDK emits `SDKMessage` objects when `includePartialMessages: true`: - -```typescript -// Types: system, stream_event, assistant, user, result -type SDKMessage = - | { type: 'system'; subtype: 'init'; session_id: string } - | { type: 'stream_event'; event: RawMessageStreamEvent } - | { type: 'assistant'; message: { content: BetaContentBlock[] } } - | { type: 'user'; message: { content: ToolResultBlock[] } } - | { type: 'result'; ... } -``` - -The agent endpoint converts these to TanStack AI `StreamChunk` format before writing to the durable stream. This is a one-way conversion at the write boundary — clients never see raw SDK messages. - ---- - -## Status - -| Component | Status | -|-----------|--------| -| Claude binary download | DONE — `apps/desktop/scripts/download-claude-binary.ts` | -| Auth (buildClaudeEnv) | DONE — `apps/desktop/src/lib/trpc/routers/ai-chat/utils/auth/auth.ts` | -| Session manager (v1) | DONE — `apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/session-manager.ts` | -| Desktop tRPC router | DONE — `apps/desktop/src/lib/trpc/routers/ai-chat/index.ts` | -| Durable stream server (v1) | DONE — `apps/streams/` (vendored proxy from electric-sql/transport) | -| Vendored durable-session client | DONE — `packages/durable-session/` (vendored from electric-sql/transport) | -| React hook (useDurableChat) | DONE — `packages/durable-session/src/react/use-durable-chat.ts` | -| ChatInput component | DONE — `packages/durable-session/src/react/components/ChatInput/` | -| PresenceBar component | DONE — `packages/durable-session/src/react/components/PresenceBar/` | -| Old ai-chat package | REMOVED — replaced by `@superset/durable-session` | -| Vendored proxy (A2) | DONE — `apps/streams/src/` (vendored from electric-sql/transport, JSON.stringify fix for DurableStream.append) | -| Claude agent endpoint (B) | DONE — `apps/streams/src/claude-agent.ts` + `apps/streams/src/sdk-to-ai-chunks.ts` | -| Database schema | NOT BUILT | -| API chat router | NOT BUILT | -| Desktop chat UI (renderer) | NOT BUILT | -| Web chat UI | NOT BUILT | -| Message rendering component | NOT BUILT | - ---- - -## Phase A: Vendor `@electric-sql/durable-session` - -Source: [electric-sql/transport](https://github.com/electric-sql/transport) (unpublished, Apache-2.0) - -Reference source cloned to `/tmp/electric-sql-transport/` via: -```bash -git clone https://github.com/electric-sql/transport.git /tmp/electric-sql-transport -``` - -### A1. Create `packages/durable-session/` — DONE - -Vendor from `packages/durable-session` + `packages/react-durable-session` in the transport repo. - -#### File-by-File Vendoring Reference - -| Source (in `/tmp/electric-sql-transport/`) | Destination | Import Changes | -|---|---|---| -| `packages/durable-session/src/index.ts` | `packages/durable-session/src/index.ts` | None (relative imports) | -| `packages/durable-session/src/client.ts` | `packages/durable-session/src/client.ts` | None (relative imports) | -| `packages/durable-session/src/collection.ts` | `packages/durable-session/src/collection.ts` | None (relative imports) | -| `packages/durable-session/src/materialize.ts` | `packages/durable-session/src/materialize.ts` | None (relative imports) | -| `packages/durable-session/src/schema.ts` | `packages/durable-session/src/schema.ts` | None (relative imports) | -| `packages/durable-session/src/types.ts` | `packages/durable-session/src/types.ts` | None (relative imports) | -| `packages/durable-session/src/collections/index.ts` | `packages/durable-session/src/collections/index.ts` | None | -| `packages/durable-session/src/collections/messages.ts` | `packages/durable-session/src/collections/messages.ts` | None | -| `packages/durable-session/src/collections/active-generations.ts` | `packages/durable-session/src/collections/active-generations.ts` | None | -| `packages/durable-session/src/collections/session-meta.ts` | `packages/durable-session/src/collections/session-meta.ts` | None | -| `packages/durable-session/src/collections/session-stats.ts` | `packages/durable-session/src/collections/session-stats.ts` | None | -| `packages/durable-session/src/collections/model-messages.ts` | `packages/durable-session/src/collections/model-messages.ts` | None | -| `packages/durable-session/src/collections/presence.ts` | `packages/durable-session/src/collections/presence.ts` | None | -| `packages/react-durable-session/src/index.ts` | `packages/durable-session/src/react/index.ts` | `@electric-sql/durable-session` → `../` | -| `packages/react-durable-session/src/types.ts` | `packages/durable-session/src/react/types.ts` | `@electric-sql/durable-session` → `../` | -| `packages/react-durable-session/src/use-durable-chat.ts` | `packages/durable-session/src/react/use-durable-chat.ts` | `@electric-sql/durable-session` → `../` | - -**Specific import changes in react files:** -```typescript -// BEFORE (in react-durable-session source): -import { DurableChatClient, messageRowToUIMessage } from '@electric-sql/durable-session' -import type { DurableChatClientOptions } from '@electric-sql/durable-session' - -// AFTER (in packages/durable-session/src/react/): -import { DurableChatClient, messageRowToUIMessage } from '..' -import type { DurableChatClientOptions } from '..' -``` - -```typescript -// BEFORE (react index.ts re-exports): -export { DurableChatClient, ... } from '@electric-sql/durable-session' - -// AFTER: -export { DurableChatClient, ... } from '..' -``` - -#### Package Configuration - -**`packages/durable-session/package.json`:** -```json -{ - "name": "@superset/durable-session", - "version": "0.0.1", - "private": true, - "type": "module", - "exports": { - ".": { - "types": "./src/index.ts", - "default": "./src/index.ts" - }, - "./react": { - "types": "./src/react/index.ts", - "default": "./src/react/index.ts" - } - }, - "dependencies": { - "@durable-streams/state": "^0.2.0", - "@standard-schema/spec": "^1.0.0", - "@tanstack/ai": "^0.3.0", - "@tanstack/db": "^0.5.22", - "@tanstack/db-ivm": "^0.1.17", - "zod": "^4.1.12" - }, - "peerDependencies": { - "react": "^18.0.0 || ^19.0.0", - "@tanstack/react-db": "^0.1.66" - } -} -``` - -**`packages/durable-session/tsconfig.json`:** -```json -{ - "extends": "@superset/typescript-config/react-library.json", - "compilerOptions": { - "outDir": "./dist" - }, - "include": ["src"] -} -``` - -#### Key Internals to Understand - -**Schema** (`schema.ts`) — Three STATE-PROTOCOL collections: -```typescript -export const sessionStateSchema = createStateSchema({ - chunks: { - schema: chunkValueSchema, // messageId, actorId, role, chunk (JSON), seq, createdAt - type: 'chunk', - primaryKey: 'id', // Injected from event.key = `${messageId}:${seq}` - allowSyncWhilePersisting: true, - }, - presence: { - schema: presenceValueSchema, // actorId, deviceId, actorType, name?, status, lastSeenAt - type: 'presence', - primaryKey: 'id', // Injected from event.key = `${actorId}:${deviceId}` - }, - agents: { - schema: agentValueSchema, // agentId, name?, endpoint, triggers? - type: 'agent', - primaryKey: 'agentId', - }, -}) -``` - -**Materialization pipeline** (`materialize.ts`): -- Uses `StreamProcessor` from `@tanstack/ai` (the key dependency) -- Two paths: `WholeMessageChunk` (type: 'whole-message') for user msgs, `StreamChunk[]` for assistant msgs -- `parseChunk(row.chunk)` → JSON.parse the chunk field -- `materializeWholeMessage()` → extract UIMessage from chunk, return MessageRow -- `materializeAssistantMessage()` → sort chunks by seq, feed to StreamProcessor, get parts -- `isDoneChunk()` / stop/error chunk types mark `isComplete: true` - -**Collection pipeline** (`collections/messages.ts`): -``` -chunks → groupBy(messageId) + count(chunk) + min(createdAt) - → orderBy(startedAt, 'asc') - → fn.select(imperatively gather chunks → materializeMessage(rows)) - → getKey: row.id -``` - -Derived collections use `.fn.where()`: -- `toolCalls`: `parts.some(p => p.type === 'tool-call')` -- `pendingApprovals`: `parts.some(p => p.type === 'tool-call' && p.approval?.needsApproval && p.approval.approved === undefined)` -- `toolResults`: `parts.some(p => p.type === 'tool-result')` -- `activeGenerations`: `!message.isComplete` → maps to `ActiveGenerationRow` - -**Session DB factory** (`collection.ts`): -```typescript -const streamUrl = `${baseUrl}/v1/stream/sessions/${sessionId}` -const rawDb = createStreamDB({ - streamOptions: { url: streamUrl, headers, signal }, - state: sessionStateSchema, -}) -``` - -**Chunk key format**: `${messageId}:${seq}` — e.g., "msg-1:0", "msg-2:5" - -**React hook** (`react/use-durable-chat.ts`): -- `useCollectionData()` — SSR-safe collection subscription using `useSyncExternalStore` -- Client created synchronously in render (ref-cached by `${sessionId}:${proxyUrl}` key) -- Handles Strict Mode: checks `client.isDisposed` and recreates if needed -- Auto-connects on mount if `autoConnect: true` (default) -- Returns TanStack AI-compatible API: messages, sendMessage, isLoading, etc. - -#### Compatibility Fixes Applied - -The transport repo uses `workspace:*` (unreleased local versions) of `@tanstack/db`, `@tanstack/ai`, and `@durable-streams/state`. The published npm versions differ, requiring these fixes: - -| Issue | Fix | -|-------|-----| -| `collect` aggregate not in `@tanstack/db` v0.5.22 | Rewrote `messages.ts`, `session-stats.ts`, `presence.ts` to use `groupBy + count` as change discriminator + `fn.select` with imperative collection filtering | -| `minStr` aggregate not in `@tanstack/db` v0.5.22 | Replaced with `min()` which handles strings at runtime | -| `DoneStreamChunk` not in `@tanstack/ai` v0.3.0 | Replaced with `chunk.type === 'RUN_FINISHED'` type guard | -| `LiveMode` not in `@durable-streams/state` v0.2.1 | Removed import and re-export (was already unused in practice) | - -#### UI Components Migrated - -`ChatInput` and `PresenceBar` from the old `packages/ai-chat` were moved into `packages/durable-session/src/react/components/`. They are exported from `@superset/durable-session/react`: - -```typescript -import { ChatInput, PresenceBar } from '@superset/durable-session/react' -``` - -The old `packages/ai-chat` package has been fully removed. - -### A2. Vendor proxy into `apps/streams/` — DONE - -Vendor from `packages/durable-session-proxy` in the transport repo. - -#### File-by-File Vendoring Reference - -| Source (in `/tmp/electric-sql-transport/`) | Destination | Import Changes | -|---|---|---| -| `packages/durable-session-proxy/src/index.ts` | `packages/durable-session-proxy/src/index.ts` (re-exports only, keep for reference) | N/A | -| `packages/durable-session-proxy/src/server.ts` | `apps/streams/src/server.ts` | `@electric-sql/durable-session` → `@superset/durable-session` | -| `packages/durable-session-proxy/src/protocol.ts` | `apps/streams/src/protocol.ts` | `@electric-sql/durable-session` → `@superset/durable-session` | -| `packages/durable-session-proxy/src/types.ts` | `apps/streams/src/types.ts` | `@electric-sql/durable-session` → `@superset/durable-session` | -| `packages/durable-session-proxy/src/handlers/index.ts` | `apps/streams/src/handlers/index.ts` | None | -| `packages/durable-session-proxy/src/handlers/send-message.ts` | `apps/streams/src/handlers/send-message.ts` | None (relative) | -| `packages/durable-session-proxy/src/handlers/invoke-agent.ts` | `apps/streams/src/handlers/invoke-agent.ts` | None (relative) | -| `packages/durable-session-proxy/src/handlers/stream-writer.ts` | `apps/streams/src/handlers/stream-writer.ts` | None (relative) | -| `packages/durable-session-proxy/src/routes/index.ts` | `apps/streams/src/routes/index.ts` | None | -| `packages/durable-session-proxy/src/routes/sessions.ts` | `apps/streams/src/routes/sessions.ts` | None (relative) | -| `packages/durable-session-proxy/src/routes/messages.ts` | `apps/streams/src/routes/messages.ts` | None (relative) | -| `packages/durable-session-proxy/src/routes/agents.ts` | `apps/streams/src/routes/agents.ts` | None (relative) | -| `packages/durable-session-proxy/src/routes/stream.ts` | `apps/streams/src/routes/stream.ts` | None | -| `packages/durable-session-proxy/src/routes/tool-results.ts` | `apps/streams/src/routes/tool-results.ts` | None (relative) | -| `packages/durable-session-proxy/src/routes/approvals.ts` | `apps/streams/src/routes/approvals.ts` | None (relative) | -| `packages/durable-session-proxy/src/routes/health.ts` | `apps/streams/src/routes/health.ts` | None | -| `packages/durable-session-proxy/src/routes/auth.ts` | `apps/streams/src/routes/auth.ts` | None (relative) | -| `packages/durable-session-proxy/src/routes/fork.ts` | `apps/streams/src/routes/fork.ts` | None (relative) | - -**Import change in proxy files** (3 files: `server.ts`, `protocol.ts`, `types.ts`): -```typescript -// BEFORE: -import { sessionStateSchema, createSessionDB, ... } from '@electric-sql/durable-session' -import type { SessionDB, MessageRow, ModelMessage } from '@electric-sql/durable-session' - -// AFTER: -import { sessionStateSchema, createSessionDB, ... } from '@superset/durable-session' -import type { SessionDB, MessageRow, ModelMessage } from '@superset/durable-session' -``` - -**Replace** existing `apps/streams/src/index.ts` and **delete** `session-registry.ts`. - -#### New entrypoint: `apps/streams/src/index.ts` - -Based on vendored `dev.ts` pattern, combined with existing DurableStreamTestServer. All env vars are validated via `env.ts` (required, no defaults). - -```typescript -import { DurableStreamTestServer } from '@durable-streams/server' -import { serve } from '@hono/node-server' -import { claudeAgentApp } from './claude-agent' -import { env } from './env' -import { createServer } from './server' - -const durableStreamServer = new DurableStreamTestServer({ - port: env.STREAMS_INTERNAL_PORT, - dataDir: env.STREAMS_DATA_DIR, -}) -await durableStreamServer.start() - -const { app } = createServer({ - baseUrl: env.STREAMS_INTERNAL_URL, - cors: true, - logging: true, - authToken: env.STREAMS_SECRET, -}) - -serve({ fetch: app.fetch, port: env.PORT }) -serve({ fetch: claudeAgentApp.fetch, port: env.STREAMS_AGENT_PORT }) - -for (const signal of ['SIGINT', 'SIGTERM']) { - process.on(signal, async () => { - /* graceful shutdown */ - }) -} -``` - -#### Key Protocol Internals (`protocol.ts`, ~917 lines) - -The `AIDBSessionProtocol` class manages: - -1. **Session lifecycle**: `createSession()` → creates DurableStream + SessionDB + reactive trigger -2. **Chunk writing** via `sessionStateSchema.chunks.insert({ key, value })`: - - User messages: single chunk with `{ type: 'whole-message', message: UIMessage }` - - Agent responses: sequential chunks with TanStack AI StreamChunk objects -3. **Reactive agent triggering**: After `preload()`, subscribes to `modelMessages.subscribeChanges()` — only triggers for NEW user messages (not historical) -4. **Agent invocation**: `fetch()` to agent endpoint → parse SSE → `writeChunk()` for each data line -5. **Active generation tracking**: Map for interrupt support -6. **Stop generation**: `abortController.abort()` → writes `{ type: 'stop', reason: 'aborted' }` chunk -7. **Message history**: Reads from materialized `modelMessages` collection (not raw chunks) - -**Add to `apps/streams/package.json`:** -```json -{ - "dependencies": { - "@durable-streams/server": "^0.2.0", - "@durable-streams/client": "^0.2.0", - "@hono/node-server": "^1.13.0", - "@superset/durable-session": "workspace:*", - "@tanstack/db": "^0.5.22", - "hono": "^4.4.0", - "zod": "^4.1.12" - } -} -``` - ---- - -## Phase B: Claude Agent Endpoint - -### B1. Create `apps/streams/src/claude-agent.ts` - -Hono app that acts as an AI agent endpoint the proxy can invoke. The proxy's `invokeAgent()` calls this endpoint via `fetch()` and parses the SSE response. - -```typescript -import { Hono } from 'hono' -import { query } from '@anthropic-ai/claude-agent-sdk' -import { convertSDKMessageToSSE } from './sdk-to-ai-chunks' - -const app = new Hono() - -// Session state for multi-turn resume -const claudeSessions = new Map() // sessionId → claudeSessionId - -app.post('/', async (c) => { - const { messages, stream: shouldStream, sessionId } = await c.req.json() - - // Extract prompt from latest user message - const latestUserMessage = messages.filter(m => m.role === 'user').pop() - if (!latestUserMessage) { - return c.json({ error: 'No user message found' }, 400) - } - - const prompt = latestUserMessage.content - const claudeSessionId = claudeSessions.get(sessionId) - - // Run Claude query - const result = query({ - prompt, - options: { - ...(claudeSessionId && { resume: claudeSessionId }), - model: 'claude-sonnet-4-5-20250929', - maxTurns: 25, - }, - abortSignal: c.req.raw.signal, - }) - - // Return SSE response - const encoder = new TextEncoder() - const readable = new ReadableStream({ - async start(controller) { - try { - for await (const message of result) { - // Extract claudeSessionId from system init - if (message.type === 'system' && message.subtype === 'init') { - claudeSessions.set(sessionId, message.session_id) - continue - } - - // Convert SDKMessage → TanStack AI SSE chunks - const chunks = convertSDKMessageToSSE(message) - for (const chunk of chunks) { - controller.enqueue(encoder.encode(`data: ${JSON.stringify(chunk)}\n\n`)) - } - } - controller.enqueue(encoder.encode('data: [DONE]\n\n')) - controller.close() - } catch (err) { - controller.error(err) - } - }, - }) - - return new Response(readable, { - headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive', - }, - }) -}) - -export { app as claudeAgentApp } -``` - -**Integration with proxy:** Register this agent endpoint in the proxy: -```typescript -// In session startup: -await protocol.registerAgent(sessionId, { - id: 'claude', - name: 'Claude Agent', - endpoint: `http://localhost:${CLAUDE_AGENT_PORT}/`, - method: 'POST', - triggers: 'user-messages', - bodyTemplate: { sessionId }, -}) -``` - -**Session state:** Maintains `Map` for multi-turn resume. - -**Abort handling:** When proxy calls `stopGeneration()`, it aborts the fetch to this endpoint. The agent detects the abort via `c.req.raw.signal` and the `query()` call is interrupted. - -### B2. Create `apps/streams/src/sdk-to-ai-chunks.ts` - -Pure conversion module. Maps Claude SDK `SDKMessage` types to TanStack AI `StreamChunk`. - -**The proxy expects standard JSON chunks** — it reads SSE `data: {...}` lines, parses JSON, and writes each chunk to the durable stream via `protocol.writeChunk()`. The `StreamProcessor` on the client side then materializes these into `MessagePart[]`. - -#### Conversion Table - -| SDKMessage | TanStack AI Chunk | Notes | -|---|---|---| -| `stream_event` → `content_block_start` (text) | — | No chunk, wait for deltas | -| `stream_event` → `content_block_delta` (text_delta) | `{ type: "text-delta", textDelta }` | | -| `stream_event` → `content_block_start` (tool_use) | `{ type: "tool-call-streaming-start", toolCallId, toolName }` | | -| `stream_event` → `content_block_delta` (input_json_delta) | `{ type: "tool-call-delta", toolCallId, argsTextDelta }` | | -| `stream_event` → `content_block_stop` (tool_use) | `{ type: "tool-call", toolCallId, toolName, args }` | Full args from accumulator | -| `stream_event` → `content_block_start` (thinking) | — | Wait for deltas | -| `stream_event` → `content_block_delta` (thinking_delta) | `{ type: "reasoning", textDelta }` | | -| `user` (tool_result blocks) | `{ type: "tool-result", toolCallId, result }` | Server-side tool execution | -| `result` | `{ type: "done", finishReason: "stop" }` | End of agent turn, maps to `DoneStreamChunk` | -| `system` (init) | — | Extract `claudeSessionId` internally | -| `assistant` | — | Skip (stream_events already cover content) | - -#### Implementation Skeleton - -```typescript -import type { SDKMessage } from '@anthropic-ai/claude-agent-sdk' - -interface ConversionState { - // Map content_block index → block type + metadata - activeBlocks: Map -} - -export function createConverter(): { - state: ConversionState - convert: (message: SDKMessage) => StreamChunk[] -} { - const state: ConversionState = { activeBlocks: new Map() } - - return { - state, - convert(message: SDKMessage): StreamChunk[] { - if (message.type === 'stream_event') { - return handleStreamEvent(state, message.event) - } - if (message.type === 'user') { - // tool_result from Claude's internal tool execution - return message.message.content - .filter(block => block.type === 'tool_result') - .map(block => ({ - type: 'tool-result' as const, - toolCallId: block.tool_use_id, - result: block.content, - })) - } - if (message.type === 'result') { - return [{ type: 'done' as const, finishReason: 'stop' }] - } - return [] // Skip system, assistant - }, - } -} - -// Simpler stateless wrapper -export function convertSDKMessageToSSE(message: SDKMessage): StreamChunk[] { - // ... delegates to converter -} -``` - -**ConversionState** tracks: -- Active content block indices (to correlate starts with deltas) -- JSON accumulator per tool_use block (for partial → full args on `content_block_stop`) -- Current tool call IDs per block index - -**Key TanStack AI StreamChunk types** (from `@tanstack/ai`): -```typescript -type StreamChunk = - | { type: 'text-delta'; textDelta: string } - | { type: 'tool-call-streaming-start'; toolCallId: string; toolName: string } - | { type: 'tool-call-delta'; toolCallId: string; argsTextDelta: string } - | { type: 'tool-call'; toolCallId: string; toolName: string; args: Record } - | { type: 'tool-result'; toolCallId: string; result: unknown } - | { type: 'reasoning'; textDelta: string } - | { type: 'done'; finishReason: string } -``` - ---- - -## Phase C: Update Client Packages - -### C1. ~~Update `packages/ai-chat`~~ — DONE - -`packages/ai-chat` has been fully removed. All stream client code, hooks, materialization, and UI components are now in `packages/durable-session`. Consumers import directly: - -```typescript -// Data layer -import { DurableChatClient, createDurableChatClient } from '@superset/durable-session' - -// React hooks + components -import { useDurableChat, ChatInput, PresenceBar } from '@superset/durable-session/react' -``` - -### C2. Simplify desktop session manager - -**Rewrite** `apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/session-manager.ts`: - -**Remove entirely:** -- `StreamWatcher` class (watched stream for user_input via SSE — proxy now handles this reactively) -- `IdempotentProducer` / `createProducer` / `closeProducer` (proxy writes to durable stream) -- `processUserMessage()` (moved to Claude agent endpoint) -- `binaryPathResolver` (moved to agent endpoint env) - -**New session manager** — thin HTTP orchestrator: - -```typescript -const PROXY_URL = process.env.STREAMS_URL || 'http://localhost:8080' - -export class ClaudeSessionManager extends EventEmitter { - private activeSessions = new Map() - - async startSession({ sessionId, cwd, env }: StartSessionOptions): Promise { - // 1. Create session on proxy - await fetch(`${PROXY_URL}/v1/sessions/${sessionId}`, { method: 'PUT' }) - - // 2. Register Claude agent - await fetch(`${PROXY_URL}/v1/sessions/${sessionId}/agents`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - agents: [{ - id: 'claude', - name: 'Claude Agent', - endpoint: `http://localhost:${CLAUDE_AGENT_PORT}/`, - triggers: 'user-messages', - bodyTemplate: { sessionId, cwd, env }, - }], - }), - }) - - this.activeSessions.set(sessionId, { sessionId }) - this.emit('session:started', { sessionId }) - } - - async stopSession(sessionId: string): Promise { - // 1. Stop active generations - await this.interrupt(sessionId) - // 2. Unregister agent - await fetch(`${PROXY_URL}/v1/sessions/${sessionId}/agents/claude`, { method: 'DELETE' }) - // 3. Delete session - await fetch(`${PROXY_URL}/v1/sessions/${sessionId}`, { method: 'DELETE' }) - - this.activeSessions.delete(sessionId) - this.emit('session:stopped', { sessionId }) - } - - async interrupt(sessionId: string): Promise { - await fetch(`${PROXY_URL}/v1/sessions/${sessionId}/stop`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({}), - }) - this.emit('session:interrupted', { sessionId }) - } - - isActive(sessionId: string): boolean { - return this.activeSessions.has(sessionId) - } - - getActiveSessions(): string[] { - return [...this.activeSessions.keys()] - } -} -``` - -**tRPC router** (`apps/desktop/src/lib/trpc/routers/ai-chat/index.ts`) keeps same shape — just the session manager internals are simpler. - -### C3. Handle drafts - -Official schema has `agents` instead of `drafts`. Typing indicators come from presence `status` field. - -- Draft content → local React state / Zustand -- Typing indicator → presence `status: 'typing'` (can extend presence schema) - ---- - -## Phase D: Database Schema - -**`packages/db/src/schema/chat.ts`** (new): -```typescript -export const chatSessions = pgTable("chat_sessions", { - id: uuid().primaryKey().defaultRandom(), - organizationId: uuid("organization_id").notNull().references(() => organizations.id), - repositoryId: uuid("repository_id").references(() => repositories.id), - workspaceId: text("workspace_id"), - title: text().notNull(), - claudeSessionId: text("claude_session_id"), - cwd: text(), - createdById: uuid("created_by_id").notNull().references(() => users.id), - archivedAt: timestamp("archived_at"), - createdAt: timestamp("created_at").notNull().defaultNow(), - updatedAt: timestamp("updated_at").notNull().defaultNow().$onUpdate(() => new Date()), -}); - -export const chatMessages = pgTable("chat_messages", { - id: uuid().primaryKey().defaultRandom(), - sessionId: uuid("session_id").notNull().references(() => chatSessions.id), - organizationId: uuid("organization_id").notNull().references(() => organizations.id), - role: text().notNull(), - content: text().notNull(), - toolCalls: jsonb("tool_calls"), - inputTokens: integer("input_tokens"), - outputTokens: integer("output_tokens"), - createdById: uuid("created_by_id").references(() => users.id), - processingStartedAt: timestamp("processing_started_at"), - processingExpiresAt: timestamp("processing_expires_at"), - processedAt: timestamp("processed_at"), - processingError: text("processing_error"), - createdAt: timestamp("created_at").notNull().defaultNow(), -}); - -export const chatParticipants = pgTable("chat_participants", { - id: uuid().primaryKey().defaultRandom(), - sessionId: uuid("session_id").notNull().references(() => chatSessions.id), - userId: uuid("user_id").notNull().references(() => users.id), - role: text().notNull().default("viewer"), - joinedAt: timestamp("joined_at").notNull().defaultNow(), -}); -``` - ---- - -## Phase E: API tRPC Router - -**`packages/trpc/src/router/chat/index.ts`**: -- `createSession`, `sendMessage`, `listSessions`, `getSession`, `getMessages` -- `saveAssistantMessage` (called by desktop on completion) -- `archiveSession` - ---- - -## Phase F: Desktop Chat UI ✅ DONE - -Chat pane integrated as a tab type in the desktop workspace view. The UI connects to the durable session proxy via `useDurableChat` and manages session lifecycle through the existing `ai-chat` tRPC router. - -``` -apps/desktop/src/renderer/.../ChatPane/ -├── ChatPane.tsx -- Threads sessionId + cwd from pane store/workspace -├── ChatInterface/ -│ ├── ChatInterface.tsx -- Core: useDurableChat + tRPC session lifecycle -│ ├── constants.ts -- MODELS, SUGGESTIONS -│ ├── types.ts -- ModelOption -│ ├── utils/ -│ │ └── map-tool-state.ts -- Maps TanStack AI ToolCallPart states → ToolDisplayState -│ └── components/ -│ ├── ChatMessageItem/ -- Renders UIMessage.parts[] (text, thinking, tool-call) -│ ├── ToolCallBlock/ -- ToolCallPart + ToolResultPart → Tool + Confirmation UI -│ ├── ModelPicker/ -│ ├── ContextIndicator/ -│ └── PlanBlock/ -``` - -### Session lifecycle - -1. `ChatPane` reads `sessionId` from pane store (generated by `createChatPane()`) and `cwd` from workspace query -2. `ChatInterface` mounts → tRPC `startSession.mutate()` → main process → HTTP PUT to proxy -3. tRPC `onSuccess` → `useDurableChat.connect()` opens SSE stream from proxy -4. User sends message → `sendMessage()` → proxy → Claude agent → streamed chunks → reactive UI -5. Unmount → tRPC `stopSession.mutate()` cleans up - -### Type bridge: TanStack AI → UI components - -`packages/ui` AI element components define a local `ToolDisplayState` type that covers both TanStack AI states (`awaiting-input`, `input-complete`, `approval-requested`, `approval-responded`) and UI-only states (`input-available`, `output-available`, `output-error`, `output-denied`). The `mapToolCallState()` utility in the desktop app bridges `ToolCallPart.state` → `ToolDisplayState`. - -### Environment variables - -| Variable | Description | -|----------|-------------| -| `STREAMS_URL` | Proxy URL exposed via tRPC `getConfig` query | -| `STREAMS_SECRET` | Bearer token for authenticated proxy | - ---- - -## Phase G: Web Chat UI - -``` -apps/web/src/app/(dashboard)/chat/ -├── page.tsx -├── [sessionId]/ -│ └── page.tsx -└── components/ - ├── ChatMessageList.tsx - ├── ChatMessage.tsx - ├── ChatInput.tsx - ├── PresenceBar.tsx - └── TypingIndicator.tsx -``` - -Web uses same `useDurableChat` hook pointing at deployed proxy URL. - ---- - -## Dependencies - -**New packages needed** (all published on npm): - -| Package | Version | Used By | -|---------|---------|---------| -| `@tanstack/ai` | ^0.3.0 | durable-session (StreamProcessor for materialization) | -| `@tanstack/db-ivm` | ^0.1.17 | durable-session (incremental view maintenance) | -| `@standard-schema/spec` | ^1.0.0 | durable-session (schema validation) | -| `hono` | ^4.4.0 | apps/streams (proxy HTTP framework) | -| `@hono/node-server` | ^1.13.0 | apps/streams (Hono Node.js HTTP adapter) | - -**Already installed:** -- `@durable-streams/client` ^0.2.0, `@durable-streams/server` ^0.2.0, `@durable-streams/state` ^0.2.0 -- `@tanstack/db` 0.5.22, `@tanstack/react-db` 0.1.66 -- `@anthropic-ai/claude-agent-sdk` ^0.2.19 -- `zod` ^4.3.5 - -## Environment Variables - -All env vars are required — the streams server throws at startup if any are missing. - -```bash -# Streams server (apps/streams) -PORT=8080 # Proxy port (set by Fly.io in production) -STREAMS_INTERNAL_PORT=8081 # Internal durable stream server port -STREAMS_AGENT_PORT=9090 # Claude agent endpoint port -STREAMS_INTERNAL_URL=http://127.0.0.1:8081 # Internal durable stream server URL -STREAMS_DATA_DIR=/data # Data directory for LMDB + session persistence -STREAMS_SECRET= # Bearer token for /v1/* route auth -ANTHROPIC_API_KEY=sk-ant-... # Claude API key - -# Desktop (apps/desktop) — validated in env.main.ts, required -STREAMS_URL=http://localhost:8080 # Proxy URL exposed via tRPC getConfig -STREAMS_SECRET= # Bearer token for authenticated proxy -``` - ---- - -## Complete File Operations Summary - -### Files CREATED (vendored client — Phase A1) ✅ - -All files below are created and typechecking. Compatibility fixes applied for unreleased `@tanstack/db` aggregates (`collect`, `minStr`) and `@tanstack/ai` types (`DoneStreamChunk`). - -| Destination | Source | Status | -|---|---|---| -| `packages/durable-session/package.json` | NEW | ✅ | -| `packages/durable-session/tsconfig.json` | NEW | ✅ | -| `packages/durable-session/src/index.ts` | `durable-session/src/index.ts` | ✅ | -| `packages/durable-session/src/client.ts` | `durable-session/src/client.ts` | ✅ (fixed) | -| `packages/durable-session/src/collection.ts` | `durable-session/src/collection.ts` | ✅ | -| `packages/durable-session/src/materialize.ts` | `durable-session/src/materialize.ts` | ✅ (fixed) | -| `packages/durable-session/src/schema.ts` | `durable-session/src/schema.ts` | ✅ | -| `packages/durable-session/src/types.ts` | `durable-session/src/types.ts` | ✅ (fixed) | -| `packages/durable-session/src/collections/index.ts` | `durable-session/src/collections/index.ts` | ✅ | -| `packages/durable-session/src/collections/messages.ts` | `durable-session/src/collections/messages.ts` | ✅ (rewritten) | -| `packages/durable-session/src/collections/active-generations.ts` | `durable-session/src/collections/active-generations.ts` | ✅ | -| `packages/durable-session/src/collections/session-meta.ts` | `durable-session/src/collections/session-meta.ts` | ✅ | -| `packages/durable-session/src/collections/session-stats.ts` | `durable-session/src/collections/session-stats.ts` | ✅ (rewritten) | -| `packages/durable-session/src/collections/model-messages.ts` | `durable-session/src/collections/model-messages.ts` | ✅ | -| `packages/durable-session/src/collections/presence.ts` | `durable-session/src/collections/presence.ts` | ✅ (rewritten) | -| `packages/durable-session/src/react/index.ts` | `react-durable-session/src/index.ts` | ✅ | -| `packages/durable-session/src/react/types.ts` | `react-durable-session/src/types.ts` | ✅ | -| `packages/durable-session/src/react/use-durable-chat.ts` | `react-durable-session/src/use-durable-chat.ts` | ✅ | -| `packages/durable-session/src/react/components/ChatInput/` | Migrated from `packages/ai-chat` | ✅ | -| `packages/durable-session/src/react/components/PresenceBar/` | Migrated from `packages/ai-chat` | ✅ | - -### Files CREATED (Phase B — Claude Agent Endpoint) ✅ - -| File | Description | Status | -|---|---|---| -| `apps/streams/src/claude-agent.ts` | Claude agent HTTP endpoint (Hono, SSE response) | ✅ | -| `apps/streams/src/sdk-to-ai-chunks.ts` | SDKMessage → TanStack AI AG-UI chunk converter | ✅ | - -### Files CREATED (vendored proxy — Phase A2) ✅ - -All files below are created and typechecking. Compatibility fix: `DurableStream.append()` in published `@durable-streams/client@0.2.1` only accepts `string | Uint8Array`, not `ChangeEvent` objects. All `stream.append(event)` calls wrapped with `JSON.stringify()`. - -| Destination | Source | Status | -|---|---|---| -| `apps/streams/src/server.ts` | `durable-session-proxy/src/server.ts` | ✅ | -| `apps/streams/src/protocol.ts` | `durable-session-proxy/src/protocol.ts` | ✅ (fixed: JSON.stringify for append) | -| `apps/streams/src/types.ts` | `durable-session-proxy/src/types.ts` | ✅ | -| `apps/streams/src/handlers/index.ts` | `durable-session-proxy/src/handlers/index.ts` | ✅ | -| `apps/streams/src/handlers/send-message.ts` | `durable-session-proxy/src/handlers/send-message.ts` | ✅ | -| `apps/streams/src/handlers/invoke-agent.ts` | `durable-session-proxy/src/handlers/invoke-agent.ts` | ✅ | -| `apps/streams/src/handlers/stream-writer.ts` | `durable-session-proxy/src/handlers/stream-writer.ts` | ✅ | -| `apps/streams/src/routes/index.ts` | `durable-session-proxy/src/routes/index.ts` | ✅ | -| `apps/streams/src/routes/sessions.ts` | `durable-session-proxy/src/routes/sessions.ts` | ✅ | -| `apps/streams/src/routes/messages.ts` | `durable-session-proxy/src/routes/messages.ts` | ✅ | -| `apps/streams/src/routes/agents.ts` | `durable-session-proxy/src/routes/agents.ts` | ✅ | -| `apps/streams/src/routes/stream.ts` | `durable-session-proxy/src/routes/stream.ts` | ✅ | -| `apps/streams/src/routes/tool-results.ts` | `durable-session-proxy/src/routes/tool-results.ts` | ✅ | -| `apps/streams/src/routes/approvals.ts` | `durable-session-proxy/src/routes/approvals.ts` | ✅ | -| `apps/streams/src/routes/health.ts` | `durable-session-proxy/src/routes/health.ts` | ✅ | -| `apps/streams/src/routes/auth.ts` | `durable-session-proxy/src/routes/auth.ts` | ✅ | -| `apps/streams/src/routes/fork.ts` | `durable-session-proxy/src/routes/fork.ts` | ✅ | - -### Files DELETED ✅ - -| File | Reason | Status | -|---|---|---| -| `packages/ai-chat/` (entire package) | Replaced by `@superset/durable-session` | ✅ Removed | - -### Files DELETED (Phase A2) ✅ - -| File | Reason | Status | -|---|---|---| -| `apps/streams/src/session-registry.ts` | Replaced by proxy's built-in session management | ✅ Removed | - -### Files REWRITTEN (Phase A2) ✅ - -| File | Description | Status | -|---|---|---| -| `apps/streams/src/index.ts` | New entrypoint with Hono proxy + DurableStreamTestServer | ✅ | - -### Files REWRITTEN (Phase C2) ✅ - -| File | Description | Status | -|---|---|---| -| `apps/desktop/.../session-manager.ts` | Thin HTTP orchestrator (no StreamWatcher/Producer) | ✅ | - -### Files MODIFIED (Phase A2) ✅ - -| File | Changes | Status | -|---|---|---| -| `apps/streams/package.json` | Added: hono, @hono/node-server, @durable-streams/client, @superset/durable-session, @tanstack/db, zod | ✅ | -| `packages/durable-session/src/client.ts` | Fixed: `response.json()` return type assertion for `ForkResult` | ✅ | - -### Files MODIFIED (Phase B) ✅ - -| File | Changes | Status | -|---|---|---| -| `apps/streams/package.json` | Added: @anthropic-ai/claude-agent-sdk, @tanstack/ai | ✅ | -| `apps/streams/src/index.ts` | Added: Claude agent endpoint on STREAMS_AGENT_PORT | ✅ | - ---- - -## Implementation Order - -1. ~~**Phase A1** — Vendor `@superset/durable-session` package~~ ✅ DONE -2. ~~**Phase C1** — Remove old `packages/ai-chat`, migrate UI components~~ ✅ DONE -3. ~~**Phase A2** — Vendor proxy into `apps/streams` (copy 17 files, adjust 3 import paths)~~ ✅ DONE -4. ~~**Phase B** — Claude agent endpoint + SDK-to-AI chunk converter (2 new files)~~ ✅ DONE -5. ~~**Phase C2** — Simplify desktop session manager~~ ✅ DONE -6. ~~**Phase F** — Desktop chat UI (works with existing proxy, no DB needed)~~ ✅ DONE -7. **Phase C3** — Handle drafts (local state + typing indicators) -8. **Phase G** — Web chat UI -9. **Phase D** — Database schema + migration (persistent storage) -10. **Phase E** — API tRPC router (web session management) - ---- - -## Risks - -| Risk | Impact | Mitigation | Status | -|------|--------|------------|--------| -| `@tanstack/ai` API mismatch with vendored code | Build breaks | Vendored code uses `workspace:*` — pin to compatible published versions, fix API differences | ✅ Resolved — `DoneStreamChunk` → `RUN_FINISHED`, `LiveMode` removed | -| `@tanstack/db` unreleased aggregates | Build breaks | Rewrite collection pipelines with `groupBy + count + fn.select` workaround | ✅ Resolved — `collect`/`minStr` replaced | -| SDKMessage → AI chunk conversion errors | Broken rendering | Comprehensive unit tests with real Claude output fixtures | Pending (Phase B) | -| Dual `StreamChunk` types | Type confusion, silent mismatches at module boundaries | `sdk-to-ai-chunks.ts` imports strict `StreamChunk` from `@tanstack/ai` (union of 14 AG-UI events). `types.ts` defines a loose `{ type: string; [key: string]: unknown }` used by `protocol.ts` and `stream-writer.ts`. Works at runtime because JSON serialization is the boundary, but `protocol.ts` gets zero type safety when constructing/consuming chunks. **Fix:** delete local `StreamChunk` from `types.ts`, use `@tanstack/ai`'s everywhere, replace `as StreamChunk` casts in `protocol.ts` with typed construction (~10 call sites). | Deferred — cleanup PR | -| Claude binary path outside Electron | Agent can't start | Claude agent SDK resolves binary automatically | ✅ Resolved — CLAUDE_BINARY_PATH removed | -| Multi-turn resume state lost on restart | Context lost | In-memory map + optional file-based persistence in data dir | Pending | -| Interrupt via HTTP abort | Claude subprocess continues | Agent detects fetch abort → calls `query.interrupt()` + `abortController.abort()` | Pending | -| Proxy `workspace:*` TanStack DB deps | Import errors | Pin all `@tanstack/*` to compatible published versions across monorepo | ✅ Resolved — imports changed to `@superset/durable-session`, `DurableStream.append()` wrapped with `JSON.stringify()` | - ---- - -## API Quick Reference - -### `useDurableChat(options)` Return Type - -```typescript -interface UseDurableChatReturn { - // TanStack AI useChat-compatible - messages: UIMessage[] // All messages (reactive) - sendMessage: (content: string) => Promise - append: (message: UIMessage | { role: string; content: string }) => Promise - reload: () => Promise // Regenerate last response - stop: () => void // Stop active generations - clear: () => void // Clear local messages - isLoading: boolean // Any generation active? - error: Error | undefined - addToolResult: (result: ToolResultInput) => Promise - addToolApprovalResponse: (response: ApprovalResponseInput) => Promise - - // Durable extensions - client: DurableChatClient // Underlying client instance - collections: DurableChatCollections // All reactive collections - connectionStatus: ConnectionStatus // 'disconnected' | 'connecting' | 'connected' | 'error' - fork: (options?: ForkOptions) => Promise - registerAgents: (agents: AgentSpec[]) => Promise - unregisterAgent: (agentId: string) => Promise - connect: () => Promise - disconnect: () => void - pause: () => void - resume: () => Promise -} -``` - -### `DurableChatCollections` - -```typescript -interface DurableChatCollections { - chunks: Collection // Root — synced from stream - presence: Collection // Aggregated per-actor presence - agents: Collection // Registered webhook agents - messages: Collection // Materialized messages - toolCalls: Collection // Messages with tool-call parts - pendingApprovals: Collection // Messages with unapproved tool calls - toolResults: Collection // Messages with tool-result parts - activeGenerations: Collection // Incomplete messages - sessionMeta: Collection // Local connection state - sessionStats: Collection // Aggregate statistics -} -``` - -### `MessageRow` (from materialized messages) - -```typescript -interface MessageRow { - id: string // messageId - role: 'user' | 'assistant' | 'system' - parts: MessagePart[] // TanStack AI parts (TextPart, ToolCallPart, etc.) - actorId: string - isComplete: boolean // Has finish/done chunk been received? - createdAt: Date -} -``` - -### Proxy HTTP API - -| Method | Endpoint | Body | Response | -|---|---|---|---| -| `PUT` | `/v1/sessions/:id` | — | `{ sessionId, streamUrl }` | -| `GET` | `/v1/sessions/:id` | — | `{ sessionId, streamUrl }` | -| `DELETE` | `/v1/sessions/:id` | — | 204 | -| `POST` | `/v1/sessions/:id/messages` | `{ content, actorId?, agent? }` | `{ messageId }` | -| `POST` | `/v1/sessions/:id/stop` | `{ messageId? }` | 204 | -| `POST` | `/v1/sessions/:id/regenerate` | `{ fromMessageId, content }` | `{ success }` | -| `POST` | `/v1/sessions/:id/reset` | `{ clearPresence? }` | `{ success }` | -| `POST` | `/v1/sessions/:id/agents` | `{ agents: AgentSpec[] }` | `{ success }` | -| `GET` | `/v1/sessions/:id/agents` | — | `{ agents }` | -| `DELETE` | `/v1/sessions/:id/agents/:agentId` | — | 204 | -| `POST` | `/v1/sessions/:id/tool-results` | `{ toolCallId, output, error? }` | 204 | -| `POST` | `/v1/sessions/:id/approvals/:id` | `{ approved }` | 204 | -| `POST` | `/v1/sessions/:id/fork` | `{ atMessageId?, newSessionId? }` | `{ sessionId, offset }` | -| `POST` | `/v1/sessions/:id/login` | `{ actorId, deviceId, name? }` | `{ success }` | -| `POST` | `/v1/sessions/:id/logout` | `{ actorId, deviceId }` | `{ success }` | -| `GET` | `/v1/stream/sessions/:id` | — | SSE stream (proxied to durable stream) | -| `GET` | `/health` | — | `{ status: 'ok' }` | - ---- - -## Testing Patterns - -The vendored source includes test helpers at `packages/durable-session/tests/fixtures/test-helpers.ts`. Key patterns: - -### Mock SessionDB for Unit Tests - -```typescript -import { createMockSessionDB } from '@superset/durable-session/test-helpers' - -// Create mock with controllable collections -const { sessionDB, controllers } = createMockSessionDB('test-session') - -const client = new DurableChatClient({ - sessionId: 'test-session', - proxyUrl: 'http://localhost:4000', - sessionDB, // Inject mock — skips real stream connection -}) - -await client.connect() - -// Emit test chunks via controller -controllers.chunks.emit([{ - id: 'msg-1:0', - messageId: 'msg-1', - actorId: 'user-1', - role: 'user', - chunk: JSON.stringify({ - type: 'whole-message', - message: { id: 'msg-1', role: 'user', parts: [{ type: 'text', content: 'Hello' }] } - }), - seq: 0, - createdAt: new Date().toISOString(), -}]) -controllers.chunks.markReady() - -// Wait for live query pipeline -await new Promise(r => setTimeout(r, 40)) - -// Assert materialized messages -const messages = [...client.collections.messages.values()] -expect(messages).toHaveLength(1) -expect(messages[0].role).toBe('user') -``` - -### SDK-to-AI Chunk Conversion Tests - -Test with captured SDKMessage fixtures to verify the conversion: -```typescript -import { convertSDKMessageToSSE } from './sdk-to-ai-chunks' - -it('converts text_delta to text-delta chunk', () => { - const sdkMessage = { - type: 'stream_event', - event: { - type: 'content_block_delta', - index: 0, - delta: { type: 'text_delta', text: 'Hello' }, - }, - } - const chunks = convertSDKMessageToSSE(sdkMessage) - expect(chunks).toEqual([{ type: 'text-delta', textDelta: 'Hello' }]) -}) - -it('converts result to done chunk', () => { - const sdkMessage = { type: 'result', result: { stop_reason: 'end_turn' } } - const chunks = convertSDKMessageToSSE(sdkMessage) - expect(chunks).toEqual([{ type: 'done', finishReason: 'stop' }]) -}) -``` - ---- - -## Verification - -### Phase A1 Verification (Vendored Package) ✅ PASSED -```bash -# 1. Install deps -cd packages/durable-session && bun install -# 2. Type check vendored package — 0 errors, 0 warnings -bunx tsc --noEmit -# 3. Lint — 0 errors, 0 warnings -bun run lint:fix -``` - -### Phase A2 + B Verification (Proxy + Agent) -```bash -# 1. Start streams server -cd apps/streams && bun dev - -# 2. Health check -curl http://localhost:8080/health -# → { "status": "ok", "timestamp": "..." } - -# 3. Create session -curl -X PUT http://localhost:8080/v1/sessions/test-1 -# → { "sessionId": "test-1", "streamUrl": "/v1/stream/sessions/test-1" } - -# 4. Register Claude agent -curl -X POST http://localhost:8080/v1/sessions/test-1/agents \ - -H 'Content-Type: application/json' \ - -d '{"agents":[{"id":"claude","endpoint":"http://localhost:9090/","triggers":"user-messages"}]}' - -# 5. Send message (triggers agent) -curl -X POST http://localhost:8080/v1/sessions/test-1/messages \ - -H 'Content-Type: application/json' \ - -d '{"content":"Hello","actorId":"user-1"}' -# → { "messageId": "..." } - -# 6. Read stream (verify chunks) -curl http://localhost:8080/v1/stream/sessions/test-1 -# → SSE events with chunk data - -# 7. Stop generation -curl -X POST http://localhost:8080/v1/sessions/test-1/stop \ - -H 'Content-Type: application/json' \ - -d '{}' -``` - -### Phase C Verification (Client Integration) -1. `useDurableChat({ sessionId: "test-1", proxyUrl: "http://localhost:8080" })` → messages render -2. Interrupt: POST `/v1/sessions/test-1/stop` → generation halts, `isLoading` becomes `false` -3. Reconnection: reload page → messages replayed from stream offset (not re-fetched) -4. Multi-client: open 2 tabs → both see same messages in real-time via SSE sync -5. Presence: both tabs show in `collections.presence` diff --git a/docs/productionize-chat.md b/docs/productionize-chat.md deleted file mode 100644 index a8f11a636b2..00000000000 --- a/docs/productionize-chat.md +++ /dev/null @@ -1,474 +0,0 @@ -# Productionize Chat GUI - -Full plan to take the AI chat from prototype to production, following established deployment patterns. - -## Current State - -| Component | Location | Status | -|-----------|----------|--------| -| Streams server (Hono + Durable Streams) | `apps/streams/` | Built, `fly.toml` exists, **not in CI/CD** | -| Durable session client + `useDurableChat` | `packages/durable-session/` | Built | -| Claude agent endpoint | `apps/streams/src/claude-agent.ts` | Built | -| AI element components (39+) | `packages/ui/src/components/ai-elements/` | Built | -| Desktop chat tRPC router | `apps/desktop/src/lib/trpc/routers/ai-chat/` | Built | -| Desktop chat renderer UI | `apps/desktop/.../ChatPane/` | Built | -| Web app chat UI | `apps/web/` | **Not built** | -| Auth on streams endpoints | `apps/streams/` | **Not built** | -| Chat history DB persistence | `packages/db/src/schema/` | **Not built** | -| Streams in production CI/CD | `.github/workflows/` | **Not built** | -| Streams in preview CI/CD | `.github/workflows/` | **Not built** | -| Observability (Sentry/PostHog) | `apps/streams/` | **Not built** | - -## Established Deployment Patterns - -These are the patterns already used in the repo. Chat should follow the same conventions. - -| Concern | Pattern | Example | -|---------|---------|---------| -| Next.js apps | **Vercel** via `vercel deploy --prod --prebuilt` | `deploy-production.yml` | -| Stateful/streaming services | **Fly.io** via `fly deploy` or `superfly/fly-pr-review-apps` | `fly.toml` (ElectricSQL) | -| Database | **Neon PostgreSQL** with per-PR branch | `neondatabase/create-branch-action@v6` | -| Secrets | **GitHub Secrets** per environment (`production`, `preview`) | All workflows | -| Error tracking | **Sentry** (`@sentry/nextjs`, per-app DSN) | Web, API, Marketing, Admin, Docs, Desktop | -| Analytics | **PostHog** (`posthog-js` client, `posthog-node` server) | Web, Admin | -| CI | **GitHub Actions** — sherif, lint, test, typecheck, build | `ci.yml` | -| Preview | Full isolated env per PR (Neon branch + Electric + all Vercel apps) | `deploy-preview.yml` | -| Cleanup | Delete preview resources on PR close | `cleanup-preview.yml` | - ---- - -## Phase 1: Deploy Streams Server to Production - -**Goal:** Get `apps/streams` reliably running on Fly.io with CI/CD. - -### 1.1 Add `deploy-streams` job to `deploy-production.yml` - -The streams server already has `fly.toml` (`app = "superset-stream"`, region `iad`, port 8080). Add a deploy job that follows the same pattern as ElectricSQL. - -```yaml -# .github/workflows/deploy-production.yml -deploy-streams: - name: Deploy Streams to Fly.io - runs-on: ubuntu-latest - environment: production - - steps: - - uses: actions/checkout@v4 - - - uses: superfly/flyctl-actions/setup-flyctl@master - - - name: Deploy to Fly.io - run: flyctl deploy --config apps/streams/fly.toml --remote-only - env: - FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} - - - name: Set secrets - run: | - flyctl secrets set \ - ANTHROPIC_API_KEY="${{ secrets.ANTHROPIC_API_KEY }}" \ - STREAMS_SECRET="${{ secrets.STREAMS_SECRET }}" \ - --app superset-stream - env: - FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} -``` - -**Secrets to add to GitHub:** -- `STREAMS_SECRET` — bearer token for auth (generate a random 64-char string) - -**Env vars to add to `.env.example`:** -- `STREAMS_URL` — production URL (e.g., `https://superset-stream.fly.dev`) -- `STREAMS_SECRET` — bearer token for authenticated requests - -### 1.2 Add streams to preview deployments (`deploy-preview.yml`) - -Follow the ElectricSQL preview pattern with `superfly/fly-pr-review-apps`: - -```yaml -deploy-streams-preview: - name: Deploy Streams (Fly.io) - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v4 - - - name: Deploy Streams preview to Fly.io - uses: superfly/fly-pr-review-apps@1.3.0 - with: - name: superset-stream-pr-${{ github.event.pull_request.number }} - region: iad - org: ${{ vars.FLY_ORG }} - config: apps/streams/fly.toml - secrets: | - ANTHROPIC_API_KEY=${{ secrets.ANTHROPIC_API_KEY }} - STREAMS_SECRET=${{ secrets.STREAMS_SECRET }} - env: - FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} -``` - -Add to preview env: -``` -STREAMS_ALIAS: superset-stream-pr-${{ github.event.pull_request.number }}.fly.dev -``` - -### 1.3 Add streams cleanup to `cleanup-preview.yml` - -```yaml -- name: Destroy Streams Fly.io app - uses: superfly/fly-pr-review-apps@1.3.0 - with: - name: superset-stream-pr-${{ github.event.pull_request.number }} - org: ${{ vars.FLY_ORG }} - env: - FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} -``` - -### 1.4 Verify `fly.toml` is production-ready - -Current config is reasonable. Consider bumping resources for production: - -```toml -# apps/streams/fly.toml -[[vm]] - memory = "512mb" # bump from 256mb for production - cpu_kind = "shared" - cpus = 1 -``` - -The `auto_stop_machines = "stop"` + `auto_start_machines = true` + `min_machines_running = 1` config is correct — one machine always warm, extras auto-scale. - ---- - -## Phase 2: Auth on Streams Endpoints - -**Goal:** Prevent unauthorized access to chat sessions. - -### 2.1 Bearer token middleware - -Add a Hono middleware in `apps/streams/src/server.ts` that validates the `Authorization: Bearer ` header against `STREAMS_SECRET`. This is the simplest approach that works for both desktop and web clients. - -``` -Request flow: - Client → Bearer token in header → Streams server validates → Allow/Deny -``` - -**Scope:** All `/v1/sessions/*` routes. Exclude `/health`. - -### 2.2 Per-user session isolation (future) - -For multi-tenant production, sessions should be scoped to authenticated users. This requires: -1. Pass the user's session token (from Better Auth) to the streams server -2. Streams server validates the token against the API (`apps/api`) -3. Session IDs are prefixed/scoped per user - -This can be a follow-up — bearer token auth is sufficient for initial launch. - ---- - -## Phase 3: Web App Chat UI - -**Goal:** Add chat to `apps/web` using the same components and hooks already built. - -### 3.1 Chat route - -Create a chat page in the web app. The exact route depends on product decisions (standalone `/chat` page vs. embedded panel), but the wiring is the same: - -``` -apps/web/src/app/(app)/chat/ -├── page.tsx # Chat page -├── components/ -│ └── ChatView/ -│ ├── ChatView.tsx # Main chat container -│ └── index.ts -└── hooks/ - └── useChatSession/ - ├── useChatSession.ts # Session lifecycle - └── index.ts -``` - -### 3.2 Wire up `useDurableChat` - -The hook from `packages/durable-session` is client-agnostic. Connect it to the streams server: - -```typescript -const { messages, sendMessage, isLoading, stop } = useDurableChat({ - sessionId, - proxyUrl: env.NEXT_PUBLIC_STREAMS_URL, - autoConnect: true, - stream: { - headers: { Authorization: `Bearer ${authToken}` }, - }, -}); -``` - -### 3.3 Reuse `packages/ui/src/components/ai-elements/` - -The 39+ AI element components are already published from `packages/ui`. Import and compose: - -- `conversation.tsx` — message list container -- `message.tsx` — individual messages -- `prompt-input.tsx` — input with file attachment -- `tool-call.tsx`, `bash-tool.tsx`, etc. — tool rendering -- `reasoning.tsx` — extended thinking display -- `model-selector.tsx` — model picker - -### 3.4 Add `NEXT_PUBLIC_STREAMS_URL` to web deployment - -Add to `deploy-production.yml` `deploy-web` job and `deploy-preview.yml`: - -``` -NEXT_PUBLIC_STREAMS_URL=https://superset-stream.fly.dev # production -NEXT_PUBLIC_STREAMS_URL=https://superset-stream-pr-{N}.fly.dev # preview -``` - ---- - -## Phase 4: Chat History Persistence - -**Goal:** Persist completed chat sessions to PostgreSQL so users can browse history. - -### 4.1 Database schema - -Add tables to `packages/db/src/schema/`: - -```typescript -// chat-sessions table -export const chatSessions = pgTable("chat_sessions", { - id: text("id").primaryKey(), // durable stream session ID - userId: text("user_id").notNull().references(() => users.id), - workspaceId: text("workspace_id"), // optional, for workspace-scoped chats - title: text("title"), // auto-generated or user-edited - model: text("model"), // e.g. "claude-sonnet-4-5-20250929" - messageCount: integer("message_count").default(0), - createdAt: timestamp("created_at").defaultNow().notNull(), - updatedAt: timestamp("updated_at").defaultNow().notNull(), -}); - -// chat-messages table (optional — only if search/export needed) -export const chatMessages = pgTable("chat_messages", { - id: text("id").primaryKey(), // message ID from durable stream - sessionId: text("session_id").notNull().references(() => chatSessions.id), - role: text("role").notNull(), // "user" | "assistant" - content: text("content"), // plain text content - parts: jsonb("parts"), // full TanStack AI MessagePart[] - createdAt: timestamp("created_at").defaultNow().notNull(), -}); -``` - -### 4.2 Persist on session end - -When a chat session completes (all generations done), the streams server writes the session summary to the database. Two approaches: - -**Option A: Webhook from streams to API.** Streams server POSTs to `apps/api` on session end. API writes to DB. This keeps DB access in the API layer. - -**Option B: Direct write from streams.** Streams server connects to Neon directly. Simpler, but requires `DATABASE_URL` in streams env. - -Recommend **Option A** — follows existing separation of concerns (API owns DB writes). - -### 4.3 Chat history API - -Add a tRPC router in `apps/api` for listing/searching chat sessions: - -``` -chatSession.list → paginated list for current user -chatSession.get → single session with messages -chatSession.rename → update title -chatSession.delete → soft delete -``` - ---- - -## Phase 5: Observability - -**Goal:** Match the monitoring level of other production services. - -### 5.1 Sentry for streams server - -Add `@sentry/bun` (or `@sentry/node`) to `apps/streams`: - -```typescript -import * as Sentry from "@sentry/bun"; - -Sentry.init({ - dsn: process.env.SENTRY_DSN_STREAMS, - environment: process.env.NODE_ENV, - tracesSampleRate: 0.1, -}); -``` - -**Secret to add:** `SENTRY_DSN_STREAMS` — create a new Sentry project under `superset-sh` org. - -### 5.2 PostHog events - -Track key chat events server-side from streams: - -| Event | Properties | -|-------|------------| -| `chat_session_started` | `sessionId`, `model`, `userId` | -| `chat_message_sent` | `sessionId`, `role`, `messageLength` | -| `chat_tool_executed` | `sessionId`, `toolName`, `approved` | -| `chat_session_ended` | `sessionId`, `messageCount`, `durationMs` | -| `chat_error` | `sessionId`, `errorType`, `errorMessage` | - -Use `posthog-node` (already a dependency in the monorepo). - -### 5.3 Structured logging - -Replace `console.log` with structured JSON logs following existing patterns: - -```typescript -console.log("[streams/session] Session started:", { sessionId, model, userId }); -console.error("[streams/agent] Claude SDK error:", { sessionId, error: err.message }); -``` - ---- - -## Phase 6: Hardening - -**Goal:** Production safety for user-facing traffic. - -### 6.1 CORS - -Configure Hono CORS middleware in `apps/streams/src/server.ts`: - -```typescript -app.use("*", cors({ - origin: [ - process.env.ALLOWED_ORIGIN_WEB, // e.g. https://app.superset.sh - process.env.ALLOWED_ORIGIN_DESKTOP, // electron:// or localhost for dev - ].filter(Boolean), - credentials: true, -})); -``` - -### 6.2 Rate limiting - -Rate limit at the session/message level: - -| Endpoint | Limit | -|----------|-------| -| `PUT /v1/sessions/:id` (create) | 10/min per user | -| `POST /v1/sessions/:id/messages` (send) | 30/min per session | - -Implement with Upstash Redis (already used in the monorepo via `KV_REST_API_URL`): - -```typescript -import { Ratelimit } from "@upstash/ratelimit"; -import { Redis } from "@upstash/redis"; - -const ratelimit = new Ratelimit({ - redis: Redis.fromEnv(), - limiter: Ratelimit.slidingWindow(30, "1 m"), -}); -``` - -### 6.3 Input validation - -- Max message length (e.g., 100KB) -- Session ID format validation (UUID) -- Model allowlist validation -- Sanitize tool outputs before persisting - -### 6.4 Graceful shutdown - -Handle `SIGTERM` in the streams server for Fly.io rolling deploys: - -```typescript -process.on("SIGTERM", async () => { - console.log("[streams] SIGTERM received, draining connections..."); - // Stop accepting new sessions - // Wait for active generations to complete (with timeout) - // Close durable stream connections - process.exit(0); -}); -``` - ---- - -## Phase 7: Desktop App Updates - -**Goal:** Point desktop chat at production streams server. - -### 7.1 Config resolution - -Desktop tRPC router (`apps/desktop/src/lib/trpc/routers/ai-chat/`) already has a `getConfig()` procedure that returns `{ proxyUrl, authToken }`. Update it to: - -1. Read `STREAMS_URL` from `.env` (loaded in main process) -2. Pass the user's auth token (from Better Auth desktop flow) - -### 7.2 Desktop auto-update - -Desktop canary builds (`release-desktop-canary.yml`) will pick up chat changes automatically since chat code lives in `apps/desktop/src/renderer/`. No workflow changes needed. - ---- - -## Implementation Order - -``` -Phase 1: Deploy Streams (CI/CD) - ├── 1.1 Add deploy-streams to deploy-production.yml - ├── 1.2 Add deploy-streams-preview to deploy-preview.yml - ├── 1.3 Add cleanup to cleanup-preview.yml - └── 1.4 Verify fly.toml resources - -Phase 2: Auth - └── 2.1 Bearer token middleware on streams - -Phase 3: Web Chat UI - ├── 3.1 Chat route in apps/web - ├── 3.2 Wire useDurableChat - ├── 3.3 Compose ai-elements - └── 3.4 Add STREAMS_URL to web deploys - -Phase 4: Persistence - ├── 4.1 DB schema (chatSessions, chatMessages) - ├── 4.2 Session-end webhook → API → DB - └── 4.3 Chat history tRPC router - -Phase 5: Observability - ├── 5.1 Sentry in streams - ├── 5.2 PostHog events - └── 5.3 Structured logging - -Phase 6: Hardening - ├── 6.1 CORS - ├── 6.2 Rate limiting (Upstash) - ├── 6.3 Input validation - └── 6.4 Graceful shutdown - -Phase 7: Desktop updates - ├── 7.1 Config resolution for production URL - └── 7.2 Desktop auto-update (no changes needed) -``` - -Phases 1-2 are prerequisites. Phases 3-7 can largely be parallelized. - ---- - -## Secrets Checklist - -New secrets to add to GitHub (production + preview environments): - -| Secret | Purpose | Where | -|--------|---------|-------| -| `STREAMS_SECRET` | Bearer auth for streams API | `apps/streams`, clients | -| `SENTRY_DSN_STREAMS` | Error tracking for streams | `apps/streams` | -| `NEXT_PUBLIC_STREAMS_URL` | Streams server URL (client-side) | `apps/web` | -| `STREAMS_URL` | Streams server URL (server-side) | `apps/api` (for webhooks) | - -Existing secrets already available: -- `ANTHROPIC_API_KEY` — already in GitHub secrets -- `FLY_API_TOKEN` — already used for ElectricSQL -- `KV_REST_API_URL` / `KV_REST_API_TOKEN` — already used for rate limiting -- `POSTHOG_API_KEY` — already used across apps - ---- - -## Risks & Mitigations - -| Risk | Impact | Mitigation | -|------|--------|------------| -| Fly.io single machine for streams | Downtime during deploys | `min_machines_running = 1` + rolling deploys. Scale to 2 machines if latency matters. | -| LMDB data on Fly.io volumes | Data loss if volume fails | Durable streams are ephemeral by design. Persist completed sessions to PostgreSQL (Phase 4). | -| Anthropic API rate limits | Users blocked from chatting | Per-user rate limiting (Phase 6.2), queue overflow to retry, display user-facing error. | -| Claude Agent SDK instability | Agent crashes mid-conversation | Sentry alerts (Phase 5.1), session resumption (already built into SDK). | -| Cost runaway (Anthropic tokens) | Unexpected bills | Budget limits via `maxBudgetUsd` in agent config (already supported), admin dashboard monitoring. | diff --git a/docs/realign-streams-architecture.md b/docs/realign-streams-architecture.md deleted file mode 100644 index 47df9abff9a..00000000000 --- a/docs/realign-streams-architecture.md +++ /dev/null @@ -1,405 +0,0 @@ -# Realign Architecture — SDK Runs Locally on Desktop - -## Context - -The original design has Claude SDK running locally on the desktop (access to user's filesystem, credentials, keychain). The current implementation mistakenly put the SDK on the Fly.io server via `apps/streams/src/claude-agent.ts`, meaning the proxy both manages durable streams AND runs the agent. This breaks when deployed — the SDK on Fly.io can't access the user's local files or credentials. - -**Goal**: Move Claude Agent SDK execution from the streams server to a shared package consumed by desktop. The streams server becomes a pure durable streams layer (message persistence, SSE fan-out, auth). The desktop runs the SDK locally and writes streaming chunks back to the proxy. - -## Architecture Principle - -**Start simple, add abstraction when needed.** We're creating a shared `packages/agent` package that desktop consumes directly. When we need to support sandboxes or cloud workers in the future, we'll add abstraction layers then (follow the "three instances" heuristic). - -## Migration Summary - -1. **Create `packages/agent`** - Move agent logic from `apps/streams` to shared package -2. **Desktop consumes it** - Import `executeAgent()` and call with local context -3. **Streams becomes pure proxy** - Only handles chunk writing, SSE, session management -4. **Future**: Add abstraction layer when we have 2+ concrete use cases (desktop + sandbox, desktop + cloud worker, etc.) - -## New Architecture - -``` -packages/agent/ (shared package) -├── agent-executor.ts # Core SDK execution logic -├── sdk-to-ai-chunks.ts # SDK event → stream chunk conversion -├── session-store.ts # Session state management -├── permission-manager.ts # Permission/approval handling -└── types.ts # Shared types - -Desktop (Electron main process) -├── Imports @superset/agent -├── Calls executeAgent() with local context (cwd, env, credentials) -├── POSTs each chunk to proxy: POST /v1/sessions/:id/chunks -├── Handles permissions/approvals locally via tRPC events -└── Uses user's local credentials (buildClaudeEnv — keychain, config, env) - -Streams Proxy (Fly.io) — pure durable streams -├── Session management (create, delete, fork) -├── Chunk-writing endpoint (NEW — accepts chunks from desktop) -├── SSE fan-out to all clients -├── Auth middleware (Bearer token on /v1/*) -└── Stop generation (aborts active generation controllers) -``` - -## Chunk Flow - -``` -1. User sends message → Desktop writes to proxy (existing POST /messages) -2. Desktop runs SDK locally with user's cwd + credentials -3. For each SDK event → convert to StreamChunk → POST /v1/sessions/:id/chunks -4. Proxy writes chunk to durable stream → SSE fan-out to all clients -5. Stop → Desktop calls tRPC interrupt → aborts local AbortController → SDK stops -``` - ---- - -## Part A: Streams Server Changes (remove agent, add chunk endpoint) - -### A1. Delete agent-specific files from `apps/streams/src/` - -| File | Reason | -|------|--------| -| `claude-agent.ts` | Moves to `packages/agent` | -| `sdk-to-ai-chunks.ts` | Moves to `packages/agent` | -| `claude-session-store.ts` | Moves to `packages/agent` | -| `notification-hooks.ts` | Delete (desktop emits events directly, no HTTP webhooks) | -| `permission-manager.ts` | Moves to `packages/agent` | - -### A2. Clean up `env.ts` - -Remove `ANTHROPIC_API_KEY` and `STREAMS_AGENT_PORT` — no longer needed on server. - -### A3. Clean up `index.ts` - -Remove the agent HTTP server (`agentServer` on `STREAMS_AGENT_PORT`). Keep only the proxy server. - -### A4. Clean up `protocol.ts` - -Remove server-side agent invocation methods: -- `setupReactiveAgentTrigger()` — desktop handles trigger -- `invokeAgent()` — desktop runs SDK directly -- `streamAgentResponse()` — desktop writes chunks via HTTP -- `notifyRegisteredAgents()` — desktop handles trigger - -**Keep**: `writeChunk`, `writeUserMessage`, `writeToolResult`, `writeApprovalResponse`, agent registration methods (future web use), `stopGeneration`, session management, forking. - -### A5. Add chunk-writing route - -New `POST /v1/sessions/:id/chunks` endpoint in routes: - -```typescript -// Accepts: { messageId, actorId, role, chunk, txid? } -// Calls protocol.writeChunk() — reuses existing durable stream write logic -``` - -Also add generation lifecycle endpoints: -``` -POST /v1/sessions/:id/generations/start → { messageId } (creates messageId, tracks active) -POST /v1/sessions/:id/generations/finish → 204 (clears active generation) -``` - -### A6. Update env / CI files - -| File | Change | -|------|--------| -| `.env` | Remove `STREAMS_AGENT_PORT` | -| `.env.example` | Remove `STREAMS_AGENT_PORT` | -| `.github/workflows/deploy-production.yml` | Remove `ANTHROPIC_API_KEY` from `flyctl secrets set` | -| `.github/workflows/deploy-preview.yml` | Remove `ANTHROPIC_API_KEY` from secrets | - ---- - -## Part B: Create Shared Agent Package - -### B1. Create `packages/agent/` structure - -New package: `packages/agent/` - -``` -packages/agent/ -├── package.json -├── tsconfig.json -├── src/ -│ ├── agent-executor.ts # Main entry — executeAgent() -│ ├── sdk-to-ai-chunks.ts # From apps/streams (as-is) -│ ├── session-store.ts # From apps/streams (make storage path configurable) -│ ├── permission-manager.ts # From apps/streams (as-is) -│ ├── types.ts # Shared types -│ └── index.ts # Barrel export -└── README.md -``` - -### B2. Move files from `apps/streams/src/` to `packages/agent/src/` - -| Source File | Destination | Changes | -|-------------|-------------|---------| -| `claude-agent.ts` | `agent-executor.ts` | Strip Hono HTTP server, keep core `query()` logic | -| `sdk-to-ai-chunks.ts` | `sdk-to-ai-chunks.ts` | Move as-is (pure conversion logic) | -| `claude-session-store.ts` | `session-store.ts` | Make storage path configurable (inject via constructor) | -| `permission-manager.ts` | `permission-manager.ts` | Move as-is (in-memory promise pattern) | - -### B3. `agent-executor.ts` interface - -```typescript -export interface ExecuteAgentParams { - sessionId: string; - prompt: string; - model?: string; - cwd: string; - env: Record; - permissionMode?: "default" | "acceptEdits" | "bypassPermissions"; - - // Callbacks for environment-specific behavior - onChunk: (chunk: StreamChunk) => Promise; - onPermissionRequest?: (params: PermissionRequestParams) => Promise; - onEvent?: (event: AgentEvent) => void; - - // Optional - resume?: boolean; - allowedTools?: string[]; - disallowedTools?: string[]; - maxBudgetUsd?: number; - signal?: AbortSignal; -} - -export async function executeAgent(params: ExecuteAgentParams): Promise { - // 1. Get claudeSessionId from session store (for resume) - // 2. Build SDK options from params - // 3. Create converter with onChunk callback - // 4. Call query() with options - // 5. Handle errors, cleanup -} -``` - -### B4. `packages/agent/package.json` - -```json -{ - "name": "@superset/agent", - "version": "0.0.1", - "private": true, - "main": "./src/index.ts", - "dependencies": { - "@anthropic-ai/claude-agent-sdk": "^0.2.38", - "zod": "^4.3.5" - } -} -``` - ---- - -## Part C: Desktop Integration - -### C1. Update session manager - -`apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager.ts`: - -```typescript -import { executeAgent } from "@superset/agent"; - -class SessionManager { - private runningAgents = new Map(); - - async startAgent(sessionId: string, prompt: string) { - const session = this.sessions.get(sessionId); - if (!session) throw new Error("Session not found"); - - const controller = new AbortController(); - this.runningAgents.set(sessionId, controller); - - try { - await executeAgent({ - sessionId, - prompt, - cwd: session.cwd, - env: buildClaudeEnv(), - model: session.model, - permissionMode: session.permissionMode, - signal: controller.signal, - - // Write chunks to streams server - onChunk: async (chunk) => { - await fetch(`${this.streamsUrl}/v1/sessions/${sessionId}/chunks`, { - method: "POST", - headers: { - "Content-Type": "application/json", - "Authorization": `Bearer ${session.authToken}`, - }, - body: JSON.stringify({ - messageId: chunk.messageId, - actorId: "claude", - role: "assistant", - chunk, - }), - }); - }, - - // Handle permissions locally via tRPC - onPermissionRequest: async (params) => { - return this.requestPermission(sessionId, params); - }, - - // Emit events for renderer - onEvent: (event) => { - this.eventEmitter.emit("agent-event", { sessionId, event }); - }, - }); - } finally { - this.runningAgents.delete(sessionId); - } - } - - async stopAgent(sessionId: string) { - const controller = this.runningAgents.get(sessionId); - if (controller) { - controller.abort(); - } - } - - private async requestPermission( - sessionId: string, - params: PermissionRequestParams - ): Promise { - // Create permission request, emit event to renderer - // Wait for tRPC mutation response - // Return PermissionResult ({ behavior: "allow" | "deny", ... }) - } -} -``` - -### C2. Delete `agent-provider/` directory - -Remove `apps/desktop/src/lib/trpc/routers/ai-chat/utils/agent-provider/`: -- `claude-sdk-provider.ts` — replaced by direct `executeAgent()` calls -- `types.ts` — `AgentProvider`, `AgentRegistration` interfaces no longer needed -- `index.ts` — barrel export - -### C3. Update tRPC router - -Add mutations for local permission handling: - -```typescript -export const aiChatRouter = router({ - // ... existing routes - - approveToolUse: protectedProcedure - .input(z.object({ - sessionId: z.string(), - toolUseId: z.string(), - approved: z.boolean(), - updatedInput: z.record(z.unknown()).optional(), - })) - .mutation(({ input }) => { - // Resolve permission in session manager - sessionManager.resolvePermission(input.sessionId, input.toolUseId, { - approved: input.approved, - updatedInput: input.updatedInput, - }); - }), - - interrupt: protectedProcedure - .input(z.object({ sessionId: z.string() })) - .mutation(({ input }) => { - sessionManager.interrupt({ sessionId: input.sessionId }); - }), -}); -``` - -### C4. Add package dependency - -Add to `apps/desktop/package.json`: - -```json -{ - "dependencies": { - "@superset/agent": "workspace:*" - } -} -``` - ---- - -## Permission/Approval Flow (Local) - -Old: SDK → agent endpoint (Fly.io) → SSE to proxy → client → HTTP back → resolve -New: SDK → `onPermissionRequest` callback (local) → tRPC event → renderer UI → tRPC mutation → resolve - -1. SDK calls `onPermissionRequest()` callback on desktop -2. Callback creates a pending permission promise and emits event -3. Permission request chunk is written to proxy → renderer sees it via SSE -4. User approves/denies in renderer UI -5. Renderer calls `approveToolUse` tRPC mutation -6. Mutation resolves pending permission promise locally -7. SDK continues -8. Also write approval chunk to proxy so other clients see it - -## Stop Generation Flow - -- **From desktop**: `runner.interrupt()` → abort local AbortController → SDK stops → also calls proxy `/stop` as fallback -- **From web/mobile**: Not yet implemented (proxy `/stop` endpoint exists but has no effect on desktop agent — requires future cross-client signaling) - ---- - -## Part D: Future Extensibility (When Needed) - -When we add sandboxes or cloud workers, we'll introduce abstraction layers: - -``` -packages/agent-runtime/ # Future: abstraction layer -├── runtime/ -│ ├── agent-runtime.ts # Interface: AgentRuntime -│ └── base-runtime.ts # Base implementation -├── transports/ -│ ├── http-transport.ts # POST chunks via HTTP -│ └── ipc-transport.ts # Write via tRPC -└── environments/ - ├── desktop-context.ts # Desktop capabilities - └── sandbox-context.ts # Sandbox capabilities - -apps/desktop/ → uses DesktopAgentRuntime -apps/sandbox-worker/ → uses SandboxAgentRuntime -``` - -**Don't build this yet.** Only add it when we have concrete requirements for 2+ environments. See "three instances" heuristic in AGENTS.md. - ---- - -## Verification - -1. **Shared package builds:** - ```bash - cd packages/agent - bun run typecheck # Should compile without errors - ``` - -2. **Desktop imports and runs agent:** - ```typescript - import { executeAgent } from "@superset/agent"; - // Should have type inference for all params - ``` - -3. **Proxy works as pure durable stream layer:** - ```bash - curl http://localhost:8080/health # 200 - curl -H "Authorization: Bearer $TOKEN" -X PUT http://localhost:8080/v1/sessions/test - curl -H "Authorization: Bearer $TOKEN" -X POST http://localhost:8080/v1/sessions/test/chunks \ - -d '{"messageId":"m1","actorId":"claude","role":"assistant","chunk":{"type":"text-delta","textDelta":"Hi"}}' - ``` - -4. **Desktop runs SDK locally:** - - Start desktop + streams, open chat, send message - - Desktop console shows `[agent/executor] Running query...` - - Chunks POST to `/v1/sessions/:id/chunks` - - Chunks appear in durable stream SSE - - Response renders in UI - -5. **Permissions work locally:** - - Set permission mode to "default", trigger tool use - - `onPermissionRequest` callback fires → tRPC event emitted - - Approval UI appears in renderer - - User approves → tRPC mutation → promise resolves - - SDK continues execution - -6. **Stop works across clients:** - - Start generation → call tRPC `interrupt` mutation → `sessionManager.interrupt()` → AbortController aborts → SDK stops diff --git a/docs/streaming-performance-reliability-recommendations.md b/docs/streaming-performance-reliability-recommendations.md deleted file mode 100644 index 36e2bcee3f0..00000000000 --- a/docs/streaming-performance-reliability-recommendations.md +++ /dev/null @@ -1,189 +0,0 @@ -# Streaming Performance and Reliability Recommendations - -This is the complete recommendation list for the current desktop + streams architecture and PR review. - -## Critical correctness fixes - -1. ~~Fail `/generations/finish` when producer background errors occurred earlier in the run (not just log them).~~ DONE -2. ~~In desktop, check `res.ok` for `/generations/finish`; treat non-2xx as failure.~~ DONE -3. ~~Make `deleteSession` await producer drain/detach before returning `204`.~~ DONE -4. ~~Flush producer before reset/control events so reset never races ahead of queued chunks.~~ DONE -5. ~~Use one write path per session (prefer producer) for all session events to preserve global ordering.~~ DONE -6. ~~Clear per-message seq state after normal assistant completion to avoid unbounded `messageSeqs` growth.~~ DONE -7. ~~Add abort signal to chunk POSTs so interrupt cancels in-flight sends quickly.~~ DONE -8. Decide API semantics explicitly: `/chunks` should be `202 Accepted` (async ack) or `200` only after durable write. -9. ~~If finish fails, emit an explicit terminal error marker so UI does not show a silent done.~~ DONE -10. ~~Guard session close/reset/delete with a per-session mutex to avoid concurrent lifecycle races.~~ DONE - -## Performance improvements (start streaming + stream path) - -11. ~~Remove `/generations/start` round trip; generate `messageId` client-side.~~ DONE -12. ~~Add `/chunks/batch` endpoint to reduce per-chunk HTTP overhead.~~ DONE -13. ~~Coalesce adjacent text deltas on desktop (small time/size window).~~ DONE (ChunkBatcher 5ms linger) -14. Replace per-chunk POST with one streaming upload channel (NDJSON or WebSocket) per generation. -15. ~~Tune `IdempotentProducer` params (`lingerMs`, `maxBatchBytes`, `maxInFlight`) using load tests.~~ DONE (lingerMs=1, maxInFlight=5) -16. Reuse HTTP connections aggressively (keep-alive/pooling) for desktop to proxy writes. -17. Optionally compress large chunk payloads. -18. Optionally drop/coalesce low-value chunks (for example verbose reasoning deltas) under pressure. -19. ~~Avoid unnecessary stringify/parse hops where possible in hot paths.~~ DONE (batch endpoint skips Zod) -20. ~~Add bounded queueing in desktop to prevent memory growth when proxy/network slows.~~ DONE (ChunkBatcher maxBufferSize=2000) - -## Reliability and retry model - -21. ~~Add retry with backoff for transient chunk POST failures.~~ DONE (ChunkBatcher 3 retries, 50ms base exponential) -22. ~~Add idempotency keys on chunk writes so retries do not duplicate logical chunks.~~ DONE (IdempotentProducer provides this via autoClaim/epoch) -23. ~~Track a per-session producer unhealthy state and fail fast until recovered.~~ DONE (producerHealthy map) -24. ~~Add fallback mode: switch to synchronous `stream.append` if producer repeatedly errors.~~ DONE (appendToStream checks producerHealthy) -25. ~~Fence stale writers with a generation token returned at generation start.~~ DONE (activeGenerationIds tracking) -26. ~~Ensure seq handling survives process restarts (or move seq assignment to client message stream).~~ DONE (IdempotentProducer autoClaim handles epoch) -27. ~~Add explicit chunk ordering guarantees in API contract.~~ DONE (IdempotentProducer provides ordering; ChunkBatcher sendChain preserves order) -28. ~~Add timeout + clear error for flush/finish so runs do not hang indefinitely.~~ DONE (FLUSH_TIMEOUT_MS = 10s) - -## Protocol/API cleanups - -29. ~~Collapse `start/chunks/finish` into one generation lifecycle API with explicit generation id.~~ DONE (removed /generations/start; generation auto-registers from first chunk) -30. ~~Add an optional strict-ack endpoint (`txid`) for flows that need synced-to-stream confirmation.~~ DONE (already in use via writeUserMessage txid pattern) -31. ~~Standardize terminal semantics (`done` vs `message-end` vs `stop/error`) and document one canonical end signal.~~ DONE (documented in types.ts: `message-end` = UI signal, `/finish` = server cleanup) -32. ~~Return structured error codes from finish/flush routes for better client behavior.~~ DONE (all routes have `code` field) -33. ~~Define whether `/chunks` supports multi-writer per session; enforce if single-writer.~~ DONE (single-writer via activeGenerationIds) -34. ~~Add request/session/message IDs in all responses for tracing.~~ DONE - -## Observability - -35. Add metrics: queue depth, enqueue-to-flush latency, finish latency, dropped/retried chunks. -36. Add error counters: producer onError, finish failures, delete/reset race failures. -37. Add tracing context: `sessionId`, `messageId`, generation id, request id in logs. -38. Add SLO dashboards for time to first visible token and finish success rate. -39. Alert on rising async-ack failures (`200` or `202` accepted but later flush failed). -40. Sample payload size histograms to guide batching/coalescing thresholds. - -## Tests to add - -41. Integration test: producer error during stream causes finish to fail. -42. Integration test: delete waits for producer drain. -43. Race test: reset/delete during active streaming does not reorder/corrupt stream. -44. Load test: long responses (thousands of chunks) with bounded memory. -45. Chaos test: intermittent network failure with retries + idempotency. -46. Benchmark: current per-chunk POST vs batch vs streaming-upload modes. - -## Rollout strategy - -47. Ship behind a feature flag for producer async-ack behavior. -48. Canary compare metrics before/after (time to first token, finish failure, chunk loss). -49. Keep a runtime toggle to force synchronous append as emergency fallback. -50. Document an operational runbook for flush failures and stuck sessions. - -## Non-stream PR issue - -51. `core.hooksPath=/dev/null` is not cross-platform (fails on Windows); use OS-specific null device handling. - -## Sources - -### External references - -- Durable Sessions blog post: - - https://electric-sql.com/blog/2026/01/12/durable-sessions-for-collaborative-ai -- Transport repo (Durable Session client, proxy, materialization, transport resume): - - https://github.com/electric-sql/transport - - https://raw.githubusercontent.com/electric-sql/transport/main/packages/durable-session/src/client.ts - - https://raw.githubusercontent.com/electric-sql/transport/main/packages/durable-session/src/collections/messages.ts - - https://raw.githubusercontent.com/electric-sql/transport/main/packages/durable-session/src/materialize.ts - - https://raw.githubusercontent.com/electric-sql/transport/main/packages/durable-session-proxy/src/protocol.ts - - https://raw.githubusercontent.com/electric-sql/transport/main/packages/transport/src/client.ts - - https://raw.githubusercontent.com/electric-sql/transport/main/packages/transport/src/stream.ts -- Electric examples (txid sync confirmation pattern): - - https://github.com/electric-sql/electric - - https://raw.githubusercontent.com/electric-sql/electric/main/examples/burn/assets/src/db/mutations.ts - - https://raw.githubusercontent.com/electric-sql/electric/main/examples/burn/assets/src/db/transaction.ts -- Durable Streams producer behavior: - - https://raw.githubusercontent.com/durable-streams/durable-streams/main/packages/client/src/idempotent-producer.ts - -### Internal references (this repo) - -- Stream protocol and producer usage: - - `apps/streams/src/protocol.ts` -- Chunk/start/finish routes: - - `apps/streams/src/routes/chunks.ts` -- Desktop chunk send ordering + finish call path: - - `apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/session-manager.ts` -- Worktree hooks bypass change and tests: - - `apps/desktop/src/lib/trpc/routers/workspaces/utils/git.ts` - - `apps/desktop/src/lib/trpc/routers/workspaces/utils/git.test.ts` - -### Source mapping by recommendation numbers - -- `1-4`, `8-9`, `11`, `20`, `31`, `32`, `34`: supported by current implementation details in `apps/streams/src/protocol.ts`, `apps/streams/src/routes/chunks.ts`, and `apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/session-manager.ts`. -- `15`, `21-24`, `27-28`: informed by `IdempotentProducer` semantics in durable-streams client (`idempotent-producer.ts`) covering batching, pipelining, retries, and error surfaces. -- `30`: based on txid + wait-for-sync patterns in `packages/durable-session/src/client.ts` and Electric example `examples/burn/assets/src/db/mutations.ts`. -- `3`, `5`, `29`, `31`: informed by durable-session/proxy protocol design and materialization pipeline in `packages/durable-session-proxy/src/protocol.ts`, `packages/durable-session/src/collections/messages.ts`, and `packages/durable-session/src/materialize.ts`. -- `11-14`: reinforced by durable transport patterns for resumable streaming in `packages/transport/src/client.ts` and `packages/transport/src/stream.ts`. -- `51`: based on current repo changes and tests in `apps/desktop/src/lib/trpc/routers/workspaces/utils/git.ts` and `apps/desktop/src/lib/trpc/routers/workspaces/utils/git.test.ts`. - -Recommendations not explicitly mapped above are engineering suggestions derived from standard distributed systems and streaming architecture tradeoffs, not direct one-to-one source prescriptions. - -## Adoption classification by item - -Legend: - -- `Local`: implement directly in this repo. -- `Vendor`: copy/adapt patterns from `electric-sql/transport` into workspace packages (`@superset/durable-session` / `apps/streams`) to keep tight control. -- `Package`: use upstream package capability directly (no vendoring). - -1. Local -2. Local -3. Local -4. Local -5. Local -6. Local -7. Local -8. Local -9. Local -10. Local -11. Local -12. Local -13. Local -14. Local -15. Package -16. Local -17. Local -18. Local -19. Local -20. Local -21. Local -22. Local -23. Local -24. Local -25. Local -26. Local -27. Local -28. Local -29. Vendor -30. Package -31. Vendor -32. Local -33. Local -34. Local -35. Local -36. Local -37. Local -38. Local -39. Local -40. Local -41. Local -42. Local -43. Local -44. Local -45. Local -46. Local -47. Local -48. Local -49. Local -50. Local -51. Local - -### Notes on `Vendor` and `Package` items - -- `15 (Package)`: use `@durable-streams/client` producer tuning knobs (`lingerMs`, `maxBatchBytes`, `maxInFlight`) directly. -- `29 (Vendor)`: if you collapse lifecycle APIs, adapt from `durable-session-proxy` patterns rather than hard-switching architecture. -- `30 (Package)`: use txid + await-sync capability from durable state/client primitives. -- `31 (Vendor)`: reuse durable-session materialization/terminal handling patterns from `electric-sql/transport` where it matches Superset semantics.