Skip to content

Commit

Permalink
Add thread local packer not to create msgpacker object duplicatedly
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <[email protected]>
  • Loading branch information
ganmacs committed Aug 16, 2019
1 parent d7dcda5 commit 9aca889
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 17 deletions.
24 changes: 12 additions & 12 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,26 @@ def each(&block)
raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end

def to_msgpack_stream(time_int: false)
return to_msgpack_stream_forced_integer if time_int
out = msgpack_packer
def to_msgpack_stream(time_int: false, packer: nil)
return to_msgpack_stream_forced_integer(packer: packer) if time_int
out = packer || msgpack_packer
each {|time,record|
out.write([time,record])
}
out.to_s
out.full_pack
end

def to_compressed_msgpack_stream(time_int: false)
packed = to_msgpack_stream(time_int: time_int)
def to_compressed_msgpack_stream(time_int: false, packer: nil)
packed = to_msgpack_stream(time_int: time_int, packer: packer)
compress(packed)
end

def to_msgpack_stream_forced_integer
out = msgpack_packer
def to_msgpack_stream_forced_integer(packer: nil)
out = packer || msgpack_packer
each {|time,record|
out.write([time.to_i,record])
}
out.to_s
out.full_pack
end
end

Expand Down Expand Up @@ -272,7 +272,7 @@ def each(&block)
nil
end

def to_msgpack_stream(time_int: false)
def to_msgpack_stream(time_int: false, packer: nil)
# time_int is always ignored because @data is always packed binary in this class
@data
end
Expand Down Expand Up @@ -300,7 +300,7 @@ def each(&block)
super
end

def to_msgpack_stream(time_int: false)
def to_msgpack_stream(time_int: false, packer: nil)
ensure_decompressed!
super
end
Expand Down Expand Up @@ -330,7 +330,7 @@ def each(&block)
end
alias :msgpack_each :each

def to_msgpack_stream(time_int: false)
def to_msgpack_stream(time_int: false, packer: nil)
# time_int is always ignored because data is already packed and written in chunk
read
end
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/msgpack_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,9 @@ def self.init
factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime)
@@engine_factory = factory
end

def self.thread_local_msgpack_packer
Thread.current[:local_msgpack_packer] ||= MessagePackFactory.engine_factory.packer
end
end
end
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def write_metadata(update: true)
c: @created_at,
m: (update ? Fluent::Clock.real_now : @modified_at),
})
bin = msgpack_packer.pack(data).to_s
bin = Fluent::MessagePackFactory.thread_local_msgpack_packer.pack(data).full_pack
size = [bin.bytesize].pack('N')
@meta.seek(0, IO::SEEK_SET)
@meta.write(BUFFER_HEADER + size + bin)
Expand Down
8 changes: 4 additions & 4 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -919,10 +919,10 @@ def write_guard(&block)
end
end

FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream }
FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream }
FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true) }
FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true) }
FORMAT_MSGPACK_STREAM = ->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_COMPRESSED_MSGPACK_STREAM = ->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT = ->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }

def generate_format_proc
if @buffer && @buffer.compress == :gzip
Expand Down

0 comments on commit 9aca889

Please sign in to comment.