Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove @metadata_list from buffer.rb #2563

Merged
merged 6 commits into from
Aug 16, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 37 additions & 55 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def initialize

@stage_size = @queue_size = 0
@timekeys = Hash.new(0)
@metadata_list = [] # keys of @stage
@mutex = Mutex.new
end

def persistent?
Expand All @@ -175,16 +175,18 @@ def start

@stage, @queue = resume
@stage.each_pair do |metadata, chunk|
@metadata_list << metadata unless @metadata_list.include?(metadata)
@stage_size += chunk.bytesize
add_timekey(metadata)
if chunk.metadata && chunk.metadata.timekey
add_timekey(metadata.timekey)
end
end
@queue.each do |chunk|
@metadata_list << chunk.metadata unless @metadata_list.include?(chunk.metadata)
@queued_num[chunk.metadata] ||= 0
@queued_num[chunk.metadata] += 1
@queue_size += chunk.bytesize
add_timekey(chunk.metadata)
if chunk.metadata && chunk.metadata.timekey
add_timekey(chunk.metadata.timekey)
end
end
log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size
end
Expand All @@ -207,7 +209,7 @@ def close

def terminate
super
@dequeued = @stage = @queue = @queued_num = @metadata_list = nil
@dequeued = @stage = @queue = @queued_num = nil
@stage_size = @queue_size = 0
@timekeys.clear
end
Expand All @@ -230,61 +232,17 @@ def generate_chunk(metadata)
raise NotImplementedError, "Implement this method in child class"
end

def metadata_list
synchronize do
@metadata_list.dup
end
end

# it's too dangerous, and use it so carefully to remove metadata for tests
def metadata_list_clear!
synchronize do
@metadata_list.clear
end
end

def new_metadata(timekey: nil, tag: nil, variables: nil)
Metadata.new(timekey, tag, variables)
end

def add_metadata(metadata)
log.on_trace { log.trace "adding metadata", instance: self.object_id, metadata: metadata }

synchronize do
if i = @metadata_list.index(metadata)
@metadata_list[i]
else
@metadata_list << metadata
add_timekey(metadata)
metadata
end
end
end

def metadata(timekey: nil, tag: nil, variables: nil)
meta = new_metadata(timekey: timekey, tag: tag, variables: variables)
add_metadata(meta)
end

def add_timekey(metadata)
if t = metadata.timekey
@timekeys[t] += 1
if (t = meta.timekey)
add_timekey(t)
end
nil
meta
end
private :add_timekey

def del_timekey(metadata)
if t = metadata.timekey
if @timekeys[t] <= 1
@timekeys.delete(t)
else
@timekeys[t] -= 1
end
end
nil
end
private :del_timekey

def timekeys
@timekeys.keys
Expand Down Expand Up @@ -514,6 +472,7 @@ def takeback_chunk(chunk_id)
end

def purge_chunk(chunk_id)
metadata = nil
synchronize do
chunk = @dequeued.delete(chunk_id)
return nil unless chunk # purged by other threads
Expand All @@ -532,13 +491,16 @@ def purge_chunk(chunk_id)

@dequeued_num[chunk.metadata] -= 1
if metadata && !@stage[metadata] && (!@queued_num[metadata] || @queued_num[metadata] < 1) && @dequeued_num[metadata].zero?
@metadata_list.delete(metadata)
@queued_num.delete(metadata)
@dequeued_num.delete(metadata)
del_timekey(metadata)
end
log.trace "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
end

