diff --git a/envoy/upstream/cluster_manager.h b/envoy/upstream/cluster_manager.h index c751bfa9c286f..20ca0acca4df0 100644 --- a/envoy/upstream/cluster_manager.h +++ b/envoy/upstream/cluster_manager.h @@ -313,8 +313,14 @@ class ClusterManager { /** * Drain all connection pool connections owned by this cluster. + * @param cluster, the cluster to drain. */ virtual void drainConnections(const std::string& cluster) PURE; + + /** + * Drain all connection pool connections owned by all clusters in the cluster manager. + */ + virtual void drainConnections() PURE; }; using ClusterManagerPtr = std::unique_ptr; diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 0943c0be4d662..f0a1d07cc0525 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -965,6 +965,14 @@ void ClusterManagerImpl::drainConnections(const std::string& cluster) { }); } +void ClusterManagerImpl::drainConnections() { + tls_.runOnAllThreads([](OptRef cluster_manager) { + for (const auto& cluster_entry : cluster_manager->thread_local_clusters_) { + cluster_entry.second->drainConnPools(); + } + }); +} + void ClusterManagerImpl::postThreadLocalRemoveHosts(const Cluster& cluster, const HostVector& hosts_removed) { tls_.runOnAllThreads([name = cluster.info()->name(), diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 5bb3e1dff6463..3b3cc3e225f3c 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -317,6 +317,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::LoggableAdd(); threshold->mutable_max_connection_pools()->set_value(1); - bootstrap.mutable_static_resources() - ->mutable_clusters(0) - ->mutable_circuit_breakers() - ->MergeFrom(circuit_breakers); + auto* static_resources = bootstrap.mutable_static_resources(); + for (int i = 0; i < static_resources->clusters_size(); ++i) { + static_resources->mutable_clusters(i)->mutable_circuit_breakers()->MergeFrom( + circuit_breakers); + } }); HttpProtocolIntegrationTest::initialize(); } @@ -88,7 +89,7 @@ TEST_P(HttpConnPoolIntegrationTest, PoolCleanupAfterRemoteClose) { } // Verify that the drainConnections() cluster manager API works correctly. -TEST_P(HttpConnPoolIntegrationTest, PoolDrainAfterDrainApi) { +TEST_P(HttpConnPoolIntegrationTest, PoolDrainAfterDrainApiSpecificCluster) { initialize(); codec_client_ = makeHttpConnection(lookupPort("http")); @@ -115,5 +116,68 @@ TEST_P(HttpConnPoolIntegrationTest, PoolDrainAfterDrainApi) { test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 0); } +// Verify that the drainConnections() cluster manager API works correctly. +TEST_P(HttpConnPoolIntegrationTest, PoolDrainAfterDrainApiAllClusters) { + config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + bootstrap.mutable_static_resources()->mutable_clusters()->Add()->MergeFrom( + *bootstrap.mutable_static_resources()->mutable_clusters(0)); + bootstrap.mutable_static_resources()->mutable_clusters(1)->set_name("cluster_1"); + }); + + setUpstreamCount(2); + + auto host = config_helper_.createVirtualHost("cluster_1.com", "/", "cluster_1"); + config_helper_.addVirtualHost(host); + + config_helper_.setDefaultHostAndRoute("cluster_0.com", "/"); + + initialize(); + + // Request Flow to cluster_0. + codec_client_ = makeHttpConnection(lookupPort("http")); + default_request_headers_.setHost("cluster_0.com"); + auto response = codec_client_->makeRequestWithBody(default_request_headers_, 1024); + waitForNextUpstreamRequest(); + + // Validate that the circuit breaker config is setup as we expect. + test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 1); + + upstream_request_->encodeHeaders(default_response_headers_, false); + upstream_request_->encodeData(512, true); + ASSERT_TRUE(response->waitForEndStream()); + + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_TRUE(response->complete()); + + auto first_connection = std::move(fake_upstream_connection_); + codec_client_->close(); + + // Request Flow to cluster_1. + codec_client_ = makeHttpConnection(lookupPort("http")); + default_request_headers_.setHost("cluster_1.com"); + response = codec_client_->makeRequestWithBody(default_request_headers_, 1024); + waitForNextUpstreamRequest(1); + + // Validate that the circuit breaker config is setup as we expect. + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_pool_open", 1); + + upstream_request_->encodeHeaders(default_response_headers_, false); + upstream_request_->encodeData(512, true); + ASSERT_TRUE(response->waitForEndStream()); + + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_TRUE(response->complete()); + + // Drain connection pools via API. Need to post this to the server thread. + test_server_->server().dispatcher().post( + [this] { test_server_->server().clusterManager().drainConnections(); }); + + ASSERT_TRUE(first_connection->waitForDisconnect()); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + + test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 0); + test_server_->waitForGaugeEq("cluster.cluster_1.circuit_breakers.default.cx_pool_open", 0); +} + } // namespace } // namespace Envoy diff --git a/test/mocks/upstream/cluster_manager.h b/test/mocks/upstream/cluster_manager.h index 312a8d71822bd..f8b43ddb76557 100644 --- a/test/mocks/upstream/cluster_manager.h +++ b/test/mocks/upstream/cluster_manager.h @@ -69,6 +69,7 @@ class MockClusterManager : public ClusterManager { return cluster_timeout_budget_stat_names_; } MOCK_METHOD(void, drainConnections, (const std::string& cluster)); + MOCK_METHOD(void, drainConnections, ()); NiceMock thread_local_cluster_; envoy::config::core::v3::BindConfig bind_config_;