Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
fa7b90c
otel: add otelcol.component.id field to ingested docs
mauri870 Jul 8, 2025
6ba3880
adjust otelcol field name
mauri870 Jul 8, 2025
ce990d1
rename otelcol.component.id in beats event to otel.component.name
mauri870 Jul 10, 2025
7b67ee9
add otel.component.type
mauri870 Jul 10, 2025
9a6f3a1
update metricbeatreceiver unit tests
mauri870 Jul 10, 2025
67939f5
add integration test coverage
mauri870 Jul 10, 2025
f310226
update receiver name in test pipeline
mauri870 Jul 10, 2025
c149985
use filestream as receiver name
mauri870 Jul 10, 2025
f546f89
revert to otelcol.component.id and otelcol.component.kind
mauri870 Jul 15, 2025
d6611b3
fix tests
mauri870 Jul 15, 2025
2e6c001
ignore fields
mauri870 Jul 15, 2025
fe84bf1
Merge branch 'main' into otel-component-id
mauri870 Jul 16, 2025
c0d6e48
ignore otel fields in gcppubsub test
mauri870 Jul 16, 2025
c650857
move assertion on otel fields to the up
mauri870 Jul 16, 2025
975ca68
fix linter issues
mauri870 Jul 16, 2025
69436ae
handle existing otelcol.component.id
mauri870 Jul 17, 2025
3fbd74c
prevent otelcol fields from being replaced if they exist
mauri870 Jul 17, 2025
b7ca458
Merge branch 'main' into otel-component-id
mauri870 Aug 5, 2025
c0fc12a
put otelcol fields nested inside agent property
mauri870 Aug 5, 2025
3489181
Merge branch 'main' into otel-component-id
mauri870 Aug 6, 2025
6fa3b41
fix E2E tests
mauri870 Aug 6, 2025
57aaf89
Merge branch 'main' into otel-component-id
mauri870 Aug 6, 2025
ad61942
Merge branch 'main' into otel-component-id
mauri870 Aug 6, 2025
796a49e
Merge branch 'main' into otel-component-id
mauri870 Aug 11, 2025
30dd986
Merge branch 'main' into otel-component-id
mauri870 Aug 11, 2025
cb239c1
Merge branch 'main' into otel-component-id
mauri870 Aug 12, 2025
ea8e59d
Merge branch 'main' into otel-component-id
mauri870 Aug 12, 2025
db14928
use EventuallyWithTf
mauri870 Aug 13, 2025
2349bd3
Merge branch 'main' into otel-component-id
mauri870 Aug 18, 2025
58a5767
Merge branch 'main' into otel-component-id
mauri870 Aug 18, 2025
3e85a54
Merge branch 'main' into otel-component-id
mauri870 Aug 19, 2025
dab2ead
Merge branch 'main' into otel-component-id
mauri870 Aug 25, 2025
a45c4df
Merge branch 'main' into otel-component-id
mauri870 Aug 26, 2025
8ad2789
Merge branch 'main' into otel-component-id
mauri870 Aug 28, 2025
79d62bb
Merge branch 'main' into otel-component-id
mauri870 Sep 1, 2025
9748ca5
fix go vet issue with logger Warnf
mauri870 Sep 1, 2025
0b3ba27
Merge branch 'main' into otel-component-id
mauri870 Sep 2, 2025
a8743ac
Merge branch 'main' into otel-component-id
mauri870 Sep 3, 2025
8b9d402
Merge branch 'main' into otel-component-id
mauri870 Sep 3, 2025
1f59834
Merge branch 'main' into otel-component-id
mauri870 Sep 4, 2025
2353d18
Merge branch 'main' into otel-component-id
mauri870 Sep 4, 2025
3fa9a89
Merge branch 'main' into otel-component-id
mauri870 Sep 4, 2025
ea52083
Merge branch 'main' into otel-component-id
mauri870 Sep 5, 2025
d57ebe5
Merge branch 'main' into otel-component-id
mauri870 Sep 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libbeat/beat/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Info struct {
FIPSDistribution bool // If the beat was compiled as a FIPS distribution.

LogConsumer consumer.Logs // otel log consumer
ComponentID string // otel component id from the collector config e.g. "filebeatreceiver/logs"
UseDefaultProcessors bool // Whether to use the default processors
Logger *logp.Logger
}
Expand Down
12 changes: 7 additions & 5 deletions libbeat/otelbeat/oteltest/oteltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package oteltest

