diff --git a/include/envoy/upstream/BUILD b/include/envoy/upstream/BUILD index b093c3715cdfd..f2dcb14fbee32 100644 --- a/include/envoy/upstream/BUILD +++ b/include/envoy/upstream/BUILD @@ -11,6 +11,26 @@ envoy_package() envoy_cc_library( name = "cluster_manager_interface", hdrs = ["cluster_manager.h"], + deps = [ + ":load_balancer_interface", + ":thread_local_cluster_interface", + ":lazy_loader_interface", + ":upstream_interface", + "//include/envoy/access_log:access_log_interface", + "//include/envoy/config:grpc_mux_interface", + "//include/envoy/grpc:async_client_manager_interface", + "//include/envoy/http:async_client_interface", + "//include/envoy/http:conn_pool_interface", + "//include/envoy/local_info:local_info_interface", + "//include/envoy/runtime:runtime_interface", + "@envoy_api//envoy/api/v2:cds_cc", + "@envoy_api//envoy/config/bootstrap/v2:bootstrap_cc", + ], +) + +envoy_cc_library( + name = "lazy_loader_interface", + hdrs = ["lazy_loader.h"], deps = [ ":load_balancer_interface", ":thread_local_cluster_interface", diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index f4f022b3c401c..75f553df9f73b 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -17,11 +17,21 @@ #include "envoy/runtime/runtime.h" #include "envoy/upstream/load_balancer.h" #include "envoy/upstream/thread_local_cluster.h" +#include "envoy/upstream/lazy_loader.h" #include "envoy/upstream/upstream.h" namespace Envoy { namespace Upstream { +class ClusterUpdateCallbacks { +public: + virtual ~ClusterUpdateCallbacks() {} + + virtual void onClusterAddOrUpdate(const ThreadLocalCluster& cluster) PURE; + + virtual void onClusterRemoval(const std::string& cluster_name) PURE; +}; + /** * Manages connection pools and load balancing for upstream clusters. The cluster manager is * persistent and shared among multiple ongoing requests/connections. @@ -147,6 +157,11 @@ class ClusterManager { * @return std::string the local cluster name, or "" if no local cluster was configured. */ virtual const std::string& localClusterName() const PURE; + + virtual void addClusterUpdateCallbacks(ClusterUpdateCallbacks& callbacks) PURE; + virtual void removeClusterUpdateCallbacks(ClusterUpdateCallbacks& callbacks) PURE; + + virtual LazyLoader* lazyLoader() const PURE; }; typedef std::unique_ptr ClusterManagerPtr; @@ -213,7 +228,8 @@ class ClusterManagerFactory { virtual ClusterSharedPtr clusterFromProto(const envoy::api::v2::Cluster& cluster, ClusterManager& cm, Outlier::EventLoggerSharedPtr outlier_event_logger, - bool added_via_api) PURE; + bool added_via_api, + bool added_lazily) PURE; /** * Create a CDS API provider from configuration proto. diff --git a/include/envoy/upstream/lazy_loader.h b/include/envoy/upstream/lazy_loader.h new file mode 100644 index 0000000000000..9796621a27bc7 --- /dev/null +++ b/include/envoy/upstream/lazy_loader.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "envoy/access_log/access_log.h" +#include "envoy/api/v2/cds.pb.h" +#include "envoy/config/bootstrap/v2/bootstrap.pb.h" +#include "envoy/config/grpc_mux.h" +#include "envoy/grpc/async_client_manager.h" +#include "envoy/http/async_client.h" +#include "envoy/http/conn_pool.h" +#include "envoy/local_info/local_info.h" +#include "envoy/runtime/runtime.h" +#include "envoy/upstream/load_balancer.h" +#include "envoy/upstream/thread_local_cluster.h" +#include "envoy/upstream/upstream.h" + +namespace Envoy { +namespace Upstream { + +/** + * Lazy loading is only available when using CDS, bootstrap with static clusters will not support + * lazy loading. + */ +class LazyLoader { +public: + virtual ~LazyLoader() {} + + virtual void loadCluster(const std::string& cluster) PURE; +}; + +} +} diff --git a/include/envoy/upstream/upstream.h b/include/envoy/upstream/upstream.h index 0ab058f36f04b..8f2ea739e1668 100644 --- a/include/envoy/upstream/upstream.h +++ b/include/envoy/upstream/upstream.h @@ -377,6 +377,8 @@ class ClusterInfo { */ virtual bool addedViaApi() const PURE; + virtual bool addedLazily() const PURE; + /** * @return the connect timeout for upstream hosts that belong to this cluster. */ diff --git a/source/common/config/BUILD b/source/common/config/BUILD index 20971a5a625fd..4ae4b3b7631d7 100644 --- a/source/common/config/BUILD +++ b/source/common/config/BUILD @@ -284,6 +284,7 @@ envoy_cc_library( ":utility_lib", "//include/envoy/config:subscription_interface", "//include/envoy/upstream:cluster_manager_interface", + "//source/common/upstream:cds_subscription_lib", "//source/common/filesystem:filesystem_lib", "//source/common/protobuf", "@envoy_api//envoy/api/v2/core:base_cc", diff --git a/source/common/config/subscription_factory.h b/source/common/config/subscription_factory.h index a21ffa07ea1a2..a292fc85c3775 100644 --- a/source/common/config/subscription_factory.h +++ b/source/common/config/subscription_factory.h @@ -13,6 +13,7 @@ #include "common/config/utility.h" #include "common/filesystem/filesystem_impl.h" #include "common/protobuf/protobuf.h" +#include "common/upstream/cds_subscription.h" namespace Envoy { namespace Config { @@ -40,6 +41,17 @@ class SubscriptionFactory { Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm, Runtime::RandomGenerator& random, Stats::Scope& scope, std::function*()> rest_legacy_constructor, const std::string& rest_method, const std::string& grpc_method) { + return subscriptionFromConfigSource(config, node, dispatcher, cm, random, + scope, rest_legacy_constructor, cm.adsMux(), rest_method, grpc_method); + } + + template + static std::unique_ptr> subscriptionFromConfigSource( + const envoy::api::v2::core::ConfigSource& config, const envoy::api::v2::core::Node& node, + Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm, Runtime::RandomGenerator& random, + Stats::Scope& scope, std::function*()> rest_legacy_constructor, + Config::GrpcMux& ads_mux, + const std::string& rest_method, const std::string& grpc_method) { std::unique_ptr> result; SubscriptionStats stats = Utility::generateStats(scope); switch (config.config_source_specifier_case()) { @@ -79,7 +91,7 @@ class SubscriptionFactory { break; } case envoy::api::v2::core::ConfigSource::kAds: { - result.reset(new GrpcMuxSubscriptionImpl(cm.adsMux(), stats)); + result.reset(new GrpcMuxSubscriptionImpl(ads_mux, stats)); break; } default: @@ -87,6 +99,25 @@ class SubscriptionFactory { } return result; } + + static std::unique_ptr> cdsSubscriptionFromConfigSource( + const envoy::api::v2::core::ConfigSource& cds_config, const Optional& eds_config, + const LocalInfo::LocalInfo& local_info, + Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm, Runtime::RandomGenerator& random, + Stats::Scope& scope, Config::GrpcMux& ads_mux) { + + return subscriptionFromConfigSource( + cds_config, local_info.node(), dispatcher, cm, random, scope, + [&cds_config, &eds_config, &cm, &dispatcher, &random, &scope, + &local_info]() -> Config::Subscription* { + return new Upstream::CdsSubscription(Config::Utility::generateStats(scope), cds_config, + eds_config, cm, dispatcher, random, local_info); + }, + ads_mux, + "envoy.api.v2.ClusterDiscoveryService.FetchClusters", + "envoy.api.v2.ClusterDiscoveryService.StreamClusters"); + + } }; } // namespace Config diff --git a/source/common/upstream/BUILD b/source/common/upstream/BUILD index 4588f2d74085e..a73512e4636dc 100644 --- a/source/common/upstream/BUILD +++ b/source/common/upstream/BUILD @@ -51,8 +51,14 @@ envoy_cc_library( envoy_cc_library( name = "cluster_manager_lib", - srcs = ["cluster_manager_impl.cc"], - hdrs = ["cluster_manager_impl.h"], + srcs = [ + "cluster_manager_impl.cc", + "lazy_loader_impl.cc", + ], + hdrs = [ + "cluster_manager_impl.h", + "lazy_loader_impl.h" + ], deps = [ ":cds_api_lib", ":load_balancer_lib", @@ -67,6 +73,7 @@ envoy_cc_library( "//include/envoy/ssl:context_manager_interface", "//include/envoy/thread_local:thread_local_interface", "//include/envoy/upstream:cluster_manager_interface", + "//include/envoy/upstream:lazy_loader_interface", "//source/common/common:enum_to_int", "//source/common/common:utility_lib", "//source/common/config:cds_json_lib", diff --git a/source/common/upstream/cds_api_impl.cc b/source/common/upstream/cds_api_impl.cc index da9844f78912d..787f77fdfbb64 100644 --- a/source/common/upstream/cds_api_impl.cc +++ b/source/common/upstream/cds_api_impl.cc @@ -10,7 +10,6 @@ #include "common/config/subscription_factory.h" #include "common/config/utility.h" #include "common/protobuf/utility.h" -#include "common/upstream/cds_subscription.h" namespace Envoy { namespace Upstream { @@ -33,15 +32,8 @@ CdsApiImpl::CdsApiImpl(const envoy::api::v2::core::ConfigSource& cds_config, Config::Utility::checkLocalInfo("cds", local_info); subscription_ = - Config::SubscriptionFactory::subscriptionFromConfigSource( - cds_config, local_info.node(), dispatcher, cm, random, *scope_, - [this, &cds_config, &eds_config, &cm, &dispatcher, &random, - &local_info]() -> Config::Subscription* { - return new CdsSubscription(Config::Utility::generateStats(*scope_), cds_config, - eds_config, cm, dispatcher, random, local_info); - }, - "envoy.api.v2.ClusterDiscoveryService.FetchClusters", - "envoy.api.v2.ClusterDiscoveryService.StreamClusters"); + Config::SubscriptionFactory::cdsSubscriptionFromConfigSource( + cds_config, eds_config, local_info, dispatcher, cm, random, scope, cm.adsMux()); } void CdsApiImpl::onConfigUpdate(const ResourceVector& resources) { @@ -62,6 +54,10 @@ void CdsApiImpl::onConfigUpdate(const ResourceVector& resources) { for (auto cluster : clusters_to_remove) { const std::string cluster_name = cluster.first; + if (cluster.second.get().info()->addedLazily()) { + ENVOY_LOG(debug, "cds: not removing '{}' as it was added lazily", cluster_name); + continue; + } if (cm_.removePrimaryCluster(cluster_name)) { ENVOY_LOG(debug, "cds: remove cluster '{}'", cluster_name); } diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index f0b7cb9ce4dd3..cb465ffc55efe 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -27,6 +27,7 @@ #include "common/protobuf/utility.h" #include "common/router/shadow_writer_impl.h" #include "common/upstream/cds_api_impl.h" +#include "common/upstream/lazy_loader_impl.h" #include "common/upstream/load_balancer_impl.h" #include "common/upstream/maglev_lb.h" #include "common/upstream/original_dst_cluster.h" @@ -201,14 +202,14 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots for (const auto& cluster : bootstrap.static_resources().clusters()) { // First load all the primary clusters. if (cluster.type() != envoy::api::v2::Cluster::EDS) { - loadCluster(cluster, false); + loadCluster(cluster, false, false); } } for (const auto& cluster : bootstrap.static_resources().clusters()) { // Now load all the secondary clusters. if (cluster.type() == envoy::api::v2::Cluster::EDS) { - loadCluster(cluster, false); + loadCluster(cluster, false, false); } } @@ -299,6 +300,11 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v2::Boots ->create(), primary_dispatcher)); } + + // Lazy loading requires CDS + if (bootstrap.dynamic_resources().has_cds_config()) { + lazy_loader_.reset(new LazyLoaderImpl(bootstrap, local_info, primary_dispatcher, random, *this, stats)); + } } ClusterManagerStats ClusterManagerImpl::generateStats(Stats::Scope& scope) { @@ -337,6 +343,10 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) { } bool ClusterManagerImpl::addOrUpdatePrimaryCluster(const envoy::api::v2::Cluster& cluster) { + return addOrUpdatePrimaryCluster(cluster, false); +} + +bool ClusterManagerImpl::addOrUpdatePrimaryCluster(const envoy::api::v2::Cluster& cluster, bool added_lazily) { // First we need to see if this new config is new or an update to an existing dynamic cluster. // We don't allow updates to statically configured clusters in the main configuration. const std::string cluster_name = cluster.name(); @@ -351,7 +361,7 @@ bool ClusterManagerImpl::addOrUpdatePrimaryCluster(const envoy::api::v2::Cluster init_helper_.removeCluster(*existing_cluster->second.cluster_); } - loadCluster(cluster, true); + loadCluster(cluster, true, added_lazily); auto& primary_cluster_entry = primary_clusters_.at(cluster_name); ENVOY_LOG(info, "add/update cluster {}", cluster_name); tls_->runOnAllThreads( @@ -369,9 +379,13 @@ bool ClusterManagerImpl::addOrUpdatePrimaryCluster(const envoy::api::v2::Cluster ENVOY_LOG(debug, "adding TLS cluster {}", new_cluster->name()); } - cluster_manager.thread_local_clusters_[new_cluster->name()].reset( + auto thread_local_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(cluster_manager, new_cluster, - thread_aware_lb_factory)); + thread_aware_lb_factory); + cluster_manager.thread_local_clusters_[new_cluster->name()].reset(thread_local_cluster); + for (auto *cb : cluster_manager.update_callbacks_) { + cb->onClusterAddOrUpdate(*thread_local_cluster); + } }); init_helper_.addCluster(*primary_cluster_entry.cluster_); @@ -396,14 +410,17 @@ bool ClusterManagerImpl::removePrimaryCluster(const std::string& cluster_name) { ASSERT(cluster_manager.thread_local_clusters_.count(cluster_name) == 1); ENVOY_LOG(debug, "removing TLS cluster {}", cluster_name); cluster_manager.thread_local_clusters_.erase(cluster_name); + for (auto *cb : cluster_manager.update_callbacks_) { + cb->onClusterRemoval(cluster_name); + } }); return true; } -void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster, bool added_via_api) { +void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster, bool added_via_api, bool added_lazily) { ClusterSharedPtr new_cluster = - factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api); + factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api, added_lazily); if (!added_via_api) { if (primary_clusters_.find(new_cluster->info()->name()) != primary_clusters_.end()) { @@ -434,7 +451,7 @@ void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster, boo size_t num_erased = primary_clusters_.erase(primary_cluster_reference.info()->name()); auto cluster_entry_it = primary_clusters_ .emplace(primary_cluster_reference.info()->name(), - PrimaryClusterData{MessageUtil::hash(cluster), added_via_api, + PrimaryClusterData{MessageUtil::hash(cluster), added_via_api, added_lazily, std::move(new_cluster)}) .first; @@ -548,6 +565,16 @@ const std::string ClusterManagerImpl::versionInfo() const { return "static"; } +void ClusterManagerImpl::addClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) { + ThreadLocalClusterManagerImpl& cluster_manager = tls_->getTyped(); + cluster_manager.update_callbacks_.insert(&cb); +} + +void ClusterManagerImpl::removeClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) { + ThreadLocalClusterManagerImpl& cluster_manager = tls_->getTyped(); + cluster_manager.update_callbacks_.erase(&cb); +} + ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl( ClusterManagerImpl& parent, Event::Dispatcher& dispatcher, const Optional& local_cluster_name) @@ -805,10 +832,10 @@ Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool( ClusterSharedPtr ProdClusterManagerFactory::clusterFromProto( const envoy::api::v2::Cluster& cluster, ClusterManager& cm, - Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api) { + Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api, bool added_lazily) { return ClusterImplBase::create(cluster, cm, stats_, tls_, dns_resolver_, ssl_context_manager_, runtime_, random_, primary_dispatcher_, local_info_, - outlier_event_logger, added_via_api); + outlier_event_logger, added_via_api, added_lazily); } CdsApiPtr diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 656f6bc02e356..872bf51291054 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -52,7 +52,7 @@ class ProdClusterManagerFactory : public ClusterManagerFactory { const Network::ConnectionSocket::OptionsSharedPtr& options) override; ClusterSharedPtr clusterFromProto(const envoy::api::v2::Cluster& cluster, ClusterManager& cm, Outlier::EventLoggerSharedPtr outlier_event_logger, - bool added_via_api) override; + bool added_via_api, bool added_lazily) override; CdsApiPtr createCds(const envoy::api::v2::core::ConfigSource& cds_config, const Optional& eds_config, ClusterManager& cm) override; @@ -153,6 +153,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable callback) override { init_helper_.setInitializedCb(callback); } @@ -189,6 +190,11 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable thread_local_clusters_; std::unordered_map host_http_conn_pool_map_; + std::unordered_set update_callbacks_; const PrioritySet* local_priority_set_{}; }; struct PrimaryClusterData { - PrimaryClusterData(uint64_t config_hash, bool added_via_api, ClusterSharedPtr&& cluster) - : config_hash_(config_hash), added_via_api_(added_via_api), cluster_(std::move(cluster)) {} + PrimaryClusterData(uint64_t config_hash, bool added_via_api, bool added_lazily, ClusterSharedPtr&& cluster) + : config_hash_(config_hash), added_via_api_(added_via_api), added_lazily_(added_lazily), cluster_(std::move(cluster)) {} LoadBalancerFactorySharedPtr loadBalancerFactory() { if (thread_aware_lb_ != nullptr) { @@ -275,13 +282,14 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable lazy_loader_; }; } // namespace Upstream diff --git a/source/common/upstream/eds.cc b/source/common/upstream/eds.cc index f185f2eb9ea73..db438af0f4dfc 100644 --- a/source/common/upstream/eds.cc +++ b/source/common/upstream/eds.cc @@ -21,9 +21,9 @@ EdsClusterImpl::EdsClusterImpl(const envoy::api::v2::Cluster& cluster, Runtime:: Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, const LocalInfo::LocalInfo& local_info, ClusterManager& cm, Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random, - bool added_via_api) + bool added_via_api, bool added_lazily) : BaseDynamicClusterImpl(cluster, cm.sourceAddress(), runtime, stats, ssl_context_manager, - added_via_api), + added_via_api, added_lazily), cm_(cm), local_info_(local_info), cluster_name_(cluster.eds_cluster_config().service_name().empty() ? cluster.name() diff --git a/source/common/upstream/eds.h b/source/common/upstream/eds.h index a24c13516c957..d3450d081190f 100644 --- a/source/common/upstream/eds.h +++ b/source/common/upstream/eds.h @@ -20,7 +20,7 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, const LocalInfo::LocalInfo& local_info, ClusterManager& cm, Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random, - bool added_via_api); + bool added_via_api, bool added_lazily); const std::string versionInfo() const { return subscription_->versionInfo(); } diff --git a/source/common/upstream/lazy_loader_impl.cc b/source/common/upstream/lazy_loader_impl.cc new file mode 100644 index 0000000000000..7fa317c43d612 --- /dev/null +++ b/source/common/upstream/lazy_loader_impl.cc @@ -0,0 +1,68 @@ +#include "common/upstream/lazy_loader_impl.h" + +#include "envoy/config/bootstrap/v2/bootstrap.pb.h" +#include "envoy/upstream/lazy_loader.h" +#include "envoy/event/dispatcher.h" + +#include "common/config/subscription_factory.h" +#include "common/config/utility.h" +#include "common/config/grpc_mux_impl.h" + + +namespace Envoy { +namespace Upstream { + + +LazyLoaderImpl::LazyLoaderImpl(const envoy::config::bootstrap::v2::Bootstrap& bootstrap, + const LocalInfo::LocalInfo& local_info, + Event::Dispatcher& primary_dispatcher, + Runtime::RandomGenerator& random, + ClusterManagerImpl& cm, + Stats::Store& stats) : + primary_dispatcher_(primary_dispatcher), + cluster_manager_(cm) { + ASSERT(bootstrap.dynamic_resources().has_cds_config()); + if (bootstrap.dynamic_resources().has_ads_config()) { + // Create an ADS stream for the lazy loader. We can not reuse the cluster manager ADS stream. + ads_mux_.reset(new Config::GrpcMuxImpl( + bootstrap.node(), + Config::Utility::factoryForApiConfigSource( + cluster_manager_.grpcAsyncClientManager(), bootstrap.dynamic_resources().ads_config(), stats) + ->create(), + primary_dispatcher, + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"))); + } else { + ads_mux_.reset(&cluster_manager_.adsMux()); + } + + if (bootstrap.dynamic_resources().deprecated_v1().has_sds_config()) { + eds_config_.value(bootstrap.dynamic_resources().deprecated_v1().sds_config()); + } + + subscription_ = + Config::SubscriptionFactory::cdsSubscriptionFromConfigSource( + bootstrap.dynamic_resources().cds_config(), eds_config_, local_info, primary_dispatcher, + cluster_manager_, random, stats, *ads_mux_.get()); + + subscription_->start({}, *this); +}; + +void LazyLoaderImpl::loadCluster(const std::string& cluster) { + subscription_->updateResources({cluster}); +} + +void LazyLoaderImpl::onConfigUpdate(const ResourceVector& resources) { + primary_dispatcher_.post([this, resources]() -> void { + for (auto r : resources) { + cluster_manager_.addOrUpdatePrimaryCluster(r, true); + } + }); +} +void LazyLoaderImpl::onConfigUpdateFailed(const EnvoyException* e) { + ENVOY_LOG(warn, "gRPC update failed for lazy loader: {}", e->what()); +} + +} // namespace Upstream +} // namespace Envoy + diff --git a/source/common/upstream/lazy_loader_impl.h b/source/common/upstream/lazy_loader_impl.h new file mode 100644 index 0000000000000..66a9a551d2c3c --- /dev/null +++ b/source/common/upstream/lazy_loader_impl.h @@ -0,0 +1,42 @@ +#pragma once + +#include "envoy/config/bootstrap/v2/bootstrap.pb.h" +#include "envoy/upstream/lazy_loader.h" +#include "envoy/event/dispatcher.h" +#include "envoy/config/subscription.h" + +#include "common/config/grpc_mux_impl.h" +#include "common/upstream/cluster_manager_impl.h" + +namespace Envoy { +namespace Upstream { + +class LazyLoaderImpl : public LazyLoader, Config::SubscriptionCallbacks, Logger::Loggable { +public: + LazyLoaderImpl(const envoy::config::bootstrap::v2::Bootstrap& bootstrap, + const LocalInfo::LocalInfo& local_info, + Event::Dispatcher& primary_dispatcher, + Runtime::RandomGenerator& random, + ClusterManagerImpl& cluster_manager, + Stats::Store& statsconst); + + void loadCluster(const std::string& cluster) override; + + // From Config::SubscriptionCallbacks + void onConfigUpdate(const ResourceVector& resources) override; + void onConfigUpdateFailed(const EnvoyException* e) override; + std::string resourceName(const ProtobufWkt::Any& resource) override { + return MessageUtil::anyConvert(resource).name(); + } + +private: + Optional eds_config_; + Config::GrpcMuxPtr ads_mux_; + std::unique_ptr> subscription_; + Event::Dispatcher& primary_dispatcher_; + ClusterManagerImpl& cluster_manager_; + +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/source/common/upstream/logical_dns_cluster.cc b/source/common/upstream/logical_dns_cluster.cc index 31458633724b9..b4f7a667524d2 100644 --- a/source/common/upstream/logical_dns_cluster.cc +++ b/source/common/upstream/logical_dns_cluster.cc @@ -20,9 +20,9 @@ LogicalDnsCluster::LogicalDnsCluster(const envoy::api::v2::Cluster& cluster, Ssl::ContextManager& ssl_context_manager, Network::DnsResolverSharedPtr dns_resolver, ThreadLocal::SlotAllocator& tls, ClusterManager& cm, - Event::Dispatcher& dispatcher, bool added_via_api) + Event::Dispatcher& dispatcher, bool added_via_api, bool added_lazily) : ClusterImplBase(cluster, cm.sourceAddress(), runtime, stats, ssl_context_manager, - added_via_api), + added_via_api, added_lazily), dns_resolver_(dns_resolver), dns_refresh_rate_ms_( std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(cluster, dns_refresh_rate, 5000))), diff --git a/source/common/upstream/logical_dns_cluster.h b/source/common/upstream/logical_dns_cluster.h index 5ebb59c55b15b..ddd9e3f6b5186 100644 --- a/source/common/upstream/logical_dns_cluster.h +++ b/source/common/upstream/logical_dns_cluster.h @@ -31,7 +31,7 @@ class LogicalDnsCluster : public ClusterImplBase { LogicalDnsCluster(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, Network::DnsResolverSharedPtr dns_resolver, ThreadLocal::SlotAllocator& tls, - ClusterManager& cm, Event::Dispatcher& dispatcher, bool added_via_api); + ClusterManager& cm, Event::Dispatcher& dispatcher, bool added_via_api, bool added_lazily); ~LogicalDnsCluster(); diff --git a/source/common/upstream/original_dst_cluster.cc b/source/common/upstream/original_dst_cluster.cc index b4a0cbd80666b..7d0d1d76334e2 100644 --- a/source/common/upstream/original_dst_cluster.cc +++ b/source/common/upstream/original_dst_cluster.cc @@ -94,9 +94,9 @@ HostConstSharedPtr OriginalDstCluster::LoadBalancer::chooseHost(LoadBalancerCont OriginalDstCluster::OriginalDstCluster(const envoy::api::v2::Cluster& config, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, ClusterManager& cm, - Event::Dispatcher& dispatcher, bool added_via_api) + Event::Dispatcher& dispatcher, bool added_via_api, bool added_lazily) : ClusterImplBase(config, cm.sourceAddress(), runtime, stats, ssl_context_manager, - added_via_api), + added_via_api, added_lazily), dispatcher_(dispatcher), cleanup_interval_ms_(std::chrono::milliseconds( PROTOBUF_GET_MS_OR_DEFAULT(config, cleanup_interval, 5000))), cleanup_timer_(dispatcher.createTimer([this]() -> void { cleanup(); })) { diff --git a/source/common/upstream/original_dst_cluster.h b/source/common/upstream/original_dst_cluster.h index 0fcd723b39bf6..ab7c27055d055 100644 --- a/source/common/upstream/original_dst_cluster.h +++ b/source/common/upstream/original_dst_cluster.h @@ -24,7 +24,7 @@ class OriginalDstCluster : public ClusterImplBase { public: OriginalDstCluster(const envoy::api::v2::Cluster& config, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, - ClusterManager& cm, Event::Dispatcher& dispatcher, bool added_via_api); + ClusterManager& cm, Event::Dispatcher& dispatcher, bool added_via_api, bool added_lazily); // Upstream::Cluster InitializePhase initializePhase() const override { return InitializePhase::Primary; } diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index 0e249b1eb4df4..2cdbfe76200f8 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -112,7 +112,8 @@ ClusterLoadReportStats ClusterInfoImpl::generateLoadReportStats(Stats::Scope& sc ClusterInfoImpl::ClusterInfoImpl(const envoy::api::v2::Cluster& config, const Network::Address::InstanceConstSharedPtr source_address, Runtime::Loader& runtime, Stats::Store& stats, - Ssl::ContextManager& ssl_context_manager, bool added_via_api) + Ssl::ContextManager& ssl_context_manager, bool added_via_api, + bool added_lazily) : runtime_(runtime), name_(config.name()), type_(config.type()), max_requests_per_connection_( PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_requests_per_connection, 0)), @@ -131,7 +132,7 @@ ClusterInfoImpl::ClusterInfoImpl(const envoy::api::v2::Cluster& config, maintenance_mode_runtime_key_(fmt::format("upstream.maintenance_mode.{}", name_)), source_address_(getSourceAddress(config, source_address)), lb_ring_hash_config_(envoy::api::v2::Cluster::RingHashLbConfig(config.ring_hash_lb_config())), - ssl_context_manager_(ssl_context_manager), added_via_api_(added_via_api), + ssl_context_manager_(ssl_context_manager), added_via_api_(added_via_api), added_lazily_(added_lazily), lb_subset_(LoadBalancerSubsetInfoImpl(config.lb_subset_config())), metadata_(config.metadata()), common_lb_config_(config.common_lb_config()) { @@ -198,7 +199,7 @@ ClusterSharedPtr ClusterImplBase::create(const envoy::api::v2::Cluster& cluster, Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Outlier::EventLoggerSharedPtr outlier_event_logger, - bool added_via_api) { + bool added_via_api, bool added_lazily) { std::unique_ptr new_cluster; // We make this a shared pointer to deal with the distinct ownership @@ -221,17 +222,17 @@ ClusterSharedPtr ClusterImplBase::create(const envoy::api::v2::Cluster& cluster, switch (cluster.type()) { case envoy::api::v2::Cluster::STATIC: new_cluster.reset( - new StaticClusterImpl(cluster, runtime, stats, ssl_context_manager, cm, added_via_api)); + new StaticClusterImpl(cluster, runtime, stats, ssl_context_manager, cm, added_via_api, added_lazily)); break; case envoy::api::v2::Cluster::STRICT_DNS: new_cluster.reset(new StrictDnsClusterImpl(cluster, runtime, stats, ssl_context_manager, selected_dns_resolver, cm, dispatcher, - added_via_api)); + added_via_api, added_lazily)); break; case envoy::api::v2::Cluster::LOGICAL_DNS: new_cluster.reset(new LogicalDnsCluster(cluster, runtime, stats, ssl_context_manager, selected_dns_resolver, tls, cm, dispatcher, - added_via_api)); + added_via_api, added_lazily)); break; case envoy::api::v2::Cluster::ORIGINAL_DST: if (cluster.lb_policy() != envoy::api::v2::Cluster::ORIGINAL_DST_LB) { @@ -243,7 +244,7 @@ ClusterSharedPtr ClusterImplBase::create(const envoy::api::v2::Cluster& cluster, "cluster: cluster type 'original_dst' may not be used with lb_subset_config")); } new_cluster.reset(new OriginalDstCluster(cluster, runtime, stats, ssl_context_manager, cm, - dispatcher, added_via_api)); + dispatcher, added_via_api, added_lazily)); break; case envoy::api::v2::Cluster::EDS: if (!cluster.has_eds_cluster_config()) { @@ -252,7 +253,7 @@ ClusterSharedPtr ClusterImplBase::create(const envoy::api::v2::Cluster& cluster, // We map SDS to EDS, since EDS provides backwards compatibility with SDS. new_cluster.reset(new EdsClusterImpl(cluster, runtime, stats, ssl_context_manager, local_info, - cm, dispatcher, random, added_via_api)); + cm, dispatcher, random, added_via_api, added_lazily)); break; default: NOT_REACHED; @@ -273,9 +274,9 @@ ClusterSharedPtr ClusterImplBase::create(const envoy::api::v2::Cluster& cluster, ClusterImplBase::ClusterImplBase(const envoy::api::v2::Cluster& cluster, const Network::Address::InstanceConstSharedPtr source_address, Runtime::Loader& runtime, Stats::Store& stats, - Ssl::ContextManager& ssl_context_manager, bool added_via_api) + Ssl::ContextManager& ssl_context_manager, bool added_via_api, bool added_lazily) : runtime_(runtime), info_(new ClusterInfoImpl(cluster, source_address, runtime, stats, - ssl_context_manager, added_via_api)) { + ssl_context_manager, added_via_api, added_lazily)) { // Create the default (empty) priority set before registering callbacks to // avoid getting an update the first time it is accessed. priority_set_.getOrCreateHostSet(0); @@ -476,9 +477,9 @@ ClusterInfoImpl::ResourceManagers::load(const envoy::api::v2::Cluster& config, StaticClusterImpl::StaticClusterImpl(const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Stats::Store& stats, Ssl::ContextManager& ssl_context_manager, ClusterManager& cm, - bool added_via_api) + bool added_via_api, bool added_lazily) : ClusterImplBase(cluster, cm.sourceAddress(), runtime, stats, ssl_context_manager, - added_via_api), + added_via_api, added_lazily), initial_hosts_(new HostVector()) { for (const auto& host : cluster.hosts()) { @@ -596,9 +597,9 @@ StrictDnsClusterImpl::StrictDnsClusterImpl(const envoy::api::v2::Cluster& cluste Ssl::ContextManager& ssl_context_manager, Network::DnsResolverSharedPtr dns_resolver, ClusterManager& cm, Event::Dispatcher& dispatcher, - bool added_via_api) + bool added_via_api, bool added_lazily) : BaseDynamicClusterImpl(cluster, cm.sourceAddress(), runtime, stats, ssl_context_manager, - added_via_api), + added_via_api, added_lazily), dns_resolver_(dns_resolver), dns_refresh_rate_ms_( std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(cluster, dns_refresh_rate, 5000))) { diff --git a/source/common/upstream/upstream_impl.h b/source/common/upstream/upstream_impl.h index 95a2facdd6933..d80c55fecba39 100644 --- a/source/common/upstream/upstream_impl.h +++ b/source/common/upstream/upstream_impl.h @@ -296,13 +296,15 @@ class ClusterInfoImpl : public ClusterInfo, ClusterInfoImpl(const envoy::api::v2::Cluster& config, const Network::Address::InstanceConstSharedPtr source_address, Runtime::Loader& runtime, Stats::Store& stats, - Ssl::ContextManager& ssl_context_manager, bool added_via_api); + Ssl::ContextManager& ssl_context_manager, bool added_via_api, + bool added_lazily_); static ClusterStats generateStats(Stats::Scope& scope); static ClusterLoadReportStats generateLoadReportStats(Stats::Scope& scope); // Upstream::ClusterInfo bool addedViaApi() const override { return added_via_api_; } + bool addedLazily() const override { return added_lazily_; } const envoy::api::v2::Cluster::CommonLbConfig& lbConfig() const override { return common_lb_config_; } @@ -371,6 +373,7 @@ class ClusterInfoImpl : public ClusterInfo, Optional lb_ring_hash_config_; Ssl::ContextManager& ssl_context_manager_; const bool added_via_api_; + const bool added_lazily_; LoadBalancerSubsetInfoImpl lb_subset_; const envoy::api::v2::core::Metadata metadata_; const envoy::api::v2::Cluster::CommonLbConfig common_lb_config_; @@ -389,7 +392,7 @@ class ClusterImplBase : public Cluster, protected Logger::Loggable