Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor path and inode arguments related code #3196

Merged
merged 9 commits into from
Dec 14, 2020
70 changes: 35 additions & 35 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,11 @@ def expand_paths

def existence_path
hash = {}
@tails.each_key {|path_ino|
@tails.each_key {|target_info|
if @follow_inodes
hash[path_ino.ino] = path_ino
hash[target_info.ino] = target_info
else
hash[path_ino.path] = path_ino
hash[target_info.path] = target_info
end
}
hash
Expand All @@ -353,9 +353,9 @@ def refresh_watchers
start_watchers(added_hash) unless added_hash.empty?
end

def setup_watcher(path_with_inode, pe)
def setup_watcher(target_info, pe)
line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path_with_inode, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler))
tw = TailWatcher.new(target_info, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler))

if @enable_watch_timer
tt = TimerTrigger.new(1, log) { tw.on_notify }
Expand Down Expand Up @@ -386,78 +386,77 @@ def setup_watcher(path_with_inode, pe)
raise e
end

def start_watchers(paths_with_inodes)
paths_with_inodes.each_value { |path_with_inode|
path = path_with_inode.path
ino = path_with_inode.ino
def start_watchers(targets_info)
targets_info.each_value { |target_info|
pe = nil
if @pf
pe = @pf[path, ino]
pe = @pf[target_info]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0)
rescue Errno::ENOENT
$log.warn "#{path} not found. Continuing without tailing it."
$log.warn "#{target_info.path} not found. Continuing without tailing it."
end
end
end

begin
tw = setup_watcher(path_with_inode, pe)
tw = setup_watcher(target_info, pe)
rescue WatcherSetupError => e
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}"
next
end
target_info = TargetInfo.new(path, Fluent::FileWrapper.stat(path).ino)
target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
@tails[target_info] = tw
}
end

def stop_watchers(paths_with_inodes, immediate: false, unwatched: false, remove_watcher: true)
paths_with_inodes.each_value { |path_with_inode|
def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true)
targets_info.each_value { |target_info|
if remove_watcher
tw = @tails.delete(path_with_inode)
tw = @tails.delete(target_info)
else
tw = @tails[path_with_inode]
tw = @tails[target_info]
end
if tw
tw.unwatched = unwatched
if immediate
detach_watcher(tw, path_with_inode.ino, false)
detach_watcher(tw, target_info.ino, false)
else
detach_watcher_after_rotate_wait(tw, path_with_inode.ino)
detach_watcher_after_rotate_wait(tw, target_info.ino)
end
end
}
end

def close_watcher_handles
@tails.keys.each do |path_with_inode|
tw = @tails.delete(path_with_inode)
@tails.keys.each do |target_info|
tw = @tails.delete(target_info)
if tw
tw.close
end
end
end

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(path_with_inode, pe)
log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")
def update_watcher(target_info, pe)
log.info("detected rotation of #{target_info.path}; waiting #{@rotate_wait} seconds")

if @pf
unless pe.read_inode == @pf[path_with_inode.path, pe.read_inode].read_inode
pe_inode = pe.read_inode
target_info_from_position_entry = TargetInfo.new(target_info.path, pe_inode)
unless pe_inode == @pf[target_info_from_position_entry].read_inode
log.debug "Skip update_watcher because watcher has been already updated by other inotify event"
return
end
end

target_info = TargetInfo.new(path_with_inode.path, pe.read_inode)
cosmo0920 marked this conversation as resolved.
Show resolved Hide resolved
rotated_tw = @tails[target_info]

new_target_info = TargetInfo.new(path_with_inode.path, path_with_inode.ino)
rotated_target_info = TargetInfo.new(target_info.path, pe.read_inode)
rotated_tw = @tails[rotated_target_info]
new_target_info = target_info.dup

if @follow_inodes
new_position_entry = @pf[path_with_inode.path, path_with_inode.ino]
new_position_entry = @pf[target_info]

if new_position_entry.read_inode == 0
@tails[new_target_info] = setup_watcher(new_target_info, new_position_entry)
Expand All @@ -481,7 +480,8 @@ def detach_watcher(tw, ino, close_io = true)
tw.close if close_io

if tw.unwatched && @pf
@pf.unwatch(tw.path, ino)
target_info = TargetInfo.new(tw.path, ino)
@pf.unwatch(target_info)
end
end

Expand Down Expand Up @@ -657,9 +657,9 @@ def on_timer
end

class TailWatcher
def initialize(path_with_inode, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build)
@path = path_with_inode.path
@ino = path_with_inode.ino
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build)
@path = target_info.path
@ino = target_info.ino
@pe = pe || MemoryPositionEntry.new
@read_from_head = read_from_head
@follow_inodes = follow_inodes
Expand Down
31 changes: 12 additions & 19 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,26 @@ def initialize(file, follow_inodes, existing_paths, logger: nil)
@existing_paths = existing_paths
end

def [](path, inode)
if @follow_inodes && m = @map[inode]
return m
elsif !@follow_inodes && m = @map[path]
def [](target_info)
if m = @map[@follow_inodes ? target_info.ino : target_info.path]
return m
end

