diff --git a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto index 00e339f6c25c4..e2fdd24522e9d 100644 --- a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto +++ b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto @@ -10,6 +10,7 @@ option go_package = "v2"; import "envoy/api/v2/core/base.proto"; import "google/protobuf/duration.proto"; +import "google/protobuf/wrappers.proto"; import "validate/validate.proto"; import "gogoproto/gogo.proto"; @@ -82,6 +83,13 @@ message RedisProxy { // If `max_buffer_size_before_flush` is set, but `buffer_flush_timeout` is not, the latter // defaults to 3ms. google.protobuf.Duration buffer_flush_timeout = 5 [(gogoproto.stdduration) = true]; + + // `max_upstream_unknown_connections` controls how many upstream connections to unknown hosts + // can be created at any given time by any given worker thread (see `enable_redirection` for + // more details). If the host is unknown and a connection cannot be created due to enforcing + // this limit, then redirection will fail and the original redirection error will be passed + // downstream unchanged. This limit defaults to 100. + google.protobuf.UInt32Value max_upstream_unknown_connections = 6; } // Network settings for the connection pool to the upstream clusters. diff --git a/docs/root/intro/arch_overview/other_protocols/redis.rst b/docs/root/intro/arch_overview/other_protocols/redis.rst index 42c3b288cabe6..22650a4264ade 100644 --- a/docs/root/intro/arch_overview/other_protocols/redis.rst +++ b/docs/root/intro/arch_overview/other_protocols/redis.rst @@ -81,6 +81,15 @@ following information: For topology configuration details, see the Redis Cluster :ref:`v2 API reference `. +Every Redis cluster has its own extra statistics tree rooted at *cluster..redis_cluster.* with the following statistics: + +.. 
csv-table:: + :header: Name, Type, Description + :widths: 1, 1, 2 + + max_upstream_unknown_connections_reached, Counter, Total number of times that an upstream connection to an unknown host is not created after redirection having reached the connection pool's max_upstream_unknown_connections limit + upstream_cx_drained, Counter, Total number of upstream connections drained of active requests before being closed + Supported commands ------------------ diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index f3f0e141e8305..d7b776353671e 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -211,6 +211,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl { bool enableRedirection() const override { return false; } uint32_t maxBufferSizeBeforeFlush() const override { return 0; } std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_timeout_; } + uint32_t maxUpstreamUnknownConnections() const override { return 0; } // Extensions::NetworkFilters::Common::Redis::Client::PoolCallbacks void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override; diff --git a/source/extensions/filters/network/common/redis/BUILD b/source/extensions/filters/network/common/redis/BUILD index f74e283743c69..1757c2a7b9b4c 100644 --- a/source/extensions/filters/network/common/redis/BUILD +++ b/source/extensions/filters/network/common/redis/BUILD @@ -65,6 +65,7 @@ envoy_cc_library( "//source/common/network:filter_lib", "//source/common/protobuf:utility_lib", "//source/common/upstream:load_balancer_lib", + "//source/common/upstream:upstream_lib", "@envoy_api//envoy/config/filter/network/redis_proxy/v2:redis_proxy_cc", ], ) diff --git a/source/extensions/filters/network/common/redis/client.h b/source/extensions/filters/network/common/redis/client.h index e6db95d75c259..4e041ad33552f 100644 --- 
a/source/extensions/filters/network/common/redis/client.h +++ b/source/extensions/filters/network/common/redis/client.h @@ -76,6 +76,12 @@ class Client : public Event::DeferredDeletable { */ virtual void addConnectionCallbacks(Network::ConnectionCallbacks& callbacks) PURE; + /** + * Called to determine if the client has pending requests. + * @return bool true if the client is processing requests or false if it is currently idle. + */ + virtual bool active() PURE; + /** + * Closes the underlying network connection. + */ @@ -134,6 +140,23 @@ class Config { * @return timeout for batching commands for a single upstream host. */ virtual std::chrono::milliseconds bufferFlushTimeoutInMs() const PURE; + + /** + * @return the maximum number of upstream connections to unknown hosts when enableRedirection() is + * true. + * + * This value acts as an upper bound on the number of servers in a cluster if only a subset + * of the cluster's servers are known via configuration (cluster size - number of servers in + * cluster known to cluster manager <= maxUpstreamUnknownConnections() for proper operation). + * Redirection errors are processed if enableRedirection() is true, and a new upstream connection + * to a previously unknown server will be made as a result of redirection if the number of unknown + * server connections is currently less than maxUpstreamUnknownConnections(). If a connection + * cannot be made, then the original redirection error will be passed through unchanged to the + * downstream client. If a cluster is using the Redis cluster protocol (RedisCluster), then the + * cluster logic will periodically discover all of the servers in the cluster; this should + * minimize the need for a large maxUpstreamUnknownConnections() value. 
+ */ + virtual uint32_t maxUpstreamUnknownConnections() const PURE; }; /** diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc index c0fee56b5eb3b..20bc5f02079ab 100644 --- a/source/extensions/filters/network/common/redis/client_impl.cc +++ b/source/extensions/filters/network/common/redis/client_impl.cc @@ -16,9 +16,10 @@ ConfigImpl::ConfigImpl( config.max_buffer_size_before_flush()), // This is a scalar, so default is zero. buffer_flush_timeout_(PROTOBUF_GET_MS_OR_DEFAULT( config, buffer_flush_timeout, - 3)) // Default timeout is 3ms. If max_buffer_size_before_flush is zero, this is not used - // as the buffer is flushed on each request immediately. -{} + 3)), // Default timeout is 3ms. If max_buffer_size_before_flush is zero, this is not used + // as the buffer is flushed on each request immediately. + max_upstream_unknown_connections_( + PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_upstream_unknown_connections, 100)) {} ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder, DecoderFactory& decoder_factory, @@ -124,14 +125,12 @@ void ClientImpl::putOutlierEvent(Upstream::Outlier::Result result) { void ClientImpl::onEvent(Network::ConnectionEvent event) { if (event == Network::ConnectionEvent::RemoteClose || event == Network::ConnectionEvent::LocalClose) { + + Upstream::reportUpstreamCxDestroy(host_, event); if (!pending_requests_.empty()) { - host_->cluster().stats().upstream_cx_destroy_with_active_rq_.inc(); + Upstream::reportUpstreamCxDestroyActiveRequest(host_, event); if (event == Network::ConnectionEvent::RemoteClose) { putOutlierEvent(Upstream::Outlier::Result::LOCAL_ORIGIN_CONNECT_FAILED); - host_->cluster().stats().upstream_cx_destroy_remote_with_active_rq_.inc(); - } - if (event == Network::ConnectionEvent::LocalClose) { - host_->cluster().stats().upstream_cx_destroy_local_with_active_rq_.inc(); } } diff 
--git a/source/extensions/filters/network/common/redis/client_impl.h b/source/extensions/filters/network/common/redis/client_impl.h index 20a7adcd0cc5b..d88fc4c93e236 100644 --- a/source/extensions/filters/network/common/redis/client_impl.h +++ b/source/extensions/filters/network/common/redis/client_impl.h @@ -12,6 +12,7 @@ #include "common/protobuf/utility.h" #include "common/singleton/const_singleton.h" #include "common/upstream/load_balancer_impl.h" +#include "common/upstream/upstream_impl.h" #include "extensions/filters/network/common/redis/client.h" @@ -45,6 +46,9 @@ class ConfigImpl : public Config { std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_flush_timeout_; } + uint32_t maxUpstreamUnknownConnections() const override { + return max_upstream_unknown_connections_; + } private: const std::chrono::milliseconds op_timeout_; @@ -52,6 +56,7 @@ class ConfigImpl : public Config { const bool enable_redirection_; const uint32_t max_buffer_size_before_flush_; const std::chrono::milliseconds buffer_flush_timeout_; + const uint32_t max_upstream_unknown_connections_; }; class ClientImpl : public Client, public DecoderCallbacks, public Network::ConnectionCallbacks { @@ -68,6 +73,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne } void close() override; PoolRequest* makeRequest(const RespValue& request, PoolCallbacks& callbacks) override; + bool active() override { return !pending_requests_.empty(); } void flushBufferAndResetTimer(); private: diff --git a/source/extensions/filters/network/redis_proxy/BUILD b/source/extensions/filters/network/redis_proxy/BUILD index b67729cedfdef..bbc18dff95d25 100644 --- a/source/extensions/filters/network/redis_proxy/BUILD +++ b/source/extensions/filters/network/redis_proxy/BUILD @@ -75,6 +75,7 @@ envoy_cc_library( deps = [ ":config_interface", ":conn_pool_interface", + "//include/envoy/stats:stats_macros", "//include/envoy/thread_local:thread_local_interface", 
"//include/envoy/upstream:cluster_manager_interface", "//source/common/buffer:buffer_lib", diff --git a/source/extensions/filters/network/redis_proxy/config.cc b/source/extensions/filters/network/redis_proxy/config.cc index 0fb6a602e5ff7..5e3e4018260d2 100644 --- a/source/extensions/filters/network/redis_proxy/config.cc +++ b/source/extensions/filters/network/redis_proxy/config.cc @@ -59,11 +59,13 @@ Network::FilterFactoryCb RedisProxyFilterConfigFactory::createFilterFactoryFromP Upstreams upstreams; for (auto& cluster : unique_clusters) { + Stats::ScopePtr stats_scope = + context.scope().createScope(fmt::format("cluster.{}.redis_cluster", cluster)); upstreams.emplace(cluster, std::make_shared( cluster, context.clusterManager(), Common::Redis::Client::ClientFactoryImpl::instance_, context.threadLocal(), proto_config.settings(), context.api(), - context.scope().symbolTable())); + std::move(stats_scope))); } auto router = diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc index c9da9ef188064..3f998b191efb0 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc @@ -6,6 +6,7 @@ #include #include "common/common/assert.h" +#include "common/stats/utility.h" #include "extensions/filters/network/redis_proxy/config.h" #include "extensions/filters/network/well_known_names.h" @@ -15,7 +16,6 @@ namespace Extensions { namespace NetworkFilters { namespace RedisProxy { namespace ConnPool { - namespace { Common::Redis::Client::DoNothingPoolCallbacks null_pool_callbacks; } // namespace @@ -24,9 +24,10 @@ InstanceImpl::InstanceImpl( const std::string& cluster_name, Upstream::ClusterManager& cm, Common::Redis::Client::ClientFactory& client_factory, ThreadLocal::SlotAllocator& tls, const envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings& config, - Api::Api& api, 
Stats::SymbolTable& symbol_table) + Api::Api& api, Stats::ScopePtr&& stats_scope) : cm_(cm), client_factory_(client_factory), tls_(tls.allocateSlot()), config_(config), - api_(api), symbol_table_(symbol_table) { + api_(api), stats_scope_(std::move(stats_scope)), redis_cluster_stats_{REDIS_CLUSTER_STATS( + POOL_COUNTER(*stats_scope_))} { tls_->set([this, cluster_name]( Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr { return std::make_shared(*this, dispatcher, cluster_name); @@ -48,8 +49,9 @@ InstanceImpl::makeRequestToHost(const std::string& host_address, InstanceImpl::ThreadLocalPool::ThreadLocalPool(InstanceImpl& parent, Event::Dispatcher& dispatcher, std::string cluster_name) - : parent_(parent), dispatcher_(dispatcher), cluster_name_(std::move(cluster_name)) { - + : parent_(parent), dispatcher_(dispatcher), cluster_name_(std::move(cluster_name)), + drain_timer_(dispatcher.createTimer([this]() -> void { drainClients(); })), + is_redis_cluster_(false) { cluster_update_handle_ = parent_.cm_.addThreadLocalClusterUpdateCallbacks(*this); Upstream::ThreadLocalCluster* cluster = parent_.cm_.get(cluster_name_); if (cluster != nullptr) { @@ -69,6 +71,9 @@ InstanceImpl::ThreadLocalPool::~ThreadLocalPool() { while (!client_map_.empty()) { client_map_.begin()->second->redis_client_->close(); } + while (!clients_to_drain_.empty()) { + (*clients_to_drain_.begin())->redis_client_->close(); + } } void InstanceImpl::ThreadLocalPool::onClusterAddOrUpdateNonVirtual( @@ -86,8 +91,9 @@ void InstanceImpl::ThreadLocalPool::onClusterAddOrUpdateNonVirtual( cluster_ = &cluster; ASSERT(host_set_member_update_cb_handle_ == nullptr); host_set_member_update_cb_handle_ = cluster_->prioritySet().addMemberUpdateCb( - [this](const std::vector&, + [this](const std::vector& hosts_added, const std::vector& hosts_removed) -> void { + onHostsAdded(hosts_added); onHostsRemoved(hosts_removed); }); @@ -97,6 +103,16 @@ void 
InstanceImpl::ThreadLocalPool::onClusterAddOrUpdateNonVirtual( host_address_map_[host->address()->asString()] = host; } } + + // Figure out if the cluster associated with this ConnPool is a Redis cluster + // with its own hash slot sharding scheme and ability to dynamically discover + // its members. This is done once to minimize overhead in the data path, makeRequest() in + // particular. + Upstream::ClusterInfoConstSharedPtr info = cluster_->info(); + const auto& cluster_type = info->clusterType(); + is_redis_cluster_ = info->lbType() == Upstream::LoadBalancerType::ClusterProvided && + cluster_type.has_value() && + cluster_type->name() == Extensions::Clusters::ClusterTypes::get().Redis; } void InstanceImpl::ThreadLocalPool::onClusterRemoval(const std::string& cluster_name) { @@ -106,25 +122,71 @@ void InstanceImpl::ThreadLocalPool::onClusterRemoval(const std::string& cluster_ // Treat cluster removal as a removal of all hosts. Close all connections and fail all pending // requests. + if (host_set_member_update_cb_handle_ != nullptr) { + host_set_member_update_cb_handle_->remove(); + host_set_member_update_cb_handle_ = nullptr; + } while (!client_map_.empty()) { client_map_.begin()->second->redis_client_->close(); } + while (!clients_to_drain_.empty()) { + (*clients_to_drain_.begin())->redis_client_->close(); + } cluster_ = nullptr; - host_set_member_update_cb_handle_ = nullptr; host_address_map_.clear(); } +void InstanceImpl::ThreadLocalPool::onHostsAdded( + const std::vector& hosts_added) { + for (const auto& host : hosts_added) { + std::string host_address = host->address()->asString(); + // Insert new host into address map, possibly overwriting a previous host's entry. + host_address_map_[host_address] = host; + for (const auto& created_host : created_via_redirect_hosts_) { + if (created_host->address()->asString() == host_address) { + // Remove our "temporary" host created in makeRequestToHost(). 
+ onHostsRemoved({created_host}); + created_via_redirect_hosts_.remove(created_host); + break; + } + } + } +} + void InstanceImpl::ThreadLocalPool::onHostsRemoved( const std::vector& hosts_removed) { for (const auto& host : hosts_removed) { auto it = client_map_.find(host); if (it != client_map_.end()) { - // We don't currently support any type of draining for redis connections. If a host is gone, - // we just close the connection. This will fail any pending requests. - it->second->redis_client_->close(); + if (it->second->redis_client_->active()) { + // Put the ThreadLocalActiveClient to the side to drain. + clients_to_drain_.push_back(std::move(it->second)); + client_map_.erase(it); + if (!drain_timer_->enabled()) { + drain_timer_->enableTimer(std::chrono::seconds(1)); + } + } else { + // There are no pending requests so close the connection. + it->second->redis_client_->close(); + } + } + // There is the possibility that multiple hosts with the same address + // are registered in host_address_map_ given that hosts may be created + // upon redirection or supplied as part of the cluster's definition. 
+ auto it2 = host_address_map_.find(host->address()->asString()); + if ((it2 != host_address_map_.end()) && (it2->second == host)) { + host_address_map_.erase(it2); } - host_address_map_.erase(host->address()->asString()); + } +} + +void InstanceImpl::ThreadLocalPool::drainClients() { + while (!clients_to_drain_.empty() && !(*clients_to_drain_.begin())->redis_client_->active()) { + (*clients_to_drain_.begin())->redis_client_->close(); + } + if (!clients_to_drain_.empty()) { + drain_timer_->enableTimer(std::chrono::seconds(1)); } } @@ -155,11 +217,7 @@ InstanceImpl::ThreadLocalPool::makeRequest(const std::string& key, return nullptr; } - Upstream::ClusterInfoConstSharedPtr info = cluster_->info(); - const auto& cluster_type = info->clusterType(); - const bool use_crc16 = info->lbType() == Upstream::LoadBalancerType::ClusterProvided && - cluster_type.has_value() && - cluster_type->name() == Extensions::Clusters::ClusterTypes::get().Redis; + const bool use_crc16 = is_redis_cluster_; Clusters::Redis::RedisLoadBalancerContext lb_context(key, parent_.config_.enableHashtagging(), use_crc16); Upstream::HostConstSharedPtr host = cluster_->loadBalancer().chooseHost(&lb_context); @@ -169,12 +227,6 @@ InstanceImpl::ThreadLocalPool::makeRequest(const std::string& key, ThreadLocalActiveClientPtr& client = threadLocalActiveClient(host); - // Keep host_address_map_ in sync with client_map_. - auto host_cached_by_address = host_address_map_.find(host->address()->asString()); - if (host_cached_by_address == host_address_map_.end()) { - host_address_map_[host->address()->asString()] = host; - } - return client->redis_client_->makeRequest(request, callbacks); } @@ -217,9 +269,11 @@ InstanceImpl::ThreadLocalPool::makeRequestToHost(const std::string& host_address auto it = host_address_map_.find(host_address_map_key); if (it == host_address_map_.end()) { // This host is not known to the cluster manager. Create a new host and insert it into the map. 
- // TODO(msukalski): Add logic to track the number of these "unknown" host connections, - // cap the number of these connections, and implement time-out and cleaning logic, etc. - + if (created_via_redirect_hosts_.size() == parent_.config_.maxUpstreamUnknownConnections()) { + // Too many upstream connections to unknown hosts have been created. + parent_.redis_cluster_stats_.max_upstream_unknown_connections_reached_.inc(); + return nullptr; + } if (!ipv6) { // Only create an IPv4 address instance if we need a new Upstream::HostImpl. const auto ip_port = absl::string_view(host_address).substr(colon_pos + 1); @@ -239,6 +293,7 @@ InstanceImpl::ThreadLocalPool::makeRequestToHost(const std::string& host_address envoy::api::v2::endpoint::Endpoint::HealthCheckConfig::default_instance(), 0, envoy::api::v2::core::HealthStatus::UNKNOWN)}; host_address_map_[host_address_map_key] = new_host; + created_via_redirect_hosts_.push_back(new_host); it = host_address_map_.find(host_address_map_key); } @@ -251,9 +306,22 @@ void InstanceImpl::ThreadLocalActiveClient::onEvent(Network::ConnectionEvent eve if (event == Network::ConnectionEvent::RemoteClose || event == Network::ConnectionEvent::LocalClose) { auto client_to_delete = parent_.client_map_.find(host_); - ASSERT(client_to_delete != parent_.client_map_.end()); - parent_.dispatcher_.deferredDelete(std::move(client_to_delete->second->redis_client_)); - parent_.client_map_.erase(client_to_delete); + if (client_to_delete != parent_.client_map_.end()) { + parent_.dispatcher_.deferredDelete(std::move(redis_client_)); + parent_.client_map_.erase(client_to_delete); + } else { + for (auto it = parent_.clients_to_drain_.begin(); it != parent_.clients_to_drain_.end(); + it++) { + if ((*it).get() == this) { + if (!redis_client_->active()) { + parent_.parent_.redis_cluster_stats_.upstream_cx_drained_.inc(); + } + parent_.dispatcher_.deferredDelete(std::move(redis_client_)); + parent_.clients_to_drain_.erase(it); + break; + } + } + } } } diff 
--git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h index 34f86c5bb78e8..ff5b52ed5067c 100644 --- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h +++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h @@ -9,6 +9,7 @@ #include #include "envoy/config/filter/network/redis_proxy/v2/redis_proxy.pb.h" +#include "envoy/stats/stats_macros.h" #include "envoy/thread_local/thread_local.h" #include "envoy/upstream/cluster_manager.h" @@ -36,13 +37,21 @@ namespace ConnPool { // TODO(mattklein123): Circuit breaking // TODO(rshriram): Fault injection +#define REDIS_CLUSTER_STATS(COUNTER) \ + COUNTER(upstream_cx_drained) \ + COUNTER(max_upstream_unknown_connections_reached) + +struct RedisClusterStats { + REDIS_CLUSTER_STATS(GENERATE_COUNTER_STRUCT) +}; + class InstanceImpl : public Instance { public: InstanceImpl( const std::string& cluster_name, Upstream::ClusterManager& cm, Common::Redis::Client::ClientFactory& client_factory, ThreadLocal::SlotAllocator& tls, const envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings& config, - Api::Api& api, Stats::SymbolTable& symbol_table); + Api::Api& api, Stats::ScopePtr&& stats_scope); // RedisProxy::ConnPool::Instance Common::Redis::Client::PoolRequest* makeRequest(const std::string& key, const Common::Redis::RespValue& request, @@ -50,7 +59,6 @@ class InstanceImpl : public Instance { Common::Redis::Client::PoolRequest* makeRequestToHost(const std::string& host_address, const Common::Redis::RespValue& request, Common::Redis::Client::PoolCallbacks& callbacks) override; - Stats::SymbolTable& symbolTable() { return symbol_table_; } // Allow the unit test to have access to private members. 
friend class RedisConnPoolImplTest; @@ -85,7 +93,9 @@ class InstanceImpl : public Instance { makeRequestToHost(const std::string& host_address, const Common::Redis::RespValue& request, Common::Redis::Client::PoolCallbacks& callbacks); void onClusterAddOrUpdateNonVirtual(Upstream::ThreadLocalCluster& cluster); + void onHostsAdded(const std::vector& hosts_added); void onHostsRemoved(const std::vector& hosts_removed); + void drainClients(); // Upstream::ClusterUpdateCallbacks void onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) override { @@ -102,6 +112,17 @@ class InstanceImpl : public Instance { Envoy::Common::CallbackHandle* host_set_member_update_cb_handle_{}; std::unordered_map host_address_map_; std::string auth_password_; + std::list created_via_redirect_hosts_; + std::list clients_to_drain_; + + /* This timer is used to poll the active clients in clients_to_drain_ to determine whether they + * have been drained (have no active requests) or not. It is only enabled after a client has + * been added to clients_to_drain_, and is only re-enabled as long as that list is not empty. A + * timer is being used as opposed to using a callback to avoid adding a check of + * clients_to_drain_ to the main data code path as this should only rarely be not empty. 
+ */ + Event::TimerPtr drain_timer_; + bool is_redis_cluster_; }; Upstream::ClusterManager& cm_; @@ -109,7 +130,8 @@ class InstanceImpl : public Instance { ThreadLocal::SlotPtr tls_; Common::Redis::Client::ConfigImpl config_; Api::Api& api_; - Stats::SymbolTable& symbol_table_; + Stats::ScopePtr stats_scope_; + RedisClusterStats redis_cluster_stats_; }; } // namespace ConnPool diff --git a/source/extensions/health_checkers/redis/redis.h b/source/extensions/health_checkers/redis/redis.h index b046da958931c..63f9dd0de0d5f 100644 --- a/source/extensions/health_checkers/redis/redis.h +++ b/source/extensions/health_checkers/redis/redis.h @@ -77,6 +77,8 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase { return std::chrono::milliseconds(1); } + uint32_t maxUpstreamUnknownConnections() const override { return 0; } + // Extensions::NetworkFilters::Common::Redis::Client::PoolCallbacks void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override; void onFailure() override; diff --git a/test/extensions/clusters/redis/redis_cluster_test.cc b/test/extensions/clusters/redis/redis_cluster_test.cc index ef1603d87b76d..4d60412503ff2 100644 --- a/test/extensions/clusters/redis/redis_cluster_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_test.cc @@ -401,6 +401,7 @@ class RedisClusterTest : public testing::Test, RedisCluster::RedisDiscoverySession discovery_session(*cluster_, *this); EXPECT_FALSE(discovery_session.enableHashtagging()); EXPECT_EQ(discovery_session.bufferFlushTimeoutInMs(), std::chrono::milliseconds(0)); + EXPECT_EQ(discovery_session.maxUpstreamUnknownConnections(), 0); NetworkFilters::Common::Redis::RespValue dummy_value; dummy_value.type(NetworkFilters::Common::Redis::RespType::Error); diff --git a/test/extensions/filters/network/common/redis/client_impl_test.cc b/test/extensions/filters/network/common/redis/client_impl_test.cc index 444ef060931fb..ecea5c887c226 100644 --- 
a/test/extensions/filters/network/common/redis/client_impl_test.cc +++ b/test/extensions/filters/network/common/redis/client_impl_test.cc @@ -80,6 +80,7 @@ class RedisClientImplTest : public testing::Test, public Common::Redis::DecoderF *config_); EXPECT_EQ(1UL, host_->cluster_.stats_.upstream_cx_total_.value()); EXPECT_EQ(1UL, host_->stats_.cx_total_.value()); + EXPECT_EQ(false, client_->active()); // NOP currently. upstream_connection_->runHighWatermarkCallbacks(); @@ -95,6 +96,7 @@ class RedisClientImplTest : public testing::Test, public Common::Redis::DecoderF Common::Redis::RespValuePtr response1{new Common::Redis::RespValue()}; response1->type(Common::Redis::RespType::SimpleString); response1->asString() = "OK"; + EXPECT_EQ(true, client_->active()); ClientImpl* client_impl = dynamic_cast(client_.get()); EXPECT_NE(client_impl, nullptr); client_impl->onRespValue(std::move(response1)); @@ -155,6 +157,7 @@ class ConfigBufferSizeGTSingleRequest : public Config { std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return std::chrono::milliseconds(1); } + uint32_t maxUpstreamUnknownConnections() const override { return 0; } }; TEST_F(RedisClientImplTest, BatchWithTimerFiring) { @@ -466,6 +469,7 @@ class ConfigOutlierDisabled : public Config { std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return std::chrono::milliseconds(0); } + uint32_t maxUpstreamUnknownConnections() const override { return 0; } }; TEST_F(RedisClientImplTest, OutlierDisabled) { diff --git a/test/extensions/filters/network/common/redis/mocks.h b/test/extensions/filters/network/common/redis/mocks.h index 37b90626c4e4c..829983abd53da 100644 --- a/test/extensions/filters/network/common/redis/mocks.h +++ b/test/extensions/filters/network/common/redis/mocks.h @@ -69,6 +69,7 @@ class MockClient : public Client { } MOCK_METHOD1(addConnectionCallbacks, void(Network::ConnectionCallbacks& callbacks)); + MOCK_METHOD0(active, bool()); MOCK_METHOD0(close, void()); 
MOCK_METHOD2(makeRequest, PoolRequest*(const Common::Redis::RespValue& request, PoolCallbacks& callbacks)); diff --git a/test/extensions/filters/network/common/redis/test_utils.h b/test/extensions/filters/network/common/redis/test_utils.h index c81cb2647f95b..e720394b83f26 100644 --- a/test/extensions/filters/network/common/redis/test_utils.h +++ b/test/extensions/filters/network/common/redis/test_utils.h @@ -17,11 +17,12 @@ namespace Client { inline envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings createConnPoolSettings(int64_t millis = 20, bool hashtagging = true, - bool redirection_support = true) { + bool redirection_support = true, uint32_t max_unknown_conns = 100) { envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings setting{}; setting.mutable_op_timeout()->CopyFrom(Protobuf::util::TimeUtil::MillisecondsToDuration(millis)); setting.set_enable_hashtagging(hashtagging); setting.set_enable_redirection(redirection_support); + setting.mutable_max_upstream_unknown_connections()->set_value(max_unknown_conns); return setting; } diff --git a/test/extensions/filters/network/redis_proxy/BUILD b/test/extensions/filters/network/redis_proxy/BUILD index d7b4bc5dacb1b..1f9a52b6a0294 100644 --- a/test/extensions/filters/network/redis_proxy/BUILD +++ b/test/extensions/filters/network/redis_proxy/BUILD @@ -127,7 +127,6 @@ envoy_extension_cc_test( envoy_extension_cc_test( name = "redis_proxy_integration_test", - size = "small", srcs = ["redis_proxy_integration_test.cc"], extension_name = "envoy.filters.network.redis_proxy", deps = [ diff --git a/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc b/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc index 5ecb9e9aed539..ee16cfaa50155 100644 --- a/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc +++ b/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc @@ -1,7 +1,6 @@ #include #include "common/network/utility.h" 
-#include "common/stats/fake_symbol_table_impl.h" #include "common/upstream/upstream_impl.h" #include "extensions/filters/network/common/redis/utility.h" @@ -40,7 +39,8 @@ namespace ConnPool { class RedisConnPoolImplTest : public testing::Test, public Common::Redis::Client::ClientFactory { public: - void setup(bool cluster_exists = true, bool hashtagging = true) { + void setup(bool cluster_exists = true, bool hashtagging = true, + uint32_t max_unknown_conns = 100) { EXPECT_CALL(cm_, addThreadLocalClusterUpdateCallbacks_(_)) .WillOnce(DoAll(SaveArgAddress(&update_callbacks_), ReturnNew())); @@ -48,9 +48,33 @@ class RedisConnPoolImplTest : public testing::Test, public Common::Redis::Client EXPECT_CALL(cm_, get(Eq("fake_cluster"))).WillOnce(Return(nullptr)); } + std::unique_ptr> store = + std::make_unique>(); + + upstream_cx_drained_.value_ = 0; + ON_CALL(*store, counter(Eq("upstream_cx_drained"))) + .WillByDefault(ReturnRef(upstream_cx_drained_)); + ON_CALL(upstream_cx_drained_, value()).WillByDefault(Invoke([&]() -> uint64_t { + return upstream_cx_drained_.value_; + })); + ON_CALL(upstream_cx_drained_, inc()).WillByDefault(Invoke([&]() { + upstream_cx_drained_.value_++; + })); + + max_upstream_unknown_connections_reached_.value_ = 0; + ON_CALL(*store, counter(Eq("max_upstream_unknown_connections_reached"))) + .WillByDefault(ReturnRef(max_upstream_unknown_connections_reached_)); + ON_CALL(max_upstream_unknown_connections_reached_, value()) + .WillByDefault( + Invoke([&]() -> uint64_t { return max_upstream_unknown_connections_reached_.value_; })); + ON_CALL(max_upstream_unknown_connections_reached_, inc()).WillByDefault(Invoke([&]() { + max_upstream_unknown_connections_reached_.value_++; + })); + std::unique_ptr conn_pool_impl = std::make_unique( cluster_name_, cm_, *this, tls_, - Common::Redis::Client::createConnPoolSettings(20, hashtagging, true), api_, *symbol_table_); + Common::Redis::Client::createConnPoolSettings(20, hashtagging, true, max_unknown_conns), + 
api_, std::move(store)); // Set the authentication password for this connection pool. conn_pool_impl->tls_->getTyped().auth_password_ = auth_password_; conn_pool_ = std::move(conn_pool_impl); @@ -86,6 +110,53 @@ class RedisConnPoolImplTest : public testing::Test, public Common::Redis::Client EXPECT_EQ(&active_request, request); } + std::unordered_map& + clientMap() { + InstanceImpl* conn_pool_impl = dynamic_cast(conn_pool_.get()); + return conn_pool_impl->tls_->getTyped().client_map_; + } + + InstanceImpl::ThreadLocalActiveClient* clientMap(Upstream::HostConstSharedPtr host) { + InstanceImpl* conn_pool_impl = dynamic_cast(conn_pool_.get()); + return conn_pool_impl->tls_->getTyped().client_map_[host].get(); + } + + std::unordered_map& hostAddressMap() { + InstanceImpl* conn_pool_impl = dynamic_cast(conn_pool_.get()); + return conn_pool_impl->tls_->getTyped().host_address_map_; + } + + std::list& createdViaRedirectHosts() { + InstanceImpl* conn_pool_impl = dynamic_cast(conn_pool_.get()); + return conn_pool_impl->tls_->getTyped() + .created_via_redirect_hosts_; + } + + std::list& clientsToDrain() { + InstanceImpl* conn_pool_impl = dynamic_cast(conn_pool_.get()); + return conn_pool_impl->tls_->getTyped().clients_to_drain_; + } + + Event::TimerPtr& drainTimer() { + InstanceImpl* conn_pool_impl = dynamic_cast(conn_pool_.get()); + return conn_pool_impl->tls_->getTyped().drain_timer_; + } + + void drainClients() { + InstanceImpl* conn_pool_impl = dynamic_cast(conn_pool_.get()); + conn_pool_impl->tls_->getTyped().drainClients(); + } + + Stats::Counter& upstreamCxDrained() { + InstanceImpl* conn_pool_impl = dynamic_cast(conn_pool_.get()); + return conn_pool_impl->redis_cluster_stats_.upstream_cx_drained_; + } + + Stats::Counter& maxUpstreamUnknownConnectionsReached() { + InstanceImpl* conn_pool_impl = dynamic_cast(conn_pool_.get()); + return conn_pool_impl->redis_cluster_stats_.max_upstream_unknown_connections_reached_; + } + // Common::Redis::Client::ClientFactory 
Common::Redis::Client::ClientPtr create(Upstream::HostConstSharedPtr host, Event::Dispatcher&, const Common::Redis::Client::Config&) override { @@ -95,7 +166,6 @@ class RedisConnPoolImplTest : public testing::Test, public Common::Redis::Client MOCK_METHOD1(create_, Common::Redis::Client::Client*(Upstream::HostConstSharedPtr host)); const std::string cluster_name_{"fake_cluster"}; - Envoy::Test::Global symbol_table_; NiceMock cm_; NiceMock tls_; InstanceSharedPtr conn_pool_; @@ -104,6 +174,8 @@ class RedisConnPoolImplTest : public testing::Test, public Common::Redis::Client Network::Address::InstanceConstSharedPtr test_address_; std::string auth_password_; NiceMock api_; + NiceMock upstream_cx_drained_; + NiceMock max_upstream_unknown_connections_reached_; }; TEST_F(RedisConnPoolImplTest, Basic) { @@ -234,7 +306,7 @@ TEST_F(RedisConnPoolImplTest, HashtaggingNotEnabled) { tls_.shutdownThread(); }; -// Conn pool created when no cluster exists at creation time. Dynamic cluster creation and removal +// ConnPool created when no cluster exists at creation time. Dynamic cluster creation and removal // work correctly. TEST_F(RedisConnPoolImplTest, NoClusterAtConstruction) { InSequence s; @@ -285,6 +357,8 @@ TEST_F(RedisConnPoolImplTest, NoClusterAtConstruction) { update_callbacks_->onClusterRemoval("fake_cluster"); } +// This test removes a single host from the ConnPool after learning about 2 hosts from the +// associated load balancer. TEST_F(RedisConnPoolImplTest, HostRemove) { InSequence s; @@ -329,6 +403,23 @@ TEST_F(RedisConnPoolImplTest, HostRemove) { testing::Mock::AllowLeak(host2.get()); } +// This test removes a host from a ConnPool that was never added in the first place. No errors +// should be encountered. 
+TEST_F(RedisConnPoolImplTest, HostRemovedNeverAdded) { + InSequence s; + + setup(); + + std::shared_ptr host1(new Upstream::MockHost()); + auto host1_test_address = Network::Utility::resolveUrl("tcp://10.0.0.1:3000"); + EXPECT_CALL(*host1, address()).WillOnce(Return(host1_test_address)); + EXPECT_NO_THROW(cm_.thread_local_cluster_.cluster_.prioritySet().getMockHostSet(0)->runCallbacks( + {}, {host1})); + EXPECT_EQ(hostAddressMap().size(), 0); + + tls_.shutdownThread(); +} + TEST_F(RedisConnPoolImplTest, DeleteFollowedByClusterUpdateCallback) { setup(); conn_pool_.reset(); @@ -377,7 +468,7 @@ TEST_F(RedisConnPoolImplTest, RemoteClose) { tls_.shutdownThread(); } -TEST_F(RedisConnPoolImplTest, makeRequestToHost) { +TEST_F(RedisConnPoolImplTest, MakeRequestToHost) { InSequence s; setup(false); @@ -437,7 +528,22 @@ TEST_F(RedisConnPoolImplTest, makeRequestToHost) { tls_.shutdownThread(); } -TEST_F(RedisConnPoolImplTest, makeRequestToHostWithAuthPassword) { +TEST_F(RedisConnPoolImplTest, MakeRequestToHostWithZeroMaxUnknownUpstreamConnectionLimit) { + InSequence s; + + // Create a ConnPool with a max_upstream_unknown_connections setting of 0. + setup(true, true, 0); + + Common::Redis::RespValue value; + Common::Redis::Client::MockPoolCallbacks callbacks1; + + // The max_unknown_upstream_connections is set to 0. Request should fail. + EXPECT_EQ(nullptr, conn_pool_->makeRequestToHost("10.0.0.1:3000", value, callbacks1)); + EXPECT_EQ(maxUpstreamUnknownConnectionsReached().value(), 1); + tls_.shutdownThread(); +} + +TEST_F(RedisConnPoolImplTest, MakeRequestToHostWithAuthPassword) { InSequence s; auth_password_ = "superduperpassword"; @@ -490,17 +596,275 @@ TEST_F(RedisConnPoolImplTest, makeRequestToHostWithAuthPassword) { tls_.shutdownThread(); } +// This test forces the creation of 2 hosts (one with an IPv4 address, and the other with an IPv6 +// address) and pending requests using makeRequestToHost(). 
After their creation, "new" hosts are +// discovered, and the original hosts are put aside to drain. The test then verifies the drain +// logic. +TEST_F(RedisConnPoolImplTest, HostsAddedAndRemovedWithDraining) { + setup(); + + Common::Redis::RespValue value; + Common::Redis::Client::MockPoolRequest auth_request1, active_request1; + Common::Redis::Client::MockPoolRequest auth_request2, active_request2; + Common::Redis::Client::MockPoolCallbacks callbacks1; + Common::Redis::Client::MockPoolCallbacks callbacks2; + Common::Redis::Client::MockClient* client1 = new NiceMock(); + Common::Redis::Client::MockClient* client2 = new NiceMock(); + Upstream::HostConstSharedPtr host1; + Upstream::HostConstSharedPtr host2; + + EXPECT_CALL(*this, create_(_)).WillOnce(DoAll(SaveArg<0>(&host1), Return(client1))); + EXPECT_CALL(*client1, makeRequest(Ref(value), Ref(callbacks1))) + .WillOnce(Return(&active_request1)); + Common::Redis::Client::PoolRequest* request1 = + conn_pool_->makeRequestToHost("10.0.0.1:3000", value, callbacks1); + EXPECT_EQ(&active_request1, request1); + EXPECT_EQ(host1->address()->asString(), "10.0.0.1:3000"); + + // IPv6 address returned from Redis server will not have square brackets + // around it, while Envoy represents Address::Ipv6Instance addresses with square brackets around + // the address. + EXPECT_CALL(*this, create_(_)).WillOnce(DoAll(SaveArg<0>(&host2), Return(client2))); + EXPECT_CALL(*client2, makeRequest(Ref(value), Ref(callbacks2))) + .WillOnce(Return(&active_request2)); + Common::Redis::Client::PoolRequest* request2 = + conn_pool_->makeRequestToHost("2001:470:813B:0:0:0:0:1:3333", value, callbacks2); + EXPECT_EQ(&active_request2, request2); + EXPECT_EQ(host2->address()->asString(), "[2001:470:813b::1]:3333"); + + std::unordered_map& host_address_map = + hostAddressMap(); + EXPECT_EQ(host_address_map.size(), 2); // host1 and host2 have been created. 
+ EXPECT_EQ(host_address_map[host1->address()->asString()], host1); + EXPECT_EQ(host_address_map[host2->address()->asString()], host2); + EXPECT_EQ(clientMap().size(), 2); + EXPECT_NE(clientMap().find(host1), clientMap().end()); + EXPECT_NE(clientMap().find(host2), clientMap().end()); + void* host1_active_client = clientMap(host1); + EXPECT_EQ(createdViaRedirectHosts().size(), 2); + EXPECT_EQ(clientsToDrain().size(), 0); + EXPECT_EQ(drainTimer()->enabled(), false); + + std::shared_ptr new_host1(new Upstream::MockHost()); + std::shared_ptr new_host2(new Upstream::MockHost()); + auto new_host1_test_address = Network::Utility::resolveUrl("tcp://10.0.0.1:3000"); + auto new_host2_test_address = Network::Utility::resolveUrl("tcp://[2001:470:813b::1]:3333"); + EXPECT_CALL(*new_host1, address()).WillRepeatedly(Return(new_host1_test_address)); + EXPECT_CALL(*new_host2, address()).WillRepeatedly(Return(new_host2_test_address)); + EXPECT_CALL(*client1, active()).WillOnce(Return(true)); + EXPECT_CALL(*client2, active()).WillOnce(Return(false)); + EXPECT_CALL(*client2, close()); + + cm_.thread_local_cluster_.cluster_.prioritySet().getMockHostSet(0)->runCallbacks( + {new_host1, new_host2}, {}); + + host_address_map = hostAddressMap(); + EXPECT_EQ(host_address_map.size(), 2); // new_host1 and new_host2 have been added. + EXPECT_EQ(host_address_map[new_host1_test_address->asString()], new_host1); + EXPECT_EQ(host_address_map[new_host2_test_address->asString()], new_host2); + EXPECT_EQ(clientMap().size(), 0); + EXPECT_EQ(createdViaRedirectHosts().size(), 0); + EXPECT_EQ(clientsToDrain().size(), 1); // client2 has already been drained. + EXPECT_EQ(clientsToDrain().front().get(), host1_active_client); // client1 is still active. + EXPECT_EQ(drainTimer()->enabled(), true); + + cm_.thread_local_cluster_.cluster_.prioritySet().getMockHostSet(0)->runCallbacks( + {}, {new_host1, new_host2}); + + EXPECT_EQ(host_address_map.size(), 0); // new_host1 and new_host2 have been removed. 
+ EXPECT_EQ(clientMap().size(), 0); + EXPECT_EQ(createdViaRedirectHosts().size(), 0); + EXPECT_EQ(clientsToDrain().size(), 1); + EXPECT_EQ(clientsToDrain().front().get(), host1_active_client); + EXPECT_EQ(drainTimer()->enabled(), true); + + EXPECT_CALL(*client1, active()).WillOnce(Return(true)); + drainTimer()->disableTimer(); + drainClients(); + EXPECT_EQ(clientsToDrain().size(), 1); // Nothing happened. client1 is still active. + EXPECT_EQ(drainTimer()->enabled(), true); + + EXPECT_CALL(*client1, active()).Times(2).WillRepeatedly(Return(false)); + EXPECT_CALL(*client1, close()); + drainTimer()->disableTimer(); + drainClients(); + EXPECT_EQ(clientsToDrain().size(), 0); // client1 has been drained and closed. + EXPECT_EQ(drainTimer()->enabled(), false); + EXPECT_EQ(upstreamCxDrained().value(), 1); + + tls_.shutdownThread(); +} + +// This test creates 2 hosts (one with an IPv4 address, and the other with an IPv6 +// address) and pending requests using makeRequestToHost(). After their creation, "new" hosts are +// discovered (added), and the original hosts are put aside to drain. Destructors are then +// called on these not yet drained clients, and the underlying connections should be closed. 
+TEST_F(RedisConnPoolImplTest, HostsAddedAndEndWithNoDraining) { + setup(); + + Common::Redis::RespValue value; + Common::Redis::Client::MockPoolRequest auth_request1, active_request1; + Common::Redis::Client::MockPoolRequest auth_request2, active_request2; + Common::Redis::Client::MockPoolCallbacks callbacks1; + Common::Redis::Client::MockPoolCallbacks callbacks2; + Common::Redis::Client::MockClient* client1 = new NiceMock(); + Common::Redis::Client::MockClient* client2 = new NiceMock(); + Upstream::HostConstSharedPtr host1; + Upstream::HostConstSharedPtr host2; + + EXPECT_CALL(*this, create_(_)).WillOnce(DoAll(SaveArg<0>(&host1), Return(client1))); + EXPECT_CALL(*client1, makeRequest(Ref(value), Ref(callbacks1))) + .WillOnce(Return(&active_request1)); + Common::Redis::Client::PoolRequest* request1 = + conn_pool_->makeRequestToHost("10.0.0.1:3000", value, callbacks1); + EXPECT_EQ(&active_request1, request1); + EXPECT_EQ(host1->address()->asString(), "10.0.0.1:3000"); + + // IPv6 address returned from Redis server will not have square brackets + // around it, while Envoy represents Address::Ipv6Instance addresses with square brackets around + // the address. + EXPECT_CALL(*this, create_(_)).WillOnce(DoAll(SaveArg<0>(&host2), Return(client2))); + EXPECT_CALL(*client2, makeRequest(Ref(value), Ref(callbacks2))) + .WillOnce(Return(&active_request2)); + Common::Redis::Client::PoolRequest* request2 = + conn_pool_->makeRequestToHost("2001:470:813B:0:0:0:0:1:3333", value, callbacks2); + EXPECT_EQ(&active_request2, request2); + EXPECT_EQ(host2->address()->asString(), "[2001:470:813b::1]:3333"); + + std::unordered_map& host_address_map = + hostAddressMap(); + EXPECT_EQ(host_address_map.size(), 2); // host1 and host2 have been created. 
+ EXPECT_EQ(host_address_map[host1->address()->asString()], host1); + EXPECT_EQ(host_address_map[host2->address()->asString()], host2); + EXPECT_EQ(clientMap().size(), 2); + EXPECT_NE(clientMap().find(host1), clientMap().end()); + EXPECT_NE(clientMap().find(host2), clientMap().end()); + EXPECT_EQ(createdViaRedirectHosts().size(), 2); + EXPECT_EQ(clientsToDrain().size(), 0); + EXPECT_EQ(drainTimer()->enabled(), false); + + std::shared_ptr new_host1(new Upstream::MockHost()); + std::shared_ptr new_host2(new Upstream::MockHost()); + auto new_host1_test_address = Network::Utility::resolveUrl("tcp://10.0.0.1:3000"); + auto new_host2_test_address = Network::Utility::resolveUrl("tcp://[2001:470:813b::1]:3333"); + EXPECT_CALL(*new_host1, address()).WillRepeatedly(Return(new_host1_test_address)); + EXPECT_CALL(*new_host2, address()).WillRepeatedly(Return(new_host2_test_address)); + EXPECT_CALL(*client1, active()).WillOnce(Return(true)); + EXPECT_CALL(*client2, active()).WillOnce(Return(true)); + + cm_.thread_local_cluster_.cluster_.prioritySet().getMockHostSet(0)->runCallbacks( + {new_host1, new_host2}, {}); + + host_address_map = hostAddressMap(); + EXPECT_EQ(host_address_map.size(), 2); // new_host1 and new_host2 have been added. + EXPECT_EQ(host_address_map[new_host1_test_address->asString()], new_host1); + EXPECT_EQ(host_address_map[new_host2_test_address->asString()], new_host2); + EXPECT_EQ(clientMap().size(), 0); + EXPECT_EQ(createdViaRedirectHosts().size(), 0); + EXPECT_EQ(clientsToDrain().size(), 2); // host1 and host2 have been put aside to drain. 
+ EXPECT_EQ(drainTimer()->enabled(), true); + + EXPECT_CALL(*client1, close()); + EXPECT_CALL(*client2, close()); + EXPECT_CALL(*client1, active()).WillOnce(Return(true)); + EXPECT_CALL(*client2, active()).WillOnce(Return(true)); + EXPECT_EQ(upstreamCxDrained().value(), 0); + + tls_.shutdownThread(); +} + +// This test creates 2 hosts (one with an IPv4 address, and the other with an IPv6 +// address) and pending requests using makeRequestToHost(). After their creation, "new" hosts are +// discovered (added), and the original hosts are put aside to drain. The cluster is removed and the +// underlying connections should be closed. +TEST_F(RedisConnPoolImplTest, HostsAddedAndEndWithClusterRemoval) { + setup(); + + Common::Redis::RespValue value; + Common::Redis::Client::MockPoolRequest auth_request1, active_request1; + Common::Redis::Client::MockPoolRequest auth_request2, active_request2; + Common::Redis::Client::MockPoolCallbacks callbacks1; + Common::Redis::Client::MockPoolCallbacks callbacks2; + Common::Redis::Client::MockClient* client1 = new NiceMock(); + Common::Redis::Client::MockClient* client2 = new NiceMock(); + Upstream::HostConstSharedPtr host1; + Upstream::HostConstSharedPtr host2; + + EXPECT_CALL(*this, create_(_)).WillOnce(DoAll(SaveArg<0>(&host1), Return(client1))); + EXPECT_CALL(*client1, makeRequest(Ref(value), Ref(callbacks1))) + .WillOnce(Return(&active_request1)); + Common::Redis::Client::PoolRequest* request1 = + conn_pool_->makeRequestToHost("10.0.0.1:3000", value, callbacks1); + EXPECT_EQ(&active_request1, request1); + EXPECT_EQ(host1->address()->asString(), "10.0.0.1:3000"); + + // IPv6 address returned from Redis server will not have square brackets + // around it, while Envoy represents Address::Ipv6Instance addresses with square brackets around + // the address. 
+ EXPECT_CALL(*this, create_(_)).WillOnce(DoAll(SaveArg<0>(&host2), Return(client2))); + EXPECT_CALL(*client2, makeRequest(Ref(value), Ref(callbacks2))) + .WillOnce(Return(&active_request2)); + Common::Redis::Client::PoolRequest* request2 = + conn_pool_->makeRequestToHost("2001:470:813B:0:0:0:0:1:3333", value, callbacks2); + EXPECT_EQ(&active_request2, request2); + EXPECT_EQ(host2->address()->asString(), "[2001:470:813b::1]:3333"); + + std::unordered_map& host_address_map = + hostAddressMap(); + EXPECT_EQ(host_address_map.size(), 2); // host1 and host2 have been created. + EXPECT_EQ(host_address_map[host1->address()->asString()], host1); + EXPECT_EQ(host_address_map[host2->address()->asString()], host2); + EXPECT_EQ(clientMap().size(), 2); + EXPECT_NE(clientMap().find(host1), clientMap().end()); + EXPECT_NE(clientMap().find(host2), clientMap().end()); + EXPECT_EQ(createdViaRedirectHosts().size(), 2); + EXPECT_EQ(clientsToDrain().size(), 0); + EXPECT_EQ(drainTimer()->enabled(), false); + + std::shared_ptr new_host1(new Upstream::MockHost()); + std::shared_ptr new_host2(new Upstream::MockHost()); + auto new_host1_test_address = Network::Utility::resolveUrl("tcp://10.0.0.1:3000"); + auto new_host2_test_address = Network::Utility::resolveUrl("tcp://[2001:470:813b::1]:3333"); + EXPECT_CALL(*new_host1, address()).WillRepeatedly(Return(new_host1_test_address)); + EXPECT_CALL(*new_host2, address()).WillRepeatedly(Return(new_host2_test_address)); + EXPECT_CALL(*client1, active()).WillOnce(Return(true)); + EXPECT_CALL(*client2, active()).WillOnce(Return(true)); + + cm_.thread_local_cluster_.cluster_.prioritySet().getMockHostSet(0)->runCallbacks( + {new_host1, new_host2}, {}); + + host_address_map = hostAddressMap(); + EXPECT_EQ(host_address_map.size(), 2); // new_host1 and new_host2 have been added. 
+ EXPECT_EQ(host_address_map[new_host1_test_address->asString()], new_host1); + EXPECT_EQ(host_address_map[new_host2_test_address->asString()], new_host2); + EXPECT_EQ(clientMap().size(), 0); + EXPECT_EQ(createdViaRedirectHosts().size(), 0); + EXPECT_EQ(clientsToDrain().size(), 2); // host1 and host2 have been put aside to drain. + EXPECT_EQ(drainTimer()->enabled(), true); + + EXPECT_CALL(*client1, close()); + EXPECT_CALL(*client2, close()); + EXPECT_CALL(*client1, active()).WillOnce(Return(true)); + EXPECT_CALL(*client2, active()).WillOnce(Return(true)); + update_callbacks_->onClusterRemoval("fake_cluster"); + + EXPECT_EQ(hostAddressMap().size(), 0); + EXPECT_EQ(clientMap().size(), 0); + EXPECT_EQ(clientsToDrain().size(), 0); + EXPECT_EQ(upstreamCxDrained().value(), 0); + + tls_.shutdownThread(); +} + TEST_F(RedisConnPoolImplTest, MakeRequestToRedisCluster) { absl::optional cluster_type; cluster_type.emplace(); cluster_type->set_name("envoy.clusters.redis"); EXPECT_CALL(*cm_.thread_local_cluster_.cluster_.info_, clusterType()) - .Times(2) - .WillRepeatedly(ReturnRef(cluster_type)); + .WillOnce(ReturnRef(cluster_type)); EXPECT_CALL(*cm_.thread_local_cluster_.cluster_.info_, lbType()) - .Times(2) - .WillRepeatedly(Return(Upstream::LoadBalancerType::ClusterProvided)); + .WillOnce(Return(Upstream::LoadBalancerType::ClusterProvided)); setup(); @@ -518,11 +882,9 @@ TEST_F(RedisConnPoolImplTest, MakeRequestToRedisClusterHashtag) { cluster_type.emplace(); cluster_type->set_name("envoy.clusters.redis"); EXPECT_CALL(*cm_.thread_local_cluster_.cluster_.info_, clusterType()) - .Times(2) - .WillRepeatedly(ReturnRef(cluster_type)); + .WillOnce(ReturnRef(cluster_type)); EXPECT_CALL(*cm_.thread_local_cluster_.cluster_.info_, lbType()) - .Times(2) - .WillRepeatedly(Return(Upstream::LoadBalancerType::ClusterProvided)); + .WillOnce(Return(Upstream::LoadBalancerType::ClusterProvided)); setup(); diff --git a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc 
b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc index c16599e6e2743..444bbc4c1e47b 100644 --- a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc +++ b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc @@ -976,7 +976,8 @@ TEST_P(RedisProxyWithMirrorsIntegrationTest, ExcludeReadCommands) { // command is not mirrored to cluster 1 FakeRawConnectionPtr cluster_1_connection; - EXPECT_FALSE(fake_upstreams_[2]->waitForRawConnection(cluster_1_connection)); + EXPECT_FALSE(fake_upstreams_[2]->waitForRawConnection(cluster_1_connection, + std::chrono::milliseconds(500))); redis_client->waitForData(get_response); EXPECT_EQ(get_response, redis_client->data()); diff --git a/test/extensions/health_checkers/redis/redis_test.cc b/test/extensions/health_checkers/redis/redis_test.cc index aa171538ee0f6..d5741a158eb4d 100644 --- a/test/extensions/health_checkers/redis/redis_test.cc +++ b/test/extensions/health_checkers/redis/redis_test.cc @@ -167,6 +167,7 @@ class RedisHealthCheckerTest EXPECT_TRUE(session->enableRedirection()); EXPECT_EQ(session->maxBufferSizeBeforeFlush(), 0); EXPECT_EQ(session->bufferFlushTimeoutInMs(), std::chrono::milliseconds(1)); + EXPECT_EQ(session->maxUpstreamUnknownConnections(), 0); session->onDeferredDeleteBase(); // This must be called to pass assertions in the destructor. }