Skip to content

Commit

Permalink
Merge pull request #2734 from hlakshmi/fix-stage-size-computation
Browse files Browse the repository at this point in the history
Make stage_size & stage computation thread safe
  • Loading branch information
repeatedly authored Jan 6, 2020
2 parents c809788 + 065751f commit 24ddf6e
Showing 1 changed file with 35 additions and 19 deletions.
54 changes: 35 additions & 19 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,10 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)

log.on_trace { log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size }

staged_bytesize = 0
operated_chunks = []
unstaged_chunks = {} # metadata => [chunk, chunk, ...]
chunks_to_enqueue = []
staged_bytesizes_by_chunk = {}

begin
# sort metadata so that chunk locks are acquired in the same order as in other threads
Expand All @@ -279,7 +279,13 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
if chunk.staged?
staged_bytesize += adding_bytesize
#
# https://github.com/fluent/fluentd/issues/2712
# write_once is supposed to write to a chunk only once,
# but this block **may** run multiple times from write_step_by_step, and a previous write may be rolled back.
# So we should count the stage_size only for the last successful write.
#
staged_bytesizes_by_chunk[chunk] = adding_bytesize
elsif chunk.unstaged?
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
Expand Down Expand Up @@ -326,27 +332,37 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)

# All chunk locks have been released.

synchronize do
# At this point, staged chunks may be enqueued by other threads.
@stage_size += staged_bytesize

chunks_to_enqueue.each do |c|
if c.staged? && (enqueue || chunk_size_full?(c))
m = c.metadata
enqueue_chunk(m)
if unstaged_chunks[m]
u = unstaged_chunks[m].pop
#
# Now update the stage and stage_size with proper locking.
# Fix for stage_size miscomputation - https://github.com/fluent/fluentd/issues/2712
#
staged_bytesizes_by_chunk.each do |chunk, bytesize|
chunk.synchronize do
synchronize { @stage_size += bytesize }
log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } }
end
end

chunks_to_enqueue.each do |c|
if c.staged? && (enqueue || chunk_size_full?(c))
m = c.metadata
enqueue_chunk(m)
if unstaged_chunks[m]
u = unstaged_chunks[m].pop
u.synchronize do
if u.unstaged? && !chunk_size_full?(u)
@stage[m] = u.staged!
@stage_size += u.bytesize
synchronize {
@stage[m] = u.staged!
@stage_size += u.bytesize
}
end
end
elsif c.unstaged?
enqueue_unstaged_chunk(c)
else
# previously staged chunk is already enqueued, closed or purged.
# no problem.
end
elsif c.unstaged?
enqueue_unstaged_chunk(c)
else
# previously staged chunk is already enqueued, closed or purged.
# no problem.
end
end

Expand Down

0 comments on commit 24ddf6e

Please sign in to comment.