Skip to content

Commit

Permalink
move all plugin registries to fluent/plugin, and fix some APIs, requi…
Browse files Browse the repository at this point in the history
…re timing
  • Loading branch information
tagomoris committed Feb 4, 2016
1 parent e33dd5c commit 9c58f42
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 73 deletions.
7 changes: 5 additions & 2 deletions lib/fluent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

# TODO consolidate w/ Label ?

module Fluent
require 'fluent/configurable'
require 'fluent/engine'
Expand Down Expand Up @@ -78,7 +81,7 @@ def shutdown
@started_filters.map { |f|
Thread.new do
begin
log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id
log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_type_from_class(f.class), plugin_id: f.plugin_id
f.shutdown
rescue => e
log.warn "unexpected error while shutting down filter plugins", :plugin => f.class, :plugin_id => f.plugin_id, :error_class => e.class, :error => e
Expand All @@ -92,7 +95,7 @@ def shutdown
@started_outputs.map { |o|
Thread.new do
begin
log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id
log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_type_from_class(o.class), plugin_id: o.plugin_id
o.shutdown
rescue => e
log.warn "unexpected error while shutting down output plugins", :plugin => o.class, :plugin_id => o.plugin_id, :error_class => e.class, :error => e
Expand Down
21 changes: 13 additions & 8 deletions lib/fluent/configurable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
# limitations under the License.
#

module Fluent
require 'fluent/config/configure_proxy'
require 'fluent/config/section'
require 'fluent/config/error'
require 'fluent/registry'
require 'fluent/config/configure_proxy'
require 'fluent/config/section'
require 'fluent/config/error'
require 'fluent/registry'
require 'fluent/plugin'

module Fluent
module Configurable
def self.included(mod)
mod.extend(ClassMethods)
Expand Down Expand Up @@ -50,9 +51,13 @@ def configure(conf)
proxy = self.class.merged_configure_proxy
conf.corresponding_proxies << proxy

# In the nested section, can't get plugin class through proxies so get plugin class here
plugin_class = Plugin.lookup_name_from_class(proxy.name.to_s)
root = Fluent::Config::SectionGenerator.generate(proxy, conf, logger, plugin_class)
# Configuration sections must have plugin types to show unused parameters (for what type of plugin)
# in configuration files,
# but nested sections cannot take the type of root configuration section.
# So we need to get plugin type here, and use it in configuration proxies.
# ("type" is a stuff specified by '@type' in configuration file)
plugin_type = Fluent::Plugin.lookup_type_from_class(proxy.name.to_s)
root = Fluent::Config::SectionGenerator.generate(proxy, conf, logger, plugin_type)
@config_root_section = root

root.instance_eval{ @params.keys }.each do |param_name|
Expand Down
18 changes: 10 additions & 8 deletions lib/fluent/formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
# limitations under the License.
#

module Fluent
require 'fluent/registry'
require 'fluent/registry'
require 'fluent/configurable'
require 'fluent/plugin'

# Fluent::Plugin::Formatter ?

module Fluent
class Formatter
include Configurable

Expand Down Expand Up @@ -240,7 +244,6 @@ def format(tag, time, record)
end
end

TEMPLATE_REGISTRY = Registry.new(:formatter_type, 'fluent/plugin/formatter_')
{
'out_file' => Proc.new { OutFileFormatter.new },
'stdout' => Proc.new { StdoutFormatter.new },
Expand All @@ -251,23 +254,22 @@ def format(tag, time, record)
'csv' => Proc.new { CsvFormatter.new },
'single_value' => Proc.new { SingleValueFormatter.new },
}.each { |name, factory|
TEMPLATE_REGISTRY.register(name, factory)
Fluent::Plugin.register_formatter(name, factory)
}

def self.register_template(name, factory_or_proc)
factory = if factory_or_proc.is_a?(Class) # XXXFormatter
Proc.new { factory_or_proc.new }
factory_or_proc.new
elsif factory_or_proc.arity == 3 # Proc.new { |tag, time, record| }
Proc.new { ProcWrappedFormatter.new(factory_or_proc) }
else # Proc.new { XXXFormatter.new }
factory_or_proc
end

TEMPLATE_REGISTRY.register(name, factory)
Fluent::Plugin.register_formatter(name, factory)
end

def self.lookup(format)
TEMPLATE_REGISTRY.lookup(format).call
Fluent::Plugin.new_formatter(format)
end

# Keep backward-compatibility
Expand Down
5 changes: 4 additions & 1 deletion lib/fluent/load.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# TODO: This file MUST be deleted
# all test (and implementation) files should require its dependency by itself

require 'thread'
require 'socket'
require 'fcntl'
Expand Down Expand Up @@ -33,5 +36,5 @@
require 'fluent/plugin/buffer'
require 'fluent/plugin/input'
require 'fluent/plugin/output'
require 'fluent/plugin/filter'
require 'fluent/filter'
require 'fluent/match'
32 changes: 16 additions & 16 deletions lib/fluent/parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

require 'fluent/registry'
require 'fluent/configurable'
require 'fluent/plugin'

module Fluent
# TODO Fluent::Plugin::Parser ?

module Fluent
class ParserError < StandardError
end

Expand Down Expand Up @@ -624,7 +626,6 @@ def check_format_regexp(format, key)
end
end

TEMPLATE_REGISTRY = Registry.new(:config_type, 'fluent/plugin/parser_')
{
'apache' => Proc.new { RegexpParser.new(/^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$/, {'time_format'=>"%d/%b/%Y:%H:%M:%S %z"}) },
'apache_error' => Proc.new { RegexpParser.new(/^\[[^ ]* (?<time>[^\]]*)\] \[(?<level>[^\]]*)\](?: \[pid (?<pid>[^\]]*)\])?( \[client (?<client>[^\]]*)\])? (?<message>.*)$/) },
Expand All @@ -638,20 +639,21 @@ def check_format_regexp(format, key)
'none' => Proc.new { NoneParser.new },
'multiline' => Proc.new { MultilineParser.new },
}.each { |name, factory|
TEMPLATE_REGISTRY.register(name, factory)
Fluent::Plugin.register_parser(name, factory)
}

