diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index 5470e70f439b5..153597ada9ae0 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -105,6 +105,12 @@ class CdsApi { * Start the first fetch of CDS data. */ virtual void initialize() PURE; + + /** + * Set a callback that will be called when the CDS API has done an initial load from the remote + * server. If the initial load fails, the callback will also be called. + */ + virtual void setInitializedCb(std::function callback) PURE; }; typedef std::unique_ptr CdsApiPtr; diff --git a/source/common/upstream/cds_api_impl.cc b/source/common/upstream/cds_api_impl.cc index ae9f417686dfd..9ffb327dc797d 100644 --- a/source/common/upstream/cds_api_impl.cc +++ b/source/common/upstream/cds_api_impl.cc @@ -59,6 +59,13 @@ void CdsApiImpl::parseResponse(const Http::Message& response) { stats_.update_success_.inc(); } +void CdsApiImpl::onFetchComplete() { + if (initialize_callback_) { + initialize_callback_(); + initialize_callback_ = nullptr; + } +} + void CdsApiImpl::onFetchFailure(EnvoyException* e) { stats_.update_failure_.inc(); if (e) { diff --git a/source/common/upstream/cds_api_impl.h b/source/common/upstream/cds_api_impl.h index 29de57683befe..0955478bc40a8 100644 --- a/source/common/upstream/cds_api_impl.h +++ b/source/common/upstream/cds_api_impl.h @@ -37,6 +37,9 @@ class CdsApiImpl : public CdsApi, Http::RestApiFetcher, Logger::Loggable callback) override { + initialize_callback_ = callback; + } private: CdsApiImpl(const Json::Object& config, ClusterManager& cm, Event::Dispatcher& dispatcher, @@ -46,11 +49,12 @@ class CdsApiImpl : public CdsApi, Http::RestApiFetcher, Logger::Loggable initialize_callback_; }; } // Upstream diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 76e47d976281f..8ed22e0e6d512 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -15,6 +15,112 @@ namespace Upstream { +void ClusterManagerInitHelper::addCluster(Cluster& cluster) { + if (state_ == State::AllClustersInitialized) { + cluster.initialize(); + return; + } + + if (cluster.initializePhase() == Cluster::InitializePhase::Primary) { + primary_init_clusters_.push_back(&cluster); + cluster.initialize(); + } else { + ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary); + secondary_init_clusters_.push_back(&cluster); + } + + cluster.setInitializedCb([&cluster, this]() -> void { + ASSERT(state_ != State::AllClustersInitialized); + removeCluster(cluster); + }); +} + +void ClusterManagerInitHelper::removeCluster(Cluster& cluster) { + if (state_ == State::AllClustersInitialized) { + return; + } + + // There is a remote edge case where we can remove a cluster via CDS that has not yet been + // initialized. When called via the remove cluster API this code catches that case. + std::list* cluster_list; + if (cluster.initializePhase() == Cluster::InitializePhase::Primary) { + cluster_list = &primary_init_clusters_; + } else { + ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary); + cluster_list = &secondary_init_clusters_; + } + + ASSERT(std::find(cluster_list->begin(), cluster_list->end(), &cluster) != cluster_list->end()); + cluster_list->remove(&cluster); + maybeFinishInitialize(); +} + +void ClusterManagerInitHelper::maybeFinishInitialize() { + // Do not do anything if we are still doing the initial static load or if we are waiting for + // CDS initialize. + if (state_ == State::Loading || state_ == State::WaitingForCdsInitialize) { + return; + } + + // If we are still waiting for primary clusters to initialize, do nothing. + ASSERT(state_ == State::WaitingForStaticInitialize || state_ == State::CdsInitialized); + if (!primary_init_clusters_.empty()) { + return; + } + + // If we are still waiting for secondary clusters to initialize, see if we need to first call + // initialize on them. This is only done once. + if (!secondary_init_clusters_.empty()) { + if (!started_secondary_initialize_) { + started_secondary_initialize_ = true; + for (Cluster* cluster : secondary_init_clusters_) { + cluster->initialize(); + } + } + + return; + } + + // At this point, if we are doing static init, and we have CDS, start CDS init. Otherwise, move + // directly to initialized. + started_secondary_initialize_ = false; + if (state_ == State::WaitingForStaticInitialize && cds_) { + state_ = State::WaitingForCdsInitialize; + cds_->initialize(); + } else { + state_ = State::AllClustersInitialized; + if (initialized_callback_) { + initialized_callback_(); + } + } +} + +void ClusterManagerInitHelper::onStaticLoadComplete() { + ASSERT(state_ == State::Loading); + state_ = State::WaitingForStaticInitialize; + maybeFinishInitialize(); +} + +void ClusterManagerInitHelper::setCds(CdsApi* cds) { + ASSERT(state_ == State::Loading); + cds_ = cds; + if (cds_) { + cds_->setInitializedCb([this]() -> void { + ASSERT(state_ == State::WaitingForCdsInitialize); + state_ = State::CdsInitialized; + maybeFinishInitialize(); + }); + } +} + +void ClusterManagerInitHelper::setInitializedCb(std::function callback) { + if (state_ == State::AllClustersInitialized) { + callback(); + } else { + initialized_callback_ = callback; + } +} + ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, ClusterManagerFactory& factory, Stats::Store& stats, ThreadLocal::Instance& tls, Runtime::Loader& runtime, Runtime::RandomGenerator& random, @@ -24,9 +130,6 @@ ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, ClusterManage thread_local_slot_(tls.allocateSlot()), local_info_(local_info), cm_stats_(generateStats(stats)) { - std::vector clusters = config.getObjectArray("clusters"); - pending_cluster_init_ = clusters.size(); - if (config.hasObject("outlier_detection")) { std::string event_log_file_path = config.getObject("outlier_detection")->getString("event_log_path", ""); @@ -37,7 +140,6 @@ ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, ClusterManage } if (config.hasObject("sds")) { - pending_cluster_init_++; loadCluster(*config.getObject("sds")->getObject("cluster"), false); SdsConfig sds_config{ @@ -48,11 +150,14 @@ ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, ClusterManage } if (config.hasObject("cds")) { - pending_cluster_init_++; loadCluster(*config.getObject("cds")->getObject("cluster"), false); } - for (const Json::ObjectPtr& cluster : clusters) { + // We can now potentially create the CDS API once the backing cluster exists. + cds_api_ = factory_.createCds(config, *this); + init_helper_.setCds(cds_api_.get()); + + for (const Json::ObjectPtr& cluster : config.getObjectArray("clusters")) { loadCluster(*cluster, false); } @@ -78,12 +183,7 @@ ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, ClusterManage postInitializeCluster(*cluster.second.cluster_); } - // Once all clusters are initiailized we can attempt to initialize the CDS API if configured. - // TODO: Graceful initialize of CDS on initial load. - cds_api_ = factory_.createCds(config, *this); - if (cds_api_) { - cds_api_->initialize(); - } + init_helper_.onStaticLoadComplete(); } ClusterManagerStats ClusterManagerImpl::generateStats(Stats::Scope& scope) { @@ -131,7 +231,8 @@ bool ClusterManagerImpl::removePrimaryCluster(const std::string& cluster_name) { return false; } - primary_clusters_.erase(cluster_name); + init_helper_.removeCluster(*existing_cluster->second.cluster_); + primary_clusters_.erase(existing_cluster); cm_stats_.cluster_removed_.inc(); cm_stats_.total_clusters_.set(primary_clusters_.size()); tls_.runOnAllThreads([this, cluster_name]() -> void { @@ -148,35 +249,12 @@ void ClusterManagerImpl::loadCluster(const Json::Object& cluster, bool added_via ClusterPtr new_cluster = factory_.clusterFromJson(cluster, *this, sds_config_, outlier_event_logger_); + init_helper_.addCluster(*new_cluster); if (!added_via_api) { if (primary_clusters_.find(new_cluster->info()->name()) != primary_clusters_.end()) { throw EnvoyException( fmt::format("cluster manager: duplicate cluster '{}'", new_cluster->info()->name())); } - - if (new_cluster->initializePhase() == Cluster::InitializePhase::Primary) { - new_cluster->initialize(); - } else { - ASSERT(new_cluster->initializePhase() == Cluster::InitializePhase::Secondary); - secondary_init_clusters_.push_back(new_cluster.get()); - } - - ASSERT(pending_cluster_init_ > 0); - new_cluster->setInitializedCb([this]() -> void { - ASSERT(pending_cluster_init_ > 0); - if (--pending_cluster_init_ == 0) { - if (initialized_callback_) { - initialized_callback_(); - } - } else if (pending_cluster_init_ == secondary_init_clusters_.size()) { - // All primary clusters have initialized. Now we start up the secondary clusters. - for (Cluster* cluster : secondary_init_clusters_) { - cluster->initialize(); - } - } - }); - } else { - new_cluster->initialize(); } const Cluster& primary_cluster_reference = *new_cluster; diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 9f9b332f13318..93022652c51bd 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -48,6 +48,37 @@ class ProdClusterManagerFactory : public ClusterManagerFactory { const LocalInfo::LocalInfo& local_info_; }; +/** + * This is a helper class used during cluster management initialization. Dealing with primary + * clusters, secondary clusters, and CDS, is quite complicated, so this makes it easier to test. + */ +class ClusterManagerInitHelper { +public: + void addCluster(Cluster& cluster); + void onStaticLoadComplete(); + void removeCluster(Cluster& cluster); + void setCds(CdsApi* cds); + void setInitializedCb(std::function callback); + +private: + enum class State { + Loading, + WaitingForStaticInitialize, + WaitingForCdsInitialize, + CdsInitialized, + AllClustersInitialized + }; + + void maybeFinishInitialize(); + + CdsApi* cds_{}; + std::function initialized_callback_; + std::list primary_init_clusters_; + std::list secondary_init_clusters_; + State state_{State::Loading}; + bool started_secondary_initialize_{}; +}; + /** * All cluster manager stats. @see stats_macros.h */ @@ -80,11 +111,7 @@ class ClusterManagerImpl : public ClusterManager { // Upstream::ClusterManager bool addOrUpdatePrimaryCluster(const Json::Object& config) override; void setInitializedCb(std::function callback) override { - if (pending_cluster_init_ == 0) { - callback(); - } else { - initialized_callback_ = callback; - } + init_helper_.setInitializedCb(callback); } ClusterInfoMap clusters() override { ClusterInfoMap clusters_map; @@ -177,14 +204,12 @@ class ClusterManagerImpl : public ClusterManager { Runtime::RandomGenerator& random_; uint32_t thread_local_slot_; std::unordered_map primary_clusters_; - std::function initialized_callback_; - uint32_t pending_cluster_init_; Optional sds_config_; - std::list secondary_init_clusters_; Outlier::EventLoggerPtr outlier_event_logger_; const LocalInfo::LocalInfo& local_info_; CdsApiPtr cds_api_; ClusterManagerStats cm_stats_; + ClusterManagerInitHelper init_helper_; }; } // Upstream diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index be95f2fda95db..65192e008fa61 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -16,7 +16,6 @@ #include "common/common/enum_to_int.h" #include "common/common/logger.h" -#include "common/json/json_loader.h" #include "common/stats/stats_impl.h" namespace Upstream { diff --git a/source/server/server.cc b/source/server/server.cc index ed0640294ff46..a35277ce8df1d 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -200,6 +200,13 @@ void InstanceImpl::initialize(Options& options, TestHooks& hooks, log().warn("caught and eating SIGHUP. See documentation for how to hot restart."); }); + initializeStatSinks(); + + // Some of the stat sinks may need dispatcher support so don't flush until the main loop starts. + // Just setup the timer. + stat_flush_timer_ = handler_.dispatcher().createTimer([this]() -> void { flushStats(); }); + stat_flush_timer_->enableTimer(config_->statsFlushInterval()); + // Register for cluster manager init notification. We don't start serving worker traffic until // upstream clusters are initialized which may involve running the event loop. Note however that // if there are only static clusters this will fire immediately. @@ -224,13 +231,6 @@ void InstanceImpl::initialize(Options& options, TestHooks& hooks, restarter_.drainParentListeners(); hooks.onServerInitialized(); }); - - initializeStatSinks(); - - // Some of the stat sinks may need dispatcher support so don't flush until the main loop starts. - // Just setup the timer. - stat_flush_timer_ = handler_.dispatcher().createTimer([this]() -> void { flushStats(); }); - stat_flush_timer_->enableTimer(config_->statsFlushInterval()); } Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server, diff --git a/test/common/upstream/cds_api_impl_test.cc b/test/common/upstream/cds_api_impl_test.cc index c14f1a39a0842..a46523dcd6770 100644 --- a/test/common/upstream/cds_api_impl_test.cc +++ b/test/common/upstream/cds_api_impl_test.cc @@ -31,6 +31,7 @@ class CdsApiImplTest : public testing::Test { Json::ObjectPtr config = Json::Factory::LoadFromString(config_json); cds_ = CdsApiImpl::create(*config, cm_, dispatcher_, random_, local_info_, store_); + cds_->setInitializedCb([this]() -> void { initialized_.ready(); }); expectRequest(); cds_->initialize(); @@ -76,6 +77,7 @@ class CdsApiImplTest : public testing::Test { CdsApiPtr cds_; Event::MockTimer* interval_timer_{new Event::MockTimer(&dispatcher_)}; Http::AsyncClient::Callbacks* callbacks_{}; + ReadyWatcher initialized_; }; TEST_F(CdsApiImplTest, InvalidOptions) { @@ -121,6 +123,7 @@ TEST_F(CdsApiImplTest, Basic) { EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{})); expectAdd("cluster1"); expectAdd("cluster2"); + EXPECT_CALL(initialized_, ready()); EXPECT_CALL(*interval_timer_, enableTimer(_)); callbacks_->onSuccess(std::move(message)); @@ -169,6 +172,7 @@ TEST_F(CdsApiImplTest, Failure) { Http::HeaderMapPtr{new Http::TestHeaderMapImpl{{":status", "200"}}})); message->body(Buffer::InstancePtr{new Buffer::OwnedImpl(response1_json)}); + EXPECT_CALL(initialized_, ready()); EXPECT_CALL(*interval_timer_, enableTimer(_)); callbacks_->onSuccess(std::move(message)); diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 3f38b1439cd9c..6db1cdec0fb03 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -342,24 +342,26 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) { } )EOF"; + MockCdsApi* cds = new MockCdsApi(); MockCluster* cds_cluster = new NiceMock(); cds_cluster->info_->name_ = "cds_cluster"; MockCluster* cluster1 = new NiceMock(); MockCluster* cluster2 = new NiceMock(); cluster2->info_->name_ = "fake_cluster2"; + // This part tests static init. InSequence s; EXPECT_CALL(factory_, clusterFromJson_(_, _, _, _)).WillOnce(Return(cds_cluster)); ON_CALL(*cds_cluster, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); EXPECT_CALL(*cds_cluster, initialize()); + EXPECT_CALL(factory_, createCds_()).WillOnce(Return(cds)); + EXPECT_CALL(*cds, setInitializedCb(_)); EXPECT_CALL(factory_, clusterFromJson_(_, _, _, _)).WillOnce(Return(cluster1)); ON_CALL(*cluster1, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); EXPECT_CALL(*cluster1, initialize()); EXPECT_CALL(factory_, clusterFromJson_(_, _, _, _)).WillOnce(Return(cluster2)); ON_CALL(*cluster2, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); - MockCdsApi* cds = new MockCdsApi(); - EXPECT_CALL(factory_, createCds_()).WillOnce(Return(cds)); - EXPECT_CALL(*cds, initialize()); + Json::ObjectPtr loader = Json::Factory::LoadFromString(json); create(*loader); @@ -370,8 +372,61 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) { cds_cluster->initialize_callback_(); cluster1->initialize_callback_(); - EXPECT_CALL(initialized, ready()); + EXPECT_CALL(*cds, initialize()); cluster2->initialize_callback_(); + + // This part tests CDS init. + MockCluster* cluster3 = new NiceMock(); + cluster3->info_->name_ = "cluster3"; + MockCluster* cluster4 = new NiceMock(); + cluster4->info_->name_ = "cluster4"; + MockCluster* cluster5 = new NiceMock(); + cluster5->info_->name_ = "cluster5"; + + std::string json_api = R"EOF( + { + "name": "cluster3" + } + )EOF"; + + Json::ObjectPtr loader_api = Json::Factory::LoadFromString(json_api); + EXPECT_CALL(factory_, clusterFromJson_(_, _, _, _)).WillOnce(Return(cluster3)); + ON_CALL(*cluster3, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); + cluster_manager_->addOrUpdatePrimaryCluster(*loader_api); + + json_api = R"EOF( + { + "name": "cluster4" + } + )EOF"; + + loader_api = Json::Factory::LoadFromString(json_api); + EXPECT_CALL(factory_, clusterFromJson_(_, _, _, _)).WillOnce(Return(cluster4)); + ON_CALL(*cluster4, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); + EXPECT_CALL(*cluster4, initialize()); + cluster_manager_->addOrUpdatePrimaryCluster(*loader_api); + + json_api = R"EOF( + { + "name": "cluster5" + } + )EOF"; + + loader_api = Json::Factory::LoadFromString(json_api); + EXPECT_CALL(factory_, clusterFromJson_(_, _, _, _)).WillOnce(Return(cluster5)); + ON_CALL(*cluster5, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); + cluster_manager_->addOrUpdatePrimaryCluster(*loader_api); + + cds->initialized_callback_(); + + EXPECT_CALL(*cluster3, initialize()); + cluster4->initialize_callback_(); + + // Test cluster 5 getting removed before everything is initialized. + cluster_manager_->removePrimaryCluster("cluster5"); + + EXPECT_CALL(initialized, ready()); + cluster3->initialize_callback_(); } TEST_F(ClusterManagerImplTest, dynamicAddRemove) { @@ -579,4 +634,45 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { dns_callback({"127.0.0.2"}); } +TEST(ClusterManagerInitHelper, ImmediateInitialize) { + InSequence s; + ClusterManagerInitHelper init_helper; + + NiceMock cluster1; + ON_CALL(cluster1, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); + EXPECT_CALL(cluster1, initialize()); + init_helper.addCluster(cluster1); + cluster1.initialize_callback_(); + + init_helper.onStaticLoadComplete(); + + ReadyWatcher cm_initialized; + EXPECT_CALL(cm_initialized, ready()); + init_helper.setInitializedCb([&]() -> void { cm_initialized.ready(); }); +} + +TEST(ClusterManagerInitHelper, StaticSdsInitialize) { + InSequence s; + ClusterManagerInitHelper init_helper; + + NiceMock sds; + ON_CALL(sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); + EXPECT_CALL(sds, initialize()); + init_helper.addCluster(sds); + sds.initialize_callback_(); + + NiceMock cluster1; + ON_CALL(cluster1, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); + init_helper.addCluster(cluster1); + + EXPECT_CALL(cluster1, initialize()); + init_helper.onStaticLoadComplete(); + + ReadyWatcher cm_initialized; + init_helper.setInitializedCb([&]() -> void { cm_initialized.ready(); }); + + EXPECT_CALL(cm_initialized, ready()); + cluster1.initialize_callback_(); +} + } // Upstream diff --git a/test/mocks/upstream/mocks.cc b/test/mocks/upstream/mocks.cc index 26aa175100ebb..13aebe32246c4 100644 --- a/test/mocks/upstream/mocks.cc +++ b/test/mocks/upstream/mocks.cc @@ -9,6 +9,7 @@ using testing::Invoke; using testing::Return; using testing::ReturnPointee; using testing::ReturnRef; +using testing::SaveArg; namespace Upstream { namespace Outlier { @@ -90,7 +91,10 @@ MockHealthChecker::MockHealthChecker() { MockHealthChecker::~MockHealthChecker() {} -MockCdsApi::MockCdsApi() {} +MockCdsApi::MockCdsApi() { + ON_CALL(*this, setInitializedCb(_)).WillByDefault(SaveArg<0>(&initialized_callback_)); +} + MockCdsApi::~MockCdsApi() {} } // Upstream diff --git a/test/mocks/upstream/mocks.h b/test/mocks/upstream/mocks.h index 441fe38acdfbc..8c4550c1e9cc8 100644 --- a/test/mocks/upstream/mocks.h +++ b/test/mocks/upstream/mocks.h @@ -124,6 +124,9 @@ class MockCdsApi : public CdsApi { ~MockCdsApi(); MOCK_METHOD0(initialize, void()); + MOCK_METHOD1(setInitializedCb, void(std::function callback)); + + std::function initialized_callback_; }; } // Upstream