Skip to content
Merged
27 changes: 27 additions & 0 deletions collector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"runtime"
"strings"
"time"
Comment thread
rushabh-exe marked this conversation as resolved.

"go.opentelemetry.io/ebpf-profiler/internal/linux"
Expand All @@ -18,6 +19,27 @@ const (
MaxArgMapScaleFactor = 8
)

// ErrorMode controls how the profiler receiver handles startup errors.
type ErrorMode string

const (
	// IgnoreError means startup errors are logged but not returned to the collector.
	IgnoreError ErrorMode = "ignore"
	// PropagateError means startup errors are returned to the collector (default).
	PropagateError ErrorMode = "propagate"
)

// UnmarshalText implements encoding.TextUnmarshaler. The text is matched
// case-insensitively against the known modes and the canonical lower-case
// constant is stored in e. Unrecognized input (including empty input) leaves
// e untouched and yields an error quoting the text exactly as it was written.
func (e *ErrorMode) UnmarshalText(text []byte) error {
	switch mode := ErrorMode(strings.ToLower(string(text))); mode {
	case IgnoreError, PropagateError:
		*e = mode
		return nil
	default:
		// Quote the original spelling rather than the lower-cased copy so the
		// message matches what the user actually put in the configuration.
		return fmt.Errorf("unknown error mode %q", text)
	}
}

// Config is the configuration for the collector.
type Config struct {
ReporterInterval time.Duration `mapstructure:"reporter_interval"`
Expand All @@ -40,11 +62,16 @@ type Config struct {
NoKernelVersionCheck bool `mapstructure:"no_kernel_version_check"`
MaxGRPCRetries uint32 `mapstructure:"max_grpc_retries"`
MaxRPCMsgSize int `mapstructure:"max_rpc_msg_size"`
ErrorMode ErrorMode `mapstructure:"error_mode"`
}

// Validate validates the config.
// This is automatically called by the config parser as it implements the xconfmap.Validator interface.
func (cfg *Config) Validate() error {
if cfg.ErrorMode != IgnoreError && cfg.ErrorMode != PropagateError {
return fmt.Errorf("unknown error mode %q", cfg.ErrorMode)
}

if cfg.SamplesPerSecond < 1 {
return fmt.Errorf("invalid sampling frequency: %d", cfg.SamplesPerSecond)
}
Expand Down
95 changes: 95 additions & 0 deletions collector/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,111 @@ package config // import "go.opentelemetry.io/ebpf-profiler/collector/config"

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/confmap/xconfmap"
)

// validConfig returns a config with valid defaults for testing.
//
// ErrorMode is set explicitly because Validate rejects the empty value; tests
// that exercise a specific error mode overwrite the field afterwards.
func validConfig() *Config {
	return &Config{
		SamplesPerSecond:       20,
		ProbabilisticInterval:  1 * time.Minute,
		ProbabilisticThreshold: 100,
		NoKernelVersionCheck:   true,
		ErrorMode:              PropagateError,
	}
}

// TestValidate checks that a zero sampling frequency is rejected with the
// expected message.
func TestValidate(t *testing.T) {
	// Only the sampling frequency is invalid here; the error mode is a
	// recognized value, so it cannot be what trips validation.
	var invalid Config
	invalid.ErrorMode = PropagateError
	invalid.SamplesPerSecond = 0

	validationErr := xconfmap.Validate(&invalid)
	require.Error(t, validationErr)
	require.Equal(t, "invalid sampling frequency: 0", validationErr.Error())
}

// TestUnmarshalText checks that ErrorMode.UnmarshalText accepts the known
// modes regardless of letter case and rejects any other input.
func TestUnmarshalText(t *testing.T) {
	type testCase struct {
		name    string
		input   string
		want    ErrorMode
		wantErr bool
	}

	cases := []testCase{
		{name: "ignore", input: "ignore", want: IgnoreError},
		{name: "propagate", input: "propagate", want: PropagateError},
		{name: "case insensitive", input: "IGNORE", want: IgnoreError},
		{name: "invalid value", input: "INVALID", wantErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			var mode ErrorMode
			err := mode.UnmarshalText([]byte(tc.input))
			if !tc.wantErr {
				require.NoError(t, err)
				require.Equal(t, tc.want, mode)
				return
			}
			require.Error(t, err)
		})
	}
}

