diff --git a/api/envoy/api/v2/cds.proto b/api/envoy/api/v2/cds.proto index f649dafe0fb63..ff936754aad04 100644 --- a/api/envoy/api/v2/cds.proto +++ b/api/envoy/api/v2/cds.proto @@ -432,6 +432,17 @@ message Cluster { ZoneAwareLbConfig zone_aware_lb_config = 2; LocalityWeightedLbConfig locality_weighted_lb_config = 3; } + // If set, all health check/weight/metadata updates that happen within this duration will be + // merged and delivered in one shot when the duration expires. The start of the duration is when + // the first update happens. This is useful for big clusters, with potentially noisy deploys + // that might trigger excessive CPU usage due to a constant stream of healthcheck state changes + // or metadata updates. By default, this is not configured and updates apply immediately. Also, + // the first set of updates to be seen apply immediately as well (e.g.: a new cluster). + // + // Note: merging does not apply to cluster membership changes (e.g.: adds/removes); this is + // because merging those updates isn't currently safe. See + // https://github.com/envoyproxy/envoy/pull/3941. + google.protobuf.Duration update_merge_window = 4; } // Common configuration for all load balancer implementations. diff --git a/docs/root/configuration/cluster_manager/cluster_stats.rst b/docs/root/configuration/cluster_manager/cluster_stats.rst index 83fe8a10b0340..bfeb164c795a5 100644 --- a/docs/root/configuration/cluster_manager/cluster_stats.rst +++ b/docs/root/configuration/cluster_manager/cluster_stats.rst @@ -19,6 +19,10 @@ statistics. Any ``:`` character in the stats name is replaced with ``_``. 
cluster_added, Counter, Total clusters added (either via static config or CDS) cluster_modified, Counter, Total clusters modified (via CDS) cluster_removed, Counter, Total clusters removed (via CDS) + cluster_updated, Counter, Total cluster updates + cluster_updated_via_merge, Counter, Total cluster updates applied as merged updates + update_merge_cancelled, Counter, Total merged updates that got cancelled and delivered early + update_out_of_merge_window, Counter, Total updates which arrived out of a merge window active_clusters, Gauge, Number of currently active (warmed) clusters warming_clusters, Gauge, Number of currently warming (not active) clusters diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 8d1c1e6fba4ac..a5fd30aa08224 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -5,10 +5,13 @@ Version history =============== * access log: added :ref:`response flag filter ` to filter based on the presence of Envoy response flags. +* access log: added RESPONSE_DURATION and RESPONSE_TX_DURATION. * admin: added :http:get:`/hystrix_event_stream` as an endpoint for monitoring envoy's statistics through `Hystrix dashboard `_. * grpc-json: added support for building HTTP response from `google.api.HttpBody `_. +* cluster: added :ref:`option ` to merge + health check/weight/metadata updates within the given duration. * config: v1 disabled by default. v1 support remains available until October via flipping --v2-config-only=false. * config: v1 disabled by default. v1 support remains available until October via setting :option:`--allow-deprecated-v1-api`. * health check: added support for :ref:`custom health check `. @@ -47,7 +50,6 @@ Version history `. * upstream: added configuration option to the subset load balancer to take locality weights into account when selecting a host from a subset. -* access log: added RESPONSE_DURATION and RESPONSE_TX_DURATION. 
1.7.0 =============== diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index dd1f042822968..0b53d5119bf7e 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -180,7 +180,7 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots init_helper_([this](Cluster& cluster) { onClusterInit(cluster); }), config_tracker_entry_( admin.getConfigTracker().add("clusters", [this] { return dumpClusterConfigs(); })), - system_time_source_(system_time_source) { + system_time_source_(system_time_source), dispatcher_(main_thread_dispatcher) { async_client_manager_ = std::make_unique(*this, tls); const auto& cm_config = bootstrap.cluster_manager(); if (cm_config.has_outlier_detection()) { @@ -330,7 +330,35 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) { const HostVector& hosts_removed) { // This fires when a cluster is about to have an updated member set. We need to send this // out to all of the thread local configurations. - postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed); + + // Should we save this update and merge it with other updates? + // + // Note that we can only _safely_ merge updates that have no added/removed hosts. That is, + // only those updates that signal a change in host healthcheck state, weight or metadata. + // + // We've discussed merging updates related to hosts being added/removed, but it's really + // tricky to merge those given that downstream consumers of these updates expect to see the + // full list of updates, not a condensed one. This is because they use the broadcasted + // HostSharedPtrs within internal maps to track hosts. If we fail to broadcast the entire list + // of removals, these maps will leak those HostSharedPtrs. + // + // See https://github.com/envoyproxy/envoy/pull/3941 for more context. 
+ bool scheduled = false; + const bool merging_enabled = cluster.info()->lbConfig().has_update_merge_window(); + // Remember: we only merge updates with no adds/removes — just hc/weight/metadata changes. + const bool is_mergeable = !hosts_added.size() && !hosts_removed.size(); + + if (merging_enabled) { + // If this is not mergeable, we should cancel any scheduled updates since + // we'll deliver it immediately. + scheduled = scheduleUpdate(cluster, priority, is_mergeable); + } + + // If an update was not scheduled for later, deliver it immediately. + if (!scheduled) { + cm_stats_.cluster_updated_.inc(); + postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed); + } }); // Finally, if the cluster has any hosts, post updates cross-thread so the per-thread load @@ -343,6 +371,83 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) { } } +bool ClusterManagerImpl::scheduleUpdate(const Cluster& cluster, uint32_t priority, bool mergeable) { + const auto& update_merge_window = cluster.info()->lbConfig().update_merge_window(); + const auto timeout = DurationUtil::durationToMilliseconds(update_merge_window); + + // Find pending updates for this cluster. + auto& updates_by_prio = updates_map_[cluster.info()->name()]; + if (!updates_by_prio) { + updates_by_prio.reset(new PendingUpdatesByPriorityMap()); + } + + // Find pending updates for this priority. + auto& updates = (*updates_by_prio)[priority]; + if (!updates) { + updates.reset(new PendingUpdates()); + } + + // Has an update_merge_window gone by since the last update? If so, don't schedule + // the update so it can be applied immediately. Ditto if this is not a mergeable update. + const auto delta = std::chrono::steady_clock::now() - updates->last_updated_; + const uint64_t delta_ms = std::chrono::duration_cast(delta).count(); + const bool out_of_merge_window = delta_ms > timeout; + if (out_of_merge_window || !mergeable) { + // If there was a pending update, we cancel the pending merged update. 
+ // + // Note: it's possible that even though we are outside of a merge window (delta_ms > timeout), + // a timer is enabled. This race condition is fine, since we'll disable the timer here and + // deliver the update immediately. + + // Why wasn't the update scheduled for later delivery? We keep some stats that are helpful + // to understand why merging did not happen. There's 2 things we are tracking here: + + // 1) Was this update out of a merge window? + if (mergeable && out_of_merge_window) { + cm_stats_.update_out_of_merge_window_.inc(); + } + + // 2) Were there previous updates that we are cancelling (and delivering immediately)? + if (updates->disableTimer()) { + cm_stats_.update_merge_cancelled_.inc(); + } + + updates->last_updated_ = std::chrono::steady_clock::now(); + return false; + } + + // If there's no timer, create one. + if (updates->timer_ == nullptr) { + updates->timer_ = dispatcher_.createTimer([this, &cluster, priority, &updates]() -> void { + applyUpdates(cluster, priority, *updates); + }); + } + + // Ensure there's a timer set to deliver these updates. + if (!updates->timer_enabled_) { + updates->enableTimer(timeout); + } + + return true; +} + +void ClusterManagerImpl::applyUpdates(const Cluster& cluster, uint32_t priority, + PendingUpdates& updates) { + // Deliver pending updates. + + // Remember that these merged updates are _only_ for updates related to + // HC/weight/metadata changes. That's why added/removed are empty. All + // adds/removals were already immediately broadcasted. 
+ static const HostVector hosts_added; + static const HostVector hosts_removed; + + postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed); + + cm_stats_.cluster_updated_via_merge_.inc(); + updates.timer_enabled_ = false; + updates.last_updated_ = std::chrono::steady_clock::now(); +} + bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& cluster, const std::string& version_info) { // First we need to see if this new config is new or an update to an existing dynamic cluster. @@ -468,6 +573,9 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) { if (removed) { cm_stats_.cluster_removed_.inc(); updateGauges(); + // Did we ever deliver merged updates for this cluster? + // No need to manually disable timers, this should take care of it. + updates_map_.erase(cluster_name); } return removed; diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index f65e27ccb2825..9acf6927c3f0a 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -138,6 +138,10 @@ class ClusterManagerInitHelper : Logger::Loggable { COUNTER(cluster_added) \ COUNTER(cluster_modified) \ COUNTER(cluster_removed) \ + COUNTER(cluster_updated) \ + COUNTER(cluster_updated_via_merge) \ + COUNTER(update_merge_cancelled) \ + COUNTER(update_out_of_merge_window) \ GAUGE (active_clusters) \ GAUGE (warming_clusters) // clang-format on @@ -209,6 +213,11 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable ClusterMap; + struct PendingUpdates { + void enableTimer(const uint64_t timeout) { + ASSERT(!timer_enabled_); + if (timer_ != nullptr) { + timer_->enableTimer(std::chrono::milliseconds(timeout)); + timer_enabled_ = true; + } + } + bool disableTimer() { + const bool was_enabled = timer_enabled_; + if (timer_ != nullptr) { + timer_->disableTimer(); + timer_enabled_ = false; + } + return was_enabled; + } + + Event::TimerPtr 
timer_; + // TODO(rgs1): this should be part of Event::Timer's interface. + bool timer_enabled_{}; + // This is default constructed to the clock's epoch: + // https://en.cppreference.com/w/cpp/chrono/time_point/time_point + // + // This will usually be the computer's boot time, which means that given a not very large + // `Cluster.CommonLbConfig.update_merge_window`, the first update will trigger immediately + // (the expected behavior). + MonotonicTime last_updated_; + }; + using PendingUpdatesPtr = std::unique_ptr; + using PendingUpdatesByPriorityMap = std::unordered_map; + using PendingUpdatesByPriorityMapPtr = std::unique_ptr; + using ClusterUpdatesMap = std::unordered_map; + + void applyUpdates(const Cluster& cluster, uint32_t priority, PendingUpdates& updates); + bool scheduleUpdate(const Cluster& cluster, uint32_t priority, bool mergeable); void createOrUpdateThreadLocalCluster(ClusterData& cluster); ProtobufTypes::MessagePtr dumpClusterConfigs(); static ClusterManagerStats generateStats(Stats::Scope& scope); void loadCluster(const envoy::api::v2::Cluster& cluster, const std::string& version_info, bool added_via_api, ClusterMap& cluster_map); void onClusterInit(Cluster& cluster); - void postThreadLocalClusterUpdate(const Cluster& cluster, uint32_t priority, - const HostVector& hosts_added, const HostVector& hosts_removed); void postThreadLocalHealthFailure(const HostSharedPtr& host); void updateGauges(); @@ -394,6 +436,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable factory_; std::unique_ptr cluster_manager_; AccessLog::MockAccessLogManager log_manager_; NiceMock admin_; NiceMock system_time_source_; NiceMock monotonic_time_source_; + MockLocalClusterUpdate local_cluster_update_; }; envoy::config::bootstrap::v2::Bootstrap parseBootstrapFromJson(const std::string& json_string) { @@ -171,12 +257,6 @@ envoy::config::bootstrap::v2::Bootstrap parseBootstrapFromJson(const std::string return bootstrap; } 
-envoy::config::bootstrap::v2::Bootstrap parseBootstrapFromV2Yaml(const std::string& yaml) { - envoy::config::bootstrap::v2::Bootstrap bootstrap; - MessageUtil::loadFromYaml(yaml, bootstrap); - return bootstrap; -} - TEST_F(ClusterManagerImplTest, MultipleProtocolClusterFail) { const std::string yaml = R"EOF( static_resources: @@ -1620,6 +1700,180 @@ TEST_F(ClusterManagerImplTest, OriginalDstInitialization) { factory_.tls_.shutdownThread(); } +// Tests that all the HC/weight/metadata changes are delivered in one go, as long as +// there are no host changes in between. +// Also tests that if hosts are added/removed between mergeable updates, delivery will +// happen and the scheduled update will be cancelled. +TEST_F(ClusterManagerImplTest, MergedUpdates) { + createWithLocalClusterUpdate(); + + // Ensure we see the right set of added/removed hosts on every call. + EXPECT_CALL(local_cluster_update_, post(_, _, _)) + .WillOnce(Invoke([](uint32_t priority, const HostVector& hosts_added, + const HostVector& hosts_removed) -> void { + // 1st removal. + EXPECT_EQ(0, priority); + EXPECT_EQ(0, hosts_added.size()); + EXPECT_EQ(1, hosts_removed.size()); + })) + .WillOnce(Invoke([](uint32_t priority, const HostVector& hosts_added, + const HostVector& hosts_removed) -> void { + // Triggered by the 2 HC updates, it's a merged update so no added/removed + // hosts. + EXPECT_EQ(0, priority); + EXPECT_EQ(0, hosts_added.size()); + EXPECT_EQ(0, hosts_removed.size()); + })) + .WillOnce(Invoke([](uint32_t priority, const HostVector& hosts_added, + const HostVector& hosts_removed) -> void { + // 1st removed host added back. + EXPECT_EQ(0, priority); + EXPECT_EQ(1, hosts_added.size()); + EXPECT_EQ(0, hosts_removed.size()); + })) + .WillOnce(Invoke([](uint32_t priority, const HostVector& hosts_added, + const HostVector& hosts_removed) -> void { + // 1st removed host removed again, plus the 3 HC/weight/metadata updates that were + // waiting for delivery. 
+ EXPECT_EQ(0, priority); + EXPECT_EQ(0, hosts_added.size()); + EXPECT_EQ(1, hosts_removed.size()); + })); + + Event::MockTimer* timer = new NiceMock(&factory_.dispatcher_); + const Cluster& cluster = cluster_manager_->clusters().begin()->second; + HostVectorSharedPtr hosts( + new HostVector(cluster.prioritySet().hostSetsPerPriority()[0]->hosts())); + HostsPerLocalitySharedPtr hosts_per_locality = std::make_shared(); + HostVector hosts_added; + HostVector hosts_removed; + + // The first update should be applied immediately, since it's not mergeable. + hosts_removed.push_back((*hosts)[0]); + cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( + hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_added, hosts_removed); + EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.cluster_updated").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.cluster_updated_via_merge").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value()); + + // These calls should be merged, since there are no added/removed hosts. + hosts_removed.clear(); + cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( + hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_added, hosts_removed); + cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( + hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_added, hosts_removed); + EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.cluster_updated").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.cluster_updated_via_merge").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value()); + + // Ensure the merged updates were applied. 
+ timer->callback_(); + EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.cluster_updated").value()); + EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.cluster_updated_via_merge").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value()); + + // Add the host back, the update should be immediately applied. + hosts_removed.clear(); + hosts_added.push_back((*hosts)[0]); + cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( + hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_added, hosts_removed); + EXPECT_EQ(2, factory_.stats_.counter("cluster_manager.cluster_updated").value()); + EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.cluster_updated_via_merge").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value()); + + // Now emit 3 updates that should be scheduled: metadata, HC, and weight. + hosts_added.clear(); + + (*hosts)[0]->metadata(buildMetadata("v1")); + cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( + hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_added, hosts_removed); + + (*hosts)[0]->healthFlagSet(Host::HealthFlag::FAILED_EDS_HEALTH); + cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( + hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_added, hosts_removed); + + (*hosts)[0]->weight(100); + cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( + hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_added, hosts_removed); + + // Updates not delivered yet. + EXPECT_EQ(2, factory_.stats_.counter("cluster_manager.cluster_updated").value()); + EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.cluster_updated_via_merge").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value()); + + // Remove the host again, should cancel the scheduled update and be delivered immediately. 
+ hosts_removed.push_back((*hosts)[0]); + cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( + hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_added, hosts_removed); + + EXPECT_EQ(3, factory_.stats_.counter("cluster_manager.cluster_updated").value()); + EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.cluster_updated_via_merge").value()); + EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value()); +} + +// Tests that mergeable updates outside of a window get applied immediately. +TEST_F(ClusterManagerImplTest, MergedUpdatesOutOfWindow) { + createWithLocalClusterUpdate(); + + // Ensure we see the right set of added/removed hosts on every call. + EXPECT_CALL(local_cluster_update_, post(_, _, _)) + .WillOnce(Invoke([](uint32_t priority, const HostVector& hosts_added, + const HostVector& hosts_removed) -> void { + // HC update, immediately delivered. + EXPECT_EQ(0, priority); + EXPECT_EQ(0, hosts_added.size()); + EXPECT_EQ(0, hosts_removed.size()); + })); + + const Cluster& cluster = cluster_manager_->clusters().begin()->second; + HostVectorSharedPtr hosts( + new HostVector(cluster.prioritySet().hostSetsPerPriority()[0]->hosts())); + HostsPerLocalitySharedPtr hosts_per_locality = std::make_shared(); + HostVector hosts_added; + HostVector hosts_removed; + + // The first update should be applied immediately, because even though it's mergeable + // it's outside a merge window. 
+ cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( + hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_added, hosts_removed); + EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.cluster_updated").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.cluster_updated_via_merge").value()); + EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.update_out_of_merge_window").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value()); +} + +// Tests that mergeable updates outside of a window get applied immediately when +// merging is disabled, and that the counters are correct. +TEST_F(ClusterManagerImplTest, MergedUpdatesOutOfWindowDisabled) { + createWithLocalClusterUpdate(false); + + // Ensure we see the right set of added/removed hosts on every call. + EXPECT_CALL(local_cluster_update_, post(_, _, _)) + .WillOnce(Invoke([](uint32_t priority, const HostVector& hosts_added, + const HostVector& hosts_removed) -> void { + // HC update, immediately delivered. + EXPECT_EQ(0, priority); + EXPECT_EQ(0, hosts_added.size()); + EXPECT_EQ(0, hosts_removed.size()); + })); + + const Cluster& cluster = cluster_manager_->clusters().begin()->second; + HostVectorSharedPtr hosts( + new HostVector(cluster.prioritySet().hostSetsPerPriority()[0]->hosts())); + HostsPerLocalitySharedPtr hosts_per_locality = std::make_shared(); + HostVector hosts_added; + HostVector hosts_removed; + + // The first update should be applied immediately, because even though it's mergeable + // and outside a merge window, merging is disabled. 
+ cluster.prioritySet().hostSetsPerPriority()[0]->updateHosts( + hosts, hosts, hosts_per_locality, hosts_per_locality, {}, hosts_added, hosts_removed); + EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.cluster_updated").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.cluster_updated_via_merge").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_out_of_merge_window").value()); + EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value()); +} + class ClusterManagerInitHelperTest : public testing::Test { public: MOCK_METHOD1(onClusterInit, void(Cluster& cluster));