@file_mutex.synchronize {
@file.seek(0, IO::SEEK_END)
seek = @file.pos + path.bytesize + 1
@file.write "#{path}\t0000000000000000\t0000000000000000\n"
seek = @file.pos + target_info.path.bytesize + 1
@file.write "#{target_info.path}\t0000000000000000\t0000000000000000\n"
if @follow_inodes
@map[inode] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
@map[target_info.ino] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
else
@map[path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
@map[target_info.path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
end
}
end

def unwatch(path, inode)
if @follow_inodes
if (entry = @map.delete(inode))
entry.update_pos(UNWATCHED_POSITION)
end
else
if (entry = @map.delete(path))
entry.update_pos(UNWATCHED_POSITION)
end
def unwatch(target_info)
if (entry = @map.delete(@follow_inodes ? target_info.ino : target_info.path))
entry.update_pos(UNWATCHED_POSITION)
end
end

Expand Down Expand Up @@ -112,8 +104,9 @@ def try_compact
@file.truncate(0)
@file.write(entries.values.map(&:to_entry_fmt).join)

entries.each do |path, val|
if (m = @map[path])
# entry contains path/ino key and value.
entries.each do |key, val|
if (m = @map[key])
m.seek = val.seek
end
end
Expand Down
40 changes: 25 additions & 15 deletions test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,15 @@ def follow_inodes_block

test 'update seek postion of remained position entry' do
pf = Fluent::Plugin::TailInput::PositionFile.new(@file, false, {}, **{logger: $log})
pf['path1', -1]
pf['path2', -1]
pf['path3', -1]
target_info1 = Fluent::Plugin::TailInput::TargetInfo.new('path1', -1)
target_info2 = Fluent::Plugin::TailInput::TargetInfo.new('path2', -1)
target_info3 = Fluent::Plugin::TailInput::TargetInfo.new('path3', -1)
pf[target_info1]
pf[target_info2]
pf[target_info3]

pf.unwatch('path1', 1234)
target_info1_2 = Fluent::Plugin::TailInput::TargetInfo.new('path1', 1234)
pf.unwatch(target_info1_2)

pf.try_compact

Expand All @@ -101,8 +105,10 @@ def follow_inodes_block
assert_equal "path3\t0000000000000000\t0000000000000000\n", lines[1]
assert_equal 2, lines.size

pf.unwatch('path2', 1235)
pf.unwatch('path3', 1236)
target_info2_2 = Fluent::Plugin::TailInput::TargetInfo.new('path2', 1235)
target_info3_2 = Fluent::Plugin::TailInput::TargetInfo.new('path3', 1236)
pf.unwatch(target_info2_2)
pf.unwatch(target_info3_2)
@file.seek(0)
lines = @file.readlines
assert_equal "path2\t#{UNWATCHED_STR}\t0000000000000000\n", lines[0]
Expand Down Expand Up @@ -141,7 +147,8 @@ def follow_inodes_block
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, **{logger: $log})

f = pf['valid_path', Fluent::FileWrapper.stat(@file).ino]
valid_target_info = Fluent::Plugin::TailInput::TargetInfo.new('valid_path', Fluent::FileWrapper.stat(@file).ino)
f = pf[valid_target_info]
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, f.class
assert_equal 2, f.read_pos
assert_equal 1, f.read_inode
Expand All @@ -150,7 +157,8 @@ def follow_inodes_block
lines = @file.readlines
assert_equal 2, lines.size

f = pf['nonexist_path', -1]
nonexistent_target_info = Fluent::Plugin::TailInput::TargetInfo.new('nonexist_path', -1)
f = pf[nonexistent_target_info]
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, f.class
assert_equal 0, f.read_pos
assert_equal 0, f.read_inode
Expand All @@ -165,17 +173,17 @@ def follow_inodes_block
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, logger: $log)

f = pf['nonexist_path', -1]
f = pf[Fluent::Plugin::TailInput::TargetInfo.new('nonexist_path', -1)]
assert_equal 0, f.read_inode
assert_equal 0, f.read_pos

pf['valid_path', Fluent::FileWrapper.stat(@file).ino].update(1, 2)
pf[Fluent::Plugin::TailInput::TargetInfo.new('valid_path', Fluent::FileWrapper.stat(@file).ino)].update(1, 2)

f = pf['nonexist_path', -1]
f = pf[Fluent::Plugin::TailInput::TargetInfo.new('nonexist_path', -1)]
assert_equal 0, f.read_inode
assert_equal 0, f.read_pos

pf['nonexist_path', -1].update(1, 2)
pf[Fluent::Plugin::TailInput::TargetInfo.new('nonexist_path', -1)].update(1, 2)
assert_equal 1, f.read_inode
assert_equal 2, f.read_pos
end
Expand All @@ -186,14 +194,16 @@ def follow_inodes_block
write_data(@file, TEST_CONTENT)
pf = Fluent::Plugin::TailInput::PositionFile.load(@file, false, {}, logger: $log)
inode1 = Fluent::FileWrapper.stat(@file).ino
p1 = pf['valid_path', inode1]
target_info1 = Fluent::Plugin::TailInput::TargetInfo.new('valid_path', inode1)
p1 = pf[target_info1]
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, p1.class

pf.unwatch('valid_path', inode1)
pf.unwatch(target_info1)
assert_equal p1.read_pos, Fluent::Plugin::TailInput::PositionFile::UNWATCHED_POSITION

inode2 = Fluent::FileWrapper.stat(@file).ino
p2 = pf['valid_path', inode2]
target_info2 = Fluent::Plugin::TailInput::TargetInfo.new('valid_path', inode2)
p2 = pf[target_info2]
assert_equal Fluent::Plugin::TailInput::FilePositionEntry, p2.class

assert_not_equal p1, p2
Expand Down
Loading