Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buffer: Use metrics plugin mechanism on a plugin base class #3484

Merged
merged 3 commits into from
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 84 additions & 22 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

require 'fluent/plugin/base'
require 'fluent/plugin/owned_by_mixin'
require 'fluent/plugin_id'
require 'fluent/plugin_helper'
require 'fluent/unique_id'
require 'fluent/ext_monitor_require'

Expand All @@ -24,7 +26,9 @@ module Plugin
class Buffer < Base
include OwnedByMixin
include UniqueId::Mixin
include PluginId
include MonitorMixin
include PluginHelper::Mixin # for metrics

# Root of the buffer-specific error hierarchy (a StandardError subclass).
class BufferError < StandardError; end
# Specialisation of BufferError.
# NOTE(review): presumably raised when the buffer cannot accept more data — confirm at the raise sites.
class BufferOverflowError < BufferError; end
Expand All @@ -39,6 +43,8 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than

configured_in :buffer

helpers_internal :metrics

# TODO: system total buffer limit size in bytes by SystemConfig

config_param :chunk_limit_size, :size, default: DEFAULT_CHUNK_LIMIT_SIZE
Expand Down Expand Up @@ -153,8 +159,11 @@ def hash
end
end

# for metrics
attr_reader :stage_size_metrics, :stage_length_metrics, :queue_size_metrics, :queue_length_metrics
attr_reader :available_buffer_space_ratios_metrics, :total_queued_size_metrics
attr_reader :newest_timekey_metrics, :oldest_timekey_metrics
# for tests
attr_accessor :stage_size, :queue_size
attr_reader :stage, :queue, :dequeued, :queued_num

def initialize
Expand All @@ -171,12 +180,35 @@ def initialize
@queued_num = {} # metadata => int (number of queued chunks)
@dequeued_num = {} # metadata => int (number of dequeued chunks)

@stage_size = @queue_size = 0
@stage_length_metrics = nil
@stage_size_metrics = nil
@queue_length_metrics = nil
@queue_size_metrics = nil
@available_buffer_space_ratios_metrics = nil
@total_queued_size_metrics = nil
@newest_timekey_metrics = nil
@oldest_timekey_metrics = nil
@timekeys = Hash.new(0)
@enable_update_timekeys = false
@mutex = Mutex.new
end

# Returns the total byte size of the staged chunks, as tracked by the
# stage byte-size gauge.
#
# The gauge only exists between #configure (which creates it) and
# #terminate (which resets it to nil). Outside that window there is nothing
# staged from this object's point of view, so 0 is reported instead of
# raising NoMethodError on nil; this matches what the plain accessor used
# to return.
#
# @return [Numeric] current stage size in bytes, or 0 when no gauge exists
def stage_size
  @stage_size_metrics ? @stage_size_metrics.get : 0
end

# Overwrites the stage byte-size gauge with the given value.
#
# @param value [Numeric] new total size of the staged chunks, in bytes
def stage_size=(value)
  gauge = @stage_size_metrics
  gauge.set(value)
end

# Returns the total byte size of the queued chunks, as tracked by the
# queue byte-size gauge.
#
# The gauge only exists between #configure (which creates it) and
# #terminate (which resets it to nil). Outside that window 0 is reported
# instead of raising NoMethodError on nil; this matches what the plain
# accessor used to return.
#
# @return [Numeric] current queue size in bytes, or 0 when no gauge exists
def queue_size
  @queue_size_metrics ? @queue_size_metrics.get : 0
end

# Overwrites the queue byte-size gauge with the given value.
#
# @param value [Numeric] new total size of the queued chunks, in bytes
def queue_size=(value)
  gauge = @queue_size_metrics
  gauge.set(value)
end

# Reports whether this buffer is persistent; this implementation always
# answers false.
# NOTE(review): presumably overridden by buffer implementations that keep
# chunks across restarts — confirm against the subclasses.
def persistent?
false
end
Expand All @@ -187,6 +219,28 @@ def configure(conf)
unless @queue_limit_length.nil?
@total_limit_size = @chunk_limit_size * @queue_limit_length
end
@stage_length_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "stage_length",
help_text: 'Length of stage buffers', prefer_gauge: true)
@stage_length_metrics.set(0)
@stage_size_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "stage_byte_size",
help_text: 'Total size of stage buffers', prefer_gauge: true)
@stage_size_metrics.set(0) # Ensure zero.
@queue_length_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "queue_length",
help_text: 'Length of queue buffers', prefer_gauge: true)
@queue_length_metrics.set(0)
@queue_size_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "queue_byte_size",
help_text: 'Total size of queue buffers', prefer_gauge: true)
@queue_size_metrics.set(0) # Ensure zero.
@available_buffer_space_ratios_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "available_buffer_space_ratios",
help_text: 'Ratio of available space in buffer', prefer_gauge: true)
@available_buffer_space_ratios_metrics.set(100) # Default is 100%.
@total_queued_size_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "total_queued_size",
help_text: 'Total size of stage and queue buffers', prefer_gauge: true)
@total_queued_size_metrics.set(0)
@newest_timekey_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "newest_timekey",
help_text: 'Newest timekey in buffer', prefer_gauge: true)
@oldest_timekey_metrics = metrics_create(namespace: "fluentd", subsystem: "buffer", name: "oldest_timekey",
help_text: 'Oldest timekey in buffer', prefer_gauge: true)
end

