Skip to content
7 changes: 7 additions & 0 deletions include/envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ class AsyncClient {
*/
virtual void onTrailers(HeaderMapPtr&& trailers) PURE;

/**
* Called when both the local and remote have gracefully closed the stream.
* Useful for asymmetric cases where end_stream may not be bidirectionally observable.
* Note this is NOT called on stream reset.
*/
virtual void onComplete() PURE;

/**
* Called when the async HTTP stream is reset.
*/
Expand Down
4 changes: 4 additions & 0 deletions source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::str
resetStream();
}

void AsyncStreamImpl::onComplete() {
// No-op since stream completion is handled within other callbacks.
}

void AsyncStreamImpl::onReset() {
if (http_reset_) {
return;
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class AsyncStreamImpl : public RawAsyncStream,
void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override;
void onData(Buffer::Instance& data, bool end_stream) override;
void onTrailers(Http::HeaderMapPtr&& trailers) override;
void onComplete() override;
void onReset() override;

// Grpc::AsyncStream
Expand Down
67 changes: 52 additions & 15 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,48 @@ void AsyncStreamImpl::sendTrailers(HeaderMap& trailers) {
}

void AsyncStreamImpl::closeLocal(bool end_stream) {
// TODO(goaway): This assert maybe merits reconsideration. It seems to be saying that we shouldn't
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree, I don't remember the history here. We should look at cleaning this up. cc @htuch @lizan in case they remember any of this history.

// get here when trying to send the final frame of a stream that has already been closed locally,
// but it's fine for us to get here if we're trying to send a non-final frame. There's not an
// obvious reason why the first case would be not okay but the second case okay.
ASSERT(!(local_closed_ && end_stream));
// This guard ensures that we don't attempt to clean up a stream or fire a completion callback
// for a stream that has already been closed. Both send* calls and resets can result in stream
// closure, and this state may be updated synchronously during stream interaction and callbacks.
// Additionally AsyncRequestImpl maintains behavior wherein its onComplete callback will fire
// immediately upon receiving a complete response, regardless of whether it has finished sending
// a request.
// Previous logic treated post-closure entry here as more-or-less benign (providing later-stage
// guards against redundant cleanup), but to surface consistent stream state via callbacks,
// it's necessary to be more rigorous.
// TODO(goaway): Consider deeper cleanup of assumptions here.
if (local_closed_) {
return;
}

local_closed_ |= end_stream;
local_closed_ = end_stream;
if (complete()) {
stream_callbacks_.onComplete();
cleanup();
}
}

void AsyncStreamImpl::closeRemote(bool end_stream) {
remote_closed_ |= end_stream;
// This guard ensures that we don't attempt to clean up a stream or fire a completion callback for
// a stream that has already been closed. This function is called synchronously after callbacks
// have executed, and it's possible for callbacks to, for instance, directly reset a stream or
// close the remote manually. The test case ResetInOnHeaders covers this case specifically.
// Previous logic treated post-closure entry here as more-or-less benign (providing later-stage
// guards against redundant cleanup), but to surface consistent stream state via callbacks, it's
// necessary to be more rigorous.
// TODO(goaway): Consider deeper cleanup of assumptions here.
if (remote_closed_) {
return;
}

remote_closed_ = end_stream;
if (complete()) {
stream_callbacks_.onComplete();
cleanup();
}
}
Expand Down Expand Up @@ -184,36 +215,42 @@ AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent

void AsyncRequestImpl::initialize() {
sendHeaders(request_->headers(), !request_->body());
if (!remoteClosed() && request_->body()) {
sendData(*request_->body(), true);
// AsyncRequestImpl has historically been implemented to fire onComplete immediately upon
// receiving a complete response, regardless of whether the underlying stream was fully closed (in
// other words, regardless of whether the complete request had been sent). This had the potential
// to leak half-closed streams, which is now covered by manually firing closeLocal below. (See
// test PoolFailureWithBody for an example execution path.)
// TODO(goaway): Consider deeper cleanup of assumptions here.
if (request_->body()) {
// sendHeaders can result in synchronous stream closure in certain cases (e.g. connection pool
// failure).
if (remoteClosed()) {
// In the case that we had a locally-generated response, we manually close the stream locally
// to fire the completion callback. This is a no-op if we had a locally-generated reset
// instead.
closeLocal(true);
} else {
sendData(*request_->body(), true);
}
}
// TODO(mattklein123): Support request trailers.
}

void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)); }

