diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index be66f3e87e..954d07c7c2 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -313,22 +313,24 @@ def queued?(metadata=nil) def enqueue_chunk(metadata) log.trace "enqueueing chunk", instance: self.object_id, metadata: metadata - synchronize do - chunk = @stage.delete(metadata) - return nil unless chunk + chunk = synchronize do + @stage.delete(metadata) + end + return nil unless chunk - chunk.synchronize do + chunk.synchronize do + synchronize do if chunk.empty? chunk.close else @queue << chunk @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1 - chunk.enqueued! if chunk.respond_to?(:enqueued!) + chunk.enqueued! end + bytesize = chunk.bytesize + @stage_size -= bytesize + @queue_size += bytesize end - bytesize = chunk.bytesize - @stage_size -= bytesize - @queue_size += bytesize end nil end @@ -340,7 +342,7 @@ def enqueue_unstaged_chunk(chunk) metadata = chunk.metadata @queue << chunk @queued_num[metadata] = @queued_num.fetch(metadata, 0) + 1 - chunk.enqueued! if chunk.respond_to?(:enqueued!) + chunk.enqueued! end @queue_size += chunk.bytesize end @@ -348,17 +350,18 @@ def enqueue_unstaged_chunk(chunk) def enqueue_all log.trace "enqueueing all chunks in buffer", instance: self.object_id - synchronize do - if block_given? - @stage.keys.each do |metadata| - chunk = @stage[metadata] - v = yield metadata, chunk - enqueue_chunk(metadata) if v - end - else - @stage.keys.each do |metadata| - enqueue_chunk(metadata) - end + if block_given? + synchronize{ @stage.keys }.each do |metadata| + # NOTE: The following line might cause data race depending on Ruby implementations except CRuby + # cf. https://github.com/fluent/fluentd/pull/1721#discussion_r146170251 + chunk = @stage[metadata] + next unless chunk + v = yield metadata, chunk + enqueue_chunk(metadata) if v + end + else + synchronize{ @stage.keys }.each do |metadata| + enqueue_chunk(metadata) end end end