Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,12 @@ class AsyncStreamImpl : public AsyncClient::Stream,
const Tracing::Config& tracingConfig() override { return tracing_config_; }
void continueDecoding() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
HeaderMap& addDecodedTrailers() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
void addDecodedData(Buffer::Instance&, bool) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
void addDecodedData(Buffer::Instance&, bool) override {
// This should only be called if the user has set up buffering. The request is already fully
// buffered. Note that this is only called via the async client's internal use of the router
// filter which uses this function for buffering.
ASSERT(buffered_body_ != nullptr);
}
const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); }
void sendLocalReply(Code code, absl::string_view body,
std::function<void(HeaderMap& headers)> modify_headers,
Expand Down Expand Up @@ -360,6 +365,10 @@ class AsyncRequestImpl final : public AsyncClient::Request,
void onReset() override;

// Http::StreamDecoderFilterCallbacks
void addDecodedData(Buffer::Instance&, bool) override {
// The request is already fully buffered. Note that this is only called via the async client's
// internal use of the router filter which uses this function for buffering.
}
const Buffer::Instance* decodingBuffer() override { return request_->body().get(); }

MessagePtr request_;
Expand Down
17 changes: 10 additions & 7 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,18 @@ Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_strea
do_shadowing_ = false;
}

// If we are going to buffer for retries or shadowing, we need to make a copy before encoding
// since it's all moves from here on.
if (buffering) {
// If we are going to buffer for retries or shadowing, we need to make a copy before encoding
// since it's all moves from here on.
Buffer::OwnedImpl copy(data);
upstream_request_->encodeData(copy, end_stream);

// If we are potentially going to retry or shadow this request we need to buffer.
// This will not cause the connection manager to 413 because before we hit the
// buffer limit we give up on retries and buffering. We must buffer using addDecodedData()
// so that all buffered data is available by the time we do request complete processing and
// potentially shadow.
callbacks_->addDecodedData(data, true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the second time in as many weeks I've seen us have to do this workaround.

Not required for this PR but any thoughts for a change we could make to avoid this gotcha?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this has come up a few times in slightly different contexts. I will think about this. I don't think there are any easy fixes here given the different permutations of things we need to support. The only thing I can think of quickly is to somehow indicate that a filter is of a particular type, and then the HCM would "pre-buffer." With that said, even that wouldn't work for the router filter because we buffer sometimes and not others...

} else {
upstream_request_->encodeData(data, end_stream);
}
Expand All @@ -437,11 +444,7 @@ Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_strea
onRequestComplete();
}

// If we are potentially going to retry or shadow this request we need to buffer.
// This will not cause the connection manager to 413 because before we hit the
// buffer limit we give up on retries and buffering.
return buffering ? Http::FilterDataStatus::StopIterationAndBuffer
: Http::FilterDataStatus::StopIterationNoBuffer;
return Http::FilterDataStatus::StopIterationNoBuffer;
}

Http::FilterTrailersStatus Filter::decodeTrailers(Http::HeaderMap& trailers) {
Expand Down
12 changes: 8 additions & 4 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,8 @@ TEST_F(RouterTest, RetryUpstream5xxNotComplete) {

Buffer::InstancePtr body_data(new Buffer::OwnedImpl("hello"));
EXPECT_CALL(*router_.retry_state_, enabled()).WillOnce(Return(true));
EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, router_.decodeData(*body_data, false));
EXPECT_CALL(callbacks_, addDecodedData(_, true));
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, router_.decodeData(*body_data, false));

Http::TestHeaderMapImpl trailers{{"some", "trailer"}};
router_.decodeTrailers(trailers);
Expand Down Expand Up @@ -1826,7 +1827,8 @@ TEST_F(RouterTest, RetryRespsectsMaxHostSelectionCount) {

Buffer::InstancePtr body_data(new Buffer::OwnedImpl("hello"));
EXPECT_CALL(*router_.retry_state_, enabled()).WillOnce(Return(true));
EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, router_.decodeData(*body_data, false));
EXPECT_CALL(callbacks_, addDecodedData(_, true));
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, router_.decodeData(*body_data, false));

