From 87114cfc8fc5a7e14b24745ef634b31ebf782842 Mon Sep 17 00:00:00 2001 From: Kiet Ho Date: Thu, 5 Feb 2026 15:40:50 -0800 Subject: [PATCH 1/4] feat(streams): vendor durable-session-proxy into apps/streams MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Vendor the Hono-based proxy server from electric-sql/transport into apps/streams/src/ replacing the old custom HTTP proxy + session registry. The proxy handles session management, agent webhook invocation, message routing, stream fan-out, tool results, approvals, and session forking via the STATE-PROTOCOL on top of Durable Streams. Key changes: - Copy 17 files from durable-session-proxy with @electric-sql → @superset imports - Replace index.ts entrypoint with Hono proxy + DurableStreamTestServer - Delete old session-registry.ts - Add hono, @hono/node-server, @durable-streams/client, @tanstack/db, zod deps - Fix DurableStream.append() compatibility (JSON.stringify for ChangeEvent objects) - Fix ForkResult type assertion in vendored client --- apps/streams/package.json | 9 +- apps/streams/src/handlers/index.ts | 7 + apps/streams/src/handlers/invoke-agent.ts | 101 +++ apps/streams/src/handlers/send-message.ts | 57 ++ apps/streams/src/handlers/stream-writer.ts | 91 +++ apps/streams/src/index.ts | 293 +------- apps/streams/src/protocol.ts | 760 +++++++++++++++++++++ apps/streams/src/routes/agents.ts | 35 + apps/streams/src/routes/approvals.ts | 43 ++ apps/streams/src/routes/auth.ts | 123 ++++ apps/streams/src/routes/fork.ts | 37 + apps/streams/src/routes/health.ts | 28 + apps/streams/src/routes/index.ts | 9 + apps/streams/src/routes/messages.ts | 73 ++ apps/streams/src/routes/sessions.ts | 107 +++ apps/streams/src/routes/stream.ts | 116 ++++ apps/streams/src/routes/tool-results.ts | 45 ++ apps/streams/src/server.ts | 123 ++++ apps/streams/src/session-registry.ts | 99 --- apps/streams/src/types.ts | 203 ++++++ bun.lock | 35 +- docs/ai-chat-plan.md | 72 +- packages/durable-session/src/client.ts | 2 +- 23 files changed, 2035 insertions(+), 433 deletions(-) create mode 100644 apps/streams/src/handlers/index.ts create mode 100644 apps/streams/src/handlers/invoke-agent.ts create mode 100644 apps/streams/src/handlers/send-message.ts create mode 100644 apps/streams/src/handlers/stream-writer.ts create mode 100644 apps/streams/src/protocol.ts create mode 100644 apps/streams/src/routes/agents.ts create mode 100644 apps/streams/src/routes/approvals.ts create mode 100644 apps/streams/src/routes/auth.ts create mode 100644 apps/streams/src/routes/fork.ts create mode 100644 apps/streams/src/routes/health.ts create mode 100644 apps/streams/src/routes/index.ts create mode 100644 apps/streams/src/routes/messages.ts create mode 100644 apps/streams/src/routes/sessions.ts create mode 100644 apps/streams/src/routes/stream.ts create mode 100644 apps/streams/src/routes/tool-results.ts create mode 100644 apps/streams/src/server.ts delete mode 100644 apps/streams/src/session-registry.ts create mode 100644 apps/streams/src/types.ts diff --git a/apps/streams/package.json b/apps/streams/package.json index 688b5859e72..5758514b14e 100644 --- a/apps/streams/package.json +++ b/apps/streams/package.json @@ -13,11 +13,16 @@ "test:conformance": "npx @durable-streams/server-conformance-tests --run http://localhost:8080" }, "dependencies": { - "@durable-streams/server": "^0.2.0" + "@durable-streams/client": "^0.2.0", + "@durable-streams/server": "^0.2.0", + "@superset/durable-session": "workspace:*", + "@tanstack/db": "^0.5.22", + "hono": "^4.4.0", + "zod": "^4.1.12" }, "devDependencies": { - "@durable-streams/client": "^0.2.0", "@durable-streams/server-conformance-tests": "^0.2.0", + "@hono/node-server": "^1.13.0", "@superset/typescript": "workspace:*", "@types/node": "^24.9.1", "fast-check": "^4.5.3", diff --git a/apps/streams/src/handlers/index.ts b/apps/streams/src/handlers/index.ts new file mode 100644 index 00000000000..b3e6a7284d4 --- /dev/null +++ b/apps/streams/src/handlers/index.ts @@ -0,0 +1,7 @@ +export { + handleInvokeAgent, + handleRegisterAgents, + handleUnregisterAgent, +} from "./invoke-agent"; +export { handleSendMessage } from "./send-message"; +export { createStreamWriter, StreamWriter } from "./stream-writer"; diff --git a/apps/streams/src/handlers/invoke-agent.ts b/apps/streams/src/handlers/invoke-agent.ts new file mode 100644 index 00000000000..c4c47bc1acf --- /dev/null +++ b/apps/streams/src/handlers/invoke-agent.ts @@ -0,0 +1,101 @@ +import type { Context } from "hono"; +import { z } from "zod"; +import type { AIDBSessionProtocol } from "../protocol"; +import { type AgentSpec, agentSpecSchema } from "../types"; + +const invokeAgentRequestSchema = z.object({ + agent: agentSpecSchema, + messages: z.array( + z.object({ + role: z.string(), + content: z.string(), + }), + ), +}); + +type InvokeAgentRequest = z.infer; + +export async function handleInvokeAgent( + c: Context, + protocol: AIDBSessionProtocol, +): Promise { + const sessionId = c.req.param("sessionId"); + + let body: InvokeAgentRequest; + try { + const rawBody = await c.req.json(); + body = invokeAgentRequestSchema.parse(rawBody); + } catch (error) { + return c.json( + { error: "Invalid request body", details: (error as Error).message }, + 400, + ); + } + + try { + const stream = await protocol.getOrCreateSession(sessionId); + await protocol.invokeAgent(stream, sessionId, body.agent, body.messages); + return c.json({ success: true }, 200); + } catch (error) { + console.error("Failed to invoke agent:", error); + return c.json( + { error: "Failed to invoke agent", details: (error as Error).message }, + 500, + ); + } +} + +export async function handleRegisterAgents( + c: Context, + protocol: AIDBSessionProtocol, +): Promise { + const sessionId = c.req.param("sessionId"); + + let agents: AgentSpec[]; + try { + const rawBody = await c.req.json(); + const parsed = z + .object({ agents: z.array(agentSpecSchema) }) + .parse(rawBody); + agents = parsed.agents; + } catch (error) { + return c.json( + { error: "Invalid request body", details: (error as Error).message }, + 400, + ); + } + + try { + await protocol.getOrCreateSession(sessionId); + await protocol.registerAgents(sessionId, agents); + return c.json({ success: true }, 200); + } catch (error) { + console.error("Failed to register agents:", error); + return c.json( + { error: "Failed to register agents", details: (error as Error).message }, + 500, + ); + } +} + +export async function handleUnregisterAgent( + c: Context, + protocol: AIDBSessionProtocol, +): Promise { + const sessionId = c.req.param("sessionId"); + const agentId = c.req.param("agentId"); + + try { + await protocol.unregisterAgent(sessionId, agentId); + return new Response(null, { status: 204 }); + } catch (error) { + console.error("Failed to unregister agent:", error); + return c.json( + { + error: "Failed to unregister agent", + details: (error as Error).message, + }, + 500, + ); + } +} diff --git a/apps/streams/src/handlers/send-message.ts b/apps/streams/src/handlers/send-message.ts new file mode 100644 index 00000000000..1f9208b4ffe --- /dev/null +++ b/apps/streams/src/handlers/send-message.ts @@ -0,0 +1,57 @@ +import type { Context } from "hono"; +import type { AIDBSessionProtocol } from "../protocol"; +import { + type SendMessageRequest, + type SendMessageResponse, + sendMessageRequestSchema, +} from "../types"; + +export async function handleSendMessage( + c: Context, + protocol: AIDBSessionProtocol, +): Promise { + const sessionId = c.req.param("sessionId"); + + let body: SendMessageRequest; + try { + const rawBody = await c.req.json(); + body = sendMessageRequestSchema.parse(rawBody); + } catch (error) { + return c.json( + { error: "Invalid request body", details: (error as Error).message }, + 400, + ); + } + + const actorId = + body.actorId ?? c.req.header("X-Actor-Id") ?? crypto.randomUUID(); + + const messageId = body.messageId ?? crypto.randomUUID(); + + try { + const stream = await protocol.getOrCreateSession(sessionId); + + await protocol.writeUserMessage( + stream, + sessionId, + messageId, + actorId, + body.content, + body.txid, + ); + + if (body.agent) { + const messageHistory = await protocol.getMessageHistory(sessionId); + protocol.invokeAgent(stream, sessionId, body.agent, messageHistory); + } + + const response: SendMessageResponse = { messageId }; + return c.json(response, 200); + } catch (error) { + console.error("Failed to send message:", error); + return c.json( + { error: "Failed to send message", details: (error as Error).message }, + 500, + ); + } +} diff --git a/apps/streams/src/handlers/stream-writer.ts b/apps/streams/src/handlers/stream-writer.ts new file mode 100644 index 00000000000..19e25d9010e --- /dev/null +++ b/apps/streams/src/handlers/stream-writer.ts @@ -0,0 +1,91 @@ +import type { DurableStream } from "@durable-streams/client"; +import type { AIDBSessionProtocol } from "../protocol"; +import type { StreamChunk } from "../types"; + +type MessageRole = "user" | "assistant" | "system"; + +export class StreamWriter { + constructor( + private readonly protocol: AIDBSessionProtocol, + private readonly stream: DurableStream, + private readonly sessionId: string, + ) {} + + async writeUserMessage( + messageId: string, + actorId: string, + content: string, + txid?: string, + ): Promise { + await this.protocol.writeUserMessage( + this.stream, + this.sessionId, + messageId, + actorId, + content, + txid, + ); + } + + async writeChunk( + messageId: string, + actorId: string, + role: MessageRole, + chunk: StreamChunk, + txid?: string, + ): Promise { + await this.protocol.writeChunk( + this.stream, + this.sessionId, + messageId, + actorId, + role, + chunk, + txid, + ); + } + + async writeToolResult( + messageId: string, + actorId: string, + toolCallId: string, + output: unknown, + error: string | null, + txid?: string, + ): Promise { + await this.protocol.writeToolResult( + this.stream, + this.sessionId, + messageId, + actorId, + toolCallId, + output, + error, + txid, + ); + } + + async writeApprovalResponse( + actorId: string, + approvalId: string, + approved: boolean, + txid?: string, + ): Promise { + await this.protocol.writeApprovalResponse( + this.stream, + this.sessionId, + actorId, + approvalId, + approved, + txid, + ); + } +} + +export function createStreamWriter( + protocol: AIDBSessionProtocol, + stream: DurableStream, + sessionId: string, +): StreamWriter { + return new StreamWriter(protocol, stream, sessionId); +} diff --git a/apps/streams/src/index.ts b/apps/streams/src/index.ts index 5085746b209..a51b7cf12f5 100644 --- a/apps/streams/src/index.ts +++ b/apps/streams/src/index.ts @@ -1,279 +1,32 @@ -/** - * Durable Streams Server with Session Registry - * - * Combines the official @durable-streams/server with a session registry API. - * The durable streams server runs on an internal port, and this Hono server - * proxies requests to it while handling /sessions routes directly. - */ - -import { - createServer as createHttpServer, - request as httpRequest, -} from "node:http"; import { DurableStreamTestServer } from "@durable-streams/server"; -import { SessionRegistry } from "./session-registry.js"; - -const DEFAULT_DATA_DIR = "./data"; -const DEFAULT_PORT = 8080; -const INTERNAL_PORT_OFFSET = 1; -const MAX_PORT = 65_535; -const MAX_BODY_BYTES = 1_000_000; - -const dataDir = process.env.DATA_DIR || DEFAULT_DATA_DIR; -const portEnv = process.env.PORT; -const parsedPort = portEnv ? Number.parseInt(portEnv, 10) : DEFAULT_PORT; - -if (!Number.isInteger(parsedPort) || parsedPort < 1 || parsedPort > MAX_PORT) { - console.error(`[streams] Invalid PORT: ${portEnv ?? "unset"}`); - process.exit(1); -} - -const port = parsedPort; -const internalPort = port + INTERNAL_PORT_OFFSET; // Durable streams runs on internal port - -if (internalPort > MAX_PORT) { - console.error(`[streams] Internal port ${internalPort} exceeds ${MAX_PORT}.`); - process.exit(1); -} - -const registry = new SessionRegistry(dataDir); - -// Start the durable streams server on internal port -const durableServer = new DurableStreamTestServer({ - port: internalPort, - host: "127.0.0.1", - dataDir, -}); - -// Create main HTTP server that routes requests -const server = createHttpServer(async (req, res) => { - const url = new URL(req.url || "/", `http://${req.headers.host}`); - - // CORS headers - res.setHeader("Access-Control-Allow-Origin", "*"); - res.setHeader( - "Access-Control-Allow-Methods", - "GET, POST, PUT, DELETE, HEAD, OPTIONS", - ); - res.setHeader( - "Access-Control-Allow-Headers", - "Content-Type, Producer-Id, Producer-Epoch, Producer-Seq, Authorization", - ); - res.setHeader("Access-Control-Expose-Headers", "*"); - - if (req.method === "OPTIONS") { - res.writeHead(204); - res.end(); - return; - } - - // Handle session registry routes - if (url.pathname === "/sessions" || url.pathname === "/sessions/") { - if (req.method === "GET") { - // List all sessions - const sessions = registry.list(); - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify(sessions)); - return; - } - - if (req.method === "POST") { - // Register a new session - try { - const body = await readBody({ req, maxBytes: MAX_BODY_BYTES }); - const { sessionId, title, createdBy } = JSON.parse(body); - - if (!sessionId || !title) { - res.writeHead(400, { "Content-Type": "application/json" }); - res.end( - JSON.stringify({ error: "sessionId and title are required" }), - ); - return; - } - - const session = registry.register({ sessionId, title, createdBy }); - res.writeHead(201, { "Content-Type": "application/json" }); - res.end(JSON.stringify(session)); - return; - } catch (error) { - if (error instanceof PayloadTooLargeError) { - res.writeHead(413, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ error: "Payload too large" })); - return; - } - console.error("[sessions] Failed to parse request body:", error); - res.writeHead(400, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ error: "Invalid JSON body" })); - return; - } - } - - res.writeHead(405, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ error: "Method not allowed" })); - return; - } - - // Handle GET /sessions/:id - const sessionMatch = url.pathname.match(/^\/sessions\/([^/]+)$/); - if (sessionMatch?.[1]) { - const sessionId = sessionMatch[1]; +import { serve } from "@hono/node-server"; +import { createServer } from "./server"; - if (req.method === "GET") { - const session = registry.get(sessionId); - if (!session) { - res.writeHead(404, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ error: "Session not found" })); - return; - } - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify(session)); - return; - } +const PORT = parseInt(process.env.PORT ?? "8080", 10); +const INTERNAL_PORT = parseInt(process.env.INTERNAL_PORT ?? "8081", 10); +const DURABLE_STREAMS_URL = + process.env.DURABLE_STREAMS_URL ?? `http://127.0.0.1:${INTERNAL_PORT}`; - res.writeHead(405, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ error: "Method not allowed" })); - return; - } - - // Health check - if (url.pathname === "/health") { - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ status: "ok" })); - return; - } - - // Proxy all other requests to durable streams server - proxyToDurableStreams(req, res, url); +// Start internal durable stream server +const durableStreamServer = new DurableStreamTestServer({ + port: INTERNAL_PORT, }); - -server.on("error", (err) => { - console.error("[streams] HTTP server error:", err); - process.exit(1); +await durableStreamServer.start(); +console.log(`[streams] Durable stream server on port ${INTERNAL_PORT}`); + +// Start proxy server +const { app, protocol } = createServer({ + baseUrl: DURABLE_STREAMS_URL, + cors: true, + logging: true, }); -class PayloadTooLargeError extends Error { - constructor(maxBytes: number) { - super(`Payload exceeded ${maxBytes} bytes`); - this.name = "PayloadTooLargeError"; - } -} - -function readBody({ - req, - maxBytes = Number.POSITIVE_INFINITY, -}: { - req: import("node:http").IncomingMessage; - maxBytes?: number; -}): Promise { - return new Promise((resolve, reject) => { - const chunks: Buffer[] = []; - let size = 0; - let done = false; - - const finish = (error?: Error) => { - if (done) { - return; - } - done = true; - if (error) { - reject(error); - return; - } - resolve(Buffer.concat(chunks).toString("utf-8")); - }; - - req.on("data", (chunk) => { - if (done) { - return; - } - size += chunk.length; - if (size > maxBytes) { - finish(new PayloadTooLargeError(maxBytes)); - return; - } - chunks.push(chunk); - }); - req.on("end", () => finish()); - req.on("error", (error) => finish(error)); - }); -} - -function proxyToDurableStreams( - req: import("node:http").IncomingMessage, - res: import("node:http").ServerResponse, - url: URL, -) { - const proxyReq = httpRequest( - { - hostname: "127.0.0.1", - port: internalPort, - path: url.pathname + url.search, - method: req.method, - headers: req.headers, - }, - (proxyRes) => { - // Forward status and headers - res.writeHead(proxyRes.statusCode || 500, proxyRes.headers); - proxyRes.pipe(res); - }, - ); - - proxyReq.on("error", (err: Error) => { - console.error("[proxy] Error proxying to durable streams:", err); - if (!res.headersSent) { - res.writeHead(502, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ error: "Bad gateway" })); - } - }); - - // Pipe request body for POST/PUT requests - req.pipe(proxyReq); -} - -console.log(`[streams] Starting on port ${port}`); - -// Start both servers -async function start() { - await registry.init(); - - const durableUrl = await durableServer.start(); - console.log(`[streams] Durable streams internal: ${durableUrl}`); - - server.listen(port, "0.0.0.0", () => { - console.log(`[streams] Server running at http://0.0.0.0:${port}`); - }); -} - -start().catch((err) => { - console.error("[streams] Failed to start streams server:", err); - process.exit(1); +serve({ fetch: app.fetch, port: PORT }, (info) => { + console.log(`[streams] Proxy running on http://localhost:${info.port}`); }); // Graceful shutdown -function shutdown(signal: string) { - console.log(`[streams] Received ${signal}, shutting down...`); - server.close(() => { - console.log("[streams] HTTP server closed"); - durableServer - .stop() - .then(() => { - console.log("[streams] Durable streams server closed"); - process.exit(0); - }) - .catch((err) => { - console.error("[streams] Error stopping durable server:", err); - process.exit(1); - }); - }); - - // Force exit if graceful shutdown takes too long - setTimeout(() => { - console.error("[streams] Graceful shutdown timed out, forcing exit"); - process.exit(1); - }, 10_000); -} - -process.on("SIGTERM", () => shutdown("SIGTERM")); -process.on("SIGINT", () => shutdown("SIGINT")); - -export default server; +process.on("SIGINT", async () => { + await durableStreamServer.stop(); + process.exit(0); +}); diff --git a/apps/streams/src/protocol.ts b/apps/streams/src/protocol.ts new file mode 100644 index 00000000000..2ff6b87d98a --- /dev/null +++ b/apps/streams/src/protocol.ts @@ -0,0 +1,760 @@ +/** + * AIDBSessionProtocol - STATE-PROTOCOL implementation for AI DB. + * + * Uses @durable-streams/client to write STATE-PROTOCOL events to Durable Streams. + * Provides: + * - Session management + * - LLM API proxying with stream teeing + * - Agent webhook invocation + * - Chunk framing with sequence numbers + */ + +import { DurableStream } from "@durable-streams/client"; +import { + createMessagesCollection, + createModelMessagesCollection, + createSessionDB, + sessionStateSchema, +} from "@superset/durable-session"; +import type { + AgentSpec, + AIDBProtocolOptions, + ProxySessionState, + StreamChunk, +} from "./types"; + +// Map role to the role type expected by the schema +type MessageRole = "user" | "assistant" | "system"; + +export class AIDBSessionProtocol { + private readonly baseUrl: string; + + /** Active streams by sessionId */ + private streams = new Map(); + + /** Sequence counters per message for deduplication */ + private messageSeqs = new Map(); + + /** Active generation abort controllers */ + private activeAbortControllers = new Map(); + + /** Session state with SessionDB and collections for message materialization */ + private sessionStates = new Map(); + + constructor(options: AIDBProtocolOptions) { + this.baseUrl = options.baseUrl; + } + + // ═══════════════════════════════════════════════════════════════════════ + // Session Management + // ═══════════════════════════════════════════════════════════════════════ + + async createSession( + sessionId: string, + defaultAgents?: AgentSpec[], + ): Promise { + const stream = new DurableStream({ + url: `${this.baseUrl}/v1/stream/sessions/${sessionId}`, + }); + + // Create the stream on the Durable Streams server + await stream.create({ contentType: "application/json" }); + + this.streams.set(sessionId, stream); + + // Initialize session state with SessionDB and collections + await this.initializeSessionState(sessionId); + + // Register default agents if provided + if (defaultAgents && defaultAgents.length > 0) { + for (const agent of defaultAgents) { + await this.writeAgentRegistration(stream, sessionId, agent); + const state = this.sessionStates.get(sessionId); + if (state) { + state.agents.push(agent); + } + } + } + + return stream; + } + + async getOrCreateSession( + sessionId: string, + defaultAgents?: AgentSpec[], + ): Promise { + let stream = this.streams.get(sessionId); + if (!stream) { + stream = await this.createSession(sessionId, defaultAgents); + } + return stream; + } + + getSession(sessionId: string): DurableStream | undefined { + return this.streams.get(sessionId); + } + + deleteSession(sessionId: string): void { + const state = this.sessionStates.get(sessionId); + if (state) { + // Unsubscribe from changes + state.changeSubscription?.unsubscribe(); + // Close SessionDB to cleanup stream subscription + state.sessionDB.close(); + } + + this.streams.delete(sessionId); + this.sessionStates.delete(sessionId); + } + + async resetSession(sessionId: string, _clearPresence = false): Promise { + const stream = this.streams.get(sessionId); + if (!stream) { + throw new Error(`Session ${sessionId} not found`); + } + + // Write control reset event to the stream + const resetEvent = { + headers: { + control: "reset" as const, + }, + }; + + await stream.append(JSON.stringify(resetEvent)); + + // Clear in-memory state + this.messageSeqs.clear(); + const state = this.sessionStates.get(sessionId); + if (state) { + state.activeGenerations = []; + } + + this.updateLastActivity(sessionId); + } + + private updateLastActivity(sessionId: string): void { + const state = this.sessionStates.get(sessionId); + if (state) { + state.lastActivityAt = new Date().toISOString(); + } + } + + private async initializeSessionState(sessionId: string): Promise { + // Create SessionDB (same as client does) + const sessionDB = createSessionDB({ + sessionId, + baseUrl: this.baseUrl, + }); + + // Preload to sync initial data from stream + await sessionDB.preload(); + + // Create the messages collection from chunks + const messages = createMessagesCollection({ + chunksCollection: sessionDB.collections.chunks, + }); + + // Create the model messages collection (LLM-ready) + const modelMessages = createModelMessagesCollection({ + messagesCollection: messages, + }); + + // Store in session state + const state: ProxySessionState = { + createdAt: new Date().toISOString(), + lastActivityAt: new Date().toISOString(), + agents: [], + activeGenerations: [], + sessionDB, + messages, + modelMessages, + changeSubscription: null, + isReady: true, + }; + + this.sessionStates.set(sessionId, state); + + // Set up reactive agent triggering AFTER preload completes + this.setupReactiveAgentTrigger(sessionId); + } + + private setupReactiveAgentTrigger(sessionId: string): void { + const state = this.sessionStates.get(sessionId); + if (!state) return; + + const stream = this.streams.get(sessionId); + if (!stream) return; + + // Subscribe to changes in the modelMessages collection + // subscribeChanges() only fires for NEW changes (after subscription) + const subscription = state.modelMessages.subscribeChanges((changes) => { + for (const change of changes) { + if (change.type !== "insert") continue; + + const message = change.value; + if (!message) continue; + + if (message.role !== "user") continue; + + this.getMessageHistory(sessionId) + .then((history) => { + this.notifyRegisteredAgents( + stream, + sessionId, + "user-messages", + history, + ); + }) + .catch((err) => { + console.error( + `[Protocol] Failed to get message history for agent trigger:`, + err, + ); + }); + } + }); + + state.changeSubscription = subscription; + } + + // ═══════════════════════════════════════════════════════════════════════ + // Chunk Writing (STATE-PROTOCOL) + // ═══════════════════════════════════════════════════════════════════════ + + private getNextSeq(messageId: string): number { + const current = this.messageSeqs.get(messageId) ?? -1; + const next = current + 1; + this.messageSeqs.set(messageId, next); + return next; + } + + private clearSeq(messageId: string): void { + this.messageSeqs.delete(messageId); + } + + async writeChunk( + stream: DurableStream, + sessionId: string, + messageId: string, + actorId: string, + role: MessageRole, + chunk: StreamChunk, + txid?: string, + ): Promise { + const seq = this.getNextSeq(messageId); + + const event = sessionStateSchema.chunks.insert({ + key: `${messageId}:${seq}`, + value: { + messageId, + actorId, + role, + chunk: JSON.stringify(chunk), + seq, + createdAt: new Date().toISOString(), + }, + ...(txid && { headers: { txid } }), + }); + + const result = await stream.append(JSON.stringify(event)); + this.updateLastActivity(sessionId); + + return result; + } + + async writeUserMessage( + stream: DurableStream, + sessionId: string, + messageId: string, + actorId: string, + content: string, + txid?: string, + ): Promise { + const message = { + id: messageId, + role: "user" as const, + parts: [{ type: "text" as const, content }], + createdAt: new Date().toISOString(), + }; + + const event = sessionStateSchema.chunks.insert({ + key: `${messageId}:0`, + value: { + messageId, + actorId, + role: "user" as const, + chunk: JSON.stringify({ + type: "whole-message", + message, + }), + seq: 0, + createdAt: new Date().toISOString(), + }, + ...(txid && { headers: { txid } }), + }); + + const result = await stream.append(JSON.stringify(event)); + this.updateLastActivity(sessionId); + + return result; + } + + async writePresence( + stream: DurableStream, + sessionId: string, + actorId: string, + deviceId: string, + actorType: "user" | "agent", + status: "online" | "offline" | "away", + name?: string, + ): Promise { + const event = sessionStateSchema.presence.upsert({ + key: `${actorId}:${deviceId}`, + value: { + actorId, + deviceId, + actorType, + name, + status, + lastSeenAt: new Date().toISOString(), + }, + }); + + await stream.append(JSON.stringify(event)); + this.updateLastActivity(sessionId); + } + + async getDeviceIdsForActor( + sessionId: string, + actorId: string, + ): Promise { + const state = this.sessionStates.get(sessionId); + if (!state) { + return []; + } + + const presence = state.sessionDB.collections.presence; + const deviceIds: string[] = []; + + for (const row of presence.values()) { + if (row.actorId === actorId && row.status === "online") { + deviceIds.push(row.deviceId); + } + } + + return deviceIds; + } + + async writeAgentRegistration( + stream: DurableStream, + sessionId: string, + agent: AgentSpec, + ): Promise { + const event = sessionStateSchema.agents.upsert({ + key: agent.id, + value: { + agentId: agent.id, + name: agent.name, + endpoint: agent.endpoint, + triggers: agent.triggers, + }, + }); + + const result = await stream.append(JSON.stringify(event)); + this.updateLastActivity(sessionId); + + return result; + } + + async removeAgentRegistration( + stream: DurableStream, + sessionId: string, + agentId: string, + ): Promise { + const event = sessionStateSchema.agents.delete({ + key: agentId, + }); + + const result = await stream.append(JSON.stringify(event)); + this.updateLastActivity(sessionId); + + return result; + } + + // ═══════════════════════════════════════════════════════════════════════ + // Agent Invocation + // ═══════════════════════════════════════════════════════════════════════ + + async invokeAgent( + stream: DurableStream, + sessionId: string, + agent: AgentSpec, + messageHistory: Array<{ role: string; content: string }>, + ): Promise { + const messageId = crypto.randomUUID(); + const abortController = new AbortController(); + + this.activeAbortControllers.set(messageId, abortController); + this.addActiveGeneration(sessionId, messageId); + + try { + const requestBody = { + ...agent.bodyTemplate, + messages: messageHistory, + stream: true, + }; + + const response = await fetch(agent.endpoint, { + method: agent.method ?? "POST", + headers: { + "Content-Type": "application/json", + ...agent.headers, + }, + body: JSON.stringify(requestBody), + signal: abortController.signal, + }); + + if (!response.ok) { + throw new Error( + `Agent request failed: ${response.status} ${response.statusText}`, + ); + } + + if (response.body) { + await this.streamAgentResponse( + stream, + sessionId, + messageId, + agent.id, + response.body, + abortController.signal, + ); + } + } catch (error) { + if ((error as Error).name === "AbortError") { + await this.writeChunk( + stream, + sessionId, + messageId, + agent.id, + "assistant", + { + type: "stop", + reason: "aborted", + } as StreamChunk, + ); + } else { + await this.writeChunk( + stream, + sessionId, + messageId, + agent.id, + "assistant", + { + type: "error", + error: (error as Error).message, + } as StreamChunk, + ); + } + throw error; + } finally { + this.clearSeq(messageId); + this.activeAbortControllers.delete(messageId); + this.removeActiveGeneration(sessionId, messageId); + } + } + + private async streamAgentResponse( + stream: DurableStream, + sessionId: string, + messageId: string, + agentId: string, + responseBody: ReadableStream, + signal: AbortSignal, + ): Promise { + const reader = responseBody.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + try { + while (true) { + if (signal.aborted) break; + + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed || trimmed.startsWith(":")) continue; + + if (trimmed.startsWith("data: ")) { + const data = trimmed.slice(6); + if (data === "[DONE]") continue; + + try { + const chunk = JSON.parse(data) as StreamChunk; + await this.writeChunk( + stream, + sessionId, + messageId, + agentId, + "assistant", + chunk, + ); + } catch { + // Skip malformed JSON + } + } + } + } + + // Process remaining buffer + if (buffer.trim()) { + const data = buffer.startsWith("data: ") ? buffer.slice(6) : buffer; + if (data !== "[DONE]") { + try { + const chunk = JSON.parse(data) as StreamChunk; + await this.writeChunk( + stream, + sessionId, + messageId, + agentId, + "assistant", + chunk, + ); + } catch { + // Skip malformed JSON + } + } + } + } finally { + reader.releaseLock(); + } + } + + // ═══════════════════════════════════════════════════════════════════════ + // Agent Registration + // ═══════════════════════════════════════════════════════════════════════ + + async registerAgent(sessionId: string, agent: AgentSpec): Promise { + const state = this.sessionStates.get(sessionId); + if (state) { + state.agents = state.agents.filter((a) => a.id !== agent.id); + state.agents.push(agent); + + const stream = this.streams.get(sessionId); + if (stream) { + await this.writeAgentRegistration(stream, sessionId, agent); + } + } + } + + async registerAgents(sessionId: string, agents: AgentSpec[]): Promise { + for (const agent of agents) { + await this.registerAgent(sessionId, agent); + } + } + + async unregisterAgent(sessionId: string, agentId: string): Promise { + const state = this.sessionStates.get(sessionId); + if (state) { + state.agents = state.agents.filter((a) => a.id !== agentId); + + const stream = this.streams.get(sessionId); + if (stream) { + await this.removeAgentRegistration(stream, sessionId, agentId); + } + } + } + + getRegisteredAgents(sessionId: string): AgentSpec[] { + const state = this.sessionStates.get(sessionId); + return state?.agents ?? []; + } + + async notifyRegisteredAgents( + stream: DurableStream, + sessionId: string, + triggerType: "all" | "user-messages", + messageHistory: Array<{ role: string; content: string }>, + ): Promise { + const agents = this.getRegisteredAgents(sessionId); + + for (const agent of agents) { + const shouldTrigger = + agent.triggers === "all" || + agent.triggers === triggerType || + (agent.triggers === undefined && triggerType === "user-messages"); + + if (shouldTrigger) { + this.invokeAgent(stream, sessionId, agent, messageHistory).catch( + (err) => { + console.error(`Failed to invoke agent ${agent.id}:`, err); + }, + ); + } + } + } + + // ═══════════════════════════════════════════════════════════════════════ + // Active Generation Tracking + // ═══════════════════════════════════════════════════════════════════════ + + private addActiveGeneration(sessionId: string, messageId: string): void { + const state = this.sessionStates.get(sessionId); + if (state && !state.activeGenerations.includes(messageId)) { + state.activeGenerations.push(messageId); + } + } + + private removeActiveGeneration(sessionId: string, messageId: string): void { + const state = this.sessionStates.get(sessionId); + if (state) { + state.activeGenerations = state.activeGenerations.filter( + (id) => id !== messageId, + ); + } + } + + stopGeneration(sessionId: string, messageId: string | null): void { + if (messageId) { + const controller = this.activeAbortControllers.get(messageId); + if (controller) { + controller.abort(); + } + } else { + const state = this.sessionStates.get(sessionId); + if (state) { + for (const id of state.activeGenerations) { + const controller = this.activeAbortControllers.get(id); + if (controller) { + controller.abort(); + } + } + } + } + } + + // ═══════════════════════════════════════════════════════════════════════ + // Tool Results & Approvals + // ═══════════════════════════════════════════════════════════════════════ + + async writeToolResult( + stream: DurableStream, + sessionId: string, + messageId: string, + actorId: string, + toolCallId: string, + output: unknown, + error: string | null, + txid?: string, + ): Promise { + const result = await this.writeChunk( + stream, + sessionId, + messageId, + actorId, + "user", + { + type: "tool-result", + toolCallId, + output, + error, + } as StreamChunk, + txid, + ); + + this.clearSeq(messageId); + return result; + } + + async writeApprovalResponse( + stream: DurableStream, + sessionId: string, + actorId: string, + approvalId: string, + approved: boolean, + txid?: string, + ): Promise { + const messageId = crypto.randomUUID(); + + const result = await this.writeChunk( + stream, + sessionId, + messageId, + actorId, + "user", + { + type: "approval-response", + approvalId, + approved, + } as StreamChunk, + txid, + ); + + this.clearSeq(messageId); + return result; + } + + // ═══════════════════════════════════════════════════════════════════════ + // Session Forking + // ═══════════════════════════════════════════════════════════════════════ + + async forkSession( + sessionId: string, + _atMessageId: string | null, + newSessionId: string | null, + ): Promise<{ sessionId: string; offset: string }> { + const targetSessionId = newSessionId ?? crypto.randomUUID(); + + const sourceStream = this.streams.get(sessionId); + if (!sourceStream) { + throw new Error(`Session ${sessionId} not found`); + } + + await this.createSession(targetSessionId); + + const sourceState = this.sessionStates.get(sessionId); + if (sourceState) { + this.sessionStates.set(targetSessionId, { + ...sourceState, + createdAt: new Date().toISOString(), + lastActivityAt: new Date().toISOString(), + activeGenerations: [], + }); + } + + // TODO: Copy stream data up to atMessageId + return { + sessionId: targetSessionId, + offset: "-1", + }; + } + + // ═══════════════════════════════════════════════════════════════════════ + // Message History + // ═══════════════════════════════════════════════════════════════════════ + + async getMessageHistory( + sessionId: string, + ): Promise> { + const state = this.sessionStates.get(sessionId); + + if (!state || !state.isReady) { + console.warn( + `[Protocol] Session ${sessionId} not ready for message history`, + ); + return []; + } + + return state.modelMessages.toArray.map((msg) => ({ + role: msg.role, + content: msg.content, + })); + } +} diff --git a/apps/streams/src/routes/agents.ts b/apps/streams/src/routes/agents.ts new file mode 100644 index 00000000000..a626b158769 --- /dev/null +++ b/apps/streams/src/routes/agents.ts @@ -0,0 +1,35 @@ +import { Hono } from "hono"; +import { + handleRegisterAgents, + handleUnregisterAgent, +} from "../handlers/invoke-agent"; +import type { AIDBSessionProtocol } from "../protocol"; + +export function createAgentRoutes(protocol: AIDBSessionProtocol) { + const app = new Hono(); + + app.post("/:sessionId/agents", async (c) => { + return handleRegisterAgents(c, protocol); + }); + + app.get("/:sessionId/agents", async (c) => { + const sessionId = c.req.param("sessionId"); + + try { + const agents = await protocol.getRegisteredAgents(sessionId); + return c.json({ agents }); + } catch (error) { + console.error("Failed to get agents:", error); + return c.json( + { error: "Failed to get agents", details: (error as Error).message }, + 500, + ); + } + }); + + app.delete("/:sessionId/agents/:agentId", async (c) => { + return handleUnregisterAgent(c, protocol); + }); + + return app; +} diff --git a/apps/streams/src/routes/approvals.ts b/apps/streams/src/routes/approvals.ts new file mode 100644 index 00000000000..3760136c7ba --- /dev/null +++ b/apps/streams/src/routes/approvals.ts @@ -0,0 +1,43 @@ +import { Hono } from "hono"; +import type { AIDBSessionProtocol } from "../protocol"; +import { approvalResponseRequestSchema } from "../types"; + +export function createApprovalRoutes(protocol: AIDBSessionProtocol) { + const app = new Hono(); + + app.post("/:sessionId/approvals/:approvalId", async (c) => { + const sessionId = c.req.param("sessionId"); + const approvalId = c.req.param("approvalId"); + + try { + const rawBody = await c.req.json(); + const body = approvalResponseRequestSchema.parse(rawBody); + + const actorId = c.req.header("X-Actor-Id") ?? crypto.randomUUID(); + + const stream = await protocol.getOrCreateSession(sessionId); + + await protocol.writeApprovalResponse( + stream, + sessionId, + actorId, + approvalId, + body.approved, + body.txid, + ); + + return new Response(null, { status: 204 }); + } catch (error) { + console.error("Failed to respond to approval:", error); + return c.json( + { + error: "Failed to respond to approval", + details: (error as Error).message, + }, + 500, + ); + } + }); + + return app; +} diff --git a/apps/streams/src/routes/auth.ts b/apps/streams/src/routes/auth.ts new file mode 100644 index 00000000000..4a932334b0f --- /dev/null +++ b/apps/streams/src/routes/auth.ts @@ -0,0 +1,123 @@ +import { Hono } from "hono"; +import type { AIDBSessionProtocol } from "../protocol"; +import type { AgentSpec } from "../types"; + +export function createAuthRoutes(protocol: AIDBSessionProtocol) { + const app = new Hono(); + + app.post("/:sessionId/login", async (c) => { + const sessionId = c.req.param("sessionId"); + + try { + const body = await c.req.json(); + const { actorId, deviceId, name, defaultAgents } = body as { + actorId: string; + deviceId: string; + name?: string; + defaultAgents?: AgentSpec[]; + }; + + if (!actorId || !deviceId) { + return c.json({ error: "actorId and deviceId are required" }, 400); + } + + const stream = await protocol.getOrCreateSession( + sessionId, + defaultAgents, + ); + + await protocol.writePresence( + stream, + sessionId, + actorId, + deviceId, + "user", + "online", + name ?? actorId, + ); + + return c.json({ success: true, actorId, deviceId, status: "online" }); + } catch (error) { + console.error("Failed to login:", error); + return c.json( + { error: "Failed to login", details: (error as Error).message }, + 500, + ); + } + }); + + app.post("/:sessionId/logout", async (c) => { + const sessionId = c.req.param("sessionId"); + + try { + const rawBody = await c.req.text(); + + let body: { actorId?: string; deviceId?: string; allDevices?: boolean }; + try { + body = JSON.parse(rawBody); + } catch (parseError) { + console.error("[AUTH] Failed to parse logout body:", parseError); + return c.json({ error: "Invalid JSON body" }, 400); + } + + const { actorId, deviceId, allDevices } = body; + + if (!actorId) { + return c.json({ error: "actorId is required" }, 400); + } + + if (!deviceId && !allDevices) { + return c.json({ error: "deviceId or allDevices is required" }, 400); + } + + const stream = protocol.getSession(sessionId); + if (!stream) { + return c.json({ error: "Session not found" }, 404); + } + + if (allDevices) { + const deviceIds = await protocol.getDeviceIdsForActor( + sessionId, + actorId, + ); + + for (const devId of deviceIds) { + await protocol.writePresence( + stream, + sessionId, + actorId, + devId, + "user", + "offline", + ); + } + + return c.json({ + success: true, + actorId, + devicesLoggedOut: deviceIds.length, + status: "offline", + }); + } else { + await protocol.writePresence( + stream, + sessionId, + actorId, + deviceId!, + "user", + "offline", + ); + + return c.json({ success: true, actorId, deviceId, status: "offline" }); + } + } catch (error) { + console.error("[AUTH] Failed to logout:", error); + return c.json( + { error: "Failed to logout", details: (error as Error).message }, + 500, + ); + } + }); + + return app; +} diff --git a/apps/streams/src/routes/fork.ts b/apps/streams/src/routes/fork.ts new file mode 100644 index 00000000000..cd133766497 --- /dev/null +++ b/apps/streams/src/routes/fork.ts @@ -0,0 +1,37 @@ +import { Hono } from "hono"; +import type { AIDBSessionProtocol } from "../protocol"; +import { type ForkSessionResponse, forkSessionRequestSchema } from "../types"; + +export function createForkRoutes(protocol: AIDBSessionProtocol) { + const app = new Hono(); + + app.post("/:sessionId/fork", async (c) => { + const sessionId = c.req.param("sessionId"); + + try { + const rawBody = await c.req.json(); + const body = forkSessionRequestSchema.parse(rawBody); + + const result = await protocol.forkSession( + sessionId, + body.atMessageId ?? null, + body.newSessionId ?? null, + ); + + const response: ForkSessionResponse = { + sessionId: result.sessionId, + offset: result.offset, + }; + + return c.json(response, 201); + } catch (error) { + console.error("Failed to fork session:", error); + return c.json( + { error: "Failed to fork session", details: (error as Error).message }, + 500, + ); + } + }); + + return app; +} diff --git a/apps/streams/src/routes/health.ts b/apps/streams/src/routes/health.ts new file mode 100644 index 00000000000..39a0681682a --- /dev/null +++ b/apps/streams/src/routes/health.ts @@ -0,0 +1,28 @@ +import { Hono } from "hono"; + +export function createHealthRoutes() { + const app = new Hono(); + + app.get("/", (c) => { + return c.json({ + status: "ok", + timestamp: new Date().toISOString(), + }); + }); + + app.get("/ready", (c) => { + return c.json({ + status: "ready", + timestamp: new Date().toISOString(), + }); + }); + + app.get("/live", (c) => { + return c.json({ + status: "live", + timestamp: new Date().toISOString(), + }); + }); + + return app; +} diff --git a/apps/streams/src/routes/index.ts b/apps/streams/src/routes/index.ts new file mode 100644 index 00000000000..a34fe1d7ed3 --- /dev/null +++ b/apps/streams/src/routes/index.ts @@ -0,0 +1,9 @@ +export { createAgentRoutes } from "./agents"; +export { createApprovalRoutes } from "./approvals"; +export { createAuthRoutes } from "./auth"; +export { createForkRoutes } from "./fork"; +export { createHealthRoutes } from "./health"; +export { createMessageRoutes } from "./messages"; +export { createSessionRoutes } from "./sessions"; +export { createStreamRoutes, PROTOCOL_RESPONSE_HEADERS } from "./stream"; +export { createToolResultRoutes } from "./tool-results"; diff --git a/apps/streams/src/routes/messages.ts b/apps/streams/src/routes/messages.ts new file mode 100644 index 00000000000..98c5e1f0891 --- /dev/null +++ b/apps/streams/src/routes/messages.ts @@ -0,0 +1,73 @@ +import { Hono } from "hono"; +import { handleSendMessage } from "../handlers/send-message"; +import type { AIDBSessionProtocol } from "../protocol"; +import { regenerateRequestSchema, stopGenerationRequestSchema } from "../types"; + +export function createMessageRoutes(protocol: AIDBSessionProtocol) { + const app = new Hono(); + + app.post("/:sessionId/messages", async (c) => { + return handleSendMessage(c, protocol); + }); + + app.post("/:sessionId/regenerate", async (c) => { + const sessionId = c.req.param("sessionId"); + + try { + const rawBody = await c.req.json(); + const body = regenerateRequestSchema.parse(rawBody); + + const _actorId = + body.actorId ?? c.req.header("X-Actor-Id") ?? crypto.randomUUID(); + + const stream = await protocol.getOrCreateSession(sessionId); + + const agents = await protocol.getRegisteredAgents(sessionId); + + if (agents.length === 0) { + return c.json({ error: "No agents registered for regeneration" }, 400); + } + + const messageHistory = [ + { + role: "user", + content: body.content, + }, + ]; + + await protocol.invokeAgent(stream, sessionId, agents[0]!, messageHistory); + + return c.json({ success: true }, 200); + } catch (error) { + console.error("Failed to regenerate:", error); + return c.json( + { error: "Failed to regenerate", details: (error as Error).message }, + 500, + ); + } + }); + + app.post("/:sessionId/stop", async (c) => { + const sessionId = c.req.param("sessionId"); + + try { + const rawBody = await c.req.json(); + const body = stopGenerationRequestSchema.parse(rawBody); + + await protocol.stopGeneration(sessionId, body.messageId ?? null); + + return new Response(null, { status: 204 }); + } catch (error) { + console.error("Failed to stop generation:", error); + return c.json( + { + error: "Failed to stop generation", + details: (error as Error).message, + }, + 500, + ); + } + }); + + return app; +} diff --git a/apps/streams/src/routes/sessions.ts b/apps/streams/src/routes/sessions.ts new file mode 100644 index 00000000000..4f9023e9b1c --- /dev/null +++ b/apps/streams/src/routes/sessions.ts @@ -0,0 +1,107 @@ +import { Hono } from "hono"; +import type { AIDBSessionProtocol } from "../protocol"; + +export function createSessionRoutes(protocol: AIDBSessionProtocol) { + const app = new Hono(); + + app.put("/:sessionId", async (c) => { + const sessionId = c.req.param("sessionId"); + + try { + const _stream = await protocol.getOrCreateSession(sessionId); + + return c.json( + { + sessionId, + streamUrl: `/v1/stream/sessions/${sessionId}`, + }, + 200, + ); + } catch (error) { + console.error("Failed to create session:", error); + return c.json( + { + error: "Failed to create session", + details: (error as Error).message, + }, + 500, + ); + } + }); + + app.get("/:sessionId", async (c) => { + const sessionId = c.req.param("sessionId"); + + try { + const stream = await protocol.getSession(sessionId); + + if (!stream) { + return c.json({ error: "Session not found" }, 404); + } + + return c.json({ + sessionId, + streamUrl: `/v1/stream/sessions/${sessionId}`, + }); + } catch (error) { + console.error("Failed to get session:", error); + return c.json( + { error: "Failed to get session", details: (error as Error).message }, + 500, + ); + } + }); + + app.delete("/:sessionId", async (c) => { + const sessionId = c.req.param("sessionId"); + + try { + await protocol.deleteSession(sessionId); + return new Response(null, { status: 204 }); + } catch (error) { + console.error("Failed to delete session:", error); + return c.json( + { + error: "Failed to delete session", + details: (error as Error).message, + }, + 500, + ); + } + }); + + app.post("/:sessionId/reset", async (c) => { + const sessionId = c.req.param("sessionId"); + + try { + let clearPresence = false; + try { + const body = await c.req.json(); + clearPresence = body?.clearPresence === true; + } catch { + // No body or invalid JSON - use defaults + } + + await protocol.resetSession(sessionId, clearPresence); + + return c.json({ + success: true, + sessionId, + message: "Session reset. All connected clients will clear their state.", + }); + } catch (error) { + console.error("Failed to reset session:", error); + + if ((error as Error).message.includes("not found")) { + return c.json({ error: "Session not found" }, 404); + } + + return c.json( + { error: "Failed to reset session", details: (error as Error).message }, + 500, + ); + } + }); + + return app; +} diff --git a/apps/streams/src/routes/stream.ts b/apps/streams/src/routes/stream.ts new file mode 100644 index 00000000000..73d80c39025 --- /dev/null +++ b/apps/streams/src/routes/stream.ts @@ -0,0 +1,116 @@ +import { + DURABLE_STREAM_PROTOCOL_QUERY_PARAMS, + STREAM_CURSOR_HEADER, + STREAM_OFFSET_HEADER, + STREAM_UP_TO_DATE_HEADER, +} from "@durable-streams/client"; +import { Hono } from "hono"; + +export const PROTOCOL_RESPONSE_HEADERS = [ + STREAM_OFFSET_HEADER, + STREAM_CURSOR_HEADER, + STREAM_UP_TO_DATE_HEADER, + "Content-Type", + "Cache-Control", + "ETag", +] as const; + +const PROTOCOL_QUERY_PARAMS = DURABLE_STREAM_PROTOCOL_QUERY_PARAMS; + +const _HEADERS_TO_STRIP = [ + "content-encoding", + "content-length", + "transfer-encoding", + "connection", +] as const; + +export function createStreamRoutes(baseUrl: string) { + const app = new Hono(); + + app.get("/sessions/:sessionId", async (c) => { + const sessionId = c.req.param("sessionId"); + + const upstreamUrl = new URL(`${baseUrl}/v1/stream/sessions/${sessionId}`); + + for (const param of PROTOCOL_QUERY_PARAMS) { + const value = c.req.query(param); + if (value !== undefined) { + upstreamUrl.searchParams.set(param, value); + } + } + + try { + const upstreamResponse = await fetch(upstreamUrl.toString(), { + method: "GET", + headers: { + ...Object.fromEntries( + [...c.req.raw.headers.entries()].filter( + ([key]) => + key.toLowerCase() === "authorization" || + key.toLowerCase().startsWith("x-"), + ), + ), + }, + }); + + if (!upstreamResponse.ok) { + if (upstreamResponse.status === 404) { + return c.json({ error: "Stream not found" }, 404); + } + + const errorText = await upstreamResponse + .text() + .catch(() => "Unknown error"); + return c.json( + { + error: "Upstream error", + status: upstreamResponse.status, + details: errorText, + }, + upstreamResponse.status as 400 | 500, + ); + } + + const responseHeaders = new Headers(); + + for (const header of PROTOCOL_RESPONSE_HEADERS) { + const value = upstreamResponse.headers.get(header); + if (value !== null) { + responseHeaders.set(header, value); + } + } + + if (upstreamResponse.status === 204) { + const nextOffset = upstreamResponse.headers.get(STREAM_OFFSET_HEADER); + if (nextOffset) { + c.header(STREAM_OFFSET_HEADER, nextOffset); + } + return c.body(null, 204); + } + + if (!upstreamResponse.body) { + for (const [key, value] of responseHeaders.entries()) { + c.header(key, value); + } + return c.body(null, upstreamResponse.status as 200); + } + + for (const [key, value] of responseHeaders.entries()) { + c.header(key, value); + } + c.status(upstreamResponse.status as 200); + return c.body(upstreamResponse.body); + } catch (error) { + console.error("Stream proxy error:", error); + return c.json( + { + error: "Failed to proxy stream request", + details: (error as Error).message, + }, + 502, + ); + } + }); + + return app; +} diff --git a/apps/streams/src/routes/tool-results.ts b/apps/streams/src/routes/tool-results.ts new file mode 100644 index 00000000000..ca7e29912dc --- /dev/null +++ b/apps/streams/src/routes/tool-results.ts @@ -0,0 +1,45 @@ +import { Hono } from "hono"; +import type { AIDBSessionProtocol } from "../protocol"; +import { toolResultRequestSchema } from "../types"; + +export function createToolResultRoutes(protocol: AIDBSessionProtocol) { + const app = new Hono(); + + app.post("/:sessionId/tool-results", async (c) => { + const sessionId = c.req.param("sessionId"); + + try { + const rawBody = await c.req.json(); + const body = toolResultRequestSchema.parse(rawBody); + + const actorId = c.req.header("X-Actor-Id") ?? crypto.randomUUID(); + const messageId = body.messageId ?? crypto.randomUUID(); + + const stream = await protocol.getOrCreateSession(sessionId); + + await protocol.writeToolResult( + stream, + sessionId, + messageId, + actorId, + body.toolCallId, + body.output, + body.error ?? null, + body.txid, + ); + + return new Response(null, { status: 204 }); + } catch (error) { + console.error("Failed to add tool result:", error); + return c.json( + { + error: "Failed to add tool result", + details: (error as Error).message, + }, + 500, + ); + } + }); + + return app; +} diff --git a/apps/streams/src/server.ts b/apps/streams/src/server.ts new file mode 100644 index 00000000000..9f49ae2f1d2 --- /dev/null +++ b/apps/streams/src/server.ts @@ -0,0 +1,123 @@ +/** + * AI DB Proxy Server + * + * Hono-based HTTP server implementing the AI DB Wrapper Protocol. + */ + +import { Hono } from "hono"; +import { cors } from "hono/cors"; +import { logger } from "hono/logger"; +import { AIDBSessionProtocol } from "./protocol"; +import { + createAgentRoutes, + createApprovalRoutes, + createAuthRoutes, + createForkRoutes, + createHealthRoutes, + createMessageRoutes, + createSessionRoutes, + createStreamRoutes, + createToolResultRoutes, + PROTOCOL_RESPONSE_HEADERS, +} from "./routes"; +import type { AIDBProtocolOptions } from "./types"; + +export interface AIDBProxyServerOptions extends AIDBProtocolOptions { + /** Enable CORS */ + cors?: boolean; + /** Enable request logging */ + logging?: boolean; + /** Custom CORS origins */ + corsOrigins?: string | string[]; +} + +export function createServer(options: AIDBProxyServerOptions) { + const app = new Hono(); + + // Create protocol instance + const protocol = new AIDBSessionProtocol({ + baseUrl: options.baseUrl, + storage: options.storage, + }); + + // Middleware + if (options.cors !== false) { + app.use( + "*", + cors({ + origin: options.corsOrigins ?? "*", + allowMethods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"], + allowHeaders: [ + "Content-Type", + "Authorization", + "X-Actor-Id", + "X-Actor-Type", + "X-Session-Id", + ], + // Expose Durable Streams protocol headers to browser clients + exposeHeaders: [...PROTOCOL_RESPONSE_HEADERS], + }), + ); + } + + if (options.logging !== false) { + app.use("*", logger()); + } + + // Health routes + app.route("/health", createHealthRoutes()); + + // API v1 routes + const v1 = new Hono(); + + // Session management + v1.route("/sessions", createSessionRoutes(protocol)); + + // Auth (login/logout - nested under sessions) + v1.route("/sessions", createAuthRoutes(protocol)); + + // Messages (nested under sessions) + v1.route("/sessions", createMessageRoutes(protocol)); + + // Agents (nested under sessions) + v1.route("/sessions", createAgentRoutes(protocol)); + + // Tool results (nested under sessions) + v1.route("/sessions", createToolResultRoutes(protocol)); + + // Approvals (nested under sessions) + v1.route("/sessions", createApprovalRoutes(protocol)); + + // Fork (nested under sessions) + v1.route("/sessions", createForkRoutes(protocol)); + + // Stream proxy - forwards to Durable Streams server + v1.route("/stream", createStreamRoutes(options.baseUrl)); + + app.route("/v1", v1); + + // Root info + app.get("/", (c) => { + return c.json({ + name: "@superset/streams", + version: "0.1.0", + endpoints: { + health: "/health", + stream: "/v1/stream/sessions/:sessionId", + sessions: "/v1/sessions/:sessionId", + messages: "/v1/sessions/:sessionId/messages", + agents: "/v1/sessions/:sessionId/agents", + toolResults: "/v1/sessions/:sessionId/tool-results", + approvals: "/v1/sessions/:sessionId/approvals/:approvalId", + fork: "/v1/sessions/:sessionId/fork", + stop: "/v1/sessions/:sessionId/stop", + regenerate: "/v1/sessions/:sessionId/regenerate", + reset: "/v1/sessions/:sessionId/reset", + }, + }); + }); + + return { app, protocol }; +} + +export default createServer; diff --git a/apps/streams/src/session-registry.ts b/apps/streams/src/session-registry.ts deleted file mode 100644 index 8ee11a9b8f9..00000000000 --- a/apps/streams/src/session-registry.ts +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Session Registry - * - * In-memory session registry with file persistence for tracking chat sessions. - * This provides a lightweight session index without full database persistence. - */ - -import { mkdir, readFile, rename, writeFile } from "node:fs/promises"; -import { dirname, join } from "node:path"; - -const LOG_PREFIX = "[session-registry]"; -const SESSIONS_FILE = "sessions.json"; -const TMP_SUFFIX = ".tmp"; - -export interface SessionInfo { - sessionId: string; - title: string; - createdAt: string; - createdBy?: string; -} - -export class SessionRegistry { - private sessions: Map = new Map(); - private filePath: string; - private persistQueue: Promise = Promise.resolve(); - - constructor(dataDir: string) { - this.filePath = join(dataDir, SESSIONS_FILE); - } - - async init(): Promise { - await this.load(); - } - - list(): SessionInfo[] { - return Array.from(this.sessions.values()).sort( - (a, b) => - new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime(), - ); - } - - get(sessionId: string): SessionInfo | undefined { - return this.sessions.get(sessionId); - } - - register(info: Omit): SessionInfo { - const existing = this.sessions.get(info.sessionId); - if (existing) { - return existing; - } - - const session: SessionInfo = { - ...info, - createdAt: new Date().toISOString(), - }; - - this.sessions.set(info.sessionId, session); - this.persist(); - - return session; - } - - private persist(): void { - this.persistQueue = this.persistQueue.then(async () => { - try { - const dir = dirname(this.filePath); - await mkdir(dir, { recursive: true }); - const data = JSON.stringify( - Array.from(this.sessions.values()), - null, - 2, - ); - // Write to temp file then rename for crash safety - const tmpPath = `${this.filePath}${TMP_SUFFIX}`; - await writeFile(tmpPath, data, "utf-8"); - await rename(tmpPath, this.filePath); - } catch (error) { - console.error(`${LOG_PREFIX} Failed to persist sessions:`, error); - } - }); - } - - private async load(): Promise { - try { - const data = await readFile(this.filePath, "utf-8"); - const sessions: SessionInfo[] = JSON.parse(data); - for (const session of sessions) { - this.sessions.set(session.sessionId, session); - } - console.log(`${LOG_PREFIX} Loaded ${sessions.length} sessions from disk`); - } catch (error) { - const err = error as NodeJS.ErrnoException; - if (err.code === "ENOENT") { - return; - } - console.error(`${LOG_PREFIX} Failed to load sessions:`, error); - } - } -} diff --git a/apps/streams/src/types.ts b/apps/streams/src/types.ts new file mode 100644 index 00000000000..89e14927224 --- /dev/null +++ b/apps/streams/src/types.ts @@ -0,0 +1,203 @@ +/** + * Type definitions for the durable session proxy. + */ + +import type { + MessageRow, + ModelMessage, + SessionDB, +} from "@superset/durable-session"; +import type { Collection } from "@tanstack/db"; +import { z } from "zod"; + +// ============================================================================ +// Stream Row Types +// ============================================================================ + +export type ActorType = "user" | "agent"; + +export interface StreamRow { + sessionId: string; + messageId: string; + actorId: string; + actorType: ActorType; + chunk: string; + createdAt: string; + seq: number; +} + +export const streamRowSchema = z.object({ + sessionId: z.string(), + messageId: z.string(), + actorId: z.string(), + actorType: z.enum(["user", "agent"]), + chunk: z.string(), + createdAt: z.string(), + seq: z.number(), +}); + +// ============================================================================ +// Agent Types +// ============================================================================ + +export type AgentTrigger = "all" | "user-messages"; + +export interface AgentSpec { + id: string; + name?: string; + endpoint: string; + method?: "POST"; + headers?: Record; + triggers?: AgentTrigger; + bodyTemplate?: Record; +} + +export const agentSpecSchema = z.object({ + id: z.string(), + name: z.string().optional(), + endpoint: z.string().url(), + method: z.literal("POST").optional(), + headers: z.record(z.string(), z.string()).optional(), + triggers: z.enum(["all", "user-messages"]).optional(), + bodyTemplate: z.record(z.string(), z.unknown()).optional(), +}); + +// ============================================================================ +// Request Types +// ============================================================================ + +export interface SendMessageRequest { + messageId?: string; + content: string; + role?: "user" | "assistant" | "system"; + actorId?: string; + actorType?: ActorType; + agent?: AgentSpec; + txid?: string; +} + +export const sendMessageRequestSchema = z.object({ + messageId: z.string().uuid().optional(), + content: z.string(), + role: z.enum(["user", "assistant", "system"]).optional(), + actorId: z.string().optional(), + actorType: z.enum(["user", "agent"]).optional(), + agent: agentSpecSchema.optional(), + txid: z.string().uuid().optional(), +}); + +export interface ToolResultRequest { + toolCallId: string; + output: unknown; + error?: string | null; + messageId?: string; + txid?: string; +} + +export const toolResultRequestSchema = z.object({ + toolCallId: z.string(), + output: z.unknown(), + error: z.string().nullable().optional(), + messageId: z.string().optional(), + txid: z.string().uuid().optional(), +}); + +export interface ApprovalResponseRequest { + approved: boolean; + txid?: string; +} + +export const approvalResponseRequestSchema = z.object({ + approved: z.boolean(), + txid: z.string().uuid().optional(), +}); + +export interface RegisterAgentsRequest { + agents: AgentSpec[]; +} + +export const registerAgentsRequestSchema = z.object({ + agents: z.array(agentSpecSchema), +}); + +export interface ForkSessionRequest { + atMessageId?: string | null; + newSessionId?: string | null; +} + +export const forkSessionRequestSchema = z.object({ + atMessageId: z.string().nullable().optional(), + newSessionId: z.string().uuid().nullable().optional(), +}); + +export interface StopGenerationRequest { + messageId?: string | null; +} + +export const stopGenerationRequestSchema = z.object({ + messageId: z.string().nullable().optional(), +}); + +export interface RegenerateRequest { + fromMessageId: string; + content: string; + actorId?: string; + actorType?: ActorType; +} + +export const regenerateRequestSchema = z.object({ + fromMessageId: z.string(), + content: z.string(), + actorId: z.string().optional(), + actorType: z.enum(["user", "agent"]).optional(), +}); + +// ============================================================================ +// Response Types +// ============================================================================ + +export interface SendMessageResponse { + messageId: string; +} + +export interface ForkSessionResponse { + sessionId: string; + offset: string; +} + +// ============================================================================ +// Stream Chunk Types (TanStack AI compatible) +// ============================================================================ + +export interface StreamChunk { + type: string; + [key: string]: unknown; +} + +// ============================================================================ +// Session State Types +// ============================================================================ + +export interface SessionState { + createdAt: string; + lastActivityAt: string; + agents: AgentSpec[]; + activeGenerations: string[]; +} + +export interface ProxySessionState extends SessionState { + sessionDB: SessionDB; + messages: Collection; + modelMessages: Collection; + changeSubscription: { unsubscribe: () => void } | null; + isReady: boolean; +} + +// ============================================================================ +// Protocol Options +// ============================================================================ + +export interface AIDBProtocolOptions { + baseUrl: string; + storage?: "memory" | "durable-object"; +} diff --git a/bun.lock b/bun.lock index fe6e3436d9d..ec277ce7d42 100644 --- a/bun.lock +++ b/bun.lock @@ -454,11 +454,16 @@ "name": "@superset/streams", "version": "0.0.1", "dependencies": { + "@durable-streams/client": "^0.2.0", "@durable-streams/server": "^0.2.0", + "@superset/durable-session": "workspace:*", + "@tanstack/db": "^0.5.22", + "hono": "^4.4.0", + "zod": "^4.1.12", }, "devDependencies": { - "@durable-streams/client": "^0.2.0", "@durable-streams/server-conformance-tests": "^0.2.0", + "@hono/node-server": "^1.13.0", "@superset/typescript": "workspace:*", "@types/node": "^24.9.1", "fast-check": "^4.5.3", @@ -515,32 +520,6 @@ "typescript": "^5.9.3", }, }, - "packages/ai-chat": { - "name": "@superset/ai-chat", - "version": "0.0.1", - "dependencies": { - "@anthropic-ai/claude-agent-sdk": "^0.2.19", - "@anthropic-ai/sdk": "^0.72.1", - "@durable-streams/client": "^0.2.0", - "@durable-streams/state": "^0.2.0", - "@superset/ui": "workspace:*", - "@tanstack/db": "0.5.22", - "@tanstack/react-db": "0.1.66", - "lucide-react": "^0.563.0", - "zod": "^4.3.5", - }, - "devDependencies": { - "@superset/typescript": "workspace:*", - "@types/node": "^24.9.1", - "@types/react": "~19.1.0", - "bun-types": "^1.3.1", - "react": "19.1.0", - "typescript": "^5.9.3", - }, - "peerDependencies": { - "react": "^18.0.0 || ^19.0.0", - }, - }, "packages/auth": { "name": "@superset/auth", "version": "0.1.0", @@ -1908,8 +1887,6 @@ "@superset/admin": ["@superset/admin@workspace:apps/admin"], - "@superset/ai-chat": ["@superset/ai-chat@workspace:packages/ai-chat"], - "@superset/api": ["@superset/api@workspace:apps/api"], "@superset/auth": ["@superset/auth@workspace:packages/auth"], diff --git a/docs/ai-chat-plan.md b/docs/ai-chat-plan.md index 40bd2225081..a5ae641b479 100644 --- a/docs/ai-chat-plan.md +++ b/docs/ai-chat-plan.md @@ -117,7 +117,7 @@ The agent endpoint converts these to TanStack AI `StreamChunk` format before wri | ChatInput component | DONE — `packages/durable-session/src/react/components/ChatInput/` | | PresenceBar component | DONE — `packages/durable-session/src/react/components/PresenceBar/` | | Old ai-chat package | REMOVED — replaced by `@superset/durable-session` | -| Vendored proxy (A2) | NOT BUILT — **Next phase** | +| Vendored proxy (A2) | DONE — `apps/streams/src/` (vendored from electric-sql/transport, JSON.stringify fix for DurableStream.append) | | Claude agent endpoint (B) | NOT BUILT | | Database schema | NOT BUILT | | API chat router | NOT BUILT | @@ -310,7 +310,7 @@ import { ChatInput, PresenceBar } from '@superset/durable-session/react' The old `packages/ai-chat` package has been fully removed. -### A2. Vendor proxy into `apps/streams/` — NEXT +### A2. Vendor proxy into `apps/streams/` — DONE Vendor from `packages/durable-session-proxy` in the transport repo. @@ -901,27 +901,29 @@ All files below are created and typechecking. Compatibility fixes applied for un | `apps/streams/src/claude-agent.ts` | Claude agent HTTP endpoint | ~120 | | `apps/streams/src/sdk-to-ai-chunks.ts` | SDKMessage → TanStack AI chunk converter | ~200 | -### Files to CREATE (vendored proxy) +### Files CREATED (vendored proxy — Phase A2) ✅ -| Destination | Source | Lines | +All files below are created and typechecking. Compatibility fix: `DurableStream.append()` in published `@durable-streams/client@0.2.1` only accepts `string | Uint8Array`, not `ChangeEvent` objects. All `stream.append(event)` calls wrapped with `JSON.stringify()`. + +| Destination | Source | Status | |---|---|---| -| `apps/streams/src/server.ts` | `durable-session-proxy/src/server.ts` | 149 | -| `apps/streams/src/protocol.ts` | `durable-session-proxy/src/protocol.ts` | 917 | -| `apps/streams/src/types.ts` | `durable-session-proxy/src/types.ts` | 278 | -| `apps/streams/src/handlers/index.ts` | `durable-session-proxy/src/handlers/index.ts` | 12 | -| `apps/streams/src/handlers/send-message.ts` | `durable-session-proxy/src/handlers/send-message.ts` | 81 | -| `apps/streams/src/handlers/invoke-agent.ts` | `durable-session-proxy/src/handlers/invoke-agent.ts` | 128 | -| `apps/streams/src/handlers/stream-writer.ts` | `durable-session-proxy/src/handlers/stream-writer.ts` | 124 | -| `apps/streams/src/routes/index.ts` | `durable-session-proxy/src/routes/index.ts` | 14 | -| `apps/streams/src/routes/sessions.ts` | `durable-session-proxy/src/routes/sessions.ts` | 136 | -| `apps/streams/src/routes/messages.ts` | `durable-session-proxy/src/routes/messages.ts` | 99 | -| `apps/streams/src/routes/agents.ts` | `durable-session-proxy/src/routes/agents.ts` | 58 | -| `apps/streams/src/routes/stream.ts` | `durable-session-proxy/src/routes/stream.ts` | 162 | -| `apps/streams/src/routes/tool-results.ts` | `durable-session-proxy/src/routes/tool-results.ts` | 57 | -| `apps/streams/src/routes/approvals.ts` | `durable-session-proxy/src/routes/approvals.ts` | 58 | -| `apps/streams/src/routes/health.ts` | `durable-session-proxy/src/routes/health.ts` | 51 | -| `apps/streams/src/routes/auth.ts` | `durable-session-proxy/src/routes/auth.ts` | 146 | -| `apps/streams/src/routes/fork.ts` | `durable-session-proxy/src/routes/fork.ts` | 50 | +| `apps/streams/src/server.ts` | `durable-session-proxy/src/server.ts` | ✅ | +| `apps/streams/src/protocol.ts` | `durable-session-proxy/src/protocol.ts` | ✅ (fixed: JSON.stringify for append) | +| `apps/streams/src/types.ts` | `durable-session-proxy/src/types.ts` | ✅ | +| `apps/streams/src/handlers/index.ts` | `durable-session-proxy/src/handlers/index.ts` | ✅ | +| `apps/streams/src/handlers/send-message.ts` | `durable-session-proxy/src/handlers/send-message.ts` | ✅ | +| `apps/streams/src/handlers/invoke-agent.ts` | `durable-session-proxy/src/handlers/invoke-agent.ts` | ✅ | +| `apps/streams/src/handlers/stream-writer.ts` | `durable-session-proxy/src/handlers/stream-writer.ts` | ✅ | +| `apps/streams/src/routes/index.ts` | `durable-session-proxy/src/routes/index.ts` | ✅ | +| `apps/streams/src/routes/sessions.ts` | `durable-session-proxy/src/routes/sessions.ts` | ✅ | +| `apps/streams/src/routes/messages.ts` | `durable-session-proxy/src/routes/messages.ts` | ✅ | +| `apps/streams/src/routes/agents.ts` | `durable-session-proxy/src/routes/agents.ts` | ✅ | +| `apps/streams/src/routes/stream.ts` | `durable-session-proxy/src/routes/stream.ts` | ✅ | +| `apps/streams/src/routes/tool-results.ts` | `durable-session-proxy/src/routes/tool-results.ts` | ✅ | +| `apps/streams/src/routes/approvals.ts` | `durable-session-proxy/src/routes/approvals.ts` | ✅ | +| `apps/streams/src/routes/health.ts` | `durable-session-proxy/src/routes/health.ts` | ✅ | +| `apps/streams/src/routes/auth.ts` | `durable-session-proxy/src/routes/auth.ts` | ✅ | +| `apps/streams/src/routes/fork.ts` | `durable-session-proxy/src/routes/fork.ts` | ✅ | ### Files DELETED ✅ @@ -929,24 +931,30 @@ All files below are created and typechecking. Compatibility fixes applied for un |---|---|---| | `packages/ai-chat/` (entire package) | Replaced by `@superset/durable-session` | ✅ Removed | -### Files to DELETE (remaining) +### Files DELETED (Phase A2) ✅ -| File | Reason | -|---|---| -| `apps/streams/src/session-registry.ts` | Replaced by proxy's built-in session management (Phase A2) | +| File | Reason | Status | +|---|---|---| +| `apps/streams/src/session-registry.ts` | Replaced by proxy's built-in session management | ✅ Removed | + +### Files REWRITTEN (Phase A2) ✅ + +| File | Description | Status | +|---|---|---| +| `apps/streams/src/index.ts` | New entrypoint with Hono proxy + DurableStreamTestServer | ✅ | ### Files to REWRITE (remaining) | File | Description | |---|---| -| `apps/streams/src/index.ts` | New entrypoint with Hono proxy + DurableStreamTestServer (Phase A2) | | `apps/desktop/.../session-manager.ts` | Thin HTTP orchestrator (no StreamWatcher/Producer) (Phase C2) | -### Files to MODIFY (remaining) +### Files MODIFIED (Phase A2) ✅ -| File | Changes | -|---|---| -| `apps/streams/package.json` | Add: hono, @hono/node-server, @durable-streams/client, @superset/durable-session (Phase A2) | +| File | Changes | Status | +|---|---|---| +| `apps/streams/package.json` | Added: hono, @hono/node-server, @durable-streams/client, @superset/durable-session, @tanstack/db, zod | ✅ | +| `packages/durable-session/src/client.ts` | Fixed: `response.json()` return type assertion for `ForkResult` | ✅ | --- @@ -954,7 +962,7 @@ All files below are created and typechecking. Compatibility fixes applied for un 1. ~~**Phase A1** — Vendor `@superset/durable-session` package~~ ✅ DONE 2. ~~**Phase C1** — Remove old `packages/ai-chat`, migrate UI components~~ ✅ DONE -3. **Phase A2** — Vendor proxy into `apps/streams` (copy 17 files, adjust 3 import paths) ← NEXT +3. ~~**Phase A2** — Vendor proxy into `apps/streams` (copy 17 files, adjust 3 import paths)~~ ✅ DONE 4. **Phase B** — Claude agent endpoint + SDK-to-AI chunk converter (2 new files) 5. **Phase C2** — Simplify desktop session manager 6. **Phase C3** — Handle drafts @@ -975,7 +983,7 @@ All files below are created and typechecking. Compatibility fixes applied for un | Claude binary path outside Electron | Agent can't start | `CLAUDE_BINARY_PATH` env var set by desktop at streams startup | Pending | | Multi-turn resume state lost on restart | Context lost | In-memory map + optional file-based persistence in data dir | Pending | | Interrupt via HTTP abort | Claude subprocess continues | Agent detects fetch abort → calls `query.interrupt()` + `abortController.abort()` | Pending | -| Proxy `workspace:*` TanStack DB deps | Import errors | Pin all `@tanstack/*` to compatible published versions across monorepo | Pending (Phase A2) | +| Proxy `workspace:*` TanStack DB deps | Import errors | Pin all `@tanstack/*` to compatible published versions across monorepo | ✅ Resolved — imports changed to `@superset/durable-session`, `DurableStream.append()` wrapped with `JSON.stringify()` | --- diff --git a/packages/durable-session/src/client.ts b/packages/durable-session/src/client.ts index c074c0e3db2..4a715be31d1 100644 --- a/packages/durable-session/src/client.ts +++ b/packages/durable-session/src/client.ts @@ -654,7 +654,7 @@ export class DurableChatClient< ); } - return await response.json(); + return (await response.json()) as ForkResult; } /** From 11986c35c98d95c1e3c2b7b583a34f609be5fff1 Mon Sep 17 00:00:00 2001 From: Kiet Ho Date: Thu, 5 Feb 2026 16:10:20 -0800 Subject: [PATCH 2/4] Fix minor issues --- apps/streams/package.json | 2 +- apps/streams/src/handlers/send-message.ts | 6 +++++- apps/streams/src/index.ts | 2 +- apps/streams/src/routes/auth.ts | 4 ++-- apps/streams/src/routes/messages.ts | 7 ++++++- 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/apps/streams/package.json b/apps/streams/package.json index 5758514b14e..512bcc2ca5a 100644 --- a/apps/streams/package.json +++ b/apps/streams/package.json @@ -17,12 +17,12 @@ "@durable-streams/server": "^0.2.0", "@superset/durable-session": "workspace:*", "@tanstack/db": "^0.5.22", + "@hono/node-server": "^1.13.0", "hono": "^4.4.0", "zod": "^4.1.12" }, "devDependencies": { "@durable-streams/server-conformance-tests": "^0.2.0", - "@hono/node-server": "^1.13.0", "@superset/typescript": "workspace:*", "@types/node": "^24.9.1", "fast-check": "^4.5.3", diff --git a/apps/streams/src/handlers/send-message.ts b/apps/streams/src/handlers/send-message.ts index 1f9208b4ffe..0e79844bb28 100644 --- a/apps/streams/src/handlers/send-message.ts +++ b/apps/streams/src/handlers/send-message.ts @@ -42,7 +42,11 @@ export async function handleSendMessage( if (body.agent) { const messageHistory = await protocol.getMessageHistory(sessionId); - protocol.invokeAgent(stream, sessionId, body.agent, messageHistory); + protocol + .invokeAgent(stream, sessionId, body.agent, messageHistory) + .catch((err) => { + console.error("[streams/send-message] Agent invocation failed:", err); + }); } const response: SendMessageResponse = { messageId }; diff --git a/apps/streams/src/index.ts b/apps/streams/src/index.ts index a51b7cf12f5..affce4c3a31 100644 --- a/apps/streams/src/index.ts +++ b/apps/streams/src/index.ts @@ -15,7 +15,7 @@ await durableStreamServer.start(); console.log(`[streams] Durable stream server on port ${INTERNAL_PORT}`); // Start proxy server -const { app, protocol } = createServer({ +const { app } = createServer({ baseUrl: DURABLE_STREAMS_URL, cors: true, logging: true, diff --git a/apps/streams/src/routes/auth.ts b/apps/streams/src/routes/auth.ts index 4a932334b0f..7b59ad10655 100644 --- a/apps/streams/src/routes/auth.ts +++ b/apps/streams/src/routes/auth.ts @@ -66,7 +66,7 @@ export function createAuthRoutes(protocol: AIDBSessionProtocol) { return c.json({ error: "actorId is required" }, 400); } - if (!deviceId && !allDevices) { + if (!allDevices && !deviceId) { return c.json({ error: "deviceId or allDevices is required" }, 400); } @@ -103,7 +103,7 @@ export function createAuthRoutes(protocol: AIDBSessionProtocol) { stream, sessionId, actorId, - deviceId!, + deviceId as string, "user", "offline", ); diff --git a/apps/streams/src/routes/messages.ts b/apps/streams/src/routes/messages.ts index 98c5e1f0891..ed958e5a6b2 100644 --- a/apps/streams/src/routes/messages.ts +++ b/apps/streams/src/routes/messages.ts @@ -35,7 +35,12 @@ export function createMessageRoutes(protocol: AIDBSessionProtocol) { }, ]; - await protocol.invokeAgent(stream, sessionId, agents[0]!, messageHistory); + const agent = agents[0]; + if (!agent) { + return c.json({ error: "No agents registered for regeneration" }, 400); + } + + await protocol.invokeAgent(stream, sessionId, agent, messageHistory); return c.json({ success: true }, 200); } catch (error) { From e4624fd4d124c31498eee702df17092c33c7057d Mon Sep 17 00:00:00 2001 From: Kiet Ho Date: Thu, 5 Feb 2026 16:12:47 -0800 Subject: [PATCH 3/4] Add error logging to silent catch blocks in SSE chunk parsing --- apps/streams/src/protocol.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/apps/streams/src/protocol.ts b/apps/streams/src/protocol.ts index 2ff6b87d98a..750496c14c4 100644 --- a/apps/streams/src/protocol.ts +++ b/apps/streams/src/protocol.ts @@ -506,8 +506,12 @@ export class AIDBSessionProtocol { "assistant", chunk, ); - } catch { - // Skip malformed JSON + } catch (err) { + console.error( + "[streams/protocol] Malformed SSE chunk:", + data, + err, + ); } } } @@ -527,8 +531,8 @@ export class AIDBSessionProtocol { "assistant", chunk, ); - } catch { - // Skip malformed JSON + } catch (err) { + console.error("[streams/protocol] Malformed SSE chunk:", data, err); } } } From 9de10e5b47432fed3a506846d8539db937451113 Mon Sep 17 00:00:00 2001 From: Kiet Ho Date: Thu, 5 Feb 2026 16:17:50 -0800 Subject: [PATCH 4/4] docs: update ai-chat-plan to reflect PR review fixes --- docs/ai-chat-plan.md | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/docs/ai-chat-plan.md b/docs/ai-chat-plan.md index a5ae641b479..a459352f357 100644 --- a/docs/ai-chat-plan.md +++ b/docs/ai-chat-plan.md @@ -111,7 +111,7 @@ The agent endpoint converts these to TanStack AI `StreamChunk` format before wri | Auth (buildClaudeEnv) | DONE — `apps/desktop/src/lib/trpc/routers/ai-chat/utils/auth/auth.ts` | | Session manager (v1) | DONE — `apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/session-manager.ts` | | Desktop tRPC router | DONE — `apps/desktop/src/lib/trpc/routers/ai-chat/index.ts` | -| Durable stream server (v1) | DONE — `apps/streams/` (custom HTTP proxy + session registry) | +| Durable stream server (v1) | DONE — `apps/streams/` (vendored proxy from electric-sql/transport) | | Vendored durable-session client | DONE — `packages/durable-session/` (vendored from electric-sql/transport) | | React hook (useDurableChat) | DONE — `packages/durable-session/src/react/use-durable-chat.ts` | | ChatInput component | DONE — `packages/durable-session/src/react/components/ChatInput/` | @@ -369,7 +369,7 @@ await durableStreamServer.start() console.log(`[streams] Durable stream server on port ${INTERNAL_PORT}`) // Start proxy server -const { app, protocol } = createServer({ +const { app } = createServer({ baseUrl: DURABLE_STREAMS_URL, cors: true, logging: true, @@ -406,13 +406,11 @@ The `AIDBSessionProtocol` class manages: "dependencies": { "@durable-streams/server": "^0.2.0", "@durable-streams/client": "^0.2.0", + "@hono/node-server": "^1.13.0", "@superset/durable-session": "workspace:*", "@tanstack/db": "^0.5.22", "hono": "^4.4.0", "zod": "^4.1.12" - }, - "devDependencies": { - "@hono/node-server": "^1.13.0" } } ``` @@ -837,7 +835,7 @@ Web uses same `useDurableChat` hook pointing at deployed proxy URL. | `@tanstack/db-ivm` | ^0.1.17 | durable-session (incremental view maintenance) | | `@standard-schema/spec` | ^1.0.0 | durable-session (schema validation) | | `hono` | ^4.4.0 | apps/streams (proxy HTTP framework) | -| `@hono/node-server` | ^1.13.0 | apps/streams (dev server) | +| `@hono/node-server` | ^1.13.0 | apps/streams (Hono Node.js HTTP adapter) | **Already installed:** - `@durable-streams/client` ^0.2.0, `@durable-streams/server` ^0.2.0, `@durable-streams/state` ^0.2.0