Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions bazel/foreign_cc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,32 @@ cc_library(
],
)

# Kafka client dependency used by Kafka-mesh filter.
# librdkafka build generates extra headers that need to be copied into source to get it to compile.
configure_make(
    name = "librdkafka_build",
    configure_in_place = True,
    # Besides disabling optional dependencies (SSL, GSSAPI, external LZ4, ZSTD), the trailing
    # '&& cp ...' piggybacks on configure_options to copy the generated 'Makefile.config' and
    # 'config.h' next to the sources ('src/..'), as the in-place build expects them there.
    configure_options = ["--disable-ssl --disable-gssapi --disable-lz4-ext --disable-zstd && cp Makefile.config src/.. && cp config.h src/.."],
    lib_source = "@edenhill_librdkafka//:all",
    # ARFLAGS is cleared here - presumably to override archiver defaults that break this build
    # under rules_foreign_cc; confirm against librdkafka's build documentation.
    make_commands = [
        "make ARFLAGS='' libs install-subdirs",
    ],
    # Both the C ('librdkafka.a') and C++ ('librdkafka++.a') static archives are produced.
    static_libraries = [
        "librdkafka.a",
        "librdkafka++.a",
    ],
    tags = ["skip_on_windows"],
    alwayslink = True,
)

# Thin wrapper target so consumers can depend on a plain cc_library named 'librdkafka'
# instead of the configure_make target directly.
cc_library(
    name = "librdkafka",
    tags = ["skip_on_windows"],
    deps = [
        # Canonical same-package label syntax (leading ':'), per Bazel/buildifier convention.
        ":librdkafka_build",
    ],
)

