diff --git a/source/extensions/filters/network/kafka/mesh/BUILD b/source/extensions/filters/network/kafka/mesh/BUILD index e707476b50891..b4a2581e686ff 100644 --- a/source/extensions/filters/network/kafka/mesh/BUILD +++ b/source/extensions/filters/network/kafka/mesh/BUILD @@ -44,6 +44,7 @@ envoy_cc_library( "//source/common/common:minimal_logger_lib", "//source/extensions/filters/network/kafka:kafka_request_codec_lib", "//source/extensions/filters/network/kafka:kafka_request_parser_lib", + "//source/extensions/filters/network/kafka/mesh/command_handlers:api_versions_lib", ], ) diff --git a/source/extensions/filters/network/kafka/mesh/command_handlers/BUILD b/source/extensions/filters/network/kafka/mesh/command_handlers/BUILD new file mode 100644 index 0000000000000..b04e546434889 --- /dev/null +++ b/source/extensions/filters/network/kafka/mesh/command_handlers/BUILD @@ -0,0 +1,29 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_extension_package", +) + +licenses(["notice"]) # Apache 2 + +# Handlers for particular Kafka requests that are used by Kafka-mesh filter. + +envoy_extension_package() + +envoy_cc_library( + name = "api_versions_lib", + srcs = [ + "api_versions.cc", + ], + hdrs = [ + "api_versions.h", + ], + tags = ["skip_on_windows"], + deps = [ + "//source/common/common:minimal_logger_lib", + "//source/extensions/filters/network/kafka:kafka_request_parser_lib", + "//source/extensions/filters/network/kafka:kafka_response_parser_lib", + "//source/extensions/filters/network/kafka:tagged_fields_lib", + "//source/extensions/filters/network/kafka/mesh:abstract_command_lib", + ], +) diff --git a/source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.cc b/source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.cc new file mode 100644 index 0000000000000..8ec3a92616fbe --- /dev/null +++ b/source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.cc @@ -0,0 +1,54 @@ +#include "source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h" + +#include "source/extensions/filters/network/kafka/external/requests.h" +#include "source/extensions/filters/network/kafka/external/responses.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +// These constants define which versions of requests this "Kafka server" will understand. + +// As we can process only record format 2 (which itself is pretty old coming from Kafka 1.0), we are +// going to handle only produce requests with versions higher than 5. +constexpr int16_t MIN_PRODUCE_SUPPORTED = 5; +constexpr int16_t MAX_PRODUCE_SUPPORTED = PRODUCE_REQUEST_MAX_VERSION; /* Generated value. */ +// Right now we do not want to handle old version 0 request, as it expects us to enumerate all +// topics if list of requested topics is empty. +// Impl note: if filter gains knowledge of upstream cluster topics (e.g. thru admin clients), we +// could decrease this value. +constexpr int16_t MIN_METADATA_SUPPORTED = 1; +constexpr int16_t MAX_METADATA_SUPPORTED = METADATA_REQUEST_MAX_VERSION; /* Generated value. */ + +ApiVersionsRequestHolder::ApiVersionsRequestHolder(AbstractRequestListener& filter, + const RequestHeader request_header) + : BaseInFlightRequest{filter}, request_header_{request_header} {} + +// Api Versions requests are immediately ready for answer (as they do not need to reach upstream). +void ApiVersionsRequestHolder::startProcessing() { notifyFilter(); } + +// Because these requests can be trivially handled, the responses are okay to be sent downstream at +// any time. +bool ApiVersionsRequestHolder::finished() const { return true; } + +AbstractResponseSharedPtr ApiVersionsRequestHolder::computeAnswer() const { + const ResponseMetadata metadata = {request_header_.api_key_, request_header_.api_version_, + request_header_.correlation_id_}; + + const int16_t error_code = 0; + const ApiVersionsResponseKey produce_entry = {PRODUCE_REQUEST_API_KEY, MIN_PRODUCE_SUPPORTED, + MAX_PRODUCE_SUPPORTED}; + const ApiVersionsResponseKey metadata_entry = {METADATA_REQUEST_API_KEY, MIN_METADATA_SUPPORTED, + MAX_METADATA_SUPPORTED}; + const ApiVersionsResponse real_response = {error_code, {produce_entry, metadata_entry}}; + + return std::make_shared>(metadata, real_response); +} + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h b/source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h new file mode 100644 index 0000000000000..004e3a1cd430f --- /dev/null +++ b/source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h @@ -0,0 +1,36 @@ +#pragma once + +#include "source/extensions/filters/network/kafka/external/requests.h" +#include "source/extensions/filters/network/kafka/mesh/abstract_command.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +/** + * Api version requests are the first requests sent by Kafka clients to brokers. + * We send our customized response to fail clients that might be trying to accomplish something more + * than this filter supports. + */ +class ApiVersionsRequestHolder : public BaseInFlightRequest { +public: + ApiVersionsRequestHolder(AbstractRequestListener& filter, const RequestHeader request_header); + + void startProcessing() override; + + bool finished() const override; + + AbstractResponseSharedPtr computeAnswer() const override; + +private: + // Original request header. + const RequestHeader request_header_; +}; + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/kafka/mesh/request_processor.cc b/source/extensions/filters/network/kafka/mesh/request_processor.cc index d19c8f9edd055..0613e2bc38ef0 100644 --- a/source/extensions/filters/network/kafka/mesh/request_processor.cc +++ b/source/extensions/filters/network/kafka/mesh/request_processor.cc @@ -2,12 +2,16 @@ #include "envoy/common/exception.h" +#include "source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h" + namespace Envoy { namespace Extensions { namespace NetworkFilters { namespace Kafka { namespace Mesh { +RequestProcessor::RequestProcessor(AbstractRequestListener& origin) : origin_{origin} {} + // Helper function. Throws a nice message. Filter will react by closing the connection. static void throwOnUnsupportedRequest(const std::string& reason, const RequestHeader& header) { throw EnvoyException(absl::StrCat(reason, " Kafka request (key=", header.api_key_, ", version=", @@ -15,8 +19,20 @@ static void throwOnUnsupportedRequest(const std::string& reason, const RequestHe } void RequestProcessor::onMessage(AbstractRequestSharedPtr arg) { - // This will be replaced with switch on header's API key. - throwOnUnsupportedRequest("unsupported (bad client API invoked?)", arg->request_header_); + switch (arg->request_header_.api_key_) { + case API_VERSIONS_REQUEST_API_KEY: + process(std::dynamic_pointer_cast>(arg)); + break; + default: + // Client sent a request we cannot handle right now. + throwOnUnsupportedRequest("unsupported (bad client API invoked?)", arg->request_header_); + break; + } // switch +} + +void RequestProcessor::process(const std::shared_ptr> request) const { + auto res = std::make_shared(origin_, request->request_header_); + origin_.onRequest(res); } // We got something that the parser could not handle. diff --git a/source/extensions/filters/network/kafka/mesh/request_processor.h b/source/extensions/filters/network/kafka/mesh/request_processor.h index ae89ca8ae47be..9b4320fd3e329 100644 --- a/source/extensions/filters/network/kafka/mesh/request_processor.h +++ b/source/extensions/filters/network/kafka/mesh/request_processor.h @@ -16,11 +16,16 @@ namespace Mesh { */ class RequestProcessor : public RequestCallback, private Logger::Loggable { public: - RequestProcessor() = default; + RequestProcessor(AbstractRequestListener& origin); // RequestCallback void onMessage(AbstractRequestSharedPtr arg) override; void onFailedParse(RequestParseFailureSharedPtr) override; + +private: + void process(const std::shared_ptr> request) const; + + AbstractRequestListener& origin_; }; } // namespace Mesh diff --git a/source/extensions/filters/network/kafka/protocol/generator.py b/source/extensions/filters/network/kafka/protocol/generator.py index 3ba3ac0a844ff..846dd2aa2d9b7 100755 --- a/source/extensions/filters/network/kafka/protocol/generator.py +++ b/source/extensions/filters/network/kafka/protocol/generator.py @@ -700,8 +700,17 @@ def is_printable(self): class RenderingHelper: """ - Helper for jinja templates. - """ + Utility function that allows us to process names in jinja easier. + """ + + @staticmethod + def camel_case_to_snake_case(str): + import re + return re.sub('(?!^)([A-Z]+)', r'_\1', str) + + """ + Helper for jinja templates. + """ @staticmethod def get_template(template): @@ -713,4 +722,5 @@ def get_template(template): env = jinja2.Environment( loader=jinja2.FileSystemLoader( searchpath=os.path.dirname(os.path.abspath(sys.argv[0])))) + env.filters['camel_case_to_snake_case'] = RenderingHelper.camel_case_to_snake_case return env.get_template(template) diff --git a/source/extensions/filters/network/kafka/protocol/request_parser.j2 b/source/extensions/filters/network/kafka/protocol/request_parser.j2 index 712f0d4294f23..db536a0e03e14 100644 --- a/source/extensions/filters/network/kafka/protocol/request_parser.j2 +++ b/source/extensions/filters/network/kafka/protocol/request_parser.j2 @@ -8,6 +8,12 @@ (see 'kafka_request_resolver_cc.j2'). #} +constexpr int16_t {{ complex_type.name | camel_case_to_snake_case | upper }}_API_KEY = + {{ complex_type.get_extra('api_key') }}; + +constexpr int16_t {{ complex_type.name | camel_case_to_snake_case | upper }}_MAX_VERSION = + {{ complex_type.versions[-1] }}; + {% for version in complex_type.versions %}class {{ complex_type.name }}V{{ version }}Parser: public RequestDataParser< {{ complex_type.name }}, {{ complex_type.name }}V{{ version }}Deserializer> diff --git a/test/extensions/filters/network/kafka/mesh/command_handlers/BUILD b/test/extensions/filters/network/kafka/mesh/command_handlers/BUILD new file mode 100644 index 0000000000000..053ad707c2626 --- /dev/null +++ b/test/extensions/filters/network/kafka/mesh/command_handlers/BUILD @@ -0,0 +1,25 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_package", +) +load( + "//test/extensions:extensions_build_system.bzl", + "envoy_extension_cc_test", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_extension_cc_test( + name = "api_versions_unit_test", + srcs = ["api_versions_unit_test.cc"], + # This name needs to be changed after we have the mesh filter ready. + extension_names = ["envoy.filters.network.kafka_broker"], + tags = ["skip_on_windows"], + deps = [ + "//source/extensions/filters/network/kafka/mesh/command_handlers:api_versions_lib", + "//test/mocks/network:network_mocks", + "//test/mocks/stats:stats_mocks", + ], +) diff --git a/test/extensions/filters/network/kafka/mesh/command_handlers/api_versions_unit_test.cc b/test/extensions/filters/network/kafka/mesh/command_handlers/api_versions_unit_test.cc new file mode 100644 index 0000000000000..bb8e33f1e61ea --- /dev/null +++ b/test/extensions/filters/network/kafka/mesh/command_handlers/api_versions_unit_test.cc @@ -0,0 +1,45 @@ +#include "source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { +namespace { + +class MockAbstractRequestListener : public AbstractRequestListener { +public: + MOCK_METHOD(void, onRequest, (InFlightRequestSharedPtr)); + MOCK_METHOD(void, onRequestReadyForAnswer, ()); +}; + +TEST(ApiVersionsTest, shouldBeAlwaysReadyForAnswer) { + // given + MockAbstractRequestListener filter; + EXPECT_CALL(filter, onRequestReadyForAnswer()); + const RequestHeader header = {API_VERSIONS_REQUEST_API_KEY, 0, 0, absl::nullopt}; + ApiVersionsRequestHolder testee = {filter, header}; + + // when, then - invoking should immediately notify the filter. + testee.startProcessing(); + + // when, then - should always be considered finished. + const bool finished = testee.finished(); + EXPECT_TRUE(finished); + + // when, then - the computed result is always contains correct data (confirmed by integration + // tests). + const auto answer = testee.computeAnswer(); + EXPECT_EQ(answer->metadata_.api_key_, header.api_key_); + EXPECT_EQ(answer->metadata_.correlation_id_, header.correlation_id_); +} + +} // namespace +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/network/kafka/mesh/request_processor_unit_test.cc b/test/extensions/filters/network/kafka/mesh/request_processor_unit_test.cc index 45639eb9b223b..d115c942ed6c8 100644 --- a/test/extensions/filters/network/kafka/mesh/request_processor_unit_test.cc +++ b/test/extensions/filters/network/kafka/mesh/request_processor_unit_test.cc @@ -1,4 +1,5 @@ #include "source/extensions/filters/network/kafka/mesh/abstract_command.h" +#include "source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h" #include "source/extensions/filters/network/kafka/mesh/request_processor.h" #include "test/test_common/utility.h" @@ -6,6 +7,8 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +using testing::_; + namespace Envoy { namespace Extensions { namespace NetworkFilters { @@ -13,14 +16,37 @@ namespace Kafka { namespace Mesh { namespace { +class MockAbstractRequestListener : public AbstractRequestListener { +public: + MOCK_METHOD(void, onRequest, (InFlightRequestSharedPtr)); + MOCK_METHOD(void, onRequestReadyForAnswer, ()); +}; + class RequestProcessorTest : public testing::Test { protected: - RequestProcessor testee_ = {}; + MockAbstractRequestListener listener_; + RequestProcessor testee_ = {listener_}; }; +TEST_F(RequestProcessorTest, ShouldProcessApiVersionsRequest) { + // given + const RequestHeader header = {API_VERSIONS_REQUEST_API_KEY, 0, 0, absl::nullopt}; + const ApiVersionsRequest data = {}; + const auto message = std::make_shared>(header, data); + + InFlightRequestSharedPtr capture = nullptr; + EXPECT_CALL(listener_, onRequest(_)).WillOnce(testing::SaveArg<0>(&capture)); + + // when + testee_.onMessage(message); + + // then + ASSERT_NE(std::dynamic_pointer_cast(capture), nullptr); +} + TEST_F(RequestProcessorTest, ShouldHandleUnsupportedRequest) { // given - const RequestHeader header = {0, 0, 0, absl::nullopt}; + const RequestHeader header = {LIST_OFFSET_REQUEST_API_KEY, 0, 0, absl::nullopt}; const ListOffsetRequest data = {0, {}}; const auto message = std::make_shared>(header, data);