Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
2ff7a0d
initial WatchMap snapshot
fredlas May 29, 2019
502e3b7
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas May 29, 2019
3816de1
cleanup and comments
fredlas May 29, 2019
2d7a247
compiles and a basic test passes
fredlas May 29, 2019
d823320
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas May 29, 2019
d30e229
add two more tests, and fix a bug, thanks unit testing
fredlas May 29, 2019
2ce83f4
test delta too
fredlas May 30, 2019
df14958
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas May 30, 2019
33aff0f
tiny rearrangement
fredlas May 30, 2019
67dcc67
spellcheck
fredlas May 30, 2019
2ac1518
unmock resourceName to enable switching off of NiceMock
fredlas May 30, 2019
fdb0a18
add a test, fix another bug, thanks again unit testing
fredlas May 30, 2019
21f428a
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas May 30, 2019
38f13cc
support watches that want to watch everything by providing no names
fredlas Jun 6, 2019
b8b7b08
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas Jun 6, 2019
eb0f380
fix compile after merge
fredlas Jun 6, 2019
00f7f55
snapshot
fredlas Jun 7, 2019
943949a
change std pair to AddedRemoved
fredlas Jun 7, 2019
c61a4aa
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas Jun 7, 2019
5582614
remove virtual
fredlas Jun 14, 2019
3ac614e
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas Jun 14, 2019
a8d53c2
add PNG diagram of intended usage of WatchMap
fredlas Jun 17, 2019
e571c53
add interface, move WatchMap to WatchMapImpl
fredlas Jun 17, 2019
db4fd7a
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas Jun 17, 2019
3ce07f4
privatize SubscriptionCallbacks
fredlas Jun 17, 2019
34dd5ce
move AddedRemoved, move png
fredlas Jul 1, 2019
885bc2c
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas Jul 1, 2019
ea4a01f
private friend Watch
fredlas Jul 9, 2019
37ffff6
clang tidy
fredlas Jul 9, 2019
4900eb9
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas Jul 9, 2019
c238f1f
change Watch to class and interface
fredlas Jul 15, 2019
46e3fa9
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas Jul 15, 2019
8010950
add another word to the dictionary
fredlas Jul 15, 2019
8df252b
back to earlier design
fredlas Jul 31, 2019
76bd613
merge conflict, realphabetize BUILD
fredlas Jul 31, 2019
4dba42d
clang tidy
fredlas Aug 7, 2019
692e982
Merge remote-tracking branch 'upstream/master' into WAT_watch
fredlas Aug 7, 2019
8a4ae35
merge conflict
fredlas Aug 7, 2019
064fe75
merge conflict
fredlas Aug 7, 2019
3a1d2ba
snapshot
fredlas Aug 7, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ struct ControlPlaneStats {
ALL_CONTROL_PLANE_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
};

// TODO(fredlas) redundant to SubscriptionCallbacks; remove this one.
class GrpcMuxCallbacks {
public:
virtual ~GrpcMuxCallbacks() {}
Expand Down
12 changes: 12 additions & 0 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,15 @@ envoy_cc_library(
"//source/common/protobuf",
],
)

envoy_cc_library(
name = "watch_map_lib",
srcs = ["watch_map.cc"],
hdrs = ["watch_map.h"],
deps = [
"//include/envoy/config:subscription_interface",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/protobuf",
],
)
171 changes: 171 additions & 0 deletions source/common/config/watch_map.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
#include "common/config/watch_map.h"

