Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,20 @@ class GrpcMux {
* @param callbacks the callbacks to be notified of configuration updates. These must be valid
* until GrpcMuxWatch is destroyed.
* @param resource_decoder how incoming opaque resource objects are to be decoded.
* @param use_namespace_matching if namespace watch should be created. This is used for creating
* watches on collections of resources; individual members of a collection are identified by the
* namespace in resource name.
* @return GrpcMuxWatchPtr a handle to cancel the subscription with. E.g. when a cluster goes
* away, its EDS updates should be cancelled by destroying the GrpcMuxWatchPtr.
*/
virtual GrpcMuxWatchPtr addWatch(const std::string& type_url,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) PURE;
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching) PURE;

virtual void requestOnDemandUpdate(const std::string& type_url,
const std::set<std::string>& for_update) PURE;
};

using GrpcMuxPtr = std::unique_ptr<GrpcMux>;
Expand Down
18 changes: 15 additions & 3 deletions include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,15 @@ class UntypedConfigUpdateCallbacks {
* @param removed_resources names of resources that this fetch instructed to be removed.
* @param system_version_info aggregate response data "version", for debugging.
* @throw EnvoyException with reason if the config changes are rejected. Otherwise the changes
* are accepted. Accepted changes have their version_info reflected in subsequent requests.
* @param use_namespace_matching if the resources should me matched on their namespaces, rather
* than unique names. This is used when a collection of resources (e.g. virtual hosts in VHDS) is
* being updated. are accepted. Accepted changes have their version_info reflected in subsequent
Comment thread
dmitri-d marked this conversation as resolved.
Outdated
* requests.
*/
virtual void onConfigUpdate(
const Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) PURE;
const std::string& system_version_info, const bool use_namespace_matching) PURE;
Comment thread
dmitri-d marked this conversation as resolved.
Outdated

/**
* Called when either the Subscription is unable to fetch a config update or when onConfigUpdate
Expand All @@ -174,15 +177,24 @@ class Subscription {
* Start a configuration subscription asynchronously. This should be called once and will continue
* to fetch throughout the lifetime of the Subscription object.
* @param resources set of resource names to fetch.
* @param use_namespace_matching if the subscription is for a collection of resources. In such a
* case a namespace watch will be created.
*/
virtual void start(const std::set<std::string>& resource_names) PURE;
virtual void start(const std::set<std::string>& resource_names,
const bool use_namespace_matching = false) PURE;

/**
* Update the resources to fetch.
* @param resources vector of resource names to fetch. It's a (not unordered_)set so that it can
* be passed to std::set_difference, which must be given sorted collections.
*/
virtual void updateResourceInterest(const std::set<std::string>& update_to_these_names) PURE;

/**
* Creates a discovery request for resources.
* @param add_these_names resource ids for inclusion in the discovery request.
*/
virtual void requestOnDemandUpdate(const std::set<std::string>& add_these_names) PURE;
};

using SubscriptionPtr = std::unique_ptr<Subscription>;
Expand Down
8 changes: 5 additions & 3 deletions source/common/config/delta_subscription_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ namespace Config {

DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
UntypedConfigUpdateCallbacks& watch_map,
const LocalInfo::LocalInfo& local_info)
: type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info) {}
const LocalInfo::LocalInfo& local_info,
const bool use_namespace_matching)
: type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info),
use_namespace_matching_(use_namespace_matching) {}

void DeltaSubscriptionState::updateSubscriptionInterest(const std::set<std::string>& cur_added,
const std::set<std::string>& cur_removed) {
Expand Down Expand Up @@ -82,7 +84,7 @@ void DeltaSubscriptionState::handleGoodResponse(
}
}
watch_map_.onConfigUpdate(message.resources(), message.removed_resources(),
message.system_version_info());
message.system_version_info(), use_namespace_matching_);
for (const auto& resource : message.resources()) {
setResourceVersion(resource.name(), resource.version());
}
Expand Down
3 changes: 2 additions & 1 deletion source/common/config/delta_subscription_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace Config {
class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
public:
DeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map,
const LocalInfo::LocalInfo& local_info);
const LocalInfo::LocalInfo& local_info, const bool use_namespace_matching);

// Update which resources we're interested in subscribing to.
void updateSubscriptionInterest(const std::set<std::string>& cur_added,
Expand Down Expand Up @@ -100,6 +100,7 @@ class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
// Feel free to change to an unordered container once we figure out how to make it work.
std::set<std::string> names_added_;
std::set<std::string> names_removed_;
const bool use_namespace_matching_;
};

} // namespace Config
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/filesystem_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ FilesystemSubscriptionImpl::FilesystemSubscriptionImpl(
}