configure_make(
name = "luajit",
configure_command = "build.py",
Expand Down
11 changes: 11 additions & 0 deletions bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,17 @@ filegroup(
patches = ["@envoy//bazel/external:kafka_int32.patch"],
)

# This archive provides Kafka C/CPP client used by mesh filter to communicate with upstream
# Kafka clusters.
external_http_archive(
name = "edenhill_librdkafka",
build_file_content = BUILD_ALL_CONTENT,
)
native.bind(
name = "librdkafka",
actual = "@envoy//bazel/foreign_cc:librdkafka",
)

# This archive provides Kafka (and Zookeeper) binaries, that are used during Kafka integration
# tests.
external_http_archive(
Expand Down
13 changes: 13 additions & 0 deletions bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,19 @@ REPOSITORY_LOCATIONS_SPEC = dict(
release_date = "2020-03-03",
cpe = "cpe:2.3:a:apache:kafka:*",
),
edenhill_librdkafka = dict(
project_name = "Kafka (C/C++ client)",
project_desc = "C/C++ client for Apache Kafka (open-source distributed event streaming platform)",
project_url = "https://github.com/edenhill/librdkafka",
version = "1.7.0",
sha256 = "c71b8c5ff419da80c31bb8d3036a408c87ad523e0c7588e7660ee5f3c8973057",
strip_prefix = "librdkafka-{version}",
urls = ["https://github.com/edenhill/librdkafka/archive/v{version}.tar.gz"],
use_category = ["dataplane_ext"],
extensions = ["envoy.filters.network.kafka_broker"],
release_date = "2021-05-10",
cpe = "N/A",
),
kafka_server_binary = dict(
project_name = "Kafka (server binary)",
project_desc = "Open-source distributed event streaming platform",
Expand Down
3 changes: 3 additions & 0 deletions contrib/kafka/filters/network/source/mesh/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load(
"envoy_cc_library",
"envoy_contrib_package",
)
load("//bazel:envoy_internal.bzl", "envoy_external_dep_path")

licenses(["notice"]) # Apache 2

Expand All @@ -21,6 +22,7 @@ envoy_cc_library(
":abstract_command_lib",
":request_processor_lib",
":upstream_config_lib",
":upstream_kafka_facade_lib",
"//contrib/kafka/filters/network/source:kafka_request_codec_lib",
"//contrib/kafka/filters/network/source:kafka_response_codec_lib",
"//envoy/buffer:buffer_interface",
Expand Down Expand Up @@ -112,6 +114,7 @@ envoy_cc_library(
":upstream_kafka_client_lib",
"//envoy/event:dispatcher_interface",
"//source/common/common:minimal_logger_lib",
envoy_external_dep_path("librdkafka"),
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,37 @@ namespace Mesh {
* Kafka 'Produce' request, that is aimed at particular cluster.
* A single Produce request coming from downstream can map into multiple entries,
* as the topics can be hosted on different clusters.
*
 * These requests are stored in 2 places: this filter (request's origin) and in RichKafkaProducer
* instances (to match pure-Kafka confirmations to the requests).
*
* +--------------+
* |<<librdkafka |
* |notification>>+--------+
* +-+------------+ |
* | |
* |<notifies> |
* | |
* +---------------+ +-v---------------+ |
* |KafkaMeshFilter+--+ +--+RichKafkaProducer| |
* +-^-------------+ | | +-----------------+ |
* | | | |
* | <in-flight>| |<requests-waiting |
* | | |for-delivery> |
* | | | <matches>|
* | +--------v-v---------+ |
* +-------+ProduceRequestHolder|----------+ |
* <notifies- +---------+----------+<contains>| |
* when-finished> | | |
* +---------v----------+ | |
* |PartitionProduceData| | |
* +---------^----------+ | |
* |<absl::string_view> | |
* +-----------------+----------------+ | |
* | | | | |
* +-----+--------+ +------+-------+ +------+----v--+ |
* |OutboundRecord| |OutboundRecord| |OutboundRecord<--+
* +--------------+ +--------------+ +--------------+
*/
class ProduceRequestHolder : public BaseInFlightRequest,
public ProduceFinishCb,
Expand Down
20 changes: 9 additions & 11 deletions contrib/kafka/filters/network/source/mesh/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Mesh {
* Decoded request bytes are passed to processor, that calls us back with enriched request.
 * Request then gets invoked to start its processing.
* Filter is going to maintain a list of in-flight-request so it can send responses when they
*finish.
* finish.
*
*
* +----------------+ <creates> +-----------------------+
Expand All @@ -37,15 +37,15 @@ namespace Mesh {
* +-------+-------+ |
* | |
* | |
* +-------v-----------+ | <uses>
* |UpstreamKafkaFacade| |
* +-------v-----------+ |<in-flight-reference>
* |UpstreamKafkaFacade| |(for callback when finished)
* +-------+-----------+ |
* | |
* | |
* +-------v--------------+ +--------------v---------+
* |<<ThreadLocalObject>> +------->PlaceholderKafkaProducer+
* |ThreadLocalKafkaFacade| +------------------------+
* +----------------------+
* +-------v--------------+ +--------------v--+ +-----------------+
* |<<ThreadLocalObject>> +------->RichKafkaProducer+--->><<librdkafka>> |
* |ThreadLocalKafkaFacade| +-----------------+ |RdKafka::Producer|
* +----------------------+ +-----------------+
**/
class KafkaMeshFilter : public Network::ReadFilter,
public Network::ConnectionCallbacks,
Expand Down Expand Up @@ -80,10 +80,8 @@ class KafkaMeshFilter : public Network::ReadFilter,

private:
// Helper method invoked when connection gets dropped.
// Request references are going to be stored in 2 places: this filter (request's origin) and in
// UpstreamKafkaClient instances (to match pure-Kafka confirmations to the requests). Because
// filter can be destroyed before confirmations from Kafka are received, we are just going to mark
// related requests as abandoned, so they do not attempt to reference this filter anymore.
// Because filter can be destroyed before confirmations from Kafka are received, we are just going
// to mark related requests as abandoned, so they do not attempt to reference this filter anymore.
// Impl note: this is similar to what Redis filter does.
void abandonAllInFlightRequests();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,18 @@ class KafkaProducer {
public:
virtual ~KafkaProducer() = default;

// Sends given record (key, value) to Kafka (topic, partition).
// When delivery is finished, it notifies the callback provided with corresponding delivery data
// (error code, offset).
/*
* Sends given record (key, value) to Kafka (topic, partition).
* When delivery is finished, it notifies the callback provided with corresponding delivery data
* (error code, offset).
*
* @param origin origin of payload to be notified when delivery finishes.
* @param topic Kafka topic.
* @param partition Kafka partition (as clients do partitioning, we just reuse what downstream
* gave us).
* @param key Kafka message key.
* @param value Kafka message value.
*/
virtual void send(const ProduceFinishCbSharedPtr origin, const std::string& topic,
const int32_t partition, const absl::string_view key,
const absl::string_view value) PURE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,144 @@ namespace NetworkFilters {
namespace Kafka {
namespace Mesh {

// Just a placeholder implementation.
class LibRdKafkaUtilsImpl : public LibRdKafkaUtils {

PlaceholderKafkaProducer::PlaceholderKafkaProducer(Event::Dispatcher&, Thread::ThreadFactory&,
const RawKafkaProducerConfig&){};
// LibRdKafkaUtils
RdKafka::Conf::ConfResult setConfProperty(RdKafka::Conf& conf, const std::string& name,
const std::string& value,
std::string& errstr) const override {
return conf.set(name, value, errstr);
}

void PlaceholderKafkaProducer::send(const ProduceFinishCbSharedPtr, const std::string&,
const int32_t, const absl::string_view,
const absl::string_view){};
// LibRdKafkaUtils
RdKafka::Conf::ConfResult setConfDeliveryCallback(RdKafka::Conf& conf,
RdKafka::DeliveryReportCb* dr_cb,
std::string& errstr) const override {
return conf.set("dr_cb", dr_cb, errstr);
}

void PlaceholderKafkaProducer::markFinished(){};
// LibRdKafkaUtils
std::unique_ptr<RdKafka::Producer> createProducer(RdKafka::Conf* conf,
std::string& errstr) const override {
return std::unique_ptr<RdKafka::Producer>(RdKafka::Producer::create(conf, errstr));
}
};

// Production constructor: delegates to the full constructor, wiring in the real librdkafka
// utils implementation. (Dropped the stray semicolon after the body, which triggers
// -Wextra-semi warnings.)
RichKafkaProducer::RichKafkaProducer(Event::Dispatcher& dispatcher,
                                     Thread::ThreadFactory& thread_factory,
                                     const RawKafkaProducerConfig& configuration)
    : RichKafkaProducer(dispatcher, thread_factory, configuration, LibRdKafkaUtilsImpl{}) {}

// Full constructor: builds the librdkafka configuration, registers the delivery callback,
// creates the producer, and starts the dedicated monitoring (poller) thread.
// Throws EnvoyException if any configuration or creation step fails.
RichKafkaProducer::RichKafkaProducer(Event::Dispatcher& dispatcher,
                                     Thread::ThreadFactory& thread_factory,
                                     const RawKafkaProducerConfig& configuration,
                                     const LibRdKafkaUtils& utils)
    : dispatcher_{dispatcher} {

  // Create producer configuration object.
  // NOTE(review): 'conf' is only passed by raw pointer to createProducer below and is freed by
  // the unique_ptr when this constructor returns - this assumes RdKafka::Producer::create does
  // not take ownership; confirm against librdkafka API docs.
  std::unique_ptr<RdKafka::Conf> conf =
      std::unique_ptr<RdKafka::Conf>(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
  std::string errstr;

  // Setup producer custom properties (key-value pairs taken verbatim from filter configuration).
  for (const auto& e : configuration) {
    if (utils.setConfProperty(*conf, e.first, e.second, errstr) != RdKafka::Conf::CONF_OK) {
      throw EnvoyException(absl::StrCat("Could not set producer property [", e.first, "] to [",
                                        e.second, "]:", errstr));
    }
  }

  // Setup callback (this callback is going to be invoked in dedicated monitoring thread).
  if (utils.setConfDeliveryCallback(*conf, this, errstr) != RdKafka::Conf::CONF_OK) {
    throw EnvoyException(absl::StrCat("Could not set producer callback:", errstr));
  }

  // Finally, we create the producer.
  producer_ = utils.createProducer(conf.get(), errstr);
  if (!producer_) {
    throw EnvoyException(absl::StrCat("Could not create producer:", errstr));
  }

  // Start the monitoring thread; it blocks in librdkafka's 'poll' waiting for delivery reports.
  poller_thread_active_ = true;
  std::function<void()> thread_routine = [this]() -> void { checkDeliveryReports(); };
  poller_thread_ = thread_factory.createThread(thread_routine);
}

// Destructor: stops the poller thread and waits for it to finish. Because the poller blocks in
// librdkafka's 'poll' (see checkDeliveryReports), joining can take up to the poll timeout.
RichKafkaProducer::~RichKafkaProducer() {
  ENVOY_LOG(debug, "Shutting down worker thread");
  poller_thread_active_ = false; // This should never be needed, as we call 'markFinished' earlier.
  poller_thread_->join();
  ENVOY_LOG(debug, "Worker thread shut down successfully");
}

// Signal the poller thread to exit its loop; the thread notices the cleared flag after its
// current (blocking) 'poll' invocation returns.
void RichKafkaProducer::markFinished() {
  poller_thread_active_ = false;
}

// Hands the record over to librdkafka. On successful submission the origin callback is parked
// until the delivery report arrives; on immediate failure the origin is notified right away.
void RichKafkaProducer::send(const ProduceFinishCbSharedPtr origin, const std::string& topic,
                             const int32_t partition, const absl::string_view key,
                             const absl::string_view value) {
  // The payload points into the request's internals and its lifetime is managed by
  // ProduceRequestHolder, so librdkafka is not asked to copy or free it (flags stay 0).
  void* payload = const_cast<char*>(value.data()); // Needed for Kafka API.
  const int no_memory_management_flags = 0;
  const RdKafka::ErrorCode error_code =
      producer_->produce(topic, partition, no_memory_management_flags, payload, value.size(),
                         key.data(), key.size(), 0, nullptr);
  if (RdKafka::ERR_NO_ERROR != error_code) {
    // We could not submit data to producer.
    // Let's treat that as a normal failure (Envoy is a broker after all) and propagate
    // downstream.
    ENVOY_LOG(trace, "Produce failure: {}, while sending to [{}/{}]", error_code, topic, partition);
    const DeliveryMemento memento = {payload, error_code, 0};
    origin->accept(memento);
    return;
  }
  // We have succeeded with submitting data to producer, so we register a callback.
  unfinished_produce_requests_.push_back(origin);
}

// Poller thread routine: repeatedly asks librdkafka to service delivery events until the
// active flag is cleared (by markFinished or the destructor).
void RichKafkaProducer::checkDeliveryReports() {
  while (poller_thread_active_) {
    // We are going to wait for 1000ms, returning when an event (message delivery) happens or
    // producer is closed. Unfortunately we do not have any ability to interrupt this call, so every
    // destructor is going to take up to this much time.
    producer_->poll(1000);
    // This invokes the callback below, if any delivery finished (successful or not).
  }
  ENVOY_LOG(debug, "Poller thread finished");
}

// Kafka callback that contains the delivery information.
// Bug fix: the log arguments were passed as (topic, partition, offset) while the format string
// expects (offset, topic, partition) - reordered to match the placeholders.
void RichKafkaProducer::dr_cb(RdKafka::Message& message) {
  ENVOY_LOG(trace, "Delivery finished: {}, payload has been saved at offset {} in {}/{}",
            message.err(), message.offset(), message.topic_name(), message.partition());
  const DeliveryMemento memento = {message.payload(), message.err(), message.offset()};
  // Because this method gets executed in poller thread, we need to pass the data through
  // dispatcher.
  const Event::PostCb callback = [this, memento]() -> void { processDelivery(memento); };
  dispatcher_.post(callback);
}

// We got the delivery data.
// Walk the unfinished requests and hand the delivery to the first one that claims it (the one
// that originated this particular record), then stop.
void RichKafkaProducer::processDelivery(const DeliveryMemento& memento) {
  auto it = unfinished_produce_requests_.begin();
  while (it != unfinished_produce_requests_.end()) {
    if ((*it)->accept(memento)) {
      unfinished_produce_requests_.erase(it);
      // Stopping here is important - a single request can be mapped into multiple callbacks here.
      return;
    }
    ++it;
  }
}

// Test-only accessor exposing the internal list of requests still awaiting delivery confirmation.
std::list<ProduceFinishCbSharedPtr>& RichKafkaProducer::getUnfinishedRequestsForTest() {
  return unfinished_produce_requests_;
}

} // namespace Mesh
} // namespace Kafka
Expand Down
Loading