From e7ac05caab914c50c47c3a436999d6cde10e6703 Mon Sep 17 00:00:00 2001 From: Venil Noronha Date: Fri, 17 Apr 2020 06:48:03 -0700 Subject: [PATCH 1/5] xds: reset ApiState during stream termination Currently, when a stream is terminated, an `ApiState` that's in the paused state can remain there indefinitely. This is because `resume` is never called for that `type_url`. If the stream is reestablished, `sendDiscoveryRequest` will set that `ApiState` as pending and no `DiscoveryRequest`s are ever sent out for that type. The fix here is to explicitly reset the `ApiState` for such types during `onEstablishmentFailure` so that `sendDiscoveryRequest` can correctly start sending requests for that type again once the stream is reestablished. Signed-off-by: Venil Noronha --- source/common/config/grpc_mux_impl.cc | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index 1e495a2a7f94c..572071050c571 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -216,7 +216,16 @@ void GrpcMuxImpl::onStreamEstablished() { } void GrpcMuxImpl::onEstablishmentFailure() { - for (const auto& api_state : api_state_) { + for (auto& api_state : api_state_) { + if (api_state.second.pending_) { + ENVOY_LOG(trace, "API {} pending during onEstablishmentFailure(), unsetting pending.", api_state.first); + api_state.second.pending_ = false; + } + if (api_state.second.paused_) { + ENVOY_LOG(trace, "API {} paused during onEstablishmentFailure(), unpausing.", api_state.first); + api_state.second.paused_ = false; + } + for (auto watch : api_state.second.watches_) { watch->callbacks_.onConfigUpdateFailed( Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr); @@ -244,4 +253,4 @@ void GrpcMuxImpl::drainRequests() { } } // namespace Config -} // namespace Envoy \ No newline at end of file +} // namespace Envoy From 503ac031fa510bc95d147ba098ca270d76ef29c4 Mon Sep 17 00:00:00 2001 From: Venil Noronha Date: Fri, 17 Apr 2020 07:37:30 -0700 Subject: [PATCH 2/5] Fix format Signed-off-by: Venil Noronha --- source/common/config/grpc_mux_impl.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index 572071050c571..5a477bfe94d78 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -218,11 +218,13 @@ void GrpcMuxImpl::onStreamEstablished() { void GrpcMuxImpl::onEstablishmentFailure() { for (auto& api_state : api_state_) { if (api_state.second.pending_) { - ENVOY_LOG(trace, "API {} pending during onEstablishmentFailure(), unsetting pending.", api_state.first); + ENVOY_LOG(trace, "API {} pending during onEstablishmentFailure(), unsetting pending.", + api_state.first); api_state.second.pending_ = false; } if (api_state.second.paused_) { - ENVOY_LOG(trace, "API {} paused during onEstablishmentFailure(), unpausing.", api_state.first); + ENVOY_LOG(trace, "API {} paused during onEstablishmentFailure(), unpausing.", + api_state.first); api_state.second.paused_ = false; } From 43bb74c360960761983fe23206bd011d99dec1c9 Mon Sep 17 00:00:00 2001 From: Venil Noronha Date: Fri, 17 Apr 2020 08:49:20 -0700 Subject: [PATCH 3/5] Add test Signed-off-by: Venil Noronha --- test/common/config/grpc_mux_impl_test.cc | 41 ++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/test/common/config/grpc_mux_impl_test.cc b/test/common/config/grpc_mux_impl_test.cc index 536836e534450..d060207896520 100644 --- a/test/common/config/grpc_mux_impl_test.cc +++ b/test/common/config/grpc_mux_impl_test.cc @@ -193,6 +193,47 @@ TEST_F(GrpcMuxImplTest, PauseResume) { grpc_mux_->pause("foo"); } +// Validate behavior when multiple type URL watches are maintained and the stream is reset +// even when paused. +TEST_F(GrpcMuxImplTest, ResetStreamWithPauseResume) { + InSequence s; + + Event::MockTimer* timer = nullptr; + Event::TimerCb timer_cb; + EXPECT_CALL(dispatcher_, createTimer_(_)).WillOnce(Invoke([&timer, &timer_cb](Event::TimerCb cb) { + timer_cb = cb; + EXPECT_EQ(nullptr, timer); + timer = new Event::MockTimer(); + return timer; + })); + + setup(); + auto foo_sub = grpc_mux_->addWatch("foo", {"x", "y"}, callbacks_); + grpc_mux_->pause("foo"); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + grpc_mux_->start(); + expectSendMessage("foo", {"x", "y"}, "", true); + grpc_mux_->resume("foo"); + + grpc_mux_->pause("foo"); // never resumed prior to stream termination. + EXPECT_CALL(callbacks_, + onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _)) + .Times(1); + EXPECT_CALL(random_, random()); + ASSERT_TRUE(timer != nullptr); // initialized from dispatcher mock. + EXPECT_CALL(*timer, enableTimer(_, _)); + grpc_mux_->grpcStreamForTest().onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Internal, ""); + EXPECT_EQ(0, control_plane_connected_state_.value()); + + grpc_mux_->pause("foo"); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("foo", {"x", "y"}, "", true); + timer_cb(); + grpc_mux_->resume("foo"); + + expectSendMessage("foo", {}, ""); +} + // Validate behavior when type URL mismatches occur. TEST_F(GrpcMuxImplTest, TypeUrlMismatch) { setup(); From bd93cfbcaa6b6133cfc138bd047fd77db19709a1 Mon Sep 17 00:00:00 2001 From: Venil Noronha Date: Sat, 18 Apr 2020 12:14:17 -0700 Subject: [PATCH 4/5] Simplify logic Signed-off-by: Venil Noronha --- source/common/config/grpc_mux_impl.cc | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index 5a477bfe94d78..a3808f3274199 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -216,22 +216,16 @@ void GrpcMuxImpl::onStreamEstablished() { } void GrpcMuxImpl::onEstablishmentFailure() { - for (auto& api_state : api_state_) { - if (api_state.second.pending_) { - ENVOY_LOG(trace, "API {} pending during onEstablishmentFailure(), unsetting pending.", - api_state.first); - api_state.second.pending_ = false; - } - if (api_state.second.paused_) { - ENVOY_LOG(trace, "API {} paused during onEstablishmentFailure(), unpausing.", - api_state.first); - api_state.second.paused_ = false; - } - + for (const auto& api_state : api_state_) { for (auto watch : api_state.second.watches_) { watch->callbacks_.onConfigUpdateFailed( Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr); } + if (api_state.second.paused_) { + ENVOY_LOG(trace, "API {} paused during onEstablishmentFailure(), resuming.", + api_state.first); + resume(api_state.first); + } } } From cd1641e0dfedea8053338d6bfa73410ecd86347b Mon Sep 17 00:00:00 2001 From: Venil Noronha Date: Sat, 18 Apr 2020 13:54:30 -0700 Subject: [PATCH 5/5] Fix format Signed-off-by: Venil Noronha --- source/common/config/grpc_mux_impl.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index a3808f3274199..f7598a02b6126 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -222,8 +222,7 @@ void GrpcMuxImpl::onEstablishmentFailure() { Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr); } if (api_state.second.paused_) { - ENVOY_LOG(trace, "API {} paused during onEstablishmentFailure(), resuming.", - api_state.first); + ENVOY_LOG(trace, "API {} paused during onEstablishmentFailure(), resuming.", api_state.first); resume(api_state.first); } }