diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 8aa997c2db32..237b3eca8c8e 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -54,10 +54,16 @@ http.host: localhost http.port: %d ` +<<<<<<< HEAD func TestFilebeatOTelE2E(t *testing.T) { integration.EnsureESIsRunning(t) numEvents := 1 +======= + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + fbOtelIndex := "logs-integration-" + namespace + fbIndex := "logs-filebeat-" + namespace +>>>>>>> ea98564a4 (otel: update filebeat integration tests to run in isolation (#45314)) // start filebeat in otel mode filebeatOTel := integration.NewBeat( t, @@ -67,7 +73,7 @@ func TestFilebeatOTelE2E(t *testing.T) { ) logFilePath := filepath.Join(filebeatOTel.TempDir(), "log.log") - filebeatOTel.WriteConfigFile(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", 5066)) + filebeatOTel.WriteConfigFile(fmt.Sprintf(beatsCfgFile, logFilePath, fbOtelIndex, 5066)) writeEventsToLogFile(t, logFilePath, numEvents) filebeatOTel.Start() @@ -79,11 +85,15 @@ func TestFilebeatOTelE2E(t *testing.T) { ) logFilePath = filepath.Join(filebeat.TempDir(), "log.log") writeEventsToLogFile(t, logFilePath, numEvents) +<<<<<<< HEAD s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", 5067) s = s + ` setup.template.name: logs-filebeat-default setup.template.pattern: logs-filebeat-default ` +======= + s := fmt.Sprintf(beatsCfgFile, logFilePath, fbIndex, 5067) +>>>>>>> ea98564a4 (otel: update filebeat integration tests to run in isolation (#45314)) filebeat.WriteConfigFile(s) filebeat.Start() @@ -110,10 +120,10 @@ setup.template.pattern: logs-filebeat-default findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) defer findCancel() - otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*") + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+fbOtelIndex+"*") require.NoError(t, err) - filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-filebeat-default*") + filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+fbIndex+"*") require.NoError(t, err) return otelDocs.Hits.Total.Value >= numEvents && filebeatDocs.Hits.Total.Value >= numEvents @@ -188,3 +198,311 @@ func assertMonitoring(t *testing.T) { require.NoError(t, err) require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") } +<<<<<<< HEAD +======= + +func TestFilebeatOTelReceiverE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + wantEvents := 1 + + // start filebeat in otel mode + filebeatOTel := integration.NewBeat( + t, + "filebeat-otel", + "../../filebeat.test", + "otel", + ) + + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + fbReceiverIndex := "logs-integration-" + namespace + filebeatIndex := "logs-filebeat-" + namespace + + otelConfig := struct { + Index string + MonitoringPort int + InputFile string + PathHome string + }{ + Index: fbReceiverIndex, + MonitoringPort: 5066, + InputFile: filepath.Join(filebeatOTel.TempDir(), "log.log"), + PathHome: filebeatOTel.TempDir(), + } + + cfg := `receivers: + filebeatreceiver: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - {{.InputFile}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + queue.mem.flush.timeout: 0s + path.home: {{.PathHome}} + http.enabled: true + http.host: localhost + http.port: {{.MonitoringPort}} +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - http://localhost:9200 + compression: none + user: admin + password: testing + logs_index: {{.Index}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: + - filebeatreceiver + exporters: + - elasticsearch/log + - debug +` + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig)) + configContents := configBuffer.Bytes() + t.Cleanup(func() { + if t.Failed() { + t.Logf("Config contents:\n%s", configContents) + } + }) + + filebeatOTel.WriteConfigFile(string(configContents)) + writeEventsToLogFile(t, otelConfig.InputFile, wantEvents) + filebeatOTel.Start() + defer filebeatOTel.Stop() + + // start filebeat + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + beatsCfgFile := ` +filebeat.inputs: + - type: filestream + id: filestream-input-id + enabled: true + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false + paths: + - %s +output: + elasticsearch: + hosts: + - localhost:9200 + username: admin + password: testing + index: %s +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +setup.template.name: logs-filebeat-default +setup.template.pattern: logs-filebeat-default +http.enabled: true +http.host: localhost +http.port: %d +` + logFilePath := filepath.Join(filebeat.TempDir(), "log.log") + writeEventsToLogFile(t, logFilePath, wantEvents) + s := fmt.Sprintf(beatsCfgFile, logFilePath, filebeatIndex, 5067) + filebeat.WriteConfigFile(s) + filebeat.Start() + defer filebeat.Stop() + + es := integration.GetESClient(t, "http") + + var filebeatDocs estools.Documents + var otelDocs estools.Documents + var err error + + // wait for logs to be published + require.Eventuallyf(t, + func() bool { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+fbReceiverIndex+"*") + require.NoError(t, err) + + filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+filebeatIndex+"*") + require.NoError(t, err) + + return otelDocs.Hits.Total.Value >= wantEvents && filebeatDocs.Hits.Total.Value >= wantEvents + }, + 2*time.Minute, 1*time.Second, "expected at least %d events, got filebeat: %d and otel: %d", wantEvents, filebeatDocs.Hits.Total.Value, otelDocs.Hits.Total.Value) + + filebeatDoc := filebeatDocs.Hits.Hits[0].Source + otelDoc := otelDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // Expected to change between agentDocs and OtelDocs + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "log.file.inode", + "log.file.path", + } + + assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") + assertMonitoring(t, otelConfig.MonitoringPort) + assertMonitoring(t, 5067) // filebeat +} + +func TestFilebeatOTelMultipleReceiversE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + wantEvents := 100 + + // start filebeat in otel mode + filebeatOTel := integration.NewBeat( + t, + "filebeat-otel", + "../../filebeat.test", + "otel", + ) + + // write events to log file + logFilePath := filepath.Join(filebeatOTel.TempDir(), "log.log") + writeEventsToLogFile(t, logFilePath, wantEvents) + + type receiverConfig struct { + MonitoringPort int + InputFile string + PathHome string + } + + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + otelConfig := struct { + Index string + Receivers []receiverConfig + }{ + Index: "logs-integration-" + namespace, + Receivers: []receiverConfig{ + { + MonitoringPort: 5066, + InputFile: logFilePath, + PathHome: filepath.Join(filebeatOTel.TempDir(), "r1"), + }, + { + MonitoringPort: 5067, + InputFile: logFilePath, + PathHome: filepath.Join(filebeatOTel.TempDir(), "r2"), + }, + }, + } + + cfg := `receivers: +{{range $i, $receiver := .Receivers}} + filebeatreceiver/{{$i}}: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - {{$receiver.InputFile}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + queue.mem.flush.timeout: 0s + path.home: {{$receiver.PathHome}} +{{if $receiver.MonitoringPort}} + http.enabled: true + http.host: localhost + http.port: {{$receiver.MonitoringPort}} +{{end}} +{{end}} +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - http://localhost:9200 + compression: none + user: admin + password: testing + logs_index: {{.Index}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: +{{range $i, $receiver := .Receivers}} + - filebeatreceiver/{{$i}} +{{end}} + exporters: + - debug + - elasticsearch/log +` + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig)) + configContents := configBuffer.Bytes() + + t.Cleanup(func() { + if t.Failed() { + t.Logf("Config contents:\n%s", configContents) + } + }) + + filebeatOTel.WriteConfigFile(string(configContents)) + writeEventsToLogFile(t, logFilePath, wantEvents) + filebeatOTel.Start() + defer filebeatOTel.Stop() + + es := integration.GetESClient(t, "http") + + var otelDocs estools.Documents + var err error + + // wait for logs to be published + wantTotalLogs := wantEvents * len(otelConfig.Receivers) + require.Eventuallyf(t, + func() bool { + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+otelConfig.Index+"*") + require.NoError(t, err) + + return otelDocs.Hits.Total.Value >= wantTotalLogs + }, + 2*time.Minute, 100*time.Millisecond, "expected at least %d events, got %d", wantTotalLogs, otelDocs.Hits.Total.Value) + for _, rec := range otelConfig.Receivers { + assertMonitoring(t, rec.MonitoringPort) + } +} +>>>>>>> ea98564a4 (otel: update filebeat integration tests to run in isolation (#45314))