Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 0 additions & 27 deletions bazel/external/kafka_int32.patch

This file was deleted.

1 change: 0 additions & 1 deletion bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,6 @@ filegroup(
external_http_archive(
name = "kafka_source",
build_file_content = KAFKASOURCE_BUILD_CONTENT,
patches = ["@envoy//bazel/external:kafka_int32.patch"],
)

# This archive provides Kafka C/CPP client used by mesh filter to communicate with upstream
Expand Down
16 changes: 8 additions & 8 deletions bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -930,13 +930,13 @@ REPOSITORY_LOCATIONS_SPEC = dict(
project_name = "Kafka (source)",
project_desc = "Open-source distributed event streaming platform",
project_url = "https://kafka.apache.org",
version = "2.4.1",
sha256 = "740236f44d66e33ea83382383b4fb7eabdab7093a644b525dd5ec90207f933bd",
version = "2.8.1",
sha256 = "c3fd89257e056e11b5e1b09d4bbd8332ce5abfdfa7c7a5bb6a5cfe9860fcc688",
strip_prefix = "kafka-{version}/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/{version}.zip"],
use_category = ["dataplane_ext"],
extensions = ["envoy.filters.network.kafka_broker", "envoy.filters.network.kafka_mesh"],
release_date = "2020-03-03",
release_date = "2021-09-14",
cpe = "cpe:2.3:a:apache:kafka:*",
),
edenhill_librdkafka = dict(
Expand All @@ -956,11 +956,11 @@ REPOSITORY_LOCATIONS_SPEC = dict(
project_name = "Kafka (server binary)",
project_desc = "Open-source distributed event streaming platform",
project_url = "https://kafka.apache.org",
version = "2.4.1",
sha256 = "2177cbd14118999e1d76fec628ca78ace7e6f841219dbc6035027c796bbe1a2a",
strip_prefix = "kafka_2.12-{version}",
urls = ["https://archive.apache.org/dist/kafka/{version}/kafka_2.12-{version}.tgz"],
release_date = "2020-03-12",
version = "2.8.1",
sha256 = "4888b03e3b27dd94f2d830ce3bae9d7d98b0ccee3a5d30c919ccb60e0fa1f139",
strip_prefix = "kafka_2.13-{version}",
urls = ["https://archive.apache.org/dist/kafka/{version}/kafka_2.13-{version}.tgz"],
release_date = "2021-09-14",
use_category = ["test_only"],
),
kafka_python_client = dict(
Expand Down
2 changes: 1 addition & 1 deletion contrib/kafka/filters/network/source/kafka_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Kafka {
* Decides if response with given api key & version should have tagged fields in header.
* Bear in mind, that ApiVersions responses DO NOT contain tagged fields in header (despite having
* flexible versions) as per
* https://github.com/apache/kafka/blob/2.4.0/clients/src/main/resources/common/message/ApiVersionsResponse.json#L24
* https://github.com/apache/kafka/blob/2.8.1/clients/src/main/resources/common/message/ApiVersionsResponse.json#L24
* This method gets implemented in generated code through 'kafka_response_resolver_cc.j2'.
*
* @param api_key Kafka request key.
Expand Down
14 changes: 14 additions & 0 deletions contrib/kafka/filters/network/source/kafka_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ using NullableBytes = absl::optional<Bytes>;
*/
template <typename T> using NullableArray = absl::optional<std::vector<T>>;

/**
* Analogous to:
* https://github.com/apache/kafka/blob/2.8.1/clients/src/main/java/org/apache/kafka/common/Uuid.java#L28
*/
struct Uuid {

const int64_t msb_;
const int64_t lsb_;

Uuid(const int64_t msb, const int64_t lsb) : msb_{msb}, lsb_{lsb} {};

bool operator==(const Uuid& rhs) const { return msb_ == rhs.msb_ && lsb_ == rhs.lsb_; };
};

} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ AbstractResponseSharedPtr ApiVersionsRequestHolder::computeAnswer() const {
request_header_.correlation_id_};

const int16_t error_code = 0;
const ApiVersionsResponseKey produce_entry = {PRODUCE_REQUEST_API_KEY, MIN_PRODUCE_SUPPORTED,
MAX_PRODUCE_SUPPORTED};
const ApiVersionsResponseKey metadata_entry = {METADATA_REQUEST_API_KEY, MIN_METADATA_SUPPORTED,
MAX_METADATA_SUPPORTED};
const ApiVersion produce_entry = {PRODUCE_REQUEST_API_KEY, MIN_PRODUCE_SUPPORTED,
MAX_PRODUCE_SUPPORTED};
const ApiVersion metadata_entry = {METADATA_REQUEST_API_KEY, MIN_METADATA_SUPPORTED,
MAX_METADATA_SUPPORTED};
const ApiVersionsResponse real_response = {error_code, {produce_entry, metadata_entry}};

return std::make_shared<Response<ApiVersionsResponse>>(metadata, real_response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,14 @@ AbstractResponseSharedPtr MetadataRequestHolder::computeAnswer() const {
advertised_address.second};
std::vector<MetadataResponseTopic> response_topics;
if (request_->data_.topics_) {
for (const auto& topic : *(request_->data_.topics_)) {
const std::string& topic_name = topic.name_;
for (const MetadataRequestTopic& topic : *(request_->data_.topics_)) {
if (!topic.name_) {
// The client sent a request without a topic name (a UUID was sent instead).
// We do not know how to handle that, so we do not send any metadata for this topic.
// This will cause failures in downstream clients.
continue;
}
const std::string& topic_name = *(topic.name_);
std::vector<MetadataResponsePartition> topic_partitions;
const absl::optional<ClusterConfig> cluster_config =
configuration_.computeClusterConfigForTopic(topic_name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ProduceRequestHolder::ProduceRequestHolder(AbstractRequestListener& filter,
const RecordExtractor& record_extractor,
const std::shared_ptr<Request<ProduceRequest>> request)
: BaseInFlightRequest{filter}, kafka_facade_{kafka_facade}, request_{request} {
outbound_records_ = record_extractor.extractRecords(request_->data_.topics_);
outbound_records_ = record_extractor.extractRecords(request_->data_.topic_data_);
expected_responses_ = outbound_records_.size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ std::vector<OutboundRecord>
RecordExtractorImpl::extractRecords(const std::vector<TopicProduceData>& data) const {
std::vector<OutboundRecord> result;
for (const auto& topic_data : data) {
for (const auto& partition_data : topic_data.partitions_) {
for (const auto& partition_data : topic_data.partition_data_) {
// Kafka protocol allows nullable data.
if (partition_data.records_) {
const auto topic_result = extractPartitionRecords(
topic_data.name_, partition_data.partition_index_, *(partition_data.records_));
const auto topic_result = extractPartitionRecords(topic_data.name_, partition_data.index_,
*(partition_data.records_));
std::copy(topic_result.begin(), topic_result.end(), std::back_inserter(result));
}
}
Expand Down
57 changes: 44 additions & 13 deletions contrib/kafka/filters/network/source/protocol/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def generate_main_code(type, main_header_file, resolver_cc_file, metrics_header_
- resolver_cc_file - contains request api key & version mapping to deserializer (from header file)
- metrics_header_file - contains metrics with names corresponding to messages
"""
processor = StatefulProcessor()
processor = StatefulProcessor(type)
# Parse provided input files.
messages = processor.parse_messages(input_files)

Expand Down Expand Up @@ -66,7 +66,7 @@ def generate_test_code(
- codec_test_cc_file - tests involving codec and Request/ResponseParserResolver,
- utilities_cc_file - utilities for creating sample messages.
"""
processor = StatefulProcessor()
processor = StatefulProcessor(type)
# Parse provided input files.
messages = processor.parse_messages(input_files)

Expand Down Expand Up @@ -97,7 +97,8 @@ class StatefulProcessor:
AlterConfigsResource, what would cause a compile-time error if we were to handle it trivially).
"""

def __init__(self):
def __init__(self, type):
self.type = type
# Complex types that have been encountered during processing.
self.known_types = set()
# Name of parent message type that's being processed right now.
Expand All @@ -107,8 +108,8 @@ def __init__(self):

def parse_messages(self, input_files):
"""
Parse request/response structures from provided input files.
"""
Parse request/response structures from provided input files.
"""
import re
import json

Expand All @@ -123,9 +124,18 @@ def parse_messages(self, input_files):
without_comments = re.sub(r'\s*//.*\n', '\n', raw_contents)
without_empty_newlines = re.sub(
r'^\s*$', '', without_comments, flags=re.MULTILINE)
message_spec = json.loads(without_empty_newlines)
message = self.parse_top_level_element(message_spec)
messages.append(message)
# Windows support: see PR 10542 for details.
amended = re.sub(r'-2147483648', 'INT32_MIN', without_empty_newlines)
# Kafka JSON files are malformed. See KAFKA-12794.
if input_file == 'external/kafka_source/DescribeProducersRequest.json':
amended = amended[:-6]
message_spec = json.loads(amended)
# Adopt publicly available messages only:
# https://kafka.apache.org/28/protocol.html#protocol_api_keys
api_key = message_spec['apiKey']
if api_key <= 51 or api_key in [56, 57, 60, 61]:
message = self.parse_top_level_element(message_spec)
messages.append(message)
except Exception as e:
print('could not process %s' % input_file)
raise
Expand Down Expand Up @@ -195,7 +205,9 @@ def parse_complex_type(self, type_name, field_spec, versions):
child = self.parse_field(child_field, versions[-1])
if child is not None:
fields.append(child)

# Some structures share the same name across requests and responses; prefix the
# type name with 'Request'/'Response' to disambiguate.
if type_name in ['EntityData', 'EntryData', 'PartitionData', 'TopicData']:
type_name = self.type.capitalize() + type_name
# Some of the types repeat multiple times (e.g. AlterableConfig).
# In such a case, every second or later occurrence of the same name is going to be prefixed
# with parent type, e.g. we have AlterableConfig (for AlterConfigsRequest) and then
Expand Down Expand Up @@ -379,7 +391,7 @@ def default_value(self):
return str(self.type.default_value())

def example_value_for_test(self, version):
if self.is_nullable():
if self.is_nullable_in_version(version):
return 'absl::make_optional<%s>(%s)' % (
self.type.name, self.type.example_value_for_test(version))
else:
Expand Down Expand Up @@ -470,7 +482,10 @@ class Primitive(TypeSpecification):
Represents a Kafka primitive value.
"""

USABLE_PRIMITIVE_TYPE_NAMES = ['bool', 'int8', 'int16', 'int32', 'int64', 'string', 'bytes']
USABLE_PRIMITIVE_TYPE_NAMES = [
'bool', 'int8', 'int16', 'int32', 'int64', 'uint16', 'float64', 'string', 'bytes',
'records', 'uuid'
]

KAFKA_TYPE_TO_ENVOY_TYPE = {
'string': 'std::string',
Expand All @@ -479,7 +494,11 @@ class Primitive(TypeSpecification):
'int16': 'int16_t',
'int32': 'int32_t',
'int64': 'int64_t',
'uint16': 'uint16_t',
'float64': 'double',
'bytes': 'Bytes',
'records': 'Bytes',
'uuid': 'Uuid',
'tagged_fields': 'TaggedFields',
}

Expand All @@ -490,13 +509,18 @@ class Primitive(TypeSpecification):
'int16': 'Int16Deserializer',
'int32': 'Int32Deserializer',
'int64': 'Int64Deserializer',
'uint16': 'UInt16Deserializer',
'float64': 'Float64Deserializer',
'bytes': 'BytesDeserializer',
'records': 'BytesDeserializer',
'uuid': 'UuidDeserializer',
'tagged_fields': 'TaggedFieldsDeserializer',
}

KAFKA_TYPE_TO_COMPACT_DESERIALIZER = {
'string': 'CompactStringDeserializer',
'bytes': 'CompactBytesDeserializer'
'bytes': 'CompactBytesDeserializer',
'records': 'CompactBytesDeserializer'
}

# See https://github.com/apache/kafka/tree/trunk/clients/src/main/resources/common/message#deserializing-messages
Expand All @@ -508,6 +532,7 @@ class Primitive(TypeSpecification):
'int32': '0',
'int64': '0',
'bytes': '{}',
'uuid': 'Uuid{0, 0}',
'tagged_fields': 'TaggedFields({})',
}

Expand All @@ -525,8 +550,14 @@ class Primitive(TypeSpecification):
'static_cast<int32_t>(32)',
'int64':
'static_cast<int64_t>(64)',
'float64':
'static_cast<double>(13.125)',
'bytes':
'Bytes({0, 1, 2, 3})',
'records':
'Bytes({0, 1, 2, 3})',
'uuid':
'Uuid{13, 42}',
'tagged_fields':
'TaggedFields{std::vector<TaggedField>{{10, Bytes({1, 2, 3})}, {20, Bytes({4, 5, 6})}}}',
}
Expand Down Expand Up @@ -561,7 +592,7 @@ def default_value(self):
return Primitive.compute(self.original_name, Primitive.KAFKA_TYPE_TO_DEFAULT_VALUE)

def has_flexible_handling(self):
return self.original_name in ['string', 'bytes', 'tagged_fields']
return self.original_name in ['string', 'bytes', 'records', 'tagged_fields']

def example_value_for_test(self, version):
return Primitive.compute(self.original_name, Primitive.KAFKA_TYPE_TO_EXAMPLE_VALUE_FOR_TEST)
Expand Down
15 changes: 15 additions & 0 deletions contrib/kafka/filters/network/source/serialization.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,21 @@ uint32_t CompactBytesDeserializer::feed(absl::string_view& data) {
false);
}

// Consumes bytes from 'data', first the compact (varint) length, then the payload.
// Delegates to the shared compact-bytes helper; the final 'true' argument differs from
// CompactBytesDeserializer::feed above — presumably it permits the encoded length to equal
// NULL_COMPACT_BYTES_LENGTH (the null marker) — TODO confirm against
// feedCompactBytesIntoBuffers.
uint32_t NullableCompactBytesDeserializer::feed(absl::string_view& data) {
  return feedCompactBytesIntoBuffers<unsigned char>(data, length_buf_, length_consumed_, required_,
                                                    data_buf_, ready_, NULL_COMPACT_BYTES_LENGTH,
                                                    true);
}

// Returns the deserialized payload, or absl::nullopt when the encoded length
// was the null marker (NULL_COMPACT_BYTES_LENGTH).
NullableBytes NullableCompactBytesDeserializer::get() const {
  if (length_buf_.get() == NULL_COMPACT_BYTES_LENGTH) {
    return absl::nullopt;
  }
  return absl::make_optional(data_buf_);
}

} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
Expand Down
Loading