Merged
28 changes: 26 additions & 2 deletions AGENTS.md
@@ -288,10 +288,18 @@ func NewProvider(config schemas.ProviderConfig) (*Provider, error) {
MaxConnsPerHost: config.NetworkConfig.MaxConnsPerHost, // configurable, default 5000
MaxIdleConnDuration: 30 * time.Second,
}
// After ConfigureProxy/ConfigureDialer/ConfigureTLS, build a sibling client
// for streaming. BuildStreamingClient zeros ReadTimeout/WriteTimeout/MaxConnDuration
// so streams aren't killed by fasthttp's whole-response deadline; per-chunk idle
// is enforced at the app layer via NewIdleTimeoutReader.
streamingClient := providerUtils.BuildStreamingClient(client)
return &Provider{client: client, streamingClient: streamingClient, ...}, nil
}
```

**Streaming vs unary client:** Every provider holds two clients — `client` for unary requests (`ReadTimeout=30s` bounds the whole response) and `streamingClient` for SSE / EventStream / chunked paths (`ReadTimeout=0`; the per-chunk `NewIdleTimeoutReader` is the only governor). Pass `provider.streamingClient` to every `Handle*Streaming` / `Handle*StreamRequest` helper and to direct `Do` calls inside `*Stream` methods. For new providers, apply the same pattern — omitting the switch means streams are killed at the 30-second unary deadline.

**Note:** Bedrock uses `net/http` (not fasthttp) with HTTP/2 support. Its `http.Transport` is configured with `ForceAttemptHTTP2: true` and `MaxConnsPerHost` from `NetworkConfig` to allow multiple HTTP/2 connections when the server's per-connection stream limit (100 for AWS Bedrock) is reached. Use `providerUtils.BuildStreamingHTTPClient(client)` to derive the streaming variant — it shares the base `Transport` (safe for concurrent reuse) but clears `Client.Timeout`.

### The Provider Interface

@@ -509,6 +517,21 @@ In `tests/e2e/core/`, **never marshal API payloads to a `Record`/`Map`/plain-obj

## Testing

### Always prefer `make test-core` over raw `go test` for provider-level tests

The `make test-core` target is the canonical harness for provider tests — it wires up env vars from `.env` (provider API keys), invokes the per-provider `{provider}_test.go` entrypoint in `core/providers/<provider>/`, and routes through the shared `core/internal/llmtests/` scenario suite that validates end-to-end behavior (including streaming).

Running bare `go test ./core/providers/<provider>/...` only executes unit tests and skips the llmtests scenarios — so it won't catch regressions in streaming, tool-calling, or provider-specific response shapes.

```bash
make test-core PROVIDER=anthropic TESTCASE=TestChatCompletionStream # exact test
make test-core PROVIDER=openai PATTERN=Stream # substring match
make test-core PROVIDER=bedrock # all scenarios for one provider
make test-core DEBUG=1 PROVIDER=gemini TESTCASE=TestResponsesStream # attach Delve on :2345
```

`PATTERN` and `TESTCASE` are mutually exclusive. Provider name must match a directory under `core/providers/` (e.g. `anthropic`, `openai`, `bedrock`, `vertex`, `azure`, `gemini`, `cohere`, `mistral`, `groq`, etc.).

### LLM Tests (`core/internal/llmtests/`)

Scenario-based tests that run against **live provider APIs** with dual-API testing (Chat Completions + Responses API):
@@ -648,6 +671,7 @@ Systematically address unresolved PR review comments. Uses GraphQL to get unreso
- **Converter functions**: Pure — no side effects, no logging, no HTTP.
- **Pool names**: Descriptive string passed to `pool.New()` (e.g., `"channel-message"`, `"response-stream"`).
- **Context keys**: Use `BifrostContextKey` type. Custom plugins should define their own key types to avoid collisions.
- **Go filenames**: No underscores. The only permitted underscore is the `_test.go` suffix. Examples: `pluginpipeline.go`, `pluginpipeline_test.go` — never `plugin_pipeline.go` or `plugin_pipeline_race_test.go`. Concatenate words (lowercase, no separators) for multi-word filenames.

# Frontend Code Guidelines & Patterns

254 changes: 170 additions & 84 deletions core/bifrost.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions core/go.sum
@@ -33,13 +33,17 @@ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21/go.mod h1:p+hz+PRAYlY
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.5 h1:clHU5fm//kWS1C2HgtgWxfQbFbx4b6rx+5jzhgX9HrI=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.5/go.mod h1:O3h0IK87yXci+kg6flUKzJnWeziQUKciKrLjcatSNcY=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22 h1:rWyie/PxDRIdhNf4DzRk0lvjVOqFJuNnO8WwaIRVxzQ=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22/go.mod h1:zd/JsJ4P7oGfUhXn1VyLqaRZwPmZwg44Jf2dS84Dm3Y=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 h1:5EniKhLZe4xzL7a+fU3C2tfUN4nWIqlLesfrjkuPFTY=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7/go.mod h1:x0nZssQ3qZSnIcePWLvcoFisRXJzcTVvYpAAdYX8+GI=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.13 h1:JRaIgADQS/U6uXDqlPiefP32yXTda7Kqfx+LgspooZM=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.13/go.mod h1:CEuVn5WqOMilYl+tbccq8+N2ieCy0gVn3OtRb0vBNNM=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21 h1:c31//R3xgIJMSC8S6hEVq+38DcvUlgFY0FM6mSI5oto=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21/go.mod h1:r6+pf23ouCB718FUxaqzZdbpYFyDtehyZcmP5KL9FkA=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.21 h1:ZlvrNcHSFFWURB8avufQq9gFsheUgjVD9536obIknfM=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.21/go.mod h1:cv3TNhVrssKR0O/xxLJVRfd2oazSnZnkUeTf6ctUwfQ=
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.3 h1:HwxWTbTrIHm5qY+CAEur0s/figc3qwvLWsNkF4RPToo=
github.com/aws/aws-sdk-go-v2/service/s3 v1.97.3/go.mod h1:uoA43SdFwacedBfSgfFSjjCvYe8aYBS7EnU5GZ/YKMM=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.9 h1:QKZH0S178gCmFEgst8hN0mCX1KxLgHBKKY/CLqwP8lg=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.9/go.mod h1:7yuQJoT+OoH8aqIxw9vwF+8KpvLZ8AWmvmUWHsGQZvI=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.15 h1:lFd1+ZSEYJZYvv9d6kXzhkZu07si3f+GQ1AaYwa2LUM=
9 changes: 9 additions & 0 deletions core/internal/llmtests/chat_completion_stream.go
@@ -164,6 +164,15 @@ func RunChatCompletionStreamTest(t *testing.T, client *bifrost.Bifrost, ctx cont
t.Logf("⚠️ Warning: Response ID is empty")
}

// Per-chunk Object validation: bifrost normalizes every streaming chunk
// to the OpenAI shape with Object="chat.completion.chunk", whether the
// upstream provider natively emits it (OpenAI family) or bifrost
// synthesizes it during translation (e.g., Anthropic's type-keyed events).
// A missing/wrong Object here indicates a provider translation regression.
if response.BifrostChatResponse.Object != "chat.completion.chunk" {
t.Errorf("Chunk %d: Object field must be 'chat.completion.chunk', got %q", responseCount+1, response.BifrostChatResponse.Object)
}

// Log latency for each chunk (can be 0 for inter-chunks)
t.Logf("📊 Chunk %d latency: %d ms", responseCount+1, response.BifrostChatResponse.ExtraFields.Latency)

18 changes: 12 additions & 6 deletions core/internal/llmtests/response_validation.go
@@ -94,7 +94,7 @@ func ValidateChatResponse(t *testing.T, response *schemas.BifrostChatResponse, e
}

// Validate basic structure
validateChatBasicStructure(t, response, expectations, &result, scenarioName)

// Validate content
validateChatContent(t, response, expectations, &result)
@@ -445,11 +445,17 @@ func ValidateCountTokensResponse(t *testing.T, response *schemas.BifrostCountTok
// =============================================================================

// validateChatBasicStructure checks the basic structure of the chat response
func validateChatBasicStructure(t *testing.T, response *schemas.BifrostChatResponse, expectations ResponseExpectations, result *ValidationResult, scenarioName string) {
// Object is a constant bifrost schema marker ("chat.completion" / "chat.completion.chunk").
// For streaming scenarios, per-chunk validation in chat_completion_stream.go covers this —
// the aggregated/consolidated response built by the harness is a synthetic construct and
// does not carry provider-originating semantics. Skip the check there to avoid asserting
// that the harness remembered to copy a constant forward.
if !strings.Contains(scenarioName, "Stream") {
if response.Object == "" {
result.Passed = false
result.Errors = append(result.Errors, "Object field is empty in chat completion response")
}
}

// Check choice count
77 changes: 77 additions & 0 deletions core/pluginpipelinerace_test.go
@@ -0,0 +1,77 @@
package bifrost

import (
"context"
"fmt"
"sync"
"testing"
"time"

schemas "github.com/maximhq/bifrost/core/schemas"
)

// TestPluginPipelineStreamingRace reproduces the production panic:
//
// fatal error: concurrent map read and map write
// (*PluginPipeline).FinalizeStreamingPostHookSpans
//
// It hammers accumulatePluginTiming (per-chunk writer) concurrently with
// FinalizeStreamingPostHookSpans (end-of-stream reader) and resetPluginPipeline
// (pool-release writer). Before the streamingMu fix these three paths had no
// synchronisation and the -race detector / runtime map check would trip
// immediately. Run with: go test -race -run PluginPipelineStreamingRace
func TestPluginPipelineStreamingRace(t *testing.T) {
p := &PluginPipeline{
logger: NewDefaultLogger(schemas.LogLevelError),
tracer: &schemas.NoOpTracer{},
}

const writers = 8
const iterations = 2000

var wg sync.WaitGroup

// Per-chunk accumulator writers — simulate multiple plugins accumulating
// timing for every streamed chunk.
for w := 0; w < writers; w++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
pluginName := fmt.Sprintf("plugin-%d", id%3) // a few distinct plugin keys
for i := 0; i < iterations; i++ {
p.accumulatePluginTiming(pluginName, time.Microsecond, i%17 == 0)
}
}(w)
}

// End-of-stream finalizer racing with writers.
wg.Add(1)
go func() {
defer wg.Done()
ctx := context.Background()
for i := 0; i < iterations/10; i++ {
p.FinalizeStreamingPostHookSpans(ctx)
}
}()

// resetPluginPipeline racing with writers — simulates the pool returning
// the pipeline to another request mid-flight.
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < iterations/10; i++ {
p.resetPluginPipeline()
}
}()

// Concurrent GetChunkCount readers.
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < iterations; i++ {
_ = p.GetChunkCount()
}
}()

wg.Wait()
}