import (
"context"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -104,17 +105,17 @@ func CheckReceivers(params CheckReceiversParams) {
require.NotEmpty(t, rc.Beat, "receiver beat must not be empty")

var receiverSettings receiver.Settings
receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name)

// Replicate the behavior of the collector logger
receiverCore := core.
With([]zapcore.Field{
zap.String("otelcol.component.id", rc.Name),
zap.String("otelcol.component.id", receiverSettings.ID.String()),
zap.String("otelcol.component.kind", "receiver"),
zap.String("otelcol.signal", "logs"),
})

receiverSettings.Logger = zap.New(receiverCore)
receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name)

logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
for _, rl := range ld.ResourceLogs().All() {
Expand Down Expand Up @@ -157,9 +158,9 @@ func CheckReceivers(params CheckReceiversParams) {
}
})

beatForCompID := func(compID string) string {
beatForCompName := func(compName string) string {
for _, rec := range params.Receivers {
if rec.Name == compID {
if rec.Name == compName {
return rec.Beat
}
}
Expand All @@ -180,8 +181,9 @@ func CheckReceivers(params CheckReceiversParams) {
require.Contains(ct, zl.ContextMap(), "otelcol.component.id")
compID, ok := zl.ContextMap()["otelcol.component.id"].(string)
require.True(ct, ok, "otelcol.component.id should be a string")
compName := strings.Split(compID, "/")[1]
require.Contains(ct, zl.ContextMap(), "service.name")
require.Equal(ct, beatForCompID(compID), zl.ContextMap()["service.name"])
require.Equal(ct, beatForCompName(compName), zl.ContextMap()["service.name"])
break
}
require.NotNil(ct, host.Evt, "expected not nil, got nil")
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/fbreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.
settings.ElasticLicensed = true
settings.Initialize = append(settings.Initialize, include.InitializeModule)

b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core())
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core())
if err != nil {
return nil, fmt.Errorf("error creating %s: %w", Name, err)
}
Expand Down
7 changes: 7 additions & 0 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func TestNewReceiver(t *testing.T) {
AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
_ = zapLogs
require.Lenf(c, logs["r1"], 1, "expected 1 log, got %d", len(logs["r1"]))
assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
var lastError strings.Builder
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, monitorSocket, "stats")
Expand Down Expand Up @@ -232,6 +234,11 @@ func TestMultipleReceivers(t *testing.T) {
require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs")
require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs")

assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record")
assert.Equal(c, "filebeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record")
assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r2 log record")

// Make sure that each receiver has a separate logger
// instance and does not interfere with others. Previously, the
// logger in Beats was global, causing logger fields to be
Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/input/gcppubsub/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ processors:
"agent.ephemeral_id",
"agent.id",
"event.created",
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
Comment thread
mauri870 marked this conversation as resolved.
}

oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
Expand Down
33 changes: 27 additions & 6 deletions x-pack/filebeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
libbeattesting "github.com/elastic/beats/v7/libbeat/testing"
"github.com/elastic/beats/v7/libbeat/tests/integration"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/testing/estools"
)

Expand Down Expand Up @@ -120,18 +121,27 @@ http.port: %d
},
2*time.Minute, 1*time.Second, "expected at least %d events for both filebeat and otel", numEvents)

filebeatDoc := filebeatDocs.Hits.Hits[0].Source
otelDoc := otelDocs.Hits.Hits[0].Source
var filebeatDoc, otelDoc mapstr.M
filebeatDoc = filebeatDocs.Hits.Hits[0].Source
otelDoc = otelDocs.Hits.Hits[0].Source
ignoredFields := []string{
// Expected to change between agentDocs and OtelDocs
"@timestamp",
"agent.ephemeral_id",
"agent.id",
"log.file.inode",
"log.file.path",
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
}

oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")

assert.Equal(t, "filebeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record")
assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record")
assertMonitoring(t, otelMonitoringPort)
}

Expand Down Expand Up @@ -252,6 +262,9 @@ processors:
"agent.ephemeral_id",
"agent.id",
"event.created",
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
}

oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
Expand Down Expand Up @@ -325,7 +338,7 @@ func TestFilebeatOTelReceiverE2E(t *testing.T) {
}

cfg := `receivers:
filebeatreceiver:
filebeatreceiver/filestream:
filebeat:
inputs:
- type: filestream
Expand Down Expand Up @@ -366,7 +379,7 @@ service:
pipelines:
logs:
receivers:
- filebeatreceiver
- filebeatreceiver/filestream
exporters:
- elasticsearch/log
- debug
Expand Down Expand Up @@ -452,18 +465,26 @@ http.port: %d
},
2*time.Minute, 1*time.Second, "expected at least %d events for both filebeat and otel", wantEvents)

filebeatDoc := filebeatDocs.Hits.Hits[0].Source
otelDoc := otelDocs.Hits.Hits[0].Source
var filebeatDoc, otelDoc mapstr.M
filebeatDoc = filebeatDocs.Hits.Hits[0].Source
otelDoc = otelDocs.Hits.Hits[0].Source
ignoredFields := []string{
// Expected to change between agentDocs and OtelDocs
"@timestamp",
"agent.ephemeral_id",
"agent.id",
"log.file.inode",
"log.file.path",
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
}

oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
assert.Equal(t, "filebeatreceiver/filestream", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record")
assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record")
assertMonitoring(t, otelConfig.MonitoringPort)
assertMonitoring(t, filebeatMonitoringPort) // filebeat
}
Expand Down
3 changes: 2 additions & 1 deletion x-pack/libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

// NewBeatForReceiver creates a Beat that will be used in the context of an otel receiver
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, core zapcore.Core) (*instance.Beat, error) {
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) {
b, err := instance.NewBeat(settings.Name,
settings.IndexPrefix,
settings.Version,
Expand All @@ -43,6 +43,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
return nil, err
}

b.Info.ComponentID = componentID
b.Info.LogConsumer = consumer

// begin code similar to configure
Expand Down
17 changes: 15 additions & 2 deletions x-pack/libbeat/outputs/otelconsumer/otelconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ import (
const (
// esDocumentIDAttribute is the attribute key used to store the document ID in the log record.
esDocumentIDAttribute = "elasticsearch.document_id"
beatNameCtxKey = "beat_name"
beatVersionCtxtKey = "beat_version"
// otelComponentIDKey is the key used to store the Beat receiver's component id in the beat event.
otelComponentIDKey = "otelcol.component.id"
// otelComponentKindKey is the key used to store the Beat receiver's component kind in the beat event. This is always "receiver".
otelComponentKindKey = "otelcol.component.kind"
beatNameCtxKey = "beat_name"
beatVersionCtxtKey = "beat_version"
)

func init() {
Expand Down Expand Up @@ -145,6 +149,15 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
}
logRecord.SetObservedTimestamp(observedTimestamp)

if agent, _ := beatEvent.GetValue("agent"); agent != nil {
switch agent := agent.(type) {
case mapstr.M:
agent[otelComponentIDKey] = out.beatInfo.ComponentID
agent[otelComponentKindKey] = "receiver"
beatEvent["agent"] = agent
}
}

otelmap.ConvertNonPrimitive(beatEvent)

// if data_stream field is set on beatEvent. Add it to logrecord.Attributes to support dynamic indexing
Expand Down
46 changes: 46 additions & 0 deletions x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,53 @@ func TestPublish(t *testing.T) {
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
})
t.Run("sets otel specific-fields", func(t *testing.T) {
testCases := []struct {
name string
componentID string
componentKind string
expectedComponentID string
expectedComponentKind string
}{
{
name: "sets beat component ID",
componentID: "filebeatreceiver/1",
expectedComponentID: "filebeatreceiver/1",
expectedComponentKind: "receiver",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
event := beat.Event{
Fields: mapstr.M{
"field": 1,
"agent": mapstr.M{},
},
Meta: mapstr.M{
"_id": "abc123",
},
}
batch := outest.NewBatch(event)
var countLogs int
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
countLogs = countLogs + ld.LogRecordCount()
return nil
})
otelConsumer.beatInfo.ComponentID = tc.componentID
err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed")
for _, event := range batch.Events() {
beatEvent := event.Content.Fields.Flatten()
assert.Equal(t, tc.expectedComponentID, beatEvent["agent."+otelComponentIDKey], "expected agent.otelcol.component.id field in log record")
assert.Equal(t, tc.expectedComponentKind, beatEvent["agent."+otelComponentKindKey], "expected agent.otelcol.component.kind field in log record")
}
})
}
})
t.Run("sets the client context metadata with the beat info", func(t *testing.T) {
batch := outest.NewBatch(event1)
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/mbreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.
settings.ElasticLicensed = true
settings.Initialize = append(settings.Initialize, include.InitializeModule)

b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core())
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core())
if err != nil {
return nil, fmt.Errorf("error creating %s: %w", Name, err)
}
Expand Down
8 changes: 7 additions & 1 deletion x-pack/metricbeat/mbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func TestNewReceiver(t *testing.T) {
require.Conditionf(c, func() bool {
return len(logs["r1"]) > 0
}, "expected at least one ingest log, got logs: %v", logs["r1"])
assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
var lastError strings.Builder
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, monitorSocket, "stats")
Expand Down Expand Up @@ -198,9 +200,13 @@ func TestMultipleReceivers(t *testing.T) {
},
AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
_ = zapLogs
assert.Conditionf(c, func() bool {
require.Conditionf(c, func() bool {
return len(logs["r1"]) > 0 && len(logs["r2"]) > 0
}, "expected at least one ingest log for each receiver, got logs: %v", logs)
assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record")
assert.Equal(c, "metricbeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record")
assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected otelcol.component.kind field in r2 log record")
var lastError strings.Builder
assert.Conditionf(c, func() bool {
tests := []string{monitorSocket1, monitorSocket2}
Expand Down
32 changes: 26 additions & 6 deletions x-pack/metricbeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,19 @@ http.port: {{.MonitoringPort}}
},
1*time.Minute, 1*time.Second, "expected at least 1 log for metricbeat and otel receiver")

otelDoc := otelDocs.Hits.Hits[0]
metricbeatDoc := metricbeatDocs.Hits.Hits[0]
assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, []string{}, "expected documents keys to be equal")
var metricbeatDoc, otelDoc mapstr.M
otelDoc = otelDocs.Hits.Hits[0].Source
metricbeatDoc = metricbeatDocs.Hits.Hits[0].Source
ignoredFields := []string{
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
}
assert.Equal(t, "metricbeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in metricbeat log record")
assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in metricbeat log record")
assertMapstrKeysEqual(t, otelDoc, metricbeatDoc, ignoredFields, "expected documents keys to be equal")
assertMonitoring(t, optionsValue.MonitoringPort)
}

Expand Down Expand Up @@ -308,7 +318,12 @@ processors:
1*time.Minute, 1*time.Second, "expected at least a single log for metricbeat and otel mode")
otelDoc := otelDocs.Hits.Hits[0]
metricbeatDoc := metricbeatDocs.Hits.Hits[0]
assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, []string{}, "expected documents keys to be equal")
ignoredFields := []string{
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
}
assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, ignoredFields, "expected documents keys to be equal")
}

func TestMetricbeatOTelMultipleReceiversE2E(t *testing.T) {
Expand Down Expand Up @@ -464,8 +479,13 @@ service:
assert.GreaterOrEqualf(ct, r0Docs.Hits.Total.Value, 1, "expected at least 1 log for receiver 0, got %d", r0Docs.Hits.Total.Value)
assert.GreaterOrEqualf(ct, r1Docs.Hits.Total.Value, 1, "expected at least 1 log for receiver 1, got %d", r1Docs.Hits.Total.Value)
},
1*time.Minute, 100*time.Millisecond, "expected to found receiver logs")
assertMapstrKeysEqual(t, r0Docs.Hits.Hits[0].Source, r1Docs.Hits.Hits[0].Source, []string{}, "expected documents keys to be equal")
1*time.Minute, 100*time.Millisecond, "expected at least 1 log for each receiver")
ignoredFields := []string{
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
}
assertMapstrKeysEqual(t, r0Docs.Hits.Hits[0].Source, r1Docs.Hits.Hits[0].Source, ignoredFields, "expected documents keys to be equal")
for _, rec := range otelConfig.Receivers {
assertMonitoring(t, rec.MonitoringPort)
}
Expand Down
Loading