diff --git a/metricbeat/mb/builders.go b/metricbeat/mb/builders.go index db00b072eaca..deb2e43ec63f 100644 --- a/metricbeat/mb/builders.go +++ b/metricbeat/mb/builders.go @@ -237,15 +237,18 @@ func mustImplementFetcher(ms MetricSet) error { ifcs = append(ifcs, "ReportingMetricSetV2") } + if _, ok := ms.(ReportingMetricSetV2Error); ok { + ifcs = append(ifcs, "ReportingMetricSetV2Error") + } + if _, ok := ms.(PushMetricSetV2); ok { ifcs = append(ifcs, "PushMetricSetV2") } - switch len(ifcs) { case 0: return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+ "producing interface (EventFetcher, EventsFetcher, "+ - "ReportingMetricSet, ReportingMetricSetV2, PushMetricSet, or "+ + "ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, PushMetricSet, or "+ "PushMetricSetV2)", ms.Module().Name(), ms.Name()) case 1: diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 0d38a14e5b9c..e38655a6907f 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -201,6 +201,13 @@ type ReportingMetricSetV2 interface { Fetch(r ReporterV2) } +// ReportingMetricSetV2Error is a MetricSet that reports events or errors through the +// ReporterV2 interface. Fetch is called periodically to collect events. +type ReportingMetricSetV2Error interface { + MetricSet + Fetch(r ReporterV2) error +} + // PushMetricSetV2 is a MetricSet that pushes events (rather than pulling them // periodically via a Fetch callback). Run is invoked to start the event // subscription and it should block until the MetricSet is ready to stop or diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index c835447e6534..245876cd655b 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -192,7 +192,7 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) { case mb.PushMetricSetV2: ms.Run(reporter.V2()) case mb.EventFetcher, mb.EventsFetcher, - mb.ReportingMetricSet, mb.ReportingMetricSetV2: + mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error: msw.startPeriodicFetching(reporter) default: // Earlier startup stages prevent this from happening. @@ -236,6 +236,13 @@ func (msw *metricSetWrapper) fetch(reporter reporter) { case mb.ReportingMetricSetV2: reporter.StartFetchTimer() fetcher.Fetch(reporter.V2()) + case mb.ReportingMetricSetV2Error: + reporter.StartFetchTimer() + err := fetcher.Fetch(reporter.V2()) + if err != nil { + reporter.V2().Error(err) + logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) + } default: panic(fmt.Sprintf("unexpected fetcher type for %v", msw)) } diff --git a/metricbeat/mb/testing/data_generator.go b/metricbeat/mb/testing/data_generator.go index 7677b0dd05cb..1bdb6c69e5d4 100644 --- a/metricbeat/mb/testing/data_generator.go +++ b/metricbeat/mb/testing/data_generator.go @@ -93,6 +93,12 @@ func WriteEventsReporterV2(f mb.ReportingMetricSetV2, t testing.TB, path string) return WriteEventsReporterV2Cond(f, t, path, nil) } +// WriteEventsReporterV2Error fetches events and writes the first event to a ./_meta/data.json +// file. +func WriteEventsReporterV2Error(f mb.ReportingMetricSetV2Error, t testing.TB, path string) error { + return WriteEventsReporterV2ErrorCond(f, t, path, nil) +} + // WriteEventsReporterV2Cond fetches events and writes the first event that matches // the condition to a file. func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path string, cond func(common.MapStr) bool) error { @@ -105,6 +111,25 @@ func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path str return errs[0] } + return writeEvent(events, f, t, path, cond) +} + +// WriteEventsReporterV2ErrorCond fetches events and writes the first event that matches +// the condition to a file. +func WriteEventsReporterV2ErrorCond(f mb.ReportingMetricSetV2Error, t testing.TB, path string, cond func(common.MapStr) bool) error { + if !*dataFlag { + t.Skip("skip data generation tests") + } + + events, errs := ReportingFetchV2Error(f) + if len(errs) > 0 { + return errs[0] + } + + return writeEvent(events, f, t, path, cond) +} + +func writeEvent(events []mb.Event, f mb.MetricSet, t testing.TB, path string, cond func(common.MapStr) bool) error { if len(events) == 0 { return fmt.Errorf("no events were generated") } @@ -198,7 +223,7 @@ func SelectEvent(events []common.MapStr, cond func(e common.MapStr) bool) (commo } // SelectEventV2 selects the first event that matches an specific condition -func SelectEventV2(f mb.ReportingMetricSetV2, events []mb.Event, cond func(e common.MapStr) bool) (mb.Event, error) { +func SelectEventV2(f mb.MetricSet, events []mb.Event, cond func(e common.MapStr) bool) (mb.Event, error) { if cond == nil && len(events) > 0 { return events[0], nil } diff --git a/metricbeat/mb/testing/data_test.go b/metricbeat/mb/testing/data_test.go index aa0c3c6f18b9..803bfec6d3c3 100644 --- a/metricbeat/mb/testing/data_test.go +++ b/metricbeat/mb/testing/data_test.go @@ -106,8 +106,21 @@ func runTest(t *testing.T, file string, module, metricSetName, url string) { s := server(t, file, url) defer s.Close() - metricSet := NewReportingMetricSetV2(t, getConfig(module, metricSetName, s.URL)) - events, errs := ReportingFetchV2(metricSet) + metricSet := newMetricSet(t, getConfig(module, metricSetName, s.URL)) + + var events []mb.Event + var errs []error + + switch v := metricSet.(type) { + case mb.ReportingMetricSetV2: + metricSet := NewReportingMetricSetV2(t, getConfig(module, metricSetName, s.URL)) + events, errs = ReportingFetchV2(metricSet) + case mb.ReportingMetricSetV2Error: + metricSet := NewReportingMetricSetV2Error(t, getConfig(module, metricSetName, s.URL)) + events, errs = ReportingFetchV2Error(metricSet) + default: + t.Fatalf("unknown type: %T", v) + } // Gather errors to build also error events for _, e := range errs { diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 88d3ce78e0c1..234feee35687 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -168,6 +168,19 @@ func NewReportingMetricSetV2(t testing.TB, config interface{}) mb.ReportingMetri return reportingMetricSetV2 } +// NewReportingMetricSetV2Error returns a new ReportingMetricSetV2 instance. Then +// you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet. +func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.ReportingMetricSetV2Error { + metricSet := newMetricSet(t, config) + + reportingMetricSetV2Error, ok := metricSet.(mb.ReportingMetricSetV2Error) + if !ok { + t.Fatal("MetricSet does not implement ReportingMetricSetV2Error") + } + + return reportingMetricSetV2Error +} + // CapturingReporterV2 is a reporter used for testing which stores all events and errors type CapturingReporterV2 struct { events []mb.Event @@ -204,6 +217,17 @@ func ReportingFetchV2(metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error) { return r.events, r.errs } +// ReportingFetchV2Error runs the given reporting metricset and returns all of the +// events and errors that occur during that period. +func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event, []error) { + r := &CapturingReporterV2{} + err := metricSet.Fetch(r) + if err != nil { + r.errs = append(r.errs, err) + } + return r.events, r.errs +} + // NewPushMetricSet instantiates a new PushMetricSet using the given // configuration. The ModuleFactory and MetricSetFactory are obtained from the // global Registry. diff --git a/metricbeat/module/php_fpm/process/data.go b/metricbeat/module/php_fpm/process/data.go index bb8e6c5cacb1..705041b101b3 100644 --- a/metricbeat/module/php_fpm/process/data.go +++ b/metricbeat/module/php_fpm/process/data.go @@ -47,12 +47,11 @@ type phpFpmProcess struct { LastRequestMemory int `json:"last request memory"` } -func eventsMapping(r mb.ReporterV2, content []byte) { +func eventsMapping(r mb.ReporterV2, content []byte) error { var status phpFpmStatus err := json.Unmarshal(content, &status) if err != nil { - r.Error(err) - return + return err } //remapping process details to match the naming format for _, process := range status.Processes { @@ -94,4 +93,5 @@ func eventsMapping(r mb.ReporterV2, content []byte) { event.ModuleFields.Put("pool.name", status.Name) r.Event(event) } + return nil } diff --git a/metricbeat/module/php_fpm/process/process.go b/metricbeat/module/php_fpm/process/process.go index 6774d91b4f90..85dde711ae3e 100644 --- a/metricbeat/module/php_fpm/process/process.go +++ b/metricbeat/module/php_fpm/process/process.go @@ -60,11 +60,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(report mb.ReporterV2) { +func (m *MetricSet) Fetch(report mb.ReporterV2) error { u, err := url.Parse(m.GetURI()) if err != nil { - report.Error(err) - return + return err } u, err = parse.SetQueryParams(u, "full") if err == nil { @@ -72,8 +71,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) { } content, err := m.HTTP.FetchContent() if err != nil { - report.Error(err) - return + return err } - eventsMapping(report, content) + return eventsMapping(report, content) } diff --git a/metricbeat/module/php_fpm/process/process_integration_test.go b/metricbeat/module/php_fpm/process/process_integration_test.go index a523648c9adf..a5f7a1d8d640 100644 --- a/metricbeat/module/php_fpm/process/process_integration_test.go +++ b/metricbeat/module/php_fpm/process/process_integration_test.go @@ -32,8 +32,8 @@ import ( func TestFetch(t *testing.T) { compose.EnsureUp(t, "phpfpm") - f := mbtest.NewReportingMetricSetV2(t, getConfig()) - events, errs := mbtest.ReportingFetchV2(f) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) + events, errs := mbtest.ReportingFetchV2Error(f) assert.Empty(t, errs) if !assert.NotEmpty(t, events) {