Split events emitted at once to multi chunks #1062

Merged
merged 16 commits into from Jul 20, 2016
9 changes: 4 additions & 5 deletions lib/fluent/compat/output.rb
@@ -314,7 +314,7 @@ def handle_stream_simple(tag, es, enqueue: false)
# on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically
meta = @buffer.metadata(variables: (key && !key.empty? ? {key: key} : nil))
write_guard do
@buffer.write({meta => [data, size]}, bulk: true, enqueue: enqueue)
@buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += size }
return [meta]
@@ -325,17 +325,16 @@ def handle_stream_simple(tag, es, enqueue: false)
size = es.size
bulk = format_stream(tag, es)
write_guard do
@buffer.write({meta => [bulk, size]}, bulk: true, enqueue: enqueue)
@buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += size }
return [meta]
end

meta = metadata(nil, nil, nil)
es_size = 0
es_bulk = es.map{|time,record| es_size += 1; format(tag, time, record) }.join
data = es.map{|time,record| format(tag, time, record) }
write_guard do
@buffer.write({meta => [es_bulk, es_size]}, bulk: true, enqueue: enqueue)
@buffer.write({meta => data}, enqueue: enqueue)
end
@counters_monitor.synchronize{ @emit_records += es_size }
[meta]
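The diff above replaces the old `bulk: true` flag with `format:` and `size:` lambdas on `@buffer.write`. A minimal sketch of that call shape, using a hypothetical `SketchBuffer` stand-in (the real fluentd buffer API is much richer; names here are illustrative only):

```ruby
# Sketch of the new write call shape: per-metadata payloads, an optional
# format lambda (identity when the payload is already formatted), and a
# size lambda reporting the record count.
class SketchBuffer
  def initialize
    @chunks = Hash.new { |h, k| h[k] = "" }
    @sizes = Hash.new(0)
  end

  def write(metadata_and_data, format: nil, size: nil, enqueue: false)
    metadata_and_data.each do |meta, data|
      @chunks[meta] << (format ? format.call(data) : data.to_s)
      @sizes[meta] += size ? size.call : 1
    end
  end

  attr_reader :chunks, :sizes
end

buffer = SketchBuffer.new
bulk = "line1\nline2\n"               # pre-formatted payload of 2 records
buffer.write({"app.access" => bulk},
             format: ->(_data) { _data },
             size: -> { 2 })
p buffer.sizes["app.access"]          # => 2
```

Passing lambdas lets the buffer defer (or skip) formatting and counting, instead of forcing the caller to precompute a `[bulk, size]` pair.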
68 changes: 61 additions & 7 deletions lib/fluent/event.rb
@@ -44,6 +44,10 @@ def repeatable?
false
end

def slice(index, num)
raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end

def each(&block)
raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end
@@ -124,6 +128,10 @@ def empty?
@entries.empty?
end

def slice(index, num)
ArrayEventStream.new(@entries.slice(index, num))
end
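`slice(index, num)` is the primitive that lets a too-large event stream be split into multiple chunks. A self-contained sketch of the behavior, using a stripped-down `MiniArrayEventStream` stand-in for the `ArrayEventStream` above (entries are `[time, record]` pairs; the name is illustrative):

```ruby
class MiniArrayEventStream
  def initialize(entries)
    @entries = entries   # array of [time, record] pairs
  end

  def size
    @entries.size
  end

  # Returns a new stream covering `num` events starting at `index`,
  # mirroring the ArrayEventStream#slice added in the diff above.
  def slice(index, num)
    MiniArrayEventStream.new(@entries.slice(index, num))
  end

  def each(&block)
    @entries.each { |time, record| block.call(time, record) }
    nil
  end
end

es = MiniArrayEventStream.new([[1, {"k" => "a"}], [2, {"k" => "b"}], [3, {"k" => "c"}]])
chunk = es.slice(1, 2)   # second and third events
p chunk.size             # => 2
```

Because `slice` returns another event stream rather than a raw array, callers can retry or re-buffer a sub-range with the same interface as the whole stream.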

def each(&block)
@entries.each(&block)
nil
@@ -167,6 +175,10 @@ def empty?
@time_array.empty?
end

def slice(index, num)
MultiEventStream.new(@time_array.slice(index, num), @record_array.slice(index, num))
end

def each(&block)
time_array = @time_array
record_array = @record_array
@@ -178,32 +190,74 @@ def each(&block)
end

class MessagePackEventStream < EventStream
# Keep cached_unpacker argument for existence plugins
def initialize(data, cached_unpacker = nil, size = 0)
# https://github.com/msgpack/msgpack-ruby/issues/119

# Keep cached_unpacker argument for existing plugins
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
@data = data
@size = size
@unpacked_times = unpacked_times
@unpacked_records = unpacked_records
end

def empty?
# This is not correct, but actual number of records will be shown after iteration, and
# "size" argument is always 0 currently (because forward protocol doesn't tell it to destination)
false
@data.empty?
end

def dup
MessagePackEventStream.new(@data.dup, @size)
if @unpacked_times
MessagePackEventStream.new(@data.dup, nil, @size, unpacked_times: @unpacked_times, unpacked_records: @unpacked_records.map(&:dup))
else
MessagePackEventStream.new(@data.dup, nil, @size)
end
end

def size
# @size is unreliable when @size == 0
# If the number of events is really zero, unpacking them takes very little time.
ensure_unpacked! if @size == 0
@size
end

def repeatable?
true
end

def ensure_unpacked!
return if @unpacked_times && @unpacked_records
@unpacked_times = []
@unpacked_records = []
msgpack_unpacker.feed_each(@data) do |time, record|
@unpacked_times << time
@unpacked_records << record
end
# @size should always be updated right after unpacking.
# The actual number of unpacked objects is correct, rather than the given size.
@size = @unpacked_times.size
Member: After unpacked, @data should be set to nil for GC?

@repeatedly (Member), Jul 7, 2016: Hmm... @data is used in empty?.

Member Author: @data should be kept as is, because #to_msgpack_stream returns @data itself.
end

# This method returns MultiEventStream, because there is no reason
# to preserve the binary serialized by msgpack.
def slice(index, num)
ensure_unpacked!
MultiEventStream.new(@unpacked_times.slice(index, num), @unpacked_records.slice(index, num))
end

def each(&block)
msgpack_unpacker.feed_each(@data, &block)
if @unpacked_times
@unpacked_times.each_with_index do |time, i|
block.call(time, @unpacked_records[i])
end
else
@unpacked_times = []
@unpacked_records = []
msgpack_unpacker.feed_each(@data) do |time, record|
@unpacked_times << time
@unpacked_records << record
block.call(time, record)
end
@size = @unpacked_times.size
end
nil
end
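The `ensure_unpacked!`/`each` pair above implements a decode-once cache: the serialized payload is unpacked at most one time, after which `size`, `slice`, and `each` are served from the cached arrays, and `@size` is corrected from the decoded count. A sketch of that pattern under stated assumptions: `LazyStream` is a hypothetical stand-in, and JSON stands in for msgpack so the sketch needs no gems (the real class feeds `@data` through `msgpack_unpacker`):

```ruby
require "json"   # stands in for msgpack serialization in this sketch

class LazyStream
  def initialize(data, size = 0)
    @data = data       # serialized array of [time, record] pairs
    @size = size       # advertised size; often 0 and thus untrustworthy
    @times = nil
    @records = nil
  end

  # Decode @data at most once; later calls reuse the cached arrays.
  def ensure_unpacked!
    return if @times && @records
    @times = []
    @records = []
    JSON.parse(@data).each do |time, record|
      @times << time
      @records << record
    end
    @size = @times.size  # trust the decoded count over the given one
  end

  def size
    ensure_unpacked! if @size == 0
    @size
  end

  def slice(index, num)
    ensure_unpacked!
    @times.slice(index, num).zip(@records.slice(index, num))
  end
end

payload = JSON.generate([[10, {"a" => 1}], [20, {"a" => 2}], [30, {"a" => 3}]])
s = LazyStream.new(payload)
p s.size          # => 3
p s.slice(1, 2)   # => [[20, {"a"=>2}], [30, {"a"=>3}]]
```

Keeping `@data` alongside the caches (rather than nilling it, as the review thread above discusses) means re-serialization can return the original bytes untouched.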
