Skip to content

Commit

Permalink
implement buffered output plugin base classes with v0.14 API
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed Feb 2, 2016
1 parent e5e1bf4 commit 51c1377
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 142 deletions.
1 change: 1 addition & 0 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def metadata_list
end

def metadata(timekey: nil, tag: nil, key_value_pairs={})
# TODO cache metadata not to new it
meta = if key_value_pairs.empty?
Metadata.new(timekey, tag, [])
else
Expand Down
71 changes: 36 additions & 35 deletions lib/fluent/plugin/buffered_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@
module Fluent
module Plugin
class BufferedOutput < Output
def initialize
super
end
config_param :tag_chunked, :bool, default: false

config_section :buffer, param_name: :buffer_config, required: false, multi: false, final: true do
desc 'The buffer type (memory, file)'
config_argument :type, :string, default: "memory"
config_set_default :type, "memory"

# Buffer and its chunk configurations are defined in each buffer plugins
# This plugin defines options for how to flush/retry

config_param :flush_at_shutdown, :bool, default: false
config_param :flush_at_shutdown, :bool # default should be set by Buffer plugins

desc 'The interval between data flushes.'
config_param :flush_interval, :time, default: 60
Expand Down Expand Up @@ -75,6 +72,17 @@ def initialize
end
end

def format(tag, time, record)
raise NotImplementedError, "BUG: buffered output plugins MUST implement this method"
end

### Implement try_write if plugin does commit_flush under its own responsibility
# def try_write(chunk)

def write(chunk)
raise NotImplementedError, "BUG: buffered output plugins MUST implement this method"
end

def configure(conf)
super

Expand Down Expand Up @@ -127,6 +135,9 @@ def start
@num_errors = 0
@emit_count = 0

@cached_metadata = []
@cached_metadata_mutex = Mutex.new

@retry = nil # state machine
@retry_mutex = Mutex.new

Expand Down Expand Up @@ -181,30 +192,30 @@ def terminate
@buffer.terminate
end

# metadata() should be used only by myself
def metadata(*args)
@cached_metadata ||= @buffer.metadata()
end

# this should be overridden by child buffered_output class
def cached_metadata
[ metadata() ]
def metadata(tag)
@tag_chunked ? @buffer.metadata(tag: tag) : @buffer.metadata()
end

def emit(tag, es)
meta = metadata()
@emit_count += 1
data = format_stream(tag, es)
@buffer.emit(meta, data)

metalist = handle_stream(tag, es)
if @flush_immediately
@buffer.enqueue_chunk(meta)
matalist.each do |meta|
@buffer.enqueue_chunk(meta)
end
end
if !@retry && @buffer.enqueued?(meta)
if !@retry && metalist.reduce(false){|r,m| r || @buffer.enqueued?(m) }
submit_flush
end
end

def handle_stream(tag, es)
meta = metadata(tag)
@emit_count += 1
data = format_stream(tag, es)
@buffer.emit(meta, data)
[meta]
end

def format_stream(tag, es)
out = ''
es.each do |time, record|
Expand All @@ -213,17 +224,6 @@ def format_stream(tag, es)
out
end

def format(tag, time, record)
raise NotImplementedError, "BUG: buffered output plugins MUST implement this method"
end

### Implement try_write if plugin does commit_flush under its own responsibility
# def try_write(chunk)

def write(chunk)
raise NotImplementedError, "BUG: buffered output plugins MUST implement this method"
end

def submit_flush
# Without locks: it is rough but enough to select "next" writer selection
@writer_current_position = (@writer_current_position + 1) % @writers_size
Expand Down Expand Up @@ -260,12 +260,13 @@ def try_rollback_write
end
end

def enqueue_buffer(force: false)
def enqueue_buffer(force: false, test: ->(chunk){ chunk.created_at + @flush_interval > Time.now })
if force
@buffer.enqueue_all
else
cached_metadata.each do |meta|
if @buffer.staged_chunk_test(meta){ |chunk| chunk.modified_at + @flush_interval > Fluent::Engine.now }
metalist = @buffer.metadata_list
metalist.each do |meta|
if @buffer.staged_chunk_test(meta, &test)
@buffer.enqueue_chunk(meta)
end
end
Expand Down
47 changes: 30 additions & 17 deletions lib/fluent/plugin/object_buffered_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,44 @@
module Fluent
module Plugin
class ObjectBufferedOutput < BufferedOutput
# TODO check this works well w/ new API, or not
desc "Format times to integer, instead of event-time object"
config_param :time_as_integer, :bool, :default => true

def initialize
config_param :tag_chunked, :bool, default: true # actually, always true (this configuration value is ignored)

def configure(conf)
super
end

def emit(tag, es, chain)
@emit_count += 1
data = if @time_as_integer
es.to_msgpack_stream_forced_integer
else
es.to_msgpack_stream
end
key = tag
if @buffer.emit(key, data, chain)
submit_flush
if @time_as_integer
m = method(:format_stream_forced_integer)
(class << self; self; end).module_eval do
define_method(:format_stream, m)
end
end
end

def write_objects(chunk)
raise NotImplementedError
end

def metadata(tag)
@buffer.metadata(tag: tag)
end

def format_stream_forced_integer(tag, es)
es.to_msgpack_stream_forced_integer
end

def format_stream(tag, es)
es.to_msgpack_stream
end

def write(chunk)
chunk.extend(BufferedEventStreamMixin)
write_objects(chunk)
end

module BufferedEventStreamMixin
include Enumerable

Expand All @@ -54,11 +72,6 @@ def to_msgpack_stream
read
end
end

def write(chunk)
chunk.extend(BufferedEventStreamMixin)
write_objects(chunk.key, chunk)
end
end
end
end
Expand Down
12 changes: 8 additions & 4 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
module Fluent
module Plugin
class Output < Base
# config_param ...

def emit(tag, es)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end

def initialize
super
end
Expand Down Expand Up @@ -51,11 +57,9 @@ def terminate
super
end

def emit(tag, es)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
def inspect
"#<%s:%014x>" % [self.class.name, '0x%014x' % (__id__ << 1)]
end

def inspect; "#<%s:%014x>" % [self.class.name, '0x%014x' % (__id__ << 1)] end
end
end
end
Loading

0 comments on commit 51c1377

Please sign in to comment.