Skip to content

Commit

Permalink
Merge pull request #2563 from ganmacs/remove-unused-data
Browse files Browse the repository at this point in the history
Remove `@metadata_list` from buffer.rb
  • Loading branch information
repeatedly authored Aug 16, 2019
2 parents d7dcda5 + 829b3aa commit d3c8766
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 114 deletions.
94 changes: 38 additions & 56 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def initialize

@stage_size = @queue_size = 0
@timekeys = Hash.new(0)
@metadata_list = [] # keys of @stage
@mutex = Mutex.new
end

def persistent?
Expand All @@ -175,16 +175,18 @@ def start

@stage, @queue = resume
@stage.each_pair do |metadata, chunk|
@metadata_list << metadata unless @metadata_list.include?(metadata)
@stage_size += chunk.bytesize
add_timekey(metadata)
if chunk.metadata && chunk.metadata.timekey
add_timekey(metadata.timekey)
end
end
@queue.each do |chunk|
@metadata_list << chunk.metadata unless @metadata_list.include?(chunk.metadata)
@queued_num[chunk.metadata] ||= 0
@queued_num[chunk.metadata] += 1
@queue_size += chunk.bytesize
add_timekey(chunk.metadata)
if chunk.metadata && chunk.metadata.timekey
add_timekey(chunk.metadata.timekey)
end
end
log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size
end
Expand All @@ -207,7 +209,7 @@ def close

def terminate
super
@dequeued = @stage = @queue = @queued_num = @metadata_list = nil
@dequeued = @stage = @queue = @queued_num = nil
@stage_size = @queue_size = 0
@timekeys.clear
end
Expand All @@ -230,61 +232,17 @@ def generate_chunk(metadata)
raise NotImplementedError, "Implement this method in child class"
end

def metadata_list
synchronize do
@metadata_list.dup
end
end

# it's too dangerous, and use it so carefully to remove metadata for tests
def metadata_list_clear!
synchronize do
@metadata_list.clear
end
end

def new_metadata(timekey: nil, tag: nil, variables: nil)
Metadata.new(timekey, tag, variables)
end

def add_metadata(metadata)
log.on_trace { log.trace "adding metadata", instance: self.object_id, metadata: metadata }

synchronize do
if i = @metadata_list.index(metadata)
@metadata_list[i]
else
@metadata_list << metadata
add_timekey(metadata)
metadata
end
end
end

def metadata(timekey: nil, tag: nil, variables: nil)
meta = new_metadata(timekey: timekey, tag: tag, variables: variables)
add_metadata(meta)
end

def add_timekey(metadata)
if t = metadata.timekey
@timekeys[t] += 1
meta = Metadata.new(timekey, tag, variables)
if (t = meta.timekey)
add_timekey(t)
end
nil
meta
end
private :add_timekey

def del_timekey(metadata)
if t = metadata.timekey
if @timekeys[t] <= 1
@timekeys.delete(t)
else
@timekeys[t] -= 1
end
end
nil
end
private :del_timekey

def timekeys
@timekeys.keys
Expand Down Expand Up @@ -514,6 +472,7 @@ def takeback_chunk(chunk_id)
end

def purge_chunk(chunk_id)
metadata = nil
synchronize do
chunk = @dequeued.delete(chunk_id)
return nil unless chunk # purged by other threads
Expand All @@ -532,13 +491,16 @@ def purge_chunk(chunk_id)

@dequeued_num[chunk.metadata] -= 1
if metadata && !@stage[metadata] && (!@queued_num[metadata] || @queued_num[metadata] < 1) && @dequeued_num[metadata].zero?
@metadata_list.delete(metadata)
@queued_num.delete(metadata)
@dequeued_num.delete(metadata)
del_timekey(metadata)
end
log.trace "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
end

if metadata && metadata.timekey
del_timekey(metadata.timekey)
end

nil
end

Expand Down Expand Up @@ -774,6 +736,26 @@ def statistics

{ 'buffer' => stats }
end

private

def add_timekey(t)
@mutex.synchronize do
@timekeys[t] += 1
end
nil
end

def del_timekey(t)
@mutex.synchronize do
if @timekeys[t] <= 1
@timekeys.delete(t)
else
@timekeys[t] -= 1
end
end
nil
end
end
end
end
14 changes: 12 additions & 2 deletions lib/fluent/plugin/out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ class FileOutput < Output
attr_accessor :last_written_path # for tests

module SymlinkBufferMixin
def metadata(timekey: nil, tag: nil, variables: nil)
metadata = super

@latest_metadata ||= new_metadata(timekey: 0)
if metadata.timekey && (metadata.timekey >= @latest_metadata.timekey)
@latest_metadata = metadata
end

metadata
end

def output_plugin_for_symlink=(output_plugin)
@_output_plugin_for_symlink = output_plugin
end
Expand All @@ -86,8 +97,7 @@ def generate_chunk(metadata)
# timekey will be appended into that file chunk. On the other side, resumed file chunks might NOT
# have timekey, especially in the cases that resumed file chunks are generated by Fluentd v0.12.
# These chunks will be enqueued immediately, and will be flushed soon.
latest_metadata = metadata_list.select{|m| m.timekey }.sort_by(&:timekey).last
if chunk.metadata == latest_metadata
if chunk.metadata == @latest_metadata
sym_path = @_output_plugin_for_symlink.extract_placeholders(@_symlink_path, chunk)
FileUtils.mkdir_p(File.dirname(sym_path), mode: @_output_plugin_for_symlink.dir_perm)
FileUtils.ln_sf(chunk.path, sym_path)
Expand Down
9 changes: 1 addition & 8 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -857,15 +857,8 @@ def calculate_timekey(time)
def chunk_for_test(tag, time, record)
require 'fluent/plugin/buffer/memory_chunk'

