From eadb79da8123e5f54d837764adc347d583828329 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 26 Jan 2023 17:43:33 +0900 Subject: [PATCH 1/2] buffer: log restored chunks' time periods which may be broken Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/buf_file.rb | 11 +++++++++++ lib/fluent/plugin/buf_file_single.rb | 11 +++++++++++ 2 files changed, 22 insertions(+) diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index 4725f5c9fe..d0251f3759 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -139,6 +139,7 @@ def persistent? def resume stage = {} queue = [] + exist_broken_file = false patterns = [@path] patterns.unshift @additional_resume_path if @additional_resume_path @@ -158,6 +159,7 @@ def resume begin chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode, compress: @compress) # file chunk resumes contents of metadata rescue Fluent::Plugin::Buffer::FileChunk::FileChunkError => e + exist_broken_file = true handle_broken_files(path, mode, e) next end @@ -182,6 +184,15 @@ def resume queue.sort_by!{ |chunk| chunk.modified_at } + # If one of the files is corrupted, other files may also be corrupted and be undetected. + # The time periods of each chunk are helpful to check the data. + if exist_broken_file + log.info "Since a broken chunk file was found, it is possible that other files remaining at the time of resuming were also broken. Here is the list of the files." + (stage.values + queue).map { |chunk| + log.info " #{chunk.path}:", :created_at => chunk.created_at, :modified_at => chunk.modified_at + } + end + return stage, queue end diff --git a/lib/fluent/plugin/buf_file_single.rb b/lib/fluent/plugin/buf_file_single.rb index 225447063e..c4937ad247 100644 --- a/lib/fluent/plugin/buf_file_single.rb +++ b/lib/fluent/plugin/buf_file_single.rb @@ -160,6 +160,7 @@ def persistent? 
def resume stage = {} queue = [] + exist_broken_file = false patterns = [@path] patterns.unshift @additional_resume_path if @additional_resume_path @@ -179,6 +180,7 @@ def resume chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(m, path, mode, @key_in_path, compress: @compress) chunk.restore_size(@chunk_format) if @calc_num_records rescue Fluent::Plugin::Buffer::FileSingleChunk::FileChunkError => e + exist_broken_file = true handle_broken_files(path, mode, e) next end @@ -193,6 +195,15 @@ def resume queue.sort_by!(&:modified_at) + # If one of the files is corrupted, other files may also be corrupted and be undetected. + # The time periods of each chunk are helpful to check the data. + if exist_broken_file + log.info "Since a broken chunk file was found, it is possible that other files remaining at the time of resuming were also broken. Here is the list of the files." + (stage.values + queue).map { |chunk| + log.info " #{chunk.path}:", :created_at => chunk.created_at, :modified_at => chunk.modified_at + } + end + return stage, queue end From a769a15f0e2d2a29637e293e7274be81cd62d73a Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Fri, 27 Jan 2023 13:19:09 +0900 Subject: [PATCH 2/2] use each instead of map Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/buf_file.rb | 2 +- lib/fluent/plugin/buf_file_single.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index d0251f3759..1de983268e 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -188,7 +188,7 @@ def resume # The time periods of each chunk are helpful to check the data. if exist_broken_file log.info "Since a broken chunk file was found, it is possible that other files remaining at the time of resuming were also broken. Here is the list of the files." 
- (stage.values + queue).map { |chunk| + (stage.values + queue).each { |chunk| log.info " #{chunk.path}:", :created_at => chunk.created_at, :modified_at => chunk.modified_at } end diff --git a/lib/fluent/plugin/buf_file_single.rb b/lib/fluent/plugin/buf_file_single.rb index c4937ad247..414aad8ff1 100644 --- a/lib/fluent/plugin/buf_file_single.rb +++ b/lib/fluent/plugin/buf_file_single.rb @@ -199,7 +199,7 @@ def resume # The time periods of each chunk are helpful to check the data. if exist_broken_file log.info "Since a broken chunk file was found, it is possible that other files remaining at the time of resuming were also broken. Here is the list of the files." - (stage.values + queue).map { |chunk| + (stage.values + queue).each { |chunk| log.info " #{chunk.path}:", :created_at => chunk.created_at, :modified_at => chunk.modified_at } end