Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions source/common/upstream/thread_aware_lb_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,7 @@ void ThreadAwareLoadBalancerBase::initialize() {
// complicated initialization as the load balancer would need its own initialized callback. I
// think the synchronous/asynchronous split is probably the best option.
priority_update_cb_ = priority_set_.addPriorityUpdateCb(
[this](uint32_t, const HostVector&, const HostVector&) -> void {
refresh();
threadSafeSetCrossPriorityHostMap(priority_set_.crossPriorityHostMap());
});
[this](uint32_t, const HostVector&, const HostVector&) -> void { refresh(); });

refresh();
}
Expand Down Expand Up @@ -134,6 +131,7 @@ void ThreadAwareLoadBalancerBase::refresh() {
factory_->healthy_per_priority_load_ = healthy_per_priority_load;
factory_->degraded_per_priority_load_ = degraded_per_priority_load;
factory_->per_priority_state_ = per_priority_state_vector;
factory_->cross_priority_host_map_ = priority_set_.crossPriorityHostMap();
}
}

Expand Down Expand Up @@ -181,15 +179,15 @@ ThreadAwareLoadBalancerBase::LoadBalancerImpl::chooseHost(LoadBalancerContext* c
}

LoadBalancerPtr ThreadAwareLoadBalancerBase::LoadBalancerFactoryImpl::create() {
auto lb = std::make_unique<LoadBalancerImpl>(
stats_, random_, thread_aware_lb_.threadSafeGetCrossPriorityHostMap());
auto lb = std::make_unique<LoadBalancerImpl>(stats_, random_);

// We must protect current_lb_ via a RW lock since it is accessed and written to by multiple
// threads. All complex processing has already been precalculated however.
absl::ReaderMutexLock lock(&mutex_);
lb->healthy_per_priority_load_ = healthy_per_priority_load_;
lb->degraded_per_priority_load_ = degraded_per_priority_load_;
lb->per_priority_state_ = per_priority_state_;
lb->cross_priority_host_map_ = cross_priority_host_map_;

return lb;
}
Expand Down
45 changes: 15 additions & 30 deletions source/common/upstream/thread_aware_lb_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL
Random::RandomGenerator& random,
const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config)
: LoadBalancerBase(priority_set, stats, runtime, random, common_config),
factory_(new LoadBalancerFactoryImpl(stats, random, *this)) {}
factory_(new LoadBalancerFactoryImpl(stats, random)) {}

