Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 0 additions & 27 deletions bazel/external/kafka_int32.patch

This file was deleted.

1 change: 0 additions & 1 deletion bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,6 @@ filegroup(
external_http_archive(
name = "kafka_source",
build_file_content = KAFKASOURCE_BUILD_CONTENT,
patches = ["@envoy//bazel/external:kafka_int32.patch"],
)

# This archive provides Kafka C/CPP client used by mesh filter to communicate with upstream
Expand Down
6 changes: 3 additions & 3 deletions bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -930,13 +930,13 @@ REPOSITORY_LOCATIONS_SPEC = dict(
project_name = "Kafka (source)",
project_desc = "Open-source distributed event streaming platform",
project_url = "https://kafka.apache.org",
version = "2.4.1",
sha256 = "740236f44d66e33ea83382383b4fb7eabdab7093a644b525dd5ec90207f933bd",
version = "2.8.1-rc1",
sha256 = "4f013aadedd0c26c31c67bc8f3984833430316fa990553ded77dfbaa65f43376",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

strip_prefix = "kafka-{version}/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/{version}.zip"],
use_category = ["dataplane_ext"],
extensions = ["envoy.filters.network.kafka_broker", "envoy.filters.network.kafka_mesh"],
release_date = "2020-03-03",
release_date = "2021-09-14",
cpe = "cpe:2.3:a:apache:kafka:*",
),
edenhill_librdkafka = dict(
Expand Down
14 changes: 14 additions & 0 deletions contrib/kafka/filters/network/source/kafka_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ using NullableBytes = absl::optional<Bytes>;
*/
template <typename T> using NullableArray = absl::optional<std::vector<T>>;

/**
* Analogous to:
* https://github.com/apache/kafka/blob/2.8.0/clients/src/main/java/org/apache/kafka/common/Uuid.java#L28
*/
struct Uuid {

const int64_t msb_;
const int64_t lsb_;

Uuid(const int64_t msb, const int64_t lsb) : msb_{msb}, lsb_{lsb} {};

bool operator==(const Uuid& rhs) const { return msb_ == rhs.msb_ && lsb_ == rhs.lsb_; };
};

} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ AbstractResponseSharedPtr ApiVersionsRequestHolder::computeAnswer() const {
request_header_.correlation_id_};

const int16_t error_code = 0;
const ApiVersionsResponseKey produce_entry = {PRODUCE_REQUEST_API_KEY, MIN_PRODUCE_SUPPORTED,
MAX_PRODUCE_SUPPORTED};
const ApiVersionsResponseKey metadata_entry = {METADATA_REQUEST_API_KEY, MIN_METADATA_SUPPORTED,
MAX_METADATA_SUPPORTED};
const ApiVersion produce_entry = {PRODUCE_REQUEST_API_KEY, MIN_PRODUCE_SUPPORTED,
MAX_PRODUCE_SUPPORTED};
const ApiVersion metadata_entry = {METADATA_REQUEST_API_KEY, MIN_METADATA_SUPPORTED,
MAX_METADATA_SUPPORTED};
const ApiVersionsResponse real_response = {error_code, {produce_entry, metadata_entry}};

