diff --git a/.travis.yml b/.travis.yml index 1cefc919fa..1f1110b07b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,8 @@ language: ruby rvm: - - 2.0.0 - 2.1 + - 2.1.5 - 2.2.3 - 2.3.0 - ruby-head diff --git a/example/in_status.conf b/example/in_status.conf new file mode 100644 index 0000000000..80b6df5b62 --- /dev/null +++ b/example/in_status.conf @@ -0,0 +1,10 @@ + + @type status + emit_interval 3 + tag "test" + + + + @type stdout + + diff --git a/lib/fluent/agent.rb b/lib/fluent/agent.rb index 5080f2377c..f0262842b6 100644 --- a/lib/fluent/agent.rb +++ b/lib/fluent/agent.rb @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # + +# TODO consolidate w/ Label ? + module Fluent require 'fluent/configurable' require 'fluent/engine' @@ -78,7 +81,7 @@ def shutdown @started_filters.map { |f| Thread.new do begin - log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id + log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_type_from_class(f.class), plugin_id: f.plugin_id f.shutdown rescue => e log.warn "unexpected error while shutting down filter plugins", :plugin => f.class, :plugin_id => f.plugin_id, :error_class => e.class, :error => e @@ -92,7 +95,7 @@ def shutdown @started_outputs.map { |o| Thread.new do begin - log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id + log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_type_from_class(o.class), plugin_id: o.plugin_id o.shutdown rescue => e log.warn "unexpected error while shutting down output plugins", :plugin => o.class, :plugin_id => o.plugin_id, :error_class => e.class, :error => e diff --git a/lib/fluent/command/debug.rb b/lib/fluent/command/debug.rb index 22a21d18a5..a748d518af 100644 --- a/lib/fluent/command/debug.rb +++ b/lib/fluent/command/debug.rb @@ -85,7 +85,6 @@ puts "Usage:" puts " Engine.match('some.tag').output : get an output plugin instance" puts " Engine.sources[i] : get input plugin instances" -puts " Plugin.load_plugin(type,name) : load plugin class (use this if you get DRb::DRbUnknown)" puts "" Encoding.default_internal = nil if Encoding.respond_to?(:default_internal) diff --git a/lib/fluent/command/fluentd.rb b/lib/fluent/command/fluentd.rb index 6ba415d5ab..1f09488636 100644 --- a/lib/fluent/command/fluentd.rb +++ b/lib/fluent/command/fluentd.rb @@ -92,6 +92,10 @@ opts[:without_source] = b } +op.on('--plugin-storage-path DIR_PATH', "directory path which is used for storages of plugin internal data") {|s| + opts[:plugin_storage_path] = s +} + op.on('--use-v1-config', "Use v1 configuration format (default)", TrueClass) {|b| opts[:use_v1_config] = b } diff --git a/lib/fluent/config.rb b/lib/fluent/config.rb index 5ff253a714..cdeed1f52c 100644 --- a/lib/fluent/config.rb +++ b/lib/fluent/config.rb @@ -38,17 +38,4 @@ def self.new(name = '') Element.new(name, '', {}, []) end end - - require 'fluent/configurable' - - module PluginId - def configure(conf) - @id = conf['@id'] || conf['id'] - super - end - - def plugin_id - @id ? 
@id : "object:#{object_id.to_s(16)}" - end - end end diff --git a/lib/fluent/config/configure_proxy.rb b/lib/fluent/config/configure_proxy.rb index 5083da0b4a..5bccfa9658 100644 --- a/lib/fluent/config/configure_proxy.rb +++ b/lib/fluent/config/configure_proxy.rb @@ -77,19 +77,7 @@ def merge(other) # self is base class, other is subclass merged.argument = other.argument || self.argument merged.params = self.params.merge(other.params) merged.defaults = self.defaults.merge(other.defaults) - merged.sections = {} - (self.sections.keys + other.sections.keys).uniq.each do |section_key| - self_section = self.sections[section_key] - other_section = other.sections[section_key] - merged_section = if self_section && other_section - self_section.merge(other_section) - elsif self_section || other_section - self_section || other_section - else - raise "BUG: both of self and other section are nil" - end - merged.sections[section_key] = merged_section - end + merged.sections = self.sections.merge(other.sections) merged end diff --git a/lib/fluent/config/section.rb b/lib/fluent/config/section.rb index 8549e22fd3..d208493bad 100644 --- a/lib/fluent/config/section.rb +++ b/lib/fluent/config/section.rb @@ -33,6 +33,10 @@ def initialize(params = {}) alias :object_id :__id__ + def to_s + inspect + end + def inspect "" end @@ -62,6 +66,19 @@ def [](key) @params[key.to_sym] end + def respond_to?(symbol, include_all=false) + case symbol + when :inspect, :nil?, :to_h, :+, :instance_of?, :kind_of?, :[], :respond_to?, :respond_to_missing? + true + when :!, :!=, :==, :equal?, :instance_eval, :instance_exec + true + when :method_missing, :singleton_method_added, :singleton_method_removed, :singleton_method_undefined + include_all + else + false + end + end + def respond_to_missing?(symbol, include_private) @params.has_key?(symbol) end @@ -70,7 +87,7 @@ def method_missing(name, *args) if @params.has_key?(name) @params[name] else - super + ::Kernel.raise ::NoMethodError, "undefined method `#{name}' for #{self.inspect}" end end end diff --git a/lib/fluent/configurable.rb b/lib/fluent/configurable.rb index a17008ed2a..280810890e 100644 --- a/lib/fluent/configurable.rb +++ b/lib/fluent/configurable.rb @@ -14,12 +14,13 @@ # limitations under the License. # -module Fluent - require 'fluent/config/configure_proxy' - require 'fluent/config/section' - require 'fluent/config/error' - require 'fluent/registry' +require 'fluent/config/configure_proxy' +require 'fluent/config/section' +require 'fluent/config/error' +require 'fluent/registry' +require 'fluent/plugin' +module Fluent module Configurable def self.included(mod) mod.extend(ClassMethods) @@ -50,9 +51,13 @@ def configure(conf) proxy = self.class.merged_configure_proxy conf.corresponding_proxies << proxy - # In the nested section, can't get plugin class through proxies so get plugin class here - plugin_class = Plugin.lookup_name_from_class(proxy.name.to_s) - root = Fluent::Config::SectionGenerator.generate(proxy, conf, logger, plugin_class) + # Configuration sections must have plugin types to show unused parameters (for what type of plugin) + # in configuration files, + # but nested sections cannot take the type of the root configuration section. + # So we need to get the plugin type here and use it in the configuration proxies.
+ # ("type" is a stuff specified by '@type' in configuration file) + plugin_type = Fluent::Plugin.lookup_type_from_class(proxy.name.to_s) + root = Fluent::Config::SectionGenerator.generate(proxy, conf, logger, plugin_type) @config_root_section = root root.instance_eval{ @params.keys }.each do |param_name| diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index 05883ffbe4..2029b76e6c 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -18,6 +18,7 @@ module Fluent require 'fluent/event_router' require 'fluent/root_agent' require 'fluent/time' + require 'fluent/system_config' class EngineClass def initialize @@ -34,26 +35,30 @@ def initialize @msgpack_factory = MessagePack::Factory.new @msgpack_factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime) + @system_config = SystemConfig.new({}) end MATCH_CACHE_SIZE = 1024 LOG_EMIT_INTERVAL = 0.1 + attr_reader :system_config attr_reader :root_agent attr_reader :matches, :sources attr_reader :msgpack_factory def init(opts = {}) BasicSocket.do_not_reverse_lookup = true - Plugin.load_plugins if defined?(Encoding) Encoding.default_internal = 'ASCII-8BIT' if Encoding.respond_to?(:default_internal) Encoding.default_external = 'ASCII-8BIT' if Encoding.respond_to?(:default_external) end + @system_config = opts[:system_config] if opts[:system_config] + suppress_interval(opts[:suppress_interval]) if opts[:suppress_interval] @suppress_config_dump = opts[:suppress_config_dump] if opts[:suppress_config_dump] @without_source = opts[:without_source] if opts[:without_source] + @plugin_storage_path = opts[:plugin_storage_path] if opts[:plugin_storage_path] @root_agent = RootAgent.new(opts) @@ -113,22 +118,20 @@ def configure(conf) end end - def load_plugin_dir(dir) - Plugin.load_plugin_dir(dir) + def add_plugin_dir(dir) + Plugin.add_plugin_dir(dir) end def emit(tag, time, record) - unless record.nil? - emit_stream tag, OneEventStream.new(time, record) - end + raise "BUG: use router.emit instead of Engine.emit" end def emit_array(tag, array) - emit_stream tag, ArrayEventStream.new(array) + raise "BUG: use router.emit_array instead of Engine.emit_array" end def emit_stream(tag, es) - @event_router.emit_stream(tag, es) + raise "BUG: use router.emit_stream instead of Engine.emit_stream" end def flush! diff --git a/lib/fluent/env.rb b/lib/fluent/env.rb index 8f20db8906..0102a393d4 100644 --- a/lib/fluent/env.rb +++ b/lib/fluent/env.rb @@ -18,9 +18,9 @@ module Fluent DEFAULT_CONFIG_PATH = ENV['FLUENT_CONF'] || '/etc/fluent/fluent.conf' DEFAULT_PLUGIN_DIR = ENV['FLUENT_PLUGIN'] || '/etc/fluent/plugin' DEFAULT_SOCKET_PATH = ENV['FLUENT_SOCKET'] || '/var/run/fluent/fluent.sock' - DEFAULT_LISTEN_PORT = 24224 - DEFAULT_FILE_PERMISSION = 0644 - DEFAULT_DIR_PERMISSION = 0755 + DEFAULT_LISTEN_PORT = 24224 # TODO: obsolete + DEFAULT_FILE_PERMISSION = 0644 # TODO: configurable w/ + DEFAULT_DIR_PERMISSION = 0755 # TODO: configurable w/ IS_WINDOWS = /mswin|mingw/ === RUBY_PLATFORM private_constant :IS_WINDOWS diff --git a/lib/fluent/filter.rb b/lib/fluent/filter.rb index b5da9c4982..6b7b046faf 100644 --- a/lib/fluent/filter.rb +++ b/lib/fluent/filter.rb @@ -16,6 +16,7 @@ module Fluent class Filter + # TODO: move to plugin/filter.rb, and make interoperability layer here include Configurable include PluginId include PluginLoggerMixin diff --git a/lib/fluent/formatter.rb b/lib/fluent/formatter.rb index 5b14892f3a..85555207ef 100644 --- a/lib/fluent/formatter.rb +++ b/lib/fluent/formatter.rb @@ -14,9 +14,13 @@ # limitations under the License. 
# -module Fluent - require 'fluent/registry' +require 'fluent/registry' +require 'fluent/configurable' +require 'fluent/plugin' + +# Fluent::Plugin::Formatter ? +module Fluent class Formatter include Configurable @@ -240,7 +244,6 @@ def format(tag, time, record) end end - TEMPLATE_REGISTRY = Registry.new(:formatter_type, 'fluent/plugin/formatter_') { 'out_file' => Proc.new { OutFileFormatter.new }, 'stdout' => Proc.new { StdoutFormatter.new }, @@ -251,23 +254,22 @@ def format(tag, time, record) 'csv' => Proc.new { CsvFormatter.new }, 'single_value' => Proc.new { SingleValueFormatter.new }, }.each { |name, factory| - TEMPLATE_REGISTRY.register(name, factory) + Fluent::Plugin.register_formatter(name, factory) } def self.register_template(name, factory_or_proc) factory = if factory_or_proc.is_a?(Class) # XXXFormatter - Proc.new { factory_or_proc.new } + factory_or_proc.new elsif factory_or_proc.arity == 3 # Proc.new { |tag, time, record| } Proc.new { ProcWrappedFormatter.new(factory_or_proc) } else # Proc.new { XXXFormatter.new } factory_or_proc end - - TEMPLATE_REGISTRY.register(name, factory) + Fluent::Plugin.register_formatter(name, factory) end def self.lookup(format) - TEMPLATE_REGISTRY.lookup(format).call + Fluent::Plugin.new_formatter(format) end # Keep backward-compatibility diff --git a/lib/fluent/input.rb b/lib/fluent/input.rb index 10a6022bae..e9b37cad7f 100644 --- a/lib/fluent/input.rb +++ b/lib/fluent/input.rb @@ -14,33 +14,10 @@ # limitations under the License. # -module Fluent - class Input - include Configurable - include PluginId - include PluginLoggerMixin - - attr_accessor :router - - def initialize - super - end - - def configure(conf) - super +require 'fluent/plugin/input' - if label_name = conf['@label'] - label = Engine.root_agent.find_label(label_name) - @router = label.event_router - elsif @router.nil? - @router = Engine.root_agent.event_router - end - end - - def start - end - - def shutdown - end +module Fluent + class Input < Plugin::Input + # TODO: add interoperability layer end end diff --git a/lib/fluent/load.rb b/lib/fluent/load.rb index 200f09c8ec..e693450878 100644 --- a/lib/fluent/load.rb +++ b/lib/fluent/load.rb @@ -1,3 +1,6 @@ +# TODO: This file MUST be deleted +# all test (and implementation) files should require its dependency by itself + require 'thread' require 'socket' require 'fcntl' @@ -30,8 +33,8 @@ require 'fluent/parser' require 'fluent/formatter' require 'fluent/event' -require 'fluent/buffer' -require 'fluent/input' -require 'fluent/output' +require 'fluent/plugin/buffer' +require 'fluent/plugin/input' +require 'fluent/plugin/output' require 'fluent/filter' require 'fluent/match' diff --git a/lib/fluent/log.rb b/lib/fluent/log.rb index f14f27c32f..85919fd09e 100644 --- a/lib/fluent/log.rb +++ b/lib/fluent/log.rb @@ -65,6 +65,9 @@ def initialize(out=STDERR, level=LEVEL_TRACE, opts={}) # TODO: This variable name is unclear so we should change to better name. @threads_exclude_events = [] + @optional_header = nil + @optional_attrs = nil + if opts.has_key?(:suppress_repeated_stacktrace) @suppress_repeated_stacktrace = opts[:suppress_repeated_stacktrace] end @@ -74,6 +77,7 @@ def initialize(out=STDERR, level=LEVEL_TRACE, opts={}) attr_accessor :level attr_accessor :tag attr_accessor :time_format + attr_accessor :optional_header, :optional_attrs def enable_debug(b=true) @debug_mode = b @@ -263,8 +267,8 @@ def dump_stacktrace(backtrace, level) def event(level, args) time = Time.now - message = '' - map = {} + message = @optional_header ? 
@optional_header.dup : '' + map = @optional_attrs ? @optional_attrs.dup : {} args.each {|a| if a.is_a?(Hash) a.each_pair {|k,v| @@ -364,6 +368,13 @@ def configure(conf) @log = PluginLogger.new($log) end @log.level = @log_level + @log.optional_header = "[#{self.class.name}#{@_id_configured ? "(" + @id + ")" : ""}] " + @log.optional_attrs = { + 'plugin_type' => self.class.name, + } + if @_id_configured + @log.optional_attrs.update({'plugin_id' => @id}) + end end end end diff --git a/lib/fluent/output.rb b/lib/fluent/output.rb index c5873f1ca9..3ea9ba7428 100644 --- a/lib/fluent/output.rb +++ b/lib/fluent/output.rb @@ -14,598 +14,118 @@ # limitations under the License. # -module Fluent - class OutputChain - def initialize(array, tag, es, chain=NullOutputChain.instance) - @array = array - @tag = tag - @es = es - @offset = 0 - @chain = chain - end - - def next - if @array.length <= @offset - return @chain.next - end - @offset += 1 - result = @array[@offset-1].emit(@tag, @es, self) - result - end - end - - class CopyOutputChain < OutputChain - def next - if @array.length <= @offset - return @chain.next - end - @offset += 1 - es = @array.length > @offset ? @es.dup : @es - result = @array[@offset-1].emit(@tag, es, self) - result - end - end +require 'fluent/plugin/output' +require 'fluent/plugin/buffered_output' +require 'fluent/plugin/object_buffered_output' +require 'fluent/plugin/time_sliced_output' - class NullOutputChain - require 'singleton' - include Singleton - - def next - end - end +# This classes are for compatibility. +# Fluent::Input (or other plugin base classes) are obsolete in v0.14. +require 'fluent/plugin_support/emitter' - class Output - include Configurable - include PluginId - include PluginLoggerMixin - - attr_accessor :router - +module Fluent + class EngineCompat def initialize - super + @router = nil end - def configure(conf) - super - - if label_name = conf['@label'] - label = Engine.root_agent.find_label(label_name) - @router = label.event_router - elsif @router.nil? 
- @router = Engine.root_agent.event_router - end + def reconfigure + @router = Fluent::Engine.root_agent.event_router end - def start + def emit(tag, time, record) + @router.emit(tag, time, record) end - def shutdown + def emit_array(tag, array) + @router.emit_array(tag, array) end - #def emit(tag, es, chain) - #end - - def secondary_init(primary) - if primary.class != self.class - $log.warn "type of secondary output should be same as primary output", :primary=>primary.class.to_s, :secondary=>self.class.to_s - end + def emit_stream(tag, es) + @router.emit_stream(tag, es) end - - def inspect; "#<%s:%014x>" % [self.class.name, '0x%014x' % (__id__<<1)] end end + module OutputPluginCompat + # TODO: add interoperability layer (especially for chain) - class OutputThread - def initialize(output) - @output = output - @finish = false - @next_time = Time.now.to_f + 1.0 - end - - def configure(conf) - end + # All traditional output plugins can emit events + include Fluent::PluginSupport::Emitter - def start - @mutex = Mutex.new - @cond = ConditionVariable.new - @thread = Thread.new(&method(:run)) - end - - def shutdown - @finish = true - @mutex.synchronize { - @cond.signal - } - Thread.pass - @thread.join - end + # to overwrite Fluent::Engine in traditional plugin code + module Fluent; end - def submit_flush - @mutex.synchronize { - @next_time = 0 - @cond.signal - } - Thread.pass - end - - private - def run - @mutex.lock - begin - until @finish - time = Time.now.to_f - - if @next_time <= time - @mutex.unlock - begin - @next_time = @output.try_flush - ensure - @mutex.lock - end - next_wait = @next_time - Time.now.to_f - else - next_wait = @next_time - time - end - - cond_wait(next_wait) if next_wait > 0 - end - ensure - @mutex.unlock - end - rescue - $log.error "error on output thread", :error=>$!.to_s - $log.error_backtrace - raise - ensure - @mutex.synchronize { - @output.before_shutdown - } - end - - def cond_wait(sec) - @cond.wait(@mutex, sec) - end - end - - - class BufferedOutput < Output - def initialize - super - @next_flush_time = 0 - @last_retry_time = 0 - @next_retry_time = 0 - @num_errors = 0 - @num_errors_lock = Mutex.new - @secondary_limit = 8 - @emit_count = 0 - end - - desc 'The buffer type (memory, file)' - config_param :buffer_type, :string, :default => 'memory' - desc 'The interval between data flushes.' - config_param :flush_interval, :time, :default => 60 - config_param :try_flush_interval, :float, :default => 1 - desc 'If true, the value of `retry_limit` is ignored and there is no limit' - config_param :disable_retry_limit, :bool, :default => false - desc 'The limit on the number of retries before buffered data is discarded' - config_param :retry_limit, :integer, :default => 17 - desc 'The initial intervals between write retries.' - config_param :retry_wait, :time, :default => 1.0 - desc 'The maximum intervals between write retries.' - config_param :max_retry_wait, :time, :default => nil - desc 'The number of threads to flush the buffer.' - config_param :num_threads, :integer, :default => 1 - desc 'The interval between data flushes for queued chunk.'
- config_param :queued_chunk_flush_interval, :time, :default => 1 + Fluent::Engine = Engine = EngineCompat.new # Engine.root_agent is not initialized yet def configure(conf) super - - @retry_wait = @retry_wait.to_f # converted to Float for calc_retry_wait - @buffer = Plugin.new_buffer(@buffer_type) - @buffer.configure(conf) - - if @buffer.respond_to?(:enable_parallel) - if @num_threads == 1 - @buffer.enable_parallel(false) - else - @buffer.enable_parallel(true) - end - end - - @writers = (1..@num_threads).map { - writer = OutputThread.new(self) - writer.configure(conf) - writer - } - - if sconf = conf.elements.select {|e| e.name == 'secondary' }.first - type = sconf['@type'] || conf['@type'] || sconf['type'] || conf['type'] - @secondary = Plugin.new_output(type) - @secondary.router = router - @secondary.configure(sconf) - - if secondary_limit = conf['secondary_limit'] - @secondary_limit = secondary_limit.to_i - if @secondary_limit < 0 - raise ConfigError, "invalid parameter 'secondary_limit #{secondary_limit}'" - end - end - - @secondary.secondary_init(self) - end - end - - def start - @next_flush_time = Time.now.to_f + @flush_interval - @buffer.start - @secondary.start if @secondary - @writers.each {|writer| writer.start } - @writer_current_position = 0 - @writers_size = @writers.size - end - - def shutdown - @writers.each {|writer| writer.shutdown } - @secondary.shutdown if @secondary - @buffer.shutdown + # set root_agent.event_router here + Engine.reconfigure end + end - def emit(tag, es, chain, key="") - @emit_count += 1 - data = format_stream(tag, es) - if @buffer.emit(key, data, chain) - submit_flush - end - end + class Output < Plugin::Output + include OutputPluginCompat + end - def submit_flush - # Without locks: it is rough but enough to select "next" writer selection - @writer_current_position = (@writer_current_position + 1) % @writers_size - @writers[@writer_current_position].submit_flush - end + class BufferedOutput < Plugin::BufferedOutput + include OutputPluginCompat + end - def format_stream(tag, es) - out = '' - es.each {|time,record| - out << format(tag, time, record) - } - out - end + class ObjectBufferedOutput < Plugin::ObjectBufferedOutput + include OutputPluginCompat + end - #def format(tag, time, record) - #end + class TimeSlicedOutput < Plugin::TimeSlicedOutput + include OutputPluginCompat + end - #def write(chunk) + class MultiOutput < Output + #def outputs + # # TODO #end - - def enqueue_buffer(force = false) - @buffer.keys.each {|key| - @buffer.push(key) - } - end - - def try_flush - time = Time.now.to_f - - empty = @buffer.queue_size == 0 - if empty && @next_flush_time < (now = Time.now.to_f) - @buffer.synchronize do - if @next_flush_time < now - enqueue_buffer - @next_flush_time = now + @flush_interval - empty = @buffer.queue_size == 0 - end - end - end - if empty - return time + @try_flush_interval - end - - begin - retrying = !@num_errors.zero? - - if retrying - @num_errors_lock.synchronize do - if retrying = !@num_errors.zero? 
# re-check in synchronize - if @next_retry_time >= time - # allow retrying for only one thread - return time + @try_flush_interval - end - # assume next retry failes and - # clear them if when it succeeds - @last_retry_time = time - @num_errors += 1 - @next_retry_time += calc_retry_wait - end - end - end - - if @secondary && !@disable_retry_limit && @num_errors > @retry_limit - has_next = flush_secondary(@secondary) - else - has_next = @buffer.pop(self) - end - - # success - if retrying - @num_errors = 0 - # Note: don't notify to other threads to prevent - # burst to recovered server - $log.warn "retry succeeded.", :plugin_id=>plugin_id - end - - if has_next - return Time.now.to_f + @queued_chunk_flush_interval - else - return time + @try_flush_interval - end - - rescue => e - if retrying - error_count = @num_errors - else - # first error - error_count = 0 - @num_errors_lock.synchronize do - if @num_errors.zero? - @last_retry_time = time - @num_errors += 1 - @next_retry_time = time + calc_retry_wait - end - end - end - - if @disable_retry_limit || error_count < @retry_limit - $log.warn "temporarily failed to flush the buffer.", :next_retry=>Time.at(@next_retry_time), :error_class=>e.class.to_s, :error=>e.to_s, :plugin_id=>plugin_id - $log.warn_backtrace e.backtrace - - elsif @secondary - if error_count == @retry_limit - $log.warn "failed to flush the buffer.", :error_class=>e.class.to_s, :error=>e.to_s, :plugin_id=>plugin_id - $log.warn "retry count exceededs limit. falling back to secondary output." - $log.warn_backtrace e.backtrace - retry # retry immediately - elsif error_count <= @retry_limit + @secondary_limit - $log.warn "failed to flush the buffer, next retry will be with secondary output.", :next_retry=>Time.at(@next_retry_time), :error_class=>e.class.to_s, :error=>e.to_s, :plugin_id=>plugin_id - $log.warn_backtrace e.backtrace - else - $log.warn "failed to flush the buffer.", :error_class=>e.class, :error=>e.to_s, :plugin_id=>plugin_id - $log.warn "secondary retry count exceededs limit." - $log.warn_backtrace e.backtrace - write_abort - @num_errors = 0 - end - - else - $log.warn "failed to flush the buffer.", :error_class=>e.class.to_s, :error=>e.to_s, :plugin_id=>plugin_id - $log.warn "retry count exceededs limit." - $log.warn_backtrace e.backtrace - write_abort - @num_errors = 0 - end - - return @next_retry_time - end - end - - def force_flush - @num_errors_lock.synchronize do - @next_retry_time = Time.now.to_f - 1 - end - enqueue_buffer(true) - submit_flush - end - - def before_shutdown - begin - @buffer.before_shutdown(self) - rescue - $log.warn "before_shutdown failed", :error=>$!.to_s - $log.warn_backtrace - end - end - - def calc_retry_wait - # TODO retry pattern - wait = if @disable_retry_limit || @num_errors <= @retry_limit - @retry_wait * (2 ** (@num_errors - 1)) - else - # secondary retry - @retry_wait * (2 ** (@num_errors - 2 - @retry_limit)) - end - retry_wait = wait.finite? ? wait + (rand * (wait / 4.0) - (wait / 8.0)) : wait - @max_retry_wait ? [retry_wait, @max_retry_wait].min : retry_wait - end - - def write_abort - $log.error "throwing away old logs." - begin - @buffer.clear! 
- rescue - $log.error "unexpected error while aborting", :error=>$!.to_s - $log.error_backtrace - end - end - - def flush_secondary(secondary) - @buffer.pop(secondary) - end end - - class ObjectBufferedOutput < BufferedOutput - config_param :time_as_integer, :bool, :default => true - - def initialize - super - end - - def emit(tag, es, chain) - @emit_count += 1 - if @time_as_integer - data = es.to_msgpack_stream_forced_integer - else - data = es.to_msgpack_stream - end - key = tag - if @buffer.emit(key, data, chain) - submit_flush - end + # Output Chain does nothing currently. + # These will be removed at v1. + class OutputChain + def initialize(array, tag, es, chain=NullOutputChain.instance) + @array = array + @tag = tag + @es = es + @offset = 0 + @chain = chain end - module BufferedEventStreamMixin - include Enumerable - - def repeatable? - true - end - - def each(&block) - msgpack_each(&block) - end - - def to_msgpack_stream - read + def next + if @array.length <= @offset + return @chain.next end - end - - def write(chunk) - chunk.extend(BufferedEventStreamMixin) - write_objects(chunk.key, chunk) + @offset += 1 + result = @array[@offset-1].emit(@tag, @es, self) + result end end - - class TimeSlicedOutput < BufferedOutput - require 'fluent/timezone' - - def initialize - super - @localtime = true - #@ignore_old = false # TODO - end - - desc 'The time format used as part of the file name.' - config_param :time_slice_format, :string, :default => '%Y%m%d' - desc 'The amount of time Fluentd will wait for old logs to arrive.' - config_param :time_slice_wait, :time, :default => 10*60 - desc 'Parse the time value in the specified timezone' - config_param :timezone, :string, :default => nil - config_set_default :buffer_type, 'file' # overwrite default buffer_type - config_set_default :buffer_chunk_limit, 256*1024*1024 # overwrite default buffer_chunk_limit - config_set_default :flush_interval, nil - - attr_accessor :localtime - attr_reader :time_slicer # for test - - def configure(conf) - super - - if conf['utc'] - @localtime = false - elsif conf['localtime'] - @localtime = true - end - - if conf['timezone'] - @timezone = conf['timezone'] - Fluent::Timezone.validate!(@timezone) - end - - if @timezone - @time_slicer = Timezone.formatter(@timezone, @time_slice_format) - elsif @localtime - @time_slicer = Proc.new {|time| - Time.at(time).strftime(@time_slice_format) - } - else - @time_slicer = Proc.new {|time| - Time.at(time).utc.strftime(@time_slice_format) - } - end - - @time_slice_cache_interval = time_slice_cache_interval - @before_tc = nil - @before_key = nil - - if @flush_interval - if conf['time_slice_wait'] - $log.warn "time_slice_wait is ignored if flush_interval is specified: #{conf}" - end - @enqueue_buffer_proc = Proc.new do - @buffer.keys.each {|key| - @buffer.push(key) - } - end - - else - @flush_interval = [60, @time_slice_cache_interval].min - @enqueue_buffer_proc = Proc.new do - nowslice = @time_slicer.call(Time.now - @time_slice_wait) - @buffer.keys.each {|key| - if key < nowslice - @buffer.push(key) - end - } - end - end - end - - def emit(tag, es, chain) - @emit_count += 1 - formatted_data = {} - es.each {|time,record| - tc = time / @time_slice_cache_interval - if @before_tc == tc - key = @before_key - else - @before_tc = tc - key = @time_slicer.call(time) - @before_key = key - end - formatted_data[key] ||= '' - formatted_data[key] << format(tag, time, record) - } - formatted_data.each { |key, data| - if @buffer.emit(key, data, chain) - submit_flush - end - } - end - - def 
enqueue_buffer(force = false) - if force - @buffer.keys.each {|key| - @buffer.push(key) - } - else - @enqueue_buffer_proc.call - end - end - - #def format(tag, event) - #end - - private - def time_slice_cache_interval - if @time_slicer.call(0) != @time_slicer.call(60-1) - return 1 - elsif @time_slicer.call(0) != @time_slicer.call(60*60-1) - return 30 - elsif @time_slicer.call(0) != @time_slicer.call(24*60*60-1) - return 60*30 - else - return 24*60*30 + class CopyOutputChain < OutputChain + def next + if @array.length <= @offset + return @chain.next end + @offset += 1 + es = @array.length > @offset ? @es.dup : @es + result = @array[@offset-1].emit(@tag, es, self) + result end end + class NullOutputChain + require 'singleton' + include Singleton - class MultiOutput < Output - #def outputs - #end + def next + end end end diff --git a/lib/fluent/parser.rb b/lib/fluent/parser.rb index b30b47a344..d1f23dc79e 100644 --- a/lib/fluent/parser.rb +++ b/lib/fluent/parser.rb @@ -14,9 +14,13 @@ # limitations under the License. # -module Fluent - require 'fluent/registry' +require 'fluent/registry' +require 'fluent/configurable' +require 'fluent/plugin' + +# TODO Fluent::Plugin::Parser ? +module Fluent class ParserError < StandardError end @@ -622,7 +626,6 @@ def check_format_regexp(format, key) end end - TEMPLATE_REGISTRY = Registry.new(:config_type, 'fluent/plugin/parser_') { 'apache' => Proc.new { RegexpParser.new(/^(?[^ ]*) [^ ]* (?[^ ]*) \[(? EOS s = FakeSupervisor.new - sc = Fluent::Supervisor::SystemConfig.new(conf) + sc = Fluent::SystemConfig.new(conf) sc.apply(s) assert_equal(Fluent::Log::LEVEL_WARN, s.instance_variable_get("@log").level) end diff --git a/test/helper.rb b/test/helper.rb index 86acd6953b..02f9a3c4e1 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -42,16 +42,27 @@ def to_masked_element require 'fluent/log' require 'fluent/test' +$log = Fluent::Log.new(Fluent::Test::DummyLogDevice.new, Fluent::Log::LEVEL_WARN) + unless defined?(Test::Unit::AssertionFailedError) class Test::Unit::AssertionFailedError < StandardError end end -def unused_port - s = TCPServer.open(0) - port = s.addr[1] - s.close - port +def unused_port(num=1) + ports = [] + sockets = [] + num.times do + s = TCPServer.open(0) + sockets << s + ports << s.addr[1] + end + sockets.each{|s| s.close } + if num == 1 + ports.first + else + ports + end end def ipv6_enabled? @@ -64,5 +75,3 @@ def ipv6_enabled? 
false end end - -$log = Fluent::Log.new(Fluent::Test::DummyLogDevice.new, Fluent::Log::LEVEL_WARN) diff --git a/test/plugin/test_in_debug_agent.rb b/test/plugin/test_in_debug_agent.rb index 4ca94eb682..3de87e0461 100644 --- a/test/plugin/test_in_debug_agent.rb +++ b/test/plugin/test_in_debug_agent.rb @@ -1,5 +1,6 @@ require_relative '../helper' require 'fileutils' +require 'fluent/plugin/in_debug_agent' class DebugAgentInputTest < Test::Unit::TestCase def setup diff --git a/test/plugin/test_in_dummy.rb b/test/plugin/test_in_dummy.rb index d2bf159bc0..31ee031a25 100644 --- a/test/plugin/test_in_dummy.rb +++ b/test/plugin/test_in_dummy.rb @@ -1,13 +1,15 @@ require_relative '../helper' require 'fluent/test' +require 'fluent/plugin/in_dummy' +require 'fileutils' class DummyTest < Test::Unit::TestCase def setup Fluent::Test.setup end - def create_driver(conf) - Fluent::Test::InputTestDriver.new(Fluent::DummyInput).configure(conf) + def create_driver(conf, system_opts={}) + Fluent::Test::Driver::Input.new(Fluent::Plugin::DummyInput, system_opts).configure(conf) end sub_test_case 'configure' do @@ -70,9 +72,9 @@ def create_driver(conf) test 'simple' do d = create_driver(config) - d.run { - # d.run sleeps 0.5 sec - } + d.expected_emits_length = 5 + d.run + emits = d.emits emits.each do |tag, time, record| assert_equal("dummy", tag) @@ -83,9 +85,9 @@ def create_driver(conf) test 'with auto_increment_key' do d = create_driver(config + %[auto_increment_key id]) - d.run { - # d.run sleeps 0.5 sec - } + d.expected_emits_length = 5 + d.run + emits = d.emits emits.each_with_index do |(tag, time, record), i| assert_equal("dummy", tag) @@ -93,4 +95,81 @@ def create_driver(conf) end end end + + TEST_PLUGIN_STORAGE_PATH = File.join( File.dirname(File.dirname(__FILE__)), 'tmp', 'in_dummy', 'store' ) + FileUtils.mkdir_p TEST_PLUGIN_STORAGE_PATH + + sub_test_case "doesn't suspend internal counters in default" do + config1 = %[ + @id test-01 + tag dummy + rate 10 + dummy [{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}] + auto_increment_key id + ] + test "value of auto increment key is not suspended after stop-and-start" do + assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json')) + + d1 = create_driver(config1, plugin_storage_path: TEST_PLUGIN_STORAGE_PATH) + d1.expected_emits_length = 4 + d1.run + + first_id1 = d1.emits.first[2]['id'] + assert_equal 0, first_id1 + + last_id1 = d1.emits.last[2]['id'] + assert { last_id1 > 0 } + + assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json')) + + d2 = create_driver(config1, plugin_storage_path: TEST_PLUGIN_STORAGE_PATH) + d2.expected_emits_length = 4 + d2.run + + first_id2 = d2.emits.first[2]['id'] + assert_equal 0, first_id2 + + assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-01.json')) + end + end + + sub_test_case "suspend internal counters if suspend is true" do + setup do + FileUtils.rm_rf(TEST_PLUGIN_STORAGE_PATH) + end + + config2 = %[ + @id test-02 + tag dummy + rate 2 + dummy [{"x": 1, "y": "1"}, {"x": 2, "y": "2"}, {"x": 3, "y": "3"}] + auto_increment_key id + suspend true + ] + test "value of auto increment key is suspended after stop-and-start" do + assert !File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-02.json')) + + d1 = create_driver(config2, plugin_storage_path: TEST_PLUGIN_STORAGE_PATH) + + d1.expected_emits_length = 4 + d1.run + + first_id1 = d1.emits.first[2]['id'] + assert_equal 0, first_id1 + + last_id1 = d1.emits.last[2]['id'] + assert { last_id1 > 0 
} + + assert File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-02.json')) + + d2 = create_driver(config2, plugin_storage_path: TEST_PLUGIN_STORAGE_PATH) + d2.expected_emits_length = 4 + d2.run + + first_id2 = d2.emits.first[2]['id'] + assert_equal last_id1 + 1, first_id2 + + assert File.exist?(File.join(TEST_PLUGIN_STORAGE_PATH, 'json', 'test-02.json')) + end + end end diff --git a/test/plugin/test_in_exec.rb b/test/plugin/test_in_exec.rb index 34c7abcea1..96e672c03b 100644 --- a/test/plugin/test_in_exec.rb +++ b/test/plugin/test_in_exec.rb @@ -1,59 +1,53 @@ require_relative '../helper' require 'fluent/test' require 'net/http' +require 'fluent/plugin/in_exec' class ExecInputTest < Test::Unit::TestCase def setup Fluent::Test.setup - @test_time = Fluent::EventTime.parse("2011-01-02 13:14:15") - @script = File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts', 'exec_script.rb')) end - def create_driver(conf = tsv_config) - Fluent::Test::InputTestDriver.new(Fluent::ExecInput).configure(conf) + def create_driver(conf=TSV_CONFIG) + Fluent::Test::Driver::Input.new(Fluent::Plugin::ExecInput).configure(conf) end - def tsv_config - %[ - command ruby #{@script} "2011-01-02 13:14:15" 0 + TEST_TIME = Time.parse("2011-01-02 13:14:15").to_i + SCRIPT = File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts', 'exec_script.rb')) + + TSV_CONFIG = %[ + command ruby #{SCRIPT} #{TEST_TIME} 0 keys time,tag,k1 time_key time tag_key tag time_format %Y-%m-%d %H:%M:%S run_interval 1s - ] - end + ] - def json_config - %[ - command ruby #{@script} #{@test_time} 1 + JSON_CONFIG = %[ + command ruby #{SCRIPT} #{TEST_TIME} 1 format json tag_key tag time_key time run_interval 1s - ] - end + ] - def msgpack_config - %[ - command ruby #{@script} #{@test_time} 2 + MSGPACK_CONFIG = %[ + command ruby #{SCRIPT} #{TEST_TIME} 2 format msgpack tag_key tagger time_key datetime run_interval 1s - ] - end + ] - def regexp_config - %[ - command ruby #{@script} "2011-01-02 13:14:15" 3 + REGEXP_CONFIG = %[ + command ruby #{SCRIPT} #{TEST_TIME} 3 format /(?