Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libbeat/beat/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Info struct {
StatsRegistry *monitoring.Registry
}
LogConsumer consumer.Logs // otel log consumer
ComponentID string // otel component id from the collector config e.g. "filebeatreceiver/logs"
UseDefaultProcessors bool // Whether to use the default processors
}

Expand Down
12 changes: 7 additions & 5 deletions libbeat/otelbeat/oteltest/oteltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package oteltest

import (
"context"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -104,17 +105,17 @@ func CheckReceivers(params CheckReceiversParams) {
require.NotEmpty(t, rc.Beat, "receiver beat must not be empty")

var receiverSettings receiver.Settings
receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name)

// Replicate the behavior of the collector logger
receiverCore := core.
With([]zapcore.Field{
zap.String("otelcol.component.id", rc.Name),
zap.String("otelcol.component.id", receiverSettings.ID.String()),
zap.String("otelcol.component.kind", "receiver"),
zap.String("otelcol.signal", "logs"),
})

receiverSettings.Logger = zap.New(receiverCore)
receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name)

logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
for _, rl := range ld.ResourceLogs().All() {
Expand Down Expand Up @@ -157,9 +158,9 @@ func CheckReceivers(params CheckReceiversParams) {
}
})

beatForCompID := func(compID string) string {
beatForCompName := func(compName string) string {
for _, rec := range params.Receivers {
if rec.Name == compID {
if rec.Name == compName {
return rec.Beat
}
}
Expand All @@ -180,8 +181,9 @@ func CheckReceivers(params CheckReceiversParams) {
require.Contains(ct, zl.ContextMap(), "otelcol.component.id")
compID, ok := zl.ContextMap()["otelcol.component.id"].(string)
require.True(ct, ok, "otelcol.component.id should be a string")
compName := strings.Split(compID, "/")[1]
require.Contains(ct, zl.ContextMap(), "service.name")
require.Equal(ct, beatForCompID(compID), zl.ContextMap()["service.name"])
require.Equal(ct, beatForCompName(compName), zl.ContextMap()["service.name"])
break
}
require.NotNilf(ct, host.Evt, "expected not nil nil, got %v", host.Evt)
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/fbreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.
settings.ElasticLicensed = true
settings.Initialize = append(settings.Initialize, include.InitializeModule)

b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.Logger.Core())
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core())
if err != nil {
return nil, fmt.Errorf("error creating %s: %w", Name, err)
}
Expand Down
19 changes: 19 additions & 0 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,22 @@
},
AssertFunc: func(t *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) {
_ = zapLogs
<<<<<<< HEAD

Check failure on line 69 in x-pack/filebeat/fbreceiver/receiver_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

expected statement, found '<<' (typecheck)
require.Lenf(t, logs["r1"], 1, "expected 1 log, got %d", len(logs["r1"]))
assert.Condition(t, func() bool {
=======
require.Lenf(c, logs["r1"], 1, "expected 1 log, got %d", len(logs["r1"]))
assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record")
var lastError strings.Builder
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, monitorSocket, "stats")
}, "failed to connect to monitoring socket, stats endpoint, last error was: %s", &lastError)
assert.Conditionf(c, func() bool {
return getFromSocket(t, &lastError, monitorSocket, "inputs")
}, "failed to connect to monitoring socket, inputs endpoint, last error was: %s", &lastError)
assert.Condition(c, func() bool {
>>>>>>> fafbdcbd8 (otel: add otel-specific fields to ingested docs (#45242))

Check failure on line 84 in x-pack/filebeat/fbreceiver/receiver_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

expected statement, found '>>' (typecheck)
processorsLoaded := zapLogs.FilterMessageSnippet("Generated new processors").
FilterMessageSnippet("add_host_metadata").
FilterMessageSnippet("add_cloud_metadata").
Expand Down Expand Up @@ -184,6 +198,11 @@
require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs")
require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs")

assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record")
assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record")
assert.Equal(c, "filebeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record")
assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r2 log record")

// Make sure that each receiver has a separate logger
// instance and does not interfere with others. Previously, the
// logger in Beats was global, causing logger fields to be
Expand Down
185 changes: 185 additions & 0 deletions x-pack/filebeat/input/gcppubsub/otel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build integration && !agentbeat

package gcppubsub

import (
"bytes"
"context"
"fmt"
"testing"
"text/template"
"time"

"github.com/gofrs/uuid/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
"github.com/elastic/beats/v7/libbeat/tests/integration"

"github.com/elastic/elastic-agent-libs/testing/estools"
)

func TestGCPInputOTelE2E(t *testing.T) {
integration.EnsureESIsRunning(t)

// Create pubsub client for setting up and communicating to emulator.
client, clientCancel := testSetup(t)
defer func() {
clientCancel()
client.Close()
}()

createTopic(t, client)
createSubscription(t, "test-subscription-otel", client)

Check failure on line 38 in x-pack/filebeat/input/gcppubsub/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

too many arguments in call to createSubscription
createSubscription(t, "test-subscription-fb", client)

Check failure on line 39 in x-pack/filebeat/input/gcppubsub/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

too many arguments in call to createSubscription
const numMsgs = 10
publishMessages(t, client, numMsgs)

host := integration.GetESURL(t, "http")
user := host.User.Username()
password, _ := host.User.Password()

// create a random uuid and make sure it doesn't contain dashes/
otelNamespace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4()))
fbNameSpace := fmt.Sprintf("%x", uuid.Must(uuid.NewV4()))

otelIndex := "logs-integration-" + otelNamespace
fbIndex := "logs-integration-" + fbNameSpace

type options struct {
Namespace string
ESURL string
Username string
Password string
Subscription string
}

gcpConfig := `filebeat.inputs:
- type: gcp-pubsub
project_id: test-project-id
topic: test-topic-foo
subscription.name: {{ .Subscription }}
credentials_file: "testdata/fake.json"

output:
elasticsearch:
hosts:
- {{ .ESURL }}
username: {{ .Username }}
password: {{ .Password }}
index: logs-integration-{{ .Namespace }}

queue.mem.flush.timeout: 0s
setup.template.enabled: false
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
`

// start filebeat in otel mode
filebeatOTel := integration.NewBeat(
t,
"filebeat-otel",
"../../filebeat.test",
"otel",
)

optionsValue := options{
ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host),
Username: user,
Password: password,
}

var configBuffer bytes.Buffer
optionsValue.Namespace = otelNamespace
optionsValue.Subscription = "test-subscription-otel"
require.NoError(t, template.Must(template.New("config").Parse(gcpConfig)).Execute(&configBuffer, optionsValue))

filebeatOTel.WriteConfigFile(configBuffer.String())

filebeatOTel.Start()
defer filebeatOTel.Stop()

// reset buffer
configBuffer.Reset()

optionsValue.Namespace = fbNameSpace
optionsValue.Subscription = "test-subscription-fb"
require.NoError(t, template.Must(template.New("config").Parse(gcpConfig)).Execute(&configBuffer, optionsValue))

// start filebeat
filebeat := integration.NewBeat(
t,
"filebeat",
"../../filebeat.test",
)

filebeat.WriteConfigFile(configBuffer.String())
filebeat.Start()
defer filebeat.Stop()

// prepare to query ES
es := integration.GetESClient(t, "http")

Check failure on line 129 in x-pack/filebeat/input/gcppubsub/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

undefined: integration.GetESClient

t.Cleanup(func() {
_, err := es.Indices.DeleteDataStream([]string{
otelIndex,
fbIndex,
})
require.NoError(t, err, "failed to delete indices")
})

rawQuery := map[string]any{
"query": map[string]any{
"match_phrase": map[string]any{
"input.type": "gcp-pubsub",
},
},
"sort": []map[string]any{
{"@timestamp": map[string]any{"order": "asc"}},
},
}

var filebeatDocs estools.Documents
var otelDocs estools.Documents
var err error

// wait for logs to be published
require.EventuallyWithTf(t,
func(ct *assert.CollectT) {
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer findCancel()

otelDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+otelIndex+"*", es)
assert.NoError(ct, err)
assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, 1, "expected at least 1 otel document, got %d", otelDocs.Hits.Total.Value)

filebeatDocs, err = estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+fbIndex+"*", es)
assert.NoError(ct, err)
assert.GreaterOrEqual(ct, filebeatDocs.Hits.Total.Value, 1, "expected at least 1 filebeat document, got %d", filebeatDocs.Hits.Total.Value)
},
3*time.Minute, 1*time.Second, "expected at least 1 document for both filebeat and otel modes")

filebeatDoc := filebeatDocs.Hits.Hits[0].Source
otelDoc := otelDocs.Hits.Hits[0].Source
ignoredFields := []string{
// Expected to change between agentDocs and OtelDocs
"@timestamp",
"agent.ephemeral_id",
"agent.id",
"event.created",
// only present in beats receivers
"agent.otelcol.component.id",
"agent.otelcol.component.kind",
}

oteltest.AssertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")

Check failure on line 183 in x-pack/filebeat/input/gcppubsub/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

undefined: oteltest.AssertMapsEqual (typecheck)

}
Loading
Loading