Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce clean_removed #1922

Merged
merged 1 commit into from
Jul 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d

*Filebeat*
- Introduce close_removed and close_renamed harvester options {issue}1600[1600]
- Introduce close_eof harvester options {issue}1600[1600]

- Introduce close_eof harvester option {issue}1600[1600]
- Add clean_removed config option {issue}1600[1600]

*Winlogbeat*

Expand Down
6 changes: 5 additions & 1 deletion filebeat/etc/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ filebeat.prospectors:
# Note: Potential data loss if renamed file is not picked up by prospector.
#close_renamed: false

# When enabling this option, a file handler is closed immidiately in case a file can't be found
# When enabling this option, a file handler is closed immediately in case a file can't be found
# any more. In case the file shows up again later, harvesting will continue at the last known position
# after scan_frequency.
# Note: Potential data loss if file reading was not finished when file was removed.
Expand All @@ -187,6 +187,10 @@ filebeat.prospectors:
# By default this is disabled.
#clean_older: 0

# Removes the state for file which cannot be found on disk anymore immediately
#clean_removed: false


#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
#- input_type: stdin
Expand Down
6 changes: 5 additions & 1 deletion filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ filebeat.prospectors:
# Note: Potential data loss if renamed file is not picked up by prospector.
#close_renamed: false

# When enabling this option, a file handler is closed immidiately in case a file can't be found
# When enabling this option, a file handler is closed immediately in case a file can't be found
# any more. In case the file shows up again later, harvesting will continue at the last known position
# after scan_frequency.
# Note: Potential data loss if file reading was not finished when file was removed.
Expand All @@ -187,6 +187,10 @@ filebeat.prospectors:
# By default this is disabled.
#clean_older: 0

# Removes the state for file which cannot be found on disk anymore immediately
#clean_removed: false


#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
#- input_type: stdin
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/file/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (s *States) Cleanup() {

for _, state := range s.states {
ttl := state.TTL
if ttl >= 0 && currentTime.Sub(state.Timestamp) > ttl {
if ttl == 0 || (ttl > 0 && currentTime.Sub(state.Timestamp) > ttl) {
logp.Debug("state", "State removed for %v because of older: %v", state.Source, ttl)
continue // drop state
}
Expand Down
2 changes: 2 additions & 0 deletions filebeat/prospector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var (
ScanFrequency: 10 * time.Second,
InputType: cfg.DefaultInputType,
CleanOlder: 0,
CleanRemoved: false,
}
)

Expand All @@ -24,6 +25,7 @@ type prospectorConfig struct {
ScanFrequency time.Duration `config:"scan_frequency"`
InputType string `config:"input_type"`
CleanOlder time.Duration `config:"clean_older" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
}

func (config *prospectorConfig) Validate() error {
Expand Down
17 changes: 16 additions & 1 deletion filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,21 @@ func (p *ProspectorLog) Run() {
p.Prospector.states.Cleanup()
logp.Debug("prospector", "Prospector states cleaned up.")
}
p.lastScan = time.Now()

// Cleanup of removed files will only happen after next scan. Otherwise it can happen that not all states
// were updated before cleanup is called
if p.config.CleanRemoved {
for _, state := range p.Prospector.states.GetStates() {
// os.Stat will return an error in case the file does not exist
_, err := os.Stat(state.Source)
if err != nil {
state.TTL = 0
h, _ := p.Prospector.createHarvester(state)
h.SendStateUpdate()
logp.Debug("prospector", "Cleanup state for file as file removed: %s", state.Source)
}
}
}
}

// getFiles returns all files which have to be harvested
Expand Down Expand Up @@ -131,6 +145,7 @@ func (p *ProspectorLog) scan() {
}
}

// Only update lastScan timestamp after scan is completed
p.lastScan = newLastScan
}

Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ filebeat.prospectors:
close_eof: {{close_eof}}
force_close_files: {{force_close_files}}
clean_older: {{clean_older}}
clean_removed: {{clean_removed}}

{% if fields %}
fields:
Expand Down
58 changes: 58 additions & 0 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,3 +769,61 @@ def test_clean_older(self):
else:
assert data[0]["offset"] == 2


def test_clean_removed(self):
"""
Checks that files which were removed, the state is removed
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/input*",
scan_frequency="0.1s",
clean_removed=True,
close_removed=True
)

os.mkdir(self.working_dir + "/log/")
testfile1 = self.working_dir + "/log/input1"
testfile2 = self.working_dir + "/log/input2"

with open(testfile1, 'w') as f:
f.write("file to be removed\n")

with open(testfile2, 'w') as f:
f.write("2\n")

filebeat = self.start_beat()

self.wait_until(
lambda: self.output_has(lines=2),
max_timeout=10)

data = self.get_registry()
assert len(data) == 2

os.remove(testfile1)

# Wait until states are removed from prospectors
self.wait_until(
lambda: self.log_contains(
"Cleanup state for file as file removed"),
max_timeout=15)

# Add one more line to make sure registry is written
with open(testfile2, 'a') as f:
f.write("make sure registry is written\n")

self.wait_until(
lambda: self.output_has(lines=3),
max_timeout=10)

filebeat.check_kill_and_wait()

# Check that the first to files were removed from the registry
data = self.get_registry()
assert len(data) == 1

# Make sure the last file in the registry is the correct one and has the correct offset
if os.name == "nt":
assert data[0]["offset"] == len("make sure registry is written\n" + "2\n") + 2
else:
assert data[0]["offset"] == len("make sure registry is written\n" + "2\n")