diff --git a/source/common/upstream/thread_aware_lb_impl.cc b/source/common/upstream/thread_aware_lb_impl.cc
index 8da946d77d980..771da91b7f2bf 100644
--- a/source/common/upstream/thread_aware_lb_impl.cc
+++ b/source/common/upstream/thread_aware_lb_impl.cc
@@ -95,10 +95,7 @@ void ThreadAwareLoadBalancerBase::initialize() {
   // complicated initialization as the load balancer would need its own initialized callback. I
   // think the synchronous/asynchronous split is probably the best option.
   priority_update_cb_ = priority_set_.addPriorityUpdateCb(
-      [this](uint32_t, const HostVector&, const HostVector&) -> void {
-        refresh();
-        threadSafeSetCrossPriorityHostMap(priority_set_.crossPriorityHostMap());
-      });
+      [this](uint32_t, const HostVector&, const HostVector&) -> void { refresh(); });
   refresh();
 }
 
@@ -134,6 +131,7 @@ void ThreadAwareLoadBalancerBase::refresh() {
     factory_->healthy_per_priority_load_ = healthy_per_priority_load;
     factory_->degraded_per_priority_load_ = degraded_per_priority_load;
     factory_->per_priority_state_ = per_priority_state_vector;
+    factory_->cross_priority_host_map_ = priority_set_.crossPriorityHostMap();
   }
 }
 
