Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,20 @@ class GrpcMux {
* @param callbacks the callbacks to be notified of configuration updates. These must be valid
* until GrpcMuxWatch is destroyed.
* @param resource_decoder how incoming opaque resource objects are to be decoded.
* @param use_namespace_matching if namespace watch should be created. This is used for creating
* watches on collections of resources; individual members of a collection are identified by the
* namespace in resource name.
* @return GrpcMuxWatchPtr a handle to cancel the subscription with. E.g. when a cluster goes
* away, its EDS updates should be cancelled by destroying the GrpcMuxWatchPtr.
*/
virtual GrpcMuxWatchPtr addWatch(const std::string& type_url,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) PURE;
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching) PURE;

virtual void requestOnDemandUpdate(const std::string& type_url,
const std::set<std::string>& for_update) PURE;
};

using GrpcMuxPtr = std::unique_ptr<GrpcMux>;
Expand Down
16 changes: 14 additions & 2 deletions include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ class UntypedConfigUpdateCallbacks {
* @param removed_resources names of resources that this fetch instructed to be removed.
* @param system_version_info aggregate response data "version", for debugging.
* @throw EnvoyException with reason if the config changes are rejected. Otherwise the changes
* are accepted. Accepted changes have their version_info reflected in subsequent requests.
* @param use_namespace_matching if the resources should me matched on their namespaces, rather
* than unique names. This is used when a collection of resources (e.g. virtual hosts in VHDS) is
* being updated. Accepted changes have their version_info reflected in subsequent
* requests.
*/
virtual void onConfigUpdate(
const Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource>& added_resources,
Expand Down Expand Up @@ -174,15 +177,24 @@ class Subscription {
* Start a configuration subscription asynchronously. This should be called once and will continue
* to fetch throughout the lifetime of the Subscription object.
* @param resources set of resource names to fetch.
* @param use_namespace_matching if the subscription is for a collection of resources. In such a
* case a namespace watch will be created.
*/
virtual void start(const std::set<std::string>& resource_names) PURE;
virtual void start(const std::set<std::string>& resource_names,
const bool use_namespace_matching = false) PURE;

/**
* Update the resources to fetch.
* @param resources vector of resource names to fetch. It's a (not unordered_)set so that it can
* be passed to std::set_difference, which must be given sorted collections.
*/
virtual void updateResourceInterest(const std::set<std::string>& update_to_these_names) PURE;

/**
* Creates a discovery request for resources.
* @param add_these_names resource ids for inclusion in the discovery request.
*/
virtual void requestOnDemandUpdate(const std::set<std::string>& add_these_names) PURE;
};

using SubscriptionPtr = std::unique_ptr<Subscription>;
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/filesystem_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ FilesystemSubscriptionImpl::FilesystemSubscriptionImpl(
}

// Config::Subscription
void FilesystemSubscriptionImpl::start(const std::set<std::string>&) {
void FilesystemSubscriptionImpl::start(const std::set<std::string>&, const bool) {
started_ = true;
// Attempt to read in case there is a file there already.
refresh();
Expand Down
5 changes: 4 additions & 1 deletion source/common/config/filesystem_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ class FilesystemSubscriptionImpl : public Config::Subscription,
// Config::Subscription
// We report all discovered resources in the watched file, so the resource names arguments are
// unused, and updateResourceInterest is a no-op (other than updating a stat).
void start(const std::set<std::string>&) override;
void start(const std::set<std::string>&, const bool use_namespace_matching = false) override;
void updateResourceInterest(const std::set<std::string>&) override;
void requestOnDemandUpdate(const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

private:
void refresh();
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) {
GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) {
OpaqueResourceDecoder& resource_decoder, const bool) {
auto watch =
std::make_unique<GrpcMuxWatchImpl>(resources, callbacks, resource_decoder, type_url, *this);
ENVOY_LOG(debug, "gRPC mux addWatch for " + type_url);
Expand Down
13 changes: 11 additions & 2 deletions source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ class GrpcMuxImpl : public GrpcMux,

GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) override;
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching = false) override;

void requestOnDemandUpdate(const std::string&, const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

void handleDiscoveryResponse(
std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message);
Expand Down Expand Up @@ -159,10 +164,14 @@ class NullGrpcMuxImpl : public GrpcMux,
}

GrpcMuxWatchPtr addWatch(const std::string&, const std::set<std::string>&, SubscriptionCallbacks&,
OpaqueResourceDecoder&) override {
OpaqueResourceDecoder&, const bool) override {
ExceptionUtil::throwEnvoyException("ADS must be configured to support an ADS config source");
}

void requestOnDemandUpdate(const std::string&, const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

void onWriteable() override {}
void onStreamEstablished() override {}
void onEstablishmentFailure() override {}
Expand Down
11 changes: 9 additions & 2 deletions source/common/config/grpc_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ GrpcSubscriptionImpl::GrpcSubscriptionImpl(
init_fetch_timeout_(init_fetch_timeout), is_aggregated_(is_aggregated) {}

// Config::Subscription
void GrpcSubscriptionImpl::start(const std::set<std::string>& resources) {
void GrpcSubscriptionImpl::start(const std::set<std::string>& resources,
const bool use_namespace_matching) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout,
Expand All @@ -28,7 +29,8 @@ void GrpcSubscriptionImpl::start(const std::set<std::string>& resources) {
init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
}

watch_ = grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_);
watch_ =
grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_, use_namespace_matching);

// The attempt stat here is maintained for the purposes of having consistency between ADS and
// gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an
Expand All @@ -48,6 +50,11 @@ void GrpcSubscriptionImpl::updateResourceInterest(
stats_.update_attempt_.inc();
}

void GrpcSubscriptionImpl::requestOnDemandUpdate(const std::set<std::string>& for_update) {
grpc_mux_->requestOnDemandUpdate(type_url_, for_update);
stats_.update_attempt_.inc();
}

// Config::SubscriptionCallbacks
void GrpcSubscriptionImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string& version_info) {
Expand Down
5 changes: 3 additions & 2 deletions source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ class GrpcSubscriptionImpl : public Subscription,
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated);

// Config::Subscription
void start(const std::set<std::string>& resource_names) override;
void start(const std::set<std::string>& resource_names,
const bool use_namespace_matching = false) override;
void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;

void requestOnDemandUpdate(const std::set<std::string>& add_these_names) override;
// Config::SubscriptionCallbacks (all pass through to callbacks_!)
void onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string& version_info) override;
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/http_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ HttpSubscriptionImpl::HttpSubscriptionImpl(
}

// Config::Subscription
void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names) {
void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names, const bool) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
handleFailure(Config::ConfigUpdateFailureReason::FetchTimedout, nullptr);
Expand Down
6 changes: 5 additions & 1 deletion source/common/config/http_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
ProtobufMessage::ValidationVisitor& validation_visitor);

