diff --git a/docs/root/configuration/upstream/cluster_manager/cds.rst b/docs/root/configuration/upstream/cluster_manager/cds.rst index dcea74d79710c..6b22f58d630a3 100644 --- a/docs/root/configuration/upstream/cluster_manager/cds.rst +++ b/docs/root/configuration/upstream/cluster_manager/cds.rst @@ -18,3 +18,40 @@ Statistics ---------- CDS has a :ref:`statistics ` tree rooted at *cluster_manager.cds.* + +On-demand CDS +------------- + +Similar to VHDS on-demand feature in terms of hosts, the on-demand CDS API is an additional API +that Envoy will call to dynamically fetch upstream clusters which Envoy interested in spontaneously. + +By default in CDS, all cluster configurations are sent to every Envoy instance in the mesh. The +delta CDS provides the ability that the xDS management server can send incremental CDS to the Envoy +instance, but Envoy instance can not feedback upstream clusters it interested in spontaneously, in +other words, Envoy instance only can receive the CDS passively. + +In order to fix this issue, on-demand CDS uses the delta xDS protocol to allow a cluster configuration +to be subscribed to and the necessary cluster configuration to be requested as needed. Instead +of sending all cluster configuration or cluster configuration the Envoy instance aren't interested +in, using on-demand CDS will allow an Envoy instance to subscribe and unsubscribe from a list of +cluster configurations stored internally in the xDS management server. The xDS management server +will monitor the list and use it to filter the configuration sent to an individual Envoy instance +to only contain the subscribed cluster configurations. + +Subscribing to resources +^^^^^^^^^^^^^^^^^^^^^^^^ + +On-demand CDS allows resources to be :ref:`subscribed ` to using +a :ref:`DeltaDiscoveryRequest ` +with the :ref:`type_url ` set to +`type.googleapis.com/envoy.api.v2.Cluster` and +:ref:`resource_names_subscribe ` +set to a list of cluster resource names for which it would like configuration. + +Typical use case +^^^^^^^^^^^^^^^^ + +Sometimes, there will be a large amount of broker instances provided for +`Apache RocketMQ `_ to produce/consume messages. Perhaps the size of +SToW CDS configurations will be more than 1GB, so it is not practical to deliver the SToW CDS from the +management server every time, which will cause huge overhead. In this case, on-demand CDS is essential. \ No newline at end of file diff --git a/include/envoy/config/grpc_mux.h b/include/envoy/config/grpc_mux.h index 8d65e28d58faf..a4fb28bfb7e33 100644 --- a/include/envoy/config/grpc_mux.h +++ b/include/envoy/config/grpc_mux.h @@ -125,6 +125,15 @@ class GrpcMux { const std::set& resources, SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout) PURE; + + /** + * Adds additional resources to the watch. Unlike addOrUpdateWatch, it never removes resources. + */ + virtual Watch* addToWatch(const std::string& type_url, Watch* watch, + const std::set& resources, + SubscriptionCallbacks& callbacks, + std::chrono::milliseconds init_fetch_timeout) PURE; + virtual void removeWatch(const std::string& type_url, Watch* watch) PURE; /** diff --git a/include/envoy/config/subscription.h b/include/envoy/config/subscription.h index 4dabe6eb003fb..4efe03c70425c 100644 --- a/include/envoy/config/subscription.h +++ b/include/envoy/config/subscription.h @@ -90,6 +90,12 @@ class Subscription { * be passed to std::set_difference, which must be given sorted collections. */ virtual void updateResourceInterest(const std::set& update_to_these_names) PURE; + + /** + * Add the resources to fetch. + * @param resources vector of resource names to fetch. + */ + virtual void addToResourceInterest(const std::set&){}; }; using SubscriptionPtr = std::unique_ptr; diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index 047bf2aafd481..94ec77d81ec08 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -227,6 +227,8 @@ class ClusterManager { * @return Config::SubscriptionFactory& the subscription factory. */ virtual Config::SubscriptionFactory& subscriptionFactory() PURE; + + virtual void addToClusterInterest(const std::set& add_these_names) PURE; }; using ClusterManagerPtr = std::unique_ptr; @@ -253,6 +255,11 @@ class CdsApi { * @return std::string last accepted version from fetch. */ virtual const std::string versionInfo() const PURE; + + /** + * Add watch set of cluster resources interested. + */ + virtual void addToClusterInterest(const std::set& add_these_names) PURE; }; using CdsApiPtr = std::unique_ptr; diff --git a/source/common/config/delta_subscription_impl.cc b/source/common/config/delta_subscription_impl.cc index b788b08887dc8..5320f7a71ba0a 100644 --- a/source/common/config/delta_subscription_impl.cc +++ b/source/common/config/delta_subscription_impl.cc @@ -41,6 +41,11 @@ void DeltaSubscriptionImpl::updateResourceInterest( stats_.update_attempt_.inc(); } +void DeltaSubscriptionImpl::addToResourceInterest(const std::set& add_these_names) { + watch_ = context_->addToWatch(type_url_, watch_, add_these_names, *this, init_fetch_timeout_); + stats_.update_attempt_.inc(); +} + // Config::SubscriptionCallbacks void DeltaSubscriptionImpl::onConfigUpdate( const Protobuf::RepeatedPtrField& resources, diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 1b30df7b5ad00..f4549735d5029 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -45,6 +45,7 @@ class DeltaSubscriptionImpl : public Subscription, public SubscriptionCallbacks void start(const std::set& resource_names) override; void updateResourceInterest(const std::set& update_to_these_names) override; + void addToResourceInterest(const std::set& add_these_names) override; // Config::SubscriptionCallbacks (all pass through to callbacks_!) void onConfigUpdate(const Protobuf::RepeatedPtrField& resources, diff --git a/source/common/config/grpc_mux_impl.h b/source/common/config/grpc_mux_impl.h index 0781eb0cf9a13..6932db8f73916 100644 --- a/source/common/config/grpc_mux_impl.h +++ b/source/common/config/grpc_mux_impl.h @@ -50,6 +50,11 @@ class GrpcMuxImpl : public GrpcMux, SubscriptionCallbacks&, std::chrono::milliseconds) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } + + Watch* addToWatch(const std::string&, Watch*, const std::set&, + SubscriptionCallbacks&, std::chrono::milliseconds) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } void removeWatch(const std::string&, Watch*) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } void handleDiscoveryResponse( @@ -156,6 +161,10 @@ class NullGrpcMuxImpl : public GrpcMux, SubscriptionCallbacks&, std::chrono::milliseconds) override { throw EnvoyException("ADS must be configured to support an ADS config source"); } + Watch* addToWatch(const std::string&, Watch*, const std::set&, + SubscriptionCallbacks&, std::chrono::milliseconds) override { + throw EnvoyException("ADS must be configured to support an ADS config source"); + } void removeWatch(const std::string&, Watch*) override { throw EnvoyException("ADS must be configured to support an ADS config source"); } diff --git a/source/common/config/new_grpc_mux_impl.cc b/source/common/config/new_grpc_mux_impl.cc index 76989a45a19df..a22001874cf04 100644 --- a/source/common/config/new_grpc_mux_impl.cc +++ b/source/common/config/new_grpc_mux_impl.cc @@ -30,11 +30,24 @@ Watch* NewGrpcMuxImpl::addOrUpdateWatch(const std::string& type_url, Watch* watc SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout) { if (watch == nullptr) { - return addWatch(type_url, resources, callbacks, init_fetch_timeout); - } else { - updateWatch(type_url, watch, resources); - return watch; + watch = addWatch(type_url, callbacks, init_fetch_timeout); } + // updateWatch() queues a discovery request if any of 'resources' are not yet subscribed. + updateWatch(type_url, watch, resources); + return watch; +} + +Watch* NewGrpcMuxImpl::addToWatch(const std::string& type_url, Watch* watch, + const std::set& resources, + SubscriptionCallbacks& callbacks, + std::chrono::milliseconds init_fetch_timeout) { + if (watch == nullptr) { + watch = addWatch(type_url, callbacks, init_fetch_timeout); + } + // addToWatch() queues a discovery request for any of *extra* 'resources' we are not yet + // subscribed. + addToWatch(type_url, watch, resources); + return watch; } void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) { @@ -124,22 +137,32 @@ GrpcMuxWatchPtr NewGrpcMuxImpl::subscribe(const std::string&, const std::set& resources, - SubscriptionCallbacks& callbacks, +Watch* NewGrpcMuxImpl::addWatch(const std::string& type_url, SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout) { auto entry = subscriptions_.find(type_url); if (entry == subscriptions_.end()) { // We don't yet have a subscription for type_url! Make one! addSubscription(type_url, init_fetch_timeout); - return addWatch(type_url, resources, callbacks, init_fetch_timeout); + return addWatch(type_url, callbacks, init_fetch_timeout); } - Watch* watch = entry->second->watch_map_.addWatch(callbacks); - // updateWatch() queues a discovery request if any of 'resources' are not yet subscribed. - updateWatch(type_url, watch, resources); return watch; } +void NewGrpcMuxImpl::addToWatch(const std::string& type_url, Watch* watch, + const std::set& resources) { + ASSERT(watch != nullptr); + auto sub = subscriptions_.find(type_url); + RELEASE_ASSERT(sub != subscriptions_.end(), + fmt::format("Watch of {} has no subscription to update.", type_url)); + auto added_removed = sub->second->watch_map_.addToWatchInterest(watch, resources); + sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_, added_removed.removed_); + // Tell the server about our change in interest, if any. + if (sub->second->sub_state_.subscriptionUpdatePending()) { + trySendDiscoveryRequests(); + } +} + // Updates the list of resource names watched by the given watch. If an added name is new across // the whole subscription, or if a removed name has no other watch interested in it, then the // subscription will enqueue and attempt to send an appropriate discovery request. diff --git a/source/common/config/new_grpc_mux_impl.h b/source/common/config/new_grpc_mux_impl.h index b600c2e1e2f01..05705b8773cd8 100644 --- a/source/common/config/new_grpc_mux_impl.h +++ b/source/common/config/new_grpc_mux_impl.h @@ -37,6 +37,11 @@ class NewGrpcMuxImpl Watch* addOrUpdateWatch(const std::string& type_url, Watch* watch, const std::set& resources, SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout) override; + + Watch* addToWatch(const std::string& type_url, Watch* watch, + const std::set& resources, SubscriptionCallbacks& callbacks, + std::chrono::milliseconds init_fetch_timeout) override; + void removeWatch(const std::string& type_url, Watch* watch) override; // TODO(fredlas) PR #8478 will remove this. @@ -81,8 +86,11 @@ class NewGrpcMuxImpl } private: - Watch* addWatch(const std::string& type_url, const std::set& resources, - SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout); + Watch* addWatch(const std::string& type_url, SubscriptionCallbacks& callbacks, + std::chrono::milliseconds init_fetch_timeout); + + void addToWatch(const std::string& type_url, Watch* watch, + const std::set& resources); // Updates the list of resource names watched by the given watch. If an added name is new across // the whole subscription, or if a removed name has no other watch interested in it, then the diff --git a/source/common/config/watch_map.cc b/source/common/config/watch_map.cc index 257f42e4ccae2..bd02799b6ee91 100644 --- a/source/common/config/watch_map.cc +++ b/source/common/config/watch_map.cc @@ -42,6 +42,15 @@ AddedRemoved WatchMap::updateWatchInterest(Watch* watch, findRemovals(newly_removed_from_watch, watch)); } +AddedRemoved WatchMap::addToWatchInterest(Watch* watch, + const std::set& add_these_names) { + std::vector newly_added_to_watch(add_these_names.begin(), add_these_names.end()); + watch->resource_names_.insert(add_these_names.begin(), add_these_names.end()); + std::set removals; + + return AddedRemoved(findAdditions(newly_added_to_watch, watch), std::move(removals)); +} + absl::flat_hash_set WatchMap::watchesInterestedIn(const std::string& resource_name) { absl::flat_hash_set ret = wildcard_watches_; const auto watches_interested = watch_interest_.find(resource_name); diff --git a/source/common/config/watch_map.h b/source/common/config/watch_map.h index 36bcf23f88ea1..ae9d4836257fe 100644 --- a/source/common/config/watch_map.h +++ b/source/common/config/watch_map.h @@ -73,6 +73,9 @@ class WatchMap : public SubscriptionCallbacks, public Logger::Loggable& update_to_these_names); + // Adds the extra set of resource names that the given watch should watch. + AddedRemoved addToWatchInterest(Watch* watch, const std::set& add_these_names); + // Expects that the watch to be removed has already had all of its resource names removed via // updateWatchInterest(). void removeWatch(Watch* watch); diff --git a/source/common/upstream/cds_api_impl.h b/source/common/upstream/cds_api_impl.h index 6653fffeb1a4b..eab0bb818679c 100644 --- a/source/common/upstream/cds_api_impl.h +++ b/source/common/upstream/cds_api_impl.h @@ -35,6 +35,10 @@ class CdsApiImpl : public CdsApi, } const std::string versionInfo() const override { return system_version_info_; } + void addToClusterInterest(const std::set& add_these_names) override { + subscription_->addToResourceInterest(add_these_names); + } + private: // Config::SubscriptionCallbacks void onConfigUpdate(const Protobuf::RepeatedPtrField& resources, diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index f2cddab327974..99a942eeac794 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -243,6 +243,10 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable& add_these_names) override { + cds_api_->addToClusterInterest(add_these_names); + } + protected: virtual void postThreadLocalDrainConnections(const Cluster& cluster, const HostVector& hosts_removed); diff --git a/test/common/config/delta_subscription_impl_test.cc b/test/common/config/delta_subscription_impl_test.cc index 3da5dcc593ace..34bf5a5f0025d 100644 --- a/test/common/config/delta_subscription_impl_test.cc +++ b/test/common/config/delta_subscription_impl_test.cc @@ -56,6 +56,12 @@ TEST_F(DeltaSubscriptionImplTest, PauseHoldsRequest) { subscription_->resume(); } +TEST_F(DeltaSubscriptionImplTest, AddResourceCauseRequest) { + startSubscription({}); + expectSendMessage({"name1", "name2"}, {}, Grpc::Status::WellKnownGrpcStatus::Ok, "", {}); + subscription_->addToResourceInterest({"name1", "name2"}); +} + TEST_F(DeltaSubscriptionImplTest, ResponseCausesAck) { startSubscription({"name1"}); deliverConfigUpdate({"name1"}, "someversion", true); diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index 882ebafd3d02a..46d4a341b8cc3 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -99,6 +99,9 @@ class MockGrpcMux : public GrpcMux { MOCK_METHOD(Watch*, addOrUpdateWatch, (const std::string& type_url, Watch* watch, const std::set& resources, SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout)); + MOCK_METHOD(Watch*, addToWatch, + (const std::string& type_url, Watch* watch, const std::set& resources, + SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout)); MOCK_METHOD(void, removeWatch, (const std::string& type_url, Watch* watch)); }; diff --git a/test/mocks/upstream/mocks.h b/test/mocks/upstream/mocks.h index 9a8ca01b00f8e..704d60a74b71b 100644 --- a/test/mocks/upstream/mocks.h +++ b/test/mocks/upstream/mocks.h @@ -330,6 +330,7 @@ class MockClusterManager : public ClusterManager { MOCK_METHOD(ClusterUpdateCallbacksHandle*, addThreadLocalClusterUpdateCallbacks_, (ClusterUpdateCallbacks & callbacks)); MOCK_METHOD(Config::SubscriptionFactory&, subscriptionFactory, ()); + MOCK_METHOD(void, addToClusterInterest, (const std::set&)); NiceMock conn_pool_; NiceMock async_client_; @@ -385,6 +386,7 @@ class MockCdsApi : public CdsApi { MOCK_METHOD(void, initialize, ()); MOCK_METHOD(void, setInitializedCb, (std::function callback)); MOCK_METHOD(const std::string, versionInfo, (), (const)); + MOCK_METHOD(void, addToClusterInterest, (const std::set&)); std::function initialized_callback_; };