Merge pull request #1062 from fluent/split-events-emitted-at-once-to-multi-chunks

Split events emitted at once to multi chunks
tagomoris authored Jul 20, 2016
2 parents d9cb550 + fe3a860 commit 19b6e05
Showing 14 changed files with 776 additions and 206 deletions.
lib/fluent/compat/output.rb (9 changes: 4 additions & 5 deletions)
```diff
@@ -314,7 +314,7 @@ def handle_stream_simple(tag, es, enqueue: false)
       # on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically
       meta = @buffer.metadata(variables: (key && !key.empty? ? {key: key} : nil))
       write_guard do
-        @buffer.write({meta => [data, size]}, bulk: true, enqueue: enqueue)
+        @buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
       end
       @counters_monitor.synchronize{ @emit_records += size }
       return [meta]
@@ -325,17 +325,16 @@ def handle_stream_simple(tag, es, enqueue: false)
       size = es.size
       bulk = format_stream(tag, es)
       write_guard do
-        @buffer.write({meta => [bulk, size]}, bulk: true, enqueue: enqueue)
+        @buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
       end
       @counters_monitor.synchronize{ @emit_records += size }
       return [meta]
     end

     meta = metadata(nil, nil, nil)
-    es_size = 0
-    es_bulk = es.map{|time,record| es_size += 1; format(tag, time, record) }.join
+    data = es.map{|time,record| format(tag, time, record) }
     write_guard do
-      @buffer.write({meta => [es_bulk, es_size]}, bulk: true, enqueue: enqueue)
+      @buffer.write({meta => data}, enqueue: enqueue)
     end
     @counters_monitor.synchronize{ @emit_records += es_size }
     [meta]
```
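The compat layer no longer hands the buffer a pre-counted `[data, size]` pair with `bulk: true`; instead it passes either a plain array of formatted records, or the serialized payload together with `format:` and `size:` callables. The following is a minimal sketch of that write contract under stated assumptions: `ToyBuffer` is a hypothetical stand-in that only mirrors the argument shapes at the call sites above, not fluentd's real buffer class.

```ruby
# Hypothetical toy illustrating the two payload shapes used by the new call
# sites: pre-serialized bulk data with identity format/size lambdas, and an
# array of per-record serialized strings that the buffer counts itself.
class ToyBuffer
  attr_reader :chunks, :sizes

  def initialize
    @chunks = Hash.new { |h, k| h[k] = +"" } # metadata => appended serialized bytes
    @sizes  = Hash.new(0)                    # metadata => record count
  end

  # metadata_and_data is {metadata => data}; enqueue is accepted but ignored here.
  def write(metadata_and_data, format: nil, size: nil, enqueue: false)
    metadata_and_data.each do |meta, data|
      if format
        # data is already serialized; the identity format lambda passes it through
        @chunks[meta] << format.call(data)
        @sizes[meta] += size.call
      else
        # data is an array of per-record serialized strings; count them directly
        data.each { |serialized| @chunks[meta] << serialized }
        @sizes[meta] += data.size
      end
    end
  end
end

buffer = ToyBuffer.new
meta = { tag: "app" }

# Pre-formatted bulk payload: identity format plus an explicit size lambda.
buffer.write({ meta => "line1\nline2\n" }, format: ->(_data) { _data }, size: -> { 2 })

# Array-of-records payload: the buffer counts the records itself.
buffer.write({ meta => ["line3\n"] })

p buffer.sizes[meta] # => 3
```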
lib/fluent/event.rb (68 changes: 61 additions & 7 deletions)
```diff
@@ -44,6 +44,10 @@ def repeatable?
       false
     end

+    def slice(index, num)
+      raise NotImplementedError, "DO NOT USE THIS CLASS directly."
+    end
+
     def each(&block)
       raise NotImplementedError, "DO NOT USE THIS CLASS directly."
     end
```
```diff
@@ -124,6 +128,10 @@ def empty?
       @entries.empty?
     end

+    def slice(index, num)
+      ArrayEventStream.new(@entries.slice(index, num))
+    end
+
     def each(&block)
       @entries.each(&block)
       nil
```
```diff
@@ -167,6 +175,10 @@ def empty?
       @time_array.empty?
     end

+    def slice(index, num)
+      MultiEventStream.new(@time_array.slice(index, num), @record_array.slice(index, num))
+    end
+
     def each(&block)
       time_array = @time_array
       record_array = @record_array
```
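With `slice(index, num)` implemented uniformly across the stream classes, a writer can split one emitted stream into bounded chunks with a single generic loop. A short sketch of that usage follows; `ToyStream` and `each_chunk` are hypothetical illustrations of the size/slice contract, not code from this commit.

```ruby
# Minimal stand-in following the same size/slice/each contract as fluentd's
# ArrayEventStream above.
class ToyStream
  def initialize(entries) # entries: array of [time, record] pairs
    @entries = entries
  end

  def size
    @entries.size
  end

  def slice(index, num)
    ToyStream.new(@entries.slice(index, num))
  end

  def each(&block)
    @entries.each { |time, record| block.call(time, record) }
  end
end

# Yield successive sub-streams of at most max_records events each.
def each_chunk(es, max_records)
  index = 0
  while index < es.size
    yield es.slice(index, max_records)
    index += max_records
  end
end

events = (1..5).map { |i| [Time.now.to_i, { "seq" => i }] }
each_chunk(ToyStream.new(events), 2) do |chunk|
  p chunk.size # => 2, then 2, then 1
end
```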
```diff
@@ -178,32 +190,74 @@ def each(&block)
     end
   end

   class MessagePackEventStream < EventStream
-    # Keep cached_unpacker argument for existence plugins
-    def initialize(data, cached_unpacker = nil, size = 0)
+    # https://github.com/msgpack/msgpack-ruby/issues/119
+
+    # Keep cached_unpacker argument for existing plugins
+    def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
       @data = data
       @size = size
+      @unpacked_times = unpacked_times
+      @unpacked_records = unpacked_records
     end

     def empty?
-      # This is not correct, but actual number of records will be shown after iteration, and
-      # "size" argument is always 0 currently (because forward protocol doesn't tell it to destination)
-      false
+      @data.empty?
     end

     def dup
-      MessagePackEventStream.new(@data.dup, @size)
+      if @unpacked_times
+        MessagePackEventStream.new(@data.dup, nil, @size, unpacked_times: @unpacked_times, unpacked_records: @unpacked_records.map(&:dup))
+      else
+        MessagePackEventStream.new(@data.dup, nil, @size)
+      end
+    end
+
+    def size
+      # @size is not trustworthy when @size == 0.
+      # If the number of events is really zero, unpacking takes very little time.
+      ensure_unpacked! if @size == 0
+      @size
     end

     def repeatable?
       true
     end

+    def ensure_unpacked!
+      return if @unpacked_times && @unpacked_records
+      @unpacked_times = []
+      @unpacked_records = []
+      msgpack_unpacker.feed_each(@data) do |time, record|
+        @unpacked_times << time
+        @unpacked_records << record
+      end
+      # @size should always be updated right after unpacking:
+      # the real size of the unpacked objects is correct, rather than the given size.
+      @size = @unpacked_times.size
+    end
+
+    # This method returns MultiEventStream, because there is no reason
+    # to serve binary serialized by msgpack.
+    def slice(index, num)
+      ensure_unpacked!
+      MultiEventStream.new(@unpacked_times.slice(index, num), @unpacked_records.slice(index, num))
+    end
+
     def each(&block)
-      msgpack_unpacker.feed_each(@data, &block)
+      if @unpacked_times
+        @unpacked_times.each_with_index do |time, i|
+          block.call(time, @unpacked_records[i])
+        end
+      else
+        @unpacked_times = []
+        @unpacked_records = []
+        msgpack_unpacker.feed_each(@data) do |time, record|
+          @unpacked_times << time
+          @unpacked_records << record
+          block.call(time, record)
+        end
+        @size = @unpacked_times.size
+      end
       nil
     end
```
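The `MessagePackEventStream` changes follow one pattern: decode lazily on the first traversal, memoize the decoded times and records, and serve later `size`, `slice`, and `each` calls from that cache so the payload is never parsed twice. The sketch below distills that shape using JSON lines instead of msgpack so it runs without the msgpack gem; `DecodedOnceStream` is a hypothetical illustration, not fluentd code.

```ruby
require "json"

# Decode-once stream: parse lazily, cache the parsed records, and answer
# every later call from the cache (mirroring ensure_unpacked! above).
class DecodedOnceStream
  def initialize(lines) # lines: newline-delimited JSON records
    @data = lines
    @decoded = nil
  end

  def ensure_decoded!
    return if @decoded
    @decoded = @data.each_line.map { |line| JSON.parse(line) }
  end

  def size
    # In the diff, size is only trusted after unpacking when @size == 0;
    # here we simply always decode before counting.
    ensure_decoded!
    @decoded.size
  end

  def slice(index, num)
    ensure_decoded!
    @decoded.slice(index, num)
  end

  def each(&block)
    ensure_decoded!
    @decoded.each(&block)
  end
end

s = DecodedOnceStream.new(%({"a":1}\n{"a":2}\n))
p s.size            # decodes once here...
s.each { |r| p r }  # ...and reuses the cache here
```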

(Diffs for the remaining 12 changed files are not shown here.)