From 9aca889cd218108c46d317ba669c31558a00ca01 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Wed, 14 Aug 2019 12:58:31 +0900 Subject: [PATCH 1/3] Add thread local packer not to create msgpacker object duplicatedly Signed-off-by: Yuta Iwama --- lib/fluent/event.rb | 24 ++++++++++++------------ lib/fluent/msgpack_factory.rb | 4 ++++ lib/fluent/plugin/buffer/file_chunk.rb | 2 +- lib/fluent/plugin/output.rb | 8 ++++---- 4 files changed, 21 insertions(+), 17 deletions(-) diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index 2fe2a2525d..ec754a6dc1 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -54,26 +54,26 @@ def each(&block) raise NotImplementedError, "DO NOT USE THIS CLASS directly." end - def to_msgpack_stream(time_int: false) - return to_msgpack_stream_forced_integer if time_int - out = msgpack_packer + def to_msgpack_stream(time_int: false, packer: nil) + return to_msgpack_stream_forced_integer(packer: packer) if time_int + out = packer || msgpack_packer each {|time,record| out.write([time,record]) } - out.to_s + out.full_pack end - def to_compressed_msgpack_stream(time_int: false) - packed = to_msgpack_stream(time_int: time_int) + def to_compressed_msgpack_stream(time_int: false, packer: nil) + packed = to_msgpack_stream(time_int: time_int, packer: packer) compress(packed) end - def to_msgpack_stream_forced_integer - out = msgpack_packer + def to_msgpack_stream_forced_integer(packer: nil) + out = packer || msgpack_packer each {|time,record| out.write([time.to_i,record]) } - out.to_s + out.full_pack end end @@ -272,7 +272,7 @@ def each(&block) nil end - def to_msgpack_stream(time_int: false) + def to_msgpack_stream(time_int: false, packer: nil) # time_int is always ignored because @data is always packed binary in this class @data end @@ -300,7 +300,7 @@ def each(&block) super end - def to_msgpack_stream(time_int: false) + def to_msgpack_stream(time_int: false, packer: nil) ensure_decompressed! 
super end @@ -330,7 +330,7 @@ def each(&block) end alias :msgpack_each :each - def to_msgpack_stream(time_int: false) + def to_msgpack_stream(time_int: false, packer: nil) # time_int is always ignored because data is already packed and written in chunk read end diff --git a/lib/fluent/msgpack_factory.rb b/lib/fluent/msgpack_factory.rb index 54d0b9033e..e6ce0665e0 100644 --- a/lib/fluent/msgpack_factory.rb +++ b/lib/fluent/msgpack_factory.rb @@ -58,5 +58,9 @@ def self.init factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime) @@engine_factory = factory end + + def self.thread_local_msgpack_packer + Thread.current[:local_msgpack_packer] ||= MessagePackFactory.engine_factory.packer + end end end diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb index 81245ad8d5..934ffd3c3f 100644 --- a/lib/fluent/plugin/buffer/file_chunk.rb +++ b/lib/fluent/plugin/buffer/file_chunk.rb @@ -254,7 +254,7 @@ def write_metadata(update: true) c: @created_at, m: (update ? 
Fluent::Clock.real_now : @modified_at), }) - bin = msgpack_packer.pack(data).to_s + bin = Fluent::MessagePackFactory.thread_local_msgpack_packer.pack(data).full_pack size = [bin.bytesize].pack('N') @meta.seek(0, IO::SEEK_SET) @meta.write(BUFFER_HEADER + size + bin) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 823e91081f..7703be8556 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -919,10 +919,10 @@ def write_guard(&block) end end - FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream } - FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream } - FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true) } - FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true) } + FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } def generate_format_proc if @buffer && @buffer.compress == :gzip From f4186cf46a327688887c04618e4eb10a059b0248 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Wed, 14 Aug 2019 14:24:24 +0900 Subject: [PATCH 2/3] Use local thread unpacker not to create msgpacker object duplicatedly Signed-off-by: Yuta Iwama --- lib/fluent/event.rb | 24 ++++++++++++------------ lib/fluent/event_router.rb | 4 +++- lib/fluent/msgpack_factory.rb | 4 ++++ lib/fluent/plugin/output.rb | 7 ++++--- 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index ec754a6dc1..f25d8a21fc 100644 
--- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -50,7 +50,7 @@ def slice(index, num) raise NotImplementedError, "DO NOT USE THIS CLASS directly." end - def each(&block) + def each(unpacker: nil, &block) raise NotImplementedError, "DO NOT USE THIS CLASS directly." end @@ -107,7 +107,7 @@ def slice(index, num) end end - def each(&block) + def each(unpacker: nil, &block) block.call(@time, @record) nil end @@ -143,7 +143,7 @@ def slice(index, num) ArrayEventStream.new(@entries.slice(index, num)) end - def each(&block) + def each(unpacker: nil, &block) @entries.each(&block) nil end @@ -190,7 +190,7 @@ def slice(index, num) MultiEventStream.new(@time_array.slice(index, num), @record_array.slice(index, num)) end - def each(&block) + def each(unpacker: nil, &block) time_array = @time_array record_array = @record_array for i in 0..time_array.length-1 @@ -234,11 +234,11 @@ def repeatable? true end - def ensure_unpacked! + def ensure_unpacked!(unpacker: nil) return if @unpacked_times && @unpacked_records @unpacked_times = [] @unpacked_records = [] - msgpack_unpacker.feed_each(@data) do |time, record| + (unpacker || msgpack_unpacker).feed_each(@data) do |time, record| @unpacked_times << time @unpacked_records << record end @@ -254,7 +254,7 @@ def slice(index, num) MultiEventStream.new(@unpacked_times.slice(index, num), @unpacked_records.slice(index, num)) end - def each(&block) + def each(unpacker: nil, &block) if @unpacked_times @unpacked_times.each_with_index do |time, i| block.call(time, @unpacked_records[i]) @@ -262,7 +262,7 @@ def each(&block) else @unpacked_times = [] @unpacked_records = [] - msgpack_unpacker.feed_each(@data) do |time, record| + (unpacker || msgpack_unpacker).feed_each(@data) do |time, record| @unpacked_times << time @unpacked_records << record block.call(time, record) @@ -290,12 +290,12 @@ def empty? super end - def ensure_unpacked! + def ensure_unpacked!(unpacker: nil) ensure_decompressed! 
super end - def each(&block) + def each(unpacker: nil, &block) ensure_decompressed! super end @@ -322,9 +322,9 @@ module ChunkMessagePackEventStreamer include MessagePackFactory::Mixin # chunk.extend(ChunkEventStreamer) # => chunk.each{|time, record| ... } - def each(&block) + def each(unpacker: nil, &block) open do |io| - msgpack_unpacker(io).each(&block) + unpacker ? unpacker.feed_each(io.read, &block) : msgpack_unpacker(io).each(&block) end nil end diff --git a/lib/fluent/event_router.rb b/lib/fluent/event_router.rb index 23db13a66c..bab1cfe7d6 100644 --- a/lib/fluent/event_router.rb +++ b/lib/fluent/event_router.rb @@ -17,6 +17,7 @@ require 'fluent/match' require 'fluent/event' require 'fluent/filter' +require 'fluent/msgpack_factory' module Fluent # @@ -143,6 +144,7 @@ def initialize @filters = [] @output = nil @optimizer = FilterOptimizer.new + @unpacker = Fluent::MessagePackFactory.unpacker end def add_filter(filter) @@ -182,7 +184,7 @@ def filter_stream(tag, es) def optimized_filter_stream(tag, es) new_es = MultiEventStream.new - es.each do |time, record| + es.each(unpacker: @unpacker) do |time, record| filtered_record = record filtered_time = time diff --git a/lib/fluent/msgpack_factory.rb b/lib/fluent/msgpack_factory.rb index e6ce0665e0..0b34a59c04 100644 --- a/lib/fluent/msgpack_factory.rb +++ b/lib/fluent/msgpack_factory.rb @@ -62,5 +62,9 @@ def self.init def self.thread_local_msgpack_packer Thread.current[:local_msgpack_packer] ||= MessagePackFactory.engine_factory.packer end + + def self.thread_local_msgpack_unpacker + (Thread.current[:local_msgpack_unpacker] ||= MessagePackFactory.engine_factory.unpacker).tap(&:reset) + end end end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 7703be8556..08ec169a89 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -18,6 +18,7 @@ require 'fluent/plugin/base' require 'fluent/plugin/buffer' require 'fluent/plugin_helper/record_accessor' +require 'fluent/msgpack_factory' require 'fluent/log' require 
'fluent/plugin_id' require 'fluent/plugin_helper' @@ -944,7 +945,7 @@ def generate_format_proc def handle_stream_with_custom_format(tag, es, enqueue: false) meta_and_data = {} records = 0 - es.each do |time, record| + es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record| meta = metadata(tag, time, record) meta_and_data[meta] ||= [] res = format(tag, time, record) @@ -964,7 +965,7 @@ def handle_stream_with_standard_format(tag, es, enqueue: false) format_proc = generate_format_proc meta_and_data = {} records = 0 - es.each do |time, record| + es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record| meta = metadata(tag, time, record) meta_and_data[meta] ||= MultiEventStream.new meta_and_data[meta].add(time, record) @@ -984,7 +985,7 @@ def handle_stream_simple(tag, es, enqueue: false) if @custom_format records = 0 data = [] - es.each do |time, record| + es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record| res = format(tag, time, record) if res data << res From b8021e9857a7815ea92faaa0acef11dbfe0a240d Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Fri, 16 Aug 2019 19:12:26 +0900 Subject: [PATCH 3/3] use thread local unpacker Signed-off-by: Yuta Iwama --- lib/fluent/event_router.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/fluent/event_router.rb b/lib/fluent/event_router.rb index bab1cfe7d6..365fba4b94 100644 --- a/lib/fluent/event_router.rb +++ b/lib/fluent/event_router.rb @@ -144,7 +144,6 @@ def initialize @filters = [] @output = nil @optimizer = FilterOptimizer.new - @unpacker = Fluent::MessagePackFactory.unpacker end def add_filter(filter) @@ -184,7 +183,7 @@ def filter_stream(tag, es) def optimized_filter_stream(tag, es) new_es = MultiEventStream.new - es.each(unpacker: @unpacker) do |time, record| + es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record| filtered_record = record 
filtered_time = time