Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ void GrpcMuxImpl::onReceiveMessage(std::unique_ptr<envoy::api::v2::DiscoveryResp
resources.emplace(resource_name, resource);
}
for (auto watch : api_state_[type_url].watches_) {
// onConfigUpdate should be called in all cases for single watch xDS (Cluster and Listener)
// even if the message does not have resources so that update_empty stat is properly
// incremented and state-of-the-world semantics are maintained.
if (watch->resources_.empty()) {
watch->callbacks_.onConfigUpdate(message->resources(), message->version_info());
continue;
Expand All @@ -217,7 +220,11 @@ void GrpcMuxImpl::onReceiveMessage(std::unique_ptr<envoy::api::v2::DiscoveryResp
found_resources.Add()->MergeFrom(it->second);
}
}
watch->callbacks_.onConfigUpdate(found_resources, message->version_info());
// onConfigUpdate should be called only on watches(clusters/routes) that have updates in the
// message.
if (found_resources.size() > 0) {
watch->callbacks_.onConfigUpdate(found_resources, message->version_info());
}
}
// TODO(mattklein123): In the future if we start tracking per-resource versions, we would do
// that tracking here.
Expand Down
52 changes: 48 additions & 4 deletions test/common/config/grpc_mux_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ TEST_F(GrpcMuxImplTest, WildcardWatch) {
// Validate behavior when watches specify resources (potentially overlapping).
TEST_F(GrpcMuxImplTest, WatchDemux) {
setup();

InSequence s;
const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment;
NiceMock<MockGrpcMuxCallbacks> foo_callbacks;
Expand All @@ -251,9 +250,7 @@ TEST_F(GrpcMuxImplTest, WatchDemux) {
envoy::api::v2::ClusterLoadAssignment load_assignment;
load_assignment.set_cluster_name("x");
response->add_resources()->PackFrom(load_assignment);
EXPECT_CALL(bar_callbacks, onConfigUpdate(_, "1"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could make this a .Times(0) to make clear it's not expected.

.WillOnce(Invoke([](const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string&) { EXPECT_TRUE(resources.empty()); }));
EXPECT_CALL(bar_callbacks, onConfigUpdate(_, "1")).Times(0);
EXPECT_CALL(foo_callbacks, onConfigUpdate(_, "1"))
.WillOnce(
Invoke([&load_assignment](const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
Expand Down Expand Up @@ -311,6 +308,53 @@ TEST_F(GrpcMuxImplTest, WatchDemux) {
expectSendMessage(type_url, {}, "2");
}

// Validate behavior when we have multiple watchers that send empty updates.
TEST_F(GrpcMuxImplTest, MultipleWatcherWithEmptyUpdates) {
setup();
InSequence s;
const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment;
NiceMock<MockGrpcMuxCallbacks> foo_callbacks;
auto foo_sub = grpc_mux_->subscribe(type_url, {"x", "y"}, foo_callbacks);

EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_));
expectSendMessage(type_url, {"x", "y"}, "");
grpc_mux_->start();

std::unique_ptr<envoy::api::v2::DiscoveryResponse> response(
new envoy::api::v2::DiscoveryResponse());
response->set_type_url(type_url);
response->set_version_info("1");

EXPECT_CALL(foo_callbacks, onConfigUpdate(_, "1")).Times(0);
expectSendMessage(type_url, {"x", "y"}, "1");
grpc_mux_->onReceiveMessage(std::move(response));

expectSendMessage(type_url, {}, "1");
}

// Validate behavior when we have Single Watcher that sends Empty updates.
TEST_F(GrpcMuxImplTest, SingleWatcherWithEmptyUpdates) {
setup();
const std::string& type_url = Config::TypeUrl::get().Cluster;
NiceMock<MockGrpcMuxCallbacks> foo_callbacks;
auto foo_sub = grpc_mux_->subscribe(type_url, {}, foo_callbacks);

EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_));
expectSendMessage(type_url, {}, "");
grpc_mux_->start();

std::unique_ptr<envoy::api::v2::DiscoveryResponse> response(
new envoy::api::v2::DiscoveryResponse());
response->set_type_url(type_url);
response->set_version_info("1");
// Validate that onConfigUpdate is called with empty resources.
EXPECT_CALL(foo_callbacks, onConfigUpdate(_, "1"))
.WillOnce(Invoke([](const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string&) { EXPECT_TRUE(resources.empty()); }));
expectSendMessage(type_url, {}, "1");
grpc_mux_->onReceiveMessage(std::move(response));
}

// Verifies that warning messages get logged when Envoy detects too many requests.
TEST_F(GrpcMuxImplTest, TooManyRequests) {
setup();
Expand Down
2 changes: 1 addition & 1 deletion test/integration/ads_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ TEST_P(AdsIntegrationTest, XdsBatching) {
{"eds_cluster2", "eds_cluster"}));
sendDiscoveryResponse<envoy::api::v2::ClusterLoadAssignment>(
Config::TypeUrl::get().ClusterLoadAssignment,
{buildClusterLoadAssignment("eds_cluster"), buildClusterLoadAssignment("eds_cluster1")},
{buildClusterLoadAssignment("eds_cluster"), buildClusterLoadAssignment("eds_cluster2")},
"1");

EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "",
Expand Down