Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,16 +385,31 @@ class ClusterManager {
clusterRequestResponseSizeStatNames() const PURE;
virtual const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const PURE;

/**
* Predicate function used in drainConnections().
* @param host supplies the host that is about to be drained.
* @return true if the host should be drained, and false otherwise.
*
* IMPORTANT: This predicate must be completely self contained and thread safe. It will be posted
* to all worker threads and run concurrently.
*/
using DrainConnectionsHostPredicate = std::function<bool(const Host&)>;

/**
* Drain all connection pool connections owned by this cluster.
* @param cluster, the cluster to drain.
* @param predicate supplies the optional drain connections host predicate. If not supplied, all
* hosts are drained.
*/
virtual void drainConnections(const std::string& cluster) PURE;
virtual void drainConnections(const std::string& cluster,
DrainConnectionsHostPredicate predicate) PURE;

/**
* Drain all connection pool connections owned by all clusters in the cluster manager.
* @param predicate supplies the optional drain connections host predicate. If not supplied, all
* hosts are drained.
*/
virtual void drainConnections() PURE;
virtual void drainConnections(DrainConnectionsHostPredicate predicate) PURE;

/**
* Check if the cluster is active and statically configured, and if not, throw exception.
Expand Down
20 changes: 13 additions & 7 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -989,25 +989,26 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool(
return data;
}

void ClusterManagerImpl::drainConnections(const std::string& cluster) {
void ClusterManagerImpl::drainConnections(const std::string& cluster,
DrainConnectionsHostPredicate predicate) {
ENVOY_LOG_EVENT(debug, "drain_connections_call", "drainConnections called for cluster {}",
cluster);

tls_.runOnAllThreads([cluster](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
tls_.runOnAllThreads([cluster, predicate](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
auto cluster_entry = cluster_manager->thread_local_clusters_.find(cluster);
if (cluster_entry != cluster_manager->thread_local_clusters_.end()) {
cluster_entry->second->drainAllConnPools();
cluster_entry->second->drainAllConnPools(predicate);
}
});
}

void ClusterManagerImpl::drainConnections() {
void ClusterManagerImpl::drainConnections(DrainConnectionsHostPredicate predicate) {
ENVOY_LOG_EVENT(debug, "drain_connections_call_for_all_clusters",
"drainConnections called for all clusters");

tls_.runOnAllThreads([](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
tls_.runOnAllThreads([predicate](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
for (const auto& cluster_entry : cluster_manager->thread_local_clusters_) {
cluster_entry.second->drainAllConnPools();
cluster_entry.second->drainAllConnPools(predicate);
}
});
}
Expand Down Expand Up @@ -1558,9 +1559,14 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnP
}
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainAllConnPools() {
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainAllConnPools(
DrainConnectionsHostPredicate predicate) {
for (auto& host_set : priority_set_.hostSetsPerPriority()) {
for (const HostSharedPtr& host : host_set->hosts()) {
if (predicate != nullptr && !predicate(*host)) {
continue;
}

parent_.drainAllConnPoolsWorker(host);
}
}
Expand Down
7 changes: 4 additions & 3 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,10 @@ class ClusterManagerImpl : public ClusterManager,
return cluster_timeout_budget_stat_names_;
}

void drainConnections(const std::string& cluster) override;
void drainConnections(const std::string& cluster,
DrainConnectionsHostPredicate predicate) override;

void drainConnections() override;
void drainConnections(DrainConnectionsHostPredicate predicate) override;

void checkActiveStaticCluster(const std::string& cluster) override;

Expand Down Expand Up @@ -512,7 +513,7 @@ class ClusterManagerImpl : public ClusterManager,
// Drains idle clients in connection pools for all hosts.
void drainConnPools();
// Drain all clients in connection pools for all hosts.
void drainAllConnPools();
void drainAllConnPools(DrainConnectionsHostPredicate predicate);

private:
Http::ConnectionPool::Instance*
Expand Down
46 changes: 46 additions & 0 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5176,6 +5176,52 @@ TEST_F(TcpKeepaliveTest, TcpKeepaliveWithAllOptions) {
expectSetsockoptSoKeepalive(7, 4, 1);
}

// Make sure the drainConnections() with a predicate can correctly exclude a host.
TEST_F(ClusterManagerImplTest, DrainConnectionsPredicate) {
const std::string yaml = R"EOF(
static_resources:
clusters:
- name: cluster_1
connect_timeout: 0.250s
lb_policy: ROUND_ROBIN
type: STATIC
)EOF";

create(parseBootstrapFromV3Yaml(yaml));

// Set up the HostSet.
Cluster& cluster = cluster_manager_->activeClusters().begin()->second;
HostSharedPtr host1 = makeTestHost(cluster.info(), "tcp://127.0.0.1:80", time_system_);
HostSharedPtr host2 = makeTestHost(cluster.info(), "tcp://127.0.0.1:81", time_system_);

HostVector hosts{host1, host2};
auto hosts_ptr = std::make_shared<HostVector>(hosts);

// Sending non-mergeable updates.
cluster.prioritySet().updateHosts(
0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr, hosts, {},
100);

// Using RR LB get a pool for each host.
EXPECT_CALL(factory_, allocateConnPool_(_, _, _, _, _))
.Times(2)
.WillRepeatedly(ReturnNew<NiceMock<Http::ConnectionPool::MockInstance>>());
Http::ConnectionPool::MockInstance* cp1 = HttpPoolDataPeer::getPool(
cluster_manager_->getThreadLocalCluster("cluster_1")
->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr));
Http::ConnectionPool::MockInstance* cp2 = HttpPoolDataPeer::getPool(
cluster_manager_->getThreadLocalCluster("cluster_1")
->httpConnPool(ResourcePriority::Default, Http::Protocol::Http11, nullptr));
EXPECT_NE(cp1, cp2);

EXPECT_CALL(*cp1,
drainConnections(Envoy::ConnectionPool::DrainBehavior::DrainExistingConnections));
EXPECT_CALL(*cp2, drainConnections(_)).Times(0);
cluster_manager_->drainConnections("cluster_1", [](const Upstream::Host& host) {
return host.address()->asString() == "127.0.0.1:80";
});
}

TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) {
const std::string yaml = R"EOF(
static_resources:
Expand Down
42 changes: 40 additions & 2 deletions test/integration/http_conn_pool_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,52 @@ TEST_P(HttpConnPoolIntegrationTest, PoolDrainAfterDrainApiSpecificCluster) {

// Drain connection pools via API. Need to post this to the server thread.
test_server_->server().dispatcher().post(
[this] { test_server_->server().clusterManager().drainConnections("cluster_0"); });
[this] { test_server_->server().clusterManager().drainConnections("cluster_0", nullptr); });

ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect());

// Validate that the pool is deleted when it becomes idle.
test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 0);
}

// Verify the drainConnections() with a predicate is able to filter host drains.
TEST_P(HttpConnPoolIntegrationTest, DrainConnectionsWithPredicate) {
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));
auto response = codec_client_->makeRequestWithBody(default_request_headers_, 1024);
waitForNextUpstreamRequest();
upstream_request_->encodeHeaders(default_response_headers_, false);
upstream_request_->encodeData(512, true);
ASSERT_TRUE(response->waitForEndStream());
EXPECT_TRUE(upstream_request_->complete());
EXPECT_TRUE(response->complete());

// Perform a drain request which doesn't actually do a drain.
test_server_->server().dispatcher().post([this] {
test_server_->server().clusterManager().drainConnections(
"cluster_0", [](const Upstream::Host&) { return false; });
});

// The existing upstream connection should continue to work.
response = codec_client_->makeRequestWithBody(default_request_headers_, 1024);
waitForNextUpstreamRequest();
upstream_request_->encodeHeaders(default_response_headers_, false);
upstream_request_->encodeData(512, true);
ASSERT_TRUE(response->waitForEndStream());
EXPECT_TRUE(upstream_request_->complete());
EXPECT_TRUE(response->complete());

// Now do a drain that matches.
test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 1);
test_server_->server().dispatcher().post([this] {
test_server_->server().clusterManager().drainConnections(
"cluster_0", [](const Upstream::Host&) { return true; });
});
ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect());
test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 0);
}

// Verify that the drainConnections() cluster manager API works correctly.
TEST_P(HttpConnPoolIntegrationTest, PoolDrainAfterDrainApiAllClusters) {
config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
Expand Down Expand Up @@ -168,7 +206,7 @@ TEST_P(HttpConnPoolIntegrationTest, PoolDrainAfterDrainApiAllClusters) {

// Drain connection pools via API. Need to post this to the server thread.
test_server_->server().dispatcher().post(
[this] { test_server_->server().clusterManager().drainConnections(); });
[this] { test_server_->server().clusterManager().drainConnections(nullptr); });

ASSERT_TRUE(first_connection->waitForDisconnect());
ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect());
Expand Down
5 changes: 3 additions & 2 deletions test/mocks/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ class MockClusterManager : public ClusterManager {
const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const override {
return cluster_timeout_budget_stat_names_;
}
MOCK_METHOD(void, drainConnections, (const std::string& cluster));
MOCK_METHOD(void, drainConnections, ());
MOCK_METHOD(void, drainConnections,
(const std::string& cluster, DrainConnectionsHostPredicate predicate));
MOCK_METHOD(void, drainConnections, (DrainConnectionsHostPredicate predicate));
MOCK_METHOD(void, checkActiveStaticCluster, (const std::string& cluster));
MOCK_METHOD(OdCdsApiHandlePtr, allocateOdCdsApi,
(const envoy::config::core::v3::ConfigSource& odcds_config,
Expand Down