@@ -181,8 +179,7 @@ ThreadAwareLoadBalancerBase::LoadBalancerImpl::chooseHost(LoadBalancerContext* context) {
 }
 
 LoadBalancerPtr ThreadAwareLoadBalancerBase::LoadBalancerFactoryImpl::create() {
-  auto lb = std::make_unique<LoadBalancerImpl>(
-      stats_, random_, thread_aware_lb_.threadSafeGetCrossPriorityHostMap());
+  auto lb = std::make_unique<LoadBalancerImpl>(stats_, random_);
 
   // We must protect current_lb_ via a RW lock since it is accessed and written to by multiple
   // threads. All complex processing has already been precalculated however.
@@ -190,6 +187,7 @@ LoadBalancerPtr ThreadAwareLoadBalancerBase::LoadBalancerFactoryImpl::create() {
   lb->healthy_per_priority_load_ = healthy_per_priority_load_;
   lb->degraded_per_priority_load_ = degraded_per_priority_load_;
   lb->per_priority_state_ = per_priority_state_;
+  lb->cross_priority_host_map_ = cross_priority_host_map_;
   return lb;
 }
 
diff --git a/source/common/upstream/thread_aware_lb_impl.h b/source/common/upstream/thread_aware_lb_impl.h
index 81a1e5e2e4c83..fa26abddf98b4 100644
--- a/source/common/upstream/thread_aware_lb_impl.h
+++ b/source/common/upstream/thread_aware_lb_impl.h
@@ -98,7 +98,7 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareLoadBalancer {
                               Random::RandomGenerator& random,
                               const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config)
       : LoadBalancerBase(priority_set, stats, runtime, random, common_config),
-        factory_(new LoadBalancerFactoryImpl(stats, random, *this)) {}
+        factory_(new LoadBalancerFactoryImpl(stats, random)) {}
 
 private:
   struct PerPriorityState {
@@ -108,9 +108,8 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareLoadBalancer {
   using PerPriorityStatePtr = std::unique_ptr<PerPriorityState>;
 
   struct LoadBalancerImpl : public LoadBalancer {
-    LoadBalancerImpl(ClusterStats& stats, Random::RandomGenerator& random,
-                     HostMapConstSharedPtr host_map)
-        : stats_(stats), random_(random), cross_priority_host_map_(std::move(host_map)) {}
+    LoadBalancerImpl(ClusterStats& stats, Random::RandomGenerator& random)
+        : stats_(stats), random_(random) {}
 
     // Upstream::LoadBalancer
     HostConstSharedPtr chooseHost(LoadBalancerContext* context) override;
@@ -128,15 +127,12 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareLoadBalancer {
   };
 
   struct LoadBalancerFactoryImpl : public LoadBalancerFactory {
-    LoadBalancerFactoryImpl(ClusterStats& stats, Random::RandomGenerator& random,
-                            ThreadAwareLoadBalancerBase& thread_aware_lb)
-        : thread_aware_lb_(thread_aware_lb), stats_(stats), random_(random) {}
+    LoadBalancerFactoryImpl(ClusterStats& stats, Random::RandomGenerator& random)
+        : stats_(stats), random_(random) {}
 
     // Upstream::LoadBalancerFactory
     LoadBalancerPtr create() override;
 
-    ThreadAwareLoadBalancerBase& thread_aware_lb_;
-
     ClusterStats& stats_;
     Random::RandomGenerator& random_;
     absl::Mutex mutex_;
@@ -144,6 +140,16 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareLoadBalancer {
     // This is split out of PerPriorityState so LoadBalancerBase::ChoosePriority can be reused.
     std::shared_ptr<HealthyLoad> healthy_per_priority_load_ ABSL_GUARDED_BY(mutex_);
     std::shared_ptr<DegradedLoad> degraded_per_priority_load_ ABSL_GUARDED_BY(mutex_);
+
+    // Whenever the membership changes, the cross_priority_host_map_ will be updated automatically.
+    // And all workers will create a new worker local load balancer and copy the
+    // cross_priority_host_map_.
+    // This leads to the possibility of simultaneous reading and writing of cross_priority_host_map_
+    // in different threads. For this reason, mutex is necessary to guard cross_priority_host_map_.
+    //
+    // Cross priority host map for fast cross priority host searching. When the priority update
+    // callback is executed, the host map will also be updated.
+    HostMapConstSharedPtr cross_priority_host_map_ ABSL_GUARDED_BY(mutex_);
   };
 
   virtual HashingLoadBalancerSharedPtr
@@ -151,29 +157,8 @@ class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareLoadBalancer {
                      double min_normalized_weight, double max_normalized_weight) PURE;
 
   void refresh();
-  void threadSafeSetCrossPriorityHostMap(HostMapConstSharedPtr host_map) {
-    absl::MutexLock ml(&cross_priority_host_map_mutex_);
-    cross_priority_host_map_ = std::move(host_map);
-  }
-  HostMapConstSharedPtr threadSafeGetCrossPriorityHostMap() {
-    absl::MutexLock ml(&cross_priority_host_map_mutex_);
-    return cross_priority_host_map_;
-  }
-
   std::shared_ptr<LoadBalancerFactoryImpl> factory_;
   Common::CallbackHandlePtr priority_update_cb_;
-
-  // Whenever the membership changes, the cross_priority_host_map_ will be updated automatically.
-  // And all workers will create a new worker local load balancer and copy the
-  // cross_priority_host_map_.
-  //
-  // This leads to the possibility of simultaneous reading and writing of cross_priority_host_map_
-  // in different threads. For this reason, an additional mutex is necessary to guard
-  // cross_priority_host_map_.
-  absl::Mutex cross_priority_host_map_mutex_;
-  // Cross priority host map for fast cross priority host searching. When the priority update
-  // callback is executed, the host map will also be updated.
-  HostMapConstSharedPtr cross_priority_host_map_ ABSL_GUARDED_BY(cross_priority_host_map_mutex_);
 };
 
 } // namespace Upstream
diff --git a/test/common/upstream/maglev_lb_test.cc b/test/common/upstream/maglev_lb_test.cc
index 562d4f0e90fa6..d8472c5a5aefe 100644
--- a/test/common/upstream/maglev_lb_test.cc
+++ b/test/common/upstream/maglev_lb_test.cc
@@ -103,6 +103,19 @@ TEST_F(MaglevLoadBalancerTest, SelectOverrideHost) {
   EXPECT_EQ(mock_host, lb_->factory()->create()->chooseHost(&context));
 }
 
+// Test for thread aware load balancer destructed before load balancer factory. After CDS removes a
+// cluster, the operation does not immediately reach the worker thread. There may be cases where the
+// thread aware load balancer is destructed, but the load balancer factory is still used in the
+// worker thread.
+TEST_F(MaglevLoadBalancerTest, LbDestructedBeforeFactory) {
+  init(7);
+
+  auto factory = lb_->factory();
+  lb_.reset();
+
+  EXPECT_NE(nullptr, factory->create());
+}
+
 // Throws an exception if table size is not a prime number.
 TEST_F(MaglevLoadBalancerTest, NoPrimeNumber) {
   EXPECT_THROW_WITH_MESSAGE(init(8), EnvoyException,
diff --git a/test/common/upstream/ring_hash_lb_test.cc b/test/common/upstream/ring_hash_lb_test.cc
index c259f610c54af..9d5b2c4141eff 100644
--- a/test/common/upstream/ring_hash_lb_test.cc
+++ b/test/common/upstream/ring_hash_lb_test.cc
@@ -120,6 +120,19 @@ TEST_P(RingHashLoadBalancerTest, SelectOverrideHost) {
   EXPECT_EQ(mock_host, lb_->factory()->create()->chooseHost(&context));
 }
 
+// Test for thread aware load balancer destructed before load balancer factory. After CDS removes a
+// cluster, the operation does not immediately reach the worker thread. There may be cases where the
+// thread aware load balancer is destructed, but the load balancer factory is still used in the
+// worker thread.
+TEST_P(RingHashLoadBalancerTest, LbDestructedBeforeFactory) {
+  init();
+
+  auto factory = lb_->factory();
+  lb_.reset();
+
+  EXPECT_NE(nullptr, factory->create());
+}
+
 // Given minimum_ring_size > maximum_ring_size, expect an exception.
 TEST_P(RingHashLoadBalancerTest, BadRingSizeBounds) {
   config_ = envoy::config::cluster::v3::Cluster::RingHashLbConfig();
diff --git a/test/config/utility.cc b/test/config/utility.cc
index a5aca16f6c773..208e27a70c1c7 100644
--- a/test/config/utility.cc
+++ b/test/config/utility.cc
@@ -404,9 +404,12 @@ std::string ConfigHelper::adsBootstrap(const std::string& api_type) {
 }
 
 // TODO(samflattery): bundle this up with buildCluster
-envoy::config::cluster::v3::Cluster
-ConfigHelper::buildStaticCluster(const std::string& name, int port, const std::string& address) {
-  return TestUtility::parseYaml<envoy::config::cluster::v3::Cluster>(fmt::format(R"EOF(
+envoy::config::cluster::v3::Cluster ConfigHelper::buildStaticCluster(const std::string& name,
+                                                                     int port,
+                                                                     const std::string& address,
+                                                                     const std::string& lb_policy) {
+  return TestUtility::parseYaml<envoy::config::cluster::v3::Cluster>(
+      fmt::format(R"EOF(
       name: {}
       connect_timeout: 5s
       type: STATIC
@@ -419,15 +422,14 @@ ConfigHelper::buildStaticCluster(const std::string& name, int port, const std::string& address) {
                 socket_address:
                   address: {}
                   port_value: {}
-      lb_policy: ROUND_ROBIN
+      lb_policy: {}
       typed_extension_protocol_options:
         envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
           "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
          explicit_http_config:
            http2_protocol_options: {{}}
    )EOF",
-                name, name,
-                address, port));
+                name, name, address, port, lb_policy));
 }
 
 envoy::config::cluster::v3::Cluster ConfigHelper::buildCluster(const std::string& name,
diff --git a/test/config/utility.h b/test/config/utility.h
index 7d94c4b8dbd0e..061726d8aff03 100644
--- a/test/config/utility.h
+++ b/test/config/utility.h
@@ -146,8 +146,9 @@ class ConfigHelper {
   static std::string discoveredClustersBootstrap(const std::string& api_type);
   static std::string adsBootstrap(const std::string& api_type);
   // Builds a standard Cluster config fragment, with a single endpoint (at address:port).
-  static envoy::config::cluster::v3::Cluster buildStaticCluster(const std::string& name, int port,
-                                                                const std::string& address);
+  static envoy::config::cluster::v3::Cluster
+  buildStaticCluster(const std::string& name, int port, const std::string& address,
+                     const std::string& lb_policy = "ROUND_ROBIN");
 
   // ADS configurations
   static envoy::config::cluster::v3::Cluster
diff --git a/test/integration/cds_integration_test.cc b/test/integration/cds_integration_test.cc
index 3da9db2c5e897..bbffa27e9569c 100644
--- a/test/integration/cds_integration_test.cc
+++ b/test/integration/cds_integration_test.cc
@@ -175,6 +175,40 @@ TEST_P(CdsIntegrationTest, CdsClusterUpDownUp) {
   cleanupUpstreamAndDownstream();
 }
 
+// Test the fast addition and removal of clusters when they use ThreadAwareLb.
+TEST_P(CdsIntegrationTest, CdsClusterWithThreadAwareLbCycleUpDownUp) {
+  // Calls our initialize(), which includes establishing a listener, route, and cluster.
+  testRouterHeaderOnlyRequestAndResponse(nullptr, UpstreamIndex1, "/cluster1");
+  test_server_->waitForCounterGe("cluster_manager.cluster_added", 1);
+
+  // Tell Envoy that cluster_1 is gone.
+  EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "55", {}, {}, {}));
+  sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster, {}, {},
+                                                             {ClusterName1}, "42");
+  // Make sure that Envoy's ClusterManager has made use of the DiscoveryResponse that says cluster_1
+  // is gone.
+  test_server_->waitForCounterGe("cluster_manager.cluster_removed", 1);
+
+  // Update cluster1_ to use MAGLEV load balancer policy.
+  cluster1_ = ConfigHelper::buildStaticCluster(
+      ClusterName1, fake_upstreams_[UpstreamIndex1]->localAddress()->ip()->port(),
+      Network::Test::getLoopbackAddressString(ipVersion()), "MAGLEV");
+
+  // Cyclically add and remove cluster with ThreadAwareLb.
+  for (int i = 42; i < 142; i += 2) {
+    EXPECT_TRUE(
+        compareDiscoveryRequest(Config::TypeUrl::get().Cluster, absl::StrCat(i), {}, {}, {}));
+    sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
+        Config::TypeUrl::get().Cluster, {cluster1_}, {cluster1_}, {}, absl::StrCat(i + 1));
+    EXPECT_TRUE(
+        compareDiscoveryRequest(Config::TypeUrl::get().Cluster, absl::StrCat(i + 1), {}, {}, {}));
+    sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
+        Config::TypeUrl::get().Cluster, {}, {}, {ClusterName1}, absl::StrCat(i + 2));
+  }
+
+  cleanupUpstreamAndDownstream();
+}
+
 // Tests adding a cluster, adding another, then removing the first.
 TEST_P(CdsIntegrationTest, TwoClusters) {
   // Calls our initialize(), which includes establishing a listener, route, and cluster.