diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/produce.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/produce.cc index b44a452358c54..e2ed06fdbb17e 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/produce.cc +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/produce.cc @@ -13,7 +13,7 @@ constexpr static int16_t NO_ERROR = 0; ProduceRequestHolder::ProduceRequestHolder(AbstractRequestListener& filter, UpstreamKafkaFacade& kafka_facade, const std::shared_ptr> request) - : ProduceRequestHolder{filter, kafka_facade, PlaceholderRecordExtractor{}, request} {}; + : ProduceRequestHolder{filter, kafka_facade, RecordExtractorImpl{}, request} {}; ProduceRequestHolder::ProduceRequestHolder(AbstractRequestListener& filter, UpstreamKafkaFacade& kafka_facade, diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc index ee970b27cab1f..3c98dc4885cf9 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc @@ -7,8 +7,198 @@ namespace Kafka { namespace Mesh { std::vector -PlaceholderRecordExtractor::extractRecords(const std::vector&) const { - return {}; +RecordExtractorImpl::extractRecords(const std::vector& data) const { + std::vector result; + for (const auto& topic_data : data) { + for (const auto& partition_data : topic_data.partitions_) { + // Kafka protocol allows nullable data. + if (partition_data.records_) { + const auto topic_result = extractPartitionRecords( + topic_data.name_, partition_data.partition_index_, *(partition_data.records_)); + std::copy(topic_result.begin(), topic_result.end(), std::back_inserter(result)); + } + } + } + return result; +} + +// Fields common to any record batch payload. 
+// See: +// https://github.com/apache/kafka/blob/2.4.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L46 +constexpr unsigned int RECORD_BATCH_COMMON_FIELDS_SIZE = /* BaseOffset */ sizeof(int64_t) + + /* Length */ sizeof(int32_t) + + /* PartitionLeaderEpoch */ sizeof(int32_t); + +// Magic format introduced around Kafka 1.0.0 and still used with Kafka 2.4. +// We can extract records out of record batches that use this magic. +constexpr int8_t SUPPORTED_MAGIC = 2; + +// Reference implementation: +// https://github.com/apache/kafka/blob/2.4.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L443 +std::vector RecordExtractorImpl::extractPartitionRecords(const std::string& topic, + const int32_t partition, + const Bytes& bytes) const { + + absl::string_view data = {reinterpret_cast(bytes.data()), bytes.size()}; + + // Let's skip these common fields, because we are not using them. + if (data.length() < RECORD_BATCH_COMMON_FIELDS_SIZE) { + throw EnvoyException(fmt::format("record batch for [{}-{}] is too short (no common fields): {}", + topic, partition, data.length())); + } + data = {data.data() + RECORD_BATCH_COMMON_FIELDS_SIZE, + data.length() - RECORD_BATCH_COMMON_FIELDS_SIZE}; + + // Extract magic - it tells us the format of the records present in the bytes provided. + Int8Deserializer magic_deserializer; + magic_deserializer.feed(data); + if (!magic_deserializer.ready()) { + throw EnvoyException( + fmt::format("magic byte is not present in record batch for [{}-{}]", topic, partition)); + } + + // Old client sending old magic, or Apache Kafka introducing new magic. + const int8_t magic = magic_deserializer.get(); + if (SUPPORTED_MAGIC != magic) { + throw EnvoyException(fmt::format("unknown magic value in record batch for [{}-{}]: {}", topic, + partition, magic)); + } + + // We have received a record batch with good magic. 
+ return processRecordBatch(topic, partition, data); +} + +// Record batch fields we are going to ignore (because we rip it up and send its contents). +// See: +// https://github.com/apache/kafka/blob/2.4.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L50 +// and: +// https://github.com/apache/kafka/blob/2.4.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L471 +constexpr unsigned int IGNORED_FIELDS_SIZE = + /* CRC */ sizeof(int32_t) + /* Attributes */ sizeof(int16_t) + + /* LastOffsetDelta */ sizeof(int32_t) + /* FirstTimestamp */ sizeof(int64_t) + + /* MaxTimestamp */ sizeof(int64_t) + /* ProducerId */ sizeof(int64_t) + + /* ProducerEpoch */ sizeof(int16_t) + /* BaseSequence */ sizeof(int32_t) + + /* RecordCount */ sizeof(int32_t); + +std::vector RecordExtractorImpl::processRecordBatch(const std::string& topic, + const int32_t partition, + absl::string_view data) const { + + if (data.length() < IGNORED_FIELDS_SIZE) { + throw EnvoyException( + fmt::format("record batch for [{}-{}] is too short (no attribute fields): {}", topic, + partition, data.length())); + } + data = {data.data() + IGNORED_FIELDS_SIZE, data.length() - IGNORED_FIELDS_SIZE}; + + // We have managed to consume all the fancy bytes, now it's time to get to records. 
+ std::vector result; + while (!data.empty()) { + const OutboundRecord record = extractRecord(topic, partition, data); + result.push_back(record); + } + return result; +} + +// Reference implementation: +// https://github.com/apache/kafka/blob/2.4.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L179 +OutboundRecord RecordExtractorImpl::extractRecord(const std::string& topic, const int32_t partition, + absl::string_view& data) const { + + VarInt32Deserializer length; + length.feed(data); + if (!length.ready()) { + throw EnvoyException( + fmt::format("record for [{}-{}] is too short (no length)", topic, partition)); + } + const int32_t len = length.get(); + if (len < 0) { + throw EnvoyException( + fmt::format("record for [{}-{}] has invalid length: {}", topic, partition, len)); + } + if (static_cast(len) > data.length()) { + throw EnvoyException(fmt::format("record for [{}-{}] is too short (not enough bytes provided)", + topic, partition)); + } + + const absl::string_view expected_end_of_record = {data.data() + len, data.length() - len}; + + // We throw away the following record fields: attributes, timestamp delta, offset delta (cannot do + // an easy jump, as some are variable-length). + Int8Deserializer attributes; + attributes.feed(data); + VarInt64Deserializer tsDelta; + tsDelta.feed(data); + VarUInt32Deserializer offsetDelta; + offsetDelta.feed(data); + if (!attributes.ready() || !tsDelta.ready() || !offsetDelta.ready()) { + throw EnvoyException( + fmt::format("attributes not present in record for [{}-{}]", topic, partition)); + } + + // Record key and value. + const absl::string_view key = extractByteArray(data); + const absl::string_view value = extractByteArray(data); + + // Headers. 
+ VarInt32Deserializer headers_count_deserializer; + headers_count_deserializer.feed(data); + if (!headers_count_deserializer.ready()) { + throw EnvoyException( + fmt::format("header count not present in record for [{}-{}]", topic, partition)); + } + const int32_t headers_count = headers_count_deserializer.get(); + if (headers_count < 0) { + throw EnvoyException(fmt::format("invalid header count in record for [{}-{}]: {}", topic, + partition, headers_count)); + } + for (int32_t i = 0; i < headers_count; ++i) { + // For now, we ignore headers. + extractByteArray(data); // Header key. + extractByteArray(data); // Header value. + } + + if (data == expected_end_of_record) { + // We have consumed everything nicely. + return OutboundRecord{topic, partition, key, value}; + } else { + // Bad data - there are bytes left. + throw EnvoyException(fmt::format("data left after consuming record for [{}-{}]: {}", topic, + partition, data.length())); + } +} + +absl::string_view RecordExtractorImpl::extractByteArray(absl::string_view& input) { + + // Get the length. + VarInt32Deserializer length_deserializer; + length_deserializer.feed(input); + if (!length_deserializer.ready()) { + throw EnvoyException("byte array length not present"); + } + const int32_t length = length_deserializer.get(); + + // Length can be -1 (null value was published by client). + if (-1 == length) { + return {}; + } + + // Otherwise, length cannot be negative. + if (length < 0) { + throw EnvoyException(fmt::format("byte array length less than -1: {}", length)); + } + + // Underflow handling. + if (static_cast(length) > input.size()) { + throw EnvoyException( + fmt::format("byte array length larger than data provided: {} vs {}", length, input.size())); + } + + // We have enough data to return it. 
+ const absl::string_view result = {input.data(), + static_cast(length)}; + input = {input.data() + length, input.length() - length}; + return result; } } // namespace Mesh diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h index e8c89fcbd3e3e..59c6e7380e4fa 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h @@ -21,12 +21,29 @@ class RecordExtractor { }; /** - * Just a placeholder for now. + * Proper implementation of record extractor, capable of parsing V2 record set. + * Reference: https://kafka.apache.org/24/documentation/#messageformat */ -class PlaceholderRecordExtractor : public RecordExtractor { +class RecordExtractorImpl : public RecordExtractor { public: + // RecordExtractor std::vector extractRecords(const std::vector& data) const override; + + // Helper function to get the data (such as key, value) out of given input, as most of the + // interesting fields in records are kept as variable-encoded length and following bytes. 
+ static absl::string_view extractByteArray(absl::string_view& input); + +private: + std::vector extractPartitionRecords(const std::string& topic, + const int32_t partition, + const Bytes& records) const; + + std::vector processRecordBatch(const std::string& topic, const int32_t partition, + absl::string_view data) const; + + OutboundRecord extractRecord(const std::string& topic, const int32_t partition, + absl::string_view& data) const; }; } // namespace Mesh diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD b/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD index 3c57a687c37a4..18b75f4206f50 100644 --- a/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/BUILD @@ -19,6 +19,15 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "produce_record_extractor_unit_test", + srcs = ["produce_record_extractor_unit_test.cc"], + tags = ["skip_on_windows"], + deps = [ + "//contrib/kafka/filters/network/source/mesh/command_handlers:produce_record_extractor_lib", + ], +) + envoy_cc_test( name = "metadata_unit_test", srcs = ["metadata_unit_test.cc"], diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/produce_record_extractor_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/produce_record_extractor_unit_test.cc new file mode 100644 index 0000000000000..068aa40c1334b --- /dev/null +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/produce_record_extractor_unit_test.cc @@ -0,0 +1,243 @@ +#include + +#include "test/test_common/utility.h" + +#include "contrib/kafka/filters/network/source/external/requests.h" +#include "contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { +namespace { + +// Simple matcher that verifies that the input given is a collection 
containing correct number of +// unique (!) records for given topic-partition pairs. +MATCHER_P3(HasRecords, topic, partition, expected, "") { + size_t expected_count = expected; + std::set saved_key_pointers = {}; + std::set saved_value_pointers = {}; + size_t count = 0; + + for (const auto& record : arg) { + if (record.topic_ == topic && record.partition_ == partition) { + saved_key_pointers.insert(record.key_); + saved_value_pointers.insert(record.value_); + ++count; + } + } + + if (expected_count != count) { + return false; + } + if (expected_count != saved_key_pointers.size()) { + return false; + } + return saved_key_pointers.size() == saved_value_pointers.size(); +} + +// Helper function to create a record batch that contains a single record with 5-byte key and 5-byte +// value. +Bytes makeGoodRecordBatch() { + // Record batch bytes get ignored (apart from magic field), so we can put 0 there. + Bytes result = Bytes(16 + 1 + 44); + result[16] = 2; // Record batch magic value. + Bytes real_data = {/* Length = 36 */ 72, + /* Attributes */ 0, + /* Timestamp delta */ 0, + /* Offset delta */ 0, + /* Key length = 5 */ 10, + 107, + 107, + 107, + 107, + 107, + /* Value length = 5 */ 10, + 118, + 118, + 118, + 118, + 118, + /* Headers count = 2 */ 4, + /* Header key length = 3 */ 6, + 49, + 49, + 49, + /* Header value length = 5 */ 10, + 97, + 97, + 97, + 97, + 97, + /* Header key length = 3 */ 6, + 50, + 50, + 50, + /* Header value length = 5 */ 10, + 98, + 98, + 98, + 98, + 98}; + result.insert(result.end(), real_data.begin(), real_data.end()); + return result; +} + +TEST(RecordExtractorImpl, shouldProcessRecordBytes) { + // given + const RecordExtractorImpl testee; + + const PartitionProduceData t1_ppd1 = {0, makeGoodRecordBatch()}; + const PartitionProduceData t1_ppd2 = {1, makeGoodRecordBatch()}; + const PartitionProduceData t1_ppd3 = {2, makeGoodRecordBatch()}; + const TopicProduceData tpd1 = {"topic1", {t1_ppd1, t1_ppd2, t1_ppd3}}; + + // Weird input from 
client, protocol allows sending null value as bytes array. + const PartitionProduceData t2_ppd = {20, absl::nullopt}; + const TopicProduceData tpd2 = {"topic2", {t2_ppd}}; + + const std::vector input = {tpd1, tpd2}; + + // when + const auto result = testee.extractRecords(input); + + // then + EXPECT_THAT(result, HasRecords("topic1", 0, 1)); + EXPECT_THAT(result, HasRecords("topic1", 1, 1)); + EXPECT_THAT(result, HasRecords("topic1", 2, 1)); + EXPECT_THAT(result, HasRecords("topic2", 20, 0)); +} + +/** + * Helper function to make record batch (batch contains 1+ records). + * We use 'stage' parameter to make it a single function with various failure modes. + */ +const std::vector makeTopicProduceData(const unsigned int stage) { + Bytes bytes = makeGoodRecordBatch(); + if (1 == stage) { + // No common fields before magic. + bytes.erase(bytes.begin(), bytes.end()); + } + if (2 == stage) { + // No magic. + bytes.erase(bytes.begin() + 16, bytes.end()); + } + if (3 == stage) { + // Bad magic. + bytes[16] = 42; + } + if (4 == stage) { + // No common fields after magic. + bytes.erase(bytes.begin() + 17, bytes.end()); + } + if (5 == stage) { + // No record length after common fields. + bytes[61] = 128; // This will force variable-length deserializer to wait for more bytes. + bytes.erase(bytes.begin() + 62, bytes.end()); + } + if (6 == stage) { + // Record length is higher than size of real data. + bytes.erase(bytes.begin() + 62, bytes.end()); + } + if (7 == stage) { + // Attributes field has negative length. + bytes[61] = 3; /* -1 */ + bytes.erase(bytes.begin() + 62, bytes.end()); + } + if (8 == stage) { + // Attributes field is missing - length is valid, but there is no more data to read. + bytes[61] = 0; + bytes.erase(bytes.begin() + 62, bytes.end()); + } + if (9 == stage) { + // Header count not present - we are going to drop all 21 header bytes after value. + bytes[61] = (36 - 21) << 1; // Length is encoded as variable length. 
+ bytes.erase(bytes.begin() + 77, bytes.end()); + } + if (10 == stage) { + // Negative variable length integer for header count. + bytes[77] = 17; + } + if (11 == stage) { + // Last header value is going to be shorter, so there will be one unconsumed byte. + bytes[92] = 8; + } + const PartitionProduceData ppd = {0, bytes}; + const TopicProduceData tpd = {"topic", {ppd}}; + return {tpd}; +} + +TEST(RecordExtractorImpl, shouldHandleInvalidRecordBytes) { + const RecordExtractorImpl testee; + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(1)), EnvoyException, + "no common fields"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(2)), EnvoyException, + "magic byte is not present"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(3)), EnvoyException, + "unknown magic value"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(4)), EnvoyException, + "no attribute fields"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(5)), EnvoyException, + "no length"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(6)), EnvoyException, + "not enough bytes provided"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(7)), EnvoyException, + "has invalid length"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(8)), EnvoyException, + "attributes not present"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(9)), EnvoyException, + "header count not present"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(10)), EnvoyException, + "invalid header count"); + EXPECT_THROW_WITH_REGEX(testee.extractRecords(makeTopicProduceData(11)), EnvoyException, + "data left after consuming record"); +} + +// Minor helper function. 
+absl::string_view bytesToStringView(const Bytes& bytes) { + return {reinterpret_cast(bytes.data()), bytes.size()}; +} + +TEST(RecordExtractorImpl, shouldExtractByteArray) { + { + const Bytes noBytes = Bytes(0); + auto arg = bytesToStringView(noBytes); + EXPECT_THROW_WITH_REGEX(RecordExtractorImpl::extractByteArray(arg), EnvoyException, + "byte array length not present"); + } + { + const Bytes nullValueBytes = {0b00000001}; // Length = -1. + auto arg = bytesToStringView(nullValueBytes); + EXPECT_EQ(RecordExtractorImpl::extractByteArray(arg), absl::string_view()); + } + { + const Bytes negativeLengthBytes = {0b01111111}; // Length = -64. + auto arg = bytesToStringView(negativeLengthBytes); + EXPECT_THROW_WITH_REGEX(RecordExtractorImpl::extractByteArray(arg), EnvoyException, + "byte array length less than -1: -64"); + } + { + const Bytes bigLengthBytes = {0b01111110}; // Length = 63. + auto arg = bytesToStringView(bigLengthBytes); + EXPECT_THROW_WITH_REGEX(RecordExtractorImpl::extractByteArray(arg), EnvoyException, + "byte array length larger than data provided: 63 vs 0"); + } + { + // Length = 4, 7 bytes follow, 4 should be consumed, 13s should stay unconsumed. + const Bytes goodBytes = {0b00001000, 42, 42, 42, 42, 13, 13, 13}; + auto arg = bytesToStringView(goodBytes); + EXPECT_EQ(RecordExtractorImpl::extractByteArray(arg), + absl::string_view(reinterpret_cast(goodBytes.data() + 1), 4)); + EXPECT_EQ(arg.data(), reinterpret_cast(goodBytes.data() + 5)); + EXPECT_EQ(arg.size(), 3); + } +} + +} // namespace +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy