Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dev-tools/mage/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ func GolangCrossBuild(params BuildArgs) error {
return Build(params)
}

func BuildOTel() error {
args := DefaultBuildArgs()
args.ExtraFlags = append(args.ExtraFlags, "-tags", "otelbeat")
return Build(args)
}

// Build invokes "go build" to produce a binary.
func Build(params BuildArgs) error {
fmt.Println(">> build: Building", params.Name)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/otelbeat/beatconverter/beatconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

// list of supported beatreceivers
var supportedReceivers = []string{"filebeatreceiver"} // Add more beat receivers to this list when we add support
var supportedReceivers = []string{"filebeatreceiver", "metricbeatreceiver"} // Add more beat receivers to this list when we add support

type converter struct{}

Expand Down
50 changes: 9 additions & 41 deletions libbeat/otelbeat/providers/fbprovider/fbprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,67 +19,35 @@ package fbprovider

import (
"context"
"fmt"
"path/filepath"
"strings"

"go.opentelemetry.io/collector/confmap"

"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/otelbeat/providers"
)

const schemeName = "fb"

type provider struct{}
type fbProvider struct{}

// The Provider provides configuration, and allows to watch/monitor for changes.
// NewFactory returns a provider factory that loads filebeat configuration
func NewFactory() confmap.ProviderFactory {
return confmap.NewProviderFactory(newProvider)
}

func newProvider(confmap.ProviderSettings) confmap.Provider {
return &provider{}
return &fbProvider{}
}

// Retrieve retrieves the beat configuration file and constructs otel config
func (fmp *provider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) {
if !strings.HasPrefix(uri, schemeName+":") {
return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName)
}

// Load filebeat config file
cfg, err := cfgfile.Load(filepath.Clean(uri[len(schemeName)+1:]), nil)
if err != nil {
return nil, err
}

var receiverMap map[string]any
err = cfg.Unpack(&receiverMap)
if err != nil {
return nil, err
}

// filebeat specific configuration is defined here
cfgMap := map[string]any{
"receivers": map[string]any{
"filebeatreceiver": receiverMap,
},
"service": map[string]any{
"pipelines": map[string]any{
"logs": map[string]any{
"receivers": []string{"filebeatreceiver"},
},
},
},
}

return confmap.NewRetrieved(cfgMap)
// uri here is the filepath of the beat config
func (*fbProvider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) {
return providers.LoadConfig(uri, schemeName)
}

func (*provider) Scheme() string {
func (*fbProvider) Scheme() string {
return schemeName
}

func (*provider) Shutdown(context.Context) error {
func (*fbProvider) Shutdown(context.Context) error {
return nil
}
16 changes: 7 additions & 9 deletions libbeat/otelbeat/providers/fbprovider/fbprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
_ "embed"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -101,21 +102,18 @@ service:
`

func TestFileBeatProvider(t *testing.T) {
p := provider{}
p := fbProvider{}

t.Run("test filebeat provider", func(t *testing.T) {

tempFile, err := os.CreateTemp("", "filebeat.yml")
require.NoError(t, err, "error creating temp file")
defer os.Remove(tempFile.Name()) // Clean up the file after we're done
defer tempFile.Close()
tempDir := t.TempDir()

content := []byte(beatsConfig)
_, err = tempFile.Write(content)
require.NoError(t, err, "error creating temp file")
tempFileName := filepath.Join(tempDir, "filebeat.yml")
err := os.WriteFile(tempFileName, []byte(beatsConfig), 0666)
require.NoError(t, err, "error writing to temp file")

// prefix file path with fb:
ret, err := p.Retrieve(context.Background(), "fb:"+tempFile.Name(), nil)
ret, err := p.Retrieve(context.Background(), "fb:"+tempFileName, nil)
require.NoError(t, err)

retValue, err := ret.AsRaw()
Expand Down
55 changes: 55 additions & 0 deletions libbeat/otelbeat/providers/mbprovider/mbprovider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package mbprovider

import (
"context"

"go.opentelemetry.io/collector/confmap"

"github.com/elastic/beats/v7/libbeat/otelbeat/providers"
)

const schemeName = "mb"

type mbProvider struct{}

// NewFactory returns a provider factory that loads metricbeat configuration
func NewFactory() confmap.ProviderFactory {
return confmap.NewProviderFactory(newProvider)
}

func newProvider(confmap.ProviderSettings) confmap.Provider {
return &mbProvider{}
}

// Retrieve retrieves the beat configuration file and constructs otel config
// uri here is the filepath of the beat config
func (*mbProvider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) {
return providers.LoadConfig(uri, schemeName)
}

// Scheme returns the scheme name
func (*mbProvider) Scheme() string {
return schemeName
}

// Shutdown is a noop, it always returns nil
func (*mbProvider) Shutdown(context.Context) error {
return nil
}
125 changes: 125 additions & 0 deletions libbeat/otelbeat/providers/mbprovider/mbprovider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package mbprovider

import (
"context"
_ "embed"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/confmap"

"gopkg.in/yaml.v2"
)

var beatsConfig = `
metricbeat.modules:
- module: system
metricsets:
- cpu # CPU usage
- load # CPU load averages
enabled: true
period: 10s
processes: ['.*']


output:
elasticsearch:
hosts: ["https://localhost:9200"]
username: elastic
password: changeme
index: form-otel-exporter
ssl.enabled: false
`

var expectedOutput = `
receivers:
metricbeatreceiver:
metricbeat:
modules:
- module: system
enabled: true
metricsets:
- cpu
- load
processes: ['.*']
period: 10s
path:
config: .
data: ./data
home: .
logs: ./logs
output:
elasticsearch:
hosts: ["https://localhost:9200"]
username: elastic
password: changeme
index: form-otel-exporter
ssl:
enabled: false

service:
pipelines:
logs:
receivers:
- "metricbeatreceiver"
`

func TestMetricbeatProvider(t *testing.T) {
p := mbProvider{}

t.Run("test metricbeat provider", func(t *testing.T) {

tempDir := t.TempDir()

tempFileName := filepath.Join(tempDir, "metricbeat.yml")
err := os.WriteFile(tempFileName, []byte(beatsConfig), 0666)
require.NoError(t, err, "error writing to temp file")

// prefix file path with mb:
ret, err := p.Retrieve(context.Background(), "mb:"+tempFileName, nil)
require.NoError(t, err)

retValue, err := ret.AsRaw()
require.NoError(t, err)
expOutput := newFromYamlString(t, expectedOutput)

// convert it into a common type
want, err := yaml.Marshal(expOutput.ToStringMap())
require.NoError(t, err)
got, err := yaml.Marshal(retValue)
require.NoError(t, err)

assert.Equal(t, string(want), string(got))
assert.NoError(t, p.Shutdown(context.Background()))
})

}

func newFromYamlString(t *testing.T, input string) *confmap.Conf {
t.Helper()
var rawConf map[string]any
err := yaml.Unmarshal([]byte(input), &rawConf)
require.NoError(t, err)

return confmap.NewFromStringMap(rawConf)
}
69 changes: 69 additions & 0 deletions libbeat/otelbeat/providers/providers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package providers

import (
"fmt"
"path/filepath"
"strings"

"go.opentelemetry.io/collector/confmap"

"github.com/elastic/beats/v7/libbeat/cfgfile"
)

var schemeMap = map[string]string{
"fb": "filebeatreceiver",
"mb": "metricbeatreceiver",
}

// LoadConfig loads beat configuration based on provided scheme-name and uri path
func LoadConfig(uri string, schemeName string) (*confmap.Retrieved, error) {
if !strings.HasPrefix(uri, schemeName+":") {
return nil, fmt.Errorf("%q uri is not supported by %q provider", uri, schemeName)
}

// Load beat config file
cfg, err := cfgfile.Load(filepath.Clean(uri[len(schemeName)+1:]), nil)
if err != nil {
return nil, err
}

var receiverMap map[string]any
err = cfg.Unpack(&receiverMap)
if err != nil {
return nil, err
}

receiverName := schemeMap[schemeName]
// beat specific configuration is defined here
cfgMap := map[string]any{
"receivers": map[string]any{
receiverName: receiverMap,
},
"service": map[string]any{
"pipelines": map[string]any{
"logs": map[string]any{
"receivers": []string{receiverName},
},
},
},
}

return confmap.NewRetrieved(cfgMap)
}
1 change: 1 addition & 0 deletions libbeat/tests/integration/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@ func createTempDir(t *testing.T) string {
// using the default test credentials or the corresponding environment
// variables.
func EnsureESIsRunning(t *testing.T) {
t.Helper()
esURL := GetESURL(t, "http")

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(500*time.Second))
Expand Down
Loading
Loading