Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ class GrpcMux {
virtual GrpcMuxWatchPtr addWatch(const std::string& type_url,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) PURE;
OpaqueResourceDecoder& resource_decoder,
const bool use_prefix_matching) PURE;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chaoqin-li1123 some variant of this might be useful for glob matching when subscribing to glob collections in the work you are doing. For udpa:// URLs, it's not a prefix on the path component (ignoring context parameters, which should be exact match).

Comment thread
dmitri-d marked this conversation as resolved.
Outdated

virtual void requestOnDemandUpdate(const std::string& type_url,
const std::set<std::string>& for_update) PURE;
};

using GrpcMuxPtr = std::unique_ptr<GrpcMux>;
Expand Down
7 changes: 5 additions & 2 deletions include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class UntypedConfigUpdateCallbacks {
virtual void onConfigUpdate(
const Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) PURE;
const std::string& system_version_info, const bool use_prefix_matching) PURE;

/**
* Called when either the Subscription is unable to fetch a config update or when onConfigUpdate
Expand All @@ -175,14 +175,17 @@ class Subscription {
* to fetch throughout the lifetime of the Subscription object.
* @param resources set of resource names to fetch.
*/
virtual void start(const std::set<std::string>& resource_names) PURE;
virtual void start(const std::set<std::string>& resource_names,
const bool use_prefix_matching = false) PURE;

/**
* Update the resources to fetch.
* @param resources vector of resource names to fetch. It's a (not unordered_)set so that it can
* be passed to std::set_difference, which must be given sorted collections.
*/
virtual void updateResourceInterest(const std::set<std::string>& update_to_these_names) PURE;

virtual void requestOnDemandUpdate(const std::set<std::string>& add_these_names) PURE;
};

using SubscriptionPtr = std::unique_ptr<Subscription>;
Expand Down
2 changes: 2 additions & 0 deletions include/envoy/router/route_config_update_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class RouteConfigUpdateReceiver {
* update.
*/
virtual const std::set<std::string>& resourceIdsInLastVhdsUpdate() PURE;

virtual std::set<std::string> vhdsVhosts() const PURE;
};

using RouteConfigUpdatePtr = std::unique_ptr<RouteConfigUpdateReceiver>;
Expand Down
8 changes: 5 additions & 3 deletions source/common/config/delta_subscription_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ namespace Config {

DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
UntypedConfigUpdateCallbacks& watch_map,
const LocalInfo::LocalInfo& local_info)
: type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info) {}
const LocalInfo::LocalInfo& local_info,
const bool use_prefix_matching)
: type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info),
use_prefix_matching_(use_prefix_matching) {}

void DeltaSubscriptionState::updateSubscriptionInterest(const std::set<std::string>& cur_added,
const std::set<std::string>& cur_removed) {
Expand Down Expand Up @@ -82,7 +84,7 @@ void DeltaSubscriptionState::handleGoodResponse(
}
}
watch_map_.onConfigUpdate(message.resources(), message.removed_resources(),
message.system_version_info());
message.system_version_info(), use_prefix_matching_);
for (const auto& resource : message.resources()) {
setResourceVersion(resource.name(), resource.version());
}
Expand Down
3 changes: 2 additions & 1 deletion source/common/config/delta_subscription_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace Config {
class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
public:
DeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map,
const LocalInfo::LocalInfo& local_info);
const LocalInfo::LocalInfo& local_info, const bool use_prefix_matching);

// Update which resources we're interested in subscribing to.
void updateSubscriptionInterest(const std::set<std::string>& cur_added,
Expand Down Expand Up @@ -100,6 +100,7 @@ class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
// Feel free to change to an unordered container once we figure out how to make it work.
std::set<std::string> names_added_;
std::set<std::string> names_removed_;
const bool use_prefix_matching_;
};

} // namespace Config
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/filesystem_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ FilesystemSubscriptionImpl::FilesystemSubscriptionImpl(
}

