From 39f2d1736252779d6fbcf36925b0180ad7c5ae8f Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Fri, 10 Sep 2021 12:34:53 -0700 Subject: [PATCH 01/11] kafka: get rid of source patch and do the Windows-handling in processor script Signed-off-by: Adam Kotwasinski --- bazel/external/kafka_int32.patch | 27 ------------------- bazel/repositories.bzl | 1 - .../network/source/protocol/generator.py | 8 +++--- 3 files changed, 5 insertions(+), 31 deletions(-) delete mode 100644 bazel/external/kafka_int32.patch diff --git a/bazel/external/kafka_int32.patch b/bazel/external/kafka_int32.patch deleted file mode 100644 index 8b88fe3358211..0000000000000 --- a/bazel/external/kafka_int32.patch +++ /dev/null @@ -1,27 +0,0 @@ ---- DescribeGroupsResponse.json 2020-03-25 16:12:16.373302600 -0400 -+++ DescribeGroupsResponse.json 2020-03-25 16:11:16.184156200 -0400 -@@ -63,7 +63,7 @@ - { "name": "MemberAssignment", "type": "bytes", "versions": "0+", - "about": "The current assignment provided by the group leader." } - ]}, -- { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "-2147483648", -+ { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "INT32_MIN", - "about": "32-bit bitfield to represent authorized operations for this group." } - ]} - ] - ---- MetadataResponse.json 2020-03-25 15:53:36.319161000 -0400 -+++ MetadataResponse.json 2020-03-25 15:54:11.510400000 -0400 -@@ -81,10 +81,10 @@ - { "name": "OfflineReplicas", "type": "[]int32", "versions": "5+", "ignorable": true, - "about": "The set of offline replicas of this partition." } - ]}, -- { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648", -+ { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", "default": "INT32_MIN", - "about": "32-bit bitfield to represent authorized operations for this topic." } - ]}, -- { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648", -+ { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+", "default": "INT32_MIN", - "about": "32-bit bitfield to represent authorized operations for this cluster." } - ] - } diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index cb8967cd6faee..2547bfbec54f4 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -1065,7 +1065,6 @@ filegroup( external_http_archive( name = "kafka_source", build_file_content = KAFKASOURCE_BUILD_CONTENT, - patches = ["@envoy//bazel/external:kafka_int32.patch"], ) # This archive provides Kafka C/CPP client used by mesh filter to communicate with upstream diff --git a/contrib/kafka/filters/network/source/protocol/generator.py b/contrib/kafka/filters/network/source/protocol/generator.py index 846dd2aa2d9b7..18155ca96edb9 100755 --- a/contrib/kafka/filters/network/source/protocol/generator.py +++ b/contrib/kafka/filters/network/source/protocol/generator.py @@ -107,8 +107,8 @@ def __init__(self): def parse_messages(self, input_files): """ - Parse request/response structures from provided input files. - """ + Parse request/response structures from provided input files. + """ import re import json @@ -123,7 +123,9 @@ def parse_messages(self, input_files): without_comments = re.sub(r'\s*//.*\n', '\n', raw_contents) without_empty_newlines = re.sub( r'^\s*$', '', without_comments, flags=re.MULTILINE) - message_spec = json.loads(without_empty_newlines) + # Windows support: see PR 10542 for details. 
+ amended = re.sub(r'-2147483648', 'INT32_MIN', without_empty_newlines) + message_spec = json.loads(amended) message = self.parse_top_level_element(message_spec) messages.append(message) except Exception as e: From fd076c1f88af943b152c781393a361113984b5e9 Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Fri, 10 Sep 2021 12:56:37 -0700 Subject: [PATCH 02/11] kafka: upgrade kafka-server dependency to 2.8.0 Signed-off-by: Adam Kotwasinski --- bazel/repository_locations.bzl | 6 +- .../filters/network/source/kafka_types.h | 14 ++++ .../network/source/protocol/generator.py | 29 ++++++- .../filters/network/source/serialization.h | 78 +++++++++++++++++++ .../network/test/serialization_test.cc | 4 + 5 files changed, 124 insertions(+), 7 deletions(-) diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index f088dfd3d05f4..a48a28bda3ec6 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -930,13 +930,13 @@ REPOSITORY_LOCATIONS_SPEC = dict( project_name = "Kafka (source)", project_desc = "Open-source distributed event streaming platform", project_url = "https://kafka.apache.org", - version = "2.4.1", - sha256 = "740236f44d66e33ea83382383b4fb7eabdab7093a644b525dd5ec90207f933bd", + version = "2.8.0", + sha256 = "02e00d63e5190b916d857cef5935af2eb448a33c2a8479698243cbb2f7e6d607", strip_prefix = "kafka-{version}/clients/src/main/resources/common/message", urls = ["https://github.com/apache/kafka/archive/{version}.zip"], use_category = ["dataplane_ext"], extensions = ["envoy.filters.network.kafka_broker", "envoy.filters.network.kafka_mesh"], - release_date = "2020-03-03", + release_date = "2021-04-18", cpe = "cpe:2.3:a:apache:kafka:*", ), edenhill_librdkafka = dict( diff --git a/contrib/kafka/filters/network/source/kafka_types.h b/contrib/kafka/filters/network/source/kafka_types.h index 3240b9a9c2d6c..aef2ee018b5b0 100644 --- a/contrib/kafka/filters/network/source/kafka_types.h +++ b/contrib/kafka/filters/network/source/kafka_types.h @@ -31,6 +31,20 @@ using NullableBytes = absl::optional; */ template using NullableArray = absl::optional>; +/** + * Analogous to: + * https://github.com/apache/kafka/blob/2.8.0/clients/src/main/java/org/apache/kafka/common/Uuid.java#L28 + */ +struct Uuid { + + const int64_t msb_; + const int64_t lsb_; + + Uuid(const int64_t msb, const int64_t lsb) : msb_{msb}, lsb_{lsb} {}; + + bool operator==(const Uuid& rhs) const { return msb_ == rhs.msb_ && lsb_ == rhs.lsb_; }; +}; + } // namespace Kafka } // namespace NetworkFilters } // namespace Extensions diff --git a/contrib/kafka/filters/network/source/protocol/generator.py b/contrib/kafka/filters/network/source/protocol/generator.py index 18155ca96edb9..8057d2eb0f899 100755 --- a/contrib/kafka/filters/network/source/protocol/generator.py +++ b/contrib/kafka/filters/network/source/protocol/generator.py @@ -125,9 +125,15 @@ def parse_messages(self, input_files): r'^\s*$', '', without_comments, flags=re.MULTILINE) # Windows support: see PR 10542 for details. 
amended = re.sub(r'-2147483648', 'INT32_MIN', without_empty_newlines) + # FIXME - use 2.8.1-rc0 + if input_file == 'external/kafka_source/DescribeProducersRequest.json': + amended = amended[:-6] message_spec = json.loads(amended) - message = self.parse_top_level_element(message_spec) - messages.append(message) + # Adopt publicly available messages only: https://kafka.apache.org/28/protocol.html#protocol_api_keys + api_key = message_spec['apiKey'] + if api_key <= 51 or api_key in [56, 57, 60, 61]: + message = self.parse_top_level_element(message_spec) + messages.append(message) except Exception as e: print('could not process %s' % input_file) raise @@ -381,7 +387,7 @@ def default_value(self): return str(self.type.default_value()) def example_value_for_test(self, version): - if self.is_nullable(): + if self.is_nullable_in_version(version): return 'absl::make_optional<%s>(%s)' % ( self.type.name, self.type.example_value_for_test(version)) else: @@ -472,7 +478,7 @@ class Primitive(TypeSpecification): Represents a Kafka primitive value. """ - USABLE_PRIMITIVE_TYPE_NAMES = ['bool', 'int8', 'int16', 'int32', 'int64', 'string', 'bytes'] + USABLE_PRIMITIVE_TYPE_NAMES = ['bool', 'int8', 'int16', 'int32', 'int64', 'uint16', 'float64', 'string', 'bytes', 'records', 'uuid'] KAFKA_TYPE_TO_ENVOY_TYPE = { 'string': 'std::string', @@ -481,7 +487,11 @@ class Primitive(TypeSpecification): 'int16': 'int16_t', 'int32': 'int32_t', 'int64': 'int64_t', + 'uint16': 'uint16_t', + 'float64': 'double', 'bytes': 'Bytes', + 'records': 'Bytes', + 'uuid': 'Uuid', 'tagged_fields': 'TaggedFields', } @@ -492,7 +502,11 @@ class Primitive(TypeSpecification): 'int16': 'Int16Deserializer', 'int32': 'Int32Deserializer', 'int64': 'Int64Deserializer', + 'uint16': 'UInt16Deserializer', + 'float64': 'Float64Deserializer', 'bytes': 'BytesDeserializer', + 'records': 'BytesDeserializer', + 'uuid': 'UuidDeserializer', 'tagged_fields': 'TaggedFieldsDeserializer', } @@ -510,6 +524,7 @@ class Primitive(TypeSpecification): 'int32': '0', 'int64': '0', 'bytes': '{}', + 'uuid': 'Uuid{0, 0}', 'tagged_fields': 'TaggedFields({})', } @@ -527,8 +542,14 @@ class Primitive(TypeSpecification): 'static_cast(32)', 'int64': 'static_cast(64)', + 'float64': + 'static_cast(13.125)', 'bytes': 'Bytes({0, 1, 2, 3})', + 'records': + 'Bytes({0, 1, 2, 3})', + 'uuid': + 'Uuid{13, 42}', 'tagged_fields': 'TaggedFields{std::vector{{10, Bytes({1, 2, 3})}, {20, Bytes({4, 5, 6})}}}', } diff --git a/contrib/kafka/filters/network/source/serialization.h b/contrib/kafka/filters/network/source/serialization.h index 3401199c002f8..49bfedbec6a2d 100644 --- a/contrib/kafka/filters/network/source/serialization.h +++ b/contrib/kafka/filters/network/source/serialization.h @@ -112,6 +112,18 @@ class Int16Deserializer : public IntDeserializer { } }; +/** + * Integer deserializer for uint16_t. + */ +class UInt16Deserializer : public IntDeserializer { +public: + uint16_t get() const override { + uint16_t result; + safeMemcpyUnsafeSrc(&result, buf_); + return be16toh(result); + } +}; + /** * Integer deserializer for int32_t. */ @@ -148,6 +160,21 @@ class Int64Deserializer : public IntDeserializer { } }; +/** + * Deserializer for Kafka Float64 type. + * Reference: https://kafka.apache.org/28/protocol.html#protocol_types + * Represents a double-precision 64-bit format IEEE 754 value. The values are encoded using eight bytes in network byte order (big-endian). 
+ */ +class Float64Deserializer : public IntDeserializer { +public: + double get() const override { + uint64_t in_network_order; + safeMemcpyUnsafeSrc(&in_network_order, buf_); + uint64_t in_host_order = be64toh(in_network_order); + return *reinterpret_cast(&in_host_order); + } +}; + /** * Deserializer for boolean values. * Uses a single int8 deserializer, and checks whether the results equals 0. @@ -851,6 +878,30 @@ class NullableCompactArrayDeserializer bool ready_{false}; }; +/** + * Kafka UUID is basically two longs, so we are going to keep model them the same way. + * Reference: https://github.com/apache/kafka/blob/2.8.0/clients/src/main/java/org/apache/kafka/common/Uuid.java#L38 + */ +class UuidDeserializer : public Deserializer { +public: + uint32_t feed(absl::string_view& data) override { + uint32_t consumed = 0; + consumed += high_bytes_deserializer_.feed(data); + consumed += low_bytes_deserializer_.feed(data); + return consumed; + } + + bool ready() const override { return low_bytes_deserializer_.ready(); } + + Uuid get() const override { + return {high_bytes_deserializer_.get(), low_bytes_deserializer_.get()}; + } + +private: + Int64Deserializer high_bytes_deserializer_; + Int64Deserializer low_bytes_deserializer_; +}; + /** * Encodes provided argument in Kafka format. * In case of primitive types, this is done explicitly as per specification. @@ -964,6 +1015,7 @@ COMPUTE_SIZE_OF_NUMERIC_TYPE(int16_t) COMPUTE_SIZE_OF_NUMERIC_TYPE(int32_t) COMPUTE_SIZE_OF_NUMERIC_TYPE(uint32_t) COMPUTE_SIZE_OF_NUMERIC_TYPE(int64_t) +COMPUTE_SIZE_OF_NUMERIC_TYPE(double) /** * Template overload for string. @@ -1019,6 +1071,11 @@ inline uint32_t EncodingContext::computeSize(const NullableArray& arg) const return arg ? computeSize(*arg) : sizeof(int32_t); } +// FIXME +template <> inline uint32_t EncodingContext::computeSize(const Uuid&) const { + return 2 * sizeof(uint64_t); +} + /** * For non-primitive types, call `computeCompactSize` on them, to delegate the work to the entity * itself. The entity may use the information in context to decide which fields are included etc. @@ -1135,6 +1192,20 @@ ENCODE_NUMERIC_TYPE(int32_t, htobe32); ENCODE_NUMERIC_TYPE(uint32_t, htobe32); ENCODE_NUMERIC_TYPE(int64_t, htobe64); +/** + * Template overload for double. + * Encodes 8 bytes. + */ +template <> inline uint32_t EncodingContext::encode(const double& arg, Buffer::Instance& dst) { + static_assert(sizeof(double) == sizeof(uint64_t)); + + double tmp = arg; + const uint64_t as_long = *reinterpret_cast(&tmp); + const uint64_t in_network_order = htobe64(as_long); + dst.add(&in_network_order, sizeof(uint64_t)); + return sizeof(uint64_t); +} + /** * Template overload for bool. * Encode boolean as a single byte. @@ -1227,6 +1298,13 @@ uint32_t EncodingContext::encode(const NullableArray& arg, Buffer::Instance& } } +template <> inline uint32_t EncodingContext::encode(const Uuid& arg, Buffer::Instance& dst) { + uint32_t result = 0; + result += encode(arg.msb_, dst); + result += encode(arg.lsb_, dst); + return result; +} + /** * For non-primitive types, call `encodeCompact` on them, to delegate the serialization to the * entity itself. 
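For reference, the UUID support introduced above boils down to a fixed 16-byte wire layout: two int64s, most-significant half first, each in network byte order. A standalone sketch of that layout (illustration only, not part of the patch; it uses htobe64 from <endian.h> just like the code above, but plain std::memcpy instead of Envoy's Buffer machinery):

#include <cstdint>
#include <cstring>
#include <endian.h>

// Sketch of the 16-byte layout that UuidDeserializer expects and
// EncodingContext::encode(const Uuid&) produces: msb first, then lsb,
// each as a big-endian 64-bit value.
void encodeUuid(int64_t msb, int64_t lsb, uint8_t out[16]) {
  const uint64_t msb_be = htobe64(static_cast<uint64_t>(msb));
  const uint64_t lsb_be = htobe64(static_cast<uint64_t>(lsb));
  std::memcpy(out, &msb_be, sizeof(msb_be));     // bytes 0-7: most significant half
  std::memcpy(out + 8, &lsb_be, sizeof(lsb_be)); // bytes 8-15: least significant half
}

UuidDeserializer above mirrors this by chaining two Int64Deserializers and reporting readiness once the low half has been consumed.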
diff --git a/contrib/kafka/filters/network/test/serialization_test.cc b/contrib/kafka/filters/network/test/serialization_test.cc index c177e86364dd7..fc882cbfa836c 100644 --- a/contrib/kafka/filters/network/test/serialization_test.cc +++ b/contrib/kafka/filters/network/test/serialization_test.cc @@ -23,9 +23,11 @@ namespace SerializationTest { TEST_EmptyDeserializerShouldNotBeReady(Int8Deserializer); TEST_EmptyDeserializerShouldNotBeReady(Int16Deserializer); +TEST_EmptyDeserializerShouldNotBeReady(UInt16Deserializer); TEST_EmptyDeserializerShouldNotBeReady(Int32Deserializer); TEST_EmptyDeserializerShouldNotBeReady(UInt32Deserializer); TEST_EmptyDeserializerShouldNotBeReady(Int64Deserializer); +TEST_EmptyDeserializerShouldNotBeReady(Float64Deserializer); TEST_EmptyDeserializerShouldNotBeReady(BooleanDeserializer); TEST_EmptyDeserializerShouldNotBeReady(VarUInt32Deserializer); TEST_EmptyDeserializerShouldNotBeReady(VarInt32Deserializer); @@ -77,9 +79,11 @@ TEST(NullableCompactArrayDeserializer, EmptyBufferShouldNotBeReady) { TEST_DeserializerShouldDeserialize(Int8Deserializer, int8_t, 42); TEST_DeserializerShouldDeserialize(Int16Deserializer, int16_t, 42); +TEST_DeserializerShouldDeserialize(UInt16Deserializer, uint16_t, 42); TEST_DeserializerShouldDeserialize(Int32Deserializer, int32_t, 42); TEST_DeserializerShouldDeserialize(UInt32Deserializer, uint32_t, 42); TEST_DeserializerShouldDeserialize(Int64Deserializer, int64_t, 42); +TEST_DeserializerShouldDeserialize(Int64Deserializer, double, 13.25); TEST_DeserializerShouldDeserialize(BooleanDeserializer, bool, true); EncodingContext encoder{-1}; // Provided api_version does not matter for primitive types. From 6a91c67bfb926efb3667f1fbc7b6d1f1cfe19522 Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Thu, 16 Sep 2021 15:25:52 -0700 Subject: [PATCH 03/11] kafka: Use 2.8.1-rc1 Signed-off-by: Adam Kotwasinski --- bazel/repository_locations.bzl | 6 +++--- contrib/kafka/filters/network/source/protocol/generator.py | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index a48a28bda3ec6..7f8736a5ea5a7 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -930,13 +930,13 @@ REPOSITORY_LOCATIONS_SPEC = dict( project_name = "Kafka (source)", project_desc = "Open-source distributed event streaming platform", project_url = "https://kafka.apache.org", - version = "2.8.0", - sha256 = "02e00d63e5190b916d857cef5935af2eb448a33c2a8479698243cbb2f7e6d607", + version = "2.8.1-rc1", + sha256 = "4f013aadedd0c26c31c67bc8f3984833430316fa990553ded77dfbaa65f43376", strip_prefix = "kafka-{version}/clients/src/main/resources/common/message", urls = ["https://github.com/apache/kafka/archive/{version}.zip"], use_category = ["dataplane_ext"], extensions = ["envoy.filters.network.kafka_broker", "envoy.filters.network.kafka_mesh"], - release_date = "2021-04-18", + release_date = "2021-09-14", cpe = "cpe:2.3:a:apache:kafka:*", ), edenhill_librdkafka = dict( diff --git a/contrib/kafka/filters/network/source/protocol/generator.py b/contrib/kafka/filters/network/source/protocol/generator.py index 8057d2eb0f899..34300de0dd48e 100755 --- a/contrib/kafka/filters/network/source/protocol/generator.py +++ b/contrib/kafka/filters/network/source/protocol/generator.py @@ -125,11 +125,12 @@ def parse_messages(self, input_files): r'^\s*$', '', without_comments, flags=re.MULTILINE) # Windows support: see PR 10542 for details. 
                    amended = re.sub(r'-2147483648', 'INT32_MIN', without_empty_newlines)
-                    # FIXME - use 2.8.1-rc0
+                    # Kafka JSON files are malformed. See KAFKA-12794.
                     if input_file == 'external/kafka_source/DescribeProducersRequest.json':
                         amended = amended[:-6]
                     message_spec = json.loads(amended)
-                    # Adopt publicly available messages only: https://kafka.apache.org/28/protocol.html#protocol_api_keys
+                    # Adopt publicly available messages only:
+                    # https://kafka.apache.org/28/protocol.html#protocol_api_keys
                     api_key = message_spec['apiKey']
                     if api_key <= 51 or api_key in [56, 57, 60, 61]:
                         message = self.parse_top_level_element(message_spec)

From f42309221eb318f1772a1bf5f119fd1c58c1d4c1 Mon Sep 17 00:00:00 2001
From: Adam Kotwasinski
Date: Thu, 16 Sep 2021 15:47:20 -0700
Subject: [PATCH 04/11] kafka: handle struct renames and collisions, non-sequential api keys, some args being optional now

Signed-off-by: Adam Kotwasinski
---
 .../source/mesh/command_handlers/api_versions.cc  |  8 ++++----
 .../source/mesh/command_handlers/metadata.cc      |  8 +++++++-
 .../source/mesh/command_handlers/produce.cc       |  2 +-
 .../command_handlers/produce_record_extractor.cc  |  6 +++---
 .../filters/network/source/protocol/generator.py  | 11 +++++++----
 .../kafka/filters/network/source/serialization.h  | 15 ++++++++++---
 .../network/test/broker/filter_protocol_test.cc   |  2 +-
 .../mesh/command_handlers/produce_unit_test.cc    | 16 ++++++++--------
 .../test/mesh/request_processor_unit_test.cc      |  6 +++---
 .../filters/network/test/message_utilities.h      |  5 ++---
 .../network/test/metrics_integration_test.cc      |  4 ++--
 .../test/protocol/request_utilities_cc.j2         | 15 +++++++++++----
 .../test/protocol/response_utilities_cc.j2        |  7 +++++--
 13 files changed, 66 insertions(+), 39 deletions(-)

diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/api_versions.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/api_versions.cc
index 1fa8cfa8f5b82..31cb53f12a402 100644
--- a/contrib/kafka/filters/network/source/mesh/command_handlers/api_versions.cc
+++ b/contrib/kafka/filters/network/source/mesh/command_handlers/api_versions.cc
@@ -38,10 +38,10 @@ AbstractResponseSharedPtr ApiVersionsRequestHolder::computeAnswer() const {
                                   request_header_.correlation_id_};
   const int16_t error_code = 0;
-  const ApiVersionsResponseKey produce_entry = {PRODUCE_REQUEST_API_KEY, MIN_PRODUCE_SUPPORTED,
-                                                MAX_PRODUCE_SUPPORTED};
-  const ApiVersionsResponseKey metadata_entry = {METADATA_REQUEST_API_KEY, MIN_METADATA_SUPPORTED,
-                                                 MAX_METADATA_SUPPORTED};
+  const ApiVersion produce_entry = {PRODUCE_REQUEST_API_KEY, MIN_PRODUCE_SUPPORTED,
+                                    MAX_PRODUCE_SUPPORTED};
+  const ApiVersion metadata_entry = {METADATA_REQUEST_API_KEY, MIN_METADATA_SUPPORTED,
+                                     MAX_METADATA_SUPPORTED};
   const ApiVersionsResponse real_response = {error_code, {produce_entry, metadata_entry}};
   return std::make_shared<Response<ApiVersionsResponse>>(metadata, real_response);

diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/metadata.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/metadata.cc
index 05b63b451d1fa..1ca4fb9c7833a 100644
--- a/contrib/kafka/filters/network/source/mesh/command_handlers/metadata.cc
+++ b/contrib/kafka/filters/network/source/mesh/command_handlers/metadata.cc
@@ -34,7 +34,13 @@ AbstractResponseSharedPtr MetadataRequestHolder::computeAnswer() const {
   std::vector<MetadataResponseTopic> response_topics;
   if (request_->data_.topics_) {
     for (const auto& topic : *(request_->data_.topics_)) {
-      const std::string& topic_name = topic.name_;
+      if (!topic.name_) {
+        // The client sent a request without a topic name (UUID was sent instead).
+        // We do not know how to handle it, so do not send any metadata.
+        // This will cause failures in clients downstream.
+        continue;
+      }
+      const std::string& topic_name = *(topic.name_);
       std::vector<MetadataResponsePartition> topic_partitions;
       const absl::optional<ClusterConfig> cluster_config =
           configuration_.computeClusterConfigForTopic(topic_name);

diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/produce.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/produce.cc
index e2ed06fdbb17e..48d08947f0890 100644
--- a/contrib/kafka/filters/network/source/mesh/command_handlers/produce.cc
+++ b/contrib/kafka/filters/network/source/mesh/command_handlers/produce.cc
@@ -20,7 +20,7 @@ ProduceRequestHolder::ProduceRequestHolder(AbstractRequestListener& filter,
                                            const RecordExtractor& record_extractor,
                                            const std::shared_ptr<Request<ProduceRequest>> request)
     : BaseInFlightRequest{filter}, kafka_facade_{kafka_facade}, request_{request} {
-  outbound_records_ = record_extractor.extractRecords(request_->data_.topics_);
+  outbound_records_ = record_extractor.extractRecords(request_->data_.topic_data_);
   expected_responses_ = outbound_records_.size();
 }

diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc
index 3c98dc4885cf9..6ea8620d600a5 100644
--- a/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc
+++ b/contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.cc
@@ -10,11 +10,11 @@ std::vector<OutboundRecord>
 RecordExtractorImpl::extractRecords(const std::vector<TopicProduceData>& data) const {
   std::vector<OutboundRecord> result;
   for (const auto& topic_data : data) {
-    for (const auto& partition_data : topic_data.partitions_) {
+    for (const auto& partition_data : topic_data.partition_data_) {
       // Kafka protocol allows nullable data.
       if (partition_data.records_) {
-        const auto topic_result = extractPartitionRecords(
-            topic_data.name_, partition_data.partition_index_, *(partition_data.records_));
+        const auto topic_result = extractPartitionRecords(topic_data.name_, partition_data.index_,
+                                                          *(partition_data.records_));
         std::copy(topic_result.begin(), topic_result.end(), std::back_inserter(result));
       }
     }

diff --git a/contrib/kafka/filters/network/source/protocol/generator.py b/contrib/kafka/filters/network/source/protocol/generator.py
index 34300de0dd48e..49614627d203b 100755
--- a/contrib/kafka/filters/network/source/protocol/generator.py
+++ b/contrib/kafka/filters/network/source/protocol/generator.py
@@ -15,7 +15,7 @@ def generate_main_code(type, main_header_file, resolver_cc_file, metrics_header_
     - resolver_cc_file - contains request api key & version mapping to deserializer (from header file)
     - metrics_header_file - contains metrics with names corresponding to messages
     """
-    processor = StatefulProcessor()
+    processor = StatefulProcessor(type)
     # Parse provided input files.
     messages = processor.parse_messages(input_files)
@@ -66,7 +66,7 @@ def generate_test_code(
    - codec_test_cc_file - tests involving codec and Request/ResponseParserResolver,
    - utilities_cc_file - utilities for creating sample messages.
    """
-    processor = StatefulProcessor()
+    processor = StatefulProcessor(type)
     # Parse provided input files.
     messages = processor.parse_messages(input_files)
@@ -97,7 +97,8 @@ class StatefulProcessor:
     AlterConfigsResource, which would cause a compile-time error if we were to handle it trivially).
""" - def __init__(self): + def __init__(self, type): + self.type = type # Complex types that have been encountered during processing. self.known_types = set() # Name of parent message type that's being processed right now. @@ -204,7 +205,9 @@ def parse_complex_type(self, type_name, field_spec, versions): child = self.parse_field(child_field, versions[-1]) if child is not None: fields.append(child) - + # Some structures share the same name, use request/response as prefix. + if type_name in ['EntityData', 'EntryData', 'PartitionData', 'TopicData']: + type_name = self.type.capitalize() + type_name # Some of the types repeat multiple times (e.g. AlterableConfig). # In such a case, every second or later occurrence of the same name is going to be prefixed # with parent type, e.g. we have AlterableConfig (for AlterConfigsRequest) and then diff --git a/contrib/kafka/filters/network/source/serialization.h b/contrib/kafka/filters/network/source/serialization.h index 49bfedbec6a2d..bdd0217b9e561 100644 --- a/contrib/kafka/filters/network/source/serialization.h +++ b/contrib/kafka/filters/network/source/serialization.h @@ -163,7 +163,8 @@ class Int64Deserializer : public IntDeserializer { /** * Deserializer for Kafka Float64 type. * Reference: https://kafka.apache.org/28/protocol.html#protocol_types - * Represents a double-precision 64-bit format IEEE 754 value. The values are encoded using eight bytes in network byte order (big-endian). + * Represents a double-precision 64-bit format IEEE 754 value. The values are encoded using eight + * bytes in network byte order (big-endian). */ class Float64Deserializer : public IntDeserializer { public: @@ -880,7 +881,8 @@ class NullableCompactArrayDeserializer /** * Kafka UUID is basically two longs, so we are going to keep model them the same way. - * Reference: https://github.com/apache/kafka/blob/2.8.0/clients/src/main/java/org/apache/kafka/common/Uuid.java#L38 + * Reference: + * https://github.com/apache/kafka/blob/2.8.0/clients/src/main/java/org/apache/kafka/common/Uuid.java#L38 */ class UuidDeserializer : public Deserializer { public: @@ -1012,6 +1014,7 @@ template inline uint32_t EncodingContext::computeSize(const T& arg) COMPUTE_SIZE_OF_NUMERIC_TYPE(bool) COMPUTE_SIZE_OF_NUMERIC_TYPE(int8_t) COMPUTE_SIZE_OF_NUMERIC_TYPE(int16_t) +COMPUTE_SIZE_OF_NUMERIC_TYPE(uint16_t) COMPUTE_SIZE_OF_NUMERIC_TYPE(int32_t) COMPUTE_SIZE_OF_NUMERIC_TYPE(uint32_t) COMPUTE_SIZE_OF_NUMERIC_TYPE(int64_t) @@ -1071,7 +1074,9 @@ inline uint32_t EncodingContext::computeSize(const NullableArray& arg) const return arg ? computeSize(*arg) : sizeof(int32_t); } -// FIXME +/** + * Template overload for Uuid. + */ template <> inline uint32_t EncodingContext::computeSize(const Uuid&) const { return 2 * sizeof(uint64_t); } @@ -1188,6 +1193,7 @@ template <> inline uint32_t EncodingContext::encode(const int8_t& arg, Buffer::I } ENCODE_NUMERIC_TYPE(int16_t, htobe16); +ENCODE_NUMERIC_TYPE(uint16_t, htobe16); ENCODE_NUMERIC_TYPE(int32_t, htobe32); ENCODE_NUMERIC_TYPE(uint32_t, htobe32); ENCODE_NUMERIC_TYPE(int64_t, htobe64); @@ -1298,6 +1304,9 @@ uint32_t EncodingContext::encode(const NullableArray& arg, Buffer::Instance& } } +/** + * Template overload for Uuid. 
+ */ template <> inline uint32_t EncodingContext::encode(const Uuid& arg, Buffer::Instance& dst) { uint32_t result = 0; result += encode(arg.msb_, dst); diff --git a/contrib/kafka/filters/network/test/broker/filter_protocol_test.cc b/contrib/kafka/filters/network/test/broker/filter_protocol_test.cc index 8d790b14806eb..3f30be33c7e47 100644 --- a/contrib/kafka/filters/network/test/broker/filter_protocol_test.cc +++ b/contrib/kafka/filters/network/test/broker/filter_protocol_test.cc @@ -143,7 +143,7 @@ TEST_F(KafkaBrokerFilterProtocolTest, ShouldProcessMessages) { ASSERT_EQ(result2, Network::FilterStatus::Continue); // Also, assert that every message type has been processed properly. - for (int16_t i = 0; i < MessageUtilities::apiKeys(); ++i) { + for (const int16_t i : MessageUtilities::apiKeys()) { // We should have received one request per api version. const Stats::Counter& request_counter = scope_.counter(MessageUtilities::requestMetric(i)); ASSERT_EQ(request_counter.value(), MessageUtilities::requestApiVersions(i)); diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/produce_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/produce_unit_test.cc index efa05e82e1f88..cdcc53d219786 100644 --- a/contrib/kafka/filters/network/test/mesh/command_handlers/produce_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/produce_unit_test.cc @@ -128,10 +128,10 @@ TEST_F(ProduceUnitTest, ShouldSendRecordsInNormalFlow) { ASSERT_TRUE(response); const std::vector responses = response->data_.responses_; EXPECT_EQ(responses.size(), 2); - EXPECT_EQ(responses[0].partitions_[0].error_code_, dm1.error_code_); - EXPECT_EQ(responses[0].partitions_[0].base_offset_, dm1.offset_); - EXPECT_EQ(responses[1].partitions_[0].error_code_, dm2.error_code_); - EXPECT_EQ(responses[1].partitions_[0].base_offset_, dm2.offset_); + EXPECT_EQ(responses[0].partition_responses_[0].error_code_, dm1.error_code_); + EXPECT_EQ(responses[0].partition_responses_[0].base_offset_, dm1.offset_); + EXPECT_EQ(responses[1].partition_responses_[0].error_code_, dm2.error_code_); + EXPECT_EQ(responses[1].partition_responses_[0].base_offset_, dm2.offset_); } // Typical flow without errors. @@ -182,9 +182,9 @@ TEST_F(ProduceUnitTest, ShouldMergeOutboundRecordResponses) { ASSERT_TRUE(response); const std::vector responses = response->data_.responses_; EXPECT_EQ(responses.size(), 1); - EXPECT_EQ(responses[0].partitions_.size(), 1); - EXPECT_EQ(responses[0].partitions_[0].error_code_, 0); - EXPECT_EQ(responses[0].partitions_[0].base_offset_, 1313); + EXPECT_EQ(responses[0].partition_responses_.size(), 1); + EXPECT_EQ(responses[0].partition_responses_[0].error_code_, 0); + EXPECT_EQ(responses[0].partition_responses_[0].base_offset_, 1313); } // Flow with errors. 
@@ -237,7 +237,7 @@ TEST_F(ProduceUnitTest, ShouldHandleDeliveryErrors) { ASSERT_TRUE(response); const std::vector responses = response->data_.responses_; EXPECT_EQ(responses.size(), 1); - EXPECT_EQ(responses[0].partitions_[0].error_code_, dm1.error_code_); + EXPECT_EQ(responses[0].partition_responses_[0].error_code_, dm1.error_code_); } // As with current version of Kafka library we have no capability of linking producer's notification diff --git a/contrib/kafka/filters/network/test/mesh/request_processor_unit_test.cc b/contrib/kafka/filters/network/test/mesh/request_processor_unit_test.cc index 605019141e707..c49b449138c50 100644 --- a/contrib/kafka/filters/network/test/mesh/request_processor_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/request_processor_unit_test.cc @@ -94,9 +94,9 @@ TEST_F(RequestProcessorTest, ShouldProcessApiVersionsRequest) { TEST_F(RequestProcessorTest, ShouldHandleUnsupportedRequest) { // given - const RequestHeader header = {LIST_OFFSET_REQUEST_API_KEY, 0, 0, absl::nullopt}; - const ListOffsetRequest data = {0, {}}; - const auto message = std::make_shared>(header, data); + const RequestHeader header = {LIST_OFFSETS_REQUEST_API_KEY, 0, 0, absl::nullopt}; + const ListOffsetsRequest data = {0, {}}; + const auto message = std::make_shared>(header, data); // when, then - exception gets thrown. EXPECT_THROW_WITH_REGEX(testee_.onMessage(message), EnvoyException, "unsupported"); diff --git a/contrib/kafka/filters/network/test/message_utilities.h b/contrib/kafka/filters/network/test/message_utilities.h index 00278094e2cc1..1e8b5bac8bb44 100644 --- a/contrib/kafka/filters/network/test/message_utilities.h +++ b/contrib/kafka/filters/network/test/message_utilities.h @@ -25,10 +25,9 @@ class MessageUtilities { public: /** - * How many request/response types are supported. - * Proper values are 0..apiKeys() - 1. + * What are the supported request / response types. */ - static int16_t apiKeys(); + static std::vector apiKeys(); /** * How many request types are supported for given api key. diff --git a/contrib/kafka/filters/network/test/metrics_integration_test.cc b/contrib/kafka/filters/network/test/metrics_integration_test.cc index 47873fbfaeef6..488befd0c28eb 100644 --- a/contrib/kafka/filters/network/test/metrics_integration_test.cc +++ b/contrib/kafka/filters/network/test/metrics_integration_test.cc @@ -21,7 +21,7 @@ class MetricsIntegrationTest : public testing::Test { constexpr static int32_t UPDATE_COUNT = 42; TEST_F(MetricsIntegrationTest, ShouldUpdateRequestMetrics) { - for (int16_t api_key = 0; api_key < MessageUtilities::apiKeys(); ++api_key) { + for (const int16_t api_key : MessageUtilities::apiKeys()) { // given // when for (int i = 0; i < UPDATE_COUNT; ++i) { @@ -46,7 +46,7 @@ TEST_F(MetricsIntegrationTest, ShouldHandleUnparseableRequest) { } TEST_F(MetricsIntegrationTest, ShouldUpdateResponseMetrics) { - for (int16_t api_key = 0; api_key < MessageUtilities::apiKeys(); ++api_key) { + for (const int16_t api_key : MessageUtilities::apiKeys()) { // given // when for (int i = 0; i < UPDATE_COUNT; ++i) { diff --git a/contrib/kafka/filters/network/test/protocol/request_utilities_cc.j2 b/contrib/kafka/filters/network/test/protocol/request_utilities_cc.j2 index a90796c0acc11..3ec7d9f5535e9 100644 --- a/contrib/kafka/filters/network/test/protocol/request_utilities_cc.j2 +++ b/contrib/kafka/filters/network/test/protocol/request_utilities_cc.j2 @@ -3,6 +3,8 @@ This file contains implementation of request-related methods contained in 'message_utilities.h'. 
#} +#include + #include "contrib/kafka/filters/network/test/message_utilities.h" #include "contrib/kafka/filters/network/source/external/requests.h" @@ -12,8 +14,12 @@ namespace Extensions { namespace NetworkFilters { namespace Kafka { -int16_t MessageUtilities::apiKeys() { - return {{ message_types | length }}; +std::vector MessageUtilities::apiKeys() { + std::vector result; + {% for message_type in message_types %} + result.push_back({{ message_type.get_extra('api_key') }}); + {% endfor %} + return result; } int16_t MessageUtilities::requestApiVersions(const int16_t api_key) { @@ -30,7 +36,8 @@ int16_t MessageUtilities::requestApiVersions(const int16_t api_key) { std::vector MessageUtilities::makeRequests( const int16_t api_key, int32_t& correlation_id) { - if ((api_key < 0) || (api_key >= {{ message_types | length }})) { + const std::vector api_keys = apiKeys(); + if (std::find(api_keys.begin(), api_keys.end(), api_key) == api_keys.end()) { throw EnvoyException("unsupported api key used in test code"); } @@ -56,7 +63,7 @@ std::vector MessageUtilities::makeRequests( std::vector MessageUtilities::makeAllRequests() { std::vector result; int32_t correlation_id = 0; - for (int16_t i = 0; i < MessageUtilities::apiKeys(); ++i) { + for (const int16_t i : MessageUtilities::apiKeys()) { const std::vector tmp = MessageUtilities::makeRequests(i, correlation_id); result.insert(result.end(), tmp.begin(), tmp.end()); diff --git a/contrib/kafka/filters/network/test/protocol/response_utilities_cc.j2 b/contrib/kafka/filters/network/test/protocol/response_utilities_cc.j2 index cf41d02e3ca78..c57d386a5b565 100644 --- a/contrib/kafka/filters/network/test/protocol/response_utilities_cc.j2 +++ b/contrib/kafka/filters/network/test/protocol/response_utilities_cc.j2 @@ -3,6 +3,8 @@ This file contains implementation of response-related methods contained in 'message_utilities.h'. 
#} +#include + #include "contrib/kafka/filters/network/test/message_utilities.h" #include "contrib/kafka/filters/network/source/external/responses.h" @@ -26,7 +28,8 @@ int16_t MessageUtilities::responseApiVersions(const int16_t api_key) { std::vector MessageUtilities::makeResponses( const int16_t api_key, int32_t& correlation_id) { - if ((api_key < 0) || (api_key >= {{ message_types | length }})) { + const std::vector api_keys = apiKeys(); + if (std::find(api_keys.begin(), api_keys.end(), api_key) == api_keys.end()) { throw EnvoyException("unsupported api key used in test code"); } @@ -51,7 +54,7 @@ std::vector MessageUtilities::makeResponses( std::vector MessageUtilities::makeAllResponses() { std::vector result; int32_t correlation_id = 0; - for (int16_t i = 0; i < MessageUtilities::apiKeys(); ++i) { + for (const int16_t i : MessageUtilities::apiKeys()) { const std::vector tmp = MessageUtilities::makeResponses(i, correlation_id); result.insert(result.end(), tmp.begin(), tmp.end()); From be4a4f54634ecba12905456ff089b355776ef8ee Mon Sep 17 00:00:00 2001 From: Adam Kotwasinski Date: Fri, 17 Sep 2021 17:02:39 -0700 Subject: [PATCH 05/11] kafka: fix test Signed-off-by: Adam Kotwasinski --- contrib/kafka/filters/network/test/serialization_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/kafka/filters/network/test/serialization_test.cc b/contrib/kafka/filters/network/test/serialization_test.cc index fc882cbfa836c..45bb561814fd0 100644 --- a/contrib/kafka/filters/network/test/serialization_test.cc +++ b/contrib/kafka/filters/network/test/serialization_test.cc @@ -83,7 +83,7 @@ TEST_DeserializerShouldDeserialize(UInt16Deserializer, uint16_t, 42); TEST_DeserializerShouldDeserialize(Int32Deserializer, int32_t, 42); TEST_DeserializerShouldDeserialize(UInt32Deserializer, uint32_t, 42); TEST_DeserializerShouldDeserialize(Int64Deserializer, int64_t, 42); -TEST_DeserializerShouldDeserialize(Int64Deserializer, double, 13.25); +TEST_DeserializerShouldDeserialize(Float64Deserializer, double, 13.25); TEST_DeserializerShouldDeserialize(BooleanDeserializer, bool, true); EncodingContext encoder{-1}; // Provided api_version does not matter for primitive types. 
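The test corrected above also pins down the float64 wire contract: the IEEE 754 bit pattern travels as eight bytes in network byte order, so an encode/decode round trip must return the original value. A minimal standalone sketch of that round trip (illustration only; plain std::memcpy and <endian.h> stand in for Envoy's helpers):

#include <cassert>
#include <cstdint>
#include <cstring>
#include <endian.h>

int main() {
  const double original = 13.25; // exactly representable, so the equality check below is safe
  // Encode: take the IEEE 754 bits and convert them to network byte order.
  uint64_t bits;
  std::memcpy(&bits, &original, sizeof(bits));
  const uint64_t wire = htobe64(bits);
  // Decode, mirroring Float64Deserializer::get().
  const uint64_t host = be64toh(wire);
  double decoded;
  std::memcpy(&decoded, &host, sizeof(decoded));
  assert(decoded == original);
  return 0;
}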
From 518fa39fd100e8ffca06fc80fbb762c71ed447ab Mon Sep 17 00:00:00 2001
From: Adam Kotwasinski
Date: Mon, 20 Sep 2021 11:04:42 -0700
Subject: [PATCH 06/11] kafka: 2.8.1 + formatting

Signed-off-by: Adam Kotwasinski
---
 bazel/repository_locations.bzl                              | 6 +++---
 contrib/kafka/filters/network/source/protocol/generator.py  | 5 ++++-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl
index 7f8736a5ea5a7..e54fb9f82db51 100644
--- a/bazel/repository_locations.bzl
+++ b/bazel/repository_locations.bzl
@@ -930,13 +930,13 @@ REPOSITORY_LOCATIONS_SPEC = dict(
         project_name = "Kafka (source)",
         project_desc = "Open-source distributed event streaming platform",
         project_url = "https://kafka.apache.org",
-        version = "2.8.1-rc1",
-        sha256 = "4f013aadedd0c26c31c67bc8f3984833430316fa990553ded77dfbaa65f43376",
+        version = "2.8.1",
+        sha256 = "c3fd89257e056e11b5e1b09d4bbd8332ce5abfdfa7c7a5bb6a5cfe9860fcc688",
         strip_prefix = "kafka-{version}/clients/src/main/resources/common/message",
         urls = ["https://github.com/apache/kafka/archive/{version}.zip"],
         use_category = ["dataplane_ext"],
         extensions = ["envoy.filters.network.kafka_broker", "envoy.filters.network.kafka_mesh"],
-        release_date = "2021-09-14",
+        release_date = "2021-09-17",
         cpe = "cpe:2.3:a:apache:kafka:*",
     ),
     edenhill_librdkafka = dict(

diff --git a/contrib/kafka/filters/network/source/protocol/generator.py b/contrib/kafka/filters/network/source/protocol/generator.py
index 49614627d203b..c583edae682f5 100755
--- a/contrib/kafka/filters/network/source/protocol/generator.py
+++ b/contrib/kafka/filters/network/source/protocol/generator.py
@@ -482,7 +482,10 @@ class Primitive(TypeSpecification):
     Represents a Kafka primitive value.
     """

-    USABLE_PRIMITIVE_TYPE_NAMES = ['bool', 'int8', 'int16', 'int32', 'int64', 'uint16', 'float64', 'string', 'bytes', 'records', 'uuid']
+    USABLE_PRIMITIVE_TYPE_NAMES = [
+        'bool', 'int8', 'int16', 'int32', 'int64', 'uint16', 'float64', 'string', 'bytes',
+        'records', 'uuid'
+    ]

     KAFKA_TYPE_TO_ENVOY_TYPE = {
         'string': 'std::string',

From fdd255424c975a98a505539e157dab6ffda9b810 Mon Sep 17 00:00:00 2001
From: Adam Kotwasinski
Date: Mon, 20 Sep 2021 11:43:46 -0700
Subject: [PATCH 07/11] kafka: release date

Signed-off-by: Adam Kotwasinski
---
 bazel/repository_locations.bzl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl
index e54fb9f82db51..8231618fdbb26 100644
--- a/bazel/repository_locations.bzl
+++ b/bazel/repository_locations.bzl
@@ -936,7 +936,7 @@ REPOSITORY_LOCATIONS_SPEC = dict(
         urls = ["https://github.com/apache/kafka/archive/{version}.zip"],
         use_category = ["dataplane_ext"],
         extensions = ["envoy.filters.network.kafka_broker", "envoy.filters.network.kafka_mesh"],
-        release_date = "2021-09-17",
+        release_date = "2021-09-14",
         cpe = "cpe:2.3:a:apache:kafka:*",
     ),
     edenhill_librdkafka = dict(

From 1d5e5e1e6a328b0c2b4382fdd713c03247cd0f83 Mon Sep 17 00:00:00 2001
From: Adam Kotwasinski
Date: Mon, 20 Sep 2021 14:21:53 -0700
Subject: [PATCH 08/11] kafka: more tests and docs update

Signed-off-by: Adam Kotwasinski
---
 .../filters/network/source/kafka_response.h   |  2 +-
 .../filters/network/source/kafka_types.h      |  2 +-
 .../source/mesh/command_handlers/metadata.cc  |  2 +-
 .../filters/network/source/serialization.h    | 21 +++++++++++--------
 .../filters/network/source/tagged_fields.h    |  2 +-
 .../command_handlers/metadata_unit_test.cc    | 10 +++++++--
 .../network/test/serialization_test.cc        |  8 ++++++++
 .../network_filters/kafka_broker_filter.rst   |  2 +-
 .../network_filters/kafka_mesh_filter.rst     |  2 +-
 9 files changed, 34 insertions(+), 17 deletions(-)

diff --git a/contrib/kafka/filters/network/source/kafka_response.h b/contrib/kafka/filters/network/source/kafka_response.h
index 32bc8317f5131..f135f5cacb744 100644
--- a/contrib/kafka/filters/network/source/kafka_response.h
+++ b/contrib/kafka/filters/network/source/kafka_response.h
@@ -13,7 +13,7 @@ namespace Kafka {
 * Decides if response with given api key & version should have tagged fields in header.
 * Bear in mind that ApiVersions responses DO NOT contain tagged fields in header (despite having
 * flexible versions) as per
- * https://github.com/apache/kafka/blob/2.4.0/clients/src/main/resources/common/message/ApiVersionsResponse.json#L24
+ * https://github.com/apache/kafka/blob/2.8.1/clients/src/main/resources/common/message/ApiVersionsResponse.json#L24
 * This method gets implemented in generated code through 'kafka_response_resolver_cc.j2'.
 *
 * @param api_key Kafka request key.

diff --git a/contrib/kafka/filters/network/source/kafka_types.h b/contrib/kafka/filters/network/source/kafka_types.h
index aef2ee018b5b0..d01c304984e4c 100644
--- a/contrib/kafka/filters/network/source/kafka_types.h
+++ b/contrib/kafka/filters/network/source/kafka_types.h
@@ -33,7 +33,7 @@ template <typename T> using NullableArray = absl::optional<std::vector<T>>;

 /**
  * Analogous to:
- * https://github.com/apache/kafka/blob/2.8.0/clients/src/main/java/org/apache/kafka/common/Uuid.java#L28
+ * https://github.com/apache/kafka/blob/2.8.1/clients/src/main/java/org/apache/kafka/common/Uuid.java#L28
  */
 struct Uuid {

diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/metadata.cc b/contrib/kafka/filters/network/source/mesh/command_handlers/metadata.cc
index 1ca4fb9c7833a..07f402a80802e 100644
--- a/contrib/kafka/filters/network/source/mesh/command_handlers/metadata.cc
+++ b/contrib/kafka/filters/network/source/mesh/command_handlers/metadata.cc
@@ -33,7 +33,7 @@ AbstractResponseSharedPtr MetadataRequestHolder::computeAnswer() const {
                                             advertised_address.second};
   std::vector<MetadataResponseTopic> response_topics;
   if (request_->data_.topics_) {
-    for (const auto& topic : *(request_->data_.topics_)) {
+    for (const MetadataRequestTopic& topic : *(request_->data_.topics_)) {
       if (!topic.name_) {
         // The client sent a request without a topic name (UUID was sent instead).
         // We do not know how to handle it, so do not send any metadata.

diff --git a/contrib/kafka/filters/network/source/serialization.h b/contrib/kafka/filters/network/source/serialization.h
index bdd0217b9e561..25e35ec58048c 100644
--- a/contrib/kafka/filters/network/source/serialization.h
+++ b/contrib/kafka/filters/network/source/serialization.h
@@ -203,9 +203,10 @@ class BooleanDeserializer : public Deserializer<bool> {
 * https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields#KIP-482:TheKafkaProtocolshouldSupportOptionalTaggedFields-UnsignedVarints
 *
 * Impl note:
- * This implementation is equivalent to the one present in Kafka 2.4.0, what means that for 5-byte
+ * This implementation is equivalent to the one present in Kafka, which means that for 5-byte
 * inputs, the data at bits 5-7 in 5th byte are *ignored* (as long as 8th bit is unset).
- * Reference: org.apache.kafka.common.utils.ByteUtils.readUnsignedVarint
+ * Reference:
+ * https://github.com/apache/kafka/blob/2.8.1/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java#L142
 */
 class VarUInt32Deserializer : public Deserializer<uint32_t> {
 public:
@@ -255,12 +256,13 @@ class VarUInt32Deserializer : public Deserializer<uint32_t> {

 /**
  * Deserializer for Kafka 'varint' type.
- * Encoding documentation: https://kafka.apache.org/24/protocol.html#protocol_types
+ * Encoding documentation: https://kafka.apache.org/28/protocol.html#protocol_types
 *
 * Impl note:
- * This implementation is equivalent to the one present in Kafka 2.4.0, what means that for 5-byte
+ * This implementation is equivalent to the one present in Kafka, which means that for 5-byte
 * inputs, the data at bits 5-7 in 5th byte are *ignored* (as long as 8th bit is unset).
- * Reference: org.apache.kafka.common.utils.ByteUtils.readVarint
+ * Reference:
+ * https://github.com/apache/kafka/blob/2.8.1/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java#L189
 */
 class VarInt32Deserializer : public Deserializer<int32_t> {
 public:
@@ -281,12 +283,13 @@ class VarInt32Deserializer : public Deserializer<int32_t> {

 /**
  * Deserializer for Kafka 'varlong' type.
- * Encoding documentation: https://kafka.apache.org/24/protocol.html#protocol_types
+ * Encoding documentation: https://kafka.apache.org/28/protocol.html#protocol_types
 *
 * Impl note:
- * This implementation is equivalent to the one present in Kafka 2.4.0, what means that for 10-byte
+ * This implementation is equivalent to the one present in Kafka, which means that for 10-byte
 * inputs, the data at bits 3-7 in 10th byte are *ignored* (as long as 8th bit is unset).
- * Reference: org.apache.kafka.common.utils.ByteUtils.readVarlong
+ * Reference:
+ * https://github.com/apache/kafka/blob/2.8.1/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java#L242
 */
 class VarInt64Deserializer : public Deserializer<int64_t> {
 public:
@@ -882,7 +885,7 @@ class NullableCompactArrayDeserializer

 /**
  * Kafka UUID is basically two longs, so we are going to model them the same way.
  * Reference:
- * https://github.com/apache/kafka/blob/2.8.0/clients/src/main/java/org/apache/kafka/common/Uuid.java#L38
+ * https://github.com/apache/kafka/blob/2.8.1/clients/src/main/java/org/apache/kafka/common/Uuid.java#L38
  */
 class UuidDeserializer : public Deserializer<Uuid> {
 public:

diff --git a/contrib/kafka/filters/network/source/tagged_fields.h b/contrib/kafka/filters/network/source/tagged_fields.h
index f9aebaf7472bf..96e3c72e50bba 100644
--- a/contrib/kafka/filters/network/source/tagged_fields.h
+++ b/contrib/kafka/filters/network/source/tagged_fields.h
@@ -6,7 +6,7 @@

 /**
  * This header file provides serialization support for tagged fields structure added in 2.4.
- * https://github.com/apache/kafka/blob/2.4.0/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java
+ * https://github.com/apache/kafka/blob/2.8.1/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java
 *
 * Impl note: contrary to other compact data structures, data in tagged field does not have +1 in
 * data length.
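The unsigned-varint semantics described in the comments above (7 payload bits per byte, the high bit marking continuation, excess payload bits of a 5th byte ignored) fit in a few lines. A sketch for illustration only — the filter itself uses the incremental VarUInt32Deserializer, which can be fed partial buffers:

#include <cstddef>
#include <cstdint>
#include <stdexcept>

uint32_t readUnsignedVarint32(const uint8_t* data, size_t len) {
  uint32_t result = 0;
  uint32_t shift = 0;
  for (size_t i = 0; i < len; ++i) {
    if (shift > 28) {
      throw std::runtime_error("varint longer than 5 bytes"); // a 6th byte is never valid
    }
    const uint8_t b = data[i];
    // Unsigned overflow is well-defined, so excess bits of a 5th byte drop out,
    // matching the "ignored" behavior documented above.
    result |= static_cast<uint32_t>(b & 0x7f) << shift;
    if ((b & 0x80) == 0) {
      return result; // high bit unset: this was the last byte
    }
    shift += 7;
  }
  throw std::runtime_error("truncated varint");
}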
diff --git a/contrib/kafka/filters/network/test/mesh/command_handlers/metadata_unit_test.cc b/contrib/kafka/filters/network/test/mesh/command_handlers/metadata_unit_test.cc index d9ffda89635c8..ba144e3ce74c2 100644 --- a/contrib/kafka/filters/network/test/mesh/command_handlers/metadata_unit_test.cc +++ b/contrib/kafka/filters/network/test/mesh/command_handlers/metadata_unit_test.cc @@ -39,8 +39,13 @@ TEST(MetadataTest, shouldBeAlwaysReadyForAnswer) { // Second topic is not going to have configuration present. EXPECT_CALL(configuration, computeClusterConfigForTopic("topic2")) .WillOnce(Return(absl::nullopt)); - const RequestHeader header = {0, 0, 0, absl::nullopt}; - const MetadataRequest data = {{MetadataRequestTopic{"topic1"}, MetadataRequestTopic{"topic2"}}}; + const RequestHeader header = {METADATA_REQUEST_API_KEY, METADATA_REQUEST_MAX_VERSION, 0, + absl::nullopt}; + const MetadataRequestTopic t1 = MetadataRequestTopic{"topic1"}; + const MetadataRequestTopic t2 = MetadataRequestTopic{"topic2"}; + // Third topic is not going to have an explicit name. + const MetadataRequestTopic t3 = MetadataRequestTopic{Uuid{13, 42}, absl::nullopt, TaggedFields{}}; + const MetadataRequest data = {{t1, t2, t3}}; const auto message = std::make_shared>(header, data); MetadataRequestHolder testee = {filter, configuration, message}; @@ -61,6 +66,7 @@ TEST(MetadataTest, shouldBeAlwaysReadyForAnswer) { ASSERT_TRUE(response); const auto topics = response->data_.topics_; EXPECT_EQ(topics.size(), 1); + EXPECT_EQ(topics[0].name_, *(t1.name_)); EXPECT_EQ(topics[0].partitions_.size(), 42); } diff --git a/contrib/kafka/filters/network/test/serialization_test.cc b/contrib/kafka/filters/network/test/serialization_test.cc index 45bb561814fd0..e189063e1abd2 100644 --- a/contrib/kafka/filters/network/test/serialization_test.cc +++ b/contrib/kafka/filters/network/test/serialization_test.cc @@ -40,6 +40,7 @@ TEST_EmptyDeserializerShouldNotBeReady(NullableCompactStringDeserializer); TEST_EmptyDeserializerShouldNotBeReady(BytesDeserializer); TEST_EmptyDeserializerShouldNotBeReady(CompactBytesDeserializer); TEST_EmptyDeserializerShouldNotBeReady(NullableBytesDeserializer); +TEST_EmptyDeserializerShouldNotBeReady(UuidDeserializer); TEST(ArrayDeserializer, EmptyBufferShouldNotBeReady) { // given @@ -525,6 +526,13 @@ TEST(NullableCompactArrayDeserializer, ShouldConsumeCorrectAmountOfDataForLargeI NullableCompactArrayDeserializer>(value); } +// UUID. + +TEST(UuidDeserializer, ShouldDeserialize) { + const Uuid value = {13, 42}; + serializeThenDeserializeAndCheckEquality(value); +} + // Tagged fields. TEST(TaggedFieldDeserializer, ShouldConsumeCorrectAmountOfData) { diff --git a/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst b/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst index 2c286686f33f5..ec7828db5c120 100644 --- a/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst +++ b/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst @@ -5,7 +5,7 @@ Kafka Broker filter The Apache Kafka broker filter decodes the client protocol for `Apache Kafka `_, both the requests and responses in the payload. -The message versions in `Kafka 2.4.0 `_ +The message versions in `Kafka 2.8.1 `_ are supported. 
The filter attempts not to influence the communication between client and brokers, so the messages
that could not be decoded (due to Kafka client or broker running a newer version than supported by

diff --git a/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst b/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst
index 4a8504b7d67e3..6942bf8bb2187 100644
--- a/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst
+++ b/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst
@@ -6,7 +6,7 @@ Kafka Mesh filter
 The Apache Kafka mesh filter provides a facade for `Apache Kafka <https://kafka.apache.org/>`_
 producers. Produce requests sent to this filter instance can be forwarded to one of multiple
 clusters, depending on configured forwarding rules. Corresponding message versions from
-Kafka 2.4.0 are supported.
+Kafka 2.8.1 are supported.

 * :ref:`v3 API reference <envoy_v3_api_msg_extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh>`
 * This filter should be configured with the name *envoy.filters.network.kafka_mesh*.

From aaf3daee0039f8ea8fb3fda9c4804e9d720b504c Mon Sep 17 00:00:00 2001
From: Adam Kotwasinski
Date: Tue, 21 Sep 2021 15:17:51 -0700
Subject: [PATCH 09/11] kafka: support for NullableCompactBytes (used in Produce v9)

Signed-off-by: Adam Kotwasinski
---
 .../network/source/protocol/generator.py    |  5 ++-
 .../filters/network/source/serialization.cc | 15 ++++++
 .../filters/network/source/serialization.h  | 52 +++++++++++++++++++
 .../network/test/serialization_test.cc      | 26 ++++++++--
 4 files changed, 92 insertions(+), 6 deletions(-)

diff --git a/contrib/kafka/filters/network/source/protocol/generator.py b/contrib/kafka/filters/network/source/protocol/generator.py
index c583edae682f5..2fd18ebc2d69b 100755
--- a/contrib/kafka/filters/network/source/protocol/generator.py
+++ b/contrib/kafka/filters/network/source/protocol/generator.py
@@ -519,7 +519,8 @@ class Primitive(TypeSpecification):

     KAFKA_TYPE_TO_COMPACT_DESERIALIZER = {
         'string': 'CompactStringDeserializer',
-        'bytes': 'CompactBytesDeserializer'
+        'bytes': 'CompactBytesDeserializer',
+        'records': 'CompactBytesDeserializer'
     }

     # See https://github.com/apache/kafka/tree/trunk/clients/src/main/resources/common/message#deserializing-messages
@@ -591,7 +592,7 @@ class Primitive(TypeSpecification):
         return Primitive.compute(self.original_name, Primitive.KAFKA_TYPE_TO_DEFAULT_VALUE)

     def has_flexible_handling(self):
-        return self.original_name in ['string', 'bytes', 'tagged_fields']
+        return self.original_name in ['string', 'bytes', 'records', 'tagged_fields']

     def example_value_for_test(self, version):
         return Primitive.compute(self.original_name, Primitive.KAFKA_TYPE_TO_EXAMPLE_VALUE_FOR_TEST)

diff --git a/contrib/kafka/filters/network/source/serialization.cc b/contrib/kafka/filters/network/source/serialization.cc
index fc8464f7aac0d..b78085fdbf18e 100644
--- a/contrib/kafka/filters/network/source/serialization.cc
+++ b/contrib/kafka/filters/network/source/serialization.cc
@@ -205,6 +205,21 @@ uint32_t CompactBytesDeserializer::feed(absl::string_view& data) {
                                          false);
 }

+uint32_t NullableCompactBytesDeserializer::feed(absl::string_view& data) {
+  return feedCompactBytesIntoBuffers(data, length_buf_, length_consumed_, required_,
+                                     data_buf_, ready_, NULL_COMPACT_BYTES_LENGTH,
+                                     true);
+}
+
+NullableBytes NullableCompactBytesDeserializer::get() const {
+  const uint32_t original_data_len = length_buf_.get();
+  if (NULL_COMPACT_BYTES_LENGTH == original_data_len) {
+    return absl::nullopt;
+  } else {
+    return absl::make_optional(data_buf_);
+  }
+}
+
 } // namespace Kafka
 } // namespace NetworkFilters
 } // namespace Extensions

diff --git a/contrib/kafka/filters/network/source/serialization.h b/contrib/kafka/filters/network/source/serialization.h
index 25e35ec58048c..b66b757c26c2a 100644
--- a/contrib/kafka/filters/network/source/serialization.h
+++ b/contrib/kafka/filters/network/source/serialization.h
@@ -546,6 +546,36 @@ class NullableBytesDeserializer : public Deserializer<NullableBytes> {
   bool ready_{false};
 };

+/**
+ * Deserializer of nullable compact bytes value.
+ * First reads length (UNSIGNED_VARINT32) and then allocates the buffer of given length.
+ * If length was 0, buffer allocation is omitted and deserializer is immediately ready (returning
+ * null value).
+ *
+ * From Kafka documentation:
+ * First the length N+1 is given as an UNSIGNED_VARINT. Then N bytes follow.
+ * A null object is represented with a length of 0.
+ */
+class NullableCompactBytesDeserializer : public Deserializer<NullableBytes> {
+public:
+  /**
+   * Can throw EnvoyException if given bytes length is not valid.
+   */
+  uint32_t feed(absl::string_view& data) override;
+
+  bool ready() const override { return ready_; }
+
+  NullableBytes get() const override;
+
+private:
+  VarUInt32Deserializer length_buf_;
+  bool length_consumed_{false};
+  uint32_t required_;
+
+  std::vector<unsigned char> data_buf_;
+  bool ready_{false};
+};
+
@@ -1144,6 +1174,14 @@ template <> inline uint32_t EncodingContext::computeCompactSize(const Bytes& arg
   return computeCompactSize(static_cast<uint32_t>(arg.size()) + 1) + arg.size();
 }

+/**
+ * Template overload for nullable compact byte array.
+ * Kafka NullableCompactBytes' size is var-len encoding of N+1 + N bytes.
+ */
+template <> inline uint32_t EncodingContext::computeCompactSize(const NullableBytes& arg) const {
+  return arg ? computeCompactSize(*arg) : 1;
+}
+
 /**
  * Template overload for CompactArray of T.
  * The size of array is compact size of header and all of its elements.
@@ -1399,6 +1437,20 @@ inline uint32_t EncodingContext::encodeCompact(const Bytes& arg, Buffer::Instanc
   return header_length + data_length;
 }

+/**
+ * Template overload for NullableBytes.
+ * Encode byte array as VAR_UINT + N bytes.
+ */
+template <>
+inline uint32_t EncodingContext::encodeCompact(const NullableBytes& arg, Buffer::Instance& dst) {
+  if (arg.has_value()) {
+    return encodeCompact(*arg, dst);
+  } else {
+    const uint32_t len = 0;
+    return encodeCompact(len, dst);
+  }
+}
+
 /**
  * Encode object array of T as VAR_UINT + N elements.
  * Each element of type T then serializes itself on its own.

diff --git a/contrib/kafka/filters/network/test/serialization_test.cc b/contrib/kafka/filters/network/test/serialization_test.cc
index e189063e1abd2..b9264cf237e47 100644
--- a/contrib/kafka/filters/network/test/serialization_test.cc
+++ b/contrib/kafka/filters/network/test/serialization_test.cc
@@ -40,6 +40,7 @@ TEST_EmptyDeserializerShouldNotBeReady(NullableCompactStringDeserializer);
 TEST_EmptyDeserializerShouldNotBeReady(BytesDeserializer);
 TEST_EmptyDeserializerShouldNotBeReady(CompactBytesDeserializer);
 TEST_EmptyDeserializerShouldNotBeReady(NullableBytesDeserializer);
+TEST_EmptyDeserializerShouldNotBeReady(NullableCompactBytesDeserializer);
 TEST_EmptyDeserializerShouldNotBeReady(UuidDeserializer);

 TEST(ArrayDeserializer, EmptyBufferShouldNotBeReady) {
@@ -429,7 +430,24 @@ TEST(NullableBytesDeserializer, ShouldThrowOnInvalidLength) {
   EXPECT_THROW(testee.feed(data), EnvoyException);
 }

-// Generic array tests.
+// Nullable compact byte-array tests.
diff --git a/contrib/kafka/filters/network/test/serialization_test.cc b/contrib/kafka/filters/network/test/serialization_test.cc
index e189063e1abd2..b9264cf237e47 100644
--- a/contrib/kafka/filters/network/test/serialization_test.cc
+++ b/contrib/kafka/filters/network/test/serialization_test.cc
@@ -40,6 +40,7 @@ TEST_EmptyDeserializerShouldNotBeReady(NullableCompactStringDeserializer);
 TEST_EmptyDeserializerShouldNotBeReady(BytesDeserializer);
 TEST_EmptyDeserializerShouldNotBeReady(CompactBytesDeserializer);
 TEST_EmptyDeserializerShouldNotBeReady(NullableBytesDeserializer);
+TEST_EmptyDeserializerShouldNotBeReady(NullableCompactBytesDeserializer);
 TEST_EmptyDeserializerShouldNotBeReady(UuidDeserializer);
 
 TEST(ArrayDeserializer, EmptyBufferShouldNotBeReady) {
@@ -429,7 +430,24 @@ TEST(NullableBytesDeserializer, ShouldThrowOnInvalidLength) {
   EXPECT_THROW(testee.feed(data), EnvoyException);
 }
 
-// Generic array tests.
+// Nullable compact byte-array tests.
+
+TEST(NullableCompactBytesDeserializer, ShouldDeserialize) {
+  const NullableBytes value{{'a', 'b', 'c', 'd'}};
+  serializeCompactThenDeserializeAndCheckEquality<NullableCompactBytesDeserializer>(value);
+}
+
+TEST(NullableCompactBytesDeserializer, ShouldDeserializeEmptyBytes) {
+  const NullableBytes value = {{}};
+  serializeCompactThenDeserializeAndCheckEquality<NullableCompactBytesDeserializer>(value);
+}
+
+TEST(NullableCompactBytesDeserializer, ShouldDeserializeNullBytes) {
+  const NullableBytes value = absl::nullopt;
+  serializeCompactThenDeserializeAndCheckEquality<NullableCompactBytesDeserializer>(value);
+}
+
+// Generic-array tests.
 
 TEST(ArrayDeserializer, ShouldConsumeCorrectAmountOfData) {
   const std::vector<std::string> value{{"aaa", "bbbbb", "cc", "d", "e", "ffffffff"}};
@@ -451,7 +469,7 @@ TEST(ArrayDeserializer, ShouldThrowOnInvalidLength) {
   EXPECT_THROW(testee.feed(data), EnvoyException);
 }
 
-// Compact generic array tests.
+// Compact generic-array tests.
 
 TEST(CompactArrayDeserializer, ShouldConsumeCorrectAmountOfData) {
   const std::vector<int32_t> value{{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}};
@@ -474,7 +492,7 @@ TEST(CompactArrayDeserializer, ShouldThrowOnInvalidLength) {
   EXPECT_THROW(testee.feed(data), EnvoyException);
 }
 
-// Generic nullable array tests.
+// Nullable generic-array tests.
 
 TEST(NullableArrayDeserializer, ShouldConsumeCorrectAmountOfData) {
   const NullableArray<std::string> value{{"aaa", "bbbbb", "cc", "d", "e", "ffffffff"}};
@@ -501,7 +519,7 @@ TEST(NullableArrayDeserializer, ShouldThrowOnInvalidLength) {
   EXPECT_THROW(testee.feed(data), EnvoyException);
 }
 
-// Compact nullable generic array tests.
+// Nullable compact generic-array tests.
 
 TEST(NullableCompactArrayDeserializer, ShouldConsumeCorrectAmountOfData) {
   const NullableArray<int32_t> value{{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}};
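
(The tests above lean on serializeCompactThenDeserializeAndCheckEquality. A plausible shape for
that helper, sketched purely for illustration under the assumption that it mirrors the
EncodingContext/Deserializer APIs shown earlier; the real helper in serialization_test.cc may
differ in details such as the api_version used.)

    template <typename DeserializerT, typename ValueT>
    void serializeCompactThenDeserializeAndCheckEquality(const ValueT& expected) {
      // Encode with the compact (flexible-version) variant ...
      Buffer::OwnedImpl buffer;
      EncodingContext encoder{9}; // assumed api_version
      const uint32_t written = encoder.encodeCompact(expected, buffer);
      // ... and feed the raw bytes back into a fresh deserializer.
      absl::string_view data{static_cast<const char*>(buffer.linearize(written)), written};
      DeserializerT testee;
      const uint32_t consumed = testee.feed(data);
      // A round trip must consume exactly what was written and reproduce the value.
      EXPECT_EQ(consumed, written);
      ASSERT_TRUE(testee.ready());
      EXPECT_EQ(testee.get(), expected);
    }
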
From cce7c2225651250e050249e04a9a5b85e464265c Mon Sep 17 00:00:00 2001
From: Adam Kotwasinski
Date: Wed, 22 Sep 2021 12:04:33 -0700
Subject: [PATCH 10/11] kafka: upgrade kafka-server dependency

Signed-off-by: Adam Kotwasinski
---
 bazel/repository_locations.bzl | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl
index 8231618fdbb26..8853bbbeac0a2 100644
--- a/bazel/repository_locations.bzl
+++ b/bazel/repository_locations.bzl
@@ -956,11 +956,11 @@ REPOSITORY_LOCATIONS_SPEC = dict(
         project_name = "Kafka (server binary)",
         project_desc = "Open-source distributed event streaming platform",
         project_url = "https://kafka.apache.org",
-        version = "2.4.1",
-        sha256 = "2177cbd14118999e1d76fec628ca78ace7e6f841219dbc6035027c796bbe1a2a",
-        strip_prefix = "kafka_2.12-{version}",
-        urls = ["https://archive.apache.org/dist/kafka/{version}/kafka_2.12-{version}.tgz"],
-        release_date = "2020-03-12",
+        version = "2.8.1",
+        sha256 = "4888b03e3b27dd94f2d830ce3bae9d7d98b0ccee3a5d30c919ccb60e0fa1f139",
+        strip_prefix = "kafka_2.13-{version}",
+        urls = ["https://archive.apache.org/dist/kafka/{version}/kafka_2.13-{version}.tgz"],
+        release_date = "2021-09-14",
         use_category = ["test_only"],
     ),
     kafka_python_client = dict(

From 9b728def3398a3df91c08001693ce0190f0a69bb Mon Sep 17 00:00:00 2001
From: Adam Kotwasinski
Date: Wed, 22 Sep 2021 14:59:13 -0700
Subject: [PATCH 11/11] kafka: renames + proper double conversions

Signed-off-by: Adam Kotwasinski
---
 .../filters/network/source/serialization.h | 35 +++++++++++--------
 1 file changed, 20 insertions(+), 15 deletions(-)

diff --git a/contrib/kafka/filters/network/source/serialization.h b/contrib/kafka/filters/network/source/serialization.h
index b66b757c26c2a..7f77de45b4687 100644
--- a/contrib/kafka/filters/network/source/serialization.h
+++ b/contrib/kafka/filters/network/source/serialization.h
@@ -65,7 +65,7 @@ template <typename T> class Deserializer {
  * Generic integer deserializer (uses array of sizeof(T) bytes).
  * After all bytes are filled in, the value is converted from network byte-order and returned.
  */
-template <typename T> class IntDeserializer : public Deserializer<T> {
+template <typename T> class FixedSizeDeserializer : public Deserializer<T> {
 public:
   uint32_t feed(absl::string_view& data) override {
     const uint32_t available = std::min(sizeof(buf_) - written_, data.size());
@@ -92,7 +92,7 @@
 /**
  * Integer deserializer for int8_t.
  */
-class Int8Deserializer : public IntDeserializer<int8_t> {
+class Int8Deserializer : public FixedSizeDeserializer<int8_t> {
 public:
   int8_t get() const override {
     int8_t result = buf_[0];
@@ -103,7 +103,7 @@
 /**
  * Integer deserializer for int16_t.
  */
-class Int16Deserializer : public IntDeserializer<int16_t> {
+class Int16Deserializer : public FixedSizeDeserializer<int16_t> {
 public:
   int16_t get() const override {
     int16_t result;
@@ -115,7 +115,7 @@
 /**
  * Integer deserializer for uint16_t.
  */
-class UInt16Deserializer : public IntDeserializer<uint16_t> {
+class UInt16Deserializer : public FixedSizeDeserializer<uint16_t> {
 public:
   uint16_t get() const override {
     uint16_t result;
@@ -127,7 +127,7 @@
 /**
  * Integer deserializer for int32_t.
  */
-class Int32Deserializer : public IntDeserializer<int32_t> {
+class Int32Deserializer : public FixedSizeDeserializer<int32_t> {
 public:
   int32_t get() const override {
     int32_t result;
@@ -139,7 +139,7 @@
 /**
  * Integer deserializer for uint32_t.
  */
-class UInt32Deserializer : public IntDeserializer<uint32_t> {
+class UInt32Deserializer : public FixedSizeDeserializer<uint32_t> {
 public:
   uint32_t get() const override {
     uint32_t result;
@@ -151,7 +151,7 @@
 /**
  * Integer deserializer for int64_t.
  */
-class Int64Deserializer : public IntDeserializer<int64_t> {
+class Int64Deserializer : public FixedSizeDeserializer<int64_t> {
 public:
   int64_t get() const override {
     int64_t result;
@@ -166,13 +166,19 @@
  * Represents a double-precision 64-bit format IEEE 754 value. The values are encoded using eight
  * bytes in network byte order (big-endian).
  */
-class Float64Deserializer : public IntDeserializer<double> {
+class Float64Deserializer : public FixedSizeDeserializer<double> {
+
+  static_assert(sizeof(double) == sizeof(uint64_t), "sizeof(double) != sizeof(uint64_t)");
+  static_assert(std::numeric_limits<double>::is_iec559, "non-IEC559 (IEEE 754) double");
+
 public:
   double get() const override {
     uint64_t in_network_order;
     safeMemcpyUnsafeSrc(&in_network_order, buf_);
     uint64_t in_host_order = be64toh(in_network_order);
-    return *reinterpret_cast<double*>(&in_host_order);
+    double result;
+    safeMemcpy(&result, &in_host_order);
+    return result;
   }
 };
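
(Illustration, not part of the patch: the switch from reinterpret_cast to safeMemcpy in get()
removes undefined behavior, since dereferencing a double* that actually points at a uint64_t
violates strict aliasing. A self-contained sketch of the safe pattern, with plain memcpy
standing in for Envoy's safeMemcpy helper.)

    #include <cstdint>
    #include <cstring>

    // Reinterpret the bit pattern of a uint64_t as a double, without aliasing UB.
    double bitsToDouble(const uint64_t bits) {
      static_assert(sizeof(double) == sizeof(uint64_t), "double must be 64-bit");
      double result;
      std::memcpy(&result, &bits, sizeof(result)); // C++20 std::bit_cast does the same
      return result;
    }
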
@@ -180,8 +186,8 @@ class Float64Deserializer : public IntDeserializer<double> {
  * Deserializer for boolean values.
  * Uses a single int8 deserializer, and checks whether the result equals 0.
  * When reading a boolean value, any non-zero value is considered true.
- * Impl note: could have been a subclass of IntDeserializer with a different get function,
- * but it makes it harder to understand.
+ * Impl note: could have been a subclass of FixedSizeDeserializer with a different get
+ * function, but it makes it harder to understand.
  */
 class BooleanDeserializer : public Deserializer<bool> {
 public:
@@ -1244,11 +1250,10 @@ ENCODE_NUMERIC_TYPE(int64_t, htobe64);
  * Encodes 8 bytes.
  */
 template <> inline uint32_t EncodingContext::encode(const double& arg, Buffer::Instance& dst) {
-  static_assert(sizeof(double) == sizeof(uint64_t));
-  double tmp = arg;
-  const uint64_t as_long = *reinterpret_cast<uint64_t*>(&tmp);
-  const uint64_t in_network_order = htobe64(as_long);
+  uint64_t in_host_order;
+  safeMemcpy(&in_host_order, &arg);
+  const uint64_t in_network_order = htobe64(in_host_order);
   dst.add(&in_network_order, sizeof(uint64_t));
   return sizeof(uint64_t);
 }
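
(Illustration, not part of the patch: after this change the encode and decode paths are exact
mirrors of each other, sketched below with the names used above; the EncodingContext
constructor argument is an assumed api_version.)

    // Round-tripping a double through the two pieces changed in this patch.
    const double value = 13.25;
    Buffer::OwnedImpl buffer;
    EncodingContext encoder{0};    // assumed api_version
    encoder.encode(value, buffer); // appends 8 bytes in network (big-endian) order
    Float64Deserializer testee;
    absl::string_view data{static_cast<const char*>(buffer.linearize(8)), 8};
    testee.feed(data);
    // testee.get() == 13.25: byte-copying in both directions keeps the IEEE 754
    // bit pattern intact, which is what the class-level static_asserts guarantee.
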