diff --git a/api/BUILD b/api/BUILD index 5bbde32946b63..93f9184a2b400 100644 --- a/api/BUILD +++ b/api/BUILD @@ -60,6 +60,7 @@ proto_library( "//contrib/envoy/extensions/filters/http/squash/v3:pkg", "//contrib/envoy/extensions/filters/http/sxg/v3alpha:pkg", "//contrib/envoy/extensions/filters/network/kafka_broker/v3:pkg", + "//contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha:pkg", "//contrib/envoy/extensions/filters/network/mysql_proxy/v3:pkg", "//contrib/envoy/extensions/filters/network/postgres_proxy/v3alpha:pkg", "//contrib/envoy/extensions/filters/network/rocketmq_proxy/v3:pkg", diff --git a/api/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/BUILD b/api/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/BUILD new file mode 100644 index 0000000000000..ee92fb652582e --- /dev/null +++ b/api/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/BUILD @@ -0,0 +1,9 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"], +) diff --git a/api/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto b/api/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto new file mode 100644 index 0000000000000..03a6522852ab5 --- /dev/null +++ b/api/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto @@ -0,0 +1,58 @@ +syntax = "proto3"; + +package envoy.extensions.filters.network.kafka_mesh.v3alpha; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.filters.network.kafka_mesh.v3alpha"; +option java_outer_classname = "KafkaMeshProto"; +option java_multiple_files = true; +option (udpa.annotations.file_status).work_in_progress = true; +option 
(udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Kafka Mesh] +// Kafka Mesh :ref:`configuration overview `. +// [#extension: envoy.filters.network.kafka_mesh] + +message KafkaMesh { + // Envoy's host that's advertised to clients. + // Has the same meaning as corresponding Kafka broker properties. + // Usually equal to filter chain's listener config, but needs to be reachable by clients + // (so 0.0.0.0 will not work). + string advertised_host = 1 [(validate.rules).string = {min_len: 1}]; + + // Envoy's port that's advertised to clients. + int32 advertised_port = 2 [(validate.rules).int32 = {gt: 0}]; + + // Upstream clusters this filter will connect to. + repeated KafkaClusterDefinition upstream_clusters = 3; + + // Rules that will decide which cluster gets which request. + repeated ForwardingRule forwarding_rules = 4; +} + +message KafkaClusterDefinition { + // Cluster name. + string cluster_name = 1 [(validate.rules).string = {min_len: 1}]; + + // Kafka cluster address. + string bootstrap_servers = 2 [(validate.rules).string = {min_len: 1}]; + + // Default number of partitions present in this cluster. + // This is especially important for clients that do not specify partition in their payloads and depend on this value for hashing. + int32 partition_count = 3 [(validate.rules).int32 = {gt: 0}]; + + // Custom configuration passed to Kafka producer. + map producer_config = 4; +} + +message ForwardingRule { + // Cluster name. + string target_cluster = 1; + + oneof trigger { + // Intended place for future types of forwarding rules. 
+ string topic_prefix = 2; + } +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index 52cb8c09eaf81..61af4c4764680 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -12,6 +12,7 @@ proto_library( "//contrib/envoy/extensions/filters/http/squash/v3:pkg", "//contrib/envoy/extensions/filters/http/sxg/v3alpha:pkg", "//contrib/envoy/extensions/filters/network/kafka_broker/v3:pkg", + "//contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha:pkg", "//contrib/envoy/extensions/filters/network/mysql_proxy/v3:pkg", "//contrib/envoy/extensions/filters/network/postgres_proxy/v3alpha:pkg", "//contrib/envoy/extensions/filters/network/rocketmq_proxy/v3:pkg", diff --git a/bazel/BUILD b/bazel/BUILD index 016482a577f3e..303ab531bead3 100644 --- a/bazel/BUILD +++ b/bazel/BUILD @@ -586,3 +586,8 @@ alias( name = "remote_jdk11", actual = "@bazel_tools//tools/jdk:remote_jdk11", ) + +alias( + name = "windows", + actual = "@bazel_tools//src/conditions:windows", +) diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index b817e6efffab7..75fa3544bcf83 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -935,7 +935,7 @@ REPOSITORY_LOCATIONS_SPEC = dict( strip_prefix = "kafka-{version}/clients/src/main/resources/common/message", urls = ["https://github.com/apache/kafka/archive/{version}.zip"], use_category = ["dataplane_ext"], - extensions = ["envoy.filters.network.kafka_broker"], + extensions = ["envoy.filters.network.kafka_broker", "envoy.filters.network.kafka_mesh"], release_date = "2020-03-03", cpe = "cpe:2.3:a:apache:kafka:*", ), @@ -948,7 +948,7 @@ REPOSITORY_LOCATIONS_SPEC = dict( strip_prefix = "librdkafka-{version}", urls = ["https://github.com/edenhill/librdkafka/archive/v{version}.tar.gz"], use_category = ["dataplane_ext"], - extensions = ["envoy.filters.network.kafka_broker"], + extensions = ["envoy.filters.network.kafka_mesh"], release_date = "2021-05-10", cpe = "N/A", ), diff --git 
a/contrib/contrib_build_config.bzl b/contrib/contrib_build_config.bzl index 34ef00af9fd15..f27001d971be8 100644 --- a/contrib/contrib_build_config.bzl +++ b/contrib/contrib_build_config.bzl @@ -12,6 +12,7 @@ CONTRIB_EXTENSIONS = { # "envoy.filters.network.kafka_broker": "//contrib/kafka/filters/network/source:kafka_broker_config_lib", + "envoy.filters.network.kafka_mesh": "//contrib/kafka/filters/network/source/mesh:config_lib", "envoy.filters.network.mysql_proxy": "//contrib/mysql_proxy/filters/network/source:config", "envoy.filters.network.postgres_proxy": "//contrib/postgres_proxy/filters/network/source:config", "envoy.filters.network.rocketmq_proxy": "//contrib/rocketmq_proxy/filters/network/source:config", diff --git a/contrib/extensions_metadata.yaml b/contrib/extensions_metadata.yaml index c3ccc61e53ee1..8614d2dbddb83 100644 --- a/contrib/extensions_metadata.yaml +++ b/contrib/extensions_metadata.yaml @@ -13,6 +13,11 @@ envoy.filters.network.kafka_broker: - envoy.filters.network security_posture: requires_trusted_downstream_and_upstream status: wip +envoy.filters.network.kafka_mesh: + categories: + - envoy.filters.network + security_posture: requires_trusted_downstream_and_upstream + status: wip envoy.filters.network.rocketmq_proxy: categories: - envoy.filters.network diff --git a/contrib/kafka/filters/network/source/mesh/BUILD b/contrib/kafka/filters/network/source/mesh/BUILD index fe24168a884b0..f457afee713ea 100644 --- a/contrib/kafka/filters/network/source/mesh/BUILD +++ b/contrib/kafka/filters/network/source/mesh/BUILD @@ -1,5 +1,6 @@ load( "//bazel:envoy_build_system.bzl", + "envoy_cc_contrib_extension", "envoy_cc_library", "envoy_contrib_package", ) @@ -10,6 +11,25 @@ licenses(["notice"]) # Apache 2 envoy_contrib_package() # Kafka-mesh network filter. 
+# Mesh filter public docs: docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst + +envoy_cc_contrib_extension( + name = "config_lib", + srcs = ["config.cc"], + hdrs = ["config.h"], + deps = [ + "//envoy/registry", + "//source/extensions/filters/network/common:factory_base_lib", + "@envoy_api//contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha:pkg_cc_proto", + ] + select({ + "//bazel:windows": [], + "//conditions:default": [ + ":filter_lib", + ":upstream_config_lib", + ":upstream_kafka_facade_lib", + ], + }), +) envoy_cc_library( name = "filter_lib", @@ -121,11 +141,15 @@ envoy_cc_library( envoy_cc_library( name = "upstream_config_lib", srcs = [ + "upstream_config.cc", ], hdrs = [ "upstream_config.h", ], tags = ["skip_on_windows"], deps = [ + "//source/common/common:assert_lib", + "//source/common/common:minimal_logger_lib", + "@envoy_api//contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha:pkg_cc_proto", ], ) diff --git a/contrib/kafka/filters/network/source/mesh/config.cc b/contrib/kafka/filters/network/source/mesh/config.cc new file mode 100644 index 0000000000000..7c2a1f4e2474c --- /dev/null +++ b/contrib/kafka/filters/network/source/mesh/config.cc @@ -0,0 +1,55 @@ +#include "contrib/kafka/filters/network/source/mesh/config.h" + +#include "envoy/registry/registry.h" +#include "envoy/server/filter_config.h" +#include "envoy/stats/scope.h" + +#ifndef WIN32 +#include "contrib/kafka/filters/network/source/mesh/upstream_config.h" +#include "contrib/kafka/filters/network/source/mesh/upstream_kafka_facade.h" +#include "contrib/kafka/filters/network/source/mesh/filter.h" +#else +#include "envoy/common/exception.h" +#endif + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +// The mesh filter doesn't do anything special, it just sets up the shared entities. +// Any extra configuration validation is done in UpstreamKafkaConfiguration constructor. 
+Network::FilterFactoryCb KafkaMeshConfigFactory::createFilterFactoryFromProtoTyped( + const KafkaMeshProtoConfig& config, Server::Configuration::FactoryContext& context) { + +#ifdef WIN32 + throw EnvoyException("Kafka mesh filter is not supported on Windows"); +#else + // Shared configuration (tells us where the upstream clusters are). + const UpstreamKafkaConfigurationSharedPtr configuration = + std::make_shared(config); + + // Shared upstream facade (connects us to upstream Kafka clusters). + const UpstreamKafkaFacadeSharedPtr upstream_kafka_facade = + std::make_shared(*configuration, context.threadLocal(), + context.api().threadFactory()); + + return [configuration, upstream_kafka_facade](Network::FilterManager& filter_manager) -> void { + Network::ReadFilterSharedPtr filter = + std::make_shared(*configuration, *upstream_kafka_facade); + filter_manager.addReadFilter(filter); + }; +#endif +} + +/** + * Static registration for the Kafka filter. @see RegisterFactory. + */ +REGISTER_FACTORY(KafkaMeshConfigFactory, Server::Configuration::NamedNetworkFilterConfigFactory); + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/kafka/filters/network/source/mesh/config.h b/contrib/kafka/filters/network/source/mesh/config.h new file mode 100644 index 0000000000000..12ba71691bbca --- /dev/null +++ b/contrib/kafka/filters/network/source/mesh/config.h @@ -0,0 +1,33 @@ +#pragma once + +#include "source/extensions/filters/network/common/factory_base.h" + +#include "contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.pb.h" +#include "contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.pb.validate.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +using KafkaMeshProtoConfig = envoy::extensions::filters::network::kafka_mesh::v3alpha::KafkaMesh; + +/** + * Config registration for the Kafka 
mesh filter. + */ +class KafkaMeshConfigFactory : public Common::FactoryBase { +public: + KafkaMeshConfigFactory() : FactoryBase("envoy.filters.network.kafka_mesh", true) {} + +private: + Network::FilterFactoryCb + createFilterFactoryFromProtoTyped(const KafkaMeshProtoConfig& config, + Server::Configuration::FactoryContext& context) override; +}; + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/kafka/filters/network/source/mesh/upstream_config.cc b/contrib/kafka/filters/network/source/mesh/upstream_config.cc new file mode 100644 index 0000000000000..8e6917df034c8 --- /dev/null +++ b/contrib/kafka/filters/network/source/mesh/upstream_config.cc @@ -0,0 +1,93 @@ +#include "contrib/kafka/filters/network/source/mesh/upstream_config.h" + +#include "envoy/common/exception.h" + +#include "source/common/common/assert.h" + +#include "absl/strings/str_cat.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +using KafkaClusterDefinition = + envoy::extensions::filters::network::kafka_mesh::v3alpha::KafkaClusterDefinition; +using ForwardingRule = envoy::extensions::filters::network::kafka_mesh::v3alpha::ForwardingRule; + +UpstreamKafkaConfigurationImpl::UpstreamKafkaConfigurationImpl(const KafkaMeshProtoConfig& config) + : advertised_address_{config.advertised_host(), config.advertised_port()} { + + // Processing cluster data. + const auto& upstream_clusters = config.upstream_clusters(); + if (upstream_clusters.empty()) { + throw EnvoyException("kafka-mesh filter needs to have at least one upstream Kafka cluster"); + } + + // Processing cluster configuration. + std::map cluster_name_to_cluster_config; + for (const auto& upstream_cluster_definition : upstream_clusters) { + const std::string& cluster_name = upstream_cluster_definition.cluster_name(); + + // No duplicates are allowed. 
+ if (cluster_name_to_cluster_config.find(cluster_name) != cluster_name_to_cluster_config.end()) { + throw EnvoyException( + absl::StrCat("kafka-mesh filter has multiple Kafka clusters referenced by the same name", + cluster_name)); + } + + // Upstream client configuration - use all the optional custom configs provided, and then use + // the target IPs. + std::map producer_configs = { + upstream_cluster_definition.producer_config().begin(), + upstream_cluster_definition.producer_config().end()}; + producer_configs["bootstrap.servers"] = upstream_cluster_definition.bootstrap_servers(); + ClusterConfig cluster_config = {cluster_name, upstream_cluster_definition.partition_count(), + producer_configs}; + cluster_name_to_cluster_config[cluster_name] = cluster_config; + } + + // Processing forwarding rules. + const auto& forwarding_rules = config.forwarding_rules(); + if (forwarding_rules.empty()) { + throw EnvoyException("kafka-mesh filter needs to have at least one forwarding rule"); + } + + for (const auto& rule : forwarding_rules) { + const std::string& target_cluster = rule.target_cluster(); + ASSERT(rule.trigger_case() == ForwardingRule::TriggerCase::kTopicPrefix); + ENVOY_LOG(trace, "Setting up forwarding rule: {} -> {}", rule.topic_prefix(), target_cluster); + // Each forwarding rule needs to reference a cluster. + if (cluster_name_to_cluster_config.find(target_cluster) == + cluster_name_to_cluster_config.end()) { + throw EnvoyException(absl::StrCat( + "kafka-mesh filter forwarding rule is referencing unknown upstream Kafka cluster: ", + target_cluster)); + } + topic_prefix_to_cluster_config_[rule.topic_prefix()] = + cluster_name_to_cluster_config[target_cluster]; + } +} + +absl::optional +UpstreamKafkaConfigurationImpl::computeClusterConfigForTopic(const std::string& topic) const { + // We find the first matching prefix (this is why ordering is important). 
+ for (const auto& it : topic_prefix_to_cluster_config_) { + if (topic.rfind(it.first, 0) == 0) { + const ClusterConfig cluster_config = it.second; + return absl::make_optional(cluster_config); + } + } + return absl::nullopt; +} + +std::pair UpstreamKafkaConfigurationImpl::getAdvertisedAddress() const { + return advertised_address_; +} + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/kafka/filters/network/source/mesh/upstream_config.h b/contrib/kafka/filters/network/source/mesh/upstream_config.h index 00e3e7faf32da..ad49f2f1304d6 100644 --- a/contrib/kafka/filters/network/source/mesh/upstream_config.h +++ b/contrib/kafka/filters/network/source/mesh/upstream_config.h @@ -7,7 +7,11 @@ #include "envoy/common/pure.h" +#include "source/common/common/logger.h" + #include "absl/types/optional.h" +#include "contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.pb.h" +#include "contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.pb.validate.h" namespace Envoy { namespace Extensions { @@ -15,6 +19,8 @@ namespace NetworkFilters { namespace Kafka { namespace Mesh { +using KafkaMeshProtoConfig = envoy::extensions::filters::network::kafka_mesh::v3alpha::KafkaMesh; + // Minor helper structure that contains information about upstream Kafka clusters. struct ClusterConfig { @@ -32,23 +38,52 @@ struct ClusterConfig { // This map always contains entry with key 'bootstrap.servers', as this is the only mandatory // producer property. std::map upstream_producer_properties_; + + bool operator==(const ClusterConfig& rhs) const { + return name_ == rhs.name_ && partition_count_ == rhs.partition_count_ && + upstream_producer_properties_ == rhs.upstream_producer_properties_; + } }; /** * Keeps the configuration related to upstream Kafka clusters. 
- * Impl note: current matching from topic to cluster is based on prefix matching but more complex - * rules could be added. */ class UpstreamKafkaConfiguration { public: virtual ~UpstreamKafkaConfiguration() = default; + + // Return this the host-port pair that's provided to Kafka clients. + // This value needs to follow same rules as 'advertised.address' property of Kafka broker. + virtual std::pair getAdvertisedAddress() const PURE; + + // Provides cluster for given Kafka topic, according to the rules contained within this + // configuration object. virtual absl::optional computeClusterConfigForTopic(const std::string& topic) const PURE; - virtual std::pair getAdvertisedAddress() const PURE; }; using UpstreamKafkaConfigurationSharedPtr = std::shared_ptr; +/** + * Implementation that uses only topic-prefix to figure out which Kafka cluster to use. + */ +class UpstreamKafkaConfigurationImpl : public UpstreamKafkaConfiguration, + private Logger::Loggable { +public: + UpstreamKafkaConfigurationImpl(const KafkaMeshProtoConfig& config); + + // UpstreamKafkaConfiguration + absl::optional + computeClusterConfigForTopic(const std::string& topic) const override; + + // UpstreamKafkaConfiguration + std::pair getAdvertisedAddress() const override; + +private: + const std::pair advertised_address_; + std::map topic_prefix_to_cluster_config_; +}; + } // namespace Mesh } // namespace Kafka } // namespace NetworkFilters diff --git a/contrib/kafka/filters/network/test/mesh/BUILD b/contrib/kafka/filters/network/test/mesh/BUILD index acff686d9e163..bc6e740f08842 100644 --- a/contrib/kafka/filters/network/test/mesh/BUILD +++ b/contrib/kafka/filters/network/test/mesh/BUILD @@ -13,6 +13,16 @@ licenses(["notice"]) # Apache 2 envoy_contrib_package() +envoy_cc_test( + name = "config_unit_test", + srcs = ["config_unit_test.cc"], + tags = ["skip_on_windows"], + deps = [ + "//contrib/kafka/filters/network/source/mesh:config_lib", + "//test/mocks/server:factory_context_mocks", + ], +) + 
envoy_cc_test( name = "filter_unit_test", srcs = ["filter_unit_test.cc"], @@ -73,3 +83,12 @@ envoy_cc_test_library( envoy_external_dep_path("librdkafka"), ], ) + +envoy_cc_test( + name = "upstream_config_unit_test", + srcs = ["upstream_config_unit_test.cc"], + tags = ["skip_on_windows"], + deps = [ + "//contrib/kafka/filters/network/source/mesh:upstream_config_lib", + ], +) diff --git a/contrib/kafka/filters/network/test/mesh/config_unit_test.cc b/contrib/kafka/filters/network/test/mesh/config_unit_test.cc new file mode 100644 index 0000000000000..3ac2ad70a64e6 --- /dev/null +++ b/contrib/kafka/filters/network/test/mesh/config_unit_test.cc @@ -0,0 +1,81 @@ +#include "test/mocks/server/factory_context.h" + +#include "contrib/kafka/filters/network/source/mesh/config.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +class MockThreadFactory : public Thread::ThreadFactory { +public: + MOCK_METHOD(Thread::ThreadPtr, createThread, (std::function, Thread::OptionsOptConstRef)); + MOCK_METHOD(Thread::ThreadId, currentThreadId, ()); +}; + +TEST(KafkaMeshConfigFactoryUnitTest, shouldCreateFilter) { + // given + const std::string yaml = R"EOF( +advertised_host: "127.0.0.1" +advertised_port: 19092 +upstream_clusters: +- cluster_name: kafka_c1 + bootstrap_servers: 127.0.0.1:9092 + partition_count: 1 +- cluster_name: kafka_c2 + bootstrap_servers: 127.0.0.1:9093 + partition_count: 1 +- cluster_name: kafka_c3 + bootstrap_servers: 127.0.0.1:9094 + partition_count: 5 + producer_config: + acks: "1" + linger.ms: "500" +forwarding_rules: +- target_cluster: kafka_c1 + topic_prefix: apples +- target_cluster: kafka_c2 + topic_prefix: bananas +- target_cluster: kafka_c3 + topic_prefix: cherries + )EOF"; + + KafkaMeshProtoConfig proto_config; + TestUtility::loadFromYamlAndValidate(yaml, proto_config); + + testing::NiceMock context; + testing::NiceMock thread_factory; + 
ON_CALL(context.api_, threadFactory()).WillByDefault(ReturnRef(thread_factory)); + KafkaMeshConfigFactory factory; + + Network::FilterFactoryCb cb = factory.createFilterFactoryFromProto(proto_config, context); + Network::MockConnection connection; + EXPECT_CALL(connection, addReadFilter(_)); + + // when + cb(connection); + + // then - connection had `addFilter` invoked +} + +TEST(KafkaMeshConfigFactoryUnitTest, throwsIfAdvertisedPortIsMissing) { + // given + const std::string yaml = R"EOF( +advertised_host: "127.0.0.1" + )EOF"; + + KafkaMeshProtoConfig proto_config; + + // when + // then - exception gets thrown + EXPECT_THROW(TestUtility::loadFromYamlAndValidate(yaml, proto_config), ProtoValidationException); +} + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/kafka/filters/network/test/mesh/integration_test/BUILD b/contrib/kafka/filters/network/test/mesh/integration_test/BUILD new file mode 100644 index 0000000000000..295dcd6302177 --- /dev/null +++ b/contrib/kafka/filters/network/test/mesh/integration_test/BUILD @@ -0,0 +1,31 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_contrib_package", + "envoy_py_test", +) +load("@kafka_pip3//:requirements.bzl", "requirement") + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +# This test sets up multiple services, and this can take variable amount of time (30-60 seconds). 
+envoy_py_test( + name = "kafka_mesh_integration_test", + srcs = [ + "kafka_mesh_integration_test.py", + "@kafka_python_client//:all", + ], + data = [ + "//contrib/exe:envoy-static", + "//bazel:remote_jdk11", + "@kafka_server_binary//:all", + ] + glob(["*.j2"]), + flaky = True, + python_version = "PY3", + srcs_version = "PY3", + deps = [ + requirement("Jinja2"), + requirement("MarkupSafe"), + ], +) diff --git a/contrib/kafka/filters/network/test/mesh/integration_test/envoy_config_yaml.j2 b/contrib/kafka/filters/network/test/mesh/integration_test/envoy_config_yaml.j2 new file mode 100644 index 0000000000000..fbb22d2af3a96 --- /dev/null +++ b/contrib/kafka/filters/network/test/mesh/integration_test/envoy_config_yaml.j2 @@ -0,0 +1,34 @@ +static_resources: + listeners: + - address: + socket_address: + address: 127.0.0.1 + port_value: {{ data['kafka_envoy_port'] }} + filter_chains: + - filters: + - name: requesttypes + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker + stat_prefix: testfilter + - name: mesh + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh + advertised_host: "127.0.0.1" + advertised_port: {{ data['kafka_envoy_port'] }} + upstream_clusters: + - cluster_name: kafka_c1 + bootstrap_servers: 127.0.0.1:{{ data['kafka_real_port1'] }} + partition_count: 1 + - cluster_name: kafka_c2 + bootstrap_servers: 127.0.0.1:{{ data['kafka_real_port2'] }} + partition_count: 1 + forwarding_rules: + - target_cluster: kafka_c1 + topic_prefix: a + - target_cluster: kafka_c2 + topic_prefix: b +admin: + access_log_path: /dev/null + profile_path: /dev/null + address: + socket_address: { address: 127.0.0.1, port_value: {{ data['envoy_monitoring_port'] }} } diff --git a/contrib/kafka/filters/network/test/mesh/integration_test/kafka_mesh_integration_test.py b/contrib/kafka/filters/network/test/mesh/integration_test/kafka_mesh_integration_test.py new file mode 100644 
index 0000000000000..f21145cda99a4 --- /dev/null +++ b/contrib/kafka/filters/network/test/mesh/integration_test/kafka_mesh_integration_test.py @@ -0,0 +1,650 @@ +#!/usr/bin/python + +import random +import os +import shutil +import socket +import subprocess +import tempfile +from threading import Thread, Semaphore +import time +import unittest +import random + +from kafka import KafkaConsumer, KafkaProducer, TopicPartition +import urllib.request + + +class IntegrationTest(unittest.TestCase): + """ + All tests in this class depend on Envoy/Zookeeper/Kafka running. + For each of these tests we are going to create Kafka producers and consumers, with producers + pointing to Envoy (so the records get forwarded to target Kafka clusters) and verifying consumers + pointing to Kafka clusters directly (as mesh filter does not yet support Fetch requests). + We expect every operation to succeed (as they should reach Kafka) and the corresponding metrics + to increase on Envoy side (to show that messages were received and forwarded successfully). + """ + + services = None + + @classmethod + def setUpClass(cls): + IntegrationTest.services = ServicesHolder() + IntegrationTest.services.start() + + @classmethod + def tearDownClass(cls): + IntegrationTest.services.shut_down() + + def setUp(self): + # We want to check if our services are okay before running any kind of test. + IntegrationTest.services.check_state() + self.metrics = MetricsHolder(self) + + def tearDown(self): + # We want to check if our services are okay after running any test. 
+ IntegrationTest.services.check_state() + + @classmethod + def kafka_envoy_address(cls): + return '127.0.0.1:%s' % IntegrationTest.services.kafka_envoy_port + + @classmethod + def kafka_cluster1_address(cls): + return '127.0.0.1:%s' % IntegrationTest.services.kafka_real_port1 + + @classmethod + def kafka_cluster2_address(cls): + return '127.0.0.1:%s' % IntegrationTest.services.kafka_real_port2 + + @classmethod + def envoy_stats_address(cls): + return 'http://127.0.0.1:%s/stats' % IntegrationTest.services.envoy_monitoring_port + + def test_producing(self): + """ + This test verifies that producer can send messages through mesh filter. + We are going to send messages to two topics: 'apples' and 'bananas'. + The mesh filter is configured to forward records for topics starting with 'a' (like 'apples') + to the first cluster, and the ones starting with 'b' (so 'bananas') to the second one. + + We are going to send messages one by one, so they will not be batched in Kafka producer, + so the filter is going to receive them one by one too. + + After sending, the consumers are going to read from Kafka clusters directly to make sure that + nothing was lost. 
+ """ + + messages_to_send = 100 + partition1 = TopicPartition('apples', 0) + partition2 = TopicPartition('bananas', 0) + + producer = KafkaProducer( + bootstrap_servers=IntegrationTest.kafka_envoy_address(), api_version=(1, 0, 0)) + offset_to_payload1 = {} + offset_to_payload2 = {} + for _ in range(messages_to_send): + payload = bytearray(random.getrandbits(8) for _ in range(5)) + future1 = producer.send( + value=payload, topic=partition1.topic, partition=partition1.partition) + self.assertTrue(future1.get().offset >= 0) + offset_to_payload1[future1.get().offset] = payload + + future2 = producer.send( + value=payload, topic=partition2.topic, partition=partition2.partition) + self.assertTrue(future2.get().offset >= 0) + offset_to_payload2[future2.get().offset] = payload + self.assertTrue(len(offset_to_payload1) == messages_to_send) + self.assertTrue(len(offset_to_payload2) == messages_to_send) + producer.close() + + # Check the target clusters. + self.__verify_target_kafka_cluster( + IntegrationTest.kafka_cluster1_address(), partition1, offset_to_payload1, partition2) + self.__verify_target_kafka_cluster( + IntegrationTest.kafka_cluster2_address(), partition2, offset_to_payload2, partition1) + + # Check if requests have been received. + self.metrics.collect_final_metrics() + self.metrics.assert_metric_increase('produce', 200) + + def test_producing_with_batched_records(self): + """ + Compared to previous test, we are going to have batching in Kafka producers (this is caused by high 'linger.ms' value). + So a single request that reaches a Kafka broker might be carrying more than one record, for different partitions. + """ + messages_to_send = 100 + partition1 = TopicPartition('apricots', 0) + partition2 = TopicPartition('berries', 0) + + # This ensures that records to 'apricots' and 'berries' partitions. 
+ producer = KafkaProducer( + bootstrap_servers=IntegrationTest.kafka_envoy_address(), + api_version=(1, 0, 0), + linger_ms=1000, + batch_size=100) + future_to_payload1 = {} + future_to_payload2 = {} + for _ in range(messages_to_send): + payload = bytearray(random.getrandbits(8) for _ in range(5)) + future1 = producer.send( + value=payload, topic=partition1.topic, partition=partition1.partition) + future_to_payload1[future1] = payload + + payload = bytearray(random.getrandbits(8) for _ in range(5)) + future2 = producer.send( + value=payload, topic=partition2.topic, partition=partition2.partition) + future_to_payload2[future2] = payload + + offset_to_payload1 = {} + offset_to_payload2 = {} + for future in future_to_payload1.keys(): + offset_to_payload1[future.get().offset] = future_to_payload1[future] + self.assertTrue(future.get().offset >= 0) + for future in future_to_payload2.keys(): + offset_to_payload2[future.get().offset] = future_to_payload2[future] + self.assertTrue(future.get().offset >= 0) + self.assertTrue(len(offset_to_payload1) == messages_to_send) + self.assertTrue(len(offset_to_payload2) == messages_to_send) + producer.close() + + # Check the target clusters. + self.__verify_target_kafka_cluster( + IntegrationTest.kafka_cluster1_address(), partition1, offset_to_payload1, partition2) + self.__verify_target_kafka_cluster( + IntegrationTest.kafka_cluster2_address(), partition2, offset_to_payload2, partition1) + + # Check if requests have been received. + self.metrics.collect_final_metrics() + self.metrics.assert_metric_increase('produce', 1) + + def __verify_target_kafka_cluster( + self, bootstrap_servers, partition, offset_to_payload_map, other_partition): + # Check if records were properly forwarded to the cluster. 
+ consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest') + consumer.assign([partition]) + received_messages = [] + while (len(received_messages) < len(offset_to_payload_map)): + poll_result = consumer.poll(timeout_ms=1000) + received_messages += poll_result[partition] + self.assertTrue(len(received_messages) == len(offset_to_payload_map)) + for record in received_messages: + self.assertTrue(record.value == offset_to_payload_map[record.offset]) + + # Check that no records were incorrectly routed from the "other" partition (they would have created the topics). + self.assertTrue(other_partition.topic not in consumer.topics()) + consumer.close(False) + + +class MetricsHolder: + """ + Utility for storing Envoy metrics. + Expected to be created before the test (to get initial metrics), and then to collect them at the + end of test, so the expected increases can be verified. + """ + + def __init__(self, owner): + self.owner = owner + self.initial_requests, self.inital_responses = MetricsHolder.get_envoy_stats() + self.final_requests = None + self.final_responses = None + + def collect_final_metrics(self): + self.final_requests, self.final_responses = MetricsHolder.get_envoy_stats() + + def assert_metric_increase(self, message_type, count): + request_type = message_type + '_request' + response_type = message_type + '_response' + + initial_request_value = self.initial_requests.get(request_type, 0) + final_request_value = self.final_requests.get(request_type, 0) + self.owner.assertGreaterEqual(final_request_value, initial_request_value + count) + + initial_response_value = self.inital_responses.get(response_type, 0) + final_response_value = self.final_responses.get(response_type, 0) + self.owner.assertGreaterEqual(final_response_value, initial_response_value + count) + + @staticmethod + def get_envoy_stats(): + """ + Grab request/response metrics from envoy's stats interface. 
+ """ + + stats_url = IntegrationTest.envoy_stats_address() + requests = {} + responses = {} + with urllib.request.urlopen(stats_url) as remote_metrics_url: + payload = remote_metrics_url.read().decode() + lines = payload.splitlines() + for line in lines: + request_prefix = 'kafka.testfilter.request.' + response_prefix = 'kafka.testfilter.response.' + if line.startswith(request_prefix): + data = line[len(request_prefix):].split(': ') + requests[data[0]] = int(data[1]) + pass + if line.startswith(response_prefix) and '_response:' in line: + data = line[len(response_prefix):].split(': ') + responses[data[0]] = int(data[1]) + return [requests, responses] + + +class ServicesHolder: + """ + Utility class for setting up our external dependencies: Envoy, Zookeeper + and two Kafka clusters (single-broker each). + """ + + def __init__(self): + self.kafka_tmp_dir = None + + self.envoy_worker = None + self.zk_worker = None + self.kafka_workers = None + + @staticmethod + def get_random_listener_port(): + """ + Here we count on OS to give us some random socket. + Obviously this method will need to be invoked in a try loop anyways, as in degenerate scenario + someone else might have bound to it after we had closed the socket and before the service + that's supposed to use it binds to it. + """ + + import socket + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: + server_socket.bind(('0.0.0.0', 0)) + socket_port = server_socket.getsockname()[1] + print('returning %s' % socket_port) + return socket_port + + def start(self): + """ + Starts all the services we need for integration tests. + """ + + # Find java installation that we are going to use to start Zookeeper & Kafka. 
+ java_directory = ServicesHolder.find_java() + + launcher_environment = os.environ.copy() + # Make `java` visible to build script: + # https://github.com/apache/kafka/blob/2.2.0/bin/kafka-run-class.sh#L226 + new_path = os.path.abspath(java_directory) + os.pathsep + launcher_environment['PATH'] + launcher_environment['PATH'] = new_path + # Both ZK & Kafka use Kafka launcher script. + # By default it sets up JMX options: + # https://github.com/apache/kafka/blob/2.2.0/bin/kafka-run-class.sh#L167 + # But that forces the JVM to load file that is not present due to: + # https://docs.oracle.com/javase/9/management/monitoring-and-management-using-jmx-technology.htm + # Let's make it simple and just disable JMX. + launcher_environment['KAFKA_JMX_OPTS'] = ' ' + + # Setup a temporary directory, which will be used by Kafka & Zookeeper servers. + self.kafka_tmp_dir = tempfile.mkdtemp() + print('Temporary directory used for tests: ' + self.kafka_tmp_dir) + + # This directory will store the configuration files fed to services. + config_dir = self.kafka_tmp_dir + '/config' + os.mkdir(config_dir) + # This directory will store Zookeeper's data (== Kafka server metadata). + zookeeper_store_dir = self.kafka_tmp_dir + '/zookeeper_data' + os.mkdir(zookeeper_store_dir) + # These directories will store Kafka's data (== partitions). + kafka_store_dir1 = self.kafka_tmp_dir + '/kafka_data1' + os.mkdir(kafka_store_dir1) + kafka_store_dir2 = self.kafka_tmp_dir + '/kafka_data2' + os.mkdir(kafka_store_dir2) + + # Find the Kafka server 'bin' directory. + kafka_bin_dir = os.path.join('.', 'external', 'kafka_server_binary', 'bin') + + # Main initialization block: + # - generate random ports, + # - render configuration with these ports, + # - start services and check if they are running okay, + # - if anything is having problems, kill everything and start again. + while True: + + # Generate random ports. 
+ zk_port = ServicesHolder.get_random_listener_port() + kafka_envoy_port = ServicesHolder.get_random_listener_port() + kafka_real_port1 = ServicesHolder.get_random_listener_port() + kafka_real_port2 = ServicesHolder.get_random_listener_port() + envoy_monitoring_port = ServicesHolder.get_random_listener_port() + + # These ports need to be exposed to tests. + self.kafka_envoy_port = kafka_envoy_port + self.kafka_real_port1 = kafka_real_port1 + self.kafka_real_port2 = kafka_real_port2 + self.envoy_monitoring_port = envoy_monitoring_port + + # Render config file for Envoy. + template = RenderingHelper.get_template('envoy_config_yaml.j2') + contents = template.render( + data={ + 'kafka_envoy_port': kafka_envoy_port, + 'kafka_real_port1': kafka_real_port1, + 'kafka_real_port2': kafka_real_port2, + 'envoy_monitoring_port': envoy_monitoring_port + }) + envoy_config_file = os.path.join(config_dir, 'envoy_config.yaml') + with open(envoy_config_file, 'w') as fd: + fd.write(contents) + print('Envoy config file rendered at: ' + envoy_config_file) + + # Render config file for Zookeeper. + template = RenderingHelper.get_template('zookeeper_properties.j2') + contents = template.render(data={'data_dir': zookeeper_store_dir, 'zk_port': zk_port}) + zookeeper_config_file = os.path.join(config_dir, 'zookeeper.properties') + with open(zookeeper_config_file, 'w') as fd: + fd.write(contents) + print('Zookeeper config file rendered at: ' + zookeeper_config_file) + + # Render config file for Kafka cluster 1. 
+ template = RenderingHelper.get_template('kafka_server_properties.j2') + contents = template.render( + data={ + 'kafka_real_port': kafka_real_port1, + 'data_dir': kafka_store_dir1, + 'zk_port': zk_port, + 'kafka_zk_instance': 'instance1' + }) + kafka_config_file1 = os.path.join(config_dir, 'kafka_server1.properties') + with open(kafka_config_file1, 'w') as fd: + fd.write(contents) + print('Kafka config file rendered at: ' + kafka_config_file1) + + # Render config file for Kafka cluster 2. + template = RenderingHelper.get_template('kafka_server_properties.j2') + contents = template.render( + data={ + 'kafka_real_port': kafka_real_port2, + 'data_dir': kafka_store_dir2, + 'zk_port': zk_port, + 'kafka_zk_instance': 'instance2' + }) + kafka_config_file2 = os.path.join(config_dir, 'kafka_server2.properties') + with open(kafka_config_file2, 'w') as fd: + fd.write(contents) + print('Kafka config file rendered at: ' + kafka_config_file2) + + # Start the services now. + try: + + # Start Envoy in the background, pointing to rendered config file. + envoy_binary = ServicesHolder.find_envoy() + # --base-id is added to allow multiple Envoy instances to run at the same time. + envoy_args = [ + os.path.abspath(envoy_binary), '-c', envoy_config_file, '--base-id', + str(random.randint(1, 999999)) + ] + envoy_handle = subprocess.Popen( + envoy_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.envoy_worker = ProcessWorker( + envoy_handle, 'Envoy', 'starting main dispatch loop') + self.envoy_worker.await_startup() + + # Start Zookeeper in background, pointing to rendered config file. 
+ zk_binary = os.path.join(kafka_bin_dir, 'zookeeper-server-start.sh') + zk_args = [os.path.abspath(zk_binary), zookeeper_config_file] + zk_handle = subprocess.Popen( + zk_args, + env=launcher_environment, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.zk_worker = ProcessWorker(zk_handle, 'Zookeeper', 'binding to port') + self.zk_worker.await_startup() + + self.kafka_workers = [] + + # Start Kafka 1 in background, pointing to rendered config file. + kafka_binary = os.path.join(kafka_bin_dir, 'kafka-server-start.sh') + kafka_args = [os.path.abspath(kafka_binary), os.path.abspath(kafka_config_file1)] + kafka_handle = subprocess.Popen( + kafka_args, + env=launcher_environment, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + kafka_worker = ProcessWorker(kafka_handle, 'Kafka', '[KafkaServer id=0] started') + kafka_worker.await_startup() + self.kafka_workers.append(kafka_worker) + + # Start Kafka 2 in background, pointing to rendered config file. + kafka_binary = os.path.join(kafka_bin_dir, 'kafka-server-start.sh') + kafka_args = [os.path.abspath(kafka_binary), os.path.abspath(kafka_config_file2)] + kafka_handle = subprocess.Popen( + kafka_args, + env=launcher_environment, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + kafka_worker = ProcessWorker(kafka_handle, 'Kafka', '[KafkaServer id=0] started') + kafka_worker.await_startup() + self.kafka_workers.append(kafka_worker) + + # All services have started without problems - now we can finally finish. + break + + except Exception as e: + print('Could not start services, will try again', e) + + if self.kafka_workers: + self.kafka_worker.kill() + self.kafka_worker = None + if self.zk_worker: + self.zk_worker.kill() + self.zk_worker = None + if self.envoy_worker: + self.envoy_worker.kill() + self.envoy_worker = None + + @staticmethod + def find_java(): + """ + This method just locates the Java installation in current directory. 
+ We cannot hardcode the name, as the dirname changes as per: + https://github.com/bazelbuild/bazel/blob/master/tools/jdk/BUILD#L491 + """ + + external_dir = os.path.join('.', 'external') + for directory in os.listdir(external_dir): + if 'remotejdk11' in directory: + result = os.path.join(external_dir, directory, 'bin') + print('Using Java: ' + result) + return result + raise Exception('Could not find Java in: ' + external_dir) + + @staticmethod + def find_envoy(): + """ + This method locates envoy binary. + It's present at ./contrib/exe/envoy-static (at least for mac/bazel-asan/bazel-tsan), + or at ./external/envoy/contrib/exe/envoy-static (for bazel-compile_time_options). + """ + + candidate = os.path.join('.', 'contrib', 'exe', 'envoy-static') + if os.path.isfile(candidate): + return candidate + candidate = os.path.join('.', 'external', 'envoy', 'contrib', 'exe', 'envoy-static') + if os.path.isfile(candidate): + return candidate + raise Exception("Could not find Envoy") + + def shut_down(self): + # Teardown - kill Kafka, Zookeeper, and Envoy. Then delete their data directory. + print('Cleaning up') + + if self.kafka_workers: + for worker in self.kafka_workers: + worker.kill() + + if self.zk_worker: + self.zk_worker.kill() + + if self.envoy_worker: + self.envoy_worker.kill() + + if self.kafka_tmp_dir: + print('Removing temporary directory: ' + self.kafka_tmp_dir) + shutil.rmtree(self.kafka_tmp_dir) + + def check_state(self): + self.envoy_worker.check_state() + self.zk_worker.check_state() + for worker in self.kafka_workers: + worker.check_state() + + +class ProcessWorker: + """ + Helper class that wraps the external service process. + Provides ability to wait until service is ready to use (this is done by tracing logs) and + printing service's output to stdout. + """ + + # Service is considered to be properly initialized after it has logged its startup message + # and has been alive for INITIALIZATION_WAIT_SECONDS after that message has been seen. 
+ # This (clunky) design is needed because Zookeeper happens to log "binding to port" and then + # might fail to bind. + INITIALIZATION_WAIT_SECONDS = 3 + + def __init__(self, process_handle, name, startup_message): + # Handle to process and pretty name. + self.process_handle = process_handle + self.name = name + + self.startup_message = startup_message + self.startup_message_ts = None + + # Semaphore raised when startup has finished and information regarding startup's success. + self.initialization_semaphore = Semaphore(value=0) + self.initialization_ok = False + + self.state_worker = Thread(target=ProcessWorker.initialization_worker, args=(self,)) + self.state_worker.start() + self.out_worker = Thread( + target=ProcessWorker.pipe_handler, args=(self, self.process_handle.stdout, 'out')) + self.out_worker.start() + self.err_worker = Thread( + target=ProcessWorker.pipe_handler, args=(self, self.process_handle.stderr, 'err')) + self.err_worker.start() + + @staticmethod + def initialization_worker(owner): + """ + Worker thread. + Responsible for detecting if service died during initialization steps and ensuring if enough + time has passed since the startup message has been seen. + When either of these happens, we just raise the initialization semaphore. + """ + + while True: + status = owner.process_handle.poll() + if status: + # Service died. + print('%s did not initialize properly - finished with: %s' % (owner.name, status)) + owner.initialization_ok = False + owner.initialization_semaphore.release() + break + else: + # Service is still running. + startup_message_ts = owner.startup_message_ts + if startup_message_ts: + # The log message has been registered (by pipe_handler thread), let's just ensure that + # some time has passed and mark the service as running. 
+ current_time = int(round(time.time())) + if current_time - startup_message_ts >= ProcessWorker.INITIALIZATION_WAIT_SECONDS: + print( + 'Startup message seen %s seconds ago, and service is still running' % + (ProcessWorker.INITIALIZATION_WAIT_SECONDS), + flush=True) + owner.initialization_ok = True + owner.initialization_semaphore.release() + break + time.sleep(1) + print('Initialization worker for %s has finished' % (owner.name)) + + @staticmethod + def pipe_handler(owner, pipe, pipe_name): + """ + Worker thread. + If a service startup message is seen, then it just registers the timestamp of its appearance. + Also prints every received message. + """ + + try: + for raw_line in pipe: + line = raw_line.decode().rstrip() + print('%s(%s):' % (owner.name, pipe_name), line, flush=True) + if owner.startup_message in line: + print( + '%s initialization message [%s] has been logged' % + (owner.name, owner.startup_message)) + owner.startup_message_ts = int(round(time.time())) + finally: + pipe.close() + print('Pipe handler for %s(%s) has finished' % (owner.name, pipe_name)) + + def await_startup(self): + """ + Awaits on initialization semaphore, and then verifies the initialization state. + If everything is okay, we just continue (we can use the service), otherwise throw. + """ + + print('Waiting for %s to start...' % (self.name)) + self.initialization_semaphore.acquire() + try: + if self.initialization_ok: + print('Service %s started successfully' % (self.name)) + else: + raise Exception('%s could not start' % (self.name)) + finally: + self.initialization_semaphore.release() + + def check_state(self): + """ + Verifies if the service is still running. Throws if it is not. + """ + + status = self.process_handle.poll() + if status: + raise Exception('%s died with: %s' % (self.name, str(status))) + + def kill(self): + """ + Utility method to kill the main service thread and all related workers. + """ + + print('Stopping service %s' % self.name) + + # Kill the real process. 
+ self.process_handle.kill() + self.process_handle.wait() + + # The sub-workers are going to finish on their own, as they will detect main thread dying + # (through pipes closing, or .poll() returning a non-null value). + self.state_worker.join() + self.out_worker.join() + self.err_worker.join() + + print('Service %s has been stopped' % self.name) + + +class RenderingHelper: + """ + Helper for jinja templates. + """ + + @staticmethod + def get_template(template): + import jinja2 + import os + import sys + # Templates are resolved relatively to main start script, due to main & test templates being + # stored in different directories. + env = jinja2.Environment( + loader=jinja2.FileSystemLoader(searchpath=os.path.dirname(os.path.abspath(__file__)))) + return env.get_template(template) + + +if __name__ == '__main__': + unittest.main() diff --git a/contrib/kafka/filters/network/test/mesh/integration_test/kafka_server_properties.j2 b/contrib/kafka/filters/network/test/mesh/integration_test/kafka_server_properties.j2 new file mode 100644 index 0000000000000..021991a0d4670 --- /dev/null +++ b/contrib/kafka/filters/network/test/mesh/integration_test/kafka_server_properties.j2 @@ -0,0 +1,31 @@ +broker.id=0 +listeners=PLAINTEXT://127.0.0.1:{{ data['kafka_real_port'] }} +advertised.listeners=PLAINTEXT://127.0.0.1:{{ data['kafka_real_port'] }} + +num.network.threads=3 +num.io.threads=8 +socket.send.buffer.bytes=102400 +socket.receive.buffer.bytes=102400 +socket.request.max.bytes=104857600 + +log.dirs={{ data['data_dir'] }} +num.partitions=1 +num.recovery.threads.per.data.dir=1 + +offsets.topic.replication.factor=1 +transaction.state.log.replication.factor=1 +transaction.state.log.min.isr=1 + +log.retention.hours=168 +log.segment.bytes=1073741824 +log.retention.check.interval.ms=300000 + +# As we are going to have multiple Kafka clusters (not even brokers!), +# we need to register them at different paths in ZK. 
+zookeeper.connect=127.0.0.1:{{ data['zk_port'] }}/{{ data['kafka_zk_instance'] }} +zookeeper.connection.timeout.ms=6000 + +group.initial.rebalance.delay.ms=0 + +# The number of __consumer_offsets partitions is reduced to make logs a bit more readable. +offsets.topic.num.partitions=5 diff --git a/contrib/kafka/filters/network/test/mesh/integration_test/zookeeper_properties.j2 b/contrib/kafka/filters/network/test/mesh/integration_test/zookeeper_properties.j2 new file mode 100644 index 0000000000000..be524bea342bc --- /dev/null +++ b/contrib/kafka/filters/network/test/mesh/integration_test/zookeeper_properties.j2 @@ -0,0 +1,5 @@ +clientPort={{ data['zk_port'] }} +dataDir={{ data['data_dir'] }} +maxClientCnxns=0 +# ZK 3.5 tries to bind 8080 for introspection capacility - we do not need that. +admin.enableServer=false diff --git a/contrib/kafka/filters/network/test/mesh/upstream_config_unit_test.cc b/contrib/kafka/filters/network/test/mesh/upstream_config_unit_test.cc new file mode 100644 index 0000000000000..23bfb039b9a03 --- /dev/null +++ b/contrib/kafka/filters/network/test/mesh/upstream_config_unit_test.cc @@ -0,0 +1,143 @@ +#include "source/common/protobuf/utility.h" + +#include "test/test_common/utility.h" + +#include "contrib/kafka/filters/network/source/mesh/upstream_config.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace Kafka { +namespace Mesh { + +TEST(UpstreamKafkaConfigurationTest, shouldThrowIfNoKafkaClusters) { + // given + KafkaMeshProtoConfig proto_config; + + // when + // then - exception gets thrown + EXPECT_THROW_WITH_REGEX(UpstreamKafkaConfigurationImpl{proto_config}, EnvoyException, + "at least one upstream Kafka cluster"); +} + +TEST(UpstreamKafkaConfigurationTest, shouldThrowIfKafkaClustersWithSameName) { + // given + const std::string yaml = R"EOF( +advertised_host: mock +advertised_port: 1 +upstream_clusters: +- cluster_name: REPEATEDNAME + bootstrap_servers: mock + 
partition_count : 1 +- cluster_name: REPEATEDNAME + bootstrap_servers: mock + partition_count : 1 +forwarding_rules: + )EOF"; + KafkaMeshProtoConfig proto_config; + TestUtility::loadFromYamlAndValidate(yaml, proto_config); + + // when + // then - exception gets thrown + EXPECT_THROW_WITH_REGEX(UpstreamKafkaConfigurationImpl{proto_config}, EnvoyException, + "multiple Kafka clusters referenced by the same name"); +} + +TEST(UpstreamKafkaConfigurationTest, shouldThrowIfNoForwardingRules) { + // given + const std::string yaml = R"EOF( +advertised_host: mock_host +advertised_port: 42 +upstream_clusters: +- cluster_name: mock + bootstrap_servers: mock + partition_count : 1 +forwarding_rules: + )EOF"; + KafkaMeshProtoConfig proto_config; + TestUtility::loadFromYamlAndValidate(yaml, proto_config); + + // when + // then - exception gets thrown + EXPECT_THROW_WITH_REGEX(UpstreamKafkaConfigurationImpl{proto_config}, EnvoyException, + "at least one forwarding rule"); +} + +TEST(UpstreamKafkaConfigurationTest, shouldThrowIfForwardingRuleWithUnknownTarget) { + // given + const std::string yaml = R"EOF( +advertised_host: mock_host +advertised_port: 42 +upstream_clusters: +- cluster_name: mock + bootstrap_servers: mock + partition_count : 1 +forwarding_rules: +- target_cluster: BADNAME + topic_prefix: mock + )EOF"; + KafkaMeshProtoConfig proto_config; + TestUtility::loadFromYamlAndValidate(yaml, proto_config); + + // when + // then - exception gets thrown + EXPECT_THROW_WITH_REGEX(UpstreamKafkaConfigurationImpl{proto_config}, EnvoyException, + "forwarding rule is referencing unknown upstream Kafka cluster"); +} + +TEST(UpstreamKafkaConfigurationTest, shouldBehaveProperly) { + // given + const std::string yaml = R"EOF( +advertised_host: mock_host +advertised_port: 42 +upstream_clusters: +- cluster_name: cluster1 + bootstrap_servers: s1 + partition_count : 1 +- cluster_name: cluster2 + bootstrap_servers: s2 + partition_count : 2 +forwarding_rules: +- target_cluster: cluster1 + 
topic_prefix: prefix1 +- target_cluster: cluster2 + topic_prefix: prefix2 + )EOF"; + KafkaMeshProtoConfig proto_config; + TestUtility::loadFromYamlAndValidate(yaml, proto_config); + const UpstreamKafkaConfiguration& testee = UpstreamKafkaConfigurationImpl{proto_config}; + + const ClusterConfig cluster1 = {"cluster1", 1, {{"bootstrap.servers", "s1"}}}; + const ClusterConfig cluster2 = {"cluster2", 2, {{"bootstrap.servers", "s2"}}}; + + // when, then (advertised address is returned properly) + const auto address = testee.getAdvertisedAddress(); + EXPECT_EQ(address.first, "mock_host"); + EXPECT_EQ(address.second, 42); + + // when, then (matching prefix with something more) + const auto res1 = testee.computeClusterConfigForTopic("prefix1somethingmore"); + ASSERT_TRUE(res1.has_value()); + EXPECT_EQ(*res1, cluster1); + + // when, then (matching prefix alone) + const auto res2 = testee.computeClusterConfigForTopic("prefix1"); + ASSERT_TRUE(res2.has_value()); + EXPECT_EQ(*res2, cluster1); + + // when, then (failing to match first rule, but then matching the second one) + const auto res3 = testee.computeClusterConfigForTopic("prefix2somethingmore"); + ASSERT_TRUE(res3.has_value()); + EXPECT_EQ(*res3, cluster2); + + // when, then (no rules match) + const auto res4 = testee.computeClusterConfigForTopic("someotherthing"); + EXPECT_FALSE(res4.has_value()); +} + +} // namespace Mesh +} // namespace Kafka +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst b/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst index 4753f3845a78c..2c286686f33f5 100644 --- a/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst +++ b/docs/root/configuration/listeners/network_filters/kafka_broker_filter.rst @@ -14,6 +14,10 @@ this filter) are forwarded as-is. 
 * :ref:`v3 API reference <envoy_v3_api_msg_extensions.filters.network.kafka_broker.v3.KafkaBroker>`
 * This filter should be configured with the name *envoy.filters.network.kafka_broker*.
 
+.. attention::
+
+   The Kafka broker filter is only included in :ref:`contrib images <install_contrib>`
+
 .. attention::
 
    The kafka_broker filter is experimental and is currently under active development.
diff --git a/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst b/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst
new file mode 100644
index 0000000000000..4a8504b7d67e3
--- /dev/null
+++ b/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst
@@ -0,0 +1,103 @@
+.. _config_network_filters_kafka_mesh:
+
+Kafka Mesh filter
+===================
+
+The Apache Kafka mesh filter provides a facade for `Apache Kafka <https://kafka.apache.org/>`_
+producers. Produce requests sent to this filter instance can be forwarded to one of multiple
+clusters, depending on configured forwarding rules. Corresponding message versions from
+Kafka 2.4.0 are supported.
+
+* :ref:`v3 API reference <envoy_v3_api_msg_extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh>`
+* This filter should be configured with the name *envoy.filters.network.kafka_mesh*.
+
+.. attention::
+
+   The Kafka mesh filter is only included in :ref:`contrib images <install_contrib>`
+
+.. attention::
+
+   The kafka_mesh filter is experimental and is currently under active development.
+   Capabilities will be expanded over time and the configuration structures are likely to change.
+
+.. attention::
+
+   The kafka_mesh filter does not work on Windows (the blocker is getting librdkafka compiled).
+
+.. _config_network_filters_kafka_mesh_config:
+
+Configuration
+-------------
+
+The example below shows a typical filter configuration that proxies 3 Kafka clusters.
+Clients are going to connect to '127.0.0.1:19092', and their messages are going to be distributed
+to clusters depending on topic names.
+
+.. code-block:: yaml
+
+  listeners:
+  - address:
+      socket_address:
+        address: 127.0.0.1 # Host that Kafka clients should connect to.
+ port_value: 19092 # Port that Kafka clients should connect to. + filter_chains: + - filters: + - name: envoy.filters.network.kafka_mesh + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh + advertised_host: "127.0.0.1" + advertised_port: 19092 + upstream_clusters: + - cluster_name: kafka_c1 + bootstrap_servers: cluster1_node1:9092,cluster1_node2:9092,cluster1_node3:9092 + partition_count: 1 + - cluster_name: kafka_c2 + bootstrap_servers: cluster2_node1:9092,cluster2_node2:9092,cluster2_node3:9092 + partition_count: 1 + - cluster_name: kafka_c3 + bootstrap_servers: cluster3_node1:9092,cluster3_node2:9092 + partition_count: 5 + producer_config: + acks: "1" + linger.ms: "500" + forwarding_rules: + - target_cluster: kafka_c1 + topic_prefix: apples + - target_cluster: kafka_c2 + topic_prefix: bananas + - target_cluster: kafka_c3 + topic_prefix: cherries + +It should be noted that Kafka broker filter can be inserted before Kafka mesh filter in the filter +chain to capture the request processing metrics. + +.. _config_network_filters_kafka_mesh_notes: + +Notes +----- +Given that this filter does its own processing of received requests, there are some changes +in behaviour compared to explicit connection to a Kafka cluster: + +#. Record headers are not sent upstream. +#. Only ProduceRequests with version 2 are supported (what means very old producers like 0.8 are + not going to be supported). +#. Python producers need to set API version of at least 1.0.0, so that the produce requests they + send are going to have records with magic equal to 2. +#. Downstream handling of Kafka producer 'acks' property is delegated to upstream client. + E.g. if upstream client is configured to use acks=0 then the response is going to be sent + to downstream client as soon as possible (even if they had non-zero acks!). +#. 
As the filter splits single producer requests into separate records, it's possible that delivery
+   of only some of these records fails. In that case, the response returned to upstream client is
+   a failure; however, it is possible some of the records have been appended in target cluster.
+#. Because of the splitting mentioned above, records are not necessarily appended one after another
+   (as they do not get sent as single request to upstream). Users that want to avoid this scenario
+   might want to take a look into downstream producer configs: 'linger.ms' and 'batch.size'.
+#. Produce requests that reference topics that do not match any of the rules are going to close
+   connection and fail. This usually should not happen (clients request metadata first, and they
+   should then fail with 'no broker available' first), but is possible if someone tailors binary
+   payloads over the connection.
+#. librdkafka was compiled without ssl, lz4, gssapi, so related custom producer config options are
+   not supported.
+#. Invalid custom producer configs are not found at startup (only when appropriate clusters are
+   being sent to). Requests that would have referenced these clusters are going to close connection
+   and fail.
diff --git a/docs/root/configuration/listeners/network_filters/network_filters.rst b/docs/root/configuration/listeners/network_filters/network_filters.rst
index a4b918ddf380b..d4dcc5e86c97a 100644
--- a/docs/root/configuration/listeners/network_filters/network_filters.rst
+++ b/docs/root/configuration/listeners/network_filters/network_filters.rst
@@ -17,6 +17,7 @@ filters.
direct_response_filter ext_authz_filter kafka_broker_filter + kafka_mesh_filter local_rate_limit_filter mongo_proxy_filter mysql_proxy_filter diff --git a/generated_api_shadow/BUILD b/generated_api_shadow/BUILD index 5bbde32946b63..93f9184a2b400 100644 --- a/generated_api_shadow/BUILD +++ b/generated_api_shadow/BUILD @@ -60,6 +60,7 @@ proto_library( "//contrib/envoy/extensions/filters/http/squash/v3:pkg", "//contrib/envoy/extensions/filters/http/sxg/v3alpha:pkg", "//contrib/envoy/extensions/filters/network/kafka_broker/v3:pkg", + "//contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha:pkg", "//contrib/envoy/extensions/filters/network/mysql_proxy/v3:pkg", "//contrib/envoy/extensions/filters/network/postgres_proxy/v3alpha:pkg", "//contrib/envoy/extensions/filters/network/rocketmq_proxy/v3:pkg", diff --git a/generated_api_shadow/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/BUILD b/generated_api_shadow/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/BUILD new file mode 100644 index 0000000000000..ee92fb652582e --- /dev/null +++ b/generated_api_shadow/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/BUILD @@ -0,0 +1,9 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. 
+ +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"], +) diff --git a/generated_api_shadow/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto b/generated_api_shadow/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto new file mode 100644 index 0000000000000..03a6522852ab5 --- /dev/null +++ b/generated_api_shadow/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto @@ -0,0 +1,58 @@ +syntax = "proto3"; + +package envoy.extensions.filters.network.kafka_mesh.v3alpha; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.filters.network.kafka_mesh.v3alpha"; +option java_outer_classname = "KafkaMeshProto"; +option java_multiple_files = true; +option (udpa.annotations.file_status).work_in_progress = true; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Kafka Mesh] +// Kafka Mesh :ref:`configuration overview `. +// [#extension: envoy.filters.network.kafka_mesh] + +message KafkaMesh { + // Envoy's host that's advertised to clients. + // Has the same meaning as corresponding Kafka broker properties. + // Usually equal to filter chain's listener config, but needs to be reachable by clients + // (so 0.0.0.0 will not work). + string advertised_host = 1 [(validate.rules).string = {min_len: 1}]; + + // Envoy's port that's advertised to clients. + int32 advertised_port = 2 [(validate.rules).int32 = {gt: 0}]; + + // Upstream clusters this filter will connect to. + repeated KafkaClusterDefinition upstream_clusters = 3; + + // Rules that will decide which cluster gets which request. + repeated ForwardingRule forwarding_rules = 4; +} + +message KafkaClusterDefinition { + // Cluster name. 
+ string cluster_name = 1 [(validate.rules).string = {min_len: 1}]; + + // Kafka cluster address. + string bootstrap_servers = 2 [(validate.rules).string = {min_len: 1}]; + + // Default number of partitions present in this cluster. + // This is especially important for clients that do not specify partition in their payloads and depend on this value for hashing. + int32 partition_count = 3 [(validate.rules).int32 = {gt: 0}]; + + // Custom configuration passed to Kafka producer. + map producer_config = 4; +} + +message ForwardingRule { + // Cluster name. + string target_cluster = 1; + + oneof trigger { + // Intended place for future types of forwarding rules. + string topic_prefix = 2; + } +} diff --git a/tools/code_format/check_format.py b/tools/code_format/check_format.py index d46f40af7dc2c..fa88387510e19 100755 --- a/tools/code_format/check_format.py +++ b/tools/code_format/check_format.py @@ -257,6 +257,8 @@ "extensions/filters/network/redis_proxy", "extensions/filters/network/kafka", "extensions/filters/network/kafka/broker", + "extensions/filters/network/kafka/mesh", + "extensions/filters/network/kafka/mesh/command_handlers", "extensions/filters/network/kafka/protocol", "extensions/filters/network/kafka/serialization", "extensions/filters/network/mongo_proxy",