Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/envoy/config/endpoint/v3/endpoint.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ message ClusterLoadAssignment {
option (udpa.annotations.versioning).previous_message_type =
"envoy.api.v2.ClusterLoadAssignment.Policy";

// [#not-implemented-hide:]
message DropOverload {
option (udpa.annotations.versioning).previous_message_type =
"envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload";
Expand Down Expand Up @@ -75,7 +74,9 @@ message ClusterLoadAssignment {
// "throttle"_drop = 60%
// "lb"_drop = 20% // 50% of the remaining 'actual' load, which is 40%.
// actual_outgoing_load = 20% // remaining after applying all categories.
// [#not-implemented-hide:]
//
// Envoy supports only one element and will NACK if more than one element is present.
// Other xDS-capable data planes will not necessarily have this limitation.
repeated DropOverload drop_overloads = 2;

// Priority levels and localities are considered overprovisioned with this
Expand Down
4 changes: 4 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ new_features:
instead of using :ref:`choice_count
<envoy_v3_api_msg_extensions.load_balancing_policies.least_request.v3.LeastRequest>`
to select the hosts.
- area: upstream
change: |
Implmented API :ref:`drop_overloads<envoy_v3_api_field_config.endpoint.v3.ClusterLoadAssignment.Policy.drop_overloads>`
which can be used to drop certain percentage of traffic from Envoy.
- area: stats
change: |
added :ref:`per_endpoint_stats <envoy_v3_api_field_config.cluster.v3.TrackClusterStats.per_endpoint_stats>` to get some metrics
Expand Down
1 change: 1 addition & 0 deletions docs/root/configuration/observability/access_log/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ HTTP only
**UpstreamMaxStreamDurationReached**, **UMSDR**, The upstream request reached max stream duration.
**OverloadManagerTerminated**, **OM**, Overload Manager terminated the request.
**DnsResolutionFailed**, **DF**, The request was terminated due to DNS resolution failure.
**DropOverload**, **DO**, The request was terminated in addition to 503 response code due to :ref:`drop_overloads<envoy_v3_api_field_config.endpoint.v3.ClusterLoadAssignment.Policy.drop_overloads>`.

UDP
Not implemented ("-").
Expand Down
6 changes: 5 additions & 1 deletion envoy/stream_info/stream_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ enum ResponseFlag {
OverloadManager = 0x2000000,
// DNS resolution failed.
DnsResolutionFailed = 0x4000000,
// Drop certain percentage of overloaded traffic.
DropOverLoad = 0x8000000,
// ATTENTION: MAKE SURE THIS REMAINS EQUAL TO THE LAST FLAG.
LastFlag = DnsResolutionFailed,
LastFlag = DropOverLoad,
};

/**
Expand Down Expand Up @@ -156,6 +158,8 @@ struct ResponseCodeDetailValues {
const std::string ClusterNotFound = "cluster_not_found";
// The request was rejected by the router filter because the cluster was in maintenance mode.
const std::string MaintenanceMode = "maintenance_mode";
// The request was rejected by the router filter because the DROP_OVERLOAD configuration.
const std::string DropOverload = "drop_overload";
// The request was rejected by the router filter because there was no healthy upstream found.
const std::string NoHealthyUpstream = "no_healthy_upstream";
// The request was forwarded upstream but the response timed out.
Expand Down
10 changes: 10 additions & 0 deletions envoy/upstream/thread_local_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ class ThreadLocalCluster {
virtual Tcp::AsyncTcpClientPtr
tcpAsyncClient(LoadBalancerContext* context,
Tcp::AsyncTcpClientOptionsConstSharedPtr options) PURE;

/**
* @return the thread local cluster drop_overload configuration.
*/
virtual UnitFloat dropOverload() const PURE;

/**
* Set up the drop_overload value for the thread local cluster.
*/
virtual void setDropOverload(UnitFloat drop_overload) PURE;
};

using ThreadLocalClusterOptRef = absl::optional<std::reference_wrapper<ThreadLocalCluster>>;
Expand Down
19 changes: 16 additions & 3 deletions envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -771,11 +771,14 @@ class PrioritySet {

/**
* All cluster load report stats. These are only use for EDS load reporting and not sent to the
* stats sink. See envoy.api.v2.endpoint.ClusterStats for the definition of upstream_rq_dropped.
* These are latched by LoadStatsReporter, independent of the normal stats sink flushing.
* stats sink. See envoy.config.endpoint.v3.ClusterStats for the definition of
* total_dropped_requests and dropped_requests, which correspond to the upstream_rq_dropped and
* upstream_rq_drop_overload counter here. These are latched by LoadStatsReporter, independent of
* the normal stats sink flushing.
*/
#define ALL_CLUSTER_LOAD_REPORT_STATS(COUNTER, GAUGE, HISTOGRAM, TEXT_READOUT, STATNAME) \
COUNTER(upstream_rq_dropped)
COUNTER(upstream_rq_dropped) \
COUNTER(upstream_rq_drop_overload)

/**
* Cluster circuit breakers gauges. Note that we do not generate a stats
Expand Down Expand Up @@ -1339,6 +1342,16 @@ class Cluster {
* @return the const PrioritySet for the cluster.
*/
virtual const PrioritySet& prioritySet() const PURE;

/**
* @return the cluster drop_overload configuration.
*/
virtual UnitFloat dropOverload() const PURE;

/**
* Set up the drop_overload value for the cluster.
*/
virtual void setDropOverload(UnitFloat drop_overload) PURE;
};

using ClusterSharedPtr = std::shared_ptr<Cluster>;
Expand Down
5 changes: 5 additions & 0 deletions source/common/http/headers.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class HeaderValues {
const LowerCaseString EnvoyOriginalMethod{absl::StrCat(prefix(), "-original-method")};
const LowerCaseString EnvoyOriginalPath{absl::StrCat(prefix(), "-original-path")};
const LowerCaseString EnvoyOverloaded{absl::StrCat(prefix(), "-overloaded")};
const LowerCaseString EnvoyDropOverload{absl::StrCat(prefix(), "-drop-overload")};
const LowerCaseString EnvoyRateLimited{absl::StrCat(prefix(), "-ratelimited")};
const LowerCaseString EnvoyRetryOn{absl::StrCat(prefix(), "-retry-on")};
const LowerCaseString EnvoyRetryGrpcOn{absl::StrCat(prefix(), "-retry-grpc-on")};
Expand Down Expand Up @@ -279,6 +280,10 @@ class HeaderValues {
const std::string True{"true"};
} EnvoyOverloadedValues;

struct {
const std::string True{"true"};
} EnvoyDropOverloadValues;

struct {
const std::string True{"true"};
} EnvoyRateLimitedValues;
Expand Down
32 changes: 32 additions & 0 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,11 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
return Http::FilterHeadersStatus::StopIteration;
}

// Support DROP_OVERLOAD config from control plane to drop certain percentage of traffic.
if (checkDropOverload(*cluster, modify_headers)) {
return Http::FilterHeadersStatus::StopIteration;
}

// Fetch a connection pool for the upstream cluster.
const auto& upstream_http_protocol_options = cluster_->upstreamHttpProtocolOptions();

Expand Down Expand Up @@ -1995,6 +2000,33 @@ uint32_t Filter::numRequestsAwaitingHeaders() {
[](const auto& req) -> bool { return req->awaitingHeaders(); });
}

bool Filter::checkDropOverload(Upstream::ThreadLocalCluster& cluster,
std::function<void(Http::ResponseHeaderMap&)>& modify_headers) {
if (cluster.dropOverload().value()) {
ENVOY_STREAM_LOG(debug, "Router filter: cluster DROP_OVERLOAD configuration: {}", *callbacks_,
cluster.dropOverload().value());
if (config_.random_.bernoulli(cluster.dropOverload())) {
ENVOY_STREAM_LOG(debug, "The request is dropped by DROP_OVERLOAD", *callbacks_);
callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::DropOverLoad);
chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true);
callbacks_->sendLocalReply(
Http::Code::ServiceUnavailable, "drop overload",
[modify_headers, this](Http::ResponseHeaderMap& headers) {
if (!config_.suppress_envoy_headers_) {
headers.addReference(Http::Headers::get().EnvoyDropOverload,
Http::Headers::get().EnvoyDropOverloadValues.True);
}
modify_headers(headers);
},
absl::nullopt, StreamInfo::ResponseCodeDetails::get().DropOverload);

cluster.info()->loadReportStats().upstream_rq_drop_overload_.inc();
return true;
}
}
return false;
}

