diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index 4725f5c9fe..1de983268e 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -139,6 +139,7 @@ def persistent? def resume stage = {} queue = [] + exist_broken_file = false patterns = [@path] patterns.unshift @additional_resume_path if @additional_resume_path @@ -158,6 +159,7 @@ def resume begin chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode, compress: @compress) # file chunk resumes contents of metadata rescue Fluent::Plugin::Buffer::FileChunk::FileChunkError => e + exist_broken_file = true handle_broken_files(path, mode, e) next end @@ -182,6 +184,15 @@ def resume queue.sort_by!{ |chunk| chunk.modified_at } + # If one of the files is corrupted, other files may also be corrupted and be undetected. + # The time periods of each chunk are helpful to check the data. + if exist_broken_file + log.info "Since a broken chunk file was found, it is possible that other files remaining at the time of resuming were also broken. Here is the list of the files." + (stage.values + queue).each { |chunk| + log.info " #{chunk.path}:", :created_at => chunk.created_at, :modified_at => chunk.modified_at + } + end + return stage, queue end diff --git a/lib/fluent/plugin/buf_file_single.rb b/lib/fluent/plugin/buf_file_single.rb index 225447063e..414aad8ff1 100644 --- a/lib/fluent/plugin/buf_file_single.rb +++ b/lib/fluent/plugin/buf_file_single.rb @@ -160,6 +160,7 @@ def persistent? 
def resume stage = {} queue = [] + exist_broken_file = false patterns = [@path] patterns.unshift @additional_resume_path if @additional_resume_path @@ -179,6 +180,7 @@ def resume chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(m, path, mode, @key_in_path, compress: @compress) chunk.restore_size(@chunk_format) if @calc_num_records rescue Fluent::Plugin::Buffer::FileSingleChunk::FileChunkError => e + exist_broken_file = true handle_broken_files(path, mode, e) next end @@ -193,6 +195,15 @@ def resume queue.sort_by!(&:modified_at) + # If one of the files is corrupted, other files may also be corrupted and be undetected. + # The time periods of each chunk are helpful to check the data. + if exist_broken_file + log.info "Since a broken chunk file was found, it is possible that other files remaining at the time of resuming were also broken. Here is the list of the files." + (stage.values + queue).each { |chunk| + log.info " #{chunk.path}:", :created_at => chunk.created_at, :modified_at => chunk.modified_at + } + end + return stage, queue end