Skip to content

Commit

Permalink
Merge pull request #1721 from abicky/hotfix/lock-chunks-first
Browse files Browse the repository at this point in the history
Always lock chunks first to avoid deadlock
  • Loading branch information
repeatedly authored Oct 23, 2017
2 parents 8bb26a3 + 2261428 commit 0848224
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -313,22 +313,24 @@ def queued?(metadata=nil)

def enqueue_chunk(metadata)
log.trace "enqueueing chunk", instance: self.object_id, metadata: metadata
synchronize do
chunk = @stage.delete(metadata)
return nil unless chunk
chunk = synchronize do
@stage.delete(metadata)
end
return nil unless chunk

chunk.synchronize do
chunk.synchronize do
synchronize do
if chunk.empty?
chunk.close
else
@queue << chunk
@queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
chunk.enqueued! if chunk.respond_to?(:enqueued!)
chunk.enqueued!
end
bytesize = chunk.bytesize
@stage_size -= bytesize
@queue_size += bytesize
end
bytesize = chunk.bytesize
@stage_size -= bytesize
@queue_size += bytesize
end
nil
end
Expand All @@ -340,25 +342,26 @@ def enqueue_unstaged_chunk(chunk)
metadata = chunk.metadata
@queue << chunk
@queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
chunk.enqueued! if chunk.respond_to?(:enqueued!)
chunk.enqueued!
end
@queue_size += chunk.bytesize
end
end

def enqueue_all
log.trace "enqueueing all chunks in buffer", instance: self.object_id
synchronize do
if block_given?
@stage.keys.each do |metadata|
chunk = @stage[metadata]
v = yield metadata, chunk
enqueue_chunk(metadata) if v
end
else
@stage.keys.each do |metadata|
enqueue_chunk(metadata)
end
if block_given?
synchronize{ @stage.keys }.each do |metadata|
# NOTE: The following line might cause data race depending on Ruby implementations except CRuby
# cf. https://github.com/fluent/fluentd/pull/1721#discussion_r146170251
chunk = @stage[metadata]
next unless chunk
v = yield metadata, chunk
enqueue_chunk(metadata) if v
end
else
synchronize{ @stage.keys }.each do |metadata|
enqueue_chunk(metadata)
end
end
end
Expand Down

0 comments on commit 0848224

Please sign in to comment.