Skip to content

Commit

Permalink
writing
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Jul 20, 2016
1 parent 437e42c commit 5622485
Show file tree
Hide file tree
Showing 24 changed files with 487 additions and 97 deletions.
6 changes: 4 additions & 2 deletions lib/fluent/compat/formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

require 'fluent/plugin'
require 'fluent/plugin/formatter'
require 'fluent/compat/handle_tag_and_time_mixin'
require 'fluent/compat/structured_format_mixin'

require 'fluent/plugin/formatter_out_file'
require 'fluent/plugin/formatter_stdout'
Expand Down Expand Up @@ -66,8 +68,8 @@ def self.create(conf)
formatter
end

HandleTagAndTimeMixin = Fluent::Plugin::Formatter::HandleTagAndTimeMixin
StructuredFormatMixin = Fluent::Plugin::Formatter::StructuredFormatMixin
HandleTagAndTimeMixin = Fluent::Compat::HandleTagAndTimeMixin
StructuredFormatMixin = Fluent::Compat::StructuredFormatMixin

class ProcWrappedFormatter < Fluent::Plugin::ProcWrappedFormatter
# TODO: warn when deprecated
Expand Down
82 changes: 82 additions & 0 deletions lib/fluent/compat/formatter_utils.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin_helper/compat_parameters'

module Fluent
module Compat
module FormatterUtils
INJECT_PARAMS = Fluent::PluginHelper::CompatParameters::INJECT_PARAMS
FORMATTER_PARAMS = Fluent::PluginHelper::CompatParameters::FORMATTER_PARAMS

module InjectMixin
def format(tag, time, record)
r = owner.inject_record(tag, time, record)
super(tag, time, r)
end
end

def self.convert_formatter_conf(conf)
return if conf.elements(name: 'inject').first || conf.elements(name: 'format').first

inject_params = {}
INJECT_PARAMS.each do |older, newer|
next unless newer
if conf.has_key?(older)
inject_params[newer] = conf[older]
end
end

if conf.has_key?('include_time_key') && Fluent::Config.bool_value(conf['include_time_key'])
inject_params['time_key'] ||= 'time'
inject_params['time_type'] ||= 'string'
end
if conf.has_key?('time_as_epoch') && Fluent::Config.bool_value(conf['time_as_epoch'])
inject_params['time_type'] = 'unixtime'
end
if conf.has_key?('localtime') || conf.has_key?('utc')
if conf.has_key?('localtime') && Fluent::Config.bool_value(conf['localtime'])
inject_params['localtime'] = true
elsif conf.has_key?('utc') && Fluent::Config.bool_value(conf['utc'])
inject_params['localtime'] = false
inject_params['timezone'] ||= "+0000"
else
log.warn "both of localtime and utc are specified as false"
end
end

if conf.has_key?('include_tag_key') && Fluent::Config.bool_value(conf['include_tag_key'])
inject_params['tag_key'] ||= 'tag'
end

unless inject_params.empty?
conf.elements << Fluent::Config::Element.new('inject', '', inject_params, [])
end

formatter_params = {}
FORMATTER_PARAMS.each do |older, newer|
next unless newer
if conf.has_key?(older)
formatter_params[newer] = conf[older]
end
end
unless formatter_params.empty?
conf.elements << Fluent::Config::Element.new('format', '', formatter_params, [])
end
end
end
end
end
60 changes: 60 additions & 0 deletions lib/fluent/compat/handle_tag_and_time_mixin.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module Fluent
module Compat
module HandleTagAndTimeMixin
def self.included(klass)
klass.instance_eval {
config_param :include_time_key, :bool, default: false
config_param :time_key, :string, default: 'time'
config_param :time_format, :string, default: nil
config_param :time_as_epoch, :bool, default: false
config_param :include_tag_key, :bool, default: false
config_param :tag_key, :string, default: 'tag'
config_param :localtime, :bool, default: true
config_param :timezone, :string, default: nil
}
end

def configure(conf)
super

if conf['utc']
@localtime = false
end
@timef = Fluent::TimeFormatter.new(@time_format, @localtime, @timezone)
if @time_as_epoch && !@include_time_key
log.warn "time_as_epoch will be ignored because include_time_key is false"
end
end

def filter_record(tag, time, record)
if @include_tag_key
record[@tag_key] = tag
end
if @include_time_key
if @time_as_epoch
record[@time_key] = time.to_i
else
record[@time_key] = @timef.format(time)
end
end
end
end
end
end

79 changes: 74 additions & 5 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
require 'fluent/plugin/output'
require 'fluent/plugin/bare_output'
require 'fluent/compat/call_super_mixin'
require 'fluent/compat/formatter_utils'
require 'fluent/compat/parser_utils'
require 'fluent/compat/propagate_default'
require 'fluent/compat/output_chain'
require 'fluent/timezone'
Expand Down Expand Up @@ -136,7 +138,7 @@ def write(chunk)
class Output < Fluent::Plugin::Output
# TODO: warn when deprecated

