diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 15df928be990e..259c688f74a4f 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -63,10 +63,20 @@ void ClusterManagerInitHelper::addCluster(Cluster& cluster) { const auto initialize_cb = [&cluster, this] { onClusterInit(cluster); }; if (cluster.initializePhase() == Cluster::InitializePhase::Primary) { + // Remove the previous cluster before the cluster object is destroyed. + primary_init_clusters_.remove_if( + [name_to_remove = cluster.info()->name()](Cluster* cluster_iter) { + return cluster_iter->info()->name() == name_to_remove; + }); primary_init_clusters_.push_back(&cluster); cluster.initialize(initialize_cb); } else { ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary); + // Remove the previous cluster before the cluster object is destroyed. + secondary_init_clusters_.remove_if( + [name_to_remove = cluster.info()->name()](Cluster* cluster_iter) { + return cluster_iter->info()->name() == name_to_remove; + }); secondary_init_clusters_.push_back(&cluster); if (started_secondary_initialize_) { // This can happen if we get a second CDS update that adds new clusters after we have @@ -417,7 +427,16 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) { // been setup for cross-thread updates to avoid needless updates during initialization. The order // of operations here is important. We start by initializing the thread aware load balancer if // needed. This must happen first so cluster updates are heard first by the load balancer. - auto cluster_data = active_clusters_.find(cluster.info()->name()); + // Also, it assures that all of clusters which this function is called should be always active. + auto cluster_data = warming_clusters_.find(cluster.info()->name()); + // We have a situation that clusters will be immediately active, such as static and primary + // cluster. So we must have this prevention logic here. + if (cluster_data != warming_clusters_.end()) { + clusterWarmingToActive(cluster.info()->name()); + updateClusterCounts(); + } + cluster_data = active_clusters_.find(cluster.info()->name()); + if (cluster_data->second->thread_aware_lb_ != nullptr) { cluster_data->second->thread_aware_lb_->initialize(); } @@ -587,17 +606,6 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl // The following init manager remove call is a NOP in the case we are already initialized. // It's just kept here to avoid additional logic. init_helper_.removeCluster(*existing_active_cluster->second->cluster_); - } else { - // Validate that warming clusters are not added to the init_helper_. - // NOTE: This loop is compiled out in optimized builds. - for (const std::list& cluster_list : - {std::cref(init_helper_.primary_init_clusters_), - std::cref(init_helper_.secondary_init_clusters_)}) { - ASSERT(!std::any_of(cluster_list.begin(), cluster_list.end(), - [&existing_warming_cluster](Cluster* cluster) { - return existing_warming_cluster->second->cluster_.get() == cluster; - })); - } } cm_stats_.cluster_modified_.inc(); } else { @@ -614,40 +622,41 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl // the future we may decide to undergo a refactor to unify the logic but the effort/risk to // do that right now does not seem worth it given that the logic is generally pretty clean // and easy to understand. - const bool use_active_map = - init_helper_.state() != ClusterManagerInitHelper::State::AllClustersInitialized; - loadCluster(cluster, version_info, true, use_active_map ? active_clusters_ : warming_clusters_); - - if (use_active_map) { + const bool all_clusters_initialized = + init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized; + // Preserve the previous cluster data to avoid early destroy. The same cluster should be added + // before destroy to avoid early initialization complete. + const auto previous_cluster = loadCluster(cluster, version_info, true, warming_clusters_); + auto& cluster_entry = warming_clusters_.at(cluster_name); + if (!all_clusters_initialized) { ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name); - auto& cluster_entry = active_clusters_.at(cluster_name); createOrUpdateThreadLocalCluster(*cluster_entry); init_helper_.addCluster(*cluster_entry->cluster_); } else { - auto& cluster_entry = warming_clusters_.at(cluster_name); ENVOY_LOG(debug, "add/update cluster {} starting warming", cluster_name); cluster_entry->cluster_->initialize([this, cluster_name] { - auto warming_it = warming_clusters_.find(cluster_name); - auto& cluster_entry = *warming_it->second; - - // If the cluster is being updated, we need to cancel any pending merged updates. - // Otherwise, applyUpdates() will fire with a dangling cluster reference. - updates_map_.erase(cluster_name); - - active_clusters_[cluster_name] = std::move(warming_it->second); - warming_clusters_.erase(warming_it); - ENVOY_LOG(debug, "warming cluster {} complete", cluster_name); - createOrUpdateThreadLocalCluster(cluster_entry); - onClusterInit(*cluster_entry.cluster_); - updateClusterCounts(); + auto state_changed_cluster_entry = warming_clusters_.find(cluster_name); + createOrUpdateThreadLocalCluster(*state_changed_cluster_entry->second); + onClusterInit(*state_changed_cluster_entry->second->cluster_); }); } - updateClusterCounts(); return true; } +void ClusterManagerImpl::clusterWarmingToActive(const std::string& cluster_name) { + auto warming_it = warming_clusters_.find(cluster_name); + ASSERT(warming_it != warming_clusters_.end()); + + // If the cluster is being updated, we need to cancel any pending merged updates. + // Otherwise, applyUpdates() will fire with a dangling cluster reference. + updates_map_.erase(cluster_name); + + active_clusters_[cluster_name] = std::move(warming_it->second); + warming_clusters_.erase(warming_it); +} + void ClusterManagerImpl::createOrUpdateThreadLocalCluster(ClusterData& cluster) { tls_->runOnAllThreads([new_cluster = cluster.cluster_->info(), thread_aware_lb_factory = cluster.loadBalancerFactory()]( @@ -702,6 +711,7 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) { if (existing_warming_cluster != warming_clusters_.end() && existing_warming_cluster->second->added_via_api_) { removed = true; + init_helper_.removeCluster(*existing_warming_cluster->second->cluster_); warming_clusters_.erase(existing_warming_cluster); ENVOY_LOG(info, "removing warming cluster {}", cluster_name); } @@ -716,9 +726,10 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) { return removed; } -void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster, - const std::string& version_info, bool added_via_api, - ClusterMap& cluster_map) { +ClusterManagerImpl::ClusterDataPtr +ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster, + const std::string& version_info, bool added_via_api, + ClusterMap& cluster_map) { std::pair new_cluster_pair = factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api); auto& new_cluster = new_cluster_pair.first; @@ -763,11 +774,20 @@ void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& } }); } - - cluster_map[cluster_reference.info()->name()] = std::make_unique( - cluster, version_info, added_via_api, std::move(new_cluster), time_source_); - const auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name()); - + ClusterDataPtr result; + auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name()); + if (cluster_entry_it != cluster_map.end()) { + result = std::exchange(cluster_entry_it->second, + std::make_unique(cluster, version_info, added_via_api, + std::move(new_cluster), time_source_)); + } else { + bool inserted = false; + std::tie(cluster_entry_it, inserted) = + cluster_map.emplace(cluster_reference.info()->name(), + std::make_unique(cluster, version_info, added_via_api, + std::move(new_cluster), time_source_)); + ASSERT(inserted); + } // If an LB is thread aware, create it here. The LB is not initialized until cluster pre-init // finishes. For RingHash/Maglev don't create the LB here if subset balancing is enabled, // because the thread_aware_lb_ field takes precedence over the subset lb). @@ -790,6 +810,7 @@ void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& } updateClusterCounts(); + return result; } void ClusterManagerImpl::updateClusterCounts() { @@ -804,7 +825,9 @@ void ClusterManagerImpl::updateClusterCounts() { // Once cluster is warmed up, CDS is resumed, and ACK is sent to ADS, providing a // signal to ADS to proceed with RDS updates. // If we're in the middle of shutting down (ads_mux_ already gone) then this is irrelevant. - if (ads_mux_) { + const bool all_clusters_initialized = + init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized; + if (all_clusters_initialized && ads_mux_) { const auto type_urls = Config::getAllVersionTypeUrls(); const uint64_t previous_warming = cm_stats_.warming_clusters_.value(); if (previous_warming == 0 && !warming_clusters_.empty()) { diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 920681bff0ef5..c87c6652484cf 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -477,11 +477,18 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable prefetch_pool); diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index b24c45330de52..155e2b6288577 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -1055,7 +1055,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) { last_updated: seconds: 1234567891 nanos: 234000000 - dynamic_active_clusters: + dynamic_warming_clusters: - version_info: "version1" cluster: "@type": type.googleapis.com/envoy.config.cluster.v3.Cluster @@ -1107,7 +1107,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) { last_updated: seconds: 1234567891 nanos: 234000000 - dynamic_warming_clusters: + dynamic_active_clusters: )EOF"); EXPECT_CALL(*cluster3, initialize(_)); @@ -1233,6 +1233,105 @@ TEST_F(ClusterManagerImplTest, RemoveWarmingCluster) { EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get())); } +TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) { + const std::string json = fmt::sprintf( + R"EOF( + { + "dynamic_resources": { + "cds_config": { + "api_config_source": { + "api_type": "0", + "refresh_delay": "30s", + "cluster_names": ["cds_cluster"] + } + } + }, + "static_resources": { + %s + } + } + )EOF", + clustersJson({ + defaultStaticClusterJson("cds_cluster"), + })); + + MockCdsApi* cds = new MockCdsApi(); + std::shared_ptr cds_cluster( + new NiceMock()); + cds_cluster->info_->name_ = "cds_cluster"; + + // This part tests static init. + InSequence s; + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cds_cluster, nullptr))); + ON_CALL(*cds_cluster, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); + EXPECT_CALL(factory_, createCds_()).WillOnce(Return(cds)); + EXPECT_CALL(*cds, setInitializedCb(_)); + EXPECT_CALL(*cds_cluster, initialize(_)); + + create(parseBootstrapFromV3Json(json)); + + ReadyWatcher cm_initialized; + cluster_manager_->setInitializedCb([&]() -> void { cm_initialized.ready(); }); + + const std::string ready_cluster_yaml = R"EOF( + name: fake_cluster + connect_timeout: 0.250s + type: STATIC + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: fake_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11001 + )EOF"; + + const std::string warming_cluster_yaml = R"EOF( + name: fake_cluster + connect_timeout: 0.250s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: fake_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: foo.com + port_value: 11001 + )EOF"; + + { + SCOPED_TRACE("Add a primary cluster staying in warming."); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)); + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(warming_cluster_yaml), + "warming")); + + // Mark all the rest of the clusters ready. Now the only warming cluster is the above one. + EXPECT_CALL(cm_initialized, ready()).Times(0); + cds_cluster->initialize_callback_(); + } + + { + SCOPED_TRACE("Modify the only warming primary cluster to immediate ready."); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)); + EXPECT_CALL(*cds, initialize()); + EXPECT_TRUE( + cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(ready_cluster_yaml), "ready")); + } + { + SCOPED_TRACE("All clusters are ready."); + EXPECT_CALL(cm_initialized, ready()); + cds->initialized_callback_(); + } + EXPECT_TRUE(Mock::VerifyAndClearExpectations(cds_cluster.get())); +} + TEST_F(ClusterManagerImplTest, ModifyWarmingCluster) { time_system_.setSystemTime(std::chrono::milliseconds(1234567891234)); create(defaultConfig()); @@ -2984,6 +3083,33 @@ TEST_F(ClusterManagerInitHelperTest, StaticSdsInitialize) { cluster1.initialize_callback_(); } +// Verify that primary cluster can be updated in warming state. +TEST_F(ClusterManagerInitHelperTest, TestUpdateWarming) { + InSequence s; + + auto sds = std::make_unique>(); + ON_CALL(*sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); + EXPECT_CALL(*sds, initialize(_)); + init_helper_.addCluster(*sds); + init_helper_.onStaticLoadComplete(); + + NiceMock updated_sds; + ON_CALL(updated_sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); + EXPECT_CALL(updated_sds, initialize(_)); + init_helper_.addCluster(updated_sds); + + // The override cluster is added. Manually drop the previous cluster. In production flow this is + // achieved by ClusterManagerImpl. + sds.reset(); + + ReadyWatcher primary_initialized; + init_helper_.setPrimaryClustersInitializedCb([&]() -> void { primary_initialized.ready(); }); + + EXPECT_CALL(*this, onClusterInit(Ref(updated_sds))); + EXPECT_CALL(primary_initialized, ready()); + updated_sds.initialize_callback_(); +} + TEST_F(ClusterManagerInitHelperTest, UpdateAlreadyInitialized) { InSequence s; @@ -3087,6 +3213,7 @@ TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) { init_helper_.addCluster(cluster1); NiceMock cluster2; + cluster2.info_->name_ = "cluster2"; ON_CALL(cluster2, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); init_helper_.addCluster(cluster2); @@ -3099,6 +3226,8 @@ TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) { init_helper_.startInitializingSecondaryClusters(); NiceMock cluster3; + cluster3.info_->name_ = "cluster3"; + ON_CALL(cluster3, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); EXPECT_CALL(cluster3, initialize(_)); init_helper_.addCluster(cluster3); diff --git a/test/integration/ads_integration.cc b/test/integration/ads_integration.cc index 7d81b1de0a1b4..6a0db9004244f 100644 --- a/test/integration/ads_integration.cc +++ b/test/integration/ads_integration.cc @@ -34,8 +34,9 @@ AdsIntegrationTest::AdsIntegrationTest(const envoy::config::core::v3::ApiVersion void AdsIntegrationTest::TearDown() { cleanUpXdsConnection(); } -envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name) { - return ConfigHelper::buildCluster(name, "ROUND_ROBIN", api_version_); +envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name, + const std::string& lb_policy) { + return ConfigHelper::buildCluster(name, lb_policy, api_version_); } envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildTlsCluster(const std::string& name) { diff --git a/test/integration/ads_integration.h b/test/integration/ads_integration.h index 0da99aea566a1..8c9a8e0ab3e6c 100644 --- a/test/integration/ads_integration.h +++ b/test/integration/ads_integration.h @@ -22,7 +22,8 @@ class AdsIntegrationTest : public Grpc::DeltaSotwIntegrationParamTest, public Ht void TearDown() override; - envoy::config::cluster::v3::Cluster buildCluster(const std::string& name); + envoy::config::cluster::v3::Cluster buildCluster(const std::string& name, + const std::string& lb_policy = "ROUND_ROBIN"); envoy::config::cluster::v3::Cluster buildTlsCluster(const std::string& name); diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index b49b217464bfd..053dfcbc9868a 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -1132,6 +1132,140 @@ class AdsClusterV3Test : public AdsIntegrationTest { INSTANTIATE_TEST_SUITE_P(IpVersionsClientTypeDelta, AdsClusterV3Test, DELTA_SOTW_GRPC_CLIENT_INTEGRATION_PARAMS); +TEST_P(AdsClusterV3Test, BasicClusterInitialWarming) { + initialize(); + const auto cds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + const auto eds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + + EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); + sendDiscoveryResponse( + cds_type_url, {buildCluster("cluster_0")}, {buildCluster("cluster_0")}, {}, "1", false); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0"}, {"cluster_0"}, {})); + sendDiscoveryResponse( + eds_type_url, {buildClusterLoadAssignment("cluster_0")}, + {buildClusterLoadAssignment("cluster_0")}, {}, "1", false); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); +} + +// Update the only warming cluster. Verify that the new cluster is still warming and the cluster +// manager as a whole is not initialized. +TEST_P(AdsClusterV3Test, ClusterInitializationUpdateTheOnlyWarmingCluster) { + initialize(); + const auto cds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + const auto eds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + + EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); + sendDiscoveryResponse( + cds_type_url, {buildCluster("cluster_0")}, {buildCluster("cluster_0")}, {}, "1", false); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + // Update lb policy to MAGLEV so that cluster update is not skipped due to the same hash. + sendDiscoveryResponse( + cds_type_url, {buildCluster("cluster_0", "MAGLEV")}, {buildCluster("cluster_0", "MAGLEV")}, + {}, "2", false); + EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0"}, {"cluster_0"}, {})); + sendDiscoveryResponse( + eds_type_url, {buildClusterLoadAssignment("cluster_0")}, + {buildClusterLoadAssignment("cluster_0")}, {}, "1", false); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); +} + +// Primary cluster is warming during cluster initialization. Update the cluster with immediate ready +// config and verify that all the clusters are initialized. +TEST_P(AdsClusterV3Test, TestPrimaryClusterWarmClusterInitialization) { + initialize(); + const auto cds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + auto loopback = Network::Test::getLoopbackAddressString(ipVersion()); + addFakeUpstream(FakeHttpConnection::Type::HTTP2); + auto port = fake_upstreams_.back()->localAddress()->ip()->port(); + + // This cluster will be blocked since endpoint name cannot be resolved. + auto warming_cluster = ConfigHelper::buildStaticCluster("fake_cluster", port, loopback); + // Below endpoint accepts request but never return. The health check hangs 1 hour which covers the + // test running. + auto blocking_health_check = TestUtility::parseYaml(R"EOF( + timeout: 3600s + interval: 3600s + unhealthy_threshold: 2 + healthy_threshold: 2 + tcp_health_check: + send: + text: '01' + receive: + - text: '02' + )EOF"); + *warming_cluster.add_health_checks() = blocking_health_check; + + // Active cluster has the same name with warming cluster but has no blocking health check. + auto active_cluster = ConfigHelper::buildStaticCluster("fake_cluster", port, loopback); + + EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); + sendDiscoveryResponse(cds_type_url, {warming_cluster}, + {warming_cluster}, {}, "1", false); + + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(fake_upstreams_.back()->waitForRawConnection(fake_upstream_connection)); + + // fake_cluster is in warming. + test_server_->waitForGaugeGe("cluster_manager.warming_clusters", 1); + + // Now replace the warming cluster by the config which will turn ready immediately. + sendDiscoveryResponse(cds_type_url, {active_cluster}, + {active_cluster}, {}, "2", false); + + // All clusters are ready. + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); +} + +// Two cluster warming, update one of them. Verify that the clusters are eventually initialized. +TEST_P(AdsClusterV3Test, ClusterInitializationUpdateOneOfThe2Warming) { + initialize(); + const auto cds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + const auto eds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + + EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); + sendDiscoveryResponse( + cds_type_url, + {ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"), + buildCluster("cluster_0"), buildCluster("cluster_1")}, + {ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"), + buildCluster("cluster_0"), buildCluster("cluster_1")}, + {}, "1", false); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2); + + // Update lb policy to MAGLEV so that cluster update is not skipped due to the same hash. + sendDiscoveryResponse( + cds_type_url, + {ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"), + buildCluster("cluster_0", "MAGLEV"), buildCluster("cluster_1")}, + {ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"), + buildCluster("cluster_0", "MAGLEV"), buildCluster("cluster_1")}, + {}, "2", false); + EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0", "cluster_1"}, + {"cluster_0", "cluster_1"}, {})); + sendDiscoveryResponse( + eds_type_url, + {buildClusterLoadAssignment("cluster_0"), buildClusterLoadAssignment("cluster_1")}, + {buildClusterLoadAssignment("cluster_0"), buildClusterLoadAssignment("cluster_1")}, {}, "1", + false); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 4); +} + // Verify CDS is paused during cluster warming. TEST_P(AdsClusterV3Test, CdsPausedDuringWarming) { initialize(); diff --git a/test/integration/base_integration_test.cc b/test/integration/base_integration_test.cc index 4a2623f43ae4c..73bf987fe77fd 100644 --- a/test/integration/base_integration_test.cc +++ b/test/integration/base_integration_test.cc @@ -415,6 +415,22 @@ AssertionResult BaseIntegrationTest::compareDiscoveryRequest( } } +AssertionResult compareSets(const std::set& set1, const std::set& set2, + absl::string_view name) { + if (set1 == set2) { + return AssertionSuccess(); + } + auto failure = AssertionFailure() << name << " field not as expected.\nExpected: {"; + for (const auto& x : set1) { + failure << x << ", "; + } + failure << "}\nActual: {"; + for (const auto& x : set2) { + failure << x << ", "; + } + return failure << "}"; +} + AssertionResult BaseIntegrationTest::compareSotwDiscoveryRequest( const std::string& expected_type_url, const std::string& expected_version, const std::vector& expected_resource_names, bool expect_node, @@ -441,12 +457,12 @@ AssertionResult BaseIntegrationTest::compareSotwDiscoveryRequest( } EXPECT_TRUE( IsSubstring("", "", expected_error_substring, discovery_request.error_detail().message())); - const std::vector resource_names(discovery_request.resource_names().cbegin(), - discovery_request.resource_names().cend()); - if (expected_resource_names != resource_names) { - return AssertionFailure() << fmt::format( - "resources {} do not match expected {} in {}", absl::StrJoin(resource_names, ","), - absl::StrJoin(expected_resource_names, ","), discovery_request.DebugString()); + const std::set resource_names_in_request(discovery_request.resource_names().cbegin(), + discovery_request.resource_names().cend()); + if (auto resource_name_result = compareSets( + std::set(expected_resource_names.cbegin(), expected_resource_names.cend()), + resource_names_in_request, "Sotw resource names")) { + return resource_name_result; } if (expected_version != discovery_request.version_info()) { return AssertionFailure() << fmt::format("version {} does not match expected {} in {}", @@ -456,22 +472,6 @@ AssertionResult BaseIntegrationTest::compareSotwDiscoveryRequest( return AssertionSuccess(); } -AssertionResult compareSets(const std::set& set1, const std::set& set2, - absl::string_view name) { - if (set1 == set2) { - return AssertionSuccess(); - } - auto failure = AssertionFailure() << name << " field not as expected.\nExpected: {"; - for (const auto& x : set1) { - failure << x << ", "; - } - failure << "}\nActual: {"; - for (const auto& x : set2) { - failure << x << ", "; - } - return failure << "}"; -} - AssertionResult BaseIntegrationTest::waitForPortAvailable(uint32_t port, std::chrono::milliseconds timeout) { Event::TestTimeSystem::RealTimeBound bound(timeout);