feat: add streams server (Hono + durable-streams) #1203
Add apps/streams with:
- Hono HTTP server for AI chat streaming
- Session registry for managing Claude agent sessions
- Dockerfile and fly.toml for deployment
- Architecture documentation
📝 Walkthrough

Adds a new Streams application: service code (HTTP facade + internal Durable Streams server), a file-backed in-memory session registry, TypeScript project and package, Docker and Fly deployment configs, architecture docs, .gitignore updates, and integrates the workspace into the monorepo dev script.
Sequence Diagram

sequenceDiagram
    autonumber
    actor Client
    participant HTTP as HTTP Server (Public Facade)
    participant Registry as Session Registry (In-Memory + File)
    participant Durable as Durable Streams Server (Internal)
    Client->>HTTP: POST /sessions (sessionId, title, createdBy?)
    HTTP->>Registry: register(sessionInfo)
    Registry->>Registry: add entry & persist to sessions.json
    Registry-->>HTTP: SessionInfo (with createdAt)
    HTTP-->>Client: 201 Created
    Client->>HTTP: GET /sessions/:id
    HTTP->>Registry: get(sessionId)
    Registry-->>HTTP: SessionInfo | undefined
    HTTP-->>Client: 200 OK or 404
    Client->>HTTP: Any other request (stream operations)
    HTTP->>Durable: proxy request (method, headers, body)
    Durable-->>HTTP: proxied response/stream
    HTTP-->>Client: proxied response
Estimated Code Review Effort: 🎯 3 (Moderate) | ⏱️ ~35 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 6
🤖 Fix all issues with AI agents
In `@apps/streams/Dockerfile`:
- Around line 6-8: The Dockerfile currently runs npm install after copying
package.json and bun.lock (see the COPY package.json bun.lock ./ and RUN npm
install lines); replace both RUN npm install occurrences with RUN bun install
--frozen-lockfile so Bun is used and the bun.lock is honored for deterministic,
reproducible builds.
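A minimal sketch of the suggested change, assuming a single-stage build (the base image tag and entrypoint path here are assumptions, not taken from the actual Dockerfile):

```dockerfile
FROM oven/bun:1
WORKDIR /app

# Copy the lockfile alongside package.json so installs are deterministic
COPY package.json bun.lock ./

# Honor bun.lock exactly; the install fails if the lockfile is out of date
RUN bun install --frozen-lockfile

COPY . .
CMD ["bun", "run", "src/index.ts"]
```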
In `@apps/streams/package.json`:
- Around line 18-27: The apps/streams package.json lists "@superset/typescript":
"workspace:*" which will fail when building the Docker image from the
apps/streams context because the workspace root and tooling package aren’t
available; either replace "@superset/typescript":"workspace:*" with a published
semver version that npm can resolve during isolated npm install, or change the
Docker build to run from the repo root (or copy the workspace root package.json
and the tooling/typescript package into the build context before running npm
install) so the workspace reference can be resolved.
In `@apps/streams/src/index.ts`:
- Around line 166-175: The durableServer.start() promise and server.listen()
call lack error handling; add a .catch on durableServer.start() to log the error
(using console.error or your logger), perform graceful shutdown/exit, and ensure
server.listen has an 'error' handler (e.g., server.on('error', ...)) to handle
EADDRINUSE and other listen failures, logging the error and exiting; update
references in this block around durableServer.start and server.listen so
failures don't produce unhandled rejections or leave the process in an unclear
state.
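One shape this fix could take, sketched as a small helper; `guardStartup` and its parameter names are hypothetical, assuming `durableServer.start()` returns a `Promise<void>` and `server` is a Node `http.Server`:

```typescript
import http from "node:http";

// Hypothetical helper: route both async startup failures and
// server "listen" errors (e.g. EADDRINUSE) to one fatal handler.
function guardStartup({
  startPromise,
  server,
  onFatal,
}: {
  startPromise: Promise<void>;
  server: http.Server;
  onFatal: (err: Error) => void;
}): void {
  // A rejected start() becomes a handled, fatal error instead of an
  // unhandled rejection
  startPromise.catch((err) => onFatal(err));
  // listen() failures are emitted as "error" events on the server
  server.on("error", (err) => onFatal(err));
}

// In index.ts this would wrap the existing startup, roughly:
// guardStartup({
//   startPromise: durableServer.start(),
//   server,
//   onFatal: (err) => {
//     console.error("[streams] fatal startup error:", err);
//     process.exit(1);
//   },
// });
```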
- Around line 16-19: The code currently uses magic literals and doesn't validate
PORT/internalPort; extract constants (e.g., DEFAULT_PORT = 8080,
INTERNAL_PORT_OFFSET = 1, MAX_PORT = 65535, DEFAULT_DATA_DIR = "./data") at
module top, parse PORT into an integer for variable port (use Number.parseInt)
and explicitly check that parsing succeeded (NaN) and that port is within
1..MAX_PORT and that internalPort = port + INTERNAL_PORT_OFFSET <= MAX_PORT; if
validation fails, log a clear error and exit/throw to prevent startup with
invalid ports; update references to dataDir, port, and internalPort to use the
new constants and validated values.
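As a sketch, the parsing and validation described above could look like this (constant names mirror the suggestion; the `parsePort` helper itself is hypothetical):

```typescript
const DEFAULT_PORT = 8080;
const INTERNAL_PORT_OFFSET = 1;
const MAX_PORT = 65535;

// Parse PORT from the environment and validate both the public and
// internal ports; throw instead of starting with a bad configuration.
function parsePort(raw: string | undefined): {
  port: number;
  internalPort: number;
} {
  const port = raw === undefined ? DEFAULT_PORT : Number.parseInt(raw, 10);
  if (Number.isNaN(port) || port < 1 || port > MAX_PORT) {
    throw new Error(`Invalid PORT: ${raw}`);
  }
  const internalPort = port + INTERNAL_PORT_OFFSET;
  if (internalPort > MAX_PORT) {
    throw new Error(`PORT ${port} leaves no room for the internal port`);
  }
  return { port, internalPort };
}
```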
- Around line 61-65: The request body is currently unbounded via readBody which
risks memory DoS; update the call in the POST handler to pass a max size (e.g.,
maxBytes) and change readBody signature to accept an options object (e.g.,
readBody({ req, maxBytes })) so it enforces the limit and throws/returns a
specific error when exceeded; catch that error in the POST branch and respond
with status 413 Payload Too Large. Reference the readBody function and the POST
handler where readBody is invoked to implement these changes.
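One way the bounded `readBody` could be written; this is a sketch, and the real function's stream handling and error type may differ:

```typescript
import { Readable } from "node:stream";

// Read a request body into a string, rejecting once it exceeds maxBytes.
async function readBody({
  req,
  maxBytes,
}: {
  req: Readable;
  maxBytes: number;
}): Promise<string> {
  const chunks: Buffer[] = [];
  let total = 0;
  for await (const chunk of req) {
    const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
    total += buf.length;
    if (total > maxBytes) {
      // The POST handler maps this to a 413 Payload Too Large response
      throw new Error("PayloadTooLarge");
    }
    chunks.push(buf);
  }
  return Buffer.concat(chunks).toString("utf8");
}
```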
In `@apps/streams/src/session-registry.ts`:
- Around line 61-92: The sync file-IO in persist() and load() blocks the event
loop; change them to async using fs/promises (replace
dirname/existsSync/mkdirSync/writeFileSync/renameSync/readFileSync with their
async counterparts) and serialize persist calls with a simple write queue (e.g.,
a private Promise chain or mutex field like pendingPersistPromise) so multiple
register() calls don't race; also extract hardcoded strings (tmp suffix, log
prefix like "[session-registry]") and this.filePath usage into module-level
constants and update load() to be async (or provide an async init that awaits
load()) so all file operations are non-blocking and crash-safe.
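The write-queue idea above can be sketched as a private promise chain. The injected `writeFn` is for illustration only; in session-registry.ts its body would do the tmp-file write plus rename via fs/promises:

```typescript
// Serialize persist() calls so concurrent register() calls never
// interleave their file writes; each write waits for the previous one.
class PersistQueue {
  private pending: Promise<void> = Promise.resolve();

  constructor(private writeFn: (snapshot: string) => Promise<void>) {}

  persist(snapshot: string): Promise<void> {
    // Chain onto the previous write; swallow its error so one failed
    // write does not poison every later persist call.
    this.pending = this.pending
      .catch(() => {})
      .then(() => this.writeFn(snapshot));
    return this.pending;
  }
}
```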
🧹 Nitpick comments (2)
apps/streams/src/index.ts (2)
61-83: Use Zod validation for the /sessions POST body.
JSON.parse + presence checks won't guard against incorrect types or unexpected shapes. A small Zod schema gives consistent validation and clearer errors.

As per coding guidelines: Use Zod schemas for validating tRPC inputs and API route bodies at boundaries; Validate external API data as untrusted by handling missing fields, unknown enums, and unexpected shapes with tolerant parsing and explicit fallbacks.

✅ Example with Zod

+import { z } from "zod";
+
+const SessionCreateSchema = z.object({
+  sessionId: z.string().min(1),
+  title: z.string().min(1),
+  createdBy: z.string().optional(),
+});
 ...
-      const { sessionId, title, createdBy } = JSON.parse(body);
+      const parsed = SessionCreateSchema.safeParse(JSON.parse(body));
+      if (!parsed.success) {
+        res.writeHead(400, { "Content-Type": "application/json" });
+        res.end(JSON.stringify({ error: "Invalid request body" }));
+        return;
+      }
+      const { sessionId, title, createdBy } = parsed.data;
121-123: Use an options object and add a proxy timeout.

Positional params violate the object-param guideline, and missing timeouts can hang requests indefinitely if the internal server stalls.

As per coding guidelines: Use object parameters for functions with 2 or more parameters instead of positional arguments.

⏱️ Suggested update

+const PROXY_TIMEOUT_MS = 10_000;
 ...
-  proxyToDurableStreams(req, res, url);
+  proxyToDurableStreams({ req, res, url });
 ...
-function proxyToDurableStreams(
-  req: import("node:http").IncomingMessage,
-  res: import("node:http").ServerResponse,
-  url: URL,
-) {
+type ProxyArgs = {
+  req: import("node:http").IncomingMessage;
+  res: import("node:http").ServerResponse;
+  url: URL;
+};
+
+function proxyToDurableStreams({ req, res, url }: ProxyArgs) {
   const proxyReq = httpRequest(
     {
       hostname: "127.0.0.1",
       port: internalPort,
       path: url.pathname + url.search,
       method: req.method,
       headers: req.headers,
     },
     (proxyRes) => {
       // Forward status and headers
       res.writeHead(proxyRes.statusCode || 500, proxyRes.headers);
       proxyRes.pipe(res);
     },
   );
+  proxyReq.setTimeout(PROXY_TIMEOUT_MS, () => {
+    proxyReq.destroy(new Error("Proxy timeout"));
+  });

Also applies to: 134-164
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@apps/streams/ARCHITECTURE.md`:
- Around line 5-53: The markdown code fences in ARCHITECTURE.md lack language
identifiers and trigger markdownlint MD040; update every triple-backtick fenced
block (the ASCII diagrams/code blocks shown throughout the file) to include a
neutral language tag such as "text" (i.e., change ``` to ```text for each fenced
block including the diagram at the top and the other blocks referenced),
ensuring all occurrences (including the blocks around lines 57-95, 99-118,
122-139) are updated so lint passes while rendering remains unchanged.
🧹 Nitpick comments (2)
apps/streams/src/index.ts (2)
121-123: Use an object parameter for proxyToDurableStreams.

This function takes 3 parameters; prefer a single options object to avoid positional ambiguity.

As per coding guidelines, Use object parameters for functions with 2 or more parameters instead of positional arguments.

✅ Targeted refactor

-  proxyToDurableStreams(req, res, url);
+  proxyToDurableStreams({ req, res, url });
 ...
-function proxyToDurableStreams(
-  req: import("node:http").IncomingMessage,
-  res: import("node:http").ServerResponse,
-  url: URL,
-) {
+function proxyToDurableStreams({
+  req,
+  res,
+  url,
+}: {
+  req: import("node:http").IncomingMessage;
+  res: import("node:http").ServerResponse;
+  url: URL;
+}) {

Also applies to: 134-138
9-15: Add Zod schema validation for /sessions POST payloads and include zod dependency.Manual validation at lines 67–73 only checks for truthiness and doesn't validate types or reject unexpected shapes. The /sessions endpoint is a boundary and should validate payloads with a Zod schema per coding guidelines.
First, add zod to apps/streams/package.json dependencies, then implement schema validation:
✅ Implementation
+import { z } from "zod";
+
+const sessionPayloadSchema = z.object({
+  sessionId: z.string().min(1),
+  title: z.string().min(1),
+  createdBy: z.string().optional(),
+});
 ...
   if (req.method === "POST") {
     // Register a new session
     try {
       const body = await readBody(req);
-      const { sessionId, title, createdBy } = JSON.parse(body);
-
-      if (!sessionId || !title) {
-        res.writeHead(400, { "Content-Type": "application/json" });
-        res.end(
-          JSON.stringify({ error: "sessionId and title are required" }),
-        );
-        return;
-      }
+      const parsed = sessionPayloadSchema.safeParse(JSON.parse(body));
+      if (!parsed.success) {
+        res.writeHead(400, { "Content-Type": "application/json" });
+        res.end(JSON.stringify({ error: "Invalid session payload" }));
+        return;
+      }
+      const { sessionId, title, createdBy } = parsed.data;
```text
┌─────────────────────────────────────────────────────────┐
│               Clients (packages/ai-chat)                │
│                                                         │
│  ┌─────────────────────────┐  ┌───────────────────────┐ │
│  │ @durable-streams/client │  │ @durable-streams/state│ │
│  │                         │  │                       │ │
│  │  DurableStream          │  │  State Protocol       │ │
│  │    .create(url)         │  │        │              │ │
│  │    .append(data)        │  │        ▼ TanStack DB  │ │
│  │    .read(offset?)       │  │  Reactive Collections │ │
│  │    .subscribe(cb)       │  │                       │ │
│  └────────────┬────────────┘  └───────────┬───────────┘ │
└───────────────┼───────────────────────────┼─────────────┘
                │ HTTP + SSE                │ HTTP + SSE
                ▼                           ▼
┌─────────────────────────────────────────────────────────┐
│                  Server (apps/streams)                  │
│                                                         │
│  ┌───────────────────────────────────────────────────┐  │
│  │              @durable-streams/server              │  │
│  │                                                   │  │
│  │  DurableStreamTestServer (port, host, dataDir)    │  │
│  │                                                   │  │
│  │  ┌─────────────────────────────────────────────┐  │  │
│  │  │                HTTP Protocol                │  │  │
│  │  │                                             │  │  │
│  │  │  PUT    /streams/:id  ─ Create stream       │  │  │
│  │  │  POST   /streams/:id  ─ Append data         │  │  │
│  │  │  GET    /streams/:id  ─ Read / SSE          │  │  │
│  │  │  HEAD   /streams/:id  ─ Metadata            │  │  │
│  │  │  DELETE /streams/:id  ─ Delete              │  │  │
│  │  └──────────────────┬──────────────────────────┘  │  │
│  │                     │                             │  │
│  │                     ▼                             │  │
│  │            FileBackedStreamStore                  │  │
│  └─────────────────────┬─────────────────────────────┘  │
└────────────────────────┼────────────────────────────────┘
                    ┌────┴─────┐
                    ▼          ▼
┌─────────────────────────────────────────────────────────┐
│                    Storage (./data)                     │
│                                                         │
│  ┌──────────────────┐  ┌────────────────────────────┐   │
│  │       LMDB       │  │     Append-Only Logs       │   │
│  │  Metadata Index  │  │        Stream Data         │   │
│  └──────────────────┘  └────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
```
Add language identifiers to fenced code blocks to satisfy markdownlint.
markdownlint MD040 is triggered because the fences don’t specify a language. Using a neutral language like text keeps rendering the same while passing lint.
✅ Minimal fix
-```
+```text
...
-```
+```text
...
-```
+```text
...
-```
+```text

Also applies to: 57-95, 99-118, 122-139
🧰 Tools
🪛 markdownlint-cli2 (0.20.0)
[warning] 5-5: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@apps/streams/src/index.ts`:
- Around line 83-96: The handler that reads body via readBody must enforce that
sessionId and title are strings before calling registry.register to avoid
non-string Map keys; validate typeof sessionId === "string" and typeof title ===
"string" (and reject with 400 JSON like you already do) and sanitize createdBy
by allowing only a string (trim it or set undefined if missing/non-string) prior
to calling registry.register; update the block around readBody/JSON.parse and
the call to registry.register so only validated/coerced string values are passed
to registry.register, keeping the same response patterns (res.writeHead/res.end)
for errors and success.
- Around line 50-51: The code calls new URL(req.url || "/",
`http://${req.headers.host}`) which throws if req.headers.host is
missing/malformed; fix by deriving a safeHost (e.g., check typeof
req.headers.host === "string" && non-empty, else use a default like "localhost")
and then create the URL inside a try/catch that falls back to new URL(req.url ||
"/", "http://localhost") on error; update the variable construction where `url`
is defined in apps/streams/src/index.ts to use safeHost and the try/catch
fallback so request handling cannot crash due to a bad Host header.
🧹 Nitpick comments (1)
apps/streams/src/index.ts (1)
145-147: Use an options object for the proxy helper.

This keeps the helper consistent with the codebase guideline for 2+ params and makes call sites clearer.

♻️ Proposed refactor

-  proxyToDurableStreams(req, res, url);
+  proxyToDurableStreams({ req, res, url });

-function proxyToDurableStreams(
-  req: import("node:http").IncomingMessage,
-  res: import("node:http").ServerResponse,
-  url: URL,
-) {
+function proxyToDurableStreams({
+  req,
+  res,
+  url,
+}: {
+  req: import("node:http").IncomingMessage;
+  res: import("node:http").ServerResponse;
+  url: URL;
+}) {

As per coding guidelines: Use object parameters for functions with 2 or more parameters instead of positional arguments.
Also applies to: 201-205
const url = new URL(req.url || "/", `http://${req.headers.host}`);
🧩 Analysis chain
🏁 Script executed:

# First, check if the file exists and read the relevant section
head -60 apps/streams/src/index.ts | tail -20

Repository: superset-sh/superset
Length of output: 687

🏁 Script executed:

# Get more context around lines 50-51
sed -n '40,60p' apps/streams/src/index.ts

Repository: superset-sh/superset
Length of output: 688

🏁 Script executed:

# Check the entire file to understand the request object type
cat -n apps/streams/src/index.ts | head -70

Repository: superset-sh/superset
Length of output: 2634

🏁 Script executed:

# Search for uses of the url variable after its creation
sed -n '50,150p' apps/streams/src/index.ts

Repository: superset-sh/superset
Length of output: 3043

🏁 Script executed:

# Check if there are any tests or error handling examples
rg -i "host.*header|new URL" apps/streams/src/ -A 2 -B 2

Repository: superset-sh/superset
Length of output: 381

🏁 Script executed:

# Look at the entire request handler to understand context
sed -n '48,100p' apps/streams/src/index.ts

Repository: superset-sh/superset
Length of output: 1709

🏁 Script executed:

# Search for the proxyToDurableStreams function definition
rg -n "proxyToDurableStreams" apps/streams/src/index.ts -A 15

Repository: superset-sh/superset
Length of output: 951

🏁 Script executed:

# Also check imports and the full file structure
grep -n "proxyToDurableStreams\|function\|const.*=" apps/streams/src/index.ts | head -30

Repository: superset-sh/superset
Length of output: 1509
Guard against missing/invalid Host header when building the URL.
new URL() will throw if req.headers.host is absent or malformed, which can crash request handling for non-standard clients. A safe fallback keeps the handler resilient.
🔧 Proposed fix
- const url = new URL(req.url || "/", `http://${req.headers.host}`);
+ const host = req.headers.host ?? "127.0.0.1";
+ const url = new URL(req.url || "/", `http://${host}`);
const body = await readBody({ req, maxBytes: MAX_BODY_BYTES });
const { sessionId, title, createdBy } = JSON.parse(body);

if (!sessionId || !title) {
  res.writeHead(400, { "Content-Type": "application/json" });
  res.end(
    JSON.stringify({ error: "sessionId and title are required" }),
  );
  return;
}

const session = registry.register({ sessionId, title, createdBy });
res.writeHead(201, { "Content-Type": "application/json" });
res.end(JSON.stringify(session));
Validate sessionId/title types before registering.
Non‑string values can be stored as Map keys and will never be retrievable via /sessions/:id (string), causing false 404s. Enforce string types (and sanitize optional createdBy) before registration.
🔧 Proposed fix
- const { sessionId, title, createdBy } = JSON.parse(body);
+ const { sessionId, title, createdBy } = JSON.parse(body);
- if (!sessionId || !title) {
+ if (
+ typeof sessionId !== "string" ||
+ typeof title !== "string" ||
+ !sessionId.trim() ||
+ !title.trim()
+ ) {
res.writeHead(400, { "Content-Type": "application/json" });
res.end(
JSON.stringify({ error: "sessionId and title are required" }),
);
return;
}
- const session = registry.register({ sessionId, title, createdBy });
+ const session = registry.register({
+ sessionId,
+ title,
+ createdBy: typeof createdBy === "string" ? createdBy : undefined,
+ });

As per coding guidelines: Validate external API data as untrusted by handling missing fields, unknown enums, and unexpected shapes with tolerant parsing and explicit fallbacks.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
const body = await readBody({ req, maxBytes: MAX_BODY_BYTES });
const { sessionId, title, createdBy } = JSON.parse(body);

if (
  typeof sessionId !== "string" ||
  typeof title !== "string" ||
  !sessionId.trim() ||
  !title.trim()
) {
  res.writeHead(400, { "Content-Type": "application/json" });
  res.end(
    JSON.stringify({ error: "sessionId and title are required" }),
  );
  return;
}

const session = registry.register({
  sessionId,
  title,
  createdBy: typeof createdBy === "string" ? createdBy : undefined,
});
res.writeHead(201, { "Content-Type": "application/json" });
res.end(JSON.stringify(session));
Summary

- apps/streams — Hono HTTP server for AI chat streaming
- @superset/streams added to the root dev script

Context

Split from the ai-chat branch. Independent — can merge in parallel with PR #1.

Test plan

- bun dev starts the streams server

Summary by CodeRabbit
New Features
Documentation
Chores