Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/extensions/filters/network/kafka/mesh/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ envoy_cc_library(
"//source/common/common:minimal_logger_lib",
"//source/extensions/filters/network/kafka:kafka_request_codec_lib",
"//source/extensions/filters/network/kafka:kafka_request_parser_lib",
"//source/extensions/filters/network/kafka/mesh/command_handlers:api_versions_lib",
],
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_extension_package",
)

licenses(["notice"]) # Apache 2

# Handlers for particular Kafka requests that are used by Kafka-mesh filter.

envoy_extension_package()

envoy_cc_library(
name = "api_versions_lib",
srcs = [
"api_versions.cc",
],
hdrs = [
"api_versions.h",
],
tags = ["skip_on_windows"],
deps = [
"//source/common/common:minimal_logger_lib",
"//source/extensions/filters/network/kafka:kafka_request_parser_lib",
"//source/extensions/filters/network/kafka:kafka_response_parser_lib",
"//source/extensions/filters/network/kafka:tagged_fields_lib",
"//source/extensions/filters/network/kafka/mesh:abstract_command_lib",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include "source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h"

#include "source/extensions/filters/network/kafka/external/requests.h"
#include "source/extensions/filters/network/kafka/external/responses.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// These constants define which versions of requests this "Kafka server" will understand.

// As we can process only record format 2 (which itself is pretty old coming from Kafka 1.0), we are
// going to handle only produce requests with versions higher than 5.
constexpr int16_t MIN_PRODUCE_SUPPORTED = 5;
constexpr int16_t MAX_PRODUCE_SUPPORTED = PRODUCE_REQUEST_MAX_VERSION; /* Generated value. */
// Right now we do not want to handle old version 0 request, as it expects us to enumerate all
// topics if list of requested topics is empty.
// Impl note: if filter gains knowledge of upstream cluster topics (e.g. thru admin clients), we
// could decrease this value.
constexpr int16_t MIN_METADATA_SUPPORTED = 1;
constexpr int16_t MAX_METADATA_SUPPORTED = METADATA_REQUEST_MAX_VERSION; /* Generated value. */

ApiVersionsRequestHolder::ApiVersionsRequestHolder(AbstractRequestListener& filter,
const RequestHeader request_header)
: BaseInFlightRequest{filter}, request_header_{request_header} {}

// Api Versions requests are immediately ready for answer (as they do not need to reach upstream).
void ApiVersionsRequestHolder::startProcessing() { notifyFilter(); }

// Because these requests can be trivially handled, the responses are okay to be sent downstream at
// any time.
bool ApiVersionsRequestHolder::finished() const { return true; }

AbstractResponseSharedPtr ApiVersionsRequestHolder::computeAnswer() const {
const ResponseMetadata metadata = {request_header_.api_key_, request_header_.api_version_,
request_header_.correlation_id_};

const int16_t error_code = 0;
const ApiVersionsResponseKey produce_entry = {PRODUCE_REQUEST_API_KEY, MIN_PRODUCE_SUPPORTED,
MAX_PRODUCE_SUPPORTED};
const ApiVersionsResponseKey metadata_entry = {METADATA_REQUEST_API_KEY, MIN_METADATA_SUPPORTED,
MAX_METADATA_SUPPORTED};
const ApiVersionsResponse real_response = {error_code, {produce_entry, metadata_entry}};

return std::make_shared<Response<ApiVersionsResponse>>(metadata, real_response);
}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include "source/extensions/filters/network/kafka/external/requests.h"
#include "source/extensions/filters/network/kafka/mesh/abstract_command.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
* Api version requests are the first requests sent by Kafka clients to brokers.
* We send our customized response to fail clients that might be trying to accomplish something more
* than this filter supports.
*/
class ApiVersionsRequestHolder : public BaseInFlightRequest {
public:
ApiVersionsRequestHolder(AbstractRequestListener& filter, const RequestHeader request_header);

void startProcessing() override;

bool finished() const override;

AbstractResponseSharedPtr computeAnswer() const override;

private:
// Original request header.
const RequestHeader request_header_;
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
20 changes: 18 additions & 2 deletions source/extensions/filters/network/kafka/mesh/request_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,37 @@

#include "envoy/common/exception.h"

#include "source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

RequestProcessor::RequestProcessor(AbstractRequestListener& origin) : origin_{origin} {}

// Helper function. Throws a nice message. Filter will react by closing the connection.
static void throwOnUnsupportedRequest(const std::string& reason, const RequestHeader& header) {
throw EnvoyException(absl::StrCat(reason, " Kafka request (key=", header.api_key_, ", version=",
header.api_version_, ", cid=", header.correlation_id_));
}

void RequestProcessor::onMessage(AbstractRequestSharedPtr arg) {
// This will be replaced with switch on header's API key.
throwOnUnsupportedRequest("unsupported (bad client API invoked?)", arg->request_header_);
switch (arg->request_header_.api_key_) {
case API_VERSIONS_REQUEST_API_KEY:
process(std::dynamic_pointer_cast<Request<ApiVersionsRequest>>(arg));
break;
default:
// Client sent a request we cannot handle right now.
throwOnUnsupportedRequest("unsupported (bad client API invoked?)", arg->request_header_);
break;
} // switch
}

void RequestProcessor::process(const std::shared_ptr<Request<ApiVersionsRequest>> request) const {
auto res = std::make_shared<ApiVersionsRequestHolder>(origin_, request->request_header_);
origin_.onRequest(res);
}

// We got something that the parser could not handle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ namespace Mesh {
*/
class RequestProcessor : public RequestCallback, private Logger::Loggable<Logger::Id::kafka> {
public:
RequestProcessor() = default;
RequestProcessor(AbstractRequestListener& origin);

// RequestCallback
void onMessage(AbstractRequestSharedPtr arg) override;
void onFailedParse(RequestParseFailureSharedPtr) override;

private:
void process(const std::shared_ptr<Request<ApiVersionsRequest>> request) const;

AbstractRequestListener& origin_;
};

} // namespace Mesh
Expand Down
14 changes: 12 additions & 2 deletions source/extensions/filters/network/kafka/protocol/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,17 @@ def is_printable(self):

class RenderingHelper:
"""
Helper for jinja templates.
"""
Utility function that allows us to process names in jinja easier.
"""

@staticmethod
def camel_case_to_snake_case(str):
import re
return re.sub('(?!^)([A-Z]+)', r'_\1', str)

"""
Helper for jinja templates.
"""

@staticmethod
def get_template(template):
Expand All @@ -713,4 +722,5 @@ def get_template(template):
env = jinja2.Environment(
loader=jinja2.FileSystemLoader(
searchpath=os.path.dirname(os.path.abspath(sys.argv[0]))))
env.filters['camel_case_to_snake_case'] = RenderingHelper.camel_case_to_snake_case
return env.get_template(template)
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
(see 'kafka_request_resolver_cc.j2').
#}

constexpr int16_t {{ complex_type.name | camel_case_to_snake_case | upper }}_API_KEY =
{{ complex_type.get_extra('api_key') }};

constexpr int16_t {{ complex_type.name | camel_case_to_snake_case | upper }}_MAX_VERSION =
{{ complex_type.versions[-1] }};

{% for version in complex_type.versions %}class {{ complex_type.name }}V{{ version }}Parser:
public RequestDataParser<
{{ complex_type.name }}, {{ complex_type.name }}V{{ version }}Deserializer>
Expand Down
25 changes: 25 additions & 0 deletions test/extensions/filters/network/kafka/mesh/command_handlers/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_package",
)
load(
"//test/extensions:extensions_build_system.bzl",
"envoy_extension_cc_test",
)

licenses(["notice"]) # Apache 2

envoy_package()

envoy_extension_cc_test(
name = "api_versions_unit_test",
srcs = ["api_versions_unit_test.cc"],
# This name needs to be changed after we have the mesh filter ready.
extension_names = ["envoy.filters.network.kafka_broker"],
tags = ["skip_on_windows"],
deps = [
"//source/extensions/filters/network/kafka/mesh/command_handlers:api_versions_lib",
"//test/mocks/network:network_mocks",
"//test/mocks/stats:stats_mocks",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include "source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {
namespace {

class MockAbstractRequestListener : public AbstractRequestListener {
public:
MOCK_METHOD(void, onRequest, (InFlightRequestSharedPtr));
MOCK_METHOD(void, onRequestReadyForAnswer, ());
};

TEST(ApiVersionsTest, shouldBeAlwaysReadyForAnswer) {
// given
MockAbstractRequestListener filter;
EXPECT_CALL(filter, onRequestReadyForAnswer());
const RequestHeader header = {API_VERSIONS_REQUEST_API_KEY, 0, 0, absl::nullopt};
ApiVersionsRequestHolder testee = {filter, header};

// when, then - invoking should immediately notify the filter.
testee.startProcessing();

// when, then - should always be considered finished.
const bool finished = testee.finished();
EXPECT_TRUE(finished);

// when, then - the computed result is always contains correct data (confirmed by integration
// tests).
const auto answer = testee.computeAnswer();
EXPECT_EQ(answer->metadata_.api_key_, header.api_key_);
EXPECT_EQ(answer->metadata_.correlation_id_, header.correlation_id_);
}

} // namespace
} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -1,26 +1,52 @@
#include "source/extensions/filters/network/kafka/mesh/abstract_command.h"
#include "source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h"
#include "source/extensions/filters/network/kafka/mesh/request_processor.h"

#include "test/test_common/utility.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"

using testing::_;

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {
namespace {

class MockAbstractRequestListener : public AbstractRequestListener {
public:
MOCK_METHOD(void, onRequest, (InFlightRequestSharedPtr));
MOCK_METHOD(void, onRequestReadyForAnswer, ());
};

class RequestProcessorTest : public testing::Test {
protected:
RequestProcessor testee_ = {};
MockAbstractRequestListener listener_;
RequestProcessor testee_ = {listener_};
};

TEST_F(RequestProcessorTest, ShouldProcessApiVersionsRequest) {
// given
const RequestHeader header = {API_VERSIONS_REQUEST_API_KEY, 0, 0, absl::nullopt};
const ApiVersionsRequest data = {};
const auto message = std::make_shared<Request<ApiVersionsRequest>>(header, data);

InFlightRequestSharedPtr capture = nullptr;
EXPECT_CALL(listener_, onRequest(_)).WillOnce(testing::SaveArg<0>(&capture));

// when
testee_.onMessage(message);

// then
ASSERT_NE(std::dynamic_pointer_cast<ApiVersionsRequestHolder>(capture), nullptr);
}

TEST_F(RequestProcessorTest, ShouldHandleUnsupportedRequest) {
// given
const RequestHeader header = {0, 0, 0, absl::nullopt};
const RequestHeader header = {LIST_OFFSET_REQUEST_API_KEY, 0, 0, absl::nullopt};
const ListOffsetRequest data = {0, {}};
const auto message = std::make_shared<Request<ListOffsetRequest>>(header, data);

Expand Down