Skip to content

Commit e6bb5c9

Browse files
authored
Fix leak caused by input runners created when checking their configuration (#23722)
Stop input v1 runners created to check config. `CheckConfig` for v1 inputs actually calls the constructors of the inputs. In some cases, as in the log input, the constructor creates resources that are never released unless the runner is stopped. This causes goroutines leaks with autodiscover or other dynamic configurations.
1 parent 2fb2561 commit e6bb5c9

File tree

19 files changed

+416
-45
lines changed

19 files changed

+416
-45
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
280280
- Change the `event.created` in Netflow events to be the time the event was created by Filebeat
281281
to be consistent with ECS. {pull}23094[23094]
282282
- Update `filestream` reader offset when a line is skipped. {pull}23417[23417]
283+
- Fix goroutines leak with some inputs in autodiscover. {pull}23722[23722]
283284

284285
*Filebeat*
285286

filebeat/fileset/factory.go

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
package fileset
1919

2020
import (
21+
"fmt"
22+
2123
"github.com/gofrs/uuid"
24+
"github.com/mitchellh/hashstructure"
2225

2326
"github.com/elastic/beats/v7/libbeat/beat"
2427
"github.com/elastic/beats/v7/libbeat/cfgfile"
@@ -27,9 +30,6 @@ import (
2730
"github.com/elastic/beats/v7/libbeat/logp"
2831
"github.com/elastic/beats/v7/libbeat/monitoring"
2932
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
30-
pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline"
31-
32-
"github.com/mitchellh/hashstructure"
3333
)
3434

3535
var (
@@ -77,15 +77,9 @@ func NewFactory(
7777

7878
// Create creates a module based on a config
7979
func (f *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) {
80-
// Start a registry of one module:
81-
m, err := NewModuleRegistry([]*common.Config{c}, f.beatInfo, false)
82-
if err != nil {
83-
return nil, err
84-
}
85-
86-
pConfigs, err := m.GetInputConfigs()
80+
m, pConfigs, err := f.createRegistry(c)
8781
if err != nil {
88-
return nil, err
82+
return nil, fmt.Errorf("could not create module registry for filesets: %w", err)
8983
}
9084

9185
// Hash module ID
@@ -116,8 +110,36 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Ru
116110
}
117111

118112
func (f *Factory) CheckConfig(c *common.Config) error {
119-
_, err := f.Create(pubpipeline.NewNilPipeline(), c)
120-
return err
113+
_, pConfigs, err := f.createRegistry(c)
114+
if err != nil {
115+
return fmt.Errorf("could not create module registry for filesets: %w", err)
116+
}
117+
118+
for _, pConfig := range pConfigs {
119+
err = f.inputFactory.CheckConfig(pConfig)
120+
if err != nil {
121+
logp.Err("Error checking input configuration: %s", err)
122+
return err
123+
}
124+
}
125+
126+
return nil
127+
}
128+
129+
// createRegistry starts a registry for a set of filesets, it returns the registry and
130+
// its input configurations
131+
func (f *Factory) createRegistry(c *common.Config) (*ModuleRegistry, []*common.Config, error) {
132+
m, err := NewModuleRegistry([]*common.Config{c}, f.beatInfo, false)
133+
if err != nil {
134+
return nil, nil, err
135+
}
136+
137+
pConfigs, err := m.GetInputConfigs()
138+
if err != nil {
139+
return nil, nil, err
140+
}
141+
142+
return m, pConfigs, err
121143
}
122144

123145
func (p *inputsRunner) Start() {
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// +build !integration
19+
20+
package container
21+
22+
import (
23+
"os"
24+
"path"
25+
"testing"
26+
27+
"github.com/elastic/beats/v7/filebeat/input/inputtest"
28+
"github.com/elastic/beats/v7/libbeat/common"
29+
)
30+
31+
func TestNewInputDone(t *testing.T) {
32+
config := common.MapStr{
33+
"paths": path.Join(os.TempDir(), "logs", "*.log"),
34+
}
35+
inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config)
36+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// +build !integration
19+
20+
package docker
21+
22+
import (
23+
"testing"
24+
25+
"github.com/elastic/beats/v7/filebeat/input/inputtest"
26+
"github.com/elastic/beats/v7/libbeat/common"
27+
)
28+
29+
func TestNewInputDone(t *testing.T) {
30+
config := common.MapStr{
31+
"containers.ids": "fad130edd3d2",
32+
}
33+
inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config)
34+
}

filebeat/input/inputtest/input.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package inputtest
19+
20+
import (
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
"github.com/stretchr/testify/require"
25+
26+
"github.com/elastic/beats/v7/filebeat/channel"
27+
"github.com/elastic/beats/v7/filebeat/input"
28+
"github.com/elastic/beats/v7/libbeat/beat"
29+
"github.com/elastic/beats/v7/libbeat/common"
30+
"github.com/elastic/beats/v7/libbeat/tests/resources"
31+
)
32+
33+
// Outlet is an empty outlet for testing.
34+
type Outlet struct{}
35+
36+
func (o Outlet) OnEvent(event beat.Event) bool { return true }
37+
func (o Outlet) Close() error { return nil }
38+
func (o Outlet) Done() <-chan struct{} { return nil }
39+
40+
// Connector is a connector to a test empty outlet.
41+
var Connector = channel.ConnectorFunc(
42+
func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
43+
return Outlet{}, nil
44+
},
45+
)
46+
47+
// AssertNotStartedInputCanBeDone checks that the context of an input can be
48+
// done before starting the input, and it doesn't leak goroutines. This is
49+
// important to confirm that leaks don't happen with CheckConfig.
50+
func AssertNotStartedInputCanBeDone(t *testing.T, factory input.Factory, configMap *common.MapStr) {
51+
goroutines := resources.NewGoroutinesChecker()
52+
defer goroutines.Check(t)
53+
54+
config, err := common.NewConfigFrom(configMap)
55+
require.NoError(t, err)
56+
57+
context := input.Context{
58+
Done: make(chan struct{}),
59+
}
60+
61+
_, err = factory(config, Connector, context)
62+
assert.NoError(t, err)
63+
64+
close(context.Done)
65+
}

filebeat/input/kafka/input_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// +build !integration
19+
20+
package kafka
21+
22+
import (
23+
"testing"
24+
25+
"github.com/elastic/beats/v7/filebeat/input/inputtest"
26+
"github.com/elastic/beats/v7/libbeat/common"
27+
)
28+
29+
func TestNewInputDone(t *testing.T) {
30+
config := common.MapStr{
31+
"hosts": "localhost:9092",
32+
"topics": "messages",
33+
"group_id": "filebeat",
34+
}
35+
inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config)
36+
}

filebeat/input/log/input_other_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@ package log
2222
import (
2323
"testing"
2424

25+
"github.com/stretchr/testify/assert"
26+
2527
"github.com/elastic/beats/v7/filebeat/input/file"
28+
"github.com/elastic/beats/v7/filebeat/input/inputtest"
2629
"github.com/elastic/beats/v7/libbeat/common/match"
27-
28-
"github.com/stretchr/testify/assert"
2930
)
3031

3132
var matchTests = []struct {
@@ -148,7 +149,7 @@ func TestInit(t *testing.T) {
148149
Paths: test.paths,
149150
},
150151
states: file.NewStates(),
151-
outlet: TestOutlet{},
152+
outlet: inputtest.Outlet{},
152153
fileStateIdentifier: &file.MockIdentifier{},
153154
}
154155

filebeat/input/log/input_test.go

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/elastic/beats/v7/filebeat/channel"
3333
"github.com/elastic/beats/v7/filebeat/input"
3434
"github.com/elastic/beats/v7/filebeat/input/file"
35+
"github.com/elastic/beats/v7/filebeat/input/inputtest"
3536
"github.com/elastic/beats/v7/libbeat/beat"
3637
"github.com/elastic/beats/v7/libbeat/common"
3738
"github.com/elastic/beats/v7/libbeat/common/match"
@@ -185,25 +186,10 @@ func testInputLifecycle(t *testing.T, context input.Context, closer func(input.C
185186
}
186187

187188
func TestNewInputDone(t *testing.T) {
188-
goroutines := resources.NewGoroutinesChecker()
189-
defer goroutines.Check(t)
190-
191-
config, _ := common.NewConfigFrom(common.MapStr{
189+
config := common.MapStr{
192190
"paths": path.Join(os.TempDir(), "logs", "*.log"),
193-
})
194-
195-
connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
196-
return TestOutlet{}, nil
197-
})
198-
199-
context := input.Context{
200-
Done: make(chan struct{}),
201191
}
202-
203-
_, err := NewInput(config, connector, context)
204-
assert.NoError(t, err)
205-
206-
close(context.Done)
192+
inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config)
207193
}
208194

209195
func TestNewInputError(t *testing.T) {
@@ -213,7 +199,7 @@ func TestNewInputError(t *testing.T) {
213199
config := common.NewConfig()
214200

215201
connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
216-
return TestOutlet{}, nil
202+
return inputtest.Outlet{}, nil
217203
})
218204

219205
context := input.Context{}
@@ -318,10 +304,3 @@ func (o *eventCapturer) Close() error {
318304
func (o *eventCapturer) Done() <-chan struct{} {
319305
return o.c
320306
}
321-
322-
// TestOutlet is an empty outlet for testing
323-
type TestOutlet struct{}
324-
325-
func (o TestOutlet) OnEvent(event beat.Event) bool { return true }
326-
func (o TestOutlet) Close() error { return nil }
327-
func (o TestOutlet) Done() <-chan struct{} { return nil }

filebeat/input/mqtt/input_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/stretchr/testify/require"
2828

2929
finput "github.com/elastic/beats/v7/filebeat/input"
30+
"github.com/elastic/beats/v7/filebeat/input/inputtest"
3031
"github.com/elastic/beats/v7/libbeat/beat"
3132
"github.com/elastic/beats/v7/libbeat/common"
3233
"github.com/elastic/beats/v7/libbeat/common/backoff"
@@ -322,6 +323,13 @@ func TestOnCreateHandler_SubscribeMultiple_BackoffSignalDone(t *testing.T) {
322323
require.Equal(t, 1, mockedBackoff.resetCount)
323324
}
324325

326+
func TestNewInputDone(t *testing.T) {
327+
config := common.MapStr{
328+
"hosts": "tcp://:0",
329+
}
330+
inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config)
331+
}
332+
325333
func assertEventMatches(t *testing.T, expected mockedMessage, got beat.Event) {
326334
topic, err := got.GetValue("mqtt.topic")
327335
require.NoError(t, err)

filebeat/input/redis/input_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// +build !integration
19+
20+
package redis
21+
22+
import (
23+
"testing"
24+
25+
"github.com/elastic/beats/v7/filebeat/input/inputtest"
26+
"github.com/elastic/beats/v7/libbeat/common"
27+
)
28+
29+
func TestNewInputDone(t *testing.T) {
30+
config := common.MapStr{
31+
"hosts": "localhost:3679",
32+
}
33+
inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config)
34+
}

0 commit comments

Comments
 (0)