Merge pull request #4239 from daipom/in_tail-ensure-discard-watcher-with-missing-target

in_tail: Ensure to discard TailWatcher with missing target when follow_inodes
ashie authored Jul 14, 2023
2 parents d05f592 + 781e313 commit 32082ea
Showing 2 changed files with 143 additions and 4 deletions.
30 changes: 26 additions & 4 deletions lib/fluent/plugin/in_tail.rb
@@ -498,6 +498,24 @@ def close_watcher_handles

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(tail_watcher, pe, new_inode)
# TODO: we should use another callback for this.
# To suppress the impact on existing logic, limit this handling to the `@follow_inodes` case.
# We may not need the `@follow_inodes` condition.
if @follow_inodes && new_inode.nil?
  # A nil inode means the file disappeared, so we only need to stop the watcher.
  @tails.delete(tail_watcher.path)
  # https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632
  # Because of this problem, log duplication can occur during `rotate_wait`.
  # Setting `rotate_wait 0` works around it.
  # Duplication occurs if `refresh_watchers` is called during `rotate_wait`:
  # in that case, `refresh_watchers` adds a new TailWatcher tailing the same target,
  # which causes the duplication.
  # (Other `detach_watcher_after_rotate_wait` calls may have the same problem.
  # We need a mechanism that avoids adding a duplicated TailWatcher while another one is detaching.)
  detach_watcher_after_rotate_wait(tail_watcher, pe.read_inode)
  return
end

path = tail_watcher.path

log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")
@@ -890,10 +908,14 @@ def on_rotate(stat)

if watcher_needs_update
if @follow_inodes
# No need to update a watcher if stat is nil (file not present), because moving to inodes will create
# new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method
# don't want to swap state because we need latest read offset in pos file even after rotate_wait
@update_watcher.call(self, @pe, stat.ino) if stat
# If stat is nil (file not present), we NEED to stop and discard this watcher.
# If the file disappears but is resurrected soon afterwards, `#refresh_watchers`
# can't recognize that this TailWatcher needs to be stopped.
# This can happen when the file is rotated:
# if a notification arrives before the new file for the path is created during rotation,
# it appears as if the file was resurrected after it disappeared.
# Don't want to swap state because we need the latest read offset in the pos file even after rotate_wait
@update_watcher.call(self, @pe, stat&.ino)
else
# Permit to handle if stat is nil (file not present).
# If a file is mv-ed and a new file is created during
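For reference, the `rotate_wait 0` workaround mentioned in the comments above corresponds to an in_tail configuration roughly like this minimal sketch (the path, pos_file, and tag values are placeholders; the parameters mirror the ones used by the new test below):

    <source>
      @type tail
      path /var/log/app/tail.txt*
      pos_file /var/log/fluentd/tail.pos
      tag t1
      read_from_head true
      follow_inodes true
      # Workaround for the duplication issue described in
      # https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632
      rotate_wait 0s
      <parse>
        @type none
      </parse>
    </source>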
117 changes: 117 additions & 0 deletions test/plugin/test_in_tail.rb
@@ -2899,5 +2899,122 @@ def test_updateTW_after_refreshTW
},
)
end

def test_path_resurrection
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt*",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
"follow_inodes" => "true",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
# https://github.com/fluent/fluentd/pull/4237#issuecomment-1633358632
# Because of this problem, log duplication can occur during `rotate_wait`.
# Setting `rotate_wait 0` works around it.
"rotate_wait" => "0s",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 5, timeout: 10) do
# Rotate
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1")
# TailWatcher(path: "tail.txt", inode: inode_0) detects `tail.txt` disappeared.
# Call `update_watcher` to stop and discard self.
# If not discarding, then it will be a orphan and cause leak and log duplication.
#
# This reproduces the case where the notify to TailWatcher comes before the new file for the path
# is created during rotation.
# (stat_watcher notifies faster than a new file is created)
# Overall, this is a rotation operation, but from the TailWatcher, it appears as if the file
# was resurrected once it disappeared.
sleep 2 # On Windows and macOS, StatWatcher doesn't work, so need enough interval for TimeTrigger.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# Add new TailWatchers:
# tail.txt:  TailWatcher(path: "tail.txt", inode: inode_1)
# tail.txt1: TailWatcher(path: "tail.txt1", inode: inode_0)
# NOTE: If the first TailWatcher were not discarded on notify, this would make it an orphan,
# because this overwrites `@tails["tail.txt"]` by adding TailWatcher(path: "tail.txt", inode: inode_1).
d.instance.refresh_watchers

# This does nothing.
# NOTE: If the first TailWatcher were not discarded on notify, this would add
#   tail.txt1: TailWatcher(path: "tail.txt1", inode: inode_0)
# because the previous refresh_watchers overwrites `@tails["tail.txt"]` and inode_0 is lost.
# This would cause log duplication.
d.instance.refresh_watchers

# Append to the old file
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt1", "ab") {|f| f.puts "file1 log3"}

# Append to the new current log file.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"}
end

inode_0 = Fluent::FileWrapper.stat("#{@tmp_dir}/tail.txt1").ino
inode_1 = Fluent::FileWrapper.stat("#{@tmp_dir}/tail.txt").ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
record_values: ["file1 log1", "file1 log2", "file1 log3", "file2 log1", "file2 log2"],
tail_watcher_set: Set[
{
path: "#{@tmp_dir}/tail.txt",
inode: inode_0,
io_handler_opened_status: false,
},
{
path: "#{@tmp_dir}/tail.txt",
inode: inode_1,
io_handler_opened_status: false,
},
{
path: "#{@tmp_dir}/tail.txt1",
inode: inode_0,
io_handler_opened_status: false,
},
],
position_entries: [
["#{@tmp_dir}/tail.txt", "0000000000000021", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
],
},
{
record_values: record_values,
tail_watcher_set: Set.new(tail_watchers.collect { |tw|
{
path: tw.path,
inode: tw.ino,
io_handler_opened_status: tw.instance_variable_get(:@io_handler)&.opened? || false,
}
}),
position_entries: position_entries,
},
)
end
end
end
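As an aside, the position-file assertions above rely on the pos file layout: one tab-separated entry per watched file, holding the path, the read offset as 16 hexadecimal digits, and the inode as 16 hexadecimal digits. A minimal Ruby sketch of decoding one such entry (the line contents are made-up values for illustration):

    line = "/var/log/app/tail.txt\t0000000000000021\t0000000000abcdef"
    path, offset_hex, inode_hex = line.split("\t")
    offset = offset_hex.to_i(16)  # bytes already read from this file (0x21 = 33)
    inode = inode_hex.to_i(16)    # inode the offset belongs to (meaningful with follow_inodes)
    puts "#{path}: offset=#{offset}, inode=#{inode}"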
