Skip to content

Commit

Permalink
Always lock chunks first to avoid deadlock
Browse files Browse the repository at this point in the history
This commit resolves #1549.

For example, deadlock occurs by the following steps before this commit:

1. input plugin thread receives multiple events
  (metadata_and_data.size > 1)
2. input plugin thread processes the first metadata_and_data element
  and aquires the lock of chunk1 (chunk.mon_exit in Buffer#write)
3. enqueue thread aquires the lock of buffer
  (synchronize in Buffer#enqueue_chunk)
4. enqueue thread tries to aquire the lock of buffer
  (chunk.synchronize in Buffer#enqueue_chunk)
5. input plugin thread processes the second metadata_and_data element
  and tries to aquire the lock of buffer (synchronize in Buffer#write_once)
  • Loading branch information
abicky committed Oct 20, 2017
1 parent e845897 commit ed9f921
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -313,20 +313,24 @@ def queued?(metadata=nil)

def enqueue_chunk(metadata)
log.trace "enqueueing chunk", instance: self.object_id, metadata: metadata
synchronize do
chunk = @stage.delete(metadata)
return nil unless chunk
chunk = synchronize do
@stage.delete(metadata)
end
return nil unless chunk

chunk.synchronize do
if chunk.empty?
chunk.close
else
chunk.synchronize do
if chunk.empty?
chunk.close
else
synchronize do
@queue << chunk
@queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
chunk.enqueued!
end
chunk.enqueued!
end
bytesize = chunk.bytesize
end
bytesize = chunk.bytesize
synchronize do
@stage_size -= bytesize
@queue_size += bytesize
end
Expand All @@ -348,17 +352,16 @@ def enqueue_unstaged_chunk(chunk)

def enqueue_all
log.trace "enqueueing all chunks in buffer", instance: self.object_id
synchronize do
if block_given?
@stage.keys.each do |metadata|
chunk = @stage[metadata]
v = yield metadata, chunk
enqueue_chunk(metadata) if v
end
else
@stage.keys.each do |metadata|
enqueue_chunk(metadata)
end
if block_given?
synchronize{ @stage.keys }.each do |metadata|
chunk = @stage[metadata]
next unless chunk
v = yield metadata, chunk
enqueue_chunk(metadata) if v
end
else
synchronize{ @stage.keys }.each do |metadata|
enqueue_chunk(metadata)
end
end
end
Expand Down

0 comments on commit ed9f921

Please sign in to comment.