RetryStatePtr
ProdFilter::createRetryState(const RetryPolicy& policy, Http::RequestHeaderMap& request_headers,
const Upstream::ClusterInfo& cluster, const VirtualCluster* vcluster,
Expand Down
2 changes: 2 additions & 0 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ class Filter : Logger::Loggable<Logger::Id::router>,
UpstreamRequest& upstream_request, bool end_stream,
uint64_t grpc_to_http_status);
Http::Context& httpContext() { return config_.http_context_; }
bool checkDropOverload(Upstream::ThreadLocalCluster& cluster,
std::function<void(Http::ResponseHeaderMap&)>& modify_headers);

RetryStatePtr retry_state_;
FilterConfig& config_;
Expand Down
2 changes: 1 addition & 1 deletion source/common/stream_info/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const std::string ResponseFlagUtils::toString(const StreamInfo& stream_info, boo
}

absl::flat_hash_map<std::string, ResponseFlag> ResponseFlagUtils::getFlagMap() {
static_assert(ResponseFlag::LastFlag == 0x4000000,
static_assert(ResponseFlag::LastFlag == 0x8000000,
"A flag has been added. Add the new flag to ALL_RESPONSE_STRINGS_FLAGS.");
absl::flat_hash_map<std::string, ResponseFlag> res;
for (auto [flag_strings, flag] : ResponseFlagUtils::ALL_RESPONSE_STRINGS_FLAGS) {
Expand Down
31 changes: 20 additions & 11 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1211,15 +1211,16 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_

pending_cluster_creations_.erase(cm_cluster.cluster().info()->name());

const UnitFloat drop_overload = cm_cluster.cluster().dropOverload();
// Populate the cluster initialization object based on this update.
ClusterInitializationObjectConstSharedPtr cluster_initialization_object =
addOrUpdateClusterInitializationObjectIfSupported(params, cm_cluster.cluster().info(),
load_balancer_factory, host_map);
addOrUpdateClusterInitializationObjectIfSupported(
params, cm_cluster.cluster().info(), load_balancer_factory, host_map, drop_overload);

tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params),
add_or_update_cluster, load_balancer_factory, map = std::move(host_map),
cluster_initialization_object = std::move(cluster_initialization_object)](
OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
cluster_initialization_object = std::move(cluster_initialization_object),
drop_overload](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
ASSERT(cluster_manager.has_value(),
"Expected the ThreadLocalClusterManager to be set during ClusterManagerImpl creation.");

Expand Down Expand Up @@ -1275,6 +1276,9 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_
cluster_manager->thread_local_clusters_.size());
}

