diff --git a/api/docs/BUILD b/api/docs/BUILD index b6840bd844a5c..11965bd89e65e 100644 --- a/api/docs/BUILD +++ b/api/docs/BUILD @@ -33,6 +33,7 @@ proto_library( "//envoy/config/accesslog/v2:als", "//envoy/config/accesslog/v2:file", "//envoy/config/bootstrap/v2:bootstrap", + "//envoy/config/cluster/redis:redis_cluster", "//envoy/config/common/tap/v2alpha:common", "//envoy/config/filter/accesslog/v2:accesslog", "//envoy/config/filter/dubbo/router/v2alpha1:router", diff --git a/api/envoy/config/cluster/redis/BUILD b/api/envoy/config/cluster/redis/BUILD new file mode 100644 index 0000000000000..42e2d408e3584 --- /dev/null +++ b/api/envoy/config/cluster/redis/BUILD @@ -0,0 +1,8 @@ +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_library_internal") + +licenses(["notice"]) # Apache 2 + +api_proto_library_internal( + name = "redis_cluster", + srcs = ["redis_cluster.proto"], +) diff --git a/api/envoy/config/cluster/redis/redis_cluster.proto b/api/envoy/config/cluster/redis/redis_cluster.proto new file mode 100644 index 0000000000000..2644288c40d2d --- /dev/null +++ b/api/envoy/config/cluster/redis/redis_cluster.proto @@ -0,0 +1,54 @@ +syntax = "proto3"; + +package envoy.config.cluster.redis; + +option java_outer_classname = "RedisClusterProto"; +option java_multiple_files = true; +option java_package = "io.envoyproxy.envoy.config.cluster.redis"; +option go_package = "v2"; + +import "google/protobuf/duration.proto"; + +import "validate/validate.proto"; +import "gogoproto/gogo.proto"; + +// [#protodoc-title: Redis Cluster Configuration] +// This cluster adds support for `Redis Cluster `_, as part +// of :ref:`Envoy's support for Redis Cluster `. +// +// Redis Cluster is an extension of Redis which supports sharding and high availability (where a +// shard that loses its master fails over to a replica, and designates it as the new master). +// However, as there is no unified frontend or proxy service in front of Redis Cluster, the client +// (in this case Envoy) must locally maintain the state of the Redis Cluster, specifically the +// topology. A random node in the cluster is queried for the topology using the `CLUSTER SLOTS +// command `_. This result is then stored locally, and +// updated at user-configured intervals. +// +// Example: +// +// .. code-block:: yaml +// +// name: name +// connect_timeout: 0.25s +// dns_lookup_family: V4_ONLY +// hosts: +// - socket_address: +// address: foo.bar.com +// port_value: 22120 +// cluster_type: +// name: envoy.clusters.redis +// typed_config: +// "@type": type.googleapis.com/google.protobuf.Struct +// value: +// cluster_refresh_rate: 30s +// cluster_refresh_timeout: 0.5s + +message RedisClusterConfig { + // Interval between successive topology refresh requests. If not set, this defaults to 5s. + google.protobuf.Duration cluster_refresh_rate = 1 + [(validate.rules).duration.gt = {}, (gogoproto.stdduration) = true]; + + // Timeout for topology refresh request. If not set, this defaults to 3s. + google.protobuf.Duration cluster_refresh_timeout = 2 + [(validate.rules).duration.gt = {}, (gogoproto.stdduration) = true]; +} diff --git a/docs/build.sh b/docs/build.sh index c815b80f4a621..caa6264ee0eed 100755 --- a/docs/build.sh +++ b/docs/build.sh @@ -84,6 +84,7 @@ PROTO_RST=" /envoy/config/accesslog/v2/als/envoy/config/accesslog/v2/als.proto.rst /envoy/config/accesslog/v2/file/envoy/config/accesslog/v2/file.proto.rst /envoy/config/bootstrap/v2/bootstrap/envoy/config/bootstrap/v2/bootstrap.proto.rst + /envoy/config/cluster/redis/redis_cluster/envoy/config/cluster/redis/redis_cluster.proto.rst /envoy/config/common/tap/v2alpha/common/envoy/config/common/tap/v2alpha/common.proto.rst /envoy/config/ratelimit/v2/rls/envoy/config/ratelimit/v2/rls.proto.rst /envoy/config/metrics/v2/metrics_service/envoy/config/metrics/v2/metrics_service.proto.rst diff --git a/docs/root/api-v2/config/cluster/cluster.rst b/docs/root/api-v2/config/cluster/cluster.rst new file mode 100644 index 0000000000000..7bb5343e81dd2 --- /dev/null +++ b/docs/root/api-v2/config/cluster/cluster.rst @@ -0,0 +1,8 @@ +Cluster +======= + +.. toctree:: + :glob: + :maxdepth: 1 + + redis/* diff --git a/docs/root/api-v2/config/config.rst b/docs/root/api-v2/config/config.rst index 65e276183e5fb..134d5101c7c83 100644 --- a/docs/root/api-v2/config/config.rst +++ b/docs/root/api-v2/config/config.rst @@ -12,3 +12,4 @@ Extensions transport_socket/transport_socket resource_monitor/resource_monitor common/common + cluster/cluster diff --git a/docs/root/intro/arch_overview/redis.rst b/docs/root/intro/arch_overview/redis.rst index 4d2929a14e2ae..866f2ad01ad9a 100644 --- a/docs/root/intro/arch_overview/redis.rst +++ b/docs/root/intro/arch_overview/redis.rst @@ -57,6 +57,28 @@ If passive healthchecking is desired, also configure For the purposes of passive healthchecking, connect timeouts, command timeouts, and connection close map to 5xx. All other responses from Redis are counted as a success. +Redis Cluster Support (Experimental) +---------------------------------------- + +Envoy currently offers experimental support for `Redis Cluster `_. + +When using Envoy as a sidecar proxy for a Redis Cluster, the service can use a non-cluster Redis client +implemented in any language to connect to the proxy as if it's a single node Redis instance. +The Envoy proxy will keep track of the cluster topology and send commands to the correct Redis node in the +cluster according to the `spec `_. Advance features such as reading +from replicas can also be added to the Envoy proxy instead of updating redis clients in each language. + +Envoy proxy tracks the topology of the cluster by sending periodic +`cluster slots `_ commands to a random node in the cluster, and maintains the +following information: + +* List of known nodes. +* The masters for each shard. +* Nodes entering or leaving the cluster. + +For topology configuration details, see the Redis Cluster +:ref:`v2 API reference `. + Supported commands ------------------ diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 6fe432f0139f2..b4e408f2a15e2 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -12,6 +12,7 @@ Version history * hot restart: stats are no longer shared between hot restart parent/child via shared memory, but rather by RPC. Hot restart version incremented to 11. * http: mitigated a race condition with the :ref:`delayed_close_timeout` where it could trigger while actively flushing a pending write buffer for a downstream connection. * jwt_authn: make filter's parsing of JWT more flexible, allowing syntax like ``jwt=eyJhbGciOiJS...ZFnFIw,extra=7,realm=123`` +* redis: add support for Redis cluster custom cluster type. * redis: added :ref:`prefix routing ` to enable routing commands based on their key's prefix to different upstream. * redis: add support for zpopmax and zpopmin commands. * redis: added diff --git a/source/common/upstream/logical_dns_cluster.cc b/source/common/upstream/logical_dns_cluster.cc index 6f1c4547180d3..e511deaf0652f 100644 --- a/source/common/upstream/logical_dns_cluster.cc +++ b/source/common/upstream/logical_dns_cluster.cc @@ -42,19 +42,7 @@ LogicalDnsCluster::LogicalDnsCluster( } } - switch (cluster.dns_lookup_family()) { - case envoy::api::v2::Cluster::V6_ONLY: - dns_lookup_family_ = Network::DnsLookupFamily::V6Only; - break; - case envoy::api::v2::Cluster::V4_ONLY: - dns_lookup_family_ = Network::DnsLookupFamily::V4Only; - break; - case envoy::api::v2::Cluster::AUTO: - dns_lookup_family_ = Network::DnsLookupFamily::Auto; - break; - default: - NOT_REACHED_GCOVR_EXCL_LINE; - } + dns_lookup_family_ = getDnsLookupFamilyFromCluster(cluster); const envoy::api::v2::core::SocketAddress& socket_address = lbEndpoint().endpoint().address().socket_address(); diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index b385350e1b1a1..18542d18b13e1 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -1288,20 +1288,7 @@ StrictDnsClusterImpl::StrictDnsClusterImpl( local_info_(factory_context.localInfo()), dns_resolver_(dns_resolver), dns_refresh_rate_ms_( std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(cluster, dns_refresh_rate, 5000))) { - switch (cluster.dns_lookup_family()) { - case envoy::api::v2::Cluster::V6_ONLY: - dns_lookup_family_ = Network::DnsLookupFamily::V6Only; - break; - case envoy::api::v2::Cluster::V4_ONLY: - dns_lookup_family_ = Network::DnsLookupFamily::V4Only; - break; - case envoy::api::v2::Cluster::AUTO: - dns_lookup_family_ = Network::DnsLookupFamily::Auto; - break; - default: - NOT_REACHED_GCOVR_EXCL_LINE; - } - + dns_lookup_family_ = getDnsLookupFamilyFromCluster(cluster); const envoy::api::v2::ClusterLoadAssignment load_assignment( cluster.has_load_assignment() ? cluster.load_assignment() : Config::Utility::translateClusterHosts(cluster.hosts())); @@ -1320,6 +1307,19 @@ StrictDnsClusterImpl::StrictDnsClusterImpl( load_assignment.policy(), overprovisioning_factor, kDefaultOverProvisioningFactor); } +Network::DnsLookupFamily getDnsLookupFamilyFromCluster(const envoy::api::v2::Cluster& cluster) { + switch (cluster.dns_lookup_family()) { + case envoy::api::v2::Cluster::V6_ONLY: + return Network::DnsLookupFamily::V6Only; + case envoy::api::v2::Cluster::V4_ONLY: + return Network::DnsLookupFamily::V4Only; + case envoy::api::v2::Cluster::AUTO: + return Network::DnsLookupFamily::Auto; + default: + NOT_REACHED_GCOVR_EXCL_LINE; + } +} + void StrictDnsClusterImpl::startPreInit() { for (const ResolveTargetPtr& target : resolve_targets_) { target->startResolve(); diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 5bbf793e7bf4b..33a4dbfe85e1d 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -808,6 +808,11 @@ class BaseDynamicClusterImpl : public ClusterImplBase { HostMap& updated_hosts, const HostMap& all_hosts); }; +/** + * Utility function to get Dns from cluster. + */ +Network::DnsLookupFamily getDnsLookupFamilyFromCluster(const envoy::api::v2::Cluster& cluster); + /** * Implementation of Upstream::Cluster that does periodic DNS resolution and updates the host * member set if the DNS members change. diff --git a/source/extensions/clusters/redis/BUILD b/source/extensions/clusters/redis/BUILD new file mode 100644 index 0000000000000..7248e397168b6 --- /dev/null +++ b/source/extensions/clusters/redis/BUILD @@ -0,0 +1,40 @@ +licenses(["notice"]) # Apache 2 + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_package", +) + +envoy_package() + +envoy_cc_library( + name = "redis_cluster", + srcs = [ + "redis_cluster.cc", + "redis_cluster.h", + ], + deps = [ + "//include/envoy/api:api_interface", + "//include/envoy/http:codec_interface", + "//include/envoy/upstream:cluster_factory_interface", + "//include/envoy/upstream:cluster_manager_interface", + "//include/envoy/upstream:upstream_interface", + "//source/common/config:metadata_lib", + "//source/common/event:dispatcher_lib", + "//source/common/json:config_schemas_lib", + "//source/common/json:json_loader_lib", + "//source/common/network:utility_lib", + "//source/common/singleton:manager_impl_lib", + "//source/common/upstream:cluster_factory_lib", + "//source/common/upstream:upstream_includes", + "//source/common/upstream:upstream_lib", + "//source/extensions/clusters:well_known_names", + "//source/extensions/filters/network/common/redis:client_interface", + "//source/extensions/filters/network/common/redis:client_lib", + "//source/extensions/filters/network/common/redis:codec_interface", + "//source/extensions/transport_sockets/raw_buffer:config", + "//source/server:transport_socket_config_lib", + "@envoy_api//envoy/config/cluster/redis:redis_cluster_cc", + ], +) diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc new file mode 100644 index 0000000000000..96c64714211c9 --- /dev/null +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -0,0 +1,313 @@ +#include "redis_cluster.h" + +#include + +namespace Envoy { +namespace Extensions { +namespace Clusters { +namespace Redis { + +RedisCluster::RedisCluster( + const envoy::api::v2::Cluster& cluster, + const envoy::config::cluster::redis::RedisClusterConfig& redisCluster, + NetworkFilters::Common::Redis::Client::ClientFactory& redis_client_factory, + Upstream::ClusterManager& clusterManager, Runtime::Loader& runtime, + Network::DnsResolverSharedPtr dns_resolver, + Server::Configuration::TransportSocketFactoryContext& factory_context, + Stats::ScopePtr&& stats_scope, bool added_via_api) + : Upstream::BaseDynamicClusterImpl(cluster, runtime, factory_context, std::move(stats_scope), + added_via_api), + cluster_manager_(clusterManager), + cluster_refresh_rate_(std::chrono::milliseconds( + PROTOBUF_GET_MS_OR_DEFAULT(redisCluster, cluster_refresh_rate, 5000))), + cluster_refresh_timeout_(std::chrono::milliseconds( + PROTOBUF_GET_MS_OR_DEFAULT(redisCluster, cluster_refresh_timeout, 3000))), + dispatcher_(factory_context.dispatcher()), dns_resolver_(std::move(dns_resolver)), + dns_lookup_family_(Upstream::getDnsLookupFamilyFromCluster(cluster)), + load_assignment_(cluster.has_load_assignment() + ? cluster.load_assignment() + : Config::Utility::translateClusterHosts(cluster.hosts())), + local_info_(factory_context.localInfo()), random_(factory_context.random()), + redis_discovery_session_(*this, redis_client_factory) { + const auto& locality_lb_endpoints = load_assignment_.endpoints(); + for (const auto& locality_lb_endpoint : locality_lb_endpoints) { + for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) { + const auto& host = lb_endpoint.endpoint().address(); + dns_discovery_resolve_targets_.emplace_back(new DnsDiscoveryResolveTarget( + *this, host.socket_address().address(), host.socket_address().port_value(), + locality_lb_endpoint, lb_endpoint)); + } + } +}; + +void RedisCluster::startPreInit() { + for (const DnsDiscoveryResolveTargetPtr& target : dns_discovery_resolve_targets_) { + target->startResolve(); + } +} + +void RedisCluster::updateAllHosts(const Upstream::HostVector& hosts_added, + const Upstream::HostVector& hosts_removed, + uint32_t current_priority) { + Upstream::PriorityStateManager priority_state_manager(*this, local_info_, nullptr); + + auto locality_lb_endpoint = localityLbEndpoint(); + priority_state_manager.initializePriorityFor(locality_lb_endpoint); + for (const Upstream::HostSharedPtr& host : hosts_) { + if (locality_lb_endpoint.priority() == current_priority) { + priority_state_manager.registerHostForPriority(host, locality_lb_endpoint); + } + } + + priority_state_manager.updateClusterPrioritySet( + current_priority, std::move(priority_state_manager.priorityState()[current_priority].first), + hosts_added, hosts_removed, absl::nullopt); +} + +void RedisCluster::onClusterSlotUpdate(const std::vector& slots) { + Upstream::HostVector new_hosts; + SlotArray slots_; + + for (const ClusterSlot& slot : slots) { + new_hosts.emplace_back(new RedisHost(info(), "", slot.master_, *this, true)); + } + + std::unordered_map updated_hosts; + Upstream::HostVector hosts_added; + Upstream::HostVector hosts_removed; + if (updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed, updated_hosts, + all_hosts_)) { + ASSERT(std::all_of(hosts_.begin(), hosts_.end(), [&](const auto& host) { + return host->priority() == localityLbEndpoint().priority(); + })); + updateAllHosts(hosts_added, hosts_removed, localityLbEndpoint().priority()); + } else { + info_->stats().update_no_rebuild_.inc(); + } + + for (const ClusterSlot& slot : slots) { + auto host = updated_hosts.find(slot.master_->asString()); + ASSERT(host != updated_hosts.end(), "we expect all address to be found in the updated_hosts"); + for (auto i = slot.start_; i <= slot.end_; ++i) { + slots_[i] = host->second; + } + } + + all_hosts_ = std::move(updated_hosts); + cluster_slots_map_.swap(slots_); + + // TODO(hyang): If there is an initialize callback, fire it now. Note that if the + // cluster refers to multiple DNS names, this will return initialized after a single + // DNS resolution completes. This is not perfect but is easier to code and it is unclear + // if the extra complexity is needed so will start with this. + onPreInitComplete(); +} + +// DnsDiscoveryResolveTarget +RedisCluster::DnsDiscoveryResolveTarget::DnsDiscoveryResolveTarget( + RedisCluster& parent, const std::string& dns_address, const uint32_t port, + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint) + : parent_(parent), dns_address_(dns_address), port_(port), + locality_lb_endpoint_(locality_lb_endpoint), lb_endpoint_(lb_endpoint) {} + +RedisCluster::DnsDiscoveryResolveTarget::~DnsDiscoveryResolveTarget() { + if (active_query_) { + active_query_->cancel(); + } +} + +void RedisCluster::DnsDiscoveryResolveTarget::startResolve() { + ENVOY_LOG(trace, "starting async DNS resolution for {}", dns_address_); + + active_query_ = parent_.dns_resolver_->resolve( + dns_address_, parent_.dns_lookup_family_, + [this](const std::list&& address_list) -> void { + active_query_ = nullptr; + ENVOY_LOG(trace, "async DNS resolution complete for {}", dns_address_); + parent_.redis_discovery_session_.registerDiscoveryAddress(address_list, port_); + parent_.redis_discovery_session_.startResolve(); + }); +} + +// RedisCluster +RedisCluster::RedisDiscoverySession::RedisDiscoverySession( + Envoy::Extensions::Clusters::Redis::RedisCluster& parent, + NetworkFilters::Common::Redis::Client::ClientFactory& client_factory) + : parent_(parent), dispatcher_(parent.dispatcher_), + resolve_timer_(parent.dispatcher_.createTimer([this]() -> void { startResolve(); })), + client_factory_(client_factory), buffer_timeout_(0) {} + +namespace { +// Convert the cluster slot IP/Port response to and address, return null if the response does not +// match the expected type. +Network::Address::InstanceConstSharedPtr +ProcessCluster(const NetworkFilters::Common::Redis::RespValue& value) { + if (value.type() != NetworkFilters::Common::Redis::RespType::Array) { + return nullptr; + } + auto& array = value.asArray(); + + if (array.size() < 2 || array[0].type() != NetworkFilters::Common::Redis::RespType::BulkString || + array[1].type() != NetworkFilters::Common::Redis::RespType::Integer) { + return nullptr; + } + + std::string address = array[0].asString(); + bool ipv6 = (address.find(":") != std::string::npos); + if (ipv6) { + return std::make_shared(address, array[1].asInteger()); + } + return std::make_shared(address, array[1].asInteger()); +} +} // namespace + +RedisCluster::RedisDiscoverySession::~RedisDiscoverySession() { + if (current_request_) { + current_request_->cancel(); + current_request_ = nullptr; + } + + while (!client_map_.empty()) { + client_map_.begin()->second->client_->close(); + } +} + +void RedisCluster::RedisDiscoveryClient::onEvent(Network::ConnectionEvent event) { + if (event == Network::ConnectionEvent::RemoteClose || + event == Network::ConnectionEvent::LocalClose) { + auto client_to_delete = parent_.client_map_.find(host_); + ASSERT(client_to_delete != parent_.client_map_.end()); + parent_.dispatcher_.deferredDelete(std::move(client_to_delete->second->client_)); + parent_.client_map_.erase(client_to_delete); + } +} + +void RedisCluster::RedisDiscoverySession::registerDiscoveryAddress( + const std::list& address_list, + const uint32_t port) { + // Since the address from DNS does not have port, we need to make a new address that has port in + // it. + for (const Network::Address::InstanceConstSharedPtr& address : address_list) { + ASSERT(address != nullptr); + discovery_address_list_.push_back(Network::Utility::getAddressWithPort(*address, port)); + } +} + +void RedisCluster::RedisDiscoverySession::startResolve() { + parent_.info_->stats().update_attempt_.inc(); + // If a resolution is currently in progress, skip it. + if (current_request_) { + return; + } + + // If hosts is empty, we haven't received a successful result from the CLUSTER SLOTS call yet. + // So, pick a random discovery address from dns and make a request. + Upstream::HostSharedPtr host; + if (parent_.hosts_.empty()) { + const int rand_idx = parent_.random_.random() % discovery_address_list_.size(); + auto it = discovery_address_list_.begin(); + std::next(it, rand_idx); + host = Upstream::HostSharedPtr{new RedisHost(parent_.info(), "", *it, parent_, true)}; + } else { + const int rand_idx = parent_.random_.random() % parent_.hosts_.size(); + host = parent_.hosts_[rand_idx]; + } + + current_host_address_ = host->address()->asString(); + RedisDiscoveryClientPtr& client = client_map_[current_host_address_]; + if (!client) { + client = std::make_unique(*this); + client->host_ = current_host_address_; + client->client_ = client_factory_.create(host, dispatcher_, *this); + client->client_->addConnectionCallbacks(*client); + } + + current_request_ = client->client_->makeRequest(ClusterSlotsRequest::instance_, *this); +} + +void RedisCluster::RedisDiscoverySession::onResponse( + NetworkFilters::Common::Redis::RespValuePtr&& value) { + current_request_ = nullptr; + + // Do nothing if the cluster is empty. + if (value->type() != NetworkFilters::Common::Redis::RespType::Array || value->asArray().empty()) { + onUnexpectedResponse(value); + return; + } + + std::vector slots_; + + // Loop through the cluster slot response and error checks for each field. + for (const NetworkFilters::Common::Redis::RespValue& part : value->asArray()) { + if (part.type() != NetworkFilters::Common::Redis::RespType::Array) { + onUnexpectedResponse(value); + return; + } + const std::vector& slot_range = part.asArray(); + if (slot_range.size() < 3 || + slot_range[0].type() != + NetworkFilters::Common::Redis::RespType::Integer || // Start slot range is an integer. + slot_range[1].type() != + NetworkFilters::Common::Redis::RespType::Integer) { // End slot range is an integer. + onUnexpectedResponse(value); + return; + } + + // Field 2: Master address for slot range + // TODO(hyang): For now we're only adding the master node for each slot. When we're ready to + // send requests to replica nodes, we need to add subsequent address in the response as + // replica nodes. + auto master_address = ProcessCluster(slot_range[2]); + if (!master_address) { + onUnexpectedResponse(value); + return; + } + slots_.emplace_back(slot_range[0].asInteger(), slot_range[1].asInteger(), master_address); + } + + parent_.onClusterSlotUpdate(slots_); + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); +} + +void RedisCluster::RedisDiscoverySession::onUnexpectedResponse( + const NetworkFilters::Common::Redis::RespValuePtr& value) { + ENVOY_LOG(warn, "Unexpected response to cluster slot command: {}", value->toString()); + this->parent_.info_->stats().update_failure_.inc(); + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); +} + +void RedisCluster::RedisDiscoverySession::onFailure() { + current_request_ = nullptr; + if (!current_host_address_.empty()) { + auto client_to_delete = client_map_.find(current_host_address_); + client_to_delete->second->client_->close(); + } + parent_.info()->stats().update_failure_.inc(); + resolve_timer_->enableTimer(parent_.cluster_refresh_rate_); +} + +RedisCluster::ClusterSlotsRequest RedisCluster::ClusterSlotsRequest::instance_; + +Upstream::ClusterImplBaseSharedPtr RedisClusterFactory::createClusterWithConfig( + const envoy::api::v2::Cluster& cluster, + const envoy::config::cluster::redis::RedisClusterConfig& proto_config, + Upstream::ClusterFactoryContext& context, + Envoy::Server::Configuration::TransportSocketFactoryContext& socket_factory_context, + Envoy::Stats::ScopePtr&& stats_scope) { + if (!cluster.has_cluster_type() || + cluster.cluster_type().name() != Extensions::Clusters::ClusterTypes::get().Redis) { + throw EnvoyException("Redis cluster can only created with redis cluster type"); + } + return std::make_shared( + cluster, proto_config, NetworkFilters::Common::Redis::Client::ClientFactoryImpl::instance_, + context.clusterManager(), context.runtime(), selectDnsResolver(cluster, context), + socket_factory_context, std::move(stats_scope), context.addedViaApi()); +} + +REGISTER_FACTORY(RedisClusterFactory, Upstream::ClusterFactory); + +} // namespace Redis +} // namespace Clusters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h new file mode 100644 index 0000000000000..9c432298acda9 --- /dev/null +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -0,0 +1,285 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "envoy/api/api.h" +#include "envoy/api/v2/cds.pb.h" +#include "envoy/api/v2/core/base.pb.h" +#include "envoy/api/v2/endpoint/endpoint.pb.h" +#include "envoy/config/cluster/redis/redis_cluster.pb.h" +#include "envoy/config/cluster/redis/redis_cluster.pb.validate.h" +#include "envoy/config/typed_metadata.h" +#include "envoy/event/dispatcher.h" +#include "envoy/event/timer.h" +#include "envoy/http/codec.h" +#include "envoy/local_info/local_info.h" +#include "envoy/network/dns.h" +#include "envoy/runtime/runtime.h" +#include "envoy/secret/secret_manager.h" +#include "envoy/server/transport_socket_config.h" +#include "envoy/ssl/context_manager.h" +#include "envoy/stats/scope.h" +#include "envoy/thread_local/thread_local.h" +#include "envoy/upstream/cluster_manager.h" +#include "envoy/upstream/health_checker.h" +#include "envoy/upstream/load_balancer.h" +#include "envoy/upstream/locality.h" +#include "envoy/upstream/upstream.h" + +#include "common/common/callback_impl.h" +#include "common/common/enum_to_int.h" +#include "common/common/logger.h" +#include "common/config/metadata.h" +#include "common/config/well_known_names.h" +#include "common/network/address_impl.h" +#include "common/network/utility.h" +#include "common/stats/isolated_store_impl.h" +#include "common/upstream/cluster_factory_impl.h" +#include "common/upstream/load_balancer_impl.h" +#include "common/upstream/outlier_detection_impl.h" +#include "common/upstream/resource_manager_impl.h" +#include "common/upstream/upstream_impl.h" + +#include "server/transport_socket_config_impl.h" + +#include "extensions/clusters/well_known_names.h" +#include "extensions/filters/network/common/redis/client.h" +#include "extensions/filters/network/common/redis/client_impl.h" +#include "extensions/filters/network/common/redis/codec.h" + +namespace Envoy { +namespace Extensions { +namespace Clusters { +namespace Redis { + +/* + * This class implements support for the topology part of `Redis Cluster + * `_. Specifically, it allows Envoy to maintain an internal + * representation of the topology of a Redis Cluster, and how often the topology should be + * refreshed. + * + * The target Redis Cluster is obtained from the yaml config file as usual, and we choose a random + * discovery address from DNS if there are no existing hosts (our startup condition). Otherwise, we + * choose a random host from our known set of hosts. Then, against this host we make a topology + * request. + * + * Topology requests are handled by RedisDiscoverySession, which handles the initialization of + * the `CLUSTER SLOTS command `_, and the responses and + * failure cases. + * + * The topology is stored in cluster_slots_map_. According to the + * `Redis Cluster Spec SlotArray; + +class RedisCluster : public Upstream::BaseDynamicClusterImpl { +public: + RedisCluster(const envoy::api::v2::Cluster& cluster, + const envoy::config::cluster::redis::RedisClusterConfig& redisCluster, + NetworkFilters::Common::Redis::Client::ClientFactory& client_factory, + Upstream::ClusterManager& clusterManager, Runtime::Loader& runtime, + Network::DnsResolverSharedPtr dns_resolver, + Server::Configuration::TransportSocketFactoryContext& factory_context, + Stats::ScopePtr&& stats_scope, bool added_via_api); + + struct ClusterSlotsRequest : public Extensions::NetworkFilters::Common::Redis::RespValue { + public: + ClusterSlotsRequest() : Extensions::NetworkFilters::Common::Redis::RespValue() { + type(Extensions::NetworkFilters::Common::Redis::RespType::Array); + std::vector values(2); + values[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + values[0].asString() = "CLUSTER"; + values[1].type(NetworkFilters::Common::Redis::RespType::BulkString); + values[1].asString() = "SLOTS"; + asArray().swap(values); + } + static ClusterSlotsRequest instance_; + }; + + InitializePhase initializePhase() const override { return InitializePhase::Primary; } + +private: + friend class RedisClusterTest; + + void startPreInit() override; + + void updateAllHosts(const Upstream::HostVector& hosts_added, + const Upstream::HostVector& hosts_removed, uint32_t priority); + + struct ClusterSlot { + ClusterSlot(int64_t start, int64_t end, Network::Address::InstanceConstSharedPtr master) + : start_(start), end_(end), master_(std::move(master)) {} + + int64_t start_; + int64_t end_; + Network::Address::InstanceConstSharedPtr master_; + }; + + void onClusterSlotUpdate(const std::vector&); + + const envoy::api::v2::endpoint::LocalityLbEndpoints& localityLbEndpoint() const { + // Always use the first endpoint. + return load_assignment_.endpoints()[0]; + } + + const envoy::api::v2::endpoint::LbEndpoint& lbEndpoint() const { + // Always use the first endpoint. + return localityLbEndpoint().lb_endpoints()[0]; + } + + // A redis node in the Redis cluster. + class RedisHost : public Upstream::HostImpl { + public: + RedisHost(Upstream::ClusterInfoConstSharedPtr cluster, const std::string& hostname, + Network::Address::InstanceConstSharedPtr address, RedisCluster& parent, bool master) + : Upstream::HostImpl(cluster, hostname, address, parent.lbEndpoint().metadata(), + parent.lbEndpoint().load_balancing_weight().value(), + parent.localityLbEndpoint().locality(), + parent.lbEndpoint().endpoint().health_check_config(), + parent.localityLbEndpoint().priority(), + parent.lbEndpoint().health_status()), + master_(master) {} + + bool isMaster() const { return master_; } + + private: + const bool master_; + }; + + // Resolves the discovery endpoint. + struct DnsDiscoveryResolveTarget { + DnsDiscoveryResolveTarget( + RedisCluster& parent, const std::string& dns_address, const uint32_t port, + const envoy::api::v2::endpoint::LocalityLbEndpoints& locality_lb_endpoint, + const envoy::api::v2::endpoint::LbEndpoint& lb_endpoint); + + ~DnsDiscoveryResolveTarget(); + + void startResolve(); + + RedisCluster& parent_; + Network::ActiveDnsQuery* active_query_{}; + const std::string dns_address_; + const uint32_t port_; + const envoy::api::v2::endpoint::LocalityLbEndpoints locality_lb_endpoint_; + const envoy::api::v2::endpoint::LbEndpoint lb_endpoint_; + }; + + typedef std::unique_ptr DnsDiscoveryResolveTargetPtr; + + struct RedisDiscoverySession; + + struct RedisDiscoveryClient : public Network::ConnectionCallbacks { + RedisDiscoveryClient(RedisDiscoverySession& parent) : parent_(parent) {} + + // Network::ConnectionCallbacks + void onEvent(Network::ConnectionEvent event) override; + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} + + RedisDiscoverySession& parent_; + std::string host_; + Extensions::NetworkFilters::Common::Redis::Client::ClientPtr client_; + }; + + typedef std::unique_ptr RedisDiscoveryClientPtr; + + struct RedisDiscoverySession + : public Extensions::NetworkFilters::Common::Redis::Client::Config, + public Extensions::NetworkFilters::Common::Redis::Client::PoolCallbacks { + RedisDiscoverySession(RedisCluster& parent, + NetworkFilters::Common::Redis::Client::ClientFactory& client_factory); + + ~RedisDiscoverySession(); + + void registerDiscoveryAddress( + const std::list& address_list, + const uint32_t port); + + // Start discovery against a random host from existing hosts + void startResolve(); + + // Extensions::NetworkFilters::Common::Redis::Client::Config + bool disableOutlierEvents() const override { return true; } + std::chrono::milliseconds opTimeout() const override { + // Allow the main Health Check infra to control timeout. + return parent_.cluster_refresh_timeout_; + } + bool enableHashtagging() const override { return false; } + bool enableRedirection() const override { return false; } + uint32_t maxBufferSizeBeforeFlush() const override { return 0; } + std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_timeout_; } + + // Extensions::NetworkFilters::Common::Redis::Client::PoolCallbacks + void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override; + void onFailure() override; + // Note: Below callback isn't used in topology updates + bool onRedirection(const NetworkFilters::Common::Redis::RespValue&) override { return true; } + void onUnexpectedResponse(const NetworkFilters::Common::Redis::RespValuePtr&); + + RedisCluster& parent_; + Event::Dispatcher& dispatcher_; + std::string current_host_address_; + Extensions::NetworkFilters::Common::Redis::Client::PoolRequest* current_request_{}; + std::unordered_map client_map_; + + std::list discovery_address_list_; + + Event::TimerPtr resolve_timer_; + NetworkFilters::Common::Redis::Client::ClientFactory& client_factory_; + const std::chrono::milliseconds buffer_timeout_; + }; + + Upstream::ClusterManager& cluster_manager_; + const std::chrono::milliseconds cluster_refresh_rate_; + const std::chrono::milliseconds cluster_refresh_timeout_; + std::list dns_discovery_resolve_targets_; + Event::Dispatcher& dispatcher_; + Network::DnsResolverSharedPtr dns_resolver_; + Network::DnsLookupFamily dns_lookup_family_; + const envoy::api::v2::ClusterLoadAssignment load_assignment_; + const LocalInfo::LocalInfo& local_info_; + Runtime::RandomGenerator& random_; + RedisDiscoverySession redis_discovery_session_; + // The slot to master node map. + SlotArray cluster_slots_map_; + + Upstream::HostVector hosts_; + Upstream::HostMap all_hosts_; +}; + +class RedisClusterFactory : public Upstream::ConfigurableClusterFactoryBase< + envoy::config::cluster::redis::RedisClusterConfig> { +public: + RedisClusterFactory() + : ConfigurableClusterFactoryBase(Extensions::Clusters::ClusterTypes::get().Redis) {} + +private: + friend class RedisClusterTest; + + Upstream::ClusterImplBaseSharedPtr createClusterWithConfig( + const envoy::api::v2::Cluster& cluster, + const envoy::config::cluster::redis::RedisClusterConfig& proto_config, + Upstream::ClusterFactoryContext& context, + Server::Configuration::TransportSocketFactoryContext& socket_factory_context, + Stats::ScopePtr&& stats_scope) override; +}; +} // namespace Redis +} // namespace Clusters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/clusters/well_known_names.h b/source/extensions/clusters/well_known_names.h index 820c612280680..b074bdd644bc9 100644 --- a/source/extensions/clusters/well_known_names.h +++ b/source/extensions/clusters/well_known_names.h @@ -28,6 +28,9 @@ class ClusterTypeValues { // Original destination (dynamic cluster that automatically adds hosts as needed based on the // original destination address of the downstream connection). const std::string OriginalDst = "envoy.cluster.original_dst"; + + // Redis cluster (cluster that reads host information using the redis cluster protocol). + const std::string Redis = "envoy.clusters.redis"; }; using ClusterTypes = ConstSingleton; diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index 5b08a48e1f091..8b8297dec180e 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -7,11 +7,16 @@ EXTENSIONS = { "envoy.access_loggers.file": "//source/extensions/access_loggers/file:config", "envoy.access_loggers.http_grpc": "//source/extensions/access_loggers/http_grpc:config", + # + # Clusters + # + "envoy.clusters.redis": "//source/extensions/clusters/redis:redis_cluster", + # # gRPC Credentials Plugins # - "envoy.grpc_credentials.file_based_metadata": "//source/extensions/grpc_credentials/file_based_metadata:config", + "envoy.grpc_credentials.file_based_metadata": "//source/extensions/grpc_credentials/file_based_metadata:config", # # Health checkers diff --git a/test/extensions/clusters/redis/BUILD b/test/extensions/clusters/redis/BUILD new file mode 100644 index 0000000000000..f4a802908601a --- /dev/null +++ b/test/extensions/clusters/redis/BUILD @@ -0,0 +1,52 @@ +licenses(["notice"]) # Apache 2 + +load( + "//bazel:envoy_build_system.bzl", + "envoy_package", +) +load( + "//test/extensions:extensions_build_system.bzl", + "envoy_extension_cc_test", +) + +envoy_package() + +envoy_extension_cc_test( + name = "redis_cluster_test", + srcs = ["redis_cluster_test.cc"], + extension_name = "envoy.clusters.redis", + deps = [ + "//source/common/event:dispatcher_lib", + "//source/common/network:utility_lib", + "//source/common/upstream:upstream_lib", + "//source/extensions/clusters/redis:redis_cluster", + "//source/extensions/transport_sockets/raw_buffer:config", + "//source/server:transport_socket_config_lib", + "//test/common/upstream:utility_lib", + "//test/extensions/filters/network/common/redis:redis_mocks", + "//test/extensions/filters/network/common/redis:test_utils_lib", + "//test/extensions/filters/network/redis_proxy:redis_mocks", + "//test/mocks:common_lib", + "//test/mocks/local_info:local_info_mocks", + "//test/mocks/network:network_mocks", + "//test/mocks/runtime:runtime_mocks", + "//test/mocks/server:server_mocks", + "//test/mocks/ssl:ssl_mocks", + "//test/mocks/thread_local:thread_local_mocks", + "//test/mocks/upstream:upstream_mocks", + "//test/test_common:utility_lib", + "@envoy_api//envoy/config/cluster/redis:redis_cluster_cc", + ], +) + +envoy_extension_cc_test( + name = "redis_cluster_integration_test", + size = "small", + srcs = ["redis_cluster_integration_test.cc"], + extension_name = "envoy.clusters.redis", + deps = [ + "//source/extensions/clusters/redis:redis_cluster", + "//source/extensions/filters/network/redis_proxy:config", + "//test/integration:integration_lib", + ], +) diff --git a/test/extensions/clusters/redis/redis_cluster_integration_test.cc b/test/extensions/clusters/redis/redis_cluster_integration_test.cc new file mode 100644 index 0000000000000..28d0a67fa0236 --- /dev/null +++ b/test/extensions/clusters/redis/redis_cluster_integration_test.cc @@ -0,0 +1,263 @@ +#include +#include + +#include "extensions/filters/network/redis_proxy/command_splitter_impl.h" + +#include "test/integration/integration.h" + +using testing::Return; + +namespace Envoy { +namespace { + +// This is a basic redis_proxy configuration with a single host +// in the cluster. The load balancing policy must be set +// to random for proper test operation. + +const std::string CONFIG = R"EOF( +admin: + access_log_path: /dev/null + address: + socket_address: + address: 127.0.0.1 + port_value: 0 +static_resources: + clusters: + - name: cluster_0 + lb_policy: RANDOM + hosts: + - socket_address: + address: 127.0.0.1 + port_value: 0 + cluster_type: + name: envoy.clusters.redis + typed_config: + "@type": type.googleapis.com/google.protobuf.Struct + value: + cluster_refresh_rate: 1s + cluster_refresh_timeout: 4s + listeners: + name: listener_0 + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + filter_chains: + filters: + name: envoy.redis_proxy + config: + stat_prefix: redis_stats + cluster: cluster_0 + settings: + op_timeout: 5s +)EOF"; + +// This function encodes commands as an array of bulkstrings as transmitted by Redis clients to +// Redis servers, according to the Redis protocol. +std::string makeBulkStringArray(std::vector&& command_strings) { + std::stringstream result; + + result << "*" << command_strings.size() << "\r\n"; + for (uint64_t i = 0; i < command_strings.size(); i++) { + result << "$" << command_strings[i].size() << "\r\n"; + result << command_strings[i] << "\r\n"; + } + + return result.str(); +} + +class RedisClusterIntegrationTest : public testing::TestWithParam, + public BaseIntegrationTest { +public: + RedisClusterIntegrationTest(const std::string& config = CONFIG, int num_upstreams = 2) + : BaseIntegrationTest(GetParam(), config), num_upstreams_(num_upstreams), + version_(GetParam()) {} + + void TearDown() override { + test_server_.reset(); + fake_upstreams_.clear(); + } + + void initialize() override { + setUpstreamCount(num_upstreams_); + setDeterministic(); + config_helper_.renameListener("redis_proxy"); + + // Change the port for each of the discovery host in cluster_0. + config_helper_.addConfigModifier([this](envoy::config::bootstrap::v2::Bootstrap& bootstrap) { + uint32_t upstream_idx = 0; + auto* cluster_0 = bootstrap.mutable_static_resources()->mutable_clusters(0); + + for (int j = 0; j < cluster_0->hosts_size(); ++j) { + if (cluster_0->mutable_hosts(j)->has_socket_address()) { + auto* host_socket_addr = cluster_0->mutable_hosts(j)->mutable_socket_address(); + RELEASE_ASSERT(fake_upstreams_.size() > upstream_idx, ""); + host_socket_addr->set_address( + fake_upstreams_[upstream_idx]->localAddress()->ip()->addressAsString()); + host_socket_addr->set_port_value( + fake_upstreams_[upstream_idx++]->localAddress()->ip()->port()); + } + } + }); + + BaseIntegrationTest::initialize(); + + mock_rng_ = dynamic_cast(&test_server_->server().random()); + // Abort now if we cannot downcast the server's random number generator pointer. + ASSERT_TRUE(mock_rng_ != nullptr); + // Ensure that fake_upstreams_[0] is the load balancer's host of choice by default. + ON_CALL(*mock_rng_, random()).WillByDefault(Return(random_index_)); + } + +protected: + /** + * Simple bi-directional test between a fake Redis client and Redis server. + * @param request supplies Redis client data to transmit to the Redis server. + * @param response supplies Redis server data to transmit to the client. + */ + void simpleRequestAndResponse(const int stream_index, const std::string& request, + const std::string& response) { + std::string proxy_to_server; + IntegrationTcpClientPtr redis_client = makeTcpConnection(lookupPort("redis_proxy")); + redis_client->write(request); + + FakeRawConnectionPtr fake_upstream_connection; + EXPECT_TRUE(fake_upstreams_[stream_index]->waitForRawConnection(fake_upstream_connection)); + EXPECT_TRUE(fake_upstream_connection->waitForData(request.size(), &proxy_to_server)); + // The original request should be the same as the data received by the server. + EXPECT_EQ(request, proxy_to_server); + + EXPECT_TRUE(fake_upstream_connection->write(response)); + redis_client->waitForData(response); + // The original response should be received by the fake Redis client. + EXPECT_EQ(response, redis_client->data()); + + redis_client->close(); + EXPECT_TRUE(fake_upstream_connection->close()); + } + + void expectCallClusterSlot(int stream_index, std::string& response) { + std::string cluster_slot_request = makeBulkStringArray({"CLUSTER", "SLOTS"}); + + fake_upstreams_[stream_index]->set_allow_unexpected_disconnects(true); + + std::string proxied_cluster_slot_request; + + FakeRawConnectionPtr fake_upstream_connection_; + EXPECT_TRUE(fake_upstreams_[stream_index]->waitForRawConnection(fake_upstream_connection_)); + EXPECT_TRUE(fake_upstream_connection_->waitForData(cluster_slot_request.size(), + &proxied_cluster_slot_request)); + + EXPECT_EQ(cluster_slot_request, proxied_cluster_slot_request); + + EXPECT_TRUE(fake_upstream_connection_->write(response)); + EXPECT_TRUE(fake_upstream_connection_->close()); + } + + /** + * Simple response for a single slot redis cluster with a master and slave. + * @param master the ip of the master node. + * @param slave the ip of the slave node. + * @return The cluster slot response. + */ + std::string singleSlotMasterSlave(const Network::Address::Ip* master, + const Network::Address::Ip* slave) { + int64_t start_slot = 0; + int64_t end_slot = 16383; + + std::stringstream resp; + resp << "*1\r\n" + << "*4\r\n" + << ":" << start_slot << "\r\n" + << ":" << end_slot << "\r\n" + << makeIp(master->addressAsString(), master->port()) + << makeIp(slave->addressAsString(), slave->port()); + + return resp.str(); + } + + /** + * Simple response for 2 slot redis cluster with 2 nodes. + * @param slot1 the ip of the master node of slot1. + * @param slot2 the ip of the master node of slot2. + * @return The cluster slot response. + */ + std::string twoSlots(const Network::Address::Ip* slot1, const Network::Address::Ip* slot2) { + int64_t start_slot1 = 0; + int64_t end_slot1 = 10000; + int64_t start_slot2 = 10000; + int64_t end_slot2 = 16383; + + std::stringstream resp; + resp << "*2\r\n" + << "*3\r\n" + << ":" << start_slot1 << "\r\n" + << ":" << end_slot1 << "\r\n" + << makeIp(slot1->addressAsString(), slot1->port()) << "*3\r\n" + << ":" << start_slot2 << "\r\n" + << ":" << end_slot2 << "\r\n" + << makeIp(slot2->addressAsString(), slot2->port()); + return resp.str(); + } + + std::string makeIp(const std::string& address, uint32_t port) { + return fmt::format("*2\r\n${0}\r\n{1}\r\n:{2}\r\n", address.size(), address, port); + } + + Runtime::MockRandomGenerator* mock_rng_{}; + const int num_upstreams_; + const Network::Address::IpVersion version_; + int random_index_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, RedisClusterIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +// This test sends a simple "get foo" command from a fake +// downstream client through the proxy to a fake upstream +// Redis cluster with a single slot with master and slave. +// The fake server sends a valid response back to the client. +// The request and response should make it through the envoy +// proxy server code unchanged. + +TEST_P(RedisClusterIntegrationTest, SingleSlotMasterSlave) { + random_index_ = 0; + + on_server_init_function_ = [this]() { + std::string cluster_slot_response = singleSlotMasterSlave( + fake_upstreams_[0]->localAddress()->ip(), fake_upstreams_[1]->localAddress()->ip()); + expectCallClusterSlot(0, cluster_slot_response); + }; + + initialize(); + + simpleRequestAndResponse(random_index_, makeBulkStringArray({"get", "foo"}), "$3\r\nbar\r\n"); +} + +// This test sends a simple "get foo" command from a fake +// downstream client through the proxy to a fake upstream +// Redis cluster with 2 slots. The fake server sends a valid response +// back to the client. The request and response should +// make it through the envoy proxy server code unchanged. + +TEST_P(RedisClusterIntegrationTest, TwoSlot) { + random_index_ = 0; + + on_server_init_function_ = [this]() { + std::string cluster_slot_response = twoSlots(fake_upstreams_[0]->localAddress()->ip(), + fake_upstreams_[1]->localAddress()->ip()); + expectCallClusterSlot(0, cluster_slot_response); + }; + + initialize(); + + simpleRequestAndResponse(random_index_, makeBulkStringArray({"get", "foo"}), "$3\r\nbar\r\n"); + + // change the load balancer index and hit slot 2 master + ON_CALL(*mock_rng_, random()).WillByDefault(Return(1)); + simpleRequestAndResponse(1, makeBulkStringArray({"get", "foo"}), "$3\r\nbar\r\n"); +} + +} // namespace +} // namespace Envoy diff --git a/test/extensions/clusters/redis/redis_cluster_test.cc b/test/extensions/clusters/redis/redis_cluster_test.cc new file mode 100644 index 0000000000000..ce4da493a5f8c --- /dev/null +++ b/test/extensions/clusters/redis/redis_cluster_test.cc @@ -0,0 +1,681 @@ +#include +#include +#include +#include + +#include "envoy/stats/scope.h" + +#include "common/network/utility.h" +#include "common/singleton/manager_impl.h" +#include "common/upstream/logical_dns_cluster.h" + +#include "source/extensions/clusters/redis/redis_cluster.h" + +#include "test/common/upstream/utility.h" +#include "test/extensions/filters/network/common/redis/mocks.h" +#include "test/mocks/common.h" +#include "test/mocks/local_info/mocks.h" +#include "test/mocks/network/mocks.h" +#include "test/mocks/server/mocks.h" +#include "test/mocks/ssl/mocks.h" + +using testing::_; +using testing::ContainerEq; +using testing::DoAll; +using testing::Eq; +using testing::InvokeWithoutArgs; +using testing::NiceMock; +using testing::Ref; +using testing::Return; +using testing::ReturnRef; +using testing::SaveArg; +using testing::WithArg; + +namespace Envoy { +namespace Extensions { +namespace Clusters { +namespace Redis { + +class RedisClusterTest : public testing::Test, + public Extensions::NetworkFilters::Common::Redis::Client::ClientFactory { +public: + // ClientFactory + Extensions::NetworkFilters::Common::Redis::Client::ClientPtr + create(Upstream::HostConstSharedPtr host, Event::Dispatcher&, + const Extensions::NetworkFilters::Common::Redis::Client::Config&) override { + EXPECT_EQ(22120, host->address()->ip()->port()); + return Extensions::NetworkFilters::Common::Redis::Client::ClientPtr{ + create_(host->address()->asString())}; + } + + MOCK_METHOD1(create_, Extensions::NetworkFilters::Common::Redis::Client::Client*(std::string)); + +protected: + RedisClusterTest() : api_(Api::createApiForTest(stats_store_)) {} + + std::list hostListToAddresses(const Upstream::HostVector& hosts) { + std::list addresses; + for (const Upstream::HostSharedPtr& host : hosts) { + addresses.push_back(host->address()->asString()); + } + + return addresses; + } + + void setupFromV2Yaml(const std::string& yaml) { + expectRedisSessionCreated(); + NiceMock cm; + envoy::api::v2::Cluster cluster_config = Upstream::parseClusterFromV2Yaml(yaml); + Envoy::Stats::ScopePtr scope = stats_store_.createScope(fmt::format( + "cluster.{}.", cluster_config.alt_stat_name().empty() ? cluster_config.name() + : cluster_config.alt_stat_name())); + Envoy::Server::Configuration::TransportSocketFactoryContextImpl factory_context( + admin_, ssl_context_manager_, *scope, cm, local_info_, dispatcher_, random_, stats_store_, + singleton_manager_, tls_, *api_); + + envoy::config::cluster::redis::RedisClusterConfig config; + Config::Utility::translateOpaqueConfig(cluster_config.cluster_type().typed_config(), + ProtobufWkt::Struct::default_instance(), config); + + cluster_.reset(new RedisCluster( + cluster_config, + MessageUtil::downcastAndValidate( + config), + *this, cm, runtime_, dns_resolver_, factory_context, std::move(scope), false)); + // This allows us to create expectation on cluster slot response without waiting for + // makeRequest. + pool_callbacks_ = &cluster_->redis_discovery_session_; + cluster_->prioritySet().addPriorityUpdateCb( + [&](uint32_t, const Upstream::HostVector&, const Upstream::HostVector&) -> void { + membership_updated_.ready(); + }); + } + + void setupFactoryFromV2Yaml(const std::string& yaml) { + NiceMock cm; + envoy::api::v2::Cluster cluster_config = Upstream::parseClusterFromV2Yaml(yaml); + Envoy::Stats::ScopePtr scope = stats_store_.createScope(fmt::format( + "cluster.{}.", cluster_config.alt_stat_name().empty() ? cluster_config.name() + : cluster_config.alt_stat_name())); + Envoy::Server::Configuration::TransportSocketFactoryContextImpl factory_context( + admin_, ssl_context_manager_, *scope, cm, local_info_, dispatcher_, random_, stats_store_, + singleton_manager_, tls_, *api_); + + envoy::config::cluster::redis::RedisClusterConfig config; + Config::Utility::translateOpaqueConfig(cluster_config.cluster_type().typed_config(), + ProtobufWkt::Struct::default_instance(), config); + + NiceMock log_manager; + NiceMock outlier_event_logger; + NiceMock api; + Upstream::ClusterFactoryContextImpl cluster_factory_context( + cm, stats_store_, tls_, std::move(dns_resolver_), ssl_context_manager_, runtime_, random_, + dispatcher_, log_manager, local_info_, admin_, singleton_manager_, + std::move(outlier_event_logger), false, api); + + RedisClusterFactory factory = RedisClusterFactory(); + factory.createClusterWithConfig(cluster_config, config, cluster_factory_context, + factory_context, std::move(scope)); + } + + void expectResolveDiscovery(Network::DnsLookupFamily dns_lookup_family, + const std::string& expected_address, + const std::list& resolved_addresses) { + EXPECT_CALL(*dns_resolver_, resolve(expected_address, dns_lookup_family, _)) + .WillOnce(Invoke([&](const std::string&, Network::DnsLookupFamily, + Network::DnsResolver::ResolveCb cb) -> Network::ActiveDnsQuery* { + cb(TestUtility::makeDnsResponse(resolved_addresses)); + return nullptr; + })); + } + + void expectRedisSessionCreated() { + resolve_timer_ = new Event::MockTimer(&dispatcher_); + ON_CALL(random_, random()).WillByDefault(Return(0)); + } + + void expectRedisResolve(bool createClient = false) { + if (createClient) { + client_ = new Extensions::NetworkFilters::Common::Redis::Client::MockClient(); + EXPECT_CALL(*this, create_(_)).WillOnce(Return(client_)); + EXPECT_CALL(*client_, addConnectionCallbacks(_)); + EXPECT_CALL(*client_, close()); + } + EXPECT_CALL(*client_, makeRequest(Ref(RedisCluster::ClusterSlotsRequest::instance_), _)) + .WillOnce(Return(&pool_request_)); + } + + void expectClusterSlotResponse(NetworkFilters::Common::Redis::RespValuePtr&& response) { + EXPECT_CALL(*resolve_timer_, enableTimer(_)); + pool_callbacks_->onResponse(std::move(response)); + } + + void expectClusterSlotFailure() { + EXPECT_CALL(*resolve_timer_, enableTimer(_)); + pool_callbacks_->onFailure(); + } + + NetworkFilters::Common::Redis::RespValuePtr + singleSlotMasterSlave(const std::string& master, const std::string& slave, int64_t port) const { + std::vector master_1(2); + master_1[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + master_1[0].asString() = master; + master_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + master_1[1].asInteger() = port; + + std::vector slave_1(2); + slave_1[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + slave_1[0].asString() = slave; + slave_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + slave_1[1].asInteger() = port; + + std::vector slot_1(4); + slot_1[0].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_1[0].asInteger() = 0; + slot_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_1[1].asInteger() = 16383; + slot_1[2].type(NetworkFilters::Common::Redis::RespType::Array); + slot_1[2].asArray().swap(master_1); + slot_1[3].type(NetworkFilters::Common::Redis::RespType::Array); + slot_1[3].asArray().swap(slave_1); + + std::vector slots(1); + slots[0].type(NetworkFilters::Common::Redis::RespType::Array); + slots[0].asArray().swap(slot_1); + + NetworkFilters::Common::Redis::RespValuePtr response( + new NetworkFilters::Common::Redis::RespValue()); + response->type(NetworkFilters::Common::Redis::RespType::Array); + response->asArray().swap(slots); + return response; + } + + NetworkFilters::Common::Redis::RespValuePtr twoSlotsMasters() const { + std::vector master_1(2); + master_1[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + master_1[0].asString() = "127.0.0.1"; + master_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + master_1[1].asInteger() = 22120; + + std::vector master_2(2); + master_2[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + master_2[0].asString() = "127.0.0.2"; + master_2[1].type(NetworkFilters::Common::Redis::RespType::Integer); + master_2[1].asInteger() = 22120; + + std::vector slot_1(3); + slot_1[0].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_1[0].asInteger() = 0; + slot_1[1].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_1[1].asInteger() = 9999; + slot_1[2].type(NetworkFilters::Common::Redis::RespType::Array); + slot_1[2].asArray().swap(master_1); + + std::vector slot_2(3); + slot_2[0].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_2[0].asInteger() = 10000; + slot_2[1].type(NetworkFilters::Common::Redis::RespType::Integer); + slot_2[1].asInteger() = 16383; + slot_2[2].type(NetworkFilters::Common::Redis::RespType::Array); + slot_2[2].asArray().swap(master_2); + + std::vector slots(2); + slots[0].type(NetworkFilters::Common::Redis::RespType::Array); + slots[0].asArray().swap(slot_1); + slots[1].type(NetworkFilters::Common::Redis::RespType::Array); + slots[1].asArray().swap(slot_2); + + NetworkFilters::Common::Redis::RespValuePtr response( + new NetworkFilters::Common::Redis::RespValue()); + response->type(NetworkFilters::Common::Redis::RespType::Array); + response->asArray().swap(slots); + return response; + } + + NetworkFilters::Common::Redis::RespValue + createStringField(bool is_correct_type, const std::string& correct_value) const { + NetworkFilters::Common::Redis::RespValue respValue; + if (is_correct_type) { + respValue.type(NetworkFilters::Common::Redis::RespType::BulkString); + respValue.asString() = correct_value; + } else { + respValue.type(NetworkFilters::Common::Redis::RespType::Integer); + respValue.asInteger() = 10; + } + return respValue; + } + + NetworkFilters::Common::Redis::RespValue createIntegerField(bool is_correct_type, + int64_t correct_value) const { + NetworkFilters::Common::Redis::RespValue respValue; + if (is_correct_type) { + respValue.type(NetworkFilters::Common::Redis::RespType::Integer); + respValue.asInteger() = correct_value; + } else { + respValue.type(NetworkFilters::Common::Redis::RespType::BulkString); + respValue.asString() = "bad_value"; + } + return respValue; + } + + NetworkFilters::Common::Redis::RespValue + createArrayField(bool is_correct_type, + std::vector& correct_value) const { + NetworkFilters::Common::Redis::RespValue respValue; + if (is_correct_type) { + respValue.type(NetworkFilters::Common::Redis::RespType::Array); + respValue.asArray().swap(correct_value); + } else { + respValue.type(NetworkFilters::Common::Redis::RespType::BulkString); + respValue.asString() = "bad value"; + } + return respValue; + } + + // Create a redis cluster slot response. If a bit is set in the bitset, then that part of + // of the response is correct, otherwise it's incorrect. + NetworkFilters::Common::Redis::RespValuePtr createResponse(std::bitset<10> flags) const { + int64_t idx(0); + int64_t slots_type = idx++; + int64_t slots_size = idx++; + int64_t slot1_type = idx++; + int64_t slot1_size = idx++; + int64_t slot1_range_start_type = idx++; + int64_t slot1_range_end_type = idx++; + int64_t master_type = idx++; + int64_t master_size = idx++; + int64_t master_ip_type = idx++; + int64_t master_port_type = idx++; + + std::vector master_1_array; + if (flags.test(master_size)) { + // Ip field. + master_1_array.push_back(createStringField(flags.test(master_ip_type), "127.0.0.1")); + // Port field. + master_1_array.push_back(createIntegerField(flags.test(master_port_type), 22120)); + } + + std::vector slot_1_array; + if (flags.test(slot1_size)) { + slot_1_array.push_back(createIntegerField(flags.test(slot1_range_start_type), 0)); + slot_1_array.push_back(createIntegerField(flags.test(slot1_range_end_type), 16383)); + slot_1_array.push_back(createArrayField(flags.test(master_type), master_1_array)); + } + + std::vector slots_array; + if (flags.test(slots_size)) { + slots_array.push_back(createArrayField(flags.test(slot1_type), slot_1_array)); + } + + NetworkFilters::Common::Redis::RespValuePtr response{ + new NetworkFilters::Common::Redis::RespValue()}; + if (flags.test(slots_type)) { + response->type(NetworkFilters::Common::Redis::RespType::Array); + response->asArray().swap(slots_array); + } else { + response->type(NetworkFilters::Common::Redis::RespType::BulkString); + response->asString() = "Pong"; + } + + return response; + } + + void + expectHealthyHosts(const std::list>& healthy_hosts) { + EXPECT_THAT(healthy_hosts, ContainerEq(hostListToAddresses( + cluster_->prioritySet().hostSetsPerPriority()[0]->hosts()))); + EXPECT_THAT(healthy_hosts, + ContainerEq(hostListToAddresses( + cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHosts()))); + EXPECT_EQ(1UL, + cluster_->prioritySet().hostSetsPerPriority()[0]->hostsPerLocality().get().size()); + EXPECT_EQ( + 1UL, + cluster_->prioritySet().hostSetsPerPriority()[0]->healthyHostsPerLocality().get().size()); + } + + void testBasicSetup(const std::string& config, const std::string& expected_discovery_address) { + setupFromV2Yaml(config); + const std::list resolved_addresses{"127.0.0.1", "127.0.0.2"}; + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, expected_discovery_address, + resolved_addresses); + expectRedisResolve(true); + + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(initialized_, ready()); + cluster_->initialize([&]() -> void { initialized_.ready(); }); + + expectClusterSlotResponse(singleSlotMasterSlave("127.0.0.1", "127.0.0.2", 22120)); + // TODO(hyang): this will change once we register slaves as well + expectHealthyHosts(std::list({"127.0.0.1:22120"})); + + // Add new host. + expectRedisResolve(); + EXPECT_CALL(membership_updated_, ready()); + resolve_timer_->callback_(); + expectClusterSlotResponse(twoSlotsMasters()); + expectHealthyHosts(std::list({"127.0.0.1:22120", "127.0.0.2:22120"})); + + // No change. + expectRedisResolve(); + resolve_timer_->callback_(); + expectClusterSlotResponse(twoSlotsMasters()); + expectHealthyHosts(std::list({"127.0.0.1:22120", "127.0.0.2:22120"})); + + // Remove host. + expectRedisResolve(); + EXPECT_CALL(membership_updated_, ready()); + resolve_timer_->callback_(); + expectClusterSlotResponse(singleSlotMasterSlave("127.0.0.1", "127.0.0.2", 22120)); + expectHealthyHosts(std::list({"127.0.0.1:22120"})); + } + + Stats::IsolatedStoreImpl stats_store_; + Ssl::MockContextManager ssl_context_manager_; + std::shared_ptr> dns_resolver_{ + new NiceMock}; + NiceMock random_; + NiceMock tls_; + Event::MockTimer* resolve_timer_; + ReadyWatcher membership_updated_; + ReadyWatcher initialized_; + NiceMock runtime_; + NiceMock dispatcher_; + NiceMock local_info_; + NiceMock admin_; + Singleton::ManagerImpl singleton_manager_{Thread::threadFactoryForTest().currentThreadId()}; + Api::ApiPtr api_; + std::shared_ptr hosts_; + Upstream::MockHealthCheckEventLogger* event_logger_{}; + Event::MockTimer* interval_timer_{}; + Extensions::NetworkFilters::Common::Redis::Client::MockClient* client_{}; + Extensions::NetworkFilters::Common::Redis::Client::MockPoolRequest pool_request_; + Extensions::NetworkFilters::Common::Redis::Client::PoolCallbacks* pool_callbacks_{}; + std::shared_ptr cluster_; +}; + +typedef std::tuple, + std::list> + RedisDnsConfigTuple; +std::vector generateRedisDnsParams() { + std::vector dns_config; + { + std::string family_yaml(""); + Network::DnsLookupFamily family(Network::DnsLookupFamily::Auto); + std::list dns_response{"127.0.0.1", "127.0.0.2"}; + std::list resolved_host{"127.0.0.1:22120"}; + dns_config.push_back(std::make_tuple(family_yaml, family, dns_response, resolved_host)); + } + { + std::string family_yaml(R"EOF(dns_lookup_family: V4_ONLY)EOF"); + Network::DnsLookupFamily family(Network::DnsLookupFamily::V4Only); + std::list dns_response{"127.0.0.1", "127.0.0.2"}; + std::list resolved_host{"127.0.0.1:22120"}; + dns_config.push_back(std::make_tuple(family_yaml, family, dns_response, resolved_host)); + } + { + std::string family_yaml(R"EOF(dns_lookup_family: V6_ONLY)EOF"); + Network::DnsLookupFamily family(Network::DnsLookupFamily::V6Only); + std::list dns_response{"::1", "::2"}; + std::list resolved_host{"[::1]:22120"}; + dns_config.push_back(std::make_tuple(family_yaml, family, dns_response, resolved_host)); + } + { + std::string family_yaml(R"EOF(dns_lookup_family: AUTO)EOF"); + Network::DnsLookupFamily family(Network::DnsLookupFamily::Auto); + std::list dns_response{"::1", "::2"}; + std::list resolved_host{"[::1]:22120"}; + dns_config.push_back(std::make_tuple(family_yaml, family, dns_response, resolved_host)); + } + return dns_config; +} + +class RedisDnsParamTest : public RedisClusterTest, + public testing::WithParamInterface {}; + +INSTANTIATE_TEST_SUITE_P(DnsParam, RedisDnsParamTest, testing::ValuesIn(generateRedisDnsParams())); + +// Validate that if the DNS and CLUSTER SLOT resolve immediately, we have the expected +// host state and initialization callback invocation. + +TEST_P(RedisDnsParamTest, ImmediateResolveDns) { + const std::string config = R"EOF( + name: name + connect_timeout: 0.25s + )EOF" + std::get<0>(GetParam()) + + R"EOF( + hosts: + - socket_address: + address: foo.bar.com + port_value: 22120 + cluster_type: + name: envoy.clusters.redis + typed_config: + "@type": type.googleapis.com/google.protobuf.Struct + value: + cluster_refresh_rate: 4s + cluster_refresh_timeout: 0.25s + )EOF"; + + setupFromV2Yaml(config); + + expectRedisResolve(true); + EXPECT_CALL(*dns_resolver_, resolve("foo.bar.com", std::get<1>(GetParam()), _)) + .WillOnce(Invoke([&](const std::string&, Network::DnsLookupFamily, + Network::DnsResolver::ResolveCb cb) -> Network::ActiveDnsQuery* { + std::list address_pair = std::get<2>(GetParam()); + cb(TestUtility::makeDnsResponse(address_pair)); + expectClusterSlotResponse( + singleSlotMasterSlave(address_pair.front(), address_pair.back(), 22120)); + return nullptr; + })); + + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(initialized_, ready()); + cluster_->initialize([&]() -> void { initialized_.ready(); }); + + expectHealthyHosts(std::get<3>(GetParam())); +} + +TEST_F(RedisClusterTest, Basic) { + const std::string basic_yaml_hosts = R"EOF( + name: name + connect_timeout: 0.25s + dns_lookup_family: V4_ONLY + hosts: + - socket_address: + address: foo.bar.com + port_value: 22120 + cluster_type: + name: envoy.clusters.redis + typed_config: + "@type": type.googleapis.com/google.protobuf.Struct + value: + cluster_refresh_rate: 4s + cluster_refresh_timeout: 0.25s + )EOF"; + + // Using load assignment. + const std::string basic_yaml_load_assignment = R"EOF( + name: name + connect_timeout: 0.25s + dns_lookup_family: V4_ONLY + load_assignment: + cluster_name: name + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: foo.bar.com + port_value: 22120 + health_check_config: + port_value: 8000 + cluster_type: + name: envoy.clusters.redis + typed_config: + "@type": type.googleapis.com/google.protobuf.Struct + value: + cluster_refresh_rate: 4s + cluster_refresh_timeout: 0.25s + )EOF"; + + testBasicSetup(basic_yaml_hosts, "foo.bar.com"); + testBasicSetup(basic_yaml_load_assignment, "foo.bar.com"); +} + +TEST_F(RedisClusterTest, RedisResolveFailure) { + const std::string basic_yaml_hosts = R"EOF( + name: name + connect_timeout: 0.25s + dns_lookup_family: V4_ONLY + hosts: + - socket_address: + address: foo.bar.com + port_value: 22120 + cluster_type: + name: envoy.clusters.redis + typed_config: + "@type": type.googleapis.com/google.protobuf.Struct + value: + cluster_refresh_rate: 4s + cluster_refresh_timeout: 0.25s + )EOF"; + setupFromV2Yaml(basic_yaml_hosts); + const std::list resolved_addresses{"127.0.0.1", "127.0.0.2"}; + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "foo.bar.com", resolved_addresses); + expectRedisResolve(true); + + cluster_->initialize([&]() -> void { initialized_.ready(); }); + + // Initialization will wait til the redis cluster succeed. + expectClusterSlotFailure(); + EXPECT_EQ(1U, cluster_->info()->stats().update_attempt_.value()); + EXPECT_EQ(1U, cluster_->info()->stats().update_failure_.value()); + + expectRedisResolve(true); + resolve_timer_->callback_(); + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(initialized_, ready()); + expectClusterSlotResponse(singleSlotMasterSlave("127.0.0.1", "127.0.0.2", 22120)); + expectHealthyHosts(std::list({"127.0.0.1:22120"})); + + // Expect no change if resolve failed. + expectRedisResolve(); + resolve_timer_->callback_(); + expectClusterSlotFailure(); + expectHealthyHosts(std::list({"127.0.0.1:22120"})); + EXPECT_EQ(3U, cluster_->info()->stats().update_attempt_.value()); + EXPECT_EQ(2U, cluster_->info()->stats().update_failure_.value()); +} + +TEST_F(RedisClusterTest, FactoryInitNotRedisClusterTypeFailure) { + const std::string basic_yaml_hosts = R"EOF( + name: name + connect_timeout: 0.25s + dns_lookup_family: V4_ONLY + hosts: + - socket_address: + address: foo.bar.com + port_value: 22120 + cluster_type: + name: envoy.clusters.memcached + typed_config: + "@type": type.googleapis.com/google.protobuf.Struct + value: + cluster_refresh_rate: 4s + cluster_refresh_timeout: 0.25s + )EOF"; + + EXPECT_THROW_WITH_MESSAGE(setupFactoryFromV2Yaml(basic_yaml_hosts), EnvoyException, + "Redis cluster can only created with redis cluster type"); +} + +TEST_F(RedisClusterTest, FactoryInitRedisClusterTypeSuccess) { + const std::string basic_yaml_hosts = R"EOF( + name: name + connect_timeout: 0.25s + dns_lookup_family: V4_ONLY + hosts: + - socket_address: + address: foo.bar.com + port_value: 22120 + cluster_type: + name: envoy.clusters.redis + typed_config: + "@type": type.googleapis.com/google.protobuf.Struct + value: + cluster_refresh_rate: 4s + cluster_refresh_timeout: 0.25s + )EOF"; + setupFactoryFromV2Yaml(basic_yaml_hosts); +} + +TEST_F(RedisClusterTest, RedisErrorResponse) { + const std::string basic_yaml_hosts = R"EOF( + name: name + connect_timeout: 0.25s + dns_lookup_family: V4_ONLY + hosts: + - socket_address: + address: foo.bar.com + port_value: 22120 + cluster_type: + name: envoy.clusters.redis + typed_config: + "@type": type.googleapis.com/google.protobuf.Struct + value: + cluster_refresh_rate: 4s + cluster_refresh_timeout: 0.25s + )EOF"; + setupFromV2Yaml(basic_yaml_hosts); + const std::list resolved_addresses{"127.0.0.1", "127.0.0.2"}; + expectResolveDiscovery(Network::DnsLookupFamily::V4Only, "foo.bar.com", resolved_addresses); + expectRedisResolve(true); + + cluster_->initialize([&]() -> void { initialized_.ready(); }); + + // Initialization will wait til the redis cluster succeed. + std::vector hello_world(2); + hello_world[0].type(NetworkFilters::Common::Redis::RespType::BulkString); + hello_world[0].asString() = "hello"; + hello_world[1].type(NetworkFilters::Common::Redis::RespType::BulkString); + hello_world[1].asString() = "world"; + + NetworkFilters::Common::Redis::RespValuePtr hello_world_response( + new NetworkFilters::Common::Redis::RespValue()); + hello_world_response->type(NetworkFilters::Common::Redis::RespType::Array); + hello_world_response->asArray().swap(hello_world); + + expectClusterSlotResponse(std::move(hello_world_response)); + EXPECT_EQ(1U, cluster_->info()->stats().update_attempt_.value()); + EXPECT_EQ(1U, cluster_->info()->stats().update_failure_.value()); + + expectRedisResolve(); + resolve_timer_->callback_(); + EXPECT_CALL(membership_updated_, ready()); + EXPECT_CALL(initialized_, ready()); + expectClusterSlotResponse(singleSlotMasterSlave("127.0.0.1", "127.0.0.2", 22120)); + expectHealthyHosts(std::list({"127.0.0.1:22120"})); + + // Expect no change if resolve failed. + uint64_t update_attempt = 2; + uint64_t update_failure = 1; + // Test every combination the cluster slots response. + for (uint64_t i = 0; i < (1 << 10); i++) { + std::bitset<10> flags(i); + expectRedisResolve(); + resolve_timer_->callback_(); + expectClusterSlotResponse(createResponse(flags)); + expectHealthyHosts(std::list({"127.0.0.1:22120"})); + EXPECT_EQ(++update_attempt, cluster_->info()->stats().update_attempt_.value()); + if (!flags.all()) { + EXPECT_EQ(++update_failure, cluster_->info()->stats().update_failure_.value()); + } + } +} + +} // namespace Redis +} // namespace Clusters +} // namespace Extensions +} // namespace Envoy