// Config::Subscription
void FilesystemSubscriptionImpl::start(const std::set<std::string>&) {
void FilesystemSubscriptionImpl::start(const std::set<std::string>&, const bool) {
started_ = true;
// Attempt to read in case there is a file there already.
refresh();
Expand Down
5 changes: 4 additions & 1 deletion source/common/config/filesystem_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ class FilesystemSubscriptionImpl : public Config::Subscription,
// Config::Subscription
// We report all discovered resources in the watched file, so the resource names arguments are
// unused, and updateResourceInterest is a no-op (other than updating a stat).
void start(const std::set<std::string>&) override;
void start(const std::set<std::string>&, const bool use_prefix_matching = false) override;
void updateResourceInterest(const std::set<std::string>&) override;
void requestOnDemandUpdate(const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

private:
void refresh();
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) {
GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) {
OpaqueResourceDecoder& resource_decoder, const bool) {
auto watch =
std::make_unique<GrpcMuxWatchImpl>(resources, callbacks, resource_decoder, type_url, *this);
ENVOY_LOG(debug, "gRPC mux addWatch for " + type_url);
Expand Down
13 changes: 11 additions & 2 deletions source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ class GrpcMuxImpl : public GrpcMux,

GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) override;
OpaqueResourceDecoder& resource_decoder,
const bool use_prefix_matching = false) override;

void requestOnDemandUpdate(const std::string&, const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

void handleDiscoveryResponse(
std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message);
Expand Down Expand Up @@ -159,10 +164,14 @@ class NullGrpcMuxImpl : public GrpcMux,
}

GrpcMuxWatchPtr addWatch(const std::string&, const std::set<std::string>&, SubscriptionCallbacks&,
OpaqueResourceDecoder&) override {
OpaqueResourceDecoder&, const bool) override {
ExceptionUtil::throwEnvoyException("ADS must be configured to support an ADS config source");
}

void requestOnDemandUpdate(const std::string&, const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

void onWriteable() override {}
void onStreamEstablished() override {}
void onEstablishmentFailure() override {}
Expand Down
10 changes: 8 additions & 2 deletions source/common/config/grpc_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ GrpcSubscriptionImpl::GrpcSubscriptionImpl(
init_fetch_timeout_(init_fetch_timeout), is_aggregated_(is_aggregated) {}

// Config::Subscription
void GrpcSubscriptionImpl::start(const std::set<std::string>& resources) {
void GrpcSubscriptionImpl::start(const std::set<std::string>& resources,
const bool use_prefix_matching) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout,
Expand All @@ -28,7 +29,7 @@ void GrpcSubscriptionImpl::start(const std::set<std::string>& resources) {
init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
}

watch_ = grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_);
watch_ = grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_, use_prefix_matching);

// The attempt stat here is maintained for the purposes of having consistency between ADS and
// gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an
Expand All @@ -48,6 +49,11 @@ void GrpcSubscriptionImpl::updateResourceInterest(
stats_.update_attempt_.inc();
}

void GrpcSubscriptionImpl::requestOnDemandUpdate(const std::set<std::string>& for_update) {
grpc_mux_->requestOnDemandUpdate(type_url_, for_update);
stats_.update_attempt_.inc();
}

// Config::SubscriptionCallbacks
void GrpcSubscriptionImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string& version_info) {
Expand Down
5 changes: 3 additions & 2 deletions source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ class GrpcSubscriptionImpl : public Subscription,
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated);

// Config::Subscription
void start(const std::set<std::string>& resource_names) override;
void start(const std::set<std::string>& resource_names,
const bool use_prefix_matching = false) override;
void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;

void requestOnDemandUpdate(const std::set<std::string>& add_these_names) override;
// Config::SubscriptionCallbacks (all pass through to callbacks_!)
void onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string& version_info) override;
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/http_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ HttpSubscriptionImpl::HttpSubscriptionImpl(
}

// Config::Subscription
void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names) {
void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names, const bool) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
handleFailure(Config::ConfigUpdateFailureReason::FetchTimedout, nullptr);
Expand Down
6 changes: 5 additions & 1 deletion source/common/config/http_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
ProtobufMessage::ValidationVisitor& validation_visitor);

// Config::Subscription
void start(const std::set<std::string>& resource_names) override;
void start(const std::set<std::string>& resource_names,
const bool use_prefix_matching = false) override;
void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}

// Http::RestApiFetcher
void createRequest(Http::RequestMessage& request) override;
Expand Down
48 changes: 30 additions & 18 deletions source/common/config/new_grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,6 @@ void NewGrpcMuxImpl::onDiscoveryResponse(
return;
}

// When an on-demand request is made a Watch is created using an alias, as the resource name isn't
// known at that point. When an update containing aliases comes back, we update Watches with
// resource names.
for (const auto& r : message->resources()) {
if (r.aliases_size() > 0) {
AddedRemoved converted = sub->second->watch_map_.convertAliasWatchesToNameWatches(r);
sub->second->sub_state_.updateSubscriptionInterest(converted.added_, converted.removed_);
}
}

