Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Mutex instead of Monitor #2561

Merged
Merged 1 commit on Aug 15, 2019
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
6 changes: 3 additions & 3 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += size }
@counter_mutex.synchronize{ @emit_records += size }
return [meta]
end

Expand All @@ -363,7 +363,7 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += size }
@counter_mutex.synchronize{ @emit_records += size }
return [meta]
end

Expand All @@ -373,7 +373,7 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => data}, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += size }
@counter_mutex.synchronize{ @emit_records += size }
[meta]
end

Expand Down
8 changes: 4 additions & 4 deletions lib/fluent/plugin/bare_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,20 @@ def process(tag, es)

def initialize
super
@counters_monitor = Monitor.new
@counter_mutex = Mutex.new
# TODO: well organized counters
@num_errors = 0
@emit_count = 0
@emit_records = 0
end

def emit_sync(tag, es)
@counters_monitor.synchronize{ @emit_count += 1 }
@counter_mutex.synchronize{ @emit_count += 1 }
begin
process(tag, es)
@counters_monitor.synchronize{ @emit_records += es.size }
@counter_mutex.synchronize{ @emit_records += es.size }
rescue
@counters_monitor.synchronize{ @num_errors += 1 }
@counter_mutex.synchronize{ @num_errors += 1 }
raise
end
end
Expand Down
8 changes: 4 additions & 4 deletions lib/fluent/plugin/multi_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def initialize
@outputs = []
@outputs_statically_created = false

@counters_monitor = Monitor.new
@counter_mutex = Mutex.new
# TODO: well organized counters
@num_errors = 0
@emit_count = 0
Expand Down Expand Up @@ -143,12 +143,12 @@ def terminate
end

def emit_sync(tag, es)
@counters_monitor.synchronize{ @emit_count += 1 }
@counter_mutex.synchronize{ @emit_count += 1 }
begin
process(tag, es)
@counters_monitor.synchronize{ @emit_records += es.size }
@counter_mutex.synchronize{ @emit_records += es.size }
rescue
@counters_monitor.synchronize{ @num_errors += 1 }
@counter_mutex.synchronize{ @num_errors += 1 }
raise
end
end
Expand Down
36 changes: 18 additions & 18 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def expired?

def initialize
super
@counters_monitor = Monitor.new
@counter_mutex = Mutex.new
@buffering = false
@delayed_commit = false
@as_secondary = false
Expand Down Expand Up @@ -780,26 +780,26 @@ def emit_events(tag, es)
end

def emit_sync(tag, es)
@counters_monitor.synchronize{ @emit_count += 1 }
@counter_mutex.synchronize{ @emit_count += 1 }
begin
process(tag, es)
@counters_monitor.synchronize{ @emit_records += es.size }
@counter_mutex.synchronize{ @emit_records += es.size }
rescue
@counters_monitor.synchronize{ @num_errors += 1 }
@counter_mutex.synchronize{ @num_errors += 1 }
raise
end
end

def emit_buffered(tag, es)
@counters_monitor.synchronize{ @emit_count += 1 }
@counter_mutex.synchronize{ @emit_count += 1 }
begin
execute_chunking(tag, es, enqueue: (@flush_mode == :immediate))
if !@retry && @buffer.queued?
submit_flush_once
end
rescue
# TODO: separate number of errors into emit errors and write/flush errors
@counters_monitor.synchronize{ @num_errors += 1 }
@counter_mutex.synchronize{ @num_errors += 1 }
raise
end
end
Expand Down Expand Up @@ -956,7 +956,7 @@ def handle_stream_with_custom_format(tag, es, enqueue: false)
write_guard do
@buffer.write(meta_and_data, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += records }
@counter_mutex.synchronize{ @emit_records += records }
true
end

Expand All @@ -973,7 +973,7 @@ def handle_stream_with_standard_format(tag, es, enqueue: false)
write_guard do
@buffer.write(meta_and_data, format: format_proc, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += records }
@counter_mutex.synchronize{ @emit_records += records }
true
end

