diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 7ef6377aab..b62cd05450 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -52,6 +52,7 @@ def initialize super @paths = [] @tails = {} + @tails_rotate_wait = {} @pf_file = nil @pf = nil @ignore_list = [] @@ -267,6 +268,9 @@ def shutdown @shutdown_start_time = Fluent::Clock.now # during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close. stop_watchers(existence_path, immediate: true, remove_watcher: false) + @tails_rotate_wait.keys.each do |tw| + detach_watcher(tw, @tails_rotate_wait[tw][:ino]) + end @pf_file.close if @pf_file super @@ -568,6 +572,8 @@ def detach_watcher(tw, ino, close_io = true) target_info = TargetInfo.new(tw.path, ino) @pf.unwatch(target_info) end + + @tails_rotate_wait.delete(tw) end def throttling_is_enabled?(tw) @@ -582,7 +588,11 @@ def detach_watcher_after_rotate_wait(tw, ino) if @open_on_every_update # Detach now because it's already closed, waiting it doesn't make sense. detach_watcher(tw, ino) - elsif throttling_is_enabled?(tw) + end + + return if @tails_rotate_wait[tw] + + if throttling_is_enabled?(tw) # When the throttling feature is enabled, it might not reach EOF yet. # Should ensure to read all contents before closing it, with keeping throttling. start_time_to_wait = Fluent::Clock.now @@ -593,11 +603,13 @@ def detach_watcher_after_rotate_wait(tw, ino) detach_watcher(tw, ino) end end + @tails_rotate_wait[tw] = { ino: ino, timer: timer } else # when the throttling feature isn't enabled, just wait @rotate_wait - timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do + timer = timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do detach_watcher(tw, ino) end + @tails_rotate_wait[tw] = { ino: ino, timer: timer } end end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 8919866683..87d5015045 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -3084,9 +3084,6 @@ def test_refreshTW_during_rotation sleep 3 Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"} - - # Wait `rotate_wait` for file2 to make sure to close all IO handlers - sleep 3 end inode_0 = tail_watchers[0]&.ino