From a8f241f6c2e6c6a43e2142ab4a6c950ca03c944d Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 26 Jan 2023 10:51:57 +0900 Subject: [PATCH 1/6] buffer: backup broken file chunk Backup feature was implemented in #1952, but it didn't support handling broken file chunks found in resuming buffer. This extends the backup feature to support it. Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/buf_file.rb | 19 +++++- lib/fluent/plugin/buf_file_single.rb | 19 +++++- lib/fluent/plugin/buffer.rb | 15 +++++ lib/fluent/plugin/buffer/file_chunk.rb | 2 +- lib/fluent/plugin/output.rb | 13 +--- test/plugin/test_buf_file.rb | 85 +++++++++++++++++++------- test/plugin/test_buf_file_single.rb | 65 ++++++++++++++++++++ 7 files changed, 179 insertions(+), 39 deletions(-) diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index 4725f5c9fe..d7e83c58bc 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -195,8 +195,23 @@ def generate_chunk(metadata) end def handle_broken_files(path, mode, e) - log.error "found broken chunk file during resume. Deleted corresponding files:", :path => path, :mode => mode, :err_msg => e.message - # After support 'backup_dir' feature, these files are moved to backup_dir instead of unlink. + log.error "found broken chunk file during resume.", :path => path, :mode => mode, :err_msg => e.message + unique_id = Fluent::Plugin::Buffer::FileChunk.unique_id_from_path(path) + if @disable_chunk_backup + log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(unique_id)} chunk is thrown away" + return + end + backup(unique_id) { |f| + File.open(path, 'rb') { |chunk| + chunk.set_encoding(Encoding::ASCII_8BIT) + chunk.sync = true + chunk.binmode + IO.copy_stream(chunk, f) + } + } + rescue => error + log.error "backup failed. Delete corresponding files.", :err_msg => error.message + ensure File.unlink(path, path + '.meta') rescue nil end diff --git a/lib/fluent/plugin/buf_file_single.rb b/lib/fluent/plugin/buf_file_single.rb index 225447063e..c798a294a0 100644 --- a/lib/fluent/plugin/buf_file_single.rb +++ b/lib/fluent/plugin/buf_file_single.rb @@ -207,8 +207,23 @@ def generate_chunk(metadata) end def handle_broken_files(path, mode, e) - log.error "found broken chunk file during resume. Delete corresponding files:", path: path, mode: mode, err_msg: e.message - # After support 'backup_dir' feature, these files are moved to backup_dir instead of unlink. + log.error "found broken chunk file during resume.", :path => path, :mode => mode, :err_msg => e.message + unique_id, _ = Fluent::Plugin::Buffer::FileSingleChunk.unique_id_and_key_from_path(path) + if @disable_chunk_backup + log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(unique_id)} chunk is thrown away" + return + end + backup(unique_id) { |f| + File.open(path, 'rb') { |chunk| + chunk.set_encoding(Encoding::ASCII_8BIT) + chunk.sync = true + chunk.binmode + IO.copy_stream(chunk, f) + } + } + rescue => error + log.error "backup failed. Delete corresponding files.", :err_msg => error.message + ensure File.unlink(path) rescue nil end diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 8315694e64..6f8ddcfa2c 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -66,6 +66,9 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than desc 'Compress buffered data.' config_param :compress, :enum, list: [:text, :gzip], default: :text + desc 'If true, chunks are thrown away when unrecoverable error happens' + config_param :disable_chunk_backup, :bool, default: false + Metadata = Struct.new(:timekey, :tag, :variables, :seq) do def initialize(timekey, tag, variables) super(timekey, tag, variables, 0) @@ -903,6 +906,18 @@ def statistics { 'buffer' => stats } end + def backup(chunk_unique_id) + unique_id = dump_unique_id_hex(chunk_unique_id) + safe_owner_id = owner.plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_') + backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR + backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_owner_id, "#{unique_id}.log") + backup_dir = File.dirname(backup_file) + + log.warn "bad chunk is moved to #{backup_file}" + FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir) + File.open(backup_file, 'ab', system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION) { |f| yield f } + end + private def optimistic_queued?(metadata = nil) diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb index e08987cf55..06bbf017c2 100644 --- a/lib/fluent/plugin/buffer/file_chunk.rb +++ b/lib/fluent/plugin/buffer/file_chunk.rb @@ -204,7 +204,7 @@ def self.generate_queued_chunk_path(path, unique_id) end end - # used only for queued v0.12 buffer path + # used only for queued v0.12 buffer path or broken files def self.unique_id_from_path(path) if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch means explicit 'ASCII-8BIT' pattern return $2.scan(/../).map{|x| x.to_i(16) }.pack('C*') diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 9087994bc9..1bffc24460 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -99,7 +99,6 @@ class Output < Base config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.' config_param :retry_randomize, :bool, default: true, desc: 'If true, output plugin will retry after randomized interval not to do burst retries.' - config_param :disable_chunk_backup, :bool, default: false, desc: 'If true, chunks are thrown away when unrecoverable error happens' end config_section :secondary, param_name: :secondary_config, required: false, multi: false, final: true do @@ -1240,18 +1239,10 @@ def try_flush end def backup_chunk(chunk, using_secondary, delayed_commit) - if @buffer_config.disable_chunk_backup + if @buffer.disable_chunk_backup log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(chunk.unique_id)} chunk is thrown away" else - unique_id = dump_unique_id_hex(chunk.unique_id) - safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_') - backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR - backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log") - backup_dir = File.dirname(backup_file) - - log.warn "bad chunk is moved to #{backup_file}" - FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir) - File.open(backup_file, 'ab', system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION) { |f| + @buffer.backup(chunk.unique_id) { |f| chunk.write_to(f) } end diff --git a/test/plugin/test_buf_file.rb b/test/plugin/test_buf_file.rb index 6a89bd9867..99276b1de1 100644 --- a/test/plugin/test_buf_file.rb +++ b/test/plugin/test_buf_file.rb @@ -1151,15 +1151,13 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime) sub_test_case 'there are existing broken file chunks' do setup do + @id_output = 'backup_test' @bufdir = File.expand_path('../../tmp/broken_buffer_file', __FILE__) - FileUtils.mkdir_p @bufdir unless File.exist?(@bufdir) + FileUtils.rm_rf @bufdir rescue nil + FileUtils.mkdir_p @bufdir @bufpath = File.join(@bufdir, 'broken_test.*.log') Fluent::Test.setup - @d = FluentPluginFileBufferTest::DummyOutputPlugin.new - @p = Fluent::Plugin::FileBuffer.new - @p.owner = @d - @p.configure(config_element('buffer', '', {'path' => @bufpath})) end teardown do @@ -1171,12 +1169,12 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime) @p.close unless @p.closed? @p.terminate unless @p.terminated? end - if @bufdir - Dir.glob(File.join(@bufdir, '*')).each do |path| - next if ['.', '..'].include?(File.basename(path)) - File.delete(path) - end - end + end + + def setup_plugins(buf_conf) + @d = FluentPluginFileBufferTest::DummyOutputPlugin.new + @d.configure(config_element('ROOT', '', {'@id' => @id_output}, [config_element('buffer', '', buf_conf)])) + @p = @d.buffer end def create_first_chunk(mode) @@ -1232,44 +1230,85 @@ def compare_log(plugin, msg) assert { logs.any? { |log| log.include?(msg) } } end - test '#resume ignores staged empty chunk' do - _, p1 = create_first_chunk('b') + test '#resume backups staged empty chunk' do + setup_plugins({'path' => @bufpath}) + c1id, p1 = create_first_chunk('b') File.open(p1, 'wb') { |f| } # create staged empty chunk file c2id, _ = create_second_chunk('b') - @p.start + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + compare_staged_chunk(@p.stage, c2id, '2016-04-17 14:01:00 -0700', 3, :staged) compare_log(@p, 'staged file chunk is empty') + assert_false File.exist?(p1) + assert_true File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log") end - test '#resume ignores staged broken metadata' do + test '#resume backups staged broken metadata' do + setup_plugins({'path' => @bufpath}) c1id, _ = create_first_chunk('b') - _, p2 = create_second_chunk('b') + c2id, p2 = create_second_chunk('b') File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create staged broken meta file - @p.start + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged) compare_log(@p, 'staged meta file is broken') + assert_false File.exist?(p2) + assert_true File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") end - test '#resume ignores enqueued empty chunk' do - _, p1 = create_first_chunk('q') + test '#resume backups enqueued empty chunk' do + setup_plugins({'path' => @bufpath}) + c1id, p1 = create_first_chunk('q') File.open(p1, 'wb') { |f| } # create enqueued empty chunk file c2id, _ = create_second_chunk('q') - @p.start + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + compare_queued_chunk(@p.queue, c2id, 3, :queued) compare_log(@p, 'enqueued file chunk is empty') + assert_false File.exist?(p1) + assert_true File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log") end - test '#resume ignores enqueued broken metadata' do + test '#resume backups enqueued broken metadata' do + setup_plugins({'path' => @bufpath}) c1id, _ = create_first_chunk('q') - _, p2 = create_second_chunk('q') + c2id, p2 = create_second_chunk('q') File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create enqueued broken meta file - @p.start + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + compare_queued_chunk(@p.queue, c1id, 4, :queued) compare_log(@p, 'enqueued meta file is broken') + assert_false File.exist?(p2) + assert_true File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") + end + + test '#resume throws away broken chunk with disable_chunk_backup' do + setup_plugins({'path' => @bufpath, 'disable_chunk_backup' => true}) + c1id, _ = create_first_chunk('b') + c2id, p2 = create_second_chunk('b') + File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create staged broken meta file + + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + + compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged) + compare_log(@p, 'staged meta file is broken') + compare_log(@p, 'disable_chunk_backup is true') + assert_false File.exist?(p2) + assert_false File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") end end end diff --git a/test/plugin/test_buf_file_single.rb b/test/plugin/test_buf_file_single.rb index 9a14a2f4c7..b1caf3a19e 100644 --- a/test/plugin/test_buf_file_single.rb +++ b/test/plugin/test_buf_file_single.rb @@ -830,4 +830,69 @@ def create_driver(conf = TAG_CONF, klass = FluentPluginFileSingleBufferTest::Dum assert_equal :queued, queue[0].state end end + + sub_test_case 'there are existing broken file chunks' do + setup do + FileUtils.rm_rf(@bufdir) rescue nil + FileUtils.mkdir_p(@bufdir) + end + + teardown do + return unless @p + + @p.stop unless @p.stopped? + @p.before_shutdown unless @p.before_shutdown? + @p.shutdown unless @p.shutdown? + @p.after_shutdown unless @p.after_shutdown? + @p.close unless @p.closed? + @p.terminate unless @p.terminated? + end + + test '#resume backups empty chunk' do + id_output = 'backup_test' + @d = create_driver(%[ + @id #{id_output} + + @type file_single + path #{PATH} + + ]) + @p = @d.instance.buffer + + c1id = Fluent::UniqueId.generate + p1 = File.join(@bufdir, "fsb.foo.b#{Fluent::UniqueId.hex(c1id)}.buf") + File.open(p1, 'wb') { |f| } # create empty chunk file + + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + + assert_false File.exist?(p1) + assert_true File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log") + end + + test '#resume throws away broken chunk with disable_chunk_backup' do + id_output = 'backup_test' + @d = create_driver(%[ + @id #{id_output} + + @type file_single + path #{PATH} + disable_chunk_backup true + + ]) + @p = @d.instance.buffer + + c1id = Fluent::UniqueId.generate + p1 = File.join(@bufdir, "fsb.foo.b#{Fluent::UniqueId.hex(c1id)}.buf") + File.open(p1, 'wb') { |f| } # create empty chunk file + + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + + assert_false File.exist?(p1) + assert_false File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log") + end + end end From 0467bdb89330a16fa9ae0d88b5ca883a594ad29d Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 26 Jan 2023 15:49:58 +0900 Subject: [PATCH 2/6] keep compatibility of "disable_chunk_backup" option Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/output.rb | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 1bffc24460..28dd3f8d2d 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -377,6 +377,7 @@ def configure(conf) buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, []) @buffer = Plugin.new_buffer(buffer_type, parent: self) @buffer.configure(buffer_conf) + keep_buffer_config_compat @buffer.enable_update_timekeys if @chunk_key_time @flush_at_shutdown = @buffer_config.flush_at_shutdown @@ -434,6 +435,12 @@ def configure(conf) self end + def keep_buffer_config_compat + # Need this to call `@buffer_config.disable_chunk_backup` just as before, + # since some plugins may use this option in this shape. + @buffer_config[:disable_chunk_backup] = @buffer.disable_chunk_backup + end + def start super From 2fd9752a81a706f9443d35c92ee7dd8db4dba56f Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 26 Jan 2023 15:51:22 +0900 Subject: [PATCH 3/6] fix test `disable_chunk_backup` option is still in `buffer` section, but it is moved into `Buffer` from `Output`. Signed-off-by: Daijiro Fukuda --- test/command/test_plugin_config_formatter.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/test/command/test_plugin_config_formatter.rb b/test/command/test_plugin_config_formatter.rb index 28e311401d..a72ea29340 100644 --- a/test/command/test_plugin_config_formatter.rb +++ b/test/command/test_plugin_config_formatter.rb @@ -188,7 +188,6 @@ class SimpleServiceDiscovery < ::Fluent::Plugin::ServiceDiscovery retry_exponential_backoff_base: float: (2) retry_max_interval: time: (nil) retry_randomize: bool: (true) - disable_chunk_backup: bool: (false) : optional, single @type: string: (nil) : optional, single From 6152b8fc111adb1e137546ab6c1bc5958e3682bc Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Mon, 13 Feb 2023 14:58:38 +0900 Subject: [PATCH 4/6] Move some logic to parent buffer class Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/buf_file.rb | 5 +---- lib/fluent/plugin/buf_file_single.rb | 5 +---- lib/fluent/plugin/buffer.rb | 6 ++++++ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index d7e83c58bc..4172909509 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -197,10 +197,6 @@ def generate_chunk(metadata) def handle_broken_files(path, mode, e) log.error "found broken chunk file during resume.", :path => path, :mode => mode, :err_msg => e.message unique_id = Fluent::Plugin::Buffer::FileChunk.unique_id_from_path(path) - if @disable_chunk_backup - log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(unique_id)} chunk is thrown away" - return - end backup(unique_id) { |f| File.open(path, 'rb') { |chunk| chunk.set_encoding(Encoding::ASCII_8BIT) @@ -212,6 +208,7 @@ def handle_broken_files(path, mode, e) rescue => error log.error "backup failed. Delete corresponding files.", :err_msg => error.message ensure + log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(unique_id)} chunk is thrown away." if @disable_chunk_backup File.unlink(path, path + '.meta') rescue nil end diff --git a/lib/fluent/plugin/buf_file_single.rb b/lib/fluent/plugin/buf_file_single.rb index c798a294a0..f11c5df7e7 100644 --- a/lib/fluent/plugin/buf_file_single.rb +++ b/lib/fluent/plugin/buf_file_single.rb @@ -209,10 +209,6 @@ def generate_chunk(metadata) def handle_broken_files(path, mode, e) log.error "found broken chunk file during resume.", :path => path, :mode => mode, :err_msg => e.message unique_id, _ = Fluent::Plugin::Buffer::FileSingleChunk.unique_id_and_key_from_path(path) - if @disable_chunk_backup - log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(unique_id)} chunk is thrown away" - return - end backup(unique_id) { |f| File.open(path, 'rb') { |chunk| chunk.set_encoding(Encoding::ASCII_8BIT) @@ -224,6 +220,7 @@ def handle_broken_files(path, mode, e) rescue => error log.error "backup failed. Delete corresponding files.", :err_msg => error.message ensure + log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(unique_id)} chunk is thrown away." if @disable_chunk_backup File.unlink(path) rescue nil end diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 6f8ddcfa2c..d04ae08296 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -908,6 +908,12 @@ def statistics def backup(chunk_unique_id) unique_id = dump_unique_id_hex(chunk_unique_id) + + if @disable_chunk_backup + log.warn "disable_chunk_backup is true. #{unique_id} chunk is not backed up." + return + end + safe_owner_id = owner.plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_') backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_owner_id, "#{unique_id}.log") From 4c82d8fcedeb5acee187e7c5a024eb8c8c638dfb Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Mon, 13 Feb 2023 15:31:20 +0900 Subject: [PATCH 5/6] Improve comment Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/output.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 28dd3f8d2d..5dd5255652 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -437,7 +437,7 @@ def configure(conf) def keep_buffer_config_compat # Need this to call `@buffer_config.disable_chunk_backup` just as before, - # since some plugins may use this option in this shape. + # since some plugins may use this option in this way. @buffer_config[:disable_chunk_backup] = @buffer.disable_chunk_backup end From 310e0f0d9b4d46ee0d633d3621cb14d458656b9a Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 14 Feb 2023 10:49:13 +0900 Subject: [PATCH 6/6] Use power assertion Signed-off-by: Daijiro Fukuda --- test/plugin/test_buf_file.rb | 20 ++++++++++---------- test/plugin/test_buf_file_single.rb | 8 ++++---- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/test/plugin/test_buf_file.rb b/test/plugin/test_buf_file.rb index 99276b1de1..8a8c6744a4 100644 --- a/test/plugin/test_buf_file.rb +++ b/test/plugin/test_buf_file.rb @@ -1242,8 +1242,8 @@ def compare_log(plugin, msg) compare_staged_chunk(@p.stage, c2id, '2016-04-17 14:01:00 -0700', 3, :staged) compare_log(@p, 'staged file chunk is empty') - assert_false File.exist?(p1) - assert_true File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log") + assert { not File.exist?(p1) } + assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log") } end test '#resume backups staged broken metadata' do @@ -1258,8 +1258,8 @@ def compare_log(plugin, msg) compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged) compare_log(@p, 'staged meta file is broken') - assert_false File.exist?(p2) - assert_true File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") + assert { not File.exist?(p2) } + assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") } end test '#resume backups enqueued empty chunk' do @@ -1274,8 +1274,8 @@ def compare_log(plugin, msg) compare_queued_chunk(@p.queue, c2id, 3, :queued) compare_log(@p, 'enqueued file chunk is empty') - assert_false File.exist?(p1) - assert_true File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log") + assert { not File.exist?(p1) } + assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log") } end test '#resume backups enqueued broken metadata' do @@ -1290,8 +1290,8 @@ def compare_log(plugin, msg) compare_queued_chunk(@p.queue, c1id, 4, :queued) compare_log(@p, 'enqueued meta file is broken') - assert_false File.exist?(p2) - assert_true File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") + assert { not File.exist?(p2) } + assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") } end test '#resume throws away broken chunk with disable_chunk_backup' do @@ -1307,8 +1307,8 @@ def compare_log(plugin, msg) compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged) compare_log(@p, 'staged meta file is broken') compare_log(@p, 'disable_chunk_backup is true') - assert_false File.exist?(p2) - assert_false File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") + assert { not File.exist?(p2) } + assert { not File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") } end end end diff --git a/test/plugin/test_buf_file_single.rb b/test/plugin/test_buf_file_single.rb index b1caf3a19e..85e8bc57d5 100644 --- a/test/plugin/test_buf_file_single.rb +++ b/test/plugin/test_buf_file_single.rb @@ -867,8 +867,8 @@ def create_driver(conf = TAG_CONF, klass = FluentPluginFileSingleBufferTest::Dum @p.start end - assert_false File.exist?(p1) - assert_true File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log") + assert { not File.exist?(p1) } + assert { File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log") } end test '#resume throws away broken chunk with disable_chunk_backup' do @@ -891,8 +891,8 @@ def create_driver(conf = TAG_CONF, klass = FluentPluginFileSingleBufferTest::Dum @p.start end - assert_false File.exist?(p1) - assert_false File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log") + assert { not File.exist?(p1) } + assert { not File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log") } end end end