Skip to content
30 changes: 24 additions & 6 deletions source/common/config/watch_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,17 @@ absl::flat_hash_set<Watch*> WatchMap::watchesInterestedIn(const std::string& res
void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) {
if (watches_.empty()) {
ENVOY_LOG(warn, "WatchMap::onConfigUpdate: there are no watches!");
return;
if (resources.empty()) {
Comment thread
fredlas marked this conversation as resolved.
Outdated
// We have no watches, and the update contained no resources. This can happen when Envoy
// unregisters from a resource that's removed from the server as well. For example,
// a deleted cluster triggers un-watching the ClusterLoadAssignment watch, and at the
// same time the xDS server sends an empty list of ClusterLoadAssignment resources.
return;
} else {
// We have no watches, but the update contained resources. This should not happen.
ENVOY_LOG(warn, "Rejecting non-empty update for unwatched type URL");
Comment thread
fredlas marked this conversation as resolved.
Outdated
throw EnvoyException("Rejecting non-empty update for unwatched type URL");
}
}
SubscriptionCallbacks& name_getter = (*watches_.begin())->callbacks_;

Expand All @@ -71,16 +80,25 @@ void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>
}
}

bool map_is_single_wildcard = (watches_.size() == 1 && wildcard_watches_.size() == 1);
Comment thread
fredlas marked this conversation as resolved.
Outdated
// We just bundled up the updates into nice per-watch packages. Now, deliver them.
for (auto& watch : watches_) {
const auto this_watch_updates = per_watch_updates.find(watch);
if (this_watch_updates == per_watch_updates.end()) {
// This update included no resources this watch cares about - so we do an empty
// onConfigUpdate(), to notify the watch that its resources - if they existed before this -
// were dropped.
watch->callbacks_.onConfigUpdate({}, version_info);
// This update included no resources this watch cares about.
// 1) If there is only a single, wildcard watch (i.e. Cluster or Listener), always call
// its onConfigUpdate even if just a no-op, to properly maintain state-of-the-world
// semantics and the update_empty stat.
// 2) If this watch previously had some resources, it means this update is removing all
// of this watch's resources, so the watch must be informed with an onConfigUpdate.
// 3) Otherwise, we can skip onConfigUpdate for this watch.
if (map_is_single_wildcard || !watch->state_of_the_world_empty_) {
watch->callbacks_.onConfigUpdate({}, version_info);
}
watch->state_of_the_world_empty_ = true;
} else {
watch->callbacks_.onConfigUpdate(this_watch_updates->second, version_info);
watch->state_of_the_world_empty_ = false;
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions source/common/config/watch_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ struct Watch {
Watch(SubscriptionCallbacks& callbacks) : callbacks_(callbacks) {}
SubscriptionCallbacks& callbacks_;
std::set<std::string> resource_names_; // must be sorted set, for set_difference.
// Needed only for state-of-the-world.
// Whether the most recent update contained any resources this watch cares about.
// If true, a new update that also contains no resources can skip this watch.
bool state_of_the_world_empty_{true};
};

// NOTE: Users are responsible for eventually calling removeWatch() on the Watch* returned
Expand Down
71 changes: 52 additions & 19 deletions test/common/config/watch_map_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ void expectDeltaAndSotwUpdate(
}));
}

// Sometimes we want to verify that a delta onConfigUpdate simply doesn't happen. However, for SotW,
// every update triggers all onConfigUpdate()s, so we should still expect empty calls for that.
void expectNoDeltaUpdate(NamedMockSubscriptionCallbacks& callbacks, const std::string& version) {
void expectNoUpdate(NamedMockSubscriptionCallbacks& callbacks, const std::string& version) {
EXPECT_CALL(callbacks, onConfigUpdate(_, version)).Times(0);
EXPECT_CALL(callbacks, onConfigUpdate(_, _, version)).Times(0);
}

void expectEmptySotwNoDeltaUpdate(NamedMockSubscriptionCallbacks& callbacks,
const std::string& version) {
EXPECT_CALL(callbacks, onConfigUpdate(_, version))
.WillOnce(Invoke([](const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& gotten_resources,
const std::string&) { EXPECT_EQ(0, gotten_resources.size()); }));
EXPECT_CALL(callbacks, onConfigUpdate(_, _, _)).Times(0);
const std::string&) { EXPECT_EQ(gotten_resources.size(), 0); }));
EXPECT_CALL(callbacks, onConfigUpdate(_, _, version)).Times(0);
}

Protobuf::RepeatedPtrField<envoy::api::v2::Resource>
Expand Down Expand Up @@ -105,7 +109,7 @@ void doDeltaAndSotwUpdate(SubscriptionCallbacks& watch_map,
for (const auto& n : removed_names) {
*removed_names_proto.Add() = n;
}
watch_map.onConfigUpdate(delta_resources, removed_names_proto, "version1");
watch_map.onConfigUpdate(delta_resources, removed_names_proto, version);
}

