Skip to content

Commit

Permalink
Fix registrar for rotating files.
Browse files Browse the repository at this point in the history
Backport elastic#1010
  • Loading branch information
ruflin committed Feb 29, 2016
1 parent a7091d0 commit 4488805
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]
*Topbeat*

*Filebeat*
- Fix registrar bug for rotated files {pull}1010[1010]


*Winlogbeat*

Expand All @@ -33,6 +35,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]

*Filebeat*


*Winlogbeat*


Expand Down
8 changes: 3 additions & 5 deletions filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,14 +322,15 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp
return
}

// Call crawler if there exists a state for the given file
offset, resuming := p.registrar.fetchState(file, newinfo.Fileinfo)

// Check for unmodified time, but only if the file modification time is before the last scan started
// This ensures we don't skip genuine creations with dead times less than 10s
if newinfo.Fileinfo.ModTime().Before(p.lastscan) &&
time.Since(newinfo.Fileinfo.ModTime()) > p.ProspectorConfig.IgnoreOlderDuration {

logp.Debug("prospector", "Fetching old state of file to resume: %s", file)
// Call crawler if there exists a state for the given file
offset, resuming := p.registrar.fetchState(file, newinfo.Fileinfo)

// Are we resuming a dead file? We have to resume even if dead so we catch any old updates to the file
// This is safe as the harvester, once it hits the EOF and a timeout, will stop harvesting
Expand All @@ -354,9 +355,6 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp
newinfo.Continue(&lastinfo)
} else {

// Call crawler if there exists a state for the given file
offset, resuming := p.registrar.fetchState(file, newinfo.Fileinfo)

// Are we resuming a file or is this a completely new file?
if resuming {
logp.Debug("prospector", "Resuming harvester on a previously harvested file: %s", file)
Expand Down
45 changes: 45 additions & 0 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,48 @@ def test_custom_registry_file_location(self):
filebeat.kill_and_wait()

assert os.path.isfile(os.path.join(self.working_dir, "a/b/c/registry"))


def test_rotating_file(self):
    """
    Checks that the registry is properly updated after a file is rotated:
    both the renamed file and the freshly created file under the original
    name must end up in the registry with their own offsets.
    """
    self.render_config_template(
        path=os.path.abspath(self.working_dir) + "/log/*"
    )

    os.mkdir(self.working_dir + "/log/")
    testfile = self.working_dir + "/log/test.log"

    filebeat = self.start_filebeat()

    # First line: 9 bytes including the newline, so its end offset is 9.
    with open(testfile, 'w') as f:
        f.write("offset 9\n")

    self.wait_until(
        lambda: self.output_has(lines=1),
        max_timeout=10)

    # Rotate the file out of the way and start a fresh one under the
    # original name, as logrotate would.
    testfilerenamed = self.working_dir + "/log/test.1.log"
    os.rename(testfile, testfilerenamed)

    # Second line: 10 bytes including the newline -> end offset 10.
    with open(testfile, 'w') as f:
        f.write("offset 10\n")

    self.wait_until(
        lambda: self.output_has(lines=2),
        max_timeout=10)

    filebeat.kill_and_wait()

    # Load the registry file written on shutdown.
    data = self.get_dot_filebeat()

    # Make sure the offsets are correctly set.
    # Bug fix: the original used '=' (assignment) instead of '==', which
    # silently overwrote the registry values instead of asserting them.
    assert data[os.path.abspath(testfile)]["offset"] == 10
    assert data[os.path.abspath(testfilerenamed)]["offset"] == 9

    # Check that both files are part of the registrar file.
    assert len(data) == 2

0 comments on commit 4488805

Please sign in to comment.