Expand All @@ -998,7 +998,7 @@ def handle_stream_simple(tag, es, enqueue: false)
write_guard do
@buffer.write({meta => data}, format: format_proc, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += records }
@counter_mutex.synchronize{ @emit_records += records }
true
end

Expand Down Expand Up @@ -1036,7 +1036,7 @@ def rollback_write(chunk_id, update_retry: true)
# false if chunk was already flushed and couldn't be rollbacked unexpectedly
# in many cases, false can be just ignored
if @buffer.takeback_chunk(chunk_id)
@counters_monitor.synchronize{ @rollback_count += 1 }
@counter_mutex.synchronize{ @rollback_count += 1 }
if update_retry
primary = @as_secondary ? @primary_instance : self
primary.update_retry_state(chunk_id, @as_secondary)
Expand All @@ -1052,7 +1052,7 @@ def try_rollback_write
while @dequeued_chunks.first && @dequeued_chunks.first.expired?
info = @dequeued_chunks.shift
if @buffer.takeback_chunk(info.chunk_id)
@counters_monitor.synchronize{ @rollback_count += 1 }
@counter_mutex.synchronize{ @rollback_count += 1 }
log.warn "failed to flush the buffer chunk, timeout to commit.", chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time
primary = @as_secondary ? @primary_instance : self
primary.update_retry_state(info.chunk_id, @as_secondary)
Expand All @@ -1067,7 +1067,7 @@ def try_rollback_all
until @dequeued_chunks.empty?
info = @dequeued_chunks.shift
if @buffer.takeback_chunk(info.chunk_id)
@counters_monitor.synchronize{ @rollback_count += 1 }
@counter_mutex.synchronize{ @rollback_count += 1 }
log.info "delayed commit for buffer chunks was cancelled in shutdown", chunk_id: dump_unique_id_hex(info.chunk_id)
primary = @as_secondary ? @primary_instance : self
primary.update_retry_state(info.chunk_id, @as_secondary)
Expand Down Expand Up @@ -1110,7 +1110,7 @@ def try_flush

if output.delayed_commit
log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
@counters_monitor.synchronize{ @write_count += 1 }
@counter_mutex.synchronize{ @write_count += 1 }
@dequeued_chunks_mutex.synchronize do
# delayed_commit_timeout for secondary is configured in <buffer> of primary (<secondary> don't get <buffer>)
@dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout)
Expand All @@ -1122,7 +1122,7 @@ def try_flush
chunk_id = chunk.unique_id
dump_chunk_id = dump_unique_id_hex(chunk_id)
log.trace "adding write count", instance: self.object_id
@counters_monitor.synchronize{ @write_count += 1 }
@counter_mutex.synchronize{ @write_count += 1 }
log.trace "executing sync write", chunk: dump_chunk_id

output.write(chunk)
Expand Down Expand Up @@ -1178,7 +1178,7 @@ def try_flush
end

if @buffer.takeback_chunk(chunk.unique_id)
@counters_monitor.synchronize { @rollback_count += 1 }
@counter_mutex.synchronize { @rollback_count += 1 }
end

update_retry_state(chunk.unique_id, using_secondary, e)
Expand Down Expand Up @@ -1209,17 +1209,17 @@ def backup_chunk(chunk, using_secondary, delayed_commit)
def check_slow_flush(start)
elapsed_time = Fluent::Clock.now - start
elapsed_millsec = (elapsed_time * 1000).to_i
@counters_monitor.synchronize { @flush_time_count += elapsed_millsec }
@counter_mutex.synchronize { @flush_time_count += elapsed_millsec }
if elapsed_time > @slow_flush_log_threshold
@counters_monitor.synchronize { @slow_flush_count += 1 }
@counter_mutex.synchronize { @slow_flush_count += 1 }
log.warn "buffer flush took longer time than slow_flush_log_threshold:",
elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold, plugin_id: self.plugin_id
end
end

def update_retry_state(chunk_id, using_secondary, error = nil)
@retry_mutex.synchronize do
@counters_monitor.synchronize{ @num_errors += 1 }
@counter_mutex.synchronize{ @num_errors += 1 }
chunk_id_hex = dump_unique_id_hex(chunk_id)

unless @retry
Expand Down