Skip to content

Commit cf956ee

Browse files
committed
Stop waiting for signals on closed outleters (#11263)
Outleters start a goroutine to handle the finalization of filebeat. If the outleter is closed by other means the goroutine will be kept running even if it has nothing to do, leaking goroutines. Stop this goroutine if the outleter is closed. (cherry picked from commit 57c9891)
1 parent 3cdb2c3 commit cf956ee

File tree

5 files changed

+20
-2
lines changed

5 files changed

+20
-2
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ https://github.com/elastic/beats/compare/v6.6.2...6.6[Check the HEAD diff]
3737
*Filebeat*
3838

3939
- Fix a bug with the convert_timezone option using the incorrect timezone field. {issue}11055[11055] {pull}11164[11164]
40+
- Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263]
4041

4142
*Heartbeat*
4243

filebeat/channel/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,6 @@ type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error)
3232
// Outleter is the outlet for an input
3333
type Outleter interface {
3434
Close() error
35+
Done() <-chan struct{}
3536
OnEvent(data *util.Data) bool
3637
}

filebeat/channel/outlet.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,32 @@ type outlet struct {
2727
wg eventCounter
2828
client beat.Client
2929
isOpen atomic.Bool
30+
done chan struct{}
3031
}
3132

3233
func newOutlet(client beat.Client, wg eventCounter) *outlet {
3334
o := &outlet{
3435
wg: wg,
3536
client: client,
3637
isOpen: atomic.MakeBool(true),
38+
done: make(chan struct{}),
3739
}
3840
return o
3941
}
4042

4143
func (o *outlet) Close() error {
4244
isOpen := o.isOpen.Swap(false)
4345
if isOpen {
46+
close(o.done)
4447
return o.client.Close()
4548
}
4649
return nil
4750
}
4851

52+
func (o *outlet) Done() <-chan struct{} {
53+
return o.done
54+
}
55+
4956
func (o *outlet) OnEvent(d *util.Data) bool {
5057
if !o.isOpen.Load() {
5158
return false

filebeat/channel/util.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ func (o *subOutlet) Close() error {
7171
return nil
7272
}
7373

74+
func (o *subOutlet) Done() <-chan struct{} {
75+
return o.done
76+
}
77+
7478
func (o *subOutlet) OnEvent(d *util.Data) bool {
7579

7680
o.mutex.Lock()
@@ -114,8 +118,12 @@ func (o *subOutlet) OnEvent(d *util.Data) bool {
114118
func CloseOnSignal(outlet Outleter, sig <-chan struct{}) Outleter {
115119
if sig != nil {
116120
go func() {
117-
<-sig
118-
outlet.Close()
121+
select {
122+
case <-outlet.Done():
123+
return
124+
case <-sig:
125+
outlet.Close()
126+
}
119127
}()
120128
}
121129
return outlet

filebeat/input/log/input_other_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,3 +169,4 @@ type TestOutlet struct{}
169169

170170
func (o TestOutlet) OnEvent(event *util.Data) bool { return true }
171171
func (o TestOutlet) Close() error { return nil }
172+
func (o TestOutlet) Done() <-chan struct{} { return nil }

0 commit comments

Comments
 (0)