diff --git a/contrib/kafka/filters/network/source/mesh/BUILD b/contrib/kafka/filters/network/source/mesh/BUILD
index e43c9ca505928..2745f13fb1f73 100644
--- a/contrib/kafka/filters/network/source/mesh/BUILD
+++ b/contrib/kafka/filters/network/source/mesh/BUILD
@@ -63,6 +63,52 @@ envoy_cc_library(
     deps = [
         "//contrib/kafka/filters/network/source:kafka_response_lib",
         "//contrib/kafka/filters/network/source:tagged_fields_lib",
+    ],
+)
+
+envoy_cc_library(
+    name = "upstream_kafka_facade_lib",
+    srcs = [
+        "upstream_kafka_facade.cc",
+    ],
+    hdrs = [
+        "upstream_kafka_facade.h",
+    ],
+    tags = ["skip_on_windows"],
+    deps = [
+        ":upstream_config_lib",
+        ":upstream_kafka_client_impl_lib",
+        ":upstream_kafka_client_lib",
+        "//envoy/thread:thread_interface",
+        "//envoy/thread_local:thread_local_interface",
+        "//source/common/common:minimal_logger_lib",
+    ],
+)
+
+envoy_cc_library(
+    name = "upstream_kafka_client_lib",
+    srcs = [
+    ],
+    hdrs = [
+        "upstream_kafka_client.h",
+    ],
+    tags = ["skip_on_windows"],
+    deps = [
+    ],
+)
+
+envoy_cc_library(
+    name = "upstream_kafka_client_impl_lib",
+    srcs = [
+        "upstream_kafka_client_impl.cc",
+    ],
+    hdrs = [
+        "upstream_kafka_client_impl.h",
+    ],
+    tags = ["skip_on_windows"],
+    deps = [
+        ":upstream_kafka_client_lib",
+        "//envoy/event:dispatcher_interface",
         "//source/common/common:minimal_logger_lib",
     ],
 )
diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h b/contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h
new file mode 100644
index 0000000000000..89df2862aff21
--- /dev/null
+++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h
@@ -0,0 +1,70 @@
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "envoy/common/pure.h"
+
+#include "absl/strings/string_view.h"
+
+namespace Envoy {
+namespace Extensions {
+namespace NetworkFilters {
+namespace Kafka {
+namespace Mesh {
+
+// Trivial memento that keeps the information about how given request was delivered:
+// in case of success
+// this means offset (if acks > 0), or error code.
+struct DeliveryMemento {
+
+  // Kafka producer error code.
+  const int32_t error_code_;
+
+  // Offset (only meaningful if error code is equal to 0).
+  const int64_t offset_;
+};
+
+// Callback for objects that want to be notified that record delivery has been finished.
+class ProduceFinishCb {
+public:
+  virtual ~ProduceFinishCb() = default;
+
+  // Attempt to process this delivery.
+  // @returns true if given callback is related to this delivery
+  virtual bool accept(const DeliveryMemento& memento) PURE;
+};
+
+using ProduceFinishCbSharedPtr = std::shared_ptr<ProduceFinishCb>;
+
+/**
+ * Filter facing interface.
+ * A thing that takes records and sends them to upstream Kafka.
+ */
+class KafkaProducer {
+public:
+  virtual ~KafkaProducer() = default;
+
+  // Sends given record (key, value) to Kafka (topic, partition).
+  // When delivery is finished, it notifies the callback provided with corresponding delivery data
+  // (error code, offset).
+  virtual void send(const ProduceFinishCbSharedPtr origin, const std::string& topic,
+                    const int32_t partition, const absl::string_view key,
+                    const absl::string_view value) PURE;
+
+  // Impl leakage: real implementations of Kafka Producer need to stop a monitoring thread, then
+  // they can close the producer. Because the polling thread should not be interrupted, we just mark
+  // it as finished, and it's going to notice that change on the next iteration.
+  // Theoretically we do not need to do this and leave it all to destructor, but then closing N
+  // producers would require doing that in sequence, while we can optimize it somewhat (so we just
+  // wait for the slowest one).
+  // See https://github.com/edenhill/librdkafka/issues/2972
+  virtual void markFinished() PURE;
+};
+
+using KafkaProducerPtr = std::unique_ptr<KafkaProducer>;
+
+} // namespace Mesh
+} // namespace Kafka
+} // namespace NetworkFilters
+} // namespace Extensions
+} // namespace Envoy
diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.cc b/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.cc
new file mode 100644
index 0000000000000..7cba96352cc35
--- /dev/null
+++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.cc
@@ -0,0 +1,24 @@
+#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h"
+
+namespace Envoy {
+namespace Extensions {
+namespace NetworkFilters {
+namespace Kafka {
+namespace Mesh {
+
+// Just a placeholder implementation.
+
+PlaceholderKafkaProducer::PlaceholderKafkaProducer(Event::Dispatcher&, Thread::ThreadFactory&,
+                                                   const RawKafkaProducerConfig&){};
+
+void PlaceholderKafkaProducer::send(const ProduceFinishCbSharedPtr, const std::string&,
+                                    const int32_t, const absl::string_view,
+                                    const absl::string_view){};
+
+void PlaceholderKafkaProducer::markFinished(){};
+
+} // namespace Mesh
+} // namespace Kafka
+} // namespace NetworkFilters
+} // namespace Extensions
+} // namespace Envoy
diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h b/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h
new file mode 100644
index 0000000000000..58ca39b9a3c7b
--- /dev/null
+++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include "envoy/event/dispatcher.h"
+
+#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h"
+
+namespace Envoy {
+namespace Extensions {
+namespace NetworkFilters {
+namespace Kafka {
+namespace Mesh {
+
+using RawKafkaProducerConfig = std::map<std::string, std::string>;
+
+// Placeholder for proper Kafka Producer object.
+// It will also keep a reference to a dedicated thread (that's why we need a factory) that's going
+// to be polling for delivery notifications.
+class PlaceholderKafkaProducer : public KafkaProducer {
+public:
+  PlaceholderKafkaProducer(Event::Dispatcher& dispatcher, Thread::ThreadFactory& thread_factory,
+                           const RawKafkaProducerConfig& configuration);
+
+  // KafkaProducer
+  void send(const ProduceFinishCbSharedPtr origin, const std::string& topic,
+            const int32_t partition, const absl::string_view key,
+            const absl::string_view value) override;
+
+  // KafkaProducer
+  void markFinished() override;
+};
+
+} // namespace Mesh
+} // namespace Kafka
+} // namespace NetworkFilters
+} // namespace Extensions
+} // namespace Envoy
diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.cc b/contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.cc
new file mode 100644
index 0000000000000..ac04837347fe1
--- /dev/null
+++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.cc
@@ -0,0 +1,100 @@
+#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h"
+
+#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h"
+
+namespace Envoy {
+namespace Extensions {
+namespace NetworkFilters {
+namespace Kafka {
+namespace Mesh {
+
+/**
+ * Responsible for keeping a map of upstream-facing Kafka clients.
+ */
+class ThreadLocalKafkaFacade : public ThreadLocal::ThreadLocalObject,
+                               private Logger::Loggable<Logger::Id::kafka> {
+public:
+  ThreadLocalKafkaFacade(const UpstreamKafkaConfiguration& configuration,
+                         Event::Dispatcher& dispatcher, Thread::ThreadFactory& thread_factory);
+  ~ThreadLocalKafkaFacade() override;
+
+  KafkaProducer& getProducerForTopic(const std::string& topic);
+
+  size_t getProducerCountForTest() const;
+
+private:
+  // Mutates 'cluster_to_kafka_client_'.
+  KafkaProducer& registerNewProducer(const ClusterConfig& cluster_config);
+
+  const UpstreamKafkaConfiguration& configuration_;
+  Event::Dispatcher& dispatcher_;
+  Thread::ThreadFactory& thread_factory_;
+
+  std::map<std::string, KafkaProducerPtr> cluster_to_kafka_client_;
+};
+
+ThreadLocalKafkaFacade::ThreadLocalKafkaFacade(const UpstreamKafkaConfiguration& configuration,
+                                               Event::Dispatcher& dispatcher,
+                                               Thread::ThreadFactory& thread_factory)
+    : configuration_{configuration}, dispatcher_{dispatcher}, thread_factory_{thread_factory} {}
+
+ThreadLocalKafkaFacade::~ThreadLocalKafkaFacade() {
+  // Because the producers take a moment to shutdown, we mark their monitoring threads as shut down
+  // before the destructors get called.
+  for (auto& entry : cluster_to_kafka_client_) {
+    entry.second->markFinished();
+  }
+}
+
+KafkaProducer& ThreadLocalKafkaFacade::getProducerForTopic(const std::string& topic) {
+  const absl::optional<ClusterConfig> cluster_config =
+      configuration_.computeClusterConfigForTopic(topic);
+  if (cluster_config) {
+    const auto it = cluster_to_kafka_client_.find(cluster_config->name_);
+    // Return client already present or create new one and register it.
+    return (cluster_to_kafka_client_.end() == it) ? registerNewProducer(*cluster_config)
+                                                  : *(it->second);
+  } else {
+    throw EnvoyException(absl::StrCat("cannot compute target producer for topic: ", topic));
+  }
+}
+
+KafkaProducer& ThreadLocalKafkaFacade::registerNewProducer(const ClusterConfig& cluster_config) {
+  ENVOY_LOG(debug, "Registering new Kafka producer for cluster [{}]", cluster_config.name_);
+  KafkaProducerPtr new_producer = std::make_unique<PlaceholderKafkaProducer>(
+      dispatcher_, thread_factory_, cluster_config.upstream_producer_properties_);
+  auto result = cluster_to_kafka_client_.emplace(cluster_config.name_, std::move(new_producer));
+  return *(result.first->second);
+}
+
+size_t ThreadLocalKafkaFacade::getProducerCountForTest() const {
+  return cluster_to_kafka_client_.size();
+}
+
+UpstreamKafkaFacadeImpl::UpstreamKafkaFacadeImpl(const UpstreamKafkaConfiguration& configuration,
+                                                 ThreadLocal::SlotAllocator& slot_allocator,
+                                                 Thread::ThreadFactory& thread_factory)
+    : tls_{slot_allocator.allocateSlot()} {
+
+  ThreadLocal::Slot::InitializeCb cb =
+      [&configuration,
+       &thread_factory](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
+    return std::make_shared<ThreadLocalKafkaFacade>(configuration, dispatcher, thread_factory);
+  };
+  tls_->set(cb);
+}
+
+// Return Producer instance that is local to given thread, via ThreadLocalKafkaFacade.
+KafkaProducer& UpstreamKafkaFacadeImpl::getProducerForTopic(const std::string& topic) {
+  return tls_->getTyped<ThreadLocalKafkaFacade>().getProducerForTopic(topic);
+}
+
+size_t UpstreamKafkaFacadeImpl::getProducerCountForTest() const {
+  return tls_->getTyped<ThreadLocalKafkaFacade>().getProducerCountForTest();
+}
+
+} // namespace Mesh
+} // namespace Kafka
+} // namespace NetworkFilters
+} // namespace Extensions
+} // namespace Envoy
diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h b/contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h
new file mode 100644
index 0000000000000..9cf69aa5f2241
--- /dev/null
+++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h
@@ -0,0 +1,58 @@
+#pragma once
+
+#include "envoy/thread/thread.h"
+#include "envoy/thread_local/thread_local.h"
+
+#include "source/common/common/logger.h"
+
+#include "contrib/kafka/filters/network/source/mesh/upstream_config.h"
+#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h"
+
+namespace Envoy {
+namespace Extensions {
+namespace NetworkFilters {
+namespace Kafka {
+namespace Mesh {
+
+/**
+ * Provides access to upstream Kafka clients.
+ */
+class UpstreamKafkaFacade {
+public:
+  virtual ~UpstreamKafkaFacade() = default;
+
+  /**
+   * Returns a Kafka producer that points an upstream Kafka cluster that is supposed to receive
+   * messages for the given topic.
+   */
+  virtual KafkaProducer& getProducerForTopic(const std::string& topic) PURE;
+};
+
+using UpstreamKafkaFacadeSharedPtr = std::shared_ptr<UpstreamKafkaFacade>;
+
+/**
+ * Provides access to upstream Kafka clients.
+ * This is done by using thread-local maps of cluster to producer.
+ * We are going to have one Kafka producer per upstream cluster, per Envoy worker thread.
+ */
+class UpstreamKafkaFacadeImpl : public UpstreamKafkaFacade,
+                                private Logger::Loggable<Logger::Id::kafka> {
+public:
+  UpstreamKafkaFacadeImpl(const UpstreamKafkaConfiguration& configuration,
+                          ThreadLocal::SlotAllocator& slot_allocator,
+                          Thread::ThreadFactory& thread_factory);
+
+  // UpstreamKafkaFacade
+  KafkaProducer& getProducerForTopic(const std::string& topic) override;
+
+  size_t getProducerCountForTest() const;
+
+private:
+  ThreadLocal::SlotPtr tls_;
+};
+
+} // namespace Mesh
+} // namespace Kafka
+} // namespace NetworkFilters
+} // namespace Extensions
+} // namespace Envoy
diff --git a/contrib/kafka/filters/network/test/mesh/BUILD b/contrib/kafka/filters/network/test/mesh/BUILD
index afef5fbcff575..e384302637063 100644
--- a/contrib/kafka/filters/network/test/mesh/BUILD
+++ b/contrib/kafka/filters/network/test/mesh/BUILD
@@ -27,6 +27,17 @@ envoy_cc_test(
     ],
 )
 
