diff --git a/docs/root/configuration/http/http_filters/router_filter.rst b/docs/root/configuration/http/http_filters/router_filter.rst index d1a5d17f1a402..6575e0ed23362 100644 --- a/docs/root/configuration/http/http_filters/router_filter.rst +++ b/docs/root/configuration/http/http_filters/router_filter.rst @@ -388,7 +388,6 @@ owning HTTP connection manager. rq_direct_response, Counter, Total requests that resulted in a direct response rq_total, Counter, Total routed requests rq_reset_after_downstream_response_started, Counter, Total requests that were reset after downstream response had started - rq_retry_skipped_request_not_complete, Counter, Total retries that were skipped as the request is not yet complete .. _config_http_filters_router_vcluster_stats: diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 915fecc2d7969..b16f130fbaa97 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -10,6 +10,7 @@ Changes are applied to using :ref:`HTTP headers ` to the HTTP fault filter. * http: fixed a bug where the upgrade header was not cleared on responses to non-upgrade requests. Can be reverted temporarily by setting runtime feature `envoy.reloadable_features.fix_upgrade_response` to false. +* router: allow retries of streaming or incomplete requests. This removes stat `rq_retry_skipped_request_not_complete`. * tracing: tracing configuration has been made fully dynamic and every HTTP connection manager can now have a separate :ref:`tracing provider `. diff --git a/include/envoy/stream_info/stream_info.h b/include/envoy/stream_info/stream_info.h index 1afda3a2336a3..89824f4190f49 100644 --- a/include/envoy/stream_info/stream_info.h +++ b/include/envoy/stream_info/stream_info.h @@ -91,6 +91,10 @@ struct ResponseCodeDetailValues { // Envoy is doing non-streaming proxying, and the request payload exceeded // configured limits. 
const std::string RequestPayloadTooLarge = "request_payload_too_large"; + // Envoy is doing streaming proxying, but too much data arrived while waiting + // to attempt a retry. + const std::string RequestPayloadExceededRetryBufferLimit = + "request_payload_exceeded_retry_buffer_limit"; // Envoy is doing non-streaming proxying, and the response payload exceeded // configured limits. const std::string ResponsePayloadTooLArge = "response_payload_too_large"; diff --git a/source/common/router/router.cc b/source/common/router/router.cc index 4171f18440be7..511d378e2cbad 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -667,11 +667,12 @@ void Filter::sendNoHealthyUpstreamResponse() { } Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) { - // upstream_requests_.size() cannot be 0 because we add to it unconditionally - // in decodeHeaders(). It cannot be > 1 because that only happens when a per + // upstream_requests_.size() cannot be > 1 because that only happens when a per // try timeout occurs with hedge_on_per_try_timeout enabled but the per - // try timeout timer is not started until onUpstreamComplete(). - ASSERT(upstream_requests_.size() == 1); + // try timeout timer is not started until onRequestComplete(). It could be zero + // if the first request attempt has already failed and a retry is waiting for + // a backoff timer. + ASSERT(upstream_requests_.size() <= 1); bool buffering = (retry_state_ && retry_state_->enabled()) || !active_shadow_policies_.empty(); if (buffering && @@ -681,13 +682,31 @@ Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_strea retry_state_.reset(); buffering = false; active_shadow_policies_.clear(); + + // If we had to abandon buffering and there's no request in progress, abort the request and + // clean up. 
This happens if the initial upstream request failed, and we are currently waiting + // for a backoff timer before starting the next upstream attempt. + if (upstream_requests_.empty()) { + cleanup(); + callbacks_->sendLocalReply( + Http::Code::InsufficientStorage, "exceeded request buffer limit while retrying upstream", + modify_headers_, absl::nullopt, + StreamInfo::ResponseCodeDetails::get().RequestPayloadExceededRetryBufferLimit); + return Http::FilterDataStatus::StopIterationNoBuffer; + } } + // If we aren't buffering and there is no active request, an abort should have occurred + // already. + ASSERT(buffering || !upstream_requests_.empty()); + if (buffering) { // If we are going to buffer for retries or shadowing, we need to make a copy before encoding // since it's all moves from here on. - Buffer::OwnedImpl copy(data); - upstream_requests_.front()->encodeData(copy, end_stream); + if (!upstream_requests_.empty()) { + Buffer::OwnedImpl copy(data); + upstream_requests_.front()->encodeData(copy, end_stream); + } // If we are potentially going to retry or shadow this request we need to buffer. // This will not cause the connection manager to 413 because before we hit the @@ -709,11 +728,12 @@ Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_strea Http::FilterTrailersStatus Filter::decodeTrailers(Http::RequestTrailerMap& trailers) { ENVOY_STREAM_LOG(debug, "router decoding trailers:\n{}", *callbacks_, trailers); - // upstream_requests_.size() cannot be 0 because we add to it unconditionally - // in decodeHeaders(). It cannot be > 1 because that only happens when a per + // upstream_requests_.size() cannot be > 1 because that only happens when a per // try timeout occurs with hedge_on_per_try_timeout enabled but the per - // try timeout timer is not started until onUpstreamComplete(). - ASSERT(upstream_requests_.size() == 1); + // try timeout timer is not started until onRequestComplete(). 
It could be zero + // if the first request attempt has already failed and a retry is waiting for + // a backoff timer. + ASSERT(upstream_requests_.size() <= 1); downstream_trailers_ = &trailers; for (auto& upstream_request : upstream_requests_) { upstream_request->encodeTrailers(trailers); } @@ -724,8 +744,10 @@ Http::FilterTrailersStatus Filter::decodeTrailers(Http::RequestTrailerMap& trail Http::FilterMetadataStatus Filter::decodeMetadata(Http::MetadataMap& metadata_map) { Http::MetadataMapPtr metadata_map_ptr = std::make_unique<Http::MetadataMap>(metadata_map); - ASSERT(upstream_requests_.size() == 1); - upstream_requests_.front()->encodeMetadata(std::move(metadata_map_ptr)); + if (!upstream_requests_.empty()) { + // TODO(soya3129): Save metadata for retry, redirect and shadowing case. + upstream_requests_.front()->encodeMetadata(std::move(metadata_map_ptr)); + } return Http::FilterMetadataStatus::Continue; } @@ -869,8 +891,9 @@ void Filter::onSoftPerTryTimeout(UpstreamRequest& upstream_request) { RetryStatus retry_status = retry_state_->shouldHedgeRetryPerTryTimeout([this]() -> void { doRetry(); }); - if (retry_status == RetryStatus::Yes && setupRetry()) { - setupRetry(); + if (retry_status == RetryStatus::Yes) { + pending_retries_++; + // Don't increment upstream_host->stats().rq_error_ here, we'll do that // later if 1) we hit global timeout or 2) we get bad response headers // back. 
@@ -996,7 +1019,9 @@ bool Filter::maybeRetryReset(Http::StreamResetReason reset_reason, const RetryStatus retry_status = retry_state_->shouldRetryReset(reset_reason, [this]() -> void { doRetry(); }); - if (retry_status == RetryStatus::Yes && setupRetry()) { + if (retry_status == RetryStatus::Yes) { + pending_retries_++; + if (upstream_request.upstreamHost()) { upstream_request.upstreamHost()->stats().rq_error_.inc(); } @@ -1184,19 +1209,18 @@ void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPt } else { const RetryStatus retry_status = retry_state_->shouldRetryHeaders(*headers, [this]() -> void { doRetry(); }); - // Capture upstream_host since setupRetry() in the following line will clear - // upstream_request. - const auto upstream_host = upstream_request.upstreamHost(); - if (retry_status == RetryStatus::Yes && setupRetry()) { + if (retry_status == RetryStatus::Yes) { + pending_retries_++; + upstream_request.upstreamHost()->stats().rq_error_.inc(); + Http::CodeStats& code_stats = httpContext().codeStats(); + code_stats.chargeBasicResponseStat(cluster_->statsScope(), config_.retry_, + static_cast<Http::Code>(response_code)); + if (!end_stream) { upstream_request.resetStream(); } upstream_request.removeFromList(upstream_requests_); - Http::CodeStats& code_stats = httpContext().codeStats(); - code_stats.chargeBasicResponseStat(cluster_->statsScope(), config_.retry_, - static_cast<Http::Code>(response_code)); - upstream_host->stats().rq_error_.inc(); return; } else if (retry_status == RetryStatus::NoOverflow) { callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow); @@ -1375,23 +1399,6 @@ void Filter::onUpstreamComplete(UpstreamRequest& upstream_request) { cleanup(); } -bool Filter::setupRetry() { - // If we responded before the request was complete we don't bother doing a retry. This may not - // catch certain cases where we are in full streaming mode and we have a connect timeout or an - // overflow of some kind. 
However, in many cases deployments will use the buffer filter before - // this filter which will make this a non-issue. The implementation of supporting retry in cases - // where the request is not complete is more complicated so we will start with this for now. - if (!downstream_end_stream_) { - config_.stats_.rq_retry_skipped_request_not_complete_.inc(); - return false; - } - pending_retries_++; - - ENVOY_STREAM_LOG(debug, "performing retry", *callbacks_); - - return true; -} - bool Filter::setupRedirect(const Http::ResponseHeaderMap& headers, UpstreamRequest& upstream_request) { ENVOY_STREAM_LOG(debug, "attempting internal redirect", *callbacks_); @@ -1412,7 +1419,7 @@ bool Filter::setupRedirect(const Http::ResponseHeaderMap& headers, const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState(); - // As with setupRetry, redirects are not supported for streaming requests yet. + // Redirects are not supported for streaming requests yet. if (downstream_end_stream_ && !callbacks_->decodingBuffer() && // Redirects with body not yet supported. 
location != nullptr && @@ -1432,6 +1439,8 @@ bool Filter::setupRedirect(const Http::ResponseHeaderMap& headers, } void Filter::doRetry() { + ENVOY_STREAM_LOG(debug, "performing retry", *callbacks_); + is_retry_ = true; attempt_count_++; ASSERT(pending_retries_ > 0); @@ -1454,10 +1463,10 @@ void Filter::doRetry() { downstream_headers_->setEnvoyAttemptCount(attempt_count_); } - ASSERT(response_timeout_ || timeout_.global_timeout_.count() == 0); UpstreamRequest* upstream_request_tmp = upstream_request.get(); upstream_request->moveIntoList(std::move(upstream_request), upstream_requests_); - upstream_requests_.front()->encodeHeaders(!callbacks_->decodingBuffer() && !downstream_trailers_); + upstream_requests_.front()->encodeHeaders(!callbacks_->decodingBuffer() && + !downstream_trailers_ && downstream_end_stream_); // It's possible we got immediately reset which means the upstream request we just // added to the front of the list might have been removed, so we need to check to make // sure we don't encodeData on the wrong request. @@ -1465,7 +1474,7 @@ void Filter::doRetry() { if (callbacks_->decodingBuffer()) { // If we are doing a retry we need to make a copy. 
Buffer::OwnedImpl copy(*callbacks_->decodingBuffer()); - upstream_requests_.front()->encodeData(copy, !downstream_trailers_); + upstream_requests_.front()->encodeData(copy, !downstream_trailers_ && downstream_end_stream_); } if (downstream_trailers_) { diff --git a/source/common/router/router.h b/source/common/router/router.h index fb5e529454694..9f532d08fb5f6 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -46,8 +46,7 @@ namespace Router { COUNTER(rq_redirect) \ COUNTER(rq_direct_response) \ COUNTER(rq_total) \ - COUNTER(rq_reset_after_downstream_response_started) \ - COUNTER(rq_retry_skipped_request_not_complete) + COUNTER(rq_reset_after_downstream_response_started) // clang-format on /** @@ -492,8 +491,6 @@ class Filter : Logger::Loggable, // for the remaining upstream requests to return. void resetOtherUpstreams(UpstreamRequest& upstream_request); void sendNoHealthyUpstreamResponse(); - // TODO(soya3129): Save metadata for retry, redirect and shadowing case. - bool setupRetry(); bool setupRedirect(const Http::ResponseHeaderMap& headers, UpstreamRequest& upstream_request); void updateOutlierDetection(Upstream::Outlier::Result result, UpstreamRequest& upstream_request, absl::optional code); diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index 36e5eab6ddc72..1d47bd682b3b8 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -2752,7 +2752,9 @@ TEST_F(RouterTest, BadHeadersDroppedIfPreviousRetryScheduled) { response_decoder2->decodeHeaders(std::move(response_headers2), true); } -TEST_F(RouterTest, RetryRequestNotComplete) { +// Test retrying a request, when the first attempt fails before the client +// has sent any of the body. 
+TEST_F(RouterTest, RetryRequestBeforeBody) { NiceMock encoder1; Http::ResponseDecoder* response_decoder = nullptr; EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) @@ -2763,23 +2765,359 @@ TEST_F(RouterTest, RetryRequestNotComplete) { callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_, upstream_stream_info_); return nullptr; })); - EXPECT_CALL(callbacks_.stream_info_, - setResponseFlag(StreamInfo::ResponseFlag::UpstreamRemoteReset)); - EXPECT_CALL(callbacks_.stream_info_, onUpstreamHostSelected(_)) - .WillOnce(Invoke([&](const Upstream::HostDescriptionConstSharedPtr host) -> void { - EXPECT_EQ(host_address_, host->address()); + expectResponseTimerCreate(); + + Http::TestRequestHeaderMapImpl headers{ + {"x-envoy-retry-on", "5xx"}, {"x-envoy-internal", "true"}, {"myheader", "present"}}; + HttpTestUtility::addDefaultHeaders(headers); + router_.decodeHeaders(headers, false); + + router_.retry_state_->expectResetRetry(); + encoder1.stream_.resetStream(Http::StreamResetReason::RemoteReset); + + NiceMock encoder2; + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke( + [&](Http::ResponseDecoder& decoder, + Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { + response_decoder = &decoder; + callbacks.onPoolReady(encoder2, cm_.conn_pool_.host_, upstream_stream_info_); + return nullptr; + })); + EXPECT_CALL(encoder2, encodeHeaders(HeaderHasValueRef("myheader", "present"), false)); + router_.retry_state_->callback_(); + EXPECT_EQ(2U, + callbacks_.route_->route_entry_.virtual_cluster_.stats().upstream_rq_total_.value()); + EXPECT_TRUE(verifyHostUpstreamStats(0, 1)); + + // Complete request. Ensure original headers are present. + const std::string body("body"); + EXPECT_CALL(encoder2, encodeData(BufferStringEqual(body), true)); + Buffer::OwnedImpl buf(body); + router_.decodeData(buf, true); + + // Send successful response, verify success. 
+ Http::ResponseHeaderMapPtr response_headers( + new Http::TestResponseHeaderMapImpl({{":status", "200"}})); + EXPECT_CALL(callbacks_, encodeHeaders_(_, _)) + .WillOnce(Invoke([&](Http::ResponseHeaderMap& headers, bool) -> void { + EXPECT_EQ(headers.Status()->value(), "200"); })); + response_decoder->decodeHeaders(std::move(response_headers), true); + EXPECT_TRUE(verifyHostUpstreamStats(1, 1)); +} - Http::TestRequestHeaderMapImpl headers{{"x-envoy-retry-on", "5xx"}, {"x-envoy-internal", "true"}}; +// Test retrying a request, when the first attempt fails while the client +// is sending the body. +TEST_F(RouterTest, RetryRequestDuringBody) { + Buffer::OwnedImpl decoding_buffer; + EXPECT_CALL(callbacks_, decodingBuffer()).WillRepeatedly(Return(&decoding_buffer)); + EXPECT_CALL(callbacks_, addDecodedData(_, true)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) { decoding_buffer.move(data); })); + + NiceMock encoder1; + Http::ResponseDecoder* response_decoder = nullptr; + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke( + [&](Http::ResponseDecoder& decoder, + Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { + response_decoder = &decoder; + callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_, upstream_stream_info_); + return nullptr; + })); + expectResponseTimerCreate(); + + Http::TestRequestHeaderMapImpl headers{ + {"x-envoy-retry-on", "5xx"}, {"x-envoy-internal", "true"}, {"myheader", "present"}}; HttpTestUtility::addDefaultHeaders(headers); router_.decodeHeaders(headers, false); + const std::string body1("body1"); + Buffer::OwnedImpl buf1(body1); + EXPECT_CALL(*router_.retry_state_, enabled()).WillOnce(Return(true)); + router_.decodeData(buf1, false); + + router_.retry_state_->expectResetRetry(); + encoder1.stream_.resetStream(Http::StreamResetReason::RemoteReset); + + NiceMock encoder2; + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke( + [&](Http::ResponseDecoder& decoder, + 
Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { + response_decoder = &decoder; + callbacks.onPoolReady(encoder2, cm_.conn_pool_.host_, upstream_stream_info_); + return nullptr; + })); + + EXPECT_CALL(encoder2, encodeHeaders(HeaderHasValueRef("myheader", "present"), false)); + EXPECT_CALL(encoder2, encodeData(BufferStringEqual(body1), false)); + router_.retry_state_->callback_(); + EXPECT_EQ(2U, + callbacks_.route_->route_entry_.virtual_cluster_.stats().upstream_rq_total_.value()); + EXPECT_TRUE(verifyHostUpstreamStats(0, 1)); + + // Complete request. Ensure original headers are present. + const std::string body2("body2"); + EXPECT_CALL(encoder2, encodeData(BufferStringEqual(body2), true)); + Buffer::OwnedImpl buf2(body2); + EXPECT_CALL(*router_.retry_state_, enabled()).WillOnce(Return(true)); + router_.decodeData(buf2, true); + + // Send successful response, verify success. + Http::ResponseHeaderMapPtr response_headers( + new Http::TestResponseHeaderMapImpl({{":status", "200"}})); + EXPECT_CALL(callbacks_, encodeHeaders_(_, _)) + .WillOnce(Invoke([&](Http::ResponseHeaderMap& headers, bool) -> void { + EXPECT_EQ(headers.Status()->value(), "200"); + })); + response_decoder->decodeHeaders(std::move(response_headers), true); + EXPECT_TRUE(verifyHostUpstreamStats(1, 1)); +} + +// Test retrying a request, when the first attempt fails while the client +// is sending the body, with more data arriving in between upstream attempts +// (which would normally happen during the backoff timer interval), but not end_stream. 
+TEST_F(RouterTest, RetryRequestDuringBodyDataBetweenAttemptsNotEndStream) { + Buffer::OwnedImpl decoding_buffer; + EXPECT_CALL(callbacks_, decodingBuffer()).WillRepeatedly(Return(&decoding_buffer)); + EXPECT_CALL(callbacks_, addDecodedData(_, true)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) { decoding_buffer.move(data); })); + + NiceMock encoder1; + Http::ResponseDecoder* response_decoder = nullptr; + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke( + [&](Http::ResponseDecoder& decoder, + Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { + response_decoder = &decoder; + callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_, upstream_stream_info_); + return nullptr; + })); + expectResponseTimerCreate(); + + Http::TestRequestHeaderMapImpl headers{ + {"x-envoy-retry-on", "5xx"}, {"x-envoy-internal", "true"}, {"myheader", "present"}}; + HttpTestUtility::addDefaultHeaders(headers); + router_.decodeHeaders(headers, false); + const std::string body1("body1"); + Buffer::OwnedImpl buf1(body1); + EXPECT_CALL(*router_.retry_state_, enabled()).Times(3).WillRepeatedly(Return(true)); + router_.decodeData(buf1, false); + + router_.retry_state_->expectResetRetry(); + encoder1.stream_.resetStream(Http::StreamResetReason::RemoteReset); + + const std::string body2("body2"); + Buffer::OwnedImpl buf2(body2); + router_.decodeData(buf2, false); + + NiceMock encoder2; + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke( + [&](Http::ResponseDecoder& decoder, + Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { + response_decoder = &decoder; + callbacks.onPoolReady(encoder2, cm_.conn_pool_.host_, upstream_stream_info_); + return nullptr; + })); + + EXPECT_CALL(encoder2, encodeHeaders(HeaderHasValueRef("myheader", "present"), false)); + EXPECT_CALL(encoder2, encodeData(BufferStringEqual(body1 + body2), false)); + router_.retry_state_->callback_(); + EXPECT_EQ(2U, + 
callbacks_.route_->route_entry_.virtual_cluster_.stats().upstream_rq_total_.value()); + EXPECT_TRUE(verifyHostUpstreamStats(0, 1)); + + // Complete request. Ensure original headers are present. + const std::string body3("body3"); + EXPECT_CALL(encoder2, encodeData(BufferStringEqual(body3), true)); + Buffer::OwnedImpl buf3(body3); + router_.decodeData(buf3, true); + + // Send successful response, verify success. + Http::ResponseHeaderMapPtr response_headers( + new Http::TestResponseHeaderMapImpl({{":status", "200"}})); + EXPECT_CALL(callbacks_, encodeHeaders_(_, _)) + .WillOnce(Invoke([&](Http::ResponseHeaderMap& headers, bool) -> void { + EXPECT_EQ(headers.Status()->value(), "200"); + })); + response_decoder->decodeHeaders(std::move(response_headers), true); + EXPECT_TRUE(verifyHostUpstreamStats(1, 1)); +} + +// Test retrying a request, when the first attempt fails while the client +// is sending the body, with the rest of the request arriving in between upstream +// request attempts. +TEST_F(RouterTest, RetryRequestDuringBodyCompleteBetweenAttempts) { + Buffer::OwnedImpl decoding_buffer; + EXPECT_CALL(callbacks_, decodingBuffer()).WillRepeatedly(Return(&decoding_buffer)); + EXPECT_CALL(callbacks_, addDecodedData(_, true)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) { decoding_buffer.move(data); })); + + NiceMock encoder1; + Http::ResponseDecoder* response_decoder = nullptr; + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke( + [&](Http::ResponseDecoder& decoder, + Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { + response_decoder = &decoder; + callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_, upstream_stream_info_); + return nullptr; + })); + + Http::TestRequestHeaderMapImpl headers{ + {"x-envoy-retry-on", "5xx"}, {"x-envoy-internal", "true"}, {"myheader", "present"}}; + HttpTestUtility::addDefaultHeaders(headers); + router_.decodeHeaders(headers, false); + const std::string body1("body1"); + 
Buffer::OwnedImpl buf1(body1); + EXPECT_CALL(*router_.retry_state_, enabled()).Times(2).WillRepeatedly(Return(true)); + router_.decodeData(buf1, false); + + router_.retry_state_->expectResetRetry(); + encoder1.stream_.resetStream(Http::StreamResetReason::RemoteReset); + + // Complete request while there is no upstream request. + const std::string body2("body2"); + Buffer::OwnedImpl buf2(body2); + router_.decodeData(buf2, true); + + NiceMock encoder2; + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke( + [&](Http::ResponseDecoder& decoder, + Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { + response_decoder = &decoder; + callbacks.onPoolReady(encoder2, cm_.conn_pool_.host_, upstream_stream_info_); + return nullptr; + })); + + EXPECT_CALL(encoder2, encodeHeaders(HeaderHasValueRef("myheader", "present"), false)); + EXPECT_CALL(encoder2, encodeData(BufferStringEqual(body1 + body2), true)); + router_.retry_state_->callback_(); + EXPECT_EQ(2U, + callbacks_.route_->route_entry_.virtual_cluster_.stats().upstream_rq_total_.value()); + EXPECT_TRUE(verifyHostUpstreamStats(0, 1)); + + // Send successful response, verify success. + Http::ResponseHeaderMapPtr response_headers( + new Http::TestResponseHeaderMapImpl({{":status", "200"}})); + EXPECT_CALL(callbacks_, encodeHeaders_(_, _)) + .WillOnce(Invoke([&](Http::ResponseHeaderMap& headers, bool) -> void { + EXPECT_EQ(headers.Status()->value(), "200"); + })); + response_decoder->decodeHeaders(std::move(response_headers), true); + EXPECT_TRUE(verifyHostUpstreamStats(1, 1)); +} + +// Test retrying a request, when the first attempt fails while the client +// is sending the body, with the trailers arriving in between upstream +// request attempts. 
+TEST_F(RouterTest, RetryRequestDuringBodyTrailerBetweenAttempts) { + Buffer::OwnedImpl decoding_buffer; + EXPECT_CALL(callbacks_, decodingBuffer()).WillRepeatedly(Return(&decoding_buffer)); + EXPECT_CALL(callbacks_, addDecodedData(_, true)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) { decoding_buffer.move(data); })); + + NiceMock encoder1; + Http::ResponseDecoder* response_decoder = nullptr; + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke( + [&](Http::ResponseDecoder& decoder, + Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { + response_decoder = &decoder; + callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_, upstream_stream_info_); + return nullptr; + })); + + Http::TestRequestHeaderMapImpl headers{ + {"x-envoy-retry-on", "5xx"}, {"x-envoy-internal", "true"}, {"myheader", "present"}}; + HttpTestUtility::addDefaultHeaders(headers); + router_.decodeHeaders(headers, false); + const std::string body1("body1"); + Buffer::OwnedImpl buf1(body1); + EXPECT_CALL(*router_.retry_state_, enabled()).WillOnce(Return(true)); + router_.decodeData(buf1, false); + + router_.retry_state_->expectResetRetry(); + encoder1.stream_.resetStream(Http::StreamResetReason::RemoteReset); + + // Complete request while there is no upstream request. 
+ Http::TestRequestTrailerMapImpl trailers{{"some", "trailer"}}; + router_.decodeTrailers(trailers); + + NiceMock encoder2; + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke( + [&](Http::ResponseDecoder& decoder, + Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { + response_decoder = &decoder; + callbacks.onPoolReady(encoder2, cm_.conn_pool_.host_, upstream_stream_info_); + return nullptr; + })); + + EXPECT_CALL(encoder2, encodeHeaders(HeaderHasValueRef("myheader", "present"), false)); + EXPECT_CALL(encoder2, encodeData(BufferStringEqual(body1), false)); + EXPECT_CALL(encoder2, encodeTrailers(HeaderMapEqualRef(&trailers))); + router_.retry_state_->callback_(); + EXPECT_EQ(2U, + callbacks_.route_->route_entry_.virtual_cluster_.stats().upstream_rq_total_.value()); + EXPECT_TRUE(verifyHostUpstreamStats(0, 1)); + + // Send successful response, verify success. + Http::ResponseHeaderMapPtr response_headers( + new Http::TestResponseHeaderMapImpl({{":status", "200"}})); + EXPECT_CALL(callbacks_, encodeHeaders_(_, _)) + .WillOnce(Invoke([&](Http::ResponseHeaderMap& headers, bool) -> void { + EXPECT_EQ(headers.Status()->value(), "200"); + })); + response_decoder->decodeHeaders(std::move(response_headers), true); + EXPECT_TRUE(verifyHostUpstreamStats(1, 1)); +} + +// Test retrying a request, when the first attempt fails while the client +// is sending the body, with the rest of the request arriving in between upstream +// request attempts, but exceeding the buffer limit causing a downstream request abort. 
+TEST_F(RouterTest, RetryRequestDuringBodyBufferLimitExceeded) { + Buffer::OwnedImpl decoding_buffer; + EXPECT_CALL(callbacks_, decodingBuffer()).WillRepeatedly(Return(&decoding_buffer)); + EXPECT_CALL(callbacks_, addDecodedData(_, true)) + .WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) { decoding_buffer.move(data); })); + EXPECT_CALL(callbacks_.route_->route_entry_, retryShadowBufferLimit()).WillOnce(Return(10)); + + NiceMock encoder1; + Http::ResponseDecoder* response_decoder = nullptr; + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke( + [&](Http::ResponseDecoder& decoder, + Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { + response_decoder = &decoder; + callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_, upstream_stream_info_); + return nullptr; + })); + + Http::TestRequestHeaderMapImpl headers{ + {"x-envoy-retry-on", "5xx"}, {"x-envoy-internal", "true"}, {"myheader", "present"}}; + HttpTestUtility::addDefaultHeaders(headers); + router_.decodeHeaders(headers, false); + const std::string body1("body1"); + Buffer::OwnedImpl buf1(body1); + EXPECT_CALL(*router_.retry_state_, enabled()).Times(2).WillRepeatedly(Return(true)); + router_.decodeData(buf1, false); router_.retry_state_->expectResetRetry(); - EXPECT_CALL(cm_.conn_pool_.host_->outlier_detector_, - putResult(Upstream::Outlier::Result::LocalOriginConnectFailed, _)); encoder1.stream_.resetStream(Http::StreamResetReason::RemoteReset); + + // Complete request while there is no upstream request. 
+ const std::string body2(50, 'a'); + Buffer::OwnedImpl buf2(body2); + router_.decodeData(buf2, false); + + EXPECT_EQ(callbacks_.details_, "request_payload_exceeded_retry_buffer_limit"); + EXPECT_EQ(1U, cm_.thread_local_cluster_.cluster_.info_->stats_store_ + .counter("retry_or_shadow_abandoned") + .value()); EXPECT_TRUE(verifyHostUpstreamStats(0, 1)); - EXPECT_EQ(1UL, stats_store_.counter("test.rq_retry_skipped_request_not_complete").value()); } // Two requests are sent (slow request + hedged retry) and then global timeout diff --git a/test/integration/protocol_integration_test.cc b/test/integration/protocol_integration_test.cc index 361c94d2f14a0..e1bc4a9ef7270 100644 --- a/test/integration/protocol_integration_test.cc +++ b/test/integration/protocol_integration_test.cc @@ -299,6 +299,117 @@ TEST_P(ProtocolIntegrationTest, Retry) { EXPECT_EQ(512U, response->body().size()); } +TEST_P(ProtocolIntegrationTest, RetryStreaming) { + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + auto encoder_decoder = + codec_client_->startRequest(Http::TestRequestHeaderMapImpl{{":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"x-forwarded-for", "10.0.0.1"}, + {"x-envoy-retry-on", "5xx"}}); + auto& encoder = encoder_decoder.first; + auto& response = encoder_decoder.second; + + // Send some data, but not the entire body. + std::string data(1024, 'a'); + Buffer::OwnedImpl send1(data); + encoder.encodeData(send1, false); + + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + + // Send back an upstream failure. 
+ upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, false); + + if (fake_upstreams_[0]->httpType() == FakeHttpConnection::Type::HTTP1) { + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + } else { + ASSERT_TRUE(upstream_request_->waitForReset()); + } + + // Wait for a retry. Ensure all data, both before and after the retry, is received. + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + + // Finish the request. + std::string data2(512, 'b'); + Buffer::OwnedImpl send2(data2); + encoder.encodeData(send2, true); + std::string combined_request_data = data + data2; + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, combined_request_data)); + + upstream_request_->encodeHeaders(default_response_headers_, false); + upstream_request_->encodeData(512, true); + + response->waitForEndStream(); + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_EQ(combined_request_data.size(), upstream_request_->bodyLength()); + + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + EXPECT_EQ(512U, response->body().size()); +} + +TEST_P(ProtocolIntegrationTest, RetryStreamingCancelDueToBufferOverflow) { + config_helper_.addConfigModifier( + [](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) { + auto* route = hcm.mutable_route_config()->mutable_virtual_hosts(0)->mutable_routes(0); + + route->mutable_per_request_buffer_limit_bytes()->set_value(1024); + route->mutable_route() + ->mutable_retry_policy() + ->mutable_retry_back_off() + ->mutable_base_interval() + ->MergeFrom( + ProtobufUtil::TimeUtil::MillisecondsToDuration(100000000)); // Effectively infinity. 
+ }); + initialize(); + + codec_client_ = makeHttpConnection(lookupPort("http")); + auto encoder_decoder = + codec_client_->startRequest(Http::TestRequestHeaderMapImpl{{":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"x-forwarded-for", "10.0.0.1"}, + {"x-envoy-retry-on", "5xx"}}); + auto& encoder = encoder_decoder.first; + auto& response = encoder_decoder.second; + + // Send some data, but less than the buffer limit, and not end-stream + std::string data(64, 'a'); + Buffer::OwnedImpl send1(data); + encoder.encodeData(send1, false); + + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + + // Send back an upstream failure. + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, false); + + if (fake_upstreams_[0]->httpType() == FakeHttpConnection::Type::HTTP1) { + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + } else { + ASSERT_TRUE(upstream_request_->waitForReset()); + } + + // Overflow the request buffer limit. Because the retry base interval is infinity, no + // request will be in progress. This will cause the request to be aborted and an error + // to be returned to the client. + std::string data2(2048, 'b'); + Buffer::OwnedImpl send2(data2); + encoder.encodeData(send2, false); + + response->waitForEndStream(); + + EXPECT_TRUE(response->complete()); + EXPECT_EQ("507", response->headers().Status()->value().getStringView()); + test_server_->waitForCounterEq("cluster.cluster_0.retry_or_shadow_abandoned", 1); +} + // Tests that the x-envoy-attempt-count header is properly set on the upstream request and the // downstream response, and updated after the request is retried. TEST_P(DownstreamProtocolIntegrationTest, RetryAttemptCountHeader) {