namespace Envoy {
namespace Config {

WatchPtr WatchMap::addWatch(SubscriptionCallbacks& callbacks) {
auto watch = std::make_unique<Watch>(*this, callbacks);
wildcard_watches_.insert(watch.get());
watches_.insert(watch.get());
return watch;
}

void WatchMap::removeWatch(Watch* watch) {
wildcard_watches_.erase(watch); // may or may not be in there, but we want it gone.
watches_.erase(watch);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we assert it's in at least one of these?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say no; removeWatch is purely for cleanup. Weird as it may be for that to happen, it doesn't mean anything is wrong with the WatchMap.

}

AddedRemoved WatchMap::updateWatchInterest(Watch* watch,
const std::set<std::string>& update_to_these_names) {
if (update_to_these_names.empty()) {
wildcard_watches_.insert(watch);
} else {
wildcard_watches_.erase(watch);
}

std::vector<std::string> newly_added_to_watch;
std::set_difference(update_to_these_names.begin(), update_to_these_names.end(),
watch->resource_names_.begin(), watch->resource_names_.end(),
std::inserter(newly_added_to_watch, newly_added_to_watch.begin()));

std::vector<std::string> newly_removed_from_watch;
std::set_difference(watch->resource_names_.begin(), watch->resource_names_.end(),
update_to_these_names.begin(), update_to_these_names.end(),
std::inserter(newly_removed_from_watch, newly_removed_from_watch.begin()));

watch->resource_names_ = update_to_these_names;

return AddedRemoved(findAdditions(newly_added_to_watch, watch),
findRemovals(newly_removed_from_watch, watch));
}

absl::flat_hash_set<Watch*> WatchMap::watchesInterestedIn(const std::string& resource_name) {
Comment thread
fredlas marked this conversation as resolved.
// Note that std::set_union needs sorted sets. Better to do it ourselves with insert().

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just use an ordered set? std::less gives a total ordering of pointers in C++14.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, but then updates would be more expensive; watch_interest_ would need to hold ordered sets. Removed that comment; it actually took me a minute to figure out what it was even saying!

absl::flat_hash_set<Watch*> ret = wildcard_watches_;
auto watches_interested = watch_interest_.find(resource_name);
if (watches_interested != watch_interest_.end()) {
for (const auto& watch : watches_interested->second) {
ret.insert(watch);
}
}
return ret;
}

void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) {
if (watches_.empty()) {
ENVOY_LOG(warn, "WatchMap::onConfigUpdate: there are no watches!");
return;
}
SubscriptionCallbacks& name_getter = (*watches_.begin())->callbacks_;

// Build a map from watches, to the set of updated resources that each watch cares about. Each
// entry in the map is then a nice little bundle that can be fed directly into the individual
// onConfigUpdate()s.
absl::flat_hash_map<Watch*, Protobuf::RepeatedPtrField<ProtobufWkt::Any>> per_watch_updates;
for (const auto& r : resources) {
const absl::flat_hash_set<Watch*>& interested_in_r =
watchesInterestedIn(name_getter.resourceName(r));
for (const auto& interested_watch : interested_in_r) {
per_watch_updates[interested_watch].Add()->CopyFrom(r);
}
}

// We just bundled up the updates into nice per-watch packages. Now, deliver them.
for (auto& watch : watches_) {
auto this_watch_updates = per_watch_updates.find(watch);
if (this_watch_updates == per_watch_updates.end()) {
// This update included no resources this watch cares about - so we do an empty
// onConfigUpdate(), to notify the watch that its resources - if they existed before this -
// were dropped.
watch->callbacks_.onConfigUpdate({}, version_info);
} else {
watch->callbacks_.onConfigUpdate(this_watch_updates->second, version_info);
}
}
}

void WatchMap::onConfigUpdate(
const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) {
// Build a pair of maps: from watches, to the set of resources {added,removed} that each watch
// cares about. Each entry in the map-pair is then a nice little bundle that can be fed directly
// into the individual onConfigUpdate()s.
absl::flat_hash_map<Watch*, Protobuf::RepeatedPtrField<envoy::api::v2::Resource>> per_watch_added;
for (const auto& r : added_resources) {
const absl::flat_hash_set<Watch*>& interested_in_r = watchesInterestedIn(r.name());
for (const auto& interested_watch : interested_in_r) {
per_watch_added[interested_watch].Add()->CopyFrom(r);
}
}
absl::flat_hash_map<Watch*, Protobuf::RepeatedPtrField<std::string>> per_watch_removed;
for (const auto& r : removed_resources) {
const absl::flat_hash_set<Watch*>& interested_in_r = watchesInterestedIn(r);
for (const auto& interested_watch : interested_in_r) {
*per_watch_removed[interested_watch].Add() = r;
}
}

// We just bundled up the updates into nice per-watch packages. Now, deliver them.
for (const auto& added : per_watch_added) {
const Watch* cur_watch = added.first;
auto removed = per_watch_removed.find(cur_watch);
if (removed == per_watch_removed.end()) {
// additions only, no removals
cur_watch->callbacks_.onConfigUpdate(added.second, {}, system_version_info);
} else {
// both additions and removals
cur_watch->callbacks_.onConfigUpdate(added.second, removed->second, system_version_info);
// Drop the removals now, so the final removals-only pass won't use them.
per_watch_removed.erase(removed);
}
}
// Any removals-only updates will not have been picked up in the per_watch_added loop.
for (auto& removed : per_watch_removed) {
removed.first->callbacks_.onConfigUpdate({}, removed.second, system_version_info);
Comment thread
fredlas marked this conversation as resolved.
}
}

void WatchMap::onConfigUpdateFailed(const EnvoyException* e) {
for (auto& watch : watches_) {
watch->callbacks_.onConfigUpdateFailed(e);
}
}

std::set<std::string> WatchMap::findAdditions(const std::vector<std::string>& newly_added_to_watch,
Watch* watch) {
std::set<std::string> newly_added_to_subscription;
for (const auto& name : newly_added_to_watch) {
auto entry = watch_interest_.find(name);
if (entry == watch_interest_.end()) {
newly_added_to_subscription.insert(name);
watch_interest_[name] = {watch};
Comment thread
fredlas marked this conversation as resolved.
} else {
entry->second.insert(watch);
}
}
return newly_added_to_subscription;
}

std::set<std::string>
WatchMap::findRemovals(const std::vector<std::string>& newly_removed_from_watch, Watch* watch) {
std::set<std::string> newly_removed_from_subscription;
for (const auto& name : newly_removed_from_watch) {
auto entry = watch_interest_.find(name);
if (entry == watch_interest_.end()) {
ENVOY_LOG(warn, "WatchMap: tried to remove a watch from untracked resource {}", name);
continue;
}

entry->second.erase(watch);
if (entry->second.empty()) {
watch_interest_.erase(entry);
newly_removed_from_subscription.insert(name);
}
}
return newly_removed_from_subscription;
}

} // namespace Config
} // namespace Envoy
121 changes: 121 additions & 0 deletions source/common/config/watch_map.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#pragma once