if metadata && metadata.timekey
@mutex.synchronize { del_timekey(metadata.timekey) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mutex.synchronize is called in del_timekey

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot about it! thanks.
.5c81b40

end

nil
end

Expand Down Expand Up @@ -774,6 +736,26 @@ def statistics

{ 'buffer' => stats }
end

private

def add_timekey(t)
@mutex.synchronize do
@timekeys[t] += 1
end
nil
end

def del_timekey(t)
@mutex.synchronize do
if @timekeys[t] <= 1
@timekeys.delete(t)
else
@timekeys[t] -= 1
end
end
nil
end
end
end
end
16 changes: 14 additions & 2 deletions lib/fluent/plugin/out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@ class FileOutput < Output
attr_accessor :last_written_path # for tests

module SymlinkBufferMixin
def initialize
super
@latest_metadata = Metadata.new(0, nil, nil)
end

def metadata(timekey: nil, tag: nil, variables: nil)
metadata = super
if metadata.timekey && (metadata.timekey > @latest_metadata.timekey)
@latest_metadata = metadata
end
metadata
end

def output_plugin_for_symlink=(output_plugin)
@_output_plugin_for_symlink = output_plugin
end
Expand All @@ -86,8 +99,7 @@ def generate_chunk(metadata)
# timekey will be appended into that file chunk. On the other side, resumed file chunks might NOT
# have timekey, especially in the cases that resumed file chunks are generated by Fluentd v0.12.
# These chunks will be enqueued immediately, and will be flushed soon.
latest_metadata = metadata_list.select{|m| m.timekey }.sort_by(&:timekey).last
if chunk.metadata == latest_metadata
if chunk.metadata == @latest_metadata
sym_path = @_output_plugin_for_symlink.extract_placeholders(@_symlink_path, chunk)
FileUtils.mkdir_p(File.dirname(sym_path), mode: @_output_plugin_for_symlink.dir_perm)
FileUtils.ln_sf(chunk.path, sym_path)
Expand Down
9 changes: 1 addition & 8 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -857,15 +857,8 @@ def calculate_timekey(time)
def chunk_for_test(tag, time, record)
require 'fluent/plugin/buffer/memory_chunk'

m = metadata_for_test(tag, time, record)
Fluent::Plugin::Buffer::MemoryChunk.new(m)
end

def metadata_for_test(tag, time, record)
raise "BUG: #metadata_for_test is available only when no actual metadata exists" unless @buffer.metadata_list.empty?
m = metadata(tag, time, record)
@buffer.metadata_list_clear!
m
Fluent::Plugin::Buffer::MemoryChunk.new(m)
end

def execute_chunking(tag, es, enqueue: false)
Expand Down
52 changes: 4 additions & 48 deletions test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ def create_chunk_es(metadata, es)
assert_equal([], plugin.queue)
assert_equal({}, plugin.dequeued)
assert_equal({}, plugin.queued_num)
assert_equal([], plugin.metadata_list)

assert_equal 0, plugin.stage_size
assert_equal 0, plugin.queue_size
Expand All @@ -207,7 +206,6 @@ def create_chunk_es(metadata, es)
assert_equal 203, @p.queue_size

# staged, queued
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list
assert_equal 1, @p.queued_num[@dm0]
assert_equal 2, @p.queued_num[@dm1]
end
Expand Down Expand Up @@ -240,46 +238,11 @@ def create_chunk_es(metadata, es)
assert_nil @p.queue
assert_nil @p.dequeued
assert_nil @p.queued_num
assert_nil @p.instance_eval{ @metadata_list } # #metadata_list does #dup for @metadata_list
assert_equal 0, @p.stage_size
assert_equal 0, @p.queue_size
assert_equal [], @p.timekeys
end

test '#metadata_list returns list of metadata on stage or in queue' do
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list
end

test '#new_metadata creates metadata instance without inserting metadata_list' do
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list
_m = @p.new_metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list
end

test '#add_metadata adds unknown metadata into list, or return known metadata if already exists' do
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list

m = @p.new_metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
_mx = @p.add_metadata(m)
assert_equal [@dm2,@dm3,@dm0,@dm1,m], @p.metadata_list
assert_equal m.object_id, m.object_id

my = @p.add_metadata(@dm1)
assert_equal [@dm2,@dm3,@dm0,@dm1,m], @p.metadata_list
assert_equal @dm1, my
assert{ @dm1.object_id != my.object_id } # 'my' is an object created in #resume
end

test '#metadata is utility method to create-add-and-return metadata' do
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list

m1 = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
assert_equal [@dm2,@dm3,@dm0,@dm1,m1], @p.metadata_list
m2 = @p.metadata(timekey: @dm3.timekey)
assert_equal [@dm2,@dm3,@dm0,@dm1,m1], @p.metadata_list
assert_equal @dm3, m2
end

test '#queued_records returns total number of size in all chunks in queue' do
assert_equal 3, @p.queue.size

Expand Down Expand Up @@ -430,7 +393,6 @@ def create_chunk_es(metadata, es)
test '#purge_chunk removes a chunk specified by argument id from dequeued chunks' do
assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list

m0 = @p.dequeue_chunk
m1 = @p.dequeue_chunk
Expand All @@ -447,13 +409,11 @@ def create_chunk_es(metadata, es)

assert_equal [@dm0,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list
end

test '#purge_chunk removes an argument metadata from metadata_list if no chunks exist on stage or in queue' do
test '#purge_chunk removes an argument metadata if no chunks exist on stage or in queue' do
assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list

m0 = @p.dequeue_chunk

Expand All @@ -467,13 +427,11 @@ def create_chunk_es(metadata, es)

assert_equal [@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm1], @p.metadata_list
end

test '#takeback_chunk returns false if specified chunk_id is already purged' do
assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list

m0 = @p.dequeue_chunk

Expand All @@ -487,13 +445,11 @@ def create_chunk_es(metadata, es)

assert_equal [@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm1], @p.metadata_list

assert [email protected]_chunk(m0.unique_id)

assert_equal [@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm1], @p.metadata_list
end

test '#clear_queue! removes all chunks in queue, but leaves staged chunks' do
Expand Down Expand Up @@ -575,7 +531,7 @@ def create_chunk_es(metadata, es)
assert [email protected]?(timekey)

prev_stage_size = @p.stage_size

m = @p.metadata(timekey: timekey)

@p.write({m => ["x" * 256, "y" * 256, "z" * 256]})
Expand Down Expand Up @@ -695,7 +651,7 @@ def create_chunk_es(metadata, es)

assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal [@dm2,@dm3], @p.stage.keys

timekey = Time.parse('2016-04-11 16:40:00 +0000').to_i
assert [email protected]?(timekey)

Expand All @@ -718,7 +674,7 @@ def create_chunk_es(metadata, es)
assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal [@dm2,@dm3,m], @p.stage.keys
assert_equal 1, @p.stage[m].append_count

assert @p.timekeys.include?(timekey)
end

Expand Down