Skip to content
7 changes: 7 additions & 0 deletions include/envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ class AsyncClient {
*/
virtual void onTrailers(HeaderMapPtr&& trailers) PURE;

/**
* Called when both the local and remote have gracefully closed the stream.
* Useful for asymmetric cases where end_stream may not be bidirectionally observable.
* Note this is NOT called on stream reset.
Comment thread
mattklein123 marked this conversation as resolved.
*/
virtual void onClosure() PURE;
Comment thread
goaway marked this conversation as resolved.
Outdated

/**
* Called when the async HTTP stream is reset.
*/
Expand Down
4 changes: 4 additions & 0 deletions source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::str
resetStream();
}

void AsyncStreamImpl::onClosure() {
// No-op since stream closure is handled within other callbacks.
Comment thread
goaway marked this conversation as resolved.
Outdated
}

void AsyncStreamImpl::onReset() {
if (http_reset_) {
return;
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class AsyncStreamImpl : public RawAsyncStream,
void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override;
void onData(Buffer::Instance& data, bool end_stream) override;
void onTrailers(Http::HeaderMapPtr&& trailers) override;
void onClosure() override;
void onReset() override;

// Grpc::AsyncStream
Expand Down
17 changes: 7 additions & 10 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,15 @@ void AsyncStreamImpl::closeLocal(bool end_stream) {

local_closed_ |= end_stream;
if (complete()) {
stream_callbacks_.onClosure();
cleanup();
}
}

void AsyncStreamImpl::closeRemote(bool end_stream) {
remote_closed_ |= end_stream;
if (complete()) {
stream_callbacks_.onClosure();
cleanup();
}
}
Expand Down Expand Up @@ -192,27 +194,22 @@ void AsyncRequestImpl::initialize() {

void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)); }

void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) {
void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool) {
response_ = std::make_unique<ResponseMessageImpl>(std::move(headers));

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) {
void AsyncRequestImpl::onData(Buffer::Instance& data, bool) {
if (!response_->body()) {
response_->body() = std::make_unique<Buffer::OwnedImpl>();
}
response_->body()->move(data);

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) {
response_->trailers(std::move(trailers));
}

void AsyncRequestImpl::onClosure() {
onComplete();
}

Expand Down
1 change: 1 addition & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ class AsyncRequestImpl final : public AsyncClient::Request,
void onHeaders(HeaderMapPtr&& headers, bool end_stream) override;
void onData(Buffer::Instance& data, bool end_stream) override;
void onTrailers(HeaderMapPtr&& trailers) override;
void onClosure() override;
void onReset() override;

// Http::StreamDecoderFilterCallbacks
Expand Down
6 changes: 6 additions & 0 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ TEST_F(AsyncClientImplTest, BasicStream) {

expectResponseHeaders(stream_callbacks_, 200, false);
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true));
EXPECT_CALL(stream_callbacks_, onClosure());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);
Expand Down Expand Up @@ -265,6 +266,7 @@ TEST_F(AsyncClientImplTest, MultipleStreams) {

expectResponseHeaders(stream_callbacks_, 200, false);
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true));
EXPECT_CALL(stream_callbacks_, onClosure());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);
Expand Down Expand Up @@ -388,6 +390,7 @@ TEST_F(AsyncClientImplTest, StreamAndRequest) {

expectResponseHeaders(stream_callbacks_, 200, false);
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true));
EXPECT_CALL(stream_callbacks_, onClosure());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);
Expand Down Expand Up @@ -427,6 +430,7 @@ TEST_F(AsyncClientImplTest, StreamWithTrailers) {
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), false));
TestHeaderMapImpl expected_trailers{{"some", "trailer"}};
EXPECT_CALL(stream_callbacks_, onTrailers_(HeaderMapEqualRef(&expected_trailers)));
EXPECT_CALL(stream_callbacks_, onClosure());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);
Expand Down Expand Up @@ -720,6 +724,7 @@ TEST_F(AsyncClientImplTest, StreamTimeout) {
{":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}};
EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), false));
EXPECT_CALL(stream_callbacks_, onData(_, true));
EXPECT_CALL(stream_callbacks_, onClosure());

AsyncClient::Stream* stream = client_.start(
stream_callbacks_, AsyncClient::StreamOptions().setTimeout(std::chrono::milliseconds(40)));
Expand Down Expand Up @@ -860,6 +865,7 @@ TEST_F(AsyncClientImplTest, MultipleDataStream) {

EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body2.get()), true));
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body2.get()), true));
EXPECT_CALL(stream_callbacks_, onClosure());

stream->sendData(*body2, true);
response_decoder_->decodeData(*body2, true);
Expand Down
1 change: 1 addition & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ class MockAsyncClientStreamCallbacks : public AsyncClient::StreamCallbacks {
MOCK_METHOD2(onHeaders_, void(HeaderMap& headers, bool end_stream));
MOCK_METHOD2(onData, void(Buffer::Instance& data, bool end_stream));
MOCK_METHOD1(onTrailers_, void(HeaderMap& headers));
MOCK_METHOD0(onClosure, void());
MOCK_METHOD0(onReset, void());
};

Expand Down