Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions include/envoy/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,26 @@ envoy_package()
envoy_cc_library(
name = "cluster_manager_interface",
hdrs = ["cluster_manager.h"],
deps = [
":load_balancer_interface",
":thread_local_cluster_interface",
":lazy_loader_interface",
":upstream_interface",
"//include/envoy/access_log:access_log_interface",
"//include/envoy/config:grpc_mux_interface",
"//include/envoy/grpc:async_client_manager_interface",
"//include/envoy/http:async_client_interface",
"//include/envoy/http:conn_pool_interface",
"//include/envoy/local_info:local_info_interface",
"//include/envoy/runtime:runtime_interface",
"@envoy_api//envoy/api/v2:cds_cc",
"@envoy_api//envoy/config/bootstrap/v2:bootstrap_cc",
],
)

envoy_cc_library(
name = "lazy_loader_interface",
hdrs = ["lazy_loader.h"],
deps = [
":load_balancer_interface",
":thread_local_cluster_interface",
Expand Down
18 changes: 17 additions & 1 deletion include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,21 @@
#include "envoy/runtime/runtime.h"
#include "envoy/upstream/load_balancer.h"
#include "envoy/upstream/thread_local_cluster.h"
#include "envoy/upstream/lazy_loader.h"
#include "envoy/upstream/upstream.h"

namespace Envoy {
namespace Upstream {

class ClusterUpdateCallbacks {
public:
virtual ~ClusterUpdateCallbacks() {}

virtual void onClusterAddOrUpdate(const ThreadLocalCluster& cluster) PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs Doxygen comments (see rest of file for examples to mimic style).


virtual void onClusterRemoval(const std::string& cluster_name) PURE;
};

/**
* Manages connection pools and load balancing for upstream clusters. The cluster manager is
* persistent and shared among multiple ongoing requests/connections.
Expand Down Expand Up @@ -147,6 +157,11 @@ class ClusterManager {
* @return std::string the local cluster name, or "" if no local cluster was configured.
*/
virtual const std::string& localClusterName() const PURE;

virtual void addClusterUpdateCallbacks(ClusterUpdateCallbacks& callbacks) PURE;
virtual void removeClusterUpdateCallbacks(ClusterUpdateCallbacks& callbacks) PURE;

virtual LazyLoader* lazyLoader() const PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of these need comments as above.

};

typedef std::unique_ptr<ClusterManager> ClusterManagerPtr;
Expand Down Expand Up @@ -213,7 +228,8 @@ class ClusterManagerFactory {
virtual ClusterSharedPtr clusterFromProto(const envoy::api::v2::Cluster& cluster,
ClusterManager& cm,
Outlier::EventLoggerSharedPtr outlier_event_logger,
bool added_via_api) PURE;
bool added_via_api,
bool added_lazily) PURE;

/**
* Create a CDS API provider from configuration proto.
Expand Down
37 changes: 37 additions & 0 deletions include/envoy/upstream/lazy_loader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>

#include "envoy/access_log/access_log.h"
#include "envoy/api/v2/cds.pb.h"
#include "envoy/config/bootstrap/v2/bootstrap.pb.h"
#include "envoy/config/grpc_mux.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/http/async_client.h"
#include "envoy/http/conn_pool.h"
#include "envoy/local_info/local_info.h"
#include "envoy/runtime/runtime.h"
#include "envoy/upstream/load_balancer.h"
#include "envoy/upstream/thread_local_cluster.h"
#include "envoy/upstream/upstream.h"

namespace Envoy {
namespace Upstream {

/**
* Lazy loading is only available when using CDS, bootstrap with static clusters will not support
* lazy loading.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this true if we have independent lazy CDS config source in the bootstrap as in envoyproxy/data-plane-api#524?

*/
class LazyLoader {
public:
virtual ~LazyLoader() {}

virtual void loadCluster(const std::string& cluster) PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doxygenate.

};

}
}
2 changes: 2 additions & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ class ClusterInfo {
*/
virtual bool addedViaApi() const PURE;

virtual bool addedLazily() const PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More so than my comment on envoyproxy/data-plane-api#524, I think this is where we might want to think about how to reconcile incremental and lazy loading. As a strawman, I'd suggest there is no difference between an incremental cluster addition on a regular CDS channel and a lazy loaded cluster. WDYT? @mattklein123 as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Ideally if we could extend the xDS API with incremental updates and incremental subscriptions (or hint) we could handle lazy loading with a single stream and reconcile quite a bit of this logic.

Has there been any work / discussion on this? Should I start a data-plane-api proposal?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started a strawman proposal here: envoyproxy/data-plane-api#527

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to spend heads down time with this next week, but at a high level if we can handle lazy loading w/ incremental lets just do incremental. We will get much bigger bang for the buck here. I will focus mostly on helping work through the issues in envoyproxy/data-plane-api#527.


/**
* @return the connect timeout for upstream hosts that belong to this cluster.
*/
Expand Down
1 change: 1 addition & 0 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ envoy_cc_library(
":utility_lib",
"//include/envoy/config:subscription_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/upstream:cds_subscription_lib",
"//source/common/filesystem:filesystem_lib",
"//source/common/protobuf",
"@envoy_api//envoy/api/v2/core:base_cc",
Expand Down
33 changes: 32 additions & 1 deletion source/common/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "common/config/utility.h"
#include "common/filesystem/filesystem_impl.h"
#include "common/protobuf/protobuf.h"
#include "common/upstream/cds_subscription.h"

namespace Envoy {
namespace Config {
Expand Down Expand Up @@ -40,6 +41,17 @@ class SubscriptionFactory {
Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm, Runtime::RandomGenerator& random,
Stats::Scope& scope, std::function<Subscription<ResourceType>*()> rest_legacy_constructor,
const std::string& rest_method, const std::string& grpc_method) {
return subscriptionFromConfigSource(config, node, dispatcher, cm, random,
scope, rest_legacy_constructor, cm.adsMux(), rest_method, grpc_method);
}

template <class ResourceType>
static std::unique_ptr<Subscription<ResourceType>> subscriptionFromConfigSource(
const envoy::api::v2::core::ConfigSource& config, const envoy::api::v2::core::Node& node,
Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm, Runtime::RandomGenerator& random,
Stats::Scope& scope, std::function<Subscription<ResourceType>*()> rest_legacy_constructor,
Config::GrpcMux& ads_mux,
const std::string& rest_method, const std::string& grpc_method) {
std::unique_ptr<Subscription<ResourceType>> result;
SubscriptionStats stats = Utility::generateStats(scope);
switch (config.config_source_specifier_case()) {
Expand Down Expand Up @@ -79,14 +91,33 @@ class SubscriptionFactory {
break;
}
case envoy::api::v2::core::ConfigSource::kAds: {
result.reset(new GrpcMuxSubscriptionImpl<ResourceType>(cm.adsMux(), stats));
result.reset(new GrpcMuxSubscriptionImpl<ResourceType>(ads_mux, stats));
break;
}
default:
throw EnvoyException("Missing config source specifier in envoy::api::v2::core::ConfigSource");
}
return result;
}

static std::unique_ptr<Subscription<envoy::api::v2::Cluster>> cdsSubscriptionFromConfigSource(
const envoy::api::v2::core::ConfigSource& cds_config, const Optional<envoy::api::v2::core::ConfigSource>& eds_config,
const LocalInfo::LocalInfo& local_info,
Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm, Runtime::RandomGenerator& random,
Stats::Scope& scope, Config::GrpcMux& ads_mux) {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: superfluous blank line.

return subscriptionFromConfigSource<envoy::api::v2::Cluster>(
cds_config, local_info.node(), dispatcher, cm, random, scope,
[&cds_config, &eds_config, &cm, &dispatcher, &random, &scope,
&local_info]() -> Config::Subscription<envoy::api::v2::Cluster>* {
return new Upstream::CdsSubscription(Config::Utility::generateStats(scope), cds_config,
eds_config, cm, dispatcher, random, local_info);
},
ads_mux,
"envoy.api.v2.ClusterDiscoveryService.FetchClusters",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See discussion in envoyproxy/data-plane-api#524, I wonder if lazy deserves a new endpoint or not.

"envoy.api.v2.ClusterDiscoveryService.StreamClusters");

}
};

} // namespace Config
Expand Down
11 changes: 9 additions & 2 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,14 @@ envoy_cc_library(

envoy_cc_library(
name = "cluster_manager_lib",
srcs = ["cluster_manager_impl.cc"],
hdrs = ["cluster_manager_impl.h"],
srcs = [
"cluster_manager_impl.cc",
"lazy_loader_impl.cc",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to have lazy_loader_impl.* in their own lazy_load_lib target to keep fine grained dependencies for testing.

],
hdrs = [
"cluster_manager_impl.h",
"lazy_loader_impl.h"
],
deps = [
":cds_api_lib",
":load_balancer_lib",
Expand All @@ -67,6 +73,7 @@ envoy_cc_library(
"//include/envoy/ssl:context_manager_interface",
"//include/envoy/thread_local:thread_local_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//include/envoy/upstream:lazy_loader_interface",
"//source/common/common:enum_to_int",
"//source/common/common:utility_lib",
"//source/common/config:cds_json_lib",
Expand Down
16 changes: 6 additions & 10 deletions source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include "common/config/subscription_factory.h"
#include "common/config/utility.h"
#include "common/protobuf/utility.h"
#include "common/upstream/cds_subscription.h"

namespace Envoy {
namespace Upstream {
Expand All @@ -33,15 +32,8 @@ CdsApiImpl::CdsApiImpl(const envoy::api::v2::core::ConfigSource& cds_config,
Config::Utility::checkLocalInfo("cds", local_info);

subscription_ =
Config::SubscriptionFactory::subscriptionFromConfigSource<envoy::api::v2::Cluster>(
cds_config, local_info.node(), dispatcher, cm, random, *scope_,
[this, &cds_config, &eds_config, &cm, &dispatcher, &random,
&local_info]() -> Config::Subscription<envoy::api::v2::Cluster>* {
return new CdsSubscription(Config::Utility::generateStats(*scope_), cds_config,
eds_config, cm, dispatcher, random, local_info);
},
"envoy.api.v2.ClusterDiscoveryService.FetchClusters",
"envoy.api.v2.ClusterDiscoveryService.StreamClusters");
Config::SubscriptionFactory::cdsSubscriptionFromConfigSource(
cds_config, eds_config, local_info, dispatcher, cm, random, scope, cm.adsMux());
}

void CdsApiImpl::onConfigUpdate(const ResourceVector& resources) {
Expand All @@ -62,6 +54,10 @@ void CdsApiImpl::onConfigUpdate(const ResourceVector& resources) {

for (auto cluster : clusters_to_remove) {
const std::string cluster_name = cluster.first;
if (cluster.second.get().info()->addedLazily()) {
ENVOY_LOG(debug, "cds: not removing '{}' as it was added lazily", cluster_name);
continue;
}
if (cm_.removePrimaryCluster(cluster_name)) {
ENVOY_LOG(debug, "cds: remove cluster '{}'", cluster_name);
}
Expand Down
47 changes: 37 additions & 10 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "common/protobuf/utility.h"
#include "common/router/shadow_writer_impl.h"
#include "common/upstream/cds_api_impl.h"
#include "common/upstream/lazy_loader_impl.h"
#include "common/upstream/load_balancer_impl.h"
#include "common/upstream/maglev_lb.h"
#include "common/upstream/original_dst_cluster.h"
Expand Down Expand Up @@ -201,14 +202,14 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots
for (const auto& cluster : bootstrap.static_resources().clusters()) {
// First load all the primary clusters.
if (cluster.type() != envoy::api::v2::Cluster::EDS) {
loadCluster(cluster, false);
loadCluster(cluster, false, false);
}
}

for (const auto& cluster : bootstrap.static_resources().clusters()) {
// Now load all the secondary clusters.
if (cluster.type() == envoy::api::v2::Cluster::EDS) {
loadCluster(cluster, false);
loadCluster(cluster, false, false);
}
}

Expand Down Expand Up @@ -299,6 +300,11 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots
->create(),
primary_dispatcher));
}

// Lazy loading requires CDS
if (bootstrap.dynamic_resources().has_cds_config()) {
lazy_loader_.reset(new LazyLoaderImpl(bootstrap, local_info, primary_dispatcher, random, *this, stats));
}
}

ClusterManagerStats ClusterManagerImpl::generateStats(Stats::Scope& scope) {
Expand Down Expand Up @@ -337,6 +343,10 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
}

bool ClusterManagerImpl::addOrUpdatePrimaryCluster(const envoy::api::v2::Cluster& cluster) {
return addOrUpdatePrimaryCluster(cluster, false);
}

bool ClusterManagerImpl::addOrUpdatePrimaryCluster(const envoy::api::v2::Cluster& cluster, bool added_lazily) {
// First we need to see if this new config is new or an update to an existing dynamic cluster.
// We don't allow updates to statically configured clusters in the main configuration.
const std::string cluster_name = cluster.name();
Expand All @@ -351,7 +361,7 @@ bool ClusterManagerImpl::addOrUpdatePrimaryCluster(const envoy::api::v2::Cluster
init_helper_.removeCluster(*existing_cluster->second.cluster_);
}

loadCluster(cluster, true);
loadCluster(cluster, true, added_lazily);
auto& primary_cluster_entry = primary_clusters_.at(cluster_name);
ENVOY_LOG(info, "add/update cluster {}", cluster_name);
tls_->runOnAllThreads(
Expand All @@ -369,9 +379,13 @@ bool ClusterManagerImpl::addOrUpdatePrimaryCluster(const envoy::api::v2::Cluster
ENVOY_LOG(debug, "adding TLS cluster {}", new_cluster->name());
}

cluster_manager.thread_local_clusters_[new_cluster->name()].reset(
auto thread_local_cluster =
new ThreadLocalClusterManagerImpl::ClusterEntry(cluster_manager, new_cluster,
thread_aware_lb_factory));
thread_aware_lb_factory);
cluster_manager.thread_local_clusters_[new_cluster->name()].reset(thread_local_cluster);
for (auto *cb : cluster_manager.update_callbacks_) {
cb->onClusterAddOrUpdate(*thread_local_cluster);
}
});

init_helper_.addCluster(*primary_cluster_entry.cluster_);
Expand All @@ -396,14 +410,17 @@ bool ClusterManagerImpl::removePrimaryCluster(const std::string& cluster_name) {
ASSERT(cluster_manager.thread_local_clusters_.count(cluster_name) == 1);
ENVOY_LOG(debug, "removing TLS cluster {}", cluster_name);
cluster_manager.thread_local_clusters_.erase(cluster_name);
for (auto *cb : cluster_manager.update_callbacks_) {
cb->onClusterRemoval(cluster_name);
}
});

return true;
}

void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster, bool added_via_api) {
void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster, bool added_via_api, bool added_lazily) {
ClusterSharedPtr new_cluster =
factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api);
factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api, added_lazily);

if (!added_via_api) {
if (primary_clusters_.find(new_cluster->info()->name()) != primary_clusters_.end()) {
Expand Down Expand Up @@ -434,7 +451,7 @@ void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster, boo
size_t num_erased = primary_clusters_.erase(primary_cluster_reference.info()->name());
auto cluster_entry_it = primary_clusters_
.emplace(primary_cluster_reference.info()->name(),
PrimaryClusterData{MessageUtil::hash(cluster), added_via_api,
PrimaryClusterData{MessageUtil::hash(cluster), added_via_api, added_lazily,
std::move(new_cluster)})
.first;

Expand Down Expand Up @@ -548,6 +565,16 @@ const std::string ClusterManagerImpl::versionInfo() const {
return "static";
}

void ClusterManagerImpl::addClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) {
ThreadLocalClusterManagerImpl& cluster_manager = tls_->getTyped<ThreadLocalClusterManagerImpl>();
cluster_manager.update_callbacks_.insert(&cb);
}

void ClusterManagerImpl::removeClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) {
ThreadLocalClusterManagerImpl& cluster_manager = tls_->getTyped<ThreadLocalClusterManagerImpl>();
cluster_manager.update_callbacks_.erase(&cb);
}

ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl(
ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
const Optional<std::string>& local_cluster_name)
Expand Down Expand Up @@ -805,10 +832,10 @@ Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(

ClusterSharedPtr ProdClusterManagerFactory::clusterFromProto(
const envoy::api::v2::Cluster& cluster, ClusterManager& cm,
Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api) {
Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api, bool added_lazily) {
return ClusterImplBase::create(cluster, cm, stats_, tls_, dns_resolver_, ssl_context_manager_,
runtime_, random_, primary_dispatcher_, local_info_,
outlier_event_logger, added_via_api);
outlier_event_logger, added_via_api, added_lazily);
}

CdsApiPtr
Expand Down
Loading