Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add close_removed and close_renamed, deprecate force_close_files #1909

Merged
merged 2 commits into from
Jun 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d

*Filebeat*

- Stop following symlink. Symlinks are now ignored: {pull}1686[1686]
- Deprecate force_close_files option and replace it with close_removed and close_renamed {issue}1600[1600]

*Winlogbeat*


Expand Down Expand Up @@ -50,6 +53,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d
*Topbeat*

*Filebeat*
- Introdce close_removed and close_renamed harvester options {issue}1600[1600]


*Winlogbeat*

Expand Down
20 changes: 10 additions & 10 deletions filebeat/etc/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,22 @@ filebeat.prospectors:
# The backoff value will be multiplied each time with the backoff_factor until max_backoff is reached
#backoff_factor: 2

# This option closes a file, as soon as the file name changes.
# This config option is recommended on windows only. Filebeat keeps the files it's reading open. This can cause
# issues when the file is removed, as the file will not be fully removed until also Filebeat closes
# the reading. Filebeat closes the file handler after ignore_older. During this time no new file with the
# same name can be created. Turning this feature on the other hand can lead to loss of data
# on rotate files. It can happen that after file rotation the beginning of the new
# file is skipped, as the reading starts at the end. We recommend to leave this option on false
# but lower the ignore_older value to release files faster.
#force_close_files: false
# Close renamed ensures a file handler is closed when the file is renamed or rotated.
# Note: Potential data loss if renamed file is not picked up by prospector.
#close_renamed: false

# When enabling this option, a file handler is closed immidiately in case a file can't be found
# any more. In case the file shows up again later, harvesting will continue at the last known position
# after scan_frequency.
# Note: Potential data loss if file reading was not finished when file was removed.
#close_removed: false

#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
#- input_type: stdin

#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
#filebeat.spool_size: 2048

Expand Down
20 changes: 10 additions & 10 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,22 @@ filebeat.prospectors:
# The backoff value will be multiplied each time with the backoff_factor until max_backoff is reached
#backoff_factor: 2

# This option closes a file, as soon as the file name changes.
# This config option is recommended on windows only. Filebeat keeps the files it's reading open. This can cause
# issues when the file is removed, as the file will not be fully removed until also Filebeat closes
# the reading. Filebeat closes the file handler after ignore_older. During this time no new file with the
# same name can be created. Turning this feature on the other hand can lead to loss of data
# on rotate files. It can happen that after file rotation the beginning of the new
# file is skipped, as the reading starts at the end. We recommend to leave this option on false
# but lower the ignore_older value to release files faster.
#force_close_files: false
# Close renamed ensures a file handler is closed when the file is renamed or rotated.
# Note: Potential data loss if renamed file is not picked up by prospector.
#close_renamed: false

# When enabling this option, a file handler is closed immidiately in case a file can't be found
# any more. In case the file shows up again later, harvesting will continue at the last known position
# after scan_frequency.
# Note: Potential data loss if file reading was not finished when file was removed.
#close_removed: false

#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
#- input_type: stdin

#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
#filebeat.spool_size: 2048

Expand Down
14 changes: 13 additions & 1 deletion filebeat/harvester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/elastic/beats/libbeat/common"

"github.com/dustin/go-humanize"
"github.com/elastic/beats/libbeat/logp"
)

var (
Expand All @@ -22,8 +23,10 @@ var (
BackoffFactor: 2,
MaxBackoff: 10 * time.Second,
CloseOlder: 1 * time.Hour,
ForceCloseFiles: false,
MaxBytes: 10 * (1 << 20), // 10MB
CloseRemoved: false,
CloseRenamed: false,
ForceCloseFiles: false,
Copy link

@urso urso Jun 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still in alpha, let's remove 'ForceCloseFiles' then. Don't treat config as deprecated.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@urso I want to make the migration to 5.0 as seamless as possible. That is why I would prefer to keep this in.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we want to add a 'deprecated' option to go-ucfg, which will print a warning when option is used.

We can find deprecated options in code by grepping for config:.*deprecated. Maybe even with version like: config:",deprecated(5.0)".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be great TBH. Lets do this as soon as support for this is in ucfg (not a priority).

}
)

