From c56a18f3834f2e1cb6ee973edbd3298da03e14d5 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 25 May 2016 01:41:21 +0900 Subject: [PATCH 1/2] assign router to output plugins only if needed --- lib/fluent/agent.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/agent.rb b/lib/fluent/agent.rb index eab8e6bb21..3fea70c3f1 100644 --- a/lib/fluent/agent.rb +++ b/lib/fluent/agent.rb @@ -131,7 +131,7 @@ def add_match(type, pattern, conf) log.info "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type output = Plugin.new_output(type) - output.router = @event_router + output.router = @event_router if output.respond_to?(:router=) output.configure(conf) @outputs << output @event_router.add_rule(pattern, output) From d6245d0079bbcec6fea434dadca53b00dc535dfc Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Wed, 25 May 2016 01:42:36 +0900 Subject: [PATCH 2/2] fix compat buffered output plugins with renewed buffer APIs to write some event streams at once --- lib/fluent/compat/output.rb | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index f90decd15d..4f2a443090 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -247,18 +247,24 @@ def configure(conf) # This method overrides Fluent::Plugin::Output#handle_stream_simple # because v0.12 BufferedOutput may overrides #format_stream, but original #handle_stream_simple method doesn't consider about it - def handle_stream_simple(tag, es) + def handle_stream_simple(tag, es, enqueue: false) if @overrides_format_stream meta = metadata(nil, nil, nil) bulk = format_stream(tag, es) - @buffer.emit_bulk(meta, bulk, es.size) + size = es.size + write_guard do + @buffer.write({meta => [bulk, size]}, bulk: true, enqueue: enqueue) + end + @counters_monitor.synchronize{ @emit_records += size } return [meta] end meta = metadata(nil, nil, nil) es_size = 0 es_bulk = es.map{|time,record| es_size += 1; format(tag, time, record) }.join - @buffer.emit_bulk(meta, es_bulk, es_size) + write_guard do + @buffer.write({meta => [es_bulk, es_size]}, bulk: true, enqueue: enqueue) + end @counters_monitor.synchronize{ @emit_records += es_size } [meta] end