diff --git a/metricbeat/module/kvm/dommemstat/dommemstat.go b/metricbeat/module/kvm/dommemstat/dommemstat.go index 0d03096c4681..b198719e8477 100644 --- a/metricbeat/module/kvm/dommemstat/dommemstat.go +++ b/metricbeat/module/kvm/dommemstat/dommemstat.go @@ -82,8 +82,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right // format. It publishes the event which is then forwarded to the output. In case // of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(report mb.ReporterV2) { - +func (m *MetricSet) Fetch(report mb.ReporterV2) error { var ( c net.Conn err error @@ -102,8 +101,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) { c, err = net.DialTimeout(u.Scheme, address, m.Timeout) if err != nil { - report.Error(errors.Wrapf(err, "cannot connect to %v", u)) - return + return errors.Wrapf(err, "cannot connect to %v", u) } } @@ -111,35 +109,39 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) { l := libvirt.New(c) if err = l.Connect(); err != nil { - report.Error(errors.Wrap(err, "error connecting to libvirtd")) - return + return errors.Wrap(err, "error connecting to libvirtd") } defer func() { if err = l.Disconnect(); err != nil { - report.Error(errors.Wrap(err, "failed to disconnect")) + msg := errors.Wrap(err, "failed to disconnect") + report.Error(msg) + m.Logger().Error(msg) } }() domains, err := l.Domains() if err != nil { - report.Error(errors.Wrap(err, "error listing domains")) - return + return errors.Wrap(err, "error listing domains") } for _, d := range domains { gotDomainMemoryStats, err := l.DomainMemoryStats(d, maximumStats, flags) if err != nil { - report.Error(errors.Wrapf(err, "error fetching memory stats for domain %s", d.Name)) + msg := errors.Wrapf(err, "error fetching memory stats for domain %s", d.Name) + report.Error(msg) + m.Logger().Error(msg) continue } if len(gotDomainMemoryStats) == 0 { - report.Error(errors.Errorf("no memory stats for domain %s", d.Name)) + msg := errors.Errorf("no memory stats for domain %s", d.Name) + report.Error(msg) + m.Logger().Error(msg) continue } for i := range gotDomainMemoryStats { - report.Event(mb.Event{ + reported := report.Event(mb.Event{ MetricSetFields: common.MapStr{ "id": d.ID, "name": d.Name, @@ -149,8 +151,13 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) { }, }, }) + if !reported { + return errors.New("metricset has closed") + } } } + + return nil } func getDomainMemoryStatName(tag int32) string { diff --git a/metricbeat/module/kvm/dommemstat/dommemstat_test.go b/metricbeat/module/kvm/dommemstat/dommemstat_test.go index dc3009dca7a3..3aa4c0ff6377 100644 --- a/metricbeat/module/kvm/dommemstat/dommemstat_test.go +++ b/metricbeat/module/kvm/dommemstat/dommemstat_test.go @@ -30,9 +30,9 @@ import ( func TestFetchEventContents(t *testing.T) { conn := libvirttest.New() - f := mbtest.NewReportingMetricSetV2(t, getConfig(conn)) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig(conn)) - events, errs := mbtest.ReportingFetchV2(f) + events, errs := mbtest.ReportingFetchV2Error(f) if len(errs) > 0 { t.Fatal(errs) }