Skip to content

Commit

Permalink
Merge pull request #4208 from daipom/in_tail-ensure-detach-correct-wa…
Browse files Browse the repository at this point in the history
…tcher

in_tail: Ensure to detach correct watcher on rotation with follow_inodes
  • Loading branch information
ashie authored Jun 27, 2023
2 parents b71ab13 + 1a86721 commit badef56
Show file tree
Hide file tree
Showing 2 changed files with 290 additions and 16 deletions.
31 changes: 15 additions & 16 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,8 @@ def close_watcher_handles
end

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(target_info, pe)
path = target_info.path
def update_watcher(tail_watcher, pe, new_inode)
path = tail_watcher.path

log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")

Expand All @@ -499,23 +499,26 @@ def update_watcher(target_info, pe)
end
end

rotated_tw = @tails[path]
new_target_info = TargetInfo.new(path, new_inode)

if @follow_inodes
new_position_entry = @pf[target_info]
# When follow_inodes is true, it's not cleaned up by refresh_watcher.
# So it should be unwatched here explicitly.
tail_watcher.unwatched = true

new_position_entry = @pf[new_target_info]
# If `refresh_watcher` find the new file before, this will not be zero.
# In this case, only we have to do is detaching the current tail_watcher.
if new_position_entry.read_inode == 0
# When follow_inodes is true, it's not cleaned up by refresh_watcher.
# So it should be unwatched here explicitly.
rotated_tw.unwatched = true if rotated_tw
@tails[path] = setup_watcher(target_info, new_position_entry)
@tails[path] = setup_watcher(new_target_info, new_position_entry)
@tails[path].on_notify
end
else
@tails[path] = setup_watcher(target_info, pe)
@tails[path] = setup_watcher(new_target_info, pe)
@tails[path].on_notify
end
detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw

detach_watcher_after_rotate_wait(tail_watcher, pe.read_inode)
end

# TailWatcher#close is called by another thread at shutdown phase.
Expand Down Expand Up @@ -877,18 +880,14 @@ def on_rotate(stat)
# No need to update a watcher if stat is nil (file not present), because moving to inodes will create
# new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method
# don't want to swap state because we need latest read offset in pos file even after rotate_wait
if stat
target_info = TargetInfo.new(@path, stat.ino)
@update_watcher.call(target_info, @pe)
end
@update_watcher.call(self, @pe, stat.ino) if stat
else
# Permit to handle if stat is nil (file not present).
# If a file is mv-ed and a new file is created during
# calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers`
# and `#stop_watchers()` for the path because `target_paths_hash`
# always contains the path.
target_info = TargetInfo.new(@path, stat ? stat.ino : nil)
@update_watcher.call(target_info, swap_state(@pe))
@update_watcher.call(self, swap_state(@pe), stat&.ino)
end
else
@log.info "detected rotation of #{@path}"
Expand Down
275 changes: 275 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2638,4 +2638,279 @@ def test_lines_collected_with_no_throttling(data)
end
end
end

sub_test_case "Update watchers for rotation with follow_inodes" do
def test_updateTW_before_refreshTW_and_detach_before_refreshTW
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt*",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
"follow_inodes" => "true",
# In order to detach the old watcher quickly.
"rotate_wait" => "1s",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
# stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file,
# so disable it in order to reproduce the same condition stably.
"enable_stat_watcher" => "false",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 4, timeout: 10) do
# Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation)
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1")
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
sleep 3

# This reproduces the following situation:
# Rotation => update_watcher => refresh_watchers
# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
d.instance.refresh_watchers

# Append to the new current log file.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"}
end

inode_0 = tail_watchers[0].ino
inode_1 = tail_watchers[1].ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
position_entries: position_entries
},
)
end

def test_updateTW_before_refreshTW_and_detach_after_refreshTW
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt*",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
"follow_inodes" => "true",
# In order to detach the old watcher after refresh_watchers.
"rotate_wait" => "4s",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
# stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file,
# so disable it in order to reproduce the same condition stably.
"enable_stat_watcher" => "false",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 4, timeout: 10) do
# Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation)
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1")
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
sleep 2

# This reproduces the following situation:
# Rotation => update_watcher => refresh_watchers
# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
d.instance.refresh_watchers

# The old TailWathcer is detached here since `rotate_wait` is `4s`.
sleep 3

# Append to the new current log file.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"}
end

inode_0 = tail_watchers[0].ino
inode_1 = tail_watchers[1].ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
],
},
{
record_values: record_values,
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
position_entries: position_entries
},
)
end

# The scenario where in_tail wrongly detaches TailWatcher.
# This is reported in https://github.com/fluent/fluentd/issues/4190.
def test_updateTW_after_refreshTW
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt*",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
"follow_inodes" => "true",
# In order to detach the old watcher quickly.
"rotate_wait" => "1s",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
# stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file,
# so disable it in order to reproduce the same condition stably.
"enable_stat_watcher" => "false",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 4, timeout: 10) do
# Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation)
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1")
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# This reproduces the following situation:
# Rotation => refresh_watchers => update_watcher
# This add a new TailWatcher: TailWatcher(path: "tail.txt", inode: inode_1)
# This overwrites `@tails["tail.txt"]`
d.instance.refresh_watchers

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
sleep 3

# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
d.instance.refresh_watchers

# Append to the new current log file.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"}
end

inode_0 = tail_watchers[0].ino
inode_1 = tail_watchers[1].ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
position_entries: position_entries
},
)
end
end
end

0 comments on commit badef56

Please sign in to comment.