Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/helm-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,4 @@ jobs:
keep_files: false
enable_jekyll: false
user_name: "github-actions[bot]"
user_email: "github-actions[bot]@users.noreply.github.com"
user_email: "github-actions[bot]@users.noreply.github.com"
2 changes: 1 addition & 1 deletion .github/workflows/scripts/validate-schema-sync.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ echo "=================================================================="
--schema "$REPO_ROOT/transports/config.schema.json" \
--pkg-root "$REPO_ROOT" \
--helm-values "$REPO_ROOT/helm-charts/bifrost/values.schema.json" \
--helm-helpers "$REPO_ROOT/helm-charts/bifrost/templates/_helpers.tpl"
--helm-helpers "$REPO_ROOT/helm-charts/bifrost/templates/_helpers.tpl"
49 changes: 47 additions & 2 deletions core/schemas/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,51 @@ func SafeExtractInt(value interface{}) (int, bool) {
}
}

// SafeExtractInt64 safely extracts an int64 value from an any with type checking
func SafeExtractInt64(value any) (int64, bool) {
if value == nil {
return 0, false
}
switch v := value.(type) {
case int:
return int64(v), true
case int8:
return int64(v), true
case int16:
return int64(v), true
case int32:
return int64(v), true
case int64:
return v, true
case uint:
return int64(v), true
case uint8:
return int64(v), true
case uint16:
return int64(v), true
case uint32:
return int64(v), true
case uint64:
return int64(v), true
Comment thread
greptile-apps[bot] marked this conversation as resolved.
case float32:
return int64(v), true
case float64:
return int64(v), true
case json.Number:
if intVal, err := v.Int64(); err == nil {
return int64(intVal), true
}
return 0, false
case string:
if intVal, err := strconv.ParseInt(v, 10, 64); err == nil {
return intVal, true
}
return 0, false
default:
return 0, false
}
Comment thread
sammaji marked this conversation as resolved.
}

// SafeExtractFloat64 safely extracts a float64 value from an interface{} with type checking
func SafeExtractFloat64(value interface{}) (float64, bool) {
if value == nil {
Expand Down Expand Up @@ -835,8 +880,8 @@ func DeepCopyChatTool(original ChatTool) ChatTool {

if original.Function.Parameters != nil {
copyParams := &ToolFunctionParameters{
Type: original.Function.Parameters.Type,
keyOrder: original.Function.Parameters.keyOrder,
Type: original.Function.Parameters.Type,
keyOrder: original.Function.Parameters.keyOrder,
explicitEmptyObject: original.Function.Parameters.explicitEmptyObject,
Comment thread
sammaji marked this conversation as resolved.
}

Expand Down
54 changes: 52 additions & 2 deletions framework/configstore/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ func triggerMigrations(ctx context.Context, db *gorm.DB) error {
if err := migrationNormalizeOtelTraceType(ctx, db); err != nil {
return err
}
if err := migrationMigrateOtelConfigToProfiles(ctx, db); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -6165,7 +6168,6 @@ func migrationAddMCPClientDiscoveredToolsColumns(ctx context.Context, db *gorm.D
}})
if err := m.Migrate(); err != nil {
return fmt.Errorf("error running add_mcp_client_discovered_tools_columns migration: %s", err.Error())

}
return nil
}
Expand Down Expand Up @@ -6275,7 +6277,6 @@ func migrationAddFlexTierPricingColumns(ctx context.Context, db *gorm.DB) error
}})
if err := m.Migrate(); err != nil {
return fmt.Errorf("error while running flex tier pricing columns migration: %s", err.Error())

}
return nil
}
Expand Down Expand Up @@ -6539,7 +6540,56 @@ func migrationNormalizeOtelTraceType(ctx context.Context, db *gorm.DB) error {
}})
if err := m.Migrate(); err != nil {
return fmt.Errorf("error running normalize_otel_trace_type migration: %s", err.Error())
}
return nil
}

