From fd4c9e4c798048323bc36b481530c3e190a780b7 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 2 Jul 2025 13:24:41 +0530 Subject: [PATCH 01/15] [beatreceivers] Introduce otel mode for metricbeat --- .../otelbeat/beatconverter/beatconverter.go | 2 +- libbeat/otelbeat/providers/common_provider.go | 67 ++++++++ .../providers/fbprovider/fbprovider.go | 38 +---- .../providers/mbprovider/mbprovider.go | 51 +++++++ .../providers/mbprovider/mbprovider_test.go | 144 ++++++++++++++++++ libbeat/publisher/processing/processors.go | 6 +- x-pack/filebeat/cmd/otelcmd_enabled.go | 3 +- x-pack/libbeat/common/otelbeat/otel.go | 14 +- x-pack/metricbeat/cmd/otelcmd_disabled.go | 15 ++ x-pack/metricbeat/cmd/otelcmd_enabled.go | 17 +++ x-pack/metricbeat/cmd/root.go | 1 + x-pack/metricbeat/magefile.go | 7 + x-pack/metricbeat/metricbeat-otel.yml | 26 ++++ 13 files changed, 344 insertions(+), 47 deletions(-) create mode 100644 libbeat/otelbeat/providers/common_provider.go create mode 100644 libbeat/otelbeat/providers/mbprovider/mbprovider.go create mode 100644 libbeat/otelbeat/providers/mbprovider/mbprovider_test.go create mode 100644 x-pack/metricbeat/cmd/otelcmd_disabled.go create mode 100644 x-pack/metricbeat/cmd/otelcmd_enabled.go create mode 100644 x-pack/metricbeat/metricbeat-otel.yml diff --git a/libbeat/otelbeat/beatconverter/beatconverter.go b/libbeat/otelbeat/beatconverter/beatconverter.go index f46f6f94e8e9..b1071cb6b82c 100644 --- a/libbeat/otelbeat/beatconverter/beatconverter.go +++ b/libbeat/otelbeat/beatconverter/beatconverter.go @@ -29,7 +29,7 @@ import ( ) // list of supported beatreceivers -var supportedReceivers = []string{"filebeatreceiver"} // Add more beat receivers to this list when we add support +var supportedReceivers = []string{"filebeatreceiver", "metricbeatreceiver"} // Add more beat receivers to this list when we add support type converter struct{} diff --git a/libbeat/otelbeat/providers/common_provider.go b/libbeat/otelbeat/providers/common_provider.go new file mode 100644 index 000000000000..1c4b8fda1bc9 --- /dev/null +++ b/libbeat/otelbeat/providers/common_provider.go @@ -0,0 +1,67 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package providers + +import ( + "fmt" + "path/filepath" + "strings" + + "github.com/elastic/beats/v7/libbeat/cfgfile" + "go.opentelemetry.io/collector/confmap" +) + +var schemeMap = map[string]string{ + "fb": "filebeatreceiver", + "mb": "metricbeatreceiver", +} + +func LoadConfig(uri string, schemeName string) (*confmap.Retrieved, error) { + if !strings.HasPrefix(uri, schemeName+":") { + return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName) + } + + // Load filebeat config file + cfg, err := cfgfile.Load(filepath.Clean(uri[len(schemeName)+1:]), nil) + if err != nil { + return nil, err + } + + var receiverMap map[string]any + err = cfg.Unpack(&receiverMap) + if err != nil { + return nil, err + } + + receiverName := schemeMap[schemeName] + // filebeat specific configuration is defined here + cfgMap := map[string]any{ + "receivers": map[string]any{ + receiverName: receiverMap, + }, + "service": map[string]any{ + "pipelines": map[string]any{ + "logs": map[string]any{ + "receivers": []string{receiverName}, + }, + }, + }, + } + + return confmap.NewRetrieved(cfgMap) +} diff --git a/libbeat/otelbeat/providers/fbprovider/fbprovider.go b/libbeat/otelbeat/providers/fbprovider/fbprovider.go index 50ba26bdf900..fdf95c835298 100644 --- a/libbeat/otelbeat/providers/fbprovider/fbprovider.go +++ b/libbeat/otelbeat/providers/fbprovider/fbprovider.go @@ -19,13 +19,9 @@ package fbprovider import ( "context" - "fmt" - "path/filepath" - "strings" + "github.com/elastic/beats/v7/libbeat/otelbeat/providers" "go.opentelemetry.io/collector/confmap" - - "github.com/elastic/beats/v7/libbeat/cfgfile" ) const schemeName = "fb" @@ -43,37 +39,7 @@ func newProvider(confmap.ProviderSettings) confmap.Provider { // Retrieve retrieves the beat configuration file and constructs otel config func (fmp *provider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { - if !strings.HasPrefix(uri, schemeName+":") { - return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName) - } - - // Load filebeat config file - cfg, err := cfgfile.Load(filepath.Clean(uri[len(schemeName)+1:]), nil) - if err != nil { - return nil, err - } - - var receiverMap map[string]any - err = cfg.Unpack(&receiverMap) - if err != nil { - return nil, err - } - - // filebeat specific configuration is defined here - cfgMap := map[string]any{ - "receivers": map[string]any{ - "filebeatreceiver": receiverMap, - }, - "service": map[string]any{ - "pipelines": map[string]any{ - "logs": map[string]any{ - "receivers": []string{"filebeatreceiver"}, - }, - }, - }, - } - - return confmap.NewRetrieved(cfgMap) + return providers.LoadConfig(uri, schemeName) } func (*provider) Scheme() string { diff --git a/libbeat/otelbeat/providers/mbprovider/mbprovider.go b/libbeat/otelbeat/providers/mbprovider/mbprovider.go new file mode 100644 index 000000000000..ee61d0baa499 --- /dev/null +++ b/libbeat/otelbeat/providers/mbprovider/mbprovider.go @@ -0,0 +1,51 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package mbprovider + +import ( + "context" + + "github.com/elastic/beats/v7/libbeat/otelbeat/providers" + "go.opentelemetry.io/collector/confmap" +) + +const schemeName = "mb" + +type provider struct{} + +// The Provider provides configuration, and allows to watch/monitor for changes. +func NewFactory() confmap.ProviderFactory { + return confmap.NewProviderFactory(newProvider) +} + +func newProvider(confmap.ProviderSettings) confmap.Provider { + return &provider{} +} + +// Retrieve retrieves the beat configuration file and constructs otel config +func (fmp *provider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { + return providers.LoadConfig(uri, schemeName) +} + +func (*provider) Scheme() string { + return schemeName +} + +func (*provider) Shutdown(context.Context) error { + return nil +} diff --git a/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go b/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go new file mode 100644 index 000000000000..dc98f1f43583 --- /dev/null +++ b/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go @@ -0,0 +1,144 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package mbprovider + +import ( + "context" + _ "embed" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" + + "gopkg.in/yaml.v2" +) + +var beatsConfig = ` +filebeat.inputs: + - type: filestream + id: filestream-input-id + enabled: true + paths: + - /tmp/flog.log + +output: + elasticsearch: + hosts: ["https://localhost:9200"] + username: elastic + password: changeme + index: form-otel-exporter + ssl.enabled: false + +setup.template.name: form-otel-exporter +setup.template.pattern: form-otel-exporter +setup.dashboards.index: "form-otel-exporter*" + +setup.kibana: + host: https://localhost:5601 + username: elastic + password: changeme + ssl.verification_mode: none +` + +var expectedOutput = ` +receivers: + filebeatreceiver: + filebeat: + inputs: + - enabled: true + id: filestream-input-id + paths: + - /tmp/flog.log + type: filestream + output: + elasticsearch: + hosts: ["https://localhost:9200"] + username: elastic + password: changeme + index: form-otel-exporter + ssl: + enabled: false + path: + config: . + data: ./data + home: . + logs: ./logs + setup: + dashboards: + index: form-otel-exporter* + kibana: + host: https://localhost:5601 + password: changeme + ssl: + verification_mode: none + username: elastic + template: + name: form-otel-exporter + pattern: form-otel-exporter + +service: + pipelines: + logs: + receivers: + - "filebeatreceiver" +` + +func TestFileBeatProvider(t *testing.T) { + p := provider{} + + t.Run("test filebeat provider", func(t *testing.T) { + + tempFile, err := os.CreateTemp("", "filebeat.yml") + require.NoError(t, err, "error creating temp file") + defer os.Remove(tempFile.Name()) // Clean up the file after we're done + defer tempFile.Close() + + content := []byte(beatsConfig) + _, err = tempFile.Write(content) + require.NoError(t, err, "error creating temp file") + + // prefix file path with fb: + ret, err := p.Retrieve(context.Background(), "fb:"+tempFile.Name(), nil) + require.NoError(t, err) + + retValue, err := ret.AsRaw() + require.NoError(t, err) + expOutput := newFromYamlString(t, expectedOutput) + + // convert it into a common type + want, err := yaml.Marshal(expOutput.ToStringMap()) + require.NoError(t, err) + got, err := yaml.Marshal(retValue) + require.NoError(t, err) + + assert.Equal(t, string(want), string(got)) + assert.NoError(t, p.Shutdown(context.Background())) + }) + +} + +func newFromYamlString(t *testing.T, input string) *confmap.Conf { + t.Helper() + var rawConf map[string]any + err := yaml.Unmarshal([]byte(input), &rawConf) + require.NoError(t, err) + + return confmap.NewFromStringMap(rawConf) +} diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go index 710af841e234..ebf55458c698 100644 --- a/libbeat/publisher/processing/processors.go +++ b/libbeat/publisher/processing/processors.go @@ -205,12 +205,14 @@ func debugPrintProcessor(info beat.Info, log *logp.Logger) *processorFn { mux.Lock() defer mux.Unlock() - b, err := encoder.Encode(info.Beat, event) + _, err := encoder.Encode(info.Beat, event) if err != nil { return event, nil } - log.Debugw(fmt.Sprintf("Publish event: %s", b), logp.TypeKey, logp.EventType) + // fmt.Println("publish event works") + + log.Debugw(fmt.Sprintf("Publish event: %+v", event), logp.TypeKey, logp.EventType) return event, nil }) } diff --git a/x-pack/filebeat/cmd/otelcmd_enabled.go b/x-pack/filebeat/cmd/otelcmd_enabled.go index 09bf09cdade2..110ae9f589c7 100644 --- a/x-pack/filebeat/cmd/otelcmd_enabled.go +++ b/x-pack/filebeat/cmd/otelcmd_enabled.go @@ -7,11 +7,10 @@ package cmd import ( - fbcmd "github.com/elastic/beats/v7/filebeat/cmd" cmd "github.com/elastic/beats/v7/libbeat/cmd" "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat" ) func addOTelCommand(command *cmd.BeatsRootCmd) { - command.AddCommand(otelbeat.OTelCmd(fbcmd.Name)) + command.AddCommand(otelbeat.OTelCmd(Name)) } diff --git a/x-pack/libbeat/common/otelbeat/otel.go b/x-pack/libbeat/common/otelbeat/otel.go index 551ec161f30a..c5fac00b1390 100644 --- a/x-pack/libbeat/common/otelbeat/otel.go +++ b/x-pack/libbeat/common/otelbeat/otel.go @@ -22,13 +22,15 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/otelbeat/beatconverter" "github.com/elastic/beats/v7/libbeat/otelbeat/providers/fbprovider" + "github.com/elastic/beats/v7/libbeat/otelbeat/providers/mbprovider" "github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver" - "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/x-pack/metricbeat/mbreceiver" "github.com/elastic/elastic-agent-libs/mapstr" ) var schemeMap = map[string]string{ - "filebeat": "fb", + "filebeat": "fb", + "metricbeat": "mb", } func OTelCmd(beatname string) *cobra.Command { @@ -37,8 +39,6 @@ func OTelCmd(beatname string) *cobra.Command { Use: "otel", Hidden: true, RunE: func(cmd *cobra.Command, args []string) error { - logger := logp.NewLogger(beatname + "-otel-mode") - logger.Info("This mode is experimental and unsupported") // get beat configuration file beatCfg, _ := cmd.Flags().GetString("config") @@ -64,7 +64,7 @@ func OTelCmd(beatname string) *cobra.Command { }, } - command.Flags().String("config", beatname+"-otel.yml", "path to filebeat config file") + command.Flags().String("config", beatname+"-otel.yml", "path to"+beatname+"config file") return command } @@ -72,6 +72,7 @@ func OTelCmd(beatname string) *cobra.Command { func getComponent() (otelcol.Factories, error) { receivers, err := otelcol.MakeFactoryMap( fbreceiver.NewFactory(), + mbreceiver.NewFactory(), ) if err != nil { return otelcol.Factories{}, nil //nolint:nilerr //ignoring this error @@ -118,7 +119,7 @@ func getCollectorSettings(filename string) otelcol.CollectorSettings { info := component.BuildInfo{ Command: "otel", Description: "Beats OTel", - Version: "9.0.0", + Version: "9.2.0", } return otelcol.CollectorSettings{ @@ -130,6 +131,7 @@ func getCollectorSettings(filename string) otelcol.CollectorSettings { ProviderFactories: []confmap.ProviderFactory{ fileprovider.NewFactory(), fbprovider.NewFactory(), + mbprovider.NewFactory(), }, ConverterFactories: []confmap.ConverterFactory{ beatconverter.NewFactory(), diff --git a/x-pack/metricbeat/cmd/otelcmd_disabled.go b/x-pack/metricbeat/cmd/otelcmd_disabled.go new file mode 100644 index 000000000000..af7e1424dd0a --- /dev/null +++ b/x-pack/metricbeat/cmd/otelcmd_disabled.go @@ -0,0 +1,15 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !otelbeat + +package cmd + +import ( + cmd "github.com/elastic/beats/v7/libbeat/cmd" +) + +func addOTelCommand(command *cmd.BeatsRootCmd) { + // No-op +} diff --git a/x-pack/metricbeat/cmd/otelcmd_enabled.go b/x-pack/metricbeat/cmd/otelcmd_enabled.go new file mode 100644 index 000000000000..afbb9e4dfa20 --- /dev/null +++ b/x-pack/metricbeat/cmd/otelcmd_enabled.go @@ -0,0 +1,17 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build otelbeat + +package cmd + +import ( + cmd "github.com/elastic/beats/v7/libbeat/cmd" + "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat" +) + +func addOTelCommand(command *cmd.BeatsRootCmd) { + // Name here is the name of the beat + command.AddCommand(otelbeat.OTelCmd(Name)) +} diff --git a/x-pack/metricbeat/cmd/root.go b/x-pack/metricbeat/cmd/root.go index 76ca40ddf131..30d3d8386466 100644 --- a/x-pack/metricbeat/cmd/root.go +++ b/x-pack/metricbeat/cmd/root.go @@ -54,6 +54,7 @@ func Initialize() *cmd.BeatsRootCmd { rootCmd.PersistentPreRun = func(cmd *cobra.Command, args []string) { management.ConfigTransform.SetTransform(metricbeatCfg) } + addOTelCommand(rootCmd) return rootCmd } diff --git a/x-pack/metricbeat/magefile.go b/x-pack/metricbeat/magefile.go index 6fded2c4e737..c3d388e686e8 100644 --- a/x-pack/metricbeat/magefile.go +++ b/x-pack/metricbeat/magefile.go @@ -52,6 +52,13 @@ func Build() error { return devtools.Build(args) } +// BuildOTel builds the Beat binary with OTel sub command +func BuildOTel() error { + args := devtools.DefaultBuildArgs() + args.ExtraFlags = append(args.ExtraFlags, "-tags", "otelbeat") + return devtools.Build(args) +} + // GolangCrossBuild build the Beat binary inside of the golang-builder. // Do not use directly, use crossBuild instead. func GolangCrossBuild() error { diff --git a/x-pack/metricbeat/metricbeat-otel.yml b/x-pack/metricbeat/metricbeat-otel.yml new file mode 100644 index 000000000000..14c2a1a11423 --- /dev/null +++ b/x-pack/metricbeat/metricbeat-otel.yml @@ -0,0 +1,26 @@ +metricbeat.modules: +- module: system + metricsets: + - cpu # CPU usage + - load # CPU load averages + - memory # Memory usage + - network # Network IO + - process # Per process metrics + - process_summary # Process summary + - uptime # System Uptime + - socket_summary # Socket summary + enabled: true + period: 10s + processes: ['.*'] + + +output: + elasticsearch: + hosts: ["https://localhost:9200"] + username: elastic + password: changeme + +setup.kibana: + host: https://localhost:5601 + username: elastic + password: changeme From e33fe590983da31edd9e9f29b8bee39c0719c579 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 2 Jul 2025 13:44:32 +0530 Subject: [PATCH 02/15] fix test --- .../providers/mbprovider/mbprovider_test.go | 73 +++++++------------ 1 file changed, 28 insertions(+), 45 deletions(-) diff --git a/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go b/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go index dc98f1f43583..26b1dfc77693 100644 --- a/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go +++ b/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go @@ -31,12 +31,15 @@ import ( ) var beatsConfig = ` -filebeat.inputs: - - type: filestream - id: filestream-input-id +metricbeat.modules: + - module: system + metricsets: + - cpu # CPU usage + - load # CPU load averages enabled: true - paths: - - /tmp/flog.log + period: 10s + processes: ['.*'] + output: elasticsearch: @@ -45,28 +48,25 @@ output: password: changeme index: form-otel-exporter ssl.enabled: false - -setup.template.name: form-otel-exporter -setup.template.pattern: form-otel-exporter -setup.dashboards.index: "form-otel-exporter*" - -setup.kibana: - host: https://localhost:5601 - username: elastic - password: changeme - ssl.verification_mode: none ` var expectedOutput = ` receivers: - filebeatreceiver: - filebeat: - inputs: - - enabled: true - id: filestream-input-id - paths: - - /tmp/flog.log - type: filestream + metricbeatreceiver: + metricbeat: + modules: + - module: system + enabled: true + metricsets: + - cpu + - load + processes: ['.*'] + period: 10s + path: + config: . + data: ./data + home: . + logs: ./logs output: elasticsearch: hosts: ["https://localhost:9200"] @@ -75,37 +75,20 @@ receivers: index: form-otel-exporter ssl: enabled: false - path: - config: . - data: ./data - home: . - logs: ./logs - setup: - dashboards: - index: form-otel-exporter* - kibana: - host: https://localhost:5601 - password: changeme - ssl: - verification_mode: none - username: elastic - template: - name: form-otel-exporter - pattern: form-otel-exporter service: pipelines: logs: receivers: - - "filebeatreceiver" + - "metricbeatreceiver" ` -func TestFileBeatProvider(t *testing.T) { +func TestMetricbeatProvider(t *testing.T) { p := provider{} - t.Run("test filebeat provider", func(t *testing.T) { + t.Run("test metricbeat provider", func(t *testing.T) { - tempFile, err := os.CreateTemp("", "filebeat.yml") + tempFile, err := os.CreateTemp("", "metricbeat.yml") require.NoError(t, err, "error creating temp file") defer os.Remove(tempFile.Name()) // Clean up the file after we're done defer tempFile.Close() @@ -115,7 +98,7 @@ func TestFileBeatProvider(t *testing.T) { require.NoError(t, err, "error creating temp file") // prefix file path with fb: - ret, err := p.Retrieve(context.Background(), "fb:"+tempFile.Name(), nil) + ret, err := p.Retrieve(context.Background(), "mb:"+tempFile.Name(), nil) require.NoError(t, err) retValue, err := ret.AsRaw() From ae1e99300a663471cca411d3a0ddb7006d1601b6 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 2 Jul 2025 13:46:36 +0530 Subject: [PATCH 03/15] nit --- libbeat/otelbeat/providers/common_provider.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/otelbeat/providers/common_provider.go b/libbeat/otelbeat/providers/common_provider.go index 1c4b8fda1bc9..a48809d04465 100644 --- a/libbeat/otelbeat/providers/common_provider.go +++ b/libbeat/otelbeat/providers/common_provider.go @@ -36,7 +36,7 @@ func LoadConfig(uri string, schemeName string) (*confmap.Retrieved, error) { return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName) } - // Load filebeat config file + // Load beat config file cfg, err := cfgfile.Load(filepath.Clean(uri[len(schemeName)+1:]), nil) if err != nil { return nil, err @@ -49,7 +49,7 @@ func LoadConfig(uri string, schemeName string) (*confmap.Retrieved, error) { } receiverName := schemeMap[schemeName] - // filebeat specific configuration is defined here + // beat specific configuration is defined here cfgMap := map[string]any{ "receivers": map[string]any{ receiverName: receiverMap, From 0a592f4ace6a262a9c3447ab3001c6b0a2b153c0 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 2 Jul 2025 13:47:25 +0530 Subject: [PATCH 04/15] remove processor --- libbeat/publisher/processing/processors.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go index ebf55458c698..710af841e234 100644 --- a/libbeat/publisher/processing/processors.go +++ b/libbeat/publisher/processing/processors.go @@ -205,14 +205,12 @@ func debugPrintProcessor(info beat.Info, log *logp.Logger) *processorFn { mux.Lock() defer mux.Unlock() - _, err := encoder.Encode(info.Beat, event) + b, err := encoder.Encode(info.Beat, event) if err != nil { return event, nil } - // fmt.Println("publish event works") - - log.Debugw(fmt.Sprintf("Publish event: %+v", event), logp.TypeKey, logp.EventType) + log.Debugw(fmt.Sprintf("Publish event: %s", b), logp.TypeKey, logp.EventType) return event, nil }) } From 920921ef96d39c6d65ce5a23eefafff2692845e7 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 2 Jul 2025 14:34:22 +0530 Subject: [PATCH 05/15] make check --- libbeat/otelbeat/providers/common_provider.go | 3 ++- libbeat/otelbeat/providers/fbprovider/fbprovider.go | 3 ++- libbeat/otelbeat/providers/mbprovider/mbprovider.go | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/libbeat/otelbeat/providers/common_provider.go b/libbeat/otelbeat/providers/common_provider.go index a48809d04465..ed90c92cde4d 100644 --- a/libbeat/otelbeat/providers/common_provider.go +++ b/libbeat/otelbeat/providers/common_provider.go @@ -22,8 +22,9 @@ import ( "path/filepath" "strings" - "github.com/elastic/beats/v7/libbeat/cfgfile" "go.opentelemetry.io/collector/confmap" + + "github.com/elastic/beats/v7/libbeat/cfgfile" ) var schemeMap = map[string]string{ diff --git a/libbeat/otelbeat/providers/fbprovider/fbprovider.go b/libbeat/otelbeat/providers/fbprovider/fbprovider.go index fdf95c835298..f1759d3a8b62 100644 --- a/libbeat/otelbeat/providers/fbprovider/fbprovider.go +++ b/libbeat/otelbeat/providers/fbprovider/fbprovider.go @@ -20,8 +20,9 @@ package fbprovider import ( "context" - "github.com/elastic/beats/v7/libbeat/otelbeat/providers" "go.opentelemetry.io/collector/confmap" + + "github.com/elastic/beats/v7/libbeat/otelbeat/providers" ) const schemeName = "fb" diff --git a/libbeat/otelbeat/providers/mbprovider/mbprovider.go b/libbeat/otelbeat/providers/mbprovider/mbprovider.go index ee61d0baa499..8ab74b0f99ec 100644 --- a/libbeat/otelbeat/providers/mbprovider/mbprovider.go +++ b/libbeat/otelbeat/providers/mbprovider/mbprovider.go @@ -20,8 +20,9 @@ package mbprovider import ( "context" - "github.com/elastic/beats/v7/libbeat/otelbeat/providers" "go.opentelemetry.io/collector/confmap" + + "github.com/elastic/beats/v7/libbeat/otelbeat/providers" ) const schemeName = "mb" From 42619f640f55dec886eb98b16b4aa25f8920fe78 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 3 Jul 2025 13:05:06 +0530 Subject: [PATCH 06/15] Update x-pack/libbeat/common/otelbeat/otel.go Co-authored-by: Tiago Queiroz --- x-pack/libbeat/common/otelbeat/otel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/libbeat/common/otelbeat/otel.go b/x-pack/libbeat/common/otelbeat/otel.go index c5fac00b1390..1ac58740721a 100644 --- a/x-pack/libbeat/common/otelbeat/otel.go +++ b/x-pack/libbeat/common/otelbeat/otel.go @@ -64,7 +64,7 @@ func OTelCmd(beatname string) *cobra.Command { }, } - command.Flags().String("config", beatname+"-otel.yml", "path to"+beatname+"config file") + command.Flags().String("config", beatname+"-otel.yml", "path to "+beatname+" config file") return command } From 2d3bebb504ec546811d718b5ddbfbde18d571471 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 3 Jul 2025 13:05:50 +0530 Subject: [PATCH 07/15] Update x-pack/libbeat/common/otelbeat/otel.go Co-authored-by: Tiago Queiroz --- x-pack/libbeat/common/otelbeat/otel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/libbeat/common/otelbeat/otel.go b/x-pack/libbeat/common/otelbeat/otel.go index 1ac58740721a..dc71f1acbb53 100644 --- a/x-pack/libbeat/common/otelbeat/otel.go +++ b/x-pack/libbeat/common/otelbeat/otel.go @@ -119,7 +119,7 @@ func getCollectorSettings(filename string) otelcol.CollectorSettings { info := component.BuildInfo{ Command: "otel", Description: "Beats OTel", - Version: "9.2.0", + Version: version.GetDefaultVersion(), } return otelcol.CollectorSettings{ From 2f9e1fc8f9d141c32d4d8d8247916d203cc1218e Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 3 Jul 2025 14:09:23 +0530 Subject: [PATCH 08/15] address review comments --- .../providers/fbprovider/fbprovider.go | 13 +++++++------ .../providers/fbprovider/fbprovider_test.go | 16 +++++++--------- .../providers/mbprovider/mbprovider.go | 15 +++++++++------ .../providers/mbprovider/mbprovider_test.go | 18 ++++++++---------- .../{common_provider.go => providers.go} | 1 + 5 files changed, 32 insertions(+), 31 deletions(-) rename libbeat/otelbeat/providers/{common_provider.go => providers.go} (95%) diff --git a/libbeat/otelbeat/providers/fbprovider/fbprovider.go b/libbeat/otelbeat/providers/fbprovider/fbprovider.go index f1759d3a8b62..3e677e868fae 100644 --- a/libbeat/otelbeat/providers/fbprovider/fbprovider.go +++ b/libbeat/otelbeat/providers/fbprovider/fbprovider.go @@ -27,26 +27,27 @@ import ( const schemeName = "fb" -type provider struct{} +type fbProvider struct{} -// The Provider provides configuration, and allows to watch/monitor for changes. +// NewFactory returns a provider factory that loads filebeat configuration func NewFactory() confmap.ProviderFactory { return confmap.NewProviderFactory(newProvider) } func newProvider(confmap.ProviderSettings) confmap.Provider { - return &provider{} + return &fbProvider{} } // Retrieve retrieves the beat configuration file and constructs otel config -func (fmp *provider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { +// uri here is the filepath of the beat config +func (*fbProvider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { return providers.LoadConfig(uri, schemeName) } -func (*provider) Scheme() string { +func (*fbProvider) Scheme() string { return schemeName } -func (*provider) Shutdown(context.Context) error { +func (*fbProvider) Shutdown(context.Context) error { return nil } diff --git a/libbeat/otelbeat/providers/fbprovider/fbprovider_test.go b/libbeat/otelbeat/providers/fbprovider/fbprovider_test.go index 6e4512fc4463..b2f5df08e3fe 100644 --- a/libbeat/otelbeat/providers/fbprovider/fbprovider_test.go +++ b/libbeat/otelbeat/providers/fbprovider/fbprovider_test.go @@ -21,6 +21,7 @@ import ( "context" _ "embed" "os" + "path/filepath" "testing" "github.com/stretchr/testify/assert" @@ -101,21 +102,18 @@ service: ` func TestFileBeatProvider(t *testing.T) { - p := provider{} + p := fbProvider{} t.Run("test filebeat provider", func(t *testing.T) { - tempFile, err := os.CreateTemp("", "filebeat.yml") - require.NoError(t, err, "error creating temp file") - defer os.Remove(tempFile.Name()) // Clean up the file after we're done - defer tempFile.Close() + tempDir := t.TempDir() - content := []byte(beatsConfig) - _, err = tempFile.Write(content) - require.NoError(t, err, "error creating temp file") + tempFileName := filepath.Join(tempDir, "filebeat.yml") + err := os.WriteFile(tempFileName, []byte(beatsConfig), 0666) + require.NoError(t, err, "error writing to temp file") // prefix file path with fb: - ret, err := p.Retrieve(context.Background(), "fb:"+tempFile.Name(), nil) + ret, err := p.Retrieve(context.Background(), "fb:"+tempFileName, nil) require.NoError(t, err) retValue, err := ret.AsRaw() diff --git a/libbeat/otelbeat/providers/mbprovider/mbprovider.go b/libbeat/otelbeat/providers/mbprovider/mbprovider.go index 8ab74b0f99ec..d1dbaecd328e 100644 --- a/libbeat/otelbeat/providers/mbprovider/mbprovider.go +++ b/libbeat/otelbeat/providers/mbprovider/mbprovider.go @@ -27,26 +27,29 @@ import ( const schemeName = "mb" -type provider struct{} +type mbProvider struct{} -// The Provider provides configuration, and allows to watch/monitor for changes. +// NewFactory returns a provider factory that loads metricbeat configuration func NewFactory() confmap.ProviderFactory { return confmap.NewProviderFactory(newProvider) } func newProvider(confmap.ProviderSettings) confmap.Provider { - return &provider{} + return &mbProvider{} } // Retrieve retrieves the beat configuration file and constructs otel config -func (fmp *provider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { +// uri here is the filepath of the beat config +func (*mbProvider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { return providers.LoadConfig(uri, schemeName) } -func (*provider) Scheme() string { +// Scheme returns the scheme name +func (*mbProvider) Scheme() string { return schemeName } -func (*provider) Shutdown(context.Context) error { +// Shutdown is a noop, it always returns nil +func (*mbProvider) Shutdown(context.Context) error { return nil } diff --git a/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go b/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go index 26b1dfc77693..2ae7e247965b 100644 --- a/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go +++ b/libbeat/otelbeat/providers/mbprovider/mbprovider_test.go @@ -21,6 +21,7 @@ import ( "context" _ "embed" "os" + "path/filepath" "testing" "github.com/stretchr/testify/assert" @@ -84,21 +85,18 @@ service: ` func TestMetricbeatProvider(t *testing.T) { - p := provider{} + p := mbProvider{} t.Run("test metricbeat provider", func(t *testing.T) { - tempFile, err := os.CreateTemp("", "metricbeat.yml") - require.NoError(t, err, "error creating temp file") - defer os.Remove(tempFile.Name()) // Clean up the file after we're done - defer tempFile.Close() + tempDir := t.TempDir() - content := []byte(beatsConfig) - _, err = tempFile.Write(content) - require.NoError(t, err, "error creating temp file") + tempFileName := filepath.Join(tempDir, "metricbeat.yml") + err := os.WriteFile(tempFileName, []byte(beatsConfig), 0666) + require.NoError(t, err, "error writing to temp file") - // prefix file path with fb: - ret, err := p.Retrieve(context.Background(), "mb:"+tempFile.Name(), nil) + // prefix file path with mb: + ret, err := p.Retrieve(context.Background(), "mb:"+tempFileName, nil) require.NoError(t, err) retValue, err := ret.AsRaw() diff --git a/libbeat/otelbeat/providers/common_provider.go b/libbeat/otelbeat/providers/providers.go similarity index 95% rename from libbeat/otelbeat/providers/common_provider.go rename to libbeat/otelbeat/providers/providers.go index ed90c92cde4d..adef7052e090 100644 --- a/libbeat/otelbeat/providers/common_provider.go +++ b/libbeat/otelbeat/providers/providers.go @@ -32,6 +32,7 @@ var schemeMap = map[string]string{ "mb": "metricbeatreceiver", } +// LoadConfig loads beat configuration based on provided scheme-name and uri path func LoadConfig(uri string, schemeName string) (*confmap.Retrieved, error) { if !strings.HasPrefix(uri, schemeName+":") { return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName) From a83a364c25da8766187b076595c010a67a23c3de Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 3 Jul 2025 15:33:50 +0530 Subject: [PATCH 09/15] fix ci --- x-pack/libbeat/common/otelbeat/otel.go | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/libbeat/common/otelbeat/otel.go b/x-pack/libbeat/common/otelbeat/otel.go index dc71f1acbb53..31ee9f003a5d 100644 --- a/x-pack/libbeat/common/otelbeat/otel.go +++ b/x-pack/libbeat/common/otelbeat/otel.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/otelbeat/beatconverter" "github.com/elastic/beats/v7/libbeat/otelbeat/providers/fbprovider" "github.com/elastic/beats/v7/libbeat/otelbeat/providers/mbprovider" + "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver" "github.com/elastic/beats/v7/x-pack/metricbeat/mbreceiver" "github.com/elastic/elastic-agent-libs/mapstr" From 2ea9de4bb725312f1d373b3beb8b82a652e5a76c Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 3 Jul 2025 17:45:26 +0530 Subject: [PATCH 10/15] add test --- x-pack/metricbeat/magefile.go | 25 ++++ .../metricbeat/tests/integration/otel_test.go | 109 ++++++++++++++++++ 2 files changed, 134 insertions(+) create mode 100644 x-pack/metricbeat/tests/integration/otel_test.go diff --git a/x-pack/metricbeat/magefile.go b/x-pack/metricbeat/magefile.go index c3d388e686e8..3e239ade95e3 100644 --- a/x-pack/metricbeat/magefile.go +++ b/x-pack/metricbeat/magefile.go @@ -242,6 +242,19 @@ func IntegTest() { // Use TEST_TAGS=tag1,tag2 to add additional build tags. // Use MODULE=module to run only tests for `module`. func GoIntegTest(ctx context.Context) error { + + // build integration test binary with otel sub command + devtools.BuildSystemTestOTelBinary() + args := devtools.DefaultGoTestIntegrationFromHostArgs() + // ES_USER must be admin in order for the Go Integration tests to function because they require + // indices:data/read/search + args.Env["ES_USER"] = args.Env["ES_SUPERUSER_USER"] + args.Env["ES_PASS"] = args.Env["ES_SUPERUSER_PASS"] + // run integration test from home directory + args.Packages = []string{"./tests/integration/"} + devtools.GoIntegTestFromHost(ctx, args) + + // run module integration test if os.Getenv("CI") == "true" { mg.Deps(devtools.DefineModules) } @@ -259,6 +272,18 @@ func GoIntegTest(ctx context.Context) error { // Use TEST_TAGS=tag1,tag2 to add additional build tags. // Use MODULE=module to run only tests for `module`. func GoFIPSOnlyIntegTest(ctx context.Context) error { + // build integration test binary with otel sub command + devtools.BuildSystemTestOTelBinary() + args := devtools.DefaultGoTestIntegrationFromHostArgs() + // ES_USER must be admin in order for the Go Integration tests to function because they require + // indices:data/read/search + args.Env["ES_USER"] = args.Env["ES_SUPERUSER_USER"] + args.Env["ES_PASS"] = args.Env["ES_SUPERUSER_PASS"] + // run integration test from home directory + args.Packages = []string{"./tests/integration/"} + devtools.GoIntegTestFromHost(ctx, args) + + // run module integration test if os.Getenv("CI") == "true" { mg.Deps(devtools.DefineModules) } diff --git a/x-pack/metricbeat/tests/integration/otel_test.go b/x-pack/metricbeat/tests/integration/otel_test.go new file mode 100644 index 000000000000..7e6d2f2d4080 --- /dev/null +++ b/x-pack/metricbeat/tests/integration/otel_test.go @@ -0,0 +1,109 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "bytes" + "context" + "fmt" + "strings" + "testing" + "text/template" + "time" + + "github.com/gofrs/uuid/v5" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/elastic-agent-libs/testing/estools" +) + +func TestMetricbeatOTelE2E(t *testing.T) { + integration.EnsureESIsRunning(t) + + host := integration.GetESURL(t, "http") + user := host.User.Username() + password, _ := host.User.Password() + + // create a random uuid and make sure it doesn't contain dashes/ + otelNamespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + + type options struct { + Namespace string + ESURL string + Username string + Password string + } + + var beatsCfgFile = ` +metricbeat: + modules: + - module: system + enabled: true + period: 1s + processes: + - '.*' + metricsets: + - cpu +output: + elasticsearch: + hosts: + - {{ .ESURL }} + username: {{ .Username }} + password: {{ .Password }} + index: logs-integration-{{ .Namespace }} +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +` + + // start metricbeat in otel mode + metricbeatOTel := integration.NewBeat( + t, + "metricbeat-otel", + "../../metricbeat.test", + "otel", + ) + + var configBuffer bytes.Buffer + require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&configBuffer, options{ + Namespace: otelNamespace, + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + })) + + metricbeatOTel.WriteConfigFile(configBuffer.String()) + metricbeatOTel.Start() + defer metricbeatOTel.Stop() + + // prepare to query ES + es := integration.GetESClient(t, "http") + + // Make sure find the logs + actualHits := &struct{ Hits int }{} + require.Eventually(t, + func() bool { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err := estools.GetLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-"+otelNamespace+"*", map[string]interface{}{ + "metricset.name": "cpu", + }) + require.NoError(t, err) + + actualHits.Hits = otelDocs.Hits.Total.Value + return actualHits.Hits >= 1 + }, + 2*time.Minute, 1*time.Second, + "Expected at least %d logs, got %v", 1, actualHits) + +} From 4567a899bdba6c8d95142882f418f08e82dbf527 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Mon, 7 Jul 2025 11:50:11 +0530 Subject: [PATCH 11/15] add e2e test --- .../metricbeat/tests/integration/otel_test.go | 76 ++++++++++++++----- 1 file changed, 58 insertions(+), 18 deletions(-) diff --git a/x-pack/metricbeat/tests/integration/otel_test.go b/x-pack/metricbeat/tests/integration/otel_test.go index 7e6d2f2d4080..16c8aabee1e9 100644 --- a/x-pack/metricbeat/tests/integration/otel_test.go +++ b/x-pack/metricbeat/tests/integration/otel_test.go @@ -16,9 +16,11 @@ import ( "time" "github.com/gofrs/uuid/v5" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/tests/integration" + "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" ) @@ -30,13 +32,13 @@ func TestMetricbeatOTelE2E(t *testing.T) { password, _ := host.User.Password() // create a random uuid and make sure it doesn't contain dashes/ - otelNamespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") type options struct { - Namespace string - ESURL string - Username string - Password string + Index string + ESURL string + Username string + Password string } var beatsCfgFile = ` @@ -55,7 +57,7 @@ output: - {{ .ESURL }} username: {{ .Username }} password: {{ .Password }} - index: logs-integration-{{ .Namespace }} + index: {{ .Index }} queue.mem.flush.timeout: 0s setup.template.enabled: false processors: @@ -75,35 +77,73 @@ processors: var configBuffer bytes.Buffer require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&configBuffer, options{ - Namespace: otelNamespace, - ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), - Username: user, - Password: password, + Index: "logs-integration-mbreceiver-" + namespace, + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, })) metricbeatOTel.WriteConfigFile(configBuffer.String()) metricbeatOTel.Start() defer metricbeatOTel.Stop() + var mbConfigBuffer bytes.Buffer + require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&mbConfigBuffer, options{ + Index: "logs-filebeat-mb-" + namespace, + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + })) + metricbeat := integration.NewBeat(t, "metricbeat", "../../metricbeat.test") + metricbeat.WriteConfigFile(mbConfigBuffer.String()) + metricbeat.Start() + defer metricbeat.Stop() + // prepare to query ES es := integration.GetESClient(t, "http") // Make sure find the logs - actualHits := &struct{ Hits int }{} + var metricbeatDocs estools.Documents + var otelDocs estools.Documents + var err error require.Eventually(t, func() bool { findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) defer findCancel() - otelDocs, err := estools.GetLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-"+otelNamespace+"*", map[string]interface{}{ - "metricset.name": "cpu", - }) + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-mbreceiver*") + require.NoError(t, err) + + metricbeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-mb*") require.NoError(t, err) - actualHits.Hits = otelDocs.Hits.Total.Value - return actualHits.Hits >= 1 + return otelDocs.Hits.Total.Value >= 1 && metricbeatDocs.Hits.Total.Value >= 1 }, - 2*time.Minute, 1*time.Second, - "Expected at least %d logs, got %v", 1, actualHits) + 2*time.Minute, 1*time.Second, "Expected at least one ingested metric event, got metricbeat: %d, otel: %d", metricbeatDocs.Hits.Total.Value, otelDocs.Hits.Total.Value) + + otelDoc := otelDocs.Hits.Hits[0] + metricbeatDoc := metricbeatDocs.Hits.Hits[0] + assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, []string{}, "expected documents keys to be equal") + +} + +func assertMapstrKeysEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) { + t.Helper() + // Delete all ignored fields. + for _, f := range ignoredFields { + _ = m1.Delete(f) + _ = m2.Delete(f) + } + + flatM1 := m1.Flatten() + flatM2 := m2.Flatten() + + for k := range flatM1 { + flatM1[k] = "" + } + for k := range flatM2 { + flatM2[k] = "" + } + require.Zero(t, cmp.Diff(flatM1, flatM2), msg) } From 94f476041dc488a1acb1a5da894bf6ba66995979 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 9 Jul 2025 10:29:47 +0530 Subject: [PATCH 12/15] fix ci related issues and add monitoring --- x-pack/metricbeat/docker-compose.yml | 8 ++- .../metricbeat/tests/integration/otel_test.go | 55 +++++++++++++------ 2 files changed, 44 insertions(+), 19 deletions(-) diff --git a/x-pack/metricbeat/docker-compose.yml b/x-pack/metricbeat/docker-compose.yml index 9f74541f834b..98e9361eb6bf 100644 --- a/x-pack/metricbeat/docker-compose.yml +++ b/x-pack/metricbeat/docker-compose.yml @@ -17,8 +17,14 @@ services: # Used by base tests elasticsearch: extends: - file: ../../metricbeat/docker-compose.yml + file: ${ES_BEATS}/testing/environments/${TESTING_ENVIRONMENT}.yml service: elasticsearch + healthcheck: + test: ["CMD-SHELL", "curl -u admin:testing -s http://localhost:9200/_cat/health?h=status | grep -q green"] + retries: 300 + interval: 1s + ports: + - 9200:9200 # Used by base tests kibana: diff --git a/x-pack/metricbeat/tests/integration/otel_test.go b/x-pack/metricbeat/tests/integration/otel_test.go index 16c8aabee1e9..49c997ca27ab 100644 --- a/x-pack/metricbeat/tests/integration/otel_test.go +++ b/x-pack/metricbeat/tests/integration/otel_test.go @@ -10,6 +10,7 @@ import ( "bytes" "context" "fmt" + "net/http" "strings" "testing" "text/template" @@ -35,10 +36,11 @@ func TestMetricbeatOTelE2E(t *testing.T) { namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") type options struct { - Index string - ESURL string - Username string - Password string + Index string + ESURL string + Username string + Password string + MonitoringPort int } var beatsCfgFile = ` @@ -65,6 +67,8 @@ processors: - add_cloud_metadata: ~ - add_docker_metadata: ~ - add_kubernetes_metadata: ~ +http.host: localhost +http.port: {{.MonitoringPort}} ` // start metricbeat in otel mode @@ -75,25 +79,25 @@ processors: "otel", ) + optionsValue := options{ + ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), + Username: user, + Password: password, + MonitoringPort: 5078, + } + var configBuffer bytes.Buffer - require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&configBuffer, options{ - Index: "logs-integration-mbreceiver-" + namespace, - ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), - Username: user, - Password: password, - })) + optionsValue.Index = "logs-integration-mbreceiver-" + namespace + require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&configBuffer, optionsValue)) metricbeatOTel.WriteConfigFile(configBuffer.String()) metricbeatOTel.Start() defer metricbeatOTel.Stop() var mbConfigBuffer bytes.Buffer - require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&mbConfigBuffer, options{ - Index: "logs-filebeat-mb-" + namespace, - ESURL: fmt.Sprintf("%s://%s", host.Scheme, host.Host), - Username: user, - Password: password, - })) + optionsValue.Index = "logs-integration-mb-" + namespace + optionsValue.MonitoringPort = 5079 + require.NoError(t, template.Must(template.New("config").Parse(beatsCfgFile)).Execute(&mbConfigBuffer, optionsValue)) metricbeat := integration.NewBeat(t, "metricbeat", "../../metricbeat.test") metricbeat.WriteConfigFile(mbConfigBuffer.String()) metricbeat.Start() @@ -111,10 +115,10 @@ processors: findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) defer findCancel() - otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-mbreceiver*") + otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-mbreceiver-"+namespace+"*") require.NoError(t, err) - metricbeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-mb*") + metricbeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-mb-"+namespace+"*") require.NoError(t, err) return otelDocs.Hits.Total.Value >= 1 && metricbeatDocs.Hits.Total.Value >= 1 @@ -124,7 +128,22 @@ processors: otelDoc := otelDocs.Hits.Hits[0] metricbeatDoc := metricbeatDocs.Hits.Hits[0] assertMapstrKeysEqual(t, otelDoc.Source, metricbeatDoc.Source, []string{}, "expected documents keys to be equal") + assertMonitoring(t, optionsValue.MonitoringPort) +} + +func assertMonitoring(t *testing.T, port int) { + address := fmt.Sprintf("http://localhost:%d", port) + r, err := http.Get(address) //nolint:noctx,bodyclose,gosec // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") + + r, err = http.Get(address + "/stats") //nolint:noctx,bodyclose // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") + r, err = http.Get(address + "/not-exist") //nolint:noctx,bodyclose // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") } func assertMapstrKeysEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) { From 1c21f5416533b74176fc4af760eb935aaea748da Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Wed, 9 Jul 2025 12:09:27 +0530 Subject: [PATCH 13/15] add proxy dep --- libbeat/tests/integration/framework.go | 1 + x-pack/metricbeat/docker-compose.yml | 16 +++++++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 3ea2dbed3a29..3391bcd7d4e7 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -693,6 +693,7 @@ func createTempDir(t *testing.T) string { // using the default test credentials or the corresponding environment // variables. func EnsureESIsRunning(t *testing.T) { + t.Helper() esURL := GetESURL(t, "http") ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(500*time.Second)) diff --git a/x-pack/metricbeat/docker-compose.yml b/x-pack/metricbeat/docker-compose.yml index 98e9361eb6bf..0857759502cd 100644 --- a/x-pack/metricbeat/docker-compose.yml +++ b/x-pack/metricbeat/docker-compose.yml @@ -1,6 +1,14 @@ version: '2.3' services: + # This is a proxy used to block "docker-compose up" until all services are healthy. + # See: https://github.com/docker/compose/issues/4369 + proxy_dep: + image: busybox + depends_on: + elasticsearch: { condition: service_healthy } + kibana: { condition: service_healthy } + beat: build: ../../metricbeat environment: @@ -17,14 +25,8 @@ services: # Used by base tests elasticsearch: extends: - file: ${ES_BEATS}/testing/environments/${TESTING_ENVIRONMENT}.yml + file: ../../metricbeat/docker-compose.yml service: elasticsearch - healthcheck: - test: ["CMD-SHELL", "curl -u admin:testing -s http://localhost:9200/_cat/health?h=status | grep -q green"] - retries: 300 - interval: 1s - ports: - - 9200:9200 # Used by base tests kibana: From 8fc946b6ffddd5345d9614f964735c0c79ca85d2 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 10 Jul 2025 10:04:30 +0530 Subject: [PATCH 14/15] fix mage --- x-pack/metricbeat/magefile.go | 46 +++++++++++------------------------ 1 file changed, 14 insertions(+), 32 deletions(-) diff --git a/x-pack/metricbeat/magefile.go b/x-pack/metricbeat/magefile.go index 3e239ade95e3..048dbf8ef52c 100644 --- a/x-pack/metricbeat/magefile.go +++ b/x-pack/metricbeat/magefile.go @@ -243,25 +243,26 @@ func IntegTest() { // Use MODULE=module to run only tests for `module`. func GoIntegTest(ctx context.Context) error { - // build integration test binary with otel sub command - devtools.BuildSystemTestOTelBinary() - args := devtools.DefaultGoTestIntegrationFromHostArgs() - // ES_USER must be admin in order for the Go Integration tests to function because they require - // indices:data/read/search - args.Env["ES_USER"] = args.Env["ES_SUPERUSER_USER"] - args.Env["ES_PASS"] = args.Env["ES_SUPERUSER_PASS"] - // run integration test from home directory - args.Packages = []string{"./tests/integration/"} - devtools.GoIntegTestFromHost(ctx, args) - - // run module integration test + // define modules if os.Getenv("CI") == "true" { mg.Deps(devtools.DefineModules) } if !devtools.IsInIntegTestEnv() { + // build integration test binary with otel sub command + devtools.BuildSystemTestOTelBinary() + args := devtools.DefaultGoTestIntegrationFromHostArgs() + // ES_USER must be admin in order for the Go Integration tests to function because they require + // indices:data/read/search + args.Env["ES_USER"] = args.Env["ES_SUPERUSER_USER"] + args.Env["ES_PASS"] = args.Env["ES_SUPERUSER_PASS"] + // run integration test from home directory + args.Packages = []string{"./tests/integration/"} + devtools.GoIntegTestFromHost(ctx, args) + mg.SerialDeps(Fields, Dashboards) } + return devtools.GoTestIntegrationForModule(ctx) } @@ -272,27 +273,8 @@ func GoIntegTest(ctx context.Context) error { // Use TEST_TAGS=tag1,tag2 to add additional build tags. // Use MODULE=module to run only tests for `module`. func GoFIPSOnlyIntegTest(ctx context.Context) error { - // build integration test binary with otel sub command - devtools.BuildSystemTestOTelBinary() - args := devtools.DefaultGoTestIntegrationFromHostArgs() - // ES_USER must be admin in order for the Go Integration tests to function because they require - // indices:data/read/search - args.Env["ES_USER"] = args.Env["ES_SUPERUSER_USER"] - args.Env["ES_PASS"] = args.Env["ES_SUPERUSER_PASS"] - // run integration test from home directory - args.Packages = []string{"./tests/integration/"} - devtools.GoIntegTestFromHost(ctx, args) - - // run module integration test - if os.Getenv("CI") == "true" { - mg.Deps(devtools.DefineModules) - } - - if !devtools.IsInIntegTestEnv() { - mg.SerialDeps(Fields, Dashboards) - } os.Setenv("GODEBUG", "fips140=only") - return devtools.GoTestIntegrationForModule(ctx) + return GoIntegTest(ctx) } // PythonIntegTest executes the python system tests in the integration From 885c39bcfcffb5b8bf71004b49cc72cd741e5210 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 10 Jul 2025 21:53:07 +0530 Subject: [PATCH 15/15] address comments --- dev-tools/mage/build.go | 6 ++++++ x-pack/filebeat/magefile.go | 4 +--- x-pack/metricbeat/magefile.go | 4 +--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/dev-tools/mage/build.go b/dev-tools/mage/build.go index d4ed8aaaf95e..ced63a6625d9 100644 --- a/dev-tools/mage/build.go +++ b/dev-tools/mage/build.go @@ -198,6 +198,12 @@ func GolangCrossBuild(params BuildArgs) error { return Build(params) } +func BuildOTel() error { + args := DefaultBuildArgs() + args.ExtraFlags = append(args.ExtraFlags, "-tags", "otelbeat") + return Build(args) +} + // Build invokes "go build" to produce a binary. func Build(params BuildArgs) error { fmt.Println(">> build: Building", params.Name) diff --git a/x-pack/filebeat/magefile.go b/x-pack/filebeat/magefile.go index 4b95ad92d246..ea5beffd33fc 100644 --- a/x-pack/filebeat/magefile.go +++ b/x-pack/filebeat/magefile.go @@ -45,9 +45,7 @@ func Build() error { // BuildOTel builds the Beat binary with OTel sub command func BuildOTel() error { - args := devtools.DefaultBuildArgs() - args.ExtraFlags = append(args.ExtraFlags, "-tags", "otelbeat") - return devtools.Build(args) + return devtools.BuildOTel() } // BuildSystemTestBinary builds a binary instrumented for use with Python system tests. diff --git a/x-pack/metricbeat/magefile.go b/x-pack/metricbeat/magefile.go index 048dbf8ef52c..739d4359bf97 100644 --- a/x-pack/metricbeat/magefile.go +++ b/x-pack/metricbeat/magefile.go @@ -54,9 +54,7 @@ func Build() error { // BuildOTel builds the Beat binary with OTel sub command func BuildOTel() error { - args := devtools.DefaultBuildArgs() - args.ExtraFlags = append(args.ExtraFlags, "-tags", "otelbeat") - return devtools.Build(args) + return devtools.BuildOTel() } // GolangCrossBuild build the Beat binary inside of the golang-builder.