diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index cdfb92c8965ce..af81847471514 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -292,7 +292,12 @@ class AsyncStreamImpl : public AsyncClient::Stream, const Tracing::Config& tracingConfig() override { return tracing_config_; } void continueDecoding() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } HeaderMap& addDecodedTrailers() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } - void addDecodedData(Buffer::Instance&, bool) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } + void addDecodedData(Buffer::Instance&, bool) override { + // This should only be called if the user has set up buffering. The request is already fully + // buffered. Note that this is only called via the async client's internal use of the router + // filter which uses this function for buffering. + ASSERT(buffered_body_ != nullptr); + } const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); } void sendLocalReply(Code code, absl::string_view body, std::function modify_headers, @@ -360,6 +365,10 @@ class AsyncRequestImpl final : public AsyncClient::Request, void onReset() override; // Http::StreamDecoderFilterCallbacks + void addDecodedData(Buffer::Instance&, bool) override { + // The request is already fully buffered. Note that this is only called via the async client's + // internal use of the router filter which uses this function for buffering. + } const Buffer::Instance* decodingBuffer() override { return request_->body().get(); } MessagePtr request_; diff --git a/source/common/router/router.cc b/source/common/router/router.cc index 4169ef963a568..6a2de2d77a30a 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -424,11 +424,18 @@ Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_strea do_shadowing_ = false; } - // If we are going to buffer for retries or shadowing, we need to make a copy before encoding - // since it's all moves from here on. if (buffering) { + // If we are going to buffer for retries or shadowing, we need to make a copy before encoding + // since it's all moves from here on. Buffer::OwnedImpl copy(data); upstream_request_->encodeData(copy, end_stream); + + // If we are potentially going to retry or shadow this request we need to buffer. + // This will not cause the connection manager to 413 because before we hit the + // buffer limit we give up on retries and buffering. We must buffer using addDecodedData() + // so that all buffered data is available by the time we do request complete processing and + // potentially shadow. + callbacks_->addDecodedData(data, true); } else { upstream_request_->encodeData(data, end_stream); } @@ -437,11 +444,7 @@ Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_strea onRequestComplete(); } - // If we are potentially going to retry or shadow this request we need to buffer. - // This will not cause the connection manager to 413 because before we hit the - // buffer limit we give up on retries and buffering. - return buffering ? Http::FilterDataStatus::StopIterationAndBuffer - : Http::FilterDataStatus::StopIterationNoBuffer; + return Http::FilterDataStatus::StopIterationNoBuffer; } Http::FilterTrailersStatus Filter::decodeTrailers(Http::HeaderMap& trailers) { diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index b53cb56010040..c237bebacde07 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -1700,7 +1700,8 @@ TEST_F(RouterTest, RetryUpstream5xxNotComplete) { Buffer::InstancePtr body_data(new Buffer::OwnedImpl("hello")); EXPECT_CALL(*router_.retry_state_, enabled()).WillOnce(Return(true)); - EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, router_.decodeData(*body_data, false)); + EXPECT_CALL(callbacks_, addDecodedData(_, true)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, router_.decodeData(*body_data, false)); Http::TestHeaderMapImpl trailers{{"some", "trailer"}}; router_.decodeTrailers(trailers); @@ -1826,7 +1827,8 @@ TEST_F(RouterTest, RetryRespsectsMaxHostSelectionCount) { Buffer::InstancePtr body_data(new Buffer::OwnedImpl("hello")); EXPECT_CALL(*router_.retry_state_, enabled()).WillOnce(Return(true)); - EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, router_.decodeData(*body_data, false)); + EXPECT_CALL(callbacks_, addDecodedData(_, true)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, router_.decodeData(*body_data, false)); Http::TestHeaderMapImpl trailers{{"some", "trailer"}}; router_.decodeTrailers(trailers); @@ -1892,7 +1894,8 @@ TEST_F(RouterTest, RetryRespectsRetryHostPredicate) { Buffer::InstancePtr body_data(new Buffer::OwnedImpl("hello")); EXPECT_CALL(*router_.retry_state_, enabled()).WillOnce(Return(true)); - EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, router_.decodeData(*body_data, false)); + EXPECT_CALL(callbacks_, addDecodedData(_, true)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, router_.decodeData(*body_data, false)); Http::TestHeaderMapImpl trailers{{"some", "trailer"}}; router_.decodeTrailers(trailers); @@ -2086,7 +2089,8 @@ TEST_F(RouterTest, Shadow) { router_.decodeHeaders(headers, false); Buffer::InstancePtr body_data(new Buffer::OwnedImpl("hello")); - EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, router_.decodeData(*body_data, false)); + EXPECT_CALL(callbacks_, addDecodedData(_, true)); + EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, router_.decodeData(*body_data, false)); Http::TestHeaderMapImpl trailers{{"some", "trailer"}}; EXPECT_CALL(callbacks_, decodingBuffer()) diff --git a/test/integration/http2_integration_test.cc b/test/integration/http2_integration_test.cc index d8475e70f74b8..2cbb1f4688927 100644 --- a/test/integration/http2_integration_test.cc +++ b/test/integration/http2_integration_test.cc @@ -337,6 +337,54 @@ TEST_P(Http2IntegrationTest, IdleTimeoutWithSimultaneousRequests) { test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_idle_timeout", 2); } +// Test request mirroring / shadowing with an HTTP/2 downstream and a request with a body. +TEST_P(Http2IntegrationTest, RequestMirrorWithBody) { + config_helper_.addConfigModifier( + [&](envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager& hcm) + -> void { + hcm.mutable_route_config() + ->mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_route() + ->mutable_request_mirror_policy() + ->set_cluster("cluster_0"); + }); + + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Send request with body. + IntegrationStreamDecoderPtr request = + codec_client_->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}}, + "hello"); + + // Wait for the first request as well as the shadow. + waitForNextUpstreamRequest(); + + FakeHttpConnectionPtr fake_upstream_connection2; + FakeStreamPtr upstream_request2; + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection2)); + ASSERT_TRUE(fake_upstream_connection2->waitForNewStream(*dispatcher_, upstream_request2)); + ASSERT_TRUE(upstream_request2->waitForEndStream(*dispatcher_)); + + // Make sure both requests have a body. Also check the shadow for the shadow headers. + EXPECT_EQ("hello", upstream_request_->body().toString()); + EXPECT_EQ("hello", upstream_request2->body().toString()); + EXPECT_EQ("host-shadow", upstream_request2->headers().Host()->value().getStringView()); + + upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, true); + upstream_request2->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, true); + request->waitForEndStream(); + EXPECT_EQ("200", request->headers().Status()->value().getStringView()); + + // Cleanup. + ASSERT_TRUE(fake_upstream_connection2->close()); + ASSERT_TRUE(fake_upstream_connection2->waitForDisconnect()); +} + // Interleave two requests and responses and make sure the HTTP2 stack handles this correctly. void Http2IntegrationTest::simultaneousRequest(int32_t request1_bytes, int32_t request2_bytes) { FakeHttpConnectionPtr fake_upstream_connection1;