Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ using HealthyHostVector = Phantom<HostVector, Healthy>;
using DegradedHostVector = Phantom<HostVector, Degraded>;
using ExcludedHostVector = Phantom<HostVector, Excluded>;
using HostMap = absl::flat_hash_map<std::string, Upstream::HostSharedPtr>;
using HostMapSharedPtr = std::shared_ptr<HostMap>;
using HostMapConstSharedPtr = std::shared_ptr<const HostMap>;
using HostVectorSharedPtr = std::shared_ptr<HostVector>;
using HostVectorConstSharedPtr = std::shared_ptr<const HostVector>;

Expand Down Expand Up @@ -424,6 +426,12 @@ class PrioritySet {
*/
virtual const std::vector<HostSetPtr>& hostSetsPerPriority() const PURE;

/**
* @return const HostMapConstSharedPtr& read only cross priority host map that indexed by host
* address string.
*/
virtual const HostMapConstSharedPtr& crossPriorityHostMap() const PURE;

/**
* Parameter class for updateHosts.
*/
Expand All @@ -447,11 +455,14 @@ class PrioritySet {
* @param hosts_added supplies the hosts added since the last update.
* @param hosts_removed supplies the hosts removed since the last update.
* @param overprovisioning_factor if presents, overwrites the current overprovisioning_factor.
* @param cross_priority_host_map read only cross-priority host map which is created in the main
* thread and shared by all the worker threads.
*/
virtual void updateHosts(uint32_t priority, UpdateHostsParams&& update_host_params,
virtual void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
LocalityWeightsConstSharedPtr locality_weights,
const HostVector& hosts_added, const HostVector& hosts_removed,
absl::optional<uint32_t> overprovisioning_factor) PURE;
absl::optional<uint32_t> overprovisioning_factor,
HostMapConstSharedPtr cross_priority_host_map = nullptr) PURE;

/**
* Callback provided during batch updates that can be used to update hosts.
Expand All @@ -469,7 +480,7 @@ class PrioritySet {
* @param hosts_removed supplies the hosts removed since the last update.
* @param overprovisioning_factor if presents, overwrites the current overprovisioning_factor.
*/
virtual void updateHosts(uint32_t priority, UpdateHostsParams&& update_host_params,
virtual void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
LocalityWeightsConstSharedPtr locality_weights,
const HostVector& hosts_added, const HostVector& hosts_removed,
absl::optional<uint32_t> overprovisioning_factor) PURE;
Expand Down
59 changes: 31 additions & 28 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -978,35 +978,37 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_
per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor();
}

tls_.runOnAllThreads(
[info = cm_cluster.cluster().info(), params = std::move(params), add_or_update_cluster,
load_balancer_factory](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr;
if (add_or_update_cluster) {
if (cluster_manager->thread_local_clusters_.count(info->name()) > 0) {
ENVOY_LOG(debug, "updating TLS cluster {}", info->name());
} else {
ENVOY_LOG(debug, "adding TLS cluster {}", info->name());
}
HostMapConstSharedPtr host_map = cm_cluster.cluster().prioritySet().crossPriorityHostMap();

tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params),
add_or_update_cluster, load_balancer_factory, map = std::move(host_map)](

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Maybe call it host_map within the lambda as well? map seems a bit too generic

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get it.

OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr;
if (add_or_update_cluster) {
if (cluster_manager->thread_local_clusters_.count(info->name()) > 0) {
ENVOY_LOG(debug, "updating TLS cluster {}", info->name());
} else {
ENVOY_LOG(debug, "adding TLS cluster {}", info->name());
}

new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info,
load_balancer_factory);
cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster);
}
new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info,
load_balancer_factory);
cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster);
}

for (const auto& per_priority : params.per_priority_update_params_) {
cluster_manager->updateClusterMembership(
info->name(), per_priority.priority_, per_priority.update_hosts_params_,
per_priority.locality_weights_, per_priority.hosts_added_,
per_priority.hosts_removed_, per_priority.overprovisioning_factor_);
}
for (const auto& per_priority : params.per_priority_update_params_) {
cluster_manager->updateClusterMembership(
info->name(), per_priority.priority_, per_priority.update_hosts_params_,
per_priority.locality_weights_, per_priority.hosts_added_, per_priority.hosts_removed_,
per_priority.overprovisioning_factor_, map);
}

