diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 5a2fb1328c368..6eaef38a901b2 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -63,10 +63,20 @@ void ClusterManagerInitHelper::addCluster(Cluster& cluster) { const auto initialize_cb = [&cluster, this] { onClusterInit(cluster); }; if (cluster.initializePhase() == Cluster::InitializePhase::Primary) { + // Remove the previous cluster before the cluster object is destroyed. + primary_init_clusters_.remove_if( + [name_to_remove = cluster.info()->name()](Cluster* cluster_iter) { + return cluster_iter->info()->name() == name_to_remove; + }); primary_init_clusters_.push_back(&cluster); cluster.initialize(initialize_cb); } else { ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary); + // Remove the previous cluster before the cluster object is destroyed. + secondary_init_clusters_.remove_if( + [name_to_remove = cluster.info()->name()](Cluster* cluster_iter) { + return cluster_iter->info()->name() == name_to_remove; + }); secondary_init_clusters_.push_back(&cluster); if (started_secondary_initialize_) { // This can happen if we get a second CDS update that adds new clusters after we have @@ -613,7 +623,9 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl // and easy to understand. const bool all_clusters_initialized = init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized; - loadCluster(cluster, version_info, true, warming_clusters_); + // Preserve the previous cluster data to avoid early destroy. The same cluster should be added + // before destroy to avoid early initialization complete. + const auto previous_cluster = loadCluster(cluster, version_info, true, warming_clusters_); auto& cluster_entry = warming_clusters_.at(cluster_name); if (!all_clusters_initialized) { ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name); @@ -702,9 +714,10 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) { return removed; } -void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster, - const std::string& version_info, bool added_via_api, - ClusterMap& cluster_map) { +ClusterManagerImpl::ClusterDataPtr +ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster, + const std::string& version_info, bool added_via_api, + ClusterMap& cluster_map) { std::pair new_cluster_pair = factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api); auto& new_cluster = new_cluster_pair.first; @@ -749,11 +762,20 @@ void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& } }); } - - cluster_map[cluster_reference.info()->name()] = std::make_unique( - cluster, version_info, added_via_api, std::move(new_cluster), time_source_); - const auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name()); - + ClusterDataPtr result; + auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name()); + if (cluster_entry_it != cluster_map.end()) { + result = std::exchange(cluster_entry_it->second, + std::make_unique(cluster, version_info, added_via_api, + std::move(new_cluster), time_source_)); + } else { + bool inserted = false; + std::tie(cluster_entry_it, inserted) = + cluster_map.emplace(cluster_reference.info()->name(), + std::make_unique(cluster, version_info, added_via_api, + std::move(new_cluster), time_source_)); + ASSERT(inserted); + } // If an LB is thread aware, create it here. The LB is not initialized until cluster pre-init // finishes. For RingHash/Maglev don't create the LB here if subset balancing is enabled, // because the thread_aware_lb_ field takes precedence over the subset lb). @@ -776,6 +798,7 @@ void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& } updateClusterCounts(); + return result; } void ClusterManagerImpl::updateClusterCounts() { diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 147bbdd4c35cc..ba3c55ba2af18 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -477,8 +477,14 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable cds_cluster( + new NiceMock()); + cds_cluster->info_->name_ = "cds_cluster"; + + // This part tests static init. + InSequence s; + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cds_cluster, nullptr))); + ON_CALL(*cds_cluster, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); + EXPECT_CALL(factory_, createCds_()).WillOnce(Return(cds)); + EXPECT_CALL(*cds, setInitializedCb(_)); + EXPECT_CALL(*cds_cluster, initialize(_)); + + create(parseBootstrapFromV3Json(json)); + + ReadyWatcher cm_initialized; + cluster_manager_->setInitializedCb([&]() -> void { cm_initialized.ready(); }); + + const std::string ready_cluster_yaml = R"EOF( + name: fake_cluster + connect_timeout: 0.250s + type: STATIC + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: fake_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11001 + )EOF"; + + const std::string warming_cluster_yaml = R"EOF( + name: fake_cluster + connect_timeout: 0.250s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: fake_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: foo.com + port_value: 11001 + )EOF"; + + { + SCOPED_TRACE("Add a primary cluster staying in warming."); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)); + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(warming_cluster_yaml), + "warming")); + + // Mark all the rest of the clusters ready. Now the only warming cluster is the above one. + EXPECT_CALL(cm_initialized, ready()).Times(0); + cds_cluster->initialize_callback_(); + } + + { + SCOPED_TRACE("Modify the only warming primary cluster to immediate ready."); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)); + EXPECT_CALL(*cds, initialize()); + EXPECT_TRUE( + cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(ready_cluster_yaml), "ready")); + } + { + SCOPED_TRACE("All clusters are ready."); + EXPECT_CALL(cm_initialized, ready()); + cds->initialized_callback_(); + } + EXPECT_TRUE(Mock::VerifyAndClearExpectations(cds_cluster.get())); +} + TEST_F(ClusterManagerImplTest, ModifyWarmingCluster) { time_system_.setSystemTime(std::chrono::milliseconds(1234567891234)); create(defaultConfig()); @@ -2984,6 +3083,33 @@ TEST_F(ClusterManagerInitHelperTest, StaticSdsInitialize) { cluster1.initialize_callback_(); } +// Verify that primary cluster can be updated in warming state. +TEST_F(ClusterManagerInitHelperTest, TestUpdateWarming) { + InSequence s; + + auto sds = std::make_unique>(); + ON_CALL(*sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); + EXPECT_CALL(*sds, initialize(_)); + init_helper_.addCluster(*sds); + init_helper_.onStaticLoadComplete(); + + NiceMock updated_sds; + ON_CALL(updated_sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); + EXPECT_CALL(updated_sds, initialize(_)); + init_helper_.addCluster(updated_sds); + + // The override cluster is added. Manually drop the previous cluster. In production flow this is + // achieved by ClusterManagerImpl. + sds.reset(); + + ReadyWatcher primary_initialized; + init_helper_.setPrimaryClustersInitializedCb([&]() -> void { primary_initialized.ready(); }); + + EXPECT_CALL(*this, onClusterInit(Ref(updated_sds))); + EXPECT_CALL(primary_initialized, ready()); + updated_sds.initialize_callback_(); +} + TEST_F(ClusterManagerInitHelperTest, UpdateAlreadyInitialized) { InSequence s; @@ -3087,6 +3213,7 @@ TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) { init_helper_.addCluster(cluster1); NiceMock cluster2; + cluster2.info_->name_ = "cluster2"; ON_CALL(cluster2, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); init_helper_.addCluster(cluster2); @@ -3099,6 +3226,8 @@ TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) { init_helper_.startInitializingSecondaryClusters(); NiceMock cluster3; + cluster3.info_->name_ = "cluster3"; + ON_CALL(cluster3, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); EXPECT_CALL(cluster3, initialize(_)); init_helper_.addCluster(cluster3); diff --git a/test/integration/ads_integration.cc b/test/integration/ads_integration.cc index 7d81b1de0a1b4..6a0db9004244f 100644 --- a/test/integration/ads_integration.cc +++ b/test/integration/ads_integration.cc @@ -34,8 +34,9 @@ AdsIntegrationTest::AdsIntegrationTest(const envoy::config::core::v3::ApiVersion void AdsIntegrationTest::TearDown() { cleanUpXdsConnection(); } -envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name) { - return ConfigHelper::buildCluster(name, "ROUND_ROBIN", api_version_); +envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name, + const std::string& lb_policy) { + return ConfigHelper::buildCluster(name, lb_policy, api_version_); } envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildTlsCluster(const std::string& name) { diff --git a/test/integration/ads_integration.h b/test/integration/ads_integration.h index 0da99aea566a1..8c9a8e0ab3e6c 100644 --- a/test/integration/ads_integration.h +++ b/test/integration/ads_integration.h @@ -22,7 +22,8 @@ class AdsIntegrationTest : public Grpc::DeltaSotwIntegrationParamTest, public Ht void TearDown() override; - envoy::config::cluster::v3::Cluster buildCluster(const std::string& name); + envoy::config::cluster::v3::Cluster buildCluster(const std::string& name, + const std::string& lb_policy = "ROUND_ROBIN"); envoy::config::cluster::v3::Cluster buildTlsCluster(const std::string& name); diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index 01aae9dc9f733..053dfcbc9868a 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -1152,6 +1152,120 @@ TEST_P(AdsClusterV3Test, BasicClusterInitialWarming) { test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); } +// Update the only warming cluster. Verify that the new cluster is still warming and the cluster +// manager as a whole is not initialized. +TEST_P(AdsClusterV3Test, ClusterInitializationUpdateTheOnlyWarmingCluster) { + initialize(); + const auto cds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + const auto eds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + + EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); + sendDiscoveryResponse( + cds_type_url, {buildCluster("cluster_0")}, {buildCluster("cluster_0")}, {}, "1", false); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + // Update lb policy to MAGLEV so that cluster update is not skipped due to the same hash. + sendDiscoveryResponse( + cds_type_url, {buildCluster("cluster_0", "MAGLEV")}, {buildCluster("cluster_0", "MAGLEV")}, + {}, "2", false); + EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0"}, {"cluster_0"}, {})); + sendDiscoveryResponse( + eds_type_url, {buildClusterLoadAssignment("cluster_0")}, + {buildClusterLoadAssignment("cluster_0")}, {}, "1", false); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); +} + +// Primary cluster is warming during cluster initialization. Update the cluster with immediate ready +// config and verify that all the clusters are initialized. +TEST_P(AdsClusterV3Test, TestPrimaryClusterWarmClusterInitialization) { + initialize(); + const auto cds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + auto loopback = Network::Test::getLoopbackAddressString(ipVersion()); + addFakeUpstream(FakeHttpConnection::Type::HTTP2); + auto port = fake_upstreams_.back()->localAddress()->ip()->port(); + + // This cluster will be blocked since endpoint name cannot be resolved. + auto warming_cluster = ConfigHelper::buildStaticCluster("fake_cluster", port, loopback); + // Below endpoint accepts request but never return. The health check hangs 1 hour which covers the + // test running. + auto blocking_health_check = TestUtility::parseYaml(R"EOF( + timeout: 3600s + interval: 3600s + unhealthy_threshold: 2 + healthy_threshold: 2 + tcp_health_check: + send: + text: '01' + receive: + - text: '02' + )EOF"); + *warming_cluster.add_health_checks() = blocking_health_check; + + // Active cluster has the same name with warming cluster but has no blocking health check. + auto active_cluster = ConfigHelper::buildStaticCluster("fake_cluster", port, loopback); + + EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); + sendDiscoveryResponse(cds_type_url, {warming_cluster}, + {warming_cluster}, {}, "1", false); + + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(fake_upstreams_.back()->waitForRawConnection(fake_upstream_connection)); + + // fake_cluster is in warming. + test_server_->waitForGaugeGe("cluster_manager.warming_clusters", 1); + + // Now replace the warming cluster by the config which will turn ready immediately. + sendDiscoveryResponse(cds_type_url, {active_cluster}, + {active_cluster}, {}, "2", false); + + // All clusters are ready. + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); +} + +// Two cluster warming, update one of them. Verify that the clusters are eventually initialized. +TEST_P(AdsClusterV3Test, ClusterInitializationUpdateOneOfThe2Warming) { + initialize(); + const auto cds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + const auto eds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + + EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); + sendDiscoveryResponse( + cds_type_url, + {ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"), + buildCluster("cluster_0"), buildCluster("cluster_1")}, + {ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"), + buildCluster("cluster_0"), buildCluster("cluster_1")}, + {}, "1", false); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2); + + // Update lb policy to MAGLEV so that cluster update is not skipped due to the same hash. + sendDiscoveryResponse( + cds_type_url, + {ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"), + buildCluster("cluster_0", "MAGLEV"), buildCluster("cluster_1")}, + {ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"), + buildCluster("cluster_0", "MAGLEV"), buildCluster("cluster_1")}, + {}, "2", false); + EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0", "cluster_1"}, + {"cluster_0", "cluster_1"}, {})); + sendDiscoveryResponse( + eds_type_url, + {buildClusterLoadAssignment("cluster_0"), buildClusterLoadAssignment("cluster_1")}, + {buildClusterLoadAssignment("cluster_0"), buildClusterLoadAssignment("cluster_1")}, {}, "1", + false); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 4); +} + // Verify CDS is paused during cluster warming. TEST_P(AdsClusterV3Test, CdsPausedDuringWarming) { initialize(); diff --git a/test/integration/base_integration_test.cc b/test/integration/base_integration_test.cc index ade05762720c4..354d309ef86fe 100644 --- a/test/integration/base_integration_test.cc +++ b/test/integration/base_integration_test.cc @@ -416,6 +416,22 @@ AssertionResult BaseIntegrationTest::compareDiscoveryRequest( } } +AssertionResult compareSets(const std::set& set1, const std::set& set2, + absl::string_view name) { + if (set1 == set2) { + return AssertionSuccess(); + } + auto failure = AssertionFailure() << name << " field not as expected.\nExpected: {"; + for (const auto& x : set1) { + failure << x << ", "; + } + failure << "}\nActual: {"; + for (const auto& x : set2) { + failure << x << ", "; + } + return failure << "}"; +} + AssertionResult BaseIntegrationTest::compareSotwDiscoveryRequest( const std::string& expected_type_url, const std::string& expected_version, const std::vector& expected_resource_names, bool expect_node, @@ -442,12 +458,12 @@ AssertionResult BaseIntegrationTest::compareSotwDiscoveryRequest( } EXPECT_TRUE( IsSubstring("", "", expected_error_substring, discovery_request.error_detail().message())); - const std::vector resource_names(discovery_request.resource_names().cbegin(), - discovery_request.resource_names().cend()); - if (expected_resource_names != resource_names) { - return AssertionFailure() << fmt::format( - "resources {} do not match expected {} in {}", absl::StrJoin(resource_names, ","), - absl::StrJoin(expected_resource_names, ","), discovery_request.DebugString()); + const std::set resource_names_in_request(discovery_request.resource_names().cbegin(), + discovery_request.resource_names().cend()); + if (auto resource_name_result = compareSets( + std::set(expected_resource_names.cbegin(), expected_resource_names.cend()), + resource_names_in_request, "Sotw resource names")) { + return resource_name_result; } if (expected_version != discovery_request.version_info()) { return AssertionFailure() << fmt::format("version {} does not match expected {} in {}", @@ -457,22 +473,6 @@ AssertionResult BaseIntegrationTest::compareSotwDiscoveryRequest( return AssertionSuccess(); } -AssertionResult compareSets(const std::set& set1, const std::set& set2, - absl::string_view name) { - if (set1 == set2) { - return AssertionSuccess(); - } - auto failure = AssertionFailure() << name << " field not as expected.\nExpected: {"; - for (const auto& x : set1) { - failure << x << ", "; - } - failure << "}\nActual: {"; - for (const auto& x : set2) { - failure << x << ", "; - } - return failure << "}"; -} - AssertionResult BaseIntegrationTest::waitForPortAvailable(uint32_t port, std::chrono::milliseconds timeout) { Event::TestTimeSystem::RealTimeBound bound(timeout);