diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index b1e28fc986..34feb40ff0 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2638,4 +2638,262 @@ def test_lines_collected_with_no_throttling(data) end end end + + # https://github.com/fluent/fluentd/issues/3614 + sub_test_case "Update watchers for rotation with follow_inodes" do + def test_updateTW_before_refreshTW_and_detach_before_refreshTW + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + # In order to detach the old watcher quickly. + "rotate_wait" => "1s", + # In order to reproduce the same condition stably, ensure that `refresh_watchers` is not + # called by a timer. + "refresh_interval" => "1h", + # stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file, + # so disable it in order to reproduce the same condition stably. + "enable_stat_watcher" => "false", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 4, timeout: 10) do + # Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation) + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"} + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} + + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: + # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + # The old TailWathcer is detached here since `rotate_wait` is just `1s`. + sleep 3 + + # This reproduces the following situation: + # Rotation => update_watcher => refresh_watchers + # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + d.instance.refresh_watchers + + # Append to the new current log file. + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"} + end + + inode_0 = tail_watchers[0].ino + inode_1 = tail_watchers[1].ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"], + tail_watcher_inodes: [inode_0, inode_1, inode_0], + tail_watcher_io_handler_opened_statuses: [false, false, false], + position_entries: [ + ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end + + def test_updateTW_before_refreshTW_and_detach_after_refreshTW + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + # In order to detach the old watcher after refresh_watchers. + "rotate_wait" => "4s", + # In order to reproduce the same condition stably, ensure that `refresh_watchers` is not + # called by a timer. + "refresh_interval" => "1h", + # stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file, + # so disable it in order to reproduce the same condition stably. + "enable_stat_watcher" => "false", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 4, timeout: 10) do + # Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation) + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"} + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} + + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: + # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + sleep 2 + + # This reproduces the following situation: + # Rotation => update_watcher => refresh_watchers + # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + d.instance.refresh_watchers + + # The old TailWathcer is detached here since `rotate_wait` is `4s`. + sleep 3 + + # Append to the new current log file. + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"} + end + + inode_0 = tail_watchers[0].ino + inode_1 = tail_watchers[1].ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"], + tail_watcher_inodes: [inode_0, inode_1, inode_0], + tail_watcher_io_handler_opened_statuses: [false, false, false], + position_entries: [ + ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end + + def test_updateTW_after_refreshTW + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + # In order to detach the old watcher quickly. + "rotate_wait" => "1s", + # In order to reproduce the same condition stably, ensure that `refresh_watchers` is not + # called by a timer. + "refresh_interval" => "1h", + # stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file, + # so disable it in order to reproduce the same condition stably. + "enable_stat_watcher" => "false", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 4, timeout: 10) do + # Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation) + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"} + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"} + + # This reproduces the following situation: + # Rotation => refresh_watchers => update_watcher + # This add a new TailWatcher: TailWatcher(path: "tail.txt", inode: inode_1) + # This overwrites `@tails["tail.txt"]` + d.instance.refresh_watchers + + # `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher: + # TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1) + # The old TailWathcer is detached here since `rotate_wait` is just `1s`. + sleep 3 + + # This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0) + d.instance.refresh_watchers + + # Append to the new current log file. + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"} + end + + inode_0 = tail_watchers[0].ino + inode_1 = tail_watchers[1].ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"], + tail_watcher_inodes: [inode_0, inode_1, inode_0], + tail_watcher_io_handler_opened_statuses: [false, false, false], + position_entries: [ + ["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0], + ["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1], + ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end + end end