Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ using HealthyHostVector = Phantom<HostVector, Healthy>;
using DegradedHostVector = Phantom<HostVector, Degraded>;
using ExcludedHostVector = Phantom<HostVector, Excluded>;
using HostMap = absl::flat_hash_map<std::string, Upstream::HostSharedPtr>;
using HostMapSharedPtr = std::shared_ptr<HostMap>;
using HostMapConstSharedPtr = std::shared_ptr<const HostMap>;
using HostVectorSharedPtr = std::shared_ptr<HostVector>;
using HostVectorConstSharedPtr = std::shared_ptr<const HostVector>;

Expand Down Expand Up @@ -424,6 +426,12 @@ class PrioritySet {
*/
virtual const std::vector<HostSetPtr>& hostSetsPerPriority() const PURE;

/**
* @return HostMapConstSharedPtr read-only cross-priority host map that is indexed by host
* address string.
*/
virtual HostMapConstSharedPtr crossPriorityHostMap() const PURE;

/**
* Parameter class for updateHosts.
*/
Expand All @@ -447,11 +455,14 @@ class PrioritySet {
* @param hosts_added supplies the hosts added since the last update.
* @param hosts_removed supplies the hosts removed since the last update.
* @param overprovisioning_factor if present, overwrites the current overprovisioning_factor.
* @param cross_priority_host_map read only cross-priority host map which is created in the main
* thread and shared by all the worker threads.
*/
virtual void updateHosts(uint32_t priority, UpdateHostsParams&& update_host_params,
virtual void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
LocalityWeightsConstSharedPtr locality_weights,
const HostVector& hosts_added, const HostVector& hosts_removed,
absl::optional<uint32_t> overprovisioning_factor) PURE;
absl::optional<uint32_t> overprovisioning_factor,
HostMapConstSharedPtr cross_priority_host_map = nullptr) PURE;

/**
* Callback provided during batch updates that can be used to update hosts.
Expand All @@ -469,7 +480,7 @@ class PrioritySet {
* @param hosts_removed supplies the hosts removed since the last update.
* @param overprovisioning_factor if present, overwrites the current overprovisioning_factor.
*/
virtual void updateHosts(uint32_t priority, UpdateHostsParams&& update_host_params,
virtual void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
LocalityWeightsConstSharedPtr locality_weights,
const HostVector& hosts_added, const HostVector& hosts_removed,
absl::optional<uint32_t> overprovisioning_factor) PURE;
Expand Down
65 changes: 35 additions & 30 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -978,35 +978,37 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_
per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor();
}

tls_.runOnAllThreads(
[info = cm_cluster.cluster().info(), params = std::move(params), add_or_update_cluster,
load_balancer_factory](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr;
if (add_or_update_cluster) {
if (cluster_manager->thread_local_clusters_.count(info->name()) > 0) {
ENVOY_LOG(debug, "updating TLS cluster {}", info->name());
} else {
ENVOY_LOG(debug, "adding TLS cluster {}", info->name());
}
HostMapConstSharedPtr host_map = cm_cluster.cluster().prioritySet().crossPriorityHostMap();

tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params),
add_or_update_cluster, load_balancer_factory, map = std::move(host_map)](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Maybe call it host_map within the lambda as well? map seems a bit too generic

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get it.

OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr;
if (add_or_update_cluster) {
if (cluster_manager->thread_local_clusters_.count(info->name()) > 0) {
ENVOY_LOG(debug, "updating TLS cluster {}", info->name());
} else {
ENVOY_LOG(debug, "adding TLS cluster {}", info->name());
}

new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info,
load_balancer_factory);
cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster);
}
new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info,
load_balancer_factory);
cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster);
}

for (const auto& per_priority : params.per_priority_update_params_) {
cluster_manager->updateClusterMembership(
info->name(), per_priority.priority_, per_priority.update_hosts_params_,
per_priority.locality_weights_, per_priority.hosts_added_,
per_priority.hosts_removed_, per_priority.overprovisioning_factor_);
}
for (const auto& per_priority : params.per_priority_update_params_) {
cluster_manager->updateClusterMembership(
info->name(), per_priority.priority_, per_priority.update_hosts_params_,
per_priority.locality_weights_, per_priority.hosts_added_, per_priority.hosts_removed_,
per_priority.overprovisioning_factor_, map);
}

