Skip to content

Commit

Permalink
in_tail: Ensure to detach correct watcher on rotation with follow_inodes
Browse files Browse the repository at this point in the history
If `refresh_watchers` run before `update_watcher`,
the old implementation of `update_watcher` detach wrongly the
new TailWatcher which is added in `refresh_watcher`.
This causes the problem of stopping tailing log and handle leak.

The test case `test_updateTW_after_refreshTW` reproduces this
problem.
This fix solves it.

There are another BUG about unwatching.
I adjusted some expected values of the tests for this BUG.
When `refresh_watcher` find the rotated old file AFTER
unwatching it (`rotate_wait`), then the logs will be collected in
duplicate.
If `refresh_watcher` find it BEFORE unwatching it (`rotate_wait`),
this problem doesn't occur because the position entry is still
alive.
We need to fix this and fix the adjusted expected values.

This fix is based on the content and discussion of the following
issue and PR:

* fluent#3614
* fluent#4185
* fluent#4191

Signed-off-by: Daijiro Fukuda <[email protected]>
Co-authored-by: Katuya Kawakami <[email protected]>
Co-authored-by: Masaki Hatada <[email protected]>
Co-authored-by: Gary Zhu <[email protected]>
Co-authored-by: Takuro Ashie <[email protected]>
  • Loading branch information
5 people committed Jun 20, 2023
1 parent 02ae0ae commit 8ec5545
Show file tree
Hide file tree
Showing 2 changed files with 289 additions and 16 deletions.
31 changes: 15 additions & 16 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,8 @@ def close_watcher_handles
end

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(target_info, pe)
path = target_info.path
def update_watcher(watcher, pe, new_inode)
path = watcher.path

log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")

Expand All @@ -499,23 +499,26 @@ def update_watcher(target_info, pe)
end
end

rotated_tw = @tails[path]
new_target_info = TargetInfo.new(path, new_inode)

if @follow_inodes
new_position_entry = @pf[target_info]
# When follow_inodes is true, it's not cleaned up by refresh_watcher.
# So it should be unwatched here explicitly.
watcher.unwatched = true

new_position_entry = @pf[new_target_info]
# If `refresh_watcher` find the new file before, this will not be zero.
# In this case, only we have to do is detaching the current watcher.
if new_position_entry.read_inode == 0
# When follow_inodes is true, it's not cleaned up by refresh_watcher.
# So it should be unwatched here explicitly.
rotated_tw.unwatched = true if rotated_tw
@tails[path] = setup_watcher(target_info, new_position_entry)
@tails[path] = setup_watcher(new_target_info, new_position_entry)
@tails[path].on_notify
end
else
@tails[path] = setup_watcher(target_info, pe)
@tails[path] = setup_watcher(new_target_info, pe)
@tails[path].on_notify
end
detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw

detach_watcher_after_rotate_wait(watcher, pe.read_inode)
end

# TailWatcher#close is called by another thread at shutdown phase.
Expand Down Expand Up @@ -877,18 +880,14 @@ def on_rotate(stat)
# No need to update a watcher if stat is nil (file not present), because moving to inodes will create
# new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method
# don't want to swap state because we need latest read offset in pos file even after rotate_wait
if stat
target_info = TargetInfo.new(@path, stat.ino)
@update_watcher.call(target_info, @pe)
end
@update_watcher.call(self, @pe, stat.ino) if stat
else
# Permit to handle if stat is nil (file not present).
# If a file is mv-ed and a new file is created during
# calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers`
# and `#stop_watchers()` for the path because `target_paths_hash`
# always contains the path.
target_info = TargetInfo.new(@path, stat ? stat.ino : nil)
@update_watcher.call(target_info, swap_state(@pe))
@update_watcher.call(self, swap_state(@pe), stat&.ino)
end
else
@log.info "detected rotation of #{@path}"
Expand Down
274 changes: 274 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2638,4 +2638,278 @@ def test_lines_collected_with_no_throttling(data)
end
end
end

# https://github.com/fluent/fluentd/issues/3614
sub_test_case "Update watchers for rotation with follow_inodes" do
def test_updateTW_before_refreshTW_and_detach_before_refreshTW
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt*",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
"follow_inodes" => "true",
# In order to detach the old watcher quickly.
"rotate_wait" => "1s",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
# stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file,
# so disable it in order to reproduce the same condition stably.
"enable_stat_watcher" => "false",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 4, timeout: 10) do
# Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation)
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1")
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
sleep 3

# This reproduces the following situation:
# Rotation => update_watcher => refresh_watchers
# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
d.instance.refresh_watchers

# Append to the new current log file.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"}
end

inode_0 = tail_watchers[0].ino
inode_1 = tail_watchers[1].ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
position_entries: position_entries
},
)
end

def test_updateTW_before_refreshTW_and_detach_after_refreshTW
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt*",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
"follow_inodes" => "true",
# In order to detach the old watcher after refresh_watchers.
"rotate_wait" => "4s",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
# stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file,
# so disable it in order to reproduce the same condition stably.
"enable_stat_watcher" => "false",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 4, timeout: 10) do
# Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation)
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1")
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
sleep 2

# This reproduces the following situation:
# Rotation => update_watcher => refresh_watchers
# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
d.instance.refresh_watchers

# The old TailWathcer is detached here since `rotate_wait` is `4s`.
sleep 3

# Append to the new current log file.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"}
end

inode_0 = tail_watchers[0].ino
inode_1 = tail_watchers[1].ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
],
},
{
record_values: record_values,
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
position_entries: position_entries
},
)
end

def test_updateTW_after_refreshTW
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt*",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
"follow_inodes" => "true",
# In order to detach the old watcher quickly.
"rotate_wait" => "1s",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
# stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file,
# so disable it in order to reproduce the same condition stably.
"enable_stat_watcher" => "false",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 4, timeout: 10) do
# Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation)
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1")
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# This reproduces the following situation:
# Rotation => refresh_watchers => update_watcher
# This add a new TailWatcher: TailWatcher(path: "tail.txt", inode: inode_1)
# This overwrites `@tails["tail.txt"]`
d.instance.refresh_watchers

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
sleep 3

# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
d.instance.refresh_watchers

# Append to the new current log file.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"}
end

inode_0 = tail_watchers[0].ino
inode_1 = tail_watchers[1].ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
# TODO: This is BUG!! We need to fix it and replace this with the next.
record_values: ["file1 log1", "file1 log1", "file1 log2", "file1 log2", "file2 log1", "file2 log2"],
# record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
# TODO: This is BUG!! We need to fix it and replace this with the next.
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
["#{@tmp_dir}/tail.txt1", "0000000000000016", inode_0],
],
# position_entries: [
# ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
# ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
# ],
},
{
record_values: record_values,
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
position_entries: position_entries
},
)
end
end
end

0 comments on commit 8ec5545

Please sign in to comment.