Skip to content

Commit

Permalink
Add an optional argument to pass the size of the event stream (`es`) — mainly for the compat layer
Browse files (browse the repository at this point in the history)
  • Loading branch information
tagomoris committed Jul 4, 2016
1 parent 5e051e2 commit 14c7719
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
9 changes: 4 additions & 5 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def handle_stream_simple(tag, es, enqueue: false)
# on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically
meta = @buffer.metadata(variables: (key && !key.empty? ? {key: key} : nil))
write_guard do
@buffer.write({meta => [data, size]}, bulk: true, enqueue: enqueue)
@buffer.write({meta => data}, format: ->(data){ data }, size: ->(){ size }, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += size }
return [meta]
Expand All @@ -331,17 +331,16 @@ def handle_stream_simple(tag, es, enqueue: false)
size = es.size
bulk = format_stream(tag, es)
write_guard do
@buffer.write({meta => [bulk, size]}, bulk: true, enqueue: enqueue)
@buffer.write({meta => bulk}, format: ->(data){ data }, size: ->(){ size }, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += size }
return [meta]
end

meta = metadata(nil, nil, nil)
es_size = 0
es_bulk = es.map{|time,record| es_size += 1; format(tag, time, record) }.join
data = es.map{|time,record| format(tag, time, record) }
write_guard do
@buffer.write({meta => [es_bulk, es_size]}, bulk: true, enqueue: enqueue)
@buffer.write({meta => data}, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += es_size }
[meta]
Expand Down
8 changes: 4 additions & 4 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def metadata(timekey: nil, tag: nil, variables: nil)
# metadata MUST have consistent object_id for each variation
# data MUST be Array of serialized events, or EventStream
# metadata_and_data MUST be a hash of { metadata => data }
def write(metadata_and_data, format: nil, enqueue: false)
def write(metadata_and_data, format: nil, size: nil, enqueue: false)
return if metadata_and_data.size < 1
raise BufferOverflowError, "buffer space has too many data" unless storable?

Expand All @@ -185,7 +185,7 @@ def write(metadata_and_data, format: nil, enqueue: false)

begin
metadata_and_data.each do |metadata, data|
write_once(metadata, data, format: format) do |chunk, adding_bytesize|
write_once(metadata, data, format: format, size: size) do |chunk, adding_bytesize|
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
if chunk.unstaged?
Expand Down Expand Up @@ -416,7 +416,7 @@ class ShouldRetry < StandardError; end
# 3. enqueue existing chunk & retry whole method if chunk was not empty
# 4. go to step_by_step writing

def write_once(metadata, data, format: nil, &block)
def write_once(metadata, data, format: nil, size: nil, &block)
return if data.empty?

stored = false
Expand All @@ -434,7 +434,7 @@ def write_once(metadata, data, format: nil, &block)
begin
if format
serialized = format.call(data)
chunk.concat(serialized, data.size)
chunk.concat(serialized, size ? size.call : data.size)
else
chunk.append(data)
end
Expand Down

0 comments on commit 14c7719

Please sign in to comment.