Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libbeat/otelbeat/beatconverter/beatconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

// list of supported beatreceivers
var supportedReceivers = []string{"filebeatreceiver"} // Add more beat receivers to this list when we add support
var supportedReceivers = []string{"filebeatreceiver", "metricbeatreceiver"} // Add more beat receivers to this list when we add support

type converter struct{}

Expand Down
68 changes: 68 additions & 0 deletions libbeat/otelbeat/providers/common_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to Elasticsearch B.V. under one or more contributor
Comment thread
khushijain21 marked this conversation as resolved.
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package providers

import (
"fmt"
"path/filepath"
"strings"

"go.opentelemetry.io/collector/confmap"

"github.com/elastic/beats/v7/libbeat/cfgfile"
)

var schemeMap = map[string]string{
"fb": "filebeatreceiver",
"mb": "metricbeatreceiver",
}

func LoadConfig(uri string, schemeName string) (*confmap.Retrieved, error) {
Comment thread
khushijain21 marked this conversation as resolved.
if !strings.HasPrefix(uri, schemeName+":") {
return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName)
}

// Load beat config file
cfg, err := cfgfile.Load(filepath.Clean(uri[len(schemeName)+1:]), nil)
if err != nil {
return nil, err
}

var receiverMap map[string]any
err = cfg.Unpack(&receiverMap)
if err != nil {
return nil, err
}

receiverName := schemeMap[schemeName]
// beat specific configuration is defined here
cfgMap := map[string]any{
"receivers": map[string]any{
receiverName: receiverMap,
},
"service": map[string]any{
"pipelines": map[string]any{
"logs": map[string]any{
"receivers": []string{receiverName},
},
},
},
}

return confmap.NewRetrieved(cfgMap)
}
37 changes: 2 additions & 35 deletions libbeat/otelbeat/providers/fbprovider/fbprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ package fbprovider

import (
"context"
"fmt"
"path/filepath"
"strings"

"go.opentelemetry.io/collector/confmap"

"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/otelbeat/providers"
)

const schemeName = "fb"
Expand All @@ -43,37 +40,7 @@ func newProvider(confmap.ProviderSettings) confmap.Provider {

// Retrieve retrieves the beat configuration file and constructs otel config
func (fmp *provider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) {
if !strings.HasPrefix(uri, schemeName+":") {
return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName)
}

// Load filebeat config file
cfg, err := cfgfile.Load(filepath.Clean(uri[len(schemeName)+1:]), nil)
if err != nil {
return nil, err
}

var receiverMap map[string]any
err = cfg.Unpack(&receiverMap)
if err != nil {
return nil, err
}

// filebeat specific configuration is defined here
cfgMap := map[string]any{
"receivers": map[string]any{
"filebeatreceiver": receiverMap,
},
"service": map[string]any{
"pipelines": map[string]any{
"logs": map[string]any{
"receivers": []string{"filebeatreceiver"},
},
},
},
}

return confmap.NewRetrieved(cfgMap)
return providers.LoadConfig(uri, schemeName)
}