// Config::Subscription
void start(const std::set<std::string>& resource_names) override;
void start(const std::set<std::string>& resource_names,
const bool use_namespace_matching = false) override;
void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

// Http::RestApiFetcher
void createRequest(Http::RequestMessage& request) override;
Expand Down
48 changes: 30 additions & 18 deletions source/common/config/new_grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,6 @@ void NewGrpcMuxImpl::onDiscoveryResponse(
return;
}

// When an on-demand request is made a Watch is created using an alias, as the resource name isn't
// known at that point. When an update containing aliases comes back, we update Watches with
// resource names.
for (const auto& r : message->resources()) {
if (r.aliases_size() > 0) {
AddedRemoved converted = sub->second->watch_map_.convertAliasWatchesToNameWatches(r);
sub->second->sub_state_.updateSubscriptionInterest(converted.added_, converted.removed_);
}
}

kickOffAck(sub->second->sub_state_.handleResponse(*message));
Memory::Utils::tryShrinkHeap();
}
Expand Down Expand Up @@ -112,31 +102,51 @@ void NewGrpcMuxImpl::start() { grpc_stream_.establishNewStream(); }
GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) {
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching) {
auto entry = subscriptions_.find(type_url);
if (entry == subscriptions_.end()) {
// We don't yet have a subscription for type_url! Make one!
addSubscription(type_url);
return addWatch(type_url, resources, callbacks, resource_decoder);
addSubscription(type_url, use_namespace_matching);
return addWatch(type_url, resources, callbacks, resource_decoder, use_namespace_matching);
}

Watch* watch = entry->second->watch_map_.addWatch(callbacks, resource_decoder);
// updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
updateWatch(type_url, watch, resources);
updateWatch(type_url, watch, resources, use_namespace_matching);
return std::make_unique<WatchImpl>(type_url, watch, *this);
}

// Updates the list of resource names watched by the given watch. If an added name is new across
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources) {
const std::set<std::string>& resources,
bool creating_namespace_watch) {
ASSERT(watch != nullptr);
auto sub = subscriptions_.find(type_url);
RELEASE_ASSERT(sub != subscriptions_.end(),
fmt::format("Watch of {} has no subscription to update.", type_url));
auto added_removed = sub->second->watch_map_.updateWatchInterest(watch, resources);
sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_, added_removed.removed_);
if (creating_namespace_watch) {
// This is to prevent sending out of requests that contain prefixes instead of resource names
sub->second->sub_state_.updateSubscriptionInterest({}, {});
} else {
sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_,
added_removed.removed_);
}
// Tell the server about our change in interest, if any.
if (sub->second->sub_state_.subscriptionUpdatePending()) {
trySendDiscoveryRequests();
}
}