// Tests the simple case of a single watch. Checks that the watch will not be told of updates to
Expand Down Expand Up @@ -195,9 +199,9 @@ TEST(WatchMapTest, Overlap) {
EXPECT_TRUE(added_removed.removed_.empty());
watch_map.updateWatchInterest(watch2, {"dummy"});

// First watch receives update.
// *Only* first watch receives update.
expectDeltaAndSotwUpdate(callbacks1, {alice}, {}, "version1");
expectNoDeltaUpdate(callbacks2, "version1");
expectNoUpdate(callbacks2, "version1");
doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version1");
}
// Second watch becomes interested.
Expand All @@ -218,9 +222,13 @@ TEST(WatchMapTest, Overlap) {
EXPECT_TRUE(added_removed.added_.empty()); // nothing happens
EXPECT_TRUE(added_removed.removed_.empty());

// *Only* second watch receives update.
expectNoDeltaUpdate(callbacks1, "version3");
// Both watches receive the update. For watch2, this is obviously desired.
expectDeltaAndSotwUpdate(callbacks2, {alice}, {}, "version3");
// For watch1, it's more subtle: the WatchMap sees that this update has no
// resources watch1 cares about, but also knows that watch1 previously had
// some resources. So, it must inform watch1 that it now has no resources.
// (SotW only: delta's explicit removals avoid the need for this guessing.)
expectEmptySotwNoDeltaUpdate(callbacks1, "version3");
doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version3");
}
// Second watch loses interest.
Expand Down Expand Up @@ -257,9 +265,9 @@ TEST(WatchMapTest, AddRemoveAdd) {
EXPECT_TRUE(added_removed.removed_.empty());
watch_map.updateWatchInterest(watch2, {"dummy"});

// First watch receives update.
// *Only* first watch receives update.
expectDeltaAndSotwUpdate(callbacks1, {alice}, {}, "version1");
expectNoDeltaUpdate(callbacks2, "version1");
expectNoUpdate(callbacks2, "version1");
doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version1");
}
// First watch loses interest.
Expand All @@ -278,9 +286,13 @@ TEST(WatchMapTest, AddRemoveAdd) {
EXPECT_EQ(std::set<std::string>({"alice"}), added_removed.added_); // add to subscription
EXPECT_TRUE(added_removed.removed_.empty());

// *Only* second watch receives update.
expectNoDeltaUpdate(callbacks1, "version2");
// Both watches receive the update. For watch2, this is obviously desired.
expectDeltaAndSotwUpdate(callbacks2, {alice}, {}, "version2");
// For watch1, it's more subtle: the WatchMap sees that this update has no
// resources watch1 cares about, but also knows that watch1 previously had
// some resources. So, it must inform watch1 that it now has no resources.
// (SotW only: delta's explicit removals avoid the need for this guessing.)
expectEmptySotwNoDeltaUpdate(callbacks1, "version2");
doDeltaAndSotwUpdate(watch_map, updated_resources, {}, "version2");
}
}
Expand All @@ -302,21 +314,29 @@ TEST(WatchMapTest, UninterestingUpdate) {
bob.set_cluster_name("bob");
bob_update.Add()->PackFrom(bob);

expectNoDeltaUpdate(callbacks, "version1");
// We are watching for alice, and an update for just bob arrives. It should be ignored.
expectNoUpdate(callbacks, "version1");
doDeltaAndSotwUpdate(watch_map, bob_update, {}, "version1");
::testing::Mock::VerifyAndClearExpectations(&callbacks);

// The server sends an update adding alice and removing bob. We pay attention only to alice.
expectDeltaAndSotwUpdate(callbacks, {alice}, {}, "version2");
doDeltaAndSotwUpdate(watch_map, alice_update, {}, "version2");
::testing::Mock::VerifyAndClearExpectations(&callbacks);

expectNoDeltaUpdate(callbacks, "version3");
doDeltaAndSotwUpdate(watch_map, bob_update, {}, "version3");
// The server sends an update removing alice and adding bob. We pay attention only to alice.
expectDeltaAndSotwUpdate(callbacks, {}, {"alice"}, "version3");
doDeltaAndSotwUpdate(watch_map, bob_update, {"alice"}, "version3");
::testing::Mock::VerifyAndClearExpectations(&callbacks);

// Clean removal of the watch: first update to "interested in nothing", then remove.
watch_map.updateWatchInterest(watch, {});
watch_map.removeWatch(watch);

// Finally, test that calling onConfigUpdate on a map with no watches doesn't break.
doDeltaAndSotwUpdate(watch_map, bob_update, {}, "version4");
// Finally, test that calling onConfigUpdate on a map with no watches (which should not
// happen) throws an exception.
EXPECT_THROW_WITH_MESSAGE(doDeltaAndSotwUpdate(watch_map, bob_update, {}, "version4"),
EnvoyException, "Rejecting non-empty update for unwatched type URL");
}

// Tests that a watch that specifies no particular resource interest is treated as interested in
Expand Down Expand Up @@ -366,6 +386,19 @@ TEST(WatchMapTest, DeltaOnConfigUpdate) {
watch_map.updateWatchInterest(watch2, {"updated", "removed"});
watch_map.updateWatchInterest(watch3, {"removed"});

// First, create the "removed" resource. We want to test SotW being handed an empty
// onConfigUpdate. But, if SotW holds no resources, then an update with nothing it cares about
// will just not trigger any onConfigUpdate at all.
{
Protobuf::RepeatedPtrField<ProtobufWkt::Any> prepare_removed;
envoy::api::v2::ClusterLoadAssignment will_be_removed_later;
will_be_removed_later.set_cluster_name("removed");
prepare_removed.Add()->PackFrom(will_be_removed_later);
expectDeltaAndSotwUpdate(callbacks2, {will_be_removed_later}, {}, "version0");
expectDeltaAndSotwUpdate(callbacks3, {will_be_removed_later}, {}, "version0");
doDeltaAndSotwUpdate(watch_map, prepare_removed, {}, "version0");
}

Protobuf::RepeatedPtrField<ProtobufWkt::Any> update;
envoy::api::v2::ClusterLoadAssignment updated;
updated.set_cluster_name("updated");
Expand Down
1 change: 1 addition & 0 deletions tools/spelling_dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ accessors
acls
addr
agg
alice
alignas
alignof
alloc
Expand Down