diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index 20e7d4cc71..f10105b7ea 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -310,7 +310,7 @@ def configure(conf) # original implementation of v0.12 BufferedOutput def emit(tag, es, chain, key="") # this method will not be used except for the case that plugin calls super - @emit_count += 1 + @emit_count_metrics.inc data = format_stream(tag, es) if @buffer.emit(key, data, chain) submit_flush @@ -337,14 +337,14 @@ def format_stream(tag, es) # because v0.12 BufferedOutput may overrides #format_stream, but original #handle_stream_simple method doesn't consider about it def handle_stream_simple(tag, es, enqueue: false) if @overrides_emit - current_emit_count = @emit_count + current_emit_count = @emit_count_metrics.get size = es.size key = data = nil begin emit(tag, es, NULL_OUTPUT_CHAIN) key, data = self.last_emit_via_buffer ensure - @emit_count = current_emit_count + @emit_count_metrics.set(current_emit_count) self.last_emit_via_buffer = nil end # on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically @@ -352,7 +352,8 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue) end - @counter_mutex.synchronize{ @emit_records += size } + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics return [meta] end @@ -363,7 +364,8 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue) end - @counter_mutex.synchronize{ @emit_records += size } + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics return [meta] end @@ -373,7 +375,8 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => data}, enqueue: enqueue) end - @counter_mutex.synchronize{ @emit_records += size } + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics [meta] end diff --git a/lib/fluent/plugin/bare_output.rb b/lib/fluent/plugin/bare_output.rb index 56b1034e61..57de0c9741 100644 --- a/lib/fluent/plugin/bare_output.rb +++ b/lib/fluent/plugin/bare_output.rb @@ -23,37 +23,78 @@ module Fluent module Plugin class BareOutput < Base + include PluginHelper::Mixin # for metrics + # DO NOT USE THIS plugin for normal output plugin. Use Output instead. # This output plugin base class is only for meta-output plugins # which cannot be implemented on MultiOutput. 
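The compat layer above now drives its counters through metrics objects instead of mutating integers under @counter_mutex. A minimal sketch of the counter interface those calls assume (inc, add, get, set), mirroring the FluentTestCounterMetrics stub added to test_plugin_classes.rb later in this patch rather than any specific metrics plugin:

# Sketch only: the four operations the compat code relies on.
class SketchCounter
  def initialize
    @value = 0
  end
  def get; @value; end
  def inc; @value += 1; end
  def add(n); @value += n; end
  def set(n); @value = n; end
end

counter = SketchCounter.new
counter.inc          # replaces @emit_count += 1
counter.add(5)       # replaces @emit_records += size
saved = counter.get  # handle_stream_simple saves the count before emit()
counter.set(saved)   # ...and restores it in the ensure block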
# E.g,: forest, config-expander + helpers_internal :metrics + include PluginId include PluginLoggerMixin include PluginHelper::Mixin - attr_reader :num_errors, :emit_count, :emit_records - def process(tag, es) raise NotImplementedError, "BUG: output plugins MUST implement this method" end + def num_errors + @num_errors_metrics.get + end + + def emit_count + @emit_count_metrics.get + end + + def emit_size + @emit_size_metrics.get + end + + def emit_records + @emit_records_metrics.get + end + def initialize super @counter_mutex = Mutex.new # TODO: well organized counters - @num_errors = 0 - @emit_count = 0 - @emit_records = 0 + @num_errors_metrics = nil + @emit_count_metrics = nil + @emit_records_metrics = nil + @emit_size_metrics = nil + end + + def configure(conf) + super + + @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "num_errors", help_text: "Number of count num errors") + @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of count emits") + @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of emit records") + @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "emit_size", help_text: "Total size of emit events") + @enable_size_metrics = !!system_config.enable_size_metrics + end + + def statistics + stats = { + 'num_errors' => @num_errors_metrics.get, + 'emit_records' => @emit_records_metrics.get, + 'emit_count' => @emit_count_metrics.get, + 'emit_size' => @emit_size_metrics.get, + } + + { 'bare_output' => stats } end def emit_sync(tag, es) - @counter_mutex.synchronize{ @emit_count += 1 } + @emit_count_metrics.inc begin process(tag, es) - @counter_mutex.synchronize{ @emit_records += es.size } + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics rescue - @counter_mutex.synchronize{ @num_errors += 1 } + @num_errors_metrics.inc raise end end diff --git a/lib/fluent/plugin/filter.rb b/lib/fluent/plugin/filter.rb index 71310762e4..c2956a98b3 100644 --- a/lib/fluent/plugin/filter.rb +++ b/lib/fluent/plugin/filter.rb @@ -28,39 +28,47 @@ class Filter < Base include PluginLoggerMixin include PluginHelper::Mixin - helpers_internal :event_emitter + helpers_internal :event_emitter, :metrics attr_reader :has_filter_with_time def initialize super @has_filter_with_time = has_filter_with_time? 
- @emit_records = 0 - @emit_size = 0 + @emit_records_metrics = nil + @emit_size_metrics = nil @counter_mutex = Mutex.new @enable_size_metrics = false end + def emit_records + @emit_records_metrics.get + end + + def emit_size + @emit_size_metrics.get + end + def configure(conf) super + @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_records", help_text: "Number of count emit records") + @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_size", help_text: "Total size of emit events") @enable_size_metrics = !!system_config.enable_size_metrics end def statistics stats = { - 'emit_records' => @emit_records, - 'emit_size' => @emit_size, + 'emit_records' => @emit_records_metrics.get, + 'emit_size' => @emit_size_metrics.get, } { 'filter' => stats } end def measure_metrics(es) - @counter_mutex.synchronize do - @emit_records += es.size - @emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end def filter(tag, time, record) diff --git a/lib/fluent/plugin/in_monitor_agent.rb b/lib/fluent/plugin/in_monitor_agent.rb index 7b5364957c..f41c1a0bce 100644 --- a/lib/fluent/plugin/in_monitor_agent.rb +++ b/lib/fluent/plugin/in_monitor_agent.rb @@ -238,7 +238,7 @@ def start 'buffer_queue_length' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.queue.size }, 'buffer_timekeys' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.timekeys }, 'buffer_total_queued_size' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && !@buffer.nil? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.stage_size + @buffer.queue_size }, - 'retry_count' => ->(){ instance_variable_defined?(:@num_errors) ? @num_errors : nil }, + 'retry_count' => ->(){ respond_to?(:num_errors) ? 
num_errors : nil }, } def all_plugins diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index ef55c8ba0e..7a6909f7a9 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -27,36 +27,44 @@ class Input < Base include PluginLoggerMixin include PluginHelper::Mixin - helpers_internal :event_emitter + helpers_internal :event_emitter, :metrics def initialize super - @emit_records = 0 - @emit_size = 0 + @emit_records_metrics = nil + @emit_size_metrics = nil @counter_mutex = Mutex.new @enable_size_metrics = false end + def emit_records + @emit_records_metrics.get + end + + def emit_size + @emit_size_metrics.get + end + def configure(conf) super + @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "emit_records", help_text: "Number of count emit records") + @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "emit_size", help_text: "Total size of emit events") @enable_size_metrics = !!system_config.enable_size_metrics end def statistics stats = { - 'emit_records' => @emit_records, - 'emit_size' => @emit_size, + 'emit_records' => @emit_records_metrics.get, + 'emit_size' => @emit_size_metrics.get, } { 'input' => stats } end def metric_callback(es) - @counter_mutex.synchronize do - @emit_records += es.size - @emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end def multi_workers_ready? diff --git a/lib/fluent/plugin/multi_output.rb b/lib/fluent/plugin/multi_output.rb index 966f89f7e1..ce900fa5cc 100644 --- a/lib/fluent/plugin/multi_output.rb +++ b/lib/fluent/plugin/multi_output.rb @@ -27,6 +27,7 @@ class MultiOutput < Base include PluginHelper::Mixin # for event_emitter helpers :event_emitter # to get router from agent, which will be supplied to child plugins + helpers_internal :metrics config_section :store, param_name: :stores, multi: true, required: true do config_argument :arg, :string, default: '' @@ -46,11 +47,40 @@ def initialize @counter_mutex = Mutex.new # TODO: well organized counters - @num_errors = 0 - @emit_count = 0 - @emit_records = 0 + @num_errors_metrics = nil + @emit_count_metrics = nil + @emit_records_metrics = nil + @emit_size_metrics = nil # @write_count = 0 # @rollback_count = 0 + @enable_size_metrics = false + end + + def num_errors + @num_errors_metrics.get + end + + def emit_count + @emit_count_metrics.get + end + + def emit_size + @emit_size_metrics.get + end + + def emit_records + @emit_records_metrics.get + end + + def statistics + stats = { + 'num_errors' => @num_errors_metrics.get, + 'emit_records' => @emit_records_metrics.get, + 'emit_count' => @emit_count_metrics.get, + 'emit_size' => @emit_size_metrics.get, + } + + { 'multi_output' => stats } end def multi_output? @@ -60,6 +90,12 @@ def multi_output? 
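In the configure hunk that follows (as in the BareOutput#configure and Output#configure hunks elsewhere in this patch), @emit_count_metrics is registered with name: "emit_records", the same name already used for @emit_records_metrics. If that duplication is unintended, a distinct registration would look like this hypothetical sketch, which is not part of the patch:

# Hypothetical: give the emit-count counter its own metric name.
@emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output",
                                     name: "emit_count", help_text: "Number of emit calls")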
def configure(conf) super + @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "num_errors", help_text: "Number of count num errors") + @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of count emits") + @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of emit records") + @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_size", help_text: "Total size of emit events") + @enable_size_metrics = !!system_config.enable_size_metrics + @stores.each do |store| store_conf = store.corresponding_config_element type = store_conf['@type'] @@ -143,12 +179,13 @@ def terminate end def emit_sync(tag, es) - @counter_mutex.synchronize{ @emit_count += 1 } + @emit_count_metrics.inc begin process(tag, es) - @counter_mutex.synchronize{ @emit_records += es.size } + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics rescue - @counter_mutex.synchronize{ @num_errors += 1 } + @num_errors_metrics.inc raise end end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index f89c97f610..23fd29ab7e 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -37,7 +37,7 @@ class Output < Base include PluginHelper::Mixin include UniqueId::Mixin - helpers_internal :thread, :retry_state + helpers_internal :thread, :retry_state, :metrics CHUNK_KEY_PATTERN = /^[-_.@a-zA-Z0-9]+$/ CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{([-_.@$a-zA-Z0-9]+)\}/ @@ -164,7 +164,6 @@ def expired? end attr_reader :as_secondary, :delayed_commit, :delayed_commit_timeout, :timekey_zone - attr_reader :num_errors, :emit_count, :emit_records, :write_count, :rollback_count # for tests attr_reader :buffer, :retry, :secondary, :chunk_keys, :chunk_key_accessors, :chunk_key_time, :chunk_key_tag @@ -172,6 +171,30 @@ def expired? 
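The attr_reader removed above is replaced, in the hunk below, by reader methods that delegate to the metrics objects, and in_monitor_agent's retry_count probe switches from instance_variable_defined?(:@num_errors) to respond_to?(:num_errors) to match. The statistics hash further down keeps the original key names; its shape is roughly the following (values are illustrative):

# Illustrative shape of Output#statistics after this patch; numbers are made up.
stats = {
  'output' => {
    'emit_records' => 120, 'emit_size' => 0, 'retry_count' => 0,
    'emit_count' => 3, 'write_count' => 2, 'rollback_count' => 0,
    'slow_flush_count' => 0, 'flush_time_count' => 15
  }
}
stats['output']['retry_count']  # backed by @num_errors_metrics.get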
# output_enqueue_thread_waiting: for test of output.rb itself attr_accessor :retry_for_error_chunk # if true, error flush will be retried even if under_plugin_development is true + def num_errors + @num_errors_metrics.get + end + + def emit_count + @emit_count_metrics.get + end + + def emit_size + @emit_size_metrics.get + end + + def emit_records + @emit_records_metrics.get + end + + def write_count + @write_count_metrics.get + end + + def rollback_count + @rollback_count_metrics.get + end + def initialize super @counter_mutex = Mutex.new @@ -181,14 +204,14 @@ def initialize @primary_instance = nil # TODO: well organized counters - @num_errors = 0 - @emit_count = 0 - @emit_records = 0 - @emit_size = 0 - @write_count = 0 - @rollback_count = 0 - @flush_time_count = 0 - @slow_flush_count = 0 + @num_errors_metrics = nil + @emit_count_metrics = nil + @emit_records_metrics = nil + @emit_size_metrics = nil + @write_count_metrics = nil + @rollback_count_metrics = nil + @flush_time_count_metrics = nil + @slow_flush_count_metrics = nil @enable_size_metrics = false # How to process events is decided here at once, but it will be decided in delayed way on #configure & #start @@ -248,6 +271,15 @@ def configure(conf) super + @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "num_errors", help_text: "Number of count num errors") + @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of count emits") + @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records") + @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events") + @write_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events") + @rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations") + @flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time") + @slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)") + if has_buffer_section unless implement?(:buffered) || implement?(:delayed_commit) raise Fluent::ConfigError, " section is configured, but plugin '#{self.class}' doesn't support buffering" @@ -801,21 +833,19 @@ def emit_events(tag, es) end def emit_sync(tag, es) - @counter_mutex.synchronize{ @emit_count += 1 } + @emit_count_metrics.inc begin process(tag, es) - @counter_mutex.synchronize do - @emit_records += es.size - @emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics rescue - @counter_mutex.synchronize{ @num_errors += 1 } + @num_errors_metrics.inc raise end end def emit_buffered(tag, es) - @counter_mutex.synchronize{ @emit_count += 1 } + @emit_count_metrics.inc begin execute_chunking(tag, es, enqueue: (@flush_mode == :immediate)) if !@retry && @buffer.queued?(nil, optimistic: true) @@ -823,7 +853,7 @@ def emit_buffered(tag, es) end rescue # TODO: separate number of errors into emit errors and write/flush errors - @counter_mutex.synchronize{ @num_errors += 1 } + @num_errors_metrics.inc raise 
end end @@ -973,10 +1003,8 @@ def handle_stream_with_custom_format(tag, es, enqueue: false) write_guard do @buffer.write(meta_and_data, enqueue: enqueue) end - @counter_mutex.synchronize do - @emit_records += records - @emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics true end @@ -993,10 +1021,8 @@ def handle_stream_with_standard_format(tag, es, enqueue: false) write_guard do @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue) end - @counter_mutex.synchronize do - @emit_records += records - @emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics true end @@ -1021,10 +1047,8 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => data}, format: format_proc, enqueue: enqueue) end - @counter_mutex.synchronize do - @emit_records += records - @emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics - end + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics true end @@ -1062,7 +1086,7 @@ def rollback_write(chunk_id, update_retry: true) # false if chunk was already flushed and couldn't be rollbacked unexpectedly # in many cases, false can be just ignored if @buffer.takeback_chunk(chunk_id) - @counter_mutex.synchronize{ @rollback_count += 1 } + @rollback_count_metrics.inc if update_retry primary = @as_secondary ? @primary_instance : self primary.update_retry_state(chunk_id, @as_secondary) @@ -1078,7 +1102,7 @@ def try_rollback_write while @dequeued_chunks.first && @dequeued_chunks.first.expired? info = @dequeued_chunks.shift if @buffer.takeback_chunk(info.chunk_id) - @counter_mutex.synchronize{ @rollback_count += 1 } + @rollback_count_metrics.inc log.warn "failed to flush the buffer chunk, timeout to commit.", chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time primary = @as_secondary ? @primary_instance : self primary.update_retry_state(info.chunk_id, @as_secondary) @@ -1093,7 +1117,7 @@ def try_rollback_all until @dequeued_chunks.empty? info = @dequeued_chunks.shift if @buffer.takeback_chunk(info.chunk_id) - @counter_mutex.synchronize{ @rollback_count += 1 } + @rollback_count_metrics.inc log.info "delayed commit for buffer chunks was cancelled in shutdown", chunk_id: dump_unique_id_hex(info.chunk_id) primary = @as_secondary ? 
@primary_instance : self primary.update_retry_state(info.chunk_id, @as_secondary) @@ -1136,7 +1160,7 @@ def try_flush if output.delayed_commit log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id) - @counter_mutex.synchronize{ @write_count += 1 } + @write_count_metrics.inc @dequeued_chunks_mutex.synchronize do # delayed_commit_timeout for secondary is configured in of primary ( don't get ) @dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout) @@ -1148,7 +1172,7 @@ def try_flush chunk_id = chunk.unique_id dump_chunk_id = dump_unique_id_hex(chunk_id) log.trace "adding write count", instance: self.object_id - @counter_mutex.synchronize{ @write_count += 1 } + @write_count_metrics.inc log.trace "executing sync write", chunk: dump_chunk_id output.write(chunk) @@ -1204,7 +1228,7 @@ def try_flush end if @buffer.takeback_chunk(chunk.unique_id) - @counter_mutex.synchronize { @rollback_count += 1 } + @rollback_count_metrics.inc end update_retry_state(chunk.unique_id, using_secondary, e) @@ -1235,9 +1259,9 @@ def backup_chunk(chunk, using_secondary, delayed_commit) def check_slow_flush(start) elapsed_time = Fluent::Clock.now - start elapsed_millsec = (elapsed_time * 1000).to_i - @counter_mutex.synchronize { @flush_time_count += elapsed_millsec } + @flush_time_count_metrics.add(elapsed_millsec) if elapsed_time > @slow_flush_log_threshold - @counter_mutex.synchronize { @slow_flush_count += 1 } + @slow_flush_count_metrics.inc log.warn "buffer flush took longer time than slow_flush_log_threshold:", elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold, plugin_id: self.plugin_id end @@ -1245,7 +1269,7 @@ def check_slow_flush(start) def update_retry_state(chunk_id, using_secondary, error = nil) @retry_mutex.synchronize do - @counter_mutex.synchronize{ @num_errors += 1 } + @num_errors_metrics.inc chunk_id_hex = dump_unique_id_hex(chunk_id) unless @retry @@ -1506,16 +1530,16 @@ def flush_thread_run(state) def statistics stats = { - 'emit_records' => @emit_records, - 'emit_size' => @emit_size, + 'emit_records' => @emit_records_metrics.get, + 'emit_size' => @emit_size_metrics.get, # Respect original name # https://github.com/fluent/fluentd/blob/45c7b75ba77763eaf87136864d4942c4e0c5bfcd/lib/fluent/plugin/in_monitor_agent.rb#L284 - 'retry_count' => @num_errors, - 'emit_count' => @emit_count, - 'write_count' => @write_count, - 'rollback_count' => @rollback_count, - 'slow_flush_count' => @slow_flush_count, - 'flush_time_count' => @flush_time_count, + 'retry_count' => @num_errors_metrics.get, + 'emit_count' => @emit_count_metrics.get, + 'write_count' => @write_count_metrics.get, + 'rollback_count' => @rollback_count_metrics.get, + 'slow_flush_count' => @slow_flush_count_metrics.get, + 'flush_time_count' => @flush_time_count_metrics.get, } if @buffer && @buffer.respond_to?(:statistics) diff --git a/test/plugin/test_bare_output.rb b/test/plugin/test_bare_output.rb index 3dab2b66c8..a8d901d056 100644 --- a/test/plugin/test_bare_output.rb +++ b/test/plugin/test_bare_output.rb @@ -95,6 +95,19 @@ class FluentPluginBareOutputTest::DummyPlugin2 < Fluent::Plugin::BareOutput end end + test 'can use metrics plugins and fallback methods' do + @p.configure(config_element('ROOT', '', {'@log_level' => 'debug'})) + + %w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics].each do |metric_name| + assert_true @p.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics) + end + + 
assert_equal 0, @p.num_errors + assert_equal 0, @p.emit_count + assert_equal 0, @p.emit_size + assert_equal 0, @p.emit_records + end + test 'can get input event stream to write' do @p.configure(config_element('ROOT')) @p.start diff --git a/test/plugin/test_filter.rb b/test/plugin/test_filter.rb index d2e6791a43..052d68b7a7 100644 --- a/test/plugin/test_filter.rb +++ b/test/plugin/test_filter.rb @@ -165,6 +165,17 @@ class FluentPluginFilterTest::DummyPlugin2 < Fluent::Plugin::Filter end end + test 'can use metrics plugins and fallback methods' do + @p.configure(config_element('ROOT', '', {'@log_level' => 'debug'})) + + %w[emit_size_metrics emit_records_metrics].each do |metric_name| + assert_true @p.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics) + end + + assert_equal 0, @p.emit_size + assert_equal 0, @p.emit_records + end + test 'are available with multi worker configuration in default' do assert @p.multi_workers_ready? end diff --git a/test/plugin/test_in_monitor_agent.rb b/test/plugin/test_in_monitor_agent.rb index 739cb66a6a..92344b6e15 100644 --- a/test/plugin/test_in_monitor_agent.rb +++ b/test/plugin/test_in_monitor_agent.rb @@ -673,7 +673,7 @@ def get(uri, header = {}) "plugin_id" => "null", "retry_count" => 0, "type" => "null", - "instance_variables" => {"id" => "null", "num_errors" => 0}, + "instance_variables" => {"id" => "null"}, "buffer_available_buffer_space_ratios" => Float, "buffer_queue_byte_size" => Integer, "buffer_stage_byte_size" => Integer, diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 4532557059..97b900ec6a 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2179,8 +2179,9 @@ def test_ENOENT_error_after_setup_watcher assert_nothing_raised do d.run(shutdown: false) {} end - d.instance_shutdown assert($log.out.logs.any?{|log| log.include?("stat() for #{path} failed with Errno::ENOENT. Drop tail watcher for now.\n") }) + ensure + d.instance_shutdown if d && d.instance end def test_EACCES_error_after_setup_watcher @@ -2203,10 +2204,10 @@ def test_EACCES_error_after_setup_watcher assert_nothing_raised do d.run(shutdown: false) {} end - d.instance_shutdown assert($log.out.logs.any?{|log| log.include?("stat() for #{path} failed with Errno::EACCES. Drop tail watcher for now.\n") }) end ensure + d.instance_shutdown if d && d.instance if File.exist?("#{TMP_DIR}/noaccess") FileUtils.chmod(0755, "#{TMP_DIR}/noaccess") FileUtils.rm_rf("#{TMP_DIR}/noaccess") @@ -2226,8 +2227,9 @@ def test_EACCES assert_nothing_raised do d.run(shutdown: false) {} end - d.instance_shutdown assert($log.out.logs.any?{|log| log.include?("expand_paths: stat() for #{path} failed with Errno::EACCES. Skip file.\n") }) + ensure + d.instance_shutdown if d && d.instance end def test_shutdown_timeout diff --git a/test/plugin/test_input.rb b/test/plugin/test_input.rb index 4bd415d818..971bdaa103 100644 --- a/test/plugin/test_input.rb +++ b/test/plugin/test_input.rb @@ -85,6 +85,17 @@ class FluentPluginInputTest::DummyPlugin2 < Fluent::Plugin::Input end end + test 'can use metrics plugins and fallback methods' do + @p.configure(config_element('ROOT', '', {'@log_level' => 'debug'})) + + %w[emit_size_metrics emit_records_metrics].each do |metric_name| + assert_true @p.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics) + end + + assert_equal 0, @p.emit_size + assert_equal 0, @p.emit_records + end + test 'are not available with multi workers configuration in default' do assert_false @p.multi_workers_ready? 
end diff --git a/test/plugin/test_multi_output.rb b/test/plugin/test_multi_output.rb index e07d6353b7..2351d1385f 100644 --- a/test/plugin/test_multi_output.rb +++ b/test/plugin/test_multi_output.rb @@ -146,8 +146,11 @@ def create_output(type=:multi) @i.configure(conf) assert_equal 4, @i.outputs.size + log_size_for_multi_output_itself = 4 + log_size_for_metrics_plugin_helper = 4 + expected_warn_log_size = log_size_for_multi_output_itself + log_size_for_metrics_plugin_helper logs = @i.log.out.logs - assert{ logs.select{|log| log.include?('[warn]') && log.include?("'type' is deprecated parameter name. use '@type' instead.") }.size == 4 } + assert{ logs.select{|log| log.include?('[warn]') && log.include?("'type' is deprecated parameter name. use '@type' instead.") }.size == expected_warn_log_size } end test '#emit_events calls #process always' do @@ -176,5 +179,26 @@ def create_output(type=:multi) assert_equal 2, @i.events.size end + + test 'can use metrics plugins and fallback methods' do + conf = config_element('ROOT', '', { '@type' => 'dummy_test_multi_output' }, + [ + config_element('store', '', { 'type' => 'dummy_test_multi_output_1' }), + config_element('store', '', { 'type' => 'dummy_test_multi_output_2' }), + config_element('store', '', { 'type' => 'dummy_test_multi_output_3' }), + config_element('store', '', { 'type' => 'dummy_test_multi_output_4' }), + ] + ) + @i.configure(conf) + + %w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics].each do |metric_name| + assert_true @i.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics) + end + + assert_equal 0, @i.num_errors + assert_equal 0, @i.emit_count + assert_equal 0, @i.emit_size + assert_equal 0, @i.emit_records + end end end diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index 4ffba4a4f9..c25c7b1afb 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -223,6 +223,22 @@ def waiting(seconds) assert @i.terminated? 
end + test 'can use metrics plugins and fallback methods' do + @i.configure(config_element()) + + %w[num_errors_metrics emit_count_metrics emit_size_metrics emit_records_metrics + write_count_metrics rollback_count_metrics flush_time_count_metrics slow_flush_count_metrics].each do |metric_name| + assert_true @i.instance_variable_get(:"@#{metric_name}").is_a?(Fluent::Plugin::Metrics) + end + + assert_equal 0, @i.num_errors + assert_equal 0, @i.emit_count + assert_equal 0, @i.emit_size + assert_equal 0, @i.emit_records + assert_equal 0, @i.write_count + assert_equal 0, @i.rollback_count + end + data(:new_api => :chunk, :old_api => :metadata) test '#extract_placeholders does nothing if chunk key is not specified' do |api| diff --git a/test/test_plugin_classes.rb b/test/test_plugin_classes.rb index 12879881c8..8af95aa20c 100644 --- a/test/test_plugin_classes.rb +++ b/test/test_plugin_classes.rb @@ -5,11 +5,78 @@ require 'fluent/plugin/filter' module FluentTest + class FluentTestCounterMetrics < Fluent::Plugin::Metrics + Fluent::Plugin.register_metrics('test_counter', self) + + attr_reader :data + + def initialize + super + @data = 0 + end + def get + @data + end + def inc + @data +=1 + end + def add(value) + @data += value + end + def set(value) + @data = value + end + def close + @data = 0 + super + end + end + + class FluentTestGaugeMetrics < Fluent::Plugin::Metrics + Fluent::Plugin.register_metrics('test_gauge', self) + + attr_reader :data + + def initialize + super + @data = 0 + end + def get + @data + end + def inc + @data += 1 + end + def dec + @data -=1 + end + def add(value) + @data += value + end + def sub(value) + @data -= value + end + def set(value) + @data = value + end + def close + @data = 0 + super + end + end + class FluentTestInput < ::Fluent::Plugin::Input ::Fluent::Plugin.register_input('test_in', self) attr_reader :started + def initialize + super + # stub metrics instances + @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new + end + def start super @started = true @@ -28,6 +95,13 @@ class FluentTestGenInput < ::Fluent::Plugin::Input config_param :num, :integer, default: 10000 + def initialize + super + # stub metrics instances + @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new + end + def start super @started = true @@ -49,6 +123,15 @@ class FluentTestOutput < ::Fluent::Plugin::Output def initialize super @events = Hash.new { |h, k| h[k] = [] } + # stub metrics instances + @num_errors_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_count_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new + @write_count_metrics = FluentTest::FluentTestCounterMetrics.new + @rollback_count_metrics = FluentTest::FluentTestCounterMetrics.new + @flush_time_count_metrics = FluentTest::FluentTestCounterMetrics.new + @slow_flush_count_metrics = FluentTest::FluentTestCounterMetrics.new end attr_reader :events @@ -168,6 +251,19 @@ def write(chunk) class FluentTestErrorOutput < ::Fluent::Plugin::Output ::Fluent::Plugin.register_output('test_out_error', self) + def initialize + super + # stub metrics instances + @num_errors_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_count_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new + 
@emit_size_metrics = FluentTest::FluentTestCounterMetrics.new + @write_count_metrics = FluentTest::FluentTestCounterMetrics.new + @rollback_count_metrics = FluentTest::FluentTestCounterMetrics.new + @flush_time_count_metrics = FluentTest::FluentTestCounterMetrics.new + @slow_flush_count_metrics = FluentTest::FluentTestCounterMetrics.new + end + def format(tag, time, record) raise "emit error!" end @@ -184,6 +280,9 @@ def initialize(field = '__test__') super() @num = 0 @field = field + # stub metrics instances + @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new end attr_reader :num @@ -213,6 +312,9 @@ def initialize(field = '__test__') super() @num = 0 @field = field + # stub metrics instances + @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new + @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new end attr_reader :num
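A sketch of the stubbing pattern these test classes follow: the helper-created metrics are replaced in initialize with the in-memory counters defined above, so the fallback readers and statistics work in unit tests without configuring a metrics plugin. The class and registered name here are illustrative, not part of the patch:

require 'fluent/plugin/input'

module FluentTest
  # Illustrative test-only input reusing the FluentTestCounterMetrics stub above.
  class FluentTestStubbedInput < ::Fluent::Plugin::Input
    ::Fluent::Plugin.register_input('test_in_stubbed', self)

    def initialize
      super
      # stub metrics instances so the readers work without configure
      @emit_records_metrics = FluentTest::FluentTestCounterMetrics.new
      @emit_size_metrics = FluentTest::FluentTestCounterMetrics.new
      @enable_size_metrics = true
    end
  end
end

# In a test: metric_callback(es) adds es.size and the msgpack byte size to the
# stubs, and emit_records / emit_size then read those values back via #get.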