Skip to content

Commit

Permalink
buffer: backup broken file chunk
Browse files Browse the repository at this point in the history
Backup feature was implemented in fluent#1952, but it didn't support
handling broken file chunks found when resuming the buffer.

This extends the backup feature to support it.

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Jan 26, 2023
1 parent c48ca04 commit a8f241f
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 39 deletions.
19 changes: 17 additions & 2 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,23 @@ def generate_chunk(metadata)
end

# Called when a broken chunk file (or its metadata) is found while resuming
# buffered data from disk.
#
# The chunk content is copied into the backup directory (see Buffer#backup)
# so it can be inspected/recovered later, unless +disable_chunk_backup+ is
# enabled, in which case the chunk is simply discarded. In every case the
# original chunk file and its companion '.meta' file are removed so resume
# can proceed.
#
# @param path [String] path of the broken chunk file
# @param mode [Symbol] chunk state detected during resume (e.g. :staged, :queued)
# @param e [Exception] the error raised while reading the chunk
def handle_broken_files(path, mode, e)
  log.error "found broken chunk file during resume.", :path => path, :mode => mode, :err_msg => e.message
  unique_id = Fluent::Plugin::Buffer::FileChunk.unique_id_from_path(path)
  if @disable_chunk_backup
    log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(unique_id)} chunk is thrown away"
    return
  end
  backup(unique_id) { |f|
    # Copy the raw bytes as-is: the chunk is broken, so no decoding is attempted.
    File.open(path, 'rb') { |chunk|
      chunk.set_encoding(Encoding::ASCII_8BIT)
      chunk.sync = true
      chunk.binmode
      IO.copy_stream(chunk, f)
    }
  }
rescue => error
  log.error "backup failed. Delete corresponding files.", :err_msg => error.message
ensure
  # Always remove the broken chunk and its metadata, even when backup fails
  # or is disabled; 'rescue nil' keeps resume going if unlink itself fails.
  File.unlink(path, path + '.meta') rescue nil
end

Expand Down
19 changes: 17 additions & 2 deletions lib/fluent/plugin/buf_file_single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,23 @@ def generate_chunk(metadata)
end

# Called when a broken single-file chunk is found while resuming buffered
# data from disk (buf_file_single has no separate '.meta' file).
#
# The chunk content is copied into the backup directory (see Buffer#backup)
# unless +disable_chunk_backup+ is enabled, in which case it is discarded.
# The original chunk file is removed in every case so resume can proceed.
#
# @param path [String] path of the broken chunk file
# @param mode [Symbol] chunk state detected during resume (e.g. :staged, :queued)
# @param e [Exception] the error raised while reading the chunk
def handle_broken_files(path, mode, e)
  log.error "found broken chunk file during resume.", :path => path, :mode => mode, :err_msg => e.message
  unique_id, _ = Fluent::Plugin::Buffer::FileSingleChunk.unique_id_and_key_from_path(path)
  if @disable_chunk_backup
    log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(unique_id)} chunk is thrown away"
    return
  end
  backup(unique_id) { |f|
    # Copy the raw bytes as-is: the chunk is broken, so no decoding is attempted.
    File.open(path, 'rb') { |chunk|
      chunk.set_encoding(Encoding::ASCII_8BIT)
      chunk.sync = true
      chunk.binmode
      IO.copy_stream(chunk, f)
    }
  }
rescue => error
  log.error "backup failed. Delete corresponding files.", :err_msg => error.message
ensure
  # Always remove the broken chunk, even when backup fails or is disabled;
  # 'rescue nil' keeps resume going if unlink itself fails.
  File.unlink(path) rescue nil
end

Expand Down
15 changes: 15 additions & 0 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip], default: :text

desc 'If true, chunks are thrown away when unrecoverable error happens'
config_param :disable_chunk_backup, :bool, default: false

Metadata = Struct.new(:timekey, :tag, :variables, :seq) do
def initialize(timekey, tag, variables)
super(timekey, tag, variables, 0)
Expand Down Expand Up @@ -903,6 +906,18 @@ def statistics
{ 'buffer' => stats }
end

# Opens a backup file for a bad chunk and yields it to the caller, which
# writes the chunk content into it.
#
# The destination is
#   <root_dir or DEFAULT_BACKUP_DIR>/backup/worker<N>/<plugin_id>/<unique_id>.log
# where characters unsafe for file names in the owner plugin's id are
# replaced with '_'. Directory and file permissions fall back to Fluentd's
# defaults when not set in system config.
#
# @param chunk_unique_id [String] raw (binary) unique id of the chunk
# @yieldparam f [File] file opened in binary-append mode to receive the chunk
def backup(chunk_unique_id)
unique_id = dump_unique_id_hex(chunk_unique_id)
safe_owner_id = owner.plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_owner_id, "#{unique_id}.log")
backup_dir = File.dirname(backup_file)

log.warn "bad chunk is moved to #{backup_file}"
FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir)
File.open(backup_file, 'ab', system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION) { |f| yield f }
end

private

def optimistic_queued?(metadata = nil)
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def self.generate_queued_chunk_path(path, unique_id)
end
end

# used only for queued v0.12 buffer path
# used only for queued v0.12 buffer path or broken files
def self.unique_id_from_path(path)
if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch means explicit 'ASCII-8BIT' pattern
return $2.scan(/../).map{|x| x.to_i(16) }.pack('C*')
Expand Down
13 changes: 2 additions & 11 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ class Output < Base
config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.'

config_param :retry_randomize, :bool, default: true, desc: 'If true, output plugin will retry after randomized interval not to do burst retries.'
config_param :disable_chunk_backup, :bool, default: false, desc: 'If true, chunks are thrown away when unrecoverable error happens'
end

config_section :secondary, param_name: :secondary_config, required: false, multi: false, final: true do
Expand Down Expand Up @@ -1240,18 +1239,10 @@ def try_flush
end

def backup_chunk(chunk, using_secondary, delayed_commit)
if @buffer_config.disable_chunk_backup
if @buffer.disable_chunk_backup
log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(chunk.unique_id)} chunk is thrown away"
else
unique_id = dump_unique_id_hex(chunk.unique_id)
safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log")
backup_dir = File.dirname(backup_file)

log.warn "bad chunk is moved to #{backup_file}"
FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir)
File.open(backup_file, 'ab', system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION) { |f|
@buffer.backup(chunk.unique_id) { |f|
chunk.write_to(f)
}
end
Expand Down
85 changes: 62 additions & 23 deletions test/plugin/test_buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1151,15 +1151,13 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)

sub_test_case 'there are existing broken file chunks' do
setup do
@id_output = 'backup_test'
@bufdir = File.expand_path('../../tmp/broken_buffer_file', __FILE__)
FileUtils.mkdir_p @bufdir unless File.exist?(@bufdir)
FileUtils.rm_rf @bufdir rescue nil
FileUtils.mkdir_p @bufdir
@bufpath = File.join(@bufdir, 'broken_test.*.log')

Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
end

teardown do
Expand All @@ -1171,12 +1169,12 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
if @bufdir
Dir.glob(File.join(@bufdir, '*')).each do |path|
next if ['.', '..'].include?(File.basename(path))
File.delete(path)
end
end
end

def setup_plugins(buf_conf)
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@d.configure(config_element('ROOT', '', {'@id' => @id_output}, [config_element('buffer', '', buf_conf)]))
@p = @d.buffer
end

def create_first_chunk(mode)
Expand Down Expand Up @@ -1232,44 +1230,85 @@ def compare_log(plugin, msg)
assert { logs.any? { |log| log.include?(msg) } }
end

test '#resume ignores staged empty chunk' do
_, p1 = create_first_chunk('b')
test '#resume backups staged empty chunk' do
setup_plugins({'path' => @bufpath})
c1id, p1 = create_first_chunk('b')
File.open(p1, 'wb') { |f| } # create staged empty chunk file
c2id, _ = create_second_chunk('b')

