Make stage_size & stage computation thread safe #2734

Merged (2 commits, Jan 6, 2020)
54 changes: 35 additions & 19 deletions lib/fluent/plugin/buffer.rb
@@ -270,10 +270,10 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)

log.on_trace { log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size }

staged_bytesize = 0
operated_chunks = []
unstaged_chunks = {} # metadata => [chunk, chunk, ...]
chunks_to_enqueue = []
staged_bytesizes_by_chunk = {}

begin
# sort metadata to get lock of chunks in same order with other threads
@@ -283,7 +283,13 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
if chunk.staged?
staged_bytesize += adding_bytesize
#
# https://github.com/fluent/fluentd/issues/2712
# write_once is supposed to write to a chunk only once,
# but this block **may** run multiple times from write_step_by_step and a previous write may be rolled back.
# So we should count the stage_size only for the last successful write.
#
staged_bytesizes_by_chunk[chunk] = adding_bytesize
Member:
I think the following situation can happen. What do you think?

  1. staged_bytesizes_by_chunk stores chunk1
  2. a rollback happens in write_step_by_step (ShouldRetry is raised)
  3. another thread enqueues chunk1 before this thread reaches
    chunk = get_next_chunk.call
  4. a new chunk (chunk2) is created in
    synchronize{ @stage[metadata] ||= generate_chunk(metadata).staged! }
  5. staged_bytesizes_by_chunk stores chunk2 (chunk1 still remains in staged_bytesizes_by_chunk)
Author:
@ganmacs Step (3) can't happen, because enqueue_chunk can't get the lock and waits until the lock is released by the thread that is writing to the chunk.

The lock is acquired here: https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin/buffer.rb#L279
and released here: https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin/buffer.rb#L319

So enqueue_chunk can only happen after the commit is done. Hope this helps.

Member:
Makes sense, you're right. The other thread can get the chunk but can't enter chunk.synchronize do.
elsif chunk.unstaged?
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
@@ -330,27 +336,37 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)

# All locks about chunks are released.

synchronize do
# At here, staged chunks may be enqueued by other threads.
@stage_size += staged_bytesize

chunks_to_enqueue.each do |c|
if c.staged? && (enqueue || chunk_size_full?(c))
m = c.metadata
enqueue_chunk(m)
if unstaged_chunks[m]
u = unstaged_chunks[m].pop
#
# Now update the stage, stage_size with proper locking
# FIX FOR stage_size miscomputation - https://github.com/fluent/fluentd/issues/2712
#
staged_bytesizes_by_chunk.each do |chunk, bytesize|
chunk.synchronize do
synchronize { @stage_size += bytesize }
log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } }
end
end

chunks_to_enqueue.each do |c|
if c.staged? && (enqueue || chunk_size_full?(c))
m = c.metadata
enqueue_chunk(m)
if unstaged_chunks[m]
u = unstaged_chunks[m].pop
u.synchronize do
if u.unstaged? && !chunk_size_full?(u)
@stage[m] = u.staged!
@stage_size += u.bytesize
synchronize {
@stage[m] = u.staged!
@stage_size += u.bytesize
}
end
end
elsif c.unstaged?
enqueue_unstaged_chunk(c)
else
# previously staged chunk is already enqueued, closed or purged.
# no problem.
end
elsif c.unstaged?
enqueue_unstaged_chunk(c)
else
# previously staged chunk is already enqueued, closed or purged.
# no problem.
end
end
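The locking argument from the review thread can be sketched as follows. This is a simplified model, not fluentd's buffer: a bare class including MonitorMixin stands in for a chunk, and the two threads stand in for the writer (which holds chunk.mon_enter until after commit) and an enqueuing thread (whose chunk.synchronize must wait).

```ruby
require 'monitor'

# Hypothetical minimal chunk: fluentd's chunks mix in MonitorMixin,
# which is all this sketch needs.
class Chunk
  include MonitorMixin
end

chunk = Chunk.new
events = []

chunk.mon_enter            # writer takes the lock, as in write_once
enqueuer = Thread.new do
  chunk.synchronize do     # blocks until the writer releases the monitor
    events << :enqueued
  end
end

sleep 0.1                  # let the enqueuer reach synchronize and block
events << :committed       # writer commits while still holding the lock
chunk.mon_exit             # release the lock only after the commit
enqueuer.join

p events  # => [:committed, :enqueued]
```

The enqueuer can only ever observe the chunk after the commit, which is why step (3) in the scenario above cannot occur.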