private:
struct PerPriorityState {
Expand All @@ -108,9 +108,8 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL
using PerPriorityStatePtr = std::unique_ptr<PerPriorityState>;

struct LoadBalancerImpl : public LoadBalancer {
LoadBalancerImpl(ClusterStats& stats, Random::RandomGenerator& random,
HostMapConstSharedPtr host_map)
: stats_(stats), random_(random), cross_priority_host_map_(std::move(host_map)) {}
LoadBalancerImpl(ClusterStats& stats, Random::RandomGenerator& random)
: stats_(stats), random_(random) {}

// Upstream::LoadBalancer
HostConstSharedPtr chooseHost(LoadBalancerContext* context) override;
Expand All @@ -128,52 +127,38 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareL
};

struct LoadBalancerFactoryImpl : public LoadBalancerFactory {
LoadBalancerFactoryImpl(ClusterStats& stats, Random::RandomGenerator& random,
ThreadAwareLoadBalancerBase& thread_aware_lb)
: thread_aware_lb_(thread_aware_lb), stats_(stats), random_(random) {}
LoadBalancerFactoryImpl(ClusterStats& stats, Random::RandomGenerator& random)
: stats_(stats), random_(random) {}

// Upstream::LoadBalancerFactory
LoadBalancerPtr create() override;

ThreadAwareLoadBalancerBase& thread_aware_lb_;

ClusterStats& stats_;
Random::RandomGenerator& random_;
absl::Mutex mutex_;
std::shared_ptr<std::vector<PerPriorityStatePtr>> per_priority_state_ ABSL_GUARDED_BY(mutex_);
// This is split out of PerPriorityState so LoadBalancerBase::ChoosePriority can be reused.
std::shared_ptr<HealthyLoad> healthy_per_priority_load_ ABSL_GUARDED_BY(mutex_);
std::shared_ptr<DegradedLoad> degraded_per_priority_load_ ABSL_GUARDED_BY(mutex_);

// Whenever the membership changes, the cross_priority_host_map_ will be updated automatically.
// And all workers will create a new worker local load balancer and copy the
// cross_priority_host_map_.
// This leads to the possibility of simultaneous reading and writing of cross_priority_host_map_
// in different threads. For this reason, mutex is necessary to guard cross_priority_host_map_.
//
// Cross priority host map for fast cross priority host searching. When the priority update
// callback is executed, the host map will also be updated.
HostMapConstSharedPtr cross_priority_host_map_ ABSL_GUARDED_BY(mutex_);
};

virtual HashingLoadBalancerSharedPtr
createLoadBalancer(const NormalizedHostWeightVector& normalized_host_weights,
double min_normalized_weight, double max_normalized_weight) PURE;
void refresh();

void threadSafeSetCrossPriorityHostMap(HostMapConstSharedPtr host_map) {
absl::MutexLock ml(&cross_priority_host_map_mutex_);
cross_priority_host_map_ = std::move(host_map);
}
HostMapConstSharedPtr threadSafeGetCrossPriorityHostMap() {
absl::MutexLock ml(&cross_priority_host_map_mutex_);
return cross_priority_host_map_;
}

std::shared_ptr<LoadBalancerFactoryImpl> factory_;
Common::CallbackHandlePtr priority_update_cb_;

// Whenever the membership changes, the cross_priority_host_map_ will be updated automatically.
// And all workers will create a new worker local load balancer and copy the
// cross_priority_host_map_.
//
// This leads to the possibility of simultaneous reading and writing of cross_priority_host_map_
// in different threads. For this reason, an additional mutex is necessary to guard
// cross_priority_host_map_.
absl::Mutex cross_priority_host_map_mutex_;
// Cross priority host map for fast cross priority host searching. When the priority update
// callback is executed, the host map will also be updated.
HostMapConstSharedPtr cross_priority_host_map_ ABSL_GUARDED_BY(cross_priority_host_map_mutex_);
};

} // namespace Upstream
Expand Down
13 changes: 13 additions & 0 deletions test/common/upstream/maglev_lb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,19 @@ TEST_F(MaglevLoadBalancerTest, SelectOverrideHost) {
EXPECT_EQ(mock_host, lb_->factory()->create()->chooseHost(&context));
}

// Verifies the load balancer factory remains usable after the owning thread aware load balancer
// is destroyed. When CDS removes a cluster, the removal does not reach worker threads
// immediately, so a worker may still invoke create() on the factory after the thread aware load
// balancer itself has been torn down.
TEST_F(MaglevLoadBalancerTest, LbDestructedBeforeFactory) {
  init(7);

  // Keep a handle to the shared factory, then drop the thread aware LB that produced it.
  auto lb_factory = lb_->factory();
  lb_.reset();

  // The factory must still be able to mint a worker-local load balancer.
  EXPECT_NE(lb_factory->create(), nullptr);
}

