Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ bug_fixes:
the number of requests per I/O cycle is configured and an HTTP decoder filter that pauses filter chain is present. This behavior
can be reverted by setting the runtime guard ``envoy.reloadable_features.use_filter_manager_state_for_downstream_end_stream``
to false.
- area: upstream
change: |
Fixed a bug using hard coded drop category when reporting drop_overload stats to the load report service.
It is changed to use drop category that is set in
:ref:`category <envoy_v3_api_field_config.endpoint.v3.clusterloadassignment.policy.DropOverload.category>`.
- area: proxy_filter
change: |
Fixed a bug in the ``CONNECT`` implementation that would cause the ``CONNECT`` request created to be invalid when the
Expand Down
10 changes: 10 additions & 0 deletions envoy/upstream/thread_local_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,20 @@ class ThreadLocalCluster {
*/
virtual UnitFloat dropOverload() const PURE;

/**
* @return the thread local cluster drop_category configuration.
*/
virtual const std::string& dropCategory() const PURE;

/**
* Set up the drop_overload value for the thread local cluster.
*/
virtual void setDropOverload(UnitFloat drop_overload) PURE;

/**
* Set up the drop_category value for the thread local cluster.
*/
virtual void setDropCategory(absl::string_view drop_category) PURE;
};

using ThreadLocalClusterOptRef = absl::optional<std::reference_wrapper<ThreadLocalCluster>>;
Expand Down
10 changes: 10 additions & 0 deletions envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -1315,10 +1315,20 @@ class Cluster {
*/
virtual UnitFloat dropOverload() const PURE;

/**
* @return the cluster drop_category_ configuration.
*/
virtual const std::string& dropCategory() const PURE;

/**
* Set up the drop_overload value for the cluster.
*/
virtual void setDropOverload(UnitFloat drop_overload) PURE;

/**
* Set up the drop_category value for the thread local cluster.
*/
virtual void setDropCategory(absl::string_view drop_category) PURE;
};

using ClusterSharedPtr = std::shared_ptr<Cluster>;
Expand Down
25 changes: 15 additions & 10 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1236,15 +1236,18 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_
pending_cluster_creations_.erase(cm_cluster.cluster().info()->name());

const UnitFloat drop_overload = cm_cluster.cluster().dropOverload();
const std::string drop_category = cm_cluster.cluster().dropCategory();
// Populate the cluster initialization object based on this update.
ClusterInitializationObjectConstSharedPtr cluster_initialization_object =
addOrUpdateClusterInitializationObjectIfSupported(
params, cm_cluster.cluster().info(), load_balancer_factory, host_map, drop_overload);
addOrUpdateClusterInitializationObjectIfSupported(params, cm_cluster.cluster().info(),
load_balancer_factory, host_map,
drop_overload, drop_category);

tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params),
add_or_update_cluster, load_balancer_factory, map = std::move(host_map),
cluster_initialization_object = std::move(cluster_initialization_object),
drop_overload](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
drop_overload, drop_category = std::move(drop_category)](
OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
ASSERT(cluster_manager.has_value(),
"Expected the ThreadLocalClusterManager to be set during ClusterManagerImpl creation.");

Expand Down Expand Up @@ -1302,6 +1305,7 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_

if (cluster_manager->thread_local_clusters_[info->name()]) {
cluster_manager->thread_local_clusters_[info->name()]->setDropOverload(drop_overload);
cluster_manager->thread_local_clusters_[info->name()]->setDropCategory(drop_category);
}
for (const auto& per_priority : params.per_priority_update_params_) {
cluster_manager->updateClusterMembership(
Expand Down Expand Up @@ -1338,7 +1342,7 @@ ClusterManagerImpl::ClusterInitializationObjectConstSharedPtr
ClusterManagerImpl::addOrUpdateClusterInitializationObjectIfSupported(
const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload) {
UnitFloat drop_overload, absl::string_view drop_category) {
if (!deferralIsSupportedForCluster(cluster_info)) {
return nullptr;
}
Expand Down Expand Up @@ -1369,13 +1373,13 @@ ClusterManagerImpl::addOrUpdateClusterInitializationObjectIfSupported(
entry->second->per_priority_state_, params, std::move(cluster_info),
load_balancer_factory == nullptr ? entry->second->load_balancer_factory_
: load_balancer_factory,
map, drop_overload);
map, drop_overload, drop_category);
cluster_initialization_map_[cluster_name] = new_initialization_object;
return new_initialization_object;
} else {
// We need to create a fresh Cluster Initialization Object.
auto new_initialization_object = std::make_shared<ClusterInitializationObject>(
params, std::move(cluster_info), load_balancer_factory, map, drop_overload);
params, std::move(cluster_info), load_balancer_factory, map, drop_overload, drop_category);
cluster_initialization_map_[cluster_name] = new_initialization_object;
return new_initialization_object;
}
Expand Down Expand Up @@ -1409,6 +1413,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::initializeClusterInlineIfExis
initialization_object->cross_priority_host_map_);
}
thread_local_clusters_[cluster]->setDropOverload(initialization_object->drop_overload_);
thread_local_clusters_[cluster]->setDropCategory(initialization_object->drop_category_);

