diff --git a/.github/workflows/helm-release.yml b/.github/workflows/helm-release.yml index bfeb83bb39..599efb3af6 100644 --- a/.github/workflows/helm-release.yml +++ b/.github/workflows/helm-release.yml @@ -126,4 +126,4 @@ jobs: keep_files: false enable_jekyll: false user_name: "github-actions[bot]" - user_email: "github-actions[bot]@users.noreply.github.com" + user_email: "github-actions[bot]@users.noreply.github.com" \ No newline at end of file diff --git a/.github/workflows/scripts/validate-schema-sync.sh b/.github/workflows/scripts/validate-schema-sync.sh index 0214d0679b..83dd285f0d 100755 --- a/.github/workflows/scripts/validate-schema-sync.sh +++ b/.github/workflows/scripts/validate-schema-sync.sh @@ -60,4 +60,4 @@ echo "==================================================================" --schema "$REPO_ROOT/transports/config.schema.json" \ --pkg-root "$REPO_ROOT" \ --helm-values "$REPO_ROOT/helm-charts/bifrost/values.schema.json" \ - --helm-helpers "$REPO_ROOT/helm-charts/bifrost/templates/_helpers.tpl" + --helm-helpers "$REPO_ROOT/helm-charts/bifrost/templates/_helpers.tpl" \ No newline at end of file diff --git a/core/schemas/utils.go b/core/schemas/utils.go index 5e61c84ade..2669f271e0 100644 --- a/core/schemas/utils.go +++ b/core/schemas/utils.go @@ -400,6 +400,51 @@ func SafeExtractInt(value interface{}) (int, bool) { } } +// SafeExtractInt64 safely extracts an int64 value from an any with type checking +func SafeExtractInt64(value any) (int64, bool) { + if value == nil { + return 0, false + } + switch v := value.(type) { + case int: + return int64(v), true + case int8: + return int64(v), true + case int16: + return int64(v), true + case int32: + return int64(v), true + case int64: + return v, true + case uint: + return int64(v), true + case uint8: + return int64(v), true + case uint16: + return int64(v), true + case uint32: + return int64(v), true + case uint64: + return int64(v), true + case float32: + return int64(v), true + case float64: + return int64(v), true + case json.Number: + if intVal, err := v.Int64(); err == nil { + return int64(intVal), true + } + return 0, false + case string: + if intVal, err := strconv.ParseInt(v, 10, 64); err == nil { + return intVal, true + } + return 0, false + default: + return 0, false + } +} + // SafeExtractFloat64 safely extracts a float64 value from an interface{} with type checking func SafeExtractFloat64(value interface{}) (float64, bool) { if value == nil { @@ -835,8 +880,8 @@ func DeepCopyChatTool(original ChatTool) ChatTool { if original.Function.Parameters != nil { copyParams := &ToolFunctionParameters{ - Type: original.Function.Parameters.Type, - keyOrder: original.Function.Parameters.keyOrder, + Type: original.Function.Parameters.Type, + keyOrder: original.Function.Parameters.keyOrder, explicitEmptyObject: original.Function.Parameters.explicitEmptyObject, } diff --git a/framework/configstore/migrations.go b/framework/configstore/migrations.go index a0352382f8..8096b9161d 100644 --- a/framework/configstore/migrations.go +++ b/framework/configstore/migrations.go @@ -398,6 +398,9 @@ func triggerMigrations(ctx context.Context, db *gorm.DB) error { if err := migrationNormalizeOtelTraceType(ctx, db); err != nil { return err } + if err := migrationMigrateOtelConfigToProfiles(ctx, db); err != nil { + return err + } return nil } @@ -6165,7 +6168,6 @@ func migrationAddMCPClientDiscoveredToolsColumns(ctx context.Context, db *gorm.D }}) if err := m.Migrate(); err != nil { return fmt.Errorf("error running add_mcp_client_discovered_tools_columns migration: %s", err.Error()) - } return nil } @@ -6275,7 +6277,6 @@ func migrationAddFlexTierPricingColumns(ctx context.Context, db *gorm.DB) error }}) if err := m.Migrate(); err != nil { return fmt.Errorf("error while running flex tier pricing columns migration: %s", err.Error()) - } return nil } @@ -6539,7 +6540,56 @@ func migrationNormalizeOtelTraceType(ctx context.Context, db *gorm.DB) error { }}) if err := m.Migrate(); err != nil { return fmt.Errorf("error running normalize_otel_trace_type migration: %s", err.Error()) + } + return nil +} +// migrationMigrateOtelConfigToProfiles wraps the legacy single-collector OTEL +// plugin config into the new multi-profile shape. Old: {collector_url, protocol, ...}. +// New: {profiles: [{collector_url, protocol, ...}]}. No-op if the row is already +// in the new shape or the plugin is not configured. +func migrationMigrateOtelConfigToProfiles(ctx context.Context, db *gorm.DB) error { + m := migrator.New(db, migrator.DefaultOptions, []*migrator.Migration{{ + ID: "migrate_otel_config_to_profiles", + Migrate: func(tx *gorm.DB) error { + tx = tx.WithContext(ctx) + var plugin tables.TablePlugin + err := tx.Where("name = ?", "otel").First(&plugin).Error + if err != nil { + if err == gorm.ErrRecordNotFound { + return nil + } + return fmt.Errorf("failed to load otel plugin row: %w", err) + } + // AfterFind already decrypted ConfigJSON and unmarshaled into Config. + cfgMap, ok := plugin.Config.(map[string]any) + if !ok || len(cfgMap) == 0 { + return nil + } + // Already migrated. + if _, hasProfiles := cfgMap["profiles"]; hasProfiles { + return nil + } + // Old shape has collector_url at top level. If absent, nothing to migrate. + if _, hasCollector := cfgMap["collector_url"]; !hasCollector { + return nil + } + plugin.Config = map[string]any{ + "profiles": []any{cfgMap}, + } + // Force BeforeSave to re-serialize + re-encrypt. + plugin.ConfigJSON = "" + plugin.EncryptionStatus = tables.EncryptionStatusPlainText + if err := tx.Save(&plugin).Error; err != nil { + return fmt.Errorf("failed to save migrated otel config: %w", err) + } + log.Printf("[Migration] Wrapped legacy otel config into profiles[0]") + return nil + }, + Rollback: func(tx *gorm.DB) error { return nil }, + }}) + if err := m.Migrate(); err != nil { + return fmt.Errorf("error running migrate_otel_config_to_profiles migration: %s", err.Error()) } return nil } diff --git a/plugins/otel/changelog.md b/plugins/otel/changelog.md index e69de29bb2..a33eb08e1b 100644 --- a/plugins/otel/changelog.md +++ b/plugins/otel/changelog.md @@ -0,0 +1 @@ +- feat: adds support for multiple otel profiles \ No newline at end of file diff --git a/plugins/otel/converter.go b/plugins/otel/converter.go index 9decd7c01f..eba2811be4 100644 --- a/plugins/otel/converter.go +++ b/plugins/otel/converter.go @@ -70,10 +70,10 @@ func hexToBytes(hexStr string, length int) []byte { } // convertTraceToResourceSpan converts a Bifrost trace to OTEL ResourceSpan -func (p *OtelPlugin) convertTraceToResourceSpan(trace *schemas.Trace) *ResourceSpan { +func (p *OtelProfile) convertTraceToResourceSpan(trace *schemas.Trace) *ResourceSpan { otelSpans := make([]*Span, 0, len(trace.Spans)) for _, span := range trace.Spans { - otelSpans = append(otelSpans, p.convertSpanToOTELSpan(trace.TraceID, span)) + otelSpans = append(otelSpans, convertSpanToOTELSpan(trace.TraceID, span)) } return &ResourceSpan{ @@ -81,14 +81,14 @@ func (p *OtelPlugin) convertTraceToResourceSpan(trace *schemas.Trace) *ResourceS Attributes: p.getResourceAttributes(), }, ScopeSpans: []*ScopeSpan{{ - Scope: p.getInstrumentationScope(), - Spans: otelSpans, + Scope: p.getInstrumentationScope(), + Spans: otelSpans, }}, } } // convertSpanToOTELSpan converts a single Bifrost span to OTEL format -func (p *OtelPlugin) convertSpanToOTELSpan(traceID string, span *schemas.Span) *Span { +func convertSpanToOTELSpan(traceID string, span *schemas.Span) *Span { otelSpan := &Span{ TraceId: hexToBytes(traceID, 16), SpanId: hexToBytes(span.SpanID, 8), @@ -110,7 +110,7 @@ func (p *OtelPlugin) convertSpanToOTELSpan(traceID string, span *schemas.Span) * } // getResourceAttributes returns the resource attributes for the OTEL span -func (p *OtelPlugin) getResourceAttributes() []*KeyValue { +func (p *OtelProfile) getResourceAttributes() []*KeyValue { attrs := []*KeyValue{ kvStr("service.name", p.serviceName), kvStr("service.version", p.bifrostVersion), @@ -123,7 +123,7 @@ func (p *OtelPlugin) getResourceAttributes() []*KeyValue { } // getInstrumentationScope returns the instrumentation scope for OTEL -func (p *OtelPlugin) getInstrumentationScope() *commonpb.InstrumentationScope { +func (p *OtelProfile) getInstrumentationScope() *commonpb.InstrumentationScope { return &commonpb.InstrumentationScope{ Name: p.serviceName, Version: p.bifrostVersion, @@ -156,22 +156,18 @@ func anyToKeyValue(key string, value any) *KeyValue { return nil } return kvStr(key, v) - case int: - return kvInt(key, int64(v)) - case int32: - return kvInt(key, int64(v)) - case int64: - return kvInt(key, v) - case uint: - return kvInt(key, int64(v)) - case uint32: - return kvInt(key, int64(v)) - case uint64: - return kvInt(key, int64(v)) - case float32: - return kvDbl(key, float64(v)) - case float64: - return kvDbl(key, v) + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: + intValue, ok := schemas.SafeExtractInt64(v) + if !ok { + return nil + } + return kvInt(key, intValue) + case float32, float64: + floatValue, ok := schemas.SafeExtractFloat64(v) + if !ok { + return nil + } + return kvDbl(key, floatValue) case bool: return kvBool(key, v) case []string: @@ -184,32 +180,29 @@ func anyToKeyValue(key string, value any) *KeyValue { } return kvAny(key, arrValue(vals...)) case []int: - if len(v) == 0 { - return nil - } - vals := make([]*AnyValue, len(v)) - for i, n := range v { - vals[i] = &AnyValue{Value: &IntValue{IntValue: int64(n)}} - } - return kvAny(key, arrValue(vals...)) + return intSliceToKeyValue(key, v) + case []int8: + return intSliceToKeyValue(key, v) + case []int16: + return intSliceToKeyValue(key, v) + case []int32: + return intSliceToKeyValue(key, v) case []int64: - if len(v) == 0 { - return nil - } - vals := make([]*AnyValue, len(v)) - for i, n := range v { - vals[i] = &AnyValue{Value: &IntValue{IntValue: n}} - } - return kvAny(key, arrValue(vals...)) + return intSliceToKeyValue(key, v) + case []uint: + return intSliceToKeyValue(key, v) + case []uint8: + return intSliceToKeyValue(key, v) + case []uint16: + return intSliceToKeyValue(key, v) + case []uint32: + return intSliceToKeyValue(key, v) + case []uint64: + return intSliceToKeyValue(key, v) + case []float32: + return floatSliceToAttribute(key, v) case []float64: - if len(v) == 0 { - return nil - } - vals := make([]*AnyValue, len(v)) - for i, n := range v { - vals[i] = &AnyValue{Value: &DoubleValue{DoubleValue: n}} - } - return kvAny(key, arrValue(vals...)) + return floatSliceToAttribute(key, v) case map[string]any: if len(v) == 0 { return nil @@ -228,6 +221,30 @@ func anyToKeyValue(key string, value any) *KeyValue { } } +func intSliceToKeyValue[T int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64](key string, slice []T) *KeyValue { + if len(slice) == 0 { + return nil + } + vals := make([]*AnyValue, len(slice)) + for i, n := range slice { + iv, _ := schemas.SafeExtractInt64(n) + vals[i] = &AnyValue{Value: &IntValue{IntValue: iv}} + } + return kvAny(key, arrValue(vals...)) +} + +func floatSliceToAttribute[T float32 | float64](key string, slice []T) *KeyValue { + if len(slice) == 0 { + return nil + } + vals := make([]*AnyValue, len(slice)) + for i, n := range slice { + fv, _ := schemas.SafeExtractFloat64(n) + vals[i] = &AnyValue{Value: &DoubleValue{DoubleValue: fv}} + } + return kvAny(key, arrValue(vals...)) +} + // convertSpanKind maps Bifrost SpanKind to OTEL SpanKind func convertSpanKind(kind schemas.SpanKind) tracepb.Span_SpanKind { switch kind { diff --git a/plugins/otel/go.mod b/plugins/otel/go.mod index 7595e08f75..c1641a7591 100644 --- a/plugins/otel/go.mod +++ b/plugins/otel/go.mod @@ -157,7 +157,7 @@ require ( ) require ( - github.com/bytedance/sonic v1.15.0 + github.com/bytedance/sonic v1.15.0 // indirect github.com/cloudwego/base64x v0.1.6 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect diff --git a/plugins/otel/grpc.go b/plugins/otel/grpc.go index 6cdd5f9a08..082b8d176a 100644 --- a/plugins/otel/grpc.go +++ b/plugins/otel/grpc.go @@ -2,10 +2,6 @@ package otel import ( "context" - "crypto/tls" - "crypto/x509" - "fmt" - "os" collectorpb "go.opentelemetry.io/proto/otlp/collector/trace/v1" "google.golang.org/grpc" @@ -24,33 +20,14 @@ type OtelClientGRPC struct { // NewOtelClientGRPC creates a new OpenTelemetry client for gRPC func NewOtelClientGRPC(endpoint string, headers map[string]string, tlsCACert string, insecureMode bool) (*OtelClientGRPC, error) { var creds credentials.TransportCredentials - // TLS priority: custom CA > system roots > insecure - if tlsCACert != "" { - // Validate the CA cert path to prevent path traversal attacks - if err := validateCACertPath(tlsCACert); err != nil { - return nil, err - } - // Use custom CA certificate with MinVersion - caCert, err := os.ReadFile(tlsCACert) - if err != nil { - return nil, fmt.Errorf("fail to load provided CA cert: %w", err) - } - caCertPool := x509.NewCertPool() - if !caCertPool.AppendCertsFromPEM(caCert) { - return nil, fmt.Errorf("fail to parse provided CA cert") - } - tlsConfig := &tls.Config{ - RootCAs: caCertPool, - MinVersion: tls.VersionTLS12, - } - creds = credentials.NewTLS(tlsConfig) - } else if insecureMode { - // Skip TLS entirely + // gRPC insecure mode uses plaintext (no TLS at all), not just skip-verify. + // buildTLSConfig is bypassed here to preserve that behaviour. + if tlsCACert == "" && insecureMode { creds = insecure.NewCredentials() } else { - // Use system root CAs with MinVersion - tlsConfig := &tls.Config{ - MinVersion: tls.VersionTLS12, + tlsConfig, err := buildTLSConfig(tlsCACert, false) + if err != nil { + return nil, err } creds = credentials.NewTLS(tlsConfig) } diff --git a/plugins/otel/http.go b/plugins/otel/http.go index 08ba89284d..9cab25fb23 100644 --- a/plugins/otel/http.go +++ b/plugins/otel/http.go @@ -3,12 +3,9 @@ package otel import ( "bytes" "context" - "crypto/tls" - "crypto/x509" "fmt" "io" "net/http" - "os" "strings" "time" @@ -30,35 +27,11 @@ func NewOtelClientHTTP(endpoint string, headers map[string]string, tlsCACert str transport.MaxIdleConnsPerHost = 10 transport.IdleConnTimeout = 120 * time.Second - // TLS priority: custom CA > system roots > insecure - if tlsCACert != "" { - // Validate the CA cert path to prevent path traversal attacks - if err := validateCACertPath(tlsCACert); err != nil { - return nil, err - } - caCert, err := os.ReadFile(tlsCACert) - if err != nil { - return nil, fmt.Errorf("fail to load provided CA cert: %w", err) - } - caCertPool := x509.NewCertPool() - if !caCertPool.AppendCertsFromPEM(caCert) { - return nil, fmt.Errorf("fail to add provided CA cert") - } - transport.TLSClientConfig = &tls.Config{ - RootCAs: caCertPool, - MinVersion: tls.VersionTLS12, - } - } else if insecureMode { - transport.TLSClientConfig = &tls.Config{ - InsecureSkipVerify: true, // #nosec G402 - MinVersion: tls.VersionTLS12, - } - } else { - // Use system root CAs with MinVersion - transport.TLSClientConfig = &tls.Config{ - MinVersion: tls.VersionTLS12, - } + tlsConfig, err := buildTLSConfig(tlsCACert, insecureMode) + if err != nil { + return nil, err } + transport.TLSClientConfig = tlsConfig return &OtelClientHTTP{client: &http.Client{ Timeout: 30 * time.Second, diff --git a/plugins/otel/main.go b/plugins/otel/main.go index faa32944e9..1f1cc57a6e 100644 --- a/plugins/otel/main.go +++ b/plugins/otel/main.go @@ -10,15 +10,14 @@ import ( "github.com/bytedance/sonic" "github.com/maximhq/bifrost/core/schemas" "github.com/maximhq/bifrost/framework/modelcatalog" - "go.opentelemetry.io/otel/attribute" commonpb "go.opentelemetry.io/proto/otlp/common/v1" ) -// logger is the logger for the OTEL plugin +// logger is the package-level logger, set once in Init. var logger schemas.Logger -// OTELResponseAttributesEnvKey is the environment variable key for the OTEL resource attributes -// We check if this is present in the environment variables and if so, we will use it to set the attributes for all spans at the resource level +// OTELResponseAttributesEnvKey is the environment variable key for the OTEL resource attributes. +// If set, its key=value pairs are attached to every span at the resource level. const OTELResponseAttributesEnvKey = "OTEL_RESOURCE_ATTRIBUTES" const PluginName = "otel" @@ -26,32 +25,30 @@ const PluginName = "otel" // TraceType is the type of trace to use for the OTEL collector type TraceType string -// TraceTypeGenAIExtension is the type of trace to use for the OTEL collector -const TraceTypeGenAIExtension TraceType = "genai_extension" - -// TraceTypeVercel is the type of trace to use for the OTEL collector -const TraceTypeVercel TraceType = "vercel" - -// TraceTypeOpenInference is the type of trace to use for the OTEL collector -const TraceTypeOpenInference TraceType = "open_inference" +const ( + TraceTypeGenAIExtension TraceType = "genai_extension" + TraceTypeVercel TraceType = "vercel" + TraceTypeOpenInference TraceType = "open_inference" +) // Protocol is the protocol to use for the OTEL collector type Protocol string -// ProtocolHTTP is the default protocol -const ProtocolHTTP Protocol = "http" - -// ProtocolGRPC is the second protocol -const ProtocolGRPC Protocol = "grpc" +const ( + ProtocolHTTP Protocol = "http" + ProtocolGRPC Protocol = "grpc" +) -type Config struct { +// OtelProfileConfig is the per-collector configuration. +type OtelProfileConfig struct { + Enabled *bool `json:"enabled,omitempty"` // nil or true = enabled; false = skip during export ServiceName string `json:"service_name"` CollectorURL string `json:"collector_url"` Headers map[string]string `json:"headers"` TraceType TraceType `json:"trace_type"` Protocol Protocol `json:"protocol"` TLSCACert string `json:"tls_ca_cert"` - Insecure bool `json:"insecure"` // Skip TLS when true; ignored if TLSCACert is set. Defaults to true when omitted. + Insecure bool `json:"insecure"` // Skip TLS when true; ignored if TLSCACert is set // Metrics push configuration MetricsEnabled bool `json:"metrics_enabled"` @@ -62,8 +59,8 @@ type Config struct { // UnmarshalJSON applies field defaults that the zero-value wouldn't capture. // Specifically, Insecure defaults to true when the key is omitted so http:// // collectors work out-of-the-box without forcing users to set it explicitly. -func (c *Config) UnmarshalJSON(data []byte) error { - type alias Config +func (c *OtelProfileConfig) UnmarshalJSON(data []byte) error { + type alias OtelProfileConfig aux := struct { Insecure *bool `json:"insecure"` *alias @@ -81,32 +78,37 @@ func (c *Config) UnmarshalJSON(data []byte) error { return nil } +// Config holds one or more collector profiles. +type Config struct { + Profiles []*OtelProfileConfig +} + +// OtelProfile holds the runtime state for a single collector destination: +// its client, optional metrics exporter, and the metadata used to build +// OTEL resource/scope attributes on every emitted span. +type OtelProfile struct { + serviceName string + url string + headers map[string]string + traceType TraceType + protocol Protocol + bifrostVersion string + attributesFromEnvironment []*commonpb.KeyValue + client OtelClient + metricsExporter *MetricsExporter +} + // OtelPlugin is the plugin for OpenTelemetry. -// It implements the ObservabilityPlugin interface to receive completed traces -// from the tracing middleware and forward them to an OTEL collector. +// It implements ObservabilityPlugin and fans traces out to every configured profile. type OtelPlugin struct { ctx context.Context cancel context.CancelFunc - serviceName string - url string - headers map[string]string - traceType TraceType - protocol Protocol - - bifrostVersion string - - attributesFromEnvironment []*commonpb.KeyValue - - client OtelClient - + profiles []*OtelProfile pricingManager *modelcatalog.ModelCatalog - - // Metrics push support - metricsExporter *MetricsExporter } -// Init function for the OTEL plugin +// Init creates the plugin, initialising one client (and optional metrics exporter) per profile. func Init(ctx context.Context, config *Config, _logger schemas.Logger, pricingManager *modelcatalog.ModelCatalog, bifrostVersion string) (*OtelPlugin, error) { if config == nil { return nil, fmt.Errorf("config is required") @@ -115,95 +117,137 @@ func Init(ctx context.Context, config *Config, _logger schemas.Logger, pricingMa if pricingManager == nil { logger.Warn("otel plugin requires model catalog to calculate cost, all cost calculations will be skipped.") } - var err error - // If headers are present, and any of them start with env., we will replace the value with the environment variable - if config.Headers != nil { - for key, value := range config.Headers { - if newValue, ok := strings.CutPrefix(value, "env."); ok { - config.Headers[key] = os.Getenv(newValue) - if config.Headers[key] == "" { - logger.Warn("environment variable %s not found", newValue) - return nil, fmt.Errorf("environment variable %s not found", newValue) - } - } - } - } - if config.ServiceName == "" { - config.ServiceName = "bifrost" + if len(config.Profiles) == 0 { + return nil, fmt.Errorf("at least one profile is required") } - // Loading attributes from environment - attributesFromEnvironment := make([]*commonpb.KeyValue, 0) - if attributes, ok := os.LookupEnv(OTELResponseAttributesEnvKey); ok { - // We will split the attributes by , and then split each attribute by = - for attribute := range strings.SplitSeq(attributes, ",") { - attributeParts := strings.Split(strings.TrimSpace(attribute), "=") - if len(attributeParts) == 2 { - attributesFromEnvironment = append(attributesFromEnvironment, kvStr(strings.TrimSpace(attributeParts[0]), strings.TrimSpace(attributeParts[1]))) - } + + attributesFromEnvironment := loadEnvAttributes() + + profiles := make([]*OtelProfile, 0, len(config.Profiles)) + for i, profileCfg := range config.Profiles { + if profileCfg == nil { + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d]: config is required", i) } - } - // Preparing the plugin - p := &OtelPlugin{ - serviceName: config.ServiceName, - url: config.CollectorURL, - traceType: config.TraceType, - headers: config.Headers, - protocol: config.Protocol, - pricingManager: pricingManager, - bifrostVersion: bifrostVersion, - attributesFromEnvironment: attributesFromEnvironment, - } - p.ctx, p.cancel = context.WithCancel(ctx) - if config.Protocol == ProtocolGRPC { - p.client, err = NewOtelClientGRPC(config.CollectorURL, config.Headers, config.TLSCACert, config.Insecure) - if err != nil { - return nil, err + if profileCfg.Enabled != nil && !*profileCfg.Enabled { + continue } - } - if config.Protocol == ProtocolHTTP { - p.client, err = NewOtelClientHTTP(config.CollectorURL, config.Headers, config.TLSCACert, config.Insecure) - if err != nil { - return nil, err + if err := injectEnvToHeaders(profileCfg.Headers); err != nil { + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d]: %w", i, err) } - } - if p.client == nil { - return nil, fmt.Errorf("otel client is not initialized. invalid protocol type") - } - - // Initialize metrics exporter if enabled - if config.MetricsEnabled { - if config.MetricsEndpoint == "" { - return nil, fmt.Errorf("metrics_endpoint is required when metrics_enabled is true") + if profileCfg.CollectorURL == "" { + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d]: collector_url is required", i) + } + if profileCfg.ServiceName == "" { + profileCfg.ServiceName = "bifrost" } - pushInterval := config.MetricsPushInterval - if pushInterval <= 0 { - pushInterval = 15 // default 15 seconds - } else if pushInterval > 300 { - return nil, fmt.Errorf("metrics_push_interval must be between 1 and 300 seconds, got %d", pushInterval) + if profileCfg.TraceType == "" { + profileCfg.TraceType = TraceTypeGenAIExtension } - metricsConfig := &MetricsConfig{ - ServiceName: config.ServiceName, - Endpoint: config.MetricsEndpoint, - Headers: config.Headers, - Protocol: config.Protocol, - TLSCACert: config.TLSCACert, - Insecure: config.Insecure, - PushInterval: pushInterval, + if profileCfg.Protocol == "" { + profileCfg.Protocol = ProtocolHTTP + } + + var ( + client OtelClient + err error + ) + switch profileCfg.Protocol { + case ProtocolGRPC: + client, err = NewOtelClientGRPC(profileCfg.CollectorURL, profileCfg.Headers, profileCfg.TLSCACert, profileCfg.Insecure) + case ProtocolHTTP: + client, err = NewOtelClientHTTP(profileCfg.CollectorURL, profileCfg.Headers, profileCfg.TLSCACert, profileCfg.Insecure) + default: + err = fmt.Errorf("unsupported protocol: %s", profileCfg.Protocol) } - p.metricsExporter, err = NewMetricsExporter(p.ctx, metricsConfig) if err != nil { - // Clean up trace client if metrics exporter fails - if p.client != nil { - p.client.Close() + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d] (%s): %w", i, profileCfg.ServiceName, err) + } + + profile := &OtelProfile{ + serviceName: profileCfg.ServiceName, + url: profileCfg.CollectorURL, + headers: profileCfg.Headers, + traceType: profileCfg.TraceType, + protocol: profileCfg.Protocol, + bifrostVersion: bifrostVersion, + attributesFromEnvironment: attributesFromEnvironment, + client: client, + } + + if profileCfg.MetricsEnabled { + if profileCfg.MetricsEndpoint == "" { + _ = client.Close() + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d] (%s): metrics_endpoint is required when metrics_enabled is true", i, profileCfg.ServiceName) + } + pushInterval := profileCfg.MetricsPushInterval + if pushInterval <= 0 { + pushInterval = 15 + } else if pushInterval > 300 { + _ = client.Close() + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d] (%s): metrics_push_interval must be between 1 and 300 seconds, got %d", i, profileCfg.ServiceName, pushInterval) + } + metricsConfig := &MetricsConfig{ + ServiceName: profileCfg.ServiceName, + Endpoint: profileCfg.MetricsEndpoint, + Headers: profileCfg.Headers, + Protocol: profileCfg.Protocol, + TLSCACert: profileCfg.TLSCACert, + Insecure: profileCfg.Insecure, + PushInterval: pushInterval, } - return nil, fmt.Errorf("failed to initialize metrics exporter: %w", err) + profile.metricsExporter, err = NewMetricsExporter(ctx, metricsConfig, bifrostVersion) + if err != nil { + _ = client.Close() + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d] (%s): failed to initialize metrics exporter: %w", i, profileCfg.ServiceName, err) + } + logger.Info("OTEL metrics push enabled for %s, pushing to %s every %d seconds", profileCfg.ServiceName, profileCfg.MetricsEndpoint, pushInterval) } - logger.Info("OTEL metrics push enabled, pushing to %s every %d seconds", config.MetricsEndpoint, pushInterval) + + profiles = append(profiles, profile) + } + + p := &OtelPlugin{ + profiles: profiles, + pricingManager: pricingManager, } + p.ctx, p.cancel = context.WithCancel(ctx) return p, nil } +// loadEnvAttributes parses OTEL_RESOURCE_ATTRIBUTES into KeyValue pairs. +func loadEnvAttributes() []*commonpb.KeyValue { + result := make([]*commonpb.KeyValue, 0) + if attributes, ok := os.LookupEnv(OTELResponseAttributesEnvKey); ok { + for attribute := range strings.SplitSeq(attributes, ",") { + parts := strings.Split(strings.TrimSpace(attribute), "=") + if len(parts) == 2 { + result = append(result, kvStr(strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]))) + } + } + } + return result +} + +// closeProfiles is used in the Init error path to clean up already-initialised profiles. +func closeProfiles(profiles []*OtelProfile) { + for _, p := range profiles { + if p.metricsExporter != nil { + _ = p.metricsExporter.Shutdown(context.Background()) + } + if p.client != nil { + _ = p.client.Close() + } + } +} + // GetName function for the OTEL plugin func (p *OtelPlugin) GetName() string { return PluginName @@ -224,247 +268,57 @@ func (p *OtelPlugin) HTTPTransportStreamChunkHook(ctx *schemas.BifrostContext, r return chunk, nil } -// ValidateConfig function for the OTEL plugin -func (p *OtelPlugin) ValidateConfig(config any) (*Config, error) { - var otelConfig Config - // Checking if its a string, then we will JSON parse and confirm - if configStr, ok := config.(string); ok { - if err := sonic.Unmarshal([]byte(configStr), &otelConfig); err != nil { - return nil, err - } - } - // Checking if its a map[string]any, then we will JSON parse and confirm - if configMap, ok := config.(map[string]any); ok { - configString, err := sonic.Marshal(configMap) - if err != nil { - return nil, err - } - if err := sonic.Unmarshal([]byte(configString), &otelConfig); err != nil { - return nil, err - } - } - // Checking if its a Config, then we will confirm - if config, ok := config.(*Config); ok { - otelConfig = *config - } - // Validating fields - if otelConfig.CollectorURL == "" { - return nil, fmt.Errorf("collector url is required") - } - if otelConfig.TraceType == "" { - return nil, fmt.Errorf("trace type is required") - } - if otelConfig.Protocol == "" { - return nil, fmt.Errorf("protocol is required") - } - return &otelConfig, nil -} - // PreLLMHook is a no-op - tracing is handled via the Inject method. -// The OTEL plugin receives completed traces from TracingMiddleware. func (p *OtelPlugin) PreLLMHook(_ *schemas.BifrostContext, req *schemas.BifrostRequest) (*schemas.BifrostRequest, *schemas.LLMPluginShortCircuit, error) { return req, nil, nil } // PostLLMHook is a no-op - tracing is handled via the Inject method. -// The OTEL plugin receives completed traces from TracingMiddleware. func (p *OtelPlugin) PostLLMHook(_ *schemas.BifrostContext, resp *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*schemas.BifrostResponse, *schemas.BifrostError, error) { return resp, bifrostErr, nil } -// Inject receives a completed trace and sends it to the OTEL collector. -// Implements schemas.ObservabilityPlugin interface. -// This method is called asynchronously by TracingMiddleware after the response -// has been written to the client. +// Inject receives a completed trace and forwards it to every configured collector profile. func (p *OtelPlugin) Inject(ctx context.Context, trace *schemas.Trace) error { if trace == nil { return nil } - - // Emit trace to collector if client is initialized - if p.client != nil { - // Convert schemas.Trace to OTEL ResourceSpan - resourceSpan := p.convertTraceToResourceSpan(trace) - - // Emit to collector - if err := p.client.Emit(ctx, []*ResourceSpan{resourceSpan}); err != nil { - logger.Error("failed to emit trace %s: %v", trace.TraceID, err) + for _, profile := range p.profiles { + resourceSpan := profile.convertTraceToResourceSpan(trace) + if err := profile.client.Emit(ctx, []*ResourceSpan{resourceSpan}); err != nil { + logger.Error("failed to emit trace %s to %s: %v", trace.TraceID, profile.url, err) } - } - - // Record metrics if metrics exporter is enabled - if p.metricsExporter != nil { - p.recordMetricsFromTrace(ctx, trace) - } - - return nil -} - -// Helper functions for type-safe attribute extraction from trace spans -func getStringAttr(attrs map[string]any, key string) string { - if attrs == nil { - return "" - } - if v, ok := attrs[key].(string); ok { - return v - } - return "" -} - -func getIntAttr(attrs map[string]any, key string) int { - if attrs == nil { - return 0 - } - switch v := attrs[key].(type) { - case int: - return v - case int64: - return int(v) - case float64: - return int(v) - } - return 0 -} - -func getFloat64Attr(attrs map[string]any, key string) float64 { - if attrs == nil { - return 0 - } - switch v := attrs[key].(type) { - case float64: - return v - case int: - return float64(v) - case int64: - return float64(v) - } - return 0 -} - -// buildSpanAttrs extracts metric dimension attrs from a single attempt span. -func buildSpanAttrs(span *schemas.Span) []attribute.KeyValue { - attrs := span.Attributes - method := getStringAttr(attrs, "request.type") - if method == "" { - method = span.Name - } - return BuildBifrostAttributes( - getStringAttr(attrs, schemas.AttrProviderName), - getStringAttr(attrs, schemas.AttrRequestModel), - method, - getStringAttr(attrs, schemas.AttrVirtualKeyID), - getStringAttr(attrs, schemas.AttrVirtualKeyName), - getStringAttr(attrs, schemas.AttrSelectedKeyID), - getStringAttr(attrs, schemas.AttrSelectedKeyName), - getIntAttr(attrs, schemas.AttrNumberOfRetries), - getIntAttr(attrs, schemas.AttrFallbackIndex), - getStringAttr(attrs, schemas.AttrTeamID), - getStringAttr(attrs, schemas.AttrTeamName), - getStringAttr(attrs, schemas.AttrCustomerID), - getStringAttr(attrs, schemas.AttrCustomerName), - ) -} - -// recordMetricsFromTrace extracts metrics data from a completed trace and records them -// via the OTEL metrics exporter. This is called from Inject after trace emission. -// -// Per-attempt metrics (upstream_requests, errors, success, latency) are recorded once -// per llm.call/retry span so fallback attempts and failed retries are counted with -// their own provider/model/fallback_index labels. Per-trace metrics (tokens, cost, -// TTFT) are recorded once, keyed off the final (latest) attempt span. -func (p *OtelPlugin) recordMetricsFromTrace(ctx context.Context, trace *schemas.Trace) { - if trace == nil || p.metricsExporter == nil { - return - } - - var finalSpan *schemas.Span - for _, span := range trace.Spans { - if span.Kind != schemas.SpanKindLLMCall && span.Kind != schemas.SpanKindRetry { - continue + if profile.metricsExporter != nil { + profile.metricsExporter.recordMetricsFromTrace(ctx, trace) } - - spanAttrs := buildSpanAttrs(span) - - p.metricsExporter.RecordUpstreamRequest(ctx, spanAttrs...) - - if !span.StartTime.IsZero() && !span.EndTime.IsZero() { - latencySeconds := span.EndTime.Sub(span.StartTime).Seconds() - p.metricsExporter.RecordUpstreamLatency(ctx, latencySeconds, spanAttrs...) - } - - if span.Status == schemas.SpanStatusError { - p.metricsExporter.RecordErrorRequest(ctx, spanAttrs...) - } else { - p.metricsExporter.RecordSuccessRequest(ctx, spanAttrs...) - } - - if finalSpan == nil || span.EndTime.After(finalSpan.EndTime) { - finalSpan = span - } - } - - if finalSpan == nil { - finalSpan = trace.RootSpan - } - if finalSpan == nil { - return - } - - attrs := finalSpan.Attributes - otelAttrs := buildSpanAttrs(finalSpan) - - // Record token usage - try both naming conventions - inputTokens := getIntAttr(attrs, schemas.AttrPromptTokens) - if inputTokens == 0 { - inputTokens = getIntAttr(attrs, schemas.AttrInputTokens) - } - if inputTokens > 0 { - p.metricsExporter.RecordInputTokens(ctx, int64(inputTokens), otelAttrs...) - } - - outputTokens := getIntAttr(attrs, schemas.AttrCompletionTokens) - if outputTokens == 0 { - outputTokens = getIntAttr(attrs, schemas.AttrOutputTokens) - } - if outputTokens > 0 { - p.metricsExporter.RecordOutputTokens(ctx, int64(outputTokens), otelAttrs...) - } - - // Record cost if available - cost := getFloat64Attr(attrs, schemas.AttrUsageCost) - if cost > 0 { - p.metricsExporter.RecordCost(ctx, cost, otelAttrs...) - } - - // Record streaming latency metrics if available - ttft := getFloat64Attr(attrs, schemas.AttrTimeToFirstToken) - if ttft > 0 { - // Convert from nanoseconds to seconds if needed (check the unit) - p.metricsExporter.RecordStreamFirstTokenLatency(ctx, ttft/1e9, otelAttrs...) } + return nil } -// Cleanup function for the OTEL plugin +// Cleanup shuts down all profile clients and metrics exporters. func (p *OtelPlugin) Cleanup() error { if p.cancel != nil { p.cancel() } - // Shutdown metrics exporter first - if p.metricsExporter != nil { - if err := p.metricsExporter.Shutdown(context.Background()); err != nil { - logger.Error("failed to shutdown metrics exporter: %v", err) + var firstErr error + for _, profile := range p.profiles { + if profile.metricsExporter != nil { + if err := profile.metricsExporter.Shutdown(context.Background()); err != nil { + logger.Error("failed to shutdown metrics exporter for %s: %v", profile.serviceName, err) + if firstErr == nil { + firstErr = err + } + } + } + if err := profile.client.Close(); err != nil { + logger.Error("failed to close client for %s: %v", profile.serviceName, err) + if firstErr == nil { + firstErr = err + } } } - if p.client != nil { - return p.client.Close() - } - return nil -} - -// GetMetricsExporter returns the metrics exporter for external use (e.g., by telemetry plugin) -func (p *OtelPlugin) GetMetricsExporter() *MetricsExporter { - return p.metricsExporter + return firstErr } // Compile-time check that OtelPlugin implements ObservabilityPlugin -var _ schemas.ObservabilityPlugin = (*OtelPlugin)(nil) +var _ schemas.ObservabilityPlugin = (*OtelPlugin)(nil) \ No newline at end of file diff --git a/plugins/otel/metrics.go b/plugins/otel/metrics.go index e1b5d85089..73aa3de26e 100644 --- a/plugins/otel/metrics.go +++ b/plugins/otel/metrics.go @@ -2,14 +2,12 @@ package otel import ( "context" - "crypto/tls" - "crypto/x509" "fmt" "os" - "path/filepath" "sync" "time" + "github.com/maximhq/bifrost/core/schemas" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" @@ -61,86 +59,84 @@ type MetricsExporter struct { httpResponseSizeBytes *syncFloat64Histogram } -// syncInt64Counter wraps metric.Int64Counter with thread-safe lazy initialization -type syncInt64Counter struct { - counter metric.Int64Counter +// onceCounter provides thread-safe once-initialization for any OTel metric instrument. +type onceCounter[I any] struct { + counter I + ok bool once sync.Once - name string - desc string - unit string - meter metric.Meter } -func (c *syncInt64Counter) Add(ctx context.Context, value int64, opts ...metric.AddOption) { - c.once.Do(func() { +func (o *onceCounter[I]) load(name string, create func() (I, error)) (I, bool) { + o.once.Do(func() { var err error - c.counter, err = c.meter.Int64Counter(c.name, - metric.WithDescription(c.desc), - metric.WithUnit(c.unit), - ) + o.counter, err = create() + o.ok = err == nil if err != nil { - logger.Error("failed to create counter %s: %v", c.name, err) + logger.Error("failed to create metric %s: %v", name, err) } }) - if c.counter != nil { - c.counter.Add(ctx, value, opts...) + return o.counter, o.ok +} + +// syncInt64Counter wraps metric.Int64Counter with thread-safe lazy initialization +type syncInt64Counter struct { + onceCounter[metric.Int64Counter] + name, desc, unit string + meter metric.Meter +} + +func newSyncInt64Counter(name, desc, unit string, meter metric.Meter) *syncInt64Counter { + return &syncInt64Counter{name: name, desc: desc, unit: unit, meter: meter} +} + +func (c *syncInt64Counter) Add(ctx context.Context, value int64, opts ...metric.AddOption) { + if inst, ok := c.load(c.name, func() (metric.Int64Counter, error) { + return c.meter.Int64Counter(c.name, metric.WithDescription(c.desc), metric.WithUnit(c.unit)) + }); ok { + inst.Add(ctx, value, opts...) } } // syncFloat64Counter wraps metric.Float64Counter with thread-safe lazy initialization type syncFloat64Counter struct { - counter metric.Float64Counter - once sync.Once - name string - desc string - unit string - meter metric.Meter + onceCounter[metric.Float64Counter] + name, desc, unit string + meter metric.Meter +} + +func newSyncFloat64Counter(name, desc, unit string, meter metric.Meter) *syncFloat64Counter { + return &syncFloat64Counter{name: name, desc: desc, unit: unit, meter: meter} } func (c *syncFloat64Counter) Add(ctx context.Context, value float64, opts ...metric.AddOption) { - c.once.Do(func() { - var err error - c.counter, err = c.meter.Float64Counter(c.name, - metric.WithDescription(c.desc), - metric.WithUnit(c.unit), - ) - if err != nil { - logger.Error("failed to create float counter %s: %v", c.name, err) - } - }) - if c.counter != nil { - c.counter.Add(ctx, value, opts...) + if inst, ok := c.load(c.name, func() (metric.Float64Counter, error) { + return c.meter.Float64Counter(c.name, metric.WithDescription(c.desc), metric.WithUnit(c.unit)) + }); ok { + inst.Add(ctx, value, opts...) } } // syncFloat64Histogram wraps metric.Float64Histogram with thread-safe lazy initialization type syncFloat64Histogram struct { - histogram metric.Float64Histogram - once sync.Once - name string - desc string - unit string - meter metric.Meter + onceCounter[metric.Float64Histogram] + name, desc, unit string + meter metric.Meter +} + +func newSyncFloat64Histogram(name, desc, unit string, meter metric.Meter) *syncFloat64Histogram { + return &syncFloat64Histogram{name: name, desc: desc, unit: unit, meter: meter} } func (h *syncFloat64Histogram) Record(ctx context.Context, value float64, opts ...metric.RecordOption) { - h.once.Do(func() { - var err error - h.histogram, err = h.meter.Float64Histogram(h.name, - metric.WithDescription(h.desc), - metric.WithUnit(h.unit), - ) - if err != nil { - logger.Error("failed to create histogram %s: %v", h.name, err) - } - }) - if h.histogram != nil { - h.histogram.Record(ctx, value, opts...) + if inst, ok := h.load(h.name, func() (metric.Float64Histogram, error) { + return h.meter.Float64Histogram(h.name, metric.WithDescription(h.desc), metric.WithUnit(h.unit)) + }); ok { + inst.Record(ctx, value, opts...) } } // NewMetricsExporter creates a new OTEL metrics exporter -func NewMetricsExporter(ctx context.Context, config *MetricsConfig) (*MetricsExporter, error) { +func NewMetricsExporter(ctx context.Context, config *MetricsConfig, version string) (*MetricsExporter, error) { // Generate a unique instance ID for this node instanceID, err := os.Hostname() if err != nil { @@ -187,7 +183,7 @@ func NewMetricsExporter(ctx context.Context, config *MetricsConfig) (*MetricsExp // Create meter meter := provider.Meter("bifrost", - metric.WithInstrumentationVersion("1.0.0"), + metric.WithInstrumentationVersion(version), ) // Create metrics exporter @@ -202,43 +198,6 @@ func NewMetricsExporter(ctx context.Context, config *MetricsConfig) (*MetricsExp return m, nil } -// validateCACertPath validates the CA certificate path to prevent path traversal attacks. -// It ensures the path is absolute, cleaned of traversal sequences, and exists as a regular file. -func validateCACertPath(certPath string) error { - if certPath == "" { - return nil - } - - // Clean the path to resolve any .. or . components - cleanPath := filepath.Clean(certPath) - - // Require absolute paths to prevent relative path attacks - if !filepath.IsAbs(cleanPath) { - return fmt.Errorf("TLS CA cert path must be absolute: %s", certPath) - } - - // Check that the cleaned path doesn't differ significantly from input - // (indicates attempted traversal) - if cleanPath != filepath.Clean(filepath.FromSlash(certPath)) { - return fmt.Errorf("invalid TLS CA cert path: %s", certPath) - } - - // Verify the file exists and is not a symlink - info, err := os.Lstat(cleanPath) - if err != nil { - return fmt.Errorf("TLS CA cert path not accessible: %w", err) - } - // Reject symlinks to prevent symlink-based path traversal - if info.Mode()&os.ModeSymlink != 0 { - return fmt.Errorf("TLS CA cert path cannot be a symlink: %s", certPath) - } - if !info.Mode().IsRegular() { - return fmt.Errorf("TLS CA cert path is not a regular file: %s", certPath) - } - - return nil -} - func createHTTPExporter(ctx context.Context, config *MetricsConfig) (sdkmetric.Exporter, error) { opts := []otlpmetrichttp.Option{ otlpmetrichttp.WithEndpointURL(config.Endpoint), @@ -248,34 +207,16 @@ func createHTTPExporter(ctx context.Context, config *MetricsConfig) (sdkmetric.E opts = append(opts, otlpmetrichttp.WithHeaders(config.Headers)) } - // TLS priority: custom CA > system roots > insecure - if config.TLSCACert != "" { - // Validate the CA cert path to prevent path traversal attacks - if err := validateCACertPath(config.TLSCACert); err != nil { - return nil, err - } - // Use custom CA certificate - caCert, err := os.ReadFile(config.TLSCACert) + // HTTP metrics insecure mode disables TLS entirely (unlike the trace HTTP client + // which uses InsecureSkipVerify). buildTLSConfig is bypassed for that case. + if config.TLSCACert == "" && config.Insecure { + opts = append(opts, otlpmetrichttp.WithInsecure()) + } else { + tlsConfig, err := buildTLSConfig(config.TLSCACert, false) if err != nil { - return nil, fmt.Errorf("failed to read CA cert: %w", err) - } - caCertPool := x509.NewCertPool() - if !caCertPool.AppendCertsFromPEM(caCert) { - return nil, fmt.Errorf("failed to parse CA cert") - } - tlsConfig := &tls.Config{ - RootCAs: caCertPool, - MinVersion: tls.VersionTLS12, + return nil, err } opts = append(opts, otlpmetrichttp.WithTLSClientConfig(tlsConfig)) - } else if config.Insecure { - // Skip TLS entirely - opts = append(opts, otlpmetrichttp.WithInsecure()) - } else { - // Use system root CAs (empty tls.Config uses system roots) - opts = append(opts, otlpmetrichttp.WithTLSClientConfig(&tls.Config{ - MinVersion: tls.VersionTLS12, - })) } return otlpmetrichttp.New(ctx, opts...) @@ -290,141 +231,50 @@ func createGRPCExporter(ctx context.Context, config *MetricsConfig) (sdkmetric.E opts = append(opts, otlpmetricgrpc.WithHeaders(config.Headers)) } - // TLS priority: custom CA > system roots > insecure - if config.TLSCACert != "" { - // Validate the CA cert path to prevent path traversal attacks - if err := validateCACertPath(config.TLSCACert); err != nil { - return nil, err - } - // Use custom CA certificate with MinVersion - caCert, err := os.ReadFile(config.TLSCACert) - if err != nil { - return nil, fmt.Errorf("failed to read CA cert: %w", err) - } - caCertPool := x509.NewCertPool() - if !caCertPool.AppendCertsFromPEM(caCert) { - return nil, fmt.Errorf("failed to parse CA cert") - } - tlsConfig := &tls.Config{ - RootCAs: caCertPool, - MinVersion: tls.VersionTLS12, - } - creds := credentials.NewTLS(tlsConfig) - opts = append(opts, otlpmetricgrpc.WithTLSCredentials(creds)) - } else if config.Insecure { - // Skip TLS entirely + // gRPC insecure mode uses plaintext (no TLS at all). buildTLSConfig is bypassed for that case. + if config.TLSCACert == "" && config.Insecure { opts = append(opts, otlpmetricgrpc.WithTLSCredentials(insecure.NewCredentials())) } else { - // Use system root CAs with MinVersion - tlsConfig := &tls.Config{ - MinVersion: tls.VersionTLS12, + tlsConfig, err := buildTLSConfig(config.TLSCACert, false) + if err != nil { + return nil, err } - creds := credentials.NewTLS(tlsConfig) - opts = append(opts, otlpmetricgrpc.WithTLSCredentials(creds)) + opts = append(opts, otlpmetricgrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig))) } return otlpmetricgrpc.New(ctx, opts...) } func (m *MetricsExporter) initMetrics() { - // Bifrost upstream metrics - m.upstreamRequestsTotal = &syncInt64Counter{ - name: "bifrost_upstream_requests_total", - desc: "Total number of requests forwarded to upstream providers by Bifrost", - unit: "{request}", - meter: m.meter, - } - - m.successRequestsTotal = &syncInt64Counter{ - name: "bifrost_success_requests_total", - desc: "Total number of successful requests forwarded to upstream providers by Bifrost", - unit: "{request}", - meter: m.meter, - } - - m.errorRequestsTotal = &syncInt64Counter{ - name: "bifrost_error_requests_total", - desc: "Total number of error requests forwarded to upstream providers by Bifrost", - unit: "{request}", - meter: m.meter, - } - - m.inputTokensTotal = &syncInt64Counter{ - name: "bifrost_input_tokens_total", - desc: "Total number of input tokens forwarded to upstream providers by Bifrost", - unit: "{token}", - meter: m.meter, - } - - m.outputTokensTotal = &syncInt64Counter{ - name: "bifrost_output_tokens_total", - desc: "Total number of output tokens forwarded to upstream providers by Bifrost", - unit: "{token}", - meter: m.meter, - } - - m.cacheHitsTotal = &syncInt64Counter{ - name: "bifrost_cache_hits_total", - desc: "Total number of cache hits forwarded to upstream providers by Bifrost", - unit: "{hit}", - meter: m.meter, - } - - m.costTotal = &syncFloat64Counter{ - name: "bifrost_cost_total", - desc: "Total cost in USD for requests to upstream providers", - unit: "USD", - meter: m.meter, - } - - m.upstreamLatencySeconds = &syncFloat64Histogram{ - name: "bifrost_upstream_latency_seconds", - desc: "Latency of requests forwarded to upstream providers by Bifrost", - unit: "s", - meter: m.meter, - } - - m.streamFirstTokenLatencySeconds = &syncFloat64Histogram{ - name: "bifrost_stream_first_token_latency_seconds", - desc: "Latency of the first token of a stream response", - unit: "s", - meter: m.meter, - } - - m.streamInterTokenLatencySeconds = &syncFloat64Histogram{ - name: "bifrost_stream_inter_token_latency_seconds", - desc: "Latency of the intermediate tokens of a stream response", - unit: "s", - meter: m.meter, - } - - // HTTP metrics - m.httpRequestsTotal = &syncInt64Counter{ - name: "http_requests_total", - desc: "Total number of HTTP requests", - unit: "{request}", - meter: m.meter, - } - - m.httpRequestDuration = &syncFloat64Histogram{ - name: "http_request_duration_seconds", - desc: "Duration of HTTP requests", - unit: "s", - meter: m.meter, - } - - m.httpRequestSizeBytes = &syncFloat64Histogram{ - name: "http_request_size_bytes", - desc: "Size of HTTP requests", - unit: "By", - meter: m.meter, - } - - m.httpResponseSizeBytes = &syncFloat64Histogram{ - name: "http_response_size_bytes", - desc: "Size of HTTP responses", - unit: "By", - meter: m.meter, + for _, s := range []struct { + name, desc, unit string + ptr **syncInt64Counter + }{ + {"bifrost_upstream_requests_total", "Total number of requests forwarded to upstream providers by Bifrost", "{request}", &m.upstreamRequestsTotal}, + {"bifrost_success_requests_total", "Total number of successful requests forwarded to upstream providers by Bifrost", "{request}", &m.successRequestsTotal}, + {"bifrost_error_requests_total", "Total number of error requests forwarded to upstream providers by Bifrost", "{request}", &m.errorRequestsTotal}, + {"bifrost_input_tokens_total", "Total number of input tokens forwarded to upstream providers by Bifrost", "{token}", &m.inputTokensTotal}, + {"bifrost_output_tokens_total", "Total number of output tokens forwarded to upstream providers by Bifrost", "{token}", &m.outputTokensTotal}, + {"bifrost_cache_hits_total", "Total number of cache hits forwarded to upstream providers by Bifrost", "{hit}", &m.cacheHitsTotal}, + {"http_requests_total", "Total number of HTTP requests", "{request}", &m.httpRequestsTotal}, + } { + *s.ptr = newSyncInt64Counter(s.name, s.desc, s.unit, m.meter) + } + + m.costTotal = newSyncFloat64Counter("bifrost_cost_total", "Total cost in USD for requests to upstream providers", "USD", m.meter) + + for _, s := range []struct { + name, desc, unit string + ptr **syncFloat64Histogram + }{ + {"bifrost_upstream_latency_seconds", "Latency of requests forwarded to upstream providers by Bifrost", "s", &m.upstreamLatencySeconds}, + {"bifrost_stream_first_token_latency_seconds", "Latency of the first token of a stream response", "s", &m.streamFirstTokenLatencySeconds}, + {"bifrost_stream_inter_token_latency_seconds", "Latency of the intermediate tokens of a stream response", "s", &m.streamInterTokenLatencySeconds}, + {"http_request_duration_seconds", "Duration of HTTP requests", "s", &m.httpRequestDuration}, + {"http_request_size_bytes", "Size of HTTP requests", "By", &m.httpRequestSizeBytes}, + {"http_response_size_bytes", "Size of HTTP responses", "By", &m.httpResponseSizeBytes}, + } { + *s.ptr = newSyncFloat64Histogram(s.name, s.desc, s.unit, m.meter) } } @@ -533,3 +383,147 @@ func BuildHTTPAttributes(path, method, status string) []attribute.KeyValue { attribute.String("status", status), } } + +// Helper functions for type-safe attribute extraction from trace spans +func getStringAttr(attrs map[string]any, key string) string { + if attrs == nil { + return "" + } + if v, ok := attrs[key].(string); ok { + return v + } + return "" +} + +func getIntAttr(attrs map[string]any, key string) int { + if attrs == nil { + return 0 + } + switch v := attrs[key].(type) { + case int: + return v + case int64: + return int(v) + case float64: + return int(v) + } + return 0 +} + +func getFloat64Attr(attrs map[string]any, key string) float64 { + if attrs == nil { + return 0 + } + switch v := attrs[key].(type) { + case float64: + return v + case int: + return float64(v) + case int64: + return float64(v) + } + return 0 +} + +// buildSpanAttrs extracts metric dimension attrs from a single attempt span. +func buildSpanAttrs(span *schemas.Span) []attribute.KeyValue { + attrs := span.Attributes + method := getStringAttr(attrs, "request.type") + if method == "" { + method = span.Name + } + return BuildBifrostAttributes( + getStringAttr(attrs, schemas.AttrProviderName), + getStringAttr(attrs, schemas.AttrRequestModel), + method, + getStringAttr(attrs, schemas.AttrVirtualKeyID), + getStringAttr(attrs, schemas.AttrVirtualKeyName), + getStringAttr(attrs, schemas.AttrSelectedKeyID), + getStringAttr(attrs, schemas.AttrSelectedKeyName), + getIntAttr(attrs, schemas.AttrNumberOfRetries), + getIntAttr(attrs, schemas.AttrFallbackIndex), + getStringAttr(attrs, schemas.AttrTeamID), + getStringAttr(attrs, schemas.AttrTeamName), + getStringAttr(attrs, schemas.AttrCustomerID), + getStringAttr(attrs, schemas.AttrCustomerName), + ) +} + +// recordMetricsFromTrace extracts metrics data from a completed trace and records them +// via the OTEL metrics exporter. This is called from Inject after trace emission. +// +// Per-attempt metrics (upstream_requests, errors, success, latency) are recorded once +// per llm.call/retry span so fallback attempts and failed retries are counted with +// their own provider/model/fallback_index labels. Per-trace metrics (tokens, cost, +// TTFT) are recorded once, keyed off the final (latest) attempt span. +func (m *MetricsExporter) recordMetricsFromTrace(ctx context.Context, trace *schemas.Trace) { + if trace == nil || m == nil { + return + } + + var finalSpan *schemas.Span + for _, span := range trace.Spans { + if span.Kind != schemas.SpanKindLLMCall && span.Kind != schemas.SpanKindRetry { + continue + } + + spanAttrs := buildSpanAttrs(span) + + m.RecordUpstreamRequest(ctx, spanAttrs...) + + if !span.StartTime.IsZero() && !span.EndTime.IsZero() { + latencySeconds := span.EndTime.Sub(span.StartTime).Seconds() + m.RecordUpstreamLatency(ctx, latencySeconds, spanAttrs...) + } + + if span.Status == schemas.SpanStatusError { + m.RecordErrorRequest(ctx, spanAttrs...) + } else { + m.RecordSuccessRequest(ctx, spanAttrs...) + } + + if finalSpan == nil || span.EndTime.After(finalSpan.EndTime) { + finalSpan = span + } + } + + if finalSpan == nil { + finalSpan = trace.RootSpan + } + if finalSpan == nil { + return + } + + attrs := finalSpan.Attributes + otelAttrs := buildSpanAttrs(finalSpan) + + // Record token usage - try both naming conventions + inputTokens := getIntAttr(attrs, schemas.AttrPromptTokens) + if inputTokens == 0 { + inputTokens = getIntAttr(attrs, schemas.AttrInputTokens) + } + if inputTokens > 0 { + m.RecordInputTokens(ctx, int64(inputTokens), otelAttrs...) + } + + outputTokens := getIntAttr(attrs, schemas.AttrCompletionTokens) + if outputTokens == 0 { + outputTokens = getIntAttr(attrs, schemas.AttrOutputTokens) + } + if outputTokens > 0 { + m.RecordOutputTokens(ctx, int64(outputTokens), otelAttrs...) + } + + // Record cost if available + cost := getFloat64Attr(attrs, schemas.AttrUsageCost) + if cost > 0 { + m.RecordCost(ctx, cost, otelAttrs...) + } + + // Record streaming latency metrics if available + ttft := getFloat64Attr(attrs, schemas.AttrTimeToFirstToken) + if ttft > 0 { + // Convert from nanoseconds to seconds if needed (check the unit) + m.RecordStreamFirstTokenLatency(ctx, ttft/1e9, otelAttrs...) + } +} diff --git a/plugins/otel/utils.go b/plugins/otel/utils.go new file mode 100644 index 0000000000..65646657dc --- /dev/null +++ b/plugins/otel/utils.go @@ -0,0 +1,91 @@ +package otel + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "os" + "path/filepath" + "strings" +) + +// injectEnvToHeaders converts any headers that start with "env." with their corresponding environment variable value +// errors out if any environment variable is not found +func injectEnvToHeaders(headers map[string]string) error { + if headers == nil { + return nil + } + for k, v := range headers { + if envKey, ok := strings.CutPrefix(v, "env."); ok { + envVal := os.Getenv(envKey) + if envVal == "" { + return fmt.Errorf("environment variable %s not found", envKey) + } + headers[k] = envVal + } + } + return nil +} + +// validateCACertPath validates the CA certificate path to prevent path traversal attacks. +// It ensures the path is absolute, cleaned of traversal sequences, and exists as a regular file. +func validateCACertPath(certPath string) error { + if certPath == "" { + return nil + } + + // Clean the path to resolve any .. or . components + cleanPath := filepath.Clean(certPath) + + // Require absolute paths to prevent relative path attacks + if !filepath.IsAbs(cleanPath) { + return fmt.Errorf("TLS CA cert path must be absolute: %s", certPath) + } + + // Verify the file exists and is not a symlink + info, err := os.Lstat(cleanPath) + if err != nil { + return fmt.Errorf("TLS CA cert path not accessible: %w", err) + } + // Reject symlinks to prevent symlink-based path traversal + if info.Mode()&os.ModeSymlink != 0 { + return fmt.Errorf("TLS CA cert path cannot be a symlink: %s", certPath) + } + // Ensure path is a regular file, not directories, sockets, pipes, devices, etc. + if !info.Mode().IsRegular() { + return fmt.Errorf("TLS CA cert path is not a regular file: %s", certPath) + } + + return nil +} + +// Builds a TLS config with custom CA, insecure mode, or system roots CAs +// - use a custom CA pool if tlsCACert is provided +// - otherwise skip verification if insecureMode is enabled +// - otherwise use the system root CAs +func buildTLSConfig(tlsCACert string, insecureMode bool) (*tls.Config, error) { + cfg := tls.Config{ + InsecureSkipVerify: false, + MinVersion: tls.VersionTLS12, + } + + // TLS priority: custom CA > system roots > insecure + if tlsCACert != "" { + if err := validateCACertPath(tlsCACert); err != nil { + return nil, err + } + caCert, err := os.ReadFile(tlsCACert) + if err != nil { + return nil, fmt.Errorf("failed to load provided CA cert: %w", err) + } + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + return nil, fmt.Errorf("failed to add provided CA cert") + } + cfg.RootCAs = caCertPool + } else if insecureMode { + cfg.InsecureSkipVerify = true // #nosec G402 + } + + return &cfg, nil +} diff --git a/transports/bifrost-http/server/plugins.go b/transports/bifrost-http/server/plugins.go index c0137d6e6b..5f7a30f6cb 100644 --- a/transports/bifrost-http/server/plugins.go +++ b/transports/bifrost-http/server/plugins.go @@ -118,7 +118,7 @@ func loadBuiltinPlugin(ctx context.Context, name string, pluginConfig any, bifro } // loadCustomPlugin loads a plugin from a shared object file -func loadCustomPlugin(ctx context.Context, path *string, pluginConfig any, bifrostConfig *lib.Config) (schemas.BasePlugin, error) { +func loadCustomPlugin(_ context.Context, path *string, pluginConfig any, bifrostConfig *lib.Config) (schemas.BasePlugin, error) { logger.Info("loading custom plugin from path %s", *path) plugin, err := bifrostConfig.PluginLoader.LoadPlugin(*path, pluginConfig) diff --git a/transports/config.schema.json b/transports/config.schema.json index 7b64e12e51..b82faf7074 100644 --- a/transports/config.schema.json +++ b/transports/config.schema.json @@ -1396,74 +1396,102 @@ "type": "object", "description": "Configuration for the OpenTelemetry plugin", "properties": { - "service_name": { - "type": "string", - "description": "Service name to be used for tracing", - "default": "bifrost" - }, - "collector_url": { - "type": "string", - "description": "URL of the OpenTelemetry collector", - "oneOf": [ - { - "format": "uri" - }, - { - "pattern": "^[^:\\s]+:\\d+$" - } - ] - }, - "trace_type": { - "type": "string", - "description": "Type of trace to use for the OTEL collector", - "enum": ["genai_extension", "vercel", "open_inference"] - }, - "protocol": { - "type": "string", - "description": "Protocol to use for the OTEL collector", - "enum": ["http", "grpc"] - }, - "metrics_enabled": { - "type": "boolean", - "description": "Enable push-based metrics export via OTLP. Recommended for multi-node cluster deployments.", - "default": false - }, - "metrics_endpoint": { - "type": "string", - "description": "OTLP metrics endpoint URL (e.g., http://otel-collector:4318/v1/metrics for HTTP or otel-collector:4317 for gRPC)", - "oneOf": [ - { - "format": "uri" + "profiles": { + "type": "array", + "description": "One or more OTEL collector destinations. Each profile exports to a separate collector.", + "minItems": 1, + "items": { + "type": "object", + "properties": { + "enabled": { + "type": "boolean", + "description": "If false, this profile is skipped during export. Defaults to true.", + "default": true + }, + "service_name": { + "type": "string", + "description": "Service name to be used for tracing", + "default": "bifrost" + }, + "collector_url": { + "type": "string", + "description": "URL of the OpenTelemetry collector", + "oneOf": [ + { + "format": "uri" + }, + { + "pattern": "^[^:\\s]+:\\d+$" + } + ] + }, + "trace_type": { + "type": "string", + "description": "Type of trace to use for the OTEL collector", + "enum": [ + "genai_extension" + ] + }, + "protocol": { + "type": "string", + "description": "Protocol to use for the OTEL collector", + "enum": [ + "http", + "grpc" + ] + }, + "metrics_enabled": { + "type": "boolean", + "description": "Enable push-based metrics export via OTLP. Recommended for multi-node cluster deployments.", + "default": false + }, + "metrics_endpoint": { + "type": "string", + "description": "OTLP metrics endpoint URL (e.g., http://otel-collector:4318/v1/metrics for HTTP or otel-collector:4317 for gRPC)", + "oneOf": [ + { + "format": "uri" + }, + { + "pattern": "^[^:\\s]+:\\d+$" + } + ] + }, + "metrics_push_interval": { + "type": "integer", + "description": "Metrics push interval in seconds", + "default": 15, + "minimum": 1, + "maximum": 300 + }, + "headers": { + "type": "object", + "additionalProperties": { + "type": "string" + }, + "description": "Custom headers for the collector. Supports env.VAR_NAME prefix for environment variable substitution." + }, + "tls_ca_cert": { + "type": "string", + "description": "Path to TLS CA certificate file" + }, + "insecure": { + "type": "boolean", + "description": "Skip TLS verification (ignored if tls_ca_cert is set)" + } }, - { - "pattern": "^[^:\\s]+:\\d+$" - } - ] - }, - "metrics_push_interval": { - "type": "integer", - "description": "Metrics push interval in seconds", - "default": 15, - "minimum": 1, - "maximum": 300 - }, - "headers": { - "type": "object", - "additionalProperties": { - "type": "string" - }, - "description": "Custom headers for the collector. Supports env.VAR_NAME prefix for environment variable substitution." - }, - "tls_ca_cert": { - "type": "string", - "description": "Path to TLS CA certificate file" - }, - "insecure": { - "type": "boolean", - "description": "Skip TLS verification (ignored if tls_ca_cert is set)" + "required": [ + "collector_url", + "trace_type", + "protocol" + ], + "additionalProperties": false + } } }, - "required": ["collector_url", "trace_type", "protocol"], + "required": [ + "profiles" + ], "additionalProperties": false } } diff --git a/ui/app/workspace/observability/fragments/otelFormFragment.tsx b/ui/app/workspace/observability/fragments/otelFormFragment.tsx index f27438fb54..2099065477 100644 --- a/ui/app/workspace/observability/fragments/otelFormFragment.tsx +++ b/ui/app/workspace/observability/fragments/otelFormFragment.tsx @@ -1,458 +1,681 @@ import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; -import { Form, FormControl, FormDescription, FormField, FormItem, FormLabel, FormMessage } from "@/components/ui/form"; +import { + Form, + FormControl, + FormDescription, + FormField, + FormItem, + FormLabel, + FormMessage, +} from "@/components/ui/form"; import { HeadersTable } from "@/components/ui/headersTable"; import { Input } from "@/components/ui/input"; -import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; import { Switch } from "@/components/ui/switch"; -import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from "@/components/ui/tooltip"; -import { otelFormSchema, type OtelFormSchema } from "@/lib/types/schemas"; +import { + Tooltip, + TooltipContent, + TooltipProvider, + TooltipTrigger, +} from "@/components/ui/tooltip"; +import { + otelFormSchema, + type OtelFormSchema, + type OtelProfileConfigSchema, +} from "@/lib/types/schemas"; import { RbacOperation, RbacResource, useRbac } from "@enterprise/lib"; import { zodResolver } from "@hookform/resolvers/zod"; -import { Trash2 } from "lucide-react"; +import { ChevronDown, ChevronRight, Plus, Trash2 } from "lucide-react"; import { useEffect, useState } from "react"; -import { useForm, type Resolver } from "react-hook-form"; +import { useFieldArray, useForm, type Resolver } from "react-hook-form"; interface OtelFormFragmentProps { - currentConfig?: { - enabled?: boolean; - service_name?: string; - collector_url?: string; - headers?: Record; - trace_type?: "genai_extension" | "vercel" | "open_inference"; - protocol?: "http" | "grpc"; - // TLS configuration - tls_ca_cert?: string; - insecure?: boolean; - // Metrics push configuration - metrics_enabled?: boolean; - metrics_endpoint?: string; - metrics_push_interval?: number; - }; - onSave: (config: OtelFormSchema) => Promise; - onDelete?: () => void; - isDeleting?: boolean; - isLoading?: boolean; + currentConfig?: { + enabled?: boolean; + profiles?: Array<{ + enabled?: boolean; + service_name?: string; + collector_url?: string; + headers?: Record; + trace_type?: "genai_extension"; + protocol?: "http" | "grpc"; + tls_ca_cert?: string; + insecure?: boolean; + metrics_enabled?: boolean; + metrics_endpoint?: string; + metrics_push_interval?: number; + }>; + }; + onSave: (config: OtelFormSchema) => Promise; + onDelete?: () => void; + isDeleting?: boolean; + isLoading?: boolean; } +const DEFAULT_PROFILE: OtelProfileConfigSchema = { + enabled: false, + service_name: "bifrost", + collector_url: "", + headers: {}, + trace_type: "genai_extension", + protocol: "http", + tls_ca_cert: "", + insecure: true, + metrics_enabled: false, + metrics_endpoint: "", + metrics_push_interval: 15, +}; + +type RawProfile = NonNullable< + NonNullable["profiles"] +>[number]; + +function profileDefaults(p?: RawProfile): OtelProfileConfigSchema { + return { + enabled: p?.enabled ?? true, + service_name: p?.service_name ?? "bifrost", + collector_url: p?.collector_url ?? "", + headers: p?.headers ?? {}, + trace_type: p?.trace_type ?? "genai_extension", + protocol: p?.protocol ?? "http", + tls_ca_cert: p?.tls_ca_cert ?? "", + insecure: p?.insecure ?? true, + metrics_enabled: p?.metrics_enabled ?? false, + metrics_endpoint: p?.metrics_endpoint ?? "", + metrics_push_interval: p?.metrics_push_interval ?? 15, + }; +} + +const traceTypeOptions: { + value: string; + label: string; + disabled?: boolean; + disabledReason?: string; +}[] = [ + { value: "genai_extension", label: "OTel GenAI Extension (Recommended)" }, + { + value: "vercel", + label: "Vercel AI SDK", + disabled: true, + disabledReason: "Coming soon", + }, + { + value: "open_inference", + label: "Arize OpenInference", + disabled: true, + disabledReason: "Coming soon", + }, +]; +const protocolOptions = [ + { value: "http", label: "HTTP" }, + { value: "grpc", label: "GRPC" }, +]; + export function OtelFormFragment({ - currentConfig: initialConfig, - onSave, - onDelete, - isDeleting = false, - isLoading = false, + currentConfig: initialConfig, + onSave, + onDelete, + isDeleting = false, + isLoading = false, }: OtelFormFragmentProps) { - const hasOtelAccess = useRbac(RbacResource.Observability, RbacOperation.Update); - const [isSaving, setIsSaving] = useState(false); - const form = useForm({ - resolver: zodResolver(otelFormSchema) as Resolver, - mode: "onChange", - reValidateMode: "onChange", - defaultValues: { - enabled: initialConfig?.enabled ?? true, - otel_config: { - service_name: initialConfig?.service_name ?? "bifrost", - collector_url: initialConfig?.collector_url ?? "", - headers: initialConfig?.headers ?? {}, - trace_type: initialConfig?.trace_type ?? "genai_extension", - protocol: initialConfig?.protocol ?? "http", - tls_ca_cert: initialConfig?.tls_ca_cert ?? "", - insecure: initialConfig?.insecure ?? true, - metrics_enabled: initialConfig?.metrics_enabled ?? false, - metrics_endpoint: initialConfig?.metrics_endpoint ?? "", - metrics_push_interval: initialConfig?.metrics_push_interval ?? 15, - }, - }, - }); + const hasOtelAccess = useRbac( + RbacResource.Observability, + RbacOperation.Update, + ); + const [isSaving, setIsSaving] = useState(false); + + const makeDefaultValues = (cfg: typeof initialConfig) => ({ + enabled: cfg?.enabled ?? true, + otel_config: { + profiles: cfg?.profiles?.length + ? cfg.profiles.map(profileDefaults) + : [{ ...DEFAULT_PROFILE }], + }, + }); + + const form = useForm({ + resolver: zodResolver(otelFormSchema) as Resolver< + OtelFormSchema, + any, + OtelFormSchema + >, + mode: "onChange", + reValidateMode: "onChange", + defaultValues: makeDefaultValues(initialConfig), + }); + + const { fields, append, remove } = useFieldArray({ + control: form.control, + name: "otel_config.profiles", + }); - const onSubmit = (data: OtelFormSchema) => { - setIsSaving(true); - onSave(data).finally(() => setIsSaving(false)); - }; + const [expandedProfiles, setExpandedProfiles] = useState< + Record + >(() => Object.fromEntries(fields.map((_, i) => [i, true]))); - // Re-run validation on collector_url when protocol changes so cross-field - // refinement in the schema is applied immediately - const protocol = form.watch("otel_config.protocol"); - const metricsEnabled = form.watch("otel_config.metrics_enabled"); - useEffect(() => { - if (form.getValues("enabled") === false) return; - form.trigger("otel_config.collector_url"); - // Also re-validate metrics_endpoint when protocol changes - if (metricsEnabled) { - form.trigger("otel_config.metrics_endpoint"); - } - }, [protocol, form, metricsEnabled]); + // Auto-expand newly appended profiles. Keyed by index so state survives + // form.reset() (which regenerates useFieldArray ids). + useEffect(() => { + setExpandedProfiles((prev) => { + const next: Record = {}; + for (let i = 0; i < fields.length; i++) { + next[i] = i in prev ? prev[i] : true; + } + return next; + }); + }, [fields.length]); - // Re-run validation on metrics_endpoint when metrics_enabled changes - useEffect(() => { - if (metricsEnabled) { - form.trigger("otel_config.metrics_endpoint"); - } - }, [metricsEnabled, form]); + // Reset form when saved config changes (e.g. after page reload / navigation) + useEffect(() => { + form.reset(makeDefaultValues(initialConfig)); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [initialConfig]); - useEffect(() => { - // Reset form with new initial config when it changes - form.reset({ - enabled: initialConfig?.enabled ?? true, - otel_config: { - service_name: initialConfig?.service_name ?? "bifrost", - collector_url: initialConfig?.collector_url || "", - headers: initialConfig?.headers || {}, - trace_type: initialConfig?.trace_type || "genai_extension", - protocol: initialConfig?.protocol || "http", - tls_ca_cert: initialConfig?.tls_ca_cert ?? "", - insecure: initialConfig?.insecure ?? true, - metrics_enabled: initialConfig?.metrics_enabled ?? false, - metrics_endpoint: initialConfig?.metrics_endpoint ?? "", - metrics_push_interval: initialConfig?.metrics_push_interval ?? 15, - }, - }); - }, [form, initialConfig]); + // Re-trigger cross-field validation when protocol or metrics_enabled changes for any profile + const profiles = form.watch("otel_config.profiles"); + const protocolsKey = profiles.map((p) => p.protocol).join(","); + const metricsKey = profiles.map((p) => p.metrics_enabled).join(","); + useEffect(() => { + if (!form.getValues("enabled")) return; + profiles.forEach((profile, i) => { + void form.trigger(`otel_config.profiles.${i}.collector_url` as any); + if (profile.metrics_enabled) { + void form.trigger(`otel_config.profiles.${i}.metrics_endpoint` as any); + } + }); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [protocolsKey, metricsKey]); - const traceTypeOptions: { value: string; label: string; disabled?: boolean; disabledReason?: string }[] = [ - { value: "genai_extension", label: "OTel GenAI Extension (Recommended)" }, - { value: "vercel", label: "Vercel AI SDK", disabled: true, disabledReason: "Coming soon" }, - { value: "open_inference", label: "Arize OpenInference", disabled: true, disabledReason: "Coming soon" }, - ]; - const protocolOptions: { value: string; label: string; disabled?: boolean; disabledReason?: string }[] = [ - { value: "http", label: "HTTP" }, - { value: "grpc", label: "GRPC" }, - ]; + const onSubmit = (data: OtelFormSchema) => { + setIsSaving(true); + onSave(data).finally(() => setIsSaving(false)); + }; - return ( -
- - {/* OTEL Configuration */} -
-
- ( - - Service Name - If kept empty, the service name will be set to "bifrost" - - - - - - )} - /> - ( - - OTLP Collector URL -
- {form.watch("otel_config.protocol") === "http" ? "http(s)://:/v1/traces" : ":"} -
- - - - -
- )} - /> - ( - - - - - - - )} - /> -
- ( - - Format - - - - )} - /> + return ( + + + {/* Profile cards */} +
+ {fields.map((field, index) => { + const isExpanded = expandedProfiles[index] !== false; + const serviceName = form.watch( + `otel_config.profiles.${index}.service_name`, + ); + const collectorUrl = form.watch( + `otel_config.profiles.${index}.collector_url`, + ); + const profileTitle = serviceName?.trim() || `Profile ${index + 1}`; - ( - - Protocol - - - - )} - /> -
+ return ( +
+ {/* Collapsible header */} +
+ setExpandedProfiles((prev) => ({ + ...prev, + [index]: prev[index] === false ? true : false, + })) + } + > +
+ {profileTitle} + {!isExpanded && collectorUrl && ( + + {collectorUrl} + + )} +
+
+ ( + e.stopPropagation()} + disabled={!hasOtelAccess} + aria-label="Enable profile" + /> + )} + /> + {fields.length > 1 && ( + + )} + {isExpanded ? ( + + ) : ( + + )} +
+
- {/* TLS Configuration */} -
- ( - -
-
- Insecure (Skip TLS) - - Skip TLS verification. Disable this to use TLS with system root CAs or a custom CA certificate. - -
-
- { - field.onChange(checked); - if (checked) { - form.setValue("otel_config.tls_ca_cert", ""); - } - }} - disabled={!hasOtelAccess} - /> -
-
-
- )} - /> - {!form.watch("otel_config.insecure") && ( - ( - - TLS CA Certificate Path - - File path to the CA certificate on the Bifrost server. Leave empty to use system root CAs. - - - - - - - )} - /> - )} -
-
-
+ {/* Expanded body */} + {isExpanded && ( +
+
+ ( + + Service Name + + If kept empty, the service name will be set to + "bifrost" + + + + + + + )} + /> + ( + + OTLP Collector URL +
+ + {form.watch( + `otel_config.profiles.${index}.protocol`, + ) === "http" + ? "http(s)://:/v1/traces" + : ":"} + +
+ + + + +
+ )} + /> + ( + + + + + + + )} + /> +
+ ( + + Format + + + + )} + /> + ( + + Protocol + + + + )} + /> +
- {/* Metrics Push Configuration */} -
- ( - -
-
-

- Enable Metrics Export BETA -

-

- Push metrics to an OTEL Collector for proper aggregation in cluster deployments -

-
-
- -
-
-
- )} - /> + {/* TLS Configuration */} +
+ ( + +
+
+ Insecure (Skip TLS) + + Skip TLS verification. Disable this to use + TLS with system root CAs or a custom CA + certificate. + +
+
+ { + field.onChange(checked); + if (checked) { + form.setValue( + `otel_config.profiles.${index}.tls_ca_cert`, + "", + ); + } + }} + disabled={!hasOtelAccess} + /> +
+
+
+ )} + /> + {!form.watch( + `otel_config.profiles.${index}.insecure`, + ) && ( + ( + + TLS CA Certificate Path + + File path to the CA certificate on the Bifrost + server. Leave empty to use system root CAs. + + + + + + + )} + /> + )} +
+
- {form.watch("otel_config.metrics_enabled") && ( -
- ( - - Metrics Endpoint -
- {form.watch("otel_config.protocol") === "http" ? "http(s)://:/v1/metrics" : ":"} -
- - - - -
- )} - /> + {/* Metrics Push Configuration */} +
+ ( + +
+
+

+ Enable Metrics Export{" "} + BETA +

+

+ Push metrics to an OTEL Collector for proper + aggregation in cluster deployments +

+
+
+ +
+
+
+ )} + /> + {form.watch( + `otel_config.profiles.${index}.metrics_enabled`, + ) && ( +
+ ( + + Metrics Endpoint +
+ + {form.watch( + `otel_config.profiles.${index}.protocol`, + ) === "http" + ? "http(s)://:/v1/metrics" + : ":"} + +
+ + + + +
+ )} + /> + ( + + Push Interval (seconds) + + + field.onChange( + e.target.value === "" + ? null + : Number(e.target.value), + ) + } + /> + + + How often to push metrics (1-300 seconds) + + + + )} + /> +
+ )} +
+
+ )} +
+ ); + })} +
- ( - - Push Interval (seconds) - - field.onChange(e.target.value === "" ? null : Number(e.target.value))} - /> - - How often to push metrics (1-300 seconds) - - - )} - /> -
- )} -
+ {/* Add Profile */} + - {/* Form Actions */} -
- ( - - Enabled - - - - - )} - /> -
- {onDelete && ( - - )} - - - - - - - {(!form.formState.isDirty || !form.formState.isValid) && ( - -

- {!form.formState.isDirty && !form.formState.isValid - ? "No changes made and validation errors present" - : !form.formState.isDirty - ? "No changes made" - : "Please fix validation errors"} -

-
- )} -
-
-
-
-
- - ); + {/* Form Actions */} +
+ ( + + + Enabled + + + + + + )} + /> +
+ {onDelete && ( + + )} + + + + + + + {(!form.formState.isDirty || !form.formState.isValid) && ( + +

+ {!form.formState.isDirty && !form.formState.isValid + ? "No changes made and validation errors present" + : !form.formState.isDirty + ? "No changes made" + : "Please fix validation errors"} +

+
+ )} +
+
+
+
+ + + ); } \ No newline at end of file diff --git a/ui/lib/types/schemas.ts b/ui/lib/types/schemas.ts index 2ecf332a97..6f4d48beb7 100644 --- a/ui/lib/types/schemas.ts +++ b/ui/lib/types/schemas.ts @@ -710,15 +710,14 @@ export const betaHeadersFormSchema = z.object({ export type BetaHeadersFormSchema = z.infer; -// OTEL Configuration Schema -export const otelConfigSchema = z +// OTEL Profile Configuration Schema (per-collector configuration) +export const otelProfileConfigSchema = z .object({ + enabled: z.boolean().default(true), service_name: z.string().optional(), collector_url: z.string().default(""), trace_type: z - .enum(["genai_extension", "vercel", "open_inference"], { - message: "Please select a trace type", - }) + .enum(["genai_extension"], { message: "Please select a trace type" }) .default("genai_extension"), headers: z.record(z.string(), z.string()).optional(), protocol: z @@ -809,6 +808,11 @@ export const otelConfigSchema = z } }); +// OTEL Configuration Schema (wraps one or more collector profiles) +export const otelConfigSchema = z.object({ + profiles: z.array(otelProfileConfigSchema).min(1, "At least one profile is required"), +}); + // OTEL form schema for the OtelFormFragment export const otelFormSchema = z .object({ @@ -817,14 +821,16 @@ export const otelFormSchema = z }) .superRefine((data, ctx) => { if (data.enabled) { - const collectorUrl = (data.otel_config.collector_url || "").trim(); - if (!collectorUrl) { - ctx.addIssue({ - code: "custom", - path: ["otel_config", "collector_url"], - message: "Collector address is required", - }); - } + data.otel_config.profiles.forEach((profile, i) => { + const collectorUrl = (profile.collector_url || "").trim(); + if (!collectorUrl) { + ctx.addIssue({ + code: "custom", + path: ["otel_config", "profiles", i, "collector_url"], + message: "Collector address is required", + }); + } + }); } }); @@ -1090,6 +1096,7 @@ export type NetworkFormConfigSchema = z.infer; export type ProxyFormConfigSchema = z.infer; export type NetworkAndProxyFormSchema = z.infer; export type ProxyOnlyFormSchema = z.infer; +export type OtelProfileConfigSchema = z.infer; export type OtelConfigSchema = z.infer; export type OtelFormSchema = z.infer; export type MaximConfigSchema = z.infer;