Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 87 additions & 2 deletions source/extensions/filters/network/kafka/serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class BooleanDeserializer : public Deserializer<bool> {
* Impl note:
 * This implementation is equivalent to the one present in Kafka 2.4.0, which means that for 5-byte
* inputs, the data at bits 5-7 in 5th byte are *ignored* (as long as 8th bit is unset).
* Reference: org.apache.kafka.common.utils.ByteUtils.readUnsignedVarint
*/
class VarUInt32Deserializer : public Deserializer<uint32_t> {
public:
Expand Down Expand Up @@ -224,6 +225,90 @@ class VarUInt32Deserializer : public Deserializer<uint32_t> {
bool ready_ = false;
};

/**
 * Deserializer for Kafka 'varint' type.
 * Encoding documentation: https://kafka.apache.org/24/protocol.html#protocol_types
 *
 * Impl note:
 * This implementation is equivalent to the one present in Kafka 2.4.0, which means that for 5-byte
 * inputs, the data at bits 5-7 in 5th byte are *ignored* (as long as 8th bit is unset).
 * Reference: org.apache.kafka.common.utils.ByteUtils.readVarint
 */
class VarInt32Deserializer : public Deserializer<int32_t> {
public:
  VarInt32Deserializer() = default;

  // Byte consumption is delegated to the unsigned-varint deserializer.
  uint32_t feed(absl::string_view& data) override { return varuint32_deserializer_.feed(data); }

  bool ready() const override { return varuint32_deserializer_.ready(); }

  int32_t get() const override {
    // The wire value is the zig-zag encoding of the signed result, so undo it here.
    const uint32_t encoded = varuint32_deserializer_.get();
    return (encoded >> 1) ^ -(encoded & 1);
  }

private:
  // Underlying deserializer handling the variable-length byte stream.
  VarUInt32Deserializer varuint32_deserializer_;
};

/**
 * Deserializer for Kafka 'varlong' type.
 * Encoding documentation: https://kafka.apache.org/24/protocol.html#protocol_types
 *
 * Impl note:
 * This implementation is equivalent to the one present in Kafka 2.4.0, which means that for 10-byte
 * inputs, the data at bits 3-7 in 10th byte are *ignored* (as long as 8th bit is unset).
 * Reference: org.apache.kafka.common.utils.ByteUtils.readVarlong
 */
class VarInt64Deserializer : public Deserializer<int64_t> {
public:
  VarInt64Deserializer() = default;

  uint32_t feed(absl::string_view& data) override {
    uint32_t consumed = 0;
    while (!ready_ && !data.empty()) {

      // Pull a single byte out of the input.
      uint8_t current;
      safeMemcpy(&current, data.data());
      data.remove_prefix(1);
      ++consumed;

      // Merge the 7 payload bits into the accumulator at the current position.
      // Impl note: the cast is done to avoid undefined behaviour when offset_ >= 63 and some bits
      // at positions 3-7 are set (we would have left shift of signed value that does not fit in
      // data type).
      bytes_ |= (static_cast<uint64_t>(current & 0x7f) << offset_);
      if ((current & 0x80) == 0) {
        // Unset highest bit marks the final byte of the encoded value.
        ready_ = true;
        break;
      }
      // Highest bit set - another byte follows.
      offset_ += 7;
      // Valid input can have at most 10 bytes.
      if (offset_ >= 10 * 7) {
        ExceptionUtil::throwEnvoyException(
            "VarInt64 is too long (10th byte has highest bit set)");
      }
    }
    return consumed;
  }

  bool ready() const override { return ready_; }

  int64_t get() const override {
    // Do the final conversion, this is a zig-zag encoded signed value.
    return (bytes_ >> 1) ^ -(bytes_ & 1);
  }

private:
  // Accumulated (still zig-zag encoded) payload bits.
  uint64_t bytes_ = 0;
  // Bit position where the next 7-bit group will be placed.
  uint32_t offset_ = 0;
  bool ready_ = false;
};

/**
* Deserializer of string value.
* First reads length (INT16) and then allocates the buffer of given length.
Expand Down Expand Up @@ -373,10 +458,10 @@ class BytesDeserializer : public Deserializer<Bytes> {

/**
* Deserializer of compact bytes value.
* First reads length (UNSIGNED_VARINT) and then allocates the buffer of given length.
* First reads length (UNSIGNED_VARINT32) and then allocates the buffer of given length.
*
* From Kafka documentation:
* First the length N+1 is given as an UNSIGNED_VARINT. Then N bytes follow.
* First the length N+1 is given as an UNSIGNED_VARINT32. Then N bytes follow.
*/
class CompactBytesDeserializer : public Deserializer<Bytes> {
public:
Expand Down
76 changes: 75 additions & 1 deletion test/extensions/filters/network/kafka/serialization_test.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "source/extensions/filters/network/kafka/tagged_fields.h"

#include "test/extensions/filters/network/kafka/serialization_utilities.h"
#include "test/test_common/utility.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -27,6 +28,8 @@ TEST_EmptyDeserializerShouldNotBeReady(UInt32Deserializer);
TEST_EmptyDeserializerShouldNotBeReady(Int64Deserializer);
TEST_EmptyDeserializerShouldNotBeReady(BooleanDeserializer);
TEST_EmptyDeserializerShouldNotBeReady(VarUInt32Deserializer);
TEST_EmptyDeserializerShouldNotBeReady(VarInt32Deserializer);
TEST_EmptyDeserializerShouldNotBeReady(VarInt64Deserializer);

TEST_EmptyDeserializerShouldNotBeReady(StringDeserializer);
TEST_EmptyDeserializerShouldNotBeReady(CompactStringDeserializer);
Expand Down Expand Up @@ -151,7 +154,78 @@ TEST(VarUInt32Deserializer, ShouldThrowIfNoEndWith5Bytes) {

// when
// then
EXPECT_THROW(testee.feed(data), EnvoyException);
EXPECT_THROW_WITH_REGEX(testee.feed(data), EnvoyException, "is too long");
}

// Variable-length int32_t tests.

TEST(VarInt32Deserializer, ShouldDeserialize) {
  // A single zero byte is the varint encoding of 0.
  Buffer::OwnedImpl buffer;
  const char zero_byte = 0;
  buffer.add(absl::string_view(&zero_byte, 1));
  const int32_t expected_value = 0;
  deserializeCompactAndCheckEquality<VarInt32Deserializer>(buffer, expected_value);
}

TEST(VarInt32Deserializer, ShouldDeserializeMinInt32) {
  // Zig-zag encoding maps INT32_MIN to the all-ones unsigned value.
  Buffer::OwnedImpl buffer;
  const uint8_t encoded[5] = {0xFF, 0xFF, 0xFF, 0xFF, 0x0F};
  buffer.add(absl::string_view(reinterpret_cast<const char*>(encoded), sizeof(encoded)));
  deserializeCompactAndCheckEquality<VarInt32Deserializer>(buffer,
                                                           std::numeric_limits<int32_t>::min());
}

TEST(VarInt32Deserializer, ShouldDeserializeMaxInt32) {
  // Zig-zag encoding maps INT32_MAX to the all-ones unsigned value minus one.
  Buffer::OwnedImpl buffer;
  const uint8_t encoded[5] = {0xFE, 0xFF, 0xFF, 0xFF, 0x0F};
  buffer.add(absl::string_view(reinterpret_cast<const char*>(encoded), sizeof(encoded)));
  deserializeCompactAndCheckEquality<VarInt32Deserializer>(buffer,
                                                           std::numeric_limits<int32_t>::max());
}

// Variable-length int64_t tests.

TEST(VarInt64Deserializer, ShouldDeserialize) {
  // A single zero byte is the varlong encoding of 0.
  Buffer::OwnedImpl buffer;
  const char zero_byte = 0;
  buffer.add(absl::string_view(&zero_byte, 1));
  const int64_t expected_value = 0;
  deserializeCompactAndCheckEquality<VarInt64Deserializer>(buffer, expected_value);
}

TEST(VarInt64Deserializer, ShouldDeserializeMinInt64) {
  // Zig-zag encoding maps INT64_MIN to the all-ones unsigned value.
  Buffer::OwnedImpl buffer;
  const uint8_t encoded[10] = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x0F};
  buffer.add(absl::string_view(reinterpret_cast<const char*>(encoded), sizeof(encoded)));
  deserializeCompactAndCheckEquality<VarInt64Deserializer>(buffer,
                                                           std::numeric_limits<int64_t>::min());
}

TEST(VarInt64Deserializer, ShouldDeserializeMaxInt64) {
  // Zig-zag encoding maps INT64_MAX to the all-ones unsigned value minus one.
  Buffer::OwnedImpl buffer;
  const uint8_t encoded[10] = {0xFE, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x0F};
  buffer.add(absl::string_view(reinterpret_cast<const char*>(encoded), sizeof(encoded)));
  deserializeCompactAndCheckEquality<VarInt64Deserializer>(buffer,
                                                           std::numeric_limits<int64_t>::max());
}

// Malformed input: 10 bytes all with the continuation (highest) bit set must be rejected.
TEST(VarInt64Deserializer, ShouldThrowIfNoEndWith10Bytes) {
  // given
  VarInt64Deserializer testee;
  Buffer::OwnedImpl buffer;

  // The buffer makes no sense, it's 10 times 0xFF, while varint encoding ensures that in the worst
  // case 10th byte has the highest bit clear.
  for (int i = 0; i < 10; ++i) {
    const uint8_t all_bits_set = 0xFF;
    buffer.add(&all_bits_set, sizeof(all_bits_set));
  }

  // NOTE(review): the view advertises 1024 bytes while only 10 were written; this relies on the
  // deserializer throwing before it touches byte 11 — confirm the underlying slice is big enough.
  absl::string_view data = {getRawData(buffer), 1024};

  // when
  // then
  EXPECT_THROW_WITH_REGEX(testee.feed(data), EnvoyException, "is too long");
}

// String tests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ void assertStringViewIncrement(const absl::string_view incremented,
ASSERT_EQ(incremented.size(), original.size() - difference);
}

const char* getRawData(const Buffer::OwnedImpl& buffer) {
const char* getRawData(const Buffer::Instance& buffer) {
Buffer::RawSliceVector slices = buffer.getRawSlices(1);
ASSERT(slices.size() == 1);
return reinterpret_cast<const char*>((slices[0]).mem_);
Expand Down
110 changes: 92 additions & 18 deletions test/extensions/filters/network/kafka/serialization_utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ void assertStringViewIncrement(absl::string_view incremented, absl::string_view
size_t difference);

// Helper function converting buffer to raw bytes.
const char* getRawData(const Buffer::OwnedImpl& buffer);
const char* getRawData(const Buffer::Instance& buffer);

// Helper methods for testing serialization and deserialization.
// We have two dimensions to test here: single-pass vs chunks (as we never know how the input is
// going to be delivered), and normal vs compact for some data types (like strings).

// Exactly what is says on the tin:
// 1. serialize expected using Encoder,
Expand All @@ -29,8 +33,8 @@ const char* getRawData(const Buffer::OwnedImpl& buffer);
// 4. verify that data pointer moved correct amount,
// 5. feed testee more data,
// 6. verify that nothing more was consumed (because the testee has been ready since step 3).
template <typename BT, typename AT>
void serializeThenDeserializeAndCheckEqualityInOneGo(AT expected) {
template <typename BT>
void serializeThenDeserializeAndCheckEqualityInOneGo(const typename BT::result_type expected) {
// given
BT testee{};

Expand Down Expand Up @@ -64,8 +68,8 @@ void serializeThenDeserializeAndCheckEqualityInOneGo(AT expected) {
// Does the same thing as the above test, but instead of providing whole data at one, it provides
// it in N one-byte chunks.
// This verifies if deserializer keeps state properly (no overwrites etc.).
template <typename BT, typename AT>
void serializeThenDeserializeAndCheckEqualityWithChunks(AT expected) {
template <typename BT>
void serializeThenDeserializeAndCheckEqualityWithChunks(const typename BT::result_type expected) {
// given
BT testee{};

Expand Down Expand Up @@ -105,20 +109,18 @@ void serializeThenDeserializeAndCheckEqualityWithChunks(AT expected) {
ASSERT_EQ(more_data.size(), garbage_size);
}

// Same thing as 'serializeThenDeserializeAndCheckEqualityInOneGo', just uses compact encoding.
template <typename BT, typename AT>
void serializeCompactThenDeserializeAndCheckEqualityInOneGo(AT expected) {
// Deserialization (only) of compact-encoded data (for data types where we do not need serializer
// code).
template <typename BT>
void deserializeCompactAndCheckEqualityInOneGo(Buffer::Instance& buffer,
const typename BT::result_type expected) {
// given
BT testee{};

Buffer::OwnedImpl buffer;
EncodingContext encoder{-1};
const uint32_t expected_written_size = encoder.computeCompactSize(expected);
const uint32_t written = encoder.encodeCompact(expected, buffer);
ASSERT_EQ(written, expected_written_size);
const uint32_t written = buffer.length();
// Insert garbage after serialized payload.
const uint32_t garbage_size = encoder.encode(Bytes(10000), buffer);

const char* raw_buffer_ptr =
reinterpret_cast<const char*>(buffer.linearize(written + garbage_size));
// Tell parser that there is more data, it should never consume more than written.
Expand All @@ -142,9 +144,69 @@ void serializeCompactThenDeserializeAndCheckEqualityInOneGo(AT expected) {
assertStringViewIncrement(data, orig_data, consumed);
}

// Does the same thing as the above test, but instead of providing whole data at once, it provides
// it in N one-byte chunks.
// This verifies if deserializer keeps state properly (no overwrites etc.).
template <typename BT>
void deserializeCompactAndCheckEqualityWithChunks(Buffer::Instance& buffer,
                                                  const typename BT::result_type expected) {
  // given
  BT testee{};

  EncodingContext encoder{-1};
  const uint32_t payload_size = buffer.length();
  // Insert garbage after serialized payload.
  const uint32_t garbage_size = encoder.encode(Bytes(10000), buffer);

  const char* raw_buffer_ptr =
      reinterpret_cast<const char*>(buffer.linearize(payload_size + garbage_size));
  // Tell parser that there is more data, it should never consume more than the payload.
  const absl::string_view full_view = {raw_buffer_ptr, payload_size + garbage_size};

  // when
  absl::string_view window = full_view;
  uint32_t total_consumed = 0;
  for (uint32_t i = 0; i < payload_size; ++i) {
    window = {window.data(), 1}; // Consume data byte-by-byte.
    const uint32_t chunk_consumed = testee.feed(window);
    total_consumed += chunk_consumed;
    ASSERT_EQ(chunk_consumed, 1);
    ASSERT_EQ(window.size(), 0);
  }

  // then
  ASSERT_EQ(total_consumed, payload_size);
  ASSERT_EQ(testee.ready(), true);
  ASSERT_EQ(testee.get(), expected);

  ASSERT_EQ(window.data(), full_view.data() + total_consumed);

  // when - 2
  absl::string_view trailing = {window.data(), garbage_size};
  const uint32_t trailing_consumed = testee.feed(trailing);

  // then - 2 (nothing changes)
  ASSERT_EQ(trailing_consumed, 0);
  ASSERT_EQ(trailing.data(), window.data());
  ASSERT_EQ(trailing.size(), garbage_size);
}

// Same thing as 'serializeThenDeserializeAndCheckEqualityInOneGo', just uses compact encoding.
template <typename BT>
void serializeCompactThenDeserializeAndCheckEqualityInOneGo(
    const typename BT::result_type expected) {
  Buffer::OwnedImpl buffer;
  EncodingContext encoder{-1};
  // Serialize compactly, checking that the predicted size matches what was actually written.
  const uint32_t size_prediction = encoder.computeCompactSize(expected);
  const uint32_t bytes_written = encoder.encodeCompact(expected, buffer);
  ASSERT_EQ(bytes_written, size_prediction);
  deserializeCompactAndCheckEqualityInOneGo<BT>(buffer, expected);
}

// Same thing as 'serializeThenDeserializeAndCheckEqualityWithChunks', just uses compact encoding.
template <typename BT, typename AT>
void serializeCompactThenDeserializeAndCheckEqualityWithChunks(AT expected) {
template <typename BT>
void serializeCompactThenDeserializeAndCheckEqualityWithChunks(
const typename BT::result_type expected) {
// given
BT testee{};

Expand Down Expand Up @@ -190,18 +252,30 @@ void serializeCompactThenDeserializeAndCheckEqualityWithChunks(AT expected) {
}

// Wrapper to run both tests for normal serialization.
template <typename BT>
void serializeThenDeserializeAndCheckEquality(const typename BT::result_type expected) {
  // Exercise both delivery modes: whole payload in one feed, and byte-by-byte chunks.
  serializeThenDeserializeAndCheckEqualityInOneGo<BT>(expected);
  serializeThenDeserializeAndCheckEqualityWithChunks<BT>(expected);
}

// Wrapper to run both tests for compact serialization.
template <typename BT>
void serializeCompactThenDeserializeAndCheckEquality(const typename BT::result_type expected) {
  // Exercise both delivery modes: whole payload in one feed, and byte-by-byte chunks.
  serializeCompactThenDeserializeAndCheckEqualityInOneGo<BT>(expected);
  serializeCompactThenDeserializeAndCheckEqualityWithChunks<BT>(expected);
}

// Wrapper to run both tests for compact deserialization (for non-serializable types).
template <typename BT>
void deserializeCompactAndCheckEquality(Buffer::Instance& buffer,
                                        const typename BT::result_type expected) {
  // The helpers mutate the buffers they receive, so run the chunked variant on a copy.
  Buffer::OwnedImpl chunked_input;
  chunked_input.add(getRawData(buffer), buffer.length());
  deserializeCompactAndCheckEqualityInOneGo<BT>(buffer, expected);
  deserializeCompactAndCheckEqualityWithChunks<BT>(chunked_input, expected);
}

/**
* Message callback that captures the messages.
*/
Expand Down