From f601453903e1fa64d3fca5d77eb894c55ac2efa7 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Mon, 18 Apr 2016 15:26:34 +0900 Subject: [PATCH] fix plugin base API to add "owned" plugins by other (input/output/filter) plugins * Buffer/Storage/Parser/Formatter are candidates for this feature * Input/Output/Filter can have these own default values for owned plugins * RetryState plugin helper is added at same time to fix helpers API --- lib/fluent/config/configure_proxy.rb | 35 ++- lib/fluent/configurable.rb | 13 + lib/fluent/plugin.rb | 39 ++- lib/fluent/plugin/base.rb | 55 ++-- lib/fluent/plugin/owned_by_mixin.rb | 38 +++ lib/fluent/plugin/storage.rb | 43 +-- lib/fluent/plugin_helper.rb | 2 + lib/fluent/plugin_helper/retry_state.rb | 172 ++++++++++ lib/fluent/plugin_helper/storage.rb | 3 +- test/config/test_configurable.rb | 41 +++ test/config/test_configure_proxy.rb | 51 +++ test/helper.rb | 25 ++ test/plugin/test_owned_by.rb | 35 +++ test/plugin/test_storage.rb | 2 +- test/plugin_helper/test_retry_state.rb | 398 ++++++++++++++++++++++++ test/plugin_helper/test_storage.rb | 2 +- 16 files changed, 874 insertions(+), 80 deletions(-) create mode 100644 lib/fluent/plugin/owned_by_mixin.rb create mode 100644 lib/fluent/plugin_helper/retry_state.rb create mode 100644 test/plugin/test_owned_by.rb create mode 100644 test/plugin_helper/test_retry_state.rb diff --git a/lib/fluent/config/configure_proxy.rb b/lib/fluent/config/configure_proxy.rb index 3e23c3e8c0..a54fae4b05 100644 --- a/lib/fluent/config/configure_proxy.rb +++ b/lib/fluent/config/configure_proxy.rb @@ -19,7 +19,8 @@ module Fluent module Config class ConfigureProxy - attr_accessor :name, :final, :param_name, :init, :required, :multi, :alias, :argument, :params, :defaults, :descriptions, :sections + attr_accessor :name, :final, :param_name, :init, :required, :multi, :alias, :configured_in_section + attr_accessor :argument, :params, :defaults, :descriptions, :sections # config_param :desc, :string, :default => '....' # config_set_default :buffer_type, :memory # @@ -50,6 +51,11 @@ def initialize(name, opts = {}) raise "init and required are exclusive" if @init && @required + # specify section name for viewpoint of owner(parent) plugin + # for buffer plugins: all params are in section of owner + # others: , (formatter/parser), ... + @configured_in_section = nil + @argument = nil # nil: ignore argument @params = {} @defaults = {} @@ -89,6 +95,9 @@ def merge(other) # self is base class, other is subclass if overwrite?(other, :alias) raise ConfigError, "BUG: subclass cannot overwrite base class's config_section: alias" end + if overwrite?(other, :configured_in_section) + raise ConfigError, "BUG: subclass cannot overwrite base class's config_section: configured_in" + end options = {} # param_name is used not to ovewrite plugin's instance @@ -103,6 +112,9 @@ def merge(other) # self is base class, other is subclass merged = self.class.new(other.name, options) + # configured_in MUST be kept + merged.configured_in_section = self.configured_in_section + merged.argument = other.argument || self.argument merged.params = other.params.merge(self.params) merged.defaults = self.defaults.merge(other.defaults) @@ -144,6 +156,9 @@ def merge_for_finalized(other) if overwrite?(other, :alias) raise ConfigError, "BUG: subclass cannot overwrite base class's config_section: alias" end + if overwrite?(other, :configured_in_section) + raise ConfigError, "BUG: subclass cannot overwrite base class's config_section: configured_in" + end options = {} options[:param_name] = other.param_name @@ -155,6 +170,8 @@ def merge_for_finalized(other) merged = self.class.new(other.name, options) + merged.configured_in_section = self.configured_in_section + merged.argument = self.argument || other.argument merged.params = other.params.merge(self.params) merged.defaults = other.defaults.merge(self.defaults) @@ -175,6 +192,15 @@ def merge_for_finalized(other) merged end + def overwrite_defaults(other) # other is owner plugin's corresponding proxy + self.defaults = self.defaults.merge(other.defaults) + self.sections.keys.each do |section_key| + if other.sections.has_key?(section_key) + self.sections[section_key].overwrite_defaults(other.sections[section_key]) + end + end + end + def parameter_configuration(name, *args, &block) name = name.to_sym @@ -213,6 +239,13 @@ def parameter_configuration(name, *args, &block) [name, block, opts] end + def configured_in(section_name) + if @configured_in_section + raise ArgumentError, "#{self.name}: configured_in called twice" + end + @configured_in_section = section_name.to_sym + end + def config_argument(name, *args, &block) if @argument raise ArgumentError, "#{self.name}: config_argument called twice" diff --git a/lib/fluent/configurable.rb b/lib/fluent/configurable.rb index 9d7322b0ec..f762a795b6 100644 --- a/lib/fluent/configurable.rb +++ b/lib/fluent/configurable.rb @@ -27,6 +27,7 @@ def self.included(mod) end def initialize + super # to simulate implicit 'attr_accessor' by config_param / config_section and its value by config_set_default proxy = self.class.merged_configure_proxy proxy.params.keys.each do |name| @@ -51,6 +52,14 @@ def configure(conf) proxy = self.class.merged_configure_proxy conf.corresponding_proxies << proxy + if self.respond_to?(:owner) && self.owner + owner_proxy = owner.class.merged_configure_proxy + if proxy.configured_in_section + owner_proxy = owner_proxy.sections[proxy.configured_in_section] + end + proxy.overwrite_defaults(owner_proxy) if owner_proxy + end + # In the nested section, can't get plugin class through proxies so get plugin class here plugin_class = Fluent::Plugin.lookup_type_from_class(proxy.name.to_s) root = Fluent::Config::SectionGenerator.generate(proxy, conf, logger, plugin_class) @@ -97,6 +106,10 @@ def configure_proxy(mod_name) map[mod_name] end + def configured_in(section_name) + configure_proxy(self.name).configured_in(section_name) + end + def config_param(name, *args, &block) configure_proxy(self.name).config_param(name, *args, &block) attr_accessor name diff --git a/lib/fluent/plugin.rb b/lib/fluent/plugin.rb index e474174051..e970580446 100644 --- a/lib/fluent/plugin.rb +++ b/lib/fluent/plugin.rb @@ -25,9 +25,12 @@ module Plugin # plugins for fluentd plugins: fluent/plugin/type/NAME.rb # ex: storage, buffer chunk, ... + # first class plugins (instantiated by Engine) INPUT_REGISTRY = Registry.new(:input, 'fluent/plugin/in_') OUTPUT_REGISTRY = Registry.new(:output, 'fluent/plugin/out_') FILTER_REGISTRY = Registry.new(:filter, 'fluent/plugin/filter_') + + # feature plugin: second class plugins (instanciated by Plugins or Helpers) BUFFER_REGISTRY = Registry.new(:buffer, 'fluent/plugin/buf_') PARSER_REGISTRY = Registry.new(:parser, 'fluent/plugin/parser_') FORMATTER_REGISTRY = Registry.new(:formatter, 'fluent/plugin/formatter_') @@ -105,11 +108,11 @@ def self.new_filter(type) new_impl('filter', FILTER_REGISTRY, type) end - def self.new_buffer(type) - new_impl('buffer', BUFFER_REGISTRY, type) + def self.new_buffer(type, parent: nil) + new_impl('buffer', BUFFER_REGISTRY, type, parent) end - def self.new_parser(type) + def self.new_parser(type, parent: nil) require 'fluent/parser' if type[0] == '/' && type[-1] == '/' @@ -117,16 +120,16 @@ def self.new_parser(type) require 'fluent/parser' Fluent::TextParser.lookup(type) else - new_impl('parser', PARSER_REGISTRY, type) + new_impl('parser', PARSER_REGISTRY, type, parent) end end - def self.new_formatter(type) - new_impl('formatter', FORMATTER_REGISTRY, type) + def self.new_formatter(type, parent: nil) + new_impl('formatter', FORMATTER_REGISTRY, type, parent) end - def self.new_storage(type) - new_impl('storage', STORAGE_REGISTRY, type) + def self.new_storage(type, parent: nil) + new_impl('storage', STORAGE_REGISTRY, type, parent) end def self.register_impl(kind, registry, type, value) @@ -138,17 +141,21 @@ def self.register_impl(kind, registry, type, value) nil end - def self.new_impl(kind, registry, type) + def self.new_impl(kind, registry, type, parent=nil) # "'type' not found" is handled by registry obj = registry.lookup(type) - case - when obj.is_a?(Class) - obj.new - when obj.respond_to?(:call) && obj.arity == 0 - obj.call - else - raise Fluent::ConfigError, "#{kind} plugin '#{type}' is not a Class nor callable (without arguments)." + impl = case + when obj.is_a?(Class) + obj.new + when obj.respond_to?(:call) && obj.arity == 0 + obj.call + else + raise Fluent::ConfigError, "#{kind} plugin '#{type}' is not a Class nor callable (without arguments)." + end + if parent && impl.respond_to?("owner=") + impl.owner = parent end + impl end end end diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index e58770c7f8..1f715dc2cc 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -16,25 +16,19 @@ require 'fluent/plugin' require 'fluent/configurable' -require 'fluent/plugin_id' -require 'fluent/log' -require 'fluent/plugin_helper' require 'fluent/system_config' module Fluent module Plugin class Base include Configurable - include PluginId include SystemConfig::Mixin - include PluginLoggerMixin - include PluginHelper::Mixin - State = Struct.new(:configure, :start, :stop, :shutdown, :close, :terminate) + State = Struct.new(:configure, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate) def initialize super - @state = State.new(false, false, false, false, false, false) + @_state = State.new(false, false, false, false, false, false, false, false) end def has_router? @@ -43,59 +37,76 @@ def has_router? def configure(conf) super - @state.configure = true + @_state ||= State.new(false, false, false, false, false, false, false, false) + @_state.configure = true self end def start - @log.reset - @state.start = true + @_state.start = true self end def stop - @state.stop = true + @_state.stop = true + self + end + + def before_shutdown + @_state.before_shutdown = true self end def shutdown - @state.shutdown = true + @_state.shutdown = true + self + end + + def after_shutdown + @_state.after_shutdown = true self end def close - @state.close = true + @_state.close = true self end def terminate - @state.terminate = true - @log.reset + @_state.terminate = true self end def configured? - @state.configure + @_state.configure end def started? - @state.start + @_state.start end def stopped? - @state.stop + @_state.stop + end + + def before_shutdown? + @_state.before_shutdown end def shutdown? - @state.shutdown + @_state.shutdown + end + + def after_shutdown? + @_state.after_shutdown end def closed? - @state.close + @_state.close end def terminated? - @state.terminate + @_state.terminate end end end diff --git a/lib/fluent/plugin/owned_by_mixin.rb b/lib/fluent/plugin/owned_by_mixin.rb new file mode 100644 index 0000000000..aac9581f12 --- /dev/null +++ b/lib/fluent/plugin/owned_by_mixin.rb @@ -0,0 +1,38 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Fluent + module Plugin + module OwnedByMixin + def owner=(plugin) + @_owner = plugin + + @_plugin_id = plugin.plugin_id + @_plugin_id_configured = plugin.plugin_id_configured? + + @log = plugin.log + end + + def owner + @_owner + end + + def log + @log + end + end + end +end diff --git a/lib/fluent/plugin/storage.rb b/lib/fluent/plugin/storage.rb index cead81ec58..962755593b 100644 --- a/lib/fluent/plugin/storage.rb +++ b/lib/fluent/plugin/storage.rb @@ -14,17 +14,18 @@ # limitations under the License. # -require 'fluent/plugin' -require 'fluent/configurable' +require 'fluent/plugin/base' +require 'fluent/plugin/owned_by_mixin' module Fluent module Plugin - class Storage - include Fluent::Configurable - include Fluent::SystemConfig::Mixin + class Storage < Base + include OwnedByMixin DEFAULT_TYPE = 'local' + configured_in :storage + config_param :persistent, :bool, default: false # load/save with all operations config_param :autosave, :bool, default: true config_param :autosave_interval, :time, default: 10 @@ -37,30 +38,6 @@ def self.validate_key(key) attr_accessor :log - def configure(conf) - super(conf) - - @_owner = nil - end - - def plugin_id(id, configured) - @_plugin_id = id - @_plugin_id_configured = configured - end - - def owner=(plugin) - @_owner = plugin - - @_plugin_id = plugin.plugin_id - @_plugin_id_configured = plugin.plugin_id_configured? - - @log = plugin.log - end - - def owner - @_owner - end - def persistent_always? false end @@ -102,14 +79,6 @@ def delete(key) def update(key, &block) # transactional get-and-update raise NotImplementedError, "Implement this method in child class" end - - # storage plugins has only 'close' and 'terminate' - # stop: used in helper to stop autosave - # shutdown: used in helper to call #save finally if needed - def close; end - def terminate - @_owner = nil - end end end end diff --git a/lib/fluent/plugin_helper.rb b/lib/fluent/plugin_helper.rb index 49b7e896e4..6913e91dc9 100644 --- a/lib/fluent/plugin_helper.rb +++ b/lib/fluent/plugin_helper.rb @@ -19,6 +19,8 @@ require 'fluent/plugin_helper/event_loop' require 'fluent/plugin_helper/timer' require 'fluent/plugin_helper/child_process' +require 'fluent/plugin_helper/storage' +require 'fluent/plugin_helper/retry_state' module Fluent module PluginHelper diff --git a/lib/fluent/plugin_helper/retry_state.rb b/lib/fluent/plugin_helper/retry_state.rb new file mode 100644 index 0000000000..e5c090e60b --- /dev/null +++ b/lib/fluent/plugin_helper/retry_state.rb @@ -0,0 +1,172 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Fluent + module PluginHelper + module RetryState + def retry_state_create( + title, retry_type, wait, timeout, + forever: false, max_steps: nil, backoff_base: 2, max_interval: nil, randomize: true, randomize_width: 0.125, + secondary: false, secondary_threshold: 0.8 + ) + case retry_type + when :expbackoff + ExponentialBackOffRetry.new(title, wait, timeout, forever, max_steps, randomize, randomize_width, backoff_base, max_interval, secondary, secondary_threshold) + when :periodic + PeriodicRetry.new(title, wait, timeout, forever, max_steps, randomize, randomize_width, secondary, secondary_threshold) + else + raise "BUG: unknown retry_type specified: '#{retry_type}'" + end + end + + class RetryStateMachine + attr_reader :title, :start, :steps, :next_time, :timeout_at, :current, :secondary_transition_at, :secondary_transition_steps + + def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_width, secondary, secondary_threshold) + @title = title + + @start = current_time + @steps = 0 + @next_time = nil # should be initialized for first retry by child class + + @timeout = timeout + @timeout_at = @start + timeout + @current = :primary + + if randomize_width < 0 || randomize_width > 0.5 + raise "BUG: randomize_width MUST be between 0 and 0.5" + end + + @randomize = randomize + @randomize_width = randomize_width + + if forever && secondary + raise "BUG: forever and secondary are exclusive to each other" + end + + @forever = forever + @max_steps = max_steps + + @secondary = secondary + @secondary_threshold = secondary_threshold + if @secondary + raise "BUG: secondary_transition_threshold MUST be between 0 and 1" if @secondary_threshold <= 0 || @secondary_threshold >= 1 + @secondary_transition_at = @start + timeout * @secondary_threshold + @secondary_transition_steps = nil + end + end + + def current_time + Time.now + end + + def randomize(interval) + return interval unless @randomize + + interval + (interval * @randomize_width * (2 * rand - 1.0)) + end + + def calc_next_time + if @forever || !@secondary # primary + naive = naive_next_time(@steps) + if @forever + naive + elsif naive >= @timeout_at + @timeout_at + else + naive + end + elsif @current == :primary && @secondary + naive = naive_next_time(@steps) + if naive >= @secondary_transition_at + @secondary_transition_at + else + naive + end + elsif @current == :secondary + naive = naive_next_time(@steps - @secondary_transition_steps + 1) + if naive >= @timeout_at + @timeout_at + else + naive + end + else + raise "BUG: it's out of design" + end + end + + def naive_next_time(retry_times) + raise NotImplementedError + end + + def secondary? + @secondary && (@current == :secondary || current_time >= @secondary_transition_at) + end + + def step + @steps += 1 + if @secondary && @current != :secondary && current_time >= @secondary_transition_at + @current = :secondary + @secondary_transition_steps = @steps + end + @next_time = calc_next_time + nil + end + + def limit? + if @forever + false + else + @next_time >= @timeout_at || !!(@max_steps && @steps >= @max_steps) + end + end + end + + class ExponentialBackOffRetry < RetryStateMachine + def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_width, backoff_base, max_interval, secondary, secondary_threathold) + super(title, wait, timeout, forever, max_steps, randomize, randomize_width, secondary, secondary_threathold) + @constant_factor = wait + @backoff_base = backoff_base + @max_interval = max_interval + + @next_time = @start + @constant_factor + end + + def naive_next_time(retry_next_times) + interval = @constant_factor * ( @backoff_base ** ( retry_next_times - 1 ) ) + intr = if @max_interval && interval > @max_interval + @max_interval + else + interval + end + current_time + randomize(intr) + end + end + + class PeriodicRetry < RetryStateMachine + def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_width, secondary, secondary_threathold) + super(title, wait, timeout, forever, max_steps, randomize, randomize_width, secondary, secondary_threathold) + @retry_wait = wait + @next_time = @start + @retry_wait + end + + def naive_next_time(retry_next_times) + current_time + randomize(@retry_wait) + end + end + end + end +end diff --git a/lib/fluent/plugin_helper/storage.rb b/lib/fluent/plugin_helper/storage.rb index c7f5245e04..cc89b83916 100644 --- a/lib/fluent/plugin_helper/storage.rb +++ b/lib/fluent/plugin_helper/storage.rb @@ -39,8 +39,7 @@ def storage_create(usage: '', type: nil, conf: nil) unless type raise ArgumentError, "BUG: type not specified without configuration" end - storage = Plugin.new_storage(type) - storage.owner = self + storage = Plugin.new_storage(type, parent: self) config = case conf when Fluent::Config::Element conf diff --git a/test/config/test_configurable.rb b/test/config/test_configurable.rb index d1d7fe3411..3f26824bd0 100644 --- a/test/config/test_configurable.rb +++ b/test/config/test_configurable.rb @@ -286,6 +286,29 @@ class OverwriteAlias < FinalizedBase end end end + + module OverwriteDefaults + class Owner + include Fluent::Configurable + config_set_default :key1, "V1" + config_section :buffer do + config_set_default :size_of_something, 1024 + end + end + + class FlatChild + include Fluent::Configurable + attr_accessor :owner + config_param :key1, :string, default: "v1" + end + + class BufferChild + include Fluent::Configurable + attr_accessor :owner + configured_in :buffer + config_param :size_of_something, :size, default: 128 + end + end end module Fluent::Config @@ -930,6 +953,24 @@ class TestConfigurable < ::Test::Unit::TestCase end end + sub_test_case 'defaults can be overwritten by owner' do + test 'for feature plugin which has flat parameters with parent' do + owner = ConfigurableSpec::OverwriteDefaults::Owner.new + child = ConfigurableSpec::OverwriteDefaults::FlatChild.new + child.owner = owner + child.configure(config_element('ROOT', '', {}, [])) + assert_equal "V1", child.key1 + end + + test 'for feature plugin which has parameters in subsection of parent' do + owner = ConfigurableSpec::OverwriteDefaults::Owner.new + child = ConfigurableSpec::OverwriteDefaults::BufferChild.new + child.owner = owner + child.configure(config_element('ROOT', '', {}, [])) + assert_equal 1024, child.size_of_something + end + end + sub_test_case ':secret option' do setup do @conf = config_element('ROOT', '', diff --git a/test/config/test_configure_proxy.rb b/test/config/test_configure_proxy.rb index 8c6c4c5654..07aac42632 100644 --- a/test/config/test_configure_proxy.rb +++ b/test/config/test_configure_proxy.rb @@ -53,6 +53,7 @@ class TestConfigureProxy < ::Test::Unit::TestCase assert_false(proxy.required?) assert_nil(proxy.multi) assert_true(proxy.multi?) + assert_nil(proxy.configured_in_section) p2 = Fluent::Config::ConfigureProxy.new(:section, param_name: :sections, init: false, required: true, multi: false) proxy = p1.merge(p2) @@ -64,10 +65,12 @@ class TestConfigureProxy < ::Test::Unit::TestCase assert_true(proxy.required?) assert_false(proxy.multi) assert_false(proxy.multi?) + assert_nil(proxy.configured_in_section) end test 'does not overwrite with argument object without any specifications of required/multi' do p1 = Fluent::Config::ConfigureProxy.new(:section1) + p1.configured_in_section = :subsection p2 = Fluent::Config::ConfigureProxy.new(:section2, param_name: :sections, init: false, required: true, multi: false) p3 = Fluent::Config::ConfigureProxy.new(:section3) proxy = p1.merge(p2).merge(p3) @@ -79,6 +82,54 @@ class TestConfigureProxy < ::Test::Unit::TestCase assert_true(proxy.required?) assert_false(proxy.multi) assert_false(proxy.multi?) + assert_equal :subsection, proxy.configured_in_section + end + end + + sub_test_case '#overwrite_defaults' do + test 'overwrites only defaults with others defaults' do + p1 = Fluent::Config::ConfigureProxy.new(:mychild) + p1.configured_in_section = :child + p1.config_param(:k1a, :string) + p1.config_param(:k1b, :string) + p1.config_param(:k2a, :integer, default: 0) + p1.config_param(:k2b, :integer, default: 0) + p1.config_section(:sub1) do + config_param :k3, :time, default: 30 + end + + p0 = Fluent::Config::ConfigureProxy.new(:myparent) + p0.config_section(:child) do + config_set_default :k1a, "v1a" + config_param :k1b, :string, default: "v1b" + config_set_default :k2a, 21 + config_param :k2b, :integer, default: 22 + config_section :sub1 do + config_set_default :k3, 60 + end + end + + p1.overwrite_defaults(p0.sections[:child]) + + assert_equal "v1a", p1.defaults[:k1a] + assert_equal "v1b", p1.defaults[:k1b] + assert_equal 21, p1.defaults[:k2a] + assert_equal 22, p1.defaults[:k2b] + assert_equal 60, p1.sections[:sub1].defaults[:k3] + end + end + + sub_test_case '#configured_in' do + test 'sets a section name which have configuration parameters of target plugin in owners configuration' do + proxy = Fluent::Config::ConfigureProxy.new(:section) + proxy.configured_in(:mysection) + assert_equal :mysection, proxy.configured_in_section + end + + test 'do not permit to be called twice' do + proxy = Fluent::Config::ConfigureProxy.new(:section) + proxy.configured_in(:mysection) + assert_raise(ArgumentError) { proxy.configured_in(:myothersection) } end end diff --git a/test/helper.rb b/test/helper.rb index d1fb1fd583..47b0f32aca 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -42,6 +42,23 @@ def to_masked_element require 'fluent/config/element' require 'fluent/log' require 'fluent/test' +require 'fluent/plugin/base' +require 'fluent/log' +require 'fluent/plugin_id' +require 'fluent/plugin_helper' +require 'fluent/time' + +module Fluent + module Plugin + class TestBase < Base + # a base plugin class, but not input nor output + # mainly for helpers and owned plugins + include PluginId + include PluginLoggerMixin + include PluginHelper::Mixin + end + end +end unless defined?(Test::Unit::AssertionFailedError) class Test::Unit::AssertionFailedError < StandardError @@ -52,6 +69,14 @@ def config_element(name = 'test', argument = '', params = {}, elements = []) Fluent::Config::Element.new(name, argument, params, elements) end +def event_time(str=nil) + if str + Fluent::EventTime.parse(str) + else + Fluent::EventTime.now + end +end + def unused_port(num = 1) ports = [] sockets = [] diff --git a/test/plugin/test_owned_by.rb b/test/plugin/test_owned_by.rb new file mode 100644 index 0000000000..50bdd323d2 --- /dev/null +++ b/test/plugin/test_owned_by.rb @@ -0,0 +1,35 @@ +require_relative '../helper' +require 'fluent/plugin/base' +require 'fluent/plugin/input' +require 'fluent/plugin/owned_by_mixin' + +module OwnedByMixinTestEnv + class DummyParent < Fluent::Plugin::Input + Fluent::Plugin.register_input('dummy_parent', self) + end + class DummyChild < Fluent::Plugin::Base + include Fluent::Plugin::OwnedByMixin + Fluent::Plugin.register_parser('dummy_child', self) + end +end + +class OwnedByMixinTest < Test::Unit::TestCase + sub_test_case 'Owned plugins' do + setup do + Fluent::Test.setup + end + + test 'inherits plugin id and logger from parent' do + parent = Fluent::Plugin.new_input('dummy_parent') + parent.configure(config_element('ROOT', '', {'@id' => 'my_parent_id', '@log_level' => 'trace'})) + child = Fluent::Plugin.new_parser('dummy_child', parent: parent) + + assert_equal parent.object_id, child.owner.object_id + + assert child.instance_eval{ @_plugin_id_configured } + assert_equal 'my_parent_id', child.instance_eval{ @_plugin_id } + + assert_equal Fluent::Log::LEVEL_TRACE, child.log.level + end + end +end diff --git a/test/plugin/test_storage.rb b/test/plugin/test_storage.rb index 901cbae9be..430ed21032 100644 --- a/test/plugin/test_storage.rb +++ b/test/plugin/test_storage.rb @@ -2,7 +2,7 @@ require 'fluent/plugin/storage' require 'fluent/plugin/base' -class DummyPlugin < Fluent::Plugin::Base +class DummyPlugin < Fluent::Plugin::TestBase end class BareStorage < Fluent::Plugin::Storage diff --git a/test/plugin_helper/test_retry_state.rb b/test/plugin_helper/test_retry_state.rb new file mode 100644 index 0000000000..5108042f32 --- /dev/null +++ b/test/plugin_helper/test_retry_state.rb @@ -0,0 +1,398 @@ +require_relative '../helper' +require 'fluent/plugin_helper/retry_state' +require 'fluent/plugin/base' + +require 'time' + +class Fluent::PluginHelper::RetryState::RetryStateMachine + def override_current_time(time) + (class << self; self; end).module_eval do + define_method(:current_time){ time } + end + end +end + +class RetryStateHelperTest < Test::Unit::TestCase + class Dummy < Fluent::Plugin::TestBase + helpers :retry_state + end + + setup do + @d = Dummy.new + end + + test 'randomize can generate value within specified +/- range' do + s = @d.retry_state_create(:t1, :expbackoff, 0.1, 30) # default enabled w/ 0.125 + 500.times do + r = s.randomize(1000) + assert{ r >= 875 && r < 1125 } + end + + s = @d.retry_state_create(:t1, :expbackoff, 0.1, 30, randomize_width: 0.25) + 500.times do + r = s.randomize(1000) + assert{ r >= 750 && r < 1250 } + end + end + + test 'plugin can create retry_state machine' do + s = @d.retry_state_create(:t1, :expbackoff, 0.1, 30) + # attr_reader :title, :start, :steps, :next_time, :timeout_at, :current, :secondary_transition_at, :secondary_transition_times + + assert_equal :t1, s.title + start_time = s.start + + assert_equal 0, s.steps + assert_equal (start_time + 0.1).to_i, s.next_time.to_i + assert_equal (start_time + 0.1).nsec, s.next_time.nsec + assert_equal (start_time + 30), s.timeout_at + + assert_equal :primary, s.current + assert{ s.is_a? Fluent::PluginHelper::RetryState::ExponentialBackOffRetry } + end + + test 'periodic retries' do + s = @d.retry_state_create(:t2, :periodic, 3, 29, randomize: false) + dummy_current_time = s.start + s.override_current_time(dummy_current_time) + + assert_equal dummy_current_time, s.current_time + assert_equal (dummy_current_time + 29), s.timeout_at + assert_equal (dummy_current_time + 3), s.next_time + + i = 1 + while i < 9 + s.override_current_time(s.next_time) + s.step + assert_equal i, s.steps + assert_equal (s.current_time + 3), s.next_time + assert !s.limit? + i += 1 + end + + assert_equal 9, i + s.override_current_time(s.next_time) + s.step + assert_equal s.timeout_at, s.next_time + assert s.limit? + end + + test 'periodic retries with max_steps' do + s = @d.retry_state_create(:t2, :periodic, 3, 29, randomize: false, max_steps: 5) + dummy_current_time = s.start + s.override_current_time(dummy_current_time) + + assert_equal dummy_current_time, s.current_time + assert_equal (dummy_current_time + 29), s.timeout_at + assert_equal (dummy_current_time + 3), s.next_time + + i = 1 + while i < 5 + s.override_current_time(s.next_time) + s.step + assert_equal i, s.steps + assert_equal (s.current_time + 3), s.next_time + assert !s.limit? + i += 1 + end + + assert_equal 5, i + s.override_current_time(s.next_time) + s.step + assert_equal (s.current_time + 3), s.next_time + assert s.limit? + end + + test 'periodic retries with secondary' do + s = @d.retry_state_create(:t3, :periodic, 3, 100, randomize: false, secondary: true) # threshold 0.8 + dummy_current_time = s.start + s.override_current_time(dummy_current_time) + + assert_equal dummy_current_time, s.current_time + assert_equal (dummy_current_time + 100), s.timeout_at + assert_equal (dummy_current_time + 100 * 0.8), s.secondary_transition_at + + assert_equal (dummy_current_time + 3), s.next_time + assert !s.secondary? + + i = 1 + while i < 26 + s.override_current_time(s.next_time) + assert !s.secondary? + + s.step + assert_equal i, s.steps + assert_equal (s.current_time + 3), s.next_time + assert !s.limit? + i += 1 + end + + assert_equal 26, i + s.override_current_time(s.next_time) # 78 + assert !s.secondary? + + s.step + assert_equal 26, s.steps + assert_equal s.secondary_transition_at, s.next_time + assert !s.limit? + + i += 1 + assert_equal 27, i + s.override_current_time(s.next_time) # 80 + assert s.secondary? + + s.step + assert_equal (s.current_time + 3), s.next_time + assert_equal s.steps, s.secondary_transition_steps + assert !s.limit? + + i += 1 + + while i < 33 + s.override_current_time(s.next_time) + assert s.secondary? + + s.step + assert_equal (s.current_time + 3), s.next_time + assert !s.limit? + i += 1 + end + + assert_equal 33, i + s.override_current_time(s.next_time) # 98 + assert s.secondary? + + s.step + assert_equal s.timeout_at, s.next_time + assert s.limit? + end + + test 'periodic retries with secondary and specified threshold' do + s = @d.retry_state_create(:t3, :periodic, 3, 100, randomize: false, secondary: true, secondary_threshold: 0.75) + dummy_current_time = s.start + s.override_current_time(dummy_current_time) + + assert_equal dummy_current_time, s.current_time + assert_equal (dummy_current_time + 100), s.timeout_at + assert_equal (dummy_current_time + 100 * 0.75), s.secondary_transition_at + end + + test 'exponential backoff forever without randomization' do + s = @d.retry_state_create(:t11, :expbackoff, 0.1, 300, randomize: false, forever: true, backoff_base: 2) + dummy_current_time = s.start + s.override_current_time(dummy_current_time) + + assert_equal dummy_current_time, s.current_time + + assert_equal 0, s.steps + assert_equal (dummy_current_time + 0.1), s.next_time + + i = 1 + while i < 300 + s.step + assert_equal i, s.steps + assert_equal (dummy_current_time + 0.1 * (2 ** (i - 1))), s.next_time + assert !s.limit? + i += 1 + end + end + + test 'exponential backoff with max_interval' do + s = @d.retry_state_create(:t12, :expbackoff, 0.1, 300, randomize: false, forever: true, backoff_base: 2, max_interval: 100) + dummy_current_time = s.start + s.override_current_time(dummy_current_time) + + assert_equal dummy_current_time, s.current_time + + assert_equal 0, s.steps + assert_equal (dummy_current_time + 0.1), s.next_time + + # 0.1 * (2 ** (10 - 1)) == 0.1 * 2 ** 9 == 51.2 + # 0.1 * (2 ** (11 - 1)) == 0.1 * 2 ** 10 == 102.4 + i = 1 + while i < 11 + s.step + assert_equal i, s.steps + assert_equal (dummy_current_time + 0.1 * (2 ** (i - 1))), s.next_time, "start:#{dummy_current_time}, i:#{i}" + i += 1 + end + + s.step + assert_equal 11, s.steps + assert_equal (dummy_current_time + 100), s.next_time + + s.step + assert_equal 12, s.steps + assert_equal (dummy_current_time + 100), s.next_time + end + + test 'exponential backoff with shorter timeout' do + s = @d.retry_state_create(:t13, :expbackoff, 1, 12, randomize: false, backoff_base: 2, max_interval: 10) + dummy_current_time = s.start + s.override_current_time(dummy_current_time) + + assert_equal dummy_current_time, s.current_time + + assert_equal (dummy_current_time + 12), s.timeout_at + + assert_equal 0, s.steps + assert_equal (dummy_current_time + 1), s.next_time + + # 1 + 1 + 2 + 4 (=8) + + s.override_current_time(s.next_time) + s.step + assert_equal 1, s.steps + assert_equal (s.current_time + 1), s.next_time + + s.override_current_time(s.next_time) + s.step + assert_equal 2, s.steps + assert_equal (s.current_time + 2), s.next_time + + s.override_current_time(s.next_time) + s.step + assert_equal 3, s.steps + assert_equal (s.current_time + 4), s.next_time + + assert !s.limit? + + # + 8 (=16) > 12 + + s.override_current_time(s.next_time) + s.step + assert_equal 4, s.steps + assert_equal s.timeout_at, s.next_time + + assert s.limit? + end + + test 'exponential backoff with max_steps' do + s = @d.retry_state_create(:t14, :expbackoff, 1, 120, randomize: false, backoff_base: 2, max_interval: 10, max_steps: 6) + dummy_current_time = s.start + s.override_current_time(dummy_current_time) + + assert_equal dummy_current_time, s.current_time + + assert_equal (dummy_current_time + 120), s.timeout_at + + assert_equal 0, s.steps + assert_equal (dummy_current_time + 1), s.next_time + + s.override_current_time(s.next_time) + s.step + assert_equal 1, s.steps + assert_equal (s.current_time + 1), s.next_time + + s.override_current_time(s.next_time) + s.step + assert_equal 2, s.steps + assert_equal (s.current_time + 2), s.next_time + + s.override_current_time(s.next_time) + s.step + assert_equal 3, s.steps + assert_equal (s.current_time + 4), s.next_time + + assert !s.limit? + + s.override_current_time(s.next_time) + s.step + assert_equal 4, s.steps + assert_equal (s.current_time + 8), s.next_time + + assert !s.limit? + + s.override_current_time(s.next_time) + s.step + assert_equal 5, s.steps + assert_equal (s.current_time + 10), s.next_time + + assert !s.limit? + + s.override_current_time(s.next_time) + s.step + assert_equal 6, s.steps + assert_equal (s.current_time + 10), s.next_time + + assert s.limit? + end + + test 'exponential backoff retries with secondary' do + s = @d.retry_state_create(:t15, :expbackoff, 1, 100, randomize: false, backoff_base: 2, secondary: true) # threshold 0.8 + dummy_current_time = s.start + s.override_current_time(dummy_current_time) + + assert_equal dummy_current_time, s.current_time + assert_equal (dummy_current_time + 100), s.timeout_at + assert_equal (dummy_current_time + 100 * 0.8), s.secondary_transition_at + + assert_equal (dummy_current_time + 1), s.next_time + assert !s.secondary? + + # 1, 1(2), 2(4), 4(8), 8(16), 16(32), 32(64), (80), (81), (83), (87), (95), (100) + i = 1 + while i < 7 + s.override_current_time(s.next_time) + assert !s.secondary? + + s.step + assert_equal i, s.steps + assert_equal (s.current_time + 1 * (2 ** (i - 1))), s.next_time + assert !s.limit? + i += 1 + end + + assert_equal 7, i + s.override_current_time(s.next_time) # 64 + assert !s.secondary? + + s.step + assert_equal 7, s.steps + assert_equal s.secondary_transition_at, s.next_time + assert !s.limit? + + i += 1 + assert_equal 8, i + s.override_current_time(s.next_time) # 80 + assert s.secondary? + + s.step + assert_equal 8, s.steps + assert_equal s.steps, s.secondary_transition_steps + assert_equal (s.secondary_transition_at + 1.0), s.next_time + assert !s.limit? + + # 81, 82, 84, 88, 96, 100 + j = 1 + while j < 4 + s.override_current_time(s.next_time) + assert s.secondary? + assert_equal :secondary, s.current + + s.step + assert_equal (8 + j), s.steps + assert_equal (s.current_time + (1 * (2 ** j))), s.next_time + assert !s.limit?, "j:#{j}" + j += 1 + end + + assert_equal 4, j + s.override_current_time(s.next_time) # 96 + assert s.secondary? + + s.step + assert_equal s.timeout_at, s.next_time + assert s.limit? + end + + test 'exponential backoff retries with secondary and specified threshold' do + s = @d.retry_state_create(:t16, :expbackoff, 1, 100, randomize: false, secondary: true, backoff_base: 2, secondary_threshold: 0.75) + dummy_current_time = s.start + s.override_current_time(dummy_current_time) + + assert_equal dummy_current_time, s.current_time + assert_equal (dummy_current_time + 100), s.timeout_at + assert_equal (dummy_current_time + 100 * 0.75), s.secondary_transition_at + end +end diff --git a/test/plugin_helper/test_storage.rb b/test/plugin_helper/test_storage.rb index b6822f8b46..9f9d3a6347 100644 --- a/test/plugin_helper/test_storage.rb +++ b/test/plugin_helper/test_storage.rb @@ -69,7 +69,7 @@ def synchronized? end class StorageHelperTest < Test::Unit::TestCase - class Dummy < Fluent::Plugin::Base + class Dummy < Fluent::Plugin::TestBase helpers :storage end