Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 70 additions & 16 deletions source/common/config/delta_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ struct ResourceNameDiff {
std::vector<std::string> removed_;
};

const char EmptyVersion[] = "";

/**
* Manages the logic of a (non-aggregated) delta xDS subscription.
* TODO(fredlas) add aggregation support.
Expand Down Expand Up @@ -53,8 +51,7 @@ class DeltaSubscriptionImpl
}

// Enqueues and attempts to send a discovery request, (un)subscribing to resources missing from /
// added to the passed 'resources' argument, relative to resources_. Updates resources_ to
// 'resources'.
// added to the passed 'resources' argument, relative to resource_versions_.
void buildAndQueueDiscoveryRequest(const std::vector<std::string>& resources) {
ResourceNameDiff diff;
std::set_difference(resources.begin(), resources.end(), resource_names_.begin(),
Expand All @@ -63,12 +60,10 @@ class DeltaSubscriptionImpl
resources.end(), std::inserter(diff.removed_, diff.removed_.begin()));

for (const auto& added : diff.added_) {
resources_[added] = EmptyVersion;
resource_names_.insert(added);
setResourceWaitingForServer(added);
}
for (const auto& removed : diff.removed_) {
resources_.erase(removed);
resource_names_.erase(removed);
lostInterestInResource(removed);
}
queueDiscoveryRequest(diff);
}
Expand Down Expand Up @@ -118,13 +113,28 @@ class DeltaSubscriptionImpl
}
}

envoy::api::v2::DeltaDiscoveryRequest internalRequestStateForTest() const { return request_; }

// Config::SubscriptionCallbacks
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& version_info) {
callbacks_->onConfigUpdate(added_resources, removed_resources, version_info);
for (const auto& resource : added_resources) {
resources_[resource.name()] = resource.version();
setResourceVersion(resource.name(), resource.version());
}
// If a resource is gone, there is no longer a meaningful version for it that makes sense to
// provide to the server upon stream reconnect: either it will continue to not exist, in which
// case saying nothing is fine, or the server will bring back something new, which we should
// receive regardless (which is the logic that not specifying a version will get you).
//
// So, leave the version map entry present but blank. It will be left out of
// initial_resource_versions messages, but will remind us to explicitly tell the server "I'm
// cancelling my subscription" when we lose interest.
for (const auto& resource_name : removed_resources) {
if (resource_names_.find(resource_name) != resource_names_.end()) {
setResourceWaitingForServer(resource_name);
}
}
stats_.update_success_.inc();
stats_.update_attempt_.inc();
Expand Down Expand Up @@ -161,8 +171,13 @@ class DeltaSubscriptionImpl
clearRequestQueue();

request_.Clear();
for (auto const& resource : resources_) {
(*request_.mutable_initial_resource_versions())[resource.first] = resource.second;
for (auto const& resource : resource_versions_) {
// Populate initial_resource_versions with the resource versions we currently have. Resources
// we are interested in, but are still waiting to get any version of from the server, do not
// belong in initial_resource_versions.
if (!resource.second.waitingForServer()) {
(*request_.mutable_initial_resource_versions())[resource.first] = resource.second.version();
}
}
request_.set_type_url(type_url_);
request_.mutable_node()->MergeFrom(local_info_.node());
Expand Down Expand Up @@ -210,11 +225,51 @@ class DeltaSubscriptionImpl
init_fetch_timeout_timer_.reset();
}
}
// A map from resource name to per-resource version.
std::unordered_map<std::string, std::string> resources_;
// The keys of resources_. Only tracked separately because std::map does not provide an iterator
// into just its keys, e.g. for use in std::set_difference.

class ResourceVersion {
public:
explicit ResourceVersion(absl::string_view version) : version_(version) {}
// Builds a ResourceVersion in the waitingForServer state.
ResourceVersion() {}

// If true, we currently have no version of this resource - we are waiting for the server to
// provide us with one.
bool waitingForServer() const { return version_ == absl::nullopt; }
// Must not be called if waitingForServer() == true.
std::string version() const {
ASSERT(version_.has_value());
return version_.value_or("");
}

private:
absl::optional<std::string> version_;
};

// Use these helpers to avoid forgetting to update both at once.
void setResourceVersion(const std::string& resource_name, const std::string& resource_version) {
resource_versions_[resource_name] = ResourceVersion(resource_version);
resource_names_.insert(resource_name);
}

void setResourceWaitingForServer(const std::string& resource_name) {
resource_versions_[resource_name] = ResourceVersion();
resource_names_.insert(resource_name);
}

void lostInterestInResource(const std::string& resource_name) {
resource_versions_.erase(resource_name);
resource_names_.erase(resource_name);
}

// A map from resource name to per-resource version. The keys of this map are exactly the resource
// names we are currently interested in. Those in the waitingForServer state currently don't have
// any version for that resource: we need to inform the server if we lose interest in them, but we
// also need to *not* include them in the initial_resource_versions map upon a reconnect.
std::unordered_map<std::string, ResourceVersion> resource_versions_;
// The keys of resource_versions_. Only tracked separately because std::map does not provide an
// iterator into just its keys, e.g. for use in std::set_difference.
std::unordered_set<std::string> resource_names_;

const std::string type_url_;
SubscriptionCallbacks<ResourceType>* callbacks_{};
// In-flight or previously sent request.
Expand All @@ -224,7 +279,6 @@ class DeltaSubscriptionImpl
absl::optional<ResourceNameDiff> pending_;

const LocalInfo::LocalInfo& local_info_;

SubscriptionStats stats_;
Event::Dispatcher& dispatcher_;
std::chrono::milliseconds init_fetch_timeout_;
Expand Down
17 changes: 17 additions & 0 deletions test/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,23 @@ load(

envoy_package()

envoy_cc_test(
name = "delta_subscription_impl_test",
srcs = ["delta_subscription_impl_test.cc"],
deps = [
":delta_subscription_test_harness",
"//source/common/config:delta_subscription_lib",
"//source/common/stats:isolated_store_lib",
"//test/mocks:common_lib",
"//test/mocks/config:config_mocks",
"//test/mocks/event:event_mocks",
"//test/mocks/grpc:grpc_mocks",
"//test/mocks/local_info:local_info_mocks",
"//test/mocks/runtime:runtime_mocks",
"//test/test_common:logging_lib",
],
)

envoy_cc_test(
name = "filesystem_subscription_impl_test",
srcs = ["filesystem_subscription_impl_test.cc"],
Expand Down
80 changes: 80 additions & 0 deletions test/common/config/delta_subscription_impl_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#include "test/common/config/delta_subscription_test_harness.h"

using testing::AnyNumber;
using testing::UnorderedElementsAre;

namespace Envoy {
namespace Config {
namespace {

class DeltaSubscriptionImplTest : public DeltaSubscriptionTestHarness, public testing::Test {};

TEST_F(DeltaSubscriptionImplTest, ResourceGoneLeadsToBlankInitialVersion) {
// Envoy is interested in three resources: name1, name2, and name3.
startSubscription({"name1", "name2", "name3"});

// Ignore these for now, although at the very end there is one we will care about.
EXPECT_CALL(async_stream_, sendMessage(_, _)).Times(AnyNumber());

// Semi-hack: we don't want the requests to actually get sent, since it would clear out the
// request_ that we want to inspect. pause() does the trick!
subscription_->pause();

// The xDS server's first update includes items for name1 and 2, but not 3.
Protobuf::RepeatedPtrField<envoy::api::v2::Resource> add1_2;
auto* resource = add1_2.Add();
resource->set_name("name1");
resource->set_version("version1A");
resource = add1_2.Add();
resource->set_name("name2");
resource->set_version("version2A");
subscription_->onConfigUpdate(add1_2, {}, "debugversion1");
subscription_->handleStreamEstablished();
envoy::api::v2::DeltaDiscoveryRequest cur_request = subscription_->internalRequestStateForTest();
EXPECT_EQ("version1A", cur_request.initial_resource_versions().at("name1"));
EXPECT_EQ("version2A", cur_request.initial_resource_versions().at("name2"));
EXPECT_EQ(cur_request.initial_resource_versions().end(),
cur_request.initial_resource_versions().find("name3"));

// The next update updates 1, removes 2, and adds 3. The map should then have 1 and 3.
Protobuf::RepeatedPtrField<envoy::api::v2::Resource> add1_3;
resource = add1_3.Add();
resource->set_name("name1");
resource->set_version("version1B");
resource = add1_3.Add();
resource->set_name("name3");
resource->set_version("version3A");
Protobuf::RepeatedPtrField<std::string> remove2;
*remove2.Add() = "name2";
subscription_->onConfigUpdate(add1_3, remove2, "debugversion2");
subscription_->handleStreamEstablished();
cur_request = subscription_->internalRequestStateForTest();
EXPECT_EQ("version1B", cur_request.initial_resource_versions().at("name1"));
EXPECT_EQ(cur_request.initial_resource_versions().end(),
cur_request.initial_resource_versions().find("name2"));
EXPECT_EQ("version3A", cur_request.initial_resource_versions().at("name3"));

// The next update removes 1 and 3. The map we send the server should be empty...
Protobuf::RepeatedPtrField<std::string> remove1_3;
*remove1_3.Add() = "name1";
*remove1_3.Add() = "name3";
subscription_->onConfigUpdate({}, remove1_3, "debugversion3");
subscription_->handleStreamEstablished();
cur_request = subscription_->internalRequestStateForTest();
EXPECT_TRUE(cur_request.initial_resource_versions().empty());

// ...but our own map should remember our interest. In particular, losing interest in all 3 should
// cause their names to appear in the resource_names_unsubscribe field of a DeltaDiscoveryRequest.
subscription_->resume(); // now we do want the request to actually get sendMessage()'d.
EXPECT_CALL(async_stream_, sendMessage(_, _)).WillOnce([](const Protobuf::Message& msg, bool) {
auto sent_request = static_cast<const envoy::api::v2::DeltaDiscoveryRequest*>(&msg);
EXPECT_THAT(sent_request->resource_names_subscribe(), UnorderedElementsAre("name4"));
EXPECT_THAT(sent_request->resource_names_unsubscribe(),
UnorderedElementsAre("name1", "name2", "name3"));
});
subscription_->subscribe({"name4"}); // (implies "we no longer care about name1,2,3")
}

} // namespace
} // namespace Config
} // namespace Envoy