Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 65 additions & 42 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,20 @@ void ClusterManagerInitHelper::addCluster(Cluster& cluster) {

const auto initialize_cb = [&cluster, this] { onClusterInit(cluster); };
if (cluster.initializePhase() == Cluster::InitializePhase::Primary) {
// Remove the previous cluster before the cluster object is destroyed.
primary_init_clusters_.remove_if(
[name_to_remove = cluster.info()->name()](Cluster* cluster_iter) {
return cluster_iter->info()->name() == name_to_remove;
});
primary_init_clusters_.push_back(&cluster);
cluster.initialize(initialize_cb);
} else {
ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary);
// Remove the previous cluster before the cluster object is destroyed.
secondary_init_clusters_.remove_if(
[name_to_remove = cluster.info()->name()](Cluster* cluster_iter) {
return cluster_iter->info()->name() == name_to_remove;
});
secondary_init_clusters_.push_back(&cluster);
if (started_secondary_initialize_) {
// This can happen if we get a second CDS update that adds new clusters after we have
Expand Down Expand Up @@ -417,7 +427,16 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
// been setup for cross-thread updates to avoid needless updates during initialization. The order
// of operations here is important. We start by initializing the thread aware load balancer if
// needed. This must happen first so cluster updates are heard first by the load balancer.
auto cluster_data = active_clusters_.find(cluster.info()->name());
// It also ensures that every cluster this function is called for is active before proceeding.
auto cluster_data = warming_clusters_.find(cluster.info()->name());
// Some clusters, such as static and primary clusters, become active immediately and are never
// in the warming map, so the promotion below must be guarded by this check.
if (cluster_data != warming_clusters_.end()) {
clusterWarmingToActive(cluster.info()->name());
updateClusterCounts();
}
cluster_data = active_clusters_.find(cluster.info()->name());

if (cluster_data->second->thread_aware_lb_ != nullptr) {
cluster_data->second->thread_aware_lb_->initialize();
}
Expand Down Expand Up @@ -587,17 +606,6 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl
// The following init manager remove call is a NOP in the case we are already initialized.
// It's just kept here to avoid additional logic.
init_helper_.removeCluster(*existing_active_cluster->second->cluster_);
} else {
// Validate that warming clusters are not added to the init_helper_.
// NOTE: This loop is compiled out in optimized builds.
for (const std::list<Cluster*>& cluster_list :
{std::cref(init_helper_.primary_init_clusters_),
std::cref(init_helper_.secondary_init_clusters_)}) {
ASSERT(!std::any_of(cluster_list.begin(), cluster_list.end(),
[&existing_warming_cluster](Cluster* cluster) {
return existing_warming_cluster->second->cluster_.get() == cluster;
}));
}
}
cm_stats_.cluster_modified_.inc();
} else {
Expand All @@ -614,40 +622,41 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl
// the future we may decide to undergo a refactor to unify the logic but the effort/risk to
// do that right now does not seem worth it given that the logic is generally pretty clean
// and easy to understand.
const bool use_active_map =
init_helper_.state() != ClusterManagerInitHelper::State::AllClustersInitialized;
loadCluster(cluster, version_info, true, use_active_map ? active_clusters_ : warming_clusters_);

if (use_active_map) {
const bool all_clusters_initialized =
init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
// Hold on to the previous cluster data so that it is not destroyed early. The replacement
// cluster must be added before the previous one is destroyed; otherwise initialization could be
// reported as complete prematurely.
const auto previous_cluster = loadCluster(cluster, version_info, true, warming_clusters_);
auto& cluster_entry = warming_clusters_.at(cluster_name);
if (!all_clusters_initialized) {
ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name);
auto& cluster_entry = active_clusters_.at(cluster_name);
createOrUpdateThreadLocalCluster(*cluster_entry);
init_helper_.addCluster(*cluster_entry->cluster_);
} else {
auto& cluster_entry = warming_clusters_.at(cluster_name);
ENVOY_LOG(debug, "add/update cluster {} starting warming", cluster_name);
cluster_entry->cluster_->initialize([this, cluster_name] {
auto warming_it = warming_clusters_.find(cluster_name);
auto& cluster_entry = *warming_it->second;

// If the cluster is being updated, we need to cancel any pending merged updates.
// Otherwise, applyUpdates() will fire with a dangling cluster reference.
updates_map_.erase(cluster_name);

active_clusters_[cluster_name] = std::move(warming_it->second);
warming_clusters_.erase(warming_it);

ENVOY_LOG(debug, "warming cluster {} complete", cluster_name);
createOrUpdateThreadLocalCluster(cluster_entry);
onClusterInit(*cluster_entry.cluster_);
updateClusterCounts();
auto state_changed_cluster_entry = warming_clusters_.find(cluster_name);
createOrUpdateThreadLocalCluster(*state_changed_cluster_entry->second);
onClusterInit(*state_changed_cluster_entry->second->cluster_);
});
}

updateClusterCounts();
return true;
}

