diff --git a/source/extensions/filters/network/kafka/kafka_request.h b/source/extensions/filters/network/kafka/kafka_request.h index 3c9cc414083e0..b828d76b97008 100644 --- a/source/extensions/filters/network/kafka/kafka_request.h +++ b/source/extensions/filters/network/kafka/kafka_request.h @@ -163,7 +163,6 @@ template class Request : public AbstractRequest { return request_header_ == rhs.request_header_ && data_ == rhs.data_; }; -private: const Data data_; }; diff --git a/source/extensions/filters/network/kafka/kafka_response.h b/source/extensions/filters/network/kafka/kafka_response.h index 805d46defb698..4a2a9d9ff8c19 100644 --- a/source/extensions/filters/network/kafka/kafka_response.h +++ b/source/extensions/filters/network/kafka/kafka_response.h @@ -141,7 +141,6 @@ template class Response : public AbstractResponse { return metadata_ == rhs.metadata_ && data_ == rhs.data_; }; -private: const Data data_; }; diff --git a/source/extensions/filters/network/kafka/mesh/BUILD b/source/extensions/filters/network/kafka/mesh/BUILD index b4a2581e686ff..79d1a366b2f27 100644 --- a/source/extensions/filters/network/kafka/mesh/BUILD +++ b/source/extensions/filters/network/kafka/mesh/BUILD @@ -20,6 +20,7 @@ envoy_cc_library( deps = [ ":abstract_command_lib", ":request_processor_lib", + ":upstream_config_lib", "//envoy/buffer:buffer_interface", "//envoy/network:connection_interface", "//envoy/network:filter_interface", @@ -41,10 +42,12 @@ envoy_cc_library( tags = ["skip_on_windows"], deps = [ ":abstract_command_lib", + ":upstream_config_lib", "//source/common/common:minimal_logger_lib", "//source/extensions/filters/network/kafka:kafka_request_codec_lib", "//source/extensions/filters/network/kafka:kafka_request_parser_lib", "//source/extensions/filters/network/kafka/mesh/command_handlers:api_versions_lib", + "//source/extensions/filters/network/kafka/mesh/command_handlers:metadata_lib", ], ) @@ -63,3 +66,15 @@ envoy_cc_library( "//source/extensions/filters/network/kafka:tagged_fields_lib", ], ) + +envoy_cc_library( + name = "upstream_config_lib", + srcs = [ + ], + hdrs = [ + "upstream_config.h", + ], + tags = ["skip_on_windows"], + deps = [ + ], +) diff --git a/source/extensions/filters/network/kafka/mesh/command_handlers/BUILD b/source/extensions/filters/network/kafka/mesh/command_handlers/BUILD index b04e546434889..abe69acab5c7b 100644 --- a/source/extensions/filters/network/kafka/mesh/command_handlers/BUILD +++ b/source/extensions/filters/network/kafka/mesh/command_handlers/BUILD @@ -10,6 +10,24 @@ licenses(["notice"]) # Apache 2 envoy_extension_package() +envoy_cc_library( + name = "metadata_lib", + srcs = [ + "metadata.cc", + ], + hdrs = [ + "metadata.h", + ], + tags = ["skip_on_windows"], + deps = [ + "//source/common/common:minimal_logger_lib", + "//source/extensions/filters/network/kafka:kafka_request_parser_lib", + "//source/extensions/filters/network/kafka:kafka_response_parser_lib", + "//source/extensions/filters/network/kafka/mesh:abstract_command_lib", + "//source/extensions/filters/network/kafka/mesh:upstream_config_lib", + ], +) + envoy_cc_library( name = "api_versions_lib", srcs = [ diff --git a/source/extensions/filters/network/kafka/mesh/command_handlers/metadata.cc b/source/extensions/filters/network/kafka/mesh/command_handlers/metadata.cc new file mode 100644 index 0000000000000..dca474b823797 --- /dev/null +++ b/source/extensions/filters/network/kafka/mesh/command_handlers/metadata.cc @@ -0,0 +1,66 @@ +#include "source/extensions/filters/network/kafka/mesh/command_handlers/metadata.h" + +#include "source/extensions/filters/network/kafka/external/responses.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +MetadataRequestHolder::MetadataRequestHolder( + AbstractRequestListener& filter, const UpstreamKafkaConfiguration& configuration, + const std::shared_ptr> request) + : BaseInFlightRequest{filter}, configuration_{configuration}, request_{request} {} + +// Metadata requests are immediately ready for answer (as they do not need to reach upstream). +void MetadataRequestHolder::startProcessing() { notifyFilter(); } + +bool MetadataRequestHolder::finished() const { return true; } + +constexpr int32_t ENVOY_BROKER_ID = 0; +constexpr int32_t NO_ERROR = 0; + +// Cornerstone of how the mesh-filter actually works. +// We pretend to be one-node Kafka cluster, with Envoy instance being the only member. +// What means all the Kafka future traffic will go through this instance. +AbstractResponseSharedPtr MetadataRequestHolder::computeAnswer() const { + const auto& header = request_->request_header_; + const ResponseMetadata metadata = {header.api_key_, header.api_version_, header.correlation_id_}; + + const auto advertised_address = configuration_.getAdvertisedAddress(); + MetadataResponseBroker broker = {ENVOY_BROKER_ID, advertised_address.first, + advertised_address.second}; + std::vector response_topics; + if (request_->data_.topics_) { + for (const auto& topic : *(request_->data_.topics_)) { + const std::string& topic_name = topic.name_; + std::vector topic_partitions; + const absl::optional cluster_config = + configuration_.computeClusterConfigForTopic(topic_name); + if (!cluster_config) { + // Someone is requesting topics that are not known to our configuration. + // So we do not attach any metadata, this will cause clients failures downstream as they + // will never be able to get metadata for these topics. + continue; + } + for (int32_t partition_id = 0; partition_id < cluster_config->partition_count_; + ++partition_id) { + // Every partition is hosted by this proxy-broker. + MetadataResponsePartition partition = { + NO_ERROR, partition_id, broker.node_id_, {broker.node_id_}, {broker.node_id_}}; + topic_partitions.push_back(partition); + } + MetadataResponseTopic response_topic = {NO_ERROR, topic_name, false, topic_partitions}; + response_topics.push_back(response_topic); + } + } + MetadataResponse data = {{broker}, broker.node_id_, response_topics}; + return std::make_shared>(metadata, data); +} + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/kafka/mesh/command_handlers/metadata.h b/source/extensions/filters/network/kafka/mesh/command_handlers/metadata.h new file mode 100644 index 0000000000000..c98f7b06777c2 --- /dev/null +++ b/source/extensions/filters/network/kafka/mesh/command_handlers/metadata.h @@ -0,0 +1,37 @@ +#pragma once + +#include "source/extensions/filters/network/kafka/external/requests.h" +#include "source/extensions/filters/network/kafka/mesh/abstract_command.h" +#include "source/extensions/filters/network/kafka/mesh/upstream_config.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +class MetadataRequestHolder : public BaseInFlightRequest { +public: + MetadataRequestHolder(AbstractRequestListener& filter, + const UpstreamKafkaConfiguration& configuration, + const std::shared_ptr> request); + + void startProcessing() override; + + bool finished() const override; + + AbstractResponseSharedPtr computeAnswer() const override; + +private: + // Configuration used to provide data for response. + const UpstreamKafkaConfiguration& configuration_; + + // Original request. + const std::shared_ptr> request_; +}; + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/kafka/mesh/filter.cc b/source/extensions/filters/network/kafka/mesh/filter.cc index f250046c849f7..02b86b59f9d06 100644 --- a/source/extensions/filters/network/kafka/mesh/filter.cc +++ b/source/extensions/filters/network/kafka/mesh/filter.cc @@ -13,6 +13,10 @@ namespace NetworkFilters { namespace Kafka { namespace Mesh { +KafkaMeshFilter::KafkaMeshFilter(const UpstreamKafkaConfiguration& configuration) + : KafkaMeshFilter{std::make_shared(std::vector( + {std::make_shared(*this, configuration)}))} {} + KafkaMeshFilter::KafkaMeshFilter(RequestDecoderSharedPtr request_decoder) : request_decoder_{request_decoder} {} diff --git a/source/extensions/filters/network/kafka/mesh/filter.h b/source/extensions/filters/network/kafka/mesh/filter.h index 3fde92e6ca273..6f6d7effbb71b 100644 --- a/source/extensions/filters/network/kafka/mesh/filter.h +++ b/source/extensions/filters/network/kafka/mesh/filter.h @@ -9,6 +9,7 @@ #include "source/extensions/filters/network/kafka/external/requests.h" #include "source/extensions/filters/network/kafka/mesh/abstract_command.h" #include "source/extensions/filters/network/kafka/mesh/request_processor.h" +#include "source/extensions/filters/network/kafka/mesh/upstream_config.h" #include "source/extensions/filters/network/kafka/request_codec.h" namespace Envoy { @@ -38,6 +39,9 @@ class KafkaMeshFilter : public Network::ReadFilter, public AbstractRequestListener, private Logger::Loggable { public: + // Proper constructor. + KafkaMeshFilter(const UpstreamKafkaConfiguration& configuration); + // Visible for testing. KafkaMeshFilter(RequestDecoderSharedPtr request_decoder); diff --git a/source/extensions/filters/network/kafka/mesh/request_processor.cc b/source/extensions/filters/network/kafka/mesh/request_processor.cc index 0613e2bc38ef0..7c787f3528878 100644 --- a/source/extensions/filters/network/kafka/mesh/request_processor.cc +++ b/source/extensions/filters/network/kafka/mesh/request_processor.cc @@ -3,6 +3,7 @@ #include "envoy/common/exception.h" #include "source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h" +#include "source/extensions/filters/network/kafka/mesh/command_handlers/metadata.h" namespace Envoy { namespace Extensions { @@ -10,7 +11,9 @@ namespace NetworkFilters { namespace Kafka { namespace Mesh { -RequestProcessor::RequestProcessor(AbstractRequestListener& origin) : origin_{origin} {} +RequestProcessor::RequestProcessor(AbstractRequestListener& origin, + const UpstreamKafkaConfiguration& configuration) + : origin_{origin}, configuration_{configuration} {} // Helper function. Throws a nice message. Filter will react by closing the connection. static void throwOnUnsupportedRequest(const std::string& reason, const RequestHeader& header) { @@ -20,6 +23,9 @@ static void throwOnUnsupportedRequest(const std::string& reason, const RequestHe void RequestProcessor::onMessage(AbstractRequestSharedPtr arg) { switch (arg->request_header_.api_key_) { + case METADATA_REQUEST_API_KEY: + process(std::dynamic_pointer_cast>(arg)); + break; case API_VERSIONS_REQUEST_API_KEY: process(std::dynamic_pointer_cast>(arg)); break; @@ -30,6 +36,11 @@ void RequestProcessor::onMessage(AbstractRequestSharedPtr arg) { } // switch } +void RequestProcessor::process(const std::shared_ptr> request) const { + auto res = std::make_shared(origin_, configuration_, request); + origin_.onRequest(res); +} + void RequestProcessor::process(const std::shared_ptr> request) const { auto res = std::make_shared(origin_, request->request_header_); origin_.onRequest(res); diff --git a/source/extensions/filters/network/kafka/mesh/request_processor.h b/source/extensions/filters/network/kafka/mesh/request_processor.h index 9b4320fd3e329..3b3e2271b096f 100644 --- a/source/extensions/filters/network/kafka/mesh/request_processor.h +++ b/source/extensions/filters/network/kafka/mesh/request_processor.h @@ -3,6 +3,7 @@ #include "source/common/common/logger.h" #include "source/extensions/filters/network/kafka/external/requests.h" #include "source/extensions/filters/network/kafka/mesh/abstract_command.h" +#include "source/extensions/filters/network/kafka/mesh/upstream_config.h" #include "source/extensions/filters/network/kafka/request_codec.h" namespace Envoy { @@ -16,16 +17,19 @@ namespace Mesh { */ class RequestProcessor : public RequestCallback, private Logger::Loggable { public: - RequestProcessor(AbstractRequestListener& origin); + RequestProcessor(AbstractRequestListener& origin, + const UpstreamKafkaConfiguration& configuration); // RequestCallback void onMessage(AbstractRequestSharedPtr arg) override; void onFailedParse(RequestParseFailureSharedPtr) override; private: + void process(const std::shared_ptr> request) const; void process(const std::shared_ptr> request) const; AbstractRequestListener& origin_; + const UpstreamKafkaConfiguration& configuration_; }; } // namespace Mesh diff --git a/source/extensions/filters/network/kafka/mesh/upstream_config.h b/source/extensions/filters/network/kafka/mesh/upstream_config.h new file mode 100644 index 0000000000000..00e3e7faf32da --- /dev/null +++ b/source/extensions/filters/network/kafka/mesh/upstream_config.h @@ -0,0 +1,56 @@ +#pragma once + +#include +#include +#include +#include + +#include "envoy/common/pure.h" + +#include "absl/types/optional.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +// Minor helper structure that contains information about upstream Kafka clusters. +struct ClusterConfig { + + // Cluster name, as it appears in configuration input. + std::string name_; + + // How many partitions do we expect for every one of the topics present in given upstream cluster. + // Impl note: this could be replaced with creating (shared?) AdminClient and having it reach out + // upstream to get configuration (or we could just send a correct request via codec). The response + // would need to be cached (as this data is frequently requested). + int32_t partition_count_; + + // The configuration that will be passed to upstream client for given cluster. + // This allows us to reference different clusters with different configs (e.g. linger.ms). + // This map always contains entry with key 'bootstrap.servers', as this is the only mandatory + // producer property. + std::map upstream_producer_properties_; +}; + +/** + * Keeps the configuration related to upstream Kafka clusters. + * Impl note: current matching from topic to cluster is based on prefix matching but more complex + * rules could be added. + */ +class UpstreamKafkaConfiguration { +public: + virtual ~UpstreamKafkaConfiguration() = default; + virtual absl::optional + computeClusterConfigForTopic(const std::string& topic) const PURE; + virtual std::pair getAdvertisedAddress() const PURE; +}; + +using UpstreamKafkaConfigurationSharedPtr = std::shared_ptr; + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/network/kafka/mesh/command_handlers/BUILD b/test/extensions/filters/network/kafka/mesh/command_handlers/BUILD index 053ad707c2626..ec4c387eb04b2 100644 --- a/test/extensions/filters/network/kafka/mesh/command_handlers/BUILD +++ b/test/extensions/filters/network/kafka/mesh/command_handlers/BUILD @@ -11,6 +11,19 @@ licenses(["notice"]) # Apache 2 envoy_package() +envoy_extension_cc_test( + name = "metadata_unit_test", + srcs = ["metadata_unit_test.cc"], + # This name needs to be changed after we have the mesh filter ready. + extension_names = ["envoy.filters.network.kafka_broker"], + tags = ["skip_on_windows"], + deps = [ + "//source/extensions/filters/network/kafka/mesh/command_handlers:metadata_lib", + "//test/mocks/network:network_mocks", + "//test/mocks/stats:stats_mocks", + ], +) + envoy_extension_cc_test( name = "api_versions_unit_test", srcs = ["api_versions_unit_test.cc"], diff --git a/test/extensions/filters/network/kafka/mesh/command_handlers/metadata_unit_test.cc b/test/extensions/filters/network/kafka/mesh/command_handlers/metadata_unit_test.cc new file mode 100644 index 0000000000000..a60d566f0b080 --- /dev/null +++ b/test/extensions/filters/network/kafka/mesh/command_handlers/metadata_unit_test.cc @@ -0,0 +1,73 @@ +#include "source/extensions/filters/network/kafka/external/responses.h" +#include "source/extensions/filters/network/kafka/mesh/command_handlers/metadata.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::Return; + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { +namespace { + +class MockAbstractRequestListener : public AbstractRequestListener { +public: + MOCK_METHOD(void, onRequest, (InFlightRequestSharedPtr)); + MOCK_METHOD(void, onRequestReadyForAnswer, ()); +}; + +class MockUpstreamKafkaConfiguration : public UpstreamKafkaConfiguration { +public: + MOCK_METHOD(absl::optional, computeClusterConfigForTopic, (const std::string&), + (const)); + MOCK_METHOD((std::pair), getAdvertisedAddress, (), (const)); +}; + +TEST(MetadataTest, shouldBeAlwaysReadyForAnswer) { + // given + MockAbstractRequestListener filter; + EXPECT_CALL(filter, onRequestReadyForAnswer()); + MockUpstreamKafkaConfiguration configuration; + const std::pair advertised_address = {"host", 1234}; + EXPECT_CALL(configuration, getAdvertisedAddress()).WillOnce(Return(advertised_address)); + // First topic is going to have configuration present (42 partitions for each topic). + const ClusterConfig topic1config = {"", 42, {}}; + EXPECT_CALL(configuration, computeClusterConfigForTopic("topic1")) + .WillOnce(Return(absl::make_optional(topic1config))); + // Second topic is not going to have configuration present. + EXPECT_CALL(configuration, computeClusterConfigForTopic("topic2")) + .WillOnce(Return(absl::nullopt)); + const RequestHeader header = {0, 0, 0, absl::nullopt}; + const MetadataRequest data = {{MetadataRequestTopic{"topic1"}, MetadataRequestTopic{"topic2"}}}; + const auto message = std::make_shared>(header, data); + MetadataRequestHolder testee = {filter, configuration, message}; + + // when, then - invoking should immediately notify the filter. + testee.startProcessing(); + + // when, then - should always be considered finished. + const bool finished = testee.finished(); + EXPECT_TRUE(finished); + + // when, then - the computed result is always contains correct data (confirmed by integration + // tests). + const auto answer = testee.computeAnswer(); + EXPECT_EQ(answer->metadata_.api_key_, header.api_key_); + EXPECT_EQ(answer->metadata_.correlation_id_, header.correlation_id_); + + const auto response = std::dynamic_pointer_cast>(answer); + ASSERT_TRUE(response); + const auto topics = response->data_.topics_; + EXPECT_EQ(topics.size(), 1); + EXPECT_EQ(topics[0].partitions_.size(), 42); +} + +} // namespace +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/network/kafka/mesh/filter_unit_test.cc b/test/extensions/filters/network/kafka/mesh/filter_unit_test.cc index 07fffa2466a0e..a977c5a8901ad 100644 --- a/test/extensions/filters/network/kafka/mesh/filter_unit_test.cc +++ b/test/extensions/filters/network/kafka/mesh/filter_unit_test.cc @@ -176,6 +176,25 @@ TEST_F(FilterUnitTest, ShouldDoNothingOnBufferWatermarkEvents) { testee_.onAboveWriteBufferHighWatermark(); } +class MockUpstreamKafkaConfiguration : public UpstreamKafkaConfiguration { +public: + MOCK_METHOD(void, onData, (Buffer::Instance&)); + MOCK_METHOD(void, reset, ()); + MOCK_METHOD(absl::optional, computeClusterConfigForTopic, + (const std::string& topic), (const)); + MOCK_METHOD((std::pair), getAdvertisedAddress, (), (const)); +}; + +TEST(Filter, ShouldBeConstructable) { + // given + MockUpstreamKafkaConfiguration configuration; + + // when + KafkaMeshFilter filter = KafkaMeshFilter(configuration); + + // then - no exceptions. +} + } // namespace } // namespace Mesh } // namespace Kafka diff --git a/test/extensions/filters/network/kafka/mesh/request_processor_unit_test.cc b/test/extensions/filters/network/kafka/mesh/request_processor_unit_test.cc index d115c942ed6c8..a512633a993fa 100644 --- a/test/extensions/filters/network/kafka/mesh/request_processor_unit_test.cc +++ b/test/extensions/filters/network/kafka/mesh/request_processor_unit_test.cc @@ -1,5 +1,6 @@ #include "source/extensions/filters/network/kafka/mesh/abstract_command.h" #include "source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h" +#include "source/extensions/filters/network/kafka/mesh/command_handlers/metadata.h" #include "source/extensions/filters/network/kafka/mesh/request_processor.h" #include "test/test_common/utility.h" @@ -22,12 +23,36 @@ class MockAbstractRequestListener : public AbstractRequestListener { MOCK_METHOD(void, onRequestReadyForAnswer, ()); }; +class MockUpstreamKafkaConfiguration : public UpstreamKafkaConfiguration { +public: + MOCK_METHOD(absl::optional, computeClusterConfigForTopic, (const std::string&), + (const)); + MOCK_METHOD((std::pair), getAdvertisedAddress, (), (const)); +}; + class RequestProcessorTest : public testing::Test { protected: MockAbstractRequestListener listener_; - RequestProcessor testee_ = {listener_}; + MockUpstreamKafkaConfiguration configuration_; + RequestProcessor testee_ = {listener_, configuration_}; }; +TEST_F(RequestProcessorTest, ShouldProcessMetadataRequest) { + // given + const RequestHeader header = {METADATA_REQUEST_API_KEY, 0, 0, absl::nullopt}; + const MetadataRequest data = {absl::nullopt}; + const auto message = std::make_shared>(header, data); + + InFlightRequestSharedPtr capture = nullptr; + EXPECT_CALL(listener_, onRequest(_)).WillOnce(testing::SaveArg<0>(&capture)); + + // when + testee_.onMessage(message); + + // then + ASSERT_NE(std::dynamic_pointer_cast(capture), nullptr); +} + TEST_F(RequestProcessorTest, ShouldProcessApiVersionsRequest) { // given const RequestHeader header = {API_VERSIONS_REQUEST_API_KEY, 0, 0, absl::nullopt};