diff --git a/dev-tools/mage/build.go b/dev-tools/mage/build.go index d4ed8aaaf95e..ced63a6625d9 100644 --- a/dev-tools/mage/build.go +++ b/dev-tools/mage/build.go @@ -198,6 +198,12 @@ func GolangCrossBuild(params BuildArgs) error { return Build(params) } +func BuildOTel() error { + args := DefaultBuildArgs() + args.ExtraFlags = append(args.ExtraFlags, "-tags", "otelbeat") + return Build(args) +} + // Build invokes "go build" to produce a binary. func Build(params BuildArgs) error { fmt.Println(">> build: Building", params.Name) diff --git a/libbeat/otelbeat/beatconverter/beatconverter.go b/libbeat/otelbeat/beatconverter/beatconverter.go index f46f6f94e8e9..b1071cb6b82c 100644 --- a/libbeat/otelbeat/beatconverter/beatconverter.go +++ b/libbeat/otelbeat/beatconverter/beatconverter.go @@ -29,7 +29,7 @@ import ( ) // list of supported beatreceivers -var supportedReceivers = []string{"filebeatreceiver"} // Add more beat receivers to this list when we add support +var supportedReceivers = []string{"filebeatreceiver", "metricbeatreceiver"} // Add more beat receivers to this list when we add support type converter struct{} diff --git a/libbeat/otelbeat/providers/fbprovider/fbprovider.go b/libbeat/otelbeat/providers/fbprovider/fbprovider.go index 50ba26bdf900..3e677e868fae 100644 --- a/libbeat/otelbeat/providers/fbprovider/fbprovider.go +++ b/libbeat/otelbeat/providers/fbprovider/fbprovider.go @@ -19,67 +19,35 @@ package fbprovider import ( "context" - "fmt" - "path/filepath" - "strings" "go.opentelemetry.io/collector/confmap" - "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/otelbeat/providers" ) const schemeName = "fb" -type provider struct{} +type fbProvider struct{} -// The Provider provides configuration, and allows to watch/monitor for changes. +// NewFactory returns a provider factory that loads filebeat configuration func NewFactory() confmap.ProviderFactory { return confmap.NewProviderFactory(newProvider) } func newProvider(confmap.ProviderSettings) confmap.Provider { - return &provider{} + return &fbProvider{} } // Retrieve retrieves the beat configuration file and constructs otel config -func (fmp *provider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { - if !strings.HasPrefix(uri, schemeName+":") { - return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName) - } - - // Load filebeat config file - cfg, err := cfgfile.Load(filepath.Clean(uri[len(schemeName)+1:]), nil) - if err != nil { - return nil, err - } - - var receiverMap map[string]any - err = cfg.Unpack(&receiverMap) - if err != nil { - return nil, err - } - - // filebeat specific configuration is defined here - cfgMap := map[string]any{ - "receivers": map[string]any{ - "filebeatreceiver": receiverMap, - }, - "service": map[string]any{ - "pipelines": map[string]any{ - "logs": map[string]any{ - "receivers": []string{"filebeatreceiver"}, - }, - }, - }, - } - - return confmap.NewRetrieved(cfgMap) +// uri here is the filepath of the beat config +func (*fbProvider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { + return providers.LoadConfig(uri, schemeName) } -func (*provider) Scheme() string { +func (*fbProvider) Scheme() string { return schemeName } -func (*provider) Shutdown(context.Context) error { +func (*fbProvider) Shutdown(context.Context) error { return nil } diff --git a/libbeat/otelbeat/providers/fbprovider/fbprovider_test.go b/libbeat/otelbeat/providers/fbprovider/fbprovider_test.go index 6e4512fc4463..b2f5df08e3fe 100644 --- a/libbeat/otelbeat/providers/fbprovider/fbprovider_test.go +++ b/libbeat/otelbeat/providers/fbprovider/fbprovider_test.go @@ -21,6 +21,7 @@ import ( "context" _ "embed" "os" + "path/filepath" "testing" "github.com/stretchr/testify/assert" @@ -101,21 +102,18 @@ service: ` func TestFileBeatProvider(t *testing.T) { - p := provider{} + p := fbProvider{} t.Run("test filebeat provider", func(t *testing.T) { - tempFile, err := os.CreateTemp("", "filebeat.yml") - require.NoError(t, err, "error creating temp file") - defer os.Remove(tempFile.Name()) // Clean up the file after we're done - defer tempFile.Close() + tempDir := t.TempDir() - content := []byte(beatsConfig) - _, err = tempFile.Write(content) - require.NoError(t, err, "error creating temp file") + tempFileName := filepath.Join(tempDir, "filebeat.yml") + err := os.WriteFile(tempFileName, []byte(beatsConfig), 0666) + require.NoError(t, err, "error writing to temp file") // prefix file path with fb: - ret, err := p.Retrieve(context.Background(), "fb:"+tempFile.Name(), nil) + ret, err := p.Retrieve(context.Background(), "fb:"+tempFileName, nil) require.NoError(t, err) retValue, err := ret.AsRaw() diff --git a/libbeat/otelbeat/providers/mbprovider/mbprovider.go b/libbeat/otelbeat/providers/mbprovider/mbprovider.go new file mode 100644 index 000000000000..d1dbaecd328e --- /dev/null +++ b/libbeat/otelbeat/providers/mbprovider/mbprovider.go @@ -0,0 +1,55 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package mbprovider + +import ( + "context" + + "go.opentelemetry.io/collector/confmap" + + "github.com/elastic/beats/v7/libbeat/otelbeat/providers" +) + +const schemeName = "mb" + +type mbProvider struct{} + +// NewFactory returns a provider factory that loads metricbeat configuration +func NewFactory() confmap.ProviderFactory { + return confmap.NewProviderFactory(newProvider) +} + +func newProvider(confmap.ProviderSettings) confmap.Provider { + return &mbProvider{} +} + +// Retrieve retrieves the beat configuration file and constructs otel config +// uri here is the filepath of the beat config +func (*mbProvider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { + return providers.LoadConfig(uri, schemeName) +} + +// Scheme returns the scheme name +func (*mbProvider) Scheme() string { + return schemeName +} + +// Shutdown is a noop, it always returns nil +func (*mbProvider) Shutdown(context.Context) error { + return nil +} diff --git a/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go b/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go new file mode 100644 index 000000000000..2ae7e247965b --- /dev/null +++ b/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go @@ -0,0 +1,125 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package mbprovider + +import ( + "context" + _ "embed" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" + + "gopkg.in/yaml.v2" +) + +var beatsConfig = ` +metricbeat.modules: + - module: system + metricsets: + - cpu # CPU usage + - load # CPU load averages + enabled: true + period: 10s + processes: ['.*'] + + +output: + elasticsearch: + hosts: ["https://localhost:9200"] + username: elastic + password: changeme + index: form-otel-exporter + ssl.enabled: false +` + +var expectedOutput = ` +receivers: + metricbeatreceiver: + metricbeat: + modules: + - module: system + enabled: true + metricsets: + - cpu + - load + processes: ['.*'] + period: 10s + path: + config: . + data: ./data + home: . + logs: ./logs + output: + elasticsearch: + hosts: ["https://localhost:9200"] + username: elastic + password: changeme + index: form-otel-exporter + ssl: + enabled: false + +service: + pipelines: + logs: + receivers: + - "metricbeatreceiver" +` + +func TestMetricbeatProvider(t *testing.T) { + p := mbProvider{} + + t.Run("test metricbeat provider", func(t *testing.T) { + + tempDir := t.TempDir() + + tempFileName := filepath.Join(tempDir, "metricbeat.yml") + err := os.WriteFile(tempFileName, []byte(beatsConfig), 0666) + require.NoError(t, err, "error writing to temp file") + + // prefix file path with mb: + ret, err := p.Retrieve(context.Background(), "mb:"+tempFileName, nil) + require.NoError(t, err) + + retValue, err := ret.AsRaw() + require.NoError(t, err) + expOutput := newFromYamlString(t, expectedOutput) + + // convert it into a common type + want, err := yaml.Marshal(expOutput.ToStringMap()) + require.NoError(t, err) + got, err := yaml.Marshal(retValue) + require.NoError(t, err) + + assert.Equal(t, string(want), string(got)) + assert.NoError(t, p.Shutdown(context.Background())) + }) + +} + +func newFromYamlString(t *testing.T, input string) *confmap.Conf { + t.Helper() + var rawConf map[string]any + err := yaml.Unmarshal([]byte(input), &rawConf) + require.NoError(t, err) + + return confmap.NewFromStringMap(rawConf) +} diff --git a/libbeat/otelbeat/providers/providers.go b/libbeat/otelbeat/providers/providers.go new file mode 100644 index 000000000000..adef7052e090 --- /dev/null +++ b/libbeat/otelbeat/providers/providers.go @@ -0,0 +1,69 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package providers + +import ( + "fmt" + "path/filepath" + "strings" + + "go.opentelemetry.io/collector/confmap" + + "github.com/elastic/beats/v7/libbeat/cfgfile" +) + +var schemeMap = map[string]string{ + "fb": "filebeatreceiver", + "mb": "metricbeatreceiver", +} + +// LoadConfig loads beat configuration based on provided scheme-name and uri path +func LoadConfig(uri string, schemeName string) (*confmap.Retrieved, error) { + if !strings.HasPrefix(uri, schemeName+":") { + return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName) + } + + // Load beat config file + cfg, err := cfgfile.Load(filepath.Clean(uri[len(schemeName)+1:]), nil) + if err != nil { + return nil, err + } + + var receiverMap map[string]any + err = cfg.Unpack(&receiverMap) + if err != nil { + return nil, err + } + + receiverName := schemeMap[schemeName] + // beat specific configuration is defined here + cfgMap := map[string]any{ + "receivers": map[string]any{ + receiverName: receiverMap, + }, + "service": map[string]any{ + "pipelines": map[string]any{ + "logs": map[string]any{ + "receivers": []string{receiverName}, + }, + }, + }, + } + + return confmap.NewRetrieved(cfgMap) +} diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 3ea2dbed3a29..3391bcd7d4e7 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -693,6 +693,7 @@ func createTempDir(t *testing.T) string { // using the default test credentials or the corresponding environment // variables. func EnsureESIsRunning(t *testing.T) { + t.Helper() esURL := GetESURL(t, "http") ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(500*time.Second)) diff --git a/x-pack/filebeat/cmd/otelcmd_enabled.go b/x-pack/filebeat/cmd/otelcmd_enabled.go index 09bf09cdade2..110ae9f589c7 100644 --- a/x-pack/filebeat/cmd/otelcmd_enabled.go +++ b/x-pack/filebeat/cmd/otelcmd_enabled.go @@ -7,11 +7,10 @@ package cmd import ( - fbcmd "github.com/elastic/beats/v7/filebeat/cmd" cmd "github.com/elastic/beats/v7/libbeat/cmd" "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat" ) func addOTelCommand(command *cmd.BeatsRootCmd) { - command.AddCommand(otelbeat.OTelCmd(fbcmd.Name)) + command.AddCommand(otelbeat.OTelCmd(Name)) } diff --git a/x-pack/filebeat/magefile.go b/x-pack/filebeat/magefile.go index 4b95ad92d246..ea5beffd33fc 100644 --- a/x-pack/filebeat/magefile.go +++ b/x-pack/filebeat/magefile.go @@ -45,9 +45,7 @@ func Build() error { // BuildOTel builds the Beat binary with OTel sub command func BuildOTel() error { - args := devtools.DefaultBuildArgs() - args.ExtraFlags = append(args.ExtraFlags, "-tags", "otelbeat") - return devtools.Build(args) + return devtools.BuildOTel() } // BuildSystemTestBinary builds a binary instrumented for use with Python system tests. diff --git a/x-pack/libbeat/common/otelbeat/otel.go b/x-pack/libbeat/common/otelbeat/otel.go index 551ec161f30a..31ee9f003a5d 100644 --- a/x-pack/libbeat/common/otelbeat/otel.go +++ b/x-pack/libbeat/common/otelbeat/otel.go @@ -22,13 +22,16 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/otelbeat/beatconverter" "github.com/elastic/beats/v7/libbeat/otelbeat/providers/fbprovider" + "github.com/elastic/beats/v7/libbeat/otelbeat/providers/mbprovider" + "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver" - "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/metricbeat/mbreceiver" "github.com/elastic/elastic-agent-libs/mapstr" ) var schemeMap = map[string]string{ - "filebeat": "fb", + "filebeat": "fb", + "metricbeat": "mb", } func OTelCmd(beatname string) *cobra.Command { @@ -37,8 +40,6 @@ func OTelCmd(beatname string) *cobra.Command { Use: "otel", Hidden: true, RunE: func(cmd *cobra.Command, args []string) error { - logger := logp.NewLogger(beatname + "-otel-mode") - logger.Info("This mode is experimental and unsupported") // get beat configuration file beatCfg, _ := cmd.Flags().GetString("config") @@ -64,7 +65,7 @@ func OTelCmd(beatname string) *cobra.Command { }, } - command.Flags().String("config", beatname+"-otel.yml", "path to filebeat config file") + command.Flags().String("config", beatname+"-otel.yml", "path to "+beatname+" config file") return command } @@ -72,6 +73,7 @@ func OTelCmd(beatname string) *cobra.Command { func getComponent() (otelcol.Factories, error) { receivers, err := otelcol.MakeFactoryMap( fbreceiver.NewFactory(), + mbreceiver.NewFactory(), ) if err != nil { return otelcol.Factories{}, nil //nolint:nilerr //ignoring this error @@ -118,7 +120,7 @@ func getCollectorSettings(filename string) otelcol.CollectorSettings { info := component.BuildInfo{ Command: "otel", Description: "Beats OTel", - Version: "9.0.0", + Version: version.GetDefaultVersion(), } return otelcol.CollectorSettings{ @@ -130,6 +132,7 @@ func getCollectorSettings(filename string) otelcol.CollectorSettings { ProviderFactories: []confmap.ProviderFactory{ fileprovider.NewFactory(), fbprovider.NewFactory(), + mbprovider.NewFactory(), }, ConverterFactories: []confmap.ConverterFactory{ beatconverter.NewFactory(), diff --git a/x-pack/metricbeat/cmd/otelcmd_disabled.go b/x-pack/metricbeat/cmd/otelcmd_disabled.go new file mode 100644 index 000000000000..af7e1424dd0a --- /dev/null +++ b/x-pack/metricbeat/cmd/otelcmd_disabled.go @@ -0,0 +1,15 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !otelbeat + +package cmd + +import ( + cmd "github.com/elastic/beats/v7/libbeat/cmd" +) + +func addOTelCommand(command *cmd.BeatsRootCmd) { + // No-op +} diff --git a/x-pack/metricbeat/cmd/otelcmd_enabled.go b/x-pack/metricbeat/cmd/otelcmd_enabled.go new file mode 100644 index 000000000000..afbb9e4dfa20 --- /dev/null +++ b/x-pack/metricbeat/cmd/otelcmd_enabled.go @@ -0,0 +1,17 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build otelbeat + +package cmd + +import ( + cmd "github.com/elastic/beats/v7/libbeat/cmd" + "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat" +) + +func addOTelCommand(command *cmd.BeatsRootCmd) { + // Name here is the name of the beat + command.AddCommand(otelbeat.OTelCmd(Name)) +} diff --git a/x-pack/metricbeat/cmd/root.go b/x-pack/metricbeat/cmd/root.go index 76ca40ddf131..30d3d8386466 100644 --- a/x-pack/metricbeat/cmd/root.go +++ b/x-pack/metricbeat/cmd/root.go @@ -54,6 +54,7 @@ func Initialize() *cmd.BeatsRootCmd { rootCmd.PersistentPreRun = func(cmd *cobra.Command, args []string) { management.ConfigTransform.SetTransform(metricbeatCfg) } + addOTelCommand(rootCmd) return rootCmd } diff --git a/x-pack/metricbeat/docker-compose.yml b/x-pack/metricbeat/docker-compose.yml index 9f74541f834b..0857759502cd 100644 --- a/x-pack/metricbeat/docker-compose.yml +++ b/x-pack/metricbeat/docker-compose.yml @@ -1,6 +1,14 @@ version: '2.3' services: + # This is a proxy used to block "docker-compose up" until all services are healthy. + # See: https://github.com/docker/compose/issues/4369 + proxy_dep: + image: busybox + depends_on: + elasticsearch: { condition: service_healthy } + kibana: { condition: service_healthy } + beat: build: ../../metricbeat environment: diff --git a/x-pack/metricbeat/magefile.go b/x-pack/metricbeat/magefile.go index 6fded2c4e737..739d4359bf97 100644 --- a/x-pack/metricbeat/magefile.go +++ b/x-pack/metricbeat/magefile.go @@ -52,6 +52,11 @@ func Build() error { return devtools.Build(args) } +// BuildOTel builds the Beat binary with OTel sub command +func BuildOTel() error { + return devtools.BuildOTel() +} + // GolangCrossBuild build the Beat binary inside of the golang-builder. // Do not use directly, use crossBuild instead. func GolangCrossBuild() error { @@ -235,13 +240,27 @@ func IntegTest() { // Use TEST_TAGS=tag1,tag2 to add additional build tags. // Use MODULE=module to run only tests for `module`. func GoIntegTest(ctx context.Context) error { + + // define modules if os.Getenv("CI") == "true" { mg.Deps(devtools.DefineModules) } if !devtools.IsInIntegTestEnv() { + // build integration test binary with otel sub command + devtools.BuildSystemTestOTelBinary() + args := devtools.DefaultGoTestIntegrationFromHostArgs() + // ES_USER must be admin in order for the Go Integration tests to function because they require + // indices:data/read/search + args.Env["ES_USER"] = args.Env["ES_SUPERUSER_USER"] + args.Env["ES_PASS"] = args.Env["ES_SUPERUSER_PASS"] + // run integration test from home directory + args.Packages = []string{"./tests/integration/"} + devtools.GoIntegTestFromHost(ctx, args) + mg.SerialDeps(Fields, Dashboards) } + return devtools.GoTestIntegrationForModule(ctx) } @@ -252,15 +271,8 @@ func GoIntegTest(ctx context.Context) error { // Use TEST_TAGS=tag1,tag2 to add additional build tags. // Use MODULE=module to run only tests for `module`. func GoFIPSOnlyIntegTest(ctx context.Context) error { - if os.Getenv("CI") == "true" { - mg.Deps(devtools.DefineModules) - } - - if !devtools.IsInIntegTestEnv() { - mg.SerialDeps(Fields, Dashboards) - } os.Setenv("GODEBUG", "fips140=only") - return devtools.GoTestIntegrationForModule(ctx) + return GoIntegTest(ctx) } // PythonIntegTest executes the python system tests in the integration diff --git a/x-pack/metricbeat/metricbeat-otel.yml b/x-pack/metricbeat/metricbeat-otel.yml new file mode 100644 index 000000000000..14c2a1a11423 --- /dev/null +++ b/x-pack/metricbeat/metricbeat-otel.yml @@ -0,0 +1,26 @@ +metricbeat.modules: +- module: system + metricsets: + - cpu # CPU usage + - load # CPU load averages + - memory # Memory usage + - network # Network IO + - process # Per process metrics + - process_summary # Process summary + - uptime # System Uptime + - socket_summary # Socket summary + enabled: true + period: 10s + processes: ['.*'] + + +output: + elasticsearch: + hosts: ["https://localhost:9200"] + username: elastic + password: changeme + +setup.kibana: + host: https://localhost:5601 + username: elastic + password: changeme diff --git a/x-pack/metricbeat/tests/integration/otel_test.go b/x-pack/metricbeat/tests/integration/otel_test.go new file mode 100644 index 000000000000..49c997ca27ab --- /dev/null +++ b/x-pack/metricbeat/tests/integration/otel_test.go @@ -0,0 +1,168 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "bytes" + "context" + "fmt" + "net/http" + "strings" + "testing" + "text/template" + "time" + + "github.com/gofrs/uuid/v5" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/testing/estools" +) + +func TestMetricbeatOTelE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + + host := integration.GetESURL(t, "http") + user := host.User.Username() + password, _ := host.User.Password() + + // create a random uuid and make sure it doesn't contain dashes/ + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + + type options struct { + Index string + ESURL string + Username string + Password string + MonitoringPort int + } + + var beatsCfgFile = ` +metricbeat: + modules: + - module: system + enabled: true + period: 1s + processes: + - '.*' + metricsets: + - cpu +output: + elasticsearch: + hosts: + - {{ .ESURL }} + username: {{ .Username }} + password: {{ .Password }} + index: {{ .Index }} +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +http.host: localhost +http.port: {{.MonitoringPort}} +` + + // start metricbeat in otel mode + metricbeatOTel := integration.NewBeat( + t, + "metricbeat-otel", + "../../metricbeat.test", + "otel", + ) + + optionsValue := options{ + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + MonitoringPort: 5078, + } + + var configBuffer bytes.Buffer + optionsValue.Index = "logs-integration-mbreceiver-" + namespace + require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&configBuffer, optionsValue)) + + metricbeatOTel.WriteConfigFile(configBuffer.String()) + metricbeatOTel.Start() + defer metricbeatOTel.Stop() + + var mbConfigBuffer bytes.Buffer + optionsValue.Index = "logs-integration-mb-" + namespace + optionsValue.MonitoringPort = 5079 + require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&mbConfigBuffer, optionsValue)) + metricbeat := integration.NewBeat(t, "metricbeat", "../../metricbeat.test") + metricbeat.WriteConfigFile(mbConfigBuffer.String()) + metricbeat.Start() + defer metricbeat.Stop() + + // prepare to query ES + es := integration.GetESClient(t, "http") + + // Make sure find the logs + var metricbeatDocs estools.Documents + var otelDocs estools.Documents + var err error + require.Eventually(t, + func() bool { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-mbreceiver-"+namespace+"*") + require.NoError(t, err) + + metricbeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-mb-"+namespace+"*") + require.NoError(t, err) + + return otelDocs.Hits.Total.Value >= 1 && metricbeatDocs.Hits.Total.Value >= 1 + }, + 2*time.Minute, 1*time.Second, "Expected at least one ingested metric event, got metricbeat: %d, otel: %d", metricbeatDocs.Hits.Total.Value, otelDocs.Hits.Total.Value) + + otelDoc := otelDocs.Hits.Hits[0] + metricbeatDoc := metricbeatDocs.Hits.Hits[0] + assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, []string{}, "expected documents keys to be equal") + assertMonitoring(t, optionsValue.MonitoringPort) +} + +func assertMonitoring(t *testing.T, port int) { + address := fmt.Sprintf("http://localhost:%d", port) + r, err := http.Get(address) //nolint:noctx,bodyclose,gosec // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") + + r, err = http.Get(address + "/stats") //nolint:noctx,bodyclose // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") + + r, err = http.Get(address + "/not-exist") //nolint:noctx,bodyclose // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") +} + +func assertMapstrKeysEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) { + t.Helper() + // Delete all ignored fields. + for _, f := range ignoredFields { + _ = m1.Delete(f) + _ = m2.Delete(f) + } + + flatM1 := m1.Flatten() + flatM2 := m2.Flatten() + + for k := range flatM1 { + flatM1[k] = "" + } + for k := range flatM2 { + flatM2[k] = "" + } + + require.Zero(t, cmp.Diff(flatM1, flatM2), msg) +}