Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
431a634
Add option for coalescing cluster updates
Jul 23, 2018
8e20204
Address review
Jul 24, 2018
c6dbc7d
Address more review comments
Jul 24, 2018
9e3288e
Fix docs
Jul 25, 2018
add79b2
Preserve order when merging updates
Jul 25, 2018
29f1d3d
Address @htuch's review
Jul 25, 2018
0dd31c4
Skip immediate apply if there's an active timer
Jul 25, 2018
0bd58e6
Fix a few more issues pointed out by @htuch
Jul 25, 2018
67adb12
Improve existing test
Jul 25, 2018
ec643f1
Move EXPECT_CALL() to the top for clarity
Jul 25, 2018
d05ef99
More tests
Jul 25, 2018
95e5260
Cleanup tests
Jul 26, 2018
0f70fb3
Improve naming
Jul 26, 2018
b392c7b
Erase cluster from updates_map_ from removeCluster()
Jul 26, 2018
a399949
Drop merging adds/removals
Jul 27, 2018
f88c297
Fix memory leak
Jul 28, 2018
a1145dd
Make 2 local vars const.
Jul 30, 2018
b33b080
More constness, remove newline & spelling.
Jul 30, 2018
d70809c
Simplify use of shared_ptrs/refs/weak ptrs
Jul 30, 2018
10d3670
More review comments
Jul 30, 2018
80e03db
version history: sort & reflow cols
Jul 30, 2018
dcd7174
Collapse the number of map lookups
Jul 30, 2018
b3745de
Unconditionally remove cluster from updates map
Jul 30, 2018
6fac84f
Reorder members of PendingUpdates
Jul 30, 2018
9eae09a
Add TODO for extending Event::Timer's interface
Jul 30, 2018
0d8475f
Make const empty vectors static
Jul 30, 2018
4d0c528
Assert we are not enabling an already enabled timer
Jul 30, 2018
09d56cc
More counters
Jul 30, 2018
3d78234
Explain a possible race condition
Jul 30, 2018
6408528
Simplify branching
Jul 30, 2018
31e4fdf
Increment counter if merging is enabled
Jul 30, 2018
a5a5391
Counter renames + more tests
Jul 30, 2018
2a264d1
Remove trailing newline
Jul 30, 2018
25faa2b
Fix format
Jul 30, 2018
5affa4c
Fix spelling
Jul 31, 2018
d3464d4
Rename offwindow → out_of_merge_window
Jul 31, 2018
bbbfef8
Add comment to explain why we keeping some stats
Jul 31, 2018
d81fb7f
Document new cluster stats
Jul 31, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions api/envoy/api/v2/cds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,17 @@ message Cluster {
ZoneAwareLbConfig zone_aware_lb_config = 2;
LocalityWeightedLbConfig locality_weighted_lb_config = 3;
}
// If set, all health check/weight/metadata updates that happen within this duration will be
// merged and delivered in one shot when the duration expires. The start of the duration is when
// the first update happens. This is useful for big clusters, with potentially noisy deploys
// that might trigger excessive CPU usage due to a constant stream of healthcheck state changes
// or metadata updates. By default, this is not configured and updates apply immediately. Also,
// the first set of updates to be seen apply immediately as well (e.g.: a new cluster).
//
// Note: merging does not apply to cluster membership changes (e.g.: adds/removes); this is
// because merging those updates isn't currently safe. See
// https://github.com/envoyproxy/envoy/pull/3941.
google.protobuf.Duration update_merge_window = 4;
}

// Common configuration for all load balancer implementations.
Expand Down
4 changes: 4 additions & 0 deletions docs/root/configuration/cluster_manager/cluster_stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ statistics. Any ``:`` character in the stats name is replaced with ``_``.
cluster_added, Counter, Total clusters added (either via static config or CDS)
cluster_modified, Counter, Total clusters modified (via CDS)
cluster_removed, Counter, Total clusters removed (via CDS)
cluster_updated, Counter, Total cluster updates
cluster_updated_via_merge, Counter, Total cluster updates applied as merged updates
update_merge_cancelled, Counter, Total merged updates that got cancelled and delivered early
update_out_of_merge_window, Counter, Total updates which arrived out of a merge window
active_clusters, Gauge, Number of currently active (warmed) clusters
warming_clusters, Gauge, Number of currently warming (not active) clusters

