-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Add option for merging cluster updates #3941
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
431a634
8e20204
c6dbc7d
9e3288e
add79b2
29f1d3d
0dd31c4
0bd58e6
67adb12
ec643f1
d05ef99
95e5260
0f70fb3
b392c7b
a399949
f88c297
a1145dd
b33b080
d70809c
10d3670
80e03db
dcd7174
b3745de
6fac84f
9eae09a
0d8475f
4d0c528
09d56cc
3d78234
6408528
31e4fdf
a5a5391
2a264d1
25faa2b
5affa4c
d3464d4
bbbfef8
d81fb7f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -180,7 +180,7 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots | |
| init_helper_([this](Cluster& cluster) { onClusterInit(cluster); }), | ||
| config_tracker_entry_( | ||
| admin.getConfigTracker().add("clusters", [this] { return dumpClusterConfigs(); })), | ||
| system_time_source_(system_time_source) { | ||
| system_time_source_(system_time_source), dispatcher_(main_thread_dispatcher) { | ||
| async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(*this, tls); | ||
| const auto& cm_config = bootstrap.cluster_manager(); | ||
| if (cm_config.has_outlier_detection()) { | ||
|
|
@@ -330,7 +330,13 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) { | |
| const HostVector& hosts_removed) { | ||
| // This fires when a cluster is about to have an updated member set. We need to send this | ||
| // out to all of the thread local configurations. | ||
| postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed); | ||
|
|
||
| // Should we coalesce updates? | ||
| if (cluster.info()->lbConfig().has_time_between_updates()) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it worth moving this logic into it's own function to keep this lambda easier to read?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thought about but then decided against since it isn't that big.. I am happy either way, your call. |
||
| scheduleUpdate(cluster, priority, hosts_added, hosts_removed); | ||
| } else { | ||
| postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed); | ||
| } | ||
| }); | ||
|
|
||
| // Finally, if the cluster has any hosts, post updates cross-thread so the per-thread load | ||
|
|
@@ -343,6 +349,61 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) { | |
| } | ||
| } | ||
|
|
||
| void ClusterManagerImpl::scheduleUpdate(const Cluster& cluster, uint32_t priority, | ||
| const HostVector& hosts_added, | ||
| const HostVector& hosts_removed) { | ||
| PendingUpdatesByPriorityMapPtr updates_by_prio; | ||
| PendingUpdatesPtr updates; | ||
|
|
||
| // Find pending updates for this cluster. | ||
| auto updates_by_prio_it = updates_map_.find(cluster.info()->name()); | ||
| if (updates_by_prio_it != updates_map_.end()) { | ||
| updates_by_prio = updates_by_prio_it->second; | ||
| } else { | ||
| updates_by_prio = std::make_shared<PendingUpdatesByPriorityMap>(); | ||
| updates_map_[cluster.info()->name()] = updates_by_prio; | ||
| } | ||
|
|
||
| // Find pending updates for this priority. | ||
| auto updates_it = updates_by_prio->find(priority); | ||
| if (updates_it != updates_by_prio->end()) { | ||
| updates = updates_it->second; | ||
| } else { | ||
| updates = std::make_shared<PendingUpdates>(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's unclear to me if we need
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @htuch i agree, address in the latest commit. LMK what you think. Thanks! |
||
| (*updates_by_prio)[priority] = updates; | ||
| } | ||
|
|
||
| // Record the updates that should be applied when the timer fires. | ||
| updates->added.insert(hosts_added.begin(), hosts_added.end()); | ||
| updates->removed.insert(hosts_removed.begin(), hosts_removed.end()); | ||
|
|
||
| // If there's no timer, create one. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Won't this delay the first update after a long period of silence? Should we instead apply immediately in this case? I think we can then just keep track of the last time we applied and only add to a pending queue if < window length.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed in the latest commit |
||
| if (updates->timer == nullptr) { | ||
| updates->timer = dispatcher_.createTimer([this, &cluster, priority, &updates]() -> void { | ||
| applyUpdates(cluster, priority, updates); | ||
| }); | ||
| const auto& time_between_updates = cluster.info()->lbConfig().time_between_updates(); | ||
| const auto timeout = DurationUtil::durationToMilliseconds(time_between_updates); | ||
| updates->timer->enableTimer(std::chrono::milliseconds(timeout)); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't we still enable the timer if it's already existing? |
||
| } | ||
| } | ||
|
|
||
| void ClusterManagerImpl::applyUpdates(const Cluster& cluster, uint32_t priority, | ||
| PendingUpdatesPtr updates) { | ||
| // Merge pending updates & deliver. | ||
| const HostVector& hosts_added{updates->added.begin(), updates->added.end()}; | ||
| const HostVector& hosts_removed{updates->removed.begin(), updates->removed.end()}; | ||
|
|
||
| postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed); | ||
|
|
||
| cm_stats_.coalesced_updates_.inc(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also count non-coalesced updates? Or can we just infer this from total? |
||
|
|
||
| // Reset everything. | ||
| updates->timer = nullptr; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, you nuke it here. I'd suggest keeping the timer alive and just doing disable/enable, it probably doesn't matter, but one less wasted allocation/free.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it would be nice to avoid the allocation/free pair, but then we'd have to check if the timer is enabled or not by looking at the last update timestamp... at which point things become a bit more involved. Preferences? On a related note, the logic to apply an update immediately if the last update happened within a merge window should probably also check if a timer exists (e.g.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess we can add a bool to track the timer's state |
||
| updates->added.clear(); | ||
| updates->removed.clear(); | ||
| } | ||
|
|
||
| bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& cluster, | ||
| const std::string& version_info) { | ||
| // First we need to see if this new config is new or an update to an existing dynamic cluster. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -138,6 +138,7 @@ class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> { | |
| COUNTER(cluster_added) \ | ||
| COUNTER(cluster_modified) \ | ||
| COUNTER(cluster_removed) \ | ||
| COUNTER(coalesced_updates) \ | ||
| GAUGE (active_clusters) \ | ||
| GAUGE (warming_clusters) | ||
| // clang-format on | ||
|
|
@@ -361,6 +362,20 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u | |
| // This map is ordered so that config dumping is consistent. | ||
| typedef std::map<std::string, ClusterDataPtr> ClusterMap; | ||
|
|
||
| struct PendingUpdates { | ||
| PendingUpdates() {} | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not needed? |
||
| Event::TimerPtr timer; | ||
| std::unordered_set<HostSharedPtr> added; | ||
| std::unordered_set<HostSharedPtr> removed; | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm, these should probably be
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe not that important and not worth the cost...
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's probably easier to debug I think if there is some determinism here. There is no major performance advantage of unordered here most likely.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to keep it ordered we'd have to keep two sets (added/removed) to check uniqueness (e.g.: that a host isn't added/removed twice) and we'd also have to keep two vectors (added/removed) to preserve the order..... or am I missing an ordered set data structure hidding somewhere? :-) |
||
| }; | ||
| typedef std::shared_ptr<PendingUpdates> PendingUpdatesPtr; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: prefer |
||
| typedef std::unordered_map<uint32_t, PendingUpdatesPtr> PendingUpdatesByPriorityMap; | ||
| typedef std::shared_ptr<PendingUpdatesByPriorityMap> PendingUpdatesByPriorityMapPtr; | ||
| typedef std::unordered_map<std::string, PendingUpdatesByPriorityMapPtr> ClusterUpdatesMap; | ||
|
|
||
| void applyUpdates(const Cluster& cluster, uint32_t priority, PendingUpdatesPtr updates); | ||
| void scheduleUpdate(const Cluster& cluster, uint32_t priority, const HostVector& hosts_added, | ||
| const HostVector& hosts_removed); | ||
| void createOrUpdateThreadLocalCluster(ClusterData& cluster); | ||
| ProtobufTypes::MessagePtr dumpClusterConfigs(); | ||
| static ClusterManagerStats generateStats(Stats::Scope& scope); | ||
|
|
@@ -394,6 +409,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u | |
| Grpc::AsyncClientManagerPtr async_client_manager_; | ||
| Server::ConfigTracker::EntryOwnerPtr config_tracker_entry_; | ||
| SystemTimeSource& system_time_source_; | ||
| ClusterUpdatesMap updates_map_; | ||
| Event::Dispatcher& dispatcher_; | ||
| }; | ||
|
|
||
| } // namespace Upstream | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1620,6 +1620,63 @@ TEST_F(ClusterManagerImplTest, OriginalDstInitialization) { | |
| factory_.tls_.shutdownThread(); | ||
| } | ||
|
|
||
| TEST_F(ClusterManagerImplTest, CoalescedUpdates) { | ||
| const std::string yaml = R"EOF( | ||
| static_resources: | ||
| clusters: | ||
| - name: cluster_1 | ||
| connect_timeout: 0.250s | ||
| type: STATIC | ||
| lb_policy: ROUND_ROBIN | ||
| hosts: | ||
| - socket_address: | ||
| address: "127.0.0.1" | ||
| port_value: 11001 | ||
| - socket_address: | ||
| address: "127.0.0.1" | ||
| port_value: 11002 | ||
| common_lb_config: | ||
| time_between_updates: 3s | ||
| )EOF"; | ||
|
|
||
| create(parseBootstrapFromV2Yaml(yaml)); | ||
| EXPECT_FALSE(cluster_manager_->get("cluster_1")->info()->addedViaApi()); | ||
|
|
||
| // Save the updates timer. | ||
| Event::MockTimer* timer = new NiceMock<Event::MockTimer>(&factory_.dispatcher_); | ||
|
|
||
| // Remove each host, sequentially. | ||
| const Cluster& cluster = cluster_manager_->clusters().begin()->second; | ||
|
|
||
| HostVectorSharedPtr hosts( | ||
| new HostVector(cluster.prioritySet().hostSetsPerPriority()[0]->hosts())); | ||
| HostsPerLocalitySharedPtr hosts_per_locality = std::make_shared<HostsPerLocalityImpl>(); | ||
| HostVector hosts_added{}; | ||
| HostVector hosts_removed_0{(*hosts)[0]}; | ||
| HostVector hosts_removed_1{(*hosts)[1]}; | ||
| cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( | ||
| hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_added, hosts_removed_0); | ||
| cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( | ||
| hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_added, hosts_removed_1); | ||
|
|
||
| // Ensure the coalesced updates were applied. | ||
| timer->callback_(); | ||
| EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.coalesced_updates").value()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it worth validating that the hosts were actually removed?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you mean from the intermediate sets? or that added/removed were > 0 (i think they can be 0)?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah, now i know what you meant
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm, might be a bit messy |
||
|
|
||
| // Prepare a new timer. | ||
| timer = new NiceMock<Event::MockTimer>(&factory_.dispatcher_); | ||
|
|
||
| // Add them back. | ||
| cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( | ||
| hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_removed_0, hosts_added); | ||
| cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( | ||
| hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_removed_1, hosts_added); | ||
|
|
||
| // Ensure the coalesced updates were applied again. | ||
| timer->callback_(); | ||
| EXPECT_EQ(2, factory_.stats_.counter("cluster_manager.coalesced_updates").value()); | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think a test that has add, remove, add for the same host would be interesting. |
||
|
|
||
| class ClusterManagerInitHelperTest : public testing::Test { | ||
| public: | ||
| MOCK_METHOD1(onClusterInit, void(Cluster& cluster)); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... "by default this is not configured and updates apply immediately.".