From 8ec554569aaf9ee069ab48eeee8b9f0e7dcad545 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Wed, 21 Jun 2023 03:48:13 +0900 Subject: [PATCH] in_tail: Ensure to detach correct watcher on rotation with follow_inodes If `refresh_watchers` run before `update_watcher`, the old implementation of `update_watcher` detach wrongly the new TailWatcher which is added in `refresh_watcher`. This causes the problem of stopping tailing log and handle leak. The test case `test_updateTW_after_refreshTW` reproduces this problem. This fix solves it. There are another BUG about unwatching. I adjusted some expected values of the tests for this BUG. When `refresh_watcher` find the rotated old file AFTER unwatching it (`rotate_wait`), then the logs will be collected in duplicate. If `refresh_watcher` find it BEFORE unwatching it (`rotate_wait`), this problem doesn't occur because the position entry is still alive. We need to fix this and fix the adjusted expected values. This fix is based on the content and discussion of the following issue and PR: * #3614 * #4185 * #4191 Signed-off-by: Daijiro Fukuda Co-authored-by: Katuya Kawakami Co-authored-by: Masaki Hatada Co-authored-by: Gary Zhu Co-authored-by: Takuro Ashie --- lib/fluent/plugin/in_tail.rb | 31 ++-- test/plugin/test_in_tail.rb | 274 +++++++++++++++++++++++++++++++++++ 2 files changed, 289 insertions(+), 16 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index fb29dd249f..ab76bb2faf 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -484,8 +484,8 @@ def close_watcher_handles end # refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety. - def update_watcher(target_info, pe) - path = target_info.path + def update_watcher(watcher, pe, new_inode) + path = watcher.path log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds") @@ -499,23 +499,26 @@ def update_watcher(target_info, pe) end end - rotated_tw = @tails[path] + new_target_info = TargetInfo.new(path, new_inode) if @follow_inodes - new_position_entry = @pf[target_info] + # When follow_inodes is true, it's not cleaned up by refresh_watcher. + # So it should be unwatched here explicitly. + watcher.unwatched = true + new_position_entry = @pf[new_target_info] + # If `refresh_watcher` find the new file before, this will not be zero. + # In this case, only we have to do is detaching the current watcher. if new_position_entry.read_inode == 0 - # When follow_inodes is true, it's not cleaned up by refresh_watcher. - # So it should be unwatched here explicitly. - rotated_tw.unwatched = true if rotated_tw - @tails[path] = setup_watcher(target_info, new_position_entry) + @tails[path] = setup_watcher(new_target_info, new_position_entry) @tails[path].on_notify end else - @tails[path] = setup_watcher(target_info, pe) + @tails[path] = setup_watcher(new_target_info, pe) @tails[path].on_notify end - detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw + + detach_watcher_after_rotate_wait(watcher, pe.read_inode) end # TailWatcher#close is called by another thread at shutdown phase. @@ -877,18 +880,14 @@ def on_rotate(stat) # No need to update a watcher if stat is nil (file not present), because moving to inodes will create # new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method # don't want to swap state because we need latest read offset in pos file even after rotate_wait - if stat - target_info = TargetInfo.new(@path, stat.ino) - @update_watcher.call(target_info, @pe) - end + @update_watcher.call(self, @pe, stat.ino) if stat else # Permit to handle if stat is nil (file not present). # If a file is mv-ed and a new file is created during # calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers` # and `#stop_watchers()` for the path because `target_paths_hash` # always contains the path. - target_info = TargetInfo.new(@path, stat ? stat.ino : nil) - @update_watcher.call(target_info, swap_state(@pe)) + @update_watcher.call(self, swap_state(@pe), stat&.ino) end else @log.info "detected rotation of #{@path}" diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index b1e28fc986..1d3c5ac760 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2638,4 +2638,278 @@ def test_lines_collected_with_no_throttling(data) end end end + + # https://github.com/fluent/fluentd/issues/3614 + sub_test_case "Update watchers for rotation with follow_inodes" do + def test_updateTW_before_refreshTW_and_detach_before_refreshTW + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + # In order to detach the old watcher quickly. + "rotate_wait" => "1s", + # In order to reproduce the same condition stably, ensure that `refresh_watchers` is not + # called by a timer. + "refresh_interval" => "1h", + # stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file, + # so disable it in order to reproduce the same condition stably. + "enable_stat_watcher" => "false", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 4, timeout: 10) do + # Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation) + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"} + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} + + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: + # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + # The old TailWathcer is detached here since `rotate_wait` is just `1s`. + sleep 3 + + # This reproduces the following situation: + # Rotation => update_watcher => refresh_watchers + # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + d.instance.refresh_watchers + + # Append to the new current log file. + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"} + end + + inode_0 = tail_watchers[0].ino + inode_1 = tail_watchers[1].ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + # TODO: This is BUG!! We need to fix it and replace this with the next. + record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"], + # record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"], + tail_watcher_inodes: [inode_0, inode_1, inode_0], + tail_watcher_io_handler_opened_statuses: [false, false, false], + # TODO: This is BUG!! We need to fix it and replace this with the next. + position_entries: [ + ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + ["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0], + ], + # position_entries: [ + # ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + # ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + # ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end + + def test_updateTW_before_refreshTW_and_detach_after_refreshTW + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + # In order to detach the old watcher after refresh_watchers. + "rotate_wait" => "4s", + # In order to reproduce the same condition stably, ensure that `refresh_watchers` is not + # called by a timer. + "refresh_interval" => "1h", + # stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file, + # so disable it in order to reproduce the same condition stably. + "enable_stat_watcher" => "false", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 4, timeout: 10) do + # Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation) + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"} + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} + + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: + # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + sleep 2 + + # This reproduces the following situation: + # Rotation => update_watcher => refresh_watchers + # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + d.instance.refresh_watchers + + # The old TailWathcer is detached here since `rotate_wait` is `4s`. + sleep 3 + + # Append to the new current log file. + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"} + end + + inode_0 = tail_watchers[0].ino + inode_1 = tail_watchers[1].ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"], + tail_watcher_inodes: [inode_0, inode_1, inode_0], + tail_watcher_io_handler_opened_statuses: [false, false, false], + position_entries: [ + ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end + + def test_updateTW_after_refreshTW + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + # In order to detach the old watcher quickly. + "rotate_wait" => "1s", + # In order to reproduce the same condition stably, ensure that `refresh_watchers` is not + # called by a timer. + "refresh_interval" => "1h", + # stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file, + # so disable it in order to reproduce the same condition stably. + "enable_stat_watcher" => "false", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 4, timeout: 10) do + # Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation) + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"} + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} + + # This reproduces the following situation: + # Rotation => refresh_watchers => update_watcher + # This add a new TailWatcher: TailWatcher(path: "tail.txt", inode: inode_1) + # This overwrites `@tails["tail.txt"]` + d.instance.refresh_watchers + + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: + # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + # The old TailWathcer is detached here since `rotate_wait` is just `1s`. + sleep 3 + + # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + d.instance.refresh_watchers + + # Append to the new current log file. + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"} + end + + inode_0 = tail_watchers[0].ino + inode_1 = tail_watchers[1].ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + # TODO: This is BUG!! We need to fix it and replace this with the next. + record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"], + # record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"], + tail_watcher_inodes: [inode_0, inode_1, inode_0], + tail_watcher_io_handler_opened_statuses: [false, false, false], + # TODO: This is BUG!! We need to fix it and replace this with the next. + position_entries: [ + ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + ["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0], + ], + # position_entries: [ + # ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + # ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + # ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end + end end