Skip to content

Commit

Permalink
Fix a bug that BufferChunkOverflowError cause a whole data loss
Browse files Browse the repository at this point in the history
In the previous version, if BufferChunkOverflowError occurs,
whole data is not sent because of raised exception.

For example, incoming partial data exceeds chunk_limit_size,
"message a" and "message b" is lost.

  message a\n
  longer.....message data exceeds chunk_limit_size\n
  message b\n

It is not appropriate because the problem exists only a partial split
data.

In this commit, when BufferChunkOverflowError is detected in
write_step_by_step, collect errors and propagate it to write method
scope.
By this way, a valid partial split is processed and not lost.

Here is the micro benchmark result:

It seems that it has slightly performance drawbacks, but all 'split'
data is processed correctly without thrown away it when BufferChunkOverflowError
occurs, thus it is not so critical.

Before:

  Calculating -------------------------------------
  no BufferChunkOverflowError
                          115.161k (±13.5%) i/s -      1.078M in   9.705579s
  raise BufferChunkOverflowError in every 100 calls
                          957.400  (±33.0%) i/s -      7.500k in   9.994991s

  Comparison:
  no BufferChunkOverflowError:   115160.5 i/s
  raise BufferChunkOverflowError in every 100 calls:      957.4 i/s - 120.28x  (± 0.00) slower

After:

  Calculating -------------------------------------
  no BufferChunkOverflowError
                          114.962k (±13.4%) i/s -      1.080M in   9.724037s
  raise BufferChunkOverflowError in every 100 calls
                          907.511  (±32.1%) i/s -      7.081k in  10.003371s

  Comparison:
  no BufferChunkOverflowError:   114961.7 i/s
  raise BufferChunkOverflowError in every 100 calls:      907.5 i/s - 126.68x  (± 0.00) slower

Here is the micro benchmark test case:

  test '#write BufferChunkOverflowError micro benchmark' do
    require 'benchmark/ips'
    es = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x"}] ])
    large = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}] ])
    Benchmark.ips do |x|
      x.config(:time => 10, :warmup => 0)
      x.report("no BufferChunkOverflowError") do
        @p.write({@dm0 => es}, format: @Format)
      end
      x.report("raise BufferChunkOverflowError in every 100 calls") do |n|
        n.times do |i|
          if i % 100 == 0
            assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError do
              @p.write({@dm0 => large}, format: @Format)
            end
          else
            @p.write({@dm0 => es}, format: @Format)
          end
        end
      end
      x.compare!
    end
  end

Signed-off-by: Kentaro Hayashi <[email protected]>
  • Loading branch information
kenhys committed Nov 12, 2021
1 parent f03bb92 commit 40455e4
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 4 deletions.
28 changes: 25 additions & 3 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,14 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
unstaged_chunks = {} # metadata => [chunk, chunk, ...]
chunks_to_enqueue = []
staged_bytesizes_by_chunk = {}
# track internal BufferChunkOverflowError in write_step_by_step
buffer_chunk_overflow_errors = []

begin
# sort metadata to get lock of chunks in same order with other threads
metadata_and_data.keys.sort.each do |metadata|
data = metadata_and_data[metadata]
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize, error|
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
if chunk.staged?
Expand All @@ -352,6 +354,9 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
end
if error and not error.empty?
buffer_chunk_overflow_errors << error
end
end
end

Expand Down Expand Up @@ -444,6 +449,10 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
end
chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
end
unless buffer_chunk_overflow_errors.empty?
# Notify delayed BufferChunkOverflowError here
raise BufferChunkOverflowError, buffer_chunk_overflow_errors.join(", ")
end
end
end

Expand Down Expand Up @@ -716,6 +725,7 @@ def write_once(metadata, data, format: nil, size: nil, &block)

def write_step_by_step(metadata, data, format, splits_count, &block)
splits = []
errors = []
if splits_count > data.size
splits_count = data.size
end
Expand Down Expand Up @@ -761,6 +771,12 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
begin
while writing_splits_index < splits.size
split = splits[writing_splits_index]
if split.size == 1 && original_bytesize == 0 && (chunk.bytesize + split.first.to_msgpack.bytesize) > @chunk_limit_size
# When it will be predictable that BufferChunkOverflowError occur,
# commit current chunk to keep it, otherwise valid 'split' data can be lost by
# the following chunk.rollback
chunk.commit
end
if format
chunk.concat(format.call(split), split.size)
else
Expand All @@ -772,7 +788,13 @@ def write_step_by_step(metadata, data, format, splits_count, &block)

if split.size == 1 && original_bytesize == 0
big_record_size = format ? format.call(split).bytesize : split.first.bytesize
raise BufferChunkOverflowError, "a #{big_record_size}bytes record is larger than buffer chunk limit size"
# It is obviously case that BufferChunkOverflowError should be raised here,
# but if it raises here, already processed 'split' or
# the proceeding 'split' will be lost completely.
# so it is a last resort to delay raising such a exception
errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
writing_splits_index += 1
next
end

if chunk_size_full?(chunk) || split.size == 1
Expand All @@ -795,7 +817,7 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
raise
end

block.call(chunk, chunk.bytesize - original_bytesize)
block.call(chunk, chunk.bytesize - original_bytesize, errors)
end
end
rescue ShouldRetry
Expand Down
48 changes: 47 additions & 1 deletion test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class DummyOutputPlugin < Fluent::Plugin::Base
end
class DummyMemoryChunkError < StandardError; end
class DummyMemoryChunk < Fluent::Plugin::Buffer::MemoryChunk
attr_reader :append_count, :rollbacked, :closed, :purged
attr_reader :append_count, :rollbacked, :closed, :purged, :chunk
attr_accessor :failing
def initialize(metadata, compress: :text)
super
Expand Down Expand Up @@ -944,6 +944,52 @@ def create_chunk_es(metadata, es)
@p.write({@dm0 => es}, format: @format)
end
end

data(
first_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
intermediate_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
last_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]]),
multiple_chunks: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]])
)
test '#write exceeds chunk_limit_size, raise BufferChunkOverflowError, but not lost whole messages' do |(es)|
assert_equal [@dm0], @p.stage.keys
assert_equal [], @p.queue.map(&:metadata)

assert_equal 1_280_000, @p.chunk_limit_size

nth = []
es.entries.each_with_index do |entry, index|
if entry.last["message"].size == @p.chunk_limit_size
nth << index
end
end
messages = []
nth.each do |n|
messages << "a 1280025 bytes record (nth: #{n}) is larger than buffer chunk limit size"
end

assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError.new(messages.join(", ")) do
@p.write({@dm0 => es}, format: @format)
end
# message a and b are concatenated and staged
messages = Fluent::MessagePackFactory.msgpack_unpacker.feed_each(@p.stage[@dm0].chunk).collect do |record|
record.last
end
assert_equal([2, messages],
[@p.stage[@dm0].size, [{"message" => "a"}, {"message" => "b"}]])
# only es0 message is queued
assert_equal [@dm0], @p.queue.map(&:metadata)
assert_equal [5000], @p.queue.map(&:size)
end
end

sub_test_case 'custom format with configuration for test with lower chunk limit size' do
Expand Down

0 comments on commit 40455e4

Please sign in to comment.