Skip to content

Commit

Permalink
input: filter: bare_output: multi_output: output: Don't use monitor lock
Browse files Browse the repository at this point in the history
if needn't

Signed-off-by: Hiroshi Hatake <[email protected]>
  • Loading branch information
cosmo0920 committed Jul 30, 2021
1 parent 123a5f2 commit 97486d1
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 16 deletions.
8 changes: 6 additions & 2 deletions lib/fluent/plugin/bare_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,12 @@ def emit_sync(tag, es)
@emit_count_metrics.inc
begin
process(tag, es)
@counter_mutex.synchronize do
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
end
rescue
Expand Down
8 changes: 6 additions & 2 deletions lib/fluent/plugin/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ def statistics
end

def measure_metrics(es)
@counter_mutex.synchronize do
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
end
end

Expand Down
8 changes: 6 additions & 2 deletions lib/fluent/plugin/input.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ def statistics
end

def metric_callback(es)
@counter_mutex.synchronize do
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
end
end

Expand Down
8 changes: 6 additions & 2 deletions lib/fluent/plugin/multi_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,12 @@ def emit_sync(tag, es)
@emit_count_metrics.inc
begin
process(tag, es)
@counter_mutex.synchronize do
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
end
rescue
Expand Down
32 changes: 24 additions & 8 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -836,9 +836,13 @@ def emit_sync(tag, es)
@emit_count_metrics.inc
begin
process(tag, es)
@counter_mutex.synchronize do
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
end
rescue
@num_errors_metrics.inc
Expand Down Expand Up @@ -1005,9 +1009,13 @@ def handle_stream_with_custom_format(tag, es, enqueue: false)
write_guard do
@buffer.write(meta_and_data, enqueue: enqueue)
end
@counter_mutex.synchronize do
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
end
true
end
Expand All @@ -1025,9 +1033,13 @@ def handle_stream_with_standard_format(tag, es, enqueue: false)
write_guard do
@buffer.write(meta_and_data, format: format_proc, enqueue: enqueue)
end
@counter_mutex.synchronize do
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
end
true
end
Expand All @@ -1053,9 +1065,13 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => data}, format: format_proc, enqueue: enqueue)
end
@counter_mutex.synchronize do
if @enable_size_metrics
@counter_mutex.synchronize do
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
else
@emit_records_metrics.add(es.size)
@emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
end
true
end
Expand Down

0 comments on commit 97486d1

Please sign in to comment.