diff --git a/libbeat/beat/info.go b/libbeat/beat/info.go index 82338fe871e6..aa1338091a79 100644 --- a/libbeat/beat/info.go +++ b/libbeat/beat/info.go @@ -43,6 +43,7 @@ type Info struct { FIPSDistribution bool // If the beat was compiled as a FIPS distribution. LogConsumer consumer.Logs // otel log consumer + ComponentID string // otel component id from the collector config e.g. "filebeatreceiver/logs" UseDefaultProcessors bool // Whether to use the default processors Logger *logp.Logger } diff --git a/libbeat/otelbeat/oteltest/oteltest.go b/libbeat/otelbeat/oteltest/oteltest.go index 05b2bd0103a3..4a57ffd4d26f 100644 --- a/libbeat/otelbeat/oteltest/oteltest.go +++ b/libbeat/otelbeat/oteltest/oteltest.go @@ -20,6 +20,7 @@ package oteltest import ( "context" + "strings" "sync" "testing" "time" @@ -104,17 +105,17 @@ func CheckReceivers(params CheckReceiversParams) { require.NotEmpty(t, rc.Beat, "receiver beat must not be empty") var receiverSettings receiver.Settings + receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name) // Replicate the behavior of the collector logger receiverCore := core. With([]zapcore.Field{ - zap.String("otelcol.component.id", rc.Name), + zap.String("otelcol.component.id", receiverSettings.ID.String()), zap.String("otelcol.component.kind", "receiver"), zap.String("otelcol.signal", "logs"), }) receiverSettings.Logger = zap.New(receiverCore) - receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name) logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { for _, rl := range ld.ResourceLogs().All() { @@ -157,9 +158,9 @@ func CheckReceivers(params CheckReceiversParams) { } }) - beatForCompID := func(compID string) string { + beatForCompName := func(compName string) string { for _, rec := range params.Receivers { - if rec.Name == compID { + if rec.Name == compName { return rec.Beat } } @@ -180,8 +181,9 @@ func CheckReceivers(params CheckReceiversParams) { require.Contains(ct, zl.ContextMap(), "otelcol.component.id") compID, ok := zl.ContextMap()["otelcol.component.id"].(string) require.True(ct, ok, "otelcol.component.id should be a string") + compName := strings.Split(compID, "/")[1] require.Contains(ct, zl.ContextMap(), "service.name") - require.Equal(ct, beatForCompID(compID), zl.ContextMap()["service.name"]) + require.Equal(ct, beatForCompName(compName), zl.ContextMap()["service.name"]) break } require.NotNil(ct, host.Evt, "expected not nil, got nil") diff --git a/x-pack/filebeat/fbreceiver/factory.go b/x-pack/filebeat/fbreceiver/factory.go index b40cdbae4337..23f36721c898 100644 --- a/x-pack/filebeat/fbreceiver/factory.go +++ b/x-pack/filebeat/fbreceiver/factory.go @@ -45,7 +45,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component. settings.ElasticLicensed = true settings.Initialize = append(settings.Initialize, include.InitializeModule) - b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core()) + b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core()) if err != nil { return nil, fmt.Errorf("error creating %s: %w", Name, err) } diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index d35f097e2df4..bb78c28b9939 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -92,6 +92,8 @@ func TestNewReceiver(t *testing.T) { AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) { _ = zapLogs require.Lenf(c, logs["r1"], 1, "expected 1 log, got %d", len(logs["r1"])) + assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") var lastError strings.Builder assert.Conditionf(c, func() bool { return getFromSocket(t, &lastError, monitorSocket, "stats") @@ -232,6 +234,11 @@ func TestMultipleReceivers(t *testing.T) { require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs") require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs") + assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record") + assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record") + assert.Equal(c, "filebeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record") + assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r2 log record") + // Make sure that each receiver has a separate logger // instance and does not interfere with others. Previously, the // logger in Beats was global, causing logger fields to be diff --git a/x-pack/filebeat/input/gcppubsub/otel_test.go b/x-pack/filebeat/input/gcppubsub/otel_test.go index c949c2123974..496d3bf0531d 100644 --- a/x-pack/filebeat/input/gcppubsub/otel_test.go +++ b/x-pack/filebeat/input/gcppubsub/otel_test.go @@ -175,6 +175,9 @@ processors: "agent.ephemeral_id", "agent.id", "event.created", + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", } oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 3497dbe3d529..cfaeace98228 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/otelbeat/oteltest" libbeattesting "github.com/elastic/beats/v7/libbeat/testing" "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" ) @@ -120,8 +121,9 @@ http.port: %d }, 2*time.Minute, 1*time.Second, "expected at least %d events for both filebeat and otel", numEvents) - filebeatDoc := filebeatDocs.Hits.Hits[0].Source - otelDoc := otelDocs.Hits.Hits[0].Source + var filebeatDoc, otelDoc mapstr.M + filebeatDoc = filebeatDocs.Hits.Hits[0].Source + otelDoc = otelDocs.Hits.Hits[0].Source ignoredFields := []string{ // Expected to change between agentDocs and OtelDocs "@timestamp", @@ -129,9 +131,17 @@ http.port: %d "agent.id", "log.file.inode", "log.file.path", + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", } oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") + + assert.Equal(t, "filebeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") + assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record") + assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record") assertMonitoring(t, otelMonitoringPort) } @@ -252,6 +262,9 @@ processors: "agent.ephemeral_id", "agent.id", "event.created", + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", } oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") @@ -325,7 +338,7 @@ func TestFilebeatOTelReceiverE2E(t *testing.T) { } cfg := `receivers: - filebeatreceiver: + filebeatreceiver/filestream: filebeat: inputs: - type: filestream @@ -366,7 +379,7 @@ service: pipelines: logs: receivers: - - filebeatreceiver + - filebeatreceiver/filestream exporters: - elasticsearch/log - debug @@ -452,8 +465,9 @@ http.port: %d }, 2*time.Minute, 1*time.Second, "expected at least %d events for both filebeat and otel", wantEvents) - filebeatDoc := filebeatDocs.Hits.Hits[0].Source - otelDoc := otelDocs.Hits.Hits[0].Source + var filebeatDoc, otelDoc mapstr.M + filebeatDoc = filebeatDocs.Hits.Hits[0].Source + otelDoc = otelDocs.Hits.Hits[0].Source ignoredFields := []string{ // Expected to change between agentDocs and OtelDocs "@timestamp", @@ -461,9 +475,16 @@ http.port: %d "agent.id", "log.file.inode", "log.file.path", + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", } oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") + assert.Equal(t, "filebeatreceiver/filestream", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") + assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record") + assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record") assertMonitoring(t, otelConfig.MonitoringPort) assertMonitoring(t, filebeatMonitoringPort) // filebeat } diff --git a/x-pack/libbeat/cmd/instance/beat.go b/x-pack/libbeat/cmd/instance/beat.go index bf22fa62e3e4..562c3ce43b3b 100644 --- a/x-pack/libbeat/cmd/instance/beat.go +++ b/x-pack/libbeat/cmd/instance/beat.go @@ -33,7 +33,7 @@ import ( ) // NewBeatForReceiver creates a Beat that will be used in the context of an otel receiver -func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, core zapcore.Core) (*instance.Beat, error) { +func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) { b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version, @@ -43,6 +43,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an return nil, err } + b.Info.ComponentID = componentID b.Info.LogConsumer = consumer // begin code similar to configure diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go index 976a9e6d6211..ee3fcc45db98 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go @@ -32,8 +32,12 @@ import ( const ( // esDocumentIDAttribute is the attribute key used to store the document ID in the log record. esDocumentIDAttribute = "elasticsearch.document_id" - beatNameCtxKey = "beat_name" - beatVersionCtxtKey = "beat_version" + // otelComponentIDKey is the key used to store the Beat receiver's component id in the beat event. + otelComponentIDKey = "otelcol.component.id" + // otelComponentKindKey is the key used to store the Beat receiver's component kind in the beat event. This is always "receiver". + otelComponentKindKey = "otelcol.component.kind" + beatNameCtxKey = "beat_name" + beatVersionCtxtKey = "beat_version" ) func init() { @@ -145,6 +149,15 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch) } logRecord.SetObservedTimestamp(observedTimestamp) + if agent, _ := beatEvent.GetValue("agent"); agent != nil { + switch agent := agent.(type) { + case mapstr.M: + agent[otelComponentIDKey] = out.beatInfo.ComponentID + agent[otelComponentKindKey] = "receiver" + beatEvent["agent"] = agent + } + } + otelmap.ConvertNonPrimitive(beatEvent) // if data_stream field is set on beatEvent. Add it to logrecord.Attributes to support dynamic indexing diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go index 1caba7ec1a60..f4299808cce0 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go @@ -254,7 +254,53 @@ func TestPublish(t *testing.T) { assert.Len(t, batch.Signals, 1) assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) }) + t.Run("sets otel specific-fields", func(t *testing.T) { + testCases := []struct { + name string + componentID string + componentKind string + expectedComponentID string + expectedComponentKind string + }{ + { + name: "sets beat component ID", + componentID: "filebeatreceiver/1", + expectedComponentID: "filebeatreceiver/1", + expectedComponentKind: "receiver", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + event := beat.Event{ + Fields: mapstr.M{ + "field": 1, + "agent": mapstr.M{}, + }, + Meta: mapstr.M{ + "_id": "abc123", + }, + } + batch := outest.NewBatch(event) + var countLogs int + otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { + countLogs = countLogs + ld.LogRecordCount() + return nil + }) + otelConsumer.beatInfo.ComponentID = tc.componentID + err := otelConsumer.Publish(ctx, batch) + assert.NoError(t, err) + assert.Len(t, batch.Signals, 1) + assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) + assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed") + for _, event := range batch.Events() { + beatEvent := event.Content.Fields.Flatten() + assert.Equal(t, tc.expectedComponentID, beatEvent["agent."+otelComponentIDKey], "expected agent.otelcol.component.id field in log record") + assert.Equal(t, tc.expectedComponentKind, beatEvent["agent."+otelComponentKindKey], "expected agent.otelcol.component.kind field in log record") + } + }) + } + }) t.Run("sets the client context metadata with the beat info", func(t *testing.T) { batch := outest.NewBatch(event1) otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { diff --git a/x-pack/metricbeat/mbreceiver/factory.go b/x-pack/metricbeat/mbreceiver/factory.go index 8ab0c1a3fd5d..4990a89a0281 100644 --- a/x-pack/metricbeat/mbreceiver/factory.go +++ b/x-pack/metricbeat/mbreceiver/factory.go @@ -51,7 +51,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component. settings.ElasticLicensed = true settings.Initialize = append(settings.Initialize, include.InitializeModule) - b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core()) + b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core()) if err != nil { return nil, fmt.Errorf("error creating %s: %w", Name, err) } diff --git a/x-pack/metricbeat/mbreceiver/receiver_test.go b/x-pack/metricbeat/mbreceiver/receiver_test.go index c050df4a0d2e..332d23fddc11 100644 --- a/x-pack/metricbeat/mbreceiver/receiver_test.go +++ b/x-pack/metricbeat/mbreceiver/receiver_test.go @@ -86,6 +86,8 @@ func TestNewReceiver(t *testing.T) { require.Conditionf(c, func() bool { return len(logs["r1"]) > 0 }, "expected at least one ingest log, got logs: %v", logs["r1"]) + assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") var lastError strings.Builder assert.Conditionf(c, func() bool { return getFromSocket(t, &lastError, monitorSocket, "stats") @@ -198,9 +200,13 @@ func TestMultipleReceivers(t *testing.T) { }, AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) { _ = zapLogs - assert.Conditionf(c, func() bool { + require.Conditionf(c, func() bool { return len(logs["r1"]) > 0 && len(logs["r2"]) > 0 }, "expected at least one ingest log for each receiver, got logs: %v", logs) + assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record") + assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record") + assert.Equal(c, "metricbeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record") + assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected otelcol.component.kind field in r2 log record") var lastError strings.Builder assert.Conditionf(c, func() bool { tests := []string{monitorSocket1, monitorSocket2} diff --git a/x-pack/metricbeat/tests/integration/otel_test.go b/x-pack/metricbeat/tests/integration/otel_test.go index 40b12c603492..8b447e4cf182 100644 --- a/x-pack/metricbeat/tests/integration/otel_test.go +++ b/x-pack/metricbeat/tests/integration/otel_test.go @@ -138,9 +138,19 @@ http.port: {{.MonitoringPort}} }, 1*time.Minute, 1*time.Second, "expected at least 1 log for metricbeat and otel receiver") - otelDoc := otelDocs.Hits.Hits[0] - metricbeatDoc := metricbeatDocs.Hits.Hits[0] - assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, []string{}, "expected documents keys to be equal") + var metricbeatDoc, otelDoc mapstr.M + otelDoc = otelDocs.Hits.Hits[0].Source + metricbeatDoc = metricbeatDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", + } + assert.Equal(t, "metricbeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") + assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in metricbeat log record") + assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in metricbeat log record") + assertMapstrKeysEqual(t, otelDoc, metricbeatDoc, ignoredFields, "expected documents keys to be equal") assertMonitoring(t, optionsValue.MonitoringPort) } @@ -308,7 +318,12 @@ processors: 1*time.Minute, 1*time.Second, "expected at least a single log for metricbeat and otel mode") otelDoc := otelDocs.Hits.Hits[0] metricbeatDoc := metricbeatDocs.Hits.Hits[0] - assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, []string{}, "expected documents keys to be equal") + ignoredFields := []string{ + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", + } + assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, ignoredFields, "expected documents keys to be equal") } func TestMetricbeatOTelMultipleReceiversE2E(t *testing.T) { @@ -464,8 +479,13 @@ service: assert.GreaterOrEqualf(ct, r0Docs.Hits.Total.Value, 1, "expected at least 1 log for receiver 0, got %d", r0Docs.Hits.Total.Value) assert.GreaterOrEqualf(ct, r1Docs.Hits.Total.Value, 1, "expected at least 1 log for receiver 1, got %d", r1Docs.Hits.Total.Value) }, - 1*time.Minute, 100*time.Millisecond, "expected to found receiver logs") - assertMapstrKeysEqual(t, r0Docs.Hits.Hits[0].Source, r1Docs.Hits.Hits[0].Source, []string{}, "expected documents keys to be equal") + 1*time.Minute, 100*time.Millisecond, "expected at least 1 log for each receiver") + ignoredFields := []string{ + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", + } + assertMapstrKeysEqual(t, r0Docs.Hits.Hits[0].Source, r1Docs.Hits.Hits[0].Source, ignoredFields, "expected documents keys to be equal") for _, rec := range otelConfig.Receivers { assertMonitoring(t, rec.MonitoringPort) }