Skip to content

Commit

Permalink
Merge pull request #3489 from fluent/refactor-in-tail
Browse files Browse the repository at this point in the history
in_tail: Simplify TargetInfo related code
  • Loading branch information
ashie authored May 17, 2022
2 parents e55d409 + bd4782d commit c65d250
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 128 deletions.
69 changes: 30 additions & 39 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,11 @@ def expand_paths

def existence_path
hash = {}
@tails.each_key {|target_info|
@tails.each {|path, tw|
if @follow_inodes
hash[target_info.ino] = target_info
hash[tw.ino] = TargetInfo.new(tw.path, tw.ino)
else
hash[target_info.path] = target_info
hash[tw.path] = TargetInfo.new(tw.path, tw.ino)
end
}
hash
Expand Down Expand Up @@ -425,36 +425,31 @@ def setup_watcher(target_info, pe)
end

def construct_watcher(target_info)
path = target_info.path

# The file might be rotated or removed after collecting paths, so check inode again here.
begin
target_info.ino = Fluent::FileWrapper.stat(path).ino
rescue Errno::ENOENT, Errno::EACCES
$log.warn "stat() for #{path} failed. Continuing without tailing it."
return
end

pe = nil
if @pf
pe = @pf[target_info]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0)
rescue Errno::ENOENT, Errno::EACCES
$log.warn "stat() for #{target_info.path} failed. Continuing without tailing it."
end
end
pe.update(target_info.ino, 0) if @read_from_head && pe.read_inode.zero?
end

begin
tw = setup_watcher(target_info, pe)
rescue WatcherSetupError => e
log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}"
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
return
end

begin
target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
@tails.delete(target_info)
@tails[target_info] = tw
tw.on_notify
rescue Errno::ENOENT, Errno::EACCES => e
$log.warn "stat() for #{target_info.path} failed with #{e.class.name}. Drop tail watcher for now."
# explicitly detach and unwatch watcher `tw`.
tw.unwatched = true
detach_watcher(tw, target_info.ino, false)
end
@tails[path] = tw
tw.on_notify
end

def start_watchers(targets_info)
Expand All @@ -469,9 +464,9 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch
remove_path_from_group_watcher(target_info.path)

if remove_watcher
tw = @tails.delete(target_info)
tw = @tails.delete(target_info.path)
else
tw = @tails[target_info]
tw = @tails[target_info.path]
end
if tw
tw.unwatched = unwatched
Expand All @@ -485,8 +480,8 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch
end

def close_watcher_handles
@tails.keys.each do |target_info|
tw = @tails.delete(target_info)
@tails.keys.each do |path|
tw = @tails.delete(path)
if tw
tw.close
end
Expand All @@ -495,20 +490,20 @@ def close_watcher_handles

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(target_info, pe)
log.info("detected rotation of #{target_info.path}; waiting #{@rotate_wait} seconds")
path = target_info.path

log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")

if @pf
pe_inode = pe.read_inode
target_info_from_position_entry = TargetInfo.new(target_info.path, pe_inode)
target_info_from_position_entry = TargetInfo.new(path, pe_inode)
unless pe_inode == @pf[target_info_from_position_entry].read_inode
log.debug "Skip update_watcher because watcher has been already updated by other inotify event"
return
end
end

rotated_target_info = TargetInfo.new(target_info.path, pe.read_inode)
rotated_tw = @tails[rotated_target_info]
new_target_info = target_info.dup
rotated_tw = @tails[path]

if @follow_inodes
new_position_entry = @pf[target_info]
Expand All @@ -517,16 +512,12 @@ def update_watcher(target_info, pe)
# When follow_inodes is true, it's not cleaned up by refresh_watcher.
# So it should be unwatched here explicitly.
rotated_tw.unwatched = true
# Make sure to delete old key, it has a different ino while the hash key is same.
@tails.delete(rotated_target_info)
@tails[new_target_info] = setup_watcher(new_target_info, new_position_entry)
@tails[new_target_info].on_notify
@tails[path] = setup_watcher(target_info, new_position_entry)
@tails[path].on_notify
end
else
# Make sure to delete old key, it has a different ino while the hash key is same.
@tails.delete(rotated_target_info)
@tails[new_target_info] = setup_watcher(new_target_info, pe)
@tails[new_target_info].on_notify
@tails[path] = setup_watcher(target_info, pe)
@tails[path].on_notify
end
detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw
end
Expand Down
16 changes: 1 addition & 15 deletions lib/fluent/plugin/in_tail/position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -250,20 +250,6 @@ def read_inode
end
end

TargetInfo = Struct.new(:path, :ino) do
def ==(other)
return false unless other.is_a?(TargetInfo)
self.path == other.path
end

def hash
self.path.hash
end

def eql?(other)
return false unless other.is_a?(TargetInfo)
self.path == other.path
end
end
TargetInfo = Struct.new(:path, :ino)
end
end
54 changes: 0 additions & 54 deletions test/plugin/in_tail/test_position_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -313,58 +313,4 @@ def build_files(file)
assert_equal 2, f.read_inode
end
end

