Skip to content

Commit

Permalink
Fix issue that new prospector was not reloaded on conflict (elastic#4128
Browse files Browse the repository at this point in the history
)

A new prospector cannot be started if one of the states he starts with is not set to Finished. In this case the prospector should wait until the old prospector is shutdown and try again on the next reload. This did not work as intended so far because if loading failed, the running prospector was stopped and the non running was not started again on the next run. This change should fix it.

See also https://discuss.elastic.co/t/error-when-reloading-prospectors-for-filebeat/83082/5

Closes elastic#4133
  • Loading branch information
ruflin authored and tsg committed May 5, 2017
1 parent 2ce7688 commit d98e54e
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Fix panic in JSON decoding code if the input line is "null". {pull}4042[4042]
- Properly shut down crawler in case one prospector is misconfigured. {pull}4037[4037]
- Fix the Mysql slowlog parsing of IP addresses. {pull}4183[4183]
- Fix issue that new prospector was not reloaded on conflict {pull}4128[4128]

*Heartbeat*
- Add default ports in HTTP monitor. {pull}3924[3924]
Expand Down
3 changes: 2 additions & 1 deletion filebeat/prospector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) {
err = p.LoadStates(r.registrar.GetStates())
if err != nil {
logp.Err("Error loading states for prospector %v: %v", p.ID(), err)
return nil, err
// In case of error with loading state, prospector is still returne
return p, err
}

return p, nil
Expand Down
91 changes: 91 additions & 0 deletions filebeat/tests/system/test_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,94 @@ def test_load_configs(self):
assert self.output_lines() == 2
assert output[0]["message"] == first_line
assert output[1]["message"] == second_line

def test_reload_same_config(self):
"""
Test reload same config with same file but different config. Makes sure reloading also works on conflicts.
"""
self.render_config_template(
reload=True,
reload_path=self.working_dir + "/configs/*.yml",
prospectors=False,
)

os.mkdir(self.working_dir + "/logs/")
logfile = self.working_dir + "/logs/test.log"
os.mkdir(self.working_dir + "/configs/")

with open(self.working_dir + "/configs/prospector.yml", 'w') as f:
f.write(prospectorConfigTemplate.format(self.working_dir + "/logs/*"))

proc = self.start_beat()

with open(logfile, 'w') as f:
f.write("Hello world1\n")

self.wait_until(lambda: self.output_lines() > 0)

# New config with same config file but a bit different to make it reload
# Add it intentionally when other prospector is still running to cause an error
with open(self.working_dir + "/configs/prospector.yml", 'w') as f:
f.write(prospectorConfigTemplate.format(self.working_dir + "/logs/test.log"))

# Make sure error shows up in log file
self.wait_until(
lambda: self.log_contains("Can only start a prospector when all related states are finished"),
max_timeout=15)

# Wait until old runner is stopped
self.wait_until(
lambda: self.log_contains("Runner stopped:"),
max_timeout=15)

# Add new log line and see if it is picked up = new prospector is running
with open(logfile, 'a') as f:
f.write("Hello world2\n")

self.wait_until(lambda: self.output_lines() > 1)

proc.check_kill_and_wait()

def test_reload_add(self):
"""
Test adding a prospector and makes sure both are still running
"""
self.render_config_template(
reload=True,
reload_path=self.working_dir + "/configs/*.yml",
prospectors=False,
)

os.mkdir(self.working_dir + "/logs/")
logfile1 = self.working_dir + "/logs/test1.log"
logfile2 = self.working_dir + "/logs/test2.log"
os.mkdir(self.working_dir + "/configs/")

with open(self.working_dir + "/configs/prospector1.yml", 'w') as f:
f.write(prospectorConfigTemplate.format(self.working_dir + "/logs/test1.log"))

proc = self.start_beat()

with open(logfile1, 'w') as f:
f.write("Hello world1\n")

self.wait_until(lambda: self.output_lines() > 0)

with open(self.working_dir + "/configs/prospector2.yml", 'w') as f:
f.write(prospectorConfigTemplate.format(self.working_dir + "/logs/test2.log"))

self.wait_until(
lambda: self.log_contains_count("New runner started") == 2,
max_timeout=15)

# Add new log line and see if it is picked up = new prospector is running
with open(logfile1, 'a') as f:
f.write("Hello world2\n")

# Add new log line and see if it is picked up = new prospector is running
with open(logfile2, 'a') as f:
f.write("Hello world3\n")

self.wait_until(lambda: self.output_lines() == 3)

proc.check_kill_and_wait()
13 changes: 12 additions & 1 deletion libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
rl.config.Reload.Period = 0
}

overwriteUpate := true

for {
select {
case <-rl.done:
Expand All @@ -110,7 +112,8 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
}

// no file changes
if !updated {
if !updated && !overwriteUpate {
overwriteUpate = false
continue
}

Expand Down Expand Up @@ -140,6 +143,14 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {

runner, err := runnerFactory.Create(c)
if err != nil {
// Make sure the next run also updates because some runners were not properly loaded
overwriteUpate = true

// In case prospector already is running, do not stop it
if runner != nil && rl.registry.Has(runner.ID()) {
debugf("Remove module from stoplist: %v", runner.ID())
delete(stopList, runner.ID())
}
logp.Err("Error creating module: %s", err)
continue
}
Expand Down

0 comments on commit d98e54e

Please sign in to comment.