From 5043e5e8834d82f688de26a8a2b0262fc6bcf55c Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 11 Jul 2025 08:15:20 -0300 Subject: [PATCH] otel: update filebeat integration tests to run in isolation (#45314) As a leftover from https://github.com/elastic/beats/pull/45093, the tests added reuse the index names which may cause issues when running the tests in sequence without cleaning up first. Adjusted the existing tests to use unique indexes. (cherry picked from commit ea98564a4943f61ae99597a43973ad00867fce13) # Conflicts: # x-pack/filebeat/tests/integration/otel_test.go --- .../filebeat/tests/integration/otel_test.go | 324 +++++++++++++++++- 1 file changed, 321 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 8aa997c2db32..237b3eca8c8e 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -54,10 +54,16 @@ http.host: localhost http.port: %d ` +<<<<<<< HEAD func TestFilebeatOTelE2E(t *testing.T) { integration.EnsureESIsRunning(t) numEvents := 1 +======= + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + fbOtelIndex := "logs-integration-" + namespace + fbIndex := "logs-filebeat-" + namespace +>>>>>>> ea98564a4 (otel: update filebeat integration tests to run in isolation (#45314)) // start filebeat in otel mode filebeatOTel := integration.NewBeat( t, @@ -67,7 +73,7 @@ func TestFilebeatOTelE2E(t *testing.T) { ) logFilePath := filepath.Join(filebeatOTel.TempDir(), "log.log") - filebeatOTel.WriteConfigFile(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", 5066)) + filebeatOTel.WriteConfigFile(fmt.Sprintf(beatsCfgFile, logFilePath, fbOtelIndex, 5066)) writeEventsToLogFile(t, logFilePath, numEvents) filebeatOTel.Start() @@ -79,11 +85,15 @@ func TestFilebeatOTelE2E(t *testing.T) { ) logFilePath = filepath.Join(filebeat.TempDir(), "log.log") writeEventsToLogFile(t, logFilePath, numEvents) +<<<<<<< HEAD s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", 5067) s = s + ` setup.template.name: logs-filebeat-default setup.template.pattern: logs-filebeat-default ` +======= + s := fmt.Sprintf(beatsCfgFile, logFilePath, fbIndex, 5067) +>>>>>>> ea98564a4 (otel: update filebeat integration tests to run in isolation (#45314)) filebeat.WriteConfigFile(s) filebeat.Start() @@ -110,10 +120,10 @@ setup.template.pattern: logs-filebeat-default findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) defer findCancel() - otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*") + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+fbOtelIndex+"*") require.NoError(t, err) - filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-filebeat-default*") + filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+fbIndex+"*") require.NoError(t, err) return otelDocs.Hits.Total.Value >= numEvents && filebeatDocs.Hits.Total.Value >= numEvents @@ -188,3 +198,311 @@ func assertMonitoring(t *testing.T) { require.NoError(t, err) require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") } +<<<<<<< HEAD +======= + +func TestFilebeatOTelReceiverE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + wantEvents := 1 + + // start filebeat in otel mode + filebeatOTel := integration.NewBeat( + t, + "filebeat-otel", + "../../filebeat.test", + "otel", + ) + + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + fbReceiverIndex := "logs-integration-" + namespace + filebeatIndex := "logs-filebeat-" + namespace + + otelConfig := struct { + Index string + MonitoringPort int + InputFile string + PathHome string + }{ + Index: fbReceiverIndex, + MonitoringPort: 5066, + InputFile: filepath.Join(filebeatOTel.TempDir(), "log.log"), + PathHome: filebeatOTel.TempDir(), + } + + cfg := `receivers: + filebeatreceiver: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - {{.InputFile}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + queue.mem.flush.timeout: 0s + path.home: {{.PathHome}} + http.enabled: true + http.host: localhost + http.port: {{.MonitoringPort}} +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - http://localhost:9200 + compression: none + user: admin + password: testing + logs_index: {{.Index}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: + - filebeatreceiver + exporters: + - elasticsearch/log + - debug +` + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig)) + configContents := configBuffer.Bytes() + t.Cleanup(func() { + if t.Failed() { + t.Logf("Config contents:\n%s", configContents) + } + }) + + filebeatOTel.WriteConfigFile(string(configContents)) + writeEventsToLogFile(t, otelConfig.InputFile, wantEvents) + filebeatOTel.Start() + defer filebeatOTel.Stop() + + // start filebeat + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + beatsCfgFile := ` +filebeat.inputs: + - type: filestream + id: filestream-input-id + enabled: true + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false + paths: + - %s +output: + elasticsearch: + hosts: + - localhost:9200 + username: admin + password: testing + index: %s +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +setup.template.name: logs-filebeat-default +setup.template.pattern: logs-filebeat-default +http.enabled: true +http.host: localhost +http.port: %d +` + logFilePath := filepath.Join(filebeat.TempDir(), "log.log") + writeEventsToLogFile(t, logFilePath, wantEvents) + s := fmt.Sprintf(beatsCfgFile, logFilePath, filebeatIndex, 5067) + filebeat.WriteConfigFile(s) + filebeat.Start() + defer filebeat.Stop() + + es := integration.GetESClient(t, "http") + + var filebeatDocs estools.Documents + var otelDocs estools.Documents + var err error + + // wait for logs to be published + require.Eventuallyf(t, + func() bool { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+fbReceiverIndex+"*") + require.NoError(t, err) + + filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+filebeatIndex+"*") + require.NoError(t, err) + + return otelDocs.Hits.Total.Value >= wantEvents && filebeatDocs.Hits.Total.Value >= wantEvents + }, + 2*time.Minute, 1*time.Second, "expected at least %d events, got filebeat: %d and otel: %d", wantEvents, filebeatDocs.Hits.Total.Value, otelDocs.Hits.Total.Value) + + filebeatDoc := filebeatDocs.Hits.Hits[0].Source + otelDoc := otelDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // Expected to change between agentDocs and OtelDocs + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "log.file.inode", + "log.file.path", + } + + assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") + assertMonitoring(t, otelConfig.MonitoringPort) + assertMonitoring(t, 5067) // filebeat +} + +func TestFilebeatOTelMultipleReceiversE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + wantEvents := 100 + + // start filebeat in otel mode + filebeatOTel := integration.NewBeat( + t, + "filebeat-otel", + "../../filebeat.test", + "otel", + ) + + // write events to log file + logFilePath := filepath.Join(filebeatOTel.TempDir(), "log.log") + writeEventsToLogFile(t, logFilePath, wantEvents) + + type receiverConfig struct { + MonitoringPort int + InputFile string + PathHome string + } + + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + otelConfig := struct { + Index string + Receivers []receiverConfig + }{ + Index: "logs-integration-" + namespace, + Receivers: []receiverConfig{ + { + MonitoringPort: 5066, + InputFile: logFilePath, + PathHome: filepath.Join(filebeatOTel.TempDir(), "r1"), + }, + { + MonitoringPort: 5067, + InputFile: logFilePath, + PathHome: filepath.Join(filebeatOTel.TempDir(), "r2"), + }, + }, + } + + cfg := `receivers: +{{range $i, $receiver := .Receivers}} + filebeatreceiver/{{$i}}: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - {{$receiver.InputFile}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + queue.mem.flush.timeout: 0s + path.home: {{$receiver.PathHome}} +{{if $receiver.MonitoringPort}} + http.enabled: true + http.host: localhost + http.port: {{$receiver.MonitoringPort}} +{{end}} +{{end}} +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - http://localhost:9200 + compression: none + user: admin + password: testing + logs_index: {{.Index}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: +{{range $i, $receiver := .Receivers}} + - filebeatreceiver/{{$i}} +{{end}} + exporters: + - debug + - elasticsearch/log +` + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig)) + configContents := configBuffer.Bytes() + + t.Cleanup(func() { + if t.Failed() { + t.Logf("Config contents:\n%s", configContents) + } + }) + + filebeatOTel.WriteConfigFile(string(configContents)) + writeEventsToLogFile(t, logFilePath, wantEvents) + filebeatOTel.Start() + defer filebeatOTel.Stop() + + es := integration.GetESClient(t, "http") + + var otelDocs estools.Documents + var err error + + // wait for logs to be published + wantTotalLogs := wantEvents * len(otelConfig.Receivers) + require.Eventuallyf(t, + func() bool { + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+otelConfig.Index+"*") + require.NoError(t, err) + + return otelDocs.Hits.Total.Value >= wantTotalLogs + }, + 2*time.Minute, 100*time.Millisecond, "expected at least %d events, got %d", wantTotalLogs, otelDocs.Hits.Total.Value) + for _, rec := range otelConfig.Receivers { + assertMonitoring(t, rec.MonitoringPort) + } +} +>>>>>>> ea98564a4 (otel: update filebeat integration tests to run in isolation (#45314))