Skip to content

Add durable session transport#1241

Merged
Kitenite merged 6 commits into
mainfrom
kitenite/migrate-client
Feb 5, 2026
Merged

Add durable session transport#1241
Kitenite merged 6 commits into
mainfrom
kitenite/migrate-client

Conversation

@Kitenite
Copy link
Copy Markdown
Collaborator

@Kitenite Kitenite commented Feb 5, 2026

Description

Related Issues

Type of Change

  • Bug fix
  • New feature
  • Documentation
  • Refactor
  • Other (please describe):

Testing

Screenshots (if applicable)

Additional Notes

Summary by CodeRabbit

  • New Features

    • Added durable-session package: persistent durable chat sessions with real-time sync, presence, multi-agent support, session forking, session stats, LLM-ready message collections, and React bindings (hook + chat UI components).
    • Improved message materialization for reliable streaming and model-ready inputs.
  • Chores

    • Removed legacy ai-chat public exports and older stream APIs/components; public surface consolidated under durable-session.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Feb 5, 2026

Caution

Review failed

The pull request is closed.

📝 Walkthrough

Walkthrough

Adds a new private package @superset/durable-session implementing a durable, stream-backed chat client, typed session DB and schema, materialized/derived collections, React bindings (useDurableChat) and UI components; concurrently removes many ai-chat stream modules and related re-exports/tests.

Changes