if (new_cluster != nullptr) {
for (auto& cb : cluster_manager->update_callbacks_) {
cb->onClusterAddOrUpdate(*new_cluster);
}
}
});
if (new_cluster != nullptr) {
for (auto& cb : cluster_manager->update_callbacks_) {
cb->onClusterAddOrUpdate(*new_cluster);
}
}
});
}

void ClusterManagerImpl::postThreadLocalHealthFailure(const HostSharedPtr& host) {
Expand Down Expand Up @@ -1243,14 +1245,15 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts(
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
const std::string& name, uint32_t priority, PrioritySet::UpdateHostsParams update_hosts_params,
LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
const HostVector& hosts_removed, uint64_t overprovisioning_factor) {
const HostVector& hosts_removed, uint64_t overprovisioning_factor,
const HostMapConstSharedPtr& cross_priority_host_map) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think usually we'd pass a shared ptr by value and std::move it to avoid copies where appropriate

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get it.

ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end());
const auto& cluster_entry = thread_local_clusters_[name];
ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name,
hosts_added.size(), hosts_removed.size());
cluster_entry->priority_set_.updateHosts(priority, std::move(update_hosts_params),
std::move(locality_weights), hosts_added, hosts_removed,
overprovisioning_factor);
overprovisioning_factor, cross_priority_host_map);

// If an LB is thread aware, create a new worker local LB on membership changes.
if (cluster_entry->lb_factory_ != nullptr) {
Expand Down
3 changes: 2 additions & 1 deletion source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
PrioritySet::UpdateHostsParams update_hosts_params,
LocalityWeightsConstSharedPtr locality_weights,
const HostVector& hosts_added, const HostVector& hosts_removed,
uint64_t overprovisioning_factor);
uint64_t overprovisioning_factor,
const HostMapConstSharedPtr& cross_priority_host_map);
void onHostHealthFailure(const HostSharedPtr& host);

ConnPoolsContainer* getHttpConnPoolsContainer(const HostConstSharedPtr& host,
Expand Down
32 changes: 16 additions & 16 deletions source/common/upstream/eds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ EdsClusterImpl::EdsClusterImpl(
void EdsClusterImpl::startPreInit() { subscription_->start({cluster_name_}); }

void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& host_update_cb) {
absl::flat_hash_map<std::string, HostSharedPtr> updated_hosts;
absl::flat_hash_set<std::string> all_new_hosts;
PriorityStateManager priority_state_manager(parent_, parent_.local_info_, &host_update_cb);
for (const auto& locality_lb_endpoint : cluster_load_assignment_.endpoints()) {
Expand All @@ -57,6 +56,11 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h

for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) {
auto address = parent_.resolveProtoAddress(lb_endpoint.endpoint().address());
// When the configuration contains duplicate hosts, only the first one will be retained.
if (all_new_hosts.count(address->asString()) > 0) {
continue;
}
Comment on lines +58 to +61

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an unrelated bug fix? Or is this somehow required with this new code?


priority_state_manager.registerHostForPriority(lb_endpoint.endpoint().hostname(), address,
locality_lb_endpoint, lb_endpoint,
parent_.time_source_);
Expand All @@ -67,6 +71,11 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h
// Track whether we rebuilt any LB structures.
bool cluster_rebuilt = false;

// Get the map of all the latest existing hosts, which is used to filter out the existing
// hosts in the process of updating cluster memberships.
HostMapConstSharedPtr all_hosts = parent_.prioritySet().crossPriorityHostMap();
ASSERT(all_hosts != nullptr);

const uint32_t overprovisioning_factor = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
cluster_load_assignment_.policy(), overprovisioning_factor, kDefaultOverProvisioningFactor);

Expand All @@ -81,13 +90,13 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h
if (priority_state[i].first != nullptr) {
cluster_rebuilt |= parent_.updateHostsPerLocality(
i, overprovisioning_factor, *priority_state[i].first, parent_.locality_weights_map_[i],
priority_state[i].second, priority_state_manager, updated_hosts, all_new_hosts);
priority_state[i].second, priority_state_manager, *all_hosts, all_new_hosts);
} else {
// If the new update contains a priority with no hosts, call the update function with an empty
// set of hosts.
cluster_rebuilt |= parent_.updateHostsPerLocality(
i, overprovisioning_factor, {}, parent_.locality_weights_map_[i], empty_locality_map,
priority_state_manager, updated_hosts, all_new_hosts);
priority_state_manager, *all_hosts, all_new_hosts);
}
}

