From 67247a02b8e351fa5fd8e2bfeb751a35fc65112e Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 3 Aug 2021 16:07:27 +0900 Subject: [PATCH 1/3] buffer: Implement metrics mechanism Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/buffer.rb | 106 ++++++++++++++++++++++++++++-------- test/plugin/test_buffer.rb | 10 +++- 2 files changed, 92 insertions(+), 24 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index b5630e753b..8177969edc 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -16,6 +16,8 @@ require 'fluent/plugin/base' require 'fluent/plugin/owned_by_mixin' +require 'fluent/plugin_id' +require 'fluent/plugin_helper' require 'fluent/unique_id' require 'fluent/ext_monitor_require' @@ -24,7 +26,9 @@ module Plugin class Buffer < Base include OwnedByMixin include UniqueId::Mixin + include PluginId include MonitorMixin + include PluginHelper::Mixin # for metrics class BufferError < StandardError; end class BufferOverflowError < BufferError; end @@ -39,6 +43,8 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than configured_in :buffer + helpers_internal :metrics + # TODO: system total buffer limit size in bytes by SystemConfig config_param :chunk_limit_size, :size, default: DEFAULT_CHUNK_LIMIT_SIZE @@ -153,8 +159,11 @@ def hash end end + # for metrics + attr_reader :stage_size_metrics, :stage_length_metrics, :queue_size_metrics, :queue_length_metrics + attr_reader :available_buffer_space_ratios_metrics, :total_queued_size_metrics + attr_reader :newest_timekey_metrics, :oldest_timekey_metrics # for tests - attr_accessor :stage_size, :queue_size attr_reader :stage, :queue, :dequeued, :queued_num def initialize @@ -171,12 +180,35 @@ def initialize @queued_num = {} # metadata => int (number of queued chunks) @dequeued_num = {} # metadata => int (number of dequeued chunks) - @stage_size = @queue_size = 0 + @stage_length_metrics = nil + @stage_size_metrics = nil + @queue_length_metrics = nil + @queue_size_metrics = nil + @available_buffer_space_ratios_metrics = nil + @total_queued_size_metrics = nil + @newest_timekey_metrics = nil + @oldest_timekey_metrics = nil @timekeys = Hash.new(0) @enable_update_timekeys = false @mutex = Mutex.new end + def stage_size + @stage_size_metrics.get + end + + def stage_size=(value) + @stage_size_metrics.set(value) + end + + def queue_size + @queue_size_metrics.get + end + + def queue_size=(value) + @queue_size_metrics.set(value) + end + def persistent? false end @@ -187,6 +219,28 @@ def configure(conf) unless @queue_limit_length.nil? @total_limit_size = @chunk_limit_size * @queue_limit_length end + @stage_length_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "stage_length", + help_text: 'Length of stage buffers', prefer_gauge: true) + @stage_length_metrics.set(0) + @stage_size_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "stage_byte_size", + help_text: 'Total size of stage buffers', prefer_gauge: true) + @stage_size_metrics.set(0) # Ensure zero. + @queue_length_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "queue_length", + help_text: 'Length of queue buffers', prefer_gauge: true) + @queue_length_metrics.set(0) + @queue_size_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "queue_byte_size", + help_text: 'Total size of queue buffers', prefer_gauge: true) + @queue_size_metrics.set(0) # Ensure zero. + @available_buffer_space_ratios_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "available_buffer_space_ratios", + help_text: 'Ratio of available space in buffer', prefer_gauge: true) + @available_buffer_space_ratios_metrics.set(100) # Default is 100%. + @total_queued_size_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "total_queued_size", + help_text: 'Total size of stage and queue buffers', prefer_gauge: true) + @total_queued_size_metrics.set(0) + @newest_timekey_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "newest_timekey", + help_text: 'Newest timekey in buffer', prefer_gauge: true) + @oldest_timekey_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "oldest_timekey", + help_text: 'Oldest timekey in buffer', prefer_gauge: true) end def enable_update_timekeys @@ -198,15 +252,15 @@ def start @stage, @queue = resume @stage.each_pair do |metadata, chunk| - @stage_size += chunk.bytesize + @stage_size_metrics.add(chunk.bytesize) end @queue.each do |chunk| @queued_num[chunk.metadata] ||= 0 @queued_num[chunk.metadata] += 1 - @queue_size += chunk.bytesize + @queue_size_metrics.add(chunk.bytesize) end update_timekeys - log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size + log.debug "buffer started", instance: self.object_id, stage_size: @stage_size_metrics.get, queue_size: @queue_size_metrics.get end def close @@ -228,17 +282,19 @@ def close def terminate super @dequeued = @stage = @queue = @queued_num = nil - @stage_size = @queue_size = 0 + @stage_length_metrics = @stage_size_metrics = @queue_length_metrics = @queue_size_metrics = nil + @available_buffer_space_ratios_metrics = @total_queued_size_metrics = nil + @newest_timekey_metrics = @oldest_timekey_metrics = nil @timekeys.clear end def storable? - @total_limit_size > @stage_size + @queue_size + @total_limit_size > @stage_size_metrics.get + @queue_size_metrics.get end ## TODO: for back pressure feature # def used?(ratio) - # @total_limit_size * ratio > @stage_size + @queue_size + # @total_limit_size * ratio > @stage_size_metrics.get + @queue_size_metrics.get # end def resume @@ -344,7 +400,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false) # staged_bytesizes_by_chunk.each do |chunk, bytesize| chunk.synchronize do - synchronize { @stage_size += bytesize } + synchronize { @stage_size_metrics.add(bytesize) } log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } } end end @@ -361,7 +417,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false) u.metadata.seq = 0 synchronize { @stage[m] = u.staged! - @stage_size += u.bytesize + @stage_size_metrics.add(u.bytesize) } end end @@ -428,8 +484,8 @@ def enqueue_chunk(metadata) chunk.enqueued! end bytesize = chunk.bytesize - @stage_size -= bytesize - @queue_size += bytesize + @stage_size_metrics.sub(bytesize) + @queue_size_metrics.add(bytesize) end end nil @@ -446,7 +502,7 @@ def enqueue_unstaged_chunk(chunk) @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1 chunk.enqueued! end - @queue_size += chunk.bytesize + @queue_size_metrics.add(chunk.bytesize) end end @@ -531,7 +587,7 @@ def purge_chunk(chunk_id) begin bytesize = chunk.bytesize chunk.purge - @queue_size -= bytesize + @queue_size_metrics.sub(bytesize) rescue => e log.error "failed to purge buffer chunk", chunk_id: dump_unique_id_hex(chunk_id), error_class: e.class, error: e log.error_backtrace @@ -562,7 +618,7 @@ def clear_queue! log.error_backtrace end end - @queue_size = 0 + @queue_size_metrics.set(0) end end @@ -765,23 +821,29 @@ def write_step_by_step(metadata, data, format, splits_count, &block) ] def statistics - stage_size, queue_size = @stage_size, @queue_size + stage_size, queue_size = @stage_size_metrics.get, @queue_size_metrics.get buffer_space = 1.0 - ((stage_size + queue_size * 1.0) / @total_limit_size) + @stage_length_metrics.set(@stage.size) + @queue_length_metrics.set(@queue.size) + @available_buffer_space_ratios_metrics.set(buffer_space * 100) + @total_queued_size_metrics.set(stage_size + queue_size) stats = { - 'stage_length' => @stage.size, + 'stage_length' => @stage_length_metrics.get, 'stage_byte_size' => stage_size, - 'queue_length' => @queue.size, + 'queue_length' => @queue_length_metrics.get, 'queue_byte_size' => queue_size, - 'available_buffer_space_ratios' => (buffer_space * 100).round(1), - 'total_queued_size' => stage_size + queue_size, + 'available_buffer_space_ratios' => @available_buffer_space_ratios_metrics.get.round(1), + 'total_queued_size' => @total_queued_size_metrics.get, } tkeys = timekeys if (m = tkeys.min) - stats['oldest_timekey'] = m + @oldest_timekey_metrics.set(m) + stats['oldest_timekey'] = @oldest_timekey_metrics.get end if (m = tkeys.max) - stats['newest_timekey'] = m + @newest_timekey_metrics.set(m) + stats['newest_timekey'] = @newest_timekey_metrics.get end { 'buffer' => stats } diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb index ab3114f8d8..af9eab54f5 100644 --- a/test/plugin/test_buffer.rb +++ b/test/plugin/test_buffer.rb @@ -238,8 +238,14 @@ def create_chunk_es(metadata, es) assert_nil @p.queue assert_nil @p.dequeued assert_nil @p.queued_num - assert_equal 0, @p.stage_size - assert_equal 0, @p.queue_size + assert_nil @p.stage_length_metrics + assert_nil @p.stage_size_metrics + assert_nil @p.queue_length_metrics + assert_nil @p.queue_size_metrics + assert_nil @p.available_buffer_space_ratios_metrics + assert_nil @p.total_queued_size_metrics + assert_nil @p.newest_timekey_metrics + assert_nil @p.oldest_timekey_metrics assert_equal [], @p.timekeys end From 05bc1b50d3c75396f89d840c37bb7e4c9ab03a3b Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 3 Aug 2021 18:49:05 +0900 Subject: [PATCH 2/3] plugin_helper: metrics: Inject last downcased class name if type is not provided Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin_helper/metrics.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin_helper/metrics.rb b/lib/fluent/plugin_helper/metrics.rb index f3fdb8afe4..7489732918 100644 --- a/lib/fluent/plugin_helper/metrics.rb +++ b/lib/fluent/plugin_helper/metrics.rb @@ -42,7 +42,11 @@ def configure(conf) @plugin_type_or_id = if self.plugin_id_configured? self.plugin_id else - "#{conf["@type"] || conf["type"]}.#{self.plugin_id}" + if type = (conf["@type"] || conf["type"]) + "#{type}.#{self.plugin_id}" + else + "#{self.class.to_s.split("::").last.downcase}.#{self.plugin_id}" + end end end From 206ca6992190e2a7557915e2268dc349d3908ab9 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 3 Aug 2021 19:23:39 +0900 Subject: [PATCH 3/3] plugin_helper: metrics: Add assertions for plugin_type_or_id instance variable Signed-off-by: Hiroshi Hatake --- test/plugin_helper/test_metrics.rb | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/test/plugin_helper/test_metrics.rb b/test/plugin_helper/test_metrics.rb index 77ee50c24b..586e4846a3 100644 --- a/test/plugin_helper/test_metrics.rb +++ b/test/plugin_helper/test_metrics.rb @@ -39,15 +39,21 @@ def configure(conf) test 'creates metrics instances' do d = Dummy.new - d.configure(config_element()) i = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics1", help_text: "metrics testing") + d.configure(config_element()) + assert do + d.instance_variable_get(:@plugin_type_or_id).include?("dummy.object") + end assert{ i.is_a?(Fluent::Plugin::LocalMetrics) } assert_true i.has_methods_for_counter assert_false i.has_methods_for_gauge d = Dummy.new - d.configure(config_element()) i = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics2", help_text: "metrics testing", prefer_gauge: true) + d.configure(config_element()) + assert do + d.instance_variable_get(:@plugin_type_or_id).include?("dummy.object") + end assert{ i.is_a?(Fluent::Plugin::LocalMetrics) } assert_false i.has_methods_for_counter assert_true i.has_methods_for_gauge @@ -55,10 +61,13 @@ def configure(conf) test 'calls lifecycle methods for all plugin instances via owner plugin' do @d = d = Dummy.new - d.configure(config_element()) i1 = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics1", help_text: "metrics testing") i2 = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics2", help_text: "metrics testing", prefer_gauge: true) i3 = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics3", help_text: "metrics testing") + d.configure(config_element()) + assert do + d.instance_variable_get(:@plugin_type_or_id).include?("dummy.object") + end d.start assert i1.started?