Skip to content

Commit

Permalink
add metricbeat receiver (#41738) (#41779)
Browse files Browse the repository at this point in the history
* add metricbeat receiver

(cherry picked from commit 6203368)

Co-authored-by: Lee E Hinman <[email protected]>
  • Loading branch information
mergify[bot] and leehinman authored Nov 26, 2024
1 parent 7e92949 commit daf8f16
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 3 deletions.
7 changes: 5 additions & 2 deletions x-pack/filebeat/fbreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ func createDefaultConfig() component.Config {
}

func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.Config, consumer consumer.Logs) (receiver.Logs, error) {
cfg := baseCfg.(*Config)
cfg, ok := baseCfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not convert otel config to filebeat config")
}

settings := cmd.FilebeatSettings(Name)
globalProcs, err := processors.NewPluginConfigFromList(defaultProcessors())
Expand Down Expand Up @@ -59,7 +62,7 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.
return nil, fmt.Errorf("error getting %s creator:%w", Name, err)
}

return &filebeatReceiver{beat: &b.Beat, beater: fbBeater}, nil
return &filebeatReceiver{beat: &b.Beat, beater: fbBeater, logger: set.Logger}, nil
}

func defaultProcessors() []mapstr.M {
Expand Down
9 changes: 8 additions & 1 deletion x-pack/filebeat/fbreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,28 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
)

type filebeatReceiver struct {
beat *beat.Beat
beater beat.Beater
logger *zap.Logger
}

func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error {
go func() {
_ = fb.beater.Run(fb.beat)
fb.logger.Info("starting filebeat receiver")
err := fb.beater.Run(fb.beat)
if err != nil {
fb.logger.Error("filebeat receiver run error", zap.Error(err))
}
}()
return nil
}

func (fb *filebeatReceiver) Shutdown(ctx context.Context) error {
fb.logger.Info("stopping filebeat receiver")
fb.beater.Stop()
return nil
}
25 changes: 25 additions & 0 deletions x-pack/metricbeat/mbreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package mbreceiver

import "fmt"

// Config is config settings for metricbeat receiver. The structure of
// which is the same as the metricbeat.yml configuration file.
type Config struct {
Beatconfig map[string]interface{} `mapstructure:",remain"`
}

// Validate checks if the configuration in valid
func (cfg *Config) Validate() error {
if len(cfg.Beatconfig) == 0 {
return fmt.Errorf("Configuration is required")
}
_, prs := cfg.Beatconfig["metricbeat"]
if !prs {
return fmt.Errorf("Configuration key 'metricbeat' is required")
}
return nil
}
44 changes: 44 additions & 0 deletions x-pack/metricbeat/mbreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package mbreceiver

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestValidate(t *testing.T) {
tests := map[string]struct {
c *Config
hasError bool
errorString string
}{
"Empty config": {
c: &Config{Beatconfig: map[string]interface{}{}},
hasError: true,
errorString: "Configuration is required",
},
"No metricbeat section": {
c: &Config{Beatconfig: map[string]interface{}{"other": map[string]interface{}{}}},
hasError: true,
errorString: "Configuration key 'metricbeat' is required",
},
"Valid config": {
c: &Config{Beatconfig: map[string]interface{}{"metricbeat": map[string]interface{}{}}},
hasError: false,
errorString: "",
},
}
for name, tc := range tests {
err := tc.c.Validate()
if tc.hasError {
assert.NotNilf(t, err, "%s failed, should have had error", name)
assert.Equalf(t, err.Error(), tc.errorString, "%s failed, error not equal", name)
} else {
assert.Nilf(t, err, "%s failed, should not have error", name)
}
}
}
61 changes: 61 additions & 0 deletions x-pack/metricbeat/mbreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package mbreceiver

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/metricbeat/beater"
"github.com/elastic/beats/v7/metricbeat/cmd"
)

const (
Name = "metricbeatreceiver"
)

func createDefaultConfig() component.Config {
return &Config{}
}

