fix: capture responses streaming api error #2681
Conversation
No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID:
📒 Files selected for processing (4)
✅ Files skipped from review due to trivial changes (1)
Summary by CodeRabbit

📝 Walkthrough

Provider streaming error handling now preserves nested provider error fields and the SSE payload; the streaming accumulator accepts error-only events and reserves/reuses a terminal error chunk index; terminal error response chunks include serialized RawResponse; logging finalizes, converts, applies, and cleans up accumulated streaming output on error.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Provider as OpenAI Provider
    participant Accumulator as Streaming Accumulator
    participant Responses as Response Builder
    participant Logger as Logging Plugin
    Provider->>Provider: Receive SSE chunk (error), parse JSON
    Provider->>Provider: Copy nested provider error fields into bifrostErr
    Provider->>Provider: Enrich Error (pass SSE payload)
    Provider->>Accumulator: ProcessStreamingChunk(result=nil, bifrostErr)
    Accumulator->>Accumulator: Accept error-only event
    Accumulator->>Accumulator: Derive requestType from bifrostErr.ExtraFields
    Accumulator->>Responses: Build error chunk (marshal RawResponse)
    Responses->>Accumulator: Lock and assign/reuse TerminalErrorChunkIndex
    Responses->>Logger: Emit finalized error chunk
    Logger->>Logger: Finalize accumulator, convert accumulated stream if needed
    Logger->>Logger: Apply streaming output to log entry
    Logger->>Logger: Cleanup stream accumulator
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related issues
🚥 Pre-merge checks | ✅ Passed checks (3 passed)
Force-pushed 692fc19 to 48b03ec
Actionable comments posted: 1
🧹 Nitpick comments (1)
framework/streaming/responses.go (1)
903-912: Minor race window in ChunkIndex assignment.

There's a small TOCTOU window between reading `MaxResponsesChunkIndex` (line 911) and adding the chunk via `addResponsesStreamChunk` (line 934). If another goroutine adds a chunk in this window, the computed index could collide. In practice this is low-risk since:
- Error chunks are terminal (stream is ending)
- Multiple concurrent error chunks for the same request is unlikely
Consider computing the unique index atomically inside `addResponsesStreamChunk` by passing a sentinel value (e.g., -1) for error chunks and having the method assign the next index:

🔧 Optional: Atomic index assignment

```diff
 if bifrostErr != nil {
 	chunk.FinishReason = bifrost.Ptr("error")
 	if rawBytes, marshalErr := sonic.Marshal(bifrostErr.ExtraFields.RawResponse); marshalErr == nil && len(rawBytes) > 0 {
 		chunk.RawResponse = bifrost.Ptr(string(rawBytes))
 	}
-	// Error chunks may arrive without a stream chunk index (result is nil), which can
-	// collide with index 0 (e.g., response.created) and get deduplicated away.
-	// Force a unique trailing index so both prior chunks and terminal error are retained.
-	accumulator := a.getOrCreateStreamAccumulator(requestID)
-	accumulator.mu.Lock()
-	chunk.ChunkIndex = accumulator.MaxResponsesChunkIndex + 1
-	accumulator.mu.Unlock()
+	// Use sentinel value -1 to signal that addResponsesStreamChunk should assign the next unique index
+	chunk.ChunkIndex = -1
 }
```

Then update `addResponsesStreamChunk` to handle the sentinel:

```go
// In addResponsesStreamChunk, after acquiring the lock:
if chunk.ChunkIndex == -1 {
	chunk.ChunkIndex = accumulator.MaxResponsesChunkIndex + 1
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@framework/streaming/responses.go` around lines 903 - 912, There’s a TOCTOU race when computing chunk.ChunkIndex using accumulator.MaxResponsesChunkIndex while unlocked; instead pass a sentinel (e.g., -1) for error chunks from the caller (where getOrCreateStreamAccumulator is used) and let addResponsesStreamChunk perform the atomic assignment while holding the accumulator lock: in addResponsesStreamChunk, after acquiring accumulator.mu, detect if chunk.ChunkIndex == -1 and set it to accumulator.MaxResponsesChunkIndex + 1 (then update MaxResponsesChunkIndex accordingly); this moves the index computation/assignment into addResponsesStreamChunk and eliminates the window around chunk index creation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@core/providers/openai/openai.go`:
- Around line 1757-1758: The merge incorrectly skips using
response.Response.Error.Code when bifrostErr.Error.Code exists but is an empty
string; update the fallback logic in openai.go so empty top-level codes are
treated as missing (same as message handling) by changing the condition that
assigns bifrostErr.Error.Code to check both nil and empty value—e.g., replace
the current if response.Response.Error.Code != "" && bifrostErr.Error.Code ==
nil with a check like response.Response.Error.Code != "" &&
(bifrostErr.Error.Code == nil || *bifrostErr.Error.Code == "") so the nested
code is used when the existing code pointer is nil or points to an empty string.
---
Nitpick comments:
In `@framework/streaming/responses.go`:
- Around line 903-912: There’s a TOCTOU race when computing chunk.ChunkIndex
using accumulator.MaxResponsesChunkIndex while unlocked; instead pass a sentinel
(e.g., -1) for error chunks from the caller (where getOrCreateStreamAccumulator
is used) and let addResponsesStreamChunk perform the atomic assignment while
holding the accumulator lock: in addResponsesStreamChunk, after acquiring
accumulator.mu, detect if chunk.ChunkIndex == -1 and set it to
accumulator.MaxResponsesChunkIndex + 1 (then update MaxResponsesChunkIndex
accordingly); this moves the index computation/assignment into
addResponsesStreamChunk and eliminates the window around chunk index creation.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6d1e9e6e-d571-4dc4-b760-c320685c4199
📒 Files selected for processing (5)
- core/providers/openai/openai.go
- framework/streaming/accumulator.go
- framework/streaming/responses.go
- plugins/logging/main.go
- plugins/logging/operations.go
Confidence Score: 5/5

Safe to merge — no P0/P1 issues found; all remaining findings are P2 edge cases unlikely to affect production behavior. The core error-propagation fixes are correct and well-guarded. The TerminalErrorChunkIndex dedup mechanism is sound, and the logging path changes handle nil-result errors cleanly. The two P2 notes are theoretical (token-usage loss on providers that send usage mid-stream) or informational (undocumented logic fix in buildResponseForRequestType). Neither blocks merge.

framework/streaming/responses.go — the TerminalErrorChunkIndex / MaxResponsesChunkIndex coupling is worth a second look if token-usage accuracy for mid-stream errors becomes important.

Important Files Changed
Reviews (4): Last reviewed commit: "fix: wrong increments of chunk index"
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@framework/streaming/responses.go`:
- Around line 901-913: Race condition: assigning chunk.ChunkIndex reads
accumulator.MaxResponsesChunkIndex while unlocked, allowing concurrent handlers
to compute the same index; fix by assigning the index while holding the
accumulator lock (i.e., increment MaxResponsesChunkIndex inside
getOrCreateStreamAccumulator's accumulator.mu critical section or here before
releasing the lock) so you atomically set chunk.ChunkIndex =
++accumulator.MaxResponsesChunkIndex (or move index assignment into
addResponsesStreamChunk which already holds the lock); also log sonic.Marshal
errors when serializing bifrostErr.ExtraFields.RawResponse to surface silent
failures.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0df89b9c-e772-4866-959a-05457838c972
📒 Files selected for processing (2)
- core/providers/openai/openai.go
- framework/streaming/responses.go
🚧 Files skipped from review as they are similar to previous changes (1)
- core/providers/openai/openai.go
Force-pushed 762f897 to a8cf690
Force-pushed 0091d7a to 9ea6845
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@framework/streaming/responses.go`:
- Around line 909-913: The current logic in addResponsesStreamChunk
unconditionally increments accumulator.MaxResponsesChunkIndex and assigns
chunk.ChunkIndex, causing duplicate terminal-error callbacks to get new indexes
and bypass de-duplication; modify this by reserving and reusing a single
terminal-error index on the accumulator: under the same accumulator.mu lock used
around MaxResponsesChunkIndex, detect if the incoming chunk represents the
terminal error (use the chunk's terminal/error indicator) and if so, if the
accumulator already has a stored terminalErrorChunkIndex use that value for
chunk.ChunkIndex instead of incrementing; otherwise increment
MaxResponsesChunkIndex, assign it to chunk.ChunkIndex and also store it in
accumulator.terminalErrorChunkIndex for future reuse. Ensure all reads/writes to
MaxResponsesChunkIndex and terminalErrorChunkIndex happen inside the mutex to
avoid races.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8c86133e-8dc9-463e-a4bb-f65ef87bcc7f
📒 Files selected for processing (1)
framework/streaming/responses.go
Force-pushed a50cdde to a6b9fb5
♻️ Duplicate comments (1)
framework/streaming/responses.go (1)
901-904: ⚠️ Potential issue | 🟡 Minor

Preserve the error payload even when JSON marshaling fails.

`RawResponse` is `interface{}`. If this branch sees an unsupported value, or a plain `string`/`[]byte`, `sonic.Marshal` either drops the payload or JSON-quotes it, so the "raw" error chunk stops being raw. Falling back to a verbatim/string format here keeps the debugging data intact.

Suggested adjustment

```diff
 if bifrostErr.ExtraFields.RawResponse != nil {
-	if rawBytes, marshalErr := sonic.Marshal(bifrostErr.ExtraFields.RawResponse); marshalErr == nil {
-		chunk.RawResponse = bifrost.Ptr(string(rawBytes))
-	}
+	switch raw := bifrostErr.ExtraFields.RawResponse.(type) {
+	case string:
+		chunk.RawResponse = bifrost.Ptr(raw)
+	case []byte:
+		chunk.RawResponse = bifrost.Ptr(string(raw))
+	default:
+		if rawBytes, marshalErr := sonic.Marshal(raw); marshalErr == nil {
+			chunk.RawResponse = bifrost.Ptr(string(rawBytes))
+		} else {
+			chunk.RawResponse = bifrost.Ptr(fmt.Sprintf("%v", raw))
+		}
+	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@framework/streaming/responses.go` around lines 901 - 904, The RawResponse currently gets JSON-marshaled with sonic.Marshal which can drop or quote the original payload; update the branch that sets chunk.RawResponse (when bifrostErr.ExtraFields.RawResponse != nil) to preserve the original payload on marshal failure: attempt sonic.Marshal first, but if marshalErr != nil then inspect bifrostErr.ExtraFields.RawResponse (type switch on []byte, string, or other) and assign a verbatim representation—use the []byte as string, use string as-is, and for other types use a safe fmt.Sprintf("%v", raw) fallback—so chunk.RawResponse always contains the original/raw payload; reference the symbols bifrostErr.ExtraFields.RawResponse, sonic.Marshal, and chunk.RawResponse to locate where to implement this.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@framework/streaming/responses.go`:
- Around line 901-904: The RawResponse currently gets JSON-marshaled with
sonic.Marshal which can drop or quote the original payload; update the branch
that sets chunk.RawResponse (when bifrostErr.ExtraFields.RawResponse != nil) to
preserve the original payload on marshal failure: attempt sonic.Marshal first,
but if marshalErr != nil then inspect bifrostErr.ExtraFields.RawResponse (type
switch on []byte, string, or other) and assign a verbatim representation—use the
[]byte as string, use string as-is, and for other types use a safe
fmt.Sprintf("%v", raw) fallback—so chunk.RawResponse always contains the
original/raw payload; reference the symbols bifrostErr.ExtraFields.RawResponse,
sonic.Marshal, and chunk.RawResponse to locate where to implement this.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 48ef73cf-0a83-434d-989c-163940c9ae9a
📒 Files selected for processing (3)
- framework/streaming/accumulator.go
- framework/streaming/responses.go
- framework/streaming/types.go
Summary
Improves error handling and logging for streaming responses by ensuring error messages and codes are properly propagated, fixing chunk indexing for error responses, and enhancing log data retention during streaming failures.
Changes
Type of change
Affected areas
How to test
Test streaming error scenarios to verify proper error propagation and logging:
Screenshots/Recordings
N/A
Breaking changes
Related issues
N/A
Security considerations
No security implications - this change improves error handling and logging without exposing additional sensitive data.
Checklist
`docs/contributing/README.md` and followed the guidelines