diff --git a/envoy/upstream/cluster_manager.h b/envoy/upstream/cluster_manager.h index 180045116ec32..c751bfa9c286f 100644 --- a/envoy/upstream/cluster_manager.h +++ b/envoy/upstream/cluster_manager.h @@ -310,6 +310,11 @@ class ClusterManager { virtual const ClusterRequestResponseSizeStatNames& clusterRequestResponseSizeStatNames() const PURE; virtual const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const PURE; + + /** + * Drain all connection pool connections owned by this cluster. + */ + virtual void drainConnections(const std::string& cluster) PURE; }; using ClusterManagerPtr = std::unique_ptr<ClusterManager>; diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 886323490df20..0943c0be4d662 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -485,7 +485,7 @@ void ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster) { if (cluster.info()->lbConfig().close_connections_on_host_set_change()) { for (const auto& host_set : cluster.prioritySet().hostSetsPerPriority()) { // This will drain all tcp and http connection pools. - postThreadLocalDrainConnections(cluster, host_set->hosts()); + postThreadLocalRemoveHosts(cluster, host_set->hosts()); } } else { // TODO(snowp): Should this be subject to merge windows? @@ -495,7 +495,7 @@ void ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster) { // enabled, this case will be covered by first `if` statement, where all // connection pools are drained. 
if (!hosts_removed.empty()) { - postThreadLocalDrainConnections(cluster, hosts_removed); + postThreadLocalRemoveHosts(cluster, hosts_removed); } } }); @@ -956,8 +956,17 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( return data; } -void ClusterManagerImpl::postThreadLocalDrainConnections(const Cluster& cluster, - const HostVector& hosts_removed) { +void ClusterManagerImpl::drainConnections(const std::string& cluster) { + tls_.runOnAllThreads([cluster](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) { + auto cluster_entry = cluster_manager->thread_local_clusters_.find(cluster); + if (cluster_entry != cluster_manager->thread_local_clusters_.end()) { + cluster_entry->second->drainConnPools(); + } + }); +} + +void ClusterManagerImpl::postThreadLocalRemoveHosts(const Cluster& cluster, + const HostVector& hosts_removed) { tls_.runOnAllThreads([name = cluster.info()->name(), hosts_removed](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) { cluster_manager->removeHosts(name, hosts_removed); @@ -1387,6 +1396,12 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry( } } +void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools() { + for (auto& host_set : priority_set_.hostSetsPerPriority()) { + parent_.drainConnPools(host_set->hosts()); + } +} + ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() { // We need to drain all connection pools for the cluster being removed. Then we can remove the // cluster. // TODO(mattklein123): Optimally, we would just fire member changed callbacks and remove all of // the hosts inside of the HostImpl destructor. That is a change with wide implications, so we are // going with a more targeted approach for now. 
- for (auto& host_set : priority_set_.hostSetsPerPriority()) { - parent_.drainConnPools(host_set->hosts()); - } + drainConnPools(); } Http::ConnectionPool::Instance* diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index b7a982aa47c9f..5bb3e1dff6463 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -315,9 +315,10 @@ class ClusterManagerImpl : public ClusterManager, Logger::LoggablewaitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 0); } +// Verify that the drainConnections() cluster manager API works correctly. +TEST_P(HttpConnPoolIntegrationTest, PoolDrainAfterDrainApi) { + initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + auto response = codec_client_->makeRequestWithBody(default_request_headers_, 1024); + waitForNextUpstreamRequest(); + + // Validate that the circuit breaker config is setup as we expect. + test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 1); + + upstream_request_->encodeHeaders(default_response_headers_, false); + upstream_request_->encodeData(512, true); + ASSERT_TRUE(response->waitForEndStream()); + + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_TRUE(response->complete()); + + // Drain connection pools via API. Need to post this to the server thread. + test_server_->server().dispatcher().post( + [this] { test_server_->server().clusterManager().drainConnections("cluster_0"); }); + + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + + // Validate that the pool is deleted when it becomes idle. 
+ test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 0); +} + } // namespace } // namespace Envoy diff --git a/test/mocks/upstream/cluster_manager.h b/test/mocks/upstream/cluster_manager.h index ff5649caf5170..312a8d71822bd 100644 --- a/test/mocks/upstream/cluster_manager.h +++ b/test/mocks/upstream/cluster_manager.h @@ -68,6 +68,7 @@ class MockClusterManager : public ClusterManager { const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const override { return cluster_timeout_budget_stat_names_; } + MOCK_METHOD(void, drainConnections, (const std::string& cluster)); NiceMock<MockThreadLocalCluster> thread_local_cluster_;  envoy::config::core::v3::BindConfig bind_config_;