From 97486d1170f75814c2e92779cb9ca22664379c6e Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 30 Jul 2021 19:02:17 +0900 Subject: [PATCH] input: filter: bare_output: multi_output: output: Don't use monitor lock if needn't Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/bare_output.rb | 8 ++++++-- lib/fluent/plugin/filter.rb | 8 ++++++-- lib/fluent/plugin/input.rb | 8 ++++++-- lib/fluent/plugin/multi_output.rb | 8 ++++++-- lib/fluent/plugin/output.rb | 32 +++++++++++++++++++++++-------- 5 files changed, 48 insertions(+), 16 deletions(-) diff --git a/lib/fluent/plugin/bare_output.rb b/lib/fluent/plugin/bare_output.rb index 527d3e10a8..e5858bbfb3 100644 --- a/lib/fluent/plugin/bare_output.rb +++ b/lib/fluent/plugin/bare_output.rb @@ -91,8 +91,12 @@ def emit_sync(tag, es) @emit_count_metrics.inc begin process(tag, es) - @counter_mutex.synchronize do - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) end rescue diff --git a/lib/fluent/plugin/filter.rb b/lib/fluent/plugin/filter.rb index a2580c03d0..a03b5b6c85 100644 --- a/lib/fluent/plugin/filter.rb +++ b/lib/fluent/plugin/filter.rb @@ -67,9 +67,13 @@ def statistics end def measure_metrics(es) - @counter_mutex.synchronize do + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end end diff --git a/lib/fluent/plugin/input.rb b/lib/fluent/plugin/input.rb index 3442387cdb..0768fa8005 100644 --- a/lib/fluent/plugin/input.rb +++ b/lib/fluent/plugin/input.rb @@ -63,9 +63,13 @@ def statistics end def metric_callback(es) - @counter_mutex.synchronize do + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end end diff --git a/lib/fluent/plugin/multi_output.rb b/lib/fluent/plugin/multi_output.rb index 81c5cc800f..21547ef79c 100644 --- a/lib/fluent/plugin/multi_output.rb +++ b/lib/fluent/plugin/multi_output.rb @@ -182,8 +182,12 @@ def emit_sync(tag, es) @emit_count_metrics.inc begin process(tag, es) - @counter_mutex.synchronize do - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) end rescue diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 00ec6624ac..f61d23c1b7 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -836,9 +836,13 @@ def emit_sync(tag, es) @emit_count_metrics.inc begin process(tag, es) - @counter_mutex.synchronize do + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end rescue @num_errors_metrics.inc @@ -1005,9 +1009,13 @@ def handle_stream_with_custom_format(tag, es, enqueue: false) write_guard do @buffer.write(meta_and_data, enqueue: enqueue) end - @counter_mutex.synchronize do + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end true end @@ -1025,9 +1033,13 @@ def handle_stream_with_standard_format(tag, es, enqueue: false) write_guard do @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue) end - @counter_mutex.synchronize do + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end true end @@ -1053,9 +1065,13 @@ def handle_stream_simple(tag, es, enqueue: false) write_guard do @buffer.write({meta => data}, format: format_proc, enqueue: enqueue) end - @counter_mutex.synchronize do + if @enable_size_metrics + @counter_mutex.synchronize do + @emit_records_metrics.add(es.size) + @emit_size_metrics.add(es.to_msgpack_stream.bytesize) + end + else @emit_records_metrics.add(es.size) - @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end true end