From f4a4591af8ac12344438621a72adf6399a1d3112 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 26 Mar 2024 18:08:43 +0900 Subject: [PATCH] buffer: fix emit error of race condition After 95438b2eeb4bb5586a4031c1fbc35756f3c12565 (#4342), there is a section where chunks do not have a lock in `write_step_by_step()`. `write_step_by_step()` must ensure their locks until passing them to the block. Otherwise, race condition can occur and it can cause emit error by IOError. Example of warning messages of emit error: [warn]: #0 emit transaction failed: error_class=IOError error="closed stream" location=... [warn]: #0 send an error event stream to @ERROR: error_class=IOError error="closed stream" location=... Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/buffer.rb | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 0251de3409..ce88c15ab2 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -764,8 +764,10 @@ def write_step_by_step(metadata, data, format, splits_count, &block) while writing_splits_index < splits.size chunk = get_next_chunk.call errors = [] + # The chunk must be locked until being passed to &block. + chunk.mon_enter modified_chunks << {chunk: chunk, adding_bytesize: 0, errors: errors} - chunk.synchronize do + raise ShouldRetry unless chunk.writable? staged_chunk_used = true if chunk.staged? @@ -851,7 +853,6 @@ def write_step_by_step(metadata, data, format, splits_count, &block) end modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize - end end modified_chunks.each do |data| block.call(data[:chunk], data[:adding_bytesize], data[:errors]) @@ -863,9 +864,15 @@ def write_step_by_step(metadata, data, format, splits_count, &block) if chunk.unstaged? chunk.purge rescue nil end + chunk.mon_exit rescue nil end enqueue_chunk(metadata) if enqueue_chunk_before_retry retry + ensure + modified_chunks.each do |data| + chunk = data[:chunk] + chunk.mon_exit + end end STATS_KEYS = [