helpers :event_emitter
helpers :event_emitter, :inject

def support_in_v12_style?(feature)
case feature
Expand All @@ -157,6 +159,26 @@ def initialize
self.class.prepend Fluent::Compat::CallSuperMixin
end
end

def configure(conf)
ParserUtils.convert_parser_conf(conf)
FormatterUtils.convert_formatter_conf(conf)

super
end

def start
super

if instance_variable_defined?(:@formatter) && @inject_config
unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
if @formatter.respond_to?(:owner) && !@formatter.owner
@formatter.owner = self
@formatter.singleton_class.prepend FormatterUtils::InjectMixin
end
end
end
end
end

class MultiOutput < Fluent::Plugin::BareOutput
Expand All @@ -172,7 +194,7 @@ def process(tag, es)
class BufferedOutput < Fluent::Plugin::Output
# TODO: warn when deprecated

helpers :event_emitter
helpers :event_emitter, :inject

def support_in_v12_style?(feature)
case feature
Expand Down Expand Up @@ -246,6 +268,9 @@ def configure(conf)
# RecordFilter mixin uses its own #format_stream method implementation
@overrides_format_stream = methods_of_plugin.include?(:format_stream) || @includes_record_filter

ParserUtils.convert_parser_conf(conf)
FormatterUtils.convert_formatter_conf(conf)

super

if config_style == :v1
Expand Down Expand Up @@ -351,12 +376,25 @@ def initialize
self.class.prepend Fluent::Compat::CallSuperMixin
end
end

def start
super

if instance_variable_defined?(:@formatter) && @inject_config
unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
if @formatter.respond_to?(:owner) && !@formatter.owner
@formatter.owner = self
@formatter.singleton_class.prepend FormatterUtils::InjectMixin
end
end
end
end
end

class ObjectBufferedOutput < Fluent::Plugin::Output
# TODO: warn when deprecated

helpers :event_emitter
helpers :event_emitter, :inject

# This plugin cannot inherit BufferedOutput because #configure sets chunk_key 'tag'
# to flush chunks per tags, but BufferedOutput#configure doesn't allow setting chunk_key
Expand Down Expand Up @@ -429,6 +467,9 @@ def configure(conf)
conf.elements << Fluent::Config::Element.new('buffer', 'tag', buf_params, [])
end

ParserUtils.convert_parser_conf(conf)
FormatterUtils.convert_formatter_conf(conf)

super

if config_style == :v1
Expand Down Expand Up @@ -458,12 +499,25 @@ def initialize
self.class.prepend Fluent::Compat::CallSuperMixin
end
end

def start
super

if instance_variable_defined?(:@formatter) && @inject_config
unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
if @formatter.respond_to?(:owner) && !@formatter.owner
@formatter.owner = self
@formatter.singleton_class.prepend FormatterUtils::InjectMixin
end
end
end
end
end

class TimeSlicedOutput < Fluent::Plugin::Output
# TODO: warn when deprecated

helpers :event_emitter
helpers :event_emitter, :inject

def support_in_v12_style?(feature)
case feature
Expand Down Expand Up @@ -577,6 +631,9 @@ def configure(conf)
conf.elements << Fluent::Config::Element.new('buffer', 'time', buf_params, [])
end

ParserUtils.convert_parser_conf(conf)
FormatterUtils.convert_formatter_conf(conf)

super

if config_style == :v1
Expand All @@ -588,6 +645,19 @@ def configure(conf)
self.extend TimeSliceChunkMixin
end

def start
super

if instance_variable_defined?(:@formatter) && @inject_config
unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
if @formatter.respond_to?(:owner) && !@formatter.owner
@formatter.owner = self
@formatter.singleton_class.prepend FormatterUtils::InjectMixin
end
end
end
end

# Original TimeSlicedOutput#emit doesn't call #format_stream

# #format MUST be implemented in plugin
Expand All @@ -599,4 +669,3 @@ def extract_placeholders(str, metadata)
end
end
end

24 changes: 24 additions & 0 deletions lib/fluent/compat/parser_util.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin_helper/compat_parameters'

module Fluent
module Compat
module ParserUtils
end
end
end
40 changes: 40 additions & 0 deletions lib/fluent/compat/parser_utils.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin_helper/compat_parameters'

module Fluent
module Compat
module ParserUtils
PARSER_PARAMS = Fluent::PluginHelper::CompatParameters::PARSER_PARAMS

def self.convert_parser_conf(conf)
return if conf.elements(name: 'parse').first

parser_params = {}
PARSER_PARAMS.each do |older, newer|
next unless newer
if conf.has_key?(older)
parser_params[newer] = conf[older]
end
end
unless parser_params.empty?
conf.elements << Fluent::Config::Element.new('parse', '', parser_params, [])
end
end
end
end
end
Loading

0 comments on commit 5622485

Please sign in to comment.