diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 7ef6377aab..486dc169f6 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -52,6 +52,7 @@ def initialize super @paths = [] @tails = {} + @tails_rotate_wait = {} @pf_file = nil @pf = nil @ignore_list = [] @@ -267,6 +268,9 @@ def shutdown @shutdown_start_time = Fluent::Clock.now # during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close. stop_watchers(existence_path, immediate: true, remove_watcher: false) + @tails_rotate_wait.each do |tw, data| + detach_watcher(tw, data[:ino]) + end @pf_file.close if @pf_file super @@ -582,7 +586,11 @@ def detach_watcher_after_rotate_wait(tw, ino) if @open_on_every_update # Detach now because it's already closed, waiting it doesn't make sense. detach_watcher(tw, ino) - elsif throttling_is_enabled?(tw) + end + + return if @tails_rotate_wait[tw] + + if throttling_is_enabled?(tw) # When the throttling feature is enabled, it might not reach EOF yet. # Should ensure to read all contents before closing it, with keeping throttling. start_time_to_wait = Fluent::Clock.now @@ -591,13 +599,17 @@ def detach_watcher_after_rotate_wait(tw, ino) if tw.eof? && elapsed >= @rotate_wait timer.detach detach_watcher(tw, ino) + @tails_rotate_wait.delete(tw) end end + @tails_rotate_wait[tw] = { ino: ino, timer: timer } else # when the throttling feature isn't enabled, just wait @rotate_wait - timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do + timer = timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do detach_watcher(tw, ino) + @tails_rotate_wait.delete(tw) end + @tails_rotate_wait[tw] = { ino: ino, timer: timer } end end