func (*provider) Scheme() string {
Expand Down
52 changes: 52 additions & 0 deletions libbeat/otelbeat/providers/mbprovider/mbprovider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package mbprovider

import (
"context"

"go.opentelemetry.io/collector/confmap"

"github.com/elastic/beats/v7/libbeat/otelbeat/providers"
)

const schemeName = "mb"

type provider struct{}

// The Provider provides configuration, and allows to watch/monitor for changes.
Comment thread
khushijain21 marked this conversation as resolved.
Outdated
func NewFactory() confmap.ProviderFactory {
return confmap.NewProviderFactory(newProvider)
}

func newProvider(confmap.ProviderSettings) confmap.Provider {
return &provider{}
}

// Retrieve retrieves the beat configuration file and constructs otel config
func (fmp *provider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) {
return providers.LoadConfig(uri, schemeName)
}
Comment thread
khushijain21 marked this conversation as resolved.

func (*provider) Scheme() string {
return schemeName
}

func (*provider) Shutdown(context.Context) error {
return nil
}
Comment thread
khushijain21 marked this conversation as resolved.
Outdated
127 changes: 127 additions & 0 deletions libbeat/otelbeat/providers/mbprovider/mbprovider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package mbprovider

import (
"context"
_ "embed"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/confmap"

"gopkg.in/yaml.v2"
)

var beatsConfig = `
metricbeat.modules:
- module: system
metricsets:
- cpu # CPU usage
- load # CPU load averages
enabled: true
period: 10s
processes: ['.*']


output:
elasticsearch:
hosts: ["https://localhost:9200"]
username: elastic
password: changeme
index: form-otel-exporter
ssl.enabled: false
`

var expectedOutput = `
receivers:
metricbeatreceiver:
metricbeat:
modules:
- module: system
enabled: true
metricsets:
- cpu
- load
processes: ['.*']
period: 10s
path:
config: .
data: ./data
home: .
logs: ./logs
output:
elasticsearch:
hosts: ["https://localhost:9200"]
username: elastic
password: changeme
index: form-otel-exporter
ssl:
enabled: false

service:
pipelines:
logs:
receivers:
- "metricbeatreceiver"
`

func TestMetricbeatProvider(t *testing.T) {
p := provider{}

t.Run("test metricbeat provider", func(t *testing.T) {

tempFile, err := os.CreateTemp("", "metricbeat.yml")
require.NoError(t, err, "error creating temp file")
defer os.Remove(tempFile.Name()) // Clean up the file after we're done
defer tempFile.Close()
Comment thread
khushijain21 marked this conversation as resolved.
Outdated

content := []byte(beatsConfig)
_, err = tempFile.Write(content)
require.NoError(t, err, "error creating temp file")
Comment thread
khushijain21 marked this conversation as resolved.
Outdated

// prefix file path with fb:
ret, err := p.Retrieve(context.Background(), "mb:"+tempFile.Name(), nil)
require.NoError(t, err)

retValue, err := ret.AsRaw()
require.NoError(t, err)
expOutput := newFromYamlString(t, expectedOutput)

// convert it into a common type
want, err := yaml.Marshal(expOutput.ToStringMap())
require.NoError(t, err)
got, err := yaml.Marshal(retValue)
require.NoError(t, err)

assert.Equal(t, string(want), string(got))
assert.NoError(t, p.Shutdown(context.Background()))
})

}

func newFromYamlString(t *testing.T, input string) *confmap.Conf {
t.Helper()
var rawConf map[string]any
err := yaml.Unmarshal([]byte(input), &rawConf)
require.NoError(t, err)

return confmap.NewFromStringMap(rawConf)
}
3 changes: 1 addition & 2 deletions x-pack/filebeat/cmd/otelcmd_enabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
package cmd

import (
fbcmd "github.com/elastic/beats/v7/filebeat/cmd"
cmd "github.com/elastic/beats/v7/libbeat/cmd"
"github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat"
)

func addOTelCommand(command *cmd.BeatsRootCmd) {
command.AddCommand(otelbeat.OTelCmd(fbcmd.Name))
command.AddCommand(otelbeat.OTelCmd(Name))
}
14 changes: 8 additions & 6 deletions x-pack/libbeat/common/otelbeat/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/otelbeat/beatconverter"
"github.com/elastic/beats/v7/libbeat/otelbeat/providers/fbprovider"
"github.com/elastic/beats/v7/libbeat/otelbeat/providers/mbprovider"
"github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/beats/v7/x-pack/metricbeat/mbreceiver"
"github.com/elastic/elastic-agent-libs/mapstr"
)

var schemeMap = map[string]string{
"filebeat": "fb",
"filebeat": "fb",
"metricbeat": "mb",
}

func OTelCmd(beatname string) *cobra.Command {
Expand All @@ -37,8 +39,6 @@ func OTelCmd(beatname string) *cobra.Command {
Use: "otel",
Hidden: true,
RunE: func(cmd *cobra.Command, args []string) error {
logger := logp.NewLogger(beatname + "-otel-mode")
logger.Info("This mode is experimental and unsupported")

// get beat configuration file
beatCfg, _ := cmd.Flags().GetString("config")
Expand All @@ -64,14 +64,15 @@ func OTelCmd(beatname string) *cobra.Command {
},
}

command.Flags().String("config", beatname+"-otel.yml", "path to filebeat config file")
command.Flags().String("config", beatname+"-otel.yml", "path to"+beatname+"config file")
Comment thread
khushijain21 marked this conversation as resolved.
Outdated
return command
}

// Component initializes collector components
func getComponent() (otelcol.Factories, error) {
receivers, err := otelcol.MakeFactoryMap(
fbreceiver.NewFactory(),
mbreceiver.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, nil //nolint:nilerr //ignoring this error
Expand Down Expand Up @@ -118,7 +119,7 @@ func getCollectorSettings(filename string) otelcol.CollectorSettings {
info := component.BuildInfo{
Command: "otel",
Description: "Beats OTel",
Version: "9.0.0",
Version: "9.2.0",
Comment thread
khushijain21 marked this conversation as resolved.
Outdated
}

return otelcol.CollectorSettings{
Expand All @@ -130,6 +131,7 @@ func getCollectorSettings(filename string) otelcol.CollectorSettings {
ProviderFactories: []confmap.ProviderFactory{
fileprovider.NewFactory(),
fbprovider.NewFactory(),
mbprovider.NewFactory(),
},
ConverterFactories: []confmap.ConverterFactory{
beatconverter.NewFactory(),
Expand Down
Loading