diff --git a/collector/config/config.go b/collector/config/config.go
index 61aaf3de2..220e1908f 100644
--- a/collector/config/config.go
+++ b/collector/config/config.go
@@ -7,6 +7,7 @@ import (
 	"errors"
 	"fmt"
 	"runtime"
+	"strings"
 	"time"
 
 	"go.opentelemetry.io/ebpf-profiler/internal/linux"
@@ -18,6 +19,30 @@ const (
 	MaxArgMapScaleFactor = 8
 )
 
+// ErrorMode controls how the profiler receiver handles startup errors.
+type ErrorMode string
+
+const (
+	// IgnoreError means startup errors are logged but not returned to the collector.
+	IgnoreError ErrorMode = "ignore"
+	// PropagateError means startup errors are returned to the collector (default).
+	PropagateError ErrorMode = "propagate"
+)
+
+// UnmarshalText satisfies encoding.TextUnmarshaler. The input is matched
+// case-insensitively and *e is set to the canonical lower-case constant.
+func (e *ErrorMode) UnmarshalText(text []byte) error {
+	mode := ErrorMode(strings.ToLower(string(text)))
+	switch mode {
+	case IgnoreError, PropagateError:
+		*e = mode
+		return nil
+	default:
+		// Quote the text as written rather than its lower-cased form.
+		return fmt.Errorf("unknown error mode %q", text)
+	}
+}
+
 // Config is the configuration for the collector.
 type Config struct {
 	ReporterInterval time.Duration `mapstructure:"reporter_interval"`
@@ -40,11 +65,16 @@ type Config struct {
 	NoKernelVersionCheck bool `mapstructure:"no_kernel_version_check"`
 	MaxGRPCRetries uint32 `mapstructure:"max_grpc_retries"`
 	MaxRPCMsgSize int `mapstructure:"max_rpc_msg_size"`
+	ErrorMode ErrorMode `mapstructure:"error_mode"`
 }
 
 // Validate validates the config.
 // This is automatically called by the config parser as it implements the xconfmap.Validator interface.
func (cfg *Config) Validate() error {
+	if cfg.ErrorMode != IgnoreError && cfg.ErrorMode != PropagateError {
+		return fmt.Errorf("unknown error mode %q", cfg.ErrorMode)
+	}
+
 	if cfg.SamplesPerSecond < 1 {
 		return fmt.Errorf("invalid sampling frequency: %d", cfg.SamplesPerSecond)
 	}
diff --git a/collector/config/config_test.go b/collector/config/config_test.go
index 63b27b237..441fcdefa 100644
--- a/collector/config/config_test.go
+++ b/collector/config/config_test.go
@@ -5,16 +5,112 @@ package config // import "go.opentelemetry.io/ebpf-profiler/collector/config"
 
 import (
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/confmap/xconfmap"
 )
 
+// validConfig returns a config for the tests below with ErrorMode left empty.
+// Validate rejects an empty ErrorMode, so callers must set one before validating.
+func validConfig() *Config {
+	return &Config{
+		SamplesPerSecond:       20,
+		ProbabilisticInterval:  1 * time.Minute,
+		ProbabilisticThreshold: 100,
+		NoKernelVersionCheck:   true,
+	}
+}
+
 func TestValidate(t *testing.T) {
 	cfg := &Config{
 		SamplesPerSecond: 0,
+		ErrorMode:        PropagateError,
 	}
 	err := xconfmap.Validate(cfg)
 	require.Error(t, err)
 	require.Equal(t, "invalid sampling frequency: 0", err.Error())
 }
+
+func TestUnmarshalText(t *testing.T) {
+	for _, tt := range []struct {
+		name    string
+		input   string
+		want    ErrorMode
+		wantErr bool
+	}{
+		{
+			name:  "ignore",
+			input: "ignore",
+			want:  IgnoreError,
+		},
+		{
+			name:  "propagate",
+			input: "propagate",
+			want:  PropagateError,
+		},
+		{
+			name:  "case insensitive",
+			input: "IGNORE",
+			want:  IgnoreError,
+		},
+		{
+			name:    "invalid value",
+			input:   "INVALID",
+			wantErr: true,
+		},
+	} {
+		t.Run(tt.name, func(t *testing.T) {
+			var e ErrorMode
+			err := e.UnmarshalText([]byte(tt.input))
+			if tt.wantErr {
+				require.Error(t, err)
+				return
+			}
+			require.NoError(t, err)
+			require.Equal(t, tt.want, e)
+		})
+	}
+}
+
+func TestValidateErrorMode(t *testing.T) {
+	for _, tt := range []struct {
+		name      string
+		errorMode ErrorMode
+		want      ErrorMode
+		wantErr   bool
+	}{
+		{
+			name:      "empty error mode is invalid",
+			errorMode: "",
+			wantErr:   true,
+		},
+		{
+			name:      "ignore is valid",
+			errorMode: IgnoreError,
+			want:      IgnoreError,
+		},
+		{
+			name:      "propagate is valid",
+			errorMode: PropagateError,
+			want:      PropagateError,
+		},
+		{
+			name:      "invalid error mode",
+			errorMode: "INVALID",
+			wantErr:   true,
+		},
+	} {
+		t.Run(tt.name, func(t *testing.T) {
+			cfg := validConfig()
+			cfg.ErrorMode = tt.errorMode
+			err := xconfmap.Validate(cfg)
+			if tt.wantErr {
+				require.ErrorContains(t, err, "unknown error mode")
+				return
+			}
+			require.NoError(t, err)
+			require.Equal(t, tt.want, cfg.ErrorMode)
+		})
+	}
+}
diff --git a/collector/factory.go b/collector/factory.go
index 86c697585..35f5271be 100644
--- a/collector/factory.go
+++ b/collector/factory.go
@@ -40,5 +40,6 @@ func defaultConfig() component.Config {
 		ClockSyncInterval: 3 * time.Minute,
 		MaxGRPCRetries: 5,
 		MaxRPCMsgSize: 32 << 20, // 32 MiB,
+		ErrorMode: config.PropagateError,
 	}
 }
diff --git a/collector/internal/controller.go b/collector/internal/controller.go
index 2cf25bbd9..b76eabee6 100644
--- a/collector/internal/controller.go
+++ b/collector/internal/controller.go
@@ -9,8 +9,9 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer/xconsumer"
 	"go.opentelemetry.io/collector/receiver"
-
+	"go.opentelemetry.io/ebpf-profiler/collector/config"
 	"go.opentelemetry.io/ebpf-profiler/internal/controller"
+	"go.opentelemetry.io/ebpf-profiler/internal/log"
 	"go.opentelemetry.io/ebpf-profiler/metrics"
 	"go.opentelemetry.io/ebpf-profiler/reporter"
 	"go.opentelemetry.io/ebpf-profiler/times"
@@ -22,10 +23,11 @@ const (
 )
 
 // Controller is a bridge between the Collector's [receiverprofiles.Profiles]
-// interface and our [internal.Controller]
+// interface and our [internal.Controller].
type Controller struct {
 	ctlr       *controller.Controller
 	onShutdown func() error
+	errorMode  config.ErrorMode
 }
 
 func NewController(cfg *controller.Config, rs receiver.Settings,
@@ -64,12 +66,26 @@ func NewController(cfg *controller.Config, rs receiver.Settings,
 	return &Controller{
 		onShutdown: cfg.OnShutdown,
 		ctlr:       controller.New(cfg),
+		errorMode:  cfg.ErrorMode,
 	}, nil
 }
 
 // Start starts the receiver.
+// When the error mode is "ignore", a startup failure is logged and nil is
+// returned; in any other mode the error is returned to the caller.
 func (c *Controller) Start(ctx context.Context, _ component.Host) error {
-	return c.ctlr.Start(ctx)
+	if err := c.ctlr.Start(ctx); err != nil {
+		if c.errorMode == config.IgnoreError {
+			// Shut the inner controller down before swallowing the error.
+			// NOTE(review): Controller.Shutdown is outside this hunk; confirm a
+			// later second call to ctlr.Shutdown from there is harmless.
+			c.ctlr.Shutdown()
+			log.Errorf("eBPF profiler receiver failed, continuing without profiling: %v", err)
+			return nil
+		}
+		return err
+	}
+	return nil
 }
 
 // Shutdown stops the receiver.
diff --git a/collector/start_test.go b/collector/start_test.go
new file mode 100644
index 000000000..cd0128905
--- /dev/null
+++ b/collector/start_test.go
@@ -0,0 +1,82 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+//go:build linux && (amd64 || arm64)
+
+package collector
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/consumer/consumertest"
+	"go.opentelemetry.io/collector/consumer/xconsumer"
+	"go.opentelemetry.io/collector/receiver/receivertest"
+
+	"go.opentelemetry.io/ebpf-profiler/collector/config"
+	"go.opentelemetry.io/ebpf-profiler/libpf"
+	"go.opentelemetry.io/ebpf-profiler/reporter"
+	"go.opentelemetry.io/ebpf-profiler/reporter/samples"
+)
+
+// dummyReporter is a reporter stub for testing; its Start always returns an error.
+type dummyReporter struct{}
+
+func (*dummyReporter) Start(context.Context) error {
+	return fmt.Errorf("dummy error")
+}
+
+func (*dummyReporter) Stop() {}
+
+func (*dummyReporter) ReportTraceEvent(*libpf.Trace, *samples.TraceEventMeta) error {
+	return nil
+}
+
+// TestStartErrorMode drives the receiver's Start under each error_mode value.
+// The injected dummyReporter fails in Start, standing in for a startup failure.
+func TestStartErrorMode(t *testing.T) {
+	newReporter := func(*reporter.Config, xconsumer.Profiles) (reporter.Reporter, error) {
+		return &dummyReporter{}, nil
+	}
+
+	cases := []struct {
+		name      string
+		errorMode config.ErrorMode
+		wantErr   bool
+	}{
+		{name: "propagate returns error", errorMode: config.PropagateError, wantErr: true},
+		{name: "ignore returns nil", errorMode: config.IgnoreError, wantErr: false},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			typ, err := component.NewType("test")
+			require.NoError(t, err)
+
+			cfg := defaultConfig().(*config.Config)
+			cfg.NoKernelVersionCheck = true
+			cfg.ErrorMode = tc.errorMode
+
+			// Build the receiver with the always-failing reporter injected.
+			build := BuildProfilesReceiver(WithReporterFactory(newReporter))
+			rcvr, err := build(
+				t.Context(),
+				receivertest.NewNopSettings(typ),
+				cfg,
+				consumertest.NewNop(),
+			)
+			require.NoError(t, err)
+
+			err = rcvr.Start(t.Context(), componenttest.NewNopHost())
+			if tc.wantErr {
+				require.Error(t, err)
+				return
+			}
+			require.NoError(t, err)
+		})
+	}
+}