Expand All @@ -100,11 +109,9 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h
}
cluster_rebuilt |= parent_.updateHostsPerLocality(
i, overprovisioning_factor, {}, parent_.locality_weights_map_[i], empty_locality_map,
priority_state_manager, updated_hosts, all_new_hosts);
priority_state_manager, *all_hosts, all_new_hosts);
}

parent_.all_hosts_ = std::move(updated_hosts);

if (!cluster_rebuilt) {
parent_.info_->stats().update_no_rebuild_.inc();
}
Expand Down Expand Up @@ -227,18 +234,12 @@ void EdsClusterImpl::reloadHealthyHostsHelper(const HostSharedPtr& host) {
HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy),
host_set->localityWeights(), {}, hosts_to_remove, absl::nullopt);
}

if (host_to_exclude != nullptr) {
ASSERT(all_hosts_.find(host_to_exclude->address()->asString()) != all_hosts_.end());
all_hosts_.erase(host_to_exclude->address()->asString());
}
}

bool EdsClusterImpl::updateHostsPerLocality(
const uint32_t priority, const uint32_t overprovisioning_factor, const HostVector& new_hosts,
LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map,
PriorityStateManager& priority_state_manager,
absl::flat_hash_map<std::string, HostSharedPtr>& updated_hosts,
PriorityStateManager& priority_state_manager, const HostMap& all_hosts,
const absl::flat_hash_set<std::string>& all_new_hosts) {
const auto& host_set = priority_set_.getOrCreateHostSet(priority, overprovisioning_factor);
HostVectorSharedPtr current_hosts_copy(new HostVector(host_set.hosts()));
Expand All @@ -255,9 +256,8 @@ bool EdsClusterImpl::updateHostsPerLocality(
// performance implications, since this has the knock on effect that we rebuild the load balancers
// and locality scheduler. See the comment in BaseDynamicClusterImpl::updateDynamicHostList
// about this. In the future we may need to do better here.
const bool hosts_updated =
updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added, hosts_removed,
updated_hosts, all_hosts_, all_new_hosts);
const bool hosts_updated = updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added,
hosts_removed, all_hosts, all_new_hosts);
if (hosts_updated || host_set.overprovisioningFactor() != overprovisioning_factor ||
locality_weights_map != new_locality_weights_map) {
ASSERT(std::all_of(current_hosts_copy->begin(), current_hosts_copy->end(),
Expand Down
4 changes: 2 additions & 2 deletions source/common/upstream/eds.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class EdsClusterImpl
bool updateHostsPerLocality(const uint32_t priority, const uint32_t overprovisioning_factor,
const HostVector& new_hosts, LocalityWeightsMap& locality_weights_map,
LocalityWeightsMap& new_locality_weights_map,
PriorityStateManager& priority_state_manager, HostMap& updated_hosts,
PriorityStateManager& priority_state_manager,
const HostMap& all_hosts,
const absl::flat_hash_set<std::string>& all_new_hosts);
bool validateUpdateSize(int num_resources);

Expand Down Expand Up @@ -78,7 +79,6 @@ class EdsClusterImpl
const LocalInfo::LocalInfo& local_info_;
const std::string cluster_name_;
std::vector<LocalityWeightsMap> locality_weights_map_;
HostMap all_hosts_;
Event::TimerPtr assignment_timeout_;
InitializePhase initialize_phase_;
};
Expand Down
24 changes: 17 additions & 7 deletions source/common/upstream/strict_dns_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() {
if (status == Network::DnsResolver::ResolutionStatus::Success) {
parent_.info_->stats().update_success_.inc();

HostMap updated_hosts;
HostVector new_hosts;
std::chrono::seconds ttl_refresh_rate = std::chrono::seconds::max();
absl::flat_hash_set<std::string> all_new_hosts;
Expand All @@ -127,33 +126,44 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() {
// potentially move port handling into the DNS interface itself, which would work better
// for SRV.
ASSERT(resp.address_ != nullptr);
auto address = Network::Utility::getAddressWithPort(*(resp.address_), port_);
if (all_new_hosts.count(address->asString()) > 0) {
continue;
}
Comment on lines +130 to +132

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question?


new_hosts.emplace_back(new HostImpl(
parent_.info_, hostname_,
Network::Utility::getAddressWithPort(*(resp.address_), port_),
parent_.info_, hostname_, address,
// TODO(zyfjeff): Created through metadata shared pool
std::make_shared<const envoy::config::core::v3::Metadata>(lb_endpoint_.metadata()),
lb_endpoint_.load_balancing_weight().value(), locality_lb_endpoints_.locality(),
lb_endpoint_.endpoint().health_check_config(), locality_lb_endpoints_.priority(),
lb_endpoint_.health_status(), parent_.time_source_));
all_new_hosts.emplace(new_hosts.back()->address()->asString());
all_new_hosts.emplace(address->asString());
ttl_refresh_rate = min(ttl_refresh_rate, resp.ttl_);
}

HostVector hosts_added;
HostVector hosts_removed;
if (parent_.updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed,
updated_hosts, all_hosts_, all_new_hosts)) {
all_hosts_, all_new_hosts)) {
ENVOY_LOG(debug, "DNS hosts have changed for {}", dns_address_);
ASSERT(std::all_of(hosts_.begin(), hosts_.end(), [&](const auto& host) {
return host->priority() == locality_lb_endpoints_.priority();
}));

// Update host map for current resolve target.
for (const auto& host : hosts_removed) {
all_hosts_.erase(host->address()->asString());
}
for (const auto& host : hosts_added) {
all_hosts_.insert({host->address()->asString(), host});
}

parent_.updateAllHosts(hosts_added, hosts_removed, locality_lb_endpoints_.priority());
} else {
parent_.info_->stats().update_no_rebuild_.inc();
}

all_hosts_ = std::move(updated_hosts);

// reset failure backoff strategy because there was a success.
parent_.failure_backoff_strategy_->reset();

Expand Down
4 changes: 4 additions & 0 deletions source/common/upstream/strict_dns_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class StrictDnsClusterImpl : public BaseDynamicClusterImpl {
const uint32_t port_;
const Event::TimerPtr resolve_timer_;
HostVector hosts_;

// All host map for current resolve target. When we have multiple resolve targets, multiple
// targets may contain two different host objects with the same address. This host map cannot be
// replaced by the read only all host map in the priority set.
HostMap all_hosts_;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there anything that would prevent someone from using the shared map for strict DNS clusters? Seems like it would not be valid?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The shared map is still worked for strict DNS clusters. However, we cannot use shared map to replace this all_hosts_.

Because strict DNS cluster will remove duplicate hosts in the resolve target. But if multiple resolve targets contain hosts with the same address, these hosts will exist at the same time. As a global host map, shared map cannot reserve two different hosts with the same address for two different resolve targets, nor can it keep the logic of strict DNS cluster unchanged.
So shared map cannot replace all_hosts_ (note that all_hosts_ here is a member of resolve target).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess my concern would be around people trying to implement sticky LB for strict DNS but running into weird edge cases: suppose they use regular LB to select an endpoint from the STRICT_DNS cluster and wants to pin this lb selection

This works fine if there is only one host per ip within the cluster, but if this is configured with multiple resolve targets that resolve to the same ip, some of which have different metadata, you might run into a situation where the first LB selects a different endpoint than what the shared map gives you when you ask for the IP

I don't think we need to fix this, but perhaps some documentation around this limitation with the strict dns cluster would be good?

@wbpcode wbpcode Aug 3, 2021

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works fine if there is only one host per ip within the cluster, but if this is configured with multiple resolve targets that resolve to the same ip, some of which have different metadata, you might run into a situation where the first LB selects a different endpoint than what the shared map gives you when you ask for the IP

Yes, this is a known limitation. I will add some new comments to illustrate this problem.

};

Expand Down
Loading