Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Journald: removed configuration options `include_matches.or`, `include_matches.and`, `backoff`, `max_backoff`, `cursor_seek_fallback`. {pull}40061[40061]
- Journald: `include_matches.match` now behaves in the same way as matchers in `journalctl`. Users should carefully update their input configuration. {pull}40061[40061]
- Journald: `seek` and `since` behaviour have been simplified, if there is a cursor (state) `seek` and `since` are ignored and the cursor is used. {pull}40061[40061]
- Redis: Added replication role as a field to submitted slowlogs
- Added `container.image.name` to `journald` Filebeat input's Docker-specific translated fields. {pull}40450[40450]


Expand Down
18 changes: 14 additions & 4 deletions filebeat/input/redis/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,29 +81,38 @@ func (h *Harvester) Run() error {
return nil
default:
}
// Writes Slowlog get and slowlog reset both to the buffer so they are executed together
// Writes Slowlog get, slowlog reset, and role to the buffer so they are executed together
if err := h.conn.Send("SLOWLOG", "GET"); err != nil {
return fmt.Errorf("error sending slowlog get: %w", err)
}
if err := h.conn.Send("SLOWLOG", "RESET"); err != nil {
return fmt.Errorf("error sending slowlog reset: %w", err)
}
if err := h.conn.Send("ROLE"); err != nil {
return fmt.Errorf("error sending role: %w", err)
}

// Flush the buffer to execute both commands and receive the reply from SLOWLOG GET
// Flush the buffer to execute all commands and receive the replies
h.conn.Flush()

// Receives first reply from redis which is the one from GET
// Receives first reply from redis which is the one from SLOWLOG GET
logs, err := rd.Values(h.conn.Receive())
if err != nil {
return fmt.Errorf("error receiving slowlog data: %w", err)
}

// Read reply from RESET
// Read reply from SLOWLOG RESET
_, err = h.conn.Receive()
if err != nil {
return fmt.Errorf("error receiving reset data: %w", err)
}

// Read reply from ROLE
role, err := h.conn.Receive()
if err != nil {
return fmt.Errorf("error receiving replication role: %w", err)
}

for _, item := range logs {
// Stopping here means some of the slowlog events are lost!
select {
Expand Down Expand Up @@ -146,6 +155,7 @@ func (h *Harvester) Run() error {
"duration": mapstr.M{
"us": log.duration,
},
"role": role,
}

if log.args != nil {
Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def test_input(self):

assert output["input.type"] == "redis"
assert "redis.slowlog.cmd" in output
assert "redis.slowlog.role" in output

def get_host(self):
return os.getenv('REDIS_HOST', 'localhost')
Expand Down