Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ New Features
:ref:`max_upstream_rx_datagram_size <envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.max_upstream_rx_datagram_size>`
UDP proxy configuration to allow configuration of upstream max UDP datagram size. The defaults for
both remain 1500 bytes.
* xds: re-introduced unification of delta and sotw xDS multiplexers, based on work in https://github.com/envoyproxy/envoy/pull/8974. Added a new runtime config `envoy.reloadable_features.unified_mux` that when set to true, switches xDS to use the unified multiplexer implementation shared by both regular- and delta-xDS.


Deprecated
----------
55 changes: 55 additions & 0 deletions include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ struct ControlPlaneStats {
GENERATE_TEXT_READOUT_STRUCT)
};

struct Watch;

/**
* Handle on a muxed gRPC subscription. The subscription is canceled on destruction.
*/
Expand Down Expand Up @@ -110,6 +112,59 @@ class GrpcMux {

using TypeUrlMap = absl::flat_hash_map<std::string, std::string>;
static TypeUrlMap& typeUrlMap() { MUTABLE_CONSTRUCT_ON_FIRST_USE(TypeUrlMap, {}); }

// Unified mux interface starts here
/**
* Start a configuration subscription asynchronously for some API type and resources.
* @param type_url type URL corresponding to xDS API, e.g.
* type.googleapis.com/envoy.api.v2.Cluster.
* @param resources set of resource names to watch for. If this is empty, then all
* resources for type_url will result in callbacks.
* @param callbacks the callbacks to be notified of configuration updates. These must be valid
* until GrpcMuxWatch is destroyed.
* @param resource_decoder how incoming opaque resource objects are to be decoded.
* @param use_namespace_matching if namespace watch should be created. This is used for creating
* watches on collections of resources; individual members of a collection are identified by the
* namespace in resource name.
* @return Watch* an opaque watch token added or updated, to be used in future addOrUpdateWatch
* calls.
*/
virtual Watch* addWatch(const std::string& type_url,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder,
std::chrono::milliseconds init_fetch_timeout,
const bool use_namespace_matching) PURE;

// Updates the list of resource names watched by the given watch. If an added name is new across
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
virtual void updateWatch(const std::string& type_url, Watch* watch,
const absl::flat_hash_set<std::string>& resources,
const bool creating_namespace_watch) PURE;

/**
* Cleanup of a Watch* added by addOrUpdateWatch(). Receiving a Watch* from addOrUpdateWatch()
* makes you responsible for eventually invoking this cleanup.
* @param type_url type URL corresponding to xDS API e.g. type.googleapis.com/envoy.api.v2.Cluster
* @param watch the watch to be cleaned up.
*/
virtual void removeWatch(const std::string& type_url, Watch* watch) PURE;

/**
* Retrieves the current pause state as set by pause()/resume().
* @param type_url type URL corresponding to xDS API, e.g.
* type.googleapis.com/envoy.api.v2.Cluster
* @return bool whether the API is paused.
*/
virtual bool paused(const std::string& type_url) const PURE;

/**
* Passes through to all multiplexed SubscriptionStates. To be called when something
* definitive happens with the initial fetch: either an update is successfully received,
* or some sort of error happened.*/
virtual void disableInitFetchTimeoutTimer() PURE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that this and the above methods are not implemented in the non-unified versions. If that's the case, would it be possible to create an interface between GrpcMux and the unified versions that adds these methods?


virtual bool isUnified() const { return false; }
};

using GrpcMuxPtr = std::unique_ptr<GrpcMux>;
Expand Down
1 change: 1 addition & 0 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ envoy_cc_library(
"//include/envoy/config:subscription_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/common:minimal_logger_lib",
"//source/common/config/unified_mux:grpc_subscription_lib",
"//source/common/http:utility_lib",
"//source/common/protobuf",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
Expand Down
36 changes: 36 additions & 0 deletions source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ class GrpcMuxImpl : public GrpcMux,
return grpc_stream_;
}

// unified GrpcMux interface, not implemented by legacy multiplexers
Watch* addWatch(const std::string&, const absl::flat_hash_set<std::string>&,
SubscriptionCallbacks&, OpaqueResourceDecoder&, std::chrono::milliseconds,
const bool) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

void updateWatch(const std::string&, Watch*, const absl::flat_hash_set<std::string>&,
const bool) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

void removeWatch(const std::string&, Watch*) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }

bool paused(const std::string&) const override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }

