Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always lock chunks first to avoid deadlock #1721

Merged
merged 4 commits into from
Oct 23, 2017
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -313,22 +313,24 @@ def queued?(metadata=nil)

def enqueue_chunk(metadata)
log.trace "enqueueing chunk", instance: self.object_id, metadata: metadata
synchronize do
chunk = @stage.delete(metadata)
return nil unless chunk
chunk = synchronize do
@stage.delete(metadata)
end
return nil unless chunk

chunk.synchronize do
chunk.synchronize do
synchronize do
if chunk.empty?
chunk.close
else
@queue << chunk
@queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
chunk.enqueued! if chunk.respond_to?(:enqueued!)
chunk.enqueued!
end
bytesize = chunk.bytesize
@stage_size -= bytesize
@queue_size += bytesize
end
bytesize = chunk.bytesize
@stage_size -= bytesize
@queue_size += bytesize
end
nil
end
Expand All @@ -340,25 +342,24 @@ def enqueue_unstaged_chunk(chunk)
metadata = chunk.metadata
@queue << chunk
@queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1
chunk.enqueued! if chunk.respond_to?(:enqueued!)
chunk.enqueued!
end
@queue_size += chunk.bytesize
end
end

def enqueue_all
log.trace "enqueueing all chunks in buffer", instance: self.object_id
synchronize do
if block_given?
@stage.keys.each do |metadata|
chunk = @stage[metadata]
v = yield metadata, chunk
enqueue_chunk(metadata) if v
end
else
@stage.keys.each do |metadata|
enqueue_chunk(metadata)
end
if block_given?
synchronize{ @stage.keys }.each do |metadata|
chunk = @stage[metadata]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need synchronized guard here?

Copy link
Member

@repeatedly repeatedly Oct 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... deleting metadata from @stage happens in only enqueue_chunk so it seems safe in CRuby.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think data race doesn't occur in CRuby thanks to GIL and synchronized is not necessary, but I'll change the code like below for other Ruby implementations:

diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb
index 337b3b79..61a904ed 100644
--- a/lib/fluent/plugin/buffer.rb
+++ b/lib/fluent/plugin/buffer.rb
@@ -351,11 +351,15 @@ module Fluent
       def enqueue_all
         log.trace "enqueueing all chunks in buffer", instance: self.object_id
         if block_given?
-          synchronize{ @stage.keys }.each do |metadata|
-            chunk = @stage[metadata]
-            next unless chunk
-            v = yield metadata, chunk
-            enqueue_chunk(metadata) if v
+          metadata_array = []
+          synchronize do
+            @stage.each do |metadata, chunk|
+              v = yield metadata, chunk
+              metadata_array << metadata if v
+            end
+          end
+          metadata_array.each do |metadata|
+            enqueue_chunk(metadata)
           end
         else
           synchronize{ @stage.keys }.each do |metadata|

Copy link
Member

@repeatedly repeatedly Oct 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and currently fluentd focus on CRuby so I think code change is not needed for now.
But adding comment is better for the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okey, I added a comment 2261428

next unless chunk
v = yield metadata, chunk
enqueue_chunk(metadata) if v
end
else
synchronize{ @stage.keys }.each do |metadata|
enqueue_chunk(metadata)
end
end
end
Expand Down