Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class SubscriptionCallbacks {
virtual ~SubscriptionCallbacks() = default;

/**
* Called when a configuration update is received.
* Called when a state-of-the-world configuration update is received. (State-of-the-world is
* everything other than delta gRPC - filesystem, HTTP, non-delta gRPC).
* @param resources vector of fetched resources corresponding to the configuration update.
* @param version_info supplies the version information as supplied by the xDS discovery response.
* @throw EnvoyException with reason if the configuration is rejected. Otherwise the configuration
Expand All @@ -28,9 +29,6 @@ class SubscriptionCallbacks {
virtual void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) PURE;

// TODO(fredlas) it is a HACK that there are two of these. After delta CDS is merged,
// I intend to reimplement all state-of-the-world xDSes' use of onConfigUpdate
// in terms of this delta-style one (and remove the original).
/**
* Called when a delta configuration update is received.
* @param added_resources resources newly added since the previous fetch.
Expand Down
44 changes: 37 additions & 7 deletions source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,9 @@ RdsRouteConfigSubscription::~RdsRouteConfigSubscription() {
void RdsRouteConfigSubscription::onConfigUpdate(
const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) {
if (resources.empty()) {
ENVOY_LOG(debug, "Missing RouteConfiguration for {} in onConfigUpdate()", route_config_name_);
stats_.update_empty_.inc();
init_target_.ready();
if (!validateUpdateSize(resources.size())) {
return;
}
if (resources.size() != 1) {
throw EnvoyException(fmt::format("Unexpected RDS resource length: {}", resources.size()));
}
auto route_config = MessageUtil::anyConvert<envoy::api::v2::RouteConfiguration>(resources[0]);
MessageUtil::validate(route_config);
if (route_config.name() != route_config_name_) {
Expand Down Expand Up @@ -131,12 +125,48 @@ void RdsRouteConfigSubscription::onConfigUpdate(
init_target_.ready();
}

void RdsRouteConfigSubscription::onConfigUpdate(
const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) {
if (!removed_resources.empty()) {
// TODO(#2500) when on-demand resource loading is supported, an RDS removal may make sense (see
// discussion in #6879), and so we should do something other than ignoring here.
ENVOY_LOG(
error,
"Server sent a delta RDS update attempting to remove a resource (name: {}). Ignoring.",
removed_resources[0]);
}
Protobuf::RepeatedPtrField<ProtobufWkt::Any> unwrapped_resource;
if (!added_resources.empty()) {
*unwrapped_resource.Add() = added_resources[0].resource();
onConfigUpdate(unwrapped_resource, added_resources[0].version());
} else {
onConfigUpdate({}, system_version_info);
return;
}
}

void RdsRouteConfigSubscription::onConfigUpdateFailed(const EnvoyException*) {
// We need to allow server startup to continue, even if we have a bad
// config.
init_target_.ready();
}

bool RdsRouteConfigSubscription::validateUpdateSize(int num_resources) {
if (num_resources == 0) {
ENVOY_LOG(debug, "Missing RouteConfiguration for {} in onConfigUpdate()", route_config_name_);
stats_.update_empty_.inc();
init_target_.ready();
return false;
}
if (num_resources != 1) {
throw EnvoyException(fmt::format("Unexpected RDS resource length: {}", num_resources));
// (would be a return false here)
}
return true;
}

RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl(
RdsRouteConfigSubscriptionSharedPtr&& subscription,
Server::Configuration::FactoryContext& factory_context)
Expand Down
10 changes: 5 additions & 5 deletions source/common/router/rds_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,11 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks,
RouteConfigUpdatePtr& routeConfigUpdate() { return config_update_info_; }

// Config::SubscriptionCallbacks
// TODO(fredlas) deduplicate
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) override;
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>&,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string&) override;
void onConfigUpdateFailed(const EnvoyException* e) override;
std::string resourceName(const ProtobufWkt::Any& resource) override {
return MessageUtil::anyConvert<envoy::api::v2::RouteConfiguration>(resource).name();
Expand All @@ -128,6 +126,8 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks,
const std::string& stat_prefix,
RouteConfigProviderManagerImpl& route_config_provider_manager);

bool validateUpdateSize(int num_resources);

std::unique_ptr<Envoy::Config::Subscription> subscription_;
const std::string route_config_name_;
Server::Configuration::FactoryContext& factory_context_;
Expand Down
26 changes: 19 additions & 7 deletions source/common/secret/sds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@ SdsApi::SdsApi(const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispat

void SdsApi::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string&) {
if (resources.empty()) {
throw EnvoyException(
fmt::format("Missing SDS resources for {} in onConfigUpdate()", sds_config_name_));
}
if (resources.size() != 1) {
throw EnvoyException(fmt::format("Unexpected SDS secrets length: {}", resources.size()));
}
validateUpdateSize(resources.size());
auto secret = MessageUtil::anyConvert<envoy::api::v2::auth::Secret>(resources[0]);
MessageUtil::validate(secret);

Expand All @@ -57,11 +51,29 @@ void SdsApi::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>&
init_target_.ready();
}

void SdsApi::onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& resources,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) {
validateUpdateSize(resources.size());
Protobuf::RepeatedPtrField<ProtobufWkt::Any> unwrapped_resource;
*unwrapped_resource.Add() = resources[0].resource();
onConfigUpdate(unwrapped_resource, resources[0].version());
}

void SdsApi::onConfigUpdateFailed(const EnvoyException*) {
// We need to allow server startup to continue, even if we have a bad config.
init_target_.ready();
}

void SdsApi::validateUpdateSize(int num_resources) {
if (num_resources == 0) {
throw EnvoyException(
fmt::format("Missing SDS resources for {} in onConfigUpdate()", sds_config_name_));
}
if (num_resources != 1) {
throw EnvoyException(fmt::format("Unexpected SDS secrets length: {}", num_resources));
}
}

void SdsApi::initialize() {
subscription_ = Envoy::Config::SubscriptionFactory::subscriptionFromConfigSource(
sds_config_, local_info_, dispatcher_, cluster_manager_, random_, stats_,
Expand Down
6 changes: 2 additions & 4 deletions source/common/secret/sds_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,10 @@ class SdsApi : public Config::SubscriptionCallbacks {
std::function<void()> destructor_cb, Api::Api& api);

// Config::SubscriptionCallbacks
// TODO(fredlas) deduplicate
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) override;
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>&,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) override;
void onConfigUpdateFailed(const EnvoyException* e) override;
std::string resourceName(const ProtobufWkt::Any& resource) override {
return MessageUtil::anyConvert<envoy::api::v2::auth::Secret>(resource).name();
Expand All @@ -56,6 +53,7 @@ class SdsApi : public Config::SubscriptionCallbacks {
Common::CallbackManager<> update_callback_manager_;

private:
void validateUpdateSize(int num_resources);
void initialize();
Init::TargetImpl init_target_;
const LocalInfo::LocalInfo& local_info_;
Expand Down
7 changes: 6 additions & 1 deletion source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@ void CdsApiImpl::onConfigUpdate(

std::vector<std::string> exception_msgs;
std::unordered_set<std::string> cluster_names;
bool any_applied = false;
for (const auto& resource : added_resources) {
envoy::api::v2::Cluster cluster;
try {
cluster = MessageUtil::anyConvert<envoy::api::v2::Cluster>(resource.resource());
MessageUtil::validate(cluster);
if (!cluster_names.insert(cluster.name()).second) {
// NOTE: at this point, the first of these duplicates has already been successfully applied.
throw EnvoyException(fmt::format("duplicate cluster {} found", cluster.name()));
}
if (cm_.addOrUpdateCluster(
Expand Down Expand Up @@ -107,6 +109,7 @@ void CdsApiImpl::onConfigUpdate(
cm_.adsMux().resume(Config::TypeUrl::get().Cluster);
}
})) {
any_applied = true;
ENVOY_LOG(debug, "cds: add/update cluster '{}'", cluster.name());
}
} catch (const EnvoyException& e) {
Expand All @@ -119,12 +122,14 @@ void CdsApiImpl::onConfigUpdate(
}
}

if (any_applied) {
system_version_info_ = system_version_info;
}
runInitializeCallbackIfAny();
if (!exception_msgs.empty()) {
throw EnvoyException(
fmt::format("Error adding/updating cluster(s) {}", StringUtil::join(exception_msgs, ", ")));
}
system_version_info_ = system_version_info;
}

void CdsApiImpl::onConfigUpdateFailed(const EnvoyException*) {
Expand Down
6 changes: 3 additions & 3 deletions source/common/upstream/cds_api_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ class CdsApiImpl : public CdsApi,
const std::string versionInfo() const override { return system_version_info_; }

// Config::SubscriptionCallbacks
// TODO(fredlas) deduplicate
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) override;
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>&,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) override;
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) override;
void onConfigUpdateFailed(const EnvoyException* e) override;
std::string resourceName(const ProtobufWkt::Any& resource) override {
return MessageUtil::anyConvert<envoy::api::v2::Cluster>(resource).name();
Expand Down
31 changes: 24 additions & 7 deletions source/common/upstream/eds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,9 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h

void EdsClusterImpl::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string&) {
if (resources.empty()) {
ENVOY_LOG(debug, "Missing ClusterLoadAssignment for {} in onConfigUpdate()", cluster_name_);
info_->stats().update_empty_.inc();
onPreInitComplete();
if (!validateUpdateSize(resources.size())) {
return;
}
if (resources.size() != 1) {
throw EnvoyException(fmt::format("Unexpected EDS resource length: {}", resources.size()));
}
auto cluster_load_assignment =
MessageUtil::anyConvert<envoy::api::v2::ClusterLoadAssignment>(resources[0]);
MessageUtil::validate(cluster_load_assignment);
Expand All @@ -135,6 +129,29 @@ void EdsClusterImpl::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt
priority_set_.batchHostUpdate(helper);
}

void EdsClusterImpl::onConfigUpdate(
const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& resources,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) {
validateUpdateSize(resources.size());
Protobuf::RepeatedPtrField<ProtobufWkt::Any> unwrapped_resource;
*unwrapped_resource.Add() = resources[0].resource();
onConfigUpdate(unwrapped_resource, resources[0].version());
}

bool EdsClusterImpl::validateUpdateSize(int num_resources) {
if (num_resources == 0) {
ENVOY_LOG(debug, "Missing ClusterLoadAssignment for {} in onConfigUpdate()", cluster_name_);
info_->stats().update_empty_.inc();
onPreInitComplete();
return false;
}
if (num_resources != 1) {
throw EnvoyException(fmt::format("Unexpected EDS resource length: {}", num_resources));
// (would be a return false here)
}
return true;
}

void EdsClusterImpl::onAssignmentTimeout() {
// We can no longer use the assignments, remove them.
// TODO(vishalpowar) This is not going to work for incremental updates, and we
Expand Down
6 changes: 2 additions & 4 deletions source/common/upstream/eds.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,10 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, Config::SubscriptionCallba
InitializePhase initializePhase() const override { return InitializePhase::Secondary; }

// Config::SubscriptionCallbacks
// TODO(fredlas) deduplicate
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) override;
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>&,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) override;
void onConfigUpdateFailed(const EnvoyException* e) override;
std::string resourceName(const ProtobufWkt::Any& resource) override {
return MessageUtil::anyConvert<envoy::api::v2::ClusterLoadAssignment>(resource).cluster_name();
Expand All @@ -50,6 +47,7 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, Config::SubscriptionCallba
LocalityWeightsMap& new_locality_weights_map,
PriorityStateManager& priority_state_manager,
std::unordered_map<std::string, HostSharedPtr>& updated_hosts);
bool validateUpdateSize(int num_resources);

// ClusterImplBase
void reloadHealthyHostsHelper(const HostSharedPtr& host) override;
Expand Down
Loading