From 781e31319649e0a229dedd7abca8bc8d20a76034 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Fri, 14 Jul 2023 15:56:21 +0900 Subject: [PATCH] in_tail: Ensure to discard TailWatcher with missing target when follow_inodes For example, when a rotation process is slow, there is a small time lag between moving and adding files. There is a possibility that StatWatcher notifies too quickly to the TailWatcher before the new file is moved to that target path. From the TailWatcher, it appears as if the file is resurrected once it disappeared. In this case, `refresh_watchers` can't recognize it, so TailWatcher needs to discard self correctly. In the previous implementation, it was not done. So it caused the handle leak and log duplication. (Please check the added test-case) Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/in_tail.rb | 30 +++++++-- test/plugin/test_in_tail.rb | 117 +++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index a58e6b39d9..54fd900039 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -498,6 +498,24 @@ def close_watcher_handles # refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety. def update_watcher(tail_watcher, pe, new_inode) + # TODO we should use another callback for this. + # To suppress impact to existing logic, limit the case to `@follow_inodes`. + # We may not need `@follow_inodes` condition. + if @follow_inodes && new_inode.nil? + # nil inode means the file disappeared, so we only need to stop it. + @tails.delete(tail_watcher.path) + # https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632 + # Because of this problem, log duplication can occur during `rotate_wait`. + # Need to set `rotate_wait 0` for a workaround. + # Duplication will occur if `refresh_watchers` is called during the `rotate_wait`. 
+ # In that case, `refresh_watchers` will add the new TailWatcher to tail the same target, + # and it causes the log duplication. + # (Other `detach_watcher_after_rotate_wait` may have the same problem. + # We need the mechanism not to add duplicated TailWatcher with detaching TailWatcher.) + detach_watcher_after_rotate_wait(tail_watcher, pe.read_inode) + return + end + path = tail_watcher.path log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds") @@ -890,10 +908,14 @@ def on_rotate(stat) if watcher_needs_update if @follow_inodes - # No need to update a watcher if stat is nil (file not present), because moving to inodes will create - # new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method - # don't want to swap state because we need latest read offset in pos file even after rotate_wait - @update_watcher.call(self, @pe, stat.ino) if stat + # If stat is nil (file not present), NEED to stop and discard this watcher. + # When the file disappears but is resurrected soon, then `#refresh_watchers` + # can't recognize this TailWatcher needs to be stopped. + # This can happen when the file is rotated. + # If a notify comes before the new file for the path is created during rotation, + # then it appears as if the file was resurrected once it disappeared. + # Don't want to swap state because we need latest read offset in pos file even after rotate_wait + @update_watcher.call(self, @pe, stat&.ino) else # Permit to handle if stat is nil (file not present). 
# If a file is mv-ed and a new file is created during diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 9d2cfeca39..1546bf9d4d 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2899,5 +2899,122 @@ def test_updateTW_after_refreshTW }, ) end + + def test_path_resurrection + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + # In order to reproduce the same condition stably, ensure that `refresh_watchers` is not + # called by a timer. + "refresh_interval" => "1h", + # https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632 + # Because of this problem, log duplication can occur during `rotate_wait`. + # Need to set `rotate_wait 0` for a workaround. + "rotate_wait" => "0s", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 5, timeout: 10) do + # Rotate + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"} + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") + # TailWatcher(path: "tail.txt", inode: inode_0) detects `tail.txt` disappeared. + # Call `update_watcher` to stop and discard self. + # If not discarding, then it will be an orphan and cause leak and log duplication. + # + # This reproduces the case where the notify to TailWatcher comes before the new file for the path + # is created during rotation. + # (stat_watcher notifies faster than a new file is created) + # Overall, this is a rotation operation, but from the TailWatcher, it appears as if the file + # was resurrected once it disappeared. 
+ sleep 2 # On Windows and macOS, StatWatcher doesn't work, so need enough interval for TimeTrigger. + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} + + # Add new TailWatchers + # tail.txt: TailWatcher(path: "tail.txt", inode: inode_1) + # tail.txt1: TailWatcher(path: "tail.txt1", inode: inode_0) + # NOTE: If not discarding the first TailWatcher on notify, this makes it an orphan because + # this overwrites the `@tails[tail.txt]` by adding TailWatcher(path: "tail.txt", inode: inode_1) + d.instance.refresh_watchers + + # This does nothing. + # NOTE: If not discarding the first TailWatcher on notify, this adds + # tail.txt1: TailWatcher(path: "tail.txt1", inode: inode_0) + # because the previous refresh_watchers overwrites `@tails[tail.txt]` and the inode_0 is lost. + # This would cause log duplication. + d.instance.refresh_watchers + + # Append to the old file + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt1", "ab") {|f| f.puts "file1 log3"} + + # Append to the new current log file. 
+ Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"} + end + + inode_0 = Fluent::FileWrapper.stat("#{@tmp_dir}/tail.txt1").ino + inode_1 = Fluent::FileWrapper.stat("#{@tmp_dir}/tail.txt").ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file1 log3", "file2 log1", "file2 log2"], + tail_watcher_set: Set[ + { + path: "#{@tmp_dir}/tail.txt", + inode: inode_0, + io_handler_opened_status: false, + }, + { + path: "#{@tmp_dir}/tail.txt", + inode: inode_1, + io_handler_opened_status: false, + }, + { + path: "#{@tmp_dir}/tail.txt1", + inode: inode_0, + io_handler_opened_status: false, + }, + ], + position_entries: [ + ["#{@tmp_dir}/tail.txt", "0000000000000021", inode_0], + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + ], + }, + { + record_values: record_values, + tail_watcher_set: Set.new(tail_watchers.collect { |tw| + { + path: tw.path, + inode: tw.ino, + io_handler_opened_status: tw.instance_variable_get(:@io_handler)&.opened? || false, + } + }), + position_entries: position_entries, + }, + ) + end end end