Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1471,8 +1471,16 @@ void PriorityStateManager::updateClusterPrioritySet(
LocalityWeightsSharedPtr locality_weights;
std::vector<HostVector> per_locality;

// If we are configured for locality weighted LB we populate the locality weights.
const bool locality_weighted_lb = parent_.info()->lbConfig().has_locality_weighted_lb_config();
// If we are configured for locality weighted LB we populate the locality weights. We also
// populate locality weights if the cluster uses load balancing extensions, since the extension
// may want to make use of locality weights and we cannot tell by inspecting the config whether
// this is the case.
//
// TODO: have the load balancing extension indicate, programmatically, whether it needs locality
// weights, as an optimization in cases where it doesn't.
const bool locality_weighted_lb =
parent_.info()->lbConfig().has_locality_weighted_lb_config() ||
parent_.info()->lbType() == LoadBalancerType::LoadBalancingPolicyConfig;
if (locality_weighted_lb) {
locality_weights = std::make_shared<LocalityWeights>();
}
Expand Down
2 changes: 2 additions & 0 deletions test/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ envoy_cc_test(
"//source/common/upstream:load_balancer_factory_base_lib",
"//source/extensions/transport_sockets/tls:config",
"//test/config:v2_link_hacks",
"//test/integration/load_balancers:custom_lb_policy",
"//test/mocks/matcher:matcher_mocks",
"//test/mocks/upstream:cds_api_mocks",
"//test/mocks/upstream:cluster_priority_set_mocks",
Expand Down Expand Up @@ -110,6 +111,7 @@ envoy_cc_test(
"//source/extensions/transport_sockets/tls:config",
"//source/server:transport_socket_config_lib",
"//test/common/stats:stat_test_utility_lib",
"//test/integration/load_balancers:custom_lb_policy",
"//test/mocks/local_info:local_info_mocks",
"//test/mocks/protobuf:protobuf_mocks",
"//test/mocks/runtime:runtime_mocks",
Expand Down
52 changes: 4 additions & 48 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -757,53 +757,10 @@ TEST_F(ClusterManagerImplTest, ClusterProvidedLbNotConfigured) {
"'cluster_0' provided one. Check cluster documentation.");
}

class CustomLbFactory : public TypedLoadBalancerFactoryBase {
public:
CustomLbFactory() : TypedLoadBalancerFactoryBase("envoy.load_balancers.custom_lb") {}

ThreadAwareLoadBalancerPtr
create(const PrioritySet&, ClusterStats&, Stats::Scope&, Runtime::Loader&,
Random::RandomGenerator&,
const ::envoy::config::cluster::v3::LoadBalancingPolicy_Policy&) override {
return std::make_unique<ThreadAwareLbImpl>();
}

private:
class LbImpl : public LoadBalancer {
public:
LbImpl() = default;

Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext*) override {
return nullptr;
}
Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
return nullptr;
}
};

class LbFactory : public LoadBalancerFactory {
public:
LbFactory() = default;

Upstream::LoadBalancerPtr create() override { return std::make_unique<LbImpl>(); }
};

class ThreadAwareLbImpl : public Upstream::ThreadAwareLoadBalancer {
public:
ThreadAwareLbImpl() = default;

Upstream::LoadBalancerFactorySharedPtr factory() override {
return std::make_shared<LbFactory>();
}
void initialize() override {}
};
};