if (new_cluster != nullptr) {
for (auto& cb : cluster_manager->update_callbacks_) {
cb->onClusterAddOrUpdate(*new_cluster);
}
}
});
if (new_cluster != nullptr) {
for (auto& cb : cluster_manager->update_callbacks_) {
cb->onClusterAddOrUpdate(*new_cluster);
}
}
});
}

void ClusterManagerImpl::postThreadLocalHealthFailure(const HostSharedPtr& host) {
Expand Down Expand Up @@ -1046,11 +1048,13 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::updateHost
const std::string& name, uint32_t priority,
PrioritySet::UpdateHostsParams&& update_hosts_params,
LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
const HostVector& hosts_removed, absl::optional<uint32_t> overprovisioning_factor) {
const HostVector& hosts_removed, absl::optional<uint32_t> overprovisioning_factor,
HostMapConstSharedPtr cross_priority_host_map) {
ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name,
hosts_added.size(), hosts_removed.size());
priority_set_.updateHosts(priority, std::move(update_hosts_params), std::move(locality_weights),
hosts_added, hosts_removed, overprovisioning_factor);
hosts_added, hosts_removed, overprovisioning_factor,
std::move(cross_priority_host_map));
// If an LB is thread aware, create a new worker local LB on membership changes.
if (lb_factory_ != nullptr) {
ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name);
Expand Down Expand Up @@ -1224,12 +1228,13 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts(
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
const std::string& name, uint32_t priority, PrioritySet::UpdateHostsParams update_hosts_params,
LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
const HostVector& hosts_removed, uint64_t overprovisioning_factor) {
const HostVector& hosts_removed, uint64_t overprovisioning_factor,
HostMapConstSharedPtr cross_priority_host_map) {
ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end());
const auto& cluster_entry = thread_local_clusters_[name];
cluster_entry->updateHosts(name, priority, std::move(update_hosts_params),
std::move(locality_weights), hosts_added, hosts_removed,
overprovisioning_factor);
overprovisioning_factor, std::move(cross_priority_host_map));
}

void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure(
Expand Down
6 changes: 4 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
PrioritySet::UpdateHostsParams&& update_hosts_params,
LocalityWeightsConstSharedPtr locality_weights,
const HostVector& hosts_added, const HostVector& hosts_removed,
absl::optional<uint32_t> overprovisioning_factor);
absl::optional<uint32_t> overprovisioning_factor,
HostMapConstSharedPtr cross_priority_host_map);

// Drains any connection pools associated with the removed hosts.
void drainConnPools(const HostVector& hosts_removed);
Expand Down Expand Up @@ -471,7 +472,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
PrioritySet::UpdateHostsParams update_hosts_params,
LocalityWeightsConstSharedPtr locality_weights,
const HostVector& hosts_added, const HostVector& hosts_removed,
uint64_t overprovisioning_factor);
uint64_t overprovisioning_factor,
HostMapConstSharedPtr cross_priority_host_map);
void onHostHealthFailure(const HostSharedPtr& host);

