Skip to content
Merged
7 changes: 7 additions & 0 deletions include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ class GrpcMux {
*/
virtual void resume(const std::string& type_url) PURE;

// TODO(fredlas) PR #8478 will remove this.
/**
* Whether this GrpcMux is delta.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the end goal for supporting a combination of SOTW and delta? I would imagine that some operators will want both, with let's say CDS delta but LDS SOTW. Can that be done over a single gRPC stream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and no and maybe eventually yes, haha. Non-aggregated xDS can already mix and match delta and SotW no problem. When using ADS, because delta and SotW have different request/response protos, and different gRPC services corresponding to those protos, there can only be one type.

However, the change we briefly discussed a while back, where there would be a single top level carrier proto for request+response, and the various types of messages (delta vs SotW, non-ACK request vs ACK) were just different oneof options, would enable mixing within ADS. In practice that would take a bit of plumbing in this code (probably pretty straightforward), and would also need the bootstrap config format to be reworked a bit: rather than just configuring your CDS as "CDS, provided by ADS [implicitly referring to the ADS config]", you would also need to choose delta/SotW in the CDS config thing. And... actually, the choice would be removed from the ADS config thing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. OK, let's go ahead with this as is, thanks for the reminder of our earlier conversation on this.

* @return bool whether this GrpcMux is delta.
*/
virtual bool isDelta() const PURE;

// For delta
virtual Watch* addOrUpdateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources,
Expand Down
2 changes: 1 addition & 1 deletion include/envoy/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SubscriptionFactory {
virtual SubscriptionPtr
subscriptionFromConfigSource(const envoy::api::v2::core::ConfigSource& config,
absl::string_view type_url, Stats::Scope& scope,
SubscriptionCallbacks& callbacks, bool is_delta) PURE;
SubscriptionCallbacks& callbacks) PURE;
};

} // namespace Config
Expand Down
2 changes: 1 addition & 1 deletion include/envoy/router/route_config_provider_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class RouteConfigProviderManager {
virtual RouteConfigProviderPtr createRdsRouteConfigProvider(
const envoy::config::filter::network::http_connection_manager::v2::Rds& rds,
Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix,
Init::Manager& init_manager, bool is_delta) PURE;
Init::Manager& init_manager) PURE;

/**
* Get a RouteConfigSharedPtr for a statically defined route. Ownership is as described for
Expand Down
6 changes: 2 additions & 4 deletions include/envoy/server/listener_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ class ListenerComponentFactory {
* @return an LDS API provider.
* @param lds_config supplies the management server configuration.
*/
virtual LdsApiPtr createLdsApi(const envoy::api::v2::core::ConfigSource& lds_config,
bool is_delta) PURE;
virtual LdsApiPtr createLdsApi(const envoy::api::v2::core::ConfigSource& lds_config) PURE;

/**
* Creates a socket.
Expand Down Expand Up @@ -129,8 +128,7 @@ class ListenerManager {
* pieces of the server existing.
* @param lds_config supplies the management server configuration.
*/
virtual void createLdsApi(const envoy::api::v2::core::ConfigSource& lds_config,
bool is_delta) PURE;
virtual void createLdsApi(const envoy::api::v2::core::ConfigSource& lds_config) PURE;

