Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ class CdsApi {
* Start the first fetch of CDS data.
*/
virtual void initialize() PURE;

/**
* Set a callback that will be called when the CDS API has done an initial load from the remote
* server. If the initial load fails, the callback will also be called.
*/
virtual void setInitializedCb(std::function<void()> callback) PURE;
};

typedef std::unique_ptr<CdsApi> CdsApiPtr;
Expand Down
7 changes: 7 additions & 0 deletions source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ void CdsApiImpl::parseResponse(const Http::Message& response) {
stats_.update_success_.inc();
}

void CdsApiImpl::onFetchComplete() {
if (initialize_callback_) {
initialize_callback_();
initialize_callback_ = nullptr;
}
}

void CdsApiImpl::onFetchFailure(EnvoyException* e) {
stats_.update_failure_.inc();
if (e) {
Expand Down
6 changes: 5 additions & 1 deletion source/common/upstream/cds_api_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class CdsApiImpl : public CdsApi, Http::RestApiFetcher, Logger::Loggable<Logger:

// Upstream::CdsApi
void initialize() override { RestApiFetcher::initialize(); }
void setInitializedCb(std::function<void()> callback) override {
initialize_callback_ = callback;
}

private:
CdsApiImpl(const Json::Object& config, ClusterManager& cm, Event::Dispatcher& dispatcher,
Expand All @@ -46,11 +49,12 @@ class CdsApiImpl : public CdsApi, Http::RestApiFetcher, Logger::Loggable<Logger:
// Http::RestApiFetcher
void createRequest(Http::Message& request) override;
void parseResponse(const Http::Message& response) override;
void onFetchComplete() override {}
void onFetchComplete() override;
void onFetchFailure(EnvoyException* e) override;

const LocalInfo::LocalInfo& local_info_;
CdsStats stats_;
std::function<void()> initialize_callback_;
};

} // Upstream
152 changes: 115 additions & 37 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,112 @@

namespace Upstream {

void ClusterManagerInitHelper::addCluster(Cluster& cluster) {
if (state_ == State::AllClustersInitialized) {
cluster.initialize();
return;
}

if (cluster.initializePhase() == Cluster::InitializePhase::Primary) {
primary_init_clusters_.push_back(&cluster);
cluster.initialize();
} else {
ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary);
secondary_init_clusters_.push_back(&cluster);
}

cluster.setInitializedCb([&cluster, this]() -> void {
ASSERT(state_ != State::AllClustersInitialized);
removeCluster(cluster);
});
}

void ClusterManagerInitHelper::removeCluster(Cluster& cluster) {
if (state_ == State::AllClustersInitialized) {
return;
}

// There is a remote edge case where we can remove a cluster via CDS that has not yet been
// initialized. When called via the remove cluster API this code catches that case.
std::list<Cluster*>* cluster_list;
if (cluster.initializePhase() == Cluster::InitializePhase::Primary) {
cluster_list = &primary_init_clusters_;
} else {
ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary);
cluster_list = &secondary_init_clusters_;
}

ASSERT(std::find(cluster_list->begin(), cluster_list->end(), &cluster) != cluster_list->end());
cluster_list->remove(&cluster);
maybeFinishInitialize();
}

void ClusterManagerInitHelper::maybeFinishInitialize() {
// Do not do anything if we are still doing the initial static load or if we are waiting for
// CDS initialize.
if (state_ == State::Loading || state_ == State::WaitingForCdsInitialize) {
return;
}

// If we are still waiting for primary clusters to initialize, do nothing.
ASSERT(state_ == State::WaitingForStaticInitialize || state_ == State::CdsInitialized);
if (!primary_init_clusters_.empty()) {
return;
}

// If we are still waiting for secondary clusters to initialize, see if we need to first call
// initialize on them. This is only done once.
if (!secondary_init_clusters_.empty()) {
if (!started_secondary_initialize_) {
started_secondary_initialize_ = true;
for (Cluster* cluster : secondary_init_clusters_) {
cluster->initialize();
}
}

return;
}

// At this point, if we are doing static init, and we have CDS, start CDS init. Otherwise, move
// directly to initialized.
started_secondary_initialize_ = false;
if (state_ == State::WaitingForStaticInitialize && cds_) {
state_ = State::WaitingForCdsInitialize;
cds_->initialize();
} else {
state_ = State::AllClustersInitialized;
if (initialized_callback_) {
initialized_callback_();
}
}
}

void ClusterManagerInitHelper::onStaticLoadComplete() {
ASSERT(state_ == State::Loading);
state_ = State::WaitingForStaticInitialize;
maybeFinishInitialize();
}

void ClusterManagerInitHelper::setCds(CdsApi* cds) {
ASSERT(state_ == State::Loading);
cds_ = cds;
if (cds_) {
cds_->setInitializedCb([this]() -> void {
ASSERT(state_ == State::WaitingForCdsInitialize);
state_ = State::CdsInitialized;
maybeFinishInitialize();
});
}
}

