diff --git a/filebeat/input/redis/harvester.go b/filebeat/input/redis/harvester.go index 67854bc04b1..b250ea13193 100644 --- a/filebeat/input/redis/harvester.go +++ b/filebeat/input/redis/harvester.go @@ -72,6 +72,33 @@ func NewHarvester(conn rd.Conn) (*Harvester, error) { }, nil } +// Expected response +// +// 1) "master" +// 2) (integer) 100 +// 3) 1) 1) "10.0.0.2" +// 2) "6379" +// 3) "100" +// 2) 1) "10.0.0.3" +// 2) "6379" +// 3) "100" +// +// OR +// +// 1) "slave" +// 2) "10.0.0.1" +// 3) (integer) 6379 +// 4) "connected" +// 5) (integer) 100 + +func (h *Harvester) parseReplicationRole(reply []interface{}) (string, error) { + role, ok := reply[0].([]byte) + if !ok { + return "", fmt.Errorf("unexpected type for role response: %T", reply[0]) + } + return string(role), nil +} + // Run starts a new redis harvester func (h *Harvester) Run() error { defer h.conn.Close() @@ -108,10 +135,14 @@ func (h *Harvester) Run() error { } // Read reply from ROLE - role, err := h.conn.Receive() + roleReply, err := rd.Values(h.conn.Receive()) if err != nil { return fmt.Errorf("error receiving replication role: %w", err) } + role, err := h.parseReplicationRole(roleReply) + if err != nil { + return fmt.Errorf("error parsing replication role: %w", err) + } for _, item := range logs { // Stopping here means some of the slowlog events are lost! diff --git a/filebeat/input/redis/redis_integration_test.go b/filebeat/input/redis/redis_integration_test.go index 1dbf7757b66..f47ddc4ff84 100644 --- a/filebeat/input/redis/redis_integration_test.go +++ b/filebeat/input/redis/redis_integration_test.go @@ -141,6 +141,10 @@ func TestInput(t *testing.T) { val, err := event.GetValue("message") require.NoError(t, err) require.Equal(t, message, val) + val, err = event.GetValue("redis") + require.NoError(t, err) + role := val.(mapstr.M)["slowlog"].(mapstr.M)["role"] + require.Equal(t, "master", role) case <-time.After(30 * time.Second): t.Fatal("Timeout waiting for event") }