From 59d3e2e76e25052ee0f2d46d78c19c4af090f22f Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Mon, 9 Jun 2025 15:02:57 +0530 Subject: [PATCH 01/32] add otel test binary --- .../tests/integration/collector_test.go | 81 +++++++ x-pack/libbeat/tests/integration/otel_test.go | 204 ++++++++++++++++++ 2 files changed, 285 insertions(+) create mode 100644 x-pack/libbeat/tests/integration/collector_test.go create mode 100644 x-pack/libbeat/tests/integration/otel_test.go diff --git a/x-pack/libbeat/tests/integration/collector_test.go b/x-pack/libbeat/tests/integration/collector_test.go new file mode 100644 index 000000000000..c2c70b9d7573 --- /dev/null +++ b/x-pack/libbeat/tests/integration/collector_test.go @@ -0,0 +1,81 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration && !agentbeat + +package integration + +import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/exporter/debugexporter" + "go.opentelemetry.io/collector/otelcol" + + "github.com/elastic/beats/v7/libbeat/otelbeat/beatconverter" + "github.com/elastic/beats/v7/libbeat/otelbeat/providers/fbprovider" + "github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver" +) + +var schemeMap = map[string]string{ + "filebeat": "fb", +} + +// NewTestCollector configures and returns an otel collector intended for testing only +func NewTestCollector(beatname string, configPath string) (*otelcol.Collector, error) { + // adds scheme name as prefix + beatCfg := schemeMap[beatname] + ":" + configPath + + set := getCollectorSettings(beatCfg) + return otelcol.NewCollector(set) +} + +// Component initializes collector components +func getComponent() (otelcol.Factories, error) { + receivers, err := otelcol.MakeFactoryMap( + fbreceiver.NewFactory(), + ) + if err != nil { + return otelcol.Factories{}, nil //nolint:nilerr //ignoring this error + } + + exporters, err := otelcol.MakeFactoryMap( + debugexporter.NewFactory(), + elasticsearchexporter.NewFactory(), + ) + if err != nil { + return otelcol.Factories{}, nil //nolint:nilerr //ignoring this error + } + + return otelcol.Factories{ + Receivers: receivers, + Exporters: exporters, + }, nil + +} + +func getCollectorSettings(filename string) otelcol.CollectorSettings { + // initialize collector settings + info := component.BuildInfo{ + Command: "otel-test", + Description: "Beats OTel", + Version: "9.0.0", + } + + return otelcol.CollectorSettings{ + BuildInfo: info, + Factories: getComponent, + ConfigProviderSettings: otelcol.ConfigProviderSettings{ + ResolverSettings: confmap.ResolverSettings{ + URIs: []string{filename}, + ProviderFactories: []confmap.ProviderFactory{ + fbprovider.NewFactory(), + }, + ConverterFactories: []confmap.ConverterFactory{ + beatconverter.NewFactory(), + }, + }, + }, + } +} diff --git a/x-pack/libbeat/tests/integration/otel_test.go b/x-pack/libbeat/tests/integration/otel_test.go new file mode 100644 index 000000000000..a8007ac2f598 --- /dev/null +++ b/x-pack/libbeat/tests/integration/otel_test.go @@ -0,0 +1,204 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration && !agentbeat + +package integration + +import ( + "context" + "crypto/tls" + "fmt" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/testing/estools" + "github.com/elastic/go-elasticsearch/v8" +) + +var beatsCfgFile = ` +filebeat.inputs: + - type: filestream + id: filestream-input-id + enabled: true + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false + paths: + - %s +output: + elasticsearch: + hosts: + - localhost:9200 + protocol: http + username: admin + password: testing + index: %s +queue.mem.flush.timeout: 0s +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +http.enabled: true +http.host: localhost +http.port: %d +` + +func TestFilebeatOTelE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + numEvents := 1 + + tempDir := t.TempDir() + logFilePath := filepath.Join(tempDir, "log.log") + writeEventsToLogFile(t, logFilePath, numEvents) + + cfgFile := filepath.Join(tempDir, "filebeat-otel.yml") + if err := os.WriteFile(cfgFile, []byte(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", 5066)), 0o644); err != nil { + t.Fatalf("cannot create config file '%s': %s", cfgFile, err) + } + + // Get collector with given config + col, err := NewTestCollector("filebeat", cfgFile) + require.NoError(t, err, fmt.Sprintf("could not get new collector due to %v", err)) + + // start collector + go func() { + col.Run(t.Context()) + }() + + t.Cleanup(func() { + col.Shutdown() + if t.Failed() { + fmt.Println("shutting down") + + } + }) + + // start filebeat + filebeat := integration.NewBeat( + t, + "filebeat", + "../../../filebeat/filebeat.test", + ) + logFilePath = filepath.Join(filebeat.TempDir(), "log.log") + writeEventsToLogFile(t, logFilePath, numEvents) + s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", 5067) + s = s + ` +setup.template.name: logs-filebeat-default +setup.template.pattern: logs-filebeat-default +` + + filebeat.WriteConfigFile(s) + filebeat.Start() + + // prepare to query ES + esCfg := elasticsearch.Config{ + Addresses: []string{"http://localhost:9200"}, + Username: "admin", + Password: "testing", + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec // this is only for testing + }, + }, + } + es, err := elasticsearch.NewClient(esCfg) + require.NoError(t, err) + + var filebeatDocs estools.Documents + var otelDocs estools.Documents + // wait for logs to be published + require.Eventually(t, + func() bool { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*") + require.NoError(t, err) + + filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-filebeat-default*") + require.NoError(t, err) + + return otelDocs.Hits.Total.Value >= numEvents && filebeatDocs.Hits.Total.Value >= numEvents + }, + 2*time.Minute, 1*time.Second, fmt.Sprintf("Number of hits %d not equal to number of events for %d", filebeatDocs.Hits.Total.Value, numEvents)) + + filebeatDoc := filebeatDocs.Hits.Hits[0].Source + otelDoc := otelDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // Expected to change between agentDocs and OtelDocs + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "log.file.inode", + "log.file.path", + "container.id", + } + + assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") + // assertMonitoring(t) +} + +func writeEventsToLogFile(t *testing.T, filename string, numEvents int) { + t.Helper() + logFile, err := os.Create(filename) + if err != nil { + t.Fatalf("could not create file '%s': %s", filename, err) + } + // write events to log file + for i := 0; i < numEvents; i++ { + msg := fmt.Sprintf("Line %d", i) + _, err = logFile.Write([]byte(msg + "\n")) + require.NoErrorf(t, err, "failed to write line %d to temp file", i) + } + + if err := logFile.Sync(); err != nil { + t.Fatalf("could not sync log file '%s': %s", filename, err) + } + if err := logFile.Close(); err != nil { + t.Fatalf("could not close log file '%s': %s", filename, err) + } +} + +func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) { + t.Helper() + + flatM1 := m1.Flatten() + flatM2 := m2.Flatten() + for _, f := range ignoredFields { + hasKeyM1, _ := flatM1.HasKey(f) + hasKeyM2, _ := flatM2.HasKey(f) + + if !hasKeyM1 && !hasKeyM2 { + assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f) + } + + flatM1.Delete(f) + flatM2.Delete(f) + } + require.Equal(t, "", cmp.Diff(flatM1, flatM2), "expected maps to be equal") +} + +func assertMonitoring(t *testing.T) { + r, err := http.Get("http://localhost:5066") //nolint:noctx,bodyclose // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") + + r, err = http.Get("http://localhost:5066/stats") //nolint:noctx,bodyclose // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") + + r, err = http.Get("http://localhost:5066/not-exist") //nolint:noctx,bodyclose // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") +} From bb6ef5e4b969d9c52d2a597f67573d3353a98e26 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Mon, 9 Jun 2025 15:47:27 +0530 Subject: [PATCH 02/32] improve collector framework --- libbeat/tests/integration/framework.go | 6 +- .../tests/integration/collector_test.go | 86 ++++++++++++++++++- x-pack/libbeat/tests/integration/otel_test.go | 37 +++----- 3 files changed, 96 insertions(+), 33 deletions(-) diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 86c3b868e9c9..b062a14b763f 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -103,7 +103,7 @@ type Total struct { // `args` will be passed as CLI arguments to the Beat func NewBeat(t *testing.T, beatName, binary string, args ...string) *BeatProc { require.FileExistsf(t, binary, "beat binary must exists") - tempDir := createTempDir(t) + tempDir := CreateTempDir(t) configFile := filepath.Join(tempDir, beatName+".yml") stdoutFile, err := os.Create(filepath.Join(tempDir, "stdout")) @@ -142,7 +142,7 @@ func NewBeat(t *testing.T, beatName, binary string, args ...string) *BeatProc { // See `NewBeat` for options and information for the parameters. func NewAgentBeat(t *testing.T, beatName, binary string, args ...string) *BeatProc { require.FileExistsf(t, binary, "agentbeat binary must exists") - tempDir := createTempDir(t) + tempDir := CreateTempDir(t) configFile := filepath.Join(tempDir, beatName+".yml") stdoutFile, err := os.Create(filepath.Join(tempDir, "stdout")) @@ -648,7 +648,7 @@ func (b *BeatProc) openEventLogFile() *os.File { // // If the tests are run with -v, the temporary directory will // be logged. -func createTempDir(t *testing.T) string { +func CreateTempDir(t *testing.T) string { rootDir, err := filepath.Abs("../../build/integration-tests") if err != nil { t.Fatalf("failed to determine absolute path for temp dir: %s", err) diff --git a/x-pack/libbeat/tests/integration/collector_test.go b/x-pack/libbeat/tests/integration/collector_test.go index c2c70b9d7573..f46437364b81 100644 --- a/x-pack/libbeat/tests/integration/collector_test.go +++ b/x-pack/libbeat/tests/integration/collector_test.go @@ -7,6 +7,11 @@ package integration import ( + "os" + "path/filepath" + "sync" + "testing" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" @@ -15,6 +20,7 @@ import ( "github.com/elastic/beats/v7/libbeat/otelbeat/beatconverter" "github.com/elastic/beats/v7/libbeat/otelbeat/providers/fbprovider" + "github.com/elastic/beats/v7/libbeat/tests/integration" "github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver" ) @@ -22,13 +28,85 @@ var schemeMap = map[string]string{ "filebeat": "fb", } +type TestCollector struct { + t *testing.T + tempDir string + otelcol *otelcol.Collector + wg sync.WaitGroup +} + // NewTestCollector configures and returns an otel collector intended for testing only -func NewTestCollector(beatname string, configPath string) (*otelcol.Collector, error) { - // adds scheme name as prefix - beatCfg := schemeMap[beatname] + ":" + configPath +// It accepts beatname and configuration +func NewTestCollector(t *testing.T, beatname string, config string) (*TestCollector, error) { + // create a temp dir + tempDir := integration.CreateTempDir(t) + // stdoutFile, err := os.Create(filepath.Join(tempDir, "stdout")) + // require.NoError(t, err, "error creating stdout file") + // stderrFile, err := os.Create(filepath.Join(tempDir, "stderr")) + // require.NoError(t, err, "error creating stderr file") + + // create a config file + configFile := filepath.Join(tempDir, beatname+".yml") + // write configuration to a file + if err := os.WriteFile(configFile, []byte(config), 0o644); err != nil { + t.Fatalf("cannot create config file '%s': %s", configFile, err) + } + // adds scheme name as prefix to the configfile + beatCfg := schemeMap[beatname] + ":" + configFile + // get collector settings set := getCollectorSettings(beatCfg) - return otelcol.NewCollector(set) + // get new collector instance + otelcol, err := otelcol.NewCollector(set) + + return &TestCollector{ + t: t, + tempDir: tempDir, + otelcol: otelcol, + wg: sync.WaitGroup{}, + }, err +} + +// NewTestCollector configures and returns an otel collector intended for testing only +// It accepts beatname and configuration +func NewTestStartCollector(t *testing.T, beatname string, config string) (*TestCollector, error) { + + otelcol, err := NewTestCollector(t, beatname, config) + if err != nil { + return nil, err + } + err = otelcol.Run() + if err != nil { + return nil, err + } + + t.Cleanup(func() { + otelcol.Shutdown() + if !t.Failed() { + return + } + }) + + return otelcol, err +} + +func (c *TestCollector) GetTempDir() string { + return c.tempDir +} + +func (c *TestCollector) Run() error { + wg := sync.WaitGroup{} + var err error + go func() { + wg.Add(1) + defer wg.Done() + err = c.otelcol.Run(c.t.Context()) + + }() + return err +} +func (c *TestCollector) Shutdown() { + c.otelcol.Shutdown() } // Component initializes collector components diff --git a/x-pack/libbeat/tests/integration/otel_test.go b/x-pack/libbeat/tests/integration/otel_test.go index a8007ac2f598..19e19f03a1cf 100644 --- a/x-pack/libbeat/tests/integration/otel_test.go +++ b/x-pack/libbeat/tests/integration/otel_test.go @@ -8,7 +8,6 @@ package integration import ( "context" - "crypto/tls" "fmt" "net/http" "os" @@ -23,7 +22,6 @@ import ( "github.com/elastic/beats/v7/libbeat/tests/integration" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" - "github.com/elastic/go-elasticsearch/v8" ) var beatsCfgFile = ` @@ -58,28 +56,24 @@ func TestFilebeatOTelE2E(t *testing.T) { integration.EnsureESIsRunning(t) numEvents := 1 - tempDir := t.TempDir() - logFilePath := filepath.Join(tempDir, "log.log") - writeEventsToLogFile(t, logFilePath, numEvents) - - cfgFile := filepath.Join(tempDir, "filebeat-otel.yml") - if err := os.WriteFile(cfgFile, []byte(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", 5066)), 0o644); err != nil { - t.Fatalf("cannot create config file '%s': %s", cfgFile, err) - } - // Get collector with given config - col, err := NewTestCollector("filebeat", cfgFile) + col, err := NewTestCollector(t, "filebeat", beatsCfgFile) require.NoError(t, err, fmt.Sprintf("could not get new collector due to %v", err)) + logFilePath := filepath.Join(col.GetTempDir(), "log.log") + writeEventsToLogFile(t, logFilePath, numEvents) + // start collector go func() { - col.Run(t.Context()) + err := col.Run() + if err != nil { + t.Logf("could not start collector") + } }() t.Cleanup(func() { col.Shutdown() if t.Failed() { - fmt.Println("shutting down") } }) @@ -101,19 +95,10 @@ setup.template.pattern: logs-filebeat-default filebeat.WriteConfigFile(s) filebeat.Start() - // prepare to query ES - esCfg := elasticsearch.Config{ - Addresses: []string{"http://localhost:9200"}, - Username: "admin", - Password: "testing", - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, //nolint:gosec // this is only for testing - }, - }, + es, err := integration.GetESClient(t) + if err != nil { + t.Fatalf("could not get es client due to: %v", err) } - es, err := elasticsearch.NewClient(esCfg) - require.NoError(t, err) var filebeatDocs estools.Documents var otelDocs estools.Documents From 188bd7ccb32729c42d5feb032e9b94143b11a19e Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Mon, 9 Jun 2025 16:28:24 +0530 Subject: [PATCH 03/32] get es client and framework improvement --- libbeat/tests/integration/framework.go | 27 +++++++++++++ .../tests/integration/collector_test.go | 39 ++++++++++++++----- x-pack/libbeat/tests/integration/otel_test.go | 20 ++++------ 3 files changed, 64 insertions(+), 22 deletions(-) diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index b062a14b763f..f4d85d10f28b 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -23,6 +23,7 @@ import ( "bufio" "bytes" "context" + "crypto/tls" "encoding/json" "errors" "fmt" @@ -42,6 +43,7 @@ import ( "testing" "time" + "github.com/elastic/go-elasticsearch/v8" "github.com/gofrs/uuid/v5" "github.com/stretchr/testify/require" ) @@ -716,6 +718,31 @@ func EnsureESIsRunning(t *testing.T) { } } +func GetESClient(t *testing.T) (*elasticsearch.Client, error) { + esURL := GetESURL(t, "http") + + u := esURL.User.Username() + p, _ := esURL.User.Password() + + // prepare to query ES + esCfg := elasticsearch.Config{ + Addresses: []string{esURL.String()}, + Username: u, + Password: p, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec // this is only for testing + }, + }, + } + es, err := elasticsearch.NewClient(esCfg) + if err != nil { + return nil, err + } + return es, nil + +} + func (b *BeatProc) FileContains(filename string, match string) string { file, err := os.Open(filename) require.NoErrorf(b.t, err, "error opening: %s", filename) diff --git a/x-pack/libbeat/tests/integration/collector_test.go b/x-pack/libbeat/tests/integration/collector_test.go index f46437364b81..76040eabbf7b 100644 --- a/x-pack/libbeat/tests/integration/collector_test.go +++ b/x-pack/libbeat/tests/integration/collector_test.go @@ -2,8 +2,6 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build integration && !agentbeat - package integration import ( @@ -29,10 +27,12 @@ var schemeMap = map[string]string{ } type TestCollector struct { - t *testing.T - tempDir string - otelcol *otelcol.Collector - wg sync.WaitGroup + t *testing.T + tempDir string + otelcol *otelcol.Collector + wg sync.WaitGroup + configFile string + beatname string } // NewTestCollector configures and returns an otel collector intended for testing only @@ -60,10 +60,12 @@ func NewTestCollector(t *testing.T, beatname string, config string) (*TestCollec otelcol, err := otelcol.NewCollector(set) return &TestCollector{ - t: t, - tempDir: tempDir, - otelcol: otelcol, - wg: sync.WaitGroup{}, + t: t, + tempDir: tempDir, + otelcol: otelcol, + wg: sync.WaitGroup{}, + configFile: configFile, + beatname: beatname, }, err } @@ -90,6 +92,23 @@ func NewTestStartCollector(t *testing.T, beatname string, config string) (*TestC return otelcol, err } +func (c *TestCollector) ReloadConfig(config string) error { + c.Shutdown() + // write configuration to a file + if err := os.WriteFile(c.configFile, []byte(config), 0o644); err != nil { + c.t.Fatalf("cannot create config file '%s': %s", c.configFile, err) + } + // adds scheme name as prefix to the configfile + beatCfg := schemeMap[c.beatname] + ":" + c.configFile + // get collector settings + set := getCollectorSettings(beatCfg) + // get new collector instance + otelcol, _ := otelcol.NewCollector(set) + c.otelcol = otelcol + return c.Run() + +} + func (c *TestCollector) GetTempDir() string { return c.tempDir } diff --git a/x-pack/libbeat/tests/integration/otel_test.go b/x-pack/libbeat/tests/integration/otel_test.go index 19e19f03a1cf..04b9da6c504c 100644 --- a/x-pack/libbeat/tests/integration/otel_test.go +++ b/x-pack/libbeat/tests/integration/otel_test.go @@ -57,25 +57,21 @@ func TestFilebeatOTelE2E(t *testing.T) { numEvents := 1 // Get collector with given config - col, err := NewTestCollector(t, "filebeat", beatsCfgFile) + col, err := NewTestCollector(t, "filebeat", "") require.NoError(t, err, fmt.Sprintf("could not get new collector due to %v", err)) + // start collector logFilePath := filepath.Join(col.GetTempDir(), "log.log") writeEventsToLogFile(t, logFilePath, numEvents) - // start collector - go func() { - err := col.Run() - if err != nil { - t.Logf("could not start collector") - } - }() + col.ReloadConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", 5066)) + err = col.Run() + if err != nil { + t.Logf("could not start collector") + } t.Cleanup(func() { col.Shutdown() - if t.Failed() { - - } }) // start filebeat @@ -131,7 +127,7 @@ setup.template.pattern: logs-filebeat-default } assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") - // assertMonitoring(t) + assertMonitoring(t) } func writeEventsToLogFile(t *testing.T, filename string, numEvents int) { From 6e3780a55df5c112cf88cf7f235a012d84a3c767 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Mon, 9 Jun 2025 17:28:12 +0530 Subject: [PATCH 04/32] otel e2e test works --- .../libbeat/tests/integration/collector_test.go | 7 +++---- x-pack/libbeat/tests/integration/otel_test.go | 16 +++++++++------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/x-pack/libbeat/tests/integration/collector_test.go b/x-pack/libbeat/tests/integration/collector_test.go index 76040eabbf7b..9c990438f4c3 100644 --- a/x-pack/libbeat/tests/integration/collector_test.go +++ b/x-pack/libbeat/tests/integration/collector_test.go @@ -69,10 +69,9 @@ func NewTestCollector(t *testing.T, beatname string, config string) (*TestCollec }, err } -// NewTestCollector configures and returns an otel collector intended for testing only +// NewTestStartCollector configures and returns an otel collector intended for testing only // It accepts beatname and configuration func NewTestStartCollector(t *testing.T, beatname string, config string) (*TestCollector, error) { - otelcol, err := NewTestCollector(t, beatname, config) if err != nil { return nil, err @@ -93,6 +92,7 @@ func NewTestStartCollector(t *testing.T, beatname string, config string) (*TestC } func (c *TestCollector) ReloadConfig(config string) error { + // If collector is started, shut it down then reload c.Shutdown() // write configuration to a file if err := os.WriteFile(c.configFile, []byte(config), 0o644); err != nil { @@ -116,11 +116,10 @@ func (c *TestCollector) GetTempDir() string { func (c *TestCollector) Run() error { wg := sync.WaitGroup{} var err error + wg.Add(1) go func() { - wg.Add(1) defer wg.Done() err = c.otelcol.Run(c.t.Context()) - }() return err } diff --git a/x-pack/libbeat/tests/integration/otel_test.go b/x-pack/libbeat/tests/integration/otel_test.go index 04b9da6c504c..a59ce0c9c49c 100644 --- a/x-pack/libbeat/tests/integration/otel_test.go +++ b/x-pack/libbeat/tests/integration/otel_test.go @@ -47,6 +47,7 @@ processors: - add_cloud_metadata: ~ - add_docker_metadata: ~ - add_kubernetes_metadata: ~ +path.home: %s http.enabled: true http.host: localhost http.port: %d @@ -56,7 +57,7 @@ func TestFilebeatOTelE2E(t *testing.T) { integration.EnsureESIsRunning(t) numEvents := 1 - // Get collector with given config + // Get collector with empty config col, err := NewTestCollector(t, "filebeat", "") require.NoError(t, err, fmt.Sprintf("could not get new collector due to %v", err)) @@ -64,11 +65,8 @@ func TestFilebeatOTelE2E(t *testing.T) { logFilePath := filepath.Join(col.GetTempDir(), "log.log") writeEventsToLogFile(t, logFilePath, numEvents) - col.ReloadConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", 5066)) - err = col.Run() - if err != nil { - t.Logf("could not start collector") - } + err := col.ReloadConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", col.GetTempDir(), 5066)) + require.NoError(t, err) t.Cleanup(func() { col.Shutdown() @@ -91,6 +89,10 @@ setup.template.pattern: logs-filebeat-default filebeat.WriteConfigFile(s) filebeat.Start() + t.Cleanup(func() { + filebeat.Stop() + }) + es, err := integration.GetESClient(t) if err != nil { t.Fatalf("could not get es client due to: %v", err) @@ -101,7 +103,7 @@ setup.template.pattern: logs-filebeat-default // wait for logs to be published require.Eventually(t, func() bool { - findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) defer findCancel() otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*") From b897c9d4b091090d572748afbc6af90210294bcf Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 12 Jun 2025 12:17:13 +0530 Subject: [PATCH 05/32] address review comments --- libbeat/tests/integration/framework.go | 7 +- x-pack/libbeat/common/otelbeat/otel.go | 4 +- .../tests/integration/collector_test.go | 146 +++++++++--------- x-pack/libbeat/tests/integration/otel_test.go | 11 +- 4 files changed, 86 insertions(+), 82 deletions(-) diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index f4d85d10f28b..7e317a4c33cd 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -651,6 +651,7 @@ func (b *BeatProc) openEventLogFile() *os.File { // If the tests are run with -v, the temporary directory will // be logged. func CreateTempDir(t *testing.T) string { + t.Helper() rootDir, err := filepath.Abs("../../build/integration-tests") if err != nil { t.Fatalf("failed to determine absolute path for temp dir: %s", err) @@ -667,7 +668,7 @@ func CreateTempDir(t *testing.T) string { cleanup := func() { if !t.Failed() { if err := os.RemoveAll(tempDir); err != nil { - // Ungly workaround Windows limitations + // Ugly workaround Windows limitations // Windows does not support the Interrup signal, so it might // happen that Filebeat is still running, keeping it's registry // file open, thus preventing the temporary folder from being @@ -692,6 +693,7 @@ func CreateTempDir(t *testing.T) string { // using the default test credentials or the corresponding environment // variables. func EnsureESIsRunning(t *testing.T) { + t.Helper() esURL := GetESURL(t, "http") ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(500*time.Second)) @@ -719,6 +721,7 @@ func EnsureESIsRunning(t *testing.T) { } func GetESClient(t *testing.T) (*elasticsearch.Client, error) { + t.Helper() esURL := GetESURL(t, "http") u := esURL.User.Username() @@ -932,6 +935,7 @@ func FormatRefreshURL(t *testing.T, srcURL url.URL) url.URL { } func FormatDataStreamSearchURL(t *testing.T, srcURL url.URL, dataStream string) (url.URL, error) { + t.Helper() t.Helper() path, err := url.JoinPath("/", dataStream, "_search") if err != nil { @@ -994,6 +998,7 @@ func reportErrors(t *testing.T, tempDir string, beatName string) { // GenerateLogFile writes count lines to path, each line is 50 bytes. // Each line contains the current time (RFC3339) and a counter func GenerateLogFile(t *testing.T, path string, count int, append bool) { + t.Helper() var file *os.File var err error if !append { diff --git a/x-pack/libbeat/common/otelbeat/otel.go b/x-pack/libbeat/common/otelbeat/otel.go index f338584cde9a..153ae8319258 100644 --- a/x-pack/libbeat/common/otelbeat/otel.go +++ b/x-pack/libbeat/common/otelbeat/otel.go @@ -53,7 +53,7 @@ func OTelCmd(beatname string) *cobra.Command { } // Component initializes collector components -func getComponent() (otelcol.Factories, error) { +func GetComponent() (otelcol.Factories, error) { receivers, err := otelcol.MakeFactoryMap( fbreceiver.NewFactory(), ) @@ -86,7 +86,7 @@ func getCollectorSettings(filename string) otelcol.CollectorSettings { return otelcol.CollectorSettings{ BuildInfo: info, - Factories: getComponent, + Factories: GetComponent, ConfigProviderSettings: otelcol.ConfigProviderSettings{ ResolverSettings: confmap.ResolverSettings{ URIs: []string{filename}, diff --git a/x-pack/libbeat/tests/integration/collector_test.go b/x-pack/libbeat/tests/integration/collector_test.go index 9c990438f4c3..319ea459f994 100644 --- a/x-pack/libbeat/tests/integration/collector_test.go +++ b/x-pack/libbeat/tests/integration/collector_test.go @@ -2,24 +2,29 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. +//go:build integration && !agentbeat + package integration import ( + "fmt" "os" "path/filepath" "sync" "testing" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" - "go.opentelemetry.io/collector/exporter/debugexporter" "go.opentelemetry.io/collector/otelcol" "github.com/elastic/beats/v7/libbeat/otelbeat/beatconverter" "github.com/elastic/beats/v7/libbeat/otelbeat/providers/fbprovider" "github.com/elastic/beats/v7/libbeat/tests/integration" - "github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver" + "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" + "go.uber.org/zap/zaptest/observer" ) var schemeMap = map[string]string{ @@ -27,50 +32,37 @@ var schemeMap = map[string]string{ } type TestCollector struct { - t *testing.T - tempDir string - otelcol *otelcol.Collector - wg sync.WaitGroup - configFile string - beatname string + t *testing.T + tempDir string + otelcol *otelcol.Collector + wg sync.WaitGroup + configFile string + beatname string + observedLogs *observer.ObservedLogs } -// NewTestCollector configures and returns an otel collector intended for testing only -// It accepts beatname and configuration +// NewTestCollector configures and returns an instance of otel collector intended for testing only func NewTestCollector(t *testing.T, beatname string, config string) (*TestCollector, error) { - - // create a temp dir + // create a temp dir at beat/build/integration-tests tempDir := integration.CreateTempDir(t) - // stdoutFile, err := os.Create(filepath.Join(tempDir, "stdout")) - // require.NoError(t, err, "error creating stdout file") - // stderrFile, err := os.Create(filepath.Join(tempDir, "stderr")) - // require.NoError(t, err, "error creating stderr file") - // create a config file + // create a configfile configFile := filepath.Join(tempDir, beatname+".yml") - // write configuration to a file - if err := os.WriteFile(configFile, []byte(config), 0o644); err != nil { - t.Fatalf("cannot create config file '%s': %s", configFile, err) - } - // adds scheme name as prefix to the configfile - beatCfg := schemeMap[beatname] + ":" + configFile - // get collector settings - set := getCollectorSettings(beatCfg) - // get new collector instance - otelcol, err := otelcol.NewCollector(set) - return &TestCollector{ + testCol := &TestCollector{ t: t, tempDir: tempDir, - otelcol: otelcol, wg: sync.WaitGroup{}, configFile: configFile, beatname: beatname, - }, err + } + + // write configuration to a file + err := testCol.reloadConfig(config) + return testCol, err } -// NewTestStartCollector configures and returns an otel collector intended for testing only -// It accepts beatname and configuration +// NewTestStartCollector configures and starts an otel collector intended for testing only func NewTestStartCollector(t *testing.T, beatname string, config string) (*TestCollector, error) { otelcol, err := NewTestCollector(t, beatname, config) if err != nil { @@ -81,39 +73,48 @@ func NewTestStartCollector(t *testing.T, beatname string, config string) (*TestC return nil, err } - t.Cleanup(func() { - otelcol.Shutdown() - if !t.Failed() { - return - } - }) - return otelcol, err } -func (c *TestCollector) ReloadConfig(config string) error { - // If collector is started, shut it down then reload - c.Shutdown() +// reloadConfig reloads configuration with which collector should be started. +// Note: A running collector will not pick the new config until it is stopped and started +func (c *TestCollector) reloadConfig(config string) error { // write configuration to a file if err := os.WriteFile(c.configFile, []byte(config), 0o644); err != nil { - c.t.Fatalf("cannot create config file '%s': %s", c.configFile, err) + return fmt.Errorf("cannot create config file '%s': %s", c.configFile, err) } // adds scheme name as prefix to the configfile beatCfg := schemeMap[c.beatname] + ":" + c.configFile // get collector settings - set := getCollectorSettings(beatCfg) + set, observedLogs := getCollectorSettings(beatCfg) + + c.observedLogs = observedLogs // get new collector instance - otelcol, _ := otelcol.NewCollector(set) + otelcol, err := otelcol.NewCollector(set) c.otelcol = otelcol - return c.Run() + return err +} +// ReloadCollectorWithConfig reloads the collector with given config +func (c *TestCollector) ReloadCollectorWithConfig(config string) error { + c.t.Helper() + // shutdown collector if it is running + c.Shutdown() + err := c.reloadConfig(config) + if err != nil { + return err + } + return c.Run() } func (c *TestCollector) GetTempDir() string { return c.tempDir } +// Run starts the otel collector func (c *TestCollector) Run() error { + c.t.Helper() + wg := sync.WaitGroup{} var err error wg.Add(1) @@ -121,47 +122,43 @@ func (c *TestCollector) Run() error { defer wg.Done() err = c.otelcol.Run(c.t.Context()) }() + + c.t.Cleanup(func() { + c.Shutdown() + return + }) + return err } + func (c *TestCollector) Shutdown() { c.otelcol.Shutdown() } -// Component initializes collector components -func getComponent() (otelcol.Factories, error) { - receivers, err := otelcol.MakeFactoryMap( - fbreceiver.NewFactory(), - ) - if err != nil { - return otelcol.Factories{}, nil //nolint:nilerr //ignoring this error - } - - exporters, err := otelcol.MakeFactoryMap( - debugexporter.NewFactory(), - elasticsearchexporter.NewFactory(), - ) - if err != nil { - return otelcol.Factories{}, nil //nolint:nilerr //ignoring this error - } - - return otelcol.Factories{ - Receivers: receivers, - Exporters: exporters, - }, nil - +// IsCollectorHealthy returns true if collector is healthy +func (c *TestCollector) IsCollectorHealthy(config string) bool { + // TODO + return true } -func getCollectorSettings(filename string) otelcol.CollectorSettings { +func getCollectorSettings(filename string) (otelcol.CollectorSettings, *observer.ObservedLogs) { // initialize collector settings info := component.BuildInfo{ Command: "otel-test", Description: "Beats OTel", - Version: "9.0.0", + Version: "9.1.0", } + zapCore := zapcore.NewCore( + zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), + &zaptest.Discarder{}, + zapcore.DebugLevel, + ) + observed, zapLogs := observer.New(zapcore.DebugLevel) + return otelcol.CollectorSettings{ BuildInfo: info, - Factories: getComponent, + Factories: otelbeat.GetComponent, ConfigProviderSettings: otelcol.ConfigProviderSettings{ ResolverSettings: confmap.ResolverSettings{ URIs: []string{filename}, @@ -173,5 +170,8 @@ func getCollectorSettings(filename string) otelcol.CollectorSettings { }, }, }, - } + LoggingOptions: []zap.Option{zap.WrapCore(func(zapcore.Core) zapcore.Core { + return zapcore.NewTee(zapCore, observed) + })}, + }, zapLogs } diff --git a/x-pack/libbeat/tests/integration/otel_test.go b/x-pack/libbeat/tests/integration/otel_test.go index a59ce0c9c49c..fc502e0f5681 100644 --- a/x-pack/libbeat/tests/integration/otel_test.go +++ b/x-pack/libbeat/tests/integration/otel_test.go @@ -61,16 +61,15 @@ func TestFilebeatOTelE2E(t *testing.T) { col, err := NewTestCollector(t, "filebeat", "") require.NoError(t, err, fmt.Sprintf("could not get new collector due to %v", err)) - // start collector + // write to a log file logFilePath := filepath.Join(col.GetTempDir(), "log.log") writeEventsToLogFile(t, logFilePath, numEvents) - err := col.ReloadConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", col.GetTempDir(), 5066)) + // start collector + err = col.ReloadCollectorWithConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", col.GetTempDir(), 5066)) require.NoError(t, err) - t.Cleanup(func() { - col.Shutdown() - }) + // TODO: check if collector is healthy before proceeding // start filebeat filebeat := integration.NewBeat( @@ -80,7 +79,7 @@ func TestFilebeatOTelE2E(t *testing.T) { ) logFilePath = filepath.Join(filebeat.TempDir(), "log.log") writeEventsToLogFile(t, logFilePath, numEvents) - s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", 5067) + s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", filebeat.TempDir(), 5067) s = s + ` setup.template.name: logs-filebeat-default setup.template.pattern: logs-filebeat-default From 7b4e8091c3e225de5af1655bf8507fef01e9a83c Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 12 Jun 2025 12:39:28 +0530 Subject: [PATCH 06/32] refactor paths --- .../filebeat/tests/integration/otel_test.go | 49 +++-- x-pack/libbeat/tests/integration/otel_test.go | 186 ------------------ ...ollector_test.go => oteltest_framework.go} | 0 3 files changed, 23 insertions(+), 212 deletions(-) delete mode 100644 x-pack/libbeat/tests/integration/otel_test.go rename x-pack/libbeat/tests/integration/{collector_test.go => oteltest_framework.go} (100%) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 8aa997c2db32..521fb28aa42b 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -8,7 +8,6 @@ package integration import ( "context" - "crypto/tls" "fmt" "net/http" "os" @@ -21,9 +20,9 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/tests/integration" + oteltest "github.com/elastic/beats/v7/x-pack/libbeat/tests/integration" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" - "github.com/elastic/go-elasticsearch/v8" ) var beatsCfgFile = ` @@ -49,6 +48,7 @@ processors: - add_cloud_metadata: ~ - add_docker_metadata: ~ - add_kubernetes_metadata: ~ +path.home: %s http.enabled: true http.host: localhost http.port: %d @@ -58,18 +58,19 @@ func TestFilebeatOTelE2E(t *testing.T) { integration.EnsureESIsRunning(t) numEvents := 1 - // start filebeat in otel mode - filebeatOTel := integration.NewBeat( - t, - "filebeat-otel", - "../../filebeat.test", - "otel", - ) + // Get collector with empty config + col, err := oteltest.NewTestCollector(t, "filebeat", "") + require.NoError(t, err, fmt.Sprintf("could not get new collector due to %v", err)) - logFilePath := filepath.Join(filebeatOTel.TempDir(), "log.log") - filebeatOTel.WriteConfigFile(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", 5066)) + // write to a log file + logFilePath := filepath.Join(col.GetTempDir(), "log.log") writeEventsToLogFile(t, logFilePath, numEvents) - filebeatOTel.Start() + + // start collector + err = col.ReloadCollectorWithConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", col.GetTempDir(), 5066)) + require.NoError(t, err) + + // TODO: check if collector is healthy before proceeding // start filebeat filebeat := integration.NewBeat( @@ -79,7 +80,7 @@ func TestFilebeatOTelE2E(t *testing.T) { ) logFilePath = filepath.Join(filebeat.TempDir(), "log.log") writeEventsToLogFile(t, logFilePath, numEvents) - s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", 5067) + s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", filebeat.TempDir(), 5067) s = s + ` setup.template.name: logs-filebeat-default setup.template.pattern: logs-filebeat-default @@ -88,26 +89,21 @@ setup.template.pattern: logs-filebeat-default filebeat.WriteConfigFile(s) filebeat.Start() - // prepare to query ES - esCfg := elasticsearch.Config{ - Addresses: []string{"http://localhost:9200"}, - Username: "admin", - Password: "testing", - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, //nolint:gosec // this is only for testing - }, - }, + t.Cleanup(func() { + filebeat.Stop() + }) + + es, err := integration.GetESClient(t) + if err != nil { + t.Fatalf("could not get es client due to: %v", err) } - es, err := elasticsearch.NewClient(esCfg) - require.NoError(t, err) var filebeatDocs estools.Documents var otelDocs estools.Documents // wait for logs to be published require.Eventually(t, func() bool { - findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) defer findCancel() otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*") @@ -129,6 +125,7 @@ setup.template.pattern: logs-filebeat-default "agent.id", "log.file.inode", "log.file.path", + "container.id", } assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") diff --git a/x-pack/libbeat/tests/integration/otel_test.go b/x-pack/libbeat/tests/integration/otel_test.go deleted file mode 100644 index fc502e0f5681..000000000000 --- a/x-pack/libbeat/tests/integration/otel_test.go +++ /dev/null @@ -1,186 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration && !agentbeat - -package integration - -import ( - "context" - "fmt" - "net/http" - "os" - "path/filepath" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/libbeat/tests/integration" - "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-libs/testing/estools" -) - -var beatsCfgFile = ` -filebeat.inputs: - - type: filestream - id: filestream-input-id - enabled: true - file_identity.native: ~ - prospector.scanner.fingerprint.enabled: false - paths: - - %s -output: - elasticsearch: - hosts: - - localhost:9200 - protocol: http - username: admin - password: testing - index: %s -queue.mem.flush.timeout: 0s -processors: - - add_host_metadata: ~ - - add_cloud_metadata: ~ - - add_docker_metadata: ~ - - add_kubernetes_metadata: ~ -path.home: %s -http.enabled: true -http.host: localhost -http.port: %d -` - -func TestFilebeatOTelE2E(t *testing.T) { - integration.EnsureESIsRunning(t) - numEvents := 1 - - // Get collector with empty config - col, err := NewTestCollector(t, "filebeat", "") - require.NoError(t, err, fmt.Sprintf("could not get new collector due to %v", err)) - - // write to a log file - logFilePath := filepath.Join(col.GetTempDir(), "log.log") - writeEventsToLogFile(t, logFilePath, numEvents) - - // start collector - err = col.ReloadCollectorWithConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", col.GetTempDir(), 5066)) - require.NoError(t, err) - - // TODO: check if collector is healthy before proceeding - - // start filebeat - filebeat := integration.NewBeat( - t, - "filebeat", - "../../../filebeat/filebeat.test", - ) - logFilePath = filepath.Join(filebeat.TempDir(), "log.log") - writeEventsToLogFile(t, logFilePath, numEvents) - s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", filebeat.TempDir(), 5067) - s = s + ` -setup.template.name: logs-filebeat-default -setup.template.pattern: logs-filebeat-default -` - - filebeat.WriteConfigFile(s) - filebeat.Start() - - t.Cleanup(func() { - filebeat.Stop() - }) - - es, err := integration.GetESClient(t) - if err != nil { - t.Fatalf("could not get es client due to: %v", err) - } - - var filebeatDocs estools.Documents - var otelDocs estools.Documents - // wait for logs to be published - require.Eventually(t, - func() bool { - findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) - defer findCancel() - - otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*") - require.NoError(t, err) - - filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-filebeat-default*") - require.NoError(t, err) - - return otelDocs.Hits.Total.Value >= numEvents && filebeatDocs.Hits.Total.Value >= numEvents - }, - 2*time.Minute, 1*time.Second, fmt.Sprintf("Number of hits %d not equal to number of events for %d", filebeatDocs.Hits.Total.Value, numEvents)) - - filebeatDoc := filebeatDocs.Hits.Hits[0].Source - otelDoc := otelDocs.Hits.Hits[0].Source - ignoredFields := []string{ - // Expected to change between agentDocs and OtelDocs - "@timestamp", - "agent.ephemeral_id", - "agent.id", - "log.file.inode", - "log.file.path", - "container.id", - } - - assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") - assertMonitoring(t) -} - -func writeEventsToLogFile(t *testing.T, filename string, numEvents int) { - t.Helper() - logFile, err := os.Create(filename) - if err != nil { - t.Fatalf("could not create file '%s': %s", filename, err) - } - // write events to log file - for i := 0; i < numEvents; i++ { - msg := fmt.Sprintf("Line %d", i) - _, err = logFile.Write([]byte(msg + "\n")) - require.NoErrorf(t, err, "failed to write line %d to temp file", i) - } - - if err := logFile.Sync(); err != nil { - t.Fatalf("could not sync log file '%s': %s", filename, err) - } - if err := logFile.Close(); err != nil { - t.Fatalf("could not close log file '%s': %s", filename, err) - } -} - -func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) { - t.Helper() - - flatM1 := m1.Flatten() - flatM2 := m2.Flatten() - for _, f := range ignoredFields { - hasKeyM1, _ := flatM1.HasKey(f) - hasKeyM2, _ := flatM2.HasKey(f) - - if !hasKeyM1 && !hasKeyM2 { - assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f) - } - - flatM1.Delete(f) - flatM2.Delete(f) - } - require.Equal(t, "", cmp.Diff(flatM1, flatM2), "expected maps to be equal") -} - -func assertMonitoring(t *testing.T) { - r, err := http.Get("http://localhost:5066") //nolint:noctx,bodyclose // fine for tests - require.NoError(t, err) - require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") - - r, err = http.Get("http://localhost:5066/stats") //nolint:noctx,bodyclose // fine for tests - require.NoError(t, err) - require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") - - r, err = http.Get("http://localhost:5066/not-exist") //nolint:noctx,bodyclose // fine for tests - require.NoError(t, err) - require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") -} diff --git a/x-pack/libbeat/tests/integration/collector_test.go b/x-pack/libbeat/tests/integration/oteltest_framework.go similarity index 100% rename from x-pack/libbeat/tests/integration/collector_test.go rename to x-pack/libbeat/tests/integration/oteltest_framework.go From 4a6c2eb4211aa1bbc0c6d38f94b60432b50a1cc4 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 12 Jun 2025 13:22:24 +0530 Subject: [PATCH 07/32] check update --- libbeat/tests/integration/framework.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 7e317a4c33cd..df6538f5ede4 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -43,9 +43,10 @@ import ( "testing" "time" - "github.com/elastic/go-elasticsearch/v8" "github.com/gofrs/uuid/v5" "github.com/stretchr/testify/require" + + "github.com/elastic/go-elasticsearch/v8" ) type BeatProc struct { From 4fca789032bb1ab4d4b94502192d3bc573b8b290 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 12 Jun 2025 13:49:24 +0530 Subject: [PATCH 08/32] check update --- x-pack/libbeat/tests/integration/oteltest_framework.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/x-pack/libbeat/tests/integration/oteltest_framework.go b/x-pack/libbeat/tests/integration/oteltest_framework.go index 319ea459f994..cae9e86d3c24 100644 --- a/x-pack/libbeat/tests/integration/oteltest_framework.go +++ b/x-pack/libbeat/tests/integration/oteltest_framework.go @@ -17,14 +17,15 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/otelcol" - "github.com/elastic/beats/v7/libbeat/otelbeat/beatconverter" - "github.com/elastic/beats/v7/libbeat/otelbeat/providers/fbprovider" - "github.com/elastic/beats/v7/libbeat/tests/integration" - "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest/observer" + + "github.com/elastic/beats/v7/libbeat/otelbeat/beatconverter" + "github.com/elastic/beats/v7/libbeat/otelbeat/providers/fbprovider" + "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat" ) var schemeMap = map[string]string{ From c4e3afe871030276624b11e064d2307de61ab105 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 12 Jun 2025 21:43:18 +0530 Subject: [PATCH 09/32] Update libbeat/tests/integration/framework.go Co-authored-by: Tiago Queiroz --- libbeat/tests/integration/framework.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index df6538f5ede4..f0aa2b92faa4 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -721,9 +721,8 @@ func EnsureESIsRunning(t *testing.T) { } } -func GetESClient(t *testing.T) (*elasticsearch.Client, error) { - t.Helper() - esURL := GetESURL(t, "http") +func GetESClient(t *testing.T, scheme string) *elasticsearch.Client { + esURL := GetESURL(t, scheme) u := esURL.User.Username() p, _ := esURL.User.Password() @@ -741,10 +740,10 @@ func GetESClient(t *testing.T) (*elasticsearch.Client, error) { } es, err := elasticsearch.NewClient(esCfg) if err != nil { - return nil, err + t.Fatalf("cannot create Elasticsearch client: %s", err) } - return es, nil + return es } func (b *BeatProc) FileContains(filename string, match string) string { From 85042a8bbada307661cc2ed8b14daafc92440409 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 12 Jun 2025 21:43:27 +0530 Subject: [PATCH 10/32] Update libbeat/tests/integration/framework.go Co-authored-by: Tiago Queiroz --- libbeat/tests/integration/framework.go | 1 - 1 file changed, 1 deletion(-) diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index f0aa2b92faa4..b9607c7592ad 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -935,7 +935,6 @@ func FormatRefreshURL(t *testing.T, srcURL url.URL) url.URL { } func FormatDataStreamSearchURL(t *testing.T, srcURL url.URL, dataStream string) (url.URL, error) { - t.Helper() t.Helper() path, err := url.JoinPath("/", dataStream, "_search") if err != nil { From 7065f2acea472c8c58af3695021f8f9ab8a69262 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 12 Jun 2025 21:49:39 +0530 Subject: [PATCH 11/32] increase timeout --- x-pack/filebeat/tests/integration/otel_test.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 521fb28aa42b..f104abd5aa35 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -43,11 +43,6 @@ output: password: testing index: %s queue.mem.flush.timeout: 0s -processors: - - add_host_metadata: ~ - - add_cloud_metadata: ~ - - add_docker_metadata: ~ - - add_kubernetes_metadata: ~ path.home: %s http.enabled: true http.host: localhost @@ -114,7 +109,7 @@ setup.template.pattern: logs-filebeat-default return otelDocs.Hits.Total.Value >= numEvents && filebeatDocs.Hits.Total.Value >= numEvents }, - 2*time.Minute, 1*time.Second, fmt.Sprintf("Number of hits %d not equal to number of events for %d", filebeatDocs.Hits.Total.Value, numEvents)) + 3*time.Minute, 1*time.Second, fmt.Sprintf("Number of hits %d not equal to number of events for %d", filebeatDocs.Hits.Total.Value, numEvents)) filebeatDoc := filebeatDocs.Hits.Hits[0].Source otelDoc := otelDocs.Hits.Hits[0].Source From 3d9a15ab639dd8764b86b73c0245e7f73804156a Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Mon, 16 Jun 2025 07:09:07 +0530 Subject: [PATCH 12/32] add collector wait method --- libbeat/tests/integration/framework.go | 1 - x-pack/filebeat/tests/integration/otel_test.go | 13 ++++++++----- .../libbeat/tests/integration/oteltest_framework.go | 13 +++++++------ 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index b9607c7592ad..41a9bc5c560c 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -997,7 +997,6 @@ func reportErrors(t *testing.T, tempDir string, beatName string) { // GenerateLogFile writes count lines to path, each line is 50 bytes. // Each line contains the current time (RFC3339) and a counter func GenerateLogFile(t *testing.T, path string, count int, append bool) { - t.Helper() var file *os.File var err error if !append { diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index f104abd5aa35..61d9ff85b8ff 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -42,6 +42,11 @@ output: username: admin password: testing index: %s +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ queue.mem.flush.timeout: 0s path.home: %s http.enabled: true @@ -65,7 +70,8 @@ func TestFilebeatOTelE2E(t *testing.T) { err = col.ReloadCollectorWithConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", col.GetTempDir(), 5066)) require.NoError(t, err) - // TODO: check if collector is healthy before proceeding + // wait for collector to be ready + col.Wait() // start filebeat filebeat := integration.NewBeat( @@ -88,10 +94,7 @@ setup.template.pattern: logs-filebeat-default filebeat.Stop() }) - es, err := integration.GetESClient(t) - if err != nil { - t.Fatalf("could not get es client due to: %v", err) - } + es := integration.GetESClient(t, "http") var filebeatDocs estools.Documents var otelDocs estools.Documents diff --git a/x-pack/libbeat/tests/integration/oteltest_framework.go b/x-pack/libbeat/tests/integration/oteltest_framework.go index cae9e86d3c24..bc1c274c4910 100644 --- a/x-pack/libbeat/tests/integration/oteltest_framework.go +++ b/x-pack/libbeat/tests/integration/oteltest_framework.go @@ -12,11 +12,13 @@ import ( "path/filepath" "sync" "testing" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/otelcol" + "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" @@ -98,7 +100,6 @@ func (c *TestCollector) reloadConfig(config string) error { // ReloadCollectorWithConfig reloads the collector with given config func (c *TestCollector) ReloadCollectorWithConfig(config string) error { - c.t.Helper() // shutdown collector if it is running c.Shutdown() err := c.reloadConfig(config) @@ -114,7 +115,6 @@ func (c *TestCollector) GetTempDir() string { // Run starts the otel collector func (c *TestCollector) Run() error { - c.t.Helper() wg := sync.WaitGroup{} var err error @@ -136,10 +136,11 @@ func (c *TestCollector) Shutdown() { c.otelcol.Shutdown() } -// IsCollectorHealthy returns true if collector is healthy -func (c *TestCollector) IsCollectorHealthy(config string) bool { - // TODO - return true +// Wait waits for the collector to be ready +func (c *TestCollector) Wait() { + require.Eventually(c.t, func() bool { + return c.observedLogs.FilterMessage("Everything is ready. Begin running and processing data.").Len() > 0 + }, 10*time.Second, 100*time.Millisecond) } func getCollectorSettings(filename string) (otelcol.CollectorSettings, *observer.ObservedLogs) { From 85049825d9a2a8ad51133f28c37e768c95b00e69 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Mon, 16 Jun 2025 09:44:50 +0530 Subject: [PATCH 13/32] add verbose error logging --- x-pack/filebeat/tests/integration/otel_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 61d9ff85b8ff..ca64fdd7ab68 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -112,7 +112,7 @@ setup.template.pattern: logs-filebeat-default return otelDocs.Hits.Total.Value >= numEvents && filebeatDocs.Hits.Total.Value >= numEvents }, - 3*time.Minute, 1*time.Second, fmt.Sprintf("Number of hits %d not equal to number of events for %d", filebeatDocs.Hits.Total.Value, numEvents)) + 3*time.Minute, 1*time.Second, fmt.Sprintf("otel docs = %d, filebeat docs = %d", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value)) filebeatDoc := filebeatDocs.Hits.Hits[0].Source otelDoc := otelDocs.Hits.Hits[0].Source From c06f914cb81e5f4411c343246d5fa6338522fd8e Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Mon, 16 Jun 2025 12:22:40 +0530 Subject: [PATCH 14/32] change context background --- x-pack/filebeat/tests/integration/otel_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index ca64fdd7ab68..2ba2a7f5e8dc 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -101,7 +101,7 @@ setup.template.pattern: logs-filebeat-default // wait for logs to be published require.Eventually(t, func() bool { - findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) defer findCancel() otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*") From a48f4f81cae867fc367a8b3728d4a156ae3b5d87 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Mon, 16 Jun 2025 13:01:21 +0530 Subject: [PATCH 15/32] add logs --- x-pack/filebeat/tests/integration/otel_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 2ba2a7f5e8dc..ee2e5e9fa71b 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -98,10 +98,11 @@ setup.template.pattern: logs-filebeat-default var filebeatDocs estools.Documents var otelDocs estools.Documents + // wait for logs to be published require.Eventually(t, func() bool { - findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) defer findCancel() otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*") @@ -109,10 +110,10 @@ setup.template.pattern: logs-filebeat-default filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-filebeat-default*") require.NoError(t, err) - - return otelDocs.Hits.Total.Value >= numEvents && filebeatDocs.Hits.Total.Value >= numEvents + t.Logf("otel docs = %d, filebeat docs = %d", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value) + return otelDocs.Hits.Total.Value < 0 && filebeatDocs.Hits.Total.Value >= numEvents }, - 3*time.Minute, 1*time.Second, fmt.Sprintf("otel docs = %d, filebeat docs = %d", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value)) + 30*time.Second, 5*time.Second, "otel docs = %d, filebeat docs = %d", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value) filebeatDoc := filebeatDocs.Hits.Hits[0].Source otelDoc := otelDocs.Hits.Hits[0].Source From d98b134516c38156e10a53c428f137bb46e92a5d Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Mon, 16 Jun 2025 13:03:09 +0530 Subject: [PATCH 16/32] add logs --- x-pack/filebeat/tests/integration/otel_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index ee2e5e9fa71b..a76a78ea8444 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -113,7 +113,7 @@ setup.template.pattern: logs-filebeat-default t.Logf("otel docs = %d, filebeat docs = %d", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value) return otelDocs.Hits.Total.Value < 0 && filebeatDocs.Hits.Total.Value >= numEvents }, - 30*time.Second, 5*time.Second, "otel docs = %d, filebeat docs = %d", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value) + 2*time.Minute, 1*time.Second, "otel docs = %d, filebeat docs = %d", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value) filebeatDoc := filebeatDocs.Hits.Hits[0].Source otelDoc := otelDocs.Hits.Hits[0].Source From 93fb7785aa8559bbf2e0c75ed6d61274322b5db1 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Mon, 16 Jun 2025 15:35:38 +0530 Subject: [PATCH 17/32] remove home --- x-pack/filebeat/tests/integration/otel_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index a76a78ea8444..bac8b8ea6dac 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -48,7 +48,6 @@ processors: - add_docker_metadata: ~ - add_kubernetes_metadata: ~ queue.mem.flush.timeout: 0s -path.home: %s http.enabled: true http.host: localhost http.port: %d @@ -67,7 +66,7 @@ func TestFilebeatOTelE2E(t *testing.T) { writeEventsToLogFile(t, logFilePath, numEvents) // start collector - err = col.ReloadCollectorWithConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", col.GetTempDir(), 5066)) + err = col.ReloadCollectorWithConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", 5066)) require.NoError(t, err) // wait for collector to be ready @@ -81,7 +80,7 @@ func TestFilebeatOTelE2E(t *testing.T) { ) logFilePath = filepath.Join(filebeat.TempDir(), "log.log") writeEventsToLogFile(t, logFilePath, numEvents) - s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", filebeat.TempDir(), 5067) + s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", 5067) s = s + ` setup.template.name: logs-filebeat-default setup.template.pattern: logs-filebeat-default @@ -111,7 +110,7 @@ setup.template.pattern: logs-filebeat-default filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-filebeat-default*") require.NoError(t, err) t.Logf("otel docs = %d, filebeat docs = %d", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value) - return otelDocs.Hits.Total.Value < 0 && filebeatDocs.Hits.Total.Value >= numEvents + return otelDocs.Hits.Total.Value >= numEvents && filebeatDocs.Hits.Total.Value >= numEvents }, 2*time.Minute, 1*time.Second, "otel docs = %d, filebeat docs = %d", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value) From 59a66b5493eaeaba7168f2ca6ced26852a09f200 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Mon, 16 Jun 2025 18:47:42 +0530 Subject: [PATCH 18/32] changed getESCLient --- libbeat/tests/integration/framework.go | 2 +- x-pack/filebeat/tests/integration/otel_test.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 41a9bc5c560c..9f9d00fe2a99 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -729,7 +729,7 @@ func GetESClient(t *testing.T, scheme string) *elasticsearch.Client { // prepare to query ES esCfg := elasticsearch.Config{ - Addresses: []string{esURL.String()}, + Addresses: []string{fmt.Sprintf("%s://%s", esURL.Scheme, esURL.Host)}, Username: u, Password: p, Transport: &http.Transport{ diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index bac8b8ea6dac..dc150647e572 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -48,6 +48,7 @@ processors: - add_docker_metadata: ~ - add_kubernetes_metadata: ~ queue.mem.flush.timeout: 0s +path.home: %s http.enabled: true http.host: localhost http.port: %d @@ -66,7 +67,7 @@ func TestFilebeatOTelE2E(t *testing.T) { writeEventsToLogFile(t, logFilePath, numEvents) // start collector - err = col.ReloadCollectorWithConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", 5066)) + err = col.ReloadCollectorWithConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", col.GetTempDir(), 5066)) require.NoError(t, err) // wait for collector to be ready @@ -80,7 +81,7 @@ func TestFilebeatOTelE2E(t *testing.T) { ) logFilePath = filepath.Join(filebeat.TempDir(), "log.log") writeEventsToLogFile(t, logFilePath, numEvents) - s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", 5067) + s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", filebeat.TempDir(), 5067) s = s + ` setup.template.name: logs-filebeat-default setup.template.pattern: logs-filebeat-default From 8535993e6f78edb45e5801ca931de6f6c261bd5a Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 17 Jun 2025 08:00:13 +0530 Subject: [PATCH 19/32] use previous test file + GetESClient --- .../filebeat/tests/integration/otel_test.go | 42 +++++++------------ 1 file changed, 16 insertions(+), 26 deletions(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index dc150647e572..39d0c8503776 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -20,7 +20,6 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/tests/integration" - oteltest "github.com/elastic/beats/v7/x-pack/libbeat/tests/integration" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" ) @@ -42,13 +41,12 @@ output: username: admin password: testing index: %s +queue.mem.flush.timeout: 0s processors: - add_host_metadata: ~ - add_cloud_metadata: ~ - add_docker_metadata: ~ - - add_kubernetes_metadata: ~ -queue.mem.flush.timeout: 0s -path.home: %s + - add_kubernetes_metadata: ~ http.enabled: true http.host: localhost http.port: %d @@ -58,20 +56,18 @@ func TestFilebeatOTelE2E(t *testing.T) { integration.EnsureESIsRunning(t) numEvents := 1 - // Get collector with empty config - col, err := oteltest.NewTestCollector(t, "filebeat", "") - require.NoError(t, err, fmt.Sprintf("could not get new collector due to %v", err)) + // start filebeat in otel mode + filebeatOTel := integration.NewBeat( + t, + "filebeat-otel", + "../../filebeat.test", + "otel", + ) - // write to a log file - logFilePath := filepath.Join(col.GetTempDir(), "log.log") + logFilePath := filepath.Join(filebeatOTel.TempDir(), "log.log") + filebeatOTel.WriteConfigFile(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", 5066)) writeEventsToLogFile(t, logFilePath, numEvents) - - // start collector - err = col.ReloadCollectorWithConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", col.GetTempDir(), 5066)) - require.NoError(t, err) - - // wait for collector to be ready - col.Wait() + filebeatOTel.Start() // start filebeat filebeat := integration.NewBeat( @@ -81,7 +77,7 @@ func TestFilebeatOTelE2E(t *testing.T) { ) logFilePath = filepath.Join(filebeat.TempDir(), "log.log") writeEventsToLogFile(t, logFilePath, numEvents) - s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", filebeat.TempDir(), 5067) + s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", 5067) s = s + ` setup.template.name: logs-filebeat-default setup.template.pattern: logs-filebeat-default @@ -90,19 +86,14 @@ setup.template.pattern: logs-filebeat-default filebeat.WriteConfigFile(s) filebeat.Start() - t.Cleanup(func() { - filebeat.Stop() - }) - es := integration.GetESClient(t, "http") var filebeatDocs estools.Documents var otelDocs estools.Documents - // wait for logs to be published require.Eventually(t, func() bool { - findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) defer findCancel() otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*") @@ -110,10 +101,10 @@ setup.template.pattern: logs-filebeat-default filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-filebeat-default*") require.NoError(t, err) - t.Logf("otel docs = %d, filebeat docs = %d", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value) + return otelDocs.Hits.Total.Value >= numEvents && filebeatDocs.Hits.Total.Value >= numEvents }, - 2*time.Minute, 1*time.Second, "otel docs = %d, filebeat docs = %d", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value) + 2*time.Minute, 1*time.Second, fmt.Sprintf("Number of hits %d not equal to number of events for %d", filebeatDocs.Hits.Total.Value, numEvents)) filebeatDoc := filebeatDocs.Hits.Hits[0].Source otelDoc := otelDocs.Hits.Hits[0].Source @@ -124,7 +115,6 @@ setup.template.pattern: logs-filebeat-default "agent.id", "log.file.inode", "log.file.path", - "container.id", } assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") From e1dfc1869f590686da16327fd33db6ac564cd7cd Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 17 Jun 2025 08:10:42 +0530 Subject: [PATCH 20/32] add error --- x-pack/filebeat/tests/integration/otel_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 39d0c8503776..339cc39a37de 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -90,6 +90,7 @@ setup.template.pattern: logs-filebeat-default var filebeatDocs estools.Documents var otelDocs estools.Documents + var err error // wait for logs to be published require.Eventually(t, func() bool { From 1866a4bcf3d3dbd055eb1cedb2e9d8b8d7142433 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 17 Jun 2025 08:52:59 +0530 Subject: [PATCH 21/32] use previous client --- x-pack/filebeat/tests/integration/otel_test.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 339cc39a37de..75cef1517a60 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -8,6 +8,7 @@ package integration import ( "context" + "crypto/tls" "fmt" "net/http" "os" @@ -22,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/tests/integration" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" + "github.com/elastic/go-elasticsearch/v7" ) var beatsCfgFile = ` @@ -86,7 +88,19 @@ setup.template.pattern: logs-filebeat-default filebeat.WriteConfigFile(s) filebeat.Start() - es := integration.GetESClient(t, "http") + // prepare to query ES + esCfg := elasticsearch.Config{ + Addresses: []string{"http://localhost:9200"}, + Username: "admin", + Password: "testing", + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec // this is only for testing + }, + }, + } + es, err := elasticsearch.NewClient(esCfg) + require.NoError(t, err) var filebeatDocs estools.Documents var otelDocs estools.Documents From 3d268e251b42c5d98d9a76a7500d2ffcbcaa9b64 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 17 Jun 2025 08:55:50 +0530 Subject: [PATCH 22/32] change v7 to v8 --- x-pack/filebeat/tests/integration/otel_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 75cef1517a60..47c82b05ff79 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -23,7 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/tests/integration" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" - "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v8" ) var beatsCfgFile = ` From b0c49ff726fb03711fb909a6476f3ffeb71f331b Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 17 Jun 2025 09:02:08 +0530 Subject: [PATCH 23/32] remove err --- x-pack/filebeat/tests/integration/otel_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 47c82b05ff79..8aa997c2db32 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -104,7 +104,6 @@ setup.template.pattern: logs-filebeat-default var filebeatDocs estools.Documents var otelDocs estools.Documents - var err error // wait for logs to be published require.Eventually(t, func() bool { From 80eb9d151eaf6770fb5e85198654b6e8478d0251 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 17 Jun 2025 09:40:17 +0530 Subject: [PATCH 24/32] add new collector --- .../filebeat/tests/integration/otel_test.go | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 8aa997c2db32..1e10cf919f53 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/tests/integration" + oteltest "github.com/elastic/beats/v7/x-pack/libbeat/tests/integration" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" "github.com/elastic/go-elasticsearch/v8" @@ -43,6 +44,7 @@ output: username: admin password: testing index: %s +path.home: %s queue.mem.flush.timeout: 0s processors: - add_host_metadata: ~ @@ -58,18 +60,19 @@ func TestFilebeatOTelE2E(t *testing.T) { integration.EnsureESIsRunning(t) numEvents := 1 - // start filebeat in otel mode - filebeatOTel := integration.NewBeat( - t, - "filebeat-otel", - "../../filebeat.test", - "otel", - ) - - logFilePath := filepath.Join(filebeatOTel.TempDir(), "log.log") - filebeatOTel.WriteConfigFile(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", 5066)) + // Get collector with empty config + col, err := oteltest.NewTestCollector(t, "filebeat", "") + require.NoError(t, err, fmt.Sprintf("could not get new collector due to %v", err)) + // write to a log file + logFilePath := filepath.Join(col.GetTempDir(), "log.log") writeEventsToLogFile(t, logFilePath, numEvents) - filebeatOTel.Start() + + // start collector + err = col.ReloadCollectorWithConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", col.GetTempDir(), 5066)) + require.NoError(t, err) + + // wait for collector to be ready + col.Wait() // start filebeat filebeat := integration.NewBeat( @@ -79,7 +82,7 @@ func TestFilebeatOTelE2E(t *testing.T) { ) logFilePath = filepath.Join(filebeat.TempDir(), "log.log") writeEventsToLogFile(t, logFilePath, numEvents) - s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", 5067) + s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", filebeat.TempDir(), 5067) s = s + ` setup.template.name: logs-filebeat-default setup.template.pattern: logs-filebeat-default @@ -116,6 +119,7 @@ setup.template.pattern: logs-filebeat-default filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-filebeat-default*") require.NoError(t, err) + t.Logf("otel docs = %d, filebeat docs %d", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value) return otelDocs.Hits.Total.Value >= numEvents && filebeatDocs.Hits.Total.Value >= numEvents }, 2*time.Minute, 1*time.Second, fmt.Sprintf("Number of hits %d not equal to number of events for %d", filebeatDocs.Hits.Total.Value, numEvents)) From 23e8497ed6941e5b7232f39a544bc53eca3c2d47 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 17 Jun 2025 13:13:17 +0530 Subject: [PATCH 25/32] add otelbeat binary --- .buildkite/x-pack/pipeline.xpack.filebeat.yml | 30 +++ x-pack/filebeat/cmd/otel_test.go | 44 ---- x-pack/filebeat/cmd/otelcmd_disabled.go | 15 ++ x-pack/filebeat/cmd/otelcmd_enabled.go | 17 ++ x-pack/filebeat/cmd/root.go | 3 +- .../tests/oteltest/filestream_test.go | 190 ++++++++++++++++++ 6 files changed, 253 insertions(+), 46 deletions(-) delete mode 100644 x-pack/filebeat/cmd/otel_test.go create mode 100644 x-pack/filebeat/cmd/otelcmd_disabled.go create mode 100644 x-pack/filebeat/cmd/otelcmd_enabled.go create mode 100644 x-pack/filebeat/tests/oteltest/filestream_test.go diff --git a/.buildkite/x-pack/pipeline.xpack.filebeat.yml b/.buildkite/x-pack/pipeline.xpack.filebeat.yml index 52790806cf41..af7b00d2e0c0 100644 --- a/.buildkite/x-pack/pipeline.xpack.filebeat.yml +++ b/.buildkite/x-pack/pipeline.xpack.filebeat.yml @@ -180,6 +180,36 @@ steps: - github_commit_status: context: "x-pack/filebeat: Go Integration Tests" + - label: ":ubuntu: x-pack/filebeat: OTel Go Integration Tests" + key: "x-pack-filebeat-mandatory-int-test" +## TODO: add mage command + command: | + cd x-pack/filebeat + go test -c -tags otelbeat + go test -tags "integration otelbeat" ./tests/oteltest + retry: + automatic: + - limit: 1 + agents: + provider: "gcp" + image: "${IMAGE_UBUNTU_X86_64}" + machineType: "${GCP_DEFAULT_MACHINE_TYPE}" + artifact_paths: + - "x-pack/filebeat/build/*.xml" + - "x-pack/filebeat/build/*.json" + - "x-pack/filebeat/build/integration-tests/*" + - "x-pack/filebeat/build/integration-tests/Test*/*" + - "x-pack/filebeat/build/integration-tests/Test*/data/**/*" + plugins: + - test-collector#v1.10.2: + files: "x-pack/filebeat/build/TEST-*.xml" + format: "junit" + branches: "main" + debug: true + notify: + - github_commit_status: + context: "x-pack/filebeat: OTel Go Integration Tests" + - label: ":ubuntu: x-pack/filebeat: Go fips140=only Integration Tests" command: | cd x-pack/filebeat diff --git a/x-pack/filebeat/cmd/otel_test.go b/x-pack/filebeat/cmd/otel_test.go deleted file mode 100644 index f50a0bfc39af..000000000000 --- a/x-pack/filebeat/cmd/otel_test.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package cmd - -import ( - "context" - "path/filepath" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat" -) - -func TestOtel(t *testing.T) { - rootDir, _ := filepath.Abs("../filebeat-otel.yml") - - // Create the command - cmd := otelbeat.OTelCmd("filebeat") - cmd.SetArgs([]string{"otel", "--config", rootDir}) - - // Set up a context with a timeout to avoid indefinite blocking - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - - // Run the command in a goroutine - errCh := make(chan error, 1) - go func() { - err := cmd.Execute() - errCh <- err - }() - - // Wait for 15s to check there were no errors in starting the otel collector - select { - case err := <-errCh: - // Assert no error occurred - require.NoError(t, err, "cmd.RunE should not return an error") - case <-ctx.Done(): - return - } -} diff --git a/x-pack/filebeat/cmd/otelcmd_disabled.go b/x-pack/filebeat/cmd/otelcmd_disabled.go new file mode 100644 index 000000000000..af7e1424dd0a --- /dev/null +++ b/x-pack/filebeat/cmd/otelcmd_disabled.go @@ -0,0 +1,15 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !otelbeat + +package cmd + +import ( + cmd "github.com/elastic/beats/v7/libbeat/cmd" +) + +func addOTelCommand(command *cmd.BeatsRootCmd) { + // No-op +} diff --git a/x-pack/filebeat/cmd/otelcmd_enabled.go b/x-pack/filebeat/cmd/otelcmd_enabled.go new file mode 100644 index 000000000000..09bf09cdade2 --- /dev/null +++ b/x-pack/filebeat/cmd/otelcmd_enabled.go @@ -0,0 +1,17 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build otelbeat + +package cmd + +import ( + fbcmd "github.com/elastic/beats/v7/filebeat/cmd" + cmd "github.com/elastic/beats/v7/libbeat/cmd" + "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat" +) + +func addOTelCommand(command *cmd.BeatsRootCmd) { + command.AddCommand(otelbeat.OTelCmd(fbcmd.Name)) +} diff --git a/x-pack/filebeat/cmd/root.go b/x-pack/filebeat/cmd/root.go index 1e3ad48c2a02..9b44ebb55960 100644 --- a/x-pack/filebeat/cmd/root.go +++ b/x-pack/filebeat/cmd/root.go @@ -17,7 +17,6 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/processing" "github.com/elastic/beats/v7/x-pack/filebeat/include" inputs "github.com/elastic/beats/v7/x-pack/filebeat/input/default-inputs" - "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat" "github.com/elastic/beats/v7/x-pack/libbeat/management" // Register the includes. @@ -41,7 +40,7 @@ func Filebeat() *cmd.BeatsRootCmd { command.PersistentPreRun = func(cmd *cobra.Command, args []string) { management.ConfigTransform.SetTransform(filebeatCfg) } - command.AddCommand(otelbeat.OTelCmd(fbcmd.Name)) + addOTelCommand(command) return command } diff --git a/x-pack/filebeat/tests/oteltest/filestream_test.go b/x-pack/filebeat/tests/oteltest/filestream_test.go new file mode 100644 index 000000000000..9644dcf2cfd0 --- /dev/null +++ b/x-pack/filebeat/tests/oteltest/filestream_test.go @@ -0,0 +1,190 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration && otelbeat + +package integration + +import ( + "context" + "crypto/tls" + "fmt" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/testing/estools" + "github.com/elastic/go-elasticsearch/v8" +) + +var beatsCfgFile = ` +filebeat.inputs: + - type: filestream + id: filestream-input-id + enabled: true + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false + paths: + - %s +output: + elasticsearch: + hosts: + - localhost:9200 + protocol: http + username: admin + password: testing + index: %s +queue.mem.flush.timeout: 0s +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +http.enabled: true +http.host: localhost +http.port: %d +` + +func TestFilebeatOTelE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + numEvents := 1 + + // start filebeat in otel mode + filebeatOTel := integration.NewBeat( + t, + "filebeat-otel", + "../../filebeat.test", + "otel", + ) + + logFilePath := filepath.Join(filebeatOTel.TempDir(), "log.log") + filebeatOTel.WriteConfigFile(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", 5066)) + writeEventsToLogFile(t, logFilePath, numEvents) + filebeatOTel.Start() + + // start filebeat + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + logFilePath = filepath.Join(filebeat.TempDir(), "log.log") + writeEventsToLogFile(t, logFilePath, numEvents) + s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", 5067) + s = s + ` +setup.template.name: logs-filebeat-default +setup.template.pattern: logs-filebeat-default +` + + filebeat.WriteConfigFile(s) + filebeat.Start() + + // prepare to query ES + esCfg := elasticsearch.Config{ + Addresses: []string{"http://localhost:9200"}, + Username: "admin", + Password: "testing", + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec // this is only for testing + }, + }, + } + es, err := elasticsearch.NewClient(esCfg) + require.NoError(t, err) + + var filebeatDocs estools.Documents + var otelDocs estools.Documents + // wait for logs to be published + require.Eventually(t, + func() bool { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*") + require.NoError(t, err) + + filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-filebeat-default*") + require.NoError(t, err) + + return otelDocs.Hits.Total.Value >= numEvents && filebeatDocs.Hits.Total.Value >= numEvents + }, + 2*time.Minute, 1*time.Second, fmt.Sprintf("Number of hits %d not equal to number of events for %d", filebeatDocs.Hits.Total.Value, numEvents)) + + filebeatDoc := filebeatDocs.Hits.Hits[0].Source + otelDoc := otelDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // Expected to change between agentDocs and OtelDocs + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "log.file.inode", + "log.file.path", + } + + assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") + assertMonitoring(t) +} + +func writeEventsToLogFile(t *testing.T, filename string, numEvents int) { + t.Helper() + logFile, err := os.Create(filename) + if err != nil { + t.Fatalf("could not create file '%s': %s", filename, err) + } + // write events to log file + for i := 0; i < numEvents; i++ { + msg := fmt.Sprintf("Line %d", i) + _, err = logFile.Write([]byte(msg + "\n")) + require.NoErrorf(t, err, "failed to write line %d to temp file", i) + } + + if err := logFile.Sync(); err != nil { + t.Fatalf("could not sync log file '%s': %s", filename, err) + } + if err := logFile.Close(); err != nil { + t.Fatalf("could not close log file '%s': %s", filename, err) + } +} + +func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) { + t.Helper() + + flatM1 := m1.Flatten() + flatM2 := m2.Flatten() + for _, f := range ignoredFields { + hasKeyM1, _ := flatM1.HasKey(f) + hasKeyM2, _ := flatM2.HasKey(f) + + if !hasKeyM1 && !hasKeyM2 { + assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f) + } + + flatM1.Delete(f) + flatM2.Delete(f) + } + require.Equal(t, "", cmp.Diff(flatM1, flatM2), "expected maps to be equal") +} + +func assertMonitoring(t *testing.T) { + r, err := http.Get("http://localhost:5066") //nolint:noctx,bodyclose // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") + + r, err = http.Get("http://localhost:5066/stats") //nolint:noctx,bodyclose // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") + + r, err = http.Get("http://localhost:5066/not-exist") //nolint:noctx,bodyclose // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") +} From e37dbead3faf231680cdc61cbacb85223403902e Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 17 Jun 2025 13:22:17 +0530 Subject: [PATCH 26/32] fix pipeline step --- .buildkite/x-pack/pipeline.xpack.filebeat.yml | 3 +- .../filebeat/tests/integration/otel_test.go | 194 ------------------ .../tests/integration/oteltest_framework.go | 179 ---------------- 3 files changed, 2 insertions(+), 374 deletions(-) delete mode 100644 x-pack/filebeat/tests/integration/otel_test.go delete mode 100644 x-pack/libbeat/tests/integration/oteltest_framework.go diff --git a/.buildkite/x-pack/pipeline.xpack.filebeat.yml b/.buildkite/x-pack/pipeline.xpack.filebeat.yml index af7b00d2e0c0..fb0fe1c13716 100644 --- a/.buildkite/x-pack/pipeline.xpack.filebeat.yml +++ b/.buildkite/x-pack/pipeline.xpack.filebeat.yml @@ -181,10 +181,11 @@ steps: context: "x-pack/filebeat: Go Integration Tests" - label: ":ubuntu: x-pack/filebeat: OTel Go Integration Tests" - key: "x-pack-filebeat-mandatory-int-test" + key: "x-pack-filebeat-mandatory-otel-int-test" ## TODO: add mage command command: | cd x-pack/filebeat + mage docker:composeUp go test -c -tags otelbeat go test -tags "integration otelbeat" ./tests/oteltest retry: diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go deleted file mode 100644 index 1e10cf919f53..000000000000 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration && !agentbeat - -package integration - -import ( - "context" - "crypto/tls" - "fmt" - "net/http" - "os" - "path/filepath" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/libbeat/tests/integration" - oteltest "github.com/elastic/beats/v7/x-pack/libbeat/tests/integration" - "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/elastic-agent-libs/testing/estools" - "github.com/elastic/go-elasticsearch/v8" -) - -var beatsCfgFile = ` -filebeat.inputs: - - type: filestream - id: filestream-input-id - enabled: true - file_identity.native: ~ - prospector.scanner.fingerprint.enabled: false - paths: - - %s -output: - elasticsearch: - hosts: - - localhost:9200 - protocol: http - username: admin - password: testing - index: %s -path.home: %s -queue.mem.flush.timeout: 0s -processors: - - add_host_metadata: ~ - - add_cloud_metadata: ~ - - add_docker_metadata: ~ - - add_kubernetes_metadata: ~ -http.enabled: true -http.host: localhost -http.port: %d -` - -func TestFilebeatOTelE2E(t *testing.T) { - integration.EnsureESIsRunning(t) - numEvents := 1 - - // Get collector with empty config - col, err := oteltest.NewTestCollector(t, "filebeat", "") - require.NoError(t, err, fmt.Sprintf("could not get new collector due to %v", err)) - // write to a log file - logFilePath := filepath.Join(col.GetTempDir(), "log.log") - writeEventsToLogFile(t, logFilePath, numEvents) - - // start collector - err = col.ReloadCollectorWithConfig(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", col.GetTempDir(), 5066)) - require.NoError(t, err) - - // wait for collector to be ready - col.Wait() - - // start filebeat - filebeat := integration.NewBeat( - t, - "filebeat", - "../../filebeat.test", - ) - logFilePath = filepath.Join(filebeat.TempDir(), "log.log") - writeEventsToLogFile(t, logFilePath, numEvents) - s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", filebeat.TempDir(), 5067) - s = s + ` -setup.template.name: logs-filebeat-default -setup.template.pattern: logs-filebeat-default -` - - filebeat.WriteConfigFile(s) - filebeat.Start() - - // prepare to query ES - esCfg := elasticsearch.Config{ - Addresses: []string{"http://localhost:9200"}, - Username: "admin", - Password: "testing", - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, //nolint:gosec // this is only for testing - }, - }, - } - es, err := elasticsearch.NewClient(esCfg) - require.NoError(t, err) - - var filebeatDocs estools.Documents - var otelDocs estools.Documents - // wait for logs to be published - require.Eventually(t, - func() bool { - findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer findCancel() - - otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*") - require.NoError(t, err) - - filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-filebeat-default*") - require.NoError(t, err) - - t.Logf("otel docs = %d, filebeat docs %d", otelDocs.Hits.Total.Value, filebeatDocs.Hits.Total.Value) - return otelDocs.Hits.Total.Value >= numEvents && filebeatDocs.Hits.Total.Value >= numEvents - }, - 2*time.Minute, 1*time.Second, fmt.Sprintf("Number of hits %d not equal to number of events for %d", filebeatDocs.Hits.Total.Value, numEvents)) - - filebeatDoc := filebeatDocs.Hits.Hits[0].Source - otelDoc := otelDocs.Hits.Hits[0].Source - ignoredFields := []string{ - // Expected to change between agentDocs and OtelDocs - "@timestamp", - "agent.ephemeral_id", - "agent.id", - "log.file.inode", - "log.file.path", - } - - assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") - assertMonitoring(t) -} - -func writeEventsToLogFile(t *testing.T, filename string, numEvents int) { - t.Helper() - logFile, err := os.Create(filename) - if err != nil { - t.Fatalf("could not create file '%s': %s", filename, err) - } - // write events to log file - for i := 0; i < numEvents; i++ { - msg := fmt.Sprintf("Line %d", i) - _, err = logFile.Write([]byte(msg + "\n")) - require.NoErrorf(t, err, "failed to write line %d to temp file", i) - } - - if err := logFile.Sync(); err != nil { - t.Fatalf("could not sync log file '%s': %s", filename, err) - } - if err := logFile.Close(); err != nil { - t.Fatalf("could not close log file '%s': %s", filename, err) - } -} - -func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) { - t.Helper() - - flatM1 := m1.Flatten() - flatM2 := m2.Flatten() - for _, f := range ignoredFields { - hasKeyM1, _ := flatM1.HasKey(f) - hasKeyM2, _ := flatM2.HasKey(f) - - if !hasKeyM1 && !hasKeyM2 { - assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f) - } - - flatM1.Delete(f) - flatM2.Delete(f) - } - require.Equal(t, "", cmp.Diff(flatM1, flatM2), "expected maps to be equal") -} - -func assertMonitoring(t *testing.T) { - r, err := http.Get("http://localhost:5066") //nolint:noctx,bodyclose // fine for tests - require.NoError(t, err) - require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") - - r, err = http.Get("http://localhost:5066/stats") //nolint:noctx,bodyclose // fine for tests - require.NoError(t, err) - require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") - - r, err = http.Get("http://localhost:5066/not-exist") //nolint:noctx,bodyclose // fine for tests - require.NoError(t, err) - require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") -} diff --git a/x-pack/libbeat/tests/integration/oteltest_framework.go b/x-pack/libbeat/tests/integration/oteltest_framework.go deleted file mode 100644 index bc1c274c4910..000000000000 --- a/x-pack/libbeat/tests/integration/oteltest_framework.go +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration && !agentbeat - -package integration - -import ( - "fmt" - "os" - "path/filepath" - "sync" - "testing" - "time" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/confmap" - "go.opentelemetry.io/collector/otelcol" - - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest" - "go.uber.org/zap/zaptest/observer" - - "github.com/elastic/beats/v7/libbeat/otelbeat/beatconverter" - "github.com/elastic/beats/v7/libbeat/otelbeat/providers/fbprovider" - "github.com/elastic/beats/v7/libbeat/tests/integration" - "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat" -) - -var schemeMap = map[string]string{ - "filebeat": "fb", -} - -type TestCollector struct { - t *testing.T - tempDir string - otelcol *otelcol.Collector - wg sync.WaitGroup - configFile string - beatname string - observedLogs *observer.ObservedLogs -} - -// NewTestCollector configures and returns an instance of otel collector intended for testing only -func NewTestCollector(t *testing.T, beatname string, config string) (*TestCollector, error) { - // create a temp dir at beat/build/integration-tests - tempDir := integration.CreateTempDir(t) - - // create a configfile - configFile := filepath.Join(tempDir, beatname+".yml") - - testCol := &TestCollector{ - t: t, - tempDir: tempDir, - wg: sync.WaitGroup{}, - configFile: configFile, - beatname: beatname, - } - - // write configuration to a file - err := testCol.reloadConfig(config) - return testCol, err -} - -// NewTestStartCollector configures and starts an otel collector intended for testing only -func NewTestStartCollector(t *testing.T, beatname string, config string) (*TestCollector, error) { - otelcol, err := NewTestCollector(t, beatname, config) - if err != nil { - return nil, err - } - err = otelcol.Run() - if err != nil { - return nil, err - } - - return otelcol, err -} - -// reloadConfig reloads configuration with which collector should be started. -// Note: A running collector will not pick the new config until it is stopped and started -func (c *TestCollector) reloadConfig(config string) error { - // write configuration to a file - if err := os.WriteFile(c.configFile, []byte(config), 0o644); err != nil { - return fmt.Errorf("cannot create config file '%s': %s", c.configFile, err) - } - // adds scheme name as prefix to the configfile - beatCfg := schemeMap[c.beatname] + ":" + c.configFile - // get collector settings - set, observedLogs := getCollectorSettings(beatCfg) - - c.observedLogs = observedLogs - // get new collector instance - otelcol, err := otelcol.NewCollector(set) - c.otelcol = otelcol - return err -} - -// ReloadCollectorWithConfig reloads the collector with given config -func (c *TestCollector) ReloadCollectorWithConfig(config string) error { - // shutdown collector if it is running - c.Shutdown() - err := c.reloadConfig(config) - if err != nil { - return err - } - return c.Run() -} - -func (c *TestCollector) GetTempDir() string { - return c.tempDir -} - -// Run starts the otel collector -func (c *TestCollector) Run() error { - - wg := sync.WaitGroup{} - var err error - wg.Add(1) - go func() { - defer wg.Done() - err = c.otelcol.Run(c.t.Context()) - }() - - c.t.Cleanup(func() { - c.Shutdown() - return - }) - - return err -} - -func (c *TestCollector) Shutdown() { - c.otelcol.Shutdown() -} - -// Wait waits for the collector to be ready -func (c *TestCollector) Wait() { - require.Eventually(c.t, func() bool { - return c.observedLogs.FilterMessage("Everything is ready. Begin running and processing data.").Len() > 0 - }, 10*time.Second, 100*time.Millisecond) -} - -func getCollectorSettings(filename string) (otelcol.CollectorSettings, *observer.ObservedLogs) { - // initialize collector settings - info := component.BuildInfo{ - Command: "otel-test", - Description: "Beats OTel", - Version: "9.1.0", - } - - zapCore := zapcore.NewCore( - zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), - &zaptest.Discarder{}, - zapcore.DebugLevel, - ) - observed, zapLogs := observer.New(zapcore.DebugLevel) - - return otelcol.CollectorSettings{ - BuildInfo: info, - Factories: otelbeat.GetComponent, - ConfigProviderSettings: otelcol.ConfigProviderSettings{ - ResolverSettings: confmap.ResolverSettings{ - URIs: []string{filename}, - ProviderFactories: []confmap.ProviderFactory{ - fbprovider.NewFactory(), - }, - ConverterFactories: []confmap.ConverterFactory{ - beatconverter.NewFactory(), - }, - }, - }, - LoggingOptions: []zap.Option{zap.WrapCore(func(zapcore.Core) zapcore.Core { - return zapcore.NewTee(zapCore, observed) - })}, - }, zapLogs -} From c42990a2f5c7b663b19da99492a7183345a9afbd Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 17 Jun 2025 14:10:38 +0530 Subject: [PATCH 27/32] remove unnecessary code --- libbeat/tests/integration/framework.go | 38 +++----------------------- x-pack/libbeat/common/otelbeat/otel.go | 4 +-- 2 files changed, 6 insertions(+), 36 deletions(-) diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 9f9d00fe2a99..86c3b868e9c9 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -23,7 +23,6 @@ import ( "bufio" "bytes" "context" - "crypto/tls" "encoding/json" "errors" "fmt" @@ -45,8 +44,6 @@ import ( "github.com/gofrs/uuid/v5" "github.com/stretchr/testify/require" - - "github.com/elastic/go-elasticsearch/v8" ) type BeatProc struct { @@ -106,7 +103,7 @@ type Total struct { // `args` will be passed as CLI arguments to the Beat func NewBeat(t *testing.T, beatName, binary string, args ...string) *BeatProc { require.FileExistsf(t, binary, "beat binary must exists") - tempDir := CreateTempDir(t) + tempDir := createTempDir(t) configFile := filepath.Join(tempDir, beatName+".yml") stdoutFile, err := os.Create(filepath.Join(tempDir, "stdout")) @@ -145,7 +142,7 @@ func NewBeat(t *testing.T, beatName, binary string, args ...string) *BeatProc { // See `NewBeat` for options and information for the parameters. func NewAgentBeat(t *testing.T, beatName, binary string, args ...string) *BeatProc { require.FileExistsf(t, binary, "agentbeat binary must exists") - tempDir := CreateTempDir(t) + tempDir := createTempDir(t) configFile := filepath.Join(tempDir, beatName+".yml") stdoutFile, err := os.Create(filepath.Join(tempDir, "stdout")) @@ -651,8 +648,7 @@ func (b *BeatProc) openEventLogFile() *os.File { // // If the tests are run with -v, the temporary directory will // be logged. -func CreateTempDir(t *testing.T) string { - t.Helper() +func createTempDir(t *testing.T) string { rootDir, err := filepath.Abs("../../build/integration-tests") if err != nil { t.Fatalf("failed to determine absolute path for temp dir: %s", err) @@ -669,7 +665,7 @@ func CreateTempDir(t *testing.T) string { cleanup := func() { if !t.Failed() { if err := os.RemoveAll(tempDir); err != nil { - // Ugly workaround Windows limitations + // Ungly workaround Windows limitations // Windows does not support the Interrup signal, so it might // happen that Filebeat is still running, keeping it's registry // file open, thus preventing the temporary folder from being @@ -694,7 +690,6 @@ func CreateTempDir(t *testing.T) string { // using the default test credentials or the corresponding environment // variables. func EnsureESIsRunning(t *testing.T) { - t.Helper() esURL := GetESURL(t, "http") ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(500*time.Second)) @@ -721,31 +716,6 @@ func EnsureESIsRunning(t *testing.T) { } } -func GetESClient(t *testing.T, scheme string) *elasticsearch.Client { - esURL := GetESURL(t, scheme) - - u := esURL.User.Username() - p, _ := esURL.User.Password() - - // prepare to query ES - esCfg := elasticsearch.Config{ - Addresses: []string{fmt.Sprintf("%s://%s", esURL.Scheme, esURL.Host)}, - Username: u, - Password: p, - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, //nolint:gosec // this is only for testing - }, - }, - } - es, err := elasticsearch.NewClient(esCfg) - if err != nil { - t.Fatalf("cannot create Elasticsearch client: %s", err) - } - - return es -} - func (b *BeatProc) FileContains(filename string, match string) string { file, err := os.Open(filename) require.NoErrorf(b.t, err, "error opening: %s", filename) diff --git a/x-pack/libbeat/common/otelbeat/otel.go b/x-pack/libbeat/common/otelbeat/otel.go index 153ae8319258..f338584cde9a 100644 --- a/x-pack/libbeat/common/otelbeat/otel.go +++ b/x-pack/libbeat/common/otelbeat/otel.go @@ -53,7 +53,7 @@ func OTelCmd(beatname string) *cobra.Command { } // Component initializes collector components -func GetComponent() (otelcol.Factories, error) { +func getComponent() (otelcol.Factories, error) { receivers, err := otelcol.MakeFactoryMap( fbreceiver.NewFactory(), ) @@ -86,7 +86,7 @@ func getCollectorSettings(filename string) otelcol.CollectorSettings { return otelcol.CollectorSettings{ BuildInfo: info, - Factories: GetComponent, + Factories: getComponent, ConfigProviderSettings: otelcol.ConfigProviderSettings{ ResolverSettings: confmap.ResolverSettings{ URIs: []string{filename}, From 517b486f2cb2a486798c0b92cba8fe051eeb4285 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 17 Jun 2025 16:51:24 +0530 Subject: [PATCH 28/32] add mage command --- .buildkite/x-pack/pipeline.xpack.filebeat.yml | 5 +---- dev-tools/mage/gotest.go | 18 ++++++++++++++++++ x-pack/filebeat/magefile.go | 7 +++++++ 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/.buildkite/x-pack/pipeline.xpack.filebeat.yml b/.buildkite/x-pack/pipeline.xpack.filebeat.yml index fb0fe1c13716..be40eb92ca8b 100644 --- a/.buildkite/x-pack/pipeline.xpack.filebeat.yml +++ b/.buildkite/x-pack/pipeline.xpack.filebeat.yml @@ -182,12 +182,9 @@ steps: - label: ":ubuntu: x-pack/filebeat: OTel Go Integration Tests" key: "x-pack-filebeat-mandatory-otel-int-test" -## TODO: add mage command command: | cd x-pack/filebeat - mage docker:composeUp - go test -c -tags otelbeat - go test -tags "integration otelbeat" ./tests/oteltest + mage goOTelIntegTest retry: automatic: - limit: 1 diff --git a/dev-tools/mage/gotest.go b/dev-tools/mage/gotest.go index 6d6590a1c31b..dc9f34304fcf 100644 --- a/dev-tools/mage/gotest.go +++ b/dev-tools/mage/gotest.go @@ -184,6 +184,17 @@ func DefaultGoTestIntegrationFromHostArgs() GoTestArgs { return args } +// DefaultGoTestIntegrationFromHostArgs returns a default set of arguments for running +// all integration tests from the host system (outside the docker network). +func DefaultOTelIntegrationFromHostArgs() GoTestArgs { + args := DefaultGoTestIntegrationArgs() + // overwrite package path + args.Packages = []string{"./tests/oteltest"} + args.Tags = append(args.Tags, "otelbeat") + args.Env = WithGoIntegTestHostEnv(args.Env) + return args +} + // FIPSOnlyGoTestIngrationFromHostArgs returns a default set of arguments for running // all integration tests from the host system (outside the docker network) along // with the GODEBUG=fips140=only arg set. @@ -471,6 +482,13 @@ func BuildSystemTestBinary() error { return BuildSystemTestGoBinary(DefaultTestBinaryArgs()) } +// BuildSystemTestOTelBinary builds beat binary that includes otel. +func BuildSystemTestOTelBinary() error { + args := DefaultTestBinaryArgs() + args.ExtraFlags = []string{"-tags", "otelbeat"} + return BuildSystemTestGoBinary(args) +} + // BuildSystemTestGoBinary build a binary for testing that is instrumented for // testing and measuring code coverage. The binary is only instrumented for // coverage when TEST_COVERAGE=true (default is false). diff --git a/x-pack/filebeat/magefile.go b/x-pack/filebeat/magefile.go index cd1cff476781..b7c2e50c66e6 100644 --- a/x-pack/filebeat/magefile.go +++ b/x-pack/filebeat/magefile.go @@ -172,6 +172,13 @@ func GoIntegTest(ctx context.Context) error { return devtools.GoIntegTestFromHost(ctx, devtools.DefaultGoTestIntegrationFromHostArgs()) } +// GoOTelIntegTest starts the docker containers and executes OTel integration tests. +func GoOTelIntegTest(ctx context.Context) error { + // build otel binary + devtools.BuildSystemTestOTelBinary() + return devtools.GoIntegTestFromHost(ctx, devtools.DefaultOTelIntegrationFromHostArgs()) +} + // GoFIPSOnlyIntegTest starts the docker containers and executes the Go integration tests with GODEBUG=fips140=only set. func GoFIPSOnlyIntegTest(ctx context.Context) error { mg.Deps(BuildSystemTestBinary) From d4c9204b50854fe7ec8f8049fd00b5ab92e88f2a Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 17 Jun 2025 16:59:18 +0530 Subject: [PATCH 29/32] add otel integ test --- .buildkite/x-pack/pipeline.xpack.filebeat.yml | 2 +- x-pack/filebeat/magefile.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.buildkite/x-pack/pipeline.xpack.filebeat.yml b/.buildkite/x-pack/pipeline.xpack.filebeat.yml index be40eb92ca8b..3f23dc5df2cc 100644 --- a/.buildkite/x-pack/pipeline.xpack.filebeat.yml +++ b/.buildkite/x-pack/pipeline.xpack.filebeat.yml @@ -184,7 +184,7 @@ steps: key: "x-pack-filebeat-mandatory-otel-int-test" command: | cd x-pack/filebeat - mage goOTelIntegTest + mage otelIntegTest retry: automatic: - limit: 1 diff --git a/x-pack/filebeat/magefile.go b/x-pack/filebeat/magefile.go index b7c2e50c66e6..7e06c0709ada 100644 --- a/x-pack/filebeat/magefile.go +++ b/x-pack/filebeat/magefile.go @@ -172,8 +172,8 @@ func GoIntegTest(ctx context.Context) error { return devtools.GoIntegTestFromHost(ctx, devtools.DefaultGoTestIntegrationFromHostArgs()) } -// GoOTelIntegTest starts the docker containers and executes OTel integration tests. -func GoOTelIntegTest(ctx context.Context) error { +// OTelIntegTest starts the docker containers and executes OTel integration tests. +func OtelIntegTest(ctx context.Context) error { // build otel binary devtools.BuildSystemTestOTelBinary() return devtools.GoIntegTestFromHost(ctx, devtools.DefaultOTelIntegrationFromHostArgs()) From 6c52aa66dae6dcfd9e0f561f2529a4c1eebca4ff Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Tue, 17 Jun 2025 17:24:02 +0530 Subject: [PATCH 30/32] Update .buildkite/x-pack/pipeline.xpack.filebeat.yml Co-authored-by: Victor Martinez --- .buildkite/x-pack/pipeline.xpack.filebeat.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.buildkite/x-pack/pipeline.xpack.filebeat.yml b/.buildkite/x-pack/pipeline.xpack.filebeat.yml index 3f23dc5df2cc..dcbcb8914a91 100644 --- a/.buildkite/x-pack/pipeline.xpack.filebeat.yml +++ b/.buildkite/x-pack/pipeline.xpack.filebeat.yml @@ -181,7 +181,6 @@ steps: context: "x-pack/filebeat: Go Integration Tests" - label: ":ubuntu: x-pack/filebeat: OTel Go Integration Tests" - key: "x-pack-filebeat-mandatory-otel-int-test" command: | cd x-pack/filebeat mage otelIntegTest From 64a035af4ddd13adfb09a85c7b3e375646d0b19d Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 18 Jun 2025 10:19:28 +0530 Subject: [PATCH 31/32] Remove extra stage - compile otel binary with integ test --- .buildkite/x-pack/pipeline.xpack.filebeat.yml | 27 ------------------- dev-tools/mage/gotest.go | 11 -------- x-pack/filebeat/magefile.go | 17 ++++++------ .../otel_test.go} | 2 +- 4 files changed, 10 insertions(+), 47 deletions(-) rename x-pack/filebeat/tests/{oteltest/filestream_test.go => integration/otel_test.go} (99%) diff --git a/.buildkite/x-pack/pipeline.xpack.filebeat.yml b/.buildkite/x-pack/pipeline.xpack.filebeat.yml index dcbcb8914a91..52790806cf41 100644 --- a/.buildkite/x-pack/pipeline.xpack.filebeat.yml +++ b/.buildkite/x-pack/pipeline.xpack.filebeat.yml @@ -180,33 +180,6 @@ steps: - github_commit_status: context: "x-pack/filebeat: Go Integration Tests" - - label: ":ubuntu: x-pack/filebeat: OTel Go Integration Tests" - command: | - cd x-pack/filebeat - mage otelIntegTest - retry: - automatic: - - limit: 1 - agents: - provider: "gcp" - image: "${IMAGE_UBUNTU_X86_64}" - machineType: "${GCP_DEFAULT_MACHINE_TYPE}" - artifact_paths: - - "x-pack/filebeat/build/*.xml" - - "x-pack/filebeat/build/*.json" - - "x-pack/filebeat/build/integration-tests/*" - - "x-pack/filebeat/build/integration-tests/Test*/*" - - "x-pack/filebeat/build/integration-tests/Test*/data/**/*" - plugins: - - test-collector#v1.10.2: - files: "x-pack/filebeat/build/TEST-*.xml" - format: "junit" - branches: "main" - debug: true - notify: - - github_commit_status: - context: "x-pack/filebeat: OTel Go Integration Tests" - - label: ":ubuntu: x-pack/filebeat: Go fips140=only Integration Tests" command: | cd x-pack/filebeat diff --git a/dev-tools/mage/gotest.go b/dev-tools/mage/gotest.go index dc9f34304fcf..de7db7e3825a 100644 --- a/dev-tools/mage/gotest.go +++ b/dev-tools/mage/gotest.go @@ -184,17 +184,6 @@ func DefaultGoTestIntegrationFromHostArgs() GoTestArgs { return args } -// DefaultGoTestIntegrationFromHostArgs returns a default set of arguments for running -// all integration tests from the host system (outside the docker network). -func DefaultOTelIntegrationFromHostArgs() GoTestArgs { - args := DefaultGoTestIntegrationArgs() - // overwrite package path - args.Packages = []string{"./tests/oteltest"} - args.Tags = append(args.Tags, "otelbeat") - args.Env = WithGoIntegTestHostEnv(args.Env) - return args -} - // FIPSOnlyGoTestIngrationFromHostArgs returns a default set of arguments for running // all integration tests from the host system (outside the docker network) along // with the GODEBUG=fips140=only arg set. diff --git a/x-pack/filebeat/magefile.go b/x-pack/filebeat/magefile.go index 7e06c0709ada..5a7768fcac24 100644 --- a/x-pack/filebeat/magefile.go +++ b/x-pack/filebeat/magefile.go @@ -43,6 +43,13 @@ func Build() error { return devtools.Build(devtools.DefaultBuildArgs()) } +// BuildOTel builds the Beat binary with OTel sub command +func BuildOTel() error { + args := devtools.DefaultBuildArgs() + args.ExtraFlags = append(args.ExtraFlags, "-tags", "otelbeat") + return devtools.Build(args) +} + // BuildSystemTestBinary builds a binary instrumented for use with Python system tests. func BuildSystemTestBinary() error { return devtools.BuildSystemTestBinary() @@ -168,15 +175,9 @@ func IntegTest() { // GoIntegTest starts the docker containers and executes the Go integration tests. func GoIntegTest(ctx context.Context) error { - mg.Deps(BuildSystemTestBinary) - return devtools.GoIntegTestFromHost(ctx, devtools.DefaultGoTestIntegrationFromHostArgs()) -} - -// OTelIntegTest starts the docker containers and executes OTel integration tests. -func OtelIntegTest(ctx context.Context) error { - // build otel binary + // build integration test binary with otel sub command devtools.BuildSystemTestOTelBinary() - return devtools.GoIntegTestFromHost(ctx, devtools.DefaultOTelIntegrationFromHostArgs()) + return devtools.GoIntegTestFromHost(ctx, devtools.DefaultGoTestIntegrationFromHostArgs()) } // GoFIPSOnlyIntegTest starts the docker containers and executes the Go integration tests with GODEBUG=fips140=only set. diff --git a/x-pack/filebeat/tests/oteltest/filestream_test.go b/x-pack/filebeat/tests/integration/otel_test.go similarity index 99% rename from x-pack/filebeat/tests/oteltest/filestream_test.go rename to x-pack/filebeat/tests/integration/otel_test.go index 9644dcf2cfd0..8aa997c2db32 100644 --- a/x-pack/filebeat/tests/oteltest/filestream_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build integration && otelbeat +//go:build integration && !agentbeat package integration From 38202d57be4738aa80ac323283b4d9d91fd69b13 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 18 Jun 2025 10:25:46 +0530 Subject: [PATCH 32/32] add otel test binary for fips too --- x-pack/filebeat/magefile.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/magefile.go b/x-pack/filebeat/magefile.go index 5a7768fcac24..91fb608c31d5 100644 --- a/x-pack/filebeat/magefile.go +++ b/x-pack/filebeat/magefile.go @@ -182,7 +182,8 @@ func GoIntegTest(ctx context.Context) error { // GoFIPSOnlyIntegTest starts the docker containers and executes the Go integration tests with GODEBUG=fips140=only set. func GoFIPSOnlyIntegTest(ctx context.Context) error { - mg.Deps(BuildSystemTestBinary) + // build integration test binary with otel sub command + devtools.BuildSystemTestOTelBinary() return devtools.GoIntegTestFromHost(ctx, devtools.FIPSOnlyGoTestIntegrationFromHostArgs()) }