Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class SubscriptionCallbacks {
virtual ~SubscriptionCallbacks() = default;

/**
* Called when a configuration update is received.
* Called when a state-of-the-world configuration update is received. (State-of-the-world is
* everything other than delta gRPC - filesystem, HTTP, non-delta gRPC).
* @param resources vector of fetched resources corresponding to the configuration update.
* @param version_info supplies the version information as supplied by the xDS discovery response.
* @throw EnvoyException with reason if the configuration is rejected. Otherwise the configuration
Expand All @@ -28,9 +29,6 @@ class SubscriptionCallbacks {
virtual void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) PURE;

// TODO(fredlas) it is a HACK that there are two of these. After delta CDS is merged,
// I intend to reimplement all state-of-the-world xDSes' use of onConfigUpdate
// in terms of this delta-style one (and remove the original).
/**
* Called when a delta configuration update is received.
* @param added_resources resources newly added since the previous fetch.
Expand Down
34 changes: 26 additions & 8 deletions source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,9 @@ void RdsRouteConfigSubscription::onConfigUpdate(
const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) {
last_updated_ = time_source_.systemTime();

if (resources.empty()) {
ENVOY_LOG(debug, "Missing RouteConfiguration for {} in onConfigUpdate()", route_config_name_);
stats_.update_empty_.inc();
init_target_.ready();
if (!validateUpdateSize(resources.size())) {
Comment thread
fredlas marked this conversation as resolved.
return;
}
if (resources.size() != 1) {
throw EnvoyException(fmt::format("Unexpected RDS resource length: {}", resources.size()));
}
auto route_config = MessageUtil::anyConvert<envoy::api::v2::RouteConfiguration>(resources[0]);
MessageUtil::validate(route_config);
// TODO(PiotrSikora): Remove this hack once fixed internally.
Expand All @@ -126,12 +119,37 @@ void RdsRouteConfigSubscription::onConfigUpdate(
init_target_.ready();
}

void RdsRouteConfigSubscription::onConfigUpdate(
const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& resources,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) {
if (!validateUpdateSize(resources.size())) {
return;
}
Protobuf::RepeatedPtrField<ProtobufWkt::Any> unwrapped_resource;
*unwrapped_resource.Add() = resources[0].resource();
onConfigUpdate(unwrapped_resource, resources[0].version());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, the way it works today is that we only get an RdsRouteConfigSubscription::onConfigUpdate update if the resource is present in the RDS response. If it's absent, no change is made to the configuration. Initially it's empty, and we warm listeners on first response. We may have an empty config though if the update failed.

In the delta world, we can add resources as they arrive. If we get a removal request, what do we do? Should we be resetting the configuration back to empty? The semantics are a bit different here to SOTW, since with SOTW, we are cool with missing config implying latching, here we have an explicit removal operation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My guess would be to have a removal request reset the configuration, since RDS is one of the types that does have a "name" to it. If the server wanted to update, it should just send an updated version. If it's sending an update explicitly naming the resource as being gone, it makes sense to get rid of that resource. Although, it sounds like when the RDS client gets started, it already has a name in mind, and only that name is valid. So removing the config should be more like shutting down.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you plan to address the code changes for this in this PR?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I ended up logging an error and ignoring the remove request. It seems like it basically doesn't make sense for the server to even try a removal here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think this becomes more interesting with on-demand, where you have the possibility the management server decides to remove a resource and have the client lazily fetch (and maybe recreate it on that path). Since we don't support that, I think it's fine to error out for now, but I'd like to make this a TODO and link to an on-demand tracking issue (do we have one?) so that folks know this is subject to change in terms of behavior.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see, yes. Left a TODO. Looks like there is #2500.

}

void RdsRouteConfigSubscription::onConfigUpdateFailed(const EnvoyException*) {
// We need to allow server startup to continue, even if we have a bad
// config.
init_target_.ready();
}

bool RdsRouteConfigSubscription::validateUpdateSize(int num_resources) {
if (num_resources == 0) {
ENVOY_LOG(debug, "Missing RouteConfiguration for {} in onConfigUpdate()", route_config_name_);
stats_.update_empty_.inc();
init_target_.ready();
return false;
}
if (num_resources != 1) {
throw EnvoyException(fmt::format("Unexpected RDS resource length: {}", num_resources));
// (would be a return false here)
}
return true;
}

RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl(
RdsRouteConfigSubscriptionSharedPtr&& subscription,
Server::Configuration::FactoryContext& factory_context)
Expand Down
9 changes: 4 additions & 5 deletions source/common/router/rds_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,10 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks,
~RdsRouteConfigSubscription() override;

// Config::SubscriptionCallbacks
// TODO(fredlas) deduplicate
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) override;
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>&,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& resources,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) override;
void onConfigUpdateFailed(const EnvoyException* e) override;
std::string resourceName(const ProtobufWkt::Any& resource) override {
return MessageUtil::anyConvert<envoy::api::v2::RouteConfiguration>(resource).name();
Expand All @@ -123,6 +120,8 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks,
const std::string& stat_prefix,
RouteConfigProviderManagerImpl& route_config_provider_manager);

bool validateUpdateSize(int num_resources);

std::unique_ptr<Envoy::Config::Subscription> subscription_;
const std::string route_config_name_;
Init::TargetImpl init_target_;
Expand Down
26 changes: 19 additions & 7 deletions source/common/secret/sds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@ SdsApi::SdsApi(const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispat

void SdsApi::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string&) {
if (resources.empty()) {
throw EnvoyException(
fmt::format("Missing SDS resources for {} in onConfigUpdate()", sds_config_name_));
}
if (resources.size() != 1) {
throw EnvoyException(fmt::format("Unexpected SDS secrets length: {}", resources.size()));
}
validateUpdateSize(resources.size());
auto secret = MessageUtil::anyConvert<envoy::api::v2::auth::Secret>(resources[0]);
MessageUtil::validate(secret);

Expand All @@ -59,11 +53,29 @@ void SdsApi::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>&
init_target_.ready();
}

void SdsApi::onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& resources,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) {
validateUpdateSize(resources.size());
Protobuf::RepeatedPtrField<ProtobufWkt::Any> unwrapped_resource;
*unwrapped_resource.Add() = resources[0].resource();
onConfigUpdate(unwrapped_resource, resources[0].version());
}

void SdsApi::onConfigUpdateFailed(const EnvoyException*) {
// We need to allow server startup to continue, even if we have a bad config.
init_target_.ready();
}

void SdsApi::validateUpdateSize(int num_resources) {
if (num_resources == 0) {
throw EnvoyException(
fmt::format("Missing SDS resources for {} in onConfigUpdate()", sds_config_name_));
}
if (num_resources != 1) {
throw EnvoyException(fmt::format("Unexpected SDS secrets length: {}", num_resources));
}
}

void SdsApi::initialize() {
subscription_ = Envoy::Config::SubscriptionFactory::subscriptionFromConfigSource(
sds_config_, local_info_, dispatcher_, cluster_manager_, random_, stats_,
Expand Down
6 changes: 2 additions & 4 deletions source/common/secret/sds_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,10 @@ class SdsApi : public Config::SubscriptionCallbacks {
std::function<void()> destructor_cb, Api::Api& api);

// Config::SubscriptionCallbacks
// TODO(fredlas) deduplicate
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) override;
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>&,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) override;
void onConfigUpdateFailed(const EnvoyException* e) override;
std::string resourceName(const ProtobufWkt::Any& resource) override {
return MessageUtil::anyConvert<envoy::api::v2::auth::Secret>(resource).name();
Expand All @@ -56,6 +53,7 @@ class SdsApi : public Config::SubscriptionCallbacks {
Common::CallbackManager<> update_callback_manager_;

private:
void validateUpdateSize(int num_resources);
void initialize();
Init::TargetImpl init_target_;
const LocalInfo::LocalInfo& local_info_;
Expand Down
7 changes: 6 additions & 1 deletion source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@ void CdsApiImpl::onConfigUpdate(

std::vector<std::string> exception_msgs;
std::unordered_set<std::string> cluster_names;
bool any_applied = false;
for (const auto& resource : added_resources) {
envoy::api::v2::Cluster cluster;
try {
cluster = MessageUtil::anyConvert<envoy::api::v2::Cluster>(resource.resource());
MessageUtil::validate(cluster);
if (!cluster_names.insert(cluster.name()).second) {
// NOTE: at this point, the first of these duplicates has already been successfully applied.
throw EnvoyException(fmt::format("duplicate cluster {} found", cluster.name()));
}
if (cm_.addOrUpdateCluster(
Expand Down Expand Up @@ -107,6 +109,7 @@ void CdsApiImpl::onConfigUpdate(
cm_.adsMux().resume(Config::TypeUrl::get().Cluster);
}
})) {
any_applied = true;
ENVOY_LOG(debug, "cds: add/update cluster '{}'", cluster.name());
}
} catch (const EnvoyException& e) {
Expand All @@ -119,12 +122,14 @@ void CdsApiImpl::onConfigUpdate(
}
}

if (any_applied) {
system_version_info_ = system_version_info;
Comment thread
htuch marked this conversation as resolved.
}
runInitializeCallbackIfAny();
if (!exception_msgs.empty()) {
throw EnvoyException(
fmt::format("Error adding/updating cluster(s) {}", StringUtil::join(exception_msgs, ", ")));
}
system_version_info_ = system_version_info;
}

void CdsApiImpl::onConfigUpdateFailed(const EnvoyException*) {
Expand Down
6 changes: 3 additions & 3 deletions source/common/upstream/cds_api_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ class CdsApiImpl : public CdsApi,
const std::string versionInfo() const override { return system_version_info_; }

// Config::SubscriptionCallbacks
// TODO(fredlas) deduplicate
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) override;
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>&,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) override;
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) override;
void onConfigUpdateFailed(const EnvoyException* e) override;
std::string resourceName(const ProtobufWkt::Any& resource) override {
return MessageUtil::anyConvert<envoy::api::v2::Cluster>(resource).name();
Expand Down
31 changes: 24 additions & 7 deletions source/common/upstream/eds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,9 @@ void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& h

void EdsClusterImpl::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string&) {
if (resources.empty()) {
ENVOY_LOG(debug, "Missing ClusterLoadAssignment for {} in onConfigUpdate()", cluster_name_);
info_->stats().update_empty_.inc();
onPreInitComplete();
if (!validateUpdateSize(resources.size())) {
return;
}
if (resources.size() != 1) {
throw EnvoyException(fmt::format("Unexpected EDS resource length: {}", resources.size()));
}
auto cluster_load_assignment =
MessageUtil::anyConvert<envoy::api::v2::ClusterLoadAssignment>(resources[0]);
MessageUtil::validate(cluster_load_assignment);
Expand All @@ -136,6 +130,29 @@ void EdsClusterImpl::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt
priority_set_.batchHostUpdate(helper);
}

void EdsClusterImpl::onConfigUpdate(
const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& resources,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) {
validateUpdateSize(resources.size());
Protobuf::RepeatedPtrField<ProtobufWkt::Any> unwrapped_resource;
*unwrapped_resource.Add() = resources[0].resource();
onConfigUpdate(unwrapped_resource, resources[0].version());
}

bool EdsClusterImpl::validateUpdateSize(int num_resources) {
if (num_resources == 0) {
ENVOY_LOG(debug, "Missing ClusterLoadAssignment for {} in onConfigUpdate()", cluster_name_);
info_->stats().update_empty_.inc();
onPreInitComplete();
return false;
}
if (num_resources != 1) {
throw EnvoyException(fmt::format("Unexpected EDS resource length: {}", num_resources));
// (would be a return false here)
}
return true;
}

void EdsClusterImpl::onAssignmentTimeout() {
// We can no longer use the assignments, remove them.
// TODO(vishalpowar) This is not going to work for incremental updates, and we
Expand Down
6 changes: 2 additions & 4 deletions source/common/upstream/eds.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,10 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, Config::SubscriptionCallba
InitializePhase initializePhase() const override { return InitializePhase::Secondary; }

// Config::SubscriptionCallbacks
// TODO(fredlas) deduplicate
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) override;
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>&,
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
const Protobuf::RepeatedPtrField<std::string>&, const std::string&) override;
void onConfigUpdateFailed(const EnvoyException* e) override;
std::string resourceName(const ProtobufWkt::Any& resource) override {
return MessageUtil::anyConvert<envoy::api::v2::ClusterLoadAssignment>(resource).cluster_name();
Expand All @@ -50,6 +47,7 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, Config::SubscriptionCallba
LocalityWeightsMap& new_locality_weights_map,
PriorityStateManager& priority_state_manager,
std::unordered_map<std::string, HostSharedPtr>& updated_hosts);
bool validateUpdateSize(int num_resources);

// ClusterImplBase
void reloadHealthyHostsHelper(const HostSharedPtr& host) override;
Expand Down
Loading