// Remove the CIO as we've initialized the cluster.
thread_local_deferred_clusters_.erase(entry);
Expand All @@ -1419,9 +1424,9 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::initializeClusterInlineIfExis
ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload)
UnitFloat drop_overload, absl::string_view drop_category)
: cluster_info_(std::move(cluster_info)), load_balancer_factory_(load_balancer_factory),
cross_priority_host_map_(map), drop_overload_(drop_overload) {
cross_priority_host_map_(map), drop_overload_(drop_overload), drop_category_(drop_category) {
// Copy the update since the map is empty.
for (const auto& update : params.per_priority_update_params_) {
per_priority_state_.emplace(update.priority_, update);
Expand All @@ -1432,10 +1437,10 @@ ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
const absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority>& per_priority_state,
const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload)
UnitFloat drop_overload, absl::string_view drop_category)
: per_priority_state_(per_priority_state), cluster_info_(std::move(cluster_info)),
load_balancer_factory_(load_balancer_factory), cross_priority_host_map_(map),
drop_overload_(drop_overload) {
drop_overload_(drop_overload), drop_category_(drop_category) {

// Because EDS Clusters receive the entire ClusterLoadAssignment but only
// provides the delta we must process the hosts_added and hosts_removed and
Expand Down
13 changes: 10 additions & 3 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,20 +427,22 @@ class ClusterManagerImpl : public ClusterManager,
ClusterInitializationObject(const ThreadLocalClusterUpdateParams& params,
ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory,
HostMapConstSharedPtr map, UnitFloat drop_overload);
HostMapConstSharedPtr map, UnitFloat drop_overload,
absl::string_view drop_category);

ClusterInitializationObject(
const absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority>&
per_priority_state,
const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload);
UnitFloat drop_overload, absl::string_view drop_category);

absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority> per_priority_state_;
const ClusterInfoConstSharedPtr cluster_info_;
const LoadBalancerFactorySharedPtr load_balancer_factory_;
const HostMapConstSharedPtr cross_priority_host_map_;
UnitFloat drop_overload_{0};
const std::string drop_category_;
};

using ClusterInitializationObjectConstSharedPtr =
Expand Down Expand Up @@ -610,7 +612,11 @@ class ClusterManagerImpl : public ClusterManager,
void drainConnPools(DrainConnectionsHostPredicate predicate,
ConnectionPool::DrainBehavior behavior);
UnitFloat dropOverload() const override { return drop_overload_; }
const std::string& dropCategory() const override { return drop_category_; }
void setDropOverload(UnitFloat drop_overload) override { drop_overload_ = drop_overload; }
void setDropCategory(absl::string_view drop_category) override {
drop_category_ = drop_category;
}

private:
Http::ConnectionPool::Instance*
Expand All @@ -627,6 +633,7 @@ class ClusterManagerImpl : public ClusterManager,
ThreadLocalClusterManagerImpl& parent_;
PrioritySetImpl priority_set_;
UnitFloat drop_overload_{0};
std::string drop_category_;

// Don't change the order of cluster_info_ and lb_factory_/lb_ as the the lb_factory_/lb_
// may keep a reference to the cluster_info_.
Expand Down Expand Up @@ -889,7 +896,7 @@ class ClusterManagerImpl : public ClusterManager,
ClusterInitializationObjectConstSharedPtr addOrUpdateClusterInitializationObjectIfSupported(
const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
UnitFloat drop_overload);
UnitFloat drop_overload, absl::string_view drop_category);

bool deferralIsSupportedForCluster(const ClusterInfoConstSharedPtr& info) const;

