Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ class ClusterManager {
virtual const ClusterRequestResponseSizeStatNames&
clusterRequestResponseSizeStatNames() const PURE;
virtual const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const PURE;

/**
* Drain all connection pool connections owned by this cluster.
*/
virtual void drainConnections(const std::string& cluster) PURE;
};

using ClusterManagerPtr = std::unique_ptr<ClusterManager>;
Expand Down
27 changes: 20 additions & 7 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ void ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster) {
if (cluster.info()->lbConfig().close_connections_on_host_set_change()) {
for (const auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
// This will drain all tcp and http connection pools.
postThreadLocalDrainConnections(cluster, host_set->hosts());
postThreadLocalRemoveHosts(cluster, host_set->hosts());
}
} else {
// TODO(snowp): Should this be subject to merge windows?
Expand All @@ -495,7 +495,7 @@ void ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster) {
// enabled, this case will be covered by first `if` statement, where all
// connection pools are drained.
if (!hosts_removed.empty()) {
postThreadLocalDrainConnections(cluster, hosts_removed);
postThreadLocalRemoveHosts(cluster, hosts_removed);
}
}
});
Expand Down Expand Up @@ -956,8 +956,17 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool(
return data;
}

void ClusterManagerImpl::postThreadLocalDrainConnections(const Cluster& cluster,
const HostVector& hosts_removed) {
void ClusterManagerImpl::drainConnections(const std::string& cluster) {
tls_.runOnAllThreads([cluster](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
auto cluster_entry = cluster_manager->thread_local_clusters_.find(cluster);
if (cluster_entry != cluster_manager->thread_local_clusters_.end()) {
cluster_entry->second->drainConnPools();
}
});
}

void ClusterManagerImpl::postThreadLocalRemoveHosts(const Cluster& cluster,
const HostVector& hosts_removed) {
tls_.runOnAllThreads([name = cluster.info()->name(),
hosts_removed](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
cluster_manager->removeHosts(name, hosts_removed);
Expand Down Expand Up @@ -1387,16 +1396,20 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
}
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools() {
for (auto& host_set : priority_set_.hostSetsPerPriority()) {
parent_.drainConnPools(host_set->hosts());
}
}

ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() {
// We need to drain all connection pools for the cluster being removed. Then we can remove the
// cluster.
//
// TODO(mattklein123): Optimally, we would just fire member changed callbacks and remove all of
// the hosts inside of the HostImpl destructor. That is a change with wide implications, so we are
// going with a more targeted approach for now.
for (auto& host_set : priority_set_.hostSetsPerPriority()) {
parent_.drainConnPools(host_set->hosts());
}
drainConnPools();
}

Http::ConnectionPool::Instance*
Expand Down
7 changes: 5 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,10 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
return cluster_timeout_budget_stat_names_;
}

void drainConnections(const std::string& cluster) override;

protected:
virtual void postThreadLocalDrainConnections(const Cluster& cluster,
const HostVector& hosts_removed);
virtual void postThreadLocalRemoveHosts(const Cluster& cluster, const HostVector& hosts_removed);

// Parameters for calling postThreadLocalClusterUpdate()
struct ThreadLocalClusterUpdateParams {
Expand Down Expand Up @@ -428,6 +429,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u

// Drains any connection pools associated with the removed hosts.
void drainConnPools(const HostVector& hosts_removed);
// Drains connection pools for all hosts.
void drainConnPools();

private:
Http::ConnectionPool::Instance*
Expand Down
2 changes: 1 addition & 1 deletion test/common/upstream/test_cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class MockedUpdatedClusterManagerImpl : public TestClusterManagerImpl {
}
}

void postThreadLocalDrainConnections(const Cluster&, const HostVector& hosts_removed) override {
void postThreadLocalRemoveHosts(const Cluster&, const HostVector& hosts_removed) override {
local_hosts_removed_.post(hosts_removed);
}

Expand Down
28 changes: 28 additions & 0 deletions test/integration/http_conn_pool_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,33 @@ TEST_P(HttpConnPoolIntegrationTest, PoolCleanupAfterRemoteClose) {
test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 0);
}

// Verify that the drainConnections() cluster manager API works correctly.
TEST_P(HttpConnPoolIntegrationTest, PoolDrainAfterDrainApi) {
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));
auto response = codec_client_->makeRequestWithBody(default_request_headers_, 1024);
waitForNextUpstreamRequest();

// Validate that the circuit breaker config is setup as we expect.
test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 1);

upstream_request_->encodeHeaders(default_response_headers_, false);
upstream_request_->encodeData(512, true);
ASSERT_TRUE(response->waitForEndStream());

EXPECT_TRUE(upstream_request_->complete());
EXPECT_TRUE(response->complete());

// Drain connection pools via API. Need to post this to the server thread.
test_server_->server().dispatcher().post(
[this] { test_server_->server().clusterManager().drainConnections("cluster_0"); });

ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect());

// Validate that the pool is deleted when it becomes idle.
test_server_->waitForGaugeEq("cluster.cluster_0.circuit_breakers.default.cx_pool_open", 0);
}

} // namespace
} // namespace Envoy
1 change: 1 addition & 0 deletions test/mocks/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class MockClusterManager : public ClusterManager {
const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const override {
return cluster_timeout_budget_stat_names_;
}
MOCK_METHOD(void, drainConnections, (const std::string& cluster));

NiceMock<MockThreadLocalCluster> thread_local_cluster_;
envoy::config::core::v3::BindConfig bind_config_;
Expand Down