def enable_update_timekeys
Expand All @@ -198,15 +252,15 @@ def start

@stage, @queue = resume
@stage.each_pair do |metadata, chunk|
@stage_size += chunk.bytesize
@stage_size_metrics.add(chunk.bytesize)
end
@queue.each do |chunk|
@queued_num[chunk.metadata] ||= 0
@queued_num[chunk.metadata] += 1
@queue_size += chunk.bytesize
@queue_size_metrics.add(chunk.bytesize)
end
update_timekeys
log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size
log.debug "buffer started", instance: self.object_id, stage_size: @stage_size_metrics.get, queue_size: @queue_size_metrics.get
end

def close
Expand All @@ -228,17 +282,19 @@ def close
# Releases per-instance state once the parent class's #terminate has run:
# drops the dequeued/stage/queue/queued_num references, resets every metrics
# handle to nil and empties @timekeys.
# NOTE(review): this text appears to come from a rendered diff in which old
# and new lines are interleaved; `@stage_size = @queue_size = 0` looks like the
# pre-metrics line that the three metrics resets below replace — confirm
# before treating it as live code.
def terminate
super
@dequeued = @stage = @queue = @queued_num = nil
@stage_size = @queue_size = 0
# All eight metrics handles are released here.
@stage_length_metrics = @stage_size_metrics = @queue_length_metrics = @queue_size_metrics = nil
@available_buffer_space_ratios_metrics = @total_queued_size_metrics = nil
@newest_timekey_metrics = @oldest_timekey_metrics = nil
@timekeys.clear
end

# Answers whether the buffer still has room: true while the combined stage and
# queue byte sizes are strictly below @total_limit_size.
# NOTE(review): two comparisons appear here and only the last expression is the
# method's return value. The first one reads @stage_size/@queue_size and looks
# like the pre-metrics line of a rendered diff — confirm before treating it as
# live code.
def storable?
@total_limit_size > @stage_size + @queue_size
@total_limit_size > @stage_size_metrics.get + @queue_size_metrics.get
end

## TODO: for back pressure feature
# def used?(ratio)
# @total_limit_size * ratio > @stage_size + @queue_size
# @total_limit_size * ratio > @stage_size_metrics.get + @queue_size_metrics.get
# end

def resume
Expand Down Expand Up @@ -344,7 +400,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
#
staged_bytesizes_by_chunk.each do |chunk, bytesize|
chunk.synchronize do
synchronize { @stage_size += bytesize }
synchronize { @stage_size_metrics.add(bytesize) }
log.on_trace { log.trace { "chunk #{chunk.path} size_added: #{bytesize} new_size: #{chunk.bytesize}" } }
end
end
Expand All @@ -361,7 +417,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
u.metadata.seq = 0
synchronize {
@stage[m] = u.staged!
@stage_size += u.bytesize
@stage_size_metrics.add(u.bytesize)
}
end
end
Expand Down Expand Up @@ -428,8 +484,8 @@ def enqueue_chunk(metadata)
chunk.enqueued!
end
bytesize = chunk.bytesize
@stage_size -= bytesize
@queue_size += bytesize
@stage_size_metrics.sub(bytesize)
@queue_size_metrics.add(bytesize)
end
end
nil
Expand All @@ -446,7 +502,7 @@ def enqueue_unstaged_chunk(chunk)
@queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
chunk.enqueued!
end
@queue_size += chunk.bytesize
@queue_size_metrics.add(chunk.bytesize)
end
end

Expand Down Expand Up @@ -531,7 +587,7 @@ def purge_chunk(chunk_id)
begin
bytesize = chunk.bytesize
chunk.purge
@queue_size -= bytesize
@queue_size_metrics.sub(bytesize)
rescue => e
log.error "failed to purge buffer chunk", chunk_id: dump_unique_id_hex(chunk_id), error_class: e.class, error: e
log.error_backtrace
Expand Down Expand Up @@ -562,7 +618,7 @@ def clear_queue!
log.error_backtrace
end
end
@queue_size = 0
@queue_size_metrics.set(0)
end
end

