Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ void GrpcMuxImpl::onReceiveMessage(std::unique_ptr<envoy::api::v2::DiscoveryResp
resources.emplace(resource_name, resource);
}
for (auto watch : api_state_[type_url].watches_) {
// onConfigUpdate should be called in all cases for single watch xDS (Cluster and Listener)
// even if the message does not have resources so that update_empty stat is properly
// incremented and state-of-the-world semantics are maintained.
if (watch->resources_.empty()) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@htuch single watch xDS case is already handled here. So just added a comment to be clear. Also added a test which validates that.

watch->callbacks_.onConfigUpdate(message->resources(), message->version_info());
continue;
Expand All @@ -217,7 +220,11 @@ void GrpcMuxImpl::onReceiveMessage(std::unique_ptr<envoy::api::v2::DiscoveryResp
found_resources.Add()->MergeFrom(it->second);
}
}
watch->callbacks_.onConfigUpdate(found_resources, message->version_info());
// onConfigUpdate should be called only on watches(clusters/routes) that have updates in the
// message.
if (found_resources.size() > 0) {
watch->callbacks_.onConfigUpdate(found_resources, message->version_info());
}
}
// TODO(mattklein123): In the future if we start tracking per-resource versions, we would do
// that tracking here.
Expand Down
56 changes: 52 additions & 4 deletions test/common/config/grpc_mux_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ TEST_F(GrpcMuxImplTest, WildcardWatch) {
// Validate behavior when watches specify resources (potentially overlapping).
TEST_F(GrpcMuxImplTest, WatchDemux) {
setup();

InSequence s;
const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment;
NiceMock<MockGrpcMuxCallbacks> foo_callbacks;
Expand All @@ -251,9 +250,7 @@ TEST_F(GrpcMuxImplTest, WatchDemux) {
envoy::api::v2::ClusterLoadAssignment load_assignment;
load_assignment.set_cluster_name("x");
response->add_resources()->PackFrom(load_assignment);
EXPECT_CALL(bar_callbacks, onConfigUpdate(_, "1"))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could make this a .Times(0) to make clear it's not expected.

.WillOnce(Invoke([](const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string&) { EXPECT_TRUE(resources.empty()); }));
EXPECT_CALL(bar_callbacks, onConfigUpdate(_, "1")).Times(0);
EXPECT_CALL(foo_callbacks, onConfigUpdate(_, "1"))
.WillOnce(
Invoke([&load_assignment](const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
Expand Down Expand Up @@ -311,6 +308,57 @@ TEST_F(GrpcMuxImplTest, WatchDemux) {
expectSendMessage(type_url, {}, "2");
}

// Validate behavior when we have multiple watchers that send empty updates.
TEST_F(GrpcMuxImplTest, MultipleWatcherWithEmptyUpdates) {
setup();
InSequence s;
const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment;
NiceMock<MockGrpcMuxCallbacks> foo_callbacks;
auto foo_sub = grpc_mux_->subscribe(type_url, {"x", "y"}, foo_callbacks);

EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_));
expectSendMessage(type_url, {"x", "y"}, "");
grpc_mux_->start();

{

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for these block constructs? They are used elsewhere in the file for forcing subscription objects to be destroyed, do we need them in these new tests?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no specific reason other than follow the same pattern of having subscription in a block construct. Will remove them

std::unique_ptr<envoy::api::v2::DiscoveryResponse> response(
new envoy::api::v2::DiscoveryResponse());
response->set_type_url(type_url);
response->set_version_info("1");

EXPECT_CALL(foo_callbacks, onConfigUpdate(_, "1")).Times(0);
expectSendMessage(type_url, {"x", "y"}, "1");
grpc_mux_->onReceiveMessage(std::move(response));
}

expectSendMessage(type_url, {}, "1");
}

// Validate behavior when we have Single Watcher that sends Empty updates.
TEST_F(GrpcMuxImplTest, SingleWatcherWithEmptyUpdates) {
setup();
const std::string& type_url = Config::TypeUrl::get().Cluster;
NiceMock<MockGrpcMuxCallbacks> foo_callbacks;
auto foo_sub = grpc_mux_->subscribe(type_url, {}, foo_callbacks);

EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_));
expectSendMessage(type_url, {}, "");
grpc_mux_->start();

{
std::unique_ptr<envoy::api::v2::DiscoveryResponse> response(
new envoy::api::v2::DiscoveryResponse());
response->set_type_url(type_url);
response->set_version_info("1");
// Validate that onConfigUpdate is called with empty resources.
EXPECT_CALL(foo_callbacks, onConfigUpdate(_, "1"))
.WillOnce(Invoke([](const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string&) { EXPECT_TRUE(resources.empty()); }));
expectSendMessage(type_url, {}, "1");
grpc_mux_->onReceiveMessage(std::move(response));
}
}

// Verifies that warning messages get logged when Envoy detects too many requests.
TEST_F(GrpcMuxImplTest, TooManyRequests) {
setup();
Expand Down
2 changes: 1 addition & 1 deletion test/integration/ads_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ TEST_P(AdsIntegrationTest, XdsBatching) {
{"eds_cluster2", "eds_cluster"}));
sendDiscoveryResponse<envoy::api::v2::ClusterLoadAssignment>(
Config::TypeUrl::get().ClusterLoadAssignment,
{buildClusterLoadAssignment("eds_cluster"), buildClusterLoadAssignment("eds_cluster1")},
{buildClusterLoadAssignment("eds_cluster"), buildClusterLoadAssignment("eds_cluster2")},
"1");

EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "",
Expand Down