Skip to content

Commit

Permalink
Add mitigate code for existing code
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <[email protected]>
  • Loading branch information
ganmacs committed Feb 14, 2020
1 parent 71a346d commit 0538d26
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
13 changes: 11 additions & 2 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,16 @@ def resume

case chunk.state
when :staged
stage[chunk.metadata] = chunk
# unstaged chunk created at Buffer#write_step_by_step is identified as the staged chunk here because FileChunk#assume_chunk_state checks only the file name.
# https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L663
# This case can happen when fluentd process is killed by signal or other reasons between creating unstaged chunks and changing them to staged mode in Buffer#write
# these chunks(unstaged chunks) has shared the same metadata
# So perform enqueue step again https://github.com/fluent/fluentd/blob/9d113029d4550ce576d8825bfa9612aa3e55bff0/lib/fluent/plugin/buffer.rb#L364
if chunk_size_full?(chunk) || stage.key?(chunk.metadata)
queue << chunk.enqueued!
else
stage[chunk.metadata] = chunk
end
when :queued
queue << chunk
end
Expand All @@ -181,7 +190,7 @@ def generate_chunk(metadata)
# FileChunk generates real path with unique_id
perm = @file_permission || system_config.file_permission
chunk = Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: perm, compress: @compress)
log.debug "Created new chunk", chunk_id: dump_unique_id_hex(chunk.unique_id), metadata: metadata
log.info "Created new chunk", chunk_id: dump_unique_id_hex(chunk.unique_id), metadata: metadata

return chunk
end
Expand Down
14 changes: 12 additions & 2 deletions test/plugin/test_buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,15 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay6"}].to_json + "\n"
end
m3 = m2.dup_next
write_metadata(p3 + '.meta', c1id, m3, 2, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i)
write_metadata(p3 + '.meta', c3id, m3, 2, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i)

c4id = Fluent::UniqueId.generate
p4 = File.join(@bufdir, "testbuf.b#{Fluent::UniqueId.hex(c4id)}.log")
File.open(p4, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay5"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay6"}].to_json + "\n"
end
write_metadata(p4 + '.meta', c4id, m3, 2, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i)

Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
Expand Down Expand Up @@ -1060,9 +1068,11 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
end

test '#resume returns each chunks' do
s, _ = @p.resume
s, e = @p.resume
assert_equal 3, s.size
assert_equal [0, 1, 2], s.keys.map(&:seq).sort
assert_equal 1, e.size
assert_equal [2], e.map { |e| e.metadata.seq }
end
end

Expand Down

0 comments on commit 0538d26

Please sign in to comment.