diff --git a/source/extensions/filters/network/kafka/serialization.h b/source/extensions/filters/network/kafka/serialization.h
index cab81a0acf238..3abc5f1fb5597 100644
--- a/source/extensions/filters/network/kafka/serialization.h
+++ b/source/extensions/filters/network/kafka/serialization.h
@@ -177,6 +177,7 @@ class BooleanDeserializer : public Deserializer<bool> {
  * Impl note:
  * This implementation is equivalent to the one present in Kafka 2.4.0, what means that for 5-byte
  * inputs, the data at bits 5-7 in 5th byte are *ignored* (as long as 8th bit is unset).
+ * Reference: org.apache.kafka.common.utils.ByteUtils.readUnsignedVarint
  */
 class VarUInt32Deserializer : public Deserializer<uint32_t> {
 public:
@@ -224,6 +225,90 @@ class VarUInt32Deserializer : public Deserializer<uint32_t> {
   bool ready_ = false;
 };
 
+/**
+ * Deserializer for Kafka 'varint' type.
+ * Encoding documentation: https://kafka.apache.org/24/protocol.html#protocol_types
+ *
+ * Impl note:
+ * This implementation is equivalent to the one present in Kafka 2.4.0, what means that for 5-byte
+ * inputs, the data at bits 5-7 in 5th byte are *ignored* (as long as 8th bit is unset).
+ * Reference: org.apache.kafka.common.utils.ByteUtils.readVarint
+ */
+class VarInt32Deserializer : public Deserializer<int32_t> {
+public:
+  VarInt32Deserializer() = default;
+
+  uint32_t feed(absl::string_view& data) override { return varuint32_deserializer_.feed(data); }
+
+  bool ready() const override { return varuint32_deserializer_.ready(); }
+
+  int32_t get() const override {
+    const uint32_t res = varuint32_deserializer_.get();
+    return (res >> 1) ^ -(res & 1);
+  }
+
+private:
+  VarUInt32Deserializer varuint32_deserializer_;
+};
+
+/**
+ * Deserializer for Kafka 'varlong' type.
+ * Encoding documentation: https://kafka.apache.org/24/protocol.html#protocol_types
+ *
+ * Impl note:
+ * This implementation is equivalent to the one present in Kafka 2.4.0, what means that for 10-byte
+ * inputs, the data at bits 3-7 in 10th byte are *ignored* (as long as 8th bit is unset).
+ * Reference: org.apache.kafka.common.utils.ByteUtils.readVarlong
+ */
+class VarInt64Deserializer : public Deserializer<int64_t> {
+public:
+  VarInt64Deserializer() = default;
+
+  uint32_t feed(absl::string_view& data) override {
+    uint32_t processed = 0;
+    while (!ready_ && !data.empty()) {
+
+      // Read next byte from input.
+      uint8_t el;
+      safeMemcpy(&el, data.data());
+      data = {data.data() + 1, data.size() - 1};
+      processed++;
+
+      // Put the 7 bits where they should have been.
+      // Impl note: the cast is done to avoid undefined behaviour when offset_ >= 63 and some bits
+      // at positions 3-7 are set (we would have left shift of signed value that does not fit in
+      // data type).
+      bytes_ |= ((static_cast<uint64_t>(el) & 0x7f) << offset_);
+      if ((el & 0x80) == 0) {
+        // If this was the last byte to process (what is marked by unset highest bit), we are done.
+        ready_ = true;
+        break;
+      } else {
+        // Otherwise, we need to read next byte.
+        offset_ += 7;
+        // Valid input can have at most 10 bytes.
+        if (offset_ >= 10 * 7) {
+          ExceptionUtil::throwEnvoyException(
+              "VarInt64 is too long (10th byte has highest bit set)");
+        }
+      }
+    }
+    return processed;
+  }
+
+  bool ready() const override { return ready_; }
+
+  int64_t get() const override {
+    // Do the final conversion, this is a zig-zag encoded signed value.
+    return (bytes_ >> 1) ^ -(bytes_ & 1);
+  }
+
+private:
+  uint64_t bytes_ = 0;
+  uint32_t offset_ = 0;
+  bool ready_ = false;
+};
+
 /**
  * Deserializer of string value.
  * First reads length (INT16) and then allocates the buffer of given length.
@@ -373,10 +458,10 @@ class BytesDeserializer : public Deserializer<Bytes> {
 /**
  * Deserializer of compact bytes value.
- * First reads length (UNSIGNED_VARINT) and then allocates the buffer of given length. + * First reads length (UNSIGNED_VARINT32) and then allocates the buffer of given length. * * From Kafka documentation: - * First the length N+1 is given as an UNSIGNED_VARINT. Then N bytes follow. + * First the length N+1 is given as an UNSIGNED_VARINT32. Then N bytes follow. */ class CompactBytesDeserializer : public Deserializer { public: diff --git a/test/extensions/filters/network/kafka/serialization_test.cc b/test/extensions/filters/network/kafka/serialization_test.cc index 41306d143ee3d..5a6a1ae7e89b9 100644 --- a/test/extensions/filters/network/kafka/serialization_test.cc +++ b/test/extensions/filters/network/kafka/serialization_test.cc @@ -1,6 +1,7 @@ #include "source/extensions/filters/network/kafka/tagged_fields.h" #include "test/extensions/filters/network/kafka/serialization_utilities.h" +#include "test/test_common/utility.h" namespace Envoy { namespace Extensions { @@ -27,6 +28,8 @@ TEST_EmptyDeserializerShouldNotBeReady(UInt32Deserializer); TEST_EmptyDeserializerShouldNotBeReady(Int64Deserializer); TEST_EmptyDeserializerShouldNotBeReady(BooleanDeserializer); TEST_EmptyDeserializerShouldNotBeReady(VarUInt32Deserializer); +TEST_EmptyDeserializerShouldNotBeReady(VarInt32Deserializer); +TEST_EmptyDeserializerShouldNotBeReady(VarInt64Deserializer); TEST_EmptyDeserializerShouldNotBeReady(StringDeserializer); TEST_EmptyDeserializerShouldNotBeReady(CompactStringDeserializer); @@ -151,7 +154,78 @@ TEST(VarUInt32Deserializer, ShouldThrowIfNoEndWith5Bytes) { // when // then - EXPECT_THROW(testee.feed(data), EnvoyException); + EXPECT_THROW_WITH_REGEX(testee.feed(data), EnvoyException, "is too long"); +} + +// Variable-length int32_t tests. 
+
+TEST(VarInt32Deserializer, ShouldDeserialize) {
+  Buffer::OwnedImpl buffer;
+  const char input[1] = {0};
+  buffer.add(absl::string_view(input, sizeof(input)));
+  const int32_t expected_value = 0;
+  deserializeCompactAndCheckEquality<VarInt32Deserializer>(buffer, expected_value);
+}
+
+TEST(VarInt32Deserializer, ShouldDeserializeMinInt32) {
+  Buffer::OwnedImpl buffer;
+  const uint8_t input[5] = {0xFF, 0xFF, 0xFF, 0xFF, 0x0F};
+  buffer.add(absl::string_view(reinterpret_cast<const char*>(input), sizeof(input)));
+  const int32_t expected_value = std::numeric_limits<int32_t>::min();
+  deserializeCompactAndCheckEquality<VarInt32Deserializer>(buffer, expected_value);
+}
+
+TEST(VarInt32Deserializer, ShouldDeserializeMaxInt32) {
+  Buffer::OwnedImpl buffer;
+  const uint8_t input[5] = {0xFE, 0xFF, 0xFF, 0xFF, 0x0F};
+  buffer.add(absl::string_view(reinterpret_cast<const char*>(input), sizeof(input)));
+  const int32_t expected_value = std::numeric_limits<int32_t>::max();
+  deserializeCompactAndCheckEquality<VarInt32Deserializer>(buffer, expected_value);
+}
+
+// Variable-length int64_t tests.
+
+TEST(VarInt64Deserializer, ShouldDeserialize) {
+  Buffer::OwnedImpl buffer;
+  const char input[1] = {0};
+  buffer.add(absl::string_view(input, sizeof(input)));
+  const int64_t expected_value = 0;
+  deserializeCompactAndCheckEquality<VarInt64Deserializer>(buffer, expected_value);
+}
+
+TEST(VarInt64Deserializer, ShouldDeserializeMinInt64) {
+  Buffer::OwnedImpl buffer;
+  const uint8_t input[10] = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x0F};
+  buffer.add(absl::string_view(reinterpret_cast<const char*>(input), sizeof(input)));
+  const int64_t expected_value = std::numeric_limits<int64_t>::min();
+  deserializeCompactAndCheckEquality<VarInt64Deserializer>(buffer, expected_value);
+}
+
+TEST(VarInt64Deserializer, ShouldDeserializeMaxInt64) {
+  Buffer::OwnedImpl buffer;
+  const uint8_t input[10] = {0xFE, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x0F};
+  buffer.add(absl::string_view(reinterpret_cast<const char*>(input), sizeof(input)));
+  const int64_t expected_value = std::numeric_limits<int64_t>::max();
+  deserializeCompactAndCheckEquality<VarInt64Deserializer>(buffer,
expected_value);
+}
+
+TEST(VarInt64Deserializer, ShouldThrowIfNoEndWith10Bytes) {
+  // given
+  VarInt64Deserializer testee;
+  Buffer::OwnedImpl buffer;
+
+  // The buffer makes no sense, it's 10 times 0xFF, while varint encoding ensures that in the worst
+  // case 10th byte has the highest bit clear.
+  for (int i = 0; i < 10; ++i) {
+    const uint8_t all_bits_set = 0xFF;
+    buffer.add(&all_bits_set, sizeof(all_bits_set));
+  }
+
+  absl::string_view data = {getRawData(buffer), 1024};
+
+  // when
+  // then
+  EXPECT_THROW_WITH_REGEX(testee.feed(data), EnvoyException, "is too long");
 }
 
 // String tests.
diff --git a/test/extensions/filters/network/kafka/serialization_utilities.cc b/test/extensions/filters/network/kafka/serialization_utilities.cc
index 625d4f0835019..10d32b53ca7c5 100644
--- a/test/extensions/filters/network/kafka/serialization_utilities.cc
+++ b/test/extensions/filters/network/kafka/serialization_utilities.cc
@@ -12,7 +12,7 @@ void assertStringViewIncrement(const absl::string_view incremented,
   ASSERT_EQ(incremented.size(), original.size() - difference);
 }
 
-const char* getRawData(const Buffer::OwnedImpl& buffer) {
+const char* getRawData(const Buffer::Instance& buffer) {
   Buffer::RawSliceVector slices = buffer.getRawSlices(1);
   ASSERT(slices.size() == 1);
   return reinterpret_cast<const char*>((slices[0]).mem_);
diff --git a/test/extensions/filters/network/kafka/serialization_utilities.h b/test/extensions/filters/network/kafka/serialization_utilities.h
index d2fb902b8294d..4b063971f0c5c 100644
--- a/test/extensions/filters/network/kafka/serialization_utilities.h
+++ b/test/extensions/filters/network/kafka/serialization_utilities.h
@@ -20,7 +20,11 @@ void assertStringViewIncrement(absl::string_view incremented, absl::string_view
                                size_t difference);
 
 // Helper function converting buffer to raw bytes.
-const char* getRawData(const Buffer::OwnedImpl& buffer);
+const char* getRawData(const Buffer::Instance& buffer);
+
+// Helper methods for testing serialization and deserialization.
+// We have two dimensions to test here: single-pass vs chunks (as we never know how the input is
+// going to be delivered), and normal vs compact for some data types (like strings).
 
 // Exactly what is says on the tin:
 // 1. serialize expected using Encoder,
@@ -29,8 +33,8 @@ const char* getRawData(const Buffer::OwnedImpl& buffer);
 // 4. verify that data pointer moved correct amount,
 // 5. feed testee more data,
 // 6. verify that nothing more was consumed (because the testee has been ready since step 3).
-template <typename BT, typename AT>
-void serializeThenDeserializeAndCheckEqualityInOneGo(AT expected) {
+template <typename BT>
+void serializeThenDeserializeAndCheckEqualityInOneGo(const typename BT::result_type expected) {
   // given
   BT testee{};
 
@@ -64,8 +68,8 @@ void serializeThenDeserializeAndCheckEqualityInOneGo(AT expected) {
 // Does the same thing as the above test, but instead of providing whole data at one, it provides
 // it in N one-byte chunks.
 // This verifies if deserializer keeps state properly (no overwrites etc.).
-template <typename BT, typename AT>
-void serializeThenDeserializeAndCheckEqualityWithChunks(AT expected) {
+template <typename BT>
+void serializeThenDeserializeAndCheckEqualityWithChunks(const typename BT::result_type expected) {
   // given
   BT testee{};
 
@@ -105,20 +109,18 @@ void serializeThenDeserializeAndCheckEqualityWithChunks(AT expected) {
   ASSERT_EQ(more_data.size(), garbage_size);
 }
 
-// Same thing as 'serializeThenDeserializeAndCheckEqualityInOneGo', just uses compact encoding.
-template <typename BT, typename AT>
-void serializeCompactThenDeserializeAndCheckEqualityInOneGo(AT expected) {
+// Deserialization (only) of compact-encoded data (for data types where we do not need serializer
+// code).
+template <typename BT>
+void deserializeCompactAndCheckEqualityInOneGo(Buffer::Instance& buffer,
+                                               const typename BT::result_type expected) {
   // given
   BT testee{};
 
-  Buffer::OwnedImpl buffer;
   EncodingContext encoder{-1};
-  const uint32_t expected_written_size = encoder.computeCompactSize(expected);
-  const uint32_t written = encoder.encodeCompact(expected, buffer);
-  ASSERT_EQ(written, expected_written_size);
+  const uint32_t written = buffer.length();
   // Insert garbage after serialized payload.
   const uint32_t garbage_size = encoder.encode(Bytes(10000), buffer);
-
   const char* raw_buffer_ptr =
       reinterpret_cast<const char*>(buffer.linearize(written + garbage_size));
   // Tell parser that there is more data, it should never consume more than written.
@@ -142,9 +144,69 @@
   assertStringViewIncrement(data, orig_data, consumed);
 }
 
+// Does the same thing as the above test, but instead of providing whole data at one, it provides
+// it in N one-byte chunks.
+// This verifies if deserializer keeps state properly (no overwrites etc.).
+template <typename BT>
+void deserializeCompactAndCheckEqualityWithChunks(Buffer::Instance& buffer,
+                                                  const typename BT::result_type expected) {
+  // given
+  BT testee{};
+
+  EncodingContext encoder{-1};
+  const uint32_t written = buffer.length();
+  // Insert garbage after serialized payload.
+  const uint32_t garbage_size = encoder.encode(Bytes(10000), buffer);
+
+  const char* raw_buffer_ptr =
+      reinterpret_cast<const char*>(buffer.linearize(written + garbage_size));
+  // Tell parser that there is more data, it should never consume more than written.
+  const absl::string_view orig_data = {raw_buffer_ptr, written + garbage_size};
+
+  // when
+  absl::string_view data = orig_data;
+  uint32_t consumed = 0;
+  for (uint32_t i = 0; i < written; ++i) {
+    data = {data.data(), 1}; // Consume data byte-by-byte.
+    uint32_t step = testee.feed(data);
+    consumed += step;
+    ASSERT_EQ(step, 1);
+    ASSERT_EQ(data.size(), 0);
+  }
+
+  // then
+  ASSERT_EQ(consumed, written);
+  ASSERT_EQ(testee.ready(), true);
+  ASSERT_EQ(testee.get(), expected);
+
+  ASSERT_EQ(data.data(), orig_data.data() + consumed);
+
+  // when - 2
+  absl::string_view more_data = {data.data(), garbage_size};
+  const uint32_t consumed2 = testee.feed(more_data);
+
+  // then - 2 (nothing changes)
+  ASSERT_EQ(consumed2, 0);
+  ASSERT_EQ(more_data.data(), data.data());
+  ASSERT_EQ(more_data.size(), garbage_size);
+}
+
+// Same thing as 'serializeThenDeserializeAndCheckEqualityInOneGo', just uses compact encoding.
+template <typename BT>
+void serializeCompactThenDeserializeAndCheckEqualityInOneGo(
+    const typename BT::result_type expected) {
+  Buffer::OwnedImpl buffer;
+  EncodingContext encoder{-1};
+  const uint32_t expected_written_size = encoder.computeCompactSize(expected);
+  const uint32_t written = encoder.encodeCompact(expected, buffer);
+  ASSERT_EQ(written, expected_written_size);
+  deserializeCompactAndCheckEqualityInOneGo<BT>(buffer, expected);
+}
+
 // Same thing as 'serializeThenDeserializeAndCheckEqualityWithChunks', just uses compact encoding.
-template <typename BT, typename AT>
-void serializeCompactThenDeserializeAndCheckEqualityWithChunks(AT expected) {
+template <typename BT>
+void serializeCompactThenDeserializeAndCheckEqualityWithChunks(
+    const typename BT::result_type expected) {
   // given
   BT testee{};
 
@@ -190,18 +252,30 @@ void serializeCompactThenDeserializeAndCheckEqualityWithChunks(AT expected) {
 }
 
 // Wrapper to run both tests for normal serialization.
-template <typename BT, typename AT> void serializeThenDeserializeAndCheckEquality(AT expected) {
+template <typename BT>
+void serializeThenDeserializeAndCheckEquality(const typename BT::result_type expected) {
   serializeThenDeserializeAndCheckEqualityInOneGo<BT>(expected);
   serializeThenDeserializeAndCheckEqualityWithChunks<BT>(expected);
 }
 
 // Wrapper to run both tests for compact serialization.
-template <typename BT, typename AT>
-void serializeCompactThenDeserializeAndCheckEquality(AT expected) {
+template <typename BT>
+void serializeCompactThenDeserializeAndCheckEquality(const typename BT::result_type expected) {
   serializeCompactThenDeserializeAndCheckEqualityInOneGo<BT>(expected);
   serializeCompactThenDeserializeAndCheckEqualityWithChunks<BT>(expected);
 }
 
+// Wrapper to run both tests for compact deserialization (for non-serializable types).
+template <typename BT>
+void deserializeCompactAndCheckEquality(Buffer::Instance& buffer,
+                                        const typename BT::result_type expected) {
+  Buffer::OwnedImpl
+      copy_for_chunking_test; // Tests modify input buffers, so let's just make a copy.
+  copy_for_chunking_test.add(getRawData(buffer), buffer.length());
+  deserializeCompactAndCheckEqualityInOneGo<BT>(buffer, expected);
+  deserializeCompactAndCheckEqualityWithChunks<BT>(copy_for_chunking_test, expected);
+}
+
 /**
  * Message callback that captures the messages.
  */