From ab81c166a22a011760bce3f4729e4a45055f432b Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Tue, 31 Oct 2023 10:08:53 +0900 Subject: [PATCH 1/6] in_tail: Manage tail watchers under `rotate_wait` state After a tail watcher transitions to `rotate_wait` state, the `rotate_wait` timer is no longer managed by in_tail, which might cause unexpected behaviour. e.g.) * It's never unwatched when shutdown occurs before `rotate_wait` passed. * Needless `rotate_wait` timers are executed when it detects more rotations. This patch fixes such unexpected behaviour. Signed-off-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 16 ++++++++++++++-- test/plugin/test_in_tail.rb | 3 --- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 44761b4061..e1d9b0b8ee 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -52,6 +52,7 @@ def initialize super @paths = [] @tails = {} + @tails_rotate_wait = {} @pf_file = nil @pf = nil @ignore_list = [] @@ -267,6 +268,9 @@ def shutdown @shutdown_start_time = Fluent::Clock.now # during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close. stop_watchers(existence_path, immediate: true, remove_watcher: false) + @tails_rotate_wait.keys.each do |tw| + detach_watcher(tw, @tails_rotate_wait[tw][:ino]) + end @pf_file.close if @pf_file super @@ -590,6 +594,8 @@ def detach_watcher(tw, ino, close_io = true) target_info = TargetInfo.new(tw.path, ino) @pf.unwatch(target_info) end + + @tails_rotate_wait.delete(tw) end def throttling_is_enabled?(tw) @@ -604,7 +610,11 @@ def detach_watcher_after_rotate_wait(tw, ino) if @open_on_every_update # Detach now because it's already closed, waiting it doesn't make sense. detach_watcher(tw, ino) - elsif throttling_is_enabled?(tw) + end + + return if @tails_rotate_wait[tw] + + if throttling_is_enabled?(tw) # When the throttling feature is enabled, it might not reach EOF yet. 
# Should ensure to read all contents before closing it, with keeping throttling. start_time_to_wait = Fluent::Clock.now @@ -615,11 +625,13 @@ def detach_watcher_after_rotate_wait(tw, ino) detach_watcher(tw, ino) end end + @tails_rotate_wait[tw] = { ino: ino, timer: timer } else # when the throttling feature isn't enabled, just wait @rotate_wait - timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do + timer = timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do detach_watcher(tw, ino) end + @tails_rotate_wait[tw] = { ino: ino, timer: timer } end end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 8919866683..87d5015045 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -3084,9 +3084,6 @@ def test_refreshTW_during_rotation sleep 3 Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"} - - # Wait `rotate_wait` for file2 to make sure to close all IO handlers - sleep 3 end inode_0 = tail_watchers[0]&.ino From 441b38b3c31605074f7ac8173051939f299357c2 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 25 Jan 2024 21:44:55 +0900 Subject: [PATCH 2/6] in_tail: Add test for fast rotation Signed-off-by: Daijiro Fukuda --- test/plugin/test_in_tail.rb | 166 ++++++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 87d5015045..b57df21e40 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -3016,6 +3016,92 @@ def test_path_resurrection }, ) end + + def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + "rotate_wait" => "3s", + "refresh_interval" => "1h", + # stat_watcher often calls 
`TailWatcher::on_notify` faster than creating a new log file, + # so disable it in order to reproduce the same condition stably. + "enable_stat_watcher" => "false", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + mock.proxy(tw).close.once # Note: Currently, there is no harm in duplicate calls. + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 6, timeout: 15) do + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"} + + sleep 1 + + FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"} + + sleep 1 + + # Rotate again (Old TailWatcher waiting rotate_wait also calls update_watcher) + [1, 0].each do |i| + FileUtils.move("#{@tmp_dir}/tail.txt#{i}", "#{@tmp_dir}/tail.txt#{i + 1}") + end + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file3 log1"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"} + + # Wait rotate_wait to confirm that TailWatcher.close is not called in duplicate. 
+ # (Note: Currently, there is no harm in duplicate calls) + sleep 4 + end + + inode_0 = tail_watchers[0]&.ino + inode_1 = tail_watchers[1]&.ino + inode_2 = tail_watchers[2]&.ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2", "file3 log1", "file3 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0"], + tail_watcher_inodes: [inode_0, inode_1, inode_2], + tail_watcher_io_handler_opened_statuses: [false, false, false], + position_entries: [ + ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_0], + ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_1], + ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_2], + ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? 
|| false }, + position_entries: position_entries + }, + ) + end end sub_test_case "Update watchers for rotation without follow_inodes" do @@ -3118,5 +3204,85 @@ def test_refreshTW_during_rotation }, ) end + + def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt0", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "rotate_wait" => "3s", + "refresh_interval" => "1h", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + mock.proxy(tw).close.once # Note: Currently, there is no harm in duplicate calls. + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 6, timeout: 15) do + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"} + + sleep 1 + + FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"} + + sleep 1 + + # Rotate again (Old TailWatcher waiting rotate_wait also calls update_watcher) + [1, 0].each do |i| + FileUtils.move("#{@tmp_dir}/tail.txt#{i}", "#{@tmp_dir}/tail.txt#{i + 1}") + end + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file3 log1"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"} + + # Wait rotate_wait to confirm that TailWatcher.close is not called in duplicate. 
+ # (Note: Currently, there is no harm in duplicate calls) + sleep 4 + end + + inode_0 = tail_watchers[0]&.ino + inode_1 = tail_watchers[1]&.ino + inode_2 = tail_watchers[2]&.ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2", "file3 log1", "file3 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0"], + tail_watcher_inodes: [inode_0, inode_1, inode_2], + tail_watcher_io_handler_opened_statuses: [false, false, false], + position_entries: [ + ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_2], + ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end end end From 341d8b89eb7314ac72ad663907a8cc05904c21ea Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Wed, 31 Jan 2024 11:49:05 +0900 Subject: [PATCH 3/6] in_tail: Make deletion be done on the timer callback side Logic inside `detach_watcher()` should be independent of `rotate_wait`. 
Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/in_tail.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index e1d9b0b8ee..e4866e157b 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -594,8 +594,6 @@ def detach_watcher(tw, ino, close_io = true) target_info = TargetInfo.new(tw.path, ino) @pf.unwatch(target_info) end - - @tails_rotate_wait.delete(tw) end def throttling_is_enabled?(tw) @@ -622,6 +620,7 @@ def detach_watcher_after_rotate_wait(tw, ino) elapsed = Fluent::Clock.now - start_time_to_wait if tw.eof? && elapsed >= @rotate_wait timer.detach + @tails_rotate_wait.delete(tw) detach_watcher(tw, ino) end end @@ -629,6 +628,7 @@ def detach_watcher_after_rotate_wait(tw, ino) else # when the throttling feature isn't enabled, just wait @rotate_wait timer = timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do + @tails_rotate_wait.delete(tw) detach_watcher(tw, ino) end @tails_rotate_wait[tw] = { ino: ino, timer: timer } From b8315c7a232f3df60ffab2150fd890e5fe44521c Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Fri, 8 Mar 2024 14:58:32 +0900 Subject: [PATCH 4/6] in_tail: Make closing io handler be done in closing phase Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/in_tail.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index e4866e157b..f6f12441b7 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -269,7 +269,7 @@ def shutdown # during shutdown phase, don't close io. It should be done in close after all threads are stopped. See close. 
stop_watchers(existence_path, immediate: true, remove_watcher: false) @tails_rotate_wait.keys.each do |tw| - detach_watcher(tw, @tails_rotate_wait[tw][:ino]) + detach_watcher(tw, @tails_rotate_wait[tw][:ino], false) end @pf_file.close if @pf_file @@ -279,6 +279,7 @@ def shutdown def close super # close file handles after all threads stopped (in #close of thread plugin helper) + # It may be because we need to wait IOHandler.ready_to_shutdown() close_watcher_handles end @@ -520,6 +521,9 @@ def close_watcher_handles tw.close end end + @tails_rotate_wait.keys.each do |tw| + tw.close + end end # refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety. From 13dd299937f086fb82ac3a412867dabce03eab7d Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Wed, 31 Jan 2024 12:33:37 +0900 Subject: [PATCH 5/6] in_tail: Remove old comments The comments were added in 76f246ae6a5a543c2b302b1a1f61a4223be177eb. At that time, closing was done by event-loop. Now, the situation is completely different. Currently, there are comments about the `close_io` option in `shutdown()` and `close()`. So, it would be enough to remove these comments. Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/in_tail.rb | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index f6f12441b7..7e0289f65b 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -578,10 +578,6 @@ def update_watcher(tail_watcher, pe, new_inode) detach_watcher_after_rotate_wait(tail_watcher, pe.read_inode) end - # TailWatcher#close is called by another thread at shutdown phase. - # It causes 'can't modify string; temporarily locked' error in IOHandler - # so adding close_io argument to avoid this problem. 
- # At shutdown, IOHandler's io will be released automatically after detached the event loop def detach_watcher(tw, ino, close_io = true) if @follow_inodes && tw.ino != ino log.warn("detach_watcher could be detaching an unexpected tail_watcher with a different ino.", From d28f32e9716196c62cd48b0a113bc38b3abb9b7a Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 7 Mar 2024 18:42:00 +0900 Subject: [PATCH 6/6] in_tail: Make the new tests stable `sleep 1s` is too short for `enable_stat_watcher false`. Signed-off-by: Daijiro Fukuda --- test/plugin/test_in_tail.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index b57df21e40..58006c0b98 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -3049,13 +3049,13 @@ def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait d.run(expect_records: 6, timeout: 15) do Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"} - sleep 1 + sleep 1.5 # Need to be larger than 1s (the interval of watch_timer) FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1") Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"} Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"} - sleep 1 + sleep 1.5 # Need to be larger than 1s (the interval of watch_timer) # Rotate again (Old TailWatcher waiting rotate_wait also calls update_watcher) [1, 0].each do |i| @@ -3233,13 +3233,13 @@ def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait d.run(expect_records: 6, timeout: 15) do Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"} - sleep 1 + sleep 1.5 # Need to be larger than 1s (the interval of watch_timer) FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1") Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"} 
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"} - sleep 1 + sleep 1.5 # Need to be larger than 1s (the interval of watch_timer) # Rotate again (Old TailWatcher waiting rotate_wait also calls update_watcher) [1, 0].each do |i|