// TestValidateErrorMode checks that config validation accepts the two known
// error modes and rejects both an empty and an unrecognized one.
func TestValidateErrorMode(t *testing.T) {
	type testCase struct {
		name      string
		errorMode ErrorMode
		want      ErrorMode
		wantErr   bool
	}

	cases := []testCase{
		{name: "empty error mode is invalid", errorMode: "", wantErr: true},
		{name: "ignore is valid", errorMode: IgnoreError, want: IgnoreError},
		{name: "propagate is valid", errorMode: PropagateError, want: PropagateError},
		{name: "invalid error mode", errorMode: "INVALID", wantErr: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			candidate := validConfig()
			candidate.ErrorMode = tc.errorMode

			validationErr := xconfmap.Validate(candidate)
			if !tc.wantErr {
				require.NoError(t, validationErr)
				require.Equal(t, tc.want, candidate.ErrorMode)
				return
			}
			require.Error(t, validationErr)
		})
	}
}
1 change: 1 addition & 0 deletions collector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ func defaultConfig() component.Config {
ClockSyncInterval: 3 * time.Minute,
MaxGRPCRetries: 5,
MaxRPCMsgSize: 32 << 20, // 32 MiB,
ErrorMode: config.PropagateError,
}
}
17 changes: 14 additions & 3 deletions collector/internal/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/receiver"

"go.opentelemetry.io/ebpf-profiler/collector/config"
"go.opentelemetry.io/ebpf-profiler/internal/controller"
"go.opentelemetry.io/ebpf-profiler/internal/log"
"go.opentelemetry.io/ebpf-profiler/metrics"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/times"
Expand All @@ -22,10 +23,11 @@ const (
)

// Controller is a bridge between the Collector's [receiverprofiles.Profiles]
// interface and our [internal.Controller].
type Controller struct {
	// ctlr is the wrapped profiler controller that Start delegates to.
	ctlr *controller.Controller
	// onShutdown is copied from the controller config's OnShutdown field.
	// NOTE(review): presumably invoked from Shutdown, which is not visible
	// here — confirm.
	onShutdown func() error
	// errorMode decides whether Start returns a startup failure to the
	// caller or logs it and reports success.
	errorMode config.ErrorMode
}

func NewController(cfg *controller.Config, rs receiver.Settings,
Expand Down Expand Up @@ -64,12 +66,21 @@ func NewController(cfg *controller.Config, rs receiver.Settings,
return &Controller{
onShutdown: cfg.OnShutdown,
ctlr: controller.New(cfg),
errorMode: cfg.ErrorMode,
}, nil
}

// Start starts the receiver.
func (c *Controller) Start(ctx context.Context, _ component.Host) error {
return c.ctlr.Start(ctx)
if err := c.ctlr.Start(ctx); err != nil {
if c.errorMode == config.IgnoreError {
c.ctlr.Shutdown()
log.Errorf("eBPF profiler receiver failed, continuing without profiling: %v", err)
return nil
}
Comment thread
rushabh-exe marked this conversation as resolved.
return err
}
return nil
}

// Shutdown stops the receiver.
Expand Down
82 changes: 82 additions & 0 deletions collector/start_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build linux && (amd64 || arm64)

package collector

import (
"context"
Comment thread
rushabh-exe marked this conversation as resolved.
"fmt"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/receiver/receivertest"

"go.opentelemetry.io/ebpf-profiler/collector/config"
"go.opentelemetry.io/ebpf-profiler/libpf"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/reporter/samples"
)

// dummyReporter is a reporter stub for testing: Start always fails, while
// Stop and ReportTraceEvent do nothing.
type dummyReporter struct{}

// Start always returns an error.
func (*dummyReporter) Start(context.Context) error {
	return fmt.Errorf("dummy error")
}

// Stop does nothing.
func (*dummyReporter) Stop() {}

// ReportTraceEvent discards its arguments and reports success.
func (*dummyReporter) ReportTraceEvent(*libpf.Trace, *samples.TraceEventMeta) error {
	return nil
}

// TestStartErrorMode verifies how the receiver's Start reacts to a startup
// failure under each error_mode setting. The failure is simulated with
// dummyReporter, whose Start always returns an error.
func TestStartErrorMode(t *testing.T) {
	failingReporter := func(_ *reporter.Config, _ xconsumer.Profiles) (reporter.Reporter, error) {
		return &dummyReporter{}, nil
	}

	type testCase struct {
		name      string
		errorMode config.ErrorMode
		wantErr   bool
	}

	cases := []testCase{
		{name: "propagate returns error", errorMode: config.PropagateError, wantErr: true},
		{name: "ignore returns nil", errorMode: config.IgnoreError, wantErr: false},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			cfg := defaultConfig().(*config.Config)
			cfg.ErrorMode = tc.errorMode
			cfg.NoKernelVersionCheck = true

			receiverType, err := component.NewType("test")
			require.NoError(t, err)

			build := BuildProfilesReceiver(WithReporterFactory(failingReporter))
			recv, err := build(
				t.Context(),
				receivertest.NewNopSettings(receiverType),
				cfg,
				consumertest.NewNop(),
			)
			require.NoError(t, err)

			startErr := recv.Start(t.Context(), componenttest.NewNopHost())
			if tc.wantErr {
				require.Error(t, startErr)
				return
			}
			require.NoError(t, startErr)
		})
	}
}
Loading