From 0c5057859fd3149c1ee7f821d02c9e7eed7bd12e Mon Sep 17 00:00:00 2001
From: TAGOMORI Satoshi
Date: Wed, 27 Apr 2016 17:38:36 +0900
Subject: [PATCH 01/22] fix test with added (missing) error definition

---
 test/plugin/test_out_forward.rb | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb
index 1921dac3b7..f6e4bd2219 100644
--- a/test/plugin/test_out_forward.rb
+++ b/test/plugin/test_out_forward.rb
@@ -489,6 +489,10 @@ def initialize
       @emit_streams ||= []
     end

+    def clear!
+      @emit_streams = []
+    end
+
     def emit_stream(tag, es)
       @emit_streams << [tag, es.to_a]
     end

From 51a1a246457b4e1fb9a10af24303937ade28a9e3 Mon Sep 17 00:00:00 2001
From: TAGOMORI Satoshi
Date: Thu, 21 Apr 2016 15:00:46 +0900
Subject: [PATCH 02/22] Add Base#inspect to show this object in simple/short
 way

---
 lib/fluent/plugin/base.rb | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb
index 1f715dc2cc..21d015ba95 100644
--- a/lib/fluent/plugin/base.rb
+++ b/lib/fluent/plugin/base.rb
@@ -108,6 +108,14 @@ def closed?
     def terminated?
       @_state.terminate
     end
+
+    def inspect
+      # Plugin instances are sometimes too big to dump because they may hold too many things (buffer, storage, ...)
+      # This emulates the default inspect output: `ruby -e 'o=Object.new; p o; p (o.__id__ << 1).to_s(16)'`
+      # https://github.com/ruby/ruby/blob/trunk/gc.c#L788
+      "#<%s:%014x>" % [self.class.name, __id__ << 1]
+    end
   end
 end

From e5769f738e9e99a4aa8ce39ff644332d5fa325fb Mon Sep 17 00:00:00 2001
From: TAGOMORI Satoshi
Date: Thu, 21 Apr 2016 15:32:00 +0900
Subject: [PATCH 03/22] add fluent/compat/filter and make Fluent::Filter with
 it

---
 lib/fluent/compat/filter.rb | 26 +++++++++++++++++++
 lib/fluent/filter.rb        | 52 ++-----------------------------------
 2 files changed, 28 insertions(+), 50 deletions(-)
 create mode 100644 lib/fluent/compat/filter.rb

diff --git a/lib/fluent/compat/filter.rb b/lib/fluent/compat/filter.rb
new file mode 100644
index 0000000000..352c830671
--- /dev/null
+++ b/lib/fluent/compat/filter.rb
@@ -0,0 +1,26 @@
+#
+# Fluentd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'fluent/plugin'
+require 'fluent/plugin/filter'
+
+module Fluent
+  module Compat
+    class Filter < Fluent::Plugin::Filter
+      # TODO: warn when deprecated
+    end
+  end
+end
diff --git a/lib/fluent/filter.rb b/lib/fluent/filter.rb
index 0f8310dc17..a04de9cdf1 100644
--- a/lib/fluent/filter.rb
+++ b/lib/fluent/filter.rb
@@ -14,56 +14,8 @@
 # limitations under the License.
# -require 'fluent/configurable' -require 'fluent/plugin_id' -require 'fluent/event' -require 'fluent/plugin' # to register itself to registry -require 'fluent/log' +require 'fluent/compat/filter' module Fluent - class Filter - include Configurable - include PluginId - include PluginLoggerMixin - - attr_accessor :router - - def initialize - super - end - - def configure(conf) - super - - if label_name = conf['@label'] - label = Engine.root_agent.find_label(label_name) - @router = label.event_router - elsif @router.nil? - @router = Engine.root_agent.event_router - end - end - - def start - end - - def shutdown - end - - def filter(tag, time, record) - raise NotImplementedError, "Implement this method in child class" - end - - def filter_stream(tag, es) - new_es = MultiEventStream.new - es.each { |time, record| - begin - filtered_record = filter(tag, time, record) - new_es.add(time, filtered_record) if filtered_record - rescue => e - router.emit_error_event(tag, time, record, e) - end - } - new_es - end - end + Filter = Fluent::Compat::Filter end From a1d675426cbcc67745b8c2ff99cac23335e056d5 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 21 Apr 2016 15:33:16 +0900 Subject: [PATCH 04/22] add fluent/compat/input and make Fluent::Input with it --- lib/fluent/compat/input.rb | 26 ++++++++++++++++++++++++++ lib/fluent/input.rb | 35 ++--------------------------------- 2 files changed, 28 insertions(+), 33 deletions(-) create mode 100644 lib/fluent/compat/input.rb diff --git a/lib/fluent/compat/input.rb b/lib/fluent/compat/input.rb new file mode 100644 index 0000000000..fe6ed576f0 --- /dev/null +++ b/lib/fluent/compat/input.rb @@ -0,0 +1,26 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin' +require 'fluent/plugin/input' + +module Fluent + module Compat + class Input < Fluent::Plugin::Input + # TODO: warn when deprecated + end + end +end diff --git a/lib/fluent/input.rb b/lib/fluent/input.rb index 3572157fa0..1c334288d5 100644 --- a/lib/fluent/input.rb +++ b/lib/fluent/input.rb @@ -14,39 +14,8 @@ # limitations under the License. # -require 'fluent/configurable' -require 'fluent/plugin_id' -require 'fluent/engine' -require 'fluent/plugin' # to register itself to registry -require 'fluent/log' +require 'fluent/compat/input' module Fluent - class Input - include Configurable - include PluginId - include PluginLoggerMixin - - attr_accessor :router - - def initialize - super - end - - def configure(conf) - super - - if label_name = conf['@label'] - label = Engine.root_agent.find_label(label_name) - @router = label.event_router - elsif @router.nil? 
-        @router = Engine.root_agent.event_router
-      end
-    end
-
-    def start
-    end
-
-    def shutdown
-    end
-  end
+  Input = Fluent::Compat::Input
 end

From bf3dbcc03bc188b5c9ed130d4aa89856c5b2ace5 Mon Sep 17 00:00:00 2001
From: TAGOMORI Satoshi
Date: Mon, 25 Apr 2016 15:51:04 +0900
Subject: [PATCH 05/22] add comments about time handling

---
 lib/fluent/plugin/output.rb | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index d4da5a7db0..6cc67ed5cc 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -45,9 +45,10 @@ class Output < Base
     config_param :@type, :string, default: 'memory2'

     config_param :timekey_range, :time, default: nil # range size to be used: `time.to_i / @timekey_range`
-    config_param :timekey_use_utc, :bool, default: false # default is localtime
-    config_param :timekey_zone, :string, default: Time.now.strftime('%z') # '+0900'
     config_param :timekey_wait, :time, default: 600
+    # These are for #extract_placeholders
+    config_param :timekey_use_utc, :bool, default: false # default is localtime
+    config_param :timekey_zone, :string, default: Time.now.strftime('%z') # e.g., "-0700" or "Asia/Tokyo"

     desc 'If true, plugin will try to flush buffer just before shutdown.'
     config_param :flush_at_shutdown, :bool, default: nil # change default by buffer_plugin.persistent?
@@ -499,6 +500,10 @@ def emit_buffered(tag, es)
     def metadata(tag, time, record)
       # these arguments are ordered following the argument convention of output plugins;
       # Metadata's argument order is different (timekey, tag, variables)
+
+      # timekey is an integer (seconds from the epoch), and `timekey - timekey % 60` is assumed
+      # to match the 0th second of each minute. This is wrong if the configured timezone supports
+      # leap seconds, but that is very rare and we can ignore it (especially in production systems).
      timekey_range = @buffer_config.timekey_range
      if @chunk_keys.empty?
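        # A worked example of the timekey rounding used in the branches below
        # (values are illustrative, not from this patch):
        #   timekey_range = 60            # chunking per minute
        #   time_int = 1461738823         # 2016-04-27 06:33:43 UTC
        #   time_int % timekey_range      #=> 43
        #   time_int - 43                 #=> 1461738780 (06:33:00 UTC), the chunk's timekey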
if !@chunk_key_time && !@chunk_key_tag From d5e90de0226f54bac780e8201a0fac83611b0121 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Mon, 25 Apr 2016 15:51:34 +0900 Subject: [PATCH 06/22] fix to extend chunk to add method for event iteration --- lib/fluent/plugin/output.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 6cc67ed5cc..8adf74e74d 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -670,6 +670,10 @@ def try_flush using_secondary = true end + unless @custom_format + chunk.extend ChunkMessagePackEventStreamer + end + begin if output.delayed_commit @counters_monitor.synchronize{ @write_count += 1 } From 97143281188d7d18aab955b4d7fef1bd1698f3ba Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 27 Apr 2016 22:15:47 +0900 Subject: [PATCH 07/22] remove fileutils --- lib/fluent/plugin/buffer/chunk.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb index 3c737b0122..b915e64c10 100644 --- a/lib/fluent/plugin/buffer/chunk.rb +++ b/lib/fluent/plugin/buffer/chunk.rb @@ -18,7 +18,6 @@ require 'fluent/unique_id' require 'fluent/event' -require 'fileutils' require 'monitor' module Fluent @@ -111,7 +110,7 @@ def open(&block) def write_to(io) open do |i| - FileUtils.copy_stream(i, io) + IO.copy_stream(i, io) end end end From d0967261b6d88c06b8f18e4bc5abd97d6617b3b1 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 27 Apr 2016 22:16:15 +0900 Subject: [PATCH 08/22] add optimization --- lib/fluent/plugin/buffer/memory_chunk.rb | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/fluent/plugin/buffer/memory_chunk.rb b/lib/fluent/plugin/buffer/memory_chunk.rb index b63c7383b5..2f6653b24f 100644 --- a/lib/fluent/plugin/buffer/memory_chunk.rb +++ b/lib/fluent/plugin/buffer/memory_chunk.rb @@ -88,6 +88,11 @@ def read def open(&block) StringIO.open(@chunk, &block) end + + def write_to(io) + # re-implementation to optimize not to create StringIO + io.write @chunk + end end end end From 8c07b0cef4ca90c4d098886621caf16a26dbeff9 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 27 Apr 2016 22:16:56 +0900 Subject: [PATCH 09/22] take care about NaN & Inf --- lib/fluent/plugin_helper/retry_state.rb | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin_helper/retry_state.rb b/lib/fluent/plugin_helper/retry_state.rb index e5c090e60b..a269d15b02 100644 --- a/lib/fluent/plugin_helper/retry_state.rb +++ b/lib/fluent/plugin_helper/retry_state.rb @@ -146,9 +146,14 @@ def initialize(title, wait, timeout, forever, max_steps, randomize, randomize_wi end def naive_next_time(retry_next_times) - interval = @constant_factor * ( @backoff_base ** ( retry_next_times - 1 ) ) - intr = if @max_interval && interval > @max_interval - @max_interval + # make it infinite if calculated "interval" is too big + interval = @constant_factor.to_f * ( @backoff_base ** ( retry_next_times - 1 ) ) + intr = if interval.finite? 
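+               # For example, with constant_factor = 1.0 and backoff_base = 2 (typical defaults,
+               # assumed here for illustration): waits grow as 1s, 2s, 4s, ... and around the
+               # 1025th retry `2.0 ** 1024` overflows Float to +Infinity, so this finite? guard
+               # skips the @max_interval cap and passes the infinite interval through unchanged.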
+               if @max_interval && interval > @max_interval
+                 @max_interval
+               else
+                 interval
+               end
              else
                interval
              end

From 0688ba0d1ebd3e178565aa86236a9318834dad28 Mon Sep 17 00:00:00 2001
From: TAGOMORI Satoshi
Date: Wed, 27 Apr 2016 22:17:35 +0900
Subject: [PATCH 10/22] add buffer path pattern only for suffix

---
 lib/fluent/plugin/buf_file2.rb | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/lib/fluent/plugin/buf_file2.rb b/lib/fluent/plugin/buf_file2.rb
index 5c027df3c0..9de0b90628 100644
--- a/lib/fluent/plugin/buf_file2.rb
+++ b/lib/fluent/plugin/buf_file2.rb
@@ -70,6 +70,8 @@ def configure(conf)
         @path = File.join(@path, 'buffer.*.log')
       elsif File.basename(@path).include?('.*.')
         # valid path (buffer.*.log will be ignored)
+      elsif File.basename(@path).end_with?('.*')
+        @path = @path + '.log'
       else
         # existing file will be ignored
         @path = @path + '.*.log'
@@ -77,6 +79,8 @@ def configure(conf)
     else # path doesn't exist
       if File.basename(@path).include?('.*.')
         # valid path
+      elsif File.basename(@path).end_with?('.*')
+        @path = @path + '.log'
       else
         # path is handled as directory, and it will be created at #start
         @path = File.join(@path, 'buffer.*.log')

From a5a6b983d9cbc8c0245ec78f52c3dc02c86af7fe Mon Sep 17 00:00:00 2001
From: TAGOMORI Satoshi
Date: Wed, 27 Apr 2016 22:19:48 +0900
Subject: [PATCH 11/22] initialize minimal internal things as early as
 possible for tests

---
 lib/fluent/plugin/output.rb | 29 +++++++++++++++++++--------
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index 8adf74e74d..faad7c4030 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -143,10 +143,30 @@ def expired?

     def initialize
       super
+      @counters_monitor = Monitor.new
       @buffering = false
       @delayed_commit = false
       @as_secondary = false
       @primary_instance = nil
+
+      # TODO: well organized counters
+      @num_errors = 0
+      @emit_count = 0
+      @emit_records = 0
+      @write_count = 0
+      @rollback_count = 0
+
+      # Whether to process events in buffered mode is decided here when possible;
+      # full-featured plugins defer the decision to #configure and #start.
+      if implement?(:synchronous)
+        if implement?(:buffered) || implement?(:delayed_commit)
+          @buffering = nil # do #configure or #start to determine this for full-featured plugins
+        else
+          @buffering = false
+        end
+      else
+        @buffering = true
+      end
+      @custom_format = implement?(:custom_format)
     end

     def acts_as_secondary(primary)
@@ -268,13 +288,6 @@ def configure(conf)
     def start
       super

-      # TODO: well organized counters
-      @counters_monitor = Monitor.new
-      @num_errors = 0
-      @emit_count = 0
-      @emit_records = 0
-      @write_count = 0
-      @rollback_count = 0
      if @buffering.nil?
@buffering = prefer_buffered_processing @@ -296,7 +309,7 @@ def start implement?(:delayed_commit) end @delayed_commit_timeout = @buffer_config.delayed_commit_timeout - else # !@buffered + else # !@buffering m = method(:emit_sync) (class << self; self; end).module_eval do define_method(:emit, m) From 5e930924866e6af7360f8eaf6cc9af4ccc85f09e Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 27 Apr 2016 22:25:11 +0900 Subject: [PATCH 12/22] Add compatibility layers for output plugin * change internal APIs to emit events to plugins/routers with newer one * remove all output chains from cores --- lib/fluent/agent.rb | 2 +- lib/fluent/compat/output.rb | 548 ++++++++++++++++++++++++++++ lib/fluent/compat/output_chain.rb | 60 ++++ lib/fluent/event_router.rb | 7 +- lib/fluent/output.rb | 577 +----------------------------- lib/fluent/output_chain.rb | 44 +-- lib/fluent/plugin/output.rb | 37 +- lib/fluent/test/output_test.rb | 51 ++- 8 files changed, 674 insertions(+), 652 deletions(-) create mode 100644 lib/fluent/compat/output.rb create mode 100644 lib/fluent/compat/output_chain.rb diff --git a/lib/fluent/agent.rb b/lib/fluent/agent.rb index f5cb4977ea..d99381ee26 100644 --- a/lib/fluent/agent.rb +++ b/lib/fluent/agent.rb @@ -160,7 +160,7 @@ def initialize(log) @count = 0 end - def emit(tag, es, chain) + def emit_events(tag, es) # TODO use time instead of num of records c = (@count += 1) if c < 512 diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb new file mode 100644 index 0000000000..538ccc9053 --- /dev/null +++ b/lib/fluent/compat/output.rb @@ -0,0 +1,548 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin' +require 'fluent/plugin/output' +require 'fluent/compat/output_chain' +require 'fluent/timezone' + +require 'time' + +module Fluent + module Compat + NULL_OUTPUT_CHAIN = NullOutputChain.instance + + module CompatOutputUtils + def self.buffer_section(conf) + conf.elements.select{|e| e.name == 'buffer'}.first + end + + def self.secondary_section(conf) + conf.elements.select{|e| e.name == 'secondary'}.first + end + + end + + module BufferedEventStreamMixin + include Enumerable + + def repeatable? + true + end + + def each(&block) + msgpack_each(&block) + end + + def to_msgpack_stream + read + end + + def key + metadata.tag + end + end + + module AddTimeSliceKeyToChunkMixin + def time_slice_format=(format) + @_time_slice_format = format + end + + def timekey_range=(range) + @_timekey_range = range + end + + def timezone=(tz) + @_timezone = tz + end + + def assume_timekey! 
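+        # Ensure this chunk has a timekey even when its metadata lacks one: prefer the
+        # time slice embedded in a file-buffer path (e.g. a path containing ".20160427.b50b..."
+        # would yield that day's slice), and otherwise fall back to the chunk creation time
+        # rounded down to the timekey range.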
+ @_formatter = Fluent::Timezone.formatter(@_timezone, @_time_slice_format) + + return if self.metadata.timekey + if self.respond_to?(:path) && self.path =~ /\.(\d+)\.(?:b|q)(?:[a-z0-9]+)/ + begin + self.metadata.timekey = Time.parse($1, @_time_slice_format).to_i + rescue ArgumentError + # unknown format / value as timekey + end + end + unless self.metadata.timekey + # file creation time is assumed in the time range of that time slice + # because the first record should be in that range. + time_int = self.created_at.to_i + self.metadata.timekey = time_int - (time_int % @_timekey_range) + end + end + + def key + @_formatter.call(self.metadata.timekey) + end + end + + module ChunkSizeCompatMixin + def size + self.bytesize + end + end + + module BufferedChunkMixin + # prepend this module to BufferedOutput (including ObjectBufferedOutput) plugin singleton class + def write(chunk) + chunk.extend(ChunkSizeCompatMixin) + super + end + end + + module TimeSliceChunkMixin + # prepend this module to TimeSlicedOutput plugin singleton class + def write(chunk) + chunk.extend(ChunkSizeCompatMixin) + chunk.extend(AddTimeSliceKeyToChunkMixin) + chunk.time_slice_format = @time_slice_format + chunk.timekey_range = @_timekey_range + chunk.timezone = @timezone + chunk.assume_timekey! + super + end + end + + class Output < Fluent::Plugin::Output + # TODO: warn when deprecated + + helpers :event_emitter + + def support_in_v12_style?(feature) + case feature + when :synchronous then true + when :buffered then false + when :delayed_commit then false + when :custom_format then false + end + end + + # def configure(conf) + # super + # if @buffer_config + # raise Fluent::ConfigError, " can't be specified because this is non-buffered output plugin: '#{self.class}'" + # end + # if @secondary_config + # raise Fluent::ConfigError, " can't be specified because this is non-buffered output plugin: '#{self.class}'" + # end + # end + + ## emit must be implemented in plugin + # def emit(tag, es, chain) + # end + + def process(tag, es) + emit(tag, es, NULL_OUTPUT_CHAIN) + end + end + + class MultiOutput < Output + end + + class BufferedOutput < Fluent::Plugin::Output + # TODO: warn when deprecated + + helpers :event_emitter + + def support_in_v12_style?(feature) + case feature + when :synchronous then false + when :buffered then true + when :delayed_commit then false + when :custom_format then true + end + end + + desc 'The buffer type (memory, file)' + config_param :buffer_type, :string, default: 'memory' + desc 'The interval between data flushes.' + config_param :flush_interval, :time, default: 60 + config_param :try_flush_interval, :float, default: 1 + desc 'If true, the value of `retry_value` is ignored and there is no limit' + config_param :disable_retry_limit, :bool, default: false + desc 'The limit on the number of retries before buffered data is discarded' + config_param :retry_limit, :integer, default: 17 + desc 'The initial intervals between write retries.' + config_param :retry_wait, :time, default: 1.0 + desc 'The maximum intervals between write retries.' + config_param :max_retry_wait, :time, default: nil + desc 'The number of threads to flush the buffer.' + config_param :num_threads, :integer, default: 1 + desc 'The interval between data flushes for queued chunk.' + config_param :queued_chunk_flush_interval, :time, default: 1 + + desc 'The size of each buffer chunk.' + config_param :buffer_chunk_limit, :size, default: 8*1024*1024 + desc 'The length limit of the chunk queue.' 
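      # (illustration) The PARAMS_MAP table below rewrites these v0.12 top-level parameters
      # into a synthesized v0.14 <buffer> section when the configuration has no <buffer>;
      # for instance, `buffer_queue_limit 256` is carried over as `queue_length_limit 256`.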
+ config_param :buffer_queue_limit, :integer, default: 256 + desc 'The action when the size of buffer queue exceeds the buffer_queue_limit.' + config_param :buffer_queue_full_action, :enum, list: [:exception, :block], default: :exception + + config_param :flush_at_shutdown, :bool, default: true + + PARAMS_MAP = [ + ["buffer_type", "@type"], + ["num_threads", "flush_threads"], + ["flush_interval", "flush_interval"], + ["try_flush_interval", "flush_thread_interval"], + ["queued_chunk_flush_interval", "flush_burst_interval"], + ["disable_retry_limit", "retry_forever"], + ["retry_limit", "retry_max_times"], + ["max_retry_wait", "retry_max_interval"], + ["buffer_chunk_limit", "chunk_bytes_limit"], + ["buffer_queue_limit", "queue_length_limit"], + ["buffer_queue_full_action", nil], # TODO: implement this on fluent/plugin/buffer + ["flush_at_shutdown", "flush_at_shutdown"], + ] + + def configure(conf) + bufconf = CompatOutputUtils.buffer_section(conf) + config_style = (bufconf ? :v1 : :v0) + if config_style == :v0 + buf_params = { + "flush_mode" => "fast", + "retry_type" => "expbackoff", + } + PARAMS_MAP.each do |older, newer| + buf_params[newer] = conf[older] if conf.has_key?(older) + end + + bufconf = Fluent::Config::Element.new('buffer', '', buf_params, []) + + conf.elements << bufconf + + secconf = CompatOutputUtils.secondary_section(conf) + if secconf + if secconf['type'] && !secconf['@type'] + secconf['@type'] = secconf['type'] + end + end + end + + methods_of_plugin = self.class.instance_methods(false) + @overrides_format_stream = methods_of_plugin.include?(:format_stream) + + super + + if config_style == :v1 + unless @buffer_config.chunk_keys.empty? + raise Fluent::ConfigError, "this plugin '#{self.class}' cannot handle arguments for section" + end + end + + (class << self; self; end).module_eval do + prepend BufferedChunkMixin + end + end + + # #format MUST be implemented in plugin + # #write is also + + # This method overrides Fluent::Plugin::Output#handle_stream_simple + # because v0.12 BufferedOutput may overrides #format_stream, but original method doesn't consider about it + def handle_stream_simple(tag, es) + if @overrides_format_stream + meta = metadata(nil, nil, nil) + bulk = format_stream(tag, es) + @buffer.emit_bulk(meta, bulk, es.size) + return [meta] + end + + meta = metadata(nil, nil, nil) + es_size = 0 + es_bulk = es.map{|time,record| es_size += 1; format(tag, time, record) }.join + @buffer.emit_bulk(meta, es_bulk, es_size) + @counters_monitor.synchronize{ @emit_records += es_size } + [meta] + end + + def extract_placeholders(str, metadata) + raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API" + end + end + + class ObjectBufferedOutput < Fluent::Plugin::Output + # TODO: warn when deprecated + + helpers :event_emitter + + # This plugin cannot inherit BufferedOutput because #configure sets chunk_key 'tag' + # to flush chunks per tags, but BufferedOutput#configure doesn't allow setting chunk_key + # in v1 style configuration + + def support_in_v12_style?(feature) + case feature + when :synchronous then false + when :buffered then true + when :delayed_commit then false + when :custom_format then false + end + end + + desc 'The buffer type (memory, file)' + config_param :buffer_type, :string, default: 'memory' + desc 'The interval between data flushes.' 
+ config_param :flush_interval, :time, default: 60 + config_param :try_flush_interval, :float, default: 1 + desc 'If true, the value of `retry_value` is ignored and there is no limit' + config_param :disable_retry_limit, :bool, default: false + desc 'The limit on the number of retries before buffered data is discarded' + config_param :retry_limit, :integer, default: 17 + desc 'The initial intervals between write retries.' + config_param :retry_wait, :time, default: 1.0 + desc 'The maximum intervals between write retries.' + config_param :max_retry_wait, :time, default: nil + desc 'The number of threads to flush the buffer.' + config_param :num_threads, :integer, default: 1 + desc 'The interval between data flushes for queued chunk.' + config_param :queued_chunk_flush_interval, :time, default: 1 + + desc 'The size of each buffer chunk.' + config_param :buffer_chunk_limit, :size, default: 8*1024*1024 + desc 'The length limit of the chunk queue.' + config_param :buffer_queue_limit, :integer, default: 256 + desc 'The action when the size of buffer queue exceeds the buffer_queue_limit.' + config_param :buffer_queue_full_action, :enum, list: [:exception, :block], default: :exception + + config_param :flush_at_shutdown, :bool, default: true + + config_set_default :time_as_integer, true + + PARAMS_MAP = [ + ["buffer_type", "@type"], + ["num_threads", "flush_threads"], + ["flush_interval", "flush_interval"], + ["try_flush_interval", "flush_thread_interval"], + ["queued_chunk_flush_interval", "flush_burst_interval"], + ["disable_retry_limit", "retry_forever"], + ["retry_limit", "retry_max_times"], + ["max_retry_wait", "retry_max_interval"], + ["buffer_chunk_limit", "chunk_bytes_limit"], + ["buffer_queue_limit", "queue_length_limit"], + ["buffer_queue_full_action", nil], # TODO: implement this on fluent/plugin/buffer + ["flush_at_shutdown", "flush_at_shutdown"], + ] + + def configure(conf) + bufconf = CompatOutputUtils.buffer_section(conf) + config_style = (bufconf ? :v1 : :v0) + if config_style == :v0 + buf_params = { + "flush_mode" => "fast", + "retry_type" => "expbackoff", + } + PARAMS_MAP.each do |older, newer| + buf_params[newer] = conf[older] if conf.has_key?(older) + end + + bufconf = Fluent::Config::Element.new('buffer', 'tag', buf_params, []) + + conf.elements << bufconf + + secconf = CompatOutputUtils.secondary_section(conf) + if secconf + if secconf['type'] && !secconf['@type'] + secconf['@type'] = secconf['type'] + end + end + end + + super + + if config_style == :v1 + if @buffer_config.chunk_keys == ['tag'] + raise Fluent::ConfigError, "this plugin '#{self.class}' allows only" + end + end + + (class << self; self; end).module_eval do + prepend BufferedChunkMixin + end + end + + def format_stream(tag, es) # for BufferedOutputTestDriver + es.to_msgpack_stream(time_int: @time_as_integer) + end + + def write(chunk) + write_objects(chunk.metadata.tag, chunk) + end + + def extract_placeholders(str, metadata) + raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API" + end + end + + class TimeSlicedOutput < Fluent::Plugin::Output + # TODO: warn when deprecated + + helpers :event_emitter + + def support_in_v12_style?(feature) + case feature + when :synchronous then false + when :buffered then true + when :delayed_commit then false + when :custom_format then true + end + end + + desc 'The buffer type (memory, file)' + config_param :buffer_type, :string, default: 'file' + desc 'The interval between data flushes.' 
+ config_param :flush_interval, :time, default: nil + config_param :try_flush_interval, :float, default: 1 + desc 'If true, the value of `retry_value` is ignored and there is no limit' + config_param :disable_retry_limit, :bool, default: false + desc 'The limit on the number of retries before buffered data is discarded' + config_param :retry_limit, :integer, default: 17 + desc 'The initial intervals between write retries.' + config_param :retry_wait, :time, default: 1.0 + desc 'The maximum intervals between write retries.' + config_param :max_retry_wait, :time, default: nil + desc 'The number of threads to flush the buffer.' + config_param :num_threads, :integer, default: 1 + desc 'The interval between data flushes for queued chunk.' + config_param :queued_chunk_flush_interval, :time, default: 1 + + desc 'The time format used as part of the file name.' + config_param :time_slice_format, :string, default: '%Y%m%d' + desc 'The amount of time Fluentd will wait for old logs to arrive.' + config_param :time_slice_wait, :time, default: 10*60 + desc 'Parse the time value in the specified timezone' + config_param :timezone, :string, default: nil + + desc 'The size of each buffer chunk.' + config_param :buffer_chunk_limit, :size, default: 256*1024*1024 + desc 'The length limit of the chunk queue.' + config_param :buffer_queue_limit, :integer, default: 256 + desc 'The action when the size of buffer queue exceeds the buffer_queue_limit.' + config_param :buffer_queue_full_action, :enum, list: [:exception, :block], default: :exception + + config_param :flush_at_shutdown, :bool, default: false + + attr_accessor :localtime + + # config_section :buffer, param_name: :buffer_config, init: true, required: false, multi: false, final: true do + # config_set_default :@type, 'file2' + # end + config_section :buffer, param_name: :buffer_config do + config_set_default :@type, 'file2' + end + + PARAMS_MAP = [ + ["buffer_type", "@type"], + ["buffer_path", "path"], + ["num_threads", "flush_threads"], + ["flush_interval", "flush_interval"], + ["try_flush_interval", "flush_thread_interval"], + ["queued_chunk_flush_interval", "flush_burst_interval"], + ["disable_retry_limit", "retry_forever"], + ["retry_limit", "retry_max_times"], + ["max_retry_wait", "retry_max_interval"], + ["buffer_chunk_limit", "chunk_bytes_limit"], + ["buffer_queue_limit", "queue_length_limit"], + ["buffer_queue_full_action", nil], # TODO: implement this on fluent/plugin/buffer + ["flush_at_shutdown", "flush_at_shutdown"], + ["time_slice_wait", "timekey_wait"], + ] + + def initialize + super + @localtime = true + end + + def configure(conf) + bufconf = CompatOutputUtils.buffer_section(conf) + config_style = (bufconf ? :v1 : :v0) + if config_style == :v0 + buf_params = { + "@type" => "file2", + "flush_mode" => (conf['flush_interval'] ? 
"fast" : "none"), + "retry_type" => "expbackoff", + } + PARAMS_MAP.each do |older, newer| + buf_params[newer] = conf[older] if conf.has_key?(older) + end + unless buf_params.has_key?("@type") + buf_params["@type"] = "file2" + end + + if conf['timezone'] + @timezone = conf['timezone'] + Fluent::Timezone.validate!(@timezone) + elsif conf['utc'] + @timezone = "+0000" + @localtime = false + elsif conf['localtime'] + @timezone = Time.now.strftime('%z') + @localtime = true + else + @timezone = "+0000" # v0.12 assumes UTC without any configuration + @localtime = false + end + + @_timekey_range = case conf['time_slice_format'] + when /\%S/ then 1 + when /\%M/ then 60 + when /\%H/ then 3600 + when /\%d/ then 86400 + when nil then 86400 # default value of TimeSlicedOutput.time_slice_format is '%Y%m%d' + else + raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long" + end + buf_params["timekey_range"] = @_timekey_range + + bufconf = Fluent::Config::Element.new('buffer', 'time', buf_params, []) + + conf.elements << bufconf + + secconf = CompatOutputUtils.secondary_section(conf) + if secconf + if secconf['type'] && !secconf['@type'] + secconf['@type'] = secconf['type'] + end + end + end + + super + + if config_style == :v1 + if @buffer_config.chunk_keys == ['tag'] + raise Fluent::ConfigError, "this plugin '#{self.class}' allows only" + end + end + + (class << self; self; end).module_eval do + prepend TimeSliceChunkMixin + end + end + + # Original TimeSlicedOutput#emit doesn't call #format_stream + + # #format MUST be implemented in plugin + # #write is also + + def extract_placeholders(str, metadata) + raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API" + end + end + end +end diff --git a/lib/fluent/compat/output_chain.rb b/lib/fluent/compat/output_chain.rb new file mode 100644 index 0000000000..bd090a0aa2 --- /dev/null +++ b/lib/fluent/compat/output_chain.rb @@ -0,0 +1,60 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'singleton' + +module Fluent + module Compat + # TODO: remove when old plugin API are removed + class NullOutputChain + include Singleton + + def next + end + end + + class OutputChain + def initialize(array, tag, es, chain=NullOutputChain.instance) + @array = array + @tag = tag + @es = es + @offset = 0 + @chain = chain + end + + def next + if @array.length <= @offset + return @chain.next + end + @offset += 1 + result = @array[@offset-1].emit(@tag, @es, self) + result + end + end + + class CopyOutputChain < OutputChain + def next + if @array.length <= @offset + return @chain.next + end + @offset += 1 + es = @array.length > @offset ? 
@es.dup : @es + result = @array[@offset-1].emit(@tag, es, self) + result + end + end + end +end diff --git a/lib/fluent/event_router.rb b/lib/fluent/event_router.rb index 0b9e081502..fe46a9e36f 100644 --- a/lib/fluent/event_router.rb +++ b/lib/fluent/event_router.rb @@ -46,7 +46,6 @@ def initialize(default_collector, emit_error_handler) @match_cache = MatchCache.new @default_collector = default_collector @emit_error_handler = emit_error_handler - @chain = NullOutputChain.instance end attr_accessor :default_collector @@ -88,7 +87,7 @@ def emit_array(tag, array) end def emit_stream(tag, es) - match(tag).emit(tag, es, @chain) + match(tag).emit_events(tag, es) rescue => e @emit_error_handler.handle_emits_error(tag, es, e) end @@ -147,12 +146,12 @@ def set_output(output) @output = output end - def emit(tag, es, chain) + def emit_events(tag, es) processed = es @filters.each { |filter| processed = filter.filter_stream(tag, processed) } - @output.emit(tag, processed, chain) + @output.emit_events(tag, processed) end end diff --git a/lib/fluent/output.rb b/lib/fluent/output.rb index e6317a1a0f..9e3949cc82 100644 --- a/lib/fluent/output.rb +++ b/lib/fluent/output.rb @@ -14,577 +14,12 @@ # limitations under the License. # -require 'thread' - -require 'fluent/configurable' -require 'fluent/plugin_id' -require 'fluent/log' -require 'fluent/output_chain' -require 'fluent/plugin' -require 'fluent/timezone' -require 'fluent/formatter' +require 'fluent/compat/output' module Fluent - class Output - include Configurable - include PluginId - include PluginLoggerMixin - - attr_accessor :router - - def initialize - super - end - - def configure(conf) - super - - if label_name = conf['@label'] - label = Engine.root_agent.find_label(label_name) - @router = label.event_router - elsif @router.nil? 
- @router = Engine.root_agent.event_router - end - end - - def start - end - - def shutdown - end - - #def emit(tag, es, chain) - #end - - def secondary_init(primary) - if primary.class != self.class - $log.warn "type of secondary output should be same as primary output", primary: primary.class.to_s, secondary: self.class.to_s - end - end - - def inspect; "#<%s:%014x>" % [self.class.name, '0x%014x' % (__id__ << 1)] end - end - - class OutputThread - def initialize(output) - @output = output - @finish = false - @next_time = Time.now.to_f + 1.0 - end - - def configure(conf) - end - - def start - @mutex = Mutex.new - @cond = ConditionVariable.new - @thread = Thread.new(&method(:run)) - end - - def shutdown - @finish = true - @mutex.synchronize { - @cond.signal - } - Thread.pass - @thread.join - end - - def submit_flush - @mutex.synchronize { - @next_time = 0 - @cond.signal - } - Thread.pass - end - - private - def run - @mutex.lock - begin - until @finish - time = Time.now.to_f - - if @next_time <= time - @mutex.unlock - begin - @next_time = @output.try_flush - ensure - @mutex.lock - end - next_wait = @next_time - Time.now.to_f - else - next_wait = @next_time - time - end - - cond_wait(next_wait) if next_wait > 0 - end - ensure - @mutex.unlock - end - rescue - $log.error "error on output thread", error: $!.to_s - $log.error_backtrace - raise - ensure - @mutex.synchronize { - @output.before_shutdown - } - end - - def cond_wait(sec) - @cond.wait(@mutex, sec) - end - end - - - class BufferedOutput < Output - def initialize - super - @next_flush_time = 0 - @last_retry_time = 0 - @next_retry_time = 0 - @num_errors = 0 - @num_errors_lock = Mutex.new - @secondary = nil - @secondary_limit = 8 - @emit_count = 0 - end - - desc 'The buffer type (memory, file)' - config_param :buffer_type, :string, default: 'memory' - desc 'The interval between data flushes.' - config_param :flush_interval, :time, default: 60 - config_param :try_flush_interval, :float, default: 1 - desc 'If true, the value of `retry_value` is ignored and there is no limit' - config_param :disable_retry_limit, :bool, default: false - desc 'The limit on the number of retries before buffered data is discarded' - config_param :retry_limit, :integer, default: 17 - desc 'The initial intervals between write retries.' - config_param :retry_wait, :time, default: 1.0 - desc 'The maximum intervals between write retries.' - config_param :max_retry_wait, :time, default: nil - desc 'The number of threads to flush the buffer.' - config_param :num_threads, :integer, default: 1 - desc 'The interval between data flushes for queued chunk.' 
- config_param :queued_chunk_flush_interval, :time, default: 1 - - def configure(conf) - super - - @retry_wait = @retry_wait.to_f # converted to Float for calc_retry_wait - @buffer = Plugin.new_buffer(@buffer_type) - @buffer.configure(conf) - - if @buffer.respond_to?(:enable_parallel) - if @num_threads == 1 - @buffer.enable_parallel(false) - else - @buffer.enable_parallel(true) - end - end - - @writers = (1..@num_threads).map { - writer = OutputThread.new(self) - writer.configure(conf) - writer - } - - if sconf = conf.elements.select {|e| e.name == 'secondary' }.first - type = sconf['@type'] || conf['@type'] || sconf['type'] || conf['type'] - @secondary = Plugin.new_output(type) - @secondary.router = router - @secondary.configure(sconf) - - if secondary_limit = conf['secondary_limit'] - @secondary_limit = secondary_limit.to_i - if @secondary_limit < 0 - raise ConfigError, "invalid parameter 'secondary_limit #{secondary_limit}'" - end - end - - @secondary.secondary_init(self) - end - end - - def start - @next_flush_time = Time.now.to_f + @flush_interval - @buffer.start - @secondary.start if @secondary - @writers.each {|writer| writer.start } - @writer_current_position = 0 - @writers_size = @writers.size - end - - def shutdown - @writers.each {|writer| writer.shutdown } - @secondary.shutdown if @secondary - @buffer.shutdown - end - - def emit(tag, es, chain, key="") - @emit_count += 1 - data = format_stream(tag, es) - if @buffer.emit(key, data, chain) - submit_flush - end - end - - def submit_flush - # Without locks: it is rough but enough to select "next" writer selection - @writer_current_position = (@writer_current_position + 1) % @writers_size - @writers[@writer_current_position].submit_flush - end - - def format_stream(tag, es) - out = '' - es.each {|time,record| - out << format(tag, time, record) - } - out - end - - #def format(tag, time, record) - #end - - #def write(chunk) - #end - - def enqueue_buffer(force = false) - @buffer.keys.each {|key| - @buffer.push(key) - } - end - - def try_flush - time = Time.now.to_f - - empty = @buffer.queue_size == 0 - if empty && @next_flush_time < (now = Time.now.to_f) - @buffer.synchronize do - if @next_flush_time < now - enqueue_buffer - @next_flush_time = now + @flush_interval - empty = @buffer.queue_size == 0 - end - end - end - if empty - return time + @try_flush_interval - end - - begin - retrying = !@num_errors.zero? - - if retrying - @num_errors_lock.synchronize do - if retrying = !@num_errors.zero? # re-check in synchronize - if @next_retry_time >= time - # allow retrying for only one thread - return time + @try_flush_interval - end - # assume next retry failes and - # clear them if when it succeeds - @last_retry_time = time - @num_errors += 1 - @next_retry_time += calc_retry_wait - end - end - end - - if @secondary && !@disable_retry_limit && @num_errors > @retry_limit - has_next = flush_secondary(@secondary) - else - has_next = @buffer.pop(self) - end - - # success - if retrying - @num_errors = 0 - # Note: don't notify to other threads to prevent - # burst to recovered server - $log.warn "retry succeeded.", plugin_id: plugin_id - end - - if has_next - return Time.now.to_f + @queued_chunk_flush_interval - else - return time + @try_flush_interval - end - - rescue => e - if retrying - error_count = @num_errors - else - # first error - error_count = 0 - @num_errors_lock.synchronize do - if @num_errors.zero? 
- @last_retry_time = time - @num_errors += 1 - @next_retry_time = time + calc_retry_wait - end - end - end - - if @disable_retry_limit || error_count < @retry_limit - $log.warn "temporarily failed to flush the buffer.", next_retry: Time.at(@next_retry_time), error: e, plugin_id: plugin_id - $log.warn_backtrace e.backtrace - - elsif @secondary - if error_count == @retry_limit - $log.warn "failed to flush the buffer.", error: e, plugin_id: plugin_id - $log.warn "retry count exceededs limit. falling back to secondary output." - $log.warn_backtrace e.backtrace - retry # retry immediately - elsif error_count <= @retry_limit + @secondary_limit - $log.warn "failed to flush the buffer, next retry will be with secondary output.", next_retry: Time.at(@next_retry_time), error: e, plugin_id: plugin_id - $log.warn_backtrace e.backtrace - else - $log.warn "failed to flush the buffer.", error: e, plugin_id: plugin_id - $log.warn "secondary retry count exceededs limit." - $log.warn_backtrace e.backtrace - write_abort - @num_errors = 0 - end - - else - $log.warn "failed to flush the buffer.", error: e, plugin_id: plugin_id - $log.warn "retry count exceededs limit." - $log.warn_backtrace e.backtrace - write_abort - @num_errors = 0 - end - - return @next_retry_time - end - end - - def force_flush - @num_errors_lock.synchronize do - @next_retry_time = Time.now.to_f - 1 - end - enqueue_buffer(true) - submit_flush - end - - def before_shutdown - begin - @buffer.before_shutdown(self) - rescue - $log.warn "before_shutdown failed", error: $! - $log.warn_backtrace - end - end - - def calc_retry_wait - # TODO retry pattern - wait = if @disable_retry_limit || @num_errors <= @retry_limit - @retry_wait * (2 ** (@num_errors - 1)) - else - # secondary retry - @retry_wait * (2 ** (@num_errors - 2 - @retry_limit)) - end - retry_wait = wait.finite? ? wait + (rand * (wait / 4.0) - (wait / 8.0)) : wait - @max_retry_wait ? [retry_wait, @max_retry_wait].min : retry_wait - end - - def write_abort - $log.error "throwing away old logs." - begin - @buffer.clear! - rescue - $log.error "unexpected error while aborting", error: $! - $log.error_backtrace - end - end - - def flush_secondary(secondary) - @buffer.pop(secondary) - end - end - - - class ObjectBufferedOutput < BufferedOutput - config_param :time_as_integer, :bool, default: true - - def initialize - super - end - - def format_stream(tag, es) - if @time_as_integer - es.to_msgpack_stream_forced_integer - else - es.to_msgpack_stream - end - end - - def emit(tag, es, chain) - @emit_count += 1 - data = format_stream(tag, es) - key = tag - if @buffer.emit(key, data, chain) - submit_flush - end - end - - module BufferedEventStreamMixin - include Enumerable - - def repeatable? - true - end - - def each(&block) - msgpack_each(&block) - end - - def to_msgpack_stream - read - end - end - - def write(chunk) - chunk.extend(BufferedEventStreamMixin) - write_objects(chunk.key, chunk) - end - end - - - class TimeSlicedOutput < BufferedOutput - require 'fluent/timezone' - - def initialize - super - @localtime = true - #@ignore_old = false # TODO - end - - desc 'The time format used as part of the file name.' - config_param :time_slice_format, :string, default: '%Y%m%d' - desc 'The amount of time Fluentd will wait for old logs to arrive.' 
- config_param :time_slice_wait, :time, default: 10*60 - desc 'Parse the time value in the specified timezone' - config_param :timezone, :string, default: nil - config_set_default :buffer_type, 'file' # overwrite default buffer_type - config_set_default :buffer_chunk_limit, 256*1024*1024 # overwrite default buffer_chunk_limit - config_set_default :flush_interval, nil - - attr_accessor :localtime - attr_reader :time_slicer # for test - - def configure(conf) - super - - if conf['utc'] - @localtime = false - elsif conf['localtime'] - @localtime = true - end - - if conf['timezone'] - @timezone = conf['timezone'] - Fluent::Timezone.validate!(@timezone) - end - - if @timezone - @time_slicer = Timezone.formatter(@timezone, @time_slice_format) - elsif @localtime - @time_slicer = Proc.new {|time| - Time.at(time).strftime(@time_slice_format) - } - else - @time_slicer = Proc.new {|time| - Time.at(time).utc.strftime(@time_slice_format) - } - end - - @time_slice_cache_interval = time_slice_cache_interval - @before_tc = nil - @before_key = nil - - if @flush_interval - if conf['time_slice_wait'] - $log.warn "time_slice_wait is ignored if flush_interval is specified: #{conf}" - end - @enqueue_buffer_proc = Proc.new do - @buffer.keys.each {|key| - @buffer.push(key) - } - end - - else - @flush_interval = [60, @time_slice_cache_interval].min - @enqueue_buffer_proc = Proc.new do - nowslice = @time_slicer.call(Time.now - @time_slice_wait) - @buffer.keys.each {|key| - if key < nowslice - @buffer.push(key) - end - } - end - end - end - - def emit(tag, es, chain) - @emit_count += 1 - formatted_data = {} - es.each {|time,record| - begin - tc = time / @time_slice_cache_interval - if @before_tc == tc - key = @before_key - else - @before_tc = tc - key = @time_slicer.call(time) - @before_key = key - end - rescue => e - @router.emit_error_event(tag, Engine.now, {'time' => time, 'record' => record}, e) - next - end - - formatted_data[key] ||= '' - formatted_data[key] << format(tag, time, record) - } - formatted_data.each { |key, data| - if @buffer.emit(key, data, chain) - submit_flush - end - } - end - - def enqueue_buffer(force = false) - if force - @buffer.keys.each {|key| - @buffer.push(key) - } - else - @enqueue_buffer_proc.call - end - end - - #def format(tag, event) - #end - - private - def time_slice_cache_interval - if @time_slicer.call(0) != @time_slicer.call(60-1) - return 1 - elsif @time_slicer.call(0) != @time_slicer.call(60*60-1) - return 30 - elsif @time_slicer.call(0) != @time_slicer.call(24*60*60-1) - return 60*30 - else - return 24*60*30 - end - end - end - - - class MultiOutput < Output - #def outputs - #end - end + Output = Fluent::Compat::Output + BufferedOutput = Fluent::Compat::BufferedOutput + ObjectBufferedOutput = Fluent::Compat::ObjectBufferedOutput + TimeSlicedOutput = Fluent::Compat::TimeSlicedOutput + MultiOutput = Fluent::Compat::MultiOutput end diff --git a/lib/fluent/output_chain.rb b/lib/fluent/output_chain.rb index 3883d62d8e..c2c03d6da9 100644 --- a/lib/fluent/output_chain.rb +++ b/lib/fluent/output_chain.rb @@ -14,44 +14,10 @@ # limitations under the License. # -# OutputChain will be removed since v0.14. 
-module Fluent - class OutputChain - def initialize(array, tag, es, chain=NullOutputChain.instance) - @array = array - @tag = tag - @es = es - @offset = 0 - @chain = chain - end - - def next - if @array.length <= @offset - return @chain.next - end - @offset += 1 - result = @array[@offset-1].emit(@tag, @es, self) - result - end - end - - class CopyOutputChain < OutputChain - def next - if @array.length <= @offset - return @chain.next - end - @offset += 1 - es = @array.length > @offset ? @es.dup : @es - result = @array[@offset-1].emit(@tag, es, self) - result - end - end +require 'fluent/compat/output_chain' - class NullOutputChain - require 'singleton' - include Singleton - - def next - end - end +module Fluent + OutputChain = Fluent::Compat::OutputChain + CopyOutputChain = Fluent::Compat::CopyOutputChain + NullOutputChain = Fluent::Compat::NullOutputChain end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index faad7c4030..7405901a9c 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -470,7 +470,7 @@ def extract_placeholders(str, metadata) end end - def emit(tag, es) + def emit_events(tag, es) # actually this method will be overwritten by #configure if @buffering emit_buffered(tag, es) @@ -514,18 +514,28 @@ def metadata(tag, time, record) # this arguments are ordered in output plugin's rule # Metadata 's argument order is different from this one (timekey, tag, variables) + raise ArgumentError, "tag must be a String: #{tag.class}" unless tag.nil? || tag.is_a?(String) + raise ArgumentError, "time must be a Fluent::EventTime (or Integer): #{time.class}" unless time.nil? || time.is_a?(Fluent::EventTime) || time.is_a?(Integer) + raise ArgumentError, "record must be a Hash: #{record.class}" unless record.nil? || record.is_a?(Hash) + + if @chunk_keys.nil? && @chunk_key_time.nil? && @chunk_key_tag.nil? + # for tests + return Struct.new(:timekey, :tag, :variables).new + end + # timekey is int from epoch, and `timekey - timekey % 60` is assumed to mach with 0s of each minutes. # it's wrong if timezone is configured as one which supports leap second, but it's very rare and # we can ignore it (especially in production systems). - timekey_range = @buffer_config.timekey_range if @chunk_keys.empty? if !@chunk_key_time && !@chunk_key_tag @buffer.metadata() elsif @chunk_key_time && @chunk_key_tag + timekey_range = @buffer_config.timekey_range time_int = time.to_i timekey = time_int - (time_int % timekey_range) @buffer.metadata(timekey: timekey, tag: tag) elsif @chunk_key_time + timekey_range = @buffer_config.timekey_range time_int = time.to_i timekey = time_int - (time_int % timekey_range) @buffer.metadata(timekey: timekey) @@ -533,6 +543,7 @@ def metadata(tag, time, record) @buffer.metadata(tag: tag) end else + timekey_range = @buffer_config.timekey_range timekey = if @chunk_key_time time_int = time.to_i time_int - (time_int % timekey_range) @@ -588,14 +599,22 @@ def handle_stream_with_standard_format(tag, es) def handle_stream_simple(tag, es) meta = metadata((@chunk_key_tag ? 
tag : nil), nil, nil) - es_size = es.size - es_bulk = if @custom_format - es.map{|time,record| format(tag, time, record) }.join - else - es.to_msgpack_stream(time_int: @time_as_integer) - end + records = es.size + if @custom_format + records = 0 + es_size = 0 + es_bulk = '' + es.each do |time,record| + es_bulk << format(tag, time, record) + es_size += 1 + records += 1 + end + else + es_size = es.size + es_bulk = es.to_msgpack_stream(time_int: @time_as_integer) + end @buffer.emit_bulk(meta, es_bulk, es_size) - @counters_monitor.synchronize{ @emit_records += es_size } + @counters_monitor.synchronize{ @emit_records += records } [meta] end diff --git a/lib/fluent/test/output_test.rb b/lib/fluent/test/output_test.rb index 60176649fa..04c12ab935 100644 --- a/lib/fluent/test/output_test.rb +++ b/lib/fluent/test/output_test.rb @@ -43,9 +43,7 @@ def initialize(klass, tag='test', &block) def emit(record, time=Engine.now) es = OneEventStream.new(time, record) - chain = TestOutputChain.new - @instance.emit(@tag, es, chain) - assert_equal 1, chain.called + @instance.emit_events(@tag, es) end end @@ -85,13 +83,13 @@ def run(num_waits = 10, &block) assert_equal(@expected_buffer, buffer) end - key = if @instance.is_a? Fluent::ObjectBufferedOutput - @tag - else - '' - end - chunk = @instance.buffer.new_chunk(key) - chunk << buffer + chunk = if @instance.instance_eval{ @chunk_key_tag } + @instance.buffer.generate_chunk(@instance.metadata(@tag, nil, nil)) + else + @instance.buffer.generate_chunk(@instance.metadata(nil, nil, nil)) + end + chunk.concat(buffer, es.size) + begin result = @instance.write(chunk) ensure @@ -105,7 +103,7 @@ def run(num_waits = 10, &block) class TimeSlicedOutputTestDriver < InputTestDriver def initialize(klass, tag='test', &block) super(klass, &block) - @entries = {} + @entries = [] @expected_buffer = nil @tag = tag end @@ -113,10 +111,7 @@ def initialize(klass, tag='test', &block) attr_accessor :tag def emit(record, time=Engine.now) - slicer = @instance.instance_eval{@time_slicer} - key = slicer.call(time) - @entries[key] = [] unless @entries.has_key?(key) - @entries[key] << [time, record] + @entries << [time, record] self end @@ -130,30 +125,30 @@ def run(&block) block.call if block buffer = '' - @entries.keys.each {|key| - es = ArrayEventStream.new(@entries[key]) - @instance.emit(@tag, es, NullOutputChain.instance) - buffer << @instance.format_stream(@tag, es) - } + lines = {} + # v0.12 TimeSlicedOutput doesn't call #format_stream + @entries.each do |time, record| + meta = @instance.metadata(@tag, time, record) + line = @instance.format(@tag, time, record) + buffer << line + lines[meta] ||= [] + lines[meta] << line + end if @expected_buffer assert_equal(@expected_buffer, buffer) end chunks = [] - @instance.instance_eval do - @buffer.instance_eval{ @map.keys }.each do |key| - @buffer.push(key) - chunks << @buffer.instance_eval{ @queue.pop } - end - end - chunks.each { |chunk| + lines.keys.each do |meta| + chunk = @instance.buffer.generate_chunk(meta) + chunk.append(lines[meta]) begin result.push(@instance.write(chunk)) ensure chunk.purge end - } + end } result end From 33264dee8c7489384adb53557fee1c0bf47418b0 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 27 Apr 2016 22:31:19 +0900 Subject: [PATCH 13/22] fix tests for core modules/classes with newer API --- test/plugin/test_output.rb | 18 ++++---- test/plugin/test_output_as_buffered.rb | 46 +++++++++---------- .../plugin/test_output_as_buffered_retries.rb | 40 ++++++++-------- .../test_output_as_buffered_secondary.rb | 
32 ++++++------- test/test_event_router.rb | 15 +++--- test/test_output.rb | 45 ++++++++++++++---- 6 files changed, 112 insertions(+), 84 deletions(-) diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index d4eea8d60b..a867f3bfa2 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -320,7 +320,7 @@ def waiting(seconds) i.start t = event_time() - i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) + i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) assert process_called @@ -335,7 +335,7 @@ def waiting(seconds) i.start t = event_time() - i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) + i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) assert_equal 2, format_called_times @@ -357,7 +357,7 @@ def waiting(seconds) assert !i.prefer_buffered_processing t = event_time() - i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) + i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) waiting(4){ Thread.pass until process_called } @@ -382,7 +382,7 @@ def waiting(seconds) assert i.prefer_buffered_processing t = event_time() - i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) + i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) assert !process_called assert_equal 2, format_called_times @@ -399,7 +399,7 @@ def waiting(seconds) i.start t = event_time() - i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) + i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) i.force_flush waiting(4){ Thread.pass until write_called } @@ -418,7 +418,7 @@ def waiting(seconds) i.start t = event_time() - i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) + i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) i.force_flush waiting(4){ Thread.pass until try_write_called } @@ -442,7 +442,7 @@ def waiting(seconds) assert !i.prefer_delayed_commit t = event_time() - i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) + i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) i.force_flush waiting(4){ Thread.pass until write_called || try_write_called } @@ -467,7 +467,7 @@ def waiting(seconds) assert i.prefer_delayed_commit t = event_time() - i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) + i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) i.force_flush waiting(4){ Thread.pass until write_called || try_write_called } @@ -505,7 +505,7 @@ def waiting(seconds) t = event_time() es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) 5.times do - @i.emit('tag', es) + @i.emit_events('tag', es) end assert_equal 5, ary.size diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb index 2a598bab14..4d78b9d347 100644 --- a/test/plugin/test_output_as_buffered.rb +++ b/test/plugin/test_output_as_buffered.rb @@ -153,7 
+153,7 @@ def waiting(seconds) es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) 5.times do - @i.emit('tag.test', es) + @i.emit_events('tag.test', es) end assert_equal 10, ary.size @@ -176,14 +176,14 @@ def waiting(seconds) event_size = [tag, t, r].to_json.size # 195 (1024 / event_size).times do |i| - @i.emit("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) end assert{ @i.buffer.queue.size == 0 && ary.size == 0 } staged_chunk = @i.buffer.stage[@i.buffer.stage.keys.first] assert{ staged_chunk.size != 0 } - @i.emit("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) assert{ @i.buffer.queue.size > 0 || @i.buffer.dequeued.size > 0 || ary.size > 0 } @@ -209,7 +209,7 @@ def waiting(seconds) event_size = [tag, t, r].to_json.size # 195 (1024 / event_size).times do |i| - @i.emit("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) end assert{ @i.buffer.queue.size == 0 && ary.size == 0 } @@ -253,7 +253,7 @@ def waiting(seconds) es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) 5.times do - @i.emit('tag.test', es) + @i.emit_events('tag.test', es) end assert_equal 10, ary.size @@ -284,7 +284,7 @@ def waiting(seconds) @i.interrupt_flushes - @i.emit(tag, es) + @i.emit_events("test.tag", es) assert{ @i.buffer.stage.size == 1 } @@ -316,7 +316,7 @@ def waiting(seconds) event_size = [tag, t, r].to_json.size # 195 (1024 / event_size).times do |i| - @i.emit("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) end assert{ @i.buffer.queue.size == 0 && ary.size == 0 } @@ -359,7 +359,7 @@ def waiting(seconds) es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) 5.times do - @i.emit('tag.test', es) + @i.emit_events('tag.test', es) end assert_equal 10, ary.size @@ -387,7 +387,7 @@ def waiting(seconds) rand_records = rand(1..5) es = Fluent::ArrayEventStream.new([ [t, r] ] * rand_records) assert_equal rand_records, es.size - @i.emit(tag, es) + @i.emit_events("test.tag", es) assert{ @i.buffer.stage.size == 0 && (@i.buffer.queue.size == 1 || @i.buffer.dequeued.size == 1 || ary.size > 0) } @@ -410,7 +410,7 @@ def waiting(seconds) (0...10).each do |i| r["key#{i}"] = "value #{i}" end - @i.emit("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) + @i.emit_events("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) @i.stop @i.before_shutdown @@ -463,7 +463,7 @@ def waiting(seconds) es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) 5.times do - @i.emit('tag.test', es) + @i.emit_events('tag.test', es) end assert_equal 10, ary.size @@ -513,7 +513,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) + @i.emit_events(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 3 } assert{ @i.write_count == 0 } @@ -592,7 +592,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) + @i.emit_events(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 3 } assert{ @i.write_count == 0 } @@ -671,7 +671,7 @@ def 
waiting(seconds) es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) 5.times do - @i.emit('tag.test', es) + @i.emit_events('tag.test', es) end assert_equal 10, ary.size @@ -722,7 +722,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) + @i.emit_events(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 2 } # test.tag.1 x1, test.tag.2 x1 @@ -813,7 +813,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) + @i.emit_events(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 2 } # test.tag.1 x1, test.tag.2 x1 @@ -890,7 +890,7 @@ def waiting(seconds) ]) 5.times do - @i.emit('tag.test', es) + @i.emit_events('tag.test', es) end assert_equal 10, ary.size @@ -932,7 +932,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) + @i.emit_events(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 3 } @@ -1022,7 +1022,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) + @i.emit_events(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 3 } @@ -1125,7 +1125,7 @@ def waiting(seconds) ]) 5.times do - @i.emit('tag.test', es) + @i.emit_events('tag.test', es) end assert_equal 10, ary.size @@ -1175,7 +1175,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) + @i.emit_events(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 2 } @@ -1294,7 +1294,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) + @i.emit_events(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 2 } @@ -1454,7 +1454,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) + @i.emit_events(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 2 } diff --git a/test/plugin/test_output_as_buffered_retries.rb b/test/plugin/test_output_as_buffered_retries.rb index ecfd406552..55fc1d35c7 100644 --- a/test/plugin/test_output_as_buffered_retries.rb +++ b/test/plugin/test_output_as_buffered_retries.rb @@ -164,7 +164,7 @@ def get_log_time(msg, logs) now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:32 -0700') Timecop.freeze( now ) @@ -203,7 +203,7 @@ def get_log_time(msg, logs) now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:32 -0700') Timecop.freeze( now ) @@ -255,12 +255,12 @@ def get_log_time(msg, logs) now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", 
dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.2", dummy_event_stream()) assert_equal 0, @i.write_count assert_equal 0, @i.num_errors @@ -317,7 +317,7 @@ def get_log_time(msg, logs) assert{ @i.buffer.stage.size == 0 } assert{ written_tags.all?{|t| t == 'test.tag.1' } } - @i.emit("test.tag.3", dummy_event_stream()) + @i.emit_events("test.tag.3", dummy_event_stream()) logs = @i.log.out.logs assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") } } @@ -342,12 +342,12 @@ def get_log_time(msg, logs) now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.2", dummy_event_stream()) assert_equal 0, @i.write_count assert_equal 0, @i.num_errors @@ -390,7 +390,7 @@ def get_log_time(msg, logs) assert{ @i.buffer.stage.size == 0 } assert{ written_tags.all?{|t| t == 'test.tag.1' } } - @i.emit("test.tag.3", dummy_event_stream()) + @i.emit_events("test.tag.3", dummy_event_stream()) logs = @i.log.out.logs assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } } @@ -420,7 +420,7 @@ def get_log_time(msg, logs) now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:32 -0700') Timecop.freeze( now ) @@ -463,12 +463,12 @@ def get_log_time(msg, logs) now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.2", dummy_event_stream()) assert_equal 0, @i.write_count assert_equal 0, @i.num_errors @@ -513,7 +513,7 @@ def get_log_time(msg, logs) chunks = @i.buffer.queue.dup - @i.emit("test.tag.3", dummy_event_stream()) + @i.emit_events("test.tag.3", dummy_event_stream()) now = @i.next_flush_time Timecop.freeze( now ) @@ -555,12 +555,12 @@ def get_log_time(msg, logs) now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.2", dummy_event_stream()) assert_equal 0, @i.write_count assert_equal 0, @i.num_errors @@ -604,7 +604,7 @@ def get_log_time(msg, logs) assert{ written_tags.all?{|t| t == 'test.tag.1' } } - @i.emit("test.tag.3", dummy_event_stream()) + @i.emit_events("test.tag.3", dummy_event_stream()) logs = @i.log.out.logs assert{ logs.any?{|l| l.include?("[error]: failed to flush the buffer, and hit limit for retries. 
dropping all chunks in the buffer queue.") && l.include?("retry_times=10") } } @@ -650,12 +650,12 @@ def get_log_time(msg, logs) now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.2", dummy_event_stream()) assert_equal 0, @i.write_count assert_equal 0, @i.num_errors @@ -718,12 +718,12 @@ def get_log_time(msg, logs) now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.2", dummy_event_stream()) assert_equal 0, @i.write_count assert_equal 0, @i.num_errors @@ -783,7 +783,7 @@ def get_log_time(msg, logs) now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:32 -0700') Timecop.freeze( now ) diff --git a/test/plugin/test_output_as_buffered_secondary.rb b/test/plugin/test_output_as_buffered_secondary.rb index 2d5d7410b5..d4187bfcfe 100644 --- a/test/plugin/test_output_as_buffered_secondary.rb +++ b/test/plugin/test_output_as_buffered_secondary.rb @@ -198,12 +198,12 @@ def dummy_event_stream now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.2", dummy_event_stream()) assert_equal 0, @i.write_count assert_equal 0, @i.num_errors @@ -260,12 +260,12 @@ def dummy_event_stream now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.2", dummy_event_stream()) assert_equal 0, @i.write_count assert_equal 0, @i.num_errors @@ -323,12 +323,12 @@ def dummy_event_stream now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.2", dummy_event_stream()) assert_equal 0, @i.write_count assert_equal 0, @i.num_errors @@ -397,12 +397,12 @@ def dummy_event_stream now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.2", dummy_event_stream()) assert_equal 0, @i.write_count assert_equal 0, @i.num_errors @@ -472,8 +472,8 @@ def dummy_event_stream now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) + 
@i.emit_events("test.tag.2", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) @@ -554,12 +554,12 @@ def dummy_event_stream now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.2", dummy_event_stream()) assert_equal 0, @i.write_count assert_equal 0, @i.num_errors @@ -622,12 +622,12 @@ def dummy_event_stream now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.2", dummy_event_stream()) assert_equal 0, @i.write_count assert_equal 0, @i.num_errors @@ -690,12 +690,12 @@ def dummy_event_stream now = Time.parse('2016-04-13 18:33:30 -0700') Timecop.freeze( now ) - @i.emit("test.tag.1", dummy_event_stream()) + @i.emit_events("test.tag.1", dummy_event_stream()) now = Time.parse('2016-04-13 18:33:31 -0700') Timecop.freeze( now ) - @i.emit("test.tag.2", dummy_event_stream()) + @i.emit_events("test.tag.2", dummy_event_stream()) assert_equal 0, @i.write_count assert_equal 0, @i.num_errors diff --git a/test/test_event_router.rb b/test/test_event_router.rb index cc2bd88c45..b85e17e0db 100644 --- a/test/test_event_router.rb +++ b/test/test_event_router.rb @@ -97,7 +97,7 @@ def events(num = DEFAULT_EVENT_NUM) test 'set one output' do @pipeline.set_output(output) - @pipeline.emit('test', @es, nil) + @pipeline.emit_events('test', @es) assert_equal 1, output.events.size assert_equal 'value', output.events['test'].first['key'] end @@ -109,7 +109,7 @@ def events(num = DEFAULT_EVENT_NUM) test 'set one filer' do @pipeline.add_filter(filter) - @pipeline.emit('test', @es, nil) + @pipeline.emit_events('test', @es) assert_equal 1, output.events.size assert_equal 'value', output.events['test'].first['key'] assert_equal 0, output.events['test'].first['__test__'] @@ -117,7 +117,7 @@ def events(num = DEFAULT_EVENT_NUM) test 'set one filer with multi events' do @pipeline.add_filter(filter) - @pipeline.emit('test', events, nil) + @pipeline.emit_events('test', events) assert_equal 1, output.events.size assert_equal 5, output.events['test'].size DEFAULT_EVENT_NUM.times { |i| @@ -140,7 +140,7 @@ def event_router sub_test_case 'default collector' do test 'call default collector when no output' do assert_rr do - mock(default_collector).emit('test', is_a(OneEventStream), NullOutputChain.instance) + mock(default_collector).emit_events('test', is_a(OneEventStream)) event_router.emit('test', Engine.now, 'k' => 'v') end end @@ -149,7 +149,7 @@ def event_router event_router.add_rule('test', filter) assert_rr do # After apply Filter, EventStream becomes MultiEventStream by default - mock(default_collector).emit('test', is_a(MultiEventStream), NullOutputChain.instance) + mock(default_collector).emit_events('test', is_a(MultiEventStream)) event_router.emit('test', Engine.now, 'k' => 'v') end assert_equal 1, filter.num @@ -158,7 +158,7 @@ def event_router test "call default collector when no matched with output" do event_router.add_rule('test', output) assert_rr do - mock(default_collector).emit('dummy', is_a(OneEventStream), NullOutputChain.instance) + mock(default_collector).emit_events('dummy', 
is_a(OneEventStream))
        event_router.emit('dummy', Engine.now, 'k' => 'v')
      end
    end
@@ -166,7 +166,7 @@ def event_router
    test "don't call default collector when tag matched" do
      event_router.add_rule('test', output)
      assert_rr do
-        dont_allow(default_collector).emit('test', is_a(OneEventStream), NullOutputChain.instance)
+        dont_allow(default_collector).emit_events('test', is_a(OneEventStream))
        event_router.emit('test', Engine.now, 'k' => 'v')
      end
      # check emit handler doesn't catch rr error
@@ -224,6 +224,7 @@ def event_router
    test 'call handle_emits_error when emit failed' do
      event_router.add_rule('test', error_output)
+      event_router.emit('test', Engine.now, 'k' => 'v')
      assert_rr do
        mock(emit_handler).handle_emits_error('test', is_a(OneEventStream), is_a(RuntimeError))
        event_router.emit('test', Engine.now, 'k' => 'v')
diff --git a/test/test_output.rb b/test/test_output.rb
index 21ba75c5ff..4718553944 100644
--- a/test/test_output.rb
+++ b/test/test_output.rb
@@ -3,7 +3,7 @@
 require 'fluent/output'
 require 'fluent/output_chain'
 require 'timecop'
-require 'flexmock'
+require 'flexmock/test_unit'
 
 module FluentOutputTest
   include Fluent
@@ -58,12 +58,15 @@ def test_configure
     d = create_driver(CONFIG + %[disable_retry_limit true])
     assert_equal true, d.instance.disable_retry_limit
 
-    # retry_wait is converted to Float for calc_retry_wait
-    d = create_driver(CONFIG + %[retry_wait 1s])
-    assert_equal Float, d.instance.retry_wait.class
+    #### retry_state takes care of this now
+    # # retry_wait is converted to Float for calc_retry_wait
+    # d = create_driver(CONFIG + %[retry_wait 1s])
+    # assert_equal Float, d.instance.retry_wait.class
   end
 
   def test_calc_retry_wait
+    omit "too internal test"
+
     # default
     d = create_driver
     d.instance.retry_limit.times {
@@ -82,6 +85,8 @@ def test_calc_retry_wait
   end
 
   def test_calc_retry_wait_with_integer_retry_wait
+    omit "too internal test"
+
     d = create_driver(CONFIG + %[retry_wait 2s])
     d.instance.retry_limit.times {
       d.instance.instance_eval { @num_errors += 1 }
@@ -90,6 +95,8 @@ def test_calc_retry_wait_with_integer_retry_wait
   end
 
   def test_large_num_retries
+    omit "too internal test"
+
     # Test that everything works properly after a very large number of
     # retries and we hit the expected max_retry_wait.
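    # (A rough sketch of the expectation, assuming the v0.12 defaults of
    # retry_wait 1.0 and a doubling backoff: the computed wait first exceeds
    # max_retry_wait = 300 near 2 ** 9 == 512, after which it stays capped.)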
exp_max_retry_wait = 300 @@ -135,6 +142,8 @@ def write(chunk) end def test_submit_flush_target + omit "too internal test" + # default d = create_mock_driver d.instance.start_mock @@ -188,6 +197,8 @@ def test_secondary end test "force_flush works on retrying" do + omit "too internal test" + d = create_driver(CONFIG) d.instance.start buffer = d.instance.instance_variable_get(:@buffer) @@ -236,10 +247,19 @@ def setup TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/tmp/time_sliced_output") - CONFIG = %[buffer_path #{TMP_DIR}/foo] + CONFIG = %[ + buffer_path #{TMP_DIR}/foo + time_slice_format %Y%m%d + ] + + class TimeSlicedOutputTestPlugin < Fluent::TimeSlicedOutput + def format(tag, time, record) + '' + end + end def create_driver(conf=CONFIG) - Fluent::Test::TimeSlicedOutputTestDriver.new(Fluent::TimeSlicedOutput).configure(conf, true) + Fluent::Test::TimeSlicedOutputTestDriver.new(TimeSlicedOutputTestPlugin).configure(conf, true) end sub_test_case "force_flush test" do @@ -254,6 +274,8 @@ def create_driver(conf=CONFIG) end test "force_flush immediately flushes" do + omit "too internal test" + d = create_driver(CONFIG + %[ time_format %Y%m%d%H%M%S ]) @@ -280,15 +302,20 @@ def create_driver(conf=CONFIG) test "emit with valid event" do d = create_driver d.instance.start - d.instance.emit('test', OneEventStream.new(@time.to_i, {"message" => "foo"}), NullOutputChain.instance) + if d.instance.method(:emit).arity == 3 + d.instance.emit('test', OneEventStream.new(@time.to_i, {"message" => "foo"}), NullOutputChain.instance) + else + d.instance.emit('test', OneEventStream.new(@time.to_i, {"message" => "foo"})) + end assert_equal 0, d.instance.log.logs.size end test "emit with invalid event" do d = create_driver d.instance.start - d.instance.emit('test', OneEventStream.new('string', 10), NullOutputChain.instance) - assert_equal 1, d.instance.log.logs.count { |line| line =~ /dump an error event/ } + assert_raise ArgumentError, "time must be a Fluent::EventTime (or Integer)" do + d.instance.emit_events('test', OneEventStream.new('string', 10)) + end end end end From a52e4d1c8e13294414a33a93e6029500ecad8d81 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 27 Apr 2016 22:32:22 +0900 Subject: [PATCH 14/22] fix to use test driver for buffered output --- test/plugin/test_out_exec_filter.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/plugin/test_out_exec_filter.rb b/test/plugin/test_out_exec_filter.rb index f28bb5b2f3..c23634768f 100644 --- a/test/plugin/test_out_exec_filter.rb +++ b/test/plugin/test_out_exec_filter.rb @@ -21,7 +21,7 @@ def setup ] def create_driver(conf = CONFIG, tag = 'test') - Fluent::Test::OutputTestDriver.new(Fluent::ExecFilterOutput, tag).configure(conf) + Fluent::Test::BufferedOutputTestDriver.new(Fluent::ExecFilterOutput, tag).configure(conf) end def sed_unbuffered_support? 
@@ -38,6 +38,8 @@ def sed_unbuffered_option def test_configure d = create_driver + assert d.instance.instance_eval{ @overrides_format_stream } + assert_equal ["time_in","tag","k1"], d.instance.in_keys assert_equal ["time_out","tag","k2"], d.instance.out_keys assert_equal "tag", d.instance.out_tag_key From b70b5082175af2183035fb6787747afe08816d85 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 27 Apr 2016 22:32:42 +0900 Subject: [PATCH 15/22] re-implement symlink_path feature on v0.14 buffer APIs --- lib/fluent/plugin/out_file.rb | 23 ++++++++++++++++++++++- test/plugin/test_out_file.rb | 11 ++++++++--- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index 621a4212b0..4f96be5e66 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -52,6 +52,21 @@ class FileOutput < TimeSlicedOutput desc "Create symlink to temporary buffered file when buffer_type is file." config_param :symlink_path, :string, default: nil + module SymlinkBufferMixin + def symlink_path=(path) + @_symlink_path = path + end + + def generate_chunk(metadata) + chunk = super + latest_chunk = metadata_list.sort_by(&:timekey).last + if chunk.metadata == latest_chunk + FileUtils.ln_sf(chunk.path, @_symlink_path) + end + chunk + end + end + def initialize require 'zlib' require 'time' @@ -87,7 +102,13 @@ def configure(conf) @formatter = Plugin.new_formatter(@format) @formatter.configure(conf) - @buffer.symlink_path = @symlink_path if @symlink_path + if @symlink_path && @buffer.respond_to?(:path) + (class << @buffer; self; end).module_eval do + prepend SymlinkBufferMixin + end + @buffer.symlink_path = @symlink_path + end + @dir_perm = system_config.dir_permission || DIR_PERMISSION @file_perm = system_config.file_permission || FILE_PERMISSION end diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb index 6e35fde315..0fe34e8975 100644 --- a/test/plugin/test_out_file.rb +++ b/test/plugin/test_out_file.rb @@ -306,17 +306,22 @@ def test_write_with_symlink begin d.instance.start 10.times { sleep 0.05 } + time = Time.parse("2011-01-02 13:14:15 UTC").to_i es = Fluent::OneEventStream.new(time, {"a"=>1}) - d.instance.emit('tag', es, Fluent::NullOutputChain.instance) + d.instance.emit_events('tag', es) assert File.exist?(symlink_path) assert File.symlink?(symlink_path) - d.instance.enqueue_buffer + es = Fluent::OneEventStream.new(event_time("2011-01-03 14:15:16 UTC"), {"a"=>2}) + d.instance.emit_events('tag', es) - assert !File.exist?(symlink_path) + assert File.exist?(symlink_path) assert File.symlink?(symlink_path) + + meta = d.instance.metadata('tag', event_time("2011-01-03 14:15:16 UTC"), {}) + assert_equal d.instance.buffer.instance_eval{ @stage[meta].path }, File.readlink(symlink_path) ensure d.instance.shutdown FileUtils.rm_rf(symlink_path) From fc2c76d8b7f0d9450b1ac0a333db9f9c4055611d Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 28 Apr 2016 14:22:13 +0900 Subject: [PATCH 16/22] add break condition to wait checks to run --- test/plugin/test_out_exec_filter.rb | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/plugin/test_out_exec_filter.rb b/test/plugin/test_out_exec_filter.rb index c23634768f..dd448973fb 100644 --- a/test/plugin/test_out_exec_filter.rb +++ b/test/plugin/test_out_exec_filter.rb @@ -74,6 +74,7 @@ def test_emit_1 time = Fluent::EventTime.parse("2011-01-02 13:14:15") + d.expected_emits_length = 2 d.run do d.emit({"k1"=>1}, time) d.emit({"k1"=>2}, time) @@ -99,6 
+100,7 @@ def test_emit_2
 
     time = Fluent::EventTime.parse("2011-01-02 13:14:15")
 
+    d.expected_emits_length = 2
     d.run do
       d.emit({"k1"=>1}, time)
       d.emit({"k1"=>2}, time)
@@ -124,6 +126,7 @@ def test_emit_3
 
     time = Fluent::EventTime.parse("2011-01-02 13:14:15")
 
+    d.expected_emits_length = 2
     d.run do
       d.emit({"val1"=>"sed-ed value foo"}, time)
       d.emit({"val1"=>"sed-ed value poo"}, time)
@@ -145,6 +148,7 @@ def test_emit_3
 
     time = Fluent::EventTime.parse("2011-01-02 13:14:15")
 
+    d.expected_emits_length = 2
     d.run do
       d.emit({"val1"=>"sed-ed value foo"}, time)
       d.emit({"val1"=>"sed-ed value poo"}, time)
@@ -172,6 +176,7 @@ def test_emit_4
 
     time = Fluent::EventTime.parse("2011-01-02 13:14:15")
 
+    d.expected_emits_length = 2
     d.run do
       d.emit({"val1"=>"sed-ed value foo"}, time)
       d.emit({"val1"=>"sed-ed value poo"}, time)
@@ -196,6 +201,7 @@ def test_json_1
 
     time = Fluent::EventTime.parse("2011-01-02 13:14:15")
 
+    d.expected_emits_length = 1
     d.run do
       d.emit({"message"=>%[{"time":#{time},"tag":"t1","k1":"v1"}]}, time+10)
     end
@@ -218,6 +224,7 @@ def test_json_with_float_time
     float_time = Time.parse("2011-01-02 13:14:15").to_f
     time = Fluent::EventTime.from_time(Time.at(float_time))
 
+    d.expected_emits_length = 1
     d.run do
       d.emit({"message"=>%[{"time":#{float_time},"tag":"t1","k1":"v1"}]}, time+10)
     end
@@ -241,6 +248,7 @@ def test_json_with_time_format
     time_str = "28/Feb/2013 12:00:00.123456789 +0900"
     time = Fluent::EventTime.from_time(Time.strptime(time_str, "%d/%b/%Y %H:%M:%S.%N %z"))
 
+    d.expected_emits_length = 1
     d.run do
       d.emit({"message"=>%[{"time":"#{time_str}","tag":"t1","k1":"v1"}]}, time+10)
     end

From 903f4c2f0d74ff20b7163b9c471ea5e65fc16841 Mon Sep 17 00:00:00 2001
From: TAGOMORI Satoshi
Date: Mon, 9 May 2016 13:57:11 +0900
Subject: [PATCH 17/22] update comments for correctness

---
 lib/fluent/compat/output.rb | 15 +--------------
 1 file changed, 1 insertion(+), 14 deletions(-)

diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb
index 538ccc9053..80909c6810 100644
--- a/lib/fluent/compat/output.rb
+++ b/lib/fluent/compat/output.rb
@@ -134,16 +134,6 @@ def support_in_v12_style?(feature)
       end
     end
 
-    # def configure(conf)
-    #   super
-    #   if @buffer_config
-    #     raise Fluent::ConfigError, "<buffer> can't be specified because this is non-buffered output plugin: '#{self.class}'"
-    #   end
-    #   if @secondary_config
-    #     raise Fluent::ConfigError, "<secondary> can't be specified because this is non-buffered output plugin: '#{self.class}'"
-    #   end
-    # end
-
     ## emit must be implemented in plugin
     # def emit(tag, es, chain)
     # end
@@ -256,7 +246,7 @@ def configure(conf)
     # #write is also
     # This method overrides Fluent::Plugin::Output#handle_stream_simple
-    # because v0.12 BufferedOutput may overrides #format_stream, but original method doesn't consider about it
+    # because v0.12 BufferedOutput may override #format_stream, but the original #handle_stream_simple method doesn't take that into account
     def handle_stream_simple(tag, es)
       if @overrides_format_stream
         meta = metadata(nil, nil, nil)
@@ -440,9 +430,6 @@ def support_in_v12_style?(feature)
 
     attr_accessor :localtime
 
-    # config_section :buffer, param_name: :buffer_config, init: true, required: false, multi: false, final: true do
-    #   config_set_default :@type, 'file2'
-    # end
     config_section :buffer, param_name: :buffer_config do
       config_set_default :@type, 'file2'
     end

From 403cf5f6d990e37b341f2c00ce58ae6b338ffe5f Mon Sep 17 00:00:00 2001
From: TAGOMORI Satoshi
Date: Mon, 9 May 2016 14:06:35 +0900
Subject: [PATCH 18/22] add deprecation warning for "type" in secondary section

---
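Note: after this change a v0.12-style <secondary> section that still uses the
obsoleted "type" key keeps working: the value is copied into "@type" and a
deprecation warning is logged at configure time. A minimal configuration
sketch (plugin names and the path are illustrative only):

    <match **>
      @type forward
      <secondary>
        type file        # deprecated spelling: copied into "@type" and warned about
        path /var/log/fluent/forward-failed
      </secondary>
    </match>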
lib/fluent/compat/output.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index 80909c6810..fd75c2f49b 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -222,6 +222,7 @@ def configure(conf) if secconf if secconf['type'] && !secconf['@type'] secconf['@type'] = secconf['type'] + log.warn "'type' is deprecated, and will be ignored in v1: use '@type' instead." end end end @@ -350,6 +351,7 @@ def configure(conf) if secconf if secconf['type'] && !secconf['@type'] secconf['@type'] = secconf['type'] + log.warn "'type' is deprecated, and will be ignored in v1: use '@type' instead." end end end @@ -505,6 +507,7 @@ def configure(conf) if secconf if secconf['type'] && !secconf['@type'] secconf['@type'] = secconf['type'] + log.warn "'type' is deprecated, and will be ignored in v1: use '@type' instead." end end end From e0afba5333ef9fcdc16a5fca1f746c43f8b9868b Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Mon, 9 May 2016 14:10:49 +0900 Subject: [PATCH 19/22] fix to use more explicit name --- lib/fluent/compat/output.rb | 6 +++--- lib/fluent/plugin/output.rb | 2 +- lib/fluent/plugin_helper/retry_state.rb | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index fd75c2f49b..0e7c3103d0 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -208,7 +208,7 @@ def configure(conf) if config_style == :v0 buf_params = { "flush_mode" => "fast", - "retry_type" => "expbackoff", + "retry_type" => "exponential_backoff", } PARAMS_MAP.each do |older, newer| buf_params[newer] = conf[older] if conf.has_key?(older) @@ -337,7 +337,7 @@ def configure(conf) if config_style == :v0 buf_params = { "flush_mode" => "fast", - "retry_type" => "expbackoff", + "retry_type" => "exponential_backoff", } PARAMS_MAP.each do |older, newer| buf_params[newer] = conf[older] if conf.has_key?(older) @@ -465,7 +465,7 @@ def configure(conf) buf_params = { "@type" => "file2", "flush_mode" => (conf['flush_interval'] ? "fast" : "none"), - "retry_type" => "expbackoff", + "retry_type" => "exponential_backoff", } PARAMS_MAP.each do |older, newer| buf_params[newer] = conf[older] if conf.has_key?(older) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 7405901a9c..0b4eb559e9 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -73,7 +73,7 @@ class Output < Base # expornential backoff sequence will be initialized at the time of this threshold desc 'How to wait next retry to flush buffer.' 
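    # (Rough semantics: :exponential_backoff waits about
    # retry_wait * retry_backoff_base ** n after the n-th failure, capped by
    # retry_max_interval, while :periodic always waits retry_wait.)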
- config_param :retry_type, :enum, list: [:expbackoff, :periodic], default: :expbackoff + config_param :retry_type, :enum, list: [:exponential_backoff, :periodic], default: :exponential_backoff ### Periodic -> fixed :retry_wait ### Exponencial backoff: k is number of retry times # c: constant factor, @retry_wait diff --git a/lib/fluent/plugin_helper/retry_state.rb b/lib/fluent/plugin_helper/retry_state.rb index a269d15b02..b6eaf26e4c 100644 --- a/lib/fluent/plugin_helper/retry_state.rb +++ b/lib/fluent/plugin_helper/retry_state.rb @@ -23,7 +23,7 @@ def retry_state_create( secondary: false, secondary_threshold: 0.8 ) case retry_type - when :expbackoff + when :exponential_backoff ExponentialBackOffRetry.new(title, wait, timeout, forever, max_steps, randomize, randomize_width, backoff_base, max_interval, secondary, secondary_threshold) when :periodic PeriodicRetry.new(title, wait, timeout, forever, max_steps, randomize, randomize_width, secondary, secondary_threshold) From dc9fe040518232eefb01949e600f47a745a3b6aa Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Mon, 9 May 2016 15:25:35 +0900 Subject: [PATCH 20/22] fix tests for exponential_backoff --- test/plugin/test_output_as_buffered_retries.rb | 6 +++--- .../test_output_as_buffered_secondary.rb | 4 ++-- test/plugin_helper/test_retry_state.rb | 18 +++++++++--------- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/test/plugin/test_output_as_buffered_retries.rb b/test/plugin/test_output_as_buffered_retries.rb index 55fc1d35c7..eaca2cb119 100644 --- a/test/plugin/test_output_as_buffered_retries.rb +++ b/test/plugin/test_output_as_buffered_retries.rb @@ -128,7 +128,7 @@ def get_log_time(msg, logs) @i.register(:prefer_buffered_processing){ true } @i.start - assert_equal :expbackoff, @i.buffer_config.retry_type + assert_equal :exponential_backoff, @i.buffer_config.retry_type assert_equal 1, @i.buffer_config.retry_wait assert_equal 2.0, @i.buffer_config.retry_backoff_base assert !@i.buffer_config.retry_randomize @@ -231,7 +231,7 @@ def get_log_time(msg, logs) prev_write_count = @i.write_count prev_num_errors = @i.num_errors end - # expbackoff interval: 1 * 2 ** 10 == 1024 + # exponential backoff interval: 1 * 2 ** 10 == 1024 # but it should be limited by retry_max_interval=60 assert_equal 60, (@i.next_flush_time - now) end @@ -635,7 +635,7 @@ def get_log_time(msg, logs) hash = { 'flush_interval' => 1, 'flush_burst_interval' => 0.1, - 'retry_type' => :expbackoff, + 'retry_type' => :exponential_backoff, 'retry_forever' => true, 'retry_randomize' => false, 'retry_timeout' => 3600, diff --git a/test/plugin/test_output_as_buffered_secondary.rb b/test/plugin/test_output_as_buffered_secondary.rb index d4187bfcfe..38c536bb1a 100644 --- a/test/plugin/test_output_as_buffered_secondary.rb +++ b/test/plugin/test_output_as_buffered_secondary.rb @@ -608,7 +608,7 @@ def dummy_event_stream test 'primary plugin will emit event streams to secondary after retries for time of retry_timeout * retry_secondary_threshold' do written = [] - priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :expbackoff, 'retry_wait' => 1, 'retry_timeout' => 60, 'retry_randomize' => false}) + priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :exponential_backoff, 'retry_wait' => 1, 'retry_timeout' => 60, 'retry_randomize' => false}) secconf = config_element('secondary','',{'@type' => 'output_secondary_test2'}) @i.configure(config_element('ROOT','',{},[priconf,secconf])) 
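      # (With retry_timeout 60 and the default retry_secondary_threshold of 0.8,
      # the hand-off to the secondary output is expected after roughly 48 seconds
      # of failing retries.)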
@i.register(:prefer_buffered_processing){ true } @@ -676,7 +676,7 @@ def dummy_event_stream end test 'exponential backoff interval will be initialized when switched to secondary' do - priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :expbackoff, 'retry_wait' => 1, 'retry_timeout' => 60, 'retry_randomize' => false}) + priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :exponential_backoff, 'retry_wait' => 1, 'retry_timeout' => 60, 'retry_randomize' => false}) secconf = config_element('secondary','',{'@type' => 'output_secondary_test2'}) @i.configure(config_element('ROOT','',{},[priconf,secconf])) @i.register(:prefer_buffered_processing){ true } diff --git a/test/plugin_helper/test_retry_state.rb b/test/plugin_helper/test_retry_state.rb index 9cf7abe7a5..3222155296 100644 --- a/test/plugin_helper/test_retry_state.rb +++ b/test/plugin_helper/test_retry_state.rb @@ -23,13 +23,13 @@ class Dummy < Fluent::Plugin::TestBase end test 'randomize can generate value within specified +/- range' do - s = @d.retry_state_create(:t1, :expbackoff, 0.1, 30) # default enabled w/ 0.125 + s = @d.retry_state_create(:t1, :exponential_backoff, 0.1, 30) # default enabled w/ 0.125 500.times do r = s.randomize(1000) assert{ r >= 875 && r < 1125 } end - s = @d.retry_state_create(:t1, :expbackoff, 0.1, 30, randomize_width: 0.25) + s = @d.retry_state_create(:t1, :exponential_backoff, 0.1, 30, randomize_width: 0.25) 500.times do r = s.randomize(1000) assert{ r >= 750 && r < 1250 } @@ -37,7 +37,7 @@ class Dummy < Fluent::Plugin::TestBase end test 'plugin can create retry_state machine' do - s = @d.retry_state_create(:t1, :expbackoff, 0.1, 30) + s = @d.retry_state_create(:t1, :exponential_backoff, 0.1, 30) # attr_reader :title, :start, :steps, :next_time, :timeout_at, :current, :secondary_transition_at, :secondary_transition_times assert_equal :t1, s.title @@ -179,7 +179,7 @@ class Dummy < Fluent::Plugin::TestBase end test 'exponential backoff forever without randomization' do - s = @d.retry_state_create(:t11, :expbackoff, 0.1, 300, randomize: false, forever: true, backoff_base: 2) + s = @d.retry_state_create(:t11, :exponential_backoff, 0.1, 300, randomize: false, forever: true, backoff_base: 2) dummy_current_time = s.start s.override_current_time(dummy_current_time) @@ -199,7 +199,7 @@ class Dummy < Fluent::Plugin::TestBase end test 'exponential backoff with max_interval' do - s = @d.retry_state_create(:t12, :expbackoff, 0.1, 300, randomize: false, forever: true, backoff_base: 2, max_interval: 100) + s = @d.retry_state_create(:t12, :exponential_backoff, 0.1, 300, randomize: false, forever: true, backoff_base: 2, max_interval: 100) dummy_current_time = s.start s.override_current_time(dummy_current_time) @@ -228,7 +228,7 @@ class Dummy < Fluent::Plugin::TestBase end test 'exponential backoff with shorter timeout' do - s = @d.retry_state_create(:t13, :expbackoff, 1, 12, randomize: false, backoff_base: 2, max_interval: 10) + s = @d.retry_state_create(:t13, :exponential_backoff, 1, 12, randomize: false, backoff_base: 2, max_interval: 10) dummy_current_time = s.start s.override_current_time(dummy_current_time) @@ -269,7 +269,7 @@ class Dummy < Fluent::Plugin::TestBase end test 'exponential backoff with max_steps' do - s = @d.retry_state_create(:t14, :expbackoff, 1, 120, randomize: false, backoff_base: 2, max_interval: 10, max_steps: 6) + s = @d.retry_state_create(:t14, :exponential_backoff, 1, 120, randomize: false, backoff_base: 2, max_interval: 10, max_steps: 6) 
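    # (Given wait 1 and backoff_base 2, each interval should double, be capped
    # at max_interval 10, and the state should give up after max_steps == 6.)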
dummy_current_time = s.start s.override_current_time(dummy_current_time) @@ -320,7 +320,7 @@ class Dummy < Fluent::Plugin::TestBase end test 'exponential backoff retries with secondary' do - s = @d.retry_state_create(:t15, :expbackoff, 1, 100, randomize: false, backoff_base: 2, secondary: true) # threshold 0.8 + s = @d.retry_state_create(:t15, :exponential_backoff, 1, 100, randomize: false, backoff_base: 2, secondary: true) # threshold 0.8 dummy_current_time = s.start s.override_current_time(dummy_current_time) @@ -388,7 +388,7 @@ class Dummy < Fluent::Plugin::TestBase end test 'exponential backoff retries with secondary and specified threshold' do - s = @d.retry_state_create(:t16, :expbackoff, 1, 100, randomize: false, secondary: true, backoff_base: 2, secondary_threshold: 0.75) + s = @d.retry_state_create(:t16, :exponential_backoff, 1, 100, randomize: false, secondary: true, backoff_base: 2, secondary_threshold: 0.75) dummy_current_time = s.start s.override_current_time(dummy_current_time) From 0785ec53564d9ab998af00fc6841f21dcd6e1623 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Mon, 9 May 2016 15:47:37 +0900 Subject: [PATCH 21/22] for readability --- lib/fluent/compat/output.rb | 88 ++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index 0e7c3103d0..2c65807e58 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -187,20 +187,20 @@ def support_in_v12_style?(feature) config_param :flush_at_shutdown, :bool, default: true - PARAMS_MAP = [ - ["buffer_type", "@type"], - ["num_threads", "flush_threads"], - ["flush_interval", "flush_interval"], - ["try_flush_interval", "flush_thread_interval"], - ["queued_chunk_flush_interval", "flush_burst_interval"], - ["disable_retry_limit", "retry_forever"], - ["retry_limit", "retry_max_times"], - ["max_retry_wait", "retry_max_interval"], - ["buffer_chunk_limit", "chunk_bytes_limit"], - ["buffer_queue_limit", "queue_length_limit"], - ["buffer_queue_full_action", nil], # TODO: implement this on fluent/plugin/buffer - ["flush_at_shutdown", "flush_at_shutdown"], - ] + PARAMS_MAP = { + "buffer_type" => "@type", + "num_threads" => "flush_threads", + "flush_interval" => "flush_interval", + "try_flush_interval" => "flush_thread_interval", + "queued_chunk_flush_interval" => "flush_burst_interval", + "disable_retry_limit" => "retry_forever", + "retry_limit" => "retry_max_times", + "max_retry_wait" => "retry_max_interval", + "buffer_chunk_limit" => "chunk_bytes_limit", + "buffer_queue_limit" => "queue_length_limit", + "buffer_queue_full_action" => nil, # TODO: implement this on fluent/plugin/buffer + "flush_at_shutdown" => "flush_at_shutdown", + } def configure(conf) bufconf = CompatOutputUtils.buffer_section(conf) @@ -316,20 +316,20 @@ def support_in_v12_style?(feature) config_set_default :time_as_integer, true - PARAMS_MAP = [ - ["buffer_type", "@type"], - ["num_threads", "flush_threads"], - ["flush_interval", "flush_interval"], - ["try_flush_interval", "flush_thread_interval"], - ["queued_chunk_flush_interval", "flush_burst_interval"], - ["disable_retry_limit", "retry_forever"], - ["retry_limit", "retry_max_times"], - ["max_retry_wait", "retry_max_interval"], - ["buffer_chunk_limit", "chunk_bytes_limit"], - ["buffer_queue_limit", "queue_length_limit"], - ["buffer_queue_full_action", nil], # TODO: implement this on fluent/plugin/buffer - ["flush_at_shutdown", "flush_at_shutdown"], - ] + PARAMS_MAP = { + "buffer_type" => 
"@type", + "num_threads" => "flush_threads", + "flush_interval" => "flush_interval", + "try_flush_interval" => "flush_thread_interval", + "queued_chunk_flush_interval" => "flush_burst_interval", + "disable_retry_limit" => "retry_forever", + "retry_limit" => "retry_max_times", + "max_retry_wait" => "retry_max_interval", + "buffer_chunk_limit" => "chunk_bytes_limit", + "buffer_queue_limit" => "queue_length_limit", + "buffer_queue_full_action" => nil, # TODO: implement this on fluent/plugin/buffer + "flush_at_shutdown" => "flush_at_shutdown", + } def configure(conf) bufconf = CompatOutputUtils.buffer_section(conf) @@ -436,22 +436,22 @@ def support_in_v12_style?(feature) config_set_default :@type, 'file2' end - PARAMS_MAP = [ - ["buffer_type", "@type"], - ["buffer_path", "path"], - ["num_threads", "flush_threads"], - ["flush_interval", "flush_interval"], - ["try_flush_interval", "flush_thread_interval"], - ["queued_chunk_flush_interval", "flush_burst_interval"], - ["disable_retry_limit", "retry_forever"], - ["retry_limit", "retry_max_times"], - ["max_retry_wait", "retry_max_interval"], - ["buffer_chunk_limit", "chunk_bytes_limit"], - ["buffer_queue_limit", "queue_length_limit"], - ["buffer_queue_full_action", nil], # TODO: implement this on fluent/plugin/buffer - ["flush_at_shutdown", "flush_at_shutdown"], - ["time_slice_wait", "timekey_wait"], - ] + PARAMS_MAP = { + "buffer_type" => "@type", + "buffer_path" => "path", + "num_threads" => "flush_threads", + "flush_interval" => "flush_interval", + "try_flush_interval" => "flush_thread_interval", + "queued_chunk_flush_interval" => "flush_burst_interval", + "disable_retry_limit" => "retry_forever", + "retry_limit" => "retry_max_times", + "max_retry_wait" => "retry_max_interval", + "buffer_chunk_limit" => "chunk_bytes_limit", + "buffer_queue_limit" => "queue_length_limit", + "buffer_queue_full_action" => nil, # TODO: implement this on fluent/plugin/buffer + "flush_at_shutdown" => "flush_at_shutdown", + "time_slice_wait" => "timekey_wait", + } def initialize super From a424505feced0f9104059d9e12b01974cf44ab6e Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Mon, 9 May 2016 18:42:01 +0900 Subject: [PATCH 22/22] remove/merge dup code --- lib/fluent/compat/output.rb | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index 2c65807e58..362deeefd2 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -34,6 +34,12 @@ def self.secondary_section(conf) conf.elements.select{|e| e.name == 'secondary'}.first end + def self.inject_type_from_obsoleted_name(secconf, log) + if secconf['type'] && !secconf['@type'] + secconf['@type'] = secconf['type'] + log.warn "'type' is deprecated, and will be ignored in v1: use '@type' instead." + end + end end module BufferedEventStreamMixin @@ -220,10 +226,7 @@ def configure(conf) secconf = CompatOutputUtils.secondary_section(conf) if secconf - if secconf['type'] && !secconf['@type'] - secconf['@type'] = secconf['type'] - log.warn "'type' is deprecated, and will be ignored in v1: use '@type' instead." - end + CompatOutputUtils.inject_type_from_obsoleted_name(secconf, log) end end @@ -349,10 +352,7 @@ def configure(conf) secconf = CompatOutputUtils.secondary_section(conf) if secconf - if secconf['type'] && !secconf['@type'] - secconf['@type'] = secconf['type'] - log.warn "'type' is deprecated, and will be ignored in v1: use '@type' instead." 
- end + CompatOutputUtils.inject_type_from_obsoleted_name(secconf, log) end end @@ -505,10 +505,7 @@ def configure(conf) secconf = CompatOutputUtils.secondary_section(conf) if secconf - if secconf['type'] && !secconf['@type'] - secconf['@type'] = secconf['type'] - log.warn "'type' is deprecated, and will be ignored in v1: use '@type' instead." - end + CompatOutputUtils.inject_type_from_obsoleted_name(secconf, log) end end
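A minimal standalone sketch of what the shared helper extracted in this last
patch does; the Hash and Logger below are stand-ins for the real
Fluent::Config::Element and plugin logger, and only the conditional mirrors
CompatOutputUtils.inject_type_from_obsoleted_name:

    require 'logger'

    log = Logger.new($stdout)
    secconf = { 'type' => 'file' }  # stand-in for a v0.12 <secondary> section

    # same check/copy/warn sequence as the helper in the patch
    if secconf['type'] && !secconf['@type']
      secconf['@type'] = secconf['type']
      log.warn "'type' is deprecated, and will be ignored in v1: use '@type' instead."
    end

    p secconf  #=> {"type"=>"file", "@type"=>"file"}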