Skip to content

Commit

Permalink
Merge pull request #3484 from cosmo0920/buffer-metrics-statistics
Browse files Browse the repository at this point in the history
buffer: Use metrics plugin mechanism on a plugin base class
  • Loading branch information
cosmo0920 authored Aug 6, 2021
2 parents b28f7d7 + 206ca69 commit 50c773b
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 28 deletions.
106 changes: 84 additions & 22 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

require 'fluent/plugin/base'
require 'fluent/plugin/owned_by_mixin'
require 'fluent/plugin_id'
require 'fluent/plugin_helper'
require 'fluent/unique_id'
require 'fluent/ext_monitor_require'

Expand All @@ -24,7 +26,9 @@ module Plugin
class Buffer < Base
include OwnedByMixin
include UniqueId::Mixin
include PluginId
include MonitorMixin
include PluginHelper::Mixin # for metrics

class BufferError < StandardError; end
class BufferOverflowError < BufferError; end
Expand All @@ -39,6 +43,8 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than

configured_in :buffer

helpers_internal :metrics

# TODO: system total buffer limit size in bytes by SystemConfig

config_param :chunk_limit_size, :size, default: DEFAULT_CHUNK_LIMIT_SIZE
Expand Down Expand Up @@ -153,8 +159,11 @@ def hash
end
end

# for metrics
attr_reader :stage_size_metrics, :stage_length_metrics, :queue_size_metrics, :queue_length_metrics
attr_reader :available_buffer_space_ratios_metrics, :total_queued_size_metrics
attr_reader :newest_timekey_metrics, :oldest_timekey_metrics
# for tests
attr_accessor :stage_size, :queue_size
attr_reader :stage, :queue, :dequeued, :queued_num

def initialize
Expand All @@ -171,12 +180,35 @@ def initialize
@queued_num = {} # metadata => int (number of queued chunks)
@dequeued_num = {} # metadata => int (number of dequeued chunks)

@stage_size = @queue_size = 0
@stage_length_metrics = nil
@stage_size_metrics = nil
@queue_length_metrics = nil
@queue_size_metrics = nil
@available_buffer_space_ratios_metrics = nil
@total_queued_size_metrics = nil
@newest_timekey_metrics = nil
@oldest_timekey_metrics = nil
@timekeys = Hash.new(0)
@enable_update_timekeys = false
@mutex = Mutex.new
end

def stage_size
@stage_size_metrics.get
end

def stage_size=(value)
@stage_size_metrics.set(value)
end

def queue_size
@queue_size_metrics.get
end

def queue_size=(value)
@queue_size_metrics.set(value)
end

def persistent?
false
end
Expand All @@ -187,6 +219,28 @@ def configure(conf)
unless @queue_limit_length.nil?
@total_limit_size = @chunk_limit_size * @queue_limit_length
end
@stage_length_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "stage_length",
help_text: 'Length of stage buffers', prefer_gauge: true)
@stage_length_metrics.set(0)
@stage_size_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "stage_byte_size",
help_text: 'Total size of stage buffers', prefer_gauge: true)
@stage_size_metrics.set(0) # Ensure zero.
@queue_length_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "queue_length",
help_text: 'Length of queue buffers', prefer_gauge: true)
@queue_length_metrics.set(0)
@queue_size_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "queue_byte_size",
help_text: 'Total size of queue buffers', prefer_gauge: true)
@queue_size_metrics.set(0) # Ensure zero.
@available_buffer_space_ratios_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "available_buffer_space_ratios",
help_text: 'Ratio of available space in buffer', prefer_gauge: true)
@available_buffer_space_ratios_metrics.set(100) # Default is 100%.
@total_queued_size_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "total_queued_size",
help_text: 'Total size of stage and queue buffers', prefer_gauge: true)
@total_queued_size_metrics.set(0)
@newest_timekey_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "newest_timekey",
help_text: 'Newest timekey in buffer', prefer_gauge: true)
@oldest_timekey_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "oldest_timekey",
help_text: 'Oldest timekey in buffer', prefer_gauge: true)
end

def enable_update_timekeys
Expand All @@ -198,15 +252,15 @@ def start

@stage, @queue = resume
@stage.each_pair do |metadata, chunk|
@stage_size += chunk.bytesize
@stage_size_metrics.add(chunk.bytesize)
end
@queue.each do |chunk|
@queued_num[chunk.metadata] ||= 0
@queued_num[chunk.metadata] += 1
@queue_size += chunk.bytesize
@queue_size_metrics.add(chunk.bytesize)
end
update_timekeys
log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size
log.debug "buffer started", instance: self.object_id, stage_size: @stage_size_metrics.get, queue_size: @queue_size_metrics.get
end

def close
Expand All @@ -228,17 +282,19 @@ def close
def terminate
super
@dequeued = @stage = @queue = @queued_num = nil
@stage_size = @queue_size = 0
@stage_length_metrics = @stage_size_metrics = @queue_length_metrics = @queue_size_metrics = nil
@available_buffer_space_ratios_metrics = @total_queued_size_metrics = nil
@newest_timekey_metrics = @oldest_timekey_metrics = nil
@timekeys.clear
end

def storable?
@total_limit_size > @stage_size + @queue_size
@total_limit_size > @stage_size_metrics.get + @queue_size_metrics.get
end

## TODO: for back pressure feature
# def used?(ratio)
# @total_limit_size * ratio > @stage_size + @queue_size
# @total_limit_size * ratio > @stage_size_metrics.get + @queue_size_metrics.get
# end

