Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,18 @@ func NewProvider(config schemas.ProviderConfig) (*Provider, error) {
MaxConnsPerHost: config.NetworkConfig.MaxConnsPerHost, // configurable, default 5000
MaxIdleConnDuration: 30 * time.Second,
}
return &Provider{client: client, ...}, nil
// After ConfigureProxy/ConfigureDialer/ConfigureTLS, build a sibling client
// for streaming. BuildStreamingClient zeros ReadTimeout/WriteTimeout/MaxConnDuration
// so streams aren't killed by fasthttp's whole-response deadline; per-chunk idle
// is enforced at the app layer via NewIdleTimeoutReader.
streamingClient := providerUtils.BuildStreamingClient(client)
return &Provider{client: client, streamingClient: streamingClient, ...}, nil
}
```
**Note:** Bedrock uses `net/http` (not fasthttp) with HTTP/2 support. Its `http.Transport` is configured with `ForceAttemptHTTP2: true` and `MaxConnsPerHost` from `NetworkConfig` to allow multiple HTTP/2 connections when the server's per-connection stream limit (100 for AWS Bedrock) is reached.

**Streaming vs unary client:** Every provider holds two clients — `client` for unary requests (`ReadTimeout=30s` bounds the whole response) and `streamingClient` for SSE / EventStream / chunked paths (`ReadTimeout=0`; the per-chunk `NewIdleTimeoutReader` is the only governor). Pass `provider.streamingClient` to every `Handle*Streaming` / `Handle*StreamRequest` helper and to direct `Do` calls inside `*Stream` methods. For new providers, apply the same pattern — missing the switch means streams get killed at 30s.

**Note:** Bedrock uses `net/http` (not fasthttp) with HTTP/2 support. Its `http.Transport` is configured with `ForceAttemptHTTP2: true` and `MaxConnsPerHost` from `NetworkConfig` to allow multiple HTTP/2 connections when the server's per-connection stream limit (100 for AWS Bedrock) is reached. Use `providerUtils.BuildStreamingHTTPClient(client)` to derive the streaming variant — it shares the base `Transport` (safe for concurrent reuse) but clears `Client.Timeout`.

### The Provider Interface

Expand Down Expand Up @@ -509,6 +517,21 @@ In `tests/e2e/core/`, **never marshal API payloads to a `Record`/`Map`/plain-obj

## Testing

### Always prefer `make test-core` over raw `go test` for provider-level tests

The `make test-core` target is the canonical harness for provider tests — it wires up env vars from `.env` (provider API keys), invokes the per-provider `{provider}_test.go` entrypoint in `core/providers/<provider>/`, and routes through the shared `core/internal/llmtests/` scenario suite that validates end-to-end behavior (including streaming).

Running bare `go test ./core/providers/<provider>/...` only executes unit tests and skips the llmtests scenarios — so it won't catch regressions in streaming, tool-calling, or provider-specific response shapes.

```bash
make test-core PROVIDER=anthropic TESTCASE=TestChatCompletionStream # exact test
make test-core PROVIDER=openai PATTERN=Stream # substring match
make test-core PROVIDER=bedrock # all scenarios for one provider
make test-core DEBUG=1 PROVIDER=gemini TESTCASE=TestResponsesStream # attach Delve on :2345
```

`PATTERN` and `TESTCASE` are mutually exclusive. Provider name must match a directory under `core/providers/` (e.g. `anthropic`, `openai`, `bedrock`, `vertex`, `azure`, `gemini`, `cohere`, `mistral`, `groq`, etc.).

### LLM Tests (`core/internal/llmtests/`)

Scenario-based tests that run against **live provider APIs** with dual-API testing (Chat Completions + Responses API):
Expand Down
4 changes: 4 additions & 0 deletions core/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21/go.mod h1:p+hz+PRAYlY
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.5 h1:clHU5fm//kWS1C2HgtgWxfQbFbx4b6rx+5jzhgX9HrI=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.5/go.mod h1:O3h0IK87yXci+kg6flUKzJnWeziQUKciKrLjcatSNcY=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22 h1:rWyie/PxDRIdhNf4DzRk0lvjVOqFJuNnO8WwaIRVxzQ=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22/go.mod h1:zd/JsJ4P7oGfUhXn1VyLqaRZwPmZwg44Jf2dS84Dm3Y=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 h1:5EniKhLZe4xzL7a+fU3C2tfUN4nWIqlLesfrjkuPFTY=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7/go.mod h1:x0nZssQ3qZSnIcePWLvcoFisRXJzcTVvYpAAdYX8+GI=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.13 h1:JRaIgADQS/U6uXDqlPiefP32yXTda7Kqfx+LgspooZM=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.13/go.mod h1:CEuVn5WqOMilYl+tbccq8+N2ieCy0gVn3OtRb0vBNNM=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21 h1:c31//R3xgIJMSC8S6hEVq+38DcvUlgFY0FM6mSI5oto=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21/go.mod h1:r6+pf23ouCB718FUxaqzZdbpYFyDtehyZcmP5KL9FkA=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.21 h1:ZlvrNcHSFFWURB8avufQq9gFsheUgjVD9536obIknfM=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.21/go.mod h1:cv3TNhVrssKR0O/xxLJVRfd2oazSnZnkUeTf6ctUwfQ=
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.3 h1:HwxWTbTrIHm5qY+CAEur0s/figc3qwvLWsNkF4RPToo=
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.3/go.mod h1:uoA43SdFwacedBfSgfFSjjCvYe8aYBS7EnU5GZ/YKMM=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.9 h1:QKZH0S178gCmFEgst8hN0mCX1KxLgHBKKY/CLqwP8lg=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.9/go.mod h1:7yuQJoT+OoH8aqIxw9vwF+8KpvLZ8AWmvmUWHsGQZvI=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.15 h1:lFd1+ZSEYJZYvv9d6kXzhkZu07si3f+GQ1AaYwa2LUM=
Expand Down
9 changes: 9 additions & 0 deletions core/internal/llmtests/chat_completion_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@ func RunChatCompletionStreamTest(t *testing.T, client *bifrost.Bifrost, ctx cont
t.Logf("⚠️ Warning: Response ID is empty")
}

// Per-chunk Object validation: bifrost normalizes every streaming chunk
// to the OpenAI shape with Object="chat.completion.chunk", whether the
// upstream provider natively emits it (OpenAI family) or bifrost
// synthesizes it during translation (e.g., Anthropic's type-keyed events).
// A missing/wrong Object here indicates a provider translation regression.
if response.BifrostChatResponse.Object != "chat.completion.chunk" {
t.Errorf("Chunk %d: Object field must be 'chat.completion.chunk', got %q", responseCount+1, response.BifrostChatResponse.Object)
}

// Log latency for each chunk (can be 0 for inter-chunks)
t.Logf("📊 Chunk %d latency: %d ms", responseCount+1, response.BifrostChatResponse.ExtraFields.Latency)

Expand Down
18 changes: 12 additions & 6 deletions core/internal/llmtests/response_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func ValidateChatResponse(t *testing.T, response *schemas.BifrostChatResponse, e
}

// Validate basic structure
validateChatBasicStructure(t, response, expectations, &result)
validateChatBasicStructure(t, response, expectations, &result, scenarioName)

// Validate content
validateChatContent(t, response, expectations, &result)
Expand Down Expand Up @@ -445,11 +445,17 @@ func ValidateCountTokensResponse(t *testing.T, response *schemas.BifrostCountTok
// =============================================================================

// validateChatBasicStructure checks the basic structure of the chat response
func validateChatBasicStructure(t *testing.T, response *schemas.BifrostChatResponse, expectations ResponseExpectations, result *ValidationResult) {
// Check that Object field is not empty (should be "chat.completion" or "chat.completion.chunk")
if response.Object == "" {
result.Passed = false
result.Errors = append(result.Errors, "Object field is empty in chat completion response")
func validateChatBasicStructure(t *testing.T, response *schemas.BifrostChatResponse, expectations ResponseExpectations, result *ValidationResult, scenarioName string) {
// Object is a constant bifrost schema marker ("chat.completion" / "chat.completion.chunk").
// For streaming scenarios, per-chunk validation in chat_completion_stream.go covers this —
// the aggregated/consolidated response built by the harness is a synthetic construct and
// does not carry provider-originating semantics. Skip the check there to avoid asserting
// that the harness remembered to copy a constant forward.
if !strings.Contains(scenarioName, "Stream") {
if response.Object == "" {
result.Passed = false
result.Errors = append(result.Errors, "Object field is empty in chat completion response")
}
}

// Check choice count
Expand Down
11 changes: 7 additions & 4 deletions core/providers/anthropic/anthropic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
// AnthropicProvider implements the Provider interface for Anthropic's Claude API.
type AnthropicProvider struct {
logger schemas.Logger // Logger for provider operations
client *fasthttp.Client // HTTP client for API requests
client *fasthttp.Client // HTTP client for unary API requests (ReadTimeout bounds overall response)
streamingClient *fasthttp.Client // HTTP client for streaming API requests (no ReadTimeout; idle governed by NewIdleTimeoutReader)
Comment thread
akshaydeo marked this conversation as resolved.
apiVersion string // API version for the provider
networkConfig schemas.NetworkConfig // Network configuration including extra headers
sendBackRawRequest bool // Whether to include raw request in BifrostResponse
Expand Down Expand Up @@ -101,6 +102,7 @@ func NewAnthropicProvider(config *schemas.ProviderConfig, logger schemas.Logger)
client = providerUtils.ConfigureProxy(client, config.ProxyConfig, logger)
client = providerUtils.ConfigureDialer(client)
client = providerUtils.ConfigureTLS(client, config.NetworkConfig, logger)
streamingClient := providerUtils.BuildStreamingClient(client)
// Set default BaseURL if not provided
if config.NetworkConfig.BaseURL == "" {
config.NetworkConfig.BaseURL = "https://api.anthropic.com"
Expand All @@ -110,6 +112,7 @@ func NewAnthropicProvider(config *schemas.ProviderConfig, logger schemas.Logger)
return &AnthropicProvider{
logger: logger,
client: client,
streamingClient: streamingClient,
apiVersion: "2023-06-01",
networkConfig: config.NetworkConfig,
sendBackRawRequest: config.SendBackRawRequest,
Expand Down Expand Up @@ -566,7 +569,7 @@ func (provider *AnthropicProvider) ChatCompletionStream(ctx *schemas.BifrostCont
// Use shared Anthropic streaming logic
return HandleAnthropicChatCompletionStreaming(
ctx,
provider.client,
provider.streamingClient,
provider.buildRequestURL(ctx, "/v1/messages", schemas.ChatCompletionStreamRequest),
jsonData,
headers,
Expand Down Expand Up @@ -1018,7 +1021,7 @@ func (provider *AnthropicProvider) ResponsesStream(ctx *schemas.BifrostContext,

return HandleAnthropicResponsesStream(
ctx,
provider.client,
provider.streamingClient,
provider.buildRequestURL(ctx, "/v1/messages", schemas.ResponsesStreamRequest),
jsonBody,
headers,
Expand Down Expand Up @@ -2622,7 +2625,7 @@ func (provider *AnthropicProvider) PassthroughStream(

fasthttpReq.SetBody(req.Body)

activeClient := providerUtils.PrepareResponseStreaming(ctx, provider.client, resp)
activeClient := providerUtils.PrepareResponseStreaming(ctx, provider.streamingClient, resp)
if err := activeClient.Do(fasthttpReq, resp); err != nil {
providerUtils.ReleaseStreamingResponse(resp)
if errors.Is(err, context.Canceled) {
Expand Down
74 changes: 22 additions & 52 deletions core/providers/anthropic/chat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,72 +378,42 @@ func TestToBifrostChatResponse_MultipleTextBlocksWithThinking(t *testing.T) {
t.Fatal("expected non-nil result")
}

// Content should be a combined string, not blocks
// With multiple text blocks, ToBifrostChatResponse preserves them as ContentBlocks
// (only a single text block collapses to ContentStr — see chat.go:812-815).
// Thinking flows through ReasoningDetails below, not ContentStr.
choice := result.Choices[0]
msg := choice.ChatNonStreamResponseChoice.Message
if msg.Content.ContentBlocks != nil {
t.Error("expected ContentBlocks to be nil (combined into string)")
if msg.Content.ContentStr != nil {
t.Errorf("expected ContentStr to be nil with multiple text blocks, got %q", *msg.Content.ContentStr)
}
if msg.Content.ContentStr == nil {
t.Fatal("expected ContentStr to be non-nil")
if len(msg.Content.ContentBlocks) != 2 {
t.Fatalf("expected 2 content blocks (one per text block), got %d", len(msg.Content.ContentBlocks))
}

// Combined string: thinking first, then text blocks
expected := thinkingText + "\n\n" + textBlock1 + "\n\n" + textBlock2
if *msg.Content.ContentStr != expected {
t.Errorf("expected combined content:\n%s\ngot:\n%s", expected, *msg.Content.ContentStr)
if msg.Content.ContentBlocks[0].Text == nil || *msg.Content.ContentBlocks[0].Text != textBlock1 {
t.Errorf("block 0 text mismatch: got %v, want %q", msg.Content.ContentBlocks[0].Text, textBlock1)
}
if msg.Content.ContentBlocks[1].Text == nil || *msg.Content.ContentBlocks[1].Text != textBlock2 {
t.Errorf("block 1 text mismatch: got %v, want %q", msg.Content.ContentBlocks[1].Text, textBlock2)
}

// Reasoning field should still have thinking text
// Thinking is surfaced via ReasoningDetails with the signature preserved
// (see chat.go:798-807).
if msg.ChatAssistantMessage == nil {
t.Fatal("expected ChatAssistantMessage to be non-nil")
}
if msg.ChatAssistantMessage.Reasoning == nil {
t.Fatal("expected Reasoning to be non-nil")
}

// ReasoningDetails should have: signature-only thinking entry + content blocks boundary
rd := msg.ChatAssistantMessage.ReasoningDetails
if len(rd) < 2 {
t.Fatalf("expected at least 2 reasoning details entries, got %d", len(rd))
if len(rd) != 1 {
t.Fatalf("expected 1 reasoning details entry (the thinking block), got %d", len(rd))
}

// First entry: thinking with signature, no text (text was cleared)
if rd[0].Type != schemas.BifrostReasoningDetailsTypeText {
t.Errorf("expected first reasoning detail type %s, got %s", schemas.BifrostReasoningDetailsTypeText, rd[0].Type)
t.Errorf("expected reasoning detail type %s, got %s", schemas.BifrostReasoningDetailsTypeText, rd[0].Type)
}
if rd[0].Signature == nil || *rd[0].Signature != signature {
t.Error("expected signature to be preserved")
}
if rd[0].Text != nil {
t.Error("expected thinking text to be nil (cleared to avoid duplication)")
}

// Last entry: content blocks boundary
lastRD := rd[len(rd)-1]
if lastRD.Type != schemas.BifrostReasoningDetailsTypeContentBlocks {
t.Errorf("expected last reasoning detail type %s, got %s", schemas.BifrostReasoningDetailsTypeContentBlocks, lastRD.Type)
}
if lastRD.Text == nil {
t.Fatal("expected content blocks metadata to be non-nil")
}

// var meta []contentBlockMeta
// if err := json.Unmarshal([]byte(*lastRD.Text), &meta); err != nil {
// t.Fatalf("failed to unmarshal block metadata: %v", err)
// }
// if len(meta) != 3 {
// t.Fatalf("expected 3 block metadata entries, got %d", len(meta))
// }
// if meta[0].T != "thinking" || meta[0].L != len(thinkingText) {
// t.Errorf("block 0: expected thinking/%d, got %s/%d", len(thinkingText), meta[0].T, meta[0].L)
// }
// if meta[1].T != "text" || meta[1].L != len(textBlock1) {
// t.Errorf("block 1: expected text/%d, got %s/%d", len(textBlock1), meta[1].T, meta[1].L)
// }
// if meta[2].T != "text" || meta[2].L != len(textBlock2) {
// t.Errorf("block 2: expected text/%d, got %s/%d", len(textBlock2), meta[2].T, meta[2].L)
// }
t.Error("expected thinking signature to be preserved on reasoning detail")
}
if rd[0].Text == nil || *rd[0].Text != thinkingText {
t.Errorf("expected reasoning text to match thinking text")
}
}

func TestToBifrostChatResponse_SingleTextBlockNoThinking(t *testing.T) {
Expand Down
25 changes: 14 additions & 11 deletions core/providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ const DefaultAzureScope = "https://cognitiveservices.azure.com/.default"

// AzureProvider implements the Provider interface for Azure's API.
type AzureProvider struct {
logger schemas.Logger // Logger for provider operations
client *fasthttp.Client // HTTP client for API requests
networkConfig schemas.NetworkConfig // Network configuration including extra headers
logger schemas.Logger // Logger for provider operations
client *fasthttp.Client // HTTP client for unary API requests (ReadTimeout bounds overall response)
streamingClient *fasthttp.Client // HTTP client for streaming API requests (no ReadTimeout; idle governed by NewIdleTimeoutReader)
networkConfig schemas.NetworkConfig // Network configuration including extra headers

credentials sync.Map // map of tenant ID:client ID to azcore.TokenCredential
sendBackRawRequest bool // Whether to include raw request in BifrostResponse
Expand Down Expand Up @@ -184,9 +185,11 @@ func NewAzureProvider(config *schemas.ProviderConfig, logger schemas.Logger) (*A
client = providerUtils.ConfigureProxy(client, config.ProxyConfig, logger)
client = providerUtils.ConfigureDialer(client)
client = providerUtils.ConfigureTLS(client, config.NetworkConfig, logger)
streamingClient := providerUtils.BuildStreamingClient(client)
return &AzureProvider{
logger: logger,
client: client,
streamingClient: streamingClient,
Comment thread
akshaydeo marked this conversation as resolved.
networkConfig: config.NetworkConfig,
sendBackRawRequest: config.SendBackRawRequest,
sendBackRawResponse: config.SendBackRawResponse,
Expand Down Expand Up @@ -483,7 +486,7 @@ func (provider *AzureProvider) TextCompletionStream(ctx *schemas.BifrostContext,

return openai.HandleOpenAITextCompletionStreaming(
ctx,
provider.client,
provider.streamingClient,
url,
request,
authHeader,
Expand Down Expand Up @@ -628,7 +631,7 @@ func (provider *AzureProvider) ChatCompletionStream(ctx *schemas.BifrostContext,
// Use shared streaming logic from Anthropic
return anthropic.HandleAnthropicChatCompletionStreaming(
ctx,
provider.client,
provider.streamingClient,
url,
jsonData,
authHeader,
Expand All @@ -655,7 +658,7 @@ func (provider *AzureProvider) ChatCompletionStream(ctx *schemas.BifrostContext,
// Use shared streaming logic from OpenAI
return openai.HandleOpenAIChatCompletionStreaming(
ctx,
provider.client,
provider.streamingClient,
url,
request,
authHeader,
Expand Down Expand Up @@ -781,7 +784,7 @@ func (provider *AzureProvider) ResponsesStream(ctx *schemas.BifrostContext, post
// Use shared streaming logic from Anthropic
return anthropic.HandleAnthropicResponsesStream(
ctx,
provider.client,
provider.streamingClient,
url,
jsonData,
authHeader,
Expand All @@ -804,7 +807,7 @@ func (provider *AzureProvider) ResponsesStream(ctx *schemas.BifrostContext, post
// Use shared streaming logic from OpenAI
return openai.HandleOpenAIResponsesStreaming(
ctx,
provider.client,
provider.streamingClient,
url,
request,
authHeader,
Expand Down Expand Up @@ -1320,7 +1323,7 @@ func (provider *AzureProvider) ImageGenerationStream(
// Azure is OpenAI-compatible
return openai.HandleOpenAIImageGenerationStreaming(
ctx,
provider.client,
provider.streamingClient,
url,
request,
authHeader,
Expand Down Expand Up @@ -1391,7 +1394,7 @@ func (provider *AzureProvider) ImageEditStream(ctx *schemas.BifrostContext, post
// Azure is OpenAI-compatible
return openai.HandleOpenAIImageEditStreamRequest(
ctx,
provider.client,
provider.streamingClient,
url,
request,
authHeader,
Expand Down Expand Up @@ -2797,7 +2800,7 @@ func (provider *AzureProvider) PassthroughStream(

fasthttpReq.SetBody(req.Body)

activeClient := providerUtils.PrepareResponseStreaming(ctx, provider.client, resp)
activeClient := providerUtils.PrepareResponseStreaming(ctx, provider.streamingClient, resp)
providerUtils.SetStreamIdleTimeoutIfEmpty(ctx, provider.networkConfig.StreamIdleTimeoutInSeconds)

startTime := time.Now()
Expand Down
Loading
Loading