Skip to content

Commit

Permalink
fix plugin base API to add "owned" plugins by other (input/output/fil…
Browse files Browse the repository at this point in the history
…ter) plugins

* Buffer/Storage/Parser/Formatter are candidates for this feature
* Input/Output/Filter can have these own default values for owned plugins
* RetryState plugin helper is added at same time to fix helpers API
  • Loading branch information
tagomoris committed Apr 18, 2016
1 parent 19caf7b commit f601453
Show file tree
Hide file tree
Showing 16 changed files with 874 additions and 80 deletions.
35 changes: 34 additions & 1 deletion lib/fluent/config/configure_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
module Fluent
module Config
class ConfigureProxy
attr_accessor :name, :final, :param_name, :init, :required, :multi, :alias, :argument, :params, :defaults, :descriptions, :sections
attr_accessor :name, :final, :param_name, :init, :required, :multi, :alias, :configured_in_section
attr_accessor :argument, :params, :defaults, :descriptions, :sections
# config_param :desc, :string, :default => '....'
# config_set_default :buffer_type, :memory
#
Expand Down Expand Up @@ -50,6 +51,11 @@ def initialize(name, opts = {})

raise "init and required are exclusive" if @init && @required

# specify section name for viewpoint of owner(parent) plugin
# for buffer plugins: all params are in <buffer> section of owner
# others: <storage>, <format> (formatter/parser), ...
@configured_in_section = nil

@argument = nil # nil: ignore argument
@params = {}
@defaults = {}
Expand Down Expand Up @@ -89,6 +95,9 @@ def merge(other) # self is base class, other is subclass
if overwrite?(other, :alias)
raise ConfigError, "BUG: subclass cannot overwrite base class's config_section: alias"
end
if overwrite?(other, :configured_in_section)
raise ConfigError, "BUG: subclass cannot overwrite base class's config_section: configured_in"
end

options = {}
# param_name is used not to ovewrite plugin's instance
Expand All @@ -103,6 +112,9 @@ def merge(other) # self is base class, other is subclass

merged = self.class.new(other.name, options)

# configured_in MUST be kept
merged.configured_in_section = self.configured_in_section

merged.argument = other.argument || self.argument
merged.params = other.params.merge(self.params)
merged.defaults = self.defaults.merge(other.defaults)
Expand Down Expand Up @@ -144,6 +156,9 @@ def merge_for_finalized(other)
if overwrite?(other, :alias)
raise ConfigError, "BUG: subclass cannot overwrite base class's config_section: alias"
end
if overwrite?(other, :configured_in_section)
raise ConfigError, "BUG: subclass cannot overwrite base class's config_section: configured_in"
end

options = {}
options[:param_name] = other.param_name
Expand All @@ -155,6 +170,8 @@ def merge_for_finalized(other)

merged = self.class.new(other.name, options)

merged.configured_in_section = self.configured_in_section

merged.argument = self.argument || other.argument
merged.params = other.params.merge(self.params)
merged.defaults = other.defaults.merge(self.defaults)
Expand All @@ -175,6 +192,15 @@ def merge_for_finalized(other)
merged
end

def overwrite_defaults(other) # other is owner plugin's corresponding proxy
self.defaults = self.defaults.merge(other.defaults)
self.sections.keys.each do |section_key|
if other.sections.has_key?(section_key)
self.sections[section_key].overwrite_defaults(other.sections[section_key])
end
end
end

def parameter_configuration(name, *args, &block)
name = name.to_sym

Expand Down Expand Up @@ -213,6 +239,13 @@ def parameter_configuration(name, *args, &block)
[name, block, opts]
end

def configured_in(section_name)
if @configured_in_section
raise ArgumentError, "#{self.name}: configured_in called twice"
end
@configured_in_section = section_name.to_sym
end

def config_argument(name, *args, &block)
if @argument
raise ArgumentError, "#{self.name}: config_argument called twice"
Expand Down
13 changes: 13 additions & 0 deletions lib/fluent/configurable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def self.included(mod)
end

def initialize
super
# to simulate implicit 'attr_accessor' by config_param / config_section and its value by config_set_default
proxy = self.class.merged_configure_proxy
proxy.params.keys.each do |name|
Expand All @@ -51,6 +52,14 @@ def configure(conf)
proxy = self.class.merged_configure_proxy
conf.corresponding_proxies << proxy

if self.respond_to?(:owner) && self.owner
owner_proxy = owner.class.merged_configure_proxy
if proxy.configured_in_section
owner_proxy = owner_proxy.sections[proxy.configured_in_section]
end
proxy.overwrite_defaults(owner_proxy) if owner_proxy
end

# In the nested section, can't get plugin class through proxies so get plugin class here
plugin_class = Fluent::Plugin.lookup_type_from_class(proxy.name.to_s)
root = Fluent::Config::SectionGenerator.generate(proxy, conf, logger, plugin_class)
Expand Down Expand Up @@ -97,6 +106,10 @@ def configure_proxy(mod_name)
map[mod_name]
end

def configured_in(section_name)
configure_proxy(self.name).configured_in(section_name)
end

