Always lock chunks first to avoid deadlock #1721
All chunk classes which inherit Chunk have enqueued! method.
@repeatedly Could you review the change? I want to try the latest version (v0.14.22.rc2?) which includes this change in our production environment if it looks good to you.
Thanks! I will review it!
I found there were errors like
This commit resolves fluent#1549. For example, a deadlock occurs through the following steps before this commit:
1. The input plugin thread receives multiple events (metadata_and_data.size > 1).
2. The input plugin thread processes the first metadata_and_data element and acquires the lock of chunk1 (chunk.mon_exit in Buffer#write).
3. The enqueue thread acquires the lock of the buffer (synchronize in Buffer#enqueue_chunk).
4. The enqueue thread tries to acquire the lock of chunk1 (chunk.synchronize in Buffer#enqueue_chunk).
5. The input plugin thread processes the second metadata_and_data element and tries to acquire the lock of the buffer (synchronize in Buffer#write_once).
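The interleaving in steps 2 to 4 can be reproduced in isolation without actually hanging. The following is a minimal sketch, not fluentd code: `buffer_lock`, `chunk_lock`, and `enqueue_can_lock_chunk?` are hypothetical names, two plain `Monitor` objects stand in for the buffer and chunk1, and `try_enter` is used in place of a blocking acquire so the script can report the conflict instead of deadlocking.

```ruby
require 'monitor'

# Returns true if the "enqueue" thread could take chunk1 while holding the
# buffer lock, and false if a blocking acquire would have had to wait.
# All names here are hypothetical stand-ins, not fluentd's API.
def enqueue_can_lock_chunk?
  buffer_lock = Monitor.new
  chunk_lock  = Monitor.new
  chunk_held  = Queue.new # signals that the input thread holds chunk1
  probe_done  = Queue.new # lets the input thread release chunk1 afterwards

  input_thread = Thread.new do
    chunk_lock.enter      # step 2: input thread holds chunk1
    chunk_held << true
    probe_done.pop        # keep holding chunk1 while the other thread probes
    chunk_lock.exit
  end

  enqueue_thread = Thread.new do
    chunk_held.pop
    buffer_lock.synchronize do          # step 3: enqueue thread holds the buffer
      acquired = chunk_lock.try_enter   # step 4: non-blocking probe for chunk1
      chunk_lock.exit if acquired
      acquired
    end
  end

  result = enqueue_thread.value
  probe_done << true
  input_thread.join
  result
end

puts "enqueue thread got chunk1: #{enqueue_can_lock_chunk?}"
```

With a blocking acquire in step 4, the enqueue thread would keep the buffer lock while waiting for chunk1, and step 5 would then leave the input thread waiting for the buffer lock while still holding chunk1.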
I fixed the "closed stream" error in 1a6f16b. @repeatedly Please review. I've checked the behavior in our development environment and it works fine. I want your review before I try it in our production environment.
lib/fluent/plugin/buffer.rb
end
if block_given?
  synchronize{ @stage.keys }.each do |metadata|
    chunk = @stage[metadata]
Is there no need for a synchronized guard here?
Hmm... deleting metadata from @stage happens only in enqueue_chunk, so it seems safe in CRuby.
I think a data race doesn't occur in CRuby thanks to the GIL, so synchronized is not necessary, but I'll change the code as below for other Ruby implementations:
diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb
index 337b3b79..61a904ed 100644
--- a/lib/fluent/plugin/buffer.rb
+++ b/lib/fluent/plugin/buffer.rb
@@ -351,11 +351,15 @@ module Fluent
       def enqueue_all
         log.trace "enqueueing all chunks in buffer", instance: self.object_id
         if block_given?
-          synchronize{ @stage.keys }.each do |metadata|
-            chunk = @stage[metadata]
-            next unless chunk
-            v = yield metadata, chunk
-            enqueue_chunk(metadata) if v
+          metadata_array = []
+          synchronize do
+            @stage.each do |metadata, chunk|
+              v = yield metadata, chunk
+              metadata_array << metadata if v
+            end
+          end
+          metadata_array.each do |metadata|
+            enqueue_chunk(metadata)
           end
         else
           synchronize{ @stage.keys }.each do |metadata|
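The idea behind this proposed change (select under the lock, then enqueue once the iteration is over) can be tried out on its own. This is only a sketch under assumptions: `TinyStage` and its methods are hypothetical and much simpler than fluentd's Buffer, and, as in the diff, the caller's block runs while the lock is held.

```ruby
require 'monitor'

# Hypothetical miniature of a staged-chunk table; not fluentd's Buffer.
class TinyStage
  include MonitorMixin

  def initialize
    super() # initializes the monitor provided by MonitorMixin
    @stage = {}
    @queue = []
  end

  def stage(metadata, chunk)
    synchronize { @stage[metadata] = chunk }
  end

  # Moves one staged chunk to the queue, if it is still staged.
  def enqueue_chunk(metadata)
    synchronize do
      chunk = @stage.delete(metadata)
      @queue << chunk if chunk
    end
  end

  # Selects chunks while holding the lock, then enqueues them after the
  # iteration has finished, so @stage is never modified while iterated.
  def enqueue_all
    selected = []
    synchronize do
      @stage.each do |metadata, chunk|
        selected << metadata if yield(metadata, chunk)
      end
    end
    selected.each { |metadata| enqueue_chunk(metadata) }
    selected
  end

  def queued
    synchronize { @queue.dup }
  end

  def staged_keys
    synchronize { @stage.keys }
  end
end

stage = TinyStage.new
stage.stage(:a, "chunk-a")
stage.stage(:b, "chunk-b")
stage.enqueue_all { |metadata, _chunk| metadata == :a }
puts stage.queued.inspect
puts stage.staged_keys.inspect
```

Because `enqueue_chunk` rechecks the table under the lock, a chunk that another thread has already enqueued between the two phases is simply skipped.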
Yes, and fluentd currently focuses on CRuby, so I think a code change is not needed for now.
But adding a comment would be better for the future.
Okay, I added a comment in 2261428.
I re-ran the failed tests and all tests passed.
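This PR resolves #1549.
For example, a deadlock occurs through the following steps before this commit:
1. The input plugin thread receives multiple events (metadata_and_data.size > 1).
2. The input plugin thread processes the first metadata_and_data element and acquires the lock of chunk1 (chunk.mon_exit in Buffer#write).
3. The enqueue thread acquires the lock of the buffer (synchronize in Buffer#enqueue_chunk).
4. The enqueue thread tries to acquire the lock of chunk1 (chunk.synchronize in Buffer#enqueue_chunk).
5. The input plugin thread processes the second metadata_and_data element and tries to acquire the lock of the buffer (synchronize in Buffer#write_once).
cf. #1549 (comment)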
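Going by the PR title, the remedy is a fixed lock order: chunk first, buffer second. The sketch below illustrates that general technique only, under the assumption that every code path follows the same order. `with_chunk_then_buffer` and `run_ordered_workers` are hypothetical helpers, not part of fluentd.

```ruby
require 'monitor'

# Hypothetical helper: always take the chunk lock before the buffer lock.
def with_chunk_then_buffer(chunk_lock, buffer_lock)
  chunk_lock.synchronize do
    buffer_lock.synchronize { yield }
  end
end

# Runs two threads that both use the same lock order and reports whether
# they finished, together with the number of guarded increments.
def run_ordered_workers(iterations)
  buffer_lock = Monitor.new
  chunk_lock  = Monitor.new
  counter = 0

  workers = 2.times.map do
    Thread.new do
      iterations.times do
        with_chunk_then_buffer(chunk_lock, buffer_lock) { counter += 1 }
      end
    end
  end

  # Thread#join(timeout) returns nil when the thread is still running.
  finished = workers.map { |t| t.join(5) }.none?(&:nil?)
  [finished, counter]
end

finished, count = run_ordered_workers(1_000)
puts "finished=#{finished} count=#{count}"
```

Since neither thread ever holds the buffer lock while waiting for the chunk lock, the circular wait described in the steps above cannot form.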