diff --git a/include/envoy/http/async_client.h b/include/envoy/http/async_client.h index d3bf280198c71..c80eb38db6d41 100644 --- a/include/envoy/http/async_client.h +++ b/include/envoy/http/async_client.h @@ -75,6 +75,13 @@ class AsyncClient { */ virtual void onTrailers(HeaderMapPtr&& trailers) PURE; + /** + * Called when both the local and remote have gracefully closed the stream. + * Useful for asymmetric cases where end_stream may not be bidirectionally observable. + * Note this is NOT called on stream reset. + */ + virtual void onComplete() PURE; + /** * Called when the async HTTP stream is reset. */ diff --git a/source/common/grpc/async_client_impl.cc b/source/common/grpc/async_client_impl.cc index 82837ecceb67b..1f17985473786 100644 --- a/source/common/grpc/async_client_impl.cc +++ b/source/common/grpc/async_client_impl.cc @@ -166,6 +166,10 @@ void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::str resetStream(); } +void AsyncStreamImpl::onComplete() { + // No-op since stream completion is handled within other callbacks. +} + void AsyncStreamImpl::onReset() { if (http_reset_) { return; diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h index 575c5a46301f5..c360adce12232 100644 --- a/source/common/grpc/async_client_impl.h +++ b/source/common/grpc/async_client_impl.h @@ -55,6 +55,7 @@ class AsyncStreamImpl : public RawAsyncStream, void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override; void onData(Buffer::Instance& data, bool end_stream) override; void onTrailers(Http::HeaderMapPtr&& trailers) override; + void onComplete() override; void onReset() override; // Grpc::AsyncStream diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index a54b9eea5a568..054a2ef2dedeb 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -142,17 +142,48 @@ void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) { } void AsyncStreamImpl::closeLocal(bool end_stream) { + // TODO(goaway): This assert maybe merits reconsideration. It seems to be saying that we shouldn't + // get here when trying to send the final frame of a stream that has already been closed locally, + // but it's fine for us to get here if we're trying to send a non-final frame. There's not an + // obvious reason why the first case would be not okay but the second case okay. ASSERT(!(local_closed_ && end_stream)); + // This guard ensures that we don't attempt to clean up a stream or fire a completion callback + // for a stream that has already been closed. Both send* calls and resets can result in stream + // closure, and this state may be updated synchronously during stream interaction and callbacks. + // Additionally AsyncRequestImpl maintains behavior wherein its onComplete callback will fire + // immediately upon receiving a complete response, regardless of whether it has finished sending + // a request. + // Previous logic treated post-closure entry here as more-or-less benign (providing later-stage + // guards against redundant cleanup), but to surface consistent stream state via callbacks, + // it's necessary to be more rigorous. + // TODO(goaway): Consider deeper cleanup of assumptions here. + if (local_closed_) { + return; + } - local_closed_ |= end_stream; + local_closed_ = end_stream; if (complete()) { + stream_callbacks_.onComplete(); cleanup(); } } void AsyncStreamImpl::closeRemote(bool end_stream) { - remote_closed_ |= end_stream; + // This guard ensures that we don't attempt to clean up a stream or fire a completion callback for + // a stream that has already been closed. This function is called synchronously after callbacks + // have executed, and it's possible for callbacks to, for instance, directly reset a stream or + // close the remote manually. The test case ResetInOnHeaders covers this case specifically. + // Previous logic treated post-closure entry here as more-or-less benign (providing later-stage + // guards against redundant cleanup), but to surface consistent stream state via callbacks, it's + // necessary to be more rigorous. + // TODO(goaway): Consider deeper cleanup of assumptions here. + if (remote_closed_) { + return; + } + + remote_closed_ = end_stream; if (complete()) { + stream_callbacks_.onComplete(); cleanup(); } } @@ -184,36 +215,42 @@ AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent void AsyncRequestImpl::initialize() { sendHeaders(request_->headers(), !request_->body()); - if (!remoteClosed() && request_->body()) { - sendData(*request_->body(), true); + // AsyncRequestImpl has historically been implemented to fire onComplete immediately upon + // receiving a complete response, regardless of whether the underlying stream was fully closed (in + // other words, regardless of whether the complete request had been sent). This had the potential + // to leak half-closed streams, which is now covered by manually firing closeLocal below. (See + // test PoolFailureWithBody for an example execution path.) + // TODO(goaway): Consider deeper cleanup of assumptions here. + if (request_->body()) { + // sendHeaders can result in synchronous stream closure in certain cases (e.g. connection pool + // failure). + if (remoteClosed()) { + // In the case that we had a locally-generated response, we manually close the stream locally + // to fire the completion callback. This is a no-op if we had a locally-generated reset + // instead. + closeLocal(true); + } else { + sendData(*request_->body(), true); + } } // TODO(mattklein123): Support request trailers. } void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)); } -void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) { +void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool) { response_ = std::make_unique(std::move(headers)); - - if (end_stream) { - onComplete(); - } } -void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) { +void AsyncRequestImpl::onData(Buffer::Instance& data, bool) { if (!response_->body()) { response_->body() = std::make_unique(); } response_->body()->move(data); - - if (end_stream) { - onComplete(); - } } void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) { response_->trailers(std::move(trailers)); - onComplete(); } void AsyncRequestImpl::onReset() { diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index b5374c455f1aa..ac49b97c47cf0 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -88,6 +88,7 @@ class AsyncStreamImpl : public AsyncClient::Stream, protected: bool remoteClosed() { return remote_closed_; } + void closeLocal(bool end_stream); AsyncClientImpl& parent_; @@ -281,7 +282,6 @@ class AsyncStreamImpl : public AsyncClient::Stream, }; void cleanup(); - void closeLocal(bool end_stream); void closeRemote(bool end_stream); bool complete() { return local_closed_ && remote_closed_; } @@ -383,12 +383,12 @@ class AsyncRequestImpl final : public AsyncClient::Request, private: void initialize(); - void onComplete(); // AsyncClient::StreamCallbacks void onHeaders(HeaderMapPtr&& headers, bool end_stream) override; void onData(Buffer::Instance& data, bool end_stream) override; void onTrailers(HeaderMapPtr&& trailers) override; + void onComplete() override; void onReset() override; // Http::StreamDecoderFilterCallbacks diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index 9a58c3f4f83d2..b1e30d65d5fd0 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -100,6 +100,7 @@ TEST_F(AsyncClientImplTest, BasicStream) { expectResponseHeaders(stream_callbacks_, 200, false); EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); + EXPECT_CALL(stream_callbacks_, onComplete()); AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); stream->sendHeaders(headers, false); @@ -243,6 +244,7 @@ TEST_F(AsyncClientImplTest, RetryWithStream) { // Normal response. expectResponseHeaders(stream_callbacks_, 200, true); + EXPECT_CALL(stream_callbacks_, onComplete()); HeaderMapPtr response_headers2(new TestHeaderMapImpl{{":status", "200"}}); response_decoder_->decodeHeaders(std::move(response_headers2), true); } @@ -265,6 +267,7 @@ TEST_F(AsyncClientImplTest, MultipleStreams) { expectResponseHeaders(stream_callbacks_, 200, false); EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); + EXPECT_CALL(stream_callbacks_, onComplete()); AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); stream->sendHeaders(headers, false); @@ -289,6 +292,7 @@ TEST_F(AsyncClientImplTest, MultipleStreams) { EXPECT_CALL(stream_encoder2, encodeData(BufferEqual(body2.get()), true)); expectResponseHeaders(stream_callbacks2, 503, true); + EXPECT_CALL(stream_callbacks2, onComplete()); AsyncClient::Stream* stream2 = client_.start(stream_callbacks2, AsyncClient::StreamOptions()); stream2->sendHeaders(headers2, false); @@ -388,6 +392,7 @@ TEST_F(AsyncClientImplTest, StreamAndRequest) { expectResponseHeaders(stream_callbacks_, 200, false); EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true)); + EXPECT_CALL(stream_callbacks_, onComplete()); AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); stream->sendHeaders(headers, false); @@ -427,6 +432,7 @@ TEST_F(AsyncClientImplTest, StreamWithTrailers) { EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), false)); TestHeaderMapImpl expected_trailers{{"some", "trailer"}}; EXPECT_CALL(stream_callbacks_, onTrailers_(HeaderMapEqualRef(&expected_trailers))); + EXPECT_CALL(stream_callbacks_, onComplete()); AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions()); stream->sendHeaders(headers, false); @@ -720,6 +726,7 @@ TEST_F(AsyncClientImplTest, StreamTimeout) { {":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}}; EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), false)); EXPECT_CALL(stream_callbacks_, onData(_, true)); + EXPECT_CALL(stream_callbacks_, onComplete()); AsyncClient::Stream* stream = client_.start( stream_callbacks_, AsyncClient::StreamOptions().setTimeout(std::chrono::milliseconds(40))); @@ -753,6 +760,7 @@ TEST_F(AsyncClientImplTest, StreamTimeoutHeadReply) { TestHeaderMapImpl expected_timeout{ {":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}}; EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), true)); + EXPECT_CALL(stream_callbacks_, onComplete()); AsyncClient::Stream* stream = client_.start( stream_callbacks_, AsyncClient::StreamOptions().setTimeout(std::chrono::milliseconds(40))); @@ -860,6 +868,7 @@ TEST_F(AsyncClientImplTest, MultipleDataStream) { EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body2.get()), true)); EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body2.get()), true)); + EXPECT_CALL(stream_callbacks_, onComplete()); stream->sendData(*body2, true); response_decoder_->decodeData(*body2, true); diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 13f5d6717a031..c4b4dd46d669e 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -347,6 +347,7 @@ class MockAsyncClientStreamCallbacks : public AsyncClient::StreamCallbacks { MOCK_METHOD2(onHeaders_, void(HeaderMap& headers, bool end_stream)); MOCK_METHOD2(onData, void(Buffer::Instance& data, bool end_stream)); MOCK_METHOD1(onTrailers_, void(HeaderMap& headers)); + MOCK_METHOD0(onComplete, void()); MOCK_METHOD0(onReset, void()); };