void NewGrpcMuxImpl::requestOnDemandUpdate(const std::string& type_url,
const std::set<std::string>& for_update) {
auto sub = subscriptions_.find(type_url);
RELEASE_ASSERT(sub != subscriptions_.end(),
fmt::format("Watch of {} has no subscription to update.", type_url));
sub->second->sub_state_.updateSubscriptionInterest(for_update, {});
// Tell the server about our change in interest, if any.
if (sub->second->sub_state_.subscriptionUpdatePending()) {
trySendDiscoveryRequests();
Expand All @@ -151,8 +161,10 @@ void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) {
entry->second->watch_map_.removeWatch(watch);
}

void NewGrpcMuxImpl::addSubscription(const std::string& type_url) {
subscriptions_.emplace(type_url, std::make_unique<SubscriptionStuff>(type_url, local_info_));
void NewGrpcMuxImpl::addSubscription(const std::string& type_url,
const bool use_namespace_matching) {
subscriptions_.emplace(
type_url, std::make_unique<SubscriptionStuff>(type_url, local_info_, use_namespace_matching));
subscription_ordering_.emplace_back(type_url);
}

Expand Down
16 changes: 11 additions & 5 deletions source/common/config/new_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ class NewGrpcMuxImpl

GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) override;
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching = false) override;

void requestOnDemandUpdate(const std::string& type_url,
const std::set<std::string>& for_update) override;

ScopedResume pause(const std::string& type_url) override;
ScopedResume pause(const std::vector<std::string> type_urls) override;
Expand All @@ -60,8 +64,9 @@ class NewGrpcMuxImpl
void start() override;

struct SubscriptionStuff {
SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info)
: sub_state_(type_url, watch_map_, local_info) {}
SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info,
const bool use_namespace_matching)
: watch_map_(use_namespace_matching), sub_state_(type_url, watch_map_, local_info) {}

WatchMap watch_map_;
DeltaSubscriptionState sub_state_;
Expand Down Expand Up @@ -108,9 +113,10 @@ class NewGrpcMuxImpl
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
void updateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources);
const std::set<std::string>& resources,
const bool creating_namespace_watch = false);

void addSubscription(const std::string& type_url);
void addSubscription(const std::string& type_url, const bool use_namespace_matching);

void trySendDiscoveryRequests();

Expand Down
45 changes: 17 additions & 28 deletions source/common/config/watch_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@
namespace Envoy {
namespace Config {

namespace {
// Returns the namespace part (if there's any) in the resource name.
std::string namespaceFromName(const std::string& resource_name) {
const auto pos = resource_name.find_last_of('/');
// we are not interested in the "/" character in the namespace
return pos == std::string::npos ? "" : resource_name.substr(0, pos);
}
} // namespace

Watch* WatchMap::addWatch(SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) {
auto watch = std::make_unique<Watch>(callbacks, resource_decoder);
Expand Down Expand Up @@ -59,8 +68,14 @@ AddedRemoved WatchMap::updateWatchInterest(Watch* watch,
}

absl::flat_hash_set<Watch*> WatchMap::watchesInterestedIn(const std::string& resource_name) {
absl::flat_hash_set<Watch*> ret = wildcard_watches_;
const auto watches_interested = watch_interest_.find(resource_name);
absl::flat_hash_set<Watch*> ret;
if (!use_namespace_matching_) {
ret = wildcard_watches_;
}

const auto prefix = namespaceFromName(resource_name);
const auto resource_key = use_namespace_matching_ && !prefix.empty() ? prefix : resource_name;
const auto watches_interested = watch_interest_.find(resource_key);
if (watches_interested != watch_interest_.end()) {
for (const auto& watch : watches_interested->second) {
ret.insert(watch);
Expand Down Expand Up @@ -120,32 +135,6 @@ void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>
}
}

// For responses to on-demand requests, replace the original watch for an alias
// with one for the resource's name
AddedRemoved WatchMap::convertAliasWatchesToNameWatches(
const envoy::service::discovery::v3::Resource& resource) {
absl::flat_hash_set<Watch*> watches_to_update;
for (const auto& alias : resource.aliases()) {
const auto interested_watches = watch_interest_.find(alias);
if (interested_watches != watch_interest_.end()) {
for (const auto& interested_watch : interested_watches->second) {
watches_to_update.insert(interested_watch);
}
}
}

auto ret = AddedRemoved({}, {});
for (const auto& watch : watches_to_update) {
const auto& converted_watches = updateWatchInterest(watch, {resource.name()});
std::copy(converted_watches.added_.begin(), converted_watches.added_.end(),
std::inserter(ret.added_, ret.added_.end()));
std::copy(converted_watches.removed_.begin(), converted_watches.removed_.end(),
std::inserter(ret.removed_, ret.removed_.end()));
}

return ret;
}

void WatchMap::onConfigUpdate(
const Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
Expand Down
Loading