From 441b38b3c31605074f7ac8173051939f299357c2 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 25 Jan 2024 21:44:55 +0900 Subject: [PATCH] in_tail: Add test for fast rotation Signed-off-by: Daijiro Fukuda --- test/plugin/test_in_tail.rb | 166 ++++++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 87d5015045..b57df21e40 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -3016,6 +3016,92 @@ def test_path_resurrection }, ) end + + def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "follow_inodes" => "true", + "rotate_wait" => "3s", + "refresh_interval" => "1h", + # stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file, + # so disable it in order to reproduce the same condition stably. + "enable_stat_watcher" => "false", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + mock.proxy(tw).close.once # Note: Currently, there is no harm in duplicate calls. + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 6, timeout: 15) do + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"} + + sleep 1 + + FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"} + + sleep 1 + + # Rotate again (Old TailWatcher waiting rotate_wait also calls update_watcher) + [1, 0].each do |i| + FileUtils.move("#{@tmp_dir}/tail.txt#{i}", "#{@tmp_dir}/tail.txt#{i + 1}") + end + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file3 log1"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"} + + # Wait rotate_wait to confirm that TailWatcher.close is not called in duplicate. + # (Note: Currently, there is no harm in duplicate calls) + sleep 4 + end + + inode_0 = tail_watchers[0]&.ino + inode_1 = tail_watchers[1]&.ino + inode_2 = tail_watchers[2]&.ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2", "file3 log1", "file3 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0"], + tail_watcher_inodes: [inode_0, inode_1, inode_2], + tail_watcher_io_handler_opened_statuses: [false, false, false], + position_entries: [ + ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_0], + ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_1], + ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_2], + ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end end sub_test_case "Update watchers for rotation without follow_inodes" do @@ -3118,5 +3204,85 @@ def test_refreshTW_during_rotation }, ) end + + def test_next_rotation_occurs_very_fast_while_old_TW_still_waiting_rotate_wait + config = config_element( + "ROOT", + "", + { + "path" => "#{@tmp_dir}/tail.txt0", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "format" => "none", + "read_from_head" => "true", + "rotate_wait" => "3s", + "refresh_interval" => "1h", + } + ) + d = create_driver(config, false) + + tail_watchers = [] + stub.proxy(d.instance).setup_watcher do |tw| + tail_watchers.append(tw) + mock.proxy(tw).close.once # Note: Currently, there is no harm in duplicate calls. + tw + end + + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file1 log1"} + + d.run(expect_records: 6, timeout: 15) do + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file1 log2"} + + sleep 1 + + FileUtils.move("#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt" + "1") + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file2 log1"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file2 log2"} + + sleep 1 + + # Rotate again (Old TailWatcher waiting rotate_wait also calls update_watcher) + [1, 0].each do |i| + FileUtils.move("#{@tmp_dir}/tail.txt#{i}", "#{@tmp_dir}/tail.txt#{i + 1}") + end + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "wb") {|f| f.puts "file3 log1"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt0", "ab") {|f| f.puts "file3 log2"} + + # Wait rotate_wait to confirm that TailWatcher.close is not called in duplicate. + # (Note: Currently, there is no harm in duplicate calls) + sleep 4 + end + + inode_0 = tail_watchers[0]&.ino + inode_1 = tail_watchers[1]&.ino + inode_2 = tail_watchers[2]&.ino + record_values = d.events.collect { |event| event[2]["message"] }.sort + position_entries = [] + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f| + f.readlines(chomp: true).each do |line| + values = line.split("\t") + position_entries.append([values[0], values[1], values[2].to_i(16)]) + end + end + + assert_equal( + { + record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2", "file3 log1", "file3 log2"], + tail_watcher_paths: ["#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0", "#{@tmp_dir}/tail.txt0"], + tail_watcher_inodes: [inode_0, inode_1, inode_2], + tail_watcher_io_handler_opened_statuses: [false, false, false], + position_entries: [ + ["#{@tmp_dir}/tail.txt0", "0000000000000016", inode_2], + ], + }, + { + record_values: record_values, + tail_watcher_paths: tail_watchers.collect { |tw| tw.path }, + tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino }, + tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false }, + position_entries: position_entries + }, + ) + end end end