diff --git a/api/envoy/api/v2/cds.proto b/api/envoy/api/v2/cds.proto index 74ae29fb11b64..6332188175645 100644 --- a/api/envoy/api/v2/cds.proto +++ b/api/envoy/api/v2/cds.proto @@ -139,7 +139,6 @@ message Cluster { // Refer to :ref:`load balancer type ` architecture // overview section for information on each type. enum LbPolicy { - // Refer to the :ref:`round robin load balancing // policy` // for an explanation. @@ -168,6 +167,11 @@ message Cluster { // Refer to the :ref:`Maglev load balancing policy` // for an explanation. MAGLEV = 5; + + // This load balancer type must be specified if the configured cluster provides a cluster + // specific load balancer. Consult the configured cluster's documentation for whether to set + // this option or not. + CLUSTER_PROVIDED = 6; } // The :ref:`load balancer type ` to use // when picking a host in the cluster. diff --git a/include/envoy/upstream/cluster_factory.h b/include/envoy/upstream/cluster_factory.h index 86ff93c2cb810..fb2ad484b5ce6 100644 --- a/include/envoy/upstream/cluster_factory.h +++ b/include/envoy/upstream/cluster_factory.h @@ -128,10 +128,11 @@ class ClusterFactory { * with the provided parameters, it should throw an EnvoyException in the case of general error. * @param cluster supplies the general protobuf configuration for the cluster. * @param context supplies the cluster's context. - * @return ClusterSharedPtr the cluster instance. + * @return a pair containing the the cluster instance as well as an option thread aware load + * balancer if this cluster has an integrated load balancer. */ - virtual ClusterSharedPtr create(const envoy::api::v2::Cluster& cluster, - ClusterFactoryContext& context) PURE; + virtual std::pair + create(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context) PURE; /** * @return std::string the identifying name for a particular implementation of a cluster factory. diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index 20a69617bfdf2..6e3fd6e36da18 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -301,10 +301,9 @@ class ClusterManagerFactory { /** * Allocate a cluster from configuration proto. */ - virtual ClusterSharedPtr clusterFromProto(const envoy::api::v2::Cluster& cluster, - ClusterManager& cm, - Outlier::EventLoggerSharedPtr outlier_event_logger, - bool added_via_api) PURE; + virtual std::pair + clusterFromProto(const envoy::api::v2::Cluster& cluster, ClusterManager& cm, + Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api) PURE; /** * Create a CDS API provider from configuration proto. diff --git a/include/envoy/upstream/load_balancer_type.h b/include/envoy/upstream/load_balancer_type.h index b7bb058c61f84..23c648918edff 100644 --- a/include/envoy/upstream/load_balancer_type.h +++ b/include/envoy/upstream/load_balancer_type.h @@ -15,7 +15,15 @@ namespace Upstream { /** * Type of load balancing to perform. */ -enum class LoadBalancerType { RoundRobin, LeastRequest, Random, RingHash, OriginalDst, Maglev }; +enum class LoadBalancerType { + RoundRobin, + LeastRequest, + Random, + RingHash, + OriginalDst, + Maglev, + ClusterProvided +}; /** * Load Balancer subset configuration. diff --git a/source/common/upstream/cluster_factory_impl.cc b/source/common/upstream/cluster_factory_impl.cc index ce4ad39dc1f9a..40b6855cf3e28 100644 --- a/source/common/upstream/cluster_factory_impl.cc +++ b/source/common/upstream/cluster_factory_impl.cc @@ -20,7 +20,7 @@ Stats::ScopePtr generateStatsScope(const envoy::api::v2::Cluster& config, Stats: } // namespace -ClusterSharedPtr ClusterFactoryImplBase::create( +std::pair ClusterFactoryImplBase::create( const envoy::api::v2::Cluster& cluster, ClusterManager& cluster_manager, Stats::Store& stats, ThreadLocal::Instance& tls, Network::DnsResolverSharedPtr dns_resolver, Ssl::ContextManager& ssl_context_manager, Runtime::Loader& runtime, @@ -90,8 +90,9 @@ ClusterFactoryImplBase::selectDnsResolver(const envoy::api::v2::Cluster& cluster return context.dnsResolver(); } -ClusterSharedPtr ClusterFactoryImplBase::create(const envoy::api::v2::Cluster& cluster, - ClusterFactoryContext& context) { +std::pair +ClusterFactoryImplBase::create(const envoy::api::v2::Cluster& cluster, + ClusterFactoryContext& context) { auto stats_scope = generateStatsScope(cluster, context.stats()); Server::Configuration::TransportSocketFactoryContextImpl factory_context( @@ -99,7 +100,7 @@ ClusterSharedPtr ClusterFactoryImplBase::create(const envoy::api::v2::Cluster& c context.localInfo(), context.dispatcher(), context.random(), context.stats(), context.singletonManager(), context.tls(), context.api()); - ClusterImplBaseSharedPtr new_cluster = + std::pair new_cluster_pair = createClusterImpl(cluster, context, factory_context, std::move(stats_scope)); if (!cluster.health_checks().empty()) { @@ -107,16 +108,16 @@ ClusterSharedPtr ClusterFactoryImplBase::create(const envoy::api::v2::Cluster& c if (cluster.health_checks().size() != 1) { throw EnvoyException("Multiple health checks not supported"); } else { - new_cluster->setHealthChecker(HealthCheckerFactory::create( - cluster.health_checks()[0], *new_cluster, context.runtime(), context.random(), + new_cluster_pair.first->setHealthChecker(HealthCheckerFactory::create( + cluster.health_checks()[0], *new_cluster_pair.first, context.runtime(), context.random(), context.dispatcher(), context.logManager())); } } - new_cluster->setOutlierDetector(Outlier::DetectorImplFactory::createForCluster( - *new_cluster, cluster, context.dispatcher(), context.runtime(), + new_cluster_pair.first->setOutlierDetector(Outlier::DetectorImplFactory::createForCluster( + *new_cluster_pair.first, cluster, context.dispatcher(), context.runtime(), context.outlierEventLogger())); - return new_cluster; + return new_cluster_pair; } } // namespace Upstream diff --git a/source/common/upstream/cluster_factory_impl.h b/source/common/upstream/cluster_factory_impl.h index 6777a2ba6b1ec..6dd3dd90c18d0 100644 --- a/source/common/upstream/cluster_factory_impl.h +++ b/source/common/upstream/cluster_factory_impl.h @@ -110,7 +110,7 @@ class ClusterFactoryImplBase : public ClusterFactory { /** * Static method to get the registered cluster factory and create an instance of cluster. */ - static ClusterSharedPtr + static std::pair create(const envoy::api::v2::Cluster& cluster, ClusterManager& cluster_manager, Stats::Store& stats, ThreadLocal::Instance& tls, Network::DnsResolverSharedPtr dns_resolver, Ssl::ContextManager& ssl_context_manager, @@ -126,8 +126,8 @@ class ClusterFactoryImplBase : public ClusterFactory { ClusterFactoryContext& context); // Upstream::ClusterFactory - ClusterSharedPtr create(const envoy::api::v2::Cluster& cluster, - ClusterFactoryContext& context) override; + std::pair + create(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context) override; std::string name() override { return name_; } protected: @@ -137,7 +137,7 @@ class ClusterFactoryImplBase : public ClusterFactory { /** * Create an instance of ClusterImplBase. */ - virtual ClusterImplBaseSharedPtr + virtual std::pair createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) PURE; @@ -161,7 +161,7 @@ template class ConfigurableClusterFactoryBase : public Clust ConfigurableClusterFactoryBase(const std::string& name) : ClusterFactoryImplBase(name) {} private: - virtual ClusterImplBaseSharedPtr + virtual std::pair createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) override { @@ -173,7 +173,7 @@ template class ConfigurableClusterFactoryBase : public Clust context, socket_factory_context, std::move(stats_scope)); } - virtual ClusterImplBaseSharedPtr createClusterWithConfig( + virtual std::pair createClusterWithConfig( const envoy::api::v2::Cluster& cluster, const ConfigProto& proto_config, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index e2d46ab64ab7e..5c1655a8fbe2d 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -580,8 +580,10 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) { void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster, const std::string& version_info, bool added_via_api, ClusterMap& cluster_map) { - ClusterSharedPtr new_cluster = + std::pair new_cluster_pair = factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api); + auto& new_cluster = new_cluster_pair.first; + Cluster& cluster_reference = *new_cluster; if (!added_via_api) { if (cluster_map.find(new_cluster->info()->name()) != cluster_map.end()) { @@ -590,7 +592,21 @@ void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster, } } - Cluster& cluster_reference = *new_cluster; + if (cluster_reference.info()->lbType() == LoadBalancerType::ClusterProvided && + new_cluster_pair.second == nullptr) { + throw EnvoyException(fmt::format("cluster manager: cluster provided LB specified but cluster " + "'{}' did not provide one. Check cluster documentation.", + new_cluster->info()->name())); + } + + if (cluster_reference.info()->lbType() != LoadBalancerType::ClusterProvided && + new_cluster_pair.second != nullptr) { + throw EnvoyException( + fmt::format("cluster manager: cluster provided LB not specified but cluster " + "'{}' provided one. Check cluster documentation.", + new_cluster->info()->name())); + } + if (new_cluster->healthChecker() != nullptr) { new_cluster->healthChecker()->addHostCheckCompleteCb( [this](HostSharedPtr host, HealthTransition changed_state) { @@ -625,6 +641,8 @@ void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster, cluster_reference.prioritySet(), cluster_reference.info()->stats(), cluster_reference.info()->statsScope(), runtime_, random_, cluster_reference.info()->lbConfig()); + } else if (cluster_reference.info()->lbType() == LoadBalancerType::ClusterProvided) { + cluster_entry_it->second->thread_aware_lb_ = std::move(new_cluster_pair.second); } updateGauges(); @@ -1076,6 +1094,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry( parent.parent_.random_, cluster->lbConfig()); break; } + case LoadBalancerType::ClusterProvided: case LoadBalancerType::RingHash: case LoadBalancerType::Maglev: { ASSERT(lb_factory_ != nullptr); @@ -1223,7 +1242,7 @@ Tcp::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateTcpConnPool( new Tcp::ConnPoolImpl(dispatcher, host, priority, options, transport_socket_options)}; } -ClusterSharedPtr ProdClusterManagerFactory::clusterFromProto( +std::pair ProdClusterManagerFactory::clusterFromProto( const envoy::api::v2::Cluster& cluster, ClusterManager& cm, Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api) { return ClusterFactoryImplBase::create( diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index a7ce5cb7b5bd4..f540400dd409b 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -62,9 +62,9 @@ class ProdClusterManagerFactory : public ClusterManagerFactory { ResourcePriority priority, const Network::ConnectionSocket::OptionsSharedPtr& options, Network::TransportSocketOptionsSharedPtr transport_socket_options) override; - ClusterSharedPtr clusterFromProto(const envoy::api::v2::Cluster& cluster, ClusterManager& cm, - Outlier::EventLoggerSharedPtr outlier_event_logger, - bool added_via_api) override; + std::pair + clusterFromProto(const envoy::api::v2::Cluster& cluster, ClusterManager& cm, + Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api) override; CdsApiPtr createCds(const envoy::api::v2::core::ConfigSource& cds_config, ClusterManager& cm) override; Secret::SecretManager& secretManager() override { return secret_manager_; } diff --git a/source/common/upstream/eds.cc b/source/common/upstream/eds.cc index 4405dbbaeafc1..57d77c055a620 100644 --- a/source/common/upstream/eds.cc +++ b/source/common/upstream/eds.cc @@ -258,7 +258,8 @@ void EdsClusterImpl::onConfigUpdateFailed(const EnvoyException* e) { onPreInitComplete(); } -ClusterImplBaseSharedPtr EdsClusterFactory::createClusterImpl( +std::pair +EdsClusterFactory::createClusterImpl( const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) { @@ -266,8 +267,10 @@ ClusterImplBaseSharedPtr EdsClusterFactory::createClusterImpl( throw EnvoyException("cannot create an EDS cluster without an EDS config"); } - return std::make_unique(cluster, context.runtime(), socket_factory_context, - std::move(stats_scope), context.addedViaApi()); + return std::make_pair( + std::make_shared(cluster, context.runtime(), socket_factory_context, + std::move(stats_scope), context.addedViaApi()), + nullptr); } /** diff --git a/source/common/upstream/eds.h b/source/common/upstream/eds.h index f1cae5638829c..a1780871cf3f8 100644 --- a/source/common/upstream/eds.h +++ b/source/common/upstream/eds.h @@ -82,7 +82,7 @@ class EdsClusterFactory : public ClusterFactoryImplBase { EdsClusterFactory() : ClusterFactoryImplBase(Extensions::Clusters::ClusterTypes::get().Eds) {} private: - ClusterImplBaseSharedPtr + std::pair createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) override; diff --git a/source/common/upstream/logical_dns_cluster.cc b/source/common/upstream/logical_dns_cluster.cc index fa3bf93e2b885..41658a6b43c62 100644 --- a/source/common/upstream/logical_dns_cluster.cc +++ b/source/common/upstream/logical_dns_cluster.cc @@ -143,15 +143,17 @@ Upstream::Host::CreateConnectionData LogicalDnsCluster::LogicalHost::createConne shared_from_this(), parent_.symbolTable())}}; } -ClusterImplBaseSharedPtr LogicalDnsClusterFactory::createClusterImpl( +std::pair +LogicalDnsClusterFactory::createClusterImpl( const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) { auto selected_dns_resolver = selectDnsResolver(cluster, context); - return std::make_unique(cluster, context.runtime(), selected_dns_resolver, - context.tls(), socket_factory_context, - std::move(stats_scope), context.addedViaApi()); + return std::make_pair(std::make_shared( + cluster, context.runtime(), selected_dns_resolver, context.tls(), + socket_factory_context, std::move(stats_scope), context.addedViaApi()), + nullptr); } /** diff --git a/source/common/upstream/logical_dns_cluster.h b/source/common/upstream/logical_dns_cluster.h index 19fd052b3a38d..09173b89584bc 100644 --- a/source/common/upstream/logical_dns_cluster.h +++ b/source/common/upstream/logical_dns_cluster.h @@ -168,7 +168,7 @@ class LogicalDnsClusterFactory : public ClusterFactoryImplBase { : ClusterFactoryImplBase(Extensions::Clusters::ClusterTypes::get().LogicalDns) {} private: - ClusterImplBaseSharedPtr + std::pair createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) override; diff --git a/source/common/upstream/original_dst_cluster.cc b/source/common/upstream/original_dst_cluster.cc index c5c2dfe48a208..ae64c4e2c8ada 100644 --- a/source/common/upstream/original_dst_cluster.cc +++ b/source/common/upstream/original_dst_cluster.cc @@ -186,7 +186,8 @@ void OriginalDstCluster::cleanup() { cleanup_timer_->enableTimer(cleanup_interval_ms_); } -ClusterImplBaseSharedPtr OriginalDstClusterFactory::createClusterImpl( +std::pair +OriginalDstClusterFactory::createClusterImpl( const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) { @@ -199,8 +200,13 @@ ClusterImplBaseSharedPtr OriginalDstClusterFactory::createClusterImpl( fmt::format("cluster: cluster type 'original_dst' may not be used with lb_subset_config")); } - return std::make_unique(cluster, context.runtime(), socket_factory_context, - std::move(stats_scope), context.addedViaApi()); + // TODO(mattklein123): The original DST load balancer type should be deprecated and instead + // the cluster should directly supply the load balancer. This will remove + // a special case and allow this cluster to be compiled out as an extension. + return std::make_pair( + std::make_shared(cluster, context.runtime(), socket_factory_context, + std::move(stats_scope), context.addedViaApi()), + nullptr); } /** diff --git a/source/common/upstream/original_dst_cluster.h b/source/common/upstream/original_dst_cluster.h index a061e8ee18475..64226fc71db4f 100644 --- a/source/common/upstream/original_dst_cluster.h +++ b/source/common/upstream/original_dst_cluster.h @@ -125,7 +125,7 @@ class OriginalDstClusterFactory : public ClusterFactoryImplBase { : ClusterFactoryImplBase(Extensions::Clusters::ClusterTypes::get().OriginalDst) {} private: - ClusterImplBaseSharedPtr + std::pair createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) override; diff --git a/source/common/upstream/static_cluster.cc b/source/common/upstream/static_cluster.cc index c5586e4ee8e08..9ed6044d990df 100644 --- a/source/common/upstream/static_cluster.cc +++ b/source/common/upstream/static_cluster.cc @@ -50,12 +50,15 @@ void StaticClusterImpl::startPreInit() { onPreInitComplete(); } -ClusterImplBaseSharedPtr StaticClusterFactory::createClusterImpl( +std::pair +StaticClusterFactory::createClusterImpl( const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) { - return std::make_unique(cluster, context.runtime(), socket_factory_context, - std::move(stats_scope), context.addedViaApi()); + return std::make_pair( + std::make_shared(cluster, context.runtime(), socket_factory_context, + std::move(stats_scope), context.addedViaApi()), + nullptr); } /** diff --git a/source/common/upstream/static_cluster.h b/source/common/upstream/static_cluster.h index 984fd5fd7c948..f8a440d72f126 100644 --- a/source/common/upstream/static_cluster.h +++ b/source/common/upstream/static_cluster.h @@ -36,7 +36,7 @@ class StaticClusterFactory : public ClusterFactoryImplBase { : ClusterFactoryImplBase(Extensions::Clusters::ClusterTypes::get().Static) {} private: - ClusterImplBaseSharedPtr + std::pair createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) override; diff --git a/source/common/upstream/strict_dns_cluster.cc b/source/common/upstream/strict_dns_cluster.cc index 8914eb6553583..f68c748772261 100644 --- a/source/common/upstream/strict_dns_cluster.cc +++ b/source/common/upstream/strict_dns_cluster.cc @@ -134,15 +134,17 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() { }); } -ClusterImplBaseSharedPtr StrictDnsClusterFactory::createClusterImpl( +std::pair +StrictDnsClusterFactory::createClusterImpl( const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) { auto selected_dns_resolver = selectDnsResolver(cluster, context); - return std::make_unique(cluster, context.runtime(), selected_dns_resolver, - socket_factory_context, std::move(stats_scope), - context.addedViaApi()); + return std::make_pair(std::make_shared( + cluster, context.runtime(), selected_dns_resolver, + socket_factory_context, std::move(stats_scope), context.addedViaApi()), + nullptr); } /** diff --git a/source/common/upstream/strict_dns_cluster.h b/source/common/upstream/strict_dns_cluster.h index 24496dd5a79f3..63f1a4bd647fb 100644 --- a/source/common/upstream/strict_dns_cluster.h +++ b/source/common/upstream/strict_dns_cluster.h @@ -65,7 +65,7 @@ class StrictDnsClusterFactory : public ClusterFactoryImplBase { : ClusterFactoryImplBase(Extensions::Clusters::ClusterTypes::get().StrictDns) {} private: - ClusterImplBaseSharedPtr + std::pair createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) override; diff --git a/source/common/upstream/subset_lb.cc b/source/common/upstream/subset_lb.cc index 56e048d265fe7..f259d930f3d55 100644 --- a/source/common/upstream/subset_lb.cc +++ b/source/common/upstream/subset_lb.cc @@ -501,6 +501,9 @@ SubsetLoadBalancer::PrioritySubsetImpl::PrioritySubsetImpl(const SubsetLoadBalan break; case LoadBalancerType::OriginalDst: + case LoadBalancerType::ClusterProvided: + // LoadBalancerType::OriginalDst is blocked in the factory. LoadBalancerType::ClusterProvided + // is impossible because the subset LB returns a null load balancer from its factory. NOT_REACHED_GCOVR_EXCL_LINE; } diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index a1a6c9455cb5c..6b534d93d62f6 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -594,6 +594,9 @@ ClusterInfoImpl::ClusterInfoImpl(const envoy::api::v2::Cluster& config, case envoy::api::v2::Cluster::MAGLEV: lb_type_ = LoadBalancerType::Maglev; break; + case envoy::api::v2::Cluster::CLUSTER_PROVIDED: + lb_type_ = LoadBalancerType::ClusterProvided; + break; default: NOT_REACHED_GCOVR_EXCL_LINE; } diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 4a141e04a89e4..fcbc05e18864c 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -308,7 +308,8 @@ void RedisCluster::RedisDiscoverySession::onFailure() { RedisCluster::ClusterSlotsRequest RedisCluster::ClusterSlotsRequest::instance_; -Upstream::ClusterImplBaseSharedPtr RedisClusterFactory::createClusterWithConfig( +std::pair +RedisClusterFactory::createClusterWithConfig( const envoy::api::v2::Cluster& cluster, const envoy::config::cluster::redis::RedisClusterConfig& proto_config, Upstream::ClusterFactoryContext& context, @@ -318,11 +319,15 @@ Upstream::ClusterImplBaseSharedPtr RedisClusterFactory::createClusterWithConfig( cluster.cluster_type().name() != Extensions::Clusters::ClusterTypes::get().Redis) { throw EnvoyException("Redis cluster can only created with redis cluster type"); } - return std::make_shared( - cluster, proto_config, NetworkFilters::Common::Redis::Client::ClientFactoryImpl::instance_, - context.clusterManager(), context.runtime(), context.api(), - selectDnsResolver(cluster, context), socket_factory_context, std::move(stats_scope), - context.addedViaApi()); + // TODO(Henry): Implement a thread aware load balancer for Redis Cluster. This can come from + // inside the created cluster. + return std::make_pair(std::make_shared( + cluster, proto_config, + NetworkFilters::Common::Redis::Client::ClientFactoryImpl::instance_, + context.clusterManager(), context.runtime(), context.api(), + selectDnsResolver(cluster, context), socket_factory_context, + std::move(stats_scope), context.addedViaApi()), + nullptr); } REGISTER_FACTORY(RedisClusterFactory, Upstream::ClusterFactory); diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h index e0872bb27b698..7ac7cc071cca6 100644 --- a/source/extensions/clusters/redis/redis_cluster.h +++ b/source/extensions/clusters/redis/redis_cluster.h @@ -278,7 +278,8 @@ class RedisClusterFactory : public Upstream::ConfigurableClusterFactoryBase< private: friend class RedisClusterTest; - Upstream::ClusterImplBaseSharedPtr createClusterWithConfig( + std::pair + createClusterWithConfig( const envoy::api::v2::Cluster& cluster, const envoy::config::cluster::redis::RedisClusterConfig& proto_config, Upstream::ClusterFactoryContext& context, diff --git a/source/server/server.cc b/source/server/server.cc index 70a763e558a98..15d41ccc07bfe 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -594,15 +594,15 @@ void InstanceImpl::shutdownAdmin() { ServerLifecycleNotifier::HandlePtr InstanceImpl::registerCallback(Stage stage, StageCallback callback) { auto& callbacks = stage_callbacks_[stage]; - return absl::make_unique>(callbacks, callback); + return std::make_unique>(callbacks, callback); } ServerLifecycleNotifier::HandlePtr InstanceImpl::registerCallback(Stage stage, StageCallbackWithCompletion callback) { ASSERT(stage == Stage::ShutdownExit); auto& callbacks = stage_completable_callbacks_[stage]; - return absl::make_unique>(callbacks, - callback); + return std::make_unique>(callbacks, + callback); } void InstanceImpl::notifyCallbacksForStage(Stage stage, Event::PostCb completion_cb) { diff --git a/test/common/upstream/cluster_factory_impl_test.cc b/test/common/upstream/cluster_factory_impl_test.cc index 6ed080956e141..b21de07d9d214 100644 --- a/test/common/upstream/cluster_factory_impl_test.cc +++ b/test/common/upstream/cluster_factory_impl_test.cc @@ -38,13 +38,14 @@ class TestStaticClusterFactory : public ClusterFactoryImplBase { public: TestStaticClusterFactory() : ClusterFactoryImplBase("envoy.clusters.test_static") {} - ClusterImplBaseSharedPtr + std::pair createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) override { - return std::make_unique(cluster, context.runtime(), socket_factory_context, - std::move(stats_scope), context.addedViaApi(), 1, - "127.0.0.1", 80); + return std::make_pair(std::make_shared( + cluster, context.runtime(), socket_factory_context, + std::move(stats_scope), context.addedViaApi(), 1, "127.0.0.1", 80), + nullptr); } }; @@ -90,10 +91,11 @@ TEST_F(TestStaticClusterImplTest, CreateWithoutConfig) { Registry::InjectFactory registered_factory(factory); const envoy::api::v2::Cluster cluster_config = parseClusterFromV2Yaml(yaml); - auto cluster = ClusterFactoryImplBase::create( + auto create_result = ClusterFactoryImplBase::create( cluster_config, cm_, stats_, tls_, dns_resolver_, ssl_context_manager_, runtime_, random_, dispatcher_, log_manager_, local_info_, admin_, singleton_manager_, std::move(outlier_event_logger_), false, *api_); + auto cluster = create_result.first; cluster->initialize([] {}); EXPECT_EQ(1UL, cluster->prioritySet().hostSetsPerPriority()[1]->healthyHosts().size()); @@ -130,10 +132,11 @@ TEST_F(TestStaticClusterImplTest, CreateWithStructConfig) { )EOF"; const envoy::api::v2::Cluster cluster_config = parseClusterFromV2Yaml(yaml); - auto cluster = ClusterFactoryImplBase::create( + auto create_result = ClusterFactoryImplBase::create( cluster_config, cm_, stats_, tls_, dns_resolver_, ssl_context_manager_, runtime_, random_, dispatcher_, log_manager_, local_info_, admin_, singleton_manager_, std::move(outlier_event_logger_), false, *api_); + auto cluster = create_result.first; cluster->initialize([] {}); EXPECT_EQ(1UL, cluster->prioritySet().hostSetsPerPriority()[10]->healthyHosts().size()); @@ -168,10 +171,11 @@ TEST_F(TestStaticClusterImplTest, CreateWithTypedConfig) { )EOF"; const envoy::api::v2::Cluster cluster_config = parseClusterFromV2Yaml(yaml); - auto cluster = ClusterFactoryImplBase::create( + auto create_result = ClusterFactoryImplBase::create( cluster_config, cm_, stats_, tls_, dns_resolver_, ssl_context_manager_, runtime_, random_, dispatcher_, log_manager_, local_info_, admin_, singleton_manager_, std::move(outlier_event_logger_), false, *api_); + auto cluster = create_result.first; cluster->initialize([] {}); EXPECT_EQ(1UL, cluster->prioritySet().hostSetsPerPriority()[10]->healthyHosts().size()); @@ -206,11 +210,10 @@ TEST_F(TestStaticClusterImplTest, UnsupportedClusterType) { EXPECT_THROW_WITH_MESSAGE( { const envoy::api::v2::Cluster cluster_config = parseClusterFromV2Yaml(yaml); - auto cluster = ClusterFactoryImplBase::create( - cluster_config, cm_, stats_, tls_, dns_resolver_, ssl_context_manager_, runtime_, - random_, dispatcher_, log_manager_, local_info_, admin_, singleton_manager_, - std::move(outlier_event_logger_), false, *api_); - cluster->initialize([] {}); + ClusterFactoryImplBase::create(cluster_config, cm_, stats_, tls_, dns_resolver_, + ssl_context_manager_, runtime_, random_, dispatcher_, + log_manager_, local_info_, admin_, singleton_manager_, + std::move(outlier_event_logger_), false, *api_); }, EnvoyException, "Didn't find a registered cluster factory implementation for name: " diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 9f3c6aae2bf02..d45d857b0e5ef 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -61,14 +61,17 @@ class TestClusterManagerFactory : public ClusterManagerFactory { public: TestClusterManagerFactory() : api_(Api::createApiForTest(stats_)) { ON_CALL(*this, clusterFromProto_(_, _, _, _)) - .WillByDefault(Invoke([&](const envoy::api::v2::Cluster& cluster, ClusterManager& cm, - Outlier::EventLoggerSharedPtr outlier_event_logger, - bool added_via_api) -> ClusterSharedPtr { - return ClusterFactoryImplBase::create( - cluster, cm, stats_, tls_, dns_resolver_, ssl_context_manager_, runtime_, random_, - dispatcher_, log_manager_, local_info_, admin_, singleton_manager_, - outlier_event_logger, added_via_api, *api_); - })); + .WillByDefault(Invoke( + [&](const envoy::api::v2::Cluster& cluster, ClusterManager& cm, + Outlier::EventLoggerSharedPtr outlier_event_logger, + bool added_via_api) -> std::pair { + auto result = ClusterFactoryImplBase::create( + cluster, cm, stats_, tls_, dns_resolver_, ssl_context_manager_, runtime_, random_, + dispatcher_, log_manager_, local_info_, admin_, singleton_manager_, + outlier_event_logger, added_via_api, *api_); + // Convert from load balancer unique_ptr -> raw pointer -> unique_ptr. + return std::make_pair(result.first, result.second.release()); + })); } Http::ConnectionPool::InstancePtr @@ -84,10 +87,12 @@ class TestClusterManagerFactory : public ClusterManagerFactory { return Tcp::ConnectionPool::InstancePtr{allocateTcpConnPool_(host)}; } - ClusterSharedPtr clusterFromProto(const envoy::api::v2::Cluster& cluster, ClusterManager& cm, - Outlier::EventLoggerSharedPtr outlier_event_logger, - bool added_via_api) override { - return clusterFromProto_(cluster, cm, outlier_event_logger, added_via_api); + std::pair + clusterFromProto(const envoy::api::v2::Cluster& cluster, ClusterManager& cm, + Outlier::EventLoggerSharedPtr outlier_event_logger, + bool added_via_api) override { + auto result = clusterFromProto_(cluster, cm, outlier_event_logger, added_via_api); + return std::make_pair(result.first, ThreadAwareLoadBalancerPtr(result.second)); } CdsApiPtr createCds(const envoy::api::v2::core::ConfigSource&, ClusterManager&) override { @@ -108,9 +113,9 @@ class TestClusterManagerFactory : public ClusterManagerFactory { Network::ConnectionSocket::OptionsSharedPtr)); MOCK_METHOD1(allocateTcpConnPool_, Tcp::ConnectionPool::Instance*(HostConstSharedPtr host)); MOCK_METHOD4(clusterFromProto_, - ClusterSharedPtr(const envoy::api::v2::Cluster& cluster, ClusterManager& cm, - Outlier::EventLoggerSharedPtr outlier_event_logger, - bool added_via_api)); + std::pair( + const envoy::api::v2::Cluster& cluster, ClusterManager& cm, + Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api)); MOCK_METHOD0(createCds_, CdsApi*()); Stats::IsolatedStoreImpl stats_; @@ -702,6 +707,35 @@ TEST_F(ClusterManagerImplTest, EdsClustersRequireEdsConfig) { "cannot create an EDS cluster without an EDS config"); } +// Verify that specifying a cluster provided LB, but the cluster doesn't provide one is an error. +TEST_F(ClusterManagerImplTest, ClusterProvidedLbNoLb) { + const std::string json = fmt::sprintf("{\"static_resources\":{%s}}", + clustersJson({defaultStaticClusterJson("cluster_0")})); + + std::shared_ptr cluster1(new NiceMock()); + cluster1->info_->name_ = "cluster_0"; + cluster1->info_->lb_type_ = LoadBalancerType::ClusterProvided; + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); + EXPECT_THROW_WITH_MESSAGE(create(parseBootstrapFromV2Json(json)), EnvoyException, + "cluster manager: cluster provided LB specified but cluster " + "'cluster_0' did not provide one. Check cluster documentation."); +} + +// Verify that not specifying a cluster provided LB, but the cluster does provide one is an error. +TEST_F(ClusterManagerImplTest, ClusterProvidedLbNotConfigured) { + const std::string json = fmt::sprintf("{\"static_resources\":{%s}}", + clustersJson({defaultStaticClusterJson("cluster_0")})); + + std::shared_ptr cluster1(new NiceMock()); + cluster1->info_->name_ = "cluster_0"; + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, new MockThreadAwareLoadBalancer()))); + EXPECT_THROW_WITH_MESSAGE(create(parseBootstrapFromV2Json(json)), EnvoyException, + "cluster manager: cluster provided LB not specified but cluster " + "'cluster_0' provided one. Check cluster documentation."); +} + class ClusterManagerImplThreadAwareLbTest : public ClusterManagerImplTest { public: void doTest(LoadBalancerType lb_type) { @@ -714,7 +748,8 @@ class ClusterManagerImplThreadAwareLbTest : public ClusterManagerImplTest { cluster1->info_->lb_type_ = lb_type; InSequence s; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster1)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); ON_CALL(*cluster1, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); create(parseBootstrapFromV2Json(json)); @@ -927,11 +962,14 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) { // This part tests static init. InSequence s; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cds_cluster)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cds_cluster, nullptr))); ON_CALL(*cds_cluster, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster1)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); ON_CALL(*cluster1, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster2)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster2, nullptr))); ON_CALL(*cluster2, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); EXPECT_CALL(factory_, createCds_()).WillOnce(Return(cds)); EXPECT_CALL(*cds, setInitializedCb(_)); @@ -958,18 +996,21 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) { std::shared_ptr cluster5(new NiceMock()); cluster5->info_->name_ = "cluster5"; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster3)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster3, nullptr))); ON_CALL(*cluster3, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster3"), "version1", dummyWarmingCb); - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster4)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster4, nullptr))); ON_CALL(*cluster4, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); EXPECT_CALL(*cluster4, initialize(_)); cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster4"), "version2", dummyWarmingCb); - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster5)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster5, nullptr))); ON_CALL(*cluster5, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster5"), "version3", dummyWarmingCb); @@ -1089,7 +1130,8 @@ TEST_F(ClusterManagerImplTest, DynamicRemoveWithLocalCluster) { std::shared_ptr foo(new NiceMock()); foo->info_->name_ = "foo"; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, false)).WillOnce(Return(foo)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, false)) + .WillOnce(Return(std::make_pair(foo, nullptr))); ON_CALL(*foo, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); EXPECT_CALL(*foo, initialize(_)); @@ -1100,7 +1142,8 @@ TEST_F(ClusterManagerImplTest, DynamicRemoveWithLocalCluster) { // cluster in its load balancer. std::shared_ptr cluster1(new NiceMock()); cluster1->info_->name_ = "cluster1"; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, true)).WillOnce(Return(cluster1)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, true)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); ON_CALL(*cluster1, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); EXPECT_CALL(*cluster1, initialize(_)); cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster1"), "", dummyWarmingCb); @@ -1138,7 +1181,8 @@ TEST_F(ClusterManagerImplTest, RemoveWarmingCluster) { cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); }); std::shared_ptr cluster1(new NiceMock()); - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster1)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); EXPECT_CALL(*cluster1, initializePhase()).Times(0); EXPECT_CALL(*cluster1, initialize(_)); EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(defaultStaticCluster("fake_cluster"), "version1", @@ -1177,7 +1221,8 @@ TEST_F(ClusterManagerImplTest, ShutdownWithWarming) { cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); }); std::shared_ptr cluster1(new NiceMock()); - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster1)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); EXPECT_CALL(*cluster1, initializePhase()).Times(0); EXPECT_CALL(*cluster1, initialize(_)); EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(defaultStaticCluster("fake_cluster"), "version1", @@ -1205,7 +1250,8 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { int warming_cb_calls = 0; ClusterManager::ClusterWarmingState last_warming_state = ClusterManager::ClusterWarmingState::Starting; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster1)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); EXPECT_CALL(*cluster1, initializePhase()).Times(0); EXPECT_CALL(*cluster1, initialize(_)); EXPECT_CALL(*callbacks, onClusterAddOrUpdate(_)).Times(1); @@ -1239,7 +1285,8 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) { std::shared_ptr cluster2(new NiceMock()); cluster2->prioritySet().getMockHostSet(0)->hosts_ = { makeTestHost(cluster2->info_, "tcp://127.0.0.1:80")}; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster2)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster2, nullptr))); EXPECT_CALL(*cluster2, initializePhase()).Times(0); EXPECT_CALL(*cluster2, initialize(_)) .WillOnce(Invoke([cluster1](std::function initialize_callback) { @@ -1305,7 +1352,8 @@ TEST_F(ClusterManagerImplTest, addOrUpdateClusterStaticExists) { clustersJson({defaultStaticClusterJson("fake_cluster")})); std::shared_ptr cluster1(new NiceMock()); InSequence s; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster1)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); ON_CALL(*cluster1, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); EXPECT_CALL(*cluster1, initialize(_)); @@ -1334,7 +1382,8 @@ TEST_F(ClusterManagerImplTest, HostsPostedToTlsCluster) { clustersJson({defaultStaticClusterJson("fake_cluster")})); std::shared_ptr cluster1(new NiceMock()); InSequence s; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster1)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); ON_CALL(*cluster1, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); EXPECT_CALL(*cluster1, initialize(_)); @@ -1397,7 +1446,8 @@ TEST_F(ClusterManagerImplTest, CloseHttpConnectionsOnHealthFailure) { { InSequence s; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster1)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); EXPECT_CALL(health_checker, addHostCheckCompleteCb(_)); EXPECT_CALL(outlier_detector, addChangedStateCb(_)); EXPECT_CALL(*cluster1, initialize(_)) @@ -1459,7 +1509,8 @@ TEST_F(ClusterManagerImplTest, CloseTcpConnectionPoolsOnHealthFailure) { { InSequence s; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster1)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); EXPECT_CALL(health_checker, addHostCheckCompleteCb(_)); EXPECT_CALL(outlier_detector, addChangedStateCb(_)); EXPECT_CALL(*cluster1, initialize(_)) @@ -1531,7 +1582,8 @@ TEST_F(ClusterManagerImplTest, CloseTcpConnectionsOnHealthFailure) { { InSequence s; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster1)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); EXPECT_CALL(health_checker, addHostCheckCompleteCb(_)); EXPECT_CALL(outlier_detector, addChangedStateCb(_)); EXPECT_CALL(*cluster1, initialize(_)) @@ -1603,7 +1655,8 @@ TEST_F(ClusterManagerImplTest, DoNotCloseTcpConnectionsOnHealthFailure) { Network::MockClientConnection* connection1 = new NiceMock(); Host::CreateConnectionData conn_info1; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)).WillOnce(Return(cluster1)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); EXPECT_CALL(health_checker, addHostCheckCompleteCb(_)); EXPECT_CALL(outlier_detector, addChangedStateCb(_)); EXPECT_CALL(*cluster1, initialize(_)) @@ -2514,7 +2567,8 @@ TEST_F(ClusterManagerImplTest, MergedUpdatesDestroyedOnUpdate) { // Update the cluster, which should cancel the pending updates. std::shared_ptr updated(new NiceMock()); updated->info_->name_ = "new_cluster"; - EXPECT_CALL(factory_, clusterFromProto_(_, _, _, true)).WillOnce(Return(updated)); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, true)) + .WillOnce(Return(std::make_pair(updated, nullptr))); const std::string yaml_updated = R"EOF( name: new_cluster diff --git a/test/integration/clusters/custom_static_cluster.cc b/test/integration/clusters/custom_static_cluster.cc index d4dae7cce4da9..b94c0314fd0be 100644 --- a/test/integration/clusters/custom_static_cluster.cc +++ b/test/integration/clusters/custom_static_cluster.cc @@ -4,12 +4,10 @@ namespace Envoy { // ClusterImplBase void CustomStaticCluster::startPreInit() { - printf("startPreInit"); - Upstream::HostSharedPtr host = makeHost(); - Upstream::HostVector hosts{host}; + Upstream::HostVector hosts{host_}; auto hosts_ptr = std::make_shared(hosts); - this->priority_set_.updateHosts( + priority_set_.updateHosts( priority_, Upstream::HostSetImpl::partitionHosts(hosts_ptr, Upstream::HostsPerLocalityImpl::empty()), {}, hosts, {}, absl::nullopt); @@ -17,16 +15,21 @@ void CustomStaticCluster::startPreInit() { onPreInitComplete(); } -inline Upstream::HostSharedPtr CustomStaticCluster::makeHost() { +Upstream::HostSharedPtr CustomStaticCluster::makeHost() { Network::Address::InstanceConstSharedPtr address = Network::Utility::parseInternetAddress(address_, port_, true); return Upstream::HostSharedPtr{new Upstream::HostImpl( - this->info(), "", address, this->info()->metadata(), 1, + info(), "", address, info()->metadata(), 1, envoy::api::v2::core::Locality::default_instance(), envoy::api::v2::endpoint::Endpoint::HealthCheckConfig::default_instance(), priority_, envoy::api::v2::core::HealthStatus::UNKNOWN)}; } -REGISTER_FACTORY(CustomStaticClusterFactory, Upstream::ClusterFactory); +Upstream::ThreadAwareLoadBalancerPtr CustomStaticCluster::threadAwareLb() { + return std::make_unique(host_); +} + +REGISTER_FACTORY(CustomStaticClusterFactoryNoLb, Upstream::ClusterFactory); +REGISTER_FACTORY(CustomStaticClusterFactoryWithLb, Upstream::ClusterFactory); } // namespace Envoy diff --git a/test/integration/clusters/custom_static_cluster.h b/test/integration/clusters/custom_static_cluster.h index d3cf39edddd21..f3e9d3fd7c09f 100644 --- a/test/integration/clusters/custom_static_cluster.h +++ b/test/integration/clusters/custom_static_cluster.h @@ -27,38 +27,89 @@ class CustomStaticCluster : public Upstream::ClusterImplBase { Stats::ScopePtr&& stats_scope, bool added_via_api, uint32_t priority, std::string address, uint32_t port) : ClusterImplBase(cluster, runtime, factory_context, std::move(stats_scope), added_via_api), - priority_(priority), address_(std::move(address)), port_(port) {} + priority_(priority), address_(std::move(address)), port_(port), host_(makeHost()) {} InitializePhase initializePhase() const override { return InitializePhase::Primary; } private: + struct LbImpl : public Upstream::LoadBalancer { + LbImpl(const Upstream::HostSharedPtr& host) : host_(host) {} + + Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext*) override { + return host_; + } + + const Upstream::HostSharedPtr host_; + }; + + struct LbFactory : public Upstream::LoadBalancerFactory { + LbFactory(const Upstream::HostSharedPtr& host) : host_(host) {} + + Upstream::LoadBalancerPtr create() override { return std::make_unique(host_); } + + const Upstream::HostSharedPtr host_; + }; + + struct ThreadAwareLbImpl : public Upstream::ThreadAwareLoadBalancer { + ThreadAwareLbImpl(const Upstream::HostSharedPtr& host) : host_(host) {} + + Upstream::LoadBalancerFactorySharedPtr factory() override { + return std::make_shared(host_); + } + void initialize() override {} + + const Upstream::HostSharedPtr host_; + }; + + Upstream::ThreadAwareLoadBalancerPtr threadAwareLb(); + // ClusterImplBase void startPreInit() override; - inline Upstream::HostSharedPtr makeHost(); + Upstream::HostSharedPtr makeHost(); const uint32_t priority_; const std::string address_; const uint32_t port_; + const Upstream::HostSharedPtr host_; + + friend class CustomStaticClusterFactoryBase; }; -class CustomStaticClusterFactory : public Upstream::ConfigurableClusterFactoryBase< - test::integration::clusters::CustomStaticConfig> { -public: - CustomStaticClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.custom_static") {} +class CustomStaticClusterFactoryBase : public Upstream::ConfigurableClusterFactoryBase< + test::integration::clusters::CustomStaticConfig> { +protected: + CustomStaticClusterFactoryBase(const std::string& name, bool create_lb) + : ConfigurableClusterFactoryBase(name), create_lb_(create_lb) {} private: - Upstream::ClusterImplBaseSharedPtr createClusterWithConfig( + std::pair + createClusterWithConfig( const envoy::api::v2::Cluster& cluster, const test::integration::clusters::CustomStaticConfig& proto_config, Upstream::ClusterFactoryContext& context, Server::Configuration::TransportSocketFactoryContext& socket_factory_context, Stats::ScopePtr&& stats_scope) override { - return std::make_unique(cluster, context.runtime(), socket_factory_context, - std::move(stats_scope), context.addedViaApi(), - proto_config.priority(), proto_config.address(), - proto_config.port_value()); + auto new_cluster = std::make_shared( + cluster, context.runtime(), socket_factory_context, std::move(stats_scope), + context.addedViaApi(), proto_config.priority(), proto_config.address(), + proto_config.port_value()); + return std::make_pair(new_cluster, create_lb_ ? new_cluster->threadAwareLb() : nullptr); } + + const bool create_lb_; +}; + +class CustomStaticClusterFactoryNoLb : public CustomStaticClusterFactoryBase { +public: + CustomStaticClusterFactoryNoLb() + : CustomStaticClusterFactoryBase("envoy.clusters.custom_static", false) {} +}; + +class CustomStaticClusterFactoryWithLb : public CustomStaticClusterFactoryBase { +public: + CustomStaticClusterFactoryWithLb() + : CustomStaticClusterFactoryBase("envoy.clusters.custom_static_with_lb", true) {} }; } // namespace Envoy \ No newline at end of file diff --git a/test/integration/custom_cluster_integration_test.cc b/test/integration/custom_cluster_integration_test.cc index 4d931d0a94733..d07886601a5b7 100644 --- a/test/integration/custom_cluster_integration_test.cc +++ b/test/integration/custom_cluster_integration_test.cc @@ -28,8 +28,13 @@ class CustomClusterIntegrationTest : public testing::TestWithParamclear_hosts(); + if (cluster_provided_lb_) { + cluster_0->set_lb_policy(envoy::api::v2::Cluster::CLUSTER_PROVIDED); + } + envoy::api::v2::Cluster_CustomClusterType cluster_type; - cluster_type.set_name("envoy.clusters.custom_static"); + cluster_type.set_name(cluster_provided_lb_ ? "envoy.clusters.custom_static_with_lb" + : "envoy.clusters.custom_static"); test::integration::clusters::CustomStaticConfig config; config.set_priority(10); config.set_address(Network::Test::getLoopbackAddressString(ipVersion())); @@ -43,6 +48,7 @@ class CustomClusterIntegrationTest : public testing::TestWithParamhealthyHosts().size()); EXPECT_EQ(10, host_set->priority()); } + } // namespace } // namespace Envoy diff --git a/test/mocks/upstream/mocks.cc b/test/mocks/upstream/mocks.cc index f8c426e856c92..35f96382ac940 100644 --- a/test/mocks/upstream/mocks.cc +++ b/test/mocks/upstream/mocks.cc @@ -115,9 +115,11 @@ MockClusterMockPrioritySet::MockClusterMockPrioritySet() = default; MockClusterMockPrioritySet::~MockClusterMockPrioritySet() = default; MockLoadBalancer::MockLoadBalancer() { ON_CALL(*this, chooseHost(_)).WillByDefault(Return(host_)); } - MockLoadBalancer::~MockLoadBalancer() = default; +MockThreadAwareLoadBalancer::MockThreadAwareLoadBalancer() = default; +MockThreadAwareLoadBalancer::~MockThreadAwareLoadBalancer() = default; + MockThreadLocalCluster::MockThreadLocalCluster() { ON_CALL(*this, prioritySet()).WillByDefault(ReturnRef(cluster_.priority_set_)); ON_CALL(*this, info()).WillByDefault(Return(cluster_.info_)); diff --git a/test/mocks/upstream/mocks.h b/test/mocks/upstream/mocks.h index a93e90b237728..a5f152b4d3f71 100644 --- a/test/mocks/upstream/mocks.h +++ b/test/mocks/upstream/mocks.h @@ -215,6 +215,16 @@ class MockLoadBalancer : public LoadBalancer { std::shared_ptr host_{new MockHost()}; }; +class MockThreadAwareLoadBalancer : public ThreadAwareLoadBalancer { +public: + MockThreadAwareLoadBalancer(); + ~MockThreadAwareLoadBalancer(); + + // Upstream::ThreadAwareLoadBalancer + MOCK_METHOD0(factory, LoadBalancerFactorySharedPtr()); + MOCK_METHOD0(initialize, void()); +}; + class MockThreadLocalCluster : public ThreadLocalCluster { public: MockThreadLocalCluster(); @@ -251,9 +261,9 @@ class MockClusterManagerFactory : public ClusterManagerFactory { Network::TransportSocketOptionsSharedPtr)); MOCK_METHOD4(clusterFromProto, - ClusterSharedPtr(const envoy::api::v2::Cluster& cluster, ClusterManager& cm, - Outlier::EventLoggerSharedPtr outlier_event_logger, - bool added_via_api)); + std::pair( + const envoy::api::v2::Cluster& cluster, ClusterManager& cm, + Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api)); MOCK_METHOD2(createCds, CdsApiPtr(const envoy::api::v2::core::ConfigSource& cds_config, ClusterManager& cm));