Skip to content

Commit

Permalink
Use local thread unpacker not to create msgpacker object duplicatedly
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <[email protected]>
  • Loading branch information
ganmacs committed Aug 16, 2019
1 parent 9aca889 commit f4186cf
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 16 deletions.
24 changes: 12 additions & 12 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def slice(index, num)
raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end

def each(&block)
def each(unapcker: nil, &block)
raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end

Expand Down Expand Up @@ -107,7 +107,7 @@ def slice(index, num)
end
end

def each(&block)
def each(unpacker: nil, &block)
block.call(@time, @record)
nil
end
Expand Down Expand Up @@ -143,7 +143,7 @@ def slice(index, num)
ArrayEventStream.new(@entries.slice(index, num))
end

def each(&block)
def each(unpacker: nil, &block)
@entries.each(&block)
nil
end
Expand Down Expand Up @@ -190,7 +190,7 @@ def slice(index, num)
MultiEventStream.new(@time_array.slice(index, num), @record_array.slice(index, num))
end

def each(&block)
def each(unpacker: nil, &block)
time_array = @time_array
record_array = @record_array
for i in 0..time_array.length-1
Expand Down Expand Up @@ -234,11 +234,11 @@ def repeatable?
true
end

def ensure_unpacked!
def ensure_unpacked!(unpacker: nil)
return if @unpacked_times && @unpacked_records
@unpacked_times = []
@unpacked_records = []
msgpack_unpacker.feed_each(@data) do |time, record|
(unpacker || msgpack_unpacker).feed_each(@data) do |time, record|
@unpacked_times << time
@unpacked_records << record
end
Expand All @@ -254,15 +254,15 @@ def slice(index, num)
MultiEventStream.new(@unpacked_times.slice(index, num), @unpacked_records.slice(index, num))
end

def each(&block)
def each(unpacker: nil, &block)
if @unpacked_times
@unpacked_times.each_with_index do |time, i|
block.call(time, @unpacked_records[i])
end
else
@unpacked_times = []
@unpacked_records = []
msgpack_unpacker.feed_each(@data) do |time, record|
(unpacker || msgpack_unpacker).feed_each(@data) do |time, record|
@unpacked_times << time
@unpacked_records << record
block.call(time, record)
Expand Down Expand Up @@ -290,12 +290,12 @@ def empty?
super
end

def ensure_unpacked!
def ensure_unpacked!(unpacker: nil)
ensure_decompressed!
super
end

def each(&block)
def each(unpacker: nil, &block)
ensure_decompressed!
super
end
Expand All @@ -322,9 +322,9 @@ module ChunkMessagePackEventStreamer
include MessagePackFactory::Mixin
# chunk.extend(ChunkEventStreamer)
# => chunk.each{|time, record| ... }
def each(&block)
def each(unpacker: nil, &block)
open do |io|
msgpack_unpacker(io).each(&block)
(unpacker || msgpack_unpacker(io)).each(&block)
end
nil
end
Expand Down
4 changes: 3 additions & 1 deletion lib/fluent/event_router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
require 'fluent/match'
require 'fluent/event'
require 'fluent/filter'
require 'fluent/msgpack_factory'

module Fluent
#
Expand Down Expand Up @@ -143,6 +144,7 @@ def initialize
@filters = []
@output = nil
@optimizer = FilterOptimizer.new
@unpacker = Fluent::MessagePackFactory.unpacker
end

def add_filter(filter)
Expand Down Expand Up @@ -182,7 +184,7 @@ def filter_stream(tag, es)

def optimized_filter_stream(tag, es)
new_es = MultiEventStream.new
es.each do |time, record|
es.each(unpacker: @unpacker) do |time, record|
filtered_record = record
filtered_time = time

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/msgpack_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,9 @@ def self.init
def self.thread_local_msgpack_packer
Thread.current[:local_msgpack_packer] ||= MessagePackFactory.engine_factory.packer
end

def self.thread_local_msgpack_unpacker
Thread.current[:local_msgpack_unpacker] ||= MessagePackFactory.engine_factory.unpacker
end
end
end
7 changes: 4 additions & 3 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require 'fluent/plugin/base'
require 'fluent/plugin/buffer'
require 'fluent/plugin_helper/record_accessor'
require 'fluent/msgpack_factory'
require 'fluent/log'
require 'fluent/plugin_id'
require 'fluent/plugin_helper'
Expand Down Expand Up @@ -944,7 +945,7 @@ def generate_format_proc
def handle_stream_with_custom_format(tag, es, enqueue: false)
meta_and_data = {}
records = 0
es.each do |time, record|
es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
meta = metadata(tag, time, record)
meta_and_data[meta] ||= []
res = format(tag, time, record)
Expand All @@ -964,7 +965,7 @@ def handle_stream_with_standard_format(tag, es, enqueue: false)
format_proc = generate_format_proc
meta_and_data = {}
records = 0
es.each do |time, record|
es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
meta = metadata(tag, time, record)
meta_and_data[meta] ||= MultiEventStream.new
meta_and_data[meta].add(time, record)
Expand All @@ -984,7 +985,7 @@ def handle_stream_simple(tag, es, enqueue: false)
if @custom_format
records = 0
data = []
es.each do |time, record|
es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
res = format(tag, time, record)
if res
data << res
Expand Down

0 comments on commit f4186cf

Please sign in to comment.