From c1bb6e1e469db7f9f0af1f43c86aae69ad6596f6 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Mon, 10 Feb 2025 11:28:39 -0300 Subject: [PATCH 1/2] fix(otel): ensure default processors are applied in fbreceiver --- libbeat/beat/info.go | 4 + libbeat/publisher/processing/default.go | 6 +- x-pack/filebeat/fbreceiver/receiver_test.go | 110 ++++++++++++++++++++ 3 files changed, 118 insertions(+), 2 deletions(-) diff --git a/libbeat/beat/info.go b/libbeat/beat/info.go index 7c3b5c0d90f8..ce9670a888a7 100644 --- a/libbeat/beat/info.go +++ b/libbeat/beat/info.go @@ -57,3 +57,7 @@ func (i Info) FQDNAwareHostname(useFQDN bool) string { return i.Hostname } + +func (i Info) IsOTelReceiver() bool { + return i.LogConsumer != nil +} diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index 14f75fde004c..2a7f0f611b08 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -114,9 +114,11 @@ func MakeDefaultSupport( // don't try to "merge" the two lists somehow, if the supportFactory caller requests its own processors, use those // also makes it easier to disable global processors if needed, since they're otherwise hardcoded var rawProcessors processors.PluginConfig + shouldLoadDefaultProcessors := info.IsOTelReceiver() || fleetmode.Enabled() + // don't check the array directly, use HasField, that way processors can easily be bypassed with -E processors=[] - if fleetmode.Enabled() && !beatCfg.HasField("processors") { - log.Debugf("In fleet mode with no processors specified, defaulting to global processors") + if shouldLoadDefaultProcessors && !beatCfg.HasField("processors") { + log.Debugf("In fleet/otel mode with no processors specified, defaulting to global processors") rawProcessors = fleetDefaultProcessors } else { diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index 7da5c24f0adf..6e9195146fa8 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -5,11 +5,15 @@ package fbreceiver import ( + "bufio" "bytes" "context" + "strings" "testing" "time" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" @@ -91,6 +95,112 @@ found: assert.NoError(t, err, "Error shutting down filebeatreceiver") } +func TestReceiverDefaultProcessors(t *testing.T) { + config := Config{ + Beatconfig: map[string]interface{}{ + "filebeat": map[string]interface{}{ + "inputs": []map[string]interface{}{ + { + "type": "benchmark", + "enabled": true, + "message": "test", + "count": 1, + }, + }, + }, + "output": map[string]interface{}{ + "otelconsumer": map[string]interface{}{}, + }, + "logging": map[string]interface{}{ + "level": "debug", + "selectors": []string{ + "*", + }, + }, + "path.home": t.TempDir(), + }, + } + + var zapLogs bytes.Buffer + core := zapcore.NewCore( + zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), + zapcore.AddSync(&zapLogs), + zapcore.DebugLevel) + + receiverSettings := receiver.Settings{} + receiverSettings.Logger = zap.New(core) + + logsCh := make(chan []mapstr.M, 1) + logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { + var logs []mapstr.M + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rl := ld.ResourceLogs().At(i) + for j := 0; j < rl.ScopeLogs().Len(); j++ { + sl := rl.ScopeLogs().At(j) + for k := 0; k < sl.LogRecords().Len(); k++ { + log := sl.LogRecords().At(k) + logs = append(logs, log.Body().Map().AsRaw()) + } + } + } + + logsCh <- logs + return nil + }) + assert.NoError(t, err, "Error creating log consumer") + + r, err := NewFactory().CreateLogs(context.Background(), receiverSettings, &config, logConsumer) + assert.NoErrorf(t, err, "Error creating receiver. Logs:\n %s", zapLogs.String()) + + err = r.Start(context.Background(), nil) + assert.NoError(t, err, "Error starting filebeatreceiver") + defer func() { + require.NoError(t, r.Shutdown(context.Background())) + }() + + var logs []mapstr.M + select { + case logs = <-logsCh: + case <-time.After(1 * time.Minute): + t.Fatal("timeout waiting for logs") + } + + require.Len(t, logs, 1) + t.Log("ingested log: ", logs[0]) + + scanner := bufio.NewScanner(&zapLogs) + wantKeywords := []string{ + "Generated new processors", + "add_host_metadata", + "add_cloud_metadata", + "add_docker_metadata", + "add_kubernetes_metadata", + } + + var processorsLoaded bool + for scanner.Scan() { + line := scanner.Text() + if stringContainsAll(line, wantKeywords) { + processorsLoaded = true + break + } + } + + require.True(t, processorsLoaded, "processors not loaded") + // Check that add_host_metadata works, other processors are not guaranteed to add fields in all environments + require.Contains(t, logs[0].Flatten(), "host.architecture") +} + +func stringContainsAll(s string, want []string) bool { + for _, w := range want { + if !strings.Contains(s, w) { + return false + } + } + + return true +} + func BenchmarkFactory(b *testing.B) { tmpDir := b.TempDir() From 061af6bb50c4cf20182337e8ddeb6a49d3f7f485 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 12 Feb 2025 09:30:14 -0300 Subject: [PATCH 2/2] set default processors in NewBeatReceiver --- libbeat/beat/info.go | 8 ++------ libbeat/cmd/instance/beat.go | 3 ++- libbeat/publisher/processing/default.go | 2 +- x-pack/filebeat/fbreceiver/factory.go | 2 +- x-pack/metricbeat/mbreceiver/factory.go | 2 +- 5 files changed, 7 insertions(+), 10 deletions(-) diff --git a/libbeat/beat/info.go b/libbeat/beat/info.go index ce9670a888a7..5aac06d7b893 100644 --- a/libbeat/beat/info.go +++ b/libbeat/beat/info.go @@ -46,8 +46,8 @@ type Info struct { DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring Namespace *monitoring.Namespace // a monitor namespace that is unique per beat instance } - LogConsumer consumer.Logs // otel log consumer - + LogConsumer consumer.Logs // otel log consumer + UseDefaultProcessors bool // Whether to use the default processors } func (i Info) FQDNAwareHostname(useFQDN bool) string { @@ -57,7 +57,3 @@ func (i Info) FQDNAwareHostname(useFQDN bool) string { return i.Hostname } - -func (i Info) IsOTelReceiver() bool { - return i.LogConsumer != nil -} diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 4884b2e6f027..e16cab2f3311 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -263,7 +263,7 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool, initFuncs []func } // NewBeatReceiver creates a Beat that will be used in the context of an otel receiver -func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, consumer consumer.Logs, core zapcore.Core) (*Beat, error) { +func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, useDefaultProcessors bool, consumer consumer.Logs, core zapcore.Core) (*Beat, error) { b, err := NewBeat(settings.Name, settings.IndexPrefix, settings.Version, @@ -440,6 +440,7 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c return nil, fmt.Errorf("error setting index supporter: %w", err) } + b.Info.UseDefaultProcessors = useDefaultProcessors processingFactory := settings.Processing if processingFactory == nil { processingFactory = processing.MakeDefaultBeatSupport(true) diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index 2a7f0f611b08..2b85b10b3cc2 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -114,7 +114,7 @@ func MakeDefaultSupport( // don't try to "merge" the two lists somehow, if the supportFactory caller requests its own processors, use those // also makes it easier to disable global processors if needed, since they're otherwise hardcoded var rawProcessors processors.PluginConfig - shouldLoadDefaultProcessors := info.IsOTelReceiver() || fleetmode.Enabled() + shouldLoadDefaultProcessors := info.UseDefaultProcessors || fleetmode.Enabled() // don't check the array directly, use HasField, that way processors can easily be bypassed with -E processors=[] if shouldLoadDefaultProcessors && !beatCfg.HasField("processors") { diff --git a/x-pack/filebeat/fbreceiver/factory.go b/x-pack/filebeat/fbreceiver/factory.go index a08c4d575cbb..b3c12afb8979 100644 --- a/x-pack/filebeat/fbreceiver/factory.go +++ b/x-pack/filebeat/fbreceiver/factory.go @@ -45,7 +45,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component. settings.ElasticLicensed = true settings.Initialize = append(settings.Initialize, include.InitializeModule) - b, err := instance.NewBeatReceiver(settings, cfg.Beatconfig, consumer, set.Logger.Core()) + b, err := instance.NewBeatReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core()) if err != nil { return nil, fmt.Errorf("error creating %s: %w", Name, err) } diff --git a/x-pack/metricbeat/mbreceiver/factory.go b/x-pack/metricbeat/mbreceiver/factory.go index 62ea8f5c9b5f..c4e8d3b1dcce 100644 --- a/x-pack/metricbeat/mbreceiver/factory.go +++ b/x-pack/metricbeat/mbreceiver/factory.go @@ -33,7 +33,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component. settings := cmd.MetricbeatSettings(Name) settings.ElasticLicensed = true - b, err := instance.NewBeatReceiver(settings, cfg.Beatconfig, consumer, set.Logger.Core()) + b, err := instance.NewBeatReceiver(settings, cfg.Beatconfig, false, consumer, set.Logger.Core()) if err != nil { return nil, fmt.Errorf("error creating %s: %w", Name, err) }