Skip to content
7 changes: 5 additions & 2 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,15 +237,18 @@ func mustImplementFetcher(ms MetricSet) error {
ifcs = append(ifcs, "ReportingMetricSetV2")
}

if _, ok := ms.(ReportingMetricSetV2Error); ok {
ifcs = append(ifcs, "ReportingMetricSetV2Error")
}

if _, ok := ms.(PushMetricSetV2); ok {
ifcs = append(ifcs, "PushMetricSetV2")
}

switch len(ifcs) {
case 0:
return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+
"producing interface (EventFetcher, EventsFetcher, "+
"ReportingMetricSet, ReportingMetricSetV2, PushMetricSet, or "+
"ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, PushMetricSet, or "+
"PushMetricSetV2)",
ms.Module().Name(), ms.Name())
case 1:
Expand Down
7 changes: 7 additions & 0 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ type ReportingMetricSetV2 interface {
Fetch(r ReporterV2)
}

// ReportingMetricSetV2Error is a MetricSet that reports events or errors through the
// ReporterV2 interface. Fetch is called periodically to collect events.
type ReportingMetricSetV2Error interface {
MetricSet
Fetch(r ReporterV2) error
}

// PushMetricSetV2 is a MetricSet that pushes events (rather than pulling them
// periodically via a Fetch callback). Run is invoked to start the event
// subscription and it should block until the MetricSet is ready to stop or
Expand Down
9 changes: 8 additions & 1 deletion metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
case mb.PushMetricSetV2:
ms.Run(reporter.V2())
case mb.EventFetcher, mb.EventsFetcher,
mb.ReportingMetricSet, mb.ReportingMetricSetV2:
mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error:
msw.startPeriodicFetching(reporter)
default:
// Earlier startup stages prevent this from happening.
Expand Down Expand Up @@ -236,6 +236,13 @@ func (msw *metricSetWrapper) fetch(reporter reporter) {
case mb.ReportingMetricSetV2:
reporter.StartFetchTimer()
fetcher.Fetch(reporter.V2())
case mb.ReportingMetricSetV2Error:
reporter.StartFetchTimer()
err := fetcher.Fetch(reporter.V2())
if err != nil {
reporter.V2().Error(err)
logp.Info("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
}
default:
panic(fmt.Sprintf("unexpected fetcher type for %v", msw))
}
Expand Down
27 changes: 26 additions & 1 deletion metricbeat/mb/testing/data_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ func WriteEventsReporterV2(f mb.ReportingMetricSetV2, t testing.TB, path string)
return WriteEventsReporterV2Cond(f, t, path, nil)
}

// WriteEventsReporterV2Error fetches events and writes the first event to a ./_meta/data.json
// file.
func WriteEventsReporterV2Error(f mb.ReportingMetricSetV2Error, t testing.TB, path string) error {
return WriteEventsReporterV2ErrorCond(f, t, path, nil)
}

// WriteEventsReporterV2Cond fetches events and writes the first event that matches
// the condition to a file.
func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path string, cond func(common.MapStr) bool) error {
Expand All @@ -105,6 +111,25 @@ func WriteEventsReporterV2Cond(f mb.ReportingMetricSetV2, t testing.TB, path str
return errs[0]
}

return writeEvent(events, f, t, path, cond)
}

// WriteEventsReporterV2ErrorCond fetches events and writes the first event that matches
// the condition to a file.
func WriteEventsReporterV2ErrorCond(f mb.ReportingMetricSetV2Error, t testing.TB, path string, cond func(common.MapStr) bool) error {
if !*dataFlag {
t.Skip("skip data generation tests")
}

events, errs := ReportingFetchV2Error(f)
if len(errs) > 0 {
return errs[0]
}

return writeEvent(events, f, t, path, cond)
}

