diff --git a/bazel/foreign_cc/BUILD b/bazel/foreign_cc/BUILD index 0a831d81e2665..0a34e16b0e58d 100644 --- a/bazel/foreign_cc/BUILD +++ b/bazel/foreign_cc/BUILD @@ -37,6 +37,32 @@ cc_library( ], ) +# Kafka client dependency used by Kafka-mesh filter. +# librdkafka build generates extra headers that need to be copied into source to get it to compile. +configure_make( + name = "librdkafka_build", + configure_in_place = True, + configure_options = ["--disable-ssl --disable-gssapi --disable-lz4-ext --disable-zstd && cp Makefile.config src/.. && cp config.h src/.."], + lib_source = "@edenhill_librdkafka//:all", + make_commands = [ + "make ARFLAGS='' libs install-subdirs", + ], + static_libraries = [ + "librdkafka.a", + "librdkafka++.a", + ], + tags = ["skip_on_windows"], + alwayslink = True, +) + +cc_library( + name = "librdkafka", + tags = ["skip_on_windows"], + deps = [ + "librdkafka_build", + ], +) + configure_make( name = "luajit", configure_command = "build.py", diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index c5db74d568cb6..a411c1313638b 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -1010,6 +1010,17 @@ filegroup( patches = ["@envoy//bazel/external:kafka_int32.patch"], ) + # This archive provides Kafka C/CPP client used by mesh filter to communicate with upstream + # Kafka clusters. + external_http_archive( + name = "edenhill_librdkafka", + build_file_content = BUILD_ALL_CONTENT, + ) + native.bind( + name = "librdkafka", + actual = "@envoy//bazel/foreign_cc:librdkafka", + ) + # This archive provides Kafka (and Zookeeper) binaries, that are used during Kafka integration # tests. 
external_http_archive( diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index ce780b1112bda..822e0dfa9fde5 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -926,6 +926,19 @@ REPOSITORY_LOCATIONS_SPEC = dict( release_date = "2020-03-03", cpe = "cpe:2.3:a:apache:kafka:*", ), + edenhill_librdkafka = dict( + project_name = "Kafka (C/C++ client)", + project_desc = "C/C++ client for Apache Kafka (open-source distributed event streaming platform)", + project_url = "https://github.com/edenhill/librdkafka", + version = "1.7.0", + sha256 = "c71b8c5ff419da80c31bb8d3036a408c87ad523e0c7588e7660ee5f3c8973057", + strip_prefix = "librdkafka-{version}", + urls = ["https://github.com/edenhill/librdkafka/archive/v{version}.tar.gz"], + use_category = ["dataplane_ext"], + extensions = ["envoy.filters.network.kafka_broker"], + release_date = "2021-05-10", + cpe = "N/A", + ), kafka_server_binary = dict( project_name = "Kafka (server binary)", project_desc = "Open-source distributed event streaming platform", diff --git a/contrib/kafka/filters/network/source/mesh/BUILD b/contrib/kafka/filters/network/source/mesh/BUILD index 44e69ca05d358..fe24168a884b0 100644 --- a/contrib/kafka/filters/network/source/mesh/BUILD +++ b/contrib/kafka/filters/network/source/mesh/BUILD @@ -3,6 +3,7 @@ load( "envoy_cc_library", "envoy_contrib_package", ) +load("//bazel:envoy_internal.bzl", "envoy_external_dep_path") licenses(["notice"]) # Apache 2 @@ -21,6 +22,7 @@ envoy_cc_library( ":abstract_command_lib", ":request_processor_lib", ":upstream_config_lib", + ":upstream_kafka_facade_lib", "//contrib/kafka/filters/network/source:kafka_request_codec_lib", "//contrib/kafka/filters/network/source:kafka_response_codec_lib", "//envoy/buffer:buffer_interface", @@ -112,6 +114,7 @@ envoy_cc_library( ":upstream_kafka_client_lib", "//envoy/event:dispatcher_interface", "//source/common/common:minimal_logger_lib", + envoy_external_dep_path("librdkafka"), ], ) 
diff --git a/contrib/kafka/filters/network/source/mesh/command_handlers/produce.h b/contrib/kafka/filters/network/source/mesh/command_handlers/produce.h index 1769d73f022b9..04781366ea90f 100644 --- a/contrib/kafka/filters/network/source/mesh/command_handlers/produce.h +++ b/contrib/kafka/filters/network/source/mesh/command_handlers/produce.h @@ -17,6 +17,37 @@ namespace Mesh { * Kafka 'Produce' request, that is aimed at particular cluster. * A single Produce request coming from downstream can map into multiple entries, * as the topics can be hosted on different clusters. + * + * These requests stored in 2 places: this filter (request's origin) and in RichKafkaProducer + * instances (to match pure-Kafka confirmations to the requests). + * + * +--------------+ + * |<>+--------+ + * +-+------------+ | + * | | + * | | + * | | + * +---------------+ +-v---------------+ | + * |KafkaMeshFilter+--+ +--+RichKafkaProducer| | + * +-^-------------+ | | +-----------------+ | + * | | | | + * | | | | + * | | | | + * | +--------v-v---------+ | + * +-------+ProduceRequestHolder|----------+ | + * | | + * when-finished> | | | + * +---------v----------+ | | + * |PartitionProduceData| | | + * +---------^----------+ | | + * | | | + * +-----------------+----------------+ | | + * | | | | | + * +-----+--------+ +------+-------+ +------+----v--+ | + * |OutboundRecord| |OutboundRecord| |OutboundRecord<--+ + * +--------------+ +--------------+ +--------------+ */ class ProduceRequestHolder : public BaseInFlightRequest, public ProduceFinishCb, diff --git a/contrib/kafka/filters/network/source/mesh/filter.h b/contrib/kafka/filters/network/source/mesh/filter.h index 384e915459d6c..a6b4ec80cdd4f 100644 --- a/contrib/kafka/filters/network/source/mesh/filter.h +++ b/contrib/kafka/filters/network/source/mesh/filter.h @@ -24,7 +24,7 @@ namespace Mesh { * Decoded request bytes are passed to processor, that calls us back with enriched request. * Request then gets invoked to starts its processing. 
* Filter is going to maintain a list of in-flight-request so it can send responses when they - *finish. + * finish. * * * +----------------+ +-----------------------+ @@ -37,15 +37,15 @@ namespace Mesh { * +-------+-------+ | * | | * | | - * +-------v-----------+ | - * |UpstreamKafkaFacade| | + * +-------v-----------+ | + * |UpstreamKafkaFacade| |(for callback when finished) * +-------+-----------+ | * | | * | | - * +-------v--------------+ +--------------v---------+ - * |<> +------->PlaceholderKafkaProducer+ - * |ThreadLocalKafkaFacade| +------------------------+ - * +----------------------+ + * +-------v--------------+ +--------------v--+ +-----------------+ + * |<> +------->RichKafkaProducer+--->><> | + * |ThreadLocalKafkaFacade| +-----------------+ |RdKafka::Producer| + * +----------------------+ +-----------------+ **/ class KafkaMeshFilter : public Network::ReadFilter, public Network::ConnectionCallbacks, @@ -80,10 +80,8 @@ class KafkaMeshFilter : public Network::ReadFilter, private: // Helper method invoked when connection gets dropped. - // Request references are going to be stored in 2 places: this filter (request's origin) and in - // UpstreamKafkaClient instances (to match pure-Kafka confirmations to the requests). Because - // filter can be destroyed before confirmations from Kafka are received, we are just going to mark - // related requests as abandoned, so they do not attempt to reference this filter anymore. + // Because filter can be destroyed before confirmations from Kafka are received, we are just going + // to mark related requests as abandoned, so they do not attempt to reference this filter anymore. // Impl note: this is similar to what Redis filter does. 
void abandonAllInFlightRequests(); diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h b/contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h index cd7bf1236d0a5..24e9b36efdc65 100644 --- a/contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h +++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h @@ -51,9 +51,18 @@ class KafkaProducer { public: virtual ~KafkaProducer() = default; - // Sends given record (key, value) to Kafka (topic, partition). - // When delivery is finished, it notifies the callback provided with corresponding delivery data - // (error code, offset). + /* + * Sends given record (key, value) to Kafka (topic, partition). + * When delivery is finished, it notifies the callback provided with corresponding delivery data + * (error code, offset). + * + * @param origin origin of payload to be notified when delivery finishes. + * @param topic Kafka topic. + * @param partition Kafka partition (as clients do partitioning, we just reuse what downstream + * gave us). + * @param key Kafka message key. + * @param value Kafka message value. + */ virtual void send(const ProduceFinishCbSharedPtr origin, const std::string& topic, const int32_t partition, const absl::string_view key, const absl::string_view value) PURE; diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.cc b/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.cc index 7cba96352cc35..fd43b61a2cf2b 100644 --- a/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.cc +++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.cc @@ -6,16 +6,144 @@ namespace NetworkFilters { namespace Kafka { namespace Mesh { -// Just a placeholder implementation. 
+class LibRdKafkaUtilsImpl : public LibRdKafkaUtils {
 
-PlaceholderKafkaProducer::PlaceholderKafkaProducer(Event::Dispatcher&, Thread::ThreadFactory&,
-                                                   const RawKafkaProducerConfig&){};
+  // LibRdKafkaUtils
+  RdKafka::Conf::ConfResult setConfProperty(RdKafka::Conf& conf, const std::string& name,
+                                            const std::string& value,
+                                            std::string& errstr) const override {
+    return conf.set(name, value, errstr);
+  }
 
-void PlaceholderKafkaProducer::send(const ProduceFinishCbSharedPtr, const std::string&,
-                                    const int32_t, const absl::string_view,
-                                    const absl::string_view){};
+  // LibRdKafkaUtils
+  RdKafka::Conf::ConfResult setConfDeliveryCallback(RdKafka::Conf& conf,
+                                                    RdKafka::DeliveryReportCb* dr_cb,
+                                                    std::string& errstr) const override {
+    return conf.set("dr_cb", dr_cb, errstr);
+  }
 
-void PlaceholderKafkaProducer::markFinished(){};
+  // LibRdKafkaUtils
+  std::unique_ptr<RdKafka::Producer> createProducer(RdKafka::Conf* conf,
+                                                    std::string& errstr) const override {
+    return std::unique_ptr<RdKafka::Producer>(RdKafka::Producer::create(conf, errstr));
+  }
+};
+
+RichKafkaProducer::RichKafkaProducer(Event::Dispatcher& dispatcher,
+                                     Thread::ThreadFactory& thread_factory,
+                                     const RawKafkaProducerConfig& configuration)
+    : RichKafkaProducer(dispatcher, thread_factory, configuration, LibRdKafkaUtilsImpl{}){};
+
+RichKafkaProducer::RichKafkaProducer(Event::Dispatcher& dispatcher,
+                                     Thread::ThreadFactory& thread_factory,
+                                     const RawKafkaProducerConfig& configuration,
+                                     const LibRdKafkaUtils& utils)
+    : dispatcher_{dispatcher} {
+
+  // Create producer configuration object.
+  std::unique_ptr<RdKafka::Conf> conf =
+      std::unique_ptr<RdKafka::Conf>(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
+  std::string errstr;
+
+  // Setup producer custom properties.
+  for (const auto& e : configuration) {
+    if (utils.setConfProperty(*conf, e.first, e.second, errstr) != RdKafka::Conf::CONF_OK) {
+      throw EnvoyException(absl::StrCat("Could not set producer property [", e.first, "] to [",
+                                        e.second, "]:", errstr));
+    }
+  }
+
+  // Setup callback (this callback is going to be invoked in dedicated monitoring thread).
+  if (utils.setConfDeliveryCallback(*conf, this, errstr) != RdKafka::Conf::CONF_OK) {
+    throw EnvoyException(absl::StrCat("Could not set producer callback:", errstr));
+  }
+
+  // Finally, we create the producer.
+  producer_ = utils.createProducer(conf.get(), errstr);
+  if (!producer_) {
+    throw EnvoyException(absl::StrCat("Could not create producer:", errstr));
+  }
+
+  // Start the monitoring thread.
+  poller_thread_active_ = true;
+  std::function<void()> thread_routine = [this]() -> void { checkDeliveryReports(); };
+  poller_thread_ = thread_factory.createThread(thread_routine);
+}
+
+RichKafkaProducer::~RichKafkaProducer() {
+  ENVOY_LOG(debug, "Shutting down worker thread");
+  poller_thread_active_ = false; // This should never be needed, as we call 'markFinished' earlier.
+  poller_thread_->join();
+  ENVOY_LOG(debug, "Worker thread shut down successfully");
+}
+
+void RichKafkaProducer::markFinished() { poller_thread_active_ = false; }
+
+void RichKafkaProducer::send(const ProduceFinishCbSharedPtr origin, const std::string& topic,
+                             const int32_t partition, const absl::string_view key,
+                             const absl::string_view value) {
+  {
+    void* value_data = const_cast<char*>(value.data()); // Needed for Kafka API.
+    // Data is a pointer into request internals, and it is going to be managed by
+    // ProduceRequestHolder lifecycle. So we are not going to use any of librdkafka's memory
+    // management.
+    const int flags = 0;
+    const RdKafka::ErrorCode ec = producer_->produce(
+        topic, partition, flags, value_data, value.size(), key.data(), key.size(), 0, nullptr);
+    if (RdKafka::ERR_NO_ERROR == ec) {
+      // We have succeeded with submitting data to producer, so we register a callback.
+      unfinished_produce_requests_.push_back(origin);
+    } else {
+      // We could not submit data to producer.
+      // Let's treat that as a normal failure (Envoy is a broker after all) and propagate
+      // downstream.
+      ENVOY_LOG(trace, "Produce failure: {}, while sending to [{}/{}]", ec, topic, partition);
+      const DeliveryMemento memento = {value_data, ec, 0};
+      origin->accept(memento);
+    }
+  }
+}
+
+void RichKafkaProducer::checkDeliveryReports() {
+  while (poller_thread_active_) {
+    // We are going to wait for 1000ms, returning when an event (message delivery) happens or
+    // producer is closed. Unfortunately we do not have any ability to interrupt this call, so every
+    // destructor is going to take up to this much time.
+    producer_->poll(1000);
+    // This invokes the callback below, if any delivery finished (successful or not).
+  }
+  ENVOY_LOG(debug, "Poller thread finished");
+}
+
+// Kafka callback that contains the delivery information.
+void RichKafkaProducer::dr_cb(RdKafka::Message& message) {
+  ENVOY_LOG(trace, "Delivery finished: {}, payload has been saved at offset {} in {}/{}",
+            message.err(), message.offset(), message.topic_name(), message.partition());
+  const DeliveryMemento memento = {message.payload(), message.err(), message.offset()};
+  // Because this method gets executed in poller thread, we need to pass the data through
+  // dispatcher.
+  const Event::PostCb callback = [this, memento]() -> void { processDelivery(memento); };
+  dispatcher_.post(callback);
+}
+
+// We got the delivery data.
+// Now we just check all unfinished requests, find the one that originated this particular delivery,
+// and notify it.
+void RichKafkaProducer::processDelivery(const DeliveryMemento& memento) {
+  for (auto it = unfinished_produce_requests_.begin(); it != unfinished_produce_requests_.end();) {
+    bool accepted = (*it)->accept(memento);
+    if (accepted) {
+      unfinished_produce_requests_.erase(it);
+      break; // This is important - a single request can be mapped into multiple callbacks here.
+    } else {
+      ++it;
+    }
+  }
+}
+
+std::list<ProduceFinishCbSharedPtr>& RichKafkaProducer::getUnfinishedRequestsForTest() {
+  return unfinished_produce_requests_;
+}
 
 } // namespace Mesh
 } // namespace Kafka
diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h b/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h
index 58ca39b9a3c7b..82b3e549c2be3 100644
--- a/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h
+++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h
@@ -1,8 +1,11 @@
 #pragma once
 
+#include <list>
+
 #include "envoy/event/dispatcher.h"
 
 #include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client.h"
+#include "librdkafka/rdkafkacpp.h"
 
 namespace Envoy {
 namespace Extensions {
@@ -10,25 +13,90 @@ namespace NetworkFilters {
 namespace Kafka {
 namespace Mesh {
 
+/**
+ * Helper class responsible for creating librdkafka entities, so we can have mocks in tests.
+ */
+class LibRdKafkaUtils {
+public:
+  virtual ~LibRdKafkaUtils() = default;
+
+  virtual RdKafka::Conf::ConfResult setConfProperty(RdKafka::Conf& conf, const std::string& name,
+                                                    const std::string& value,
+                                                    std::string& errstr) const PURE;
+
+  virtual RdKafka::Conf::ConfResult setConfDeliveryCallback(RdKafka::Conf& conf,
+                                                            RdKafka::DeliveryReportCb* dr_cb,
+                                                            std::string& errstr) const PURE;
+
+  virtual std::unique_ptr<RdKafka::Producer> createProducer(RdKafka::Conf* conf,
+                                                            std::string& errstr) const PURE;
+};
+
 using RawKafkaProducerConfig = std::map<std::string, std::string>;
 
-// Placeholder for proper Kafka Producer object.
-// It will also keep a reference to a dedicated thread (that's why we need a factory) that's going
-// to be polling for delivery notifications.
-class PlaceholderKafkaProducer : public KafkaProducer {
+/**
+ * Combines the librdkafka producer and its dedicated monitoring thread.
+ * Producer is used to schedule messages to be sent to Kafka.
+ * Independently running monitoring thread picks up delivery confirmations from producer and uses
+ * Dispatcher to notify itself about delivery in worker thread.
+ */
+class RichKafkaProducer : public KafkaProducer,
+                          public RdKafka::DeliveryReportCb,
+                          private Logger::Loggable<Logger::Id::kafka> {
 public:
-  PlaceholderKafkaProducer(Event::Dispatcher& dispatcher, Thread::ThreadFactory& thread_factory,
-                           const RawKafkaProducerConfig& configuration);
+  // Main constructor.
+  RichKafkaProducer(Event::Dispatcher& dispatcher, Thread::ThreadFactory& thread_factory,
+                    const RawKafkaProducerConfig& configuration);
+
+  // Visible for testing (allows injection of LibRdKafkaUtils).
+  RichKafkaProducer(Event::Dispatcher& dispatcher, Thread::ThreadFactory& thread_factory,
+                    const RawKafkaProducerConfig& configuration, const LibRdKafkaUtils& utils);
+
+  // More complex than usual.
+  // Marks that monitoring thread should finish and waits for it to join.
+  ~RichKafkaProducer() override;
+
+  // KafkaProducer
+  void markFinished() override;
 
   // KafkaProducer
   void send(const ProduceFinishCbSharedPtr origin, const std::string& topic,
             const int32_t partition, const absl::string_view key,
             const absl::string_view value) override;
 
-  // KafkaProducer
-  void markFinished() override;
+  // This method gets executed by monitoring thread.
+  // Does not finish until this object gets 'markFinished' invoked or gets destroyed.
+  // Executed in dedicated monitoring thread.
+  void checkDeliveryReports();
+
+  // RdKafka::DeliveryReportCb
+  void dr_cb(RdKafka::Message& message) override;
+
+  // Processes the delivery confirmation.
+  // Executed in Envoy worker thread.
+  void processDelivery(const DeliveryMemento& memento);
+
+  std::list<ProduceFinishCbSharedPtr>& getUnfinishedRequestsForTest();
+
+private:
+  Event::Dispatcher& dispatcher_;
+
+  std::list<ProduceFinishCbSharedPtr> unfinished_produce_requests_;
+
+  // Real Kafka producer (thread-safe).
+  // Invoked by Envoy handler thread (to produce), and internal monitoring thread
+  // (to poll for delivery events).
+  std::unique_ptr<RdKafka::Producer> producer_;
+
+  // Flag controlling monitoring thread's execution.
+  std::atomic<bool> poller_thread_active_;
+
+  // Monitoring thread that's responsible for continuously polling for new Kafka producer events.
+  Thread::ThreadPtr poller_thread_;
 };
 
+using RichKafkaProducerPtr = std::unique_ptr<RichKafkaProducer>;
+
 } // namespace Mesh
 } // namespace Kafka
 } // namespace NetworkFilters
diff --git a/contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.cc b/contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.cc
index ac04837347fe1..6d096a0c95255 100644
--- a/contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.cc
+++ b/contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.cc
@@ -61,7 +61,7 @@ KafkaProducer& ThreadLocalKafkaFacade::getProducerForTopic(const std::string& to
 
 KafkaProducer& ThreadLocalKafkaFacade::registerNewProducer(const ClusterConfig& cluster_config) {
   ENVOY_LOG(debug, "Registering new Kafka producer for cluster [{}]", cluster_config.name_);
-  KafkaProducerPtr new_producer = std::make_unique<PlaceholderKafkaProducer>(
+  KafkaProducerPtr new_producer = std::make_unique<RichKafkaProducer>(
       dispatcher_, thread_factory_, cluster_config.upstream_producer_properties_);
   auto result = cluster_to_kafka_client_.emplace(cluster_config.name_, std::move(new_producer));
   return *(result.first->second);
@@ -84,7 +84,7 @@ UpstreamKafkaFacadeImpl::UpstreamKafkaFacadeImpl(const UpstreamKafkaConfiguratio
   tls_->set(cb);
 }
 
-// Return Producer instance that is local to given thread, via ThreadLocalKafkaFacade.
+// Return KafkaProducer instance that is local to given thread, via ThreadLocalKafkaFacade.
KafkaProducer& UpstreamKafkaFacadeImpl::getProducerForTopic(const std::string& topic) { return tls_->getTyped().getProducerForTopic(topic); } diff --git a/contrib/kafka/filters/network/test/mesh/BUILD b/contrib/kafka/filters/network/test/mesh/BUILD index e384302637063..acff686d9e163 100644 --- a/contrib/kafka/filters/network/test/mesh/BUILD +++ b/contrib/kafka/filters/network/test/mesh/BUILD @@ -1,8 +1,13 @@ load( "//bazel:envoy_build_system.bzl", "envoy_cc_test", + "envoy_cc_test_library", "envoy_contrib_package", ) +load( + "//bazel:envoy_internal.bzl", + "envoy_external_dep_path", +) licenses(["notice"]) # Apache 2 @@ -27,6 +32,15 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "abstract_command_unit_test", + srcs = ["abstract_command_unit_test.cc"], + tags = ["skip_on_windows"], + deps = [ + "//contrib/kafka/filters/network/source/mesh:abstract_command_lib", + ], +) + envoy_cc_test( name = "upstream_kafka_facade_unit_test", srcs = ["upstream_kafka_facade_unit_test.cc"], @@ -39,10 +53,23 @@ envoy_cc_test( ) envoy_cc_test( - name = "abstract_command_unit_test", - srcs = ["abstract_command_unit_test.cc"], + name = "upstream_kafka_client_impl_unit_test", + srcs = ["upstream_kafka_client_impl_unit_test.cc"], tags = ["skip_on_windows"], deps = [ - "//contrib/kafka/filters/network/source/mesh:abstract_command_lib", + ":kafka_mocks_lib", + "//contrib/kafka/filters/network/source/mesh:upstream_kafka_client_impl_lib", + "//test/mocks/event:event_mocks", + "//test/test_common:thread_factory_for_test_lib", + ], +) + +envoy_cc_test_library( + name = "kafka_mocks_lib", + srcs = [], + hdrs = ["kafka_mocks.h"], + tags = ["skip_on_windows"], + deps = [ + envoy_external_dep_path("librdkafka"), ], ) diff --git a/contrib/kafka/filters/network/test/mesh/kafka_mocks.h b/contrib/kafka/filters/network/test/mesh/kafka_mocks.h new file mode 100644 index 0000000000000..ba8996118b0f4 --- /dev/null +++ b/contrib/kafka/filters/network/test/mesh/kafka_mocks.h @@ -0,0 +1,98 @@ +#pragma once + 
+#include "gmock/gmock.h"
+#include "librdkafka/rdkafkacpp.h"
+
+namespace Envoy {
+namespace Extensions {
+namespace NetworkFilters {
+namespace Kafka {
+namespace Mesh {
+
+class MockKafkaProducer : public RdKafka::Producer {
+public:
+  // Producer API.
+  MOCK_METHOD(RdKafka::ErrorCode, produce,
+              (RdKafka::Topic*, int32_t, int, void*, size_t, const std::string*, void*), ());
+  MOCK_METHOD(RdKafka::ErrorCode, produce,
+              (RdKafka::Topic*, int32_t, int, void*, size_t, const void*, size_t, void*), ());
+  MOCK_METHOD(RdKafka::ErrorCode, produce,
+              (const std::string, int32_t, int, void*, size_t, const void*, size_t, int64_t, void*),
+              ());
+  MOCK_METHOD(RdKafka::ErrorCode, produce,
+              (const std::string, int32_t, int, void*, size_t, const void*, size_t, int64_t,
+               RdKafka::Headers*, void*),
+              ());
+  MOCK_METHOD(RdKafka::ErrorCode, produce,
+              (RdKafka::Topic*, int32_t, const std::vector<char>*, const std::vector<char>*, void*),
+              ());
+  MOCK_METHOD(RdKafka::ErrorCode, flush, (int), ());
+  MOCK_METHOD(RdKafka::ErrorCode, purge, (int), ());
+  MOCK_METHOD(RdKafka::Error*, init_transactions, (int), ());
+  MOCK_METHOD(RdKafka::Error*, begin_transaction, (), ());
+  MOCK_METHOD(RdKafka::Error*, send_offsets_to_transaction,
+              (const std::vector<RdKafka::TopicPartition*>&, const RdKafka::ConsumerGroupMetadata*,
+               int),
+              ());
+  MOCK_METHOD(RdKafka::Error*, commit_transaction, (int), ());
+  MOCK_METHOD(RdKafka::Error*, abort_transaction, (int), ());
+
+  // Handle API (unused by us).
+  MOCK_METHOD(const std::string, name, (), (const));
+  MOCK_METHOD(const std::string, memberid, (), (const));
+  MOCK_METHOD(int, poll, (int), ());
+  MOCK_METHOD(int, outq_len, (), ());
+  MOCK_METHOD(RdKafka::ErrorCode, metadata,
+              (bool, const RdKafka::Topic*, RdKafka::Metadata**, int timeout_ms), ());
+  MOCK_METHOD(RdKafka::ErrorCode, pause, (std::vector<RdKafka::TopicPartition*>&), ());
+  MOCK_METHOD(RdKafka::ErrorCode, resume, (std::vector<RdKafka::TopicPartition*>&), ());
+  MOCK_METHOD(RdKafka::ErrorCode, query_watermark_offsets,
+              (const std::string&, int32_t, int64_t*, int64_t*, int), ());
+  MOCK_METHOD(RdKafka::ErrorCode, get_watermark_offsets,
+              (const std::string&, int32_t, int64_t*, int64_t*), ());
+  MOCK_METHOD(RdKafka::ErrorCode, offsetsForTimes,
+              (std::vector<RdKafka::TopicPartition*>&, int), ());
+  MOCK_METHOD(RdKafka::Queue*, get_partition_queue, (const RdKafka::TopicPartition*), ());
+  MOCK_METHOD(RdKafka::ErrorCode, set_log_queue, (RdKafka::Queue*), ());
+  MOCK_METHOD(void, yield, (), ());
+  MOCK_METHOD(const std::string, clusterid, (int), ());
+  MOCK_METHOD(struct rd_kafka_s*, c_ptr, (), ());
+  MOCK_METHOD(int32_t, controllerid, (int), ());
+  MOCK_METHOD(RdKafka::ErrorCode, fatal_error, (std::string&), (const));
+  MOCK_METHOD(RdKafka::ErrorCode, oauthbearer_set_token,
+              (const std::string&, int64_t, const std::string&, const std::list<std::string>&,
+               std::string&),
+              ());
+  MOCK_METHOD(RdKafka::ErrorCode, oauthbearer_set_token_failure, (const std::string&), ());
+  MOCK_METHOD(void*, mem_malloc, (size_t), ());
+  MOCK_METHOD(void, mem_free, (void*), ());
+};
+
+class MockKafkaMessage : public RdKafka::Message {
+public:
+  MOCK_METHOD(std::string, errstr, (), (const));
+  MOCK_METHOD(RdKafka::ErrorCode, err, (), (const));
+  MOCK_METHOD(RdKafka::Topic*, topic, (), (const));
+  MOCK_METHOD(std::string, topic_name, (), (const));
+  MOCK_METHOD(int32_t, partition, (), (const));
+  MOCK_METHOD(void*, payload, (), (const));
+  MOCK_METHOD(size_t, len, (), (const));
+  MOCK_METHOD(const std::string*, key, (), (const));
+  MOCK_METHOD(const void*, key_pointer, (), (const));
+  MOCK_METHOD(size_t, key_len, (), (const));
+  MOCK_METHOD(int64_t, offset, (), (const));
+  MOCK_METHOD(RdKafka::MessageTimestamp, timestamp, (), (const));
+  MOCK_METHOD(void*, msg_opaque, (), (const));
+  MOCK_METHOD(int64_t, latency, (), (const));
+  MOCK_METHOD(struct rd_kafka_message_s*, c_ptr, ());
+  MOCK_METHOD(RdKafka::Message::Status, status, (), (const));
+  MOCK_METHOD(RdKafka::Headers*, headers, ());
+  MOCK_METHOD(RdKafka::Headers*, headers, (RdKafka::ErrorCode*));
+  MOCK_METHOD(int32_t, broker_id, (), (const));
+};
+
+} // namespace Mesh
+} // namespace Kafka
+} // namespace NetworkFilters
+} // namespace Extensions
+} // namespace Envoy
diff --git a/contrib/kafka/filters/network/test/mesh/upstream_kafka_client_impl_unit_test.cc b/contrib/kafka/filters/network/test/mesh/upstream_kafka_client_impl_unit_test.cc
new file mode 100644
index 0000000000000..c40ebd8589bc8
--- /dev/null
+++ b/contrib/kafka/filters/network/test/mesh/upstream_kafka_client_impl_unit_test.cc
@@ -0,0 +1,211 @@
+#include "test/mocks/event/mocks.h"
+#include "test/test_common/thread_factory_for_test.h"
+
+#include "absl/synchronization/blocking_counter.h"
+#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_client_impl.h"
+#include "contrib/kafka/filters/network/test/mesh/kafka_mocks.h"
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+using testing::_;
+using testing::AnyNumber;
+using testing::AtLeast;
+using testing::Return;
+using testing::ReturnNull;
+
+namespace Envoy {
+namespace Extensions {
+namespace NetworkFilters {
+namespace Kafka {
+namespace Mesh {
+
+class MockLibRdKafkaUtils : public LibRdKafkaUtils {
+public:
+  MOCK_METHOD(RdKafka::Conf::ConfResult, setConfProperty,
+              (RdKafka::Conf&, const std::string&, const std::string&, std::string&), (const));
+  MOCK_METHOD(RdKafka::Conf::ConfResult, setConfDeliveryCallback,
+              (RdKafka::Conf&, RdKafka::DeliveryReportCb*, std::string&), (const));
+  MOCK_METHOD((std::unique_ptr<RdKafka::Producer>), createProducer,
+              (RdKafka::Conf*, std::string& errstr), (const));
+};
+
+class MockProduceFinishCb : public ProduceFinishCb {
+public:
+  MOCK_METHOD(bool, accept, (const DeliveryMemento&));
+};
+
+class UpstreamKafkaClientTest : public testing::Test {
+protected:
+  Event::MockDispatcher dispatcher_;
+  Thread::ThreadFactory& thread_factory_ = Thread::threadFactoryForTest();
+  MockLibRdKafkaUtils kafka_utils_;
+  RawKafkaProducerConfig config_ = {{"key1", "value1"}, {"key2", "value2"}};
+
+  std::unique_ptr<MockKafkaProducer> producer_ptr = std::make_unique<MockKafkaProducer>();
+  MockKafkaProducer& producer = *producer_ptr;
+
+  std::shared_ptr<MockProduceFinishCb> origin_ = std::make_shared<MockProduceFinishCb>();
+
+  // Helper method - allows creation of RichKafkaProducer without problems.
+  void setupConstructorExpectations() {
+    EXPECT_CALL(kafka_utils_, setConfProperty(_, "key1", "value1", _))
+        .WillOnce(Return(RdKafka::Conf::CONF_OK));
+    EXPECT_CALL(kafka_utils_, setConfProperty(_, "key2", "value2", _))
+        .WillOnce(Return(RdKafka::Conf::CONF_OK));
+    EXPECT_CALL(kafka_utils_, setConfDeliveryCallback(_, _, _))
+        .WillOnce(Return(RdKafka::Conf::CONF_OK));
+
+    EXPECT_CALL(producer, poll(_)).Times(AnyNumber());
+    EXPECT_CALL(kafka_utils_, createProducer(_, _))
+        .WillOnce(Return(testing::ByMove(std::move(producer_ptr))));
+  }
+};
+
+TEST_F(UpstreamKafkaClientTest, ShouldConstructWithoutProblems) {
+  // given
+  setupConstructorExpectations();
+
+  // when, then - producer got created without problems.
+  RichKafkaProducer testee = {dispatcher_, thread_factory_, config_, kafka_utils_};
+}
+
+TEST_F(UpstreamKafkaClientTest, ShouldSendRecordsAndReceiveConfirmations) {
+  // given
+  setupConstructorExpectations();
+  RichKafkaProducer testee = {dispatcher_, thread_factory_, config_, kafka_utils_};
+
+  // when, then - should send request without problems.
+  EXPECT_CALL(producer, produce("t1", 13, 0, _, _, _, _, _, _))
+      .Times(3)
+      .WillRepeatedly(Return(RdKafka::ERR_NO_ERROR));
+  const std::vector<std::string> payloads = {"value1", "value2", "value3"};
+  for (const auto& arg : payloads) {
+    testee.send(origin_, "t1", 13, "KEY", arg);
+  }
+  EXPECT_EQ(testee.getUnfinishedRequestsForTest().size(), payloads.size());
+
+  // when, then - should process confirmations.
+  EXPECT_CALL(*origin_, accept(_)).Times(3).WillRepeatedly(Return(true));
+  for (const auto& arg : payloads) {
+    const DeliveryMemento memento = {arg.c_str(), RdKafka::ERR_NO_ERROR, 0};
+    testee.processDelivery(memento);
+  }
+  EXPECT_EQ(testee.getUnfinishedRequestsForTest().size(), 0);
+}
+
+TEST_F(UpstreamKafkaClientTest, ShouldCheckCallbacksForDeliveries) {
+  // given
+  setupConstructorExpectations();
+  RichKafkaProducer testee = {dispatcher_, thread_factory_, config_, kafka_utils_};
+
+  // when, then - should send request without problems.
+  EXPECT_CALL(producer, produce("t1", 13, 0, _, _, _, _, _, _))
+      .Times(2)
+      .WillRepeatedly(Return(RdKafka::ERR_NO_ERROR));
+  const std::vector<std::string> payloads = {"value1", "value2"};
+  auto origin1 = std::make_shared<MockProduceFinishCb>();
+  auto origin2 = std::make_shared<MockProduceFinishCb>();
+  testee.send(origin1, "t1", 13, "KEY", payloads[0]);
+  testee.send(origin2, "t1", 13, "KEY", payloads[1]);
+  EXPECT_EQ(testee.getUnfinishedRequestsForTest().size(), payloads.size());
+
+  // when, then - should process confirmations (notice we pass second memento first).
+  EXPECT_CALL(*origin1, accept(_)).WillOnce(Return(false)).WillOnce(Return(true));
+  EXPECT_CALL(*origin2, accept(_)).WillOnce(Return(true));
+  const DeliveryMemento memento1 = {payloads[1].c_str(), RdKafka::ERR_NO_ERROR, 0};
+  testee.processDelivery(memento1);
+  EXPECT_EQ(testee.getUnfinishedRequestsForTest().size(), 1);
+  const DeliveryMemento memento2 = {payloads[0].c_str(), RdKafka::ERR_NO_ERROR, 0};
+  testee.processDelivery(memento2);
+  EXPECT_EQ(testee.getUnfinishedRequestsForTest().size(), 0);
+}
+
+TEST_F(UpstreamKafkaClientTest, ShouldHandleProduceFailures) {
+  // given
+  setupConstructorExpectations();
+  RichKafkaProducer testee = {dispatcher_, thread_factory_, config_, kafka_utils_};
+
+  // when, then - if there are problems while sending, notify the source immediately.
+  EXPECT_CALL(producer, produce("t1", 42, 0, _, _, _, _, _, _))
+      .WillOnce(Return(RdKafka::ERR_LEADER_NOT_AVAILABLE));
+  EXPECT_CALL(*origin_, accept(_)).WillOnce(Return(true));
+  testee.send(origin_, "t1", 42, "KEY", "VALUE");
+  EXPECT_EQ(testee.getUnfinishedRequestsForTest().size(), 0);
+}
+
+TEST_F(UpstreamKafkaClientTest, ShouldHandleKafkaCallback) {
+  // given
+  setupConstructorExpectations();
+  RichKafkaProducer testee = {dispatcher_, thread_factory_, config_, kafka_utils_};
+  testing::NiceMock<MockKafkaMessage> message;
+
+  // when, then - notification is passed to dispatcher.
+  EXPECT_CALL(dispatcher_, post(_));
+  testee.dr_cb(message);
+}
+
+// This handles situations when users pass bad config to raw producer.
+TEST_F(UpstreamKafkaClientTest, ShouldThrowIfSettingPropertiesFails) {
+  // given
+  EXPECT_CALL(kafka_utils_, setConfProperty(_, _, _, _))
+      .WillOnce(Return(RdKafka::Conf::CONF_INVALID));
+
+  // when, then - exception gets thrown during construction.
+  EXPECT_THROW(RichKafkaProducer(dispatcher_, thread_factory_, config_, kafka_utils_),
+               EnvoyException);
+}
+
+TEST_F(UpstreamKafkaClientTest, ShouldThrowIfSettingDeliveryCallbackFails) {
+  // given
+  EXPECT_CALL(kafka_utils_, setConfProperty(_, _, _, _))
+      .WillRepeatedly(Return(RdKafka::Conf::CONF_OK));
+  EXPECT_CALL(kafka_utils_, setConfDeliveryCallback(_, _, _))
+      .WillOnce(Return(RdKafka::Conf::CONF_INVALID));
+
+  // when, then - exception gets thrown during construction.
+  EXPECT_THROW(RichKafkaProducer(dispatcher_, thread_factory_, config_, kafka_utils_),
+               EnvoyException);
+}
+
+TEST_F(UpstreamKafkaClientTest, ShouldThrowIfRawProducerConstructionFails) {
+  // given
+  EXPECT_CALL(kafka_utils_, setConfProperty(_, _, _, _))
+      .WillRepeatedly(Return(RdKafka::Conf::CONF_OK));
+  EXPECT_CALL(kafka_utils_, setConfDeliveryCallback(_, _, _))
+      .WillOnce(Return(RdKafka::Conf::CONF_OK));
+  EXPECT_CALL(kafka_utils_, createProducer(_, _)).WillOnce(ReturnNull());
+
+  // when, then - exception gets thrown during construction.
+  EXPECT_THROW(RichKafkaProducer(dispatcher_, thread_factory_, config_, kafka_utils_),
+               EnvoyException);
+}
+
+// Rich producer's constructor starts a monitoring thread.
+// We are going to wait for at least one invocation of producer 'poll', so we are confident that it
+// does monitoring. Then we are going to destroy the testee, and expect the thread to finish.
+TEST_F(UpstreamKafkaClientTest, ShouldPollProducerForEventsUntilShutdown) {
+  // given
+  setupConstructorExpectations();
+
+  absl::BlockingCounter counter{1};
+  EXPECT_CALL(producer, poll(_)).Times(AtLeast(1)).WillOnce([&counter]() {
+    counter.DecrementCount();
+    return 0;
+  });
+
+  // when
+  {
+    std::unique_ptr<RichKafkaProducer> testee =
+        std::make_unique<RichKafkaProducer>(dispatcher_, thread_factory_, config_, kafka_utils_);
+    counter.Wait();
+  }
+
+  // then - the above block actually finished, what means that the monitoring thread interacted with
+  // underlying Kafka producer.
+} + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index c8cd2cff02645..d2cde24524ce4 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -766,6 +766,7 @@ lexically libc libevent libprotobuf +librdkafka libtool libstdc lifecycle