Skip to content

feat(streams): vendor durable-session-proxy into apps/streams#1242

Merged
Kitenite merged 4 commits into
mainfrom
kitenite/ai-chat-ui
Feb 6, 2026
Merged

feat(streams): vendor durable-session-proxy into apps/streams#1242
Kitenite merged 4 commits into
mainfrom
kitenite/ai-chat-ui

Conversation

@Kitenite
Copy link
Copy Markdown
Collaborator

@Kitenite Kitenite commented Feb 5, 2026

Summary

  • Vendors the Hono-based proxy server from electric-sql/transport durable-session-proxy into apps/streams/src/, replacing the old custom HTTP proxy + session registry
  • The proxy handles session management, agent webhook invocation, message routing, stream fan-out (SSE), tool results, approvals, and session forking via the STATE-PROTOCOL on top of Durable Streams
  • Adds compatibility fix for DurableStream.append() which only accepts string | Uint8Array in the published npm version (source repo passes ChangeEvent objects directly via workspace:*)

Changes

  • 17 new files vendored from durable-session-proxy with @electric-sql/durable-session@superset/durable-session import changes
  • Replaced apps/streams/src/index.ts — new Hono entrypoint with DurableStreamTestServer on internal port
  • Deleted apps/streams/src/session-registry.ts — replaced by proxy's built-in session management
  • Updated apps/streams/package.json — added hono, @hono/node-server, @durable-streams/client, @superset/durable-session, @tanstack/db, zod
  • Fixed packages/durable-session/src/client.tsForkResult type assertion on response.json()

Phase A2 of docs/ai-chat-plan.md

Test plan

  • cd apps/streams && bunx tsc --noEmit — 0 errors
  • bun run lint:fix — 0 errors, 3 warnings (non-null assertions from upstream patterns)
  • bun dev in apps/streams starts Hono proxy on :8080 + DurableStreamTestServer on :8081
  • curl http://localhost:8080/health returns { "status": "ok" }
  • curl -X PUT http://localhost:8080/v1/sessions/test-1 creates session

Summary by CodeRabbit

  • New Features

    • New comprehensive session API: agent management, auth (login/logout), messaging, tool results, approvals, forking, streaming proxy, and health endpoints.
  • Refactor

    • Rewrote server wiring to a Hono-based proxy with a reusable server factory and centralized session/protocol layer.
  • Removed

    • Removed the local in-memory session registry and file-based persistence.
  • Chores

    • Updated package/config and type surfaces to align with the new proxy and public durable-session package.

Vendor the Hono-based proxy server from electric-sql/transport into
apps/streams/src/ replacing the old custom HTTP proxy + session registry.

The proxy handles session management, agent webhook invocation, message
routing, stream fan-out, tool results, approvals, and session forking
via the STATE-PROTOCOL on top of Durable Streams.

Key changes:
- Copy 17 files from durable-session-proxy with @electric-sql@superset imports
- Replace index.ts entrypoint with Hono proxy + DurableStreamTestServer
- Delete old session-registry.ts
- Add hono, @hono/node-server, @durable-streams/client, @tanstack/db, zod deps
- Fix DurableStream.append() compatibility (JSON.stringify for ChangeEvent objects)
- Fix ForkResult type assertion in vendored client
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Feb 5, 2026

Caution

Review failed

The pull request is closed.

📝 Walkthrough

Walkthrough

Refactors apps/streams into a Hono-based proxy and server factory, adds AIDBSessionProtocol and typed schemas, introduces route and handler modules for sessions/messages/agents/auth/approvals/fork/tool-results/health/stream, removes the file-based SessionRegistry, and updates dependencies and package imports for durable-session and related libs.

Changes

Cohort / File(s) Summary
Package configuration
apps/streams/package.json
Expanded runtime deps (added @durable-streams/client, @superset/durable-session, @tanstack/db, hono, @hono/node-server, zod) and removed @durable-streams/client from devDependencies.
Entry & bootstrap
apps/streams/src/index.ts, apps/streams/src/server.ts
Added Hono-based server factory (createServer) and new public entrypoint wiring an internal DurableStreamTestServer + Hono proxy; replaced prior custom HTTP routing.
Protocol & types
apps/streams/src/protocol.ts, apps/streams/src/types.ts
New AIDBSessionProtocol implementing session lifecycle, DurableStream integration, agent invocation/streaming, active generation control, forking, message history; comprehensive Zod-backed types/schemas.
Handlers
apps/streams/src/handlers/...
apps/streams/src/handlers/index.ts
New handlers: handleSendMessage, handleInvokeAgent, register/unregister flows, StreamWriter utility and a re-export index for handlers/utilities.
Routes
apps/streams/src/routes/*, apps/streams/src/routes/index.ts
Added route factories wired to protocol: sessions, messages, agents, approvals, auth, fork, health, stream (proxy), tool-results; consolidated exports.
Removed registry
apps/streams/src/session-registry.ts
Deleted on-disk/in-memory SessionRegistry and SessionInfo types; persistence and registry APIs removed.
Client & docs
packages/durable-session/src/client.ts, docs/ai-chat-plan.md
Client: explicit cast of fork response to ForkResult. Docs: updated plan noting vendoring and import-path changes to @superset/durable-session.

Sequence Diagram

sequenceDiagram
    actor Client
    participant Hono as "Hono Server"
    participant Handler as "Route Handler"
    participant Protocol as "AIDBSessionProtocol"
    participant Stream as "DurableStream"
    participant Agent as "External Agent"

    Client->>Hono: POST /v1/sessions/:id/messages
    Hono->>Handler: handleSendMessage(context)
    Handler->>Protocol: getOrCreateSession(sessionId)
    Protocol->>Stream: create/retrieve stream
    Handler->>Protocol: writeUserMessage(message)
    Protocol->>Stream: append(chunk)
    alt agent present
        Handler->>Protocol: invokeAgent(sessionId, messages)
        Protocol->>Agent: HTTP POST (agent endpoint)
        Agent-->>Protocol: streaming response (SSE-like)
        loop per chunk
            Protocol->>Stream: writeChunk(parsed chunk)
        end
    end
    Handler-->>Client: 200 { messageId }
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

🐰
I hopped into Hono, swift and spry,
Streams of chunks beneath the sky.
Agents wake and messages play,
Sessions fork and bounce away.
A carrot cheer — code hops today! 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat(streams): vendor durable-session-proxy into apps/streams' accurately and concisely describes the main objective of this PR—vendoring an external proxy into the codebase.
Description check ✅ Passed The PR description provides a clear summary of changes, lists affected files, and includes a test plan with concrete verification steps. However, it lacks explicit 'Related Issues' and 'Screenshots' sections from the template, though these are optional contextual elements.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch kitenite/ai-chat-ui

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Feb 5, 2026

🧹 Preview Cleanup Complete

The following preview resources have been cleaned up:

  • ✅ Neon database branch
  • ⚠️ Electric Fly.io app

Thank you for your contribution! 🎉

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

🤖 Fix all issues with AI agents
In `@apps/streams/package.json`:
- Around line 15-26: The package.json currently lists "@hono/node-server" under
devDependencies but it's imported by production code (see src/index.ts) and
required at runtime; move "@hono/node-server": "^1.13.0" from devDependencies
into dependencies, removing it from devDependencies so it is installed for
production builds and runtime. Ensure the dependency appears alongside the other
runtime packages in the "dependencies" object and remove the duplicate entry
from "devDependencies".

In `@apps/streams/src/handlers/invoke-agent.ts`:
- Around line 81-100: handleUnregisterAgent currently calls
protocol.unregisterAgent without validating the session; mirror the pattern used
in handleInvokeAgent and handleRegisterAgents by checking the session first.
Call getOrCreateSession (or the appropriate session lookup helper) with the same
sessionId extraction used in handleUnregisterAgent to ensure the session exists,
return an appropriate error response (e.g., 404 or a JSON error) if the session
is missing, and only then call protocol.unregisterAgent(sessionId, agentId);
also log the missing-session case so it isn’t silently swallowed.

In `@apps/streams/src/handlers/send-message.ts`:
- Around line 43-46: The current fire-and-forget call to
protocol.invokeAgent(stream, sessionId, body.agent, messageHistory) can produce
unhandled promise rejections; update the send-message handler to handle errors
by either awaiting the call inside a try/catch (e.g., await
protocol.invokeAgent(...) with catch and processLogger.error) or keep it
non-blocking but append .catch(err => processLogger.error(...)) so failures are
logged; reference protocol.getMessageHistory and protocol.invokeAgent (and
variables stream, sessionId, body.agent) when you modify the code.

In `@apps/streams/src/protocol.ts`:
- Around line 110-133: The _clearPresence parameter is unused in resetSession;
either remove it or implement presence-clearing when true: if the codebase
tracks presence (e.g. a this.presences Map or presence info inside sessionStates
entries), clear that session's presence state when _clearPresence is true, and
also emit/append a control presence-clear event to the stream (similar to the
existing resetEvent) so other consumers know presence was removed; otherwise,
delete the unused _clearPresence parameter from the resetSession signature and
all callers. Update references to resetSession, sessionStates, streams,
messageSeqs, and updateLastActivity accordingly.
- Around line 499-511: The code currently swallows JSON.parse errors when
parsing StreamChunk in the try block before calling writeChunk (parsing into
StreamChunk and invoking this.writeChunk), which hides malformed agent
responses; modify the catch to log the parsing error (include the raw data and
the error message/stack) via the same logger used in this module (or
console.error if none) so debugging is possible, and apply the identical change
to the second identical catch that handles JSON.parse failures later (the other
catch following the JSON.parse/this.writeChunk sequence) so both malformed-JSON
cases are logged with context including sessionId, messageId, agentId and the
offending data.

In `@apps/streams/src/routes/agents.ts`:
- Around line 18-26: The catch block for protocol.getRegisteredAgents(sessionId)
currently logs errors without the required context prefix; update the
console.error call in the catch to include the “[streams/agents]” prefix (e.g.
console.error("[streams/agents] Failed to get agents:", error)) so logs follow
the consistent "[domain/operation]" pattern used across the codebase; ensure the
change is made in the catch handling around protocol.getRegisteredAgents and
keeping the existing c.json error response intact.

In `@apps/streams/src/routes/auth.ts`:
- Around line 11-71: Replace the ad-hoc body handling in the login and logout
handlers with Zod validation: add loginRequestSchema and logoutRequestSchema in
types.ts matching the expected shapes (login: { actorId: string, deviceId:
string, name?: string, defaultAgents?: AgentSpec[] }; logout: { actorId: string,
deviceId?: string, allDevices?: boolean }) and import them into the auth route;
in the POST "/:sessionId/login" handler call loginRequestSchema.parse(await
c.req.json()) and use the returned object instead of the current `as` cast
before calling protocol.getOrCreateSession and protocol.writePresence, and in
the POST "/:sessionId/logout" handler call logoutRequestSchema.parse(await
c.req.json()) instead of manual JSON.parse/rawBody, returning a 400 on ZodError
(or letting middleware map it) so invalid payloads are rejected with proper
validation errors.

In `@apps/streams/src/routes/messages.ts`:
- Around line 16-38: The route validates regenerateRequestSchema which includes
fromMessageId but never uses it; update the handler (around regenerate logic
using regenerateRequestSchema, sessionId, protocol.getOrCreateSession,
protocol.getRegisteredAgents, and protocol.invokeAgent) to, when
body.fromMessageId is present, fetch the original message and its preceding
context (e.g., via a protocol method like getMessageById/getMessage or
getMessageHistory for sessionId and fromMessageId), prepend that returned
context messages into messageHistory before pushing the new user content, and
then call protocol.invokeAgent with the composed messageHistory; if no such
protocol method exists, either implement one or remove fromMessageId from
regenerateRequestSchema and adjust validations accordingly.
- Around line 16-67: Split request validation and business logic error handling
in the POST "/:sessionId/regenerate" and "/:sessionId/stop" handlers: wrap
regenerateRequestSchema.parse(rawBody) and
stopGenerationRequestSchema.parse(rawBody) each in their own try-catch that
returns a 400 with validation details, and keep the remaining logic
(protocol.getOrCreateSession, protocol.getRegisteredAgents,
protocol.invokeAgent, protocol.stopGeneration) in a separate try-catch that
returns 500 on failure; also ensure regenerateRequestSchema's required field
fromMessageId is either used to build messageHistory (include prior messages
starting from fromMessageId) or the schema is adjusted if intentional, and
change all console.error calls in these handlers to prefix messages with
"[streams/messages]" (e.g., "[streams/messages] Failed to regenerate: ...") to
follow logging convention.

In `@apps/streams/src/routes/tool-results.ts`:
- Around line 11-39: The handler currently treats all errors as 500 and logs
without the required prefix; update the error handling around
toolResultRequestSchema.parse in the try/catch so that if the caught error is a
ZodError (from toolResultRequestSchema.parse) you return status 400 with a JSON
body indicating invalid input, otherwise return 500 for server errors; also
prepend “[streams/tool-results]” to the console.error message for all logged
errors (the catch around await protocol.writeToolResult /
toolResultRequestSchema.parse should check error instanceof ZodError and branch
accordingly).
🧹 Nitpick comments (18)
apps/streams/src/routes/health.ts (1)

13-25: Consider implementing actual health checks for readiness and liveness probes.

The current /ready and /live endpoints return hardcoded responses without checking actual system health. For production deployments with Kubernetes-style orchestration:

  • /ready should verify the service can handle requests (e.g., protocol initialized, backend reachable)
  • /live should verify the process is not deadlocked

The current implementation works but provides limited observability value.

💡 Example with dependency injection for readiness check
-export function createHealthRoutes() {
+export function createHealthRoutes(options?: { 
+	isReady?: () => boolean 
+}) {
 	const app = new Hono();

 	app.get("/", (c) => {
 		return c.json({
 			status: "ok",
 			timestamp: new Date().toISOString(),
 		});
 	});

 	app.get("/ready", (c) => {
+		const ready = options?.isReady?.() ?? true;
 		return c.json({
-			status: "ready",
+			status: ready ? "ready" : "not_ready",
 			timestamp: new Date().toISOString(),
-		});
+		}, ready ? 200 : 503);
 	});
apps/streams/src/handlers/stream-writer.ts (1)

14-28: Consider using object parameters for better clarity and maintainability.

Methods with 4+ positional parameters are prone to argument ordering errors. Per coding guidelines, functions with 2+ parameters should use object parameters.

♻️ Proposed refactor using object parameters
+interface WriteUserMessageParams {
+	messageId: string;
+	actorId: string;
+	content: string;
+	txid?: string;
+}
+
 async writeUserMessage(
-	messageId: string,
-	actorId: string,
-	content: string,
-	txid?: string,
+	params: WriteUserMessageParams,
 ): Promise<void> {
+	const { messageId, actorId, content, txid } = params;
 	await this.protocol.writeUserMessage(
 		this.stream,
 		this.sessionId,
 		messageId,
 		actorId,
 		content,
 		txid,
 	);
 }

Apply similar refactoring to writeChunk, writeToolResult, and writeApprovalResponse.

As per coding guidelines: "Use object parameters for functions with 2 or more parameters instead of positional arguments"

apps/streams/src/handlers/invoke-agent.ts (2)

28-33: Improve error handling to safely extract error details.

The cast (error as Error).message assumes the caught value is an Error instance. For robustness, handle unknown error types safely.

♻️ Safer error extraction pattern
+function getErrorMessage(error: unknown): string {
+	if (error instanceof Error) return error.message;
+	return String(error);
+}
+
 } catch (error) {
 	return c.json(
-		{ error: "Invalid request body", details: (error as Error).message },
+		{ error: "Invalid request body", details: getErrorMessage(error) },
 		400,
 	);
 }

Apply the same pattern to all catch blocks in lines 39-45, 72-78, and 91-99.


39-45: Use prefixed console logging with consistent context pattern.

Per coding guidelines, use [domain/operation] message format for logging.

♻️ Suggested logging format
 } catch (error) {
-	console.error("Failed to invoke agent:", error);
+	console.error("[handlers/invoke-agent] Failed to invoke agent:", error);
 	return c.json(

As per coding guidelines: "Use prefixed console logging with consistent context pattern: [domain/operation] message"

apps/streams/src/protocol.ts (2)

595-600: Fire-and-forget agent invocation loses error context.

The .catch() handler logs errors but doesn't track which agents failed or provide any retry mechanism. For reliability, consider tracking invocation failures per agent.

♻️ Suggested improvement for better observability
 if (shouldTrigger) {
-	this.invokeAgent(stream, sessionId, agent, messageHistory).catch(
-		(err) => {
-			console.error(`Failed to invoke agent ${agent.id}:`, err);
-		},
-	);
+	this.invokeAgent(stream, sessionId, agent, messageHistory).catch(
+		(err) => {
+			console.error(
+				`[Protocol/notifyRegisteredAgents] Failed to invoke agent ${agent.id} for session ${sessionId}:`,
+				err,
+			);
+			// Consider: emit event, increment metric, or store failure state
+		},
+	);
 }

732-736: TODO indicates incomplete fork functionality.

The fork operation doesn't copy stream data to the new session as noted by the TODO. This means forked sessions start empty rather than inheriting message history.

Would you like me to help implement the stream data copying logic, or should this be tracked as a separate issue?

apps/streams/src/routes/sessions.ts (2)

21-21: Prefix console logs with domain/operation context.

The console.error calls lack the [domain/operation] prefix pattern recommended by coding guidelines. This makes log aggregation and filtering harder in production.

Suggested improvement
-			console.error("Failed to create session:", error);
+			console.error("[sessions/create] Failed to create session:", error);
-			console.error("Failed to get session:", error);
+			console.error("[sessions/get] Failed to get session:", error);
-			console.error("Failed to delete session:", error);
+			console.error("[sessions/delete] Failed to delete session:", error);
-			console.error("Failed to reset session:", error);
+			console.error("[sessions/reset] Failed to reset session:", error);

As per coding guidelines: "Use prefixed console logging with consistent context pattern: [domain/operation] message"

Also applies to: 47-47, 62-62, 93-93


73-103: Use Zod schema for reset endpoint body validation.

The reset endpoint manually parses JSON and extracts clearPresence (lines 78-83) without schema validation. Since types.ts already defines Zod schemas for other request types, consider adding one for reset to maintain consistency and get proper validation.

Additionally, detecting "session not found" via string matching on error messages (line 95) is fragile and could break if the upstream error message changes.

Suggested improvement
+import { z } from "zod";
+
+const resetSessionRequestSchema = z.object({
+	clearPresence: z.boolean().optional().default(false),
+});
+
 app.post("/:sessionId/reset", async (c) => {
   const sessionId = c.req.param("sessionId");

   try {
-    let clearPresence = false;
-    try {
-      const body = await c.req.json();
-      clearPresence = body?.clearPresence === true;
-    } catch {
-      // No body or invalid JSON - use defaults
-    }
+    const rawBody = await c.req.json().catch(() => ({}));
+    const { clearPresence } = resetSessionRequestSchema.parse(rawBody);

     await protocol.resetSession(sessionId, clearPresence);

As per coding guidelines: "Use Zod schemas for validating tRPC inputs and API route bodies at boundaries"

apps/streams/src/types.ts (1)

172-175: StreamChunk type is loosely defined.

The StreamChunk interface uses an index signature [key: string]: unknown which provides minimal type safety. Since this represents TanStack AI-compatible chunks, consider importing or defining the specific chunk types for better type checking.

apps/streams/src/routes/fork.ts (2)

27-32: Add domain prefix to error logging and consider distinguishing error types.

The error logging lacks the [domain/operation] prefix. Additionally, all errors return 500, but a "session not found" case might warrant a 404 response for better client handling.

Suggested improvement
 		} catch (error) {
-			console.error("Failed to fork session:", error);
+			console.error("[fork/create] Failed to fork session:", error);
+
+			if ((error as Error).message?.includes("not found")) {
+				return c.json({ error: "Session not found" }, 404);
+			}
+
 			return c.json(
 				{ error: "Failed to fork session", details: (error as Error).message },
 				500,
 			);
 		}

As per coding guidelines: "Use prefixed console logging with consistent context pattern: [domain/operation] message"


11-19: Zod validation catches parse errors but returns generic 500.

When forkSessionRequestSchema.parse(rawBody) throws a ZodError for invalid input, the catch block returns a 500 error. Consider catching Zod validation errors separately to return a 400 Bad Request with validation details.

Suggested improvement
+import { z } from "zod";
+
 app.post("/:sessionId/fork", async (c) => {
   const sessionId = c.req.param("sessionId");

+  let body;
   try {
     const rawBody = await c.req.json();
-    const body = forkSessionRequestSchema.parse(rawBody);
+    body = forkSessionRequestSchema.parse(rawBody);
+  } catch (error) {
+    if (error instanceof z.ZodError) {
+      return c.json({ error: "Invalid request body", details: error.errors }, 400);
+    }
+    return c.json({ error: "Invalid JSON" }, 400);
+  }

+  try {
     const result = await protocol.forkSession(
apps/streams/src/handlers/send-message.ts (2)

51-51: Add domain prefix to error logging.

Suggested improvement
-		console.error("Failed to send message:", error);
+		console.error("[messages/send] Failed to send message:", error);

As per coding guidelines: "Use prefixed console logging with consistent context pattern: [domain/operation] message"


34-41: Consider using object parameter for writeUserMessage.

The protocol.writeUserMessage() call uses 6 positional arguments which can be error-prone and hard to read. The coding guidelines recommend object parameters for functions with 2 or more parameters.

This is vendored code, so changing the protocol interface may be out of scope for this PR.

apps/streams/src/routes/approvals.ts (2)

12-14: Separate validation errors from runtime errors.

Similar to other routes, Zod validation errors from approvalResponseRequestSchema.parse() will be caught and returned as 500 errors. Validation failures should return 400 Bad Request.

Suggested improvement
+import { z } from "zod";
+
 app.post("/:sessionId/approvals/:approvalId", async (c) => {
   const sessionId = c.req.param("sessionId");
   const approvalId = c.req.param("approvalId");

+  let body;
   try {
     const rawBody = await c.req.json();
-    const body = approvalResponseRequestSchema.parse(rawBody);
+    body = approvalResponseRequestSchema.parse(rawBody);
+  } catch (error) {
+    if (error instanceof z.ZodError) {
+      return c.json({ error: "Invalid request body", details: error.errors }, 400);
+    }
+    return c.json({ error: "Invalid JSON" }, 400);
+  }

+  try {
     const actorId = c.req.header("X-Actor-Id") ?? crypto.randomUUID();

Also applies to: 30-38


31-31: Add domain prefix to error logging.

Suggested improvement
-			console.error("Failed to respond to approval:", error);
+			console.error("[approvals/respond] Failed to respond to approval:", error);

As per coding guidelines: "Use prefixed console logging with consistent context pattern: [domain/operation] message"

apps/streams/src/routes/stream.ts (3)

20-25: Remove unused _HEADERS_TO_STRIP constant.

This constant is defined but never used in the file. Even with the underscore prefix convention for intentionally unused variables, dead code should be removed for clarity.

Suggested fix
-const _HEADERS_TO_STRIP = [
-	"content-encoding",
-	"content-length",
-	"transfer-encoding",
-	"connection",
-] as const;

70-71: Status type cast may not cover all upstream status codes.

The cast upstreamResponse.status as 400 | 500 is incomplete. The upstream server could return other 4xx or 5xx status codes (401, 403, 502, 503, etc.). Consider using a more permissive type or explicitly mapping known statuses.

Suggested improvement
 				return c.json(
 					{
 						error: "Upstream error",
 						status: upstreamResponse.status,
 						details: errorText,
 					},
-					upstreamResponse.status as 400 | 500,
+					upstreamResponse.status as Parameters<typeof c.json>[1],
 				);

104-104: Add domain prefix to error logging.

Suggested improvement
-			console.error("Stream proxy error:", error);
+			console.error("[stream/proxy] Stream proxy error:", error);

As per coding guidelines: "Use prefixed console logging with consistent context pattern: [domain/operation] message"

Comment thread apps/streams/package.json
Comment on lines +81 to +100
export async function handleUnregisterAgent(
c: Context,
protocol: AIDBSessionProtocol,
): Promise<Response> {
const sessionId = c.req.param("sessionId");
const agentId = c.req.param("agentId");

try {
await protocol.unregisterAgent(sessionId, agentId);
return new Response(null, { status: 204 });
} catch (error) {
console.error("Failed to unregister agent:", error);
return c.json(
{
error: "Failed to unregister agent",
details: (error as Error).message,
},
500,
);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find unregisterAgent implementation in protocol
find . -name "*.ts" -type f | xargs grep -l "unregisterAgent" | head -10

Repository: superset-sh/superset

Length of output: 282


🏁 Script executed:

#!/bin/bash
# Look for the invoke-agent.ts file and examine the surrounding context
fd "invoke-agent.ts" --type f

Repository: superset-sh/superset

Length of output: 105


🏁 Script executed:

#!/bin/bash
# Search for getOrCreateSession usage in the handlers directory
rg "getOrCreateSession" --type ts

Repository: superset-sh/superset

Length of output: 910


🏁 Script executed:

#!/bin/bash
# Read the protocol.ts file to examine unregisterAgent and getOrCreateSession implementations
wc -l apps/streams/src/protocol.ts

Repository: superset-sh/superset

Length of output: 96


🏁 Script executed:

#!/bin/bash
# Read the invoke-agent.ts file to examine all handlers
wc -l apps/streams/src/handlers/invoke-agent.ts

Repository: superset-sh/superset

Length of output: 109


🏁 Script executed:

#!/bin/bash
# Read the entire invoke-agent.ts to see all handlers and patterns
cat -n apps/streams/src/handlers/invoke-agent.ts

Repository: superset-sh/superset

Length of output: 3266


🏁 Script executed:

#!/bin/bash
# Read the unregisterAgent and getOrCreateSession methods from protocol.ts
# First, let's find their line numbers
rg -n "async unregisterAgent|async getOrCreateSession" apps/streams/src/protocol.ts

Repository: superset-sh/superset

Length of output: 173


🏁 Script executed:

#!/bin/bash
# Read the unregisterAgent implementation from protocol.ts
sed -n '563,600p' apps/streams/src/protocol.ts

Repository: superset-sh/superset

Length of output: 1201


🏁 Script executed:

#!/bin/bash
# Also read getOrCreateSession to understand what it does
sed -n '82,120p' apps/streams/src/protocol.ts

Repository: superset-sh/superset

Length of output: 1091


Add session validation before unregistering agent to match the pattern in other handlers and avoid silent failures.

handleUnregisterAgent does not call getOrCreateSession first, unlike handleInvokeAgent (line 36) and handleRegisterAgents (line 69). The unregisterAgent implementation silently succeeds when the session doesn't exist—it returns without error or logging when the session state is not found. This violates the guideline to never swallow errors silently.

Either validate the session exists before unregistering (consistent with other handlers) or explicitly handle and log the missing session case.

🤖 Prompt for AI Agents
In `@apps/streams/src/handlers/invoke-agent.ts` around lines 81 - 100,
handleUnregisterAgent currently calls protocol.unregisterAgent without
validating the session; mirror the pattern used in handleInvokeAgent and
handleRegisterAgents by checking the session first. Call getOrCreateSession (or
the appropriate session lookup helper) with the same sessionId extraction used
in handleUnregisterAgent to ensure the session exists, return an appropriate
error response (e.g., 404 or a JSON error) if the session is missing, and only
then call protocol.unregisterAgent(sessionId, agentId); also log the
missing-session case so it isn’t silently swallowed.

Comment thread apps/streams/src/handlers/send-message.ts
Comment thread apps/streams/src/protocol.ts
Comment on lines +720 to +730
await this.createSession(targetSessionId);

const sourceState = this.sessionStates.get(sessionId);
if (sourceState) {
this.sessionStates.set(targetSessionId, {
...sourceState,
createdAt: new Date().toISOString(),
lastActivityAt: new Date().toISOString(),
activeGenerations: [],
});
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: Session fork incorrectly shares mutable state references.

The forked session copies sourceState by spreading, but sessionDB, messages, and modelMessages are object references that will be shared between the original and forked sessions. Mutations in one session will corrupt the other.

🐛 Analysis of the bug
// Current code spreads sourceState, but these are references:
this.sessionStates.set(targetSessionId, {
  ...sourceState,  // sessionDB, messages, modelMessages are SHARED
  // ...
});

The forked session needs its own independent SessionDB and derived collections. Since createSession at line 720 already initializes fresh state for targetSessionId, this spread operation at line 724 actually overwrites the newly created state with shared references from the source.

🔧 Proposed fix
 async forkSession(
 	sessionId: string,
 	_atMessageId: string | null,
 	newSessionId: string | null,
 ): Promise<{ sessionId: string; offset: string }> {
 	const targetSessionId = newSessionId ?? crypto.randomUUID();

 	const sourceStream = this.streams.get(sessionId);
 	if (!sourceStream) {
 		throw new Error(`Session ${sessionId} not found`);
 	}

+	// createSession already initializes fresh sessionDB and collections
 	await this.createSession(targetSessionId);

-	const sourceState = this.sessionStates.get(sessionId);
-	if (sourceState) {
-		this.sessionStates.set(targetSessionId, {
-			...sourceState,
-			createdAt: new Date().toISOString(),
-			lastActivityAt: new Date().toISOString(),
-			activeGenerations: [],
-		});
-	}
+	// Copy agents from source to target (agents array is safe to copy)
+	const sourceState = this.sessionStates.get(sessionId);
+	const targetState = this.sessionStates.get(targetSessionId);
+	if (sourceState && targetState) {
+		targetState.agents = [...sourceState.agents];
+	}

 	// TODO: Copy stream data up to atMessageId
 	return {
 		sessionId: targetSessionId,
 		offset: "-1",
 	};
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await this.createSession(targetSessionId);
const sourceState = this.sessionStates.get(sessionId);
if (sourceState) {
this.sessionStates.set(targetSessionId, {
...sourceState,
createdAt: new Date().toISOString(),
lastActivityAt: new Date().toISOString(),
activeGenerations: [],
});
}
await this.createSession(targetSessionId);
// Copy agents from source to target (agents array is safe to copy)
const sourceState = this.sessionStates.get(sessionId);
const targetState = this.sessionStates.get(targetSessionId);
if (sourceState && targetState) {
targetState.agents = [...sourceState.agents];
}

Comment on lines +11 to +71
try {
const body = await c.req.json();
const { actorId, deviceId, name, defaultAgents } = body as {
actorId: string;
deviceId: string;
name?: string;
defaultAgents?: AgentSpec[];
};

if (!actorId || !deviceId) {
return c.json({ error: "actorId and deviceId are required" }, 400);
}

const stream = await protocol.getOrCreateSession(
sessionId,
defaultAgents,
);

await protocol.writePresence(
stream,
sessionId,
actorId,
deviceId,
"user",
"online",
name ?? actorId,
);

return c.json({ success: true, actorId, deviceId, status: "online" });
} catch (error) {
console.error("Failed to login:", error);
return c.json(
{ error: "Failed to login", details: (error as Error).message },
500,
);
}
});

app.post("/:sessionId/logout", async (c) => {
const sessionId = c.req.param("sessionId");

try {
const rawBody = await c.req.text();

let body: { actorId?: string; deviceId?: string; allDevices?: boolean };
try {
body = JSON.parse(rawBody);
} catch (parseError) {
console.error("[AUTH] Failed to parse logout body:", parseError);
return c.json({ error: "Invalid JSON body" }, 400);
}

const { actorId, deviceId, allDevices } = body;

if (!actorId) {
return c.json({ error: "actorId is required" }, 400);
}

if (!deviceId && !allDevices) {
return c.json({ error: "deviceId or allDevices is required" }, 400);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Check if Zod is available in the project
find . -name "package.json" -type f | head -5 | xargs grep -l "zod" 2>/dev/null | head -3

Repository: superset-sh/superset

Length of output: 141


🏁 Script executed:

# Look at the apps/streams directory structure
ls -la apps/streams/src/ 2>/dev/null || echo "Directory not found"

Repository: superset-sh/superset

Length of output: 533


🏁 Script executed:

# Check if types.ts exists and examine existing schemas
cat apps/streams/src/types.ts 2>/dev/null | head -50

Repository: superset-sh/superset

Length of output: 1251


🏁 Script executed:

# Examine the auth.ts file to understand the full context
head -100 apps/streams/src/routes/auth.ts 2>/dev/null

Repository: superset-sh/superset

Length of output: 2401


🏁 Script executed:

# Search for existing Zod schema patterns in the codebase
rg "schema\s*=\s*z\." --type ts --type tsx -A 2 | head -50

Repository: superset-sh/superset

Length of output: 91


🏁 Script executed:

# Search for existing Zod schema patterns in the codebase
rg "schema\s*=\s*z\." -A 2 | head -50

Repository: superset-sh/superset

Length of output: 3515


🏁 Script executed:

# Check the complete types.ts to see all schema patterns
cat apps/streams/src/types.ts

Repository: superset-sh/superset

Length of output: 5275


🏁 Script executed:

# Verify if loginRequestSchema and logoutRequestSchema are already defined
rg "loginRequestSchema|logoutRequestSchema" apps/streams/

Repository: superset-sh/superset

Length of output: 46


🏁 Script executed:

# Check the rest of auth.ts to see the complete file
tail -50 apps/streams/src/routes/auth.ts

Repository: superset-sh/superset

Length of output: 989


Use Zod schemas for login/logout request bodies instead of as casting and manual parsing.

The login handler bypasses validation with as casting, and the logout handler uses manual JSON parsing without Zod validation. Per coding guidelines, API route bodies should validate at the boundary using Zod schemas. Add loginRequestSchema and logoutRequestSchema to apps/streams/src/types.ts (following the pattern of existing schemas like sendMessageRequestSchema), then parse incoming payloads with .parse().

🛠️ Proposed fix
-import type { AgentSpec } from "../types";
+import { loginRequestSchema, logoutRequestSchema } from "../types";
@@
-			const body = await c.req.json();
-			const { actorId, deviceId, name, defaultAgents } = body as {
-				actorId: string;
-				deviceId: string;
-				name?: string;
-				defaultAgents?: AgentSpec[];
-			};
+			const rawBody = await c.req.json();
+			const { actorId, deviceId, name, defaultAgents } =
+				loginRequestSchema.parse(rawBody);
@@
-			const { actorId, deviceId, allDevices } = body;
+			const { actorId, deviceId, allDevices } =
+				logoutRequestSchema.parse(body);
🤖 Prompt for AI Agents
In `@apps/streams/src/routes/auth.ts` around lines 11 - 71, Replace the ad-hoc
body handling in the login and logout handlers with Zod validation: add
loginRequestSchema and logoutRequestSchema in types.ts matching the expected
shapes (login: { actorId: string, deviceId: string, name?: string,
defaultAgents?: AgentSpec[] }; logout: { actorId: string, deviceId?: string,
allDevices?: boolean }) and import them into the auth route; in the POST
"/:sessionId/login" handler call loginRequestSchema.parse(await c.req.json())
and use the returned object instead of the current `as` cast before calling
protocol.getOrCreateSession and protocol.writePresence, and in the POST
"/:sessionId/logout" handler call logoutRequestSchema.parse(await c.req.json())
instead of manual JSON.parse/rawBody, returning a 400 on ZodError (or letting
middleware map it) so invalid payloads are rejected with proper validation
errors.

Comment on lines +29 to +115
await protocol.writePresence(
stream,
sessionId,
actorId,
deviceId,
"user",
"online",
name ?? actorId,
);

return c.json({ success: true, actorId, deviceId, status: "online" });
} catch (error) {
console.error("Failed to login:", error);
return c.json(
{ error: "Failed to login", details: (error as Error).message },
500,
);
}
});

app.post("/:sessionId/logout", async (c) => {
const sessionId = c.req.param("sessionId");

try {
const rawBody = await c.req.text();

let body: { actorId?: string; deviceId?: string; allDevices?: boolean };
try {
body = JSON.parse(rawBody);
} catch (parseError) {
console.error("[AUTH] Failed to parse logout body:", parseError);
return c.json({ error: "Invalid JSON body" }, 400);
}

const { actorId, deviceId, allDevices } = body;

if (!actorId) {
return c.json({ error: "actorId is required" }, 400);
}

if (!deviceId && !allDevices) {
return c.json({ error: "deviceId or allDevices is required" }, 400);
}

const stream = protocol.getSession(sessionId);
if (!stream) {
return c.json({ error: "Session not found" }, 404);
}

if (allDevices) {
const deviceIds = await protocol.getDeviceIdsForActor(
sessionId,
actorId,
);

for (const devId of deviceIds) {
await protocol.writePresence(
stream,
sessionId,
actorId,
devId,
"user",
"offline",
);
}

return c.json({
success: true,
actorId,
devicesLoggedOut: deviceIds.length,
status: "offline",
});
} else {
await protocol.writePresence(
stream,
sessionId,
actorId,
deviceId!,
"user",
"offline",
);

return c.json({ success: true, actorId, deviceId, status: "offline" });
}
} catch (error) {
console.error("[AUTH] Failed to logout:", error);
return c.json(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Normalize auth log prefixes and extract presence literals.

Use the [streams/auth] prefix for error logs and extract "user", "online", and "offline" to module-level constants to avoid magic strings drifting.

🛠️ Proposed fix
 import { Hono } from "hono";
 import type { AIDBSessionProtocol } from "../protocol";
-import type { AgentSpec } from "../types";
+
+const ACTOR_TYPE_USER = "user";
+const PRESENCE_ONLINE = "online";
+const PRESENCE_OFFLINE = "offline";
@@
-			await protocol.writePresence(
+			await protocol.writePresence(
 				stream,
 				sessionId,
 				actorId,
 				deviceId,
-				"user",
-				"online",
+				ACTOR_TYPE_USER,
+				PRESENCE_ONLINE,
 				name ?? actorId,
 			);
@@
-		} catch (error) {
-			console.error("Failed to login:", error);
+		} catch (error) {
+			console.error("[streams/auth] Failed to login:", error);
 			return c.json(
 				{ error: "Failed to login", details: (error as Error).message },
 				500,
 			);
 		}
@@
-			} catch (parseError) {
-				console.error("[AUTH] Failed to parse logout body:", parseError);
+			} catch (parseError) {
+				console.error("[streams/auth] Failed to parse logout body:", parseError);
 				return c.json({ error: "Invalid JSON body" }, 400);
 			}
@@
-					await protocol.writePresence(
+					await protocol.writePresence(
 						stream,
 						sessionId,
 						actorId,
 						devId,
-						"user",
-						"offline",
+						ACTOR_TYPE_USER,
+						PRESENCE_OFFLINE,
 					);
 				}
@@
-				await protocol.writePresence(
+				await protocol.writePresence(
 					stream,
 					sessionId,
 					actorId,
 					deviceId!,
-					"user",
-					"offline",
+					ACTOR_TYPE_USER,
+					PRESENCE_OFFLINE,
 				);
@@
-		} catch (error) {
-			console.error("[AUTH] Failed to logout:", error);
+		} catch (error) {
+			console.error("[streams/auth] Failed to logout:", error);
 			return c.json(
 				{ error: "Failed to logout", details: (error as Error).message },
 				500,
 			);
 		}

As per coding guidelines, "Use prefixed console logging with consistent context pattern: [domain/operation] message for entry/exit of significant operations, external API calls, and error conditions" and "Extract hardcoded magic numbers, strings, and enums to named constants at module top instead of leaving them inline in logic".

Comment thread apps/streams/src/routes/messages.ts Outdated
Comment on lines +16 to +38
try {
const rawBody = await c.req.json();
const body = regenerateRequestSchema.parse(rawBody);

const _actorId =
body.actorId ?? c.req.header("X-Actor-Id") ?? crypto.randomUUID();

const stream = await protocol.getOrCreateSession(sessionId);

const agents = await protocol.getRegisteredAgents(sessionId);

if (agents.length === 0) {
return c.json({ error: "No agents registered for regeneration" }, 400);
}

const messageHistory = [
{
role: "user",
content: body.content,
},
];

await protocol.invokeAgent(stream, sessionId, agents[0]!, messageHistory);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find the regenerateRequestSchema definition and understand its structure
rg -n 'regenerateRequestSchema' apps/streams/src -A 5

Repository: superset-sh/superset

Length of output: 1337


🏁 Script executed:

# Check how fromMessageId is used elsewhere in the codebase
rg -n 'fromMessageId' apps/streams/src -B 2 -A 2

Repository: superset-sh/superset

Length of output: 599


🏁 Script executed:

# Look at the protocol.invokeAgent signature to understand its parameters
rg -n 'invokeAgent' apps/streams/src -B 2 -A 3

Repository: superset-sh/superset

Length of output: 3528


Pass fromMessageId to fetch prior message context for regeneration.

The schema validates fromMessageId, but the route never uses it. Regeneration should fetch the original message and its context before invoking the agent, not just create a new conversation with only the new content. Either use fromMessageId to load and include prior context in messageHistory, or remove it from the schema if regeneration isn't scoped to a specific message.

🤖 Prompt for AI Agents
In `@apps/streams/src/routes/messages.ts` around lines 16 - 38, The route
validates regenerateRequestSchema which includes fromMessageId but never uses
it; update the handler (around regenerate logic using regenerateRequestSchema,
sessionId, protocol.getOrCreateSession, protocol.getRegisteredAgents, and
protocol.invokeAgent) to, when body.fromMessageId is present, fetch the original
message and its preceding context (e.g., via a protocol method like
getMessageById/getMessage or getMessageHistory for sessionId and fromMessageId),
prepend that returned context messages into messageHistory before pushing the
new user content, and then call protocol.invokeAgent with the composed
messageHistory; if no such protocol method exists, either implement one or
remove fromMessageId from regenerateRequestSchema and adjust validations
accordingly.

Comment on lines +16 to +67
try {
const rawBody = await c.req.json();
const body = regenerateRequestSchema.parse(rawBody);

const _actorId =
body.actorId ?? c.req.header("X-Actor-Id") ?? crypto.randomUUID();

const stream = await protocol.getOrCreateSession(sessionId);

const agents = await protocol.getRegisteredAgents(sessionId);

if (agents.length === 0) {
return c.json({ error: "No agents registered for regeneration" }, 400);
}

const messageHistory = [
{
role: "user",
content: body.content,
},
];

await protocol.invokeAgent(stream, sessionId, agents[0]!, messageHistory);

return c.json({ success: true }, 200);
} catch (error) {
console.error("Failed to regenerate:", error);
return c.json(
{ error: "Failed to regenerate", details: (error as Error).message },
500,
);
}
});

app.post("/:sessionId/stop", async (c) => {
const sessionId = c.req.param("sessionId");

try {
const rawBody = await c.req.json();
const body = stopGenerationRequestSchema.parse(rawBody);

await protocol.stopGeneration(sessionId, body.messageId ?? null);

return new Response(null, { status: 204 });
} catch (error) {
console.error("Failed to stop generation:", error);
return c.json(
{
error: "Failed to stop generation",
details: (error as Error).message,
},
500,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cd apps/streams && wc -l src/routes/messages.ts

Repository: superset-sh/superset

Length of output: 89


🏁 Script executed:

cd apps/streams && cat -n src/routes/messages.ts | head -100

Repository: superset-sh/superset

Length of output: 2478


🏁 Script executed:

# Check if Zod schemas are defined and what they validate
cd apps/streams && cat -n src/types.ts | head -50

Repository: superset-sh/superset

Length of output: 1601


🏁 Script executed:

# Search for error handling patterns in the codebase with Zod
rg "ZodError|instanceof.*Error" apps/streams/src --type ts -A 3 -B 1 | head -80

Repository: superset-sh/superset

Length of output: 46


🏁 Script executed:

# Check for console logging patterns with prefixes
rg "console\.(error|log|warn)" apps/streams/src --type ts -B 2 | head -60

Repository: superset-sh/superset

Length of output: 3265


🏁 Script executed:

cd apps/streams && cat -n src/types.ts | grep -A 20 "regenerateRequestSchema\|stopGenerationRequestSchema"

Repository: superset-sh/superset

Length of output: 1105


🏁 Script executed:

# Search for any existing Zod error handling patterns
rg "ZodError|\.parse\(" apps/streams/src --type ts -A 5 | head -100

Repository: superset-sh/superset

Length of output: 4518


🏁 Script executed:

# Check if other routes handle validation errors differently
rg "catch.*error|\.parse\(" apps/streams/src/routes --type ts -B 3 -A 8 | head -150

Repository: superset-sh/superset

Length of output: 8553


🏁 Script executed:

cd apps/streams && cat -n src/handlers/send-message.ts

Repository: superset-sh/superset

Length of output: 1846


🏁 Script executed:

cd apps/streams && cat -n src/handlers/invoke-agent.ts

Repository: superset-sh/superset

Length of output: 3266


Separate validation from business logic error handling and add context prefix to logs.

The handlers already demonstrate the correct pattern: wrap Zod parse() in its own try-catch to return 400 for validation errors, keeping business logic errors as 500. Also, regenerateRequestSchema requires fromMessageId, but the endpoint doesn't use it—clarify if this is intentional or should be included in messageHistory. Add the [streams/messages] prefix to console logs per the logging convention.

🛠️ Proposed fix
	app.post("/:sessionId/regenerate", async (c) => {
		const sessionId = c.req.param("sessionId");

+		let body: RegenerateRequest;
+		try {
+			const rawBody = await c.req.json();
+			body = regenerateRequestSchema.parse(rawBody);
+		} catch (error) {
+			return c.json(
+				{ error: "Invalid regenerate payload", details: (error as Error).message },
+				400,
+			);
+		}
+
		try {
-			const rawBody = await c.req.json();
-			const body = regenerateRequestSchema.parse(rawBody);

			const _actorId =
				body.actorId ?? c.req.header("X-Actor-Id") ?? crypto.randomUUID();

			const stream = await protocol.getOrCreateSession(sessionId);

			const agents = await protocol.getRegisteredAgents(sessionId);

			if (agents.length === 0) {
				return c.json({ error: "No agents registered for regeneration" }, 400);
			}

			const messageHistory = [
				{
					role: "user",
					content: body.content,
				},
			];

			await protocol.invokeAgent(stream, sessionId, agents[0]!, messageHistory);

			return c.json({ success: true }, 200);
		} catch (error) {
-			console.error("Failed to regenerate:", error);
+			console.error("[streams/messages] Failed to regenerate:", error);
			return c.json(
				{ error: "Failed to regenerate", details: (error as Error).message },
				500,
			);
		}
	});

	app.post("/:sessionId/stop", async (c) => {
		const sessionId = c.req.param("sessionId");

+		let body: StopGenerationRequest;
+		try {
+			const rawBody = await c.req.json();
+			body = stopGenerationRequestSchema.parse(rawBody);
+		} catch (error) {
+			return c.json(
+				{ error: "Invalid stop-generation payload", details: (error as Error).message },
+				400,
+			);
+		}
+
		try {
-			const rawBody = await c.req.json();
-			const body = stopGenerationRequestSchema.parse(rawBody);

			await protocol.stopGeneration(sessionId, body.messageId ?? null);

			return new Response(null, { status: 204 });
		} catch (error) {
-			console.error("Failed to stop generation:", error);
+			console.error("[streams/messages] Failed to stop generation:", error);
			return c.json(
				{
					error: "Failed to stop generation",
					details: (error as Error).message,
				},
				500,
			);
		}
	});

Note: The regenerateRequestSchema requires fromMessageId, but the handler doesn't use it. Verify if context from prior messages should be included in the invocation.

🤖 Prompt for AI Agents
In `@apps/streams/src/routes/messages.ts` around lines 16 - 67, Split request
validation and business logic error handling in the POST
"/:sessionId/regenerate" and "/:sessionId/stop" handlers: wrap
regenerateRequestSchema.parse(rawBody) and
stopGenerationRequestSchema.parse(rawBody) each in their own try-catch that
returns a 400 with validation details, and keep the remaining logic
(protocol.getOrCreateSession, protocol.getRegisteredAgents,
protocol.invokeAgent, protocol.stopGeneration) in a separate try-catch that
returns 500 on failure; also ensure regenerateRequestSchema's required field
fromMessageId is either used to build messageHistory (include prior messages
starting from fromMessageId) or the schema is adjusted if intentional, and
change all console.error calls in these handlers to prefix messages with
"[streams/messages]" (e.g., "[streams/messages] Failed to regenerate: ...") to
follow logging convention.

Comment on lines +11 to +39
try {
const rawBody = await c.req.json();
const body = toolResultRequestSchema.parse(rawBody);

const actorId = c.req.header("X-Actor-Id") ?? crypto.randomUUID();
const messageId = body.messageId ?? crypto.randomUUID();

const stream = await protocol.getOrCreateSession(sessionId);

await protocol.writeToolResult(
stream,
sessionId,
messageId,
actorId,
body.toolCallId,
body.output,
body.error ?? null,
body.txid,
);

return new Response(null, { status: 204 });
} catch (error) {
console.error("Failed to add tool result:", error);
return c.json(
{
error: "Failed to add tool result",
details: (error as Error).message,
},
500,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Find the file and examine its content
find . -name "tool-results.ts" -type f

Repository: superset-sh/superset

Length of output: 105


🏁 Script executed:

# Read the tool-results.ts file to inspect the actual code
cat -n apps/streams/src/routes/tool-results.ts

Repository: superset-sh/superset

Length of output: 1462


🏁 Script executed:

# Search for similar error handling patterns with ZodError in the codebase
rg "ZodError" --type ts --type tsx -A 3 -B 1

Repository: superset-sh/superset

Length of output: 91


🏁 Script executed:

# Search for console logging patterns with domain/operation prefix to verify consistency
rg "\[.*\/.*\]" --type ts --type tsx | grep -i "console\." | head -20

Repository: superset-sh/superset

Length of output: 144


🏁 Script executed:

# Check if toolResultRequestSchema is imported and used for validation
rg "toolResultRequestSchema" --type ts --type tsx -B 2 -A 2

Repository: superset-sh/superset

Length of output: 91


🏁 Script executed:

# Search for ZodError handling in TypeScript files (use -t ts only)
rg "ZodError" -t ts -A 3 -B 1

Repository: superset-sh/superset

Length of output: 546


🏁 Script executed:

# Search for console logging patterns with domain prefix (escape properly)
rg "console\.\(error\|log\|warn\).*\[" -t ts | head -20

Repository: superset-sh/superset

Length of output: 46


🏁 Script executed:

# Search for Zod parse patterns to see how errors are typically handled
rg "\.parse\(" -t ts -A 2 -B 1 | head -40

Repository: superset-sh/superset

Length of output: 2292


🏁 Script executed:

# Check if there are other routes in the streams directory for pattern consistency
find apps/streams/src/routes -name "*.ts" -type f -exec basename {} \;

Repository: superset-sh/superset

Length of output: 171


🏁 Script executed:

# Check error handling in other routes in apps/streams/src/routes
cat -n apps/streams/src/routes/messages.ts

Repository: superset-sh/superset

Length of output: 2478


🏁 Script executed:

# Check error handling in fork.ts
cat -n apps/streams/src/routes/fork.ts

Repository: superset-sh/superset

Length of output: 1254


🏁 Script executed:

# Look for console.error or console.log usage in the streams directory
rg "console\." apps/streams/src -t ts -B 2 -A 2

Repository: superset-sh/superset

Length of output: 7860


🏁 Script executed:

# Check if ZodError needs to be imported or if there's a better pattern
rg "instanceof.*Error" apps/streams/src -t ts -B 2 -A 2

Repository: superset-sh/superset

Length of output: 46


🏁 Script executed:

# Verify the toolResultRequestSchema to understand what it returns
cat -n apps/streams/src/types.ts | head -50

Repository: superset-sh/superset

Length of output: 1601


🏁 Script executed:

# Check how auth.ts implements the [AUTH] prefix to ensure consistency
cat -n apps/streams/src/routes/auth.ts | grep -A 15 "console.error"

Repository: superset-sh/superset

Length of output: 1377


Distinguish Zod validation errors from other failures and add the [streams/tool-results] prefix to error logs.

Zod parse failures should return 400 (invalid input), not 500, and console.error logs must include the [streams/tool-results] prefix for consistency with the coding guidelines.

🛠️ Proposed fix
 import { Hono } from "hono";
+import { ZodError } from "zod";
 import type { AIDBSessionProtocol } from "../protocol";
 import { toolResultRequestSchema } from "../types";

@@ -30,9 +31,16 @@ export function createToolResultRoutes(protocol: AIDBSessionProtocol) {
 
 			return new Response(null, { status: 204 });
 		} catch (error) {
-			console.error("Failed to add tool result:", error);
+			const isValidation = error instanceof ZodError;
+			const status = isValidation ? 400 : 500;
+			console.error("[streams/tool-results] Failed to add tool result:", error);
 			return c.json(
 				{
-					error: "Failed to add tool result",
+					error: isValidation
+						? "Invalid tool result payload"
+						: "Failed to add tool result",
 					details: (error as Error).message,
 				},
-				500,
+				status,
 			);
 		}
🤖 Prompt for AI Agents
In `@apps/streams/src/routes/tool-results.ts` around lines 11 - 39, The handler
currently treats all errors as 500 and logs without the required prefix; update
the error handling around toolResultRequestSchema.parse in the try/catch so that
if the caught error is a ZodError (from toolResultRequestSchema.parse) you
return status 400 with a JSON body indicating invalid input, otherwise return
500 for server errors; also prepend “[streams/tool-results]” to the
console.error message for all logged errors (the catch around await
protocol.writeToolResult / toolResultRequestSchema.parse should check error
instanceof ZodError and branch accordingly).

@Kitenite Kitenite merged commit 0551b4e into main Feb 6, 2026
11 of 12 checks passed
@Kitenite Kitenite deleted the kitenite/ai-chat-ui branch February 6, 2026 00:19
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🤖 Fix all issues with AI agents
In `@apps/streams/src/index.ts`:
- Around line 5-8: PORT and INTERNAL_PORT are parsed with parseInt without NaN
checks, which can yield NaN and break server startup; update the parsing logic
for PORT and INTERNAL_PORT (the constants PORT and INTERNAL_PORT in
apps/streams/src/index.ts) to validate the parseInt result (e.g., const parsed =
parseInt(..., 10); if (Number.isNaN(parsed)) fallback to the default numeric
value 8080/8081) and use the validated numeric variables when constructing
DURABLE_STREAMS_URL so it never embeds "NaN" — ensure the final
DURABLE_STREAMS_URL is built from the validated INTERNAL_PORT variable.

In `@apps/streams/src/protocol.ts`:
- Around line 579-582: The getRegisteredAgents method currently returns the
internal agents array reference (sessionStates.get(sessionId).agents), allowing
callers to mutate session state; change getRegisteredAgents to return a shallow
copy (e.g., [...state.agents] or state.agents.slice()) so callers get an
immutable copy; update the function in protocol.ts (getRegisteredAgents and any
related usages) to return the copy and keep sessionStates and AgentSpec
unchanged.
- Around line 110-133: In resetSession, calling this.messageSeqs.clear() wipes
sequence counters for every session; instead iterate the session's state (from
this.sessionStates.get(sessionId)) and delete only messageIds related to that
session's activeGenerations (e.g., for each generation in
state.activeGenerations use its messageId to
this.messageSeqs.delete(messageId)); remove or reset only per-session entries
and leave other sessions' messageSeqs intact, and ensure you still clear
state.activeGenerations after removing those specific message sequences.
- Around line 97-108: deleteSession currently cleans subscriptions and closes
session DB but doesn't abort any in-flight agent generation, so ongoing fetches
can still call streamAgentResponse.writeChunk on a removed stream; locate the
session state object in sessionStates (referenced in deleteSession) and, before
unsubscribing or deleting, call the generation abort mechanism on any active
generation (e.g., state.activeGenerationAbortController.abort() or similar field
used to cancel fetch/streamAgentResponse), then wait/ensure the generator
cleanup completes (or mark it cancelled) prior to calling
state.changeSubscription?.unsubscribe(), state.sessionDB.close(),
this.streams.delete(sessionId) and this.sessionStates.delete(sessionId).
- Around line 181-218: The code can invoke the same agent twice (once via
handleSendMessage -> invokeAgent and once via setupReactiveAgentTrigger ->
notifyRegisteredAgents) because deduplication only tracks message IDs; modify
the flow to deduplicate by agent+message: either (A) update
notifyRegisteredAgents to accept and check a per-message Set of invoked agent
IDs (or maintain a new map like activeAgentInvocations keyed by messageId ->
Set<agentId>) and skip invoking an agent already in that Set, or (B) when
handleSendMessage invokes protocol.invokeAgent, record that agentId under the
messageId in the existing activeGenerations/tracking structure so
setupReactiveAgentTrigger/notifyRegisteredAgents can consult it and avoid
duplicate invokes; reference functions/classes: handleSendMessage,
setupReactiveAgentTrigger, notifyRegisteredAgents, invokeAgent, and
activeGenerations/activeAgentInvocations. Ensure the chosen place for the check
is centralized (preferably inside notifyRegisteredAgents) to avoid duplicated
logic.
- Around line 400-415: The fetch call for sending the agent request lacks a hard
timeout; create a combined abort signal using
AbortSignal.any([abortController.signal, AbortSignal.timeout(<ms>)]) (use
agent.timeout if available or a sensible default e.g. 30_000) and pass that
combinedSignal to fetch instead of abortController.signal, and also pass the
same combinedSignal into streamAgentResponse; update references around the
existing abortController, the fetch call, and streamAgentResponse to use
combinedSignal so the request, generation slot, and stream are automatically
aborted on timeout.
🧹 Nitpick comments (4)
apps/streams/src/handlers/send-message.ts (1)

54-59: Add [streams/send-message] prefix to the error log for consistency.

Line 48 uses the [streams/send-message] prefix, but line 55 uses a bare "Failed to send message:". Keep the context prefix consistent. As per coding guidelines, use [domain/operation] prefixed console logging.

Proposed fix
-		console.error("Failed to send message:", error);
+		console.error("[streams/send-message] Failed to send message:", error);
apps/streams/src/protocol.ts (1)

235-263: Multiple methods use long positional parameter lists — prefer object parameters.

writeChunk takes 7+ positional args; writeUserMessage, writePresence, writeToolResult, and writeApprovalResponse are similar. This makes call sites fragile and hard to read. As per coding guidelines, "Use object parameters for functions with 2 or more parameters."

Since this is vendored code, this can be deferred, but consider refactoring as the protocol API stabilizes.

Example for writeChunk
-async writeChunk(
-	stream: DurableStream,
-	sessionId: string,
-	messageId: string,
-	actorId: string,
-	role: MessageRole,
-	chunk: StreamChunk,
-	txid?: string,
-): Promise<void> {
+async writeChunk(opts: {
+	stream: DurableStream;
+	sessionId: string;
+	messageId: string;
+	actorId: string;
+	role: MessageRole;
+	chunk: StreamChunk;
+	txid?: string;
+}): Promise<void> {
apps/streams/src/routes/messages.ts (1)

27-41: Redundant empty-agents check.

Lines 27–29 check agents.length === 0 and return 400. Lines 38–41 check !agent (i.e., agents[0] is falsy) and return the same 400. The second check is unreachable after the first passes. Remove one.

Proposed fix
-			if (agents.length === 0) {
-				return c.json({ error: "No agents registered for regeneration" }, 400);
-			}
-
-			const messageHistory = [
-				{
-					role: "user",
-					content: body.content,
-				},
-			];
-
-			const agent = agents[0];
-			if (!agent) {
+			const agent = agents[0];
+			if (!agent) {
 				return c.json({ error: "No agents registered for regeneration" }, 400);
 			}
+
+			const messageHistory = [
+				{
+					role: "user",
+					content: body.content,
+				},
+			];
apps/streams/src/index.ts (1)

29-32: Graceful shutdown is incomplete — only stops the durable stream server.

The SIGINT handler stops durableStreamServer but doesn't close the Hono HTTP server, so in-flight requests are dropped. Also, SIGTERM (the standard signal sent by container orchestrators) is not handled.

serve() from @hono/node-server returns a Server instance that can be closed.

Proposed fix
-serve({ fetch: app.fetch, port: PORT }, (info) => {
+const server = serve({ fetch: app.fetch, port: PORT }, (info) => {
 	console.log(`[streams] Proxy running on http://localhost:${info.port}`);
 });

-process.on("SIGINT", async () => {
+async function shutdown() {
+	console.log("[streams] Shutting down...");
+	server.close();
 	await durableStreamServer.stop();
 	process.exit(0);
-});
+}
+
+process.on("SIGINT", shutdown);
+process.on("SIGTERM", shutdown);

Comment thread apps/streams/src/index.ts
Comment on lines +5 to +8
const PORT = parseInt(process.env.PORT ?? "8080", 10);
const INTERNAL_PORT = parseInt(process.env.INTERNAL_PORT ?? "8081", 10);
const DURABLE_STREAMS_URL =
process.env.DURABLE_STREAMS_URL ?? `http://127.0.0.1:${INTERNAL_PORT}`;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

parseInt without NaN validation could produce undefined behavior.

If PORT or INTERNAL_PORT is set to a non-numeric string, parseInt returns NaN, which silently propagates to the server constructors. Add a guard.

Proposed fix
-const PORT = parseInt(process.env.PORT ?? "8080", 10);
-const INTERNAL_PORT = parseInt(process.env.INTERNAL_PORT ?? "8081", 10);
+const PORT = parseInt(process.env.PORT ?? "8080", 10);
+const INTERNAL_PORT = parseInt(process.env.INTERNAL_PORT ?? "8081", 10);
+
+if (Number.isNaN(PORT) || Number.isNaN(INTERNAL_PORT)) {
+	console.error("[streams] Invalid PORT or INTERNAL_PORT environment variable");
+	process.exit(1);
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const PORT = parseInt(process.env.PORT ?? "8080", 10);
const INTERNAL_PORT = parseInt(process.env.INTERNAL_PORT ?? "8081", 10);
const DURABLE_STREAMS_URL =
process.env.DURABLE_STREAMS_URL ?? `http://127.0.0.1:${INTERNAL_PORT}`;
const PORT = parseInt(process.env.PORT ?? "8080", 10);
const INTERNAL_PORT = parseInt(process.env.INTERNAL_PORT ?? "8081", 10);
if (Number.isNaN(PORT) || Number.isNaN(INTERNAL_PORT)) {
console.error("[streams] Invalid PORT or INTERNAL_PORT environment variable");
process.exit(1);
}
const DURABLE_STREAMS_URL =
process.env.DURABLE_STREAMS_URL ?? `http://127.0.0.1:${INTERNAL_PORT}`;
🤖 Prompt for AI Agents
In `@apps/streams/src/index.ts` around lines 5 - 8, PORT and INTERNAL_PORT are
parsed with parseInt without NaN checks, which can yield NaN and break server
startup; update the parsing logic for PORT and INTERNAL_PORT (the constants PORT
and INTERNAL_PORT in apps/streams/src/index.ts) to validate the parseInt result
(e.g., const parsed = parseInt(..., 10); if (Number.isNaN(parsed)) fallback to
the default numeric value 8080/8081) and use the validated numeric variables
when constructing DURABLE_STREAMS_URL so it never embeds "NaN" — ensure the
final DURABLE_STREAMS_URL is built from the validated INTERNAL_PORT variable.

Comment on lines +97 to +108
deleteSession(sessionId: string): void {
const state = this.sessionStates.get(sessionId);
if (state) {
// Unsubscribe from changes
state.changeSubscription?.unsubscribe();
// Close SessionDB to cleanup stream subscription
state.sessionDB.close();
}

this.streams.delete(sessionId);
this.sessionStates.delete(sessionId);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

deleteSession doesn't abort in-flight agent generations.

If an agent generation is active when the session is deleted, the fetch continues, and streamAgentResponse will attempt to call writeChunk on a stream that's been removed from this.streams. This will either throw or silently fail. Abort active generations before cleaning up.

Proposed fix
 	deleteSession(sessionId: string): void {
+		// Abort any in-flight generations first
+		this.stopGeneration(sessionId, null);
+
 		const state = this.sessionStates.get(sessionId);
 		if (state) {
 			state.changeSubscription?.unsubscribe();
 			state.sessionDB.close();
 		}

 		this.streams.delete(sessionId);
 		this.sessionStates.delete(sessionId);
 	}
🤖 Prompt for AI Agents
In `@apps/streams/src/protocol.ts` around lines 97 - 108, deleteSession currently
cleans subscriptions and closes session DB but doesn't abort any in-flight agent
generation, so ongoing fetches can still call streamAgentResponse.writeChunk on
a removed stream; locate the session state object in sessionStates (referenced
in deleteSession) and, before unsubscribing or deleting, call the generation
abort mechanism on any active generation (e.g.,
state.activeGenerationAbortController.abort() or similar field used to cancel
fetch/streamAgentResponse), then wait/ensure the generator cleanup completes (or
mark it cancelled) prior to calling state.changeSubscription?.unsubscribe(),
state.sessionDB.close(), this.streams.delete(sessionId) and
this.sessionStates.delete(sessionId).

Comment on lines +110 to +133
async resetSession(sessionId: string, _clearPresence = false): Promise<void> {
const stream = this.streams.get(sessionId);
if (!stream) {
throw new Error(`Session ${sessionId} not found`);
}

// Write control reset event to the stream
const resetEvent = {
headers: {
control: "reset" as const,
},
};

await stream.append(JSON.stringify(resetEvent));

// Clear in-memory state
this.messageSeqs.clear();
const state = this.sessionStates.get(sessionId);
if (state) {
state.activeGenerations = [];
}

this.updateLastActivity(sessionId);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

messageSeqs.clear() wipes sequence counters for ALL sessions, not just the reset one.

this.messageSeqs is a global map keyed by messageId. Calling .clear() on line 126 destroys counters for every in-flight message across all sessions. If another session has an active generation, its next chunk will restart at seq 0, producing duplicate keys and corrupting that session's stream.

Scope the cleanup to the session being reset.

🐛 Proposed fix — clear only message sequences belonging to this session
 		// Clear in-memory state
-		this.messageSeqs.clear();
+		// Only clear sequences for active generations in this session
 		const state = this.sessionStates.get(sessionId);
 		if (state) {
+			for (const msgId of state.activeGenerations) {
+				this.messageSeqs.delete(msgId);
+			}
 			state.activeGenerations = [];
 		}

Note: This only covers sequences tracked via activeGenerations. If user-message sequences also need cleanup, consider maintaining a per-session set of messageIds.

🤖 Prompt for AI Agents
In `@apps/streams/src/protocol.ts` around lines 110 - 133, In resetSession,
calling this.messageSeqs.clear() wipes sequence counters for every session;
instead iterate the session's state (from this.sessionStates.get(sessionId)) and
delete only messageIds related to that session's activeGenerations (e.g., for
each generation in state.activeGenerations use its messageId to
this.messageSeqs.delete(messageId)); remove or reset only per-session entries
and leave other sessions' messageSeqs intact, and ensure you still clear
state.activeGenerations after removing those specific message sequences.

Comment on lines +181 to +218
private setupReactiveAgentTrigger(sessionId: string): void {
const state = this.sessionStates.get(sessionId);
if (!state) return;

const stream = this.streams.get(sessionId);
if (!stream) return;

// Subscribe to changes in the modelMessages collection
// subscribeChanges() only fires for NEW changes (after subscription)
const subscription = state.modelMessages.subscribeChanges((changes) => {
for (const change of changes) {
if (change.type !== "insert") continue;

const message = change.value;
if (!message) continue;

if (message.role !== "user") continue;

this.getMessageHistory(sessionId)
.then((history) => {
this.notifyRegisteredAgents(
stream,
sessionId,
"user-messages",
history,
);
})
.catch((err) => {
console.error(
`[Protocol] Failed to get message history for agent trigger:`,
err,
);
});
}
});

state.changeSubscription = subscription;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

rg -n 'handleSendMessage|body\.agent|invokeAgent|notifyRegisteredAgents|registerAgent' apps/streams/src/protocol.ts --type ts -B2 -A2

Repository: superset-sh/superset

Length of output: 1284


🏁 Script executed:

# Also search broadly across the file to understand the agent flow
rg -n 'invokeAgent|notifyRegisteredAgents' apps/streams/src/protocol.ts --type ts -B3 -A5

Repository: superset-sh/superset

Length of output: 1160


🏁 Script executed:

# Check registerAgent and trigger configuration
fd . apps/streams/src --type f -name "*.ts" | xargs rg -l "registerAgent|triggers" | head -5

Repository: superset-sh/superset

Length of output: 507


🏁 Script executed:

rg -n 'handleSendMessage|send-message|body\.agent' apps/streams/src/ --type ts -B3 -A5

Repository: superset-sh/superset

Length of output: 3919


🏁 Script executed:

# Search more broadly for where body.agent is used
rg -n '\.agent' apps/streams/src/protocol.ts --type ts -B2 -A2 | head -60

Repository: superset-sh/superset

Length of output: 1348


🏁 Script executed:

# Check invokeAgent implementation for deduplication or generation tracking
sed -n '388,450p' apps/streams/src/protocol.ts | cat -n

Repository: superset-sh/superset

Length of output: 1895


🏁 Script executed:

# Check if there's any generation tracking or state management that would prevent double invocation
rg -n 'generation|activeGeneration|pending' apps/streams/src/protocol.ts --type ts | head -20

Repository: superset-sh/superset

Length of output: 440


🏁 Script executed:

# Check addActiveGeneration implementation
rg -n 'addActiveGeneration|removeActiveGeneration' apps/streams/src/protocol.ts --type ts -B2 -A4

Repository: superset-sh/superset

Length of output: 1086


🏁 Script executed:

# Check AgentSpec and trigger configuration types
rg -n 'AgentSpec|triggers' apps/streams/src/types.ts --type ts -B1 -A3

Repository: superset-sh/superset

Length of output: 801


Potential double agent invocation for user messages.

When handleSendMessage receives body.agent, it calls protocol.invokeAgent directly, creating a unique generation. Independently, setupReactiveAgentTrigger fires on every new user message insertion and calls notifyRegisteredAgents. If the same agent is both passed ad-hoc via the request body and registered with triggers: "user-messages" (the default), it will be invoked twice with separate message IDs, both executing independently.

The activeGenerations array only tracks message IDs, not agent invocations, so there is no deduplication mechanism. Consider either deduplicating in notifyRegisteredAgents by tracking which agents have already been invoked for a message, or documenting that callers should not pass an agent in the request body when the same agent is configured with reactive triggers.

🤖 Prompt for AI Agents
In `@apps/streams/src/protocol.ts` around lines 181 - 218, The code can invoke the
same agent twice (once via handleSendMessage -> invokeAgent and once via
setupReactiveAgentTrigger -> notifyRegisteredAgents) because deduplication only
tracks message IDs; modify the flow to deduplicate by agent+message: either (A)
update notifyRegisteredAgents to accept and check a per-message Set of invoked
agent IDs (or maintain a new map like activeAgentInvocations keyed by messageId
-> Set<agentId>) and skip invoking an agent already in that Set, or (B) when
handleSendMessage invokes protocol.invokeAgent, record that agentId under the
messageId in the existing activeGenerations/tracking structure so
setupReactiveAgentTrigger/notifyRegisteredAgents can consult it and avoid
duplicate invokes; reference functions/classes: handleSendMessage,
setupReactiveAgentTrigger, notifyRegisteredAgents, invokeAgent, and
activeGenerations/activeAgentInvocations. Ensure the chosen place for the check
is centralized (preferably inside notifyRegisteredAgents) to avoid duplicated
logic.

Comment on lines +400 to +415
try {
const requestBody = {
...agent.bodyTemplate,
messages: messageHistory,
stream: true,
};

const response = await fetch(agent.endpoint, {
method: agent.method ?? "POST",
headers: {
"Content-Type": "application/json",
...agent.headers,
},
body: JSON.stringify(requestBody),
signal: abortController.signal,
});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

AbortSignal.any() Node.js 20 support availability

💡 Result:

AbortSignal.any() is supported in Node.js 20 starting with Node v20.3.0 (so it works in 20.3.0 and later, e.g. all current Node 20 LTS patch releases). In Node 20.0.0–20.2.x, it is not available and you’d need a polyfill/workaround. [1], [2], [3]

Sources: Node.js API docs (“Added in: v20.3.0”) [1]; Node v20.3.0 release notes (“implement AbortSignal.any()”) [2]; MDN compatibility data (Node.js 20.3.0) [3].


🏁 Script executed:

cat apps/streams/src/protocol.ts | head -420 | tail -30

Repository: superset-sh/superset

Length of output: 861


🏁 Script executed:

cd apps/streams && (cat package.json | grep -A5 -B5 '"node"' || echo "not in streams package") && cd /
cat package.json | grep -A5 -B5 '"node"' || echo "checking root package.json"

Repository: superset-sh/superset

Length of output: 175


🌐 Web query:

AbortSignal.timeout() Node.js version support

💡 Result:

AbortSignal.timeout() is supported in Node.js starting with:

  • Node.js v17.3.0
  • Backported to Node.js v16.14.0 (so it’s available in Node 16.14+, including later LTS lines like 18/20/22/24) [1]

MDN’s compatibility data also lists Node.js 17.3.0+ for this API [2].

Sources: [1] [2]


🏁 Script executed:

find . -name ".nvmrc" -o -name ".node-version" -o -name "engines" 2>/dev/null | head -5
cat .nvmrc 2>/dev/null || cat .node-version 2>/dev/null || echo "No version files found"
git ls-files | grep -E "(package\.json|\.nvmrc|\.node-version)" | head -10

Repository: superset-sh/superset

Length of output: 317


🏁 Script executed:

cat package.json 2>/dev/null | jq '.engines' 2>/dev/null || grep -A2 '"engines"' package.json 2>/dev/null || echo "Checking for engines field"

Repository: superset-sh/superset

Length of output: 68


🏁 Script executed:

cat package.json | head -30

Repository: superset-sh/superset

Length of output: 1092


🏁 Script executed:

cat apps/streams/package.json 2>/dev/null | head -30 || echo "No streams package.json"

Repository: superset-sh/superset

Length of output: 1005


🏁 Script executed:

cat apps/streams/package.json | tail -20

Repository: superset-sh/superset

Length of output: 578


🏁 Script executed:

find . -name ".github" -type d | head -1 && ls .github/workflows 2>/dev/null | head -5

Repository: superset-sh/superset

Length of output: 159


🏁 Script executed:

cat .github/workflows/ci.yml | grep -A5 -B5 "node-version\|node_version" | head -40

Repository: superset-sh/superset

Length of output: 46


🏁 Script executed:

cat .github/workflows/ci.yml | head -60

Repository: superset-sh/superset

Length of output: 1272


🏁 Script executed:

cat .github/workflows/ci.yml | grep -i "node\|bun" | head -20

Repository: superset-sh/superset

Length of output: 732


🏁 Script executed:

cat bun.lockb 2>/dev/null | head -1 && echo "lockfile exists" || echo "checking for other version info"

Repository: superset-sh/superset

Length of output: 79


🏁 Script executed:

rg -A3 -B3 "AbortSignal\.(any|timeout)" apps/streams/src/protocol.ts 2>/dev/null || echo "No AbortSignal usage found yet"

Repository: superset-sh/superset

Length of output: 94


🌐 Web query:

Bun 1.3.3 Node.js compatibility AbortSignal.any() support

💡 Result:

Yes—Bun 1.3.3 supports AbortSignal.any().

  • Bun added AbortSignal.any() in Bun v1.1.4, so it’s included in later releases like 1.3.3. (bun.com)
  • Bun’s API reference documents AbortSignal.any(signals: AbortSignal[]): AbortSignal. (bun.sh)
  • For Node.js parity, AbortSignal.any() was added in Node v18.17.0 and v20.3.0 (and later). (nodejs.org)

Citations:


🏁 Script executed:

cat apps/streams/src/protocol.ts | sed -n '450,470p'

Repository: superset-sh/superset

Length of output: 488


Add timeout to agent fetch to prevent indefinite blocking.

The fetch call at line 407 lacks a timeout—if the agent endpoint hangs or is slow, the request blocks indefinitely, holding the abort controller, generation slot, and stream reference. The abort signal alone relies on explicit stopGeneration calls.

Compose the abort signal with AbortSignal.timeout() to enforce an automatic deadline:

Proposed fix using AbortSignal.any()
+		const AGENT_TIMEOUT_MS = 120_000; // 2 minutes
+		const timeoutSignal = AbortSignal.timeout(AGENT_TIMEOUT_MS);
+		const combinedSignal = AbortSignal.any([abortController.signal, timeoutSignal]);

 		const response = await fetch(agent.endpoint, {
 			method: agent.method ?? "POST",
 			headers: {
 				"Content-Type": "application/json",
 				...agent.headers,
 			},
 			body: JSON.stringify(requestBody),
-			signal: abortController.signal,
+			signal: combinedSignal,
 		});

Pass combinedSignal to streamAgentResponse as well.

🤖 Prompt for AI Agents
In `@apps/streams/src/protocol.ts` around lines 400 - 415, The fetch call for
sending the agent request lacks a hard timeout; create a combined abort signal
using AbortSignal.any([abortController.signal, AbortSignal.timeout(<ms>)]) (use
agent.timeout if available or a sensible default e.g. 30_000) and pass that
combinedSignal to fetch instead of abortController.signal, and also pass the
same combinedSignal into streamAgentResponse; update references around the
existing abortController, the fetch call, and streamAgentResponse to use
combinedSignal so the request, generation slot, and stream are automatically
aborted on timeout.

Comment on lines +579 to +582
getRegisteredAgents(sessionId: string): AgentSpec[] {
const state = this.sessionStates.get(sessionId);
return state?.agents ?? [];
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

getRegisteredAgents returns a direct mutable reference to the internal agents array.

Callers (e.g., the regenerate route at messages.ts line 25) receive the live array. Any mutation (push, splice) by a caller would corrupt the session state. Return a shallow copy instead.

Proposed fix
 	getRegisteredAgents(sessionId: string): AgentSpec[] {
 		const state = this.sessionStates.get(sessionId);
-		return state?.agents ?? [];
+		return state ? [...state.agents] : [];
 	}
🤖 Prompt for AI Agents
In `@apps/streams/src/protocol.ts` around lines 579 - 582, The getRegisteredAgents
method currently returns the internal agents array reference
(sessionStates.get(sessionId).agents), allowing callers to mutate session state;
change getRegisteredAgents to return a shallow copy (e.g., [...state.agents] or
state.agents.slice()) so callers get an immutable copy; update the function in
protocol.ts (getRegisteredAgents and any related usages) to return the copy and
keep sessionStates and AgentSpec unchanged.

This was referenced Feb 11, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant