From 6810225cca992a74e418adf34549d709503f2293 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Mon, 28 Jul 2025 10:39:41 -0300 Subject: [PATCH] otel: add mbreceiver E2E test (#45427) * otel: add mbreceiver E2E test * add multiple receivers test * fix message for Eventuallyf (cherry picked from commit 73f8a2bce6ec945dc38b78c457324ddd6a66726b) --- .../metricbeat/tests/integration/otel_test.go | 299 ++++++++++++++++++ 1 file changed, 299 insertions(+) diff --git a/x-pack/metricbeat/tests/integration/otel_test.go b/x-pack/metricbeat/tests/integration/otel_test.go index 49c997ca27ab..66678f764cca 100644 --- a/x-pack/metricbeat/tests/integration/otel_test.go +++ b/x-pack/metricbeat/tests/integration/otel_test.go @@ -11,6 +11,7 @@ import ( "context" "fmt" "net/http" + "path/filepath" "strings" "testing" "text/template" @@ -146,6 +147,304 @@ func assertMonitoring(t *testing.T, port int) { require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") } +func TestMetricbeatOTelReceiverE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + + host := integration.GetESURL(t, "http") + user := host.User.Username() + password, _ := host.User.Password() + + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + + mbReceiverIndex := "logs-integration-mbreceiver-" + namespace + mbIndex := "logs-metricbeat-mb-" + namespace + + type options struct { + Index string + ESURL string + Username string + Password string + } + + cfg := `receivers: + metricbeatreceiver: + metricbeat: + modules: + - module: system + enabled: true + period: 1s + processes: + - '.*' + metricsets: + - cpu + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + queue.mem.flush.timeout: 0s +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - {{.ESURL}} + compression: none + user: {{.Username}} + password: {{.Password}} + logs_index: {{.Index}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: + - metricbeatreceiver + exporters: + - elasticsearch/log + - debug +` + + // start metricbeat in otel mode + metricbeatOTel := integration.NewBeat( + t, + "metricbeat-otel", + "../../metricbeat.test", + "otel", + ) + + var configBuffer bytes.Buffer + require.NoError(t, template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, options{ + Index: mbReceiverIndex, + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + })) + + metricbeatOTel.WriteConfigFile(configBuffer.String()) + metricbeatOTel.Start() + defer metricbeatOTel.Stop() + + var beatsCfgFile = `receivers: +metricbeat: + modules: + - module: system + enabled: true + period: 1s + processes: + - '.*' + metricsets: + - cpu +output: + elasticsearch: + hosts: + - {{ .ESURL }} + username: {{ .Username }} + password: {{ .Password }} + index: {{ .Index }} +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +` + var mbConfigBuffer bytes.Buffer + require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&mbConfigBuffer, options{ + Index: mbIndex, + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + })) + metricbeat := integration.NewBeat(t, "metricbeat", "../../metricbeat.test") + metricbeat.WriteConfigFile(mbConfigBuffer.String()) + metricbeat.Start() + defer metricbeat.Stop() + + // prepare to query ES + es := integration.GetESClient(t, "http") + + var metricbeatDocs estools.Documents + var otelDocs estools.Documents + var err error + + require.Eventuallyf(t, + func() bool { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+mbReceiverIndex+"*") + require.NoError(t, err) + + metricbeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+mbIndex+"*") + require.NoError(t, err) + + return otelDocs.Hits.Total.Value >= 1 && metricbeatDocs.Hits.Total.Value >= 1 + }, + 2*time.Minute, 1*time.Second, "expected at least 1 log") + otelDoc := otelDocs.Hits.Hits[0] + metricbeatDoc := metricbeatDocs.Hits.Hits[0] + assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, []string{}, "expected documents keys to be equal") +} + +func TestMetricbeatOTelMultipleReceiversE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + + host := integration.GetESURL(t, "http") + user := host.User.Username() + password, _ := host.User.Password() + + metricbeatOTel := integration.NewBeat( + t, + "metricbeat-otel", + "../../metricbeat.test", + "otel", + ) + + type receiverConfig struct { + MonitoringPort int + InputFile string + PathHome string + } + + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + otelConfig := struct { + Index string + Username string + Password string + Receivers []receiverConfig + }{ + Index: "logs-integration-" + namespace, + Username: user, + Password: password, + Receivers: []receiverConfig{ + { + MonitoringPort: 5066, + PathHome: filepath.Join(metricbeatOTel.TempDir(), "r1"), + }, + { + MonitoringPort: 5067, + PathHome: filepath.Join(metricbeatOTel.TempDir(), "r2"), + }, + }, + } + + cfg := `receivers: +{{range $i, $receiver := .Receivers}} + metricbeatreceiver/{{$i}}: + metricbeat: + modules: + - module: system + enabled: true + period: 1s + processes: + - '.*' + metricsets: + - cpu + processors: + - add_fields: + target: '' + fields: + receiverid: "{{$i}}" + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + queue.mem.flush.timeout: 0s + path.home: {{$receiver.PathHome}} +{{if $receiver.MonitoringPort}} + http.enabled: true + http.host: localhost + http.port: {{$receiver.MonitoringPort}} +{{end}} +{{end}} +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - http://localhost:9200 + compression: none + user: {{.Username}} + password: {{.Password}} + logs_index: {{.Index}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: +{{range $i, $receiver := .Receivers}} + - metricbeatreceiver/{{$i}} +{{end}} + exporters: + - debug + - elasticsearch/log +` + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig)) + configContents := configBuffer.Bytes() + + t.Cleanup(func() { + if t.Failed() { + t.Logf("Config contents:\n%s", configContents) + } + }) + + metricbeatOTel.WriteConfigFile(string(configContents)) + metricbeatOTel.Start() + defer metricbeatOTel.Stop() + + es := integration.GetESClient(t, "http") + + var r0Docs, r1Docs estools.Documents + var err error + + require.Eventuallyf(t, + func() bool { + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + defer findCancel() + + r0Docs, err = estools.PerformQueryForRawQuery(findCtx, map[string]any{ + "query": map[string]any{ + "match": map[string]any{ + "receiverid": "0", + }, + }, + }, ".ds-"+otelConfig.Index+"*", es) + require.NoError(t, err) + + r1Docs, err = estools.PerformQueryForRawQuery(findCtx, map[string]any{ + "query": map[string]any{ + "match": map[string]any{ + "receiverid": "1", + }, + }, + }, ".ds-"+otelConfig.Index+"*", es) + require.NoError(t, err) + + return r0Docs.Hits.Total.Value >= 1 && r1Docs.Hits.Total.Value >= 1 + }, + 1*time.Minute, 100*time.Millisecond, "expected at least 1 log for each receiver") + assertMapstrKeysEqual(t, r0Docs.Hits.Hits[0].Source, r1Docs.Hits.Hits[0].Source, []string{}, "expected documents keys to be equal") + for _, rec := range otelConfig.Receivers { + assertMonitoring(t, rec.MonitoringPort) + } +} + func assertMapstrKeysEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) { t.Helper() // Delete all ignored fields.