diff --git a/source/common/config/xds_mux/BUILD b/source/common/config/xds_mux/BUILD new file mode 100644 index 0000000000000..f934e46e19dc4 --- /dev/null +++ b/source/common/config/xds_mux/BUILD @@ -0,0 +1,53 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_cc_library( + name = "delta_subscription_state_lib", + srcs = ["delta_subscription_state.cc"], + hdrs = ["delta_subscription_state.h"], + deps = [ + ":subscription_state_lib", + "//source/common/config:api_version_lib", + "//source/common/config:utility_lib", + "//source/common/grpc:common_lib", + "//source/common/protobuf", + "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", + ], +) + +envoy_cc_library( + name = "sotw_subscription_state_lib", + srcs = ["sotw_subscription_state.cc"], + hdrs = ["sotw_subscription_state.h"], + deps = [ + ":subscription_state_lib", + "//source/common/config:api_version_lib", + "//source/common/config:decoded_resource_lib", + "//source/common/config:utility_lib", + "//source/common/grpc:common_lib", + "//source/common/protobuf", + "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", + ], +) + +envoy_cc_library( + name = "subscription_state_lib", + hdrs = ["subscription_state.h"], + deps = [ + "//envoy/config:subscription_interface", + "//envoy/event:dispatcher_interface", + "//envoy/local_info:local_info_interface", + "//source/common/common:minimal_logger_lib", + "//source/common/config:ttl_lib", + "//source/common/config:update_ack_lib", + "//source/common/config:utility_lib", + "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", + ], +) diff --git a/source/common/config/xds_mux/delta_subscription_state.cc b/source/common/config/xds_mux/delta_subscription_state.cc new file mode 100644 index 0000000000000..dd3b8e686cb73 --- /dev/null +++ b/source/common/config/xds_mux/delta_subscription_state.cc @@ -0,0 +1,196 @@ +#include "source/common/config/xds_mux/delta_subscription_state.h" + +#include "envoy/event/dispatcher.h" +#include "envoy/service/discovery/v3/discovery.pb.h" + +#include "source/common/common/hash.h" +#include "source/common/config/utility.h" +#include "source/common/runtime/runtime_features.h" + +namespace Envoy { +namespace Config { +namespace XdsMux { + +DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url, + UntypedConfigUpdateCallbacks& watch_map, + Event::Dispatcher& dispatcher, const bool wildcard) + : BaseSubscriptionState(std::move(type_url), watch_map, dispatcher), + // TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on + // empty resources as updates. + supports_heartbeats_(type_url_ != "envoy.config.route.v3.VirtualHost"), wildcard_(wildcard) {} + +DeltaSubscriptionState::~DeltaSubscriptionState() = default; + +void DeltaSubscriptionState::updateSubscriptionInterest( + const absl::flat_hash_set& cur_added, + const absl::flat_hash_set& cur_removed) { + for (const auto& a : cur_added) { + resource_state_[a] = ResourceState::waitingForServer(); + // If interest in a resource is removed-then-added (all before a discovery request + // can be sent), we must treat it as a "new" addition: our user may have forgotten its + // copy of the resource after instructing us to remove it, and need to be reminded of it. + names_removed_.erase(a); + names_added_.insert(a); + } + for (const auto& r : cur_removed) { + resource_state_.erase(r); + // Ideally, when interest in a resource is added-then-removed in between requests, + // we would avoid putting a superfluous "unsubscribe [resource that was never subscribed]" + // in the request. However, the removed-then-added case *does* need to go in the request, + // and due to how we accomplish that, it's difficult to distinguish remove-add-remove from + // add-remove (because "remove-add" has to be treated as equivalent to just "add"). + names_added_.erase(r); + names_removed_.insert(r); + } +} + +// Not having sent any requests yet counts as an "update pending" since you're supposed to resend +// the entirety of your interest at the start of a stream, even if nothing has changed. +bool DeltaSubscriptionState::subscriptionUpdatePending() const { + return !names_added_.empty() || !names_removed_.empty() || + !any_request_sent_yet_in_current_stream_ || dynamicContextChanged(); +} + +bool DeltaSubscriptionState::isHeartbeatResource( + const envoy::service::discovery::v3::Resource& resource) const { + if (!supports_heartbeats_ && + !Runtime::runtimeFeatureEnabled("envoy.reloadable_features.vhds_heartbeats")) { + return false; + } + const auto itr = resource_state_.find(resource.name()); + if (itr == resource_state_.end()) { + return false; + } + + return !resource.has_resource() && !itr->second.isWaitingForServer() && + resource.version() == itr->second.version(); +} + +void DeltaSubscriptionState::handleGoodResponse( + const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) { + absl::flat_hash_set names_added_removed; + Protobuf::RepeatedPtrField non_heartbeat_resources; + for (const auto& resource : message.resources()) { + if (!names_added_removed.insert(resource.name()).second) { + throw EnvoyException( + fmt::format("duplicate name {} found among added/updated resources", resource.name())); + } + if (isHeartbeatResource(resource)) { + continue; + } + // TODO (dmitri-d) consider changing onConfigUpdate callback interface to avoid copying of + // resources + non_heartbeat_resources.Add()->CopyFrom(resource); + // DeltaDiscoveryResponses for unresolved aliases don't contain an actual resource + if (!resource.has_resource() && resource.aliases_size() > 0) { + continue; + } + if (message.type_url() != resource.resource().type_url()) { + throw EnvoyException(fmt::format("type URL {} embedded in an individual Any does not match " + "the message-wide type URL {} in DeltaDiscoveryResponse {}", + resource.resource().type_url(), message.type_url(), + message.DebugString())); + } + } + for (const auto& name : message.removed_resources()) { + if (!names_added_removed.insert(name).second) { + throw EnvoyException( + fmt::format("duplicate name {} found in the union of added+removed resources", name)); + } + } + + { + const auto scoped_update = ttl_.scopedTtlUpdate(); + for (const auto& resource : message.resources()) { + addResourceState(resource); + } + } + + callbacks().onConfigUpdate(non_heartbeat_resources, message.removed_resources(), + message.system_version_info()); + + // If a resource is gone, there is no longer a meaningful version for it that makes sense to + // provide to the server upon stream reconnect: either it will continue to not exist, in which + // case saying nothing is fine, or the server will bring back something new, which we should + // receive regardless (which is the logic that not specifying a version will get you). + // + // So, leave the version map entry present but blank. It will be left out of + // initial_resource_versions messages, but will remind us to explicitly tell the server "I'm + // cancelling my subscription" when we lose interest. + for (const auto& resource_name : message.removed_resources()) { + if (resource_state_.find(resource_name) != resource_state_.end()) { + resource_state_[resource_name] = ResourceState::waitingForServer(); + } + } + ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", typeUrl(), + message.resources().size(), message.removed_resources().size()); +} + +std::unique_ptr +DeltaSubscriptionState::getNextRequestInternal() { + auto request = std::make_unique(); + request->set_type_url(typeUrl()); + if (!any_request_sent_yet_in_current_stream_) { + any_request_sent_yet_in_current_stream_ = true; + // initial_resource_versions "must be populated for first request in a stream". + // Also, since this might be a new server, we must explicitly state *all* of our subscription + // interest. + for (auto const& [resource_name, resource_state] : resource_state_) { + // Populate initial_resource_versions with the resource versions we currently have. + // Resources we are interested in, but are still waiting to get any version of from the + // server, do not belong in initial_resource_versions. (But do belong in new subscriptions!) + if (!resource_state.isWaitingForServer()) { + (*request->mutable_initial_resource_versions())[resource_name] = resource_state.version(); + } + // As mentioned above, fill resource_names_subscribe with everything, including names we + // have yet to receive any resource for unless this is a wildcard subscription, for which + // the first request on a stream must be without any resource names. + if (!wildcard_) { + names_added_.insert(resource_name); + } + } + // Wildcard subscription initial requests must have no resource_names_subscribe. + if (wildcard_) { + names_added_.clear(); + } + names_removed_.clear(); + } + + std::copy(names_added_.begin(), names_added_.end(), + Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names_subscribe())); + std::copy(names_removed_.begin(), names_removed_.end(), + Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names_unsubscribe())); + names_added_.clear(); + names_removed_.clear(); + + return request; +} + +void DeltaSubscriptionState::addResourceState( + const envoy::service::discovery::v3::Resource& resource) { + setResourceTtl(resource); + resource_state_[resource.name()] = ResourceState(resource.version()); +} + +void DeltaSubscriptionState::setResourceTtl( + const envoy::service::discovery::v3::Resource& resource) { + if (resource.has_ttl()) { + ttl_.add(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(resource.ttl())), + resource.name()); + } else { + ttl_.clear(resource.name()); + } +} + +void DeltaSubscriptionState::ttlExpiryCallback(const std::vector& expired) { + Protobuf::RepeatedPtrField removed_resources; + for (const auto& resource : expired) { + resource_state_[resource] = ResourceState::waitingForServer(); + removed_resources.Add(std::string(resource)); + } + callbacks().onConfigUpdate({}, removed_resources, ""); +} + +} // namespace XdsMux +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/xds_mux/delta_subscription_state.h b/source/common/config/xds_mux/delta_subscription_state.h new file mode 100644 index 0000000000000..801bd5edc0c1a --- /dev/null +++ b/source/common/config/xds_mux/delta_subscription_state.h @@ -0,0 +1,98 @@ +#pragma once + +#include "envoy/grpc/status.h" + +#include "source/common/common/assert.h" +#include "source/common/common/logger.h" +#include "source/common/config/api_version.h" +#include "source/common/config/xds_mux/subscription_state.h" + +#include "absl/container/node_hash_map.h" +#include "absl/types/optional.h" + +namespace Envoy { +namespace Config { +namespace XdsMux { + +// Tracks the state of a delta xDS-over-gRPC protocol session. +class DeltaSubscriptionState + : public BaseSubscriptionState { +public: + DeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map, + Event::Dispatcher& dispatcher, const bool wildcard); + + ~DeltaSubscriptionState() override; + + // Update which resources we're interested in subscribing to. + void updateSubscriptionInterest(const absl::flat_hash_set& cur_added, + const absl::flat_hash_set& cur_removed) override; + + // Whether there was a change in our subscription interest we have yet to inform the server of. + bool subscriptionUpdatePending() const override; + + void markStreamFresh() override { any_request_sent_yet_in_current_stream_ = false; } + + void ttlExpiryCallback(const std::vector& expired) override; + + DeltaSubscriptionState(const DeltaSubscriptionState&) = delete; + DeltaSubscriptionState& operator=(const DeltaSubscriptionState&) = delete; + +private: + std::unique_ptr + getNextRequestInternal() override; + + void setResourceTtl(const envoy::service::discovery::v3::Resource& resource); + bool isHeartbeatResource(const envoy::service::discovery::v3::Resource& resource) const; + void + handleGoodResponse(const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) override; + void addResourceState(const envoy::service::discovery::v3::Resource& resource); + + class ResourceState { + public: + explicit ResourceState(absl::string_view version) : version_(version) {} + // Builds a ResourceVersion in the waitingForServer state. + ResourceState() = default; + // Self-documenting alias of default constructor. + static ResourceState waitingForServer() { return ResourceState(); } + + // If true, we currently have no version of this resource - we are waiting for the server to + // provide us with one. + bool isWaitingForServer() const { return version_ == absl::nullopt; } + + // Must not be called if waitingForServer() == true. + std::string version() const { + ASSERT(version_.has_value()); + return version_.value_or(""); + } + + private: + absl::optional version_; + }; + + // Not all xDS resources support heartbeats due to there being specific information encoded in + // an empty response, which is indistinguishable from a heartbeat in some cases. For now we just + // disable heartbeats for these resources (currently only VHDS). + const bool supports_heartbeats_; + + // Is the subscription is for a wildcard request. + const bool wildcard_; + + // A map from resource name to per-resource version. The keys of this map are exactly the resource + // names we are currently interested in. Those in the waitingForServer state currently don't have + // any version for that resource: we need to inform the server if we lose interest in them, but we + // also need to *not* include them in the initial_resource_versions map upon a reconnect. + absl::node_hash_map resource_state_; + + bool any_request_sent_yet_in_current_stream_{}; + + // Tracks changes in our subscription interest since the previous DeltaDiscoveryRequest we sent. + // TODO: Can't use absl::flat_hash_set due to ordering issues in gTest expectation matching. + // Feel free to change to an unordered container once we figure out how to make it work. + std::set names_added_; + std::set names_removed_; +}; + +} // namespace XdsMux +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/xds_mux/sotw_subscription_state.cc b/source/common/config/xds_mux/sotw_subscription_state.cc new file mode 100644 index 0000000000000..bb7a9f4c3a9c6 --- /dev/null +++ b/source/common/config/xds_mux/sotw_subscription_state.cc @@ -0,0 +1,125 @@ +#include "source/common/config/xds_mux/sotw_subscription_state.h" + +#include "source/common/config/utility.h" + +namespace Envoy { +namespace Config { +namespace XdsMux { + +SotwSubscriptionState::SotwSubscriptionState(std::string type_url, + UntypedConfigUpdateCallbacks& callbacks, + Event::Dispatcher& dispatcher, + OpaqueResourceDecoder& resource_decoder) + : BaseSubscriptionState(std::move(type_url), callbacks, dispatcher), + resource_decoder_(resource_decoder) {} + +SotwSubscriptionState::~SotwSubscriptionState() = default; + +void SotwSubscriptionState::updateSubscriptionInterest( + const absl::flat_hash_set& cur_added, + const absl::flat_hash_set& cur_removed) { + for (const auto& a : cur_added) { + names_tracked_.insert(a); + } + for (const auto& r : cur_removed) { + names_tracked_.erase(r); + } + if (!cur_added.empty() || !cur_removed.empty()) { + update_pending_ = true; + } +} + +// Not having sent any requests yet counts as an "update pending" since you're supposed to resend +// the entirety of your interest at the start of a stream, even if nothing has changed. +bool SotwSubscriptionState::subscriptionUpdatePending() const { + return update_pending_ || dynamicContextChanged(); +} + +void SotwSubscriptionState::markStreamFresh() { + last_good_version_info_ = absl::nullopt; + last_good_nonce_ = absl::nullopt; + update_pending_ = true; + clearDynamicContextChanged(); +} + +void SotwSubscriptionState::handleGoodResponse( + const envoy::service::discovery::v3::DiscoveryResponse& message) { + Protobuf::RepeatedPtrField non_heartbeat_resources; + std::vector resources_with_ttl( + message.resources().size()); + + { + const auto scoped_update = ttl_.scopedTtlUpdate(); + for (const auto& any : message.resources()) { + if (!any.Is() && + any.type_url() != message.type_url()) { + throw EnvoyException(fmt::format("type URL {} embedded in an individual Any does not match " + "the message-wide type URL {} in DiscoveryResponse {}", + any.type_url(), message.type_url(), + message.DebugString())); + } + + auto decoded_resource = + DecodedResourceImpl::fromResource(resource_decoder_, any, message.version_info()); + setResourceTtl(*decoded_resource); + if (isHeartbeatResource(*decoded_resource, message.version_info())) { + continue; + } + non_heartbeat_resources.Add()->CopyFrom(any); + } + } + + // TODO (dmitri-d) to eliminate decoding of resources twice consider expanding the interface to + // support passing of decoded resources + callbacks().onConfigUpdate(non_heartbeat_resources, message.version_info()); + // Now that we're passed onConfigUpdate() without an exception thrown, we know we're good. + last_good_version_info_ = message.version_info(); + last_good_nonce_ = message.nonce(); + ENVOY_LOG(debug, "Config update for {} (version {}) accepted with {} resources", typeUrl(), + message.version_info(), message.resources().size()); +} + +std::unique_ptr +SotwSubscriptionState::getNextRequestInternal() { + auto request = std::make_unique(); + request->set_type_url(typeUrl()); + std::copy(names_tracked_.begin(), names_tracked_.end(), + Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names())); + if (last_good_version_info_.has_value()) { + request->set_version_info(last_good_version_info_.value()); + } + // Default response_nonce to the last known good one. If we are being called by + // getNextRequestWithAck(), this value will be overwritten. + if (last_good_nonce_.has_value()) { + request->set_response_nonce(last_good_nonce_.value()); + } + + update_pending_ = false; + return request; +} + +void SotwSubscriptionState::ttlExpiryCallback(const std::vector& expired) { + Protobuf::RepeatedPtrField removed_resources; + for (const auto& resource : expired) { + removed_resources.Add(std::string(resource)); + } + callbacks().onConfigUpdate({}, removed_resources, ""); +} + +void SotwSubscriptionState::setResourceTtl(const DecodedResourceImpl& decoded_resource) { + if (decoded_resource.ttl()) { + ttl_.add(std::chrono::milliseconds(*decoded_resource.ttl()), decoded_resource.name()); + } else { + ttl_.clear(decoded_resource.name()); + } +} + +bool SotwSubscriptionState::isHeartbeatResource(const DecodedResource& resource, + const std::string& version) { + return !resource.hasResource() && last_good_version_info_.has_value() && + version == last_good_version_info_.value(); +} + +} // namespace XdsMux +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/xds_mux/sotw_subscription_state.h b/source/common/config/xds_mux/sotw_subscription_state.h new file mode 100644 index 0000000000000..4d191fb93a3c4 --- /dev/null +++ b/source/common/config/xds_mux/sotw_subscription_state.h @@ -0,0 +1,67 @@ +#pragma once + +#include "envoy/grpc/status.h" +#include "envoy/service/discovery/v3/discovery.pb.h" + +#include "source/common/common/assert.h" +#include "source/common/common/hash.h" +#include "source/common/config/decoded_resource_impl.h" +#include "source/common/config/xds_mux/subscription_state.h" + +#include "absl/types/optional.h" + +namespace Envoy { +namespace Config { +namespace XdsMux { + +// Tracks the state of a "state-of-the-world" (i.e. not delta) xDS-over-gRPC protocol session. +class SotwSubscriptionState + : public BaseSubscriptionState { +public: + // Note that, outside of tests, we expect callbacks to always be a WatchMap. + SotwSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& callbacks, + Event::Dispatcher& dispatcher, OpaqueResourceDecoder& resource_decoder); + ~SotwSubscriptionState() override; + + // Update which resources we're interested in subscribing to. + void updateSubscriptionInterest(const absl::flat_hash_set& cur_added, + const absl::flat_hash_set& cur_removed) override; + + // Whether there was a change in our subscription interest we have yet to inform the server of. + bool subscriptionUpdatePending() const override; + + void markStreamFresh() override; + + void ttlExpiryCallback(const std::vector& expired) override; + + SotwSubscriptionState(const SotwSubscriptionState&) = delete; + SotwSubscriptionState& operator=(const SotwSubscriptionState&) = delete; + +private: + std::unique_ptr + getNextRequestInternal() override; + + void handleGoodResponse(const envoy::service::discovery::v3::DiscoveryResponse& message) override; + void setResourceTtl(const DecodedResourceImpl& decoded_resource); + bool isHeartbeatResource(const DecodedResource& resource, const std::string& version); + + OpaqueResourceDecoder& resource_decoder_; + + // The version_info carried by the last accepted DiscoveryResponse. + // Remains empty until one is accepted. + absl::optional last_good_version_info_; + // The nonce carried by the last accepted DiscoveryResponse. + // Remains empty until one is accepted. + // Used when it's time to make a spontaneous (i.e. not primarily meant as an ACK) request. + absl::optional last_good_nonce_; + + // Starts true because we should send a request upon subscription start. + bool update_pending_{true}; + + absl::flat_hash_set names_tracked_; +}; + +} // namespace XdsMux +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/xds_mux/subscription_state.h b/source/common/config/xds_mux/subscription_state.h new file mode 100644 index 0000000000000..6607989745763 --- /dev/null +++ b/source/common/config/xds_mux/subscription_state.h @@ -0,0 +1,118 @@ +#pragma once + +#include +#include + +#include "envoy/common/pure.h" +#include "envoy/config/subscription.h" +#include "envoy/event/dispatcher.h" +#include "envoy/service/discovery/v3/discovery.pb.h" + +#include "source/common/config/ttl.h" +#include "source/common/config/update_ack.h" +#include "source/common/config/utility.h" +#include "source/common/protobuf/protobuf.h" + +#include "absl/strings/string_view.h" + +namespace Envoy { +namespace Config { +namespace XdsMux { + +class SubscriptionState {}; + +// Tracks the protocol state of an individual ongoing xDS-over-gRPC session, for a single type_url. +// There can be multiple SubscriptionStates active, one per type_url. They will all be +// blissfully unaware of each other's existence, even when their messages are being multiplexed +// together by ADS. +// This is the abstract parent class for both the delta and state-of-the-world xDS variants. +template +class BaseSubscriptionState : public SubscriptionState, + public Logger::Loggable { +public: + // Note that, outside of tests, we expect callbacks to always be a WatchMap. + BaseSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& callbacks, + Event::Dispatcher& dispatcher) + : ttl_([this](const std::vector& expired) { ttlExpiryCallback(expired); }, + dispatcher, dispatcher.timeSource()), + type_url_(std::move(type_url)), callbacks_(callbacks), dispatcher_(dispatcher) {} + + virtual ~BaseSubscriptionState() = default; + + // Update which resources we're interested in subscribing to. + virtual void updateSubscriptionInterest(const absl::flat_hash_set& cur_added, + const absl::flat_hash_set& cur_removed) PURE; + + void setDynamicContextChanged() { dynamic_context_changed_ = true; } + void clearDynamicContextChanged() { dynamic_context_changed_ = false; } + bool dynamicContextChanged() const { return dynamic_context_changed_; } + + // Whether there was a change in our subscription interest we have yet to inform the server of. + virtual bool subscriptionUpdatePending() const PURE; + + virtual void markStreamFresh() PURE; + + UpdateAck handleResponse(const RS& response) { + // We *always* copy the response's nonce into the next request, even if we're going to make that + // request a NACK by setting error_detail. + UpdateAck ack(response.nonce(), typeUrl()); + ENVOY_LOG(debug, "Handling response for {}", typeUrl()); + TRY_ASSERT_MAIN_THREAD { handleGoodResponse(response); } + END_TRY + catch (const EnvoyException& e) { + handleBadResponse(e, ack); + } + return ack; + } + + void handleEstablishmentFailure() { + ENVOY_LOG(debug, "SubscriptionState establishment failed for {}", typeUrl()); + callbacks().onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, + nullptr); + } + + // Returns the next gRPC request proto to be sent off to the server, based on this object's + // understanding of the current protocol state, and new resources that Envoy wants to request. + std::unique_ptr getNextRequestAckless() { return getNextRequestInternal(); } + + // The WithAck version first calls the ack-less version, then adds in the passed-in ack. + // Returns a new'd pointer, meant to be owned by the caller, who is expected to know what type the + // pointer actually is. + std::unique_ptr getNextRequestWithAck(const UpdateAck& ack) { + auto request = getNextRequestInternal(); + request->set_response_nonce(ack.nonce_); + ENVOY_LOG(debug, "ACK for {} will have nonce {}", typeUrl(), ack.nonce_); + if (ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok) { + // Don't needlessly make the field present-but-empty if status is ok. + request->mutable_error_detail()->CopyFrom(ack.error_detail_); + } + return request; + } + + virtual void ttlExpiryCallback(const std::vector& type_url) PURE; + +protected: + virtual std::unique_ptr getNextRequestInternal() PURE; + virtual void handleGoodResponse(const RS& message) PURE; + void handleBadResponse(const EnvoyException& e, UpdateAck& ack) { + // Note that error_detail being set is what indicates that a (Delta)DiscoveryRequest is a NACK. + ack.error_detail_.set_code(Grpc::Status::WellKnownGrpcStatus::Internal); + ack.error_detail_.set_message(Config::Utility::truncateGrpcStatusMessage(e.what())); + ENVOY_LOG(warn, "Config for {} rejected: {}", typeUrl(), e.what()); + callbacks().onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e); + } + + std::string typeUrl() const { return type_url_; } + UntypedConfigUpdateCallbacks& callbacks() const { return callbacks_; } + + TtlManager ttl_; + const std::string type_url_; + // callbacks_ is expected (outside of tests) to be a WatchMap. + UntypedConfigUpdateCallbacks& callbacks_; + Event::Dispatcher& dispatcher_; + bool dynamic_context_changed_{}; +}; + +} // namespace XdsMux +} // namespace Config +} // namespace Envoy diff --git a/test/common/config/BUILD b/test/common/config/BUILD index a7c5aaf0e4470..f8cd436f49465 100644 --- a/test/common/config/BUILD +++ b/test/common/config/BUILD @@ -77,6 +77,7 @@ envoy_cc_test( "//source/common/config:delta_subscription_state_lib", "//source/common/config:grpc_subscription_lib", "//source/common/config:new_grpc_mux_lib", + "//source/common/config/xds_mux:delta_subscription_state_lib", "//source/common/stats:isolated_store_lib", "//test/mocks:common_lib", "//test/mocks/config:config_mocks", @@ -91,6 +92,25 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "sotw_subscription_state_test", + srcs = ["sotw_subscription_state_test.cc"], + deps = [ + "//source/common/config:resource_name_lib", + "//source/common/config/xds_mux:sotw_subscription_state_lib", + "//source/common/stats:isolated_store_lib", + "//test/mocks:common_lib", + "//test/mocks/config:config_mocks", + "//test/mocks/event:event_mocks", + "//test/mocks/grpc:grpc_mocks", + "//test/mocks/local_info:local_info_mocks", + "//test/mocks/runtime:runtime_mocks", + "//test/test_common:logging_lib", + "//test/test_common:utility_lib", + "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", + ], +) + envoy_cc_test( name = "filesystem_subscription_impl_test", srcs = ["filesystem_subscription_impl_test.cc"], diff --git a/test/common/config/delta_subscription_state_test.cc b/test/common/config/delta_subscription_state_test.cc index fea85b717740c..0aedc138039b1 100644 --- a/test/common/config/delta_subscription_state_test.cc +++ b/test/common/config/delta_subscription_state_test.cc @@ -5,6 +5,7 @@ #include "source/common/config/delta_subscription_state.h" #include "source/common/config/utility.h" +#include "source/common/config/xds_mux/delta_subscription_state.h" #include "source/common/stats/isolated_store_impl.h" #include "test/mocks/config/mocks.h" @@ -16,6 +17,7 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +using testing::IsSubstring; using testing::NiceMock; using testing::Throw; using testing::UnorderedElementsAre; @@ -26,22 +28,55 @@ namespace Config { namespace { const char TypeUrl[] = "type.googleapis.com/envoy.api.v2.Cluster"; +enum class LegacyOrUnified { Legacy, Unified }; -class DeltaSubscriptionStateTestBase : public testing::Test { +class DeltaSubscriptionStateTestBase : public testing::TestWithParam { protected: DeltaSubscriptionStateTestBase( - const std::string& type_url, const bool wildcard, + const std::string& type_url, const bool wildcard, LegacyOrUnified legacy_or_unified, const absl::flat_hash_set initial_resources = {"name1", "name2", "name3"}) - : timer_(new Event::MockTimer(&dispatcher_)), - state_(type_url, callbacks_, local_info_, dispatcher_, wildcard) { - state_.updateSubscriptionInterest(initial_resources, {}); - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = - state_.getNextRequestAckless(); - EXPECT_THAT(cur_request.resource_names_subscribe(), + : should_use_unified_(legacy_or_unified == LegacyOrUnified::Unified) { + ttl_timer_ = new Event::MockTimer(&dispatcher_); + + if (should_use_unified_) { + state_ = std::make_unique( + type_url, callbacks_, dispatcher_, wildcard); + } else { + state_ = std::make_unique( + type_url, callbacks_, local_info_, dispatcher_, wildcard); + } + updateSubscriptionInterest(initial_resources, {}); + auto cur_request = getNextRequestAckless(); + EXPECT_THAT(cur_request->resource_names_subscribe(), // UnorderedElementsAre("name1", "name2", "name3")); UnorderedElementsAreArray(initial_resources.cbegin(), initial_resources.cend())); } + void updateSubscriptionInterest(const absl::flat_hash_set& cur_added, + const absl::flat_hash_set& cur_removed) { + if (should_use_unified_) { + absl::get<1>(state_)->updateSubscriptionInterest(cur_added, cur_removed); + } else { + absl::get<0>(state_)->updateSubscriptionInterest(cur_added, cur_removed); + } + } + + std::unique_ptr getNextRequestAckless() { + if (should_use_unified_) { + return absl::get<1>(state_)->getNextRequestAckless(); + } + return std::make_unique( + absl::get<0>(state_)->getNextRequestAckless()); + } + + UpdateAck + handleResponse(const envoy::service::discovery::v3::DeltaDiscoveryResponse& response_proto) { + if (should_use_unified_) { + return absl::get<1>(state_)->handleResponse(response_proto); + } + return absl::get<0>(state_)->handleResponse(response_proto); + } + UpdateAck deliverDiscoveryResponse( const Protobuf::RepeatedPtrField& added_resources, const Protobuf::RepeatedPtrField& removed_resources, @@ -61,7 +96,7 @@ class DeltaSubscriptionStateTestBase : public testing::Test { EXPECT_EQ(added.size(), *updated_resources); } })); - return state_.handleResponse(message); + return handleResponse(message); } UpdateAck deliverBadDiscoveryResponse( @@ -74,15 +109,33 @@ class DeltaSubscriptionStateTestBase : public testing::Test { message.set_system_version_info(version_info); message.set_nonce(nonce); EXPECT_CALL(callbacks_, onConfigUpdate(_, _, _)).WillOnce(Throw(EnvoyException(error_message))); - return state_.handleResponse(message); + return handleResponse(message); + } + + void markStreamFresh() { + if (should_use_unified_) { + absl::get<1>(state_)->markStreamFresh(); + } else { + absl::get<0>(state_)->markStreamFresh(); + } + } + + bool subscriptionUpdatePending() { + if (should_use_unified_) { + return absl::get<1>(state_)->subscriptionUpdatePending(); + } + return absl::get<0>(state_)->subscriptionUpdatePending(); } NiceMock callbacks_; NiceMock local_info_; NiceMock dispatcher_; - Event::MockTimer* timer_; + Event::MockTimer* ttl_timer_; // We start out interested in three resources: name1, name2, and name3. - DeltaSubscriptionState state_; + absl::variant, + std::unique_ptr> + state_; + bool should_use_unified_; }; Protobuf::RepeatedPtrField @@ -98,30 +151,35 @@ populateRepeatedResource(std::vector> items) class DeltaSubscriptionStateTest : public DeltaSubscriptionStateTestBase { public: - DeltaSubscriptionStateTest() : DeltaSubscriptionStateTestBase(TypeUrl, false) {} + DeltaSubscriptionStateTest() : DeltaSubscriptionStateTestBase(TypeUrl, false, GetParam()) {} }; +INSTANTIATE_TEST_SUITE_P(DeltaSubscriptionStateTest, DeltaSubscriptionStateTest, + testing::ValuesIn({LegacyOrUnified::Legacy, LegacyOrUnified::Unified})); + // Delta subscription state of a wildcard subscription request. class WildcardDeltaSubscriptionStateTest : public DeltaSubscriptionStateTestBase { public: - WildcardDeltaSubscriptionStateTest() : DeltaSubscriptionStateTestBase(TypeUrl, true, {}) {} + WildcardDeltaSubscriptionStateTest() + : DeltaSubscriptionStateTestBase(TypeUrl, true, GetParam(), {}) {} }; +INSTANTIATE_TEST_SUITE_P(WildcardDeltaSubscriptionStateTest, WildcardDeltaSubscriptionStateTest, + testing::ValuesIn({LegacyOrUnified::Legacy, LegacyOrUnified::Unified})); + // Basic gaining/losing interest in resources should lead to subscription updates. -TEST_F(DeltaSubscriptionStateTest, SubscribeAndUnsubscribe) { +TEST_P(DeltaSubscriptionStateTest, SubscribeAndUnsubscribe) { { - state_.updateSubscriptionInterest({"name4"}, {"name1"}); - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = - state_.getNextRequestAckless(); - EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name4")); - EXPECT_THAT(cur_request.resource_names_unsubscribe(), UnorderedElementsAre("name1")); + updateSubscriptionInterest({"name4"}, {"name1"}); + auto cur_request = getNextRequestAckless(); + EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name4")); + EXPECT_THAT(cur_request->resource_names_unsubscribe(), UnorderedElementsAre("name1")); } { - state_.updateSubscriptionInterest({"name1"}, {"name3", "name4"}); - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = - state_.getNextRequestAckless(); - EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name1")); - EXPECT_THAT(cur_request.resource_names_unsubscribe(), UnorderedElementsAre("name3", "name4")); + updateSubscriptionInterest({"name1"}, {"name3", "name4"}); + auto cur_request = getNextRequestAckless(); + EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name1")); + EXPECT_THAT(cur_request->resource_names_unsubscribe(), UnorderedElementsAre("name3", "name4")); } } @@ -134,12 +192,12 @@ TEST_F(DeltaSubscriptionStateTest, SubscribeAndUnsubscribe) { // again. So, we *should* send a request "re-"subscribing. This means that the server needs to // interpret the resource_names_subscribe field as "send these resources even if you think Envoy // already has them". -TEST_F(DeltaSubscriptionStateTest, RemoveThenAdd) { - state_.updateSubscriptionInterest({}, {"name3"}); - state_.updateSubscriptionInterest({"name3"}, {}); - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); - EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name3")); - EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); +TEST_P(DeltaSubscriptionStateTest, RemoveThenAdd) { + updateSubscriptionInterest({}, {"name3"}); + updateSubscriptionInterest({"name3"}, {}); + auto cur_request = getNextRequestAckless(); + EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name3")); + EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty()); } // Due to how our implementation provides the required behavior tested in RemoveThenAdd, the @@ -150,62 +208,62 @@ TEST_F(DeltaSubscriptionStateTest, RemoveThenAdd) { // This test is just here to illustrate that this behavior exists, not to enforce that it // should be like this. What *is* important: the server must happily and cleanly ignore // "unsubscribe from [resource name I have never before referred to]" requests. -TEST_F(DeltaSubscriptionStateTest, AddThenRemove) { - state_.updateSubscriptionInterest({"name4"}, {}); - state_.updateSubscriptionInterest({}, {"name4"}); - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); - EXPECT_TRUE(cur_request.resource_names_subscribe().empty()); - EXPECT_THAT(cur_request.resource_names_unsubscribe(), UnorderedElementsAre("name4")); +TEST_P(DeltaSubscriptionStateTest, AddThenRemove) { + updateSubscriptionInterest({"name4"}, {}); + updateSubscriptionInterest({}, {"name4"}); + auto cur_request = getNextRequestAckless(); + EXPECT_TRUE(cur_request->resource_names_subscribe().empty()); + EXPECT_THAT(cur_request->resource_names_unsubscribe(), UnorderedElementsAre("name4")); } // add/remove/add == add. -TEST_F(DeltaSubscriptionStateTest, AddRemoveAdd) { - state_.updateSubscriptionInterest({"name4"}, {}); - state_.updateSubscriptionInterest({}, {"name4"}); - state_.updateSubscriptionInterest({"name4"}, {}); - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); - EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name4")); - EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); +TEST_P(DeltaSubscriptionStateTest, AddRemoveAdd) { + updateSubscriptionInterest({"name4"}, {}); + updateSubscriptionInterest({}, {"name4"}); + updateSubscriptionInterest({"name4"}, {}); + auto cur_request = getNextRequestAckless(); + EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name4")); + EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty()); } // remove/add/remove == remove. -TEST_F(DeltaSubscriptionStateTest, RemoveAddRemove) { - state_.updateSubscriptionInterest({}, {"name3"}); - state_.updateSubscriptionInterest({"name3"}, {}); - state_.updateSubscriptionInterest({}, {"name3"}); - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); - EXPECT_TRUE(cur_request.resource_names_subscribe().empty()); - EXPECT_THAT(cur_request.resource_names_unsubscribe(), UnorderedElementsAre("name3")); +TEST_P(DeltaSubscriptionStateTest, RemoveAddRemove) { + updateSubscriptionInterest({}, {"name3"}); + updateSubscriptionInterest({"name3"}, {}); + updateSubscriptionInterest({}, {"name3"}); + auto cur_request = getNextRequestAckless(); + EXPECT_TRUE(cur_request->resource_names_subscribe().empty()); + EXPECT_THAT(cur_request->resource_names_unsubscribe(), UnorderedElementsAre("name3")); } // Starts with 1,2,3. 4 is added/removed/added. In those same updates, 1,2,3 are // removed/added/removed. End result should be 4 added and 1,2,3 removed. -TEST_F(DeltaSubscriptionStateTest, BothAddAndRemove) { - state_.updateSubscriptionInterest({"name4"}, {"name1", "name2", "name3"}); - state_.updateSubscriptionInterest({"name1", "name2", "name3"}, {"name4"}); - state_.updateSubscriptionInterest({"name4"}, {"name1", "name2", "name3"}); - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); - EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name4")); - EXPECT_THAT(cur_request.resource_names_unsubscribe(), +TEST_P(DeltaSubscriptionStateTest, BothAddAndRemove) { + updateSubscriptionInterest({"name4"}, {"name1", "name2", "name3"}); + updateSubscriptionInterest({"name1", "name2", "name3"}, {"name4"}); + updateSubscriptionInterest({"name4"}, {"name1", "name2", "name3"}); + auto cur_request = getNextRequestAckless(); + EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name4")); + EXPECT_THAT(cur_request->resource_names_unsubscribe(), UnorderedElementsAre("name1", "name2", "name3")); } -TEST_F(DeltaSubscriptionStateTest, CumulativeUpdates) { - state_.updateSubscriptionInterest({"name4"}, {}); - state_.updateSubscriptionInterest({"name5"}, {}); - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); - EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name4", "name5")); - EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); +TEST_P(DeltaSubscriptionStateTest, CumulativeUpdates) { + updateSubscriptionInterest({"name4"}, {}); + updateSubscriptionInterest({"name5"}, {}); + auto cur_request = getNextRequestAckless(); + EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name4", "name5")); + EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty()); } // Verifies that a sequence of good and bad responses from the server all get the appropriate // ACKs/NACKs from Envoy. -TEST_F(DeltaSubscriptionStateTest, AckGenerated) { +TEST_P(DeltaSubscriptionStateTest, AckGenerated) { // The xDS server's first response includes items for name1 and 2, but not 3. { Protobuf::RepeatedPtrField added_resources = populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}}); - EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(*ttl_timer_, disableTimer()); UpdateAck ack = deliverDiscoveryResponse(added_resources, {}, "debug1", "nonce1"); EXPECT_EQ("nonce1", ack.nonce_); EXPECT_EQ(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); @@ -215,7 +273,7 @@ TEST_F(DeltaSubscriptionStateTest, AckGenerated) { Protobuf::RepeatedPtrField added_resources = populateRepeatedResource( {{"name1", "version1B"}, {"name2", "version2B"}, {"name3", "version3A"}}); - EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(*ttl_timer_, disableTimer()); UpdateAck ack = deliverDiscoveryResponse(added_resources, {}, "debug2", "nonce2"); EXPECT_EQ("nonce2", ack.nonce_); EXPECT_EQ(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); @@ -225,7 +283,7 @@ TEST_F(DeltaSubscriptionStateTest, AckGenerated) { Protobuf::RepeatedPtrField added_resources = populateRepeatedResource( {{"name1", "version1C"}, {"name2", "version2C"}, {"name3", "version3B"}}); - EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(*ttl_timer_, disableTimer()); UpdateAck ack = deliverBadDiscoveryResponse(added_resources, {}, "debug3", "nonce3", "oh no"); EXPECT_EQ("nonce3", ack.nonce_); EXPECT_NE(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); @@ -235,7 +293,7 @@ TEST_F(DeltaSubscriptionStateTest, AckGenerated) { Protobuf::RepeatedPtrField added_resources = populateRepeatedResource( {{"name1", "version1D"}, {"name2", "version2D"}, {"name3", "version3C"}}); - EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(*ttl_timer_, disableTimer()); UpdateAck ack = deliverDiscoveryResponse(added_resources, {}, "debug4", "nonce4"); EXPECT_EQ("nonce4", ack.nonce_); EXPECT_EQ(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); @@ -246,7 +304,7 @@ TEST_F(DeltaSubscriptionStateTest, AckGenerated) { Protobuf::RepeatedPtrField added_resources = populateRepeatedResource( {{"name1", "version1D"}, {"name2", "version2D"}, {"name3", "version3D"}}); - EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(*ttl_timer_, disableTimer()); UpdateAck ack = deliverBadDiscoveryResponse(added_resources, {}, "debug5", "nonce5", very_large_error_message); EXPECT_EQ("nonce5", ack.nonce_); @@ -261,20 +319,19 @@ TEST_F(DeltaSubscriptionStateTest, AckGenerated) { // 1) resources we have a version of are present in the map, // 2) resources we are interested in but don't have are not present, and // 3) resources we have lost interest in are not present. -TEST_F(DeltaSubscriptionStateTest, ResourceGoneLeadsToBlankInitialVersion) { +TEST_P(DeltaSubscriptionStateTest, ResourceGoneLeadsToBlankInitialVersion) { { // The xDS server's first update includes items for name1 and 2, but not 3. Protobuf::RepeatedPtrField add1_2 = populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}}); - EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(*ttl_timer_, disableTimer()); deliverDiscoveryResponse(add1_2, {}, "debugversion1"); - state_.markStreamFresh(); // simulate a stream reconnection - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = - state_.getNextRequestAckless(); - EXPECT_EQ("version1A", cur_request.initial_resource_versions().at("name1")); - EXPECT_EQ("version2A", cur_request.initial_resource_versions().at("name2")); - EXPECT_EQ(cur_request.initial_resource_versions().end(), - cur_request.initial_resource_versions().find("name3")); + markStreamFresh(); // simulate a stream reconnection + auto cur_request = getNextRequestAckless(); + EXPECT_EQ("version1A", cur_request->initial_resource_versions().at("name1")); + EXPECT_EQ("version2A", cur_request->initial_resource_versions().at("name2")); + EXPECT_EQ(cur_request->initial_resource_versions().end(), + cur_request->initial_resource_versions().find("name3")); } { @@ -283,15 +340,14 @@ TEST_F(DeltaSubscriptionStateTest, ResourceGoneLeadsToBlankInitialVersion) { populateRepeatedResource({{"name1", "version1B"}, {"name3", "version3A"}}); Protobuf::RepeatedPtrField remove2; *remove2.Add() = "name2"; - EXPECT_CALL(*timer_, disableTimer()).Times(2); + EXPECT_CALL(*ttl_timer_, disableTimer()).Times(2); deliverDiscoveryResponse(add1_3, remove2, "debugversion2"); - state_.markStreamFresh(); // simulate a stream reconnection - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = - state_.getNextRequestAckless(); - EXPECT_EQ("version1B", cur_request.initial_resource_versions().at("name1")); - EXPECT_EQ(cur_request.initial_resource_versions().end(), - cur_request.initial_resource_versions().find("name2")); - EXPECT_EQ("version3A", cur_request.initial_resource_versions().at("name3")); + markStreamFresh(); // simulate a stream reconnection + auto cur_request = getNextRequestAckless(); + EXPECT_EQ("version1B", cur_request->initial_resource_versions().at("name1")); + EXPECT_EQ(cur_request->initial_resource_versions().end(), + cur_request->initial_resource_versions().find("name2")); + EXPECT_EQ("version3A", cur_request->initial_resource_versions().at("name3")); } { @@ -300,20 +356,18 @@ TEST_F(DeltaSubscriptionStateTest, ResourceGoneLeadsToBlankInitialVersion) { *remove1_3.Add() = "name1"; *remove1_3.Add() = "name3"; deliverDiscoveryResponse({}, remove1_3, "debugversion3"); - state_.markStreamFresh(); // simulate a stream reconnection - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = - state_.getNextRequestAckless(); - EXPECT_TRUE(cur_request.initial_resource_versions().empty()); + markStreamFresh(); // simulate a stream reconnection + auto cur_request = getNextRequestAckless(); + EXPECT_TRUE(cur_request->initial_resource_versions().empty()); } { // ...but our own map should remember our interest. In particular, losing interest in a // resource should cause its name to appear in the next request's resource_names_unsubscribe. - state_.updateSubscriptionInterest({"name4"}, {"name1", "name2"}); - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = - state_.getNextRequestAckless(); - EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name4")); - EXPECT_THAT(cur_request.resource_names_unsubscribe(), UnorderedElementsAre("name1", "name2")); + updateSubscriptionInterest({"name4"}, {"name1", "name2"}); + auto cur_request = getNextRequestAckless(); + EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name4")); + EXPECT_THAT(cur_request->resource_names_unsubscribe(), UnorderedElementsAre("name1", "name2")); } } @@ -326,24 +380,24 @@ TEST_F(DeltaSubscriptionStateTest, ResourceGoneLeadsToBlankInitialVersion) { // in between the last request of the last stream and the first request of the new stream, Envoy // lost interest in a resource. The unsubscription implicitly takes effect by simply saying // nothing about the resource in the newly reconnected stream. -TEST_F(DeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect) { +TEST_P(DeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect) { Protobuf::RepeatedPtrField add1_2 = populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}}); - EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(*ttl_timer_, disableTimer()); deliverDiscoveryResponse(add1_2, {}, "debugversion1"); - state_.updateSubscriptionInterest({"name4"}, {"name1"}); - state_.markStreamFresh(); // simulate a stream reconnection - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); + updateSubscriptionInterest({"name4"}, {"name1"}); + markStreamFresh(); // simulate a stream reconnection + auto cur_request = getNextRequestAckless(); // Regarding the resource_names_subscribe field: // name1: do not include: we lost interest. // name2: yes do include: we are interested, its non-wildcard, and we have a version of it. // name3: yes do include: even though we don't have a version of it, we are interested. // name4: yes do include: we are newly interested. (If this wasn't a stream reconnect, only // name4 would belong in this subscribe field). - EXPECT_THAT(cur_request.resource_names_subscribe(), + EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name2", "name3", "name4")); - EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); + EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty()); } // For wildcard subscription, upon a reconnection, the server is supposed to assume a @@ -353,77 +407,75 @@ TEST_F(DeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect) { // last stream and the first request of the new stream, Envoy gained or lost interest in a // resource. The subscription & unsubscription implicitly takes effect by simply requesting a // wildcard subscription in the newly reconnected stream. -TEST_F(WildcardDeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect) { +TEST_P(WildcardDeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect) { Protobuf::RepeatedPtrField add1_2 = populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}}); - EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(*ttl_timer_, disableTimer()); deliverDiscoveryResponse(add1_2, {}, "debugversion1"); - state_.updateSubscriptionInterest({"name3"}, {"name1"}); - state_.markStreamFresh(); // simulate a stream reconnection - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); + updateSubscriptionInterest({"name3"}, {"name1"}); + markStreamFresh(); // simulate a stream reconnection + auto cur_request = getNextRequestAckless(); // Regarding the resource_names_subscribe field: // name1: do not include: we lost interest. // name2: do not include: we are interested, but for wildcard it shouldn't be provided. // name4: do not include: although we are newly interested, an initial wildcard request // must be with no resources. - EXPECT_TRUE(cur_request.resource_names_subscribe().empty()); - EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); + EXPECT_TRUE(cur_request->resource_names_subscribe().empty()); + EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty()); } // initial_resource_versions should not be present on messages after the first in a stream. -TEST_F(DeltaSubscriptionStateTest, InitialVersionMapFirstMessageOnly) { +TEST_P(DeltaSubscriptionStateTest, InitialVersionMapFirstMessageOnly) { // First, verify that the first message of a new stream sends initial versions. { // The xDS server's first update gives us all three resources. Protobuf::RepeatedPtrField add_all = populateRepeatedResource( {{"name1", "version1A"}, {"name2", "version2A"}, {"name3", "version3A"}}); - EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(*ttl_timer_, disableTimer()); deliverDiscoveryResponse(add_all, {}, "debugversion1"); - state_.markStreamFresh(); // simulate a stream reconnection - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = - state_.getNextRequestAckless(); - EXPECT_EQ("version1A", cur_request.initial_resource_versions().at("name1")); - EXPECT_EQ("version2A", cur_request.initial_resource_versions().at("name2")); - EXPECT_EQ("version3A", cur_request.initial_resource_versions().at("name3")); + markStreamFresh(); // simulate a stream reconnection + auto cur_request = getNextRequestAckless(); + EXPECT_EQ("version1A", cur_request->initial_resource_versions().at("name1")); + EXPECT_EQ("version2A", cur_request->initial_resource_versions().at("name2")); + EXPECT_EQ("version3A", cur_request->initial_resource_versions().at("name3")); } // Then, after updating the resources but not reconnecting the stream, verify that initial // versions are not sent. { - state_.updateSubscriptionInterest({"name4"}, {}); + updateSubscriptionInterest({"name4"}, {}); // The xDS server updates our resources, and gives us our newly requested one too. Protobuf::RepeatedPtrField add_all = populateRepeatedResource({{"name1", "version1B"}, {"name2", "version2B"}, {"name3", "version3B"}, {"name4", "version4A"}}); - EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(*ttl_timer_, disableTimer()); deliverDiscoveryResponse(add_all, {}, "debugversion2"); - envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = - state_.getNextRequestAckless(); - EXPECT_TRUE(cur_request.initial_resource_versions().empty()); + auto cur_request = getNextRequestAckless(); + EXPECT_TRUE(cur_request->initial_resource_versions().empty()); } } -TEST_F(DeltaSubscriptionStateTest, CheckUpdatePending) { +TEST_P(DeltaSubscriptionStateTest, CheckUpdatePending) { // Note that the test fixture ctor causes the first request to be "sent", so we start in the // middle of a stream, with our initially interested resources having been requested already. - EXPECT_FALSE(state_.subscriptionUpdatePending()); - state_.updateSubscriptionInterest({}, {}); // no change - EXPECT_FALSE(state_.subscriptionUpdatePending()); - state_.markStreamFresh(); - EXPECT_TRUE(state_.subscriptionUpdatePending()); // no change, BUT fresh stream - state_.updateSubscriptionInterest({}, {"name3"}); // one removed - EXPECT_TRUE(state_.subscriptionUpdatePending()); - state_.updateSubscriptionInterest({"name3"}, {}); // one added - EXPECT_TRUE(state_.subscriptionUpdatePending()); + EXPECT_FALSE(subscriptionUpdatePending()); + updateSubscriptionInterest({}, {}); // no change + EXPECT_FALSE(subscriptionUpdatePending()); + markStreamFresh(); + EXPECT_TRUE(subscriptionUpdatePending()); // no change, BUT fresh stream + updateSubscriptionInterest({}, {"name3"}); // one removed + EXPECT_TRUE(subscriptionUpdatePending()); + updateSubscriptionInterest({"name3"}, {}); // one added + EXPECT_TRUE(subscriptionUpdatePending()); } // The next three tests test that duplicate resource names (whether additions or removals) cause // DeltaSubscriptionState to reject the update without even trying to hand it to the consuming // API's onConfigUpdate(). -TEST_F(DeltaSubscriptionStateTest, DuplicatedAdd) { +TEST_P(DeltaSubscriptionStateTest, DuplicatedAdd) { Protobuf::RepeatedPtrField additions = populateRepeatedResource({{"name1", "version1A"}, {"name1", "sdfsdfsdfds"}}); UpdateAck ack = deliverDiscoveryResponse(additions, {}, "debugversion1", absl::nullopt, false); @@ -431,7 +483,7 @@ TEST_F(DeltaSubscriptionStateTest, DuplicatedAdd) { ack.error_detail_.message()); } -TEST_F(DeltaSubscriptionStateTest, DuplicatedRemove) { +TEST_P(DeltaSubscriptionStateTest, DuplicatedRemove) { Protobuf::RepeatedPtrField removals; *removals.Add() = "name1"; *removals.Add() = "name1"; @@ -440,7 +492,7 @@ TEST_F(DeltaSubscriptionStateTest, DuplicatedRemove) { ack.error_detail_.message()); } -TEST_F(DeltaSubscriptionStateTest, AddedAndRemoved) { +TEST_P(DeltaSubscriptionStateTest, AddedAndRemoved) { Protobuf::RepeatedPtrField additions = populateRepeatedResource({{"name1", "version1A"}}); Protobuf::RepeatedPtrField removals; @@ -451,7 +503,7 @@ TEST_F(DeltaSubscriptionStateTest, AddedAndRemoved) { ack.error_detail_.message()); } -TEST_F(DeltaSubscriptionStateTest, ResourceTTL) { +TEST_P(DeltaSubscriptionStateTest, ResourceTTL) { Event::SimulatedTimeSystem time_system; time_system.setSystemTime(std::chrono::milliseconds(0)); @@ -476,53 +528,82 @@ TEST_F(DeltaSubscriptionStateTest, ResourceTTL) { }; { - EXPECT_CALL(*timer_, enabled()); - EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(1000), _)); + EXPECT_CALL(*ttl_timer_, enabled()); + EXPECT_CALL(*ttl_timer_, enableTimer(std::chrono::milliseconds(1000), _)); deliverDiscoveryResponse(create_resource_with_ttl(std::chrono::seconds(1), true), {}, "debug1", "nonce1"); } { // Increase the TTL. - EXPECT_CALL(*timer_, enabled()); - EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(2000), _)); + EXPECT_CALL(*ttl_timer_, enabled()); + EXPECT_CALL(*ttl_timer_, enableTimer(std::chrono::milliseconds(2000), _)); deliverDiscoveryResponse(create_resource_with_ttl(std::chrono::seconds(2), true), {}, "debug1", "nonce1", true, 1); } { // Refresh the TTL with a heartbeat. The resource should not be passed to the update callbacks. - EXPECT_CALL(*timer_, enabled()); + EXPECT_CALL(*ttl_timer_, enabled()); deliverDiscoveryResponse(create_resource_with_ttl(std::chrono::seconds(2), false), {}, "debug1", "nonce1", true, 0); } // Remove the TTL. - EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(*ttl_timer_, disableTimer()); deliverDiscoveryResponse(create_resource_with_ttl(absl::nullopt, true), {}, "debug1", "nonce1", true, 1); // Add back the TTL. - EXPECT_CALL(*timer_, enabled()); - EXPECT_CALL(*timer_, enableTimer(_, _)); + EXPECT_CALL(*ttl_timer_, enabled()); + EXPECT_CALL(*ttl_timer_, enableTimer(_, _)); deliverDiscoveryResponse(create_resource_with_ttl(std::chrono::seconds(2), true), {}, "debug1", "nonce1"); EXPECT_CALL(callbacks_, onConfigUpdate(_, _, _)); - EXPECT_CALL(*timer_, disableTimer()); + EXPECT_CALL(*ttl_timer_, disableTimer()); time_system.setSystemTime(std::chrono::seconds(2)); // Invoke the TTL. - timer_->invokeCallback(); + ttl_timer_->invokeCallback(); +} + +TEST_P(DeltaSubscriptionStateTest, TypeUrlMismatch) { + envoy::service::discovery::v3::DeltaDiscoveryResponse message; + + Protobuf::RepeatedPtrField additions; + auto* resource = additions.Add(); + resource->set_name("name1"); + resource->set_version("version1"); + resource->mutable_resource()->set_type_url("foo"); + + *message.mutable_resources() = additions; + *message.mutable_removed_resources() = {}; + message.set_system_version_info("version1"); + message.set_nonce("nonce1"); + message.set_type_url("bar"); + + EXPECT_CALL(callbacks_, + onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, _)) + .WillOnce(Invoke([](Envoy::Config::ConfigUpdateFailureReason, const EnvoyException* e) { + EXPECT_TRUE(IsSubstring("", "", + "type URL foo embedded in an individual Any does not match the " + "message-wide type URL bar", + e->what())); + })); + handleResponse(message); } class VhdsDeltaSubscriptionStateTest : public DeltaSubscriptionStateTestBase { public: VhdsDeltaSubscriptionStateTest() - : DeltaSubscriptionStateTestBase("envoy.config.route.v3.VirtualHost", false) {} + : DeltaSubscriptionStateTestBase("envoy.config.route.v3.VirtualHost", false, GetParam()) {} }; -TEST_F(VhdsDeltaSubscriptionStateTest, ResourceTTL) { +INSTANTIATE_TEST_SUITE_P(VhdsDeltaSubscriptionStateTest, VhdsDeltaSubscriptionStateTest, + testing::ValuesIn({LegacyOrUnified::Legacy, LegacyOrUnified::Unified})); + +TEST_P(VhdsDeltaSubscriptionStateTest, ResourceTTL) { Event::SimulatedTimeSystem time_system; time_system.setSystemTime(std::chrono::milliseconds(0)); @@ -545,12 +626,12 @@ TEST_F(VhdsDeltaSubscriptionStateTest, ResourceTTL) { return added_resources; }; - EXPECT_CALL(*timer_, enabled()); - EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(1000), _)); + EXPECT_CALL(*ttl_timer_, enabled()); + EXPECT_CALL(*ttl_timer_, enableTimer(std::chrono::milliseconds(1000), _)); deliverDiscoveryResponse(create_resource_with_ttl(true), {}, "debug1", "nonce1", true, 1); // Heartbeat update should not be propagated to the subscription callback. - EXPECT_CALL(*timer_, enabled()); + EXPECT_CALL(*ttl_timer_, enabled()); deliverDiscoveryResponse(create_resource_with_ttl(false), {}, "debug1", "nonce1", true, 0); // When runtime flag is disabled, maintain old behavior where we do propagate @@ -558,7 +639,7 @@ TEST_F(VhdsDeltaSubscriptionStateTest, ResourceTTL) { Runtime::LoaderSingleton::getExisting()->mergeValues( {{"envoy.reloadable_features.vhds_heartbeats", "false"}}); - EXPECT_CALL(*timer_, enabled()); + EXPECT_CALL(*ttl_timer_, enabled()); deliverDiscoveryResponse(create_resource_with_ttl(false), {}, "debug1", "nonce1", true, 1); } diff --git a/test/common/config/sotw_subscription_state_test.cc b/test/common/config/sotw_subscription_state_test.cc new file mode 100644 index 0000000000000..b7d5b5bae581d --- /dev/null +++ b/test/common/config/sotw_subscription_state_test.cc @@ -0,0 +1,307 @@ +#include "envoy/config/endpoint/v3/endpoint.pb.h" +#include "envoy/config/endpoint/v3/endpoint.pb.validate.h" + +#include "source/common/config/resource_name.h" +#include "source/common/config/utility.h" +#include "source/common/config/xds_mux/sotw_subscription_state.h" +#include "source/common/stats/isolated_store_impl.h" + +#include "test/mocks/config/mocks.h" +#include "test/mocks/event/mocks.h" +#include "test/mocks/local_info/mocks.h" +#include "test/test_common/simulated_time_system.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::IsSubstring; +using testing::NiceMock; +using testing::Throw; +using testing::UnorderedElementsAre; + +namespace Envoy { +namespace Config { +namespace { + +class SotwSubscriptionStateTest : public testing::Test { +protected: + SotwSubscriptionStateTest() : resource_decoder_("cluster_name") { + ttl_timer_ = new Event::MockTimer(&dispatcher_); + state_ = std::make_unique( + Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3), + callbacks_, dispatcher_, resource_decoder_); + state_->updateSubscriptionInterest({"name1", "name2", "name3"}, {}); + auto cur_request = getNextDiscoveryRequestAckless(); + EXPECT_THAT(cur_request->resource_names(), UnorderedElementsAre("name1", "name2", "name3")); + } + + std::unique_ptr + getNextDiscoveryRequestAckless() { + return state_->getNextRequestAckless(); + } + + envoy::service::discovery::v3::Resource heartbeatResource(std::chrono::milliseconds ttl, + const std::string& name) { + envoy::service::discovery::v3::Resource resource; + resource.mutable_ttl()->CopyFrom(Protobuf::util::TimeUtil::MillisecondsToDuration(ttl.count())); + resource.set_name(name); + return resource; + } + + envoy::service::discovery::v3::Resource + resourceWithTtl(std::chrono::milliseconds ttl, + const envoy::config::endpoint::v3::ClusterLoadAssignment& cla) { + envoy::service::discovery::v3::Resource resource; + resource.mutable_resource()->PackFrom(cla); + resource.mutable_ttl()->CopyFrom(Protobuf::util::TimeUtil::MillisecondsToDuration(ttl.count())); + resource.set_name(cla.cluster_name()); + return resource; + } + + const envoy::config::endpoint::v3::ClusterLoadAssignment + resource(const std::string& cluster_name) { + envoy::config::endpoint::v3::ClusterLoadAssignment resource; + resource.set_cluster_name(cluster_name); + return resource; + } + + UpdateAck deliverDiscoveryResponse(const std::vector& resource_names, + const std::string& version_info, const std::string& nonce) { + envoy::service::discovery::v3::DiscoveryResponse response; + response.set_version_info(version_info); + response.set_nonce(nonce); + response.set_type_url(Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3)); + for (const auto& resource_name : resource_names) { + response.add_resources()->PackFrom(resource(resource_name)); + } + EXPECT_CALL(callbacks_, onConfigUpdate(_, version_info)); + return state_->handleResponse(response); + } + + UpdateAck + deliverDiscoveryResponseWithTtlResource(const envoy::service::discovery::v3::Resource& resource, + const std::string& version_info, + const std::string& nonce) { + envoy::service::discovery::v3::DiscoveryResponse response; + response.set_version_info(version_info); + response.set_nonce(nonce); + response.set_type_url(Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3)); + response.add_resources()->PackFrom(resource); + EXPECT_CALL(callbacks_, onConfigUpdate(_, version_info)); + return state_->handleResponse(response); + } + + UpdateAck deliverBadDiscoveryResponse(const std::string& version_info, const std::string& nonce) { + envoy::service::discovery::v3::DiscoveryResponse message; + message.set_version_info(version_info); + message.set_nonce(nonce); + EXPECT_CALL(callbacks_, onConfigUpdate(_, _)).WillOnce(Throw(EnvoyException("oh no"))); + return state_->handleResponse(message); + } + + NiceMock callbacks_; + TestUtility::TestOpaqueResourceDecoderImpl + resource_decoder_; + NiceMock dispatcher_; + Event::MockTimer* ttl_timer_; + // We start out interested in three resources: name1, name2, and name3. + std::unique_ptr state_; +}; + +// Basic gaining/losing interest in resources should lead to changes in subscriptions. +TEST_F(SotwSubscriptionStateTest, SubscribeAndUnsubscribe) { + { + state_->updateSubscriptionInterest({"name4"}, {"name1"}); + auto cur_request = getNextDiscoveryRequestAckless(); + EXPECT_THAT(cur_request->resource_names(), UnorderedElementsAre("name2", "name3", "name4")); + } + { + state_->updateSubscriptionInterest({"name1"}, {"name3", "name4"}); + auto cur_request = getNextDiscoveryRequestAckless(); + EXPECT_THAT(cur_request->resource_names(), UnorderedElementsAre("name1", "name2")); + } +} + +// Unlike delta, if SotW gets multiple interest updates before being able to send a request, they +// all collapse to a single update. However, even if the updates all cancel each other out, there +// still will be a request generated. All of the following tests explore different such cases. +TEST_F(SotwSubscriptionStateTest, RemoveThenAdd) { + state_->updateSubscriptionInterest({}, {"name3"}); + state_->updateSubscriptionInterest({"name3"}, {}); + auto cur_request = getNextDiscoveryRequestAckless(); + EXPECT_THAT(cur_request->resource_names(), UnorderedElementsAre("name1", "name2", "name3")); +} + +TEST_F(SotwSubscriptionStateTest, AddThenRemove) { + state_->updateSubscriptionInterest({"name4"}, {}); + state_->updateSubscriptionInterest({}, {"name4"}); + auto cur_request = getNextDiscoveryRequestAckless(); + EXPECT_THAT(cur_request->resource_names(), UnorderedElementsAre("name1", "name2", "name3")); +} + +TEST_F(SotwSubscriptionStateTest, AddRemoveAdd) { + state_->updateSubscriptionInterest({"name4"}, {}); + state_->updateSubscriptionInterest({}, {"name4"}); + state_->updateSubscriptionInterest({"name4"}, {}); + auto cur_request = getNextDiscoveryRequestAckless(); + EXPECT_THAT(cur_request->resource_names(), + UnorderedElementsAre("name1", "name2", "name3", "name4")); +} + +TEST_F(SotwSubscriptionStateTest, RemoveAddRemove) { + state_->updateSubscriptionInterest({}, {"name3"}); + state_->updateSubscriptionInterest({"name3"}, {}); + state_->updateSubscriptionInterest({}, {"name3"}); + auto cur_request = getNextDiscoveryRequestAckless(); + EXPECT_THAT(cur_request->resource_names(), UnorderedElementsAre("name1", "name2")); +} + +TEST_F(SotwSubscriptionStateTest, BothAddAndRemove) { + state_->updateSubscriptionInterest({"name4"}, {"name1", "name2", "name3"}); + state_->updateSubscriptionInterest({"name1", "name2", "name3"}, {"name4"}); + state_->updateSubscriptionInterest({"name4"}, {"name1", "name2", "name3"}); + auto cur_request = getNextDiscoveryRequestAckless(); + EXPECT_THAT(cur_request->resource_names(), UnorderedElementsAre("name4")); +} + +TEST_F(SotwSubscriptionStateTest, CumulativeUpdates) { + state_->updateSubscriptionInterest({"name4"}, {}); + state_->updateSubscriptionInterest({"name5"}, {}); + auto cur_request = getNextDiscoveryRequestAckless(); + EXPECT_THAT(cur_request->resource_names(), + UnorderedElementsAre("name1", "name2", "name3", "name4", "name5")); +} + +TEST_F(SotwSubscriptionStateTest, LastUpdateNonceAndVersionUsed) { + EXPECT_CALL(*ttl_timer_, disableTimer()); + deliverDiscoveryResponse({"name1", "name2"}, "version1", "nonce1"); + state_->updateSubscriptionInterest({"name3"}, {}); + auto cur_request = getNextDiscoveryRequestAckless(); + EXPECT_EQ("nonce1", cur_request->response_nonce()); + EXPECT_EQ("version1", cur_request->version_info()); +} + +// Verifies that a sequence of good and bad responses from the server all get the appropriate +// ACKs/NACKs from Envoy. +TEST_F(SotwSubscriptionStateTest, AckGenerated) { + // The xDS server's first response includes items for name1 and 2, but not 3. + { + EXPECT_CALL(*ttl_timer_, disableTimer()); + UpdateAck ack = deliverDiscoveryResponse({"name1", "name2"}, "version1", "nonce1"); + EXPECT_EQ("nonce1", ack.nonce_); + EXPECT_EQ(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); + } + // The next response updates 1 and 2, and adds 3. + { + EXPECT_CALL(*ttl_timer_, disableTimer()); + UpdateAck ack = deliverDiscoveryResponse({"name1", "name2", "name3"}, "version2", "nonce2"); + EXPECT_EQ("nonce2", ack.nonce_); + EXPECT_EQ(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); + } + // The next response tries but fails to update all 3, and so should produce a NACK. + { + EXPECT_CALL(*ttl_timer_, disableTimer()); + UpdateAck ack = deliverBadDiscoveryResponse("version3", "nonce3"); + EXPECT_EQ("nonce3", ack.nonce_); + EXPECT_NE(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); + } + // The last response successfully updates all 3. + { + EXPECT_CALL(*ttl_timer_, disableTimer()); + UpdateAck ack = deliverDiscoveryResponse({"name1", "name2", "name3"}, "version4", "nonce4"); + EXPECT_EQ("nonce4", ack.nonce_); + EXPECT_EQ(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); + } +} + +TEST_F(SotwSubscriptionStateTest, CheckUpdatePending) { + // Note that the test fixture ctor causes the first request to be "sent", so we start in the + // middle of a stream, with our initially interested resources having been requested already. + EXPECT_FALSE(state_->subscriptionUpdatePending()); + state_->updateSubscriptionInterest({}, {}); // no change + EXPECT_FALSE(state_->subscriptionUpdatePending()); + state_->markStreamFresh(); + EXPECT_TRUE(state_->subscriptionUpdatePending()); // no change, BUT fresh stream + state_->updateSubscriptionInterest({}, {"name3"}); // one removed + EXPECT_TRUE(state_->subscriptionUpdatePending()); + state_->updateSubscriptionInterest({"name3"}, {}); // one added + EXPECT_TRUE(state_->subscriptionUpdatePending()); +} + +TEST_F(SotwSubscriptionStateTest, HandleEstablishmentFailure) { + // Although establishment failure is not supposed to cause an onConfigUpdateFailed() on the + // ultimate actual subscription callbacks, the callbacks reference held is actually to + // the WatchMap, which then calls GrpcSubscriptionImpl(s). It is the GrpcSubscriptionImpl + // that will decline to pass on an onConfigUpdateFailed(ConnectionFailure). + EXPECT_CALL(callbacks_, onConfigUpdateFailed(_, _)); + state_->handleEstablishmentFailure(); +} + +TEST_F(SotwSubscriptionStateTest, ResourceTTL) { + Event::SimulatedTimeSystem time_system; + time_system.setSystemTime(std::chrono::milliseconds(0)); + { + EXPECT_CALL(*ttl_timer_, enabled()); + EXPECT_CALL(*ttl_timer_, enableTimer(std::chrono::milliseconds(1000), _)); + deliverDiscoveryResponseWithTtlResource( + resourceWithTtl(std::chrono::seconds(1), resource("name1")), "debug1", "nonce1"); + } + + { + // Increase the TTL. + EXPECT_CALL(*ttl_timer_, enabled()); + EXPECT_CALL(*ttl_timer_, enableTimer(std::chrono::milliseconds(2000), _)); + deliverDiscoveryResponseWithTtlResource( + resourceWithTtl(std::chrono::seconds(2), resource("name1")), "debug1", "nonce1"); + } + + { + // Refresh the TTL with a heartbeat. The resource should not be passed to the update callbacks. + EXPECT_CALL(*ttl_timer_, enabled()); + deliverDiscoveryResponseWithTtlResource(heartbeatResource(std::chrono::seconds(2), "name1"), + "debug1", "nonce1"); + } + + // Remove the TTL. + EXPECT_CALL(*ttl_timer_, disableTimer()); + deliverDiscoveryResponse({"name1"}, "version1", "nonce1"); + + // Add back the TTL. + EXPECT_CALL(*ttl_timer_, enabled()); + EXPECT_CALL(*ttl_timer_, enableTimer(_, _)); + deliverDiscoveryResponseWithTtlResource( + resourceWithTtl(std::chrono::seconds(2), resource("name1")), "debug1", "nonce1"); + + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, _)); + EXPECT_CALL(*ttl_timer_, disableTimer()); + time_system.setSystemTime(std::chrono::seconds(2)); + + // Invoke the TTL. + ttl_timer_->invokeCallback(); +} + +TEST_F(SotwSubscriptionStateTest, TypeUrlMismatch) { + envoy::service::discovery::v3::DiscoveryResponse response; + response.set_version_info("version1"); + response.set_nonce("nonce1"); + response.set_type_url("badtypeurl"); + response.add_resources()->PackFrom(resource("resource")); + EXPECT_CALL(callbacks_, + onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, _)) + .WillOnce(Invoke([](Envoy::Config::ConfigUpdateFailureReason, const EnvoyException* e) { + EXPECT_TRUE(IsSubstring( + "", "", + "type URL type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment embedded " + "in an individual Any does not match the message-wide type URL badtypeurl", + e->what())); + })); + EXPECT_CALL(*ttl_timer_, disableTimer()); + state_->handleResponse(response); +} + +} // namespace +} // namespace Config +} // namespace Envoy