Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
299 changes: 299 additions & 0 deletions x-pack/metricbeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"context"
"fmt"
"net/http"
"path/filepath"
"strings"
"testing"
"text/template"
Expand Down Expand Up @@ -146,6 +147,304 @@ func assertMonitoring(t *testing.T, port int) {
require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code")
}

func TestMetricbeatOTelReceiverE2E(t *testing.T) {
integration.EnsureESIsRunning(t)

host := integration.GetESURL(t, "http")
user := host.User.Username()
password, _ := host.User.Password()

namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "")

mbReceiverIndex := "logs-integration-mbreceiver-" + namespace
mbIndex := "logs-metricbeat-mb-" + namespace

type options struct {
Index string
ESURL string
Username string
Password string
}

cfg := `receivers:
metricbeatreceiver:
metricbeat:
modules:
- module: system
enabled: true
period: 1s
processes:
- '.*'
metricsets:
- cpu
output:
otelconsumer:
logging:
level: info
selectors:
- '*'
queue.mem.flush.timeout: 0s
exporters:
debug:
use_internal_logger: false
verbosity: detailed
elasticsearch/log:
endpoints:
- {{.ESURL}}
compression: none
user: {{.Username}}
password: {{.Password}}
logs_index: {{.Index}}
batcher:
enabled: true
flush_timeout: 1s
mapping:
mode: bodymap
service:
pipelines:
logs:
receivers:
- metricbeatreceiver
exporters:
- elasticsearch/log
- debug
`

// start metricbeat in otel mode
metricbeatOTel := integration.NewBeat(
t,
"metricbeat-otel",
"../../metricbeat.test",
"otel",
)

var configBuffer bytes.Buffer
require.NoError(t, template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, options{
Index: mbReceiverIndex,
ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host),
Username: user,
Password: password,
}))

metricbeatOTel.WriteConfigFile(configBuffer.String())
metricbeatOTel.Start()
defer metricbeatOTel.Stop()

var beatsCfgFile = `receivers:
metricbeat:
modules:
- module: system
enabled: true
period: 1s
processes:
- '.*'
metricsets:
- cpu
output:
elasticsearch:
hosts:
- {{ .ESURL }}
username: {{ .Username }}
password: {{ .Password }}
index: {{ .Index }}
queue.mem.flush.timeout: 0s
setup.template.enabled: false
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
`
var mbConfigBuffer bytes.Buffer
require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&mbConfigBuffer, options{
Index: mbIndex,
ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host),
Username: user,
Password: password,
}))
metricbeat := integration.NewBeat(t, "metricbeat", "../../metricbeat.test")
metricbeat.WriteConfigFile(mbConfigBuffer.String())
metricbeat.Start()
defer metricbeat.Stop()

// prepare to query ES
es := integration.GetESClient(t, "http")

var metricbeatDocs estools.Documents
var otelDocs estools.Documents
var err error

require.Eventuallyf(t,
func() bool {
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer findCancel()

otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+mbReceiverIndex+"*")
require.NoError(t, err)

metricbeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+mbIndex+"*")
require.NoError(t, err)

return otelDocs.Hits.Total.Value >= 1 && metricbeatDocs.Hits.Total.Value >= 1
},
2*time.Minute, 1*time.Second, "expected at least 1 log")
otelDoc := otelDocs.Hits.Hits[0]
metricbeatDoc := metricbeatDocs.Hits.Hits[0]
assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, []string{}, "expected documents keys to be equal")
}

func TestMetricbeatOTelMultipleReceiversE2E(t *testing.T) {
integration.EnsureESIsRunning(t)

host := integration.GetESURL(t, "http")
user := host.User.Username()
password, _ := host.User.Password()

metricbeatOTel := integration.NewBeat(
t,
"metricbeat-otel",
"../../metricbeat.test",
"otel",
)

type receiverConfig struct {
MonitoringPort int
InputFile string
PathHome string
}

namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "")
otelConfig := struct {
Index string
Username string
Password string
Receivers []receiverConfig
}{
Index: "logs-integration-" + namespace,
Username: user,
Password: password,
Receivers: []receiverConfig{
{
MonitoringPort: 5066,
PathHome: filepath.Join(metricbeatOTel.TempDir(), "r1"),
},
{
MonitoringPort: 5067,
PathHome: filepath.Join(metricbeatOTel.TempDir(), "r2"),
},
},
}

cfg := `receivers:
{{range $i, $receiver := .Receivers}}
metricbeatreceiver/{{$i}}:
metricbeat:
modules:
- module: system
enabled: true
period: 1s
processes:
- '.*'
metricsets:
- cpu
processors:
- add_fields:
target: ''
fields:
receiverid: "{{$i}}"
output:
otelconsumer:
logging:
level: info
selectors:
- '*'
queue.mem.flush.timeout: 0s
path.home: {{$receiver.PathHome}}
{{if $receiver.MonitoringPort}}
http.enabled: true
http.host: localhost
http.port: {{$receiver.MonitoringPort}}
{{end}}
{{end}}
exporters:
debug:
use_internal_logger: false
verbosity: detailed
elasticsearch/log:
endpoints:
- http://localhost:9200
compression: none
user: {{.Username}}
password: {{.Password}}
logs_index: {{.Index}}
batcher:
enabled: true
flush_timeout: 1s
mapping:
mode: bodymap
service:
pipelines:
logs:
receivers:
{{range $i, $receiver := .Receivers}}
- metricbeatreceiver/{{$i}}
{{end}}
exporters:
- debug
- elasticsearch/log
`
var configBuffer bytes.Buffer
require.NoError(t,
template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig))
configContents := configBuffer.Bytes()

t.Cleanup(func() {
if t.Failed() {
t.Logf("Config contents:\n%s", configContents)
}
})

metricbeatOTel.WriteConfigFile(string(configContents))
metricbeatOTel.Start()
defer metricbeatOTel.Stop()

es := integration.GetESClient(t, "http")

var r0Docs, r1Docs estools.Documents
var err error

require.Eventuallyf(t,
func() bool {
findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second)
defer findCancel()

r0Docs, err = estools.PerformQueryForRawQuery(findCtx, map[string]any{
"query": map[string]any{
"match": map[string]any{
"receiverid": "0",
},
},
}, ".ds-"+otelConfig.Index+"*", es)
require.NoError(t, err)

r1Docs, err = estools.PerformQueryForRawQuery(findCtx, map[string]any{
"query": map[string]any{
"match": map[string]any{
"receiverid": "1",
},
},
}, ".ds-"+otelConfig.Index+"*", es)
require.NoError(t, err)

return r0Docs.Hits.Total.Value >= 1 && r1Docs.Hits.Total.Value >= 1
},
1*time.Minute, 100*time.Millisecond, "expected at least 1 log for each receiver")
assertMapstrKeysEqual(t, r0Docs.Hits.Hits[0].Source, r1Docs.Hits.Hits[0].Source, []string{}, "expected documents keys to be equal")
for _, rec := range otelConfig.Receivers {
assertMonitoring(t, rec.MonitoringPort)
}
}

func assertMapstrKeysEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) {
t.Helper()
// Delete all ignored fields.
Expand Down
Loading