Skip to content

Commit

Permalink
Fix issue that new prospector was not reloaded on conflict (elastic#4128
Browse files Browse the repository at this point in the history
)

A new prospector cannot be started if one of the states he starts with is not set to Finished. In this case the prospector should wait until the old prospector is shutdown and try again on the next reload. This did not work as intended so far because if loading failed, the running prospector was stopped and the non running was not started again on the next run. This change should fix it.

See also https://discuss.elastic.co/t/error-when-reloading-prospectors-for-filebeat/83082/5

Closes elastic#4133
(cherry picked from commit d98e54e)
  • Loading branch information
ruflin committed May 5, 2017
1 parent a73bab4 commit 40f1c6c
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ https://github.com/elastic/beats/compare/v5.3.0...master[Check the HEAD diff]
- Fix console output {pull}4045[4045]

*Filebeat*
- Fix issue that new prospector was not reloaded on conflict {pull}4128[4128]

*Heartbeat*

Expand Down
3 changes: 2 additions & 1 deletion filebeat/prospector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) {
err = p.LoadStates(r.registrar.GetStates())
if err != nil {
logp.Err("Error loading states for prospector %v: %v", p.ID(), err)
return nil, err
// In case of error with loading state, prospector is still returne
return p, err
}

return p, nil
Expand Down
5 changes: 4 additions & 1 deletion filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,14 @@ filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("reg
{%endif%}
filebeat.publish_async: {{publish_async}}

{% if reload -%}
{% if reload or reload_path -%}
filebeat.config.prospectors:
enabled: true
path: {{ reload_path }}
{% if reload -%}
reload.period: 1s
reload.enabled: true
{% endif -%}
{% endif -%}

#================================ General =====================================
Expand Down
141 changes: 141 additions & 0 deletions filebeat/tests/system/test_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,144 @@ def test_reload_same_prospector(self):
assert output[1]["message"] == second_line
# assert that fields are added
assert output[1]["fields.hello"] == "world"

def test_load_configs(self):
"""
Test loading separate prospectors configs
"""
self.render_config_template(
reload_path=self.working_dir + "/configs/*.yml",
prospectors=False,
)

os.mkdir(self.working_dir + "/logs/")
logfile = self.working_dir + "/logs/test.log"
os.mkdir(self.working_dir + "/configs/")

first_line = "First log file"
second_line = "Second log file"

config = prospectorConfigTemplate.format(self.working_dir + "/logs/test.log")
config = config + """
close_eof: true
"""
with open(self.working_dir + "/configs/prospector.yml", 'w') as f:
f.write(config)

with open(logfile, 'w') as f:
f.write(first_line + "\n")

proc = self.start_beat()

self.wait_until(lambda: self.output_lines() == 1)

# Update both log files, only 1 change should be picke dup
with open(logfile, 'a') as f:
f.write(second_line + "\n")

self.wait_until(lambda: self.output_lines() == 2)

proc.check_kill_and_wait()

output = self.read_output()

# Reloading stopped.
self.wait_until(
lambda: self.log_contains("Loading of config files completed."),
max_timeout=15)

# Make sure the correct lines were picked up
assert self.output_lines() == 2
assert output[0]["message"] == first_line
assert output[1]["message"] == second_line

def test_reload_same_config(self):
"""
Test reload same config with same file but different config. Makes sure reloading also works on conflicts.
"""
self.render_config_template(
reload=True,
reload_path=self.working_dir + "/configs/*.yml",
prospectors=False,
)

os.mkdir(self.working_dir + "/logs/")
logfile = self.working_dir + "/logs/test.log"
os.mkdir(self.working_dir + "/configs/")

with open(self.working_dir + "/configs/prospector.yml", 'w') as f:
f.write(prospectorConfigTemplate.format(self.working_dir + "/logs/*"))

proc = self.start_beat()

with open(logfile, 'w') as f:
f.write("Hello world1\n")

self.wait_until(lambda: self.output_lines() > 0)

# New config with same config file but a bit different to make it reload
# Add it intentionally when other prospector is still running to cause an error
with open(self.working_dir + "/configs/prospector.yml", 'w') as f:
f.write(prospectorConfigTemplate.format(self.working_dir + "/logs/test.log"))

# Make sure error shows up in log file
self.wait_until(
lambda: self.log_contains("Can only start a prospector when all related states are finished"),
max_timeout=15)

# Wait until old runner is stopped
self.wait_until(
lambda: self.log_contains("Runner stopped:"),
max_timeout=15)

# Add new log line and see if it is picked up = new prospector is running
with open(logfile, 'a') as f:
f.write("Hello world2\n")

self.wait_until(lambda: self.output_lines() > 1)

proc.check_kill_and_wait()

def test_reload_add(self):
"""
Test adding a prospector and makes sure both are still running
"""
self.render_config_template(
reload=True,
reload_path=self.working_dir + "/configs/*.yml",
prospectors=False,
)

os.mkdir(self.working_dir + "/logs/")
logfile1 = self.working_dir + "/logs/test1.log"
logfile2 = self.working_dir + "/logs/test2.log"
os.mkdir(self.working_dir + "/configs/")

with open(self.working_dir + "/configs/prospector1.yml", 'w') as f:
f.write(prospectorConfigTemplate.format(self.working_dir + "/logs/test1.log"))

proc = self.start_beat()

with open(logfile1, 'w') as f:
f.write("Hello world1\n")

self.wait_until(lambda: self.output_lines() > 0)

with open(self.working_dir + "/configs/prospector2.yml", 'w') as f:
f.write(prospectorConfigTemplate.format(self.working_dir + "/logs/test2.log"))

self.wait_until(
lambda: self.log_contains_count("New runner started") == 2,
max_timeout=15)

# Add new log line and see if it is picked up = new prospector is running
with open(logfile1, 'a') as f:
f.write("Hello world2\n")

# Add new log line and see if it is picked up = new prospector is running
with open(logfile2, 'a') as f:
f.write("Hello world3\n")

self.wait_until(lambda: self.output_lines() == 3)

proc.check_kill_and_wait()
28 changes: 27 additions & 1 deletion libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {

gw := NewGlobWatcher(path)

// If reloading is disable, config files should be loaded immidiately
if !rl.config.Reload.Enabled {
rl.config.Reload.Period = 0
}

overwriteUpate := true

for {
select {
case <-rl.done:
Expand All @@ -105,7 +112,8 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
}

// no file changes
if !updated {
if !updated && !overwriteUpate {
overwriteUpate = false
continue
}

Expand Down Expand Up @@ -135,6 +143,14 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {

runner, err := runnerFactory.Create(c)
if err != nil {
// Make sure the next run also updates because some runners were not properly loaded
overwriteUpate = true

// In case prospector already is running, do not stop it
if runner != nil && rl.registry.Has(runner.ID()) {
debugf("Remove module from stoplist: %v", runner.ID())
delete(stopList, runner.ID())
}
logp.Err("Error creating module: %s", err)
continue
}
Expand All @@ -153,6 +169,16 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
rl.stopRunners(stopList)
rl.startRunners(startList)
}

// Path loading is enabled but not reloading. Loads files only once and then stops.
if !rl.config.Reload.Enabled {
logp.Info("Loading of config files completed.")
select {
case <-rl.done:
logp.Info("Dynamic config reloader stopped")
return
}
}
}
}

Expand Down

0 comments on commit 40f1c6c

Please sign in to comment.