Expand Down
4 changes: 3 additions & 1 deletion docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ Version history
===============
* access log: added :ref:`response flag filter <envoy_api_msg_config.filter.accesslog.v2.ResponseFlagFilter>`
to filter based on the presence of Envoy response flags.
* access log: added RESPONSE_DURATION and RESPONSE_TX_DURATION.
* admin: added :http:get:`/hystrix_event_stream` as an endpoint for monitoring envoy's statistics
through `Hystrix dashboard <https://github.com/Netflix-Skunkworks/hystrix-dashboard/wiki>`_.
* grpc-json: added support for building HTTP response from
`google.api.HttpBody <https://github.com/googleapis/googleapis/blob/master/google/api/httpbody.proto>`_.
* cluster: added :ref:`option <envoy_api_field_Cluster.CommonLbConfig.update_merge_window>` to merge
health check/weight/metadata updates within the given duration.
* config: v1 disabled by default. v1 support remains available until October via flipping --v2-config-only=false.
* config: v1 disabled by default. v1 support remains available until October via setting :option:`--allow-deprecated-v1-api`.
* health check: added support for :ref:`custom health check <envoy_api_field_core.HealthCheck.custom_health_check>`.
Expand Down Expand Up @@ -47,7 +50,6 @@ Version history
<envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.tracing>`.
* upstream: added configuration option to the subset load balancer to take locality weights into account when
selecting a host from a subset.
* access log: added RESPONSE_DURATION and RESPONSE_TX_DURATION.

1.7.0
===============
Expand Down
112 changes: 110 additions & 2 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots
init_helper_([this](Cluster& cluster) { onClusterInit(cluster); }),
config_tracker_entry_(
admin.getConfigTracker().add("clusters", [this] { return dumpClusterConfigs(); })),
system_time_source_(system_time_source) {
system_time_source_(system_time_source), dispatcher_(main_thread_dispatcher) {
async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(*this, tls);
const auto& cm_config = bootstrap.cluster_manager();
if (cm_config.has_outlier_detection()) {
Expand Down Expand Up @@ -330,7 +330,35 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
const HostVector& hosts_removed) {
// This fires when a cluster is about to have an updated member set. We need to send this
// out to all of the thread local configurations.
postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed);

// Should we save this update and merge it with other updates?
//
// Note that we can only _safely_ merge updates that have no added/removed hosts. That is,
// only those updates that signal a change in host healthcheck state, weight or metadata.
//
// We've discussed merging updates related to hosts being added/removed, but it's really
// tricky to merge those given that downstream consumers of these updates expect to see the
// full list of updates, not a condensed one. This is because they use the broadcasted
// HostSharedPtrs within internal maps to track hosts. If we fail to broadcast the entire list
// of removals, these maps will leak those HostSharedPtrs.
//
// See https://github.com/envoyproxy/envoy/pull/3941 for more context.
bool scheduled = false;
const bool merging_enabled = cluster.info()->lbConfig().has_update_merge_window();
// Remember: we only merge updates with no adds/removes — just hc/weight/metadata changes.
const bool is_mergeable = !hosts_added.size() && !hosts_removed.size();

if (merging_enabled) {
// If this is not mergeable, we should cancel any scheduled updates since
// we'll deliver it immediately.
scheduled = scheduleUpdate(cluster, priority, is_mergeable);
}

// If an update was not scheduled for later, deliver it immediately.
if (!scheduled) {
cm_stats_.cluster_updated_.inc();
postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed);
}
});

// Finally, if the cluster has any hosts, post updates cross-thread so the per-thread load
Expand All @@ -343,6 +371,83 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
}
}

bool ClusterManagerImpl::scheduleUpdate(const Cluster& cluster, uint32_t priority, bool mergeable) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: if, for a given cluster/priority, we have something like "add, remove, inplace update, add, remove, inplace update", should we be canceling the outstanding deferred update at the second "add"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, because the second "add" is a non-mergeable update so it will trigger immediate delivery, thus cancelling the pending merged update by disabling the timer.

This happens here in scheduleUpdate():

  // Has an update_merge_window gone by since the last update? If so, don't schedule                                                                                              
  // the update so it can be applied immediately. Ditto if this is not a mergeable update.                                                                                        
  const auto delta = std::chrono::steady_clock::now() - updates->last_updated_;                                                                                                   
  const uint64_t delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta).count();                                                                                 
  if (delta_ms > timeout || !mergeable) {                                                                                                                                         
    // If there was a pending update, we count this update as merged update.                                                                                                      
    if (updates->disableTimer()) {                                                                                                                                                
      cm_stats_.merged_updates_.inc();                                                                                                                                            
    }                                                                                                                                                                             
                                                                                                                                                                                  
    updates->last_updated_ = std::chrono::steady_clock::now();                                                                                                                    
    return false;                                                                                                                                                                 
  } 

And we exercise a similar variation of this at the end at the unit test (inplace/inplace/inplace/remove→ timer disabled and updates delivered immediately).

const auto& update_merge_window = cluster.info()->lbConfig().update_merge_window();
const auto timeout = DurationUtil::durationToMilliseconds(update_merge_window);

// Find pending updates for this cluster.
auto& updates_by_prio = updates_map_[cluster.info()->name()];
if (!updates_by_prio) {
updates_by_prio.reset(new PendingUpdatesByPriorityMap());
}

// Find pending updates for this priority.
auto& updates = (*updates_by_prio)[priority];
if (!updates) {
updates.reset(new PendingUpdates());
}

// Has an update_merge_window gone by since the last update? If so, don't schedule
// the update so it can be applied immediately. Ditto if this is not a mergeable update.
const auto delta = std::chrono::steady_clock::now() - updates->last_updated_;
const uint64_t delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta).count();
const bool out_of_merge_window = delta_ms > timeout;
if (out_of_merge_window || !mergeable) {
// If there was a pending update, we cancel the pending merged update.
//
// Note: it's possible that even though we are outside of a merge window (delta_ms > timeout),
// a timer is enabled. This race condition is fine, since we'll disable the timer here and
// deliver the update immediately.

// Why wasn't the update scheduled for later delivery? We keep some stats that are helpful
// to understand why merging did not happen. There's 2 things we are tracking here:

// 1) Was this update out of a merge window?
if (mergeable && out_of_merge_window) {
cm_stats_.update_out_of_merge_window_.inc();
}

// 2) Were there previous updates that we are cancelling (and delivering immediately)?
if (updates->disableTimer()) {
cm_stats_.update_merge_cancelled_.inc();
}

updates->last_updated_ = std::chrono::steady_clock::now();
return false;
}

// If there's no timer, create one.
if (updates->timer_ == nullptr) {
updates->timer_ = dispatcher_.createTimer([this, &cluster, priority, &updates]() -> void {
applyUpdates(cluster, priority, *updates);
});
}

// Ensure there's a timer set to deliver these updates.
if (!updates->timer_enabled_) {
updates->enableTimer(timeout);
}

return true;
}

void ClusterManagerImpl::applyUpdates(const Cluster& cluster, uint32_t priority,
PendingUpdates& updates) {
// Deliver pending updates.

// Remember that these merged updates are _only_ for updates related to
// HC/weight/metadata changes. That's why added/removed are empty. All
// adds/removals were already immediately broadcasted.
static const HostVector hosts_added;
static const HostVector hosts_removed;

postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed);

cm_stats_.cluster_updated_via_merge_.inc();
updates.timer_enabled_ = false;
updates.last_updated_ = std::chrono::steady_clock::now();
}

bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& cluster,
const std::string& version_info) {
// First we need to see if this new config is new or an update to an existing dynamic cluster.
Expand Down Expand Up @@ -468,6 +573,9 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) {
if (removed) {
cm_stats_.cluster_removed_.inc();
updateGauges();
// Did we ever deliver merged updates for this cluster?
// No need to manually disable timers, this should take care of it.
updates_map_.erase(cluster_name);
}

return removed;
Expand Down
48 changes: 46 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> {
COUNTER(cluster_added) \
COUNTER(cluster_modified) \
COUNTER(cluster_removed) \
COUNTER(cluster_updated) \
COUNTER(cluster_updated_via_merge) \
COUNTER(update_merge_cancelled) \
COUNTER(update_out_of_merge_window) \
GAUGE (active_clusters) \
GAUGE (warming_clusters)
// clang-format on
Expand Down Expand Up @@ -209,6 +213,11 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u

ClusterManagerFactory& clusterManagerFactory() override { return factory_; }

protected:
virtual void postThreadLocalClusterUpdate(const Cluster& cluster, uint32_t priority,
const HostVector& hosts_added,
const HostVector& hosts_removed);

private:
/**
* Thread local cached cluster data. Each thread local cluster gets updates from the parent
Expand Down Expand Up @@ -361,14 +370,47 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
// This map is ordered so that config dumping is consistent.
typedef std::map<std::string, ClusterDataPtr> ClusterMap;

struct PendingUpdates {
void enableTimer(const uint64_t timeout) {
ASSERT(!timer_enabled_);
if (timer_ != nullptr) {
timer_->enableTimer(std::chrono::milliseconds(timeout));
timer_enabled_ = true;
}
}
bool disableTimer() {
const bool was_enabled = timer_enabled_;
if (timer_ != nullptr) {
timer_->disableTimer();
timer_enabled_ = false;
}
return was_enabled;
}

Event::TimerPtr timer_;
// TODO(rgs1): this should be part of Event::Timer's interface.
bool timer_enabled_{};
// This is default constructed to the clock's epoch:
// https://en.cppreference.com/w/cpp/chrono/time_point/time_point
//
// This will usually be the computer's boot time, which means that given a not very large
// `Cluster.CommonLbConfig.update_merge_window`, the first update will trigger immediately
// (the expected behavior).
MonotonicTime last_updated_;
};
using PendingUpdatesPtr = std::unique_ptr<PendingUpdates>;
using PendingUpdatesByPriorityMap = std::unordered_map<uint32_t, PendingUpdatesPtr>;
using PendingUpdatesByPriorityMapPtr = std::unique_ptr<PendingUpdatesByPriorityMap>;
using ClusterUpdatesMap = std::unordered_map<std::string, PendingUpdatesByPriorityMapPtr>;

void applyUpdates(const Cluster& cluster, uint32_t priority, PendingUpdates& updates);
bool scheduleUpdate(const Cluster& cluster, uint32_t priority, bool mergeable);
void createOrUpdateThreadLocalCluster(ClusterData& cluster);
ProtobufTypes::MessagePtr dumpClusterConfigs();
static ClusterManagerStats generateStats(Stats::Scope& scope);
void loadCluster(const envoy::api::v2::Cluster& cluster, const std::string& version_info,
bool added_via_api, ClusterMap& cluster_map);
void onClusterInit(Cluster& cluster);
void postThreadLocalClusterUpdate(const Cluster& cluster, uint32_t priority,
const HostVector& hosts_added, const HostVector& hosts_removed);
void postThreadLocalHealthFailure(const HostSharedPtr& host);
void updateGauges();

Expand All @@ -394,6 +436,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
Grpc::AsyncClientManagerPtr async_client_manager_;
Server::ConfigTracker::EntryOwnerPtr config_tracker_entry_;
SystemTimeSource& system_time_source_;
ClusterUpdatesMap updates_map_;
Event::Dispatcher& dispatcher_;
};

} // namespace Upstream
Expand Down
Loading