diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index fbf3559436..2fdbe50606 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -155,7 +155,7 @@ def initialize @stage_size = @queue_size = 0 @timekeys = Hash.new(0) - @metadata_list = [] # keys of @stage + @mutex = Mutex.new end def persistent? @@ -175,16 +175,18 @@ def start @stage, @queue = resume @stage.each_pair do |metadata, chunk| - @metadata_list << metadata unless @metadata_list.include?(metadata) @stage_size += chunk.bytesize - add_timekey(metadata) + if chunk.metadata && chunk.metadata.timekey + add_timekey(metadata.timekey) + end end @queue.each do |chunk| - @metadata_list << chunk.metadata unless @metadata_list.include?(chunk.metadata) @queued_num[chunk.metadata] ||= 0 @queued_num[chunk.metadata] += 1 @queue_size += chunk.bytesize - add_timekey(chunk.metadata) + if chunk.metadata && chunk.metadata.timekey + add_timekey(chunk.metadata.timekey) + end end log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size end @@ -207,7 +209,7 @@ def close def terminate super - @dequeued = @stage = @queue = @queued_num = @metadata_list = nil + @dequeued = @stage = @queue = @queued_num = nil @stage_size = @queue_size = 0 @timekeys.clear end @@ -230,61 +232,17 @@ def generate_chunk(metadata) raise NotImplementedError, "Implement this method in child class" end - def metadata_list - synchronize do - @metadata_list.dup - end - end - - # it's too dangerous, and use it so carefully to remove metadata for tests - def metadata_list_clear! - synchronize do - @metadata_list.clear - end - end - def new_metadata(timekey: nil, tag: nil, variables: nil) Metadata.new(timekey, tag, variables) end - def add_metadata(metadata) - log.on_trace { log.trace "adding metadata", instance: self.object_id, metadata: metadata } - - synchronize do - if i = @metadata_list.index(metadata) - @metadata_list[i] - else - @metadata_list << metadata - add_timekey(metadata) - metadata - end - end - end - def metadata(timekey: nil, tag: nil, variables: nil) - meta = new_metadata(timekey: timekey, tag: tag, variables: variables) - add_metadata(meta) - end - - def add_timekey(metadata) - if t = metadata.timekey - @timekeys[t] += 1 + meta = Metadata.new(timekey, tag, variables) + if (t = meta.timekey) + add_timekey(t) end - nil + meta end - private :add_timekey - - def del_timekey(metadata) - if t = metadata.timekey - if @timekeys[t] <= 1 - @timekeys.delete(t) - else - @timekeys[t] -= 1 - end - end - nil - end - private :del_timekey def timekeys @timekeys.keys @@ -514,6 +472,7 @@ def takeback_chunk(chunk_id) end def purge_chunk(chunk_id) + metadata = nil synchronize do chunk = @dequeued.delete(chunk_id) return nil unless chunk # purged by other threads @@ -532,13 +491,16 @@ def purge_chunk(chunk_id) @dequeued_num[chunk.metadata] -= 1 if metadata && !@stage[metadata] && (!@queued_num[metadata] || @queued_num[metadata] < 1) && @dequeued_num[metadata].zero? - @metadata_list.delete(metadata) @queued_num.delete(metadata) @dequeued_num.delete(metadata) - del_timekey(metadata) end log.trace "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata end + + if metadata && metadata.timekey + del_timekey(metadata.timekey) + end + nil end @@ -774,6 +736,26 @@ def statistics { 'buffer' => stats } end + + private + + def add_timekey(t) + @mutex.synchronize do + @timekeys[t] += 1 + end + nil + end + + def del_timekey(t) + @mutex.synchronize do + if @timekeys[t] <= 1 + @timekeys.delete(t) + else + @timekeys[t] -= 1 + end + end + nil + end end end end diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index bac6608afa..7240ae76f3 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -72,6 +72,17 @@ class FileOutput < Output attr_accessor :last_written_path # for tests module SymlinkBufferMixin + def metadata(timekey: nil, tag: nil, variables: nil) + metadata = super + + @latest_metadata ||= new_metadata(timekey: 0) + if metadata.timekey && (metadata.timekey >= @latest_metadata.timekey) + @latest_metadata = metadata + end + + metadata + end + def output_plugin_for_symlink=(output_plugin) @_output_plugin_for_symlink = output_plugin end @@ -86,8 +97,7 @@ def generate_chunk(metadata) # timekey will be appended into that file chunk. On the other side, resumed file chunks might NOT # have timekey, especially in the cases that resumed file chunks are generated by Fluentd v0.12. # These chunks will be enqueued immediately, and will be flushed soon. - latest_metadata = metadata_list.select{|m| m.timekey }.sort_by(&:timekey).last - if chunk.metadata == latest_metadata + if chunk.metadata == @latest_metadata sym_path = @_output_plugin_for_symlink.extract_placeholders(@_symlink_path, chunk) FileUtils.mkdir_p(File.dirname(sym_path), mode: @_output_plugin_for_symlink.dir_perm) FileUtils.ln_sf(chunk.path, sym_path) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 823e91081f..16f07ac8bb 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -857,15 +857,8 @@ def calculate_timekey(time) def chunk_for_test(tag, time, record) require 'fluent/plugin/buffer/memory_chunk' - m = metadata_for_test(tag, time, record) - Fluent::Plugin::Buffer::MemoryChunk.new(m) - end - - def metadata_for_test(tag, time, record) - raise "BUG: #metadata_for_test is available only when no actual metadata exists" unless @buffer.metadata_list.empty? m = metadata(tag, time, record) - @buffer.metadata_list_clear! - m + Fluent::Plugin::Buffer::MemoryChunk.new(m) end def execute_chunking(tag, es, enqueue: false) diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index 51c241238d..3f443a118e 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -185,7 +185,6 @@ def create_chunk_es(metadata, es) assert_equal([], plugin.queue) assert_equal({}, plugin.dequeued) assert_equal({}, plugin.queued_num) - assert_equal([], plugin.metadata_list) assert_equal 0, plugin.stage_size assert_equal 0, plugin.queue_size @@ -207,7 +206,6 @@ def create_chunk_es(metadata, es) assert_equal 203, @p.queue_size # staged, queued - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list assert_equal 1, @p.queued_num[@dm0] assert_equal 2, @p.queued_num[@dm1] end @@ -240,46 +238,11 @@ def create_chunk_es(metadata, es) assert_nil @p.queue assert_nil @p.dequeued assert_nil @p.queued_num - assert_nil @p.instance_eval{ @metadata_list } # #metadata_list does #dup for @metadata_list assert_equal 0, @p.stage_size assert_equal 0, @p.queue_size assert_equal [], @p.timekeys end - test '#metadata_list returns list of metadata on stage or in queue' do - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list - end - - test '#new_metadata creates metadata instance without inserting metadata_list' do - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list - _m = @p.new_metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i) - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list - end - - test '#add_metadata adds unknown metadata into list, or return known metadata if already exists' do - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list - - m = @p.new_metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i) - _mx = @p.add_metadata(m) - assert_equal [@dm2,@dm3,@dm0,@dm1,m], @p.metadata_list - assert_equal m.object_id, m.object_id - - my = @p.add_metadata(@dm1) - assert_equal [@dm2,@dm3,@dm0,@dm1,m], @p.metadata_list - assert_equal @dm1, my - assert{ @dm1.object_id != my.object_id } # 'my' is an object created in #resume - end - - test '#metadata is utility method to create-add-and-return metadata' do - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list - - m1 = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i) - assert_equal [@dm2,@dm3,@dm0,@dm1,m1], @p.metadata_list - m2 = @p.metadata(timekey: @dm3.timekey) - assert_equal [@dm2,@dm3,@dm0,@dm1,m1], @p.metadata_list - assert_equal @dm3, m2 - end - test '#queued_records returns total number of size in all chunks in queue' do assert_equal 3, @p.queue.size @@ -430,7 +393,6 @@ def create_chunk_es(metadata, es) test '#purge_chunk removes a chunk specified by argument id from dequeued chunks' do assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list m0 = @p.dequeue_chunk m1 = @p.dequeue_chunk @@ -447,13 +409,11 @@ def create_chunk_es(metadata, es) assert_equal [@dm0,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list end - test '#purge_chunk removes an argument metadata from metadata_list if no chunks exist on stage or in queue' do + test '#purge_chunk removes an argument metadata if no chunks exist on stage or in queue' do assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list m0 = @p.dequeue_chunk @@ -467,13 +427,11 @@ def create_chunk_es(metadata, es) assert_equal [@dm1,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm1], @p.metadata_list end test '#takeback_chunk returns false if specified chunk_id is already purged' do assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm0,@dm1], @p.metadata_list m0 = @p.dequeue_chunk @@ -487,13 +445,11 @@ def create_chunk_es(metadata, es) assert_equal [@dm1,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm1], @p.metadata_list assert !@p.takeback_chunk(m0.unique_id) assert_equal [@dm1,@dm1], @p.queue.map(&:metadata) assert_equal({}, @p.dequeued) - assert_equal [@dm2,@dm3,@dm1], @p.metadata_list end test '#clear_queue! removes all chunks in queue, but leaves staged chunks' do @@ -575,7 +531,7 @@ def create_chunk_es(metadata, es) assert !@p.timekeys.include?(timekey) prev_stage_size = @p.stage_size - + m = @p.metadata(timekey: timekey) @p.write({m => ["x" * 256, "y" * 256, "z" * 256]}) @@ -695,7 +651,7 @@ def create_chunk_es(metadata, es) assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal [@dm2,@dm3], @p.stage.keys - + timekey = Time.parse('2016-04-11 16:40:00 +0000').to_i assert !@p.timekeys.include?(timekey) @@ -718,7 +674,7 @@ def create_chunk_es(metadata, es) assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata) assert_equal [@dm2,@dm3,m], @p.stage.keys assert_equal 1, @p.stage[m].append_count - + assert @p.timekeys.include?(timekey) end