From 3bf207dc3561a5335b1bc978d90e4734ff70df13 Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Thu, 28 Apr 2022 16:16:09 +0000 Subject: [PATCH 1/3] cluster manager: add optional host predicate to drainConnections() This will allow EM to selectively drain connections based on hostname(), for example. Signed-off-by: Matt Klein --- envoy/upstream/cluster_manager.h | 19 +++++++++-- .../common/upstream/cluster_manager_impl.cc | 20 +++++++---- source/common/upstream/cluster_manager_impl.h | 7 ++-- .../http_conn_pool_integration_test.cc | 33 +++++++++++++++++-- test/mocks/upstream/cluster_manager.h | 5 +-- 5 files changed, 68 insertions(+), 16 deletions(-) diff --git a/envoy/upstream/cluster_manager.h b/envoy/upstream/cluster_manager.h index a7d99e249ba38..c8199bf3f8395 100644 --- a/envoy/upstream/cluster_manager.h +++ b/envoy/upstream/cluster_manager.h @@ -385,16 +385,31 @@ class ClusterManager { clusterRequestResponseSizeStatNames() const PURE; virtual const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const PURE; + /** + * Predicate function used in drainConnections(). + * @param host supplies the host that is about to be drained. + * @return true if the host should be drained, and false otherwise. + * + * IMPORTANT: This predicate must be completely self contained and thread safe. It will be posted + * to all worker threads and run concurrently. + */ + using DrainConnectionsHostPredicate = std::function<bool(const Host&)>; + /** * Drain all connection pool connections owned by this cluster. * @param cluster, the cluster to drain. + * @param predicate supplies the optional drain connections host predicate. If not supplied, all + * hosts are drained. */ - virtual void drainConnections(const std::string& cluster) PURE; + virtual void drainConnections(const std::string& cluster, + DrainConnectionsHostPredicate predicate) PURE; /** * Drain all connection pool connections owned by all clusters in the cluster manager. 
+ * @param predicate supplies the optional drain connections host predicate. If not supplied, all + * hosts are drained. */ - virtual void drainConnections() PURE; + virtual void drainConnections(DrainConnectionsHostPredicate predicate) PURE; /** * Check if the cluster is active and statically configured, and if not, throw exception. diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 5c5258bfd76c3..6d24e5d9ffa28 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -989,25 +989,26 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( return data; } -void ClusterManagerImpl::drainConnections(const std::string& cluster) { +void ClusterManagerImpl::drainConnections(const std::string& cluster, + DrainConnectionsHostPredicate predicate) { ENVOY_LOG_EVENT(debug, "drain_connections_call", "drainConnections called for cluster {}", cluster); - tls_.runOnAllThreads([cluster](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) { + tls_.runOnAllThreads([cluster, predicate](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) { auto cluster_entry = cluster_manager->thread_local_clusters_.find(cluster); if (cluster_entry != cluster_manager->thread_local_clusters_.end()) { - cluster_entry->second->drainAllConnPools(); + cluster_entry->second->drainAllConnPools(predicate); } }); } -void ClusterManagerImpl::drainConnections() { +void ClusterManagerImpl::drainConnections(DrainConnectionsHostPredicate predicate) { ENVOY_LOG_EVENT(debug, "drain_connections_call_for_all_clusters", "drainConnections called for all clusters"); - tls_.runOnAllThreads([](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) { + tls_.runOnAllThreads([predicate](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) { for (const auto& cluster_entry : cluster_manager->thread_local_clusters_) { - cluster_entry.second->drainAllConnPools(); + cluster_entry.second->drainAllConnPools(predicate); } }); } @@ -1558,9 +1559,14 @@ void 
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnP } } -void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainAllConnPools() { +void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainAllConnPools( + DrainConnectionsHostPredicate predicate) { for (auto& host_set : priority_set_.hostSetsPerPriority()) { for (const HostSharedPtr& host : host_set->hosts()) { + if (predicate != nullptr && !predicate(*host)) { + continue; + } + parent_.drainAllConnPoolsWorker(host); } } diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index f331e34f6edb5..ab3c25e242446 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -339,9 +339,10 @@ class ClusterManagerImpl : public ClusterManager, return cluster_timeout_budget_stat_names_; } - void drainConnections(const std::string& cluster) override; + void drainConnections(const std::string& cluster, + DrainConnectionsHostPredicate predicate) override; - void drainConnections() override; + void drainConnections(DrainConnectionsHostPredicate predicate) override; void checkActiveStaticCluster(const std::string& cluster) override; @@ -512,7 +513,7 @@ class ClusterManagerImpl : public ClusterManager, // Drains idle clients in connection pools for all hosts. void drainConnPools(); // Drain all clients in connection pools for all hosts. 
- void drainAllConnPools(); + void drainAllConnPools(DrainConnectionsHostPredicate predicate); private: Http::ConnectionPool::Instance* diff --git a/test/integration/http_conn_pool_integration_test.cc b/test/integration/http_conn_pool_integration_test.cc index d320be28d5e38..d43acfdf99520 100644 --- a/test/integration/http_conn_pool_integration_test.cc +++ b/test/integration/http_conn_pool_integration_test.cc @@ -106,7 +106,7 @@ TEST_P(HttpConnPoolIntegrationTest, PoolDrainAfterDrainApiSpecificCluster) { // Drain connection pools via API. Need to post this to the server thread. test_server_->server().dispatcher().post( - [this] { test_server_->server().clusterManager().drainConnections("cluster_0"); }); + [this] { test_server_->server().clusterManager().drainConnections("cluster_0", nullptr); }); ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); @@ -114,6 +114,35 @@ TEST_P(HttpConnPoolIntegrationTest, PoolDrainAfterDrainApiSpecificCluster) { test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 0); } +// Verify the drainConnections() with a predicate is able to filter host drains. +TEST_P(HttpConnPoolIntegrationTest, DrainConnectionsWithPredicate) { + initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + auto response = codec_client_->makeRequestWithBody(default_request_headers_, 1024); + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(default_response_headers_, false); + upstream_request_->encodeData(512, true); + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_TRUE(response->complete()); + + // Perform a drain request which doesn't actually do a drain. + test_server_->server().dispatcher().post([this] { + test_server_->server().clusterManager().drainConnections( + "cluster_0", [](const Upstream::Host&) { return false; }); + }); + + // The existing upstream connection should continue to work. 
+ response = codec_client_->makeRequestWithBody(default_request_headers_, 1024); + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(default_response_headers_, false); + upstream_request_->encodeData(512, true); + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_TRUE(response->complete()); +} + // Verify that the drainConnections() cluster manager API works correctly. TEST_P(HttpConnPoolIntegrationTest, PoolDrainAfterDrainApiAllClusters) { config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { @@ -168,7 +197,7 @@ TEST_P(HttpConnPoolIntegrationTest, PoolDrainAfterDrainApiAllClusters) { // Drain connection pools via API. Need to post this to the server thread. test_server_->server().dispatcher().post( - [this] { test_server_->server().clusterManager().drainConnections(); }); + [this] { test_server_->server().clusterManager().drainConnections(nullptr); }); ASSERT_TRUE(first_connection->waitForDisconnect()); ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); diff --git a/test/mocks/upstream/cluster_manager.h b/test/mocks/upstream/cluster_manager.h index fb38f12bc13b1..59dfce2e01c6b 100644 --- a/test/mocks/upstream/cluster_manager.h +++ b/test/mocks/upstream/cluster_manager.h @@ -69,8 +69,9 @@ class MockClusterManager : public ClusterManager { const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const override { return cluster_timeout_budget_stat_names_; } - MOCK_METHOD(void, drainConnections, (const std::string& cluster)); - MOCK_METHOD(void, drainConnections, ()); + MOCK_METHOD(void, drainConnections, + (const std::string& cluster, DrainConnectionsHostPredicate predicate)); + MOCK_METHOD(void, drainConnections, (DrainConnectionsHostPredicate predicate)); MOCK_METHOD(void, checkActiveStaticCluster, (const std::string& cluster)); MOCK_METHOD(OdCdsApiHandlePtr, allocateOdCdsApi, (const envoy::config::core::v3::ConfigSource& odcds_config, From 
320340d2e1296504df9c4b7787760e5be68d2014 Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Thu, 28 Apr 2022 20:14:49 +0000 Subject: [PATCH 2/3] comments Signed-off-by: Matt Klein --- test/integration/http_conn_pool_integration_test.cc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test/integration/http_conn_pool_integration_test.cc b/test/integration/http_conn_pool_integration_test.cc index d43acfdf99520..2ab6fa7abc36c 100644 --- a/test/integration/http_conn_pool_integration_test.cc +++ b/test/integration/http_conn_pool_integration_test.cc @@ -141,6 +141,15 @@ TEST_P(HttpConnPoolIntegrationTest, DrainConnectionsWithPredicate) { ASSERT_TRUE(response->waitForEndStream()); EXPECT_TRUE(upstream_request_->complete()); EXPECT_TRUE(response->complete()); + + // Now do a drain that matches. + test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 1); + test_server_->server().dispatcher().post([this] { + test_server_->server().clusterManager().drainConnections( + "cluster_0", [](const Upstream::Host&) { return true; }); + }); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 0); } // Verify that the drainConnections() cluster manager API works correctly. 
From 8ee91538aded766e2031befca1acdf469e5581e2 Mon Sep 17 00:00:00 2001 From: Matt Klein Date: Thu, 28 Apr 2022 21:02:58 +0000 Subject: [PATCH 3/3] more tests Signed-off-by: Matt Klein --- .../upstream/cluster_manager_impl_test.cc | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 55e518107a320..cafba2fe28574 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -5176,6 +5176,52 @@ TEST_F(TcpKeepaliveTest, TcpKeepaliveWithAllOptions) { expectSetsockoptSoKeepalive(7, 4, 1); } +// Make sure the drainConnections() with a predicate can correctly exclude a host. +TEST_F(ClusterManagerImplTest, DrainConnectionsPredicate) { + const std::string yaml = R"EOF( + static_resources: + clusters: + - name: cluster_1 + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + type: STATIC + )EOF"; + + create(parseBootstrapFromV3Yaml(yaml)); + + // Set up the HostSet. + Cluster& cluster = cluster_manager_->activeClusters().begin()->second; + HostSharedPtr host1 = makeTestHost(cluster.info(), "tcp://127.0.0.1:80", time_system_); + HostSharedPtr host2 = makeTestHost(cluster.info(), "tcp://127.0.0.1:81", time_system_); + + HostVector hosts{host1, host2}; + auto hosts_ptr = std::make_shared<HostVector>(hosts); + + // Sending non-mergeable updates. + cluster.prioritySet().updateHosts( + 0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr, hosts, {}, + 100); + + // Using RR LB get a pool for each host. 
+ EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _)) + .Times(2) + .WillRepeatedly(ReturnNew<NiceMock<Http::ConnectionPool::MockInstance>>()); + Http::ConnectionPool::MockInstance* cp1 = HttpPoolDataPeer::getPool( + cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); + Http::ConnectionPool::MockInstance* cp2 = HttpPoolDataPeer::getPool( + cluster_manager_->getThreadLocalCluster("cluster_1") + ->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr)); + EXPECT_NE(cp1, cp2); + + EXPECT_CALL(*cp1, + drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections)); + EXPECT_CALL(*cp2, drainConnections(_)).Times(0); + cluster_manager_->drainConnections("cluster_1", [](const Upstream::Host& host) { + return host.address()->asString() == "127.0.0.1:80"; + }); +} + TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) { const std::string yaml = R"EOF( static_resources: