From 6a8b284be962e025a654a5371a088075a96fa6e8 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Tue, 3 Mar 2015 14:36:02 +0900 Subject: [PATCH 001/111] Refactor to use Registry as Plugin dictionary, instead of original impl. * add `paths` to add optional load_path for each registory instance * add `require`s to load plugin classes in each tests --- lib/fluent/command/debug.rb | 1 - lib/fluent/engine.rb | 5 +- lib/fluent/plugin.rb | 157 +++++++--------------------- lib/fluent/registry.rb | 5 +- lib/fluent/supervisor.rb | 2 +- test/plugin/test_in_debug_agent.rb | 1 + test/plugin/test_in_dummy.rb | 1 + test/plugin/test_in_exec.rb | 1 + test/plugin/test_in_forward.rb | 1 + test/plugin/test_in_gc_stat.rb | 1 + test/plugin/test_in_http.rb | 1 + test/plugin/test_in_object_space.rb | 1 + test/plugin/test_in_status.rb | 39 +++++++ test/plugin/test_in_stream.rb | 1 + test/plugin/test_in_syslog.rb | 1 + test/plugin/test_in_tail.rb | 1 + test/plugin/test_in_tcp.rb | 1 + test/plugin/test_in_udp.rb | 1 + test/plugin/test_out_copy.rb | 1 + test/plugin/test_out_exec.rb | 1 + test/plugin/test_out_exec_filter.rb | 1 + test/plugin/test_out_file.rb | 1 + test/plugin/test_out_forward.rb | 1 + test/plugin/test_out_roundrobin.rb | 1 + test/plugin/test_out_stdout.rb | 1 + test/plugin/test_out_stream.rb | 1 + 26 files changed, 104 insertions(+), 125 deletions(-) create mode 100644 test/plugin/test_in_status.rb diff --git a/lib/fluent/command/debug.rb b/lib/fluent/command/debug.rb index 22a21d18a5..a748d518af 100644 --- a/lib/fluent/command/debug.rb +++ b/lib/fluent/command/debug.rb @@ -85,7 +85,6 @@ puts "Usage:" puts " Engine.match('some.tag').output : get an output plugin instance" puts " Engine.sources[i] : get input plugin instances" -puts " Plugin.load_plugin(type,name) : load plugin class (use this if you get DRb::DRbUnknown)" puts "" Encoding.default_internal = nil if Encoding.respond_to?(:default_internal) diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index 05883ffbe4..dfe0e87393 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -45,7 +45,6 @@ def initialize def init(opts = {}) BasicSocket.do_not_reverse_lookup = true - Plugin.load_plugins if defined?(Encoding) Encoding.default_internal = 'ASCII-8BIT' if Encoding.respond_to?(:default_internal) Encoding.default_external = 'ASCII-8BIT' if Encoding.respond_to?(:default_external) @@ -113,8 +112,8 @@ def configure(conf) end end - def load_plugin_dir(dir) - Plugin.load_plugin_dir(dir) + def add_plugin_dir(dir) + Plugin.add_plugin_dir(dir) end def emit(tag, time, record) diff --git a/lib/fluent/plugin.rb b/lib/fluent/plugin.rb index da26d4d67c..1a59e1270f 100644 --- a/lib/fluent/plugin.rb +++ b/lib/fluent/plugin.rb @@ -15,161 +15,82 @@ # module Fluent - class PluginClass - # This class is refactored using Fluent::Registry at v0.14 - - def initialize - @input = {} - @output = {} - @filter = {} - @buffer = {} + module Plugin + SEARCH_PATHS = [] + + INPUT_REGISTRY = Registry.new(:input_type, 'fluent/plugin/in_') + OUTPUT_REGISTRY = Registry.new(:output_type, 'fluent/plugin/out_') + FILTER_REGISTRY = Registry.new(:filter_type, 'fluent/plugin/filter_') + BUFFER_REGISTRY = Registry.new(:buffer_type, 'fluent/plugin/buf_') + + REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY] + + def self.add_plugin_dir(dir) + REGISTRIES.each do |r| + r.paths.push(dir) + end + nil end - def register_input(type, klass) - register_impl('input', @input, type, klass) + def self.register_input(type, klass) + register_impl('input', INPUT_REGISTRY, type, klass) end - def register_output(type, klass) - register_impl('output', @output, type, klass) + def self.register_output(type, klass) + register_impl('output', OUTPUT_REGISTRY, type, klass) end - def register_filter(type, klass) - register_impl('filter', @filter, type, klass) + def self.register_filter(type, klass) + register_impl('filter', FILTER_REGISTRY, type, klass) end - def register_buffer(type, klass) - register_impl('buffer', @buffer, type, klass) + def self.register_buffer(type, klass) + register_impl('buffer', BUFFER_REGISTRY, type, klass) end - def register_parser(type, klass) + def self.register_parser(type, klass) TextParser.register_template(type, klass) end - def register_formatter(type, klass) + def self.register_formatter(type, klass) TextFormatter.register_template(type, klass) end - def new_input(type) - new_impl('input', @input, type) + def self.new_input(type) + new_impl('input', INPUT_REGISTRY, type) end - def new_output(type) - new_impl('output', @output, type) + def self.new_output(type) + new_impl('output', OUTPUT_REGISTRY, type) end - def new_filter(type) - new_impl('filter', @filter, type) + def self.new_filter(type) + new_impl('filter', FILTER_REGISTRY, type) end - def new_buffer(type) - new_impl('buffer', @buffer, type) + def self.new_buffer(type) + new_impl('buffer', BUFFER_REGISTRY, type) end - def new_parser(type) + def self.new_parser(type) TextParser.lookup(type) end - def new_formatter(type) + def self.new_formatter(type) TextFormatter.lookup(type) end - def load_plugins - dir = File.join(File.dirname(__FILE__), "plugin") - load_plugin_dir(dir) - end - - def load_plugin_dir(dir) - dir = File.expand_path(dir) - Dir.entries(dir).sort.each {|fname| - if fname =~ /\.rb$/ - require File.join(dir, fname) - end - } - nil - end - - def load_plugin(type, name) - try_load_plugin(name, type) - end - - def lookup_name_from_class(klass_or_str) - klass = if klass_or_str.class == String - eval(klass_or_str) # const_get can't handle A::B - else - klass_or_str - end - - @input.each { |name, plugin| - return name if plugin == klass - } - @output.each { |name, plugin| - return name if plugin == klass - } - @filter.each { |name, plugin| - return name if plugin == klass - } - - nil - end - - private - def register_impl(name, map, type, klass) - map[type] = klass + def self.register_impl(name, registry, type, klass) + registry.register(type, klass) $log.trace { "registered #{name} plugin '#{type}'" } nil end - def new_impl(name, map, type) - if klass = map[type] - return klass.new - end - try_load_plugin(name, type) - if klass = map[type] + def self.new_impl(name, registry, type) + if klass = registry.lookup(type) return klass.new end raise ConfigError, "Unknown #{name} plugin '#{type}'. Run 'gem search -rd fluent-plugin' to find plugins" end - - def try_load_plugin(name, type) - case name - when 'input' - path = "fluent/plugin/in_#{type}" - when 'output' - path = "fluent/plugin/out_#{type}" - when 'filter' - path = "fluent/plugin/filter_#{type}" - when 'buffer' - path = "fluent/plugin/buf_#{type}" - else - return - end - - # prefer LOAD_PATH than gems - files = $LOAD_PATH.map {|lp| - lpath = File.join(lp, "#{path}.rb") - File.exist?(lpath) ? lpath : nil - }.compact - unless files.empty? - # prefer newer version - require File.expand_path(files.sort.last) - return - end - - # search gems - specs = Gem::Specification.find_all { |spec| - spec.contains_requirable_file? path - } - - # prefer newer version - specs = specs.sort_by { |spec| spec.version } - if spec = specs.last - spec.require_paths.each { |lib| - file = "#{spec.full_gem_path}/#{lib}/#{path}" - require file - } - end - end end - - Plugin = PluginClass.new end diff --git a/lib/fluent/registry.rb b/lib/fluent/registry.rb index e5e1c87188..929df3ce30 100644 --- a/lib/fluent/registry.rb +++ b/lib/fluent/registry.rb @@ -22,9 +22,10 @@ def initialize(kind, search_prefix) @kind = kind @search_prefix = search_prefix @map = {} + @paths = [] end - attr_reader :kind + attr_reader :kind, :paths def register(type, value) type = type.to_sym @@ -47,7 +48,7 @@ def search(type) path = "#{@search_prefix}#{type}" # prefer LOAD_PATH than gems - files = $LOAD_PATH.map { |lp| + files = ($LOAD_PATH + @paths).map { |lp| lpath = File.expand_path(File.join(lp, "#{path}.rb")) File.exist?(lpath) ? lpath : nil }.compact diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index d3f88c58c7..94f5b494b3 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -595,7 +595,7 @@ def init_engine @plugin_dirs.each {|dir| if Dir.exist?(dir) dir = File.expand_path(dir) - Fluent::Engine.load_plugin_dir(dir) + Fluent::Engine.add_plugin_dir(dir) end } end diff --git a/test/plugin/test_in_debug_agent.rb b/test/plugin/test_in_debug_agent.rb index 4ca94eb682..3de87e0461 100644 --- a/test/plugin/test_in_debug_agent.rb +++ b/test/plugin/test_in_debug_agent.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fileutils' +require 'fluent/plugin/in_debug_agent' class DebugAgentInputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_in_dummy.rb b/test/plugin/test_in_dummy.rb index d2bf159bc0..bea015c35c 100644 --- a/test/plugin/test_in_dummy.rb +++ b/test/plugin/test_in_dummy.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fluent/test' +require 'fluent/plugin/in_dummy' class DummyTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_in_exec.rb b/test/plugin/test_in_exec.rb index 34c7abcea1..3836069441 100644 --- a/test/plugin/test_in_exec.rb +++ b/test/plugin/test_in_exec.rb @@ -1,6 +1,7 @@ require_relative '../helper' require 'fluent/test' require 'net/http' +require 'fluent/plugin/in_exec' class ExecInputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 36d5b41d18..d8a6c14a22 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -1,6 +1,7 @@ require_relative '../helper' require 'fluent/test' require 'base64' +require 'fluent/plugin/in_forward' class ForwardInputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_in_gc_stat.rb b/test/plugin/test_in_gc_stat.rb index ff0638251f..2602104bd2 100644 --- a/test/plugin/test_in_gc_stat.rb +++ b/test/plugin/test_in_gc_stat.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fluent/test' +require 'fluent/plugin/in_gc_stat' class GCStatInputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb index 0ecc0706d4..92971af217 100644 --- a/test/plugin/test_in_http.rb +++ b/test/plugin/test_in_http.rb @@ -1,6 +1,7 @@ require_relative '../helper' require 'fluent/test' require 'net/http' +require 'fluent/plugin/in_http' class HttpInputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_in_object_space.rb b/test/plugin/test_in_object_space.rb index fc370e9529..7a3cc4d2a4 100644 --- a/test/plugin/test_in_object_space.rb +++ b/test/plugin/test_in_object_space.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fluent/test' +require 'fluent/plugin/in_object_space' class ObjectSpaceInputTest < Test::Unit::TestCase class FailObject diff --git a/test/plugin/test_in_status.rb b/test/plugin/test_in_status.rb new file mode 100644 index 0000000000..d93b61b8dc --- /dev/null +++ b/test/plugin/test_in_status.rb @@ -0,0 +1,39 @@ +require_relative '../helper' +require 'fluent/test' +require 'fluent/plugin/in_status' + +class StatusInputTest < Test::Unit::TestCase + def setup + Fluent::Test.setup + end + + CONFIG = %[ + emit_interval 1 + tag t1 + ] + + def create_driver(conf=CONFIG) + Fluent::Test::InputTestDriver.new(Fluent::StatusInput).configure(conf) + end + + def test_configure + d = create_driver + assert_equal(1, d.instance.emit_interval) + assert_equal("t1", d.instance.tag) + end + + def test_emit + stub(Fluent::Status).each { |b| + b.call("answer" => "42") + } + + d = create_driver + d.run do + sleep 2 + end + + emits = d.emits + assert(emits.length > 0) + assert_equal({"answer" => "42"}, emits[0][2]) + end +end diff --git a/test/plugin/test_in_stream.rb b/test/plugin/test_in_stream.rb index 2501832f17..462eb24308 100644 --- a/test/plugin/test_in_stream.rb +++ b/test/plugin/test_in_stream.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fluent/test' +require 'fluent/plugin/in_stream' module StreamInputTest def setup diff --git a/test/plugin/test_in_syslog.rb b/test/plugin/test_in_syslog.rb index d65c4afd32..730a3f37f8 100755 --- a/test/plugin/test_in_syslog.rb +++ b/test/plugin/test_in_syslog.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fluent/test' +require 'fluent/plugin/in_syslog' class SyslogInputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 21f40ff7b2..30dc15165f 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -2,6 +2,7 @@ require 'fluent/test' require 'net/http' require 'flexmock' +require 'fluent/plugin/in_tail' class TailInputTest < Test::Unit::TestCase include FlexMock::TestCase diff --git a/test/plugin/test_in_tcp.rb b/test/plugin/test_in_tcp.rb index 0b89da8c9e..a5cbfb79e7 100755 --- a/test/plugin/test_in_tcp.rb +++ b/test/plugin/test_in_tcp.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fluent/test' +require 'fluent/plugin/in_tcp' class TcpInputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_in_udp.rb b/test/plugin/test_in_udp.rb index 1d4d46284f..3d8f60db32 100755 --- a/test/plugin/test_in_udp.rb +++ b/test/plugin/test_in_udp.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fluent/test' +require 'fluent/plugin/in_udp' class UdpInputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_out_copy.rb b/test/plugin/test_out_copy.rb index 04725d198c..52c947d4b0 100644 --- a/test/plugin/test_out_copy.rb +++ b/test/plugin/test_out_copy.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fluent/test' +require 'fluent/plugin/out_copy' class CopyOutputTest < Test::Unit::TestCase class << self diff --git a/test/plugin/test_out_exec.rb b/test/plugin/test_out_exec.rb index 3b8a8a25af..f76f223197 100644 --- a/test/plugin/test_out_exec.rb +++ b/test/plugin/test_out_exec.rb @@ -1,6 +1,7 @@ require_relative '../helper' require 'fluent/test' require 'fileutils' +require 'fluent/plugin/out_exec' class ExecOutputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_out_exec_filter.rb b/test/plugin/test_out_exec_filter.rb index 00d888dd5c..2a7b72a6e2 100644 --- a/test/plugin/test_out_exec_filter.rb +++ b/test/plugin/test_out_exec_filter.rb @@ -1,6 +1,7 @@ require_relative '../helper' require 'fluent/test' require 'fileutils' +require 'fluent/plugin/out_exec_filter' class ExecFilterOutputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb index daa50fc5da..6a05c99c28 100644 --- a/test/plugin/test_out_file.rb +++ b/test/plugin/test_out_file.rb @@ -2,6 +2,7 @@ require 'fluent/test' require 'fileutils' require 'time' +require 'fluent/plugin/out_file' class FileOutputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index b3f60bd06a..199dbd285b 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fluent/test' +require 'fluent/plugin/out_forward' class ForwardOutputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_out_roundrobin.rb b/test/plugin/test_out_roundrobin.rb index 76c87460ab..56722f0174 100644 --- a/test/plugin/test_out_roundrobin.rb +++ b/test/plugin/test_out_roundrobin.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fluent/test' +require 'fluent/plugin/out_roundrobin' class RoundRobinOutputTest < Test::Unit::TestCase class << self diff --git a/test/plugin/test_out_stdout.rb b/test/plugin/test_out_stdout.rb index ba07c33083..49e0567057 100644 --- a/test/plugin/test_out_stdout.rb +++ b/test/plugin/test_out_stdout.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fluent/test' +require 'fluent/plugin/out_stdout' class StdoutOutputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_out_stream.rb b/test/plugin/test_out_stream.rb index b105411a5c..f79d51cf86 100644 --- a/test/plugin/test_out_stream.rb +++ b/test/plugin/test_out_stream.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fluent/test' +require 'fluent/plugin/out_stream' module StreamOutputTest def setup From ba6120288b4de3e0d7f73623b72946381e576cbb Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Tue, 3 Mar 2015 16:50:14 +0900 Subject: [PATCH 002/111] Plugin base class change: Fluent::** -> Fluent::Plugin::** * and separate each output plugin classes into files * Fluent::** are preserved for compatibility of traditional plugins --- lib/fluent/input.rb | 31 +- lib/fluent/output.rb | 587 ++------------------ lib/fluent/plugin/buffered_output.rb | 360 ++++++++++++ lib/fluent/plugin/input.rb | 53 ++ lib/fluent/plugin/object_buffered_output.rb | 65 +++ lib/fluent/plugin/output.rb | 65 +++ lib/fluent/plugin/time_sliced_output.rb | 146 +++++ 7 files changed, 726 insertions(+), 581 deletions(-) create mode 100644 lib/fluent/plugin/buffered_output.rb create mode 100644 lib/fluent/plugin/input.rb create mode 100644 lib/fluent/plugin/object_buffered_output.rb create mode 100644 lib/fluent/plugin/output.rb create mode 100644 lib/fluent/plugin/time_sliced_output.rb diff --git a/lib/fluent/input.rb b/lib/fluent/input.rb index 10a6022bae..e9b37cad7f 100644 --- a/lib/fluent/input.rb +++ b/lib/fluent/input.rb @@ -14,33 +14,10 @@ # limitations under the License. # -module Fluent - class Input - include Configurable - include PluginId - include PluginLoggerMixin - - attr_accessor :router - - def initialize - super - end - - def configure(conf) - super +require 'fluent/plugin/input' - if label_name = conf['@label'] - label = Engine.root_agent.find_label(label_name) - @router = label.event_router - elsif @router.nil? - @router = Engine.root_agent.event_router - end - end - - def start - end - - def shutdown - end +module Fluent + class Input < Plugin::Input + # TODO: add interoperability layer end end diff --git a/lib/fluent/output.rb b/lib/fluent/output.rb index c5873f1ca9..fda7f3839a 100644 --- a/lib/fluent/output.rb +++ b/lib/fluent/output.rb @@ -14,7 +14,40 @@ # limitations under the License. # +require 'fluent/plugin/output' +require 'fluent/plugin/buffered_output' +require 'fluent/plugin/object_buffered_output' +require 'fluent/plugin/time_sliced_output' + +# This classes are for compatibility. +# Fluent::Input (or other plugin base classes) are obsolete in v0.14. + module Fluent + class Output < Plugin::Output + # TODO: add interoperability layer (especially for chain) + end + + + class BufferedOutput < Plugin::BufferedOutput + # TODO: add interoperability layer (especially for chain) + end + + class ObjectBufferedOutput < Plugin::ObjectBufferedOutput + # TODO: add interoperability layer (especially for chain) + end + + class TimeSlicedOutput < Plugin::TimeSlicedOutput + # TODO: add interoperability layer (especially for chain) + end + + class MultiOutput < Output + #def outputs + # # TODO + #end + end + + # Output Chain does nothing currently. + # These will be removed at v1. class OutputChain def initialize(array, tag, es, chain=NullOutputChain.instance) @array = array @@ -53,559 +86,5 @@ class NullOutputChain def next end end - - - class Output - include Configurable - include PluginId - include PluginLoggerMixin - - attr_accessor :router - - def initialize - super - end - - def configure(conf) - super - - if label_name = conf['@label'] - label = Engine.root_agent.find_label(label_name) - @router = label.event_router - elsif @router.nil? - @router = Engine.root_agent.event_router - end - end - - def start - end - - def shutdown - end - - #def emit(tag, es, chain) - #end - - def secondary_init(primary) - if primary.class != self.class - $log.warn "type of secondary output should be same as primary output", :primary=>primary.class.to_s, :secondary=>self.class.to_s - end - end - - def inspect; "#<%s:%014x>" % [self.class.name, '0x%014x' % (__id__<<1)] end - end - - - class OutputThread - def initialize(output) - @output = output - @finish = false - @next_time = Time.now.to_f + 1.0 - end - - def configure(conf) - end - - def start - @mutex = Mutex.new - @cond = ConditionVariable.new - @thread = Thread.new(&method(:run)) - end - - def shutdown - @finish = true - @mutex.synchronize { - @cond.signal - } - Thread.pass - @thread.join - end - - def submit_flush - @mutex.synchronize { - @next_time = 0 - @cond.signal - } - Thread.pass - end - - private - def run - @mutex.lock - begin - until @finish - time = Time.now.to_f - - if @next_time <= time - @mutex.unlock - begin - @next_time = @output.try_flush - ensure - @mutex.lock - end - next_wait = @next_time - Time.now.to_f - else - next_wait = @next_time - time - end - - cond_wait(next_wait) if next_wait > 0 - end - ensure - @mutex.unlock - end - rescue - $log.error "error on output thread", :error=>$!.to_s - $log.error_backtrace - raise - ensure - @mutex.synchronize { - @output.before_shutdown - } - end - - def cond_wait(sec) - @cond.wait(@mutex, sec) - end - end - - - class BufferedOutput < Output - def initialize - super - @next_flush_time = 0 - @last_retry_time = 0 - @next_retry_time = 0 - @num_errors = 0 - @num_errors_lock = Mutex.new - @secondary_limit = 8 - @emit_count = 0 - end - - desc 'The buffer type (memory, file)' - config_param :buffer_type, :string, :default => 'memory' - desc 'The interval between data flushes.' - config_param :flush_interval, :time, :default => 60 - config_param :try_flush_interval, :float, :default => 1 - desc 'If true, the value of `retry_value` is ignored and there is no limit' - config_param :disable_retry_limit, :bool, :default => false - desc 'The limit on the number of retries before buffered data is discarded' - config_param :retry_limit, :integer, :default => 17 - desc 'The initial intervals between write retries.' - config_param :retry_wait, :time, :default => 1.0 - desc 'The maximum intervals between write retries.' - config_param :max_retry_wait, :time, :default => nil - desc 'The number of threads to flush the buffer.' - config_param :num_threads, :integer, :default => 1 - desc 'The interval between data flushes for queued chunk.' - config_param :queued_chunk_flush_interval, :time, :default => 1 - - def configure(conf) - super - - @retry_wait = @retry_wait.to_f # converted to Float for calc_retry_wait - @buffer = Plugin.new_buffer(@buffer_type) - @buffer.configure(conf) - - if @buffer.respond_to?(:enable_parallel) - if @num_threads == 1 - @buffer.enable_parallel(false) - else - @buffer.enable_parallel(true) - end - end - - @writers = (1..@num_threads).map { - writer = OutputThread.new(self) - writer.configure(conf) - writer - } - - if sconf = conf.elements.select {|e| e.name == 'secondary' }.first - type = sconf['@type'] || conf['@type'] || sconf['type'] || conf['type'] - @secondary = Plugin.new_output(type) - @secondary.router = router - @secondary.configure(sconf) - - if secondary_limit = conf['secondary_limit'] - @secondary_limit = secondary_limit.to_i - if @secondary_limit < 0 - raise ConfigError, "invalid parameter 'secondary_limit #{secondary_limit}'" - end - end - - @secondary.secondary_init(self) - end - end - - def start - @next_flush_time = Time.now.to_f + @flush_interval - @buffer.start - @secondary.start if @secondary - @writers.each {|writer| writer.start } - @writer_current_position = 0 - @writers_size = @writers.size - end - - def shutdown - @writers.each {|writer| writer.shutdown } - @secondary.shutdown if @secondary - @buffer.shutdown - end - - def emit(tag, es, chain, key="") - @emit_count += 1 - data = format_stream(tag, es) - if @buffer.emit(key, data, chain) - submit_flush - end - end - - def submit_flush - # Without locks: it is rough but enough to select "next" writer selection - @writer_current_position = (@writer_current_position + 1) % @writers_size - @writers[@writer_current_position].submit_flush - end - - def format_stream(tag, es) - out = '' - es.each {|time,record| - out << format(tag, time, record) - } - out - end - - #def format(tag, time, record) - #end - - #def write(chunk) - #end - - def enqueue_buffer(force = false) - @buffer.keys.each {|key| - @buffer.push(key) - } - end - - def try_flush - time = Time.now.to_f - - empty = @buffer.queue_size == 0 - if empty && @next_flush_time < (now = Time.now.to_f) - @buffer.synchronize do - if @next_flush_time < now - enqueue_buffer - @next_flush_time = now + @flush_interval - empty = @buffer.queue_size == 0 - end - end - end - if empty - return time + @try_flush_interval - end - - begin - retrying = !@num_errors.zero? - - if retrying - @num_errors_lock.synchronize do - if retrying = !@num_errors.zero? # re-check in synchronize - if @next_retry_time >= time - # allow retrying for only one thread - return time + @try_flush_interval - end - # assume next retry failes and - # clear them if when it succeeds - @last_retry_time = time - @num_errors += 1 - @next_retry_time += calc_retry_wait - end - end - end - - if @secondary && !@disable_retry_limit && @num_errors > @retry_limit - has_next = flush_secondary(@secondary) - else - has_next = @buffer.pop(self) - end - - # success - if retrying - @num_errors = 0 - # Note: don't notify to other threads to prevent - # burst to recovered server - $log.warn "retry succeeded.", :plugin_id=>plugin_id - end - - if has_next - return Time.now.to_f + @queued_chunk_flush_interval - else - return time + @try_flush_interval - end - - rescue => e - if retrying - error_count = @num_errors - else - # first error - error_count = 0 - @num_errors_lock.synchronize do - if @num_errors.zero? - @last_retry_time = time - @num_errors += 1 - @next_retry_time = time + calc_retry_wait - end - end - end - - if @disable_retry_limit || error_count < @retry_limit - $log.warn "temporarily failed to flush the buffer.", :next_retry=>Time.at(@next_retry_time), :error_class=>e.class.to_s, :error=>e.to_s, :plugin_id=>plugin_id - $log.warn_backtrace e.backtrace - - elsif @secondary - if error_count == @retry_limit - $log.warn "failed to flush the buffer.", :error_class=>e.class.to_s, :error=>e.to_s, :plugin_id=>plugin_id - $log.warn "retry count exceededs limit. falling back to secondary output." - $log.warn_backtrace e.backtrace - retry # retry immediately - elsif error_count <= @retry_limit + @secondary_limit - $log.warn "failed to flush the buffer, next retry will be with secondary output.", :next_retry=>Time.at(@next_retry_time), :error_class=>e.class.to_s, :error=>e.to_s, :plugin_id=>plugin_id - $log.warn_backtrace e.backtrace - else - $log.warn "failed to flush the buffer.", :error_class=>e.class, :error=>e.to_s, :plugin_id=>plugin_id - $log.warn "secondary retry count exceededs limit." - $log.warn_backtrace e.backtrace - write_abort - @num_errors = 0 - end - - else - $log.warn "failed to flush the buffer.", :error_class=>e.class.to_s, :error=>e.to_s, :plugin_id=>plugin_id - $log.warn "retry count exceededs limit." - $log.warn_backtrace e.backtrace - write_abort - @num_errors = 0 - end - - return @next_retry_time - end - end - - def force_flush - @num_errors_lock.synchronize do - @next_retry_time = Time.now.to_f - 1 - end - enqueue_buffer(true) - submit_flush - end - - def before_shutdown - begin - @buffer.before_shutdown(self) - rescue - $log.warn "before_shutdown failed", :error=>$!.to_s - $log.warn_backtrace - end - end - - def calc_retry_wait - # TODO retry pattern - wait = if @disable_retry_limit || @num_errors <= @retry_limit - @retry_wait * (2 ** (@num_errors - 1)) - else - # secondary retry - @retry_wait * (2 ** (@num_errors - 2 - @retry_limit)) - end - retry_wait = wait.finite? ? wait + (rand * (wait / 4.0) - (wait / 8.0)) : wait - @max_retry_wait ? [retry_wait, @max_retry_wait].min : retry_wait - end - - def write_abort - $log.error "throwing away old logs." - begin - @buffer.clear! - rescue - $log.error "unexpected error while aborting", :error=>$!.to_s - $log.error_backtrace - end - end - - def flush_secondary(secondary) - @buffer.pop(secondary) - end - end - - - class ObjectBufferedOutput < BufferedOutput - config_param :time_as_integer, :bool, :default => true - - def initialize - super - end - - def emit(tag, es, chain) - @emit_count += 1 - if @time_as_integer - data = es.to_msgpack_stream_forced_integer - else - data = es.to_msgpack_stream - end - key = tag - if @buffer.emit(key, data, chain) - submit_flush - end - end - - module BufferedEventStreamMixin - include Enumerable - - def repeatable? - true - end - - def each(&block) - msgpack_each(&block) - end - - def to_msgpack_stream - read - end - end - - def write(chunk) - chunk.extend(BufferedEventStreamMixin) - write_objects(chunk.key, chunk) - end - end - - - class TimeSlicedOutput < BufferedOutput - require 'fluent/timezone' - - def initialize - super - @localtime = true - #@ignore_old = false # TODO - end - - desc 'The time format used as part of the file name.' - config_param :time_slice_format, :string, :default => '%Y%m%d' - desc 'The amount of time Fluentd will wait for old logs to arrive.' - config_param :time_slice_wait, :time, :default => 10*60 - desc 'Parse the time value in the specified timezone' - config_param :timezone, :string, :default => nil - config_set_default :buffer_type, 'file' # overwrite default buffer_type - config_set_default :buffer_chunk_limit, 256*1024*1024 # overwrite default buffer_chunk_limit - config_set_default :flush_interval, nil - - attr_accessor :localtime - attr_reader :time_slicer # for test - - def configure(conf) - super - - if conf['utc'] - @localtime = false - elsif conf['localtime'] - @localtime = true - end - - if conf['timezone'] - @timezone = conf['timezone'] - Fluent::Timezone.validate!(@timezone) - end - - if @timezone - @time_slicer = Timezone.formatter(@timezone, @time_slice_format) - elsif @localtime - @time_slicer = Proc.new {|time| - Time.at(time).strftime(@time_slice_format) - } - else - @time_slicer = Proc.new {|time| - Time.at(time).utc.strftime(@time_slice_format) - } - end - - @time_slice_cache_interval = time_slice_cache_interval - @before_tc = nil - @before_key = nil - - if @flush_interval - if conf['time_slice_wait'] - $log.warn "time_slice_wait is ignored if flush_interval is specified: #{conf}" - end - @enqueue_buffer_proc = Proc.new do - @buffer.keys.each {|key| - @buffer.push(key) - } - end - - else - @flush_interval = [60, @time_slice_cache_interval].min - @enqueue_buffer_proc = Proc.new do - nowslice = @time_slicer.call(Time.now - @time_slice_wait) - @buffer.keys.each {|key| - if key < nowslice - @buffer.push(key) - end - } - end - end - end - - def emit(tag, es, chain) - @emit_count += 1 - formatted_data = {} - es.each {|time,record| - tc = time / @time_slice_cache_interval - if @before_tc == tc - key = @before_key - else - @before_tc = tc - key = @time_slicer.call(time) - @before_key = key - end - formatted_data[key] ||= '' - formatted_data[key] << format(tag, time, record) - } - formatted_data.each { |key, data| - if @buffer.emit(key, data, chain) - submit_flush - end - } - end - - def enqueue_buffer(force = false) - if force - @buffer.keys.each {|key| - @buffer.push(key) - } - else - @enqueue_buffer_proc.call - end - end - - #def format(tag, event) - #end - - private - def time_slice_cache_interval - if @time_slicer.call(0) != @time_slicer.call(60-1) - return 1 - elsif @time_slicer.call(0) != @time_slicer.call(60*60-1) - return 30 - elsif @time_slicer.call(0) != @time_slicer.call(24*60*60-1) - return 60*30 - else - return 24*60*30 - end - end - end - - - class MultiOutput < Output - #def outputs - #end - end end diff --git a/lib/fluent/plugin/buffered_output.rb b/lib/fluent/plugin/buffered_output.rb new file mode 100644 index 0000000000..79bd2cabe2 --- /dev/null +++ b/lib/fluent/plugin/buffered_output.rb @@ -0,0 +1,360 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/output' + +module Fluent + module Plugin + class OutputThread + def initialize(output) + @output = output + @finish = false + @next_time = Engine.now + 1.0 + end + + def configure(conf) + end + + def start + @mutex = Mutex.new + @cond = ConditionVariable.new + @thread = Thread.new(&method(:run)) + end + + def shutdown + @finish = true + @mutex.synchronize { + @cond.signal + } + Thread.pass + @thread.join + end + + def submit_flush + @mutex.synchronize { + @next_time = 0 + @cond.signal + } + Thread.pass + end + + private + def run + @mutex.lock + begin + until @finish + time = Engine.now + + if @next_time <= time + @mutex.unlock + begin + @next_time = @output.try_flush + ensure + @mutex.lock + end + next_wait = @next_time - Engine.now + else + next_wait = @next_time - time + end + + cond_wait(next_wait) if next_wait > 0 + end + ensure + @mutex.unlock + end + rescue + $log.error "error on output thread", :error=>$!.to_s + $log.error_backtrace + raise + ensure + @mutex.synchronize { + @output.before_shutdown + } + end + + def cond_wait(sec) + @cond.wait(@mutex, sec) + end + end + + class BufferedOutput < Output + def initialize + super + @next_flush_time = 0 + @last_retry_time = 0 + @next_retry_time = 0 + @num_errors = 0 + @num_errors_lock = Mutex.new + @secondary_limit = 8 + @emit_count = 0 + end + + desc 'The buffer type (memory, file)' + config_param :buffer_type, :string, :default => 'memory' + desc 'The interval between data flushes.' + config_param :flush_interval, :time, :default => 60 + config_param :try_flush_interval, :float, :default => 1 + desc 'If true, the value of `retry_value` is ignored and there is no limit' + config_param :disable_retry_limit, :bool, :default => false + desc 'The limit on the number of retries before buffered data is discarded' + config_param :retry_limit, :integer, :default => 17 + desc 'The initial intervals between write retries.' + config_param :retry_wait, :time, :default => 1.0 + desc 'The maximum intervals between write retries.' + config_param :max_retry_wait, :time, :default => nil + desc 'The number of threads to flush the buffer.' + config_param :num_threads, :integer, :default => 1 + desc 'The interval between data flushes for queued chunk.' + config_param :queued_chunk_flush_interval, :time, :default => 1 + + def configure(conf) + super + + @retry_wait = @retry_wait.to_f # converted to Float for calc_retry_wait + @buffer = Plugin.new_buffer(@buffer_type) + @buffer.configure(conf) + + if @buffer.respond_to?(:enable_parallel) + if @num_threads == 1 + @buffer.enable_parallel(false) + else + @buffer.enable_parallel(true) + end + end + + @writers = (1..@num_threads).map { + writer = OutputThread.new(self) + writer.configure(conf) + writer + } + + if sconf = conf.elements.select {|e| e.name == 'secondary' }.first + type = sconf['@type'] || conf['@type'] || sconf['type'] || conf['type'] + @secondary = Plugin.new_output(type) + @secondary.router = router + @secondary.configure(sconf) + + if secondary_limit = conf['secondary_limit'] + @secondary_limit = secondary_limit.to_i + if @secondary_limit < 0 + raise ConfigError, "invalid parameter 'secondary_limit #{secondary_limit}'" + end + end + + @secondary.secondary_init(self) + end + end + + def start + @next_flush_time = Engine.now + @flush_interval + @buffer.start + @secondary.start if @secondary + @writers.each {|writer| writer.start } + @writer_current_position = 0 + @writers_size = @writers.size + end + + def shutdown + @writers.each {|writer| writer.shutdown } + @secondary.shutdown if @secondary + @buffer.shutdown + end + + def emit(tag, es, chain, key="") + @emit_count += 1 + data = format_stream(tag, es) + if @buffer.emit(key, data, chain) + submit_flush + end + end + + def submit_flush + # Without locks: it is rough but enough to select "next" writer selection + @writer_current_position = (@writer_current_position + 1) % @writers_size + @writers[@writer_current_position].submit_flush + end + + def format_stream(tag, es) + out = '' + es.each {|time,record| + out << format(tag, time, record) + } + out + end + + #def format(tag, time, record) + # # TODO + #end + + #def write(chunk) + # # TODO + #end + + def enqueue_buffer(force = false) + @buffer.keys.each {|key| + @buffer.push(key) + } + end + + def try_flush + time = Engine.now + + empty = @buffer.queue_size == 0 + if empty && @next_flush_time < (now = Engine.now) + @buffer.synchronize do + if @next_flush_time < now + enqueue_buffer + @next_flush_time = now + @flush_interval + empty = @buffer.queue_size == 0 + end + end + end + if empty + return time + @try_flush_interval + end + + begin + retrying = !@num_errors.zero? + + if retrying + @num_errors_lock.synchronize do + if retrying = !@num_errors.zero? # re-check in synchronize + if @next_retry_time >= time + # allow retrying for only one thread + return time + @try_flush_interval + end + # assume next retry failes and + # clear them if when it succeeds + @last_retry_time = time + @num_errors += 1 + @next_retry_time += calc_retry_wait + end + end + end + + if @secondary && !@disable_retry_limit && @num_errors > @retry_limit + has_next = flush_secondary(@secondary) + else + has_next = @buffer.pop(self) + end + + # success + if retrying + @num_errors = 0 + # Note: don't notify to other threads to prevent + # burst to recovered server + $log.warn "retry succeeded.", :plugin_id=>plugin_id + end + + if has_next + return Engine.now + @queued_chunk_flush_interval + else + return time + @try_flush_interval + end + + rescue => e + if retrying + error_count = @num_errors + else + # first error + error_count = 0 + @num_errors_lock.synchronize do + if @num_errors.zero? + @last_retry_time = time + @num_errors += 1 + @next_retry_time = time + calc_retry_wait + end + end + end + + if @disable_retry_limit || error_count < @retry_limit + $log.warn "temporarily failed to flush the buffer.", :next_retry=>Time.at(@next_retry_time), :error_class=>e.class.to_s, :error=>e.to_s, :plugin_id=>plugin_id + $log.warn_backtrace e.backtrace + + elsif @secondary + if error_count == @retry_limit + $log.warn "failed to flush the buffer.", :error_class=>e.class.to_s, :error=>e.to_s, :plugin_id=>plugin_id + $log.warn "retry count exceededs limit. falling back to secondary output." + $log.warn_backtrace e.backtrace + retry # retry immediately + elsif error_count <= @retry_limit + @secondary_limit + $log.warn "failed to flush the buffer, next retry will be with secondary output.", :next_retry=>Time.at(@next_retry_time), :error_class=>e.class.to_s, :error=>e.to_s, :plugin_id=>plugin_id + $log.warn_backtrace e.backtrace + else + $log.warn "failed to flush the buffer.", :error_class=>e.class, :error=>e.to_s, :plugin_id=>plugin_id + $log.warn "secondary retry count exceededs limit." + $log.warn_backtrace e.backtrace + write_abort + @num_errors = 0 + end + + else + $log.warn "failed to flush the buffer.", :error_class=>e.class.to_s, :error=>e.to_s, :plugin_id=>plugin_id + $log.warn "retry count exceededs limit." + $log.warn_backtrace e.backtrace + write_abort + @num_errors = 0 + end + + return @next_retry_time + end + end + + def force_flush + @num_errors_lock.synchronize do + @next_retry_time = Engine.now - 1 + end + enqueue_buffer(true) + submit_flush + end + + def before_shutdown + begin + @buffer.before_shutdown(self) + rescue + $log.warn "before_shutdown failed", :error=>$!.to_s + $log.warn_backtrace + end + end + + def calc_retry_wait + # TODO retry pattern + wait = if @disable_retry_limit || @num_errors <= @retry_limit + @retry_wait * (2 ** (@num_errors - 1)) + else + # secondary retry + @retry_wait * (2 ** (@num_errors - 2 - @retry_limit)) + end + retry_wait = wait.finite? ? wait + (rand * (wait / 4.0) - (wait / 8.0)) : wait + @max_retry_wait ? [retry_wait, @max_retry_wait].min : retry_wait + end + + def write_abort + $log.error "throwing away old logs." + begin + @buffer.clear! + rescue + $log.error "unexpected error while aborting", :error=>$!.to_s + $log.error_backtrace + end + end + + def flush_secondary(secondary) + @buffer.pop(secondary) + end + end + end +end diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb new file mode 100644 index 0000000000..44a7fc8e0b --- /dev/null +++ b/lib/fluent/plugin/input.rb @@ -0,0 +1,53 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin' +require 'fluent/configurable' +require 'fluent/config' # for PluginId +require 'fluent/log' + +module Fluent + module Plugin + class Input + include Configurable + include PluginId + include PluginLoggerMixin + + attr_accessor :router + + def initialize + super + end + + def configure(conf) + super + + if label_name = conf['@label'] + label = Engine.root_agent.find_label(label_name) + @router = label.event_router + elsif @router.nil? + @router = Engine.root_agent.event_router + end + end + + def start + end + + def shutdown + end + end + end +end diff --git a/lib/fluent/plugin/object_buffered_output.rb b/lib/fluent/plugin/object_buffered_output.rb new file mode 100644 index 0000000000..67598752f9 --- /dev/null +++ b/lib/fluent/plugin/object_buffered_output.rb @@ -0,0 +1,65 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/buffered_output' + +module Fluent + module Plugin + class ObjectBufferedOutput < BufferedOutput + desc "Format times to integer, instead of event-time object" + config_param :time_as_integer, :bool, :default => true + + def initialize + super + end + + def emit(tag, es, chain) + @emit_count += 1 + data = if @time_as_integer + es.to_msgpack_stream_forced_integer + else + es.to_msgpack_stream + end + key = tag + if @buffer.emit(key, data, chain) + submit_flush + end + end + + module BufferedEventStreamMixin + include Enumerable + + def repeatable? + true + end + + def each(&block) + msgpack_each(&block) + end + + def to_msgpack_stream + read + end + end + + def write(chunk) + chunk.extend(BufferedEventStreamMixin) + write_objects(chunk.key, chunk) + end + end + end +end + diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb new file mode 100644 index 0000000000..4a645f7256 --- /dev/null +++ b/lib/fluent/plugin/output.rb @@ -0,0 +1,65 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin' +require 'fluent/configurable' +require 'fluent/config' # for PluginId +require 'fluent/log' + +module Fluent + module Plugin + class Output + include Configurable + include PluginId + include PluginLoggerMixin + + attr_accessor :router + + def initialize + super + end + + def configure(conf) + super + + if label_name = conf['@label'] + label = Engine.root_agent.find_label(label_name) + @router = label.event_router + elsif @router.nil? + @router = Engine.root_agent.event_router + end + end + + def start + end + + def shutdown + end + + #def emit(tag, es, chain) + # # TODO + #end + + def secondary_init(primary) + if primary.class != self.class + $log.warn "type of secondary output should be same as primary output", primary: primary.class.to_s, secondary: self.class.to_s + end + end + + def inspect; "#<%s:%014x>" % [self.class.name, '0x%014x' % (__id__ << 1)] end + end + end +end diff --git a/lib/fluent/plugin/time_sliced_output.rb b/lib/fluent/plugin/time_sliced_output.rb new file mode 100644 index 0000000000..a5d2295d04 --- /dev/null +++ b/lib/fluent/plugin/time_sliced_output.rb @@ -0,0 +1,146 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/buffered_output' +require 'fluent/timezone' + +module Fluent + module Plugin + class TimeSlicedOutput < BufferedOutput + + def initialize + super + @localtime = true + #@ignore_old = false # TODO + end + + desc 'The time format used as part of the file name.' + config_param :time_slice_format, :string, :default => '%Y%m%d' + desc 'The amount of time Fluentd will wait for old logs to arrive.' + config_param :time_slice_wait, :time, :default => 10*60 + desc 'Parse the time value in the specified timezone' + config_param :timezone, :string, :default => nil + config_set_default :buffer_type, 'file' # overwrite default buffer_type + config_set_default :buffer_chunk_limit, 256*1024*1024 # overwrite default buffer_chunk_limit + config_set_default :flush_interval, nil + + attr_accessor :localtime + attr_reader :time_slicer # for test + + def configure(conf) + super + + if conf['utc'] + @localtime = false + elsif conf['localtime'] + @localtime = true + end + + if conf['timezone'] + @timezone = conf['timezone'] + Fluent::Timezone.validate!(@timezone) + end + + if @timezone + @time_slicer = Timezone.formatter(@timezone, @time_slice_format) + elsif @localtime + @time_slicer = Proc.new {|time| + Time.at(time).strftime(@time_slice_format) + } + else + @time_slicer = Proc.new {|time| + Time.at(time).utc.strftime(@time_slice_format) + } + end + + @time_slice_cache_interval = time_slice_cache_interval + @before_tc = nil + @before_key = nil + + if @flush_interval + if conf['time_slice_wait'] + $log.warn "time_slice_wait is ignored if flush_interval is specified: #{conf}" + end + @enqueue_buffer_proc = Proc.new do + @buffer.keys.each {|key| + @buffer.push(key) + } + end + + else + @flush_interval = [60, @time_slice_cache_interval].min + @enqueue_buffer_proc = Proc.new do + nowslice = @time_slicer.call(Engine.now.to_i - @time_slice_wait) + @buffer.keys.each {|key| + if key < nowslice + @buffer.push(key) + end + } + end + end + end + + def emit(tag, es, chain) + @emit_count += 1 + formatted_data = {} + es.each {|time,record| + tc = time / @time_slice_cache_interval + if @before_tc == tc + key = @before_key + else + @before_tc = tc + key = @time_slicer.call(time) + @before_key = key + end + formatted_data[key] ||= '' + formatted_data[key] << format(tag, time, record) + } + formatted_data.each { |key, data| + if @buffer.emit(key, data, chain) + submit_flush + end + } + end + + def enqueue_buffer(force = false) + if force + @buffer.keys.each {|key| + @buffer.push(key) + } + else + @enqueue_buffer_proc.call + end + end + + #def format(tag, event) + # # TODO + #end + + private + def time_slice_cache_interval + if @time_slicer.call(0) != @time_slicer.call(60-1) + return 1 + elsif @time_slicer.call(0) != @time_slicer.call(60*60-1) + return 30 + elsif @time_slicer.call(0) != @time_slicer.call(24*60*60-1) + return 60*30 + else + return 24*60*30 + end + end + end + end +end From 249bd0e088e0d72d6e1deb2dc0947f188c69144b Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 5 Mar 2015 15:15:02 +0900 Subject: [PATCH 003/111] Add Fluent::Plugin::Base * to reduce duplicated implementation in Input and Output --- lib/fluent/plugin/base.rb | 55 +++++++++++++++++++++++++++++++++++++ lib/fluent/plugin/input.rb | 22 +++------------ lib/fluent/plugin/output.rb | 28 +++++-------------- 3 files changed, 66 insertions(+), 39 deletions(-) create mode 100644 lib/fluent/plugin/base.rb diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb new file mode 100644 index 0000000000..268567f106 --- /dev/null +++ b/lib/fluent/plugin/base.rb @@ -0,0 +1,55 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin' +require 'fluent/configurable' +require 'fluent/config' # for PluginId +require 'fluent/log' + +module Fluent + module Plugin + class Base + include Configurable + include PluginId + include PluginLoggerMixin + + attr_accessor :router + attr_reader :_threads + + def initialize + super + @_threads = {} # dic for threads generated by Fluent::PluginSupport::* + end + + def configure(conf) + super + + if label_name = conf['@label'] + label = Engine.root_agent.find_label(label_name) + @router = label.event_router + elsif @router.nil? + @router = Engine.root_agent.event_router + end + end + + def start + end + + def shutdown + end + end + end +end diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index 44a7fc8e0b..95ef39c0c8 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -14,39 +14,25 @@ # limitations under the License. # -require 'fluent/plugin' -require 'fluent/configurable' -require 'fluent/config' # for PluginId -require 'fluent/log' +require 'fluent/plugin/base' module Fluent module Plugin - class Input - include Configurable - include PluginId - include PluginLoggerMixin - - attr_accessor :router - + class Input < Base def initialize super end def configure(conf) super - - if label_name = conf['@label'] - label = Engine.root_agent.find_label(label_name) - @router = label.event_router - elsif @router.nil? - @router = Engine.root_agent.event_router - end end def start + super end def shutdown + super end end end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 4a645f7256..c84130410d 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -14,44 +14,30 @@ # limitations under the License. # -require 'fluent/plugin' -require 'fluent/configurable' -require 'fluent/config' # for PluginId -require 'fluent/log' +require 'fluent/plugin/base' module Fluent module Plugin - class Output - include Configurable - include PluginId - include PluginLoggerMixin - - attr_accessor :router - + class Output < Base def initialize super end def configure(conf) super - - if label_name = conf['@label'] - label = Engine.root_agent.find_label(label_name) - @router = label.event_router - elsif @router.nil? - @router = Engine.root_agent.event_router - end end def start + super end def shutdown + super end - #def emit(tag, es, chain) - # # TODO - #end + def emit(tag, es, chain) + #### TODO: + end def secondary_init(primary) if primary.class != self.class From 31f56b29f8a4bcb0c6b516c48336e547756a3ef7 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 5 Mar 2015 15:17:21 +0900 Subject: [PATCH 004/111] Add Fluent::PluginSupport to provide thread/timer/child_process operations instead of Actor of v11 Plugins should include using PluginSupport Mixin individually. --- lib/fluent/plugin_support.rb | 20 +++ lib/fluent/plugin_support/child_process.rb | 157 +++++++++++++++++++++ lib/fluent/plugin_support/thread.rb | 41 ++++++ lib/fluent/plugin_support/util.rb | 0 4 files changed, 218 insertions(+) create mode 100644 lib/fluent/plugin_support.rb create mode 100644 lib/fluent/plugin_support/child_process.rb create mode 100644 lib/fluent/plugin_support/thread.rb create mode 100644 lib/fluent/plugin_support/util.rb diff --git a/lib/fluent/plugin_support.rb b/lib/fluent/plugin_support.rb new file mode 100644 index 0000000000..6078798c9f --- /dev/null +++ b/lib/fluent/plugin_support.rb @@ -0,0 +1,20 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin_support/child_process' +require 'fluent/plugin_support/timer' +require 'fluent/plugin_support/thread' +require 'fluent/plugin_support/socket' # TODO: just socket? Or tcp_socket, udp_socket, unix_socket & ssl_socket? diff --git a/lib/fluent/plugin_support/child_process.rb b/lib/fluent/plugin_support/child_process.rb new file mode 100644 index 0000000000..9f9c184dba --- /dev/null +++ b/lib/fluent/plugin_support/child_process.rb @@ -0,0 +1,157 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/configurable' +require 'fluent/plugin_support/thread' + +module Fluent + module PluginSupport + module ChildProcess + include Fluent::PluginSupport::Thread + + CHILD_PROCESS_LOOP_CHECK_INTERVAL = 0.2 + + ChildProcessStat = Struct.new(:processes, :kill_timeout, :periodics, :loop, :shutdown, :mutex) + PeriodicProc = Struct.new(:spec, :block, :interval, :last_executed) + + def initialize + super + end + + def start + super + + @child_process = ChildProcessStat.new + @child_process.processes = {} # pid => thread + @child_process.kill_timeout = 60 + @child_process.periodics = [] + @child_process.loop = nil + @child_process.shutdown = false + + @child_process.mutex = Mutex.new + end + + def shutdown + @child_process.shutdown = true + if @child_process.loop + @child_process.loop.join + end + + @child_process.processes.keys.each do |pid| + begin + Process.kill(:TERM, pid) + rescue + # Errno::ECHILD, Errno::ESRCH, Errno::EPERM + end + end + @child_process.processes.keys.each do |pid| + thread = @child_process.processes[pid] + thread.join(@child_process.kill_timeout) + @child_process.processes.delete(pid) unless thread.alive? + end + + @child_process.processes.keys.each do |pid| + thread = @child_process.processes[pid] + begin + Process.kill(:KILL, pid) + rescue + # Errno::ECHILD, Errno::ESRCH, Errno::EPERM + end + thread.join + end + + super + end + + def child_process(command, arguments: nil, subprocess_name: nil, read: true, write: true, encoding: 'utf-8', interval: nil, immediate: false, &block) + raise "BUG: illegal specification to disable both of input and output file handle" if !read && !write + raise "BUG: block not specified which receive i/o object" unless block_given? + if interval + child_process_execute_loop + + periodic = PeriodicProc.new + periodic.spec = [command, arguments, subprocess_name, read, write, encoding] + periodic.block = block + periodic.interval = interval + periodic.last_executed = if immediate + Time.now - (interval + 1) + else + Time.now + end + @child_process.mutex.synchronize do + @child_process.periodics.push(periodic) + end + else + child_process_once(command, arguments, subprocess_name, read, write, encoding, &block) + end + end + + # for private use + def child_process_once(command, arguments, subprocess_name, read, write, encoding, &block) + ext_enc = encoding + int_enc = "utf-8" + mode = case + when read && write then "r+" + when read then "r" + when write then "w" + else "r" # but it is not for any bytes + end + cmd = command + if arguments # if arguments is nil, then command is executed by shell + cmd = [] + if subprocess_name + cmd.push [command, subprocess_name] + else + cmd.push command + end + cmd += arguments + end + io = IO.popen(cmd, mode, external_encoding: ext_enc, internal_encoding: int_enc) + pid = io.pid + thread = thread_create do + block.call(io) # TODO: rescue and log? or kill entire process? + + # child process probablly ended if sequence reaches here + Process.waitpid(pid) + @child_process.processes.delete(pid) + end + @child_process.processes[pid] = thread + end + + # for private use + def child_process_execute_loop + return if @child_process.loop + + thread = thread_create do + while sleep(CHILD_PROCESS_LOOP_CHECK_INTERVAL) + break if @child_process.shutdown + now = Time.now + periodics = @child_process.mutex.synchronize do + @child_process.periodics.dup + end + periodics.each do |periodic| + if now - periodic.last_executed >= periodic.interval + spec = periodic.spec + block = periodic.block + child_process_once(*spec, &block) + end + end + end + end + end + end + end +end diff --git a/lib/fluent/plugin_support/thread.rb b/lib/fluent/plugin_support/thread.rb new file mode 100644 index 0000000000..e81365a090 --- /dev/null +++ b/lib/fluent/plugin_support/thread.rb @@ -0,0 +1,41 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Fluent + module PluginSupport + module Thread + def initialize + super + @_threads = {} + end + + def thread_create(*args) + thread = ::Thread.new(*args) do |*t_args| + current = ::Thread.current + current.abort_on_exception = true + @_threads[current.object_id] = current + begin + yield *t_args + ensure + @_threads.delete(::Thread.current.object_id) + end + end + @_threads[thread.object_id] = thread + thread + end + end + end +end diff --git a/lib/fluent/plugin_support/util.rb b/lib/fluent/plugin_support/util.rb new file mode 100644 index 0000000000..e69de29bb2 From 29eeaa38ce6476f96aaba72dcbe1422b51faf53e Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 5 Mar 2015 15:19:50 +0900 Subject: [PATCH 005/111] Add new Test Driver with simplified API * and mark old TestDriver as obsolete --- lib/fluent/test.rb | 2 +- lib/fluent/test/base.rb | 1 + lib/fluent/test/driver.rb | 22 +++++++ lib/fluent/test/driver/base.rb | 100 +++++++++++++++++++++++++++++ lib/fluent/test/driver/input.rb | 109 ++++++++++++++++++++++++++++++++ 5 files changed, 233 insertions(+), 1 deletion(-) create mode 100644 lib/fluent/test/driver.rb create mode 100644 lib/fluent/test/driver/base.rb create mode 100644 lib/fluent/test/driver/input.rb diff --git a/lib/fluent/test.rb b/lib/fluent/test.rb index f552f7a20d..f973686fc7 100644 --- a/lib/fluent/test.rb +++ b/lib/fluent/test.rb @@ -23,4 +23,4 @@ require 'fluent/test/parser_test' require 'fluent/test/formatter_test' -$log ||= Fluent::Log.new(Fluent::Test::DummyLogDevice.new) +require 'fluent/test/driver/input' diff --git a/lib/fluent/test/base.rb b/lib/fluent/test/base.rb index a3f9b77de0..b22c042c96 100644 --- a/lib/fluent/test/base.rb +++ b/lib/fluent/test/base.rb @@ -37,6 +37,7 @@ def assert_equal_event_time(a, b) nil end + # obsolete for v0.14 class TestDriver include ::Test::Unit::Assertions diff --git a/lib/fluent/test/driver.rb b/lib/fluent/test/driver.rb new file mode 100644 index 0000000000..f24961d230 --- /dev/null +++ b/lib/fluent/test/driver.rb @@ -0,0 +1,22 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Fluent + module Test + module Driver + end + end +end diff --git a/lib/fluent/test/driver/base.rb b/lib/fluent/test/driver/base.rb new file mode 100644 index 0000000000..23ef16c389 --- /dev/null +++ b/lib/fluent/test/driver/base.rb @@ -0,0 +1,100 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'test/unit' + +module Fluent + module Test + module Driver + class Base + include ::Test::Unit::Assertions + + def initialize(klass, &block) + if klass.is_a?(Class) + if block + # Create new class for test w/ overwritten methods + # klass.dup is worse because its ancestors does NOT include original class name + klass = Class.new(klass) + klass.module_eval(&block) + end + @instance = klass.new + else + @instance = klass + end + @instance.router = Engine.root_agent.event_router + @instance.log = TestLogger.new + + @config = Config.new + end + + attr_reader :instance, :config + + def configure(str, use_v1=true) # v1 syntax default + if str.is_a?(Fluent::Config::Element) + @config = str + else + @config = Config.parse(str, "(test)", "(test_dir)", use_v1) + end + if label_name = @config['@label'] + Engine.root_agent.add_label(label_name) + end + @instance.configure(@config) + self + end + + def run(&block) + @instance.start + until @instance._threads.values.all?(&:alive?) + sleep 0.01 + end + begin + yield + ensure + @instance.shutdown + end + end + + def stop? + # Should stop running if post conditions are not registered. + return true unless @run_post_conditions + + # Should stop running if all of the post conditions are true. + return true if @run_post_conditions.all? {|proc| proc.call } + + # Should stop running if any of the breaking conditions is true. + # In this case, some post conditions may be not true. + return true if @run_breaking_conditions && @run_breaking_conditions.any? {|proc| proc.call } + + false + end + + def end_if(&block) + if block + @run_post_conditions ||= [] + @run_post_conditions << block + end + end + + def break_if(&block) + if block + @run_breaking_conditions ||= [] + @run_breaking_conditions << block + end + end + end + end + end +end diff --git a/lib/fluent/test/driver/input.rb b/lib/fluent/test/driver/input.rb new file mode 100644 index 0000000000..0d51e7983d --- /dev/null +++ b/lib/fluent/test/driver/input.rb @@ -0,0 +1,109 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/test/driver/base' + +module Fluent + # TODO: Chekc What is this for? + class FileBuffer < BasicBuffer + def self.clear_buffer_paths + @@buffer_paths = {} + end + end + + module Test + module Driver + class Input < Base + attr_accessor :run_timeout + attr_reader :emit_streams + + def initialize(klass, &block) + FileBuffer.clear_buffer_paths + + super(klass, &block) + + @emit_streams = [] + end + + def emits + all = [] + @emit_streams.each {|tag,events| + events.each {|time,record| + all << [tag, time, record] + } + } + all + end + + def events + emits.map{|tag, time record| [time, record] } + end + + def records + emits.map{|tag, time, record| record } + end + + def run(&block) + m = method(:emit_stream) + Engine.define_singleton_method(:emit_stream) {|tag,es| + m.call(tag, es) + } + instance.router.define_singleton_method(:emit_stream) {|tag,es| + m.call(tag, es) + } + super { + block.call if block + + if @expected_emits_length || @run_post_conditions + # counters for emits and emit_streams + emitted_count = 0 + emit_times_count = 0 + + # Events of expected length will be emitted at the end. + if @expected_emits_length + register_run_post_condition do + emitted_count >= max_length + end + end + + # Set runnning timeout to avoid infinite loop caused by some errors. + started_at = Time.now + register_run_breaking_condition do + Time.now >= started_at + @run_timeout + end + + until stop? + if emit_times_count == @emit_streams.length + sleep 0.01 + next + end + + tag, events = @emit_streams[j] + emitted_count += events.length + end + end + } + self + end + + private + def emit_stream(tag, es) + @emit_streams << [tag, es.to_a] + end + end + end + end +end From f7e7c397285e06ae89f694007b462765ac4f724f Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Thu, 5 Mar 2015 15:21:12 +0900 Subject: [PATCH 006/111] Re-implement in_exec w/ v0.14 API (tests: green) --- lib/fluent/plugin/exec_util.rb | 4 ++ lib/fluent/plugin/in_exec.rb | 79 ++++++++++------------------------ test/plugin/test_in_exec.rb | 4 +- 3 files changed, 28 insertions(+), 59 deletions(-) diff --git a/lib/fluent/plugin/exec_util.rb b/lib/fluent/plugin/exec_util.rb index 9ed3d9edff..e3d195cc5e 100644 --- a/lib/fluent/plugin/exec_util.rb +++ b/lib/fluent/plugin/exec_util.rb @@ -119,4 +119,8 @@ def call(record, out) end end end + + module Plugin + ExecUtil = Fluent::ExecUtil + end end diff --git a/lib/fluent/plugin/in_exec.rb b/lib/fluent/plugin/in_exec.rb index 57d6af097e..5aea0f496f 100644 --- a/lib/fluent/plugin/in_exec.rb +++ b/lib/fluent/plugin/in_exec.rb @@ -14,9 +14,13 @@ # limitations under the License. # -module Fluent - class ExecInput < Input - Plugin.register_input('exec', self) +require 'fluent/plugin_support/child_process' + +module Fluent::Plugin + class ExecInput < Fluent::Plugin::Input + include Fluent::PluginSupport::ChildProcess + + Fluent::Plugin.register_input('exec', self) def initialize super @@ -27,21 +31,23 @@ def initialize desc 'The command (program) to execute.' config_param :command, :string desc 'The format used to map the program output to the incoming event.(tsv,json,msgpack)' - config_param :format, :string, :default => 'tsv' + config_param :format, default: 'tsv' desc 'Specify the comma-separated keys when using the tsv format.' - config_param :keys, :default => [] do |val| + config_param :keys, default: [] do |val| val.split(',') end desc 'Tag of the output events.' - config_param :tag, :string, :default => nil + config_param :tag, :string, default: nil desc 'The key to use as the event tag instead of the value in the event record. ' - config_param :tag_key, :string, :default => nil + config_param :tag_key, :string, default: nil desc 'The key to use as the event time instead of the value in the event record.' - config_param :time_key, :string, :default => nil + config_param :time_key, :string, default: nil desc 'The format of the event time used for the time_key parameter.' - config_param :time_format, :string, :default => nil + config_param :time_format, :string, default: nil desc 'The interval time between periodic program runs.' - config_param :run_interval, :time, :default => nil + config_param :run_interval, :time, default: nil + desc 'The flag to execute command immediately after startup' + config_param :run_immediately, :bool, default: false def configure(conf) super @@ -96,56 +102,15 @@ def setup_parser(conf) end def start - if @run_interval - @finished = false - @thread = Thread.new(&method(:run_periodic)) - else - @io = IO.popen(@command, "r") - @pid = @io.pid - @thread = Thread.new(&method(:run)) - end - end - - def shutdown - if @run_interval - @finished = true - # call Thread#run which interupts sleep in order to stop run_periodic thread immediately. - @thread.run - @thread.join - else - begin - Process.kill(:TERM, @pid) - rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM - end - if @thread.join(60) # TODO wait time - return - end + super - begin - Process.kill(:KILL, @pid) - rescue #Errno::ECHILD, Errno::ESRCH, Errno::EPERM - end - @thread.join + child_process(@command, read: true, write: false, interval: @run_interval, immediate: @run_immediately) do |io| + @parser.call(io) end end - def run - @parser.call(@io) - end - - def run_periodic - sleep @run_interval - until @finished - begin - io = IO.popen(@command, "r") - @parser.call(io) - Process.waitpid(io.pid) - sleep @run_interval - rescue - log.error "exec failed to run or shutdown child process", :error => $!.to_s, :error_class => $!.class.to_s - log.warn_backtrace $!.backtrace - end - end + def shutdown + super end private @@ -169,7 +134,7 @@ def on_message(record, parsed_time = nil) router.emit(tag, time, record) rescue => e - log.error "exec failed to emit", :error => e.to_s, :error_class => e.class.to_s, :tag => tag, :record => Yajl.dump(record) + log.error "exec failed to emit", error: e.to_s, error_class: e.class.to_s, tag: tag, record: Yajl.dump(record) end end end diff --git a/test/plugin/test_in_exec.rb b/test/plugin/test_in_exec.rb index 3836069441..e45ce5204f 100644 --- a/test/plugin/test_in_exec.rb +++ b/test/plugin/test_in_exec.rb @@ -10,8 +10,8 @@ def setup @script = File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts', 'exec_script.rb')) end - def create_driver(conf = tsv_config) - Fluent::Test::InputTestDriver.new(Fluent::ExecInput).configure(conf) + def create_driver(conf=tsv_config) + Fluent::Test::Driver::Input.new(Fluent::Plugin::ExecInput).configure(conf) end def tsv_config From 06bac366fc9155f62a748990b7ef2ab4fdb70db1 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 6 Mar 2015 13:22:34 +0900 Subject: [PATCH 007/111] Fix bug * not to specify default run_timeout & expected_emit_length * to miss to revise obsolete method/variable names --- lib/fluent/test/driver/input.rb | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/lib/fluent/test/driver/input.rb b/lib/fluent/test/driver/input.rb index 0d51e7983d..204439088f 100644 --- a/lib/fluent/test/driver/input.rb +++ b/lib/fluent/test/driver/input.rb @@ -27,7 +27,7 @@ def self.clear_buffer_paths module Test module Driver class Input < Base - attr_accessor :run_timeout + attr_accessor :run_timeout, :expected_emits_length attr_reader :emit_streams def initialize(klass, &block) @@ -36,6 +36,8 @@ def initialize(klass, &block) super(klass, &block) @emit_streams = [] + @run_timeout = 60 + @expected_emits_length = nil end def emits @@ -49,7 +51,7 @@ def emits end def events - emits.map{|tag, time record| [time, record] } + emits.map{|tag, time, record| [time, record] } end def records @@ -74,14 +76,14 @@ def run(&block) # Events of expected length will be emitted at the end. if @expected_emits_length - register_run_post_condition do - emitted_count >= max_length + end_if do + emitted_count >= @expected_emits_length end end # Set runnning timeout to avoid infinite loop caused by some errors. started_at = Time.now - register_run_breaking_condition do + break_if do Time.now >= started_at + @run_timeout end @@ -91,7 +93,7 @@ def run(&block) next end - tag, events = @emit_streams[j] + tag, events = @emit_streams[emit_times_count] emitted_count += events.length end end From 65d6356a5a035c6beb88f139d7482cfb261da38f Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 6 Mar 2015 13:25:14 +0900 Subject: [PATCH 008/111] fix not to use unreliable "sleep" --- test/plugin/test_in_exec.rb | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/test/plugin/test_in_exec.rb b/test/plugin/test_in_exec.rb index e45ce5204f..a47c9fc5e1 100644 --- a/test/plugin/test_in_exec.rb +++ b/test/plugin/test_in_exec.rb @@ -85,10 +85,9 @@ def test_configure_with_regexp def test_emit d = create_driver - - d.run do - sleep 2 - end + d.run_timeout = 2 + d.expected_emits_length = 1 + d.run emits = d.emits assert_equal true, emits.length > 0 @@ -98,10 +97,9 @@ def test_emit def test_emit_json d = create_driver json_config - - d.run do - sleep 2 - end + d.run_timeout = 2 + d.expected_emits_length = 1 + d.run emits = d.emits assert_equal true, emits.length > 0 @@ -111,10 +109,9 @@ def test_emit_json def test_emit_msgpack d = create_driver msgpack_config - - d.run do - sleep 2 - end + d.run_timeout = 2 + d.expected_emits_length = 1 + d.run emits = d.emits assert_equal true, emits.length > 0 From 542f4d1c7932f49f52440aff0b30e801ef1070f1 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 6 Mar 2015 13:44:27 +0900 Subject: [PATCH 009/111] fix tests w/ modern test-unit styles --- test/plugin/test_in_exec.rb | 110 ++++++++++++------------------------ 1 file changed, 35 insertions(+), 75 deletions(-) diff --git a/test/plugin/test_in_exec.rb b/test/plugin/test_in_exec.rb index a47c9fc5e1..55a4edad10 100644 --- a/test/plugin/test_in_exec.rb +++ b/test/plugin/test_in_exec.rb @@ -6,55 +6,48 @@ class ExecInputTest < Test::Unit::TestCase def setup Fluent::Test.setup - @test_time = Fluent::EventTime.parse("2011-01-02 13:14:15") - @script = File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts', 'exec_script.rb')) end - def create_driver(conf=tsv_config) + def create_driver(conf=TSV_CONFIG) Fluent::Test::Driver::Input.new(Fluent::Plugin::ExecInput).configure(conf) end - def tsv_config - %[ - command ruby #{@script} "2011-01-02 13:14:15" 0 + TEST_TIME = Time.parse("2011-01-02 13:14:15").to_i + SCRIPT = File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts', 'exec_script.rb')) + + TSV_CONFIG = %[ + command ruby #{SCRIPT} #{TEST_TIME} 0 keys time,tag,k1 time_key time tag_key tag time_format %Y-%m-%d %H:%M:%S run_interval 1s - ] - end + ] - def json_config - %[ - command ruby #{@script} #{@test_time} 1 + JSON_CONFIG = %[ + command ruby #{SCRIPT} #{TEST_TIME} 1 format json tag_key tag time_key time run_interval 1s - ] - end + ] - def msgpack_config - %[ - command ruby #{@script} #{@test_time} 2 + MSGPACK_CONFIG = %[ + command ruby #{SCRIPT} #{TEST_TIME} 2 format msgpack tag_key tagger time_key datetime run_interval 1s - ] - end + ] - def regexp_config - %[ - command ruby #{@script} "2011-01-02 13:14:15" 3 + REGEXP_CONFIG = %[ + command ruby #{SCRIPT} #{TEST_TIME} 3 format /(?