def self.register_template(name, regexp_or_proc, time_format=nil)
if regexp_or_proc.is_a?(Class)
factory = Proc.new { regexp_or_proc.new }
elsif regexp_or_proc.is_a?(Regexp)
regexp = regexp_or_proc
factory = Proc.new { RegexpParser.new(regexp, {'time_format'=>time_format}) }
else
factory = regexp_or_proc
end

TEMPLATE_REGISTRY.register(name, factory)
factory = if regexp_or_proc.is_a?(Class)
regexp_or_proc
elsif regexp_or_proc.is_a?(Regexp)
regexp = regexp_or_proc
Proc.new { RegexpParser.new(regexp, {'time_format'=>time_format}) }
elsif regexp_or_proc.respond_to? :call
regexp_or_proc
else
raise ArgumentError, "invalid factory for parser template: #{regexp_or_proc}"
end
Fluent::Plugin.register_parser(name, factory)
end

def self.lookup(format)
Expand All @@ -674,12 +676,10 @@ def self.lookup(format)
else
# built-in template
begin
factory = TEMPLATE_REGISTRY.lookup(format)
Fluent::Plugin.new_parser(format)
rescue ConfigError => e # keep same error message
raise ConfigError, "Unknown format template '#{format}'"
end

factory.call
end
end

Expand Down
52 changes: 35 additions & 17 deletions lib/fluent/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,36 @@
#

require 'fluent/registry'
require 'fluent/parser'
require 'fluent/storage'
require 'fluent/config/error'

module Fluent
module Plugin
SEARCH_PATHS = []

# plugins for fluentd: fluent/plugin/type_NAME.rb
# plugins for fluentd plugins: fluent/plugin/type/NAME.rb
# ex: storage, buffer(chunk), ...

INPUT_REGISTRY = Registry.new(:input_type, 'fluent/plugin/in_')
OUTPUT_REGISTRY = Registry.new(:output_type, 'fluent/plugin/out_')
FILTER_REGISTRY = Registry.new(:filter_type, 'fluent/plugin/filter_')
BUFFER_REGISTRY = Registry.new(:buffer_type, 'fluent/plugin/buf_')
PARSER_REGISTRY = Registry.new(:config_type, 'fluent/plugin/parser_')
FORMATTER_REGISTRY = Registry.new(:formatter_type, 'fluent/plugin/formatter_')
STORAGE_REGISTRY = Registry.new(:storage, 'fluent/plugin/storage/') # storage for plugins

REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY, STORAGE_REGISTRY]

REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY]
def self.lookup_type_from_class(klass_or_its_name)
klass = if klass_or_its_name.is_a? Class
klass_or_its_name
elsif klass_or_its_name.is_a? String
eval(klass_or_its_name) # const_get can't handle qualified klass name (ex: A::B)
else
raise ArgumentError, "invalid argument type #{klass_or_its_name.class}: #{klass_or_its_name}"
end
REGISTRIES.reduce(nil){|a, r| a || r.reverse_lookup(klass) }
end

