From 9aca889cd218108c46d317ba669c31558a00ca01 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Wed, 14 Aug 2019 12:58:31 +0900 Subject: [PATCH] Add thread local packer not to create msgpacker object duplicatedly Signed-off-by: Yuta Iwama --- lib/fluent/event.rb | 24 ++++++++++++------------ lib/fluent/msgpack_factory.rb | 4 ++++ lib/fluent/plugin/buffer/file_chunk.rb | 2 +- lib/fluent/plugin/output.rb | 8 ++++---- 4 files changed, 21 insertions(+), 17 deletions(-) diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index 2fe2a2525d..ec754a6dc1 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -54,26 +54,26 @@ def each(&block) raise NotImplementedError, "DO NOT USE THIS CLASS directly." end - def to_msgpack_stream(time_int: false) - return to_msgpack_stream_forced_integer if time_int - out = msgpack_packer + def to_msgpack_stream(time_int: false, packer: nil) + return to_msgpack_stream_forced_integer(packer: packer) if time_int + out = packer || msgpack_packer each {|time,record| out.write([time,record]) } - out.to_s + out.full_pack end - def to_compressed_msgpack_stream(time_int: false) - packed = to_msgpack_stream(time_int: time_int) + def to_compressed_msgpack_stream(time_int: false, packer: nil) + packed = to_msgpack_stream(time_int: time_int, packer: packer) compress(packed) end - def to_msgpack_stream_forced_integer - out = msgpack_packer + def to_msgpack_stream_forced_integer(packer: nil) + out = packer || msgpack_packer each {|time,record| out.write([time.to_i,record]) } - out.to_s + out.full_pack end end @@ -272,7 +272,7 @@ def each(&block) nil end - def to_msgpack_stream(time_int: false) + def to_msgpack_stream(time_int: false, packer: nil) # time_int is always ignored because @data is always packed binary in this class @data end @@ -300,7 +300,7 @@ def each(&block) super end - def to_msgpack_stream(time_int: false) + def to_msgpack_stream(time_int: false, packer: nil) ensure_decompressed! super end @@ -330,7 +330,7 @@ def each(&block) end alias :msgpack_each :each - def to_msgpack_stream(time_int: false) + def to_msgpack_stream(time_int: false, packer: nil) # time_int is always ignored because data is already packed and written in chunk read end diff --git a/lib/fluent/msgpack_factory.rb b/lib/fluent/msgpack_factory.rb index 54d0b9033e..e6ce0665e0 100644 --- a/lib/fluent/msgpack_factory.rb +++ b/lib/fluent/msgpack_factory.rb @@ -58,5 +58,9 @@ def self.init factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime) @@engine_factory = factory end + + def self.thread_local_msgpack_packer + Thread.current[:local_msgpack_packer] ||= MessagePackFactory.engine_factory.packer + end end end diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb index 81245ad8d5..934ffd3c3f 100644 --- a/lib/fluent/plugin/buffer/file_chunk.rb +++ b/lib/fluent/plugin/buffer/file_chunk.rb @@ -254,7 +254,7 @@ def write_metadata(update: true) c: @created_at, m: (update ? Fluent::Clock.real_now : @modified_at), }) - bin = msgpack_packer.pack(data).to_s + bin = Fluent::MessagePackFactory.thread_local_msgpack_packer.pack(data).full_pack size = [bin.bytesize].pack('N') @meta.seek(0, IO::SEEK_SET) @meta.write(BUFFER_HEADER + size + bin) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 823e91081f..7703be8556 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -919,10 +919,10 @@ def write_guard(&block) end end - FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream } - FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream } - FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true) } - FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true) } + FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } + FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) } def generate_format_proc if @buffer && @buffer.compress == :gzip