// Verify that specifying LOAD_BALANCING_POLICY_CONFIG with CommonLbConfig is an error.
TEST_F(ClusterManagerImplTest, LbPolicyConfigCannotSpecifyCommonLbConfig) {
CustomLbFactory factory;
Registry::InjectFactory<TypedLoadBalancerFactory> registration(factory);

// envoy.load_balancers.custom_lb is registered by linking in
// //test/integration/load_balancers:custom_lb_policy.
const std::string yaml = fmt::format(R"EOF(
static_resources:
clusters:
Expand Down Expand Up @@ -871,9 +828,8 @@ TEST_F(ClusterManagerImplTest, LbPolicyConfigMustSpecifyLbPolicy) {
// Verify that multiple load balancing policies can be specified, and Envoy selects the first
// policy that it has a factory for.
TEST_F(ClusterManagerImplTest, LbPolicyConfig) {
CustomLbFactory factory;
Registry::InjectFactory<TypedLoadBalancerFactory> registration(factory);

// envoy.load_balancers.custom_lb is registered by linking in
// //test/integration/load_balancers:custom_lb_policy.
const std::string yaml = fmt::format(R"EOF(
static_resources:
clusters:
Expand Down
159 changes: 95 additions & 64 deletions test/common/upstream/eds_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1475,7 +1475,7 @@ TEST_F(EdsTest, EndpointLocality) {
}

// Validate that onConfigUpdate() does not propagate locality weights to the host set when
// locality weighted balancing isn't configured.
// locality weighted balancing isn't configured and the cluster does not use LB policy extensions.
TEST_F(EdsTest, EndpointLocalityWeightsIgnored) {
envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment;
cluster_load_assignment.set_cluster_name("fare");
Expand Down Expand Up @@ -1503,12 +1503,77 @@ TEST_F(EdsTest, EndpointLocalityWeightsIgnored) {
EXPECT_EQ(nullptr, cluster_->prioritySet().hostSetsPerPriority()[0]->localityWeights());
}

class EdsLocalityWeightsTest : public EdsTest {
public:
void expectLocalityWeightsPresentForClusterConfig(const std::string& config) {
envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment;
cluster_load_assignment.set_cluster_name("fare");
resetCluster(config, Cluster::InitializePhase::Secondary);

{
auto* endpoints = cluster_load_assignment.add_endpoints();
auto* locality = endpoints->mutable_locality();
locality->set_region("oceania");
locality->set_zone("hello");
locality->set_sub_zone("world");
endpoints->mutable_load_balancing_weight()->set_value(42);

auto* endpoint_address = endpoints->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
endpoint_address->set_address("1.2.3.4");
endpoint_address->set_port_value(80);
}

{
auto* endpoints = cluster_load_assignment.add_endpoints();
auto* locality = endpoints->mutable_locality();
locality->set_region("space");
locality->set_zone("station");
locality->set_sub_zone("international");

auto* endpoint_address = endpoints->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
endpoint_address->set_address("1.2.3.5");
endpoint_address->set_port_value(80);
}

{
auto* endpoints = cluster_load_assignment.add_endpoints();
auto* locality = endpoints->mutable_locality();
locality->set_region("sugar");
locality->set_zone("candy");
locality->set_sub_zone("mountain");
endpoints->mutable_load_balancing_weight()->set_value(37);

auto* endpoint_address = endpoints->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
endpoint_address->set_address("1.2.3.6");
endpoint_address->set_port_value(80);
}

initialize();
doOnConfigUpdateVerifyNoThrow(cluster_load_assignment);
EXPECT_TRUE(initialized_);

const auto& locality_weights =
*cluster_->prioritySet().hostSetsPerPriority()[0]->localityWeights();
EXPECT_EQ(3, locality_weights.size());
EXPECT_EQ(42, locality_weights[0]);
EXPECT_EQ(0, locality_weights[1]);
EXPECT_EQ(37, locality_weights[2]);
}
};

// Validate that onConfigUpdate() propagates locality weights to the host set when locality
// weighted balancing is configured.
TEST_F(EdsTest, EndpointLocalityWeights) {
envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment;
cluster_load_assignment.set_cluster_name("fare");
resetCluster(R"EOF(
TEST_F(EdsLocalityWeightsTest, WeightsPresentWithLocalityWeightedConfig) {
expectLocalityWeightsPresentForClusterConfig(R"EOF(
name: name
connect_timeout: 0.25s
type: EDS
Expand All @@ -1523,66 +1588,32 @@ TEST_F(EdsTest, EndpointLocalityWeights) {
cluster_names:
- eds
refresh_delay: 1s
)EOF",
Cluster::InitializePhase::Secondary);

{
auto* endpoints = cluster_load_assignment.add_endpoints();
auto* locality = endpoints->mutable_locality();
locality->set_region("oceania");
locality->set_zone("hello");
locality->set_sub_zone("world");
endpoints->mutable_load_balancing_weight()->set_value(42);

auto* endpoint_address = endpoints->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
endpoint_address->set_address("1.2.3.4");
endpoint_address->set_port_value(80);
}

{
auto* endpoints = cluster_load_assignment.add_endpoints();
auto* locality = endpoints->mutable_locality();
locality->set_region("space");
locality->set_zone("station");
locality->set_sub_zone("international");

auto* endpoint_address = endpoints->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
endpoint_address->set_address("1.2.3.5");
endpoint_address->set_port_value(80);
}

{
auto* endpoints = cluster_load_assignment.add_endpoints();
auto* locality = endpoints->mutable_locality();
locality->set_region("sugar");
locality->set_zone("candy");
locality->set_sub_zone("mountain");
endpoints->mutable_load_balancing_weight()->set_value(37);

auto* endpoint_address = endpoints->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
endpoint_address->set_address("1.2.3.6");
endpoint_address->set_port_value(80);
}

initialize();
doOnConfigUpdateVerifyNoThrow(cluster_load_assignment);
EXPECT_TRUE(initialized_);
)EOF");
}

const auto& locality_weights =
*cluster_->prioritySet().hostSetsPerPriority()[0]->localityWeights();
EXPECT_EQ(3, locality_weights.size());
EXPECT_EQ(42, locality_weights[0]);
EXPECT_EQ(0, locality_weights[1]);
EXPECT_EQ(37, locality_weights[2]);
// Validate that onConfigUpdate() propagates locality weights to the host set when the cluster uses
// load balancing policy extensions.
TEST_F(EdsLocalityWeightsTest, WeightsPresentWithLoadBalancingPolicyConfig) {
// envoy.load_balancers.custom_lb is registered by linking in
// //test/integration/load_balancers:custom_lb_policy.
expectLocalityWeightsPresentForClusterConfig(R"EOF(
name: name
connect_timeout: 0.25s
type: EDS
lb_policy: LOAD_BALANCING_POLICY_CONFIG
load_balancing_policy:
policies:
- typed_extension_config:
name: envoy.load_balancers.custom_lb
eds_cluster_config:
service_name: fare
eds_config:
api_config_source:
api_type: REST
cluster_names:
- eds
refresh_delay: 1s
)EOF");
}

// Validate that onConfigUpdate() removes any locality not referenced in the
Expand Down
1 change: 1 addition & 0 deletions test/integration/clusters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ envoy_cc_test_library(
"//source/extensions/transport_sockets/raw_buffer:config",
"//source/server:transport_socket_config_lib",
"//test/common/upstream:utility_lib",
"//test/integration/load_balancers:custom_lb_policy",
"//test/test_common:registry_lib",
"//test/test_common:utility_lib",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
Expand Down
2 changes: 2 additions & 0 deletions test/integration/clusters/custom_static_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "envoy/config/core/v3/health_check.pb.h"
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"

#include "test/integration/load_balancers/custom_lb_policy.h"

namespace Envoy {

// ClusterImplBase
Expand Down
32 changes: 0 additions & 32 deletions test/integration/clusters/custom_static_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,38 +35,6 @@ class CustomStaticCluster : public Upstream::ClusterImplBase {
InitializePhase initializePhase() const override { return InitializePhase::Primary; }

private:
struct LbImpl : public Upstream::LoadBalancer {
LbImpl(const Upstream::HostSharedPtr& host) : host_(host) {}

Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext*) override {
return host_;
}
Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
return nullptr;
}

const Upstream::HostSharedPtr host_;
};

struct LbFactory : public Upstream::LoadBalancerFactory {
LbFactory(const Upstream::HostSharedPtr& host) : host_(host) {}

Upstream::LoadBalancerPtr create() override { return std::make_unique<LbImpl>(host_); }

const Upstream::HostSharedPtr host_;
};

struct ThreadAwareLbImpl : public Upstream::ThreadAwareLoadBalancer {
ThreadAwareLbImpl(const Upstream::HostSharedPtr& host) : host_(host) {}

Upstream::LoadBalancerFactorySharedPtr factory() override {
return std::make_shared<LbFactory>(host_);
}
void initialize() override {}

const Upstream::HostSharedPtr host_;
};

Upstream::ThreadAwareLoadBalancerPtr threadAwareLb();

// ClusterImplBase
Expand Down
24 changes: 24 additions & 0 deletions test/integration/load_balancers/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_test_library",
"envoy_package",
)

licenses(["notice"]) # Apache 2

envoy_package()

envoy_cc_test_library(
name = "custom_lb_policy",
srcs = [
"custom_lb_policy.cc",
],
hdrs = [
"custom_lb_policy.h",
],
deps = [
"//envoy/upstream:load_balancer_interface",
"//source/common/upstream:load_balancer_factory_base_lib",
"//test/test_common:registry_lib",
],
)
9 changes: 9 additions & 0 deletions test/integration/load_balancers/custom_lb_policy.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#include "test/integration/load_balancers/custom_lb_policy.h"

#include "envoy/registry/registry.h"

namespace Envoy {

REGISTER_FACTORY(CustomLbFactory, Upstream::TypedLoadBalancerFactory);

} // namespace Envoy
Loading