Skip to content

Commit

Permalink
Merge pull request #988 from fluent/fix-missed-bugs-for-v0.14
Browse files Browse the repository at this point in the history
Fix missed bugs for v0.14
  • Loading branch information
tagomoris committed May 25, 2016
2 parents 9b556e4 + d6245d0 commit 64edd13
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
2 changes: 1 addition & 1 deletion lib/fluent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def add_match(type, pattern, conf)
log.info "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

output = Plugin.new_output(type)
output.router = @event_router
output.router = @event_router if output.respond_to?(:router=)
output.configure(conf)
@outputs << output
@event_router.add_rule(pattern, output)
Expand Down
12 changes: 9 additions & 3 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -247,18 +247,24 @@ def configure(conf)

# This method overrides Fluent::Plugin::Output#handle_stream_simple
# because v0.12 BufferedOutput may overrides #format_stream, but original #handle_stream_simple method doesn't consider about it
def handle_stream_simple(tag, es)
def handle_stream_simple(tag, es, enqueue: false)
if @overrides_format_stream
meta = metadata(nil, nil, nil)
bulk = format_stream(tag, es)
@buffer.emit_bulk(meta, bulk, es.size)
size = es.size
write_guard do
@buffer.write({meta => [bulk, size]}, bulk: true, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += size }
return [meta]
end

meta = metadata(nil, nil, nil)
es_size = 0
es_bulk = es.map{|time,record| es_size += 1; format(tag, time, record) }.join
@buffer.emit_bulk(meta, es_bulk, es_size)
write_guard do
@buffer.write({meta => [es_bulk, es_size]}, bulk: true, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += es_size }
[meta]
end
Expand Down

0 comments on commit 64edd13

Please sign in to comment.