Expand All @@ -38,6 +41,8 @@ type harvesterConfig struct {
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
CloseOlder time.Duration `config:"close_older"`
CloseRemoved bool `config:"close_removed"`
CloseRenamed bool `config:"close_renamed"`
ForceCloseFiles bool `config:"force_close_files"`
ExcludeLines []*regexp.Regexp `config:"exclude_lines"`
IncludeLines []*regexp.Regexp `config:"include_lines"`
Expand All @@ -48,6 +53,13 @@ type harvesterConfig struct {

func (config *harvesterConfig) Validate() error {

// TODO: remove in 7.0
if config.ForceCloseFiles {
config.CloseRemoved = true
config.CloseRenamed = true
logp.Warn("DEPRECATED: force_close_files was set to true. Use close_removed + close_rename")
}

// Check input type
if _, ok := cfg.ValidInputType[config.InputType]; !ok {
return fmt.Errorf("Invalid input type: %v", config.InputType)
Expand Down
22 changes: 22 additions & 0 deletions filebeat/harvester/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package harvester

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestForceCloseFiles(t *testing.T) {

config := defaultConfig
assert.False(t, config.ForceCloseFiles)
assert.False(t, config.CloseRemoved)
assert.False(t, config.CloseRenamed)

config.ForceCloseFiles = true
config.Validate()

assert.True(t, config.ForceCloseFiles)
assert.True(t, config.CloseRemoved)
assert.True(t, config.CloseRenamed)
}
14 changes: 13 additions & 1 deletion filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func (h *Harvester) Harvest() {
// don't require 'complicated' logic.
cfg := h.Config
readerConfig := reader.LogFileReaderConfig{
ForceClose: cfg.ForceCloseFiles,
CloseRemoved: cfg.CloseRemoved,
CloseRenamed: cfg.CloseRenamed,
CloseOlder: cfg.CloseOlder,
BackoffDuration: cfg.Backoff,
MaxBackoffDuration: cfg.MaxBackoff,
Expand Down Expand Up @@ -68,12 +69,23 @@ func (h *Harvester) Harvest() {
// Partial lines return error and are only read on completion
ts, text, bytesRead, jsonFields, err := readLine(processor)
if err != nil {

if err == reader.ErrFileTruncate {
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)
h.SetOffset(0)
return
}

if err == reader.ErrRemoved {
logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.Path)
return
}

if err == reader.ErrRenamed {
logp.Info("File was renamed: %s Closing because close_renamed is enabled.", h.Path)
return
}

logp.Info("Read line error: %s", err)
return
}
Expand Down
29 changes: 18 additions & 11 deletions filebeat/harvester/reader/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (

var (
ErrFileTruncate = errors.New("detected file being truncated")
ErrForceClose = errors.New("file must be closed")
ErrRenamed = errors.New("file was renamed")
ErrRemoved = errors.New("file was removed")
ErrInactive = errors.New("file inactive")
)

Expand All @@ -27,11 +28,12 @@ type logFileReader struct {
}

type LogFileReaderConfig struct {
ForceClose bool
CloseOlder time.Duration
BackoffDuration time.Duration
MaxBackoffDuration time.Duration
BackoffFactor int
CloseRenamed bool
CloseRemoved bool
}

func NewLogFileReader(
Expand Down Expand Up @@ -72,8 +74,9 @@ func (r *logFileReader) Read(buf []byte) (int, error) {
r.offset += int64(n)
r.lastTimeRead = time.Now()
}

// reset backoff
if err == nil {
// reset backoff
r.backoff = r.config.BackoffDuration
return n, nil
}
Expand Down Expand Up @@ -113,16 +116,20 @@ func (r *logFileReader) Read(buf []byte) (int, error) {
return n, ErrInactive
}

if r.config.ForceClose {
// Check if the file name exists (see #93)
if r.config.CloseRenamed {
// Check if the file can still be found under the same path
if !file.IsSameFile(r.fs.Name(), info) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's keep the code comment Check if the file name still exists.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be: check if file has been renamed, but still exists ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added Check if the file has been renamed. It does not check if the file still exists, it only checks if the file with the same name is this file or an other file. In case no file under the same name exists, it just assumes rename (it could also be removed). But that is checked in the next check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, changed it again to: // Check if the file can still be found under the same path

return n, ErrRenamed
}
}

if r.config.CloseRemoved {
// Check if the file name exists. See https://github.com/elastic/filebeat/issues/93
_, statErr := os.Stat(r.fs.Name())

// Error means file does not exist. If no error, check if same file. If
// not close as rotated.
if statErr != nil || !file.IsSameFile(r.fs.Name(), info) {
logp.Info("Force close file: %s; error: %s", r.fs.Name(), statErr)
// Return directly on windows -> file is closing
return n, ErrForceClose
// Error means file does not exist.
if statErr != nil {
return n, ErrRemoved
}
}

Expand Down
3 changes: 2 additions & 1 deletion filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ filebeat.prospectors:
backoff: 0.1s
backoff_factor: 1
max_backoff: 0.1s
force_close_files: {{force_close_files}}
close_removed: {{close_removed}}
close_renamed: {{close_renamed}}

{% if fields %}
fields:
Expand Down
62 changes: 2 additions & 60 deletions filebeat/tests/system/test_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def test_file_disappear_appear(self):

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*.log",
force_close_files="true",
close_removed="true",
scan_frequency="0.1s"
)
os.mkdir(self.working_dir + "/log/")
Expand Down Expand Up @@ -301,7 +301,7 @@ def test_file_disappear_appear(self):
# Wait until error shows up on windows
self.wait_until(
lambda: self.log_contains(
"Force close file"),
"Closing because close_removed is enabled"),
max_timeout=15)

# Move file to old file name
Expand Down Expand Up @@ -332,64 +332,6 @@ def test_file_disappear_appear(self):
output = self.read_output()
assert len(output) == 5 + 6

def test_force_close(self):
"""
Checks that a file is closed in case it is rotated
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/test.log",
force_close_files="true",
scan_frequency="0.1s"
)
os.mkdir(self.working_dir + "/log/")

testfile1 = self.working_dir + "/log/test.log"
testfile2 = self.working_dir + "/log/test.log.rotated"
file = open(testfile1, 'w')

iterations1 = 5
for n in range(0, iterations1):
file.write("rotation file")
file.write("\n")

file.close()

filebeat = self.start_beat()

# Let it read the file
self.wait_until(
lambda: self.output_has(lines=iterations1), max_timeout=10)

os.rename(testfile1, testfile2)

file = open(testfile1, 'w', 0)
file.write("Hello World\n")
file.close()

# Wait until error shows up on windows
self.wait_until(
lambda: self.log_contains(
"Force close file"),
max_timeout=15)

# Let it read the file
self.wait_until(
lambda: self.output_has(lines=iterations1 + 1), max_timeout=10)

filebeat.check_kill_and_wait()

data = self.get_registry()

# Make sure new file was picked up. As it has the same file name,
# one entry for the new and one for the old should exist
assert len(data) == 2

# Make sure output has 11 entries, the new file was started
# from scratch
output = self.read_output()
#assert len(output) == 5 + 6

def test_new_line_on_existing_file(self):
"""
Checks that filebeat follows future writes to the same
Expand Down
Loading