Commit 5b23cc0

writing
tagomoris committed May 9, 2016
1 parent dacbfff commit 5b23cc0
Showing 2 changed files with 71 additions and 19 deletions.
88 changes: 69 additions & 19 deletions lib/fluent/plugin/buffer.rb
@@ -168,36 +168,85 @@ def metadata(timekey: nil, tag: nil, variables: nil)
 
       # metadata MUST have consistent object_id for each variation
       # data MUST be Array of serialized events
-      def emit(metadata, data, force: false)
-        return if data.size < 1
+      # metadata_and_data MUST be a hash of { metadata => data }
+      def emit(metadata_and_data, bulk: false)
+        return if metadata_and_data.size < 1
         raise BufferOverflowError unless storable?
 
         stored = false
+        staged_size = 0
+        operated_chunks = []
 
-        # the case whole data can be stored in staged chunk: almost all emits will success
-        chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata) }
-        original_bytesize = chunk.bytesize
-        chunk.synchronize do
-          begin
-            chunk.append(data)
-            if !chunk_size_over?(chunk) || force
-              chunk.commit
-              stored = true
-              @stage_size += (chunk.bytesize - original_bytesize)
-            else
-              chunk.rollback
-            end
-          rescue
-            chunk.rollback
-            raise
-          end
-        end
-        return if stored
-
-        # try step-by-step appending if data can't be stored into existing a chunk
-        emit_step_by_step(metadata, data)
+        begin
+          metadata_and_data.each do |metadata, data|
+            chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata) }
+
+            chunk.synchronize do
+              original_bytesize = chunk.bytesize
+              begin
+                chunk.append(data)
+                if chunk_size_over?(chunk)
+                  chunk.rollback
+                else
+                  stored = true
+                  staged_size += (chunk.bytesize - original_bytesize)
+                  chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
+                  operated_chunks << chunk
+                end
+              rescue
+                chunk.rollback
+                raise
+              end
+            end
+            next if stored
+
+            # try step-by-step appending if data can't be stored into existing a chunk
+            chunks, appended_bytesize = emit_step_by_step(metadata, data)
+            operated_chunks.concat(chunks)
+            staged_size += appended_bytesize
+          end
+
+          first_chunk = operated_chunks.shift
+          # Following commits for other chunks also can finish successfully if the first commit operation
+          # finishes without any exceptions.
+          # In most cases, #commit just requires very small disk spaces, so major failure reasons are
+          # permission errors, disk failures and other permanent (fatal) errors.
+          begin
+            first_chunk.commit
+            first_chunk.mon_exit
+          rescue
+            operated_chunks.unshift(first_chunk)
+            raise
+          end
+
+          errors = []
+          # Buffer plugin estimates there's no serious error cause: will commit for all chunks either way
+          operated_chunks.each do |chunk|
+            begin
+              chunk.commit
+              chunk.mon_exit
+            rescue => e
+              chunk.rollback
+              chunk.mon_exit
+              errors << e
+            end
+          end
+
+          @stage_size += staged_size
+
+          if errors.size > 0
+            log.warn "error occurs in committing chunks: only first one raised", errors: errors.map(&:class)
+            raise errors.first
+          end
+        rescue
+          operated_chunks.each do |chunk|
+            chunk.rollback rescue nil # nothing possible to do for #rollback failure
+            chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
+          end
+          raise
+        end
       end
 
+      # TODO: merge into #emit w/ metadata_and_data support
       def emit_bulk(metadata, bulk, size)
         return if bulk.nil? || bulk.empty?
         raise BufferOverflowError unless storable?
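
For context, a minimal usage sketch of the reworked #emit signature shown in the hunk above, assuming `buffer` is an instance of a concrete Buffer subclass and that the metadata objects come from Buffer#metadata (whose signature appears in the hunk header); the tags and event payloads below are illustrative only, not part of this commit:

    # metadata_and_data maps each metadata object to an Array of serialized events
    md_access = buffer.metadata(tag: 'app.access')
    md_error  = buffer.metadata(tag: 'app.error')

    metadata_and_data = {
      md_access => ['<serialized event 1>', '<serialized event 2>'],
      md_error  => ['<serialized event 3>'],
    }

    # Appends are staged chunk by chunk; commits happen only after every pair has been
    # appended successfully, otherwise all operated chunks are rolled back and the
    # exception is re-raised to the caller.
    buffer.emit(metadata_and_data)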
@@ -361,6 +410,7 @@ def chunk_size_full?(chunk)
         chunk.bytesize >= @chunk_bytes_limit || (@chunk_records_limit && chunk.size >= @chunk_records_limit)
       end
 
+      # TODO: add metadata_and_data support
       def emit_step_by_step(metadata, data)
         attempt_records = data.size / 3
 
2 changes: 2 additions & 0 deletions lib/fluent/plugin/output.rb
@@ -64,6 +64,8 @@ class Output < Base
 
         config_param :delayed_commit_timeout, :time, default: 60, desc: 'Seconds of timeout for buffer chunks to be committed by plugins later.'
 
+        config_param :overflow_action, :enum, list: [:exception, :block], default: :exception, desc: 'The action when the size of buffer exceeds the limit.'
+
         config_param :retry_forever, :bool, default: false, desc: 'If true, plugin will ignore retry_timeout and retry_max_times options and retry flushing forever.'
         config_param :retry_timeout, :time, default: 72 * 60 * 60, desc: 'The maximum seconds to retry to flush while failing, until plugin discards buffer chunks.'
         # 72hours == 17 times with exponential backoff (not to change default behavior)
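
This hunk only declares the new overflow_action parameter; no handling code is part of the diff. As a rough sketch (an assumption for illustration, not the plugin's actual implementation), an output plugin could consult it when the buffer rejects data:

    # Hypothetical helper, names and structure are assumptions.
    # config_param :overflow_action defines the @overflow_action reader used below.
    def handle_emit_with_overflow_action(metadata_and_data)
      @buffer.emit(metadata_and_data)
    rescue Fluent::Plugin::Buffer::BufferOverflowError
      case @overflow_action
      when :exception
        raise       # propagate the overflow to the caller
      when :block
        sleep 1     # wait for flush threads to free buffer space, then try again
        retry
      end
    end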
