From 4576f4789ad527aad9b0b75121832a70d60f3911 Mon Sep 17 00:00:00 2001 From: ruflin Date: Mon, 27 Jun 2016 10:29:58 +0200 Subject: [PATCH] Add close_removed and close_renamed, deprecate force_close_files force_close_files is replaced by the two option close_removed and close_renamed. Force_close_files is deprecated. In case it is enabled, it sets close_removed and close_renamed to true. This is part of https://github.com/elastic/beats/issues/1600 --- CHANGELOG.asciidoc | 5 + filebeat/filebeat.full.yml | 20 ++-- filebeat/harvester/config.go | 14 ++- filebeat/harvester/config_test.go | 22 ++++ filebeat/harvester/log.go | 14 ++- filebeat/harvester/reader/log.go | 28 +++-- filebeat/tests/system/config/filebeat.yml.j2 | 3 +- filebeat/tests/system/test_crawler.py | 62 +---------- filebeat/tests/system/test_harvester.py | 106 +++++++++++++++++++ 9 files changed, 190 insertions(+), 84 deletions(-) create mode 100644 filebeat/harvester/config_test.go create mode 100644 filebeat/tests/system/test_harvester.py diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index e4b0857965b..2464a72e9a4 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -22,6 +22,9 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d *Filebeat* +- Stop following symlink. Symlinks are now ignored: {pull}1686[1686] +- Deprecate force_close_files option and replace it with close_removed and close_renamed {issue}1600[1600] + *Winlogbeat* @@ -50,6 +53,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d *Topbeat* *Filebeat* +- Introdce close_removed and close_renamed harvester options {issue}1600[1600] + *Winlogbeat* diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 4cbcdb1a253..3c605dcc446 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -166,22 +166,22 @@ filebeat.prospectors: # The backoff value will be multiplied each time with the backoff_factor until max_backoff is reached #backoff_factor: 2 - # This option closes a file, as soon as the file name changes. - # This config option is recommended on windows only. Filebeat keeps the files it's reading open. This can cause - # issues when the file is removed, as the file will not be fully removed until also Filebeat closes - # the reading. Filebeat closes the file handler after ignore_older. During this time no new file with the - # same name can be created. Turning this feature on the other hand can lead to loss of data - # on rotate files. It can happen that after file rotation the beginning of the new - # file is skipped, as the reading starts at the end. We recommend to leave this option on false - # but lower the ignore_older value to release files faster. - #force_close_files: false + # Close renamed means a file handler is close when a file is renamed / rotated. In case the harvester was + # not finished reading the roated file, the file will be picked up again after scan_frequency in case it + # also matches the prospector patterns. + #close_renamed: false + + # When enabling this option, a file handler is closed immidiately in case a file can't be found + # any more. In case the file shows up again later, harvesting will continue at the last position + # after scan_frequency + #close_removed: false #----------------------------- Stdin prospector ------------------------------- # Configuration to use stdin input #- input_type: stdin #========================= Filebeat global options ============================ - + # Event count spool threshold - forces network flush if exceeded #filebeat.spool_size: 2048 diff --git a/filebeat/harvester/config.go b/filebeat/harvester/config.go index 6495e1da515..f4fbf74681b 100644 --- a/filebeat/harvester/config.go +++ b/filebeat/harvester/config.go @@ -10,6 +10,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/dustin/go-humanize" + "github.com/elastic/beats/libbeat/logp" ) var ( @@ -22,8 +23,10 @@ var ( BackoffFactor: 2, MaxBackoff: 10 * time.Second, CloseOlder: 1 * time.Hour, - ForceCloseFiles: false, MaxBytes: 10 * (1 << 20), // 10MB + CloseRemoved: false, + CloseRenamed: false, + ForceCloseFiles: false, } ) @@ -38,6 +41,8 @@ type harvesterConfig struct { BackoffFactor int `config:"backoff_factor" validate:"min=1"` MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` CloseOlder time.Duration `config:"close_older"` + CloseRemoved bool `config:"close_removed"` + CloseRenamed bool `config:"close_renamed"` ForceCloseFiles bool `config:"force_close_files"` ExcludeLines []*regexp.Regexp `config:"exclude_lines"` IncludeLines []*regexp.Regexp `config:"include_lines"` @@ -48,6 +53,13 @@ type harvesterConfig struct { func (config *harvesterConfig) Validate() error { + // TODO: remove in 7.0 + if config.ForceCloseFiles { + config.CloseRemoved = true + config.CloseRenamed = true + logp.Warn("DEPRECATED: force_close_files was set to true. Use close_removed + close_rename") + } + // Check input type if _, ok := cfg.ValidInputType[config.InputType]; !ok { return fmt.Errorf("Invalid input type: %v", config.InputType) diff --git a/filebeat/harvester/config_test.go b/filebeat/harvester/config_test.go new file mode 100644 index 00000000000..b64e14b8e30 --- /dev/null +++ b/filebeat/harvester/config_test.go @@ -0,0 +1,22 @@ +package harvester + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestForceCloseFiles(t *testing.T) { + + config := defaultConfig + assert.False(t, config.ForceCloseFiles) + assert.False(t, config.CloseRemoved) + assert.False(t, config.CloseRenamed) + + config.ForceCloseFiles = true + config.Validate() + + assert.True(t, config.ForceCloseFiles) + assert.True(t, config.CloseRemoved) + assert.True(t, config.CloseRenamed) +} diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index 92b364a11b8..ed4307c8205 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -37,7 +37,8 @@ func (h *Harvester) Harvest() { // don't require 'complicated' logic. cfg := h.Config readerConfig := reader.LogFileReaderConfig{ - ForceClose: cfg.ForceCloseFiles, + CloseRemoved: cfg.CloseRemoved, + CloseRenamed: cfg.CloseRenamed, CloseOlder: cfg.CloseOlder, BackoffDuration: cfg.Backoff, MaxBackoffDuration: cfg.MaxBackoff, @@ -68,12 +69,23 @@ func (h *Harvester) Harvest() { // Partial lines return error and are only read on completion ts, text, bytesRead, jsonFields, err := readLine(processor) if err != nil { + if err == reader.ErrFileTruncate { logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path) h.SetOffset(0) return } + if err == reader.ErrRemoved { + logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.Path) + return + } + + if err == reader.ErrRenamed { + logp.Info("File was renamed: %s Closing because close_renamed is enabled.", h.Path) + return + } + logp.Info("Read line error: %s", err) return } diff --git a/filebeat/harvester/reader/log.go b/filebeat/harvester/reader/log.go index fb1f5a65150..8833a8d1b79 100644 --- a/filebeat/harvester/reader/log.go +++ b/filebeat/harvester/reader/log.go @@ -13,7 +13,8 @@ import ( var ( ErrFileTruncate = errors.New("detected file being truncated") - ErrForceClose = errors.New("file must be closed") + ErrRenamed = errors.New("file was renamed") + ErrRemoved = errors.New("file was removed") ErrInactive = errors.New("file inactive") ) @@ -27,11 +28,12 @@ type logFileReader struct { } type LogFileReaderConfig struct { - ForceClose bool CloseOlder time.Duration BackoffDuration time.Duration MaxBackoffDuration time.Duration BackoffFactor int + CloseRenamed bool + CloseRemoved bool } func NewLogFileReader( @@ -72,8 +74,9 @@ func (r *logFileReader) Read(buf []byte) (int, error) { r.offset += int64(n) r.lastTimeRead = time.Now() } + + // reset backoff if err == nil { - // reset backoff r.backoff = r.config.BackoffDuration return n, nil } @@ -113,16 +116,19 @@ func (r *logFileReader) Read(buf []byte) (int, error) { return n, ErrInactive } - if r.config.ForceClose { - // Check if the file name exists (see #93) + if r.config.CloseRenamed { + if !file.IsSameFile(r.fs.Name(), info) { + return n, ErrRenamed + } + } + + if r.config.CloseRemoved { + // Check if the file name exists. See https://github.com/elastic/filebeat/issues/93 _, statErr := os.Stat(r.fs.Name()) - // Error means file does not exist. If no error, check if same file. If - // not close as rotated. - if statErr != nil || !file.IsSameFile(r.fs.Name(), info) { - logp.Info("Force close file: %s; error: %s", r.fs.Name(), statErr) - // Return directly on windows -> file is closing - return n, ErrForceClose + // Error means file does not exist. + if statErr != nil { + return n, ErrRemoved } } diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 9dc76fb36b8..e0cc96ffd6d 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -19,7 +19,8 @@ filebeat.prospectors: backoff: 0.1s backoff_factor: 1 max_backoff: 0.1s - force_close_files: {{force_close_files}} + close_removed: {{close_removed}} + close_renamed: {{close_renamed}} {% if fields %} fields: diff --git a/filebeat/tests/system/test_crawler.py b/filebeat/tests/system/test_crawler.py index 85c68f78c8a..53098a6cf5f 100644 --- a/filebeat/tests/system/test_crawler.py +++ b/filebeat/tests/system/test_crawler.py @@ -272,7 +272,7 @@ def test_file_disappear_appear(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*.log", - force_close_files="true", + close_removed="true", scan_frequency="0.1s" ) os.mkdir(self.working_dir + "/log/") @@ -301,7 +301,7 @@ def test_file_disappear_appear(self): # Wait until error shows up on windows self.wait_until( lambda: self.log_contains( - "Force close file"), + "Closing because close_removed is enabled"), max_timeout=15) # Move file to old file name @@ -332,64 +332,6 @@ def test_file_disappear_appear(self): output = self.read_output() assert len(output) == 5 + 6 - def test_force_close(self): - """ - Checks that a file is closed in case it is rotated - """ - - self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/test.log", - force_close_files="true", - scan_frequency="0.1s" - ) - os.mkdir(self.working_dir + "/log/") - - testfile1 = self.working_dir + "/log/test.log" - testfile2 = self.working_dir + "/log/test.log.rotated" - file = open(testfile1, 'w') - - iterations1 = 5 - for n in range(0, iterations1): - file.write("rotation file") - file.write("\n") - - file.close() - - filebeat = self.start_beat() - - # Let it read the file - self.wait_until( - lambda: self.output_has(lines=iterations1), max_timeout=10) - - os.rename(testfile1, testfile2) - - file = open(testfile1, 'w', 0) - file.write("Hello World\n") - file.close() - - # Wait until error shows up on windows - self.wait_until( - lambda: self.log_contains( - "Force close file"), - max_timeout=15) - - # Let it read the file - self.wait_until( - lambda: self.output_has(lines=iterations1 + 1), max_timeout=10) - - filebeat.check_kill_and_wait() - - data = self.get_registry() - - # Make sure new file was picked up. As it has the same file name, - # one entry for the new and one for the old should exist - assert len(data) == 2 - - # Make sure output has 11 entries, the new file was started - # from scratch - output = self.read_output() - #assert len(output) == 5 + 6 - def test_new_line_on_existing_file(self): """ Checks that filebeat follows future writes to the same diff --git a/filebeat/tests/system/test_harvester.py b/filebeat/tests/system/test_harvester.py new file mode 100644 index 00000000000..075ae31073f --- /dev/null +++ b/filebeat/tests/system/test_harvester.py @@ -0,0 +1,106 @@ +from filebeat import BaseTest +import os +import socket + +""" +Test Harvesters +""" + + +class Test(BaseTest): + + def test_close_renamed(self): + """ + Checks that a file is closed when its renamed / rotated + """ + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/test.log", + close_renamed="true", + scan_frequency="0.1s" + ) + os.mkdir(self.working_dir + "/log/") + + testfile1 = self.working_dir + "/log/test.log" + testfile2 = self.working_dir + "/log/test.log.rotated" + file = open(testfile1, 'w') + + iterations1 = 5 + for n in range(0, iterations1): + file.write("rotation file") + file.write("\n") + + file.close() + + filebeat = self.start_beat() + + # Let it read the file + self.wait_until( + lambda: self.output_has(lines=iterations1), max_timeout=10) + + os.rename(testfile1, testfile2) + + file = open(testfile1, 'w', 0) + file.write("Hello World\n") + file.close() + + # Wait until error shows up + self.wait_until( + lambda: self.log_contains( + "Closing because close_renamed is enabled"), + max_timeout=15) + + # Let it read the file + self.wait_until( + lambda: self.output_has(lines=iterations1 + 1), max_timeout=10) + + filebeat.check_kill_and_wait() + + data = self.get_registry() + + # Make sure new file was picked up. As it has the same file name, + # one entry for the new and one for the old should exist + assert len(data) == 2 + + + def test_close_removed(self): + """ + Checks that a file is closed if removed + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/test.log", + close_removed="true", + scan_frequency="0.1s" + ) + os.mkdir(self.working_dir + "/log/") + + testfile1 = self.working_dir + "/log/test.log" + file = open(testfile1, 'w') + + iterations1 = 5 + for n in range(0, iterations1): + file.write("rotation file") + file.write("\n") + + file.close() + + filebeat = self.start_beat() + + # Let it read the file + self.wait_until( + lambda: self.output_has(lines=iterations1), max_timeout=10) + + os.remove(testfile1) + + # Wait until error shows up on windows + self.wait_until( + lambda: self.log_contains( + "Closing because close_removed is enabled"), + max_timeout=15) + + filebeat.check_kill_and_wait() + + data = self.get_registry() + + # Make sure the state for the file was persisted + assert len(data) == 1