Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions api/XDS_PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ client spontaneously requests the "wc" resource.
![Incremental session example](diagrams/incremental.svg)

On reconnect the Incremental xDS client may tell the server of its known
resources to avoid resending them over the network.
resources to avoid resending them over the network. Because no state is assumed to be preserved from the previous stream, the reconnecting client must provide the server with all resource names it is interested in.
Comment thread
fredlas marked this conversation as resolved.
Outdated

![Incremental reconnect example](diagrams/incremental-reconnect.svg)

Expand All @@ -358,17 +358,24 @@ identified by the alias field in the resource of a `DeltaDiscoveryResponse`. The
be returned in the name field in the resource of a `DeltaDiscoveryResponse`.

#### Subscribing to Resources
Envoy can send either an alias or the name of a resource in the `resource_names_subscribe` field of
a `DeltaDiscoveryRequest` in order to subscribe to a resource. Envoy should check both the names and
aliases of resources in order to determine whether the entity in question has been subscribed to.
The client can send either an alias or the name of a resource in the `resource_names_subscribe`
field of a `DeltaDiscoveryRequest` in order to subscribe to a resource. Both the names and aliases
of resources should be checked in order to determine whether the entity in question has been
subscribed to.

A `resource_names_subscribe` field may contain resource names that the server believes the client
is already subscribed to, and furthermore has the most recent versions of. However, the server
*must* still provide those resources in the response; due to implementation details hidden from
the server, the client may have "forgotten" those resources despite apparently remaining subscribed.

#### Unsubscribing from Resources
Envoy will keep track of a per resource reference count internally. This count will keep track of the
total number of aliases/resource names that are currently subscribed to. When the reference count
reaches zero, Envoy will send a `DeltaDiscoveryRequest` containing the resource name of the resource
to unsubscribe from in the `resource_names_unsubscribe` field. When Envoy unsubscribes from a resource,
it should check for both the resource name and all aliases and appropriately update all resources
that reference either.
When a client loses interest in some resources, it will indicate that with the
`resource_names_unsubscribe` field of a `DeltaDiscoveryRequest`. As with `resource_names_subscribe`,
these may be resource names or aliases.

A `resource_names_unsubscribe` field may contain superfluous resource names, which the server
thought the client was already not subscribed to. The server must cleanly process such a request;
it can simply ignore these phantom unsubscriptions.

## REST-JSON polling subscriptions

Expand Down
2 changes: 1 addition & 1 deletion include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class Subscription {
* Update the resources to fetch.
* @param resources vector of resource names to fetch.
*/
virtual void updateResources(const std::vector<std::string>& resources) PURE;
virtual void updateResources(std::vector<std::string> resource_names) PURE;
};