m = metadata_for_test(tag, time, record)
Fluent::Plugin::Buffer::MemoryChunk.new(m)
end

def metadata_for_test(tag, time, record)
raise "BUG: #metadata_for_test is available only when no actual metadata exists" unless @buffer.metadata_list.empty?
m = metadata(tag, time, record)
@buffer.metadata_list_clear!
m
Fluent::Plugin::Buffer::MemoryChunk.new(m)
end

def execute_chunking(tag, es, enqueue: false)
Expand Down
52 changes: 4 additions & 48 deletions test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ def create_chunk_es(metadata, es)
assert_equal([], plugin.queue)
assert_equal({}, plugin.dequeued)
assert_equal({}, plugin.queued_num)
assert_equal([], plugin.metadata_list)

assert_equal 0, plugin.stage_size
assert_equal 0, plugin.queue_size
Expand All @@ -207,7 +206,6 @@ def create_chunk_es(metadata, es)
assert_equal 203, @p.queue_size

# staged, queued
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list
assert_equal 1, @p.queued_num[@dm0]
assert_equal 2, @p.queued_num[@dm1]
end
Expand Down Expand Up @@ -240,46 +238,11 @@ def create_chunk_es(metadata, es)
assert_nil @p.queue
assert_nil @p.dequeued
assert_nil @p.queued_num
assert_nil @p.instance_eval{ @metadata_list } # #metadata_list does #dup for @metadata_list
assert_equal 0, @p.stage_size
assert_equal 0, @p.queue_size
assert_equal [], @p.timekeys
end

test '#metadata_list returns list of metadata on stage or in queue' do
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list
end

test '#new_metadata creates metadata instance without inserting metadata_list' do
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list
_m = @p.new_metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list
end

test '#add_metadata adds unknown metadata into list, or return known metadata if already exists' do
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list

m = @p.new_metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
_mx = @p.add_metadata(m)
assert_equal [@dm2,@dm3,@dm0,@dm1,m], @p.metadata_list
assert_equal m.object_id, m.object_id

my = @p.add_metadata(@dm1)
assert_equal [@dm2,@dm3,@dm0,@dm1,m], @p.metadata_list
assert_equal @dm1, my
assert{ @dm1.object_id != my.object_id } # 'my' is an object created in #resume
end

test '#metadata is utility method to create-add-and-return metadata' do
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list

m1 = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
assert_equal [@dm2,@dm3,@dm0,@dm1,m1], @p.metadata_list
m2 = @p.metadata(timekey: @dm3.timekey)
assert_equal [@dm2,@dm3,@dm0,@dm1,m1], @p.metadata_list
assert_equal @dm3, m2
end

test '#queued_records returns total number of size in all chunks in queue' do
assert_equal 3, @p.queue.size

Expand Down Expand Up @@ -430,7 +393,6 @@ def create_chunk_es(metadata, es)
test '#purge_chunk removes a chunk specified by argument id from dequeued chunks' do
assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list

m0 = @p.dequeue_chunk
m1 = @p.dequeue_chunk
Expand All @@ -447,13 +409,11 @@ def create_chunk_es(metadata, es)

assert_equal [@dm0,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list
end

test '#purge_chunk removes an argument metadata from metadata_list if no chunks exist on stage or in queue' do
test '#purge_chunk removes an argument metadata if no chunks exist on stage or in queue' do
assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list

m0 = @p.dequeue_chunk

Expand All @@ -467,13 +427,11 @@ def create_chunk_es(metadata, es)

assert_equal [@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm1], @p.metadata_list
end

test '#takeback_chunk returns false if specified chunk_id is already purged' do
assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list

m0 = @p.dequeue_chunk

Expand All @@ -487,13 +445,11 @@ def create_chunk_es(metadata, es)

assert_equal [@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm1], @p.metadata_list

assert !@p.takeback_chunk(m0.unique_id)

assert_equal [@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal({}, @p.dequeued)
assert_equal [@dm2,@dm3,@dm1], @p.metadata_list
end

test '#clear_queue! removes all chunks in queue, but leaves staged chunks' do
Expand Down Expand Up @@ -575,7 +531,7 @@ def create_chunk_es(metadata, es)
assert !@p.timekeys.include?(timekey)

prev_stage_size = @p.stage_size

m = @p.metadata(timekey: timekey)

@p.write({m => ["x" * 256, "y" * 256, "z" * 256]})
Expand Down Expand Up @@ -695,7 +651,7 @@ def create_chunk_es(metadata, es)

assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal [@dm2,@dm3], @p.stage.keys

timekey = Time.parse('2016-04-11 16:40:00 +0000').to_i
assert !@p.timekeys.include?(timekey)

Expand All @@ -718,7 +674,7 @@ def create_chunk_es(metadata, es)
assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
assert_equal [@dm2,@dm3,m], @p.stage.keys
assert_equal 1, @p.stage[m].append_count

assert @p.timekeys.include?(timekey)
end

Expand Down

0 comments on commit d3c8766

Please sign in to comment.