/**
* @return std::vector<std::reference_wrapper<Network::ListenerConfig>> a list of the currently
Expand Down
4 changes: 1 addition & 3 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,6 @@ class ClusterManager {
virtual Config::SubscriptionFactory& subscriptionFactory() PURE;

virtual std::size_t warmingClusterCount() const PURE;

virtual bool xdsIsDelta() const PURE;
};

using ClusterManagerPtr = std::unique_ptr<ClusterManager>;
Expand Down Expand Up @@ -298,7 +296,7 @@ class ClusterManagerFactory {
/**
* Create a CDS API provider from configuration proto.
*/
virtual CdsApiPtr createCds(const envoy::api::v2::core::ConfigSource& cds_config, bool is_delta,
virtual CdsApiPtr createCds(const envoy::api::v2::core::ConfigSource& cds_config,
ClusterManager& cm) PURE;

/**
Expand Down
4 changes: 4 additions & 0 deletions source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class GrpcMuxImpl : public GrpcMux,
GrpcMuxCallbacks& callbacks) override;

// GrpcMux
// TODO(fredlas) PR #8478 will remove this.
bool isDelta() const override { return false; }
void pause(const std::string& type_url) override;
void resume(const std::string& type_url) override;
bool paused(const std::string& type_url) const override;
Expand Down Expand Up @@ -133,6 +135,8 @@ class NullGrpcMuxImpl : public GrpcMux, GrpcStreamCallbacks<envoy::api::v2::Disc
GrpcMuxCallbacks&) override {
throw EnvoyException("ADS must be configured to support an ADS config source");
}
// TODO(fredlas) PR #8478 will remove this.
bool isDelta() const override { return false; }
void pause(const std::string&) override {}
void resume(const std::string&) override {}
bool paused(const std::string&) const override { return false; }
Expand Down
3 changes: 3 additions & 0 deletions source/common/config/new_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class NewGrpcMuxImpl : public GrpcMux,
std::chrono::milliseconds init_fetch_timeout) override;
void removeWatch(const std::string& type_url, Watch* watch) override;

// TODO(fredlas) PR #8478 will remove this.
bool isDelta() const override { return true; }

void pause(const std::string& type_url) override;
void resume(const std::string& type_url) override;
bool paused(const std::string& type_url) const override;
Expand Down
4 changes: 2 additions & 2 deletions source/common/config/subscription_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ SubscriptionFactoryImpl::SubscriptionFactoryImpl(

SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
const envoy::api::v2::core::ConfigSource& config, absl::string_view type_url,
Stats::Scope& scope, SubscriptionCallbacks& callbacks, bool is_delta) {
Stats::Scope& scope, SubscriptionCallbacks& callbacks) {
Config::Utility::checkLocalInfo(type_url, local_info_);
std::unique_ptr<Subscription> result;
SubscriptionStats stats = Utility::generateStats(scope);
Expand Down Expand Up @@ -78,7 +78,7 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
break;
}
case envoy::api::v2::core::ConfigSource::kAds: {
if (is_delta) {
if (cm_.adsMux()->isDelta()) {
result = std::make_unique<DeltaSubscriptionImpl>(
cm_.adsMux(), type_url, callbacks, stats,
Utility::configSourceInitialFetchTimeout(config), true);
Expand Down
4 changes: 1 addition & 3 deletions source/common/config/subscription_factory_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ class SubscriptionFactoryImpl : public SubscriptionFactory {
Upstream::ClusterManager& cm, Runtime::RandomGenerator& random,
ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api);

// TODO(fredlas) remove is_delta once delta and SotW are unified
// Config::SubscriptionFactory
SubscriptionPtr subscriptionFromConfigSource(const envoy::api::v2::core::ConfigSource& config,
absl::string_view type_url, Stats::Scope& scope,
SubscriptionCallbacks& callbacks,
bool is_delta) override;
SubscriptionCallbacks& callbacks) override;

private:
const LocalInfo::LocalInfo& local_info_;
Expand Down
12 changes: 6 additions & 6 deletions source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ RouteConfigProviderPtr RouteConfigProviderUtil::create(
const envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager&
config,
Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix,
RouteConfigProviderManager& route_config_provider_manager, bool is_delta) {
RouteConfigProviderManager& route_config_provider_manager) {
switch (config.route_specifier_case()) {
case envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager::
kRouteConfig:
return route_config_provider_manager.createStaticRouteConfigProvider(config.route_config(),
factory_context);
case envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager::kRds:
return route_config_provider_manager.createRdsRouteConfigProvider(
config.rds(), factory_context, stat_prefix, factory_context.initManager(), is_delta);
config.rds(), factory_context, stat_prefix, factory_context.initManager());
default:
NOT_REACHED_GCOVR_EXCL_LINE;
}
Expand Down Expand Up @@ -59,7 +59,7 @@ RdsRouteConfigSubscription::RdsRouteConfigSubscription(
const uint64_t manager_identifier, Server::Configuration::ServerFactoryContext& factory_context,
ProtobufMessage::ValidationVisitor& validator, Init::Manager& init_manager,
const std::string& stat_prefix,
Envoy::Router::RouteConfigProviderManagerImpl& route_config_provider_manager, bool is_delta)
Envoy::Router::RouteConfigProviderManagerImpl& route_config_provider_manager)
: route_config_name_(rds.route_config_name()), factory_context_(factory_context),
validator_(validator), init_manager_(init_manager),
init_target_(fmt::format("RdsRouteConfigSubscription {}", route_config_name_),
Expand All @@ -73,7 +73,7 @@ RdsRouteConfigSubscription::RdsRouteConfigSubscription(
factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
rds.config_source(),
Grpc::Common::typeUrl(envoy::api::v2::RouteConfiguration().GetDescriptor()->full_name()),
*scope_, *this, is_delta);
*scope_, *this);
config_update_info_ =
std::make_unique<RouteConfigUpdateReceiverImpl>(factory_context.timeSource(), validator_);
}
Expand Down Expand Up @@ -230,7 +230,7 @@ RouteConfigProviderManagerImpl::RouteConfigProviderManagerImpl(Server::Admin& ad
Router::RouteConfigProviderPtr RouteConfigProviderManagerImpl::createRdsRouteConfigProvider(
const envoy::config::filter::network::http_connection_manager::v2::Rds& rds,
Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix,
Init::Manager& init_manager, bool is_delta) {
Init::Manager& init_manager) {
// RdsRouteConfigSubscriptions are unique based on their serialized RDS config.
const uint64_t manager_identifier = MessageUtil::hash(rds);
auto& server_factory_context = factory_context.getServerFactoryContext();
Expand All @@ -244,7 +244,7 @@ Router::RouteConfigProviderPtr RouteConfigProviderManagerImpl::createRdsRouteCon
// of simplicity.
subscription.reset(new RdsRouteConfigSubscription(
rds, manager_identifier, server_factory_context, factory_context.messageValidationVisitor(),
init_manager, stat_prefix, *this, is_delta));
init_manager, stat_prefix, *this));
init_manager.add(subscription->init_target_);
route_config_subscriptions_.insert({manager_identifier, subscription});
} else {
Expand Down
8 changes: 4 additions & 4 deletions source/common/router/rds_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class RouteConfigProviderUtil {
create(const envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager&
config,
Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix,
RouteConfigProviderManager& route_config_provider_manager, bool is_delta);
RouteConfigProviderManager& route_config_provider_manager);
};

class RouteConfigProviderManagerImpl;
Expand Down Expand Up @@ -134,8 +134,8 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks,
const uint64_t manager_identifier,
Server::Configuration::ServerFactoryContext& factory_context,
ProtobufMessage::ValidationVisitor& validator, Init::Manager& init_manager,
const std::string& stat_prefix, RouteConfigProviderManagerImpl& route_config_provider_manager,
bool is_delta);
const std::string& stat_prefix,
RouteConfigProviderManagerImpl& route_config_provider_manager);

bool validateUpdateSize(int num_resources);

Expand Down Expand Up @@ -213,7 +213,7 @@ class RouteConfigProviderManagerImpl : public RouteConfigProviderManager,
RouteConfigProviderPtr createRdsRouteConfigProvider(
const envoy::config::filter::network::http_connection_manager::v2::Rds& rds,
Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix,
Init::Manager& init_manager, bool is_delta) override;
Init::Manager& init_manager) override;

RouteConfigProviderPtr
createStaticRouteConfigProvider(const envoy::api::v2::RouteConfiguration& route_config,
Expand Down
4 changes: 2 additions & 2 deletions source/common/router/scoped_rds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ ScopedRdsConfigSubscription::ScopedRdsConfigSubscription(
scoped_rds.scoped_rds_config_source(),
Grpc::Common::typeUrl(
envoy::api::v2::ScopedRouteConfiguration().GetDescriptor()->full_name()),
*scope_, *this, true);
*scope_, *this);

initialize([scope_key_builder]() -> Envoy::Config::ConfigProvider::ConfigConstSharedPtr {
return std::make_shared<ScopedConfigImpl>(
Expand All @@ -115,7 +115,7 @@ ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::RdsRouteConfigProvide
route_provider_(static_cast<RdsRouteConfigProviderImpl*>(
parent_.route_config_provider_manager_
.createRdsRouteConfigProvider(rds, parent_.factory_context_, parent_.stat_prefix_,
init_manager, true)
init_manager)
.release())),
rds_update_callback_handle_(route_provider_->subscription().addUpdateCallback([this]() {
// Subscribe to RDS update.
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/vhds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ VhdsSubscription::VhdsSubscription(RouteConfigUpdatePtr& config_update_info,
factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
config_update_info_->routeConfiguration().vhds().config_source(),
Grpc::Common::typeUrl(envoy::api::v2::route::VirtualHost().GetDescriptor()->full_name()),
*scope_, *this, /*is_delta=*/true);
*scope_, *this);
}

void VhdsSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
Expand Down
2 changes: 1 addition & 1 deletion source/common/runtime/runtime_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ void RtdsSubscription::start() {
subscription_ = parent_.cm_->subscriptionFactory().subscriptionFromConfigSource(
config_source_,
Grpc::Common::typeUrl(envoy::service::discovery::v2::Runtime().GetDescriptor()->full_name()),
store_, *this, /*is_delta=*/false);
store_, *this);
subscription_->start({resource_name_});
}

Expand Down
2 changes: 1 addition & 1 deletion source/common/secret/sds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void SdsApi::initialize() {
subscription_ = subscription_factory_.subscriptionFromConfigSource(
sds_config_,
Grpc::Common::typeUrl(envoy::api::v2::auth::Secret().GetDescriptor()->full_name()), stats_,
*this, /*is_delta=*/false);
*this);
subscription_->start({sds_config_name_});
}

Expand Down
13 changes: 5 additions & 8 deletions source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,19 @@
namespace Envoy {
namespace Upstream {

// TODO(fredlas) the is_delta argument can be removed upon delta+SotW ADS Envoy code unification. It
// is only actually needed to choose the grpc_method, which is irrelevant if ADS is used.
CdsApiPtr CdsApiImpl::create(const envoy::api::v2::core::ConfigSource& cds_config, bool is_delta,
CdsApiPtr CdsApiImpl::create(const envoy::api::v2::core::ConfigSource& cds_config,
ClusterManager& cm, Stats::Scope& scope,
ProtobufMessage::ValidationVisitor& validation_visitor) {
return CdsApiPtr{new CdsApiImpl(cds_config, is_delta, cm, scope, validation_visitor)};
return CdsApiPtr{new CdsApiImpl(cds_config, cm, scope, validation_visitor)};
}

CdsApiImpl::CdsApiImpl(const envoy::api::v2::core::ConfigSource& cds_config, bool is_delta,
ClusterManager& cm, Stats::Scope& scope,
ProtobufMessage::ValidationVisitor& validation_visitor)
CdsApiImpl::CdsApiImpl(const envoy::api::v2::core::ConfigSource& cds_config, ClusterManager& cm,
Stats::Scope& scope, ProtobufMessage::ValidationVisitor& validation_visitor)
: cm_(cm), scope_(scope.createScope("cluster_manager.cds.")),
validation_visitor_(validation_visitor) {
subscription_ = cm_.subscriptionFactory().subscriptionFromConfigSource(
cds_config, Grpc::Common::typeUrl(envoy::api::v2::Cluster().GetDescriptor()->full_name()),
*scope_, *this, is_delta);
*scope_, *this);
}

void CdsApiImpl::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
Expand Down
9 changes: 4 additions & 5 deletions source/common/upstream/cds_api_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ class CdsApiImpl : public CdsApi,
Config::SubscriptionCallbacks,
Logger::Loggable<Logger::Id::upstream> {
public:
static CdsApiPtr create(const envoy::api::v2::core::ConfigSource& cds_config, bool is_delta,
ClusterManager& cm, Stats::Scope& scope,
static CdsApiPtr create(const envoy::api::v2::core::ConfigSource& cds_config, ClusterManager& cm,
Stats::Scope& scope,
ProtobufMessage::ValidationVisitor& validation_visitor);

// Upstream::CdsApi
Expand All @@ -46,9 +46,8 @@ class CdsApiImpl : public CdsApi,
return MessageUtil::anyConvert<envoy::api::v2::Cluster>(resource).name();
}

CdsApiImpl(const envoy::api::v2::core::ConfigSource& cds_config, bool is_delta,
ClusterManager& cm, Stats::Scope& scope,
ProtobufMessage::ValidationVisitor& validation_visitor);
CdsApiImpl(const envoy::api::v2::core::ConfigSource& cds_config, ClusterManager& cm,
Stats::Scope& scope, ProtobufMessage::ValidationVisitor& validation_visitor);
void runInitializeCallbackIfAny();

ClusterManager& cm_;
Expand Down
32 changes: 3 additions & 29 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,32 +221,7 @@ ClusterManagerImpl::ClusterManagerImpl(
}
}

// TODO(fredlas) HACK to support
// loadCluster->clusterFromProto->ClusterFactoryImplBase::create->EdsClusterFactory::createClusterImpl(),
// which wants to call xdsIsDelta() on us. So, we need to get our xds_is_delta_ defined before
// then. Once SotW and delta are unified, that is_delta bool will be gone from everywhere, and the
// xds_is_delta_ variable can be removed.
const auto& dyn_resources = bootstrap.dynamic_resources();
if (dyn_resources.has_ads_config()) {
xds_is_delta_ =
dyn_resources.ads_config().api_type() == envoy::api::v2::core::ApiConfigSource::DELTA_GRPC;
} else if (dyn_resources.has_cds_config()) {
const auto& cds_config = dyn_resources.cds_config();
xds_is_delta_ =
cds_config.api_config_source().api_type() ==
envoy::api::v2::core::ApiConfigSource::DELTA_GRPC ||
(dyn_resources.has_ads_config() && dyn_resources.ads_config().api_type() ==
envoy::api::v2::core::ApiConfigSource::DELTA_GRPC);
} else if (dyn_resources.has_lds_config()) {
const auto& lds_config = dyn_resources.lds_config();
xds_is_delta_ =
lds_config.api_config_source().api_type() ==
envoy::api::v2::core::ApiConfigSource::DELTA_GRPC ||
(dyn_resources.has_ads_config() && dyn_resources.ads_config().api_type() ==
envoy::api::v2::core::ApiConfigSource::DELTA_GRPC);
} else {
xds_is_delta_ = false;
}

// Cluster loading happens in two phases: first all the primary clusters are loaded, and then all
// the secondary clusters are loaded. As it currently stands all non-EDS clusters are primary and
Expand Down Expand Up @@ -326,7 +301,7 @@ ClusterManagerImpl::ClusterManagerImpl(

// We can now potentially create the CDS API once the backing cluster exists.
if (dyn_resources.has_cds_config()) {
cds_api_ = factory_.createCds(dyn_resources.cds_config(), xds_is_delta_, *this);
cds_api_ = factory_.createCds(dyn_resources.cds_config(), *this);
init_helper_.setCds(cds_api_.get());
} else {
init_helper_.setCds(nullptr);
Expand Down Expand Up @@ -1369,10 +1344,9 @@ std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr> ProdClusterManagerFactor
}

CdsApiPtr ProdClusterManagerFactory::createCds(const envoy::api::v2::core::ConfigSource& cds_config,
bool is_delta, ClusterManager& cm) {
ClusterManager& cm) {
// TODO(htuch): Differentiate static vs. dynamic validation visitors.
return CdsApiImpl::create(cds_config, is_delta, cm, stats_,
validation_context_.dynamicValidationVisitor());
return CdsApiImpl::create(cds_config, cm, stats_, validation_context_.dynamicValidationVisitor());
}

} // namespace Upstream
Expand Down
Loading