Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions metricbeat/module/redis/info/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@ import (

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/redis"
)

var (
debugf = logp.MakeDebug("redis-info")
)

func init() {
mb.Registry.MustAddMetricSet("redis", "info", New,
mb.WithHostParser(parse.PassThruHostParser),
Expand All @@ -54,12 +49,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

// Fetch fetches metrics from Redis by issuing the INFO command.
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
// Fetch default INFO.
info, err := redis.FetchRedisInfo("default", m.Connection())
if err != nil {
logp.Err("Failed to fetch redis info: %s", err)
return
return errors.Wrap(err, "failed to fetch redis info")
}

// In 5.0 some fields are renamed, maintain both names, old ones will be deprecated
Expand All @@ -79,11 +73,12 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {

slowLogLength, err := redis.FetchSlowLogLength(m.Connection())
if err != nil {
logp.Err("Failed to fetch slow log length: %s", err)
return
return errors.Wrap(err, "failed to fetch slow log length")

}
info["slowlog_len"] = strconv.FormatInt(slowLogLength, 10)

debugf("Redis INFO from %s: %+v", m.Host(), info)
m.Logger().Debugf("Redis INFO from %s: %+v", m.Host(), info)
eventMapping(r, info)
return nil
}
8 changes: 4 additions & 4 deletions metricbeat/module/redis/info/info_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ var redisHost = redis.GetRedisEnvHost() + ":" + redis.GetRedisEnvPort()
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "redis")

ms := mbtest.NewReportingMetricSetV2(t, getConfig())
events, err := mbtest.ReportingFetchV2(ms)
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, err := mbtest.ReportingFetchV2Error(ms)
if err != nil {
t.Fatal("fetch", err)
}
Expand All @@ -56,8 +56,8 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "redis")

ms := mbtest.NewReportingMetricSetV2(t, getConfig())
err := mbtest.WriteEventsReporterV2(ms, t, "")
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
err := mbtest.WriteEventsReporterV2Error(ms, t, "")
if err != nil {
t.Fatal("write", err)
}
Expand Down
28 changes: 16 additions & 12 deletions metricbeat/module/redis/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@
package key

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/redis"
)

var (
debugf = logp.MakeDebug("redis-key")
)

func init() {
mb.Registry.MustAddMetricSet("redis", "key", New,
mb.WithHostParser(parse.PassThruHostParser),
Expand Down Expand Up @@ -71,35 +68,42 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

// Fetch fetches information from Redis keys
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
conn := m.Connection()
for _, p := range m.patterns {
if err := redis.Select(conn, p.Keyspace); err != nil {
logp.Err("Failed to select keyspace %d: %s", p.Keyspace, err)
msg := errors.Wrapf(err, "Failed to select keyspace %d", p.Keyspace)
m.Logger().Error(msg)
r.Error(err)
continue
}

keys, err := redis.FetchKeys(conn, p.Pattern, p.Limit)
if err != nil {
logp.Err("Failed to list keys in keyspace %d with pattern '%s': %s", p.Keyspace, p.Pattern, err)
msg := errors.Wrapf(err, "Failed to list keys in keyspace %d with pattern '%s'", p.Keyspace, p.Pattern)
m.Logger().Error(msg)
r.Error(err)
continue
}
if p.Limit > 0 && len(keys) > int(p.Limit) {
debugf("Collecting stats for %d keys, but there are more available for pattern '%s' in keyspace %d", p.Limit)
m.Logger().Debugf("Collecting stats for %d keys, but there are more available for pattern '%s' in keyspace %d", p.Limit)
keys = keys[:p.Limit]
}

for _, key := range keys {
keyInfo, err := redis.FetchKeyInfo(conn, key)
if err != nil {
logp.Err("Failed to fetch key info for key %s in keyspace %d", key, p.Keyspace)
msg := fmt.Errorf("Failed to fetch key info for key %s in keyspace %d", key, p.Keyspace)
m.Logger().Error(msg)
r.Error(err)
continue
}
event := eventMapping(p.Keyspace, keyInfo)
if !r.Event(event) {
debugf("Failed to report event, interrupting Fetch")
return
return errors.New("metricset has closed")
}
}
}

return nil
}
8 changes: 4 additions & 4 deletions metricbeat/module/redis/key/key_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func TestFetch(t *testing.T) {

addEntry(t)

ms := mbtest.NewReportingMetricSetV2(t, getConfig())
events, err := mbtest.ReportingFetchV2(ms)
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, err := mbtest.ReportingFetchV2Error(ms)
if err != nil {
t.Fatal("fetch", err)
}
Expand All @@ -52,8 +52,8 @@ func TestData(t *testing.T) {

addEntry(t)

ms := mbtest.NewReportingMetricSetV2(t, getConfig())
err := mbtest.WriteEventsReporterV2(ms, t, "")
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
err := mbtest.WriteEventsReporterV2Error(ms, t, "")
if err != nil {
t.Fatal("write", err)
}
Expand Down
5 changes: 1 addition & 4 deletions metricbeat/module/redis/keyspace/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ func eventsMapping(r mb.ReporterV2, info map[string]string) {
event := mb.Event{
MetricSetFields: space,
}
if !r.Event(event) {
debugf("Failed to report event, interrupting Fetch")
return
}
r.Event(event)
}
}

Expand Down
13 changes: 4 additions & 9 deletions metricbeat/module/redis/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@ package keyspace
import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/redis"
)

var (
debugf = logp.MakeDebug("redis-keyspace")
)

func init() {
mb.Registry.MustAddMetricSet("redis", "keyspace", New,
mb.WithHostParser(parse.PassThruHostParser),
Expand All @@ -52,14 +47,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

// Fetch fetches metrics from Redis by issuing the INFO command.
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
// Fetch default INFO.
info, err := redis.FetchRedisInfo("keyspace", m.Connection())
if err != nil {
logp.Err("Failed to fetch redis info for keyspaces: %s", err)
return
return errors.Wrap(err, "Failed to fetch redis info for keyspaces")
}

debugf("Redis INFO from %s: %+v", m.Host(), info)
m.Logger().Debugf("Redis INFO from %s: %+v", m.Host(), info)
eventsMapping(r, info)
return nil
}
8 changes: 4 additions & 4 deletions metricbeat/module/redis/keyspace/keyspace_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestFetch(t *testing.T) {
addEntry(t)

// Fetch data
ms := mbtest.NewReportingMetricSetV2(t, getConfig())
events, err := mbtest.ReportingFetchV2(ms)
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, err := mbtest.ReportingFetchV2Error(ms)
if err != nil {
t.Fatal("fetch", err)
}
Expand All @@ -63,8 +63,8 @@ func TestData(t *testing.T) {

addEntry(t)

ms := mbtest.NewReportingMetricSetV2(t, getConfig())
err := mbtest.WriteEventsReporterV2(ms, t, "")
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
err := mbtest.WriteEventsReporterV2Error(ms, t, "")
if err != nil {
t.Fatal("write", err)
}
Expand Down