diff --git a/include/envoy/config/grpc_mux.h b/include/envoy/config/grpc_mux.h index c07b0bab16464..41af0fd2e60eb 100644 --- a/include/envoy/config/grpc_mux.h +++ b/include/envoy/config/grpc_mux.h @@ -73,6 +73,14 @@ class GrpcMux { const std::set& resources, SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout) PURE; + /** + * The only difference between addToWatch() and addOrUpdateWatch() is that the 'resources' here + * means the *extra* resources we interested in. + */ + virtual Watch* addToWatch(const std::string& type_url, Watch* watch, + const std::set& resources, + SubscriptionCallbacks& callbacks, + std::chrono::milliseconds init_fetch_timeout) PURE; /** * Cleanup of a Watch* added by addOrUpdateWatch(). Receiving a Watch* from addOrUpdateWatch() diff --git a/include/envoy/config/subscription.h b/include/envoy/config/subscription.h index 009c4a265dc6c..0f6a14dbf3467 100644 --- a/include/envoy/config/subscription.h +++ b/include/envoy/config/subscription.h @@ -90,6 +90,12 @@ class Subscription { * be passed to std::set_difference, which must be given sorted collections. */ virtual void updateResourceInterest(const std::set& update_to_these_names) PURE; + + /** + * Add the resources to fetch. + * @param resources vector of resource names to fetch. + */ + virtual void addToResourceInterest(const std::set&){}; }; using SubscriptionPtr = std::unique_ptr; diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index 8974edf4cd080..9641569b6390c 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -224,6 +224,10 @@ class ClusterManager { * @return Config::SubscriptionFactory& the subscription factory. */ virtual Config::SubscriptionFactory& subscriptionFactory() PURE; + + virtual void updateClusterInterest(const std::set& update_to_these_names) PURE; + + virtual void addToClusterInterest(const std::set& add_these_names) PURE; }; using ClusterManagerPtr = std::unique_ptr; @@ -250,6 +254,16 @@ class CdsApi { * @return std::string last accepted version from fetch. */ virtual const std::string versionInfo() const PURE; + + /** + * Update watch set of cluster resources interested. + */ + virtual void updateClusterInterest(const std::set& update_to_these_names) PURE; + + /** + * Add watch set of cluster resources interested. + */ + virtual void addToClusterInterest(const std::set& add_these_names) PURE; }; using CdsApiPtr = std::unique_ptr; diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index 93662e0e21c49..586f832a820bc 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -22,11 +22,24 @@ Watch* GrpcMuxImpl::addOrUpdateWatch(const std::string& type_url, Watch* watch, SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout) { if (watch == nullptr) { - return addWatch(type_url, resources, callbacks, init_fetch_timeout); - } else { - updateWatch(type_url, watch, resources); - return watch; + watch = addWatch(type_url, callbacks, init_fetch_timeout); } + // updateWatch() queues a discovery request if any of 'resources' are not yet subscribed. + updateWatch(type_url, watch, resources); + return watch; +} + +Watch* GrpcMuxImpl::addToWatch(const std::string& type_url, Watch* watch, + const std::set& resources, + SubscriptionCallbacks& callbacks, + std::chrono::milliseconds init_fetch_timeout) { + if (watch == nullptr) { + watch = addWatch(type_url, callbacks, init_fetch_timeout); + } + // addToWatch() queues a discovery request for any of *extra* 'resources' we are not yet + // subscribed. + addToWatch(type_url, watch, resources); + return watch; } void GrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) { @@ -95,8 +108,7 @@ void GrpcMuxImpl::handleStreamEstablishmentFailure() { } while (all_subscribed.size() != subscriptions_.size()); } -Watch* GrpcMuxImpl::addWatch(const std::string& type_url, const std::set& resources, - SubscriptionCallbacks& callbacks, +Watch* GrpcMuxImpl::addWatch(const std::string& type_url, SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout) { auto watch_map = watch_maps_.find(type_url); if (watch_map == watch_maps_.end()) { @@ -108,11 +120,24 @@ Watch* GrpcMuxImpl::addWatch(const std::string& type_url, const std::setsecond->addWatch(callbacks); - // updateWatch() queues a discovery request if any of 'resources' are not yet subscribed. - updateWatch(type_url, watch, resources); return watch; } +void GrpcMuxImpl::addToWatch(const std::string& type_url, Watch* watch, + const std::set& resources) { + ASSERT(watch != nullptr); + SubscriptionState& sub = subscriptionStateFor(type_url); + WatchMap& watch_map = watchMapFor(type_url); + + auto added_removed = watch_map.addToWatchInterest(watch, resources); + sub.updateSubscriptionInterest(added_removed.added_, added_removed.removed_); + + // Tell the server about our change in interest, if any. + if (sub.subscriptionUpdatePending()) { + trySendDiscoveryRequests(); + } +} + // Updates the list of resource names watched by the given watch. If an added name is new across // the whole subscription, or if a removed name has no other watch interested in it, then the // subscription will enqueue and attempt to send an appropriate discovery request. diff --git a/source/common/config/grpc_mux_impl.h b/source/common/config/grpc_mux_impl.h index ff0d5db75e89e..2ef06369ec878 100644 --- a/source/common/config/grpc_mux_impl.h +++ b/source/common/config/grpc_mux_impl.h @@ -29,6 +29,11 @@ class GrpcMuxImpl : public GrpcMux, Logger::Loggable { Watch* addOrUpdateWatch(const std::string& type_url, Watch* watch, const std::set& resources, SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout) override; + + Watch* addToWatch(const std::string& type_url, Watch* watch, + const std::set& resources, SubscriptionCallbacks& callbacks, + std::chrono::milliseconds init_fetch_timeout) override; + void removeWatch(const std::string& type_url, Watch* watch) override; void pause(const std::string& type_url) override; @@ -67,8 +72,11 @@ class GrpcMuxImpl : public GrpcMux, Logger::Loggable { const LocalInfo::LocalInfo& local_info() const { return local_info_; } private: - Watch* addWatch(const std::string& type_url, const std::set& resources, - SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout); + Watch* addWatch(const std::string& type_url, SubscriptionCallbacks& callbacks, + std::chrono::milliseconds init_fetch_timeout); + + void addToWatch(const std::string& type_url, Watch* watch, + const std::set& resources); // Updates the list of resource names watched by the given watch. If an added name is new across // the whole subscription, or if a removed name has no other watch interested in it, then the @@ -188,6 +196,10 @@ class NullGrpcMuxImpl : public GrpcMux { SubscriptionCallbacks&, std::chrono::milliseconds) override { throw EnvoyException("ADS must be configured to support an ADS config source"); } + Watch* addToWatch(const std::string&, Watch*, const std::set&, + SubscriptionCallbacks&, std::chrono::milliseconds) override { + throw EnvoyException("ADS must be configured to support an ADS config source"); + } void removeWatch(const std::string&, Watch*) override { throw EnvoyException("ADS must be configured to support an ADS config source"); } diff --git a/source/common/config/grpc_subscription_impl.cc b/source/common/config/grpc_subscription_impl.cc index 77f003b061b57..730b6264977e9 100644 --- a/source/common/config/grpc_subscription_impl.cc +++ b/source/common/config/grpc_subscription_impl.cc @@ -39,6 +39,11 @@ void GrpcSubscriptionImpl::updateResourceInterest( stats_.update_attempt_.inc(); } +void GrpcSubscriptionImpl::addToResourceInterest(const std::set& add_these_names) { + watch_ = grpc_mux_->addToWatch(type_url_, watch_, add_these_names, *this, init_fetch_timeout_); + stats_.update_attempt_.inc(); +} + // Config::SubscriptionCallbacks void GrpcSubscriptionImpl::onConfigUpdate( const Protobuf::RepeatedPtrField& resources, diff --git a/source/common/config/grpc_subscription_impl.h b/source/common/config/grpc_subscription_impl.h index 46968aa3d71b1..e2889e12c0a60 100644 --- a/source/common/config/grpc_subscription_impl.h +++ b/source/common/config/grpc_subscription_impl.h @@ -41,6 +41,7 @@ class GrpcSubscriptionImpl : public Subscription, public SubscriptionCallbacks { // Config::Subscription void start(const std::set& resource_names) override; void updateResourceInterest(const std::set& update_to_these_names) override; + void addToResourceInterest(const std::set& add_these_names) override; // Config::SubscriptionCallbacks (all pass through to callbacks_!) void onConfigUpdate(const Protobuf::RepeatedPtrField& resources, diff --git a/source/common/config/watch_map.cc b/source/common/config/watch_map.cc index adc99f145f559..6b85d89b7ce4a 100644 --- a/source/common/config/watch_map.cc +++ b/source/common/config/watch_map.cc @@ -40,6 +40,17 @@ AddedRemoved WatchMap::updateWatchInterest(Watch* watch, findRemovals(newly_removed_from_watch, watch)); } +AddedRemoved WatchMap::addToWatchInterest(Watch* watch, + const std::set& add_these_names) { + std::vector newly_added_to_watch(add_these_names.begin(), add_these_names.end()); + std::vector newly_removed_from_watch; + watch->resource_names_.insert(add_these_names.begin(), add_these_names.end()); + std::set additions = add_these_names; + + return AddedRemoved(findAdditions(newly_added_to_watch, watch), + findRemovals(newly_removed_from_watch, watch)); +} + absl::flat_hash_set WatchMap::watchesInterestedIn(const std::string& resource_name) { absl::flat_hash_set ret = wildcard_watches_; const auto watches_interested = watch_interest_.find(resource_name); diff --git a/source/common/config/watch_map.h b/source/common/config/watch_map.h index e98c8af171221..8f8e7644a7786 100644 --- a/source/common/config/watch_map.h +++ b/source/common/config/watch_map.h @@ -70,6 +70,8 @@ class WatchMap : public SubscriptionCallbacks, public Logger::Loggable& update_to_these_names); + // Adds the extra set of resource names that the given watch should watch. + AddedRemoved addToWatchInterest(Watch* watch, const std::set& add_these_names); // Expects that the watch to be removed has already had all of its resource names removed via // updateWatchInterest(). diff --git a/source/common/upstream/cds_api_impl.h b/source/common/upstream/cds_api_impl.h index b17d4bbc9989d..03161eaf6fdb4 100644 --- a/source/common/upstream/cds_api_impl.h +++ b/source/common/upstream/cds_api_impl.h @@ -33,6 +33,14 @@ class CdsApiImpl : public CdsApi, } const std::string versionInfo() const override { return system_version_info_; } + void updateClusterInterest(const std::set& update_to_these_names) override { + subscription_->updateResourceInterest(update_to_these_names); + } + + void addToClusterInterest(const std::set& add_these_names) override { + subscription_->addToResourceInterest(add_these_names); + } + private: // Config::SubscriptionCallbacks void onConfigUpdate(const Protobuf::RepeatedPtrField& resources, diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 4740267a29d39..595ce0286a47a 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -238,6 +238,14 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable& update_to_these_names) override { + cds_api_->updateClusterInterest(update_to_these_names); + } + + void addToClusterInterest(const std::set& add_these_names) override { + cds_api_->addToClusterInterest(add_these_names); + } + protected: virtual void postThreadLocalDrainConnections(const Cluster& cluster, const HostVector& hosts_removed); diff --git a/test/common/config/delta_subscription_impl_test.cc b/test/common/config/delta_subscription_impl_test.cc index 65777ec1217f0..6cf12ac30dea1 100644 --- a/test/common/config/delta_subscription_impl_test.cc +++ b/test/common/config/delta_subscription_impl_test.cc @@ -50,6 +50,12 @@ TEST_F(DeltaSubscriptionImplTest, PauseHoldsRequest) { subscription_->resume(); } +TEST_F(DeltaSubscriptionImplTest, AddResourceCauseRequest) { + startSubscription({}); + expectSendMessage({"name1", "name2"}, {}, Grpc::Status::WellKnownGrpcStatus::Ok, "", {}); + subscription_->addToResourceInterest({"name1", "name2"}); +} + TEST_F(DeltaSubscriptionImplTest, ResponseCausesAck) { startSubscription({"name1"}); deliverConfigUpdate({"name1"}, "someversion", true); diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index ee4bae564f761..0af6d52116cd9 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -72,6 +72,10 @@ class MockGrpcMux : public GrpcMux { Watch*(const std::string& type_url, Watch* watch, const std::set& resources, SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout)); + MOCK_METHOD5(addToWatch, + Watch*(const std::string& type_url, Watch* watch, + const std::set& resources, SubscriptionCallbacks& callbacks, + std::chrono::milliseconds init_fetch_timeout)); MOCK_METHOD2(removeWatch, void(const std::string& type_url, Watch* watch)); MOCK_METHOD1(pause, void(const std::string& type_url)); MOCK_METHOD1(resume, void(const std::string& type_url)); diff --git a/test/mocks/upstream/mocks.h b/test/mocks/upstream/mocks.h index 4b7151d295d8e..981d8ec1825e7 100644 --- a/test/mocks/upstream/mocks.h +++ b/test/mocks/upstream/mocks.h @@ -328,6 +328,8 @@ class MockClusterManager : public ClusterManager { MOCK_METHOD1(addThreadLocalClusterUpdateCallbacks_, ClusterUpdateCallbacksHandle*(ClusterUpdateCallbacks& callbacks)); MOCK_METHOD0(subscriptionFactory, Config::SubscriptionFactory&()); + MOCK_METHOD1(updateClusterInterest, void(const std::set&)); + MOCK_METHOD1(addToClusterInterest, void(const std::set&)); NiceMock conn_pool_; NiceMock async_client_; @@ -382,6 +384,8 @@ class MockCdsApi : public CdsApi { MOCK_METHOD0(initialize, void()); MOCK_METHOD1(setInitializedCb, void(std::function callback)); MOCK_CONST_METHOD0(versionInfo, const std::string()); + MOCK_METHOD1(updateClusterInterest, void(const std::set&)); + MOCK_METHOD1(addToClusterInterest, void(const std::set&)); std::function initialized_callback_; };