diff --git a/docs/root/api-docs/xds_protocol.rst b/docs/root/api-docs/xds_protocol.rst index 71c5f646627bb..8efa8ea81daaf 100644 --- a/docs/root/api-docs/xds_protocol.rst +++ b/docs/root/api-docs/xds_protocol.rst @@ -427,13 +427,18 @@ names becomes empty, that means that the client is no longer interested in any r specified type. For :ref:`Listener ` and :ref:`Cluster ` resource -types, there is also a "wildcard" mode, which is triggered when the initial request on the stream -for that resource type contains no resource names. In this case, the server should use +types, there is also a "wildcard" mode, which comes in two flavors. First flavor, implicit, is triggered when the initial request on the stream +for that resource type contains no resource names. Second flavor, explicit, is triggered when a request +(not necessarily an initial one on the stream) for that resource type contains (among other names) a special name "*". +For wildcard requests, the server should use site-specific business logic to determine the full set of resources that the client is interested -in, typically based on the client's :ref:`node ` identification. Note -that once a stream has entered wildcard mode for a given resource type, there is no way to change -the stream out of wildcard mode; resource names specified in any subsequent request on the stream -will be ignored. +in, typically based on the client's :ref:`node ` identification. +The client can opt out from the wildcard mode by unsubscribing from the "*" resource name. Note that +opting back into the wildcard mode can only be done with a request containing the "*" resource name, +thus switching into the explicit wildcard mode. Also note that if client wants to express +an interest in a resource name after sending an empty initial request on the stream, the client +needs to switch to the explicit wildcard mode, otherwise resource names specified in subsequent +request on the stream will be ignored. Client Behavior """"""""""""""" @@ -535,6 +540,8 @@ being requested by the client, and if one of those resources springs into existe server must send an update to the client informing it of the new resource. Clients that initially see a resource that does not exist must be prepared for the resource to be created at any time. +.. _xds_protocol_unsubscribing_from_resources: + Unsubscribing From Resources ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -553,7 +560,10 @@ Note that for :ref:`Listener ` and resource types where the stream is in "wildcard" mode (see :ref:`How the client specifies what resources to return ` for details), the set of resources being subscribed to is determined by the server instead of the client, so there is no mechanism -for the client to unsubscribe from resources. +for the client to unsubscribe from resources. The only resources that the client could unsubscribe +from are the resources that the client explicitly expressed the interest in before. Note that +the server may still send the resource to the client if the resource was also a part of the set +of resources determined by the server from the wildcard subscription. Requesting Multiple Resources on a Single Stream ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index b00d52cbde733..57d4290c2a017 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -8,6 +8,7 @@ Incompatible Behavior Changes * grpc_bridge_filter: the filter no longer collects grpc stats in favor of the existing grpc stats filter. The behavior can be reverted by changing runtime key ``envoy.reloadable_features.grpc_bridge_stats_disabled``. * tracing: update Apache SkyWalking tracer version to be compatible with 8.4.0 data collect protocol. This change will introduce incompatibility with SkyWalking 8.3.0. +* xds: added the explicit wildcard mode, which allows subscribing to specific resource names on top of a wildcard subscription. This means that the resource name ``*`` is reserved. This may mean that in some cases the initial wildcard subscription request on the stream will not be empty, but will have a non-empty list of resources with a special name among them. See :ref:`wildcard mode description ` and :ref:`unsubscribing from resources ` for details. Minor Behavior Changes ---------------------- diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index 14744078e69a9..1c0034f0e6ff5 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -39,7 +39,7 @@ namespace Envoy { namespace Upstream { /** - * ClusterUpdateCallbacks provide a way to exposes Cluster lifecycle events in the + * ClusterUpdateCallbacks provide a way to expose Cluster lifecycle events in the * ClusterManager. */ class ClusterUpdateCallbacks { @@ -72,6 +72,76 @@ class ClusterUpdateCallbacksHandle { using ClusterUpdateCallbacksHandlePtr = std::unique_ptr; +/** + * Status enum for the result of an attempted cluster discovery. + */ +enum class ClusterDiscoveryStatus { + /** + * The discovery process timed out. This means that we haven't yet received any reply from + * on-demand CDS about it. + */ + Timeout, + /** + * The discovery process has concluded and on-demand CDS has no such cluster. + */ + Missing, + /** + * Cluster found and currently available through ClusterManager. + */ + Available, +}; + +/** + * ClusterDiscoveryCallback is a callback called at the end of the on-demand cluster discovery + * process. The status of the discovery is sent as a parameter. + */ +using ClusterDiscoveryCallback = std::function; +using ClusterDiscoveryCallbackPtr = std::unique_ptr; + +/** + * ClusterDiscoveryCallbackHandle is a RAII wrapper for a ClusterDiscoveryCallback. Deleting the + * ClusterDiscoveryCallbackHandle will remove the callbacks from ClusterManager. + */ +class ClusterDiscoveryCallbackHandle { +public: + virtual ~ClusterDiscoveryCallbackHandle() = default; +}; + +using ClusterDiscoveryCallbackHandlePtr = std::unique_ptr; + +/** + * A handle to an on-demand CDS. + */ +class OdCdsApiHandle { +public: + virtual ~OdCdsApiHandle() = default; + + /** + * Request an on-demand discovery of a cluster with a passed name. This ODCDS may be used to + * perform the discovery process in the main thread if there is no discovery going on for this + * cluster. When the requested cluster is added and warmed up, the passed callback will be invoked + * in the same thread that invoked this function. + * + * The returned handle can be destroyed to prevent the callback to be invoked. Note that the + * handle can only be destroyed in the same thread that invoked the function. Destroying the + * handle might not stop the discovery process, though. As soon as the callback is invoked, + * destroying the handle does nothing. It is a responsibility of the caller to make sure that the + * objects captured in the callback outlive the callback. + * + * This function is thread-safe. + * + * @param name is the name of the cluster to be discovered. + * @param callback will be called when the discovery is finished. + * @param timeout describes how long the operation may take before failing. + * @return ClusterDiscoveryCallbackHandlePtr the discovery process handle. + */ + virtual ClusterDiscoveryCallbackHandlePtr + requestOnDemandClusterDiscovery(absl::string_view name, ClusterDiscoveryCallbackPtr callback, + std::chrono::milliseconds timeout) PURE; +}; + +using OdCdsApiHandlePtr = std::unique_ptr; + class ClusterManagerFactory; // These are per-cluster per-thread, so not "global" stats. @@ -309,6 +379,19 @@ class ClusterManager { virtual const ClusterRequestResponseSizeStatNames& clusterRequestResponseSizeStatNames() const PURE; virtual const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const PURE; + + /** + * Allocates an on-demand CDS API provider from configuration proto or locator. + * + * @param odcds_config is a configuration proto. Used when odcds_resources_locator is a nullopt. + * @param odcds_resources_locator is a locator for ODCDS. Used over odcds_config if not a nullopt. + * @param validation_visitor + * @return OdCdsApiHandlePtr the ODCDS handle. + */ + virtual OdCdsApiHandlePtr + allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, + ProtobufMessage::ValidationVisitor& validation_visitor) PURE; }; using ClusterManagerPtr = std::unique_ptr; diff --git a/source/common/config/delta_subscription_state.cc b/source/common/config/delta_subscription_state.cc index 215dc55602fbc..28df497c26bbe 100644 --- a/source/common/config/delta_subscription_state.cc +++ b/source/common/config/delta_subscription_state.cc @@ -22,21 +22,23 @@ DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url, [this](const auto& expired) { Protobuf::RepeatedPtrField removed_resources; for (const auto& resource : expired) { - setResourceWaitingForServer(resource); - removed_resources.Add(std::string(resource)); + if (setResourceWaitingForServer(resource)) { + removed_resources.Add(std::string(resource)); + } } watch_map_.onConfigUpdate({}, removed_resources, ""); }, dispatcher, dispatcher.timeSource()), - type_url_(std::move(type_url)), wildcard_(wildcard), watch_map_(watch_map), + type_url_(std::move(type_url)), + mode_(wildcard ? WildcardMode::Implicit : WildcardMode::Disabled), watch_map_(watch_map), local_info_(local_info), dispatcher_(dispatcher) {} void DeltaSubscriptionState::updateSubscriptionInterest( const absl::flat_hash_set& cur_added, const absl::flat_hash_set& cur_removed) { for (const auto& a : cur_added) { - setResourceWaitingForServer(a); + addResourceWaitingForServer(a, ResourceType::ExplicitlyRequested); // If interest in a resource is removed-then-added (all before a discovery request // can be sent), we must treat it as a "new" addition: our user may have forgotten its // copy of the resource after instructing us to remove it, and need to be reminded of it. @@ -53,6 +55,31 @@ void DeltaSubscriptionState::updateSubscriptionInterest( names_added_.erase(r); names_removed_.insert(r); } + switch (mode_) { + case WildcardMode::Implicit: + if (names_removed_.find("*") != names_removed_.end()) { + // we explicitly cancel the wildcard subscription + mode_ = WildcardMode::Disabled; + } else if (!names_added_.empty()) { + // switch to explicit mode if we requested some extra names + mode_ = WildcardMode::Explicit; + } + break; + + case WildcardMode::Explicit: + if (names_removed_.find("*") != names_removed_.end()) { + // we explicitly cancel the wildcard subscription + mode_ = WildcardMode::Disabled; + } + break; + + case WildcardMode::Disabled: + if (names_added_.find("*") != names_added_.end()) { + // we switch into an explicit wildcard subscription + mode_ = WildcardMode::Explicit; + } + break; + } } // Not having sent any requests yet counts as an "update pending" since you're supposed to resend @@ -124,7 +151,7 @@ void DeltaSubscriptionState::handleGoodResponse( { const auto scoped_update = ttl_.scopedTtlUpdate(); for (const auto& resource : message.resources()) { - addResourceState(resource); + addResourceState(resource, ResourceType::ReceivedFromServer); } } @@ -140,9 +167,7 @@ void DeltaSubscriptionState::handleGoodResponse( // initial_resource_versions messages, but will remind us to explicitly tell the server "I'm // cancelling my subscription" when we lose interest. for (const auto& resource_name : message.removed_resources()) { - if (resource_names_.find(resource_name) != resource_names_.end()) { - setResourceWaitingForServer(resource_name); - } + setResourceWaitingForServer(resource_name); } ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", type_url_, message.resources().size(), message.removed_resources().size()); @@ -177,16 +202,20 @@ DeltaSubscriptionState::getNextRequestAckless() { if (!resource_state.waitingForServer()) { (*request.mutable_initial_resource_versions())[resource_name] = resource_state.version(); } - // As mentioned above, fill resource_names_subscribe with everything, including names we - // have yet to receive any resource for unless this is a wildcard subscription, for which - // the first request on a stream must be without any resource names. - if (!wildcard_) { + // Add resource names to resource_names_subscribe only if this is not a wildcard subscription + // request or if we requested this resource explicitly (so we are actually in explicit + // wildcard mode). + if (mode_ == WildcardMode::Disabled || + resource_state.type() == ResourceType::ExplicitlyRequested) { names_added_.insert(resource_name); } } - // Wildcard subscription initial requests must have no resource_names_subscribe. - if (wildcard_) { - names_added_.clear(); + // We are not clearing the names_added_ set. If we are in implicit wildcard subscription mode, + // then the set should already be empty. If we are in explicit wildcard mode then the set will + // contain the names we explicitly requested, but we need to add * to the list to make sure it's + // sent too. + if (mode_ == WildcardMode::Explicit) { + names_added_.insert("*"); } names_removed_.clear(); } @@ -214,7 +243,7 @@ DeltaSubscriptionState::getNextRequestWithAck(const UpdateAck& ack) { } void DeltaSubscriptionState::addResourceState( - const envoy::service::discovery::v3::Resource& resource) { + const envoy::service::discovery::v3::Resource& resource, ResourceType type) { if (resource.has_ttl()) { ttl_.add(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(resource.ttl())), resource.name()); @@ -222,18 +251,36 @@ void DeltaSubscriptionState::addResourceState( ttl_.clear(resource.name()); } - resource_state_[resource.name()] = ResourceState(resource); - resource_names_.insert(resource.name()); + if (auto it = resource_state_.find(resource.name()); it != resource_state_.end()) { + auto old_type = it->second.type(); + it->second = ResourceState(resource, effectiveResourceType(old_type, type)); + } else { + resource_state_.insert({resource.name(), ResourceState(resource, type)}); + } +} + +bool DeltaSubscriptionState::setResourceWaitingForServer(const std::string& resource_name) { + auto itr = resource_state_.find(resource_name); + if (itr == resource_state_.end()) { + return false; + } + auto old_type = itr->second.type(); + itr->second = ResourceState(old_type); + return true; } -void DeltaSubscriptionState::setResourceWaitingForServer(const std::string& resource_name) { - resource_state_[resource_name] = ResourceState(); - resource_names_.insert(resource_name); +void DeltaSubscriptionState::addResourceWaitingForServer(const std::string& resource_name, + ResourceType type) { + if (auto it = resource_state_.find(resource_name); it != resource_state_.end()) { + auto old_type = it->second.type(); + it->second = ResourceState(effectiveResourceType(old_type, type)); + } else { + resource_state_.insert({resource_name, ResourceState(type)}); + } } void DeltaSubscriptionState::removeResourceState(const std::string& resource_name) { resource_state_.erase(resource_name); - resource_names_.erase(resource_name); } } // namespace Config diff --git a/source/common/config/delta_subscription_state.h b/source/common/config/delta_subscription_state.h index 7c478002ce379..0db953696c4cf 100644 --- a/source/common/config/delta_subscription_state.h +++ b/source/common/config/delta_subscription_state.h @@ -59,13 +59,36 @@ class DeltaSubscriptionState : public Logger::Loggable { void handleGoodResponse(const envoy::service::discovery::v3::DeltaDiscoveryResponse& message); void handleBadResponse(const EnvoyException& e, UpdateAck& ack); + // This enumeration describes the resource type, which is only relevant for wildcard + // subscriptions. Depending on its type, the resource will or will not be resent on the initial + // wildcard subscription. + enum class ResourceType { + // Explicitly requested resource type means that we have asked about the resource by updating + // the subscription interest. Such resources are resent on the initial wildcard request. + ExplicitlyRequested, + // Received from server resources are resources that the state knows about only from the server + // response. Such resources are not resent on the initial wildcard request. + ReceivedFromServer, + }; + + // Determines the effective resource type. Explicitly requested type overrides the received from + // server type. + ResourceType effectiveResourceType(ResourceType old_type, ResourceType new_type) { + return (old_type == ResourceType::ReceivedFromServer) ? new_type : old_type; + } + class ResourceState { public: - ResourceState(const envoy::service::discovery::v3::Resource& resource) - : version_(resource.version()) {} + ResourceState(absl::optional version, ResourceType type) + : version_(std::move(version)), type_(type) {} + + ResourceState(const envoy::service::discovery::v3::Resource& resource, ResourceType type) + : ResourceState(resource.version(), type) {} // Builds a ResourceState in the waitingForServer state. - ResourceState() = default; + ResourceState(ResourceType type) : ResourceState(absl::nullopt, type) {} + + ResourceType type() const { return type_; } // If true, we currently have no version of this resource - we are waiting for the server to // provide us with one. @@ -79,14 +102,25 @@ class DeltaSubscriptionState : public Logger::Loggable { private: absl::optional version_; + ResourceType type_; }; - // Use these helpers to ensure resource_state_ and resource_names_ get updated together. - void addResourceState(const envoy::service::discovery::v3::Resource& resource); - void setResourceWaitingForServer(const std::string& resource_name); - void removeResourceState(const std::string& resource_name); + // Describes the wildcard mode the subscription is in. + enum class WildcardMode { + // This mode is being expressed by sending a wildcard subscription request with an empty + // resource subscription list. + Implicit, + // This mode is being expressed by sending a wildcard subscription request that contains "*" + // special name in the resource subscription list. + Explicit, + // This mode is means no wildcard subscription. + Disabled, + }; - void populateDiscoveryRequest(envoy::service::discovery::v3::DeltaDiscoveryResponse& request); + void addResourceState(const envoy::service::discovery::v3::Resource& resource, ResourceType type); + bool setResourceWaitingForServer(const std::string& resource_name); + void addResourceWaitingForServer(const std::string& resource_name, ResourceType type); + void removeResourceState(const std::string& resource_name); // A map from resource name to per-resource version. The keys of this map are exactly the resource // names we are currently interested in. Those in the waitingForServer state currently don't have @@ -99,13 +133,9 @@ class DeltaSubscriptionState : public Logger::Loggable { // disable heartbeats for these resources (currently only VHDS). const bool supports_heartbeats_; TtlManager ttl_; - // The keys of resource_versions_. Only tracked separately because std::map does not provide an - // iterator into just its keys. - absl::flat_hash_set resource_names_; const std::string type_url_; - // Is the subscription is for a wildcard request. - const bool wildcard_; + WildcardMode mode_; UntypedConfigUpdateCallbacks& watch_map_; const LocalInfo::LocalInfo& local_info_; Event::Dispatcher& dispatcher_; diff --git a/source/common/config/new_grpc_mux_impl.cc b/source/common/config/new_grpc_mux_impl.cc index 62a2ba950d155..91b4134b0c5f7 100644 --- a/source/common/config/new_grpc_mux_impl.cc +++ b/source/common/config/new_grpc_mux_impl.cc @@ -128,7 +128,10 @@ GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url, auto entry = subscriptions_.find(type_url); if (entry == subscriptions_.end()) { // We don't yet have a subscription for type_url! Make one! - addSubscription(type_url, options.use_namespace_matching_, resources.empty()); + // No resources or an existence of the special name implies that + // this is a wildcard request subscription. + const bool wildcard = resources.empty() || (resources.find("*") != resources.end()); + addSubscription(type_url, options.use_namespace_matching_, wildcard); return addWatch(type_url, resources, callbacks, resource_decoder, options); } diff --git a/source/common/config/watch_map.cc b/source/common/config/watch_map.cc index fd65b5375b1f9..9abbb8e77600c 100644 --- a/source/common/config/watch_map.cc +++ b/source/common/config/watch_map.cc @@ -49,7 +49,8 @@ void WatchMap::removeDeferredWatches() { AddedRemoved WatchMap::updateWatchInterest(Watch* watch, const absl::flat_hash_set& update_to_these_names) { - if (update_to_these_names.empty()) { + if (update_to_these_names.empty() || + update_to_these_names.find("*") != update_to_these_names.end()) { wildcard_watches_.insert(watch); } else { wildcard_watches_.erase(watch); diff --git a/source/common/upstream/BUILD b/source/common/upstream/BUILD index c8f8a219b17a8..4f43eee78b82e 100644 --- a/source/common/upstream/BUILD +++ b/source/common/upstream/BUILD @@ -44,14 +44,46 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "od_cds_api_lib", + srcs = ["od_cds_api_impl.cc"], + hdrs = ["od_cds_api_impl.h"], + deps = [ + ":cds_api_helper_lib", + "//include/envoy/config:subscription_interface", + "//include/envoy/protobuf:message_validator_interface", + "//include/envoy/stats:stats_interface", + "//include/envoy/upstream:cluster_manager_interface", + "//source/common/common:minimal_logger_lib", + "//source/common/config:subscription_base_interface", + "//source/common/grpc:common_lib", + "//source/common/protobuf", + "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + ], +) + +envoy_cc_library( + name = "cluster_discovery_manager_lib", + srcs = ["cluster_discovery_manager.cc"], + hdrs = ["cluster_discovery_manager.h"], + deps = [ + "//include/envoy/upstream:cluster_manager_interface", + "//source/common/common:enum_to_int", + "//source/common/common:minimal_logger_lib", + ], +) + envoy_cc_library( name = "cluster_manager_lib", srcs = ["cluster_manager_impl.cc"], hdrs = ["cluster_manager_impl.h"], deps = [ ":cds_api_lib", + ":cluster_discovery_manager_lib", ":load_balancer_lib", ":load_stats_reporter_lib", + ":od_cds_api_lib", ":ring_hash_lb_lib", ":subset_lb_lib", "//include/envoy/api:api_interface", diff --git a/source/common/upstream/cds_api_helper.h b/source/common/upstream/cds_api_helper.h index a7a91c4ebe46a..ef88afddaf3c3 100644 --- a/source/common/upstream/cds_api_helper.h +++ b/source/common/upstream/cds_api_helper.h @@ -37,7 +37,7 @@ class CdsApiHelper : Logger::Loggable { private: ClusterManager& cm_; - std::string name_; + const std::string name_; std::string system_version_info_; }; diff --git a/source/common/upstream/cluster_discovery_manager.cc b/source/common/upstream/cluster_discovery_manager.cc new file mode 100644 index 0000000000000..5222a4255b465 --- /dev/null +++ b/source/common/upstream/cluster_discovery_manager.cc @@ -0,0 +1,171 @@ +#include "common/upstream/cluster_discovery_manager.h" + +#include + +#include "common/common/enum_to_int.h" + +namespace Envoy { +namespace Upstream { + +namespace { + +using ClusterAddedCb = std::function; + +class ClusterCallbacks : public ClusterUpdateCallbacks { +public: + ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {} + + void onClusterAddOrUpdate(ThreadLocalCluster& cluster) override { cb_(cluster); }; + + void onClusterRemoval(const std::string&) override {} + +private: + ClusterAddedCb cb_; +}; + +} // namespace + +ClusterDiscoveryManager::ClusterDiscoveryManager( + std::string thread_name, ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler) + : thread_name_(std::move(thread_name)) { + callbacks_ = std::make_unique([this](ThreadLocalCluster& cluster) { + ENVOY_LOG(trace, + "cm cdm: starting processing cluster name {} (status {}) from cluster lifecycle " + "callback in {}", + cluster.info()->name(), enumToInt(ClusterDiscoveryStatus::Available), thread_name_); + processClusterName(cluster.info()->name(), ClusterDiscoveryStatus::Available); + }); + callbacks_handle_ = lifecycle_callbacks_handler.addClusterUpdateCallbacks(*callbacks_); +} + +void ClusterDiscoveryManager::processClusterName(absl::string_view name, + ClusterDiscoveryStatus cluster_status) { + auto callback_items = extractCallbackList(name); + if (callback_items.empty()) { + ENVOY_LOG(trace, "cm cdm: no callbacks for the cluster name {} in {}", name, thread_name_); + return; + } + ENVOY_LOG(trace, "cm cdm: invoking {} callbacks for the cluster name {} in {}", + callback_items.size(), name, thread_name_); + for (auto& item : callback_items) { + auto callback = std::move(item->callback_); + // This invalidates the handle and the invoker. + item.reset(); + // The callback could be null when handle was destroyed during the + // previous callback. + if (callback != nullptr) { + (*callback)(cluster_status); + } + } +} + +ClusterDiscoveryManager::AddedCallbackData +ClusterDiscoveryManager::addCallback(std::string name, ClusterDiscoveryCallbackPtr callback) { + ENVOY_LOG(trace, "cm cdm: adding callback for the cluster name {} in {}", name, thread_name_); + auto& callbacks_list = pending_clusters_[name]; + auto item_weak_ptr = addCallbackInternal(callbacks_list, std::move(callback)); + auto handle = std::make_unique(*this, name, item_weak_ptr); + CallbackInvoker invoker(*this, std::move(name), std::move(item_weak_ptr)); + auto discovery_in_progress = (callbacks_list.size() > 1); + return {std::move(handle), discovery_in_progress, std::move(invoker)}; +} + +void ClusterDiscoveryManager::swap(ClusterDiscoveryManager& other) { + thread_name_.swap(other.thread_name_); + pending_clusters_.swap(other.pending_clusters_); + callbacks_.swap(other.callbacks_); + callbacks_handle_.swap(other.callbacks_handle_); +} + +void ClusterDiscoveryManager::invokeCallbackFromItem(absl::string_view name, + CallbackListItemWeakPtr item_weak_ptr, + ClusterDiscoveryStatus cluster_status) { + auto item_ptr = item_weak_ptr.lock(); + if (item_ptr == nullptr) { + ENVOY_LOG(trace, "cm cdm: not invoking an already stale callback for cluster {} in {}", name, + thread_name_); + return; + } + ENVOY_LOG(trace, "cm cdm: invoking a callback for cluster {} in {}", name, thread_name_); + auto callback = std::move(item_ptr->callback_); + if (item_ptr->self_iterator_.has_value()) { + eraseItem(name, std::move(item_ptr)); + } else { + ENVOY_LOG(trace, + "cm cdm: the callback for cluster {} in {} is prepared for invoking during " + "processing, yet some other callback tries to invoke this callback earlier", + name, thread_name_); + } + if (callback != nullptr) { + (*callback)(cluster_status); + } else { + ENVOY_LOG(trace, "cm cdm: the callback for cluster {} in {} is prepared for invoking during " + "processing, yet some other callback destroyed its handle in the meantime"); + } +} + +ClusterDiscoveryManager::CallbackList +ClusterDiscoveryManager::extractCallbackList(absl::string_view name) { + auto map_node_handle = pending_clusters_.extract(name); + if (map_node_handle.empty()) { + return {}; + } + CallbackList extracted; + map_node_handle.mapped().swap(extracted); + for (auto& item : extracted) { + item->self_iterator_.reset(); + } + return extracted; +} + +ClusterDiscoveryManager::CallbackListItemWeakPtr +ClusterDiscoveryManager::addCallbackInternal(CallbackList& list, + ClusterDiscoveryCallbackPtr callback) { + auto item = std::make_shared(std::move(callback)); + auto it = list.emplace(list.end(), item); + item->self_iterator_ = std::move(it); + return item; +} + +void ClusterDiscoveryManager::erase(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr) { + auto item_ptr = item_weak_ptr.lock(); + if (item_ptr == nullptr) { + ENVOY_LOG(trace, "cm cdm: not dropping a stale callback for the cluster name {} in {}", name, + thread_name_); + return; + } + ENVOY_LOG(trace, "cm cdm: dropping callback for the cluster name {} in {}", name, thread_name_); + if (!item_ptr->self_iterator_.has_value()) { + ENVOY_LOG(trace, + "cm cdm: callback for the cluster name {} in {} is not on the callbacks list " + "anymore, which means it is about to be invoked; preventing it", + name, thread_name_); + item_ptr->callback_.reset(); + return; + } + eraseItem(name, std::move(item_ptr)); +} + +void ClusterDiscoveryManager::eraseItem(absl::string_view name, + CallbackListItemSharedPtr item_ptr) { + ASSERT(item_ptr != nullptr); + ASSERT(item_ptr->self_iterator_.has_value()); + const bool drop_list = eraseFromList(name, item_ptr->self_iterator_.value()); + item_ptr->self_iterator_.reset(); + if (drop_list) { + ENVOY_LOG(trace, "cm cdm: dropped last callback for the cluster name {} in {}", name, + thread_name_); + pending_clusters_.erase(name); + } +} + +bool ClusterDiscoveryManager::eraseFromList(absl::string_view name, CallbackListIterator it) { + auto map_it = pending_clusters_.find(name); + ASSERT(map_it != pending_clusters_.end()); + auto& list = map_it->second; + list.erase(it); + return list.empty(); +} + +} // namespace Upstream +} // namespace Envoy diff --git a/source/common/upstream/cluster_discovery_manager.h b/source/common/upstream/cluster_discovery_manager.h new file mode 100644 index 0000000000000..26cf68736ce40 --- /dev/null +++ b/source/common/upstream/cluster_discovery_manager.h @@ -0,0 +1,175 @@ +#pragma once + +#include +#include +#include +#include + +#include "envoy/upstream/cluster_manager.h" + +#include "common/common/logger.h" + +#include "absl/container/flat_hash_map.h" +#include "absl/strings/string_view.h" +#include "absl/types/optional.h" + +namespace Envoy { +namespace Upstream { + +/** + * A base class for cluster lifecycle handler. Mostly to avoid a dependency on + * ThreadLocalClusterManagerImpl in ClusterDiscoveryManager. + */ +class ClusterLifecycleCallbackHandler { +public: + virtual ~ClusterLifecycleCallbackHandler() = default; + + virtual ClusterUpdateCallbacksHandlePtr + addClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) PURE; +}; + +/** A thread-local on-demand cluster discovery manager. It takes care of invoking the discovery + * callbacks in the event of a finished discovery. It does it by installing a cluster lifecycle + * callback that invokes the discovery callbacks when a matching cluster just got added. + * + * The manager is the sole owner of the added discovery callbacks. The only way to remove the + * callback from the manager is by destroying the discovery handle. + */ +class ClusterDiscoveryManager : Logger::Loggable { +private: + struct CallbackListItem; + using CallbackListItemSharedPtr = std::shared_ptr; + using CallbackListItemWeakPtr = std::weak_ptr; + using CallbackList = std::list; + using CallbackListIterator = CallbackList::iterator; + +public: + /** + * This class is used in a case when the cluster manager in the main thread notices that it + * already has the requested cluster, so instead of starting the discovery process, it schedules + * the invocation of the callback back to the thread that made the request. Invoking the request + * removes it from the manager. + */ + class CallbackInvoker { + public: + void invokeCallback(ClusterDiscoveryStatus cluster_status) const { + parent_.invokeCallbackFromItem(name_, item_weak_ptr_, cluster_status); + } + + private: + friend class ClusterDiscoveryManager; + + CallbackInvoker(ClusterDiscoveryManager& parent, std::string name, + CallbackListItemWeakPtr item_weak_ptr) + : parent_(parent), name_(std::move(name)), item_weak_ptr_(std::move(item_weak_ptr)) {} + + ClusterDiscoveryManager& parent_; + const std::string name_; + CallbackListItemWeakPtr item_weak_ptr_; + }; + + ClusterDiscoveryManager(std::string thread_name, + ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler); + + /** + * Invoke the callbacks for the given cluster name. The discovery status is passed to the + * callbacks. After invoking the callbacks, they are dropped from the manager. + */ + void processClusterName(absl::string_view name, ClusterDiscoveryStatus cluster_status); + + /** + * A struct containing a discovery handle, information whether a discovery for a given cluster + * was already requested in this thread, and an immediate invocation context. + */ + struct AddedCallbackData { + ClusterDiscoveryCallbackHandlePtr handle_ptr_; + bool discovery_in_progress_; + CallbackInvoker invoker_; + }; + + /** + * Adds the discovery callback. Returns a handle and a boolean indicating whether this worker + * thread has already requested the discovery of a cluster with a given name. + */ + AddedCallbackData addCallback(std::string name, ClusterDiscoveryCallbackPtr callback); + + /** + * Swaps this manager with another. Used for tests only. + */ + void swap(ClusterDiscoveryManager& other); + +private: + /** + * An item in the callbacks list. It contains the iterator to itself inside the callbacks + * list. Since the list contains shared pointers to items, we know that the iterator is valid as + * long as the item is alive. + */ + struct CallbackListItem { + CallbackListItem(ClusterDiscoveryCallbackPtr callback) : callback_(std::move(callback)) {} + + ClusterDiscoveryCallbackPtr callback_; + absl::optional self_iterator_; + }; + + /** + * An implementation of discovery handle. Destroy it to drop the callback from the discovery + * manager. It won't stop the discovery process, though. + */ + class ClusterDiscoveryCallbackHandleImpl : public ClusterDiscoveryCallbackHandle { + public: + ClusterDiscoveryCallbackHandleImpl(ClusterDiscoveryManager& parent, std::string name, + CallbackListItemWeakPtr item_weak_ptr) + : parent_(parent), name_(std::move(name)), item_weak_ptr_(std::move(item_weak_ptr)) {} + + ~ClusterDiscoveryCallbackHandleImpl() override { + parent_.erase(name_, std::move(item_weak_ptr_)); + } + + private: + ClusterDiscoveryManager& parent_; + const std::string name_; + CallbackListItemWeakPtr item_weak_ptr_; + }; + + /** + * Invokes a callback stored in the item and removes it from the callbacks list, so it won't be + * invoked again. + */ + void invokeCallbackFromItem(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr, + ClusterDiscoveryStatus cluster_status); + + /** + * Extracts the list of callbacks from the pending_clusters_ map. This action invalidates the + * self iterators in the items, so destroying the handle won't try to erase the element from the + * list using an invalid iterator. + */ + CallbackList extractCallbackList(absl::string_view name); + /** + * Creates and sets up the callback list item, adds to the list and returns a weak_ptr to the + * item. + */ + CallbackListItemWeakPtr addCallbackInternal(CallbackList& list, + ClusterDiscoveryCallbackPtr callback); + /** + * Drops the callback item from the discovery manager. It the item wasn't stale, the callback + * will not be invoked. Called when the discovery handle is destroyed. + */ + void erase(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr); + /** + * Drops the callback item from the discovery manager. + */ + void eraseItem(absl::string_view name, CallbackListItemSharedPtr item_ptr); + /** + * Try to erase a callback from under the given iterator. It returns a boolean value indicating + * whether the dropped callback was a last one for the given cluster. + */ + bool eraseFromList(absl::string_view name, CallbackListIterator it); + + std::string thread_name_; + absl::flat_hash_map pending_clusters_; + std::unique_ptr callbacks_; + ClusterUpdateCallbacksHandlePtr callbacks_handle_; +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 84825e941ab29..7d419f218e6a9 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -970,6 +970,7 @@ void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_ per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor(); } + pending_cluster_creations_.erase(cm_cluster.cluster().info()->name()); tls_.runOnAllThreads( [info = cm_cluster.cluster().info(), params = std::move(params), add_or_update_cluster, load_balancer_factory](OptRef cluster_manager) { @@ -1037,7 +1038,120 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpAsyncClient ClusterUpdateCallbacksHandlePtr ClusterManagerImpl::addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) { ThreadLocalClusterManagerImpl& cluster_manager = *tls_; - return std::make_unique(cb, cluster_manager.update_callbacks_); + return cluster_manager.addClusterUpdateCallbacks(cb); +} + +OdCdsApiHandlePtr +ClusterManagerImpl::allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, + ProtobufMessage::ValidationVisitor& validation_visitor) { + // TODO(krnowak): Instead of creating a new handle every time, store the handles internally and + // return an already existing one if the config or locator matches. Note that this may need a way + // to clean up the unused handles, so we can close the unnecessary connections. + auto odcds = OdCdsApiImpl::create(odcds_config, odcds_resources_locator, *this, *this, stats_, + validation_visitor); + return OdCdsApiHandleImpl::create(*this, std::move(odcds)); +} + +ClusterDiscoveryCallbackHandlePtr +ClusterManagerImpl::requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std::string name, + ClusterDiscoveryCallbackPtr callback, + std::chrono::milliseconds timeout) { + ThreadLocalClusterManagerImpl& cluster_manager = *tls_; + + auto [handle, discovery_in_progress, invoker] = + cluster_manager.cdm_.addCallback(name, std::move(callback)); + // This check will catch requests for discoveries from this thread only. If other thread requested + // the same discovery, we will detect it in the main thread later. + if (discovery_in_progress) { + ENVOY_LOG(debug, + "cm odcds: on-demand discovery for cluster {} is already in progress, something else " + "in thread {} has already requested it", + name, cluster_manager.thread_local_dispatcher_.name()); + // This worker thread has already requested a discovery of a cluster with this name, so nothing + // more left to do here. + return std::move(handle); + } + ENVOY_LOG( + debug, + "cm odcds: forwarding the on-demand discovery request for cluster {} to the main thread", + name); + // This seems to be the first request for discovery of this cluster in this worker thread. Rest of + // the process may only happen in the main thread. + dispatcher_.post([this, odcds = std::move(odcds), timeout, name = std::move(name), + invoker = std::move(invoker), + &thread_local_dispatcher = cluster_manager.thread_local_dispatcher_] { + // Check for the cluster here too. It might have been added between the time when this closure + // was posted and when it is being executed. + if (getThreadLocalCluster(name) != nullptr) { + ENVOY_LOG( + debug, + "cm odcds: the requested cluster {} is already known, posting the callback back to {}", + name, thread_local_dispatcher.name()); + thread_local_dispatcher.post([invoker = std::move(invoker)] { + invoker.invokeCallback(ClusterDiscoveryStatus::Available); + }); + return; + } + + if (auto it = pending_cluster_creations_.find(name); it != pending_cluster_creations_.end()) { + ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} is already in progress", name); + // We already began the discovery process for this cluster, nothing to do. If we got here, it + // means that it was other worker thread that requested the discovery. + return; + } + // Start the discovery. If the cluster gets discovered, cluster manager will warm it up and + // invoke the cluster lifecycle callbacks, that will in turn invoke our callback. + odcds->updateOnDemand(name); + // Setup the discovery timeout timer to avoid keeping callbacks indefinitely. + auto timer = dispatcher_.createTimer([this, name] { notifyExpiredDiscovery(name); }); + timer->enableTimer(timeout); + // Keep odcds handle alive for the duration of the discovery process. + pending_cluster_creations_.insert( + {std::move(name), ClusterCreation{std::move(odcds), std::move(timer)}}); + }); + + return std::move(handle); +} + +void ClusterManagerImpl::notifyMissingCluster(absl::string_view name) { + ENVOY_LOG(debug, "cm odcds: cluster {} not found during on-demand discovery", name); + notifyClusterDiscoveryStatus(name, ClusterDiscoveryStatus::Missing); +} + +void ClusterManagerImpl::notifyExpiredDiscovery(absl::string_view name) { + ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} timed out", name); + notifyClusterDiscoveryStatus(name, ClusterDiscoveryStatus::Timeout); +} + +void ClusterManagerImpl::notifyClusterDiscoveryStatus(absl::string_view name, + ClusterDiscoveryStatus status) { + auto map_node_handle = pending_cluster_creations_.extract(name); + if (map_node_handle.empty()) { + // Not a cluster we are interested in. This may happen when ODCDS + // receives some cluster name in removed resources field and + // notifies the cluster manager about it. + return; + } + // Let all the worker threads know that the discovery timed out. + tls_.runOnAllThreads( + [name = std::string(name), status](OptRef cluster_manager) { + ENVOY_LOG( + trace, + "cm cdm: starting processing cluster name {} (status {}) from the expired timer in {}", + name, enumToInt(status), cluster_manager->thread_local_dispatcher_.name()); + cluster_manager->cdm_.processClusterName(name, status); + }); +} + +ClusterDiscoveryManager +ClusterManagerImpl::createAndSwapClusterDiscoveryManager(std::string thread_name) { + ThreadLocalClusterManagerImpl& cluster_manager = *tls_; + ClusterDiscoveryManager cdm(std::move(thread_name), cluster_manager); + + cluster_manager.cdm_.swap(cdm); + + return cdm; } ProtobufTypes::MessagePtr ClusterManagerImpl::dumpClusterConfigs() { @@ -1074,7 +1188,7 @@ ProtobufTypes::MessagePtr ClusterManagerImpl::dumpClusterConfigs() { ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl( ClusterManagerImpl& parent, Event::Dispatcher& dispatcher, const absl::optional& local_cluster_params) - : parent_(parent), thread_local_dispatcher_(dispatcher) { + : parent_(parent), thread_local_dispatcher_(dispatcher), cdm_(dispatcher.name(), *this) { // If local cluster is defined then we need to initialize it first. if (local_cluster_params.has_value()) { const auto& local_cluster_name = local_cluster_params->info_->name(); @@ -1319,6 +1433,12 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::getHttpConnPoolsContainer( return &container_iter->second; } +ClusterUpdateCallbacksHandlePtr +ClusterManagerImpl::ThreadLocalClusterManagerImpl::addClusterUpdateCallbacks( + ClusterUpdateCallbacks& cb) { + return std::make_unique(cb, update_callbacks_); +} + ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry( ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster, const LoadBalancerFactorySharedPtr& lb_factory) diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index ec75f0af94677..cd1a555dc50d0 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -30,7 +30,9 @@ #include "common/config/grpc_mux_impl.h" #include "common/config/subscription_factory_impl.h" #include "common/http/async_client_impl.h" +#include "common/upstream/cluster_discovery_manager.h" #include "common/upstream/load_stats_reporter.h" +#include "common/upstream/od_cds_api_impl.h" #include "common/upstream/priority_conn_pool_map.h" #include "common/upstream/upstream_impl.h" @@ -220,7 +222,9 @@ struct ClusterManagerStats { * Implementation of ClusterManager that reads from a proto configuration, maintains a central * cluster list, as well as thread local caches of each cluster and associated connection pools. */ -class ClusterManagerImpl : public ClusterManager, Logger::Loggable { +class ClusterManagerImpl : public ClusterManager, + public MissingClusterNotifier, + Logger::Loggable { public: ClusterManagerImpl(const envoy::config::bootstrap::v3::Bootstrap& bootstrap, ClusterManagerFactory& factory, Stats::Store& stats, @@ -285,6 +289,11 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable odcds_resources_locator, + ProtobufMessage::ValidationVisitor& validation_visitor) override; + ClusterManagerFactory& clusterManagerFactory() override { return factory_; } Config::SubscriptionFactory& subscriptionFactory() override { return subscription_factory_; } @@ -306,6 +315,9 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable per_priority_update_params_; }; + /** + * An implementation of an on-demand CDS handle. It forwards the discovery request to the cluster + * manager that created the handle. + * + * It's a protected type, so unit tests can use it. + */ + class OdCdsApiHandleImpl : public OdCdsApiHandle { + public: + static OdCdsApiHandlePtr create(ClusterManagerImpl& parent, OdCdsApiSharedPtr odcds) { + return std::make_unique(parent, std::move(odcds)); + } + + OdCdsApiHandleImpl(ClusterManagerImpl& parent, OdCdsApiSharedPtr odcds) + : parent_(parent), odcds_(std::move(odcds)) { + ASSERT(odcds_ != nullptr); + } + + ClusterDiscoveryCallbackHandlePtr + requestOnDemandClusterDiscovery(absl::string_view name, ClusterDiscoveryCallbackPtr callback, + std::chrono::milliseconds timeout) override { + return parent_.requestOnDemandClusterDiscovery(odcds_, std::string(name), std::move(callback), + timeout); + } + + private: + ClusterManagerImpl& parent_; + OdCdsApiSharedPtr odcds_; + }; + virtual void postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster, ThreadLocalClusterUpdateParams&& params); + /** + * Notifies cluster discovery managers in each worker thread that the discovery process for the + * cluster with a passed name has timed out. + * + * It's protected, so the tests can use it. + */ + void notifyExpiredDiscovery(absl::string_view name); + + /** + * Creates a new discovery manager in current thread and swaps it with the one in thread local + * cluster manager. This could be used to simulate requesting a cluster from a different + * thread. Used for tests only. + * + * Protected, so tests can use it. + */ + ClusterDiscoveryManager createAndSwapClusterDiscoveryManager(std::string thread_name); + private: /** * Thread local cached cluster data. Each thread local cluster gets updates from the parent * central dynamic cluster (if applicable). It maintains load balancer state and any created * connection pools. */ - struct ThreadLocalClusterManagerImpl : public ThreadLocal::ThreadLocalObject { + struct ThreadLocalClusterManagerImpl final : public ThreadLocal::ThreadLocalObject, + public ClusterLifecycleCallbackHandler { struct ConnPoolsContainer { ConnPoolsContainer(Event::Dispatcher& dispatcher, const HostConstSharedPtr& host) : pools_{std::make_shared(dispatcher, host)} {} @@ -450,6 +509,9 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable thread_local_clusters_; @@ -465,6 +527,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable update_callbacks_; const PrioritySet* local_priority_set_{}; bool destroying_{}; + ClusterDiscoveryManager cdm_; }; struct ClusterData : public ClusterManagerCluster { @@ -553,6 +616,17 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable; using ClusterUpdatesMap = absl::node_hash_map; + /** + * Holds a reference to an on-demand CDS to keep it alive for the duration of a cluster discovery, + * and an expiration timer notifying worker threads about discovery timing out. + */ + struct ClusterCreation { + OdCdsApiSharedPtr odcds_; + Event::TimerPtr expiration_timer_; + }; + + using ClusterCreationsMap = absl::flat_hash_map; + void applyUpdates(ClusterManagerCluster& cluster, uint32_t priority, PendingUpdates& updates); bool scheduleUpdate(ClusterManagerCluster& cluster, uint32_t priority, bool mergeable, const uint64_t timeout); @@ -574,10 +648,20 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable preconnect_pool); + ClusterDiscoveryCallbackHandlePtr + requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std::string name, + ClusterDiscoveryCallbackPtr callback, + std::chrono::milliseconds timeout); + + void notifyClusterDiscoveryStatus(absl::string_view name, ClusterDiscoveryStatus status); + +private: ClusterManagerFactory& factory_; Runtime::Loader& runtime_; Stats::Store& stats_; ThreadLocal::TypedSlot tls_; + // Contains information about ongoing on-demand cluster discoveries. + ClusterCreationsMap pending_cluster_creations_; Random::RandomGenerator& random_; protected: diff --git a/source/common/upstream/od_cds_api_impl.cc b/source/common/upstream/od_cds_api_impl.cc new file mode 100644 index 0000000000000..e074336ede23e --- /dev/null +++ b/source/common/upstream/od_cds_api_impl.cc @@ -0,0 +1,107 @@ +#include "common/upstream/od_cds_api_impl.h" + +#include "common/common/assert.h" +#include "common/grpc/common.h" + +#include "absl/strings/str_join.h" + +namespace Envoy { +namespace Upstream { + +OdCdsApiSharedPtr +OdCdsApiImpl::create(const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, + ClusterManager& cm, MissingClusterNotifier& notifier, Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor) { + return OdCdsApiSharedPtr(new OdCdsApiImpl(odcds_config, odcds_resources_locator, cm, notifier, + scope, validation_visitor)); +} + +OdCdsApiImpl::OdCdsApiImpl(const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, + ClusterManager& cm, MissingClusterNotifier& notifier, + Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor) + : Envoy::Config::SubscriptionBase( + odcds_config.resource_api_version(), validation_visitor, "name"), + helper_(cm, "odcds"), cm_(cm), notifier_(notifier), scope_(scope.createScope("odcds.")), + status_(StartStatus::NotStarted) { + // TODO(krnowak): Move the subscription setup to CdsApiHelper. Maybe make CdsApiHelper a base + // class for CDS and ODCDS. + const auto resource_name = getResourceName(); + if (!odcds_resources_locator.has_value()) { + subscription_ = cm_.subscriptionFactory().subscriptionFromConfigSource( + odcds_config, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, {}); + } else { + subscription_ = cm.subscriptionFactory().collectionSubscriptionFromUrl( + *odcds_resources_locator, odcds_config, resource_name, *scope_, *this, resource_decoder_); + } +} + +void OdCdsApiImpl::onConfigUpdate(const std::vector& resources, + const std::string& version_info) { + UNREFERENCED_PARAMETER(resources); + UNREFERENCED_PARAMETER(version_info); + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; +} + +void OdCdsApiImpl::onConfigUpdate(const std::vector& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& system_version_info) { + auto exception_msgs = + helper_.onConfigUpdate(added_resources, removed_resources, system_version_info); + sendAwaiting(); + status_ = StartStatus::InitialFetchDone; + for (const auto& resource_name : removed_resources) { + ENVOY_LOG(debug, "odcds: notifying about potential missing cluster {}", resource_name); + notifier_.notifyMissingCluster(resource_name); + } + if (!exception_msgs.empty()) { + throw EnvoyException( + fmt::format("Error adding/updating cluster(s) {}", absl::StrJoin(exception_msgs, ", "))); + } +} + +void OdCdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason, + const EnvoyException*) { + ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason); + sendAwaiting(); + status_ = StartStatus::InitialFetchDone; +} + +void OdCdsApiImpl::sendAwaiting() { + if (awaiting_names_.empty()) { + return; + } + // The awaiting names are sent only once. After the state transition from Starting to + // InitialFetchDone (which happens on the first received response), the awaiting names list is not + // used any more. + ENVOY_LOG(debug, "odcds: sending request for awaiting cluster names {}", + fmt::join(awaiting_names_, ", ")); + subscription_->requestOnDemandUpdate(awaiting_names_); + awaiting_names_.clear(); +} + +void OdCdsApiImpl::updateOnDemand(std::string cluster_name) { + switch (status_) { + case StartStatus::NotStarted: + ENVOY_LOG(trace, "odcds: starting a subscription with cluster name {}", cluster_name); + status_ = StartStatus::Started; + subscription_->start({std::move(cluster_name)}); + return; + + case StartStatus::Started: + ENVOY_LOG(trace, "odcds: putting cluster name {} on awaiting list", cluster_name); + awaiting_names_.insert(std::move(cluster_name)); + return; + + case StartStatus::InitialFetchDone: + ENVOY_LOG(trace, "odcds: requesting for cluster name {}", cluster_name); + subscription_->requestOnDemandUpdate({std::move(cluster_name)}); + return; + } + NOT_REACHED_GCOVR_EXCL_LINE; +} + +} // namespace Upstream +} // namespace Envoy diff --git a/source/common/upstream/od_cds_api_impl.h b/source/common/upstream/od_cds_api_impl.h new file mode 100644 index 0000000000000..0dcd00473f51a --- /dev/null +++ b/source/common/upstream/od_cds_api_impl.h @@ -0,0 +1,95 @@ +#pragma once + +#include +#include + +#include "envoy/config/cluster/v3/cluster.pb.h" +#include "envoy/config/cluster/v3/cluster.pb.validate.h" +#include "envoy/config/core/v3/config_source.pb.h" +#include "envoy/config/subscription.h" +#include "envoy/protobuf/message_validator.h" +#include "envoy/stats/scope.h" +#include "envoy/upstream/cluster_manager.h" + +#include "common/config/subscription_base.h" +#include "common/protobuf/protobuf.h" +#include "common/upstream/cds_api_helper.h" + +namespace Envoy { +namespace Upstream { + +enum class StartStatus { + // No initial fetch started. + NotStarted, + // Initial fetch started. + Started, + // Initial fetch arrived. + InitialFetchDone, +}; + +/** + * An interface for on-demand CDS. Defined to allow mocking. + */ +class OdCdsApi { +public: + virtual ~OdCdsApi() = default; + + virtual void updateOnDemand(std::string cluster_name) PURE; +}; + +using OdCdsApiSharedPtr = std::shared_ptr; + +/** + * An interface used by OdCdsApiImpl for sending notifications about the missing cluster that was + * requested. + */ +class MissingClusterNotifier { +public: + virtual ~MissingClusterNotifier() = default; + + virtual void notifyMissingCluster(absl::string_view name) PURE; +}; + +/** + * ODCDS API implementation that fetches via Subscription. + */ +class OdCdsApiImpl : public OdCdsApi, + Envoy::Config::SubscriptionBase, + Logger::Loggable { +public: + static OdCdsApiSharedPtr create(const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, + ClusterManager& cm, MissingClusterNotifier& notifier, + Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor); + + // Upstream::OdCdsApi + void updateOnDemand(std::string cluster_name) override; + +private: + // Config::SubscriptionCallbacks + void onConfigUpdate(const std::vector& resources, + const std::string& version_info) override; + void onConfigUpdate(const std::vector& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& system_version_info) override; + void onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason, + const EnvoyException* e) override; + + OdCdsApiImpl(const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, ClusterManager& cm, + MissingClusterNotifier& notifier, Stats::Scope& scope, + ProtobufMessage::ValidationVisitor& validation_visitor); + void sendAwaiting(); + + CdsApiHelper helper_; + ClusterManager& cm_; + MissingClusterNotifier& notifier_; + Stats::ScopePtr scope_; + StartStatus status_; + absl::flat_hash_set awaiting_names_; + Config::SubscriptionPtr subscription_; +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/common/config/delta_subscription_state_test.cc b/test/common/config/delta_subscription_state_test.cc index cc33dc4a5758f..6ee512b7bb396 100644 --- a/test/common/config/delta_subscription_state_test.cc +++ b/test/common/config/delta_subscription_state_test.cc @@ -346,14 +346,73 @@ TEST_F(DeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect) { EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); } -// For wildcard subscription, upon a reconnection, the server is supposed to assume a -// blank slate for the Envoy's state (hence the need for initial_resource_versions), and -// the resource_names_subscribe and resource_names_unsubscribe must be empty (as is expected -// of every wildcard first message). This is true even if in between the last request of the -// last stream and the first request of the new stream, Envoy gained or lost interest in a -// resource. The subscription & unsubscription implicitly takes effect by simply requesting a -// wildcard subscription in the newly reconnected stream. -TEST_F(WildcardDeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect) { +// Check that switching into wildcard subscription after initial +// request switches us into the explicit wildcard mode. +TEST_F(DeltaSubscriptionStateTest, SwitchIntoWildcardMode) { + Protobuf::RepeatedPtrField add1_2 = + populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}}); + // We call deliverDiscoveryResponse twice in this test. + EXPECT_CALL(*timer_, disableTimer()).Times(2); + deliverDiscoveryResponse(add1_2, {}, "debugversion1"); + + // switch into wildcard mode + state_.updateSubscriptionInterest({"name4", "*"}, {"name1"}); + state_.markStreamFresh(); // simulate a stream reconnection + envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); + // Regarding the resource_names_subscribe field: + // name1: do not include: we lost interest. + // name2: yes do include: we are explicitly interested (from test's base constructor) + // name3: yes do include: we are explicitly interested (from test's base constructor) + // name4: yes do include: we are explicitly interested + // *: explicit wildcard subscription + EXPECT_THAT(cur_request.resource_names_subscribe(), + UnorderedElementsAre("name2", "name3", "name4", "*")); + EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); + + Protobuf::RepeatedPtrField add4_5 = + populateRepeatedResource({{"name4", "version4A"}, {"name5", "version5A"}}); + deliverDiscoveryResponse(add4_5, {}, "debugversion1"); + + state_.markStreamFresh(); // simulate a stream reconnection + cur_request = state_.getNextRequestAckless(); + // Regarding the resource_names_subscribe field: + // name1: do not include: we lost interest. + // name2: yes do include: we are explicitly interested (from test's base constructor) + // name3: yes do include: we are explicitly interested (from test's base constructor) + // name4: yes do include: we are explicitly interested + // name5: do not include: we are implicitly interested, so this resource should not appear on the + // initial request + // *: explicit wildcard subscription + EXPECT_THAT(cur_request.resource_names_subscribe(), + UnorderedElementsAre("name2", "name3", "name4", "*")); + EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); +} + +// For wildcard subscription, upon a reconnection, the server is supposed to assume a blank slate +// for the Envoy's state (hence the need for initial_resource_versions), and the +// resource_names_subscribe and resource_names_unsubscribe must be empty if we haven't gained any +// new explicit interest in a resource. In such case, the client should send an empty request. +TEST_F(WildcardDeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnectImplicit) { + Protobuf::RepeatedPtrField add1_2 = + populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}}); + EXPECT_CALL(*timer_, disableTimer()); + deliverDiscoveryResponse(add1_2, {}, "debugversion1"); + + state_.markStreamFresh(); // simulate a stream reconnection + envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); + // Regarding the resource_names_subscribe field: + // name1: do not include: we lost interest. + // name2: do not include: we are implicitly interested, but for wildcard it shouldn't be provided. + EXPECT_TRUE(cur_request.resource_names_subscribe().empty()); + EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); +} + +// For wildcard subscription, upon a reconnection, the server is supposed to assume a blank slate +// for the Envoy's state (hence the need for initial_resource_versions). The +// resource_names_unsubscribe must be empty (as is expected of every wildcard first message). The +// resource_names_subscribe should contain all the resources we are explicitly interested in and a +// special resource denoting a wildcard subscription. +TEST_F(WildcardDeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnectExplicit) { Protobuf::RepeatedPtrField add1_2 = populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}}); EXPECT_CALL(*timer_, disableTimer()); @@ -364,11 +423,106 @@ TEST_F(WildcardDeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); // Regarding the resource_names_subscribe field: // name1: do not include: we lost interest. - // name2: do not include: we are interested, but for wildcard it shouldn't be provided. - // name4: do not include: although we are newly interested, an initial wildcard request - // must be with no resources. + // name2: do not include: we are implicitly interested, but for wildcard it shouldn't be provided. + // name3: yes do include: we are explicitly interested. + EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("*", "name3")); + EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); +} + +// Check the contents of the requests after cancelling the wildcard +// subscription and then reconnection. The second request should look +// like a non-wildcard request, so mention all the known resources in +// the initial request. +TEST_F(WildcardDeltaSubscriptionStateTest, CancellingImplicitWildcardSubscription) { + Protobuf::RepeatedPtrField add1_2 = + populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}}); + EXPECT_CALL(*timer_, disableTimer()); + deliverDiscoveryResponse(add1_2, {}, "debugversion1"); + + state_.updateSubscriptionInterest({"name3"}, {"name1", "*"}); + envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); + EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name3")); + EXPECT_THAT(cur_request.resource_names_unsubscribe(), UnorderedElementsAre("name1", "*")); + state_.markStreamFresh(); // simulate a stream reconnection + // Regarding the resource_names_subscribe field: + // name1: do not include: we lost interest. + // name2: yes do include: we are interested, and it's not wildcard. + // name3: yes do include: we are interested, and it's not wildcard. + cur_request = state_.getNextRequestAckless(); + EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name2", "name3")); + EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); +} + +// Check the contents of the requests after cancelling the wildcard +// subscription and then reconnection. The second request should look +// like a non-wildcard request, so mention all the known resources in +// the initial request. +TEST_F(WildcardDeltaSubscriptionStateTest, CancellingExplicitWildcardSubscription) { + Protobuf::RepeatedPtrField add1_2 = + populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}}); + EXPECT_CALL(*timer_, disableTimer()); + deliverDiscoveryResponse(add1_2, {}, "debugversion1"); + // switch to explicit wildcard subscription + state_.updateSubscriptionInterest({"name3"}, {}); + envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); + EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name3")); + + // cancel wildcard subscription + state_.updateSubscriptionInterest({"name4"}, {"name1", "*"}); + cur_request = state_.getNextRequestAckless(); + EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name4")); + EXPECT_THAT(cur_request.resource_names_unsubscribe(), UnorderedElementsAre("name1", "*")); + state_.markStreamFresh(); // simulate a stream reconnection + // Regarding the resource_names_subscribe field: + // name1: do not include: we lost interest. + // name2: yes do include: we are interested, and it's not wildcard. + // name3: yes do include: we are interested, and it's not wildcard. + // name4: yes do include: we are interested, and it's not wildcard. + cur_request = state_.getNextRequestAckless(); + EXPECT_THAT(cur_request.resource_names_subscribe(), + UnorderedElementsAre("name2", "name3", "name4")); + EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); +} + +// Check that resource changes from being interested in implicitly to explicitly when we update the +// subscription interest. Such resources will show up in the initial wildcard requests +// too. Receiving the update on such resource will not change their interest mode. +TEST_F(WildcardDeltaSubscriptionStateTest, ExplicitInterestOverridesImplicit) { + Protobuf::RepeatedPtrField add1_2_a = + populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}}); + EXPECT_CALL(*timer_, disableTimer()).Times(2); + deliverDiscoveryResponse(add1_2_a, {}, "debugversion1"); + + // verify that neither name1 nor name2 appears in the initial request (they are of implicit + // interest and initial wildcard request should not contain those). + state_.markStreamFresh(); // simulate a stream reconnection + envoy::service::discovery::v3::DeltaDiscoveryRequest cur_request = state_.getNextRequestAckless(); EXPECT_TRUE(cur_request.resource_names_subscribe().empty()); EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); + + // express the interest in name1 explicitly and verify that the follow-up request will contain it + // (this also switches the wildcard mode to explicit, but we won't see * in resource names, + // because we already are in wildcard mode). + state_.updateSubscriptionInterest({"name1"}, {}); + cur_request = state_.getNextRequestAckless(); + EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name1")); + EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); + + // verify that name1 and * appear in the initial request (name1 is of explicit interest and we are + // in explicit wildcard mode). + state_.markStreamFresh(); // simulate a stream reconnection + cur_request = state_.getNextRequestAckless(); + EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name1", "*")); + EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); + + // verify that getting an update on name1 will keep name1 in the explicit interest mode + Protobuf::RepeatedPtrField add1_2_b = + populateRepeatedResource({{"name1", "version1B"}, {"name2", "version2B"}}); + deliverDiscoveryResponse(add1_2_b, {}, "debugversion1"); + state_.markStreamFresh(); // simulate a stream reconnection + cur_request = state_.getNextRequestAckless(); + EXPECT_THAT(cur_request.resource_names_subscribe(), UnorderedElementsAre("name1", "*")); + EXPECT_TRUE(cur_request.resource_names_unsubscribe().empty()); } // initial_resource_versions should not be present on messages after the first in a stream. diff --git a/test/common/upstream/BUILD b/test/common/upstream/BUILD index c4964177d10fb..4442a5a248211 100644 --- a/test/common/upstream/BUILD +++ b/test/common/upstream/BUILD @@ -14,6 +14,20 @@ licenses(["notice"]) # Apache 2 envoy_package() +envoy_cc_test( + name = "od_cds_api_impl_test", + srcs = ["od_cds_api_impl_test.cc"], + deps = [ + "//include/envoy/config:subscription_interface", + "//source/common/stats:isolated_store_lib", + "//source/common/upstream:od_cds_api_lib", + "//test/mocks/protobuf:protobuf_mocks", + "//test/mocks/upstream:cluster_manager_mocks", + "//test/mocks/upstream:missing_cluster_notifier_mocks", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + ], +) + envoy_cc_test( name = "cds_api_impl_test", srcs = ["cds_api_impl_test.cc"], @@ -32,6 +46,17 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "cluster_discovery_manager_test", + srcs = ["cluster_discovery_manager_test.cc"], + deps = [ + "//include/envoy/upstream:cluster_manager_interface", + "//source/common/common:cleanup_lib", + "//source/common/upstream:cluster_discovery_manager_lib", + "//test/mocks/upstream:thread_local_cluster_mocks", + ], +) + envoy_cc_test( name = "cluster_manager_impl_test", srcs = ["cluster_manager_impl_test.cc"], @@ -42,12 +67,14 @@ envoy_cc_test( ":test_cluster_manager", "//source/common/router:context_lib", "//source/extensions/transport_sockets/tls:config", + "//test/mocks/protobuf:protobuf_mocks", "//test/mocks/upstream:cds_api_mocks", "//test/mocks/upstream:cluster_priority_set_mocks", "//test/mocks/upstream:cluster_real_priority_set_mocks", "//test/mocks/upstream:cluster_update_callbacks_mocks", "//test/mocks/upstream:health_checker_mocks", "//test/mocks/upstream:load_balancer_context_mock", + "//test/mocks/upstream:od_cds_api_mocks", "//test/mocks/upstream:thread_aware_load_balancer_mocks", "//test/test_common:test_runtime_lib", "@envoy_api//envoy/admin/v3:pkg_cc_proto", diff --git a/test/common/upstream/cluster_discovery_manager_test.cc b/test/common/upstream/cluster_discovery_manager_test.cc new file mode 100644 index 0000000000000..8394cf71eb947 --- /dev/null +++ b/test/common/upstream/cluster_discovery_manager_test.cc @@ -0,0 +1,447 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "envoy/upstream/cluster_manager.h" + +#include "common/common/cleanup.h" +#include "common/upstream/cluster_discovery_manager.h" + +#include "test/mocks/upstream/thread_local_cluster.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { +namespace { + +class TestClusterUpdateCallbacksHandle : public ClusterUpdateCallbacksHandle, + RaiiListElement { +public: + TestClusterUpdateCallbacksHandle(ClusterUpdateCallbacks& cb, + std::list& parent) + : RaiiListElement(parent, &cb) {} +}; + +class TestClusterLifecycleCallbackHandler : public ClusterLifecycleCallbackHandler { +public: + // Upstream::ClusterLifecycleCallbackHandler + ClusterUpdateCallbacksHandlePtr addClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) override { + return std::make_unique(cb, update_callbacks_); + } + + void invokeClusterAdded(ThreadLocalCluster& cluster) { + for (auto& cb : update_callbacks_) { + cb->onClusterAddOrUpdate(cluster); + } + } + + std::list update_callbacks_; +}; + +enum class Action { + InvokePrevious, + InvokeSelf, + InvokeNext, + InvokeLast, + InvokeOther, + ProcessFoo, + ProcessBar, + DestroyPrevious, + DestroySelf, + DestroyNext, + DestroyOther, + AddNewToFoo, +}; + +const char* actionToString(Action action) { + switch (action) { + case Action::InvokePrevious: + return "invoke previous"; + + case Action::InvokeSelf: + return "invoke self"; + + case Action::InvokeNext: + return "invoke next"; + + case Action::InvokeLast: + return "invoke last"; + + case Action::InvokeOther: + return "invoke other"; + + case Action::ProcessFoo: + return "process foo"; + + case Action::ProcessBar: + return "process bar"; + + case Action::DestroyPrevious: + return "destroy previous"; + + case Action::DestroySelf: + return "destroy self"; + + case Action::DestroyNext: + return "destroy next"; + + case Action::DestroyOther: + return "destroy other"; + + case Action::AddNewToFoo: + return "add new to foo"; + } + + return "invalid action"; +} + +std::ostream& operator<<(std::ostream& os, Action action) { + os << actionToString(action); + return os; +} + +enum class OtherActionsExecution { + AfterFirstAction, + WithinFirstAction, +}; + +struct ActionsParameter { + ActionsParameter(std::vector actions, std::vector called_callbacks, + OtherActionsExecution other_actions_execution) + : actions_(std::move(actions)), called_callbacks_(std::move(called_callbacks)), + other_actions_execution_(other_actions_execution) {} + + std::vector actions_; + std::vector called_callbacks_; + OtherActionsExecution other_actions_execution_; +}; + +std::ostream& operator<<(std::ostream& os, const ActionsParameter& param) { + const char* prefix = ""; + const char* first_separator = ", "; + if (param.other_actions_execution_ == OtherActionsExecution::WithinFirstAction) { + prefix = "during "; + first_separator = ": "; + } + os << prefix << param.actions_.front() << first_separator + << absl::StrJoin(param.actions_.begin() + 1, param.actions_.end(), ", ", + absl::StreamFormatter()) + << " => " << absl::StrJoin(param.called_callbacks_, ", "); + return os; +} + +class ActionExecutor { +public: + ActionExecutor() + : cdm_("test_thread", lifecycle_handler_), previous_(addCallback("foo", "previous")), + self_(addCallback("foo", "self")), next_(addCallback("foo", "next")), + last_(addCallback("foo", "last")), other_(addCallback("bar", "other")) {} + + void setSelfCallback(std::function self_callback) { + self_callback_ = std::move(self_callback); + } + + void execute(Action action) { + switch (action) { + case Action::InvokePrevious: + useInvoker(previous_.invoker_); + break; + + case Action::InvokeSelf: + useInvoker(self_.invoker_); + break; + + case Action::InvokeNext: + useInvoker(next_.invoker_); + break; + + case Action::InvokeLast: + useInvoker(last_.invoker_); + break; + + case Action::InvokeOther: + useInvoker(other_.invoker_); + break; + + case Action::ProcessFoo: + processClusterName("foo"); + break; + + case Action::ProcessBar: + processClusterName("bar"); + break; + + case Action::DestroyPrevious: + previous_.handle_ptr_.reset(); + break; + + case Action::DestroySelf: + self_.handle_ptr_.reset(); + break; + + case Action::DestroyNext: + next_.handle_ptr_.reset(); + break; + + case Action::DestroyOther: + other_.handle_ptr_.reset(); + break; + + case Action::AddNewToFoo: + std::string callback_name = "new" + std::to_string(new_.size()); + new_.emplace_back(addCallback("foo", std::move(callback_name))); + break; + } + } + + ClusterDiscoveryManager::AddedCallbackData addCallback(std::string cluster_name, + std::string callback_name) { + return cdm_.addCallback( + std::move(cluster_name), + std::make_unique( + [this, callback_name = std::move(callback_name)](ClusterDiscoveryStatus) { + // we ignore the status, it's a thing that always comes from outside the manager + bool is_self = callback_name == "self"; + called_callbacks_.push_back(std::move(callback_name)); + if (is_self && self_callback_) { + self_callback_(); + } + })); + } + + void processClusterName(std::string name) { + auto cluster = NiceMock(); + cluster.cluster_.info_->name_ = std::move(name); + lifecycle_handler_.invokeClusterAdded(cluster); + } + + void useInvoker(ClusterDiscoveryManager::CallbackInvoker& invoker) { + invoker.invokeCallback(ClusterDiscoveryStatus::Available); + } + + TestClusterLifecycleCallbackHandler lifecycle_handler_; + ClusterDiscoveryManager cdm_; + std::vector called_callbacks_; + ClusterDiscoveryManager::AddedCallbackData previous_, self_, next_, last_, other_; + std::vector new_; + std::function self_callback_; +}; + +class ActionExecutorTest : public testing::TestWithParam { +public: + void runTest() { + auto& [actions, expected_result, other_actions_execution] = GetParam(); + + ASSERT_FALSE(actions.empty()); + + switch (other_actions_execution) { + case OtherActionsExecution::AfterFirstAction: + for (auto action : actions) { + executor_.execute(action); + } + break; + + case OtherActionsExecution::WithinFirstAction: + executor_.setSelfCallback([this, begin = actions.begin() + 1, end = actions.end()]() { + for (auto it = begin; it != end; ++it) { + executor_.execute(*it); + } + }); + executor_.execute(actions.front()); + break; + } + + EXPECT_EQ(executor_.called_callbacks_, expected_result); + } + + ActionExecutor executor_; +}; + +std::vector all_actions = { + // invoke self twice in a row; expect it to be called once + ActionsParameter({Action::InvokeSelf, Action::InvokeSelf}, {"self"}, + OtherActionsExecution::AfterFirstAction), + // invoke self then other; expect them to be called normally + ActionsParameter({Action::InvokeSelf, Action::InvokeOther}, {"self", "other"}, + OtherActionsExecution::AfterFirstAction), + // invoke self then process foo; since self was already called, processing foo should not call + // it again + ActionsParameter({Action::InvokeSelf, Action::ProcessFoo}, {"self", "previous", "next", "last"}, + OtherActionsExecution::AfterFirstAction), + // invoke self then process bar; expect them to be called normally + ActionsParameter({Action::InvokeSelf, Action::ProcessBar}, {"self", "other"}, + OtherActionsExecution::AfterFirstAction), + // invoke self then destroy self; expect destroying to be a noop instead of corrupting things + // (this is mostly for address sanitizer) + ActionsParameter({Action::InvokeSelf, Action::DestroySelf}, {"self"}, + OtherActionsExecution::AfterFirstAction), + // process foo then invoke self; since self was called as a part of processing foo, invoke + // should be a noop + ActionsParameter({Action::ProcessFoo, Action::InvokeSelf}, {"previous", "self", "next", "last"}, + OtherActionsExecution::AfterFirstAction), + // process foo twice; expect the callbacks to be called once + ActionsParameter({Action::ProcessFoo, Action::ProcessFoo}, {"previous", "self", "next", "last"}, + OtherActionsExecution::AfterFirstAction), + // process foo then bar; expect the callbacks to be called normally + ActionsParameter({Action::ProcessFoo, Action::ProcessBar}, + {"previous", "self", "next", "last", "other"}, + OtherActionsExecution::AfterFirstAction), + // process foo then destroy self; expect destroying to be a noop instead of corrupting things + // (this is mostly for address sanitizer) + ActionsParameter({Action::ProcessFoo, Action::DestroySelf}, + {"previous", "self", "next", "last"}, OtherActionsExecution::AfterFirstAction), + // destroy self then invoke self; expect the invoke to be a noop + ActionsParameter({Action::DestroySelf, Action::InvokeSelf}, {}, + OtherActionsExecution::AfterFirstAction), + // destroy self then process foo; expect all callbacks but self to be invoked + ActionsParameter({Action::DestroySelf, Action::ProcessFoo}, {"previous", "next", "last"}, + OtherActionsExecution::AfterFirstAction), + // destroy self twice; expect the second destroying to be a noop instead of corrupting things + // (this is mostly for address sanitizer) + ActionsParameter({Action::DestroySelf, Action::DestroySelf}, {}, + OtherActionsExecution::AfterFirstAction), + + // when invoking self, invoke self; expect the second invoke to be a noop + ActionsParameter({Action::InvokeSelf, Action::InvokeSelf}, {"self"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, destroy self; expect the second destroying to be a noop instead of + // corrupting things (this is mostly for address sanitizer) + ActionsParameter({Action::InvokeSelf, Action::DestroySelf}, {"self"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, process foo; expect all callbacks but self to be invoked, since self was + // already invoked + ActionsParameter({Action::InvokeSelf, Action::ProcessFoo}, {"self", "previous", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, process bar; expect the callbacks to be called normally + ActionsParameter({Action::InvokeSelf, Action::ProcessBar}, {"self", "other"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, invoke previous; expect the invoke to be a noop, because previous has + // already been called and done + ActionsParameter({Action::ProcessFoo, Action::InvokePrevious}, + {"previous", "self", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, invoke self; expect the invoke to be a noop, because self is being + // called right now + ActionsParameter({Action::ProcessFoo, Action::InvokeSelf}, {"previous", "self", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, invoke next; expect next to be called once (with the invoke), while + // calling it during the process should become a noop + ActionsParameter({Action::ProcessFoo, Action::InvokeNext}, {"previous", "self", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, invoke last; expect last to be called out of order + ActionsParameter({Action::ProcessFoo, Action::InvokeLast}, {"previous", "self", "last", "next"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, process foo; expect the second process to be a noop + ActionsParameter({Action::ProcessFoo, Action::ProcessFoo}, {"previous", "self", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, process bar; expect the callbacks to be called normally, but bar + // callbacks should be called before the rest of foo callbacks + ActionsParameter({Action::ProcessFoo, Action::ProcessBar}, + {"previous", "self", "other", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, destroy self; expect the second destroying to be a noop (since self is + // being called at the moment), instead of corrupting things (this is mostly for address + // sanitizer) + ActionsParameter({Action::ProcessFoo, Action::DestroySelf}, + {"previous", "self", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, destroy next; expect next callback to be skipped + ActionsParameter({Action::ProcessFoo, Action::DestroyNext}, {"previous", "self", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, add a new callback to foo; expect the new callback to be not invoked + // (could be invoked with a follow-up process) + ActionsParameter({Action::ProcessFoo, Action::AddNewToFoo}, + {"previous", "self", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + + // when invoking self, invoke previous and process foo; expect the process to call only next and + // last + ActionsParameter({Action::InvokeSelf, Action::InvokePrevious, Action::ProcessFoo}, + {"self", "previous", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, invoke next and process foo; expect process to call only previous and + // last + ActionsParameter({Action::InvokeSelf, Action::InvokeNext, Action::ProcessFoo}, + {"self", "next", "previous", "last"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, invoke other invoke last, and process bar; expect the process to be a + // noop (invoking last for visibility of the noop) + ActionsParameter( + {Action::InvokeSelf, Action::InvokeOther, Action::InvokeLast, Action::ProcessBar}, + {"self", "other", "last"}, OtherActionsExecution::WithinFirstAction), + // when invoking self, process foo then invoke previous; expect the process to skip self (as + // it's being called at the moment) and invoking previous to be a noop (called during the + // process) + ActionsParameter({Action::InvokeSelf, Action::ProcessFoo, Action::InvokePrevious}, + {"self", "previous", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, process foo then invoke previous; expect the process to skip self (as + // it's being called at the moment) and invoking previous to be a noop (called during the + // process) + ActionsParameter({Action::InvokeSelf, Action::ProcessFoo, Action::DestroyPrevious}, + {"self", "previous", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, destroy previous and process foo; expect self and previous to be skipped + // when processing + ActionsParameter({Action::InvokeSelf, Action::DestroyPrevious, Action::ProcessFoo}, + {"self", "next", "last"}, OtherActionsExecution::WithinFirstAction), + // when invoking self, destroy other and process bar; expect the process to be a noop + ActionsParameter({Action::InvokeSelf, Action::DestroyOther, Action::ProcessBar}, {"self"}, + OtherActionsExecution::WithinFirstAction), + // when invoking self, add new callback to foo and process foo; expect self to be skipped, but + // new to be called along with the rest of the callbacks + ActionsParameter({Action::InvokeSelf, Action::AddNewToFoo, Action::ProcessFoo}, + {"self", "previous", "next", "last", "new0"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, add new callback to foo, process foo then invoke other; expect the + // second process to call only the new callback, then first process to resume with the rest of + // the callbacks (the other callback is added to see the split between two processes) + ActionsParameter( + {Action::ProcessFoo, Action::AddNewToFoo, Action::ProcessFoo, Action::InvokeOther}, + {"previous", "self", "new0", "other", "next", "last"}, + OtherActionsExecution::WithinFirstAction), + // when processing foo, add new to foo and destroy next; expect the new callback to be not + // called, same for the next callback + ActionsParameter({Action::ProcessFoo, Action::AddNewToFoo, Action::DestroyNext}, + {"previous", "self", "last"}, OtherActionsExecution::WithinFirstAction), + // when processing foo, destroy next and try to invoke next; + // expect the invoke to be noop, and processing to not call the + // next callback + ActionsParameter({Action::ProcessFoo, Action::DestroyNext, Action::InvokeNext}, + {"previous", "self", "last"}, OtherActionsExecution::WithinFirstAction), +}; + +class ClusterDiscoveryTest : public ActionExecutorTest {}; + +INSTANTIATE_TEST_SUITE_P(ClusterDiscoveryTestActions, ClusterDiscoveryTest, + testing::ValuesIn(all_actions)); + +TEST_P(ClusterDiscoveryTest, TestActions) { runTest(); } + +class ClusterDiscoveryManagerMiscTest : public testing::Test { +public: + ClusterDiscoveryManagerMiscTest() = default; + + ActionExecutor executor_; +}; + +// Test the the discovery in progress value is correct. +TEST_F(ClusterDiscoveryManagerMiscTest, TestDiscoveryInProgressValue) { + // previous is first callback added to foo + EXPECT_FALSE(executor_.previous_.discovery_in_progress_); + // self, next and last callbacks are follow-up callbacks in foo + EXPECT_TRUE(executor_.self_.discovery_in_progress_); + EXPECT_TRUE(executor_.next_.discovery_in_progress_); + EXPECT_TRUE(executor_.last_.discovery_in_progress_); + // other is first callback added to bar + EXPECT_FALSE(executor_.other_.discovery_in_progress_); +} + +} // namespace +} // namespace Upstream +} // namespace Envoy diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index de00bb302046c..d383ad042edd3 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -4,18 +4,21 @@ #include "envoy/config/cluster/v3/cluster.pb.validate.h" #include "envoy/config/core/v3/base.pb.h" +#include "common/config/xds_resource.h" #include "common/network/raw_buffer_socket.h" #include "common/router/context_impl.h" #include "extensions/transport_sockets/raw_buffer/config.h" #include "test/common/upstream/test_cluster_manager.h" +#include "test/mocks/protobuf/mocks.h" #include "test/mocks/upstream/cds_api.h" #include "test/mocks/upstream/cluster_priority_set.h" #include "test/mocks/upstream/cluster_real_priority_set.h" #include "test/mocks/upstream/cluster_update_callbacks.h" #include "test/mocks/upstream/health_checker.h" #include "test/mocks/upstream/load_balancer_context.h" +#include "test/mocks/upstream/od_cds_api.h" #include "test/mocks/upstream/thread_aware_load_balancer.h" #include "test/test_common/test_runtime.h" @@ -164,6 +167,254 @@ envoy::config::bootstrap::v3::Bootstrap defaultConfig() { return parseBootstrapFromV3Yaml(yaml); } +class ODCDTest : public ClusterManagerImplTest { +public: + void SetUp() override { + create(defaultConfig()); + odcds_ = MockOdCdsApi::create(); + odcds_handle_ = cluster_manager_->createOdCdsApiHandle(odcds_); + } + + void TearDown() override { + odcds_.reset(); + odcds_handle_.reset(); + factory_.tls_.shutdownThread(); + } + + ClusterDiscoveryCallbackPtr createCallback() { + return std::make_unique( + [this](ClusterDiscoveryStatus cluster_status) { + UNREFERENCED_PARAMETER(cluster_status); + ++callback_call_count_; + }); + } + + ClusterDiscoveryCallbackPtr createCallback(ClusterDiscoveryStatus expected_cluster_status) { + return std::make_unique( + [this, expected_cluster_status](ClusterDiscoveryStatus cluster_status) { + EXPECT_EQ(expected_cluster_status, cluster_status); + ++callback_call_count_; + }); + } + + MockOdCdsApiSharedPtr odcds_; + OdCdsApiHandlePtr odcds_handle_; + std::chrono::milliseconds timeout_ = std::chrono::milliseconds(5000); + unsigned callback_call_count_ = 0u; +}; + +// Check that we create a valid handle for valid config source and null resource locator. +TEST_F(ODCDTest, TestAllocate) { + envoy::config::core::v3::ConfigSource config; + OptRef locator; + ProtobufMessage::MockValidationVisitor mock_visitor; + + config.mutable_api_config_source()->set_api_type( + envoy::config::core::v3::ApiConfigSource::DELTA_GRPC); + config.mutable_api_config_source()->set_transport_api_version(envoy::config::core::v3::V3); + config.mutable_api_config_source()->mutable_refresh_delay()->set_seconds(1); + config.mutable_api_config_source()->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name( + "static_cluster"); + + auto handle = cluster_manager_->allocateOdCdsApi(config, locator, mock_visitor); + EXPECT_NE(handle, nullptr); +} + +// Check that we create a valid handle for valid config source and resource locator. +TEST_F(ODCDTest, TestAllocateWithLocator) { + envoy::config::core::v3::ConfigSource config; + ProtobufMessage::MockValidationVisitor mock_visitor; + + config.mutable_api_config_source()->set_api_type( + envoy::config::core::v3::ApiConfigSource::DELTA_GRPC); + config.mutable_api_config_source()->set_transport_api_version(envoy::config::core::v3::V3); + config.mutable_api_config_source()->mutable_refresh_delay()->set_seconds(1); + config.mutable_api_config_source()->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name( + "static_cluster"); + + auto locator = Config::XdsResourceIdentifier::decodeUrl("xdstp://foo/envoy.api.v2.Cluster/bar"); + auto handle = cluster_manager_->allocateOdCdsApi(config, locator, mock_visitor); + EXPECT_NE(handle, nullptr); +} + +// Check if requesting for an unknown cluster calls into ODCDS instead of invoking the callback. +TEST_F(ODCDTest, TestRequest) { + auto cb = createCallback(); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + EXPECT_EQ(callback_call_count_, 0); +} + +// Check if repeatedly requesting for an unknown cluster calls only once into ODCDS instead of +// invoking the callbacks. +TEST_F(ODCDTest, TestRequestRepeated) { + auto cb1 = createCallback(); + auto cb2 = createCallback(); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto handle1 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb1), timeout_); + auto handle2 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb2), timeout_); + EXPECT_EQ(callback_call_count_, 0); +} + +// Check if requesting an unknown cluster calls into ODCDS, even after the successful discovery of +// the cluster and its following expiration (removal). Also make sure that the callback is called on +// the successful discovery. +TEST_F(ODCDTest, TestClusterRediscovered) { + auto cb = createCallback(ClusterDiscoveryStatus::Available); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")).Times(2); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo"), "version1"); + EXPECT_EQ(callback_call_count_, 1); + handle.reset(); + cluster_manager_->removeCluster("cluster_foo"); + cb = createCallback(); + handle = odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + EXPECT_EQ(callback_call_count_, 1); +} + +// Check if requesting an unknown cluster calls into ODCDS, even after the expired discovery of the +// cluster. Also make sure that the callback is called on the expired discovery. +TEST_F(ODCDTest, TestClusterRediscoveredAfterExpiration) { + auto cb = createCallback(ClusterDiscoveryStatus::Timeout); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")).Times(2); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + cluster_manager_->notifyExpiredDiscovery("cluster_foo"); + EXPECT_EQ(callback_call_count_, 1); + handle.reset(); + cb = createCallback(); + handle = odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + EXPECT_EQ(callback_call_count_, 1); +} + +// Check if requesting an unknown cluster calls into ODCDS, even after +// the discovery found out that the cluster is missing in the +// management server. Also make sure that the callback is called on +// the failed discovery. +TEST_F(ODCDTest, TestClusterRediscoveredAfterMissing) { + auto cb = createCallback(ClusterDiscoveryStatus::Missing); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")).Times(2); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + cluster_manager_->notifyMissingCluster("cluster_foo"); + EXPECT_EQ(callback_call_count_, 1); + handle.reset(); + cb = createCallback(); + handle = odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + EXPECT_EQ(callback_call_count_, 1); +} + +// Check that we do nothing if we get a notification about irrelevant +// missing cluster. +TEST_F(ODCDTest, TestIrrelevantNotifyMissingCluster) { + auto cb = createCallback(ClusterDiscoveryStatus::Timeout); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + cluster_manager_->notifyMissingCluster("cluster_bar"); + EXPECT_EQ(callback_call_count_, 0); +} + +// Check that the callback is not called when some other cluster is added. +TEST_F(ODCDTest, TestDiscoveryManagerIgnoresIrrelevantClusters) { + auto cb = std::make_unique([](ClusterDiscoveryStatus) { + ADD_FAILURE() << "The callback should not be called for irrelevant clusters"; + }); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_irrelevant"), "version1"); +} + +// Start a couple of discoveries and drop the discovery handles in different order, make sure no +// callbacks are invoked when discoveries are done. +TEST_F(ODCDTest, TestDroppingHandles) { + auto cb1 = std::make_unique( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 1 should not be called"; }); + auto cb2 = std::make_unique( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 2 should not be called"; }); + auto cb3 = std::make_unique( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 3 should not be called"; }); + auto cb4 = std::make_unique( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 4 should not be called"; }); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo1")); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo2")); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo3")); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo4")); + auto handle1 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo1", std::move(cb1), timeout_); + auto handle2 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo2", std::move(cb2), timeout_); + auto handle3 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo3", std::move(cb3), timeout_); + auto handle4 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo4", std::move(cb4), timeout_); + + handle2.reset(); + handle3.reset(); + handle1.reset(); + handle4.reset(); + + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo1"), "version1"); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo2"), "version1"); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo3"), "version1"); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo4"), "version1"); +} + +// Checks that dropping discovery handles will result in callbacks not being invoked. +TEST_F(ODCDTest, TestHandles) { + auto cb1 = createCallback(ClusterDiscoveryStatus::Available); + auto cb2 = std::make_unique( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 2 should not be called"; }); + auto cb3 = std::make_unique( + [](ClusterDiscoveryStatus) { ADD_FAILURE() << "The callback 3 should not be called"; }); + auto cb4 = createCallback(ClusterDiscoveryStatus::Available); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto handle1 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb1), timeout_); + auto handle2 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb2), timeout_); + auto handle3 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb3), timeout_); + auto handle4 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb4), timeout_); + + // handle1 and handle4 are left intact, so their respective callbacks will be invoked. + handle2.reset(); + handle3.reset(); + + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo"), "version1"); + EXPECT_EQ(callback_call_count_, 2); +} + +// Check if callback is invoked when trying to discover a cluster we already know about. It should +// not call into ODCDS in such case. +TEST_F(ODCDTest, TestCallbackWithExistingCluster) { + auto cb = createCallback(ClusterDiscoveryStatus::Available); + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("cluster_foo"), "version1"); + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")).Times(0); + auto handle = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb), timeout_); + EXPECT_EQ(callback_call_count_, 1); +} + +// Checks that the cluster manager detects that a thread has requested a cluster that some other +// thread already did earlier, so it does not start another discovery process. +TEST_F(ODCDTest, TestMainThreadDiscoveryInProgressDetection) { + EXPECT_CALL(*odcds_, updateOnDemand("cluster_foo")); + auto cb1 = createCallback(); + auto cb2 = createCallback(); + auto handle1 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb1), timeout_); + auto cdm = cluster_manager_->createAndSwapClusterDiscoveryManager("another_fake_thread"); + auto handle2 = + odcds_handle_->requestOnDemandClusterDiscovery("cluster_foo", std::move(cb2), timeout_); +} + class AlpnSocketFactory : public Network::RawBufferSocketFactory { public: bool supportsAlpn() const override { return true; } diff --git a/test/common/upstream/od_cds_api_impl_test.cc b/test/common/upstream/od_cds_api_impl_test.cc new file mode 100644 index 0000000000000..6a1776bde6eb6 --- /dev/null +++ b/test/common/upstream/od_cds_api_impl_test.cc @@ -0,0 +1,177 @@ +#include "envoy/config/core/v3/config_source.pb.h" +#include "envoy/config/subscription.h" + +#include "common/stats/isolated_store_impl.h" +#include "common/upstream/od_cds_api_impl.h" + +#include "test/mocks/protobuf/mocks.h" +#include "test/mocks/upstream/cluster_manager.h" +#include "test/mocks/upstream/missing_cluster_notifier.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { +namespace { + +using ::testing::ElementsAre; +using ::testing::InSequence; +using ::testing::UnorderedElementsAre; + +class OdCdsApiImplTest : public testing::Test { +public: + void SetUp() override { + envoy::config::core::v3::ConfigSource odcds_config; + OptRef null_locator; + odcds_ = OdCdsApiImpl::create(odcds_config, null_locator, cm_, notifier_, store_, + validation_visitor_); + odcds_callbacks_ = cm_.subscription_factory_.callbacks_; + } + + NiceMock cm_; + Stats::IsolatedStoreImpl store_; + MockMissingClusterNotifier notifier_; + OdCdsApiSharedPtr odcds_; + Config::SubscriptionCallbacks* odcds_callbacks_ = nullptr; + NiceMock validation_visitor_; +}; + +// Check that the subscription is started on the first (initial) request. +TEST_F(OdCdsApiImplTest, FirstUpdateStarts) { + InSequence s; + + EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(ElementsAre("fake_cluster"))); + odcds_->updateOnDemand("fake_cluster"); +} + +// Check that the cluster names are added to the awaiting list, when we still wait for the response +// for the initial request. +TEST_F(OdCdsApiImplTest, FollowingClusterNamesHitAwaitingList) { + InSequence s; + + EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(ElementsAre("fake_cluster"))); + EXPECT_CALL(*cm_.subscription_factory_.subscription_, requestOnDemandUpdate(_)).Times(0); + odcds_->updateOnDemand("fake_cluster"); + odcds_->updateOnDemand("another_cluster"); +} + +// Check that the awaiting list is processed when we receive a successful response for the initial +// request. +TEST_F(OdCdsApiImplTest, AwaitingListIsProcessedOnConfigUpdate) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + odcds_->updateOnDemand("another_cluster_1"); + odcds_->updateOnDemand("another_cluster_2"); + + envoy::config::cluster::v3::Cluster cluster; + cluster.set_name("fake_cluster"); + const auto decoded_resources = TestUtility::decodeResources({cluster}); + EXPECT_CALL( + *cm_.subscription_factory_.subscription_, + requestOnDemandUpdate(UnorderedElementsAre("another_cluster_1", "another_cluster_2"))); + odcds_callbacks_->onConfigUpdate(decoded_resources.refvec_, {}, "0"); +} + +// Check that the awaiting list is processed when we receive a failure response for the initial +// request. +TEST_F(OdCdsApiImplTest, AwaitingListIsProcessedOnConfigUpdateFailed) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + odcds_->updateOnDemand("another_cluster_1"); + odcds_->updateOnDemand("another_cluster_2"); + + EXPECT_CALL( + *cm_.subscription_factory_.subscription_, + requestOnDemandUpdate(UnorderedElementsAre("another_cluster_1", "another_cluster_2"))); + odcds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, + nullptr); +} + +// Check that the awaiting list is processed only once, so on the first config update or config +// update failed. +TEST_F(OdCdsApiImplTest, AwaitingListIsProcessedOnceOnly) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + odcds_->updateOnDemand("another_cluster_1"); + odcds_->updateOnDemand("another_cluster_2"); + + EXPECT_CALL( + *cm_.subscription_factory_.subscription_, + requestOnDemandUpdate(UnorderedElementsAre("another_cluster_1", "another_cluster_2"))); + odcds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, + nullptr); + odcds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, + nullptr); +} + +// Check that we don't do extra request if there's nothing on the awaiting list. +TEST_F(OdCdsApiImplTest, NothingIsRequestedOnEmptyAwaitingList) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + + EXPECT_CALL(*cm_.subscription_factory_.subscription_, requestOnDemandUpdate(_)).Times(0); + odcds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, + nullptr); +} + +// Check that we send the requests for clusters after receiving the initial response instead of +// putting the names into the awaiting list. +TEST_F(OdCdsApiImplTest, OnDemandUpdateIsRequestedAfterInitialFetch) { + InSequence s; + + odcds_->updateOnDemand("fake_cluster"); + envoy::config::cluster::v3::Cluster cluster; + cluster.set_name("fake_cluster"); + const auto decoded_resources = TestUtility::decodeResources({cluster}); + odcds_callbacks_->onConfigUpdate(decoded_resources.refvec_, {}, "0"); + EXPECT_CALL(*cm_.subscription_factory_.subscription_, + requestOnDemandUpdate(UnorderedElementsAre("another_cluster"))); + odcds_->updateOnDemand("another_cluster"); +} + +// Check that we report an error when we received a duplicated cluster. +TEST_F(OdCdsApiImplTest, ValidateDuplicateClusters) { + InSequence s; + + envoy::config::cluster::v3::Cluster cluster_1; + cluster_1.set_name("duplicate_cluster"); + const auto decoded_resources = TestUtility::decodeResources({cluster_1, cluster_1}); + + EXPECT_THROW_WITH_MESSAGE(odcds_callbacks_->onConfigUpdate(decoded_resources.refvec_, {}, ""), + EnvoyException, + "Error adding/updating cluster(s) duplicate_cluster: duplicate cluster " + "duplicate_cluster found"); +} + +// Check that notifier gets a message about potentially missing cluster. +TEST_F(OdCdsApiImplTest, NotifierGetsUsed) { + InSequence s; + + odcds_->updateOnDemand("cluster"); + EXPECT_CALL(notifier_, notifyMissingCluster("missing_cluster")); + std::vector v{"missing_cluster"}; + Protobuf::RepeatedPtrField removed(v.begin(), v.end()); + odcds_callbacks_->onConfigUpdate({}, removed, ""); +} + +// Check that notifier won't be used for a requested cluster that did +// not appear in the response. +TEST_F(OdCdsApiImplTest, NotifierNotUsed) { + InSequence s; + + odcds_->updateOnDemand("cluster"); + EXPECT_CALL(notifier_, notifyMissingCluster("cluster")).Times(0); + odcds_callbacks_->onConfigUpdate({}, {}, ""); + odcds_callbacks_->onConfigUpdate({}, {}, ""); + odcds_callbacks_->onConfigUpdate({}, {}, ""); + odcds_callbacks_->onConfigUpdate({}, {}, ""); +} + +} // namespace +} // namespace Upstream +} // namespace Envoy diff --git a/test/common/upstream/test_cluster_manager.h b/test/common/upstream/test_cluster_manager.h index f57acc673889f..091eeb1f709bc 100644 --- a/test/common/upstream/test_cluster_manager.h +++ b/test/common/upstream/test_cluster_manager.h @@ -183,6 +183,18 @@ class TestClusterManagerImpl : public ClusterManagerImpl { } return clusters; } + + OdCdsApiHandlePtr createOdCdsApiHandle(OdCdsApiSharedPtr odcds) { + return ClusterManagerImpl::OdCdsApiHandleImpl::create(*this, std::move(odcds)); + } + + void notifyExpiredDiscovery(absl::string_view name) { + ClusterManagerImpl::notifyExpiredDiscovery(name); + } + + ClusterDiscoveryManager createAndSwapClusterDiscoveryManager(std::string thread_name) { + return ClusterManagerImpl::createAndSwapClusterDiscoveryManager(std::move(thread_name)); + } }; // Override postThreadLocalClusterUpdate so we can test that merged updates calls diff --git a/test/mocks/upstream/BUILD b/test/mocks/upstream/BUILD index 5ab7a4d101098..a915583143a1b 100644 --- a/test/mocks/upstream/BUILD +++ b/test/mocks/upstream/BUILD @@ -72,6 +72,7 @@ envoy_cc_mock( deps = [ ":basic_resource_limit_mocks", ":cds_api_mocks", + ":cluster_discovery_callback_handle_mocks", ":cluster_info_factory_mocks", ":cluster_manager_factory_mocks", ":cluster_manager_mocks", @@ -85,6 +86,9 @@ envoy_cc_mock( ":host_set_mocks", ":load_balancer_context_mock", ":load_balancer_mocks", + ":missing_cluster_notifier_mocks", + ":od_cds_api_handle_mocks", + ":od_cds_api_mocks", ":priority_set_mocks", ":retry_host_predicate_mocks", ":retry_priority_factory_mocks", @@ -248,6 +252,7 @@ envoy_cc_mock( "//test/mocks/http:http_mocks", "//test/mocks/tcp:tcp_mocks", "//test/mocks/upstream:cluster_manager_factory_mocks", + "//test/mocks/upstream:od_cds_api_handle_mocks", "//test/mocks/upstream:thread_local_cluster_mocks", ], ) @@ -279,6 +284,43 @@ envoy_cc_mock( ], ) +envoy_cc_mock( + name = "missing_cluster_notifier_mocks", + srcs = ["missing_cluster_notifier.cc"], + hdrs = ["missing_cluster_notifier.h"], + deps = [ + "//source/common/upstream:od_cds_api_lib", + ], +) + +envoy_cc_mock( + name = "od_cds_api_mocks", + srcs = ["od_cds_api.cc"], + hdrs = ["od_cds_api.h"], + deps = [ + "//source/common/upstream:od_cds_api_lib", + ], +) + +envoy_cc_mock( + name = "od_cds_api_handle_mocks", + srcs = ["od_cds_api_handle.cc"], + hdrs = ["od_cds_api_handle.h"], + deps = [ + ":cluster_discovery_callback_handle_mocks", + "//include/envoy/upstream:cluster_manager_interface", + ], +) + +envoy_cc_mock( + name = "cluster_discovery_callback_handle_mocks", + srcs = ["cluster_discovery_callback_handle.cc"], + hdrs = ["cluster_discovery_callback_handle.h"], + deps = [ + "//include/envoy/upstream:cluster_manager_interface", + ], +) + envoy_cc_mock( name = "cluster_update_callbacks_mocks", srcs = ["cluster_update_callbacks.cc"], diff --git a/test/mocks/upstream/cluster_discovery_callback_handle.cc b/test/mocks/upstream/cluster_discovery_callback_handle.cc new file mode 100644 index 0000000000000..05513c714e1b8 --- /dev/null +++ b/test/mocks/upstream/cluster_discovery_callback_handle.cc @@ -0,0 +1,10 @@ +#include "cluster_discovery_callback_handle.h" + +namespace Envoy { +namespace Upstream { + +MockClusterDiscoveryCallbackHandle::MockClusterDiscoveryCallbackHandle() = default; +MockClusterDiscoveryCallbackHandle::~MockClusterDiscoveryCallbackHandle() = default; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/cluster_discovery_callback_handle.h b/test/mocks/upstream/cluster_discovery_callback_handle.h new file mode 100644 index 0000000000000..d7568d10dd3c6 --- /dev/null +++ b/test/mocks/upstream/cluster_discovery_callback_handle.h @@ -0,0 +1,18 @@ +#pragma once + +#include "envoy/upstream/cluster_manager.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { + +class MockClusterDiscoveryCallbackHandle : public ClusterDiscoveryCallbackHandle { +public: + MockClusterDiscoveryCallbackHandle(); + ~MockClusterDiscoveryCallbackHandle() override; +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/cluster_manager.cc b/test/mocks/upstream/cluster_manager.cc index a789f485d47b4..a7ee0dc0efb1f 100644 --- a/test/mocks/upstream/cluster_manager.cc +++ b/test/mocks/upstream/cluster_manager.cc @@ -26,6 +26,12 @@ MockClusterManager::MockClusterManager() ON_CALL(*this, grpcAsyncClientManager()).WillByDefault(ReturnRef(async_client_manager_)); ON_CALL(*this, localClusterName()).WillByDefault((ReturnRef(local_cluster_name_))); ON_CALL(*this, subscriptionFactory()).WillByDefault(ReturnRef(subscription_factory_)); + ON_CALL(*this, allocateOdCdsApi(_, _, _)) + .WillByDefault(Invoke([](const envoy::config::core::v3::ConfigSource&, + OptRef, + ProtobufMessage::ValidationVisitor&) -> OdCdsApiHandlePtr { + return MockOdCdsApiHandle::create(); + })); } MockClusterManager::~MockClusterManager() = default; diff --git a/test/mocks/upstream/cluster_manager.h b/test/mocks/upstream/cluster_manager.h index ff5649caf5170..a1bb090d9c03c 100644 --- a/test/mocks/upstream/cluster_manager.h +++ b/test/mocks/upstream/cluster_manager.h @@ -10,6 +10,7 @@ #include "cluster_manager_factory.h" #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "od_cds_api_handle.h" #include "thread_local_cluster.h" namespace Envoy { @@ -68,6 +69,10 @@ class MockClusterManager : public ClusterManager { const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const override { return cluster_timeout_budget_stat_names_; } + MOCK_METHOD(OdCdsApiHandlePtr, allocateOdCdsApi, + (const envoy::config::core::v3::ConfigSource& odcds_config, + OptRef odcds_resources_locator, + ProtobufMessage::ValidationVisitor& validation_visitor)); NiceMock thread_local_cluster_; envoy::config::core::v3::BindConfig bind_config_; @@ -86,5 +91,4 @@ class MockClusterManager : public ClusterManager { ClusterTimeoutBudgetStatNames cluster_timeout_budget_stat_names_; }; } // namespace Upstream - } // namespace Envoy diff --git a/test/mocks/upstream/missing_cluster_notifier.cc b/test/mocks/upstream/missing_cluster_notifier.cc new file mode 100644 index 0000000000000..2c15a43212307 --- /dev/null +++ b/test/mocks/upstream/missing_cluster_notifier.cc @@ -0,0 +1,10 @@ +#include "missing_cluster_notifier.h" + +namespace Envoy { +namespace Upstream { + +MockMissingClusterNotifier::MockMissingClusterNotifier() = default; +MockMissingClusterNotifier::~MockMissingClusterNotifier() = default; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/missing_cluster_notifier.h b/test/mocks/upstream/missing_cluster_notifier.h new file mode 100644 index 0000000000000..ee235b9efeadd --- /dev/null +++ b/test/mocks/upstream/missing_cluster_notifier.h @@ -0,0 +1,20 @@ +#pragma once + +#include "common/upstream/od_cds_api_impl.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { + +class MockMissingClusterNotifier : public MissingClusterNotifier { +public: + MockMissingClusterNotifier(); + ~MockMissingClusterNotifier() override; + + MOCK_METHOD(void, notifyMissingCluster, (absl::string_view name)); +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/mocks.h b/test/mocks/upstream/mocks.h index 879280b0aef13..9206ebb20e538 100644 --- a/test/mocks/upstream/mocks.h +++ b/test/mocks/upstream/mocks.h @@ -27,6 +27,7 @@ #include "test/mocks/upstream/basic_resource_limit.h" #include "test/mocks/upstream/cds_api.h" #include "test/mocks/upstream/cluster.h" +#include "test/mocks/upstream/cluster_discovery_callback_handle.h" #include "test/mocks/upstream/cluster_info.h" #include "test/mocks/upstream/cluster_info_factory.h" #include "test/mocks/upstream/cluster_manager.h" @@ -40,6 +41,8 @@ #include "test/mocks/upstream/host_set.h" #include "test/mocks/upstream/load_balancer.h" #include "test/mocks/upstream/load_balancer_context.h" +#include "test/mocks/upstream/od_cds_api.h" +#include "test/mocks/upstream/od_cds_api_handle.h" #include "test/mocks/upstream/priority_set.h" #include "test/mocks/upstream/retry_host_predicate.h" #include "test/mocks/upstream/retry_priority.h" diff --git a/test/mocks/upstream/od_cds_api.cc b/test/mocks/upstream/od_cds_api.cc new file mode 100644 index 0000000000000..c5438817977a4 --- /dev/null +++ b/test/mocks/upstream/od_cds_api.cc @@ -0,0 +1,13 @@ +#include "od_cds_api.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { + +MockOdCdsApi::MockOdCdsApi() = default; +MockOdCdsApi::~MockOdCdsApi() = default; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/od_cds_api.h b/test/mocks/upstream/od_cds_api.h new file mode 100644 index 0000000000000..8f4178e60e4d6 --- /dev/null +++ b/test/mocks/upstream/od_cds_api.h @@ -0,0 +1,28 @@ +#pragma once + +#include +#include + +#include "common/upstream/od_cds_api_impl.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { + +class MockOdCdsApi; +using MockOdCdsApiSharedPtr = std::shared_ptr; + +class MockOdCdsApi : public OdCdsApi { +public: + static MockOdCdsApiSharedPtr create() { return std::make_shared(); } + + MockOdCdsApi(); + ~MockOdCdsApi() override; + + MOCK_METHOD(void, updateOnDemand, (std::string cluster_name)); +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/od_cds_api_handle.cc b/test/mocks/upstream/od_cds_api_handle.cc new file mode 100644 index 0000000000000..3f09cbb2e222a --- /dev/null +++ b/test/mocks/upstream/od_cds_api_handle.cc @@ -0,0 +1,23 @@ +#include "od_cds_api_handle.h" + +#include "cluster_discovery_callback_handle.h" + +namespace Envoy { +namespace Upstream { + +using ::testing::_; +using ::testing::Invoke; +using ::testing::NiceMock; + +MockOdCdsApiHandle::MockOdCdsApiHandle() { + ON_CALL(*this, requestOnDemandClusterDiscovery(_, _, _)) + .WillByDefault(Invoke([](absl::string_view, ClusterDiscoveryCallbackPtr, + std::chrono::milliseconds) -> ClusterDiscoveryCallbackHandlePtr { + return std::make_unique>(); + })); +} + +MockOdCdsApiHandle::~MockOdCdsApiHandle() = default; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/mocks/upstream/od_cds_api_handle.h b/test/mocks/upstream/od_cds_api_handle.h new file mode 100644 index 0000000000000..49a96e4223f51 --- /dev/null +++ b/test/mocks/upstream/od_cds_api_handle.h @@ -0,0 +1,26 @@ +#pragma once + +#include "envoy/upstream/cluster_manager.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Upstream { + +class MockOdCdsApiHandle; +using MockOdCdsApiHandlePtr = std::unique_ptr; + +class MockOdCdsApiHandle : public OdCdsApiHandle { +public: + static MockOdCdsApiHandlePtr create() { return std::make_unique(); } + MockOdCdsApiHandle(); + ~MockOdCdsApiHandle() override; + + MOCK_METHOD(ClusterDiscoveryCallbackHandlePtr, requestOnDemandClusterDiscovery, + (absl::string_view name, ClusterDiscoveryCallbackPtr callback, + std::chrono::milliseconds timeout)); +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index 44481c60f7af5..f0643c4a9a956 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -222,6 +222,8 @@ Nilsson Nonhashable Oauth OCSP +OD +ODCDS OID OK OOM @@ -726,6 +728,7 @@ interpretable intra ints invariance +invoker iovec iovecs ips