Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include <chrono>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -67,20 +66,12 @@ void ClusterManagerInitHelper::addCluster(ClusterManagerCluster& cm_cluster) {
Cluster& cluster = cm_cluster.cluster();
if (cluster.initializePhase() == Cluster::InitializePhase::Primary) {
// Remove the previous cluster before the cluster object is destroyed.
primary_init_clusters_.remove_if(
[name_to_remove = cluster.info()->name()](ClusterManagerCluster* cluster_iter) {
return cluster_iter->cluster().info()->name() == name_to_remove;
});
primary_init_clusters_.push_back(&cm_cluster);
primary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
cluster.initialize(initialize_cb);
} else {
ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary);
// Remove the previous cluster before the cluster object is destroyed.
secondary_init_clusters_.remove_if(
[name_to_remove = cluster.info()->name()](ClusterManagerCluster* cluster_iter) {
return cluster_iter->cluster().info()->name() == name_to_remove;
});
secondary_init_clusters_.push_back(&cm_cluster);
secondary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
if (started_secondary_initialize_) {
// This can happen if we get a second CDS update that adds new clusters after we have
// already started secondary init. In this case, just immediately initialize.
Expand All @@ -105,17 +96,22 @@ void ClusterManagerInitHelper::removeCluster(ClusterManagerCluster& cluster) {

// There is a remote edge case where we can remove a cluster via CDS that has not yet been
// initialized. When called via the remove cluster API this code catches that case.
std::list<ClusterManagerCluster*>* cluster_list;
absl::flat_hash_map<std::string, ClusterManagerCluster*>* cluster_map;
if (cluster.cluster().initializePhase() == Cluster::InitializePhase::Primary) {
cluster_list = &primary_init_clusters_;
cluster_map = &primary_init_clusters_;
} else {
ASSERT(cluster.cluster().initializePhase() == Cluster::InitializePhase::Secondary);
cluster_list = &secondary_init_clusters_;
cluster_map = &secondary_init_clusters_;
}

// It is possible that the cluster we are removing has already been initialized, and is not
// present in the initializer list. If so, this is fine.
cluster_list->remove(&cluster);
// present in the initializer map. If so, this is fine as a CDS update may happen for a
// cluster with the same name. See the case "UpdateAlreadyInitialized" of the
// target //test/common/upstream:cluster_manager_impl_test.
auto iter = cluster_map->find(cluster.cluster().info()->name());
if (iter != cluster_map->end() && iter->second == &cluster) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that you are here, can you log the when (iter != cluster_map->end() && iter->second != &cluster) ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above is not the condition of the crash. But it's good to know if this branch is possible.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on this code I think we are expecting it's possible that this can happen in our data model, e.g. getting a CDS update with the same cluster name as before.

If that's the case just put that in a comment. It would be good to know if that's covered in our tests. E.g. put an ASSERT failure here and just see if that passes tests.

If we don't hit that assert, may be leave it in? If we do hit that assert then replace it with a comment saying the above happens on a CDS update.

I'm not sure it's necessary to log it, if it's just going to happen on every CDS update that retains some names.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.g. getting a CDS update with the same cluster name as before.

Yes, this is exactly the case covered with bazel test //test/common/upstream:cluster_manager_impl_test --gtest_filter=*UpdateAlreadyInitialized*. I've added a comment referencing it.

cluster_map->erase(iter);
}
ENVOY_LOG(debug, "cm init: init complete: cluster={} primary={} secondary={}",
cluster.cluster().info()->name(), primary_init_clusters_.size(),
secondary_init_clusters_.size());
Expand All @@ -124,13 +120,13 @@ void ClusterManagerInitHelper::removeCluster(ClusterManagerCluster& cluster) {

void ClusterManagerInitHelper::initializeSecondaryClusters() {
started_secondary_initialize_ = true;
// Cluster::initialize() method can modify the list of secondary_init_clusters_ to remove
// Cluster::initialize() method can modify the map of secondary_init_clusters_ to remove
// the item currently being initialized, so we eschew range-based-for and do this complicated
// dance to increment the iterator before calling initialize.
for (auto iter = secondary_init_clusters_.begin(); iter != secondary_init_clusters_.end();) {
ClusterManagerCluster* cluster = *iter;
ClusterManagerCluster* cluster = iter->second;
ENVOY_LOG(debug, "initializing secondary cluster {}", iter->first);
++iter;
ENVOY_LOG(debug, "initializing secondary cluster {}", cluster->cluster().info()->name());
cluster->cluster().initialize([cluster, this] { onClusterInit(*cluster); });
}
}
Expand Down
4 changes: 2 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> {
CdsApi* cds_{};
ClusterManager::PrimaryClustersReadyCallback primary_clusters_initialized_callback_;
ClusterManager::InitializationCompleteCallback initialized_callback_;
std::list<ClusterManagerCluster*> primary_init_clusters_;
std::list<ClusterManagerCluster*> secondary_init_clusters_;
absl::flat_hash_map<std::string, ClusterManagerCluster*> primary_init_clusters_;
absl::flat_hash_map<std::string, ClusterManagerCluster*> secondary_init_clusters_;
State state_{State::Loading};
bool started_secondary_initialize_{};
};
Expand Down
7 changes: 7 additions & 0 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3318,6 +3318,13 @@ TEST_F(ClusterManagerInitHelperTest, UpdateAlreadyInitialized) {
init_helper_.startInitializingSecondaryClusters();
}

TEST_F(ClusterManagerInitHelperTest, RemoveUnknown) {
InSequence s;

NiceMock<MockClusterManagerCluster> cluster;
init_helper_.removeCluster(cluster);
}

// If secondary clusters initialization triggered outside of CdsApiImpl::onConfigUpdate()'s
// callback flows, sending ClusterLoadAssignment should not be paused before calling
// ClusterManagerInitHelper::maybeFinishInitialize(). This case tests that
Expand Down