Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libbeat/beat/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Info struct {
FIPSDistribution bool // If the beat was compiled as a FIPS distribution.

LogConsumer consumer.Logs // otel log consumer
ComponentID string // otel component id from the collector config e.g. "filebeatreceiver/logs"
UseDefaultProcessors bool // Whether to use the default processors
Logger *logp.Logger
}
Expand Down
12 changes: 7 additions & 5 deletions libbeat/otelbeat/oteltest/oteltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package oteltest

import (
"context"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -107,17 +108,17 @@ func CheckReceivers(params CheckReceiversParams) {
require.NotEmpty(t, rc.Beat, "receiver beat must not be empty")

var receiverSettings receiver.Settings
receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name)

// Replicate the behavior of the collector logger
receiverCore := core.
With([]zapcore.Field{
zap.String("otelcol.component.id", rc.Name),
zap.String("otelcol.component.id", receiverSettings.ID.String()),
zap.String("otelcol.component.kind", "receiver"),
zap.String("otelcol.signal", "logs"),
})

receiverSettings.Logger = zap.New(receiverCore)
receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name)

logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
for _, rl := range ld.ResourceLogs().All() {
Expand Down Expand Up @@ -160,9 +161,9 @@ func CheckReceivers(params CheckReceiversParams) {
}
})

beatForCompID := func(compID string) string {
beatForCompName := func(compName string) string {
for _, rec := range params.Receivers {
if rec.Name == compID {
if rec.Name == compName {
return rec.Beat
}
}
Expand All @@ -183,8 +184,9 @@ func CheckReceivers(params CheckReceiversParams) {
require.Contains(ct, zl.ContextMap(), "otelcol.component.id")
compID, ok := zl.ContextMap()["otelcol.component.id"].(string)
require.True(ct, ok, "otelcol.component.id should be a string")
compName := strings.Split(compID, "/")[1]
require.Contains(ct, zl.ContextMap(), "service.name")
require.Equal(ct, beatForCompID(compID), zl.ContextMap()["service.name"])
require.Equal(ct, beatForCompName(compName), zl.ContextMap()["service.name"])
break
}
require.NotNil(ct, host.Evt, "expected not nil, got nil")
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/fbreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.
settings.ElasticLicensed = true
settings.Initialize = append(settings.Initialize, include.InitializeModule)

b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core())
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core())
if err != nil {
return nil, fmt.Errorf("error creating %s: %w", Name, err)
}
Expand Down
7 changes: 7 additions & 0 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func TestNewReceiver(t *testing.T) {
AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
_ = zapLogs
require.Lenf(c, logs["r1"], 1, "expected 1 log, got %d", len(logs["r1"]))
assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
var lastError strings.Builder
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, monitorSocket, "stats")
Expand Down Expand Up @@ -232,6 +234,11 @@ func TestMultipleReceivers(t *testing.T) {
require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs")
require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs")

assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record")
assert.Equal(c, "filebeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record")
assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r2 log record")

// Make sure that each receiver has a separate logger
// instance and does not interfere with others. Previously, the
// logger in Beats was global, causing logger fields to be
Expand Down
13 changes: 11 additions & 2 deletions x-pack/filebeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,27 @@ setup.template.pattern: logs-filebeat-default
},
2*time.Minute, 1*time.Second, "expected at least %d events for both filebeat and otel", numEvents)

filebeatDoc := filebeatDocs.Hits.Hits[0].Source
otelDoc := otelDocs.Hits.Hits[0].Source
var filebeatDoc, otelDoc mapstr.M
filebeatDoc = filebeatDocs.Hits.Hits[0].Source
otelDoc = otelDocs.Hits.Hits[0].Source
ignoredFields := []string{
// Expected to change between agentDocs and OtelDocs
"@timestamp",
"agent.ephemeral_id",
"agent.id",
"log.file.inode",
"log.file.path",
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
}

assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")

assert.Equal(t, "filebeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record")
assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record")
assertMonitoring(t, otelMonitoringPort)
}

Expand Down
3 changes: 2 additions & 1 deletion x-pack/libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