// Config::Subscription
void FilesystemSubscriptionImpl::start(const std::set<std::string>&) {
void FilesystemSubscriptionImpl::start(const std::set<std::string>&, const bool) {
started_ = true;
// Attempt to read in case there is a file there already.
refresh();
Expand Down
5 changes: 4 additions & 1 deletion source/common/config/filesystem_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ class FilesystemSubscriptionImpl : public Config::Subscription,
// Config::Subscription
// We report all discovered resources in the watched file, so the resource names arguments are
// unused, and updateResourceInterest is a no-op (other than updating a stat).
void start(const std::set<std::string>&) override;
void start(const std::set<std::string>&, const bool use_namespace_matching = false) override;
void updateResourceInterest(const std::set<std::string>&) override;
void requestOnDemandUpdate(const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

private:
void refresh();
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) {
GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) {
OpaqueResourceDecoder& resource_decoder, const bool) {
auto watch =
std::make_unique<GrpcMuxWatchImpl>(resources, callbacks, resource_decoder, type_url, *this);
ENVOY_LOG(debug, "gRPC mux addWatch for " + type_url);
Expand Down
13 changes: 11 additions & 2 deletions source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ class GrpcMuxImpl : public GrpcMux,

GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) override;
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching = false) override;

void requestOnDemandUpdate(const std::string&, const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

void handleDiscoveryResponse(
std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message);
Expand Down Expand Up @@ -159,10 +164,14 @@ class NullGrpcMuxImpl : public GrpcMux,
}

GrpcMuxWatchPtr addWatch(const std::string&, const std::set<std::string>&, SubscriptionCallbacks&,
OpaqueResourceDecoder&) override {
OpaqueResourceDecoder&, const bool) override {
ExceptionUtil::throwEnvoyException("ADS must be configured to support an ADS config source");
}

void requestOnDemandUpdate(const std::string&, const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

void onWriteable() override {}
void onStreamEstablished() override {}
void onEstablishmentFailure() override {}
Expand Down
11 changes: 9 additions & 2 deletions source/common/config/grpc_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ GrpcSubscriptionImpl::GrpcSubscriptionImpl(
init_fetch_timeout_(init_fetch_timeout), is_aggregated_(is_aggregated) {}

// Config::Subscription
void GrpcSubscriptionImpl::start(const std::set<std::string>& resources) {
void GrpcSubscriptionImpl::start(const std::set<std::string>& resources,
const bool use_namespace_matching) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout,
Expand All @@ -28,7 +29,8 @@ void GrpcSubscriptionImpl::start(const std::set<std::string>& resources) {
init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
}

watch_ = grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_);
watch_ =
grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_, use_namespace_matching);

// The attempt stat here is maintained for the purposes of having consistency between ADS and
// gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an
Expand All @@ -48,6 +50,11 @@ void GrpcSubscriptionImpl::updateResourceInterest(
stats_.update_attempt_.inc();
}

void GrpcSubscriptionImpl::requestOnDemandUpdate(const std::set<std::string>& for_update) {
grpc_mux_->requestOnDemandUpdate(type_url_, for_update);
stats_.update_attempt_.inc();
}

// Config::SubscriptionCallbacks
void GrpcSubscriptionImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string& version_info) {
Expand Down
5 changes: 3 additions & 2 deletions source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ class GrpcSubscriptionImpl : public Subscription,
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated);

// Config::Subscription
void start(const std::set<std::string>& resource_names) override;
void start(const std::set<std::string>& resource_names,
const bool use_namespace_matching = false) override;
void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;

void requestOnDemandUpdate(const std::set<std::string>& add_these_names) override;
// Config::SubscriptionCallbacks (all pass through to callbacks_!)
void onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string& version_info) override;
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/http_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ HttpSubscriptionImpl::HttpSubscriptionImpl(
}

// Config::Subscription
void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names) {
void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names, const bool) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
handleFailure(Config::ConfigUpdateFailureReason::FetchTimedout, nullptr);
Expand Down
6 changes: 5 additions & 1 deletion source/common/config/http_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
ProtobufMessage::ValidationVisitor& validation_visitor);

// Config::Subscription
void start(const std::set<std::string>& resource_names) override;
void start(const std::set<std::string>& resource_names,
const bool use_namespace_matching = false) override;
void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

// Http::RestApiFetcher
void createRequest(Http::RequestMessage& request) override;
Expand Down
48 changes: 30 additions & 18 deletions source/common/config/new_grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,6 @@ void NewGrpcMuxImpl::onDiscoveryResponse(
return;
}

