Skip to content

Commit a63cb6b

Browse files
author
Carlos Pérez-Aradros Herce
authored
Cherry-pick #12063 to 7.1: Fix memory leak in Filebeat pipeline acker (#12137)
* Fix memory leak in Filebeat pipeline acker (#12063) * Fix memory leak in Filebeat pipeline acker Before this change acker goroutine was kept forever as processed events count was not correctly updated. Filebeat sends an empty event to update file states, this event is not published, but treated as dropped, without updating counters. This change makes sures that `a.events` count gets updated for dropped events also, so the acker gets closed after all ACKs happen. (cherry picked from commit 9653105) * Update CHANGELOG.next.asciidoc
1 parent 88a567c commit a63cb6b

File tree

2 files changed

+10
-1
lines changed

2 files changed

+10
-1
lines changed

CHANGELOG.next.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ https://github.com/elastic/beats/compare/v7.0.0...7.1[Check the HEAD diff]
3939

4040
*Filebeat*
4141

42+
- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063]
43+
4244
*Heartbeat*
4345

4446
*Journalbeat*

libbeat/publisher/pipeline/acker.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,19 +139,26 @@ func (a *gapCountACK) ackLoop() {
139139
case <-a.done:
140140
closing = true
141141
a.done = nil
142+
if a.events.Load() == 0 {
143+
// stop worker, if all events accounted for have been ACKed.
144+
// If new events are added after this acker won't handle them, which may
145+
// result in duplicates
146+
return
147+
}
142148

143149
case <-a.pipeline.ackDone:
144150
return
145151

146152
case n := <-acks:
147153
empty := a.handleACK(n)
148154
if empty && closing && a.events.Load() == 0 {
149-
// stop worker, iff all events accounted for have been ACKed
155+
// stop worker, if and only if all events accounted for have been ACKed
150156
return
151157
}
152158

153159
case <-drop:
154160
// TODO: accumulate multiple drop events + flush count with timer
161+
a.events.Sub(1)
155162
a.fn(1, 0)
156163
}
157164
}

0 commit comments

Comments
 (0)