if (cluster_manager->thread_local_clusters_[info->name()]) {
cluster_manager->thread_local_clusters_[info->name()]->setDropOverload(drop_overload);
}
for (const auto& per_priority : params.per_priority_update_params_) {
cluster_manager->updateClusterMembership(
info->name(), per_priority.priority_, per_priority.update_hosts_params_,
Expand Down Expand Up @@ -1309,7 +1313,8 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_
ClusterManagerImpl::ClusterInitializationObjectConstSharedPtr
ClusterManagerImpl::addOrUpdateClusterInitializationObjectIfSupported(
const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map) {
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload) {
if (!deferralIsSupportedForCluster(cluster_info)) {
return nullptr;
}
Expand Down Expand Up @@ -1340,13 +1345,13 @@ ClusterManagerImpl::addOrUpdateClusterInitializationObjectIfSupported(
entry->second->per_priority_state_, params, std::move(cluster_info),
load_balancer_factory == nullptr ? entry->second->load_balancer_factory_
: load_balancer_factory,
map);
map, drop_overload);
cluster_initialization_map_[cluster_name] = new_initialization_object;
return new_initialization_object;
} else {
// We need to create a fresh Cluster Initialization Object.
auto new_initialization_object = std::make_shared<ClusterInitializationObject>(
params, std::move(cluster_info), load_balancer_factory, map);
params, std::move(cluster_info), load_balancer_factory, map, drop_overload);
cluster_initialization_map_[cluster_name] = new_initialization_object;
return new_initialization_object;
}
Expand Down Expand Up @@ -1379,6 +1384,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::initializeClusterInlineIfExis
per_priority.overprovisioning_factor_,
initialization_object->cross_priority_host_map_);
}
thread_local_clusters_[cluster]->setDropOverload(initialization_object->drop_overload_);

