diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml index ec3ffbddca..e48994c743 100644 --- a/.github/workflows/e2e-tests.yml +++ b/.github/workflows/e2e-tests.yml @@ -32,7 +32,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Node.js uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0 diff --git a/.github/workflows/pr-tests.yml b/.github/workflows/pr-tests.yml index 3fbdaa4203..adb43b95f8 100644 --- a/.github/workflows/pr-tests.yml +++ b/.github/workflows/pr-tests.yml @@ -77,7 +77,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Node.js uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0 diff --git a/.github/workflows/release-cli.yml b/.github/workflows/release-cli.yml index 327b06d99c..b843195885 100644 --- a/.github/workflows/release-cli.yml +++ b/.github/workflows/release-cli.yml @@ -67,7 +67,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Run CLI tests working-directory: cli @@ -97,7 +97,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Configure Git run: | diff --git a/.github/workflows/release-pipeline.yml b/.github/workflows/release-pipeline.yml index 41280f6678..987d5294d9 100644 --- a/.github/workflows/release-pipeline.yml +++ b/.github/workflows/release-pipeline.yml @@ -137,7 +137,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Node.js uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0 @@ -254,7 +254,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Docker Compose run: | @@ -356,7 +356,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Docker Compose run: | @@ -452,7 +452,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Node.js uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0 @@ -519,7 +519,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Node.js uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0 @@ -602,7 +602,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Node.js uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0 @@ -693,7 +693,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Node.js uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0 @@ -802,7 +802,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Node.js uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0 @@ -897,7 +897,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Node.js uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0 @@ -995,7 +995,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Configure Git run: | @@ -1114,7 +1114,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Node.js uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0 @@ -1231,7 +1231,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Node.js uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0 @@ -1295,7 +1295,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Node.js uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0 @@ -1364,7 +1364,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Set up Node.js uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0 diff --git a/.github/workflows/snyk.yml b/.github/workflows/snyk.yml index 3a0cf24fd2..cfb3fe7b82 100644 --- a/.github/workflows/snyk.yml +++ b/.github/workflows/snyk.yml @@ -61,7 +61,7 @@ jobs: - name: Setup Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Setup Go workspace run: make setup-workspace @@ -134,7 +134,7 @@ jobs: - name: Setup Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.26.2" + go-version: "1.26.1" - name: Setup Go workspace run: make setup-workspace diff --git a/README.md b/README.md index 9e843c58fc..f957c58ee1 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,6 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/maximhq/bifrost/core)](https://goreportcard.com/report/github.com/maximhq/bifrost/core) [![Discord badge](https://dcbadge.limes.pink/api/server/https://discord.gg/exN5KAydbU?style=flat)](https://discord.gg/exN5KAydbU) -[![Known Vulnerabilities](https://snyk.io/test/github/maximhq/bifrost/badge.svg)](https://snyk.io/test/github/maximhq/bifrost) [![codecov](https://codecov.io/gh/maximhq/bifrost/branch/main/graph/badge.svg)](https://codecov.io/gh/maximhq/bifrost) ![Docker Pulls](https://img.shields.io/docker/pulls/maximhq/bifrost) [Run In Postman](https://app.getpostman.com/run-collection/31642484-2ba0e658-4dcd-49f4-845a-0c7ed745b916?action=collection%2Ffork&source=rip_markdown&collection-url=entityId%3D31642484-2ba0e658-4dcd-49f4-845a-0c7ed745b916%26entityType%3Dcollection%26workspaceId%3D63e853c8-9aec-477f-909c-7f02f543150e) diff --git a/cli/go.mod b/cli/go.mod index c887aac505..f7260a3b86 100644 --- a/cli/go.mod +++ b/cli/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/cli -go 1.26.2 +go 1.26.1 require ( github.com/bytedance/sonic v1.15.0 diff --git a/core/bifrost.go b/core/bifrost.go index 17543d4cbe..a386c7393a 100644 --- a/core/bifrost.go +++ b/core/bifrost.go @@ -4689,8 +4689,16 @@ func (bifrost *Bifrost) tryStreamRequest(ctx *schemas.BifrostContext, req *schem // shared processedResponse or processedError objects. streamResponse := providerUtils.BuildClientStreamChunk(ctx, processedResponse, processedError) - // Send the processed message to the output stream - outputStream <- streamResponse + // Guarded send: if the consumer abandons outputStream (client + // disconnect, ctx cancel), drain the upstream shortCircuit.Stream + // so its producer can exit cleanly instead of blocking on its send. + select { + case outputStream <- streamResponse: + case <-ctx.Done(): + for range shortCircuit.Stream { + } + return + } //TODO: Release the processed response immediately after use } @@ -4952,7 +4960,7 @@ func executeRequestWithRetries[T any]( // the SSE stream instead of returning proper HTTP error status codes. if bifrostError == nil { if streamChan, ok := any(result).(chan *schemas.BifrostStreamChunk); ok { - checkedStream, drainDone, firstChunkErr := providerUtils.CheckFirstStreamChunkForError(streamChan) + checkedStream, drainDone, firstChunkErr := providerUtils.CheckFirstStreamChunkForError(ctx, streamChan) if firstChunkErr != nil { <-drainDone bifrostError = firstChunkErr @@ -5216,12 +5224,16 @@ func (bifrost *Bifrost) requestWorker(provider schemas.Provider, config *schemas } return resp, nil } - // Store a finalizer callback to create aggregated post-hook spans at stream end - // This closure captures the pipeline reference and releases it after finalization + // Store a finalizer callback to create aggregated post-hook spans at stream end. + // Wrapped in sync.Once so the normal end-of-stream invocation and a deferred + // safety-net invocation (e.g. from a provider goroutine's panic path) cannot + // double-release the pipeline. + var finalizerOnce sync.Once postHookSpanFinalizer := func(ctx context.Context) { - pipeline.FinalizeStreamingPostHookSpans(ctx) - // Release the pipeline AFTER finalizing spans (not before streaming completes) - bifrost.releasePluginPipeline(pipeline) + finalizerOnce.Do(func() { + pipeline.FinalizeStreamingPostHookSpans(ctx) + bifrost.releasePluginPipeline(pipeline) + }) } req.Context.SetValue(schemas.BifrostContextKeyPostHookSpanFinalizer, postHookSpanFinalizer) } diff --git a/core/go.mod b/core/go.mod index 924e204ea2..b85c403ec6 100644 --- a/core/go.mod +++ b/core/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/core -go 1.26.2 +go 1.26.1 require ( cloud.google.com/go v0.123.0 diff --git a/core/providers/anthropic/anthropic.go b/core/providers/anthropic/anthropic.go index ca23445d82..e012f3a13f 100644 --- a/core/providers/anthropic/anthropic.go +++ b/core/providers/anthropic/anthropic.go @@ -701,6 +701,7 @@ func HandleAnthropicChatCompletionStreaming( // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { model := "unknown" if meta != nil { @@ -1187,6 +1188,7 @@ func HandleAnthropicResponsesStream( // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { model := "" if meta != nil { @@ -1227,6 +1229,7 @@ func HandleAnthropicResponsesStream( stopCancellation := providerUtils.SetupStreamCancellation(ctx, resp.BodyStream(), logger) defer stopCancellation() + sseReader := providerUtils.GetSSEEventReader(ctx, reader) chunkIndex := 0 @@ -2786,6 +2789,7 @@ func (provider *AnthropicProvider) PassthroughStream( ch := make(chan *schemas.BifrostStreamChunk, schemas.DefaultStreamBufferSize) go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, ch, provider.GetProviderKey(), req.Model, schemas.PassthroughStreamRequest, provider.logger) @@ -2798,6 +2802,7 @@ func (provider *AnthropicProvider) PassthroughStream( defer stopIdleTimeout() defer stopCancellation() + buf := make([]byte, 4096) for { n, readErr := bodyStream.Read(buf) diff --git a/core/providers/anthropic/chat.go b/core/providers/anthropic/chat.go index 125a6a7ee1..a72ffbe024 100644 --- a/core/providers/anthropic/chat.go +++ b/core/providers/anthropic/chat.go @@ -410,8 +410,16 @@ func ToAnthropicChatRequest(ctx *schemas.BifrostContext, bifrostReq *schemas.Bif // (3) Custom tool (tool.Custom != nil) — not currently forwarded // to Anthropic; skipped. if bifrostReq.Params.Tools != nil { - tools := make([]AnthropicTool, 0, len(bifrostReq.Params.Tools)) - for _, tool := range bifrostReq.Params.Tools { + // Strip server tools the target provider doesn't support per + // ProviderFeatures (e.g. web_search on Vertex's non-supporting + // model variants, or MCP on Bedrock when this converter is used + // by non-Bedrock providers). Function/custom tools are always + // kept. The dropped set is discarded — "silent strip + continue" + // policy per user direction. See Bedrock's convertToolConfig for + // the direct-Bedrock-path equivalent. + filtered, _ := ValidateChatToolsForProvider(bifrostReq.Params.Tools, bifrostReq.Provider) + tools := make([]AnthropicTool, 0, len(filtered)) + for _, tool := range filtered { if tool.Function != nil { tools = append(tools, convertFunctionToolToAnthropic(tool)) continue diff --git a/core/providers/anthropic/utils.go b/core/providers/anthropic/utils.go index 62e6a311b5..dbde522d6f 100644 --- a/core/providers/anthropic/utils.go +++ b/core/providers/anthropic/utils.go @@ -14,6 +14,77 @@ import ( "github.com/maximhq/bifrost/core/schemas" ) +// anthropicToolTypePrefixToFeature maps Anthropic server-tool type prefixes +// to the corresponding ProviderFeatureSupport flag. Mirrors the structure of +// betaHeaderPrefixToFeature (defined later in this file) so tool-type gating +// and beta-header gating share the same shape. +// +// Prefix-based so future version bumps (e.g. web_search_20261231) flow +// through without a code change. Exact-match types (currently just +// "mcp_toolset") are handled separately. +var anthropicToolTypePrefixToFeature = map[string]func(ProviderFeatureSupport) bool{ + "web_search_": func(f ProviderFeatureSupport) bool { return f.WebSearch }, + "web_fetch_": func(f ProviderFeatureSupport) bool { return f.WebFetch }, + "code_execution_": func(f ProviderFeatureSupport) bool { return f.CodeExecution }, + "computer_": func(f ProviderFeatureSupport) bool { return f.ComputerUse }, + "bash_": func(f ProviderFeatureSupport) bool { return f.Bash }, + "memory_": func(f ProviderFeatureSupport) bool { return f.Memory }, + "text_editor_": func(f ProviderFeatureSupport) bool { return f.TextEditor }, + "tool_search_tool_": func(f ProviderFeatureSupport) bool { return f.ToolSearch }, +} + +// isAnthropicServerToolSupported returns whether the given Anthropic server-tool +// type string is supported by the provider's ProviderFeatureSupport. Unknown +// types return true (forward-compat: let the provider reject if truly invalid +// rather than Bifrost dropping a tool Anthropic has just added). +func isAnthropicServerToolSupported(toolType string, features ProviderFeatureSupport) bool { + // Exact-match types first. + if toolType == "mcp_toolset" { + return features.MCP + } + // Prefix match for versioned types. + for prefix, check := range anthropicToolTypePrefixToFeature { + if strings.HasPrefix(toolType, prefix) { + return check(features) + } + } + return true +} + +// ValidateChatToolsForProvider is the chat-path mirror of +// ValidateToolsForProvider. It partitions []schemas.ChatTool into a keep-set +// (function/custom tools + server tools supported on the target provider) +// and a dropped-set (server-tool Type strings the provider doesn't support +// per ProviderFeatures). +// +// Does NOT mutate its input. Callers decide the policy (silent strip vs +// fail-fast). The Bedrock ChatCompletion path uses silent strip so the +// request still reaches the provider without the unsupported tool; the model +// responds with a prose completion instead of tool use. +// +// Unknown providers keep all tools (safe default for custom providers), +// matching ValidateToolsForProvider. +func ValidateChatToolsForProvider(tools []schemas.ChatTool, provider schemas.ModelProvider) (keep []schemas.ChatTool, dropped []string) { + features, ok := ProviderFeatures[provider] + if !ok { + return tools, nil + } + for _, tool := range tools { + // Function/custom tools are universal — always keep. + if tool.Function != nil || tool.Custom != nil { + keep = append(keep, tool) + continue + } + t := string(tool.Type) + if isAnthropicServerToolSupported(t, features) { + keep = append(keep, tool) + } else { + dropped = append(dropped, t) + } + } + return keep, dropped +} + // ValidateToolsForProvider checks if all tools in the request are supported by the given provider. // Returns an error for the first unsupported tool found. func ValidateToolsForProvider(tools []schemas.ResponsesTool, provider schemas.ModelProvider) error { diff --git a/core/providers/anthropic/validate_chat_tools_test.go b/core/providers/anthropic/validate_chat_tools_test.go new file mode 100644 index 0000000000..d9f0c8a2df --- /dev/null +++ b/core/providers/anthropic/validate_chat_tools_test.go @@ -0,0 +1,138 @@ +package anthropic + +import ( + "testing" + + "github.com/maximhq/bifrost/core/schemas" +) + +// TestValidateChatToolsForProvider locks in the partition: +// function/custom tools always survive; server tools survive only when the +// target provider's ProviderFeatures flag is true for that tool type. +func TestValidateChatToolsForProvider(t *testing.T) { + fnTool := schemas.ChatTool{ + Type: schemas.ChatToolTypeFunction, + Function: &schemas.ChatToolFunction{Name: "get_weather"}, + } + serverTool := func(tpe, name string) schemas.ChatTool { + return schemas.ChatTool{Type: schemas.ChatToolType(tpe), Name: name} + } + + cases := []struct { + name string + provider schemas.ModelProvider + input []schemas.ChatTool + wantKeep int + wantDropped []string + assertNotes string + }{ + { + name: "function tools always survive on any provider", + provider: schemas.Bedrock, + input: []schemas.ChatTool{fnTool, fnTool}, + wantKeep: 2, + }, + { + name: "bedrock drops web_search", + provider: schemas.Bedrock, + input: []schemas.ChatTool{serverTool("web_search_20260209", "web_search")}, + wantKeep: 0, + wantDropped: []string{"web_search_20260209"}, + assertNotes: "Bedrock has WebSearch=false per Table 20 (AWS user guide beta-header list + Anthropic overview)", + }, + { + name: "bedrock drops web_fetch + code_execution + mcp_toolset", + provider: schemas.Bedrock, + input: []schemas.ChatTool{ + serverTool("web_fetch_20260309", "web_fetch"), + serverTool("code_execution_20250825", "code_execution"), + serverTool("mcp_toolset", "notion"), + }, + wantKeep: 0, + wantDropped: []string{"web_fetch_20260309", "code_execution_20250825", "mcp_toolset"}, + }, + { + name: "bedrock keeps computer/bash/memory/text_editor/tool_search", + provider: schemas.Bedrock, + input: []schemas.ChatTool{ + serverTool("computer_20251124", "computer"), + serverTool("bash_20250124", "bash"), + serverTool("memory_20250818", "memory"), + serverTool("text_editor_20250728", "str_replace_based_edit_tool"), + serverTool("tool_search_tool_bm25", "tool_search_tool_bm25"), + }, + wantKeep: 5, + }, + { + name: "bedrock partial drop mixes function + server tools", + provider: schemas.Bedrock, + input: []schemas.ChatTool{ + fnTool, + serverTool("web_search_20260209", "web_search"), + serverTool("bash_20250124", "bash"), + }, + wantKeep: 2, // fnTool + bash + wantDropped: []string{"web_search_20260209"}, + }, + { + name: "vertex drops web_fetch", + provider: schemas.Vertex, + input: []schemas.ChatTool{serverTool("web_fetch_20260309", "web_fetch")}, + wantKeep: 0, + wantDropped: []string{"web_fetch_20260309"}, + assertNotes: "Vertex has WebFetch=false per Table 20", + }, + { + name: "vertex drops mcp_toolset", + provider: schemas.Vertex, + input: []schemas.ChatTool{serverTool("mcp_toolset", "notion")}, + wantKeep: 0, + wantDropped: []string{"mcp_toolset"}, + assertNotes: "Vertex has MCP=false per MCP-excl (explicit exclusion in Anthropic docs)", + }, + { + name: "anthropic keeps everything", + provider: schemas.Anthropic, + input: []schemas.ChatTool{ + serverTool("web_search_20260209", "web_search"), + serverTool("web_fetch_20260309", "web_fetch"), + serverTool("code_execution_20250825", "code_execution"), + serverTool("mcp_toolset", "x"), + serverTool("computer_20251124", "computer"), + }, + wantKeep: 5, + }, + { + name: "unknown provider keeps everything (forward-compat)", + provider: schemas.ModelProvider("custom-new-provider"), + input: []schemas.ChatTool{serverTool("web_search_20260209", "web_search")}, + wantKeep: 1, + }, + { + name: "unknown tool type on known provider is kept (forward-compat)", + provider: schemas.Bedrock, + input: []schemas.ChatTool{serverTool("future_tool_20270101", "future")}, + wantKeep: 1, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + keep, dropped := ValidateChatToolsForProvider(tc.input, tc.provider) + if len(keep) != tc.wantKeep { + t.Errorf("keep count: got %d, want %d (%s)", len(keep), tc.wantKeep, tc.assertNotes) + } + if len(dropped) != len(tc.wantDropped) { + t.Errorf("dropped count: got %v, want %v", dropped, tc.wantDropped) + } + for i, d := range tc.wantDropped { + if i >= len(dropped) { + break + } + if dropped[i] != d { + t.Errorf("dropped[%d]: got %q, want %q", i, dropped[i], d) + } + } + }) + } +} diff --git a/core/providers/azure/azure.go b/core/providers/azure/azure.go index 9649caa662..9d7c3063d7 100644 --- a/core/providers/azure/azure.go +++ b/core/providers/azure/azure.go @@ -1207,6 +1207,7 @@ func (provider *AzureProvider) SpeechStream(ctx *schemas.BifrostContext, postHoo // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, provider.GetProviderKey(), request.Model, schemas.SpeechStreamRequest, provider.logger) @@ -3298,6 +3299,7 @@ func (provider *AzureProvider) PassthroughStream( ch := make(chan *schemas.BifrostStreamChunk, schemas.DefaultStreamBufferSize) go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, ch, provider.GetProviderKey(), req.Model, schemas.PassthroughStreamRequest, provider.logger) diff --git a/core/providers/bedrock/bedrock.go b/core/providers/bedrock/bedrock.go index 3cddc86340..2a181068a2 100644 --- a/core/providers/bedrock/bedrock.go +++ b/core/providers/bedrock/bedrock.go @@ -949,6 +949,7 @@ func (provider *BedrockProvider) TextCompletionStream(ctx *schemas.BifrostContex // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.TextCompletionStreamRequest, provider.logger) @@ -1204,6 +1205,7 @@ func (provider *BedrockProvider) ChatCompletionStream(ctx *schemas.BifrostContex // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.ChatCompletionStreamRequest, provider.logger) @@ -1592,6 +1594,7 @@ func (provider *BedrockProvider) ResponsesStream(ctx *schemas.BifrostContext, po // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.ResponsesStreamRequest, provider.logger) diff --git a/core/providers/bedrock/convert_tool_config_test.go b/core/providers/bedrock/convert_tool_config_test.go new file mode 100644 index 0000000000..fc417394e5 --- /dev/null +++ b/core/providers/bedrock/convert_tool_config_test.go @@ -0,0 +1,477 @@ +package bedrock + +import ( + "context" + "encoding/json" + "strings" + "testing" + + "github.com/maximhq/bifrost/core/schemas" +) + +// TestConvertToolConfig_DropsServerToolsOnBedrock locks in the bug fix from +// the user-reported repro: sending `web_search_20260209` via the OpenAI- +// compatible /v1/chat/completions endpoint to Bedrock was producing a +// malformed ToolConfig that Bedrock rejected with 400 "The provided request +// is not valid". The fix strips unsupported server tools before the +// conversion loop so the outbound request is valid. +func TestConvertToolConfig_DropsServerToolsOnBedrock(t *testing.T) { + params := &schemas.ChatParameters{ + Tools: []schemas.ChatTool{ + { + Type: schemas.ChatToolTypeFunction, + Function: &schemas.ChatToolFunction{ + Name: "get_weather", + Description: schemas.Ptr("Get weather by city"), + Parameters: &schemas.ToolFunctionParameters{ + Type: "object", + }, + }, + }, + { + // Server tool — Bedrock doesn't support web_search per Table 20. + // Should be stripped silently. + Type: schemas.ChatToolType("web_search_20260209"), + Name: "web_search", + }, + }, + } + + cfg := convertToolConfig("global.anthropic.claude-sonnet-4-6", params) + if cfg == nil { + t.Fatalf("expected ToolConfig, got nil (function tool should have survived)") + } + if len(cfg.Tools) != 1 { + t.Fatalf("expected exactly 1 tool (function), got %d: %+v", len(cfg.Tools), cfg.Tools) + } + if cfg.Tools[0].ToolSpec == nil || cfg.Tools[0].ToolSpec.Name != "get_weather" { + t.Errorf("expected function tool 'get_weather' to survive, got %+v", cfg.Tools[0]) + } +} + +// TestConvertToolConfig_ReturnsNilWhenAllDropped locks in the empty-slice +// guard. Bedrock's Converse API rejects `"toolConfig": {"tools": []}` with a +// 400; when every tool is unsupported and gets stripped, convertToolConfig +// must return nil so no ToolConfig ships at all. +func TestConvertToolConfig_ReturnsNilWhenAllDropped(t *testing.T) { + params := &schemas.ChatParameters{ + Tools: []schemas.ChatTool{ + { + Type: schemas.ChatToolType("web_search_20260209"), + Name: "web_search", + }, + { + Type: schemas.ChatToolType("web_fetch_20260309"), + Name: "web_fetch", + }, + { + Type: schemas.ChatToolType("code_execution_20250825"), + Name: "code_execution", + }, + }, + } + + cfg := convertToolConfig("global.anthropic.claude-sonnet-4-6", params) + if cfg != nil { + t.Fatalf("expected nil ToolConfig (all tools unsupported on Bedrock), got %+v", cfg) + } +} + +// TestConvertToolConfig_KeepsBedrockSupportedServerTools — locks in that +// Bedrock-supported server tools (bash, memory, text_editor, computer, +// tool_search) do NOT appear in Converse's typed toolConfig.tools slot — +// they must be tunneled via additionalModelRequestFields (exercised in +// TestCollectBedrockServerTools_*). If the only tool is a server tool, +// toolConfig is nil so we don't ship {"toolConfig": {"tools": []}}. +func TestConvertToolConfig_KeepsBedrockSupportedServerTools(t *testing.T) { + params := &schemas.ChatParameters{ + Tools: []schemas.ChatTool{ + { + Type: schemas.ChatToolType("bash_20250124"), + Name: "bash", + }, + }, + } + + cfg := convertToolConfig("global.anthropic.claude-sonnet-4-6", params) + if cfg != nil { + t.Fatalf("expected nil toolConfig (server tools flow via additionalModelRequestFields, not toolSpec), got %+v", cfg) + } +} + +// TestCollectBedrockServerTools_BashOnly — bash is Bedrock-supported per the +// B-header list; the helper must emit it as a native-JSON tool entry with no +// derived beta header (bash has no high-confidence 1:1 beta-header mapping; +// callers rely on extra-headers for that). +func TestCollectBedrockServerTools_BashOnly(t *testing.T) { + params := &schemas.ChatParameters{ + Tools: []schemas.ChatTool{ + { + Type: schemas.ChatToolType("bash_20250124"), + Name: "bash", + }, + }, + } + tools, betas := collectBedrockServerTools(params) + if len(tools) != 1 { + t.Fatalf("expected 1 server tool, got %d", len(tools)) + } + got := string(tools[0]) + if !strings.Contains(got, `"type":"bash_20250124"`) || !strings.Contains(got, `"name":"bash"`) { + t.Errorf("expected native Anthropic bash shape, got %s", got) + } + if len(betas) != 0 { + t.Errorf("expected no derived beta headers for bash (no 1:1 mapping), got %v", betas) + } +} + +// TestCollectBedrockServerTools_ComputerDerivesBeta — computer_YYYYMMDD must +// derive computer-use-YYYY-MM-DD as the beta header, gated through +// FilterBetaHeadersForProvider(Bedrock) which keeps computer-use-* headers. +func TestCollectBedrockServerTools_ComputerDerivesBeta(t *testing.T) { + params := &schemas.ChatParameters{ + Tools: []schemas.ChatTool{ + { + Type: schemas.ChatToolType("computer_20251124"), + Name: "computer", + DisplayWidthPx: schemas.Ptr(1280), + DisplayHeightPx: schemas.Ptr(800), + }, + }, + } + tools, betas := collectBedrockServerTools(params) + if len(tools) != 1 { + t.Fatalf("expected 1 server tool, got %d", len(tools)) + } + if !strings.Contains(string(tools[0]), `"display_width_px":1280`) { + t.Errorf("expected computer variant fields to flow through, got %s", string(tools[0])) + } + if len(betas) != 1 || betas[0] != "computer-use-2025-11-24" { + t.Errorf("expected [computer-use-2025-11-24], got %v", betas) + } +} + +// TestCollectBedrockServerTools_MemoryDerivesContextManagement — memory +// activates via the context-management-2025-06-27 bundle on Bedrock (cite: +// anthropic/types.go:179). +func TestCollectBedrockServerTools_MemoryDerivesContextManagement(t *testing.T) { + params := &schemas.ChatParameters{ + Tools: []schemas.ChatTool{ + { + Type: schemas.ChatToolType("memory_20250818"), + Name: "memory", + }, + }, + } + _, betas := collectBedrockServerTools(params) + if len(betas) != 1 || betas[0] != "context-management-2025-06-27" { + t.Errorf("expected [context-management-2025-06-27], got %v", betas) + } +} + +// TestCollectBedrockServerTools_StripsUnsupported — web_search isn't in +// Bedrock's ProviderFeatures (WebSearch=false), so ValidateChatToolsForProvider +// drops it and the helper must emit nothing. +func TestCollectBedrockServerTools_StripsUnsupported(t *testing.T) { + params := &schemas.ChatParameters{ + Tools: []schemas.ChatTool{ + { + Type: schemas.ChatToolType("web_search_20260209"), + Name: "web_search", + }, + }, + } + tools, betas := collectBedrockServerTools(params) + if len(tools) != 0 { + t.Errorf("expected no server tools (web_search unsupported on Bedrock), got %d", len(tools)) + } + if len(betas) != 0 { + t.Errorf("expected no betas when all tools filtered, got %v", betas) + } +} + +// TestCollectBedrockServerTools_FunctionToolsIgnored — function/custom tools +// go through convertToolConfig, not this helper. +func TestCollectBedrockServerTools_FunctionToolsIgnored(t *testing.T) { + params := &schemas.ChatParameters{ + Tools: []schemas.ChatTool{ + { + Type: schemas.ChatToolTypeFunction, + Function: &schemas.ChatToolFunction{ + Name: "get_weather", + Parameters: &schemas.ToolFunctionParameters{ + Type: "object", + }, + }, + }, + }, + } + tools, betas := collectBedrockServerTools(params) + if len(tools) != 0 || len(betas) != 0 { + t.Errorf("function tools should not flow through server-tool helper, got tools=%d betas=%v", len(tools), betas) + } +} + +// TestBuildBedrockServerToolChoice_PinnedServerTool — caller pins a kept +// server tool (computer) by name. Converse's typed toolConfig.toolChoice path +// can't carry this because toolConfig.tools doesn't include server tools; the +// existing reconciliation silently drops the pin. The tunneled path must +// emit {"type":"tool","name":"computer"} into additionalModelRequestFields. +func TestBuildBedrockServerToolChoice_PinnedServerTool(t *testing.T) { + computer := schemas.ChatTool{ + Type: schemas.ChatToolType("computer_20251124"), + Name: "computer", + DisplayWidthPx: schemas.Ptr(1280), + } + params := &schemas.ChatParameters{ + Tools: []schemas.ChatTool{computer}, + ToolChoice: &schemas.ChatToolChoice{ + ChatToolChoiceStruct: &schemas.ChatToolChoiceStruct{ + Type: schemas.ChatToolChoiceTypeFunction, + Function: &schemas.ChatToolChoiceFunction{Name: "computer"}, + }, + }, + } + choice, ok := buildBedrockServerToolChoice(params, []schemas.ChatTool{computer}) + if !ok { + t.Fatalf("expected tunneled tool_choice for pinned server tool, got (nil, false)") + } + got := string(choice) + if !strings.Contains(got, `"type":"tool"`) || !strings.Contains(got, `"name":"computer"`) { + t.Errorf("expected Anthropic-native {type:tool,name:computer}, got %s", got) + } +} + +// TestBuildBedrockServerToolChoice_PinnedFunctionTool_NotTunneled — function +// tool pins stay on Converse's typed path (toolConfig.toolChoice.tool). The +// helper must not double-emit. +func TestBuildBedrockServerToolChoice_PinnedFunctionTool_NotTunneled(t *testing.T) { + fn := schemas.ChatTool{ + Type: schemas.ChatToolTypeFunction, + Function: &schemas.ChatToolFunction{ + Name: "get_weather", + Parameters: &schemas.ToolFunctionParameters{Type: "object"}, + }, + } + params := &schemas.ChatParameters{ + Tools: []schemas.ChatTool{fn}, + ToolChoice: &schemas.ChatToolChoice{ + ChatToolChoiceStruct: &schemas.ChatToolChoiceStruct{ + Type: schemas.ChatToolChoiceTypeFunction, + Function: &schemas.ChatToolChoiceFunction{Name: "get_weather"}, + }, + }, + } + if _, ok := buildBedrockServerToolChoice(params, []schemas.ChatTool{fn}); ok { + t.Errorf("expected no tunneling for function-tool pin (typed Converse path handles it)") + } +} + +// TestBuildBedrockServerToolChoice_AnyWithOnlyServerTools — tool_choice:any +// with only server tools: convertToolConfig returns nil (bedrockTools empty), +// so the typed any-contract is lost. The tunneled path must emit +// {"type":"any"} to preserve the forcing semantics. +func TestBuildBedrockServerToolChoice_AnyWithOnlyServerTools(t *testing.T) { + bash := schemas.ChatTool{ + Type: schemas.ChatToolType("bash_20250124"), + Name: "bash", + } + anyStr := string(schemas.ChatToolChoiceTypeAny) + params := &schemas.ChatParameters{ + Tools: []schemas.ChatTool{bash}, + ToolChoice: &schemas.ChatToolChoice{ + ChatToolChoiceStr: &anyStr, + }, + } + choice, ok := buildBedrockServerToolChoice(params, []schemas.ChatTool{bash}) + if !ok { + t.Fatalf("expected tunneled any-contract when only server tools are present, got (nil, false)") + } + got := string(choice) + if !strings.Contains(got, `"type":"any"`) { + t.Errorf("expected {type:any}, got %s", got) + } +} + +// TestBuildBedrockServerToolChoice_AnyWithFunctionTool_NotTunneled — when at +// least one function/custom tool is present, Converse's typed +// toolConfig.toolChoice.any carries the any-contract. Don't double-emit. +func TestBuildBedrockServerToolChoice_AnyWithFunctionTool_NotTunneled(t *testing.T) { + fn := schemas.ChatTool{ + Type: schemas.ChatToolTypeFunction, + Function: &schemas.ChatToolFunction{ + Name: "get_weather", + Parameters: &schemas.ToolFunctionParameters{Type: "object"}, + }, + } + bash := schemas.ChatTool{ + Type: schemas.ChatToolType("bash_20250124"), + Name: "bash", + } + anyStr := string(schemas.ChatToolChoiceTypeAny) + params := &schemas.ChatParameters{ + Tools: []schemas.ChatTool{fn, bash}, + ToolChoice: &schemas.ChatToolChoice{ + ChatToolChoiceStr: &anyStr, + }, + } + if _, ok := buildBedrockServerToolChoice(params, []schemas.ChatTool{fn, bash}); ok { + t.Errorf("expected no tunneling when function/custom tool is present (typed Converse path handles any)") + } +} + +// TestBuildBedrockServerToolChoice_UnsupportedServerToolPin_NotTunneled — the +// caller pins web_search, which ValidateChatToolsForProvider strips on +// Bedrock. The pin name is absent from the filtered set; the helper must not +// fabricate a tunneled tool_choice for a tool that isn't in the request. +func TestBuildBedrockServerToolChoice_UnsupportedServerToolPin_NotTunneled(t *testing.T) { + // The caller's original request had web_search, but it's been stripped. + // We pass the filtered slice (empty for the server-tool axis) to mimic + // the convertChatParameters call path. + params := &schemas.ChatParameters{ + Tools: []schemas.ChatTool{{Type: schemas.ChatToolType("web_search_20260209"), Name: "web_search"}}, + ToolChoice: &schemas.ChatToolChoice{ + ChatToolChoiceStruct: &schemas.ChatToolChoiceStruct{ + Type: schemas.ChatToolChoiceTypeFunction, + Function: &schemas.ChatToolChoiceFunction{Name: "web_search"}, + }, + }, + } + // Filtered (post-ValidateChatToolsForProvider(Bedrock)) — web_search is dropped. + filtered := []schemas.ChatTool{} + if _, ok := buildBedrockServerToolChoice(params, filtered); ok { + t.Errorf("expected no tunneling when pinned name was stripped by provider validation") + } +} + +// TestConvertChatParameters_PinnedServerToolE2E — end-to-end verification +// that convertChatParameters composes convertToolConfig + +// collectBedrockServerTools + buildBedrockServerToolChoice such that a +// request pinning a kept server tool produces: +// - AdditionalModelRequestFields.tools containing the server tool +// - AdditionalModelRequestFields.tool_choice with Anthropic-native shape +// - ToolConfig nil (no function tools → Converse's typed path is inactive) +func TestConvertChatParameters_PinnedServerToolE2E(t *testing.T) { + bifrostReq := &schemas.BifrostChatRequest{ + Model: "global.anthropic.claude-sonnet-4-6", + Params: &schemas.ChatParameters{ + Tools: []schemas.ChatTool{ + { + Type: schemas.ChatToolType("computer_20251124"), + Name: "computer", + DisplayWidthPx: schemas.Ptr(1280), + }, + }, + ToolChoice: &schemas.ChatToolChoice{ + ChatToolChoiceStruct: &schemas.ChatToolChoiceStruct{ + Type: schemas.ChatToolChoiceTypeFunction, + Function: &schemas.ChatToolChoiceFunction{Name: "computer"}, + }, + }, + }, + } + bedrockReq := &BedrockConverseRequest{} + if err := convertChatParameters(nil, bifrostReq, bedrockReq); err != nil { + t.Fatalf("convertChatParameters failed: %v", err) + } + if bedrockReq.ToolConfig != nil { + t.Errorf("expected nil ToolConfig (no function/custom tools), got %+v", bedrockReq.ToolConfig) + } + if bedrockReq.AdditionalModelRequestFields == nil { + t.Fatalf("expected AdditionalModelRequestFields to carry server-tool payload, got nil") + } + tools, ok := bedrockReq.AdditionalModelRequestFields.Get("tools") + if !ok { + t.Errorf("expected additionalModelRequestFields.tools to be set for server tool") + } else if toolsSlice, castOK := tools.([]json.RawMessage); !castOK || len(toolsSlice) != 1 { + t.Errorf("expected 1 server tool in additionalModelRequestFields.tools, got %+v", tools) + } + choice, ok := bedrockReq.AdditionalModelRequestFields.Get("tool_choice") + if !ok { + t.Fatalf("expected additionalModelRequestFields.tool_choice to carry pinned server-tool contract") + } + choiceRaw, castOK := choice.(json.RawMessage) + if !castOK { + t.Fatalf("expected tool_choice value to be json.RawMessage, got %T", choice) + } + got := string(choiceRaw) + if !strings.Contains(got, `"type":"tool"`) || !strings.Contains(got, `"name":"computer"`) { + t.Errorf("expected {type:tool,name:computer}, got %s", got) + } +} + +// TestConvertChatParameters_ResponseFormatWithPinnedServerTool_NoConflictingChoice +// locks in the fix for the "two conflicting tool-choice directives" hazard: +// when response_format forces the synthetic bf_so_* tool via +// ToolConfig.ToolChoice, the tunneled additionalModelRequestFields.tool_choice +// (which would pin a server tool) must be suppressed so Bedrock doesn't +// receive both pins in the same Converse call. Uses a Nova model since +// Anthropic models route response_format through native output_config.format +// (no synthetic tool), so the conflict only surfaces on non-Anthropic +// Bedrock targets. +func TestConvertChatParameters_ResponseFormatWithPinnedServerTool_NoConflictingChoice(t *testing.T) { + responseFormat := any(map[string]any{ + "type": "json_schema", + "json_schema": map[string]any{ + "name": "classification", + "schema": map[string]any{ + "type": "object", + "properties": map[string]any{ + "topic": map[string]any{"type": "string"}, + }, + "required": []any{"topic"}, + }, + }, + }) + + bifrostReq := &schemas.BifrostChatRequest{ + Model: "amazon.nova-pro-v1:0", + Params: &schemas.ChatParameters{ + ResponseFormat: &responseFormat, + Tools: []schemas.ChatTool{ + { + Type: schemas.ChatToolType("bash_20250124"), + Name: "bash", + }, + }, + ToolChoice: &schemas.ChatToolChoice{ + ChatToolChoiceStruct: &schemas.ChatToolChoiceStruct{ + Type: schemas.ChatToolChoiceTypeFunction, + Function: &schemas.ChatToolChoiceFunction{Name: "bash"}, + }, + }, + }, + } + + ctx := schemas.NewBifrostContext(context.Background(), schemas.NoDeadline) + bedrockReq := &BedrockConverseRequest{} + if err := convertChatParameters(ctx, bifrostReq, bedrockReq); err != nil { + t.Fatalf("convertChatParameters failed: %v", err) + } + + // Synthetic bf_so_* tool must be injected and pinned via Converse's typed path. + if bedrockReq.ToolConfig == nil { + t.Fatalf("expected ToolConfig with synthetic bf_so_* tool, got nil") + } + if bedrockReq.ToolConfig.ToolChoice == nil || bedrockReq.ToolConfig.ToolChoice.Tool == nil { + t.Fatalf("expected ToolConfig.ToolChoice.Tool to pin synthetic structured-output tool, got %+v", bedrockReq.ToolConfig.ToolChoice) + } + if !strings.HasPrefix(bedrockReq.ToolConfig.ToolChoice.Tool.Name, "bf_so_") { + t.Errorf("expected ToolConfig.ToolChoice.Tool.Name to start with bf_so_, got %q", bedrockReq.ToolConfig.ToolChoice.Tool.Name) + } + + // Server tool must still be tunneled so the model has it available. + if bedrockReq.AdditionalModelRequestFields == nil { + t.Fatalf("expected AdditionalModelRequestFields to carry tunneled server-tool payload, got nil") + } + if _, ok := bedrockReq.AdditionalModelRequestFields.Get("tools"); !ok { + t.Errorf("expected additionalModelRequestFields.tools to still carry bash server tool") + } + + // Guarded field: tunneled tool_choice MUST be absent because response_format + // forces the synthetic tool. Two tool-choice directives in the same request + // would let Bedrock pick one and silently violate the structured-output contract. + if _, ok := bedrockReq.AdditionalModelRequestFields.Get("tool_choice"); ok { + t.Errorf("expected NO additionalModelRequestFields.tool_choice when response_format pins bf_so_* (conflict hazard)") + } +} diff --git a/core/providers/bedrock/utils.go b/core/providers/bedrock/utils.go index ef3dc08146..4eb48452a0 100644 --- a/core/providers/bedrock/utils.go +++ b/core/providers/bedrock/utils.go @@ -85,11 +85,49 @@ func convertChatParameters(ctx *schemas.BifrostContext, bifrostReq *schemas.Bifr setOutputConfigField(bedrockReq.AdditionalModelRequestFields, "format", anthropicOutputFormat) } - // Convert tool config - if toolConfig := convertToolConfig(bifrostReq.Model, bifrostReq.Params); toolConfig != nil { + // Filter provider-unsupported server tools once; both convertToolConfig and + // collectBedrockServerTools consume the same filtered set, and + // buildBedrockServerToolChoice resolves pinned names against it. + filteredTools, _ := anthropic.ValidateChatToolsForProvider(bifrostReq.Params.Tools, schemas.Bedrock) + + // Convert tool config (function/custom tools → Converse toolConfig.tools). + if toolConfig := convertToolConfigFromFiltered(bifrostReq.Model, bifrostReq.Params, filteredTools); toolConfig != nil { bedrockReq.ToolConfig = toolConfig } + // Tunnel Bedrock-supported Anthropic server tools through Converse's + // additionalModelRequestFields (model-specific passthrough) since Converse's + // typed toolSpec shape can't express server tools like bash_*, computer_*, + // memory_*, text_editor_*, tool_search_tool_*. Fields injected: + // - tools: array of server tools in Anthropic-native shape, which + // Bedrock merges into the underlying Messages request. + // - anthropic_beta: activation header(s) for the relevant server tool, in + // addition to whatever the existing anthropic-beta HTTP + // header path in bedrock.go:214/447 already forwards. + // - tool_choice: Anthropic-native pin for a kept server tool OR an + // any/required contract when only server tools are + // present. Emitted only when Converse's typed + // toolConfig.toolChoice path can't express the intent + // (see buildBedrockServerToolChoice). + if serverTools, betaHeaders := collectBedrockServerToolsFromFiltered(filteredTools); len(serverTools) > 0 { + if bedrockReq.AdditionalModelRequestFields == nil { + bedrockReq.AdditionalModelRequestFields = schemas.NewOrderedMap() + } + bedrockReq.AdditionalModelRequestFields.Set("tools", serverTools) + if len(betaHeaders) > 0 { + bedrockReq.AdditionalModelRequestFields.Set("anthropic_beta", betaHeaders) + } + // Skip the tunneled tool_choice when response_format forces the synthetic + // bf_so_* tool at lines 263-275 below; otherwise Bedrock receives two + // conflicting tool-choice directives and the structured-output contract + // can silently break. + if responseFormatTool == nil { + if choice, ok := buildBedrockServerToolChoice(bifrostReq.Params, filteredTools); ok { + bedrockReq.AdditionalModelRequestFields.Set("tool_choice", choice) + } + } + } + // Convert reasoning config if bifrostReq.Params.Reasoning != nil { if bedrockReq.AdditionalModelRequestFields == nil { @@ -1087,14 +1125,225 @@ func convertInferenceConfig(params *schemas.ChatParameters) *BedrockInferenceCon return &config } -// convertToolConfig converts Bifrost tools to Bedrock tool config +// collectBedrockServerTools partitions kept tools into the function/custom +// set (which convertToolConfig materializes into Converse's toolConfig.tools) +// and the kept-server-tool set (which cannot be expressed via Converse's +// typed toolSpec slot and must be tunneled via additionalModelRequestFields). +// +// Returns: +// - serverTools: each ChatTool serialized to its Anthropic-native JSON shape +// (e.g. `{"type":"computer_20251124","name":"computer","display_width_px":1280}`) +// ready to drop into additionalModelRequestFields.tools. Per the comment on +// ChatTool in core/schemas/chatcompletions.go:340-351, the default marshaler +// produces this shape directly — no custom codec needed. +// - betaHeaders: anthropic-beta header values derived from the server tool +// Types, filtered through FilterBetaHeadersForProvider(schemas.Bedrock) so +// only Bedrock-approved headers survive. Only high-confidence mappings are +// derived here (computer_* and memory_*); callers relying on other betas +// (e.g. text_editor-specific headers) should continue supplying them via +// extra-headers / ctx — they flow through bedrock.go's existing +// anthropic-beta HTTP header path. +// +// Unsupported server tools (e.g. web_search on Bedrock) are dropped upstream +// by ValidateChatToolsForProvider, so they never reach this helper. +func collectBedrockServerTools(params *schemas.ChatParameters) (serverTools []json.RawMessage, betaHeaders []string) { + if params == nil || len(params.Tools) == 0 { + return nil, nil + } + filtered, _ := anthropic.ValidateChatToolsForProvider(params.Tools, schemas.Bedrock) + return collectBedrockServerToolsFromFiltered(filtered) +} + +// collectBedrockServerToolsFromFiltered is the inner variant that accepts a +// pre-filtered tool set (already run through ValidateChatToolsForProvider). +// convertChatParameters filters once and passes the result to both this helper +// and convertToolConfigFromFiltered to avoid re-filtering twice per request. +func collectBedrockServerToolsFromFiltered(filtered []schemas.ChatTool) (serverTools []json.RawMessage, betaHeaders []string) { + if len(filtered) == 0 { + return nil, nil + } + seenBeta := make(map[string]struct{}) + for _, tool := range filtered { + if tool.Function != nil || tool.Custom != nil { + continue + } + bytes, err := providerUtils.MarshalSorted(tool) + if err != nil { + continue + } + serverTools = append(serverTools, json.RawMessage(bytes)) + for _, h := range deriveBedrockBetaHeadersForToolType(string(tool.Type)) { + if _, ok := seenBeta[h]; ok { + continue + } + seenBeta[h] = struct{}{} + betaHeaders = append(betaHeaders, h) + } + } + if len(betaHeaders) > 0 { + // Gate through the Bedrock-approved beta-header list. + betaHeaders = anthropic.FilterBetaHeadersForProvider(betaHeaders, schemas.Bedrock) + } + return serverTools, betaHeaders +} + +// buildBedrockServerToolChoice emits an Anthropic-native tool_choice value +// for tunneling through additionalModelRequestFields.tool_choice ONLY when +// Converse's typed toolConfig.toolChoice path cannot express the caller's +// intent: +// +// - Named pin of a kept server tool: convertToolConfig builds toolConfig.tools +// from function/custom tools only, and its reconciliation (around line +// 1274) drops any named pin that doesn't match an entry in that slice. +// Server-tool names never appear there, so a legitimate pin like +// tool_choice={type:"function", function:{name:"computer"}} gets silently +// nuked. We tunnel {"type":"tool","name":"computer"} instead so the +// forced-tool contract reaches Anthropic via Bedrock's merge. +// - any/required with only server tools: convertToolConfig returns nil +// entirely (empty-slice guard since bedrockTools is empty), so the typed +// "any" contract is lost. We tunnel {"type":"any"} to preserve it. +// +// Returns (nil, false) when the typed Converse path is adequate (auto/none, +// function-tool pin, any with function tools present, or a pin whose name +// doesn't match any kept server tool). +// +// Anthropic tool_choice shape ref: platform.claude.com/docs/en/docs/agents-and-tools/tool-use/define-tools +// ("Controlling Claude's output / Forcing tool use" — four options: +// auto, any, tool, none; forced tool shape is {"type":"tool","name":"..."}). +func buildBedrockServerToolChoice(params *schemas.ChatParameters, filtered []schemas.ChatTool) (json.RawMessage, bool) { + if params == nil || params.ToolChoice == nil { + return nil, false + } + + // Resolve effective type and optional pinned name from either the string + // or struct representation of ChatToolChoice. + var ( + choiceType schemas.ChatToolChoiceType + pinnedName string + ) + if params.ToolChoice.ChatToolChoiceStr != nil { + choiceType = schemas.ChatToolChoiceType(*params.ToolChoice.ChatToolChoiceStr) + } else if params.ToolChoice.ChatToolChoiceStruct != nil { + s := params.ToolChoice.ChatToolChoiceStruct + choiceType = s.Type + if s.Function != nil { + pinnedName = s.Function.Name + } else if s.Custom != nil { + pinnedName = s.Custom.Name + } + } else { + return nil, false + } + + // Partition kept tools: server-tool name set, plus whether any + // function/custom tool is present. + serverToolNames := make(map[string]struct{}) + hasFunctionOrCustom := false + for _, tool := range filtered { + if tool.Function != nil || tool.Custom != nil { + hasFunctionOrCustom = true + continue + } + if tool.Name != "" { + serverToolNames[tool.Name] = struct{}{} + } + } + + switch choiceType { + case schemas.ChatToolChoiceTypeFunction, schemas.ChatToolChoiceTypeCustom, + schemas.ChatToolChoiceType("tool"): + // Only tunnel when the pinned name matches a kept server tool. + // Function/custom pins stay on the typed Converse path. + if pinnedName == "" { + return nil, false + } + if _, ok := serverToolNames[pinnedName]; !ok { + return nil, false + } + bytes, err := providerUtils.MarshalSorted(map[string]any{ + "type": "tool", + "name": pinnedName, + }) + if err != nil { + return nil, false + } + return json.RawMessage(bytes), true + + case schemas.ChatToolChoiceTypeAny, schemas.ChatToolChoiceTypeRequired: + // When function/custom tools are present, Converse's typed + // toolChoice.any handles the any contract — don't double-emit. + if hasFunctionOrCustom || len(serverToolNames) == 0 { + return nil, false + } + bytes, err := providerUtils.MarshalSorted(map[string]any{"type": "any"}) + if err != nil { + return nil, false + } + return json.RawMessage(bytes), true + + default: + // auto, none, allowed_tools, empty, unknown — no tunneling. + return nil, false + } +} + +// deriveBedrockBetaHeadersForToolType maps an Anthropic server-tool Type string +// to the anthropic-beta header(s) Bedrock requires for the feature to activate. +// Only high-confidence mappings are encoded here — both are anchored in +// core/providers/anthropic/types.go (cite: B-header comments around lines 178-183). +// Unknown prefixes return nil; callers can still inject betas via extra-headers. +func deriveBedrockBetaHeadersForToolType(toolType string) []string { + switch { + case strings.HasPrefix(toolType, "computer_"): + // computer_YYYYMMDD → computer-use-YYYY-MM-DD (Bedrock B-header). + rest := strings.TrimPrefix(toolType, "computer_") + if len(rest) == 8 { + return []string{"computer-use-" + rest[0:4] + "-" + rest[4:6] + "-" + rest[6:8]} + } + return nil + case strings.HasPrefix(toolType, "memory_"): + // Memory activates via the context-management bundle on Bedrock + // (see anthropic/types.go:179 — "context-management-2025-06-27 per + // B-header (bundles memory)"). + return []string{"context-management-2025-06-27"} + } + return nil +} + +// convertToolConfig converts Bifrost tools to Bedrock tool config. +// +// Responsibilities (split from collectBedrockServerTools): +// - Filters server tools the target provider doesn't support via +// ValidateChatToolsForProvider (e.g. web_search on Bedrock per cited +// docs — AWS user guide beta-header list, Anthropic overview feature +// table). Silently stripped. +// - Materializes function/custom tools into Converse's typed toolConfig.tools. +// Kept server tools (bash_*, computer_*, memory_*, text_editor_*, +// tool_search_tool_*) are NOT emitted here — they are handled separately +// by collectBedrockServerTools → additionalModelRequestFields.tools, since +// Converse's toolSpec slot has no shape for them. +// - Returns nil instead of an empty-slice ToolConfig, since Bedrock's +// Converse API rejects `"toolConfig": {"tools": []}` with a 400. func convertToolConfig(model string, params *schemas.ChatParameters) *BedrockToolConfig { - if len(params.Tools) == 0 { + if params == nil || len(params.Tools) == 0 { + return nil + } + // Strip unsupported server tools before the conversion loop. + filtered, _ := anthropic.ValidateChatToolsForProvider(params.Tools, schemas.Bedrock) + return convertToolConfigFromFiltered(model, params, filtered) +} + +// convertToolConfigFromFiltered is the inner variant that accepts a +// pre-filtered tool set. convertChatParameters uses this to avoid filtering +// twice (once here, once in collectBedrockServerTools). The public +// convertToolConfig entry point is a thin wrapper preserved for tests. +func convertToolConfigFromFiltered(model string, params *schemas.ChatParameters, filtered []schemas.ChatTool) *BedrockToolConfig { + if params == nil { return nil } var bedrockTools []BedrockTool - for _, tool := range params.Tools { + for _, tool := range filtered { if tool.Function != nil { // Serialize the parameters (or a default empty schema) to json.RawMessage var schemaObjectBytes []byte @@ -1138,6 +1387,15 @@ func convertToolConfig(model string, params *schemas.ChatParameters) *BedrockToo } } + // Empty-guard: Bedrock's Converse API rejects {"toolConfig": {"tools": []}} + // with a 400 "The provided request is not valid". If every incoming tool + // was filtered out above (e.g. only server tools the target provider + // doesn't support), omit ToolConfig entirely so the request is valid and + // the model simply answers without tool access. + if len(bedrockTools) == 0 { + return nil + } + toolConfig := &BedrockToolConfig{ Tools: bedrockTools, } @@ -1146,7 +1404,28 @@ func convertToolConfig(model string, params *schemas.ChatParameters) *BedrockToo if params.ToolChoice != nil { toolChoice := convertToolChoice(*params.ToolChoice) if toolChoice != nil { - toolConfig.ToolChoice = toolChoice + // Reconcile: if the choice forces a specific tool by name, + // verify that name still exists in the filtered tool set. + // Without this, a caller that pinned a server tool we just + // stripped (e.g. web_search on Bedrock) would ship a + // toolChoice.tool.name ∉ tools, and Bedrock's Converse API + // rejects that with a 400 ValidationException — defeating + // the silent-strip contract. + if toolChoice.Tool != nil && toolChoice.Tool.Name != "" { + found := false + for _, bt := range bedrockTools { + if bt.ToolSpec != nil && bt.ToolSpec.Name == toolChoice.Tool.Name { + found = true + break + } + } + if !found { + toolChoice = nil + } + } + if toolChoice != nil { + toolConfig.ToolChoice = toolChoice + } } } diff --git a/core/providers/cohere/cohere.go b/core/providers/cohere/cohere.go index 4386a55d11..fd50f6d6d5 100644 --- a/core/providers/cohere/cohere.go +++ b/core/providers/cohere/cohere.go @@ -518,6 +518,7 @@ func (provider *CohereProvider) ChatCompletionStream(ctx *schemas.BifrostContext // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.ChatCompletionStreamRequest, provider.logger) @@ -806,6 +807,7 @@ func (provider *CohereProvider) ResponsesStream(ctx *schemas.BifrostContext, pos // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.ResponsesStreamRequest, provider.logger) diff --git a/core/providers/elevenlabs/elevenlabs.go b/core/providers/elevenlabs/elevenlabs.go index 8b144b63b0..31d3a4761a 100644 --- a/core/providers/elevenlabs/elevenlabs.go +++ b/core/providers/elevenlabs/elevenlabs.go @@ -426,6 +426,7 @@ func (provider *ElevenlabsProvider) SpeechStream(ctx *schemas.BifrostContext, po // which immediately unblocks any in-progress read (including reads blocked inside a gzip decompression layer). stopCancellation := providerUtils.SetupStreamCancellation(ctx, resp.BodyStream(), provider.logger) defer stopCancellation() + defer providerUtils.EnsureStreamFinalizerCalled(ctx) // read binary audio chunks from the stream // 4KB buffer for reading chunks diff --git a/core/providers/gemini/gemini.go b/core/providers/gemini/gemini.go index 1dd0842158..f4c4a32c01 100644 --- a/core/providers/gemini/gemini.go +++ b/core/providers/gemini/gemini.go @@ -481,6 +481,7 @@ func HandleGeminiChatCompletionStream( // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, model, schemas.ChatCompletionStreamRequest, logger) @@ -1017,6 +1018,7 @@ func HandleGeminiResponsesStream( // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, model, schemas.ResponsesStreamRequest, logger) @@ -1553,6 +1555,7 @@ func (provider *GeminiProvider) SpeechStream(ctx *schemas.BifrostContext, postHo // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.SpeechStreamRequest, provider.logger) @@ -1871,6 +1874,7 @@ func (provider *GeminiProvider) TranscriptionStream(ctx *schemas.BifrostContext, // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.TranscriptionStreamRequest, provider.logger) @@ -4557,6 +4561,7 @@ func (provider *GeminiProvider) PassthroughStream( ch := make(chan *schemas.BifrostStreamChunk, schemas.DefaultStreamBufferSize) go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, ch, provider.GetProviderKey(), req.Model, schemas.PassthroughStreamRequest, provider.logger) diff --git a/core/providers/huggingface/huggingface.go b/core/providers/huggingface/huggingface.go index 1fed844d31..4039f2d36b 100644 --- a/core/providers/huggingface/huggingface.go +++ b/core/providers/huggingface/huggingface.go @@ -1160,6 +1160,7 @@ func (provider *HuggingFaceProvider) ImageGenerationStream(ctx *schemas.BifrostC // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer providerUtils.ReleaseStreamingResponse(resp) defer close(responseChan) @@ -1579,6 +1580,7 @@ func (provider *HuggingFaceProvider) ImageEditStream(ctx *schemas.BifrostContext // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer providerUtils.ReleaseStreamingResponse(resp) defer close(responseChan) diff --git a/core/providers/mistral/mistral.go b/core/providers/mistral/mistral.go index 597bf6c239..a13e10d58a 100644 --- a/core/providers/mistral/mistral.go +++ b/core/providers/mistral/mistral.go @@ -602,6 +602,7 @@ func (provider *MistralProvider) TranscriptionStream(ctx *schemas.BifrostContext // which immediately unblocks any in-progress read (including reads blocked inside a gzip decompression layer). stopCancellation := providerUtils.SetupStreamCancellation(ctx, resp.BodyStream(), provider.logger) defer stopCancellation() + defer providerUtils.EnsureStreamFinalizerCalled(ctx) sseReader := providerUtils.GetSSEEventReader(ctx, reader) chunkIndex := -1 diff --git a/core/providers/openai/openai.go b/core/providers/openai/openai.go index 41eee7b9a9..810ee583d7 100644 --- a/core/providers/openai/openai.go +++ b/core/providers/openai/openai.go @@ -531,6 +531,7 @@ func HandleOpenAITextCompletionStreaming( // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.TextCompletionStreamRequest, logger) @@ -1090,6 +1091,7 @@ func HandleOpenAIChatCompletionStreaming( // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, streamRequestType, logger) @@ -1694,6 +1696,7 @@ func HandleOpenAIResponsesStreaming( // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.ResponsesStreamRequest, logger) @@ -2315,6 +2318,7 @@ func HandleOpenAISpeechStreamRequest( // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.SpeechStreamRequest, logger) @@ -2756,6 +2760,7 @@ func HandleOpenAITranscriptionStreamRequest( // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.TranscriptionStreamRequest, logger) @@ -3204,6 +3209,7 @@ func HandleOpenAIImageGenerationStreaming( // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.ImageGenerationStreamRequest, logger) @@ -4466,6 +4472,7 @@ func HandleOpenAIImageEditStreamRequest( // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.ImageEditStreamRequest, logger) @@ -7209,6 +7216,7 @@ func (provider *OpenAIProvider) PassthroughStream( ch := make(chan *schemas.BifrostStreamChunk, schemas.DefaultStreamBufferSize) go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, ch, provider.GetProviderKey(), req.Model, schemas.PassthroughStreamRequest, provider.logger) diff --git a/core/providers/replicate/replicate.go b/core/providers/replicate/replicate.go index 373dbb5bbf..cea010c6b9 100644 --- a/core/providers/replicate/replicate.go +++ b/core/providers/replicate/replicate.go @@ -587,6 +587,7 @@ func (provider *ReplicateProvider) TextCompletionStream(ctx *schemas.BifrostCont // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, provider.GetProviderKey(), request.Model, schemas.TextCompletionStreamRequest, provider.logger) @@ -962,6 +963,7 @@ func (provider *ReplicateProvider) ChatCompletionStream(ctx *schemas.BifrostCont // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, provider.GetProviderKey(), request.Model, schemas.ChatCompletionStreamRequest, provider.logger) @@ -1389,6 +1391,10 @@ func (provider *ReplicateProvider) ResponsesStream(ctx *schemas.BifrostContext, // Start streaming in a goroutine go func() { + // Registered first so the post-hook span finalizer runs on every exit + // path — including the empty-reader early return below, which would + // otherwise skip any finalizer declared later in this goroutine. + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, provider.GetProviderKey(), request.Model, schemas.ResponsesStreamRequest, provider.logger) @@ -2014,6 +2020,7 @@ func (provider *ReplicateProvider) ImageGenerationStream(ctx *schemas.BifrostCon // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.ImageGenerationStreamRequest, provider.logger) @@ -2450,6 +2457,7 @@ func (provider *ReplicateProvider) ImageEditStream(ctx *schemas.BifrostContext, // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.ImageEditStreamRequest, provider.logger) diff --git a/core/providers/utils/stream.go b/core/providers/utils/stream.go index 6b3ea417b4..1cdd011602 100644 --- a/core/providers/utils/stream.go +++ b/core/providers/utils/stream.go @@ -1,6 +1,8 @@ package utils import ( + "context" + schemas "github.com/maximhq/bifrost/core/schemas" ) @@ -20,7 +22,12 @@ import ( // // If the source channel is closed immediately (empty stream), it returns a // nil channel with nil error. drainDone is already closed. +// +// The ctx argument cancels the background forwarding goroutine if the consumer +// abandons the returned wrapped channel. On ctx.Done the goroutine drains the +// source stream so the upstream provider's blocked send can exit cleanly. func CheckFirstStreamChunkForError( + ctx context.Context, stream chan *schemas.BifrostStreamChunk, ) (chan *schemas.BifrostStreamChunk, <-chan struct{}, *schemas.BifrostError) { firstChunk, ok := <-stream @@ -53,7 +60,15 @@ func CheckFirstStreamChunkForError( defer close(done) defer close(wrapped) for chunk := range stream { - wrapped <- chunk + select { + case wrapped <- chunk: + case <-ctx.Done(): + // Consumer abandoned the wrapped channel. Drain the source so the + // provider's blocked send unblocks and its goroutine can exit. + for range stream { + } + return + } } }() return wrapped, done, nil diff --git a/core/providers/utils/stream_test.go b/core/providers/utils/stream_test.go index 45c88853fa..7e843fa4fd 100644 --- a/core/providers/utils/stream_test.go +++ b/core/providers/utils/stream_test.go @@ -1,7 +1,9 @@ package utils import ( + "context" "testing" + "time" schemas "github.com/maximhq/bifrost/core/schemas" ) @@ -18,7 +20,7 @@ func TestCheckFirstStreamChunk_ErrorInFirstChunk(t *testing.T) { } close(stream) - _, drainDone, err := CheckFirstStreamChunkForError(stream) + _, drainDone, err := CheckFirstStreamChunkForError(context.Background(), stream) if err == nil { t.Fatal("expected error, got nil") } @@ -47,7 +49,7 @@ func TestCheckFirstStreamChunk_ValidFirstChunk(t *testing.T) { stream <- chunk2 close(stream) - wrapped, _, err := CheckFirstStreamChunkForError(stream) + wrapped, _, err := CheckFirstStreamChunkForError(context.Background(), stream) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -75,7 +77,7 @@ func TestCheckFirstStreamChunk_EmptyStream(t *testing.T) { stream := make(chan *schemas.BifrostStreamChunk) close(stream) - wrapped, drainDone, err := CheckFirstStreamChunkForError(stream) + wrapped, drainDone, err := CheckFirstStreamChunkForError(context.Background(), stream) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -110,7 +112,7 @@ func TestCheckFirstStreamChunk_ErrorInSecondChunk(t *testing.T) { close(stream) // Should NOT return error — only first chunk matters for retry - wrapped, _, err := CheckFirstStreamChunkForError(stream) + wrapped, _, err := CheckFirstStreamChunkForError(context.Background(), stream) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -149,7 +151,7 @@ func TestCheckFirstStreamChunk_ErrorDrainsSource(t *testing.T) { } close(stream) - _, drainDone, err := CheckFirstStreamChunkForError(stream) + _, drainDone, err := CheckFirstStreamChunkForError(context.Background(), stream) if err == nil { t.Fatal("expected error, got nil") } @@ -176,7 +178,7 @@ func TestCheckFirstStreamChunk_ErrorWithEmptyMessage(t *testing.T) { } close(stream) - wrapped, _, err := CheckFirstStreamChunkForError(stream) + wrapped, _, err := CheckFirstStreamChunkForError(context.Background(), stream) if err != nil { t.Fatalf("unexpected error for empty message: %v", err) } @@ -184,6 +186,49 @@ func TestCheckFirstStreamChunk_ErrorWithEmptyMessage(t *testing.T) { <-wrapped } +func TestCheckFirstStreamChunk_CtxCancelUnblocksWrapper(t *testing.T) { + // Source with cap=1 so wrapped also has cap=1. wrapped is left full by + // the re-injected first chunk, which makes the forwarder goroutine block + // on its next send — the exact leak condition this test guards against. + src := make(chan *schemas.BifrostStreamChunk, 1) + src <- &schemas.BifrostStreamChunk{ + BifrostChatResponse: &schemas.BifrostChatResponse{ID: "1"}, + } + + ctx, cancel := context.WithCancel(context.Background()) + + wrapped, drainDone, err := CheckFirstStreamChunkForError(ctx, src) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if wrapped == nil { + t.Fatal("expected wrapped channel, got nil") + } + + // Push a second chunk; forwarder will read it from src and then block + // trying to send into the full wrapped channel (we intentionally never + // read from wrapped). + src <- &schemas.BifrostStreamChunk{ + BifrostChatResponse: &schemas.BifrostChatResponse{ID: "2"}, + } + + // Cancel — forwarder must stop trying to send to wrapped and drain src. + cancel() + + // Simulate the upstream producer still emitting, then closing. The + // drain loop should consume these and terminate. + src <- &schemas.BifrostStreamChunk{ + BifrostChatResponse: &schemas.BifrostChatResponse{ID: "3"}, + } + close(src) + + select { + case <-drainDone: + case <-time.After(time.Second): + t.Fatal("drainDone did not close after ctx cancel; forwarder goroutine leaked") + } +} + func TestCheckFirstStreamChunk_CodeOnlyError(t *testing.T) { // Error with code but no message should be treated as an error stream := make(chan *schemas.BifrostStreamChunk, 2) @@ -196,7 +241,7 @@ func TestCheckFirstStreamChunk_CodeOnlyError(t *testing.T) { } close(stream) - _, drainDone, err := CheckFirstStreamChunkForError(stream) + _, drainDone, err := CheckFirstStreamChunkForError(context.Background(), stream) if err == nil { t.Fatal("expected error for code-only error, got nil") } diff --git a/core/providers/utils/utils.go b/core/providers/utils/utils.go index 0c67b2f99f..efd1ea992b 100644 --- a/core/providers/utils/utils.go +++ b/core/providers/utils/utils.go @@ -1893,6 +1893,41 @@ func ProcessAndSendBifrostError( } } +// EnsureStreamFinalizerCalled invokes the post-hook span finalizer registered +// on ctx, if any. Designed to be deferred as the last line of defence in a +// provider's streaming goroutine (next to SetupStreamCancellation's cleanup): +// +// defer providerUtils.EnsureStreamFinalizerCalled(ctx) +// +// On a normal stream end the finalizer is already invoked when the final chunk +// is processed (via completeDeferredSpan). The registration wraps the closure +// in sync.Once, so this safety-net call is a noop in that case. It only does +// real work when the streaming goroutine exits without reaching the final-chunk +// path — e.g. a panic mid-stream — which would otherwise leak the plugin +// pipeline back-reference held by the finalizer closure. +// +// Panics inside the finalizer are recovered and logged so they never mask an +// in-flight panic that triggered the defer. +func EnsureStreamFinalizerCalled(ctx context.Context) { + // Install the recover first so any panic — including one triggered by + // accessing ctx itself — is caught. This matters because this helper is + // called from `defer`, so a panic here would mask the in-flight panic + // that invoked the defer. + defer func() { + if r := recover(); r != nil { + getLogger().Debug("recovered panic in deferred stream finalizer: %v", r) + } + }() + if ctx == nil { + return + } + finalizer, ok := ctx.Value(schemas.BifrostContextKeyPostHookSpanFinalizer).(func(context.Context)) + if !ok || finalizer == nil { + return + } + finalizer(ctx) +} + // SetupStreamCancellation spawns a goroutine that closes the body stream when // the context is cancelled or deadline exceeded, unblocking any blocked Read/Scan operations. // Returns a cleanup function that MUST be called when streaming is done to diff --git a/core/providers/vertex/vertex.go b/core/providers/vertex/vertex.go index 37e7af90f2..1f5682d692 100644 --- a/core/providers/vertex/vertex.go +++ b/core/providers/vertex/vertex.go @@ -3466,6 +3466,7 @@ func (provider *VertexProvider) PassthroughStream( ch := make(chan *schemas.BifrostStreamChunk, schemas.DefaultStreamBufferSize) go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, ch, provider.GetProviderKey(), req.Model, schemas.PassthroughStreamRequest, provider.logger) diff --git a/core/providers/vllm/vllm.go b/core/providers/vllm/vllm.go index 5cafe918af..a6b7b1c48c 100644 --- a/core/providers/vllm/vllm.go +++ b/core/providers/vllm/vllm.go @@ -524,6 +524,7 @@ func (provider *VLLMProvider) TranscriptionStream(ctx *schemas.BifrostContext, p // Start streaming in a goroutine go func() { + defer providerUtils.EnsureStreamFinalizerCalled(ctx) defer func() { if ctx.Err() == context.Canceled { providerUtils.HandleStreamCancellation(ctx, postHookRunner, responseChan, providerName, request.Model, schemas.TranscriptionStreamRequest, logger) diff --git a/core/schemas/images.go b/core/schemas/images.go index d16df42a10..1944f96d3f 100644 --- a/core/schemas/images.go +++ b/core/schemas/images.go @@ -69,8 +69,24 @@ type BifrostImageGenerationResponse struct { // - Size on ImageGenerationResponseParameters (from request params if not in response) // - Quality (low, medium, high, auto) only func (r *BifrostImageGenerationResponse) BackfillParams(req *BifrostRequest) { + if r == nil || req == nil { + return + } numInputImages, size, quality := getNumInputImagesSizeAndQualityFromRequest(req) + // Backfill Model from whichever inner request carries it. Some provider APIs + // (notably OpenAI /v1/images/*) omit model in the response body. + if r.Model == "" { + switch { + case req.ImageGenerationRequest != nil: + r.Model = req.ImageGenerationRequest.Model + case req.ImageEditRequest != nil: + r.Model = req.ImageEditRequest.Model + case req.ImageVariationRequest != nil: + r.Model = req.ImageVariationRequest.Model + } + } + // Backfill NumInputImages if numInputImages > 0 { if r.Usage == nil { diff --git a/core/schemas/videos.go b/core/schemas/videos.go index 9e133c7d52..b1d134889c 100644 --- a/core/schemas/videos.go +++ b/core/schemas/videos.go @@ -156,6 +156,9 @@ func (r *BifrostVideoGenerationResponse) BackfillParams(req *BifrostRequest) { if seconds != nil { r.Seconds = seconds } + if r.Model == "" && req.VideoGenerationRequest != nil { + r.Model = req.VideoGenerationRequest.Model + } } // --- Video Remix --- diff --git a/examples/plugins/hello-world/go.mod b/examples/plugins/hello-world/go.mod index 2c895cdafc..f2283117bc 100644 --- a/examples/plugins/hello-world/go.mod +++ b/examples/plugins/hello-world/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/examples/plugins/hello-world -go 1.26.2 +go 1.26.1 require github.com/maximhq/bifrost/core v1.4.19 diff --git a/framework/go.mod b/framework/go.mod index e872c33262..b70ae24f45 100644 --- a/framework/go.mod +++ b/framework/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/framework -go 1.26.2 +go 1.26.1 require ( github.com/google/uuid v1.6.0 diff --git a/plugins/governance/go.mod b/plugins/governance/go.mod index 13949872e0..c3bb0b9df0 100644 --- a/plugins/governance/go.mod +++ b/plugins/governance/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/plugins/governance -go 1.26.2 +go 1.26.1 require gorm.io/gorm v1.31.1 diff --git a/plugins/jsonparser/go.mod b/plugins/jsonparser/go.mod index f56f394e76..a90cae75b8 100644 --- a/plugins/jsonparser/go.mod +++ b/plugins/jsonparser/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/plugins/jsonparser -go 1.26.2 +go 1.26.1 require github.com/maximhq/bifrost/core v1.4.19 diff --git a/plugins/litellmcompat/go.mod b/plugins/litellmcompat/go.mod index e2c5d88a52..b7f1de93f4 100644 --- a/plugins/litellmcompat/go.mod +++ b/plugins/litellmcompat/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/plugins/litellmcompat -go 1.26.2 +go 1.26.1 require ( github.com/maximhq/bifrost/core v1.4.19 diff --git a/plugins/logging/go.mod b/plugins/logging/go.mod index 847efe0d82..010df98bac 100644 --- a/plugins/logging/go.mod +++ b/plugins/logging/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/plugins/logging -go 1.26.2 +go 1.26.1 require ( github.com/bytedance/sonic v1.15.0 diff --git a/plugins/maxim/go.mod b/plugins/maxim/go.mod index ee75d3ae18..68c36f46c6 100644 --- a/plugins/maxim/go.mod +++ b/plugins/maxim/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/plugins/maxim -go 1.26.2 +go 1.26.1 require ( github.com/maximhq/bifrost/core v1.4.19 diff --git a/plugins/mocker/go.mod b/plugins/mocker/go.mod index 34ebd36f31..c106d39ca7 100644 --- a/plugins/mocker/go.mod +++ b/plugins/mocker/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/plugins/mocker -go 1.26.2 +go 1.26.1 require ( github.com/jaswdr/faker/v2 v2.8.0 diff --git a/plugins/otel/go.mod b/plugins/otel/go.mod index 28c10d5ba2..79711eeef8 100644 --- a/plugins/otel/go.mod +++ b/plugins/otel/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/plugins/otel -go 1.26.2 +go 1.26.1 require ( github.com/maximhq/bifrost/core v1.4.19 diff --git a/plugins/semanticcache/go.mod b/plugins/semanticcache/go.mod index 8848124240..47dea89450 100644 --- a/plugins/semanticcache/go.mod +++ b/plugins/semanticcache/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/plugins/semanticcache -go 1.26.2 +go 1.26.1 require ( github.com/cespare/xxhash/v2 v2.3.0 diff --git a/plugins/telemetry/go.mod b/plugins/telemetry/go.mod index 0a3fc77f42..15f3f41268 100644 --- a/plugins/telemetry/go.mod +++ b/plugins/telemetry/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/plugins/telemetry -go 1.26.2 +go 1.26.1 require ( github.com/maximhq/bifrost/core v1.4.19 diff --git a/tests/scripts/1millogs/go.mod b/tests/scripts/1millogs/go.mod index e51491f776..c4dd43e882 100644 --- a/tests/scripts/1millogs/go.mod +++ b/tests/scripts/1millogs/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/tests/scripts/1millogs -go 1.26.2 +go 1.26.1 require ( github.com/maximhq/bifrost/core v1.4.18 diff --git a/transports/Dockerfile b/transports/Dockerfile index c8e9335064..5a05df2795 100644 --- a/transports/Dockerfile +++ b/transports/Dockerfile @@ -15,7 +15,7 @@ # Skip the copy-build step since we'll copy the files in the Go build stage # --- Go Build Stage: Compile the Go binary --- - FROM golang:1.26.2-alpine3.23@sha256:c2a1f7b2095d046ae14b286b18413a05bb82c9bca9b25fe7ff5efef0f0826166 AS builder + FROM golang:1.26.1-alpine3.23@sha256:2389ebfa5b7f43eeafbd6be0c3700cc46690ef842ad962f6c5bd6be49ed82039 AS builder WORKDIR /app # Install dependencies including gcc for CGO and sqlite diff --git a/transports/Dockerfile.local b/transports/Dockerfile.local index 1a7d9840b4..8761833577 100644 --- a/transports/Dockerfile.local +++ b/transports/Dockerfile.local @@ -18,7 +18,7 @@ # Skip the copy-build step since we'll copy the files in the Go build stage # --- Go Build Stage: Compile the Go binary using local modules --- - FROM golang:1.26.2-alpine3.23@sha256:c2a1f7b2095d046ae14b286b18413a05bb82c9bca9b25fe7ff5efef0f0826166 AS builder + FROM golang:1.26.1-alpine3.23@sha256:2389ebfa5b7f43eeafbd6be0c3700cc46690ef842ad962f6c5bd6be49ed82039 AS builder WORKDIR /build # Install dependencies including gcc for CGO and sqlite diff --git a/transports/bifrost-http/handlers/devpprof.go b/transports/bifrost-http/handlers/devpprof.go index fca8712d9b..6a2390b1eb 100644 --- a/transports/bifrost-http/handlers/devpprof.go +++ b/transports/bifrost-http/handlers/devpprof.go @@ -26,8 +26,8 @@ const ( metricsCollectionInterval = 10 * time.Second // Number of data points to keep (5 minutes / 10 seconds = 30 points) historySize = 30 - // Top allocations to return - topAllocationsCount = 5 + // Top allocations to return per table (cumulative and in-use) + topAllocationsCount = 50 ) // MemoryStats represents memory statistics at a point in time @@ -57,11 +57,12 @@ type RuntimeStats struct { // AllocationInfo represents a single allocation site type AllocationInfo struct { - Function string `json:"function"` - File string `json:"file"` - Line int `json:"line"` - Bytes int64 `json:"bytes"` - Count int64 `json:"count"` + Function string `json:"function"` + File string `json:"file"` + Line int `json:"line"` + Bytes int64 `json:"bytes"` + Count int64 `json:"count"` + Stack []string `json:"stack"` } // GoroutineGroup represents a group of goroutines with the same stack trace @@ -104,12 +105,13 @@ type HistoryPoint struct { // PprofData represents the complete pprof response type PprofData struct { - Timestamp string `json:"timestamp"` - Memory MemoryStats `json:"memory"` - CPU CPUStats `json:"cpu"` - Runtime RuntimeStats `json:"runtime"` - TopAllocations []AllocationInfo `json:"top_allocations"` - History []HistoryPoint `json:"history"` + Timestamp string `json:"timestamp"` + Memory MemoryStats `json:"memory"` + CPU CPUStats `json:"cpu"` + Runtime RuntimeStats `json:"runtime"` + TopAllocations []AllocationInfo `json:"top_allocations"` + InuseAllocations []AllocationInfo `json:"inuse_allocations"` + History []HistoryPoint `json:"history"` } // cpuSample holds a CPU time sample for calculating usage @@ -288,85 +290,138 @@ func (c *MetricsCollector) getCPUStats() CPUStats { return c.currentCPU } -// getTopAllocations analyzes heap profile to find top allocation sites -func getTopAllocations() []AllocationInfo { - // Write heap profile to buffer +// getAllocations analyzes the heap profile and returns two allocation lists +// aggregated by full call stack: +// - cumulative: alloc_space / alloc_objects (total since process start) +// - inuse: inuse_space / inuse_objects (currently live on the heap) +// +// Both are produced from a single pprof.WriteHeapProfile call. +func getAllocations() (cumulative, inuse []AllocationInfo) { var buf bytes.Buffer if err := pprof.WriteHeapProfile(&buf); err != nil { - return []AllocationInfo{} + return nil, nil } - // Parse the protobuf profile p, err := profile.Parse(&buf) if err != nil { - return []AllocationInfo{} + return nil, nil } - // Find the indices for alloc_objects and alloc_space sample types - var allocObjectsIdx, allocSpaceIdx int + allocObjectsIdx, allocSpaceIdx := -1, -1 + inuseObjectsIdx, inuseSpaceIdx := -1, -1 for i, st := range p.SampleType { switch st.Type { case "alloc_objects": allocObjectsIdx = i case "alloc_space": allocSpaceIdx = i + case "inuse_objects": + inuseObjectsIdx = i + case "inuse_space": + inuseSpaceIdx = i } } - // Aggregate allocations by function (top of stack = allocation site) allocMap := make(map[string]*AllocationInfo) + inuseMap := make(map[string]*AllocationInfo) for _, sample := range p.Sample { if len(sample.Location) == 0 { continue } - loc := sample.Location[0] // Top of stack = allocation site - if len(loc.Line) == 0 { + + topLoc := sample.Location[0] + if len(topLoc.Line) == 0 { continue } - line := loc.Line[0] - fn := line.Function - if fn == nil { + topLine := topLoc.Line[0] + topFn := topLine.Function + if topFn == nil { continue } - // Skip allocations from the profiler itself - if isProfilerFunction(fn.Name, fn.Filename) { + // Filter only the top frame — filtering inner frames would drop real + // user allocations that merely pass through runtime/profiler code. + if isProfilerFunction(topFn.Name, topFn.Filename) { continue } - key := fn.Name - if existing, ok := allocMap[key]; ok { - existing.Bytes += sample.Value[allocSpaceIdx] - existing.Count += sample.Value[allocObjectsIdx] - } else { - allocMap[key] = &AllocationInfo{ - Function: fn.Name, - File: fn.Filename, - Line: int(line.Line), - Bytes: sample.Value[allocSpaceIdx], - Count: sample.Value[allocObjectsIdx], + // Build full stack in goroutine-dump format: alternating "funcName" and + // "\tfile:line" entries, top-down. Matches GoroutineGroup.Stack so the + // UI can render both with the same code path. + stack := make([]string, 0, len(sample.Location)*2) + for _, loc := range sample.Location { + if len(loc.Line) == 0 { + continue + } + frame := loc.Line[0] + if frame.Function == nil { + continue + } + stack = append(stack, frame.Function.Name) + stack = append(stack, "\t"+frame.Function.Filename+":"+strconv.FormatInt(frame.Line, 10)) + } + if len(stack) == 0 { + continue + } + key := strings.Join(stack, "\n") + + if allocSpaceIdx >= 0 && allocObjectsIdx >= 0 { + b := sample.Value[allocSpaceIdx] + c := sample.Value[allocObjectsIdx] + if existing, ok := allocMap[key]; ok { + existing.Bytes += b + existing.Count += c + } else { + allocMap[key] = &AllocationInfo{ + Function: topFn.Name, + File: topFn.Filename, + Line: int(topLine.Line), + Bytes: b, + Count: c, + Stack: stack, + } } } - } - // Convert map to slice - allocations := make([]AllocationInfo, 0, len(allocMap)) - for _, alloc := range allocMap { - allocations = append(allocations, *alloc) + if inuseSpaceIdx >= 0 && inuseObjectsIdx >= 0 { + b := sample.Value[inuseSpaceIdx] + c := sample.Value[inuseObjectsIdx] + // Most samples have inuse=0 (already freed) — skip them so the live + // table isn't padded with noise. + if b == 0 && c == 0 { + continue + } + if existing, ok := inuseMap[key]; ok { + existing.Bytes += b + existing.Count += c + } else { + inuseMap[key] = &AllocationInfo{ + Function: topFn.Name, + File: topFn.Filename, + Line: int(topLine.Line), + Bytes: b, + Count: c, + Stack: stack, + } + } + } } - // Sort by bytes descending - sort.Slice(allocations, func(i, j int) bool { - return allocations[i].Bytes > allocations[j].Bytes - }) + return flattenAndTopN(allocMap), flattenAndTopN(inuseMap) +} - // Return top N allocations - if len(allocations) > topAllocationsCount { - allocations = allocations[:topAllocationsCount] +// flattenAndTopN sorts an allocation map by bytes desc and caps it. +func flattenAndTopN(m map[string]*AllocationInfo) []AllocationInfo { + out := make([]AllocationInfo, 0, len(m)) + for _, a := range m { + out = append(out, *a) } - - return allocations + sort.Slice(out, func(i, j int) bool { return out[i].Bytes > out[j].Bytes }) + if len(out) > topAllocationsCount { + out = out[:topAllocationsCount] + } + return out } // RegisterRoutes registers the dev pprof routes @@ -400,9 +455,9 @@ func (h *DevPprofHandler) getPprof(ctx *fasthttp.RequestCtx) { NumCPU: runtime.NumCPU(), GOMAXPROCS: runtime.GOMAXPROCS(0), }, - TopAllocations: getTopAllocations(), - History: h.collector.getHistory(), + History: h.collector.getHistory(), } + data.TopAllocations, data.InuseAllocations = getAllocations() SendJSON(ctx, data) } @@ -688,7 +743,8 @@ var profilerPatterns = []string{ "profile.Parse", "MetricsCollector", "collectLoop", - "getTopAllocations", + "getAllocations", + "flattenAndTopN", "parseGoroutineProfile", "getGoroutines", "getCPUSample", diff --git a/transports/changelog.md b/transports/changelog.md index c48ea02e42..96d7209cb5 100644 --- a/transports/changelog.md +++ b/transports/changelog.md @@ -1,9 +1,17 @@ ## ✨ Features - **Claude Opus 4.7 Support** — Added compatibility for Anthropic's Claude Opus 4.7 model, including adaptive thinking, task-budgets beta header, `display` parameter handling, and "xhigh" effort mapping +- **Anthropic Structured Outputs** — Added `response_format` and structured output support for Anthropic models across chat completions and Responses API, including JSON-schema and JSON-object formats with order-preserving merge of additional model request fields (thanks [@emirhanmutlu-natuvion](https://github.com/emirhanmutlu-natuvion)!) +- **MCP Tool Annotations** — Preserve MCP tool annotations (`title`, `readOnly`, `destructive`, `idempotent`, `openWorld`) in bidirectional conversion between MCP tools and Bifrost chat tools so agents can reason about tool behavior +- **Anthropic Server Tools** — Expanded Anthropic chat schema and Responses converters to surface server-side tools (web search, code execution, computer use containers) end-to-end ## 🐞 Fixed +- **Provider Queue Shutdown Panic** — Eliminated `send on closed channel` panics in provider queue shutdown by leaving queue channels open and exiting workers via the `done` signal; stale producers transparently re-route to new queues during `UpdateProvider`, with rollback on failed updates +- **OpenAI Tool Result Output** — Flatten array-form `tool_result` output into a newline-joined string before marshaling for the Responses API so strict upstreams (Ollama Cloud, openai-go typed models) no longer reject it with HTTP 400; non-text blocks (images, files) are preserved (thanks [@martingiguere](https://github.com/martingiguere)!) +- **vLLM Token Usage** — Treat `delta.content=""` the same as `nil` in streaming so the synthesis chunk retains its `finish_reason`, restoring token usage attribution in logs and UI +- **Config Schema Validator** — Corrected JSON-path lookups for concurrency and SCIM blocks in the schema validation script, and reformatted `transports/config.schema.json` for readability +- **CI Egress Hardening** — Switched `step-security/harden-runner` from `audit` to `block` across all GitHub Actions workflows with explicit `allowed-endpoints` per job - **Gemini Tool Outputs** — Handle content block tool outputs in Responses API path for `function_call_output` messages (thanks [@tom-diacono](https://github.com/tom-diacono)!) - **Bedrock Streaming** — Emit `message_stop` event for Anthropic invoke stream and case-insensitive `anthropic-beta` header merging (thanks [@tefimov](https://github.com/tefimov)!) - **Bedrock Tool Images** — Preserve image content blocks in tool results when converting Anthropic Messages to Bedrock Converse API (thanks [@Edward-Upton](https://github.com/Edward-Upton)!) diff --git a/transports/go.mod b/transports/go.mod index b954bd38c2..046f8e586e 100644 --- a/transports/go.mod +++ b/transports/go.mod @@ -1,6 +1,6 @@ module github.com/maximhq/bifrost/transports -go 1.26.2 +go 1.26.1 require ( github.com/andybalholm/brotli v1.2.0 diff --git a/ui/app/pprof/page.tsx b/ui/app/pprof/page.tsx index a61781989e..4d939f0d38 100644 --- a/ui/app/pprof/page.tsx +++ b/ui/app/pprof/page.tsx @@ -16,7 +16,7 @@ import { RotateCcw, TrendingUp, } from "lucide-react"; -import React, { useCallback, useEffect, useMemo, useState } from "react"; +import React, { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { Area, AreaChart, CartesianGrid, ResponsiveContainer, Tooltip, XAxis, YAxis } from "recharts"; // ============================================================================ @@ -102,6 +102,135 @@ function saveSkippedGoroutineFiles(skipped: Set): void { type AllocationSortField = "function" | "file" | "bytes" | "count"; type SortDirection = "asc" | "desc"; +type AllocationSortState = { field: AllocationSortField; direction: SortDirection }; +type LeakSeverity = "high" | "medium" | "low"; + +interface LeakCandidate { + key: string; + function: string; + file: string; + line: number; + stack: string[]; + liveBytes: number; + cumulativeBytes: number; + retention: number; + liveCount: number; + samples: number[]; + isGrowing: boolean; + growthBytes: number; + severity: LeakSeverity; +} + +// ~60 seconds of history at 10s polling interval +const LEAK_MAX_SAMPLES = 6; +const LEAK_MIN_GROWTH_SAMPLES = 3; +const LEAK_SEVERITY_RANK: Record = { high: 0, medium: 1, low: 2 }; + +function makeStackKey(stack: string[]): string { + return stack.join("\n"); +} + +function isMonotonicGrowing(samples: number[]): boolean { + if (samples.length < LEAK_MIN_GROWTH_SAMPLES) return false; + for (let i = 1; i < samples.length; i++) { + if (samples[i] < samples[i - 1]) return false; + } + return samples[samples.length - 1] > samples[0]; +} + +function classifyLeakSeverity(retention: number, liveBytes: number, isGrowing: boolean): LeakSeverity | null { + const MB = 1024 * 1024; + if (isGrowing && retention >= 0.5 && liveBytes >= MB) return "high"; + if (retention >= 0.8 && liveBytes >= 10 * MB) return "high"; + if (retention >= 0.5 && liveBytes >= MB) return "medium"; + if (retention >= 0.3 && liveBytes >= 100 * 1024) return "low"; + return null; +} + +function detectLeaks( + cumulative: AllocationInfo[], + live: AllocationInfo[], + inuseHistory: Map, +): LeakCandidate[] { + const cumMap = new Map(); + for (const c of cumulative) { + cumMap.set(makeStackKey(c.stack), c); + } + + const candidates: LeakCandidate[] = []; + for (const l of live) { + const key = makeStackKey(l.stack); + const cum = cumMap.get(key); + if (!cum || cum.bytes === 0) continue; + const retention = l.bytes / cum.bytes; + const samples = inuseHistory.get(key) ?? []; + const isGrowing = isMonotonicGrowing(samples); + const growthBytes = samples.length >= 2 ? samples[samples.length - 1] - samples[0] : 0; + const severity = classifyLeakSeverity(retention, l.bytes, isGrowing); + if (!severity) continue; + candidates.push({ + key, + function: l.function, + file: l.file, + line: l.line, + stack: l.stack, + liveBytes: l.bytes, + cumulativeBytes: cum.bytes, + retention, + liveCount: l.count, + samples: [...samples], + isGrowing, + growthBytes, + severity, + }); + } + + candidates.sort((a, b) => { + if (a.severity !== b.severity) return LEAK_SEVERITY_RANK[a.severity] - LEAK_SEVERITY_RANK[b.severity]; + return b.liveBytes - a.liveBytes; + }); + return candidates; +} + +function getLeakSeverityClasses(severity: LeakSeverity): string { + switch (severity) { + case "high": + return "text-red-400 bg-red-400/10 border-red-400/20"; + case "medium": + return "text-amber-400 bg-amber-400/10 border-amber-400/20"; + case "low": + return "text-zinc-400 bg-zinc-400/10 border-zinc-400/20"; + } +} + +function getRetentionColor(retention: number): string { + if (retention >= 0.8) return "text-red-400"; + if (retention >= 0.5) return "text-amber-400"; + return "text-zinc-400"; +} + +function sortAllocations(list: AllocationInfo[], sort: AllocationSortState): AllocationInfo[] { + const sorted = [...list]; + sorted.sort((a, b) => { + let cmp = 0; + switch (sort.field) { + case "function": + cmp = a.function.localeCompare(b.function); + break; + case "file": + cmp = a.file.localeCompare(b.file); + break; + case "bytes": + cmp = a.bytes - b.bytes; + break; + case "count": + cmp = a.count - b.count; + break; + } + return sort.direction === "asc" ? cmp : -cmp; + }); + return sorted; +} // ============================================================================ // Components @@ -139,17 +268,25 @@ function AllocationTable({ sortField, sortDirection, onSort, + expandedKeys, + onToggle, + bytesColorClass = "text-rose-400", + testIdPrefix = "pprof-sort", }: { allocations: AllocationInfo[]; sortField: AllocationSortField; sortDirection: SortDirection; onSort: (field: AllocationSortField) => void; + expandedKeys: Set; + onToggle: (key: string) => void; + bytesColorClass?: string; + testIdPrefix?: string; }) { const SortIcon = sortDirection === "asc" ? ArrowUp : ArrowDown; const SortHeader = ({ field, children }: { field: AllocationSortField; children: React.ReactNode }) => ( - @@ -161,6 +298,7 @@ function AllocationTable({ + - {allocations.map((alloc, i) => ( - - - - - - - ))} + {allocations.map((alloc) => { + const hasStack = alloc.stack && alloc.stack.length > 0; + const key = hasStack ? makeStackKey(alloc.stack) : `${alloc.function}:${alloc.file}:${alloc.line}`; + const isExpanded = expandedKeys.has(key); + return ( + + onToggle(key) : undefined} + onKeyDown={ + hasStack + ? (e) => { + if (e.key === "Enter" || e.key === " ") { + e.preventDefault(); + onToggle(key); + } + } + : undefined + } + data-testid="pprof-alloc-row" + className={`border-b border-zinc-800/50 hover:bg-zinc-800/30 ${hasStack ? "cursor-pointer" : ""}`} + > + + + + + + + {isExpanded && hasStack && ( + + + + )} + + ); + })} {allocations.length === 0 && ( - @@ -199,6 +381,147 @@ function AllocationTable({ ); } +// Leak Candidates Table +function LeakTable({ + candidates, + expandedKeys, + onToggle, +}: { + candidates: LeakCandidate[]; + expandedKeys: Set; + onToggle: (key: string) => void; +}) { + return ( +
+
Function File:Line Bytes @@ -168,27 +306,71 @@ function AllocationTable({
- {alloc.function} - - - {alloc.file}:{alloc.line} - - - {formatBytes(alloc.bytes)} - - {alloc.count.toLocaleString()} -
+ {hasStack ? ( + isExpanded ? : + ) : null} + + {alloc.function} + + + {alloc.file}:{alloc.line} + + + {formatBytes(alloc.bytes)} + + {alloc.count.toLocaleString()} +
+ +
Stack Trace
+
+ {alloc.stack.map((line, j) => ( +
+ {line} +
+ ))} +
+
+ No allocations data available
+ + + + + + + + + + + + + {candidates.map((c) => { + const rowKey = c.key; + const isExpanded = expandedKeys.has(rowKey); + return ( + + onToggle(rowKey)} + onKeyDown={(e) => { + if (e.key === "Enter" || e.key === " ") { + e.preventDefault(); + onToggle(rowKey); + } + }} + data-testid="pprof-leak-row" + className="cursor-pointer border-b border-zinc-800/50 hover:bg-zinc-800/30" + > + + + + + + + + + + {isExpanded && ( + + + + )} + + ); + })} + {candidates.length === 0 && ( + + + + )} + +
+ + Severity + + Function + + File:Line + + Live + + Retention + + Trend + + Live Count +
+ {isExpanded ? : } + + + {c.severity} + + + {c.function} + + + {c.file}:{c.line} + + + {formatBytes(c.liveBytes)} + + + {(c.retention * 100).toFixed(0)}% + + + {c.isGrowing ? ( + + +{formatBytes(c.growthBytes)} + + ) : ( + stable + )} + + {c.liveCount.toLocaleString()} +
+ +
+ + Cumulative: {formatBytes(c.cumulativeBytes)} + + + Retained: {(c.retention * 100).toFixed(1)}% + + {c.samples.length >= 2 && ( + + Last {c.samples.length * 10}s:{" "} + {c.samples.map((b) => formatBytes(b)).join(" → ")} + + )} +
+
Stack Trace
+
+ {c.stack.map((line, j) => ( +
+ {line} +
+ ))} +
+
+ No obvious leak signatures — all live allocations have normal retention ratios. +
+ + ); +} + // Goroutine Group Component function GoroutineGroupRow({ group, @@ -287,10 +610,14 @@ export default function PprofPage() { const [expandedGoroutines, setExpandedGoroutines] = useState>(new Set()); const [skippedGoroutines, setSkippedGoroutines] = useState>(new Set()); const [hasLoadedSkipped, setHasLoadedSkipped] = useState(false); - const [allocationSort, setAllocationSort] = useState<{ - field: AllocationSortField; - direction: SortDirection; - }>({ field: "bytes", direction: "desc" }); + const [allocationSort, setAllocationSort] = useState({ field: "bytes", direction: "desc" }); + const [inuseSort, setInuseSort] = useState({ field: "bytes", direction: "desc" }); + const [expandedAlloc, setExpandedAlloc] = useState>(new Set()); + const [expandedInuse, setExpandedInuse] = useState>(new Set()); + const [expandedLeaks, setExpandedLeaks] = useState>(new Set()); + const inuseHistoryRef = useRef>(new Map()); + const lastInuseSnapshotRef = useRef(null); + const [historyVersion, setHistoryVersion] = useState(0); // Load skipped goroutines from localStorage on client useEffect(() => { @@ -333,29 +660,55 @@ export default function PprofPage() { }, [data?.history]); // Sort allocations - const sortedAllocations = useMemo(() => { - if (!data?.top_allocations) return []; - const sorted = [...data.top_allocations]; - sorted.sort((a, b) => { - let cmp = 0; - switch (allocationSort.field) { - case "function": - cmp = a.function.localeCompare(b.function); - break; - case "file": - cmp = a.file.localeCompare(b.file); - break; - case "bytes": - cmp = a.bytes - b.bytes; - break; - case "count": - cmp = a.count - b.count; - break; - } - return allocationSort.direction === "asc" ? cmp : -cmp; - }); - return sorted; - }, [data?.top_allocations, allocationSort]); + const sortedAllocations = useMemo( + () => sortAllocations(data?.top_allocations ?? [], allocationSort), + [data?.top_allocations, allocationSort], + ); + const sortedInuseAllocations = useMemo( + () => sortAllocations(data?.inuse_allocations ?? [], inuseSort), + [data?.inuse_allocations, inuseSort], + ); + + // Roll a ~60s window of inuse bytes per stack signature so we can detect + // sites whose live memory grows monotonically across polls. Dedupe on + // data.timestamp (stamped fresh by the backend each poll) rather than + // array identity: RTK Query's default structural sharing reuses the + // inuse_allocations reference when the snapshot is deep-equal, which + // would silently skip samples on idle polls and shrink the window. + useEffect(() => { + const inuse = data?.inuse_allocations; + const snapshotTs = data?.timestamp; + if (!inuse || !snapshotTs || lastInuseSnapshotRef.current === snapshotTs) return; + lastInuseSnapshotRef.current = snapshotTs; + const map = inuseHistoryRef.current; + const seen = new Set(); + for (const l of inuse) { + const key = makeStackKey(l.stack); + seen.add(key); + const samples = map.get(key) ?? []; + samples.push(l.bytes); + while (samples.length > LEAK_MAX_SAMPLES) samples.shift(); + map.set(key, samples); + } + // Drop sites absent from the latest snapshot (either freed or evicted + // from top-N) so the map stays bounded. + for (const key of [...map.keys()]) { + if (!seen.has(key)) map.delete(key); + } + setHistoryVersion((v) => v + 1); + }, [data?.timestamp, data?.inuse_allocations]); + + const leakCandidates = useMemo( + () => detectLeaks(data?.top_allocations ?? [], data?.inuse_allocations ?? [], inuseHistoryRef.current), + // historyVersion bumps when the ref is mutated; top/inuse refs change per poll + [data?.top_allocations, data?.inuse_allocations, historyVersion], + ); + + const leakSummary = useMemo(() => { + const counts: Record = { high: 0, medium: 0, low: 0 }; + for (const c of leakCandidates) counts[c.severity]++; + return counts; + }, [leakCandidates]); // Detect goroutine count trend const goroutineTrend = useMemo(() => { @@ -394,6 +747,49 @@ export default function PprofPage() { })); }, []); + const handleInuseSort = useCallback((field: AllocationSortField) => { + setInuseSort((prev) => ({ + field, + direction: prev.field === field && prev.direction === "desc" ? "asc" : "desc", + })); + }, []); + + const toggleAllocExpand = useCallback((key: string) => { + setExpandedAlloc((prev) => { + const next = new Set(prev); + if (next.has(key)) { + next.delete(key); + } else { + next.add(key); + } + return next; + }); + }, []); + + const toggleInuseExpand = useCallback((key: string) => { + setExpandedInuse((prev) => { + const next = new Set(prev); + if (next.has(key)) { + next.delete(key); + } else { + next.add(key); + } + return next; + }); + }, []); + + const toggleLeakExpand = useCallback((key: string) => { + setExpandedLeaks((prev) => { + const next = new Set(prev); + if (next.has(key)) { + next.delete(key); + } else { + next.add(key); + } + return next; + }); + }, []); + const toggleGoroutineExpand = useCallback((id: string) => { setExpandedGoroutines((prev) => { const next = new Set(prev); @@ -635,18 +1031,81 @@ export default function PprofPage() { - {/* Allocations Table */} + {/* Potential Leaks — stacks accumulating live memory without being freed */}
-
- - Memory Allocations - ({sortedAllocations.length} allocations) +
+
+ + Potential Leaks + ({leakCandidates.length} suspicious) + {leakSummary.high > 0 && ( + + {leakSummary.high} high + + )} + {leakSummary.medium > 0 && ( + + {leakSummary.medium} medium + + )} + {leakSummary.low > 0 && ( + + {leakSummary.low} low + + )} +
+

