Skip to content
Closed
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c6bd143
fix: fix status reporting for metricsets
VihasMakwana May 28, 2025
a4ead4d
remove if
VihasMakwana May 28, 2025
5b5a218
fix map initialization
VihasMakwana May 28, 2025
c7ab288
revert wrapper changes
VihasMakwana May 29, 2025
8b17fb0
fix status reporting
VihasMakwana May 29, 2025
1f18cd6
Merge branch 'main' into fix-status-reporting-metricsets
VihasMakwana May 29, 2025
48b8e12
notice
VihasMakwana May 29, 2025
47c529b
nit
VihasMakwana May 29, 2025
025f2c7
Merge branch 'main' into fix-status-reporting-metricsets
VihasMakwana May 29, 2025
ba19557
add it for filebeat
VihasMakwana May 29, 2025
308ccaf
use hash instead of runner.String()
VihasMakwana May 29, 2025
5c53d41
make check
VihasMakwana May 30, 2025
afcaffe
make update
VihasMakwana May 30, 2025
e25c343
rename
VihasMakwana May 30, 2025
006f3fd
define otel manager
VihasMakwana May 30, 2025
21bd422
manager
VihasMakwana May 30, 2025
4a7827a
Merge branch 'main' into fix-status-reporting-metricsets
VihasMakwana May 31, 2025
0fb0cd2
license
VihasMakwana May 31, 2025
70d930a
make check
VihasMakwana Jun 2, 2025
2517b55
move status to libbeat
VihasMakwana Jun 3, 2025
513f433
Merge branch 'main' into fix-status-reporting-metricsets
VihasMakwana Jun 3, 2025
9540d52
Merge branch 'main' into fix-status-reporting-metricsets
VihasMakwana Jun 4, 2025
b44759c
add mutex
VihasMakwana Jun 4, 2025
cb09c7b
Merge branch 'main' into fix-status-reporting-metricsets
VihasMakwana Jun 4, 2025
43e3d91
Merge branch 'main' into fix-status-reporting-metricsets
VihasMakwana Jun 5, 2025
c39093e
Merge branch 'main' into fix-status-reporting-metricsets
VihasMakwana Jun 5, 2025
461dd58
chore: add testing
VihasMakwana Jun 5, 2025
b54d7ef
lint
VihasMakwana Jun 5, 2025
b0b268a
refactor
VihasMakwana Jun 6, 2025
5d7f818
Merge branch 'main' into fix-status-reporting-metricsets
VihasMakwana Jun 6, 2025
31b99f3
linter happy
VihasMakwana Jun 6, 2025
a93f275
add test case
VihasMakwana Jun 6, 2025
dfd1d24
comments
VihasMakwana Jun 6, 2025
e2649a4
Merge branch 'main' into fix-status-reporting-metricsets
VihasMakwana Jun 9, 2025
8e33dfb
test improvements
VihasMakwana Jun 10, 2025
d64de30
Merge branch 'fix-status-reporting-metricsets' of github.com:VihasMak…
VihasMakwana Jun 10, 2025
77a7911
minor change
VihasMakwana Jun 10, 2025
c274941
comment
VihasMakwana Jun 10, 2025
3f8c1ef
gci
VihasMakwana Jun 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
424 changes: 212 additions & 212 deletions NOTICE.txt

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/management/status"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand Down Expand Up @@ -66,14 +68,18 @@ func (c *crawler) Start(
pipeline beat.PipelineConnector,
configInputs *conf.C,
configModules *conf.C,
reporter status.StatusReporter,
) error {
log := c.log

log.Infof("Loading Inputs: %d", len(c.inputConfigs))

groupReporter := status.NewGroupStatusReporter(reporter)

// Prospect the globs/paths given on the command line and launch harvesters
for _, inputConfig := range c.inputConfigs {
err := c.startInput(pipeline, inputConfig)
err := c.startInput(pipeline, inputConfig, groupReporter)

if err != nil {
return fmt.Errorf("starting input failed: %w", err)
}
Expand Down Expand Up @@ -112,6 +118,7 @@ func (c *crawler) Start(
func (c *crawler) startInput(
pipeline beat.PipelineConnector,
config *conf.C,
reporter status.RunnerReporter,
) error {

if !config.Enabled() {
Expand Down Expand Up @@ -139,7 +146,9 @@ func (c *crawler) startInput(
if inputRunner, ok := runner.(*input.Runner); ok {
inputRunner.Once = c.once
}

if r, ok := runner.(status.WithStatusReporter); ok {
r.SetStatusReporter(reporter.GetReporterForRunner(id))
}
c.inputs[id] = runner

c.log.Infof("Starting input (ID: %d)", id)
Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
fb.logger.Debug("modules", "Existing Ingest pipelines will be updated")
}

err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules)
err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules, b.Manager)
if err != nil {
crawler.Stop()
cancelPipelineFactoryCtx()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ require (
go.elastic.co/apm/v2 v2.7.0
go.mongodb.org/mongo-driver v1.14.0
go.opentelemetry.io/collector/component v1.31.0
go.opentelemetry.io/collector/component/componentstatus v0.125.0
go.opentelemetry.io/collector/config/configtls v1.31.0
go.opentelemetry.io/collector/confmap v1.31.0
go.opentelemetry.io/collector/consumer v1.31.0
Expand Down Expand Up @@ -408,7 +409,6 @@ require (
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/client v1.31.0 // indirect
go.opentelemetry.io/collector/component/componentstatus v0.125.0 // indirect
go.opentelemetry.io/collector/component/componenttest v0.125.0 // indirect
go.opentelemetry.io/collector/config/configauth v0.125.0 // indirect
go.opentelemetry.io/collector/config/configcompression v1.31.0 // indirect
Expand Down
122 changes: 122 additions & 0 deletions libbeat/management/status/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package status

import (
"sync"
)

type runnerState struct {
state Status
msg string
}

// RunnerReporter defines an interface that returns a StatusReporter for a specific runner.
// This is used for grouping and managing statuses of multiple runners
type RunnerReporter interface {
GetReporterForRunner(id uint64) StatusReporter
}

// NewGroupStatusReporter creates a reporter that aggregates the statuses of multiple runners
// and reports the combined status to the parent StatusReporter.
// This is needed because multiple modules can report different statuses, and we want to avoid
// repeatedly flipping the parent's status.
func NewGroupStatusReporter(parent StatusReporter) RunnerReporter {
Comment thread
VihasMakwana marked this conversation as resolved.
Outdated
if parent == nil {
return &nopStatus{}
}
return &reporter{
parent: parent,
runnerStates: make(map[uint64]runnerState),
}
}

type reporter struct {
runnerStates map[uint64]runnerState
parent StatusReporter
mtx sync.Mutex
}

func (r *reporter) GetReporterForRunner(id uint64) StatusReporter {
r.mtx.Lock()
defer r.mtx.Unlock()
return &subReporter{
id: id,
r: r,
}
}

func (r *reporter) updateStatusForRunner(id uint64, state Status, msg string) {
r.mtx.Lock()
defer r.mtx.Unlock()
if r.runnerStates == nil {
r.runnerStates = make(map[uint64]runnerState)
}

// add status for the runner to the map
r.runnerStates[id] = runnerState{
state: state,
msg: msg,
}

// calculate the aggregate state of beat based on the module states
calcState, calcMsg := r.calculateState()

// report status to parent reporter
r.parent.UpdateStatus(calcState, calcMsg)
}

func (r *reporter) calculateState() (Status, string) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that you've pointed me at the pre-existing implementation for inputs+streams in

func (u *agentUnit) calcState() (status.Status, string) {

Why does this also need to exist? I can see what it is doing but it's not clear why you had to add this separate group reporter?

The manager already has an UpdateStatus method meant to report status, and there is now an OtelManager that can do OTel specific things, why do we need to add group status reporters in metricbeat and filebeat without going through the existing manager interface?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous one is tied to the concept of units, which we don't have anymore?

It still feels like it would be best if there was a way to account for this through the manager interface so that everything is centralized into a single interface.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the previous control protocol implementation, the status reporter was injected when the manager reloaded inputs:

in.StatusReporter = unit.GetReporterForStreamByIndex(idx)

This also defined the reporting resolution of status reporters, there was one per unit which mapped to one per input.

In the otel world there would naturally be one status reporter per receiver, and we would ideally have one per beat receiver input one day.

Why can't the status reporter be injected when the receiver is created? I think you probably still need this code to toggle the receiver status based on the input UpdateStatus calls but I am suspecting the group reporters are being created in the wrong places (directly in metricbeat.go and crawler.go) which are not beat receiver specific.

@VihasMakwana VihasMakwana Jun 10, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this also need to exist? I can see what it is doing but it's not clear why you had to add this separate group reporter?

I'll give you an example why we need the resolution logic. Consider the following config

inputs:
- module: system
  metricsets:
        - process
- module: system
  metricsets:
        - cpu
- module: system
  metricsets:
        - memory

In metricbeat, we create one Runner per module. The status reporter for runner is set here

func (rg *runnerGroup) SetStatusReporter(reporter status.StatusReporter) {
for _, runner := range rg.runners {
if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok {
runnerWithStatus.SetStatusReporter(reporter)
}
}
}
and it accepts any implementation we pass. We just need to adhere to the status.Reporter interface
type StatusReporter interface {
// UpdateStatus updates the status of the unit.
UpdateStatus(status Status, msg string)
}

if we directly set the otel's status reporter as it is (without any group reporter), then we face following problems:

  1. Conflicting statues:
    • When multiple modules are running, each one can independently update the shared reporter's status. This leads can lead misleading results. For example:
      • If one module with process metricset is DEGRADED, it updates the reporter's status, which is as expected.
      • But if another module is HEALTHY, it would also update reporter's status as HEALTHY, overwriting previous DEGRADED state.
  2. To avoid this race condition, we need a centralised place to aggregate statues of each module and calculate status of entire beat.

Why can't the status reporter be injected when the receiver is created? I think you probably still need this code to toggle the receiver status based on the input UpdateStatus calls but I am suspecting the group reporters are being created in the wrong places (directly in metricbeat.go and crawler.go) which are not beat receiver specific.

That should be the case in an ideal world, but receivers can report status via component.Host, which is only accessible during receiver startup. Unfortunately, there's currently no way to configure a status reporter at the time of receiver creation 😢

I mentioned this when I originally opened the PR, but I guess it got lost in the history.

It still feels like it would be best if there was a way to account for this through the manager interface so that everything is centralized into a single interface.

This is on my TODO list. I'll work on it as a follow-up.

@cmacknz cmacknz Jun 10, 2025

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, going to summarize what I mentioned in the project meeting today to make sure it is captured here:

  1. We need to make sure status reporting works for all of the Beats (heartbeat, osquerybeat, etc) and ideally we want to be able to do this without going into each individual Beat's concept of a module or an input. If we can find a way to centralize the injection of the group status reporter in libbeat our life is easier. In the control protocol, the Beat starts up with no inputs configured and then on reload the status reporters are injected.

    in.StatusReporter = unit.GetReporterForStreamByIndex(idx)

  2. We need to make the end state grouping clearer and make sure the way we've wired this up can preserve it. Today each individual input in an elastic-agent.yml has an independent status, and streams under that input are rolled up into that of the parent input. We need to preserve this with Beats receivers but it is not entirely the Beats responsibility to do this, it also might not be possible with the way otel status reporting works today, we'd ideally want a concept of a sub-component (for multiple inputs in a single receiver have independent state).

To use a configuration example let's imagine someone split the cpu, memory, network, and filesystem system metricsets across two inputs:

inputs:
  # This system/metrics module reports state independently of any other in the same configuration.
  - type: system/metrics
    id: cpu-memory
    use_output: default
    streams:
      # The status of each metricset is aggregated into the state of overall parent module.
      - metricsets:
        - cpu
        data_stream.dataset: system.cpu
      - metricsets:
        - memory
        data_stream.dataset: system.memory
  # This system/metrics module reports state independently of the one above.
  - type: system/metrics
    id: network-filesystem
    use_output: default
    streams:
      # The status of each metricset is aggregated into the state of overall parent module.
      - metricsets:
        - network
        data_stream.dataset: system.network
      - metricsets:
        - filesystem
        data_stream.dataset: system.filesystem

I think to get this to work correctly Elastic Agent would have to orchestrate both of those two inputs into separate system/metrics receivers (CC @leehinman and @swiatekm keep me honest) without doing anything else. Otherwise, we may need to do something outside of straight collector component status reporting to preserve the way this works today.

If we one day gained the ability to report sub-component status they wouldn't need to be separate receivers.

reportedState := Running
reportedMsg := ""
for _, s := range r.runnerStates {
switch s.state {
case Degraded:
if reportedState != Degraded {
reportedState = Degraded
reportedMsg = s.msg
}
case Failed:
// we've encountered a failed runner.
// short-circuit and return, as Failed state takes precedence over other states
return s.state, s.msg
Comment thread
cmacknz marked this conversation as resolved.
}
}
return reportedState, reportedMsg
}

type nopStatus struct{}

type noopReporter struct{}

func (*noopReporter) UpdateStatus(Status, string) {}

func (s *nopStatus) GetReporterForRunner(id uint64) StatusReporter {
return &noopReporter{}
}

// subReporter implements status.StatusReporter
type subReporter struct {
id uint64
r *reporter
}

func (m *subReporter) UpdateStatus(status Status, msg string) {
// report status to its parent
m.r.updateStatusForRunner(m.id, status, msg)
}
56 changes: 56 additions & 0 deletions libbeat/management/status/group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package status

import (
"testing"

"github.com/stretchr/testify/require"
)

type mockStatusReporter struct {
s Status
msg string
}

func (m *mockStatusReporter) UpdateStatus(s Status, msg string) {
m.s = s
m.msg = msg
}

func TestGroupStatus(t *testing.T) {
m := &mockStatusReporter{}
reporter := NewGroupStatusReporter(m)

subReporter1, subReporter2, subReporter3 := reporter.GetReporterForRunner(1), reporter.GetReporterForRunner(2), reporter.GetReporterForRunner(3)

subReporter1.UpdateStatus(Running, "")
subReporter2.UpdateStatus(Running, "")
subReporter3.UpdateStatus(Running, "")

require.Equal(t, m.s, Running)
require.Equal(t, m.msg, "")

subReporter1.UpdateStatus(Degraded, "Degrade Runner1")
require.Equal(t, m.s, Degraded)
require.Equal(t, m.msg, "Degrade Runner1")

subReporter2.UpdateStatus(Failed, "Failed Runner2")
Comment thread
VihasMakwana marked this conversation as resolved.
Outdated
require.Equal(t, m.s, Failed)
require.Equal(t, m.msg, "Failed Runner2")
}
20 changes: 19 additions & 1 deletion libbeat/otelbeat/oteltest/oteltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver"
Expand All @@ -38,6 +39,18 @@ import (
"go.uber.org/zap/zaptest/observer"
)

type MockHost struct {
Evt *componentstatus.Event
}

func (*MockHost) GetExtensions() map[component.ID]component.Component {
return nil
}

func (h *MockHost) Report(evt *componentstatus.Event) {
h.Evt = evt
}

type ReceiverConfig struct {
Name string
Config component.Config
Expand All @@ -61,6 +74,8 @@ func CheckReceivers(params CheckReceiversParams) {
var logsMu sync.Mutex
logs := make(map[string][]mapstr.M)

host := &MockHost{}

zapCore := zapcore.NewCore(
zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()),
&zaptest.Discarder{},
Expand Down Expand Up @@ -112,7 +127,7 @@ func CheckReceivers(params CheckReceiversParams) {
}

for i, r := range receivers {
err := r.Start(ctx, nil)
err := r.Start(ctx, host)
require.NoErrorf(t, err, "Error starting receiver %d", i)
defer func() {
require.NoErrorf(t, r.Shutdown(ctx), "Error shutting down receiver %d", i)
Expand All @@ -138,6 +153,9 @@ func CheckReceivers(params CheckReceiversParams) {
require.Equal(t, zl.ContextMap()["otelcol.signal"], "logs")
break
}
require.NotNil(t, host.Evt)
require.Nil(t, host.Evt.Err())
require.Equal(t, host.Evt.Status(), componentstatus.StatusOK)
Comment thread
cmacknz marked this conversation as resolved.

params.AssertFunc(ct, logs, zapLogs)
}, 2*time.Minute, 100*time.Millisecond,
Expand Down
25 changes: 20 additions & 5 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/management/status"

"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/module"
Expand All @@ -44,9 +46,9 @@ import (

// Metricbeat implements the Beater interface for metricbeat.
type Metricbeat struct {
done chan struct{} // Channel used to initiate shutdown.
stopOnce sync.Once // wraps the Stop() method
runners []cfgfile.Runner // Active list of module runners.
done chan struct{} // Channel used to initiate shutdown.
stopOnce sync.Once // wraps the Stop() method
runners map[uint64]cfgfile.Runner // Active list of module runners.
config Config
registry *mb.Register
autodiscover *autodiscover.Autodiscover
Expand Down Expand Up @@ -154,6 +156,7 @@ func newMetricbeat(b *beat.Beat, c *conf.C, registry *mb.Register, options ...Op
config: config,
registry: registry,
logger: b.Info.Logger,
runners: make(map[uint64]cfgfile.Runner),
}

for _, applyOption := range options {
Expand Down Expand Up @@ -202,7 +205,12 @@ func newMetricbeat(b *beat.Beat, c *conf.C, registry *mb.Register, options ...Op
return nil, err
}

metricbeat.runners = append(metricbeat.runners, runner)
hash, err := cfgfile.HashConfig(moduleCfg)
Comment thread
cmacknz marked this conversation as resolved.
if err != nil {
return nil, fmt.Errorf("error hashing module config: %w", err)
}

metricbeat.runners[hash] = runner
Comment on lines +208 to +213

@VihasMakwana VihasMakwana May 30, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed because each runner has to be associated with a unique id.
Filebeat already has the same logic in place.

}

if len(metricbeat.runners) == 0 && !dynamicCfgEnabled {
Expand Down Expand Up @@ -235,8 +243,15 @@ func newMetricbeat(b *beat.Beat, c *conf.C, registry *mb.Register, options ...Op
func (bt *Metricbeat) Run(b *beat.Beat) error {
var wg sync.WaitGroup

groupReporter := status.NewGroupStatusReporter(b.Manager)

// Static modules (metricbeat.runners)
for _, r := range bt.runners {
for hash, r := range bt.runners {
// If the otelStatusReporter is set, we need to set the status reporter
if status, ok := r.(status.WithStatusReporter); ok {
status.SetStatusReporter(groupReporter.GetReporterForRunner(hash))
}

r.Start()
wg.Add(1)

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/fbreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (fb *filebeatReceiver) Start(ctx context.Context, host component.Host) erro
go func() {
defer fb.wg.Done()
fb.Logger.Info("starting filebeat receiver")
if err := fb.BeatReceiver.Start(); err != nil {
if err := fb.BeatReceiver.Start(host); err != nil {
fb.Logger.Error("error starting filebeat receiver", zap.Error(err))
}
}()
Expand Down
Loading