From c2fb5aa2aa7ab371ef57f186e48d18b44405412d Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Thu, 12 Nov 2020 08:51:08 +0000 Subject: [PATCH 1/5] clean up Signed-off-by: Yuchen Dai --- include/envoy/upstream/cluster_manager.h | 6 ++ source/common/upstream/cds_api_impl.cc | 6 +- source/common/upstream/cluster_manager_impl.h | 13 +++ test/common/upstream/cds_api_impl_test.cc | 23 +++-- test/integration/ads_integration_test.cc | 94 +++++++++++++++++-- test/mocks/upstream/cluster_manager.h | 2 + 6 files changed, 125 insertions(+), 19 deletions(-) diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index 5939092a371be..6ed383db21464 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -133,6 +133,12 @@ class ClusterManager { */ virtual ClusterInfoMap clusters() PURE; + /** + * @return absl::flat_hash_set all current clusters names including warming and + * active. + */ + virtual absl::flat_hash_set allClusterNames() PURE; + using ClusterSet = absl::flat_hash_set; /** diff --git a/source/common/upstream/cds_api_impl.cc b/source/common/upstream/cds_api_impl.cc index 0ca7d5763d5a9..f703721139425 100644 --- a/source/common/upstream/cds_api_impl.cc +++ b/source/common/upstream/cds_api_impl.cc @@ -38,13 +38,13 @@ CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config, void CdsApiImpl::onConfigUpdate(const std::vector& resources, const std::string& version_info) { - ClusterManager::ClusterInfoMap clusters_to_remove = cm_.clusters(); + auto clusters_to_remove = cm_.allClusterNames(); std::vector clusters; for (const auto& resource : resources) { clusters_to_remove.erase(resource.get().name()); } Protobuf::RepeatedPtrField to_remove_repeated; - for (const auto& [cluster_name, _] : clusters_to_remove) { + for (const auto& cluster_name : clusters_to_remove) { *to_remove_repeated.Add() = cluster_name; } onConfigUpdate(resources, to_remove_repeated, version_info); @@ -64,7 +64,7 @@ void CdsApiImpl::onConfigUpdate(const std::vector& a removed_resources.size()); std::vector exception_msgs; - absl::node_hash_set cluster_names; + absl::flat_hash_set cluster_names(added_resources.size()); bool any_applied = false; for (const auto& resource : added_resources) { envoy::config::cluster::v3::Cluster cluster; diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index ba3c55ba2af18..14a5436b160f6 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -223,6 +223,19 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable allClusterNames() override { + absl::flat_hash_set clusters_names(active_clusters_.size() + warming_clusters_.size()); + + for (const auto& [name, _ ] : active_clusters_) { + clusters_names.emplace(name); + } + for (const auto& [name, _ ] : warming_clusters_) { + clusters_names.emplace(name); + } + return clusters_names; + } + const ClusterSet& primaryClusters() override { return primary_clusters_; } ThreadLocalCluster* get(absl::string_view cluster) override; diff --git a/test/common/upstream/cds_api_impl_test.cc b/test/common/upstream/cds_api_impl_test.cc index f052749677fb3..03ebb91f02286 100644 --- a/test/common/upstream/cds_api_impl_test.cc +++ b/test/common/upstream/cds_api_impl_test.cc @@ -62,8 +62,11 @@ class CdsApiImplTest : public testing::Test { return map; } + absl::flat_hash_set makeAllClusterNames(const std::vector& clusters) { + return absl::flat_hash_set(clusters.begin(), clusters.end()); + } + NiceMock cm_; - Upstream::ClusterManager::ClusterInfoMap cluster_map_; Upstream::MockClusterMockPrioritySet mock_cluster_; Stats::IsolatedStoreImpl store_; CdsApiPtr cds_; @@ -92,7 +95,7 @@ version_info: '0' auto response1 = TestUtility::parseYaml(response1_yaml); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); + EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{})); expectAdd("cluster1", "0"); EXPECT_CALL(initialized_, ready()); EXPECT_EQ("", cds_->versionInfo()); @@ -108,7 +111,7 @@ version_info: '1' )EOF"; auto response2 = TestUtility::parseYaml(response2_yaml); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterMap({"cluster1"}))); + EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{"cluster1"})); EXPECT_CALL(cm_, removeCluster("cluster1")).WillOnce(Return(true)); const auto decoded_resources_2 = TestUtility::decodeResources(response2); @@ -126,7 +129,7 @@ TEST_F(CdsApiImplTest, ValidateDuplicateClusters) { cluster_1.set_name("duplicate_cluster"); const auto decoded_resources = TestUtility::decodeResources({cluster_1, cluster_1}); - EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_)); + EXPECT_CALL(cm_, allClusterNames()).WillRepeatedly(Return(absl::flat_hash_set{})); EXPECT_CALL(initialized_, ready()); EXPECT_THROW_WITH_MESSAGE(cds_callbacks_->onConfigUpdate(decoded_resources.refvec_, ""), EnvoyException, @@ -139,7 +142,7 @@ TEST_F(CdsApiImplTest, EmptyConfigUpdate) { setup(); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); + EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{})); EXPECT_CALL(initialized_, ready()); cds_callbacks_->onConfigUpdate({}, ""); @@ -151,7 +154,7 @@ TEST_F(CdsApiImplTest, ConfigUpdateWith2ValidClusters) { setup(); } - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); + EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{})); EXPECT_CALL(initialized_, ready()); envoy::config::cluster::v3::Cluster cluster_1; @@ -224,7 +227,7 @@ TEST_F(CdsApiImplTest, ConfigUpdateAddsSecondClusterEvenIfFirstThrows) { setup(); } - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); + EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{})); EXPECT_CALL(initialized_, ready()); envoy::config::cluster::v3::Cluster cluster_1; @@ -269,7 +272,7 @@ version_info: '0' auto response1 = TestUtility::parseYaml(response1_yaml); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); + EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{})); expectAdd("cluster1", "0"); expectAdd("cluster2", "0"); EXPECT_CALL(initialized_, ready()); @@ -298,7 +301,7 @@ version_info: '1' auto response2 = TestUtility::parseYaml(response2_yaml); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterMap({"cluster1", "cluster2"}))); + EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{"cluster1", "cluster2"})); expectAdd("cluster1", "1"); expectAdd("cluster3", "1"); EXPECT_CALL(cm_, removeCluster("cluster2")); @@ -334,7 +337,7 @@ version_info: '0' auto response1 = TestUtility::parseYaml(response1_yaml); - EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_)); + EXPECT_CALL(cm_, allClusterNames()).WillRepeatedly(Return(absl::flat_hash_set{})); EXPECT_CALL(initialized_, ready()); const auto decoded_resources = TestUtility::decodeResources(response1); diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index 3fc55beb56e20..decb98aa79663 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -553,8 +553,9 @@ TEST_P(AdsIntegrationTest, CdsPausedDuringWarming) { // Send the second warming cluster. sendDiscoveryResponse( - Config::TypeUrl::get().Cluster, {buildCluster("warming_cluster_2")}, - {buildCluster("warming_cluster_2")}, {}, "3"); + Config::TypeUrl::get().Cluster, + {buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, + {buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, {}, "3"); test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2); // We would've got a Cluster discovery request with version 2 here, had the CDS not been paused. @@ -586,6 +587,86 @@ TEST_P(AdsIntegrationTest, CdsPausedDuringWarming) { {"warming_cluster_2", "warming_cluster_1"}, {}, {})); } +TEST_P(AdsIntegrationTest, RemoveWarmingCluster) { + initialize(); + + // Send initial configuration, validate we can process a request. + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {}, {}, {}, true)); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {buildCluster("cluster_0")}, + {buildCluster("cluster_0")}, {}, "1"); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "", + {"cluster_0"}, {"cluster_0"}, {})); + + sendDiscoveryResponse( + Config::TypeUrl::get().ClusterLoadAssignment, {buildClusterLoadAssignment("cluster_0")}, + {buildClusterLoadAssignment("cluster_0")}, {}, "1"); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "1", {}, {}, {})); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "", {}, {}, {})); + sendDiscoveryResponse( + Config::TypeUrl::get().Listener, {buildListener("listener_0", "route_config_0")}, + {buildListener("listener_0", "route_config_0")}, {}, "1"); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", + {"cluster_0"}, {}, {})); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "", + {"route_config_0"}, {"route_config_0"}, {})); + sendDiscoveryResponse( + Config::TypeUrl::get().RouteConfiguration, {buildRouteConfig("route_config_0", "cluster_0")}, + {buildRouteConfig("route_config_0", "cluster_0")}, {}, "1"); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "1", {}, {}, {})); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "1", + {"route_config_0"}, {}, {})); + + test_server_->waitForCounterGe("listener_manager.listener_create_success", 1); + makeSingleRequest(); + + // Send the first warming cluster. + sendDiscoveryResponse( + Config::TypeUrl::get().Cluster, {buildCluster("warming_cluster_1")}, + {buildCluster("warming_cluster_1")}, {"cluster_0"}, "2"); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", + {"warming_cluster_1"}, {"warming_cluster_1"}, {"cluster_0"})); + + // Send the second warming cluster and remove the first cluster. + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {buildCluster("warming_cluster_2")}, + {buildCluster("warming_cluster_2")}, + // Delta: remove warming_cluster_1. + {"warming_cluster_1"}, "3"); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + // We would've got a Cluster discovery request with version 2 here, had the CDS not been paused. + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", + {"warming_cluster_2"}, {"warming_cluster_2"}, {"warming_cluster_1"})); + + // Finish warming the clusters. Note that the first warming cluster is not included in the + // response. + sendDiscoveryResponse( + Config::TypeUrl::get().ClusterLoadAssignment, + {buildClusterLoadAssignment("warming_cluster_2")}, + {buildClusterLoadAssignment("warming_cluster_2")}, {"cluster_0"}, "2"); + + // Validate that all clusters are warmed. + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3); + + // CDS is resumed and EDS response was acknowledged. + if (sotw_or_delta_ == Grpc::SotwOrDelta::Delta) { + // Envoy will ACK both Cluster messages. Since they arrived while CDS was paused, they aren't + // sent until CDS is unpaused. Since version 3 has already arrived by the time the version 2 + // ACK goes out, they're both acknowledging version 3. + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "3", {}, {}, {})); + } + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "3", {}, {}, {})); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "2", + {"warming_cluster_2"}, {}, {})); +} // Validate that warming listeners are removed when left out of SOTW update. TEST_P(AdsIntegrationTest, RemoveWarmingListener) { initialize(); @@ -696,8 +777,9 @@ TEST_P(AdsIntegrationTest, ClusterWarmingOnNamedResponse) { // Send the second warming cluster. sendDiscoveryResponse( - Config::TypeUrl::get().Cluster, {buildCluster("warming_cluster_2")}, - {buildCluster("warming_cluster_2")}, {}, "3"); + Config::TypeUrl::get().Cluster, + {buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, + {buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, {}, "3"); test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2); EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", @@ -1359,8 +1441,8 @@ TEST_P(AdsClusterV2Test, CdsPausedDuringWarming) { // Send the second warming cluster. sendDiscoveryResponse( - cds_type_url, {buildCluster("warming_cluster_2")}, {buildCluster("warming_cluster_2")}, {}, - "3", true); + cds_type_url, {buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, + {buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, {}, "3", true); test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2); // We would've got a Cluster discovery request with version 2 here, had the CDS not been paused. diff --git a/test/mocks/upstream/cluster_manager.h b/test/mocks/upstream/cluster_manager.h index c24b1b045acda..433875f2e3967 100644 --- a/test/mocks/upstream/cluster_manager.h +++ b/test/mocks/upstream/cluster_manager.h @@ -43,6 +43,8 @@ class MockClusterManager : public ClusterManager { MOCK_METHOD(void, initializeSecondaryClusters, (const envoy::config::bootstrap::v3::Bootstrap& bootstrap)); MOCK_METHOD(ClusterInfoMap, clusters, ()); + MOCK_METHOD(absl::flat_hash_set, allClusterNames, ()); + MOCK_METHOD(const ClusterSet&, primaryClusters, ()); MOCK_METHOD(ThreadLocalCluster*, get, (absl::string_view cluster)); MOCK_METHOD(Http::ConnectionPool::Instance*, httpConnPoolForCluster, From 19dc6583536f274cea27d3b89fef045e570deb65 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Thu, 12 Nov 2020 16:53:22 +0000 Subject: [PATCH 2/5] fix format Signed-off-by: Yuchen Dai --- source/common/upstream/cluster_manager_impl.h | 11 ++++++----- test/common/upstream/cds_api_impl_test.cc | 6 ++++-- test/integration/ads_integration_test.cc | 3 ++- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 14a5436b160f6..3eba52f7edc95 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -225,17 +225,18 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable allClusterNames() override { - absl::flat_hash_set clusters_names(active_clusters_.size() + warming_clusters_.size()); - - for (const auto& [name, _ ] : active_clusters_) { + absl::flat_hash_set clusters_names(active_clusters_.size() + + warming_clusters_.size()); + + for (const auto& [name, _] : active_clusters_) { clusters_names.emplace(name); } - for (const auto& [name, _ ] : warming_clusters_) { + for (const auto& [name, _] : warming_clusters_) { clusters_names.emplace(name); } return clusters_names; } - + const ClusterSet& primaryClusters() override { return primary_clusters_; } ThreadLocalCluster* get(absl::string_view cluster) override; diff --git a/test/common/upstream/cds_api_impl_test.cc b/test/common/upstream/cds_api_impl_test.cc index 03ebb91f02286..31478c27c27a0 100644 --- a/test/common/upstream/cds_api_impl_test.cc +++ b/test/common/upstream/cds_api_impl_test.cc @@ -111,7 +111,8 @@ version_info: '1' )EOF"; auto response2 = TestUtility::parseYaml(response2_yaml); - EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{"cluster1"})); + EXPECT_CALL(cm_, allClusterNames()) + .WillOnce(Return(absl::flat_hash_set{"cluster1"})); EXPECT_CALL(cm_, removeCluster("cluster1")).WillOnce(Return(true)); const auto decoded_resources_2 = TestUtility::decodeResources(response2); @@ -301,7 +302,8 @@ version_info: '1' auto response2 = TestUtility::parseYaml(response2_yaml); - EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{"cluster1", "cluster2"})); + EXPECT_CALL(cm_, allClusterNames()) + .WillOnce(Return(absl::flat_hash_set{"cluster1", "cluster2"})); expectAdd("cluster1", "1"); expectAdd("cluster3", "1"); EXPECT_CALL(cm_, removeCluster("cluster2")); diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index decb98aa79663..a9d96e44a8a2f 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -643,7 +643,8 @@ TEST_P(AdsIntegrationTest, RemoveWarmingCluster) { // We would've got a Cluster discovery request with version 2 here, had the CDS not been paused. EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1", - {"warming_cluster_2"}, {"warming_cluster_2"}, {"warming_cluster_1"})); + {"warming_cluster_2"}, {"warming_cluster_2"}, + {"warming_cluster_1"})); // Finish warming the clusters. Note that the first warming cluster is not included in the // response. From 1af0af7309510a9bfed8b803a49f955d9e18c673 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Fri, 13 Nov 2020 00:25:20 +0000 Subject: [PATCH 3/5] make cluster() return both active and warm Signed-off-by: Yuchen Dai --- include/envoy/upstream/cluster_manager.h | 15 +++---- .../common/grpc/async_client_manager_impl.cc | 6 +-- source/common/upstream/cds_api_impl.cc | 13 ++++++- source/common/upstream/cluster_manager_impl.h | 23 +++-------- source/common/upstream/load_stats_reporter.cc | 17 ++++---- .../redis/cluster_refresh_manager_impl.cc | 6 +-- .../extensions/stat_sinks/hystrix/hystrix.cc | 8 ++-- source/server/admin/clusters_handler.cc | 6 ++- source/server/admin/config_dump_handler.cc | 13 +++++-- .../grpc/async_client_manager_impl_test.cc | 9 +++-- test/common/upstream/cds_api_impl_test.cc | 39 +++++++++---------- .../upstream/cluster_manager_impl_test.cc | 6 +-- .../upstream/load_stats_reporter_test.cc | 3 +- .../redis/cluster_refresh_manager_test.cc | 6 +-- .../stats_sinks/hystrix/hystrix_test.cc | 18 ++++----- .../custom_cluster_integration_test.cc | 8 ++-- test/integration/eds_integration_test.cc | 8 ++-- test/mocks/upstream/cluster_manager.h | 3 +- test/server/admin/clusters_handler_test.cc | 6 +-- test/server/admin/config_dump_handler_test.cc | 20 +++++----- test/server/configuration_impl_test.cc | 4 +- 21 files changed, 122 insertions(+), 115 deletions(-) diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index 6ed383db21464..beb88299da28f 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -126,18 +126,15 @@ class ClusterManager { initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) PURE; using ClusterInfoMap = absl::node_hash_map>; + struct ClusterInfoMaps { + ClusterInfoMap active_clusters_; + ClusterInfoMap warming_clusters_; + }; /** - * @return ClusterInfoMap all current clusters. These are the primary (not thread local) - * clusters which should only be used for stats/admin. - */ - virtual ClusterInfoMap clusters() PURE; - - /** - * @return absl::flat_hash_set all current clusters names including warming and - * active. + * @return ClusterInfoMap all current clusters including active and warming. */ - virtual absl::flat_hash_set allClusterNames() PURE; + virtual ClusterInfoMaps clusters() PURE; using ClusterSet = absl::flat_hash_set; diff --git a/source/common/grpc/async_client_manager_impl.cc b/source/common/grpc/async_client_manager_impl.cc index dc039473395cc..5f809755f89d1 100644 --- a/source/common/grpc/async_client_manager_impl.cc +++ b/source/common/grpc/async_client_manager_impl.cc @@ -21,9 +21,9 @@ AsyncClientFactoryImpl::AsyncClientFactoryImpl(Upstream::ClusterManager& cm, } const std::string& cluster_name = config.envoy_grpc().cluster_name(); - auto clusters = cm_.clusters(); - const auto& it = clusters.find(cluster_name); - if (it == clusters.end()) { + auto all_clusters = cm_.clusters(); + const auto& it = all_clusters.active_clusters_.find(cluster_name); + if (it == all_clusters.active_clusters_.end()) { throw EnvoyException(fmt::format("Unknown gRPC client cluster '{}'", cluster_name)); } if (it->second.get().info()->addedViaApi()) { diff --git a/source/common/upstream/cds_api_impl.cc b/source/common/upstream/cds_api_impl.cc index f703721139425..c0fb6fdb93a6a 100644 --- a/source/common/upstream/cds_api_impl.cc +++ b/source/common/upstream/cds_api_impl.cc @@ -38,7 +38,18 @@ CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config, void CdsApiImpl::onConfigUpdate(const std::vector& resources, const std::string& version_info) { - auto clusters_to_remove = cm_.allClusterNames(); + auto all_existing_clusters = cm_.clusters(); + absl::flat_hash_set clusters_to_remove( + all_existing_clusters.active_clusters_.size() + + all_existing_clusters.warming_clusters_.size()); + + for (const auto& [name, _] : all_existing_clusters.active_clusters_) { + clusters_to_remove.emplace(name); + } + for (const auto& [name, _] : all_existing_clusters.warming_clusters_) { + clusters_to_remove.emplace(name); + } + std::vector clusters; for (const auto& resource : resources) { clusters_to_remove.erase(resource.get().name()); diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 3eba52f7edc95..813ce1c04f9d8 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -214,27 +214,16 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggablecluster_); + clusters_maps.active_clusters_.emplace(cluster.first, *cluster.second->cluster_); } - - return clusters_map; - } - - absl::flat_hash_set allClusterNames() override { - absl::flat_hash_set clusters_names(active_clusters_.size() + - warming_clusters_.size()); - - for (const auto& [name, _] : active_clusters_) { - clusters_names.emplace(name); - } - for (const auto& [name, _] : warming_clusters_) { - clusters_names.emplace(name); + for (auto& cluster : warming_clusters_) { + clusters_maps.warming_clusters_.emplace(cluster.first, *cluster.second->cluster_); } - return clusters_names; + return clusters_maps; } const ClusterSet& primaryClusters() override { return primary_clusters_; } diff --git a/source/common/upstream/load_stats_reporter.cc b/source/common/upstream/load_stats_reporter.cc index fa5697e86fbd3..b7ff8e94b7c34 100644 --- a/source/common/upstream/load_stats_reporter.cc +++ b/source/common/upstream/load_stats_reporter.cc @@ -62,11 +62,11 @@ void LoadStatsReporter::sendLoadStatsRequest() { // added to the cluster manager. When we get the notification, we record the current time in // clusters_ as the start time for the load reporting window for that cluster. request_.mutable_cluster_stats()->Clear(); + auto all_clusters = cm_.clusters(); for (const auto& cluster_name_and_timestamp : clusters_) { const std::string& cluster_name = cluster_name_and_timestamp.first; - auto cluster_info_map = cm_.clusters(); - auto it = cluster_info_map.find(cluster_name); - if (it == cluster_info_map.end()) { + auto it = all_clusters.active_clusters_.find(cluster_name); + if (it == all_clusters.active_clusters_.end()) { ENVOY_LOG(debug, "Cluster {} does not exist", cluster_name); continue; } @@ -154,7 +154,8 @@ void LoadStatsReporter::startLoadReportPeriod() { // converge. absl::node_hash_map existing_clusters; if (message_->send_all_clusters()) { - for (const auto& p : cm_.clusters()) { + auto cluster_info_map = cm_.clusters(); + for (const auto& p : cluster_info_map.active_clusters_) { const std::string& cluster_name = p.first; if (clusters_.count(cluster_name) > 0) { existing_clusters.emplace(cluster_name, clusters_[cluster_name]); @@ -173,9 +174,10 @@ void LoadStatsReporter::startLoadReportPeriod() { clusters_.emplace(cluster_name, existing_clusters.count(cluster_name) > 0 ? existing_clusters[cluster_name] : time_source_.monotonicTime().time_since_epoch()); + // TODO(lambdai): Move the clusters() call out of this lambda. auto cluster_info_map = cm_.clusters(); - auto it = cluster_info_map.find(cluster_name); - if (it == cluster_info_map.end()) { + auto it = cluster_info_map.active_clusters_.find(cluster_name); + if (it == cluster_info_map.active_clusters_.end()) { return; } // Don't reset stats for existing tracked clusters. @@ -193,7 +195,8 @@ void LoadStatsReporter::startLoadReportPeriod() { cluster.info()->loadReportStats().upstream_rq_dropped_.latch(); }; if (message_->send_all_clusters()) { - for (const auto& p : cm_.clusters()) { + auto cluster_info_map = cm_.clusters(); + for (const auto& p : cluster_info_map.active_clusters_) { const std::string& cluster_name = p.first; handle_cluster_func(cluster_name); } diff --git a/source/extensions/common/redis/cluster_refresh_manager_impl.cc b/source/extensions/common/redis/cluster_refresh_manager_impl.cc index 8da52d96665ca..c3caa96d9eec3 100644 --- a/source/extensions/common/redis/cluster_refresh_manager_impl.cc +++ b/source/extensions/common/redis/cluster_refresh_manager_impl.cc @@ -133,9 +133,9 @@ bool ClusterRefreshManagerImpl::onEvent(const std::string& cluster_name, EventTy if (post_callback) { main_thread_dispatcher_.post([this, cluster_name, info]() { // Ensure that cluster is still active before calling callback. - auto map = cm_.clusters(); - auto it = map.find(cluster_name); - if (it != map.end()) { + auto maps = cm_.clusters(); + auto it = maps.active_clusters_.find(cluster_name); + if (it != maps.active_clusters_.end()) { info->cb_(); } }); diff --git a/source/extensions/stat_sinks/hystrix/hystrix.cc b/source/extensions/stat_sinks/hystrix/hystrix.cc index a35f67a8d3f77..6f4a1808accf2 100644 --- a/source/extensions/stat_sinks/hystrix/hystrix.cc +++ b/source/extensions/stat_sinks/hystrix/hystrix.cc @@ -339,7 +339,7 @@ void HystrixSink::flush(Stats::MetricSnapshot& snapshot) { } incCounter(); std::stringstream ss; - Upstream::ClusterManager::ClusterInfoMap clusters = server_.clusterManager().clusters(); + Upstream::ClusterManager::ClusterInfoMaps all_clusters = server_.clusterManager().clusters(); // Save a map of the relevant histograms per cluster in a convenient format. absl::node_hash_map time_histograms; @@ -370,7 +370,7 @@ void HystrixSink::flush(Stats::MetricSnapshot& snapshot) { } } - for (auto& cluster : clusters) { + for (auto& cluster : all_clusters.active_clusters_) { Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.second.get().info(); std::unique_ptr& cluster_stats_cache_ptr = @@ -407,9 +407,9 @@ void HystrixSink::flush(Stats::MetricSnapshot& snapshot) { } // check if any clusters were removed, and remove from cache - if (clusters.size() < cluster_stats_cache_map_.size()) { + if (all_clusters.active_clusters_.size() < cluster_stats_cache_map_.size()) { for (auto it = cluster_stats_cache_map_.begin(); it != cluster_stats_cache_map_.end();) { - if (clusters.find(it->first) == clusters.end()) { + if (all_clusters.active_clusters_.find(it->first) == all_clusters.active_clusters_.end()) { auto next_it = std::next(it); cluster_stats_cache_map_.erase(it); it = next_it; diff --git a/source/server/admin/clusters_handler.cc b/source/server/admin/clusters_handler.cc index e0e17350b7c91..3f4724627118f 100644 --- a/source/server/admin/clusters_handler.cc +++ b/source/server/admin/clusters_handler.cc @@ -100,7 +100,8 @@ void setHealthFlag(Upstream::Host::HealthFlag flag, const Upstream::Host& host, // TODO(efimki): Add support of text readouts stats. void ClustersHandler::writeClustersAsJson(Buffer::Instance& response) { envoy::admin::v3::Clusters clusters; - for (const auto& [name, cluster_ref] : server_.clusterManager().clusters()) { + auto all_clusters = server_.clusterManager().clusters(); + for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) { const Upstream::Cluster& cluster = cluster_ref.get(); Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.info(); @@ -184,7 +185,8 @@ void ClustersHandler::writeClustersAsJson(Buffer::Instance& response) { // TODO(efimki): Add support of text readouts stats. void ClustersHandler::writeClustersAsText(Buffer::Instance& response) { - for (const auto& [name, cluster_ref] : server_.clusterManager().clusters()) { + auto all_clusters = server_.clusterManager().clusters(); + for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) { const Upstream::Cluster& cluster = cluster_ref.get(); const std::string& cluster_name = cluster.info()->name(); addOutlierInfo(cluster_name, cluster.outlierDetector(), response); diff --git a/source/server/admin/config_dump_handler.cc b/source/server/admin/config_dump_handler.cc index dbfd13a01e2ec..e4ca4772ce416 100644 --- a/source/server/admin/config_dump_handler.cc +++ b/source/server/admin/config_dump_handler.cc @@ -149,7 +149,9 @@ ConfigDumpHandler::addResourceToDump(envoy::admin::v3::ConfigDump& dump, const std::string& resource, bool include_eds) const { Envoy::Server::ConfigTracker::CbsMap callbacks_map = config_tracker_.getCallbacksMap(); if (include_eds) { - if (!server_.clusterManager().clusters().empty()) { + // TODO(mattklein123): Add ability to see warming clusters in admin output. + auto all_clusters = server_.clusterManager().clusters(); + if (!all_clusters.active_clusters_.empty()) { callbacks_map.emplace("endpoint", [this] { return dumpEndpointConfigs(); }); } } @@ -194,7 +196,9 @@ void ConfigDumpHandler::addAllConfigToDump(envoy::admin::v3::ConfigDump& dump, bool include_eds) const { Envoy::Server::ConfigTracker::CbsMap callbacks_map = config_tracker_.getCallbacksMap(); if (include_eds) { - if (!server_.clusterManager().clusters().empty()) { + // TODO(mattklein123): Add ability to see warming clusters in admin output. + auto all_clusters = server_.clusterManager().clusters(); + if (!all_clusters.active_clusters_.empty()) { callbacks_map.emplace("endpoint", [this] { return dumpEndpointConfigs(); }); } } @@ -218,8 +222,9 @@ void ConfigDumpHandler::addAllConfigToDump(envoy::admin::v3::ConfigDump& dump, ProtobufTypes::MessagePtr ConfigDumpHandler::dumpEndpointConfigs() const { auto endpoint_config_dump = std::make_unique(); - - for (const auto& [name, cluster_ref] : server_.clusterManager().clusters()) { + // TODO(mattklein123): Add ability to see warming clusters in admin output. + auto all_clusters = server_.clusterManager().clusters(); + for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) { const Upstream::Cluster& cluster = cluster_ref.get(); Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.info(); envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; diff --git a/test/common/grpc/async_client_manager_impl_test.cc b/test/common/grpc/async_client_manager_impl_test.cc index 55d5c14e2fb11..448f52aa6140b 100644 --- a/test/common/grpc/async_client_manager_impl_test.cc +++ b/test/common/grpc/async_client_manager_impl_test.cc @@ -38,10 +38,10 @@ TEST_F(AsyncClientManagerImplTest, EnvoyGrpcOk) { envoy::config::core::v3::GrpcService grpc_service; grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); - Upstream::ClusterManager::ClusterInfoMap cluster_map; + Upstream::ClusterManager::ClusterInfoMaps cluster_maps; Upstream::MockClusterMockPrioritySet cluster; - cluster_map.emplace("foo", cluster); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_map)); + cluster_maps.active_clusters_.emplace("foo", cluster); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_maps)); EXPECT_CALL(cluster, info()); EXPECT_CALL(*cluster.info_, addedViaApi()); @@ -65,7 +65,8 @@ TEST_F(AsyncClientManagerImplTest, EnvoyGrpcDynamicCluster) { Upstream::ClusterManager::ClusterInfoMap cluster_map; Upstream::MockClusterMockPrioritySet cluster; cluster_map.emplace("foo", cluster); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_map)); + EXPECT_CALL(cm_, clusters()) + .WillOnce(Return(Upstream::ClusterManager::ClusterInfoMaps{cluster_map, {}})); EXPECT_CALL(cluster, info()); EXPECT_CALL(*cluster.info_, addedViaApi()).WillOnce(Return(true)); EXPECT_THROW_WITH_MESSAGE( diff --git a/test/common/upstream/cds_api_impl_test.cc b/test/common/upstream/cds_api_impl_test.cc index 31478c27c27a0..a914225dd9d04 100644 --- a/test/common/upstream/cds_api_impl_test.cc +++ b/test/common/upstream/cds_api_impl_test.cc @@ -54,16 +54,17 @@ class CdsApiImplTest : public testing::Test { .WillOnce(Throw(EnvoyException(exception_msg))); } - ClusterManager::ClusterInfoMap makeClusterMap(const std::vector& clusters) { - ClusterManager::ClusterInfoMap map; - for (const auto& cluster : clusters) { - map.emplace(cluster, cm_.thread_local_cluster_.cluster_); + ClusterManager::ClusterInfoMaps + makeClusterInfoMaps(const std::vector& active_clusters, + const std::vector& warming_clusters = {}) { + ClusterManager::ClusterInfoMaps maps; + for (const auto& cluster : active_clusters) { + maps.active_clusters_.emplace(cluster, cm_.thread_local_cluster_.cluster_); } - return map; - } - - absl::flat_hash_set makeAllClusterNames(const std::vector& clusters) { - return absl::flat_hash_set(clusters.begin(), clusters.end()); + for (const auto& cluster : warming_clusters) { + maps.warming_clusters_.emplace(cluster, cm_.thread_local_cluster_.cluster_); + } + return maps; } NiceMock cm_; @@ -95,7 +96,7 @@ version_info: '0' auto response1 = TestUtility::parseYaml(response1_yaml); - EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{})); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({}))); expectAdd("cluster1", "0"); EXPECT_CALL(initialized_, ready()); EXPECT_EQ("", cds_->versionInfo()); @@ -111,8 +112,7 @@ version_info: '1' )EOF"; auto response2 = TestUtility::parseYaml(response2_yaml); - EXPECT_CALL(cm_, allClusterNames()) - .WillOnce(Return(absl::flat_hash_set{"cluster1"})); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({"cluster1"}))); EXPECT_CALL(cm_, removeCluster("cluster1")).WillOnce(Return(true)); const auto decoded_resources_2 = TestUtility::decodeResources(response2); @@ -130,7 +130,7 @@ TEST_F(CdsApiImplTest, ValidateDuplicateClusters) { cluster_1.set_name("duplicate_cluster"); const auto decoded_resources = TestUtility::decodeResources({cluster_1, cluster_1}); - EXPECT_CALL(cm_, allClusterNames()).WillRepeatedly(Return(absl::flat_hash_set{})); + EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(makeClusterInfoMaps({}))); EXPECT_CALL(initialized_, ready()); EXPECT_THROW_WITH_MESSAGE(cds_callbacks_->onConfigUpdate(decoded_resources.refvec_, ""), EnvoyException, @@ -143,7 +143,7 @@ TEST_F(CdsApiImplTest, EmptyConfigUpdate) { setup(); - EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{})); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({}))); EXPECT_CALL(initialized_, ready()); cds_callbacks_->onConfigUpdate({}, ""); @@ -155,7 +155,7 @@ TEST_F(CdsApiImplTest, ConfigUpdateWith2ValidClusters) { setup(); } - EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{})); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({}))); EXPECT_CALL(initialized_, ready()); envoy::config::cluster::v3::Cluster cluster_1; @@ -228,7 +228,7 @@ TEST_F(CdsApiImplTest, ConfigUpdateAddsSecondClusterEvenIfFirstThrows) { setup(); } - EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{})); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({}))); EXPECT_CALL(initialized_, ready()); envoy::config::cluster::v3::Cluster cluster_1; @@ -273,7 +273,7 @@ version_info: '0' auto response1 = TestUtility::parseYaml(response1_yaml); - EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set{})); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({}))); expectAdd("cluster1", "0"); expectAdd("cluster2", "0"); EXPECT_CALL(initialized_, ready()); @@ -302,8 +302,7 @@ version_info: '1' auto response2 = TestUtility::parseYaml(response2_yaml); - EXPECT_CALL(cm_, allClusterNames()) - .WillOnce(Return(absl::flat_hash_set{"cluster1", "cluster2"})); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({"cluster1", "cluster2"}))); expectAdd("cluster1", "1"); expectAdd("cluster3", "1"); EXPECT_CALL(cm_, removeCluster("cluster2")); @@ -339,7 +338,7 @@ version_info: '0' auto response1 = TestUtility::parseYaml(response1_yaml); - EXPECT_CALL(cm_, allClusterNames()).WillRepeatedly(Return(absl::flat_hash_set{})); + EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(makeClusterInfoMaps({}))); EXPECT_CALL(initialized_, ready()); const auto decoded_resources = TestUtility::decodeResources(response1); diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 2127623a94004..5d34e3cbea353 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -361,7 +361,7 @@ TEST_F(ClusterManagerImplTest, ValidClusterName) { create(parseBootstrapFromV3Yaml(yaml)); cluster_manager_->clusters() - .find("cluster:name") + .active_clusters_.find("cluster:name") ->second.get() .info() ->statsScope() @@ -1490,7 +1490,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(update_cluster, "")); EXPECT_EQ(cluster2->info_, cluster_manager_->get("fake_cluster")->info()); - EXPECT_EQ(1UL, cluster_manager_->clusters().size()); + EXPECT_EQ(1UL, cluster_manager_->clusters().active_clusters_.size()); Http::ConnectionPool::MockInstance* cp = new Http::ConnectionPool::MockInstance(); EXPECT_CALL(factory_, allocateConnPool_(_, _, _)).WillOnce(Return(cp)); EXPECT_EQ(cp, cluster_manager_->httpConnPoolForCluster("fake_cluster", ResourcePriority::Default, @@ -1520,7 +1520,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { EXPECT_CALL(*cp2, addDrainedCallback(_)).WillOnce(SaveArg<0>(&drained_cb2)); EXPECT_TRUE(cluster_manager_->removeCluster("fake_cluster")); EXPECT_EQ(nullptr, cluster_manager_->get("fake_cluster")); - EXPECT_EQ(0UL, cluster_manager_->clusters().size()); + EXPECT_EQ(0UL, cluster_manager_->clusters().active_clusters_.size()); // Close the TCP connection. Success is no ASSERT or crash due to referencing // the removed cluster. diff --git a/test/common/upstream/load_stats_reporter_test.cc b/test/common/upstream/load_stats_reporter_test.cc index 111e7356a064b..46105592c00a3 100644 --- a/test/common/upstream/load_stats_reporter_test.cc +++ b/test/common/upstream/load_stats_reporter_test.cc @@ -125,7 +125,8 @@ TEST_F(LoadStatsReporterTest, ExistingClusters) { foo_cluster.info_->load_report_stats_.upstream_rq_dropped_.add(2); foo_cluster.info_->eds_service_name_ = "bar"; NiceMock bar_cluster; - MockClusterManager::ClusterInfoMap cluster_info{{"foo", foo_cluster}, {"bar", bar_cluster}}; + MockClusterManager::ClusterInfoMaps cluster_info{{{"foo", foo_cluster}, {"bar", bar_cluster}}, + {}}; ON_CALL(cm_, clusters()).WillByDefault(Return(cluster_info)); deliverLoadStatsResponse({"foo"}); // Initial stats report for foo on timer tick. diff --git a/test/extensions/common/redis/cluster_refresh_manager_test.cc b/test/extensions/common/redis/cluster_refresh_manager_test.cc index 916a1457da312..ea07e16243847 100644 --- a/test/extensions/common/redis/cluster_refresh_manager_test.cc +++ b/test/extensions/common/redis/cluster_refresh_manager_test.cc @@ -33,8 +33,8 @@ class ClusterRefreshManagerTest : public testing::Test { : cluster_name_("fake_cluster"), refresh_manager_(std::make_shared( dispatcher_, cm_, time_system_)) { time_system_.setMonotonicTime(std::chrono::seconds(1)); - map_.emplace("fake_cluster", mock_cluster_); - ON_CALL(cm_, clusters()).WillByDefault(Return(map_)); + cluster_maps_.active_clusters_.emplace("fake_cluster", mock_cluster_); + ON_CALL(cm_, clusters()).WillByDefault(Return(cluster_maps_)); } ~ClusterRefreshManagerTest() override = default; @@ -104,7 +104,7 @@ class ClusterRefreshManagerTest : public testing::Test { const std::string cluster_name_; NiceMock dispatcher_; NiceMock cm_; - Upstream::ClusterManager::ClusterInfoMap map_; + Upstream::ClusterManager::ClusterInfoMaps cluster_maps_; Upstream::MockClusterMockPrioritySet mock_cluster_; Event::SimulatedTimeSystem time_system_; std::shared_ptr refresh_manager_; diff --git a/test/extensions/stats_sinks/hystrix/hystrix_test.cc b/test/extensions/stats_sinks/hystrix/hystrix_test.cc index 29e7c79d02da5..38f88019602cf 100644 --- a/test/extensions/stats_sinks/hystrix/hystrix_test.cc +++ b/test/extensions/stats_sinks/hystrix/hystrix_test.cc @@ -128,9 +128,9 @@ class HystrixSinkTest : public testing::Test { void createClusterAndCallbacks() { // Set cluster. - cluster_map_.emplace(cluster1_name_, cluster1_.cluster_); + cluster_maps_.active_clusters_.emplace(cluster1_name_, cluster1_.cluster_); ON_CALL(server_, clusterManager()).WillByDefault(ReturnRef(cluster_manager_)); - ON_CALL(cluster_manager_, clusters()).WillByDefault(Return(cluster_map_)); + ON_CALL(cluster_manager_, clusters()).WillByDefault(Return(cluster_maps_)); ON_CALL(callbacks_, encodeData(_, _)).WillByDefault(Invoke([&](Buffer::Instance& data, bool) { // Set callbacks to send data to buffer. This will append to the end of the buffer, so @@ -141,15 +141,15 @@ class HystrixSinkTest : public testing::Test { void addClusterToMap(const std::string& cluster_name, NiceMock& cluster) { - cluster_map_.emplace(cluster_name, cluster); - // Redefining since cluster_map_ is returned by value. - ON_CALL(cluster_manager_, clusters()).WillByDefault(Return(cluster_map_)); + cluster_maps_.active_clusters_.emplace(cluster_name, cluster); + // Redefining since cluster_maps_ is returned by value. + ON_CALL(cluster_manager_, clusters()).WillByDefault(Return(cluster_maps_)); } void removeClusterFromMap(const std::string& cluster_name) { - cluster_map_.erase(cluster_name); - // Redefining since cluster_map_ is returned by value. - ON_CALL(cluster_manager_, clusters()).WillByDefault(Return(cluster_map_)); + cluster_maps_.active_clusters_.erase(cluster_name); + // Redefining since cluster_maps_ is returned by value. + ON_CALL(cluster_manager_, clusters()).WillByDefault(Return(cluster_maps_)); } void addSecondClusterHelper(Buffer::OwnedImpl& buffer) { @@ -245,7 +245,7 @@ class HystrixSinkTest : public testing::Test { NiceMock callbacks_; NiceMock server_; - Upstream::ClusterManager::ClusterInfoMap cluster_map_; + Upstream::ClusterManager::ClusterInfoMaps cluster_maps_; Buffer::OwnedImpl cluster_stats_buffer_; std::unique_ptr sink_; diff --git a/test/integration/custom_cluster_integration_test.cc b/test/integration/custom_cluster_integration_test.cc index 62d5cac5ab9ff..84c936e006c56 100644 --- a/test/integration/custom_cluster_integration_test.cc +++ b/test/integration/custom_cluster_integration_test.cc @@ -69,10 +69,10 @@ TEST_P(CustomClusterIntegrationTest, TestCustomConfig) { initialize(); // Verify the cluster is correctly setup with the custom priority - const auto& cluster_map = test_server_->server().clusterManager().clusters(); - EXPECT_EQ(1, cluster_map.size()); - EXPECT_EQ(1, cluster_map.count("cluster_0")); - const auto& cluster_ref = cluster_map.find("cluster_0")->second; + const auto& cluster_maps = test_server_->server().clusterManager().clusters(); + EXPECT_EQ(1, cluster_maps.active_clusters_.size()); + EXPECT_EQ(1, cluster_maps.active_clusters_.count("cluster_0")); + const auto& cluster_ref = cluster_maps.active_clusters_.find("cluster_0")->second; const auto& hostset_per_priority = cluster_ref.get().prioritySet().hostSetsPerPriority(); EXPECT_EQ(11, hostset_per_priority.size()); const Envoy::Upstream::HostSetPtr& host_set = hostset_per_priority[10]; diff --git a/test/integration/eds_integration_test.cc b/test/integration/eds_integration_test.cc index 3e1c237e7c597..b8ca3c6f07c2a 100644 --- a/test/integration/eds_integration_test.cc +++ b/test/integration/eds_integration_test.cc @@ -316,9 +316,9 @@ TEST_P(EdsIntegrationTest, OverprovisioningFactorUpdate) { setEndpoints(4, 4, 0); auto get_and_compare = [this](const uint32_t expected_factor) { const auto& cluster_map = test_server_->server().clusterManager().clusters(); - EXPECT_EQ(1, cluster_map.size()); - EXPECT_EQ(1, cluster_map.count("cluster_0")); - const auto& cluster_ref = cluster_map.find("cluster_0")->second; + EXPECT_EQ(1, cluster_map.active_clusters_.size()); + EXPECT_EQ(1, cluster_map.active_clusters_.count("cluster_0")); + const auto& cluster_ref = cluster_map.active_clusters_.find("cluster_0")->second; const auto& hostset_per_priority = cluster_ref.get().prioritySet().hostSetsPerPriority(); EXPECT_EQ(1, hostset_per_priority.size()); const Envoy::Upstream::HostSetPtr& host_set = hostset_per_priority[0]; @@ -340,7 +340,7 @@ TEST_P(EdsIntegrationTest, BatchMemberUpdateCb) { auto& priority_set = test_server_->server() .clusterManager() .clusters() - .find("cluster_0") + .active_clusters_.find("cluster_0") ->second.get() .prioritySet(); diff --git a/test/mocks/upstream/cluster_manager.h b/test/mocks/upstream/cluster_manager.h index 433875f2e3967..cc3071052f677 100644 --- a/test/mocks/upstream/cluster_manager.h +++ b/test/mocks/upstream/cluster_manager.h @@ -42,8 +42,7 @@ class MockClusterManager : public ClusterManager { MOCK_METHOD(void, setInitializedCb, (InitializationCompleteCallback)); MOCK_METHOD(void, initializeSecondaryClusters, (const envoy::config::bootstrap::v3::Bootstrap& bootstrap)); - MOCK_METHOD(ClusterInfoMap, clusters, ()); - MOCK_METHOD(absl::flat_hash_set, allClusterNames, ()); + MOCK_METHOD(ClusterInfoMaps, clusters, ()); MOCK_METHOD(const ClusterSet&, primaryClusters, ()); MOCK_METHOD(ThreadLocalCluster*, get, (absl::string_view cluster)); diff --git a/test/server/admin/clusters_handler_test.cc b/test/server/admin/clusters_handler_test.cc index f0ba3f5ae7e56..4d41b6c24969a 100644 --- a/test/server/admin/clusters_handler_test.cc +++ b/test/server/admin/clusters_handler_test.cc @@ -14,11 +14,11 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, AdminInstanceTest, TestUtility::ipTestParamsToString); TEST_P(AdminInstanceTest, ClustersJson) { - Upstream::ClusterManager::ClusterInfoMap cluster_map; - ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_map)); + Upstream::ClusterManager::ClusterInfoMaps cluster_maps; + ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_maps)); NiceMock cluster; - cluster_map.emplace(cluster.info_->name_, cluster); + cluster_maps.active_clusters_.emplace(cluster.info_->name_, cluster); NiceMock outlier_detector; ON_CALL(Const(cluster), outlierDetector()).WillByDefault(Return(&outlier_detector)); diff --git a/test/server/admin/config_dump_handler_test.cc b/test/server/admin/config_dump_handler_test.cc index 7c7f5f57781f8..6075dffa4a673 100644 --- a/test/server/admin/config_dump_handler_test.cc +++ b/test/server/admin/config_dump_handler_test.cc @@ -113,11 +113,11 @@ TEST_P(AdminInstanceTest, ConfigDumpMaintainsOrder) { // Test that using ?include_eds parameter adds EDS to the config dump. TEST_P(AdminInstanceTest, ConfigDumpWithEndpoint) { - Upstream::ClusterManager::ClusterInfoMap cluster_map; - ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_map)); + Upstream::ClusterManager::ClusterInfoMaps cluster_maps; + ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_maps)); NiceMock cluster; - cluster_map.emplace(cluster.info_->name_, cluster); + cluster_maps.active_clusters_.emplace(cluster.info_->name_, cluster); ON_CALL(*cluster.info_, addedViaApi()).WillByDefault(Return(false)); @@ -186,11 +186,11 @@ TEST_P(AdminInstanceTest, ConfigDumpWithEndpoint) { // Test EDS config dump while multiple localities and priorities exist TEST_P(AdminInstanceTest, ConfigDumpWithLocalityEndpoint) { - Upstream::ClusterManager::ClusterInfoMap cluster_map; - ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_map)); + Upstream::ClusterManager::ClusterInfoMaps cluster_maps; + ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_maps)); NiceMock cluster; - cluster_map.emplace(cluster.info_->name_, cluster); + cluster_maps.active_clusters_.emplace(cluster.info_->name_, cluster); ON_CALL(*cluster.info_, addedViaApi()).WillByDefault(Return(false)); @@ -398,11 +398,11 @@ TEST_P(AdminInstanceTest, ConfigDumpFiltersByResource) { // We add both static and dynamic endpoint config to the dump, but expect only // dynamic in the JSON with ?resource=dynamic_endpoint_configs. TEST_P(AdminInstanceTest, ConfigDumpWithEndpointFiltersByResource) { - Upstream::ClusterManager::ClusterInfoMap cluster_map; - ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_map)); + Upstream::ClusterManager::ClusterInfoMaps cluster_maps; + ON_CALL(server_.cluster_manager_, clusters()).WillByDefault(ReturnPointee(&cluster_maps)); NiceMock cluster_1; - cluster_map.emplace(cluster_1.info_->name_, cluster_1); + cluster_maps.active_clusters_.emplace(cluster_1.info_->name_, cluster_1); ON_CALL(*cluster_1.info_, addedViaApi()).WillByDefault(Return(true)); @@ -419,7 +419,7 @@ TEST_P(AdminInstanceTest, ConfigDumpWithEndpointFiltersByResource) { NiceMock cluster_2; cluster_2.info_->name_ = "fake_cluster_2"; - cluster_map.emplace(cluster_2.info_->name_, cluster_2); + cluster_maps.active_clusters_.emplace(cluster_2.info_->name_, cluster_2); ON_CALL(*cluster_2.info_, addedViaApi()).WillByDefault(Return(false)); diff --git a/test/server/configuration_impl_test.cc b/test/server/configuration_impl_test.cc index eeeba3585b770..c8e63cb180f6c 100644 --- a/test/server/configuration_impl_test.cc +++ b/test/server/configuration_impl_test.cc @@ -161,10 +161,10 @@ TEST_F(ConfigurationImplTest, SetUpstreamClusterPerConnectionBufferLimit) { MainImpl config; config.initialize(bootstrap, server_, cluster_manager_factory_); - ASSERT_EQ(1U, config.clusterManager()->clusters().count("test_cluster")); + ASSERT_EQ(1U, config.clusterManager()->clusters().active_clusters_.count("test_cluster")); EXPECT_EQ(8192U, config.clusterManager() ->clusters() - .find("test_cluster") + .active_clusters_.find("test_cluster") ->second.get() .info() ->perConnectionBufferLimitBytes()); From ca019fd456ef8b7fd494dbc292b28120383e991f Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Fri, 13 Nov 2020 18:25:49 +0000 Subject: [PATCH 4/5] move cluster admin TODO to admin files Signed-off-by: Yuchen Dai --- source/common/upstream/cluster_manager_impl.h | 1 - source/server/admin/clusters_handler.cc | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 813ce1c04f9d8..97cf4f29de306 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -215,7 +215,6 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggablecluster_); diff --git a/source/server/admin/clusters_handler.cc b/source/server/admin/clusters_handler.cc index 3f4724627118f..801045e63ecd5 100644 --- a/source/server/admin/clusters_handler.cc +++ b/source/server/admin/clusters_handler.cc @@ -100,6 +100,7 @@ void setHealthFlag(Upstream::Host::HealthFlag flag, const Upstream::Host& host, // TODO(efimki): Add support of text readouts stats. void ClustersHandler::writeClustersAsJson(Buffer::Instance& response) { envoy::admin::v3::Clusters clusters; + // TODO(mattklein123): Add ability to see warming clusters in admin output. auto all_clusters = server_.clusterManager().clusters(); for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) { const Upstream::Cluster& cluster = cluster_ref.get(); @@ -185,6 +186,7 @@ void ClustersHandler::writeClustersAsJson(Buffer::Instance& response) { // TODO(efimki): Add support of text readouts stats. void ClustersHandler::writeClustersAsText(Buffer::Instance& response) { + // TODO(mattklein123): Add ability to see warming clusters in admin output. auto all_clusters = server_.clusterManager().clusters(); for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) { const Upstream::Cluster& cluster = cluster_ref.get(); From 9595fa9a36ae000f6ec458f47e157f2bcacb29b5 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Fri, 13 Nov 2020 20:24:31 +0000 Subject: [PATCH 5/5] avoid allocate set in CdsApiImpl::onConfigUpdate Signed-off-by: Yuchen Dai --- source/common/upstream/cds_api_impl.cc | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/source/common/upstream/cds_api_impl.cc b/source/common/upstream/cds_api_impl.cc index c0fb6fdb93a6a..4568bf84b8f3d 100644 --- a/source/common/upstream/cds_api_impl.cc +++ b/source/common/upstream/cds_api_impl.cc @@ -39,25 +39,21 @@ CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config, void CdsApiImpl::onConfigUpdate(const std::vector& resources, const std::string& version_info) { auto all_existing_clusters = cm_.clusters(); - absl::flat_hash_set clusters_to_remove( - all_existing_clusters.active_clusters_.size() + - all_existing_clusters.warming_clusters_.size()); - - for (const auto& [name, _] : all_existing_clusters.active_clusters_) { - clusters_to_remove.emplace(name); - } - for (const auto& [name, _] : all_existing_clusters.warming_clusters_) { - clusters_to_remove.emplace(name); - } - - std::vector clusters; + // Exclude the clusters which CDS wants to add. for (const auto& resource : resources) { - clusters_to_remove.erase(resource.get().name()); + all_existing_clusters.active_clusters_.erase(resource.get().name()); + all_existing_clusters.warming_clusters_.erase(resource.get().name()); } Protobuf::RepeatedPtrField to_remove_repeated; - for (const auto& cluster_name : clusters_to_remove) { + for (const auto& [cluster_name, _] : all_existing_clusters.active_clusters_) { *to_remove_repeated.Add() = cluster_name; } + for (const auto& [cluster_name, _] : all_existing_clusters.warming_clusters_) { + // Do not add the cluster twice when the cluster is both active and warming. + if (all_existing_clusters.active_clusters_.count(cluster_name) == 0) { + *to_remove_repeated.Add() = cluster_name; + } + } onConfigUpdate(resources, to_remove_repeated, version_info); }