From 9364fa557a74ec3f0e4476241d64e6a3627b837b Mon Sep 17 00:00:00 2001 From: Ryan Hamilton Date: Fri, 6 Aug 2021 17:11:13 +0000 Subject: [PATCH 1/4] cluster_manager: Make ClusterEntry a class instead of a struct Signed-off-by: Ryan Hamilton --- .../common/upstream/cluster_manager_impl.cc | 40 +++++++++++++------ source/common/upstream/cluster_manager_impl.h | 27 +++++++++---- 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 2b3796b3ff6ee..8171c246afca1 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -880,7 +880,7 @@ void ClusterManagerImpl::maybePreconnect( ThreadLocalClusterManagerImpl::ClusterEntry& cluster_entry, const ClusterConnectivityState& state, std::function pick_preconnect_pool) { - auto peekahead_ratio = cluster_entry.cluster_info_->peekaheadRatio(); + auto peekahead_ratio = cluster_entry.info()->peekaheadRatio(); if (peekahead_ratio <= 1.0) { return; } @@ -1042,6 +1042,28 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpAsyncClient return http_async_client_; } +void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::updateHosts( + const std::string& name, + uint32_t priority, PrioritySet::UpdateHostsParams&& update_hosts_params, + LocalityWeightsConstSharedPtr locality_weights, + const HostVector& hosts_added, const HostVector& hosts_removed, + absl::optional overprovisioning_factor) { + ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name, + hosts_added.size(), hosts_removed.size()); + priority_set_.updateHosts(priority, std::move(update_hosts_params), + std::move(locality_weights), hosts_added, hosts_removed, + overprovisioning_factor); + // If an LB is thread aware, create a new worker local LB on membership changes. + if (lb_factory_ != nullptr) { + ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name); + lb_ = lb_factory_->create(); + } +} + +void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools(const HostVector& hosts_removed) { + parent_.drainConnPools(hosts_removed); +} + ClusterUpdateCallbacksHandlePtr ClusterManagerImpl::addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) { ThreadLocalClusterManagerImpl& cluster_manager = *tls_; @@ -1096,7 +1118,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl ENVOY_LOG(debug, "adding TLS local cluster {}", local_cluster_name); thread_local_clusters_[local_cluster_name] = std::make_unique( *this, local_cluster_params->info_, local_cluster_params->load_balancer_factory_); - local_priority_set_ = &thread_local_clusters_[local_cluster_name]->priority_set_; + local_priority_set_ = &thread_local_clusters_[local_cluster_name]->prioritySet(); } } @@ -1111,7 +1133,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::~ThreadLocalClusterManagerImp host_tcp_conn_pool_map_.clear(); ASSERT(host_tcp_conn_map_.empty()); for (auto& cluster : thread_local_clusters_) { - if (&cluster.second->priority_set_ != local_priority_set_) { + if (&cluster.second->prioritySet() != local_priority_set_) { cluster.second.reset(); } } @@ -1197,7 +1219,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts( // We need to go through and purge any connection pools for hosts that got deleted. // Even if two hosts actually point to the same address this will be safe, since if a // host is readded it will be a different physical HostSharedPtr. - cluster_entry->parent_.drainConnPools(hosts_removed); + cluster_entry->drainConnPools(hosts_removed); } void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership( @@ -1206,17 +1228,9 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership( const HostVector& hosts_removed, uint64_t overprovisioning_factor) { ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end()); const auto& cluster_entry = thread_local_clusters_[name]; - ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name, - hosts_added.size(), hosts_removed.size()); - cluster_entry->priority_set_.updateHosts(priority, std::move(update_hosts_params), + cluster_entry->updateHosts(name, priority, std::move(update_hosts_params), std::move(locality_weights), hosts_added, hosts_removed, overprovisioning_factor); - - // If an LB is thread aware, create a new worker local LB on membership changes. - if (cluster_entry->lb_factory_ != nullptr) { - ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name); - cluster_entry->lb_ = cluster_entry->lb_factory_->create(); - } } void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 11cb8c2d0b4cf..951da426c3810 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -400,18 +400,12 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable>; - struct ClusterEntry : public ThreadLocalCluster { + class ClusterEntry : public ThreadLocalCluster { + public: ClusterEntry(ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster, const LoadBalancerFactorySharedPtr& lb_factory); ~ClusterEntry() override; - Http::ConnectionPool::Instance* connPool(ResourcePriority priority, - absl::optional downstream_protocol, - LoadBalancerContext* context, bool peek); - - Tcp::ConnectionPool::Instance* tcpConnPool(ResourcePriority priority, - LoadBalancerContext* context, bool peek); - // Upstream::ThreadLocalCluster const PrioritySet& prioritySet() override { return priority_set_; } ClusterInfoConstSharedPtr info() override { return cluster_info_; } @@ -424,6 +418,23 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable overprovisioning_factor); + + // Drains any connection pools assocaited with the removed hosts. + void drainConnPools(const HostVector& hosts_removed); + + private: + Http::ConnectionPool::Instance* connPool(ResourcePriority priority, + absl::optional downstream_protocol, + LoadBalancerContext* context, bool peek); + + Tcp::ConnectionPool::Instance* tcpConnPool(ResourcePriority priority, + LoadBalancerContext* context, bool peek); + ThreadLocalClusterManagerImpl& parent_; PrioritySetImpl priority_set_; // LB factory if applicable. Not all load balancer types have a factory. LB types that have From 6dd02f536bd4a4c73a4f510b48dcd9a7e217c5e3 Mon Sep 17 00:00:00 2001 From: Ryan Hamilton Date: Fri, 6 Aug 2021 17:15:03 +0000 Subject: [PATCH 2/4] Rename connPool to httpConnPool for parallelism with tcpConnPool Signed-off-by: Ryan Hamilton --- source/common/upstream/cluster_manager_impl.cc | 6 +++--- source/common/upstream/cluster_manager_impl.h | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 8171c246afca1..c7f52516ed2ca 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -913,7 +913,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool( ResourcePriority priority, absl::optional protocol, LoadBalancerContext* context) { // Select a host and create a connection pool for it if it does not already exist. - auto pool = connPool(priority, protocol, context, false); + auto pool = httpConnPool(priority, protocol, context, false); if (pool == nullptr) { return absl::nullopt; } @@ -923,7 +923,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool( // Now that a new stream is being established, attempt to preconnect. maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &protocol, &context]() { - return connPool(priority, protocol, context, true); + return httpConnPool(priority, protocol, context, true); }); }, pool); @@ -1388,7 +1388,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() } Http::ConnectionPool::Instance* -ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool( +ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool( ResourcePriority priority, absl::optional downstream_protocol, LoadBalancerContext* context, bool peek) { HostConstSharedPtr host = (peek ? lb_->peekAnotherHost(context) : lb_->chooseHost(context)); diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 951da426c3810..d282e85402414 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -428,9 +428,9 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable downstream_protocol, - LoadBalancerContext* context, bool peek); + Http::ConnectionPool::Instance* httpConnPool(ResourcePriority priority, + absl::optional downstream_protocol, + LoadBalancerContext* context, bool peek); Tcp::ConnectionPool::Instance* tcpConnPool(ResourcePriority priority, LoadBalancerContext* context, bool peek); From c3ab29842088858a9d85e848b1410c8ada834ef6 Mon Sep 17 00:00:00 2001 From: Ryan Hamilton Date: Fri, 6 Aug 2021 17:16:09 +0000 Subject: [PATCH 3/4] Add Impl suffix to tcpConnPool() and httpConnPool(). Signed-off-by: Ryan Hamilton --- source/common/upstream/cluster_manager_impl.cc | 12 ++++++------ source/common/upstream/cluster_manager_impl.h | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index c7f52516ed2ca..05634b2db08f8 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -913,7 +913,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool( ResourcePriority priority, absl::optional protocol, LoadBalancerContext* context) { // Select a host and create a connection pool for it if it does not already exist. - auto pool = httpConnPool(priority, protocol, context, false); + auto pool = httpConnPoolImpl(priority, protocol, context, false); if (pool == nullptr) { return absl::nullopt; } @@ -923,7 +923,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool( // Now that a new stream is being established, attempt to preconnect. maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &protocol, &context]() { - return httpConnPool(priority, protocol, context, true); + return httpConnPoolImpl(priority, protocol, context, true); }); }, pool); @@ -934,7 +934,7 @@ absl::optional ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( ResourcePriority priority, LoadBalancerContext* context) { // Select a host and create a connection pool for it if it does not already exist. - auto pool = tcpConnPool(priority, context, false); + auto pool = tcpConnPoolImpl(priority, context, false); if (pool == nullptr) { return absl::nullopt; } @@ -942,7 +942,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( TcpPoolData data( [this, priority, context]() -> void { maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &context]() { - return tcpConnPool(priority, context, true); + return tcpConnPoolImpl(priority, context, true); }); }, pool); @@ -1388,7 +1388,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() } Http::ConnectionPool::Instance* -ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool( +ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPoolImpl( ResourcePriority priority, absl::optional downstream_protocol, LoadBalancerContext* context, bool peek) { HostConstSharedPtr host = (peek ? lb_->peekAnotherHost(context) : lb_->chooseHost(context)); @@ -1500,7 +1500,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::httpConnPoolIsIdle( } Tcp::ConnectionPool::Instance* -ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( +ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPoolImpl( ResourcePriority priority, LoadBalancerContext* context, bool peek) { HostConstSharedPtr host = (peek ? lb_->peekAnotherHost(context) : lb_->chooseHost(context)); if (!host) { diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index d282e85402414..150347493f0e3 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -428,11 +428,11 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable downstream_protocol, LoadBalancerContext* context, bool peek); - Tcp::ConnectionPool::Instance* tcpConnPool(ResourcePriority priority, + Tcp::ConnectionPool::Instance* tcpConnPoolImpl(ResourcePriority priority, LoadBalancerContext* context, bool peek); ThreadLocalClusterManagerImpl& parent_; From acf16e2f2c130c5b6ac9733db1bee0d87d2b4463 Mon Sep 17 00:00:00 2001 From: Ryan Hamilton Date: Fri, 6 Aug 2021 17:20:11 +0000 Subject: [PATCH 4/4] Format Signed-off-by: Ryan Hamilton --- .../common/upstream/cluster_manager_impl.cc | 21 +++++++++---------- source/common/upstream/cluster_manager_impl.h | 16 +++++++------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 05634b2db08f8..21461da3bbdb8 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -1043,16 +1043,14 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpAsyncClient } void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::updateHosts( - const std::string& name, - uint32_t priority, PrioritySet::UpdateHostsParams&& update_hosts_params, - LocalityWeightsConstSharedPtr locality_weights, - const HostVector& hosts_added, const HostVector& hosts_removed, - absl::optional overprovisioning_factor) { + const std::string& name, uint32_t priority, + PrioritySet::UpdateHostsParams&& update_hosts_params, + LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, + const HostVector& hosts_removed, absl::optional overprovisioning_factor) { ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name, hosts_added.size(), hosts_removed.size()); - priority_set_.updateHosts(priority, std::move(update_hosts_params), - std::move(locality_weights), hosts_added, hosts_removed, - overprovisioning_factor); + priority_set_.updateHosts(priority, std::move(update_hosts_params), std::move(locality_weights), + hosts_added, hosts_removed, overprovisioning_factor); // If an LB is thread aware, create a new worker local LB on membership changes. if (lb_factory_ != nullptr) { ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name); @@ -1060,7 +1058,8 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::updateHost } } -void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools(const HostVector& hosts_removed) { +void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools( + const HostVector& hosts_removed) { parent_.drainConnPools(hosts_removed); } @@ -1229,8 +1228,8 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership( ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end()); const auto& cluster_entry = thread_local_clusters_[name]; cluster_entry->updateHosts(name, priority, std::move(update_hosts_params), - std::move(locality_weights), hosts_added, hosts_removed, - overprovisioning_factor); + std::move(locality_weights), hosts_added, hosts_removed, + overprovisioning_factor); } void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 150347493f0e3..d606e68b7ed12 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -401,7 +401,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable>; class ClusterEntry : public ThreadLocalCluster { - public: + public: ClusterEntry(ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster, const LoadBalancerFactorySharedPtr& lb_factory); ~ClusterEntry() override; @@ -419,21 +419,23 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable overprovisioning_factor); - // Drains any connection pools assocaited with the removed hosts. + // Drains any connection pools associated with the removed hosts. void drainConnPools(const HostVector& hosts_removed); private: - Http::ConnectionPool::Instance* httpConnPoolImpl(ResourcePriority priority, - absl::optional downstream_protocol, - LoadBalancerContext* context, bool peek); + Http::ConnectionPool::Instance* + httpConnPoolImpl(ResourcePriority priority, + absl::optional downstream_protocol, + LoadBalancerContext* context, bool peek); Tcp::ConnectionPool::Instance* tcpConnPoolImpl(ResourcePriority priority, - LoadBalancerContext* context, bool peek); + LoadBalancerContext* context, bool peek); ThreadLocalClusterManagerImpl& parent_; PrioritySetImpl priority_set_;