Skip to content

Commit

Permalink
Merge pull request #1023 from fluent/add-plugin-helpers-for-parser-an…
Browse files Browse the repository at this point in the history
…d-formatter

Add plugin helpers for parser and formatter
  • Loading branch information
tagomoris committed Jun 6, 2016
2 parents 13e304a + 44e0f96 commit aaf0dfd
Show file tree
Hide file tree
Showing 7 changed files with 854 additions and 60 deletions.
2 changes: 2 additions & 0 deletions lib/fluent/plugin_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
require 'fluent/plugin_helper/timer'
require 'fluent/plugin_helper/child_process'
require 'fluent/plugin_helper/storage'
require 'fluent/plugin_helper/parser'
require 'fluent/plugin_helper/formatter'
require 'fluent/plugin_helper/retry_state'
require 'fluent/plugin_helper/compat_parameters'

Expand Down
138 changes: 138 additions & 0 deletions lib/fluent/plugin_helper/formatter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin'
require 'fluent/plugin/formatter'
require 'fluent/config/element'

module Fluent
module PluginHelper
module Formatter
def formatter_create(usage: '', type: nil, conf: nil)
formatter = @_formatters[usage]
return formatter if formatter

if !type
raise ArgumentError, "BUG: both type and conf are not specified" unless conf
raise Fluent::ConfigError, "@type is required in <format>" unless conf['@type']
type = conf['@type']
end
formatter = Fluent::Plugin.new_formatter(type, parent: self)
config = case conf
when Fluent::Config::Element
conf
when Hash
# in code, programmer may use symbols as keys, but Element needs strings
conf = Hash[conf.map{|k,v| [k.to_s, v]}]
Fluent::Config::Element.new('format', usage, conf, [])
when nil
Fluent::Config::Element.new('format', usage, {}, [])
else
raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'"
end
formatter.configure(config)
if @_formatters_started
formatter.start
end

@_formatters[usage] = formatter
formatter
end

def self.included(mod)
mod.instance_eval do
# minimum section definition to instantiate formatter plugin instances
config_section :format, required: false, multi: true, param_name: :formatter_configs do
config_argument :usage, :string, default: ''
config_param :@type, :string
end
end
end

attr_reader :_formatters # for tests

def initialize
super
@_formatters_started = false
@_formatters = {} # usage => formatter
end

def configure(conf)
super

@formatter_configs.each do |section|
if @_formatters[section.usage]
raise Fluent::ConfigError, "duplicated formatter configured: #{section.usage}"
end
formatter = Plugin.new_formatter(section[:@type], parent: self)
formatter.configure(section.corresponding_config_element)
@_formatters[section.usage] = formatter
end
end

def start
super
@_formatters_started = true
@_formatters.each_pair do |usage, formatter|
formatter.start
end
end

def formatter_operate(method_name, &block)
@_formatters.each_pair do |usage, formatter|
begin
formatter.send(method_name)
block.call(formatter) if block_given?
rescue => e
log.error "unexpected error while #{method_name}", usage: usage, formatter: formatter, error: e
end
end
end

def stop
super
formatter_operate(:stop)
end

def before_shutdown
formatter_operate(:before_shutdown)
super
end

def shutdown
formatter_operate(:shutdown)
super
end

def after_shutdown
formatter_operate(:after_shutdown)
super
end

def close
formatter_operate(:close)
super
end

def terminate
formatter_operate(:terminate)
@_formatters_started = false
@_formatters = {}
super
end
end
end
end
138 changes: 138 additions & 0 deletions lib/fluent/plugin_helper/parser.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin'
require 'fluent/plugin/parser'
require 'fluent/config/element'

module Fluent
module PluginHelper
module Parser
def parser_create(usage: '', type: nil, conf: nil)
parser = @_parsers[usage]
return parser if parser

if !type
raise ArgumentError, "BUG: both type and conf are not specified" unless conf
raise Fluent::ConfigError, "@type is required in <parse>" unless conf['@type']
type = conf['@type']
end
parser = Fluent::Plugin.new_parser(type, parent: self)
config = case conf
when Fluent::Config::Element
conf
when Hash
# in code, programmer may use symbols as keys, but Element needs strings
conf = Hash[conf.map{|k,v| [k.to_s, v]}]
Fluent::Config::Element.new('parse', usage, conf, [])
when nil
Fluent::Config::Element.new('parse', usage, {}, [])
else
raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'"
end
parser.configure(config)
if @_parsers_started
parser.start
end

@_parsers[usage] = parser
parser
end

def self.included(mod)
mod.instance_eval do
# minimum section definition to instantiate parser plugin instances
config_section :parse, required: false, multi: true, param_name: :parser_configs do
config_argument :usage, :string, default: ''
config_param :@type, :string
end
end
end

attr_reader :_parsers # for tests

def initialize
super
@_parsers_started = false
@_parsers = {} # usage => parser
end

def configure(conf)
super

@parser_configs.each do |section|
if @_parsers[section.usage]
raise Fluent::ConfigError, "duplicated parsers configured: #{section.usage}"
end
parser = Plugin.new_parser(section[:@type], parent: self)
parser.configure(section.corresponding_config_element)
@_parsers[section.usage] = parser
end
end

def start
super
@_parsers_started = true
@_parsers.each_pair do |usage, parser|
parser.start
end
end

def parser_operate(method_name, &block)
@_parsers.each_pair do |usage, parser|
begin
parser.send(method_name)
block.call(parser) if block_given?
rescue => e
log.error "unexpected error while #{method_name}", usage: usage, parser: parser, error: e
end
end
end

def stop
super
parser_operate(:stop)
end

def before_shutdown
parser_operate(:before_shutdown)
super
end

def shutdown
parser_operate(:shutdown)
super
end

def after_shutdown
parser_operate(:after_shutdown)
super
end

def close
parser_operate(:close)
super
end

def terminate
parser_operate(:terminate)
@_parsers_started = false
@_parsers = {}
super
end
end
end
end
Loading

0 comments on commit aaf0dfd

Please sign in to comment.