Expand Down Expand Up @@ -765,23 +821,29 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
]

def statistics
stage_size, queue_size = @stage_size, @queue_size
stage_size, queue_size = @stage_size_metrics.get, @queue_size_metrics.get
buffer_space = 1.0 - ((stage_size + queue_size * 1.0) / @total_limit_size)
@stage_length_metrics.set(@stage.size)
@queue_length_metrics.set(@queue.size)
@available_buffer_space_ratios_metrics.set(buffer_space * 100)
@total_queued_size_metrics.set(stage_size + queue_size)
stats = {
'stage_length' => @stage.size,
'stage_length' => @stage_length_metrics.get,
'stage_byte_size' => stage_size,
'queue_length' => @queue.size,
'queue_length' => @queue_length_metrics.get,
'queue_byte_size' => queue_size,
'available_buffer_space_ratios' => (buffer_space * 100).round(1),
'total_queued_size' => stage_size + queue_size,
'available_buffer_space_ratios' => @available_buffer_space_ratios_metrics.get.round(1),
'total_queued_size' => @total_queued_size_metrics.get,
}

tkeys = timekeys
if (m = tkeys.min)
stats['oldest_timekey'] = m
@oldest_timekey_metrics.set(m)
stats['oldest_timekey'] = @oldest_timekey_metrics.get
end
if (m = tkeys.max)
stats['newest_timekey'] = m
@newest_timekey_metrics.set(m)
stats['newest_timekey'] = @newest_timekey_metrics.get
end

{ 'buffer' => stats }
Expand Down
6 changes: 5 additions & 1 deletion lib/fluent/plugin_helper/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ def configure(conf)
@plugin_type_or_id = if self.plugin_id_configured?
self.plugin_id
else
"#{conf["@type"] || conf["type"]}.#{self.plugin_id}"
if type = (conf["@type"] || conf["type"])
"#{type}.#{self.plugin_id}"
else
"#{self.class.to_s.split("::").last.downcase}.#{self.plugin_id}"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

end
end
end

Expand Down
10 changes: 8 additions & 2 deletions test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,14 @@ def create_chunk_es(metadata, es)
assert_nil @p.queue
assert_nil @p.dequeued
assert_nil @p.queued_num
assert_equal 0, @p.stage_size
assert_equal 0, @p.queue_size
assert_nil @p.stage_length_metrics
assert_nil @p.stage_size_metrics
assert_nil @p.queue_length_metrics
assert_nil @p.queue_size_metrics
assert_nil @p.available_buffer_space_ratios_metrics
assert_nil @p.total_queued_size_metrics
assert_nil @p.newest_timekey_metrics
assert_nil @p.oldest_timekey_metrics
assert_equal [], @p.timekeys
end

Expand Down
15 changes: 12 additions & 3 deletions test/plugin_helper/test_metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,35 @@ def configure(conf)

# Checks that metrics_create hands back a Fluent::Plugin::LocalMetrics whose
# counter/gauge capability flags follow the prefer_gauge option, and that
# @plugin_type_or_id contains "dummy.object" once the plugin is configured.
# NOTE(review): d.configure appears both before and after metrics_create in
# each half of this test; this looks like old and new lines of a rendered diff
# shown together — confirm which placement is intended.
test 'creates metrics instances' do
# Case 1: no prefer_gauge option, so counter methods are expected.
d = Dummy.new
d.configure(config_element())
i = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics1", help_text: "metrics testing")
d.configure(config_element())
assert do
d.instance_variable_get(:@plugin_type_or_id).include?("dummy.object")
end
assert{ i.is_a?(Fluent::Plugin::LocalMetrics) }
assert_true i.has_methods_for_counter
assert_false i.has_methods_for_gauge

# Case 2: prefer_gauge: true, so gauge methods are expected instead.
d = Dummy.new
d.configure(config_element())
i = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics2", help_text: "metrics testing", prefer_gauge: true)
d.configure(config_element())
assert do
d.instance_variable_get(:@plugin_type_or_id).include?("dummy.object")
end
assert{ i.is_a?(Fluent::Plugin::LocalMetrics) }
assert_false i.has_methods_for_counter
assert_true i.has_methods_for_gauge
end

test 'calls lifecycle methods for all plugin instances via owner plugin' do
@d = d = Dummy.new
d.configure(config_element())
i1 = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics1", help_text: "metrics testing")
i2 = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics2", help_text: "metrics testing", prefer_gauge: true)
i3 = d.metrics_create(namespace: "fluentd_test", subsystem: "unit-test", name: "metrics3", help_text: "metrics testing")
d.configure(config_element())
assert do
d.instance_variable_get(:@plugin_type_or_id).include?("dummy.object")
end
d.start

assert i1.started?
Expand Down