Skip to content

Commit

Permalink
add stage_size only for last successful write to the chunk
Browse files Browse the repository at this point in the history
Signed-off-by: Harish Nelakurthi <[email protected]>
  • Loading branch information
Harish Nelakurthi committed Dec 17, 2019
1 parent a1731f1 commit 065751f
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,13 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
if chunk.staged?
staged_bytesizes_by_chunk[chunk] = 0 if staged_bytesizes_by_chunk[chunk].nil?
staged_bytesizes_by_chunk[chunk] += adding_bytesize
#
# https://github.com/fluent/fluentd/issues/2712
# write_once is supposed to write to a chunk only once
# but this block **may** run multiple times from write_step_by_step and previous write may be rollbacked
# So we should be counting the stage_size only for the last successful write
#
staged_bytesizes_by_chunk[chunk] = adding_bytesize
elsif chunk.unstaged?
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
Expand Down Expand Up @@ -337,10 +342,8 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
#
staged_bytesizes_by_chunk.each do |chunk, bytesize|
chunk.synchronize do
if chunk.staged?
synchronize { @stage_size += bytesize }
log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } }
end
synchronize { @stage_size += bytesize }
log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } }
end
end

Expand Down

0 comments on commit 065751f

Please sign in to comment.