From 51c13772e864d3b920608537bb1a14de9b7efebf Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 29 May 2015 20:07:37 +0900 Subject: [PATCH] implement buffered output plugin base classes with v0.14 API --- lib/fluent/plugin/buffer.rb | 1 + lib/fluent/plugin/buffered_output.rb | 71 ++++----- lib/fluent/plugin/object_buffered_output.rb | 47 +++--- lib/fluent/plugin/output.rb | 12 +- lib/fluent/plugin/time_sliced_output.rb | 155 +++++++++----------- 5 files changed, 144 insertions(+), 142 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 7865be255b..f33c753bde 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -128,6 +128,7 @@ def metadata_list end def metadata(timekey: nil, tag: nil, key_value_pairs={}) + # TODO cache metadata not to new it meta = if key_value_pairs.empty? Metadata.new(timekey, tag, []) else diff --git a/lib/fluent/plugin/buffered_output.rb b/lib/fluent/plugin/buffered_output.rb index 76222e8739..305af48eb0 100644 --- a/lib/fluent/plugin/buffered_output.rb +++ b/lib/fluent/plugin/buffered_output.rb @@ -19,18 +19,15 @@ module Fluent module Plugin class BufferedOutput < Output - def initialize - super - end + config_param :tag_chunked, :bool, default: false config_section :buffer, param_name: :buffer_config, required: false, multi: false, final: true do - desc 'The buffer type (memory, file)' - config_argument :type, :string, default: "memory" + config_set_default :type, "memory" # Buffer and its chunk configurations are defined in each buffer plugins # This plugin defines options for how to flush/retry - config_param :flush_at_shutdown, :bool, default: false + config_param :flush_at_shutdown, :bool # default should be set by Buffer plugins desc 'The interval between data flushes.' 
config_param :flush_interval, :time, default: 60 @@ -75,6 +72,17 @@ def initialize end end + def format(tag, time, record) + raise NotImplementedError, "BUG: buffered output plugins MUST implement this method" + end + + ### Implement try_write if plugin does commit_flush under its own responsibility + # def try_write(chunk) + + def write(chunk) + raise NotImplementedError, "BUG: buffered output plugins MUST implement this method" + end + def configure(conf) super @@ -127,6 +135,9 @@ def start @num_errors = 0 @emit_count = 0 + @cached_metadata = [] + @cached_metadata_mutex = Mutex.new + @retry = nil # state machine @retry_mutex = Mutex.new @@ -181,30 +192,30 @@ def terminate @buffer.terminate end - # metadata() should be used only by myself - def metadata(*args) - @cached_metadata ||= @buffer.metadata() - end - - # this should be overridden by child buffered_output class - def cached_metadata - [ metadata() ] + def metadata(tag) + @tag_chunked ? @buffer.metadata(tag: tag) : @buffer.metadata() end def emit(tag, es) - meta = metadata() - @emit_count += 1 - data = format_stream(tag, es) - @buffer.emit(meta, data) - + metalist = handle_stream(tag, es) if @flush_immediately - @buffer.enqueue_chunk(meta) + metalist.each do |meta| + @buffer.enqueue_chunk(meta) + end end - if !@retry && @buffer.enqueued?(meta) + if !@retry && metalist.reduce(false){|r,m| r || @buffer.enqueued?(m) } submit_flush end end + def handle_stream(tag, es) + meta = metadata(tag) + @emit_count += 1 + data = format_stream(tag, es) + @buffer.emit(meta, data) + [meta] + end + def format_stream(tag, es) out = '' es.each do |time, record| @@ -213,17 +224,6 @@ def format_stream(tag, es) out end - def format(tag, time, record) - raise NotImplementedError, "BUG: buffered output plugins 
MUST implement this method" - end - def submit_flush # Without locks: it is rough but enough to select "next" writer selection @writer_current_position = (@writer_current_position + 1) % @writers_size @@ -260,12 +260,13 @@ def try_rollback_write end end - def enqueue_buffer(force: false) + def enqueue_buffer(force: false, test: ->(chunk){ chunk.created_at + @flush_interval > Time.now }) if force @buffer.enqueue_all else - cached_metadata.each do |meta| - if @buffer.staged_chunk_test(meta){ |chunk| chunk.modified_at + @flush_interval > Fluent::Engine.now } + metalist = @buffer.metadata_list + metalist.each do |meta| + if @buffer.staged_chunk_test(meta, &test) @buffer.enqueue_chunk(meta) end end diff --git a/lib/fluent/plugin/object_buffered_output.rb b/lib/fluent/plugin/object_buffered_output.rb index 67598752f9..d24512a331 100644 --- a/lib/fluent/plugin/object_buffered_output.rb +++ b/lib/fluent/plugin/object_buffered_output.rb @@ -19,26 +19,44 @@ module Fluent module Plugin class ObjectBufferedOutput < BufferedOutput + # TODO check this works well w/ new API, or not desc "Format times to integer, instead of event-time object" config_param :time_as_integer, :bool, :default => true - def initialize + config_param :tag_chunked, :bool, default: true # actually, always true (this configuration value is ignored) + + def configure(conf) super - end - def emit(tag, es, chain) - @emit_count += 1 - data = if @time_as_integer - es.to_msgpack_stream_forced_integer - else - es.to_msgpack_stream - end - key = tag - if @buffer.emit(key, data, chain) - submit_flush + if @time_as_integer + m = method(:format_stream_forced_integer) + (class << self; self; end).module_eval do + define_method(:format_stream, m) + end end end + def write_objects(chunk) + raise NotImplementedError + end + + def metadata(tag) + @buffer.metadata(tag: tag) + end + + def format_stream_forced_integer(tag, es) + es.to_msgpack_stream_forced_integer + end + + def format_stream(tag, es) + es.to_msgpack_stream + 
end + + def write(chunk) + chunk.extend(BufferedEventStreamMixin) + write_objects(chunk) + end + module BufferedEventStreamMixin include Enumerable @@ -54,11 +72,6 @@ def to_msgpack_stream read end end - - def write(chunk) - chunk.extend(BufferedEventStreamMixin) - write_objects(chunk.key, chunk) - end end end end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index cf108f5dd3..bb3bbdfe59 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -19,6 +19,12 @@ module Fluent module Plugin class Output < Base + # config_param ... + + def emit(tag, es) + raise NotImplementedError, "BUG: output plugins MUST implement this method" + end + def initialize super end @@ -51,11 +57,9 @@ def terminate super end - def emit(tag, es) - raise NotImplementedError, "BUG: output plugins MUST implement this method" + def inspect + "#<%s:%014x>" % [self.class.name, '0x%014x' % (__id__ << 1)] end - - def inspect; "#<%s:%014x>" % [self.class.name, '0x%014x' % (__id__ << 1)] end end end end diff --git a/lib/fluent/plugin/time_sliced_output.rb b/lib/fluent/plugin/time_sliced_output.rb index a5d2295d04..5d98636db4 100644 --- a/lib/fluent/plugin/time_sliced_output.rb +++ b/lib/fluent/plugin/time_sliced_output.rb @@ -20,125 +20,108 @@ module Fluent module Plugin class TimeSlicedOutput < BufferedOutput - - def initialize - super - @localtime = true - #@ignore_old = false # TODO - end + DEFAULT_CHUNK_BYTES_LIMIT = 256 * 1024 * 1024 # 256MB for file desc 'The time format used as part of the file name.' - config_param :time_slice_format, :string, :default => '%Y%m%d' + config_param :time_slice_format, :string, default: '%Y%m%d' desc 'The amount of time Fluentd will wait for old logs to arrive.' 
- config_param :time_slice_wait, :time, :default => 10*60 + config_param :time_slice_wait, :time, default: 10*60 + desc 'Parse the time value in the specified timezone' - config_param :timezone, :string, :default => nil - config_set_default :buffer_type, 'file' # overwrite default buffer_type - config_set_default :buffer_chunk_limit, 256*1024*1024 # overwrite default buffer_chunk_limit - config_set_default :flush_interval, nil + config_param :timezone, :string, default: nil # most authoritative if specified + config_param :localtime, :bool, default: true # localtime and utc are exclusive + config_param :utc, :bool, default: false + + config_section :buffer, param_name: :buffer_config do + config_set_default :type, 'file' # overwrite default buffer_type + config_set_default :chunk_bytes_limit, DEFAULT_CHUNK_BYTES_LIMIT + config_set_default :flush_interval, nil + end - attr_accessor :localtime attr_reader :time_slicer # for test def configure(conf) super - if conf['utc'] - @localtime = false - elsif conf['localtime'] - @localtime = true - end - - if conf['timezone'] - @timezone = conf['timezone'] - Fluent::Timezone.validate!(@timezone) - end - if @timezone + Fluent::Timezone.validate!(@timezone) @time_slicer = Timezone.formatter(@timezone, @time_slice_format) - elsif @localtime - @time_slicer = Proc.new {|time| - Time.at(time).strftime(@time_slice_format) - } else - @time_slicer = Proc.new {|time| - Time.at(time).utc.strftime(@time_slice_format) - } + if @utc + @localtime = false # if utc is set true explicitly, @localtime should be false + elsif !@localtime # if localtime is set false explicitly, @utc should be true + @utc = true + end + + if @localtime + @time_slicer = Proc.new {|time| + Time.at(time).strftime(@time_slice_format) + } + else # UTC + @time_slicer = Proc.new {|time| + Time.at(time).utc.strftime(@time_slice_format) + } + end end @time_slice_cache_interval = time_slice_cache_interval @before_tc = nil @before_key = nil + @flush_interval = 
@buffer_config.flush_interval + + if @flush_interval - if conf['time_slice_wait'] - $log.warn "time_slice_wait is ignored if flush_interval is specified: #{conf}" + if conf['time_slice_wait'] + log.warn "time_slice_wait is ignored if flush_interval is specified" end - @enqueue_buffer_proc = Proc.new do - @buffer.keys.each {|key| - @buffer.push(key) - } - end - + @chunk_enqueue_rule = ->(chunk){ chunk.created_at + @flush_interval > Time.now } else @flush_interval = [60, @time_slice_cache_interval].min - @enqueue_buffer_proc = Proc.new do - nowslice = @time_slicer.call(Engine.now.to_i - @time_slice_wait) - @buffer.keys.each {|key| - if key < nowslice - @buffer.push(key) - end - } - end + @chunk_enqueue_rule = ->(chunk){ + current_slice = @time_slicer.call(Time.now.to_i - @time_slice_wait) + chunk.metadata.timekey < current_slice + } end end - def emit(tag, es, chain) - @emit_count += 1 - formatted_data = {} - es.each {|time,record| - tc = time / @time_slice_cache_interval - if @before_tc == tc - key = @before_key - else - @before_tc = tc - key = @time_slicer.call(time) - @before_key = key - end - formatted_data[key] ||= '' - formatted_data[key] << format(tag, time, record) - } - formatted_data.each { |key, data| - if @buffer.emit(key, data, chain) - submit_flush - end - } + def metadata(timekey, tag) + @tag_chunked ? 
@buffer.metadata(timekey: timekey, tag: tag) : @buffer.metadata(timekey: timekey) end - def enqueue_buffer(force = false) - if force - @buffer.keys.each {|key| - @buffer.push(key) - } - else - @enqueue_buffer_proc.call + def handle_stream(tag, es) + @emit_count += 1 + emitted_meta = {} + + es.each do |time, record| + ts = time / @time_slice_cache_interval + timekey = if @before_tc == ts # same time_slice with event just before + @before_key + else # new time_slice, so update cache by calling @time_slicer (heavy) + @before_tc = ts + @before_key = @time_slicer.call(time) + end + data = format(tag, time, record) + meta = metadata(timekey, tag) + @buffer.emit(meta, data) + emitted_meta[meta] = true end + + emitted_meta.keys end - #def format(tag, event) - # # TODO - #end + def enqueue_buffer(force: false, test: nil) + super(force: force, test: @chunk_enqueue_rule) + end - private def time_slice_cache_interval - if @time_slicer.call(0) != @time_slicer.call(60-1) - return 1 - elsif @time_slicer.call(0) != @time_slicer.call(60*60-1) - return 30 - elsif @time_slicer.call(0) != @time_slicer.call(24*60*60-1) - return 60*30 - else - return 24*60*30 + if @time_slicer.call(0) != @time_slicer.call(60-1) # time slice length is seconds (1-59) + 1 + elsif @time_slicer.call(0) != @time_slicer.call(60*60-1) # time slice length is minutes + 30 + elsif @time_slicer.call(0) != @time_slicer.call(24*60*60-1) # time slice length is hours + 60*30 + else # longer than day + 24*60*30 end end end