Skip to content

Commit

Permalink
Merge pull request #4028 from daipom/log-restored-buffer-time-period-…
Browse files Browse the repository at this point in the history
…possibly-broken

buffer: add log for time periods of restored chunks which may be broken
  • Loading branch information
ashie authored Jan 27, 2023
2 parents 64509a1 + a769a15 commit 8c2e9dd
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
11 changes: 11 additions & 0 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def persistent?
def resume
stage = {}
queue = []
exist_broken_file = false

patterns = [@path]
patterns.unshift @additional_resume_path if @additional_resume_path
Expand All @@ -164,6 +165,7 @@ def resume
begin
chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode, compress: @compress) # file chunk resumes contents of metadata
rescue Fluent::Plugin::Buffer::FileChunk::FileChunkError => e
exist_broken_file = true
handle_broken_files(path, mode, e)
next
end
Expand All @@ -188,6 +190,15 @@ def resume

queue.sort_by!{ |chunk| chunk.modified_at }

# If one of the files is corrupted, other files may also be corrupted and be undetected.
# The time priods of each chunk are helpful to check the data.
if exist_broken_file
log.info "Since a broken chunk file was found, it is possible that other files remaining at the time of resuming were also broken. Here is the list of the files."
(stage.values + queue).each { |chunk|
log.info " #{chunk.path}:", :created_at => chunk.created_at, :modified_at => chunk.modified_at
}
end

return stage, queue
end

Expand Down
11 changes: 11 additions & 0 deletions lib/fluent/plugin/buf_file_single.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def persistent?
def resume
stage = {}
queue = []
exist_broken_file = false

patterns = [@path]
patterns.unshift @additional_resume_path if @additional_resume_path
Expand All @@ -185,6 +186,7 @@ def resume
chunk = Fluent::Plugin::Buffer::FileSingleChunk.new(m, path, mode, @key_in_path, compress: @compress)
chunk.restore_size(@chunk_format) if @calc_num_records
rescue Fluent::Plugin::Buffer::FileSingleChunk::FileChunkError => e
exist_broken_file = true
handle_broken_files(path, mode, e)
next
end
Expand All @@ -199,6 +201,15 @@ def resume

queue.sort_by!(&:modified_at)

# If one of the files is corrupted, other files may also be corrupted and be undetected.
# The time priods of each chunk are helpful to check the data.
if exist_broken_file
log.info "Since a broken chunk file was found, it is possible that other files remaining at the time of resuming were also broken. Here is the list of the files."
(stage.values + queue).each { |chunk|
log.info " #{chunk.path}:", :created_at => chunk.created_at, :modified_at => chunk.modified_at
}
end

return stage, queue
end

Expand Down

0 comments on commit 8c2e9dd

Please sign in to comment.