void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) {
void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool) {
response_ = std::make_unique<ResponseMessageImpl>(std::move(headers));

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) {
void AsyncRequestImpl::onData(Buffer::Instance& data, bool) {
if (!response_->body()) {
response_->body() = std::make_unique<Buffer::OwnedImpl>();
}
response_->body()->move(data);

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) {
response_->trailers(std::move(trailers));
onComplete();
}

void AsyncRequestImpl::onReset() {
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,

protected:
bool remoteClosed() { return remote_closed_; }
void closeLocal(bool end_stream);

AsyncClientImpl& parent_;

Expand Down Expand Up @@ -281,7 +282,6 @@ class AsyncStreamImpl : public AsyncClient::Stream,
};

void cleanup();
void closeLocal(bool end_stream);
void closeRemote(bool end_stream);
bool complete() { return local_closed_ && remote_closed_; }

Expand Down Expand Up @@ -383,12 +383,12 @@ class AsyncRequestImpl final : public AsyncClient::Request,

private:
void initialize();
void onComplete();

// AsyncClient::StreamCallbacks
void onHeaders(HeaderMapPtr&& headers, bool end_stream) override;
void onData(Buffer::Instance& data, bool end_stream) override;
void onTrailers(HeaderMapPtr&& trailers) override;
void onComplete() override;
void onReset() override;

// Http::StreamDecoderFilterCallbacks
Expand Down
9 changes: 9 additions & 0 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ TEST_F(AsyncClientImplTest, BasicStream) {

expectResponseHeaders(stream_callbacks_, 200, false);
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);
Expand Down Expand Up @@ -243,6 +244,7 @@ TEST_F(AsyncClientImplTest, RetryWithStream) {

// Normal response.
expectResponseHeaders(stream_callbacks_, 200, true);
EXPECT_CALL(stream_callbacks_, onComplete());
HeaderMapPtr response_headers2(new TestHeaderMapImpl{{":status", "200"}});
response_decoder_->decodeHeaders(std::move(response_headers2), true);
}
Expand All @@ -265,6 +267,7 @@ TEST_F(AsyncClientImplTest, MultipleStreams) {

expectResponseHeaders(stream_callbacks_, 200, false);
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);
Expand All @@ -289,6 +292,7 @@ TEST_F(AsyncClientImplTest, MultipleStreams) {
EXPECT_CALL(stream_encoder2, encodeData(BufferEqual(body2.get()), true));

expectResponseHeaders(stream_callbacks2, 503, true);
EXPECT_CALL(stream_callbacks2, onComplete());

AsyncClient::Stream* stream2 = client_.start(stream_callbacks2, AsyncClient::StreamOptions());
stream2->sendHeaders(headers2, false);
Expand Down Expand Up @@ -388,6 +392,7 @@ TEST_F(AsyncClientImplTest, StreamAndRequest) {

expectResponseHeaders(stream_callbacks_, 200, false);
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);
Expand Down Expand Up @@ -427,6 +432,7 @@ TEST_F(AsyncClientImplTest, StreamWithTrailers) {
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), false));
TestHeaderMapImpl expected_trailers{{"some", "trailer"}};
EXPECT_CALL(stream_callbacks_, onTrailers_(HeaderMapEqualRef(&expected_trailers)));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);
Expand Down Expand Up @@ -720,6 +726,7 @@ TEST_F(AsyncClientImplTest, StreamTimeout) {
{":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}};
EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), false));
EXPECT_CALL(stream_callbacks_, onData(_, true));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(
stream_callbacks_, AsyncClient::StreamOptions().setTimeout(std::chrono::milliseconds(40)));
Expand Down Expand Up @@ -753,6 +760,7 @@ TEST_F(AsyncClientImplTest, StreamTimeoutHeadReply) {
TestHeaderMapImpl expected_timeout{
{":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}};
EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), true));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(
stream_callbacks_, AsyncClient::StreamOptions().setTimeout(std::chrono::milliseconds(40)));
Expand Down Expand Up @@ -860,6 +868,7 @@ TEST_F(AsyncClientImplTest, MultipleDataStream) {

EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body2.get()), true));
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body2.get()), true));
EXPECT_CALL(stream_callbacks_, onComplete());

stream->sendData(*body2, true);
response_decoder_->decodeData(*body2, true);
Expand Down
1 change: 1 addition & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ class MockAsyncClientStreamCallbacks : public AsyncClient::StreamCallbacks {
MOCK_METHOD2(onHeaders_, void(HeaderMap& headers, bool end_stream));
MOCK_METHOD2(onData, void(Buffer::Instance& data, bool end_stream));
MOCK_METHOD1(onTrailers_, void(HeaderMap& headers));
MOCK_METHOD0(onComplete, void());
MOCK_METHOD0(onReset, void());
};

Expand Down