Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 69 additions & 44 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ import (
"github.com/elastic/elastic-agent-libs/testing"
)

// Expvar metric names.
const (
successesKey = "success"
failuresKey = "failures"
eventsKey = "events"
// Expvar metric names.
successesKey = "success"
failuresKey = "failures"
eventsKey = "events"
consecutiveFailuresKey = "consecutive_failures"

// Failure threshold config key
failureThresholdKey = "failure_threshold"
)

var (
Expand Down Expand Up @@ -70,16 +74,18 @@ type metricSetWrapper struct {
module *Wrapper // Parent Module.
stats *stats // stats for this MetricSet.

periodic bool // Set to true if this metricset is a periodic fetcher
periodic bool // Set to true if this metricset is a periodic fetcher
failureThreshold uint // threshold of consecutive errors needed to set the stream as degraded
}

// stats bundles common metricset stats.
type stats struct {
key string // full stats key
ref uint32 // number of modules/metricsets reusing stats instance
success *monitoring.Int // Total success events.
failures *monitoring.Int // Total error events.
events *monitoring.Int // Total events published.
key string // full stats key
ref uint32 // number of modules/metricsets reusing stats instance
success *monitoring.Int // Total success events.
failures *monitoring.Int // Total error events.
events *monitoring.Int // Total events published.
consecutiveFailures *monitoring.Uint // Consecutive failures fetching this metricset
}

// NewWrapper creates a new module and its associated metricsets based on the given configuration.
Expand All @@ -106,11 +112,28 @@ func createWrapper(module mb.Module, metricSets []mb.MetricSet, options ...Optio
applyOption(wrapper)
}

failureThreshold := uint(1)

var streamHealthSettings struct {
FailureThreshold *uint `config:"failure_threshold"`
}

err := module.UnpackConfig(&streamHealthSettings)

if err != nil {
return nil, fmt.Errorf("unpacking raw config: %w", err)
}

if streamHealthSettings.FailureThreshold != nil {
failureThreshold = *streamHealthSettings.FailureThreshold
}

for i, metricSet := range metricSets {
wrapper.metricSets[i] = &metricSetWrapper{
MetricSet: metricSet,
module: wrapper,
stats: getMetricSetStats(wrapper.Name(), metricSet.Name()),
MetricSet: metricSet,
module: wrapper,
stats: getMetricSetStats(wrapper.Name(), metricSet.Name()),
failureThreshold: failureThreshold,
}
}
return wrapper, nil
Expand Down Expand Up @@ -254,35 +277,11 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
case mb.ReportingMetricSetV2Error:
reporter.StartFetchTimer()
err := fetcher.Fetch(reporter.V2())
if err != nil {
reporter.V2().Error(err)
if errors.As(err, &mb.PartialMetricsError{}) {
// mark module as running if metrics are partially available and display the error message
msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
} else {
// mark it as degraded for any other issue encountered
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
}
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
msw.handleFetchError(err, reporter.V2())
case mb.ReportingMetricSetV2WithContext:
reporter.StartFetchTimer()
err := fetcher.Fetch(ctx, reporter.V2())
if err != nil {
reporter.V2().Error(err)
if errors.As(err, &mb.PartialMetricsError{}) {
// mark module as running if metrics are partially available and display the error message
msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
} else {
// mark it as degraded for any other issue encountered
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
}
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
msw.handleFetchError(err, reporter.V2())
default:
panic(fmt.Sprintf("unexpected fetcher type for %v", msw))
}
Expand Down Expand Up @@ -311,6 +310,31 @@ func (msw *metricSetWrapper) Test(d testing.Driver) {
})
}

func (msw *metricSetWrapper) handleFetchError(err error, reporter mb.PushReporterV2) {
switch {
case err == nil:
msw.stats.consecutiveFailures.Set(0)
msw.module.UpdateStatus(status.Running, "")

case errors.As(err, &mb.PartialMetricsError{}):
reporter.Error(err)
msw.stats.consecutiveFailures.Set(0)
// mark module as running if metrics are partially available and display the error message
msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)

default:
reporter.Error(err)
msw.stats.consecutiveFailures.Inc()
if msw.failureThreshold > 0 && msw.stats.consecutiveFailures != nil && uint(msw.stats.consecutiveFailures.Get()) >= msw.failureThreshold {
// mark it as degraded for any other issue encountered
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
}
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)

}
}

type reporter interface {
StartFetchTimer()
V1() mb.PushReporter //nolint:staticcheck // PushReporter is deprecated but not removed
Expand Down Expand Up @@ -437,11 +461,12 @@ func getMetricSetStats(module, name string) *stats {

reg := monitoring.Default.NewRegistry(key)
s := &stats{
key: key,
ref: 1,
success: monitoring.NewInt(reg, successesKey),
failures: monitoring.NewInt(reg, failuresKey),
events: monitoring.NewInt(reg, eventsKey),
key: key,
ref: 1,
success: monitoring.NewInt(reg, successesKey),
failures: monitoring.NewInt(reg, failuresKey),
events: monitoring.NewInt(reg, eventsKey),
consecutiveFailures: monitoring.NewUint(reg, consecutiveFailuresKey),
}

fetches[key] = s
Expand Down
Loading