Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/providers/anthropic/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1952,6 +1952,12 @@ func filterEnumValuesByType(enumValues []interface{}, schemaType string) []inter
return filtered
}

// NormalizeSchemaForAnthropic is the exported entry point for normalizeSchemaForAnthropic,
// used by providers (e.g. Bedrock) that share Anthropic's schema validation rules.
func NormalizeSchemaForAnthropic(schema map[string]interface{}) map[string]interface{} {
return normalizeSchemaForAnthropic(schema)
}

// normalizeSchemaForAnthropic recursively normalizes a JSON schema to be compatible with Anthropic's API.
// This handles cases where:
// 1. type is an array like ["string", "null"] - converted to single type
Expand Down
4 changes: 4 additions & 0 deletions core/providers/bedrock/bedrock.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ func isStreamTransportError(err error) bool {
if errors.Is(err, io.ErrUnexpectedEOF) {
return true
}
var checksumErr eventstream.ChecksumError
if errors.As(err, &checksumErr) {
return true
}
var opErr *net.OpError
var dnsErr *net.DNSError
return errors.As(err, &opErr) || errors.As(err, &dnsErr)
Expand Down
1 change: 1 addition & 0 deletions core/providers/bedrock/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -1845,6 +1845,7 @@ func ToBedrockResponsesRequest(ctx *schemas.BifrostContext, bifrostReq *schemas.
bedrockReq.AdditionalModelRequestFields = schemas.NewOrderedMap()
}
setOutputConfigField(bedrockReq.AdditionalModelRequestFields, "format", anthropicOutputFormat)
appendAnthropicBetaToFields(bedrockReq.AdditionalModelRequestFields, anthropic.AnthropicStructuredOutputsBetaHeader)
}
// Defer synthetic tool injection until after normal tool/tool_choice conversion
// so the structured-output tool is not overwritten by the later pass.
Expand Down
48 changes: 46 additions & 2 deletions core/providers/bedrock/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func convertChatParameters(ctx *schemas.BifrostContext, bifrostReq *schemas.Bifr
bedrockReq.AdditionalModelRequestFields = schemas.NewOrderedMap()
}
setOutputConfigField(bedrockReq.AdditionalModelRequestFields, "format", anthropicOutputFormat)
// The outer HTTP anthropic-beta header is consumed by Bedrock's edge and not forwarded
// to the underlying Claude model, so the beta value must also live in
// additionalModelRequestFields for the model to recognise output_config.format.
appendAnthropicBetaToFields(bedrockReq.AdditionalModelRequestFields, anthropic.AnthropicStructuredOutputsBetaHeader)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// Filter provider-unsupported server tools once; both convertToolConfig and
Expand Down Expand Up @@ -114,8 +118,8 @@ func convertChatParameters(ctx *schemas.BifrostContext, bifrostReq *schemas.Bifr
bedrockReq.AdditionalModelRequestFields = schemas.NewOrderedMap()
}
bedrockReq.AdditionalModelRequestFields.Set("tools", serverTools)
if len(betaHeaders) > 0 {
bedrockReq.AdditionalModelRequestFields.Set("anthropic_beta", betaHeaders)
for _, h := range betaHeaders {
appendAnthropicBetaToFields(bedrockReq.AdditionalModelRequestFields, h)
}
// Skip the tunneled tool_choice when response_format forces the synthetic
// bf_so_* tool at lines 263-275 below; otherwise Bedrock receives two
Expand Down Expand Up @@ -479,12 +483,52 @@ func mergeOrderedMapInto(dst, src *schemas.OrderedMap) {
}

func newAnthropicOutputFormatOrderedMap(schemaObj any) *schemas.OrderedMap {
// Normalize multi-type arrays (["string","null"], ["string","integer"]) into anyOf branches
// so Bedrock's schema validator accepts them. Pure in-memory map ops; no JSON round-trips.
// OrderedMap schemas are passed through unchanged.
if m, ok := schemaObj.(map[string]interface{}); ok {
schemaObj = anthropic.NormalizeSchemaForAnthropic(m)
}
return schemas.NewOrderedMapFromPairs(
schemas.KV("type", "json_schema"),
schemas.KV("schema", schemaObj),
)
}

// appendAnthropicBetaToFields merges a single beta header value into
// additionalModelRequestFields.anthropic_beta without creating duplicates.
// This is needed for Bedrock: the outer HTTP anthropic-beta header is consumed
// by Bedrock's edge and NOT forwarded to the underlying Claude model; the value
// must live in additionalModelRequestFields so Bedrock passes it through.
func appendAnthropicBetaToFields(fields *schemas.OrderedMap, header string) {
if fields == nil || header == "" {
return
}
var existing []string
if raw, ok := fields.Get("anthropic_beta"); ok {
switch v := raw.(type) {
case []string:
existing = v
case []interface{}:
for _, item := range v {
if s, ok := item.(string); ok {
existing = append(existing, s)
}
}
case string:
if v != "" {
existing = []string{v}
}
}
}
for _, h := range existing {
if h == header {
return
}
}
fields.Set("anthropic_beta", append(existing, header))
}

// ensureChatToolConfigForConversation ensures toolConfig is present when tool content exists
func ensureChatToolConfigForConversation(bifrostReq *schemas.BifrostChatRequest, bedrockReq *BedrockConverseRequest) {
if bedrockReq.ToolConfig != nil {
Expand Down
7 changes: 4 additions & 3 deletions core/schemas/bifrost.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,10 @@ type KeyAttemptRecord struct {
// RoutingEngineLogEntry represents a log entry from a routing engine
// format: [timestamp] [engine] - message
type RoutingEngineLogEntry struct {
Engine string // e.g., "governance", "routing-rule", "openrouter"
Message string // Human-readable decision/action message
Timestamp int64 // Unix milliseconds
Engine string `json:"engine"` // e.g., "governance", "routing-rule", "openrouter"
Level LogLevel `json:"level"`
Message string `json:"message"` // Human-readable decision/action message
Timestamp int64 `json:"timestamp"` // Unix milliseconds
}
Comment thread
akshaydeo marked this conversation as resolved.

// PluginLogEntry represents a structured log entry emitted by a plugin via ctx.Log().
Expand Down
3 changes: 2 additions & 1 deletion core/schemas/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,10 @@ func (bc *BifrostContext) GetParentCtxWithUserValues() context.Context {
// - ctx: The Bifrost context
// - engineName: Name of the routing engine (e.g., "governance", "routing-rule")
// - message: Human-readable log message describing the decision/action
func (bc *BifrostContext) AppendRoutingEngineLog(engineName string, message string) {
func (bc *BifrostContext) AppendRoutingEngineLog(engineName string, level LogLevel, message string) {
entry := RoutingEngineLogEntry{
Engine: engineName,
Level: level,
Message: message,
Timestamp: time.Now().UnixMilli(),
}
Expand Down
24 changes: 12 additions & 12 deletions plugins/governance/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,12 +652,12 @@ func (p *GovernancePlugin) loadBalanceProvider(ctx *schemas.BifrostContext, req
}
}

ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Loading balance provider for model %s", modelStr))
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Loading balance provider for model %s", modelStr))

// Get provider configs for this virtual key
providerConfigs := virtualKey.ProviderConfigs
if len(providerConfigs) == 0 {
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("No provider configs on virtual key %s for model %s, skipping load balancing", virtualKey.Name, modelStr))
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelWarn, fmt.Sprintf("No provider configs on virtual key %s for model %s, skipping load balancing", virtualKey.Name, modelStr))
// No provider configs, continue without modification
return body, nil
}
Expand All @@ -667,7 +667,7 @@ func (p *GovernancePlugin) loadBalanceProvider(ctx *schemas.BifrostContext, req
configuredProviders = append(configuredProviders, pc.Provider)
}
p.logger.Debug("[Governance] Virtual key has %d provider configs: %v", len(providerConfigs), configuredProviders)
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Load balancing model %s across %d configured providers: %v", modelStr, len(providerConfigs), configuredProviders))
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Load balancing model %s across %d configured providers: %v", modelStr, len(providerConfigs), configuredProviders))

allowedProviderConfigs := make([]configstoreTables.TableVirtualKeyProviderConfig, 0)
for _, config := range providerConfigs {
Expand All @@ -692,16 +692,16 @@ func (p *GovernancePlugin) loadBalanceProvider(ctx *schemas.BifrostContext, req
if isProviderAllowed {
// Check if the provider's budget or rate limits are violated using resolver helper methods
if p.resolver.isProviderBudgetViolated(ctx, virtualKey, config) {
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Provider %s excluded: budget limit violated", config.Provider))
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Provider %s excluded: budget limit violated", config.Provider))
continue
}
if p.resolver.isProviderRateLimitViolated(ctx, virtualKey, config) {
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Provider %s excluded: rate limit violated", config.Provider))
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Provider %s excluded: rate limit violated", config.Provider))
continue
}
allowedProviderConfigs = append(allowedProviderConfigs, config)
} else {
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Provider %s excluded: model %s not in allowed models list", config.Provider, modelStr))
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Provider %s excluded: model %s not in allowed models list", config.Provider, modelStr))
}
}

Expand All @@ -710,10 +710,10 @@ func (p *GovernancePlugin) loadBalanceProvider(ctx *schemas.BifrostContext, req
allowedProviders = append(allowedProviders, pc.Provider)
}
p.logger.Debug("[Governance] Allowed providers after filtering: %v", allowedProviders)
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Allowed providers after filtering: %v", allowedProviders))
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Allowed providers after filtering: %v", allowedProviders))

if len(allowedProviderConfigs) == 0 {
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("No eligible providers remaining after filtering for model %s, skipping load balancing", modelStr))
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("No eligible providers remaining after filtering for model %s, skipping load balancing", modelStr))
// TODO: Send proper error if (overall VK budget/rate limit) or (all provider budgets/rate limits) are violated
// No allowed provider configs, continue without modification
return body, nil
Expand Down Expand Up @@ -755,7 +755,7 @@ func (p *GovernancePlugin) loadBalanceProvider(ctx *schemas.BifrostContext, req
}

p.logger.Debug("[Governance] Selected provider: %s", selectedProvider)
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Selected provider %s for model %s (from %d eligible: %v)", selectedProvider, modelStr, len(allowedProviderConfigs), allowedProviders))
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Selected provider %s for model %s (from %d eligible: %v)", selectedProvider, modelStr, len(allowedProviderConfigs), allowedProviders))

// For genai integration, model is present in URL path instead of the request body
if isGeminiPath {
Expand Down Expand Up @@ -810,7 +810,7 @@ func (p *GovernancePlugin) loadBalanceProvider(ctx *schemas.BifrostContext, req

// Add fallbacks to request body
body["fallbacks"] = fallbacks
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Added %d fallback providers: %v", len(fallbacks), fallbacks))
ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Added %d fallback providers: %v", len(fallbacks), fallbacks))
}

return body, nil
Expand Down Expand Up @@ -896,13 +896,13 @@ func (p *GovernancePlugin) applyRoutingRules(ctx *schemas.BifrostContext, req *s

p.logger.Debug("[HTTPTransport] Built routing context: provider=%s, model=%s, requestType=%s, vk=%v, headerCount=%d, paramCount=%d",
provider, model, requestType, virtualKey != nil, len(req.Headers), len(req.Query))
ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Evaluating routing rules for model=%s, provider=%s, requestType=%s", model, provider, requestType))
ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelInfo, fmt.Sprintf("Evaluating routing rules for model=%s, provider=%s, requestType=%s", model, provider, requestType))

// Evaluate routing rules
decision, err := p.engine.EvaluateRoutingRules(ctx, routingCtx)
if err != nil {
p.logger.Error("failed to evaluate routing rules: %v", err)
ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Routing rule evaluation error: %v", err))
ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelError, fmt.Sprintf("Routing rule evaluation error: %v", err))
return body, nil, nil
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

Expand Down
Loading
Loading