diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb
index cc18f76a1e..5877af936c 100644
--- a/lib/fluent/plugin/buffer.rb
+++ b/lib/fluent/plugin/buffer.rb
@@ -780,10 +780,14 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
                     chunk.commit
                   else
                     big_record_size = formatted_split.bytesize
-                    if chunk.bytesize + big_record_size > @chunk_limit_size
+                    if big_record_size > @chunk_limit_size
+                      # Just skip to next split (current split is ignored)
                       errors << "a #{big_record_size} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
                       writing_splits_index += 1
                       next
+                    elsif chunk.bytesize + big_record_size > @chunk_limit_size
+                      # No doubt that the split is expected to cause size over, keep 'split' content here.
+                      chunk.commit
                     end
                   end
                 end
@@ -799,13 +803,23 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
                   chunk.rollback
 
                   if split.size == 1 && original_bytesize == 0
-                    # It is obviously case that BufferChunkOverflowError should be raised here,
-                    # but if it raises here, already processed 'split' or
-                    # the proceeding 'split' will be lost completely.
-                    # so it is a last resort to delay raising such a exception
-                    errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
-                    writing_splits_index += 1
-                    next
+                    # FIXME: adding_bytes should be implemented as an official chunk API in the future
+                    if adding_bytes.is_a?(Integer) && adding_bytes < @chunk_limit_size
+                      # already processed content is kept after rollback, then unstaged chunk should be queued.
+                      # After that, re-process current split again (new chunk should be allocated, to do it, modify @stage and so on)
+                      synchronize { @stage.delete(modified_metadata) }
+                      staged_chunk_used = false
+                      chunk.unstaged!
+                      break
+                    else
+                      # It is obviously case that BufferChunkOverflowError should be raised here,
+                      # but if it raises here, already processed 'split' or
+                      # the proceeding 'split' will be lost completely.
+                      # so it is a last resort to delay raising such a exception
+                      errors << "concatenated/appended a #{adding_bytes} bytes record (nth: #{writing_splits_index}) is larger than buffer chunk limit size (#{@chunk_limit_size})"
+                      writing_splits_index += 1
+                      next
+                    end
                   end
 
                   if chunk_size_full?(chunk) || split.size == 1
diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb
index cfbcd89380..d35d2d6ce7 100644
--- a/test/plugin/test_buffer.rb
+++ b/test/plugin/test_buffer.rb
@@ -990,6 +990,51 @@ def create_chunk_es(metadata, es)
       assert_equal [@dm0], @p.queue.map(&:metadata)
       assert_equal [5000], @p.queue.map(&:size)
     end
+
+    test "confirm that every message which is smaller than chunk threshold does not raise BufferChunkOverflowError" do
+      assert_equal [@dm0], @p.stage.keys
+      assert_equal [], @p.queue.map(&:metadata)
+      timestamp = event_time('2016-04-11 16:00:02 +0000')
+      es = Fluent::ArrayEventStream.new([[timestamp, {"message" => "a" * 1_000_000}],
+                                         [timestamp, {"message" => "b" * 1_000_000}],
+                                         [timestamp, {"message" => "c" * 1_000_000}]])
+
+      # https://github.com/fluent/fluentd/issues/1849
+      # Even though 1_000_000 < 1_280_000 (chunk_limit_size), it raised BufferChunkOverflowError before.
+      # It should not be raised and message a,b,c should be stored into 3 chunks.
+      assert_nothing_raised do
+        @p.write({@dm0 => es}, format: @format)
+      end
+      messages = []
+      # pick up first letter to check whether chunk is queued in expected order
+      3.times do |index|
+        chunk = @p.queue[index]
+        es = Fluent::MessagePackEventStream.new(chunk.chunk)
+        es.ensure_unpacked!
+        records = es.instance_eval{ @unpacked_records }
+        records.each do |record|
+          messages << record["message"][0]
+        end
+      end
+      es = Fluent::MessagePackEventStream.new(@p.stage[@dm0].chunk)
+      es.ensure_unpacked!
+      staged_message = es.instance_eval{ @unpacked_records }.first["message"]
+      # message a and b are queued, message c is staged
+      assert_equal([
+                     [@dm0],
+                     "c" * 1_000_000,
+                     [@dm0, @dm0, @dm0],
+                     [5000, 1, 1],
+                     [["x"] * 5000, "a", "b"].flatten
+                   ],
+                   [
+                     @p.stage.keys,
+                     staged_message,
+                     @p.queue.map(&:metadata),
+                     @p.queue.map(&:size),
+                     messages
+                   ])
+    end
   end
 
   sub_test_case 'custom format with configuration for test with lower chunk limit size' do
@@ -1078,6 +1123,38 @@ def create_chunk_es(metadata, es)
         @p.write({@dm0 => es})
       end
     end
+
+    test 'confirm that every array message which is smaller than chunk threshold does not raise BufferChunkOverflowError' do
+      assert_equal [@dm0], @p.stage.keys
+      assert_equal [], @p.queue.map(&:metadata)
+
+      assert_equal 1_280_000, @p.chunk_limit_size
+
+      es = ["a" * 1_000_000, "b" * 1_000_000, "c" * 1_000_000]
+      assert_nothing_raised do
+        @p.write({@dm0 => es})
+      end
+      queue_messages = @p.queue.collect do |chunk|
+        # collect first character of each message
+        chunk.chunk[0]
+      end
+      assert_equal([
+                     [@dm0],
+                     1,
+                     "c",
+                     [@dm0, @dm0, @dm0],
+                     [5000, 1, 1],
+                     ["x", "a", "b"]
+                   ],
+                   [
+                     @p.stage.keys,
+                     @p.stage[@dm0].size,
+                     @p.stage[@dm0].chunk[0],
+                     @p.queue.map(&:metadata),
+                     @p.queue.map(&:size),
+                     queue_messages
+                   ])
+    end
   end
 
   sub_test_case 'with configuration for test with lower limits' do