Expand Down
6 changes: 5 additions & 1 deletion source/common/upstream/health_discovery_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ class HdsCluster : public Cluster, Logger::Loggable<Logger::Id::upstream> {

std::vector<Upstream::HealthCheckerSharedPtr> healthCheckers() { return health_checkers_; };
std::vector<HostSharedPtr> hosts() { return *hosts_; };
UnitFloat dropOverload() const override { return UnitFloat(0); }
UnitFloat dropOverload() const override { return drop_overload_; }
const std::string& dropCategory() const override { return drop_category_; }
void setDropOverload(UnitFloat) override {}
void setDropCategory(absl::string_view) override {}

protected:
PrioritySetImpl priority_set_;
Expand All @@ -99,6 +101,8 @@ class HdsCluster : public Cluster, Logger::Loggable<Logger::Id::upstream> {
std::vector<Upstream::HealthCheckerSharedPtr> health_checkers_;
HealthCheckerMap health_checkers_map_;
TimeSource& time_source_;
UnitFloat drop_overload_{0};
const std::string drop_category_;

absl::Status updateHealthchecks(
const Protobuf::RepeatedPtrField<envoy::config::core::v3::HealthCheck>& health_checks);
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/load_stats_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ void LoadStatsReporter::sendLoadStatsRequest() {
cluster.info()->loadReportStats().upstream_rq_drop_overload_.latch();
if (drop_overload_count > 0) {
auto* dropped_request = cluster_stats->add_dropped_requests();
dropped_request->set_category("drop_overload");
dropped_request->set_category(cluster.dropCategory());
dropped_request->set_dropped_count(drop_overload_count);
}

Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1831,6 +1831,7 @@ absl::Status ClusterImplBase::parseDropOverloadConfig(

drop_ratio = std::min(drop_ratio, float(drop_ratio_runtime) / float(MAX_DROP_OVERLOAD_RUNTIME));
drop_overload_ = UnitFloat(drop_ratio);
drop_category_ = policy.drop_overloads(0).category();
return absl::OkStatus();
}

Expand Down
3 changes: 3 additions & 0 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,9 @@ class ClusterImplBase : public Cluster, protected Logger::Loggable<Logger::Id::u
const Outlier::Detector* outlierDetector() const override { return outlier_detector_.get(); }
void initialize(std::function<void()> callback) override;
UnitFloat dropOverload() const override { return drop_overload_; }
const std::string& dropCategory() const override { return drop_category_; }
void setDropOverload(UnitFloat drop_overload) override { drop_overload_ = drop_overload; }
void setDropCategory(absl::string_view drop_category) override { drop_category_ = drop_category; }

protected:
ClusterImplBase(const envoy::config::cluster::v3::Cluster& cluster,
Expand Down Expand Up @@ -1271,6 +1273,7 @@ class ClusterImplBase : public Cluster, protected Logger::Loggable<Logger::Id::u
Config::ConstMetadataSharedPoolSharedPtr const_metadata_shared_pool_;
Common::CallbackHandlePtr priority_update_cb_;
UnitFloat drop_overload_{0};
std::string drop_category_;
static constexpr int kDropOverloadSize = 1;
};

Expand Down
2 changes: 1 addition & 1 deletion source/server/admin/config_dump_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ ConfigDumpHandler::dumpEndpointConfigs(const Matchers::StringMatcher& name_match
float value = cluster.dropOverload().value() * 1000000;
if (value > 0) {
auto* drop_overload = policy.add_drop_overloads();
drop_overload->set_category("drop_overload");
drop_overload->set_category(cluster.dropCategory());
auto* percent = drop_overload->mutable_drop_percentage();
percent->set_denominator(envoy::type::v3::FractionalPercent::MILLION);
percent->set_numerator(uint32_t(value));
Expand Down
4 changes: 3 additions & 1 deletion test/common/upstream/upstream_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ TEST_P(StrictDnsParamTest, DropOverLoadConfigTestBasicMillion) {
false);
auto cluster = *StrictDnsClusterImpl::create(cluster_config, factory_context, dns_resolver);
EXPECT_EQ(0.000035f, cluster->dropOverload().value());
EXPECT_EQ("test", cluster->dropCategory());
}

TEST_P(StrictDnsParamTest, DropOverLoadConfigTestBasicTenThousand) {
Expand All @@ -290,7 +291,7 @@ TEST_P(StrictDnsParamTest, DropOverLoadConfigTestBasicTenThousand) {
load_assignment:
policy:
drop_overloads:
category: test
category: foo
drop_percentage:
numerator: 1000
denominator: TEN_THOUSAND
Expand All @@ -301,6 +302,7 @@ TEST_P(StrictDnsParamTest, DropOverLoadConfigTestBasicTenThousand) {
false);
auto cluster = *StrictDnsClusterImpl::create(cluster_config, factory_context, dns_resolver);
EXPECT_EQ(0.1f, cluster->dropOverload().value());
EXPECT_EQ("foo", cluster->dropCategory());
}

TEST_P(StrictDnsParamTest, DropOverLoadConfigTestBadDenominator) {
Expand Down
6 changes: 6 additions & 0 deletions test/mocks/upstream/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace Upstream {
using ::testing::_;
using ::testing::Invoke;
using ::testing::Return;
using ::testing::ReturnRef;

MockCluster::MockCluster() {
ON_CALL(*this, info()).WillByDefault(Return(info_));
ON_CALL(*this, initialize(_))
Expand All @@ -16,9 +18,13 @@ MockCluster::MockCluster() {
initialize_callback_ = callback;
}));
ON_CALL(*this, dropOverload()).WillByDefault(Return(drop_overload_));
ON_CALL(*this, dropCategory()).WillByDefault(ReturnRef(drop_category_));
ON_CALL(*this, setDropOverload(_)).WillByDefault(Invoke([this](UnitFloat drop_overload) -> void {
drop_overload_ = drop_overload;
}));
ON_CALL(*this, setDropCategory(_))
.WillByDefault(Invoke(
[this](absl::string_view drop_category) -> void { drop_category_ = drop_category; }));
}

MockCluster::~MockCluster() = default;
Expand Down
3 changes: 3 additions & 0 deletions test/mocks/upstream/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ class MockCluster : public Cluster {
MOCK_METHOD(PrioritySet&, prioritySet, ());
MOCK_METHOD(const PrioritySet&, prioritySet, (), (const));
MOCK_METHOD(UnitFloat, dropOverload, (), (const));
MOCK_METHOD(const std::string&, dropCategory, (), (const));
MOCK_METHOD(void, setDropOverload, (UnitFloat));
MOCK_METHOD(void, setDropCategory, (absl::string_view));

std::shared_ptr<MockClusterInfo> info_{new ::testing::NiceMock<MockClusterInfo>()};
std::function<void()> initialize_callback_;
Network::Address::InstanceConstSharedPtr source_address_;
UnitFloat drop_overload_{0};
std::string drop_category_{"drop_overload"};
};
} // namespace Upstream
} // namespace Envoy
5 changes: 5 additions & 0 deletions test/mocks/upstream/thread_local_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ MockThreadLocalCluster::MockThreadLocalCluster() {
.WillByDefault(Return(Upstream::TcpPoolData([]() {}, &tcp_conn_pool_)));
ON_CALL(*this, httpAsyncClient()).WillByDefault(ReturnRef(async_client_));
ON_CALL(*this, dropOverload()).WillByDefault(Return(cluster_.drop_overload_));
ON_CALL(*this, dropCategory()).WillByDefault(ReturnRef(cluster_.drop_category_));
ON_CALL(*this, setDropOverload(_)).WillByDefault(Invoke([this](UnitFloat drop_overload) -> void {
cluster_.drop_overload_ = drop_overload;
}));
ON_CALL(*this, setDropCategory(_))
.WillByDefault(Invoke([this](absl::string_view drop_category) -> void {
cluster_.drop_category_ = drop_category;
}));
}

MockThreadLocalCluster::~MockThreadLocalCluster() = default;
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/upstream/thread_local_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ class MockThreadLocalCluster : public ThreadLocalCluster {
MOCK_METHOD(Tcp::AsyncTcpClientPtr, tcpAsyncClient,
(LoadBalancerContext * context, Tcp::AsyncTcpClientOptionsConstSharedPtr options));
MOCK_METHOD(UnitFloat, dropOverload, (), (const));
MOCK_METHOD(const std::string&, dropCategory, (), (const));
MOCK_METHOD(void, setDropOverload, (UnitFloat));
MOCK_METHOD(void, setDropCategory, (absl::string_view));

NiceMock<MockClusterMockPrioritySet> cluster_;
NiceMock<MockLoadBalancer> lb_;
Expand Down
3 changes: 2 additions & 1 deletion test/server/admin/config_dump_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ TEST_P(AdminInstanceTest, ConfigDumpWithEndpoint) {
hostname_for_healthcheck, "tcp://1.2.3.5:90", 5, 6);
// Adding drop_overload config.
ON_CALL(cluster, dropOverload()).WillByDefault(Return(UnitFloat(0.00035)));

const std::string drop_overload = "drop_overload";
ON_CALL(cluster, dropCategory()).WillByDefault(ReturnRef(drop_overload));
Buffer::OwnedImpl response;
Http::TestResponseHeaderMapImpl header_map;
EXPECT_EQ(Http::Code::OK, getCallback("/config_dump?include_eds", header_map, response));
Expand Down