// Promotes the cluster named `cluster_name` from the warming map to the active map.
// The cluster must currently be present in warming_clusters_; this is checked with ASSERT.
void ClusterManagerImpl::clusterWarmingToActive(const std::string& cluster_name) {
  const auto warming_entry = warming_clusters_.find(cluster_name);
  ASSERT(warming_entry != warming_clusters_.end());

  // Cancel any pending merged updates for this name first. When an existing cluster is being
  // replaced, a later applyUpdates() would otherwise fire with a dangling cluster reference.
  updates_map_.erase(cluster_name);

  // Move ownership into the active map (overwriting whatever was stored under this name), then
  // drop the moved-from slot from the warming map.
  auto& promoted = warming_entry->second;
  active_clusters_[cluster_name] = std::move(promoted);
  warming_clusters_.erase(warming_entry);
}

void ClusterManagerImpl::createOrUpdateThreadLocalCluster(ClusterData& cluster) {
tls_->runOnAllThreads([new_cluster = cluster.cluster_->info(),
thread_aware_lb_factory = cluster.loadBalancerFactory()](
Expand Down Expand Up @@ -702,6 +711,7 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) {
if (existing_warming_cluster != warming_clusters_.end() &&
existing_warming_cluster->second->added_via_api_) {
removed = true;
init_helper_.removeCluster(*existing_warming_cluster->second->cluster_);
warming_clusters_.erase(existing_warming_cluster);
ENVOY_LOG(info, "removing warming cluster {}", cluster_name);
}
Expand All @@ -716,9 +726,10 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) {
return removed;
}

void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
const std::string& version_info, bool added_via_api,
ClusterMap& cluster_map) {
ClusterManagerImpl::ClusterDataPtr
ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
const std::string& version_info, bool added_via_api,
ClusterMap& cluster_map) {
std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr> new_cluster_pair =
factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api);
auto& new_cluster = new_cluster_pair.first;
Expand Down Expand Up @@ -763,11 +774,20 @@ void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster&
}
});
}

cluster_map[cluster_reference.info()->name()] = std::make_unique<ClusterData>(
cluster, version_info, added_via_api, std::move(new_cluster), time_source_);
const auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name());

ClusterDataPtr result;
auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name());
if (cluster_entry_it != cluster_map.end()) {
result = std::exchange(cluster_entry_it->second,
std::make_unique<ClusterData>(cluster, version_info, added_via_api,
std::move(new_cluster), time_source_));
} else {
bool inserted = false;
std::tie(cluster_entry_it, inserted) =
cluster_map.emplace(cluster_reference.info()->name(),
std::make_unique<ClusterData>(cluster, version_info, added_via_api,
std::move(new_cluster), time_source_));
ASSERT(inserted);
}
// If an LB is thread aware, create it here. The LB is not initialized until cluster pre-init
// finishes. For RingHash/Maglev don't create the LB here if subset balancing is enabled,
// because the thread_aware_lb_ field takes precedence over the subset lb).
Expand All @@ -790,6 +810,7 @@ void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster&
}

updateClusterCounts();
return result;
}

