diff --git a/libbeat/beat/info.go b/libbeat/beat/info.go index 82338fe871e6..aa1338091a79 100644 --- a/libbeat/beat/info.go +++ b/libbeat/beat/info.go @@ -43,6 +43,7 @@ type Info struct { FIPSDistribution bool // If the beat was compiled as a FIPS distribution. LogConsumer consumer.Logs // otel log consumer + ComponentID string // otel component id from the collector config e.g. "filebeatreceiver/logs" UseDefaultProcessors bool // Whether to use the default processors Logger *logp.Logger } diff --git a/libbeat/otelbeat/oteltest/oteltest.go b/libbeat/otelbeat/oteltest/oteltest.go index bb51b7f254c5..69f44a0d5758 100644 --- a/libbeat/otelbeat/oteltest/oteltest.go +++ b/libbeat/otelbeat/oteltest/oteltest.go @@ -20,6 +20,7 @@ package oteltest import ( "context" + "strings" "sync" "testing" "time" @@ -107,17 +108,17 @@ func CheckReceivers(params CheckReceiversParams) { require.NotEmpty(t, rc.Beat, "receiver beat must not be empty") var receiverSettings receiver.Settings + receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name) // Replicate the behavior of the collector logger receiverCore := core. With([]zapcore.Field{ - zap.String("otelcol.component.id", rc.Name), + zap.String("otelcol.component.id", receiverSettings.ID.String()), zap.String("otelcol.component.kind", "receiver"), zap.String("otelcol.signal", "logs"), }) receiverSettings.Logger = zap.New(receiverCore) - receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name) logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { for _, rl := range ld.ResourceLogs().All() { @@ -160,9 +161,9 @@ func CheckReceivers(params CheckReceiversParams) { } }) - beatForCompID := func(compID string) string { + beatForCompName := func(compName string) string { for _, rec := range params.Receivers { - if rec.Name == compID { + if rec.Name == compName { return rec.Beat } } @@ -183,8 +184,9 @@ func CheckReceivers(params CheckReceiversParams) { require.Contains(ct, zl.ContextMap(), "otelcol.component.id") compID, ok := zl.ContextMap()["otelcol.component.id"].(string) require.True(ct, ok, "otelcol.component.id should be a string") + compName := strings.Split(compID, "/")[1] require.Contains(ct, zl.ContextMap(), "service.name") - require.Equal(ct, beatForCompID(compID), zl.ContextMap()["service.name"]) + require.Equal(ct, beatForCompName(compName), zl.ContextMap()["service.name"]) break } require.NotNil(ct, host.Evt, "expected not nil, got nil") diff --git a/x-pack/filebeat/fbreceiver/factory.go b/x-pack/filebeat/fbreceiver/factory.go index c35fe6db9ba7..d3afc7f7d906 100644 --- a/x-pack/filebeat/fbreceiver/factory.go +++ b/x-pack/filebeat/fbreceiver/factory.go @@ -45,7 +45,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component. settings.ElasticLicensed = true settings.Initialize = append(settings.Initialize, include.InitializeModule) - b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core()) + b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core()) if err != nil { return nil, fmt.Errorf("error creating %s: %w", Name, err) } diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index c3fc4cdb763d..10211957e17e 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -92,6 +92,8 @@ func TestNewReceiver(t *testing.T) { AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) { _ = zapLogs require.Lenf(c, logs["r1"], 1, "expected 1 log, got %d", len(logs["r1"])) + assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") var lastError strings.Builder assert.Conditionf(c, func() bool { return getFromSocket(t, &lastError, monitorSocket, "stats") @@ -232,6 +234,11 @@ func TestMultipleReceivers(t *testing.T) { require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs") require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs") + assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record") + assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record") + assert.Equal(c, "filebeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record") + assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r2 log record") + // Make sure that each receiver has a separate logger // instance and does not interfere with others. Previously, the // logger in Beats was global, causing logger fields to be diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 45949fc28aeb..f7c13cbd21c0 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -131,8 +131,9 @@ setup.template.pattern: logs-filebeat-default }, 2*time.Minute, 1*time.Second, "expected at least %d events for both filebeat and otel", numEvents) - filebeatDoc := filebeatDocs.Hits.Hits[0].Source - otelDoc := otelDocs.Hits.Hits[0].Source + var filebeatDoc, otelDoc mapstr.M + filebeatDoc = filebeatDocs.Hits.Hits[0].Source + otelDoc = otelDocs.Hits.Hits[0].Source ignoredFields := []string{ // Expected to change between agentDocs and OtelDocs "@timestamp", @@ -140,9 +141,17 @@ setup.template.pattern: logs-filebeat-default "agent.id", "log.file.inode", "log.file.path", + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", } assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") + + assert.Equal(t, "filebeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") + assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record") + assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record") assertMonitoring(t, otelMonitoringPort) } diff --git a/x-pack/libbeat/cmd/instance/beat.go b/x-pack/libbeat/cmd/instance/beat.go index 53fb07fd25ea..9c52826b4844 100644 --- a/x-pack/libbeat/cmd/instance/beat.go +++ b/x-pack/libbeat/cmd/instance/beat.go @@ -33,7 +33,7 @@ import ( ) // NewBeatForReceiver creates a Beat that will be used in the context of an otel receiver -func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, core zapcore.Core) (*instance.Beat, error) { +func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) { b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version, @@ -43,6 +43,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an return nil, err } + b.Info.ComponentID = componentID b.Info.LogConsumer = consumer // begin code similar to configure diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go index 02dd2a62e91d..3b81b347a4d7 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go @@ -34,6 +34,10 @@ const ( esDocumentIDAttribute = "elasticsearch.document_id" beatNameCtxKey = "beat_name" beatVersionCtxtKey = "beat_version" + // otelComponentIDKey is the key used to store the Beat receiver's component id in the beat event. + otelComponentIDKey = "otelcol.component.id" + // otelComponentKindKey is the key used to store the Beat receiver's component kind in the beat event. This is always "receiver". + otelComponentKindKey = "otelcol.component.kind" ) func init() { @@ -145,6 +149,15 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch) } logRecord.SetObservedTimestamp(observedTimestamp) + if agent, _ := beatEvent.GetValue("agent"); agent != nil { + switch agent := agent.(type) { + case mapstr.M: + agent[otelComponentIDKey] = out.beatInfo.ComponentID + agent[otelComponentKindKey] = "receiver" + beatEvent["agent"] = agent + } + } + otelmap.ConvertNonPrimitive(beatEvent) // if data_stream field is set on beatEvent. Add it to logrecord.Attributes to support dynamic indexing diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go index 1caba7ec1a60..da7fda50d5fb 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go @@ -254,7 +254,6 @@ func TestPublish(t *testing.T) { assert.Len(t, batch.Signals, 1) assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) }) - t.Run("sets the client context metadata with the beat info", func(t *testing.T) { batch := outest.NewBatch(event1) otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { @@ -269,4 +268,51 @@ func TestPublish(t *testing.T) { assert.Len(t, batch.Signals, 1) assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) }) + t.Run("sets otel specific-fields", func(t *testing.T) { + testCases := []struct { + name string + componentID string + componentKind string + expectedComponentID string + expectedComponentKind string + }{ + { + name: "sets beat component ID", + componentID: "filebeatreceiver/1", + expectedComponentID: "filebeatreceiver/1", + expectedComponentKind: "receiver", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + event := beat.Event{ + Fields: mapstr.M{ + "field": 1, + "agent": mapstr.M{}, + }, + Meta: mapstr.M{ + "_id": "abc123", + }, + } + batch := outest.NewBatch(event) + var countLogs int + otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { + countLogs = countLogs + ld.LogRecordCount() + return nil + }) + otelConsumer.beatInfo.ComponentID = tc.componentID + err := otelConsumer.Publish(ctx, batch) + assert.NoError(t, err) + assert.Len(t, batch.Signals, 1) + assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) + assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed") + for _, event := range batch.Events() { + beatEvent := event.Content.Fields.Flatten() + assert.Equal(t, tc.expectedComponentID, beatEvent["agent."+otelComponentIDKey], "expected agent.otelcol.component.id field in log record") + assert.Equal(t, tc.expectedComponentKind, beatEvent["agent."+otelComponentKindKey], "expected agent.otelcol.component.kind field in log record") + } + }) + } + }) } diff --git a/x-pack/metricbeat/mbreceiver/factory.go b/x-pack/metricbeat/mbreceiver/factory.go index 08cc155eb075..147f4605bfa3 100644 --- a/x-pack/metricbeat/mbreceiver/factory.go +++ b/x-pack/metricbeat/mbreceiver/factory.go @@ -51,7 +51,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component. settings.ElasticLicensed = true settings.Initialize = append(settings.Initialize, include.InitializeModule) - b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core()) + b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core()) if err != nil { return nil, fmt.Errorf("error creating %s: %w", Name, err) } diff --git a/x-pack/metricbeat/mbreceiver/receiver_test.go b/x-pack/metricbeat/mbreceiver/receiver_test.go index c050df4a0d2e..332d23fddc11 100644 --- a/x-pack/metricbeat/mbreceiver/receiver_test.go +++ b/x-pack/metricbeat/mbreceiver/receiver_test.go @@ -86,6 +86,8 @@ func TestNewReceiver(t *testing.T) { require.Conditionf(c, func() bool { return len(logs["r1"]) > 0 }, "expected at least one ingest log, got logs: %v", logs["r1"]) + assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") var lastError strings.Builder assert.Conditionf(c, func() bool { return getFromSocket(t, &lastError, monitorSocket, "stats") @@ -198,9 +200,13 @@ func TestMultipleReceivers(t *testing.T) { }, AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) { _ = zapLogs - assert.Conditionf(c, func() bool { + require.Conditionf(c, func() bool { return len(logs["r1"]) > 0 && len(logs["r2"]) > 0 }, "expected at least one ingest log for each receiver, got logs: %v", logs) + assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record") + assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record") + assert.Equal(c, "metricbeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record") + assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected otelcol.component.kind field in r2 log record") var lastError strings.Builder assert.Conditionf(c, func() bool { tests := []string{monitorSocket1, monitorSocket2} diff --git a/x-pack/metricbeat/tests/integration/otel_test.go b/x-pack/metricbeat/tests/integration/otel_test.go index fd2f3a68b79d..a2e585283234 100644 --- a/x-pack/metricbeat/tests/integration/otel_test.go +++ b/x-pack/metricbeat/tests/integration/otel_test.go @@ -137,9 +137,19 @@ http.port: {{.MonitoringPort}} }, 1*time.Minute, 1*time.Second, "expected at least 1 log for metricbeat and otel receiver") - otelDoc := otelDocs.Hits.Hits[0] - metricbeatDoc := metricbeatDocs.Hits.Hits[0] - assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, []string{}, "expected documents keys to be equal") + var metricbeatDoc, otelDoc mapstr.M + otelDoc = otelDocs.Hits.Hits[0].Source + metricbeatDoc = metricbeatDocs.Hits.Hits[0].Source + ignoredFields := []string{ + // only present in beats receivers + "agent.otelcol.component.id", + "agent.otelcol.component.kind", + } + assert.Equal(t, "metricbeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") + assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") + assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in metricbeat log record") + assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in metricbeat log record") + assertMapstrKeysEqual(t, otelDoc, metricbeatDoc, ignoredFields, "expected documents keys to be equal") assertMonitoring(t, optionsValue.MonitoringPort) }