Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 58 additions & 42 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ void ClusterManagerImpl::drainConnections(const std::string& cluster) {
tls_.runOnAllThreads([cluster](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
auto cluster_entry = cluster_manager->thread_local_clusters_.find(cluster);
if (cluster_entry != cluster_manager->thread_local_clusters_.end()) {
cluster_entry->second->drainConnPools();
cluster_entry->second->drainAllConnPools();
}
});
}
Expand All @@ -966,7 +966,7 @@ void ClusterManagerImpl::drainConnections() {

tls_.runOnAllThreads([](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
for (const auto& cluster_entry : cluster_manager->thread_local_clusters_) {
cluster_entry.second->drainConnPools();
cluster_entry.second->drainAllConnPools();
}
});
}
Expand Down Expand Up @@ -1262,49 +1262,13 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure(
const HostSharedPtr& host) {

// Drain all HTTP connection pool connections in the case of a host health failure. If outlier/
// health is due to `ECMP` flow hashing issues for example, a new set of connections might do
// better.
// Drain all HTTP and TCP connection pool connections in the case of a host health failure. If
// outlier/ health is due to `ECMP` flow hashing issues for example, a new set of connections
// might do better.
// TODO(mattklein123): This function is currently very specific, but in the future when we do
// more granular host set changes, we should be able to capture single host changes and make them
// more targeted.
{
const auto container = getHttpConnPoolsContainer(host);
if (container != nullptr) {
container->do_not_delete_ = true;
container->pools_->drainConnections();
container->do_not_delete_ = false;

if (container->pools_->size() == 0) {
host_http_conn_pool_map_.erase(host);
}
}
}
{
// Drain or close any TCP connection pool for the host. Draining a TCP pool doesn't lead to
// connections being closed, it only prevents new connections through the pool. The
// CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE can be used to make the pool close any
// active connections.
const auto& container = host_tcp_conn_pool_map_.find(host);
if (container != host_tcp_conn_pool_map_.end()) {
// Draining pools or closing connections can cause pool deletion if it becomes
// idle. Copy `pools_` so that we aren't iterating through a container that
// gets mutated by callbacks deleting from it.
std::vector<Tcp::ConnectionPool::Instance*> pools;
for (const auto& pair : container->second.pools_) {
pools.push_back(pair.second.get());
}

for (auto* pool : pools) {
if (host->cluster().features() &
ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) {
pool->closeConnections();
} else {
pool->drainConnections();
}
}
}
}
drainAllConnPoolsWorker(host);

if (host->cluster().features() &
ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) {
Expand Down Expand Up @@ -1408,6 +1372,58 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnP
}
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainAllConnPools() {
for (auto& host_set : priority_set_.hostSetsPerPriority()) {
for (const HostSharedPtr& host : host_set->hosts()) {
parent_.drainAllConnPoolsWorker(host);
}
}
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainAllConnPoolsWorker(
const HostSharedPtr& host) {
// Drain or close any HTTP connection pool for the host. Draining an HTTP pool only leads to
// idle connections being closed. Non-idle connections are marked as draining and prevents new
// streams to go through them, causing new connections to be opened.
{
const auto container = getHttpConnPoolsContainer(host);
if (container != nullptr) {
container->do_not_delete_ = true;
container->pools_->drainConnections();
container->do_not_delete_ = false;

if (container->pools_->size() == 0) {
host_http_conn_pool_map_.erase(host);
}
}
}
{
// Drain or close any TCP connection pool for the host. Draining a TCP pool doesn't lead to
// connections being closed, it only prevents new connections through the pool. The
// CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE can be used to make the pool close any
// active connections.
const auto& container = host_tcp_conn_pool_map_.find(host);
if (container != host_tcp_conn_pool_map_.end()) {
// Draining pools or closing connections can cause pool deletion if it becomes
// idle. Copy `pools_` so that we aren't iterating through a container that
// gets mutated by callbacks deleting from it.
std::vector<Tcp::ConnectionPool::Instance*> pools;
for (const auto& pair : container->second.pools_) {
pools.push_back(pair.second.get());
}

for (auto* pool : pools) {
if (host->cluster().features() &
ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) {
pool->closeConnections();
} else {
pool->drainConnections();
}
}
}
}
}

ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() {
// We need to drain all connection pools for the cluster being removed. Then we can remove the
// cluster.
Expand Down
10 changes: 9 additions & 1 deletion source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,10 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u

// Drains any connection pools associated with the removed hosts.
void drainConnPools(const HostVector& hosts_removed);
// Drains connection pools for all hosts.
// Drains idle clients in connection pools for all hosts.
void drainConnPools();
// Drain all clients in connection pools for all hosts.
void drainAllConnPools();

private:
Http::ConnectionPool::Instance*
Expand Down Expand Up @@ -465,9 +467,15 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
ThreadLocalClusterManagerImpl(ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
const absl::optional<LocalClusterParams>& local_cluster_params);
~ThreadLocalClusterManagerImpl() override;
// TODO(junr03): clean up drainConnPools vs drainAllConnPools once ConnPoolImplBase::startDrain
// and
// ConnPoolImplBase::drainConnections() get cleaned up. The code in onHostHealthFailure and the
// code in ThreadLocalClusterManagerImpl::drainConnPools(const HostVector& hosts) is very
// similar and can be merged in a similar fashion to the ConnPoolImplBase case.
void drainConnPools(const HostVector& hosts);
void drainConnPools(HostSharedPtr old_host, ConnPoolsContainer& container);
void drainTcpConnPools(TcpConnPoolsContainer& container);
void drainAllConnPoolsWorker(const HostSharedPtr& host);
void httpConnPoolIsIdle(HostConstSharedPtr host, ResourcePriority priority,
const std::vector<uint8_t>& hash_key);
void tcpConnPoolIsIdle(HostConstSharedPtr host, const std::vector<uint8_t>& hash_key);
Expand Down