def resume
Expand Down Expand Up @@ -344,7 +400,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
#
staged_bytesizes_by_chunk.each do |chunk, bytesize|
chunk.synchronize do
synchronize { @stage_size += bytesize }
synchronize { @stage_size_metrics.add(bytesize) }
log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } }
end
end
Expand All @@ -361,7 +417,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
u.metadata.seq = 0
synchronize {
@stage[m] = u.staged!
@stage_size += u.bytesize
@stage_size_metrics.add(u.bytesize)
}
end
end
Expand Down Expand Up @@ -428,8 +484,8 @@ def enqueue_chunk(metadata)
chunk.enqueued!
end
bytesize = chunk.bytesize
@stage_size -= bytesize
@queue_size += bytesize
@stage_size_metrics.sub(bytesize)
@queue_size_metrics.add(bytesize)
end
end
nil
Expand All @@ -446,7 +502,7 @@ def enqueue_unstaged_chunk(chunk)
@queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
chunk.enqueued!
end
@queue_size += chunk.bytesize
@queue_size_metrics.add(chunk.bytesize)
end
end

Expand Down Expand Up @@ -531,7 +587,7 @@ def purge_chunk(chunk_id)
begin
bytesize = chunk.bytesize
chunk.purge
@queue_size -= bytesize
@queue_size_metrics.sub(bytesize)
rescue => e
log.error "failed to purge buffer chunk", chunk_id: dump_unique_id_hex(chunk_id), error_class: e.class, error: e
log.error_backtrace
Expand Down Expand Up @@ -562,7 +618,7 @@ def clear_queue!
log.error_backtrace
end
end
@queue_size = 0
@queue_size_metrics.set(0)
end
end

Expand Down Expand Up @@ -765,23 +821,29 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
]

def statistics
stage_size, queue_size = @stage_size, @queue_size
stage_size, queue_size = @stage_size_metrics.get, @queue_size_metrics.get
buffer_space = 1.0 - ((stage_size + queue_size * 1.0) / @total_limit_size)
@stage_length_metrics.set(@stage.size)
@queue_length_metrics.set(@queue.size)
@available_buffer_space_ratios_metrics.set(buffer_space * 100)
@total_queued_size_metrics.set(stage_size + queue_size)
stats = {
'stage_length' => @stage.size,
'stage_length' => @stage_length_metrics.get,
'stage_byte_size' => stage_size,
'queue_length' => @queue.size,
'queue_length' => @queue_length_metrics.get,
'queue_byte_size' => queue_size,
'available_buffer_space_ratios' => (buffer_space * 100).round(1),
'total_queued_size' => stage_size + queue_size,
'available_buffer_space_ratios' => @available_buffer_space_ratios_metrics.get.round(1),
'total_queued_size' => @total_queued_size_metrics.get,
}

tkeys = timekeys
if (m = tkeys.min)
stats['oldest_timekey'] = m
@oldest_timekey_metrics.set(m)
stats['oldest_timekey'] = @oldest_timekey_metrics.get
end
if (m = tkeys.max)
stats['newest_timekey'] = m
@newest_timekey_metrics.set(m)
stats['newest_timekey'] = @newest_timekey_metrics.get
end

{ 'buffer' => stats }
Expand Down
6 changes: 5 additions & 1 deletion lib/fluent/plugin_helper/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ def configure(conf)
@plugin_type_or_id = if self.plugin_id_configured?
self.plugin_id
else
"#{conf["@type"] || conf["type"]}.#{self.plugin_id}"
if type = (conf["@type"] || conf["type"])
"#{type}.#{self.plugin_id}"
else
"#{self.class.to_s.split("::").last.downcase}.#{self.plugin_id}"
end
end
end

Expand Down
10 changes: 8 additions & 2 deletions test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,14 @@ def create_chunk_es(metadata, es)
assert_nil @p.queue
assert_nil @p.dequeued
assert_nil @p.queued_num
assert_equal 0, @p.stage_size
assert_equal 0, @p.queue_size
assert_nil @p.stage_length_metrics
assert_nil @p.stage_size_metrics
assert_nil @p.queue_length_metrics
assert_nil @p.queue_size_metrics
assert_nil @p.available_buffer_space_ratios_metrics
assert_nil @p.total_queued_size_metrics
assert_nil @p.newest_timekey_metrics
assert_nil @p.oldest_timekey_metrics
assert_equal [], @p.timekeys
end

Expand Down
15 changes: 12 additions & 3 deletions test/plugin_helper/test_metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,35 @@ def configure(conf)

test 'creates metrics instances' do
d = Dummy.new
d.configure(config_element())
i = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics1", help_text: "metrics testing")
d.configure(config_element())
assert do
d.instance_variable_get(:@plugin_type_or_id).include?("dummy.object")
end
assert{ i.is_a?(Fluent::Plugin::LocalMetrics) }
assert_true i.has_methods_for_counter
assert_false i.has_methods_for_gauge

d = Dummy.new
d.configure(config_element())
i = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics2", help_text: "metrics testing", prefer_gauge: true)
d.configure(config_element())
assert do
d.instance_variable_get(:@plugin_type_or_id).include?("dummy.object")
end
assert{ i.is_a?(Fluent::Plugin::LocalMetrics) }
assert_false i.has_methods_for_counter
assert_true i.has_methods_for_gauge
end

test 'calls lifecycle methods for all plugin instances via owner plugin' do
@d = d = Dummy.new
d.configure(config_element())
i1 = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics1", help_text: "metrics testing")
i2 = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics2", help_text: "metrics testing", prefer_gauge: true)
i3 = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics3", help_text: "metrics testing")
d.configure(config_element())
assert do
d.instance_variable_get(:@plugin_type_or_id).include?("dummy.object")
end
d.start

assert i1.started?
Expand Down

0 comments on commit 50c773b

Please sign in to comment.