Skip to content

Commit

Permalink
Merge pull request #838 from fluent/register-plugin-into-registry
Browse files Browse the repository at this point in the history
Register plugin into registry
  • Loading branch information
tagomoris committed Mar 10, 2016
2 parents 1020344 + 063bc75 commit c10ae6e
Show file tree
Hide file tree
Showing 34 changed files with 198 additions and 202 deletions.
4 changes: 2 additions & 2 deletions lib/fluent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def shutdown
@started_filters.map { |f|
Thread.new do
begin
log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id
log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_type_from_class(f.class), plugin_id: f.plugin_id
f.shutdown
rescue => e
log.warn "unexpected error while shutting down filter plugins", plugin: f.class, plugin_id: f.plugin_id, error_class: e.class, error: e
Expand All @@ -95,7 +95,7 @@ def shutdown
@started_outputs.map { |o|
Thread.new do
begin
log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id
log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_type_from_class(o.class), plugin_id: o.plugin_id
o.shutdown
rescue => e
log.warn "unexpected error while shutting down output plugins", plugin: o.class, plugin_id: o.plugin_id, error_class: e.class, error: e
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require 'fileutils'

require 'fluent/configurable'
require 'fluent/plugin' # to register itself to registry

module Fluent
class BufferError < StandardError
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/configurable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def configure(conf)
conf.corresponding_proxies << proxy

# In the nested section, can't get plugin class through proxies so get plugin class here
plugin_class = Fluent::Plugin.lookup_name_from_class(proxy.name.to_s)
plugin_class = Fluent::Plugin.lookup_type_from_class(proxy.name.to_s)
root = Fluent::Config::SectionGenerator.generate(proxy, conf, logger, plugin_class)
@config_root_section = root

Expand Down
5 changes: 2 additions & 3 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def init(system_config)
@system_config = system_config

BasicSocket.do_not_reverse_lookup = true
Plugin.load_plugins
if defined?(Encoding)
Encoding.default_internal = 'ASCII-8BIT' if Encoding.respond_to?(:default_internal)
Encoding.default_external = 'ASCII-8BIT' if Encoding.respond_to?(:default_external)
Expand Down Expand Up @@ -126,8 +125,8 @@ def configure(conf)
end
end

def load_plugin_dir(dir)
Plugin.load_plugin_dir(dir)
def add_plugin_dir(dir)
Plugin.add_plugin_dir(dir)
end

def emit(tag, time, record)
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require 'fluent/plugin_id'
require 'fluent/engine'
require 'fluent/event'
require 'fluent/plugin' # to register itself to registry
require 'fluent/log'

module Fluent
Expand Down
47 changes: 23 additions & 24 deletions lib/fluent/formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#

require 'fluent/configurable'
require 'fluent/registry'
require 'fluent/plugin'
require 'fluent/mixin'

module Fluent
Expand Down Expand Up @@ -242,34 +242,33 @@ def format(tag, time, record)
end
end

TEMPLATE_REGISTRY = Registry.new(:formatter_type, 'fluent/plugin/formatter_')
{
'out_file' => Proc.new { OutFileFormatter.new },
'stdout' => Proc.new { StdoutFormatter.new },
'json' => Proc.new { JSONFormatter.new },
'hash' => Proc.new { HashFormatter.new },
'msgpack' => Proc.new { MessagePackFormatter.new },
'ltsv' => Proc.new { LabeledTSVFormatter.new },
'csv' => Proc.new { CsvFormatter.new },
'single_value' => Proc.new { SingleValueFormatter.new },
}.each { |name, factory|
TEMPLATE_REGISTRY.register(name, factory)
'out_file' => OutFileFormatter,
'stdout' => StdoutFormatter,
'json' => JSONFormatter,
'hash' => HashFormatter,
'msgpack' => MessagePackFormatter,
'ltsv' => LabeledTSVFormatter,
'csv' => CsvFormatter,
'single_value' => SingleValueFormatter,
}.each { |type, factory|
Fluent::Plugin.register_formatter(type, factory)
}

def self.register_template(name, factory_or_proc)
factory = if factory_or_proc.is_a?(Class) # XXXFormatter
Proc.new { factory_or_proc.new }
elsif factory_or_proc.arity == 3 # Proc.new { |tag, time, record| }
Proc.new { ProcWrappedFormatter.new(factory_or_proc) }
else # Proc.new { XXXFormatter.new }
factory_or_proc
end

TEMPLATE_REGISTRY.register(name, factory)
def self.register_template(type, template)
if template.is_a?(Class)
Fluent::Plugin.register_formatter(type, template)
elsif template.respond_to?(:call) && template.arity == 3 # Proc.new { |tag, time, record| }
Fluent::Plugin.register_formatter(type, Proc.new { ProcWrappedFormatter.new(template) })
elsif template.respond_to?(:call)
Fluent::Plugin.register_formatter(type, template)
else
raise ArgumentError, "Template for formatter must be a Class or callable object"
end
end

def self.lookup(format)
TEMPLATE_REGISTRY.lookup(format).call
def self.lookup(type)
Fluent::Plugin.new_formatter(type)
end

# Keep backward-compatibility
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/input.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
require 'fluent/configurable'
require 'fluent/plugin_id'
require 'fluent/engine'
require 'fluent/plugin' # to register itself to registry
require 'fluent/log'

module Fluent
Expand Down
43 changes: 19 additions & 24 deletions lib/fluent/parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
require 'fluent/config/element'
require 'fluent/configurable'
require 'fluent/engine'
require 'fluent/registry'
require 'fluent/plugin' # to register itself to registry
require 'fluent/plugin/string_util'
require 'fluent/time'

module Fluent
Expand Down Expand Up @@ -633,34 +634,30 @@ def check_format_regexp(format, key)
end
end

TEMPLATE_REGISTRY = Registry.new(:config_type, 'fluent/plugin/parser_')
{
'apache' => Proc.new { RegexpParser.new(/^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$/, {'time_format'=>"%d/%b/%Y:%H:%M:%S %z"}) },
'apache_error' => Proc.new { RegexpParser.new(/^\[[^ ]* (?<time>[^\]]*)\] \[(?<level>[^\]]*)\](?: \[pid (?<pid>[^\]]*)\])?( \[client (?<client>[^\]]*)\])? (?<message>.*)$/) },
'apache2' => Proc.new { ApacheParser.new },
'syslog' => Proc.new { SyslogParser.new },
'json' => Proc.new { JSONParser.new },
'tsv' => Proc.new { TSVParser.new },
'ltsv' => Proc.new { LabeledTSVParser.new },
'csv' => Proc.new { CSVParser.new },
'apache2' => ApacheParser,
'syslog' => SyslogParser,
'json' => JSONParser,
'tsv' => TSVParser,
'ltsv' => LabeledTSVParser,
'csv' => CSVParser,
'nginx' => Proc.new { RegexpParser.new(/^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$/, {'time_format'=>"%d/%b/%Y:%H:%M:%S %z"}) },
'none' => Proc.new { NoneParser.new },
'multiline' => Proc.new { MultilineParser.new },
}.each { |name, factory|
TEMPLATE_REGISTRY.register(name, factory)
'none' => NoneParser,
'multiline' => MultilineParser,
}.each { |type, factory|
Fluent::Plugin.register_parser(type, factory)
}