void disableInitFetchTimeoutTimer() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }

private:
void drainRequests();
void setRetryTimer();
Expand Down Expand Up @@ -206,6 +224,24 @@ class NullGrpcMuxImpl : public GrpcMux,
void onEstablishmentFailure() override {}
void onDiscoveryResponse(std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&&,
ControlPlaneStats&) override {}

// unified GrpcMux interface, not implemented by legacy multiplexers
Watch* addWatch(const std::string&, const absl::flat_hash_set<std::string>&,
SubscriptionCallbacks&, OpaqueResourceDecoder&, std::chrono::milliseconds,
const bool) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

void updateWatch(const std::string&, Watch*, const absl::flat_hash_set<std::string>&,
const bool) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

void removeWatch(const std::string&, Watch*) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }

bool paused(const std::string&) const override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }

void disableInitFetchTimeoutTimer() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
};

} // namespace Config
Expand Down
15 changes: 13 additions & 2 deletions source/common/config/new_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ class NewGrpcMuxImpl
return subscriptions_;
}

// unified GrpcMux interface, not implemented by legacy multiplexers
Watch* addWatch(const std::string&, const absl::flat_hash_set<std::string>&,
SubscriptionCallbacks&, OpaqueResourceDecoder&, std::chrono::milliseconds,
const bool) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

bool paused(const std::string&) const override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }

void disableInitFetchTimeoutTimer() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }

private:
class WatchImpl : public GrpcMuxWatch {
public:
Expand All @@ -112,14 +123,14 @@ class NewGrpcMuxImpl
NewGrpcMuxImpl& parent_;
};

void removeWatch(const std::string& type_url, Watch* watch);
void removeWatch(const std::string& type_url, Watch* watch) override;

// Updates the list of resource names watched by the given watch. If an added name is new across
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
void updateWatch(const std::string& type_url, Watch* watch,
const absl::flat_hash_set<std::string>& resources,
bool creating_namespace_watch = false);
bool creating_namespace_watch = false) override;

void addSubscription(const std::string& type_url, const bool use_namespace_matching);

Expand Down
2 changes: 2 additions & 0 deletions source/common/config/pausable_ack_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ bool PausableAckQueue::empty() {
return true;
}

void PausableAckQueue::clear() { storage_.clear(); }

