Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ using HealthyHostVector = Phantom<HostVector, Healthy>;
using DegradedHostVector = Phantom<HostVector, Degraded>;
using ExcludedHostVector = Phantom<HostVector, Excluded>;
using HostMap = absl::flat_hash_map<std::string, Upstream::HostSharedPtr>;
using HostMapSharedPtr = std::shared_ptr<HostMap>;
using HostMapConstSharedPtr = std::shared_ptr<const HostMap>;
using HostVectorSharedPtr = std::shared_ptr<HostVector>;
using HostVectorConstSharedPtr = std::shared_ptr<const HostVector>;

Expand Down Expand Up @@ -424,6 +426,12 @@ class PrioritySet {
*/
virtual const std::vector<HostSetPtr>& hostSetsPerPriority() const PURE;

/**
* @return const HostMapConstSharedPtr& read only all host map that indexed by host address
* string.
*/
virtual const HostMapConstSharedPtr& readOnlyAllHostMap() const PURE;

/**
* Parameter class for updateHosts.
*/
Expand Down
57 changes: 31 additions & 26 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -978,35 +978,40 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_
per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor();
}

tls_.runOnAllThreads(
[info = cm_cluster.cluster().info(), params = std::move(params), add_or_update_cluster,
load_balancer_factory](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr;
if (add_or_update_cluster) {
if (cluster_manager->thread_local_clusters_.count(info->name()) > 0) {
ENVOY_LOG(debug, "updating TLS cluster {}", info->name());
} else {
ENVOY_LOG(debug, "adding TLS cluster {}", info->name());
}
HostMapConstSharedPtr host_map = cm_cluster.cluster().prioritySet().readOnlyAllHostMap();

tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params),
add_or_update_cluster, load_balancer_factory, map = std::move(host_map)](

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Maybe call it host_map within the lambda as well? map seems a bit too generic

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get it.

OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr;
if (add_or_update_cluster) {
if (cluster_manager->thread_local_clusters_.count(info->name()) > 0) {
ENVOY_LOG(debug, "updating TLS cluster {}", info->name());
} else {
ENVOY_LOG(debug, "adding TLS cluster {}", info->name());
}

new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info,
load_balancer_factory);
cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster);
}
new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info,
load_balancer_factory);
cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster);
}

for (const auto& per_priority : params.per_priority_update_params_) {
cluster_manager->updateClusterMembership(
info->name(), per_priority.priority_, per_priority.update_hosts_params_,
per_priority.locality_weights_, per_priority.hosts_added_,
per_priority.hosts_removed_, per_priority.overprovisioning_factor_);
}
for (const auto& per_priority : params.per_priority_update_params_) {
cluster_manager->updateClusterMembership(
info->name(), per_priority.priority_, per_priority.update_hosts_params_,
per_priority.locality_weights_, per_priority.hosts_added_, per_priority.hosts_removed_,
per_priority.overprovisioning_factor_);
}

if (new_cluster != nullptr) {
for (auto& cb : cluster_manager->update_callbacks_) {
cb->onClusterAddOrUpdate(*new_cluster);
}
}
});
// Update read only all host map in worker threads.
cluster_manager->thread_local_clusters_[info->name()]->priority_set_.setReadOnlyAllHostMap(map);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't this map just be calculated on the main thread and then passed by shared_ptr to all of the workers similar to how everything else is passed in the updateClusterMembership() call? It's unclear to me why this needs to be special cased like this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In an update after aggregation, updateClusterMembership() will be executed multiple times (once for each priority). The host map only needs to be updated once. Although it is not a problem to add it to the updateClusterMembership(), it will only be repeated several times to set the pointer of the host map.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will only be repeated several times to set the pointer of the host map.

As long as the copy doesn't happen multiple times I don't think this matters? My preference would be to keep the code as simple as possible unless we have a known perf issue. This code is already super complicated.

@wbpcode wbpcode Jul 14, 2021

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is already super complicated.

Yes it is. The code is complex enough. It took me a long time to understand the source code of ClusterManager and EDS.
I will modify and improve this PR first based on your suggestions.


if (new_cluster != nullptr) {
for (auto& cb : cluster_manager->update_callbacks_) {
cb->onClusterAddOrUpdate(*new_cluster);
}
}
});
}

void ClusterManagerImpl::postThreadLocalHealthFailure(const HostSharedPtr& host) {
Expand Down
25 changes: 25 additions & 0 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,31 @@ void PrioritySetImpl::BatchUpdateScope::updateHosts(
hosts_removed, overprovisioning_factor);
}

