diff --git a/metricbeat/module/ceph/pool_disk/_meta/data.json b/metricbeat/module/ceph/pool_disk/_meta/data.json index 8d0d0ba019d1..e601c9f5c67d 100644 --- a/metricbeat/module/ceph/pool_disk/_meta/data.json +++ b/metricbeat/module/ceph/pool_disk/_meta/data.json @@ -1,9 +1,5 @@ { "@timestamp": "2017-10-12T08:05:34.853Z", - "agent": { - "hostname": "host.example.com", - "name": "host.example.com" - }, "ceph": { "pool_disk": { "id": 0, diff --git a/metricbeat/module/ceph/pool_disk/pool_disk.go b/metricbeat/module/ceph/pool_disk/pool_disk.go index 9a2af58f1fe1..761c25887fac 100644 --- a/metricbeat/module/ceph/pool_disk/pool_disk.go +++ b/metricbeat/module/ceph/pool_disk/pool_disk.go @@ -18,7 +18,6 @@ package pool_disk import ( - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -61,12 +60,21 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { }, nil } -func (m *MetricSet) Fetch() ([]common.MapStr, error) { +// Fetch methods implements the data gathering and data conversion to the right +// format. It publishes the event which is then forwarded to the output. In case +// of an error set the Error field of mb.Event or simply call report.Error(). +func (m *MetricSet) Fetch(reporter mb.ReporterV2) { content, err := m.HTTP.FetchContent() - if err != nil { - return nil, err + m.Logger().Error(err) + reporter.Error(err) + return + } + + events := eventsMapping(content) + for _, event := range events { + reporter.Event(mb.Event{MetricSetFields: event}) } - return eventsMapping(content), nil + return } diff --git a/metricbeat/module/ceph/pool_disk/pool_disk_integration_test.go b/metricbeat/module/ceph/pool_disk/pool_disk_integration_test.go index b15b73e56c5b..d4a0dd3a9bba 100644 --- a/metricbeat/module/ceph/pool_disk/pool_disk_integration_test.go +++ b/metricbeat/module/ceph/pool_disk/pool_disk_integration_test.go @@ -26,9 +26,9 @@ import ( ) func TestData(t *testing.T) { - f := mbtest.NewEventsFetcher(t, getConfig()) - err := mbtest.WriteEvents(f, t) - if err != nil { + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + + if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil { t.Fatal("write", err) } } diff --git a/metricbeat/module/ceph/pool_disk/pool_disk_test.go b/metricbeat/module/ceph/pool_disk/pool_disk_test.go index a64a373e72ba..f5e8a3b9306f 100644 --- a/metricbeat/module/ceph/pool_disk/pool_disk_test.go +++ b/metricbeat/module/ceph/pool_disk/pool_disk_test.go @@ -32,6 +32,7 @@ import ( func TestFetchEventContents(t *testing.T) { absPath, err := filepath.Abs("../_meta/testdata/") + assert.NoError(t, err) response, err := ioutil.ReadFile(absPath + "/df_sample_response.json") server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -47,12 +48,13 @@ func TestFetchEventContents(t *testing.T) { "hosts": []string{server.URL}, } - f := mbtest.NewEventsFetcher(t, config) - events, err := f.Fetch() - event := events[0] - if !assert.NoError(t, err) { - t.FailNow() + f := mbtest.NewReportingMetricSetV2(t, config) + events, errs := mbtest.ReportingFetchV2(f) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) } + assert.NotEmpty(t, events) + event := events[0].MetricSetFields t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event.StringToPrint())