From e84589761b8c2abf7fd507fda5951988e166758a Mon Sep 17 00:00:00 2001 From: abicky Date: Tue, 17 Oct 2017 07:12:17 +0900 Subject: [PATCH 1/4] Refactoring: Remove unnecessary conditions All chunk classes which inherit Chunk have enqueued! method. --- lib/fluent/plugin/buffer.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index be66f3e87e..c2fbd7814f 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -323,7 +323,7 @@ def enqueue_chunk(metadata) else @queue << chunk @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1 - chunk.enqueued! if chunk.respond_to?(:enqueued!) + chunk.enqueued! end end bytesize = chunk.bytesize @@ -340,7 +340,7 @@ def enqueue_unstaged_chunk(chunk) metadata = chunk.metadata @queue << chunk @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1 - chunk.enqueued! if chunk.respond_to?(:enqueued!) + chunk.enqueued! end @queue_size += chunk.bytesize end From ed9f92188f5afca7cf8386da7a82d0e9841bbdd8 Mon Sep 17 00:00:00 2001 From: abicky Date: Fri, 20 Oct 2017 10:48:49 +0900 Subject: [PATCH 2/4] Always lock chunks first to avoid deadlock This commit resolves https://github.com/fluent/fluentd/issues/1549. For example, deadlock occurs by the following steps before this commit: 1. input plugin thread receives multiple events (metadata_and_data.size > 1) 2. input plugin thread processes the first metadata_and_data element and aquires the lock of chunk1 (chunk.mon_exit in Buffer#write) 3. enqueue thread aquires the lock of buffer (synchronize in Buffer#enqueue_chunk) 4. enqueue thread tries to aquire the lock of buffer (chunk.synchronize in Buffer#enqueue_chunk) 5. input plugin thread processes the second metadata_and_data element and tries to aquire the lock of buffer (synchronize in Buffer#write_once) --- lib/fluent/plugin/buffer.rb | 43 ++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index c2fbd7814f..f62e92a713 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -313,20 +313,24 @@ def queued?(metadata=nil) def enqueue_chunk(metadata) log.trace "enqueueing chunk", instance: self.object_id, metadata: metadata - synchronize do - chunk = @stage.delete(metadata) - return nil unless chunk + chunk = synchronize do + @stage.delete(metadata) + end + return nil unless chunk - chunk.synchronize do - if chunk.empty? - chunk.close - else + chunk.synchronize do + if chunk.empty? + chunk.close + else + synchronize do @queue << chunk @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1 - chunk.enqueued! end + chunk.enqueued! end - bytesize = chunk.bytesize + end + bytesize = chunk.bytesize + synchronize do @stage_size -= bytesize @queue_size += bytesize end @@ -348,17 +352,16 @@ def enqueue_unstaged_chunk(chunk) def enqueue_all log.trace "enqueueing all chunks in buffer", instance: self.object_id - synchronize do - if block_given? - @stage.keys.each do |metadata| - chunk = @stage[metadata] - v = yield metadata, chunk - enqueue_chunk(metadata) if v - end - else - @stage.keys.each do |metadata| - enqueue_chunk(metadata) - end + if block_given? + synchronize{ @stage.keys }.each do |metadata| + chunk = @stage[metadata] + next unless chunk + v = yield metadata, chunk + enqueue_chunk(metadata) if v + end + else + synchronize{ @stage.keys }.each do |metadata| + enqueue_chunk(metadata) end end end From 1a6f16b4c2c06060c65837c1167d055ec93bded5 Mon Sep 17 00:00:00 2001 From: abicky Date: Fri, 20 Oct 2017 18:34:42 +0900 Subject: [PATCH 3/4] Process chunks with buffer lock in enqueue_chunk In flush threads @buffer could call destructive methods of chunks in @queue without chunk lock, so buffer lock is required. --- lib/fluent/plugin/buffer.rb | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index f62e92a713..337b3b79c8 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -319,21 +319,19 @@ def enqueue_chunk(metadata) return nil unless chunk chunk.synchronize do - if chunk.empty? - chunk.close - else - synchronize do + synchronize do + if chunk.empty? + chunk.close + else @queue << chunk @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1 + chunk.enqueued! end - chunk.enqueued! + bytesize = chunk.bytesize + @stage_size -= bytesize + @queue_size += bytesize end end - bytesize = chunk.bytesize - synchronize do - @stage_size -= bytesize - @queue_size += bytesize - end nil end From 2261428eab806a061e6e39824f6da125100386f1 Mon Sep 17 00:00:00 2001 From: abicky Date: Mon, 23 Oct 2017 15:54:36 +0900 Subject: [PATCH 4/4] [ci skip] Add a comment --- lib/fluent/plugin/buffer.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 337b3b79c8..954d07c7c2 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -352,6 +352,8 @@ def enqueue_all log.trace "enqueueing all chunks in buffer", instance: self.object_id if block_given? synchronize{ @stage.keys }.each do |metadata| + # NOTE: The following line might cause data race depending on Ruby implementations except CRuby + # cf. https://github.com/fluent/fluentd/pull/1721#discussion_r146170251 chunk = @stage[metadata] next unless chunk v = yield metadata, chunk