diff --git a/lib/fluent/agent.rb b/lib/fluent/agent.rb index eab8e6bb21..3fea70c3f1 100644 --- a/lib/fluent/agent.rb +++ b/lib/fluent/agent.rb @@ -131,7 +131,7 @@ def add_match(type, pattern, conf) log.info "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type output = Plugin.new_output(type) - output.router = @event_router + output.router = @event_router if output.respond_to?(:router=) output.configure(conf) @outputs << output @event_router.add_rule(pattern, output) diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index f90decd15d..4f2a443090 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -247,18 +247,24 @@ def configure(conf) # This method overrides Fluent::Plugin::Output#handle_stream_simple # because v0.12 BufferedOutput may overrides #format_stream, but original #handle_stream_simple method doesn't consider about it - def handle_stream_simple(tag, es) + def handle_stream_simple(tag, es, enqueue: false) if @overrides_format_stream meta = metadata(nil, nil, nil) bulk = format_stream(tag, es) - @buffer.emit_bulk(meta, bulk, es.size) + size = es.size + write_guard do + @buffer.write({meta => [bulk, size]}, bulk: true, enqueue: enqueue) + end + @counters_monitor.synchronize{ @emit_records += size } return [meta] end meta = metadata(nil, nil, nil) es_size = 0 es_bulk = es.map{|time,record| es_size += 1; format(tag, time, record) }.join - @buffer.emit_bulk(meta, es_bulk, es_size) + write_guard do + @buffer.write({meta => [es_bulk, es_size]}, bulk: true, enqueue: enqueue) + end @counters_monitor.synchronize{ @emit_records += es_size } [meta] end