diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 5afdc83a2dd3d..deeceec73d7c8 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -24,8 +24,6 @@ struct ResourceNameDiff { std::vector removed_; }; -const char EmptyVersion[] = ""; - /** * Manages the logic of a (non-aggregated) delta xDS subscription. * TODO(fredlas) add aggregation support. @@ -53,8 +51,7 @@ class DeltaSubscriptionImpl } // Enqueues and attempts to send a discovery request, (un)subscribing to resources missing from / - // added to the passed 'resources' argument, relative to resources_. Updates resources_ to - // 'resources'. + // added to the passed 'resources' argument, relative to resource_versions_. void buildAndQueueDiscoveryRequest(const std::vector& resources) { ResourceNameDiff diff; std::set_difference(resources.begin(), resources.end(), resource_names_.begin(), @@ -63,12 +60,10 @@ class DeltaSubscriptionImpl resources.end(), std::inserter(diff.removed_, diff.removed_.begin())); for (const auto& added : diff.added_) { - resources_[added] = EmptyVersion; - resource_names_.insert(added); + setResourceWaitingForServer(added); } for (const auto& removed : diff.removed_) { - resources_.erase(removed); - resource_names_.erase(removed); + lostInterestInResource(removed); } queueDiscoveryRequest(diff); } @@ -118,13 +113,28 @@ class DeltaSubscriptionImpl } } + envoy::api::v2::DeltaDiscoveryRequest internalRequestStateForTest() const { return request_; } + // Config::SubscriptionCallbacks void onConfigUpdate(const Protobuf::RepeatedPtrField& added_resources, const Protobuf::RepeatedPtrField& removed_resources, const std::string& version_info) { callbacks_->onConfigUpdate(added_resources, removed_resources, version_info); for (const auto& resource : added_resources) { - resources_[resource.name()] = resource.version(); + setResourceVersion(resource.name(), resource.version()); + } + // If a resource is gone, there is no longer a meaningful version for it that makes sense to + // provide to the server upon stream reconnect: either it will continue to not exist, in which + // case saying nothing is fine, or the server will bring back something new, which we should + // receive regardless (which is the logic that not specifying a version will get you). + // + // So, leave the version map entry present but blank. It will be left out of + // initial_resource_versions messages, but will remind us to explicitly tell the server "I'm + // cancelling my subscription" when we lose interest. + for (const auto& resource_name : removed_resources) { + if (resource_names_.find(resource_name) != resource_names_.end()) { + setResourceWaitingForServer(resource_name); + } } stats_.update_success_.inc(); stats_.update_attempt_.inc(); @@ -161,8 +171,13 @@ class DeltaSubscriptionImpl clearRequestQueue(); request_.Clear(); - for (auto const& resource : resources_) { - (*request_.mutable_initial_resource_versions())[resource.first] = resource.second; + for (auto const& resource : resource_versions_) { + // Populate initial_resource_versions with the resource versions we currently have. Resources + // we are interested in, but are still waiting to get any version of from the server, do not + // belong in initial_resource_versions. + if (!resource.second.waitingForServer()) { + (*request_.mutable_initial_resource_versions())[resource.first] = resource.second.version(); + } } request_.set_type_url(type_url_); request_.mutable_node()->MergeFrom(local_info_.node()); @@ -210,11 +225,51 @@ class DeltaSubscriptionImpl init_fetch_timeout_timer_.reset(); } } - // A map from resource name to per-resource version. - std::unordered_map resources_; - // The keys of resources_. Only tracked separately because std::map does not provide an iterator - // into just its keys, e.g. for use in std::set_difference. + + class ResourceVersion { + public: + explicit ResourceVersion(absl::string_view version) : version_(version) {} + // Builds a ResourceVersion in the waitingForServer state. + ResourceVersion() {} + + // If true, we currently have no version of this resource - we are waiting for the server to + // provide us with one. + bool waitingForServer() const { return version_ == absl::nullopt; } + // Must not be called if waitingForServer() == true. + std::string version() const { + ASSERT(version_.has_value()); + return version_.value_or(""); + } + + private: + absl::optional version_; + }; + + // Use these helpers to avoid forgetting to update both at once. + void setResourceVersion(const std::string& resource_name, const std::string& resource_version) { + resource_versions_[resource_name] = ResourceVersion(resource_version); + resource_names_.insert(resource_name); + } + + void setResourceWaitingForServer(const std::string& resource_name) { + resource_versions_[resource_name] = ResourceVersion(); + resource_names_.insert(resource_name); + } + + void lostInterestInResource(const std::string& resource_name) { + resource_versions_.erase(resource_name); + resource_names_.erase(resource_name); + } + + // A map from resource name to per-resource version. The keys of this map are exactly the resource + // names we are currently interested in. Those in the waitingForServer state currently don't have + // any version for that resource: we need to inform the server if we lose interest in them, but we + // also need to *not* include them in the initial_resource_versions map upon a reconnect. + std::unordered_map resource_versions_; + // The keys of resource_versions_. Only tracked separately because std::map does not provide an + // iterator into just its keys, e.g. for use in std::set_difference. std::unordered_set resource_names_; + const std::string type_url_; SubscriptionCallbacks* callbacks_{}; // In-flight or previously sent request. @@ -224,7 +279,6 @@ class DeltaSubscriptionImpl absl::optional pending_; const LocalInfo::LocalInfo& local_info_; - SubscriptionStats stats_; Event::Dispatcher& dispatcher_; std::chrono::milliseconds init_fetch_timeout_; diff --git a/test/common/config/BUILD b/test/common/config/BUILD index f3f853f6856e1..7fb02b385b3ae 100644 --- a/test/common/config/BUILD +++ b/test/common/config/BUILD @@ -10,6 +10,23 @@ load( envoy_package() +envoy_cc_test( + name = "delta_subscription_impl_test", + srcs = ["delta_subscription_impl_test.cc"], + deps = [ + ":delta_subscription_test_harness", + "//source/common/config:delta_subscription_lib", + "//source/common/stats:isolated_store_lib", + "//test/mocks:common_lib", + "//test/mocks/config:config_mocks", + "//test/mocks/event:event_mocks", + "//test/mocks/grpc:grpc_mocks", + "//test/mocks/local_info:local_info_mocks", + "//test/mocks/runtime:runtime_mocks", + "//test/test_common:logging_lib", + ], +) + envoy_cc_test( name = "filesystem_subscription_impl_test", srcs = ["filesystem_subscription_impl_test.cc"], diff --git a/test/common/config/delta_subscription_impl_test.cc b/test/common/config/delta_subscription_impl_test.cc new file mode 100644 index 0000000000000..cbe980a526505 --- /dev/null +++ b/test/common/config/delta_subscription_impl_test.cc @@ -0,0 +1,80 @@ +#include "test/common/config/delta_subscription_test_harness.h" + +using testing::AnyNumber; +using testing::UnorderedElementsAre; + +namespace Envoy { +namespace Config { +namespace { + +class DeltaSubscriptionImplTest : public DeltaSubscriptionTestHarness, public testing::Test {}; + +TEST_F(DeltaSubscriptionImplTest, ResourceGoneLeadsToBlankInitialVersion) { + // Envoy is interested in three resources: name1, name2, and name3. + startSubscription({"name1", "name2", "name3"}); + + // Ignore these for now, although at the very end there is one we will care about. + EXPECT_CALL(async_stream_, sendMessage(_, _)).Times(AnyNumber()); + + // Semi-hack: we don't want the requests to actually get sent, since it would clear out the + // request_ that we want to inspect. pause() does the trick! + subscription_->pause(); + + // The xDS server's first update includes items for name1 and 2, but not 3. + Protobuf::RepeatedPtrField add1_2; + auto* resource = add1_2.Add(); + resource->set_name("name1"); + resource->set_version("version1A"); + resource = add1_2.Add(); + resource->set_name("name2"); + resource->set_version("version2A"); + subscription_->onConfigUpdate(add1_2, {}, "debugversion1"); + subscription_->handleStreamEstablished(); + envoy::api::v2::DeltaDiscoveryRequest cur_request = subscription_->internalRequestStateForTest(); + EXPECT_EQ("version1A", cur_request.initial_resource_versions().at("name1")); + EXPECT_EQ("version2A", cur_request.initial_resource_versions().at("name2")); + EXPECT_EQ(cur_request.initial_resource_versions().end(), + cur_request.initial_resource_versions().find("name3")); + + // The next update updates 1, removes 2, and adds 3. The map should then have 1 and 3. + Protobuf::RepeatedPtrField add1_3; + resource = add1_3.Add(); + resource->set_name("name1"); + resource->set_version("version1B"); + resource = add1_3.Add(); + resource->set_name("name3"); + resource->set_version("version3A"); + Protobuf::RepeatedPtrField remove2; + *remove2.Add() = "name2"; + subscription_->onConfigUpdate(add1_3, remove2, "debugversion2"); + subscription_->handleStreamEstablished(); + cur_request = subscription_->internalRequestStateForTest(); + EXPECT_EQ("version1B", cur_request.initial_resource_versions().at("name1")); + EXPECT_EQ(cur_request.initial_resource_versions().end(), + cur_request.initial_resource_versions().find("name2")); + EXPECT_EQ("version3A", cur_request.initial_resource_versions().at("name3")); + + // The next update removes 1 and 3. The map we send the server should be empty... + Protobuf::RepeatedPtrField remove1_3; + *remove1_3.Add() = "name1"; + *remove1_3.Add() = "name3"; + subscription_->onConfigUpdate({}, remove1_3, "debugversion3"); + subscription_->handleStreamEstablished(); + cur_request = subscription_->internalRequestStateForTest(); + EXPECT_TRUE(cur_request.initial_resource_versions().empty()); + + // ...but our own map should remember our interest. In particular, losing interest in all 3 should + // cause their names to appear in the resource_names_unsubscribe field of a DeltaDiscoveryRequest. + subscription_->resume(); // now we do want the request to actually get sendMessage()'d. + EXPECT_CALL(async_stream_, sendMessage(_, _)).WillOnce([](const Protobuf::Message& msg, bool) { + auto sent_request = static_cast(&msg); + EXPECT_THAT(sent_request->resource_names_subscribe(), UnorderedElementsAre("name4")); + EXPECT_THAT(sent_request->resource_names_unsubscribe(), + UnorderedElementsAre("name1", "name2", "name3")); + }); + subscription_->subscribe({"name4"}); // (implies "we no longer care about name1,2,3") +} + +} // namespace +} // namespace Config +} // namespace Envoy