@p.start
Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
@p.start
end

compare_staged_chunk(@p.stage, c2id, '2016-04-17 14:01:00 -0700', 3, :staged)
compare_log(@p, 'staged file chunk is empty')
assert_false File.exist?(p1)
assert_true File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log")
end

test '#resume ignores staged broken metadata' do
test '#resume backups staged broken metadata' do
setup_plugins({'path' => @bufpath})
c1id, _ = create_first_chunk('b')
_, p2 = create_second_chunk('b')
c2id, p2 = create_second_chunk('b')
File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create staged broken meta file

@p.start
Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
@p.start
end

compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged)
compare_log(@p, 'staged meta file is broken')
assert_false File.exist?(p2)
assert_true File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log")
end

test '#resume ignores enqueued empty chunk' do
_, p1 = create_first_chunk('q')
test '#resume backups enqueued empty chunk' do
setup_plugins({'path' => @bufpath})
c1id, p1 = create_first_chunk('q')
File.open(p1, 'wb') { |f| } # create enqueued empty chunk file
c2id, _ = create_second_chunk('q')

@p.start
Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
@p.start
end

compare_queued_chunk(@p.queue, c2id, 3, :queued)
compare_log(@p, 'enqueued file chunk is empty')
assert_false File.exist?(p1)
assert_true File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log")
end

test '#resume ignores enqueued broken metadata' do
test '#resume backups enqueued broken metadata' do
setup_plugins({'path' => @bufpath})
c1id, _ = create_first_chunk('q')
_, p2 = create_second_chunk('q')
c2id, p2 = create_second_chunk('q')
File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create enqueued broken meta file

@p.start
Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
@p.start
end

compare_queued_chunk(@p.queue, c1id, 4, :queued)
compare_log(@p, 'enqueued meta file is broken')
assert_false File.exist?(p2)
assert_true File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log")
end

test '#resume throws away broken chunk with disable_chunk_backup' do
setup_plugins({'path' => @bufpath, 'disable_chunk_backup' => true})
c1id, _ = create_first_chunk('b')
c2id, p2 = create_second_chunk('b')
File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create staged broken meta file

Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
@p.start
end

compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged)
compare_log(@p, 'staged meta file is broken')
compare_log(@p, 'disable_chunk_backup is true')
assert_false File.exist?(p2)
assert_false File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log")
end
end
end
65 changes: 65 additions & 0 deletions test/plugin/test_buf_file_single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -830,4 +830,69 @@ def create_driver(conf = TAG_CONF, klass = FluentPluginFileSingleBufferTest::Dum
assert_equal :queued, queue[0].state
end
end

sub_test_case 'there are existing broken file chunks' do
setup do
FileUtils.rm_rf(@bufdir) rescue nil
FileUtils.mkdir_p(@bufdir)
end

teardown do
return unless @p

@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end

test '#resume backups empty chunk' do
id_output = 'backup_test'
@d = create_driver(%[
@id #{id_output}
<buffer tag>
@type file_single
path #{PATH}
</buffer>
])
@p = @d.instance.buffer

c1id = Fluent::UniqueId.generate
p1 = File.join(@bufdir, "fsb.foo.b#{Fluent::UniqueId.hex(c1id)}.buf")
File.open(p1, 'wb') { |f| } # create empty chunk file

Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
@p.start
end

assert_false File.exist?(p1)
assert_true File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log")
end

test '#resume throws away broken chunk with disable_chunk_backup' do
id_output = 'backup_test'
@d = create_driver(%[
@id #{id_output}
<buffer tag>
@type file_single
path #{PATH}
disable_chunk_backup true
</buffer>
])
@p = @d.instance.buffer

c1id = Fluent::UniqueId.generate
p1 = File.join(@bufdir, "fsb.foo.b#{Fluent::UniqueId.hex(c1id)}.buf")
File.open(p1, 'wb') { |f| } # create empty chunk file

Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
@p.start
end

assert_false File.exist?(p1)
assert_false File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log")
end
end
end

0 comments on commit a8f241f

Please sign in to comment.