From 531824a0cfb937b558092499b39e0b16e95c9ab5 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 1 Sep 2022 14:50:49 +0900 Subject: [PATCH 1/8] Move lib/fluent/plugin/file_wrapper.rb to lib/fluent It doesn't use the name space `Fluent::Plugin`, and seems useful for global usage rather than only for plugin. Signed-off-by: Takuro Ashie --- lib/fluent/{plugin => }/file_wrapper.rb | 0 lib/fluent/plugin/in_tail.rb | 2 +- test/{plugin => }/test_file_wrapper.rb | 4 ++-- 3 files changed, 3 insertions(+), 3 deletions(-) rename lib/fluent/{plugin => }/file_wrapper.rb (100%) rename test/{plugin => }/test_file_wrapper.rb (94%) diff --git a/lib/fluent/plugin/file_wrapper.rb b/lib/fluent/file_wrapper.rb similarity index 100% rename from lib/fluent/plugin/file_wrapper.rb rename to lib/fluent/file_wrapper.rb diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index cf12810953..5346994b31 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -27,7 +27,7 @@ require 'fluent/plugin/in_tail/group_watch' if Fluent.windows? - require_relative 'file_wrapper' + require 'fluent/file_wrapper' else Fluent::FileWrapper = File end diff --git a/test/plugin/test_file_wrapper.rb b/test/test_file_wrapper.rb similarity index 94% rename from test/plugin/test_file_wrapper.rb rename to test/test_file_wrapper.rb index 93e7e61fbc..22e5b5ad92 100644 --- a/test/plugin/test_file_wrapper.rb +++ b/test/test_file_wrapper.rb @@ -1,5 +1,5 @@ -require_relative '../helper' -require 'fluent/plugin/file_wrapper' +require_relative 'helper' +require 'fluent/file_wrapper' class FileWrapperTest < Test::Unit::TestCase TMP_DIR = File.dirname(__FILE__) + "/../tmp/file_wrapper#{ENV['TEST_ENV_NUMBER']}" From 77472a9cdf9dc9706911c1bbe85e04c0831c7fe9 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 1 Sep 2022 14:59:22 +0900 Subject: [PATCH 2/8] Define Fluent::FileWrapper at file_wrapper.rb even on non-Windows Signed-off-by: Takuro Ashie --- lib/fluent/file_wrapper.rb | 196 ++++++++++++++++++----------------- lib/fluent/plugin/in_tail.rb | 7 +- 2 files changed, 101 insertions(+), 102 deletions(-) diff --git a/lib/fluent/file_wrapper.rb b/lib/fluent/file_wrapper.rb index 91075a2aed..383862671f 100644 --- a/lib/fluent/file_wrapper.rb +++ b/lib/fluent/file_wrapper.rb @@ -14,118 +14,122 @@ # limitations under the License. # -require 'fluent/win32api' - -module Fluent - module FileWrapper - def self.open(path, mode='r') - io = WindowsFile.new(path, mode).io - if block_given? - v = yield io - io.close - v - else - io +unless Fluent.windows? + Fluent::FileWrapper = File +else + require 'fluent/win32api' + + module Fluent + module FileWrapper + def self.open(path, mode='r') + io = WindowsFile.new(path, mode).io + if block_given? + v = yield io + io.close + v + else + io + end end - end - def self.stat(path) - f = WindowsFile.new(path) - s = f.stat - f.close - s + def self.stat(path) + f = WindowsFile.new(path) + s = f.stat + f.close + s + end end - end - class WindowsFile - include File::Constants + class WindowsFile + include File::Constants - attr_reader :io + attr_reader :io - INVALID_HANDLE_VALUE = -1 + INVALID_HANDLE_VALUE = -1 - def initialize(path, mode='r') - @path = path - @io = File.open(path, mode2flags(mode)) - @file_handle = Win32API._get_osfhandle(@io.to_i) - @io.instance_variable_set(:@file_index, self.ino) - def @io.ino - @file_index + def initialize(path, mode='r') + @path = path + @io = File.open(path, mode2flags(mode)) + @file_handle = Win32API._get_osfhandle(@io.to_i) + @io.instance_variable_set(:@file_index, self.ino) + def @io.ino + @file_index + end end - end - - def close - @io.close - @file_handle = INVALID_HANDLE_VALUE - end - - # To keep backward compatibility, we continue to use GetFileInformationByHandle() - # to get file id. - # Note that Ruby's File.stat uses GetFileInformationByHandleEx() with FileIdInfo - # and returned value is different with above one, former one is 64 bit while - # later one is 128bit. - def ino - by_handle_file_information = '\0'*(4+8+8+8+4+4+4+4+4+4) #72bytes - unless Win32API.GetFileInformationByHandle(@file_handle, by_handle_file_information) - return 0 + def close + @io.close + @file_handle = INVALID_HANDLE_VALUE end - by_handle_file_information.unpack("I11Q1")[11] # fileindex - end + # To keep backward compatibility, we continue to use GetFileInformationByHandle() + # to get file id. + # Note that Ruby's File.stat uses GetFileInformationByHandleEx() with FileIdInfo + # and returned value is different with above one, former one is 64 bit while + # later one is 128bit. + def ino + by_handle_file_information = '\0'*(4+8+8+8+4+4+4+4+4+4) #72bytes - def stat - raise Errno::ENOENT if delete_pending - s = File.stat(@path) - s.instance_variable_set :@ino, self.ino - def s.ino; @ino; end - s - end + unless Win32API.GetFileInformationByHandle(@file_handle, by_handle_file_information) + return 0 + end - private - - def mode2flags(mode) - # Always inject File::Constants::SHARE_DELETE - # https://github.com/fluent/fluentd/pull/3585#issuecomment-1101502617 - # To enable SHARE_DELETE, BINARY is also required. - # https://bugs.ruby-lang.org/issues/11218 - # https://github.com/ruby/ruby/blob/d6684f063bc53e3cab025bd39526eca3b480b5e7/win32/win32.c#L6332-L6345 - flags = BINARY | SHARE_DELETE - case mode.delete("b") - when "r" - flags |= RDONLY - when "r+" - flags |= RDWR - when "w" - flags |= WRONLY | CREAT | TRUNC - when "w+" - flags |= RDWR | CREAT | TRUNC - when "a" - flags |= WRONLY | CREAT | APPEND - when "a+" - flags |= RDWR | CREAT | APPEND - else - raise Errno::EINVAL.new("Unsupported mode by Fluent::FileWrapper: #{mode}") + by_handle_file_information.unpack("I11Q1")[11] # fileindex end - end - # DeletePending is a Windows-specific file state that roughly means - # "this file is queued for deletion, so close any open handlers" - # - # This flag can be retrieved via GetFileInformationByHandleEx(). - # - # https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-getfileinformationbyhandleex - # - def delete_pending - file_standard_info = 0x01 - bufsize = 1024 - buf = '\0' * bufsize - - unless Win32API.GetFileInformationByHandleEx(@file_handle, file_standard_info, buf, bufsize) - return false + def stat + raise Errno::ENOENT if delete_pending + s = File.stat(@path) + s.instance_variable_set :@ino, self.ino + def s.ino; @ino; end + s end - return buf.unpack("QQICC")[3] != 0 + private + + def mode2flags(mode) + # Always inject File::Constants::SHARE_DELETE + # https://github.com/fluent/fluentd/pull/3585#issuecomment-1101502617 + # To enable SHARE_DELETE, BINARY is also required. + # https://bugs.ruby-lang.org/issues/11218 + # https://github.com/ruby/ruby/blob/d6684f063bc53e3cab025bd39526eca3b480b5e7/win32/win32.c#L6332-L6345 + flags = BINARY | SHARE_DELETE + case mode.delete("b") + when "r" + flags |= RDONLY + when "r+" + flags |= RDWR + when "w" + flags |= WRONLY | CREAT | TRUNC + when "w+" + flags |= RDWR | CREAT | TRUNC + when "a" + flags |= WRONLY | CREAT | APPEND + when "a+" + flags |= RDWR | CREAT | APPEND + else + raise Errno::EINVAL.new("Unsupported mode by Fluent::FileWrapper: #{mode}") + end + end + + # DeletePending is a Windows-specific file state that roughly means + # "this file is queued for deletion, so close any open handlers" + # + # This flag can be retrieved via GetFileInformationByHandleEx(). + # + # https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-getfileinformationbyhandleex + # + def delete_pending + file_standard_info = 0x01 + bufsize = 1024 + buf = '\0' * bufsize + + unless Win32API.GetFileInformationByHandleEx(@file_handle, file_standard_info, buf, bufsize) + return false + end + + return buf.unpack("QQICC")[3] != 0 + end end end -end if Fluent.windows? +end diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 5346994b31..fb29dd249f 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -25,12 +25,7 @@ require 'fluent/capability' require 'fluent/plugin/in_tail/position_file' require 'fluent/plugin/in_tail/group_watch' - -if Fluent.windows? - require 'fluent/file_wrapper' -else - Fluent::FileWrapper = File -end +require 'fluent/file_wrapper' module Fluent::Plugin class TailInput < Fluent::Plugin::Input From 2e4f3db26f95c6f2674d8796aed238c0e579e490 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 1 Sep 2022 15:18:09 +0900 Subject: [PATCH 3/8] Use Fluent::FileWrapper.open instead of File.open to create test files Try to fix Errno::EACESS on deleting test files, because `FileUtils.rm_rf` of Ruby 3.2 doesn't ignore it. Signed-off-by: Takuro Ashie --- test/command/test_fluentd.rb | 11 +- test/plugin/test_in_tail.rb | 205 ++++++++++++++++++----------------- test/plugin/test_out_file.rb | 5 +- test/test_supervisor.rb | 3 +- 4 files changed, 114 insertions(+), 110 deletions(-) diff --git a/test/command/test_fluentd.rb b/test/command/test_fluentd.rb index 2ad023bf90..726bc5b8f5 100644 --- a/test/command/test_fluentd.rb +++ b/test/command/test_fluentd.rb @@ -5,6 +5,7 @@ require 'fileutils' require 'timeout' +require 'fluent/file_wrapper' class TestFluentdCommand < ::Test::Unit::TestCase TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/command/fluentd#{ENV['TEST_ENV_NUMBER']}") @@ -31,7 +32,7 @@ def process_exist?(pid) def create_conf_file(name, content, ext_enc = 'utf-8') conf_path = File.join(TMP_DIR, name) - File.open(conf_path, "w:#{ext_enc}:utf-8") do |file| + Fluent::FileWrapper.open(conf_path, "w:#{ext_enc}:utf-8") do |file| file.write content end conf_path @@ -40,7 +41,7 @@ def create_conf_file(name, content, ext_enc = 'utf-8') def create_plugin_file(name, content) file_path = File.join(TMP_DIR, 'plugin', name) FileUtils.mkdir_p(File.dirname(file_path)) - File.open(file_path, 'w') do |file| + Fluent::FileWrapper.open(file_path, 'w') do |file| file.write content end file_path @@ -57,7 +58,7 @@ def create_cmdline(conf_path, *fluentd_options) end def execute_command(cmdline, chdir=TMP_DIR, env = {}) - null_stream = File.open(File::NULL, 'w') + null_stream = Fluent::FileWrapper.open(File::NULL, 'w') gemfile_path = File.expand_path(File.dirname(__FILE__) + "../../../Gemfile") env = { "BUNDLE_GEMFILE" => gemfile_path }.merge(env) @@ -308,7 +309,7 @@ def assert_fluentd_fails_to_start(cmdline, *pattern_list, timeout: 10) end test 'fails to launch fluentd if specified root path is invalid path for directory' do - File.open(@root_path, 'w') do |_| + Fluent::FileWrapper.open(@root_path, 'w') do |_| # create file and close it end conf_path = create_conf_file('existing_root_dir.conf', @conf) @@ -964,7 +965,7 @@ def multi_workers_ready? tmp_ruby_path = File.join(TMP_DIR, "ruby with spaces") if Fluent.windows? tmp_ruby_path << ".bat" - File.open(tmp_ruby_path, "w") do |file| + Fluent::FileWrapper.open(tmp_ruby_path, "w") do |file| file.write "#{ruby_path} %*" end else diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 204d45cbff..6edb9f38d6 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -3,6 +3,7 @@ require 'fluent/plugin/in_tail' require 'fluent/plugin/buffer' require 'fluent/system_config' +require 'fluent/file_wrapper' require 'net/http' require 'flexmock/test_unit' require 'timecop' @@ -378,9 +379,9 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) conf = ROOT_CONFIG + DEBUG_LOG_LEVEL + create_group_directive(tailing_group_pattern, '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG d = create_driver(conf, false) - File.open("#{@tmp_dir}/test1.txt", 'w') - File.open("#{@tmp_dir}/test2.txt", 'w') - File.open("#{@tmp_dir}/test3.txt", 'w') + Fluent::FileWrapper.open("#{@tmp_dir}/test1.txt", 'w') + Fluent::FileWrapper.open("#{@tmp_dir}/test2.txt", 'w') + Fluent::FileWrapper.open("#{@tmp_dir}/test3.txt", 'w') d.run do ## checking default group_watcher's paths @@ -419,10 +420,10 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) file4 = File.join(@tmp_dir, "test-podname4_test-namespace3_test-container-15fabq.log") d.run do - File.open(file1, 'w') - File.open(file2, 'w') - File.open(file3, 'w') - File.open(file4, 'w') + Fluent::FileWrapper.open(file1, 'w') + Fluent::FileWrapper.open(file2, 'w') + Fluent::FileWrapper.open(file3, 'w') + Fluent::FileWrapper.open(file4, 'w') instance = d.instance assert_equal(100, instance.find_group_from_metadata(file1).limit) @@ -438,7 +439,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) parse: PARSE_SINGLE_LINE_CONFIG) def test_emit(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -446,7 +447,7 @@ def test_emit(data) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\ntest4" } end @@ -462,11 +463,11 @@ def test_emit(data) def test_emit_with_emit_unmatched_lines_true config = config_element("", "", { "format" => "/^(?test.*)/", "emit_unmatched_lines" => true }) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test line 1" f.puts "test line 2" f.puts "bad line 1" @@ -498,7 +499,7 @@ def test_emit_with_read_lines_limit(data) msg = 'test' * 2000 # in_tail reads 8192 bytes at once. d.run(expect_emits: num_events, timeout: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts msg f.puts msg } @@ -544,7 +545,7 @@ def test_emit_with_read_bytes_limit_per_second(data) d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| 100.times do f.puts msg end @@ -567,7 +568,7 @@ def test_read_bytes_limit_precede_read_lines_limit start_time = Fluent::Clock.now d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| 8000.times do f.puts msg end @@ -606,7 +607,7 @@ def test_emit_with_read_bytes_limit_per_second(data) io_handler end - File.open("#{@tmp_dir}/tail.txt", "ab") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") do |f| 100.times do f.puts msg end @@ -616,7 +617,7 @@ def test_emit_with_read_bytes_limit_per_second(data) d.run do start_time = Fluent::Clock.now while Fluent::Clock.now - start_time < 0.8 do - File.open("#{@tmp_dir}/tail.txt", "ab") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") do |f| f.puts msg f.flush end @@ -634,7 +635,7 @@ def test_longer_than_rotate_wait num_lines = 1024 * 3 msg = "08bytes" - File.open("#{@tmp_dir}/tail.txt", "wb") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") do |f| f.write("#{msg}\n" * num_lines) end @@ -670,7 +671,7 @@ def test_shorter_than_rotate_wait num_lines = 1024 * 2 msg = "08bytes" - File.open("#{@tmp_dir}/tail.txt", "wb") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") do |f| f.write("#{msg}\n" * num_lines) end @@ -718,7 +719,7 @@ def test_shorter_than_rotate_wait parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG) def test_emit_with_read_from_head(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -726,7 +727,7 @@ def test_emit_with_read_from_head(data) d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -744,7 +745,7 @@ def test_emit_with_read_from_head(data) parse: CONFIG_DISABLE_WATCH_TIMER + PARSE_SINGLE_LINE_CONFIG) def test_emit_without_watch_timer(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -752,7 +753,7 @@ def test_emit_without_watch_timer(data) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -776,7 +777,7 @@ def test_watch_wildcard_path_without_watch_timer }) config = config + CONFIG_DISABLE_WATCH_TIMER + SINGLE_LINE_CONFIG - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -784,7 +785,7 @@ def test_watch_wildcard_path_without_watch_timer d = create_driver(config, false) d.run(expect_emits: 1, timeout: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -802,7 +803,7 @@ def test_watch_wildcard_path_without_watch_timer parse: CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG) def test_emit_with_disable_stat_watcher(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -810,7 +811,7 @@ def test_emit_with_disable_stat_watcher(data) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -826,7 +827,7 @@ def test_always_read_from_head_on_detecting_a_new_file d = create_driver(SINGLE_LINE_CONFIG) d.run(expect_emits: 1, timeout: 3) do - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1\ntest2\n" } end @@ -873,7 +874,7 @@ def test_emit_with_system system_conf = parse_system(CONFIG_SYSTEM) sc = Fluent::SystemConfig.new(system_conf) Fluent::Engine.init(sc) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -881,7 +882,7 @@ def test_emit_with_system d = create_driver d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -938,13 +939,13 @@ def test_rotate_file_with_open_on_every_update(data) def test_rotate_file_with_write_old(data) config = data events = sub_test_rotate_file(config, expect_emits: 3) { |rotated_file| - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } rotated_file.puts "test7" rotated_file.puts "test8" rotated_file.flush sleep 1 - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test5" f.puts "test6" } @@ -994,10 +995,10 @@ def sub_test_rotate_file(config = nil, expect_emits: nil, expect_records: nil, t if block_given? yield file else - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } sleep 1 - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test5" f.puts "test6" } @@ -1012,7 +1013,7 @@ def sub_test_rotate_file(config = nil, expect_emits: nil, expect_records: nil, t def test_truncate_file config = SINGLE_LINE_CONFIG - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" f.flush @@ -1021,7 +1022,7 @@ def test_truncate_file d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\ntest4" f.flush } @@ -1052,7 +1053,7 @@ def test_truncate_file def test_move_truncate_move_back config = SINGLE_LINE_CONFIG - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -1089,17 +1090,17 @@ def test_move_truncate_move_back end def test_lf - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| } d = create_driver d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.print "test3" } sleep 1 - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test4" } end @@ -1110,12 +1111,12 @@ def test_lf end def test_whitespace - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| } d = create_driver d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts " " # 4 spaces f.puts " 4 spaces" f.puts "4 spaces " @@ -1146,7 +1147,7 @@ def test_encoding(data) d = create_driver(CONFIG_READ_FROM_HEAD + encoding_config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test" } end @@ -1168,7 +1169,7 @@ def test_from_encoding utf8_message = cp932_message.encode(Encoding::UTF_8) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "w:cp932") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w:cp932") {|f| f.puts cp932_message } end @@ -1191,7 +1192,7 @@ def test_from_encoding_utf16 utf8_message = utf16_message.encode(Encoding::UTF_8).strip d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "w:utf-16le") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w:utf-16le") { |f| f.write utf16_message } end @@ -1212,7 +1213,7 @@ def test_encoding_with_bad_character d = create_driver(conf) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "w") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w") { |f| f.write "te\x86st\n" } end @@ -1227,11 +1228,11 @@ def test_encoding_with_bad_character parse: PARSE_MULTILINE_CONFIG) def test_multiline(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1255,11 +1256,11 @@ def test_multiline(data) parse: PARSE_MULTILINE_CONFIG) def test_multiline_with_emit_unmatched_lines_true(data) config = data + config_element("", "", { "emit_unmatched_lines" => true }) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1285,11 +1286,11 @@ def test_multiline_with_emit_unmatched_lines_true(data) parse: PARSE_MULTILINE_CONFIG_WITH_NEWLINE) def test_multiline_with_emit_unmatched_lines2(data) config = data + config_element("", "", { "emit_unmatched_lines" => true }) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 0, timeout: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "s test0" f.puts "f test1" f.puts "f test2" @@ -1311,7 +1312,7 @@ def test_multiline_with_emit_unmatched_lines2(data) data(flat: MULTILINE_CONFIG, parse: PARSE_MULTILINE_CONFIG) def test_multiline_with_flush_interval(data) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = data + config_element("", "", { "multiline_flush_interval" => "2s" }) d = create_driver(config) @@ -1319,7 +1320,7 @@ def test_multiline_with_flush_interval(data) assert_equal(2, d.instance.multiline_flush_interval) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1354,7 +1355,7 @@ def test_multiline_encoding_of_flushed_record(data) d = create_driver(config + encoding_config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| f.puts "s test" } end @@ -1377,7 +1378,7 @@ def test_multiline_from_encoding_of_flushed_record cp932_message = "s \x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932) utf8_message = "\x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".encode(Encoding::UTF_8, Encoding::CP932) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "w:cp932") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w:cp932") { |f| f.puts cp932_message } end @@ -1409,11 +1410,11 @@ def test_multiline_from_encoding_of_flushed_record ) def test_multiline_with_multiple_formats(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1450,7 +1451,7 @@ def test_multiline_with_multiple_formats(data) ) def test_multilinelog_with_multiple_paths(data) files = ["#{@tmp_dir}/tail1.txt", "#{@tmp_dir}/tail2.txt"] - files.each { |file| File.open(file, "wb") { |f| } } + files.each { |file| Fluent::FileWrapper.open(file, "wb") { |f| } } config = data + config_element("", "", { "path" => "#{files[0]},#{files[1]}", @@ -1459,7 +1460,7 @@ def test_multilinelog_with_multiple_paths(data) d = create_driver(config, false) d.run(expect_emits: 2) do files.each do |file| - File.open(file, 'ab') { |f| + Fluent::FileWrapper.open(file, 'ab') { |f| f.puts "f #{file} line should be ignored" f.puts "s test1" f.puts "f test2" @@ -1494,12 +1495,12 @@ def test_multilinelog_with_multiple_paths(data) ]) ) def test_multiline_without_firstline(data) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = data d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "foo 1" f.puts "bar 1" f.puts "baz 1" @@ -1605,7 +1606,7 @@ def test_unwatched_files_should_be_removed d = create_driver(config, false) d.end_if { d.instance.instance_variable_get(:@tails).keys.size >= 1 } d.run(expect_emits: 1, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test3\n" } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test3\n" } end cleanup_directory(@tmp_dir) @@ -1882,7 +1883,7 @@ def test_tag_prefix_and_suffix_ignore "max_line_size" => label, "log_level" => "debug" }) - File.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f| f.puts "foo" f.puts "x" * size # 'x' * size + \n > @max_line_size f.puts "bar" @@ -1908,7 +1909,7 @@ def test_tag_prefix_and_suffix_ignore # Ensure that no fatal exception is raised when a file is missing and that # files that do exist are still tailed as expected. def test_missing_file - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -1926,7 +1927,7 @@ def test_missing_file [config1, config2].each do |config| d = create_driver(config, false) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -1947,14 +1948,14 @@ def test_should_delete_file_pos_entry_for_non_existing_file_with_follow_inodes path = "#{@tmp_dir}/tail.txt" ino = 1 pos = 1234 - File.open("#{@tmp_dir}/tail.pos", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "wb") {|f| f.puts ("%s\t%016x\t%016x\n" % [path, pos, ino]) } d = create_driver(config, false) d.run - pos_file = File.open("#{@tmp_dir}/tail.pos", "r") + pos_file = Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") pos_file.pos = 0 assert_raise(EOFError) do @@ -1964,20 +1965,20 @@ def test_should_delete_file_pos_entry_for_non_existing_file_with_follow_inodes def test_should_write_latest_offset_after_rotate_wait config = common_follow_inode_config - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d = create_driver(config, false) d.run(expect_emits: 2, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") sleep 1 - File.open("#{@tmp_dir}/tail.txt" + "1", "ab") {|f| f.puts "test4\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt" + "1", "ab") {|f| f.puts "test4\n"} end - pos_file = File.open("#{@tmp_dir}/tail.pos", "r") + pos_file = Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") pos_file.pos = 0 line_parts = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(pos_file.readline) waiting(5) { @@ -1997,13 +1998,13 @@ def test_should_remove_deleted_file path = "#{@tmp_dir}/tail.txt" ino = 1 pos = 1234 - File.open("#{@tmp_dir}/tail.pos", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "wb") {|f| f.puts ("%s\t%016x\t%016x\n" % [path, pos, ino]) } d = create_driver(config) d.run do - pos_file = File.open("#{@tmp_dir}/tail.pos", "r") + pos_file = Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") pos_file.pos = 0 assert_equal([], pos_file.readlines) end @@ -2024,14 +2025,14 @@ def test_should_mark_file_unwatched_after_limit_recently_modified_and_rotate_wai d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } target_info = create_target_info("#{@tmp_dir}/tail.txt") d.run(expect_emits: 1, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} end @@ -2062,13 +2063,13 @@ def test_should_read_from_head_on_file_renaming_with_star_in_pattern d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d.run(expect_emits: 2, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1") end @@ -2082,13 +2083,13 @@ def test_should_not_read_from_head_on_rotation_when_watching_inodes d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d.run(expect_emits: 1, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} end FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1") @@ -2106,16 +2107,16 @@ def test_should_mark_file_unwatched_if_same_name_file_created_with_different_ino d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } target_info = create_target_info("#{@tmp_dir}/tail.txt") d.run(expect_emits: 2, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} cleanup_file("#{@tmp_dir}/tail.txt") - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test4\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test4\n"} end new_target_info = create_target_info("#{@tmp_dir}/tail.txt") @@ -2147,7 +2148,7 @@ def test_should_close_watcher_after_rotate_wait @metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics) end - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -2171,18 +2172,18 @@ def test_should_create_new_watcher_for_new_file_with_same_name d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } path_ino = create_target_info("#{@tmp_dir}/tail.txt") d.run(expect_emits: 1, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} end cleanup_file("#{@tmp_dir}/tail.txt") - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test3" f.puts "test4" } @@ -2208,15 +2209,15 @@ def test_truncate_file_with_follow_inodes d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d.run(expect_emits: 3, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} sleep 2 - File.open("#{@tmp_dir}/tail.txt", "w+b") {|f| f.puts "test4\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w+b") {|f| f.puts "test4\n"} end events = d.events @@ -2230,7 +2231,7 @@ def test_truncate_file_with_follow_inodes # issue #3464 def test_should_replace_target_info - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1\n" } target_info = create_target_info("#{@tmp_dir}/tail.txt") @@ -2257,7 +2258,7 @@ def test_should_replace_target_info assert_equal([target_info.ino], inodes) cleanup_file("#{@tmp_dir}/tail.txt") - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test2\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test2\n"} while d.events.size < 2 do sleep 0.1 @@ -2274,7 +2275,7 @@ def test_should_replace_target_info sub_test_case "tail_path" do def test_tail_path_with_singleline - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -2282,7 +2283,7 @@ def test_tail_path_with_singleline d = create_driver(SINGLE_LINE_CONFIG + config_element("", "", { "path_key" => "path" })) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -2296,7 +2297,7 @@ def test_tail_path_with_singleline end def test_tail_path_with_multiline_with_firstline - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = config_element("", "", { "path_key" => "path", @@ -2306,7 +2307,7 @@ def test_tail_path_with_multiline_with_firstline }) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -2326,7 +2327,7 @@ def test_tail_path_with_multiline_with_firstline end def test_tail_path_with_multiline_without_firstline - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = config_element("", "", { "path_key" => "path", @@ -2337,7 +2338,7 @@ def test_tail_path_with_multiline_without_firstline }) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "foo 1" f.puts "bar 1" f.puts "baz 1" @@ -2356,7 +2357,7 @@ def test_tail_path_with_multiline_with_multiple_paths omit "This testcase is unstable on AppVeyor." end files = ["#{@tmp_dir}/tail1.txt", "#{@tmp_dir}/tail2.txt"] - files.each { |file| File.open(file, "wb") { |f| } } + files.each { |file| Fluent::FileWrapper.open(file, "wb") { |f| } } config = config_element("", "", { "path" => "#{files[0]},#{files[1]}", @@ -2369,7 +2370,7 @@ def test_tail_path_with_multiline_with_multiple_paths d = create_driver(config, false) d.run(expect_emits: 2) do files.each do |file| - File.open(file, 'ab') { |f| + Fluent::FileWrapper.open(file, 'ab') { |f| f.puts "f #{file} line should be ignored" f.puts "s test1" f.puts "f test2" @@ -2500,7 +2501,7 @@ def test_EACCES end def test_shutdown_timeout - File.open("#{@tmp_dir}/tail.txt", "wb") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") do |f| # Should be large enough to take too long time to consume (1024 * 1024 * 5).times do f.puts "{\"test\":\"fizzbuzz\"}" @@ -2548,7 +2549,7 @@ def test_lines_collected_with_no_throttling(data) conf = ROOT_CONFIG + group + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG - File.open("#{@tmp_dir}/#{file}", 'wb') do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/#{file}", 'wb') do |f| num_lines.times do f.puts "#{msg}\n" end @@ -2594,7 +2595,7 @@ def test_lines_collected_with_no_throttling(data) d = create_driver(conf, false) file_path = "#{@tmp_dir}/#{file}" - File.open(file_path, 'wb') do |f| + Fluent::FileWrapper.open(file_path, 'wb') do |f| num_lines.times do f.puts msg end diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb index be593ab7b1..099315e92c 100644 --- a/test/plugin/test_out_file.rb +++ b/test/plugin/test_out_file.rb @@ -5,6 +5,7 @@ require 'time' require 'timecop' require 'zlib' +require 'fluent/file_wrapper' class FileOutputTest < Test::Unit::TestCase def setup @@ -1016,7 +1017,7 @@ def run_and_check(d, symlink_path) test 'returns filepath with index which does not exist yet' do 5.times do |i| - File.open(File.join(@tmp, "exist_#{i}.log"), 'a'){|f| } # open(create) and close + Fluent::FileWrapper.open(File.join(@tmp, "exist_#{i}.log"), 'a'){|f| } # open(create) and close end @i.find_filepath_available(File.join(@tmp, "exist_**.log")) do |path| assert_equal File.join(@tmp, "exist_5.log"), path @@ -1025,7 +1026,7 @@ def run_and_check(d, symlink_path) test 'creates lock directory when with_lock is true to exclude operations of other worker process' do 5.times do |i| - File.open(File.join(@tmp, "exist_#{i}.log"), 'a') + Fluent::FileWrapper.open(File.join(@tmp, "exist_#{i}.log"), 'a') end Dir.mkdir(File.join(@tmp, "exist_5.log.lock")) @i.find_filepath_available(File.join(@tmp, "exist_**.log"), with_lock: true) do |path| diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index 2022b132a8..78588f1b69 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -2,6 +2,7 @@ require 'fluent/event_router' require 'fluent/system_config' require 'fluent/supervisor' +require 'fluent/file_wrapper' require_relative 'test_plugin_classes' require 'net/http' @@ -32,7 +33,7 @@ def setup def write_config(path, data) FileUtils.mkdir_p(File.dirname(path)) - File.open(path, "w") {|f| f.write data } + Fluent::FileWrapper.open(path, "w") {|f| f.write data } end From af7972a43576d4ccdd1f4d208b6953aed9d4d891 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 1 Sep 2022 16:16:15 +0900 Subject: [PATCH 4/8] FileWrapper::WindowsFile: Support encoding parameter Signed-off-by: Takuro Ashie --- lib/fluent/file_wrapper.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/fluent/file_wrapper.rb b/lib/fluent/file_wrapper.rb index 383862671f..3e4fe19784 100644 --- a/lib/fluent/file_wrapper.rb +++ b/lib/fluent/file_wrapper.rb @@ -47,9 +47,11 @@ class WindowsFile INVALID_HANDLE_VALUE = -1 - def initialize(path, mode='r') + def initialize(path, mode_enc='r') @path = path + mode, enc = mode_enc.split(":", 2) @io = File.open(path, mode2flags(mode)) + @io.set_encoding(enc) if enc @file_handle = Win32API._get_osfhandle(@io.to_i) @io.instance_variable_set(:@file_index, self.ino) def @io.ino From 4ddf423e77d33b838b30020a3f3b80e6871cf4e2 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 1 Sep 2022 17:21:44 +0900 Subject: [PATCH 5/8] test_log: Suppress Errno::EACCES error on Ruby 3.2 Signed-off-by: Takuro Ashie --- test/test_log.rb | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/test/test_log.rb b/test/test_log.rb index bed0f2fb7c..480781d4d8 100644 --- a/test/test_log.rb +++ b/test/test_log.rb @@ -4,13 +4,16 @@ require 'fluent/log' require 'timecop' require 'logger' +require 'securerandom' class LogTest < Test::Unit::TestCase - TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/tmp/log/#{ENV['TEST_ENV_NUMBER']}") + def tmp_dir + File.join(File.dirname(__FILE__), "tmp", "log", "#{ENV['TEST_ENV_NUMBER']}", SecureRandom.hex(10)) + end def setup - FileUtils.rm_rf(TMP_DIR) - FileUtils.mkdir_p(TMP_DIR) + @tmp_dir = tmp_dir + FileUtils.mkdir_p(@tmp_dir) @log_device = Fluent::Test::DummyLogDevice.new @timestamp = Time.parse("2016-04-21 02:58:41 +0000") @timestamp_str = @timestamp.strftime("%Y-%m-%d %H:%M:%S %z") @@ -21,6 +24,13 @@ def teardown @log_device.reset Timecop.return Thread.current[:last_repeated_stacktrace] = nil + begin + FileUtils.rm_rf(@tmp_dir) + rescue Errno::EACCES + # It may occur on Windows because of delete pending state due to delayed GC. + # Ruby 3.2 or later doesn't ignore Errno::EACCES: + # https://github.com/ruby/ruby/commit/983115cf3c8f75b1afbe3274f02c1529e1ce3a81 + end end sub_test_case "log level" do @@ -560,7 +570,7 @@ def test_log_with_logdevio(expected) Timecop.freeze(@timestamp) rotate_age, rotate_size, travel_term = expected - path = "#{TMP_DIR}/log-dev-io-#{rotate_size}-#{rotate_age}" + path = "#{@tmp_dir}/log-dev-io-#{rotate_size}-#{rotate_age}" logdev = Fluent::LogDeviceIO.new(path, shift_age: rotate_age, shift_size: rotate_size) logger = ServerEngine::DaemonLogger.new(logdev) @@ -585,7 +595,7 @@ def test_log_rotates_specified_size_with_logdevio with_timezone('utc') do rotate_age = 2 rotate_size = 100 - path = "#{TMP_DIR}/log-dev-io-#{rotate_size}-#{rotate_age}" + path = "#{@tmp_dir}/log-dev-io-#{rotate_size}-#{rotate_age}" path0 = path + '.0' path1 = path + '.1' From c068a3257bba20b61a04ceb512893ce3aaf55fb8 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 1 Sep 2022 17:37:57 +0900 Subject: [PATCH 6/8] test_supervisor: Suppress Errno::EACCES error on Ruby 3.2 Signed-off-by: Takuro Ashie --- test/test_supervisor.rb | 41 +++++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index 78588f1b69..5c8b16a924 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -9,6 +9,7 @@ require 'uri' require 'fileutils' require 'tempfile' +require 'securerandom' if Fluent.windows? require 'win32/event' @@ -23,12 +24,24 @@ def config end end - TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/tmp/supervisor#{ENV['TEST_ENV_NUMBER']}") - TMP_ROOT_DIR = File.join(TMP_DIR, 'root') + def tmp_dir + File.join(File.dirname(__FILE__), "tmp", "supervisor#{ENV['TEST_ENV_NUMBER']}", SecureRandom.hex(10)) + end def setup - FileUtils.rm_rf(TMP_DIR) - FileUtils.mkdir_p(TMP_DIR) + @tmp_dir = tmp_dir + @tmp_root_dir = File.join(@tmp_dir, 'root') + FileUtils.mkdir_p(@tmp_dir) + end + + def teardown + begin + FileUtils.rm_rf(@tmp_dir) + rescue Errno::EACCES + # It may occur on Windows because of delete pending state due to delayed GC. + # Ruby 3.2 or later doesn't ignore Errno::EACCES: + # https://github.com/ruby/ruby/commit/983115cf3c8f75b1afbe3274f02c1529e1ce3a81 + end end def write_config(path, data) @@ -49,7 +62,7 @@ def test_system_config enable_get_dump true process_name "process_name" log_level info - root_dir #{TMP_ROOT_DIR} + root_dir #{@tmp_root_dir} format json time_format %Y @@ -77,7 +90,7 @@ def test_system_config assert_equal true, sys_conf.enable_get_dump assert_equal "process_name", sys_conf.process_name assert_equal 2, sys_conf.log_level - assert_equal TMP_ROOT_DIR, sys_conf.root_dir + assert_equal @tmp_root_dir, sys_conf.root_dir assert_equal :json, sys_conf.log.format assert_equal '%Y', sys_conf.log.time_format counter_server = sys_conf.counter_server @@ -117,7 +130,7 @@ def test_system_config enable_get_dump: true process_name: "process_name" log_level: info - root_dir: !fluent/s "#{TMP_ROOT_DIR}" + root_dir: !fluent/s "#{@tmp_root_dir}" log: format: json time_format: "%Y" @@ -145,7 +158,7 @@ def test_system_config true, "process_name", 2, - TMP_ROOT_DIR, + @tmp_root_dir, :json, '%Y', '127.0.0.1', @@ -450,7 +463,7 @@ def server.config end def test_load_config - tmp_dir = "#{TMP_DIR}/dir/test_load_config.conf" + tmp_dir = "#{@tmp_dir}/dir/test_load_config.conf" conf_info_str = %[ log_level info @@ -521,7 +534,7 @@ def test_load_config end def test_load_config_for_logger - tmp_dir = "#{TMP_DIR}/dir/test_load_config_log.conf" + tmp_dir = "#{@tmp_dir}/dir/test_load_config_log.conf" conf_info_str = %[ @@ -548,7 +561,7 @@ def test_load_config_for_logger end def test_load_config_for_daemonize - tmp_dir = "#{TMP_DIR}/dir/test_load_config.conf" + tmp_dir = "#{@tmp_dir}/dir/test_load_config.conf" conf_info_str = %[ log_level info @@ -645,7 +658,7 @@ def test_logger ) def test_logger_with_rotate_age_and_rotate_size(rotate_age) opts = Fluent::Supervisor.default_options.merge( - log_path: "#{TMP_DIR}/test", log_rotate_age: rotate_age, log_rotate_size: 10 + log_path: "#{@tmp_dir}/test", log_rotate_age: rotate_age, log_rotate_size: 10 ) sv = Fluent::Supervisor.new(opts) log = sv.instance_variable_get(:@log) @@ -675,7 +688,7 @@ def test_override_default_log_rotate file.puts(config) file.flush opts = Fluent::Supervisor.default_options.merge( - log_path: "#{TMP_DIR}/test.log", config_path: file.path + log_path: "#{@tmp_dir}/test.log", config_path: file.path ) sv = Fluent::Supervisor.new(opts) @@ -700,7 +713,7 @@ def test_override_default_log_rotate_with_yaml_config file.puts(config) file.flush opts = Fluent::Supervisor.default_options.merge( - log_path: "#{TMP_DIR}/test.log", config_path: file.path, config_file_type: :yaml, + log_path: "#{@tmp_dir}/test.log", config_path: file.path, config_file_type: :yaml, ) sv = Fluent::Supervisor.new(opts) From f70cb60baa1d541da7834eda356f3b1aee36d911 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 1 Sep 2022 17:44:11 +0900 Subject: [PATCH 7/8] test_fluentd: Suppress Errno::EACCES error on Ruby 3.2 Signed-off-by: Takuro Ashie --- test/command/test_fluentd.rb | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/test/command/test_fluentd.rb b/test/command/test_fluentd.rb index 726bc5b8f5..8845fc3ea8 100644 --- a/test/command/test_fluentd.rb +++ b/test/command/test_fluentd.rb @@ -5,21 +5,35 @@ require 'fileutils' require 'timeout' +require 'securerandom' require 'fluent/file_wrapper' class TestFluentdCommand < ::Test::Unit::TestCase - TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/command/fluentd#{ENV['TEST_ENV_NUMBER']}") SUPERVISOR_PID_PATTERN = /starting fluentd-[.0-9]+ pid=(\d+)/ WORKER_PID_PATTERN = /starting fluentd worker pid=(\d+) / + def tmp_dir + File.join(File.dirname(__FILE__), "..", "tmp", "command" "fluentd#{ENV['TEST_ENV_NUMBER']}", SecureRandom.hex(10)) + end + setup do - FileUtils.rm_rf(TMP_DIR) - FileUtils.mkdir_p(TMP_DIR) + @tmp_dir = tmp_dir + FileUtils.mkdir_p(@tmp_dir) @supervisor_pid = nil @worker_pids = [] ENV["TEST_RUBY_PATH"] = nil end + teardown do + begin + FileUtils.rm_rf(@tmp_dir) + rescue Errno::EACCES + # It may occur on Windows because of delete pending state due to delayed GC. + # Ruby 3.2 or later doesn't ignore Errno::EACCES: + # https://github.com/ruby/ruby/commit/983115cf3c8f75b1afbe3274f02c1529e1ce3a81 + end + end + def process_exist?(pid) begin r = Process.waitpid(pid, Process::WNOHANG) @@ -31,7 +45,7 @@ def process_exist?(pid) end def create_conf_file(name, content, ext_enc = 'utf-8') - conf_path = File.join(TMP_DIR, name) + conf_path = File.join(@tmp_dir, name) Fluent::FileWrapper.open(conf_path, "w:#{ext_enc}:utf-8") do |file| file.write content end @@ -39,7 +53,7 @@ def create_conf_file(name, content, ext_enc = 'utf-8') end def create_plugin_file(name, content) - file_path = File.join(TMP_DIR, 'plugin', name) + file_path = File.join(@tmp_dir, 'plugin', name) FileUtils.mkdir_p(File.dirname(file_path)) Fluent::FileWrapper.open(file_path, 'w') do |file| file.write content @@ -57,7 +71,7 @@ def create_cmdline(conf_path, *fluentd_options) end end - def execute_command(cmdline, chdir=TMP_DIR, env = {}) + def execute_command(cmdline, chdir=@tmp_dir, env = {}) null_stream = Fluent::FileWrapper.open(File::NULL, 'w') gemfile_path = File.expand_path(File.dirname(__FILE__) + "../../../Gemfile") @@ -104,7 +118,7 @@ def assert_log_matches(cmdline, *pattern_list, patterns_not_match: [], timeout: assert_error_msg = "" stdio_buf = "" begin - execute_command(cmdline, TMP_DIR, env) do |pid, stdout| + execute_command(cmdline, @tmp_dir, env) do |pid, stdout| begin waiting(timeout) do while process_exist?(pid) && !matched @@ -270,7 +284,7 @@ def assert_fluentd_fails_to_start(cmdline, *pattern_list, timeout: 10) sub_test_case 'with system configuration about root directory' do setup do - @root_path = File.join(TMP_DIR, "rootpath") + @root_path = File.join(@tmp_dir, "rootpath") FileUtils.rm_rf(@root_path) @conf = < @@ -555,7 +569,7 @@ def assert_fluentd_fails_to_start(cmdline, *pattern_list, timeout: 10) sub_test_case 'configured to run 2 workers' do setup do - @root_path = File.join(TMP_DIR, "rootpath") + @root_path = File.join(@tmp_dir, "rootpath") FileUtils.rm_rf(@root_path) FileUtils.mkdir_p(@root_path) end @@ -962,7 +976,7 @@ def multi_workers_ready? CONF ruby_path = ServerEngine.ruby_bin_path - tmp_ruby_path = File.join(TMP_DIR, "ruby with spaces") + tmp_ruby_path = File.join(@tmp_dir, "ruby with spaces") if Fluent.windows? tmp_ruby_path << ".bat" Fluent::FileWrapper.open(tmp_ruby_path, "w") do |file| From 9e34c1b6ce2aa11cef69150455c8d0e1b4e0a8a6 Mon Sep 17 00:00:00 2001 From: Takuro Ashie Date: Thu, 1 Sep 2022 20:59:00 +0900 Subject: [PATCH 8/8] test_in_tail: Don't remove pos file before shutdown It can't removed on Windows and `Errno::EACCES` isn't ignored as of Ruby 3.2. Signed-off-by: Takuro Ashie --- test/plugin/test_in_tail.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 6edb9f38d6..b1e28fc986 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -1609,7 +1609,7 @@ def test_unwatched_files_should_be_removed Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test3\n" } end - cleanup_directory(@tmp_dir) + cleanup_file("#{@tmp_dir}/tail.txt") waiting(20) { sleep 0.1 until Dir.glob("#{@tmp_dir}/*.txt").size == 0 } # Ensure file is deleted on Windows waiting(5) { sleep 0.1 until d.instance.instance_variable_get(:@tails).keys.size <= 0 } @@ -1625,6 +1625,7 @@ def test_unwatched_files_should_be_removed ) ensure d.instance_shutdown if d && d.instance + cleanup_directory(@tmp_dir) end def count_timer_object