From df6f8fddccb93fa173235cc99e6d1da250a530cb Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Thu, 29 Feb 2024 07:42:33 -0500 Subject: [PATCH] [otelcol] Allow passing confmap.Providers to otelcol.Collector (#9228) **Description:** One way to work toward https://github.com/open-telemetry/opentelemetry-collector/issues/4759. This implements the second approach that I've outlined here: https://github.com/open-telemetry/opentelemetry-collector/issues/4759#issuecomment-1863082504. I think the main advantage of this approach is that it cleans up the API for the package. `otelcol.ConfigProvider` is a fairly thin wrapper around `confmap.Resolver`, so I think we could ultimately remove the interface, and any custom functionality for config merging or unmarshaling could be exposed to users through settings rather through a custom implementation. **Link to tracking Issue:** Works toward https://github.com/open-telemetry/opentelemetry-collector/issues/4759 **Testing:** Unit tests **Documentation:** Added Godoc comments. --- .chloggen/pass-confmap-providers.yaml | 25 +++++ otelcol/collector.go | 30 +++-- otelcol/collector_test.go | 154 +++++++++++++------------- otelcol/collector_windows.go | 6 +- otelcol/command.go | 33 ++++-- otelcol/command_test.go | 59 +++++++++- 6 files changed, 205 insertions(+), 102 deletions(-) create mode 100755 .chloggen/pass-confmap-providers.yaml diff --git a/.chloggen/pass-confmap-providers.yaml b/.chloggen/pass-confmap-providers.yaml new file mode 100755 index 00000000000..89df0c70ec2 --- /dev/null +++ b/.chloggen/pass-confmap-providers.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otelcol + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `ConfigProviderSettings` to `CollectorSettings` + +# One or more tracking issues or pull requests related to the change +issues: [4759] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This allows passing a custom list of `confmap.Provider`s to `otelcol.NewCommand`. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/otelcol/collector.go b/otelcol/collector.go index 1e369bdaad3..3fd5cb2ca31 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -7,7 +7,6 @@ package otelcol // import "go.opentelemetry.io/collector/otelcol" import ( "context" - "errors" "fmt" "os" "os/signal" @@ -66,10 +65,16 @@ type CollectorSettings struct { // and manually handle the signals to shutdown the collector. DisableGracefulShutdown bool + // Deprecated: [v0.95.0] Use ConfigProviderSettings instead. // ConfigProvider provides the service configuration. // If the provider watches for configuration change, collector may reload the new configuration upon changes. ConfigProvider ConfigProvider + // ConfigProviderSettings allows configuring the way the Collector retrieves its configuration + // The Collector will reload based on configuration changes from the ConfigProvider if any + // confmap.Providers watch for configuration changes. + ConfigProviderSettings ConfigProviderSettings + // LoggingOptions provides a way to change behavior of zap logging. LoggingOptions []zap.Option @@ -92,6 +97,8 @@ type CollectorSettings struct { type Collector struct { set CollectorSettings + configProvider ConfigProvider + service *service.Service state *atomic.Int32 @@ -105,8 +112,14 @@ type Collector struct { // NewCollector creates and returns a new instance of Collector. func NewCollector(set CollectorSettings) (*Collector, error) { - if set.ConfigProvider == nil { - return nil, errors.New("invalid nil config provider") + var err error + configProvider := set.ConfigProvider + + if configProvider == nil { + configProvider, err = NewConfigProvider(set.ConfigProviderSettings) + if err != nil { + return nil, err + } } state := &atomic.Int32{} @@ -119,6 +132,7 @@ func NewCollector(set CollectorSettings) (*Collector, error) { // the number of signals getting notified on is recommended. signalsChannel: make(chan os.Signal, 3), asyncErrorChannel: make(chan error), + configProvider: configProvider, }, nil } @@ -146,7 +160,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { var conf *confmap.Conf - if cp, ok := col.set.ConfigProvider.(ConfmapProvider); ok { + if cp, ok := col.configProvider.(ConfmapProvider); ok { var err error conf, err = cp.GetConfmap(ctx) @@ -159,7 +173,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to initialize factories: %w", err) } - cfg, err := col.set.ConfigProvider.Get(ctx, factories) + cfg, err := col.configProvider.Get(ctx, factories) if err != nil { return fmt.Errorf("failed to get config: %w", err) } @@ -215,7 +229,7 @@ func (col *Collector) DryRun(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to initialize factories: %w", err) } - cfg, err := col.set.ConfigProvider.Get(ctx, factories) + cfg, err := col.configProvider.Get(ctx, factories) if err != nil { return fmt.Errorf("failed to get config: %w", err) } @@ -243,7 +257,7 @@ func (col *Collector) Run(ctx context.Context) error { LOOP: for { select { - case err := <-col.set.ConfigProvider.Watch(): + case err := <-col.configProvider.Watch(): if err != nil { col.service.Logger().Error("Config watch failed", zap.Error(err)) break LOOP @@ -280,7 +294,7 @@ func (col *Collector) shutdown(ctx context.Context) error { // Accumulate errors and proceed with shutting down remaining components. var errs error - if err := col.set.ConfigProvider.Shutdown(ctx); err != nil { + if err := col.configProvider.Shutdown(ctx); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown config provider: %w", err)) } diff --git a/otelcol/collector_test.go b/otelcol/collector_test.go index bf7684670bf..c56d56fc728 100644 --- a/otelcol/collector_test.go +++ b/otelcol/collector_test.go @@ -18,7 +18,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" - "go.opentelemetry.io/collector/confmap/converter/expandconverter" "go.opentelemetry.io/collector/extension/extensiontest" "go.opentelemetry.io/collector/processor/processortest" ) @@ -32,13 +31,10 @@ func TestStateString(t *testing.T) { } func TestCollectorStartAsGoRoutine(t *testing.T) { - cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")})) - require.NoError(t, err) - set := CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: nopFactories, - ConfigProvider: cfgProvider, + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}), } col, err := NewCollector(set) require.NoError(t, err) @@ -56,13 +52,10 @@ func TestCollectorStartAsGoRoutine(t *testing.T) { } func TestCollectorCancelContext(t *testing.T) { - cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")})) - require.NoError(t, err) - set := CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: nopFactories, - ConfigProvider: cfgProvider, + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}), } col, err := NewCollector(set) require.NoError(t, err) @@ -119,13 +112,10 @@ func TestCollectorStateAfterConfigChange(t *testing.T) { } func TestCollectorReportError(t *testing.T) { - cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")})) - require.NoError(t, err) - col, err := NewCollector(CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: nopFactories, - ConfigProvider: cfgProvider, + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}), }) require.NoError(t, err) @@ -235,13 +225,10 @@ func TestComponentStatusWatcher(t *testing.T) { } func TestCollectorSendSignal(t *testing.T) { - cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")})) - require.NoError(t, err) - col, err := NewCollector(CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: nopFactories, - ConfigProvider: cfgProvider, + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}), }) require.NoError(t, err) @@ -266,13 +253,10 @@ func TestCollectorSendSignal(t *testing.T) { func TestCollectorFailedShutdown(t *testing.T) { t.Skip("This test was using telemetry shutdown failure, switch to use a component that errors on shutdown.") - cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")})) - require.NoError(t, err) - col, err := NewCollector(CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: nopFactories, - ConfigProvider: cfgProvider, + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}), }) require.NoError(t, err) @@ -294,16 +278,49 @@ func TestCollectorFailedShutdown(t *testing.T) { } func TestCollectorStartInvalidConfig(t *testing.T) { - cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")})) + col, err := NewCollector(CollectorSettings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}), + }) require.NoError(t, err) + assert.Error(t, col.Run(context.Background())) +} + +func TestNewCollectorInvalidConfigProviderSettings(t *testing.T) { + _, err := NewCollector(CollectorSettings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: ConfigProviderSettings{}, + }) + require.Error(t, err) +} + +func TestNewCollectorUseConfig(t *testing.T) { + set := newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}) col, err := NewCollector(CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: nopFactories, - ConfigProvider: cfgProvider, + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: set, }) require.NoError(t, err) - assert.Error(t, col.Run(context.Background())) + require.NotNil(t, col.configProvider) +} + +func TestNewCollectorValidatesResolverSettings(t *testing.T) { + set := ConfigProviderSettings{ + ResolverSettings: confmap.ResolverSettings{ + URIs: []string{filepath.Join("testdata", "otelcol-nop.yaml")}, + }, + } + + _, err := NewCollector(CollectorSettings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: set, + }) + require.Error(t, err) } func TestCollectorStartWithTraceContextPropagation(t *testing.T) { @@ -318,13 +335,10 @@ func TestCollectorStartWithTraceContextPropagation(t *testing.T) { for _, tt := range tests { t.Run(tt.file, func(t *testing.T) { - cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", tt.file)})) - require.NoError(t, err) - set := CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: nopFactories, - ConfigProvider: cfgProvider, + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", tt.file)}), } col, err := NewCollector(set) @@ -353,13 +367,10 @@ func TestCollectorRun(t *testing.T) { for _, tt := range tests { t.Run(tt.file, func(t *testing.T) { - cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", tt.file)})) - require.NoError(t, err) - set := CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: nopFactories, - ConfigProvider: cfgProvider, + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", tt.file)}), } col, err := NewCollector(set) require.NoError(t, err) @@ -374,13 +385,10 @@ func TestCollectorRun(t *testing.T) { } func TestCollectorShutdownBeforeRun(t *testing.T) { - cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")})) - require.NoError(t, err) - set := CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: nopFactories, - ConfigProvider: cfgProvider, + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}), } col, err := NewCollector(set) require.NoError(t, err) @@ -396,14 +404,11 @@ func TestCollectorShutdownBeforeRun(t *testing.T) { } func TestCollectorClosedStateOnStartUpError(t *testing.T) { - cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")})) - require.NoError(t, err) - // Load a bad config causing startup to fail set := CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: nopFactories, - ConfigProvider: cfgProvider, + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}), } col, err := NewCollector(set) require.NoError(t, err) @@ -416,14 +421,11 @@ func TestCollectorClosedStateOnStartUpError(t *testing.T) { } func TestCollectorDryRun(t *testing.T) { - cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")})) - require.NoError(t, err) - // Load a bad config causing startup to fail set := CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: nopFactories, - ConfigProvider: cfgProvider, + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}), } col, err := NewCollector(set) require.NoError(t, err) @@ -432,19 +434,15 @@ func TestCollectorDryRun(t *testing.T) { } func TestPassConfmapToServiceFailure(t *testing.T) { - cfgProvider, err := NewConfigProvider(ConfigProviderSettings{ - ResolverSettings: confmap.ResolverSettings{ - URIs: []string{filepath.Join("testdata", "otelcol-invalid.yaml")}, - Providers: makeMapProvidersMap(newFailureProvider()), - Converters: []confmap.Converter{expandconverter.New(confmap.ConverterSettings{})}, - }, - }) - require.NoError(t, err) - set := CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: nopFactories, - ConfigProvider: cfgProvider, + BuildInfo: component.NewDefaultBuildInfo(), + Factories: nopFactories, + ConfigProviderSettings: ConfigProviderSettings{ + ResolverSettings: confmap.ResolverSettings{ + URIs: []string{filepath.Join("testdata", "otelcol-invalid.yaml")}, + Providers: makeMapProvidersMap(newFailureProvider()), + }, + }, } col, err := NewCollector(set) require.NoError(t, err) diff --git a/otelcol/collector_windows.go b/otelcol/collector_windows.go index eb2d3839d9d..f443d930b17 100644 --- a/otelcol/collector_windows.go +++ b/otelcol/collector_windows.go @@ -87,7 +87,11 @@ func (s *windowsService) start(elog *eventlog.Log, colErrorChannel chan error) e } var err error - s.col, err = newCollectorWithFlags(s.settings, s.flags) + err = updateSettingsUsingFlags(&s.settings, s.flags) + if err != nil { + return err + } + s.col, err = NewCollector(s.settings) if err != nil { return err } diff --git a/otelcol/command.go b/otelcol/command.go index e9bc573c746..f510687c31b 100644 --- a/otelcol/command.go +++ b/otelcol/command.go @@ -13,6 +13,9 @@ import ( ) // NewCommand constructs a new cobra.Command using the given CollectorSettings. +// Any URIs specified in CollectorSettings.ConfigProviderSettings.ResolverSettings.URIs +// are considered defaults and will be overwritten by config flags passed as +// command-line arguments to the executable. func NewCommand(set CollectorSettings) *cobra.Command { flagSet := flags(featuregate.GlobalRegistry()) rootCmd := &cobra.Command{ @@ -20,7 +23,12 @@ func NewCommand(set CollectorSettings) *cobra.Command { Version: set.BuildInfo.Version, SilenceUsage: true, RunE: func(cmd *cobra.Command, _ []string) error { - col, err := newCollectorWithFlags(set, flagSet) + err := updateSettingsUsingFlags(&set, flagSet) + if err != nil { + return err + } + + col, err := NewCollector(set) if err != nil { return err } @@ -33,18 +41,23 @@ func NewCommand(set CollectorSettings) *cobra.Command { return rootCmd } -func newCollectorWithFlags(set CollectorSettings, flags *flag.FlagSet) (*Collector, error) { +func updateSettingsUsingFlags(set *CollectorSettings, flags *flag.FlagSet) error { if set.ConfigProvider == nil { + resolverSet := &set.ConfigProviderSettings.ResolverSettings configFlags := getConfigFlag(flags) - if len(configFlags) == 0 { - return nil, errors.New("at least one config flag must be provided") - } - var err error - set.ConfigProvider, err = NewConfigProvider(newDefaultConfigProviderSettings(configFlags)) - if err != nil { - return nil, err + if len(configFlags) > 0 { + resolverSet.URIs = configFlags + } + if len(resolverSet.URIs) == 0 { + return errors.New("at least one config flag must be provided") + } + // Provide a default set of providers and converters if none have been specified. + // TODO: Remove this after CollectorSettings.ConfigProvider is removed and instead + // do it in the builder. + if len(resolverSet.Providers) == 0 && len(resolverSet.Converters) == 0 { + set.ConfigProviderSettings = newDefaultConfigProviderSettings(resolverSet.URIs) } } - return NewCollector(set) + return nil } diff --git a/otelcol/command_test.go b/otelcol/command_test.go index 0ef2015421a..d599441c658 100644 --- a/otelcol/command_test.go +++ b/otelcol/command_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/converter/expandconverter" "go.opentelemetry.io/collector/confmap/provider/fileprovider" + "go.opentelemetry.io/collector/featuregate" ) func TestNewCommandVersion(t *testing.T) { @@ -47,17 +48,65 @@ receivers: require.ErrorContains(t, cmd.Execute(), "invalid_component_name") } -func TestNewCommandInvalidComponent(t *testing.T) { - cfgProvider, err := NewConfigProvider( - ConfigProviderSettings{ +func TestAddFlagToSettings(t *testing.T) { + set := CollectorSettings{ + ConfigProviderSettings: ConfigProviderSettings{ ResolverSettings: confmap.ResolverSettings{ URIs: []string{filepath.Join("testdata", "otelcol-invalid.yaml")}, Providers: map[string]confmap.Provider{"file": fileprovider.NewWithSettings(confmap.ProviderSettings{})}, Converters: []confmap.Converter{expandconverter.New(confmap.ConverterSettings{})}, }, - }) + }, + } + flgs := flags(featuregate.NewRegistry()) + err := flgs.Parse([]string{"--config=otelcol-nop.yaml"}) + require.NoError(t, err) + + err = updateSettingsUsingFlags(&set, flgs) require.NoError(t, err) + require.Len(t, set.ConfigProviderSettings.ResolverSettings.URIs, 1) +} + +func TestAddDefaultConfmapModules(t *testing.T) { + set := CollectorSettings{ + ConfigProviderSettings: ConfigProviderSettings{ + ResolverSettings: confmap.ResolverSettings{}, + }, + } + flgs := flags(featuregate.NewRegistry()) + err := flgs.Parse([]string{"--config=otelcol-nop.yaml"}) + require.NoError(t, err) + + err = updateSettingsUsingFlags(&set, flgs) + require.NoError(t, err) + require.Len(t, set.ConfigProviderSettings.ResolverSettings.URIs, 1) + require.Len(t, set.ConfigProviderSettings.ResolverSettings.Converters, 1) + require.Len(t, set.ConfigProviderSettings.ResolverSettings.Providers, 5) +} + +func TestInvalidCollectorSettings(t *testing.T) { + set := CollectorSettings{ + ConfigProviderSettings: ConfigProviderSettings{ + ResolverSettings: confmap.ResolverSettings{ + Converters: []confmap.Converter{expandconverter.New(confmap.ConverterSettings{})}, + URIs: []string{"--config=otelcol-nop.yaml"}, + }, + }, + } + + cmd := NewCommand(set) + require.Error(t, cmd.Execute()) +} + +func TestNewCommandInvalidComponent(t *testing.T) { + set := ConfigProviderSettings{ + ResolverSettings: confmap.ResolverSettings{ + URIs: []string{filepath.Join("testdata", "otelcol-invalid.yaml")}, + Providers: map[string]confmap.Provider{"file": fileprovider.NewWithSettings(confmap.ProviderSettings{})}, + Converters: []confmap.Converter{expandconverter.New(confmap.ConverterSettings{})}, + }, + } - cmd := NewCommand(CollectorSettings{Factories: nopFactories, ConfigProvider: cfgProvider}) + cmd := NewCommand(CollectorSettings{Factories: nopFactories, ConfigProviderSettings: set}) require.Error(t, cmd.Execute()) }