// migrationMigrateOtelConfigToProfiles wraps the legacy single-collector OTEL
// plugin config into the new multi-profile shape. Old: {collector_url, protocol, ...}.
// New: {profiles: [{collector_url, protocol, ...}]}. No-op if the row is already
// in the new shape or the plugin is not configured.
func migrationMigrateOtelConfigToProfiles(ctx context.Context, db *gorm.DB) error {
m := migrator.New(db, migrator.DefaultOptions, []*migrator.Migration{{
ID: "migrate_otel_config_to_profiles",
Migrate: func(tx *gorm.DB) error {
tx = tx.WithContext(ctx)
var plugin tables.TablePlugin
err := tx.Where("name = ?", "otel").First(&plugin).Error
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil
}
return fmt.Errorf("failed to load otel plugin row: %w", err)
}
// AfterFind already decrypted ConfigJSON and unmarshaled into Config.
cfgMap, ok := plugin.Config.(map[string]any)
if !ok || len(cfgMap) == 0 {
return nil
}
// Already migrated.
if _, hasProfiles := cfgMap["profiles"]; hasProfiles {
return nil
}
// Old shape has collector_url at top level. If absent, nothing to migrate.
if _, hasCollector := cfgMap["collector_url"]; !hasCollector {
return nil
}
plugin.Config = map[string]any{
"profiles": []any{cfgMap},
}
// Force BeforeSave to re-serialize + re-encrypt.
plugin.ConfigJSON = ""
plugin.EncryptionStatus = tables.EncryptionStatusPlainText
if err := tx.Save(&plugin).Error; err != nil {
return fmt.Errorf("failed to save migrated otel config: %w", err)
Comment thread
sammaji marked this conversation as resolved.
}
log.Printf("[Migration] Wrapped legacy otel config into profiles[0]")
return nil
},
Rollback: func(tx *gorm.DB) error { return nil },
}})
if err := m.Migrate(); err != nil {
return fmt.Errorf("error running migrate_otel_config_to_profiles migration: %s", err.Error())
}
return nil
}
1 change: 1 addition & 0 deletions plugins/otel/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- feat: adds support for multiple otel profiles
111 changes: 64 additions & 47 deletions plugins/otel/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,25 @@ func hexToBytes(hexStr string, length int) []byte {
}

