From e641679c9ef28fa79ff13a1af384542ba42e0990 Mon Sep 17 00:00:00 2001 From: Samyabrata Maji <116789799+sammaji@users.noreply.github.com> Date: Wed, 8 Apr 2026 20:25:33 +0530 Subject: [PATCH 1/2] feat: multiple otel profiles --- core/schemas/utils.go | 49 +- framework/configstore/migrations.go | 54 +- plugins/otel/changelog.md | 1 + plugins/otel/converter.go | 111 +- plugins/otel/go.mod | 2 +- plugins/otel/grpc.go | 35 +- plugins/otel/http.go | 35 +- plugins/otel/main.go | 510 +++----- plugins/otel/metrics.go | 536 ++++---- plugins/otel/utils.go | 91 ++ transports/bifrost-http/server/plugins.go | 2 +- transports/config.schema.json | 158 ++- .../fragments/otelFormFragment.tsx | 1087 ++++++++++------- ui/lib/types/schemas.ts | 29 +- 14 files changed, 1493 insertions(+), 1207 deletions(-) create mode 100644 plugins/otel/utils.go diff --git a/core/schemas/utils.go b/core/schemas/utils.go index 5e61c84ade..2669f271e0 100644 --- a/core/schemas/utils.go +++ b/core/schemas/utils.go @@ -400,6 +400,51 @@ func SafeExtractInt(value interface{}) (int, bool) { } } +// SafeExtractInt64 safely extracts an int64 value from an any with type checking +func SafeExtractInt64(value any) (int64, bool) { + if value == nil { + return 0, false + } + switch v := value.(type) { + case int: + return int64(v), true + case int8: + return int64(v), true + case int16: + return int64(v), true + case int32: + return int64(v), true + case int64: + return v, true + case uint: + return int64(v), true + case uint8: + return int64(v), true + case uint16: + return int64(v), true + case uint32: + return int64(v), true + case uint64: + return int64(v), true + case float32: + return int64(v), true + case float64: + return int64(v), true + case json.Number: + if intVal, err := v.Int64(); err == nil { + return int64(intVal), true + } + return 0, false + case string: + if intVal, err := strconv.ParseInt(v, 10, 64); err == nil { + return intVal, true + } + return 0, false + default: + return 0, false + } +} + // SafeExtractFloat64 safely extracts a float64 value from an interface{} with type checking func SafeExtractFloat64(value interface{}) (float64, bool) { if value == nil { @@ -835,8 +880,8 @@ func DeepCopyChatTool(original ChatTool) ChatTool { if original.Function.Parameters != nil { copyParams := &ToolFunctionParameters{ - Type: original.Function.Parameters.Type, - keyOrder: original.Function.Parameters.keyOrder, + Type: original.Function.Parameters.Type, + keyOrder: original.Function.Parameters.keyOrder, explicitEmptyObject: original.Function.Parameters.explicitEmptyObject, } diff --git a/framework/configstore/migrations.go b/framework/configstore/migrations.go index a0352382f8..8096b9161d 100644 --- a/framework/configstore/migrations.go +++ b/framework/configstore/migrations.go @@ -398,6 +398,9 @@ func triggerMigrations(ctx context.Context, db *gorm.DB) error { if err := migrationNormalizeOtelTraceType(ctx, db); err != nil { return err } + if err := migrationMigrateOtelConfigToProfiles(ctx, db); err != nil { + return err + } return nil } @@ -6165,7 +6168,6 @@ func migrationAddMCPClientDiscoveredToolsColumns(ctx context.Context, db *gorm.D }}) if err := m.Migrate(); err != nil { return fmt.Errorf("error running add_mcp_client_discovered_tools_columns migration: %s", err.Error()) - } return nil } @@ -6275,7 +6277,6 @@ func migrationAddFlexTierPricingColumns(ctx context.Context, db *gorm.DB) error }}) if err := m.Migrate(); err != nil { return fmt.Errorf("error while running flex tier pricing columns migration: %s", err.Error()) - } return nil } @@ -6539,7 +6540,56 @@ func migrationNormalizeOtelTraceType(ctx context.Context, db *gorm.DB) error { }}) if err := m.Migrate(); err != nil { return fmt.Errorf("error running normalize_otel_trace_type migration: %s", err.Error()) + } + return nil +} +// migrationMigrateOtelConfigToProfiles wraps the legacy single-collector OTEL +// plugin config into the new multi-profile shape. Old: {collector_url, protocol, ...}. +// New: {profiles: [{collector_url, protocol, ...}]}. No-op if the row is already +// in the new shape or the plugin is not configured. +func migrationMigrateOtelConfigToProfiles(ctx context.Context, db *gorm.DB) error { + m := migrator.New(db, migrator.DefaultOptions, []*migrator.Migration{{ + ID: "migrate_otel_config_to_profiles", + Migrate: func(tx *gorm.DB) error { + tx = tx.WithContext(ctx) + var plugin tables.TablePlugin + err := tx.Where("name = ?", "otel").First(&plugin).Error + if err != nil { + if err == gorm.ErrRecordNotFound { + return nil + } + return fmt.Errorf("failed to load otel plugin row: %w", err) + } + // AfterFind already decrypted ConfigJSON and unmarshaled into Config. + cfgMap, ok := plugin.Config.(map[string]any) + if !ok || len(cfgMap) == 0 { + return nil + } + // Already migrated. + if _, hasProfiles := cfgMap["profiles"]; hasProfiles { + return nil + } + // Old shape has collector_url at top level. If absent, nothing to migrate. + if _, hasCollector := cfgMap["collector_url"]; !hasCollector { + return nil + } + plugin.Config = map[string]any{ + "profiles": []any{cfgMap}, + } + // Force BeforeSave to re-serialize + re-encrypt. + plugin.ConfigJSON = "" + plugin.EncryptionStatus = tables.EncryptionStatusPlainText + if err := tx.Save(&plugin).Error; err != nil { + return fmt.Errorf("failed to save migrated otel config: %w", err) + } + log.Printf("[Migration] Wrapped legacy otel config into profiles[0]") + return nil + }, + Rollback: func(tx *gorm.DB) error { return nil }, + }}) + if err := m.Migrate(); err != nil { + return fmt.Errorf("error running migrate_otel_config_to_profiles migration: %s", err.Error()) } return nil } diff --git a/plugins/otel/changelog.md b/plugins/otel/changelog.md index e69de29bb2..a33eb08e1b 100644 --- a/plugins/otel/changelog.md +++ b/plugins/otel/changelog.md @@ -0,0 +1 @@ +- feat: adds support for multiple otel profiles \ No newline at end of file diff --git a/plugins/otel/converter.go b/plugins/otel/converter.go index 9decd7c01f..eba2811be4 100644 --- a/plugins/otel/converter.go +++ b/plugins/otel/converter.go @@ -70,10 +70,10 @@ func hexToBytes(hexStr string, length int) []byte { } // convertTraceToResourceSpan converts a Bifrost trace to OTEL ResourceSpan -func (p *OtelPlugin) convertTraceToResourceSpan(trace *schemas.Trace) *ResourceSpan { +func (p *OtelProfile) convertTraceToResourceSpan(trace *schemas.Trace) *ResourceSpan { otelSpans := make([]*Span, 0, len(trace.Spans)) for _, span := range trace.Spans { - otelSpans = append(otelSpans, p.convertSpanToOTELSpan(trace.TraceID, span)) + otelSpans = append(otelSpans, convertSpanToOTELSpan(trace.TraceID, span)) } return &ResourceSpan{ @@ -81,14 +81,14 @@ func (p *OtelPlugin) convertTraceToResourceSpan(trace *schemas.Trace) *ResourceS Attributes: p.getResourceAttributes(), }, ScopeSpans: []*ScopeSpan{{ - Scope: p.getInstrumentationScope(), - Spans: otelSpans, + Scope: p.getInstrumentationScope(), + Spans: otelSpans, }}, } } // convertSpanToOTELSpan converts a single Bifrost span to OTEL format -func (p *OtelPlugin) convertSpanToOTELSpan(traceID string, span *schemas.Span) *Span { +func convertSpanToOTELSpan(traceID string, span *schemas.Span) *Span { otelSpan := &Span{ TraceId: hexToBytes(traceID, 16), SpanId: hexToBytes(span.SpanID, 8), @@ -110,7 +110,7 @@ func (p *OtelPlugin) convertSpanToOTELSpan(traceID string, span *schemas.Span) * } // getResourceAttributes returns the resource attributes for the OTEL span -func (p *OtelPlugin) getResourceAttributes() []*KeyValue { +func (p *OtelProfile) getResourceAttributes() []*KeyValue { attrs := []*KeyValue{ kvStr("service.name", p.serviceName), kvStr("service.version", p.bifrostVersion), @@ -123,7 +123,7 @@ func (p *OtelPlugin) getResourceAttributes() []*KeyValue { } // getInstrumentationScope returns the instrumentation scope for OTEL -func (p *OtelPlugin) getInstrumentationScope() *commonpb.InstrumentationScope { +func (p *OtelProfile) getInstrumentationScope() *commonpb.InstrumentationScope { return &commonpb.InstrumentationScope{ Name: p.serviceName, Version: p.bifrostVersion, @@ -156,22 +156,18 @@ func anyToKeyValue(key string, value any) *KeyValue { return nil } return kvStr(key, v) - case int: - return kvInt(key, int64(v)) - case int32: - return kvInt(key, int64(v)) - case int64: - return kvInt(key, v) - case uint: - return kvInt(key, int64(v)) - case uint32: - return kvInt(key, int64(v)) - case uint64: - return kvInt(key, int64(v)) - case float32: - return kvDbl(key, float64(v)) - case float64: - return kvDbl(key, v) + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: + intValue, ok := schemas.SafeExtractInt64(v) + if !ok { + return nil + } + return kvInt(key, intValue) + case float32, float64: + floatValue, ok := schemas.SafeExtractFloat64(v) + if !ok { + return nil + } + return kvDbl(key, floatValue) case bool: return kvBool(key, v) case []string: @@ -184,32 +180,29 @@ func anyToKeyValue(key string, value any) *KeyValue { } return kvAny(key, arrValue(vals...)) case []int: - if len(v) == 0 { - return nil - } - vals := make([]*AnyValue, len(v)) - for i, n := range v { - vals[i] = &AnyValue{Value: &IntValue{IntValue: int64(n)}} - } - return kvAny(key, arrValue(vals...)) + return intSliceToKeyValue(key, v) + case []int8: + return intSliceToKeyValue(key, v) + case []int16: + return intSliceToKeyValue(key, v) + case []int32: + return intSliceToKeyValue(key, v) case []int64: - if len(v) == 0 { - return nil - } - vals := make([]*AnyValue, len(v)) - for i, n := range v { - vals[i] = &AnyValue{Value: &IntValue{IntValue: n}} - } - return kvAny(key, arrValue(vals...)) + return intSliceToKeyValue(key, v) + case []uint: + return intSliceToKeyValue(key, v) + case []uint8: + return intSliceToKeyValue(key, v) + case []uint16: + return intSliceToKeyValue(key, v) + case []uint32: + return intSliceToKeyValue(key, v) + case []uint64: + return intSliceToKeyValue(key, v) + case []float32: + return floatSliceToAttribute(key, v) case []float64: - if len(v) == 0 { - return nil - } - vals := make([]*AnyValue, len(v)) - for i, n := range v { - vals[i] = &AnyValue{Value: &DoubleValue{DoubleValue: n}} - } - return kvAny(key, arrValue(vals...)) + return floatSliceToAttribute(key, v) case map[string]any: if len(v) == 0 { return nil @@ -228,6 +221,30 @@ func anyToKeyValue(key string, value any) *KeyValue { } } +func intSliceToKeyValue[T int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64](key string, slice []T) *KeyValue { + if len(slice) == 0 { + return nil + } + vals := make([]*AnyValue, len(slice)) + for i, n := range slice { + iv, _ := schemas.SafeExtractInt64(n) + vals[i] = &AnyValue{Value: &IntValue{IntValue: iv}} + } + return kvAny(key, arrValue(vals...)) +} + +func floatSliceToAttribute[T float32 | float64](key string, slice []T) *KeyValue { + if len(slice) == 0 { + return nil + } + vals := make([]*AnyValue, len(slice)) + for i, n := range slice { + fv, _ := schemas.SafeExtractFloat64(n) + vals[i] = &AnyValue{Value: &DoubleValue{DoubleValue: fv}} + } + return kvAny(key, arrValue(vals...)) +} + // convertSpanKind maps Bifrost SpanKind to OTEL SpanKind func convertSpanKind(kind schemas.SpanKind) tracepb.Span_SpanKind { switch kind { diff --git a/plugins/otel/go.mod b/plugins/otel/go.mod index 7595e08f75..c1641a7591 100644 --- a/plugins/otel/go.mod +++ b/plugins/otel/go.mod @@ -157,7 +157,7 @@ require ( ) require ( - github.com/bytedance/sonic v1.15.0 + github.com/bytedance/sonic v1.15.0 // indirect github.com/cloudwego/base64x v0.1.6 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect diff --git a/plugins/otel/grpc.go b/plugins/otel/grpc.go index 6cdd5f9a08..082b8d176a 100644 --- a/plugins/otel/grpc.go +++ b/plugins/otel/grpc.go @@ -2,10 +2,6 @@ package otel import ( "context" - "crypto/tls" - "crypto/x509" - "fmt" - "os" collectorpb "go.opentelemetry.io/proto/otlp/collector/trace/v1" "google.golang.org/grpc" @@ -24,33 +20,14 @@ type OtelClientGRPC struct { // NewOtelClientGRPC creates a new OpenTelemetry client for gRPC func NewOtelClientGRPC(endpoint string, headers map[string]string, tlsCACert string, insecureMode bool) (*OtelClientGRPC, error) { var creds credentials.TransportCredentials - // TLS priority: custom CA > system roots > insecure - if tlsCACert != "" { - // Validate the CA cert path to prevent path traversal attacks - if err := validateCACertPath(tlsCACert); err != nil { - return nil, err - } - // Use custom CA certificate with MinVersion - caCert, err := os.ReadFile(tlsCACert) - if err != nil { - return nil, fmt.Errorf("fail to load provided CA cert: %w", err) - } - caCertPool := x509.NewCertPool() - if !caCertPool.AppendCertsFromPEM(caCert) { - return nil, fmt.Errorf("fail to parse provided CA cert") - } - tlsConfig := &tls.Config{ - RootCAs: caCertPool, - MinVersion: tls.VersionTLS12, - } - creds = credentials.NewTLS(tlsConfig) - } else if insecureMode { - // Skip TLS entirely + // gRPC insecure mode uses plaintext (no TLS at all), not just skip-verify. + // buildTLSConfig is bypassed here to preserve that behaviour. + if tlsCACert == "" && insecureMode { creds = insecure.NewCredentials() } else { - // Use system root CAs with MinVersion - tlsConfig := &tls.Config{ - MinVersion: tls.VersionTLS12, + tlsConfig, err := buildTLSConfig(tlsCACert, false) + if err != nil { + return nil, err } creds = credentials.NewTLS(tlsConfig) } diff --git a/plugins/otel/http.go b/plugins/otel/http.go index 08ba89284d..9cab25fb23 100644 --- a/plugins/otel/http.go +++ b/plugins/otel/http.go @@ -3,12 +3,9 @@ package otel import ( "bytes" "context" - "crypto/tls" - "crypto/x509" "fmt" "io" "net/http" - "os" "strings" "time" @@ -30,35 +27,11 @@ func NewOtelClientHTTP(endpoint string, headers map[string]string, tlsCACert str transport.MaxIdleConnsPerHost = 10 transport.IdleConnTimeout = 120 * time.Second - // TLS priority: custom CA > system roots > insecure - if tlsCACert != "" { - // Validate the CA cert path to prevent path traversal attacks - if err := validateCACertPath(tlsCACert); err != nil { - return nil, err - } - caCert, err := os.ReadFile(tlsCACert) - if err != nil { - return nil, fmt.Errorf("fail to load provided CA cert: %w", err) - } - caCertPool := x509.NewCertPool() - if !caCertPool.AppendCertsFromPEM(caCert) { - return nil, fmt.Errorf("fail to add provided CA cert") - } - transport.TLSClientConfig = &tls.Config{ - RootCAs: caCertPool, - MinVersion: tls.VersionTLS12, - } - } else if insecureMode { - transport.TLSClientConfig = &tls.Config{ - InsecureSkipVerify: true, // #nosec G402 - MinVersion: tls.VersionTLS12, - } - } else { - // Use system root CAs with MinVersion - transport.TLSClientConfig = &tls.Config{ - MinVersion: tls.VersionTLS12, - } + tlsConfig, err := buildTLSConfig(tlsCACert, insecureMode) + if err != nil { + return nil, err } + transport.TLSClientConfig = tlsConfig return &OtelClientHTTP{client: &http.Client{ Timeout: 30 * time.Second, diff --git a/plugins/otel/main.go b/plugins/otel/main.go index faa32944e9..94c85a5a37 100644 --- a/plugins/otel/main.go +++ b/plugins/otel/main.go @@ -10,15 +10,14 @@ import ( "github.com/bytedance/sonic" "github.com/maximhq/bifrost/core/schemas" "github.com/maximhq/bifrost/framework/modelcatalog" - "go.opentelemetry.io/otel/attribute" commonpb "go.opentelemetry.io/proto/otlp/common/v1" ) -// logger is the logger for the OTEL plugin +// logger is the package-level logger, set once in Init. var logger schemas.Logger -// OTELResponseAttributesEnvKey is the environment variable key for the OTEL resource attributes -// We check if this is present in the environment variables and if so, we will use it to set the attributes for all spans at the resource level +// OTELResponseAttributesEnvKey is the environment variable key for the OTEL resource attributes. +// If set, its key=value pairs are attached to every span at the resource level. const OTELResponseAttributesEnvKey = "OTEL_RESOURCE_ATTRIBUTES" const PluginName = "otel" @@ -26,32 +25,30 @@ const PluginName = "otel" // TraceType is the type of trace to use for the OTEL collector type TraceType string -// TraceTypeGenAIExtension is the type of trace to use for the OTEL collector -const TraceTypeGenAIExtension TraceType = "genai_extension" - -// TraceTypeVercel is the type of trace to use for the OTEL collector -const TraceTypeVercel TraceType = "vercel" - -// TraceTypeOpenInference is the type of trace to use for the OTEL collector -const TraceTypeOpenInference TraceType = "open_inference" +const ( + TraceTypeGenAIExtension TraceType = "genai_extension" + TraceTypeVercel TraceType = "vercel" + TraceTypeOpenInference TraceType = "open_inference" +) // Protocol is the protocol to use for the OTEL collector type Protocol string -// ProtocolHTTP is the default protocol -const ProtocolHTTP Protocol = "http" - -// ProtocolGRPC is the second protocol -const ProtocolGRPC Protocol = "grpc" +const ( + ProtocolHTTP Protocol = "http" + ProtocolGRPC Protocol = "grpc" +) -type Config struct { +// OtelProfileConfig is the per-collector configuration. +type OtelProfileConfig struct { + Enabled *bool `json:"enabled,omitempty"` // nil or true = enabled; false = skip during export ServiceName string `json:"service_name"` CollectorURL string `json:"collector_url"` Headers map[string]string `json:"headers"` TraceType TraceType `json:"trace_type"` Protocol Protocol `json:"protocol"` TLSCACert string `json:"tls_ca_cert"` - Insecure bool `json:"insecure"` // Skip TLS when true; ignored if TLSCACert is set. Defaults to true when omitted. + Insecure bool `json:"insecure"` // Skip TLS when true; ignored if TLSCACert is set // Metrics push configuration MetricsEnabled bool `json:"metrics_enabled"` @@ -62,8 +59,8 @@ type Config struct { // UnmarshalJSON applies field defaults that the zero-value wouldn't capture. // Specifically, Insecure defaults to true when the key is omitted so http:// // collectors work out-of-the-box without forcing users to set it explicitly. -func (c *Config) UnmarshalJSON(data []byte) error { - type alias Config +func (c *OtelProfileConfig) UnmarshalJSON(data []byte) error { + type alias OtelProfileConfig aux := struct { Insecure *bool `json:"insecure"` *alias @@ -81,32 +78,37 @@ func (c *Config) UnmarshalJSON(data []byte) error { return nil } +// Config holds one or more collector profiles. +type Config struct { + Profiles []*OtelProfileConfig +} + +// OtelProfile holds the runtime state for a single collector destination: +// its client, optional metrics exporter, and the metadata used to build +// OTEL resource/scope attributes on every emitted span. +type OtelProfile struct { + serviceName string + url string + headers map[string]string + traceType TraceType + protocol Protocol + bifrostVersion string + attributesFromEnvironment []*commonpb.KeyValue + client OtelClient + metricsExporter *MetricsExporter +} + // OtelPlugin is the plugin for OpenTelemetry. -// It implements the ObservabilityPlugin interface to receive completed traces -// from the tracing middleware and forward them to an OTEL collector. +// It implements ObservabilityPlugin and fans traces out to every configured profile. type OtelPlugin struct { ctx context.Context cancel context.CancelFunc - serviceName string - url string - headers map[string]string - traceType TraceType - protocol Protocol - - bifrostVersion string - - attributesFromEnvironment []*commonpb.KeyValue - - client OtelClient - + profiles []*OtelProfile pricingManager *modelcatalog.ModelCatalog - - // Metrics push support - metricsExporter *MetricsExporter } -// Init function for the OTEL plugin +// Init creates the plugin, initialising one client (and optional metrics exporter) per profile. func Init(ctx context.Context, config *Config, _logger schemas.Logger, pricingManager *modelcatalog.ModelCatalog, bifrostVersion string) (*OtelPlugin, error) { if config == nil { return nil, fmt.Errorf("config is required") @@ -115,95 +117,137 @@ func Init(ctx context.Context, config *Config, _logger schemas.Logger, pricingMa if pricingManager == nil { logger.Warn("otel plugin requires model catalog to calculate cost, all cost calculations will be skipped.") } - var err error - // If headers are present, and any of them start with env., we will replace the value with the environment variable - if config.Headers != nil { - for key, value := range config.Headers { - if newValue, ok := strings.CutPrefix(value, "env."); ok { - config.Headers[key] = os.Getenv(newValue) - if config.Headers[key] == "" { - logger.Warn("environment variable %s not found", newValue) - return nil, fmt.Errorf("environment variable %s not found", newValue) - } - } - } - } - if config.ServiceName == "" { - config.ServiceName = "bifrost" + if len(config.Profiles) == 0 { + return nil, fmt.Errorf("at least one profile is required") } - // Loading attributes from environment - attributesFromEnvironment := make([]*commonpb.KeyValue, 0) - if attributes, ok := os.LookupEnv(OTELResponseAttributesEnvKey); ok { - // We will split the attributes by , and then split each attribute by = - for attribute := range strings.SplitSeq(attributes, ",") { - attributeParts := strings.Split(strings.TrimSpace(attribute), "=") - if len(attributeParts) == 2 { - attributesFromEnvironment = append(attributesFromEnvironment, kvStr(strings.TrimSpace(attributeParts[0]), strings.TrimSpace(attributeParts[1]))) - } + + attributesFromEnvironment := loadEnvAttributes() + + profiles := make([]*OtelProfile, 0, len(config.Profiles)) + for i, profileCfg := range config.Profiles { + if profileCfg == nil { + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d]: config is required", i) } - } - // Preparing the plugin - p := &OtelPlugin{ - serviceName: config.ServiceName, - url: config.CollectorURL, - traceType: config.TraceType, - headers: config.Headers, - protocol: config.Protocol, - pricingManager: pricingManager, - bifrostVersion: bifrostVersion, - attributesFromEnvironment: attributesFromEnvironment, - } - p.ctx, p.cancel = context.WithCancel(ctx) - if config.Protocol == ProtocolGRPC { - p.client, err = NewOtelClientGRPC(config.CollectorURL, config.Headers, config.TLSCACert, config.Insecure) - if err != nil { - return nil, err + if profileCfg.Enabled != nil && !*profileCfg.Enabled { + continue } - } - if config.Protocol == ProtocolHTTP { - p.client, err = NewOtelClientHTTP(config.CollectorURL, config.Headers, config.TLSCACert, config.Insecure) - if err != nil { - return nil, err + if err := injectEnvToHeaders(profileCfg.Headers); err != nil { + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d]: %w", i, err) } - } - if p.client == nil { - return nil, fmt.Errorf("otel client is not initialized. invalid protocol type") - } - - // Initialize metrics exporter if enabled - if config.MetricsEnabled { - if config.MetricsEndpoint == "" { - return nil, fmt.Errorf("metrics_endpoint is required when metrics_enabled is true") + if profileCfg.CollectorURL == "" { + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d]: collector_url is required", i) + } + if profileCfg.ServiceName == "" { + profileCfg.ServiceName = "bifrost" } - pushInterval := config.MetricsPushInterval - if pushInterval <= 0 { - pushInterval = 15 // default 15 seconds - } else if pushInterval > 300 { - return nil, fmt.Errorf("metrics_push_interval must be between 1 and 300 seconds, got %d", pushInterval) + if profileCfg.TraceType == "" { + profileCfg.TraceType = TraceTypeGenAIExtension } - metricsConfig := &MetricsConfig{ - ServiceName: config.ServiceName, - Endpoint: config.MetricsEndpoint, - Headers: config.Headers, - Protocol: config.Protocol, - TLSCACert: config.TLSCACert, - Insecure: config.Insecure, - PushInterval: pushInterval, + if profileCfg.Protocol == "" { + profileCfg.Protocol = ProtocolHTTP + } + + var ( + client OtelClient + err error + ) + switch profileCfg.Protocol { + case ProtocolGRPC: + client, err = NewOtelClientGRPC(profileCfg.CollectorURL, profileCfg.Headers, profileCfg.TLSCACert, profileCfg.Insecure) + case ProtocolHTTP: + client, err = NewOtelClientHTTP(profileCfg.CollectorURL, profileCfg.Headers, profileCfg.TLSCACert, profileCfg.Insecure) + default: + err = fmt.Errorf("unsupported protocol: %s", profileCfg.Protocol) } - p.metricsExporter, err = NewMetricsExporter(p.ctx, metricsConfig) if err != nil { - // Clean up trace client if metrics exporter fails - if p.client != nil { - p.client.Close() + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d] (%s): %w", i, profileCfg.ServiceName, err) + } + + profile := &OtelProfile{ + serviceName: profileCfg.ServiceName, + url: profileCfg.CollectorURL, + headers: profileCfg.Headers, + traceType: profileCfg.TraceType, + protocol: profileCfg.Protocol, + bifrostVersion: bifrostVersion, + attributesFromEnvironment: attributesFromEnvironment, + client: client, + } + + if profileCfg.MetricsEnabled { + if profileCfg.MetricsEndpoint == "" { + _ = client.Close() + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d] (%s): metrics_endpoint is required when metrics_enabled is true", i, profileCfg.ServiceName) + } + pushInterval := profileCfg.MetricsPushInterval + if pushInterval <= 0 { + pushInterval = 15 + } else if pushInterval > 300 { + _ = client.Close() + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d] (%s): metrics_push_interval must be between 1 and 300 seconds, got %d", i, profileCfg.ServiceName, pushInterval) + } + metricsConfig := &MetricsConfig{ + ServiceName: profileCfg.ServiceName, + Endpoint: profileCfg.MetricsEndpoint, + Headers: profileCfg.Headers, + Protocol: profileCfg.Protocol, + TLSCACert: profileCfg.TLSCACert, + Insecure: profileCfg.Insecure, + PushInterval: pushInterval, } - return nil, fmt.Errorf("failed to initialize metrics exporter: %w", err) + profile.metricsExporter, err = NewMetricsExporter(ctx, metricsConfig, bifrostVersion) + if err != nil { + _ = client.Close() + closeProfiles(profiles) + return nil, fmt.Errorf("profile[%d] (%s): failed to initialize metrics exporter: %w", i, profileCfg.ServiceName, err) + } + logger.Info("OTEL metrics push enabled for %s, pushing to %s every %d seconds", profileCfg.ServiceName, profileCfg.MetricsEndpoint, pushInterval) } - logger.Info("OTEL metrics push enabled, pushing to %s every %d seconds", config.MetricsEndpoint, pushInterval) + + profiles = append(profiles, profile) + } + + p := &OtelPlugin{ + profiles: profiles, + pricingManager: pricingManager, } + p.ctx, p.cancel = context.WithCancel(ctx) return p, nil } +// loadEnvAttributes parses OTEL_RESOURCE_ATTRIBUTES into KeyValue pairs. +func loadEnvAttributes() []*commonpb.KeyValue { + result := make([]*commonpb.KeyValue, 0) + if attributes, ok := os.LookupEnv(OTELResponseAttributesEnvKey); ok { + for attribute := range strings.SplitSeq(attributes, ",") { + parts := strings.Split(strings.TrimSpace(attribute), "=") + if len(parts) == 2 { + result = append(result, kvStr(strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]))) + } + } + } + return result +} + +// closeProfiles is used in the Init error path to clean up already-initialised profiles. +func closeProfiles(profiles []*OtelProfile) { + for _, p := range profiles { + if p.metricsExporter != nil { + _ = p.metricsExporter.Shutdown(context.Background()) + } + if p.client != nil { + _ = p.client.Close() + } + } +} + // GetName function for the OTEL plugin func (p *OtelPlugin) GetName() string { return PluginName @@ -224,246 +268,56 @@ func (p *OtelPlugin) HTTPTransportStreamChunkHook(ctx *schemas.BifrostContext, r return chunk, nil } -// ValidateConfig function for the OTEL plugin -func (p *OtelPlugin) ValidateConfig(config any) (*Config, error) { - var otelConfig Config - // Checking if its a string, then we will JSON parse and confirm - if configStr, ok := config.(string); ok { - if err := sonic.Unmarshal([]byte(configStr), &otelConfig); err != nil { - return nil, err - } - } - // Checking if its a map[string]any, then we will JSON parse and confirm - if configMap, ok := config.(map[string]any); ok { - configString, err := sonic.Marshal(configMap) - if err != nil { - return nil, err - } - if err := sonic.Unmarshal([]byte(configString), &otelConfig); err != nil { - return nil, err - } - } - // Checking if its a Config, then we will confirm - if config, ok := config.(*Config); ok { - otelConfig = *config - } - // Validating fields - if otelConfig.CollectorURL == "" { - return nil, fmt.Errorf("collector url is required") - } - if otelConfig.TraceType == "" { - return nil, fmt.Errorf("trace type is required") - } - if otelConfig.Protocol == "" { - return nil, fmt.Errorf("protocol is required") - } - return &otelConfig, nil -} - // PreLLMHook is a no-op - tracing is handled via the Inject method. -// The OTEL plugin receives completed traces from TracingMiddleware. func (p *OtelPlugin) PreLLMHook(_ *schemas.BifrostContext, req *schemas.BifrostRequest) (*schemas.BifrostRequest, *schemas.LLMPluginShortCircuit, error) { return req, nil, nil } // PostLLMHook is a no-op - tracing is handled via the Inject method. -// The OTEL plugin receives completed traces from TracingMiddleware. func (p *OtelPlugin) PostLLMHook(_ *schemas.BifrostContext, resp *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*schemas.BifrostResponse, *schemas.BifrostError, error) { return resp, bifrostErr, nil } -// Inject receives a completed trace and sends it to the OTEL collector. -// Implements schemas.ObservabilityPlugin interface. -// This method is called asynchronously by TracingMiddleware after the response -// has been written to the client. +// Inject receives a completed trace and forwards it to every configured collector profile. func (p *OtelPlugin) Inject(ctx context.Context, trace *schemas.Trace) error { if trace == nil { return nil } - - // Emit trace to collector if client is initialized - if p.client != nil { - // Convert schemas.Trace to OTEL ResourceSpan - resourceSpan := p.convertTraceToResourceSpan(trace) - - // Emit to collector - if err := p.client.Emit(ctx, []*ResourceSpan{resourceSpan}); err != nil { - logger.Error("failed to emit trace %s: %v", trace.TraceID, err) + for _, profile := range p.profiles { + resourceSpan := profile.convertTraceToResourceSpan(trace) + if err := profile.client.Emit(ctx, []*ResourceSpan{resourceSpan}); err != nil { + logger.Error("failed to emit trace %s to %s: %v", trace.TraceID, profile.url, err) } - } - - // Record metrics if metrics exporter is enabled - if p.metricsExporter != nil { - p.recordMetricsFromTrace(ctx, trace) - } - - return nil -} - -// Helper functions for type-safe attribute extraction from trace spans -func getStringAttr(attrs map[string]any, key string) string { - if attrs == nil { - return "" - } - if v, ok := attrs[key].(string); ok { - return v - } - return "" -} - -func getIntAttr(attrs map[string]any, key string) int { - if attrs == nil { - return 0 - } - switch v := attrs[key].(type) { - case int: - return v - case int64: - return int(v) - case float64: - return int(v) - } - return 0 -} - -func getFloat64Attr(attrs map[string]any, key string) float64 { - if attrs == nil { - return 0 - } - switch v := attrs[key].(type) { - case float64: - return v - case int: - return float64(v) - case int64: - return float64(v) - } - return 0 -} - -// buildSpanAttrs extracts metric dimension attrs from a single attempt span. -func buildSpanAttrs(span *schemas.Span) []attribute.KeyValue { - attrs := span.Attributes - method := getStringAttr(attrs, "request.type") - if method == "" { - method = span.Name - } - return BuildBifrostAttributes( - getStringAttr(attrs, schemas.AttrProviderName), - getStringAttr(attrs, schemas.AttrRequestModel), - method, - getStringAttr(attrs, schemas.AttrVirtualKeyID), - getStringAttr(attrs, schemas.AttrVirtualKeyName), - getStringAttr(attrs, schemas.AttrSelectedKeyID), - getStringAttr(attrs, schemas.AttrSelectedKeyName), - getIntAttr(attrs, schemas.AttrNumberOfRetries), - getIntAttr(attrs, schemas.AttrFallbackIndex), - getStringAttr(attrs, schemas.AttrTeamID), - getStringAttr(attrs, schemas.AttrTeamName), - getStringAttr(attrs, schemas.AttrCustomerID), - getStringAttr(attrs, schemas.AttrCustomerName), - ) -} - -// recordMetricsFromTrace extracts metrics data from a completed trace and records them -// via the OTEL metrics exporter. This is called from Inject after trace emission. -// -// Per-attempt metrics (upstream_requests, errors, success, latency) are recorded once -// per llm.call/retry span so fallback attempts and failed retries are counted with -// their own provider/model/fallback_index labels. Per-trace metrics (tokens, cost, -// TTFT) are recorded once, keyed off the final (latest) attempt span. -func (p *OtelPlugin) recordMetricsFromTrace(ctx context.Context, trace *schemas.Trace) { - if trace == nil || p.metricsExporter == nil { - return - } - - var finalSpan *schemas.Span - for _, span := range trace.Spans { - if span.Kind != schemas.SpanKindLLMCall && span.Kind != schemas.SpanKindRetry { - continue + if profile.metricsExporter != nil { + profile.metricsExporter.recordMetricsFromTrace(ctx, trace) } - - spanAttrs := buildSpanAttrs(span) - - p.metricsExporter.RecordUpstreamRequest(ctx, spanAttrs...) - - if !span.StartTime.IsZero() && !span.EndTime.IsZero() { - latencySeconds := span.EndTime.Sub(span.StartTime).Seconds() - p.metricsExporter.RecordUpstreamLatency(ctx, latencySeconds, spanAttrs...) - } - - if span.Status == schemas.SpanStatusError { - p.metricsExporter.RecordErrorRequest(ctx, spanAttrs...) - } else { - p.metricsExporter.RecordSuccessRequest(ctx, spanAttrs...) - } - - if finalSpan == nil || span.EndTime.After(finalSpan.EndTime) { - finalSpan = span - } - } - - if finalSpan == nil { - finalSpan = trace.RootSpan - } - if finalSpan == nil { - return - } - - attrs := finalSpan.Attributes - otelAttrs := buildSpanAttrs(finalSpan) - - // Record token usage - try both naming conventions - inputTokens := getIntAttr(attrs, schemas.AttrPromptTokens) - if inputTokens == 0 { - inputTokens = getIntAttr(attrs, schemas.AttrInputTokens) - } - if inputTokens > 0 { - p.metricsExporter.RecordInputTokens(ctx, int64(inputTokens), otelAttrs...) - } - - outputTokens := getIntAttr(attrs, schemas.AttrCompletionTokens) - if outputTokens == 0 { - outputTokens = getIntAttr(attrs, schemas.AttrOutputTokens) - } - if outputTokens > 0 { - p.metricsExporter.RecordOutputTokens(ctx, int64(outputTokens), otelAttrs...) - } - - // Record cost if available - cost := getFloat64Attr(attrs, schemas.AttrUsageCost) - if cost > 0 { - p.metricsExporter.RecordCost(ctx, cost, otelAttrs...) - } - - // Record streaming latency metrics if available - ttft := getFloat64Attr(attrs, schemas.AttrTimeToFirstToken) - if ttft > 0 { - // Convert from nanoseconds to seconds if needed (check the unit) - p.metricsExporter.RecordStreamFirstTokenLatency(ctx, ttft/1e9, otelAttrs...) } + return nil } -// Cleanup function for the OTEL plugin +// Cleanup shuts down all profile clients and metrics exporters. func (p *OtelPlugin) Cleanup() error { if p.cancel != nil { p.cancel() } - // Shutdown metrics exporter first - if p.metricsExporter != nil { - if err := p.metricsExporter.Shutdown(context.Background()); err != nil { - logger.Error("failed to shutdown metrics exporter: %v", err) + var firstErr error + for _, profile := range p.profiles { + if profile.metricsExporter != nil { + if err := profile.metricsExporter.Shutdown(context.Background()); err != nil { + logger.Error("failed to shutdown metrics exporter for %s: %v", profile.serviceName, err) + if firstErr == nil { + firstErr = err + } + } + } + if err := profile.client.Close(); err != nil { + logger.Error("failed to close client for %s: %v", profile.serviceName, err) + if firstErr == nil { + firstErr = err + } } } - if p.client != nil { - return p.client.Close() - } - return nil -} - -// GetMetricsExporter returns the metrics exporter for external use (e.g., by telemetry plugin) -func (p *OtelPlugin) GetMetricsExporter() *MetricsExporter { - return p.metricsExporter + return firstErr } // Compile-time check that OtelPlugin implements ObservabilityPlugin diff --git a/plugins/otel/metrics.go b/plugins/otel/metrics.go index e1b5d85089..1d3d58c0df 100644 --- a/plugins/otel/metrics.go +++ b/plugins/otel/metrics.go @@ -2,14 +2,12 @@ package otel import ( "context" - "crypto/tls" - "crypto/x509" "fmt" "os" - "path/filepath" "sync" "time" + "github.com/maximhq/bifrost/core/schemas" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" @@ -61,86 +59,84 @@ type MetricsExporter struct { httpResponseSizeBytes *syncFloat64Histogram } -// syncInt64Counter wraps metric.Int64Counter with thread-safe lazy initialization -type syncInt64Counter struct { - counter metric.Int64Counter +// onceCounter provides thread-safe once-initialization for any OTel metric instrument. +type onceCounter[I any] struct { + counter I + ok bool once sync.Once - name string - desc string - unit string - meter metric.Meter } -func (c *syncInt64Counter) Add(ctx context.Context, value int64, opts ...metric.AddOption) { - c.once.Do(func() { +func (o *onceCounter[I]) load(name string, create func() (I, error)) (I, bool) { + o.once.Do(func() { var err error - c.counter, err = c.meter.Int64Counter(c.name, - metric.WithDescription(c.desc), - metric.WithUnit(c.unit), - ) + o.counter, err = create() + o.ok = err == nil if err != nil { - logger.Error("failed to create counter %s: %v", c.name, err) + logger.Error("failed to create metric %s: %v", name, err) } }) - if c.counter != nil { - c.counter.Add(ctx, value, opts...) + return o.counter, o.ok +} + +// syncInt64Counter wraps metric.Int64Counter with thread-safe lazy initialization +type syncInt64Counter struct { + onceCounter[metric.Int64Counter] + name, desc, unit string + meter metric.Meter +} + +func newSyncInt64Counter(name, desc, unit string, meter metric.Meter) *syncInt64Counter { + return &syncInt64Counter{name: name, desc: desc, unit: unit, meter: meter} +} + +func (c *syncInt64Counter) Add(ctx context.Context, value int64, opts ...metric.AddOption) { + if inst, ok := c.load(c.name, func() (metric.Int64Counter, error) { + return c.meter.Int64Counter(c.name, metric.WithDescription(c.desc), metric.WithUnit(c.unit)) + }); ok { + inst.Add(ctx, value, opts...) } } // syncFloat64Counter wraps metric.Float64Counter with thread-safe lazy initialization type syncFloat64Counter struct { - counter metric.Float64Counter - once sync.Once - name string - desc string - unit string - meter metric.Meter + onceCounter[metric.Float64Counter] + name, desc, unit string + meter metric.Meter +} + +func newSyncFloat64Counter(name, desc, unit string, meter metric.Meter) *syncFloat64Counter { + return &syncFloat64Counter{name: name, desc: desc, unit: unit, meter: meter} } func (c *syncFloat64Counter) Add(ctx context.Context, value float64, opts ...metric.AddOption) { - c.once.Do(func() { - var err error - c.counter, err = c.meter.Float64Counter(c.name, - metric.WithDescription(c.desc), - metric.WithUnit(c.unit), - ) - if err != nil { - logger.Error("failed to create float counter %s: %v", c.name, err) - } - }) - if c.counter != nil { - c.counter.Add(ctx, value, opts...) + if inst, ok := c.load(c.name, func() (metric.Float64Counter, error) { + return c.meter.Float64Counter(c.name, metric.WithDescription(c.desc), metric.WithUnit(c.unit)) + }); ok { + inst.Add(ctx, value, opts...) } } // syncFloat64Histogram wraps metric.Float64Histogram with thread-safe lazy initialization type syncFloat64Histogram struct { - histogram metric.Float64Histogram - once sync.Once - name string - desc string - unit string - meter metric.Meter + onceCounter[metric.Float64Histogram] + name, desc, unit string + meter metric.Meter +} + +func newSyncFloat64Histogram(name, desc, unit string, meter metric.Meter) *syncFloat64Histogram { + return &syncFloat64Histogram{name: name, desc: desc, unit: unit, meter: meter} } func (h *syncFloat64Histogram) Record(ctx context.Context, value float64, opts ...metric.RecordOption) { - h.once.Do(func() { - var err error - h.histogram, err = h.meter.Float64Histogram(h.name, - metric.WithDescription(h.desc), - metric.WithUnit(h.unit), - ) - if err != nil { - logger.Error("failed to create histogram %s: %v", h.name, err) - } - }) - if h.histogram != nil { - h.histogram.Record(ctx, value, opts...) + if inst, ok := h.load(h.name, func() (metric.Float64Histogram, error) { + return h.meter.Float64Histogram(h.name, metric.WithDescription(h.desc), metric.WithUnit(h.unit)) + }); ok { + inst.Record(ctx, value, opts...) } } // NewMetricsExporter creates a new OTEL metrics exporter -func NewMetricsExporter(ctx context.Context, config *MetricsConfig) (*MetricsExporter, error) { +func NewMetricsExporter(ctx context.Context, config *MetricsConfig, version string) (*MetricsExporter, error) { // Generate a unique instance ID for this node instanceID, err := os.Hostname() if err != nil { @@ -187,7 +183,7 @@ func NewMetricsExporter(ctx context.Context, config *MetricsConfig) (*MetricsExp // Create meter meter := provider.Meter("bifrost", - metric.WithInstrumentationVersion("1.0.0"), + metric.WithInstrumentationVersion(version), ) // Create metrics exporter @@ -202,43 +198,6 @@ func NewMetricsExporter(ctx context.Context, config *MetricsConfig) (*MetricsExp return m, nil } -// validateCACertPath validates the CA certificate path to prevent path traversal attacks. -// It ensures the path is absolute, cleaned of traversal sequences, and exists as a regular file. -func validateCACertPath(certPath string) error { - if certPath == "" { - return nil - } - - // Clean the path to resolve any .. or . components - cleanPath := filepath.Clean(certPath) - - // Require absolute paths to prevent relative path attacks - if !filepath.IsAbs(cleanPath) { - return fmt.Errorf("TLS CA cert path must be absolute: %s", certPath) - } - - // Check that the cleaned path doesn't differ significantly from input - // (indicates attempted traversal) - if cleanPath != filepath.Clean(filepath.FromSlash(certPath)) { - return fmt.Errorf("invalid TLS CA cert path: %s", certPath) - } - - // Verify the file exists and is not a symlink - info, err := os.Lstat(cleanPath) - if err != nil { - return fmt.Errorf("TLS CA cert path not accessible: %w", err) - } - // Reject symlinks to prevent symlink-based path traversal - if info.Mode()&os.ModeSymlink != 0 { - return fmt.Errorf("TLS CA cert path cannot be a symlink: %s", certPath) - } - if !info.Mode().IsRegular() { - return fmt.Errorf("TLS CA cert path is not a regular file: %s", certPath) - } - - return nil -} - func createHTTPExporter(ctx context.Context, config *MetricsConfig) (sdkmetric.Exporter, error) { opts := []otlpmetrichttp.Option{ otlpmetrichttp.WithEndpointURL(config.Endpoint), @@ -248,34 +207,16 @@ func createHTTPExporter(ctx context.Context, config *MetricsConfig) (sdkmetric.E opts = append(opts, otlpmetrichttp.WithHeaders(config.Headers)) } - // TLS priority: custom CA > system roots > insecure - if config.TLSCACert != "" { - // Validate the CA cert path to prevent path traversal attacks - if err := validateCACertPath(config.TLSCACert); err != nil { - return nil, err - } - // Use custom CA certificate - caCert, err := os.ReadFile(config.TLSCACert) + // HTTP metrics insecure mode disables TLS entirely (unlike the trace HTTP client + // which uses InsecureSkipVerify). buildTLSConfig is bypassed for that case. + if config.TLSCACert == "" && config.Insecure { + opts = append(opts, otlpmetrichttp.WithInsecure()) + } else { + tlsConfig, err := buildTLSConfig(config.TLSCACert, false) if err != nil { - return nil, fmt.Errorf("failed to read CA cert: %w", err) - } - caCertPool := x509.NewCertPool() - if !caCertPool.AppendCertsFromPEM(caCert) { - return nil, fmt.Errorf("failed to parse CA cert") - } - tlsConfig := &tls.Config{ - RootCAs: caCertPool, - MinVersion: tls.VersionTLS12, + return nil, err } opts = append(opts, otlpmetrichttp.WithTLSClientConfig(tlsConfig)) - } else if config.Insecure { - // Skip TLS entirely - opts = append(opts, otlpmetrichttp.WithInsecure()) - } else { - // Use system root CAs (empty tls.Config uses system roots) - opts = append(opts, otlpmetrichttp.WithTLSClientConfig(&tls.Config{ - MinVersion: tls.VersionTLS12, - })) } return otlpmetrichttp.New(ctx, opts...) @@ -290,141 +231,50 @@ func createGRPCExporter(ctx context.Context, config *MetricsConfig) (sdkmetric.E opts = append(opts, otlpmetricgrpc.WithHeaders(config.Headers)) } - // TLS priority: custom CA > system roots > insecure - if config.TLSCACert != "" { - // Validate the CA cert path to prevent path traversal attacks - if err := validateCACertPath(config.TLSCACert); err != nil { - return nil, err - } - // Use custom CA certificate with MinVersion - caCert, err := os.ReadFile(config.TLSCACert) - if err != nil { - return nil, fmt.Errorf("failed to read CA cert: %w", err) - } - caCertPool := x509.NewCertPool() - if !caCertPool.AppendCertsFromPEM(caCert) { - return nil, fmt.Errorf("failed to parse CA cert") - } - tlsConfig := &tls.Config{ - RootCAs: caCertPool, - MinVersion: tls.VersionTLS12, - } - creds := credentials.NewTLS(tlsConfig) - opts = append(opts, otlpmetricgrpc.WithTLSCredentials(creds)) - } else if config.Insecure { - // Skip TLS entirely + // gRPC insecure mode uses plaintext (no TLS at all). buildTLSConfig is bypassed for that case. + if config.TLSCACert == "" && config.Insecure { opts = append(opts, otlpmetricgrpc.WithTLSCredentials(insecure.NewCredentials())) } else { - // Use system root CAs with MinVersion - tlsConfig := &tls.Config{ - MinVersion: tls.VersionTLS12, + tlsConfig, err := buildTLSConfig(config.TLSCACert, false) + if err != nil { + return nil, err } - creds := credentials.NewTLS(tlsConfig) - opts = append(opts, otlpmetricgrpc.WithTLSCredentials(creds)) + opts = append(opts, otlpmetricgrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig))) } return otlpmetricgrpc.New(ctx, opts...) } func (m *MetricsExporter) initMetrics() { - // Bifrost upstream metrics - m.upstreamRequestsTotal = &syncInt64Counter{ - name: "bifrost_upstream_requests_total", - desc: "Total number of requests forwarded to upstream providers by Bifrost", - unit: "{request}", - meter: m.meter, - } - - m.successRequestsTotal = &syncInt64Counter{ - name: "bifrost_success_requests_total", - desc: "Total number of successful requests forwarded to upstream providers by Bifrost", - unit: "{request}", - meter: m.meter, - } - - m.errorRequestsTotal = &syncInt64Counter{ - name: "bifrost_error_requests_total", - desc: "Total number of error requests forwarded to upstream providers by Bifrost", - unit: "{request}", - meter: m.meter, - } - - m.inputTokensTotal = &syncInt64Counter{ - name: "bifrost_input_tokens_total", - desc: "Total number of input tokens forwarded to upstream providers by Bifrost", - unit: "{token}", - meter: m.meter, - } - - m.outputTokensTotal = &syncInt64Counter{ - name: "bifrost_output_tokens_total", - desc: "Total number of output tokens forwarded to upstream providers by Bifrost", - unit: "{token}", - meter: m.meter, - } - - m.cacheHitsTotal = &syncInt64Counter{ - name: "bifrost_cache_hits_total", - desc: "Total number of cache hits forwarded to upstream providers by Bifrost", - unit: "{hit}", - meter: m.meter, - } - - m.costTotal = &syncFloat64Counter{ - name: "bifrost_cost_total", - desc: "Total cost in USD for requests to upstream providers", - unit: "USD", - meter: m.meter, - } - - m.upstreamLatencySeconds = &syncFloat64Histogram{ - name: "bifrost_upstream_latency_seconds", - desc: "Latency of requests forwarded to upstream providers by Bifrost", - unit: "s", - meter: m.meter, - } - - m.streamFirstTokenLatencySeconds = &syncFloat64Histogram{ - name: "bifrost_stream_first_token_latency_seconds", - desc: "Latency of the first token of a stream response", - unit: "s", - meter: m.meter, - } - - m.streamInterTokenLatencySeconds = &syncFloat64Histogram{ - name: "bifrost_stream_inter_token_latency_seconds", - desc: "Latency of the intermediate tokens of a stream response", - unit: "s", - meter: m.meter, - } - - // HTTP metrics - m.httpRequestsTotal = &syncInt64Counter{ - name: "http_requests_total", - desc: "Total number of HTTP requests", - unit: "{request}", - meter: m.meter, - } - - m.httpRequestDuration = &syncFloat64Histogram{ - name: "http_request_duration_seconds", - desc: "Duration of HTTP requests", - unit: "s", - meter: m.meter, - } - - m.httpRequestSizeBytes = &syncFloat64Histogram{ - name: "http_request_size_bytes", - desc: "Size of HTTP requests", - unit: "By", - meter: m.meter, - } - - m.httpResponseSizeBytes = &syncFloat64Histogram{ - name: "http_response_size_bytes", - desc: "Size of HTTP responses", - unit: "By", - meter: m.meter, + for _, s := range []struct { + name, desc, unit string + ptr **syncInt64Counter + }{ + {"bifrost_upstream_requests_total", "Total number of requests forwarded to upstream providers by Bifrost", "{request}", &m.upstreamRequestsTotal}, + {"bifrost_success_requests_total", "Total number of successful requests forwarded to upstream providers by Bifrost", "{request}", &m.successRequestsTotal}, + {"bifrost_error_requests_total", "Total number of error requests forwarded to upstream providers by Bifrost", "{request}", &m.errorRequestsTotal}, + {"bifrost_input_tokens_total", "Total number of input tokens forwarded to upstream providers by Bifrost", "{token}", &m.inputTokensTotal}, + {"bifrost_output_tokens_total", "Total number of output tokens forwarded to upstream providers by Bifrost", "{token}", &m.outputTokensTotal}, + {"bifrost_cache_hits_total", "Total number of cache hits forwarded to upstream providers by Bifrost", "{hit}", &m.cacheHitsTotal}, + {"http_requests_total", "Total number of HTTP requests", "{request}", &m.httpRequestsTotal}, + } { + *s.ptr = newSyncInt64Counter(s.name, s.desc, s.unit, m.meter) + } + + m.costTotal = newSyncFloat64Counter("bifrost_cost_total", "Total cost in USD for requests to upstream providers", "USD", m.meter) + + for _, s := range []struct { + name, desc, unit string + ptr **syncFloat64Histogram + }{ + {"bifrost_upstream_latency_seconds", "Latency of requests forwarded to upstream providers by Bifrost", "s", &m.upstreamLatencySeconds}, + {"bifrost_stream_first_token_latency_seconds", "Latency of the first token of a stream response", "s", &m.streamFirstTokenLatencySeconds}, + {"bifrost_stream_inter_token_latency_seconds", "Latency of the intermediate tokens of a stream response", "s", &m.streamInterTokenLatencySeconds}, + {"http_request_duration_seconds", "Duration of HTTP requests", "s", &m.httpRequestDuration}, + {"http_request_size_bytes", "Size of HTTP requests", "By", &m.httpRequestSizeBytes}, + {"http_response_size_bytes", "Size of HTTP responses", "By", &m.httpResponseSizeBytes}, + } { + *s.ptr = newSyncFloat64Histogram(s.name, s.desc, s.unit, m.meter) } } @@ -506,22 +356,39 @@ func (m *MetricsExporter) RecordHTTPResponseSize(ctx context.Context, sizeBytes m.httpResponseSizeBytes.Record(ctx, sizeBytes, metric.WithAttributes(attrs...)) } +// BifrostAttrParams holds parameters for building Bifrost metric attributes +type BifrostAttrParams struct { + Provider string + Model string + Method string + VirtualKeyID string + VirtualKeyName string + SelectedKeyID string + SelectedKeyName string + NumberOfRetries int + FallbackIndex int + TeamID string + TeamName string + CustomerID string + CustomerName string +} + // BuildBifrostAttributes builds common Bifrost metric attributes -func BuildBifrostAttributes(provider, model, method, virtualKeyID, virtualKeyName, selectedKeyID, selectedKeyName string, numberOfRetries, fallbackIndex int, teamID, teamName, customerID, customerName string) []attribute.KeyValue { +func BuildBifrostAttributes(p BifrostAttrParams) []attribute.KeyValue { return []attribute.KeyValue{ - attribute.String("provider", provider), - attribute.String("model", model), - attribute.String("method", method), - attribute.String("virtual_key_id", virtualKeyID), - attribute.String("virtual_key_name", virtualKeyName), - attribute.String("selected_key_id", selectedKeyID), - attribute.String("selected_key_name", selectedKeyName), - attribute.Int("number_of_retries", numberOfRetries), - attribute.Int("fallback_index", fallbackIndex), - attribute.String("team_id", teamID), - attribute.String("team_name", teamName), - attribute.String("customer_id", customerID), - attribute.String("customer_name", customerName), + attribute.String("provider", p.Provider), + attribute.String("model", p.Model), + attribute.String("method", p.Method), + attribute.String("virtual_key_id", p.VirtualKeyID), + attribute.String("virtual_key_name", p.VirtualKeyName), + attribute.String("selected_key_id", p.SelectedKeyID), + attribute.String("selected_key_name", p.SelectedKeyName), + attribute.Int("number_of_retries", p.NumberOfRetries), + attribute.Int("fallback_index", p.FallbackIndex), + attribute.String("team_id", p.TeamID), + attribute.String("team_name", p.TeamName), + attribute.String("customer_id", p.CustomerID), + attribute.String("customer_name", p.CustomerName), } } @@ -533,3 +400,154 @@ func BuildHTTPAttributes(path, method, status string) []attribute.KeyValue { attribute.String("status", status), } } + +// Helper functions for type-safe attribute extraction from trace spans +func getStringAttr(attrs map[string]any, key string) string { + if attrs == nil { + return "" + } + if v, ok := attrs[key].(string); ok { + return v + } + return "" +} + +func getIntAttr(attrs map[string]any, key string) int { + if attrs == nil { + return 0 + } + switch v := attrs[key].(type) { + case int: + return v + case int64: + return int(v) + case float64: + return int(v) + } + return 0 +} + +func getFloat64Attr(attrs map[string]any, key string) float64 { + if attrs == nil { + return 0 + } + switch v := attrs[key].(type) { + case float64: + return v + case int: + return float64(v) + case int64: + return float64(v) + } + return 0 +} + +// recordMetricsFromTrace extracts metrics data from a completed trace and records them +// via the OTEL metrics exporter. This is called from Inject after trace emission. +func (m *MetricsExporter) recordMetricsFromTrace(ctx context.Context, trace *schemas.Trace) { + if trace == nil || m == nil { + return + } + + // Prefer the last attempt span (LLM call or retry) so metrics reflect the final outcome. + var llmSpan *schemas.Span + for _, span := range trace.Spans { + if span.Kind != schemas.SpanKindLLMCall && span.Kind != schemas.SpanKindRetry { + continue + } + if llmSpan == nil || span.EndTime.After(llmSpan.EndTime) { + llmSpan = span + } + } + if llmSpan == nil { + llmSpan = trace.RootSpan + } + + if llmSpan == nil { + return + } + + attrs := llmSpan.Attributes + + // Extract all metric dimensions from span attributes + provider := getStringAttr(attrs, schemas.AttrProviderName) + model := getStringAttr(attrs, schemas.AttrRequestModel) + // Prefer request.type attribute to keep the method stable across retries + method := getStringAttr(attrs, "request.type") + if method == "" { + method = llmSpan.Name + } + virtualKeyID := getStringAttr(attrs, schemas.AttrVirtualKeyID) + virtualKeyName := getStringAttr(attrs, schemas.AttrVirtualKeyName) + selectedKeyID := getStringAttr(attrs, schemas.AttrSelectedKeyID) + selectedKeyName := getStringAttr(attrs, schemas.AttrSelectedKeyName) + numberOfRetries := getIntAttr(attrs, schemas.AttrNumberOfRetries) + fallbackIndex := getIntAttr(attrs, schemas.AttrFallbackIndex) + teamID := getStringAttr(attrs, schemas.AttrTeamID) + teamName := getStringAttr(attrs, schemas.AttrTeamName) + customerID := getStringAttr(attrs, schemas.AttrCustomerID) + customerName := getStringAttr(attrs, schemas.AttrCustomerName) + + // Build common attributes for all metrics + otelAttrs := BuildBifrostAttributes(BifrostAttrParams{ + Provider: provider, + Model: model, + Method: method, + VirtualKeyID: virtualKeyID, + VirtualKeyName: virtualKeyName, + SelectedKeyID: selectedKeyID, + SelectedKeyName: selectedKeyName, + NumberOfRetries: numberOfRetries, + FallbackIndex: fallbackIndex, + TeamID: teamID, + TeamName: teamName, + CustomerID: customerID, + CustomerName: customerName, + }) + + // Record upstream request count + m.RecordUpstreamRequest(ctx, otelAttrs...) + + // Record latency (from span duration) + if !llmSpan.StartTime.IsZero() && !llmSpan.EndTime.IsZero() { + latencySeconds := llmSpan.EndTime.Sub(llmSpan.StartTime).Seconds() + m.RecordUpstreamLatency(ctx, latencySeconds, otelAttrs...) + } + + // Record success or error based on span status + if llmSpan.Status == schemas.SpanStatusError { + m.RecordErrorRequest(ctx, otelAttrs...) + } else { + m.RecordSuccessRequest(ctx, otelAttrs...) + } + + // Record token usage - try both naming conventions + inputTokens := getIntAttr(attrs, schemas.AttrPromptTokens) + if inputTokens == 0 { + inputTokens = getIntAttr(attrs, schemas.AttrInputTokens) + } + if inputTokens > 0 { + m.RecordInputTokens(ctx, int64(inputTokens), otelAttrs...) + } + + outputTokens := getIntAttr(attrs, schemas.AttrCompletionTokens) + if outputTokens == 0 { + outputTokens = getIntAttr(attrs, schemas.AttrOutputTokens) + } + if outputTokens > 0 { + m.RecordOutputTokens(ctx, int64(outputTokens), otelAttrs...) + } + + // Record cost if available + cost := getFloat64Attr(attrs, schemas.AttrUsageCost) + if cost > 0 { + m.RecordCost(ctx, cost, otelAttrs...) + } + + // Record streaming latency metrics if available + ttft := getFloat64Attr(attrs, schemas.AttrTimeToFirstToken) + if ttft > 0 { + // Convert from nanoseconds to seconds if needed (check the unit) + m.RecordStreamFirstTokenLatency(ctx, ttft/1e9, otelAttrs...) + } +} diff --git a/plugins/otel/utils.go b/plugins/otel/utils.go new file mode 100644 index 0000000000..65646657dc --- /dev/null +++ b/plugins/otel/utils.go @@ -0,0 +1,91 @@ +package otel + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "os" + "path/filepath" + "strings" +) + +// injectEnvToHeaders converts any headers that start with "env." with their corresponding environment variable value +// errors out if any environment variable is not found +func injectEnvToHeaders(headers map[string]string) error { + if headers == nil { + return nil + } + for k, v := range headers { + if envKey, ok := strings.CutPrefix(v, "env."); ok { + envVal := os.Getenv(envKey) + if envVal == "" { + return fmt.Errorf("environment variable %s not found", envKey) + } + headers[k] = envVal + } + } + return nil +} + +// validateCACertPath validates the CA certificate path to prevent path traversal attacks. +// It ensures the path is absolute, cleaned of traversal sequences, and exists as a regular file. +func validateCACertPath(certPath string) error { + if certPath == "" { + return nil + } + + // Clean the path to resolve any .. or . components + cleanPath := filepath.Clean(certPath) + + // Require absolute paths to prevent relative path attacks + if !filepath.IsAbs(cleanPath) { + return fmt.Errorf("TLS CA cert path must be absolute: %s", certPath) + } + + // Verify the file exists and is not a symlink + info, err := os.Lstat(cleanPath) + if err != nil { + return fmt.Errorf("TLS CA cert path not accessible: %w", err) + } + // Reject symlinks to prevent symlink-based path traversal + if info.Mode()&os.ModeSymlink != 0 { + return fmt.Errorf("TLS CA cert path cannot be a symlink: %s", certPath) + } + // Ensure path is a regular file, not directories, sockets, pipes, devices, etc. + if !info.Mode().IsRegular() { + return fmt.Errorf("TLS CA cert path is not a regular file: %s", certPath) + } + + return nil +} + +// Builds a TLS config with custom CA, insecure mode, or system roots CAs +// - use a custom CA pool if tlsCACert is provided +// - otherwise skip verification if insecureMode is enabled +// - otherwise use the system root CAs +func buildTLSConfig(tlsCACert string, insecureMode bool) (*tls.Config, error) { + cfg := tls.Config{ + InsecureSkipVerify: false, + MinVersion: tls.VersionTLS12, + } + + // TLS priority: custom CA > system roots > insecure + if tlsCACert != "" { + if err := validateCACertPath(tlsCACert); err != nil { + return nil, err + } + caCert, err := os.ReadFile(tlsCACert) + if err != nil { + return nil, fmt.Errorf("failed to load provided CA cert: %w", err) + } + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + return nil, fmt.Errorf("failed to add provided CA cert") + } + cfg.RootCAs = caCertPool + } else if insecureMode { + cfg.InsecureSkipVerify = true // #nosec G402 + } + + return &cfg, nil +} diff --git a/transports/bifrost-http/server/plugins.go b/transports/bifrost-http/server/plugins.go index c0137d6e6b..5f7a30f6cb 100644 --- a/transports/bifrost-http/server/plugins.go +++ b/transports/bifrost-http/server/plugins.go @@ -118,7 +118,7 @@ func loadBuiltinPlugin(ctx context.Context, name string, pluginConfig any, bifro } // loadCustomPlugin loads a plugin from a shared object file -func loadCustomPlugin(ctx context.Context, path *string, pluginConfig any, bifrostConfig *lib.Config) (schemas.BasePlugin, error) { +func loadCustomPlugin(_ context.Context, path *string, pluginConfig any, bifrostConfig *lib.Config) (schemas.BasePlugin, error) { logger.Info("loading custom plugin from path %s", *path) plugin, err := bifrostConfig.PluginLoader.LoadPlugin(*path, pluginConfig) diff --git a/transports/config.schema.json b/transports/config.schema.json index 7b64e12e51..b82faf7074 100644 --- a/transports/config.schema.json +++ b/transports/config.schema.json @@ -1396,74 +1396,102 @@ "type": "object", "description": "Configuration for the OpenTelemetry plugin", "properties": { - "service_name": { - "type": "string", - "description": "Service name to be used for tracing", - "default": "bifrost" - }, - "collector_url": { - "type": "string", - "description": "URL of the OpenTelemetry collector", - "oneOf": [ - { - "format": "uri" - }, - { - "pattern": "^[^:\\s]+:\\d+$" - } - ] - }, - "trace_type": { - "type": "string", - "description": "Type of trace to use for the OTEL collector", - "enum": ["genai_extension", "vercel", "open_inference"] - }, - "protocol": { - "type": "string", - "description": "Protocol to use for the OTEL collector", - "enum": ["http", "grpc"] - }, - "metrics_enabled": { - "type": "boolean", - "description": "Enable push-based metrics export via OTLP. Recommended for multi-node cluster deployments.", - "default": false - }, - "metrics_endpoint": { - "type": "string", - "description": "OTLP metrics endpoint URL (e.g., http://otel-collector:4318/v1/metrics for HTTP or otel-collector:4317 for gRPC)", - "oneOf": [ - { - "format": "uri" + "profiles": { + "type": "array", + "description": "One or more OTEL collector destinations. Each profile exports to a separate collector.", + "minItems": 1, + "items": { + "type": "object", + "properties": { + "enabled": { + "type": "boolean", + "description": "If false, this profile is skipped during export. Defaults to true.", + "default": true + }, + "service_name": { + "type": "string", + "description": "Service name to be used for tracing", + "default": "bifrost" + }, + "collector_url": { + "type": "string", + "description": "URL of the OpenTelemetry collector", + "oneOf": [ + { + "format": "uri" + }, + { + "pattern": "^[^:\\s]+:\\d+$" + } + ] + }, + "trace_type": { + "type": "string", + "description": "Type of trace to use for the OTEL collector", + "enum": [ + "genai_extension" + ] + }, + "protocol": { + "type": "string", + "description": "Protocol to use for the OTEL collector", + "enum": [ + "http", + "grpc" + ] + }, + "metrics_enabled": { + "type": "boolean", + "description": "Enable push-based metrics export via OTLP. Recommended for multi-node cluster deployments.", + "default": false + }, + "metrics_endpoint": { + "type": "string", + "description": "OTLP metrics endpoint URL (e.g., http://otel-collector:4318/v1/metrics for HTTP or otel-collector:4317 for gRPC)", + "oneOf": [ + { + "format": "uri" + }, + { + "pattern": "^[^:\\s]+:\\d+$" + } + ] + }, + "metrics_push_interval": { + "type": "integer", + "description": "Metrics push interval in seconds", + "default": 15, + "minimum": 1, + "maximum": 300 + }, + "headers": { + "type": "object", + "additionalProperties": { + "type": "string" + }, + "description": "Custom headers for the collector. Supports env.VAR_NAME prefix for environment variable substitution." + }, + "tls_ca_cert": { + "type": "string", + "description": "Path to TLS CA certificate file" + }, + "insecure": { + "type": "boolean", + "description": "Skip TLS verification (ignored if tls_ca_cert is set)" + } }, - { - "pattern": "^[^:\\s]+:\\d+$" - } - ] - }, - "metrics_push_interval": { - "type": "integer", - "description": "Metrics push interval in seconds", - "default": 15, - "minimum": 1, - "maximum": 300 - }, - "headers": { - "type": "object", - "additionalProperties": { - "type": "string" - }, - "description": "Custom headers for the collector. Supports env.VAR_NAME prefix for environment variable substitution." - }, - "tls_ca_cert": { - "type": "string", - "description": "Path to TLS CA certificate file" - }, - "insecure": { - "type": "boolean", - "description": "Skip TLS verification (ignored if tls_ca_cert is set)" + "required": [ + "collector_url", + "trace_type", + "protocol" + ], + "additionalProperties": false + } } }, - "required": ["collector_url", "trace_type", "protocol"], + "required": [ + "profiles" + ], "additionalProperties": false } } diff --git a/ui/app/workspace/observability/fragments/otelFormFragment.tsx b/ui/app/workspace/observability/fragments/otelFormFragment.tsx index f27438fb54..180e6e8d26 100644 --- a/ui/app/workspace/observability/fragments/otelFormFragment.tsx +++ b/ui/app/workspace/observability/fragments/otelFormFragment.tsx @@ -1,458 +1,681 @@ import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; -import { Form, FormControl, FormDescription, FormField, FormItem, FormLabel, FormMessage } from "@/components/ui/form"; +import { + Form, + FormControl, + FormDescription, + FormField, + FormItem, + FormLabel, + FormMessage, +} from "@/components/ui/form"; import { HeadersTable } from "@/components/ui/headersTable"; import { Input } from "@/components/ui/input"; -import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; import { Switch } from "@/components/ui/switch"; -import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from "@/components/ui/tooltip"; -import { otelFormSchema, type OtelFormSchema } from "@/lib/types/schemas"; +import { + Tooltip, + TooltipContent, + TooltipProvider, + TooltipTrigger, +} from "@/components/ui/tooltip"; +import { + otelFormSchema, + type OtelFormSchema, + type OtelProfileConfigSchema, +} from "@/lib/types/schemas"; import { RbacOperation, RbacResource, useRbac } from "@enterprise/lib"; import { zodResolver } from "@hookform/resolvers/zod"; -import { Trash2 } from "lucide-react"; +import { ChevronDown, ChevronRight, Plus, Trash2 } from "lucide-react"; import { useEffect, useState } from "react"; -import { useForm, type Resolver } from "react-hook-form"; +import { useFieldArray, useForm, type Resolver } from "react-hook-form"; interface OtelFormFragmentProps { - currentConfig?: { - enabled?: boolean; - service_name?: string; - collector_url?: string; - headers?: Record; - trace_type?: "genai_extension" | "vercel" | "open_inference"; - protocol?: "http" | "grpc"; - // TLS configuration - tls_ca_cert?: string; - insecure?: boolean; - // Metrics push configuration - metrics_enabled?: boolean; - metrics_endpoint?: string; - metrics_push_interval?: number; - }; - onSave: (config: OtelFormSchema) => Promise; - onDelete?: () => void; - isDeleting?: boolean; - isLoading?: boolean; + currentConfig?: { + enabled?: boolean; + profiles?: Array<{ + enabled?: boolean; + service_name?: string; + collector_url?: string; + headers?: Record; + trace_type?: "genai_extension" | "vercel" | "open_inference"; + protocol?: "http" | "grpc"; + tls_ca_cert?: string; + insecure?: boolean; + metrics_enabled?: boolean; + metrics_endpoint?: string; + metrics_push_interval?: number; + }>; + }; + onSave: (config: OtelFormSchema) => Promise; + onDelete?: () => void; + isDeleting?: boolean; + isLoading?: boolean; } +const DEFAULT_PROFILE: OtelProfileConfigSchema = { + enabled: false, + service_name: "bifrost", + collector_url: "", + headers: {}, + trace_type: "genai_extension", + protocol: "http", + tls_ca_cert: "", + insecure: true, + metrics_enabled: false, + metrics_endpoint: "", + metrics_push_interval: 15, +}; + +type RawProfile = NonNullable< + NonNullable["profiles"] +>[number]; + +function profileDefaults(p?: RawProfile): OtelProfileConfigSchema { + return { + enabled: p?.enabled ?? true, + service_name: p?.service_name ?? "bifrost", + collector_url: p?.collector_url ?? "", + headers: p?.headers ?? {}, + trace_type: p?.trace_type ?? "genai_extension", + protocol: p?.protocol ?? "http", + tls_ca_cert: p?.tls_ca_cert ?? "", + insecure: p?.insecure ?? true, + metrics_enabled: p?.metrics_enabled ?? false, + metrics_endpoint: p?.metrics_endpoint ?? "", + metrics_push_interval: p?.metrics_push_interval ?? 15, + }; +} + +const traceTypeOptions: { + value: string; + label: string; + disabled?: boolean; + disabledReason?: string; +}[] = [ + { value: "genai_extension", label: "OTel GenAI Extension (Recommended)" }, + { + value: "vercel", + label: "Vercel AI SDK", + disabled: true, + disabledReason: "Coming soon", + }, + { + value: "open_inference", + label: "Arize OpenInference", + disabled: true, + disabledReason: "Coming soon", + }, +]; +const protocolOptions = [ + { value: "http", label: "HTTP" }, + { value: "grpc", label: "GRPC" }, +]; + export function OtelFormFragment({ - currentConfig: initialConfig, - onSave, - onDelete, - isDeleting = false, - isLoading = false, + currentConfig: initialConfig, + onSave, + onDelete, + isDeleting = false, + isLoading = false, }: OtelFormFragmentProps) { - const hasOtelAccess = useRbac(RbacResource.Observability, RbacOperation.Update); - const [isSaving, setIsSaving] = useState(false); - const form = useForm({ - resolver: zodResolver(otelFormSchema) as Resolver, - mode: "onChange", - reValidateMode: "onChange", - defaultValues: { - enabled: initialConfig?.enabled ?? true, - otel_config: { - service_name: initialConfig?.service_name ?? "bifrost", - collector_url: initialConfig?.collector_url ?? "", - headers: initialConfig?.headers ?? {}, - trace_type: initialConfig?.trace_type ?? "genai_extension", - protocol: initialConfig?.protocol ?? "http", - tls_ca_cert: initialConfig?.tls_ca_cert ?? "", - insecure: initialConfig?.insecure ?? true, - metrics_enabled: initialConfig?.metrics_enabled ?? false, - metrics_endpoint: initialConfig?.metrics_endpoint ?? "", - metrics_push_interval: initialConfig?.metrics_push_interval ?? 15, - }, - }, - }); + const hasOtelAccess = useRbac( + RbacResource.Observability, + RbacOperation.Update, + ); + const [isSaving, setIsSaving] = useState(false); - const onSubmit = (data: OtelFormSchema) => { - setIsSaving(true); - onSave(data).finally(() => setIsSaving(false)); - }; + const makeDefaultValues = (cfg: typeof initialConfig) => ({ + enabled: cfg?.enabled ?? true, + otel_config: { + profiles: cfg?.profiles?.length + ? cfg.profiles.map(profileDefaults) + : [{ ...DEFAULT_PROFILE }], + }, + }); - // Re-run validation on collector_url when protocol changes so cross-field - // refinement in the schema is applied immediately - const protocol = form.watch("otel_config.protocol"); - const metricsEnabled = form.watch("otel_config.metrics_enabled"); - useEffect(() => { - if (form.getValues("enabled") === false) return; - form.trigger("otel_config.collector_url"); - // Also re-validate metrics_endpoint when protocol changes - if (metricsEnabled) { - form.trigger("otel_config.metrics_endpoint"); - } - }, [protocol, form, metricsEnabled]); + const form = useForm({ + resolver: zodResolver(otelFormSchema) as Resolver< + OtelFormSchema, + any, + OtelFormSchema + >, + mode: "onChange", + reValidateMode: "onChange", + defaultValues: makeDefaultValues(initialConfig), + }); - // Re-run validation on metrics_endpoint when metrics_enabled changes - useEffect(() => { - if (metricsEnabled) { - form.trigger("otel_config.metrics_endpoint"); - } - }, [metricsEnabled, form]); + const { fields, append, remove } = useFieldArray({ + control: form.control, + name: "otel_config.profiles", + }); - useEffect(() => { - // Reset form with new initial config when it changes - form.reset({ - enabled: initialConfig?.enabled ?? true, - otel_config: { - service_name: initialConfig?.service_name ?? "bifrost", - collector_url: initialConfig?.collector_url || "", - headers: initialConfig?.headers || {}, - trace_type: initialConfig?.trace_type || "genai_extension", - protocol: initialConfig?.protocol || "http", - tls_ca_cert: initialConfig?.tls_ca_cert ?? "", - insecure: initialConfig?.insecure ?? true, - metrics_enabled: initialConfig?.metrics_enabled ?? false, - metrics_endpoint: initialConfig?.metrics_endpoint ?? "", - metrics_push_interval: initialConfig?.metrics_push_interval ?? 15, - }, - }); - }, [form, initialConfig]); + const [expandedProfiles, setExpandedProfiles] = useState< + Record + >(() => Object.fromEntries(fields.map((_, i) => [i, true]))); - const traceTypeOptions: { value: string; label: string; disabled?: boolean; disabledReason?: string }[] = [ - { value: "genai_extension", label: "OTel GenAI Extension (Recommended)" }, - { value: "vercel", label: "Vercel AI SDK", disabled: true, disabledReason: "Coming soon" }, - { value: "open_inference", label: "Arize OpenInference", disabled: true, disabledReason: "Coming soon" }, - ]; - const protocolOptions: { value: string; label: string; disabled?: boolean; disabledReason?: string }[] = [ - { value: "http", label: "HTTP" }, - { value: "grpc", label: "GRPC" }, - ]; + // Auto-expand newly appended profiles. Keyed by index so state survives + // form.reset() (which regenerates useFieldArray ids). + useEffect(() => { + setExpandedProfiles((prev) => { + const next: Record = {}; + for (let i = 0; i < fields.length; i++) { + next[i] = i in prev ? prev[i] : true; + } + return next; + }); + }, [fields.length]); - return ( -
- - {/* OTEL Configuration */} -
-
- ( - - Service Name - If kept empty, the service name will be set to "bifrost" - - - - - - )} - /> - ( - - OTLP Collector URL -
- {form.watch("otel_config.protocol") === "http" ? "http(s)://:/v1/traces" : ":"} -
- - - - -
- )} - /> - ( - - - - - - - )} - /> -
- ( - - Format - - - - )} - /> + // Reset form when saved config changes (e.g. after page reload / navigation) + useEffect(() => { + form.reset(makeDefaultValues(initialConfig)); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [initialConfig]); - ( - - Protocol - - - - )} - /> -
+ // Re-trigger cross-field validation when protocol or metrics_enabled changes for any profile + const profiles = form.watch("otel_config.profiles"); + const protocolsKey = profiles.map((p) => p.protocol).join(","); + const metricsKey = profiles.map((p) => p.metrics_enabled).join(","); + useEffect(() => { + if (!form.getValues("enabled")) return; + profiles.forEach((profile, i) => { + void form.trigger(`otel_config.profiles.${i}.collector_url` as any); + if (profile.metrics_enabled) { + void form.trigger(`otel_config.profiles.${i}.metrics_endpoint` as any); + } + }); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [protocolsKey, metricsKey]); - {/* TLS Configuration */} -
- ( - -
-
- Insecure (Skip TLS) - - Skip TLS verification. Disable this to use TLS with system root CAs or a custom CA certificate. - -
-
- { - field.onChange(checked); - if (checked) { - form.setValue("otel_config.tls_ca_cert", ""); - } - }} - disabled={!hasOtelAccess} - /> -
-
-
- )} - /> - {!form.watch("otel_config.insecure") && ( - ( - - TLS CA Certificate Path - - File path to the CA certificate on the Bifrost server. Leave empty to use system root CAs. - - - - - - - )} - /> - )} -
-
-
+ const onSubmit = (data: OtelFormSchema) => { + setIsSaving(true); + onSave(data).finally(() => setIsSaving(false)); + }; - {/* Metrics Push Configuration */} -
- ( - -
-
-

- Enable Metrics Export BETA -

-

- Push metrics to an OTEL Collector for proper aggregation in cluster deployments -

-
-
- -
-
-
- )} - /> + return ( + + + {/* Profile cards */} +
+ {fields.map((field, index) => { + const isExpanded = expandedProfiles[index] !== false; + const serviceName = form.watch( + `otel_config.profiles.${index}.service_name`, + ); + const collectorUrl = form.watch( + `otel_config.profiles.${index}.collector_url`, + ); + const profileTitle = serviceName?.trim() || `Profile ${index + 1}`; - {form.watch("otel_config.metrics_enabled") && ( -
- ( - - Metrics Endpoint -
- {form.watch("otel_config.protocol") === "http" ? "http(s)://:/v1/metrics" : ":"} -
- - - - -
- )} - /> + return ( +
+ {/* Collapsible header */} +
+ setExpandedProfiles((prev) => ({ + ...prev, + [index]: prev[index] === false ? true : false, + })) + } + > +
+ {profileTitle} + {!isExpanded && collectorUrl && ( + + {collectorUrl} + + )} +
+
+ ( + e.stopPropagation()} + disabled={!hasOtelAccess} + aria-label="Enable profile" + /> + )} + /> + {fields.length > 1 && ( + + )} + {isExpanded ? ( + + ) : ( + + )} +
+
- ( - - Push Interval (seconds) - - field.onChange(e.target.value === "" ? null : Number(e.target.value))} - /> - - How often to push metrics (1-300 seconds) - - - )} - /> -
- )} -
+ {/* Expanded body */} + {isExpanded && ( +
+
+ ( + + Service Name + + If kept empty, the service name will be set to + "bifrost" + + + + + + + )} + /> + ( + + OTLP Collector URL +
+ + {form.watch( + `otel_config.profiles.${index}.protocol`, + ) === "http" + ? "http(s)://:/v1/traces" + : ":"} + +
+ + + + +
+ )} + /> + ( + + + + + + + )} + /> +
+ ( + + Format + + + + )} + /> + ( + + Protocol + + + + )} + /> +
- {/* Form Actions */} -
- ( - - Enabled - - - - - )} - /> -
- {onDelete && ( - - )} - - - - - - - {(!form.formState.isDirty || !form.formState.isValid) && ( - -

- {!form.formState.isDirty && !form.formState.isValid - ? "No changes made and validation errors present" - : !form.formState.isDirty - ? "No changes made" - : "Please fix validation errors"} -

-
- )} -
-
-
-
- - - ); -} \ No newline at end of file + {/* TLS Configuration */} +
+ ( + +
+
+ Insecure (Skip TLS) + + Skip TLS verification. Disable this to use + TLS with system root CAs or a custom CA + certificate. + +
+
+ { + field.onChange(checked); + if (checked) { + form.setValue( + `otel_config.profiles.${index}.tls_ca_cert`, + "", + ); + } + }} + disabled={!hasOtelAccess} + /> +
+
+
+ )} + /> + {!form.watch( + `otel_config.profiles.${index}.insecure`, + ) && ( + ( + + TLS CA Certificate Path + + File path to the CA certificate on the Bifrost + server. Leave empty to use system root CAs. + + + + + + + )} + /> + )} +
+
+ + {/* Metrics Push Configuration */} +
+ ( + +
+
+

+ Enable Metrics Export{" "} + BETA +

+

+ Push metrics to an OTEL Collector for proper + aggregation in cluster deployments +

+
+
+ +
+
+
+ )} + /> + {form.watch( + `otel_config.profiles.${index}.metrics_enabled`, + ) && ( +
+ ( + + Metrics Endpoint +
+ + {form.watch( + `otel_config.profiles.${index}.protocol`, + ) === "http" + ? "http(s)://:/v1/metrics" + : ":"} + +
+ + + + +
+ )} + /> + ( + + Push Interval (seconds) + + + field.onChange( + e.target.value === "" + ? null + : Number(e.target.value), + ) + } + /> + + + How often to push metrics (1-300 seconds) + + + + )} + /> +
+ )} +
+
+ )} +
+ ); + })} +
+ + {/* Add Profile */} + + + {/* Form Actions */} +
+ ( + + + Enabled + + + + + + )} + /> +
+ {onDelete && ( + + )} + + + + + + + {(!form.formState.isDirty || !form.formState.isValid) && ( + +

+ {!form.formState.isDirty && !form.formState.isValid + ? "No changes made and validation errors present" + : !form.formState.isDirty + ? "No changes made" + : "Please fix validation errors"} +

+
+ )} +
+
+
+
+ + + ); +} diff --git a/ui/lib/types/schemas.ts b/ui/lib/types/schemas.ts index 2ecf332a97..6ec2e1c93b 100644 --- a/ui/lib/types/schemas.ts +++ b/ui/lib/types/schemas.ts @@ -710,9 +710,10 @@ export const betaHeadersFormSchema = z.object({ export type BetaHeadersFormSchema = z.infer; -// OTEL Configuration Schema -export const otelConfigSchema = z +// OTEL Profile Configuration Schema (per-collector configuration) +export const otelProfileConfigSchema = z .object({ + enabled: z.boolean().default(true), service_name: z.string().optional(), collector_url: z.string().default(""), trace_type: z @@ -809,6 +810,11 @@ export const otelConfigSchema = z } }); +// OTEL Configuration Schema (wraps one or more collector profiles) +export const otelConfigSchema = z.object({ + profiles: z.array(otelProfileConfigSchema).min(1, "At least one profile is required"), +}); + // OTEL form schema for the OtelFormFragment export const otelFormSchema = z .object({ @@ -817,14 +823,16 @@ export const otelFormSchema = z }) .superRefine((data, ctx) => { if (data.enabled) { - const collectorUrl = (data.otel_config.collector_url || "").trim(); - if (!collectorUrl) { - ctx.addIssue({ - code: "custom", - path: ["otel_config", "collector_url"], - message: "Collector address is required", - }); - } + data.otel_config.profiles.forEach((profile, i) => { + const collectorUrl = (profile.collector_url || "").trim(); + if (!collectorUrl) { + ctx.addIssue({ + code: "custom", + path: ["otel_config", "profiles", i, "collector_url"], + message: "Collector address is required", + }); + } + }); } }); @@ -1090,6 +1098,7 @@ export type NetworkFormConfigSchema = z.infer; export type ProxyFormConfigSchema = z.infer; export type NetworkAndProxyFormSchema = z.infer; export type ProxyOnlyFormSchema = z.infer; +export type OtelProfileConfigSchema = z.infer; export type OtelConfigSchema = z.infer; export type OtelFormSchema = z.infer; export type MaximConfigSchema = z.infer; From 580d85969024ad5ffb8975bed79a17ddef09250a Mon Sep 17 00:00:00 2001 From: Akshay Deo Date: Wed, 15 Apr 2026 01:21:23 +0530 Subject: [PATCH 2/2] updates helm-chart checks (#2718) Adds a new Go-based schema synchronization tool (`schemasync`) to validate that Go config types stay in sync with `config.schema.json`. The tool recursively walks Go struct types via AST analysis to ensure field names, types, and enum values match between Go code and JSON schema definitions. - **New schemasync tool**: Created `.github/workflows/scripts/schemasync/` with a Go program that uses `go/types` to analyze config structs and compare them against the JSON schema - **CI integration**: Added schemasync validation step to the helm-release workflow with Go 1.26 setup - **Enhanced MCP secret support**: Added `secretRef` configuration for MCP client connection strings in helm charts, allowing Kubernetes secret injection - **Schema updates**: Updated `config.schema.json` and `values.schema.json` to reflect recent Go type changes including: - Renamed `concurrency_config` to `concurrency_and_buffer_size` - Added new provider fields (`id`, `description`, network/proxy configs) - Updated plugin placement enum to include `builtin` - Added `chain_rule` field for routing rules - Removed unused MCP websocket/http config objects - **Validation script updates**: Modified existing helm schema validation to account for renamed types and removed obsolete checks - [x] Feature - [x] Chore/CI - [x] Core (Go) - [x] Transports (HTTP) - [ ] Providers/Integrations - [ ] Plugins - [ ] UI (React) - [ ] Docs The schemasync tool runs automatically in CI. To test locally: ```sh .github/workflows/scripts/validate-schema-sync.sh .github/workflows/scripts/validate-helm-config-fields.sh helm template helm-charts/bifrost --values test-values.yaml ``` The tool validates: - Go struct fields have corresponding JSON schema properties - Enum constants in Go match schema enum arrays - EnvVar-typed fields have helm chart secret support N/A - This is a validation tool and schema update. - [ ] Yes - [x] No The changes are additive validation tooling and schema updates that maintain backward compatibility. This addresses the need for automated validation between Go config types and JSON schemas to prevent drift during development. - The schemasync tool validates that EnvVar-typed fields (which may contain secrets) have proper Kubernetes secret injection support in helm charts - Added MCP connection string secret reference capability to avoid hardcoding sensitive connection details - [x] I read `docs/contributing/README.md` and followed the guidelines - [x] I added/updated tests where appropriate - [x] I updated documentation where needed - [x] I verified builds succeed (Go and UI) - [x] I verified the CI pipeline passes locally if applicable --- .github/workflows/helm-release.yml | 2 +- .../workflows/scripts/validate-schema-sync.sh | 2 +- plugins/otel/main.go | 2 +- plugins/otel/metrics.go | 156 ++++++++---------- .../fragments/otelFormFragment.tsx | 4 +- ui/lib/types/schemas.ts | 4 +- 6 files changed, 72 insertions(+), 98 deletions(-) diff --git a/.github/workflows/helm-release.yml b/.github/workflows/helm-release.yml index bfeb83bb39..599efb3af6 100644 --- a/.github/workflows/helm-release.yml +++ b/.github/workflows/helm-release.yml @@ -126,4 +126,4 @@ jobs: keep_files: false enable_jekyll: false user_name: "github-actions[bot]" - user_email: "github-actions[bot]@users.noreply.github.com" + user_email: "github-actions[bot]@users.noreply.github.com" \ No newline at end of file diff --git a/.github/workflows/scripts/validate-schema-sync.sh b/.github/workflows/scripts/validate-schema-sync.sh index 0214d0679b..83dd285f0d 100755 --- a/.github/workflows/scripts/validate-schema-sync.sh +++ b/.github/workflows/scripts/validate-schema-sync.sh @@ -60,4 +60,4 @@ echo "==================================================================" --schema "$REPO_ROOT/transports/config.schema.json" \ --pkg-root "$REPO_ROOT" \ --helm-values "$REPO_ROOT/helm-charts/bifrost/values.schema.json" \ - --helm-helpers "$REPO_ROOT/helm-charts/bifrost/templates/_helpers.tpl" + --helm-helpers "$REPO_ROOT/helm-charts/bifrost/templates/_helpers.tpl" \ No newline at end of file diff --git a/plugins/otel/main.go b/plugins/otel/main.go index 94c85a5a37..1f1cc57a6e 100644 --- a/plugins/otel/main.go +++ b/plugins/otel/main.go @@ -321,4 +321,4 @@ func (p *OtelPlugin) Cleanup() error { } // Compile-time check that OtelPlugin implements ObservabilityPlugin -var _ schemas.ObservabilityPlugin = (*OtelPlugin)(nil) +var _ schemas.ObservabilityPlugin = (*OtelPlugin)(nil) \ No newline at end of file diff --git a/plugins/otel/metrics.go b/plugins/otel/metrics.go index 1d3d58c0df..73aa3de26e 100644 --- a/plugins/otel/metrics.go +++ b/plugins/otel/metrics.go @@ -356,39 +356,22 @@ func (m *MetricsExporter) RecordHTTPResponseSize(ctx context.Context, sizeBytes m.httpResponseSizeBytes.Record(ctx, sizeBytes, metric.WithAttributes(attrs...)) } -// BifrostAttrParams holds parameters for building Bifrost metric attributes -type BifrostAttrParams struct { - Provider string - Model string - Method string - VirtualKeyID string - VirtualKeyName string - SelectedKeyID string - SelectedKeyName string - NumberOfRetries int - FallbackIndex int - TeamID string - TeamName string - CustomerID string - CustomerName string -} - // BuildBifrostAttributes builds common Bifrost metric attributes -func BuildBifrostAttributes(p BifrostAttrParams) []attribute.KeyValue { +func BuildBifrostAttributes(provider, model, method, virtualKeyID, virtualKeyName, selectedKeyID, selectedKeyName string, numberOfRetries, fallbackIndex int, teamID, teamName, customerID, customerName string) []attribute.KeyValue { return []attribute.KeyValue{ - attribute.String("provider", p.Provider), - attribute.String("model", p.Model), - attribute.String("method", p.Method), - attribute.String("virtual_key_id", p.VirtualKeyID), - attribute.String("virtual_key_name", p.VirtualKeyName), - attribute.String("selected_key_id", p.SelectedKeyID), - attribute.String("selected_key_name", p.SelectedKeyName), - attribute.Int("number_of_retries", p.NumberOfRetries), - attribute.Int("fallback_index", p.FallbackIndex), - attribute.String("team_id", p.TeamID), - attribute.String("team_name", p.TeamName), - attribute.String("customer_id", p.CustomerID), - attribute.String("customer_name", p.CustomerName), + attribute.String("provider", provider), + attribute.String("model", model), + attribute.String("method", method), + attribute.String("virtual_key_id", virtualKeyID), + attribute.String("virtual_key_name", virtualKeyName), + attribute.String("selected_key_id", selectedKeyID), + attribute.String("selected_key_name", selectedKeyName), + attribute.Int("number_of_retries", numberOfRetries), + attribute.Int("fallback_index", fallbackIndex), + attribute.String("team_id", teamID), + attribute.String("team_name", teamName), + attribute.String("customer_id", customerID), + attribute.String("customer_name", customerName), } } @@ -442,84 +425,77 @@ func getFloat64Attr(attrs map[string]any, key string) float64 { return 0 } +// buildSpanAttrs extracts metric dimension attrs from a single attempt span. +func buildSpanAttrs(span *schemas.Span) []attribute.KeyValue { + attrs := span.Attributes + method := getStringAttr(attrs, "request.type") + if method == "" { + method = span.Name + } + return BuildBifrostAttributes( + getStringAttr(attrs, schemas.AttrProviderName), + getStringAttr(attrs, schemas.AttrRequestModel), + method, + getStringAttr(attrs, schemas.AttrVirtualKeyID), + getStringAttr(attrs, schemas.AttrVirtualKeyName), + getStringAttr(attrs, schemas.AttrSelectedKeyID), + getStringAttr(attrs, schemas.AttrSelectedKeyName), + getIntAttr(attrs, schemas.AttrNumberOfRetries), + getIntAttr(attrs, schemas.AttrFallbackIndex), + getStringAttr(attrs, schemas.AttrTeamID), + getStringAttr(attrs, schemas.AttrTeamName), + getStringAttr(attrs, schemas.AttrCustomerID), + getStringAttr(attrs, schemas.AttrCustomerName), + ) +} + // recordMetricsFromTrace extracts metrics data from a completed trace and records them // via the OTEL metrics exporter. This is called from Inject after trace emission. +// +// Per-attempt metrics (upstream_requests, errors, success, latency) are recorded once +// per llm.call/retry span so fallback attempts and failed retries are counted with +// their own provider/model/fallback_index labels. Per-trace metrics (tokens, cost, +// TTFT) are recorded once, keyed off the final (latest) attempt span. func (m *MetricsExporter) recordMetricsFromTrace(ctx context.Context, trace *schemas.Trace) { if trace == nil || m == nil { return } - // Prefer the last attempt span (LLM call or retry) so metrics reflect the final outcome. - var llmSpan *schemas.Span + var finalSpan *schemas.Span for _, span := range trace.Spans { if span.Kind != schemas.SpanKindLLMCall && span.Kind != schemas.SpanKindRetry { continue } - if llmSpan == nil || span.EndTime.After(llmSpan.EndTime) { - llmSpan = span - } - } - if llmSpan == nil { - llmSpan = trace.RootSpan - } - if llmSpan == nil { - return - } + spanAttrs := buildSpanAttrs(span) - attrs := llmSpan.Attributes + m.RecordUpstreamRequest(ctx, spanAttrs...) - // Extract all metric dimensions from span attributes - provider := getStringAttr(attrs, schemas.AttrProviderName) - model := getStringAttr(attrs, schemas.AttrRequestModel) - // Prefer request.type attribute to keep the method stable across retries - method := getStringAttr(attrs, "request.type") - if method == "" { - method = llmSpan.Name - } - virtualKeyID := getStringAttr(attrs, schemas.AttrVirtualKeyID) - virtualKeyName := getStringAttr(attrs, schemas.AttrVirtualKeyName) - selectedKeyID := getStringAttr(attrs, schemas.AttrSelectedKeyID) - selectedKeyName := getStringAttr(attrs, schemas.AttrSelectedKeyName) - numberOfRetries := getIntAttr(attrs, schemas.AttrNumberOfRetries) - fallbackIndex := getIntAttr(attrs, schemas.AttrFallbackIndex) - teamID := getStringAttr(attrs, schemas.AttrTeamID) - teamName := getStringAttr(attrs, schemas.AttrTeamName) - customerID := getStringAttr(attrs, schemas.AttrCustomerID) - customerName := getStringAttr(attrs, schemas.AttrCustomerName) - - // Build common attributes for all metrics - otelAttrs := BuildBifrostAttributes(BifrostAttrParams{ - Provider: provider, - Model: model, - Method: method, - VirtualKeyID: virtualKeyID, - VirtualKeyName: virtualKeyName, - SelectedKeyID: selectedKeyID, - SelectedKeyName: selectedKeyName, - NumberOfRetries: numberOfRetries, - FallbackIndex: fallbackIndex, - TeamID: teamID, - TeamName: teamName, - CustomerID: customerID, - CustomerName: customerName, - }) + if !span.StartTime.IsZero() && !span.EndTime.IsZero() { + latencySeconds := span.EndTime.Sub(span.StartTime).Seconds() + m.RecordUpstreamLatency(ctx, latencySeconds, spanAttrs...) + } - // Record upstream request count - m.RecordUpstreamRequest(ctx, otelAttrs...) + if span.Status == schemas.SpanStatusError { + m.RecordErrorRequest(ctx, spanAttrs...) + } else { + m.RecordSuccessRequest(ctx, spanAttrs...) + } - // Record latency (from span duration) - if !llmSpan.StartTime.IsZero() && !llmSpan.EndTime.IsZero() { - latencySeconds := llmSpan.EndTime.Sub(llmSpan.StartTime).Seconds() - m.RecordUpstreamLatency(ctx, latencySeconds, otelAttrs...) + if finalSpan == nil || span.EndTime.After(finalSpan.EndTime) { + finalSpan = span + } } - // Record success or error based on span status - if llmSpan.Status == schemas.SpanStatusError { - m.RecordErrorRequest(ctx, otelAttrs...) - } else { - m.RecordSuccessRequest(ctx, otelAttrs...) + if finalSpan == nil { + finalSpan = trace.RootSpan } + if finalSpan == nil { + return + } + + attrs := finalSpan.Attributes + otelAttrs := buildSpanAttrs(finalSpan) // Record token usage - try both naming conventions inputTokens := getIntAttr(attrs, schemas.AttrPromptTokens) diff --git a/ui/app/workspace/observability/fragments/otelFormFragment.tsx b/ui/app/workspace/observability/fragments/otelFormFragment.tsx index 180e6e8d26..2099065477 100644 --- a/ui/app/workspace/observability/fragments/otelFormFragment.tsx +++ b/ui/app/workspace/observability/fragments/otelFormFragment.tsx @@ -44,7 +44,7 @@ interface OtelFormFragmentProps { service_name?: string; collector_url?: string; headers?: Record; - trace_type?: "genai_extension" | "vercel" | "open_inference"; + trace_type?: "genai_extension"; protocol?: "http" | "grpc"; tls_ca_cert?: string; insecure?: boolean; @@ -678,4 +678,4 @@ export function OtelFormFragment({ ); -} +} \ No newline at end of file diff --git a/ui/lib/types/schemas.ts b/ui/lib/types/schemas.ts index 6ec2e1c93b..6f4d48beb7 100644 --- a/ui/lib/types/schemas.ts +++ b/ui/lib/types/schemas.ts @@ -717,9 +717,7 @@ export const otelProfileConfigSchema = z service_name: z.string().optional(), collector_url: z.string().default(""), trace_type: z - .enum(["genai_extension", "vercel", "open_inference"], { - message: "Please select a trace type", - }) + .enum(["genai_extension"], { message: "Please select a trace type" }) .default("genai_extension"), headers: z.record(z.string(), z.string()).optional(), protocol: z