Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion source/extensions/filters/network/kafka/kafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ template <typename Data> class Request : public AbstractRequest {
return request_header_ == rhs.request_header_ && data_ == rhs.data_;
};

private:
const Data data_;
};

Expand Down
1 change: 0 additions & 1 deletion source/extensions/filters/network/kafka/kafka_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ template <typename Data> class Response : public AbstractResponse {
return metadata_ == rhs.metadata_ && data_ == rhs.data_;
};

private:
const Data data_;
};

Expand Down
15 changes: 15 additions & 0 deletions source/extensions/filters/network/kafka/mesh/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ envoy_cc_library(
deps = [
":abstract_command_lib",
":request_processor_lib",
":upstream_config_lib",
"//envoy/buffer:buffer_interface",
"//envoy/network:connection_interface",
"//envoy/network:filter_interface",
Expand All @@ -41,10 +42,12 @@ envoy_cc_library(
tags = ["skip_on_windows"],
deps = [
":abstract_command_lib",
":upstream_config_lib",
"//source/common/common:minimal_logger_lib",
"//source/extensions/filters/network/kafka:kafka_request_codec_lib",
"//source/extensions/filters/network/kafka:kafka_request_parser_lib",
"//source/extensions/filters/network/kafka/mesh/command_handlers:api_versions_lib",
"//source/extensions/filters/network/kafka/mesh/command_handlers:metadata_lib",
],
)

Expand All @@ -63,3 +66,15 @@ envoy_cc_library(
"//source/extensions/filters/network/kafka:tagged_fields_lib",
],
)

# Header-only library exposing the upstream Kafka cluster configuration interface
# (upstream_config.h); shared by the mesh filter and the command handlers.
envoy_cc_library(
    name = "upstream_config_lib",
    srcs = [
    ],
    hdrs = [
        "upstream_config.h",
    ],
    tags = ["skip_on_windows"],
    deps = [
    ],
)
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,24 @@ licenses(["notice"]) # Apache 2

envoy_extension_package()

# Command handler answering Kafka METADATA requests locally (without reaching upstream),
# advertising this proxy as the single broker of the cluster.
envoy_cc_library(
    name = "metadata_lib",
    srcs = [
        "metadata.cc",
    ],
    hdrs = [
        "metadata.h",
    ],
    tags = ["skip_on_windows"],
    deps = [
        "//source/common/common:minimal_logger_lib",
        "//source/extensions/filters/network/kafka:kafka_request_parser_lib",
        "//source/extensions/filters/network/kafka:kafka_response_parser_lib",
        "//source/extensions/filters/network/kafka/mesh:abstract_command_lib",
        "//source/extensions/filters/network/kafka/mesh:upstream_config_lib",
    ],
)

envoy_cc_library(
name = "api_versions_lib",
srcs = [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#include "source/extensions/filters/network/kafka/mesh/command_handlers/metadata.h"

#include "source/extensions/filters/network/kafka/external/responses.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Captures the filter (to be notified when the answer is ready), the shared upstream
// configuration used to build the response, and the original request being answered.
MetadataRequestHolder::MetadataRequestHolder(
    AbstractRequestListener& filter, const UpstreamKafkaConfiguration& configuration,
    const std::shared_ptr<Request<MetadataRequest>> request)
    : BaseInFlightRequest{filter}, configuration_{configuration}, request_{request} {}

// Metadata requests are immediately ready for answer (as they do not need to reach upstream).
void MetadataRequestHolder::startProcessing() { notifyFilter(); }

// Always finished: the answer is computed locally, no upstream round-trip is pending.
bool MetadataRequestHolder::finished() const { return true; }

// Broker id this proxy advertises for itself in metadata responses.
constexpr int32_t ENVOY_BROKER_ID = 0;
// Kafka protocol error code 0 ("no error").
constexpr int32_t NO_ERROR = 0;

// Cornerstone of how the mesh-filter actually works.
// We advertise a single-node Kafka cluster whose only broker is this Envoy instance,
// so all further Kafka traffic from the client is directed at this proxy.
AbstractResponseSharedPtr MetadataRequestHolder::computeAnswer() const {
  const RequestHeader& request_header = request_->request_header_;
  const ResponseMetadata response_metadata = {request_header.api_key_, request_header.api_version_,
                                              request_header.correlation_id_};

  // The only broker present in the response is this proxy itself.
  const std::pair<std::string, int32_t> advertised = configuration_.getAdvertisedAddress();
  const MetadataResponseBroker self_broker = {ENVOY_BROKER_ID, advertised.first, advertised.second};

  std::vector<MetadataResponseTopic> topics;
  if (request_->data_.topics_) {
    for (const auto& requested_topic : *(request_->data_.topics_)) {
      const std::string& topic_name = requested_topic.name_;
      const absl::optional<ClusterConfig> cluster_config =
          configuration_.computeClusterConfigForTopic(topic_name);
      if (!cluster_config) {
        // Topic is not known to our configuration, so we attach no metadata for it.
        // Downstream clients will fail for such topics, as they can never resolve them.
        continue;
      }
      // Every partition of a known topic is hosted by this proxy-broker.
      std::vector<MetadataResponsePartition> partitions;
      for (int32_t partition = 0; partition < cluster_config->partition_count_; ++partition) {
        partitions.push_back({NO_ERROR,
                              partition,
                              self_broker.node_id_,
                              {self_broker.node_id_},
                              {self_broker.node_id_}});
      }
      topics.push_back({NO_ERROR, topic_name, false, partitions});
    }
  }
  const MetadataResponse data = {{self_broker}, self_broker.node_id_, topics};
  return std::make_shared<Response<MetadataResponse>>(response_metadata, data);
}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include "source/extensions/filters/network/kafka/external/requests.h"
#include "source/extensions/filters/network/kafka/mesh/abstract_command.h"
#include "source/extensions/filters/network/kafka/mesh/upstream_config.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// In-flight request that answers Kafka METADATA requests locally, pretending this proxy
// is the only broker of the cluster; it never contacts any upstream Kafka cluster.
class MetadataRequestHolder : public BaseInFlightRequest {
public:
  MetadataRequestHolder(AbstractRequestListener& filter,
                        const UpstreamKafkaConfiguration& configuration,
                        const std::shared_ptr<Request<MetadataRequest>> request);

  // Immediately notifies the filter, as no upstream interaction is needed.
  void startProcessing() override;

  // Always true - the answer can be computed as soon as the request is received.
  bool finished() const override;

  AbstractResponseSharedPtr computeAnswer() const override;

private:
  // Configuration used to provide data for response.
  const UpstreamKafkaConfiguration& configuration_;

  // Original request.
  const std::shared_ptr<Request<MetadataRequest>> request_;
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
4 changes: 4 additions & 0 deletions source/extensions/filters/network/kafka/mesh/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Production constructor: delegates to the testing constructor with a freshly-built decoder
// whose single callback is a RequestProcessor bound to this filter and the configuration.
KafkaMeshFilter::KafkaMeshFilter(const UpstreamKafkaConfiguration& configuration)
    : KafkaMeshFilter{std::make_shared<RequestDecoder>(std::vector<RequestCallbackSharedPtr>(
          {std::make_shared<RequestProcessor>(*this, configuration)}))} {}

// Testing constructor: accepts an externally-provided (possibly mock) decoder.
KafkaMeshFilter::KafkaMeshFilter(RequestDecoderSharedPtr request_decoder)
    : request_decoder_{request_decoder} {}

Expand Down
4 changes: 4 additions & 0 deletions source/extensions/filters/network/kafka/mesh/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "source/extensions/filters/network/kafka/external/requests.h"
#include "source/extensions/filters/network/kafka/mesh/abstract_command.h"
#include "source/extensions/filters/network/kafka/mesh/request_processor.h"
#include "source/extensions/filters/network/kafka/mesh/upstream_config.h"
#include "source/extensions/filters/network/kafka/request_codec.h"

namespace Envoy {
Expand Down Expand Up @@ -38,6 +39,9 @@ class KafkaMeshFilter : public Network::ReadFilter,
public AbstractRequestListener,
private Logger::Loggable<Logger::Id::kafka> {
public:
// Proper constructor.
KafkaMeshFilter(const UpstreamKafkaConfiguration& configuration);

// Visible for testing.
KafkaMeshFilter(RequestDecoderSharedPtr request_decoder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
#include "envoy/common/exception.h"

#include "source/extensions/filters/network/kafka/mesh/command_handlers/api_versions.h"
#include "source/extensions/filters/network/kafka/mesh/command_handlers/metadata.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

RequestProcessor::RequestProcessor(AbstractRequestListener& origin) : origin_{origin} {}
// Stores references only - both the listener and the configuration are assumed to outlive
// this processor (NOTE(review): confirm ownership against the filter's lifetime).
RequestProcessor::RequestProcessor(AbstractRequestListener& origin,
                                   const UpstreamKafkaConfiguration& configuration)
    : origin_{origin}, configuration_{configuration} {}

// Helper function. Throws a nice message. Filter will react by closing the connection.
static void throwOnUnsupportedRequest(const std::string& reason, const RequestHeader& header) {
Expand All @@ -20,6 +23,9 @@ static void throwOnUnsupportedRequest(const std::string& reason, const RequestHe

void RequestProcessor::onMessage(AbstractRequestSharedPtr arg) {
switch (arg->request_header_.api_key_) {
case METADATA_REQUEST_API_KEY:
process(std::dynamic_pointer_cast<Request<MetadataRequest>>(arg));
break;
case API_VERSIONS_REQUEST_API_KEY:
process(std::dynamic_pointer_cast<Request<ApiVersionsRequest>>(arg));
break;
Expand All @@ -30,6 +36,11 @@ void RequestProcessor::onMessage(AbstractRequestSharedPtr arg) {
} // switch
}

// Wraps the metadata request in its in-flight holder and hands it over to the listener.
void RequestProcessor::process(const std::shared_ptr<Request<MetadataRequest>> request) const {
  const auto in_flight = std::make_shared<MetadataRequestHolder>(origin_, configuration_, request);
  origin_.onRequest(in_flight);
}

void RequestProcessor::process(const std::shared_ptr<Request<ApiVersionsRequest>> request) const {
auto res = std::make_shared<ApiVersionsRequestHolder>(origin_, request->request_header_);
origin_.onRequest(res);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "source/common/common/logger.h"
#include "source/extensions/filters/network/kafka/external/requests.h"
#include "source/extensions/filters/network/kafka/mesh/abstract_command.h"
#include "source/extensions/filters/network/kafka/mesh/upstream_config.h"
#include "source/extensions/filters/network/kafka/request_codec.h"

namespace Envoy {
Expand All @@ -16,16 +17,19 @@ namespace Mesh {
*/
// Dispatches parsed Kafka requests to per-API-key handlers, wrapping each in an in-flight
// command object that is handed to the owning listener (the filter).
class RequestProcessor : public RequestCallback, private Logger::Loggable<Logger::Id::kafka> {
public:
  RequestProcessor(AbstractRequestListener& origin,
                   const UpstreamKafkaConfiguration& configuration);

  // RequestCallback
  void onMessage(AbstractRequestSharedPtr arg) override;
  void onFailedParse(RequestParseFailureSharedPtr) override;

private:
  // Per-request-type handlers; each wraps the request and notifies origin_.
  void process(const std::shared_ptr<Request<MetadataRequest>> request) const;
  void process(const std::shared_ptr<Request<ApiVersionsRequest>> request) const;

  // Listener (filter) that receives the in-flight commands; outlives this processor.
  AbstractRequestListener& origin_;
  // Upstream cluster configuration forwarded to handlers that need it (e.g. metadata).
  const UpstreamKafkaConfiguration& configuration_;
};

} // namespace Mesh
Expand Down
56 changes: 56 additions & 0 deletions source/extensions/filters/network/kafka/mesh/upstream_config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include <map>
#include <memory>
#include <string>
#include <utility>

#include "envoy/common/pure.h"

#include "absl/types/optional.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Minor helper structure that contains information about upstream Kafka clusters.
// Plain aggregate - instances are expected to be built by configuration-parsing code.
struct ClusterConfig {

  // Cluster name, as it appears in configuration input.
  std::string name_;

  // How many partitions do we expect for every one of the topics present in given upstream cluster.
  // Impl note: this could be replaced with creating (shared?) AdminClient and having it reach out
  // upstream to get configuration (or we could just send a correct request via codec). The response
  // would need to be cached (as this data is frequently requested).
  int32_t partition_count_;

  // The configuration that will be passed to upstream client for given cluster.
  // This allows us to reference different clusters with different configs (e.g. linger.ms).
  // This map always contains entry with key 'bootstrap.servers', as this is the only mandatory
  // producer property.
  std::map<std::string, std::string> upstream_producer_properties_;
};

/**
 * Keeps the configuration related to upstream Kafka clusters.
 * Impl note: current matching from topic to cluster is based on prefix matching but more complex
 * rules could be added.
 */
class UpstreamKafkaConfiguration {
public:
  virtual ~UpstreamKafkaConfiguration() = default;

  // Returns the cluster configuration responsible for the given topic, or absl::nullopt when no
  // configured cluster matches the topic name.
  virtual absl::optional<ClusterConfig>
  computeClusterConfigForTopic(const std::string& topic) const PURE;

  // Host and port this proxy advertises to downstream clients (used as the broker address in
  // metadata responses).
  virtual std::pair<std::string, int32_t> getAdvertisedAddress() const PURE;
};

using UpstreamKafkaConfigurationSharedPtr = std::shared_ptr<const UpstreamKafkaConfiguration>;

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
13 changes: 13 additions & 0 deletions test/extensions/filters/network/kafka/mesh/command_handlers/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ licenses(["notice"]) # Apache 2

envoy_package()

# Unit test for the MetadataRequestHolder command handler.
envoy_extension_cc_test(
    name = "metadata_unit_test",
    srcs = ["metadata_unit_test.cc"],
    # This name needs to be changed after we have the mesh filter ready.
    extension_names = ["envoy.filters.network.kafka_broker"],
    tags = ["skip_on_windows"],
    deps = [
        "//source/extensions/filters/network/kafka/mesh/command_handlers:metadata_lib",
        "//test/mocks/network:network_mocks",
        "//test/mocks/stats:stats_mocks",
    ],
)

envoy_extension_cc_test(
name = "api_versions_unit_test",
srcs = ["api_versions_unit_test.cc"],
Expand Down
Loading