Skip to content

Commit 9cbbd0b

Browse files
committed
Introduce harvester_limit to limit number of harvesters
This limits the number of harvesters that are started per prospector. * Changelog entry added * Docs updated * Config file updated Closes #2236
1 parent b3b79aa commit 9cbbd0b

File tree

9 files changed

+119
-29
lines changed

9 files changed

+119
-29
lines changed

CHANGELOG.asciidoc

+5-3
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,16 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
8787

8888
*Packetbeat*
8989

90-
- Add cassandra protocol analyzer to packetbeat. {pull}1959[1959]
91-
- Match connections with IPv6 addresses to processes {pull}2254[2254]
92-
- Add IP address to -devices command output {pull}2327[2327]
90+
- Add cassandra protocol analyzer to packetbeat. {pull}1959[1959]
91+
- Match connections with IPv6 addresses to processes {pull}2254[2254]
92+
- Add IP address to -devices command output {pull}2327[2327]
9393

9494
*Topbeat*
9595

9696
*Filebeat*
9797

98+
- Add harvester_limit option {pull}2417[2417]
99+
98100
*Winlogbeat*
99101

100102

filebeat/docs/reference/configuration/filebeat-options.asciidoc

+18-1
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,6 @@ The timestamp for closing a file does not depend on the modification time of the
209209

210210
You can use time strings like 2h (2 hours) and 5m (5 minutes). The default is 5m.
211211

212-
213212
===== close_renamed
214213

215214
WARNING: Only use this option if you understand that data loss is a potential side effect.
@@ -410,6 +409,24 @@ the backoff algorithm is disabled, and the `backoff` value is used for waiting f
410409
lines. The `backoff` value will be multiplied each time with the `backoff_factor` until
411410
`max_backoff` is reached. The default is 2.
412411

412+
===== harvester_limit
413+
414+
EXPERIMENTAL
415+
416+
harvester_limit limits the number of harvesters that are started in parallel for one prospector. This directly relates
417+
to the maximum number of file handlers that are opened. The default is 0, which means there is no limit. This configuration
418+
is useful if the number of files to be harvested exceeds the open file handler limit of the operating system.
419+
420+
As setting a limit on harvesters means that potentially not all files are opened in parallel, it is recommended to use
421+
this option in combination with the close_* options to make sure harvesters are stopped more often so new files can be
422+
picked up.
423+
424+
Currently, if a new harvester can be started, the file to harvest is picked randomly. This means it can
425+

happen that a harvester is started for a file that was only recently closed and then updated again, instead of a
426+

harvester for a file that hasn't been harvested for a longer period of time.
427+
428+
This configuration option applies per prospector. This can be indirectly used to set higher priorities on certain prospectors
429+
by assigning a higher limit of harvesters.
413430

414431
[[configuration-global-options]]
415432
=== Filebeat Global Configuration

filebeat/etc/beat.full.yml

+4
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ filebeat.prospectors:
161161
# The backoff value will be multiplied each time with the backoff_factor until max_backoff is reached
162162
#backoff_factor: 2
163163

164+
# Experimental: Max number of harvesters that are started in parallel.
165+
# Default is 0 which means unlimited
166+
#harvester_limit: 0
167+
164168
### Harvester closing options
165169

166170
# Close inactive closes the file handler after the predefined period.

filebeat/filebeat.full.yml

+4
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ filebeat.prospectors:
161161
# The backoff value will be multiplied each time with the backoff_factor until max_backoff is reached
162162
#backoff_factor: 2
163163

164+
# Experimental: Max number of harvesters that are started in parallel.
165+
# Default is 0 which means unlimited
166+
#harvester_limit: 0
167+
164168
### Harvester closing options
165169

166170
# Close inactive closes the file handler after the predefined period.

filebeat/prospector/config.go