// convertTraceToResourceSpan converts a Bifrost trace to OTEL ResourceSpan
func (p *OtelPlugin) convertTraceToResourceSpan(trace *schemas.Trace) *ResourceSpan {
func (p *OtelProfile) convertTraceToResourceSpan(trace *schemas.Trace) *ResourceSpan {
otelSpans := make([]*Span, 0, len(trace.Spans))
for _, span := range trace.Spans {
otelSpans = append(otelSpans, p.convertSpanToOTELSpan(trace.TraceID, span))
otelSpans = append(otelSpans, convertSpanToOTELSpan(trace.TraceID, span))
}

return &ResourceSpan{
Resource: &resourcepb.Resource{
Attributes: p.getResourceAttributes(),
},
ScopeSpans: []*ScopeSpan{{
Scope: p.getInstrumentationScope(),
Spans: otelSpans,
Scope: p.getInstrumentationScope(),
Spans: otelSpans,
}},
}
}

// convertSpanToOTELSpan converts a single Bifrost span to OTEL format
func (p *OtelPlugin) convertSpanToOTELSpan(traceID string, span *schemas.Span) *Span {
func convertSpanToOTELSpan(traceID string, span *schemas.Span) *Span {
otelSpan := &Span{
TraceId: hexToBytes(traceID, 16),
SpanId: hexToBytes(span.SpanID, 8),
Expand All @@ -110,7 +110,7 @@ func (p *OtelPlugin) convertSpanToOTELSpan(traceID string, span *schemas.Span) *
}

// getResourceAttributes returns the resource attributes for the OTEL span
func (p *OtelPlugin) getResourceAttributes() []*KeyValue {
func (p *OtelProfile) getResourceAttributes() []*KeyValue {
attrs := []*KeyValue{
kvStr("service.name", p.serviceName),
kvStr("service.version", p.bifrostVersion),
Expand All @@ -123,7 +123,7 @@ func (p *OtelPlugin) getResourceAttributes() []*KeyValue {
}

// getInstrumentationScope returns the instrumentation scope for OTEL
func (p *OtelPlugin) getInstrumentationScope() *commonpb.InstrumentationScope {
func (p *OtelProfile) getInstrumentationScope() *commonpb.InstrumentationScope {
return &commonpb.InstrumentationScope{
Name: p.serviceName,
Version: p.bifrostVersion,
Expand Down Expand Up @@ -156,22 +156,18 @@ func anyToKeyValue(key string, value any) *KeyValue {
return nil
}
return kvStr(key, v)
case int:
return kvInt(key, int64(v))
case int32:
return kvInt(key, int64(v))
case int64:
return kvInt(key, v)
case uint:
return kvInt(key, int64(v))
case uint32:
return kvInt(key, int64(v))
case uint64:
return kvInt(key, int64(v))
case float32:
return kvDbl(key, float64(v))
case float64:
return kvDbl(key, v)
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
intValue, ok := schemas.SafeExtractInt64(v)
if !ok {
return nil
}
return kvInt(key, intValue)
case float32, float64:
floatValue, ok := schemas.SafeExtractFloat64(v)
if !ok {
return nil
}
return kvDbl(key, floatValue)
case bool:
return kvBool(key, v)
case []string:
Expand All @@ -184,32 +180,29 @@ func anyToKeyValue(key string, value any) *KeyValue {
}
return kvAny(key, arrValue(vals...))
case []int:
if len(v) == 0 {
return nil
}
vals := make([]*AnyValue, len(v))
for i, n := range v {
vals[i] = &AnyValue{Value: &IntValue{IntValue: int64(n)}}
}
return kvAny(key, arrValue(vals...))
return intSliceToKeyValue(key, v)
case []int8:
return intSliceToKeyValue(key, v)
case []int16:
return intSliceToKeyValue(key, v)
case []int32:
return intSliceToKeyValue(key, v)
case []int64:
if len(v) == 0 {
return nil
}
vals := make([]*AnyValue, len(v))
for i, n := range v {
vals[i] = &AnyValue{Value: &IntValue{IntValue: n}}
}
return kvAny(key, arrValue(vals...))
return intSliceToKeyValue(key, v)
case []uint:
return intSliceToKeyValue(key, v)
case []uint8:
return intSliceToKeyValue(key, v)
Comment thread
sammaji marked this conversation as resolved.
case []uint16:
return intSliceToKeyValue(key, v)
case []uint32:
return intSliceToKeyValue(key, v)
case []uint64:
return intSliceToKeyValue(key, v)
case []float32:
return floatSliceToAttribute(key, v)
case []float64:
if len(v) == 0 {
return nil
}
vals := make([]*AnyValue, len(v))
for i, n := range v {
vals[i] = &AnyValue{Value: &DoubleValue{DoubleValue: n}}
}
return kvAny(key, arrValue(vals...))
return floatSliceToAttribute(key, v)
case map[string]any:
if len(v) == 0 {
return nil
Expand All @@ -228,6 +221,30 @@ func anyToKeyValue(key string, value any) *KeyValue {
}
}

func intSliceToKeyValue[T int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64](key string, slice []T) *KeyValue {
if len(slice) == 0 {
return nil
}
vals := make([]*AnyValue, len(slice))
for i, n := range slice {
iv, _ := schemas.SafeExtractInt64(n)
vals[i] = &AnyValue{Value: &IntValue{IntValue: iv}}
}
return kvAny(key, arrValue(vals...))
Comment thread
sammaji marked this conversation as resolved.
}

func floatSliceToAttribute[T float32 | float64](key string, slice []T) *KeyValue {
if len(slice) == 0 {
return nil
}
vals := make([]*AnyValue, len(slice))
for i, n := range slice {
fv, _ := schemas.SafeExtractFloat64(n)
vals[i] = &AnyValue{Value: &DoubleValue{DoubleValue: fv}}
}
return kvAny(key, arrValue(vals...))
}

// convertSpanKind maps Bifrost SpanKind to OTEL SpanKind
func convertSpanKind(kind schemas.SpanKind) tracepb.Span_SpanKind {
switch kind {
Expand Down
2 changes: 1 addition & 1 deletion plugins/otel/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ require (
)

require (
github.com/bytedance/sonic v1.15.0
github.com/bytedance/sonic v1.15.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
Expand Down
35 changes: 6 additions & 29 deletions plugins/otel/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ package otel

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"

collectorpb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc"
Expand All @@ -24,33 +20,14 @@ type OtelClientGRPC struct {
// NewOtelClientGRPC creates a new OpenTelemetry client for gRPC
func NewOtelClientGRPC(endpoint string, headers map[string]string, tlsCACert string, insecureMode bool) (*OtelClientGRPC, error) {
var creds credentials.TransportCredentials
// TLS priority: custom CA > system roots > insecure
if tlsCACert != "" {
// Validate the CA cert path to prevent path traversal attacks
if err := validateCACertPath(tlsCACert); err != nil {
return nil, err
}
// Use custom CA certificate with MinVersion
caCert, err := os.ReadFile(tlsCACert)
if err != nil {
return nil, fmt.Errorf("fail to load provided CA cert: %w", err)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("fail to parse provided CA cert")
}
tlsConfig := &tls.Config{
RootCAs: caCertPool,
MinVersion: tls.VersionTLS12,
}
creds = credentials.NewTLS(tlsConfig)
} else if insecureMode {
// Skip TLS entirely
// gRPC insecure mode uses plaintext (no TLS at all), not just skip-verify.
// buildTLSConfig is bypassed here to preserve that behaviour.
if tlsCACert == "" && insecureMode {
creds = insecure.NewCredentials()
} else {
// Use system root CAs with MinVersion
tlsConfig := &tls.Config{
MinVersion: tls.VersionTLS12,
tlsConfig, err := buildTLSConfig(tlsCACert, false)
if err != nil {
return nil, err
}
creds = credentials.NewTLS(tlsConfig)
}
Expand Down
Loading
Loading