buffer: Avoid processing discarded chunks in write_step_by_step
This fixes the following error, which occurred when many `chunk bytes limit exceeds` errors were raised:
```
2020-07-28 14:59:26 +0000 [warn]: #0 emit transaction failed: error_class=IOError error="closed stream" location="/fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `pos'" tag="cafiscode-eks-cluster.default"
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `pos'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer/file_chunk.rb:82:in `rollback'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer.rb:339:in `rescue in block in write'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer.rb:332:in `block in write'
  2020-07-28 14:59:26 +0000 [warn]: #0 /fluentd/vendor/bundle/ruby/2.6.0/gems/fluentd-1.11.1/lib/fluent/plugin/buffer.rb:331:in `each'
  ...
```
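The backtrace shows `FileChunk#rollback` calling `pos` on a chunk whose underlying file had already been closed (e.g. because the chunk was discarded earlier in the retry path). A minimal, fluentd-free sketch of why that raises exactly this `IOError`:

```ruby
# Illustration only (not fluentd code): calling IO#pos on a closed
# stream raises the same "closed stream" IOError seen in the log above.
require "tempfile"

file = Tempfile.new("chunk")
file.close   # analogous to a chunk being purged/discarded

begin
  file.pos   # FileChunk#rollback similarly reads the file position
rescue IOError => e
  puts e.message  # => "closed stream"
end
```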

Fix #3089

Signed-off-by: Takuro Ashie <[email protected]>
ashie committed Dec 12, 2023
1 parent 7e9eba7 commit 92d09bd
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions lib/fluent/plugin/buffer.rb
```diff
@@ -728,7 +728,6 @@ def write_once(metadata, data, format: nil, size: nil, &block)
 
     def write_step_by_step(metadata, data, format, splits_count, &block)
       splits = []
-      errors = []
       if splits_count > data.size
         splits_count = data.size
       end
@@ -757,15 +756,15 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
         else
           synchronize { @stage[modified_metadata] ||= generate_chunk(modified_metadata).staged! }
         end
-        modified_chunks << c
-        c
+        modified_chunks << {chunk: c, adding_bytesize: 0, errors: []}
+        return c, modified_chunks.last[:errors]
       }
 
       writing_splits_index = 0
       enqueue_chunk_before_retry = false
 
       while writing_splits_index < splits.size
-        chunk = get_next_chunk.call
+        chunk, errors = get_next_chunk.call
         chunk.synchronize do
           raise ShouldRetry unless chunk.writable?
           staged_chunk_used = true if chunk.staged?
@@ -851,15 +850,18 @@ def write_step_by_step(metadata, data, format, splits_count, &block)
             raise
           end
 
-          block.call(chunk, chunk.bytesize - original_bytesize, errors)
-          errors = []
+          modified_chunks.last[:adding_bytesize] = chunk.bytesize - original_bytesize
         end
       end
+      modified_chunks.each do |data|
+        block.call(data[:chunk], data[:adding_bytesize], data[:errors])
+      end
     rescue ShouldRetry
-      modified_chunks.each do |mc|
-        mc.rollback rescue nil
-        if mc.unstaged?
-          mc.purge rescue nil
+      modified_chunks.each do |data|
+        chunk = data[:chunk]
+        chunk.rollback rescue nil
+        if chunk.unstaged?
+          chunk.purge rescue nil
         end
       end
       enqueue_chunk(metadata) if enqueue_chunk_before_retry
```
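The reshaped flow can be sketched independently of fluentd (illustrative names, not fluentd's API): each modified chunk is recorded together with its byte delta and per-chunk errors during the write loop, and the caller's block runs only after the whole loop succeeds. On failure, cleanup iterates the recorded chunks instead, so the block is never invoked against a chunk that was later discarded.

```ruby
# Hedged sketch of the pattern this commit adopts, under the assumption
# that chunks respond to << and clear (here plain Strings stand in).
def write_all(splits, make_chunk)
  modified = []
  splits.each do |data|
    chunk = make_chunk.call
    modified << { chunk: chunk, adding_bytesize: 0, errors: [] }
    chunk << data
    modified.last[:adding_bytesize] = data.bytesize
  end
  # Deferred callbacks: run only after every split was written.
  modified.each { |m| yield m[:chunk], m[:adding_bytesize], m[:errors] }
rescue
  # Analogue of the rescue ShouldRetry branch: best-effort cleanup,
  # touching only the chunks this call actually modified.
  modified.each { |m| m[:chunk].clear rescue nil }
  raise
end

out = []
write_all(["ab", "cde"], -> { +"" }) { |c, n, errs| out << [c, n, errs] }
p out  # => [["ab", 2, []], ["cde", 3, []]]
```

The key design point mirrored from the diff is moving `block.call` out of the per-chunk loop: a retry can no longer hand the callback a chunk that a later iteration rolled back or purged.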
