Skip to content

Commit

Permalink
[otelcol] Allow passing confmap.Providers to otelcol.Collector (#9228)
Browse files Browse the repository at this point in the history
**Description:**

One way to work toward
#4759.
This implements the second approach that I've outlined here:
#4759 (comment).

I think the main advantage of this approach is that it cleans up the API
for the package. `otelcol.ConfigProvider` is a fairly thin wrapper
around `confmap.Resolver`, so I think we could ultimately remove the
interface, and any custom functionality for config merging or
unmarshaling could be exposed to users through settings rather through a
custom implementation.

**Link to tracking Issue:**

Works toward
#4759

**Testing:**

Unit tests

**Documentation:**

Added Godoc comments.
  • Loading branch information
evan-bradley authored Feb 29, 2024
1 parent 62050a2 commit df6f8fd
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 102 deletions.
25 changes: 25 additions & 0 deletions .chloggen/pass-confmap-providers.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otelcol

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `ConfigProviderSettings` to `CollectorSettings`

# One or more tracking issues or pull requests related to the change
issues: [4759]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This allows passing a custom list of `confmap.Provider`s to `otelcol.NewCommand`.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
30 changes: 22 additions & 8 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package otelcol // import "go.opentelemetry.io/collector/otelcol"

import (
"context"
"errors"
"fmt"
"os"
"os/signal"
Expand Down Expand Up @@ -66,10 +65,16 @@ type CollectorSettings struct {
// and manually handle the signals to shutdown the collector.
DisableGracefulShutdown bool

// Deprecated: [v0.95.0] Use ConfigProviderSettings instead.
// ConfigProvider provides the service configuration.
// If the provider watches for configuration change, collector may reload the new configuration upon changes.
ConfigProvider ConfigProvider

// ConfigProviderSettings allows configuring the way the Collector retrieves its configuration
// The Collector will reload based on configuration changes from the ConfigProvider if any
// confmap.Providers watch for configuration changes.
ConfigProviderSettings ConfigProviderSettings

// LoggingOptions provides a way to change behavior of zap logging.
LoggingOptions []zap.Option

Expand All @@ -92,6 +97,8 @@ type CollectorSettings struct {
type Collector struct {
set CollectorSettings

configProvider ConfigProvider

service *service.Service
state *atomic.Int32

Expand All @@ -105,8 +112,14 @@ type Collector struct {

// NewCollector creates and returns a new instance of Collector.
func NewCollector(set CollectorSettings) (*Collector, error) {
if set.ConfigProvider == nil {
return nil, errors.New("invalid nil config provider")
var err error
configProvider := set.ConfigProvider

if configProvider == nil {
configProvider, err = NewConfigProvider(set.ConfigProviderSettings)
if err != nil {
return nil, err
}
}

state := &atomic.Int32{}
Expand All @@ -119,6 +132,7 @@ func NewCollector(set CollectorSettings) (*Collector, error) {
// the number of signals getting notified on is recommended.
signalsChannel: make(chan os.Signal, 3),
asyncErrorChannel: make(chan error),
configProvider: configProvider,
}, nil
}

Expand Down Expand Up @@ -146,7 +160,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {

var conf *confmap.Conf

if cp, ok := col.set.ConfigProvider.(ConfmapProvider); ok {
if cp, ok := col.configProvider.(ConfmapProvider); ok {
var err error
conf, err = cp.GetConfmap(ctx)

Expand All @@ -159,7 +173,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to initialize factories: %w", err)
}
cfg, err := col.set.ConfigProvider.Get(ctx, factories)
cfg, err := col.configProvider.Get(ctx, factories)
if err != nil {
return fmt.Errorf("failed to get config: %w", err)
}
Expand Down Expand Up @@ -215,7 +229,7 @@ func (col *Collector) DryRun(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to initialize factories: %w", err)
}
cfg, err := col.set.ConfigProvider.Get(ctx, factories)
cfg, err := col.configProvider.Get(ctx, factories)
if err != nil {
return fmt.Errorf("failed to get config: %w", err)
}
Expand Down Expand Up @@ -243,7 +257,7 @@ func (col *Collector) Run(ctx context.Context) error {
LOOP:
for {
select {
case err := <-col.set.ConfigProvider.Watch():
case err := <-col.configProvider.Watch():
if err != nil {
col.service.Logger().Error("Config watch failed", zap.Error(err))
break LOOP
Expand Down Expand Up @@ -280,7 +294,7 @@ func (col *Collector) shutdown(ctx context.Context) error {
// Accumulate errors and proceed with shutting down remaining components.
var errs error

if err := col.set.ConfigProvider.Shutdown(ctx); err != nil {
if err := col.configProvider.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown config provider: %w", err))
}

Expand Down
154 changes: 76 additions & 78 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/converter/expandconverter"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/processor/processortest"
)
Expand All @@ -32,13 +31,10 @@ func TestStateString(t *testing.T) {
}

func TestCollectorStartAsGoRoutine(t *testing.T) {
cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}))
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProvider: cfgProvider,
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}),
}
col, err := NewCollector(set)
require.NoError(t, err)
Expand All @@ -56,13 +52,10 @@ func TestCollectorStartAsGoRoutine(t *testing.T) {
}

func TestCollectorCancelContext(t *testing.T) {
cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}))
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProvider: cfgProvider,
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}),
}
col, err := NewCollector(set)
require.NoError(t, err)
Expand Down Expand Up @@ -119,13 +112,10 @@ func TestCollectorStateAfterConfigChange(t *testing.T) {
}

func TestCollectorReportError(t *testing.T) {
cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}))
require.NoError(t, err)

col, err := NewCollector(CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProvider: cfgProvider,
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}),
})
require.NoError(t, err)