+14-12
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,24 @@ import (
1010

1111
var (
1212
defaultConfig = prospectorConfig{
13-
IgnoreOlder: 0,
14-
ScanFrequency: 10 * time.Second,
15-
InputType: cfg.DefaultInputType,
16-
CleanInactive: 0,
17-
CleanRemoved: false,
13+
IgnoreOlder: 0,
14+
ScanFrequency: 10 * time.Second,
15+
InputType: cfg.DefaultInputType,
16+
CleanInactive: 0,
17+
CleanRemoved: false,
18+
HarvesterLimit: 0,
1819
}
1920
)
2021

2122
type prospectorConfig struct {
22-
ExcludeFiles []*regexp.Regexp `config:"exclude_files"`
23-
IgnoreOlder time.Duration `config:"ignore_older"`
24-
Paths []string `config:"paths"`
25-
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
26-
InputType string `config:"input_type"`
27-
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
28-
CleanRemoved bool `config:"clean_removed"`
23+
ExcludeFiles []*regexp.Regexp `config:"exclude_files"`
24+
IgnoreOlder time.Duration `config:"ignore_older"`
25+
Paths []string `config:"paths"`
26+
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
27+
InputType string `config:"input_type"`
28+
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
29+
CleanRemoved bool `config:"clean_removed"`
30+
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
2931
}
3032

3133
func (config *prospectorConfig) Validate() error {

filebeat/prospector/prospector.go

+22-9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"sync"
66
"time"
77

8+
"sync/atomic"
9+
810
cfg "github.com/elastic/beats/filebeat/config"
911
"github.com/elastic/beats/filebeat/harvester"
1012
"github.com/elastic/beats/filebeat/input"
@@ -14,14 +16,15 @@ import (
1416
)
1517

1618
type Prospector struct {
17-
cfg *common.Config // Raw config
18-
config prospectorConfig
19-
prospectorer Prospectorer
20-
spoolerChan chan *input.Event
21-
harvesterChan chan *input.Event
22-
done chan struct{}
23-
states *file.States
24-
wg sync.WaitGroup
19+
cfg *common.Config // Raw config
20+
config prospectorConfig
21+
prospectorer Prospectorer
22+
spoolerChan chan *input.Event
23+
harvesterChan chan *input.Event
24+
done chan struct{}
25+
states *file.States
26+
wg sync.WaitGroup
27+
harvesterCounter uint64
2528
}
2629

2730
type Prospectorer interface {
@@ -155,6 +158,13 @@ func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, er
155158
}
156159

157160
func (p *Prospector) startHarvester(state file.State, offset int64) error {
161+
162+
if p.config.HarvesterLimit > 0 && atomic.LoadUint64(&p.harvesterCounter) >= p.config.HarvesterLimit {
163+
return fmt.Errorf("Harvester limit reached.")
164+
}
165+
166+
atomic.AddUint64(&p.harvesterCounter, 1)
167+
158168
state.Offset = offset
159169
// Create harvester with state
160170
h, err := p.createHarvester(state)
@@ -164,7 +174,10 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error {
164174

165175
p.wg.Add(1)
166176
go func() {
167-
defer p.wg.Done()
177+
defer func() {
178+
p.wg.Done()
179+
atomic.AddUint64(&p.harvesterCounter, ^uint64(0))
180+
}()
168181
// Starts harvester and picks the right type. In case type is not set, set it to default (log)
169182
h.Harvest()
170183
}()

filebeat/prospector/prospector_log.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func (p *ProspectorLog) scan() {
159159
logp.Debug("prospector", "Start harvester for new file: %s", newState.Source)
160160
err := p.Prospector.startHarvester(newState, 0)
161161
if err != nil {
162-
logp.Err("Harvester could not be started on new file: %s", err)
162+
logp.Err("Harvester could not be started on new file: %s, Err: %s", newState.Source, err)
163163
}
164164
} else {
165165
p.harvestExistingFile(newState, lastState)
@@ -182,7 +182,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
182182
logp.Debug("prospector", "Resuming harvesting of file: %s, offset: %v", newState.Source, oldState.Offset)
183183
err := p.Prospector.startHarvester(newState, oldState.Offset)
184184
if err != nil {
185-
logp.Err("Harvester could not be started on existing file: %s", err)
185+
logp.Err("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
186186
}
187187
return
188188
}
@@ -192,7 +192,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
192192
logp.Debug("prospector", "Old file was truncated. Starting from the beginning: %s", newState.Source)
193193
err := p.Prospector.startHarvester(newState, 0)
194194
if err != nil {
195-
logp.Err("Harvester could not be started on truncated file: %s", err)
195+
logp.Err("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)
196196
}
197197

198198
filesTrucated.Add(1)

filebeat/tests/system/config/filebeat.yml.j2

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ filebeat.prospectors:
2525
force_close_files: {{force_close_files}}
2626
clean_inactive: {{clean_inactive}}
2727
clean_removed: {{clean_removed}}
28+
harvester_limit: {{harvester_limit | default(0) }}
2829

2930
{% if fields %}
3031
fields:

filebeat/tests/system/test_prospector.py

+48-1
Original file line numberDiff line numberDiff line change
@@ -574,10 +574,57 @@ def test_skip_symlinks(self):
574574
lambda: self.output_has(lines=1),
575575
max_timeout=15)
576576

577-
time.sleep(5)
578577
filebeat.check_kill_and_wait()
579578

580579
data = self.read_output()
581580

582581
# Make sure there is only one entry, means it didn't follow the symlink
583582
assert len(data) == 1
583+
584+
def test_harvester_limit(self):
585+
"""
586+
Test if harvester_limit applies
587+
"""
588+
self.render_config_template(
589+
path=os.path.abspath(self.working_dir) + "/log/*",
590+
harvester_limit=1,
591+
close_inactive="1s",
592+
scan_frequency="1s",
593+
)
594+
595+
os.mkdir(self.working_dir + "/log/")
596+
testfile1 = self.working_dir + "/log/test1.log"
597+
testfile2 = self.working_dir + "/log/test2.log"
598+
testfile3 = self.working_dir + "/log/test3.log"
599+
600+
with open(testfile1, 'w') as file:
601+
file.write("Line1\n")
602+
603+
with open(testfile2, 'w') as file:
604+
file.write("Line2\n")
605+
606+
with open(testfile3, 'w') as file:
607+
file.write("Line3\n")
608+
609+
filebeat = self.start_beat()
610+
611+
# check that not all harvesters were started
612+
self.wait_until(
613+
lambda: self.log_contains("Harvester limit reached"),
614+
max_timeout=10)
615+
616+
# wait for registry to be written
617+
self.wait_until(
618+
lambda: self.log_contains("Registry file updated"),
619+
max_timeout=10)
620+
621+
# Make sure not all events were written so far
622+
data = self.read_output()
623+
assert len(data) < 3
624+
625+
self.wait_until(lambda: self.output_has(lines=3), max_timeout=15)
626+
627+
data = self.read_output()
628+
assert len(data) == 3
629+
630+
filebeat.check_kill_and_wait()

0 commit comments

Comments
 (0)