Skip to content

Commit

Permalink
fix compat buffered output plugins with renewed buffer APIs to write …
Browse files Browse the repository at this point in the history
…some event streams at once
  • Loading branch information
tagomoris committed May 24, 2016
1 parent c56a18f commit d6245d0
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -247,18 +247,24 @@ def configure(conf)

# This method overrides Fluent::Plugin::Output#handle_stream_simple
# because v0.12 BufferedOutput may overrides #format_stream, but original #handle_stream_simple method doesn't consider about it
def handle_stream_simple(tag, es)
def handle_stream_simple(tag, es, enqueue: false)
if @overrides_format_stream
meta = metadata(nil, nil, nil)
bulk = format_stream(tag, es)
@buffer.emit_bulk(meta, bulk, es.size)
size = es.size
write_guard do
@buffer.write({meta => [bulk, size]}, bulk: true, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += size }
return [meta]
end

meta = metadata(nil, nil, nil)
es_size = 0
es_bulk = es.map{|time,record| es_size += 1; format(tag, time, record) }.join
@buffer.emit_bulk(meta, es_bulk, es_size)
write_guard do
@buffer.write({meta => [es_bulk, es_size]}, bulk: true, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += es_size }
[meta]
end
Expand Down

0 comments on commit d6245d0

Please sign in to comment.