void ClusterManagerImpl::updateClusterCounts() {
Expand All @@ -804,7 +825,9 @@ void ClusterManagerImpl::updateClusterCounts() {
// Once cluster is warmed up, CDS is resumed, and ACK is sent to ADS, providing a
// signal to ADS to proceed with RDS updates.
// If we're in the middle of shutting down (ads_mux_ already gone) then this is irrelevant.
if (ads_mux_) {
const bool all_clusters_initialized =
init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
if (all_clusters_initialized && ads_mux_) {
const auto type_urls = Config::getAllVersionTypeUrls<envoy::config::cluster::v3::Cluster>();
const uint64_t previous_warming = cm_stats_.warming_clusters_.value();
if (previous_warming == 0 && !warming_clusters_.empty()) {
Expand Down
11 changes: 9 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -477,11 +477,18 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
void createOrUpdateThreadLocalCluster(ClusterData& cluster);
ProtobufTypes::MessagePtr dumpClusterConfigs();
static ClusterManagerStats generateStats(Stats::Scope& scope);
void loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
const std::string& version_info, bool added_via_api, ClusterMap& cluster_map);

/**
* @return ClusterDataPtr the entry previously stored in cluster_map under the same cluster
* name, or nullptr if cluster_map had no entry for that name.
*/
ClusterDataPtr loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
const std::string& version_info, bool added_via_api,
ClusterMap& cluster_map);
void onClusterInit(Cluster& cluster);
void postThreadLocalHealthFailure(const HostSharedPtr& host);
void updateClusterCounts();
void clusterWarmingToActive(const std::string& cluster_name);
void maybePrefetch(ThreadLocalClusterManagerImpl::ClusterEntryPtr& cluster_entry,
std::function<ConnectionPool::Instance*()> prefetch_pool);

Expand Down
133 changes: 131 additions & 2 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) {
last_updated:
seconds: 1234567891
nanos: 234000000
dynamic_active_clusters:
dynamic_warming_clusters:
- version_info: "version1"
cluster:
"@type": type.googleapis.com/envoy.config.cluster.v3.Cluster
Expand Down Expand Up @@ -1107,7 +1107,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) {
last_updated:
seconds: 1234567891
nanos: 234000000
dynamic_warming_clusters:
dynamic_active_clusters:
)EOF");

EXPECT_CALL(*cluster3, initialize(_));
Expand Down Expand Up @@ -1233,6 +1233,105 @@ TEST_F(ClusterManagerImplTest, RemoveWarmingCluster) {
EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get()));
}

// Verifies that a cluster added through addOrUpdateCluster() while the cluster manager is still
// initializing can be replaced by a second update with the same name, and that initialization
// then proceeds: CDS gets initialized and the cluster manager's initialized callback fires.
TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) {
// Bootstrap with a CDS config source and a single static cluster named "cds_cluster".
const std::string json = fmt::sprintf(
R"EOF(
{
"dynamic_resources": {
"cds_config": {
"api_config_source": {
"api_type": "0",
"refresh_delay": "30s",
"cluster_names": ["cds_cluster"]
}
}
},
"static_resources": {
%s
}
}
)EOF",
clustersJson({
defaultStaticClusterJson("cds_cluster"),
}));

// Returned from the factory mock's createCds_() below.
// NOTE(review): ownership of this raw pointer presumably passes to the cluster manager — confirm.
MockCdsApi* cds = new MockCdsApi();
std::shared_ptr<MockClusterMockPrioritySet> cds_cluster(
new NiceMock<MockClusterMockPrioritySet>());
cds_cluster->info_->name_ = "cds_cluster";

// This part tests static init. All expectations from here on must be satisfied in order.
InSequence s;
EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _))
.WillOnce(Return(std::make_pair(cds_cluster, nullptr)));
ON_CALL(*cds_cluster, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary));
EXPECT_CALL(factory_, createCds_()).WillOnce(Return(cds));
EXPECT_CALL(*cds, setInitializedCb(_));
EXPECT_CALL(*cds_cluster, initialize(_));

create(parseBootstrapFromV3Json(json));

// Watcher that records when the cluster manager invokes its initialized callback.
ReadyWatcher cm_initialized;
cluster_manager_->setInitializedCb([&]() -> void { cm_initialized.ready(); });

// Replacement config for "fake_cluster": STATIC type with a literal IP address. Per the trace
// message below, the test expects this variant to become ready immediately.
const std::string ready_cluster_yaml = R"EOF(
name: fake_cluster
connect_timeout: 0.250s
type: STATIC
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: fake_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 11001
)EOF";

// Initial config for "fake_cluster": STRICT_DNS type with a hostname address. Per the trace
// message below, the test expects this variant to stay in warming.
const std::string warming_cluster_yaml = R"EOF(
name: fake_cluster
connect_timeout: 0.250s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: fake_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: foo.com
port_value: 11001
)EOF";

{
SCOPED_TRACE("Add a primary cluster staying in warming.");
// No action is specified for this call, so the mock's default behavior is used.
EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _));
EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(warming_cluster_yaml),
"warming"));

// Mark all the rest of the clusters ready. Now the only warming cluster is the above one, so
// the cluster manager must not report itself initialized yet.
EXPECT_CALL(cm_initialized, ready()).Times(0);
cds_cluster->initialize_callback_();
}

{
SCOPED_TRACE("Modify the only warming primary cluster to immediate ready.");
EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _));
// Replacing the pending cluster is expected to let initialization advance to CDS.
EXPECT_CALL(*cds, initialize());
EXPECT_TRUE(
cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(ready_cluster_yaml), "ready"));
}
{
SCOPED_TRACE("All clusters are ready.");
// Once CDS reports that it is initialized, the cluster manager's initialized callback fires.
EXPECT_CALL(cm_initialized, ready());
cds->initialized_callback_();
}
EXPECT_TRUE(Mock::VerifyAndClearExpectations(cds_cluster.get()));
}

TEST_F(ClusterManagerImplTest, ModifyWarmingCluster) {
time_system_.setSystemTime(std::chrono::milliseconds(1234567891234));
create(defaultConfig());
Expand Down Expand Up @@ -2984,6 +3083,33 @@ TEST_F(ClusterManagerInitHelperTest, StaticSdsInitialize) {
cluster1.initialize_callback_();
}

// Verify that a primary cluster can be replaced by an updated cluster while the original is
// still initializing (warming), and that only the updated cluster's completion is reported.
TEST_F(ClusterManagerInitHelperTest, TestUpdateWarming) {
// All expectations below must be satisfied in the order they are declared.
InSequence s;

// Original primary cluster; it is heap-allocated so that it can be destroyed mid-test.
auto sds = std::make_unique<NiceMock<MockClusterMockPrioritySet>>();
ON_CALL(*sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary));
EXPECT_CALL(*sds, initialize(_));
init_helper_.addCluster(*sds);
init_helper_.onStaticLoadComplete();

// Updated primary cluster. Neither mock sets a name here.
// NOTE(review): presumably both use the same default mock name, so this addCluster() call
// replaces the entry for `sds` — confirm against the mock's defaults.
NiceMock<MockClusterMockPrioritySet> updated_sds;
ON_CALL(updated_sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary));
EXPECT_CALL(updated_sds, initialize(_));
init_helper_.addCluster(updated_sds);

// The overriding cluster has been added, so manually drop the previous cluster. In the
// production flow this is done by ClusterManagerImpl.
sds.reset();

ReadyWatcher primary_initialized;
init_helper_.setPrimaryClustersInitializedCb([&]() -> void { primary_initialized.ready(); });

// Completing the updated cluster's initialization should report that cluster and then fire the
// primary-clusters-initialized callback.
EXPECT_CALL(*this, onClusterInit(Ref(updated_sds)));
EXPECT_CALL(primary_initialized, ready());
updated_sds.initialize_callback_();
}

TEST_F(ClusterManagerInitHelperTest, UpdateAlreadyInitialized) {
InSequence s;

Expand Down Expand Up @@ -3087,6 +3213,7 @@ TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) {
init_helper_.addCluster(cluster1);

NiceMock<MockClusterMockPrioritySet> cluster2;
cluster2.info_->name_ = "cluster2";
ON_CALL(cluster2, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary));
init_helper_.addCluster(cluster2);

Expand All @@ -3099,6 +3226,8 @@ TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) {
init_helper_.startInitializingSecondaryClusters();

NiceMock<MockClusterMockPrioritySet> cluster3;
cluster3.info_->name_ = "cluster3";

ON_CALL(cluster3, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary));
EXPECT_CALL(cluster3, initialize(_));
init_helper_.addCluster(cluster3);
Expand Down
5 changes: 3 additions & 2 deletions test/integration/ads_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ AdsIntegrationTest::AdsIntegrationTest(const envoy::config::core::v3::ApiVersion

// Fixture teardown: delegates to cleanUpXdsConnection().
void AdsIntegrationTest::TearDown() {
  cleanUpXdsConnection();
}

envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name) {
return ConfigHelper::buildCluster(name, "ROUND_ROBIN", api_version_);
envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name,
const std::string& lb_policy) {
return ConfigHelper::buildCluster(name, lb_policy, api_version_);
}

envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildTlsCluster(const std::string& name) {
Expand Down
Loading