Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a bug that BufferChunkOverflowError cause a whole data loss #3553

Merged
merged 2 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,14 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
unstaged_chunks = {} # metadata => [chunk, chunk, ...]
chunks_to_enqueue = []
staged_bytesizes_by_chunk = {}
# track internal BufferChunkOverflowError in write_step_by_step
buffer_chunk_overflow_errors = []

begin
# sort metadata to get lock of chunks in same order with other threads
metadata_and_data.keys.sort.each do |metadata|
data = metadata_and_data[metadata]
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize, error|
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
if chunk.staged?
Expand All @@ -352,6 +354,9 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
unstaged_chunks[metadata] ||= []
unstaged_chunks[metadata] << chunk
end
if error && !error.empty?
buffer_chunk_overflow_errors << error
end
end
end

Expand Down Expand Up @@ -444,6 +449,10 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
end
chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
end
unless buffer_chunk_overflow_errors.empty?
# Notify delayed BufferChunkOverflowError here
raise BufferChunkOverflowError, buffer_chunk_overflow_errors.join(", ")
end
end
end

Expand Down Expand Up @@ -716,6 +725,7 @@ def write_once(metadata, data, format: nil, size: nil, &block)

def write_step_by_step(metadata, data, format, splits_count, &block)
splits = []
errors = []
if splits_count > data.size
splits_count = data.size
end
Expand Down Expand Up @@ -761,18 +771,41 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
begin
while writing_splits_index < splits.size
split = splits[writing_splits_index]
formatted_split = format ? format.call(split) : split.first
if split.size == 1 && original_bytesize == 0
if format == nil && @compress != :text
# The actual size of chunk is not determined until after chunk.append.
# so, keep already processed 'split' content here.
# (allow performance regression a bit)
chunk.commit
else
big_record_size = formatted_split.bytesize
if chunk.bytesize + big_record_size > @chunk_limit_size
errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
writing_splits_index += 1
next
end
end
kenhys marked this conversation as resolved.
Show resolved Hide resolved
end
kenhys marked this conversation as resolved.
Show resolved Hide resolved

if format
chunk.concat(format.call(split), split.size)
chunk.concat(formatted_split, split.size)
else
chunk.append(split, compress: @compress)
end

if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
adding_bytes = chunk.instance_eval { @adding_bytes } || "N/A" # 3rd party might not have 'adding_bytes'
chunk.rollback

if split.size == 1 && original_bytesize == 0
big_record_size = format ? format.call(split).bytesize : split.first.bytesize
raise BufferChunkOverflowError, "a #{big_record_size}bytes record is larger than buffer chunk limit size"
# It is obviously case that BufferChunkOverflowError should be raised here,
# but if it raises here, already processed 'split' or
# the proceeding 'split' will be lost completely.
# so it is a last resort to delay raising such a exception
errors << "a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size"
writing_splits_index += 1
next
end

if chunk_size_full?(chunk) || split.size == 1
Expand All @@ -795,7 +828,8 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
raise
end

block.call(chunk, chunk.bytesize - original_bytesize)
block.call(chunk, chunk.bytesize - original_bytesize, errors)
errors = []
end
end
rescue ShouldRetry
Expand Down
73 changes: 72 additions & 1 deletion test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class DummyOutputPlugin < Fluent::Plugin::Base
end
class DummyMemoryChunkError < StandardError; end
class DummyMemoryChunk < Fluent::Plugin::Buffer::MemoryChunk
attr_reader :append_count, :rollbacked, :closed, :purged
attr_reader :append_count, :rollbacked, :closed, :purged, :chunk
attr_accessor :failing
def initialize(metadata, compress: :text)
super
Expand Down Expand Up @@ -944,6 +944,52 @@ def create_chunk_es(metadata, es)
@p.write({@dm0 => es}, format: @format)
end
end

data(
first_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
intermediate_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}]]),
last_chunk: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]]),
multiple_chunks: Fluent::ArrayEventStream.new([[event_time('2016-04-11 16:00:02 +0000'), {"message" => "a"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "b"}],
[event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * 1_280_000}]])
)
test '#write exceeds chunk_limit_size, raise BufferChunkOverflowError, but not lost whole messages' do |(es)|
assert_equal [@dm0], @p.stage.keys
assert_equal [], @p.queue.map(&:metadata)

assert_equal 1_280_000, @p.chunk_limit_size

nth = []
es.entries.each_with_index do |entry, index|
if entry.last["message"].size == @p.chunk_limit_size
nth << index
end
end
messages = []
nth.each do |n|
messages << "a 1280025 bytes record (nth: #{n}) is larger than buffer chunk limit size"
end

assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError.new(messages.join(", ")) do
@p.write({@dm0 => es}, format: @format)
end
# message a and b are concatenated and staged
staged_messages = Fluent::MessagePackFactory.msgpack_unpacker.feed_each(@p.stage[@dm0].chunk).collect do |record|
record.last
end
assert_equal([2, [{"message" => "a"}, {"message" => "b"}]],
[@p.stage[@dm0].size, staged_messages])
# only es0 message is queued
assert_equal [@dm0], @p.queue.map(&:metadata)
assert_equal [5000], @p.queue.map(&:size)
end
end

sub_test_case 'custom format with configuration for test with lower chunk limit size' do
Expand Down Expand Up @@ -1201,6 +1247,7 @@ def create_chunk_es(metadata, es)
sub_test_case 'when compress is gzip' do
setup do
@p = create_buffer({'compress' => 'gzip'})
@dm0 = create_metadata(Time.parse('2016-04-11 16:00:00 +0000').to_i, nil, nil)
end

test '#compress returns :gzip' do
Expand All @@ -1211,6 +1258,30 @@ def create_chunk_es(metadata, es)
chunk = @p.generate_chunk(create_metadata)
assert chunk.singleton_class.ancestors.include?(Fluent::Plugin::Buffer::Chunk::Decompressable)
end

test '#write compressed data which exceeds chunk_limit_size, it raises BufferChunkOverflowError' do
@p = create_buffer({'compress' => 'gzip', 'chunk_limit_size' => 70})
timestamp = event_time('2016-04-11 16:00:02 +0000')
es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "012345"}], # overflow
[timestamp, {"message" => "aaa"}],
[timestamp, {"message" => "bbb"}]])
assert_equal [], @p.queue.map(&:metadata)
assert_equal 70, @p.chunk_limit_size

# calculate the actual boundary value. it varies on machine
c = @p.generate_chunk(create_metadata)
c.append(Fluent::ArrayEventStream.new([[timestamp, {"message" => "012345"}]]), compress: :gzip)
overflow_bytes = c.bytesize

messages = "a #{overflow_bytes} bytes record (nth: 0) is larger than buffer chunk limit size"
assert_raise Fluent::Plugin::Buffer::BufferChunkOverflowError.new(messages) do
# test format == nil && compress == :gzip
@p.write({@dm0 => es})
end
# message a and b occupies each chunks in full, so both of messages are queued (no staged chunk)
assert_equal([2, [@dm0, @dm0], [1, 1], nil],
[@p.queue.size, @p.queue.map(&:metadata), @p.queue.map(&:size), @p.stage[@dm0]])
end
end

sub_test_case '#statistics' do
Expand Down