def config_param(name, *args, &block)
configure_proxy(self.name).config_param(name, *args, &block)
attr_accessor name
Expand Down
39 changes: 23 additions & 16 deletions lib/fluent/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ module Plugin
# plugins for fluentd plugins: fluent/plugin/type/NAME.rb
# ex: storage, buffer chunk, ...

# first class plugins (instantiated by Engine)
INPUT_REGISTRY = Registry.new(:input, 'fluent/plugin/in_')
OUTPUT_REGISTRY = Registry.new(:output, 'fluent/plugin/out_')
FILTER_REGISTRY = Registry.new(:filter, 'fluent/plugin/filter_')

# feature plugin: second class plugins (instanciated by Plugins or Helpers)
BUFFER_REGISTRY = Registry.new(:buffer, 'fluent/plugin/buf_')
PARSER_REGISTRY = Registry.new(:parser, 'fluent/plugin/parser_')
FORMATTER_REGISTRY = Registry.new(:formatter, 'fluent/plugin/formatter_')
Expand Down Expand Up @@ -105,28 +108,28 @@ def self.new_filter(type)
new_impl('filter', FILTER_REGISTRY, type)
end

def self.new_buffer(type)
new_impl('buffer', BUFFER_REGISTRY, type)
def self.new_buffer(type, parent: nil)
new_impl('buffer', BUFFER_REGISTRY, type, parent)
end

def self.new_parser(type)
def self.new_parser(type, parent: nil)
require 'fluent/parser'

if type[0] == '/' && type[-1] == '/'
# This usage is not recommended for new API... create RegexpParser directly
require 'fluent/parser'
Fluent::TextParser.lookup(type)
else
new_impl('parser', PARSER_REGISTRY, type)
new_impl('parser', PARSER_REGISTRY, type, parent)
end
end

def self.new_formatter(type)
new_impl('formatter', FORMATTER_REGISTRY, type)
def self.new_formatter(type, parent: nil)
new_impl('formatter', FORMATTER_REGISTRY, type, parent)
end

def self.new_storage(type)
new_impl('storage', STORAGE_REGISTRY, type)
def self.new_storage(type, parent: nil)
new_impl('storage', STORAGE_REGISTRY, type, parent)
end

def self.register_impl(kind, registry, type, value)
Expand All @@ -138,17 +141,21 @@ def self.register_impl(kind, registry, type, value)
nil
end

def self.new_impl(kind, registry, type)
def self.new_impl(kind, registry, type, parent=nil)
# "'type' not found" is handled by registry
obj = registry.lookup(type)
case
when obj.is_a?(Class)
obj.new
when obj.respond_to?(:call) && obj.arity == 0
obj.call
else
raise Fluent::ConfigError, "#{kind} plugin '#{type}' is not a Class nor callable (without arguments)."
impl = case
when obj.is_a?(Class)
obj.new
when obj.respond_to?(:call) && obj.arity == 0
obj.call
else
raise Fluent::ConfigError, "#{kind} plugin '#{type}' is not a Class nor callable (without arguments)."
end
if parent && impl.respond_to?("owner=")
impl.owner = parent
end
impl
end
end
end
55 changes: 33 additions & 22 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,19 @@

require 'fluent/plugin'
require 'fluent/configurable'
require 'fluent/plugin_id'
require 'fluent/log'
require 'fluent/plugin_helper'
require 'fluent/system_config'

module Fluent
module Plugin
class Base
include Configurable
include PluginId
include SystemConfig::Mixin
include PluginLoggerMixin
include PluginHelper::Mixin

State = Struct.new(:configure, :start, :stop, :shutdown, :close, :terminate)
State = Struct.new(:configure, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate)

def initialize
super
@state = State.new(false, false, false, false, false, false)
@_state = State.new(false, false, false, false, false, false, false, false)
end

def has_router?
Expand All @@ -43,59 +37,76 @@ def has_router?

def configure(conf)
super
@state.configure = true
@_state ||= State.new(false, false, false, false, false, false, false, false)
@_state.configure = true
self
end

def start
@log.reset
@state.start = true
@_state.start = true
self
end

def stop
@state.stop = true
@_state.stop = true
self
end

def before_shutdown
@_state.before_shutdown = true
self
end

def shutdown
@state.shutdown = true
@_state.shutdown = true
self
end

def after_shutdown
@_state.after_shutdown = true
self
end

def close
@state.close = true
@_state.close = true
self
end

def terminate
@state.terminate = true
@log.reset
@_state.terminate = true
self
end

def configured?
@state.configure
@_state.configure
end

def started?
@state.start
@_state.start
end

def stopped?
@state.stop
@_state.stop
end

def before_shutdown?
@_state.before_shutdown
end

def shutdown?
@state.shutdown
@_state.shutdown
end

def after_shutdown?
@_state.after_shutdown
end

def closed?
@state.close
@_state.close
end

def terminated?
@state.terminate
@_state.terminate
end
end
end
Expand Down
38 changes: 38 additions & 0 deletions lib/fluent/plugin/owned_by_mixin.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module Fluent
module Plugin
module OwnedByMixin
def owner=(plugin)
@_owner = plugin

@_plugin_id = plugin.plugin_id
@_plugin_id_configured = plugin.plugin_id_configured?

@log = plugin.log
end

def owner
@_owner
end

def log
@log
end
end
end
end
Loading

0 comments on commit f601453

Please sign in to comment.