diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 7d07307dab..02c702affd 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -192,6 +192,7 @@ def initialize @stage_size = @queue_size = 0 @timekeys = Hash.new(0) + @enable_update_timekeys = false @mutex = Mutex.new end @@ -207,6 +208,10 @@ def configure(conf) end end + def enable_update_timekeys + @enable_update_timekeys = true + end + def start super @@ -268,8 +273,9 @@ def new_metadata(timekey: nil, tag: nil, variables: nil) Metadata.new(timekey, tag, variables) end + # Keep this method for existing code def metadata(timekey: nil, tag: nil, variables: nil) - meta = Metadata.new(timekey, tag, variables) + Metadata.new(timekey, tag, variables) end def timekeys @@ -465,8 +471,10 @@ def enqueue_unstaged_chunk(chunk) def update_timekeys synchronize do - @timekeys = (@stage.values + @queue).each_with_object({}) do |chunk, keys| - if chunk && chunk.metadata && chunk.metadata.timekey + chunks = @stage.values + chunks.concat(@queue) + @timekeys = chunks.each_with_object({}) do |chunk, keys| + if chunk.metadata && chunk.metadata.timekey t = chunk.metadata.timekey keys[t] = keys.fetch(t, 0) + 1 end @@ -477,7 +485,7 @@ def update_timekeys # At flush_at_shutdown, all staged chunks should be enqueued for buffer flush. Set true to force_enqueue for it. def enqueue_all(force_enqueue = false) log.on_trace { log.trace "enqueueing all chunks in buffer", instance: self.object_id } - update_timekeys + update_timekeys if @enable_update_timekeys if block_given? synchronize{ @stage.keys }.each do |metadata| @@ -787,11 +795,11 @@ def statistics 'total_queued_size' => stage_size + queue_size, } - if (m = timekeys.min) + tkeys = timekeys + if (m = tkeys.min) stats['oldest_timekey'] = m end - - if (m = timekeys.max) + if (m = tkeys.max) stats['newest_timekey'] = m end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index b5128bc8e2..8e9950fd5b 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -340,6 +340,7 @@ def configure(conf) buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, []) @buffer = Plugin.new_buffer(buffer_type, parent: self) @buffer.configure(buffer_conf) + @buffer.enable_update_timekeys if @chunk_key_time @flush_at_shutdown = @buffer_config.flush_at_shutdown if @flush_at_shutdown.nil?