diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index b702314702..f11a4173e2 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -29,6 +29,7 @@ require 'fluent/system_config' require 'fluent/msgpack_factory' require 'fluent/variable_store' +require 'fluent/file_wrapper' require 'serverengine' if Fluent.windows? @@ -545,7 +546,7 @@ def init(process_type, worker_id) worker_id_suffixed_path(worker_id, @path) : @path, shift_age: @log_rotate_age, shift_size: @log_rotate_size) else - File.open(@path, "a") + Fluent::FileWrapper.open(@path, "a") end if @chuser || @chgroup chuid = @chuser ? ServerEngine::Privilege.get_etc_passwd(@chuser).uid : nil diff --git a/test/command/test_fluentd.rb b/test/command/test_fluentd.rb index 2ad023bf90..726bc5b8f5 100644 --- a/test/command/test_fluentd.rb +++ b/test/command/test_fluentd.rb @@ -5,6 +5,7 @@ require 'fileutils' require 'timeout' +require 'fluent/file_wrapper' class TestFluentdCommand < ::Test::Unit::TestCase TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/command/fluentd#{ENV['TEST_ENV_NUMBER']}") @@ -31,7 +32,7 @@ def process_exist?(pid) def create_conf_file(name, content, ext_enc = 'utf-8') conf_path = File.join(TMP_DIR, name) - File.open(conf_path, "w:#{ext_enc}:utf-8") do |file| + Fluent::FileWrapper.open(conf_path, "w:#{ext_enc}:utf-8") do |file| file.write content end conf_path @@ -40,7 +41,7 @@ def create_conf_file(name, content, ext_enc = 'utf-8') def create_plugin_file(name, content) file_path = File.join(TMP_DIR, 'plugin', name) FileUtils.mkdir_p(File.dirname(file_path)) - File.open(file_path, 'w') do |file| + Fluent::FileWrapper.open(file_path, 'w') do |file| file.write content end file_path @@ -57,7 +58,7 @@ def create_cmdline(conf_path, *fluentd_options) end def execute_command(cmdline, chdir=TMP_DIR, env = {}) - null_stream = File.open(File::NULL, 'w') + null_stream = Fluent::FileWrapper.open(File::NULL, 'w') gemfile_path = File.expand_path(File.dirname(__FILE__) + "../../../Gemfile") env = { "BUNDLE_GEMFILE" => gemfile_path }.merge(env) @@ -308,7 +309,7 @@ def assert_fluentd_fails_to_start(cmdline, *pattern_list, timeout: 10) end test 'fails to launch fluentd if specified root path is invalid path for directory' do - File.open(@root_path, 'w') do |_| + Fluent::FileWrapper.open(@root_path, 'w') do |_| # create file and close it end conf_path = create_conf_file('existing_root_dir.conf', @conf) @@ -964,7 +965,7 @@ def multi_workers_ready? tmp_ruby_path = File.join(TMP_DIR, "ruby with spaces") if Fluent.windows? tmp_ruby_path << ".bat" - File.open(tmp_ruby_path, "w") do |file| + Fluent::FileWrapper.open(tmp_ruby_path, "w") do |file| file.write "#{ruby_path} %*" end else diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 204d45cbff..4391a67371 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -8,6 +8,7 @@ require 'timecop' require 'tmpdir' require 'securerandom' +require 'file_wrapper' class TailInputTest < Test::Unit::TestCase include FlexMock::TestCase @@ -378,9 +379,9 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) conf = ROOT_CONFIG + DEBUG_LOG_LEVEL + create_group_directive(tailing_group_pattern, '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG d = create_driver(conf, false) - File.open("#{@tmp_dir}/test1.txt", 'w') - File.open("#{@tmp_dir}/test2.txt", 'w') - File.open("#{@tmp_dir}/test3.txt", 'w') + Fluent::FileWrapper.open("#{@tmp_dir}/test1.txt", 'w') + Fluent::FileWrapper.open("#{@tmp_dir}/test2.txt", 'w') + Fluent::FileWrapper.open("#{@tmp_dir}/test3.txt", 'w') d.run do ## checking default group_watcher's paths @@ -419,10 +420,10 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) file4 = File.join(@tmp_dir, "test-podname4_test-namespace3_test-container-15fabq.log") d.run do - File.open(file1, 'w') - File.open(file2, 'w') - File.open(file3, 'w') - File.open(file4, 'w') + Fluent::FileWrapper.open(file1, 'w') + Fluent::FileWrapper.open(file2, 'w') + Fluent::FileWrapper.open(file3, 'w') + Fluent::FileWrapper.open(file4, 'w') instance = d.instance assert_equal(100, instance.find_group_from_metadata(file1).limit) @@ -438,7 +439,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) parse: PARSE_SINGLE_LINE_CONFIG) def test_emit(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -446,7 +447,7 @@ def test_emit(data) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\ntest4" } end @@ -462,11 +463,11 @@ def test_emit(data) def test_emit_with_emit_unmatched_lines_true config = config_element("", "", { "format" => "/^(?test.*)/", "emit_unmatched_lines" => true }) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test line 1" f.puts "test line 2" f.puts "bad line 1" @@ -498,7 +499,7 @@ def test_emit_with_read_lines_limit(data) msg = 'test' * 2000 # in_tail reads 8192 bytes at once. d.run(expect_emits: num_events, timeout: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts msg f.puts msg } @@ -544,7 +545,7 @@ def test_emit_with_read_bytes_limit_per_second(data) d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| 100.times do f.puts msg end @@ -567,7 +568,7 @@ def test_read_bytes_limit_precede_read_lines_limit start_time = Fluent::Clock.now d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| 8000.times do f.puts msg end @@ -606,7 +607,7 @@ def test_emit_with_read_bytes_limit_per_second(data) io_handler end - File.open("#{@tmp_dir}/tail.txt", "ab") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") do |f| 100.times do f.puts msg end @@ -616,7 +617,7 @@ def test_emit_with_read_bytes_limit_per_second(data) d.run do start_time = Fluent::Clock.now while Fluent::Clock.now - start_time < 0.8 do - File.open("#{@tmp_dir}/tail.txt", "ab") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") do |f| f.puts msg f.flush end @@ -634,7 +635,7 @@ def test_longer_than_rotate_wait num_lines = 1024 * 3 msg = "08bytes" - File.open("#{@tmp_dir}/tail.txt", "wb") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") do |f| f.write("#{msg}\n" * num_lines) end @@ -670,7 +671,7 @@ def test_shorter_than_rotate_wait num_lines = 1024 * 2 msg = "08bytes" - File.open("#{@tmp_dir}/tail.txt", "wb") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") do |f| f.write("#{msg}\n" * num_lines) end @@ -718,7 +719,7 @@ def test_shorter_than_rotate_wait parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG) def test_emit_with_read_from_head(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -726,7 +727,7 @@ def test_emit_with_read_from_head(data) d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -744,7 +745,7 @@ def test_emit_with_read_from_head(data) parse: CONFIG_DISABLE_WATCH_TIMER + PARSE_SINGLE_LINE_CONFIG) def test_emit_without_watch_timer(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -752,7 +753,7 @@ def test_emit_without_watch_timer(data) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -776,7 +777,7 @@ def test_watch_wildcard_path_without_watch_timer }) config = config + CONFIG_DISABLE_WATCH_TIMER + SINGLE_LINE_CONFIG - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -784,7 +785,7 @@ def test_watch_wildcard_path_without_watch_timer d = create_driver(config, false) d.run(expect_emits: 1, timeout: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -802,7 +803,7 @@ def test_watch_wildcard_path_without_watch_timer parse: CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG) def test_emit_with_disable_stat_watcher(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -810,7 +811,7 @@ def test_emit_with_disable_stat_watcher(data) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -826,7 +827,7 @@ def test_always_read_from_head_on_detecting_a_new_file d = create_driver(SINGLE_LINE_CONFIG) d.run(expect_emits: 1, timeout: 3) do - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1\ntest2\n" } end @@ -873,7 +874,7 @@ def test_emit_with_system system_conf = parse_system(CONFIG_SYSTEM) sc = Fluent::SystemConfig.new(system_conf) Fluent::Engine.init(sc) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -881,7 +882,7 @@ def test_emit_with_system d = create_driver d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -938,13 +939,13 @@ def test_rotate_file_with_open_on_every_update(data) def test_rotate_file_with_write_old(data) config = data events = sub_test_rotate_file(config, expect_emits: 3) { |rotated_file| - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } rotated_file.puts "test7" rotated_file.puts "test8" rotated_file.flush sleep 1 - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test5" f.puts "test6" } @@ -994,10 +995,10 @@ def sub_test_rotate_file(config = nil, expect_emits: nil, expect_records: nil, t if block_given? yield file else - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } sleep 1 - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test5" f.puts "test6" } @@ -1012,7 +1013,7 @@ def sub_test_rotate_file(config = nil, expect_emits: nil, expect_records: nil, t def test_truncate_file config = SINGLE_LINE_CONFIG - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" f.flush @@ -1021,7 +1022,7 @@ def test_truncate_file d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\ntest4" f.flush } @@ -1052,7 +1053,7 @@ def test_truncate_file def test_move_truncate_move_back config = SINGLE_LINE_CONFIG - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -1089,17 +1090,17 @@ def test_move_truncate_move_back end def test_lf - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| } d = create_driver d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.print "test3" } sleep 1 - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test4" } end @@ -1110,12 +1111,12 @@ def test_lf end def test_whitespace - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| } d = create_driver d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts " " # 4 spaces f.puts " 4 spaces" f.puts "4 spaces " @@ -1146,7 +1147,7 @@ def test_encoding(data) d = create_driver(CONFIG_READ_FROM_HEAD + encoding_config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test" } end @@ -1168,7 +1169,7 @@ def test_from_encoding utf8_message = cp932_message.encode(Encoding::UTF_8) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "w:cp932") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w:cp932") {|f| f.puts cp932_message } end @@ -1191,7 +1192,7 @@ def test_from_encoding_utf16 utf8_message = utf16_message.encode(Encoding::UTF_8).strip d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "w:utf-16le") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w:utf-16le") { |f| f.write utf16_message } end @@ -1212,7 +1213,7 @@ def test_encoding_with_bad_character d = create_driver(conf) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "w") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w") { |f| f.write "te\x86st\n" } end @@ -1227,11 +1228,11 @@ def test_encoding_with_bad_character parse: PARSE_MULTILINE_CONFIG) def test_multiline(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1255,11 +1256,11 @@ def test_multiline(data) parse: PARSE_MULTILINE_CONFIG) def test_multiline_with_emit_unmatched_lines_true(data) config = data + config_element("", "", { "emit_unmatched_lines" => true }) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1285,11 +1286,11 @@ def test_multiline_with_emit_unmatched_lines_true(data) parse: PARSE_MULTILINE_CONFIG_WITH_NEWLINE) def test_multiline_with_emit_unmatched_lines2(data) config = data + config_element("", "", { "emit_unmatched_lines" => true }) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 0, timeout: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "s test0" f.puts "f test1" f.puts "f test2" @@ -1311,7 +1312,7 @@ def test_multiline_with_emit_unmatched_lines2(data) data(flat: MULTILINE_CONFIG, parse: PARSE_MULTILINE_CONFIG) def test_multiline_with_flush_interval(data) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = data + config_element("", "", { "multiline_flush_interval" => "2s" }) d = create_driver(config) @@ -1319,7 +1320,7 @@ def test_multiline_with_flush_interval(data) assert_equal(2, d.instance.multiline_flush_interval) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1354,7 +1355,7 @@ def test_multiline_encoding_of_flushed_record(data) d = create_driver(config + encoding_config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| f.puts "s test" } end @@ -1377,7 +1378,7 @@ def test_multiline_from_encoding_of_flushed_record cp932_message = "s \x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932) utf8_message = "\x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".encode(Encoding::UTF_8, Encoding::CP932) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "w:cp932") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w:cp932") { |f| f.puts cp932_message } end @@ -1409,11 +1410,11 @@ def test_multiline_from_encoding_of_flushed_record ) def test_multiline_with_multiple_formats(data) config = data - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1450,7 +1451,7 @@ def test_multiline_with_multiple_formats(data) ) def test_multilinelog_with_multiple_paths(data) files = ["#{@tmp_dir}/tail1.txt", "#{@tmp_dir}/tail2.txt"] - files.each { |file| File.open(file, "wb") { |f| } } + files.each { |file| Fluent::FileWrapper.open(file, "wb") { |f| } } config = data + config_element("", "", { "path" => "#{files[0]},#{files[1]}", @@ -1459,7 +1460,7 @@ def test_multilinelog_with_multiple_paths(data) d = create_driver(config, false) d.run(expect_emits: 2) do files.each do |file| - File.open(file, 'ab') { |f| + Fluent::FileWrapper.open(file, 'ab') { |f| f.puts "f #{file} line should be ignored" f.puts "s test1" f.puts "f test2" @@ -1494,12 +1495,12 @@ def test_multilinelog_with_multiple_paths(data) ]) ) def test_multiline_without_firstline(data) - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = data d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "foo 1" f.puts "bar 1" f.puts "baz 1" @@ -1605,7 +1606,7 @@ def test_unwatched_files_should_be_removed d = create_driver(config, false) d.end_if { d.instance.instance_variable_get(:@tails).keys.size >= 1 } d.run(expect_emits: 1, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test3\n" } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test3\n" } end cleanup_directory(@tmp_dir) @@ -1882,7 +1883,7 @@ def test_tag_prefix_and_suffix_ignore "max_line_size" => label, "log_level" => "debug" }) - File.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f| f.puts "foo" f.puts "x" * size # 'x' * size + \n > @max_line_size f.puts "bar" @@ -1908,7 +1909,7 @@ def test_tag_prefix_and_suffix_ignore # Ensure that no fatal exception is raised when a file is missing and that # files that do exist are still tailed as expected. def test_missing_file - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -1926,7 +1927,7 @@ def test_missing_file [config1, config2].each do |config| d = create_driver(config, false) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -1947,14 +1948,14 @@ def test_should_delete_file_pos_entry_for_non_existing_file_with_follow_inodes path = "#{@tmp_dir}/tail.txt" ino = 1 pos = 1234 - File.open("#{@tmp_dir}/tail.pos", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "wb") {|f| f.puts ("%s\t%016x\t%016x\n" % [path, pos, ino]) } d = create_driver(config, false) d.run - pos_file = File.open("#{@tmp_dir}/tail.pos", "r") + pos_file = Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") pos_file.pos = 0 assert_raise(EOFError) do @@ -1964,20 +1965,20 @@ def test_should_delete_file_pos_entry_for_non_existing_file_with_follow_inodes def test_should_write_latest_offset_after_rotate_wait config = common_follow_inode_config - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d = create_driver(config, false) d.run(expect_emits: 2, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") sleep 1 - File.open("#{@tmp_dir}/tail.txt" + "1", "ab") {|f| f.puts "test4\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt" + "1", "ab") {|f| f.puts "test4\n"} end - pos_file = File.open("#{@tmp_dir}/tail.pos", "r") + pos_file = Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") pos_file.pos = 0 line_parts = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(pos_file.readline) waiting(5) { @@ -1997,13 +1998,13 @@ def test_should_remove_deleted_file path = "#{@tmp_dir}/tail.txt" ino = 1 pos = 1234 - File.open("#{@tmp_dir}/tail.pos", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "wb") {|f| f.puts ("%s\t%016x\t%016x\n" % [path, pos, ino]) } d = create_driver(config) d.run do - pos_file = File.open("#{@tmp_dir}/tail.pos", "r") + pos_file = Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") pos_file.pos = 0 assert_equal([], pos_file.readlines) end @@ -2024,14 +2025,14 @@ def test_should_mark_file_unwatched_after_limit_recently_modified_and_rotate_wai d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } target_info = create_target_info("#{@tmp_dir}/tail.txt") d.run(expect_emits: 1, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} end @@ -2062,13 +2063,13 @@ def test_should_read_from_head_on_file_renaming_with_star_in_pattern d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d.run(expect_emits: 2, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1") end @@ -2082,13 +2083,13 @@ def test_should_not_read_from_head_on_rotation_when_watching_inodes d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d.run(expect_emits: 1, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} end FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1") @@ -2106,16 +2107,16 @@ def test_should_mark_file_unwatched_if_same_name_file_created_with_different_ino d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } target_info = create_target_info("#{@tmp_dir}/tail.txt") d.run(expect_emits: 2, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} cleanup_file("#{@tmp_dir}/tail.txt") - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test4\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test4\n"} end new_target_info = create_target_info("#{@tmp_dir}/tail.txt") @@ -2147,7 +2148,7 @@ def test_should_close_watcher_after_rotate_wait @metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics) end - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -2171,18 +2172,18 @@ def test_should_create_new_watcher_for_new_file_with_same_name d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } path_ino = create_target_info("#{@tmp_dir}/tail.txt") d.run(expect_emits: 1, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} end cleanup_file("#{@tmp_dir}/tail.txt") - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test3" f.puts "test4" } @@ -2208,15 +2209,15 @@ def test_truncate_file_with_follow_inodes d = create_driver(config, false) - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d.run(expect_emits: 3, shutdown: false) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} sleep 2 - File.open("#{@tmp_dir}/tail.txt", "w+b") {|f| f.puts "test4\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "w+b") {|f| f.puts "test4\n"} end events = d.events @@ -2230,7 +2231,7 @@ def test_truncate_file_with_follow_inodes # issue #3464 def test_should_replace_target_info - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1\n" } target_info = create_target_info("#{@tmp_dir}/tail.txt") @@ -2257,7 +2258,7 @@ def test_should_replace_target_info assert_equal([target_info.ino], inodes) cleanup_file("#{@tmp_dir}/tail.txt") - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test2\n"} + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test2\n"} while d.events.size < 2 do sleep 0.1 @@ -2274,7 +2275,7 @@ def test_should_replace_target_info sub_test_case "tail_path" do def test_tail_path_with_singleline - File.open("#{@tmp_dir}/tail.txt", "wb") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -2282,7 +2283,7 @@ def test_tail_path_with_singleline d = create_driver(SINGLE_LINE_CONFIG + config_element("", "", { "path_key" => "path" })) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") {|f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -2296,7 +2297,7 @@ def test_tail_path_with_singleline end def test_tail_path_with_multiline_with_firstline - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = config_element("", "", { "path_key" => "path", @@ -2306,7 +2307,7 @@ def test_tail_path_with_multiline_with_firstline }) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -2326,7 +2327,7 @@ def test_tail_path_with_multiline_with_firstline end def test_tail_path_with_multiline_without_firstline - File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = config_element("", "", { "path_key" => "path", @@ -2337,7 +2338,7 @@ def test_tail_path_with_multiline_without_firstline }) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{@tmp_dir}/tail.txt", "ab") { |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "foo 1" f.puts "bar 1" f.puts "baz 1" @@ -2356,7 +2357,7 @@ def test_tail_path_with_multiline_with_multiple_paths omit "This testcase is unstable on AppVeyor." end files = ["#{@tmp_dir}/tail1.txt", "#{@tmp_dir}/tail2.txt"] - files.each { |file| File.open(file, "wb") { |f| } } + files.each { |file| Fluent::FileWrapper.open(file, "wb") { |f| } } config = config_element("", "", { "path" => "#{files[0]},#{files[1]}", @@ -2369,7 +2370,7 @@ def test_tail_path_with_multiline_with_multiple_paths d = create_driver(config, false) d.run(expect_emits: 2) do files.each do |file| - File.open(file, 'ab') { |f| + Fluent::FileWrapper.open(file, 'ab') { |f| f.puts "f #{file} line should be ignored" f.puts "s test1" f.puts "f test2" @@ -2500,7 +2501,7 @@ def test_EACCES end def test_shutdown_timeout - File.open("#{@tmp_dir}/tail.txt", "wb") do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") do |f| # Should be large enough to take too long time to consume (1024 * 1024 * 5).times do f.puts "{\"test\":\"fizzbuzz\"}" @@ -2548,7 +2549,7 @@ def test_lines_collected_with_no_throttling(data) conf = ROOT_CONFIG + group + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG - File.open("#{@tmp_dir}/#{file}", 'wb') do |f| + Fluent::FileWrapper.open("#{@tmp_dir}/#{file}", 'wb') do |f| num_lines.times do f.puts "#{msg}\n" end @@ -2594,7 +2595,7 @@ def test_lines_collected_with_no_throttling(data) d = create_driver(conf, false) file_path = "#{@tmp_dir}/#{file}" - File.open(file_path, 'wb') do |f| + Fluent::FileWrapper.open(file_path, 'wb') do |f| num_lines.times do f.puts msg end diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb index be593ab7b1..099315e92c 100644 --- a/test/plugin/test_out_file.rb +++ b/test/plugin/test_out_file.rb @@ -5,6 +5,7 @@ require 'time' require 'timecop' require 'zlib' +require 'fluent/file_wrapper' class FileOutputTest < Test::Unit::TestCase def setup @@ -1016,7 +1017,7 @@ def run_and_check(d, symlink_path) test 'returns filepath with index which does not exist yet' do 5.times do |i| - File.open(File.join(@tmp, "exist_#{i}.log"), 'a'){|f| } # open(create) and close + Fluent::FileWrapper.open(File.join(@tmp, "exist_#{i}.log"), 'a'){|f| } # open(create) and close end @i.find_filepath_available(File.join(@tmp, "exist_**.log")) do |path| assert_equal File.join(@tmp, "exist_5.log"), path @@ -1025,7 +1026,7 @@ def run_and_check(d, symlink_path) test 'creates lock directory when with_lock is true to exclude operations of other worker process' do 5.times do |i| - File.open(File.join(@tmp, "exist_#{i}.log"), 'a') + Fluent::FileWrapper.open(File.join(@tmp, "exist_#{i}.log"), 'a') end Dir.mkdir(File.join(@tmp, "exist_5.log.lock")) @i.find_filepath_available(File.join(@tmp, "exist_**.log"), with_lock: true) do |path|