From bbdbb4c2906083d78d92bcaf6653fafd1bc51fe0 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Thu, 15 Aug 2019 14:22:03 +0900 Subject: [PATCH 1/6] Remove `@metadata_list` it is not used and a bit large overhead for performance. Signed-off-by: Yuta Iwama --- lib/fluent/plugin/buffer.rb | 30 ++++-------------------------- lib/fluent/plugin/out_file.rb | 16 ++++++++++++++-- 2 files changed, 18 insertions(+), 28 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index fbf3559436..b3f132a974 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -155,7 +155,6 @@ def initialize @stage_size = @queue_size = 0 @timekeys = Hash.new(0) - @metadata_list = [] # keys of @stage end def persistent? @@ -175,12 +174,10 @@ def start @stage, @queue = resume @stage.each_pair do |metadata, chunk| - @metadata_list << metadata unless @metadata_list.include?(metadata) @stage_size += chunk.bytesize add_timekey(metadata) end @queue.each do |chunk| - @metadata_list << chunk.metadata unless @metadata_list.include?(chunk.metadata) @queued_num[chunk.metadata] ||= 0 @queued_num[chunk.metadata] += 1 @queue_size += chunk.bytesize @@ -207,7 +204,7 @@ def close def terminate super - @dequeued = @stage = @queue = @queued_num = @metadata_list = nil + @dequeued = @stage = @queue = @queued_num = nil @stage_size = @queue_size = 0 @timekeys.clear end @@ -231,39 +228,21 @@ def generate_chunk(metadata) end def metadata_list - synchronize do - @metadata_list.dup - end + # for compat + [] end # it's too dangerous, and use it so carefully to remove metadata for tests def metadata_list_clear! - synchronize do - @metadata_list.clear - end + # for compat end def new_metadata(timekey: nil, tag: nil, variables: nil) Metadata.new(timekey, tag, variables) end - def add_metadata(metadata) - log.on_trace { log.trace "adding metadata", instance: self.object_id, metadata: metadata } - - synchronize do - if i = @metadata_list.index(metadata) - @metadata_list[i] - else - @metadata_list << metadata - add_timekey(metadata) - metadata - end - end - end - def metadata(timekey: nil, tag: nil, variables: nil) meta = new_metadata(timekey: timekey, tag: tag, variables: variables) - add_metadata(meta) end def add_timekey(metadata) @@ -532,7 +511,6 @@ def purge_chunk(chunk_id) @dequeued_num[chunk.metadata] -= 1 if metadata && !@stage[metadata] && (!@queued_num[metadata] || @queued_num[metadata] < 1) && @dequeued_num[metadata].zero? - @metadata_list.delete(metadata) @queued_num.delete(metadata) @dequeued_num.delete(metadata) del_timekey(metadata) diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index bac6608afa..7e3a9973d5 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -72,6 +72,19 @@ class FileOutput < Output attr_accessor :last_written_path # for tests module SymlinkBufferMixin + def initialize + super + @latest_metadata = Metadata.new(0, nil, nil) + end + + def metadata(timekey: nil, tag: nil, variables: nil) + metadata = super + if metadata.timekey && (metadata.timekey > @latest_metadata.timekey) + @latest_metadata = metadata + end + metadata + end + def output_plugin_for_symlink=(output_plugin) @_output_plugin_for_symlink = output_plugin end @@ -86,8 +99,7 @@ def generate_chunk(metadata) # timekey will be appended into that file chunk. On the other side, resumed file chunks might NOT # have timekey, especially in the cases that resumed file chunks are generated by Fluentd v0.12. # These chunks will be enqueued immediately, and will be flushed soon. - latest_metadata = metadata_list.select{|m| m.timekey }.sort_by(&:timekey).last - if chunk.metadata == latest_metadata + if chunk.metadata == @latest_metadata sym_path = @_output_plugin_for_symlink.extract_placeholders(@_symlink_path, chunk) FileUtils.mkdir_p(File.dirname(sym_path), mode: @_output_plugin_for_symlink.dir_perm) FileUtils.ln_sf(chunk.path, sym_path) From 0d90aba5cbc1faffe53aa3c3c1816d2cb37517ee Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Thu, 15 Aug 2019 14:23:56 +0900 Subject: [PATCH 2/6] re-support add_timekey and del_timekey Signed-off-by: Yuta Iwama --- lib/fluent/plugin/buffer.rb | 58 +++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index b3f132a974..461875ebfa 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -155,6 +155,7 @@ def initialize @stage_size = @queue_size = 0 @timekeys = Hash.new(0) + @mutex = Mutex.new end def persistent? @@ -175,13 +176,17 @@ def start @stage, @queue = resume @stage.each_pair do |metadata, chunk| @stage_size += chunk.bytesize - add_timekey(metadata) + if chunk.metadata && chunk.metadata.timekey + add_timekey(metadata.timekey) + end end @queue.each do |chunk| @queued_num[chunk.metadata] ||= 0 @queued_num[chunk.metadata] += 1 @queue_size += chunk.bytesize - add_timekey(chunk.metadata) + if chunk.metadata && chunk.metadata.timekey + add_timekey(chunk.metadata.timekey) + end end log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size end @@ -243,27 +248,11 @@ def new_metadata(timekey: nil, tag: nil, variables: nil) def metadata(timekey: nil, tag: nil, variables: nil) meta = new_metadata(timekey: timekey, tag: tag, variables: variables) - end - - def add_timekey(metadata) - if t = metadata.timekey - @timekeys[t] += 1 - end - nil - end - private :add_timekey - - def del_timekey(metadata) - if t = metadata.timekey - if @timekeys[t] <= 1 - @timekeys.delete(t) - else - @timekeys[t] -= 1 - end + if (t = meta.timekey) + add_timekey(t) end - nil + meta end - private :del_timekey def timekeys @timekeys.keys @@ -493,6 +482,7 @@ def takeback_chunk(chunk_id) end def purge_chunk(chunk_id) + metadata = nil synchronize do chunk = @dequeued.delete(chunk_id) return nil unless chunk # purged by other threads @@ -513,10 +503,14 @@ def purge_chunk(chunk_id) if metadata && !@stage[metadata] && (!@queued_num[metadata] || @queued_num[metadata] < 1) && @dequeued_num[metadata].zero? @queued_num.delete(metadata) @dequeued_num.delete(metadata) - del_timekey(metadata) end log.trace "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata end + + if metadata && metadata.timekey + @mutex.synchronize { del_timekey(metadata.timekey) } + end + nil end @@ -752,6 +746,26 @@ def statistics { 'buffer' => stats } end + + private + + def add_timekey(t) + @mutex.synchronize do + @timekeys[t] += 1 + end + nil + end + + def del_timekey(t) + @mutex.synchronize do + if @timekeys[t] <= 1 + @timekeys.delete(t) + else + @timekeys[t] -= 1 + end + end + nil + end end end end From 2421e47916f9f949d0fae18e6f39ae4679502db9 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Thu, 15 Aug 2019 14:34:08 +0900 Subject: [PATCH 3/6] Remove metadata_list public method Signed-off-by: Yuta Iwama --- lib/fluent/plugin/buffer.rb | 10 ------- lib/fluent/plugin/output.rb | 9 +------ test/plugin/test_buffer.rb | 52 +++---------------------------------- 3 files changed, 5 insertions(+), 66 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 461875ebfa..4195901461 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -232,16 +232,6 @@ def generate_chunk(metadata) raise NotImplementedError, "Implement this method in child class" end - def metadata_list - # for compat - [] - end - - # it's too dangerous, and use it so carefully to remove metadata for tests - def metadata_list_clear! - # for compat - end - def new_metadata(timekey: nil, tag: nil, variables: nil) Metadata.new(timekey, tag, variables) end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index be44e3990b..ca4885f9c6 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -857,15 +857,8 @@ def calculate_timekey(time) def chunk_for_test(tag, time, record) require 'fluent/plugin/buffer/memory_chunk' - m = metadata_for_test(tag, time, record) - Fluent::Plugin::Buffer::MemoryChunk.new(m) - end - - def metadata_for_test(tag, time, record) - raise "BUG: #metadata_for_test is available only when no actual metadata exists" unless @buffer.metadata_list.empty? m = metadata(tag, time, record) - @buffer.metadata_list_clear! - m + Fluent::Plugin::Buffer::MemoryChunk.new(m) end def execute_chunking(tag, es, enqueue: false) diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index 51c241238d..3f443a118e 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -185,7 +185,6 @@ def create_chunk_es(metadata, es) assert_equal([], plugin.queue) assert_equal({}, plugin.dequeued) assert_equal({}, plugin.queued_num) - assert_equal([], plugin.metadata_list) assert_equal 0, plugin.stage_size assert_equal 0, plugin.queue_size @@ -207,7 +206,6 @@ def create_chunk_es(metadata, es) assert_equal 203, @p.queue_size # staged, queued - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list assert_equal 1, @p.queued_num[@dm0] assert_equal 2, @p.queued_num[@dm1] end @@ -240,46 +238,11 @@ def create_chunk_es(metadata, es) assert_nil @p.queue assert_nil @p.dequeued assert_nil @p.queued_num - assert_nil @p.instance_eval{ @metadata_list } # #metadata_list does #dup for @metadata_list assert_equal 0, @p.stage_size assert_equal 0, @p.queue_size assert_equal [], @p.timekeys end - test '#metadata_list returns list of metadata on stage or in queue' do - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list - end - - test '#new_metadata creates metadata instance without inserting metadata_list' do - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list - _m = @p.new_metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i) - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list - end - - test '#add_metadata adds unknown metadata into list, or return known metadata if already exists' do - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list - - m = @p.new_metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i) - _mx = @p.add_metadata(m) - assert_equal [@dm2,@dm3,@dm0,@dm1,m], @p.metadata_list - assert_equal m.object_id, m.object_id - - my = @p.add_metadata(@dm1) - assert_equal [@dm2,@dm3,@dm0,@dm1,m], @p.metadata_list - assert_equal @dm1, my - assert{ @dm1.object_id != my.object_id } # 'my' is an object created in #resume - end - - test '#metadata is utility method to create-add-and-return metadata' do - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list - - m1 = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i) - assert_equal [@dm2,@dm3,@dm0,@dm1,m1], @p.metadata_list - m2 = @p.metadata(timekey: @dm3.timekey) - assert_equal [@dm2,@dm3,@dm0,@dm1,m1], @p.metadata_list - assert_equal @dm3, m2 - end - test '#queued_records returns total number of size in all chunks in queue' do assert_equal 3, @p.queue.size @@ -430,7 +393,6 @@ def create_chunk_es(metadata, es) test '#purge_chunk removes a chunk specified by argument id from dequeued chunks' do assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list m0 = @p.dequeue_chunk m1 = @p.dequeue_chunk @@ -447,13 +409,11 @@ def create_chunk_es(metadata, es) assert_equal [@dm0,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list end - test '#purge_chunk removes an argument metadata from metadata_list if no chunks exist on stage or in queue' do + test '#purge_chunk removes an argument metadata if no chunks exist on stage or in queue' do assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list m0 = @p.dequeue_chunk @@ -467,13 +427,11 @@ def create_chunk_es(metadata, es) assert_equal [@dm1,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm1], @p.metadata_list end test '#takeback_chunk returns false if specified chunk_id is already purged' do assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list m0 = @p.dequeue_chunk @@ -487,13 +445,11 @@ def create_chunk_es(metadata, es) assert_equal [@dm1,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm1], @p.metadata_list assert !@p.takeback_chunk(m0.unique_id) assert_equal [@dm1,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm1], @p.metadata_list end test '#clear_queue! removes all chunks in queue, but leaves staged chunks' do @@ -575,7 +531,7 @@ def create_chunk_es(metadata, es) assert !@p.timekeys.include?(timekey) prev_stage_size = @p.stage_size - + m = @p.metadata(timekey: timekey) @p.write({m => ["x" * 256, "y" * 256, "z" * 256]}) @@ -695,7 +651,7 @@ def create_chunk_es(metadata, es) assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal [@dm2,@dm3], @p.stage.keys - + timekey = Time.parse('2016-04-11 16:40:00 +0000').to_i assert !@p.timekeys.include?(timekey) @@ -718,7 +674,7 @@ def create_chunk_es(metadata, es) assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal [@dm2,@dm3,m], @p.stage.keys assert_equal 1, @p.stage[m].append_count - + assert @p.timekeys.include?(timekey) end From 5c81b40df6586e0ad659ef045ea0536b969e13b2 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Fri, 16 Aug 2019 12:07:56 +0900 Subject: [PATCH 4/6] Get a lock in #del_timekey. this is duplicated. Signed-off-by: Yuta Iwama --- lib/fluent/plugin/buffer.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 4195901461..51a15bbbc4 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -498,7 +498,7 @@ def purge_chunk(chunk_id) end if metadata && metadata.timekey - @mutex.synchronize { del_timekey(metadata.timekey) } + del_timekey(metadata.timekey) end nil From 53a1e76b34079dc444888de7294b3eb5c7fff811 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Fri, 16 Aug 2019 12:11:48 +0900 Subject: [PATCH 5/6] call Metadata.new directly Signed-off-by: Yuta Iwama --- lib/fluent/plugin/buffer.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 51a15bbbc4..2fdbe50606 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -237,7 +237,7 @@ def new_metadata(timekey: nil, tag: nil, variables: nil) end def metadata(timekey: nil, tag: nil, variables: nil) - meta = new_metadata(timekey: timekey, tag: tag, variables: variables) + meta = Metadata.new(timekey, tag, variables) if (t = meta.timekey) add_timekey(t) end From 829b3aad5dfe97cc6f1fc3638f39aeb2062663e6 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Fri, 16 Aug 2019 14:23:12 +0900 Subject: [PATCH 6/6] this module initialize will be never called Signed-off-by: Yuta Iwama --- lib/fluent/plugin/out_file.rb | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index 7e3a9973d5..7240ae76f3 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -72,16 +72,14 @@ class FileOutput < Output attr_accessor :last_written_path # for tests module SymlinkBufferMixin - def initialize - super - @latest_metadata = Metadata.new(0, nil, nil) - end - def metadata(timekey: nil, tag: nil, variables: nil) metadata = super - if metadata.timekey && (metadata.timekey > @latest_metadata.timekey) + + @latest_metadata ||= new_metadata(timekey: 0) + if metadata.timekey && (metadata.timekey >= @latest_metadata.timekey) @latest_metadata = metadata end + metadata end