Added Claude SDK endpoint #1250
Conversation
📝 Walkthrough

Adds a Claude Agent Hono HTTP endpoint and a converter, streams Claude SDK responses as SSE in TanStack AI chunk format, adds two dependencies, and starts a separate agent server alongside the existing streams proxy with coordinated startup/shutdown.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant HonoApp as Hono App\n(claude-agent.ts)
    participant SDK as Claude Agent SDK
    participant Converter as Message Converter\n(sdk-to-ai-chunks.ts)
    participant SSE as SSE Stream
    Client->>HonoApp: POST /query (messages, sessionId, opts)
    HonoApp->>HonoApp: validate request, select latest user prompt, load session mapping
    HonoApp->>SDK: query(prompt, env, options) (streaming)
    SDK-->>HonoApp: stream of SDKMessage events
    loop for each SDKMessage
        HonoApp->>Converter: convert(SDKMessage)
        Converter-->>HonoApp: StreamChunk[] (START/ARGS/CONTENT/FINISH/ERROR)
        HonoApp->>SSE: send chunks as SSE data events
    end
    SSE-->>Client: SSE stream (TanStack AI / AG-UI chunks) and final [DONE]
```
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (2 warnings)
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/streams/src/index.ts (1)
26-40: ⚠️ Potential issue | 🟡 Minor

Graceful shutdown does not close the proxy or agent HTTP servers.

`serve()` returns a `Server` handle, but it's not captured for either the proxy (line 26) or the agent (line 31). On `SIGINT`, only the durable stream server is stopped — in-flight SSE connections on the proxy and agent ports are abandoned without draining.

Proposed fix:
```diff
-serve({ fetch: app.fetch, port: PORT }, (info) => {
+const proxyServer = serve({ fetch: app.fetch, port: PORT }, (info) => {
   console.log(`[streams] Proxy running on http://localhost:${info.port}`);
 });

 // Start Claude agent endpoint
-serve({ fetch: claudeAgentApp.fetch, port: AGENT_PORT }, (info) => {
+const agentServer = serve({ fetch: claudeAgentApp.fetch, port: AGENT_PORT }, (info) => {
   console.log(
     `[streams] Claude agent endpoint on http://localhost:${info.port}`,
   );
 });

 // Graceful shutdown
 process.on("SIGINT", async () => {
+  proxyServer.close();
+  agentServer.close();
   await durableStreamServer.stop();
   process.exit(0);
 });
```
🤖 Fix all issues with AI agents
In `@apps/streams/src/claude-agent.ts`:
- Line 26: The claudeSessions Map is unbounded and never evicted; update session
lifecycle handling so entries are removed when a session completes or errors.
Specifically, after you add entries to claudeSessions on the "system-init" path,
ensure you remove the corresponding key when the associated result finishes or
errors (where the code handles the final result/error for a session), or replace
claudeSessions with a bounded cache (LRU or TTL) implementation; reference the
claudeSessions Map and the places that process "system-init" and the
result/error callbacks to add explicit cleanup or swap in a TTL/LRU-backed map.
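A minimal sketch of the TTL-plus-size-cap approach described above. The names (`SESSION_TTL_MS`, `SESSION_MAX_SIZE`, `evictStaleSessions`, `setClaudeSessionId`) and values mirror what the follow-up commit describes (24h TTL, 1000-entry cap), but the exact shapes are illustrative, not the PR's code:

```typescript
// Bounded replacement for the unbounded claudeSessions Map.
const SESSION_TTL_MS = 24 * 60 * 60 * 1000; // 24h idle TTL
const SESSION_MAX_SIZE = 1000; // hard cap on tracked sessions

interface SessionEntry {
  claudeSessionId: string;
  lastAccessedAt: number;
}

const claudeSessions = new Map<string, SessionEntry>();

function evictStaleSessions(now = Date.now()): void {
  // TTL sweep: drop entries idle longer than the TTL.
  for (const [key, entry] of claudeSessions) {
    if (now - entry.lastAccessedAt > SESSION_TTL_MS) claudeSessions.delete(key);
  }
  // Size cap: evict least-recently-accessed entries until under the cap.
  if (claudeSessions.size > SESSION_MAX_SIZE) {
    const byAge = [...claudeSessions.entries()].sort(
      (a, b) => a[1].lastAccessedAt - b[1].lastAccessedAt,
    );
    const excess = claudeSessions.size - SESSION_MAX_SIZE;
    for (const [key] of byAge.slice(0, excess)) claudeSessions.delete(key);
  }
}

function setClaudeSessionId(sessionId: string, claudeSessionId: string): void {
  claudeSessions.set(sessionId, {
    claudeSessionId,
    lastAccessedAt: Date.now(),
  });
  evictStaleSessions();
}
```

Explicit deletion in the result/error handlers would still be preferable for prompt cleanup; the TTL/cap acts as a backstop for sessions that never reach a terminal state.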
- Around line 60-73: The inline magic values in the query options should be
extracted to module-level constants and the permission behavior made
configurable: replace the hardcoded model string, maxTurns number, and
permissionMode string used in the query(...) options (the call that uses
claudeSessionId, cwd, binaryPath, queryEnv, and AbortController) with named
constants (e.g., DEFAULT_CLAUDE_MODEL, DEFAULT_MAX_TURNS,
DEFAULT_PERMISSION_MODE) defined at the top of the file; add a config flag or
environment variable to control permissionMode instead of always using
"bypassPermissions", and add a brief in-code comment documenting the security
implications of bypassPermissions and recommending it be disabled or restricted
for non-local endpoints.
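A sketch of the suggested extraction. The constant names come from the comment above; every concrete value and env-var name (`CLAUDE_MODEL`, `CLAUDE_MAX_TURNS`, `CLAUDE_PERMISSION_MODE`) is a placeholder, not the PR's actual configuration:

```typescript
// Module-level constants replacing the inline magic values in query(...).
// Placeholder values — the PR's real model string and turn limit are not
// shown in this review.
const DEFAULT_CLAUDE_MODEL = process.env.CLAUDE_MODEL ?? "<model-id>";
const DEFAULT_MAX_TURNS = Number(process.env.CLAUDE_MAX_TURNS ?? 10);

// SECURITY: "bypassPermissions" disables the SDK's tool-permission checks.
// Acceptable for a localhost-only dev endpoint; restrict or disable it for
// anything network-exposed.
const DEFAULT_PERMISSION_MODE =
  process.env.CLAUDE_PERMISSION_MODE ?? "bypassPermissions";
```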
- Around line 49-55: The current construction of queryEnv spreads process.env
when agentEnv is missing, leaking all server env vars to the Claude subprocess;
instead build queryEnv by starting from an explicit allowlist of only the
variables the binary needs (e.g., NODE_ENV, PATH, any CLAUDE_* vars) and then
merge in agentEnv if provided. Modify the logic around queryEnv, agentEnv, and
the assignment of queryEnv.CLAUDE_CODE_ENTRYPOINT so that you initialize
queryEnv from a small allowlist (or a constant ALLOWED_CLAUDE_ENV array) and
only copy those keys from process.env, then overlay agentEnv on top if present,
and finally set CLAUDE_CODE_ENTRYPOINT = "sdk-ts".
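The allowlist construction above can be sketched as follows; the specific variables in `ALLOWED_CLAUDE_ENV` are assumptions about what the binary needs, except `CLAUDE_CODE_ENTRYPOINT = "sdk-ts"`, which the comment specifies:

```typescript
// Build the subprocess env from an explicit allowlist instead of spreading
// all of process.env. The variable list here is illustrative.
const ALLOWED_CLAUDE_ENV = ["NODE_ENV", "PATH", "HOME"] as const;

function buildQueryEnv(
  agentEnv?: Record<string, string>,
): Record<string, string> {
  const queryEnv: Record<string, string> = {};
  // Copy only allowlisted vars from the server environment…
  for (const key of ALLOWED_CLAUDE_ENV) {
    const value = process.env[key];
    if (value !== undefined) queryEnv[key] = value;
  }
  // …then overlay the caller-provided env if present…
  Object.assign(queryEnv, agentEnv);
  // …and pin the entrypoint marker last so it cannot be overridden.
  queryEnv.CLAUDE_CODE_ENTRYPOINT = "sdk-ts";
  return queryEnv;
}
```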
- Around line 29-35: The handler currently uses c.req.json<T>() (assigned to
body) which only asserts types at compile time; add a Zod schema (e.g., define a
z.object with messages:
z.array(z.object({role:z.string(),content:z.string()})).optional or required per
spec, stream: z.boolean().optional, sessionId: z.string().optional, cwd:
z.string().optional, env: z.record(z.string()).optional) and validate the
incoming payload with schema.safeParse before using it; if validation fails
return an immediate 400 response with the parsed error details so downstream
code in claude-agent.ts uses a guaranteed-shape body.
- Line 71: The AbortController created and passed into query() is not being
triggered on abort, so wire it up: create a local const abortController = new
AbortController(), pass abortController.signal to the SDK request if the SDK
expects a signal (or ensure the options include the controller under the exact
key the SDK expects), and in the existing abort handler that calls
result.interrupt()/result.close() also call abortController.abort() so the
controller's signal actually cancels the underlying request/subprocess.
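The wiring described above, sketched with a generic `interrupt` callback standing in for the `result.interrupt()`/`result.close()` calls mentioned in the comment:

```typescript
// Tie one AbortController to both the incoming request signal and the SDK
// query, so a client disconnect actually cancels the subprocess.
function wireAbort(
  requestSignal: AbortSignal,
  interrupt: () => void,
): AbortController {
  const abortController = new AbortController();
  requestSignal.addEventListener(
    "abort",
    () => {
      interrupt(); // stop the SDK's streaming result
      abortController.abort(); // cancel the underlying request/subprocess
    },
    { once: true },
  );
  return abortController;
}
```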
In `@apps/streams/src/sdk-to-ai-chunks.ts`:
- Around line 316-322: The code silently swallows JSON.parse errors when
building parsedInput for tool calls inside the block.type === "tool_use" branch;
update the try/catch around JSON.parse(block.argsAccumulator || "{}") to log the
parsing error and relevant context (at least block.toolCallId, block.toolName,
and the raw argsAccumulator) instead of silently falling back to {}—use the
module's existing logger (e.g., processLogger.error) or console.error if no
logger is available, then keep parsedInput = {} after logging so behavior is
unchanged except for diagnostics.
- Around line 394-438: In handleResultMessage, avoid emitting both RUN_ERROR and
RUN_FINISHED for the same message: when message.subtype?.startsWith("error"),
push the RUN_ERROR chunk (with runId/state.runId and error details from
message.subtype) and then return the chunks immediately (or otherwise skip
creating the RUN_FINISHED chunk); ensure you attach usage data only on the
non-error path so the client's StreamProcessor never sees a finish after an
error.
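The control flow above can be sketched as follows; the chunk shapes and the `handleResultMessage` signature are simplified stand-ins for the real converter types:

```typescript
// Emit RUN_ERROR and return early so an error result never also produces
// RUN_FINISHED. Shapes are illustrative, not the PR's actual types.
interface Chunk { type: string; [key: string]: unknown }

function handleResultMessage(
  message: { subtype?: string; usage?: unknown },
  state: { runId: string },
): Chunk[] {
  const chunks: Chunk[] = [];
  if (message.subtype?.startsWith("error")) {
    chunks.push({ type: "RUN_ERROR", runId: state.runId, error: message.subtype });
    return chunks; // skip RUN_FINISHED entirely on the error path
  }
  // Usage data is attached only on the success path.
  chunks.push({ type: "RUN_FINISHED", runId: state.runId, usage: message.usage });
  return chunks;
}
```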
- Line 18: The module imports StreamChunk from `@tanstack/ai` but the repo also
defines a local StreamChunk interface in types.ts that downstream files
(stream-writer.ts, protocol.ts) use, causing a type mismatch; either remove the
local StreamChunk and replace its uses with the `@tanstack/ai` StreamChunk across
the codebase, or change sdk-to-ai-chunks.ts to return the local
types.StreamChunk (instead of `@tanstack/ai.StreamChunk`) so all files share the
same loose shape; update the import statements and the return type (in
sdk-to-ai-chunks.ts functions) and run type-check to ensure stream-writer.ts and
protocol.ts compile with the chosen single authoritative StreamChunk type.
- Around line 288-297: The thinking_delta branch currently emits a STEP_FINISHED
for every token which breaks the step lifecycle; update the thinking delta
handling so that in the switch/case for "thinking_delta" in
handleContentBlockDelta you emit an incremental event (e.g., a new event type
STEP_DELTA or similar) with delta.thinking (satisfying StreamChunk) instead of
STEP_FINISHED, and then emit the single STEP_FINISHED only from
handleContentBlockStop when the thinking block actually ends; refer to the
existing symbols handleContentBlockStart, handleContentBlockDelta,
handleContentBlockStop, the "thinking_delta" case, and the
STEP_FINISHED/StreamChunk shape to implement this change consistently.
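The proposed lifecycle split can be sketched as below. `STEP_DELTA` is the reviewer's suggested event name, not an existing chunk type, and the handler signatures are simplified:

```typescript
// Stream thinking tokens as incremental deltas; emit STEP_FINISHED exactly
// once when the block actually stops.
interface Chunk { type: string; [key: string]: unknown }

// Called from handleContentBlockDelta for each "thinking_delta".
function onThinkingDelta(index: number, thinking: string, now: number): Chunk[] {
  return [
    { type: "STEP_DELTA", stepId: `thinking-${index}`, delta: thinking, timestamp: now },
  ];
}

// Called from handleContentBlockStop when the thinking block ends.
function onThinkingBlockStop(index: number, now: number): Chunk[] {
  return [{ type: "STEP_FINISHED", stepId: `thinking-${index}`, timestamp: now }];
}
```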
🧹 Nitpick comments (3)
apps/streams/src/sdk-to-ai-chunks.ts (1)
377-383: `toolName` set to empty string for tool results — may confuse downstream consumers.

When emitting `TOOL_CALL_END` for user-message tool results, `toolName` is hardcoded to `""`. The SDK's `tool_result` block doesn't carry the tool name, so this is understandable, but downstream code may rely on `toolName` being present. Consider adding a brief comment explaining why it's empty, or using a sentinel like `"unknown"` if the StreamProcessor requires a non-empty value.

apps/streams/src/claude-agent.ts (1)
115-147: Multiple empty `catch` blocks swallow errors silently.

Lines 131, 138, and 145 have empty catch blocks. While these are defensive (controller/result may already be closed), the guideline says to at minimum log errors. A debug-level log helps diagnose issues in production.
Proposed fix — add minimal logging
```diff
 try {
   controller.enqueue(
     encoder.encode(`data: ${JSON.stringify(errorChunk)}\n\n`),
   );
   controller.enqueue(encoder.encode("data: [DONE]\n\n"));
-} catch {
-  // Controller may already be closed
+} catch (enqueueErr) {
+  console.debug("[claude-agent] Failed to enqueue error chunk (controller likely closed):", enqueueErr);
 }
```

apps/streams/package.json (1)
16-16: Pre-1.0 dependencies use caret ranges — pre-1.0 releases may break.

Both `@anthropic-ai/claude-agent-sdk` (latest: 0.2.29) and `@tanstack/ai` (latest: 0.3.0) are pre-1.0. Under npm semver, `^0.2.19` resolves to `>=0.2.19 <0.3.0`, so a `0.3.0` release is not picked up automatically — but pre-1.0 patch releases can still introduce breaking changes. This is consistent with other `0.x` deps in the file (e.g., `@tanstack/db`), but worth flagging since the Claude Agent SDK is still evolving. Consider pinning exactly (e.g., `0.2.19`) if you encounter instability.

Also applies to: 20-20
```ts
 * - RUN_ERROR — error during execution
 */

import type { StreamChunk } from "@tanstack/ai";
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Check all StreamChunk references and their context
rg -n --type=ts "StreamChunk" -C2 -g '!node_modules/**'
```
Remove or align StreamChunk type imports across the module.

The converter imports `StreamChunk` from `@tanstack/ai` and uses it to type-check returned chunks with `satisfies`, but `apps/streams/src/types.ts` exports a local `StreamChunk` interface with a loose shape (`{ type: string; [key: string]: unknown }`). Downstream code in `stream-writer.ts` and `protocol.ts` imports the local definition, creating a type inconsistency. The local catch-all type defeats the type-safety benefits of `@tanstack/ai`'s strict union of event types.

Either:
- Remove the local `StreamChunk` from `types.ts` and import from `@tanstack/ai` everywhere, or
- Have `sdk-to-ai-chunks.ts` return the local `StreamChunk` type if loose typing is intentional.
| case "thinking_delta": { | ||
| const stepId = `thinking-${index}`; | ||
| return [ | ||
| { | ||
| type: "STEP_FINISHED", | ||
| stepId, | ||
| delta: delta.thinking, | ||
| timestamp: now, | ||
| } satisfies StreamChunk, | ||
| ]; |
🧩 Analysis chain
🏁 Scripts executed:

```shell
# First, let's understand the file structure and find relevant type definitions
find . -type f -name "*.ts" -o -name "*.tsx" | grep -E "(stream|chunk|sdk)" | head -20

# Look at the full context of the file to understand the pattern
wc -l apps/streams/src/sdk-to-ai-chunks.ts

# Search for StreamChunk type definition
rg "type StreamChunk|interface StreamChunk" --type ts --type tsx -A 20

# Look for STEP_FINISHED and STEP_DELTA event types
rg "STEP_FINISHED|STEP_DELTA" --type ts --type tsx

# Check the full case statement structure to see how other delta events are handled
cat -n apps/streams/src/sdk-to-ai-chunks.ts | sed -n '280,320p'

# Search for StreamChunk definition
rg "type StreamChunk|interface StreamChunk" -A 30 -B 2

# Look for all event types used (STEP_FINISHED, STEP_DELTA, etc.)
rg "type: \"[A-Z_]+\"" apps/streams/src/sdk-to-ai-chunks.ts

# Search for protocol definitions
cat -n apps/streams/src/protocol.ts | head -100

# Look at more context around thinking_delta and tool_call_delta to understand pattern
cat -n apps/streams/src/sdk-to-ai-chunks.ts | sed -n '250,310p'

# Look for any references to STEP_DELTA to see if it exists in the codebase
rg "STEP_DELTA" --type ts

# Look for how thinking blocks/steps are finalized (when they actually finish)
rg "thinking|STEP_STARTED|STEP_FINISHED" -B 3 -A 3 apps/streams/src/sdk-to-ai-chunks.ts | head -80

# Check content block stop handling for thinking blocks
cat -n apps/streams/src/sdk-to-ai-chunks.ts | sed -n '306,360p'

# Look for how the step lifecycle should work
rg "STEP_STARTED.*STEP_FINISHED" -B 5 -A 5
```
thinking_delta emits STEP_FINISHED on every delta — this is semantically incorrect and breaks the step lifecycle.
Each thinking token emits STEP_FINISHED, but STEP_FINISHED should only be emitted once when the thinking step actually completes. Currently:
- `handleContentBlockStart` emits `STEP_STARTED` for thinking blocks
- `handleContentBlockDelta` emits `STEP_FINISHED` repeatedly on each `thinking_delta`
- `handleContentBlockStop` returns empty (no actual completion event)
This inverts the step lifecycle. The pattern used elsewhere (text deltas → TEXT_MESSAGE_CONTENT, tool args → TOOL_CALL_ARGS) streams incremental updates; thinking deltas should follow the same pattern with a distinct event type (e.g., STEP_DELTA) rather than reusing the completion marker. Emit STEP_FINISHED once in handleContentBlockStop when the thinking block actually ends.
🧹 Preview Cleanup Complete

The following preview resources have been cleaned up:

Thank you for your contribution! 🎉
- Add Zod request validation matching existing codebase patterns
- Fix unbounded session map with TTL eviction (24h) and size cap (1000)
- Wire AbortController to request signal abort handler
- Fix RUN_ERROR + RUN_FINISHED double emit (early return after error)
- Add logging in silent catch blocks per AGENTS.md guidelines
- Extract magic numbers to named constants
- Capture serve() handles for graceful shutdown
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@apps/streams/src/claude-agent.ts`:
- Around line 100-109: The handler for app.post("/") currently calls
c.req.json() which can throw on malformed JSON before agentRequestSchema
validation; wrap the await c.req.json() call in a try-catch inside the
app.post("/") handler, catch JSON parse errors and return c.json({ error:
"Invalid JSON", details: err.message }, 400), then only run
agentRequestSchema.safeParse on the parsed body; keep existing logic for schema
validation and responses (refer to agentRequestSchema and the app.post("/")
request flow) so malformed JSON produces a clean 400 instead of a 500.
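The guard above can be sketched framework-agnostically; turning the error branch into a Hono `c.json(..., 400)` response is left to the handler:

```typescript
// Parse the raw body yourself so malformed JSON yields a clean 400 payload
// instead of an unhandled 500.
function parseJsonBody(
  raw: string,
): { status: 200; body: unknown } | { status: 400; error: string; details: string } {
  try {
    return { status: 200, body: JSON.parse(raw) };
  } catch (err) {
    return {
      status: 400,
      error: "Invalid JSON",
      details: err instanceof Error ? err.message : String(err),
    };
  }
}
```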
- Around line 191-213: The catch block handling non-AbortError exceptions in the
streaming logic (inside the async streaming function in claude-agent.ts where
converter.state.runId, controller, and encoder are used) writes a RUN_ERROR SSE
to the client but never logs the actual exception server-side; update that catch
block to log the error (e.g., via processLogger.error or console.error)
including the error object/stack and contextual info such as
converter.state.runId and a short message before or immediately after attempting
to enqueue the RUN_ERROR and [DONE] events so the server retains full diagnostic
details if enqueuing fails.
In `@apps/streams/src/sdk-to-ai-chunks.ts`:
- Around line 348-394: The function handleUserMessage currently dereferences
message.message.content without guarding against message.message being
null/undefined; add a defensive null-check at the top of handleUserMessage
(e.g., ensure message.message exists and is the expected shape or bail out)
before reading content, then proceed only when content is an array; reference
the existing SDKUserMessage type and the local variable content inside
handleUserMessage to locate where to add the guard and return an empty array
early if message.message is missing or invalid.
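The guard described above can be sketched as follows; the SDK message shape is reduced to what the guard needs, and the per-block conversion is a simplified stand-in for the real logic:

```typescript
// Defensive null-check at the top of handleUserMessage: bail out early if the
// envelope or its content is missing or not an array.
interface Chunk { type: string; [key: string]: unknown }

function handleUserMessage(message: {
  message?: { content?: unknown } | null;
}): Chunk[] {
  const content = message.message?.content;
  if (!Array.isArray(content)) return []; // missing or invalid envelope
  // …real per-block conversion would go here; simplified for illustration…
  return content.map((block, i) => ({ type: "TOOL_CALL_END", index: i, block }));
}
```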
🧹 Nitpick comments (6)
apps/streams/src/sdk-to-ai-chunks.ts (2)
146-167: Type-narrowing via `as` casts rather than runtime checks on external data.

`convertMessage` narrows SDK messages using `as` casts (e.g., `message as SDKPartialAssistantMessage`) after switching on `message.type`. Since these messages come from an external SDK, if the shape ever diverges (missing `event`, missing `message.content`, etc.), this will produce runtime errors deep in handlers rather than at the boundary. The `switch` on `message.type` is a good start, but the inner fields are trusted implicitly.

Consider adding lightweight guards (e.g., checking `"event" in message` before casting) for the critical paths, or use Zod `.safeParse` for the SDK message types. Based on learnings, validate external API data as untrusted by handling missing fields and unexpected shapes with tolerant parsing and explicit fallbacks.
58-63: Catch-all variant in `SDKMessage` union weakens exhaustiveness checking.

The `| { type: string; [key: string]: unknown }` variant (line 63) overlaps with every other union member, so TypeScript can never flag an unhandled `message.type` in a `switch`. This is fine for forward-compatibility with unknown SDK message types, but it means the `default` branch silently absorbs any typo or new message type. If forward-compatibility is intentional, this is acceptable — just worth noting that exhaustiveness checks are disabled.
apps/streams/src/claude-agent.ts (2)
58-76: Session eviction performs O(n log n) sort on every `setClaudeSessionId` call when over capacity.

`evictStaleSessions()` is called on every `set` (line 87). The TTL sweep is O(n), and when still over capacity, it sorts all entries. With `SESSION_MAX_SIZE = 1000`, this is fine for now, but if the cap increases, consider a more efficient eviction strategy (e.g., LRU linked list or a min-heap by `lastAccessedAt`).
170-178: System init message handling casts to `Record<string, unknown>` — consider a type guard.

Line 171 casts to `Record<string, unknown>` and then accesses `.type`, `.subtype`, and `.session_id` with additional casts. A small type guard or check (e.g., `typeof msg.session_id === "string"`) at line 173 would be more defensive, since the SDK message shape is external.

apps/streams/src/index.ts (2)
40-46: No `SIGTERM` handler — only `SIGINT` is caught.

Container orchestrators (Docker, Kubernetes) send `SIGTERM` for graceful shutdown, not `SIGINT`. This is an existing pattern in the file, so not necessarily in scope for this PR, but worth noting for operational correctness.
6-8: No port-conflict detection between PORT, INTERNAL_PORT, and AGENT_PORT.

If any two of these resolve to the same value (all default to different ports, but can be overridden via env), one `serve()` call will fail with an `EADDRINUSE` error. Consider a startup check or at least a log warning.
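A startup check along these lines would fail fast instead of surfacing a confusing `EADDRINUSE` later; the function name and call site are hypothetical:

```typescript
// Fail fast at startup if any two configured ports collide.
function assertDistinctPorts(ports: Record<string, number>): void {
  const seen = new Map<number, string>();
  for (const [name, port] of Object.entries(ports)) {
    const existing = seen.get(port);
    if (existing !== undefined) {
      throw new Error(`${name} and ${existing} both resolve to port ${port}`);
    }
    seen.set(port, name);
  }
}
```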
…ssage

- Wrap c.req.json() in try-catch for clean 400 on invalid JSON
- Log non-abort stream errors server-side before sending to client
- Add null guard on message.message in handleUserMessage
Dual StreamChunk types exist: strict union from @tanstack/ai (used by sdk-to-ai-chunks.ts) and loose catch-all in types.ts (used by protocol.ts). Works at runtime via JSON boundary but undermines type safety. Documented fix path for a future cleanup PR.
Description
Related Issues
Type of Change
Testing
Screenshots (if applicable)
Additional Notes
Summary by CodeRabbit
New Features
Chores