Cohort / File(s) Summary
Package manifest & config
packages/durable-session/package.json, packages/durable-session/tsconfig.json
Add package manifest and TypeScript configuration for the new durable-session package.
Public barrels & top-level exports
packages/durable-session/src/index.ts, packages/durable-session/src/react/index.ts, packages/durable-session/src/collections/index.ts, packages/durable-session/src/react/components/index.ts
New entry points and re-exports exposing client, collections, types, hooks, and React components.
Client implementation
packages/durable-session/src/client.ts
New DurableChatClient class and createDurableChatClient factory with lifecycle, optimistic actions, proxy POSTs, session sync and server actions.
Session DB & schema
packages/durable-session/src/collection.ts, packages/durable-session/src/schema.ts
SessionDB factory (createSessionDB), state schemas (chunks, presence, agents), and chunk key helpers (getChunkKey, parseChunkKey).
Types surface
packages/durable-session/src/types.ts
Comprehensive exported types for messages, chunks, meta, stats, agents, collections and client/options.
Materialization & utilities
packages/durable-session/src/materialize.ts
Chunk parsing and message materialization (whole-message + streamed), text extraction and message classification utilities.
Derived collections
packages/durable-session/src/collections/...
messages.ts, active-generations.ts, model-messages.ts, presence.ts, session-meta.ts, session-stats.ts
New root messages collection and multiple derived/live collection factories (toolCalls, pendingApprovals, toolResults, activeGenerations, model-ready messages, presence aggregation, session-meta, session-stats).
React hook & types
packages/durable-session/src/react/use-durable-chat.ts, packages/durable-session/src/react/types.ts
New SSR-safe useDurableChat hook, hook option/return types, lifecycle and action APIs.
React components
packages/durable-session/src/react/components/PresenceBar/PresenceBar.tsx, packages/durable-session/src/react/components/PresenceBar/index.ts, packages/durable-session/src/react/components/index.ts
PresenceBar and ChatInput re-exports added; PresenceUser interface defined in PresenceBar.
Removed: ai-chat stream surface
packages/ai-chat/package.json (deleted), packages/ai-chat/src/stream/**, packages/ai-chat/src/components/index.ts, packages/ai-chat/src/index.ts, packages/ai-chat/src/types.ts
Removal of ai-chat package manifest and many stream-related modules (client, actions, materialize, schema, hooks, useCollectionData), tests, and component re-exports—reduces ai-chat public surface.
Small cross-package re-exports
packages/durable-session/src/react/components/PresenceBar/index.ts, packages/ai-chat/src/components/PresenceBar/index.ts
Added PresenceBar re-export in durable-session; corresponding export removed from ai-chat.

Sequence Diagram(s)

sequenceDiagram
    participant User as User (UI)
    participant Hook as useDurableChat
    participant Client as DurableChatClient
    participant DB as SessionDB & Collections
    participant Proxy as Proxy Server
    participant Stream as Stream Server

    User->>Hook: sendMessage(content)
    Hook->>Client: sendMessage(content)
    Client->>DB: optimistic append (chunks)
    DB->>DB: materialize -> messages collection emits
    Client->>Proxy: POST /messages
    Proxy->>Stream: persist chunks
    Stream-->>Proxy: { txId }
    Proxy-->>Client: { txId }
    Client->>DB: mark synced (txId)
    DB-->>Hook: collections updated
    Hook-->>User: render updated messages
Loading
sequenceDiagram
    participant App as React App
    participant Hook as useDurableChat
    participant Client as DurableChatClient
    participant DB as SessionDB
    participant Derived as Derived Collections
    participant Proxy as Proxy Server

    App->>Hook: mount(sessionId, proxyUrl)
    Hook->>Client: create/reuse client
    Client->>DB: instantiate SessionDB (synchronous)
    Hook->>Client: connect()
    Client->>Proxy: ensure session & preload
    Proxy-->>Client: session metadata
    Client->>DB: preload stream subscription
    DB->>Derived: materialize -> derived collections update
    Derived-->>Hook: snapshots
    Hook-->>App: provide messages, status, actions
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

"I hopped through streams with nimble feet,
Chunks stitched into messages, whole and neat.
Hooks hum softly, clients hold the thread —
Optimistic carrots keep the session fed.
A rabbit cheers: durable chats ahead! 🐇"

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Description check ⚠️ Warning The PR description is entirely empty, containing only the template placeholders with no actual information filled in about the changes, rationale, testing, or related issues. Complete the PR description by filling in all relevant sections: describe the migration of components and packages, link related issues, specify the change type (refactor/new feature), document testing performed, and provide context for the significant structural changes.
Title check ❓ Inconclusive The title 'Add durable session transport' is vague and generic, using non-descriptive terminology that does not clearly convey what 'durable session transport' means or what the actual change accomplishes. Consider a more specific title like 'Migrate durable-session package and remove ai-chat package' or 'Move Chat components to durable-session and remove legacy ai-chat' to better describe the primary changes.
✅ Passed checks (1 passed)
Check name Status Explanation
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch kitenite/migrate-client

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Feb 5, 2026

🚀 Preview Deployment

🔗 Preview Links

Service Status Link
Neon Database (Neon) View Branch
Fly.io Electric (Fly.io) View App
Vercel API (Vercel) Open Preview
Vercel Web (Vercel) Open Preview
Vercel Marketing (Vercel) Open Preview
Vercel Admin (Vercel) Open Preview
Vercel Docs (Vercel) Open Preview

Preview updates automatically with new commits

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 17

🤖 Fix all issues with AI agents
In `@packages/durable-session/src/client.ts`:
- Around line 465-472: The clear() method currently only calls
this.options.onMessagesChange([]) but leaves the underlying message store
intact; update durable-session's clear() to also empty the in-memory collection
(e.g. reset this._collections.messages to an empty array or call its clear()
method if it's a Map/Set) before invoking this.options.onMessagesChange, so
client.messages reflects the cleared state; reference the clear() method and
this._collections.messages (and the options.onMessagesChange callback) when
making the change.
- Around line 640-642: The function that is declared to return
Promise<ForkResult> currently returns response.json() directly; change it to
await the JSON parse (e.g., const result = await response.json(); return result)
so the function resolves to a ForkResult instead of returning a Promise<
ForkResult> from response.json(); ensure the enclosing function is async (or
already is) and keep the return type as Promise<ForkResult>.
- Around line 744-761: The pause()/resume() methods currently only flip
_isPaused but never affect the stream; update them to actually pause/resume the
underlying stream by delegating to the stream DB or managing the AbortSignal
used by createStreamDB(): if the _db instance exposes pause()/resume() call
those in pause() and resume(); otherwise store and manage an AbortController
(e.g., _streamAbortController) created when connecting (in
connect()/createStreamDB()) and call abort() to pause and recreate/reset the
controller and restart the stream in resume(); also ensure the sync logic checks
_isPaused (or the controller state) before processing events so pausing truly
stops work. Refer to pause(), resume(), _isPaused, _isConnected, connect(), _db,
createStreamDB(), and signal when implementing.

In `@packages/durable-session/src/collection.ts`:
- Around line 98-113: The stream URL in createSessionDB is built by
interpolating sessionId directly into streamUrl which can corrupt the path if
sessionId contains characters like '/', '?', or '#'; update createSessionDB to
safely encode the sessionId when constructing the URL (e.g., use new URL() and
encodeURIComponent(sessionId) or equivalent) so the value passed into
createStreamDB's streamOptions.url is a valid URL; apply the same encoding
approach anywhere sessionId is interpolated into URLs (e.g., client.ts methods)
to prevent path injection.
- Around line 133-153: Extract the delimiter ':' into a named constant (e.g.,
CHUNK_KEY_DELIMITER) and update both getChunkKey and parseChunkKey to use it;
refactor getChunkKey to accept a single options object parameter (e.g., {
messageId, seq }) instead of two positional args to follow the object-parameter
guideline; in parseChunkKey replace parseInt with a strict numeric validation
(e.g., a regex that matches only digits) before converting to Number and return
null on any non-matching seq; ensure parseChunkKey still locates the last
delimiter (using CHUNK_KEY_DELIMITER) and returns { messageId, seq } or null.

In `@packages/durable-session/src/collections/active-generations.ts`:
- Around line 29-37: Remove the unused placeholder fields lastChunkSeq and
lastChunkAt from the ActiveGenerationRow type and from the
messageToActiveGeneration function so the derived ActiveGenerationRow only
contains messageId, actorId, and startedAt; update the messageToActiveGeneration
implementation to stop populating lastChunkSeq/lastChunkAt and adjust any
type/interface declarations named ActiveGenerationRow accordingly, or if you
prefer to keep them for legacy reasons, mark them as deprecated in the type docs
(but do not populate them in messageToActiveGeneration).

In `@packages/durable-session/src/collections/messages.ts`:
- Around line 99-163: Extract the repeated literal part-type strings into
module-level constants (e.g., const PART_TYPE_TOOL_CALL = 'tool-call' and const
PART_TYPE_TOOL_RESULT = 'tool-result') and replace occurrences of the hardcoded
strings inside createToolCallsCollection, createPendingApprovalsCollection, and
createToolResultsCollection (and any type guards like the ToolCallPart checks)
to reference these constants; ensure exports or naming follow existing file
conventions and update all .type === 'tool-call' and .type === 'tool-result'
comparisons to use the new constants to avoid typos.

In `@packages/durable-session/src/collections/presence.ts`:
- Around line 51-76: Extract the string literals 'online', 'user', and 'agent'
into module-level constants (e.g., PRESENCE_STATUS_ONLINE, ACTOR_TYPE_USER,
ACTOR_TYPE_AGENT) and replace their inline uses in the query pipeline and return
object (references: presence.status, grouped.actorId, actorPresence, first) with
those constants; additionally add a small type-guard function (e.g.,
isValidActorType(value): value is 'user' | 'agent') and use it when deriving
actorType from first?.actorType so you defensively choose first.actorType if
valid or fall back to ACTOR_TYPE_USER before casting in the return object
(affecting RawPresenceRow handling and the actorType assignment).
- Around line 48-56: The grouped subquery (built from rawPresenceCollection and
assigned to grouped) only selects deviceCount which lets metadata-only updates
be ignored; update the grouped.select to also include a change discriminator
such as max(presence.lastSeenAt) or a content hash (e.g., max(lastSeenAt) AS
lastSeenAt or metadataHash) so the grouped row changes when device metadata
changes, and adjust the downstream fn.select/PresenceRow mapping to use that
lastSeenAt/hash when choosing the canonical device metadata (or to trigger
recomputation); ensure the symbols mentioned (grouped, presence,
rawPresenceCollection, fn.select, PresenceRow) are updated accordingly.

In `@packages/durable-session/src/collections/session-meta.ts`:
- Around line 33-43: The JSDoc example in session-meta.ts incorrectly imports
createSessionMetaCollectionOptions from '@electric-sql/durable-session'; update
the example to import createSessionMetaCollectionOptions from
'@superset/durable-session' (and verify any other example imports in the same
file reference the correct package name) so the example matches package.json;
keep the rest of the example (including the createCollection /
createSessionMetaCollectionOptions usage) unchanged.

In `@packages/durable-session/src/collections/session-stats.ts`:
- Around line 108-113: The current try/catch around
materializeMessage(messageRows) silently swallows errors; update the catch to
log the error with context (e.g., include messageRows or an identifier) before
continuing so invalid messages are still skipped but failures are visible;
specifically modify the block that calls materializeMessage and pushes into
messages to catch (err) and call the module logger or processLogger with a
descriptive message and the err plus relevant messageRows metadata, then
continue to the next message.

In `@packages/durable-session/src/materialize.ts`:
- Around line 231-238: The function messageRowToUIMessage incorrectly narrows
row.role to 'user' | 'assistant', dropping the 'system' role; update the
function to preserve the full MessageRole (or explicitly allow 'system') for the
returned UIMessage so system messages remain typed and handled consistently with
other functions and client.ts; locate messageRowToUIMessage in materialize.ts
and replace the cast on row.role with the correct MessageRole-compatible type or
remove the cast so the role is returned unchanged.
- Around line 53-59: The parseChunk function currently swallows JSON parse
errors; update parseChunk to catch and log parsing errors with a
"[durable-session/parseChunk]" prefix and the error + raw chunkJson, then
validate the parsed value's shape before casting to DurableStreamChunk (either
by using a Zod schema or a minimal manual check for required properties such as
the known DurableStreamChunk fields) and return null if validation fails;
reference the parseChunk function and the DurableStreamChunk type when making
these changes so you log failures and only return a typed object after
successful validation.
- Around line 95-125: Replace the ad-hoc any casts and silent swallowing in the
loop: create module-level named constants for legacy and control chunk types
(e.g., MESSAGE_START, MESSAGE_END, STOP, ERROR, RUN_ERROR), add a typed guard
function (or narrow the parsed chunk type) to check for legacy wrapper chunks
instead of using (chunk as any).type, and wrap the call to
processor.processChunk(chunk as StreamChunk) in a try/catch that logs the error
with context (include the chunk id/index and chunk content) before continuing;
keep the existing isWholeMessageChunk and isDoneChunk checks but compare against
the new constants and use the typed guard and logging to improve type safety and
observability (referencing parseChunk, isWholeMessageChunk,
processor.processChunk, and isDoneChunk).

In `@packages/durable-session/src/react/use-durable-chat.ts`:
- Around line 169-190: The code builds a key and constructs a DurableChatClient
using possibly-undefined properties from clientOptions (sessionId/proxyUrl) and
uses an unsafe cast; add a runtime guard before creating the key or new client:
when providedClient is falsy, assert that clientOptions.sessionId and
clientOptions.proxyUrl are present (throw a clear Error or call
onErrorRef.current) and only then compute key = `${sessionId}:${proxyUrl}` and
create the DurableChatClient with a properly typed options object (do not use
"as DurableChatClientOptions"); update the client creation branch that
references clientRef, key, clientOptions and DurableChatClient to depend on this
guard so undefined values never reach the constructor.
- Around line 37-112: Replace the loose any types in the Collection generic
bounds and the CollectionItem helper with unknown (update the generics used in
useCollectionData and the CollectionItem type) and reset internal refs when the
incoming collection instance changes: inside useCollectionData detect collection
identity change and set versionRef.current = 0 and snapshotRef.current = {
version: -1, data: [] } (and reinitialize
subscribeRef.current/getSnapshotRef.current if needed) so the hook does not
return stale cached snapshots from the prior collection; update references to
the functions subscribeRef, getSnapshotRef and
collection.subscribeChanges/collection.values in the useCollectionData function
to operate against the new collection instance.

In `@packages/durable-session/src/schema.ts`:
- Around line 11-27: Update the JSDoc example to import from the correct package
name: change the import statement that references sessionStateSchema from
'@electric-sql/durable-session' to '@superset/durable-session' so the example
reflects the actual package; locate the example block that mentions
sessionStateSchema and update only the import string.
🧹 Nitpick comments (9)
packages/durable-session/package.json (1)

29-32: Consider marking @tanstack/react-db as optional.

Since the package exposes both a core entry (".") and a React-specific entry ("./react"), consumers using only the core API shouldn't need @tanstack/react-db. Consider adding peerDependenciesMeta to mark it as optional:

Suggested change
 	"peerDependencies": {
 		"react": "^18.0.0 || ^19.0.0",
 		"@tanstack/react-db": "^0.1.66"
+	},
+	"peerDependenciesMeta": {
+		"react": {
+			"optional": true
+		},
+		"@tanstack/react-db": {
+			"optional": true
+		}
 	}
packages/durable-session/src/collections/session-stats.ts (1)

199-210: Consider extracting the usage type to a named constant.

The inline type assertion for usage is verbose. Extracting it to a module-level type would improve readability and reusability.

Suggested refactor
+/** Token usage shape from chunk data (supports both camelCase and snake_case) */
+interface ChunkUsage {
+  totalTokens?: number
+  promptTokens?: number
+  completionTokens?: number
+  total_tokens?: number
+  prompt_tokens?: number
+  completion_tokens?: number
+}
+
+interface ChunkWithUsage {
+  usage?: ChunkUsage
+}
+
 function extractTokenUsage(rows: ChunkRow[]): {
   // ...
     // Look for usage information in chunks
-    const usage = (
-      chunk as {
-        usage?: {
-          totalTokens?: number
-          promptTokens?: number
-          completionTokens?: number
-          total_tokens?: number
-          prompt_tokens?: number
-          completion_tokens?: number
-        }
-      }
-    ).usage
+    const usage = (chunk as ChunkWithUsage).usage
packages/durable-session/src/types.ts (2)

223-224: Move import to the top of the file.

The import statement is placed in the middle of the file after several type definitions. TypeScript/JavaScript convention is to place all imports at the top of the file for consistency and readability.

Suggested change

Move this import to the top of the file, after the other import statements (after line 19):

 import type { Collection } from '@tanstack/db'
 import type { SessionDB } from './collection'
+import type { ChunkRow, PresenceRow, AgentRow } from './schema'
 
 // Re-export schema types
 export type { ChunkRow, ChunkValue, PresenceRow, AgentRow } from './schema'

Then remove the duplicate import at line 224.


412-416: Remove or convert commented-out code.

The liveMode field is commented out. Per coding guidelines, avoid leaving commented-out code. Either remove it entirely or convert to a TODO comment with context about when it should be added.

Suggested change
   signal?: AbortSignal
-  // /**
-  //  * Live mode for the stream connection.
-  //  * Defaults to "sse" for efficient real-time updates.
-  //  */
-  // liveMode?: LiveMode
+  // TODO: Add liveMode option when `@durable-streams/state` supports configurable live modes
 }