kickOffAck(sub->second->sub_state_.handleResponse(*message));
Memory::Utils::tryShrinkHeap();
}
Expand Down Expand Up @@ -109,34 +99,55 @@ void NewGrpcMuxImpl::kickOffAck(UpdateAck ack) {
// TODO(fredlas) to be removed from the GrpcMux interface very soon.
void NewGrpcMuxImpl::start() { grpc_stream_.establishNewStream(); }

// TODO (dmitri-d) verify that a prefix-matching watch isn't set up empty
GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) {
OpaqueResourceDecoder& resource_decoder,
const bool use_prefix_matching) {
auto entry = subscriptions_.find(type_url);
if (entry == subscriptions_.end()) {
// We don't yet have a subscription for type_url! Make one!
addSubscription(type_url);
return addWatch(type_url, resources, callbacks, resource_decoder);
addSubscription(type_url, use_prefix_matching);
return addWatch(type_url, resources, callbacks, resource_decoder, use_prefix_matching);
}

Watch* watch = entry->second->watch_map_.addWatch(callbacks, resource_decoder);
// updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
updateWatch(type_url, watch, resources);
updateWatch(type_url, watch, resources, use_prefix_matching);
return std::make_unique<WatchImpl>(type_url, watch, *this);
}

// Updates the list of resource names watched by the given watch. If an added name is new across
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources) {
const std::set<std::string>& resources,
bool creating_prefix_watch) {
ASSERT(watch != nullptr);
auto sub = subscriptions_.find(type_url);
RELEASE_ASSERT(sub != subscriptions_.end(),
fmt::format("Watch of {} has no subscription to update.", type_url));
auto added_removed = sub->second->watch_map_.updateWatchInterest(watch, resources);
sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_, added_removed.removed_);
if (creating_prefix_watch) {
// THis is to prevent sending out of requests that contain prefixes instead of resource names
sub->second->sub_state_.updateSubscriptionInterest({}, {});
} else {
sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_,
added_removed.removed_);
}
// Tell the server about our change in interest, if any.
if (sub->second->sub_state_.subscriptionUpdatePending()) {
trySendDiscoveryRequests();
}
}

void NewGrpcMuxImpl::requestOnDemandUpdate(const std::string& type_url,
const std::set<std::string>& for_update) {
auto sub = subscriptions_.find(type_url);
RELEASE_ASSERT(sub != subscriptions_.end(),
fmt::format("Watch of {} has no subscription to update.", type_url));
sub->second->sub_state_.updateSubscriptionInterest(for_update, {});
// Tell the server about our change in interest, if any.
if (sub->second->sub_state_.subscriptionUpdatePending()) {
trySendDiscoveryRequests();
Expand All @@ -151,8 +162,9 @@ void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) {
entry->second->watch_map_.removeWatch(watch);
}

void NewGrpcMuxImpl::addSubscription(const std::string& type_url) {
subscriptions_.emplace(type_url, std::make_unique<SubscriptionStuff>(type_url, local_info_));
void NewGrpcMuxImpl::addSubscription(const std::string& type_url, const bool use_prefix_matching) {
subscriptions_.emplace(
type_url, std::make_unique<SubscriptionStuff>(type_url, local_info_, use_prefix_matching));
subscription_ordering_.emplace_back(type_url);
}

Expand Down
16 changes: 11 additions & 5 deletions source/common/config/new_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ class NewGrpcMuxImpl

GrpcMuxWatchPtr addWatch(const std::string& type_url, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) override;
OpaqueResourceDecoder& resource_decoder,
const bool use_prefix_matching = false) override;

void requestOnDemandUpdate(const std::string& type_url,
const std::set<std::string>& for_update) override;

ScopedResume pause(const std::string& type_url) override;
ScopedResume pause(const std::vector<std::string> type_urls) override;
Expand All @@ -60,8 +64,9 @@ class NewGrpcMuxImpl
void start() override;

struct SubscriptionStuff {
SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info)
: sub_state_(type_url, watch_map_, local_info) {}
SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info,
const bool use_prefix_matching)
: sub_state_(type_url, watch_map_, local_info, use_prefix_matching) {}

WatchMap watch_map_;
DeltaSubscriptionState sub_state_;
Expand Down Expand Up @@ -108,9 +113,10 @@ class NewGrpcMuxImpl
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
void updateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources);
const std::set<std::string>& resources,
const bool creating_prefix_watch = false);

void addSubscription(const std::string& type_url);
void addSubscription(const std::string& type_url, const bool use_prefix_matching);

void trySendDiscoveryRequests();

Expand Down
Loading