Expand Down Expand Up @@ -235,13 +225,10 @@ func TestComponentStatusWatcher(t *testing.T) {
}

func TestCollectorSendSignal(t *testing.T) {
cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}))
require.NoError(t, err)

col, err := NewCollector(CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProvider: cfgProvider,
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}),
})
require.NoError(t, err)

Expand All @@ -266,13 +253,10 @@ func TestCollectorSendSignal(t *testing.T) {
func TestCollectorFailedShutdown(t *testing.T) {
t.Skip("This test was using telemetry shutdown failure, switch to use a component that errors on shutdown.")

cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}))
require.NoError(t, err)

col, err := NewCollector(CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProvider: cfgProvider,
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}),
})
require.NoError(t, err)

Expand All @@ -294,16 +278,49 @@ func TestCollectorFailedShutdown(t *testing.T) {
}

func TestCollectorStartInvalidConfig(t *testing.T) {
cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}))
col, err := NewCollector(CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}),
})
require.NoError(t, err)
assert.Error(t, col.Run(context.Background()))
}

func TestNewCollectorInvalidConfigProviderSettings(t *testing.T) {
_, err := NewCollector(CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: ConfigProviderSettings{},
})
require.Error(t, err)
}

func TestNewCollectorUseConfig(t *testing.T) {
set := newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")})

col, err := NewCollector(CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProvider: cfgProvider,
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: set,
})
require.NoError(t, err)
assert.Error(t, col.Run(context.Background()))
require.NotNil(t, col.configProvider)
}

func TestNewCollectorValidatesResolverSettings(t *testing.T) {
set := ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{filepath.Join("testdata", "otelcol-nop.yaml")},
},
}

_, err := NewCollector(CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: set,
})
require.Error(t, err)
}

func TestCollectorStartWithTraceContextPropagation(t *testing.T) {
Expand All @@ -318,13 +335,10 @@ func TestCollectorStartWithTraceContextPropagation(t *testing.T) {

for _, tt := range tests {
t.Run(tt.file, func(t *testing.T) {
cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", tt.file)}))
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProvider: cfgProvider,
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", tt.file)}),
}

col, err := NewCollector(set)
Expand Down Expand Up @@ -353,13 +367,10 @@ func TestCollectorRun(t *testing.T) {

for _, tt := range tests {
t.Run(tt.file, func(t *testing.T) {
cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", tt.file)}))
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProvider: cfgProvider,
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", tt.file)}),
}
col, err := NewCollector(set)
require.NoError(t, err)
Expand All @@ -374,13 +385,10 @@ func TestCollectorRun(t *testing.T) {
}

func TestCollectorShutdownBeforeRun(t *testing.T) {
cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}))
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProvider: cfgProvider,
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-nop.yaml")}),
}
col, err := NewCollector(set)
require.NoError(t, err)
Expand All @@ -396,14 +404,11 @@ func TestCollectorShutdownBeforeRun(t *testing.T) {
}

func TestCollectorClosedStateOnStartUpError(t *testing.T) {
cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}))
require.NoError(t, err)

// Load a bad config causing startup to fail
set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProvider: cfgProvider,
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}),
}
col, err := NewCollector(set)
require.NoError(t, err)
Expand All @@ -416,14 +421,11 @@ func TestCollectorClosedStateOnStartUpError(t *testing.T) {
}

func TestCollectorDryRun(t *testing.T) {
cfgProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}))
require.NoError(t, err)

// Load a bad config causing startup to fail
set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProvider: cfgProvider,
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}),
}
col, err := NewCollector(set)
require.NoError(t, err)
Expand All @@ -432,19 +434,15 @@ func TestCollectorDryRun(t *testing.T) {
}

func TestPassConfmapToServiceFailure(t *testing.T) {
cfgProvider, err := NewConfigProvider(ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{filepath.Join("testdata", "otelcol-invalid.yaml")},
Providers: makeMapProvidersMap(newFailureProvider()),
Converters: []confmap.Converter{expandconverter.New(confmap.ConverterSettings{})},
},
})
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProvider: cfgProvider,
BuildInfo: component.NewDefaultBuildInfo(),
Factories: nopFactories,
ConfigProviderSettings: ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{filepath.Join("testdata", "otelcol-invalid.yaml")},
Providers: makeMapProvidersMap(newFailureProvider()),
},
},
}
col, err := NewCollector(set)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit df6f8fd

Please sign in to comment.