diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index a58e6b39d9..54fd900039 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -498,6 +498,24 @@ def close_watcher_handles # refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety. def update_watcher(tail_watcher, pe, new_inode) + # TODO we should use another callback for this. + # To supress impact to existing logics, limit the case to `@follow_inodes`. + # We may not need `@follow_inodes` condition. + if @follow_inodes && new_inode.nil? + # nil inode means the file disappeared, so we only need to stop it. + @tails.delete(tail_watcher.path) + # https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632 + # Because of this problem, log duplication can occur during `rotate_wait`. + # Need to set `rotate_wait 0` for a workaround. + # Duplication will occur if `refresh_watcher` is called during the `rotate_wait`. + # In that case, `refresh_watcher` will add the new TailWatcher to tail the same target, + # and it causes the log duplication. + # (Other `detach_watcher_after_rotate_wait` may have the same problem. + # We need the mechanism not to add duplicated TailWathcer with detaching TailWatcher.) + detach_watcher_after_rotate_wait(tail_watcher, pe.read_inode) + return + end + path = tail_watcher.path log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds") @@ -890,10 +908,14 @@ def on_rotate(stat) if watcher_needs_update if @follow_inodes - # No need to update a watcher if stat is nil (file not present), because moving to inodes will create - # new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method - # don't want to swap state because we need latest read offset in pos file even after rotate_wait - @update_watcher.call(self, @pe, stat.ino) if stat + # If stat is nil (file not present), NEED to stop and discard this watcher. + # When the file is disappeared but is resurrected soon, then `#refresh_watcher` + # can't recognize this TailWatcher needs to be stopped. + # This can happens when the file is rotated. + # If a notify comes before the new file for the path is created during rotation, + # then it appears as if the file was resurrected once it disappeared. + # Don't want to swap state because we need latest read offset in pos file even after rotate_wait + @update_watcher.call(self, @pe, stat&.ino) else # Permit to handle if stat is nil (file not present). # If a file is mv-ed and a new file is created during diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 9d2cfeca39..1546bf9d4d 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2899,5 +2899,122 @@ def test_updateTW_after_refreshTW }, ) end + + def test_path_resurrection + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + # In order to reproduce the same condition stably, ensure that `refresh_watchers` is not + # called by a timer. + "refresh_interval" => "1h", + # https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632 + # Because of this problem, log duplication can occur during `rotate_wait`. + # Need to set `rotate_wait 0` for a workaround. + "rotate_wait" => "0s", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 5, timeout: 10) do + # Rotate + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"} + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") + # TailWatcher(path: "tail.txt", inode: inode_0) detects `tail.txt` disappeared. + # Call `update_watcher` to stop and discard self. + # If not discarding, then it will be a orphan and cause leak and log duplication. + # + # This reproduces the case where the notify to TailWatcher comes before the new file for the path + # is created during rotation. + # (stat_watcher notifies faster than a new file is created) + # Overall, this is a rotation operation, but from the TailWatcher, it appears as if the file + # was resurrected once it disappeared. + sleep 2 # On Windows and macOS, StatWatcher doesn't work, so need enough interval for TimeTrigger. + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} + + # Add new TailWatchers + # tail.txt: TailWatcher(path: "tail.txt", inode: inode_1) + # tail.txt: TailWatcher(path: "tail.txt1", inode: inode_0) + # NOTE: If not discarding the first TailWatcher on notify, this makes it a orphan because + # this overwrites the `@tails[tail.txt]` by adding TailWatcher(path: "tail.txt", inode: inode_1) + d.instance.refresh_watchers + + # This does nothing. + # NOTE: If not discarding the first TailWatcher on notify, this add + # tail.txt1: TailWatcher(path: "tail.txt1", inode: inode_0) + # because the previous refresh_watcher overwrites `@tails[tail.txt]` and the inode_0 is lost. + # This would cause log duplication. + d.instance.refresh_watchers + + # Append to the old file + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt1", "ab") {|f| f.puts "file1 log3"} + + # Append to the new current log file. + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"} + end + + inode_0 = Fluent::FileWrapper.stat("#{@tmp_dir}/tail.txt1").ino + inode_1 = Fluent::FileWrapper.stat("#{@tmp_dir}/tail.txt").ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file1 log3", "file2 log1", "file2 log2"], + tail_watcher_set: Set[ + { + path: "#{@tmp_dir}/tail.txt", + inode: inode_0, + io_handler_opened_status: false, + }, + { + path: "#{@tmp_dir}/tail.txt", + inode: inode_1, + io_handler_opened_status: false, + }, + { + path: "#{@tmp_dir}/tail.txt1", + inode: inode_0, + io_handler_opened_status: false, + }, + ], + position_entries: [ + ["#{@tmp_dir}/tail.txt", "0000000000000021", inode_0], + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + ], + }, + { + record_values: record_values, + tail_watcher_set: Set.new(tail_watchers.collect { |tw| + { + path: tw.path, + inode: tw.ino, + io_handler_opened_status: tw.instance_variable_get(:@io_handler)&.opened? || false, + } + }), + position_entries: position_entries, + }, + ) + end end end