+ Stacks whose live bytes remain a large fraction of what they ever allocated (retention), optionally with + live bytes trending upward over the last minute. Growth + high retention together is the strongest leak + signal. +

+
+ +
+ + {/* Live Heap Allocations — what's currently consuming the heap */} +
+
+
+ + Live Heap Allocations + ({sortedInuseAllocations.length} sites) +
+

+ Call stacks currently holding memory on the heap right now — expand a row to see the full stack. +

+
+ +
+ + {/* Cumulative Memory Allocations — total since process start */} +
+
+
+ + Cumulative Memory Allocations + ({sortedAllocations.length} sites) +
+

+ Total bytes allocated since process start (includes memory already freed) — expand a row to see the full stack. +

diff --git a/ui/lib/store/apis/devApi.ts b/ui/lib/store/apis/devApi.ts index e8cc281dd3..355b320fdc 100644 --- a/ui/lib/store/apis/devApi.ts +++ b/ui/lib/store/apis/devApi.ts @@ -32,6 +32,7 @@ export interface AllocationInfo { line: number bytes: number count: number + stack: string[] } // Single point in the metrics history @@ -51,6 +52,7 @@ export interface PprofData { cpu: CPUStats runtime: RuntimeStats top_allocations: AllocationInfo[] + inuse_allocations: AllocationInfo[] history: HistoryPoint[] }