func writeEvent(events []mb.Event, f mb.MetricSet, t testing.TB, path string, cond func(common.MapStr) bool) error {
if len(events) == 0 {
return fmt.Errorf("no events were generated")
}
Expand Down Expand Up @@ -198,7 +223,7 @@ func SelectEvent(events []common.MapStr, cond func(e common.MapStr) bool) (commo
}

// SelectEventV2 selects the first event that matches an specific condition
func SelectEventV2(f mb.ReportingMetricSetV2, events []mb.Event, cond func(e common.MapStr) bool) (mb.Event, error) {
func SelectEventV2(f mb.MetricSet, events []mb.Event, cond func(e common.MapStr) bool) (mb.Event, error) {
if cond == nil && len(events) > 0 {
return events[0], nil
}
Expand Down
17 changes: 15 additions & 2 deletions metricbeat/mb/testing/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,21 @@ func runTest(t *testing.T, file string, module, metricSetName, url string) {
s := server(t, file, url)
defer s.Close()

metricSet := NewReportingMetricSetV2(t, getConfig(module, metricSetName, s.URL))
events, errs := ReportingFetchV2(metricSet)
metricSet := newMetricSet(t, getConfig(module, metricSetName, s.URL))

var events []mb.Event
var errs []error

switch v := metricSet.(type) {
case mb.ReportingMetricSetV2:
metricSet := NewReportingMetricSetV2(t, getConfig(module, metricSetName, s.URL))
events, errs = ReportingFetchV2(metricSet)
case mb.ReportingMetricSetV2Error:
metricSet := NewReportingMetricSetV2Error(t, getConfig(module, metricSetName, s.URL))
events, errs = ReportingFetchV2Error(metricSet)
default:
t.Fatalf("unknown type: %T", v)
}

// Gather errors to build also error events
for _, e := range errs {
Expand Down
24 changes: 24 additions & 0 deletions metricbeat/mb/testing/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ func NewReportingMetricSetV2(t testing.TB, config interface{}) mb.ReportingMetri
return reportingMetricSetV2
}

// NewReportingMetricSetV2Error returns a new ReportingMetricSetV2 instance. Then
// you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet.
func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.ReportingMetricSetV2Error {
metricSet := newMetricSet(t, config)

reportingMetricSetV2Error, ok := metricSet.(mb.ReportingMetricSetV2Error)
if !ok {
t.Fatal("MetricSet does not implement ReportingMetricSetV2Error")
}

return reportingMetricSetV2Error
}

// CapturingReporterV2 is a reporter used for testing which stores all events and errors
type CapturingReporterV2 struct {
events []mb.Event
Expand Down Expand Up @@ -204,6 +217,17 @@ func ReportingFetchV2(metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error) {
return r.events, r.errs
}

// ReportingFetchV2Error runs the given reporting metricset and returns all of the
// events and errors that occur during that period.
func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event, []error) {
r := &CapturingReporterV2{}
err := metricSet.Fetch(r)
if err != nil {
r.errs = append(r.errs, err)
}
return r.events, r.errs
}

// NewPushMetricSet instantiates a new PushMetricSet using the given
// configuration. The ModuleFactory and MetricSetFactory are obtained from the
// global Registry.
Expand Down
6 changes: 3 additions & 3 deletions metricbeat/module/php_fpm/process/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,11 @@ type phpFpmProcess struct {
LastRequestMemory int `json:"last request memory"`
}

func eventsMapping(r mb.ReporterV2, content []byte) {
func eventsMapping(r mb.ReporterV2, content []byte) error {
var status phpFpmStatus
err := json.Unmarshal(content, &status)
if err != nil {
r.Error(err)
return
return err
}
//remapping process details to match the naming format
for _, process := range status.Processes {
Expand Down Expand Up @@ -94,4 +93,5 @@ func eventsMapping(r mb.ReporterV2, content []byte) {
event.ModuleFields.Put("pool.name", status.Name)
r.Event(event)
}
return nil
}
10 changes: 4 additions & 6 deletions metricbeat/module/php_fpm/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,18 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) {
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
u, err := url.Parse(m.GetURI())
if err != nil {
report.Error(err)
return
return err
}
u, err = parse.SetQueryParams(u, "full")
if err == nil {
m.SetURI(u.String())
}
content, err := m.HTTP.FetchContent()
if err != nil {
report.Error(err)
return
return err
}
eventsMapping(report, content)
return eventsMapping(report, content)
}
4 changes: 2 additions & 2 deletions metricbeat/module/php_fpm/process/process_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "phpfpm")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)

assert.Empty(t, errs)
if !assert.NotEmpty(t, events) {
Expand Down