Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12699,11 +12699,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-system-metrics
Version: v0.11.7
Version: v0.11.9
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected].7/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected].9/LICENSE.txt:

Apache License
Version 2.0, January 2004
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ require (
github.com/elastic/ebpfevents v0.6.0
github.com/elastic/elastic-agent-autodiscover v0.9.0
github.com/elastic/elastic-agent-libs v0.18.9
github.com/elastic/elastic-agent-system-metrics v0.11.7
github.com/elastic/elastic-agent-system-metrics v0.11.9
github.com/elastic/go-elasticsearch/v8 v8.17.0
github.com/elastic/go-quark v0.2.0
github.com/elastic/go-sfdc v0.0.0-20241010131323-8e176480d727
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7b
github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI=
github.com/elastic/elastic-agent-libs v0.18.9 h1:NQbBK6uMd/t4S0Fe7iOpT5mE7RXMWVaOLBSEXHRBzWI=
github.com/elastic/elastic-agent-libs v0.18.9/go.mod h1:Repx7BMzE1v/gTipPogNIQeEnSGwOWGBC63h7h9c5aM=
github.com/elastic/elastic-agent-system-metrics v0.11.7 h1:1xm2okCM0eQZ4jivZgUFSlt6HAn/nPgKB/Fj8eLG6mY=
github.com/elastic/elastic-agent-system-metrics v0.11.7/go.mod h1:nzkrGajQA29YNcfP62gfzhxX9an3/xdQ3RmfQNw9YTI=
github.com/elastic/elastic-agent-system-metrics v0.11.9 h1:UDgT4ygXKGRwyNyUDzBKbDW7bYphZNg1GOLtDFqZ4Wg=
github.com/elastic/elastic-agent-system-metrics v0.11.9/go.mod h1:FgtshyeVEAxNqRoFkdL0MpCW0rHxQGFNEKjIsxfcrag=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4=
Expand Down
30 changes: 22 additions & 8 deletions libbeat/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,34 @@ import (
"github.com/elastic/elastic-agent-libs/monitoring"
)

type lookupFunc func(string) *monitoring.Namespace
// RegistryLookupFunc is used for looking up specfic registry inside a namespace
func RegistryLookupFunc(rootNamespace *monitoring.Namespace) LookupFunc {
return func(s string) *monitoring.Registry {
return rootNamespace.GetRegistry().GetRegistry(s)
}
}

// NamespaceLookupFunc is used for looking up root registry of a given namespace
func NamespaceLookupFunc() LookupFunc {
return func(s string) *monitoring.Registry {
return monitoring.GetNamespace(s).GetRegistry()
}
}

type LookupFunc func(string) *monitoring.Registry

// NewWithDefaultRoutes creates a new server with default API routes.
func NewWithDefaultRoutes(log *logp.Logger, config *config.C, ns lookupFunc) (*Server, error) {
func NewWithDefaultRoutes(log *logp.Logger, config *config.C, reg LookupFunc) (*Server, error) {
api, err := New(log, config)
if err != nil {
return nil, err
}

err = multierr.Combine(
api.AttachHandler("/", makeRootAPIHandler(makeAPIHandler(ns("info")))),
api.AttachHandler("/state", makeAPIHandler(ns("state"))),
api.AttachHandler("/stats", makeAPIHandler(ns("stats"))),
api.AttachHandler("/dataset", makeAPIHandler(ns("dataset"))),
api.AttachHandler("/", makeRootAPIHandler(makeAPIHandler(reg("info")))),
api.AttachHandler("/state", makeAPIHandler(reg("state"))),
api.AttachHandler("/stats", makeAPIHandler(reg("stats"))),
api.AttachHandler("/dataset", makeAPIHandler(reg("dataset"))),
)
if err != nil {
return nil, err
Expand All @@ -62,12 +76,12 @@ func makeRootAPIHandler(handler http.HandlerFunc) http.HandlerFunc {
}
}

func makeAPIHandler(ns *monitoring.Namespace) http.HandlerFunc {
func makeAPIHandler(registry *monitoring.Registry) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

data := monitoring.CollectStructSnapshot(
ns.GetRegistry(),
registry,
monitoring.Full,
false,
)
Expand Down
1 change: 1 addition & 0 deletions libbeat/beat/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Info struct {

StateRegistry *monitoring.Registry
InfoRegistry *monitoring.Registry
StatsRegistry *monitoring.Registry
}
LogConsumer consumer.Logs // otel log consumer
UseDefaultProcessors bool // Whether to use the default processors
Expand Down
17 changes: 14 additions & 3 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,9 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, u

uniq_reg := b.Beat.Info.Monitoring.Namespace.GetRegistry()

reg := b.Info.Monitoring.StateRegistry.GetRegistry("libbeat")
reg := b.Info.Monitoring.StatsRegistry.GetRegistry("libbeat")
if reg == nil {
reg = b.Info.Monitoring.StateRegistry.NewRegistry("libbeat")
reg = b.Info.Monitoring.StatsRegistry.NewRegistry("libbeat")
}

tel := uniq_reg.GetRegistry("state")
Expand Down Expand Up @@ -669,7 +669,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
// that would be set at runtime.
if b.Config.HTTP.Enabled() {
var err error
b.API, err = api.NewWithDefaultRoutes(logp.NewLogger(""), b.Config.HTTP, monitoring.GetNamespace)
b.API, err = api.NewWithDefaultRoutes(logp.NewLogger(""), b.Config.HTTP, api.NamespaceLookupFunc())
if err != nil {
return fmt.Errorf("could not start the HTTP server for the API: %w", err)
}
Expand Down Expand Up @@ -979,6 +979,17 @@ func (b *Beat) SetupRegistry() {
stateRegistry = monitoring.GetNamespace("state").GetRegistry()
}
b.Info.Monitoring.StateRegistry = stateRegistry

var statsRegistry *monitoring.Registry
if b.Info.Monitoring.Namespace != nil {
statsRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("stats")
if statsRegistry == nil {
statsRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("stats")
}
} else {
statsRegistry = monitoring.GetNamespace("stats").GetRegistry()
}
b.Info.Monitoring.StatsRegistry = statsRegistry
}

// handleFlags converts -flag to --flags, parses the command line
Expand Down
10 changes: 9 additions & 1 deletion x-pack/filebeat/fbreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/x-pack/filebeat/include"
inputs "github.com/elastic/beats/v7/x-pack/filebeat/input/default-inputs"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -62,7 +63,14 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component.
return nil, fmt.Errorf("error getting %s creator:%w", Name, err)
}

return &filebeatReceiver{beat: &b.Beat, beater: fbBeater, logger: set.Logger}, nil
httpConf := struct {
HTTP *config.C `config:"http"`
}{}
if err := b.RawConfig.Unpack(&httpConf); err != nil {
return nil, fmt.Errorf("error starting API :%w", err)
}

return &filebeatReceiver{beat: b, beater: fbBeater, logger: set.Logger, httpConf: httpConf.HTTP}, nil
}

func defaultProcessors() []mapstr.M {
Expand Down
65 changes: 59 additions & 6 deletions x-pack/filebeat/fbreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,38 @@ package fbreceiver

import (
"context"
"fmt"
"sync"

"github.com/elastic/beats/v7/libbeat/api"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
metricreport "github.com/elastic/elastic-agent-system-metrics/report"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
)

type filebeatReceiver struct {
beat *beat.Beat
beater beat.Beater
logger *zap.Logger
wg sync.WaitGroup
beat *instance.Beat
beater beat.Beater
logger *zap.Logger
wg sync.WaitGroup
httpConf *config.C
}

func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error {
fb.wg.Add(1)
go func() {
defer fb.wg.Done()
fb.logger.Info("starting filebeat receiver")
err := fb.beater.Run(fb.beat)
if err != nil {
if err := fb.startMonitoring(); err != nil {
fb.logger.Error("could not start the HTTP server for the monitoring API", zap.Error(err))
}
if err := fb.beater.Run(&fb.beat.Beat); err != nil {
fb.logger.Error("filebeat receiver run error", zap.Error(err))
}
}()
Expand All @@ -37,6 +47,49 @@ func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) erro
func (fb *filebeatReceiver) Shutdown(ctx context.Context) error {
fb.logger.Info("stopping filebeat receiver")
fb.beater.Stop()
if err := fb.stopMonitoring(); err != nil {
return fmt.Errorf("error stopping monitoring server: %w", err)
}
fb.wg.Wait()
return nil
}

func (fb *filebeatReceiver) startMonitoring() error {
if fb.httpConf.Enabled() {
var err error

fb.beat.RegisterMetrics()

statsReg := fb.beat.Info.Monitoring.StatsRegistry

// stats.beat
processReg := statsReg.GetRegistry("beat")
if processReg == nil {
processReg = statsReg.NewRegistry("beat")
}

// stats.system
systemReg := statsReg.GetRegistry("system")
if systemReg == nil {
systemReg = statsReg.NewRegistry("system")
}

err = metricreport.SetupMetrics(logp.NewLogger("metrics"), fb.beat.Info.Beat, version.GetDefaultVersion(), metricreport.WithProcessRegistry(processReg), metricreport.WithSystemRegistry(systemReg))
if err != nil {
return err
}
fb.beat.API, err = api.NewWithDefaultRoutes(logp.NewLogger("metrics.http"), fb.httpConf, api.RegistryLookupFunc(fb.beat.Info.Monitoring.Namespace))
if err != nil {
return err
}
fb.beat.API.Start()
}
return nil
}

func (fb *filebeatReceiver) stopMonitoring() error {
if fb.beat.API != nil {
return fb.beat.API.Stop()
}
return nil
}
18 changes: 18 additions & 0 deletions x-pack/filebeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ processors:
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
http.enabled: true
http.host: localhost
http.port: 5066
`

func TestFilebeatOTelE2E(t *testing.T) {
Expand Down Expand Up @@ -129,6 +132,7 @@ setup.template.pattern: logs-filebeat-default
}

assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
assertMonitoring(t)
}

func writeEventsToLogFile(t *testing.T, filename string, numEvents int) {
Expand Down Expand Up @@ -170,3 +174,17 @@ func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg
}
require.Equal(t, "", cmp.Diff(flatM1, flatM2), "expected maps to be equal")
}

func assertMonitoring(t *testing.T) {
r, err := http.Get("http://localhost:5066") //nolint:noctx // fine for tests
require.NoError(t, err)
require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code")

r, err = http.Get("http://localhost:5066/stats") //nolint:noctx // fine for tests
require.NoError(t, err)
require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code")

r, err = http.Get("http://localhost:5066/not-exist") //nolint:noctx // fine for tests
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code")
}
Loading