-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Add option for merging cluster updates #3941
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 34 commits
431a634
8e20204
c6dbc7d
9e3288e
add79b2
29f1d3d
0dd31c4
0bd58e6
67adb12
ec643f1
d05ef99
95e5260
0f70fb3
b392c7b
a399949
f88c297
a1145dd
b33b080
d70809c
10d3670
80e03db
dcd7174
b3745de
6fac84f
9eae09a
0d8475f
4d0c528
09d56cc
3d78234
6408528
31e4fdf
a5a5391
2a264d1
25faa2b
5affa4c
d3464d4
bbbfef8
d81fb7f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -432,6 +432,17 @@ message Cluster { | |
| ZoneAwareLbConfig zone_aware_lb_config = 2; | ||
| LocalityWeightedLbConfig locality_weighted_lb_config = 3; | ||
| } | ||
| // If set, all healthcheck/weight/metadata updates that happen within this duration will be | ||
| // merged and delivered in one shot when the duration expires. The start of the duration is when | ||
| // the first update happens. This is useful for big clusters, with potentially noisy deploys | ||
| // that might trigger excessive CPU usage due to a constant stream of healthcheck state changes | ||
| // or metadata updates. By default, this is not configured and updates apply immediately. Also, | ||
| // the first set of updates to be seen apply immediately as well (e.g.: a new cluster). | ||
| // | ||
| // Note: merging does not apply to cluser membership changes (e.g.: adds/removes); this is | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: s/cluser/cluster/ |
||
| // because merging those updates isn't currently safe. See | ||
| // https://github.com/envoyproxy/envoy/pull/3941. | ||
| google.protobuf.Duration update_merge_window = 4; | ||
| } | ||
|
|
||
| // Common configuration for all load balancer implementations. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -180,7 +180,7 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots | |
| init_helper_([this](Cluster& cluster) { onClusterInit(cluster); }), | ||
| config_tracker_entry_( | ||
| admin.getConfigTracker().add("clusters", [this] { return dumpClusterConfigs(); })), | ||
| system_time_source_(system_time_source) { | ||
| system_time_source_(system_time_source), dispatcher_(main_thread_dispatcher) { | ||
| async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(*this, tls); | ||
| const auto& cm_config = bootstrap.cluster_manager(); | ||
| if (cm_config.has_outlier_detection()) { | ||
|
|
@@ -330,7 +330,35 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) { | |
| const HostVector& hosts_removed) { | ||
| // This fires when a cluster is about to have an updated member set. We need to send this | ||
| // out to all of the thread local configurations. | ||
| postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed); | ||
|
|
||
| // Should we save this update and merge it with other updates? | ||
| // | ||
| // Note that we can only _safely_ merge updates that have no added/removed hosts. That is, | ||
| // only those updates that signal a change in host healthcheck state, weight or metadata. | ||
| // | ||
| // We've discussed merging updates related to hosts being added/removed, but it's really | ||
| // tricky to merge those given that downstream consumers of these updates expect to see the | ||
| // full list of updates, not a condensed one. This is because they use the broadcasted | ||
| // HostSharedPtrs within internal maps to track hosts. If we fail to broadcast the entire list | ||
| // of removals, these maps will leak those HostSharedPtrs. | ||
| // | ||
| // See https://github.com/envoyproxy/envoy/pull/3941 for more context. | ||
| bool scheduled = false; | ||
| const bool merging_enabled = cluster.info()->lbConfig().has_update_merge_window(); | ||
| // Remember: we only merge updates with no adds/removes — just hc/weight/metadata changes. | ||
| const bool is_mergeable = !hosts_added.size() && !hosts_removed.size(); | ||
|
|
||
| if (merging_enabled) { | ||
| // If this is not mergeable, we should cancel any scheduled updates since | ||
| // we'll deliver it immediately. | ||
| scheduled = scheduleUpdate(cluster, priority, is_mergeable); | ||
| } | ||
|
|
||
| // If an update was not scheduled for later, deliver it immediately. | ||
| if (!scheduled) { | ||
| cm_stats_.cluster_updated_.inc(); | ||
| postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed); | ||
| } | ||
| }); | ||
|
|
||
| // Finally, if the cluster has any hosts, post updates cross-thread so the per-thread load | ||
|
|
@@ -343,6 +371,79 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) { | |
| } | ||
| } | ||
|
|
||
| bool ClusterManagerImpl::scheduleUpdate(const Cluster& cluster, uint32_t priority, bool mergeable) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: if, for a given cluster/priority, we have something like "add, remove, inplace update, add, remove, inplace update", should we be canceling the outstanding deferred update at the second "add"?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, because the second "add" is a non-mergeable update so it will trigger immediate delivery, thus cancelling the pending merged update by disabling the timer. This happens here in And we exercise a similar variation of this at the end at the unit test (inplace/inplace/inplace/remove→ timer disabled and updates delivered immediately). |
||
| const auto& update_merge_window = cluster.info()->lbConfig().update_merge_window(); | ||
| const auto timeout = DurationUtil::durationToMilliseconds(update_merge_window); | ||
|
|
||
| // Find pending updates for this cluster. | ||
| auto& updates_by_prio = updates_map_[cluster.info()->name()]; | ||
| if (!updates_by_prio) { | ||
| updates_by_prio.reset(new PendingUpdatesByPriorityMap()); | ||
| } | ||
|
|
||
| // Find pending updates for this priority. | ||
| auto& updates = (*updates_by_prio)[priority]; | ||
| if (!updates) { | ||
| updates.reset(new PendingUpdates()); | ||
| } | ||
|
|
||
| // Has an update_merge_window gone by since the last update? If so, don't schedule | ||
| // the update so it can be applied immediately. Ditto if this is not a mergeable update. | ||
| const auto delta = std::chrono::steady_clock::now() - updates->last_updated_; | ||
| const uint64_t delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta).count(); | ||
| const bool offwindow = delta_ms > timeout; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Kind of prefer |
||
| if (offwindow || !mergeable) { | ||
| // If there was a pending update, we cancel the pending merged update. | ||
| // | ||
| // Note: it's possible that even though we are outside of a merge window (delta_ms > timeout), | ||
| // a timer is enabled. This race condition is fine, since we'll disable the timer here and | ||
| // deliver the update immediately. | ||
|
|
||
| // Why wasn't the update scheduled for later delivery? | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry what does this comment mean? Is this just tracking the race condition? Is it worth the extra code for this? I think this should be incredibly rare but I agree it can happen.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's for tracking two things: a) did we not merge this update because we are out of a merge window? i found this stat useful while testing this out, because it tells you how often merging is not needed I'd like to keep these, I think they are useful. Happy to clarify comments/names. Ideas?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK makes sense. I would probably go with the stat name @htuch suggested and put the entirety of what you jus wrote in a comment here? I think that would make it clear.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
| if (mergeable && offwindow) { | ||
| cm_stats_.update_merge_offwindow_.inc(); | ||
| } | ||
|
|
||
| if (updates->disableTimer()) { | ||
| cm_stats_.update_merge_cancelled_.inc(); | ||
| } | ||
|
|
||
| updates->last_updated_ = std::chrono::steady_clock::now(); | ||
| return false; | ||
| } | ||
|
|
||
| // If there's no timer, create one. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Won't this delay the first update after a long period of silence? Should we instead apply immediately in this case? I think we can then just keep track of the last time we applied and only add to a pending queue if < window length.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed in the latest commit |
||
| if (updates->timer_ == nullptr) { | ||
| updates->timer_ = dispatcher_.createTimer([this, &cluster, priority, &updates]() -> void { | ||
| applyUpdates(cluster, priority, *updates); | ||
| }); | ||
| } | ||
|
|
||
| // Ensure there's a timer set to deliver these updates. | ||
| if (!updates->timer_enabled_) { | ||
| updates->enableTimer(timeout); | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| void ClusterManagerImpl::applyUpdates(const Cluster& cluster, uint32_t priority, | ||
| PendingUpdates& updates) { | ||
| // Deliver pending updates. | ||
|
|
||
| // Remember that these merged updates are _only_ for updates related to | ||
| // HC/weight/metadata changes. That's why added/removed are empty. All | ||
| // adds/removals were already immediately broadcasted. | ||
| static const HostVector hosts_added; | ||
| static const HostVector hosts_removed; | ||
|
|
||
| postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed); | ||
|
|
||
| cm_stats_.cluster_updated_via_merge_.inc(); | ||
| updates.timer_enabled_ = false; | ||
| updates.last_updated_ = std::chrono::steady_clock::now(); | ||
| } | ||
|
|
||
| bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& cluster, | ||
| const std::string& version_info) { | ||
| // First we need to see if this new config is new or an update to an existing dynamic cluster. | ||
|
|
@@ -468,6 +569,9 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) { | |
| if (removed) { | ||
| cm_stats_.cluster_removed_.inc(); | ||
| updateGauges(); | ||
| // Did we ever deliver merged updates for this cluster? | ||
| // No need to manually disable timers, this should take care of it. | ||
| updates_map_.erase(cluster_name); | ||
| } | ||
|
|
||
| return removed; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -138,6 +138,10 @@ class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> { | |
| COUNTER(cluster_added) \ | ||
| COUNTER(cluster_modified) \ | ||
| COUNTER(cluster_removed) \ | ||
| COUNTER(cluster_updated) \ | ||
| COUNTER(cluster_updated_via_merge) \ | ||
| COUNTER(update_merge_cancelled) \ | ||
| COUNTER(update_merge_offwindow) \ | ||
| GAUGE (active_clusters) \ | ||
| GAUGE (warming_clusters) | ||
| // clang-format on | ||
|
|
@@ -209,6 +213,11 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u | |
|
|
||
| ClusterManagerFactory& clusterManagerFactory() override { return factory_; } | ||
|
|
||
| protected: | ||
| virtual void postThreadLocalClusterUpdate(const Cluster& cluster, uint32_t priority, | ||
| const HostVector& hosts_added, | ||
| const HostVector& hosts_removed); | ||
|
|
||
| private: | ||
| /** | ||
| * Thread local cached cluster data. Each thread local cluster gets updates from the parent | ||
|
|
@@ -361,14 +370,47 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u | |
| // This map is ordered so that config dumping is consistent. | ||
| typedef std::map<std::string, ClusterDataPtr> ClusterMap; | ||
|
|
||
| struct PendingUpdates { | ||
| void enableTimer(const uint64_t timeout) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add |
||
| ASSERT(!timer_enabled_); | ||
| if (timer_ != nullptr) { | ||
| timer_->enableTimer(std::chrono::milliseconds(timeout)); | ||
| timer_enabled_ = true; | ||
| } | ||
| } | ||
| bool disableTimer() { | ||
| const bool was_enabled = timer_enabled_; | ||
| if (timer_ != nullptr) { | ||
| timer_->disableTimer(); | ||
| timer_enabled_ = false; | ||
| } | ||
| return was_enabled; | ||
| } | ||
|
|
||
| Event::TimerPtr timer_; | ||
| // TODO(rgs1): this should be part of Event::Timer's interface. | ||
| bool timer_enabled_{}; | ||
| // This is default constructed to the clock's epoch: | ||
| // https://en.cppreference.com/w/cpp/chrono/time_point/time_point | ||
| // | ||
| // This will usually be the computer's boot time, which means that given a not very large | ||
| // `Cluster.CommonLbConfig.update_merge_window`, the first update will trigger immediately | ||
| // (the expected behavior). | ||
| MonotonicTime last_updated_; | ||
| }; | ||
| using PendingUpdatesPtr = std::unique_ptr<PendingUpdates>; | ||
| using PendingUpdatesByPriorityMap = std::unordered_map<uint32_t, PendingUpdatesPtr>; | ||
| using PendingUpdatesByPriorityMapPtr = std::unique_ptr<PendingUpdatesByPriorityMap>; | ||
| using ClusterUpdatesMap = std::unordered_map<std::string, PendingUpdatesByPriorityMapPtr>; | ||
|
|
||
| void applyUpdates(const Cluster& cluster, uint32_t priority, PendingUpdates& updates); | ||
| bool scheduleUpdate(const Cluster& cluster, uint32_t priority, bool mergeable); | ||
| void createOrUpdateThreadLocalCluster(ClusterData& cluster); | ||
| ProtobufTypes::MessagePtr dumpClusterConfigs(); | ||
| static ClusterManagerStats generateStats(Stats::Scope& scope); | ||
| void loadCluster(const envoy::api::v2::Cluster& cluster, const std::string& version_info, | ||
| bool added_via_api, ClusterMap& cluster_map); | ||
| void onClusterInit(Cluster& cluster); | ||
| void postThreadLocalClusterUpdate(const Cluster& cluster, uint32_t priority, | ||
| const HostVector& hosts_added, const HostVector& hosts_removed); | ||
| void postThreadLocalHealthFailure(const HostSharedPtr& host); | ||
| void updateGauges(); | ||
|
|
||
|
|
@@ -394,6 +436,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u | |
| Grpc::AsyncClientManagerPtr async_client_manager_; | ||
| Server::ConfigTracker::EntryOwnerPtr config_tracker_entry_; | ||
| SystemTimeSource& system_time_source_; | ||
| ClusterUpdatesMap updates_map_; | ||
| Event::Dispatcher& dispatcher_; | ||
| }; | ||
|
|
||
| } // namespace Upstream | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: s/healthcheck/health check/