def self.register_template(name, regexp_or_proc, time_format=nil)
if regexp_or_proc.is_a?(Class)
factory = Proc.new { regexp_or_proc.new }
elsif regexp_or_proc.is_a?(Regexp)
regexp = regexp_or_proc
factory = Proc.new { RegexpParser.new(regexp, {'time_format'=>time_format}) }
def self.register_template(type, template, time_format=nil)
if template.is_a?(Class) || template.respond_to?(:call)
Fluent::Plugin.register_parser(type, template)
elsif template.is_a?(Regexp)
Fluent::Plugin.register_parser(type, Proc.new { RegexpParser.new(template, {'time_format' => time_format}) })
else
factory = regexp_or_proc
raise ArgumentError, "Template for parser must be a Class, callable object or regular expression object"
end

TEMPLATE_REGISTRY.register(name, factory)
end

def self.lookup(format)
Expand All @@ -683,12 +680,10 @@ def self.lookup(format)
else
# built-in template
begin
factory = TEMPLATE_REGISTRY.lookup(format)
Fluent::Plugin.new_parser(format)
rescue ConfigError => e # keep same error message
raise ConfigError, "Unknown format template '#{format}'"
end

factory.call
end
end

Expand Down
Loading

0 comments on commit c10ae6e

Please sign in to comment.