const UpdateAck& PausableAckQueue::front() {
for (const auto& entry : storage_) {
if (pauses_[entry.type_url_] == 0) {
Expand Down
1 change: 1 addition & 0 deletions source/common/config/pausable_ack_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class PausableAckQueue {
void pause(const std::string& type_url);
void resume(const std::string& type_url);
bool paused(const std::string& type_url) const;
void clear();

private:
// It's ok for non-existent subs to be paused/resumed. The cleanest way to support that is to give
Expand Down
39 changes: 39 additions & 0 deletions source/common/config/subscription_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include "common/config/http_subscription_impl.h"
#include "common/config/new_grpc_mux_impl.h"
#include "common/config/type_to_endpoint.h"
#include "common/config/unified_mux/grpc_mux_impl.h"
#include "common/config/unified_mux/grpc_subscription_impl.h"
#include "common/config/utility.h"
#include "common/config/xds_resource.h"
#include "common/http/utility.h"
Expand Down Expand Up @@ -57,6 +59,19 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
resource_decoder, stats, Utility::configSourceInitialFetchTimeout(config),
validation_visitor_);
case envoy::config::core::v3::ApiConfigSource::GRPC:
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
return std::make_unique<UnifiedMux::GrpcSubscriptionImpl>(
std::make_shared<UnifiedMux::GrpcMuxSotw>(
Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(),
api_config_source, scope, true)
->create(),
dispatcher_, sotwGrpcMethod(type_url, transport_api_version), transport_api_version,
api_.randomGenerator(), scope, Utility::parseRateLimitSettings(api_config_source),
local_info_, api_config_source.set_node_on_first_message_only()),
type_url, callbacks, resource_decoder, stats, dispatcher_.timeSource(),
Utility::configSourceInitialFetchTimeout(config),
/*is_aggregated*/ false, use_namespace_matching);
}
return std::make_unique<GrpcSubscriptionImpl>(
std::make_shared<Config::GrpcMuxImpl>(
local_info_,
Expand All @@ -70,6 +85,20 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
Utility::configSourceInitialFetchTimeout(config),
/*is_aggregated*/ false, use_namespace_matching);
case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: {
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
return std::make_unique<UnifiedMux::GrpcSubscriptionImpl>(
std::make_shared<UnifiedMux::GrpcMuxDelta>(
Config::Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(),
api_config_source, scope, true)
->create(),
dispatcher_, deltaGrpcMethod(type_url, transport_api_version),
transport_api_version, api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_,
api_config_source.set_node_on_first_message_only()),
type_url, callbacks, resource_decoder, stats, dispatcher_.timeSource(),
Utility::configSourceInitialFetchTimeout(config), /*is_aggregated*/ false,
use_namespace_matching);
}
return std::make_unique<GrpcSubscriptionImpl>(
std::make_shared<Config::NewGrpcMuxImpl>(
Config::Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(),
Expand All @@ -87,6 +116,11 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
}
}
case envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kAds: {
if (cm_.adsMux()->isUnified()) {
return std::make_unique<UnifiedMux::GrpcSubscriptionImpl>(
cm_.adsMux(), type_url, callbacks, resource_decoder, stats, dispatcher_.timeSource(),
Utility::configSourceInitialFetchTimeout(config), true, use_namespace_matching);
}
return std::make_unique<GrpcSubscriptionImpl>(
cm_.adsMux(), callbacks, resource_decoder, stats, type_url, dispatcher_,
Utility::configSourceInitialFetchTimeout(config), true, use_namespace_matching);
Expand Down Expand Up @@ -125,6 +159,11 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(

switch (api_config_source.api_type()) {
case envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC: {
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
return std::make_unique<UnifiedMux::GrpcCollectionSubscriptionImpl>(
collection_locator, cm_.adsMux(), callbacks, resource_decoder, stats,
dispatcher_.timeSource(), Utility::configSourceInitialFetchTimeout(config), false);
}
return std::make_unique<GrpcCollectionSubscriptionImpl>(
collection_locator, cm_.adsMux(), callbacks, resource_decoder, stats, dispatcher_,
Utility::configSourceInitialFetchTimeout(config), false);
Expand Down
87 changes: 87 additions & 0 deletions source/common/config/unified_mux/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_package",
)

licenses(["notice"]) # Apache 2

envoy_package()

envoy_cc_library(
name = "grpc_mux_lib",
srcs = ["grpc_mux_impl.cc"],
hdrs = ["grpc_mux_impl.h"],
deps = [
":delta_subscription_state_lib",
":sotw_subscription_state_lib",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/grpc:async_client_interface",
"//source/common/config:api_version_lib",
"//source/common/config:decoded_resource_lib",
"//source/common/config:grpc_stream_lib",
"//source/common/config:pausable_ack_queue_lib",
"//source/common/config:watch_map_lib",
"//source/common/config:xds_context_params_lib",
"//source/common/config:xds_resource_lib",
"//source/common/memory:utils_lib",
"@envoy_api//envoy/api/v2:pkg_cc_proto",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "grpc_subscription_lib",
srcs = ["grpc_subscription_impl.cc"],
hdrs = ["grpc_subscription_impl.h"],
deps = [
":grpc_mux_lib",
"//include/envoy/config:subscription_interface",
"//source/common/config:grpc_stream_lib",
"//source/common/config:utility_lib",
"//source/common/config:xds_resource_lib",
"//source/common/protobuf:utility_lib",
],
)

envoy_cc_library(
name = "delta_subscription_state_lib",
srcs = ["delta_subscription_state.cc"],
hdrs = ["delta_subscription_state.h"],
deps = [
":subscription_state_lib",
"//source/common/config:api_version_lib",
"//source/common/config:utility_lib",
"//source/common/grpc:common_lib",
"//source/common/protobuf",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "sotw_subscription_state_lib",
srcs = ["sotw_subscription_state.cc"],
hdrs = ["sotw_subscription_state.h"],
deps = [
":subscription_state_lib",
"//source/common/config:utility_lib",
"//source/common/grpc:common_lib",
"//source/common/protobuf",
"@envoy_api//envoy/api/v2:pkg_cc_proto",
],
)

envoy_cc_library(
name = "subscription_state_lib",
srcs = ["subscription_state.cc"],
hdrs = ["subscription_state.h"],
deps = [
"//include/envoy/config:subscription_interface",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/local_info:local_info_interface",
"//source/common/common:minimal_logger_lib",
"//source/common/config:ttl_lib",
"//source/common/config:update_ack_lib",
"@envoy_api//envoy/api/v2:pkg_cc_proto",
],
)
Loading