void MainPrioritySetImpl::updateMutableAllHostMap(const HostVector& hosts_added,
const HostVector& hosts_removed) {
if (hosts_added.empty() && hosts_removed.empty()) {
// No new hosts have been added and no old hosts have been removed.
return;
}

// Since read_only_all_host_map_ may be shared by multiple threads, when the host set changes, we
// cannot directly modify read_only_all_host_map_.
if (mutable_all_host_map_ == nullptr) {
// Copy old read only host map to mutable host map.
mutable_all_host_map_ = read_only_all_host_map_ != nullptr
? std::make_shared<HostMap>(*read_only_all_host_map_)
: std::make_shared<HostMap>();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this be prohibitive for large clusters? Even if just one host changes, we end up doing an O(n) copy of this map.

Speaking of large clusters, since this would add both space and computational overhead to large clusters, should we make this configurable or only enabled when actually used in some way?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are already paying this copy cost for all the other data structures, right? (Not that it's good to add yet another one.). One option for this particular structure would be to use R/W locks and then we wouldn't need to actually do any copies on it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we do pay this when processing the EDS update as a whole, so we wouldn't make it that much worse.

I was originally thinking in terms of the update callbacks, most (all?) of which operate on the per priority level, so a change in P0 won't trigger updates to P1, P2, ...

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good point re: per-priority, though I'm not sure how much that matters in practice for most users. I think though that as long as we can replace the EDS private one with this one it's probably a wash?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep if this lets us replace the all_hosts_ map then I think we're good

@wbpcode wbpcode Jul 13, 2021

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep if this lets us replace the all_hosts_ map then I think we're good

@snowp Yep. This will replace the all_hosts_ map in EDS. This may still bring additional overhead, because each priority may be updated, causing the callback to be executed multiple times. In the original EDS, because batchUpdate was used, all_hosts_ would only be copied once. But this problem can be alleviated by setting merge_timeout.

Another way to avoid extra overhead is to use an additional memberUpdateCallback to synchronize read_only_all_host_map_ to the worker threads instead of directly reusing postThreadLocalClusterUpdate. This ensures that when batchUpdate is used, there is only one map copy at most. I think this is also a point that can be discussed.

@wbpcode wbpcode Jul 13, 2021

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are already paying this copy cost for all the other data structures, right? (Not that it's good to add yet another one.). One option for this particular structure would be to use R/W locks and then we wouldn't need to actually do any copies on it.

@mattklein123 I have considered using R/W locks, but I don't want to increase the cost of acquiring R locks during each request for occasional membership updates.

}

for (const auto& host : hosts_removed) {
mutable_all_host_map_->erase(host->address()->asString());
}

for (const auto& host : hosts_added) {
mutable_all_host_map_->insert({host->address()->asString(), host});
}
}

ClusterStats ClusterInfoImpl::generateStats(Stats::Scope& scope,
const ClusterStatNames& stat_names) {
return ClusterStats(stat_names, scope);
Expand Down
48 changes: 47 additions & 1 deletion source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,14 @@ class PrioritySetImpl : public PrioritySet {

void batchHostUpdate(BatchUpdateCb& callback) override;

const HostMapConstSharedPtr& readOnlyAllHostMap() const override {
return read_only_all_host_map_;
}

virtual void setReadOnlyAllHostMap(HostMapConstSharedPtr all_host_map) {
read_only_all_host_map_ = std::move(all_host_map);
}

protected:
// Allows subclasses of PrioritySetImpl to create their own type of HostSetImpl.
virtual HostSetImplPtr createHostSet(uint32_t priority,
Expand All @@ -504,6 +512,8 @@ class PrioritySetImpl : public PrioritySet {
// avoid any potential lifetime issues.
std::vector<std::unique_ptr<HostSet>> host_sets_;

HostMapConstSharedPtr read_only_all_host_map_;

private:
// This is a matching vector to store the callback handles for host_sets_. It is kept separately
// because host_sets_ is directly returned so we avoid translation.
Expand Down Expand Up @@ -539,6 +549,42 @@ class PrioritySetImpl : public PrioritySet {
};
};

/**
* Specialized PrioritySetImpl designed for the main thread. It will update and maintain the read
* only all host map when the hosts set changes.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

host set

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get it.

*/
class MainPrioritySetImpl : public PrioritySetImpl {
public:
MainPrioritySetImpl() {
all_host_map_update_handle_ = addPriorityUpdateCb(
[this](uint32_t, const HostVector& hosts_added, const HostVector& hosts_removed) {
updateMutableAllHostMap(hosts_added, hosts_removed);
});
}

// PrioritySet
const HostMapConstSharedPtr& readOnlyAllHostMap() const override {
// Check if the host set in the main thread PrioritySet has been updated.
if (mutable_all_host_map_ != nullptr) {
read_only_all_host_map_ = std::move(mutable_all_host_map_);
}
return read_only_all_host_map_;
}

// PrioritySetImpl
void setReadOnlyAllHostMap(HostMapConstSharedPtr) override {
// Using an external host map to update the read_only_all_host_map_ is not allowed in
// MainPrioritySetImpl.
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

protected:
void updateMutableAllHostMap(const HostVector& hosts_added, const HostVector& hosts_removed);

HostMapSharedPtr mutable_all_host_map_;
Common::CallbackHandlePtr all_host_map_update_handle_;
};

/**
* Implementation of ClusterInfo that reads from JSON.
*/
Expand Down Expand Up @@ -875,7 +921,7 @@ class ClusterImplBase : public Cluster, protected Logger::Loggable<Logger::Id::u

protected:
TimeSource& time_source_;
PrioritySetImpl priority_set_;
MainPrioritySetImpl priority_set_;

void validateEndpointsForZoneAwareRouting(
const envoy::config::endpoint::v3::LocalityLbEndpoints& endpoints) const;
Expand Down