// Remove the CIO as we've initialized the cluster.
thread_local_deferred_clusters_.erase(entry);
Expand All @@ -1388,9 +1394,10 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::initializeClusterInlineIfExis

ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map)
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload)
: cluster_info_(std::move(cluster_info)), load_balancer_factory_(load_balancer_factory),
cross_priority_host_map_(map) {
cross_priority_host_map_(map), drop_overload_(drop_overload) {
// Copy the update since the map is empty.
for (const auto& update : params.per_priority_update_params_) {
per_priority_state_.emplace(update.priority_, update);
Expand All @@ -1400,9 +1407,11 @@ ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
const absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority>& per_priority_state,
const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map)
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload)
: per_priority_state_(per_priority_state), cluster_info_(std::move(cluster_info)),
load_balancer_factory_(load_balancer_factory), cross_priority_host_map_(map) {
load_balancer_factory_(load_balancer_factory), cross_priority_host_map_(map),
drop_overload_(drop_overload) {

// Because EDS Clusters receive the entire ClusterLoadAssignment but only
// provides the delta we must process the hosts_added and hosts_removed and
Expand Down
12 changes: 9 additions & 3 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,18 +434,20 @@ class ClusterManagerImpl : public ClusterManager,
ClusterInitializationObject(const ThreadLocalClusterUpdateParams& params,
ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory,
HostMapConstSharedPtr map);
HostMapConstSharedPtr map, UnitFloat drop_overload);

ClusterInitializationObject(
const absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority>&
per_priority_state,
const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map);
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload);

absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority> per_priority_state_;
const ClusterInfoConstSharedPtr cluster_info_;
const LoadBalancerFactorySharedPtr load_balancer_factory_;
const HostMapConstSharedPtr cross_priority_host_map_;
UnitFloat drop_overload_{0};
};

using ClusterInitializationObjectConstSharedPtr =
Expand Down Expand Up @@ -614,6 +616,8 @@ class ClusterManagerImpl : public ClusterManager,
// Drain any connection pools associated with the hosts filtered by the predicate.
void drainConnPools(DrainConnectionsHostPredicate predicate,
ConnectionPool::DrainBehavior behavior);
UnitFloat dropOverload() const override { return drop_overload_; }
void setDropOverload(UnitFloat drop_overload) override { drop_overload_ = drop_overload; }

private:
Http::ConnectionPool::Instance*
Expand All @@ -629,6 +633,7 @@ class ClusterManagerImpl : public ClusterManager,

ThreadLocalClusterManagerImpl& parent_;
PrioritySetImpl priority_set_;
UnitFloat drop_overload_{0};

// Don't change the order of cluster_info_ and lb_factory_/lb_ as the the lb_factory_/lb_
// may keep a reference to the cluster_info_.
Expand Down Expand Up @@ -875,7 +880,8 @@ class ClusterManagerImpl : public ClusterManager,
*/
ClusterInitializationObjectConstSharedPtr addOrUpdateClusterInitializationObjectIfSupported(
const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map);
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload);

bool deferralIsSupportedForCluster(const ClusterInfoConstSharedPtr& info) const;

Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/health_discovery_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class HdsCluster : public Cluster, Logger::Loggable<Logger::Id::upstream> {

std::vector<Upstream::HealthCheckerSharedPtr> healthCheckers() { return health_checkers_; };
std::vector<HostSharedPtr> hosts() { return *hosts_; };
UnitFloat dropOverload() const override { return UnitFloat(0); }
void setDropOverload(UnitFloat) override {}

protected:
PrioritySetImpl priority_set_;
Expand Down
Loading