When chunks are queued, their metadata.seq should be 0 #2853

Merged · 2 commits · Mar 4, 2020
Changes from all commits:
1 change: 1 addition & 0 deletions lib/fluent/plugin/buf_file.rb
@@ -170,6 +170,7 @@ def resume
         # these chunks (unstaged chunks) have shared the same metadata,
         # so perform the enqueue step again: https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L364
         if chunk_size_full?(chunk) || stage.key?(chunk.metadata)
+          chunk.metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num
           queue << chunk.enqueued!
         else
           stage[chunk.metadata] = chunk
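Why every enqueue path needs this reset: @queued_num is keyed by metadata objects. Here is a minimal sketch of the miscounting, assuming (as the tests below suggest) that Metadata behaves like a Ruby Struct whose members include seq, so Struct's default hash/eql? put two metadata objects that differ only in seq under different keys. This is a simplified stand-in, not fluentd's real class:

Metadata = Struct.new(:timekey, :tag, :variables, :seq)

m0 = Metadata.new(nil, "test", nil, 0)   # the key the rest of the buffer uses
m2 = Metadata.new(nil, "test", nil, 2)   # e.g. left over from a chunk split

# Without the reset, the enqueued chunk is counted under the seq=2 key,
# so bookkeeping keyed by the seq=0 metadata never sees it:
queued_num = Hash.new(0)
queued_num[m2] += 1
queued_num[m0]   # => 0 -- the chunk is invisible under the original key

# With the reset applied before counting, both sides share a single key:
queued_num = Hash.new(0)
m2.seq = 0
queued_num[m2] += 1
queued_num[m0]   # => 1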
4 changes: 4 additions & 0 deletions lib/fluent/plugin/buffer.rb
@@ -363,6 +363,8 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
             u = unstaged_chunks[m].pop
             u.synchronize do
               if u.unstaged? && !chunk_size_full?(u)
+                # `u.metadata.seq` and `m.seq` can be different, but Buffer#enqueue_chunk expects them to be the same value
+                u.metadata.seq = 0
                 synchronize {
                   @stage[m] = u.staged!
                   @stage_size += u.bytesize
@@ -426,6 +428,7 @@ def enqueue_chunk(metadata)
         if chunk.empty?
           chunk.close
         else
+          chunk.metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num
           @queue << chunk
           @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
           chunk.enqueued!
@@ -444,6 +447,7 @@ def enqueue_unstaged_chunk(chunk)
       synchronize do
         chunk.synchronize do
           metadata = chunk.metadata
+          metadata.seq = 0 # metadata.seq should be 0 for counting @queued_num
           @queue << chunk
           @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
           chunk.enqueued!
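The dup_next calls removed from the tests below hint at where the mismatched seq values come from during splitting. A rough, self-contained sketch under the same simplified stand-in, assuming dup_next copies the metadata and increments seq (which matches how the old tests used it):

Metadata = Struct.new(:timekey, :tag, :variables, :seq) do
  def dup_next
    m = dup
    m.seq = seq + 1
    m
  end
end

m = Metadata.new(nil, "test", nil, 0)
splits = [m]
3.times { |i| splits << splits[i].dup_next }
splits.map(&:seq)   # => [0, 1, 2, 3]

# The trailing unstaged chunk can be put back on the stage under the seq=0
# key m while its own metadata still carries a higher seq -- exactly the
# mismatch the `u.metadata.seq = 0` line in #write guards against.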
2 changes: 1 addition & 1 deletion test/plugin/test_buf_file.rb
@@ -1072,7 +1072,7 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
        assert_equal 3, s.size
        assert_equal [0, 1, 2], s.keys.map(&:seq).sort
        assert_equal 1, e.size
-       assert_equal [2], e.map { |e| e.metadata.seq }
+       assert_equal [0], e.map { |e| e.metadata.seq }
      end
    end

23 changes: 17 additions & 6 deletions test/plugin/test_buffer.rb
@@ -903,13 +903,26 @@ def create_chunk_es(metadata, es)
      # metadata whose seq is 4 is created, but it is overwritten with the original metadata (seq=0) for the next use of this chunk: https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L357
      assert_equal [@dm0], @p.stage.keys
      assert_equal 5400, @p.stage[@dm0].size
-     r = [@dm0]
-     3.times { |i| r << r[i].dup_next }
-     assert_equal [@dm0, *r], @p.queue.map(&:metadata)
+     assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0], @p.queue.map(&:metadata)
      assert_equal [5000, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ...
      # 9900 * 4 + 5400 == 45000
    end

+   test '#dequeue_chunk succeeds when chunk is split' do
+     assert_equal [@dm0], @p.stage.keys
+     assert_equal [], @p.queue.map(&:metadata)
+
+     assert_equal 1_280_000, @p.chunk_limit_size
+
+     es = Fluent::ArrayEventStream.new([ [event_time('2016-04-11 16:00:02 +0000'), {"message" => "x" * (128 - 22)}] ] * 45000)
+     @p.write({@dm0 => es}, format: @format)
+     @p.enqueue_all(true)
+
+     dequeued_chunks = 6.times.map { |e| @p.dequeue_chunk } # splits: 45000 / 100 => 450 * ...
+     assert_equal [5000, 9900, 9900, 9900, 9900, 5400], dequeued_chunks.map(&:size)
+     assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0, @dm0], dequeued_chunks.map(&:metadata)
+   end
+
    test '#write raises BufferChunkOverflowError if a record is bigger than chunk limit size' do
      assert_equal [@dm0], @p.stage.keys
      assert_equal [], @p.queue.map(&:metadata)
@@ -993,9 +1006,7 @@ def create_chunk_es(metadata, es)

      assert_equal [@dm0], @p.stage.keys
      assert_equal 900, @p.stage[@dm0].size
-     r = [@dm0]
-     4.times { |i| r << r[i].dup_next }
-     assert_equal r, @p.queue.map(&:metadata)
+     assert_equal [@dm0, @dm0, @dm0, @dm0, @dm0], @p.queue.map(&:metadata)
      assert_equal [9500, 9900, 9900, 9900, 9900], @p.queue.map(&:size) # splits: 45000 / 100 => 450 * ...
      ##### 900 + 9500 + 9900 * 4 == 5000 + 45000
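As a toy model (again with the simplified struct, not fluentd's real Buffer) of the accounting the new #dequeue_chunk test exercises: once every enqueue normalizes seq, all split chunks are counted and drained under one key.

Metadata = Struct.new(:timekey, :tag, :variables, :seq)

dm0 = Metadata.new(nil, "test", nil, 0)
queue = []
queued_num = Hash.new(0)

6.times do |i|
  m = dm0.dup
  m.seq = i + 1   # what a split may have left behind
  m.seq = 0       # the normalization added by this PR
  queue << m
  queued_num[m] += 1
end
queued_num[dm0]   # => 6 -- every split chunk is visible under the original key

queue.size.times { queued_num[dm0] -= 1 }
queued_num[dm0]   # => 0 -- the counter balances after draining the queue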