// When an on-demand request is made a Watch is created using an alias, as the resource name isn't
// known at that point. When an update containing aliases comes back, we update Watches with
// resource names.
for (const auto& r : message->resources()) {
if (r.aliases_size() > 0) {
AddedRemoved converted = sub->second->watch_map_.convertAliasWatchesToNameWatches(r);
sub->second->sub_state_.updateSubscriptionInterest(converted.added_, converted.removed_);
}
}

kickOffAck(sub->second->sub_state_.handleResponse(*message));
Memory::Utils::tryShrinkHeap();
}
Expand Down Expand Up @@ -112,31 +102,51 @@ void NewGrpcMuxImpl::start() { grpc_stream_.establishNewStream(); }
GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) {
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching) {
auto entry = subscriptions_.find(type_url);
if (entry == subscriptions_.end()) {
// We don't yet have a subscription for type_url! Make one!
addSubscription(type_url);
return addWatch(type_url, resources, callbacks, resource_decoder);
addSubscription(type_url, use_namespace_matching);
return addWatch(type_url, resources, callbacks, resource_decoder, use_namespace_matching);
}

Watch* watch = entry->second->watch_map_.addWatch(callbacks, resource_decoder);
// updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
updateWatch(type_url, watch, resources);
updateWatch(type_url, watch, resources, use_namespace_matching);
return std::make_unique<WatchImpl>(type_url, watch, *this);
}

// Updates the list of resource names watched by the given watch. If an added name is new across
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources) {
const std::set<std::string>& resources,
bool creating_namespace_watch) {
ASSERT(watch != nullptr);
auto sub = subscriptions_.find(type_url);
RELEASE_ASSERT(sub != subscriptions_.end(),
fmt::format("Watch of {} has no subscription to update.", type_url));
auto added_removed = sub->second->watch_map_.updateWatchInterest(watch, resources);
sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_, added_removed.removed_);
if (creating_namespace_watch) {
// This is to prevent sending out of requests that contain prefixes instead of resource names
sub->second->sub_state_.updateSubscriptionInterest({}, {});
} else {
sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_,
added_removed.removed_);
}
// Tell the server about our change in interest, if any.
if (sub->second->sub_state_.subscriptionUpdatePending()) {
trySendDiscoveryRequests();
}
}

void NewGrpcMuxImpl::requestOnDemandUpdate(const std::string& type_url,
const std::set<std::string>& for_update) {
auto sub = subscriptions_.find(type_url);
RELEASE_ASSERT(sub != subscriptions_.end(),
fmt::format("Watch of {} has no subscription to update.", type_url));
sub->second->sub_state_.updateSubscriptionInterest(for_update, {});
// Tell the server about our change in interest, if any.
if (sub->second->sub_state_.subscriptionUpdatePending()) {
trySendDiscoveryRequests();
Expand All @@ -151,8 +161,10 @@ void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) {
entry->second->watch_map_.removeWatch(watch);
}

void NewGrpcMuxImpl::addSubscription(const std::string& type_url) {
subscriptions_.emplace(type_url, std::make_unique<SubscriptionStuff>(type_url, local_info_));
void NewGrpcMuxImpl::addSubscription(const std::string& type_url,
const bool use_namespace_matching) {
subscriptions_.emplace(
type_url, std::make_unique<SubscriptionStuff>(type_url, local_info_, use_namespace_matching));
subscription_ordering_.emplace_back(type_url);
}

Expand Down
16 changes: 11 additions & 5 deletions source/common/config/new_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ class NewGrpcMuxImpl

GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) override;
OpaqueResourceDecoder& resource_decoder,
const bool use_namespace_matching = false) override;

void requestOnDemandUpdate(const std::string& type_url,
const std::set<std::string>& for_update) override;

ScopedResume pause(const std::string& type_url) override;
ScopedResume pause(const std::vector<std::string> type_urls) override;
Expand All @@ -60,8 +64,9 @@ class NewGrpcMuxImpl
void start() override;

struct SubscriptionStuff {
SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info)
: sub_state_(type_url, watch_map_, local_info) {}
SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info,
const bool use_namespace_matching)
: sub_state_(type_url, watch_map_, local_info, use_namespace_matching) {}

WatchMap watch_map_;
DeltaSubscriptionState sub_state_;
Expand Down Expand Up @@ -108,9 +113,10 @@ class NewGrpcMuxImpl
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
void updateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources);
const std::set<std::string>& resources,
const bool creating_namespace_watch = false);

void addSubscription(const std::string& type_url);
void addSubscription(const std::string& type_url, const bool use_namespace_matching);

void trySendDiscoveryRequests();

Expand Down
Loading