diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index 1e495a2a7f94c..f7598a02b6126 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -221,6 +221,10 @@ void GrpcMuxImpl::onEstablishmentFailure() { watch->callbacks_.onConfigUpdateFailed( Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr); } + if (api_state.second.paused_) { + ENVOY_LOG(trace, "API {} paused during onEstablishmentFailure(), resuming.", api_state.first); + resume(api_state.first); + } } } @@ -244,4 +248,4 @@ void GrpcMuxImpl::drainRequests() { } } // namespace Config -} // namespace Envoy \ No newline at end of file +} // namespace Envoy diff --git a/test/common/config/grpc_mux_impl_test.cc b/test/common/config/grpc_mux_impl_test.cc index 536836e534450..d060207896520 100644 --- a/test/common/config/grpc_mux_impl_test.cc +++ b/test/common/config/grpc_mux_impl_test.cc @@ -193,6 +193,47 @@ TEST_F(GrpcMuxImplTest, PauseResume) { grpc_mux_->pause("foo"); } +// Validate behavior when multiple type URL watches are maintained and the stream is reset +// even when paused. +TEST_F(GrpcMuxImplTest, ResetStreamWithPauseResume) { + InSequence s; + + Event::MockTimer* timer = nullptr; + Event::TimerCb timer_cb; + EXPECT_CALL(dispatcher_, createTimer_(_)).WillOnce(Invoke([&timer, &timer_cb](Event::TimerCb cb) { + timer_cb = cb; + EXPECT_EQ(nullptr, timer); + timer = new Event::MockTimer(); + return timer; + })); + + setup(); + auto foo_sub = grpc_mux_->addWatch("foo", {"x", "y"}, callbacks_); + grpc_mux_->pause("foo"); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + grpc_mux_->start(); + expectSendMessage("foo", {"x", "y"}, "", true); + grpc_mux_->resume("foo"); + + grpc_mux_->pause("foo"); // never resumed prior to stream termination. + EXPECT_CALL(callbacks_, + onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _)) + .Times(1); + EXPECT_CALL(random_, random()); + ASSERT_TRUE(timer != nullptr); // initialized from dispatcher mock. + EXPECT_CALL(*timer, enableTimer(_, _)); + grpc_mux_->grpcStreamForTest().onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Internal, ""); + EXPECT_EQ(0, control_plane_connected_state_.value()); + + grpc_mux_->pause("foo"); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("foo", {"x", "y"}, "", true); + timer_cb(); + grpc_mux_->resume("foo"); + + expectSendMessage("foo", {}, ""); +} + // Validate behavior when type URL mismatches occur. TEST_F(GrpcMuxImplTest, TypeUrlMismatch) { setup();