diff --git a/core/providers/anthropic/utils.go b/core/providers/anthropic/utils.go index 268a332a2c..473190d405 100644 --- a/core/providers/anthropic/utils.go +++ b/core/providers/anthropic/utils.go @@ -1952,6 +1952,12 @@ func filterEnumValuesByType(enumValues []interface{}, schemaType string) []inter return filtered } +// NormalizeSchemaForAnthropic is the exported entry point for normalizeSchemaForAnthropic, +// used by providers (e.g. Bedrock) that share Anthropic's schema validation rules. +func NormalizeSchemaForAnthropic(schema map[string]interface{}) map[string]interface{} { + return normalizeSchemaForAnthropic(schema) +} + // normalizeSchemaForAnthropic recursively normalizes a JSON schema to be compatible with Anthropic's API. // This handles cases where: // 1. type is an array like ["string", "null"] - converted to single type diff --git a/core/providers/bedrock/bedrock.go b/core/providers/bedrock/bedrock.go index 7e64a13e36..571a02613a 100644 --- a/core/providers/bedrock/bedrock.go +++ b/core/providers/bedrock/bedrock.go @@ -159,6 +159,10 @@ func isStreamTransportError(err error) bool { if errors.Is(err, io.ErrUnexpectedEOF) { return true } + var checksumErr eventstream.ChecksumError + if errors.As(err, &checksumErr) { + return true + } var opErr *net.OpError var dnsErr *net.DNSError return errors.As(err, &opErr) || errors.As(err, &dnsErr) diff --git a/core/providers/bedrock/responses.go b/core/providers/bedrock/responses.go index 41c2fc248c..d66633293f 100644 --- a/core/providers/bedrock/responses.go +++ b/core/providers/bedrock/responses.go @@ -1845,6 +1845,7 @@ func ToBedrockResponsesRequest(ctx *schemas.BifrostContext, bifrostReq *schemas. bedrockReq.AdditionalModelRequestFields = schemas.NewOrderedMap() } setOutputConfigField(bedrockReq.AdditionalModelRequestFields, "format", anthropicOutputFormat) + appendAnthropicBetaToFields(bedrockReq.AdditionalModelRequestFields, anthropic.AnthropicStructuredOutputsBetaHeader) } // Defer synthetic tool injection until after normal tool/tool_choice conversion // so the structured-output tool is not overwritten by the later pass. diff --git a/core/providers/bedrock/utils.go b/core/providers/bedrock/utils.go index 4eb48452a0..4bf9a304e7 100644 --- a/core/providers/bedrock/utils.go +++ b/core/providers/bedrock/utils.go @@ -83,6 +83,10 @@ func convertChatParameters(ctx *schemas.BifrostContext, bifrostReq *schemas.Bifr bedrockReq.AdditionalModelRequestFields = schemas.NewOrderedMap() } setOutputConfigField(bedrockReq.AdditionalModelRequestFields, "format", anthropicOutputFormat) + // The outer HTTP anthropic-beta header is consumed by Bedrock's edge and not forwarded + // to the underlying Claude model, so the beta value must also live in + // additionalModelRequestFields for the model to recognise output_config.format. + appendAnthropicBetaToFields(bedrockReq.AdditionalModelRequestFields, anthropic.AnthropicStructuredOutputsBetaHeader) } // Filter provider-unsupported server tools once; both convertToolConfig and @@ -114,8 +118,8 @@ func convertChatParameters(ctx *schemas.BifrostContext, bifrostReq *schemas.Bifr bedrockReq.AdditionalModelRequestFields = schemas.NewOrderedMap() } bedrockReq.AdditionalModelRequestFields.Set("tools", serverTools) - if len(betaHeaders) > 0 { - bedrockReq.AdditionalModelRequestFields.Set("anthropic_beta", betaHeaders) + for _, h := range betaHeaders { + appendAnthropicBetaToFields(bedrockReq.AdditionalModelRequestFields, h) } // Skip the tunneled tool_choice when response_format forces the synthetic // bf_so_* tool at lines 263-275 below; otherwise Bedrock receives two @@ -479,12 +483,52 @@ func mergeOrderedMapInto(dst, src *schemas.OrderedMap) { } func newAnthropicOutputFormatOrderedMap(schemaObj any) *schemas.OrderedMap { + // Normalize multi-type arrays (["string","null"], ["string","integer"]) into anyOf branches + // so Bedrock's schema validator accepts them. Pure in-memory map ops; no JSON round-trips. + // OrderedMap schemas are passed through unchanged. + if m, ok := schemaObj.(map[string]interface{}); ok { + schemaObj = anthropic.NormalizeSchemaForAnthropic(m) + } return schemas.NewOrderedMapFromPairs( schemas.KV("type", "json_schema"), schemas.KV("schema", schemaObj), ) } +// appendAnthropicBetaToFields merges a single beta header value into +// additionalModelRequestFields.anthropic_beta without creating duplicates. +// This is needed for Bedrock: the outer HTTP anthropic-beta header is consumed +// by Bedrock's edge and NOT forwarded to the underlying Claude model; the value +// must live in additionalModelRequestFields so Bedrock passes it through. +func appendAnthropicBetaToFields(fields *schemas.OrderedMap, header string) { + if fields == nil || header == "" { + return + } + var existing []string + if raw, ok := fields.Get("anthropic_beta"); ok { + switch v := raw.(type) { + case []string: + existing = v + case []interface{}: + for _, item := range v { + if s, ok := item.(string); ok { + existing = append(existing, s) + } + } + case string: + if v != "" { + existing = []string{v} + } + } + } + for _, h := range existing { + if h == header { + return + } + } + fields.Set("anthropic_beta", append(existing, header)) +} + // ensureChatToolConfigForConversation ensures toolConfig is present when tool content exists func ensureChatToolConfigForConversation(bifrostReq *schemas.BifrostChatRequest, bedrockReq *BedrockConverseRequest) { if bedrockReq.ToolConfig != nil { diff --git a/core/schemas/bifrost.go b/core/schemas/bifrost.go index e43ba0ccbf..12ea4b2b07 100644 --- a/core/schemas/bifrost.go +++ b/core/schemas/bifrost.go @@ -315,9 +315,10 @@ type KeyAttemptRecord struct { // RoutingEngineLogEntry represents a log entry from a routing engine // format: [timestamp] [engine] - message type RoutingEngineLogEntry struct { - Engine string // e.g., "governance", "routing-rule", "openrouter" - Message string // Human-readable decision/action message - Timestamp int64 // Unix milliseconds + Engine string `json:"engine"` // e.g., "governance", "routing-rule", "openrouter" + Level LogLevel `json:"level"` + Message string `json:"message"` // Human-readable decision/action message + Timestamp int64 `json:"timestamp"` // Unix milliseconds } // PluginLogEntry represents a structured log entry emitted by a plugin via ctx.Log(). diff --git a/core/schemas/context.go b/core/schemas/context.go index 68ac7c435e..5c2760b065 100644 --- a/core/schemas/context.go +++ b/core/schemas/context.go @@ -359,9 +359,10 @@ func (bc *BifrostContext) GetParentCtxWithUserValues() context.Context { // - ctx: The Bifrost context // - engineName: Name of the routing engine (e.g., "governance", "routing-rule") // - message: Human-readable log message describing the decision/action -func (bc *BifrostContext) AppendRoutingEngineLog(engineName string, message string) { +func (bc *BifrostContext) AppendRoutingEngineLog(engineName string, level LogLevel, message string) { entry := RoutingEngineLogEntry{ Engine: engineName, + Level: level, Message: message, Timestamp: time.Now().UnixMilli(), } diff --git a/plugins/governance/allow_on_all_virtual_keys_test.go b/plugins/governance/allowonallvirtualkeys_test.go similarity index 100% rename from plugins/governance/allow_on_all_virtual_keys_test.go rename to plugins/governance/allowonallvirtualkeys_test.go diff --git a/plugins/governance/http_transport_prehook_test.go b/plugins/governance/httptransportprehook_test.go similarity index 100% rename from plugins/governance/http_transport_prehook_test.go rename to plugins/governance/httptransportprehook_test.go diff --git a/plugins/governance/main.go b/plugins/governance/main.go index 1a244b9ebf..737af98c87 100644 --- a/plugins/governance/main.go +++ b/plugins/governance/main.go @@ -652,12 +652,12 @@ func (p *GovernancePlugin) loadBalanceProvider(ctx *schemas.BifrostContext, req } } - ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Loading balance provider for model %s", modelStr)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Loading balance provider for model %s", modelStr)) // Get provider configs for this virtual key providerConfigs := virtualKey.ProviderConfigs if len(providerConfigs) == 0 { - ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("No provider configs on virtual key %s for model %s, skipping load balancing", virtualKey.Name, modelStr)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelWarn, fmt.Sprintf("No provider configs on virtual key %s for model %s, skipping load balancing", virtualKey.Name, modelStr)) // No provider configs, continue without modification return body, nil } @@ -667,7 +667,7 @@ func (p *GovernancePlugin) loadBalanceProvider(ctx *schemas.BifrostContext, req configuredProviders = append(configuredProviders, pc.Provider) } p.logger.Debug("[Governance] Virtual key has %d provider configs: %v", len(providerConfigs), configuredProviders) - ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Load balancing model %s across %d configured providers: %v", modelStr, len(providerConfigs), configuredProviders)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Load balancing model %s across %d configured providers: %v", modelStr, len(providerConfigs), configuredProviders)) allowedProviderConfigs := make([]configstoreTables.TableVirtualKeyProviderConfig, 0) for _, config := range providerConfigs { @@ -692,16 +692,16 @@ func (p *GovernancePlugin) loadBalanceProvider(ctx *schemas.BifrostContext, req if isProviderAllowed { // Check if the provider's budget or rate limits are violated using resolver helper methods if p.resolver.isProviderBudgetViolated(ctx, virtualKey, config) { - ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Provider %s excluded: budget limit violated", config.Provider)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Provider %s excluded: budget limit violated", config.Provider)) continue } if p.resolver.isProviderRateLimitViolated(ctx, virtualKey, config) { - ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Provider %s excluded: rate limit violated", config.Provider)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Provider %s excluded: rate limit violated", config.Provider)) continue } allowedProviderConfigs = append(allowedProviderConfigs, config) } else { - ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Provider %s excluded: model %s not in allowed models list", config.Provider, modelStr)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Provider %s excluded: model %s not in allowed models list", config.Provider, modelStr)) } } @@ -710,10 +710,10 @@ func (p *GovernancePlugin) loadBalanceProvider(ctx *schemas.BifrostContext, req allowedProviders = append(allowedProviders, pc.Provider) } p.logger.Debug("[Governance] Allowed providers after filtering: %v", allowedProviders) - ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Allowed providers after filtering: %v", allowedProviders)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Allowed providers after filtering: %v", allowedProviders)) if len(allowedProviderConfigs) == 0 { - ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("No eligible providers remaining after filtering for model %s, skipping load balancing", modelStr)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("No eligible providers remaining after filtering for model %s, skipping load balancing", modelStr)) // TODO: Send proper error if (overall VK budget/rate limit) or (all provider budgets/rate limits) are violated // No allowed provider configs, continue without modification return body, nil @@ -755,7 +755,7 @@ func (p *GovernancePlugin) loadBalanceProvider(ctx *schemas.BifrostContext, req } p.logger.Debug("[Governance] Selected provider: %s", selectedProvider) - ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Selected provider %s for model %s (from %d eligible: %v)", selectedProvider, modelStr, len(allowedProviderConfigs), allowedProviders)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Selected provider %s for model %s (from %d eligible: %v)", selectedProvider, modelStr, len(allowedProviderConfigs), allowedProviders)) // For genai integration, model is present in URL path instead of the request body if isGeminiPath { @@ -810,7 +810,7 @@ func (p *GovernancePlugin) loadBalanceProvider(ctx *schemas.BifrostContext, req // Add fallbacks to request body body["fallbacks"] = fallbacks - ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, fmt.Sprintf("Added %d fallback providers: %v", len(fallbacks), fallbacks)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineGovernance, schemas.LogLevelInfo, fmt.Sprintf("Added %d fallback providers: %v", len(fallbacks), fallbacks)) } return body, nil @@ -896,13 +896,13 @@ func (p *GovernancePlugin) applyRoutingRules(ctx *schemas.BifrostContext, req *s p.logger.Debug("[HTTPTransport] Built routing context: provider=%s, model=%s, requestType=%s, vk=%v, headerCount=%d, paramCount=%d", provider, model, requestType, virtualKey != nil, len(req.Headers), len(req.Query)) - ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Evaluating routing rules for model=%s, provider=%s, requestType=%s", model, provider, requestType)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelInfo, fmt.Sprintf("Evaluating routing rules for model=%s, provider=%s, requestType=%s", model, provider, requestType)) // Evaluate routing rules decision, err := p.engine.EvaluateRoutingRules(ctx, routingCtx) if err != nil { p.logger.Error("failed to evaluate routing rules: %v", err) - ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Routing rule evaluation error: %v", err)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelError, fmt.Sprintf("Routing rule evaluation error: %v", err)) return body, nil, nil } diff --git a/plugins/governance/routing.go b/plugins/governance/routing.go index 4df039f12d..4638f70cab 100644 --- a/plugins/governance/routing.go +++ b/plugins/governance/routing.go @@ -3,7 +3,9 @@ package governance import ( "fmt" "math/rand/v2" + "regexp" "strings" + "sync" "github.com/google/cel-go/cel" "github.com/maximhq/bifrost/core/schemas" @@ -102,12 +104,12 @@ func (re *RoutingEngine) EvaluateRoutingRules(ctx *schemas.BifrostContext, routi maxDepth := *re.chainMaxDepth if chainStep >= maxDepth { re.logger.Warn("[RoutingEngine] Routing rule chain exceeded max depth (%d), stopping", maxDepth) - ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Chain exceeded max depth (%d) at step %d, stopping. Final resolved: provider=%s, model=%s", maxDepth, chainStep, currentProvider, currentModel)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelWarn, fmt.Sprintf("Chain exceeded max depth (%d) at step %d, stopping. Final resolved: provider=%s, model=%s", maxDepth, chainStep, currentProvider, currentModel)) break } if chainStep > 0 { - ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Chain step %d: re-evaluating with provider=%s, model=%s", chainStep, currentProvider, currentModel)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelInfo, fmt.Sprintf("Chain step %d: re-evaluating with provider=%s, model=%s", chainStep, currentProvider, currentModel)) } // Build CEL variables for the current chain step's provider/model. @@ -127,7 +129,7 @@ func (re *RoutingEngine) EvaluateRoutingRules(ctx *schemas.BifrostContext, routi scopeChain := buildScopeChain(routingCtx.VirtualKey) re.logger.Debug("[RoutingEngine] Scope chain (step=%d): %v", chainStep, scopeChainToStrings(scopeChain)) if chainStep == 0 { - ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Scope chain: %v", scopeChainToStrings(scopeChain))) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelInfo, fmt.Sprintf("Scope chain: %v", scopeChainToStrings(scopeChain))) } var stepDecision *RoutingDecision @@ -149,7 +151,7 @@ func (re *RoutingEngine) EvaluateRoutingRules(ctx *schemas.BifrostContext, routi for _, r := range rules { ruleNames = append(ruleNames, r.Name) } - ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Evaluating scope %s: %d rules [%s]", scope.ScopeName, len(rules), strings.Join(ruleNames, ", "))) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelInfo, fmt.Sprintf("Evaluating scope %s: %d rules [%s]", scope.ScopeName, len(rules), strings.Join(ruleNames, ", "))) for _, rule := range rules { re.logger.Debug("[RoutingEngine] Evaluating rule: name=%s, expression=%s", rule.Name, rule.CelExpression) @@ -157,28 +159,29 @@ func (re *RoutingEngine) EvaluateRoutingRules(ctx *schemas.BifrostContext, routi program, err := re.store.GetRoutingProgram(ctx, rule) if err != nil { re.logger.Warn("[RoutingEngine] Failed to compile rule %s: %v", rule.Name, err) - ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Rule '%s' skipped: compile error: %v", rule.Name, err)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelError, fmt.Sprintf("Rule '%s' skipped: compile error: %v", rule.Name, err)) continue } matched, err := evaluateCELExpression(program, variables) if err != nil { re.logger.Warn("[RoutingEngine] Failed to evaluate rule %s: %v", rule.Name, err) - ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Rule '%s' skipped: eval error: %v", rule.Name, err)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelError, fmt.Sprintf("Rule '%s' skipped: eval error: %v", rule.Name, err)) continue } re.logger.Debug("[RoutingEngine] Rule %s evaluation result: matched=%v", rule.Name, matched) if !matched { - ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Rule '%s' [%s] → no match", rule.Name, rule.CelExpression)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelInfo, + fmt.Sprintf("Rule '%s' [%s] → no match (%s)", rule.Name, rule.CelExpression, buildNoMatchContext(rule.CelExpression, variables))) continue } target, ok := selectWeightedTarget(rule.Targets) if !ok { re.logger.Debug("[RoutingEngine] Rule %s matched but has no valid targets (empty list or all-negative weights), skipping — note: all-zero weights use uniform selection and would not reach here", rule.Name) - ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Rule '%s' [%s] → matched but no valid targets (empty or all-negative weights), skipping", rule.Name, rule.CelExpression)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelError, fmt.Sprintf("Rule '%s' [%s] → matched but no valid targets (empty or all-negative weights), skipping", rule.Name, rule.CelExpression)) continue } @@ -226,7 +229,7 @@ func (re *RoutingEngine) EvaluateRoutingRules(ctx *schemas.BifrostContext, routi chainSuffix = " [chain_rule=true, continuing]" } re.logger.Debug("[RoutingEngine] Rule matched! Selected target (weight=%.2f): provider=%s, model=%s, fallbacks=%v%s", matchedTargetWeight, stepDecision.Provider, stepDecision.Model, stepDecision.Fallbacks, chainSuffix) - ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Rule '%s' [%s] → matched, selected target (weight=%.2f): provider=%s, model=%s, fallbacks=%v%s", matchedRule.Name, matchedRule.CelExpression, matchedTargetWeight, stepDecision.Provider, stepDecision.Model, stepDecision.Fallbacks, chainSuffix)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelInfo, fmt.Sprintf("Rule '%s' [%s] → matched, selected target (weight=%.2f): provider=%s, model=%s, fallbacks=%v%s", matchedRule.Name, matchedRule.CelExpression, matchedTargetWeight, stepDecision.Provider, stepDecision.Model, stepDecision.Fallbacks, chainSuffix)) // TERMINATION 2: Rule is terminal (chain_rule=false, the default). if !matchedRule.ChainRule { @@ -237,7 +240,7 @@ func (re *RoutingEngine) EvaluateRoutingRules(ctx *schemas.BifrostContext, routi nextState := fmt.Sprintf("%s|%s", stepDecision.Provider, stepDecision.Model) if _, seen := visited[nextState]; seen { re.logger.Debug("[RoutingEngine] Chain cycle detected at step=%d (state=%s already visited), stopping", chainStep, nextState) - ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, fmt.Sprintf("Chain cycle detected at step %d (provider=%s, model=%s already visited), stopping. Final resolved: provider=%s, model=%s", chainStep, stepDecision.Provider, stepDecision.Model, stepDecision.Provider, stepDecision.Model)) + ctx.AppendRoutingEngineLog(schemas.RoutingEngineRoutingRule, schemas.LogLevelInfo, fmt.Sprintf("Chain cycle detected at step %d (provider=%s, model=%s already visited), stopping. Final resolved: provider=%s, model=%s", chainStep, stepDecision.Provider, stepDecision.Model, stepDecision.Provider, stepDecision.Model)) break } visited[nextState] = struct{}{} @@ -348,7 +351,7 @@ func buildScopeChain(virtualKey *configstoreTables.TableVirtualKey) []ScopeLevel } // evaluateCELExpression evaluates a compiled CEL program with given variables -func evaluateCELExpression(program cel.Program, variables map[string]interface{}) (bool, error) { +func evaluateCELExpression(program cel.Program, variables map[string]any) (bool, error) { if program == nil { return false, fmt.Errorf("CEL program is nil") } @@ -469,6 +472,68 @@ func scopeChainToStrings(chain []ScopeLevel) []string { return scopes } +// buildNoMatchContext builds a compact debug string of scalar variables plus +// only the headers/params keys actually referenced in the CEL expression. +func buildNoMatchContext(expr string, variables map[string]any) string { + parts := []string{ + fmt.Sprintf("model=%q", variables["model"]), + fmt.Sprintf("provider=%q", variables["provider"]), + fmt.Sprintf("request_type=%q", variables["request_type"]), + fmt.Sprintf("budget_used=%.1f%%", variables["budget_used"]), + fmt.Sprintf("tokens_used=%.1f%%", variables["tokens_used"]), + fmt.Sprintf("request=%.1f%%", variables["request"]), + } + for _, mapName := range []string{"headers", "params"} { + keys := extractMapKeysFromCEL(expr, mapName) + if len(keys) == 0 { + continue + } + if m, ok := variables[mapName].(map[string]string); ok { + kvs := make([]string, 0, len(keys)) + for _, k := range keys { + if _, exists := m[k]; exists { + kvs = append(kvs, k+"=") + } else { + kvs = append(kvs, k+"=") + } + } + parts = append(parts, mapName+"("+strings.Join(kvs, ", ")+")") + } + } + return strings.Join(parts, ", ") +} + +// celMapKeyRegexCache caches one *regexp.Regexp per mapName to avoid +// recompiling on every call. Lazy and concurrent-safe via sync.Map's +// LoadOrStore atomicity; benign duplicate compiles on first concurrent miss. +var celMapKeyRegexCache sync.Map // map[string]*regexp.Regexp + +// extractMapKeysFromCEL extracts unique map access keys for mapName from a CEL expression. +// Handles mapName["key"], mapName['key'], and mapName.key patterns. +func extractMapKeysFromCEL(expr, mapName string) []string { + v, ok := celMapKeyRegexCache.Load(mapName) + if !ok { + quoted := regexp.QuoteMeta(mapName) + compiled := regexp.MustCompile(quoted + `\["([^"]+)"\]|` + quoted + `\['([^']+)'\]|` + quoted + `\.([a-zA-Z_][a-zA-Z0-9_]*)`) + v, _ = celMapKeyRegexCache.LoadOrStore(mapName, compiled) + } + re := v.(*regexp.Regexp) + seen := map[string]struct{}{} + var keys []string + for _, m := range re.FindAllStringSubmatch(expr, -1) { + for _, cap := range m[1:] { + if cap != "" { + if _, dup := seen[cap]; !dup { + seen[cap] = struct{}{} + keys = append(keys, cap) + } + break + } + } + } + return keys +} + // createCELEnvironment creates a new CEL environment for routing rules func createCELEnvironment() (*cel.Env, error) { return cel.NewEnv( diff --git a/plugins/governance/store_concurrency_test.go b/plugins/governance/storeconcurrency_test.go similarity index 99% rename from plugins/governance/store_concurrency_test.go rename to plugins/governance/storeconcurrency_test.go index 67e1f081ff..f316895426 100644 --- a/plugins/governance/store_concurrency_test.go +++ b/plugins/governance/storeconcurrency_test.go @@ -121,4 +121,3 @@ func TestResetBudgetAt_ConcurrentResettersCollapse(t *testing.T) { assert.Equal(t, 0.0, final.CurrentUsage) assert.True(t, final.LastReset.Equal(newLastReset)) } -