Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions contrib/kafka/filters/network/source/mesh/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ envoy_cc_library(
deps = [
":abstract_command_lib",
":upstream_config_lib",
":upstream_kafka_facade_lib",
"//contrib/kafka/filters/network/source:kafka_request_codec_lib",
"//contrib/kafka/filters/network/source:kafka_request_parser_lib",
"//contrib/kafka/filters/network/source/mesh/command_handlers:api_versions_lib",
"//contrib/kafka/filters/network/source/mesh/command_handlers:metadata_lib",
"//contrib/kafka/filters/network/source/mesh/command_handlers:produce_lib",
"//source/common/common:minimal_logger_lib",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,53 @@ envoy_contrib_package()

# Handlers for particular Kafka requests that are used by Kafka-mesh filter.

# Handler for Kafka 'Produce' requests in the mesh filter: turns a downstream produce
# request into records sent to upstream Kafka clusters via the upstream facade.
envoy_cc_library(
name = "produce_lib",
srcs = [
"produce.cc",
],
hdrs = [
"produce.h",
],
tags = ["skip_on_windows"],
deps = [
":produce_outbound_record_lib",
":produce_record_extractor_lib",
"//contrib/kafka/filters/network/source:kafka_request_parser_lib",
"//contrib/kafka/filters/network/source:kafka_response_parser_lib",
"//contrib/kafka/filters/network/source/mesh:abstract_command_lib",
"//contrib/kafka/filters/network/source/mesh:upstream_kafka_facade_lib",
"//source/common/common:minimal_logger_lib",
],
)

# Header-only library: the OutboundRecord structure that pairs a record received from
# downstream with its (mutable) upstream delivery result.
envoy_cc_library(
name = "produce_outbound_record_lib",
srcs = [
],
hdrs = [
"produce_outbound_record.h",
],
tags = ["skip_on_windows"],
deps = [
],
)

# Record-extraction dependency of produce_lib: extracts OutboundRecords out of parsed
# produce request payloads (currently only a placeholder implementation).
envoy_cc_library(
name = "produce_record_extractor_lib",
srcs = [
"produce_record_extractor.cc",
],
hdrs = [
"produce_record_extractor.h",
],
tags = ["skip_on_windows"],
deps = [
":produce_outbound_record_lib",
"//contrib/kafka/filters/network/source:kafka_request_parser_lib",
],
)

envoy_cc_library(
name = "metadata_lib",
srcs = [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#include "contrib/kafka/filters/network/source/mesh/command_handlers/produce.h"

#include "contrib/kafka/filters/network/source/external/responses.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Kafka protocol error code meaning "no error" (successful operation).
constexpr static int16_t NO_ERROR = 0;

// Production constructor: delegates to the testing constructor below, injecting the default
// (placeholder) record extractor. The extractor temporary is only used inside the delegated
// constructor's body, which is within its lifetime.
ProduceRequestHolder::ProduceRequestHolder(AbstractRequestListener& filter,
                                           UpstreamKafkaFacade& kafka_facade,
                                           const std::shared_ptr<Request<ProduceRequest>> request)
    : ProduceRequestHolder{filter, kafka_facade, PlaceholderRecordExtractor{}, request} {}

// Testing constructor: allows injecting a custom record extractor.
// Flattens the request's topic data into individual outbound records; each record is sent
// upstream separately and therefore contributes one expected delivery confirmation.
ProduceRequestHolder::ProduceRequestHolder(AbstractRequestListener& filter,
                                           UpstreamKafkaFacade& kafka_facade,
                                           const RecordExtractor& record_extractor,
                                           const std::shared_ptr<Request<ProduceRequest>> request)
    : BaseInFlightRequest{filter}, kafka_facade_{kafka_facade}, request_{request} {
  outbound_records_ = record_extractor.extractRecords(request_->data_.topics_);
  // NOTE(review): implicit size_t -> int narrowing; fine for realistic record counts, but an
  // explicit cast would be cleaner - confirm against the project's warning flags.
  expected_responses_ = outbound_records_.size();
}

void ProduceRequestHolder::startProcessing() {
// Main part of the proxy: for each outbound record we get the appropriate sink (effectively a
// facade for upstream Kafka cluster), and send the record to it.
for (const auto& outbound_record : outbound_records_) {
KafkaProducer& producer = kafka_facade_.getProducerForTopic(outbound_record.topic_);
// We need to provide our object as first argument, as we will want to be notified when the
// delivery finishes.
producer.send(shared_from_this(), outbound_record.topic_, outbound_record.partition_,
outbound_record.key_, outbound_record.value_);
}
// Corner case handling:
// If we ever receive produce request without records, we need to notify the filter we are ready,
// because otherwise no notification will ever come from the real Kafka producer.
if (finished()) {
notifyFilter();
}
}

// Done once every outbound record has received its delivery confirmation.
bool ProduceRequestHolder::finished() const { return expected_responses_ == 0; }

// Find a record that matches provided delivery confirmation coming from Kafka producer.
// If all the records got their delivery data filled in, we are done, and can notify the origin
// filter.
bool ProduceRequestHolder::accept(const DeliveryMemento& memento) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[future]
this method gets invoked thru future RichKafkaProducer implementation going thru the dispatcher to handle librdkafka delivery confirmation

TLDR: librdkafka callback (dedicated thread) ~> dispatcher ~> RichKafkaProducer (envoy worker thread) -> PRH::accept

for (auto& outbound_record : outbound_records_) {
if (outbound_record.value_.data() == memento.data_) {
// We have matched the downstream request that matches our confirmation from upstream Kafka.
outbound_record.error_code_ = memento.error_code_;
outbound_record.saved_offset_ = memento.offset_;
--expected_responses_;
if (finished()) {
// All elements had their responses matched.
ENVOY_LOG(trace, "All deliveries finished for produce request {}",
request_->request_header_.correlation_id_);
notifyFilter();
}
return true;
}
}
return false;
}

// Builds the downstream Produce response out of the per-record delivery results.
AbstractResponseSharedPtr ProduceRequestHolder::computeAnswer() const {

  // Header: echo the api key/version and correlation id of the original request.
  const RequestHeader& rh = request_->request_header_;
  ResponseMetadata metadata = {rh.api_key_, rh.api_version_, rh.correlation_id_};

  // Real answer: aggregate per-record results into one entry per (topic, partition).
  using ErrorCodeAndOffset = std::pair<int16_t, uint32_t>;
  std::map<std::string, std::map<int32_t, ErrorCodeAndOffset>> topic_to_partition_responses;
  for (const auto& outbound_record : outbound_records_) {
    auto& partition_map = topic_to_partition_responses[outbound_record.topic_];
    auto it = partition_map.find(outbound_record.partition_);
    if (it == partition_map.end()) {
      partition_map[outbound_record.partition_] = {outbound_record.error_code_,
                                                   outbound_record.saved_offset_};
    } else {
      // Proxy logic - aggregating multiple upstream answers into single downstream answer.
      // Let's fail if anything fails, otherwise use the lowest offset (like Kafka would have done).
      ErrorCodeAndOffset& curr = it->second;
      if (NO_ERROR == curr.first) {
        curr.first = outbound_record.error_code_;
        curr.second = std::min(curr.second, outbound_record.saved_offset_);
      }
    }
  }

  // Translate the aggregated map into the response structures the codec expects.
  std::vector<TopicProduceResponse> topic_responses;
  for (const auto& topic_entry : topic_to_partition_responses) {
    std::vector<PartitionProduceResponse> partition_responses;
    for (const auto& partition_entry : topic_entry.second) {
      const int32_t partition = partition_entry.first;
      const int16_t error_code = partition_entry.second.first;
      // Plain value (not a reference): the stored offset is 32-bit, so the previous
      // 'const int64_t&' bound a reference to a temporary created by the widening conversion.
      const int64_t offset = partition_entry.second.second;
      partition_responses.emplace_back(partition, error_code, offset);
    }
    const std::string& topic = topic_entry.first;
    topic_responses.emplace_back(topic, partition_responses);
  }

  // Throttle time of 0 - this proxy does not apply quota-based throttling.
  ProduceResponse data = {topic_responses, 0};
  return std::make_shared<Response<ProduceResponse>>(metadata, data);
}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#pragma once

#include "contrib/kafka/filters/network/source/external/requests.h"
#include "contrib/kafka/filters/network/source/mesh/abstract_command.h"
#include "contrib/kafka/filters/network/source/mesh/command_handlers/produce_outbound_record.h"
#include "contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h"
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h"
#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
* Kafka 'Produce' request, that is aimed at particular cluster.
* A single Produce request coming from downstream can map into multiple entries,
* as the topics can be hosted on different clusters.
*/
class ProduceRequestHolder : public BaseInFlightRequest,
                             public ProduceFinishCb,
                             public std::enable_shared_from_this<ProduceRequestHolder> {
public:
  // Production constructor: uses the default (placeholder) record extractor.
  ProduceRequestHolder(AbstractRequestListener& filter, UpstreamKafkaFacade& kafka_facade,
                       const std::shared_ptr<Request<ProduceRequest>> request);

  // Visible for testing.
  ProduceRequestHolder(AbstractRequestListener& filter, UpstreamKafkaFacade& kafka_facade,
                       const RecordExtractor& record_extractor,
                       const std::shared_ptr<Request<ProduceRequest>> request);

  // AbstractInFlightRequest
  // Sends every extracted record to the producer for its topic; notifies the filter
  // immediately if there was nothing to send.
  void startProcessing() override;

  // AbstractInFlightRequest
  // True once every record has received its delivery confirmation.
  bool finished() const override;

  // AbstractInFlightRequest
  AbstractResponseSharedPtr computeAnswer() const override;

  // ProduceFinishCb
  // Matches a delivery confirmation against our outbound records (by payload pointer).
  // Returns true if the memento belonged to this request, false otherwise.
  bool accept(const DeliveryMemento& memento) override;

private:
  // Access to Kafka producers pointing to upstream Kafka clusters.
  UpstreamKafkaFacade& kafka_facade_;

  // Original request.
  const std::shared_ptr<Request<ProduceRequest>> request_;

  // How many responses from Kafka Producer handling our request we still expect.
  // This value decreases to 0 as we get confirmations from Kafka (successful or not).
  int expected_responses_;

  // Real records extracted out of request.
  std::vector<OutboundRecord> outbound_records_;
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include <string>

#include "absl/strings/string_view.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Binds a single inbound record from Kafka client with its delivery information.
struct OutboundRecord {

  // These fields were received from downstream.
  // NOTE(review): key_ and value_ are non-owning views - presumably into the original request's
  // buffer, which must outlive this record; confirm against the holder keeping request_ alive.
  const std::string topic_;
  const int32_t partition_;
  const absl::string_view key_;
  const absl::string_view value_;

  // These fields will get updated when delivery to upstream Kafka cluster finishes.
  // NOTE(review): Kafka record offsets are 64-bit in the protocol; uint32_t could truncate
  // large offsets - confirm the intended width.
  int16_t error_code_;
  uint32_t saved_offset_;

  // Starts in the "no error, offset 0" state until a delivery confirmation arrives.
  OutboundRecord(const std::string& topic, const int32_t partition, const absl::string_view key,
                 const absl::string_view value)
      : topic_{topic}, partition_{partition}, key_{key}, value_{value}, error_code_{0},
        saved_offset_{0} {};
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#include "contrib/kafka/filters/network/source/mesh/command_handlers/produce_record_extractor.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Placeholder: real record parsing is not implemented yet, so the input is ignored and no
// records are produced.
std::vector<OutboundRecord>
PlaceholderRecordExtractor::extractRecords(const std::vector<TopicProduceData>&) const {
  return std::vector<OutboundRecord>{};
}

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include "contrib/kafka/filters/network/source/external/requests.h"
#include "contrib/kafka/filters/network/source/mesh/command_handlers/produce_outbound_record.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

/**
 * Dependency injection class responsible for extracting records out of produce request's contents.
 * Implementations receive the parsed per-topic produce data and return one OutboundRecord per
 * record to be sent upstream.
 */
class RecordExtractor {
public:
  virtual ~RecordExtractor() = default;

  // Extracts all records from the given topics' produce data.
  virtual std::vector<OutboundRecord>
  extractRecords(const std::vector<TopicProduceData>& data) const PURE;
};

/**
 * Just a placeholder for now: ignores its input and extracts no records.
 */
class PlaceholderRecordExtractor : public RecordExtractor {
public:
  // RecordExtractor
  std::vector<OutboundRecord>
  extractRecords(const std::vector<TopicProduceData>& data) const override;
};

} // namespace Mesh
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
5 changes: 3 additions & 2 deletions contrib/kafka/filters/network/source/mesh/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Production constructor: wires a request decoder whose processor forwards parsed requests to
// this filter, using the given configuration and the shared upstream facade.
// (The scraped diff left the pre-change one-argument signature line fused here; only the
// two-argument form below is the post-diff code.)
KafkaMeshFilter::KafkaMeshFilter(const UpstreamKafkaConfiguration& configuration,
                                 UpstreamKafkaFacade& upstream_kafka_facade)
    : KafkaMeshFilter{std::make_shared<RequestDecoder>(std::vector<RequestCallbackSharedPtr>(
          {std::make_shared<RequestProcessor>(*this, configuration, upstream_kafka_facade)}))} {}

// Visible-for-testing constructor: injects a pre-built decoder.
KafkaMeshFilter::KafkaMeshFilter(RequestDecoderSharedPtr request_decoder)
    : request_decoder_{request_decoder} {}
Expand Down
Loading