From e9391d8980cc9401166e1b5c942059565210ca5b Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Wed, 10 Apr 2019 14:41:58 -0400 Subject: [PATCH 01/14] remove queue from delta xDS, fix sub/unsub logic, add tests Signed-off-by: Fred Douglas --- api/XDS_PROTOCOL.md | 5 +- include/envoy/config/subscription.h | 2 +- .../common/config/delta_subscription_impl.h | 192 +++++++++--------- .../config/filesystem_subscription_impl.h | 2 +- .../config/grpc_mux_subscription_impl.h | 2 +- source/common/config/grpc_subscription_impl.h | 2 +- source/common/config/http_subscription_impl.h | 2 +- .../config/delta_subscription_impl_test.cc | 74 ++++++- .../config/delta_subscription_test_harness.h | 19 +- test/mocks/config/mocks.h | 2 +- test/test_common/utility.h | 40 +++- 11 files changed, 220 insertions(+), 122 deletions(-) diff --git a/api/XDS_PROTOCOL.md b/api/XDS_PROTOCOL.md index 2401c15c9309f..84522c5407348 100644 --- a/api/XDS_PROTOCOL.md +++ b/api/XDS_PROTOCOL.md @@ -340,6 +340,9 @@ debugging purposes only. 3. Spontaneous `DeltaDiscoveryRequest` from the client. This can be done to dynamically add or remove elements from the tracked `resource_names` set. In this case `response_nonce` must be omitted. + The client may include resource names in the `resource_names_subscribe` + field that the server believes the client should already have. However, + the server must still provide those resources in the response. In this first example the client connects and receives a first update that it ACKs. The second update fails and the client NACKs the update. Later the xDS @@ -348,7 +351,7 @@ client spontaneously requests the "wc" resource. ![Incremental session example](diagrams/incremental.svg) On reconnect the Incremental xDS client may tell the server of its known -resources to avoid resending them over the network. +resources to avoid resending them over the network. Because no state is assumed to be preserved from the previous stream, the reconnecting client must provide the server with all resource names it is interested in. ![Incremental reconnect example](diagrams/incremental-reconnect.svg) diff --git a/include/envoy/config/subscription.h b/include/envoy/config/subscription.h index 2897e9798befc..d6efd22e27b9a 100644 --- a/include/envoy/config/subscription.h +++ b/include/envoy/config/subscription.h @@ -80,7 +80,7 @@ class Subscription { * Update the resources to fetch. * @param resources vector of resource names to fetch. */ - virtual void updateResources(const std::vector& resources) PURE; + virtual void updateResources(std::vector resource_names) PURE; }; /** diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 1e5f0e7cc0adb..5319f29f6c814 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -1,7 +1,5 @@ #pragma once -#include - #include "envoy/api/v2/discovery.pb.h" #include "envoy/common/token_bucket.h" #include "envoy/config/subscription.h" @@ -20,11 +18,6 @@ namespace Envoy { namespace Config { -struct ResourceNameDiff { - std::vector added_; - std::vector removed_; -}; - /** * Manages the logic of a (non-aggregated) delta xDS subscription. * TODO(fredlas) add aggregation support. The plan is for that to happen in XdsGrpcContext, @@ -48,51 +41,42 @@ class DeltaSubscriptionImpl : public Subscription, request_.mutable_node()->MergeFrom(local_info_.node()); } - // Enqueues and attempts to send a discovery request, (un)subscribing to resources missing from / - // added to the passed 'resources' argument, relative to resource_versions_. - void buildAndQueueDiscoveryRequest(const std::vector& resources) { - ResourceNameDiff diff; - std::set_difference(resources.begin(), resources.end(), resource_names_.begin(), - resource_names_.end(), std::inserter(diff.added_, diff.added_.begin())); - std::set_difference(resource_names_.begin(), resource_names_.end(), resources.begin(), - resources.end(), std::inserter(diff.removed_, diff.removed_.begin())); - - for (const auto& added : diff.added_) { - setResourceWaitingForServer(added); + void updateResources(std::vector resource_names) override { + std::vector cur_added; + std::vector cur_removed; + + // set_difference expects sorted collections. (resource_names_ is always sorted; it's std::set). + std::sort(resource_names.begin(), resource_names.end()); + + std::set_difference(resource_names.begin(), resource_names.end(), resource_names_.begin(), + resource_names_.end(), std::inserter(cur_added, cur_added.begin())); + std::set_difference(resource_names_.begin(), resource_names_.end(), resource_names.begin(), + resource_names.end(), std::inserter(cur_removed, cur_removed.begin())); + + for (auto a : cur_added) { + setResourceWaitingForServer(a); + // Removed->added requires us to keep track of it as a "new" addition, since our user may have + // forgotten its copy of the resource after instructing us to remove it, and so needs to be + // reminded of it. + names_removed_.erase(a); + names_added_.insert(a); } - for (const auto& removed : diff.removed_) { - lostInterestInResource(removed); + for (auto r : cur_removed) { + lostInterestInResource(r); + // Ideally, when a resource is added-then-removed in between requests, we would avoid putting + // a superfluous "unsubscribe [resource that was never subscribed]" in the request. However, + // the removed-then-added case *does* need to go in the request, and due to how we accomplish + // that, it's difficult to distinguish remove-add-remove from add-remove (because "remove-add" + // has to be treated as equivalent to just "add"). + names_added_.erase(r); + names_removed_.insert(r); } - queueDiscoveryRequest(diff); - } - void sendDiscoveryRequest(const ResourceNameDiff& diff) { - if (!grpc_stream_.grpcStreamAvailable()) { - ENVOY_LOG(debug, "No stream available to sendDiscoveryRequest for {}", type_url_); - return; // Drop this request; the reconnect will enqueue a new one. - } - if (paused_) { - ENVOY_LOG(trace, "API {} paused during sendDiscoveryRequest().", type_url_); - pending_ = diff; - return; // The unpause will send this request. + stats_.update_attempt_.inc(); + // Tell the server about our new interests (but only if there are any). + if (names_added_.size() > 0 || names_removed_.size() > 0) { + kickOffDiscoveryRequest(); } - - request_.clear_resource_names_subscribe(); - request_.clear_resource_names_unsubscribe(); - std::copy(diff.added_.begin(), diff.added_.end(), - Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_subscribe())); - std::copy(diff.removed_.begin(), diff.removed_.end(), - Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_unsubscribe())); - - ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url_, request_.DebugString()); - grpc_stream_.sendMessage(request_); - request_.clear_error_detail(); - request_.clear_initial_resource_versions(); - } - - void subscribe(const std::vector& resources) { - ENVOY_LOG(debug, "delta subscribe for " + type_url_); - buildAndQueueDiscoveryRequest(resources); } void pause() { @@ -105,20 +89,13 @@ class DeltaSubscriptionImpl : public Subscription, ENVOY_LOG(debug, "Resuming discovery requests for {}", type_url_); ASSERT(paused_); paused_ = false; - if (pending_.has_value()) { - queueDiscoveryRequest(pending_.value()); - pending_.reset(); - } + trySendDiscoveryRequestIfPending(); } envoy::api::v2::DeltaDiscoveryRequest internalRequestStateForTest() const { return request_; } // Config::GrpcStreamCallbacks void onStreamEstablished() override { - // initial_resource_versions "must be populated for first request in a stream", so guarantee - // that the initial version'd request we're about to enqueue is what gets sent. - clearRequestQueue(); - request_.Clear(); for (auto const& resource : resource_versions_) { // Populate initial_resource_versions with the resource versions we currently have. Resources @@ -130,13 +107,16 @@ class DeltaSubscriptionImpl : public Subscription, } request_.set_type_url(type_url_); request_.mutable_node()->MergeFrom(local_info_.node()); - queueDiscoveryRequest(ResourceNameDiff()); // no change to subscribed resources + kickOffDiscoveryRequest(); } void onEstablishmentFailure() override { disableInitFetchTimeoutTimer(); stats_.update_failure_.inc(); ENVOY_LOG(debug, "delta update for {} failed", type_url_); + // TODO(fredlas) this increment is needed to pass existing tests, but it seems wrong. We already + // increment it when updating subscription interest, which attempts a request. Is this supposed + // to be the sum of client- and server- initiated update attempts? Seems weird. stats_.update_attempt_.inc(); callbacks_->onConfigUpdateFailed(nullptr); } @@ -155,16 +135,19 @@ class DeltaSubscriptionImpl : public Subscription, } catch (const EnvoyException& e) { stats_.update_rejected_.inc(); ENVOY_LOG(warn, "delta config for {} rejected: {}", type_url_, e.what()); + // TODO(fredlas) this increment is needed to pass existing tests, but it seems wrong. We + // already increment it when updating subscription interest, which attempts a request. Is this + // supposed to be the sum of client- and server- initiated update attempts? Seems weird. stats_.update_attempt_.inc(); callbacks_->onConfigUpdateFailed(&e); ::google::rpc::Status* error_detail = request_.mutable_error_detail(); error_detail->set_code(Grpc::Status::GrpcStatus::Internal); error_detail->set_message(e.what()); } - queueDiscoveryRequest(ResourceNameDiff()); // no change to subscribed resources + kickOffDiscoveryRequest(); } - void onWriteable() override { drainRequests(); } + void onWriteable() override { trySendDiscoveryRequestIfPending(); } // Config::Subscription void start(const std::vector& resources, SubscriptionCallbacks& callbacks) override { @@ -179,19 +162,26 @@ class DeltaSubscriptionImpl : public Subscription, } grpc_stream_.establishNewStream(); - subscribe(resources); - // The attempt stat here is maintained for the purposes of having consistency between ADS and - // individual DeltaSubscriptions. Since ADS is push based and muxed, the notion of an - // "attempt" for a given xDS API combined by ADS is not really that meaningful. - stats_.update_attempt_.inc(); + updateResources(resources); } - void updateResources(const std::vector& resources) override { - subscribe(resources); - stats_.update_attempt_.inc(); +private: + void sendDiscoveryRequest() { + request_.clear_resource_names_subscribe(); + request_.clear_resource_names_unsubscribe(); + std::copy(names_added_.begin(), names_added_.end(), + Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_subscribe())); + std::copy(names_removed_.begin(), names_removed_.end(), + Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_unsubscribe())); + names_added_.clear(); + names_removed_.clear(); + + ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url_, request_.DebugString()); + grpc_stream_.sendMessage(request_); + request_.clear_error_detail(); + request_.clear_initial_resource_versions(); } -private: void handleConfigUpdate(const Protobuf::RepeatedPtrField& added_resources, const Protobuf::RepeatedPtrField& removed_resources, @@ -214,6 +204,9 @@ class DeltaSubscriptionImpl : public Subscription, } } stats_.update_success_.inc(); + // TODO(fredlas) this increment is needed to pass existing tests, but it seems wrong. We already + // increment it when updating subscription interest, which attempts a request. Is this supposed + // to be the sum of client- and server- initiated update attempts? Seems weird. stats_.update_attempt_.inc(); stats_.version_.set(HashUtil::xxHash64(version_info)); ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", type_url_, @@ -227,14 +220,34 @@ class DeltaSubscriptionImpl : public Subscription, } } - void drainRequests() { - ENVOY_LOG(trace, "draining discovery requests {}", request_queue_.size()); - while (!request_queue_.empty() && grpc_stream_.checkRateLimitAllowsDrain()) { - // Process the request, if rate limiting is not enabled at all or if it is under rate limit. - sendDiscoveryRequest(request_queue_.front()); - request_queue_.pop(); + void kickOffDiscoveryRequest() { + pending_ = true; + trySendDiscoveryRequestIfPending(); + } + + void trySendDiscoveryRequestIfPending() { + if (!pending_) { + return; + } + bool should_send = true; + if (paused_) { + ENVOY_LOG(trace, "API {} paused; discovery request on hold for now.", type_url_); + should_send = false; } - grpc_stream_.maybeUpdateQueueSizeStat(request_queue_.size()); + if (!grpc_stream_.grpcStreamAvailable()) { + ENVOY_LOG(trace, "No stream available to send a DiscoveryRequest for {}.", type_url_); + should_send = false; + } + if (!grpc_stream_.checkRateLimitAllowsDrain()) { + ENVOY_LOG(trace, "{} DiscoveryRequest hit rate limit; will try later.", type_url_); + should_send = false; + } + + if (should_send) { + sendDiscoveryRequest(); + pending_ = false; + } + grpc_stream_.maybeUpdateQueueSizeStat(pending_ ? 1 : 0); } class ResourceVersion { @@ -272,23 +285,6 @@ class DeltaSubscriptionImpl : public Subscription, resource_names_.erase(resource_name); } - void queueDiscoveryRequest(const ResourceNameDiff& queue_item) { - request_queue_.push(queue_item); - drainRequests(); - } - - void clearRequestQueue() { - grpc_stream_.maybeUpdateQueueSizeStat(0); - // TODO(fredlas) when we have C++17: request_queue_ = {}; - while (!request_queue_.empty()) { - request_queue_.pop(); - } - } - - // A queue to store requests while rate limited. Note that when requests cannot be sent due to the - // gRPC stream being down, this queue does not store them; rather, they are simply dropped. - std::queue request_queue_; - GrpcStream grpc_stream_; @@ -299,15 +295,21 @@ class DeltaSubscriptionImpl : public Subscription, std::unordered_map resource_versions_; // The keys of resource_versions_. Only tracked separately because std::map does not provide an // iterator into just its keys, e.g. for use in std::set_difference. - std::unordered_set resource_names_; + // Must be stored sorted to work with std::set_difference. + std::set resource_names_; const std::string type_url_; SubscriptionCallbacks* callbacks_{}; // In-flight or previously sent request. envoy::api::v2::DeltaDiscoveryRequest request_; - // Paused via pause()? bool paused_{}; - absl::optional pending_; + bool pending_{}; + + // Tracking of the delta in our subscription interest since the previous DeltaDiscoveryRequest was + // sent. Can't use unordered_set due to ordering issues in gTest expectation matching. Feel free + // to change if you can figure out how to make it work. + std::set names_added_; + std::set names_removed_; const LocalInfo::LocalInfo& local_info_; SubscriptionStats stats_; diff --git a/source/common/config/filesystem_subscription_impl.h b/source/common/config/filesystem_subscription_impl.h index 5a9344d994876..b09974623b94c 100644 --- a/source/common/config/filesystem_subscription_impl.h +++ b/source/common/config/filesystem_subscription_impl.h @@ -45,7 +45,7 @@ class FilesystemSubscriptionImpl : public Config::Subscription, refresh(); } - void updateResources(const std::vector& resources) override { + void updateResources(std::vector resources) override { // We report all discovered resources in the watched file. UNREFERENCED_PARAMETER(resources); // Bump stats for consistence behavior with other xDS. diff --git a/source/common/config/grpc_mux_subscription_impl.h b/source/common/config/grpc_mux_subscription_impl.h index 5526651ae7f87..d4a4197025243 100644 --- a/source/common/config/grpc_mux_subscription_impl.h +++ b/source/common/config/grpc_mux_subscription_impl.h @@ -45,7 +45,7 @@ class GrpcMuxSubscriptionImpl : public Subscription, stats_.update_attempt_.inc(); } - void updateResources(const std::vector& resources) override { + void updateResources(std::vector resources) override { watch_ = grpc_mux_.subscribe(type_url_, resources, *this); stats_.update_attempt_.inc(); } diff --git a/source/common/config/grpc_subscription_impl.h b/source/common/config/grpc_subscription_impl.h index 04b1b2aa6981f..c3ca0b9d0c2b9 100644 --- a/source/common/config/grpc_subscription_impl.h +++ b/source/common/config/grpc_subscription_impl.h @@ -32,7 +32,7 @@ class GrpcSubscriptionImpl : public Config::Subscription { grpc_mux_.start(); } - void updateResources(const std::vector& resources) override { + void updateResources(std::vector resources) override { grpc_mux_subscription_.updateResources(resources); } diff --git a/source/common/config/http_subscription_impl.h b/source/common/config/http_subscription_impl.h index 0ae9e6a2287e0..75954aedbd652 100644 --- a/source/common/config/http_subscription_impl.h +++ b/source/common/config/http_subscription_impl.h @@ -67,7 +67,7 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, initialize(); } - void updateResources(const std::vector& resources) override { + void updateResources(std::vector resources) override { Protobuf::RepeatedPtrField resources_vector(resources.begin(), resources.end()); request_.mutable_resource_names()->Swap(&resources_vector); diff --git a/test/common/config/delta_subscription_impl_test.cc b/test/common/config/delta_subscription_impl_test.cc index 98ea598c25794..7925f1b9cc1c4 100644 --- a/test/common/config/delta_subscription_impl_test.cc +++ b/test/common/config/delta_subscription_impl_test.cc @@ -77,14 +77,72 @@ TEST_F(DeltaSubscriptionImplTest, ResourceGoneLeadsToBlankInitialVersion) { // ...but our own map should remember our interest. In particular, losing interest in all 3 should // cause their names to appear in the resource_names_unsubscribe field of a DeltaDiscoveryRequest. - subscription_->resume(); // now we do want the request to actually get sendMessage()'d. - EXPECT_CALL(async_stream_, sendMessage(_, _)).WillOnce([](const Protobuf::Message& msg, bool) { - auto sent_request = static_cast(&msg); - EXPECT_THAT(sent_request->resource_names_subscribe(), UnorderedElementsAre("name4")); - EXPECT_THAT(sent_request->resource_names_unsubscribe(), - UnorderedElementsAre("name1", "name2", "name3")); - }); - subscription_->subscribe({"name4"}); // (implies "we no longer care about name1,2,3") + subscription_->resume(); // we do want the final subscribe() to do a sendMessage(). + expectSendMessage({"name4"}, {"name1", "name2", "name3"}, Grpc::Status::GrpcStatus::Ok, ""); + subscription_->updateResources({"name4"}); // (implies "we no longer care about name1,2,3") +} + +// If Envoy decided it wasn't interested in a resource and then (before a request was sent) decided +// it was again, for all we know, it dropped that resource in between and needs to retrieve it +// again. So, we *should* send a request "re-"subscribing. This means that the server needs to +// interpret the resource_names_subscribe field as "send these resources even if you think Envoy +// already has them". +TEST_F(DeltaSubscriptionImplTest, RemoveThenAdd) { + startSubscription({"name1", "name2", "name3"}); + subscription_->pause(); // Pause because we're testing multiple updates in between request sends. + subscription_->updateResources({"name1", "name2"}); + subscription_->updateResources({"name1", "name2", "name3"}); + expectSendMessage({"name3"}, {}, Grpc::Status::GrpcStatus::Ok, ""); + subscription_->resume(); +} + +// Due to the need for the behavior tested by LoseThenGainSent, gain-then-losing interest in +// resource X before the DeltaDiscoveryRequest is sent causes that request to "unsubscribe" from X. +// Ideally we would have the request simply not include no mention of X. Oh well. This test is just +// here to illustrate that this behavior exists, not to enforce that it should be like this. What +// *is* importat: the server must happily and cleanly ignore "unsubscribe from [resource name I have +// never before referred to]" requests. +TEST_F(DeltaSubscriptionImplTest, AddThenRemove) { + startSubscription({"name1", "name2", "name3"}); + subscription_->pause(); // Pause because we're testing multiple updates in between request sends. + subscription_->updateResources({"name1", "name2", "name3", "name4"}); + subscription_->updateResources({"name1", "name2", "name3"}); + expectSendMessage({}, {"name4"}, Grpc::Status::GrpcStatus::Ok, ""); + subscription_->resume(); +} + +// add/remove/add == add. +TEST_F(DeltaSubscriptionImplTest, AddRemoveAdd) { + startSubscription({"name1", "name2", "name3"}); + subscription_->pause(); + subscription_->updateResources({"name1", "name2", "name3", "name4"}); + subscription_->updateResources({"name1", "name2", "name3"}); + subscription_->updateResources({"name1", "name2", "name3", "name4"}); + expectSendMessage({"name4"}, {}, Grpc::Status::GrpcStatus::Ok, ""); + subscription_->resume(); +} + +// remove/add/remove == remove. +TEST_F(DeltaSubscriptionImplTest, RemoveAddRemove) { + startSubscription({"name1", "name2", "name3"}); + subscription_->pause(); + subscription_->updateResources({"name1", "name2"}); + subscription_->updateResources({"name1", "name2", "name3"}); + subscription_->updateResources({"name1", "name2"}); + expectSendMessage({}, {"name3"}, Grpc::Status::GrpcStatus::Ok, ""); + subscription_->resume(); +} + +// Starts with 1,2,3. 4 is added/removed/added. In those same updates, 1,2,3 are +// removed/added/removed. End result should be 4 added and 1,2,3 removed. +TEST_F(DeltaSubscriptionImplTest, BothAddAndRemove) { + startSubscription({"name1", "name2", "name3"}); + subscription_->pause(); + subscription_->updateResources({"name4"}); + subscription_->updateResources({"name1", "name2", "name3"}); + subscription_->updateResources({"name4"}); + expectSendMessage({"name4"}, {"name1", "name2", "name3"}, Grpc::Status::GrpcStatus::Ok, ""); + subscription_->resume(); } } // namespace diff --git a/test/common/config/delta_subscription_test_harness.h b/test/common/config/delta_subscription_test_harness.h index 5764b619de08c..a8b0d6c9e269b 100644 --- a/test/common/config/delta_subscription_test_harness.h +++ b/test/common/config/delta_subscription_test_harness.h @@ -60,8 +60,8 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { for (const auto& resource : subscribe) { expected_request.add_resource_names_subscribe(resource); } - for (auto resource = unsubscribe.rbegin(); resource != unsubscribe.rend(); ++resource) { - expected_request.add_resource_names_unsubscribe(*resource); + for (const auto& resource : unsubscribe) { + expected_request.add_resource_names_unsubscribe(resource); } expected_request.set_response_nonce(last_response_nonce_); expected_request.set_type_url(Config::TypeUrl::get().ClusterLoadAssignment); @@ -71,7 +71,6 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { error_detail->set_code(error_code); error_detail->set_message(error_message); } - std::cerr << "EXPECTING DiscoveryRequest: " << expected_request.DebugString() << std::endl; EXPECT_CALL(async_stream_, sendMessage(ProtoEq(expected_request), false)); } @@ -109,10 +108,16 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { } void updateResources(const std::vector& cluster_names) override { - std::vector cluster_superset = cluster_names; - cluster_superset.insert(cluster_superset.end(), last_cluster_names_.begin(), - last_cluster_names_.end()); - expectSendMessage(cluster_names, last_cluster_names_, Grpc::Status::GrpcStatus::Ok, ""); + std::vector sub; + std::vector unsub; + + std::set_difference(cluster_names.begin(), cluster_names.end(), last_cluster_names_.begin(), + last_cluster_names_.end(), std::inserter(sub, sub.begin())); + std::set_difference(last_cluster_names_.begin(), last_cluster_names_.end(), + cluster_names.begin(), cluster_names.end(), + std::inserter(unsub, unsub.begin())); + + expectSendMessage(sub, unsub, Grpc::Status::GrpcStatus::Ok, ""); subscription_->updateResources(cluster_names); last_cluster_names_ = cluster_names; } diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index 9cc82f2380834..de51d6d10c010 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -42,7 +42,7 @@ class MockSubscription : public Subscription { public: MOCK_METHOD2_T(start, void(const std::vector& resources, SubscriptionCallbacks& callbacks)); - MOCK_METHOD1_T(updateResources, void(const std::vector& resources)); + MOCK_METHOD1_T(updateResources, void(std::vector resources)); }; class MockGrpcMuxWatch : public GrpcMuxWatch { diff --git a/test/test_common/utility.h b/test/test_common/utility.h index 2c1464bf05505..2ed49861ffd79 100644 --- a/test/test_common/utility.h +++ b/test/test_common/utility.h @@ -527,14 +527,44 @@ ApiPtr createApiForTest(Event::TimeSystem& time_system); ApiPtr createApiForTest(Stats::Store& stat_store, Event::TimeSystem& time_system); } // namespace Api -MATCHER_P(HeaderMapEqualIgnoreOrder, rhs, "") { - *result_listener << *rhs << " is not equal to " << *arg; - return TestUtility::headerMapEqualIgnoreOrder(*arg, *rhs); +MATCHER_P(HeaderMapEqualIgnoreOrder, expected, "") { + const bool equal = TestUtility::headerMapEqualIgnoreOrder(*arg, *expected); + if (!equal) { + *result_listener << "\n" + << "========================Expected header map:========================\n" + << *expected + << "-----------------is not equal to actual header map:-----------------\n" + << *arg + << "====================================================================\n"; + } + return equal; } -MATCHER_P(ProtoEq, rhs, "") { return TestUtility::protoEqual(arg, rhs); } +MATCHER_P(ProtoEq, expected, "") { + const bool equal = TestUtility::protoEqual(arg, expected); + if (!equal) { + *result_listener << "\n" + << "==========================Expected proto:===========================\n" + << expected.DebugString() + << "------------------is not equal to actual proto:---------------------\n" + << arg.DebugString() + << "====================================================================\n"; + } + return equal; +} -MATCHER_P(RepeatedProtoEq, rhs, "") { return TestUtility::repeatedPtrFieldEqual(arg, rhs); } +MATCHER_P(RepeatedProtoEq, expected, "") { + const bool equal = TestUtility::repeatedPtrFieldEqual(arg, expected); + if (!equal) { + *result_listener << "\n" + << "=======================Expected repeated:===========================\n" + << RepeatedPtrUtil::debugString(expected) << "\n" + << "-----------------is not equal to actual repeated:-------------------\n" + << RepeatedPtrUtil::debugString(arg) << "\n" + << "====================================================================\n"; + } + return equal; +} MATCHER_P(Percent, rhs, "") { envoy::type::FractionalPercent expected; From 13d224d02934bc4223eeba0f42f0486a955f200d Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Wed, 10 Apr 2019 14:59:37 -0400 Subject: [PATCH 02/14] more explicit specification in XDS_PROTOCOL.md Signed-off-by: Fred Douglas --- api/XDS_PROTOCOL.md | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/api/XDS_PROTOCOL.md b/api/XDS_PROTOCOL.md index 84522c5407348..00b8a3adb69a5 100644 --- a/api/XDS_PROTOCOL.md +++ b/api/XDS_PROTOCOL.md @@ -340,9 +340,6 @@ debugging purposes only. 3. Spontaneous `DeltaDiscoveryRequest` from the client. This can be done to dynamically add or remove elements from the tracked `resource_names` set. In this case `response_nonce` must be omitted. - The client may include resource names in the `resource_names_subscribe` - field that the server believes the client should already have. However, - the server must still provide those resources in the response. In this first example the client connects and receives a first update that it ACKs. The second update fails and the client NACKs the update. Later the xDS @@ -361,17 +358,24 @@ identified by the alias field in the resource of a `DeltaDiscoveryResponse`. The be returned in the name field in the resource of a `DeltaDiscoveryResponse`. #### Subscribing to Resources -Envoy can send either an alias or the name of a resource in the `resource_names_subscribe` field of -a `DeltaDiscoveryRequest` in order to subscribe to a resource. Envoy should check both the names and -aliases of resources in order to determine whether the entity in question has been subscribed to. +The client can send either an alias or the name of a resource in the `resource_names_subscribe` +field of a `DeltaDiscoveryRequest` in order to subscribe to a resource. Both the names and aliases +of resources should be checked in order to determine whether the entity in question has been +subscribed to. + +A `resource_names_subscribe` field may contain resource names that the server believes the client +is already subscribed to, and furthermore has the most recent versions of. However, the server +*must* still provide those resources in the response; due to implementation details hidden from +the server, the client may have "forgotten" those resources despite apparently remaining subscribed. #### Unsubscribing from Resources -Envoy will keep track of a per resource reference count internally. This count will keep track of the -total number of aliases/resource names that are currently subscribed to. When the reference count -reaches zero, Envoy will send a `DeltaDiscoveryRequest` containing the resource name of the resource -to unsubscribe from in the `resource_names_unsubscribe` field. When Envoy unsubscribes from a resource, -it should check for both the resource name and all aliases and appropriately update all resources -that reference either. +When a client loses interest in some resources, it will indicate that with the +`resource_names_unsubscribe` field of a `DeltaDiscoveryRequest`. As with `resource_names_subscribe`, +these may be resource names or aliases. + +A `resource_names_unsubscribe` field may contain superfluous resource names, which the server +thought the client was already not subscribed to. The server must cleanly process such a request; +it can simply ignore these phantom unsubscriptions. ## REST-JSON polling subscriptions From d6afbcb2cd0cbf27b016bf091ef0c208c10ec734 Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Wed, 10 Apr 2019 15:35:36 -0400 Subject: [PATCH 03/14] another unit test Signed-off-by: Fred Douglas --- test/common/config/delta_subscription_impl_test.cc | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/test/common/config/delta_subscription_impl_test.cc b/test/common/config/delta_subscription_impl_test.cc index 7925f1b9cc1c4..fa05e4072fcf8 100644 --- a/test/common/config/delta_subscription_impl_test.cc +++ b/test/common/config/delta_subscription_impl_test.cc @@ -145,6 +145,19 @@ TEST_F(DeltaSubscriptionImplTest, BothAddAndRemove) { subscription_->resume(); } +// If one update fails to send a request (pausing, rate limit, no stream are all identical for this +// purpose), and then another update comes along and also fails, when the request is finally sent, +// both should be present. (A previous version of the code would have had 1->12 generate a diff of +// 2, then 12->123 generate a diff of 3, which would replace 2). +TEST_F(DeltaSubscriptionImplTest, CumulativeUpdates) { + startSubscription({"name1"}); + subscription_->pause(); + subscription_->updateResources({"name1", "name2"}); + subscription_->updateResources({"name1", "name2", "name3"}); + expectSendMessage({"name2", "name3"}, {}, Grpc::Status::GrpcStatus::Ok, ""); + subscription_->resume(); +} + } // namespace } // namespace Config } // namespace Envoy From c1f533f6f62ee9eb92b7a333510a7c3dbc86ef73 Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Wed, 10 Apr 2019 15:43:35 -0400 Subject: [PATCH 04/14] add else. might matter if checkRateLimitAllowsDrain has an effect. Signed-off-by: Fred Douglas --- source/common/config/delta_subscription_impl.h | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 5319f29f6c814..2ffb257ebe20a 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -233,12 +233,10 @@ class DeltaSubscriptionImpl : public Subscription, if (paused_) { ENVOY_LOG(trace, "API {} paused; discovery request on hold for now.", type_url_); should_send = false; - } - if (!grpc_stream_.grpcStreamAvailable()) { + } else if (!grpc_stream_.grpcStreamAvailable()) { ENVOY_LOG(trace, "No stream available to send a DiscoveryRequest for {}.", type_url_); should_send = false; - } - if (!grpc_stream_.checkRateLimitAllowsDrain()) { + } else if (!grpc_stream_.checkRateLimitAllowsDrain()) { ENVOY_LOG(trace, "{} DiscoveryRequest hit rate limit; will try later.", type_url_); should_send = false; } From 8a82b87f75bd4c01f54783be352f0106dbd4c00f Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Wed, 10 Apr 2019 15:47:51 -0400 Subject: [PATCH 05/14] double negative Signed-off-by: Fred Douglas --- test/common/config/delta_subscription_impl_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/common/config/delta_subscription_impl_test.cc b/test/common/config/delta_subscription_impl_test.cc index fa05e4072fcf8..20bd366d7e65a 100644 --- a/test/common/config/delta_subscription_impl_test.cc +++ b/test/common/config/delta_subscription_impl_test.cc @@ -98,7 +98,7 @@ TEST_F(DeltaSubscriptionImplTest, RemoveThenAdd) { // Due to the need for the behavior tested by LoseThenGainSent, gain-then-losing interest in // resource X before the DeltaDiscoveryRequest is sent causes that request to "unsubscribe" from X. -// Ideally we would have the request simply not include no mention of X. Oh well. This test is just +// Ideally we would have the request simply not include any mention of X. Oh well. This test is just // here to illustrate that this behavior exists, not to enforce that it should be like this. What // *is* importat: the server must happily and cleanly ignore "unsubscribe from [resource name I have // never before referred to]" requests. From 278344ce3acd9d0d35a7e3e3b24e86e9619a20c0 Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Wed, 10 Apr 2019 15:53:18 -0400 Subject: [PATCH 06/14] typo Signed-off-by: Fred Douglas --- test/common/config/delta_subscription_impl_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/common/config/delta_subscription_impl_test.cc b/test/common/config/delta_subscription_impl_test.cc index 20bd366d7e65a..490e631948dfe 100644 --- a/test/common/config/delta_subscription_impl_test.cc +++ b/test/common/config/delta_subscription_impl_test.cc @@ -100,8 +100,8 @@ TEST_F(DeltaSubscriptionImplTest, RemoveThenAdd) { // resource X before the DeltaDiscoveryRequest is sent causes that request to "unsubscribe" from X. // Ideally we would have the request simply not include any mention of X. Oh well. This test is just // here to illustrate that this behavior exists, not to enforce that it should be like this. What -// *is* importat: the server must happily and cleanly ignore "unsubscribe from [resource name I have -// never before referred to]" requests. +// *is* important: the server must happily and cleanly ignore "unsubscribe from [resource name I +// have never before referred to]" requests. TEST_F(DeltaSubscriptionImplTest, AddThenRemove) { startSubscription({"name1", "name2", "name3"}); subscription_->pause(); // Pause because we're testing multiple updates in between request sends. From e2c13707c2aa4396400d78e0f9935009736d5d87 Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Thu, 11 Apr 2019 10:28:07 -0400 Subject: [PATCH 07/14] clang tidy, clarify test comments Signed-off-by: Fred Douglas --- .../common/config/delta_subscription_impl.h | 31 +++++++++++-------- .../config/delta_subscription_impl_test.cc | 14 +++++---- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 2ffb257ebe20a..2adcfb9e520fc 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -53,7 +53,7 @@ class DeltaSubscriptionImpl : public Subscription, std::set_difference(resource_names_.begin(), resource_names_.end(), resource_names.begin(), resource_names.end(), std::inserter(cur_removed, cur_removed.begin())); - for (auto a : cur_added) { + for (const auto& a : cur_added) { setResourceWaitingForServer(a); // Removed->added requires us to keep track of it as a "new" addition, since our user may have // forgotten its copy of the resource after instructing us to remove it, and so needs to be @@ -61,7 +61,7 @@ class DeltaSubscriptionImpl : public Subscription, names_removed_.erase(a); names_added_.insert(a); } - for (auto r : cur_removed) { + for (const auto& r : cur_removed) { lostInterestInResource(r); // Ideally, when a resource is added-then-removed in between requests, we would avoid putting // a superfluous "unsubscribe [resource that was never subscribed]" in the request. However, @@ -74,7 +74,7 @@ class DeltaSubscriptionImpl : public Subscription, stats_.update_attempt_.inc(); // Tell the server about our new interests (but only if there are any). - if (names_added_.size() > 0 || names_removed_.size() > 0) { + if (!names_added_.empty() || !names_removed_.empty()) { kickOffDiscoveryRequest(); } } @@ -100,11 +100,14 @@ class DeltaSubscriptionImpl : public Subscription, for (auto const& resource : resource_versions_) { // Populate initial_resource_versions with the resource versions we currently have. Resources // we are interested in, but are still waiting to get any version of from the server, do not - // belong in initial_resource_versions. + // belong in initial_resource_versions. (But do belong in new subscriptions!) if (!resource.second.waitingForServer()) { (*request_.mutable_initial_resource_versions())[resource.first] = resource.second.version(); } + // As mentioned above, fill resource_names_subscribe with everything. + names_added_.insert(resource.first); } + names_removed_.clear(); request_.set_type_url(type_url_); request_.mutable_node()->MergeFrom(local_info_.node()); kickOffDiscoveryRequest(); @@ -225,23 +228,25 @@ class DeltaSubscriptionImpl : public Subscription, trySendDiscoveryRequestIfPending(); } - void trySendDiscoveryRequestIfPending() { - if (!pending_) { - return; - } - bool should_send = true; + bool shouldSendDiscoveryRequest() { if (paused_) { ENVOY_LOG(trace, "API {} paused; discovery request on hold for now.", type_url_); - should_send = false; + return false; } else if (!grpc_stream_.grpcStreamAvailable()) { ENVOY_LOG(trace, "No stream available to send a DiscoveryRequest for {}.", type_url_); - should_send = false; + return false; } else if (!grpc_stream_.checkRateLimitAllowsDrain()) { ENVOY_LOG(trace, "{} DiscoveryRequest hit rate limit; will try later.", type_url_); - should_send = false; + return false; } + return true; + } - if (should_send) { + void trySendDiscoveryRequestIfPending() { + if (!pending_) { + return; + } + if (shouldSendDiscoveryRequest()) { sendDiscoveryRequest(); pending_ = false; } diff --git a/test/common/config/delta_subscription_impl_test.cc b/test/common/config/delta_subscription_impl_test.cc index 490e631948dfe..b28c9552ed83d 100644 --- a/test/common/config/delta_subscription_impl_test.cc +++ b/test/common/config/delta_subscription_impl_test.cc @@ -96,12 +96,14 @@ TEST_F(DeltaSubscriptionImplTest, RemoveThenAdd) { subscription_->resume(); } -// Due to the need for the behavior tested by LoseThenGainSent, gain-then-losing interest in -// resource X before the DeltaDiscoveryRequest is sent causes that request to "unsubscribe" from X. -// Ideally we would have the request simply not include any mention of X. Oh well. This test is just -// here to illustrate that this behavior exists, not to enforce that it should be like this. What -// *is* important: the server must happily and cleanly ignore "unsubscribe from [resource name I -// have never before referred to]" requests. +// Due to how our implementation provides the required behavior tested in RemoveThenAdd, the +// add-then-remove case *also* causes the resource to be referred to in the request (as an +// unsubscribe). +// Unlike the remove-then-add case, this one really is unnecessary, and ideally we would have +// the request simply not include any mention of the resource. Oh well. +// This test is just here to illustrate that this behavior exists, not to enforce that it +// should be like this. What *is* important: the server must happily and cleanly ignore +// "unsubscribe from [resource name I have never before referred to]" requests. TEST_F(DeltaSubscriptionImplTest, AddThenRemove) { startSubscription({"name1", "name2", "name3"}); subscription_->pause(); // Pause because we're testing multiple updates in between request sends. From d2d9dc6256cf47364326c5a99c1754ef99621952 Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Thu, 11 Apr 2019 13:21:52 -0400 Subject: [PATCH 08/14] better name for resource_names Signed-off-by: Fred Douglas --- source/common/config/delta_subscription_impl.h | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 2adcfb9e520fc..0acc043be0a96 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -41,17 +41,19 @@ class DeltaSubscriptionImpl : public Subscription, request_.mutable_node()->MergeFrom(local_info_.node()); } - void updateResources(std::vector resource_names) override { + void updateResources(std::vector update_to_these_names) override { std::vector cur_added; std::vector cur_removed; // set_difference expects sorted collections. (resource_names_ is always sorted; it's std::set). - std::sort(resource_names.begin(), resource_names.end()); - - std::set_difference(resource_names.begin(), resource_names.end(), resource_names_.begin(), - resource_names_.end(), std::inserter(cur_added, cur_added.begin())); - std::set_difference(resource_names_.begin(), resource_names_.end(), resource_names.begin(), - resource_names.end(), std::inserter(cur_removed, cur_removed.begin())); + std::sort(update_to_these_names.begin(), update_to_these_names.end()); + + std::set_difference(update_to_these_names.begin(), update_to_these_names.end(), + resource_names_.begin(), resource_names_.end(), + std::inserter(cur_added, cur_added.begin())); + std::set_difference(resource_names_.begin(), resource_names_.end(), + update_to_these_names.begin(), update_to_these_names.end(), + std::inserter(cur_removed, cur_removed.begin())); for (const auto& a : cur_added) { setResourceWaitingForServer(a); From 823123b88ca99850fea67096829e5a06f1430789 Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Fri, 12 Apr 2019 09:04:48 -0400 Subject: [PATCH 09/14] std set everywhere, fix watch issue Signed-off-by: Fred Douglas --- include/envoy/config/grpc_mux.h | 4 +-- include/envoy/config/subscription.h | 10 +++---- source/common/common/utility.cc | 9 ++++++ source/common/common/utility.h | 8 +++++ .../common/config/delta_subscription_impl.h | 7 ++--- .../config/filesystem_subscription_impl.h | 4 +-- source/common/config/grpc_mux_impl.cc | 2 +- source/common/config/grpc_mux_impl.h | 8 ++--- .../config/grpc_mux_subscription_impl.h | 10 +++++-- source/common/config/grpc_subscription_impl.h | 6 ++-- source/common/config/http_subscription_impl.h | 8 ++--- .../config/delta_subscription_test_harness.h | 30 +++++++++---------- .../filesystem_subscription_test_harness.h | 6 ++-- .../config/grpc_subscription_test_harness.h | 26 ++++++++++------ .../config/http_subscription_test_harness.h | 8 ++--- test/common/config/subscription_impl_test.cc | 7 ++--- .../common/config/subscription_test_harness.h | 6 ++-- test/mocks/config/mocks.cc | 2 +- test/mocks/config/mocks.h | 8 ++--- 19 files changed, 97 insertions(+), 72 deletions(-) diff --git a/include/envoy/config/grpc_mux.h b/include/envoy/config/grpc_mux.h index 6872a62d875b6..fb66a78abdcbb 100644 --- a/include/envoy/config/grpc_mux.h +++ b/include/envoy/config/grpc_mux.h @@ -82,7 +82,7 @@ class GrpcMux { * Start a configuration subscription asynchronously for some API type and resources. * @param type_url type URL corresponding to xDS API, e.g. * type.googleapis.com/envoy.api.v2.Cluster. - * @param resources vector of resource names to watch for. If this is empty, then all + * @param resources set of resource names to watch for. If this is empty, then all * resources for type_url will result in callbacks. * @param callbacks the callbacks to be notified of configuration updates. These must be valid * until GrpcMuxWatch is destroyed. @@ -90,7 +90,7 @@ class GrpcMux { * away, its EDS updates should be cancelled by destroying the GrpcMuxWatchPtr. */ virtual GrpcMuxWatchPtr subscribe(const std::string& type_url, - const std::vector& resources, + const std::set& resources, GrpcMuxCallbacks& callbacks) PURE; /** diff --git a/include/envoy/config/subscription.h b/include/envoy/config/subscription.h index d6efd22e27b9a..badd3bad890d3 100644 --- a/include/envoy/config/subscription.h +++ b/include/envoy/config/subscription.h @@ -69,18 +69,18 @@ class Subscription { /** * Start a configuration subscription asynchronously. This should be called once and will continue * to fetch throughout the lifetime of the Subscription object. - * @param resources vector of resource names to fetch. + * @param resources set of resource names to fetch. * @param callbacks the callbacks to be notified of configuration updates. The callback must not * result in the deletion of the Subscription object. */ - virtual void start(const std::vector& resources, - SubscriptionCallbacks& callbacks) PURE; + virtual void start(const std::set& resources, SubscriptionCallbacks& callbacks) PURE; /** * Update the resources to fetch. - * @param resources vector of resource names to fetch. + * @param resources vector of resource names to fetch. It's a (not unordered_)set so that it can + * be passed to std::set_difference, which must be given sorted collections. */ - virtual void updateResources(std::vector resource_names) PURE; + virtual void updateResources(const std::set& update_to_these_names) PURE; }; /** diff --git a/source/common/common/utility.cc b/source/common/common/utility.cc index f173d8333f2bf..fbe37d492ab5d 100644 --- a/source/common/common/utility.cc +++ b/source/common/common/utility.cc @@ -375,6 +375,15 @@ std::string StringUtil::join(const std::vector& source, const std:: return ret.substr(0, ret.length() - delimiter.length()); } +std::string StringUtil::join(const std::set& source, const std::string& delimiter) { + std::ostringstream buf; + std::copy(source.begin(), source.end(), + std::ostream_iterator(buf, delimiter.c_str())); + std::string ret = buf.str(); + // copy will always end with an extra delimiter, we remove it here. + return ret.substr(0, ret.length() - delimiter.length()); +} + std::string StringUtil::subspan(absl::string_view source, size_t start, size_t end) { return std::string(source.data() + start, end - start); } diff --git a/source/common/common/utility.h b/source/common/common/utility.h index 785df6d8aa404..144b5e0d458d5 100644 --- a/source/common/common/utility.h +++ b/source/common/common/utility.h @@ -297,6 +297,14 @@ class StringUtil { */ static std::string join(const std::vector& source, const std::string& delimiter); + /** + * Join elements of a sorted set into a string delimited by delimiter. + * @param source supplies the strings to join. + * @param delimiter supplies the delimiter to join them together. + * @return string combining elements of `source` with `delimiter` in between each element. + */ + static std::string join(const std::set& source, const std::string& delimiter); + /** * Version of substr() that operates on a start and end index instead of a start index and a * length. diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 0acc043be0a96..91a86f63eb12a 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -41,13 +41,10 @@ class DeltaSubscriptionImpl : public Subscription, request_.mutable_node()->MergeFrom(local_info_.node()); } - void updateResources(std::vector update_to_these_names) override { + void updateResources(const std::set& update_to_these_names) override { std::vector cur_added; std::vector cur_removed; - // set_difference expects sorted collections. (resource_names_ is always sorted; it's std::set). - std::sort(update_to_these_names.begin(), update_to_these_names.end()); - std::set_difference(update_to_these_names.begin(), update_to_these_names.end(), resource_names_.begin(), resource_names_.end(), std::inserter(cur_added, cur_added.begin())); @@ -155,7 +152,7 @@ class DeltaSubscriptionImpl : public Subscription, void onWriteable() override { trySendDiscoveryRequestIfPending(); } // Config::Subscription - void start(const std::vector& resources, SubscriptionCallbacks& callbacks) override { + void start(const std::set& resources, SubscriptionCallbacks& callbacks) override { callbacks_ = &callbacks; if (init_fetch_timeout_.count() > 0) { diff --git a/source/common/config/filesystem_subscription_impl.h b/source/common/config/filesystem_subscription_impl.h index b09974623b94c..e5f2dbac266b0 100644 --- a/source/common/config/filesystem_subscription_impl.h +++ b/source/common/config/filesystem_subscription_impl.h @@ -35,7 +35,7 @@ class FilesystemSubscriptionImpl : public Config::Subscription, } // Config::Subscription - void start(const std::vector& resources, + void start(const std::set& resources, Config::SubscriptionCallbacks& callbacks) override { // We report all discovered resources in the watched file. UNREFERENCED_PARAMETER(resources); @@ -45,7 +45,7 @@ class FilesystemSubscriptionImpl : public Config::Subscription, refresh(); } - void updateResources(std::vector resources) override { + void updateResources(const std::set& resources) override { // We report all discovered resources in the watched file. UNREFERENCED_PARAMETER(resources); // Bump stats for consistence behavior with other xDS. diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index c962b0e9dd838..fc654d7e1c1d6 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -67,7 +67,7 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) { } GrpcMuxWatchPtr GrpcMuxImpl::subscribe(const std::string& type_url, - const std::vector& resources, + const std::set& resources, GrpcMuxCallbacks& callbacks) { auto watch = std::unique_ptr(new GrpcMuxWatchImpl(resources, callbacks, type_url, *this)); diff --git a/source/common/config/grpc_mux_impl.h b/source/common/config/grpc_mux_impl.h index 49a961895229d..8702662fc683d 100644 --- a/source/common/config/grpc_mux_impl.h +++ b/source/common/config/grpc_mux_impl.h @@ -31,7 +31,7 @@ class GrpcMuxImpl : public GrpcMux, ~GrpcMuxImpl(); void start() override; - GrpcMuxWatchPtr subscribe(const std::string& type_url, const std::vector& resources, + GrpcMuxWatchPtr subscribe(const std::string& type_url, const std::set& resources, GrpcMuxCallbacks& callbacks) override; void pause(const std::string& type_url) override; void resume(const std::string& type_url) override; @@ -53,7 +53,7 @@ class GrpcMuxImpl : public GrpcMux, void setRetryTimer(); struct GrpcMuxWatchImpl : public GrpcMuxWatch { - GrpcMuxWatchImpl(const std::vector& resources, GrpcMuxCallbacks& callbacks, + GrpcMuxWatchImpl(const std::set& resources, GrpcMuxCallbacks& callbacks, const std::string& type_url, GrpcMuxImpl& parent) : resources_(resources), callbacks_(callbacks), type_url_(type_url), parent_(parent), inserted_(true) { @@ -68,7 +68,7 @@ class GrpcMuxImpl : public GrpcMux, } } } - std::vector resources_; + std::set resources_; GrpcMuxCallbacks& callbacks_; const std::string type_url_; GrpcMuxImpl& parent_; @@ -110,7 +110,7 @@ class GrpcMuxImpl : public GrpcMux, class NullGrpcMuxImpl : public GrpcMux { public: void start() override {} - GrpcMuxWatchPtr subscribe(const std::string&, const std::vector&, + GrpcMuxWatchPtr subscribe(const std::string&, const std::set&, GrpcMuxCallbacks&) override { throw EnvoyException("ADS must be configured to support an ADS config source"); } diff --git a/source/common/config/grpc_mux_subscription_impl.h b/source/common/config/grpc_mux_subscription_impl.h index d4a4197025243..6ccce8f740de0 100644 --- a/source/common/config/grpc_mux_subscription_impl.h +++ b/source/common/config/grpc_mux_subscription_impl.h @@ -27,7 +27,7 @@ class GrpcMuxSubscriptionImpl : public Subscription, init_fetch_timeout_(init_fetch_timeout) {} // Config::Subscription - void start(const std::vector& resources, SubscriptionCallbacks& callbacks) override { + void start(const std::set& resources, SubscriptionCallbacks& callbacks) override { callbacks_ = &callbacks; if (init_fetch_timeout_.count() > 0) { @@ -45,8 +45,12 @@ class GrpcMuxSubscriptionImpl : public Subscription, stats_.update_attempt_.inc(); } - void updateResources(std::vector resources) override { - watch_ = grpc_mux_.subscribe(type_url_, resources, *this); + void updateResources(const std::set& update_to_these_names) override { + // First destroy the watch, so that this subscribe doesn't send a request for both the + // previously watched resources and the new ones (we may have lost interest in some of the + // previously watched ones). + watch_.reset(); + watch_ = grpc_mux_.subscribe(type_url_, update_to_these_names, *this); stats_.update_attempt_.inc(); } diff --git a/source/common/config/grpc_subscription_impl.h b/source/common/config/grpc_subscription_impl.h index c3ca0b9d0c2b9..0c90b8a47f065 100644 --- a/source/common/config/grpc_subscription_impl.h +++ b/source/common/config/grpc_subscription_impl.h @@ -25,15 +25,15 @@ class GrpcSubscriptionImpl : public Config::Subscription { grpc_mux_subscription_(grpc_mux_, stats, type_url, dispatcher, init_fetch_timeout) {} // Config::Subscription - void start(const std::vector& resources, + void start(const std::set& resources, Config::SubscriptionCallbacks& callbacks) override { // Subscribe first, so we get failure callbacks if grpc_mux_.start() fails. grpc_mux_subscription_.start(resources, callbacks); grpc_mux_.start(); } - void updateResources(std::vector resources) override { - grpc_mux_subscription_.updateResources(resources); + void updateResources(const std::set& update_to_these_names) override { + grpc_mux_subscription_.updateResources(update_to_these_names); } GrpcMuxImpl& grpcMux() { return grpc_mux_; } diff --git a/source/common/config/http_subscription_impl.h b/source/common/config/http_subscription_impl.h index 75954aedbd652..fc5e0e7c6bfa1 100644 --- a/source/common/config/http_subscription_impl.h +++ b/source/common/config/http_subscription_impl.h @@ -48,7 +48,7 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, } // Config::Subscription - void start(const std::vector& resources, + void start(const std::set& resources, Config::SubscriptionCallbacks& callbacks) override { ASSERT(callbacks_ == nullptr); @@ -67,9 +67,9 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, initialize(); } - void updateResources(std::vector resources) override { - Protobuf::RepeatedPtrField resources_vector(resources.begin(), - resources.end()); + void updateResources(const std::set& update_to_these_names) override { + Protobuf::RepeatedPtrField resources_vector( + update_to_these_names.begin(), update_to_these_names.end()); request_.mutable_resource_names()->Swap(&resources_vector); } diff --git a/test/common/config/delta_subscription_test_harness.h b/test/common/config/delta_subscription_test_harness.h index a8b0d6c9e269b..0a8a4d85166dd 100644 --- a/test/common/config/delta_subscription_test_harness.h +++ b/test/common/config/delta_subscription_test_harness.h @@ -38,7 +38,7 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { rate_limit_settings_, stats_, init_fetch_timeout); } - void startSubscription(const std::vector& cluster_names) override { + void startSubscription(const std::set& cluster_names) override { EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); last_cluster_names_ = cluster_names; expectSendMessage({}, ""); @@ -46,23 +46,23 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { subscription_->start(cluster_names, callbacks_); } - void expectSendMessage(const std::vector& cluster_names, + void expectSendMessage(const std::set& cluster_names, const std::string& version) override { UNREFERENCED_PARAMETER(version); expectSendMessage(cluster_names, {}, Grpc::Status::GrpcStatus::Ok, ""); } - void expectSendMessage(const std::vector& subscribe, - const std::vector& unsubscribe, - const Protobuf::int32 error_code, const std::string& error_message) { + void expectSendMessage(const std::set& subscribe, + const std::set& unsubscribe, const Protobuf::int32 error_code, + const std::string& error_message) { envoy::api::v2::DeltaDiscoveryRequest expected_request; expected_request.mutable_node()->CopyFrom(node_); - for (const auto& resource : subscribe) { - expected_request.add_resource_names_subscribe(resource); - } - for (const auto& resource : unsubscribe) { - expected_request.add_resource_names_unsubscribe(resource); - } + std::copy( + subscribe.begin(), subscribe.end(), + Protobuf::RepeatedFieldBackInserter(expected_request.mutable_resource_names_subscribe())); + std::copy( + unsubscribe.begin(), unsubscribe.end(), + Protobuf::RepeatedFieldBackInserter(expected_request.mutable_resource_names_unsubscribe())); expected_request.set_response_nonce(last_response_nonce_); expected_request.set_type_url(Config::TypeUrl::get().ClusterLoadAssignment); @@ -107,9 +107,9 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { Mock::VerifyAndClearExpectations(&async_stream_); } - void updateResources(const std::vector& cluster_names) override { - std::vector sub; - std::vector unsub; + void updateResources(const std::set& cluster_names) override { + std::set sub; + std::set unsub; std::set_difference(cluster_names.begin(), cluster_names.end(), last_cluster_names_.begin(), last_cluster_names_.end(), std::inserter(sub, sub.begin())); @@ -145,7 +145,7 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { Grpc::MockAsyncStream async_stream_; std::unique_ptr subscription_; std::string last_response_nonce_; - std::vector last_cluster_names_; + std::set last_cluster_names_; Envoy::Config::RateLimitSettings rate_limit_settings_; Event::MockTimer* init_timeout_timer_; envoy::api::v2::core::Node node_; diff --git a/test/common/config/filesystem_subscription_test_harness.h b/test/common/config/filesystem_subscription_test_harness.h index e5a8e85e8f237..912d21739419d 100644 --- a/test/common/config/filesystem_subscription_test_harness.h +++ b/test/common/config/filesystem_subscription_test_harness.h @@ -37,13 +37,13 @@ class FilesystemSubscriptionTestHarness : public SubscriptionTestHarness { } } - void startSubscription(const std::vector& cluster_names) override { + void startSubscription(const std::set& cluster_names) override { std::ifstream config_file(path_); file_at_start_ = config_file.good(); subscription_.start(cluster_names, callbacks_); } - void updateResources(const std::vector& cluster_names) override { + void updateResources(const std::set& cluster_names) override { subscription_.updateResources(cluster_names); } @@ -57,7 +57,7 @@ class FilesystemSubscriptionTestHarness : public SubscriptionTestHarness { } } - void expectSendMessage(const std::vector& cluster_names, + void expectSendMessage(const std::set& cluster_names, const std::string& version) override { UNREFERENCED_PARAMETER(cluster_names); UNREFERENCED_PARAMETER(version); diff --git a/test/common/config/grpc_subscription_test_harness.h b/test/common/config/grpc_subscription_test_harness.h index 8128edb534f88..5c23300dab288 100644 --- a/test/common/config/grpc_subscription_test_harness.h +++ b/test/common/config/grpc_subscription_test_harness.h @@ -50,12 +50,12 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { ~GrpcSubscriptionTestHarness() override { EXPECT_CALL(async_stream_, sendMessage(_, false)); } - void expectSendMessage(const std::vector& cluster_names, + void expectSendMessage(const std::set& cluster_names, const std::string& version) override { expectSendMessage(cluster_names, version, Grpc::Status::GrpcStatus::Ok, ""); } - void expectSendMessage(const std::vector& cluster_names, const std::string& version, + void expectSendMessage(const std::set& cluster_names, const std::string& version, const Protobuf::int32 error_code, const std::string& error_message) { envoy::api::v2::DiscoveryRequest expected_request; expected_request.mutable_node()->CopyFrom(node_); @@ -75,7 +75,7 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { EXPECT_CALL(async_stream_, sendMessage(ProtoEq(expected_request), false)); } - void startSubscription(const std::vector& cluster_names) override { + void startSubscription(const std::set& cluster_names) override { EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); last_cluster_names_ = cluster_names; expectSendMessage(last_cluster_names_, ""); @@ -113,11 +113,19 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { Mock::VerifyAndClearExpectations(&async_stream_); } - void updateResources(const std::vector& cluster_names) override { - std::vector cluster_superset = cluster_names; - cluster_superset.insert(cluster_superset.end(), last_cluster_names_.begin(), - last_cluster_names_.end()); - expectSendMessage(cluster_superset, version_); + void updateResources(const std::set& cluster_names) override { + // The "watch" mechanism means that updates that lose interest in a resource + // will first generate a request for [still watched resources, i.e. without newly unwatched + // ones] before generating the request for all of cluster_names. + // TODO(fredlas) this unnecessary second request will stop happening once the watch mechanism is + // no longer internally used by GrpcSubscriptionImpl. + std::set both; + for (const auto& n : cluster_names) { + if (last_cluster_names_.find(n) != last_cluster_names_.end()) { + both.insert(n); + } + } + expectSendMessage(both, version_); expectSendMessage(cluster_names, version_); subscription_->updateResources(cluster_names); last_cluster_names_ = cluster_names; @@ -151,7 +159,7 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { Grpc::MockAsyncStream async_stream_; std::unique_ptr subscription_; std::string last_response_nonce_; - std::vector last_cluster_names_; + std::set last_cluster_names_; NiceMock local_info_; Envoy::Config::RateLimitSettings rate_limit_settings_; Event::MockTimer* init_timeout_timer_; diff --git a/test/common/config/http_subscription_test_harness.h b/test/common/config/http_subscription_test_harness.h index 3eec9b65e749f..4fe82699a049c 100644 --- a/test/common/config/http_subscription_test_harness.h +++ b/test/common/config/http_subscription_test_harness.h @@ -56,7 +56,7 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness { } } - void expectSendMessage(const std::vector& cluster_names, + void expectSendMessage(const std::set& cluster_names, const std::string& version) override { EXPECT_CALL(cm_, httpAsyncClientForCluster("eds_cluster")); EXPECT_CALL(cm_.async_client_, send_(_, _, _)) @@ -88,14 +88,14 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness { })); } - void startSubscription(const std::vector& cluster_names) override { + void startSubscription(const std::set& cluster_names) override { version_ = ""; cluster_names_ = cluster_names; expectSendMessage(cluster_names, ""); subscription_->start(cluster_names, callbacks_); } - void updateResources(const std::vector& cluster_names) override { + void updateResources(const std::set& cluster_names) override { cluster_names_ = cluster_names; expectSendMessage(cluster_names, version_); subscription_->updateResources(cluster_names); @@ -154,7 +154,7 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness { bool request_in_progress_{}; std::string version_; - std::vector cluster_names_; + std::set cluster_names_; const Protobuf::MethodDescriptor* method_descriptor_; Upstream::MockClusterManager cm_; Event::MockDispatcher dispatcher_; diff --git a/test/common/config/subscription_impl_test.cc b/test/common/config/subscription_impl_test.cc index 35b1f819acb63..67a9566619c2a 100644 --- a/test/common/config/subscription_impl_test.cc +++ b/test/common/config/subscription_impl_test.cc @@ -39,16 +39,15 @@ class SubscriptionImplTest : public testing::TestWithParam { } } - void startSubscription(const std::vector& cluster_names) { + void startSubscription(const std::set& cluster_names) { test_harness_->startSubscription(cluster_names); } - void updateResources(const std::vector& cluster_names) { + void updateResources(const std::set& cluster_names) { test_harness_->updateResources(cluster_names); } - void expectSendMessage(const std::vector& cluster_names, - const std::string& version) { + void expectSendMessage(const std::set& cluster_names, const std::string& version) { test_harness_->expectSendMessage(cluster_names, version); } diff --git a/test/common/config/subscription_test_harness.h b/test/common/config/subscription_test_harness.h index 5a62f6d861d26..551b4ca254e84 100644 --- a/test/common/config/subscription_test_harness.h +++ b/test/common/config/subscription_test_harness.h @@ -24,20 +24,20 @@ class SubscriptionTestHarness { * Start subscription and set related expectations. * @param cluster_names initial cluster names to request via EDS. */ - virtual void startSubscription(const std::vector& cluster_names) PURE; + virtual void startSubscription(const std::set& cluster_names) PURE; /** * Update cluster names to be delivered via EDS. * @param cluster_names cluster names. */ - virtual void updateResources(const std::vector& cluster_names) PURE; + virtual void updateResources(const std::set& cluster_names) PURE; /** * Expect that an update request is sent by the Subscription implementation. * @param cluster_names cluster names to expect in the request. * @param version version_info to expect in the request. */ - virtual void expectSendMessage(const std::vector& cluster_names, + virtual void expectSendMessage(const std::set& cluster_names, const std::string& version) PURE; /** diff --git a/test/mocks/config/mocks.cc b/test/mocks/config/mocks.cc index 72783296cc9fb..00d70fac699b2 100644 --- a/test/mocks/config/mocks.cc +++ b/test/mocks/config/mocks.cc @@ -17,7 +17,7 @@ MockGrpcStreamCallbacks::MockGrpcStreamCallbacks() {} MockGrpcStreamCallbacks::~MockGrpcStreamCallbacks() {} GrpcMuxWatchPtr MockGrpcMux::subscribe(const std::string& type_url, - const std::vector& resources, + const std::set& resources, GrpcMuxCallbacks& callbacks) { return GrpcMuxWatchPtr(subscribe_(type_url, resources, callbacks)); } diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index de51d6d10c010..7bdf32b42898e 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -41,8 +41,8 @@ template class MockSubscriptionCallbacks : public Subscript class MockSubscription : public Subscription { public: MOCK_METHOD2_T(start, - void(const std::vector& resources, SubscriptionCallbacks& callbacks)); - MOCK_METHOD1_T(updateResources, void(std::vector resources)); + void(const std::set& resources, SubscriptionCallbacks& callbacks)); + MOCK_METHOD1_T(updateResources, void(const std::set& update_to_these_names)); }; class MockGrpcMuxWatch : public GrpcMuxWatch { @@ -60,9 +60,9 @@ class MockGrpcMux : public GrpcMux { MOCK_METHOD0(start, void()); MOCK_METHOD3(subscribe_, - GrpcMuxWatch*(const std::string& type_url, const std::vector& resources, + GrpcMuxWatch*(const std::string& type_url, const std::set& resources, GrpcMuxCallbacks& callbacks)); - GrpcMuxWatchPtr subscribe(const std::string& type_url, const std::vector& resources, + GrpcMuxWatchPtr subscribe(const std::string& type_url, const std::set& resources, GrpcMuxCallbacks& callbacks); MOCK_METHOD1(pause, void(const std::string& type_url)); MOCK_METHOD1(resume, void(const std::string& type_url)); From ea842dd866a491564fa303f9ca30ed880b393c6f Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Tue, 16 Apr 2019 18:07:58 -0400 Subject: [PATCH 10/14] add queuing back Signed-off-by: Fred Douglas --- source/common/common/utility.cc | 9 --- source/common/common/utility.h | 8 --- .../common/config/delta_subscription_impl.h | 61 ++++++++++++------- .../config/delta_subscription_impl_test.cc | 25 ++++++-- .../config/delta_subscription_test_harness.h | 1 - .../config/http_subscription_test_harness.h | 12 +++- 6 files changed, 70 insertions(+), 46 deletions(-) diff --git a/source/common/common/utility.cc b/source/common/common/utility.cc index fbe37d492ab5d..f173d8333f2bf 100644 --- a/source/common/common/utility.cc +++ b/source/common/common/utility.cc @@ -375,15 +375,6 @@ std::string StringUtil::join(const std::vector& source, const std:: return ret.substr(0, ret.length() - delimiter.length()); } -std::string StringUtil::join(const std::set& source, const std::string& delimiter) { - std::ostringstream buf; - std::copy(source.begin(), source.end(), - std::ostream_iterator(buf, delimiter.c_str())); - std::string ret = buf.str(); - // copy will always end with an extra delimiter, we remove it here. - return ret.substr(0, ret.length() - delimiter.length()); -} - std::string StringUtil::subspan(absl::string_view source, size_t start, size_t end) { return std::string(source.data() + start, end - start); } diff --git a/source/common/common/utility.h b/source/common/common/utility.h index 144b5e0d458d5..785df6d8aa404 100644 --- a/source/common/common/utility.h +++ b/source/common/common/utility.h @@ -297,14 +297,6 @@ class StringUtil { */ static std::string join(const std::vector& source, const std::string& delimiter); - /** - * Join elements of a sorted set into a string delimited by delimiter. - * @param source supplies the strings to join. - * @param delimiter supplies the delimiter to join them together. - * @return string combining elements of `source` with `delimiter` in between each element. - */ - static std::string join(const std::set& source, const std::string& delimiter); - /** * Version of substr() that operates on a start and end index instead of a start index and a * length. diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 91a86f63eb12a..9664925d434e2 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -1,5 +1,7 @@ #pragma once +#include + #include "envoy/api/v2/discovery.pb.h" #include "envoy/common/token_bucket.h" #include "envoy/config/subscription.h" @@ -18,6 +20,12 @@ namespace Envoy { namespace Config { +struct UpdateAck { + UpdateAck(absl::string_view nonce) : nonce_(nonce) {} + std::string nonce_; + ::google::rpc::Status error_detail_; +}; + /** * Manages the logic of a (non-aggregated) delta xDS subscription. * TODO(fredlas) add aggregation support. The plan is for that to happen in XdsGrpcContext, @@ -88,7 +96,7 @@ class DeltaSubscriptionImpl : public Subscription, ENVOY_LOG(debug, "Resuming discovery requests for {}", type_url_); ASSERT(paused_); paused_ = false; - trySendDiscoveryRequestIfPending(); + trySendDiscoveryRequests(); } envoy::api::v2::DeltaDiscoveryRequest internalRequestStateForTest() const { return request_; } @@ -129,8 +137,7 @@ class DeltaSubscriptionImpl : public Subscription, message->system_version_info()); disableInitFetchTimeoutTimer(); - request_.set_response_nonce(message->nonce()); - + UpdateAck ack(message->nonce()); try { handleConfigUpdate(message->resources(), message->removed_resources(), message->system_version_info()); @@ -142,14 +149,13 @@ class DeltaSubscriptionImpl : public Subscription, // supposed to be the sum of client- and server- initiated update attempts? Seems weird. stats_.update_attempt_.inc(); callbacks_->onConfigUpdateFailed(&e); - ::google::rpc::Status* error_detail = request_.mutable_error_detail(); - error_detail->set_code(Grpc::Status::GrpcStatus::Internal); - error_detail->set_message(e.what()); + ack.error_detail_.set_code(Grpc::Status::GrpcStatus::Internal); + ack.error_detail_.set_message(e.what()); } - kickOffDiscoveryRequest(); + kickOffDiscoveryRequestWithAck(ack); } - void onWriteable() override { trySendDiscoveryRequestIfPending(); } + void onWriteable() override { trySendDiscoveryRequests(); } // Config::Subscription void start(const std::set& resources, SubscriptionCallbacks& callbacks) override { @@ -168,7 +174,12 @@ class DeltaSubscriptionImpl : public Subscription, } private: - void sendDiscoveryRequest() { + void sendDiscoveryRequest(absl::optional maybe_ack) { + if (maybe_ack.has_value()) { + const UpdateAck& ack = maybe_ack.value(); + request_.set_response_nonce(ack.nonce_); + *request_.mutable_error_detail() = ack.error_detail_; + } request_.clear_resource_names_subscribe(); request_.clear_resource_names_unsubscribe(); std::copy(names_added_.begin(), names_added_.end(), @@ -222,9 +233,11 @@ class DeltaSubscriptionImpl : public Subscription, } } - void kickOffDiscoveryRequest() { - pending_ = true; - trySendDiscoveryRequestIfPending(); + void kickOffDiscoveryRequest() { kickOffDiscoveryRequestWithAck(absl::nullopt); } + + void kickOffDiscoveryRequestWithAck(absl::optional ack) { + ack_queue_.push(ack); + trySendDiscoveryRequests(); } bool shouldSendDiscoveryRequest() { @@ -241,15 +254,16 @@ class DeltaSubscriptionImpl : public Subscription, return true; } - void trySendDiscoveryRequestIfPending() { - if (!pending_) { - return; - } - if (shouldSendDiscoveryRequest()) { - sendDiscoveryRequest(); - pending_ = false; + void trySendDiscoveryRequests() { + while (!ack_queue_.empty()) { + if (shouldSendDiscoveryRequest()) { + sendDiscoveryRequest(ack_queue_.front()); + ack_queue_.pop(); + } else { + break; + } } - grpc_stream_.maybeUpdateQueueSizeStat(pending_ ? 1 : 0); + grpc_stream_.maybeUpdateQueueSizeStat(ack_queue_.size()); } class ResourceVersion { @@ -302,10 +316,13 @@ class DeltaSubscriptionImpl : public Subscription, const std::string type_url_; SubscriptionCallbacks* callbacks_{}; - // In-flight or previously sent request. + // The request being built for the next send. envoy::api::v2::DeltaDiscoveryRequest request_; bool paused_{}; - bool pending_{}; + + // An item in the queue represents a DeltaDiscoveryRequest that must be sent. If an item is not + // empty, it is the ACK (nonce + error_detail) to set. + std::queue> ack_queue_; // Tracking of the delta in our subscription interest since the previous DeltaDiscoveryRequest was // sent. Can't use unordered_set due to ordering issues in gTest expectation matching. Feel free diff --git a/test/common/config/delta_subscription_impl_test.cc b/test/common/config/delta_subscription_impl_test.cc index b28c9552ed83d..b8820162638a1 100644 --- a/test/common/config/delta_subscription_impl_test.cc +++ b/test/common/config/delta_subscription_impl_test.cc @@ -1,6 +1,7 @@ #include "test/common/config/delta_subscription_test_harness.h" using testing::AnyNumber; +using testing::InSequence; using testing::UnorderedElementsAre; namespace Envoy { @@ -82,6 +83,11 @@ TEST_F(DeltaSubscriptionImplTest, ResourceGoneLeadsToBlankInitialVersion) { subscription_->updateResources({"name4"}); // (implies "we no longer care about name1,2,3") } +// Delta xDS reliably queues up and sends all discovery requests, even in situations where it isn't +// strictly necessary. E.g.: if you subscribe but then unsubscribe to a given resource, all before a +// request was able to be sent, two requests will be sent. The following tests test various cases of +// this reliability. TODO TODO REMOVE PROBABLY +// // If Envoy decided it wasn't interested in a resource and then (before a request was sent) decided // it was again, for all we know, it dropped that resource in between and needs to retrieve it // again. So, we *should* send a request "re-"subscribing. This means that the server needs to @@ -92,7 +98,9 @@ TEST_F(DeltaSubscriptionImplTest, RemoveThenAdd) { subscription_->pause(); // Pause because we're testing multiple updates in between request sends. subscription_->updateResources({"name1", "name2"}); subscription_->updateResources({"name1", "name2", "name3"}); + InSequence s; expectSendMessage({"name3"}, {}, Grpc::Status::GrpcStatus::Ok, ""); + expectSendMessage({}, {}, Grpc::Status::GrpcStatus::Ok, ""); // no-op due to the second update subscription_->resume(); } @@ -109,7 +117,9 @@ TEST_F(DeltaSubscriptionImplTest, AddThenRemove) { subscription_->pause(); // Pause because we're testing multiple updates in between request sends. subscription_->updateResources({"name1", "name2", "name3", "name4"}); subscription_->updateResources({"name1", "name2", "name3"}); + InSequence s; expectSendMessage({}, {"name4"}, Grpc::Status::GrpcStatus::Ok, ""); + expectSendMessage({}, {}, Grpc::Status::GrpcStatus::Ok, ""); // no-op due to the second update subscription_->resume(); } @@ -120,7 +130,10 @@ TEST_F(DeltaSubscriptionImplTest, AddRemoveAdd) { subscription_->updateResources({"name1", "name2", "name3", "name4"}); subscription_->updateResources({"name1", "name2", "name3"}); subscription_->updateResources({"name1", "name2", "name3", "name4"}); + InSequence s; expectSendMessage({"name4"}, {}, Grpc::Status::GrpcStatus::Ok, ""); + expectSendMessage({}, {}, Grpc::Status::GrpcStatus::Ok, ""); // no-op due to the second update + expectSendMessage({}, {}, Grpc::Status::GrpcStatus::Ok, ""); // no-op due to the third update subscription_->resume(); } @@ -131,7 +144,10 @@ TEST_F(DeltaSubscriptionImplTest, RemoveAddRemove) { subscription_->updateResources({"name1", "name2"}); subscription_->updateResources({"name1", "name2", "name3"}); subscription_->updateResources({"name1", "name2"}); + InSequence s; expectSendMessage({}, {"name3"}, Grpc::Status::GrpcStatus::Ok, ""); + expectSendMessage({}, {}, Grpc::Status::GrpcStatus::Ok, ""); // no-op due to the second update + expectSendMessage({}, {}, Grpc::Status::GrpcStatus::Ok, ""); // no-op due to the third update subscription_->resume(); } @@ -143,20 +159,21 @@ TEST_F(DeltaSubscriptionImplTest, BothAddAndRemove) { subscription_->updateResources({"name4"}); subscription_->updateResources({"name1", "name2", "name3"}); subscription_->updateResources({"name4"}); + InSequence s; expectSendMessage({"name4"}, {"name1", "name2", "name3"}, Grpc::Status::GrpcStatus::Ok, ""); + expectSendMessage({}, {}, Grpc::Status::GrpcStatus::Ok, ""); // no-op due to the second update + expectSendMessage({}, {}, Grpc::Status::GrpcStatus::Ok, ""); // no-op due to the third update subscription_->resume(); } -// If one update fails to send a request (pausing, rate limit, no stream are all identical for this -// purpose), and then another update comes along and also fails, when the request is finally sent, -// both should be present. (A previous version of the code would have had 1->12 generate a diff of -// 2, then 12->123 generate a diff of 3, which would replace 2). TEST_F(DeltaSubscriptionImplTest, CumulativeUpdates) { startSubscription({"name1"}); subscription_->pause(); subscription_->updateResources({"name1", "name2"}); subscription_->updateResources({"name1", "name2", "name3"}); + InSequence s; expectSendMessage({"name2", "name3"}, {}, Grpc::Status::GrpcStatus::Ok, ""); + expectSendMessage({}, {}, Grpc::Status::GrpcStatus::Ok, ""); // no-op due to the second update subscription_->resume(); } diff --git a/test/common/config/delta_subscription_test_harness.h b/test/common/config/delta_subscription_test_harness.h index 0a8a4d85166dd..64d4b966f1bcc 100644 --- a/test/common/config/delta_subscription_test_harness.h +++ b/test/common/config/delta_subscription_test_harness.h @@ -13,7 +13,6 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" -using testing::InSequence; using testing::Mock; using testing::NiceMock; using testing::Return; diff --git a/test/common/config/http_subscription_test_harness.h b/test/common/config/http_subscription_test_harness.h index 4fe82699a049c..cd9fe953a6ceb 100644 --- a/test/common/config/http_subscription_test_harness.h +++ b/test/common/config/http_subscription_test_harness.h @@ -76,8 +76,16 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness { } expected_request += "\"node\":{\"id\":\"fo0\"},"; if (!cluster_names.empty()) { - expected_request += - "\"resource_names\":[\"" + StringUtil::join(cluster_names, "\",\"") + "\"]"; + std::string joined_cluster_names; + { + std::string delimiter = "\",\""; + std::ostringstream buf; + std::copy(cluster_names.begin(), cluster_names.end(), + std::ostream_iterator(buf, delimiter.c_str())); + std::string with_comma = buf.str(); + joined_cluster_names = with_comma.substr(0, with_comma.length() - delimiter.length()); + } + expected_request += "\"resource_names\":[\"" + joined_cluster_names + "\"]"; } expected_request += "}"; EXPECT_EQ(expected_request, request->bodyAsString()); From 9702684f3d710e858ce192b1a4b0a92b75293f44 Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Wed, 17 Apr 2019 13:56:37 -0400 Subject: [PATCH 11/14] add unwatched to dictionary Signed-off-by: Fred Douglas --- tools/spelling_dictionary.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/spelling_dictionary.txt b/tools/spelling_dictionary.txt index bd4a33889dd65..d2a039fdc8b1c 100644 --- a/tools/spelling_dictionary.txt +++ b/tools/spelling_dictionary.txt @@ -755,6 +755,7 @@ unterminated untruncated untrusted untyped +unwatched unweighted unzigzag upstreams From 83fae554a0aba47bd6a1d97a0fcca5370ca34baf Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Thu, 18 Apr 2019 13:42:08 -0400 Subject: [PATCH 12/14] comments and formatting Signed-off-by: Fred Douglas --- api/XDS_PROTOCOL.md | 4 +++- .../common/config/delta_subscription_impl.h | 19 +++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/api/XDS_PROTOCOL.md b/api/XDS_PROTOCOL.md index 00b8a3adb69a5..c2bd0aa0cbee2 100644 --- a/api/XDS_PROTOCOL.md +++ b/api/XDS_PROTOCOL.md @@ -348,7 +348,9 @@ client spontaneously requests the "wc" resource. ![Incremental session example](diagrams/incremental.svg) On reconnect the Incremental xDS client may tell the server of its known -resources to avoid resending them over the network. Because no state is assumed to be preserved from the previous stream, the reconnecting client must provide the server with all resource names it is interested in. +resources to avoid resending them over the network. Because no state is assumed +to be preserved from the previous stream, the reconnecting client must provide +the server with all resource names it is interested in. ![Incremental reconnect example](diagrams/incremental-reconnect.svg) diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 9664925d434e2..0e69a180f4820 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -174,6 +174,12 @@ class DeltaSubscriptionImpl : public Subscription, } private: + // What's with the optional? DeltaDiscoveryRequest plays two independent roles: + // informing the server of what resources we're interested in, and acknowledging resources it has + // sent us. Some requests are queued up specifically to carry ACKs, and some are queued up for + // resource updates. Susbscription changes might get included in an ACK request. In that case, the + // pending request that the subscription change queued up does still get sent, just empty and + // pointless. (TODO(fredlas) we would like to skip those no-op requests). void sendDiscoveryRequest(absl::optional maybe_ack) { if (maybe_ack.has_value()) { const UpdateAck& ack = maybe_ack.value(); @@ -255,13 +261,9 @@ class DeltaSubscriptionImpl : public Subscription, } void trySendDiscoveryRequests() { - while (!ack_queue_.empty()) { - if (shouldSendDiscoveryRequest()) { - sendDiscoveryRequest(ack_queue_.front()); - ack_queue_.pop(); - } else { - break; - } + while (!ack_queue_.empty() && shouldSendDiscoveryRequest()) { + sendDiscoveryRequest(ack_queue_.front()); + ack_queue_.pop(); } grpc_stream_.maybeUpdateQueueSizeStat(ack_queue_.size()); } @@ -321,7 +323,8 @@ class DeltaSubscriptionImpl : public Subscription, bool paused_{}; // An item in the queue represents a DeltaDiscoveryRequest that must be sent. If an item is not - // empty, it is the ACK (nonce + error_detail) to set. + // empty, it is the ACK (nonce + error_detail) to set on that request. See + // trySendDiscoveryRequests() for more details. std::queue> ack_queue_; // Tracking of the delta in our subscription interest since the previous DeltaDiscoveryRequest was From 5714c48e23704ac98f78c2e9c053df90402154d2 Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Thu, 18 Apr 2019 13:44:30 -0400 Subject: [PATCH 13/14] better grouping of functions Signed-off-by: Fred Douglas --- .../common/config/delta_subscription_impl.h | 116 +++++++++--------- 1 file changed, 58 insertions(+), 58 deletions(-) diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 0e69a180f4820..80c132dc6800c 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -49,6 +49,37 @@ class DeltaSubscriptionImpl : public Subscription, request_.mutable_node()->MergeFrom(local_info_.node()); } + void pause() { + ENVOY_LOG(debug, "Pausing discovery requests for {}", type_url_); + ASSERT(!paused_); + paused_ = true; + } + + void resume() { + ENVOY_LOG(debug, "Resuming discovery requests for {}", type_url_); + ASSERT(paused_); + paused_ = false; + trySendDiscoveryRequests(); + } + + envoy::api::v2::DeltaDiscoveryRequest internalRequestStateForTest() const { return request_; } + + // Config::Subscription + void start(const std::set& resources, SubscriptionCallbacks& callbacks) override { + callbacks_ = &callbacks; + + if (init_fetch_timeout_.count() > 0) { + init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void { + ENVOY_LOG(warn, "delta config: initial fetch timed out for {}", type_url_); + callbacks_->onConfigUpdateFailed(nullptr); + }); + init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_); + } + + grpc_stream_.establishNewStream(); + updateResources(resources); + } + void updateResources(const std::set& update_to_these_names) override { std::vector cur_added; std::vector cur_removed; @@ -86,21 +117,6 @@ class DeltaSubscriptionImpl : public Subscription, } } - void pause() { - ENVOY_LOG(debug, "Pausing discovery requests for {}", type_url_); - ASSERT(!paused_); - paused_ = true; - } - - void resume() { - ENVOY_LOG(debug, "Resuming discovery requests for {}", type_url_); - ASSERT(paused_); - paused_ = false; - trySendDiscoveryRequests(); - } - - envoy::api::v2::DeltaDiscoveryRequest internalRequestStateForTest() const { return request_; } - // Config::GrpcStreamCallbacks void onStreamEstablished() override { request_.Clear(); @@ -157,50 +173,7 @@ class DeltaSubscriptionImpl : public Subscription, void onWriteable() override { trySendDiscoveryRequests(); } - // Config::Subscription - void start(const std::set& resources, SubscriptionCallbacks& callbacks) override { - callbacks_ = &callbacks; - - if (init_fetch_timeout_.count() > 0) { - init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void { - ENVOY_LOG(warn, "delta config: initial fetch timed out for {}", type_url_); - callbacks_->onConfigUpdateFailed(nullptr); - }); - init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_); - } - - grpc_stream_.establishNewStream(); - updateResources(resources); - } - private: - // What's with the optional? DeltaDiscoveryRequest plays two independent roles: - // informing the server of what resources we're interested in, and acknowledging resources it has - // sent us. Some requests are queued up specifically to carry ACKs, and some are queued up for - // resource updates. Susbscription changes might get included in an ACK request. In that case, the - // pending request that the subscription change queued up does still get sent, just empty and - // pointless. (TODO(fredlas) we would like to skip those no-op requests). - void sendDiscoveryRequest(absl::optional maybe_ack) { - if (maybe_ack.has_value()) { - const UpdateAck& ack = maybe_ack.value(); - request_.set_response_nonce(ack.nonce_); - *request_.mutable_error_detail() = ack.error_detail_; - } - request_.clear_resource_names_subscribe(); - request_.clear_resource_names_unsubscribe(); - std::copy(names_added_.begin(), names_added_.end(), - Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_subscribe())); - std::copy(names_removed_.begin(), names_removed_.end(), - Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_unsubscribe())); - names_added_.clear(); - names_removed_.clear(); - - ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url_, request_.DebugString()); - grpc_stream_.sendMessage(request_); - request_.clear_error_detail(); - request_.clear_initial_resource_versions(); - } - void handleConfigUpdate(const Protobuf::RepeatedPtrField& added_resources, const Protobuf::RepeatedPtrField& removed_resources, @@ -246,6 +219,33 @@ class DeltaSubscriptionImpl : public Subscription, trySendDiscoveryRequests(); } + // What's with the optional? DeltaDiscoveryRequest plays two independent roles: + // informing the server of what resources we're interested in, and acknowledging resources it has + // sent us. Some requests are queued up specifically to carry ACKs, and some are queued up for + // resource updates. Susbscription changes might get included in an ACK request. In that case, the + // pending request that the subscription change queued up does still get sent, just empty and + // pointless. (TODO(fredlas) we would like to skip those no-op requests). + void sendDiscoveryRequest(absl::optional maybe_ack) { + if (maybe_ack.has_value()) { + const UpdateAck& ack = maybe_ack.value(); + request_.set_response_nonce(ack.nonce_); + *request_.mutable_error_detail() = ack.error_detail_; + } + request_.clear_resource_names_subscribe(); + request_.clear_resource_names_unsubscribe(); + std::copy(names_added_.begin(), names_added_.end(), + Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_subscribe())); + std::copy(names_removed_.begin(), names_removed_.end(), + Protobuf::RepeatedFieldBackInserter(request_.mutable_resource_names_unsubscribe())); + names_added_.clear(); + names_removed_.clear(); + + ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url_, request_.DebugString()); + grpc_stream_.sendMessage(request_); + request_.clear_error_detail(); + request_.clear_initial_resource_versions(); + } + bool shouldSendDiscoveryRequest() { if (paused_) { ENVOY_LOG(trace, "API {} paused; discovery request on hold for now.", type_url_); From 1d1e0241e6d354240159325a413d08c22e8c83a4 Mon Sep 17 00:00:00 2001 From: Fred Douglas Date: Thu, 18 Apr 2019 13:52:40 -0400 Subject: [PATCH 14/14] typo Signed-off-by: Fred Douglas --- source/common/config/delta_subscription_impl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 80c132dc6800c..b614c7806f409 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -222,7 +222,7 @@ class DeltaSubscriptionImpl : public Subscription, // What's with the optional? DeltaDiscoveryRequest plays two independent roles: // informing the server of what resources we're interested in, and acknowledging resources it has // sent us. Some requests are queued up specifically to carry ACKs, and some are queued up for - // resource updates. Susbscription changes might get included in an ACK request. In that case, the + // resource updates. Subscription changes might get included in an ACK request. In that case, the // pending request that the subscription change queued up does still get sent, just empty and // pointless. (TODO(fredlas) we would like to skip those no-op requests). void sendDiscoveryRequest(absl::optional maybe_ack) {