From 45b193fbdb264d8ff41832e56aa7227e010ac17c Mon Sep 17 00:00:00 2001 From: Katuya Kawakami Date: Tue, 11 Jul 2023 18:42:33 +0900 Subject: [PATCH] in_tail: use inode for key of TailWatchers when follow_inodes This may improve the maintainability. Note: Regardless of `follow_inodes`, both path or inode is acceptable for the key of `tails`. It is not wrong that the current logic uses `path` for the key. Perhaps we don't need to use different keys depending on `follow_inodes`. We can always use inode for the key. (Future work) Signed-off-by: Katuya Kawakami Signed-off-by: Masaki Hatada Co-authored-by: Daijiro Fukuda --- lib/fluent/plugin/in_tail.rb | 48 ++++++++++++++++++++++++++--------- test/plugin/test_in_tail.rb | 49 ++++++++++++++++++++++++++++-------- 2 files changed, 74 insertions(+), 23 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 88556ca6ab..ea1fb7f6be 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -352,13 +352,13 @@ def expand_paths def existence_path hash = {} - @tails.each {|path, tw| + @tails.each_value do |tw| if @follow_inodes hash[tw.ino] = TargetInfo.new(tw.path, tw.ino) else hash[tw.path] = TargetInfo.new(tw.path, tw.ino) end - } + end hash end @@ -443,7 +443,12 @@ def construct_watcher(target_info) return end - @tails[path] = tw + if @follow_inodes + @tails[target_info.ino] = tw + else + @tails[path] = tw + end + tw.on_notify end @@ -459,9 +464,17 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch remove_path_from_group_watcher(target_info.path) if remove_watcher - tw = @tails.delete(target_info.path) + if @follow_inodes + tw = @tails.delete(target_info.ino) + else + tw = @tails.delete(target_info.path) + end else - tw = @tails[target_info.path] + if @follow_inodes + tw = @tails[target_info.ino] + else + tw = @tails[target_info.path] + end end if tw tw.unwatched = unwatched @@ -475,8 +488,8 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch end def close_watcher_handles - @tails.keys.each do |path| - tw = @tails.delete(path) + @tails.keys.each do |key| + tw = @tails.delete(key) if tw tw.close end @@ -502,16 +515,27 @@ def update_watcher(tail_watcher, pe, new_inode) new_target_info = TargetInfo.new(path, new_inode) if @follow_inodes - # When follow_inodes is true, it's not cleaned up by refresh_watcher. - # So it should be unwatched here explicitly. + @tails.delete(tail_watcher.ino) + + # TODO: This can cause log duplication. We need to fix this. + # (This problem exists from the start of implementation of follow_inodes) + # + # The old inode can still exist. + # In that case, we can't unwatch it because it causes log duplication. + # (`refresh_watcher` will find the inode as a new inode and read them again.) + # + # However, if the old inode already has disappeared, it must be unwatched here. + # It is because the old inode is removed from `@tails`, so `refresh_watcher` can't + # recognize that it has disappeared and need to be unwatched. + # So simply removing this line causes leak of unwatch. tail_watcher.unwatched = true new_position_entry = @pf[new_target_info] # If `refresh_watcher` find the new file before, this will not be zero. - # In this case, only we have to do is detaching the current tail_watcher. + # In this case, we don't need to create a new TailWatcher. if new_position_entry.read_inode == 0 - @tails[path] = setup_watcher(new_target_info, new_position_entry) - @tails[path].on_notify + @tails[new_inode] = setup_watcher(new_target_info, new_position_entry) + @tails[new_inode].on_notify end else @tails[path] = setup_watcher(new_target_info, pe) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 81bf0dfa7d..d2e76122de 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2156,13 +2156,13 @@ def test_should_close_watcher_after_rotate_wait target_info = create_target_info("#{@tmp_dir}/tail.txt") mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, true, anything, nil, anything, anything).once d.run(shutdown: false) - assert d.instance.instance_variable_get(:@tails)[target_info.path] + assert d.instance.instance_variable_get(:@tails)[target_info.ino] Timecop.travel(now + 10) do d.instance.instance_eval do - sleep 0.1 until @tails[target_info.path] == nil + sleep 0.1 until @tails[target_info.ino] == nil end - assert_nil d.instance.instance_variable_get(:@tails)[target_info.path] + assert_nil d.instance.instance_variable_get(:@tails)[target_info.ino] end d.instance_shutdown end @@ -2679,12 +2679,20 @@ def test_updateTW_before_refreshTW_and_detach_before_refreshTW # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) - # The old TailWathcer is detached here since `rotate_wait` is just `1s`. + # @tails: + # @tails => { + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # } + # The old TailWatcher(path: "tail.txt", inode: inode_0) is detached here since `rotate_wait` is just `1s`. sleep 3 # This reproduces the following situation: # Rotation => update_watcher => refresh_watchers # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + # @tails => { + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # inode_0: TailWatcher(path: "tail.txt1", inode: inode_0), + # } d.instance.refresh_watchers # Append to the new current log file. @@ -2769,15 +2777,23 @@ def test_updateTW_before_refreshTW_and_detach_after_refreshTW Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: - # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + # @tails: + # @tails => { + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # } sleep 2 # This reproduces the following situation: - # Rotation => update_watcher => refresh_watchers + # Rotation => update_watcher => refresh_watchers # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + # @tails => { + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # inode_0: TailWatcher(path: "tail.txt1", inode: inode_0), + # } d.instance.refresh_watchers - # The old TailWathcer is detached here since `rotate_wait` is `4s`. + # The old TailWatcher(path: "tail.txt", inode: inode_0) is detached here since `rotate_wait` is `4s`. sleep 3 # Append to the new current log file. @@ -2856,17 +2872,28 @@ def test_updateTW_after_refreshTW Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} # This reproduces the following situation: - # Rotation => refresh_watchers => update_watcher + # Rotation => refresh_watchers => update_watcher # This add a new TailWatcher: TailWatcher(path: "tail.txt", inode: inode_1) - # This overwrites `@tails["tail.txt"]` + # @tails => { + # inode_0: TailWatcher(path: "tail.txt", inode: inode_0), + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # } d.instance.refresh_watchers - # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` trys to update the TailWatcher: # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) - # The old TailWathcer is detached here since `rotate_wait` is just `1s`. + # However, it is already added in `refresh_watcher`, so this only remove and detach the old TailWatcher. + # It is detached here since `rotate_wait` is just `1s`. + # @tails => { + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # } sleep 3 # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + # @tails => { + # inode_1: TailWatcher(path: "tail.txt", inode: inode_1), + # inode_0: TailWatcher(path: "tail.txt1", inode: inode_0), + # } d.instance.refresh_watchers # Append to the new current log file.