sub_test_case "TargetInfo equality rules" do
sub_test_case "== operator" do
def test_equal
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1235)

assert_equal t1, t2
end

def test_not_equal
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test2", 1234)

assert_not_equal t1, t2
end
end

sub_test_case "eql? method" do
def test_eql?
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test", 5321)

assert do
t1.eql? t2
end
end

def test_not_eql?
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test2", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test3", 1234)

assert do
!t1.eql? t2
end
end
end

sub_test_case "hash" do
def test_equal
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test", 7321)

assert_equal t1.hash, t2.hash
end

def test_not_equal
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test2", 1234)

assert_not_equal t1.hash, t2.hash
end
end
end
end
45 changes: 25 additions & 20 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1777,8 +1777,7 @@ def test_z_refresh_watchers
end

path = 'test/plugin/data/2010/01/20100102-030405.log'
target_info = Fluent::Plugin::TailInput::TargetInfo.new(path, Fluent::FileWrapper.stat(path).ino)
mock.proxy(plugin).detach_watcher_after_rotate_wait(plugin.instance_variable_get(:@tails)[target_info], target_info.ino)
mock.proxy(plugin).detach_watcher_after_rotate_wait(plugin.instance_variable_get(:@tails)[path], Fluent::FileWrapper.stat(path).ino)

Timecop.freeze(2010, 1, 2, 3, 4, 6) do
path = "test/plugin/data/2010/01/20100102-030406.log"
Expand Down Expand Up @@ -2179,13 +2178,13 @@ def test_should_close_watcher_after_rotate_wait
target_info = create_target_info("#{TMP_DIR}/tail.txt")
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, true, anything, nil, anything, anything).once
d.run(shutdown: false)
assert d.instance.instance_variable_get(:@tails)[target_info]
assert d.instance.instance_variable_get(:@tails)[target_info.path]

Timecop.travel(now + 10) do
d.instance.instance_eval do
sleep 0.1 until @tails[target_info] == nil
sleep 0.1 until @tails[target_info.path] == nil
end
assert_nil d.instance.instance_variable_get(:@tails)[target_info]
assert_nil d.instance.instance_variable_get(:@tails)[target_info.path]
end
d.instance_shutdown
end
Expand Down Expand Up @@ -2216,8 +2215,8 @@ def test_should_create_new_watcher_for_new_file_with_same_name
Timecop.travel(now + 10) do
sleep 3
d.instance.instance_eval do
@tails[path_ino] == nil
@tails[new_path_ino] != nil
@tails[path_ino.path] == nil
@tails[new_path_ino.path] != nil
end
end

Expand Down Expand Up @@ -2276,8 +2275,8 @@ def test_should_replace_target_info
while d.events.size < 1 do
sleep 0.1
end
inodes = d.instance.instance_variable_get(:@tails).keys.collect do |key|
key.ino
inodes = d.instance.instance_variable_get(:@tails).values.collect do |tw|
tw.ino
end
assert_equal([target_info.ino], inodes)

Expand All @@ -2287,8 +2286,8 @@ def test_should_replace_target_info
while d.events.size < 2 do
sleep 0.1
end
inodes = d.instance.instance_variable_get(:@tails).keys.collect do |key|
key.ino
inodes = d.instance.instance_variable_get(:@tails).values.collect do |tw|
tw.ino
end
new_target_info = create_target_info("#{TMP_DIR}/tail.txt")
assert_not_equal(target_info.ino, new_target_info.ino)
Expand Down Expand Up @@ -2458,14 +2457,19 @@ def test_ENOENT_error_after_setup_watcher
'format' => 'none',
})
d = create_driver(config)
mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
cleanup_file(path)
tw
end
file_deleted = false
mock.proxy(d.instance).existence_path do |hash|
unless file_deleted
cleanup_file(path)
file_deleted = true
end
hash
end.twice
assert_nothing_raised do
d.run(shutdown: false) {}
end
assert($log.out.logs.any?{|log| log.include?("stat() for #{path} failed with Errno::ENOENT. Drop tail watcher for now.\n") })
assert($log.out.logs.any?{|log| log.include?("stat() for #{path} failed. Continuing without tailing it.\n") },
$log.out.logs.join("\n"))
ensure
d.instance_shutdown if d && d.instance
end
Expand All @@ -2483,14 +2487,15 @@ def test_EACCES_error_after_setup_watcher
'format' => 'none',
})
d = create_driver(config, false)
mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
mock.proxy(d.instance).existence_path do |hash|
FileUtils.chmod(0000, "#{TMP_DIR}/noaccess")
tw
end
hash
end.twice
assert_nothing_raised do
d.run(shutdown: false) {}
end
assert($log.out.logs.any?{|log| log.include?("stat() for #{path} failed with Errno::EACCES. Drop tail watcher for now.\n") })
assert($log.out.logs.any?{|log| log.include?("stat() for #{path} failed. Continuing without tailing it.\n") },
$log.out.logs.join("\n"))
end
ensure
d.instance_shutdown if d && d.instance
Expand Down

0 comments on commit c65d250

Please sign in to comment.