diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index bff52edb33301..c2919374041a4 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -38,6 +38,7 @@ Bug Fixes * config: validate that upgrade configs have a non-empty :ref:`upgrade_type `, fixing a bug where an errant "-" could result in unexpected behavior. * dns: fix a bug where custom resolvers provided in configuration were not preserved after network issues. * dns_filter: correctly associate DNS response IDs when multiple queries are received. +* grpc mux: fix sending node again after stream is reset when ::ref:`set_node_on_first_message_only ` is set. * http: fixed URL parsing for HTTP/1.1 fully qualified URLs and connect requests containing IPv6 addresses. * http: reject requests with missing required headers after filter chain processing. * http: sending CONNECT_ERROR for HTTP/2 where appropriate during CONNECT requests. diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index c45aab8d0f568..2734867446fe1 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -48,8 +48,13 @@ void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) { } } - if (skip_subsequent_node_ && !first_stream_request_) { - request.clear_node(); + if (skip_subsequent_node_) { + if (first_stream_request_) { + // Node may have been cleared during a previous request. + request.mutable_node()->MergeFrom(local_info_.node()); + } else { + request.clear_node(); + } } VersionConverter::prepareMessageForGrpcWire(request, transport_api_version_); ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url, request.DebugString()); diff --git a/test/common/config/grpc_mux_impl_test.cc b/test/common/config/grpc_mux_impl_test.cc index 6544290bc7aa3..f8ffa0329bafc 100644 --- a/test/common/config/grpc_mux_impl_test.cc +++ b/test/common/config/grpc_mux_impl_test.cc @@ -159,18 +159,24 @@ TEST_F(GrpcMuxImplTest, ResetStream) { expectSendMessage("baz", {"z"}, ""); grpc_mux_->start(); + // Send another message for foo so that the node is cleared in the cached request. + // This is to test that the the node is set again in the first message below. + expectSendMessage("foo", {"z", "x", "y"}, ""); + auto foo_z_sub = grpc_mux_->addWatch("foo", {"z"}, callbacks_, resource_decoder_); + EXPECT_CALL(callbacks_, onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _)) - .Times(3); + .Times(4); EXPECT_CALL(random_, random()); EXPECT_CALL(*timer, enableTimer(_, _)); grpc_mux_->grpcStreamForTest().onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Canceled, ""); EXPECT_EQ(0, control_plane_connected_state_.value()); EXPECT_EQ(0, control_plane_pending_requests_.value()); EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); - expectSendMessage("foo", {"x", "y"}, "", true); + expectSendMessage("foo", {"z", "x", "y"}, "", true); expectSendMessage("bar", {}, ""); expectSendMessage("baz", {"z"}, ""); + expectSendMessage("foo", {"x", "y"}, ""); timer->invokeCallback(); expectSendMessage("baz", {}, ""); diff --git a/test/common/config/grpc_subscription_test_harness.h b/test/common/config/grpc_subscription_test_harness.h index f0950a8dca314..929f91619a102 100644 --- a/test/common/config/grpc_subscription_test_harness.h +++ b/test/common/config/grpc_subscription_test_harness.h @@ -44,7 +44,7 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { "envoy.api.v2.EndpointDiscoveryService.StreamEndpoints")), async_client_(new NiceMock()) { node_.set_id("fo0"); - EXPECT_CALL(local_info_, node()).WillOnce(testing::ReturnRef(node_)); + EXPECT_CALL(local_info_, node()).WillRepeatedly(testing::ReturnRef(node_)); ttl_timer_ = new NiceMock(&dispatcher_); timer_ = new Event::MockTimer(&dispatcher_); diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index b178297fe127c..f3ff6f7b3b03f 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -249,6 +249,32 @@ TEST_P(AdsIntegrationTest, Failure) { makeSingleRequest(); } +// Regression test for https://github.com/envoyproxy/envoy/issues/9682. +TEST_P(AdsIntegrationTest, ResendNodeOnStreamReset) { + initialize(); + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {}, {}, {}, true)); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {buildCluster("cluster_0")}, + {buildCluster("cluster_0")}, {}, "1"); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "", + {"cluster_0"}, {"cluster_0"}, {})); + sendDiscoveryResponse( + Config::TypeUrl::get().ClusterLoadAssignment, {buildClusterLoadAssignment("cluster_0")}, + {buildClusterLoadAssignment("cluster_0")}, {}, "1"); + + // A second CDS request should be sent so that the node is cleared in the cached request. + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "1", {}, {}, {})); + + xds_stream_->finishGrpcStream(Grpc::Status::Internal); + AssertionResult result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->startGrpcStream(); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "1", {"cluster_0"}, + {"cluster_0"}, {}, true)); +} + // Validate that xds can support a mix of v2 and v3 type url. TEST_P(AdsIntegrationTest, MixV2V3TypeUrlInDiscoveryResponse) { config_helper_.addRuntimeOverride(