def self.add_plugin_dir(dir)
REGISTRIES.each do |r|
Expand All @@ -53,16 +69,16 @@ def self.register_buffer(type, klass)
register_impl('buffer', BUFFER_REGISTRY, type, klass)
end

def self.register_parser(type, klass)
TextParser.register_template(type, klass)
def self.register_parser(type, klass_or_proc)
register_impl('parser', PARSER_REGISTRY, type, klass_or_proc)
end

def self.register_formatter(type, klass)
TextFormatter.register_template(type, klass)
def self.register_formatter(type, klass_or_proc)
register_impl('formatter', FORMATTER_REGISTRY, type, klass_or_proc)
end

def self.register_storage(type, klass)
Storage.register(type, klass)
register_impl('storage', STORAGE_REGISTRY, type, klass)
end

def self.new_input(type)
Expand All @@ -82,18 +98,15 @@ def self.new_buffer(type)
end

def self.new_parser(type)
TextParser.lookup(type)
new_impl('parser', PARSER_REGISTRY, type)
end

def self.new_formatter(type)
TextFormatter.lookup(type)
new_impl('formatter', FORMATTER_REGISTRY, type)
end

def self.new_storage(type)
if klass = Storage.lookup(type)
return klass.new
end
raise ConfigError, "Unknown storage plugin '#{type}'"
new_impl('storage', STORAGE_REGISTRY, type)
end

def self.register_impl(name, registry, type, klass)
Expand All @@ -103,10 +116,15 @@ def self.register_impl(name, registry, type, klass)
end

def self.new_impl(name, registry, type)
if klass = registry.lookup(type)
return klass.new
obj = registry.lookup(type)
case
when obj.is_a?(Class)
obj.new
when obj.respond_to?(:call)
obj.call
else
raise ConfigError, "Unknown #{name} plugin '#{type}'. Run 'gem search -rd fluent-plugin' to find plugins"
end
raise ConfigError, "Unknown #{name} plugin '#{type}'. Run 'gem search -rd fluent-plugin' to find plugins"
end
end
end
2 changes: 2 additions & 0 deletions lib/fluent/plugin/in_debug_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.
#

require 'fluent/input'

module Fluent
class DebugAgentInput < Input
Plugin.register_input('debug_agent', self)
Expand Down
2 changes: 2 additions & 0 deletions lib/fluent/plugin/out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.
#

require 'fluent/output'

module Fluent
class CopyOutput < MultiOutput
Plugin.register_output('copy', self)
Expand Down
12 changes: 10 additions & 2 deletions lib/fluent/registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
# limitations under the License.
#

module Fluent
require 'fluent/config/error'
require 'rubygems'
require 'fluent/config/error'

module Fluent
class Registry
def initialize(kind, search_prefix)
@kind = kind
Expand Down Expand Up @@ -44,6 +45,13 @@ def lookup(type)
raise ConfigError, "Unknown #{@kind} plugin '#{type}'. Run 'gem search -rd fluentd-plugin' to find plugins" # TODO error class
end

def reverse_lookup(value)
@map.each do |k, v|
return k if v == value
end
nil
end

def search(type)
path = "#{@search_prefix}#{type}"

Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def shutdown
@started_inputs.map { |i|
Thread.new do
begin
log.info "shutting down input", type: Plugin.lookup_name_from_class(i.class), plugin_id: i.plugin_id
log.info "shutting down input", type: Plugin.lookup_type_from_class(i.class), plugin_id: i.plugin_id
i.shutdown
rescue => e
log.warn "unexpected error while shutting down input plugin", :plugin => i.class, :plugin_id => i.plugin_id, :error_class => e.class, :error => e
Expand Down
20 changes: 4 additions & 16 deletions lib/fluent/storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,13 @@
# limitations under the License.
#

require 'fluent/plugin'

# TODO Fluent::Plugin::Storage ?

module Fluent
module Storage
STORAGE_REGISTRY = Registry.new(:storage, 'fluent/storage/')

DEFAULT_TYPE = 'json'

def self.register(type, klass)
STORAGE_REGISTRY.register(type, klass)
end

def self.lookup(type)
STORAGE_REGISTRY.lookup(type)
end

class LoadError
end

class SaveError
end
end
end

Expand Down
5 changes: 5 additions & 0 deletions lib/fluent/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
#

require 'test/unit'

# TODO: remove this line after `require 'fluent/load'` deleted
# all test (and implementation) files should require its dependency by itself
$log = Fluent::Log.new(StringIO.new, Fluent::Log::LEVEL_WARN)

require 'fluent/load'
require 'fluent/test/base'
require 'fluent/test/input_test'
Expand Down
Loading

0 comments on commit 9c58f42

Please sign in to comment.