From 3c4fbf01ad25e44703e11cdde55bbbd04a508c40 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Wed, 26 Apr 2023 18:52:12 +0900 Subject: [PATCH] MessagePackFactory: Make sure to reset local unpacker every time Fixes bug in c6c6c038c5cacab6fcb9aa89a714d9366ac4902e (#2559). Received incomplete data must not affect data from other senders. Signed-off-by: Daijiro Fukuda --- lib/fluent/msgpack_factory.rb | 7 ++++++- test/test_msgpack_factory.rb | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/lib/fluent/msgpack_factory.rb b/lib/fluent/msgpack_factory.rb index fa7b52a2d4..0d0c0ff01a 100644 --- a/lib/fluent/msgpack_factory.rb +++ b/lib/fluent/msgpack_factory.rb @@ -100,7 +100,12 @@ def self.thread_local_msgpack_packer end def self.thread_local_msgpack_unpacker - Thread.current[:local_msgpack_unpacker] ||= MessagePackFactory.engine_factory.unpacker + unpacker = Thread.current[:local_msgpack_unpacker] + if unpacker.nil? + return Thread.current[:local_msgpack_unpacker] = MessagePackFactory.engine_factory.unpacker + end + unpacker.reset + unpacker end end end diff --git a/test/test_msgpack_factory.rb b/test/test_msgpack_factory.rb index 8593f34968..ba9909590d 100644 --- a/test/test_msgpack_factory.rb +++ b/test/test_msgpack_factory.rb @@ -15,4 +15,36 @@ class MessagePackFactoryTest < Test::Unit::TestCase assert mp.msgpack_factory assert mp.msgpack_factory end + + sub_test_case 'thread_local_msgpack_packer' do + test 'packer is cached' do + packer1 = Fluent::MessagePackFactory.thread_local_msgpack_packer + packer2 = Fluent::MessagePackFactory.thread_local_msgpack_packer + assert_equal packer1, packer2 + end + end + + sub_test_case 'thread_local_msgpack_unpacker' do + test 'unpacker is cached' do + unpacker1 = Fluent::MessagePackFactory.thread_local_msgpack_unpacker + unpacker2 = Fluent::MessagePackFactory.thread_local_msgpack_unpacker + assert_equal unpacker1, unpacker2 + end + + # We need to reset the buffer every time so that received incomplete data + # must not affect data from other senders. + test 'reset the internal buffer of unpacker every time' do + unpacker1 = Fluent::MessagePackFactory.thread_local_msgpack_unpacker + unpacker1.feed_each("\xA6foo") do |result| + flunk("This callback must not be called since the data is uncomplete.") + end + + records = [] + unpacker2 = Fluent::MessagePackFactory.thread_local_msgpack_unpacker + unpacker2.feed_each("\xA3foo") do |result| + records.append(result) + end + assert_equal ["foo"], records + end + end end