diff --git a/api/envoy/config/bootstrap/v3/bootstrap.proto b/api/envoy/config/bootstrap/v3/bootstrap.proto index 679505815114b..eaf0911c64b66 100644 --- a/api/envoy/config/bootstrap/v3/bootstrap.proto +++ b/api/envoy/config/bootstrap/v3/bootstrap.proto @@ -41,7 +41,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // ` for more detail. // Bootstrap :ref:`configuration overview `. -// [#next-free-field: 36] +// [#next-free-field: 37] message Bootstrap { option (udpa.annotations.versioning).previous_message_type = "envoy.config.bootstrap.v2.Bootstrap"; @@ -342,6 +342,18 @@ message Bootstrap { // TODO(abeyad): Add public-facing documentation. // [#not-implemented-hide:] core.v3.TypedExtensionConfig xds_delegate_extension = 35; + + // Optional XdsConfigTracker configuration, which allows tracking xDS responses in external components, + // e.g., external tracer or monitor. It provides the process point when receive, ingest, or fail to + // process xDS resources and messages. If a value is not specified, no XdsConfigTracker will be used. + // + // .. note:: + // + // There are no in-repo extensions currently, and the :repo:`XdsConfigTracker ` + // interface should be implemented before using. + // See :repo:`xds_config_tracker_integration_test ` + // for an example usage of the interface. + core.v3.TypedExtensionConfig xds_config_tracker_extension = 36; } // Administration interface :ref:`operations documentation diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 6a29ae143521e..9303fb9f79619 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -241,5 +241,9 @@ new_features: :ref:`source_address `. This allows setting :ref:`socket options ` when using the default unspecified bind address is desired. +- area: xds + change: | + added an api configuration :ref:`xds_config_tracker_extension ` in the bootstrap + to allow tracking xDS responses in external components, and provided the extension interface. deprecated: diff --git a/envoy/config/BUILD b/envoy/config/BUILD index 5f5a02b4b354d..d527fea857bfe 100644 --- a/envoy/config/BUILD +++ b/envoy/config/BUILD @@ -117,3 +117,16 @@ envoy_cc_library( "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", ], ) + +envoy_cc_library( + name = "xds_config_tracker_interface", + hdrs = ["xds_config_tracker.h"], + deps = [ + ":subscription_interface", + ":typed_config_interface", + "//envoy/protobuf:message_validator_interface", + "//source/common/config:update_ack_lib", + "//source/common/protobuf", + "@com_google_googleapis//google/rpc:status_cc_proto", + ], +) diff --git a/envoy/config/subscription.h b/envoy/config/subscription.h index 3d5c629eec53a..0c4d61a924c8e 100644 --- a/envoy/config/subscription.h +++ b/envoy/config/subscription.h @@ -4,6 +4,7 @@ #include #include "envoy/common/exception.h" +#include "envoy/common/optref.h" #include "envoy/common/pure.h" #include "envoy/service/discovery/v3/discovery.pb.h" #include "envoy/stats/stats_macros.h" @@ -59,6 +60,11 @@ class DecodedResource { * @return bool does the xDS discovery response have a set resource payload? */ virtual bool hasResource() const PURE; + + /** + * @return optional ref of a resource. + */ + virtual const OptRef metadata() const PURE; }; using DecodedResourcePtr = std::unique_ptr; diff --git a/envoy/config/xds_config_tracker.h b/envoy/config/xds_config_tracker.h new file mode 100644 index 0000000000000..202ae4f49506a --- /dev/null +++ b/envoy/config/xds_config_tracker.h @@ -0,0 +1,109 @@ +#pragma once + +#include +#include + +#include "envoy/common/optref.h" +#include "envoy/config/subscription.h" +#include "envoy/config/typed_config.h" +#include "envoy/protobuf/message_validator.h" +#include "envoy/stats/scope.h" + +#include "source/common/protobuf/protobuf.h" + +#include "google/rpc/status.pb.h" + +namespace Envoy { +namespace Config { + +/** + * An interface for hooking into xDS update events to provide the ablility to use some external + * processor in xDS update. This tracker provides the process point when the discovery response + * is received, when the resources are successfully processed and applied, and when there is any + * failure. + * + * Instance of this interface get invoked on the main Envoy thread. Thus, it is important + * for implementations of this interface to not execute any blocking operations on the same + * thread. + */ +class XdsConfigTracker { +public: + virtual ~XdsConfigTracker() = default; + + /** + * Invoked when SotW xDS configuration updates have been successfully parsed, applied on + * the Envoy instance, and are about to be ACK'ed. + * + * For SotW, the passed resources contain all the received resources except for the heart-beat + * ones in the original message. The call of this method means there is a subscriber for this + * type_url and the type of resource is same as the message's type_url. + * + * Note: this method is called when *all* the resources in a response are accepted. + * + * @param type_url The type url of xDS message. + * @param resources A list of decoded resources to add to the current state. + */ + virtual void onConfigAccepted(const absl::string_view type_url, + const std::vector& resources) PURE; + + /** + * Invoked when Delta xDS configuration updates have been successfully accepted, applied on + * the Envoy instance, and are about to be ACK'ed. + * + * For Delta, added_resources contains all the received added resources except for the heart-beat + * ones in the original message, and the removed resources are the same in the xDS message. + * + * Note: this method is called when *all* the resources in a response are accepted. + * + * @param type_url The type url of xDS message. + * @param added_resources A list of decoded resources to add to the current state. + * @param removed_resources A list of resources to remove from the current state. + */ + virtual void onConfigAccepted( + const absl::string_view type_url, + const Protobuf::RepeatedPtrField& added_resources, + const Protobuf::RepeatedPtrField& removed_resources) PURE; + + /** + * Invoked when xds configs are rejected during xDS ingestion. + * + * @param message The SotW discovery response message body. + * @param details The process state and error details. + */ + virtual void onConfigRejected(const envoy::service::discovery::v3::DiscoveryResponse& message, + const absl::string_view error_detail) PURE; + + /** + * Invoked when xds configs are rejected during xDS ingestion. + * + * @param message The Delta discovery response message body. + * @param details The process state and error details. + */ + virtual void + onConfigRejected(const envoy::service::discovery::v3::DeltaDiscoveryResponse& message, + const absl::string_view error_detail) PURE; +}; + +using XdsConfigTrackerPtr = std::unique_ptr; +using XdsConfigTrackerOptRef = OptRef; + +/** + * A factory abstract class for creating instances of XdsConfigTracker. + */ +class XdsConfigTrackerFactory : public Config::TypedFactory { +public: + ~XdsConfigTrackerFactory() override = default; + + /** + * Creates an XdsConfigTracker using the given config. + */ + virtual XdsConfigTrackerPtr + createXdsConfigTracker(const ProtobufWkt::Any& config, + ProtobufMessage::ValidationVisitor& validation_visitor, + Event::Dispatcher& dispatcher, Stats::Scope& stats) PURE; + + std::string category() const override { return "envoy.config.xds_tracker"; } +}; + +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/BUILD b/source/common/config/BUILD index d9f1154263626..8181f7610be05 100644 --- a/source/common/config/BUILD +++ b/source/common/config/BUILD @@ -166,6 +166,7 @@ envoy_cc_library( ":xds_source_id_lib", "//envoy/config:grpc_mux_interface", "//envoy/config:subscription_interface", + "//envoy/config:xds_config_tracker_interface", "//envoy/config:xds_resources_delegate_interface", "//envoy/upstream:cluster_manager_interface", "//source/common/common:cleanup_lib", @@ -203,6 +204,7 @@ envoy_cc_library( ":watch_map_lib", ":xds_context_params_lib", ":xds_resource_lib", + "//envoy/config:xds_config_tracker_interface", "//envoy/event:dispatcher_interface", "//envoy/grpc:async_client_interface", "//source/common/memory:utils_lib", @@ -329,6 +331,7 @@ envoy_cc_library( ":xds_resource_lib", "//envoy/config:subscription_factory_interface", "//envoy/config:subscription_interface", + "//envoy/config:xds_config_tracker_interface", "//envoy/config:xds_resources_delegate_interface", "//envoy/server:instance_interface", "//envoy/upstream:cluster_manager_interface", @@ -434,6 +437,7 @@ envoy_cc_library( ":utility_lib", ":xds_resource_lib", "//envoy/config:subscription_interface", + "//envoy/config:xds_config_tracker_interface", "//source/common/common:assert_lib", "//source/common/common:cleanup_lib", "//source/common/common:minimal_logger_lib", diff --git a/source/common/config/decoded_resource_impl.h b/source/common/config/decoded_resource_impl.h index 65c5d9dc6a0c6..405cf1a1a4fa8 100644 --- a/source/common/config/decoded_resource_impl.h +++ b/source/common/config/decoded_resource_impl.h @@ -1,5 +1,6 @@ #pragma once +#include "envoy/common/optref.h" #include "envoy/config/subscription.h" #include "envoy/service/discovery/v3/discovery.pb.h" @@ -40,7 +41,7 @@ class DecodedResourceImpl : public DecodedResource { return std::unique_ptr(new DecodedResourceImpl( resource_decoder, absl::nullopt, Protobuf::RepeatedPtrField(), resource, true, - version, absl::nullopt)); + version, absl::nullopt, absl::nullopt)); } static DecodedResourceImplPtr @@ -51,21 +52,22 @@ class DecodedResourceImpl : public DecodedResource { DecodedResourceImpl(OpaqueResourceDecoder& resource_decoder, const envoy::service::discovery::v3::Resource& resource) - : DecodedResourceImpl(resource_decoder, resource.name(), resource.aliases(), - resource.resource(), resource.has_resource(), resource.version(), - resource.has_ttl() - ? absl::make_optional(std::chrono::milliseconds( - DurationUtil::durationToMilliseconds(resource.ttl()))) - : absl::nullopt) {} + : DecodedResourceImpl( + resource_decoder, resource.name(), resource.aliases(), resource.resource(), + resource.has_resource(), resource.version(), + resource.has_ttl() ? absl::make_optional(std::chrono::milliseconds( + DurationUtil::durationToMilliseconds(resource.ttl()))) + : absl::nullopt, + resource.has_metadata() ? makeOptRef(resource.metadata()) : absl::nullopt) {} DecodedResourceImpl(OpaqueResourceDecoder& resource_decoder, const xds::core::v3::CollectionEntry::InlineEntry& inline_entry) : DecodedResourceImpl(resource_decoder, inline_entry.name(), Protobuf::RepeatedPtrField(), inline_entry.resource(), - true, inline_entry.version(), absl::nullopt) {} + true, inline_entry.version(), absl::nullopt, absl::nullopt) {} DecodedResourceImpl(ProtobufTypes::MessagePtr resource, const std::string& name, const std::vector& aliases, const std::string& version) : resource_(std::move(resource)), has_resource_(true), name_(name), aliases_(aliases), - version_(version), ttl_(absl::nullopt) {} + version_(version), ttl_(absl::nullopt), metadata_(absl::nullopt) {} // Config::DecodedResource const std::string& name() const override { return name_; } @@ -74,15 +76,20 @@ class DecodedResourceImpl : public DecodedResource { const Protobuf::Message& resource() const override { return *resource_; }; bool hasResource() const override { return has_resource_; } absl::optional ttl() const override { return ttl_; } + const OptRef metadata() const override { + return metadata_; + } private: DecodedResourceImpl(OpaqueResourceDecoder& resource_decoder, absl::optional name, const Protobuf::RepeatedPtrField& aliases, const ProtobufWkt::Any& resource, bool has_resource, - const std::string& version, absl::optional ttl) + const std::string& version, absl::optional ttl, + const OptRef metadata) : resource_(resource_decoder.decodeResource(resource)), has_resource_(has_resource), name_(name ? *name : resource_decoder.resourceName(*resource_)), - aliases_(repeatedPtrFieldToVector(aliases)), version_(version), ttl_(ttl) {} + aliases_(repeatedPtrFieldToVector(aliases)), version_(version), ttl_(ttl), + metadata_(metadata) {} const ProtobufTypes::MessagePtr resource_; const bool has_resource_; @@ -91,6 +98,10 @@ class DecodedResourceImpl : public DecodedResource { const std::string version_; // Per resource TTL. const absl::optional ttl_; + + // This is the metadata info under the Resource wrapper. + // It is intended to be consumed in the xds_config_tracker extension. + const OptRef metadata_; }; struct DecodedResourcesWrapper { diff --git a/source/common/config/delta_subscription_state.cc b/source/common/config/delta_subscription_state.cc index 39429f88b4a5a..850dd565238d0 100644 --- a/source/common/config/delta_subscription_state.cc +++ b/source/common/config/delta_subscription_state.cc @@ -9,13 +9,16 @@ namespace { DeltaSubscriptionStateVariant getState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map, const LocalInfo::LocalInfo& local_info, - Event::Dispatcher& dispatcher) { + Event::Dispatcher& dispatcher, + XdsConfigTrackerOptRef xds_config_tracker) { if (Runtime::runtimeFeatureEnabled("envoy.restart_features.explicit_wildcard_resource")) { return DeltaSubscriptionStateVariant(absl::in_place_type, - std::move(type_url), watch_map, local_info, dispatcher); + std::move(type_url), watch_map, local_info, dispatcher, + xds_config_tracker); } else { return DeltaSubscriptionStateVariant(absl::in_place_type, - std::move(type_url), watch_map, local_info, dispatcher); + std::move(type_url), watch_map, local_info, dispatcher, + xds_config_tracker); } } @@ -24,8 +27,10 @@ DeltaSubscriptionStateVariant getState(std::string type_url, DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map, const LocalInfo::LocalInfo& local_info, - Event::Dispatcher& dispatcher) - : state_(getState(std::move(type_url), watch_map, local_info, dispatcher)) {} + Event::Dispatcher& dispatcher, + XdsConfigTrackerOptRef xds_config_tracker) + : state_(getState(std::move(type_url), watch_map, local_info, dispatcher, xds_config_tracker)) { +} void DeltaSubscriptionState::updateSubscriptionInterest( const absl::flat_hash_set& cur_added, diff --git a/source/common/config/delta_subscription_state.h b/source/common/config/delta_subscription_state.h index 6b613ade0b4fa..ec57f555280c9 100644 --- a/source/common/config/delta_subscription_state.h +++ b/source/common/config/delta_subscription_state.h @@ -1,6 +1,7 @@ #pragma once #include "envoy/config/subscription.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/local_info/local_info.h" #include "envoy/service/discovery/v3/discovery.pb.h" @@ -20,7 +21,8 @@ using DeltaSubscriptionStateVariant = class DeltaSubscriptionState : public Logger::Loggable { public: DeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map, - const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher); + const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, + XdsConfigTrackerOptRef xds_config_tracker); void updateSubscriptionInterest(const absl::flat_hash_set& cur_added, const absl::flat_hash_set& cur_removed); @@ -37,6 +39,7 @@ class DeltaSubscriptionState : public Logger::Loggable { private: DeltaSubscriptionStateVariant state_; + XdsConfigTrackerOptRef xds_config_tracker_; }; } // namespace Config diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index 8b1935ebb8430..6f730dc950348 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -39,12 +39,13 @@ GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info, Random::RandomGenerator& random, Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node, CustomConfigValidatorsPtr&& config_validators, + XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, const std::string& target_xds_authority) : grpc_stream_(this, std::move(async_client), service_method, random, dispatcher, scope, rate_limit_settings), local_info_(local_info), skip_subsequent_node_(skip_subsequent_node), - config_validators_(std::move(config_validators)), + config_validators_(std::move(config_validators)), xds_config_tracker_(xds_config_tracker), xds_resources_delegate_(xds_resources_delegate), target_xds_authority_(target_xds_authority), first_stream_request_(true), dispatcher_(dispatcher), dynamic_update_callback_handle_(local_info.contextProvider().addDynamicContextUpdateCallback( @@ -219,6 +220,7 @@ void GrpcMuxImpl::onDiscoveryResponse( ControlPlaneStats& control_plane_stats) { const std::string type_url = message->type_url(); ENVOY_LOG(debug, "Received gRPC message for {} at version {}", type_url, message->version_info()); + if (api_state_.count(type_url) == 0) { // TODO(yuval-k): This should never happen. consider dropping the stream as this is a // protocol violation @@ -286,6 +288,11 @@ void GrpcMuxImpl::onDiscoveryResponse( processDiscoveryResources(resources, api_state, type_url, message->version_info(), /*call_delegate=*/true); + + // Processing point when resources are successfully ingested. + if (xds_config_tracker_.has_value()) { + xds_config_tracker_->onConfigAccepted(type_url, resources); + } } END_TRY catch (const EnvoyException& e) { @@ -296,6 +303,11 @@ void GrpcMuxImpl::onDiscoveryResponse( ::google::rpc::Status* error_detail = api_state.request_.mutable_error_detail(); error_detail->set_code(Grpc::Status::WellKnownGrpcStatus::Internal); error_detail->set_message(Config::Utility::truncateGrpcStatusMessage(e.what())); + + // Processing point when there is any exception during the parse and ingestion process. + if (xds_config_tracker_.has_value()) { + xds_config_tracker_->onConfigRejected(*message, error_detail->message()); + } } previously_fetched_data_ = true; api_state.request_.set_response_nonce(message->nonce()); diff --git a/source/common/config/grpc_mux_impl.h b/source/common/config/grpc_mux_impl.h index 8b34046c58424..2c07c17ba7d60 100644 --- a/source/common/config/grpc_mux_impl.h +++ b/source/common/config/grpc_mux_impl.h @@ -8,6 +8,7 @@ #include "envoy/common/time.h" #include "envoy/config/grpc_mux.h" #include "envoy/config/subscription.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/config/xds_resources_delegate.h" #include "envoy/event/dispatcher.h" #include "envoy/grpc/status.h" @@ -39,6 +40,7 @@ class GrpcMuxImpl : public GrpcMux, Random::RandomGenerator& random, Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node, CustomConfigValidatorsPtr&& config_validators, + XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, const std::string& target_xds_authority); @@ -182,6 +184,7 @@ class GrpcMuxImpl : public GrpcMux, const LocalInfo::LocalInfo& local_info_; const bool skip_subsequent_node_; CustomConfigValidatorsPtr config_validators_; + XdsConfigTrackerOptRef xds_config_tracker_; XdsResourcesDelegateOptRef xds_resources_delegate_; const std::string target_xds_authority_; bool first_stream_request_; diff --git a/source/common/config/new_delta_subscription_state.cc b/source/common/config/new_delta_subscription_state.cc index 6b495b7bbba50..9cf27f53feda5 100644 --- a/source/common/config/new_delta_subscription_state.cc +++ b/source/common/config/new_delta_subscription_state.cc @@ -14,7 +14,8 @@ namespace Config { NewDeltaSubscriptionState::NewDeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map, const LocalInfo::LocalInfo& local_info, - Event::Dispatcher& dispatcher) + Event::Dispatcher& dispatcher, + XdsConfigTrackerOptRef xds_config_tracker) // TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on // empty resources as updates. : supports_heartbeats_(type_url != "envoy.config.route.v3.VirtualHost"), @@ -36,7 +37,8 @@ NewDeltaSubscriptionState::NewDeltaSubscriptionState(std::string type_url, watch_map_.onConfigUpdate({}, removed_resources, ""); }, dispatcher, dispatcher.timeSource()), - type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info) {} + type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info), + xds_config_tracker_(xds_config_tracker) {} void NewDeltaSubscriptionState::updateSubscriptionInterest( const absl::flat_hash_set& cur_added, @@ -232,6 +234,12 @@ void NewDeltaSubscriptionState::handleGoodResponse( watch_map_.onConfigUpdate(non_heartbeat_resources, message.removed_resources(), message.system_version_info()); + // Processing point when resources are successfully ingested. + if (xds_config_tracker_.has_value()) { + xds_config_tracker_->onConfigAccepted(message.type_url(), non_heartbeat_resources, + message.removed_resources()); + } + if (Runtime::runtimeFeatureEnabled( "envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) { const auto scoped_update = ttl_.scopedTtlUpdate(); diff --git a/source/common/config/new_delta_subscription_state.h b/source/common/config/new_delta_subscription_state.h index 9ef841cffb22f..177de30f168ed 100644 --- a/source/common/config/new_delta_subscription_state.h +++ b/source/common/config/new_delta_subscription_state.h @@ -1,6 +1,7 @@ #pragma once #include "envoy/config/subscription.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/event/dispatcher.h" #include "envoy/grpc/status.h" #include "envoy/local_info/local_info.h" @@ -77,7 +78,8 @@ namespace Config { class NewDeltaSubscriptionState : public Logger::Loggable { public: NewDeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map, - const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher); + const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, + XdsConfigTrackerOptRef xds_config_tracker); // Update which resources we're interested in subscribing to. void updateSubscriptionInterest(const absl::flat_hash_set& cur_added, @@ -163,6 +165,7 @@ class NewDeltaSubscriptionState : public Logger::Loggable { const std::string type_url_; UntypedConfigUpdateCallbacks& watch_map_; const LocalInfo::LocalInfo& local_info_; + XdsConfigTrackerOptRef xds_config_tracker_; bool in_initial_legacy_wildcard_{true}; bool any_request_sent_yet_in_current_stream_{}; diff --git a/source/common/config/new_grpc_mux_impl.cc b/source/common/config/new_grpc_mux_impl.cc index 56d7ae05109b0..daf2c9f479f9f 100644 --- a/source/common/config/new_grpc_mux_impl.cc +++ b/source/common/config/new_grpc_mux_impl.cc @@ -40,7 +40,8 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client, Random::RandomGenerator& random, Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info, - CustomConfigValidatorsPtr&& config_validators) + CustomConfigValidatorsPtr&& config_validators, + XdsConfigTrackerOptRef xds_config_tracker) : grpc_stream_(this, std::move(async_client), service_method, random, dispatcher, scope, rate_limit_settings), local_info_(local_info), config_validators_(std::move(config_validators)), @@ -48,7 +49,7 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client, [this](absl::string_view resource_type_url) { onDynamicContextUpdate(resource_type_url); })), - dispatcher_(dispatcher) { + dispatcher_(dispatcher), xds_config_tracker_(xds_config_tracker) { AllMuxes::get().insert(this); } @@ -89,6 +90,7 @@ void NewGrpcMuxImpl::onDiscoveryResponse( ControlPlaneStats& control_plane_stats) { ENVOY_LOG(debug, "Received DeltaDiscoveryResponse for {} at version {}", message->type_url(), message->system_version_info()); + auto sub = subscriptions_.find(message->type_url()); if (sub == subscriptions_.end()) { ENVOY_LOG(warn, @@ -108,7 +110,14 @@ void NewGrpcMuxImpl::onDiscoveryResponse( } } - kickOffAck(sub->second->sub_state_.handleResponse(*message)); + auto ack = sub->second->sub_state_.handleResponse(*message); + + // Processing point to record error if there is any failure after the response is processed. + if (xds_config_tracker_.has_value() && + ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok) { + xds_config_tracker_->onConfigRejected(*message, ack.error_detail_.message()); + } + kickOffAck(ack); Memory::Utils::tryShrinkHeap(); } @@ -233,9 +242,9 @@ void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) { void NewGrpcMuxImpl::addSubscription(const std::string& type_url, const bool use_namespace_matching) { - subscriptions_.emplace( - type_url, std::make_unique(type_url, local_info_, use_namespace_matching, - dispatcher_, *config_validators_.get())); + subscriptions_.emplace(type_url, std::make_unique( + type_url, local_info_, use_namespace_matching, dispatcher_, + *config_validators_.get(), xds_config_tracker_)); subscription_ordering_.emplace_back(type_url); } diff --git a/source/common/config/new_grpc_mux_impl.h b/source/common/config/new_grpc_mux_impl.h index bcf8dfee0a464..60625f717263d 100644 --- a/source/common/config/new_grpc_mux_impl.h +++ b/source/common/config/new_grpc_mux_impl.h @@ -6,6 +6,7 @@ #include "envoy/common/token_bucket.h" #include "envoy/config/grpc_mux.h" #include "envoy/config/subscription.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/service/discovery/v3/discovery.pb.h" #include "source/common/common/logger.h" @@ -34,7 +35,8 @@ class NewGrpcMuxImpl const Protobuf::MethodDescriptor& service_method, Random::RandomGenerator& random, Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info, - CustomConfigValidatorsPtr&& config_validators); + CustomConfigValidatorsPtr&& config_validators, + XdsConfigTrackerOptRef xds_config_tracker); ~NewGrpcMuxImpl() override; @@ -83,9 +85,10 @@ class NewGrpcMuxImpl struct SubscriptionStuff { SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info, const bool use_namespace_matching, Event::Dispatcher& dispatcher, - CustomConfigValidators& config_validators) + CustomConfigValidators& config_validators, + XdsConfigTrackerOptRef xds_config_tracker) : watch_map_(use_namespace_matching, type_url, config_validators), - sub_state_(type_url, watch_map_, local_info, dispatcher) {} + sub_state_(type_url, watch_map_, local_info, dispatcher, xds_config_tracker) {} WatchMap watch_map_; DeltaSubscriptionState sub_state_; @@ -178,6 +181,7 @@ class NewGrpcMuxImpl CustomConfigValidatorsPtr config_validators_; Common::CallbackHandlePtr dynamic_update_callback_handle_; Event::Dispatcher& dispatcher_; + XdsConfigTrackerOptRef xds_config_tracker_; // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is // true because it may contain dangling pointers. diff --git a/source/common/config/old_delta_subscription_state.cc b/source/common/config/old_delta_subscription_state.cc index 993727feaa768..69461ae9158b0 100644 --- a/source/common/config/old_delta_subscription_state.cc +++ b/source/common/config/old_delta_subscription_state.cc @@ -14,7 +14,8 @@ namespace Config { OldDeltaSubscriptionState::OldDeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map, const LocalInfo::LocalInfo& local_info, - Event::Dispatcher& dispatcher) + Event::Dispatcher& dispatcher, + XdsConfigTrackerOptRef xds_config_tracker) // TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on // empty resources as updates. : supports_heartbeats_(type_url != "envoy.config.route.v3.VirtualHost"), @@ -30,7 +31,7 @@ OldDeltaSubscriptionState::OldDeltaSubscriptionState(std::string type_url, }, dispatcher, dispatcher.timeSource()), type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info), - dispatcher_(dispatcher) {} + dispatcher_(dispatcher), xds_config_tracker_(xds_config_tracker) {} void OldDeltaSubscriptionState::updateSubscriptionInterest( const absl::flat_hash_set& cur_added, @@ -138,6 +139,12 @@ void OldDeltaSubscriptionState::handleGoodResponse( watch_map_.onConfigUpdate(non_heartbeat_resources, message.removed_resources(), message.system_version_info()); + // Processing point when resources are successfully ingested. + if (xds_config_tracker_.has_value()) { + xds_config_tracker_->onConfigAccepted(message.type_url(), non_heartbeat_resources, + message.removed_resources()); + } + // If a resource is gone, there is no longer a meaningful version for it that makes sense to // provide to the server upon stream reconnect: either it will continue to not exist, in which // case saying nothing is fine, or the server will bring back something new, which we should diff --git a/source/common/config/old_delta_subscription_state.h b/source/common/config/old_delta_subscription_state.h index f8aef137f1337..f1efcb5544450 100644 --- a/source/common/config/old_delta_subscription_state.h +++ b/source/common/config/old_delta_subscription_state.h @@ -1,6 +1,7 @@ #pragma once #include "envoy/config/subscription.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/event/dispatcher.h" #include "envoy/grpc/status.h" #include "envoy/local_info/local_info.h" @@ -25,7 +26,8 @@ namespace Config { class OldDeltaSubscriptionState : public Logger::Loggable { public: OldDeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map, - const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher); + const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, + XdsConfigTrackerOptRef xds_config_tracker); // Update which resources we're interested in subscribing to. void updateSubscriptionInterest(const absl::flat_hash_set& cur_added, @@ -107,6 +109,7 @@ class OldDeltaSubscriptionState : public Logger::Loggable { UntypedConfigUpdateCallbacks& watch_map_; const LocalInfo::LocalInfo& local_info_; Event::Dispatcher& dispatcher_; + XdsConfigTrackerOptRef xds_config_tracker_; std::chrono::milliseconds init_fetch_timeout_; bool any_request_sent_yet_in_current_stream_{}; diff --git a/source/common/config/subscription_factory_impl.cc b/source/common/config/subscription_factory_impl.cc index 7ed7682a5fdb6..d08dce8d3efae 100644 --- a/source/common/config/subscription_factory_impl.cc +++ b/source/common/config/subscription_factory_impl.cc @@ -24,10 +24,10 @@ SubscriptionFactoryImpl::SubscriptionFactoryImpl( const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm, ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api, const Server::Instance& server, - XdsResourcesDelegateOptRef xds_resources_delegate) + XdsResourcesDelegateOptRef xds_resources_delegate, XdsConfigTrackerOptRef xds_config_tracker) : local_info_(local_info), dispatcher_(dispatcher), cm_(cm), validation_visitor_(validation_visitor), api_(api), server_(server), - xds_resources_delegate_(xds_resources_delegate) {} + xds_resources_delegate_(xds_resources_delegate), xds_config_tracker_(xds_config_tracker) {} SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource( const envoy::config::core::v3::ConfigSource& config, absl::string_view type_url, @@ -88,7 +88,7 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource( dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope, Utility::parseRateLimitSettings(api_config_source), local_info_, api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), - xds_resources_delegate_, control_plane_id); + xds_config_tracker_, xds_resources_delegate_, control_plane_id); } else { mux = std::make_shared( local_info_, @@ -98,7 +98,7 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource( dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope, Utility::parseRateLimitSettings(api_config_source), api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), - xds_resources_delegate_, control_plane_id); + xds_config_tracker_, xds_resources_delegate_, control_plane_id); } return std::make_unique( std::move(mux), callbacks, resource_decoder, stats, type_url, dispatcher_, @@ -117,8 +117,8 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource( ->createUncachedRawAsyncClient(), dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope, Utility::parseRateLimitSettings(api_config_source), local_info_, - api_config_source.set_node_on_first_message_only(), - std::move(custom_config_validators)); + api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), + xds_config_tracker_); } else { mux = std::make_shared( Config::Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), @@ -126,7 +126,7 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource( ->createUncachedRawAsyncClient(), dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope, Utility::parseRateLimitSettings(api_config_source), local_info_, - std::move(custom_config_validators)); + std::move(custom_config_validators), xds_config_tracker_); } return std::make_unique( std::move(mux), callbacks, resource_decoder, stats, type_url, dispatcher_, @@ -192,7 +192,7 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl( ->createUncachedRawAsyncClient(), dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope, Utility::parseRateLimitSettings(api_config_source), local_info_, - std::move(custom_config_validators)), + std::move(custom_config_validators), xds_config_tracker_), callbacks, resource_decoder, stats, dispatcher_, Utility::configSourceInitialFetchTimeout(config), false, options); } diff --git a/source/common/config/subscription_factory_impl.h b/source/common/config/subscription_factory_impl.h index 8bbbaa2219731..2f37e08408e72 100644 --- a/source/common/config/subscription_factory_impl.h +++ b/source/common/config/subscription_factory_impl.h @@ -5,6 +5,7 @@ #include "envoy/config/core/v3/config_source.pb.h" #include "envoy/config/subscription.h" #include "envoy/config/subscription_factory.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/config/xds_resources_delegate.h" #include "envoy/server/instance.h" #include "envoy/stats/scope.h" @@ -21,7 +22,8 @@ class SubscriptionFactoryImpl : public SubscriptionFactory, Logger::LoggableonConfigAccepted(message.type_url(), non_heartbeat_resources, + message.removed_resources()); + } + if (Runtime::runtimeFeatureEnabled( "envoy.reloadable_features.delta_xds_subscription_state_tracking_fix")) { const auto scoped_update = ttl_.scopedTtlUpdate(); diff --git a/source/common/config/xds_mux/delta_subscription_state.h b/source/common/config/xds_mux/delta_subscription_state.h index 64daec95baea5..22d9ee371245f 100644 --- a/source/common/config/xds_mux/delta_subscription_state.h +++ b/source/common/config/xds_mux/delta_subscription_state.h @@ -20,7 +20,7 @@ class DeltaSubscriptionState envoy::service::discovery::v3::DeltaDiscoveryRequest> { public: DeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map, - Event::Dispatcher& dispatcher); + Event::Dispatcher& dispatcher, XdsConfigTrackerOptRef xds_config_tracker); ~DeltaSubscriptionState() override; @@ -114,10 +114,11 @@ class DeltaSubscriptionStateFactory : public SubscriptionStateFactory makeSubscriptionState(const std::string& type_url, UntypedConfigUpdateCallbacks& callbacks, - OpaqueResourceDecoderSharedPtr, + OpaqueResourceDecoderSharedPtr, XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef /*xds_resources_delegate*/, const std::string& /*target_xds_authority*/) override { - return std::make_unique(type_url, callbacks, dispatcher_); + return std::make_unique(type_url, callbacks, dispatcher_, + xds_config_tracker); } private: diff --git a/source/common/config/xds_mux/grpc_mux_impl.cc b/source/common/config/xds_mux/grpc_mux_impl.cc index 65ed4d9cbde5a..9848295fd7ce8 100644 --- a/source/common/config/xds_mux/grpc_mux_impl.cc +++ b/source/common/config/xds_mux/grpc_mux_impl.cc @@ -42,7 +42,8 @@ GrpcMuxImpl::GrpcMuxImpl( Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method, Random::RandomGenerator& random, Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, CustomConfigValidatorsPtr&& config_validators, - XdsResourcesDelegateOptRef xds_resources_delegate, const std::string& target_xds_authority) + XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, + const std::string& target_xds_authority) : grpc_stream_(this, std::move(async_client), service_method, random, dispatcher, scope, rate_limit_settings), subscription_state_factory_(std::move(subscription_state_factory)), @@ -51,7 +52,7 @@ GrpcMuxImpl::GrpcMuxImpl( [this](absl::string_view resource_type_url) { onDynamicContextUpdate(resource_type_url); })), - config_validators_(std::move(config_validators)), + config_validators_(std::move(config_validators)), xds_config_tracker_(xds_config_tracker), xds_resources_delegate_(xds_resources_delegate), target_xds_authority_(target_xds_authority) { Config::Utility::checkLocalInfo("ads", local_info); AllMuxes::get().insert(this); @@ -91,7 +92,8 @@ Config::GrpcMuxWatchPtr GrpcMuxImpl::addWatch( .first; subscriptions_.emplace(type_url, subscription_state_factory_->makeSubscriptionState( type_url, *watch_maps_[type_url], resource_decoder, - xds_resources_delegate_, target_xds_authority_)); + xds_config_tracker_, xds_resources_delegate_, + target_xds_authority_)); subscription_ordering_.emplace_back(type_url); } @@ -364,10 +366,11 @@ GrpcMuxDelta::GrpcMuxDelta(Grpc::RawAsyncClientPtr&& async_client, Event::Dispat Random::RandomGenerator& random, Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info, bool skip_subsequent_node, - CustomConfigValidatorsPtr&& config_validators) + CustomConfigValidatorsPtr&& config_validators, + XdsConfigTrackerOptRef xds_config_tracker) : GrpcMuxImpl(std::make_unique(dispatcher), skip_subsequent_node, local_info, std::move(async_client), dispatcher, service_method, random, scope, - rate_limit_settings, std::move(config_validators)) {} + rate_limit_settings, std::move(config_validators), xds_config_tracker) {} // GrpcStreamCallbacks for GrpcMuxDelta void GrpcMuxDelta::requestOnDemandUpdate(const std::string& type_url, @@ -386,12 +389,13 @@ GrpcMuxSotw::GrpcMuxSotw(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatch const RateLimitSettings& rate_limit_settings, const LocalInfo::LocalInfo& local_info, bool skip_subsequent_node, CustomConfigValidatorsPtr&& config_validators, + XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, const std::string& target_xds_authority) : GrpcMuxImpl(std::make_unique(dispatcher), skip_subsequent_node, local_info, std::move(async_client), dispatcher, service_method, random, scope, - rate_limit_settings, std::move(config_validators), xds_resources_delegate, - target_xds_authority) {} + rate_limit_settings, std::move(config_validators), xds_config_tracker, + xds_resources_delegate, target_xds_authority) {} Config::GrpcMuxWatchPtr NullGrpcMuxImpl::addWatch(const std::string&, const absl::flat_hash_set&, diff --git a/source/common/config/xds_mux/grpc_mux_impl.h b/source/common/config/xds_mux/grpc_mux_impl.h index 9e115358b0cab..5f960fbaebf54 100644 --- a/source/common/config/xds_mux/grpc_mux_impl.h +++ b/source/common/config/xds_mux/grpc_mux_impl.h @@ -9,6 +9,7 @@ #include "envoy/common/token_bucket.h" #include "envoy/config/grpc_mux.h" #include "envoy/config/subscription.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/config/xds_resources_delegate.h" #include "envoy/event/dispatcher.h" #include "envoy/grpc/status.h" @@ -64,6 +65,7 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, Random::RandomGenerator& random, Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, CustomConfigValidatorsPtr&& config_validators, + XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate = absl::nullopt, const std::string& target_xds_authority = ""); @@ -206,6 +208,7 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, const LocalInfo::LocalInfo& local_info_; Common::CallbackHandlePtr dynamic_update_callback_handle_; CustomConfigValidatorsPtr config_validators_; + XdsConfigTrackerOptRef xds_config_tracker_; XdsResourcesDelegateOptRef xds_resources_delegate_; const std::string target_xds_authority_; @@ -222,7 +225,8 @@ class GrpcMuxDelta : public GrpcMuxImplonConfigAccepted(message.type_url(), non_heartbeat_resources); + } + // Send the resources to the xDS delegate, if configured. if (xds_resources_delegate_.has_value()) { XdsConfigSourceId source_id{target_xds_authority_, message.type_url()}; diff --git a/source/common/config/xds_mux/sotw_subscription_state.h b/source/common/config/xds_mux/sotw_subscription_state.h index 603d2f440663e..fb063ada022f8 100644 --- a/source/common/config/xds_mux/sotw_subscription_state.h +++ b/source/common/config/xds_mux/sotw_subscription_state.h @@ -23,6 +23,7 @@ class SotwSubscriptionState SotwSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& callbacks, Event::Dispatcher& dispatcher, OpaqueResourceDecoderSharedPtr resource_decoder, + XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, const std::string& target_xds_authority); ~SotwSubscriptionState() override; @@ -74,11 +75,12 @@ class SotwSubscriptionStateFactory : public SubscriptionStateFactory makeSubscriptionState(const std::string& type_url, UntypedConfigUpdateCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder, + XdsConfigTrackerOptRef xds_config_tracker, XdsResourcesDelegateOptRef xds_resources_delegate, const std::string& target_xds_authority) override { return std::make_unique(type_url, callbacks, dispatcher_, - resource_decoder, xds_resources_delegate, - target_xds_authority); + resource_decoder, xds_config_tracker, + xds_resources_delegate, target_xds_authority); } private: diff --git a/source/common/config/xds_mux/subscription_state.h b/source/common/config/xds_mux/subscription_state.h index d165eb9b96e4b..41223d0829d44 100644 --- a/source/common/config/xds_mux/subscription_state.h +++ b/source/common/config/xds_mux/subscription_state.h @@ -5,6 +5,7 @@ #include "envoy/common/pure.h" #include "envoy/config/subscription.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/config/xds_resources_delegate.h" #include "envoy/event/dispatcher.h" #include "envoy/service/discovery/v3/discovery.pb.h" @@ -34,12 +35,13 @@ class BaseSubscriptionState : public SubscriptionState, // Note that, outside of tests, we expect callbacks to always be a WatchMap. BaseSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& callbacks, Event::Dispatcher& dispatcher, + XdsConfigTrackerOptRef xds_config_tracker = absl::nullopt, XdsResourcesDelegateOptRef xds_resources_delegate = absl::nullopt, const std::string& target_xds_authority = "") : ttl_([this](const std::vector& expired) { ttlExpiryCallback(expired); }, dispatcher, dispatcher.timeSource()), type_url_(std::move(type_url)), callbacks_(callbacks), dispatcher_(dispatcher), - xds_resources_delegate_(xds_resources_delegate), + xds_config_tracker_(xds_config_tracker), xds_resources_delegate_(xds_resources_delegate), target_xds_authority_(target_xds_authority) {} virtual ~BaseSubscriptionState() = default; @@ -68,6 +70,10 @@ class BaseSubscriptionState : public SubscriptionState, TRY_ASSERT_MAIN_THREAD { handleGoodResponse(response); } END_TRY catch (const EnvoyException& e) { + if (xds_config_tracker_.has_value()) { + xds_config_tracker_->onConfigRejected(response, + Config::Utility::truncateGrpcStatusMessage(e.what())); + } handleBadResponse(e, ack); } previously_fetched_data_ = true; @@ -121,6 +127,7 @@ class BaseSubscriptionState : public SubscriptionState, Event::Dispatcher& dispatcher_; bool dynamic_context_changed_{}; std::string control_plane_identifier_{}; + XdsConfigTrackerOptRef xds_config_tracker_; XdsResourcesDelegateOptRef xds_resources_delegate_; const std::string target_xds_authority_; bool previously_fetched_data_{}; @@ -133,6 +140,7 @@ template class SubscriptionStateFactory { virtual std::unique_ptr makeSubscriptionState(const std::string& type_url, UntypedConfigUpdateCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder, + XdsConfigTrackerOptRef xds_config_tracker = absl::nullopt, XdsResourcesDelegateOptRef xds_resources_delegate = absl::nullopt, const std::string& target_xds_authority = "") PURE; }; diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 0bf01bcdafa3d..9fa16f3827231 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -336,9 +336,18 @@ ClusterManagerImpl::ClusterManagerImpl( validation_context.dynamicValidationVisitor(), api, main_thread_dispatcher); } + if (bootstrap.has_xds_config_tracker_extension()) { + auto& tracer_factory = Config::Utility::getAndCheckFactory( + bootstrap.xds_config_tracker_extension()); + xds_config_tracker_ = tracer_factory.createXdsConfigTracker( + bootstrap.xds_config_tracker_extension().typed_config(), + validation_context.dynamicValidationVisitor(), main_thread_dispatcher, stats); + } + subscription_factory_ = std::make_unique( local_info, main_thread_dispatcher, *this, validation_context.dynamicValidationVisitor(), api, - server, makeOptRefFromPtr(xds_resources_delegate_.get())); + server, makeOptRefFromPtr(xds_resources_delegate_.get()), + makeOptRefFromPtr(xds_config_tracker_.get())); const auto& dyn_resources = bootstrap.dynamic_resources(); @@ -394,7 +403,7 @@ ClusterManagerImpl::ClusterManagerImpl( random_, stats_, Envoy::Config::Utility::parseRateLimitSettings(dyn_resources.ads_config()), local_info, dyn_resources.ads_config().set_node_on_first_message_only(), - std::move(custom_config_validators)); + std::move(custom_config_validators), makeOptRefFromPtr(xds_config_tracker_.get())); } else { ads_mux_ = std::make_shared( Config::Utility::factoryForGrpcApiConfigSource(*async_client_manager_, @@ -405,7 +414,7 @@ ClusterManagerImpl::ClusterManagerImpl( "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"), random_, stats_, Envoy::Config::Utility::parseRateLimitSettings(dyn_resources.ads_config()), local_info, - std::move(custom_config_validators)); + std::move(custom_config_validators), makeOptRefFromPtr(xds_config_tracker_.get())); } } else { Config::Utility::checkTransportVersion(dyn_resources.ads_config()); @@ -425,7 +434,8 @@ ClusterManagerImpl::ClusterManagerImpl( random_, stats_, Envoy::Config::Utility::parseRateLimitSettings(dyn_resources.ads_config()), local_info, bootstrap.dynamic_resources().ads_config().set_node_on_first_message_only(), - std::move(custom_config_validators), xds_delegate_opt_ref, target_xds_authority); + std::move(custom_config_validators), makeOptRefFromPtr(xds_config_tracker_.get()), + xds_delegate_opt_ref, target_xds_authority); } else { ads_mux_ = std::make_shared( local_info, @@ -438,7 +448,8 @@ ClusterManagerImpl::ClusterManagerImpl( random_, stats_, Envoy::Config::Utility::parseRateLimitSettings(dyn_resources.ads_config()), bootstrap.dynamic_resources().ads_config().set_node_on_first_message_only(), - std::move(custom_config_validators), xds_delegate_opt_ref, target_xds_authority); + std::move(custom_config_validators), makeOptRefFromPtr(xds_config_tracker_.get()), + xds_delegate_opt_ref, target_xds_authority); } } } else { diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 17ed38a0ab266..33e92d213dfdc 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -814,6 +814,7 @@ class ClusterManagerImpl : public ClusterManager, ClusterSet primary_clusters_; std::unique_ptr xds_resources_delegate_; + std::unique_ptr xds_config_tracker_; }; } // namespace Upstream diff --git a/test/common/config/BUILD b/test/common/config/BUILD index 8e47d6be268a5..c45fd0c4c191a 100644 --- a/test/common/config/BUILD +++ b/test/common/config/BUILD @@ -36,6 +36,7 @@ envoy_cc_test( srcs = ["delta_subscription_impl_test.cc"], deps = [ ":delta_subscription_test_harness", + "//envoy/config:xds_config_tracker_interface", "//source/common/config:api_version_lib", "//source/common/config:grpc_subscription_lib", "//source/common/config:new_grpc_mux_lib", @@ -154,6 +155,7 @@ envoy_cc_test( name = "grpc_mux_impl_test", srcs = ["grpc_mux_impl_test.cc"], deps = [ + "//envoy/config:xds_config_tracker_interface", "//envoy/config:xds_resources_delegate_interface", "//source/common/config:api_version_lib", "//source/common/config:grpc_mux_lib", @@ -263,6 +265,7 @@ envoy_cc_test_library( hdrs = ["grpc_subscription_test_harness.h"], deps = [ ":subscription_test_harness", + "//envoy/config:xds_config_tracker_interface", "//envoy/config:xds_resources_delegate_interface", "//source/common/common:hash_lib", "//source/common/config:api_version_lib", @@ -288,6 +291,7 @@ envoy_cc_test_library( hdrs = ["delta_subscription_test_harness.h"], deps = [ ":subscription_test_harness", + "//envoy/config:xds_config_tracker_interface", "//source/common/common:utility_lib", "//source/common/config:new_grpc_mux_lib", "//source/common/config/xds_mux:grpc_mux_lib", @@ -350,6 +354,7 @@ envoy_cc_test( name = "subscription_factory_impl_test", srcs = ["subscription_factory_impl_test.cc"], deps = [ + "//envoy/config:xds_config_tracker_interface", "//envoy/config:xds_resources_delegate_interface", "//source/common/config:subscription_factory_lib", "//source/common/config:xds_resource_lib", @@ -479,6 +484,7 @@ envoy_cc_test( name = "watch_map_test", srcs = ["watch_map_test.cc"], deps = [ + "//envoy/config:xds_config_tracker_interface", "//source/common/config:watch_map_lib", "//test/mocks/config:config_mocks", "//test/mocks/config:custom_config_validators_mocks", diff --git a/test/common/config/decoded_resource_impl_test.cc b/test/common/config/decoded_resource_impl_test.cc index ef2584b638858..e494695d2a075 100644 --- a/test/common/config/decoded_resource_impl_test.cc +++ b/test/common/config/decoded_resource_impl_test.cc @@ -49,6 +49,27 @@ TEST(DecodedResourceImplTest, All) { EXPECT_EQ("foo", decoded_resource.version()); EXPECT_THAT(decoded_resource.resource(), ProtoEq(ProtobufWkt::Empty())); EXPECT_TRUE(decoded_resource.hasResource()); + EXPECT_FALSE(decoded_resource.metadata().has_value()); + } + + // To verify the metadata is decoded as expected. + { + envoy::service::discovery::v3::Resource resource_wrapper; + resource_wrapper.set_name("real_name"); + resource_wrapper.mutable_resource()->MergeFrom(some_opaque_resource); + auto metadata = resource_wrapper.mutable_metadata(); + metadata->mutable_filter_metadata()->insert( + {"fake_test_domain", MessageUtil::keyValueStruct("fake_test_key", "fake_test_value")}); + EXPECT_CALL(resource_decoder, decodeResource(ProtoEq(some_opaque_resource))) + .WillOnce(InvokeWithoutArgs( + []() -> ProtobufTypes::MessagePtr { return std::make_unique(); })); + EXPECT_CALL(resource_decoder, resourceName(ProtoEq(ProtobufWkt::Empty()))).Times(0); + DecodedResourceImpl decoded_resource(resource_decoder, resource_wrapper); + EXPECT_EQ("real_name", decoded_resource.name()); + EXPECT_THAT(decoded_resource.resource(), ProtoEq(ProtobufWkt::Empty())); + EXPECT_TRUE(decoded_resource.hasResource()); + EXPECT_TRUE(decoded_resource.metadata().has_value()); + EXPECT_EQ(metadata->DebugString(), decoded_resource.metadata()->DebugString()); } { diff --git a/test/common/config/delta_subscription_impl_test.cc b/test/common/config/delta_subscription_impl_test.cc index 18a578b20be9e..eeef0307b7a78 100644 --- a/test/common/config/delta_subscription_impl_test.cc +++ b/test/common/config/delta_subscription_impl_test.cc @@ -1,5 +1,6 @@ #include "envoy/config/core/v3/base.pb.h" #include "envoy/config/endpoint/v3/endpoint.pb.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/service/discovery/v3/discovery.pb.h" #include "source/common/buffer/zero_copy_input_stream_impl.h" @@ -152,12 +153,14 @@ TEST_P(DeltaSubscriptionNoGrpcStreamTest, NoGrpcStream) { xds_context = std::make_shared( std::unique_ptr(async_client), dispatcher, *method_descriptor, random, stats_store, rate_limit_settings, local_info, false, - std::make_unique>()); + std::make_unique>(), + /*xds_config_tracker=*/XdsConfigTrackerOptRef()); } else { xds_context = std::make_shared( std::unique_ptr(async_client), dispatcher, *method_descriptor, random, stats_store, rate_limit_settings, local_info, - std::make_unique>()); + std::make_unique>(), + /*xds_config_tracker=*/XdsConfigTrackerOptRef()); } GrpcSubscriptionImplPtr subscription = std::make_unique( diff --git a/test/common/config/delta_subscription_state_old_test.cc b/test/common/config/delta_subscription_state_old_test.cc index fa564c6ca21ad..f5fa768901c3e 100644 --- a/test/common/config/delta_subscription_state_old_test.cc +++ b/test/common/config/delta_subscription_state_old_test.cc @@ -1,6 +1,7 @@ #include #include "envoy/config/cluster/v3/cluster.pb.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/service/discovery/v3/discovery.pb.h" #include "source/common/config/delta_subscription_state.h" @@ -42,8 +43,8 @@ class OldDeltaSubscriptionStateTestBase : public testing::Test { scoped_runtime.mergeValues({ {"envoy.restart_features.explicit_wildcard_resource", "false"}, }); - state_ = std::make_unique(type_url, callbacks_, - local_info_, dispatcher_); + state_ = std::make_unique( + type_url, callbacks_, local_info_, dispatcher_, XdsConfigTrackerOptRef()); } updateSubscriptionInterest(initial_resources, {}); auto cur_request = getNextRequestAckless(); diff --git a/test/common/config/delta_subscription_state_test.cc b/test/common/config/delta_subscription_state_test.cc index 9f04784d07cd1..4f113cb57762e 100644 --- a/test/common/config/delta_subscription_state_test.cc +++ b/test/common/config/delta_subscription_state_test.cc @@ -1,6 +1,7 @@ #include #include "envoy/config/cluster/v3/cluster.pb.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/service/discovery/v3/discovery.pb.h" #include "source/common/config/delta_subscription_state.h" @@ -59,11 +60,11 @@ class DeltaSubscriptionStateTestBase : public testing::TestWithParam(type_url, callbacks_, - dispatcher_); + state_ = std::make_unique( + type_url, callbacks_, dispatcher_, XdsConfigTrackerOptRef()); } else { - state_ = std::make_unique(type_url, callbacks_, - local_info_, dispatcher_); + state_ = std::make_unique( + type_url, callbacks_, local_info_, dispatcher_, XdsConfigTrackerOptRef()); } } diff --git a/test/common/config/delta_subscription_test_harness.h b/test/common/config/delta_subscription_test_harness.h index e3cfd7fc3fe01..c542426deab96 100644 --- a/test/common/config/delta_subscription_test_harness.h +++ b/test/common/config/delta_subscription_test_harness.h @@ -5,6 +5,7 @@ #include "envoy/config/core/v3/base.pb.h" #include "envoy/config/endpoint/v3/endpoint.pb.h" #include "envoy/config/endpoint/v3/endpoint.pb.validate.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/service/discovery/v3/discovery.pb.h" #include "source/common/config/grpc_subscription_impl.h" @@ -51,12 +52,14 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { xds_context_ = std::make_shared( std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, random_, stats_store_, rate_limit_settings_, local_info_, false, - std::make_unique>()); + std::make_unique>(), + /*xds_config_tracker=*/XdsConfigTrackerOptRef()); } else { xds_context_ = std::make_shared( std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, random_, stats_store_, rate_limit_settings_, local_info_, - std::make_unique>()); + std::make_unique>(), + /*xds_config_tracker=*/XdsConfigTrackerOptRef()); } subscription_ = std::make_unique( xds_context_, callbacks_, resource_decoder_, stats_, diff --git a/test/common/config/grpc_mux_impl_test.cc b/test/common/config/grpc_mux_impl_test.cc index fe0540bb474b4..3636b25f09319 100644 --- a/test/common/config/grpc_mux_impl_test.cc +++ b/test/common/config/grpc_mux_impl_test.cc @@ -2,6 +2,7 @@ #include "envoy/config/endpoint/v3/endpoint.pb.h" #include "envoy/config/endpoint/v3/endpoint.pb.validate.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/config/xds_resources_delegate.h" #include "envoy/service/discovery/v3/discovery.pb.h" @@ -64,6 +65,7 @@ class GrpcMuxImplTestBase : public testing::Test { *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), random_, stats_, rate_limit_settings_, true, std::move(config_validators_), + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), /*target_xds_authority=*/""); } @@ -73,6 +75,7 @@ class GrpcMuxImplTestBase : public testing::Test { *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), random_, stats_, custom_rate_limit_settings, true, std::move(config_validators_), + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), /*target_xds_authority=*/""); } @@ -893,6 +896,7 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyClusterName) { "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), random_, stats_, rate_limit_settings_, true, std::make_unique>(), + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), /*target_xds_authority=*/""), EnvoyException, "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " @@ -908,6 +912,7 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyNodeName) { "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), random_, stats_, rate_limit_settings_, true, std::make_unique>(), + /*xds_config_tracker=*/XdsConfigTrackerOptRef(), /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), /*target_xds_authority=*/""), EnvoyException, "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " diff --git a/test/common/config/grpc_subscription_test_harness.h b/test/common/config/grpc_subscription_test_harness.h index c30f5ebeef649..4f30aff773045 100644 --- a/test/common/config/grpc_subscription_test_harness.h +++ b/test/common/config/grpc_subscription_test_harness.h @@ -5,6 +5,7 @@ #include "envoy/config/core/v3/base.pb.h" #include "envoy/config/endpoint/v3/endpoint.pb.h" #include "envoy/config/endpoint/v3/endpoint.pb.validate.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/config/xds_resources_delegate.h" #include "envoy/service/discovery/v3/discovery.pb.h" @@ -59,12 +60,13 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { mux_ = std::make_shared( std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, random_, stats_store_, rate_limit_settings_, local_info_, true, - std::move(config_validators_)); + std::move(config_validators_), /*xds_config_tracker=*/XdsConfigTrackerOptRef()); } else { mux_ = std::make_shared( local_info_, std::unique_ptr(async_client_), dispatcher_, *method_descriptor_, random_, stats_store_, rate_limit_settings_, true, - std::move(config_validators_), /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + std::move(config_validators_), /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), /*target_xds_authority=*/""); } subscription_ = std::make_unique( diff --git a/test/common/config/new_grpc_mux_impl_test.cc b/test/common/config/new_grpc_mux_impl_test.cc index 9ce1ad7222d85..7157acdce7f86 100644 --- a/test/common/config/new_grpc_mux_impl_test.cc +++ b/test/common/config/new_grpc_mux_impl_test.cc @@ -2,6 +2,7 @@ #include "envoy/config/endpoint/v3/endpoint.pb.h" #include "envoy/config/endpoint/v3/endpoint.pb.validate.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/event/timer.h" #include "envoy/service/discovery/v3/discovery.pb.h" @@ -65,14 +66,16 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParam { std::unique_ptr(async_client_), dispatcher_, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), - random_, stats_, rate_limit_settings_, local_info_, false, std::move(config_validators_)); + random_, stats_, rate_limit_settings_, local_info_, false, std::move(config_validators_), + /*xds_config_tracker=*/XdsConfigTrackerOptRef()); return; } grpc_mux_ = std::make_unique( std::unique_ptr(async_client_), dispatcher_, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"), - random_, stats_, rate_limit_settings_, local_info_, std::move(config_validators_)); + random_, stats_, rate_limit_settings_, local_info_, std::move(config_validators_), + /*xds_config_tracker=*/XdsConfigTrackerOptRef()); } void expectSendMessage(const std::string& type_url, diff --git a/test/common/config/sotw_subscription_state_test.cc b/test/common/config/sotw_subscription_state_test.cc index 7bc5592175c8e..462949e96c3cf 100644 --- a/test/common/config/sotw_subscription_state_test.cc +++ b/test/common/config/sotw_subscription_state_test.cc @@ -100,8 +100,8 @@ class SotwSubscriptionStateTest : public testing::Test { xds_resources_delegate_ = std::make_unique(); state_ = std::make_unique( Config::getTypeUrl(), callbacks_, - dispatcher_, resource_decoder_, *xds_resources_delegate_, - /*target_xds_authority=*/"some_random_xds_server"); + dispatcher_, resource_decoder_, /*xds_config_tracker=*/XdsConfigTrackerOptRef(), + *xds_resources_delegate_, /*target_xds_authority=*/"some_random_xds_server"); state_->updateSubscriptionInterest({"name1", "name2", "name3"}, {}); auto cur_request = getNextDiscoveryRequestAckless(); EXPECT_THAT(cur_request->resource_names(), UnorderedElementsAre("name1", "name2", "name3")); diff --git a/test/common/config/subscription_factory_impl_test.cc b/test/common/config/subscription_factory_impl_test.cc index e1e712da8aea7..80ca4df3adc6a 100644 --- a/test/common/config/subscription_factory_impl_test.cc +++ b/test/common/config/subscription_factory_impl_test.cc @@ -6,6 +6,7 @@ #include "envoy/config/core/v3/config_source.pb.validate.h" #include "envoy/config/core/v3/grpc_service.pb.h" #include "envoy/config/endpoint/v3/endpoint.pb.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/config/xds_resources_delegate.h" #include "envoy/stats/scope.h" @@ -47,7 +48,8 @@ class SubscriptionFactoryTest : public testing::Test { http_request_(&cm_.thread_local_cluster_.async_client_), api_(Api::createApiForTest(stats_store_, random_)), subscription_factory_(local_info_, dispatcher_, cm_, validation_visitor_, *api_, server_, - /*xds_resources_delegate=*/XdsResourcesDelegateOptRef()) {} + /*xds_resources_delegate=*/XdsResourcesDelegateOptRef(), + /*xds_config_tracker=*/XdsConfigTrackerOptRef()) {} SubscriptionPtr subscriptionFromConfigSource(const envoy::config::core::v3::ConfigSource& config) { diff --git a/test/common/config/xds_grpc_mux_impl_test.cc b/test/common/config/xds_grpc_mux_impl_test.cc index 4147c14a9531f..3a139697dcc29 100644 --- a/test/common/config/xds_grpc_mux_impl_test.cc +++ b/test/common/config/xds_grpc_mux_impl_test.cc @@ -63,7 +63,8 @@ class GrpcMuxImplTestBase : public testing::Test { std::unique_ptr(async_client_), dispatcher_, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), - random_, stats_, rate_limit_settings_, local_info_, true, std::move(config_validators_)); + random_, stats_, rate_limit_settings_, local_info_, true, std::move(config_validators_), + /*xds_config_tracker=*/XdsConfigTrackerOptRef()); } void setup(const RateLimitSettings& custom_rate_limit_settings) { @@ -72,7 +73,7 @@ class GrpcMuxImplTestBase : public testing::Test { *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), random_, stats_, custom_rate_limit_settings, local_info_, true, - std::move(config_validators_)); + std::move(config_validators_), /*xds_config_tracker=*/XdsConfigTrackerOptRef()); } void expectSendMessage(const std::string& type_url, @@ -895,7 +896,8 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyClusterName) { *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), random_, stats_, rate_limit_settings_, local_info_, true, - std::make_unique>()), + std::make_unique>(), + /*xds_config_tracker=*/XdsConfigTrackerOptRef()), EnvoyException, "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " "--service-node and --service-cluster options."); @@ -909,7 +911,8 @@ TEST_F(GrpcMuxImplTest, BadLocalInfoEmptyNodeName) { *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), random_, stats_, rate_limit_settings_, local_info_, true, - std::make_unique>()), + std::make_unique>(), + /*xds_config_tracker=*/XdsConfigTrackerOptRef()), EnvoyException, "ads: node 'id' and 'cluster' are required. Set it either in 'node' config or via " "--service-node and --service-cluster options."); @@ -1021,7 +1024,8 @@ TEST_F(GrpcMuxImplTest, AllMuxesStateTest) { *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), random_, stats_, rate_limit_settings_, local_info_, true, - std::make_unique>()); + std::make_unique>(), + /*xds_config_tracker=*/XdsConfigTrackerOptRef()); Config::XdsMux::GrpcMuxSotw::shutdownAll(); diff --git a/test/common/grpc/grpc_client_integration.h b/test/common/grpc/grpc_client_integration.h index f4b20fe9d1fba..84abeda51cf35 100644 --- a/test/common/grpc/grpc_client_integration.h +++ b/test/common/grpc/grpc_client_integration.h @@ -146,6 +146,12 @@ class DeltaSotwIntegrationParamTest testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ testing::ValuesIn(TestEnvironment::getsGrpcVersionsForTest()), \ testing::Values(Grpc::LegacyOrUnified::Legacy, Grpc::LegacyOrUnified::Unified)) +#define DELTA_SOTW_UNIFIED_GRPC_CLIENT_INTEGRATION_PARAMS \ + testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ + testing::ValuesIn(TestEnvironment::getsGrpcVersionsForTest()), \ + testing::Values(Grpc::SotwOrDelta::Sotw, Grpc::SotwOrDelta::Delta, \ + Grpc::SotwOrDelta::UnifiedSotw, \ + Grpc::SotwOrDelta::UnifiedDelta)) } // namespace Grpc } // namespace Envoy diff --git a/test/config/utility.cc b/test/config/utility.cc index 13f4add5e4104..c884060403d52 100644 --- a/test/config/utility.cc +++ b/test/config/utility.cc @@ -348,8 +348,7 @@ name: squash )EOF"; } -// TODO(#6327) cleaner approach to testing with static config. -std::string ConfigHelper::discoveredClustersBootstrap(const std::string& api_type) { +std::string ConfigHelper::clustersNoListenerBootstrap(const std::string& api_type) { return fmt::format( R"EOF( admin: @@ -389,6 +388,14 @@ std::string ConfigHelper::discoveredClustersBootstrap(const std::string& api_typ socket_address: address: 127.0.0.1 port_value: 0 +)EOF", + Platform::null_device_path, api_type); +} + +// TODO(#6327) cleaner approach to testing with static config. +std::string ConfigHelper::discoveredClustersBootstrap(const std::string& api_type) { + return absl::StrCat(clustersNoListenerBootstrap(api_type), + R"EOF( listeners: name: http address: @@ -421,8 +428,7 @@ std::string ConfigHelper::discoveredClustersBootstrap(const std::string& api_typ match: prefix: "/cluster2" domains: "*" -)EOF", - Platform::null_device_path, api_type); +)EOF"); } // TODO(#6327) cleaner approach to testing with static config. diff --git a/test/config/utility.h b/test/config/utility.h index 615d3326189a6..0f12fc3fb3640 100644 --- a/test/config/utility.h +++ b/test/config/utility.h @@ -196,6 +196,9 @@ class ConfigHelper { // Configuration for L7 proxying, with clusters cluster_1 and cluster_2 meant to be added via CDS. // api_type should be REST, GRPC, or DELTA_GRPC. static std::string discoveredClustersBootstrap(const std::string& api_type); + // Configuration for L7 proxying, with clusters cluster_1 and cluster_2 meant to be added via CDS. + // but there are no listeners. + static std::string clustersNoListenerBootstrap(const std::string& api_type); static std::string adsBootstrap(const std::string& api_type); // Builds a standard Cluster config fragment, with a single endpoint (at address:port). static envoy::config::cluster::v3::Cluster diff --git a/test/extensions/clusters/eds/eds_speed_test.cc b/test/extensions/clusters/eds/eds_speed_test.cc index db144ecad7f40..443b04371cb7c 100644 --- a/test/extensions/clusters/eds/eds_speed_test.cc +++ b/test/extensions/clusters/eds/eds_speed_test.cc @@ -5,6 +5,7 @@ #include "envoy/config/core/v3/health_check.pb.h" #include "envoy/config/endpoint/v3/endpoint.pb.h" #include "envoy/config/endpoint/v3/endpoint_components.pb.h" +#include "envoy/config/xds_config_tracker.h" #include "envoy/config/xds_resources_delegate.h" #include "envoy/service/discovery/v3/discovery.pb.h" #include "envoy/stats/scope.h" @@ -53,7 +54,8 @@ class EdsSpeedTest { std::unique_ptr(async_client_), server_context_.dispatcher_, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.endpoint.v3.EndpointDiscoveryService.StreamEndpoints"), - random_, stats_, {}, local_info_, true, std::move(config_validators_))); + random_, stats_, {}, local_info_, true, std::move(config_validators_), + /*xds_config_tracker=*/Config::XdsConfigTrackerOptRef())); } else { grpc_mux_.reset(new Config::GrpcMuxImpl( local_info_, std::unique_ptr(async_client_), @@ -61,6 +63,7 @@ class EdsSpeedTest { *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.endpoint.v3.EndpointDiscoveryService.StreamEndpoints"), random_, stats_, {}, true, std::move(config_validators_), + /*xds_config_tracker=*/Config::XdsConfigTrackerOptRef(), /*xds_resources_delegate=*/Config::XdsResourcesDelegateOptRef(), /*target_xds_authority=*/"")); } diff --git a/test/integration/BUILD b/test/integration/BUILD index 63a175f1069a7..31658175a9c6c 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -2074,3 +2074,27 @@ envoy_proto_library( name = "xds_delegate_test_config_proto", srcs = ["xds_delegate_test_config.proto"], ) + +envoy_cc_test( + name = "xds_config_tracker_integration_test", + srcs = ["xds_config_tracker_integration_test.cc"], + external_deps = ["abseil_strings"], + deps = [ + ":http_integration_lib", + ":xds_config_tracker_test_proto_cc_proto", + "//envoy/config:subscription_interface", + "//envoy/config:xds_config_tracker_interface", + "//envoy/protobuf:message_validator_interface", + "//test/common/grpc:grpc_client_integration_lib", + "//test/config:v2_link_hacks", + "//test/test_common:registry_lib", + "@envoy_api//envoy/config/route/v3:pkg_cc_proto", + "@envoy_api//envoy/service/discovery/v3:pkg_cc_proto", + "@envoy_api//envoy/service/runtime/v3:pkg_cc_proto", + ], +) + +envoy_proto_library( + name = "xds_config_tracker_test_proto", + srcs = ["xds_config_tracker_test.proto"], +) diff --git a/test/integration/xds_config_tracker_integration_test.cc b/test/integration/xds_config_tracker_integration_test.cc new file mode 100644 index 0000000000000..96f58bdaf79cb --- /dev/null +++ b/test/integration/xds_config_tracker_integration_test.cc @@ -0,0 +1,243 @@ +#include + +#include "envoy/config/route/v3/route.pb.h" +#include "envoy/config/subscription.h" +#include "envoy/config/xds_config_tracker.h" +#include "envoy/protobuf/message_validator.h" +#include "envoy/service/discovery/v3/discovery.pb.h" +#include "envoy/service/runtime/v3/rtds.pb.h" + +#include "test/common/grpc/grpc_client_integration.h" +#include "test/config/v2_link_hacks.h" +#include "test/integration/http_integration.h" +#include "test/integration/xds_config_tracker_test.pb.h" +#include "test/test_common/registry.h" +#include "test/test_common/utility.h" + +#include "absl/synchronization/mutex.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace { + +const char ClusterName1[] = "cluster_1"; +const char ClusterName2[] = "cluster_2"; +const int UpstreamIndex1 = 1; +const int UpstreamIndex2 = 2; + +/** + * All stats for this xds tracker. @see stats_macros.h + */ +#define ALL_TEST_XDS_TRACKER_STATS(COUNTER) \ + COUNTER(on_config_accepted) \ + COUNTER(on_config_rejected) + +/** + * Struct definition for stats. @see stats_macros.h + */ +struct TestXdsTrackerStats { + ALL_TEST_XDS_TRACKER_STATS(GENERATE_COUNTER_STRUCT) +}; + +/** + * A test implementation of the XdsConfigTracker extension. + * It just increases the test counter when a related method is called. + */ +class TestXdsConfigTracker : public Config::XdsConfigTracker { +public: + TestXdsConfigTracker(Stats::Scope& scope) : stats_(generateStats("test_xds_tracker", scope)) {} + + void onConfigAccepted(const absl::string_view, + const std::vector&) override { + stats_.on_config_accepted_.inc(); + } + + void onConfigAccepted(const absl::string_view, + const Protobuf::RepeatedPtrField&, + const Protobuf::RepeatedPtrField&) override { + stats_.on_config_accepted_.inc(); + } + + void onConfigRejected(const envoy::service::discovery::v3::DiscoveryResponse&, + const absl::string_view) override { + stats_.on_config_rejected_.inc(); + } + + void onConfigRejected(const envoy::service::discovery::v3::DeltaDiscoveryResponse&, + const absl::string_view) override { + stats_.on_config_rejected_.inc(); + } + +private: + TestXdsTrackerStats generateStats(const std::string& prefix, Stats::Scope& scope) { + return {ALL_TEST_XDS_TRACKER_STATS(POOL_COUNTER_PREFIX(scope, prefix))}; + } + TestXdsTrackerStats stats_; +}; + +class TestXdsConfigTrackerFactory : public Config::XdsConfigTrackerFactory { +public: + ProtobufTypes::MessagePtr createEmptyConfigProto() override { + return std::make_unique(); + } + + std::string name() const override { return "envoy.config.xds.test_xds_tracker"; }; + + Config::XdsConfigTrackerPtr createXdsConfigTracker(const ProtobufWkt::Any&, + ProtobufMessage::ValidationVisitor&, + Event::Dispatcher&, + Stats::Scope& stats) override { + return std::make_unique(stats); + } +}; + +class XdsConfigTrackerIntegrationTest : public Grpc::DeltaSotwIntegrationParamTest, + public HttpIntegrationTest { +public: + XdsConfigTrackerIntegrationTest() + : HttpIntegrationTest(Http::CodecType::HTTP2, ipVersion(), + ConfigHelper::clustersNoListenerBootstrap( + sotwOrDelta() == Grpc::SotwOrDelta::Sotw || + sotwOrDelta() == Grpc::SotwOrDelta::UnifiedSotw + ? "GRPC" + : "DELTA_GRPC")) { + + use_lds_ = false; + sotw_or_delta_ = sotwOrDelta(); + + config_helper_.addRuntimeOverride("envoy.reloadable_features.unified_mux", + (this->sotwOrDelta() == Grpc::SotwOrDelta::UnifiedDelta || + this->sotwOrDelta() == Grpc::SotwOrDelta::UnifiedSotw) + ? "true" + : "false"); + + // Add test xDS config tracer. + config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* tracer_extension = bootstrap.mutable_xds_config_tracker_extension(); + tracer_extension->set_name("envoy.config.xds.test_xds_tracer"); + tracer_extension->mutable_typed_config()->PackFrom( + test::envoy::config::xds::TestXdsConfigTracker()); + }); + } + + void TearDown() override { + if (xds_connection_ != nullptr) { + cleanUpXdsConnection(); + } + } + + void initialize() override { + // The tests infra expects the xDS server to be the second fake upstream, so + // we need a dummy data plane cluster. + setUpstreamCount(1); + setUpstreamProtocol(Http::CodecType::HTTP2); + HttpIntegrationTest::initialize(); + + // Create the regular (i.e. not an xDS server) upstreams. + addFakeUpstream(Http::CodecType::HTTP2); + addFakeUpstream(Http::CodecType::HTTP2); + cluster1_ = ConfigHelper::buildStaticCluster( + ClusterName1, fake_upstreams_[UpstreamIndex1]->localAddress()->ip()->port(), + Network::Test::getLoopbackAddressString(ipVersion()), "ROUND_ROBIN"); + cluster2_ = ConfigHelper::buildStaticCluster( + ClusterName2, fake_upstreams_[UpstreamIndex2]->localAddress()->ip()->port(), + Network::Test::getLoopbackAddressString(ipVersion()), "ROUND_ROBIN"); + + acceptXdsConnection(); + registerTestServerPorts({}); + } + + void acceptXdsConnection() { + AssertionResult result = // xds_connection_ is filled with the new FakeHttpConnection. + fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->startGrpcStream(); + } + + envoy::config::cluster::v3::Cluster cluster1_; + envoy::config::cluster::v3::Cluster cluster2_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, XdsConfigTrackerIntegrationTest, + DELTA_SOTW_UNIFIED_GRPC_CLIENT_INTEGRATION_PARAMS); + +TEST_P(XdsConfigTrackerIntegrationTest, XdsConfigTrackerSuccessCount) { + TestXdsConfigTrackerFactory factory; + Registry::InjectFactory registered(factory); + + initialize(); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {}, {}, {}, true)); + + sendDiscoveryResponse( + Config::TypeUrl::get().Cluster, {cluster1_, cluster2_}, {cluster1_, cluster2_}, {}, "1"); + + // 3 because the statically specified CDS server itself counts as a cluster. + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 3); + + // onConfigAccepted is called when all the resources are accepted. + test_server_->waitForCounterEq("test_xds_tracker.on_config_accepted", 1); + EXPECT_EQ(1, test_server_->counter("test_xds_tracker.on_config_accepted")->value()); +} + +TEST_P(XdsConfigTrackerIntegrationTest, XdsConfigTrackerFailureCount) { + TestXdsConfigTrackerFactory factory; + Registry::InjectFactory registered(factory); + + initialize(); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {}, {}, {}, true)); + + const auto route_config = + TestUtility::parseYaml(R"EOF( + name: my_route + vhds: + config_source: + resource_api_version: V3 + api_config_source: + api_type: GRPC + transport_api_version: V3 + grpc_services: + envoy_grpc: + cluster_name: xds_cluster + )EOF"); + + sendDiscoveryResponse( + Config::TypeUrl::get().Cluster, {route_config}, {route_config}, {}, "3"); + + // Resources are rejected because Message's TypeUrl != Resource's + test_server_->waitForCounterEq("test_xds_tracker.on_config_rejected", 1); + EXPECT_EQ(1, test_server_->counter("test_xds_tracker.on_config_rejected")->value()); +} + +TEST_P(XdsConfigTrackerIntegrationTest, XdsConfigTrackerPartialUpdate) { + TestXdsConfigTrackerFactory factory; + Registry::InjectFactory registered(factory); + + initialize(); + // The first of duplicates has already been successfully applied, + // and a duplicate exception should be threw. + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {}, {}, {}, true)); + sendDiscoveryResponse( + Config::TypeUrl::get().Cluster, {cluster1_, cluster1_, cluster2_}, + {cluster1_, cluster1_, cluster2_}, {}, "5"); + + // For Delta, the response will be rejected when checking the message due to the duplication. + // For SotW, both clusters are accepted, but the internal exception is not empty. + if (sotw_or_delta_ == Grpc::SotwOrDelta::Delta || + sotw_or_delta_ == Grpc::SotwOrDelta::UnifiedDelta) { + test_server_->waitForCounterGe("cluster_manager.cluster_added", 1); + } else { + test_server_->waitForCounterGe("cluster_manager.cluster_added", 3); + } + + // onConfigRejected is called if there is any exception even some resources are accepted. + test_server_->waitForCounterEq("test_xds_tracker.on_config_rejected", 1); + EXPECT_EQ(1, test_server_->counter("test_xds_tracker.on_config_rejected")->value()); + + // onConfigAccepted is called only when all the resources in a response are successfully ingested. + EXPECT_EQ(0, test_server_->counter("test_xds_tracker.on_config_accepted")->value()); +} + +} // namespace +} // namespace Envoy diff --git a/test/integration/xds_config_tracker_test.proto b/test/integration/xds_config_tracker_test.proto new file mode 100644 index 0000000000000..7a74cc0fed59e --- /dev/null +++ b/test/integration/xds_config_tracker_test.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; + +package test.envoy.config.xds; + +// Configuration for TestXdsConfigTracker. +message TestXdsConfigTracker { +}