#include <set>
#include <string>
#include <utility>

#include "envoy/config/subscription.h"

#include "common/common/assert.h"
#include "common/common/logger.h"

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"

namespace Envoy {
namespace Config {

struct Watch;
using WatchPtr = std::unique_ptr<Watch>;

struct AddedRemoved {
AddedRemoved(std::set<std::string> added, std::set<std::string> removed)
: added_(added), removed_(removed) {}
std::set<std::string> added_;
std::set<std::string> removed_;
};

// Manages "watches" of xDS resources. Several xDS callers might ask for a subscription to the same
// resource name "X". The xDS machinery must return to each their very own subscription to X.
// The xDS machinery's "watch" concept accomplishes that, while avoiding parallel redundant xDS
// requests for X. Each of those subscriptions is viewed as a "watch" on X, while behind the scenes
// there is just a single real subscription to that resource name.
//
// This class maintains the watches<-->subscription mapping: it
// 1) delivers updates to all interested watches, and
// 2) tracks which resource names should be {added to,removed from} the subscription when the
// {first,last} watch on a resource name is {added,removed}.
//
// #1 is accomplished by WatchMap's implementation of the SubscriptionCallbacks interface.
// This interface allows the xDS client to just throw each xDS update message it receives directly
// into WatchMap::onConfigUpdate, rather than having to track the various watches' callbacks.
//
// The information for #2 is returned by updateWatchInterest(); the caller should use it to
// update the subscription accordingly.
//
// A WatchMap is assumed to be dedicated to a single type_url type of resource (EDS, CDS, etc).
class WatchMap : public SubscriptionCallbacks, public Logger::Loggable<Logger::Id::config> {
Comment thread
fredlas marked this conversation as resolved.
public:
WatchMap() {}
Comment thread
fredlas marked this conversation as resolved.
Outdated

// Adds 'callbacks' to the WatchMap, with no resource names being watched.
// (Use updateWatchInterest() to add some names).
// Returns the newly added watch, to be used for updateWatchInterest. Destroy to remove from map.
WatchPtr addWatch(SubscriptionCallbacks& callbacks);

// Updates the set of resource names that the given watch should watch.
// Returns any resource name additions/removals that are unique across all watches. That is:
// 1) if 'resources' contains X and no other watch cares about X, X will be in added_.
// 2) if 'resources' does not contain Y, and this watch was the only one that cared about Y,
Comment thread
fredlas marked this conversation as resolved.
// Y will be in removed_.
AddedRemoved updateWatchInterest(Watch* watch,
Comment thread
fredlas marked this conversation as resolved.
const std::set<std::string>& update_to_these_names);
Comment thread
fredlas marked this conversation as resolved.

// SubscriptionCallbacks
virtual void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
Comment thread
fredlas marked this conversation as resolved.
Outdated
const std::string& version_info) override;
virtual void
Comment thread
fredlas marked this conversation as resolved.
Outdated
onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) override;

virtual void onConfigUpdateFailed(const EnvoyException* e) override;

virtual std::string resourceName(const ProtobufWkt::Any&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
Comment thread
fredlas marked this conversation as resolved.
Outdated
}

private:
friend struct Watch;
// Expects that the watch to be removed has already had all of its resource names removed via
// updateWatchInterest().
void removeWatch(Watch* watch);

// Given a list of names that are new to an individual watch, returns those names that are in fact
// new to the entire subscription.
std::set<std::string> findAdditions(const std::vector<std::string>& newly_added_to_watch,
Watch* watch);

// Given a list of names that an individual watch no longer cares about, returns those names that
// in fact the entire subscription no longer cares about.
std::set<std::string> findRemovals(const std::vector<std::string>& newly_removed_from_watch,
Watch* watch);

// Returns the union of watch_interest_[resource_name] and wildcard_watches_.
absl::flat_hash_set<Watch*> watchesInterestedIn(const std::string& resource_name);

absl::flat_hash_set<Watch*> watches_;

// Watches whose interest set is currently empty, which is interpreted as "everything".
absl::flat_hash_set<Watch*> wildcard_watches_;

// Maps a resource name to the set of watches interested in that resource. Has two purposes:
// 1) Acts as a reference count; no watches care anymore ==> the resource can be removed.
// 2) Enables efficient lookup of all interested watches when a resource has been updated.
absl::flat_hash_map<std::string, absl::flat_hash_set<Watch*>> watch_interest_;

WatchMap(const WatchMap&) = delete;
WatchMap& operator=(const WatchMap&) = delete;
};

struct Watch {
Watch(WatchMap& owning_map, SubscriptionCallbacks& callbacks)
: owning_map_(owning_map), callbacks_(callbacks) {}
~Watch() { owning_map_.removeWatch(this); }
Comment thread
fredlas marked this conversation as resolved.
Outdated
WatchMap& owning_map_;
SubscriptionCallbacks& callbacks_;
std::set<std::string> resource_names_; // must be sorted set, for set_difference.
};

} // namespace Config
} // namespace Envoy
11 changes: 11 additions & 0 deletions test/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,17 @@ envoy_cc_test(
],
)

envoy_cc_test(
Comment thread
fredlas marked this conversation as resolved.
name = "watch_map_test",
srcs = ["watch_map_test.cc"],
deps = [
"//source/common/config:watch_map_lib",
"//test/mocks/config:config_mocks",
"//test/test_common:utility_lib",
"@envoy_api//envoy/api/v2:eds_cc",
],
)

envoy_cc_test(
name = "filter_json_test",
srcs = ["filter_json_test.cc"],
Expand Down
Loading