Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

input: filter: output: bare_output: multi_output: Use metrics plugin mechanism on plugin base classes #3479

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def configure(conf)
# original implementation of v0.12 BufferedOutput
def emit(tag, es, chain, key="")
# this method will not be used except for the case that plugin calls super
@emit_count += 1
@emit_count_metrics.inc
data = format_stream(tag, es)
if @buffer.emit(key, data, chain)
submit_flush
Expand All @@ -337,22 +337,23 @@ def format_stream(tag, es)
# because v0.12 BufferedOutput may overrides #format_stream, but original #handle_stream_simple method doesn't consider about it
def handle_stream_simple(tag, es, enqueue: false)
if @overrides_emit
current_emit_count = @emit_count
current_emit_count = @emit_count_metrics.get
size = es.size
key = data = nil
begin
emit(tag, es, NULL_OUTPUT_CHAIN)
key, data = self.last_emit_via_buffer
ensure
@emit_count = current_emit_count
@emit_count_metrics.set(current_emit_count)
self.last_emit_via_buffer = nil
end
# on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically
meta = @buffer.metadata(variables: (key && !key.empty? ? {key: key} : nil))
write_guard do
@buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
end
@counter_mutex.synchronize{ @emit_records += size }
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
return [meta]
end

Expand All @@ -363,7 +364,8 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
end
@counter_mutex.synchronize{ @emit_records += size }
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
return [meta]
end

Expand All @@ -373,7 +375,8 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => data}, enqueue: enqueue)
end
@counter_mutex.synchronize{ @emit_records += size }
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
[meta]
end

Expand Down
57 changes: 49 additions & 8 deletions lib/fluent/plugin/bare_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,78 @@
module Fluent
module Plugin
class BareOutput < Base
include PluginHelper::Mixin # for metrics

# DO NOT USE THIS plugin for normal output plugin. Use Output instead.
# This output plugin base class is only for meta-output plugins
# which cannot be implemented on MultiOutput.
# E.g,: forest, config-expander

helpers_internal :metrics

include PluginId
include PluginLoggerMixin
include PluginHelper::Mixin

attr_reader :num_errors, :emit_count, :emit_records

def process(tag, es)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end

def num_errors
@num_errors_metrics.get
end

def emit_count
@emit_count_metrics.get
end

def emit_size
@emit_size_metrics.get
end

def emit_records
@emit_records_metrics.get
end

def initialize
super
@counter_mutex = Mutex.new
# TODO: well organized counters
@num_errors = 0
@emit_count = 0
@emit_records = 0
@num_errors_metrics = nil
@emit_count_metrics = nil
@emit_records_metrics = nil
@emit_size_metrics = nil
end

def configure(conf)
super

@num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "num_errors", help_text: "Number of count num errors")
@emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of count emits")
@emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "emit_records", help_text: "Number of emit records")
@emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "bare_output", name: "emit_size", help_text: "Total size of emit events")
@enable_size_metrics = !!system_config.enable_size_metrics
end

def statistics
stats = {
'num_errors' => @num_errors_metrics.get,
'emit_records' => @emit_records_metrics.get,
'emit_count' => @emit_count_metrics.get,
'emit_size' => @emit_size_metrics.get,
}

{ 'bare_output' => stats }
end

def emit_sync(tag, es)
@counter_mutex.synchronize{ @emit_count += 1 }
@emit_count_metrics.inc
begin
process(tag, es)
@counter_mutex.synchronize{ @emit_records += es.size }
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
rescue
@counter_mutex.synchronize{ @num_errors += 1 }
@num_errors_metrics.inc
raise
end
end
Expand Down
26 changes: 17 additions & 9 deletions lib/fluent/plugin/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,39 +28,47 @@ class Filter < Base
include PluginLoggerMixin
include PluginHelper::Mixin

helpers_internal :event_emitter
helpers_internal :event_emitter, :metrics

attr_reader :has_filter_with_time

def initialize
super
@has_filter_with_time = has_filter_with_time?
@emit_records = 0
@emit_size = 0
@emit_records_metrics = nil
@emit_size_metrics = nil
@counter_mutex = Mutex.new
@enable_size_metrics = false
end

def emit_records
@emit_records_metrics.get
end

def emit_size
@emit_size_metrics.get
end

def configure(conf)
super

@emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_records", help_text: "Number of count emit records")
@emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_size", help_text: "Total size of emit events")
@enable_size_metrics = !!system_config.enable_size_metrics
end

def statistics
stats = {
'emit_records' => @emit_records,
'emit_size' => @emit_size,
'emit_records' => @emit_records_metrics.get,
'emit_size' => @emit_size_metrics.get,
}

{ 'filter' => stats }
end

def measure_metrics(es)
@counter_mutex.synchronize do
@emit_records += es.size
@emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
end

def filter(tag, time, record)
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def start
'buffer_queue_length' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && [email protected]? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.queue.size },
'buffer_timekeys' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && [email protected]? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.timekeys },
'buffer_total_queued_size' => ->(){ throw(:skip) unless instance_variable_defined?(:@buffer) && [email protected]? && @buffer.is_a?(::Fluent::Plugin::Buffer); @buffer.stage_size + @buffer.queue_size },
'retry_count' => ->(){ instance_variable_defined?(:@num_errors) ? @num_errors : nil },
'retry_count' => ->(){ respond_to?(:num_errors) ? num_errors : nil },
}

def all_plugins
Expand Down
26 changes: 17 additions & 9 deletions lib/fluent/plugin/input.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,44 @@ class Input < Base
include PluginLoggerMixin
include PluginHelper::Mixin

helpers_internal :event_emitter
helpers_internal :event_emitter, :metrics

def initialize
super
@emit_records = 0
@emit_size = 0
@emit_records_metrics = nil
@emit_size_metrics = nil
@counter_mutex = Mutex.new
@enable_size_metrics = false
end

def emit_records
@emit_records_metrics.get
end

def emit_size
@emit_size_metrics.get
end

def configure(conf)
super

@emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "emit_records", help_text: "Number of count emit records")
@emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "input", name: "emit_size", help_text: "Total size of emit events")
@enable_size_metrics = !!system_config.enable_size_metrics
end

def statistics
stats = {
'emit_records' => @emit_records,
'emit_size' => @emit_size,
'emit_records' => @emit_records_metrics.get,
'emit_size' => @emit_size_metrics.get,
}

{ 'input' => stats }
end

def metric_callback(es)
@counter_mutex.synchronize do
@emit_records += es.size
@emit_size += es.to_msgpack_stream.bytesize if @enable_size_metrics
end
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
end

def multi_workers_ready?
Expand Down
49 changes: 43 additions & 6 deletions lib/fluent/plugin/multi_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class MultiOutput < Base
include PluginHelper::Mixin # for event_emitter

helpers :event_emitter # to get router from agent, which will be supplied to child plugins
helpers_internal :metrics

config_section :store, param_name: :stores, multi: true, required: true do
config_argument :arg, :string, default: ''
Expand All @@ -46,11 +47,40 @@ def initialize

@counter_mutex = Mutex.new
# TODO: well organized counters
@num_errors = 0
@emit_count = 0
@emit_records = 0
@num_errors_metrics = nil
@emit_count_metrics = nil
@emit_records_metrics = nil
@emit_size_metrics = nil
# @write_count = 0
# @rollback_count = 0
@enable_size_metrics = false
end

def num_errors
@num_errors_metrics.get
end

def emit_count
@emit_count_metrics.get
end

def emit_size
@emit_size_metrics.get
end

def emit_records
@emit_records_metrics.get
end

def statistics
stats = {
'num_errors' => @num_errors_metrics.get,
'emit_records' => @emit_records_metrics.get,
'emit_count' => @emit_count_metrics.get,
'emit_size' => @emit_size_metrics.get,
}

{ 'multi_output' => stats }
end

def multi_output?
Expand All @@ -60,6 +90,12 @@ def multi_output?
def configure(conf)
super

@num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "num_errors", help_text: "Number of count num errors")
@emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of count emits")
@emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_records", help_text: "Number of emit records")
@emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "multi_output", name: "emit_size", help_text: "Total size of emit events")
@enable_size_metrics = !!system_config.enable_size_metrics

@stores.each do |store|
store_conf = store.corresponding_config_element
type = store_conf['@type']
Expand Down Expand Up @@ -143,12 +179,13 @@ def terminate
end

def emit_sync(tag, es)
@counter_mutex.synchronize{ @emit_count += 1 }
@emit_count_metrics.inc
begin
process(tag, es)
@counter_mutex.synchronize{ @emit_records += es.size }
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
rescue
@counter_mutex.synchronize{ @num_errors += 1 }
@num_errors_metrics.inc
raise
end
end
Expand Down
Loading