diff --git a/include/envoy/config/grpc_mux.h b/include/envoy/config/grpc_mux.h index 0f20aae3cfc5a..6c268d1076b0f 100644 --- a/include/envoy/config/grpc_mux.h +++ b/include/envoy/config/grpc_mux.h @@ -93,13 +93,20 @@ class GrpcMux { * @param callbacks the callbacks to be notified of configuration updates. These must be valid * until GrpcMuxWatch is destroyed. * @param resource_decoder how incoming opaque resource objects are to be decoded. + * @param use_namespace_matching if namespace watch should be created. This is used for creating + * watches on collections of resources; individual members of a collection are identified by the + * namespace in resource name. * @return GrpcMuxWatchPtr a handle to cancel the subscription with. E.g. when a cluster goes * away, its EDS updates should be cancelled by destroying the GrpcMuxWatchPtr. */ virtual GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set& resources, SubscriptionCallbacks& callbacks, - OpaqueResourceDecoder& resource_decoder) PURE; + OpaqueResourceDecoder& resource_decoder, + const bool use_namespace_matching) PURE; + + virtual void requestOnDemandUpdate(const std::string& type_url, + const std::set& for_update) PURE; }; using GrpcMuxPtr = std::unique_ptr; diff --git a/include/envoy/config/subscription.h b/include/envoy/config/subscription.h index c05a6d567d700..0506ed46dcb7f 100644 --- a/include/envoy/config/subscription.h +++ b/include/envoy/config/subscription.h @@ -146,7 +146,10 @@ class UntypedConfigUpdateCallbacks { * @param removed_resources names of resources that this fetch instructed to be removed. * @param system_version_info aggregate response data "version", for debugging. * @throw EnvoyException with reason if the config changes are rejected. Otherwise the changes - * are accepted. Accepted changes have their version_info reflected in subsequent requests. + * @param use_namespace_matching if the resources should me matched on their namespaces, rather + * than unique names. This is used when a collection of resources (e.g. virtual hosts in VHDS) is + * being updated. Accepted changes have their version_info reflected in subsequent + * requests. */ virtual void onConfigUpdate( const Protobuf::RepeatedPtrField& added_resources, @@ -174,8 +177,11 @@ class Subscription { * Start a configuration subscription asynchronously. This should be called once and will continue * to fetch throughout the lifetime of the Subscription object. * @param resources set of resource names to fetch. + * @param use_namespace_matching if the subscription is for a collection of resources. In such a + * case a namespace watch will be created. */ - virtual void start(const std::set& resource_names) PURE; + virtual void start(const std::set& resource_names, + const bool use_namespace_matching = false) PURE; /** * Update the resources to fetch. @@ -183,6 +189,12 @@ class Subscription { * be passed to std::set_difference, which must be given sorted collections. */ virtual void updateResourceInterest(const std::set& update_to_these_names) PURE; + + /** + * Creates a discovery request for resources. + * @param add_these_names resource ids for inclusion in the discovery request. + */ + virtual void requestOnDemandUpdate(const std::set& add_these_names) PURE; }; using SubscriptionPtr = std::unique_ptr; diff --git a/source/common/config/filesystem_subscription_impl.cc b/source/common/config/filesystem_subscription_impl.cc index 1373dc34c92e4..79d505d763611 100644 --- a/source/common/config/filesystem_subscription_impl.cc +++ b/source/common/config/filesystem_subscription_impl.cc @@ -27,7 +27,7 @@ FilesystemSubscriptionImpl::FilesystemSubscriptionImpl( } // Config::Subscription -void FilesystemSubscriptionImpl::start(const std::set&) { +void FilesystemSubscriptionImpl::start(const std::set&, const bool) { started_ = true; // Attempt to read in case there is a file there already. refresh(); diff --git a/source/common/config/filesystem_subscription_impl.h b/source/common/config/filesystem_subscription_impl.h index 75dd5f25b1e47..eb23ffdc93306 100644 --- a/source/common/config/filesystem_subscription_impl.h +++ b/source/common/config/filesystem_subscription_impl.h @@ -27,8 +27,11 @@ class FilesystemSubscriptionImpl : public Config::Subscription, // Config::Subscription // We report all discovered resources in the watched file, so the resource names arguments are // unused, and updateResourceInterest is a no-op (other than updating a stat). - void start(const std::set&) override; + void start(const std::set&, const bool use_namespace_matching = false) override; void updateResourceInterest(const std::set&) override; + void requestOnDemandUpdate(const std::set&) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } private: void refresh(); diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index 907bf9148adf2..2be7cd4fddd72 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -62,7 +62,7 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) { GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url, const std::set& resources, SubscriptionCallbacks& callbacks, - OpaqueResourceDecoder& resource_decoder) { + OpaqueResourceDecoder& resource_decoder, const bool) { auto watch = std::make_unique(resources, callbacks, resource_decoder, type_url, *this); ENVOY_LOG(debug, "gRPC mux addWatch for " + type_url); diff --git a/source/common/config/grpc_mux_impl.h b/source/common/config/grpc_mux_impl.h index 232559ad21674..06acf1a78ef70 100644 --- a/source/common/config/grpc_mux_impl.h +++ b/source/common/config/grpc_mux_impl.h @@ -47,7 +47,12 @@ class GrpcMuxImpl : public GrpcMux, GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set& resources, SubscriptionCallbacks& callbacks, - OpaqueResourceDecoder& resource_decoder) override; + OpaqueResourceDecoder& resource_decoder, + const bool use_namespace_matching = false) override; + + void requestOnDemandUpdate(const std::string&, const std::set&) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } void handleDiscoveryResponse( std::unique_ptr&& message); @@ -159,10 +164,14 @@ class NullGrpcMuxImpl : public GrpcMux, } GrpcMuxWatchPtr addWatch(const std::string&, const std::set&, SubscriptionCallbacks&, - OpaqueResourceDecoder&) override { + OpaqueResourceDecoder&, const bool) override { ExceptionUtil::throwEnvoyException("ADS must be configured to support an ADS config source"); } + void requestOnDemandUpdate(const std::string&, const std::set&) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } + void onWriteable() override {} void onStreamEstablished() override {} void onEstablishmentFailure() override {} diff --git a/source/common/config/grpc_subscription_impl.cc b/source/common/config/grpc_subscription_impl.cc index ef8037f250064..83c21e2208dc4 100644 --- a/source/common/config/grpc_subscription_impl.cc +++ b/source/common/config/grpc_subscription_impl.cc @@ -19,7 +19,8 @@ GrpcSubscriptionImpl::GrpcSubscriptionImpl( init_fetch_timeout_(init_fetch_timeout), is_aggregated_(is_aggregated) {} // Config::Subscription -void GrpcSubscriptionImpl::start(const std::set& resources) { +void GrpcSubscriptionImpl::start(const std::set& resources, + const bool use_namespace_matching) { if (init_fetch_timeout_.count() > 0) { init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void { callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, @@ -28,7 +29,8 @@ void GrpcSubscriptionImpl::start(const std::set& resources) { init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_); } - watch_ = grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_); + watch_ = + grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_, use_namespace_matching); // The attempt stat here is maintained for the purposes of having consistency between ADS and // gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an @@ -48,6 +50,11 @@ void GrpcSubscriptionImpl::updateResourceInterest( stats_.update_attempt_.inc(); } +void GrpcSubscriptionImpl::requestOnDemandUpdate(const std::set& for_update) { + grpc_mux_->requestOnDemandUpdate(type_url_, for_update); + stats_.update_attempt_.inc(); +} + // Config::SubscriptionCallbacks void GrpcSubscriptionImpl::onConfigUpdate(const std::vector& resources, const std::string& version_info) { diff --git a/source/common/config/grpc_subscription_impl.h b/source/common/config/grpc_subscription_impl.h index a5102055a08ce..a7fa247eeebf6 100644 --- a/source/common/config/grpc_subscription_impl.h +++ b/source/common/config/grpc_subscription_impl.h @@ -24,9 +24,10 @@ class GrpcSubscriptionImpl : public Subscription, std::chrono::milliseconds init_fetch_timeout, bool is_aggregated); // Config::Subscription - void start(const std::set& resource_names) override; + void start(const std::set& resource_names, + const bool use_namespace_matching = false) override; void updateResourceInterest(const std::set& update_to_these_names) override; - + void requestOnDemandUpdate(const std::set& add_these_names) override; // Config::SubscriptionCallbacks (all pass through to callbacks_!) void onConfigUpdate(const std::vector& resources, const std::string& version_info) override; diff --git a/source/common/config/http_subscription_impl.cc b/source/common/config/http_subscription_impl.cc index 8c0d55d5e7494..9c4616766fede 100644 --- a/source/common/config/http_subscription_impl.cc +++ b/source/common/config/http_subscription_impl.cc @@ -43,7 +43,7 @@ HttpSubscriptionImpl::HttpSubscriptionImpl( } // Config::Subscription -void HttpSubscriptionImpl::start(const std::set& resource_names) { +void HttpSubscriptionImpl::start(const std::set& resource_names, const bool) { if (init_fetch_timeout_.count() > 0) { init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void { handleFailure(Config::ConfigUpdateFailureReason::FetchTimedout, nullptr); diff --git a/source/common/config/http_subscription_impl.h b/source/common/config/http_subscription_impl.h index ec3d2e6ad0de3..73bdae5094935 100644 --- a/source/common/config/http_subscription_impl.h +++ b/source/common/config/http_subscription_impl.h @@ -34,8 +34,12 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, ProtobufMessage::ValidationVisitor& validation_visitor); // Config::Subscription - void start(const std::set& resource_names) override; + void start(const std::set& resource_names, + const bool use_namespace_matching = false) override; void updateResourceInterest(const std::set& update_to_these_names) override; + void requestOnDemandUpdate(const std::set&) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } // Http::RestApiFetcher void createRequest(Http::RequestMessage& request) override; diff --git a/source/common/config/new_grpc_mux_impl.cc b/source/common/config/new_grpc_mux_impl.cc index 131ccd24db51b..4b72f94fb8f11 100644 --- a/source/common/config/new_grpc_mux_impl.cc +++ b/source/common/config/new_grpc_mux_impl.cc @@ -58,16 +58,6 @@ void NewGrpcMuxImpl::onDiscoveryResponse( return; } - // When an on-demand request is made a Watch is created using an alias, as the resource name isn't - // known at that point. When an update containing aliases comes back, we update Watches with - // resource names. - for (const auto& r : message->resources()) { - if (r.aliases_size() > 0) { - AddedRemoved converted = sub->second->watch_map_.convertAliasWatchesToNameWatches(r); - sub->second->sub_state_.updateSubscriptionInterest(converted.added_, converted.removed_); - } - } - kickOffAck(sub->second->sub_state_.handleResponse(*message)); Memory::Utils::tryShrinkHeap(); } @@ -112,17 +102,18 @@ void NewGrpcMuxImpl::start() { grpc_stream_.establishNewStream(); } GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url, const std::set& resources, SubscriptionCallbacks& callbacks, - OpaqueResourceDecoder& resource_decoder) { + OpaqueResourceDecoder& resource_decoder, + const bool use_namespace_matching) { auto entry = subscriptions_.find(type_url); if (entry == subscriptions_.end()) { // We don't yet have a subscription for type_url! Make one! - addSubscription(type_url); - return addWatch(type_url, resources, callbacks, resource_decoder); + addSubscription(type_url, use_namespace_matching); + return addWatch(type_url, resources, callbacks, resource_decoder, use_namespace_matching); } Watch* watch = entry->second->watch_map_.addWatch(callbacks, resource_decoder); // updateWatch() queues a discovery request if any of 'resources' are not yet subscribed. - updateWatch(type_url, watch, resources); + updateWatch(type_url, watch, resources, use_namespace_matching); return std::make_unique(type_url, watch, *this); } @@ -130,13 +121,32 @@ GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url, // the whole subscription, or if a removed name has no other watch interested in it, then the // subscription will enqueue and attempt to send an appropriate discovery request. void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch, - const std::set& resources) { + const std::set& resources, + bool creating_namespace_watch) { ASSERT(watch != nullptr); auto sub = subscriptions_.find(type_url); RELEASE_ASSERT(sub != subscriptions_.end(), fmt::format("Watch of {} has no subscription to update.", type_url)); auto added_removed = sub->second->watch_map_.updateWatchInterest(watch, resources); - sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_, added_removed.removed_); + if (creating_namespace_watch) { + // This is to prevent sending out of requests that contain prefixes instead of resource names + sub->second->sub_state_.updateSubscriptionInterest({}, {}); + } else { + sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_, + added_removed.removed_); + } + // Tell the server about our change in interest, if any. + if (sub->second->sub_state_.subscriptionUpdatePending()) { + trySendDiscoveryRequests(); + } +} + +void NewGrpcMuxImpl::requestOnDemandUpdate(const std::string& type_url, + const std::set& for_update) { + auto sub = subscriptions_.find(type_url); + RELEASE_ASSERT(sub != subscriptions_.end(), + fmt::format("Watch of {} has no subscription to update.", type_url)); + sub->second->sub_state_.updateSubscriptionInterest(for_update, {}); // Tell the server about our change in interest, if any. if (sub->second->sub_state_.subscriptionUpdatePending()) { trySendDiscoveryRequests(); @@ -151,8 +161,10 @@ void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) { entry->second->watch_map_.removeWatch(watch); } -void NewGrpcMuxImpl::addSubscription(const std::string& type_url) { - subscriptions_.emplace(type_url, std::make_unique(type_url, local_info_)); +void NewGrpcMuxImpl::addSubscription(const std::string& type_url, + const bool use_namespace_matching) { + subscriptions_.emplace( + type_url, std::make_unique(type_url, local_info_, use_namespace_matching)); subscription_ordering_.emplace_back(type_url); } diff --git a/source/common/config/new_grpc_mux_impl.h b/source/common/config/new_grpc_mux_impl.h index 431106a4dd399..4f549556558f8 100644 --- a/source/common/config/new_grpc_mux_impl.h +++ b/source/common/config/new_grpc_mux_impl.h @@ -39,7 +39,11 @@ class NewGrpcMuxImpl GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set& resources, SubscriptionCallbacks& callbacks, - OpaqueResourceDecoder& resource_decoder) override; + OpaqueResourceDecoder& resource_decoder, + const bool use_namespace_matching = false) override; + + void requestOnDemandUpdate(const std::string& type_url, + const std::set& for_update) override; ScopedResume pause(const std::string& type_url) override; ScopedResume pause(const std::vector type_urls) override; @@ -60,8 +64,9 @@ class NewGrpcMuxImpl void start() override; struct SubscriptionStuff { - SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info) - : sub_state_(type_url, watch_map_, local_info) {} + SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info, + const bool use_namespace_matching) + : watch_map_(use_namespace_matching), sub_state_(type_url, watch_map_, local_info) {} WatchMap watch_map_; DeltaSubscriptionState sub_state_; @@ -108,9 +113,10 @@ class NewGrpcMuxImpl // the whole subscription, or if a removed name has no other watch interested in it, then the // subscription will enqueue and attempt to send an appropriate discovery request. void updateWatch(const std::string& type_url, Watch* watch, - const std::set& resources); + const std::set& resources, + const bool creating_namespace_watch = false); - void addSubscription(const std::string& type_url); + void addSubscription(const std::string& type_url, const bool use_namespace_matching); void trySendDiscoveryRequests(); diff --git a/source/common/config/watch_map.cc b/source/common/config/watch_map.cc index 51e73e06344d9..a70fb2c44c084 100644 --- a/source/common/config/watch_map.cc +++ b/source/common/config/watch_map.cc @@ -8,6 +8,15 @@ namespace Envoy { namespace Config { +namespace { +// Returns the namespace part (if there's any) in the resource name. +std::string namespaceFromName(const std::string& resource_name) { + const auto pos = resource_name.find_last_of('/'); + // we are not interested in the "/" character in the namespace + return pos == std::string::npos ? "" : resource_name.substr(0, pos); +} +} // namespace + Watch* WatchMap::addWatch(SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder) { auto watch = std::make_unique(callbacks, resource_decoder); @@ -59,8 +68,14 @@ AddedRemoved WatchMap::updateWatchInterest(Watch* watch, } absl::flat_hash_set WatchMap::watchesInterestedIn(const std::string& resource_name) { - absl::flat_hash_set ret = wildcard_watches_; - const auto watches_interested = watch_interest_.find(resource_name); + absl::flat_hash_set ret; + if (!use_namespace_matching_) { + ret = wildcard_watches_; + } + + const auto prefix = namespaceFromName(resource_name); + const auto resource_key = use_namespace_matching_ && !prefix.empty() ? prefix : resource_name; + const auto watches_interested = watch_interest_.find(resource_key); if (watches_interested != watch_interest_.end()) { for (const auto& watch : watches_interested->second) { ret.insert(watch); @@ -120,32 +135,6 @@ void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField } } -// For responses to on-demand requests, replace the original watch for an alias -// with one for the resource's name -AddedRemoved WatchMap::convertAliasWatchesToNameWatches( - const envoy::service::discovery::v3::Resource& resource) { - absl::flat_hash_set watches_to_update; - for (const auto& alias : resource.aliases()) { - const auto interested_watches = watch_interest_.find(alias); - if (interested_watches != watch_interest_.end()) { - for (const auto& interested_watch : interested_watches->second) { - watches_to_update.insert(interested_watch); - } - } - } - - auto ret = AddedRemoved({}, {}); - for (const auto& watch : watches_to_update) { - const auto& converted_watches = updateWatchInterest(watch, {resource.name()}); - std::copy(converted_watches.added_.begin(), converted_watches.added_.end(), - std::inserter(ret.added_, ret.added_.end())); - std::copy(converted_watches.removed_.begin(), converted_watches.removed_.end(), - std::inserter(ret.removed_, ret.removed_.end())); - } - - return ret; -} - void WatchMap::onConfigUpdate( const Protobuf::RepeatedPtrField& added_resources, const Protobuf::RepeatedPtrField& removed_resources, diff --git a/source/common/config/watch_map.h b/source/common/config/watch_map.h index f1f7d09294ed7..d0d9e822b28eb 100644 --- a/source/common/config/watch_map.h +++ b/source/common/config/watch_map.h @@ -60,7 +60,7 @@ struct Watch { // A WatchMap is assumed to be dedicated to a single type_url type of resource (EDS, CDS, etc). class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable { public: - WatchMap() = default; + WatchMap(const bool use_namespace_matching) : use_namespace_matching_(use_namespace_matching) {} // Adds 'callbacks' to the WatchMap, with every possible resource being watched. // (Use updateWatchInterest() to narrow it down to some specific names). @@ -79,10 +79,6 @@ class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable& resources, const std::string& version_info) override; @@ -125,6 +121,8 @@ class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable the resource can be removed. // 2) Enables efficient lookup of all interested watches when a resource has been updated. absl::flat_hash_map> watch_interest_; + + const bool use_namespace_matching_; }; } // namespace Config diff --git a/source/common/router/vhds.cc b/source/common/router/vhds.cc index 31d5b9d27d251..6038b42684b9c 100644 --- a/source/common/router/vhds.cc +++ b/source/common/router/vhds.cc @@ -32,8 +32,9 @@ VhdsSubscription::VhdsSubscription( scope_(factory_context.scope().createScope(stat_prefix + "vhds." + config_update_info_->routeConfigName() + ".")), stats_({ALL_VHDS_STATS(POOL_COUNTER(*scope_))}), - init_target_(fmt::format("VhdsConfigSubscription {}", config_update_info_->routeConfigName()), - [this]() { subscription_->start({}); }), + init_target_( + fmt::format("VhdsConfigSubscription {}", config_update_info_->routeConfigName()), + [this]() { subscription_->start({config_update_info_->routeConfigName()}, true); }), route_config_providers_(route_config_providers) { const auto& config_source = config_update_info_->routeConfiguration() .vhds() @@ -51,7 +52,7 @@ VhdsSubscription::VhdsSubscription( } void VhdsSubscription::updateOnDemand(const std::string& with_route_config_name_prefix) { - subscription_->updateResourceInterest({with_route_config_name_prefix}); + subscription_->requestOnDemandUpdate({with_route_config_name_prefix}); } void VhdsSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason, diff --git a/test/common/config/new_grpc_mux_impl_test.cc b/test/common/config/new_grpc_mux_impl_test.cc index a35a38d577256..3357e04b8a992 100644 --- a/test/common/config/new_grpc_mux_impl_test.cc +++ b/test/common/config/new_grpc_mux_impl_test.cc @@ -118,7 +118,7 @@ TEST_F(NewGrpcMuxImplTest, ConfigUpdateWithAliases) { setup(); const std::string& type_url = Config::TypeUrl::get().VirtualHost; - auto watch = grpc_mux_->addWatch(type_url, {"domain1.test"}, callbacks_, resource_decoder_); + auto watch = grpc_mux_->addWatch(type_url, {"prefix"}, callbacks_, resource_decoder_, true); EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); grpc_mux_->start(); @@ -133,9 +133,9 @@ TEST_F(NewGrpcMuxImplTest, ConfigUpdateWithAliases) { vhost.add_domains("domain2.test"); response->add_resources()->mutable_resource()->PackFrom(vhost); - response->mutable_resources()->at(0).set_name("vhost_1"); - response->mutable_resources()->at(0).add_aliases("domain1.test"); - response->mutable_resources()->at(0).add_aliases("domain2.test"); + response->mutable_resources()->at(0).set_name("prefix/vhost_1"); + response->mutable_resources()->at(0).add_aliases("prefix/domain1.test"); + response->mutable_resources()->at(0).add_aliases("prefix/domain2.test"); grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); @@ -153,7 +153,7 @@ TEST_F(NewGrpcMuxImplTest, ConfigUpdateWithNotFoundResponse) { setup(); const std::string& type_url = Config::TypeUrl::get().VirtualHost; - auto watch = grpc_mux_->addWatch(type_url, {"domain1.test"}, callbacks_, resource_decoder_); + auto watch = grpc_mux_->addWatch(type_url, {"prefix"}, callbacks_, resource_decoder_, true); EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); grpc_mux_->start(); @@ -164,7 +164,7 @@ TEST_F(NewGrpcMuxImplTest, ConfigUpdateWithNotFoundResponse) { response->add_resources(); response->mutable_resources()->at(0).set_name("not-found"); - response->mutable_resources()->at(0).add_aliases("domain1.test"); + response->mutable_resources()->at(0).add_aliases("prefix/domain1.test"); grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); diff --git a/test/common/config/watch_map_test.cc b/test/common/config/watch_map_test.cc index 6749cb901a8ae..ff26dee1e0d39 100644 --- a/test/common/config/watch_map_test.cc +++ b/test/common/config/watch_map_test.cc @@ -24,24 +24,10 @@ namespace Envoy { namespace Config { namespace { -// expectDeltaAndSotwUpdate() EXPECTs two birds with one function call: we want to cover both SotW -// and delta, which, while mechanically different, can behave identically for our testing purposes. -// Specifically, as a simplification for these tests, every still-present resource is updated in -// every update. Therefore, a resource can never show up in the SotW update but not the delta -// update. We can therefore use the same expected_resources for both. -void expectDeltaAndSotwUpdate( +void expectDeltaUpdate( MockSubscriptionCallbacks& callbacks, const std::vector& expected_resources, const std::vector& expected_removals, const std::string& version) { - EXPECT_CALL(callbacks, onConfigUpdate(_, version)) - .WillOnce(Invoke([expected_resources](const std::vector& gotten_resources, - const std::string&) { - EXPECT_EQ(expected_resources.size(), gotten_resources.size()); - for (size_t i = 0; i < expected_resources.size(); i++) { - EXPECT_TRUE( - TestUtility::protoEqual(gotten_resources[i].get().resource(), expected_resources[i])); - } - })); EXPECT_CALL(callbacks, onConfigUpdate(_, _, _)) .WillOnce(Invoke([expected_resources, expected_removals, version](const std::vector& gotten_resources, @@ -60,6 +46,27 @@ void expectDeltaAndSotwUpdate( })); } +// expectDeltaAndSotwUpdate() EXPECTs two birds with one function call: we want to cover both SotW +// and delta, which, while mechanically different, can behave identically for our testing purposes. +// Specifically, as a simplification for these tests, every still-present resource is updated in +// every update. Therefore, a resource can never show up in the SotW update but not the delta +// update. We can therefore use the same expected_resources for both. +void expectDeltaAndSotwUpdate( + MockSubscriptionCallbacks& callbacks, + const std::vector& expected_resources, + const std::vector& expected_removals, const std::string& version) { + EXPECT_CALL(callbacks, onConfigUpdate(_, version)) + .WillOnce(Invoke([expected_resources](const std::vector& gotten_resources, + const std::string&) { + EXPECT_EQ(expected_resources.size(), gotten_resources.size()); + for (size_t i = 0; i < expected_resources.size(); i++) { + EXPECT_TRUE( + TestUtility::protoEqual(gotten_resources[i].get().resource(), expected_resources[i])); + } + })); + expectDeltaUpdate(callbacks, expected_resources, expected_removals, version); +} + void expectNoUpdate(MockSubscriptionCallbacks& callbacks, const std::string& version) { EXPECT_CALL(callbacks, onConfigUpdate(_, version)).Times(0); EXPECT_CALL(callbacks, onConfigUpdate(_, _, version)).Times(0); @@ -88,13 +95,9 @@ wrapInResource(const Protobuf::RepeatedPtrField& anys, return ret; } -// Similar to expectDeltaAndSotwUpdate(), but making the onConfigUpdate() happen, rather than -// EXPECT-ing it. -void doDeltaAndSotwUpdate(WatchMap& watch_map, - const Protobuf::RepeatedPtrField& sotw_resources, - const std::vector& removed_names, - const std::string& version) { - watch_map.onConfigUpdate(sotw_resources, version); +void doDeltaUpdate(WatchMap& watch_map, + const Protobuf::RepeatedPtrField& sotw_resources, + const std::vector& removed_names, const std::string& version) { Protobuf::RepeatedPtrField delta_resources = wrapInResource(sotw_resources, version); @@ -105,6 +108,16 @@ void doDeltaAndSotwUpdate(WatchMap& watch_map, watch_map.onConfigUpdate(delta_resources, removed_names_proto, version); } +// Similar to expectDeltaAndSotwUpdate(), but making the onConfigUpdate() happen, rather than +// EXPECT-ing it. +void doDeltaAndSotwUpdate(WatchMap& watch_map, + const Protobuf::RepeatedPtrField& sotw_resources, + const std::vector& removed_names, + const std::string& version) { + watch_map.onConfigUpdate(sotw_resources, version); + doDeltaUpdate(watch_map, sotw_resources, removed_names, version); +} + // Tests the simple case of a single watch. Checks that the watch will not be told of updates to // resources it doesn't care about. Checks that the watch can later decide it does care about them, // and then receive subsequent updates to them. @@ -112,7 +125,7 @@ TEST(WatchMapTest, Basic) { MockSubscriptionCallbacks callbacks; TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); - WatchMap watch_map; + WatchMap watch_map(false); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); { @@ -179,7 +192,7 @@ TEST(WatchMapTest, Overlap) { MockSubscriptionCallbacks callbacks2; TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); - WatchMap watch_map; + WatchMap watch_map(false); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); @@ -241,6 +254,8 @@ TEST(WatchMapTest, Overlap) { // WatchMap defers deletes and doesn't crash. class SameWatchRemoval : public testing::Test { public: + SameWatchRemoval() : watch_map_(false) {} + void SetUp() override { envoy::config::endpoint::v3::ClusterLoadAssignment alice; alice.set_cluster_name("alice"); @@ -316,7 +331,7 @@ TEST(WatchMapTest, AddRemoveAdd) { MockSubscriptionCallbacks callbacks2; TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); - WatchMap watch_map; + WatchMap watch_map(false); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); @@ -371,7 +386,7 @@ TEST(WatchMapTest, UninterestingUpdate) { MockSubscriptionCallbacks callbacks; TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); - WatchMap watch_map; + WatchMap watch_map(false); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); watch_map.updateWatchInterest(watch, {"alice"}); @@ -415,7 +430,7 @@ TEST(WatchMapTest, WatchingEverything) { MockSubscriptionCallbacks callbacks2; TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); - WatchMap watch_map; + WatchMap watch_map(false); /*Watch* watch1 = */ watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); // watch1 never specifies any names, and so is treated as interested in everything. @@ -451,7 +466,7 @@ TEST(WatchMapTest, DeltaOnConfigUpdate) { MockSubscriptionCallbacks callbacks3; TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); - WatchMap watch_map; + WatchMap watch_map(false); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); Watch* watch3 = watch_map.addWatch(callbacks3, resource_decoder); @@ -484,7 +499,7 @@ TEST(WatchMapTest, DeltaOnConfigUpdate) { } TEST(WatchMapTest, OnConfigUpdateFailed) { - WatchMap watch_map; + WatchMap watch_map(false); // calling on empty map doesn't break watch_map.onConfigUpdateFailed(ConfigUpdateFailureReason::UpdateRejected, nullptr); @@ -500,49 +515,63 @@ TEST(WatchMapTest, OnConfigUpdateFailed) { watch_map.onConfigUpdateFailed(ConfigUpdateFailureReason::UpdateRejected, nullptr); } -// verifies that a watch is updated with the resource name -TEST(WatchMapTest, ConvertAliasWatchesToNameWatches) { - MockSubscriptionCallbacks callbacks; +TEST(WatchMapTest, OnConfigUpdateUsingNamespaces) { + MockSubscriptionCallbacks callbacks1; + MockSubscriptionCallbacks callbacks2; + MockSubscriptionCallbacks callbacks3; TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); - WatchMap watch_map; - Watch* watch = watch_map.addWatch(callbacks, resource_decoder); - watch_map.updateWatchInterest(watch, {"alias"}); + WatchMap watch_map(true); + Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); + Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); + Watch* watch3 = watch_map.addWatch(callbacks3, resource_decoder); + watch_map.updateWatchInterest(watch1, {"ns1"}); + watch_map.updateWatchInterest(watch2, {"ns1", "ns2"}); + watch_map.updateWatchInterest(watch3, {"ns3"}); - envoy::service::discovery::v3::Resource resource; - resource.set_name("resource"); - resource.set_version("version"); - for (const auto alias : {"alias", "alias1", "alias2"}) { - resource.add_aliases(alias); + // verify update + { + Protobuf::RepeatedPtrField update; + envoy::config::endpoint::v3::ClusterLoadAssignment resource; + resource.set_cluster_name("ns1/resource1"); + update.Add()->PackFrom(resource); + expectDeltaUpdate(callbacks1, {resource}, {}, "version0"); + expectDeltaUpdate(callbacks2, {resource}, {}, "version0"); + doDeltaUpdate(watch_map, update, {}, "version0"); } - - AddedRemoved converted = watch_map.convertAliasWatchesToNameWatches(resource); - - EXPECT_EQ(std::set{"resource"}, converted.added_); - EXPECT_EQ(std::set{"alias"}, converted.removed_); -} - -// verifies that if a resource contains an alias the same as its name, and the watch has been set -// with that alias, the watch won't be updated -TEST(WatchMapTest, ConvertAliasWatchesToNameWatchesAliasIsSameAsName) { - MockSubscriptionCallbacks callbacks; - TestUtility::TestOpaqueResourceDecoderImpl - resource_decoder("cluster_name"); - WatchMap watch_map; - Watch* watch = watch_map.addWatch(callbacks, resource_decoder); - watch_map.updateWatchInterest(watch, {"name-and-alias"}); - - envoy::service::discovery::v3::Resource resource; - resource.set_name("name-and-alias"); - resource.set_version("version"); - for (const auto alias : {"name-and-alias", "alias1", "alias2"}) { - resource.add_aliases(alias); + // verify removal + { + Protobuf::RepeatedPtrField update; + expectDeltaUpdate(callbacks2, {}, {"ns2/removed"}, "version1"); + doDeltaUpdate(watch_map, update, {"ns2/removed"}, "version1"); } + // verify a not-found response to an on-demand request: such a response will contain an empty + // resource wrapper with the name and aliases fields containing the alias used in the request. + { + Protobuf::RepeatedPtrField empty_resources; + const auto version = "version3"; + const auto not_resolved = "ns3/not_resolved"; - AddedRemoved converted = watch_map.convertAliasWatchesToNameWatches(resource); - - EXPECT_TRUE(converted.added_.empty()); - EXPECT_TRUE(converted.removed_.empty()); + auto* cur_resource = empty_resources.Add(); + cur_resource->set_version(version); + cur_resource->set_name(not_resolved); + cur_resource->add_aliases(not_resolved); + + EXPECT_CALL(callbacks3, onConfigUpdate(_, _, _)) + .WillOnce(Invoke([not_resolved, version]( + const std::vector& gotten_resources, + const Protobuf::RepeatedPtrField&, const std::string&) { + EXPECT_EQ(1, gotten_resources.size()); + EXPECT_EQ(gotten_resources[0].get().version(), version); + EXPECT_FALSE(gotten_resources[0].get().hasResource()); + EXPECT_EQ(gotten_resources[0].get().name(), not_resolved); + EXPECT_EQ(gotten_resources[0].get().aliases(), std::vector{not_resolved}); + })); + + Protobuf::RepeatedPtrField removed_names_proto; + + watch_map.onConfigUpdate(empty_resources, removed_names_proto, "version2"); + } } } // namespace diff --git a/test/common/filter/http/filter_config_discovery_impl_test.cc b/test/common/filter/http/filter_config_discovery_impl_test.cc index 2d7d7d0e00e61..89c668e423579 100644 --- a/test/common/filter/http/filter_config_discovery_impl_test.cc +++ b/test/common/filter/http/filter_config_discovery_impl_test.cc @@ -88,7 +88,8 @@ class FilterConfigDiscoveryImplTest : public FilterConfigDiscoveryTestBase { void setup(bool warm = true) { provider_ = createProvider("foo", warm); callbacks_ = factory_context_.cluster_manager_.subscription_factory_.callbacks_; - EXPECT_CALL(*factory_context_.cluster_manager_.subscription_factory_.subscription_, start(_)); + EXPECT_CALL(*factory_context_.cluster_manager_.subscription_factory_.subscription_, + start(_, _)); if (!warm) { EXPECT_CALL(init_watcher_, ready()); } diff --git a/test/common/router/rds_impl_test.cc b/test/common/router/rds_impl_test.cc index 0d631c4b9da5c..8adc57d7ac94f 100644 --- a/test/common/router/rds_impl_test.cc +++ b/test/common/router/rds_impl_test.cc @@ -109,7 +109,7 @@ stat_prefix: foo validation_visitor_, outer_init_manager_, "foo.", *route_config_provider_manager_); rds_callbacks_ = server_factory_context_.cluster_manager_.subscription_factory_.callbacks_; EXPECT_CALL(*server_factory_context_.cluster_manager_.subscription_factory_.subscription_, - start(_)); + start(_, _)); outer_init_manager_.initialize(init_watcher_); } @@ -517,7 +517,7 @@ name: foo // Static + dynamic. setup(); EXPECT_CALL(*server_factory_context_.cluster_manager_.subscription_factory_.subscription_, - start(_)); + start(_, _)); outer_init_manager_.initialize(init_watcher_); const std::string response1_json = R"EOF( @@ -683,7 +683,7 @@ name: foo_route_config TEST_F(RouteConfigProviderManagerImplTest, OnConfigUpdateEmpty) { setup(); EXPECT_CALL(*server_factory_context_.cluster_manager_.subscription_factory_.subscription_, - start(_)); + start(_, _)); outer_init_manager_.initialize(init_watcher_); EXPECT_CALL(init_watcher_, ready()); server_factory_context_.cluster_manager_.subscription_factory_.callbacks_->onConfigUpdate({}, ""); @@ -692,7 +692,7 @@ TEST_F(RouteConfigProviderManagerImplTest, OnConfigUpdateEmpty) { TEST_F(RouteConfigProviderManagerImplTest, OnConfigUpdateWrongSize) { setup(); EXPECT_CALL(*server_factory_context_.cluster_manager_.subscription_factory_.subscription_, - start(_)); + start(_, _)); outer_init_manager_.initialize(init_watcher_); envoy::config::route::v3::RouteConfiguration route_config; const auto decoded_resources = TestUtility::decodeResources({route_config, route_config}); @@ -724,7 +724,7 @@ TEST_F(RouteConfigProviderManagerImplTest, ConfigDumpAfterConfigRejected) { // dynamic. setup(); EXPECT_CALL(*server_factory_context_.cluster_manager_.subscription_factory_.subscription_, - start(_)); + start(_, _)); outer_init_manager_.initialize(init_watcher_); const std::string response1_yaml = R"EOF( diff --git a/test/common/router/scoped_rds_test.cc b/test/common/router/scoped_rds_test.cc index b00466a702a49..f2b9c1ae93a62 100644 --- a/test/common/router/scoped_rds_test.cc +++ b/test/common/router/scoped_rds_test.cc @@ -132,22 +132,22 @@ class ScopedRdsTest : public ScopedRoutesTestBase { API_NO_BOOST(envoy::api::v2::RouteConfiguration)().GetDescriptor()->full_name())), _, _, _)) .Times(AnyNumber()) - .WillRepeatedly(Invoke([this](const envoy::config::core::v3::ConfigSource&, - absl::string_view, Stats::Scope&, - Envoy::Config::SubscriptionCallbacks& callbacks, - Envoy::Config::OpaqueResourceDecoder&) { - auto ret = std::make_unique>(); - rds_subscription_by_config_subscription_[ret.get()] = &callbacks; - EXPECT_CALL(*ret, start(_)) - .WillOnce(Invoke( - [this, config_sub_addr = ret.get()](const std::set& resource_names) { + .WillRepeatedly( + Invoke([this](const envoy::config::core::v3::ConfigSource&, absl::string_view, + Stats::Scope&, Envoy::Config::SubscriptionCallbacks& callbacks, + Envoy::Config::OpaqueResourceDecoder&) { + auto ret = std::make_unique>(); + rds_subscription_by_config_subscription_[ret.get()] = &callbacks; + EXPECT_CALL(*ret, start(_, _)) + .WillOnce(Invoke([this, config_sub_addr = ret.get()]( + const std::set& resource_names, const bool) { EXPECT_EQ(resource_names.size(), 1); auto iter = rds_subscription_by_config_subscription_.find(config_sub_addr); EXPECT_NE(iter, rds_subscription_by_config_subscription_.end()); rds_subscription_by_name_[*resource_names.begin()] = iter->second; })); - return ret; - })); + return ret; + })); ON_CALL(context_init_manager_, add(_)).WillByDefault(Invoke([this](const Init::Target& target) { target_handles_.push_back(target.createHandle("test")); diff --git a/test/common/runtime/runtime_impl_test.cc b/test/common/runtime/runtime_impl_test.cc index aa4805f0964b2..3c58800e79777 100644 --- a/test/common/runtime/runtime_impl_test.cc +++ b/test/common/runtime/runtime_impl_test.cc @@ -848,7 +848,7 @@ class RtdsLoaderImplTest : public LoaderImplTest { generator_, validation_visitor_, *api_); loader_->initialize(cm_); for (auto* sub : rtds_subscriptions_) { - EXPECT_CALL(*sub, start(_)); + EXPECT_CALL(*sub, start(_, _)); } loader_->startRtdsSubscriptions(rtds_init_callback_.AsStdFunction()); diff --git a/test/common/upstream/cds_api_impl_test.cc b/test/common/upstream/cds_api_impl_test.cc index 42d83aa1fcaa2..f052749677fb3 100644 --- a/test/common/upstream/cds_api_impl_test.cc +++ b/test/common/upstream/cds_api_impl_test.cc @@ -40,7 +40,7 @@ class CdsApiImplTest : public testing::Test { cds_ = CdsApiImpl::create(cds_config, cm_, store_, validation_visitor_); cds_->setInitializedCb([this]() -> void { initialized_.ready(); }); - EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_)); + EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_, _)); cds_->initialize(); cds_callbacks_ = cm_.subscription_factory_.callbacks_; } diff --git a/test/common/upstream/eds_speed_test.cc b/test/common/upstream/eds_speed_test.cc index 84f2bf924a639..20b9a5a9fae73 100644 --- a/test/common/upstream/eds_speed_test.cc +++ b/test/common/upstream/eds_speed_test.cc @@ -63,7 +63,7 @@ class EdsSpeedTest { )EOF", Envoy::Upstream::Cluster::InitializePhase::Secondary); - EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_)); + EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_, _)); cluster_->initialize([this] { initialized_ = true; }); EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(testing::Return(&async_stream_)); subscription_->start({"fare"}); diff --git a/test/common/upstream/eds_test.cc b/test/common/upstream/eds_test.cc index 043865f1052a0..e52b807c1d714 100644 --- a/test/common/upstream/eds_test.cc +++ b/test/common/upstream/eds_test.cc @@ -104,7 +104,7 @@ class EdsTest : public testing::Test { } void initialize() { - EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_)); + EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_, _)); cluster_->initialize([this] { initialized_ = true; }); } diff --git a/test/integration/vhds_integration_test.cc b/test/integration/vhds_integration_test.cc index efa74c6039775..b0f0a4fc52473 100644 --- a/test/integration/vhds_integration_test.cc +++ b/test/integration/vhds_integration_test.cc @@ -218,7 +218,7 @@ TEST_P(VhdsInitializationTest, InitializeVhdsAfterRdsHasBeenInitialized) { sendDeltaDiscoveryResponse( Config::TypeUrl::get().VirtualHost, {TestUtility::parseYaml( - fmt::format(VhostTemplate, "vhost_0", "vhost.first"))}, + fmt::format(VhostTemplate, "my_route/vhost_0", "vhost.first"))}, {}, "1", vhds_stream_); EXPECT_TRUE( compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_)); @@ -249,19 +249,19 @@ class VhdsIntegrationTest : public HttpIntegrationTest, envoy::config::route::v3::VirtualHost buildVirtualHost() { return TestUtility::parseYaml( - virtualHostYaml("vhost_0", "host")); + virtualHostYaml("my_route/vhost_0", "host")); } std::vector buildVirtualHost1() { return {TestUtility::parseYaml( - virtualHostYaml("vhost_1", "vhost.first")), + virtualHostYaml("my_route/vhost_1", "vhost.first")), TestUtility::parseYaml( - virtualHostYaml("vhost_2", "vhost.second"))}; + virtualHostYaml("my_route/vhost_2", "vhost.second"))}; } envoy::config::route::v3::VirtualHost buildVirtualHost2() { return TestUtility::parseYaml( - virtualHostYaml("vhost_1", "vhost.first")); + virtualHostYaml("my_route/vhost_1", "vhost.first")); } // Overridden to insert this stuff into the initialize() at the very beginning of @@ -330,7 +330,7 @@ class VhdsIntegrationTest : public HttpIntegrationTest, response.set_system_version_info("system_version_info_this_is_a_test"); response.set_type_url(Config::TypeUrl::get().VirtualHost); auto* resource = response.add_resources(); - resource->set_name("cannot-resolve-alias"); + resource->set_name("my_route/cannot-resolve-alias"); resource->set_version(version); for (const auto& alias : aliases) { resource->add_aliases(alias); @@ -424,7 +424,8 @@ TEST_P(VhdsIntegrationTest, VhdsVirtualHostAddUpdateRemove) { // A spontaneous VHDS DiscoveryResponse removes newly added virtual hosts sendDeltaDiscoveryResponse( - Config::TypeUrl::get().VirtualHost, {}, {"vhost_1", "vhost_2"}, "3", vhds_stream_); + Config::TypeUrl::get().VirtualHost, {}, {"my_route/vhost_1", "my_route/vhost_2"}, "3", + vhds_stream_); EXPECT_TRUE( compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_)); @@ -487,7 +488,8 @@ TEST_P(VhdsIntegrationTest, RdsWithVirtualHostsVhdsVirtualHostAddUpdateRemove) { // A spontaneous VHDS DiscoveryResponse removes virtual hosts added via vhds sendDeltaDiscoveryResponse( - Config::TypeUrl::get().VirtualHost, {}, {"vhost_1", "vhost_2"}, "3", vhds_stream_); + Config::TypeUrl::get().VirtualHost, {}, {"my_route/vhost_1", "my_route/vhost_2"}, "3", + vhds_stream_); EXPECT_TRUE( compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_)); @@ -636,7 +638,8 @@ TEST_P(VhdsIntegrationTest, VhdsOnDemandUpdateFailToResolveOneAliasOutOfSeveral) vhds_stream_)); // Send an empty response back (the management server isn't aware of vhost.third) sendDeltaDiscoveryResponseWithUnresolvedAliases({buildVirtualHost2()}, {}, "4", vhds_stream_, - {"vhost.first"}, {"my_route/vhost.third"}); + {"my_route/vhost.first"}, + {"my_route/vhost.third"}); response->waitForHeaders(); EXPECT_EQ("404", response->headers().getStatusValue()); @@ -677,5 +680,101 @@ TEST_P(VhdsIntegrationTest, VhdsOnDemandUpdateHttpConnectionCloses) { cleanupUpstreamAndDownstream(); } +const char VhostTemplateAfterUpdate[] = R"EOF( +name: {} +domains: [{}] +routes: +- match: {{ prefix: "/after_update" }} + route: {{ cluster: "my_service" }} +)EOF"; + +// Verifies that after multiple vhds updates, virtual hosts from earlier updates still can receive +// updates See https://github.com/envoyproxy/envoy/issues/12158 for more details +TEST_P(VhdsIntegrationTest, MultipleUpdates) { + testRouterHeaderOnlyRequestAndResponse(nullptr, 1); + cleanupUpstreamAndDownstream(); + EXPECT_TRUE(codec_client_->waitForDisconnect()); + + { + // make first vhds request (for vhost.first) + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + Http::TestRequestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "vhost.first"}, + {"x-lyft-user-id", "123"}}; + IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(request_headers); + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, + {vhdsRequestResourceName("vhost.first")}, {}, + vhds_stream_)); + sendDeltaDiscoveryResponse( + Config::TypeUrl::get().VirtualHost, {buildVirtualHost2()}, {}, "4", vhds_stream_, + {"my_route/vhost.first"}); + EXPECT_TRUE( + compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_)); + + waitForNextUpstreamRequest(1); + // Send response headers, and end_stream if there is no response body. + upstream_request_->encodeHeaders(default_response_headers_, true); + + response->waitForHeaders(); + EXPECT_EQ("200", response->headers().getStatusValue()); + + cleanupUpstreamAndDownstream(); + EXPECT_TRUE(codec_client_->waitForDisconnect()); + } + { + // make second vhds request (for vhost.second) + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + Http::TestRequestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "vhost.second"}, + {"x-lyft-user-id", "123"}}; + IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(request_headers); + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, + {vhdsRequestResourceName("vhost.second")}, {}, + vhds_stream_)); + sendDeltaDiscoveryResponse( + Config::TypeUrl::get().VirtualHost, + {TestUtility::parseYaml( + virtualHostYaml("my_route/vhost_2", "vhost.second"))}, + {}, "4", vhds_stream_, {"my_route/vhost.second"}); + EXPECT_TRUE( + compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_)); + + waitForNextUpstreamRequest(1); + // Send response headers, and end_stream if there is no response body. + upstream_request_->encodeHeaders(default_response_headers_, true); + + response->waitForHeaders(); + EXPECT_EQ("200", response->headers().getStatusValue()); + + cleanupUpstreamAndDownstream(); + EXPECT_TRUE(codec_client_->waitForDisconnect()); + } + { + // Attempt to push updates for both vhost.first and vhost.second + sendDeltaDiscoveryResponse( + Config::TypeUrl::get().VirtualHost, + {TestUtility::parseYaml( + fmt::format(VhostTemplateAfterUpdate, "my_route/vhost_1", "vhost.first")), + TestUtility::parseYaml( + fmt::format(VhostTemplateAfterUpdate, "my_route/vhost_2", "vhost.second"))}, + {}, "5", vhds_stream_); + EXPECT_TRUE( + compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_)); + + // verify that both virtual hosts have been updated + testRouterHeaderOnlyRequestAndResponse(nullptr, 1, "/after_update", "vhost.first"); + cleanupUpstreamAndDownstream(); + ASSERT_TRUE(codec_client_->waitForDisconnect()); + + testRouterHeaderOnlyRequestAndResponse(nullptr, 1, "/after_update", "vhost.second"); + cleanupUpstreamAndDownstream(); + ASSERT_TRUE(codec_client_->waitForDisconnect()); + } +} + } // namespace } // namespace Envoy diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index 29412fffbd068..34efe4c30bc99 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -62,8 +62,10 @@ class MockUntypedConfigUpdateCallbacks : public UntypedConfigUpdateCallbacks { class MockSubscription : public Subscription { public: - MOCK_METHOD(void, start, (const std::set& resources)); + MOCK_METHOD(void, start, + (const std::set& resources, const bool use_prefix_matching)); MOCK_METHOD(void, updateResourceInterest, (const std::set& update_to_these_names)); + MOCK_METHOD(void, requestOnDemandUpdate, (const std::set& add_these_names)); }; class MockSubscriptionFactory : public SubscriptionFactory { @@ -107,7 +109,11 @@ class MockGrpcMux : public GrpcMux { MOCK_METHOD(GrpcMuxWatchPtr, addWatch, (const std::string& type_url, const std::set& resources, - SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder)); + SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder, + const bool use_prefix_matching)); + + MOCK_METHOD(void, requestOnDemandUpdate, + (const std::string& type_url, const std::set& add_these_names)); }; class MockGrpcStreamCallbacks diff --git a/test/server/lds_api_test.cc b/test/server/lds_api_test.cc index 54a84886e832d..5b8efae5b756d 100644 --- a/test/server/lds_api_test.cc +++ b/test/server/lds_api_test.cc @@ -42,7 +42,7 @@ class LdsApiTest : public testing::Test { EXPECT_CALL(init_manager_, add(_)); lds_ = std::make_unique(lds_config, cluster_manager_, init_manager_, store_, listener_manager_, validation_visitor_); - EXPECT_CALL(*cluster_manager_.subscription_factory_.subscription_, start(_)); + EXPECT_CALL(*cluster_manager_.subscription_factory_.subscription_, start(_, _)); init_target_handle_->initialize(init_watcher_); lds_callbacks_ = cluster_manager_.subscription_factory_.callbacks_; }