func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.Config, consumer consumer.Logs) (receiver.Logs, error) {
cfg, ok := baseCfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not convert otel config to metricbeat config")
}
settings := cmd.MetricbeatSettings(Name)
settings.ElasticLicensed = true

b, err := instance.NewBeatReceiver(settings, cfg.Beatconfig, consumer, set.Logger.Core())
if err != nil {
return nil, fmt.Errorf("error creating %s: %w", Name, err)
}

beatCreator := beater.DefaultCreator()

beatConfig, err := b.BeatConfig()
if err != nil {
return nil, fmt.Errorf("error getting beat config: %w", err)
}

mbBeater, err := beatCreator(&b.Beat, beatConfig)
if err != nil {
return nil, fmt.Errorf("error getting %s creator:%w", Name, err)
}

return &metricbeatReceiver{beat: &b.Beat, beater: mbBeater, logger: set.Logger}, nil
}

func NewFactory() receiver.Factory {
return receiver.NewFactory(
component.MustNewType(Name),
createDefaultConfig,
receiver.WithLogs(createReceiver, component.StabilityLevelAlpha))
}
37 changes: 37 additions & 0 deletions x-pack/metricbeat/mbreceiver/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package mbreceiver

import (
"context"

"github.com/elastic/beats/v7/libbeat/beat"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
)

type metricbeatReceiver struct {
beat *beat.Beat
beater beat.Beater
logger *zap.Logger
}

func (mb *metricbeatReceiver) Start(ctx context.Context, host component.Host) error {
go func() {
mb.logger.Info("starting metricbeat receiver")
err := mb.beater.Run(mb.beat)
if err != nil {
mb.logger.Error("metricbeat receiver run error", zap.Error(err))
}
}()
return nil
}

func (mb *metricbeatReceiver) Shutdown(ctx context.Context) error {
mb.logger.Info("stopping metricbeat receiver")
mb.beater.Stop()
return nil
}
92 changes: 92 additions & 0 deletions x-pack/metricbeat/mbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package mbreceiver

import (
"bytes"
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func TestNewReceiver(t *testing.T) {
config := Config{
Beatconfig: map[string]interface{}{
"metricbeat": map[string]interface{}{
"modules": []map[string]interface{}{
{
"module": "system",
"enabled": true,
"period": "1s",
"processes": []string{".*"},
"metricsets": []string{"cpu"},
},
},
},
"output": map[string]interface{}{
"otelconsumer": map[string]interface{}{},
},
"logging": map[string]interface{}{
"level": "debug",
"selectors": []string{
"*",
},
},
"path.home": t.TempDir(),
},
}

var zapLogs bytes.Buffer
core := zapcore.NewCore(
zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()),
zapcore.AddSync(&zapLogs),
zapcore.DebugLevel)

receiverSettings := receiver.Settings{}
receiverSettings.Logger = zap.New(core)

var countLogs int
logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
countLogs = countLogs + ld.LogRecordCount()
return nil
})
require.NoError(t, err, "Error creating log consumer")

r, err := createReceiver(context.Background(), receiverSettings, &config, logConsumer)
require.NoErrorf(t, err, "Error creating receiver. Logs:\n %s", zapLogs.String())
err = r.Start(context.Background(), nil)
require.NoError(t, err, "Error starting metricbeatreceiver")

ch := make(chan bool, 1)
timer := time.NewTimer(120 * time.Second)
defer timer.Stop()
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for tick := ticker.C; ; {
select {
case <-timer.C:
t.Fatalf("consumed logs didn't increase\nCount: %d\nLogs: %s\n", countLogs, zapLogs.String())
case <-tick:
tick = nil
go func() { ch <- countLogs > 0 }()
case v := <-ch:
if v {
goto found
}
tick = ticker.C
}
}
found:
err = r.Shutdown(context.Background())
require.NoError(t, err, "Error shutting down metricbeatreceiver")
}

0 comments on commit daf8f16

Please sign in to comment.