diff --git a/libbeat/beat/info.go b/libbeat/beat/info.go index 0ded360bfc62..a76a67fd28f4 100644 --- a/libbeat/beat/info.go +++ b/libbeat/beat/info.go @@ -51,6 +51,7 @@ type Info struct { StatsRegistry *monitoring.Registry } LogConsumer consumer.Logs // otel log consumer + ComponentID string // otel component id from the collector config e.g. "filebeatreceiver/logs" UseDefaultProcessors bool // Whether to use the default processors } diff --git a/libbeat/otelbeat/oteltest/oteltest.go b/libbeat/otelbeat/oteltest/oteltest.go index 5eba3b145bd4..5947bdde3327 100644 --- a/libbeat/otelbeat/oteltest/oteltest.go +++ b/libbeat/otelbeat/oteltest/oteltest.go @@ -20,6 +20,7 @@ package oteltest import ( "context" + "strings" "sync" "testing" "time" @@ -104,17 +105,17 @@ func CheckReceivers(params CheckReceiversParams) { require.NotEmpty(t, rc.Beat, "receiver beat must not be empty") var receiverSettings receiver.Settings + receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name) // Replicate the behavior of the collector logger receiverCore := core. With([]zapcore.Field{ - zap.String("otelcol.component.id", rc.Name), + zap.String("otelcol.component.id", receiverSettings.ID.String()), zap.String("otelcol.component.kind", "receiver"), zap.String("otelcol.signal", "logs"), }) receiverSettings.Logger = zap.New(receiverCore) - receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name) logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { for _, rl := range ld.ResourceLogs().All() { @@ -157,9 +158,9 @@ func CheckReceivers(params CheckReceiversParams) { } }) - beatForCompID := func(compID string) string { + beatForCompName := func(compName string) string { for _, rec := range params.Receivers { - if rec.Name == compID { + if rec.Name == compName { return rec.Beat } } @@ -180,8 +181,9 @@ func CheckReceivers(params CheckReceiversParams) { require.Contains(ct, zl.ContextMap(), "otelcol.component.id") compID, ok := zl.ContextMap()["otelcol.component.id"].(string) require.True(ct, ok, "otelcol.component.id should be a string") + compName := strings.Split(compID, "/")[1] require.Contains(ct, zl.ContextMap(), "service.name") - require.Equal(ct, beatForCompID(compID), zl.ContextMap()["service.name"]) + require.Equal(ct, beatForCompName(compName), zl.ContextMap()["service.name"]) break } require.NotNilf(ct, host.Evt, "expected not nil nil, got %v", host.Evt) diff --git a/x-pack/filebeat/fbreceiver/factory.go b/x-pack/filebeat/fbreceiver/factory.go index b40cdbae4337..23f36721c898 100644 --- a/x-pack/filebeat/fbreceiver/factory.go +++ b/x-pack/filebeat/fbreceiver/factory.go @@ -45,7 +45,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component. settings.ElasticLicensed = true settings.Initialize = append(settings.Initialize, include.InitializeModule) - b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core()) + b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core()) if err != nil { return nil, fmt.Errorf("error creating %s: %w", Name, err) } diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index d4303bc76b05..dae6d3c2dd13 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -66,8 +66,22 @@ func TestNewReceiver(t *testing.T) { }, AssertFunc: func(t *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) { _ = zapLogs +<<<<<<< HEAD require.Lenf(t, logs["r1"], 1, "expected 1 log, got %d", len(logs["r1"])) assert.Condition(t, func() bool { +======= + require.Lenf(c, logs["r1"], 1, "expected 1 log, got %d", len(logs["r1"])) + assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") + var lastError strings.Builder + assert.Conditionf(c, func() bool { + return getFromSocket(t, &lastError, monitorSocket, "stats") + }, "failed to connect to monitoring socket, stats endpoint, last error was: %s", &lastError) + assert.Conditionf(c, func() bool { + return getFromSocket(t, &lastError, monitorSocket, "inputs") + }, "failed to connect to monitoring socket, inputs endpoint, last error was: %s", &lastError) + assert.Condition(c, func() bool { +>>>>>>> fafbdcbd8 (otel: add otel-specific fields to ingested docs (#45242)) processorsLoaded := zapLogs.FilterMessageSnippet("Generated new processors"). FilterMessageSnippet("add_host_metadata"). FilterMessageSnippet("add_cloud_metadata"). @@ -184,6 +198,11 @@ func TestMultipleReceivers(t *testing.T) { require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs") require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs") + assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record") + assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record") + assert.Equal(c, "filebeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record") + assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r2 log record") + // Make sure that each receiver has a separate logger // instance and does not interfere with others. Previously, the // logger in Beats was global, causing logger fields to be diff --git a/x-pack/filebeat/input/gcppubsub/otel_test.go b/x-pack/filebeat/input/gcppubsub/otel_test.go new file mode 100644 index 000000000000..496d3bf0531d --- /dev/null +++ b/x-pack/filebeat/input/gcppubsub/otel_test.go @@ -0,0 +1,185 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration && !agentbeat + +package gcppubsub + +import ( + "bytes" + "context" + "fmt" + "testing" + "text/template" + "time" + + "github.com/gofrs/uuid/v5" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/otelbeat/oteltest" + "github.com/elastic/beats/v7/libbeat/tests/integration" + + "github.com/elastic/elastic-agent-libs/testing/estools" +) + +func TestGCPInputOTelE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + + // Create pubsub client for setting up and communicating to emulator. + client, clientCancel := testSetup(t) + defer func() { + clientCancel() + client.Close() + }() + + createTopic(t, client) + createSubscription(t, "test-subscription-otel", client) + createSubscription(t, "test-subscription-fb", client) + const numMsgs = 10 + publishMessages(t, client, numMsgs) + + host := integration.GetESURL(t, "http") + user := host.User.Username() + password, _ := host.User.Password() + + // create a random uuid and make sure it doesn't contain dashes/ + otelNamespace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4())) + fbNameSpace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4())) + + otelIndex := "logs-integration-" + otelNamespace + fbIndex := "logs-integration-" + fbNameSpace + + type options struct { + Namespace string + ESURL string + Username string + Password string + Subscription string + } + + gcpConfig := `filebeat.inputs: +- type: gcp-pubsub + project_id: test-project-id + topic: test-topic-foo + subscription.name: {{ .Subscription }} + credentials_file: "testdata/fake.json" + +output: + elasticsearch: + hosts: + - {{ .ESURL }} + username: {{ .Username }} + password: {{ .Password }} + index: logs-integration-{{ .Namespace }} + +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +` + + // start filebeat in otel mode + filebeatOTel := integration.NewBeat( + t, + "filebeat-otel", + "../../filebeat.test", + "otel", + ) + + optionsValue := options{ + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + } + + var configBuffer bytes.Buffer + optionsValue.Namespace = otelNamespace + optionsValue.Subscription = "test-subscription-otel" + require.NoError(t, template.Must(template.New("config").Parse(gcpConfig)).Execute(&configBuffer, optionsValue)) + + filebeatOTel.WriteConfigFile(configBuffer.String()) + + filebeatOTel.Start() + defer filebeatOTel.Stop() + + // reset buffer + configBuffer.Reset() + + optionsValue.Namespace = fbNameSpace + optionsValue.Subscription = "test-subscription-fb" + require.NoError(t, template.Must(template.New("config").Parse(gcpConfig)).Execute(&configBuffer, optionsValue)) + + // start filebeat + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + filebeat.WriteConfigFile(configBuffer.String()) + filebeat.Start() + defer filebeat.Stop() + + // prepare to query ES + es := integration.GetESClient(t, "http") + + t.Cleanup(func() { + _, err := es.Indices.DeleteDataStream([]string{ + otelIndex, + fbIndex, + }) + require.NoError(t, err, "failed to delete indices") + }) + + rawQuery := map[string]any{ + "query": map[string]any{ + "match_phrase": map[string]any{ + "input.type": "gcp-pubsub", + }, + }, + "sort": []map[string]any{ + {"@timestamp": map[string]any{"order": "asc"}}, + }, + } + + var filebeatDocs estools.Documents + var otelDocs estools.Documents + var err error + + // wait for logs to be published + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+otelIndex+"*", es) + assert.NoError(ct, err) + assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, 1, "expected at least 1 otel document, got %d", otelDocs.Hits.Total.Value) + + filebeatDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+fbIndex+"*", es) + assert.NoError(ct, err) + assert.GreaterOrEqual(ct, filebeatDocs.Hits.Total.Value, 1, "expected at least 1 filebeat document, got %d", filebeatDocs.Hits.Total.Value) + }, + 3*time.Minute, 1*time.Second, "expected at least 1 document for both filebeat and otel modes") + + filebeatDoc := filebeatDocs.Hits.Hits[0].Source + otelDoc := otelDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // Expected to change between agentDocs and OtelDocs + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "event.created", + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", + } + + oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") + +} diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 166d0a6ae86c..d1141a0eaed7 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -131,8 +131,9 @@ setup.template.pattern: logs-filebeat-default }, 2*time.Minute, 1*time.Second, "expected at least %d events for both filebeat and otel", numEvents) - filebeatDoc := filebeatDocs.Hits.Hits[0].Source - otelDoc := otelDocs.Hits.Hits[0].Source + var filebeatDoc, otelDoc mapstr.M + filebeatDoc = filebeatDocs.Hits.Hits[0].Source + otelDoc = otelDocs.Hits.Hits[0].Source ignoredFields := []string{ // Expected to change between agentDocs and OtelDocs "@timestamp", @@ -140,12 +141,152 @@ setup.template.pattern: logs-filebeat-default "agent.id", "log.file.inode", "log.file.path", + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", } +<<<<<<< HEAD assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") assertMonitoring(t, otelMonitoringPort) } +======= + oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") + + assert.Equal(t, "filebeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") + assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record") + assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record") + assertMonitoring(t, otelMonitoringPort) +} + +func TestHTTPJSONInputOTel(t *testing.T) { + integration.EnsureESIsRunning(t) + + host := integration.GetESURL(t, "http") + user := host.User.Username() + password, _ := host.User.Password() + + // create a random uuid and make sure it doesn't contain dashes/ + otelNamespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + fbNameSpace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + + type options struct { + Namespace string + ESURL string + Username string + Password string + } + + // The request url is a http mock server started using streams + configFile := ` +filebeat.inputs: + - type: httpjson + id: httpjson-e2e-otel + request.url: http://localhost:8090/test + +output: + elasticsearch: + hosts: + - {{ .ESURL }} + username: {{ .Username }} + password: {{ .Password }} + index: logs-integration-{{ .Namespace }} + +setup.template.enabled: false +queue.mem.flush.timeout: 0s +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +` + + // start filebeat in otel mode + filebeatOTel := integration.NewBeat( + t, + "filebeat-otel", + "../../filebeat.test", + "otel", + ) + + optionsValue := options{ + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + } + + var configBuffer bytes.Buffer + optionsValue.Namespace = otelNamespace + require.NoError(t, template.Must(template.New("config").Parse(configFile)).Execute(&configBuffer, optionsValue)) + + filebeatOTel.WriteConfigFile(configBuffer.String()) + filebeatOTel.Start() + + // reset buffer + configBuffer.Reset() + + optionsValue.Namespace = fbNameSpace + require.NoError(t, template.Must(template.New("config").Parse(configFile)).Execute(&configBuffer, optionsValue)) + + // start filebeat + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + filebeat.WriteConfigFile(configBuffer.String()) + filebeat.Start() + + // prepare to query ES + es := integration.GetESClient(t, "http") + + rawQuery := map[string]any{ + "sort": []map[string]any{ + {"@timestamp": map[string]any{"order": "asc"}}, + }, + } + + var filebeatDocs estools.Documents + var otelDocs estools.Documents + var err error + + // wait for logs to be published + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-logs-integration-"+otelNamespace+"*", es) + assert.NoError(ct, err) + + filebeatDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-logs-integration-"+fbNameSpace+"*", es) + assert.NoError(ct, err) + + assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, 1, "expected at least 1 otel event, got %d", otelDocs.Hits.Total.Value) + assert.GreaterOrEqual(ct, filebeatDocs.Hits.Total.Value, 1, "expected at least 1 filebeat event, got %d", filebeatDocs.Hits.Total.Value) + }, + 2*time.Minute, 1*time.Second, "expected at least 1 event for both filebeat and otel") + + filebeatDoc := filebeatDocs.Hits.Hits[0].Source + otelDoc := otelDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // Expected to change between agentDocs and OtelDocs + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "event.created", + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", + } + + oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") +} + +>>>>>>> fafbdcbd8 (otel: add otel-specific fields to ingested docs (#45242)) func writeEventsToLogFile(t *testing.T, filename string, numEvents int) { t.Helper() logFile, err := os.Create(filename) @@ -200,3 +341,416 @@ func assertMonitoring(t *testing.T, port int) { require.NoError(t, err) require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") } +<<<<<<< HEAD +======= + +func TestFilebeatOTelReceiverE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + wantEvents := 1 + + // start filebeat in otel mode + filebeatOTel := integration.NewBeat( + t, + "filebeat-otel", + "../../filebeat.test", + "otel", + ) + + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + fbReceiverIndex := "logs-integration-" + namespace + filebeatIndex := "logs-filebeat-" + namespace + + otelMonitoringPort := int(libbeattesting.MustAvailableTCP4Port(t)) + filebeatMonitoringPort := int(libbeattesting.MustAvailableTCP4Port(t)) + + otelConfig := struct { + Index string + MonitoringPort int + InputFile string + PathHome string + }{ + Index: fbReceiverIndex, + MonitoringPort: otelMonitoringPort, + InputFile: filepath.Join(filebeatOTel.TempDir(), "log.log"), + PathHome: filebeatOTel.TempDir(), + } + + cfg := `receivers: + filebeatreceiver/filestream: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - {{.InputFile}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + queue.mem.flush.timeout: 0s + path.home: {{.PathHome}} + http.enabled: true + http.host: localhost + http.port: {{.MonitoringPort}} +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - http://localhost:9200 + compression: none + user: admin + password: testing + logs_index: {{.Index}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: + - filebeatreceiver/filestream + exporters: + - elasticsearch/log + - debug +` + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig)) + configContents := configBuffer.Bytes() + t.Cleanup(func() { + if t.Failed() { + t.Logf("Config contents:\n%s", configContents) + } + }) + + filebeatOTel.WriteConfigFile(string(configContents)) + writeEventsToLogFile(t, otelConfig.InputFile, wantEvents) + filebeatOTel.Start() + defer filebeatOTel.Stop() + + // start filebeat + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + + beatsCfgFile := ` +filebeat.inputs: + - type: filestream + id: filestream-input-id + enabled: true + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false + paths: + - %s +output: + elasticsearch: + hosts: + - localhost:9200 + username: admin + password: testing + index: %s +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +setup.template.name: logs-filebeat-default +setup.template.pattern: logs-filebeat-default +http.enabled: true +http.host: localhost +http.port: %d +` + logFilePath := filepath.Join(filebeat.TempDir(), "log.log") + writeEventsToLogFile(t, logFilePath, wantEvents) + s := fmt.Sprintf(beatsCfgFile, logFilePath, filebeatIndex, filebeatMonitoringPort) + filebeat.WriteConfigFile(s) + filebeat.Start() + defer filebeat.Stop() + + es := integration.GetESClient(t, "http") + + var filebeatDocs estools.Documents + var otelDocs estools.Documents + var err error + + // wait for logs to be published + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+fbReceiverIndex+"*") + assert.NoError(ct, err) + + filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+filebeatIndex+"*") + assert.NoError(ct, err) + + assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, wantEvents, "expected at least %d otel events, got %d", wantEvents, otelDocs.Hits.Total.Value) + assert.GreaterOrEqual(ct, filebeatDocs.Hits.Total.Value, wantEvents, "expected at least %d filebeat events, got %d", wantEvents, filebeatDocs.Hits.Total.Value) + }, + 2*time.Minute, 1*time.Second, "expected at least %d events for both filebeat and otel", wantEvents) + + var filebeatDoc, otelDoc mapstr.M + filebeatDoc = filebeatDocs.Hits.Hits[0].Source + otelDoc = otelDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // Expected to change between agentDocs and OtelDocs + "@timestamp", + "agent.ephemeral_id", + "agent.id", + "log.file.inode", + "log.file.path", + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", + } + + oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") + assert.Equal(t, "filebeatreceiver/filestream", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") + assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record") + assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record") + assertMonitoring(t, otelConfig.MonitoringPort) + assertMonitoring(t, filebeatMonitoringPort) // filebeat +} + +func TestFilebeatOTelMultipleReceiversE2E(t *testing.T) { + t.Skip("Flaky test: https://github.com/elastic/beats/issues/45631") + integration.EnsureESIsRunning(t) + wantEvents := 100 + + // start filebeat in otel mode + filebeatOTel := integration.NewBeat( + t, + "filebeat-otel", + "../../filebeat.test", + "otel", + ) + + // write events to log file + logFilePath := filepath.Join(filebeatOTel.TempDir(), "log.log") + writeEventsToLogFile(t, logFilePath, wantEvents) + + type receiverConfig struct { + MonitoringPort int + InputFile string + PathHome string + } + + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + otelConfig := struct { + Index string + Receivers []receiverConfig + }{ + Index: "logs-integration-" + namespace, + Receivers: []receiverConfig{ + { + MonitoringPort: int(libbeattesting.MustAvailableTCP4Port(t)), + InputFile: logFilePath, + PathHome: filepath.Join(filebeatOTel.TempDir(), "r1"), + }, + { + MonitoringPort: int(libbeattesting.MustAvailableTCP4Port(t)), + InputFile: logFilePath, + PathHome: filepath.Join(filebeatOTel.TempDir(), "r2"), + }, + }, + } + + cfg := `receivers: +{{range $i, $receiver := .Receivers}} + filebeatreceiver/{{$i}}: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - {{$receiver.InputFile}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + queue.mem.flush.timeout: 0s + path.home: {{$receiver.PathHome}} +{{if $receiver.MonitoringPort}} + http.enabled: true + http.host: localhost + http.port: {{$receiver.MonitoringPort}} +{{end}} +{{end}} +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - http://localhost:9200 + compression: none + user: admin + password: testing + logs_index: {{.Index}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: +{{range $i, $receiver := .Receivers}} + - filebeatreceiver/{{$i}} +{{end}} + exporters: + - debug + - elasticsearch/log +` + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig)) + configContents := configBuffer.Bytes() + + t.Cleanup(func() { + if t.Failed() { + t.Logf("Config contents:\n%s", configContents) + } + }) + + filebeatOTel.WriteConfigFile(string(configContents)) + writeEventsToLogFile(t, logFilePath, wantEvents) + filebeatOTel.Start() + defer filebeatOTel.Stop() + + es := integration.GetESClient(t, "http") + + var otelDocs estools.Documents + var err error + + // wait for logs to be published + wantTotalLogs := wantEvents * len(otelConfig.Receivers) + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+otelConfig.Index+"*") + assert.NoError(ct, err) + + assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, wantTotalLogs, "expected at least %d events, got %d", wantTotalLogs, otelDocs.Hits.Total.Value) + }, + 2*time.Minute, 100*time.Millisecond, "expected at least %d events from multiple receivers", wantTotalLogs) + for _, rec := range otelConfig.Receivers { + assertMonitoring(t, rec.MonitoringPort) + } +} + +func TestFilebeatOTelInspect(t *testing.T) { + filebeatOTel := integration.NewBeat( + t, + "filebeat-otel", + "../../filebeat.test", + "otel", + ) + + var beatsCfgFile = ` +filebeat.inputs: + - type: filestream + id: filestream-input-id + enabled: true + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false + paths: + - /tmp/log.log +output: + elasticsearch: + hosts: + - localhost:9200 + username: admin + password: testing + index: index +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +` + expectedExporter := `exporters: + elasticsearch: + batcher: + enabled: true + max_size: 1600 + min_size: 0 + compression: gzip + compression_params: + level: 1 + endpoints: + - http://localhost:9200 + idle_conn_timeout: 3s + logs_index: index + mapping: + mode: bodymap + password: testing + retry: + enabled: true + initial_interval: 1s + max_interval: 1m0s + max_retries: 3 + timeout: 1m30s + user: admin` + expectedReceiver := `receivers: + filebeatreceiver: + filebeat: + inputs: + - enabled: true + file_identity: + native: null + id: filestream-input-id + paths: + - /tmp/log.log + prospector: + scanner: + fingerprint: + enabled: false + type: filestream` + expectedService := `service: + pipelines: + logs: + exporters: + - elasticsearch + receivers: + - filebeatreceiver +` + filebeatOTel.WriteConfigFile(beatsCfgFile) + + filebeatOTel.Start("inspect") + defer filebeatOTel.Stop() + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + out, err := filebeatOTel.ReadStdout() + require.NoError(t, err) + require.Contains(t, out, expectedExporter) + require.Contains(t, out, expectedReceiver) + require.Contains(t, out, expectedService) + }, 10*time.Second, 500*time.Millisecond, "failed to get output of inspect command") +} +>>>>>>> fafbdcbd8 (otel: add otel-specific fields to ingested docs (#45242)) diff --git a/x-pack/libbeat/cmd/instance/beat.go b/x-pack/libbeat/cmd/instance/beat.go index c3979019f7e3..d7746c7ea723 100644 --- a/x-pack/libbeat/cmd/instance/beat.go +++ b/x-pack/libbeat/cmd/instance/beat.go @@ -33,7 +33,7 @@ import ( ) // NewBeatForReceiver creates a Beat that will be used in the context of an otel receiver -func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, core zapcore.Core) (*instance.Beat, error) { +func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) { b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version, @@ -43,6 +43,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an return nil, err } + b.Info.ComponentID = componentID b.Info.LogConsumer = consumer // begin code similar to configure diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go index aeb2c34f2671..4d7cf45e0a91 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go @@ -29,6 +29,15 @@ import ( const ( // esDocumentIDAttribute is the attribute key used to store the document ID in the log record. esDocumentIDAttribute = "elasticsearch.document_id" +<<<<<<< HEAD +======= + // otelComponentIDKey is the key used to store the Beat receiver's component id in the beat event. + otelComponentIDKey = "otelcol.component.id" + // otelComponentKindKey is the key used to store the Beat receiver's component kind in the beat event. This is always "receiver". + otelComponentKindKey = "otelcol.component.kind" + beatNameCtxKey = "beat_name" + beatVersionCtxtKey = "beat_version" +>>>>>>> fafbdcbd8 (otel: add otel-specific fields to ingested docs (#45242)) ) func init() { @@ -134,6 +143,15 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch) } logRecord.SetObservedTimestamp(observedTimestamp) + if agent, _ := beatEvent.GetValue("agent"); agent != nil { + switch agent := agent.(type) { + case mapstr.M: + agent[otelComponentIDKey] = out.beatInfo.ComponentID + agent[otelComponentKindKey] = "receiver" + beatEvent["agent"] = agent + } + } + otelmap.ConvertNonPrimitive(beatEvent) // if data_stream field is set on beatEvent. Add it to logrecord.Attributes to support dynamic indexing diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go index 94e8bbbcc68f..28029aa25644 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go @@ -251,4 +251,68 @@ func TestPublish(t *testing.T) { assert.Len(t, batch.Signals, 1) assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) }) +<<<<<<< HEAD +======= + t.Run("sets otel specific-fields", func(t *testing.T) { + testCases := []struct { + name string + componentID string + componentKind string + expectedComponentID string + expectedComponentKind string + }{ + { + name: "sets beat component ID", + componentID: "filebeatreceiver/1", + expectedComponentID: "filebeatreceiver/1", + expectedComponentKind: "receiver", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + event := beat.Event{ + Fields: mapstr.M{ + "field": 1, + "agent": mapstr.M{}, + }, + Meta: mapstr.M{ + "_id": "abc123", + }, + } + batch := outest.NewBatch(event) + var countLogs int + otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { + countLogs = countLogs + ld.LogRecordCount() + return nil + }) + otelConsumer.beatInfo.ComponentID = tc.componentID + err := otelConsumer.Publish(ctx, batch) + assert.NoError(t, err) + assert.Len(t, batch.Signals, 1) + assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) + assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed") + for _, event := range batch.Events() { + beatEvent := event.Content.Fields.Flatten() + assert.Equal(t, tc.expectedComponentID, beatEvent["agent."+otelComponentIDKey], "expected agent.otelcol.component.id field in log record") + assert.Equal(t, tc.expectedComponentKind, beatEvent["agent."+otelComponentKindKey], "expected agent.otelcol.component.kind field in log record") + } + }) + } + }) + t.Run("sets the client context metadata with the beat info", func(t *testing.T) { + batch := outest.NewBatch(event1) + otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { + cm := client.FromContext(ctx).Metadata + assert.Equal(t, beatInfo.Beat, cm.Get(beatNameCtxKey)[0]) + assert.Equal(t, beatInfo.Version, cm.Get(beatVersionCtxtKey)[0]) + return nil + }) + + err := otelConsumer.Publish(ctx, batch) + assert.NoError(t, err) + assert.Len(t, batch.Signals, 1) + assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) + }) +>>>>>>> fafbdcbd8 (otel: add otel-specific fields to ingested docs (#45242)) } diff --git a/x-pack/metricbeat/mbreceiver/factory.go b/x-pack/metricbeat/mbreceiver/factory.go index 8ab0c1a3fd5d..4990a89a0281 100644 --- a/x-pack/metricbeat/mbreceiver/factory.go +++ b/x-pack/metricbeat/mbreceiver/factory.go @@ -51,7 +51,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component. settings.ElasticLicensed = true settings.Initialize = append(settings.Initialize, include.InitializeModule) - b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core()) + b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core()) if err != nil { return nil, fmt.Errorf("error creating %s: %w", Name, err) } diff --git a/x-pack/metricbeat/mbreceiver/receiver_test.go b/x-pack/metricbeat/mbreceiver/receiver_test.go index ec1bb9dea095..5ae841f0e385 100644 --- a/x-pack/metricbeat/mbreceiver/receiver_test.go +++ b/x-pack/metricbeat/mbreceiver/receiver_test.go @@ -84,6 +84,8 @@ func TestNewReceiver(t *testing.T) { require.Conditionf(c, func() bool { return len(logs["r1"]) > 0 }, "expected at least one ingest log, got logs: %v", logs["r1"]) + assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") var lastError strings.Builder assert.Conditionf(c, func() bool { return getFromSocket(t, &lastError, monitorSocket) @@ -193,9 +195,13 @@ func TestMultipleReceivers(t *testing.T) { }, AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) { _ = zapLogs - assert.Conditionf(c, func() bool { + require.Conditionf(c, func() bool { return len(logs["r1"]) > 0 && len(logs["r2"]) > 0 }, "expected at least one ingest log for each receiver, got logs: %v", logs) + assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record") + assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record") + assert.Equal(c, "metricbeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record") + assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected otelcol.component.kind field in r2 log record") var lastError strings.Builder assert.Conditionf(c, func() bool { tests := []string{monitorSocket1, monitorSocket2} diff --git a/x-pack/metricbeat/tests/integration/otel_test.go b/x-pack/metricbeat/tests/integration/otel_test.go new file mode 100644 index 000000000000..8b447e4cf182 --- /dev/null +++ b/x-pack/metricbeat/tests/integration/otel_test.go @@ -0,0 +1,607 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "bytes" + "context" + "fmt" + "net/http" + "path/filepath" + "strings" + "testing" + "text/template" + "time" + + "github.com/gofrs/uuid/v5" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + libbeattesting "github.com/elastic/beats/v7/libbeat/testing" + "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/testing/estools" +) + +func TestMetricbeatOTelE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + + host := integration.GetESURL(t, "http") + user := host.User.Username() + password, _ := host.User.Password() + + es := integration.GetESClient(t, "http") + + // create a random uuid and make sure it doesn't contain dashes/ + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + mbIndex := "logs-integration-mb-" + namespace + mbReceiverIndex := "logs-integration-mbreceiver-" + namespace + t.Cleanup(func() { + _, err := es.Indices.DeleteDataStream([]string{ + mbIndex, + mbReceiverIndex, + }) + require.NoError(t, err, "failed to delete indices") + }) + + type options struct { + Index string + ESURL string + Username string + Password string + MonitoringPort int + } + + var beatsCfgFile = ` +metricbeat: + modules: + - module: system + enabled: true + period: 1s + processes: + - '.*' + metricsets: + - cpu +output: + elasticsearch: + hosts: + - {{ .ESURL }} + username: {{ .Username }} + password: {{ .Password }} + index: {{ .Index }} +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +http.host: localhost +http.port: {{.MonitoringPort}} +` + + // start metricbeat in otel mode + metricbeatOTel := integration.NewBeat( + t, + "metricbeat-otel", + "../../metricbeat.test", + "otel", + ) + + optionsValue := options{ + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + MonitoringPort: int(libbeattesting.MustAvailableTCP4Port(t)), + } + + var configBuffer bytes.Buffer + optionsValue.Index = mbReceiverIndex + require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&configBuffer, optionsValue)) + + metricbeatOTel.WriteConfigFile(configBuffer.String()) + metricbeatOTel.Start() + defer metricbeatOTel.Stop() + + var mbConfigBuffer bytes.Buffer + optionsValue.Index = mbIndex + optionsValue.MonitoringPort = int(libbeattesting.MustAvailableTCP4Port(t)) + require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&mbConfigBuffer, optionsValue)) + metricbeat := integration.NewBeat(t, "metricbeat", "../../metricbeat.test") + metricbeat.WriteConfigFile(mbConfigBuffer.String()) + metricbeat.Start() + defer metricbeat.Stop() + + // Make sure find the logs + var metricbeatDocs estools.Documents + var otelDocs estools.Documents + var err error + + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+mbReceiverIndex+"*") + assert.NoError(ct, err) + + metricbeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+mbIndex+"*") + assert.NoError(ct, err) + + assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, 1, "expected at least 1 log for otel receiver, got %d", otelDocs.Hits.Total.Value) + assert.GreaterOrEqual(ct, metricbeatDocs.Hits.Total.Value, 1, "expected at least 1 log for metricbeat, got %d", metricbeatDocs.Hits.Total.Value) + }, + 1*time.Minute, 1*time.Second, "expected at least 1 log for metricbeat and otel receiver") + + var metricbeatDoc, otelDoc mapstr.M + otelDoc = otelDocs.Hits.Hits[0].Source + metricbeatDoc = metricbeatDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", + } + assert.Equal(t, "metricbeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") + assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in metricbeat log record") + assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in metricbeat log record") + assertMapstrKeysEqual(t, otelDoc, metricbeatDoc, ignoredFields, "expected documents keys to be equal") + assertMonitoring(t, optionsValue.MonitoringPort) +} + +func assertMonitoring(t *testing.T, port int) { + address := fmt.Sprintf("http://localhost:%d", port) + r, err := http.Get(address) //nolint:noctx,bodyclose,gosec // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") + + r, err = http.Get(address + "/stats") //nolint:noctx,bodyclose // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") + + r, err = http.Get(address + "/not-exist") //nolint:noctx,bodyclose // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") +} + +func TestMetricbeatOTelReceiverE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + + host := integration.GetESURL(t, "http") + user := host.User.Username() + password, _ := host.User.Password() + + es := integration.GetESClient(t, "http") + + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + mbReceiverIndex := "logs-integration-mbreceiver-" + namespace + mbIndex := "logs-integration-mb-" + namespace + t.Cleanup(func() { + _, err := es.Indices.DeleteDataStream([]string{ + mbIndex, + mbReceiverIndex, + }) + require.NoError(t, err, "failed to delete indices") + }) + + type options struct { + Index string + ESURL string + Username string + Password string + } + + cfg := `receivers: + metricbeatreceiver: + metricbeat: + modules: + - module: system + enabled: true + period: 1s + processes: + - '.*' + metricsets: + - cpu + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + queue.mem.flush.timeout: 0s +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - {{.ESURL}} + compression: none + user: {{.Username}} + password: {{.Password}} + logs_index: {{.Index}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: + - metricbeatreceiver + exporters: + - elasticsearch/log + - debug +` + + // start metricbeat in otel mode + metricbeatOTel := integration.NewBeat( + t, + "metricbeat-otel", + "../../metricbeat.test", + "otel", + ) + + var configBuffer bytes.Buffer + require.NoError(t, template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, options{ + Index: mbReceiverIndex, + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + })) + + metricbeatOTel.WriteConfigFile(configBuffer.String()) + metricbeatOTel.Start() + defer metricbeatOTel.Stop() + + var beatsCfgFile = `receivers: +metricbeat: + modules: + - module: system + enabled: true + period: 1s + processes: + - '.*' + metricsets: + - cpu +output: + elasticsearch: + hosts: + - {{ .ESURL }} + username: {{ .Username }} + password: {{ .Password }} + index: {{ .Index }} +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +` + var mbConfigBuffer bytes.Buffer + require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&mbConfigBuffer, options{ + Index: mbIndex, + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + })) + metricbeat := integration.NewBeat(t, "metricbeat", "../../metricbeat.test") + metricbeat.WriteConfigFile(mbConfigBuffer.String()) + metricbeat.Start() + defer metricbeat.Stop() + + var metricbeatDocs estools.Documents + var otelDocs estools.Documents + var err error + + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+mbReceiverIndex+"*") + assert.NoError(ct, err) + + metricbeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+mbIndex+"*") + assert.NoError(ct, err) + + assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, 1, "expected at least 1 log for otel receiver, got %d", otelDocs.Hits.Total.Value) + assert.GreaterOrEqual(ct, metricbeatDocs.Hits.Total.Value, 1, "expected at least 1 log for metricbeat receiver, got %d", metricbeatDocs.Hits.Total.Value) + }, + 1*time.Minute, 1*time.Second, "expected at least a single log for metricbeat and otel mode") + otelDoc := otelDocs.Hits.Hits[0] + metricbeatDoc := metricbeatDocs.Hits.Hits[0] + ignoredFields := []string{ + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", + } + assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, ignoredFields, "expected documents keys to be equal") +} + +func TestMetricbeatOTelMultipleReceiversE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + + host := integration.GetESURL(t, "http") + user := host.User.Username() + password, _ := host.User.Password() + + metricbeatOTel := integration.NewBeat( + t, + "metricbeat-otel", + "../../metricbeat.test", + "otel", + ) + + type receiverConfig struct { + MonitoringPort int + InputFile string + PathHome string + } + + es := integration.GetESClient(t, "http") + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + index := "logs-integration-" + namespace + t.Cleanup(func() { + _, err := es.Indices.DeleteDataStream([]string{ + index, + }) + require.NoError(t, err, "failed to delete indices") + }) + + otelConfig := struct { + Index string + Username string + Password string + Receivers []receiverConfig + }{ + Index: index, + Username: user, + Password: password, + Receivers: []receiverConfig{ + { + MonitoringPort: int(libbeattesting.MustAvailableTCP4Port(t)), + PathHome: filepath.Join(metricbeatOTel.TempDir(), "r1"), + }, + { + MonitoringPort: int(libbeattesting.MustAvailableTCP4Port(t)), + PathHome: filepath.Join(metricbeatOTel.TempDir(), "r2"), + }, + }, + } + + cfg := `receivers: +{{range $i, $receiver := .Receivers}} + metricbeatreceiver/{{$i}}: + metricbeat: + modules: + - module: system + enabled: true + period: 1s + processes: + - '.*' + metricsets: + - cpu + processors: + - add_fields: + target: '' + fields: + receiverid: "{{$i}}" + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + queue.mem.flush.timeout: 0s + path.home: {{$receiver.PathHome}} +{{if $receiver.MonitoringPort}} + http.enabled: true + http.host: localhost + http.port: {{$receiver.MonitoringPort}} +{{end}} +{{end}} +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - http://localhost:9200 + compression: none + user: {{.Username}} + password: {{.Password}} + logs_index: {{.Index}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: +{{range $i, $receiver := .Receivers}} + - metricbeatreceiver/{{$i}} +{{end}} + exporters: + - debug + - elasticsearch/log +` + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig)) + configContents := configBuffer.Bytes() + + t.Cleanup(func() { + if t.Failed() { + t.Logf("Config contents:\n%s", configContents) + } + }) + + metricbeatOTel.WriteConfigFile(string(configContents)) + metricbeatOTel.Start() + defer metricbeatOTel.Stop() + + var r0Docs, r1Docs estools.Documents + var err error + + require.EventuallyWithTf(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second) + defer findCancel() + + r0Docs, err = estools.PerformQueryForRawQuery(findCtx, map[string]any{ + "query": map[string]any{ + "match": map[string]any{ + "receiverid": "0", + }, + }, + }, ".ds-"+otelConfig.Index+"*", es) + assert.NoError(ct, err, "failed to query for receiver 0 logs") + + r1Docs, err = estools.PerformQueryForRawQuery(findCtx, map[string]any{ + "query": map[string]any{ + "match": map[string]any{ + "receiverid": "1", + }, + }, + }, ".ds-"+otelConfig.Index+"*", es) + assert.NoError(ct, err, "failed to query for receiver 1 logs") + + assert.GreaterOrEqualf(ct, r0Docs.Hits.Total.Value, 1, "expected at least 1 log for receiver 0, got %d", r0Docs.Hits.Total.Value) + assert.GreaterOrEqualf(ct, r1Docs.Hits.Total.Value, 1, "expected at least 1 log for receiver 1, got %d", r1Docs.Hits.Total.Value) + }, + 1*time.Minute, 100*time.Millisecond, "expected at least 1 log for each receiver") + ignoredFields := []string{ + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", + } + assertMapstrKeysEqual(t, r0Docs.Hits.Hits[0].Source, r1Docs.Hits.Hits[0].Source, ignoredFields, "expected documents keys to be equal") + for _, rec := range otelConfig.Receivers { + assertMonitoring(t, rec.MonitoringPort) + } +} + +func assertMapstrKeysEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) { + t.Helper() + // Delete all ignored fields. + for _, f := range ignoredFields { + _ = m1.Delete(f) + _ = m2.Delete(f) + } + + flatM1 := m1.Flatten() + flatM2 := m2.Flatten() + + for k := range flatM1 { + flatM1[k] = "" + } + for k := range flatM2 { + flatM2[k] = "" + } + + require.Zero(t, cmp.Diff(flatM1, flatM2), msg) +} + +func TestMetricbeatOTelInspect(t *testing.T) { + mbOTel := integration.NewBeat( + t, + "metricbeat-otel", + "../../metricbeat.test", + "otel", + ) + + var beatsCfgFile = ` +metricbeat: + modules: + - module: system + enabled: true + period: 1s + processes: + - '.*' + metricsets: + - cpu +output: + elasticsearch: + hosts: + - localhost:9200 + username: admin + password: testing + index: index +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +` + expectedExporter := `exporters: + elasticsearch: + batcher: + enabled: true + max_size: 1600 + min_size: 0 + compression: gzip + compression_params: + level: 1 + endpoints: + - http://localhost:9200 + idle_conn_timeout: 3s + logs_index: index + mapping: + mode: bodymap + password: testing + retry: + enabled: true + initial_interval: 1s + max_interval: 1m0s + max_retries: 3 + timeout: 1m30s + user: admin` + expectedReceiver := `receivers: + metricbeatreceiver: + logging: + files: + rotateeverybytes: 104857600 + rotateonstartup: false + to_files: true + metricbeat: + modules: + - enabled: true + metricsets: + - cpu + module: system + period: 1s + processes: + - .*` + expectedService := `service: + pipelines: + logs: + exporters: + - elasticsearch + receivers: + - metricbeatreceiver +` + mbOTel.WriteConfigFile(beatsCfgFile) + + mbOTel.Start("inspect") + defer mbOTel.Stop() + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + out, err := mbOTel.ReadStdout() + require.NoError(collect, err) + require.Contains(collect, out, expectedExporter) + require.Contains(collect, out, expectedReceiver) + require.Contains(collect, out, expectedService) + }, 10*time.Second, 500*time.Millisecond, "failed to get output of inspect command") +}