Skip to content

Commit

Permalink
Fix a bug that BufferChunkOverflowError cause a whole data loss
Browse files Browse the repository at this point in the history
In the previous version, if BufferChunkOverflowError occurs,
whole data is not sent because of raised exception.

For example, incoming partial data exceeds chunk_limit_size,
"message a" and "message b" is lost.

  message a\n
  longer.....message data exceeds chunk_limit_size\n
  message b\n

It is not appropriate because the problem exists only a partial split
data.

In this commit, detect BufferChunkOverflowError and postpone a timing
to raise exception.
Thus a valid partial split is processed as intended.

Signed-off-by: Kentaro Hayashi <[email protected]>
  • Loading branch information
kenhys committed Nov 11, 2021
1 parent abca46f commit 0bd4076
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 16 deletions.
59 changes: 44 additions & 15 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -332,26 +332,33 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
unstaged_chunks = {} # metadata => [chunk, chunk, ...]
chunks_to_enqueue = []
staged_bytesizes_by_chunk = {}
# track internal BufferChunkOverflowError in write_step_by_step
@delay_buffer_chunk_overflow_error = nil

begin
# sort metadata to get lock of chunks in same order with other threads
metadata_and_data.keys.sort.each do |metadata|
data = metadata_and_data[metadata]
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
if chunk.staged?
#
# https://github.com/fluent/fluentd/issues/2712
# write_once is supposed to write to a chunk only once
# but this block **may** run multiple times from write_step_by_step and previous write may be rollbacked
# So we should be counting the stage_size only for the last successful write
#
staged_bytesizes_by_chunk[chunk] = adding_bytesize
elsif chunk.unstaged?
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
begin
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
if chunk.staged?
#
# https://github.com/fluent/fluentd/issues/2712
# write_once is supposed to write to a chunk only once
# but this block **may** run multiple times from write_step_by_step and previous write may be rollbacked
# So we should be counting the stage_size only for the last successful write
#
staged_bytesizes_by_chunk[chunk] = adding_bytesize
elsif chunk.unstaged?
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
end
end
rescue BufferChunkOverflowError
# If BufferChunkOverflowError is not caught here, operated_chunks will be lost
# So only do catch exception here.
end
end

Expand Down Expand Up @@ -444,6 +451,12 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
end
chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
end
if @delay_buffer_chunk_overflow_error
# Notify delayed chunk overflow here
message = @delay_buffer_chunk_overflow_error
@delay_buffer_chunk_overflow_error = nil
raise BufferChunkOverflowError, message
end
end
end

Expand Down Expand Up @@ -761,6 +774,12 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
begin
while writing_splits_index < splits.size
split = splits[writing_splits_index]
if split.size == 1 && original_bytesize == 0 && (chunk.bytesize + split.first.to_msgpack.bytesize) > @chunk_limit_size
# When it will be predictable that BufferChunkOverflowError occur,
# commit current chunk to keep it, otherwise valid 'split' data can be lost by
# the following chunk.rollback
chunk.commit
end
if format
chunk.concat(format.call(split), split.size)
else
Expand All @@ -772,7 +791,13 @@ def write_step_by_step(metadata, data, format, splits_count, &block)

if split.size == 1 && original_bytesize == 0
big_record_size = format ? format.call(split).bytesize : split.first.bytesize
raise BufferChunkOverflowError, "a #{big_record_size}bytes record is larger than buffer chunk limit size"
writing_splits_index += 1
# It is obviously case that BufferChunkOverflowError should be raised here,
# but if it raises here, already processed 'split' or
# the proceeding 'split' will be lost completely.
# so it is a last resort to delay raising such a exception
@delay_buffer_chunk_overflow_error = "a #{big_record_size}bytes record is larger than buffer chunk limit size"
next
end

if chunk_size_full?(chunk) || split.size == 1
Expand Down Expand Up @@ -807,6 +832,10 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
end
enqueue_chunk(metadata) if enqueue_chunk_before_retry
retry
ensure
if @delay_buffer_chunk_overflow_error
raise BufferChunkOverflowError, @delay_buffer_chunk_limit_error
end
end

STATS_KEYS = [
Expand Down
33 changes: 32 additions & 1 deletion test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class DummyOutputPlugin < Fluent::Plugin::Base
end
class DummyMemoryChunkError < StandardError; end
class DummyMemoryChunk < Fluent::Plugin::Buffer::MemoryChunk
attr_reader :append_count, :rollbacked, :closed, :purged
attr_reader :append_count, :rollbacked, :closed, :purged, :chunk
attr_accessor :failing
def initialize(metadata, compress: :text)
super
Expand Down Expand Up @@ -944,6 +944,37 @@ def create_chunk_es(metadata, es)
@p.write({@dm0 => es}, format: @format)
end
end

data(
first_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
intermediate_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
last_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]])
)
test '#write exceeds chunk_limit_size, raise BufferChunkOverflowError, but not lost whole messages' do |(es)|
assert_equal [@dm0], @p.stage.keys
assert_equal [], @p.queue.map(&:metadata)

assert_equal 1_280_000, @p.chunk_limit_size

assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError do
@p.write({@dm0 => es}, format: @format)
end
# message a and b are concatenated and staged
messages = Fluent::MessagePackFactory.msgpack_unpacker.feed_each(@p.stage[@dm0].chunk).collect do |record|
record.last
end
assert_equal([2, messages],
[@p.stage[@dm0].size, [{"message" => "a"}, {"message" => "b"}]])
# only es0 message is queued
assert_equal [@dm0], @p.queue.map(&:metadata)
assert_equal [5000], @p.queue.map(&:size)
end
end

sub_test_case 'custom format with configuration for test with lower chunk limit size' do
Expand Down

0 comments on commit 0bd4076

Please sign in to comment.