packages/durable-session/src/react/index.ts (1)

1-4: Package name inconsistency in JSDoc.

The JSDoc header references @superset/durable-session/react, but the main index.ts uses @electric-sql/durable-session. These should be consistent across the package.

 /**
- * `@superset/durable-session/react`
+ * `@electric-sql/durable-session/react`
  *
  * React bindings for durable chat client backed by TanStack DB and Durable Streams.
packages/durable-session/src/client.ts (4)

115-116: Unused field _optimisticSeq.

The _optimisticSeq field is declared but never used in the class. Either remove it or implement its intended usage.

-  // Counter for generating unique optimistic sequence numbers
-  private _optimisticSeq = 0

454-463: Use prefixed logging pattern.

Per coding guidelines, use prefixed console logging with consistent context pattern: [domain/operation] message.

     fetch(`${this.options.proxyUrl}/v1/sessions/${this.sessionId}/stop`, {
       method: 'POST',
       headers: { 'Content-Type': 'application/json' },
       body: JSON.stringify({ messageId: null }), // null = stop all
     }).catch((err) => {
-      console.warn('Failed to stop generation:', err)
+      console.warn('[DurableChatClient/stop] Failed to stop generation:', err)
     })

426-428: Suspicious double type cast.

The cast as unknown as MessageRow suggests a type mismatch between UIMessage and MessageRow. Since extractTextContent expects MessageRow, but lastUserMessage is UIMessage, consider:

  1. Creating an overload of extractTextContent that accepts UIMessage
  2. Or using a type guard to safely extract content from UIMessage
     // Get content of last user message
     const lastUserMessage = msgs[lastUserMessageIndex]!
-    const content = extractTextContent(lastUserMessage as unknown as MessageRow)
+    const content = 'parts' in lastUserMessage
+      ? extractTextContent(lastUserMessage as MessageRow)
+      : ''

716-718: Redundant status code checks.

If response.ok is false, the status is already outside the 200-299 range, making the additional !== 200 and !== 201 checks redundant.

-        if (!response.ok && response.status !== 200 && response.status !== 201) {
+        if (!response.ok) {
           throw new Error(`Failed to create session: ${response.status}`)
         }

Comment thread packages/durable-session/src/client.ts Outdated
Comment on lines +465 to +472
/**
* Clear all messages (local only - does not affect server).
*/
clear(): void {
// Note: This only clears local state, not the durable stream
// For full clear, use the proxy's clear endpoint
this.options.onMessagesChange?.([])
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

clear() does not actually clear local messages.

The method only invokes the onMessagesChange callback with an empty array but doesn't clear the underlying _collections.messages. After calling clear(), client.messages will still return all messages.

Either implement actual clearing of the messages collection or rename/document this as a callback notification only.

🤖 Prompt for AI Agents
In `@packages/durable-session/src/client.ts` around lines 465 - 472, The clear()
method currently only calls this.options.onMessagesChange([]) but leaves the
underlying message store intact; update durable-session's clear() to also empty
the in-memory collection (e.g. reset this._collections.messages to an empty
array or call its clear() method if it's a Map/Set) before invoking
this.options.onMessagesChange, so client.messages reflects the cleared state;
reference the clear() method and this._collections.messages (and the
options.onMessagesChange callback) when making the change.

Comment thread packages/durable-session/src/client.ts Outdated
Comment thread packages/durable-session/src/client.ts Outdated
Comment on lines +744 to +761
*/
pause(): void {
this._isPaused = true
// The stream-db handles pausing internally via the abort signal
}

/**
* Resume stream sync.
*/
async resume(): Promise<void> {
if (!this._isConnected) {
await this.connect()
return
}

this._isPaused = false
// The stream-db handles resuming internally
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, let's understand the structure of the client.ts file and find the _db object
cd packages/durable-session/src && head -100 client.ts | grep -E 'import|_db|SessionDB'

Repository: superset-sh/superset

Length of output: 598


🏁 Script executed:

# Find the SessionDB class definition and its methods
rg -n 'class SessionDB|interface SessionDB' -A 10 packages/durable-session

Repository: superset-sh/superset

Length of output: 863


🏁 Script executed:

# Search for pause/resume methods in SessionDB or stream-db related files
rg -n 'pause|resume' packages/durable-session/src --type ts -B 2 -A 2

Repository: superset-sh/superset

Length of output: 2389


🏁 Script executed:

# Check what _isPaused flag is used for
rg -n '_isPaused' packages/durable-session/src/client.ts -B 2 -A 2

Repository: superset-sh/superset

Length of output: 736


🏁 Script executed:

# Look for the actual pause/resume implementation context in client.ts
sed -n '730,770p' packages/durable-session/src/client.ts

Repository: superset-sh/superset

Length of output: 937


🏁 Script executed:

# Find where _isPaused is checked in the sync logic
rg -n '_isPaused' packages/durable-session/src/client.ts -B 3 -A 3

Repository: superset-sh/superset

Length of output: 1065


🏁 Script executed:

# Look for the sync loop or main stream sync method
rg -n 'syncStream|sync|_db\.' packages/durable-session/src/client.ts | head -30

Repository: superset-sh/superset

Length of output: 1943


🏁 Script executed:

# Check the collection.ts to see SessionDB interface and close method
sed -n '1,150p' packages/durable-session/src/collection.ts

Repository: superset-sh/superset

Length of output: 4969


🏁 Script executed:

# Search for where _isPaused is actually used/checked (not just set)
rg -n '_isPaused' packages/durable-session/src/ -B 5 -A 5 --type ts

Repository: superset-sh/superset

Length of output: 3860


🏁 Script executed:

# Look for the main sync/stream handling logic
rg -n 'sync|stream|connect' packages/durable-session/src/client.ts | grep -E 'async|function|for|while|if' | head -20

Repository: superset-sh/superset

Length of output: 1298


🏁 Script executed:

# Check the full connect method to see the actual sync implementation
sed -n '695,750p' packages/durable-session/src/client.ts

Repository: superset-sh/superset

Length of output: 1673


🏁 Script executed:

# Check if StreamDB has pause/resume methods or if the signal mechanism handles pausing
rg -n 'StreamDB|StreamDBMethods' packages/durable-session/src/collection.ts -A 15

Repository: superset-sh/superset

Length of output: 2679


🏁 Script executed:

# Look for any abort signal usage or pause/resume in the sync logic
rg -n 'signal|abort|pause|resume' packages/durable-session/src/collection.ts

Repository: superset-sh/superset

Length of output: 157


🏁 Script executed:

# Check if there's a reference to `@durable-streams/state` documentation or implementation
rg -n '@durable-streams|createStreamDB' packages/durable-session

Repository: superset-sh/superset

Length of output: 894


🏁 Script executed:

# Check if _isPaused is actually checked anywhere in conditions/logic
ast-grep --pattern 'if ($_isPaused) {
  $$$
}'

Repository: superset-sh/superset

Length of output: 46


pause() and resume() methods are incomplete—they set flags but don't actually pause/resume the stream.

The _isPaused flag is never checked anywhere in the sync logic, and neither method invokes any pause/resume functionality on _db. The comments claim "stream-db handles internally" but no delegation actually occurs. The signal parameter passed to createStreamDB() could be used for pausing (via abort), but neither method manipulates it.

Either implement actual stream pause/resume logic or clearly document these as placeholder stubs pending implementation.

🤖 Prompt for AI Agents
In `@packages/durable-session/src/client.ts` around lines 744 - 761, The
pause()/resume() methods currently only flip _isPaused but never affect the
stream; update them to actually pause/resume the underlying stream by delegating
to the stream DB or managing the AbortSignal used by createStreamDB(): if the
_db instance exposes pause()/resume() call those in pause() and resume();
otherwise store and manage an AbortController (e.g., _streamAbortController)
created when connecting (in connect()/createStreamDB()) and call abort() to
pause and recreate/reset the controller and restart the stream in resume(); also
ensure the sync logic checks _isPaused (or the controller state) before
processing events so pausing truly stops work. Refer to pause(), resume(),
_isPaused, _isConnected, connect(), _db, createStreamDB(), and signal when
implementing.

Comment on lines +98 to +113
export function createSessionDB(config: SessionDBConfig): SessionDB {
const { sessionId, baseUrl, headers, signal, /* liveMode */ } = config

// Build the stream URL for this session
const streamUrl = `${baseUrl}/v1/stream/sessions/${sessionId}`

// Create the stream-db instance with our session state schema (synchronous)
const rawDb: RawSessionDB = createStreamDB({
streamOptions: {
url: streamUrl,
headers,
signal,
},
state: sessionStateSchema,
// liveMode,
})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's verify the file exists and check the current code
fd -t f "collection.ts" packages/durable-session/src/

Repository: superset-sh/superset

Length of output: 106


🏁 Script executed:

# Check the content of the file around the specified lines
cat -n packages/durable-session/src/collection.ts | head -120 | tail -30

Repository: superset-sh/superset

Length of output: 1174


🏁 Script executed:

# Search for sessionId usage throughout the codebase to understand constraints
rg "sessionId" packages/durable-session/src/ -A 2 -B 2

Repository: superset-sh/superset

Length of output: 20723


🏁 Script executed:

# Look for SessionDBConfig type definition to understand sessionId type
rg "SessionDBConfig|sessionId.*:" packages/durable-session/src/ -B 3 -A 3

Repository: superset-sh/superset

Length of output: 14447


🏁 Script executed:

# Check if there are other URL building patterns in the codebase for reference
rg "new URL|baseUrl.*streamUrl" packages/durable-session/ -B 2 -A 2

Repository: superset-sh/superset

Length of output: 46


🏁 Script executed:

# Search for other sessionId URL patterns in client.ts
rg "sessionId\}" packages/durable-session/src/client.ts -B 1 -A 1

Repository: superset-sh/superset

Length of output: 1175


🏁 Script executed:

# Check if sessionId is validated anywhere for format constraints
rg "sessionId.*validate|sessionId.*pattern|sessionId.*regex|sessionId.*format" packages/durable-session/src/ -i

Repository: superset-sh/superset

Length of output: 46


Encode sessionId when building the stream URL to prevent path corruption.
If sessionId contains /, ?, or #, the current string interpolation corrupts the path. Use new URL() with encodeURIComponent() to safely handle the sessionId.

Suggested fix
+const SESSION_STREAM_PATH = '/v1/stream/sessions'
+
 export function createSessionDB(config: SessionDBConfig): SessionDB {
   const { sessionId, baseUrl, headers, signal, /* liveMode */ } = config

   // Build the stream URL for this session
-  const streamUrl = `${baseUrl}/v1/stream/sessions/${sessionId}`
+  const streamUrl = new URL(
+    `${SESSION_STREAM_PATH}/${encodeURIComponent(sessionId)}`,
+    baseUrl
+  ).toString()

Note: The same vulnerability exists in other parts of the codebase where sessionId is interpolated directly into URLs (e.g., client.ts methods). Consider applying the same fix systematically.

🤖 Prompt for AI Agents
In `@packages/durable-session/src/collection.ts` around lines 98 - 113, The stream
URL in createSessionDB is built by interpolating sessionId directly into
streamUrl which can corrupt the path if sessionId contains characters like '/',
'?', or '#'; update createSessionDB to safely encode the sessionId when
constructing the URL (e.g., use new URL() and encodeURIComponent(sessionId) or
equivalent) so the value passed into createStreamDB's streamOptions.url is a
valid URL; apply the same encoding approach anywhere sessionId is interpolated
into URLs (e.g., client.ts methods) to prevent path injection.

Comment on lines +133 to +153
export function getChunkKey(messageId: string, seq: number): string {
return `${messageId}:${seq}`
}

/**
* Parse a chunk key into its components.
*
* @param key - Chunk key in format `${messageId}:${seq}`
* @returns Parsed components or null if invalid
*/
export function parseChunkKey(key: string): { messageId: string; seq: number } | null {
const lastColonIndex = key.lastIndexOf(':')
if (lastColonIndex === -1) return null

const messageId = key.slice(0, lastColonIndex)
const seqStr = key.slice(lastColonIndex + 1)
const seq = parseInt(seqStr, 10)

if (isNaN(seq)) return null

return { messageId, seq }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cat -n packages/durable-session/src/collection.ts | sed -n '130,160p'

Repository: superset-sh/superset

Length of output: 953


🏁 Script executed:

rg "getChunkKey|parseChunkKey" packages/durable-session --no-heading -n

Repository: superset-sh/superset

Length of output: 431


🏁 Script executed:

rg "getChunkKey|parseChunkKey" --type ts --type tsx -B2 -A2 | grep -v "^--$" | head -100

Repository: superset-sh/superset

Length of output: 91


🏁 Script executed:

rg "getChunkKey|parseChunkKey" -t ts -B2 -A2 | grep -v "^--$"

Repository: superset-sh/superset

Length of output: 1187


🏁 Script executed:

rg "getChunkKey\(" -t ts -B2 -A2 --no-heading

Repository: superset-sh/superset

Length of output: 416


🏁 Script executed:

rg "getChunkKey\s*\(" -t ts --no-heading -A2

Repository: superset-sh/superset

Length of output: 295


🏁 Script executed:

rg "getChunkKey" -t ts --no-heading | grep -v "export function" | grep -v "^packages/durable-session/src/index.ts"

Repository: superset-sh/superset

Length of output: 46


🏁 Script executed:

find . -name "*.ts" -type f -exec grep -l "getChunkKey\|parseChunkKey" {} \;

Repository: superset-sh/superset

Length of output: 148


🏁 Script executed:

wc -l packages/durable-session/src/collection.ts

Repository: superset-sh/superset

Length of output: 110


🏁 Script executed:

cat -n packages/durable-session/src/collection.ts

Repository: superset-sh/superset

Length of output: 6108


🏁 Script executed:

rg "from.*durable-session" -t ts --no-heading

Repository: superset-sh/superset

Length of output: 1167


🏁 Script executed:

find . -name "*.ts" -type f ! -path "*/node_modules/*" ! -path "*/dist/*" ! -path "*/build/*" -exec grep -l "getChunkKey\|parseChunkKey" {} \;

Repository: superset-sh/superset

Length of output: 148


🏁 Script executed:

grep -r "getChunkKey\|parseChunkKey" . --include="*.ts" --include="*.tsx" --include="*.js" --include="*.jsx" 2>/dev/null | grep -v "export" | grep -v "^Binary"

Repository: superset-sh/superset

Length of output: 175


🏁 Script executed:

cat packages/durable-session/src/index.ts | head -150

Repository: superset-sh/superset

Length of output: 3989


🏁 Script executed:

find . -name "*.test.ts" -o -name "*.spec.ts" | xargs grep -l "getChunkKey\|parseChunkKey" 2>/dev/null

Repository: superset-sh/superset

Length of output: 48


🏁 Script executed:

find packages/durable-session -name "*.test.ts" -o -name "*.spec.ts" -o -name "__tests__"

Repository: superset-sh/superset

Length of output: 46


Extract delimiter constant and use options object for consistency with coding guidelines.

getChunkKey violates the guideline to use object parameters for functions with 2+ parameters, and both functions hardcode the ':' delimiter instead of extracting it as a named constant. Additionally, parseChunkKey uses parseInt which accepts partial numeric strings (e.g., "12x"12); use stricter validation with regex.

Suggested fix
+const CHUNK_KEY_SEPARATOR = ':'
+const CHUNK_KEY_REGEX = /^(.+):(\d+)$/
+
-export function getChunkKey(messageId: string, seq: number): string {
-  return `${messageId}:${seq}`
-}
+export function getChunkKey({
+  messageId,
+  seq,
+}: {
+  messageId: string
+  seq: number
+}): string {
+  return `${messageId}${CHUNK_KEY_SEPARATOR}${seq}`
+}
@@
-export function parseChunkKey(key: string): { messageId: string; seq: number } | null {
-  const lastColonIndex = key.lastIndexOf(':')
-  if (lastColonIndex === -1) return null
-
-  const messageId = key.slice(0, lastColonIndex)
-  const seqStr = key.slice(lastColonIndex + 1)
-  const seq = parseInt(seqStr, 10)
-
-  if (isNaN(seq)) return null
-
-  return { messageId, seq }
-}
+export function parseChunkKey(key: string): { messageId: string; seq: number } | null {
+  const match = CHUNK_KEY_REGEX.exec(key)
+  if (!match) return null
+
+  const [, messageId, seqStr] = match
+  const seq = Number(seqStr)
+  if (!Number.isInteger(seq) || seq < 0) return null
+
+  return { messageId, seq }
+}
🤖 Prompt for AI Agents
In `@packages/durable-session/src/collection.ts` around lines 133 - 153, Extract
the delimiter ':' into a named constant (e.g., CHUNK_KEY_DELIMITER) and update
both getChunkKey and parseChunkKey to use it; refactor getChunkKey to accept a
single options object parameter (e.g., { messageId, seq }) instead of two
positional args to follow the object-parameter guideline; in parseChunkKey
replace parseInt with a strict numeric validation (e.g., a regex that matches
only digits) before converting to Number and return null on any non-matching
seq; ensure parseChunkKey still locates the last delimiter (using
CHUNK_KEY_DELIMITER) and returns { messageId, seq } or null.

Comment on lines +95 to +125
for (const row of sorted) {
const chunk = parseChunk(row.chunk)
if (!chunk) continue

// Skip legacy wrapper chunks (for backward compatibility)
if ((chunk as any).type === 'message-start' || (chunk as any).type === 'message-end') {
if ((chunk as any).type === 'message-end') {
isComplete = true
}
continue
}

// Skip whole-message chunks (shouldn't be in assistant messages, but guard)
if (isWholeMessageChunk(chunk)) continue

// Process TanStack AI StreamChunk
try {
processor.processChunk(chunk as StreamChunk)
} catch {
// Skip chunks that can't be processed
}

if (isDoneChunk(chunk as StreamChunk)) {
isComplete = true
}

// Also check for stop/error chunks (stop is from our proxy, not in TanStack AI types)
const chunkType = (chunk as { type: string }).type
if (chunkType === 'stop' || chunkType === 'error' || chunkType === 'RUN_ERROR') {
isComplete = true
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's check if the file exists and read the relevant section
head -n 150 packages/durable-session/src/materialize.ts | tail -n 60

Repository: superset-sh/superset

Length of output: 1615


🏁 Script executed:

# Also check the file size to understand its scope
wc -l packages/durable-session/src/materialize.ts

Repository: superset-sh/superset

Length of output: 111


🏁 Script executed:

# Get the full context around lines 95-125
sed -n '80,135p' packages/durable-session/src/materialize.ts

Repository: superset-sh/superset

Length of output: 1699


🏁 Script executed:

# Check for type definitions and imports at the top of the file
head -n 50 packages/durable-session/src/materialize.ts

Repository: superset-sh/superset

Length of output: 1709


🏁 Script executed:

# Search for usage of isDoneChunk and isWholeMessageChunk to understand their role
rg -n "isDoneChunk|isWholeMessageChunk" packages/durable-session/src/materialize.ts

Repository: superset-sh/superset

Length of output: 357


🏁 Script executed:

# Check the exact type definitions for StreamChunk and related types
rg -A 5 "type StreamChunk|type WholeMessageChunk|type DurableStreamChunk" packages/durable-session/src/types.ts packages/durable-session/src/schema.ts 2>/dev/null | head -50

Repository: superset-sh/superset

Length of output: 442


🏁 Script executed:

# Look for console logging patterns in the file to understand project convention
rg -n "console\." packages/durable-session/src/materialize.ts

Repository: superset-sh/superset

Length of output: 46


🏁 Script executed:

# Check if there are other similar error handling patterns in the codebase
rg -B 2 -A 3 "catch \{" packages/durable-session/src/ --type ts --type tsx | head -40

Repository: superset-sh/superset

Length of output: 91


🏁 Script executed:

# Check logging patterns in the broader durable-session package
rg -n "console\." packages/durable-session/ --type ts -A 1 | head -30

Repository: superset-sh/superset

Length of output: 1411


🏁 Script executed:

# Look at the parseChunk function to understand what types it returns
rg -B 5 -A 10 "function parseChunk|const parseChunk" packages/durable-session/src/materialize.ts

Repository: superset-sh/superset

Length of output: 502


🏁 Script executed:

# Check types.ts for WholeMessageChunk definition
cat packages/durable-session/src/types.ts | head -80

Repository: superset-sh/superset

Length of output: 2670


Avoid any and log stream processor failures.

Three type safety and error handling issues:

  1. any type casts (lines 101–102): Legacy wrapper type checks use (chunk as any).type. Use a typed guard instead.
  2. Silent error swallowing (lines 110–113): processChunk errors are caught but not logged. Log with context before continuing.
  3. Hardcoded magic strings: 'message-start', 'message-end', 'stop', 'error', 'RUN_ERROR' should be extracted to named constants.

Extract constants at module top and add prefixed logging:

Suggested fix
+const LEGACY_WRAPPER_TYPES = new Set(['message-start', 'message-end'])
+const TERMINAL_CHUNK_TYPES = new Set(['stop', 'error', 'RUN_ERROR'])
@@
-    if ((chunk as any).type === 'message-start' || (chunk as any).type === 'message-end') {
-      if ((chunk as any).type === 'message-end') {
+    const chunkType = (chunk as { type?: string }).type
+    if (chunkType && LEGACY_WRAPPER_TYPES.has(chunkType)) {
+      if (chunkType === 'message-end') {
         isComplete = true
       }
       continue
     }
@@
-    try {
-      processor.processChunk(chunk as StreamChunk)
-    } catch {
-      // Skip chunks that can't be processed
-    }
+    try {
+      processor.processChunk(chunk as StreamChunk)
+    } catch (err) {
+      console.warn('[durable-session/stream-processor] Failed to process chunk', { err, chunk })
+      continue
+    }
@@
-    if (isDoneChunk(chunk as StreamChunk)) {
+    if (chunkType === 'RUN_FINISHED') {
       isComplete = true
     }
@@
-    const chunkType = (chunk as { type: string }).type
-    if (chunkType === 'stop' || chunkType === 'error' || chunkType === 'RUN_ERROR') {
+    if (chunkType && TERMINAL_CHUNK_TYPES.has(chunkType)) {
       isComplete = true
     }
🤖 Prompt for AI Agents
In `@packages/durable-session/src/materialize.ts` around lines 95 - 125, Replace
the ad-hoc any casts and silent swallowing in the loop: create module-level
named constants for legacy and control chunk types (e.g., MESSAGE_START,
MESSAGE_END, STOP, ERROR, RUN_ERROR), add a typed guard function (or narrow the
parsed chunk type) to check for legacy wrapper chunks instead of using (chunk as
any).type, and wrap the call to processor.processChunk(chunk as StreamChunk) in
a try/catch that logs the error with context (include the chunk id/index and
chunk content) before continuing; keep the existing isWholeMessageChunk and
isDoneChunk checks but compare against the new constants and use the typed guard
and logging to improve type safety and observability (referencing parseChunk,
isWholeMessageChunk, processor.processChunk, and isDoneChunk).

Comment thread packages/durable-session/src/materialize.ts Outdated
Comment on lines +37 to +112
function useCollectionData<C extends Collection<any, any, any, any, any>>(
collection: C
): CollectionItem<C>[] {
type T = CollectionItem<C>

// Track version to know when to create a new snapshot.
// Incremented by subscription callback when collection changes.
const versionRef = useRef(0)

// Cache the last snapshot to maintain stable reference.
// useSyncExternalStore requires getSnapshot to return the same reference
// when data hasn't changed, otherwise it triggers infinite re-renders.
const snapshotRef = useRef<{ version: number; data: T[] }>({
version: -1, // Force initial snapshot creation
data: [],
})

// Subscribe callback - increments version to signal data changed.
// Stored in ref to maintain stable reference for useSyncExternalStore.
const subscribeRef = useRef((onStoreChange: () => void): (() => void) => {
const subscription = collection.subscribeChanges(() => {
versionRef.current++
onStoreChange()
})
return () => subscription.unsubscribe()
})

// Update subscribe ref when collection changes
subscribeRef.current = (onStoreChange: () => void): (() => void) => {
const subscription = collection.subscribeChanges(() => {
versionRef.current++
onStoreChange()
})
return () => subscription.unsubscribe()
}

// Snapshot callback - returns cached data unless version changed.
// Stored in ref to maintain stable reference for useSyncExternalStore.
const getSnapshotRef = useRef((): T[] => {
const currentVersion = versionRef.current
const cached = snapshotRef.current

// Return cached snapshot if version hasn't changed
if (cached.version === currentVersion) {
return cached.data
}

// Version changed - create new snapshot and cache it
const data = [...collection.values()] as T[]
snapshotRef.current = { version: currentVersion, data }
return data
})

// Update getSnapshot ref when collection changes
getSnapshotRef.current = (): T[] => {
const currentVersion = versionRef.current
const cached = snapshotRef.current

if (cached.version === currentVersion) {
return cached.data
}

const data = [...collection.values()] as T[]
snapshotRef.current = { version: currentVersion, data }
return data
}

// Pass the same function for both getSnapshot and getServerSnapshot.
// This ensures server and client render the same initial state (empty array),
// preventing hydration mismatches while enabling proper SSR.
return useSyncExternalStore(
subscribeRef.current,
getSnapshotRef.current,
getSnapshotRef.current
)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, locate and read the file to understand the context
fd -t f "use-durable-chat.ts" --type f

Repository: superset-sh/superset

Length of output: 118


🏁 Script executed:

# Check the file size first
wc -l packages/durable-session/src/react/use-durable-chat.ts

Repository: superset-sh/superset

Length of output: 122


🏁 Script executed:

# Read the relevant section and surrounding context
sed -n '1,150p' packages/durable-session/src/react/use-durable-chat.ts | cat -n

Repository: superset-sh/superset

Length of output: 5955


🏁 Script executed:

# Look for Collection type definition
rg "type Collection|interface Collection" --type ts --type tsx

Repository: superset-sh/superset

Length of output: 91


🏁 Script executed:

# Search for Collection type definition
rg "type Collection|interface Collection" --type ts

Repository: superset-sh/superset

Length of output: 719


🏁 Script executed:

# Check useDurableChat hook which calls useCollectionData
sed -n '150,250p' packages/durable-session/src/react/use-durable-chat.ts | cat -n

Repository: superset-sh/superset

Length of output: 4198


🏁 Script executed:

# Check how useCollectionData is called
rg "useCollectionData" packages/durable-session/src/react/use-durable-chat.ts

Repository: superset-sh/superset

Length of output: 367


🏁 Script executed:

# Let me check how the Collections type is defined in DurableChatClient
rg "collections:" packages/durable-session/src/ -A 3 -B 1 --type ts

Repository: superset-sh/superset

Length of output: 1485


🏁 Script executed:

# Check if collection can actually change during the hook's lifetime
sed -n '20,50p' packages/durable-session/src/react/use-durable-chat.ts | cat -n

Repository: superset-sh/superset

Length of output: 1423


🏁 Script executed:

# Verify that Collection type from TanStack uses those 5 parameters
rg "export.*type.*Collection" --type ts -A 2

Repository: superset-sh/superset

Length of output: 46


🏁 Script executed:

# Check node_modules or package reference for Collection type
rg "@tanstack/react-db" packages/durable-session/package.json

Repository: superset-sh/superset

Length of output: 97


🏁 Script executed:

# Let me trace the actual flow - check if collection reference can change
sed -n '156,200p' packages/durable-session/src/react/use-durable-chat.ts | cat -n

Repository: superset-sh/superset

Length of output: 2376


🏁 Script executed:

# Check the client implementation to see if collections are recreated
rg "createCollections|collections.*=" packages/durable-session/src/client.ts -B 2 -A 5 --type ts | head -60

Repository: superset-sh/superset

Length of output: 1700


🏁 Script executed:

# Look at the CollectionItem type helper which also uses any
sed -n '28,30p' packages/durable-session/src/react/use-durable-chat.ts | cat -n

Repository: superset-sh/superset

Length of output: 175


🏁 Script executed:

# Verify that collections are created once per client and not re-created
sed -n '50,100p' packages/durable-session/src/client.ts | cat -n

Repository: superset-sh/superset

Length of output: 1933


🏁 Script executed:

# Check if collections from different clients are different objects
rg "get collections" packages/durable-session/src/client.ts -A 3 --type ts

Repository: superset-sh/superset

Length of output: 119


🏁 Script executed:

# Let me verify the actual scenario - when does collection parameter change?
# Check if clients are created multiple times in a single component instance
sed -n '155,180p' packages/durable-session/src/react/use-durable-chat.ts | cat -n

Repository: superset-sh/superset

Length of output: 1517


🏁 Script executed:

# Look at the actual behavior - does client ever change after initial creation?
sed -n '21,42p' packages/durable-session/src/react/use-durable-chat.ts | cat -n

Repository: superset-sh/superset

Length of output: 933


🏁 Script executed:

# Verify that collection reference actually changes when client changes
rg "client.collections" packages/durable-session/src/react/use-durable-chat.ts -B 2 -A 2

Repository: superset-sh/superset

Length of output: 474


🏁 Script executed:

# Check if there's any cleanup or dependency array that would re-run useCollectionData
sed -n '155,175p' packages/durable-session/src/react/use-durable-chat.ts | cat -n

Repository: superset-sh/superset

Length of output: 1198


🏁 Script executed:

# Verify the complete flow - can client/collections actually be different on re-render?
sed -n '21,50p' packages/durable-session/src/react/use-durable-chat.ts | cat -n

Repository: superset-sh/superset

Length of output: 1412


🏁 Script executed:

# Verify by checking the full flow of client recreation
sed -n '156,180p' packages/durable-session/src/react/use-durable-chat.ts | cat -n

Repository: superset-sh/superset

Length of output: 1506


Replace any with unknown in Collection generic bounds and reset refs when collection changes.

The function uses any for 4-5 type parameters in Collection generics (lines 28, 37), violating the type safety guideline. More critically, when the collection reference changes (e.g., client recreation in useDurableChat), the versionRef and snapshotRef are not reset. This causes the hook to return cached snapshots from the previous collection until the new collection emits a change event, creating a transient correctness bug.

Suggested fix
-function useCollectionData<C extends Collection<any, any, any, any, any>>(
+function useCollectionData<C extends Collection<unknown, unknown, unknown, unknown, unknown>>(
   collection: C
 ): CollectionItem<C>[] {
   type T = CollectionItem<C>
 
   // Track version to know when to create a new snapshot.
   // Incremented by subscription callback when collection changes.
   const versionRef = useRef(0)
 
   // Cache the last snapshot to maintain stable reference.
   // useSyncExternalStore requires getSnapshot to return the same reference
   // when data hasn't changed, otherwise it triggers infinite re-renders.
   const snapshotRef = useRef<{ version: number; data: T[] }>({
     version: -1, // Force initial snapshot creation
     data: [],
   })
+
+  const collectionRef = useRef(collection)
+  if (collectionRef.current !== collection) {
+    collectionRef.current = collection
+    versionRef.current++
+    snapshotRef.current = { version: -1, data: [] as T[] }
+  }

Also update the CollectionItem helper on line 28 to use unknown instead of any.

🤖 Prompt for AI Agents
In `@packages/durable-session/src/react/use-durable-chat.ts` around lines 37 -
112, Replace the loose any types in the Collection generic bounds and the
CollectionItem helper with unknown (update the generics used in
useCollectionData and the CollectionItem type) and reset internal refs when the
incoming collection instance changes: inside useCollectionData detect collection
identity change and set versionRef.current = 0 and snapshotRef.current = {
version: -1, data: [] } (and reinitialize
subscribeRef.current/getSnapshotRef.current if needed) so the hook does not
return stale cached snapshots from the prior collection; update references to
the functions subscribeRef, getSnapshotRef and
collection.subscribeChanges/collection.values in the useCollectionData function
to operate against the new collection instance.

Comment on lines +169 to +190
// Create client synchronously - always available immediately
const clientRef = useRef<{ client: DurableChatClient<TTools>; key: string } | null>(null)
const key = `${clientOptions.sessionId}:${clientOptions.proxyUrl}`

// Create or recreate client when key changes or client was disposed
// The isDisposed check handles React Strict Mode: cleanup disposes the client,
// so the next render must create a fresh one with a new AbortController.
if (providedClient) {
if (!clientRef.current || clientRef.current.client !== providedClient) {
clientRef.current = { client: providedClient, key: 'provided' }
}
} else if (!clientRef.current || clientRef.current.key !== key || clientRef.current.client.isDisposed) {
// Dispose old client if exists (may already be disposed, which is fine)
clientRef.current?.client.dispose()
clientRef.current = {
client: new DurableChatClient<TTools>({
...clientOptions,
onError: (err) => onErrorRef.current(err),
} as DurableChatClientOptions<TTools>),
key,
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find packages/durable-session -type f -name "*.ts" -o -name "*.tsx" | head -20

Repository: superset-sh/superset

Length of output: 840


🏁 Script executed:

cat -n packages/durable-session/src/react/use-durable-chat.ts | head -200

Repository: superset-sh/superset

Length of output: 8378


🏁 Script executed:

# Find type definitions for UseDurableChatOptions and DurableChatClientOptions
rg "UseDurableChatOptions|DurableChatClientOptions" packages/durable-session --type ts -A 5 -B 2

Repository: superset-sh/superset

Length of output: 8251


🏁 Script executed:

cat -n packages/durable-session/src/types.ts | grep -A 30 "export interface DurableChatClientOptions"

Repository: superset-sh/superset

Length of output: 1291


🏁 Script executed:

cat -n packages/durable-session/src/react/types.ts

Repository: superset-sh/superset

Length of output: 4122


Add runtime guard for required options when no client is provided.

UseDurableChatOptions is Partial<DurableChatClientOptions>, but sessionId and proxyUrl are required to construct a client. The current code accesses these potentially undefined properties at line 171 (creating an invalid key string) and bypasses type safety with as DurableChatClientOptions at line 187. This allows undefined values to reach the constructor.

Suggested fix
   const key = `${clientOptions.sessionId}:${clientOptions.proxyUrl}`
+
+  if (!providedClient && (!clientOptions.sessionId || !clientOptions.proxyUrl)) {
+    throw new Error('useDurableChat requires sessionId and proxyUrl when no client is provided')
+  }
@@
-    clientRef.current = {
-      client: new DurableChatClient<TTools>({
-        ...clientOptions,
-        onError: (err) => onErrorRef.current(err),
-      } as DurableChatClientOptions<TTools>),
-      key,
-    }
+    const requiredOptions: DurableChatClientOptions<TTools> = {
+      ...clientOptions,
+      sessionId: clientOptions.sessionId!,
+      proxyUrl: clientOptions.proxyUrl!,
+      onError: (err) => onErrorRef.current(err),
+    }
+    clientRef.current = {
+      client: new DurableChatClient<TTools>(requiredOptions),
+      key,
+    }
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Create client synchronously - always available immediately
const clientRef = useRef<{ client: DurableChatClient<TTools>; key: string } | null>(null)
const key = `${clientOptions.sessionId}:${clientOptions.proxyUrl}`
// Create or recreate client when key changes or client was disposed
// The isDisposed check handles React Strict Mode: cleanup disposes the client,
// so the next render must create a fresh one with a new AbortController.
if (providedClient) {
if (!clientRef.current || clientRef.current.client !== providedClient) {
clientRef.current = { client: providedClient, key: 'provided' }
}
} else if (!clientRef.current || clientRef.current.key !== key || clientRef.current.client.isDisposed) {
// Dispose old client if exists (may already be disposed, which is fine)
clientRef.current?.client.dispose()
clientRef.current = {
client: new DurableChatClient<TTools>({
...clientOptions,
onError: (err) => onErrorRef.current(err),
} as DurableChatClientOptions<TTools>),
key,
}
}
// Create client synchronously - always available immediately
const clientRef = useRef<{ client: DurableChatClient<TTools>; key: string } | null>(null)
const key = `${clientOptions.sessionId}:${clientOptions.proxyUrl}`
if (!providedClient && (!clientOptions.sessionId || !clientOptions.proxyUrl)) {
throw new Error('useDurableChat requires sessionId and proxyUrl when no client is provided')
}
// Create or recreate client when key changes or client was disposed
// The isDisposed check handles React Strict Mode: cleanup disposes the client,
// so the next render must create a fresh one with a new AbortController.
if (providedClient) {
if (!clientRef.current || clientRef.current.client !== providedClient) {
clientRef.current = { client: providedClient, key: 'provided' }
}
} else if (!clientRef.current || clientRef.current.key !== key || clientRef.current.client.isDisposed) {
// Dispose old client if exists (may already be disposed, which is fine)
clientRef.current?.client.dispose()
const requiredOptions: DurableChatClientOptions<TTools> = {
...clientOptions,
sessionId: clientOptions.sessionId!,
proxyUrl: clientOptions.proxyUrl!,
onError: (err) => onErrorRef.current(err),
}
clientRef.current = {
client: new DurableChatClient<TTools>(requiredOptions),
key,
}
}
🤖 Prompt for AI Agents
In `@packages/durable-session/src/react/use-durable-chat.ts` around lines 169 -
190, The code builds a key and constructs a DurableChatClient using
possibly-undefined properties from clientOptions (sessionId/proxyUrl) and uses
an unsafe cast; add a runtime guard before creating the key or new client: when
providedClient is falsy, assert that clientOptions.sessionId and
clientOptions.proxyUrl are present (throw a clear Error or call
onErrorRef.current) and only then compute key = `${sessionId}:${proxyUrl}` and
create the DurableChatClient with a properly typed options object (do not use
"as DurableChatClientOptions"); update the client creation branch that
references clientRef, key, clientOptions and DurableChatClient to depend on this
guard so undefined values never reach the constructor.

Comment thread packages/durable-session/src/schema.ts
Move ChatInput and PresenceBar UI components from packages/ai-chat into
packages/durable-session/src/react/components and remove the old ai-chat
package entirely. The stream client, schema, materialization, and React
hooks are all superseded by the vendored durable-session package.
- Remove unused _isPaused private field
- Replace non-null assertions with safe alternatives
- Replace `any` casts with typed alternatives in materialize.ts
- Add biome-ignore for unavoidable Collection<any> type constraints
- Fix duplicate chunkType variable declaration
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@packages/durable-session/src/collections/presence.ts`:
- Around line 46-78: The collection created by createLiveQueryCollection is
missing a getKey option so rows cannot be uniquely tracked; add getKey to the
options and return a stable unique key for each row (use actorId since the
result object includes actorId). Update the createLiveQueryCollection call to
include getKey: (row) => row.actorId so the live query can correctly
identify/update rows produced by the query that references
rawPresenceCollection, grouped and RawPresenceRow.

In `@packages/durable-session/src/index.ts`:
- Around line 19-56: Update the JSDoc example import strings to use the correct
package name: replace occurrences of "@electric-sql/durable-session" with
"@superset/durable-session" in the docs for the DurableChatClient example
(search for DurableChatClient and the example block in this file and in
session-meta.ts if present) so the documented import matches the actual package
name.
🧹 Nitpick comments (4)
packages/durable-session/src/react/components/PresenceBar/PresenceBar.tsx (1)

49-65: Extract magic number for avatar display limit.

The value 5 appears in three places (slice, comparison, subtraction). Extract to a named constant for clarity and maintainability.

♻️ Suggested refactor
+const MAX_DISPLAYED_AVATARS = 5;
+
 export function PresenceBar({
 	viewers,
 	typingUsers,
 	className,
 }: PresenceBarProps) {
 // ...
 					<div className="flex -space-x-2">
-						{viewers.slice(0, 5).map((user) => (
+						{viewers.slice(0, MAX_DISPLAYED_AVATARS).map((user) => (
 // ...
 					</div>
-					{viewers.length > 5 && (
+					{viewers.length > MAX_DISPLAYED_AVATARS && (
 						<span className="text-xs text-muted-foreground">
-							+{viewers.length - 5}
+							+{viewers.length - MAX_DISPLAYED_AVATARS}
 						</span>
 					)}

Based on learnings: "Extract hardcoded magic numbers, strings, and enums to named constants at module top instead of leaving them inline in logic"

packages/durable-session/src/types.ts (1)

417-421: Consider removing or tracking the commented code.

The commented liveMode option suggests incomplete functionality. If this is planned work, consider adding a TODO comment with a tracking issue reference; otherwise, remove to keep the codebase clean.

packages/durable-session/src/collections/session-meta.ts (1)

77-87: Consider using object parameter for consistency.

The function has 3 parameters. Per coding guidelines, functions with 2+ parameters should use object parameters for better readability and maintainability.

Suggested refactor
-export function updateConnectionStatus(
-  meta: SessionMetaRow,
-  status: ConnectionStatus,
-  error?: { message: string; code?: string } | null,
-): SessionMetaRow {
+export function updateConnectionStatus({
+  meta,
+  status,
+  error,
+}: {
+  meta: SessionMetaRow;
+  status: ConnectionStatus;
+  error?: { message: string; code?: string } | null;
+}): SessionMetaRow {

As per coding guidelines: "Use object parameters for functions with 2 or more parameters instead of positional arguments"

packages/durable-session/src/client.ts (1)

736-742: Redundant status check.

The condition !response.ok && response.status !== 200 && response.status !== 201 is redundant because response.ok is already true for status codes 200-299. The additional checks for 200/201 will never be reached when !response.ok is true.

Suggested fix
-        if (
-          !response.ok &&
-          response.status !== 200 &&
-          response.status !== 201
-        ) {
+        if (!response.ok) {
           throw new Error(`Failed to create session: ${response.status}`);
         }

Comment on lines +46 to +78
return createLiveQueryCollection({
query: (q) => {
// Subquery: filter for online, group by actorId, count for change detection
const grouped = q
.from({ presence: rawPresenceCollection })
.where(({ presence }) => eq(presence.status, "online"))
.groupBy(({ presence }) => presence.actorId)
.select(({ presence }) => ({
actorId: presence.actorId,
deviceCount: count(presence.deviceId),
}));

// Main query: imperatively gather device info per actor
return q.from({ grouped }).fn.select(({ grouped }) => {
// Get all online presence rows for this actor
const actorPresence = [...rawPresenceCollection.values()].filter(
(p) =>
(p as RawPresenceRow).actorId === grouped.actorId &&
(p as RawPresenceRow).status === "online",
) as RawPresenceRow[];

const first = actorPresence[0];
return {
actorId: grouped.actorId as string,
actorType: (first?.actorType ?? "user") as "user" | "agent",
name: first?.name,
deviceIds: actorPresence.map((p) => p.deviceId),
deviceCount: actorPresence.length,
};
});
},
startSync: true,
});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Missing getKey function in collection options.

The createLiveQueryCollection call is missing the getKey option. Other collections in this PR (messages, model-messages) include getKey: (row) => row.id. Without it, the collection may not properly track row identity.

🔧 Suggested fix
 	return createLiveQueryCollection({
 		query: (q) => {
 			// ... query logic
 		},
+		getKey: (row) => row.actorId,
 		startSync: true,
 	});
🤖 Prompt for AI Agents
In `@packages/durable-session/src/collections/presence.ts` around lines 46 - 78,
The collection created by createLiveQueryCollection is missing a getKey option
so rows cannot be uniquely tracked; add getKey to the options and return a
stable unique key for each row (use actorId since the result object includes
actorId). Update the createLiveQueryCollection call to include getKey: (row) =>
row.actorId so the live query can correctly identify/update rows produced by the
query that references rawPresenceCollection, grouped and RawPresenceRow.

Comment thread packages/durable-session/src/index.ts
- Add missing `await` on `response.json()` in `fork()` to properly propagate errors
- Remove incorrect `as "user" | "assistant"` cast in `messageRowToUIMessage` (role includes "system")
- Update JSDoc references from `@electric-sql/durable-session` to `@superset/durable-session`
@Kitenite Kitenite merged commit 7df2335 into main Feb 5, 2026
5 of 6 checks passed
@Kitenite Kitenite deleted the kitenite/migrate-client branch February 5, 2026 23:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant