Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion api/envoy/api/v2/cds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ message Cluster {
// Refer to :ref:`load balancer type <arch_overview_load_balancing_types>` architecture
// overview section for information on each type.
enum LbPolicy {

// Refer to the :ref:`round robin load balancing
// policy<arch_overview_load_balancing_types_round_robin>`
// for an explanation.
Expand Down Expand Up @@ -168,6 +167,11 @@ message Cluster {
// Refer to the :ref:`Maglev load balancing policy<arch_overview_load_balancing_types_maglev>`
// for an explanation.
MAGLEV = 5;

// This load balancer type must be specified if the configured cluster provides a cluster
// specific load balancer. Consult the configured cluster's documentation for whether to set
// this option or not.
CLUSTER_PROVIDED = 6;
}
// The :ref:`load balancer type <arch_overview_load_balancing_types>` to use
// when picking a host in the cluster.
Expand Down
7 changes: 4 additions & 3 deletions include/envoy/upstream/cluster_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,11 @@ class ClusterFactory {
* with the provided parameters, it should throw an EnvoyException in the case of general error.
* @param cluster supplies the general protobuf configuration for the cluster.
* @param context supplies the cluster's context.
* @return ClusterSharedPtr the cluster instance.
* @return a pair containing the the cluster instance as well as an option thread aware load
* balancer if this cluster has an integrated load balancer.
*/
virtual ClusterSharedPtr create(const envoy::api::v2::Cluster& cluster,
ClusterFactoryContext& context) PURE;
virtual std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>
create(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context) PURE;

/**
* @return std::string the identifying name for a particular implementation of a cluster factory.
Expand Down
7 changes: 3 additions & 4 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,9 @@ class ClusterManagerFactory {
/**
* Allocate a cluster from configuration proto.
*/
virtual ClusterSharedPtr clusterFromProto(const envoy::api::v2::Cluster& cluster,
ClusterManager& cm,
Outlier::EventLoggerSharedPtr outlier_event_logger,
bool added_via_api) PURE;
virtual std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>
clusterFromProto(const envoy::api::v2::Cluster& cluster, ClusterManager& cm,
Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api) PURE;

/**
* Create a CDS API provider from configuration proto.
Expand Down
10 changes: 9 additions & 1 deletion include/envoy/upstream/load_balancer_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@ namespace Upstream {
/**
* Type of load balancing to perform.
*/
enum class LoadBalancerType { RoundRobin, LeastRequest, Random, RingHash, OriginalDst, Maglev };
enum class LoadBalancerType {
RoundRobin,
LeastRequest,
Random,
RingHash,
OriginalDst,
Maglev,
ClusterProvided
};

/**
* Load Balancer subset configuration.
Expand Down
19 changes: 10 additions & 9 deletions source/common/upstream/cluster_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Stats::ScopePtr generateStatsScope(const envoy::api::v2::Cluster& config, Stats:

} // namespace

ClusterSharedPtr ClusterFactoryImplBase::create(
std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr> ClusterFactoryImplBase::create(
const envoy::api::v2::Cluster& cluster, ClusterManager& cluster_manager, Stats::Store& stats,
ThreadLocal::Instance& tls, Network::DnsResolverSharedPtr dns_resolver,
Ssl::ContextManager& ssl_context_manager, Runtime::Loader& runtime,
Expand Down Expand Up @@ -90,33 +90,34 @@ ClusterFactoryImplBase::selectDnsResolver(const envoy::api::v2::Cluster& cluster
return context.dnsResolver();
}

ClusterSharedPtr ClusterFactoryImplBase::create(const envoy::api::v2::Cluster& cluster,
ClusterFactoryContext& context) {
std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>
ClusterFactoryImplBase::create(const envoy::api::v2::Cluster& cluster,
ClusterFactoryContext& context) {

auto stats_scope = generateStatsScope(cluster, context.stats());
Server::Configuration::TransportSocketFactoryContextImpl factory_context(
context.admin(), context.sslContextManager(), *stats_scope, context.clusterManager(),
context.localInfo(), context.dispatcher(), context.random(), context.stats(),
context.singletonManager(), context.tls(), context.api());

ClusterImplBaseSharedPtr new_cluster =
std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr> new_cluster_pair =
createClusterImpl(cluster, context, factory_context, std::move(stats_scope));

if (!cluster.health_checks().empty()) {
// TODO(htuch): Need to support multiple health checks in v2.
if (cluster.health_checks().size() != 1) {
throw EnvoyException("Multiple health checks not supported");
} else {
new_cluster->setHealthChecker(HealthCheckerFactory::create(
cluster.health_checks()[0], *new_cluster, context.runtime(), context.random(),
new_cluster_pair.first->setHealthChecker(HealthCheckerFactory::create(
cluster.health_checks()[0], *new_cluster_pair.first, context.runtime(), context.random(),
context.dispatcher(), context.logManager()));
}
}

new_cluster->setOutlierDetector(Outlier::DetectorImplFactory::createForCluster(
*new_cluster, cluster, context.dispatcher(), context.runtime(),
new_cluster_pair.first->setOutlierDetector(Outlier::DetectorImplFactory::createForCluster(
*new_cluster_pair.first, cluster, context.dispatcher(), context.runtime(),
context.outlierEventLogger()));
return new_cluster;
return new_cluster_pair;
}

} // namespace Upstream
Expand Down
12 changes: 6 additions & 6 deletions source/common/upstream/cluster_factory_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class ClusterFactoryImplBase : public ClusterFactory {
/**
* Static method to get the registered cluster factory and create an instance of cluster.
*/
static ClusterSharedPtr
static std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>
create(const envoy::api::v2::Cluster& cluster, ClusterManager& cluster_manager,
Stats::Store& stats, ThreadLocal::Instance& tls,
Network::DnsResolverSharedPtr dns_resolver, Ssl::ContextManager& ssl_context_manager,
Expand All @@ -126,8 +126,8 @@ class ClusterFactoryImplBase : public ClusterFactory {
ClusterFactoryContext& context);

// Upstream::ClusterFactory
ClusterSharedPtr create(const envoy::api::v2::Cluster& cluster,
ClusterFactoryContext& context) override;
std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>
create(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context) override;
std::string name() override { return name_; }

protected:
Expand All @@ -137,7 +137,7 @@ class ClusterFactoryImplBase : public ClusterFactory {
/**
* Create an instance of ClusterImplBase.
*/
virtual ClusterImplBaseSharedPtr
virtual std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>
createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) PURE;
Expand All @@ -161,7 +161,7 @@ template <class ConfigProto> class ConfigurableClusterFactoryBase : public Clust
ConfigurableClusterFactoryBase(const std::string& name) : ClusterFactoryImplBase(name) {}

private:
virtual ClusterImplBaseSharedPtr
virtual std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>
createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) override {
Expand All @@ -173,7 +173,7 @@ template <class ConfigProto> class ConfigurableClusterFactoryBase : public Clust
context, socket_factory_context, std::move(stats_scope));
}

virtual ClusterImplBaseSharedPtr createClusterWithConfig(
virtual std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr> createClusterWithConfig(
const envoy::api::v2::Cluster& cluster, const ConfigProto& proto_config,
ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Expand Down
25 changes: 22 additions & 3 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,10 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) {
void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster,
const std::string& version_info, bool added_via_api,
ClusterMap& cluster_map) {
ClusterSharedPtr new_cluster =
std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr> new_cluster_pair =
factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api);
auto& new_cluster = new_cluster_pair.first;
Cluster& cluster_reference = *new_cluster;

if (!added_via_api) {
if (cluster_map.find(new_cluster->info()->name()) != cluster_map.end()) {
Expand All @@ -590,7 +592,21 @@ void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster,
}
}

Cluster& cluster_reference = *new_cluster;
if (cluster_reference.info()->lbType() == LoadBalancerType::ClusterProvided &&
new_cluster_pair.second == nullptr) {
throw EnvoyException(fmt::format("cluster manager: cluster provided LB specified but cluster "
"'{}' did not provide one. Check cluster documentation.",
new_cluster->info()->name()));
}

if (cluster_reference.info()->lbType() != LoadBalancerType::ClusterProvided &&
new_cluster_pair.second != nullptr) {
throw EnvoyException(
fmt::format("cluster manager: cluster provided LB not specified but cluster "
"'{}' provided one. Check cluster documentation.",
new_cluster->info()->name()));
}

if (new_cluster->healthChecker() != nullptr) {
new_cluster->healthChecker()->addHostCheckCompleteCb(
[this](HostSharedPtr host, HealthTransition changed_state) {
Expand Down Expand Up @@ -625,6 +641,8 @@ void ClusterManagerImpl::loadCluster(const envoy::api::v2::Cluster& cluster,
cluster_reference.prioritySet(), cluster_reference.info()->stats(),
cluster_reference.info()->statsScope(), runtime_, random_,
cluster_reference.info()->lbConfig());
} else if (cluster_reference.info()->lbType() == LoadBalancerType::ClusterProvided) {
cluster_entry_it->second->thread_aware_lb_ = std::move(new_cluster_pair.second);
}

updateGauges();
Expand Down Expand Up @@ -1076,6 +1094,7 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
parent.parent_.random_, cluster->lbConfig());
break;
}
case LoadBalancerType::ClusterProvided:
case LoadBalancerType::RingHash:
case LoadBalancerType::Maglev: {
ASSERT(lb_factory_ != nullptr);
Expand Down Expand Up @@ -1223,7 +1242,7 @@ Tcp::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateTcpConnPool(
new Tcp::ConnPoolImpl(dispatcher, host, priority, options, transport_socket_options)};
}

ClusterSharedPtr ProdClusterManagerFactory::clusterFromProto(
std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr> ProdClusterManagerFactory::clusterFromProto(
const envoy::api::v2::Cluster& cluster, ClusterManager& cm,
Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api) {
return ClusterFactoryImplBase::create(
Expand Down
6 changes: 3 additions & 3 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ class ProdClusterManagerFactory : public ClusterManagerFactory {
ResourcePriority priority,
const Network::ConnectionSocket::OptionsSharedPtr& options,
Network::TransportSocketOptionsSharedPtr transport_socket_options) override;
ClusterSharedPtr clusterFromProto(const envoy::api::v2::Cluster& cluster, ClusterManager& cm,
Outlier::EventLoggerSharedPtr outlier_event_logger,
bool added_via_api) override;
std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>
clusterFromProto(const envoy::api::v2::Cluster& cluster, ClusterManager& cm,
Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api) override;
CdsApiPtr createCds(const envoy::api::v2::core::ConfigSource& cds_config,
ClusterManager& cm) override;
Secret::SecretManager& secretManager() override { return secret_manager_; }
Expand Down
9 changes: 6 additions & 3 deletions source/common/upstream/eds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,19 @@ void EdsClusterImpl::onConfigUpdateFailed(const EnvoyException* e) {
onPreInitComplete();
}

ClusterImplBaseSharedPtr EdsClusterFactory::createClusterImpl(
std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>
EdsClusterFactory::createClusterImpl(
const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) {
if (!cluster.has_eds_cluster_config()) {
throw EnvoyException("cannot create an EDS cluster without an EDS config");
}

return std::make_unique<EdsClusterImpl>(cluster, context.runtime(), socket_factory_context,
std::move(stats_scope), context.addedViaApi());
return std::make_pair(
std::make_shared<EdsClusterImpl>(cluster, context.runtime(), socket_factory_context,
std::move(stats_scope), context.addedViaApi()),
nullptr);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/eds.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class EdsClusterFactory : public ClusterFactoryImplBase {
EdsClusterFactory() : ClusterFactoryImplBase(Extensions::Clusters::ClusterTypes::get().Eds) {}

private:
ClusterImplBaseSharedPtr
std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>
createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) override;
Expand Down
10 changes: 6 additions & 4 deletions source/common/upstream/logical_dns_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,17 @@ Upstream::Host::CreateConnectionData LogicalDnsCluster::LogicalHost::createConne
shared_from_this(), parent_.symbolTable())}};
}

ClusterImplBaseSharedPtr LogicalDnsClusterFactory::createClusterImpl(
std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>
LogicalDnsClusterFactory::createClusterImpl(
const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) {
auto selected_dns_resolver = selectDnsResolver(cluster, context);

return std::make_unique<LogicalDnsCluster>(cluster, context.runtime(), selected_dns_resolver,
context.tls(), socket_factory_context,
std::move(stats_scope), context.addedViaApi());
return std::make_pair(std::make_shared<LogicalDnsCluster>(
cluster, context.runtime(), selected_dns_resolver, context.tls(),
socket_factory_context, std::move(stats_scope), context.addedViaApi()),
nullptr);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/logical_dns_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class LogicalDnsClusterFactory : public ClusterFactoryImplBase {
: ClusterFactoryImplBase(Extensions::Clusters::ClusterTypes::get().LogicalDns) {}

private:
ClusterImplBaseSharedPtr
std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>
createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) override;
Expand Down
12 changes: 9 additions & 3 deletions source/common/upstream/original_dst_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ void OriginalDstCluster::cleanup() {
cleanup_timer_->enableTimer(cleanup_interval_ms_);
}

ClusterImplBaseSharedPtr OriginalDstClusterFactory::createClusterImpl(
std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>
OriginalDstClusterFactory::createClusterImpl(
const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) {
Expand All @@ -199,8 +200,13 @@ ClusterImplBaseSharedPtr OriginalDstClusterFactory::createClusterImpl(
fmt::format("cluster: cluster type 'original_dst' may not be used with lb_subset_config"));
}

return std::make_unique<OriginalDstCluster>(cluster, context.runtime(), socket_factory_context,
std::move(stats_scope), context.addedViaApi());
// TODO(mattklein123): The original DST load balancer type should be deprecated and instead
// the cluster should directly supply the load balancer. This will remove
// a special case and allow this cluster to be compiled out as an extension.
return std::make_pair(
std::make_shared<OriginalDstCluster>(cluster, context.runtime(), socket_factory_context,
std::move(stats_scope), context.addedViaApi()),
nullptr);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/original_dst_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class OriginalDstClusterFactory : public ClusterFactoryImplBase {
: ClusterFactoryImplBase(Extensions::Clusters::ClusterTypes::get().OriginalDst) {}

private:
ClusterImplBaseSharedPtr
std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>
createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) override;
Expand Down
9 changes: 6 additions & 3 deletions source/common/upstream/static_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@ void StaticClusterImpl::startPreInit() {
onPreInitComplete();
}

ClusterImplBaseSharedPtr StaticClusterFactory::createClusterImpl(
std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>
StaticClusterFactory::createClusterImpl(
const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) {
return std::make_unique<StaticClusterImpl>(cluster, context.runtime(), socket_factory_context,
std::move(stats_scope), context.addedViaApi());
return std::make_pair(
std::make_shared<StaticClusterImpl>(cluster, context.runtime(), socket_factory_context,
std::move(stats_scope), context.addedViaApi()),
nullptr);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/static_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class StaticClusterFactory : public ClusterFactoryImplBase {
: ClusterFactoryImplBase(Extensions::Clusters::ClusterTypes::get().Static) {}

private:
ClusterImplBaseSharedPtr
std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>
createClusterImpl(const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) override;
Expand Down
10 changes: 6 additions & 4 deletions source/common/upstream/strict_dns_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,17 @@ void StrictDnsClusterImpl::ResolveTarget::startResolve() {
});
}

ClusterImplBaseSharedPtr StrictDnsClusterFactory::createClusterImpl(
std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>
StrictDnsClusterFactory::createClusterImpl(
const envoy::api::v2::Cluster& cluster, ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContext& socket_factory_context,
Stats::ScopePtr&& stats_scope) {
auto selected_dns_resolver = selectDnsResolver(cluster, context);

return std::make_unique<StrictDnsClusterImpl>(cluster, context.runtime(), selected_dns_resolver,
socket_factory_context, std::move(stats_scope),
context.addedViaApi());
return std::make_pair(std::make_shared<StrictDnsClusterImpl>(
cluster, context.runtime(), selected_dns_resolver,
socket_factory_context, std::move(stats_scope), context.addedViaApi()),
nullptr);
}

/**
Expand Down
Loading