void ClusterManagerInitHelper::setInitializedCb(std::function<void()> callback) {
if (state_ == State::AllClustersInitialized) {
callback();
} else {
initialized_callback_ = callback;
}
}

ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, ClusterManagerFactory& factory,
Stats::Store& stats, ThreadLocal::Instance& tls,
Runtime::Loader& runtime, Runtime::RandomGenerator& random,
Expand All @@ -24,9 +130,6 @@ ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, ClusterManage
thread_local_slot_(tls.allocateSlot()), local_info_(local_info),
cm_stats_(generateStats(stats)) {

std::vector<Json::ObjectPtr> clusters = config.getObjectArray("clusters");
pending_cluster_init_ = clusters.size();

if (config.hasObject("outlier_detection")) {
std::string event_log_file_path =
config.getObject("outlier_detection")->getString("event_log_path", "");
Expand All @@ -37,7 +140,6 @@ ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, ClusterManage
}

if (config.hasObject("sds")) {
pending_cluster_init_++;
loadCluster(*config.getObject("sds")->getObject("cluster"), false);

SdsConfig sds_config{
Expand All @@ -48,11 +150,14 @@ ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, ClusterManage
}

if (config.hasObject("cds")) {
pending_cluster_init_++;
loadCluster(*config.getObject("cds")->getObject("cluster"), false);
}

for (const Json::ObjectPtr& cluster : clusters) {
// We can now potentially create the CDS API once the backing cluster exists.
cds_api_ = factory_.createCds(config, *this);
init_helper_.setCds(cds_api_.get());

for (const Json::ObjectPtr& cluster : config.getObjectArray("clusters")) {
loadCluster(*cluster, false);
}

Expand All @@ -78,12 +183,7 @@ ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, ClusterManage
postInitializeCluster(*cluster.second.cluster_);
}

// Once all clusters are initiailized we can attempt to initialize the CDS API if configured.
// TODO: Graceful initialize of CDS on initial load.
cds_api_ = factory_.createCds(config, *this);
if (cds_api_) {
cds_api_->initialize();
}
init_helper_.onStaticLoadComplete();
}

ClusterManagerStats ClusterManagerImpl::generateStats(Stats::Scope& scope) {
Expand Down Expand Up @@ -131,7 +231,8 @@ bool ClusterManagerImpl::removePrimaryCluster(const std::string& cluster_name) {
return false;
}

primary_clusters_.erase(cluster_name);
init_helper_.removeCluster(*existing_cluster->second.cluster_);
primary_clusters_.erase(existing_cluster);
cm_stats_.cluster_removed_.inc();
cm_stats_.total_clusters_.set(primary_clusters_.size());
tls_.runOnAllThreads([this, cluster_name]() -> void {
Expand All @@ -148,35 +249,12 @@ void ClusterManagerImpl::loadCluster(const Json::Object& cluster, bool added_via
ClusterPtr new_cluster =
factory_.clusterFromJson(cluster, *this, sds_config_, outlier_event_logger_);

init_helper_.addCluster(*new_cluster);
if (!added_via_api) {
if (primary_clusters_.find(new_cluster->info()->name()) != primary_clusters_.end()) {
throw EnvoyException(
fmt::format("cluster manager: duplicate cluster '{}'", new_cluster->info()->name()));
}

if (new_cluster->initializePhase() == Cluster::InitializePhase::Primary) {
new_cluster->initialize();
} else {
ASSERT(new_cluster->initializePhase() == Cluster::InitializePhase::Secondary);
secondary_init_clusters_.push_back(new_cluster.get());
}

ASSERT(pending_cluster_init_ > 0);
new_cluster->setInitializedCb([this]() -> void {
ASSERT(pending_cluster_init_ > 0);
if (--pending_cluster_init_ == 0) {
if (initialized_callback_) {
initialized_callback_();
}
} else if (pending_cluster_init_ == secondary_init_clusters_.size()) {
// All primary clusters have initialized. Now we start up the secondary clusters.
for (Cluster* cluster : secondary_init_clusters_) {
cluster->initialize();
}
}
});
} else {
new_cluster->initialize();
}

const Cluster& primary_cluster_reference = *new_cluster;
Expand Down
41 changes: 33 additions & 8 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,37 @@ class ProdClusterManagerFactory : public ClusterManagerFactory {
const LocalInfo::LocalInfo& local_info_;
};

/**
* This is a helper class used during cluster management initialization. Dealing with primary
* clusters, secondary clusters, and CDS, is quite complicated, so this makes it easier to test.
*/
class ClusterManagerInitHelper {
public:
void addCluster(Cluster& cluster);
void onStaticLoadComplete();
void removeCluster(Cluster& cluster);
void setCds(CdsApi* cds);
void setInitializedCb(std::function<void()> callback);

private:
enum class State {
Loading,
WaitingForStaticInitialize,
WaitingForCdsInitialize,
CdsInitialized,
AllClustersInitialized
};

void maybeFinishInitialize();

CdsApi* cds_{};
std::function<void()> initialized_callback_;
std::list<Cluster*> primary_init_clusters_;
std::list<Cluster*> secondary_init_clusters_;
State state_{State::Loading};
bool started_secondary_initialize_{};
};

/**
* All cluster manager stats. @see stats_macros.h
*/
Expand Down Expand Up @@ -80,11 +111,7 @@ class ClusterManagerImpl : public ClusterManager {
// Upstream::ClusterManager
bool addOrUpdatePrimaryCluster(const Json::Object& config) override;
void setInitializedCb(std::function<void()> callback) override {
if (pending_cluster_init_ == 0) {
callback();
} else {
initialized_callback_ = callback;
}
init_helper_.setInitializedCb(callback);
}
ClusterInfoMap clusters() override {
ClusterInfoMap clusters_map;
Expand Down Expand Up @@ -177,14 +204,12 @@ class ClusterManagerImpl : public ClusterManager {
Runtime::RandomGenerator& random_;
uint32_t thread_local_slot_;
std::unordered_map<std::string, PrimaryClusterData> primary_clusters_;
std::function<void()> initialized_callback_;
uint32_t pending_cluster_init_;
Optional<SdsConfig> sds_config_;
std::list<Cluster*> secondary_init_clusters_;
Outlier::EventLoggerPtr outlier_event_logger_;
const LocalInfo::LocalInfo& local_info_;
CdsApiPtr cds_api_;
ClusterManagerStats cm_stats_;
ClusterManagerInitHelper init_helper_;
};

} // Upstream
1 change: 0 additions & 1 deletion source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "common/common/enum_to_int.h"
#include "common/common/logger.h"
#include "common/json/json_loader.h"
#include "common/stats/stats_impl.h"

namespace Upstream {
Expand Down
14 changes: 7 additions & 7 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@ void InstanceImpl::initialize(Options& options, TestHooks& hooks,
log().warn("caught and eating SIGHUP. See documentation for how to hot restart.");
});

initializeStatSinks();

// Some of the stat sinks may need dispatcher support so don't flush until the main loop starts.
// Just setup the timer.
stat_flush_timer_ = handler_.dispatcher().createTimer([this]() -> void { flushStats(); });
stat_flush_timer_->enableTimer(config_->statsFlushInterval());

// Register for cluster manager init notification. We don't start serving worker traffic until
// upstream clusters are initialized which may involve running the event loop. Note however that
// if there are only static clusters this will fire immediately.
Expand All @@ -224,13 +231,6 @@ void InstanceImpl::initialize(Options& options, TestHooks& hooks,
restarter_.drainParentListeners();
hooks.onServerInitialized();
});

initializeStatSinks();

// Some of the stat sinks may need dispatcher support so don't flush until the main loop starts.
// Just setup the timer.
stat_flush_timer_ = handler_.dispatcher().createTimer([this]() -> void { flushStats(); });
stat_flush_timer_->enableTimer(config_->statsFlushInterval());
}

Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server,
Expand Down
4 changes: 4 additions & 0 deletions test/common/upstream/cds_api_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class CdsApiImplTest : public testing::Test {

Json::ObjectPtr config = Json::Factory::LoadFromString(config_json);
cds_ = CdsApiImpl::create(*config, cm_, dispatcher_, random_, local_info_, store_);
cds_->setInitializedCb([this]() -> void { initialized_.ready(); });

expectRequest();
cds_->initialize();
Expand Down Expand Up @@ -76,6 +77,7 @@ class CdsApiImplTest : public testing::Test {
CdsApiPtr cds_;
Event::MockTimer* interval_timer_{new Event::MockTimer(&dispatcher_)};
Http::AsyncClient::Callbacks* callbacks_{};
ReadyWatcher initialized_;
};

TEST_F(CdsApiImplTest, InvalidOptions) {
Expand Down Expand Up @@ -121,6 +123,7 @@ TEST_F(CdsApiImplTest, Basic) {
EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
expectAdd("cluster1");
expectAdd("cluster2");
EXPECT_CALL(initialized_, ready());
EXPECT_CALL(*interval_timer_, enableTimer(_));
callbacks_->onSuccess(std::move(message));

Expand Down Expand Up @@ -169,6 +172,7 @@ TEST_F(CdsApiImplTest, Failure) {
Http::HeaderMapPtr{new Http::TestHeaderMapImpl{{":status", "200"}}}));
message->body(Buffer::InstancePtr{new Buffer::OwnedImpl(response1_json)});

EXPECT_CALL(initialized_, ready());
EXPECT_CALL(*interval_timer_, enableTimer(_));
callbacks_->onSuccess(std::move(message));

Expand Down
Loading