From 42326d23f08d212c5e0b136e4ef0a88b58752c19 Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Mon, 29 Jul 2019 14:14:24 -0700 Subject: [PATCH 01/16] http: add onClosure to AsyncClient::StreamCallbacks Signed-off-by: Mike Schore --- include/envoy/http/async_client.h | 7 +++++++ source/common/http/async_client_impl.cc | 2 ++ test/mocks/http/mocks.h | 1 + 3 files changed, 10 insertions(+) diff --git a/include/envoy/http/async_client.h b/include/envoy/http/async_client.h index d3bf280198c71..6e7f6784a4f88 100644 --- a/include/envoy/http/async_client.h +++ b/include/envoy/http/async_client.h @@ -75,6 +75,13 @@ class AsyncClient { */ virtual void onTrailers(HeaderMapPtr&& trailers) PURE; + /** + * Called when both the local and remote have gracefully closed the stream. + * Useful for asymmetric cases where end_stream may not be bidirectionally observable. + * Note this is NOT called on stream reset. + */ + virtual void onClosure() PURE; + /** * Called when the async HTTP stream is reset. */ diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index a54b9eea5a568..c5d5e5d66d228 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -146,6 +146,7 @@ void AsyncStreamImpl::closeLocal(bool end_stream) { local_closed_ |= end_stream; if (complete()) { + stream_callbacks_.onClosure(); cleanup(); } } @@ -153,6 +154,7 @@ void AsyncStreamImpl::closeLocal(bool end_stream) { void AsyncStreamImpl::closeRemote(bool end_stream) { remote_closed_ |= end_stream; if (complete()) { + stream_callbacks_.onClosure(); cleanup(); } } diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 13f5d6717a031..8ccfe0a1a6043 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -347,6 +347,7 @@ class MockAsyncClientStreamCallbacks : public AsyncClient::StreamCallbacks { MOCK_METHOD2(onHeaders_, void(HeaderMap& headers, bool end_stream)); MOCK_METHOD2(onData, void(Buffer::Instance& data, bool end_stream)); MOCK_METHOD1(onTrailers_, void(HeaderMap& headers)); + MOCK_METHOD0(onClosure, void()); MOCK_METHOD0(onReset, void()); }; From d92830d87e223278b3d7232fe3389c822fc5290d Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Mon, 29 Jul 2019 14:26:25 -0700 Subject: [PATCH 02/16] update tests Signed-off-by: Mike Schore --- test/common/http/async_client_impl_test.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index 9a58c3f4f83d2..276dd83b86c17 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -100,6 +100,7 @@ TEST_F(AsyncClientImplTest, BasicStream) { expectResponseHeaders(stream_callbacks_, 200, false); EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); + EXPECT_CALL(stream_callbacks_, onClosure()); AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); stream->sendHeaders(headers, false); @@ -265,6 +266,7 @@ TEST_F(AsyncClientImplTest, MultipleStreams) { expectResponseHeaders(stream_callbacks_, 200, false); EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); + EXPECT_CALL(stream_callbacks_, onClosure()); AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); stream->sendHeaders(headers, false); @@ -388,6 +390,7 @@ TEST_F(AsyncClientImplTest, StreamAndRequest) { expectResponseHeaders(stream_callbacks_, 200, false); EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); + EXPECT_CALL(stream_callbacks_, onClosure()); AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); stream->sendHeaders(headers, false); @@ -427,6 +430,7 @@ TEST_F(AsyncClientImplTest, StreamWithTrailers) { EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), false)); TestHeaderMapImpl expected_trailers{{"some", "trailer"}}; EXPECT_CALL(stream_callbacks_, onTrailers_(HeaderMapEqualRef(&expected_trailers))); + EXPECT_CALL(stream_callbacks_, onClosure_()); AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); stream->sendHeaders(headers, false); @@ -720,6 +724,7 @@ TEST_F(AsyncClientImplTest, StreamTimeout) { {":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}}; EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), false)); EXPECT_CALL(stream_callbacks_, onData(_, true)); + EXPECT_CALL(stream_callbacks_, onClosure()); AsyncClient::Stream* stream = client_.start( stream_callbacks_, AsyncClient::StreamOptions().setTimeout(std::chrono::milliseconds(40))); @@ -860,6 +865,7 @@ TEST_F(AsyncClientImplTest, MultipleDataStream) { EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body2.get()), true)); EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body2.get()), true)); + EXPECT_CALL(stream_callbacks_, onClosure()); stream->sendData(*body2, true); response_decoder_->decodeData(*body2, true); From c5d44bdc8b51d66015ac6a40cc07a8f08bbef6c8 Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Mon, 29 Jul 2019 14:41:01 -0700 Subject: [PATCH 03/16] fix typo Signed-off-by: Mike Schore --- test/common/http/async_client_impl_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index 276dd83b86c17..d5761cedb72a4 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -430,7 +430,7 @@ TEST_F(AsyncClientImplTest, StreamWithTrailers) { EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), false)); TestHeaderMapImpl expected_trailers{{"some", "trailer"}}; EXPECT_CALL(stream_callbacks_, onTrailers_(HeaderMapEqualRef(&expected_trailers))); - EXPECT_CALL(stream_callbacks_, onClosure_()); + EXPECT_CALL(stream_callbacks_, onClosure()); AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); stream->sendHeaders(headers, false); From 69013efd62d5e25086b4ed5841df8086397daa4e Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Mon, 29 Jul 2019 14:48:56 -0700 Subject: [PATCH 04/16] update AsyncRequest Signed-off-by: Mike Schore --- source/common/http/async_client_impl.cc | 11 +++-------- source/common/http/async_client_impl.h | 1 + 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index c5d5e5d66d228..2013e7ec8d79d 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -196,10 +196,6 @@ void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)) void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) { response_ = std::make_unique(std::move(headers)); - - if (end_stream) { - onComplete(); - } } void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) { @@ -207,14 +203,13 @@ void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) { response_->body() = std::make_unique(); } response_->body()->move(data); - - if (end_stream) { - onComplete(); - } } void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) { response_->trailers(std::move(trailers)); +} + +void AsyncRequestImpl::onClosure() { onComplete(); } diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index b5374c455f1aa..cadf57b726016 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -389,6 +389,7 @@ class AsyncRequestImpl final : public AsyncClient::Request, void onHeaders(HeaderMapPtr&& headers, bool end_stream) override; void onData(Buffer::Instance& data, bool end_stream) override; void onTrailers(HeaderMapPtr&& trailers) override; + void onClosure() override; void onReset() override; // Http::StreamDecoderFilterCallbacks From 7d8a7b646cc08b2894bcb40b929a1dd54b161751 Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Mon, 29 Jul 2019 15:03:37 -0700 Subject: [PATCH 05/16] update Grpc::AsyncClient Signed-off-by: Mike Schore --- source/common/grpc/async_client_impl.cc | 4 ++++ source/common/grpc/async_client_impl.h | 1 + 2 files changed, 5 insertions(+) diff --git a/source/common/grpc/async_client_impl.cc b/source/common/grpc/async_client_impl.cc index 82837ecceb67b..48b1621716675 100644 --- a/source/common/grpc/async_client_impl.cc +++ b/source/common/grpc/async_client_impl.cc @@ -166,6 +166,10 @@ void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::str resetStream(); } +void AsyncStreamImpl::onClosure() { + // No-op since stream closure is handled within other callbacks. +} + void AsyncStreamImpl::onReset() { if (http_reset_) { return; diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h index 575c5a46301f5..398972435185b 100644 --- a/source/common/grpc/async_client_impl.h +++ b/source/common/grpc/async_client_impl.h @@ -55,6 +55,7 @@ class AsyncStreamImpl : public RawAsyncStream, void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override; void onData(Buffer::Instance& data, bool end_stream) override; void onTrailers(Http::HeaderMapPtr&& trailers) override; + void onClosure() override; void onReset() override; // Grpc::AsyncStream From 2c82fa134511065257dcafc48b93d168389e8564 Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Mon, 29 Jul 2019 15:13:26 -0700 Subject: [PATCH 06/16] fix unused parameter Signed-off-by: Mike Schore --- source/common/http/async_client_impl.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 2013e7ec8d79d..08767b9b6d4fc 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -194,11 +194,11 @@ void AsyncRequestImpl::initialize() { void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)); } -void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) { +void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool) { response_ = std::make_unique(std::move(headers)); } -void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) { +void AsyncRequestImpl::onData(Buffer::Instance& data, bool) { if (!response_->body()) { response_->body() = std::make_unique(); } From 39132b95e8f5baf972d34cdd13db407e6fbb9ba3 Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Mon, 29 Jul 2019 15:32:34 -0700 Subject: [PATCH 07/16] fix format Signed-off-by: Mike Schore --- source/common/http/async_client_impl.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 08767b9b6d4fc..9f875088659ba 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -209,9 +209,7 @@ void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) { response_->trailers(std::move(trailers)); } -void AsyncRequestImpl::onClosure() { - onComplete(); -} +void AsyncRequestImpl::onClosure() { onComplete(); } void AsyncRequestImpl::onReset() { if (!cancelled_) { From 7a632bcff48f3a736f71e477d1795344e0317aec Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Mon, 29 Jul 2019 16:55:34 -0700 Subject: [PATCH 08/16] fix a couple tests Signed-off-by: Mike Schore --- test/common/http/async_client_impl_test.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index d5761cedb72a4..b3ed6e5da9c0f 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -244,6 +244,7 @@ TEST_F(AsyncClientImplTest, RetryWithStream) { // Normal response. expectResponseHeaders(stream_callbacks_, 200, true); + EXPECT_CALL(stream_callbacks_, onClosure()); HeaderMapPtr response_headers2(new TestHeaderMapImpl{{":status", "200"}}); response_decoder_->decodeHeaders(std::move(response_headers2), true); } @@ -291,6 +292,7 @@ TEST_F(AsyncClientImplTest, MultipleStreams) { EXPECT_CALL(stream_encoder2, encodeData(BufferEqual(body2.get()), true)); expectResponseHeaders(stream_callbacks2, 503, true); + EXPECT_CALL(stream_callbacks_, onClosure()); AsyncClient::Stream* stream2 = client_.start(stream_callbacks2, AsyncClient::StreamOptions()); stream2->sendHeaders(headers2, false); From e6efa99c0da25f6a915796fad67dba1ad1a24f8a Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Mon, 29 Jul 2019 17:08:45 -0700 Subject: [PATCH 09/16] revert AsyncRequestImpl change Signed-off-by: Mike Schore --- source/common/http/async_client_impl.cc | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 9f875088659ba..80ffd1b11f882 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -194,22 +194,36 @@ void AsyncRequestImpl::initialize() { void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)); } -void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool) { +void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) { response_ = std::make_unique(std::move(headers)); + + if (end_stream) { + onComplete(); + } } -void AsyncRequestImpl::onData(Buffer::Instance& data, bool) { +void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) { if (!response_->body()) { response_->body() = std::make_unique(); } response_->body()->move(data); + + if (end_stream) { + onComplete(); + } } void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) { response_->trailers(std::move(trailers)); + onComplete(); } -void AsyncRequestImpl::onClosure() { onComplete(); } +void AsyncRequestImpl::onClosure() { + // TODO(goaway): This seems like an appealing place to call onComplete(), but current logic and + // usage requires AsyncRequestImpl to fire onComplete() in certain cases (e.g. pool failure) when + // the local side actually hasn't yet been closed. These cases should arguably signal with stream + // reset instead, enabling cleanup. +} void AsyncRequestImpl::onReset() { if (!cancelled_) { From 5962e37613f736e3ee585f181ccd3aac4c9b7fa8 Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Mon, 29 Jul 2019 17:16:17 -0700 Subject: [PATCH 10/16] fix another test Signed-off-by: Mike Schore --- test/common/http/async_client_impl_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index b3ed6e5da9c0f..1118035be6799 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -292,7 +292,7 @@ TEST_F(AsyncClientImplTest, MultipleStreams) { EXPECT_CALL(stream_encoder2, encodeData(BufferEqual(body2.get()), true)); expectResponseHeaders(stream_callbacks2, 503, true); - EXPECT_CALL(stream_callbacks_, onClosure()); + EXPECT_CALL(stream_callbacks2, onClosure()); AsyncClient::Stream* stream2 = client_.start(stream_callbacks2, AsyncClient::StreamOptions()); stream2->sendHeaders(headers2, false); @@ -760,6 +760,7 @@ TEST_F(AsyncClientImplTest, StreamTimeoutHeadReply) { TestHeaderMapImpl expected_timeout{ {":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}}; EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), true)); + EXPECT_CALL(stream_callbacks_, onClosure()); AsyncClient::Stream* stream = client_.start( stream_callbacks_, AsyncClient::StreamOptions().setTimeout(std::chrono::milliseconds(40))); From 4db615375ce35b089b51fe149321b06d5976a083 Mon Sep 17 00:00:00 2001 From: Jose Nino Date: Tue, 30 Jul 2019 13:45:23 -0700 Subject: [PATCH 11/16] first modification Signed-off-by: Jose Nino --- test/common/http/async_client_impl_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index 1118035be6799..be5ae2c88f290 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -550,6 +550,7 @@ TEST_F(AsyncClientImplTest, ResetInOnHeaders) { .WillOnce(Invoke([&stream](HeaderMap&, bool) { stream->reset(); })); EXPECT_CALL(stream_callbacks_, onData(_, _)).Times(0); EXPECT_CALL(stream_callbacks_, onReset()); + EXPECT_CALL(stream_callbacks_, onClosure()); stream->sendHeaders(headers, false); stream->sendData(*body, false); From 8187ae4d8335d113455b96418bf2f024a623872f Mon Sep 17 00:00:00 2001 From: Jose Nino Date: Tue, 30 Jul 2019 15:05:30 -0700 Subject: [PATCH 12/16] update Signed-off-by: Jose Nino --- include/envoy/http/async_client.h | 2 +- source/common/grpc/async_client_impl.cc | 4 ++-- source/common/grpc/async_client_impl.h | 2 +- source/common/http/async_client_impl.cc | 26 +++++----------------- source/common/http/async_client_impl.h | 5 ++--- test/common/http/async_client_impl_test.cc | 20 ++++++++--------- test/mocks/http/mocks.h | 2 +- 7 files changed, 23 insertions(+), 38 deletions(-) diff --git a/include/envoy/http/async_client.h b/include/envoy/http/async_client.h index 6e7f6784a4f88..c80eb38db6d41 100644 --- a/include/envoy/http/async_client.h +++ b/include/envoy/http/async_client.h @@ -80,7 +80,7 @@ class AsyncClient { * Useful for asymmetric cases where end_stream may not be bidirectionally observable. * Note this is NOT called on stream reset. */ - virtual void onClosure() PURE; + virtual void onComplete() PURE; /** * Called when the async HTTP stream is reset. diff --git a/source/common/grpc/async_client_impl.cc b/source/common/grpc/async_client_impl.cc index 48b1621716675..1f17985473786 100644 --- a/source/common/grpc/async_client_impl.cc +++ b/source/common/grpc/async_client_impl.cc @@ -166,8 +166,8 @@ void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::str resetStream(); } -void AsyncStreamImpl::onClosure() { - // No-op since stream closure is handled within other callbacks. +void AsyncStreamImpl::onComplete() { + // No-op since stream completion is handled within other callbacks. } void AsyncStreamImpl::onReset() { diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h index 398972435185b..c360adce12232 100644 --- a/source/common/grpc/async_client_impl.h +++ b/source/common/grpc/async_client_impl.h @@ -55,7 +55,7 @@ class AsyncStreamImpl : public RawAsyncStream, void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override; void onData(Buffer::Instance& data, bool end_stream) override; void onTrailers(Http::HeaderMapPtr&& trailers) override; - void onClosure() override; + void onComplete() override; void onReset() override; // Grpc::AsyncStream diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 80ffd1b11f882..d8ce4559c3687 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -146,7 +146,7 @@ void AsyncStreamImpl::closeLocal(bool end_stream) { local_closed_ |= end_stream; if (complete()) { - stream_callbacks_.onClosure(); + stream_callbacks_.onComplete(); cleanup(); } } @@ -154,7 +154,7 @@ void AsyncStreamImpl::closeLocal(bool end_stream) { void AsyncStreamImpl::closeRemote(bool end_stream) { remote_closed_ |= end_stream; if (complete()) { - stream_callbacks_.onClosure(); + stream_callbacks_.onComplete(); cleanup(); } } @@ -188,41 +188,27 @@ void AsyncRequestImpl::initialize() { sendHeaders(request_->headers(), !request_->body()); if (!remoteClosed() && request_->body()) { sendData(*request_->body(), true); + } else if (remoteClosed() && request_->body()) { + closeLocal(true); } // TODO(mattklein123): Support request trailers. } void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)); } -void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) { +void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool) { response_ = std::make_unique(std::move(headers)); - - if (end_stream) { - onComplete(); - } } -void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) { +void AsyncRequestImpl::onData(Buffer::Instance& data, bool) { if (!response_->body()) { response_->body() = std::make_unique(); } response_->body()->move(data); - - if (end_stream) { - onComplete(); - } } void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) { response_->trailers(std::move(trailers)); - onComplete(); -} - -void AsyncRequestImpl::onClosure() { - // TODO(goaway): This seems like an appealing place to call onComplete(), but current logic and - // usage requires AsyncRequestImpl to fire onComplete() in certain cases (e.g. pool failure) when - // the local side actually hasn't yet been closed. These cases should arguably signal with stream - // reset instead, enabling cleanup. } void AsyncRequestImpl::onReset() { diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index cadf57b726016..ac49b97c47cf0 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -88,6 +88,7 @@ class AsyncStreamImpl : public AsyncClient::Stream, protected: bool remoteClosed() { return remote_closed_; } + void closeLocal(bool end_stream); AsyncClientImpl& parent_; @@ -281,7 +282,6 @@ class AsyncStreamImpl : public AsyncClient::Stream, }; void cleanup(); - void closeLocal(bool end_stream); void closeRemote(bool end_stream); bool complete() { return local_closed_ && remote_closed_; } @@ -383,13 +383,12 @@ class AsyncRequestImpl final : public AsyncClient::Request, private: void initialize(); - void onComplete(); // AsyncClient::StreamCallbacks void onHeaders(HeaderMapPtr&& headers, bool end_stream) override; void onData(Buffer::Instance& data, bool end_stream) override; void onTrailers(HeaderMapPtr&& trailers) override; - void onClosure() override; + void onComplete() override; void onReset() override; // Http::StreamDecoderFilterCallbacks diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index be5ae2c88f290..bb912febe29c6 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -100,7 +100,7 @@ TEST_F(AsyncClientImplTest, BasicStream) { expectResponseHeaders(stream_callbacks_, 200, false); EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); - EXPECT_CALL(stream_callbacks_, onClosure()); + EXPECT_CALL(stream_callbacks_, onComplete()); AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); stream->sendHeaders(headers, false); @@ -244,7 +244,7 @@ TEST_F(AsyncClientImplTest, RetryWithStream) { // Normal response. expectResponseHeaders(stream_callbacks_, 200, true); - EXPECT_CALL(stream_callbacks_, onClosure()); + EXPECT_CALL(stream_callbacks_, onComplete()); HeaderMapPtr response_headers2(new TestHeaderMapImpl{{":status", "200"}}); response_decoder_->decodeHeaders(std::move(response_headers2), true); } @@ -267,7 +267,7 @@ TEST_F(AsyncClientImplTest, MultipleStreams) { expectResponseHeaders(stream_callbacks_, 200, false); EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); - EXPECT_CALL(stream_callbacks_, onClosure()); + EXPECT_CALL(stream_callbacks_, onComplete()); AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); stream->sendHeaders(headers, false); @@ -292,7 +292,7 @@ TEST_F(AsyncClientImplTest, MultipleStreams) { EXPECT_CALL(stream_encoder2, encodeData(BufferEqual(body2.get()), true)); expectResponseHeaders(stream_callbacks2, 503, true); - EXPECT_CALL(stream_callbacks2, onClosure()); + EXPECT_CALL(stream_callbacks2, onComplete()); AsyncClient::Stream* stream2 = client_.start(stream_callbacks2, AsyncClient::StreamOptions()); stream2->sendHeaders(headers2, false); @@ -392,7 +392,7 @@ TEST_F(AsyncClientImplTest, StreamAndRequest) { expectResponseHeaders(stream_callbacks_, 200, false); EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); - EXPECT_CALL(stream_callbacks_, onClosure()); + EXPECT_CALL(stream_callbacks_, onComplete()); AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); stream->sendHeaders(headers, false); @@ -432,7 +432,7 @@ TEST_F(AsyncClientImplTest, StreamWithTrailers) { EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), false)); TestHeaderMapImpl expected_trailers{{"some", "trailer"}}; EXPECT_CALL(stream_callbacks_, onTrailers_(HeaderMapEqualRef(&expected_trailers))); - EXPECT_CALL(stream_callbacks_, onClosure()); + EXPECT_CALL(stream_callbacks_, onComplete()); AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); stream->sendHeaders(headers, false); @@ -550,7 +550,7 @@ TEST_F(AsyncClientImplTest, ResetInOnHeaders) { .WillOnce(Invoke([&stream](HeaderMap&, bool) { stream->reset(); })); EXPECT_CALL(stream_callbacks_, onData(_, _)).Times(0); EXPECT_CALL(stream_callbacks_, onReset()); - EXPECT_CALL(stream_callbacks_, onClosure()); + EXPECT_CALL(stream_callbacks_, onComplete()); stream->sendHeaders(headers, false); stream->sendData(*body, false); @@ -727,7 +727,7 @@ TEST_F(AsyncClientImplTest, StreamTimeout) { {":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}}; EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), false)); EXPECT_CALL(stream_callbacks_, onData(_, true)); - EXPECT_CALL(stream_callbacks_, onClosure()); + EXPECT_CALL(stream_callbacks_, onComplete()); AsyncClient::Stream* stream = client_.start( stream_callbacks_, AsyncClient::StreamOptions().setTimeout(std::chrono::milliseconds(40))); @@ -761,7 +761,7 @@ TEST_F(AsyncClientImplTest, StreamTimeoutHeadReply) { TestHeaderMapImpl expected_timeout{ {":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}}; EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), true)); - EXPECT_CALL(stream_callbacks_, onClosure()); + EXPECT_CALL(stream_callbacks_, onComplete()); AsyncClient::Stream* stream = client_.start( stream_callbacks_, AsyncClient::StreamOptions().setTimeout(std::chrono::milliseconds(40))); @@ -869,7 +869,7 @@ TEST_F(AsyncClientImplTest, MultipleDataStream) { EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body2.get()), true)); EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body2.get()), true)); - EXPECT_CALL(stream_callbacks_, onClosure()); + EXPECT_CALL(stream_callbacks_, onComplete()); stream->sendData(*body2, true); response_decoder_->decodeData(*body2, true); diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 8ccfe0a1a6043..c4b4dd46d669e 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -347,7 +347,7 @@ class MockAsyncClientStreamCallbacks : public AsyncClient::StreamCallbacks { MOCK_METHOD2(onHeaders_, void(HeaderMap& headers, bool end_stream)); MOCK_METHOD2(onData, void(Buffer::Instance& data, bool end_stream)); MOCK_METHOD1(onTrailers_, void(HeaderMap& headers)); - MOCK_METHOD0(onClosure, void()); + MOCK_METHOD0(onComplete, void()); MOCK_METHOD0(onReset, void()); }; From 4181ae1604cbbe5665e311fa17160e2ac00b492d Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Wed, 31 Jul 2019 12:43:45 -0700 Subject: [PATCH 13/16] guard closeLocal and closeRemote Signed-off-by: Mike Schore --- source/common/http/async_client_impl.cc | 13 +++++++++++-- test/common/http/async_client_impl_test.cc | 1 - 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index d8ce4559c3687..0f2ff8237303e 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -143,8 +143,12 @@ void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) { void AsyncStreamImpl::closeLocal(bool end_stream) { ASSERT(!(local_closed_ && end_stream)); + // Due to the fact that send calls can synchronously result in stream closure, it's possible for this to be called after the local side is already closed. + if (local_closed_) { + return; + } - local_closed_ |= end_stream; + local_closed_ = end_stream; if (complete()) { stream_callbacks_.onComplete(); cleanup(); @@ -152,7 +156,12 @@ void AsyncStreamImpl::closeLocal(bool end_stream) { } void AsyncStreamImpl::closeRemote(bool end_stream) { - remote_closed_ |= end_stream; + // Due to the fact that callbacks can synchronously result in stream closure, it's possible for this to get called after the remote is already closed (e.g. if the the stream is reset in the callback itself). + if (remote_closed_) { + return; + } + + remote_closed_ = end_stream; if (complete()) { stream_callbacks_.onComplete(); cleanup(); diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index bb912febe29c6..b1e30d65d5fd0 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -550,7 +550,6 @@ TEST_F(AsyncClientImplTest, ResetInOnHeaders) { .WillOnce(Invoke([&stream](HeaderMap&, bool) { stream->reset(); })); EXPECT_CALL(stream_callbacks_, onData(_, _)).Times(0); EXPECT_CALL(stream_callbacks_, onReset()); - EXPECT_CALL(stream_callbacks_, onComplete()); stream->sendHeaders(headers, false); stream->sendData(*body, false); From 40031d5a909ec513fd6f74d8d7c94a3bdeb79ad8 Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Wed, 31 Jul 2019 12:49:08 -0700 Subject: [PATCH 14/16] cleanup Signed-off-by: Mike Schore --- source/common/http/async_client_impl.cc | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 0f2ff8237303e..b7dd6a44c7a2d 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -143,7 +143,8 @@ void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) { void AsyncStreamImpl::closeLocal(bool end_stream) { ASSERT(!(local_closed_ && end_stream)); - // Due to the fact that send calls can synchronously result in stream closure, it's possible for this to be called after the local side is already closed. + // Due to the fact that send calls can synchronously result in stream closure, it's possible for + // this to be called after the local side is already closed. if (local_closed_) { return; } @@ -156,7 +157,9 @@ void AsyncStreamImpl::closeLocal(bool end_stream) { } void AsyncStreamImpl::closeRemote(bool end_stream) { - // Due to the fact that callbacks can synchronously result in stream closure, it's possible for this to get called after the remote is already closed (e.g. if the the stream is reset in the callback itself). + // Due to the fact that callbacks can synchronously result in stream closure, it's possible for + // this to get called after the remote is already closed (e.g. if the the stream is reset in the + // callback itself). if (remote_closed_) { return; } @@ -195,10 +198,14 @@ AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent void AsyncRequestImpl::initialize() { sendHeaders(request_->headers(), !request_->body()); - if (!remoteClosed() && request_->body()) { - sendData(*request_->body(), true); - } else if (remoteClosed() && request_->body()) { - closeLocal(true); + if (request_->body()) { + // sendHeaders can result in synchronous stream closure in certain cases (e.g. connection pool + // failure). + if (remoteClosed()) { + closeLocal(true); + } else { + sendData(*request_->body(), true); + } } // TODO(mattklein123): Support request trailers. } From ef543d8fcea4d1c0ce85e3ff8d16fb3928436ed6 Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Thu, 1 Aug 2019 12:10:41 -0700 Subject: [PATCH 15/16] additional clarification comment Signed-off-by: Mike Schore --- source/common/http/async_client_impl.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index b7dd6a44c7a2d..2af396a7be6b2 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -202,6 +202,9 @@ void AsyncRequestImpl::initialize() { // sendHeaders can result in synchronous stream closure in certain cases (e.g. connection pool // failure). if (remoteClosed()) { + // In the case that we had a locally-generated response, we manually close the stream locally + // to fire the completion callback. This is a no-op if we had a locally-generated reset + // instead. closeLocal(true); } else { sendData(*request_->body(), true); From 16af15b57a135b7ad4789774a51d2643f18bd1bd Mon Sep 17 00:00:00 2001 From: Mike Schore Date: Tue, 6 Aug 2019 10:43:13 -0700 Subject: [PATCH 16/16] add detail to comments and TODOs Signed-off-by: Mike Schore --- source/common/http/async_client_impl.cc | 33 +++++++++++++++++++++---- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 2af396a7be6b2..054a2ef2dedeb 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -142,9 +142,21 @@ void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) { } void AsyncStreamImpl::closeLocal(bool end_stream) { + // TODO(goaway): This assert maybe merits reconsideration. It seems to be saying that we shouldn't + // get here when trying to send the final frame of a stream that has already been closed locally, + // but it's fine for us to get here if we're trying to send a non-final frame. There's not an + // obvious reason why the first case would be not okay but the second case okay. ASSERT(!(local_closed_ && end_stream)); - // Due to the fact that send calls can synchronously result in stream closure, it's possible for - // this to be called after the local side is already closed. + // This guard ensures that we don't attempt to clean up a stream or fire a completion callback + // for a stream that has already been closed. Both send* calls and resets can result in stream + // closure, and this state may be updated synchronously during stream interaction and callbacks. + // Additionally AsyncRequestImpl maintains behavior wherein its onComplete callback will fire + // immediately upon receiving a complete response, regardless of whether it has finished sending + // a request. + // Previous logic treated post-closure entry here as more-or-less benign (providing later-stage + // guards against redundant cleanup), but to surface consistent stream state via callbacks, + // it's necessary to be more rigorous. + // TODO(goaway): Consider deeper cleanup of assumptions here. if (local_closed_) { return; } @@ -157,9 +169,14 @@ void AsyncStreamImpl::closeLocal(bool end_stream) { } void AsyncStreamImpl::closeRemote(bool end_stream) { - // Due to the fact that callbacks can synchronously result in stream closure, it's possible for - // this to get called after the remote is already closed (e.g. if the the stream is reset in the - // callback itself). + // This guard ensures that we don't attempt to clean up a stream or fire a completion callback for + // a stream that has already been closed. This function is called synchronously after callbacks + // have executed, and it's possible for callbacks to, for instance, directly reset a stream or + // close the remote manually. The test case ResetInOnHeaders covers this case specifically. + // Previous logic treated post-closure entry here as more-or-less benign (providing later-stage + // guards against redundant cleanup), but to surface consistent stream state via callbacks, it's + // necessary to be more rigorous. + // TODO(goaway): Consider deeper cleanup of assumptions here. if (remote_closed_) { return; } @@ -198,6 +215,12 @@ AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent void AsyncRequestImpl::initialize() { sendHeaders(request_->headers(), !request_->body()); + // AsyncRequestImpl has historically been implemented to fire onComplete immediately upon + // receiving a complete response, regardless of whether the underlying stream was fully closed (in + // other words, regardless of whether the complete request had been sent). This had the potential + // to leak half-closed streams, which is now covered by manually firing closeLocal below. (See + // test PoolFailureWithBody for an example execution path.) + // TODO(goaway): Consider deeper cleanup of assumptions here. if (request_->body()) { // sendHeaders can result in synchronous stream closure in certain cases (e.g. connection pool // failure).