ConnPoolsContainer* getHttpConnPoolsContainer(const HostConstSharedPtr& host,
Expand Down
32 changes: 16 additions & 16 deletions source/common/upstream/eds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ EdsClusterImpl::EdsClusterImpl(
void EdsClusterImpl::startPreInit() { subscription_->start({cluster_name_}); }

void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& host_update_cb) {
absl::flat_hash_map<std::string, HostSharedPtr> updated_hosts;
absl::flat_hash_set<std::string> all_new_hosts;
PriorityStateManager priority_state_manager(parent_, parent_.local_info_, &host_update_cb);
for (const auto& locality_lb_endpoint : cluster_load_assignment_.endpoints()) {
Expand All @@ -56,6 +55,11 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h

for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) {
auto address = parent_.resolveProtoAddress(lb_endpoint.endpoint().address());
// When the configuration contains duplicate hosts, only the first one will be retained.
if (all_new_hosts.count(address->asString()) > 0) {
continue;
}
Comment on lines +58 to +61
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an unrelated bug fix? Or is this somehow required with this new code?


priority_state_manager.registerHostForPriority(lb_endpoint.endpoint().hostname(), address,
locality_lb_endpoint, lb_endpoint,
parent_.time_source_);
Expand All @@ -66,6 +70,11 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h
// Track whether we rebuilt any LB structures.
bool cluster_rebuilt = false;

// Get the map of all the latest existing hosts, which is used to filter out the existing
// hosts in the process of updating cluster memberships.
HostMapConstSharedPtr all_hosts = parent_.prioritySet().crossPriorityHostMap();
ASSERT(all_hosts != nullptr);

const uint32_t overprovisioning_factor = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
cluster_load_assignment_.policy(), overprovisioning_factor, kDefaultOverProvisioningFactor);

Expand All @@ -80,13 +89,13 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h
if (priority_state[i].first != nullptr) {
cluster_rebuilt |= parent_.updateHostsPerLocality(
i, overprovisioning_factor, *priority_state[i].first, parent_.locality_weights_map_[i],
priority_state[i].second, priority_state_manager, updated_hosts, all_new_hosts);
priority_state[i].second, priority_state_manager, *all_hosts, all_new_hosts);
} else {
// If the new update contains a priority with no hosts, call the update function with an empty
// set of hosts.
cluster_rebuilt |= parent_.updateHostsPerLocality(
i, overprovisioning_factor, {}, parent_.locality_weights_map_[i], empty_locality_map,
priority_state_manager, updated_hosts, all_new_hosts);
priority_state_manager, *all_hosts, all_new_hosts);
}
}

Expand All @@ -99,11 +108,9 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h
}
cluster_rebuilt |= parent_.updateHostsPerLocality(
i, overprovisioning_factor, {}, parent_.locality_weights_map_[i], empty_locality_map,
priority_state_manager, updated_hosts, all_new_hosts);
priority_state_manager, *all_hosts, all_new_hosts);
}

parent_.all_hosts_ = std::move(updated_hosts);

if (!cluster_rebuilt) {
parent_.info_->stats().update_no_rebuild_.inc();
}
Expand Down Expand Up @@ -226,18 +233,12 @@ void EdsClusterImpl::reloadHealthyHostsHelper(const HostSharedPtr& host) {
HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy),
host_set->localityWeights(), {}, hosts_to_remove, absl::nullopt);
}

if (host_to_exclude != nullptr) {
ASSERT(all_hosts_.find(host_to_exclude->address()->asString()) != all_hosts_.end());
all_hosts_.erase(host_to_exclude->address()->asString());
}
}