/**
Expand Down
192 changes: 97 additions & 95 deletions source/common/config/delta_subscription_impl.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#pragma once

#include <queue>

#include "envoy/api/v2/discovery.pb.h"
#include "envoy/common/token_bucket.h"
#include "envoy/config/subscription.h"
Expand All @@ -20,11 +18,6 @@
namespace Envoy {
namespace Config {

struct ResourceNameDiff {
std::vector<std::string> added_;
std::vector<std::string> removed_;
};

/**
* Manages the logic of a (non-aggregated) delta xDS subscription.
* TODO(fredlas) add aggregation support. The plan is for that to happen in XdsGrpcContext,
Expand All @@ -48,51 +41,42 @@ class DeltaSubscriptionImpl : public Subscription,
request_.mutable_node()->MergeFrom(local_info_.node());
}

// Enqueues and attempts to send a discovery request, (un)subscribing to resources missing from /
// added to the passed 'resources' argument, relative to resource_versions_.
void buildAndQueueDiscoveryRequest(const std::vector<std::string>& resources) {
ResourceNameDiff diff;
std::set_difference(resources.begin(), resources.end(), resource_names_.begin(),
resource_names_.end(), std::inserter(diff.added_, diff.added_.begin()));
std::set_difference(resource_names_.begin(), resource_names_.end(), resources.begin(),
resources.end(), std::inserter(diff.removed_, diff.removed_.begin()));

for (const auto& added : diff.added_) {
setResourceWaitingForServer(added);
void updateResources(std::vector<std::string> resource_names) override {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what the calling code looks like, but would it be possible to have this parameter be a set instead? That way you'd avoid having to pass by value and sorting

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely makes sense, and it doesn't look like any of the implementors would run into problems with this change. However, it is a pretty far reaching change, and there appears to be some tests I'll need to fix. Working on it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. This uncovered an issue with how the old non-delta gRPC xDS implementation uses its "watches". Previously, when updating your interest from resources 0 and 1 to just 2, you would first send a request for 0,1,2, and then send one for just 2. With my change, you'll now send a request for nothing, and then just 2. Well, I don't know, maybe this is too much churn; maybe the existing logic makes sense. Fortunately, this will all be replaced in the near future with logic that will have the correct behavior (just send one request, for 2).

std::vector<std::string> cur_added;
std::vector<std::string> cur_removed;

// set_difference expects sorted collections. (resource_names_ is always sorted; it's std::set).
std::sort(resource_names.begin(), resource_names.end());

std::set_difference(resource_names.begin(), resource_names.end(), resource_names_.begin(),
Comment thread
fredlas marked this conversation as resolved.
Outdated
resource_names_.end(), std::inserter(cur_added, cur_added.begin()));
std::set_difference(resource_names_.begin(), resource_names_.end(), resource_names.begin(),
resource_names.end(), std::inserter(cur_removed, cur_removed.begin()));

for (auto a : cur_added) {
setResourceWaitingForServer(a);
// Removed->added requires us to keep track of it as a "new" addition, since our user may have
// forgotten its copy of the resource after instructing us to remove it, and so needs to be
// reminded of it.
names_removed_.erase(a);
names_added_.insert(a);
}
for (const auto& removed : diff.removed_) {
lostInterestInResource(removed);
for (auto r : cur_removed) {
lostInterestInResource(r);
// Ideally, when a resource is added-then-removed in between requests, we would avoid putting
// a superfluous "unsubscribe [resource that was never subscribed]" in the request. However,
// the removed-then-added case *does* need to go in the request, and due to how we accomplish
// that, it's difficult to distinguish remove-add-remove from add-remove (because "remove-add"
// has to be treated as equivalent to just "add").
names_added_.erase(r);
names_removed_.insert(r);
}
queueDiscoveryRequest(diff);
}

void sendDiscoveryRequest(const ResourceNameDiff& diff) {
if (!grpc_stream_.grpcStreamAvailable()) {
ENVOY_LOG(debug, "No stream available to sendDiscoveryRequest for {}", type_url_);
return; // Drop this request; the reconnect will enqueue a new one.
}
if (paused_) {
ENVOY_LOG(trace, "API {} paused during sendDiscoveryRequest().", type_url_);
pending_ = diff;
return; // The unpause will send this request.
stats_.update_attempt_.inc();
// Tell the server about our new interests (but only if there are any).
if (names_added_.size() > 0 || names_removed_.size() > 0) {
kickOffDiscoveryRequest();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this affect ACKs for server response nonces?

This had me thinking; for incremental xDS, do we want to consider splitting out ACK/NACK from the DeltaDiscoveryRequest? It made sense in state-of-world, because it was much simpler, but now we have different concerns, so a top-level oneof object might make more sense. You could end up sending two messages, an ack and then the next request.

Do you think ^^ would be easier to reason about?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By "this" I'm guessing you mean the if condition here? There are a few paths that kickOffDiscoveryRequest(); this is not the one that would ever be doing it in the ACK scenario.

I think that's a very good point. IIRC there's some language like "the nonce field should only be set when ACKing a response", meaning there is somewhat of a de facto oneof already going on. If we do this, there is another bit of perceptual cleanup that would make sense, and that is dumping the Request/Response nomenclature. IMO it is usually not appropriate for bidi streaming, and this is definitely one such case.

One thing: I think the clarity benefits would be almost entirely at the "reading the .proto and docs / understanding the protocol" level (maybe the server implementations would also benefit; no idea). I think Envoy's implementation wouldn't change structure much; you would have one .cc file dealing with both oneof members in relatively close proximity. (Actually, maybe this would be more useful than I can understand, since I already fully "get" when the code is using a DiscoveryRequest as an ACK vs a request).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I wasn't precise; I meant by "this" the idea of coalescing multiple Envoy DeltaDiscoveryRequests, each of which would have been sent in reply to some unique DeltaDiscoveryResponse with its own nonce. We will lose ACK/NACK as a result potentially, or maybe there is some detail that avoids this that I missed.

By splitting out the ACK/NACK as a distinct message and avoiding collapsing it into the request, when we unpause we could issue a series of ACK/NACKs with the relevant nonces, and then the new delta request.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIIIII see. Yes, that's definitely right; I will fix this PR accordingly. When you put it that way, I realize I'm pretty sure that xDS, including SotW, has been broken all along! (Due to how happy it is to drop things from the queue, and the fact that the nonce is just directly copied into request_, rather than queued up).

I think this would ideally be cleanest as a repeated field, rather than a series of messages. It would be very easy to implement here in Envoy; not sure how much work it would be in the server. Because it's so easy, there's no need to do it now "while we're already here" or anything like that, though.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if SotW is broken, since the last DiscoveryResponse is always authoritative, replacing all earlier ones, so it's safe to just have the latest DiscoveryRequest in that situation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, because when you're always sending everything, ACKs are automatically "cumulative". Thanks, makes sense!

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you decide to punt on making ACK/NACKs a distinct message?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think yes, punt. My big aggregated delta implementation is already working without that, and I'd rather not go back and add it in. Plus, I think it will be most clear whether we want it / how best to do it once that aggregated delta stuff is submitted, and we're looking at applying the prospective changes on top of it.

Oh, but, with this in place, what I said about "I think I could easily prevent those [no-ops] from being sent, but I was worried that that would be needlessly cute/complex" would probably be much cleaner to implement. So I bet we will want it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, I wrote a whole big response and then github threw it away. Basically, yes. It will be easier to see how we want to implement it once we can start freshly on top of the upcoming aggregated delta implementation. I think it will help make it cleaner to avoid sending the no-op messages, which is nice.

}

request_.clear_resource_names_subscribe();
request_.clear_resource_names_unsubscribe();
std::copy(diff.added_.begin(), diff.added_.end(),
Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_subscribe()));
std::copy(diff.removed_.begin(), diff.removed_.end(),
Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_unsubscribe()));

ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url_, request_.DebugString());
grpc_stream_.sendMessage(request_);
request_.clear_error_detail();
request_.clear_initial_resource_versions();
}

void subscribe(const std::vector<std::string>& resources) {
ENVOY_LOG(debug, "delta subscribe for " + type_url_);
buildAndQueueDiscoveryRequest(resources);
}

void pause() {
Expand All @@ -105,20 +89,13 @@ class DeltaSubscriptionImpl : public Subscription,
ENVOY_LOG(debug, "Resuming discovery requests for {}", type_url_);
ASSERT(paused_);
paused_ = false;
if (pending_.has_value()) {
queueDiscoveryRequest(pending_.value());
pending_.reset();
}
trySendDiscoveryRequestIfPending();
}

envoy::api::v2::DeltaDiscoveryRequest internalRequestStateForTest() const { return request_; }

// Config::GrpcStreamCallbacks
void onStreamEstablished() override {
// initial_resource_versions "must be populated for first request in a stream", so guarantee
// that the initial version'd request we're about to enqueue is what gets sent.
clearRequestQueue();

request_.Clear();
for (auto const& resource : resource_versions_) {
// Populate initial_resource_versions with the resource versions we currently have. Resources
Expand All @@ -130,13 +107,16 @@ class DeltaSubscriptionImpl : public Subscription,
}
request_.set_type_url(type_url_);
request_.mutable_node()->MergeFrom(local_info_.node());
queueDiscoveryRequest(ResourceNameDiff()); // no change to subscribed resources
kickOffDiscoveryRequest();
}

void onEstablishmentFailure() override {
disableInitFetchTimeoutTimer();
stats_.update_failure_.inc();
ENVOY_LOG(debug, "delta update for {} failed", type_url_);
// TODO(fredlas) this increment is needed to pass existing tests, but it seems wrong. We already
// increment it when updating subscription interest, which attempts a request. Is this supposed
// to be the sum of client- and server- initiated update attempts? Seems weird.
Comment thread
fredlas marked this conversation as resolved.
stats_.update_attempt_.inc();
callbacks_->onConfigUpdateFailed(nullptr);
}
Expand All @@ -155,16 +135,19 @@ class DeltaSubscriptionImpl : public Subscription,
} catch (const EnvoyException& e) {
stats_.update_rejected_.inc();
ENVOY_LOG(warn, "delta config for {} rejected: {}", type_url_, e.what());
// TODO(fredlas) this increment is needed to pass existing tests, but it seems wrong. We
// already increment it when updating subscription interest, which attempts a request. Is this
// supposed to be the sum of client- and server- initiated update attempts? Seems weird.
stats_.update_attempt_.inc();
callbacks_->onConfigUpdateFailed(&e);
::google::rpc::Status* error_detail = request_.mutable_error_detail();
error_detail->set_code(Grpc::Status::GrpcStatus::Internal);
error_detail->set_message(e.what());
}
queueDiscoveryRequest(ResourceNameDiff()); // no change to subscribed resources
kickOffDiscoveryRequest();
}

void onWriteable() override { drainRequests(); }
void onWriteable() override { trySendDiscoveryRequestIfPending(); }

// Config::Subscription
void start(const std::vector<std::string>& resources, SubscriptionCallbacks& callbacks) override {
Expand All @@ -179,19 +162,26 @@ class DeltaSubscriptionImpl : public Subscription,
}

grpc_stream_.establishNewStream();
subscribe(resources);
// The attempt stat here is maintained for the purposes of having consistency between ADS and

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is referring to the same thing as the TODOs I added. Just how meaningless is update_attempt for ADS? Can we just do nothing with it and let it sit at 0?

// individual DeltaSubscriptions. Since ADS is push based and muxed, the notion of an
// "attempt" for a given xDS API combined by ADS is not really that meaningful.
stats_.update_attempt_.inc();
updateResources(resources);
}

void updateResources(const std::vector<std::string>& resources) override {
subscribe(resources);
stats_.update_attempt_.inc();
private:
void sendDiscoveryRequest() {
request_.clear_resource_names_subscribe();
request_.clear_resource_names_unsubscribe();
std::copy(names_added_.begin(), names_added_.end(),
Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_subscribe()));
std::copy(names_removed_.begin(), names_removed_.end(),
Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_unsubscribe()));
names_added_.clear();
names_removed_.clear();

ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url_, request_.DebugString());
grpc_stream_.sendMessage(request_);
request_.clear_error_detail();
request_.clear_initial_resource_versions();
}

private:
void
handleConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
Expand All @@ -214,6 +204,9 @@ class DeltaSubscriptionImpl : public Subscription,
}
}
stats_.update_success_.inc();
// TODO(fredlas) this increment is needed to pass existing tests, but it seems wrong. We already
// increment it when updating subscription interest, which attempts a request. Is this supposed
// to be the sum of client- and server- initiated update attempts? Seems weird.
stats_.update_attempt_.inc();
stats_.version_.set(HashUtil::xxHash64(version_info));
ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", type_url_,
Expand All @@ -227,14 +220,34 @@ class DeltaSubscriptionImpl : public Subscription,
}
}

void drainRequests() {
ENVOY_LOG(trace, "draining discovery requests {}", request_queue_.size());
while (!request_queue_.empty() && grpc_stream_.checkRateLimitAllowsDrain()) {
// Process the request, if rate limiting is not enabled at all or if it is under rate limit.
sendDiscoveryRequest(request_queue_.front());
request_queue_.pop();
void kickOffDiscoveryRequest() {
pending_ = true;
trySendDiscoveryRequestIfPending();
}

void trySendDiscoveryRequestIfPending() {
if (!pending_) {
return;
}
bool should_send = true;
if (paused_) {
ENVOY_LOG(trace, "API {} paused; discovery request on hold for now.", type_url_);
should_send = false;
}
grpc_stream_.maybeUpdateQueueSizeStat(request_queue_.size());
if (!grpc_stream_.grpcStreamAvailable()) {
ENVOY_LOG(trace, "No stream available to send a DiscoveryRequest for {}.", type_url_);
should_send = false;
}
if (!grpc_stream_.checkRateLimitAllowsDrain()) {
ENVOY_LOG(trace, "{} DiscoveryRequest hit rate limit; will try later.", type_url_);
should_send = false;
}

if (should_send) {
sendDiscoveryRequest();
pending_ = false;
}
grpc_stream_.maybeUpdateQueueSizeStat(pending_ ? 1 : 0);
}

class ResourceVersion {
Expand Down Expand Up @@ -272,23 +285,6 @@ class DeltaSubscriptionImpl : public Subscription,
resource_names_.erase(resource_name);
}

void queueDiscoveryRequest(const ResourceNameDiff& queue_item) {
request_queue_.push(queue_item);
drainRequests();
}

void clearRequestQueue() {
grpc_stream_.maybeUpdateQueueSizeStat(0);
// TODO(fredlas) when we have C++17: request_queue_ = {};
while (!request_queue_.empty()) {
request_queue_.pop();
}
}

// A queue to store requests while rate limited. Note that when requests cannot be sent due to the
// gRPC stream being down, this queue does not store them; rather, they are simply dropped.
std::queue<ResourceNameDiff> request_queue_;

GrpcStream<envoy::api::v2::DeltaDiscoveryRequest, envoy::api::v2::DeltaDiscoveryResponse>
grpc_stream_;

Expand All @@ -299,15 +295,21 @@ class DeltaSubscriptionImpl : public Subscription,
std::unordered_map<std::string, ResourceVersion> resource_versions_;
// The keys of resource_versions_. Only tracked separately because std::map does not provide an
// iterator into just its keys, e.g. for use in std::set_difference.
std::unordered_set<std::string> resource_names_;
// Must be stored sorted to work with std::set_difference.
std::set<std::string> resource_names_;

const std::string type_url_;
SubscriptionCallbacks* callbacks_{};
// In-flight or previously sent request.
envoy::api::v2::DeltaDiscoveryRequest request_;
// Paused via pause()?
bool paused_{};
absl::optional<ResourceNameDiff> pending_;
bool pending_{};

// Tracking of the delta in our subscription interest since the previous DeltaDiscoveryRequest was
// sent. Can't use unordered_set due to ordering issues in gTest expectation matching. Feel free
// to change if you can figure out how to make it work.
std::set<std::string> names_added_;
std::set<std::string> names_removed_;

const LocalInfo::LocalInfo& local_info_;
SubscriptionStats stats_;
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/filesystem_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class FilesystemSubscriptionImpl : public Config::Subscription,
refresh();
}

void updateResources(const std::vector<std::string>& resources) override {
void updateResources(std::vector<std::string> resources) override {
// We report all discovered resources in the watched file.
UNREFERENCED_PARAMETER(resources);
// Bump stats for consistence behavior with other xDS.
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/grpc_mux_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class GrpcMuxSubscriptionImpl : public Subscription,
stats_.update_attempt_.inc();
}

void updateResources(const std::vector<std::string>& resources) override {
void updateResources(std::vector<std::string> resources) override {
watch_ = grpc_mux_.subscribe(type_url_, resources, *this);
stats_.update_attempt_.inc();
}
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class GrpcSubscriptionImpl : public Config::Subscription {
grpc_mux_.start();
}

void updateResources(const std::vector<std::string>& resources) override {
void updateResources(std::vector<std::string> resources) override {
grpc_mux_subscription_.updateResources(resources);
}

Expand Down
2 changes: 1 addition & 1 deletion source/common/config/http_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
initialize();
}

void updateResources(const std::vector<std::string>& resources) override {
void updateResources(std::vector<std::string> resources) override {
Protobuf::RepeatedPtrField<ProtobufTypes::String> resources_vector(resources.begin(),
resources.end());
request_.mutable_resource_names()->Swap(&resources_vector);
Expand Down
Loading