return std::make_shared<Response<ApiVersionsResponse>>(metadata, real_response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ AbstractResponseSharedPtr MetadataRequestHolder::computeAnswer() const {
std::vector<MetadataResponseTopic> response_topics;
if (request_->data_.topics_) {
for (const auto& topic : *(request_->data_.topics_)) {
const std::string& topic_name = topic.name_;
if (!topic.name_) {
// The client sent request without topic name (UUID was sent instead).
// We do not know how to handle it, so do not send any metadata.
// This will cause failures in clients downstream.
continue;
}
const std::string& topic_name = *(topic.name_);
std::vector<MetadataResponsePartition> topic_partitions;
const absl::optional<ClusterConfig> cluster_config =
configuration_.computeClusterConfigForTopic(topic_name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ProduceRequestHolder::ProduceRequestHolder(AbstractRequestListener& filter,
const RecordExtractor& record_extractor,
const std::shared_ptr<Request<ProduceRequest>> request)
: BaseInFlightRequest{filter}, kafka_facade_{kafka_facade}, request_{request} {
outbound_records_ = record_extractor.extractRecords(request_->data_.topics_);
outbound_records_ = record_extractor.extractRecords(request_->data_.topic_data_);
expected_responses_ = outbound_records_.size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ std::vector<OutboundRecord>
RecordExtractorImpl::extractRecords(const std::vector<TopicProduceData>& data) const {
std::vector<OutboundRecord> result;
for (const auto& topic_data : data) {
for (const auto& partition_data : topic_data.partitions_) {
for (const auto& partition_data : topic_data.partition_data_) {
// Kafka protocol allows nullable data.
if (partition_data.records_) {
const auto topic_result = extractPartitionRecords(
topic_data.name_, partition_data.partition_index_, *(partition_data.records_));
const auto topic_result = extractPartitionRecords(topic_data.name_, partition_data.index_,
*(partition_data.records_));
std::copy(topic_result.begin(), topic_result.end(), std::back_inserter(result));
}
}
Expand Down
49 changes: 38 additions & 11 deletions contrib/kafka/filters/network/source/protocol/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def generate_main_code(type, main_header_file, resolver_cc_file, metrics_header_
- resolver_cc_file - contains request api key & version mapping to deserializer (from header file)
- metrics_header_file - contains metrics with names corresponding to messages
"""
processor = StatefulProcessor()
processor = StatefulProcessor(type)
# Parse provided input files.
messages = processor.parse_messages(input_files)

Expand Down Expand Up @@ -66,7 +66,7 @@ def generate_test_code(
- codec_test_cc_file - tests involving codec and Request/ResponseParserResolver,
- utilities_cc_file - utilities for creating sample messages.
"""
processor = StatefulProcessor()
processor = StatefulProcessor(type)
# Parse provided input files.
messages = processor.parse_messages(input_files)

Expand Down Expand Up @@ -97,7 +97,8 @@ class StatefulProcessor:
AlterConfigsResource, what would cause a compile-time error if we were to handle it trivially).
"""

def __init__(self):
def __init__(self, type):
self.type = type
# Complex types that have been encountered during processing.
self.known_types = set()
# Name of parent message type that's being processed right now.
Expand All @@ -107,8 +108,8 @@ def __init__(self):

def parse_messages(self, input_files):
"""
Parse request/response structures from provided input files.
"""
Parse request/response structures from provided input files.
"""
import re
import json

Expand All @@ -123,9 +124,18 @@ def parse_messages(self, input_files):
without_comments = re.sub(r'\s*//.*\n', '\n', raw_contents)
without_empty_newlines = re.sub(
r'^\s*$', '', without_comments, flags=re.MULTILINE)
message_spec = json.loads(without_empty_newlines)
message = self.parse_top_level_element(message_spec)
messages.append(message)
# Windows support: see PR 10542 for details.
amended = re.sub(r'-2147483648', 'INT32_MIN', without_empty_newlines)
# Kafka JSON files are malformed. See KAFKA-12794.
if input_file == 'external/kafka_source/DescribeProducersRequest.json':
amended = amended[:-6]
message_spec = json.loads(amended)
# Adopt publicly available messages only:
# https://kafka.apache.org/28/protocol.html#protocol_api_keys
api_key = message_spec['apiKey']
if api_key <= 51 or api_key in [56, 57, 60, 61]:
message = self.parse_top_level_element(message_spec)
messages.append(message)
except Exception as e:
print('could not process %s' % input_file)
raise
Expand Down Expand Up @@ -195,7 +205,9 @@ def parse_complex_type(self, type_name, field_spec, versions):
child = self.parse_field(child_field, versions[-1])
if child is not None:
fields.append(child)

# Some structures share the same name, use request/response as prefix.
if type_name in ['EntityData', 'EntryData', 'PartitionData', 'TopicData']:
type_name = self.type.capitalize() + type_name
# Some of the types repeat multiple times (e.g. AlterableConfig).
# In such a case, every second or later occurrence of the same name is going to be prefixed
# with parent type, e.g. we have AlterableConfig (for AlterConfigsRequest) and then
Expand Down Expand Up @@ -379,7 +391,7 @@ def default_value(self):
return str(self.type.default_value())

def example_value_for_test(self, version):
if self.is_nullable():
if self.is_nullable_in_version(version):
return 'absl::make_optional<%s>(%s)' % (
self.type.name, self.type.example_value_for_test(version))
else:
Expand Down Expand Up @@ -470,7 +482,7 @@ class Primitive(TypeSpecification):
Represents a Kafka primitive value.
"""

USABLE_PRIMITIVE_TYPE_NAMES = ['bool', 'int8', 'int16', 'int32', 'int64', 'string', 'bytes']
USABLE_PRIMITIVE_TYPE_NAMES = ['bool', 'int8', 'int16', 'int32', 'int64', 'uint16', 'float64', 'string', 'bytes', 'records', 'uuid']

KAFKA_TYPE_TO_ENVOY_TYPE = {
'string': 'std::string',
Expand All @@ -479,7 +491,11 @@ class Primitive(TypeSpecification):
'int16': 'int16_t',
'int32': 'int32_t',
'int64': 'int64_t',
'uint16': 'uint16_t',
'float64': 'double',
'bytes': 'Bytes',
'records': 'Bytes',
'uuid': 'Uuid',
'tagged_fields': 'TaggedFields',
}

Expand All @@ -490,7 +506,11 @@ class Primitive(TypeSpecification):
'int16': 'Int16Deserializer',
'int32': 'Int32Deserializer',
'int64': 'Int64Deserializer',
'uint16': 'UInt16Deserializer',
'float64': 'Float64Deserializer',
'bytes': 'BytesDeserializer',
'records': 'BytesDeserializer',
'uuid': 'UuidDeserializer',
'tagged_fields': 'TaggedFieldsDeserializer',
}

Expand All @@ -508,6 +528,7 @@ class Primitive(TypeSpecification):
'int32': '0',
'int64': '0',
'bytes': '{}',
'uuid': 'Uuid{0, 0}',
'tagged_fields': 'TaggedFields({})',
}

Expand All @@ -525,8 +546,14 @@ class Primitive(TypeSpecification):
'static_cast<int32_t>(32)',
'int64':
'static_cast<int64_t>(64)',
'float64':
'static_cast<double>(13.125)',
'bytes':
'Bytes({0, 1, 2, 3})',
'records':
'Bytes({0, 1, 2, 3})',
'uuid':
'Uuid{13, 42}',
'tagged_fields':
'TaggedFields{std::vector<TaggedField>{{10, Bytes({1, 2, 3})}, {20, Bytes({4, 5, 6})}}}',
}
Expand Down
Loading