adds concurrent map fixes for plugin timer #2835
📝 Walkthrough (Summary by CodeRabbit)

This pull request introduces explicit

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Bifrost as Bifrost<br/>(requestWorker)
    participant Pipeline as PluginPipeline<br/>(streamingMu)
    participant Provider
    participant Stream
    rect rgba(100, 150, 200, 0.5)
        Note over Bifrost,Pipeline: Retry Attempt N
        Bifrost->>Pipeline: allocate per-attempt<br/>pipeline & finalizer
        Bifrost->>Provider: handleProviderStreamRequest<br/>(..., postHookSpanFinalizer)
        Provider->>Stream: initialize stream with<br/>finalizer callback
    end
    rect rgba(200, 100, 100, 0.5)
        Note over Pipeline,Stream: Streaming Chunks
        Stream->>Pipeline: accumulatePluginTiming()<br/>(under streamingMu)
        Stream->>Pipeline: GetChunkCount()<br/>(under streamingMu)
        Stream->>Client: send chunk
    end
    rect rgba(100, 200, 100, 0.5)
        Note over Bifrost,Pipeline: Retry on Error
        alt bifrostError != nil
            Provider->>Pipeline: reset & release<br/>(via last attempt's<br/>postHookSpanFinalizer)
            Pipeline->>Pipeline: invoke finalizer<br/>under streamingMu
        else Stream Success
            Stream->>Pipeline: FinalizeStreamingPostHookSpans<br/>(snapshot under lock)
            Pipeline->>Client: finalized aggregated spans
        end
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Warning: There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.11.4)

level=error msg="[linters_context] typechecking error: pattern ./...: directory prefix . does not contain main module or its selected dependencies"
This stack of pull requests is managed by Graphite.
Confidence Score: 5/5. Safe to merge; all findings are minor style/documentation issues with no runtime impact. The synchronization design is sound: the mutex correctly guards all concurrently-accessed fields.
Reviews (4). Last reviewed commit: "moved finalizer out of context for fixin..."
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@core/bifrost.go`:
- Around line 6464-6495: In resetPluginPipeline (method PluginPipeline) the
slices p.preHookErrors and p.postHookErrors are only resliced to length 0 but
their backing arrays still hold references; call clear(p.preHookErrors) and
clear(p.postHookErrors) before re-slicing them (i.e. clear(...) then
p.preHookErrors = p.preHookErrors[:0] and same for postHookErrors) so the pooled
PluginPipeline does not retain error objects between requests.
- Around line 5582-5627: The streaming retry closure never recalculates the
per-key alias, so before creating postHookRunner and calling
bifrost.handleProviderStreamRequest you must resolve the model alias for the
chosen key and update the request and local resolvedModel used by
PopulateExtraFields; i.e., call the provider/key-specific alias resolution
(e.g., provider.ResolveModelAlias or bifrost.resolveModelAliasForKey) with k and
originalModelRequested, set resolvedModel to that result, and patch req (the
request object) to reflect the resolved model (so PopulateExtraFields and the
upstream provider call use the per-key alias) before invoking
handleProviderStreamRequest.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5250e320-6828-43c1-b237-9ba3f88c474d
📒 Files selected for processing (3)
- AGENTS.md
- core/bifrost.go
- core/pluginpipelinerace_test.go
2c78d66 to f9fcb40
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@core/bifrost.go`:
- Around line 5583-5619: Snapshot the per-attempt state instead of closing over
shared variables: create an attempt-local copy of resolvedModel (e.g.,
attemptResolvedModel := resolvedModel) and use that local variable inside
postHookRunner (and in all PopulateExtraFields calls) so retries don't observe
later alias changes; likewise stop mutating the shared req.Context by removing
the req.Context.SetValue(schemas.BifrostContextKeyPostHookSpanFinalizer,
postHookSpanFinalizer) call and instead carry postHookSpanFinalizer in
attempt-local state (or put it into a derived per-attempt context passed into
handleProviderStreamRequest) so each attempt keeps its own finalizer and
pipeline instance.
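The per-attempt snapshot the comment asks for is a standard Go closure-capture fix. The sketch below is generic (buildRunners and the alias strings are illustrative, not the repo's code): each iteration copies the shared variable into an attempt-local one before the closure captures it, so a later mutation cannot leak into an earlier attempt's runner.

```go
package main

import "fmt"

// buildRunners returns one closure per attempt. Capturing resolvedModel
// directly would make every runner report the last alias, because it is
// a single variable mutated across iterations; the per-attempt copy
// freezes the value each closure observes.
func buildRunners(aliases []string) []func() string {
	resolvedModel := ""
	var runners []func() string
	for _, alias := range aliases {
		resolvedModel = alias
		attemptResolvedModel := resolvedModel // per-attempt snapshot
		runners = append(runners, func() string { return attemptResolvedModel })
	}
	return runners
}

func main() {
	runners := buildRunners([]string{"model-v1", "model-v2"})
	for _, r := range runners {
		fmt.Println(r())
	}
}
```

The same idea applies to the finalizer: keeping it in attempt-local state (or a derived per-attempt context) instead of the shared req.Context gives each retry its own copy.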
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 52b20152-e3e6-4bc6-b455-f7ef36cb306b
📒 Files selected for processing (14)
- AGENTS.md
- core/bifrost.go
- core/pluginpipelinerace_test.go
- framework/go.mod
- plugins/compat/go.mod
- plugins/governance/go.mod
- plugins/jsonparser/go.mod
- plugins/logging/go.mod
- plugins/maxim/go.mod
- plugins/mocker/go.mod
- plugins/otel/go.mod
- plugins/semanticcache/go.mod
- plugins/telemetry/go.mod
- transports/go.mod
✅ Files skipped from review due to trivial changes (12)
- plugins/compat/go.mod
- plugins/mocker/go.mod
- plugins/otel/go.mod
- plugins/semanticcache/go.mod
- plugins/governance/go.mod
- AGENTS.md
- plugins/telemetry/go.mod
- plugins/maxim/go.mod
- transports/go.mod
- plugins/jsonparser/go.mod
- framework/go.mod
- plugins/logging/go.mod
🚧 Files skipped from review as they are similar to previous changes (1)
- core/pluginpipelinerace_test.go
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (6)
core/providers/utils/utils.go (1)
2138-2176: ⚠️ Potential issue | 🟠 Major

ProcessAndSendError still drops the terminal-span/finalizer path. This helper now accepts postHookSpanFinalizer, but it never uses it and still bypasses completeDeferredSpan. On the terminal read-error path, that means the error chunk is emitted without closing the deferred LLM span or finalizing post-hook spans. Line 3166 in core/providers/vertex/vertex.go already relies on this helper for stream read failures, so those failures will leave tracing/plugin cleanup incomplete.

💡 Proposed fix

```diff
 func ProcessAndSendError(
 	ctx *schemas.BifrostContext,
 	postHookRunner schemas.PostHookRunner,
 	err error,
 	responseChan chan *schemas.BifrostStreamChunk,
 	logger schemas.Logger,
 	postHookSpanFinalizer func(context.Context),
 ) {
-	// Send scanner error through channel
 	bifrostError := &schemas.BifrostError{
 		IsBifrostError: true,
 		Error: &schemas.ErrorField{
 			Message: fmt.Sprintf("Error reading stream: %v", err),
 			Error:   err,
 		},
 	}
-	processedResponse, processedError := postHookRunner(ctx, nil, bifrostError)
-
-	if HandleStreamControlSkip(processedError) {
-		return
-	}
-
-	streamResponse := &schemas.BifrostStreamChunk{}
-	if processedResponse != nil {
-		streamResponse.BifrostTextCompletionResponse = processedResponse.TextCompletionResponse
-		streamResponse.BifrostChatResponse = processedResponse.ChatResponse
-		streamResponse.BifrostResponsesStreamResponse = processedResponse.ResponsesStreamResponse
-		streamResponse.BifrostSpeechStreamResponse = processedResponse.SpeechStreamResponse
-		streamResponse.BifrostTranscriptionStreamResponse = processedResponse.TranscriptionStreamResponse
-	}
-	if processedError != nil {
-		streamResponse.BifrostError = processedError
-	}
-
-	select {
-	case responseChan <- streamResponse:
-	case <-ctx.Done():
-	}
+	ProcessAndSendBifrostError(ctx, postHookRunner, bifrostError, responseChan, logger, postHookSpanFinalizer)
 }
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@core/providers/utils/utils.go` around lines 2138-2176, ProcessAndSendError is not invoking the post-hook span finalizer or completing deferred LLM spans, so terminal stream-error paths leave tracing and plugin cleanup incomplete; update ProcessAndSendError to call postHookSpanFinalizer(ctx) (if non-nil) and completeDeferredSpan(ctx) (the helper that closes the deferred LLM span) before emitting the final error chunk and also before returning when HandleStreamControlSkip(processedError) is true, ensuring both normal send and early-skip paths finalize spans; use the existing function names postHookSpanFinalizer, completeDeferredSpan, postHookRunner, ctx and responseChan to locate where to insert those calls.

core/providers/azure/azure.go (1)
1006-1008: ⚠️ Potential issue | 🟠 Major

Use streamingClient for Azure speech streaming. This stream path still does provider.client.Do(req, resp) while resp.StreamBody = true. That keeps the unary client's ReadTimeout on a long-lived stream, so Azure TTS can still be cut off by fasthttp response deadlines even after the stack's streaming-client split.

Suggested fix

```diff
-	requestErr := provider.client.Do(req, resp)
+	requestErr := provider.streamingClient.Do(req, resp)
```

As per coding guidelines for core/providers/**/*.go: Always use the streamingClient (not client) when making calls to provider APIs for SSE, EventStream, or chunked response paths to prevent streams from being killed by fasthttp's response deadline.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@core/providers/azure/azure.go` around lines 1006-1008, The Azure streaming path sets resp.StreamBody = true but still calls provider.client.Do(req, resp), which allows the unary client's ReadTimeout to kill long-lived streams; change the call to use provider.streamingClient.Do(req, resp) (or otherwise route requests for SSE/EventStream/chunked responses through streamingClient) wherever resp.StreamBody is set or where Azure TTS/event streaming is performed (look for provider.client.Do and resp.StreamBody = true in this file) so streamingClient is used for long-lived responses.

core/providers/gemini/gemini.go (1)
448-451: ⚠️ Potential issue | 🟠 Major

Ensure passthrough finalization is called exactly once across all Gemini streaming paths.

Early-return passthrough paths at lines 448-451, 943-946, 1432-1435, and 1721-1724 skip finalization entirely because they return before the goroutine with the deferred EnsureStreamFinalizerCalled is created. Meanwhile, PassthroughStream (lines 4200-4250) invokes postHookSpanFinalizer(ctx) explicitly at lines 4246-4247 and also defers EnsureStreamFinalizerCalled, causing double finalization. Since EnsureStreamFinalizerCalled directly calls the finalizer without idempotency guards, the deferred call will execute a second time after the explicit invocation completes.

Fix: Route all passthrough finalization through a single helper (whether direct call or deferred), or ensure the early-return paths also invoke the finalizer before returning.
Also applies to: 943-946, 1432-1435, 1721-1724, 4203-4248
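One idiomatic way to make the explicit-plus-deferred pair harmless, whichever fires first, is to wrap the finalizer in sync.Once. This is a generic sketch of the idempotency idea, not the repo's helper (newFinalizer is an assumed name):

```go
package main

import (
	"fmt"
	"sync"
)

// newFinalizer wraps a finalizer in sync.Once so an explicit call on the
// success path and a deferred safety-net call collapse into exactly one
// execution, making double invocation a no-op.
func newFinalizer(f func()) func() {
	var once sync.Once
	return func() { once.Do(f) }
}

func main() {
	calls := 0
	finalize := newFinalizer(func() { calls++ })
	defer finalize() // deferred safety net; becomes a no-op below

	finalize() // explicit call on the success path
	finalize() // a second explicit call is also harmless
	fmt.Println(calls)
}
```

The alternative the review mentions, routing every path through a single helper so the finalizer is only ever invoked once, avoids the wrapper entirely; sync.Once is the belt-and-suspenders variant.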
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/providers/gemini/gemini.go` around lines 448-451, The passthrough return paths are bypassing the goroutine that defers EnsureStreamFinalizerCalled and PassthroughStream also calls postHookSpanFinalizer explicitly then defers EnsureStreamFinalizerCalled, causing double or missed finalization; fix by centralizing finalization: add a single helper (e.g., finalizePassthrough(ctx) or ensurePassthroughFinalizerCalled) that calls postHookSpanFinalizer and EnsureStreamFinalizerCalled exactly once, call that helper from all early-return branches where providerUtils.SetupStreamingPassthrough(...) returns true (instead of returning early without finalizing), and in PassthroughStream remove the duplicate explicit call or the duplicate defer so only the centralized helper runs once (update the responseChan immediate-close paths to invoke the helper before returning).

core/providers/xai/xai.go (1)
108-125: ⚠️ Potential issue | 🟠 Major

Pass the xAI auth header into text-completion streaming. TextCompletionStream ignores key and sends nil auth headers to HandleOpenAITextCompletionStreaming, while ChatCompletionStream and ResponsesStream add Authorization: Bearer .... This will make /v1/completions streaming fail with unauthorized responses.

🔐 Proposed fix

```diff
 func (provider *XAIProvider) TextCompletionStream(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, postHookSpanFinalizer func(context.Context), key schemas.Key, request *schemas.BifrostTextCompletionRequest) (chan *schemas.BifrostStreamChunk, *schemas.BifrostError) {
+	var authHeader map[string]string
+	if key.Value.GetValue() != "" {
+		authHeader = map[string]string{"Authorization": "Bearer " + key.Value.GetValue()}
+	}
+
 	return openai.HandleOpenAITextCompletionStreaming(
 		ctx,
 		provider.streamingClient,
 		provider.networkConfig.BaseURL+"/v1/completions",
 		request,
-		nil,
+		authHeader,
 		provider.networkConfig.ExtraHeaders,
 		providerUtils.ShouldSendBackRawRequest(ctx, provider.sendBackRawRequest),
 		providerUtils.ShouldSendBackRawResponse(ctx, provider.sendBackRawResponse),
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@core/providers/xai/xai.go` around lines 108-125, TextCompletionStream is passing nil for auth headers so completions streaming will be unauthorized; update XAIProvider.TextCompletionStream to build and pass the same Authorization header used by ChatCompletionStream/ResponsesStream (i.e., create a header map with "Authorization: Bearer <key.Value()>" or use the existing helper that constructs provider auth headers) into the call to openai.HandleOpenAITextCompletionStreaming instead of nil, keeping all other arguments (provider.streamingClient, networkConfig.BaseURL+"/v1/completions", ParseXAIError, postHookRunner, postHookSpanFinalizer, provider.logger, etc.) unchanged.

core/providers/elevenlabs/elevenlabs.go (1)
327-337: ⚠️ Potential issue | 🟡 Minor

Release the pooled response on the validation fast-path. resp is acquired on line 328, but the early return on lines 335-337 happens before any release path is installed. Invalid requests will leak a pooled fasthttp.Response.

🛠️ Proposed fix

```diff
-	// Create HTTP request for streaming
-	req := fasthttp.AcquireRequest()
-	resp := fasthttp.AcquireResponse()
-	resp.StreamBody = true
-	defer fasthttp.ReleaseRequest(req)
-
-	// Set any extra headers from network config
-	providerUtils.SetExtraHeaders(ctx, req, provider.networkConfig.ExtraHeaders, nil)
-
 	if request.Params == nil || request.Params.VoiceConfig == nil || request.Params.VoiceConfig.Voice == nil {
 		return nil, providerUtils.EnrichError(ctx, providerUtils.NewBifrostOperationError("voice parameter is required", nil), jsonBody, nil, provider.sendBackRawRequest, provider.sendBackRawResponse)
 	}
+
+	// Create HTTP request for streaming
+	req := fasthttp.AcquireRequest()
+	resp := fasthttp.AcquireResponse()
+	resp.StreamBody = true
+	defer fasthttp.ReleaseRequest(req)
+
+	// Set any extra headers from network config
+	providerUtils.SetExtraHeaders(ctx, req, provider.networkConfig.ExtraHeaders, nil)
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@core/providers/elevenlabs/elevenlabs.go` around lines 327-337, The code acquires resp via fasthttp.AcquireResponse() but returns early on the voice validation fast-path without releasing it, leaking the pooled response; fix by ensuring resp is released before any early return (either call fasthttp.ReleaseResponse(resp) immediately before the return in the block that checks request.Params/VoiceConfig/Voice, or move a defer fasthttp.ReleaseResponse(resp) right after resp is acquired) so that the pooled response is always returned to the pool.

core/providers/bedrock/bedrock.go (1)
1900-1912: ⚠️ Potential issue | 🟡 Minor

Use provider.GetProviderKey() in these unsupported stream stubs. These branches still hardcode schemas.Bedrock, so custom provider aliases lose the correct provider name in the returned error metadata. ImageEditStream and PassthroughStream already use the right pattern.

Based on learnings: In provider implementations, when raising unsupported-operation errors, pass provider.GetProviderKey() to NewUnsupportedOperationError so error messages and ExtraFields.Provider reflect the provider's alias.

🪪 Proposed fix

```diff
 func (provider *BedrockProvider) SpeechStream(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, postHookSpanFinalizer func(context.Context), key schemas.Key, request *schemas.BifrostSpeechRequest) (chan *schemas.BifrostStreamChunk, *schemas.BifrostError) {
-	return nil, providerUtils.NewUnsupportedOperationError(schemas.SpeechStreamRequest, schemas.Bedrock)
+	return nil, providerUtils.NewUnsupportedOperationError(schemas.SpeechStreamRequest, provider.GetProviderKey())
 }
@@
 func (provider *BedrockProvider) TranscriptionStream(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, postHookSpanFinalizer func(context.Context), key schemas.Key, request *schemas.BifrostTranscriptionRequest) (chan *schemas.BifrostStreamChunk, *schemas.BifrostError) {
-	return nil, providerUtils.NewUnsupportedOperationError(schemas.TranscriptionStreamRequest, schemas.Bedrock)
+	return nil, providerUtils.NewUnsupportedOperationError(schemas.TranscriptionStreamRequest, provider.GetProviderKey())
 }
@@
 func (provider *BedrockProvider) ImageGenerationStream(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, postHookSpanFinalizer func(context.Context), key schemas.Key, request *schemas.BifrostImageGenerationRequest) (chan *schemas.BifrostStreamChunk, *schemas.BifrostError) {
-	return nil, providerUtils.NewUnsupportedOperationError(schemas.ImageGenerationStreamRequest, schemas.Bedrock)
+	return nil, providerUtils.NewUnsupportedOperationError(schemas.ImageGenerationStreamRequest, provider.GetProviderKey())
 }
```

Also applies to: 1983-1985
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/providers/bedrock/bedrock.go` around lines 1900 - 1912, The three unsupported-stream stubs SpeechStream, Transcription, and TranscriptionStream currently call providerUtils.NewUnsupportedOperationError(..., schemas.Bedrock) and should use the provider alias instead; update each call to pass provider.GetProviderKey() as the provider argument (e.g., providerUtils.NewUnsupportedOperationError(schemas.SpeechStreamRequest, provider.GetProviderKey())), likewise for TranscriptionRequest and TranscriptionStreamRequest so the returned error ExtraFields.Provider reflects the provider's alias.
🧹 Nitpick comments (2)
core/schemas/provider.go (1)
502-506: Consider naming the span-finalizer callback type.
func(context.Context) is now repeated across every streaming method. A shared exported type next to PostHookRunner would centralize the contract and make later signature/doc updates less error-prone.

Also applies to: 510, 514, 526, 530, 535-536, 540-541, 599
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/schemas/provider.go` around lines 502-506, The repeated anonymous callback type func(context.Context) used for span finalization should be replaced with a single exported type alias (e.g., PostHookSpanFinalizer or SpanFinalizer) declared near PostHookRunner; add the new type declaration and then update the method signatures (starting with TextCompletionStream and the other streaming methods referenced) and their doc comments to use that exported type instead of func(context.Context) so the contract is centralized and easy to update later.

core/providers/elevenlabs/elevenlabs.go (1)
310: Measure stream latency from after setup, not before Do.

Line 352 starts the timer before the handshake, so the final done-chunk latency on lines 462-468 includes connection/setup time. The streaming handlers elsewhere in core/providers/ start that clock after the stream is established.

Based on learnings: In Go streaming handlers under core/providers, initialize the streaming startTime inside the streaming goroutine after client.Do and the stream is set up, so the final-chunk latency reflects post-handshake streaming time.

⏱️ Suggested adjustment

```diff
-	startTime := time.Now()
 	err := provider.streamingClient.Do(req, resp)
 	if err != nil {
 		defer providerUtils.ReleaseStreamingResponse(resp)
 		...
 	}
@@
 	go func() {
+		startTime := time.Now()
 		defer func() {
 			if ctx.Err() == context.Canceled {
 				providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
```

Also applies to: 352-353, 467-476
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/providers/elevenlabs/elevenlabs.go` at line 310, The SpeechStream handler is starting its latency timer before the HTTP/WebSocket handshake, so the reported final-chunk latency (done-chunk) includes connection/setup time; in ElevenlabsProvider.SpeechStream move the startTime initialization out of the setup phase and into the streaming goroutine immediately after a successful client.Do (i.e., after the stream is established and any handshake completes), ensure any uses that compute the done-chunk latency reference that in-goroutine startTime, and remove or relocate the earlier startTime assignment (the ones around client.Do and the final chunk emission) so the latency reflects only post-handshake streaming time.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 55acd18f-bd84-4e61-b5b9-c0a449f1193e
📒 Files selected for processing (32)
- core/bifrost.go
- core/providers/anthropic/anthropic.go
- core/providers/azure/azure.go
- core/providers/bedrock/bedrock.go
- core/providers/bedrock/transport_test.go
- core/providers/cerebras/cerebras.go
- core/providers/cohere/cohere.go
- core/providers/elevenlabs/elevenlabs.go
- core/providers/fireworks/fireworks.go
- core/providers/fireworks/fireworks_test.go
- core/providers/gemini/gemini.go
- core/providers/groq/groq.go
- core/providers/huggingface/huggingface.go
- core/providers/mistral/custom_provider_test.go
- core/providers/mistral/mistral.go
- core/providers/mistral/transcription_test.go
- core/providers/nebius/nebius.go
- core/providers/ollama/ollama.go
- core/providers/openai/openai.go
- core/providers/openrouter/openrouter.go
- core/providers/parasail/parasail.go
- core/providers/perplexity/perplexity.go
- core/providers/replicate/replicate.go
- core/providers/runway/runway.go
- core/providers/sgl/sgl.go
- core/providers/utils/utils.go
- core/providers/utils/utils_test.go
- core/providers/vertex/vertex.go
- core/providers/vllm/vllm.go
- core/providers/xai/xai.go
- core/schemas/bifrost.go
- core/schemas/provider.go
💤 Files with no reviewable changes (1)
- core/schemas/bifrost.go
🚧 Files skipped from review as they are similar to previous changes (1)
- core/bifrost.go
b9d2397 to 72c7bd0

7a5168a to fe043ae
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (5)
core/providers/utils/utils.go (1)
2138-2176: ⚠️ Potential issue | 🟡 Minor

Add StreamEndIndicator check and completeDeferredSpan call to ProcessAndSendError.

ProcessAndSendError accepts the postHookSpanFinalizer parameter but never uses it. Unlike ProcessAndSendResponse and ProcessAndSendBifrostError, it does not check StreamEndIndicator or call completeDeferredSpan when the final chunk arrives. Since providers set StreamEndIndicator before calling ProcessAndSendError (e.g., when stream read errors occur on final chunks), the function must include the same deferred span completion logic present in the other two functions.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@core/providers/utils/utils.go` around lines 2138-2176, ProcessAndSendError currently ignores postHookSpanFinalizer and never completes deferred spans; add the same StreamEndIndicator check and deferred-span completion used in ProcessAndSendResponse/ProcessAndSendBifrostError: after constructing streamResponse (and before sending on responseChan) inspect the StreamEndIndicator on any populated response field (BifrostResponsesStreamResponse, BifrostChatResponse, BifrostTextCompletionResponse, BifrostSpeechStreamResponse, BifrostTranscriptionStreamResponse) and if true and postHookSpanFinalizer != nil call postHookSpanFinalizer(ctx) (this is the completeDeferredSpan call), preserving the existing HandleStreamControlSkip behavior and ctx.Done() send logic.

core/providers/openai/openai.go (1)
6847-6856: ⚠️ Potential issue | 🟠 Major

Use OpenAI URL builders for passthrough routes (ChatGPTOAuth compatibility). Both passthrough paths still build URLs via string concatenation, which can bypass /v1 normalization/remapping when ChatGPTOAuth is enabled.

💡 Suggested fix

```diff
-	url := provider.networkConfig.BaseURL + "/v1" + path
+	url := provider.buildFullURL("/v1" + path)
 	if req.RawQuery != "" {
 		url += "?" + req.RawQuery
 	}
```

Apply the same change in both Passthrough and PassthroughStream.

Based on learnings: In core/providers/openai, when ChatGPTOAuth is enabled, all /v1/... routes (including Passthrough/PassthroughStream) must go through buildRequestURL(...)/buildFullURL(...) normalization.

Also applies to: 6924-6931
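buildFullURL is the repo's helper; as a neutral illustration of what centralized URL building buys over concatenation, Go's standard library url.JoinPath normalizes slashes when composing the /v1 route. joinV1 below is an assumed name for this sketch, not the repo's implementation:

```go
package main

import (
	"fmt"
	"net/url"
)

// joinV1 composes base + /v1 + path with normalized slashes, then appends
// the raw query; naive string concatenation would instead produce double
// slashes for inputs like "https://api.openai.com/" + "/v1" + "/chat/...".
func joinV1(base, path, rawQuery string) (string, error) {
	u, err := url.JoinPath(base, "v1", path)
	if err != nil {
		return "", err
	}
	if rawQuery != "" {
		u += "?" + rawQuery
	}
	return u, nil
}

func main() {
	u, err := joinV1("https://api.openai.com/", "/chat/completions", "stream=true")
	if err != nil {
		panic(err)
	}
	fmt.Println(u)
}
```

A single helper like this is also the natural place to hook per-mode remapping (the ChatGPTOAuth case the review cites), which is why routing every /v1 path through it matters.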
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/providers/openai/openai.go` around lines 6847-6856, The Passthrough and PassthroughStream handlers currently build the upstream URL via string concatenation (using provider.networkConfig.BaseURL + "/v1" + path and appending req.RawQuery), which bypasses the OpenAI URL normalization used for ChatGPTOAuth; update both handlers (functions Passthrough and PassthroughStream) to call the existing URL helpers (buildRequestURL or buildFullURL, whichever is used elsewhere for `/v1` normalization) passing req.Path and req.RawQuery (and provider.networkConfig.BaseURL) so all `/v1/...` routes go through the same normalization/remapping logic instead of manual string assembly.

core/providers/xai/xai.go (1)
108-125: ⚠️ Potential issue | 🔴 Critical

Pass the xAI auth header into text-completion streaming.
`HandleOpenAITextCompletionStreaming` only receives the `Authorization` header through the explicit header map, but this method always passes `nil`, so the stream request is sent without `Authorization`. Mirror the chat/responses paths and populate the Bearer header from `key.Value`.

Suggested fix

```diff
 func (provider *XAIProvider) TextCompletionStream(ctx *schemas.BifrostContext, postHookRunner schemas.PostHookRunner, postHookSpanFinalizer func(context.Context), key schemas.Key, request *schemas.BifrostTextCompletionRequest) (chan *schemas.BifrostStreamChunk, *schemas.BifrostError) {
+	var authHeader map[string]string
+	if key.Value.GetValue() != "" {
+		authHeader = map[string]string{"Authorization": "Bearer " + key.Value.GetValue()}
+	}
 	return openai.HandleOpenAITextCompletionStreaming(
 		ctx,
 		provider.streamingClient,
 		provider.networkConfig.BaseURL+"/v1/completions",
 		request,
-		nil,
+		authHeader,
 		provider.networkConfig.ExtraHeaders,
 		providerUtils.ShouldSendBackRawRequest(ctx, provider.sendBackRawRequest),
 		providerUtils.ShouldSendBackRawResponse(ctx, provider.sendBackRawResponse),
 		provider.GetProviderKey(),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/providers/xai/xai.go` around lines 108 - 125, TextCompletionStream is calling HandleOpenAITextCompletionStreaming with a nil explicit headers map so the streaming request lacks the xAI Authorization header; populate the explicit headers by merging provider.networkConfig.ExtraHeaders with an "Authorization" : "Bearer "+key.Value (same approach used in chat/response paths) before calling HandleOpenAITextCompletionStreaming, keeping the rest of the parameters (provider.GetProviderKey, ParseXAIError, postHookRunner, etc.) unchanged.

core/providers/azure/azure.go (1)
1006-1008: ⚠️ Potential issue | 🟠 Major

Use `provider.streamingClient` for `SpeechStream`.

This request still goes through the unary client, so `ReadTimeout`/`WriteTimeout` can cut off long-lived Azure speech streams before the idle-timeout and cancellation guards do their job. That leaves this `*Stream` path outside the stack's streaming-client fix.

Suggested fix

```diff
- requestErr := provider.client.Do(req, resp)
+ requestErr := provider.streamingClient.Do(req, resp)
```

As per coding guidelines, use the streaming client variant (`provider.streamingClient`) for all SSE / EventStream / chunked response paths and direct `Do` calls within `*Stream` methods to prevent streams from being killed by fasthttp's whole-response deadline.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/providers/azure/azure.go` around lines 1006 - 1008, In the SpeechStream path replace the unary HTTP client call with the streaming client: change usages of provider.client.Do(req, resp) inside the SpeechStream (and any SSE/EventStream/chunked response handlers in azure.go) to provider.streamingClient.Do(req, resp) so the request uses the streaming client variant (provider.streamingClient) which respects long-lived chunked/event streams and avoids fasthttp whole-response timeouts; ensure the response handler and error handling remain compatible with streamingClient.Do and that any other direct Do calls in *Stream methods are switched similarly.

core/providers/mistral/mistral.go (1)
584-606: ⚠️ Potential issue | 🟠 Major

Register the stream finalizer as the first defer to ensure it executes last.

The `EnsureStreamFinalizerCalled` defer at line 117 is registered last in the goroutine, causing it to execute first by Go's LIFO order. This finalizes the post-hook spans before `HandleStreamCancellation`/`HandleStreamTimeout` run on canceled or timed-out transcription streams, violating the callback contract. Move it to the top of the goroutine (immediately after `go func() {` at line 95), matching the pattern used in OpenAI's `HandleOpenAITranscriptionStreamRequest`.

Suggested fix

```diff
 go func() {
+	defer providerUtils.EnsureStreamFinalizerCalled(ctx, postHookSpanFinalizer)
 	defer func() {
 		if ctx.Err() == context.Canceled {
 			providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
 		} else if ctx.Err() == context.DeadlineExceeded {
 			providerUtils.HandleStreamTimeout(ctx, postHookRunner, responseChan, provider.logger, postHookSpanFinalizer)
 		}
 		close(responseChan)
 	}()
 	defer providerUtils.ReleaseStreamingResponse(resp)

 	stopCancellation := providerUtils.SetupStreamCancellation(ctx, resp.BodyStream(), provider.logger)
 	defer stopCancellation()
-	defer providerUtils.EnsureStreamFinalizerCalled(ctx, postHookSpanFinalizer)

 	sseReader := providerUtils.GetSSEEventReader(ctx, reader)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/providers/mistral/mistral.go` around lines 584 - 606, The stream finalizer defer is registered last so it runs first; move the call to providerUtils.EnsureStreamFinalizerCalled(ctx, postHookSpanFinalizer) to be the very first defer inside the goroutine (immediately after "go func() {") so it is the first defer declared and thus executed last, ensuring postHookSpanFinalizer is finalized after providerUtils.HandleStreamCancellation and providerUtils.HandleStreamTimeout; keep all other defers (ReleaseStreamingResponse, DecompressStreamBody's releaseGzip, stopIdleTimeout, stopCancellation, and the closure that calls HandleStreamCancellation/HandleStreamTimeout and close(responseChan)) in the same order below it.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@core/providers/azure/azure.go`:
- Around line 1006-1008: In the SpeechStream path replace the unary HTTP client
call with the streaming client: change usages of provider.client.Do(req, resp)
inside the SpeechStream (and any SSE/EventStream/chunked response handlers in
azure.go) to provider.streamingClient.Do(req, resp) so the request uses the
streaming client variant (provider.streamingClient) which respects long-lived
chunked/event streams and avoids fasthttp whole-response timeouts; ensure the
response handler and error handling remain compatible with streamingClient.Do
and that any other direct Do calls in *Stream methods are switched similarly.
In `@core/providers/mistral/mistral.go`:
- Around line 584-606: The stream finalizer defer is registered last so it runs
first; move the call to providerUtils.EnsureStreamFinalizerCalled(ctx,
postHookSpanFinalizer) to be the very first defer inside the goroutine
(immediately after "go func() {") so it is the first defer declared and thus
executed last, ensuring postHookSpanFinalizer is finalized after
providerUtils.HandleStreamCancellation and providerUtils.HandleStreamTimeout;
keep all other defers (ReleaseStreamingResponse, DecompressStreamBody's
releaseGzip, stopIdleTimeout, stopCancellation, and the closure that calls
HandleStreamCancellation/HandleStreamTimeout and close(responseChan)) in the
same order below it.
In `@core/providers/openai/openai.go`:
- Around line 6847-6856: The Passthrough and PassthroughStream handlers
currently build the upstream URL via string concatenation (using
provider.networkConfig.BaseURL + "/v1" + path and appending req.RawQuery), which
bypasses the OpenAI URL normalization used for ChatGPTOAuth; update both
handlers (functions Passthrough and PassthroughStream) to call the existing URL
helpers (buildRequestURL or buildFullURL — whichever is used elsewhere for `/v1`
normalization) passing req.Path and req.RawQuery (and
provider.networkConfig.BaseURL) so all `/v1/...` routes go through the same
normalization/remapping logic instead of manual string assembly.
In `@core/providers/utils/utils.go`:
- Around line 2138-2176: ProcessAndSendError currently ignores
postHookSpanFinalizer and never completes deferred spans; add the same
StreamEndIndicator check and deferred-span completion used in
ProcessAndSendResponse/ProcessAndSendBifrostError: after constructing
streamResponse (and before sending on responseChan) inspect the
StreamEndIndicator on any populated response field
(BifrostResponsesStreamResponse, BifrostChatResponse,
BifrostTextCompletionResponse, BifrostSpeechStreamResponse,
BifrostTranscriptionStreamResponse) and if true and postHookSpanFinalizer != nil
call postHookSpanFinalizer(ctx) (this is the completeDeferredSpan call),
preserving the existing HandleStreamControlSkip behavior and ctx.Done() send
logic.
In `@core/providers/xai/xai.go`:
- Around line 108-125: TextCompletionStream is calling
HandleOpenAITextCompletionStreaming with a nil explicit headers map so the
streaming request lacks the xAI Authorization header; populate the explicit
headers by merging provider.networkConfig.ExtraHeaders with an "Authorization" :
"Bearer "+key.Value (same approach used in chat/response paths) before calling
HandleOpenAITextCompletionStreaming, keeping the rest of the parameters
(provider.GetProviderKey, ParseXAIError, postHookRunner, etc.) unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ad3d0a0e-e5fe-4b1e-85ad-586d572d4f83
📒 Files selected for processing (34)
- AGENTS.md
- core/bifrost.go
- core/pluginpipelinerace_test.go
- core/providers/anthropic/anthropic.go
- core/providers/azure/azure.go
- core/providers/bedrock/bedrock.go
- core/providers/bedrock/transport_test.go
- core/providers/cerebras/cerebras.go
- core/providers/cohere/cohere.go
- core/providers/elevenlabs/elevenlabs.go
- core/providers/fireworks/fireworks.go
- core/providers/fireworks/fireworks_test.go
- core/providers/gemini/gemini.go
- core/providers/groq/groq.go
- core/providers/huggingface/huggingface.go
- core/providers/mistral/custom_provider_test.go
- core/providers/mistral/mistral.go
- core/providers/mistral/transcription_test.go
- core/providers/nebius/nebius.go
- core/providers/ollama/ollama.go
- core/providers/openai/openai.go
- core/providers/openrouter/openrouter.go
- core/providers/parasail/parasail.go
- core/providers/perplexity/perplexity.go
- core/providers/replicate/replicate.go
- core/providers/runway/runway.go
- core/providers/sgl/sgl.go
- core/providers/utils/utils.go
- core/providers/utils/utils_test.go
- core/providers/vertex/vertex.go
- core/providers/vllm/vllm.go
- core/providers/xai/xai.go
- core/schemas/bifrost.go
- core/schemas/provider.go
💤 Files with no reviewable changes (1)
- core/schemas/bifrost.go
✅ Files skipped from review due to trivial changes (4)
- core/providers/utils/utils_test.go
- AGENTS.md
- core/providers/huggingface/huggingface.go
- core/providers/parasail/parasail.go
🚧 Files skipped from review as they are similar to previous changes (19)
- core/providers/fireworks/fireworks_test.go
- core/providers/mistral/custom_provider_test.go
- core/providers/mistral/transcription_test.go
- core/pluginpipelinerace_test.go
- core/bifrost.go
- core/providers/groq/groq.go
- core/providers/runway/runway.go
- core/schemas/provider.go
- core/providers/fireworks/fireworks.go
- core/providers/gemini/gemini.go
- core/providers/openrouter/openrouter.go
- core/providers/sgl/sgl.go
- core/providers/nebius/nebius.go
- core/providers/ollama/ollama.go
- core/providers/cohere/cohere.go
- core/providers/elevenlabs/elevenlabs.go
- core/providers/vllm/vllm.go
- core/providers/perplexity/perplexity.go
- core/providers/bedrock/bedrock.go
Merge activity
The base branch was changed.
## Summary

Fixes a production panic (`fatal error: concurrent map read and map write`) in `PluginPipeline` during streaming requests. Per-chunk writers (`accumulatePluginTiming`) run in the provider goroutine while the end-of-stream finalizer (`FinalizeStreamingPostHookSpans`) and pool-release path (`resetPluginPipeline`) can run concurrently on a different goroutine, causing unsynchronised access to the shared `postHookTimings` map and related fields.

Additionally fixes a related bug where the streaming plugin pipeline and its `postHookRunner`/finalizer were allocated once outside the retry loop, meaning a retry triggered by `CheckFirstStreamChunkForError` could run against a pipeline already returned to the pool by the previous attempt's deferred finalizer.

## Changes

- Added `streamingMu sync.Mutex` to `PluginPipeline` to guard all streaming accumulator fields (`postHookTimings`, `postHookPluginOrder`, `chunkCount`, `logger`, `tracer`).
- `accumulatePluginTiming`, `FinalizeStreamingPostHookSpans`, `GetChunkCount`, and `resetPluginPipeline` now acquire `streamingMu` before accessing those fields.
- `FinalizeStreamingPostHookSpans` snapshots accumulator state under the lock and performs tracer span I/O unlocked, avoiding stalling chunk writers on span operations.
- Moved streaming pipeline allocation, `postHookRunner` construction, and `finalizerOnce`/`postHookSpanFinalizer` setup inside the `executeRequestWithRetries` handler closure so each retry attempt gets a fresh, independent pipeline. Pipelines from failed attempts are released immediately when stream setup fails before a provider goroutine starts.
- `resetPluginPipeline` now nils `llmPlugins`, `mcpPlugins`, `logger`, and `tracer` under the mutex for GC hygiene when plugins are hot-swapped, and calls `clear()` on `postHookPluginOrder` before truncating to avoid retaining live string references in the backing array.



- Added `pluginpipelinerace_test.go` with a `-race`-detector test (`TestPluginPipelineStreamingRace`) that concurrently hammers `accumulatePluginTiming`, `FinalizeStreamingPostHookSpans`, `resetPluginPipeline`, and `GetChunkCount` to reproduce and verify the fix for the original panic.
- Updated `AGENTS.md` to document the Go filename convention (no underscores except `_test.go`).

## Type of change

- [x] Bug fix

## Affected areas

- [x] Core (Go)
- [x] Plugins

## How to test

```sh
# Verify the race condition is resolved
go test -race -run TestPluginPipelineStreamingRace ./core/...

# Full test suite
go test ./...
```

## Screenshots/Recordings

N/A

## Breaking changes

- [x] No

## Related issues

N/A

## Security considerations

None. Changes are limited to internal synchronisation of in-process pipeline state.

## Checklist

- [ ] I read `docs/contributing/README.md` and followed the guidelines
- [x] I added/updated tests where appropriate
- [ ] I updated documentation where needed
- [x] I verified builds succeed (Go and UI)
- [ ] I verified the CI pipeline passes locally if applicable

Summary
Fixes a production panic (
fatal error: concurrent map read and map write) inPluginPipelineduring streaming requests. Per-chunk writers (accumulatePluginTiming) run in the provider goroutine while the end-of-stream finalizer (FinalizeStreamingPostHookSpans) and pool-release path (resetPluginPipeline) can run concurrently on a different goroutine, causing unsynchronised access to the sharedpostHookTimingsmap and related fields.Additionally fixes a related bug where the streaming plugin pipeline and its
postHookRunner/finalizer were allocated once outside the retry loop, meaning a retry triggered byCheckFirstStreamChunkForErrorcould run against a pipeline already returned to the pool by the previous attempt's deferred finalizer.Changes
streamingMu sync.MutextoPluginPipelineto guard all streaming accumulator fields (postHookTimings,postHookPluginOrder,chunkCount,logger,tracer).accumulatePluginTiming,FinalizeStreamingPostHookSpans,GetChunkCount, andresetPluginPipelinenow acquirestreamingMubefore accessing those fields.FinalizeStreamingPostHookSpanssnapshots accumulator state under the lock and performs tracer span I/O unlocked, avoiding stalling chunk writers on span operations.postHookRunnerconstruction, andfinalizerOnce/postHookSpanFinalizersetup inside theexecuteRequestWithRetrieshandler closure so each retry attempt gets a fresh, independent pipeline. Pipelines from failed attempts are released immediately when stream setup fails before a provider goroutine starts.resetPluginPipelinenow nilsllmPlugins,mcpPlugins,logger, andtracerunder the mutex for GC hygiene when plugins are hot-swapped, and callsclear()onpostHookPluginOrderbefore truncating to avoid retaining live string references in the backing array.pluginpipelinerace_test.gowith a-race-detector test (TestPluginPipelineStreamingRace) that concurrently hammersaccumulatePluginTiming,FinalizeStreamingPostHookSpans,resetPluginPipeline, andGetChunkCountto reproduce and verify the fix for the original panic.AGENTS.mdto document the Go filename convention (no underscores except_test.go).Type of change
Affected areas
How to test
Screenshots/Recordings
N/A
Breaking changes
Related issues
N/A
Security considerations
None. Changes are limited to internal synchronisation of in-process pipeline state.
Checklist
docs/contributing/README.mdand followed the guidelines