// Throws an exception if table size is not a prime number.
TEST_F(MaglevLoadBalancerTest, NoPrimeNumber) {
EXPECT_THROW_WITH_MESSAGE(init(8), EnvoyException,
Expand Down
13 changes: 13 additions & 0 deletions test/common/upstream/ring_hash_lb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@ TEST_P(RingHashLoadBalancerTest, SelectOverrideHost) {
EXPECT_EQ(mock_host, lb_->factory()->create()->chooseHost(&context));
}

// Verifies the load balancer factory remains usable after the owning thread aware load balancer
// is destroyed. When CDS removes a cluster, the removal does not reach worker threads
// immediately, so a worker may still invoke create() on the factory after the thread aware load
// balancer itself has been torn down.
TEST_P(RingHashLoadBalancerTest, LbDestructedBeforeFactory) {
  init();

  // Keep a handle to the shared factory, then drop the thread aware LB that produced it.
  auto lb_factory = lb_->factory();
  lb_.reset();

  // The factory must still be able to mint a worker-local load balancer.
  EXPECT_NE(lb_factory->create(), nullptr);
}

// Given minimum_ring_size > maximum_ring_size, expect an exception.
TEST_P(RingHashLoadBalancerTest, BadRingSizeBounds) {
config_ = envoy::config::cluster::v3::Cluster::RingHashLbConfig();
Expand Down
14 changes: 8 additions & 6 deletions test/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,12 @@ std::string ConfigHelper::adsBootstrap(const std::string& api_type) {
}

// TODO(samflattery): bundle this up with buildCluster
envoy::config::cluster::v3::Cluster
ConfigHelper::buildStaticCluster(const std::string& name, int port, const std::string& address) {
return TestUtility::parseYaml<envoy::config::cluster::v3::Cluster>(fmt::format(R"EOF(
envoy::config::cluster::v3::Cluster ConfigHelper::buildStaticCluster(const std::string& name,
int port,
const std::string& address,
const std::string& lb_policy) {
return TestUtility::parseYaml<envoy::config::cluster::v3::Cluster>(
fmt::format(R"EOF(
name: {}
connect_timeout: 5s
type: STATIC
Expand All @@ -419,15 +422,14 @@ ConfigHelper::buildStaticCluster(const std::string& name, int port, const std::s
socket_address:
address: {}
port_value: {}
lb_policy: ROUND_ROBIN
lb_policy: {}
typed_extension_protocol_options:
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
explicit_http_config:
http2_protocol_options: {{}}
)EOF",
name, name,
address, port));
name, name, address, port, lb_policy));
}

envoy::config::cluster::v3::Cluster ConfigHelper::buildCluster(const std::string& name,
Expand Down
5 changes: 3 additions & 2 deletions test/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ class ConfigHelper {
static std::string discoveredClustersBootstrap(const std::string& api_type);
static std::string adsBootstrap(const std::string& api_type);
// Builds a standard Cluster config fragment, with a single endpoint (at address:port).
static envoy::config::cluster::v3::Cluster buildStaticCluster(const std::string& name, int port,
const std::string& address);
static envoy::config::cluster::v3::Cluster
buildStaticCluster(const std::string& name, int port, const std::string& address,
const std::string& lb_policy = "ROUND_ROBIN");

// ADS configurations
static envoy::config::cluster::v3::Cluster
Expand Down
34 changes: 34 additions & 0 deletions test/integration/cds_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,40 @@ TEST_P(CdsIntegrationTest, CdsClusterUpDownUp) {
cleanupUpstreamAndDownstream();
}

// Test the fast addition and removal of clusters when they use ThreadAwareLb.
// Regression-style coverage: rapidly cycling a MAGLEV cluster up and down exercises the case
// where a thread aware LB is destroyed while its factory may still be referenced by workers.
TEST_P(CdsIntegrationTest, CdsClusterWithThreadAwareLbCycleUpDownUp) {
// Calls our initialize(), which includes establishing a listener, route, and cluster.
testRouterHeaderOnlyRequestAndResponse(nullptr, UpstreamIndex1, "/cluster1");
test_server_->waitForCounterGe("cluster_manager.cluster_added", 1);

// Tell Envoy that cluster_1 is gone.
// NOTE(review): the expected request version here is "55" while the response below acks "42" —
// presumably matching the fixture's initial version bookkeeping; confirm against the sibling
// CdsClusterUpDownUp test if the sequence ever changes.
EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {}));
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster, {}, {},
{ClusterName1}, "42");
// Make sure that Envoy's ClusterManager has made use of the DiscoveryResponse that says cluster_1
// is gone.
test_server_->waitForCounterGe("cluster_manager.cluster_removed", 1);

// Update cluster1_ to use MAGLEV load balancer policy.
// MAGLEV is a thread aware LB, which is the configuration under test here.
cluster1_ = ConfigHelper::buildStaticCluster(
ClusterName1, fake_upstreams_[UpstreamIndex1]->localAddress()->ip()->port(),
Network::Test::getLoopbackAddressString(ipVersion()), "MAGLEV");

// Cyclically add and remove cluster with ThreadAwareLb.
// Each iteration consumes two version numbers: i -> i+1 acks the add, i+1 -> i+2 acks the
// removal, so the loop steps by 2 and runs 50 add/remove cycles (versions 42 through 142).
for (int i = 42; i < 142; i += 2) {
EXPECT_TRUE(
compareDiscoveryRequest(Config::TypeUrl::get().Cluster, absl::StrCat(i), {}, {}, {}));
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
Config::TypeUrl::get().Cluster, {cluster1_}, {cluster1_}, {}, absl::StrCat(i + 1));
EXPECT_TRUE(
compareDiscoveryRequest(Config::TypeUrl::get().Cluster, absl::StrCat(i + 1), {}, {}, {}));
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
Config::TypeUrl::get().Cluster, {}, {}, {ClusterName1}, absl::StrCat(i + 2));
}

cleanupUpstreamAndDownstream();
}

// Tests adding a cluster, adding another, then removing the first.
TEST_P(CdsIntegrationTest, TwoClusters) {
// Calls our initialize(), which includes establishing a listener, route, and cluster.
Expand Down