bool EdsClusterImpl::updateHostsPerLocality(
const uint32_t priority, const uint32_t overprovisioning_factor, const HostVector& new_hosts,
LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map,
PriorityStateManager& priority_state_manager,
absl::flat_hash_map<std::string, HostSharedPtr>& updated_hosts,
PriorityStateManager& priority_state_manager, const HostMap& all_hosts,
const absl::flat_hash_set<std::string>& all_new_hosts) {
const auto& host_set = priority_set_.getOrCreateHostSet(priority, overprovisioning_factor);
HostVectorSharedPtr current_hosts_copy(new HostVector(host_set.hosts()));
Expand All @@ -254,9 +255,8 @@ bool EdsClusterImpl::updateHostsPerLocality(
// performance implications, since this has the knock on effect that we rebuild the load balancers
// and locality scheduler. See the comment in BaseDynamicClusterImpl::updateDynamicHostList
// about this. In the future we may need to do better here.
const bool hosts_updated =
updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added, hosts_removed,
updated_hosts, all_hosts_, all_new_hosts);
const bool hosts_updated = updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added,
hosts_removed, all_hosts, all_new_hosts);
if (hosts_updated || host_set.overprovisioningFactor() != overprovisioning_factor ||
locality_weights_map != new_locality_weights_map) {
ASSERT(std::all_of(current_hosts_copy->begin(), current_hosts_copy->end(),
Expand Down
4 changes: 2 additions & 2 deletions source/common/upstream/eds.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class EdsClusterImpl
bool updateHostsPerLocality(const uint32_t priority, const uint32_t overprovisioning_factor,
const HostVector& new_hosts, LocalityWeightsMap& locality_weights_map,
LocalityWeightsMap& new_locality_weights_map,
PriorityStateManager& priority_state_manager, HostMap& updated_hosts,
PriorityStateManager& priority_state_manager,
const HostMap& all_hosts,
const absl::flat_hash_set<std::string>& all_new_hosts);
bool validateUpdateSize(int num_resources);

Expand Down Expand Up @@ -78,7 +79,6 @@ class EdsClusterImpl
const LocalInfo::LocalInfo& local_info_;
const std::string cluster_name_;
std::vector<LocalityWeightsMap> locality_weights_map_;
HostMap all_hosts_;
Event::TimerPtr assignment_timeout_;
InitializePhase initialize_phase_;
};
Expand Down
24 changes: 17 additions & 7 deletions source/common/upstream/strict_dns_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() {
if (status == Network::DnsResolver::ResolutionStatus::Success) {
parent_.info_->stats().update_success_.inc();

HostMap updated_hosts;
HostVector new_hosts;
std::chrono::seconds ttl_refresh_rate = std::chrono::seconds::max();
absl::flat_hash_set<std::string> all_new_hosts;
Expand All @@ -127,33 +126,44 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() {
// potentially move port handling into the DNS interface itself, which would work better
// for SRV.
ASSERT(resp.address_ != nullptr);
auto address = Network::Utility::getAddressWithPort(*(resp.address_), port_);
if (all_new_hosts.count(address->asString()) > 0) {
continue;
}
Comment on lines +130 to +132
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question?


new_hosts.emplace_back(new HostImpl(
parent_.info_, hostname_,
Network::Utility::getAddressWithPort(*(resp.address_), port_),
parent_.info_, hostname_, address,
// TODO(zyfjeff): Created through metadata shared pool
std::make_shared<const envoy::config::core::v3::Metadata>(lb_endpoint_.metadata()),
lb_endpoint_.load_balancing_weight().value(), locality_lb_endpoints_.locality(),
lb_endpoint_.endpoint().health_check_config(), locality_lb_endpoints_.priority(),
lb_endpoint_.health_status(), parent_.time_source_));
all_new_hosts.emplace(new_hosts.back()->address()->asString());
all_new_hosts.emplace(address->asString());
ttl_refresh_rate = min(ttl_refresh_rate, resp.ttl_);
}

HostVector hosts_added;
HostVector hosts_removed;
if (parent_.updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed,
updated_hosts, all_hosts_, all_new_hosts)) {
all_hosts_, all_new_hosts)) {
ENVOY_LOG(debug, "DNS hosts have changed for {}", dns_address_);
ASSERT(std::all_of(hosts_.begin(), hosts_.end(), [&](const auto& host) {
return host->priority() == locality_lb_endpoints_.priority();
}));

// Update host map for current resolve target.
for (const auto& host : hosts_removed) {
all_hosts_.erase(host->address()->asString());
}
for (const auto& host : hosts_added) {
all_hosts_.insert({host->address()->asString(), host});
}

parent_.updateAllHosts(hosts_added, hosts_removed, locality_lb_endpoints_.priority());
} else {
parent_.info_->stats().update_no_rebuild_.inc();
}

all_hosts_ = std::move(updated_hosts);

// reset failure backoff strategy because there was a success.
parent_.failure_backoff_strategy_->reset();

Expand Down
7 changes: 7 additions & 0 deletions source/common/upstream/strict_dns_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ class StrictDnsClusterImpl : public BaseDynamicClusterImpl {
const uint32_t port_;
const Event::TimerPtr resolve_timer_;
HostVector hosts_;

// Host map for current resolve target. When we have multiple resolve targets, multiple targets
// may contain two different hosts with the same address. This has two effects:
// 1) This host map cannot be replaced by the cross-priority global host map in the priority
// set.
// 2) Cross-priority global host map may not be able to search for the expected host based on
// the address.
HostMap all_hosts_;
};

Expand Down
Loading