From 4e3a0be23039cc2221548edefb9c6cb5f9751ee5 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 15 Mar 2019 15:28:46 +0100 Subject: [PATCH 1/3] Stop waiting for signals on closed outleters Outleters can wait for the finalization of filebeat. If the outleter is closed the goroutine will be still running even if it has nothing to do, leaking goroutines. Stop the goroutine waiting to close on signal if the outleter has been already closed. --- filebeat/channel/interface.go | 1 + filebeat/channel/outlet.go | 7 +++++++ filebeat/channel/util.go | 12 ++++++++++-- filebeat/input/log/input_other_test.go | 1 + 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/filebeat/channel/interface.go b/filebeat/channel/interface.go index 82e5a82af37b..877b818870a8 100644 --- a/filebeat/channel/interface.go +++ b/filebeat/channel/interface.go @@ -32,5 +32,6 @@ type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error) // Outleter is the outlet for an input type Outleter interface { Close() error + Done() <-chan struct{} OnEvent(data *util.Data) bool } diff --git a/filebeat/channel/outlet.go b/filebeat/channel/outlet.go index d130cf9ceeb9..e8382f7e3bc1 100644 --- a/filebeat/channel/outlet.go +++ b/filebeat/channel/outlet.go @@ -27,6 +27,7 @@ type outlet struct { wg eventCounter client beat.Client isOpen atomic.Bool + done chan struct{} } func newOutlet(client beat.Client, wg eventCounter) *outlet { @@ -34,6 +35,7 @@ func newOutlet(client beat.Client, wg eventCounter) *outlet { wg: wg, client: client, isOpen: atomic.MakeBool(true), + done: make(chan struct{}), } return o } @@ -43,9 +45,14 @@ func (o *outlet) Close() error { if isOpen { return o.client.Close() } + close(o.done) return nil } +func (o *outlet) Done() <-chan struct{} { + return o.done +} + func (o *outlet) OnEvent(d *util.Data) bool { if !o.isOpen.Load() { return false diff --git a/filebeat/channel/util.go b/filebeat/channel/util.go index 134765c4cd8b..aec2132fa20a 100644 --- a/filebeat/channel/util.go +++ b/filebeat/channel/util.go @@ -71,6 +71,10 @@ func (o *subOutlet) Close() error { return nil } +func (o *subOutlet) Done() <-chan struct{} { + return o.done +} + func (o *subOutlet) OnEvent(d *util.Data) bool { o.mutex.Lock() @@ -114,8 +118,12 @@ func (o *subOutlet) OnEvent(d *util.Data) bool { func CloseOnSignal(outlet Outleter, sig <-chan struct{}) Outleter { if sig != nil { go func() { - <-sig - outlet.Close() + select { + case <-outlet.Done(): + return + case <-sig: + outlet.Close() + } }() } return outlet diff --git a/filebeat/input/log/input_other_test.go b/filebeat/input/log/input_other_test.go index 3a36cb6040d6..bdaba0c7d2ab 100644 --- a/filebeat/input/log/input_other_test.go +++ b/filebeat/input/log/input_other_test.go @@ -169,3 +169,4 @@ type TestOutlet struct{} func (o TestOutlet) OnEvent(event *util.Data) bool { return true } func (o TestOutlet) Close() error { return nil } +func (o TestOutlet) Done() <-chan struct{} { return nil } From f057756580635c35b74116af2fdd4dbaf136cdd3 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 15 Mar 2019 15:38:05 +0100 Subject: [PATCH 2/3] Actually close outlet channel --- filebeat/channel/outlet.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/channel/outlet.go b/filebeat/channel/outlet.go index e8382f7e3bc1..c0fe2b0c9e3a 100644 --- a/filebeat/channel/outlet.go +++ b/filebeat/channel/outlet.go @@ -43,9 +43,9 @@ func newOutlet(client beat.Client, wg eventCounter) *outlet { func (o *outlet) Close() error { isOpen := o.isOpen.Swap(false) if isOpen { + close(o.done) return o.client.Close() } - close(o.done) return nil } From 581df59a37e9c2b2d544d638b82f9da3ed81b95a Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 15 Mar 2019 15:59:25 +0100 Subject: [PATCH 3/3] Add changelog entry --- CHANGELOG.next.asciidoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index bd7a850a489f..e2012cd8aa52 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -166,8 +166,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Improve detection of file deletion on Windows. {pull}10747[10747] - Fix errors in filebeat Zeek dashboard and README files. Add notice.log support. {pull}10916[10916] - Fix a bug when converting NetFlow fields to snake_case. {pull}10950[10950] -- Add on_failure handler for Zeek ingest pipelines. Fix one field name error for notice and add an additional test - case. {issue}11004[11004] {pull}11105[11105] +- Add on_failure handler for Zeek ingest pipelines. Fix one field name error for notice and add an additional test case. {issue}11004[11004] {pull}11105[11105] +- Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263] *Heartbeat*