diff --git a/lib/fluent/file_wrapper.rb b/lib/fluent/file_wrapper.rb new file mode 100644 index 0000000000..3e4fe19784 --- /dev/null +++ b/lib/fluent/file_wrapper.rb @@ -0,0 +1,137 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +unless Fluent.windows? + Fluent::FileWrapper = File +else + require 'fluent/win32api' + + module Fluent + module FileWrapper + def self.open(path, mode='r') + io = WindowsFile.new(path, mode).io + if block_given? + v = yield io + io.close + v + else + io + end + end + + def self.stat(path) + f = WindowsFile.new(path) + s = f.stat + f.close + s + end + end + + class WindowsFile + include File::Constants + + attr_reader :io + + INVALID_HANDLE_VALUE = -1 + + def initialize(path, mode_enc='r') + @path = path + mode, enc = mode_enc.split(":", 2) + @io = File.open(path, mode2flags(mode)) + @io.set_encoding(enc) if enc + @file_handle = Win32API._get_osfhandle(@io.to_i) + @io.instance_variable_set(:@file_index, self.ino) + def @io.ino + @file_index + end + end + + def close + @io.close + @file_handle = INVALID_HANDLE_VALUE + end + + # To keep backward compatibility, we continue to use GetFileInformationByHandle() + # to get file id. + # Note that Ruby's File.stat uses GetFileInformationByHandleEx() with FileIdInfo + # and returned value is different with above one, former one is 64 bit while + # later one is 128bit. + def ino + by_handle_file_information = '\0'*(4+8+8+8+4+4+4+4+4+4) #72bytes + + unless Win32API.GetFileInformationByHandle(@file_handle, by_handle_file_information) + return 0 + end + + by_handle_file_information.unpack("I11Q1")[11] # fileindex + end + + def stat + raise Errno::ENOENT if delete_pending + s = File.stat(@path) + s.instance_variable_set :@ino, self.ino + def s.ino; @ino; end + s + end + + private + + def mode2flags(mode) + # Always inject File::Constants::SHARE_DELETE + # https://github.com/fluent/fluentd/pull/3585#issuecomment-1101502617 + # To enable SHARE_DELETE, BINARY is also required. + # https://bugs.ruby-lang.org/issues/11218 + # https://github.com/ruby/ruby/blob/d6684f063bc53e3cab025bd39526eca3b480b5e7/win32/win32.c#L6332-L6345 + flags = BINARY | SHARE_DELETE + case mode.delete("b") + when "r" + flags |= RDONLY + when "r+" + flags |= RDWR + when "w" + flags |= WRONLY | CREAT | TRUNC + when "w+" + flags |= RDWR | CREAT | TRUNC + when "a" + flags |= WRONLY | CREAT | APPEND + when "a+" + flags |= RDWR | CREAT | APPEND + else + raise Errno::EINVAL.new("Unsupported mode by Fluent::FileWrapper: #{mode}") + end + end + + # DeletePending is a Windows-specific file state that roughly means + # "this file is queued for deletion, so close any open handlers" + # + # This flag can be retrieved via GetFileInformationByHandleEx(). + # + # https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-getfileinformationbyhandleex + # + def delete_pending + file_standard_info = 0x01 + bufsize = 1024 + buf = '\0' * bufsize + + unless Win32API.GetFileInformationByHandleEx(@file_handle, file_standard_info, buf, bufsize) + return false + end + + return buf.unpack("QQICC")[3] != 0 + end + end + end +end diff --git a/lib/fluent/plugin/file_wrapper.rb b/lib/fluent/plugin/file_wrapper.rb deleted file mode 100644 index 91075a2aed..0000000000 --- a/lib/fluent/plugin/file_wrapper.rb +++ /dev/null @@ -1,131 +0,0 @@ -# -# Fluentd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -require 'fluent/win32api' - -module Fluent - module FileWrapper - def self.open(path, mode='r') - io = WindowsFile.new(path, mode).io - if block_given? - v = yield io - io.close - v - else - io - end - end - - def self.stat(path) - f = WindowsFile.new(path) - s = f.stat - f.close - s - end - end - - class WindowsFile - include File::Constants - - attr_reader :io - - INVALID_HANDLE_VALUE = -1 - - def initialize(path, mode='r') - @path = path - @io = File.open(path, mode2flags(mode)) - @file_handle = Win32API._get_osfhandle(@io.to_i) - @io.instance_variable_set(:@file_index, self.ino) - def @io.ino - @file_index - end - end - - def close - @io.close - @file_handle = INVALID_HANDLE_VALUE - end - - # To keep backward compatibility, we continue to use GetFileInformationByHandle() - # to get file id. - # Note that Ruby's File.stat uses GetFileInformationByHandleEx() with FileIdInfo - # and returned value is different with above one, former one is 64 bit while - # later one is 128bit. - def ino - by_handle_file_information = '\0'*(4+8+8+8+4+4+4+4+4+4) #72bytes - - unless Win32API.GetFileInformationByHandle(@file_handle, by_handle_file_information) - return 0 - end - - by_handle_file_information.unpack("I11Q1")[11] # fileindex - end - - def stat - raise Errno::ENOENT if delete_pending - s = File.stat(@path) - s.instance_variable_set :@ino, self.ino - def s.ino; @ino; end - s - end - - private - - def mode2flags(mode) - # Always inject File::Constants::SHARE_DELETE - # https://github.com/fluent/fluentd/pull/3585#issuecomment-1101502617 - # To enable SHARE_DELETE, BINARY is also required. - # https://bugs.ruby-lang.org/issues/11218 - # https://github.com/ruby/ruby/blob/d6684f063bc53e3cab025bd39526eca3b480b5e7/win32/win32.c#L6332-L6345 - flags = BINARY | SHARE_DELETE - case mode.delete("b") - when "r" - flags |= RDONLY - when "r+" - flags |= RDWR - when "w" - flags |= WRONLY | CREAT | TRUNC - when "w+" - flags |= RDWR | CREAT | TRUNC - when "a" - flags |= WRONLY | CREAT | APPEND - when "a+" - flags |= RDWR | CREAT | APPEND - else - raise Errno::EINVAL.new("Unsupported mode by Fluent::FileWrapper: #{mode}") - end - end - - # DeletePending is a Windows-specific file state that roughly means - # "this file is queued for deletion, so close any open handlers" - # - # This flag can be retrieved via GetFileInformationByHandleEx(). - # - # https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-getfileinformationbyhandleex - # - def delete_pending - file_standard_info = 0x01 - bufsize = 1024 - buf = '\0' * bufsize - - unless Win32API.GetFileInformationByHandleEx(@file_handle, file_standard_info, buf, bufsize) - return false - end - - return buf.unpack("QQICC")[3] != 0 - end - end -end if Fluent.windows? diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index cf12810953..fb29dd249f 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -25,12 +25,7 @@ require 'fluent/capability' require 'fluent/plugin/in_tail/position_file' require 'fluent/plugin/in_tail/group_watch' - -if Fluent.windows? - require_relative 'file_wrapper' -else - Fluent::FileWrapper = File -end +require 'fluent/file_wrapper' module Fluent::Plugin class TailInput < Fluent::Plugin::Input diff --git a/test/command/test_fluentd.rb b/test/command/test_fluentd.rb index 2ad023bf90..8845fc3ea8 100644 --- a/test/command/test_fluentd.rb +++ b/test/command/test_fluentd.rb @@ -5,20 +5,35 @@ require 'fileutils' require 'timeout' +require 'securerandom' +require 'fluent/file_wrapper' class TestFluentdCommand < ::Test::Unit::TestCase - TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/command/fluentd#{ENV['TEST_ENV_NUMBER']}") SUPERVISOR_PID_PATTERN = /starting fluentd-[.0-9]+ pid=(\d+)/ WORKER_PID_PATTERN = /starting fluentd worker pid=(\d+) / + def tmp_dir + File.join(File.dirname(__FILE__), "..", "tmp", "command" "fluentd#{ENV['TEST_ENV_NUMBER']}", SecureRandom.hex(10)) + end + setup do - FileUtils.rm_rf(TMP_DIR) - FileUtils.mkdir_p(TMP_DIR) + @tmp_dir = tmp_dir + FileUtils.mkdir_p(@tmp_dir) @supervisor_pid = nil @worker_pids = [] ENV["TEST_RUBY_PATH"] = nil end + teardown do + begin + FileUtils.rm_rf(@tmp_dir) + rescue Errno::EACCES + # It may occur on Windows because of delete pending state due to delayed GC. + # Ruby 3.2 or later doesn't ignore Errno::EACCES: + # https://github.com/ruby/ruby/commit/983115cf3c8f75b1afbe3274f02c1529e1ce3a81 + end + end + def process_exist?(pid) begin r = Process.waitpid(pid, Process::WNOHANG) @@ -30,17 +45,17 @@ def process_exist?(pid) end def create_conf_file(name, content, ext_enc = 'utf-8') - conf_path = File.join(TMP_DIR, name) - File.open(conf_path, "w:#{ext_enc}:utf-8") do |file| + conf_path = File.join(@tmp_dir, name) + Fluent::FileWrapper.open(conf_path, "w:#{ext_enc}:utf-8") do |file| file.write content end conf_path end def create_plugin_file(name, content) - file_path = File.join(TMP_DIR, 'plugin', name) + file_path = File.join(@tmp_dir, 'plugin', name) FileUtils.mkdir_p(File.dirname(file_path)) - File.open(file_path, 'w') do |file| + Fluent::FileWrapper.open(file_path, 'w') do |file| file.write content end file_path @@ -56,8 +71,8 @@ def create_cmdline(conf_path, *fluentd_options) end end - def execute_command(cmdline, chdir=TMP_DIR, env = {}) - null_stream = File.open(File::NULL, 'w') + def execute_command(cmdline, chdir=@tmp_dir, env = {}) + null_stream = Fluent::FileWrapper.open(File::NULL, 'w') gemfile_path = File.expand_path(File.dirname(__FILE__) + "../../../Gemfile") env = { "BUNDLE_GEMFILE" => gemfile_path }.merge(env) @@ -103,7 +118,7 @@ def assert_log_matches(cmdline, *pattern_list, patterns_not_match: [], timeout: assert_error_msg = "" stdio_buf = "" begin - execute_command(cmdline, TMP_DIR, env) do |pid, stdout| + execute_command(cmdline, @tmp_dir, env) do |pid, stdout| begin waiting(timeout) do while process_exist?(pid) && !matched @@ -269,7 +284,7 @@ def assert_fluentd_fails_to_start(cmdline, *pattern_list, timeout: 10) sub_test_case 'with system configuration about root directory' do setup do - @root_path = File.join(TMP_DIR, "rootpath") + @root_path = File.join(@tmp_dir, "rootpath") FileUtils.rm_rf(@root_path) @conf = < @@ -308,7 +323,7 @@ def assert_fluentd_fails_to_start(cmdline, *pattern_list, timeout: 10) end test 'fails to launch fluentd if specified root path is invalid path for directory' do - File.open(@root_path, 'w') do |_| + Fluent::FileWrapper.open(@root_path, 'w') do |_| # create file and close it end conf_path = create_conf_file('existing_root_dir.conf', @conf) @@ -554,7 +569,7 @@ def assert_fluentd_fails_to_start(cmdline, *pattern_list, timeout: 10) sub_test_case 'configured to run 2 workers' do setup do - @root_path = File.join(TMP_DIR, "rootpath") + @root_path = File.join(@tmp_dir, "rootpath") FileUtils.rm_rf(@root_path) FileUtils.mkdir_p(@root_path) end @@ -961,10 +976,10 @@ def multi_workers_ready? CONF ruby_path = ServerEngine.ruby_bin_path - tmp_ruby_path = File.join(TMP_DIR, "ruby with spaces") + tmp_ruby_path = File.join(@tmp_dir, "ruby with spaces") if Fluent.windows? tmp_ruby_path << ".bat" - File.open(tmp_ruby_path, "w") do |file| + Fluent::FileWrapper.open(tmp_ruby_path, "w") do |file| file.write "#{ruby_path} %*" end else diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 204d45cbff..b1e28fc986 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -3,6 +3,7 @@ require 'fluent/plugin/in_tail' require 'fluent/plugin/buffer' require 'fluent/system_config' +require 'fluent/file_wrapper' require 'net/http' require 'flexmock/test_unit' require 'timecop' @@ -378,9 +379,9 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) conf = ROOT_CONFIG + DEBUG_LOG_LEVEL + create_group_directive(tailing_group_pattern, '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG d = create_driver(conf, false) - File.open("#{@tmp_dir}/test1.txt", 'w') - File.open("#{@tmp_dir}/test2.txt", 'w') - File.open("#{@tmp_dir}/test3.txt", 'w') + Fluent::FileWrapper.open("#{@tmp_dir}/test1.txt", 'w') + Fluent::FileWrapper.open("#{@tmp_dir}/test2.txt", 'w') + Fluent::FileWrapper.open("#{@tmp_dir}/test3.txt", 'w') d.run do ## checking default group_watcher's paths @@ -419,10 +420,10 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) file4 = File.join(@tmp_dir, "test-podname4_test-namespace3_test-container-15fabq.log") d.run do - File.open(file1, 'w') - File.open(file2, 'w') - File.open(file3, 'w') - File.open(file4, 'w') + Fluent::FileWrapper.open(file1, 'w') + Fluent::FileWrapper.open(file2, 'w') + Fluent::FileWrapper.open(file3, 'w') + Fluent::FileWrapper.open(file4, 'w') instance = d.instance assert_equal(100, instance.find_group_from_metadata(file1).limit) @@ -438,7 +439,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) parse: PARSE_SINGLE_LINE_CONFIG) def test_emit(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -446,7 +447,7 @@ def test_emit(data) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\ntest4" } end @@ -462,11 +463,11 @@ def test_emit(data) def test_emit_with_emit_unmatched_lines_true config = config_element("", "", { "format" => "/^(?test.*)/", "emit_unmatched_lines" => true }) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test line 1" f.puts "test line 2" f.puts "bad line 1" @@ -498,7 +499,7 @@ def test_emit_with_read_lines_limit(data) msg = 'test' * 2000 # in_tail reads 8192 bytes at once. d.run(expect_emits: num_events, timeout: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts msg f.puts msg } @@ -544,7 +545,7 @@ def test_emit_with_read_bytes_limit_per_second(data) d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| 100.times do f.puts msg end @@ -567,7 +568,7 @@ def test_read_bytes_limit_precede_read_lines_limit start_time = Fluent::Clock.now d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| 8000.times do f.puts msg end @@ -606,7 +607,7 @@ def test_emit_with_read_bytes_limit_per_second(data) io_handler end - File.open("#{@tmp_dir}/tail.txt", "ab") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") do |f| 100.times do f.puts msg end @@ -616,7 +617,7 @@ def test_emit_with_read_bytes_limit_per_second(data) d.run do start_time = Fluent::Clock.now while Fluent::Clock.now - start_time < 0.8 do - File.open("#{@tmp_dir}/tail.txt", "ab") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") do |f| f.puts msg f.flush end @@ -634,7 +635,7 @@ def test_longer_than_rotate_wait num_lines = 1024 * 3 msg = "08bytes" - File.open("#{@tmp_dir}/tail.txt", "wb") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") do |f| f.write("#{msg}\n" * num_lines) end @@ -670,7 +671,7 @@ def test_shorter_than_rotate_wait num_lines = 1024 * 2 msg = "08bytes" - File.open("#{@tmp_dir}/tail.txt", "wb") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") do |f| f.write("#{msg}\n" * num_lines) end @@ -718,7 +719,7 @@ def test_shorter_than_rotate_wait parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG) def test_emit_with_read_from_head(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -726,7 +727,7 @@ def test_emit_with_read_from_head(data) d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -744,7 +745,7 @@ def test_emit_with_read_from_head(data) parse: CONFIG_DISABLE_WATCH_TIMER + PARSE_SINGLE_LINE_CONFIG) def test_emit_without_watch_timer(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -752,7 +753,7 @@ def test_emit_without_watch_timer(data) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -776,7 +777,7 @@ def test_watch_wildcard_path_without_watch_timer }) config = config + CONFIG_DISABLE_WATCH_TIMER + SINGLE_LINE_CONFIG - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -784,7 +785,7 @@ def test_watch_wildcard_path_without_watch_timer d = create_driver(config, false) d.run(expect_emits: 1, timeout: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -802,7 +803,7 @@ def test_watch_wildcard_path_without_watch_timer parse: CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG) def test_emit_with_disable_stat_watcher(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -810,7 +811,7 @@ def test_emit_with_disable_stat_watcher(data) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -826,7 +827,7 @@ def test_always_read_from_head_on_detecting_a_new_file d = create_driver(SINGLE_LINE_CONFIG) d.run(expect_emits: 1, timeout: 3) do - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1\ntest2\n" } end @@ -873,7 +874,7 @@ def test_emit_with_system system_conf = parse_system(CONFIG_SYSTEM) sc = Fluent::SystemConfig.new(system_conf) Fluent::Engine.init(sc) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -881,7 +882,7 @@ def test_emit_with_system d = create_driver d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -938,13 +939,13 @@ def test_rotate_file_with_open_on_every_update(data) def test_rotate_file_with_write_old(data) config = data events = sub_test_rotate_file(config, expect_emits: 3) { |rotated_file| - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } rotated_file.puts "test7" rotated_file.puts "test8" rotated_file.flush sleep 1 - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test5" f.puts "test6" } @@ -994,10 +995,10 @@ def sub_test_rotate_file(config = nil, expect_emits: nil, expect_records: nil, t if block_given? yield file else - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } sleep 1 - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test5" f.puts "test6" } @@ -1012,7 +1013,7 @@ def sub_test_rotate_file(config = nil, expect_emits: nil, expect_records: nil, t def test_truncate_file config = SINGLE_LINE_CONFIG - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" f.flush @@ -1021,7 +1022,7 @@ def test_truncate_file d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\ntest4" f.flush } @@ -1052,7 +1053,7 @@ def test_truncate_file def test_move_truncate_move_back config = SINGLE_LINE_CONFIG - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -1089,17 +1090,17 @@ def test_move_truncate_move_back end def test_lf - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| } d = create_driver d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.print "test3" } sleep 1 - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test4" } end @@ -1110,12 +1111,12 @@ def test_lf end def test_whitespace - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| } d = create_driver d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts " " # 4 spaces f.puts " 4 spaces" f.puts "4 spaces " @@ -1146,7 +1147,7 @@ def test_encoding(data) d = create_driver(CONFIG_READ_FROM_HEAD + encoding_config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test" } end @@ -1168,7 +1169,7 @@ def test_from_encoding utf8_message = cp932_message.encode(Encoding::UTF_8) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "w:cp932") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w:cp932") {|f| f.puts cp932_message } end @@ -1191,7 +1192,7 @@ def test_from_encoding_utf16 utf8_message = utf16_message.encode(Encoding::UTF_8).strip d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "w:utf-16le") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w:utf-16le") { |f| f.write utf16_message } end @@ -1212,7 +1213,7 @@ def test_encoding_with_bad_character d = create_driver(conf) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "w") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w") { |f| f.write "te\x86st\n" } end @@ -1227,11 +1228,11 @@ def test_encoding_with_bad_character parse: PARSE_MULTILINE_CONFIG) def test_multiline(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1255,11 +1256,11 @@ def test_multiline(data) parse: PARSE_MULTILINE_CONFIG) def test_multiline_with_emit_unmatched_lines_true(data) config = data + config_element("", "", { "emit_unmatched_lines" => true }) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1285,11 +1286,11 @@ def test_multiline_with_emit_unmatched_lines_true(data) parse: PARSE_MULTILINE_CONFIG_WITH_NEWLINE) def test_multiline_with_emit_unmatched_lines2(data) config = data + config_element("", "", { "emit_unmatched_lines" => true }) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 0, timeout: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "s test0" f.puts "f test1" f.puts "f test2" @@ -1311,7 +1312,7 @@ def test_multiline_with_emit_unmatched_lines2(data) data(flat: MULTILINE_CONFIG, parse: PARSE_MULTILINE_CONFIG) def test_multiline_with_flush_interval(data) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = data + config_element("", "", { "multiline_flush_interval" => "2s" }) d = create_driver(config) @@ -1319,7 +1320,7 @@ def test_multiline_with_flush_interval(data) assert_equal(2, d.instance.multiline_flush_interval) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1354,7 +1355,7 @@ def test_multiline_encoding_of_flushed_record(data) d = create_driver(config + encoding_config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| f.puts "s test" } end @@ -1377,7 +1378,7 @@ def test_multiline_from_encoding_of_flushed_record cp932_message = "s \x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932) utf8_message = "\x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".encode(Encoding::UTF_8, Encoding::CP932) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "w:cp932") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w:cp932") { |f| f.puts cp932_message } end @@ -1409,11 +1410,11 @@ def test_multiline_from_encoding_of_flushed_record ) def test_multiline_with_multiple_formats(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1450,7 +1451,7 @@ def test_multiline_with_multiple_formats(data) ) def test_multilinelog_with_multiple_paths(data) files = ["#{@tmp_dir}/tail1.txt", "#{@tmp_dir}/tail2.txt"] - files.each { |file| File.open(file, "wb") { |f| } } + files.each { |file| Fluent::FileWrapper.open(file, "wb") { |f| } } config = data + config_element("", "", { "path" => "#{files[0]},#{files[1]}", @@ -1459,7 +1460,7 @@ def test_multilinelog_with_multiple_paths(data) d = create_driver(config, false) d.run(expect_emits: 2) do files.each do |file| - File.open(file, 'ab') { |f| + Fluent::FileWrapper.open(file, 'ab') { |f| f.puts "f #{file} line should be ignored" f.puts "s test1" f.puts "f test2" @@ -1494,12 +1495,12 @@ def test_multilinelog_with_multiple_paths(data) ]) ) def test_multiline_without_firstline(data) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = data d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "foo 1" f.puts "bar 1" f.puts "baz 1" @@ -1605,10 +1606,10 @@ def test_unwatched_files_should_be_removed d = create_driver(config, false) d.end_if { d.instance.instance_variable_get(:@tails).keys.size >= 1 } d.run(expect_emits: 1, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test3\n" } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test3\n" } end - cleanup_directory(@tmp_dir) + cleanup_file("#{@tmp_dir}/tail.txt") waiting(20) { sleep 0.1 until Dir.glob("#{@tmp_dir}/*.txt").size == 0 } # Ensure file is deleted on Windows waiting(5) { sleep 0.1 until d.instance.instance_variable_get(:@tails).keys.size <= 0 } @@ -1624,6 +1625,7 @@ def test_unwatched_files_should_be_removed ) ensure d.instance_shutdown if d && d.instance + cleanup_directory(@tmp_dir) end def count_timer_object @@ -1882,7 +1884,7 @@ def test_tag_prefix_and_suffix_ignore "max_line_size" => label, "log_level" => "debug" }) - File.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f| f.puts "foo" f.puts "x" * size # 'x' * size + \n > @max_line_size f.puts "bar" @@ -1908,7 +1910,7 @@ def test_tag_prefix_and_suffix_ignore # Ensure that no fatal exception is raised when a file is missing and that # files that do exist are still tailed as expected. def test_missing_file - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -1926,7 +1928,7 @@ def test_missing_file [config1, config2].each do |config| d = create_driver(config, false) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -1947,14 +1949,14 @@ def test_should_delete_file_pos_entry_for_non_existing_file_with_follow_inodes path = "#{@tmp_dir}/tail.txt" ino = 1 pos = 1234 - File.open("#{@tmp_dir}/tail.pos", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "wb") {|f| f.puts ("%s\t%016x\t%016x\n" % [path, pos, ino]) } d = create_driver(config, false) d.run - pos_file = File.open("#{@tmp_dir}/tail.pos", "r") + pos_file = Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") pos_file.pos = 0 assert_raise(EOFError) do @@ -1964,20 +1966,20 @@ def test_should_delete_file_pos_entry_for_non_existing_file_with_follow_inodes def test_should_write_latest_offset_after_rotate_wait config = common_follow_inode_config - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d = create_driver(config, false) d.run(expect_emits: 2, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") sleep 1 - File.open("#{@tmp_dir}/tail.txt" + "1", "ab") {|f| f.puts "test4\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt" + "1", "ab") {|f| f.puts "test4\n"} end - pos_file = File.open("#{@tmp_dir}/tail.pos", "r") + pos_file = Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") pos_file.pos = 0 line_parts = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(pos_file.readline) waiting(5) { @@ -1997,13 +1999,13 @@ def test_should_remove_deleted_file path = "#{@tmp_dir}/tail.txt" ino = 1 pos = 1234 - File.open("#{@tmp_dir}/tail.pos", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "wb") {|f| f.puts ("%s\t%016x\t%016x\n" % [path, pos, ino]) } d = create_driver(config) d.run do - pos_file = File.open("#{@tmp_dir}/tail.pos", "r") + pos_file = Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") pos_file.pos = 0 assert_equal([], pos_file.readlines) end @@ -2024,14 +2026,14 @@ def test_should_mark_file_unwatched_after_limit_recently_modified_and_rotate_wai d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } target_info = create_target_info("#{@tmp_dir}/tail.txt") d.run(expect_emits: 1, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} end @@ -2062,13 +2064,13 @@ def test_should_read_from_head_on_file_renaming_with_star_in_pattern d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d.run(expect_emits: 2, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1") end @@ -2082,13 +2084,13 @@ def test_should_not_read_from_head_on_rotation_when_watching_inodes d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d.run(expect_emits: 1, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} end FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1") @@ -2106,16 +2108,16 @@ def test_should_mark_file_unwatched_if_same_name_file_created_with_different_ino d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } target_info = create_target_info("#{@tmp_dir}/tail.txt") d.run(expect_emits: 2, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} cleanup_file("#{@tmp_dir}/tail.txt") - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test4\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test4\n"} end new_target_info = create_target_info("#{@tmp_dir}/tail.txt") @@ -2147,7 +2149,7 @@ def test_should_close_watcher_after_rotate_wait @metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics) end - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -2171,18 +2173,18 @@ def test_should_create_new_watcher_for_new_file_with_same_name d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } path_ino = create_target_info("#{@tmp_dir}/tail.txt") d.run(expect_emits: 1, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} end cleanup_file("#{@tmp_dir}/tail.txt") - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test3" f.puts "test4" } @@ -2208,15 +2210,15 @@ def test_truncate_file_with_follow_inodes d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d.run(expect_emits: 3, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} sleep 2 - File.open("#{@tmp_dir}/tail.txt", "w+b") {|f| f.puts "test4\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w+b") {|f| f.puts "test4\n"} end events = d.events @@ -2230,7 +2232,7 @@ def test_truncate_file_with_follow_inodes # issue #3464 def test_should_replace_target_info - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1\n" } target_info = create_target_info("#{@tmp_dir}/tail.txt") @@ -2257,7 +2259,7 @@ def test_should_replace_target_info assert_equal([target_info.ino], inodes) cleanup_file("#{@tmp_dir}/tail.txt") - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test2\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test2\n"} while d.events.size < 2 do sleep 0.1 @@ -2274,7 +2276,7 @@ def test_should_replace_target_info sub_test_case "tail_path" do def test_tail_path_with_singleline - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -2282,7 +2284,7 @@ def test_tail_path_with_singleline d = create_driver(SINGLE_LINE_CONFIG + config_element("", "", { "path_key" => "path" })) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -2296,7 +2298,7 @@ def test_tail_path_with_singleline end def test_tail_path_with_multiline_with_firstline - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = config_element("", "", { "path_key" => "path", @@ -2306,7 +2308,7 @@ def test_tail_path_with_multiline_with_firstline }) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -2326,7 +2328,7 @@ def test_tail_path_with_multiline_with_firstline end def test_tail_path_with_multiline_without_firstline - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = config_element("", "", { "path_key" => "path", @@ -2337,7 +2339,7 @@ def test_tail_path_with_multiline_without_firstline }) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "foo 1" f.puts "bar 1" f.puts "baz 1" @@ -2356,7 +2358,7 @@ def test_tail_path_with_multiline_with_multiple_paths omit "This testcase is unstable on AppVeyor." end files = ["#{@tmp_dir}/tail1.txt", "#{@tmp_dir}/tail2.txt"] - files.each { |file| File.open(file, "wb") { |f| } } + files.each { |file| Fluent::FileWrapper.open(file, "wb") { |f| } } config = config_element("", "", { "path" => "#{files[0]},#{files[1]}", @@ -2369,7 +2371,7 @@ def test_tail_path_with_multiline_with_multiple_paths d = create_driver(config, false) d.run(expect_emits: 2) do files.each do |file| - File.open(file, 'ab') { |f| + Fluent::FileWrapper.open(file, 'ab') { |f| f.puts "f #{file} line should be ignored" f.puts "s test1" f.puts "f test2" @@ -2500,7 +2502,7 @@ def test_EACCES end def test_shutdown_timeout - File.open("#{@tmp_dir}/tail.txt", "wb") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") do |f| # Should be large enough to take too long time to consume (1024 * 1024 * 5).times do f.puts "{\"test\":\"fizzbuzz\"}" @@ -2548,7 +2550,7 @@ def test_lines_collected_with_no_throttling(data) conf = ROOT_CONFIG + group + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG - File.open("#{@tmp_dir}/#{file}", 'wb') do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/#{file}", 'wb') do |f| num_lines.times do f.puts "#{msg}\n" end @@ -2594,7 +2596,7 @@ def test_lines_collected_with_no_throttling(data) d = create_driver(conf, false) file_path = "#{@tmp_dir}/#{file}" - File.open(file_path, 'wb') do |f| + Fluent::FileWrapper.open(file_path, 'wb') do |f| num_lines.times do f.puts msg end diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb index be593ab7b1..099315e92c 100644 --- a/test/plugin/test_out_file.rb +++ b/test/plugin/test_out_file.rb @@ -5,6 +5,7 @@ require 'time' require 'timecop' require 'zlib' +require 'fluent/file_wrapper' class FileOutputTest < Test::Unit::TestCase def setup @@ -1016,7 +1017,7 @@ def run_and_check(d, symlink_path) test 'returns filepath with index which does not exist yet' do 5.times do |i| - File.open(File.join(@tmp, "exist_#{i}.log"), 'a'){|f| } # open(create) and close + Fluent::FileWrapper.open(File.join(@tmp, "exist_#{i}.log"), 'a'){|f| } # open(create) and close end @i.find_filepath_available(File.join(@tmp, "exist_**.log")) do |path| assert_equal File.join(@tmp, "exist_5.log"), path @@ -1025,7 +1026,7 @@ def run_and_check(d, symlink_path) test 'creates lock directory when with_lock is true to exclude operations of other worker process' do 5.times do |i| - File.open(File.join(@tmp, "exist_#{i}.log"), 'a') + Fluent::FileWrapper.open(File.join(@tmp, "exist_#{i}.log"), 'a') end Dir.mkdir(File.join(@tmp, "exist_5.log.lock")) @i.find_filepath_available(File.join(@tmp, "exist_**.log"), with_lock: true) do |path| diff --git a/test/plugin/test_file_wrapper.rb b/test/test_file_wrapper.rb similarity index 94% rename from test/plugin/test_file_wrapper.rb rename to test/test_file_wrapper.rb index 93e7e61fbc..22e5b5ad92 100644 --- a/test/plugin/test_file_wrapper.rb +++ b/test/test_file_wrapper.rb @@ -1,5 +1,5 @@ -require_relative '../helper' -require 'fluent/plugin/file_wrapper' +require_relative 'helper' +require 'fluent/file_wrapper' class FileWrapperTest < Test::Unit::TestCase TMP_DIR = File.dirname(__FILE__) + "/../tmp/file_wrapper#{ENV['TEST_ENV_NUMBER']}" diff --git a/test/test_log.rb b/test/test_log.rb index bed0f2fb7c..480781d4d8 100644 --- a/test/test_log.rb +++ b/test/test_log.rb @@ -4,13 +4,16 @@ require 'fluent/log' require 'timecop' require 'logger' +require 'securerandom' class LogTest < Test::Unit::TestCase - TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/tmp/log/#{ENV['TEST_ENV_NUMBER']}") + def tmp_dir + File.join(File.dirname(__FILE__), "tmp", "log", "#{ENV['TEST_ENV_NUMBER']}", SecureRandom.hex(10)) + end def setup - FileUtils.rm_rf(TMP_DIR) - FileUtils.mkdir_p(TMP_DIR) + @tmp_dir = tmp_dir + FileUtils.mkdir_p(@tmp_dir) @log_device = Fluent::Test::DummyLogDevice.new @timestamp = Time.parse("2016-04-21 02:58:41 +0000") @timestamp_str = @timestamp.strftime("%Y-%m-%d %H:%M:%S %z") @@ -21,6 +24,13 @@ def teardown @log_device.reset Timecop.return Thread.current[:last_repeated_stacktrace] = nil + begin + FileUtils.rm_rf(@tmp_dir) + rescue Errno::EACCES + # It may occur on Windows because of delete pending state due to delayed GC. + # Ruby 3.2 or later doesn't ignore Errno::EACCES: + # https://github.com/ruby/ruby/commit/983115cf3c8f75b1afbe3274f02c1529e1ce3a81 + end end sub_test_case "log level" do @@ -560,7 +570,7 @@ def test_log_with_logdevio(expected) Timecop.freeze(@timestamp) rotate_age, rotate_size, travel_term = expected - path = "#{TMP_DIR}/log-dev-io-#{rotate_size}-#{rotate_age}" + path = "#{@tmp_dir}/log-dev-io-#{rotate_size}-#{rotate_age}" logdev = Fluent::LogDeviceIO.new(path, shift_age: rotate_age, shift_size: rotate_size) logger = ServerEngine::DaemonLogger.new(logdev) @@ -585,7 +595,7 @@ def test_log_rotates_specified_size_with_logdevio with_timezone('utc') do rotate_age = 2 rotate_size = 100 - path = "#{TMP_DIR}/log-dev-io-#{rotate_size}-#{rotate_age}" + path = "#{@tmp_dir}/log-dev-io-#{rotate_size}-#{rotate_age}" path0 = path + '.0' path1 = path + '.1' diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb index 2022b132a8..5c8b16a924 100644 --- a/test/test_supervisor.rb +++ b/test/test_supervisor.rb @@ -2,12 +2,14 @@ require 'fluent/event_router' require 'fluent/system_config' require 'fluent/supervisor' +require 'fluent/file_wrapper' require_relative 'test_plugin_classes' require 'net/http' require 'uri' require 'fileutils' require 'tempfile' +require 'securerandom' if Fluent.windows? require 'win32/event' @@ -22,17 +24,29 @@ def config end end - TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/tmp/supervisor#{ENV['TEST_ENV_NUMBER']}") - TMP_ROOT_DIR = File.join(TMP_DIR, 'root') + def tmp_dir + File.join(File.dirname(__FILE__), "tmp", "supervisor#{ENV['TEST_ENV_NUMBER']}", SecureRandom.hex(10)) + end def setup - FileUtils.rm_rf(TMP_DIR) - FileUtils.mkdir_p(TMP_DIR) + @tmp_dir = tmp_dir + @tmp_root_dir = File.join(@tmp_dir, 'root') + FileUtils.mkdir_p(@tmp_dir) + end + + def teardown + begin + FileUtils.rm_rf(@tmp_dir) + rescue Errno::EACCES + # It may occur on Windows because of delete pending state due to delayed GC. + # Ruby 3.2 or later doesn't ignore Errno::EACCES: + # https://github.com/ruby/ruby/commit/983115cf3c8f75b1afbe3274f02c1529e1ce3a81 + end end def write_config(path, data) FileUtils.mkdir_p(File.dirname(path)) - File.open(path, "w") {|f| f.write data } + Fluent::FileWrapper.open(path, "w") {|f| f.write data } end @@ -48,7 +62,7 @@ def test_system_config enable_get_dump true process_name "process_name" log_level info - root_dir #{TMP_ROOT_DIR} + root_dir #{@tmp_root_dir} format json time_format %Y @@ -76,7 +90,7 @@ def test_system_config assert_equal true, sys_conf.enable_get_dump assert_equal "process_name", sys_conf.process_name assert_equal 2, sys_conf.log_level - assert_equal TMP_ROOT_DIR, sys_conf.root_dir + assert_equal @tmp_root_dir, sys_conf.root_dir assert_equal :json, sys_conf.log.format assert_equal '%Y', sys_conf.log.time_format counter_server = sys_conf.counter_server @@ -116,7 +130,7 @@ def test_system_config enable_get_dump: true process_name: "process_name" log_level: info - root_dir: !fluent/s "#{TMP_ROOT_DIR}" + root_dir: !fluent/s "#{@tmp_root_dir}" log: format: json time_format: "%Y" @@ -144,7 +158,7 @@ def test_system_config true, "process_name", 2, - TMP_ROOT_DIR, + @tmp_root_dir, :json, '%Y', '127.0.0.1', @@ -449,7 +463,7 @@ def server.config end def test_load_config - tmp_dir = "#{TMP_DIR}/dir/test_load_config.conf" + tmp_dir = "#{@tmp_dir}/dir/test_load_config.conf" conf_info_str = %[ log_level info @@ -520,7 +534,7 @@ def test_load_config end def test_load_config_for_logger - tmp_dir = "#{TMP_DIR}/dir/test_load_config_log.conf" + tmp_dir = "#{@tmp_dir}/dir/test_load_config_log.conf" conf_info_str = %[ @@ -547,7 +561,7 @@ def test_load_config_for_logger end def test_load_config_for_daemonize - tmp_dir = "#{TMP_DIR}/dir/test_load_config.conf" + tmp_dir = "#{@tmp_dir}/dir/test_load_config.conf" conf_info_str = %[ log_level info @@ -644,7 +658,7 @@ def test_logger ) def test_logger_with_rotate_age_and_rotate_size(rotate_age) opts = Fluent::Supervisor.default_options.merge( - log_path: "#{TMP_DIR}/test", log_rotate_age: rotate_age, log_rotate_size: 10 + log_path: "#{@tmp_dir}/test", log_rotate_age: rotate_age, log_rotate_size: 10 ) sv = Fluent::Supervisor.new(opts) log = sv.instance_variable_get(:@log) @@ -674,7 +688,7 @@ def test_override_default_log_rotate file.puts(config) file.flush opts = Fluent::Supervisor.default_options.merge( - log_path: "#{TMP_DIR}/test.log", config_path: file.path + log_path: "#{@tmp_dir}/test.log", config_path: file.path ) sv = Fluent::Supervisor.new(opts) @@ -699,7 +713,7 @@ def test_override_default_log_rotate_with_yaml_config file.puts(config) file.flush opts = Fluent::Supervisor.default_options.merge( - log_path: "#{TMP_DIR}/test.log", config_path: file.path, config_file_type: :yaml, + log_path: "#{@tmp_dir}/test.log", config_path: file.path, config_file_type: :yaml, ) sv = Fluent::Supervisor.new(opts)