+envoy_cc_test(
+    name = "upstream_kafka_facade_unit_test",
+    srcs = ["upstream_kafka_facade_unit_test.cc"],
+    tags = ["skip_on_windows"],
+    deps = [
+        "//contrib/kafka/filters/network/source/mesh:upstream_kafka_facade_lib",
+        "//test/mocks/thread_local:thread_local_mocks",
+        "//test/test_common:thread_factory_for_test_lib",
+    ],
+)
+
 envoy_cc_test(
     name = "abstract_command_unit_test",
     srcs = ["abstract_command_unit_test.cc"],
diff --git a/contrib/kafka/filters/network/test/mesh/upstream_kafka_facade_unit_test.cc b/contrib/kafka/filters/network/test/mesh/upstream_kafka_facade_unit_test.cc
new file mode 100644
index 0000000000000..d4a0497c21476
--- /dev/null
+++ b/contrib/kafka/filters/network/test/mesh/upstream_kafka_facade_unit_test.cc
@@ -0,0 +1,107 @@
+#include "envoy/thread/thread.h"
+#include "envoy/thread_local/thread_local.h"
+
+#include "test/mocks/thread_local/mocks.h"
+#include "test/test_common/thread_factory_for_test.h"
+
+#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+using testing::Return;
+
+namespace Envoy {
+namespace Extensions {
+namespace NetworkFilters {
+namespace Kafka {
+namespace Mesh {
+namespace {
+
+class MockUpstreamKafkaConfiguration : public UpstreamKafkaConfiguration {
+public:
+  MOCK_METHOD(absl::optional<ClusterConfig>, computeClusterConfigForTopic, (const std::string&),
+              (const));
+  MOCK_METHOD((std::pair<std::string, int32_t>), getAdvertisedAddress, (), (const));
+};
+
+class MockThreadFactory : public Thread::ThreadFactory {
+public:
+  MOCK_METHOD(Thread::ThreadPtr, createThread, (std::function<void()>, Thread::OptionsOptConstRef));
+  MOCK_METHOD(Thread::ThreadId, currentThreadId, ());
+};
+
+TEST(UpstreamKafkaFacadeTest, shouldCreateProducerOnlyOnceForTheSameCluster) {
+  // given
+  const std::string topic1 = "topic1";
+  const std::string topic2 = "topic2";
+
+  MockUpstreamKafkaConfiguration configuration;
+  const ClusterConfig cluster_config = {"cluster", 1, {{"bootstrap.servers", "localhost:9092"}}};
+  EXPECT_CALL(configuration, computeClusterConfigForTopic(topic1)).WillOnce(Return(cluster_config));
+  EXPECT_CALL(configuration, computeClusterConfigForTopic(topic2)).WillOnce(Return(cluster_config));
+  ThreadLocal::MockInstance slot_allocator;
+  EXPECT_CALL(slot_allocator, allocateSlot())
+      .WillOnce(Invoke(&slot_allocator, &ThreadLocal::MockInstance::allocateSlotMock));
+  Thread::ThreadFactory& thread_factory = Thread::threadFactoryForTest();
+  UpstreamKafkaFacadeImpl testee = {configuration, slot_allocator, thread_factory};
+
+  // when
+  auto& result1 = testee.getProducerForTopic(topic1);
+  auto& result2 = testee.getProducerForTopic(topic2);
+
+  // then
+  EXPECT_EQ(&result1, &result2);
+  EXPECT_EQ(testee.getProducerCountForTest(), 1);
+}
+
+TEST(UpstreamKafkaFacadeTest, shouldCreateDifferentProducersForDifferentClusters) {
+  // given
+  const std::string topic1 = "topic1";
+  const std::string topic2 = "topic2";
+
+  MockUpstreamKafkaConfiguration configuration;
+  // Notice it's the cluster name that matters, not the producer config.
+  const ClusterConfig cluster_config1 = {"cluster1", 1, {{"bootstrap.servers", "localhost:9092"}}};
+  EXPECT_CALL(configuration, computeClusterConfigForTopic(topic1))
+      .WillOnce(Return(cluster_config1));
+  const ClusterConfig cluster_config2 = {"cluster2", 1, {{"bootstrap.servers", "localhost:9092"}}};
+  EXPECT_CALL(configuration, computeClusterConfigForTopic(topic2))
+      .WillOnce(Return(cluster_config2));
+  ThreadLocal::MockInstance slot_allocator;
+  EXPECT_CALL(slot_allocator, allocateSlot())
+      .WillOnce(Invoke(&slot_allocator, &ThreadLocal::MockInstance::allocateSlotMock));
+  Thread::ThreadFactory& thread_factory = Thread::threadFactoryForTest();
+  UpstreamKafkaFacadeImpl testee = {configuration, slot_allocator, thread_factory};
+
+  // when
+  auto& result1 = testee.getProducerForTopic(topic1);
+  auto& result2 = testee.getProducerForTopic(topic2);
+
+  // then
+  EXPECT_NE(&result1, &result2);
+  EXPECT_EQ(testee.getProducerCountForTest(), 2);
+}
+
+TEST(UpstreamKafkaFacadeTest, shouldThrowIfThereIsNoConfigurationForGivenTopic) {
+  // given
+  const std::string topic = "topic1";
+
+  MockUpstreamKafkaConfiguration configuration;
+  const ClusterConfig cluster_config = {"cluster", 1, {{"bootstrap.servers", "localhost:9092"}}};
+  EXPECT_CALL(configuration, computeClusterConfigForTopic(topic)).WillOnce(Return(absl::nullopt));
+  ThreadLocal::MockInstance slot_allocator;
+  EXPECT_CALL(slot_allocator, allocateSlot())
+      .WillOnce(Invoke(&slot_allocator, &ThreadLocal::MockInstance::allocateSlotMock));
+  Thread::ThreadFactory& thread_factory = Thread::threadFactoryForTest();
+  UpstreamKafkaFacadeImpl testee = {configuration, slot_allocator, thread_factory};
+
+  // when, then - exception gets thrown.
+  EXPECT_THROW(testee.getProducerForTopic(topic), EnvoyException);
+}
+
+} // namespace
+} // namespace Mesh
+} // namespace Kafka
+} // namespace NetworkFilters
+} // namespace Extensions
+} // namespace Envoy