Skip to content

Commit

Permalink
buffer: fix emit error of race condition
Browse files Browse the repository at this point in the history
After 95438b2 (#4342), there is a
section where chunks do not have a lock in `write_step_by_step()`.

`write_step_by_step()` must ensure their locks until passing them
to the block.
Otherwise, race condition can occur and it can cause emit error
by IOError.
Example of warning messages of emit error:

    [warn]: #0 emit transaction failed: error_class=IOError error="closed stream" location=...
    [warn]: #0 send an error event stream to @error: error_class=IOError error="closed stream" location=...

Signed-off-by: Daijiro Fukuda <[email protected]>
  • Loading branch information
daipom committed Mar 26, 2024
1 parent eac830f commit f4a4591
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -764,8 +764,10 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
while writing_splits_index < splits.size
chunk = get_next_chunk.call
errors = []
# The chunk must be locked until being passed to &block.
chunk.mon_enter
modified_chunks << {chunk: chunk, adding_bytesize: 0, errors: errors}
chunk.synchronize do

raise ShouldRetry unless chunk.writable?
staged_chunk_used = true if chunk.staged?

Expand Down Expand Up @@ -851,7 +853,6 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
end

modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize
end
end
modified_chunks.each do |data|
block.call(data[:chunk], data[:adding_bytesize], data[:errors])
Expand All @@ -863,9 +864,15 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
if chunk.unstaged?
chunk.purge rescue nil
end
chunk.mon_exit rescue nil
end
enqueue_chunk(metadata) if enqueue_chunk_before_retry
retry
ensure
modified_chunks.each do |data|
chunk = data[:chunk]
chunk.mon_exit
end
end

STATS_KEYS = [
Expand Down

0 comments on commit f4a4591

Please sign in to comment.