Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 34 additions & 21 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ void ClusterManagerImpl::maybePreconnect(
ThreadLocalClusterManagerImpl::ClusterEntry& cluster_entry,
const ClusterConnectivityState& state,
std::function<ConnectionPool::Instance*()> pick_preconnect_pool) {
auto peekahead_ratio = cluster_entry.cluster_info_->peekaheadRatio();
auto peekahead_ratio = cluster_entry.info()->peekaheadRatio();
if (peekahead_ratio <= 1.0) {
return;
}
Expand Down Expand Up @@ -913,7 +913,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool(
ResourcePriority priority, absl::optional<Http::Protocol> protocol,
LoadBalancerContext* context) {
// Select a host and create a connection pool for it if it does not already exist.
auto pool = connPool(priority, protocol, context, false);
auto pool = httpConnPoolImpl(priority, protocol, context, false);
if (pool == nullptr) {
return absl::nullopt;
}
Expand All @@ -923,7 +923,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool(
// Now that a new stream is being established, attempt to preconnect.
maybePreconnect(*this, parent_.cluster_manager_state_,
[this, &priority, &protocol, &context]() {
return connPool(priority, protocol, context, true);
return httpConnPoolImpl(priority, protocol, context, true);
});
},
pool);
Expand All @@ -934,15 +934,15 @@ absl::optional<TcpPoolData>
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool(
ResourcePriority priority, LoadBalancerContext* context) {
// Select a host and create a connection pool for it if it does not already exist.
auto pool = tcpConnPool(priority, context, false);
auto pool = tcpConnPoolImpl(priority, context, false);
if (pool == nullptr) {
return absl::nullopt;
}

TcpPoolData data(
[this, priority, context]() -> void {
maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &context]() {
return tcpConnPool(priority, context, true);
return tcpConnPoolImpl(priority, context, true);
});
},
pool);
Expand Down Expand Up @@ -1042,6 +1042,27 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpAsyncClient
return http_async_client_;
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::updateHosts(
const std::string& name, uint32_t priority,
PrioritySet::UpdateHostsParams&& update_hosts_params,
LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
const HostVector& hosts_removed, absl::optional<uint32_t> overprovisioning_factor) {
ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name,
hosts_added.size(), hosts_removed.size());
priority_set_.updateHosts(priority, std::move(update_hosts_params), std::move(locality_weights),
hosts_added, hosts_removed, overprovisioning_factor);
// If an LB is thread aware, create a new worker local LB on membership changes.
if (lb_factory_ != nullptr) {
ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name);
lb_ = lb_factory_->create();
}
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools(
const HostVector& hosts_removed) {
parent_.drainConnPools(hosts_removed);
}

ClusterUpdateCallbacksHandlePtr
ClusterManagerImpl::addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) {
ThreadLocalClusterManagerImpl& cluster_manager = *tls_;
Expand Down Expand Up @@ -1096,7 +1117,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl
ENVOY_LOG(debug, "adding TLS local cluster {}", local_cluster_name);
thread_local_clusters_[local_cluster_name] = std::make_unique<ClusterEntry>(
*this, local_cluster_params->info_, local_cluster_params->load_balancer_factory_);
local_priority_set_ = &thread_local_clusters_[local_cluster_name]->priority_set_;
local_priority_set_ = &thread_local_clusters_[local_cluster_name]->prioritySet();
}
}

