diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index 40574a2d48..84a73921ca 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -170,6 +170,7 @@ def resume # these chunks(unstaged chunks) has shared the same metadata # So perform enqueue step again https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L364 if chunk_size_full?(chunk) || stage.key?(chunk.metadata) + chunk.metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num queue << chunk.enqueued! else stage[chunk.metadata] = chunk diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 8ed55c2f85..3e8f95edda 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -363,6 +363,8 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false) u = unstaged_chunks[m].pop u.synchronize do if u.unstaged? && !chunk_size_full?(u) + # `u.metadata.seq` and `m.seq` can be different but Buffer#enqueue_chunk expect them to be the same value + u.metadata.seq = 0 synchronize { @stage[m] = u.staged! @stage_size += u.bytesize @@ -426,6 +428,7 @@ def enqueue_chunk(metadata) if chunk.empty? chunk.close else + chunk.metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num @queue << chunk @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1 chunk.enqueued! @@ -444,6 +447,7 @@ def enqueue_unstaged_chunk(chunk) synchronize do chunk.synchronize do metadata = chunk.metadata + metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num @queue << chunk @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1 chunk.enqueued! diff --git a/test/plugin/test_buf_file.rb b/test/plugin/test_buf_file.rb index c010f2bac2..e073ef6eae 100644 --- a/test/plugin/test_buf_file.rb +++ b/test/plugin/test_buf_file.rb @@ -1072,7 +1072,7 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime) assert_equal 3, s.size assert_equal [0, 1, 2], s.keys.map(&:seq).sort assert_equal 1, e.size - assert_equal [2], e.map { |e| e.metadata.seq } + assert_equal [0], e.map { |e| e.metadata.seq } end end diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index 23506e4447..0b995dd6e8 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -903,13 +903,26 @@ def create_chunk_es(metadata, es) # metadata whose seq is 4 is created, but overwrite with original metadata(seq=0) for next use of this chunk https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L357 assert_equal [@dm0], @p.stage.keys assert_equal 5400, @p.stage[@dm0].size - r = [@dm0] - 3.times { |i| r << r[i].dup_next } - assert_equal [@dm0, *r], @p.queue.map(&:metadata) + assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0], @p.queue.map(&:metadata) assert_equal [5000, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ... # 9900 * 4 + 5400 == 45000 end + test '#dequeue_chunk succeeds when chunk is splited' do + assert_equal [@dm0], @p.stage.keys + assert_equal [], @p.queue.map(&:metadata) + + assert_equal 1_280_000, @p.chunk_limit_size + + es = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * (128 - 22)}] ] * 45000) + @p.write({@dm0 => es}, format: @format) + @p.enqueue_all(true) + + dequeued_chunks = 6.times.map { |e| @p.dequeue_chunk } # splits: 45000 / 100 => 450 * ... + assert_equal [5000, 9900, 9900, 9900, 9900, 5400], dequeued_chunks.map(&:size) + assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0, @dm0], dequeued_chunks.map(&:metadata) + end + test '#write raises BufferChunkOverflowError if a record is biggar than chunk limit size' do assert_equal [@dm0], @p.stage.keys assert_equal [], @p.queue.map(&:metadata) @@ -993,9 +1006,7 @@ def create_chunk_es(metadata, es) assert_equal [@dm0], @p.stage.keys assert_equal 900, @p.stage[@dm0].size - r = [@dm0] - 4.times { |i| r << r[i].dup_next } - assert_equal r, @p.queue.map(&:metadata) + assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0], @p.queue.map(&:metadata) assert_equal [9500, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ... ##### 900 + 9500 + 9900 * 4 == 5000 + 45000 end