Skip to content

Commit

Permalink
Merge pull request #2922 from ganmacs/update-pf-seek
Browse files Browse the repository at this point in the history
Update seek position of position file entry
  • Loading branch information
ganmacs authored Apr 2, 2020
2 parents 1f3e685 + 529af8a commit 55030b8
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 6 deletions.
29 changes: 23 additions & 6 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class TailInput < Fluent::Plugin::Input
class PositionFile
UNWATCHED_POSITION = 0xffffffffffffffff
POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze
POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze

def self.load(file, logger:)
pf = new(file, logger: logger)
Expand Down Expand Up @@ -83,8 +82,8 @@ def try_compact
size = nil

@file_mutex.synchronize do
last_modified = @file.mtime
size = @file.size
last_modified = @file.mtime
end

entries = fetch_compacted_entries
Expand All @@ -93,7 +92,13 @@ def try_compact
if last_modified == @file.mtime && size == @file.size
@file.pos = 0
@file.truncate(0)
@file.write(entries.join)
@file.write(entries.values.map(&:to_entry_fmt).join)

entries.each do |path, val|
if (m = @map[path])
m.seek = val.seek
end
end
else
# skip
end
Expand All @@ -104,7 +109,7 @@ def try_compact

def compact
@file_mutex.synchronize do
entries = fetch_compacted_entries
entries = fetch_compacted_entries.values.map(&:to_entry_fmt)

@file.pos = 0
@file.truncate(0)
Expand All @@ -116,6 +121,7 @@ def fetch_compacted_entries
entries = {}

@file.pos = 0
file_pos = 0
@file.each_line do |line|
m = POSITION_FILE_ENTRY_REGEX.match(line)
if m.nil?
Expand All @@ -133,11 +139,20 @@ def fetch_compacted_entries
@logger.warn("#{path} already exists. use latest one: deleted #{entries[path]}") if @logger
end

entries[path] = (POSITION_FILE_ENTRY_FORMAT % [path, pos, ino])
entries[path] = Entry.new(path, pos, ino, file_pos + path.size + 1)
file_pos += line.size
end
end

entries.values
entries
end
end

Entry = Struct.new(:path, :pos, :ino, :seek) do
POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze

def to_entry_fmt
POSITION_FILE_ENTRY_FORMAT % [path, pos, ino]
end
end

Expand All @@ -158,6 +173,8 @@ def initialize(file, file_mutex, seek, pos, inode)
@inode = inode
end

attr_writer :seek

def update(ino, pos)
@file_mutex.synchronize {
@file.pos = @seek
Expand Down
24 changes: 24 additions & 0 deletions test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,30 @@ def write_data(f, content)
lines = @file.readlines
assert_equal 5, lines.size
end

test 'update seek postion of remained position entry' do
pf = Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log)
pf['path1']
pf['path2']
pf['path3']
pf.unwatch('path1')

pf.try_compact

@file.seek(0)
lines = @file.readlines
assert_equal "path2\t0000000000000000\t0000000000000000\n", lines[0]
assert_equal "path3\t0000000000000000\t0000000000000000\n", lines[1]
assert_equal 2, lines.size

pf.unwatch('path2')
pf.unwatch('path3')
@file.seek(0)
lines = @file.readlines
assert_equal "path2\t#{UNWATCHED_STR}\t0000000000000000\n", lines[0]
assert_equal "path3\t#{UNWATCHED_STR}\t0000000000000000\n", lines[1]
assert_equal 2, lines.size
end
end

sub_test_case '#load' do
Expand Down

0 comments on commit 55030b8

Please sign in to comment.