diff --git a/.chloggen/configoptional-scalars-2.yaml b/.chloggen/configoptional-scalars-2.yaml new file mode 100644 index 000000000000..66f9770f3959 --- /dev/null +++ b/.chloggen/configoptional-scalars-2.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: all + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for marshaling and unmarshaling scalar values + +# One or more tracking issues or pull requests related to the change +issues: [13421] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This allows using fields like `Optional[int]` + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/configoptional-scalars.yaml b/.chloggen/configoptional-scalars.yaml new file mode 100644 index 000000000000..4de7d609d861 --- /dev/null +++ b/.chloggen/configoptional-scalars.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: pkg/confmap + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `WithScalarMarshaler` and `WithScalarUnmarshaler` options to support handling scalar values in confmap + +# One or more tracking issues or pull requests related to the change +issues: [13421] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/config/configopaque/maplist.go b/config/configopaque/maplist.go index 71a206b633f1..435235871ec1 100644 --- a/config/configopaque/maplist.go +++ b/config/configopaque/maplist.go @@ -4,12 +4,10 @@ package configopaque // import "go.opentelemetry.io/collector/config/configopaque" import ( - "cmp" "fmt" "iter" "slices" - "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/xconfmap" ) @@ -28,27 +26,32 @@ type Pair struct { // Pairs are assumed to have distinct names. This is checked during config validation. type MapList []Pair -var _ confmap.Unmarshaler = (*MapList)(nil) +type MapFormat map[string]String -// Unmarshal is called by the Collector when unmarshalling from a map. -// When the input config is a slice, this will be skipped, -// and mapstructure's default unmarshalling logic will be used. -func (ml *MapList) Unmarshal(conf *confmap.Conf) error { - var m2 map[string]String - if err := conf.Unmarshal(&m2); err != nil { - return err +var _ xconfmap.ConfigMigrator = MapFormat{} + +func (mm MapFormat) Migrate(cfg any) (bool, error) { + ml, ok := cfg.(*MapList) + if !ok { + return false, nil + } + if ml == nil { + return false, nil + } + + for key, val := range mm { + ml.Set(key, val) } - *ml = make(MapList, 0, len(m2)) - for name, value := range m2 { - *ml = append(*ml, Pair{ - Name: name, - Value: value, - }) + + return true, nil +} + +var _ xconfmap.MigrateableConfig = (*MapList)(nil) + +func (ml MapList) Migrations() []xconfmap.ConfigMigrator { + return []xconfmap.ConfigMigrator{ + MapFormat{}, } - slices.SortFunc(*ml, func(p1, p2 Pair) int { - return cmp.Compare(p1.Name, p2.Name) - }) - return nil } var _ xconfmap.Validator = MapList(nil) diff --git a/config/configopaque/maplist_test.go b/config/configopaque/maplist_test.go index a61f6217fe62..d8113dc2c490 100644 --- a/config/configopaque/maplist_test.go +++ b/config/configopaque/maplist_test.go @@ -55,7 +55,7 @@ func TestMapListDuality(t *testing.T) { conf1, err := retrieved1.AsConf() require.NoError(t, err) var tc1 testConfig - require.NoError(t, conf1.Unmarshal(&tc1)) + require.NoError(t, conf1.Unmarshal(&tc1, xconfmap.WithScalarUnmarshaler())) assert.NoError(t, xconfmap.Validate(&tc1)) retrieved2, err := confmap.NewRetrievedFromYAML([]byte(headersMap)) @@ -63,7 +63,7 @@ func TestMapListDuality(t *testing.T) { conf2, err := retrieved2.AsConf() require.NoError(t, err) var tc2 testConfig - require.NoError(t, conf2.Unmarshal(&tc2)) + require.NoError(t, conf2.Unmarshal(&tc2, xconfmap.WithScalarUnmarshaler())) assert.NoError(t, xconfmap.Validate(&tc2)) assert.Equal(t, tc1, tc2) diff --git a/config/configoptional/optional.go b/config/configoptional/optional.go index c510a021b374..4dae2ffb30bd 100644 --- a/config/configoptional/optional.go +++ b/config/configoptional/optional.go @@ -36,6 +36,9 @@ type Optional[T any] struct { // flavor indicates the flavor of the Optional. // The zero value of flavor is noneFlavor. flavor flavor + + // Whether `enabled` was used + manuallySet bool } // deref a reflect.Type to its underlying type. @@ -157,7 +160,7 @@ func (o *Optional[T]) GetOrInsertDefault() *T { } empty := confmap.NewFromStringMap(map[string]any{}) - if err := empty.Unmarshal(o); err != nil { + if err := empty.Unmarshal(o, xconfmap.WithScalarUnmarshaler()); err != nil { // This should never happen, if it happens it is a bug, so this panic is not documented. panic(fmt.Errorf("failed to unmarshal empty map into %T type: %w. Please report this bug", o.value, err)) } @@ -165,7 +168,10 @@ func (o *Optional[T]) GetOrInsertDefault() *T { return o.Get() } -var _ confmap.Unmarshaler = (*Optional[any])(nil) +var ( + // _ confmap.Unmarshaler = (*Optional[any])(nil) + _ xconfmap.ScalarUnmarshaler = (*Optional[any])(nil) +) var ( addEnabledFieldFeatureGateID = "configoptional.AddEnabledField" @@ -193,41 +199,97 @@ var ( // // T must be derefenceable to a type with struct kind and not have an 'enabled' field. // Scalar values are not supported. -func (o *Optional[T]) Unmarshal(conf *confmap.Conf) error { - if err := assertNoEnabledField[T](); err != nil { - return err +// func (o *Optional[T]) Unmarshal(conf *confmap.Conf) error { +// if err := assertNoEnabledField[T](); err != nil { +// return err +// } + +// if o.flavor == noneFlavor && conf.ToStringMap() == nil { +// // If the Optional is None and the configuration is nil, we do nothing. +// // This replicates the behavior of unmarshaling into a field with a nil pointer. +// return nil +// } + +// isEnabled := true +// if addEnabledFieldFeatureGate.IsEnabled() && conf.IsSet("enabled") { +// enabled := conf.Get("enabled") +// conf.Delete("enabled") +// var ok bool +// if isEnabled, ok = enabled.(bool); !ok { +// return fmt.Errorf("unexpected type %T for 'enabled': got '%v' value expected 'true' or 'false'", enabled, enabled) +// } +// } + +// if err := conf.Unmarshal(&o.value, xconfmap.WithScalarUnmarshaler()); err != nil { +// return err +// } + +// if isEnabled { +// o.flavor = someFlavor +// } else { +// o.flavor = noneFlavor +// } + +// return nil +// } + +var _ xconfmap.ConfigMigrator = (*EnabledField[any])(nil) + +type EnabledField[T any] struct { + Enabled bool `mapstructure:"enabled"` +} + +func (ef EnabledField[T]) Migrate(val any) (bool, error) { + o, ok := val.(*Optional[T]) + if !ok { + return false, fmt.Errorf("expected Optional type but got %T", val) } - if o.flavor == noneFlavor && conf.ToStringMap() == nil { - // If the Optional is None and the configuration is nil, we do nothing. - // This replicates the behavior of unmarshaling into a field with a nil pointer. - return nil + if ef.Enabled { + o.flavor = someFlavor + } else { + o.flavor = noneFlavor } - isEnabled := true - if addEnabledFieldFeatureGate.IsEnabled() && conf.IsSet("enabled") { - enabled := conf.Get("enabled") - conf.Delete("enabled") - var ok bool - if isEnabled, ok = enabled.(bool); !ok { - return fmt.Errorf("unexpected type %T for 'enabled': got '%v' value expected 'true' or 'false'", enabled, enabled) - } + o.manuallySet = true + + return true, nil +} + +func (o *Optional[T]) UnmarshalScalar(val any) error { + if val == nil { + return nil } - if err := conf.Unmarshal(&o.value); err != nil { - return err + v, ok := val.(T) + if !ok { + return fmt.Errorf("val is %T, not %T", val, v) } + o.value = v - if isEnabled { + if !o.manuallySet { o.flavor = someFlavor - } else { - o.flavor = noneFlavor } return nil } -var _ confmap.Marshaler = (*Optional[any])(nil) +func (o *Optional[T]) ScalarType() any { + return o.value +} + +var _ xconfmap.MigrateableConfig = (*Optional[any])(nil) + +func (o *Optional[T]) Migrations() []xconfmap.ConfigMigrator { + return []xconfmap.ConfigMigrator{ + EnabledField[T]{}, + } +} + +var ( + _ confmap.Marshaler = (*Optional[any])(nil) + _ xconfmap.ScalarMarshaler = (*Optional[any])(nil) +) // Marshal the Optional value into the configuration. // If the Optional is None or Default, it does not marshal anything. @@ -245,13 +307,21 @@ func (o Optional[T]) Marshal(conf *confmap.Conf) error { return conf.Marshal(map[string]any(nil)) } - if err := conf.Marshal(o.value); err != nil { + if err := conf.Marshal(o.value, xconfmap.WithScalarMarshaler()); err != nil { return fmt.Errorf("configoptional: failed to marshal Optional value: %w", err) } return nil } +func (o Optional[T]) GetScalarValue() (any, error) { + if o.flavor == noneFlavor || o.flavor == defaultFlavor { + return nil, nil + } + + return o.value, nil +} + var _ xconfmap.Validator = (*Optional[any])(nil) // Validate implements [xconfmap.Validator]. This is required because the diff --git a/config/configoptional/optional_test.go b/config/configoptional/optional_test.go index ff301b518141..c4f03610a34e 100644 --- a/config/configoptional/optional_test.go +++ b/config/configoptional/optional_test.go @@ -369,7 +369,7 @@ func TestUnmarshalOptional(t *testing.T) { t.Run(test.name, func(t *testing.T) { cfg := test.defaultCfg conf := confmap.NewFromStringMap(test.config) - require.NoError(t, conf.Unmarshal(&cfg)) + require.NoError(t, conf.Unmarshal(&cfg), xconfmap.WithScalarUnmarshaler()) require.Equal(t, test.expectedSub, cfg.Sub1.HasValue()) if test.expectedSub { require.Equal(t, test.expectedFoo, cfg.Sub1.Get().Foo) @@ -519,7 +519,7 @@ func TestAddFieldEnabledFeatureGate(t *testing.T) { t.Run(test.name, func(t *testing.T) { cfg := test.defaultCfg conf := confmap.NewFromStringMap(test.config) - require.NoError(t, conf.Unmarshal(&cfg)) + require.NoError(t, conf.Unmarshal(&cfg, xconfmap.WithScalarUnmarshaler())) require.Equal(t, test.expectedSub, cfg.Sub1.HasValue()) if test.expectedSub { require.Equal(t, test.expectedFoo, cfg.Sub1.Get().Foo) @@ -542,7 +542,7 @@ func TestUnmarshalErrorEnabledInvalidType(t *testing.T) { cfg := Config[Sub]{ Sub1: None[Sub](), } - err := cm.Unmarshal(&cfg) + err := cm.Unmarshal(&cfg, xconfmap.WithScalarUnmarshaler()) require.ErrorContains(t, err, "unexpected type string for 'enabled': got 'something' value expected 'true' or 'false'") } @@ -552,7 +552,7 @@ func TestUnmarshalErrorEnabledField(t *testing.T) { }) // Use zero value to avoid panic on constructor. var none Optional[WithEnabled] - require.Error(t, cm.Unmarshal(&none)) + require.Error(t, cm.Unmarshal(&none), xconfmap.WithScalarUnmarshaler()) } func TestUnmarshalConfigPointer(t *testing.T) { @@ -563,7 +563,7 @@ func TestUnmarshalConfigPointer(t *testing.T) { }) var cfg Config[*Sub] - err := cm.Unmarshal(&cfg) + err := cm.Unmarshal(&cfg, xconfmap.WithScalarUnmarshaler()) require.NoError(t, err) assert.True(t, cfg.Sub1.HasValue()) assert.Equal(t, "bar", (*cfg.Sub1.Get()).Foo) @@ -580,7 +580,7 @@ func TestUnmarshalErr(t *testing.T) { assert.False(t, cfg.Sub1.HasValue()) - err := cm.Unmarshal(&cfg) + err := cm.Unmarshal(&cfg, xconfmap.WithScalarUnmarshaler()) require.Error(t, err) require.ErrorContains(t, err, "has invalid keys: field") assert.False(t, cfg.Sub1.HasValue()) @@ -606,7 +606,7 @@ func TestSquashedOptional(t *testing.T) { Default(myIntDefault), } - err := cm.Unmarshal(&cfg) + err := cm.Unmarshal(&cfg, xconfmap.WithScalarUnmarshaler()) require.NoError(t, err) assert.True(t, cfg.HasValue()) @@ -687,7 +687,7 @@ func TestOptionalMarshal(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { conf := confmap.New() - require.NoError(t, conf.Marshal(test.value)) + require.NoError(t, conf.Marshal(test.value, xconfmap.WithScalarMarshaler())) assert.Equal(t, test.expected, conf.ToStringMap()) }) } @@ -717,11 +717,11 @@ func TestComparePointerMarshal(t *testing.T) { t.Run(fmt.Sprintf("%v vs %v", test.pointer, test.optional), func(t *testing.T) { wrapPointer := Wrap[*Sub]{Sub1: test.pointer} confPointer := confmap.NewFromStringMap(nil) - require.NoError(t, confPointer.Marshal(wrapPointer)) + require.NoError(t, confPointer.Marshal(wrapPointer, xconfmap.WithScalarMarshaler())) wrapOptional := Wrap[Optional[Sub]]{Sub1: test.optional} confOptional := confmap.NewFromStringMap(nil) - require.NoError(t, confOptional.Marshal(wrapOptional)) + require.NoError(t, confOptional.Marshal(wrapOptional, xconfmap.WithScalarMarshaler())) assert.Equal(t, confPointer.ToStringMap(), confOptional.ToStringMap()) }) @@ -732,11 +732,11 @@ func TestComparePointerMarshal(t *testing.T) { t.Run(fmt.Sprintf("%v vs %v (omitempty)", test.pointer, test.optional), func(t *testing.T) { wrapPointer := WrapOmitEmpty[*Sub]{Sub1: test.pointer} confPointer := confmap.NewFromStringMap(nil) - require.NoError(t, confPointer.Marshal(wrapPointer)) + require.NoError(t, confPointer.Marshal(wrapPointer, xconfmap.WithScalarMarshaler())) wrapOptional := WrapOmitEmpty[Optional[Sub]]{Sub1: test.optional} confOptional := confmap.NewFromStringMap(nil) - require.NoError(t, confOptional.Marshal(wrapOptional)) + require.NoError(t, confOptional.Marshal(wrapOptional, xconfmap.WithScalarMarshaler())) assert.Equal(t, confPointer.ToStringMap(), confOptional.ToStringMap()) }) diff --git a/confmap/internal/conf.go b/confmap/internal/conf.go index 1760cdd425cb..8ac0beefc39e 100644 --- a/confmap/internal/conf.go +++ b/confmap/internal/conf.go @@ -11,8 +11,6 @@ import ( "github.com/knadh/koanf/maps" "github.com/knadh/koanf/providers/confmap" "github.com/knadh/koanf/v2" - - encoder "go.opentelemetry.io/collector/confmap/internal/mapstructure" ) const ( @@ -67,8 +65,7 @@ func (l *Conf) Marshal(rawVal any, opts ...MarshalOption) error { for _, opt := range opts { opt.apply(&set) } - enc := encoder.New(EncoderConfig(rawVal, set)) - data, err := enc.Encode(rawVal) + data, err := Encode(rawVal, set) if err != nil { return err } diff --git a/confmap/internal/decoder.go b/confmap/internal/decoder.go index 17f172a2ec34..97d795fff190 100644 --- a/confmap/internal/decoder.go +++ b/confmap/internal/decoder.go @@ -40,6 +40,22 @@ func WithIgnoreUnused() UnmarshalOption { // Decodes time.Duration from strings. Allows custom unmarshaling for structs implementing // encoding.TextUnmarshaler. Allows custom unmarshaling for structs implementing confmap.Unmarshaler. func Decode(input, result any, settings UnmarshalOptions, skipTopLevelUnmarshaler bool) error { + hooks := []mapstructure.DecodeHookFunc{ + useExpandValue(), + expandNilStructPointersHookFunc(), + mapstructure.StringToSliceHookFunc(","), + mapKeyStringToMapKeyTextUnmarshalerHookFunc(), + mapstructure.StringToTimeDurationHookFunc(), + mapstructure.TextUnmarshallerHookFunc(), + } + hooks = append(hooks, settings.AdditionalDecodeHookFuncs...) + hooks = append(hooks, + unmarshalerHookFunc(result, skipTopLevelUnmarshaler), + // after the main unmarshaler hook is called, + // we unmarshal the embedded structs if present to merge with the result: + unmarshalerEmbeddedStructsHookFunc(), + zeroSliceAndMapHookFunc(), + ) dc := &mapstructure.DecoderConfig{ ErrorUnused: !settings.IgnoreUnused, Result: result, @@ -47,20 +63,9 @@ func Decode(input, result any, settings UnmarshalOptions, skipTopLevelUnmarshale WeaklyTypedInput: false, MatchName: caseSensitiveMatchName, DecodeNil: true, - DecodeHook: composehook.ComposeDecodeHookFunc( - useExpandValue(), - expandNilStructPointersHookFunc(), - mapstructure.StringToSliceHookFunc(","), - mapKeyStringToMapKeyTextUnmarshalerHookFunc(), - mapstructure.StringToTimeDurationHookFunc(), - mapstructure.TextUnmarshallerHookFunc(), - unmarshalerHookFunc(result, skipTopLevelUnmarshaler), - // after the main unmarshaler hook is called, - // we unmarshal the embedded structs if present to merge with the result: - unmarshalerEmbeddedStructsHookFunc(), - zeroSliceAndMapHookFunc(), - ), + DecodeHook: composehook.ComposeDecodeHookFunc(hooks...), } + decoder, err := mapstructure.NewDecoder(dc) if err != nil { return err diff --git a/confmap/internal/encoder.go b/confmap/internal/encoder.go index 6df9675b41a5..8e71d19fc534 100644 --- a/confmap/internal/encoder.go +++ b/confmap/internal/encoder.go @@ -11,16 +11,33 @@ import ( encoder "go.opentelemetry.io/collector/confmap/internal/mapstructure" ) +func Encode(rawVal any, set MarshalOptions) (any, error) { + enc := encoder.New(EncoderConfig(rawVal, set)) + data, err := enc.Encode(rawVal) + if err != nil { + return nil, err + } + return data, nil +} + // EncoderConfig returns a default encoder.EncoderConfig that includes // an EncodeHook that handles both TextMarshaler and Marshaler // interfaces. -func EncoderConfig(rawVal any, _ MarshalOptions) *encoder.EncoderConfig { +func EncoderConfig(rawVal any, set MarshalOptions) *encoder.EncoderConfig { + hooks := []mapstructure.DecodeHookFunc{ + encoder.YamlMarshalerHookFunc(), + encoder.TextMarshalerHookFunc(), + } + + if set.ScalarMarshalingEncodeHookFunc != nil { + hooks = append(hooks, set.ScalarMarshalingEncodeHookFunc) + } + + // This hook must come after the scalar marshaling hook, if present. + hooks = append(hooks, marshalerHookFunc(rawVal)) + return &encoder.EncoderConfig{ - EncodeHook: mapstructure.ComposeDecodeHookFunc( - encoder.YamlMarshalerHookFunc(), - encoder.TextMarshalerHookFunc(), - marshalerHookFunc(rawVal), - ), + EncodeHook: mapstructure.ComposeDecodeHookFunc(hooks...), } } diff --git a/confmap/internal/marshaloption.go b/confmap/internal/marshaloption.go index 06d5726b918b..482426c3c4dd 100644 --- a/confmap/internal/marshaloption.go +++ b/confmap/internal/marshaloption.go @@ -3,16 +3,27 @@ package internal // import "go.opentelemetry.io/collector/confmap/internal" +import "github.com/go-viper/mapstructure/v2" + type MarshalOption interface { apply(*MarshalOptions) } // MarshalOptions is used by (*Conf).Marshal to toggle unmarshaling settings. // It is in the `internal` package so experimental options can be added in xconfmap. -type MarshalOptions struct{} +type MarshalOptions struct { + ScalarMarshalingEncodeHookFunc mapstructure.DecodeHookFunc +} type MarshalOptionFunc func(*MarshalOptions) func (fn MarshalOptionFunc) apply(set *MarshalOptions) { fn(set) } + +// ApplyMarshalOption simply calls (MarshalOption).apply. This function allows us +// to keep the `apply“ function private and therefore keep `confmap.MarshalOption` +// without any exported methods. +func ApplyMarshalOption(mo MarshalOption, set *MarshalOptions) { + mo.apply(set) +} diff --git a/confmap/internal/unmarshaloption.go b/confmap/internal/unmarshaloption.go index e82ce6fd0cc3..0240bf628393 100644 --- a/confmap/internal/unmarshaloption.go +++ b/confmap/internal/unmarshaloption.go @@ -3,6 +3,8 @@ package internal // import "go.opentelemetry.io/collector/confmap/internal" +import "github.com/go-viper/mapstructure/v2" + type UnmarshalOption interface { apply(*UnmarshalOptions) } @@ -10,7 +12,8 @@ type UnmarshalOption interface { // UnmarshalOptions is used by (*Conf).Unmarshal to toggle unmarshaling settings. // It is in the `internal` package so experimental options can be added in xconfmap. type UnmarshalOptions struct { - IgnoreUnused bool + IgnoreUnused bool + AdditionalDecodeHookFuncs []mapstructure.DecodeHookFunc } type UnmarshalOptionFunc func(*UnmarshalOptions) @@ -18,3 +21,10 @@ type UnmarshalOptionFunc func(*UnmarshalOptions) func (fn UnmarshalOptionFunc) apply(set *UnmarshalOptions) { fn(set) } + +// ApplyUnmarshalOption simply calls (UnmarshalOption).apply. This function allows us +// to keep the `apply“ function private and therefore keep `confmap.UnmarshalOption` +// without any exported methods. +func ApplyUnmarshalOption(uo UnmarshalOption, set *UnmarshalOptions) { + uo.apply(set) +} diff --git a/confmap/xconfmap/go.mod b/confmap/xconfmap/go.mod index ec55bbe05964..9fe93b76db3a 100644 --- a/confmap/xconfmap/go.mod +++ b/confmap/xconfmap/go.mod @@ -3,13 +3,13 @@ module go.opentelemetry.io/collector/confmap/xconfmap go 1.24.0 require ( + github.com/go-viper/mapstructure/v2 v2.4.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/confmap v1.45.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/go-viper/mapstructure/v2 v2.4.0 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/knadh/koanf/maps v0.1.2 // indirect diff --git a/confmap/xconfmap/scalarmarshaler.go b/confmap/xconfmap/scalarmarshaler.go new file mode 100644 index 000000000000..da9c97343a67 --- /dev/null +++ b/confmap/xconfmap/scalarmarshaler.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package xconfmap // import "go.opentelemetry.io/collector/confmap/xconfmap" + +import ( + "reflect" + + "github.com/go-viper/mapstructure/v2" + + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/confmap/internal" +) + +func WithScalarMarshaler() confmap.MarshalOption { + return internal.MarshalOptionFunc(func(mo *internal.MarshalOptions) { + mo.ScalarMarshalingEncodeHookFunc = scalarmarshalerHookFunc(mo) + }) +} + +// ScalarUnmarshaler is an interface which may be implemented by wrapper types +// to customize their behavior when the type under the wrapper is a scalar value. +type ScalarMarshaler interface { + // GetScalarValue gets the scalar value and marshals it. + // The struct implementing the interface is free to + // do any pre-processing to the value as part of marshaling. + GetScalarValue() (any, error) +} + +// Provides a mechanism for individual structs to define their own unmarshal logic, +// by implementing the Unmarshaler interface, unless skipTopLevelUnmarshaler is +// true and the struct matches the top level object being unmarshaled. +func scalarmarshalerHookFunc(mo *internal.MarshalOptions) mapstructure.DecodeHookFuncValue { + return safeWrapDecodeHookFunc(func(from reflect.Value, _ reflect.Value) (any, error) { + marshaler, ok := from.Interface().(ScalarMarshaler) + if !ok { + return from.Interface(), nil + } + + v, err := marshaler.GetScalarValue() + if err != nil { + return nil, err + } + + res, err := internal.Encode(v, *mo) + if err != nil { + return nil, err + } + + if res == nil { + return nil, nil + } + + return res, nil + }) +} diff --git a/confmap/xconfmap/scalarmarshaler_test.go b/confmap/xconfmap/scalarmarshaler_test.go new file mode 100644 index 000000000000..1ee8664a9c09 --- /dev/null +++ b/confmap/xconfmap/scalarmarshaler_test.go @@ -0,0 +1,152 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package xconfmap + +import ( + "bytes" + "fmt" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/confmap/confmaptest" +) + +func (tms textMarshalerStruct) MarshalText() ([]byte, error) { + return tms.data, nil +} + +func (tms *textMarshalerStruct) UnmarshalText(data []byte) error { + tms.data = data + return nil +} + +type textMarshalerStruct struct { + id int + data []byte +} + +type nonTextMarshalerStruct struct { + id int + data []byte +} + +type textMarshalerAlias string + +func (tma textMarshalerAlias) MarshalText() ([]byte, error) { + return bytes.NewBufferString(string(tma)).Bytes(), nil +} + +func (tma *textMarshalerAlias) UnmarshalText(data []byte) error { + *tma = textMarshalerAlias(data) + return nil +} + +type nonTextMarshalerAlias string + +type NonImplWrapperType[T any] struct { + inner T `mapstructure:"-"` +} + +var _ confmap.Unmarshaler = (*wrapperType[any])(nil) +var _ ScalarMarshaler = wrapperType[any]{} +var _ ScalarUnmarshaler = (*wrapperType[any])(nil) + +type wrapperType[T any] struct { + inner T `mapstructure:"-"` +} + +func (wt *wrapperType[T]) Unmarshal(conf *confmap.Conf) error { + if err := conf.Unmarshal(&wt.inner, WithScalarUnmarshaler()); err != nil { + return err + } + + return nil +} + +func (wt wrapperType[T]) Marshal(conf *confmap.Conf) error { + if err := conf.Marshal(wt.inner, WithScalarMarshaler()); err != nil { + return fmt.Errorf("failed to marshal wrapperType value: %w", err) + } + + return nil +} + +func (wt wrapperType[T]) MarshalScalar(in *string) (any, error) { + if in != nil { + return *in, nil + } + + return wt.inner, nil +} + +func (wt wrapperType[T]) GetScalarValue() (any, error) { + return wt.inner, nil +} + +func (wt *wrapperType[T]) UnmarshalScalar(val any) error { + v, ok := val.(T) + + if !ok { + return fmt.Errorf("could not unmarshal scalar: val is %T, not %T", val, v) + } + + wt.inner = v + return nil +} + +func (wt *wrapperType[T]) ScalarType() any { + return wt.inner +} + +type testConfig struct { + // Handled by confmap, treated as string + Tma textMarshalerAlias `mapstructure:"text_marshaler_alias"` + Ntma nonTextMarshalerAlias `mapstructure:"non_text_marshaler_alias"` + Nonimplint NonImplWrapperType[int] `mapstructure:"non_impl_int"` + Nonimplstr NonImplWrapperType[string] `mapstructure:"non_impl_str"` + Nonimpltms NonImplWrapperType[textMarshalerStruct] `mapstructure:"non_impl_text_marshaler_struct"` + Nonimplntms NonImplWrapperType[nonTextMarshalerStruct] `mapstructure:"non_impl_non_text_marshaler_struct"` + Implint wrapperType[int] `mapstructure:"impl_int"` + Implstr wrapperType[string] `mapstructure:"impl_str"` + Impltms wrapperType[textMarshalerStruct] `mapstructure:"impl_text_marshaler_struct"` + Implntms wrapperType[nonTextMarshalerStruct] `mapstructure:"impl_non_text_marshaler_struct"` + Recursive wrapperType[wrapperType[textMarshalerStruct]] `mapstructure:"recursive"` +} + +func (cfg *testConfig) Unmarshal(conf *confmap.Conf) error { + if err := conf.Unmarshal(cfg, WithScalarUnmarshaler()); err != nil { + return err + } + + return nil +} + +func TestMarshalConfig(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + wantCfg := &testConfig{} + require.NoError(t, cm.Unmarshal(wantCfg, WithScalarUnmarshaler())) + require.NoError(t, cm.Marshal(wantCfg, WithScalarMarshaler())) + + conf := confmap.New() + cfg := &testConfig{ + Tma: textMarshalerAlias("test"), + Ntma: nonTextMarshalerAlias("test"), + Nonimplint: NonImplWrapperType[int]{inner: 1}, + Nonimplstr: NonImplWrapperType[string]{inner: "test"}, + Nonimpltms: NonImplWrapperType[textMarshalerStruct]{inner: textMarshalerStruct{id: 0, data: []byte{47}}}, + Nonimplntms: NonImplWrapperType[nonTextMarshalerStruct]{inner: nonTextMarshalerStruct{id: 2, data: []byte{48}}}, + Implint: wrapperType[int]{inner: 1}, + Implstr: wrapperType[string]{inner: "test"}, + Impltms: wrapperType[textMarshalerStruct]{inner: textMarshalerStruct{id: 0, data: []byte{81}}}, + Implntms: wrapperType[nonTextMarshalerStruct]{inner: nonTextMarshalerStruct{id: 2, data: []byte{80}}}, + Recursive: wrapperType[wrapperType[textMarshalerStruct]]{inner: wrapperType[textMarshalerStruct]{inner: textMarshalerStruct{id: 2, data: []byte{80}}}}, + } + + require.NoError(t, conf.Marshal(cfg, WithScalarMarshaler())) + require.EqualValues(t, cm.ToStringMap(), conf.ToStringMap()) +} diff --git a/confmap/xconfmap/scalarunmarshaler.go b/confmap/xconfmap/scalarunmarshaler.go new file mode 100644 index 000000000000..54b5a2344091 --- /dev/null +++ b/confmap/xconfmap/scalarunmarshaler.go @@ -0,0 +1,154 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package xconfmap // import "go.opentelemetry.io/collector/confmap/xconfmap" + +import ( + "fmt" + "reflect" + "strings" + + "github.com/go-viper/mapstructure/v2" + + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/confmap/internal" +) + +func WithScalarUnmarshaler() confmap.UnmarshalOption { + return internal.UnmarshalOptionFunc(func(uo *internal.UnmarshalOptions) { + uo.AdditionalDecodeHookFuncs = append(uo.AdditionalDecodeHookFuncs, scalarunmarshalerHookFunc(uo)) + }) +} + +// ScalarUnmarshaler is an interface which may be implemented by wrapper types +// to customize their behavior when the type under the wrapper is a scalar value. +type ScalarUnmarshaler interface { + //UnmarshalScalar unmarshals a scalar into a value in a custom way. + UnmarshalScalar(val any) error + + // ScalarType returns a value that can be used to get the type + // of the scalar using reflection. + ScalarType() any +} + +type MigrateableConfig interface { + Migrations() []ConfigMigrator +} + +type ConfigMigrator interface { + Migrate(cfg any) (bool, error) +} + +// Provides a mechanism for individual structs to define their own unmarshal logic, +// by implementing the Unmarshaler interface, unless skipTopLevelUnmarshaler is +// true and the struct matches the top level object being unmarshaled. +func scalarunmarshalerHookFunc(opts *internal.UnmarshalOptions) mapstructure.DecodeHookFuncValue { + return safeWrapDecodeHookFunc(func(from, to reflect.Value) (any, error) { + if !to.CanAddr() { + return from.Interface(), nil + } + + // if from.Kind() == reflect.Struct || + // from.Kind() == reflect.Pointer && from.Elem().Kind() == reflect.Struct { + // return from.Interface(), nil + // } + + toPtr := to.Addr().Interface() + + fm, fmOk := from.Interface().(map[string]interface{}) + + m, ok := toPtr.(MigrateableConfig) + if ok && !(from.Kind() == reflect.Map && from.IsNil()) && (!fmOk || len(fm) > 0) { + for _, migrator := range m.Migrations() { + mOpts := *opts + mOpts.IgnoreUnused = true + if err := internal.Decode(from.Interface(), &migrator, mOpts, false); err != nil { + // An error decoding likely means this migrator's schema + // doesn't match the input. + continue + } + + migrated, err := migrator.Migrate(toPtr) + + if err != nil { + return nil, fmt.Errorf("failed to migrate config using %T: %w", migrator, err) + } + + if migrated { + switch reflect.TypeOf(migrator).Kind() { + case reflect.Map: + // Empty out the map. + return toPtr, nil + case reflect.Struct: + mVal := reflect.TypeOf(migrator) + numFields := mVal.NumField() + + fm, ok := from.Interface().(map[string]interface{}) + if !ok { + return toPtr, nil + } + + for i := range numFields { + field := mVal.Field(i) + tag, ok := field.Tag.Lookup("mapstructure") + + if !ok { + continue + } + + name := strings.Split(tag, ",")[0] + delete(fm, name) + } + } + + } + } + } + + unmarshaler, ok := toPtr.(ScalarUnmarshaler) + if !ok { + return from.Interface(), nil + } + + if to.Addr().IsNil() { + unmarshaler = reflect.New(to.Type()).Interface().(ScalarUnmarshaler) + } + + resultVal := reflect.New(reflect.TypeOf(unmarshaler.ScalarType())) + + // If the inner condition matches, the user specified `null` for this + // value, which was translated to a nil map by mapstructure. + if !(from.Kind() == reflect.Map && from.IsNil()) { + if err := internal.Decode(from.Interface(), resultVal.Interface(), *opts, false); err != nil { + return nil, err + } + + if err := unmarshaler.UnmarshalScalar(resultVal.Elem().Interface()); err != nil { + return nil, err + } + } else { + if err := unmarshaler.UnmarshalScalar(nil); err != nil { + return nil, err + } + } + + return unmarshaler, nil + }) +} + +// safeWrapDecodeHookFunc wraps a DecodeHookFuncValue to ensure fromVal is a valid `reflect.Value` +// object and therefore it is safe to call `reflect.Value` methods on fromVal. +// +// Use this only if the hook does not need to be called on untyped nil values. +// Typed nil values are safe to call and will be passed to the hook. +// See https://github.com/golang/go/issues/51649 +func safeWrapDecodeHookFunc( + f mapstructure.DecodeHookFuncValue, +) mapstructure.DecodeHookFuncValue { + return func(fromVal, toVal reflect.Value) (any, error) { + if !fromVal.IsValid() { + return nil, nil + } + return f(fromVal, toVal) + } +} diff --git a/confmap/xconfmap/scalarunmarshaler_test.go b/confmap/xconfmap/scalarunmarshaler_test.go new file mode 100644 index 000000000000..2cf9e45a10d3 --- /dev/null +++ b/confmap/xconfmap/scalarunmarshaler_test.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package xconfmap + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/confmap/confmaptest" +) + +func TestUnmarshalConfig(t *testing.T) { + wantCfg := &testConfig{ + Tma: textMarshalerAlias("test"), + Ntma: nonTextMarshalerAlias("test"), + Implint: wrapperType[int]{inner: 1}, + Implstr: wrapperType[string]{inner: "test"}, + Impltms: wrapperType[textMarshalerStruct]{inner: textMarshalerStruct{id: 0, data: []byte{81}}}, + Recursive: wrapperType[wrapperType[textMarshalerStruct]]{inner: wrapperType[textMarshalerStruct]{inner: textMarshalerStruct{id: 0, data: []byte{80}}}}, + } + + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + cfg := &testConfig{} + require.NoError(t, cm.Unmarshal(cfg, WithScalarUnmarshaler())) + + require.Equal(t, wantCfg, cfg) +} diff --git a/confmap/xconfmap/testdata/config.yaml b/confmap/xconfmap/testdata/config.yaml new file mode 100644 index 000000000000..e04b9585db9b --- /dev/null +++ b/confmap/xconfmap/testdata/config.yaml @@ -0,0 +1,11 @@ +impl_int: 1 +impl_non_text_marshaler_struct: {} +impl_str: test +impl_text_marshaler_struct: Q +non_impl_int: {} +non_impl_non_text_marshaler_struct: {} +non_impl_str: {} +non_impl_text_marshaler_struct: {} +non_text_marshaler_alias: test +text_marshaler_alias: test +recursive: P \ No newline at end of file diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 147ff280a7ea..3b4d98e8fd87 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -210,8 +210,8 @@ func WithQueueBatch(cfg queuebatch.Config, set queuebatch.Settings[request.Reque o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures." return nil } - if cfg.StorageID != nil && set.Encoding == nil { - return errors.New("`Settings.Encoding` must not be nil when persistent queue is enabled") + if cfg.StorageID.HasValue() && set.Encoding == nil { + return errors.New("`QueueBatchSettings.Encoding` must not be nil when persistent queue is enabled") } o.queueBatchSettings = set o.queueCfg = cfg diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go index 5a5da91a9f8a..14b3d909e199 100644 --- a/exporter/exporterhelper/internal/base_exporter_test.go +++ b/exporter/exporterhelper/internal/base_exporter_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" @@ -53,8 +54,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) { require.Error(t, err) qCfg := NewDefaultQueueConfig() - storageID := component.NewID(component.MustNewType("test")) - qCfg.StorageID = &storageID + qCfg.StorageID = configoptional.Some(component.MustNewID("test")) _, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport, WithQueueBatchSettings(newFakeQueueBatch()), WithRetry(configretry.NewDefaultBackOffConfig()), diff --git a/exporter/exporterhelper/internal/queue/persistent_queue.go b/exporter/exporterhelper/internal/queue/persistent_queue.go index a4ddf2e330b5..09e62eba60a5 100644 --- a/exporter/exporterhelper/internal/queue/persistent_queue.go +++ b/exporter/exporterhelper/internal/queue/persistent_queue.go @@ -103,7 +103,7 @@ func newPersistentQueue[T request.Request](set Settings[T]) readableQueue[T] { activeSizer: request.NewSizer(set.SizerType), itemsSizer: request.NewItemsSizer(), bytesSizer: request.NewBytesSizer(), - storageID: *set.StorageID, + storageID: *set.StorageID.Get(), id: set.ID, signal: set.Signal, blockOnOverflow: set.BlockOnOverflow, diff --git a/exporter/exporterhelper/internal/queue/persistent_queue_test.go b/exporter/exporterhelper/internal/queue/persistent_queue_test.go index 7534c414f9fc..87177c2db255 100644 --- a/exporter/exporterhelper/internal/queue/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/queue/persistent_queue_test.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/hosttest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" @@ -255,8 +256,7 @@ func newSettings(sizerType request.SizerType, capacity int64) Settings[intReques func newSettingsWithStorage(sizerType request.SizerType, capacity int64) Settings[intRequest] { set := newSettings(sizerType, capacity) - storageID := component.ID{} - set.StorageID = &storageID + set.StorageID = configoptional.Some(component.ID{}) return set } @@ -537,8 +537,7 @@ func TestInvalidStorageExtensionType(t *testing.T) { } func TestPersistentQueue_StopAfterBadStart(t *testing.T) { - storageID := component.ID{} - pq := newPersistentQueue[intRequest](Settings[intRequest]{StorageID: &storageID}) + pq := newPersistentQueue[intRequest](Settings[intRequest]{StorageID: configoptional.Some(component.ID{})}) // verify that stopping a un-start/started w/error queue does not panic assert.NoError(t, pq.Shutdown(context.Background())) } diff --git a/exporter/exporterhelper/internal/queue/queue.go b/exporter/exporterhelper/internal/queue/queue.go index 8e6b2625192c..6749be401593 100644 --- a/exporter/exporterhelper/internal/queue/queue.go +++ b/exporter/exporterhelper/internal/queue/queue.go @@ -8,6 +8,7 @@ import ( "errors" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/pipeline" ) @@ -71,7 +72,7 @@ type Settings[T request.Request] struct { WaitForResult bool BlockOnOverflow bool Signal pipeline.Signal - StorageID *component.ID + StorageID configoptional.Optional[component.ID] ReferenceCounter ReferenceCounter[T] Encoding Encoding[T] ID component.ID @@ -90,7 +91,7 @@ func NewQueue[T request.Request](set Settings[T], next ConsumeFunc[T]) (Queue[T] func newBaseQueue[T request.Request](set Settings[T]) readableQueue[T] { // Configure memory queue or persistent based on the config. - if set.StorageID == nil { + if !set.StorageID.HasValue() { return newMemoryQueue[T](set) } diff --git a/exporter/exporterhelper/internal/queuebatch/config.go b/exporter/exporterhelper/internal/queuebatch/config.go index ecef3ce3040a..f8689ed41e56 100644 --- a/exporter/exporterhelper/internal/queuebatch/config.go +++ b/exporter/exporterhelper/internal/queuebatch/config.go @@ -11,45 +11,46 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/confmap/xconfmap" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" ) // Config defines configuration for queueing and batching incoming requests. type Config struct { // Enabled indicates whether to not enqueue and batch before exporting. - Enabled bool `mapstructure:"enabled"` + Enabled bool `mapstructure:"enabled,omitempty"` // WaitForResult determines if incoming requests are blocked until the request is processed or not. // Currently, this option is not available when persistent queue is configured using the storage configuration. - WaitForResult bool `mapstructure:"wait_for_result"` + WaitForResult bool `mapstructure:"wait_for_result,omitempty"` // Sizer determines the type of size measurement used by this component. // It accepts "requests", "items", or "bytes". - Sizer request.SizerType `mapstructure:"sizer"` + Sizer request.SizerType `mapstructure:"sizer,omitempty"` // QueueSize represents the maximum data size allowed for concurrent storage and processing. - QueueSize int64 `mapstructure:"queue_size"` + QueueSize int `mapstructure:"queue_size,omitempty"` // BlockOnOverflow determines the behavior when the component's TotalSize limit is reached. // If true, the component will wait for space; otherwise, operations will immediately return a retryable error. - BlockOnOverflow bool `mapstructure:"block_on_overflow"` + BlockOnOverflow bool `mapstructure:"block_on_overflow,omitempty"` - // StorageID if not empty, enables the persistent storage and uses the component specified + // StorageID, if not empty, enables the persistent storage and uses the component specified // as a storage extension for the persistent queue. // TODO: This will be changed to Optional when available. // See https://github.com/open-telemetry/opentelemetry-collector/issues/13822 - StorageID *component.ID `mapstructure:"storage"` + StorageID configoptional.Optional[component.ID] `mapstructure:"storage"` // NumConsumers is the maximum number of concurrent consumers from the queue. // This applies across all different optional configurations from above (e.g. wait_for_result, block_on_overflow, storage, etc.). NumConsumers int `mapstructure:"num_consumers"` // BatchConfig it configures how the requests are consumed from the queue and batch together during consumption. - Batch configoptional.Optional[BatchConfig] `mapstructure:"batch"` + Batch configoptional.Optional[BatchConfig] `mapstructure:"batch,omitempty"` } func (cfg *Config) Unmarshal(conf *confmap.Conf) error { - if err := conf.Unmarshal(cfg); err != nil { + if err := conf.Unmarshal(cfg, xconfmap.WithScalarUnmarshaler()); err != nil { return err } @@ -79,13 +80,13 @@ func (cfg *Config) Validate() error { } // Only support request sizer for persistent queue at this moment. - if cfg.StorageID != nil && cfg.WaitForResult { + if cfg.StorageID.HasValue() && cfg.WaitForResult { return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`") } if cfg.Batch.HasValue() && cfg.Batch.Get().Sizer == cfg.Sizer { // Avoid situations where the queue is not able to hold any data. - if cfg.Batch.Get().MinSize > cfg.QueueSize { + if cfg.Batch.Get().MinSize > int64(cfg.QueueSize) { return errors.New("`min_size` must be less than or equal to `queue_size`") } } diff --git a/exporter/exporterhelper/internal/queuebatch/config_test.go b/exporter/exporterhelper/internal/queuebatch/config_test.go index f55f7bcb8ff6..83aef849e355 100644 --- a/exporter/exporterhelper/internal/queuebatch/config_test.go +++ b/exporter/exporterhelper/internal/queuebatch/config_test.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configoptional" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/confmap/xconfmap" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" @@ -33,14 +34,13 @@ func TestConfig_Validate(t *testing.T) { cfg.QueueSize = 0 require.EqualError(t, xconfmap.Validate(cfg), "`queue_size` must be positive") - storageID := component.MustNewID("test") cfg = newTestConfig() cfg.WaitForResult = true - cfg.StorageID = &storageID + cfg.StorageID = configoptional.Some(component.MustNewID("test")) require.EqualError(t, xconfmap.Validate(cfg), "`wait_for_result` is not supported with a persistent queue configured with `storage`") cfg = newTestConfig() - cfg.QueueSize = cfg.Batch.Get().MinSize - 1 + cfg.QueueSize = int(cfg.Batch.Get().MinSize) - 1 require.EqualError(t, xconfmap.Validate(cfg), "`min_size` must be less than or equal to `queue_size`") cfg = newTestConfig() @@ -86,6 +86,42 @@ func TestBatchConfig_Validate(t *testing.T) { require.EqualError(t, xconfmap.Validate(cfg), "`max_size` (1024) must be greater or equal to `min_size` (2048)") } +func TestLoadConfig(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + + disk := component.MustNewIDWithName("disk", "") + + wantCfg := &Config{ + QueueSize: 1000, + Enabled: true, + NumConsumers: 10, + StorageID: configoptional.Some(disk), + } + defaultConfig := &Config{} + + require.NoError(t, cm.Unmarshal(defaultConfig, xconfmap.WithScalarUnmarshaler())) + assert.Equal(t, wantCfg, defaultConfig) +} + +func TestMarshalConfig(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + + disk := component.MustNewIDWithName("disk", "") + + conf := confmap.New() + cfg := &Config{ + QueueSize: 1000, + Enabled: true, + NumConsumers: 10, + StorageID: configoptional.Some(disk), + } + + require.NoError(t, conf.Marshal(cfg, xconfmap.WithScalarMarshaler())) + assert.Equal(t, cm.ToStringMap(), conf.ToStringMap()) +} + func newTestBatchConfig() BatchConfig { return BatchConfig{ FlushTimeout: 200 * time.Millisecond, diff --git a/exporter/exporterhelper/internal/queuebatch/queue_batch.go b/exporter/exporterhelper/internal/queuebatch/queue_batch.go index 2ca4b3a31deb..40446c99ab42 100644 --- a/exporter/exporterhelper/internal/queuebatch/queue_batch.go +++ b/exporter/exporterhelper/internal/queuebatch/queue_batch.go @@ -58,7 +58,7 @@ func NewQueueBatch( q, err := queue.NewQueue(queue.Settings[request.Request]{ SizerType: cfg.Sizer, - Capacity: cfg.QueueSize, + Capacity: int64(cfg.QueueSize), NumConsumers: cfg.NumConsumers, WaitForResult: cfg.WaitForResult, BlockOnOverflow: cfg.BlockOnOverflow, diff --git a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go index 0fcaf70ec1f5..c00120bcafe8 100644 --- a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go +++ b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go @@ -151,7 +151,7 @@ func TestQueueBatchDifferentSizers(t *testing.T) { func TestQueueBatchPersistenceEnabled(t *testing.T) { cfg := newTestConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - cfg.StorageID = &storageID + cfg.StorageID = configoptional.Some(storageID) qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sendertest.NewNopSenderFunc[request.Request]()) require.NoError(t, err) @@ -168,7 +168,7 @@ func TestQueueBatchPersistenceEnabledStorageError(t *testing.T) { storageError := errors.New("could not get storage client") cfg := newTestConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - cfg.StorageID = &storageID + cfg.StorageID = configoptional.Some(storageID) qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sendertest.NewNopSenderFunc[request.Request]()) require.NoError(t, err) @@ -184,7 +184,7 @@ func TestQueueBatchPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { cfg := newTestConfig() cfg.NumConsumers = 1 storageID := component.MustNewIDWithName("file_storage", "storage") - cfg.StorageID = &storageID + cfg.StorageID = configoptional.Some(storageID) mockReq := &requesttest.FakeRequest{Items: 2} qSet := newFakeRequestSettings() diff --git a/exporter/exporterhelper/internal/queuebatch/testdata/config.yaml b/exporter/exporterhelper/internal/queuebatch/testdata/config.yaml new file mode 100644 index 000000000000..ebec5826c61d --- /dev/null +++ b/exporter/exporterhelper/internal/queuebatch/testdata/config.yaml @@ -0,0 +1,4 @@ +queue_size: 1000 +enabled: true +num_consumers: 10 +storage: disk \ No newline at end of file diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 132712b567f7..410ea74f0705 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" @@ -101,7 +102,7 @@ func TestLogs_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID + qCfg.StorageID = configoptional.Some(storageID) set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index dc559d9f9f31..fa448eaf02bb 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" @@ -101,7 +102,7 @@ func TestMetrics_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID + qCfg.StorageID = configoptional.Some(storageID) set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_metrics", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index d4cb14c3b2f7..5320f09aefd3 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" @@ -103,7 +104,7 @@ func TestTraces_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID + qCfg.StorageID = configoptional.Some(storageID) set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ diff --git a/exporter/exporterhelper/xexporterhelper/go.mod b/exporter/exporterhelper/xexporterhelper/go.mod index da0dc2192ee6..5099c3ccddfe 100644 --- a/exporter/exporterhelper/xexporterhelper/go.mod +++ b/exporter/exporterhelper/xexporterhelper/go.mod @@ -6,6 +6,7 @@ require ( github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.45.0 go.opentelemetry.io/collector/component/componenttest v0.139.0 + go.opentelemetry.io/collector/config/configoptional v1.45.0 go.opentelemetry.io/collector/consumer v1.45.0 go.opentelemetry.io/collector/consumer/consumererror v0.139.0 go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.139.0 @@ -46,7 +47,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/client v1.45.0 // indirect - go.opentelemetry.io/collector/config/configoptional v1.45.0 // indirect go.opentelemetry.io/collector/config/configretry v1.45.0 // indirect go.opentelemetry.io/collector/confmap v1.45.0 // indirect go.opentelemetry.io/collector/confmap/xconfmap v0.139.0 // indirect diff --git a/exporter/exporterhelper/xexporterhelper/profiles_test.go b/exporter/exporterhelper/xexporterhelper/profiles_test.go index a48d5a760758..5e097cfd7ccf 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_test.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumererror/xconsumererror" @@ -171,7 +172,7 @@ func TestProfiles_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := exporterhelper.NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID + qCfg.StorageID = configoptional.Some(storageID) set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{