From c116c6db518319ae0195c64cfc69af017f6011ff Mon Sep 17 00:00:00 2001 From: Joseph Heyburn Date: Mon, 6 Jan 2025 18:16:09 +0000 Subject: [PATCH 1/2] Correctly parse Redis role values in connection receive --- filebeat/input/redis/harvester.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/input/redis/harvester.go b/filebeat/input/redis/harvester.go index 67854bc04b11..0abfab3485e7 100644 --- a/filebeat/input/redis/harvester.go +++ b/filebeat/input/redis/harvester.go @@ -108,7 +108,7 @@ func (h *Harvester) Run() error { } // Read reply from ROLE - role, err := h.conn.Receive() + role, err := rd.Values(h.conn.Receive()) if err != nil { return fmt.Errorf("error receiving replication role: %w", err) } From cd5bf043adf35b82f3c2c26879f5d5795addec29 Mon Sep 17 00:00:00 2001 From: Joseph Heyburn Date: Fri, 28 Feb 2025 13:34:36 +0000 Subject: [PATCH 2/2] Correctly parse Redis role values in connection receive and add int test --- filebeat/input/redis/harvester.go | 33 ++++++++++++++++++- .../input/redis/redis_integration_test.go | 4 +++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/filebeat/input/redis/harvester.go b/filebeat/input/redis/harvester.go index 0abfab3485e7..b250ea13193d 100644 --- a/filebeat/input/redis/harvester.go +++ b/filebeat/input/redis/harvester.go @@ -72,6 +72,33 @@ func NewHarvester(conn rd.Conn) (*Harvester, error) { }, nil } +// Expected response +// +// 1) "master" +// 2) (integer) 100 +// 3) 1) 1) "10.0.0.2" +// 2) "6379" +// 3) "100" +// 2) 1) "10.0.0.3" +// 2) "6379" +// 3) "100" +// +// OR +// +// 1) "slave" +// 2) "10.0.0.1" +// 3) (integer) 6379 +// 4) "connected" +// 5) (integer) 100 + +func (h *Harvester) parseReplicationRole(reply []interface{}) (string, error) { + role, ok := reply[0].([]byte) + if !ok { + return "", fmt.Errorf("unexpected type for role response: %T", reply[0]) + } + return string(role), nil +} + // Run starts a new redis harvester func (h *Harvester) Run() error { defer h.conn.Close() @@ -108,10 +135,14 @@ func (h *Harvester) Run() error { } // Read reply from ROLE - role, err := rd.Values(h.conn.Receive()) + roleReply, err := rd.Values(h.conn.Receive()) if err != nil { return fmt.Errorf("error receiving replication role: %w", err) } + role, err := h.parseReplicationRole(roleReply) + if err != nil { + return fmt.Errorf("error parsing replication role: %w", err) + } for _, item := range logs { // Stopping here means some of the slowlog events are lost! diff --git a/filebeat/input/redis/redis_integration_test.go b/filebeat/input/redis/redis_integration_test.go index 1dbf7757b667..f47ddc4ff84a 100644 --- a/filebeat/input/redis/redis_integration_test.go +++ b/filebeat/input/redis/redis_integration_test.go @@ -141,6 +141,10 @@ func TestInput(t *testing.T) { val, err := event.GetValue("message") require.NoError(t, err) require.Equal(t, message, val) + val, err = event.GetValue("redis") + require.NoError(t, err) + role := val.(mapstr.M)["slowlog"].(mapstr.M)["role"] + require.Equal(t, "master", role) case <-time.After(30 * time.Second): t.Fatal("Timeout waiting for event") }