From 9d410040c939099445cb2ff124938a0c95acd798 Mon Sep 17 00:00:00 2001 From: sayden Date: Thu, 28 Feb 2019 15:58:32 +0100 Subject: [PATCH 1/8] Atomic commit --- .../module/ceph/cluster_disk/cluster_disk.go | 17 +++++++--- .../cluster_disk_integration_test.go | 33 +++++++++++++++++-- .../ceph/cluster_disk/cluster_disk_test.go | 9 +++-- 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk.go b/metricbeat/module/ceph/cluster_disk/cluster_disk.go index cc179e714409..6c0245884b8e 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk.go @@ -18,7 +18,7 @@ package cluster_disk import ( - "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -34,6 +34,8 @@ var ( DefaultScheme: defaultScheme, DefaultPath: defaultPath, }.Build() + + logger = logp.NewLogger("ceph.cluster_disk") ) func init() { @@ -61,12 +63,19 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { }, nil } -func (m *MetricSet) Fetch() (common.MapStr, error) { +// Fetch methods implements the data gathering and data conversion to the right +// format. It publishes the event which is then forwarded to the output. In case +// of an error set the Error field of mb.Event or simply call report.Error(). +func (m *MetricSet) Fetch(reporter mb.ReporterV2) { content, err := m.HTTP.FetchContent() if err != nil { - return nil, err + logger.Error(err) + reporter.Error(err) + return } - return eventMapping(content), nil + reporter.Event(mb.Event{MetricSetFields: eventMapping(content)}) + + return } diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go index 29253ee93830..55d88a5c29cc 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +// +build integration + package cluster_disk import ( @@ -22,17 +24,42 @@ import ( "os" "testing" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/tests/compose" + mbtest "github.com/elastic/beats/metricbeat/mb/testing" ) func TestData(t *testing.T) { - f := mbtest.NewEventFetcher(t, getConfig()) - err := mbtest.WriteEvent(f, t) - if err != nil { + compose.EnsureUp(t, "ceph") + + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + events, errs := mbtest.ReportingFetchV2(f) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) + } + assert.NotEmpty(t, events) + + if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil { t.Fatal("write", err) } } +func TestFetch(t *testing.T) { + compose.EnsureUp(t, "ceph") + + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + events, errs := mbtest.ReportingFetchV2(f) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) + } + assert.NotEmpty(t, events) + event := events[0].MetricSetFields + + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) +} + func getConfig() map[string]interface{} { return map[string]interface{}{ "module": "ceph", diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk_test.go b/metricbeat/module/ceph/cluster_disk/cluster_disk_test.go index 3aef094ad2a5..da5cbd5ba327 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk_test.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk_test.go @@ -50,8 +50,13 @@ func TestFetchEventContents(t *testing.T) { "hosts": []string{server.URL}, } - f := mbtest.NewEventFetcher(t, config) - event, err := f.Fetch() + f := mbtest.NewReportingMetricSetV2(t, config) + events, errs := mbtest.ReportingFetchV2(f) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) + } + assert.NotEmpty(t, events) + event := events[0].MetricSetFields t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event.StringToPrint()) From 58f6664ec1fae74b810b1d80af1bad5c7c141ab9 Mon Sep 17 00:00:00 2001 From: sayden Date: Fri, 1 Mar 2019 13:33:20 +0100 Subject: [PATCH 2/8] Revert tests addition --- .../cluster_disk_integration_test.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go index 55d88a5c29cc..01955e0f08ec 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go @@ -26,14 +26,10 @@ import ( "github.com/stretchr/testify/assert" - "github.com/elastic/beats/libbeat/tests/compose" - mbtest "github.com/elastic/beats/metricbeat/mb/testing" ) func TestData(t *testing.T) { - compose.EnsureUp(t, "ceph") - f := mbtest.NewReportingMetricSetV2(t, getConfig()) events, errs := mbtest.ReportingFetchV2(f) if len(errs) > 0 { @@ -46,20 +42,6 @@ func TestData(t *testing.T) { } } -func TestFetch(t *testing.T) { - compose.EnsureUp(t, "ceph") - - f := mbtest.NewReportingMetricSetV2(t, getConfig()) - events, errs := mbtest.ReportingFetchV2(f) - if len(errs) > 0 { - t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) - } - assert.NotEmpty(t, events) - event := events[0].MetricSetFields - - t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) -} - func getConfig() map[string]interface{} { return map[string]interface{}{ "module": "ceph", From bddce46d387c8d422aace5239ed1ac154c51a47a Mon Sep 17 00:00:00 2001 From: sayden Date: Tue, 5 Mar 2019 10:12:57 +0100 Subject: [PATCH 3/8] Skip data generation test --- .../module/ceph/cluster_disk/cluster_disk_integration_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go index 01955e0f08ec..d8a57857474a 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go @@ -30,6 +30,8 @@ import ( ) func TestData(t *testing.T) { + t.Skip("Skipping data generation test") + f := mbtest.NewReportingMetricSetV2(t, getConfig()) events, errs := mbtest.ReportingFetchV2(f) if len(errs) > 0 { From f5766ad7c646aa620646a299150df949048c0fe8 Mon Sep 17 00:00:00 2001 From: sayden Date: Tue, 5 Mar 2019 11:08:26 +0100 Subject: [PATCH 4/8] make fmt --- .../module/ceph/cluster_disk/cluster_disk_integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go index d8a57857474a..63f2230873cc 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go @@ -31,7 +31,7 @@ import ( func TestData(t *testing.T) { t.Skip("Skipping data generation test") - + f := mbtest.NewReportingMetricSetV2(t, getConfig()) events, errs := mbtest.ReportingFetchV2(f) if len(errs) > 0 { From f2d60143f0d8a350f36a52e5911158f566079f8e Mon Sep 17 00:00:00 2001 From: sayden Date: Wed, 6 Mar 2019 15:48:55 +0100 Subject: [PATCH 5/8] Remove unnecessary lines from TestData --- .../ceph/cluster_disk/cluster_disk_integration_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go index 63f2230873cc..e00a55cd5509 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go @@ -24,20 +24,11 @@ import ( "os" "testing" - "github.com/stretchr/testify/assert" - mbtest "github.com/elastic/beats/metricbeat/mb/testing" ) func TestData(t *testing.T) { - t.Skip("Skipping data generation test") - f := mbtest.NewReportingMetricSetV2(t, getConfig()) - events, errs := mbtest.ReportingFetchV2(f) - if len(errs) > 0 { - t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) - } - assert.NotEmpty(t, events) if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil { t.Fatal("write", err) From c1288a1cc1ec84e35121dc4acb504bda46cce40f Mon Sep 17 00:00:00 2001 From: sayden Date: Wed, 6 Mar 2019 15:55:13 +0100 Subject: [PATCH 6/8] Rebase and use new logger --- metricbeat/module/ceph/cluster_disk/cluster_disk.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk.go b/metricbeat/module/ceph/cluster_disk/cluster_disk.go index 6c0245884b8e..32ba579094e8 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk.go @@ -18,7 +18,6 @@ package cluster_disk import ( - "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -34,8 +33,6 @@ var ( DefaultScheme: defaultScheme, DefaultPath: defaultPath, }.Build() - - logger = logp.NewLogger("ceph.cluster_disk") ) func init() { @@ -70,7 +67,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { content, err := m.HTTP.FetchContent() if err != nil { - logger.Error(err) + m.Logger().Error(err) reporter.Error(err) return } From a43fd2dad31d2c890b86f2dae921293307f24ed1 Mon Sep 17 00:00:00 2001 From: sayden Date: Wed, 6 Mar 2019 16:20:22 +0100 Subject: [PATCH 7/8] Slight improve in eventMapping function --- metricbeat/module/ceph/cluster_disk/cluster_disk.go | 9 ++++++++- metricbeat/module/ceph/cluster_disk/data.go | 9 +++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk.go b/metricbeat/module/ceph/cluster_disk/cluster_disk.go index 32ba579094e8..65d6c894fd80 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk.go @@ -72,7 +72,14 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } - reporter.Event(mb.Event{MetricSetFields: eventMapping(content)}) + event, err := eventMapping(content) + if err != nil { + m.Logger().Error(err) + reporter.Error(err) + return + } + + reporter.Event(mb.Event{MetricSetFields: event}) return } diff --git a/metricbeat/module/ceph/cluster_disk/data.go b/metricbeat/module/ceph/cluster_disk/data.go index 7a2c600cc539..259d712f8c9e 100644 --- a/metricbeat/module/ceph/cluster_disk/data.go +++ b/metricbeat/module/ceph/cluster_disk/data.go @@ -20,8 +20,9 @@ package cluster_disk import ( "encoding/json" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" ) type StatsCluster struct { @@ -39,11 +40,11 @@ type DfRequest struct { Output Output `json:"output"` } -func eventMapping(content []byte) common.MapStr { +func eventMapping(content []byte) (common.MapStr, error) { var d DfRequest err := json.Unmarshal(content, &d) if err != nil { - logp.Err("Error: %+v", err) + return nil, errors.Wrap(err, "could not get DFRequest data") } return common.MapStr{ @@ -56,5 +57,5 @@ func eventMapping(content []byte) common.MapStr { "available": common.MapStr{ "bytes": d.Output.StatsCluster.TotalAvailBytes, }, - } + }, nil } From 11031ceb960bea71ad571979be86bb0e1ac52857 Mon Sep 17 00:00:00 2001 From: sayden Date: Thu, 7 Mar 2019 11:02:10 +0100 Subject: [PATCH 8/8] Add reporting interface with error --- .../module/ceph/cluster_disk/cluster_disk.go | 17 +++++++---------- .../cluster_disk_integration_test.go | 4 ++-- .../ceph/cluster_disk/cluster_disk_test.go | 4 ++-- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk.go b/metricbeat/module/ceph/cluster_disk/cluster_disk.go index 65d6c894fd80..37f68f37a6cb 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk.go @@ -63,23 +63,20 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(reporter mb.ReporterV2) { +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { content, err := m.HTTP.FetchContent() - if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return err } event, err := eventMapping(content) if err != nil { - m.Logger().Error(err) - reporter.Error(err) - return + return err } - reporter.Event(mb.Event{MetricSetFields: event}) + if reported := reporter.Event(mb.Event{MetricSetFields: event}); !reported { + m.Logger().Debug("error reporting event") + } - return + return nil } diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go index e00a55cd5509..8b815dca0f7b 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk_integration_test.go @@ -28,9 +28,9 @@ import ( ) func TestData(t *testing.T) { - f := mbtest.NewReportingMetricSetV2(t, getConfig()) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) - if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil { + if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil { t.Fatal("write", err) } } diff --git a/metricbeat/module/ceph/cluster_disk/cluster_disk_test.go b/metricbeat/module/ceph/cluster_disk/cluster_disk_test.go index da5cbd5ba327..b073f4be6b67 100644 --- a/metricbeat/module/ceph/cluster_disk/cluster_disk_test.go +++ b/metricbeat/module/ceph/cluster_disk/cluster_disk_test.go @@ -50,8 +50,8 @@ func TestFetchEventContents(t *testing.T) { "hosts": []string{server.URL}, } - f := mbtest.NewReportingMetricSetV2(t, config) - events, errs := mbtest.ReportingFetchV2(f) + f := mbtest.NewReportingMetricSetV2Error(t, config) + events, errs := mbtest.ReportingFetchV2Error(f) if len(errs) > 0 { t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) }