Then, `@queued_num`'s key and `chunk.metadata` can be different.
the difference of seq's values causes the bug at Buffer#optimistic_queued?
https://github.com/fluent/fluentd/blob/34987470df995965f4791c6d9ba2dd1c272a70ad/lib/fluent/plugin/buffer.rb#L782 Signed-off-by: Yuta Iwama --- lib/fluent/plugin/buf_file.rb | 1 + lib/fluent/plugin/buffer.rb | 2 ++ test/plugin/test_buf_file.rb | 2 +- test/plugin/test_buffer.rb | 12 +++--------- 4 files changed, 7 insertions(+), 10 deletions(-) diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index 40574a2d48..84a73921ca 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -170,6 +170,7 @@ def resume # these chunks(unstaged chunks) has shared the same metadata # So perform enqueue step again https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L364 if chunk_size_full?(chunk) || stage.key?(chunk.metadata) + chunk.metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num queue << chunk.enqueued! else stage[chunk.metadata] = chunk diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 57eba6e2b8..3e8f95edda 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -428,6 +428,7 @@ def enqueue_chunk(metadata) if chunk.empty? chunk.close else + chunk.metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num @queue << chunk @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1 chunk.enqueued! @@ -446,6 +447,7 @@ def enqueue_unstaged_chunk(chunk) synchronize do chunk.synchronize do metadata = chunk.metadata + metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num @queue << chunk @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1 chunk.enqueued! 
diff --git a/test/plugin/test_buf_file.rb b/test/plugin/test_buf_file.rb index c010f2bac2..e073ef6eae 100644 --- a/test/plugin/test_buf_file.rb +++ b/test/plugin/test_buf_file.rb @@ -1072,7 +1072,7 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime) assert_equal 3, s.size assert_equal [0, 1, 2], s.keys.map(&:seq).sort assert_equal 1, e.size - assert_equal [2], e.map { |e| e.metadata.seq } + assert_equal [0], e.map { |e| e.metadata.seq } end end diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index 17528daa3b..0b995dd6e8 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -903,9 +903,7 @@ def create_chunk_es(metadata, es) # metadata whose seq is 4 is created, but overwrite with original metadata(seq=0) for next use of this chunk https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L357 assert_equal [@dm0], @p.stage.keys assert_equal 5400, @p.stage[@dm0].size - r = [@dm0] - 3.times { |i| r << r[i].dup_next } - assert_equal [@dm0, *r], @p.queue.map(&:metadata) + assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0], @p.queue.map(&:metadata) assert_equal [5000, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ... # 9900 * 4 + 5400 == 45000 end @@ -922,9 +920,7 @@ def create_chunk_es(metadata, es) dequeued_chunks = 6.times.map { |e| @p.dequeue_chunk } # splits: 45000 / 100 => 450 * ... 
assert_equal [5000, 9900, 9900, 9900, 9900, 5400], dequeued_chunks.map(&:size) - r = [@dm0] - 3.times { |i| r << r[i].dup_next } - assert_equal [@dm0, *r, @dm0], dequeued_chunks.map(&:metadata) # last last one's metadata.seq is 0 + assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0, @dm0], dequeued_chunks.map(&:metadata) end test '#write raises BufferChunkOverflowError if a record is biggar than chunk limit size' do @@ -1010,9 +1006,7 @@ def create_chunk_es(metadata, es) assert_equal [@dm0], @p.stage.keys assert_equal 900, @p.stage[@dm0].size - r = [@dm0] - 4.times { |i| r << r[i].dup_next } - assert_equal r, @p.queue.map(&:metadata) + assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0], @p.queue.map(&:metadata) assert_equal [9500, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ... ##### 900 + 9500 + 9900 * 4 == 5000 + 45000 end