diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 2b3796b3ff6ee..21461da3bbdb8 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -880,7 +880,7 @@ void ClusterManagerImpl::maybePreconnect( ThreadLocalClusterManagerImpl::ClusterEntry& cluster_entry, const ClusterConnectivityState& state, std::function pick_preconnect_pool) { - auto peekahead_ratio = cluster_entry.cluster_info_->peekaheadRatio(); + auto peekahead_ratio = cluster_entry.info()->peekaheadRatio(); if (peekahead_ratio <= 1.0) { return; } @@ -913,7 +913,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool( ResourcePriority priority, absl::optional protocol, LoadBalancerContext* context) { // Select a host and create a connection pool for it if it does not already exist. - auto pool = connPool(priority, protocol, context, false); + auto pool = httpConnPoolImpl(priority, protocol, context, false); if (pool == nullptr) { return absl::nullopt; } @@ -923,7 +923,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool( // Now that a new stream is being established, attempt to preconnect. maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &protocol, &context]() { - return connPool(priority, protocol, context, true); + return httpConnPoolImpl(priority, protocol, context, true); }); }, pool); @@ -934,7 +934,7 @@ absl::optional ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( ResourcePriority priority, LoadBalancerContext* context) { // Select a host and create a connection pool for it if it does not already exist. - auto pool = tcpConnPool(priority, context, false); + auto pool = tcpConnPoolImpl(priority, context, false); if (pool == nullptr) { return absl::nullopt; } @@ -942,7 +942,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( TcpPoolData data( [this, priority, context]() -> void { maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &context]() { - return tcpConnPool(priority, context, true); + return tcpConnPoolImpl(priority, context, true); }); }, pool); @@ -1042,6 +1042,27 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpAsyncClient return http_async_client_; } +void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::updateHosts( + const std::string& name, uint32_t priority, + PrioritySet::UpdateHostsParams&& update_hosts_params, + LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, + const HostVector& hosts_removed, absl::optional overprovisioning_factor) { + ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name, + hosts_added.size(), hosts_removed.size()); + priority_set_.updateHosts(priority, std::move(update_hosts_params), std::move(locality_weights), + hosts_added, hosts_removed, overprovisioning_factor); + // If an LB is thread aware, create a new worker local LB on membership changes. + if (lb_factory_ != nullptr) { + ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name); + lb_ = lb_factory_->create(); + } +} + +void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools( + const HostVector& hosts_removed) { + parent_.drainConnPools(hosts_removed); +} + ClusterUpdateCallbacksHandlePtr ClusterManagerImpl::addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) { ThreadLocalClusterManagerImpl& cluster_manager = *tls_; @@ -1096,7 +1117,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl ENVOY_LOG(debug, "adding TLS local cluster {}", local_cluster_name); thread_local_clusters_[local_cluster_name] = std::make_unique( *this, local_cluster_params->info_, local_cluster_params->load_balancer_factory_); - local_priority_set_ = &thread_local_clusters_[local_cluster_name]->priority_set_; + local_priority_set_ = &thread_local_clusters_[local_cluster_name]->prioritySet(); } } @@ -1111,7 +1132,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::~ThreadLocalClusterManagerImp host_tcp_conn_pool_map_.clear(); ASSERT(host_tcp_conn_map_.empty()); for (auto& cluster : thread_local_clusters_) { - if (&cluster.second->priority_set_ != local_priority_set_) { + if (&cluster.second->prioritySet() != local_priority_set_) { cluster.second.reset(); } } @@ -1197,7 +1218,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts( // We need to go through and purge any connection pools for hosts that got deleted. // Even if two hosts actually point to the same address this will be safe, since if a // host is readded it will be a different physical HostSharedPtr. - cluster_entry->parent_.drainConnPools(hosts_removed); + cluster_entry->drainConnPools(hosts_removed); } void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership( @@ -1206,17 +1227,9 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership( const HostVector& hosts_removed, uint64_t overprovisioning_factor) { ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end()); const auto& cluster_entry = thread_local_clusters_[name]; - ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name, - hosts_added.size(), hosts_removed.size()); - cluster_entry->priority_set_.updateHosts(priority, std::move(update_hosts_params), - std::move(locality_weights), hosts_added, hosts_removed, - overprovisioning_factor); - - // If an LB is thread aware, create a new worker local LB on membership changes. - if (cluster_entry->lb_factory_ != nullptr) { - ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name); - cluster_entry->lb_ = cluster_entry->lb_factory_->create(); - } + cluster_entry->updateHosts(name, priority, std::move(update_hosts_params), + std::move(locality_weights), hosts_added, hosts_removed, + overprovisioning_factor); } void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure( @@ -1374,7 +1387,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() } Http::ConnectionPool::Instance* -ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool( +ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPoolImpl( ResourcePriority priority, absl::optional downstream_protocol, LoadBalancerContext* context, bool peek) { HostConstSharedPtr host = (peek ? lb_->peekAnotherHost(context) : lb_->chooseHost(context)); @@ -1486,7 +1499,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::httpConnPoolIsIdle( } Tcp::ConnectionPool::Instance* -ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool( +ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPoolImpl( ResourcePriority priority, LoadBalancerContext* context, bool peek) { HostConstSharedPtr host = (peek ? lb_->peekAnotherHost(context) : lb_->chooseHost(context)); if (!host) { diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 11cb8c2d0b4cf..d606e68b7ed12 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -400,18 +400,12 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable>; - struct ClusterEntry : public ThreadLocalCluster { + class ClusterEntry : public ThreadLocalCluster { + public: ClusterEntry(ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster, const LoadBalancerFactorySharedPtr& lb_factory); ~ClusterEntry() override; - Http::ConnectionPool::Instance* connPool(ResourcePriority priority, - absl::optional downstream_protocol, - LoadBalancerContext* context, bool peek); - - Tcp::ConnectionPool::Instance* tcpConnPool(ResourcePriority priority, - LoadBalancerContext* context, bool peek); - // Upstream::ThreadLocalCluster const PrioritySet& prioritySet() override { return priority_set_; } ClusterInfoConstSharedPtr info() override { return cluster_info_; } @@ -424,6 +418,25 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable overprovisioning_factor); + + // Drains any connection pools associated with the removed hosts. + void drainConnPools(const HostVector& hosts_removed); + + private: + Http::ConnectionPool::Instance* + httpConnPoolImpl(ResourcePriority priority, + absl::optional downstream_protocol, + LoadBalancerContext* context, bool peek); + + Tcp::ConnectionPool::Instance* tcpConnPoolImpl(ResourcePriority priority, + LoadBalancerContext* context, bool peek); + ThreadLocalClusterManagerImpl& parent_; PrioritySetImpl priority_set_; // LB factory if applicable. Not all load balancer types have a factory. LB types that have