diff --git a/source/common/router/upstream_request.cc b/source/common/router/upstream_request.cc index 818214ef86480..085e6c4a71429 100644 --- a/source/common/router/upstream_request.cc +++ b/source/common/router/upstream_request.cc @@ -98,6 +98,12 @@ UpstreamRequest::~UpstreamRequest() { upstream_log->log(parent_.downstreamHeaders(), upstream_headers_.get(), upstream_trailers_.get(), stream_info_); } + + while (downstream_data_disabled_ != 0) { + parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark(); + parent_.cluster()->stats().upstream_flow_control_drained_total_.inc(); + --downstream_data_disabled_; + } } void UpstreamRequest::decode100ContinueHeaders(Http::ResponseHeaderMapPtr&& headers) { @@ -421,7 +427,6 @@ void UpstreamRequest::DownstreamWatermarkManager::onAboveWriteBufferHighWatermar // can disable reads from upstream. ASSERT(!parent_.parent_.finalUpstreamRequest() || &parent_ == parent_.parent_.finalUpstreamRequest()); - // The downstream connection is overrun. Pause reads from upstream. // If there are multiple calls to readDisable either the codec (H2) or the underlying // Network::Connection (H1) will handle reference counting. @@ -451,6 +456,7 @@ void UpstreamRequest::disableDataFromDownstreamForFlowControl() { ASSERT(parent_.upstreamRequests().size() == 1 || parent_.downstreamEndStream()); parent_.cluster()->stats().upstream_flow_control_backed_up_total_.inc(); parent_.callbacks()->onDecoderFilterAboveWriteBufferHighWatermark(); + ++downstream_data_disabled_; } void UpstreamRequest::enableDataFromDownstreamForFlowControl() { @@ -466,6 +472,10 @@ void UpstreamRequest::enableDataFromDownstreamForFlowControl() { ASSERT(parent_.upstreamRequests().size() == 1 || parent_.downstreamEndStream()); parent_.cluster()->stats().upstream_flow_control_drained_total_.inc(); parent_.callbacks()->onDecoderFilterBelowWriteBufferLowWatermark(); + ASSERT(downstream_data_disabled_ != 0); + if (downstream_data_disabled_ > 0) { + --downstream_data_disabled_; + } } void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) { diff --git a/source/common/router/upstream_request.h b/source/common/router/upstream_request.h index 0e85a21ddae1b..a10f42163abf9 100644 --- a/source/common/router/upstream_request.h +++ b/source/common/router/upstream_request.h @@ -159,6 +159,8 @@ class UpstreamRequest : public Logger::Loggable, Http::ResponseTrailerMapPtr upstream_trailers_; Http::MetadataMapVector downstream_metadata_map_vector_; + // Tracks the number of times the flow of data from downstream has been disabled. + uint32_t downstream_data_disabled_{}; bool calling_encode_headers_ : 1; bool upstream_canary_ : 1; bool decode_complete_ : 1; diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index d5395ebd3d25c..1a0b47b730c50 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -5674,9 +5674,10 @@ class WatermarkTest : public RouterTest { .WillOnce(Return(std::chrono::milliseconds(0))); EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0); - EXPECT_CALL(stream_, addCallbacks(_)).WillOnce(Invoke([&](Http::StreamCallbacks& callbacks) { - stream_callbacks_ = &callbacks; - })); + EXPECT_CALL(stream_, addCallbacks(_)) + .Times(num_add_callbacks_) + .WillOnce( + Invoke([&](Http::StreamCallbacks& callbacks) { stream_callbacks_ = &callbacks; })); EXPECT_CALL(encoder_, getStream()).WillRepeatedly(ReturnRef(stream_)); EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) .WillOnce(Invoke( @@ -5707,6 +5708,7 @@ class WatermarkTest : public RouterTest { Http::ResponseDecoder* response_decoder_ = nullptr; Http::TestRequestHeaderMapImpl headers_; Http::ConnectionPool::Callbacks* pool_callbacks_{nullptr}; + int num_add_callbacks_{1}; }; TEST_F(WatermarkTest, DownstreamWatermarks) { @@ -5786,7 +5788,29 @@ TEST_F(WatermarkTest, FilterWatermarks) { .value()); sendResponse(); -} // namespace Router +} + +TEST_F(WatermarkTest, FilterWatermarksUnwound) { + num_add_callbacks_ = 0; + EXPECT_CALL(callbacks_, decoderBufferLimit()).Times(3).WillRepeatedly(Return(10)); + router_.setDecoderFilterCallbacks(callbacks_); + // Send the headers sans-fin, and don't flag the pool as ready. + sendRequest(false, false); + + // Send 11 bytes of body to fill the 10 byte buffer. + Buffer::OwnedImpl data("1234567890!"); + router_.decodeData(data, false); + EXPECT_EQ(1U, cm_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_flow_control_backed_up_total") + .value()); + + // Set up a pool failure, and make sure the flow control blockage is undone. + pool_callbacks_->onPoolFailure(Http::ConnectionPool::PoolFailureReason::RemoteConnectionFailure, + absl::string_view(), nullptr); + EXPECT_EQ(1U, cm_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("upstream_flow_control_drained_total") + .value()); +} // Same as RetryRequestNotComplete but with decodeData larger than the buffer // limit, no retry will occur.