diff --git a/NOTICE.txt b/NOTICE.txt index 146b29e3e38d..96d2eca26d2c 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -12699,11 +12699,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-system-metrics -Version: v0.11.7 +Version: v0.11.9 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-system-metrics@v0.11.7/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-system-metrics@v0.11.9/LICENSE.txt: Apache License Version 2.0, January 2004 diff --git a/go.mod b/go.mod index b9d6fc621d67..c84e0bedd1dc 100644 --- a/go.mod +++ b/go.mod @@ -177,7 +177,7 @@ require ( github.com/elastic/ebpfevents v0.6.0 github.com/elastic/elastic-agent-autodiscover v0.9.0 github.com/elastic/elastic-agent-libs v0.18.9 - github.com/elastic/elastic-agent-system-metrics v0.11.7 + github.com/elastic/elastic-agent-system-metrics v0.11.9 github.com/elastic/go-elasticsearch/v8 v8.17.0 github.com/elastic/go-quark v0.2.0 github.com/elastic/go-sfdc v0.0.0-20241010131323-8e176480d727 diff --git a/go.sum b/go.sum index 9a1c36d3f843..f40cb314da3e 100644 --- a/go.sum +++ b/go.sum @@ -326,8 +326,8 @@ github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7b github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI= github.com/elastic/elastic-agent-libs v0.18.9 h1:NQbBK6uMd/t4S0Fe7iOpT5mE7RXMWVaOLBSEXHRBzWI= github.com/elastic/elastic-agent-libs v0.18.9/go.mod h1:Repx7BMzE1v/gTipPogNIQeEnSGwOWGBC63h7h9c5aM= -github.com/elastic/elastic-agent-system-metrics v0.11.7 h1:1xm2okCM0eQZ4jivZgUFSlt6HAn/nPgKB/Fj8eLG6mY= -github.com/elastic/elastic-agent-system-metrics v0.11.7/go.mod h1:nzkrGajQA29YNcfP62gfzhxX9an3/xdQ3RmfQNw9YTI= +github.com/elastic/elastic-agent-system-metrics v0.11.9 h1:UDgT4ygXKGRwyNyUDzBKbDW7bYphZNg1GOLtDFqZ4Wg= +github.com/elastic/elastic-agent-system-metrics v0.11.9/go.mod h1:FgtshyeVEAxNqRoFkdL0MpCW0rHxQGFNEKjIsxfcrag= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4= diff --git a/libbeat/api/routes.go b/libbeat/api/routes.go index fd0bcc8ca41b..08b75fd42357 100644 --- a/libbeat/api/routes.go +++ b/libbeat/api/routes.go @@ -30,20 +30,34 @@ import ( "github.com/elastic/elastic-agent-libs/monitoring" ) -type lookupFunc func(string) *monitoring.Namespace +// RegistryLookupFunc is used for looking up specfic registry inside a namespace +func RegistryLookupFunc(rootNamespace *monitoring.Namespace) LookupFunc { + return func(s string) *monitoring.Registry { + return rootNamespace.GetRegistry().GetRegistry(s) + } +} + +// NamespaceLookupFunc is used for looking up root registry of a given namespace +func NamespaceLookupFunc() LookupFunc { + return func(s string) *monitoring.Registry { + return monitoring.GetNamespace(s).GetRegistry() + } +} + +type LookupFunc func(string) *monitoring.Registry // NewWithDefaultRoutes creates a new server with default API routes. -func NewWithDefaultRoutes(log *logp.Logger, config *config.C, ns lookupFunc) (*Server, error) { +func NewWithDefaultRoutes(log *logp.Logger, config *config.C, reg LookupFunc) (*Server, error) { api, err := New(log, config) if err != nil { return nil, err } err = multierr.Combine( - api.AttachHandler("/", makeRootAPIHandler(makeAPIHandler(ns("info")))), - api.AttachHandler("/state", makeAPIHandler(ns("state"))), - api.AttachHandler("/stats", makeAPIHandler(ns("stats"))), - api.AttachHandler("/dataset", makeAPIHandler(ns("dataset"))), + api.AttachHandler("/", makeRootAPIHandler(makeAPIHandler(reg("info")))), + api.AttachHandler("/state", makeAPIHandler(reg("state"))), + api.AttachHandler("/stats", makeAPIHandler(reg("stats"))), + api.AttachHandler("/dataset", makeAPIHandler(reg("dataset"))), ) if err != nil { return nil, err @@ -62,12 +76,12 @@ func makeRootAPIHandler(handler http.HandlerFunc) http.HandlerFunc { } } -func makeAPIHandler(ns *monitoring.Namespace) http.HandlerFunc { +func makeAPIHandler(registry *monitoring.Registry) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=utf-8") data := monitoring.CollectStructSnapshot( - ns.GetRegistry(), + registry, monitoring.Full, false, ) diff --git a/libbeat/beat/info.go b/libbeat/beat/info.go index 784ebd5e7876..0ded360bfc62 100644 --- a/libbeat/beat/info.go +++ b/libbeat/beat/info.go @@ -48,6 +48,7 @@ type Info struct { StateRegistry *monitoring.Registry InfoRegistry *monitoring.Registry + StatsRegistry *monitoring.Registry } LogConsumer consumer.Logs // otel log consumer UseDefaultProcessors bool // Whether to use the default processors diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 8ede9655d340..9219a2568f92 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -467,9 +467,9 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, u uniq_reg := b.Beat.Info.Monitoring.Namespace.GetRegistry() - reg := b.Info.Monitoring.StateRegistry.GetRegistry("libbeat") + reg := b.Info.Monitoring.StatsRegistry.GetRegistry("libbeat") if reg == nil { - reg = b.Info.Monitoring.StateRegistry.NewRegistry("libbeat") + reg = b.Info.Monitoring.StatsRegistry.NewRegistry("libbeat") } tel := uniq_reg.GetRegistry("state") @@ -669,7 +669,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { // that would be set at runtime. if b.Config.HTTP.Enabled() { var err error - b.API, err = api.NewWithDefaultRoutes(logp.NewLogger(""), b.Config.HTTP, monitoring.GetNamespace) + b.API, err = api.NewWithDefaultRoutes(logp.NewLogger(""), b.Config.HTTP, api.NamespaceLookupFunc()) if err != nil { return fmt.Errorf("could not start the HTTP server for the API: %w", err) } @@ -979,6 +979,17 @@ func (b *Beat) SetupRegistry() { stateRegistry = monitoring.GetNamespace("state").GetRegistry() } b.Info.Monitoring.StateRegistry = stateRegistry + + var statsRegistry *monitoring.Registry + if b.Info.Monitoring.Namespace != nil { + statsRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("stats") + if statsRegistry == nil { + statsRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("stats") + } + } else { + statsRegistry = monitoring.GetNamespace("stats").GetRegistry() + } + b.Info.Monitoring.StatsRegistry = statsRegistry } // handleFlags converts -flag to --flags, parses the command line diff --git a/x-pack/filebeat/fbreceiver/factory.go b/x-pack/filebeat/fbreceiver/factory.go index b3c12afb8979..6aee230c9764 100644 --- a/x-pack/filebeat/fbreceiver/factory.go +++ b/x-pack/filebeat/fbreceiver/factory.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/processing" "github.com/elastic/beats/v7/x-pack/filebeat/include" inputs "github.com/elastic/beats/v7/x-pack/filebeat/input/default-inputs" + "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" "go.opentelemetry.io/collector/component" @@ -62,7 +63,14 @@ func createReceiver(_ context.Context, set receiver.Settings, baseCfg component. return nil, fmt.Errorf("error getting %s creator:%w", Name, err) } - return &filebeatReceiver{beat: &b.Beat, beater: fbBeater, logger: set.Logger}, nil + httpConf := struct { + HTTP *config.C `config:"http"` + }{} + if err := b.RawConfig.Unpack(&httpConf); err != nil { + return nil, fmt.Errorf("error starting API :%w", err) + } + + return &filebeatReceiver{beat: b, beater: fbBeater, logger: set.Logger, httpConf: httpConf.HTTP}, nil } func defaultProcessors() []mapstr.M { diff --git a/x-pack/filebeat/fbreceiver/receiver.go b/x-pack/filebeat/fbreceiver/receiver.go index 495877abf43b..493c504b09a3 100644 --- a/x-pack/filebeat/fbreceiver/receiver.go +++ b/x-pack/filebeat/fbreceiver/receiver.go @@ -6,19 +6,27 @@ package fbreceiver import ( "context" + "fmt" "sync" + "github.com/elastic/beats/v7/libbeat/api" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cmd/instance" + "github.com/elastic/beats/v7/libbeat/version" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + metricreport "github.com/elastic/elastic-agent-system-metrics/report" "go.opentelemetry.io/collector/component" "go.uber.org/zap" ) type filebeatReceiver struct { - beat *beat.Beat - beater beat.Beater - logger *zap.Logger - wg sync.WaitGroup + beat *instance.Beat + beater beat.Beater + logger *zap.Logger + wg sync.WaitGroup + httpConf *config.C } func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) error { @@ -26,8 +34,10 @@ func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) erro go func() { defer fb.wg.Done() fb.logger.Info("starting filebeat receiver") - err := fb.beater.Run(fb.beat) - if err != nil { + if err := fb.startMonitoring(); err != nil { + fb.logger.Error("could not start the HTTP server for the monitoring API", zap.Error(err)) + } + if err := fb.beater.Run(&fb.beat.Beat); err != nil { fb.logger.Error("filebeat receiver run error", zap.Error(err)) } }() @@ -37,6 +47,49 @@ func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) erro func (fb *filebeatReceiver) Shutdown(ctx context.Context) error { fb.logger.Info("stopping filebeat receiver") fb.beater.Stop() + if err := fb.stopMonitoring(); err != nil { + return fmt.Errorf("error stopping monitoring server: %w", err) + } fb.wg.Wait() return nil } + +func (fb *filebeatReceiver) startMonitoring() error { + if fb.httpConf.Enabled() { + var err error + + fb.beat.RegisterMetrics() + + statsReg := fb.beat.Info.Monitoring.StatsRegistry + + // stats.beat + processReg := statsReg.GetRegistry("beat") + if processReg == nil { + processReg = statsReg.NewRegistry("beat") + } + + // stats.system + systemReg := statsReg.GetRegistry("system") + if systemReg == nil { + systemReg = statsReg.NewRegistry("system") + } + + err = metricreport.SetupMetrics(logp.NewLogger("metrics"), fb.beat.Info.Beat, version.GetDefaultVersion(), metricreport.WithProcessRegistry(processReg), metricreport.WithSystemRegistry(systemReg)) + if err != nil { + return err + } + fb.beat.API, err = api.NewWithDefaultRoutes(logp.NewLogger("metrics.http"), fb.httpConf, api.RegistryLookupFunc(fb.beat.Info.Monitoring.Namespace)) + if err != nil { + return err + } + fb.beat.API.Start() + } + return nil +} + +func (fb *filebeatReceiver) stopMonitoring() error { + if fb.beat.API != nil { + return fb.beat.API.Stop() + } + return nil +} diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 7a147add0dbe..b60eb39b4bbc 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -49,6 +49,9 @@ processors: - add_cloud_metadata: ~ - add_docker_metadata: ~ - add_kubernetes_metadata: ~ +http.enabled: true +http.host: localhost +http.port: 5066 ` func TestFilebeatOTelE2E(t *testing.T) { @@ -129,6 +132,7 @@ setup.template.pattern: logs-filebeat-default } assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") + assertMonitoring(t) } func writeEventsToLogFile(t *testing.T, filename string, numEvents int) { @@ -170,3 +174,17 @@ func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg } require.Equal(t, "", cmp.Diff(flatM1, flatM2), "expected maps to be equal") } + +func assertMonitoring(t *testing.T) { + r, err := http.Get("http://localhost:5066") //nolint:noctx // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") + + r, err = http.Get("http://localhost:5066/stats") //nolint:noctx // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") + + r, err = http.Get("http://localhost:5066/not-exist") //nolint:noctx // fine for tests + require.NoError(t, err) + require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") +}