Expand All @@ -1111,7 +1132,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::~ThreadLocalClusterManagerImp
host_tcp_conn_pool_map_.clear();
ASSERT(host_tcp_conn_map_.empty());
for (auto& cluster : thread_local_clusters_) {
if (&cluster.second->priority_set_ != local_priority_set_) {
if (&cluster.second->prioritySet() != local_priority_set_) {
cluster.second.reset();
}
}
Expand Down Expand Up @@ -1197,7 +1218,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts(
// We need to go through and purge any connection pools for hosts that got deleted.
// Even if two hosts actually point to the same address this will be safe, since if a
// host is readded it will be a different physical HostSharedPtr.
cluster_entry->parent_.drainConnPools(hosts_removed);
cluster_entry->drainConnPools(hosts_removed);
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
Expand All @@ -1206,17 +1227,9 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
const HostVector& hosts_removed, uint64_t overprovisioning_factor) {
ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end());
const auto& cluster_entry = thread_local_clusters_[name];
ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name,
hosts_added.size(), hosts_removed.size());
cluster_entry->priority_set_.updateHosts(priority, std::move(update_hosts_params),
std::move(locality_weights), hosts_added, hosts_removed,
overprovisioning_factor);

// If an LB is thread aware, create a new worker local LB on membership changes.
if (cluster_entry->lb_factory_ != nullptr) {
ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name);
cluster_entry->lb_ = cluster_entry->lb_factory_->create();
}
cluster_entry->updateHosts(name, priority, std::move(update_hosts_params),
std::move(locality_weights), hosts_added, hosts_removed,
overprovisioning_factor);
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure(
Expand Down Expand Up @@ -1374,7 +1387,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry()
}

Http::ConnectionPool::Instance*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool(
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPoolImpl(
ResourcePriority priority, absl::optional<Http::Protocol> downstream_protocol,
LoadBalancerContext* context, bool peek) {
HostConstSharedPtr host = (peek ? lb_->peekAnotherHost(context) : lb_->chooseHost(context));
Expand Down Expand Up @@ -1486,7 +1499,7 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::httpConnPoolIsIdle(
}

Tcp::ConnectionPool::Instance*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool(
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPoolImpl(
ResourcePriority priority, LoadBalancerContext* context, bool peek) {
HostConstSharedPtr host = (peek ? lb_->peekAnotherHost(context) : lb_->chooseHost(context));
if (!host) {
Expand Down
29 changes: 21 additions & 8 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,18 +400,12 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
using TcpConnectionsMap =
absl::node_hash_map<Network::ClientConnection*, std::unique_ptr<TcpConnContainer>>;

struct ClusterEntry : public ThreadLocalCluster {
class ClusterEntry : public ThreadLocalCluster {
public:
ClusterEntry(ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster,
const LoadBalancerFactorySharedPtr& lb_factory);
~ClusterEntry() override;

Http::ConnectionPool::Instance* connPool(ResourcePriority priority,
absl::optional<Http::Protocol> downstream_protocol,
LoadBalancerContext* context, bool peek);

Tcp::ConnectionPool::Instance* tcpConnPool(ResourcePriority priority,
LoadBalancerContext* context, bool peek);

// Upstream::ThreadLocalCluster
const PrioritySet& prioritySet() override { return priority_set_; }
ClusterInfoConstSharedPtr info() override { return cluster_info_; }
Expand All @@ -424,6 +418,25 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
Host::CreateConnectionData tcpConn(LoadBalancerContext* context) override;
Http::AsyncClient& httpAsyncClient() override;

// Updates the hosts in the priority set.
void updateHosts(const std::string& name, uint32_t priority,
PrioritySet::UpdateHostsParams&& update_hosts_params,
LocalityWeightsConstSharedPtr locality_weights,
const HostVector& hosts_added, const HostVector& hosts_removed,
absl::optional<uint32_t> overprovisioning_factor);

// Drains any connection pools associated with the removed hosts.
void drainConnPools(const HostVector& hosts_removed);

private:
Http::ConnectionPool::Instance*
httpConnPoolImpl(ResourcePriority priority,
absl::optional<Http::Protocol> downstream_protocol,
LoadBalancerContext* context, bool peek);

Tcp::ConnectionPool::Instance* tcpConnPoolImpl(ResourcePriority priority,
LoadBalancerContext* context, bool peek);

ThreadLocalClusterManagerImpl& parent_;
PrioritySetImpl priority_set_;
// LB factory if applicable. Not all load balancer types have a factory. LB types that have
Expand Down