fix(streams): streaming correctness, reliability, and performance overhaul (#1391)
Conversation
…rrors Track producer background errors per session. finishGeneration now flushes, clears per-message seq state, and throws if any producer errors occurred during the run. The finish route returns a structured error response (code: FINISH_FAILED) instead of silent success.
Extract recordProducerError and drainProducerErrors as private helpers. Simplifies the onError callback and finishGeneration method, making the error lifecycle (record → drain → throw) explicit.
Non-2xx responses from finish are now logged with the response body. The finish request now sends the messageId so the server can clear per-message seq state.
deleteSession is now async and awaits producer.flush() then producer.detach() before cleaning up session state. Prevents returning 204 while queued chunks are still in flight.
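The ordering here matters, so as an illustrative sketch (the `Producer` interface and `sessions` map below are stand-ins, not the PR's actual types):

```typescript
// Hypothetical stand-ins for the real producer/session types.
interface Producer {
  flush(): Promise<void>;   // drain queued chunks to durable storage
  detach(): Promise<void>;  // stop accepting new writes
}

const sessions = new Map<string, { producer: Producer }>();

async function deleteSession(sessionId: string): Promise<void> {
  const session = sessions.get(sessionId);
  if (!session) return;
  // Drain before detach, detach before state cleanup: acknowledging the
  // delete any earlier would return 204 while chunks are still in flight.
  await session.producer.flush();
  await session.producer.detach();
  sessions.delete(sessionId);
}
```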
Ensures all queued chunks are durably written before the reset event is appended, preventing reset from racing ahead of buffered data. Also clears producer errors on reset.
Extract appendToStream helper that prefers the producer when available, falling back to direct stream.append. writeChunk, writeUserMessage, and writePresence all use this single write path now. User messages and presence flush immediately for durability while streaming chunks remain buffered.
Pass the agent abort signal to streaming chunk fetch calls so that interrupting an agent cancels in-flight chunk sends immediately. AbortError is silently swallowed since it's the expected outcome.
If /generations/finish returns non-2xx or the network request fails, emit an explicit error event so the UI shows a visible failure instead of silently appearing done.
Add a promise-chain based per-session lock so concurrent delete, reset, and close operations serialize rather than race. Prevents interleaved lifecycle transitions from corrupting session state.
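A promise-chain lock of this kind can be sketched in a few lines (class and method names here are hypothetical, not the PR's actual identifiers):

```typescript
// Illustrative sketch of a promise-chain per-session lock.
class SessionLocks {
  private tails = new Map<string, Promise<unknown>>();

  // Queue fn behind whatever operation is currently running for sessionId.
  withLock<T>(sessionId: string, fn: () => Promise<T>): Promise<T> {
    const prev = this.tails.get(sessionId) ?? Promise.resolve();
    // Ignore the previous operation's outcome so one failed operation
    // does not poison the lock for every later caller.
    const next = prev.catch(() => {}).then(fn);
    // Store a rejection-safe tail for the next caller to chain onto.
    this.tails.set(sessionId, next.catch(() => {}));
    return next;
  }
}
```

Concurrent delete/reset/close calls for the same session then run strictly in arrival order, while different sessions remain independent.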
Flush the producer first to preserve global ordering, then append the user message directly to the stream. This avoids producer queue latency that was causing txid timeout errors on the client side (5s default timeout in stream-db awaitTxId).
Generate messageId client-side with crypto.randomUUID() instead of blocking on POST /generations/start. Eliminates a full HTTP round trip before the first token can stream.
Add writeChunks method to protocol and POST /chunks/batch endpoint that accepts an array of chunks in a single HTTP request. On the desktop side, replace the sequential per-chunk POST chain with a ChunkBatcher that coalesces chunks within a 5ms window (or 50-chunk max) before sending as a batch. This reduces HTTP round trips from N to ~N/batch_size during active streaming.
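A minimal sketch of the coalescing idea, using the knobs described above (5ms window, 50-chunk cap); the retries, abort handling, and buffer cap added in later commits are omitted, and the `Chunk` shape is illustrative:

```typescript
interface Chunk { messageId: string; data: string; }

class ChunkBatcher {
  private buffer: Chunk[] = [];
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(
    private opts: {
      sendBatch: (chunks: Chunk[]) => Promise<void>;
      lingerMs?: number;      // coalescing window
      maxBatchSize?: number;  // size cap that forces an early send
    },
  ) {}

  push(chunk: Chunk): void {
    this.buffer.push(chunk);
    if (this.buffer.length >= (this.opts.maxBatchSize ?? 50)) {
      void this.flush(); // size cap hit: send without waiting for the timer
    } else if (this.timer === null) {
      this.timer = setTimeout(() => void this.flush(), this.opts.lingerMs ?? 5);
    }
  }

  async flush(): Promise<void> {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    await this.opts.sendBatch(batch); // one POST /chunks/batch per batch
  }
}
```

During active streaming the producer pushes every token-level chunk, but the transport sees roughly one request per window instead of one per chunk.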
Reduce producer lingerMs from 5ms to 1ms since the desktop ChunkBatcher already coalesces at 5ms — avoids double-buffering latency. Add 10s timeout to flushSession so flush/finish cannot hang indefinitely on a stuck producer.
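One common way to bound a flush is a `Promise.race` against a timer; this is a sketch under that assumption (`flush` here is a stand-in for the real producer call, and the constant mirrors the 10s described above):

```typescript
const FLUSH_TIMEOUT_MS = 10_000;

async function flushWithTimeout(
  flush: () => Promise<void>,
  timeoutMs = FLUSH_TIMEOUT_MS,
): Promise<void> {
  let timer: ReturnType<typeof setTimeout>;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`flush timed out after ${timeoutMs}ms`)),
      timeoutMs,
    );
  });
  try {
    // Whichever settles first wins; a stuck producer can no longer hang
    // finish/delete indefinitely.
    await Promise.race([flush(), timeout]);
  } finally {
    clearTimeout(timer!); // don't keep the process alive on the happy path
  }
}
```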
The /chunks/batch endpoint now does a lightweight array check instead of full Zod schema validation on every chunk — this is an authenticated internal path from the desktop client. ChunkBatcher now has a maxBufferSize (default 2000) that drops oldest chunks when the buffer exceeds the cap, preventing OOM when the network or proxy is slower than the agent.
ChunkBatcher now retries failed sendBatch calls up to 3 times with exponential backoff (50ms base). sendBatch callback throws on non-ok responses so the retry logic can catch transient failures. AbortError is rethrown immediately to respect cancellation.
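The retry policy described above (3 attempts, 50ms base, exponential backoff, AbortError rethrown immediately) can be sketched as follows — function names are illustrative, not the actual implementation:

```typescript
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;

async function sendWithRetry(
  send: () => Promise<void>,
  { maxRetries = MAX_RETRIES, baseDelayMs = BASE_DELAY_MS } = {},
): Promise<void> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await send();
    } catch (error) {
      // Cancellation is the expected outcome of an interrupt: never retry it.
      if (error instanceof Error && error.name === "AbortError") throw error;
      if (attempt >= maxRetries - 1) throw error;
      const delay = baseDelayMs * 2 ** attempt; // 50ms, 100ms, 200ms, ...
      await new Promise((r) => setTimeout(r, delay));
    }
  }
}
```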
Track per-session producer health. When a producer fires onError, mark it unhealthy and route subsequent writes through direct stream.append instead. A successful flush restores healthy status. Prevents cascading failures when the producer is in a bad state.
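The health-gated write path amounts to a small state machine; here is a sketch with hypothetical `Producer`/`Stream` interfaces standing in for the real ones:

```typescript
interface Producer { write(data: unknown): Promise<void>; flush(): Promise<void>; }
interface Stream { append(data: unknown): Promise<void>; }

class SessionWriter {
  private healthy = true;

  constructor(private producer: Producer, private stream: Stream) {}

  markUnhealthy(): void { this.healthy = false; } // called from onError

  async append(data: unknown): Promise<void> {
    if (this.healthy) {
      await this.producer.write(data); // fast, buffered path
    } else {
      await this.stream.append(data);  // degraded but reliable fallback
    }
  }

  async flush(): Promise<void> {
    await this.producer.flush();
    this.healthy = true; // a successful flush restores the fast path
  }
}
```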
#33) Add startGeneration/getActiveGeneration/finishGeneration lifecycle to protocol. Chunk routes auto-register the generation from the first chunk if none is active. finishGeneration clears the active generation. Reset and delete also clean up generation state.
Include sessionId (and messageId where applicable) in both success and error responses from chunk and session routes for tracing.
Every error response now includes a machine-readable `code` field (SESSION_NOT_FOUND, WRITE_FAILED, FINISH_FAILED, INVALID_BODY, etc.) for deterministic client error handling.
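For illustration, a client-side sketch of branching on these codes (the response shape is assumed from the descriptions in this PR, not copied from the server):

```typescript
// Code values are the ones named above; the full set may be larger.
type ErrorCode =
  | "SESSION_NOT_FOUND"
  | "WRITE_FAILED"
  | "FINISH_FAILED"
  | "INVALID_BODY";

interface ErrorResponse {
  error: string;      // human-readable message, free to change
  code: ErrorCode;    // machine-readable, stable contract
  sessionId: string;  // tracing context
  messageId?: string;
}

// Clients branch on `code` instead of parsing the `error` text.
function shouldRecreateSession(res: ErrorResponse): boolean {
  return res.code === "SESSION_NOT_FOUND";
}
```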
30 of 51 items are now done. Remaining items are larger architectural changes (14, 29, 31), operational work (16-18), and observability/test/rollout work (35-50).
Generation is now auto-registered from the first chunk written. Desktop generates messageId client-side, so this endpoint was dead code. Replaced with chunksBatch in the discovery listing.
message-end chunk = UI signal (isLoading → false); /generations/finish = server lifecycle cleanup (flush, seq clear, error drain). Both are required and always sent in that order.
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In
`@apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/agent-execution.ts`:
- Around line 66-72: startSession and restoreSession accept permissionMode as an
unconstrained string which diverges from updateSessionConfig's enum validation
and agent-execution.ts then silently defaults to the permissive
"bypassPermissions"; fix by updating the input schemas for startSession and
restoreSession in index.ts to use the same zod enum/unions used by
updateSessionConfig (the same allowed values "default" | "acceptEdits" |
"bypassPermissions") so invalid strings are rejected, and change the nullish
coalescing in agent-execution.ts (permissionMode: (session.permissionMode as ...
) ?? "bypassPermissions") to either use a safer explicit default (e.g.,
"default") or leave undefined and require callers to supply a valid
mode—alternatively, if "bypassPermissions" is intentional for desktop, add a
clear comment next to that line explaining the security assumption.
In
`@apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/agent-runner.ts`:
- Around line 63-66: The race occurs because an old run's finally
unconditionally calls this.deps.runningAgents.delete(sessionId) and can remove a
newly written AbortController; in startAgent/abortExistingAgent flows fix this
by making the finally block conditional: read const current =
this.deps.runningAgents.get(sessionId) and only delete if current ===
abortController (the controller created in this startAgent), and optionally
update abortExistingAgent to remove the map entry only when it is aborting the
same controller instance; reference startAgent, abortExistingAgent,
this.deps.runningAgents, AbortController and the finally block to locate where
to add the identity check before deleting the map entry.
In
`@apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/session-lifecycle.ts`:
- Around line 146-187: The catch block in startSession swallows errors from
ensureSessionReady and deps.store.create so callers can't detect failures; after
logging and calling this.deps.emitSessionError({ sessionId, error: message })
rethrow the original error (or throw a new Error(message)) so the returned
promise rejects, ensuring callers of startSession receive the failure; reference
startSession, ensureSessionReady, deps.store.create and deps.emitSessionError
when applying the change.
- Around line 283-290: The DELETE fetch to
`${this.deps.proxyUrl}/v1/sessions/${sessionId}` currently ignores the response
which can cause local cleanup to proceed when the remote delete failed; update
the logic around the fetch call in the session lifecycle (the block using
this.deps.proxyUrl, sessionId, headers) to capture the Response, check
response.ok (and response.status), log non-ok responses with details (status and
body/text), and abort or handle local archiving/cleanup accordingly (e.g., throw
or return on failure) so remote and local state remain consistent.
In
`@apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/session-manager.ts`:
- Around line 25-28: The local redeclaration of StartAgentInput duplicates the
type from agent-runner.ts; remove the local interface StartAgentInput and
instead import StartAgentInput from agent-runner.ts (where it is exported) and
update any references in this file (e.g., in session-manager functions that
accept StartAgentInput) to use the imported type to avoid drift and duplication.
🧹 Nitpick comments (8)
apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/agent-stream-writer.ts (3)
89-133: Session recovery retries the full `maxAttempts` again — document or constrain.

After a session-not-found recovery (Line 122), the retry call on Line 124 uses the same `maxAttempts` as the original. For callers passing `maxAttempts: 3`, this means up to 6 total HTTP attempts (3 + recovery + 3). This is likely acceptable for resilience, but it could surprise callers. Consider adding a brief inline comment clarifying that the post-recovery retry budget is intentionally the same.
145-158: `maxAttempts: 1` inside sendBatch — confirm this is intentional given ChunkBatcher's own retry.

The `sendBatch` callback uses `maxAttempts: 1` for `postWithSessionRecovery`, meaning `postJsonWithRetry` won't retry internally. This is fine since `ChunkBatcher.sendWithRetry` retries the entire batch (up to `maxRetries` times, default 3). However, this means session recovery (on 404) also won't trigger inside batcher retries — the batcher will just retry the same call that got a 404. If the remote session disappears mid-stream, every batcher retry will fail with 404.

Consider whether session recovery should happen at least once within the batcher retry loop, e.g., by setting `maxAttempts: 2`.
218-223: Extract hardcoded `"claude"` actor and `"message-end"` chunk type to named constants.

`"claude"` (Line 220) and `"message-end"` (Line 269) are repeated magic strings that represent protocol-level values. Extracting them to module-level constants improves discoverability and prevents typos.

Proposed fix

At the top of the file (after line 9):

```diff
+const ACTOR_CLAUDE = "claude";
+const CHUNK_TYPE_MESSAGE_END = "message-end";
```

Then replace usages:

```diff
 batcher.push({
   messageId,
-  actorId: "claude",
+  actorId: ACTOR_CLAUDE,
   role: "assistant",
   chunk,
 });
```

```diff
 const terminalChunkPayload = {
   messageId,
-  actorId: "claude",
+  actorId: ACTOR_CLAUDE,
   role: "assistant",
-  chunk: { type: "message-end" as const },
+  chunk: { type: CHUNK_TYPE_MESSAGE_END as const },
 };
```

As per coding guidelines, "Extract hardcoded magic numbers, strings, and enums to named constants at module top instead of leaving them inline in logic."
apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/session-types.ts (1)
1-15: Consider a stricter union type for `permissionMode`.

Both interfaces type `permissionMode` as `string`, but `agent-execution.ts` (Line 68–72) casts it to `"default" | "acceptEdits" | "bypassPermissions"`. Defining a shared union type here would catch invalid values at compile time and remove the need for the `as` cast downstream.

Proposed change

```diff
+export type PermissionMode = "default" | "acceptEdits" | "bypassPermissions";
+
 export interface ActiveSession {
   sessionId: string;
   cwd: string;
   model?: string;
-  permissionMode?: string;
+  permissionMode?: PermissionMode;
   maxThinkingTokens?: number;
 }

 export interface EnsureSessionReadyInput {
   sessionId: string;
   cwd: string;
   model?: string;
-  permissionMode?: string;
+  permissionMode?: PermissionMode;
   maxThinkingTokens?: number;
 }
```

As per coding guidelines, "Maintain type safety by avoiding `any` types unless absolutely necessary."

apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/agent-runner.ts (1)
88-95: Type assertions are safe here but could be avoided.

`watchdog as GenerationWatchdog` and `batcher as ChunkBatcher` are safe since they're assigned on lines 80–81 before this code runs. However, you could avoid the casts by restructuring to keep them in scope after `prepareStream`:

Alternative structure (optional)

```diff
 await this.execution.execute({
   session,
   sessionId,
   prompt,
   abortController,
   onChunk: (chunk) => {
     this.streamWriter.onAssistantChunk({
-      watchdog: watchdog as GenerationWatchdog,
-      batcher: batcher as ChunkBatcher,
+      watchdog: prepared.watchdog,
+      batcher: prepared.batcher,
       messageId,
       chunk,
     });
   },
 });
```

This requires keeping `prepared` in scope rather than destructuring into nullable locals.

apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/agent-execution.ts (1)
108-127: Unused `sessionId` parameter — consider using it for logging.

`_sessionId` is destructured but ignored. The warning on Line 123 could include it for better diagnostics when multiple sessions are active.

Proposed improvement

```diff
 resolvePermission({
-  sessionId: _sessionId,
+  sessionId,
   toolUseId,
   approved,
   updatedInput,
 }: ResolvePermissionInput): void {
   const result = approved
     ? { ... }
     : { behavior: "deny" as const, message: "User denied permission" };
   const resolved = resolvePendingPermission({ toolUseId, result });
   if (!resolved) {
     console.warn(
-      `[chat/session] No pending permission for toolUseId=${toolUseId}`,
+      `[chat/session] No pending permission for toolUseId=${toolUseId} in session ${sessionId}`,
     );
   }
 }
```

apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/session-lifecycle.ts (2)
297-302: `updateSessionMeta` uses positional parameters instead of an object.

Per coding guidelines: "Use object parameters for functions with 2 or more parameters."

Proposed fix

```diff
-  async updateSessionMeta(
-    sessionId: string,
-    patch: UpdateSessionMetaPatch,
-  ): Promise<void> {
-    await this.deps.store.update(sessionId, patch);
+  async updateSessionMeta({
+    sessionId,
+    patch,
+  }: {
+    sessionId: string;
+    patch: UpdateSessionMetaPatch;
+  }): Promise<void> {
+    await this.deps.store.update(sessionId, patch);
   }
```

This would also require updating the caller in `session-manager.ts` (Line 117).
304-341: Direct mutation of the `ActiveSession` object is fine for in-memory state but fragile.

`updateAgentConfig` mutates the session object stored in the `Map` directly. This works because the map holds references, but it means any code holding a reference to the session object will see the mutations. If this is the intended pattern, it's fine — just be aware that replacing the map implementation (e.g., with immutable state) would break this silently.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/desktop/src/renderer/screens/main/components/WorkspaceView/ContentView/TabsContent/TabView/ChatPane/ChatInterface/ChatInterface.tsx (1)
318-326: ⚠️ Potential issue | 🟡 Minor

`handleStop` should also clear the pending-send timer.

If the user stops during the window between send and stream start, the orphaned timer will fire, logging a misleading warning and redundantly clearing `isSending`.

Proposed fix

```diff
 const handleStop = useCallback(
   (e: React.MouseEvent) => {
     e.preventDefault();
+    clearSendPendingTimer();
     setIsSending(false);
     interruptAgent.mutate({ sessionId });
     stop();
   },
-  [interruptAgent, sessionId, stop],
+  [clearSendPendingTimer, interruptAgent, sessionId, stop],
 );
```
🧹 Nitpick comments (9)
apps/streams/src/routes/chunks.ts (3)
88-96: `writeChunk` takes 7 positional arguments — prefer an object parameter.

This call passes 7 positional args, making it easy to mis-order `messageId`/`actorId`/`role`. Consider refactoring `writeChunk` to accept a single options object, consistent with `writeChunks` on Line 197 which already uses `{ sessionId, chunks }`.
94-94: `as never` casts suppress type-checking on the data written to streams.

Both the single-chunk path (Line 94) and batch path (Line 199) use `as never` to bypass the type system. This hides any mismatch between the parsed/unvalidated `chunk` shape and what `writeChunk`/`writeChunks` actually expects, defeating compile-time safety on the write path.

If the protocol method signatures accept a broader type than `z.infer<typeof chunkBodySchema>`, align the Zod schema or add an explicit cast to the correct target type so the compiler can still catch regressions.

Also applies to: 199-199
220-227: Silent catch for optional body parsing is acceptable but could be tightened.

Swallowing all errors (including unexpected ones like OOM) to treat `messageId` as optional is pragmatic. A minor improvement: catch only JSON parse / Zod errors by checking `error instanceof z.ZodError || error instanceof SyntaxError`, so truly unexpected failures still surface.

apps/streams/src/routes/chunks.test.ts (1)
19-121: Tests thoroughly cover generation-mismatch rejection — consider adding happy-path and other error-code tests.

The three mismatch scenarios are well-structured and verify both the HTTP status and the response shape, plus assert that no writes occur. To round out coverage for the new route logic, consider adding:

- A happy-path test (no active generation → `startGeneration` called, chunk written, 200 returned).
- `SESSION_NOT_FOUND` when `getSession` returns falsy.
- `INVALID_BODY` for malformed payloads.
- Finish endpoint (`/:id/generations/finish`) success and `FINISH_FAILED` paths.

apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/agent-stream-writer.ts (3)
61-69: String-based error detection is fragile — consider structured error matching.

`isSessionNotFoundError` relies on substrings like `"status 404"` and `"session_not_found"` in the error message. If the upstream HTTP client or server changes its error formatting even slightly, this detection silently breaks and recovery is skipped, leading to permanent failures instead of transparent retries.

If `postJsonWithRetry` can be made to throw a typed error (e.g., with a `status` or `code` property), matching on that would be more robust. Otherwise, a brief comment documenting the expected error format contract would help future maintainers.
145-168: Stale `proxyHeaders` after session recovery — subsequent batch sends reuse the original (potentially expired) headers.

The `proxyHeaders` closure is captured once when the batcher is created. If a batch send triggers session recovery inside `postWithSessionRecovery`, the retry uses freshly built headers — but the next batch send from the batcher still uses the original stale `proxyHeaders`. If auth tokens have expired, every subsequent send will fail → recover → retry, which is functionally correct but wasteful (O(n) recovery round-trips).

Consider making `proxyHeaders` mutable (e.g., stored in a `{ current: headers }` ref object) and updating it after each successful recovery so subsequent sends benefit from the refresh.

Sketch

```diff
 private createChunkBatcher({
   sessionId,
   session,
   proxyHeaders,
   abortController,
 }: {
   sessionId: string;
   session: ActiveSession;
-  proxyHeaders: Record<string, string>;
+  proxyHeaders: { current: Record<string, string> };
   abortController: AbortController;
 }): ChunkBatcher {
   return new ChunkBatcher({
     sendBatch: async (chunks) => {
       await this.postWithSessionRecovery({
         sessionId,
         session,
         url: `${this.deps.proxyUrl}/v1/sessions/${sessionId}/chunks/batch`,
-        headers: proxyHeaders,
+        headers: proxyHeaders.current,
         body: { chunks },
         maxAttempts: 1,
         operation: "write chunk batch",
         signal: abortController.signal,
+      }).then(() => {}, (err) => {
+        // If recovery happened, postWithSessionRecovery already used refreshed headers.
+        // We could update proxyHeaders.current here if needed.
+        throw err;
       });
     },
```
264-319: No abort signal passed to terminal chunk persistence — finalization can hang indefinitely on network issues.

`persistTerminalChunk` calls `postWithSessionRecovery` without a `signal`, meaning if the network is unreachable these calls (up to 3 attempts each, on two tiers) will block until the underlying HTTP client times out (or never, depending on configuration). If `postJsonWithRetry` doesn't enforce its own socket/response timeout, this could stall the entire finalization indefinitely.

If this is an intentional "try hard" design decision for the finalization path, a brief comment would clarify intent. Otherwise, consider passing a dedicated `AbortSignal` with a generous timeout (e.g., 30s) to bound the worst case.

apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/agent-runner.ts (2)
89-96: Type assertions (`as GenerationWatchdog`, `as ChunkBatcher`) bypass null safety.

`watchdog` and `batcher` are declared as `| null` on lines 71-72 but cast away inside the `onChunk` callback. This is safe only because `onChunk` is invoked after the `prepareStream` await assigns them. However, the assertions suppress the compiler's null-check, so a future refactor (e.g., making `execute` call `onChunk` during setup) would silently introduce a null-pointer bug.

A lightweight alternative: assign inside `prepared` scope and pass the non-null references directly.

Sketch — avoid the type assertions

```diff
 const prepared = await this.streamWriter.prepareStream({
   sessionId,
   session,
   abortController,
 });
 headers = prepared.headers;
 batcher = prepared.batcher;
 watchdog = prepared.watchdog;

 await this.execution.execute({
   session,
   sessionId,
   prompt,
   abortController,
   onChunk: (chunk) => {
     this.streamWriter.onAssistantChunk({
-      watchdog: watchdog as GenerationWatchdog,
-      batcher: batcher as ChunkBatcher,
+      watchdog: prepared.watchdog,
+      batcher: prepared.batcher,
       messageId,
       chunk,
     });
   },
 });
```
51-62: Early return on missing session silently drops the prompt — consider if the caller needs to know.

When the session isn't found, the method emits a session error and returns `void` — the caller (`startAgent`'s invoker) gets a resolved promise with no indication the agent never ran. If the caller can handle a thrown error or a return value indicating failure, that would allow it to retry or surface the problem differently.

Not critical if the emitted session error is sufficient for the UI, but worth confirming the caller's contract.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/streams/src/protocol.ts (1)
534-563: ⚠️ Potential issue | 🟠 Major

`forkSession` shallow-copies `sourceState`, sharing mutable DB/collection instances between sessions.

After `createSession(targetSessionId)` initializes a fresh `sessionDB`, line 550 immediately overwrites it with `{ ...sourceState, ... }`, causing both sessions to share the same `sessionDB`, `messages`, `modelMessages`, and `changeSubscription` references. Closing or modifying one session's DB will corrupt the other.

This appears to be an incomplete implementation (per the TODO on line 558). Consider either:

- Deep-initializing the target's state independently (using `initializeSessionState`'s results and only copying data), or
- Guarding `forkSession` with a `NOT_IMPLEMENTED` error until the copy logic is built.
🤖 Fix all issues with AI agents
In
`@apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/agent-stream-writer.ts`:
- Around line 373-396: In finalizeGeneration, if persistTerminalChunk(...)
returns false you must return early and not call finishGeneration; update
finalizeGeneration (the method that calls persistTerminalChunk and
finishGeneration) to check terminalChunkPersisted and, after emitting the
session error via this.deps.emitSessionError(...), immediately return so
finishGeneration is not invoked when persistTerminalChunk fails.
In
`@apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/proxy-requests.ts`:
- Around line 83-88: The nonRetryable flag on the ProxyRequestError currently
treats all 4xx responses as non-retryable; update the logic around the thrown
ProxyRequestError (the block that constructs message, status, code,
nonRetryable) to exclude known retryable 4xx codes such as 429 and 408 so they
are considered retryable (i.e., set nonRetryable to false for res.status === 429
or === 408), while keeping other 4xx codes non-retryable.
In `@apps/streams/src/protocol.ts`:
- Around line 57-63: recordProducerError currently always sets
this.producerHealthy.set(sessionId, false) even if deleteSession removed the
session, which can re-insert stale entries; update recordProducerError to first
verify the session exists in your maps (e.g. check
this.producerErrors.has(sessionId) or this.producerHealthy.has(sessionId)) and
only push the error and flip healthy-to-false when the session is present,
otherwise bail out; refer to the method name recordProducerError and the maps
producerErrors and producerHealthy to locate and guard the write.
- Around line 112-147: deleteSession currently omits cleaning up messageSeqs,
causing orphaned entries; before removing the maps at the end of deleteSession,
mirror resetSession by retrieving this.activeGenerationIds.get(sessionId) (or
the single active generation id used for the session) and for each generation id
remove/clear any message IDs stored in this.messageSeqs for that generation,
then proceed to delete this.activeGenerationIds, this.producers,
this.sessionStates, etc.; update deleteSession to perform this messageSeqs
cleanup (use the same logic as resetSession) so messageSeqs no longer
accumulates orphaned entries.
- Around line 354-383: finishGeneration currently only calls clearSeq when an
explicit messageId is provided, which leaves the active generation's sequence
counter in messageSeqs orphaned when finishGeneration is invoked with messageId
undefined; update finishGeneration so that when you delete the active generation
from activeGenerationIds you also clear its sequence counter: if messageId is
provided call clearSeq(messageId) as now, but when you detect activeMessageId
and either messageId is undefined or different, call clearSeq(activeMessageId)
before deleting activeGenerationIds.delete(sessionId) (avoid double-clearing
when messageId === activeMessageId).
In `@apps/streams/src/routes/auth.ts`:
- Around line 12-19: The login handler currently swallows JSON parse errors when
calling c.req.json(); update that catch block to log the error with context
(e.g., use console.error or the existing logging convention) before returning
the 400 response—mirror the logout handler's behavior which logs "[AUTH] Failed
to parse logout body:"; reference the login parse site (the try around
c.req.json(), and the sessionId variable) and include the parse error in the log
message (e.g., "[AUTH] Failed to parse login body:" + parseError) so the error
is not silently discarded.
🧹 Nitpick comments (12)
apps/streams/src/routes/chunks.ts (2)
108-116: `writeChunk` takes 7 positional arguments — prefer an object parameter.

This call is hard to read and easy to mis-order. The coding guidelines require object parameters for functions with 2+ parameters. The `as never` cast on `chunk` also silently discards type checking.

Consider refactoring `writeChunk` to accept an object:

Suggested call-site shape

```diff
-      await protocol.writeChunk(
-        stream,
-        sessionId,
-        messageId,
-        actorId,
-        role,
-        chunk as never,
-        txid,
-      );
+      await protocol.writeChunk({
+        stream,
+        sessionId,
+        messageId,
+        actorId,
+        role,
+        chunk,
+        txid,
+      });
```

This would also eliminate the need for the `as never` cast if the parameter type is properly typed in the protocol interface. As per coding guidelines, "Use object parameters for functions with 2 or more parameters instead of positional arguments".

```shell
#!/bin/bash
# Check the writeChunk signature in the protocol to understand the current interface
ast-grep --pattern 'writeChunk($$$) { $$$ }'
rg -n 'writeChunk' --type=ts -C2
```
245-255: Redundant guard — `firstMessageId` is guaranteed to be a non-empty string here.

The validation loop (Lines 154–219) already ensures every element has a string `messageId`, and Line 143 ensures the array is non-empty. So `chunks[0]?.messageId` is always a truthy string at this point, making the `!firstMessageId` branch dead code.

Simplification

```diff
-      const firstMessageId = chunks[0]?.messageId;
-      if (!firstMessageId) {
-        return c.json(
-          {
-            error: "Each chunk must include messageId",
-            code: "INVALID_BODY",
-            sessionId,
-          },
-          400,
-        );
-      }
+      const firstMessageId = chunks[0].messageId;
```

apps/streams/src/routes/auth.ts (5)
21-25: Redundant and misleading type assertion.

`body` is already typed on Line 11 with optional fields. The `as` cast here asserts `actorId` and `deviceId` are required strings, which the compiler trusts without proof. The runtime check on Line 27 is the actual guard. Remove the cast and destructure directly from `body`.

Proposed fix

```diff
-    const { actorId, deviceId, name } = body as {
-      actorId: string;
-      deviceId: string;
-      name?: string;
-    };
+    const { actorId, deviceId, name } = body;
```
51-52: Inconsistent log prefix — missing `[AUTH]` domain tag.

The logout error handler (Line 145) uses the `[AUTH]` prefix but this login handler omits it. Per coding guidelines, use `[domain/operation]` consistently.

Proposed fix

```diff
-    console.error("Failed to login:", error);
+    console.error("[AUTH] Failed to login:", error);
```

As per coding guidelines, "Use prefixed console logging with consistent context pattern: [domain/operation] message."
68-79: Inconsistent JSON parsing — login uses `c.req.json()`, logout uses manual `text()` + `JSON.parse()`.

Both achieve the same result. Consider aligning on `c.req.json()` for consistency with the login route.

Proposed fix

```diff
-  const rawBody = await c.req.text();
   let body: { actorId?: string; deviceId?: string; allDevices?: boolean };
   try {
-    body = JSON.parse(rawBody);
-  } catch (parseError) {
-    console.error("[AUTH] Failed to parse logout body:", parseError);
+    body = await c.req.json();
+  } catch (error) {
+    console.error("[AUTH] Failed to parse logout body:", error);
     return c.json(
       { error: "Invalid JSON body", code: "INVALID_BODY", sessionId },
       400,
     );
   }
```
50-50: Success responses omit `sessionId`.

Error responses consistently include `sessionId`, but success responses at Lines 50, 126, and 142 do not. If the intent (per PR objectives: "sessionId included in responses") is to always include it, these should be updated too.
11-36: Consider Zod schemas for body validation at this API boundary.

Both login and logout handlers use manual type annotations and `if` checks. Using Zod would eliminate the unsafe `as` cast, provide better type narrowing after `.parse()`, and align with the guideline to use Zod for API route bodies at boundaries.

Example for login:

```typescript
import { z } from "zod";

const loginBodySchema = z.object({
  actorId: z.string().min(1),
  deviceId: z.string().min(1),
  name: z.string().optional(),
});
```

Then `loginBodySchema.safeParse(body)` gives you validated + narrowed types in one step.

As per coding guidelines, "Use Zod schemas for validating tRPC inputs and API route bodies at boundaries."
apps/streams/src/protocol.ts (3)
83-92: Extract producer configuration constants.

`lingerMs: 1` and `maxInFlight: 5` are tuning knobs that would be clearer as named constants alongside `FLUSH_TIMEOUT_MS`. As per coding guidelines, "Extract hardcoded magic numbers, strings, and enums to named constants at module top instead of leaving them inline in logic."
232-258: `writeChunk` has 7 positional parameters — prefer an object parameter.

`writeChunks` already uses the object-params pattern. `writeChunk` (and similarly `writeUserMessage`, `writeToolResult`, `writeApprovalResponse`, `writePresence`) still uses positional args with an unused `_stream` parameter. Consider aligning with the same object-param style to improve readability and make the unused `_stream` easier to remove. As per coding guidelines, "Use object parameters for functions with 2 or more parameters instead of positional arguments."
292-313: Fallback path in `appendToStream` is correct but consider the implication.

When the producer is unhealthy, each call falls through to `stream.append(data)` — a direct network write. In `writeChunks`, this means N sequential network round-trips in the degraded path. This is acceptable as a resilience fallback, but worth noting for observability (e.g., a log when falling back).

apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/agent-runner.ts (1)
92-99: Type assertions `as GenerationWatchdog` / `as ChunkBatcher` bypass null safety.

These are safe at runtime because `onChunk` is only invoked after `prepareStream` succeeds (which initializes both). However, the assertions mask the `| null` type. A small restructure could avoid them.

Optional: capture non-null references before execute

```diff
 headers = prepared.headers;
 batcher = prepared.batcher;
 watchdog = prepared.watchdog;
+const activeWatchdog = watchdog;
+const activeBatcher = batcher;

 await this.execution.execute({
   session,
   sessionId,
   prompt,
   abortController,
   onChunk: (chunk) => {
     this.streamWriter.onAssistantChunk({
-      watchdog: watchdog as GenerationWatchdog,
-      batcher: batcher as ChunkBatcher,
+      watchdog: activeWatchdog,
+      batcher: activeBatcher,
       messageId,
       chunk,
     });
   },
 });
```
35-41: Eliminate redundant `sessionId` parameter in `ExecuteAgentInput`.
`ActiveSession` already contains `sessionId`. Replace the separate `sessionId` parameter with `session.sessionId` throughout the `execute()` method (lines 61, 74, 88) to reduce redundancy and eliminate divergence risk.
```typescript
async finalizeGeneration({
  sessionId,
  session,
  messageId,
  headers,
}: FinalizeInput): Promise<void> {
  if (!headers) return;

  const terminalChunkPersisted = await this.persistTerminalChunk({
    sessionId,
    session,
    messageId,
    headers,
  });
  if (!terminalChunkPersisted) {
    this.deps.emitSessionError({
      sessionId,
      error:
        "Assistant completion marker failed to persist. Message may stay loading.",
    });
  }

  await this.finishGeneration({ sessionId, session, messageId, headers });
}
```
🧩 Analysis chain
Repository: superset-sh/superset
🏁 Script executed:
```shell
rg -n "finish" apps/streams/src/routes/ --type=ts -C 5 | head -100
```
🏁 Script executed:
```shell
# Search for finishGeneration implementation
rg -n "finishGeneration" apps/desktop/src/lib/trpc/routers/ai-chat/ --type=ts -A 10
```
🏁 Script executed:
```shell
# Search for persistTerminalChunk implementation
rg -n "persistTerminalChunk" apps/desktop/src/lib/trpc/routers/ai-chat/ --type=ts -B 3 -A 10
```
🏁 Script executed:
```shell
# Get the full finishGeneration implementation
sed -n '339,370p' apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/agent-stream-writer.ts
```
🏁 Script executed:
```shell
# Search for protocol.finishGeneration server implementation
rg -n "finishGeneration" apps/streams/src/ --type=ts -B 5 -A 20
```
🏁 Script executed:
```shell
# Check if there's any documentation or comments about terminal chunk requirements
rg -n "terminal" apps/streams/src/ --type=ts -B 3 -A 3
```
Correct: finalizeGeneration should not call finishGeneration if persistTerminalChunk fails.
Per the protocol documentation, both the terminal chunk (message-end) and the finish signal are required and interdependent: the terminal chunk marks visual completion for the UI, while finish marks the durable-state cleanup boundary. Calling finish without a persisted terminal chunk violates this contract and leaves the generation in an inconsistent state—the server marks generation complete but the client/UI never receives the completion marker.
Return early if persistTerminalChunk fails instead of proceeding to finishGeneration.
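A sketch of the proposed control flow. The dependencies are flattened into a hypothetical interface here; in the real class they live on `this.persistTerminalChunk` and `this.deps.emitSessionError`:

```typescript
// Hypothetical flattened dependency shape for illustration.
interface FinalizeDeps {
  persistTerminalChunk(): Promise<boolean>;
  finishGeneration(): Promise<void>;
  emitSessionError(error: string): void;
}

async function finalizeGeneration(deps: FinalizeDeps): Promise<void> {
  const terminalChunkPersisted = await deps.persistTerminalChunk();
  if (!terminalChunkPersisted) {
    deps.emitSessionError(
      "Assistant completion marker failed to persist. Message may stay loading.",
    );
    // Early return: never signal finish without a persisted terminal chunk.
    return;
  }
  await deps.finishGeneration();
}
```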
🤖 Prompt for AI Agents
In
`@apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/agent-stream-writer.ts`
around lines 373 - 396, In finalizeGeneration, if persistTerminalChunk(...)
returns false you must return early and not call finishGeneration; update
finalizeGeneration (the method that calls persistTerminalChunk and
finishGeneration) to check terminalChunkPersisted and, after emitting the
session error via this.deps.emitSessionError(...), immediately return so
finishGeneration is not invoked when persistTerminalChunk fails.
```typescript
throw new ProxyRequestError({
  message: `${operation} failed: status ${res.status}${detail ? ` (${detail.slice(0, 300)})` : ""}`,
  status: res.status,
  code,
  nonRetryable: res.status >= 400 && res.status < 500,
});
```
Blanket 4xx = non-retryable excludes retryable status codes like 429 and 408.
HTTP 429 (Too Many Requests) and 408 (Request Timeout) are conventionally retryable. The current check res.status >= 400 && res.status < 500 will short-circuit retries for these.
Proposed fix

```diff
+const RETRYABLE_4XX = new Set([408, 429]);
+
 throw new ProxyRequestError({
   message: `${operation} failed: status ${res.status}${detail ? ` (${detail.slice(0, 300)})` : ""}`,
   status: res.status,
   code,
-  nonRetryable: res.status >= 400 && res.status < 500,
+  nonRetryable: res.status >= 400 && res.status < 500 && !RETRYABLE_4XX.has(res.status),
 });
```

📝 Committable suggestion
});📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```typescript
const RETRYABLE_4XX = new Set([408, 429]);

throw new ProxyRequestError({
  message: `${operation} failed: status ${res.status}${detail ? ` (${detail.slice(0, 300)})` : ""}`,
  status: res.status,
  code,
  nonRetryable: res.status >= 400 && res.status < 500 && !RETRYABLE_4XX.has(res.status),
});
```
🤖 Prompt for AI Agents
In
`@apps/desktop/src/lib/trpc/routers/ai-chat/utils/session-manager/proxy-requests.ts`
around lines 83 - 88, The nonRetryable flag on the ProxyRequestError currently
treats all 4xx responses as non-retryable; update the logic around the thrown
ProxyRequestError (the block that constructs message, status, code,
nonRetryable) to exclude known retryable 4xx codes such as 429 and 408 so they
are considered retryable (i.e., set nonRetryable to false for res.status === 429
or === 408), while keeping other 4xx codes non-retryable.
```typescript
private recordProducerError(sessionId: string, err: unknown): void {
  const errors = this.producerErrors.get(sessionId);
  if (errors) {
    errors.push(err instanceof Error ? err : new Error(String(err)));
  }
  this.producerHealthy.set(sessionId, false);
}
```
recordProducerError writes to producerHealthy even if the session was already deleted.
If the producer's onError callback fires after deleteSession has cleaned up all maps, line 62 will re-insert a stale entry into producerHealthy. Consider guarding with an existence check:
Proposed fix

```diff
 private recordProducerError(sessionId: string, err: unknown): void {
   const errors = this.producerErrors.get(sessionId);
-  if (errors) {
-    errors.push(err instanceof Error ? err : new Error(String(err)));
+  if (!errors) {
+    // Session already deleted; ignore late callback
+    return;
   }
+  errors.push(err instanceof Error ? err : new Error(String(err)));
   this.producerHealthy.set(sessionId, false);
 }
```

🤖 Prompt for AI Agents
In `@apps/streams/src/protocol.ts` around lines 57 - 63, recordProducerError
currently always sets this.producerHealthy.set(sessionId, false) even if
deleteSession removed the session, which can re-insert stale entries; update
recordProducerError to first verify the session exists in your maps (e.g. check
this.producerErrors.has(sessionId) or this.producerHealthy.has(sessionId)) and
only push the error and flip healthy-to-false when the session is present,
otherwise bail out; refer to the method name recordProducerError and the maps
producerErrors and producerHealthy to locate and guard the write.
```typescript
async deleteSession(sessionId: string): Promise<void> {
  return this.withSessionLock(sessionId, async () => {
    const producer = this.producers.get(sessionId);
    if (producer) {
      try {
        await producer.flush();
      } catch (err) {
        console.error(
          `[protocol] Failed to flush producer for ${sessionId}:`,
          err,
        );
      }
      try {
        await producer.detach();
      } catch (err) {
        console.error(
          `[protocol] Failed to detach producer for ${sessionId}:`,
          err,
        );
      }
      this.producers.delete(sessionId);
    }

    const state = this.sessionStates.get(sessionId);
    if (state) {
      state.changeSubscription?.unsubscribe();
      state.sessionDB.close();
    }

    this.streams.delete(sessionId);
    this.sessionStates.delete(sessionId);
    this.producerErrors.delete(sessionId);
    this.producerHealthy.delete(sessionId);
    this.activeGenerationIds.delete(sessionId);
  });
}
```
deleteSession does not clean up messageSeqs entries for the session.
resetSession carefully deletes messageSeq entries for active generations, but deleteSession skips this. Over repeated create/delete cycles, orphaned entries in messageSeqs will accumulate (keyed by messageId). Consider mirroring the cleanup logic from resetSession before deleting the other maps — or at minimum clearing the active generation's messageId.
Proposed fix (add before the map deletions)

```diff
+const activeMessageId = this.activeGenerationIds.get(sessionId);
+if (activeMessageId) {
+  this.messageSeqs.delete(activeMessageId);
+}
+
 this.streams.delete(sessionId);
```

🤖 Prompt for AI Agents
In `@apps/streams/src/protocol.ts` around lines 112 - 147, deleteSession currently
omits cleaning up messageSeqs, causing orphaned entries; before removing the
maps at the end of deleteSession, mirror resetSession by retrieving
this.activeGenerationIds.get(sessionId) (or the single active generation id used
for the session) and for each generation id remove/clear any message IDs stored
in this.messageSeqs for that generation, then proceed to delete
this.activeGenerationIds, this.producers, this.sessionStates, etc.; update
deleteSession to perform this messageSeqs cleanup (use the same logic as
resetSession) so messageSeqs no longer accumulates orphaned entries.
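The `deleteSession` body above runs inside `withSessionLock`, which the PR describes as a promise-chain based per-session lock. A minimal sketch of that pattern (class and map names are assumptions, not the real implementation):

```typescript
// Illustrative promise-chain per-session lock: each task for a session is
// chained onto the previous task's tail, so delete/reset/close serialize.
class SessionLocks {
  private tails = new Map<string, Promise<void>>();

  withSessionLock<T>(sessionId: string, task: () => Promise<T>): Promise<T> {
    const tail = this.tails.get(sessionId) ?? Promise.resolve();
    // Run the task whether the previous one fulfilled or rejected.
    const run = tail.then(task, task);
    // Store a settled-void tail so one failure doesn't poison the chain.
    this.tails.set(
      sessionId,
      run.then(
        () => undefined,
        () => undefined,
      ),
    );
    return run;
  }
}
```

Callers simply wrap their lifecycle operation; concurrent calls for the same session execute strictly in submission order.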
```typescript
async finishGeneration({
  sessionId,
  messageId,
}: {
  sessionId: string;
  messageId?: string;
}): Promise<void> {
  await this.flushSession(sessionId);

  if (messageId) {
    this.clearSeq(messageId);
  }
  const activeMessageId = this.activeGenerationIds.get(sessionId);
  if (!activeMessageId) {
    // no-op
  } else if (!messageId || messageId === activeMessageId) {
    this.activeGenerationIds.delete(sessionId);
  } else {
    console.warn(
      `[protocol] Ignoring stale finish for ${sessionId}: got ${messageId}, active is ${activeMessageId}`,
    );
  }

  const errors = this.drainProducerErrors(sessionId);
  if (errors.length > 0) {
    throw new Error(
      `Producer encountered ${errors.length} background error(s) during generation: ${errors.map((e) => e.message).join("; ")}`,
    );
  }
}
```
finishGeneration without messageId skips clearSeq for the active generation.
When messageId is undefined, line 363 skips clearSeq, but lines 369–370 still delete the active generation. This leaves the active messageId's sequence counter orphaned in messageSeqs. If that messageId is ever reused, it would resume from the old sequence number instead of 0.
Proposed fix

```diff
 async finishGeneration({
   sessionId,
   messageId,
 }: {
   sessionId: string;
   messageId?: string;
 }): Promise<void> {
   await this.flushSession(sessionId);
-  if (messageId) {
-    this.clearSeq(messageId);
-  }
   const activeMessageId = this.activeGenerationIds.get(sessionId);
+  const idToClear = messageId ?? activeMessageId;
+  if (idToClear) {
+    this.clearSeq(idToClear);
+  }
   if (!activeMessageId) {
     // no-op
   } else if (!messageId || messageId === activeMessageId) {
```

🤖 Prompt for AI Agents
In `@apps/streams/src/protocol.ts` around lines 354 - 383, finishGeneration
currently only calls clearSeq when an explicit messageId is provided, which
leaves the active generation's sequence counter in messageSeqs orphaned when
finishGeneration is invoked with messageId undefined; update finishGeneration so
that when you delete the active generation from activeGenerationIds you also
clear its sequence counter: if messageId is provided call clearSeq(messageId) as
now, but when you detect activeMessageId and either messageId is undefined or
different, call clearSeq(activeMessageId) before deleting
activeGenerationIds.delete(sessionId) (avoid double-clearing when messageId ===
activeMessageId).
```typescript
try {
  body = await c.req.json();
} catch {
  return c.json(
    { error: "Invalid JSON body", code: "INVALID_BODY", sessionId },
    400,
  );
}
```
Login parse error is silently swallowed; logout logs it.
Line 14 discards the parse error without logging, while the equivalent logout handler (Line 74) correctly logs with console.error("[AUTH] Failed to parse logout body:", parseError). This violates the "never swallow errors silently" guideline.
Proposed fix

```diff
 try {
   body = await c.req.json();
-} catch {
+} catch (parseError) {
+  console.error("[AUTH] Failed to parse login body:", parseError);
   return c.json(
     { error: "Invalid JSON body", code: "INVALID_BODY", sessionId },
     400,
   );
 }
```

As per coding guidelines, "Never swallow errors silently; at minimum log errors with context before rethrowing or handling them explicitly."
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```typescript
try {
  body = await c.req.json();
} catch (parseError) {
  console.error("[AUTH] Failed to parse login body:", parseError);
  return c.json(
    { error: "Invalid JSON body", code: "INVALID_BODY", sessionId },
    400,
  );
}
```
🤖 Prompt for AI Agents
In `@apps/streams/src/routes/auth.ts` around lines 12 - 19, The login handler
currently swallows JSON parse errors when calling c.req.json(); update that
catch block to log the error with context (e.g., use console.error or the
existing logging convention) before returning the 400 response—mirror the logout
handler's behavior which logs "[AUTH] Failed to parse logout body:"; reference
the login parse site (the try around c.req.json(), and the sessionId variable)
and include the parse error in the log message (e.g., "[AUTH] Failed to parse
login body:" + parseError) so the error is not silently discarded.
Summary
Addresses 32 of 51 items from the streaming performance and reliability recommendations. Fixes critical correctness bugs, reduces streaming latency, adds retry/fallback resilience, and standardizes the API surface.
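Among the changes is desktop-side chunk coalescing via a `ChunkBatcher` (5ms linger, 50 max batch, bounded buffer, retries with backoff). The linger/size coalescing pattern can be sketched roughly as follows; this simplified illustration omits the 2000-entry buffer bound and the retry/backoff logic, and the class shape is an assumption, not the real implementation:

```typescript
// Simplified sketch of linger + max-batch coalescing.
const LINGER_MS = 5;
const MAX_BATCH = 50;

class ChunkBatcher {
  private queue: string[] = [];
  private timer: ReturnType<typeof setTimeout> | null = null;

  constructor(private send: (batch: string[]) => Promise<void>) {}

  add(chunk: string): void {
    this.queue.push(chunk);
    if (this.queue.length >= MAX_BATCH) {
      void this.flush(); // size threshold hit: flush immediately
    } else if (!this.timer) {
      // Otherwise wait up to LINGER_MS for more chunks to coalesce.
      this.timer = setTimeout(() => void this.flush(), LINGER_MS);
    }
  }

  async flush(): Promise<void> {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.queue.length === 0) return;
    const batch = this.queue.splice(0, this.queue.length);
    await this.send(batch); // the real batcher retries with backoff here
  }
}
```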
- No `/generations/start` round trip, batch chunk endpoint with desktop-side coalescing (ChunkBatcher), producer tuning (1ms linger), Zod bypass on hot path, bounded memory queue
- `/generations/start` endpoint, documented terminal semantics

Changes

apps/streams/src/protocol.ts
- Per-session lock (`withSessionLock`) for delete/reset serialization
- Producer error tracking (`recordProducerError`/`drainProducerErrors`) — finish now throws on background errors
- `appendToStream` single write path with producer health fallback to `stream.append`
- `writeChunks` batch method, `startGeneration`/`finishGeneration` lifecycle
- `Promise.race`

apps/streams/src/routes/chunks.ts
- `/chunks/batch` endpoint (skips Zod, lightweight array validation)
- `/generations/start` endpoint (auto-registers from first chunk)
- Structured error codes (`SESSION_NOT_FOUND`, `WRITE_FAILED`, `FINISH_FAILED`, `INVALID_BODY`) and sessionId/messageId in all responses

apps/desktop/.../session-manager/
- `ChunkBatcher` class: 5ms linger, 50 max batch, 2000 max buffer, 3 retries with exponential backoff (50ms base)
- `sendBatch` throws on non-ok so retry logic can catch transient failures
- `messageId` generation (no `/generations/start` round trip)
- `res.ok` check on finish with error event emission

apps/streams/src/routes/sessions.ts, auth.ts, server.ts, types.ts
- `StreamChunk`: `message-end` = UI signal, `/finish` = server cleanup
- `generationsStart` with `chunksBatch`

Test Plan
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Tests
Chores