// NewBeatForReceiver creates a Beat that will be used in the context of an otel receiver
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, core zapcore.Core) (*instance.Beat, error) {
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) {
b, err := instance.NewBeat(settings.Name,
settings.IndexPrefix,
settings.Version,
Expand All @@ -43,6 +43,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
return nil, err
}

b.Info.ComponentID = componentID
b.Info.LogConsumer = consumer

// begin code similar to configure
Expand Down
13 changes: 13 additions & 0 deletions x-pack/libbeat/outputs/otelconsumer/otelconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (
esDocumentIDAttribute = "elasticsearch.document_id"
beatNameCtxKey = "beat_name"
beatVersionCtxtKey = "beat_version"
// otelComponentIDKey is the key used to store the Beat receiver's component id in the beat event.
otelComponentIDKey = "otelcol.component.id"
// otelComponentKindKey is the key used to store the Beat receiver's component kind in the beat event. This is always "receiver".
otelComponentKindKey = "otelcol.component.kind"
)

func init() {
Expand Down Expand Up @@ -145,6 +149,15 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
}
logRecord.SetObservedTimestamp(observedTimestamp)

if agent, _ := beatEvent.GetValue("agent"); agent != nil {
switch agent := agent.(type) {
case mapstr.M:
agent[otelComponentIDKey] = out.beatInfo.ComponentID
agent[otelComponentKindKey] = "receiver"
beatEvent["agent"] = agent
}
}

otelmap.ConvertNonPrimitive(beatEvent)

// if data_stream field is set on beatEvent. Add it to logrecord.Attributes to support dynamic indexing
Expand Down
48 changes: 47 additions & 1 deletion x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ func TestPublish(t *testing.T) {
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
})

t.Run("sets the client context metadata with the beat info", func(t *testing.T) {
batch := outest.NewBatch(event1)
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
Expand All @@ -269,4 +268,51 @@ func TestPublish(t *testing.T) {
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
})
t.Run("sets otel specific-fields", func(t *testing.T) {
testCases := []struct {
name string
componentID string
componentKind string
expectedComponentID string
expectedComponentKind string
}{
{
name: "sets beat component ID",
componentID: "filebeatreceiver/1",
expectedComponentID: "filebeatreceiver/1",
expectedComponentKind: "receiver",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
event := beat.Event{
Fields: mapstr.M{
"field": 1,
"agent": mapstr.M{},
},
Meta: mapstr.M{
"_id": "abc123",
},
}
batch := outest.NewBatch(event)
var countLogs int
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
countLogs = countLogs + ld.LogRecordCount()
return nil
})
otelConsumer.beatInfo.ComponentID = tc.componentID
err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed")
for _, event := range batch.Events() {
beatEvent := event.Content.Fields.Flatten()
assert.Equal(t, tc.expectedComponentID, beatEvent["agent."+otelComponentIDKey], "expected agent.otelcol.component.id field in log record")
assert.Equal(t, tc.expectedComponentKind, beatEvent["agent."+otelComponentKindKey], "expected agent.otelcol.component.kind field in log record")
}
})
}
})
}
2 changes: 1 addition & 1 deletion x-pack/metricbeat/mbreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.
settings.ElasticLicensed = true
settings.Initialize = append(settings.Initialize, include.InitializeModule)

b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core())
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core())
if err != nil {
return nil, fmt.Errorf("error creating %s: %w", Name, err)
}
Expand Down
8 changes: 7 additions & 1 deletion x-pack/metricbeat/mbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func TestNewReceiver(t *testing.T) {
require.Conditionf(c, func() bool {
return len(logs["r1"]) > 0
}, "expected at least one ingest log, got logs: %v", logs["r1"])
assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
var lastError strings.Builder
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, monitorSocket, "stats")
Expand Down Expand Up @@ -198,9 +200,13 @@ func TestMultipleReceivers(t *testing.T) {
},
AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
_ = zapLogs
assert.Conditionf(c, func() bool {
require.Conditionf(c, func() bool {
return len(logs["r1"]) > 0 && len(logs["r2"]) > 0
}, "expected at least one ingest log for each receiver, got logs: %v", logs)
assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record")
assert.Equal(c, "metricbeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record")
assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected otelcol.component.kind field in r2 log record")
var lastError strings.Builder
assert.Conditionf(c, func() bool {
tests := []string{monitorSocket1, monitorSocket2}
Expand Down
16 changes: 13 additions & 3 deletions x-pack/metricbeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,19 @@ http.port: {{.MonitoringPort}}
},
1*time.Minute, 1*time.Second, "expected at least 1 log for metricbeat and otel receiver")

otelDoc := otelDocs.Hits.Hits[0]
metricbeatDoc := metricbeatDocs.Hits.Hits[0]
assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, []string{}, "expected documents keys to be equal")
var metricbeatDoc, otelDoc mapstr.M
otelDoc = otelDocs.Hits.Hits[0].Source
metricbeatDoc = metricbeatDocs.Hits.Hits[0].Source
ignoredFields := []string{
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
}
assert.Equal(t, "metricbeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in metricbeat log record")
assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in metricbeat log record")
assertMapstrKeysEqual(t, otelDoc, metricbeatDoc, ignoredFields, "expected documents keys to be equal")
assertMonitoring(t, optionsValue.MonitoringPort)
}

Expand Down
Loading