buffer: fix emit error of race condition #4447

Merged — 3 commits, Mar 27, 2024
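The diff below shows the core of the fix for the race condition reported in fluent/fluentd#4446: `write_step_by_step` used to lock each chunk with `chunk.synchronize do ... end`, which releases the monitor as soon as the block returns — before the chunk is handed to `&block` of `#write`. In that window another thread could modify the chunk, producing the emit error. The fix takes the lock explicitly with `chunk.mon_enter` and releases it in an `ensure` clause only after all modified chunks have been passed to the block. A minimal sketch of the two locking patterns — illustrative only, not fluentd code; `Chunk` here is a stand-in that mixes in `MonitorMixin`, as fluentd's buffer chunks do:

require 'monitor'

class Chunk
  include MonitorMixin
end

chunk = Chunk.new

# Before: the monitor is released when the block returns, so a race
# window opens between filling the chunk and handing it to &block.
chunk.synchronize do
  # ... append records to the chunk ...
end # lock released here

# After: the lock is held across the handoff and is released on every
# path, including error paths, via ensure.
chunk.mon_enter
begin
  # ... append records, then pass the still-locked chunk to &block ...
ensure
  chunk.mon_exit
end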
lib/fluent/plugin/buffer.rb — 143 changes: 75 additions & 68 deletions
@@ -764,94 +764,95 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
         while writing_splits_index < splits.size
           chunk = get_next_chunk.call
           errors = []
+          # The chunk must be locked until being passed to &block.
+          chunk.mon_enter
           modified_chunks << {chunk: chunk, adding_bytesize: 0, errors: errors}
-          chunk.synchronize do
-            raise ShouldRetry unless chunk.writable?
-            staged_chunk_used = true if chunk.staged?
-
-            original_bytesize = committed_bytesize = chunk.bytesize
-            begin
-              while writing_splits_index < splits.size
-                split = splits[writing_splits_index]
-                formatted_split = format ? format.call(split) : nil
-
-                if split.size == 1 # Check BufferChunkOverflowError
-                  determined_bytesize = nil
-                  if @compress != :text
-                    determined_bytesize = nil
-                  elsif formatted_split
-                    determined_bytesize = formatted_split.bytesize
-                  elsif split.first.respond_to?(:bytesize)
-                    determined_bytesize = split.first.bytesize
-                  end
-
-                  if determined_bytesize && determined_bytesize > @chunk_limit_size
-                    # It is a obvious case that BufferChunkOverflowError should be raised here.
-                    # But if it raises here, already processed 'split' or
-                    # the proceeding 'split' will be lost completely.
-                    # So it is a last resort to delay raising such a exception
-                    errors << "a #{determined_bytesize} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
-                    writing_splits_index += 1
-                    next
-                  end
-
-                  if determined_bytesize.nil? || chunk.bytesize + determined_bytesize > @chunk_limit_size
-                    # The split will (might) cause size over so keep already processed
-                    # 'split' content here (allow performance regression a bit).
-                    chunk.commit
-                    committed_bytesize = chunk.bytesize
-                  end
-                end
-
-                if format
-                  chunk.concat(formatted_split, split.size)
-                else
-                  chunk.append(split, compress: @compress)
-                end
-                adding_bytes = chunk.bytesize - committed_bytesize
-
-                if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
-                  chunk.rollback
-
-                  if split.size == 1 # Check BufferChunkOverflowError again
-                    if adding_bytes > @chunk_limit_size
-                      errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
-                      writing_splits_index += 1
-                      next
-                    else
-                      # As already processed content is kept after rollback, then unstaged chunk should be queued.
-                      # After that, re-process current split again.
-                      # New chunk should be allocated, to do it, modify @stage and so on.
-                      synchronize { @stage.delete(modified_metadata) }
-                      staged_chunk_used = false
-                      chunk.unstaged!
-                      break
-                    end
-                  end
-
-                  if chunk_size_full?(chunk) || split.size == 1
-                    enqueue_chunk_before_retry = true
-                  else
-                    splits_count *= 10
-                  end
-
-                  raise ShouldRetry
-                end
-
-                writing_splits_index += 1
-
-                if chunk_size_full?(chunk)
-                  break
-                end
-              end
-            rescue
-              chunk.purge if chunk.unstaged? # unstaged chunk will leak unless purge it
-              raise
-            end
-
-            modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize
-          end
+
+          raise ShouldRetry unless chunk.writable?
+          staged_chunk_used = true if chunk.staged?
+
+          original_bytesize = committed_bytesize = chunk.bytesize
+          begin
+            while writing_splits_index < splits.size
+              split = splits[writing_splits_index]
+              formatted_split = format ? format.call(split) : nil
+
+              if split.size == 1 # Check BufferChunkOverflowError
+                determined_bytesize = nil
+                if @compress != :text
+                  determined_bytesize = nil
+                elsif formatted_split
+                  determined_bytesize = formatted_split.bytesize
+                elsif split.first.respond_to?(:bytesize)
+                  determined_bytesize = split.first.bytesize
+                end
+
+                if determined_bytesize && determined_bytesize > @chunk_limit_size
+                  # It is a obvious case that BufferChunkOverflowError should be raised here.
+                  # But if it raises here, already processed 'split' or
+                  # the proceeding 'split' will be lost completely.
+                  # So it is a last resort to delay raising such a exception
+                  errors << "a #{determined_bytesize} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
+                  writing_splits_index += 1
+                  next
+                end
+
+                if determined_bytesize.nil? || chunk.bytesize + determined_bytesize > @chunk_limit_size
+                  # The split will (might) cause size over so keep already processed
+                  # 'split' content here (allow performance regression a bit).
+                  chunk.commit
+                  committed_bytesize = chunk.bytesize
+                end
+              end
+
+              if format
+                chunk.concat(formatted_split, split.size)
+              else
+                chunk.append(split, compress: @compress)
+              end
+              adding_bytes = chunk.bytesize - committed_bytesize
+
+              if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
+                chunk.rollback
+                committed_bytesize = chunk.bytesize
+
+                if split.size == 1 # Check BufferChunkOverflowError again
+                  if adding_bytes > @chunk_limit_size
+                    errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
+                    writing_splits_index += 1
+                    next
+                  else
+                    # As already processed content is kept after rollback, then unstaged chunk should be queued.
+                    # After that, re-process current split again.
+                    # New chunk should be allocated, to do it, modify @stage and so on.
+                    synchronize { @stage.delete(modified_metadata) }
+                    staged_chunk_used = false
+                    chunk.unstaged!
+                    break
+                  end
+                end
+
+                if chunk_size_full?(chunk) || split.size == 1
+                  enqueue_chunk_before_retry = true
+                else
+                  splits_count *= 10
+                end
+
+                raise ShouldRetry
+              end
+
+              writing_splits_index += 1
+
+              if chunk_size_full?(chunk)
+                break
+              end
+            end
+          rescue
+            chunk.purge if chunk.unstaged? # unstaged chunk will leak unless purge it
+            raise
+          end
+
+          modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize
         end
         modified_chunks.each do |data|
           block.call(data[:chunk], data[:adding_bytesize], data[:errors])
@@ -863,9 +864,15 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
           if chunk.unstaged?
             chunk.purge rescue nil
           end
+          chunk.mon_exit rescue nil
         end
         enqueue_chunk(metadata) if enqueue_chunk_before_retry
         retry
+      ensure
+        modified_chunks.each do |data|
+          chunk = data[:chunk]
+          chunk.mon_exit
+        end
       end
 
     STATS_KEYS = [
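The invariant the new `ensure` clause establishes: every `mon_enter` taken in the chunk loop is paired with exactly one `mon_exit`, even when `ShouldRetry` (or any other exception) unwinds the loop midway, and the chunks are still locked at the moment they are yielded to the caller. A hedged sketch of that shape — `FakeChunk` and `write_all_sketch` are made-up names, not the fluentd API:

require 'monitor'

class FakeChunk
  include MonitorMixin
end

# Mirrors the structure of write_step_by_step: lock each chunk before
# mutating it, hand the still-locked chunks to the caller's block, and
# unlock them all in ensure so that no code path leaks a lock.
def write_all_sketch(chunks)
  modified = []
  chunks.each do |chunk|
    chunk.mon_enter            # lock before any mutation
    modified << chunk
    # ... append data here; this may raise (e.g. a retry signal) ...
  end
  modified.each { |chunk| yield chunk } # chunks are still locked here
ensure
  modified.each(&:mon_exit)    # balanced unlock on every path
end

write_all_sketch([FakeChunk.new, FakeChunk.new]) do |chunk|
  # The caller observes each chunk while it is still locked.
end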
test/plugin/test_buffer.rb — 59 changes: 59 additions & 0 deletions
@@ -901,6 +901,65 @@ def create_chunk_es(metadata, es)

assert_equal 2, purge_count
end

# https://github.com/fluent/fluentd/issues/4446
test "#write_step_by_step keeps chunks kept in locked in entire #write process" do
assert_equal 8 * 1024 * 1024, @p.chunk_limit_size
assert_equal 0.95, @p.chunk_full_threshold

mon_enter_counts_by_chunk = {}
mon_exit_counts_by_chunk = {}

stub.proxy(@p).generate_chunk(anything) do |chunk|
stub(chunk).mon_enter do
enter_count = 1 + mon_enter_counts_by_chunk.fetch(chunk, 0)
exit_count = mon_exit_counts_by_chunk.fetch(chunk, 0)
mon_enter_counts_by_chunk[chunk] = enter_count

# Assert that chunk is passed to &block of write_step_by_step before exiting the lock.
# (i.e. The lock count must be 2 greater than the exit count).
# Since ShouldRetry occurs once, the staged chunk takes the lock 3 times when calling the block.
if chunk.staged?
lock_in_block = enter_count == 3
assert_equal(enter_count - 2, exit_count) if lock_in_block
else
lock_in_block = enter_count == 2
assert_equal(enter_count - 2, exit_count) if lock_in_block
end
end
stub(chunk).mon_exit do
exit_count = 1 + mon_exit_counts_by_chunk.fetch(chunk, 0)
mon_exit_counts_by_chunk[chunk] = exit_count
end
chunk
end

m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
small_row = "x" * 1024 * 400
big_row = "x" * 1024 * 1024 * 8 # just `chunk_size_limit`, it does't cause BufferOverFlowError.

# Write 42 events in 1 event stream; the last one triggers `ShouldRetry`.
@p.write({m => [small_row] * 40 + [big_row] + ["x"]})

# The above event stream will be split twice by `Buffer#write_step_by_step`:
#
# 1. `write_once`: 42 [events] * 1 [stream]
# 2. `write_step_by_step`: 4 [events] * 10 [streams] + 2 [events] * 1 [stream]
# 3. `write_step_by_step` (by `ShouldRetry`): 1 [event] * 42 [streams]
#
# (The slice arithmetic is sketched at the end of this page.)
#
# Example of staged chunk lock behavior:
#
# 1. mon_enter in write_step_by_step
# 2. ShouldRetry occurs
# 3. mon_exit in write_step_by_step
# 4. mon_enter again in write_step_by_step (retry)
# 5. passed to &block of write_step_by_step
# 6. mon_enter in the block (write)
# 7. mon_exit in write_step_by_step
# 8. mon_exit in write

assert_equal(mon_enter_counts_by_chunk.values, mon_exit_counts_by_chunk.values)
end
end

sub_test_case 'standard format with configuration for test with lower chunk limit size' do
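As a footnote to the splitting comment in the test above, the stream sizes it lists can be reproduced with integer division floored at one element per slice — an assumed model of the slicing, not the exact fluentd implementation:

events = 42
[1, 10, 100].each do |splits_count|
  slice_size = [events / splits_count, 1].max  # integer division, at least 1
  sizes = (1..events).each_slice(slice_size).map(&:size)
  puts "splits_count=#{splits_count}: #{sizes.tally}"
end
# splits_count=1   => {42=>1}        write_once: 42 events x 1 stream
# splits_count=10  => {4=>10, 2=>1}  4 events x 10 streams + 2 events x 1 stream
# splits_count=100 => {1=>42}        after ShouldRetry: 1 event x 42 streams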