diff --git a/.env.example b/.env.example index db55969a944..1d298563f36 100644 --- a/.env.example +++ b/.env.example @@ -66,10 +66,18 @@ POSTHOG_PROJECT_ID= FREESTYLE_API_KEY= # ----------------------------------------------------------------------------- -# Durable Streams -# ----------------------------------------------------------------------------- -DURABLE_STREAM_URL= -DURABLE_STREAM_AUTH_TOKEN= +# Streams (AI Chat Server) +# ----------------------------------------------------------------------------- +# Desktop app / client-facing +STREAMS_URL=http://localhost:8080 +STREAMS_SECRET= + +# Streams server internals +PORT=8080 +STREAMS_INTERNAL_PORT=8081 +STREAMS_AGENT_PORT=9090 +STREAMS_INTERNAL_URL=http://127.0.0.1:8081 +STREAMS_DATA_DIR=.data # ----------------------------------------------------------------------------- # Sentry Error Tracking diff --git a/.github/templates/preview-comment.md b/.github/templates/preview-comment.md index a1bd37c3d40..84924bc4647 100644 --- a/.github/templates/preview-comment.md +++ b/.github/templates/preview-comment.md @@ -19,6 +19,11 @@ $ELECTRIC_LINK +Fly.io Streams (Fly.io) +$STREAMS_STATUS +$STREAMS_LINK + + Vercel API (Vercel) $API_STATUS $API_LINK diff --git a/.github/workflows/cleanup-preview.yml b/.github/workflows/cleanup-preview.yml index e7b4e732e26..e05abdd6ec2 100644 --- a/.github/workflows/cleanup-preview.yml +++ b/.github/workflows/cleanup-preview.yml @@ -33,6 +33,14 @@ jobs: run: | flyctl apps destroy "superset-electric-pr-${{ github.event.pull_request.number }}" --yes + - name: Delete Streams Fly.io app + id: streams-cleanup + continue-on-error: true + env: + FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} + run: | + flyctl apps destroy "superset-stream-pr-${{ github.event.pull_request.number }}" --yes + - name: Update comment if: always() uses: thollander/actions-comment-pull-request@v3 @@ -43,6 +51,7 @@ jobs: The following preview resources have been cleaned up: - ${{ steps.neon-cleanup.outcome == 'success' && 
'✅' || '⚠️' }} Neon database branch - ${{ steps.electric-cleanup.outcome == 'success' && '✅' || '⚠️' }} Electric Fly.io app + - ${{ steps.streams-cleanup.outcome == 'success' && '✅' || '⚠️' }} Streams Fly.io app Thank you for your contribution! 🎉 comment-tag: "🚀-preview-deployment" diff --git a/.github/workflows/deploy-preview.yml b/.github/workflows/deploy-preview.yml index 8bc77c0ebeb..5f64b6c0b7f 100644 --- a/.github/workflows/deploy-preview.yml +++ b/.github/workflows/deploy-preview.yml @@ -18,6 +18,7 @@ env: ADMIN_ALIAS: admin-pr-${{ github.event.pull_request.number }}-superset.vercel.app DOCS_ALIAS: docs-pr-${{ github.event.pull_request.number }}-superset.vercel.app ELECTRIC_URL: https://superset-electric-pr-${{ github.event.pull_request.number }}.fly.dev/v1/shape + STREAMS_URL: https://superset-stream-pr-${{ github.event.pull_request.number }}.fly.dev jobs: deploy-database: @@ -120,6 +121,36 @@ jobs: name: electric-status path: electric-status.env + deploy-streams-preview: + name: Deploy Streams (Fly.io) + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Deploy Streams preview to Fly.io + uses: superfly/fly-pr-review-apps@1.3.0 + with: + name: superset-stream-pr-${{ github.event.pull_request.number }} + region: iad + org: ${{ vars.FLY_ORG }} + config: apps/streams/fly.toml + secrets: | + ANTHROPIC_API_KEY=${{ secrets.ANTHROPIC_API_KEY }} + STREAMS_SECRET=${{ secrets.STREAMS_SECRET }} + env: + FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} + - name: Save streams status + run: | + cat > streams-status.env << EOF + STREAMS_STATUS="✅" + STREAMS_LINK="View App" + EOF + - name: Upload streams status + uses: actions/upload-artifact@v4 + with: + name: streams-status + path: streams-status.env + deploy-api: name: Deploy API runs-on: ubuntu-latest @@ -649,7 +680,7 @@ jobs: name: Post Deployment Comment runs-on: ubuntu-latest if: always() - needs: [deploy-database, deploy-electric, deploy-api, deploy-web, deploy-marketing, deploy-admin, 
deploy-docs] + needs: [deploy-database, deploy-electric, deploy-streams-preview, deploy-api, deploy-web, deploy-marketing, deploy-admin, deploy-docs] permissions: contents: read pull-requests: write @@ -670,6 +701,8 @@ jobs: DATABASE_LINK="Failed to create" ELECTRIC_STATUS="❌" ELECTRIC_LINK="Failed to deploy" + STREAMS_STATUS="❌" + STREAMS_LINK="Failed to deploy" API_STATUS="❌" API_LINK="Failed to deploy" WEB_STATUS="❌" @@ -689,6 +722,10 @@ jobs: source electric-status.env fi + if [[ "${{ needs.deploy-streams-preview.result }}" == "success" ]]; then + source streams-status.env + fi + if [[ "${{ needs.deploy-api.result }}" == "success" ]]; then source api-status.env fi @@ -709,7 +746,7 @@ jobs: source docs-status.env fi - export DATABASE_STATUS DATABASE_LINK ELECTRIC_STATUS ELECTRIC_LINK API_STATUS API_LINK WEB_STATUS WEB_LINK MARKETING_STATUS MARKETING_LINK ADMIN_STATUS ADMIN_LINK DOCS_STATUS DOCS_LINK + export DATABASE_STATUS DATABASE_LINK ELECTRIC_STATUS ELECTRIC_LINK STREAMS_STATUS STREAMS_LINK API_STATUS API_LINK WEB_STATUS WEB_LINK MARKETING_STATUS MARKETING_LINK ADMIN_STATUS ADMIN_LINK DOCS_STATUS DOCS_LINK envsubst < .github/templates/preview-comment.md > final-comment.md - name: Post final deployment comment diff --git a/.github/workflows/deploy-production.yml b/.github/workflows/deploy-production.yml index 3e2616fed5c..c47de7b25eb 100644 --- a/.github/workflows/deploy-production.yml +++ b/.github/workflows/deploy-production.yml @@ -398,6 +398,28 @@ jobs: --env STRIPE_PRO_YEARLY_PRICE_ID=$STRIPE_PRO_YEARLY_PRICE_ID \ --env SLACK_BILLING_WEBHOOK_URL=$SLACK_BILLING_WEBHOOK_URL + deploy-streams: + name: Deploy Streams to Fly.io + runs-on: ubuntu-latest + environment: production + + steps: + - uses: actions/checkout@v4 + - uses: superfly/flyctl-actions/setup-flyctl@master + - name: Stage secrets + env: + FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} + run: | + flyctl secrets set \ + ANTHROPIC_API_KEY="${{ secrets.ANTHROPIC_API_KEY }}" \ + STREAMS_SECRET="${{ 
secrets.STREAMS_SECRET }}" \ + --app superset-stream \ + --stage + - name: Deploy to Fly.io + env: + FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} + run: flyctl deploy --config apps/streams/fly.toml --remote-only + deploy-docs: name: Deploy Docs to Vercel runs-on: ubuntu-latest diff --git a/.superset/setup.sh b/.superset/setup.sh index 65e98ee60e4..8ee4ac394f6 100755 --- a/.superset/setup.sh +++ b/.superset/setup.sh @@ -299,6 +299,16 @@ step_start_electric() { return 0 } +step_setup_streams() { + echo "🔄 Setting up Streams secret..." + + STREAMS_SECRET=$(openssl rand -hex 32) + export STREAMS_SECRET + + success "Streams secret generated" + return 0 +} + step_write_env() { echo "📝 Writing .env file..." @@ -341,6 +351,10 @@ step_write_env() { if [ -n "${ELECTRIC_SECRET:-}" ]; then echo "ELECTRIC_SECRET=$ELECTRIC_SECRET" fi + + echo "" + echo "# Workspace Streams (AI Chat Server)" + echo "STREAMS_SECRET=$STREAMS_SECRET" } >> .env success "Workspace .env written" @@ -376,7 +390,12 @@ main() { step_failed "Start Electric SQL" fi - # Step 6: Write .env file + # Step 6: Setup Streams secret + if ! step_setup_streams; then + step_failed "Setup Streams secret" + fi + + # Step 7: Write .env file if ! 
step_write_env; then step_failed "Write .env file" fi diff --git a/apps/desktop/src/lib/trpc/routers/ai-chat/index.ts b/apps/desktop/src/lib/trpc/routers/ai-chat/index.ts index 2d6acb51a9d..b1b65ddd410 100644 --- a/apps/desktop/src/lib/trpc/routers/ai-chat/index.ts +++ b/apps/desktop/src/lib/trpc/routers/ai-chat/index.ts @@ -2,6 +2,7 @@ import { existsSync, readdirSync, readFileSync } from "node:fs"; import { homedir } from "node:os"; import { join } from "node:path"; import { observable } from "@trpc/server/observable"; +import { env } from "main/env.main"; import { z } from "zod"; import { publicProcedure, router } from "../.."; import { @@ -60,11 +61,8 @@ function scanCustomCommands(cwd: string): CommandEntry[] { export const createAiChatRouter = () => { return router({ getConfig: publicProcedure.query(() => ({ - proxyUrl: process.env.DURABLE_STREAM_URL || "http://localhost:8080", - authToken: - process.env.DURABLE_STREAM_AUTH_TOKEN || - process.env.DURABLE_STREAM_TOKEN || - null, + proxyUrl: env.STREAMS_URL, + authToken: env.STREAMS_SECRET, })), getSlashCommands: publicProcedure diff --git a/apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/session-manager.ts b/apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/session-manager.ts index 34779449fc0..87f8aa5ad06 100644 --- a/apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/session-manager.ts +++ b/apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/session-manager.ts @@ -1,10 +1,10 @@ import { EventEmitter } from "node:events"; +import { env } from "main/env.main"; import type { AgentProvider } from "../agent-provider"; import type { SessionStore } from "../session-store"; -const PROXY_URL = process.env.DURABLE_STREAM_URL || "http://localhost:8080"; -const DURABLE_STREAM_AUTH_TOKEN = - process.env.DURABLE_STREAM_AUTH_TOKEN || process.env.DURABLE_STREAM_TOKEN; +const PROXY_URL = env.STREAMS_URL; +const STREAMS_SECRET = env.STREAMS_SECRET; /** * Set, clear, or 
skip a field on a body template. @@ -26,13 +26,10 @@ function applyBodyField( } function buildProxyHeaders(): Record { - const headers: Record = { + return { "Content-Type": "application/json", + Authorization: `Bearer ${STREAMS_SECRET}`, }; - if (DURABLE_STREAM_AUTH_TOKEN) { - headers.Authorization = `Bearer ${DURABLE_STREAM_AUTH_TOKEN}`; - } - return headers; } export interface SessionStartEvent { diff --git a/apps/desktop/src/main/env.main.ts b/apps/desktop/src/main/env.main.ts index 56228b05716..7522d607594 100644 --- a/apps/desktop/src/main/env.main.ts +++ b/apps/desktop/src/main/env.main.ts @@ -21,6 +21,8 @@ export const env = createEnv({ NEXT_PUBLIC_POSTHOG_KEY: z.string().optional(), NEXT_PUBLIC_POSTHOG_HOST: z.string().default("https://us.i.posthog.com"), SENTRY_DSN_DESKTOP: z.string().optional(), + STREAMS_URL: z.url(), + STREAMS_SECRET: z.string().min(1), }, runtimeEnv: { @@ -35,6 +37,8 @@ export const env = createEnv({ NEXT_PUBLIC_POSTHOG_KEY: process.env.NEXT_PUBLIC_POSTHOG_KEY, NEXT_PUBLIC_POSTHOG_HOST: process.env.NEXT_PUBLIC_POSTHOG_HOST, SENTRY_DSN_DESKTOP: process.env.SENTRY_DSN_DESKTOP, + STREAMS_URL: process.env.STREAMS_URL, + STREAMS_SECRET: process.env.STREAMS_SECRET, }, emptyStringAsUndefined: true, // Only allow skipping validation in development (never in production) diff --git a/apps/streams/package.json b/apps/streams/package.json index 290e8ae1ceb..a620458a08b 100644 --- a/apps/streams/package.json +++ b/apps/streams/package.json @@ -18,6 +18,7 @@ "@durable-streams/server": "^0.2.0", "@hono/node-server": "^1.13.0", "@superset/durable-session": "workspace:*", + "@t3-oss/env-core": "^0.13.8", "@tanstack/ai": "^0.3.0", "@tanstack/db": "0.5.22", "hono": "^4.4.0", diff --git a/apps/streams/src/claude-agent.ts b/apps/streams/src/claude-agent.ts index 2e287ce45d3..743a1c1550d 100644 --- a/apps/streams/src/claude-agent.ts +++ b/apps/streams/src/claude-agent.ts @@ -102,8 +102,6 @@ app.post("/", async (c) => { const queryEnv: Record = { ...baseEnv 
}; queryEnv.CLAUDE_CODE_ENTRYPOINT = "sdk-ts"; - const binaryPath = process.env.CLAUDE_BINARY_PATH; - const hooks = notification ? buildNotificationHooks({ notification }) : undefined; @@ -164,7 +162,7 @@ app.post("/", async (c) => { options: { ...(claudeSessionId && { resume: claudeSessionId }), ...(cwd && { cwd }), - model: model ?? process.env.CLAUDE_MODEL ?? DEFAULT_MODEL, + model: model ?? DEFAULT_MODEL, maxTurns: MAX_AGENT_TURNS, includePartialMessages: true, permissionMode: resolvedPermissionMode as @@ -173,7 +171,6 @@ app.post("/", async (c) => { | "bypassPermissions", settingSources: ["user", "project", "local"], systemPrompt: { type: "preset" as const, preset: "claude_code" as const }, - ...(binaryPath && { pathToClaudeCodeExecutable: binaryPath }), env: queryEnv, abortController, ...(hooks && { hooks }), @@ -361,7 +358,6 @@ app.get("/health", (c) => { return c.json({ status: "ok", agent: "claude", - hasBinary: !!process.env.CLAUDE_BINARY_PATH, activeSessions: getActiveSessionCount(), }); }); diff --git a/apps/streams/src/claude-session-store.ts b/apps/streams/src/claude-session-store.ts index 12cff6f9077..2ae071c529c 100644 --- a/apps/streams/src/claude-session-store.ts +++ b/apps/streams/src/claude-session-store.ts @@ -1,6 +1,6 @@ import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; -import { homedir } from "node:os"; import { join } from "node:path"; +import { env } from "./env"; const SESSION_MAX_SIZE = 1000; const SESSION_TTL_MS = 24 * 60 * 60 * 1000; @@ -12,9 +12,7 @@ interface SessionEntry { const claudeSessions = new Map(); -const SESSIONS_DIR = - process.env.DURABLE_STREAMS_DATA_DIR ?? 
- join(homedir(), ".superset", "chat-streams"); +const SESSIONS_DIR = env.STREAMS_DATA_DIR; const SESSIONS_FILE = join(SESSIONS_DIR, "claude-sessions.json"); function loadPersistedSessions(): void { diff --git a/apps/streams/src/env.ts b/apps/streams/src/env.ts new file mode 100644 index 00000000000..c3d02164f42 --- /dev/null +++ b/apps/streams/src/env.ts @@ -0,0 +1,19 @@ +import { createEnv } from "@t3-oss/env-core"; +import { z } from "zod"; + +export const env = createEnv({ + server: { + PORT: z.coerce.number(), + STREAMS_INTERNAL_PORT: z.coerce.number(), + STREAMS_AGENT_PORT: z.coerce.number(), + STREAMS_INTERNAL_URL: z.string().url(), + STREAMS_DATA_DIR: z.string().min(1), + STREAMS_SECRET: z.string().min(1), + ANTHROPIC_API_KEY: z.string().min(1), + }, + clientPrefix: "PUBLIC_", + client: {}, + runtimeEnv: process.env, + emptyStringAsUndefined: true, + skipValidation: !!process.env.SKIP_ENV_VALIDATION, +}); diff --git a/apps/streams/src/index.ts b/apps/streams/src/index.ts index b1facc5cfff..a456880da14 100644 --- a/apps/streams/src/index.ts +++ b/apps/streams/src/index.ts @@ -1,44 +1,36 @@ import { existsSync, mkdirSync } from "node:fs"; -import { homedir } from "node:os"; -import { join } from "node:path"; import { DurableStreamTestServer } from "@durable-streams/server"; import { serve } from "@hono/node-server"; import { claudeAgentApp } from "./claude-agent"; +import { env } from "./env"; import { createServer } from "./server"; -const PORT = parseInt(process.env.PORT ?? "8080", 10); -const INTERNAL_PORT = parseInt(process.env.INTERNAL_PORT ?? "8082", 10); -const AGENT_PORT = parseInt(process.env.CLAUDE_AGENT_PORT ?? "9090", 10); -const DURABLE_STREAMS_URL = - process.env.DURABLE_STREAMS_URL ?? `http://127.0.0.1:${INTERNAL_PORT}`; - -const DATA_DIR = - process.env.DURABLE_STREAMS_DATA_DIR ?? 
- join(homedir(), ".superset", "chat-streams"); - -if (!existsSync(DATA_DIR)) { - mkdirSync(DATA_DIR, { recursive: true }); +if (!existsSync(env.STREAMS_DATA_DIR)) { + mkdirSync(env.STREAMS_DATA_DIR, { recursive: true }); } const durableStreamServer = new DurableStreamTestServer({ - port: INTERNAL_PORT, - dataDir: DATA_DIR, + port: env.STREAMS_INTERNAL_PORT, + dataDir: env.STREAMS_DATA_DIR, }); await durableStreamServer.start(); -console.log(`[streams] Durable stream server on port ${INTERNAL_PORT}`); +console.log( + `[streams] Durable stream server on port ${env.STREAMS_INTERNAL_PORT}`, +); const { app } = createServer({ - baseUrl: DURABLE_STREAMS_URL, + baseUrl: env.STREAMS_INTERNAL_URL, cors: true, logging: true, + authToken: env.STREAMS_SECRET, }); -const proxyServer = serve({ fetch: app.fetch, port: PORT }, (info) => { +const proxyServer = serve({ fetch: app.fetch, port: env.PORT }, (info) => { console.log(`[streams] Proxy running on http://localhost:${info.port}`); }); const agentServer = serve( - { fetch: claudeAgentApp.fetch, port: AGENT_PORT }, + { fetch: claudeAgentApp.fetch, port: env.STREAMS_AGENT_PORT }, (info) => { console.log( `[streams] Claude agent endpoint on http://localhost:${info.port}`, @@ -46,9 +38,11 @@ const agentServer = serve( }, ); -process.on("SIGINT", async () => { - proxyServer.close(); - agentServer.close(); - await durableStreamServer.stop(); - process.exit(0); -}); +for (const signal of ["SIGINT", "SIGTERM"]) { + process.on(signal, async () => { + proxyServer.close(); + agentServer.close(); + await durableStreamServer.stop(); + process.exit(0); + }); +} diff --git a/apps/streams/src/server.ts b/apps/streams/src/server.ts index 9f49ae2f1d2..6b8fcd25fa2 100644 --- a/apps/streams/src/server.ts +++ b/apps/streams/src/server.ts @@ -29,6 +29,8 @@ export interface AIDBProxyServerOptions extends AIDBProtocolOptions { logging?: boolean; /** Custom CORS origins */ corsOrigins?: string | string[]; + /** If set, require Bearer token on /v1/* 
routes */ + authToken?: string; } export function createServer(options: AIDBProxyServerOptions) { @@ -64,9 +66,21 @@ export function createServer(options: AIDBProxyServerOptions) { app.use("*", logger()); } - // Health routes + // Health routes (no auth) app.route("/health", createHealthRoutes()); + // Auth middleware on /v1/* routes + if (options.authToken) { + const expectedHeader = `Bearer ${options.authToken}`; + app.use("/v1/*", async (c, next) => { + const authorization = c.req.header("Authorization"); + if (authorization !== expectedHeader) { + return c.json({ error: "Unauthorized" }, 401); + } + return next(); + }); + } + // API v1 routes const v1 = new Hono(); diff --git a/bun.lock b/bun.lock index cde2a392f1b..d5532ec666d 100644 --- a/bun.lock +++ b/bun.lock @@ -488,6 +488,7 @@ "@durable-streams/server": "^0.2.0", "@hono/node-server": "^1.13.0", "@superset/durable-session": "workspace:*", + "@t3-oss/env-core": "^0.13.8", "@tanstack/ai": "^0.3.0", "@tanstack/db": "0.5.22", "hono": "^4.4.0", diff --git a/docs/ai-chat-plan.md b/docs/ai-chat-plan.md index df808d59d3f..00b6175b806 100644 --- a/docs/ai-chat-plan.md +++ b/docs/ai-chat-plan.md @@ -352,38 +352,36 @@ import type { SessionDB, MessageRow, ModelMessage } from '@superset/durable-sess #### New entrypoint: `apps/streams/src/index.ts` -Based on vendored `dev.ts` pattern, combined with existing DurableStreamTestServer: +Based on vendored `dev.ts` pattern, combined with existing DurableStreamTestServer. All env vars are validated via `env.ts` (required, no defaults). ```typescript -import { serve } from '@hono/node-server' import { DurableStreamTestServer } from '@durable-streams/server' +import { serve } from '@hono/node-server' +import { claudeAgentApp } from './claude-agent' +import { env } from './env' import { createServer } from './server' -const PORT = parseInt(process.env.PORT ?? '8080', 10) -const INTERNAL_PORT = parseInt(process.env.INTERNAL_PORT ?? 
'8081', 10) -const DURABLE_STREAMS_URL = process.env.DURABLE_STREAMS_URL ?? `http://127.0.0.1:${INTERNAL_PORT}` - -// Start internal durable stream server -const durableStreamServer = new DurableStreamTestServer({ port: INTERNAL_PORT }) +const durableStreamServer = new DurableStreamTestServer({ + port: env.STREAMS_INTERNAL_PORT, + dataDir: env.STREAMS_DATA_DIR, +}) await durableStreamServer.start() -console.log(`[streams] Durable stream server on port ${INTERNAL_PORT}`) -// Start proxy server const { app } = createServer({ - baseUrl: DURABLE_STREAMS_URL, + baseUrl: env.STREAMS_INTERNAL_URL, cors: true, logging: true, + authToken: env.STREAMS_SECRET, }) -serve({ fetch: app.fetch, port: PORT }, (info) => { - console.log(`[streams] Proxy running on http://localhost:${info.port}`) -}) +serve({ fetch: app.fetch, port: env.PORT }) +serve({ fetch: claudeAgentApp.fetch, port: env.STREAMS_AGENT_PORT }) -// Graceful shutdown -process.on('SIGINT', async () => { - await durableStreamServer.stop() - process.exit(0) -}) +for (const signal of ['SIGINT', 'SIGTERM']) { + process.on(signal, async () => { + /* graceful shutdown */ + }) +} ``` #### Key Protocol Internals (`protocol.ts`, ~917 lines) @@ -450,12 +448,9 @@ app.post('/', async (c) => { prompt, options: { ...(claudeSessionId && { resume: claudeSessionId }), - model: process.env.CLAUDE_MODEL ?? 'claude-sonnet-4-5-20250929', + model: 'claude-sonnet-4-5-20250929', maxTurns: 25, - allowedTools: ['computer', 'bash', 'edit', 'browser'], }, - executable: process.env.CLAUDE_BINARY_PATH, - env: buildClaudeEnv(), // Auth env vars from desktop abortSignal: c.req.raw.signal, }) @@ -512,10 +507,6 @@ await protocol.registerAgent(sessionId, { **Session state:** Maintains `Map` for multi-turn resume. -**Binary path:** From `CLAUDE_BINARY_PATH` env var (set by desktop app when starting streams process). - -**Auth:** From `CLAUDE_AUTH_*` env vars forwarded from desktop process via `buildClaudeEnv()`. 
- **Abort handling:** When proxy calls `stopGeneration()`, it aborts the fetch to this endpoint. The agent detects the abort via `c.req.raw.signal` and the `query()` call is interrupted. ### B2. Create `apps/streams/src/sdk-to-ai-chunks.ts` @@ -637,7 +628,7 @@ import { useDurableChat, ChatInput, PresenceBar } from '@superset/durable-sessio **New session manager** — thin HTTP orchestrator: ```typescript -const PROXY_URL = process.env.DURABLE_STREAM_URL ?? 'http://localhost:8080' +const PROXY_URL = env.STREAMS_URL // validated in main/env.main.ts — required, no default export class ClaudeSessionManager extends EventEmitter { private activeSessions = new Map() @@ -797,10 +788,10 @@ apps/desktop/src/renderer/.../ChatPane/ ### Environment variables -| Variable | Default | Description | -|----------|---------|-------------| -| `DURABLE_STREAM_URL` | `http://localhost:8080` | Proxy URL exposed via tRPC `getConfig` query | -| `DURABLE_STREAM_AUTH_TOKEN` | — | Optional Bearer token for authenticated proxy | +| Variable | Description | +|----------|-------------| +| `STREAMS_URL` | Proxy URL exposed via tRPC `getConfig` query | +| `STREAMS_SECRET` | Bearer token for authenticated proxy | --- @@ -843,20 +834,21 @@ Web uses same `useDurableChat` hook pointing at deployed proxy URL. ## Environment Variables +All env vars are required — the streams server throws at startup if any are missing. + ```bash -# Desktop -DURABLE_STREAM_URL=http://localhost:8080 # Proxy URL (local dev) -CLAUDE_BINARY_PATH=... # Set by desktop when starting streams -CLAUDE_AUTH_TOKEN=... # Forwarded from desktop auth - -# Streams server -PORT=8080 # Proxy port -DURABLE_STREAMS_URL=http://127.0.0.1:8081 # Internal durable stream server -CLAUDE_BINARY_PATH=...
# Path to claude binary -CLAUDE_MODEL=claude-sonnet-4-5-20250929 # Default model - -# Production -DURABLE_STREAM_URL=https://stream.superset.sh +# Streams server (apps/streams) +PORT=8080 # Proxy port (set by Fly.io in production) +STREAMS_INTERNAL_PORT=8081 # Internal durable stream server port +STREAMS_AGENT_PORT=9090 # Claude agent endpoint port +STREAMS_INTERNAL_URL=http://127.0.0.1:8081 # Internal durable stream server URL +STREAMS_DATA_DIR=/data # Data directory for LMDB + session persistence +STREAMS_SECRET= # Bearer token for /v1/* route auth +ANTHROPIC_API_KEY=sk-ant-... # Claude API key + +# Desktop (apps/desktop) — validated in env.main.ts, required +STREAMS_URL=http://localhost:8080 # Proxy URL exposed via tRPC getConfig +STREAMS_SECRET= # Bearer token for authenticated proxy ``` --- @@ -957,7 +949,7 @@ All files below are created and typechecking. Compatibility fix: `DurableStream. | File | Changes | Status | |---|---|---| | `apps/streams/package.json` | Added: @anthropic-ai/claude-agent-sdk, @tanstack/ai | ✅ | -| `apps/streams/src/index.ts` | Added: Claude agent endpoint on CLAUDE_AGENT_PORT (default 9090) | ✅ | +| `apps/streams/src/index.ts` | Added: Claude agent endpoint on STREAMS_AGENT_PORT | ✅ | --- @@ -984,7 +976,7 @@ All files below are created and typechecking. Compatibility fix: `DurableStream. | `@tanstack/db` unreleased aggregates | Build breaks | Rewrite collection pipelines with `groupBy + count + fn.select` workaround | ✅ Resolved — `collect`/`minStr` replaced | | SDKMessage → AI chunk conversion errors | Broken rendering | Comprehensive unit tests with real Claude output fixtures | Pending (Phase B) | | Dual `StreamChunk` types | Type confusion, silent mismatches at module boundaries | `sdk-to-ai-chunks.ts` imports strict `StreamChunk` from `@tanstack/ai` (union of 14 AG-UI events). `types.ts` defines a loose `{ type: string; [key: string]: unknown }` used by `protocol.ts` and `stream-writer.ts`. 
Works at runtime because JSON serialization is the boundary, but `protocol.ts` gets zero type safety when constructing/consuming chunks. **Fix:** delete local `StreamChunk` from `types.ts`, use `@tanstack/ai`'s everywhere, replace `as StreamChunk` casts in `protocol.ts` with typed construction (~10 call sites). | Deferred — cleanup PR | -| Claude binary path outside Electron | Agent can't start | `CLAUDE_BINARY_PATH` env var set by desktop at streams startup | Pending | +| Claude binary path outside Electron | Agent can't start | Claude agent SDK resolves binary automatically | ✅ Resolved — CLAUDE_BINARY_PATH removed | | Multi-turn resume state lost on restart | Context lost | In-memory map + optional file-based persistence in data dir | Pending | | Interrupt via HTTP abort | Claude subprocess continues | Agent detects fetch abort → calls `query.interrupt()` + `abortController.abort()` | Pending | | Proxy `workspace:*` TanStack DB deps | Import errors | Pin all `@tanstack/*` to compatible published versions across monorepo | ✅ Resolved — imports changed to `@superset/durable-session`, `DurableStream.append()` wrapped with `JSON.stringify()` | diff --git a/docs/productionize-chat.md b/docs/productionize-chat.md new file mode 100644 index 00000000000..a8f11a636b2 --- /dev/null +++ b/docs/productionize-chat.md @@ -0,0 +1,474 @@ +# Productionize Chat GUI + +Full plan to take the AI chat from prototype to production, following established deployment patterns. 
+ +## Current State + +| Component | Location | Status | +|-----------|----------|--------| +| Streams server (Hono + Durable Streams) | `apps/streams/` | Built, `fly.toml` exists, **not in CI/CD** | +| Durable session client + `useDurableChat` | `packages/durable-session/` | Built | +| Claude agent endpoint | `apps/streams/src/claude-agent.ts` | Built | +| AI element components (39+) | `packages/ui/src/components/ai-elements/` | Built | +| Desktop chat tRPC router | `apps/desktop/src/lib/trpc/routers/ai-chat/` | Built | +| Desktop chat renderer UI | `apps/desktop/.../ChatPane/` | Built | +| Web app chat UI | `apps/web/` | **Not built** | +| Auth on streams endpoints | `apps/streams/` | **Not built** | +| Chat history DB persistence | `packages/db/src/schema/` | **Not built** | +| Streams in production CI/CD | `.github/workflows/` | **Not built** | +| Streams in preview CI/CD | `.github/workflows/` | **Not built** | +| Observability (Sentry/PostHog) | `apps/streams/` | **Not built** | + +## Established Deployment Patterns + +These are the patterns already used in the repo. Chat should follow the same conventions. 
+ +| Concern | Pattern | Example | +|---------|---------|---------| +| Next.js apps | **Vercel** via `vercel deploy --prod --prebuilt` | `deploy-production.yml` | +| Stateful/streaming services | **Fly.io** via `fly deploy` or `superfly/fly-pr-review-apps` | `fly.toml` (ElectricSQL) | +| Database | **Neon PostgreSQL** with per-PR branch | `neondatabase/create-branch-action@v6` | +| Secrets | **GitHub Secrets** per environment (`production`, `preview`) | All workflows | +| Error tracking | **Sentry** (`@sentry/nextjs`, per-app DSN) | Web, API, Marketing, Admin, Docs, Desktop | +| Analytics | **PostHog** (`posthog-js` client, `posthog-node` server) | Web, Admin | +| CI | **GitHub Actions** — sherif, lint, test, typecheck, build | `ci.yml` | +| Preview | Full isolated env per PR (Neon branch + Electric + all Vercel apps) | `deploy-preview.yml` | +| Cleanup | Delete preview resources on PR close | `cleanup-preview.yml` | + +--- + +## Phase 1: Deploy Streams Server to Production + +**Goal:** Get `apps/streams` reliably running on Fly.io with CI/CD. + +### 1.1 Add `deploy-streams` job to `deploy-production.yml` + +The streams server already has `fly.toml` (`app = "superset-stream"`, region `iad`, port 8080). Add a deploy job that follows the same pattern as ElectricSQL. 
+ +```yaml +# .github/workflows/deploy-production.yml +deploy-streams: + name: Deploy Streams to Fly.io + runs-on: ubuntu-latest + environment: production + + steps: + - uses: actions/checkout@v4 + + - uses: superfly/flyctl-actions/setup-flyctl@master + + - name: Stage secrets + run: | + flyctl secrets set \ + ANTHROPIC_API_KEY="${{ secrets.ANTHROPIC_API_KEY }}" \ + STREAMS_SECRET="${{ secrets.STREAMS_SECRET }}" \ + --app superset-stream --stage + env: + FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} + + - name: Deploy to Fly.io + run: flyctl deploy --config apps/streams/fly.toml --remote-only + env: + FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} +``` + +**Secrets to add to GitHub:** +- `STREAMS_SECRET` — bearer token for auth (generate a random 64-char string) + +**Env vars to add to `.env.example`:** +- `STREAMS_URL` — production URL (e.g., `https://superset-stream.fly.dev`) +- `STREAMS_SECRET` — bearer token for authenticated requests + +### 1.2 Add streams to preview deployments (`deploy-preview.yml`) + +Follow the ElectricSQL preview pattern with `superfly/fly-pr-review-apps`: + +```yaml +deploy-streams-preview: + name: Deploy Streams (Fly.io) + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Deploy Streams preview to Fly.io + uses: superfly/fly-pr-review-apps@1.3.0 + with: + name: superset-stream-pr-${{ github.event.pull_request.number }} + region: iad + org: ${{ vars.FLY_ORG }} + config: apps/streams/fly.toml + secrets: | + ANTHROPIC_API_KEY=${{ secrets.ANTHROPIC_API_KEY }} + STREAMS_SECRET=${{ secrets.STREAMS_SECRET }} + env: + FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} +``` + +Add to preview env: +``` +STREAMS_URL: https://superset-stream-pr-${{ github.event.pull_request.number }}.fly.dev +``` + +### 1.3 Add streams cleanup to `cleanup-preview.yml` + +```yaml +- name: Destroy Streams Fly.io app + uses: superfly/fly-pr-review-apps@1.3.0 + with: + name: superset-stream-pr-${{ github.event.pull_request.number }} + org: ${{ vars.FLY_ORG }} + env: +
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }} +``` + +### 1.4 Verify `fly.toml` is production-ready + +Current config is reasonable. Consider bumping resources for production: + +```toml +# apps/streams/fly.toml +[[vm]] + memory = "512mb" # bump from 256mb for production + cpu_kind = "shared" + cpus = 1 +``` + +The `auto_stop_machines = "stop"` + `auto_start_machines = true` + `min_machines_running = 1` config is correct — one machine always warm, extras auto-scale. + +--- + +## Phase 2: Auth on Streams Endpoints + +**Goal:** Prevent unauthorized access to chat sessions. + +### 2.1 Bearer token middleware + +Add a Hono middleware in `apps/streams/src/server.ts` that validates the `Authorization: Bearer <token>` header against `STREAMS_SECRET`. This is the simplest approach that works for both desktop and web clients. + +``` +Request flow: + Client → Bearer token in header → Streams server validates → Allow/Deny +``` + +**Scope:** All `/v1/*` routes (including `/v1/sessions/*`). Exclude `/health`. + +### 2.2 Per-user session isolation (future) + +For multi-tenant production, sessions should be scoped to authenticated users. This requires: +1. Pass the user's session token (from Better Auth) to the streams server +2. Streams server validates the token against the API (`apps/api`) +3. Session IDs are prefixed/scoped per user + +This can be a follow-up — bearer token auth is sufficient for initial launch. + +--- + +## Phase 3: Web App Chat UI + +**Goal:** Add chat to `apps/web` using the same components and hooks already built. + +### 3.1 Chat route + +Create a chat page in the web app. The exact route depends on product decisions (standalone `/chat` page vs.
embedded panel), but the wiring is the same: + +``` +apps/web/src/app/(app)/chat/ +├── page.tsx # Chat page +├── components/ +│ └── ChatView/ +│ ├── ChatView.tsx # Main chat container +│ └── index.ts +└── hooks/ + └── useChatSession/ + ├── useChatSession.ts # Session lifecycle + └── index.ts +``` + +### 3.2 Wire up `useDurableChat` + +The hook from `packages/durable-session` is client-agnostic. Connect it to the streams server: + +```typescript +const { messages, sendMessage, isLoading, stop } = useDurableChat({ + sessionId, + proxyUrl: env.NEXT_PUBLIC_STREAMS_URL, + autoConnect: true, + stream: { + headers: { Authorization: `Bearer ${authToken}` }, + }, +}); +``` + +### 3.3 Reuse `packages/ui/src/components/ai-elements/` + +The 39+ AI element components are already published from `packages/ui`. Import and compose: + +- `conversation.tsx` — message list container +- `message.tsx` — individual messages +- `prompt-input.tsx` — input with file attachment +- `tool-call.tsx`, `bash-tool.tsx`, etc. — tool rendering +- `reasoning.tsx` — extended thinking display +- `model-selector.tsx` — model picker + +### 3.4 Add `NEXT_PUBLIC_STREAMS_URL` to web deployment + +Add to `deploy-production.yml` `deploy-web` job and `deploy-preview.yml`: + +``` +NEXT_PUBLIC_STREAMS_URL=https://superset-stream.fly.dev # production +NEXT_PUBLIC_STREAMS_URL=https://superset-stream-pr-{N}.fly.dev # preview +``` + +--- + +## Phase 4: Chat History Persistence + +**Goal:** Persist completed chat sessions to PostgreSQL so users can browse history. + +### 4.1 Database schema + +Add tables to `packages/db/src/schema/`: + +```typescript +// chat-sessions table +export const chatSessions = pgTable("chat_sessions", { + id: text("id").primaryKey(), // durable stream session ID + userId: text("user_id").notNull().references(() => users.id), + workspaceId: text("workspace_id"), // optional, for workspace-scoped chats + title: text("title"), // auto-generated or user-edited + model: text("model"), // e.g. 
"claude-sonnet-4-5-20250929" + messageCount: integer("message_count").default(0), + createdAt: timestamp("created_at").defaultNow().notNull(), + updatedAt: timestamp("updated_at").defaultNow().notNull(), +}); + +// chat-messages table (optional — only if search/export needed) +export const chatMessages = pgTable("chat_messages", { + id: text("id").primaryKey(), // message ID from durable stream + sessionId: text("session_id").notNull().references(() => chatSessions.id), + role: text("role").notNull(), // "user" | "assistant" + content: text("content"), // plain text content + parts: jsonb("parts"), // full TanStack AI MessagePart[] + createdAt: timestamp("created_at").defaultNow().notNull(), +}); +``` + +### 4.2 Persist on session end + +When a chat session completes (all generations done), the streams server writes the session summary to the database. Two approaches: + +**Option A: Webhook from streams to API.** Streams server POSTs to `apps/api` on session end. API writes to DB. This keeps DB access in the API layer. + +**Option B: Direct write from streams.** Streams server connects to Neon directly. Simpler, but requires `DATABASE_URL` in streams env. + +Recommend **Option A** — follows existing separation of concerns (API owns DB writes). + +### 4.3 Chat history API + +Add a tRPC router in `apps/api` for listing/searching chat sessions: + +``` +chatSession.list → paginated list for current user +chatSession.get → single session with messages +chatSession.rename → update title +chatSession.delete → soft delete +``` + +--- + +## Phase 5: Observability + +**Goal:** Match the monitoring level of other production services. 
+ +### 5.1 Sentry for streams server + +Add `@sentry/bun` (or `@sentry/node`) to `apps/streams`: + +```typescript +import * as Sentry from "@sentry/bun"; + +Sentry.init({ + dsn: process.env.SENTRY_DSN_STREAMS, + environment: process.env.NODE_ENV, + tracesSampleRate: 0.1, +}); +``` + +**Secret to add:** `SENTRY_DSN_STREAMS` — create a new Sentry project under `superset-sh` org. + +### 5.2 PostHog events + +Track key chat events server-side from streams: + +| Event | Properties | +|-------|------------| +| `chat_session_started` | `sessionId`, `model`, `userId` | +| `chat_message_sent` | `sessionId`, `role`, `messageLength` | +| `chat_tool_executed` | `sessionId`, `toolName`, `approved` | +| `chat_session_ended` | `sessionId`, `messageCount`, `durationMs` | +| `chat_error` | `sessionId`, `errorType`, `errorMessage` | + +Use `posthog-node` (already a dependency in the monorepo). + +### 5.3 Structured logging + +Replace ad-hoc `console.log` calls with prefixed, structured log lines — a `[scope/component]` tag, a short message, and a context object — following existing patterns: + +```typescript +console.log("[streams/session] Session started:", { sessionId, model, userId }); +console.error("[streams/agent] Claude SDK error:", { sessionId, error: err.message }); +``` + +--- + +## Phase 6: Hardening + +**Goal:** Production safety for user-facing traffic. + +### 6.1 CORS + +Configure Hono CORS middleware in `apps/streams/src/server.ts`: + +```typescript +app.use("*", cors({ + origin: [ + process.env.ALLOWED_ORIGIN_WEB, // e.g. 
https://app.superset.sh + process.env.ALLOWED_ORIGIN_DESKTOP, // electron:// or localhost for dev + ].filter(Boolean), + credentials: true, +})); +``` + +### 6.2 Rate limiting + +Rate limit at the session/message level: + +| Endpoint | Limit | +|----------|-------| +| `PUT /v1/sessions/:id` (create) | 10/min per user | +| `POST /v1/sessions/:id/messages` (send) | 30/min per session | + +Implement with Upstash Redis (already used in the monorepo via `KV_REST_API_URL`): + +```typescript +import { Ratelimit } from "@upstash/ratelimit"; +import { Redis } from "@upstash/redis"; + +const ratelimit = new Ratelimit({ + redis: Redis.fromEnv(), + limiter: Ratelimit.slidingWindow(30, "1 m"), +}); +``` + +### 6.3 Input validation + +- Max message length (e.g., 100KB) +- Session ID format validation (UUID) +- Model allowlist validation +- Sanitize tool outputs before persisting + +### 6.4 Graceful shutdown + +Handle `SIGTERM` in the streams server for Fly.io rolling deploys: + +```typescript +process.on("SIGTERM", async () => { + console.log("[streams] SIGTERM received, draining connections..."); + // Stop accepting new sessions + // Wait for active generations to complete (with timeout) + // Close durable stream connections + process.exit(0); +}); +``` + +--- + +## Phase 7: Desktop App Updates + +**Goal:** Point desktop chat at production streams server. + +### 7.1 Config resolution + +Desktop tRPC router (`apps/desktop/src/lib/trpc/routers/ai-chat/`) already has a `getConfig()` procedure that returns `{ proxyUrl, authToken }`. Update it to: + +1. Read `STREAMS_URL` from `.env` (loaded in main process) +2. Pass the user's auth token (from Better Auth desktop flow) + +### 7.2 Desktop auto-update + +Desktop canary builds (`release-desktop-canary.yml`) will pick up chat changes automatically since chat code lives in `apps/desktop/src/renderer/`. No workflow changes needed. 
+ +--- + +## Implementation Order + +``` +Phase 1: Deploy Streams (CI/CD) + ├── 1.1 Add deploy-streams to deploy-production.yml + ├── 1.2 Add deploy-streams-preview to deploy-preview.yml + ├── 1.3 Add cleanup to cleanup-preview.yml + └── 1.4 Verify fly.toml resources + +Phase 2: Auth + └── 2.1 Bearer token middleware on streams + +Phase 3: Web Chat UI + ├── 3.1 Chat route in apps/web + ├── 3.2 Wire useDurableChat + ├── 3.3 Compose ai-elements + └── 3.4 Add NEXT_PUBLIC_STREAMS_URL to web deploys + +Phase 4: Persistence + ├── 4.1 DB schema (chatSessions, chatMessages) + ├── 4.2 Session-end webhook → API → DB + └── 4.3 Chat history tRPC router + +Phase 5: Observability + ├── 5.1 Sentry in streams + ├── 5.2 PostHog events + └── 5.3 Structured logging + +Phase 6: Hardening + ├── 6.1 CORS + ├── 6.2 Rate limiting (Upstash) + ├── 6.3 Input validation + └── 6.4 Graceful shutdown + +Phase 7: Desktop updates + ├── 7.1 Config resolution for production URL + └── 7.2 Desktop auto-update (no changes needed) +``` + +Phases 1-2 are prerequisites. Phases 3-7 can largely be parallelized. 
+ +--- + +## Secrets Checklist + +New secrets to add to GitHub (production + preview environments): + +| Secret | Purpose | Where | +|--------|---------|-------| +| `STREAMS_SECRET` | Bearer auth for streams API | `apps/streams`, clients | +| `SENTRY_DSN_STREAMS` | Error tracking for streams | `apps/streams` | +| `NEXT_PUBLIC_STREAMS_URL` | Streams server URL (client-side) | `apps/web` | +| `STREAMS_URL` | Streams server URL (server-side) | `apps/api` (for webhooks) | + +Existing secrets already available: +- `ANTHROPIC_API_KEY` — already in GitHub secrets +- `FLY_API_TOKEN` — already used for ElectricSQL +- `KV_REST_API_URL` / `KV_REST_API_TOKEN` — already used for rate limiting +- `POSTHOG_API_KEY` — already used across apps + +--- + +## Risks & Mitigations + +| Risk | Impact | Mitigation | +|------|--------|------------| +| Fly.io single machine for streams | Downtime during deploys | `min_machines_running = 1` + rolling deploys. Scale to 2 machines if latency matters. | +| LMDB data on Fly.io volumes | Data loss if volume fails | Durable streams are ephemeral by design. Persist completed sessions to PostgreSQL (Phase 4). | +| Anthropic API rate limits | Users blocked from chatting | Per-user rate limiting (Phase 6.2), queue overflow to retry, display user-facing error. | +| Claude Agent SDK instability | Agent crashes mid-conversation | Sentry alerts (Phase 5.1), session resumption (already built into SDK). | +| Cost runaway (Anthropic tokens) | Unexpected bills | Budget limits via `maxBudgetUsd` in agent config (already supported), admin dashboard monitoring. | diff --git a/docs/realign-streams-architecture.md b/docs/realign-streams-architecture.md new file mode 100644 index 00000000000..08887fc762f --- /dev/null +++ b/docs/realign-streams-architecture.md @@ -0,0 +1,209 @@ +# Realign Architecture — SDK Runs Locally on Desktop + +## Context + +The original design has Claude SDK running locally on the desktop (access to user's filesystem, credentials, keychain). 
The current implementation mistakenly put the SDK on the Fly.io server via `apps/streams/src/claude-agent.ts`, meaning the proxy both manages durable streams AND runs the agent. This breaks when deployed — the SDK on Fly.io can't access the user's local files or credentials. + +**Goal**: Move Claude Agent SDK execution from the streams server to the desktop Electron main process. The streams server becomes a pure durable streams layer (message persistence, SSE fan-out, auth). The desktop runs the SDK locally and writes streaming chunks back to the proxy. + +## New Architecture + +``` +Desktop (Electron main process) +├── Runs query() from @anthropic-ai/claude-agent-sdk locally +├── Converts SDK messages → chunks (sdk-to-ai-chunks.ts) +├── POSTs each chunk to proxy: POST /v1/sessions/:id/chunks +├── Handles permissions/approvals locally via tRPC events +└── Uses user's local credentials (buildClaudeEnv — keychain, config, env) + +Streams Proxy (Fly.io) — pure durable streams +├── Session management (create, delete, fork) +├── Chunk-writing endpoint (NEW — accepts chunks from desktop) +├── SSE fan-out to all clients +├── Auth middleware (Bearer token on /v1/*) +└── Stop generation (writes stop chunk, desktop detects via SSE) +``` + +## Chunk Flow + +``` +1. User sends message → Desktop writes to proxy (existing POST /messages) +2. Desktop runs SDK locally with user's cwd + credentials +3. For each SDK event → convert to StreamChunk → POST /v1/sessions/:id/chunks +4. Proxy writes chunk to durable stream → SSE fan-out to all clients +5. Stop from any client → proxy writes stop chunk → Desktop detects via SSE → aborts SDK +``` + +--- + +## Part A: Streams Server Changes (remove agent, add chunk endpoint) + +### A1. 
Delete agent-specific files from `apps/streams/src/` + +| File | Reason | +|------|--------| +| `claude-agent.ts` | Moves to desktop | +| `sdk-to-ai-chunks.ts` | Moves to desktop | +| `claude-session-store.ts` | Moves to desktop | +| `notification-hooks.ts` | Moves to desktop (simplified — direct events, no HTTP webhooks) | +| `permission-manager.ts` | Moves to desktop | + +### A2. Clean up `env.ts` + +Remove `ANTHROPIC_API_KEY` and `STREAMS_AGENT_PORT` — no longer needed on server. + +### A3. Clean up `index.ts` + +Remove the agent HTTP server (`agentServer` on `STREAMS_AGENT_PORT`). Keep only the proxy server. + +### A4. Clean up `protocol.ts` + +Remove server-side agent invocation methods: +- `setupReactiveAgentTrigger()` — desktop handles trigger +- `invokeAgent()` — desktop runs SDK directly +- `streamAgentResponse()` — desktop writes chunks via HTTP +- `notifyRegisteredAgents()` — desktop handles trigger + +**Keep**: `writeChunk`, `writeUserMessage`, `writeToolResult`, `writeApprovalResponse`, agent registration methods (future web use), `stopGeneration`, session management, forking. + +### A5. Add chunk-writing route + +New `POST /v1/sessions/:id/chunks` endpoint in routes: + +```typescript +// Accepts: { messageId, actorId, role, chunk, txid? } +// Calls protocol.writeChunk() — reuses existing durable stream write logic +``` + +Also add generation lifecycle endpoints: +``` +POST /v1/sessions/:id/generations/start → { messageId } (creates messageId, tracks active) +POST /v1/sessions/:id/generations/finish → 204 (clears active generation) +``` + +### A6. 
Update env / CI files + +| File | Change | +|------|--------| +| `.env` | Remove `STREAMS_AGENT_PORT` | +| `.env.example` | Remove `STREAMS_AGENT_PORT` | +| `.github/workflows/deploy-production.yml` | Remove `ANTHROPIC_API_KEY` from `flyctl secrets set` | +| `.github/workflows/deploy-preview.yml` | Remove `ANTHROPIC_API_KEY` from secrets | + +--- + +## Part B: Desktop Changes (add local SDK execution) + +### B1. Create `claude-runner/` module + +New directory: `apps/desktop/src/lib/trpc/routers/ai-chat/utils/claude-runner/` + +| File | Source | Notes | +|------|--------|-------| +| `claude-runner.ts` | New | Core module — calls `query()`, converts chunks, POSTs to proxy | +| `sdk-to-ai-chunks.ts` | From `apps/streams/` | Move as-is — pure conversion logic | +| `claude-session-store.ts` | From `apps/streams/` | Change data dir to `app.getPath('userData')` | +| `permission-manager.ts` | From `apps/streams/` | Move as-is — same in-memory promise pattern | +| `index.ts` | New | Barrel export | + +`notification-hooks.ts` is NOT moved — desktop doesn't need HTTP webhooks to notify itself. SDK hooks can emit events directly via the session manager's EventEmitter. + +### B2. `ClaudeRunner` implementation + +```typescript +export class ClaudeRunner { + async runQuery({ sessionId, cwd, prompt, model, permissionMode, onEvent }): Promise<void> { + // 1. POST /v1/sessions/:id/generations/start → { messageId } + // 2. buildClaudeEnv() for user's local credentials + // 3. Resolve claudeSessionId from local store (for resume) + // 4. query({ prompt, options: { resume, cwd, model, env, canUseTool, ... } }) + // 5. For each SDK message → convert → POST /v1/sessions/:id/chunks + // 6. 
POST /v1/sessions/:id/generations/finish + } + + interrupt(): void { + // Abort SDK via AbortController + } +} +``` + +Key differences from server-side `claude-agent.ts`: +- No Hono HTTP server — called directly from session manager +- Uses `buildClaudeEnv()` for user's local credentials (already exists) +- Writes chunks via HTTP to proxy (not internal `protocol.writeChunk`) +- Permissions handled locally via tRPC events + +### B3. Update session manager + +`session-manager.ts` changes: +- Remove `AgentProvider` dependency and agent registration calls +- Own a `ClaudeRunner` instance +- `ensureSessionReady()` — only creates session on proxy (`PUT /v1/sessions/:id`), no agent registration +- New method to trigger SDK when user sends message +- Listen for stop chunks from durable stream SSE to abort local runner + +### B4. Delete `agent-provider/` directory + +The entire `agent-provider/` directory is no longer needed: +- `claude-sdk-provider.ts` — replaced by `ClaudeRunner` +- `types.ts` — `AgentProvider`, `AgentRegistration` interfaces no longer used +- `index.ts` — barrel export + +### B5. Update tRPC router + +Add mutations for local permission handling: +- `approveToolUse` — resolves pending permission promise locally +- `answerToolQuestion` — resolves pending question promise locally + +The existing `streamEvents` subscription already handles emitting events to the renderer — `ClaudeRunner` emits approval-requested events through it. + +### B6. Add SDK dependency + +Add `@anthropic-ai/claude-agent-sdk` to `apps/desktop/package.json`. + +--- + +## Permission/Approval Flow (Local) + +Old: SDK → agent endpoint (Fly.io) → SSE to proxy → client → HTTP back → resolve +New: SDK → `canUseTool` callback (local) → tRPC event → renderer UI → tRPC mutation → resolve + +1. SDK calls `canUseTool()` callback on desktop +2. Callback emits tRPC event: `{ type: "approval_requested", toolName, input, toolUseId }` +3. Renderer receives via existing `streamEvents` subscription +4. 
User approves/denies +5. Renderer calls `approveToolUse` tRPC mutation +6. Mutation resolves local `permissionManager` promise +7. SDK continues +8. Also write approval chunk to proxy so other clients see it + +## Stop Generation Flow + +- **From desktop**: `runner.interrupt()` → abort SDK → write stop chunk to proxy +- **From web/mobile**: `POST /v1/sessions/:id/stop` → proxy writes stop chunk → desktop detects via SSE → `runner.interrupt()` + +--- + +## Verification + +1. **Proxy works as pure durable stream layer:** + ```bash + curl http://localhost:8080/health # 200 + curl -H "Authorization: Bearer $TOKEN" -X PUT http://localhost:8080/v1/sessions/test + curl -H "Authorization: Bearer $TOKEN" -X POST http://localhost:8080/v1/sessions/test/chunks \ + -d '{"messageId":"m1","actorId":"claude","role":"assistant","chunk":{"type":"text-delta","textDelta":"Hi"}}' + ``` + +2. **Desktop runs SDK locally:** + - Start desktop + streams, open chat, send message + - Desktop console shows `[claude/runner] Running query...` + - Chunks appear in durable stream SSE + - Response renders in UI + +3. **Permissions work locally:** + - Set permission mode to "default", trigger tool use + - Approval UI appears, approve → SDK continues + +4. **Stop works across clients:** + - Start generation → stop from desktop → stops + - Start generation → stop via API → desktop detects stop chunk → stops