Http::TestHeaderMapImpl trailers{{"some", "trailer"}};
router_.decodeTrailers(trailers);
Expand Down Expand Up @@ -1892,7 +1894,8 @@ TEST_F(RouterTest, RetryRespectsRetryHostPredicate) {

Buffer::InstancePtr body_data(new Buffer::OwnedImpl("hello"));
EXPECT_CALL(*router_.retry_state_, enabled()).WillOnce(Return(true));
EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, router_.decodeData(*body_data, false));
EXPECT_CALL(callbacks_, addDecodedData(_, true));
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, router_.decodeData(*body_data, false));

Http::TestHeaderMapImpl trailers{{"some", "trailer"}};
router_.decodeTrailers(trailers);
Expand Down Expand Up @@ -2086,7 +2089,8 @@ TEST_F(RouterTest, Shadow) {
router_.decodeHeaders(headers, false);

Buffer::InstancePtr body_data(new Buffer::OwnedImpl("hello"));
EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, router_.decodeData(*body_data, false));
EXPECT_CALL(callbacks_, addDecodedData(_, true));
EXPECT_EQ(Http::FilterDataStatus::StopIterationNoBuffer, router_.decodeData(*body_data, false));

Http::TestHeaderMapImpl trailers{{"some", "trailer"}};
EXPECT_CALL(callbacks_, decodingBuffer())
Expand Down
48 changes: 48 additions & 0 deletions test/integration/http2_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,54 @@ TEST_P(Http2IntegrationTest, IdleTimeoutWithSimultaneousRequests) {
test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_idle_timeout", 2);
}

// Test request mirroring / shadowing with an HTTP/2 downstream and a request with a body.
TEST_P(Http2IntegrationTest, RequestMirrorWithBody) {
config_helper_.addConfigModifier(
[&](envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager& hcm)
-> void {
hcm.mutable_route_config()
->mutable_virtual_hosts(0)
->mutable_routes(0)
->mutable_route()
->mutable_request_mirror_policy()
->set_cluster("cluster_0");
});

initialize();
codec_client_ = makeHttpConnection(lookupPort("http"));

// Send request with body.
IntegrationStreamDecoderPtr request =
codec_client_->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "POST"},
{":path", "/test/long/url"},
{":scheme", "http"},
{":authority", "host"}},
"hello");

// Wait for the first request as well as the shadow.
waitForNextUpstreamRequest();

FakeHttpConnectionPtr fake_upstream_connection2;
FakeStreamPtr upstream_request2;
ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection2));
ASSERT_TRUE(fake_upstream_connection2->waitForNewStream(*dispatcher_, upstream_request2));
ASSERT_TRUE(upstream_request2->waitForEndStream(*dispatcher_));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boo, one of our utilities should work for this and they totally don't.
Goes on my TODO list :-)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sorry I thought the same thing and was going to make the utility functions work for this, but I got lazy. I will circle back if you don't get to it first.


// Make sure both requests have a body. Also check the shadow for the shadow headers.
EXPECT_EQ("hello", upstream_request_->body().toString());
EXPECT_EQ("hello", upstream_request2->body().toString());
EXPECT_EQ("host-shadow", upstream_request2->headers().Host()->value().getStringView());

upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, true);
upstream_request2->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, true);
request->waitForEndStream();
EXPECT_EQ("200", request->headers().Status()->value().getStringView());

// Cleanup.
ASSERT_TRUE(fake_upstream_connection2->close());
ASSERT_TRUE(fake_upstream_connection2->waitForDisconnect());
}

// Interleave two requests and responses and make sure the HTTP2 stack handles this correctly.
void Http2IntegrationTest::simultaneousRequest(int32_t request1_bytes, int32_t request2_bytes) {
FakeHttpConnectionPtr fake_upstream_connection1;
Expand Down