Skip to content
7 changes: 7 additions & 0 deletions include/envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ class AsyncClient {
*/
virtual void onTrailers(HeaderMapPtr&& trailers) PURE;

/**
* Called when both the local and remote have gracefully closed the stream.
* Useful for asymmetric cases where end_stream may not be bidirectionally observable.
* Note this is NOT called on stream reset.
Comment thread
mattklein123 marked this conversation as resolved.
*/
virtual void onComplete() PURE;

/**
* Called when the async HTTP stream is reset.
*/
Expand Down
4 changes: 4 additions & 0 deletions source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::str
resetStream();
}

void AsyncStreamImpl::onComplete() {
// No-op since stream completion is handled within other callbacks.
}

void AsyncStreamImpl::onReset() {
if (http_reset_) {
return;
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class AsyncStreamImpl : public RawAsyncStream,
void onHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override;
void onData(Buffer::Instance& data, bool end_stream) override;
void onTrailers(Http::HeaderMapPtr&& trailers) override;
void onComplete() override;
void onReset() override;

// Grpc::AsyncStream
Expand Down
17 changes: 6 additions & 11 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,15 @@ void AsyncStreamImpl::closeLocal(bool end_stream) {

local_closed_ |= end_stream;
if (complete()) {
stream_callbacks_.onComplete();
cleanup();
}
}

void AsyncStreamImpl::closeRemote(bool end_stream) {
remote_closed_ |= end_stream;
if (complete()) {
stream_callbacks_.onComplete();
cleanup();
}
}
Expand Down Expand Up @@ -186,34 +188,27 @@ void AsyncRequestImpl::initialize() {
sendHeaders(request_->headers(), !request_->body());
if (!remoteClosed() && request_->body()) {
sendData(*request_->body(), true);
} else if (remoteClosed() && request_->body()) {
closeLocal(true);
Comment thread
goaway marked this conversation as resolved.
Outdated
}
// TODO(mattklein123): Support request trailers.
}

void AsyncRequestImpl::onComplete() { callbacks_.onSuccess(std::move(response_)); }

void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool end_stream) {
void AsyncRequestImpl::onHeaders(HeaderMapPtr&& headers, bool) {
response_ = std::make_unique<ResponseMessageImpl>(std::move(headers));

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onData(Buffer::Instance& data, bool end_stream) {
void AsyncRequestImpl::onData(Buffer::Instance& data, bool) {
if (!response_->body()) {
response_->body() = std::make_unique<Buffer::OwnedImpl>();
}
response_->body()->move(data);

if (end_stream) {
onComplete();
}
}

void AsyncRequestImpl::onTrailers(HeaderMapPtr&& trailers) {
response_->trailers(std::move(trailers));
onComplete();
}

void AsyncRequestImpl::onReset() {
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,

protected:
bool remoteClosed() { return remote_closed_; }
void closeLocal(bool end_stream);

AsyncClientImpl& parent_;

Expand Down Expand Up @@ -281,7 +282,6 @@ class AsyncStreamImpl : public AsyncClient::Stream,
};

void cleanup();
void closeLocal(bool end_stream);
void closeRemote(bool end_stream);
bool complete() { return local_closed_ && remote_closed_; }

Expand Down Expand Up @@ -383,12 +383,12 @@ class AsyncRequestImpl final : public AsyncClient::Request,

private:
void initialize();
void onComplete();

// AsyncClient::StreamCallbacks
void onHeaders(HeaderMapPtr&& headers, bool end_stream) override;
void onData(Buffer::Instance& data, bool end_stream) override;
void onTrailers(HeaderMapPtr&& trailers) override;
void onComplete() override;
void onReset() override;

// Http::StreamDecoderFilterCallbacks
Expand Down
10 changes: 10 additions & 0 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ TEST_F(AsyncClientImplTest, BasicStream) {

expectResponseHeaders(stream_callbacks_, 200, false);
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);
Expand Down Expand Up @@ -243,6 +244,7 @@ TEST_F(AsyncClientImplTest, RetryWithStream) {

// Normal response.
expectResponseHeaders(stream_callbacks_, 200, true);
EXPECT_CALL(stream_callbacks_, onComplete());
HeaderMapPtr response_headers2(new TestHeaderMapImpl{{":status", "200"}});
response_decoder_->decodeHeaders(std::move(response_headers2), true);
}
Expand All @@ -265,6 +267,7 @@ TEST_F(AsyncClientImplTest, MultipleStreams) {

expectResponseHeaders(stream_callbacks_, 200, false);
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);
Expand All @@ -289,6 +292,7 @@ TEST_F(AsyncClientImplTest, MultipleStreams) {
EXPECT_CALL(stream_encoder2, encodeData(BufferEqual(body2.get()), true));

expectResponseHeaders(stream_callbacks2, 503, true);
EXPECT_CALL(stream_callbacks2, onComplete());

AsyncClient::Stream* stream2 = client_.start(stream_callbacks2, AsyncClient::StreamOptions());
stream2->sendHeaders(headers2, false);
Expand Down Expand Up @@ -388,6 +392,7 @@ TEST_F(AsyncClientImplTest, StreamAndRequest) {

expectResponseHeaders(stream_callbacks_, 200, false);
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), true));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);
Expand Down Expand Up @@ -427,6 +432,7 @@ TEST_F(AsyncClientImplTest, StreamWithTrailers) {
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body.get()), false));
TestHeaderMapImpl expected_trailers{{"some", "trailer"}};
EXPECT_CALL(stream_callbacks_, onTrailers_(HeaderMapEqualRef(&expected_trailers)));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(stream_callbacks_, AsyncClient::StreamOptions());
stream->sendHeaders(headers, false);
Expand Down Expand Up @@ -544,6 +550,7 @@ TEST_F(AsyncClientImplTest, ResetInOnHeaders) {
.WillOnce(Invoke([&stream](HeaderMap&, bool) { stream->reset(); }));
EXPECT_CALL(stream_callbacks_, onData(_, _)).Times(0);
EXPECT_CALL(stream_callbacks_, onReset());
EXPECT_CALL(stream_callbacks_, onComplete());
Comment thread
goaway marked this conversation as resolved.
Outdated

stream->sendHeaders(headers, false);
stream->sendData(*body, false);
Expand Down Expand Up @@ -720,6 +727,7 @@ TEST_F(AsyncClientImplTest, StreamTimeout) {
{":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}};
EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), false));
EXPECT_CALL(stream_callbacks_, onData(_, true));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(
stream_callbacks_, AsyncClient::StreamOptions().setTimeout(std::chrono::milliseconds(40)));
Expand Down Expand Up @@ -753,6 +761,7 @@ TEST_F(AsyncClientImplTest, StreamTimeoutHeadReply) {
TestHeaderMapImpl expected_timeout{
{":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}};
EXPECT_CALL(stream_callbacks_, onHeaders_(HeaderMapEqualRef(&expected_timeout), true));
EXPECT_CALL(stream_callbacks_, onComplete());

AsyncClient::Stream* stream = client_.start(
stream_callbacks_, AsyncClient::StreamOptions().setTimeout(std::chrono::milliseconds(40)));
Expand Down Expand Up @@ -860,6 +869,7 @@ TEST_F(AsyncClientImplTest, MultipleDataStream) {

EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(body2.get()), true));
EXPECT_CALL(stream_callbacks_, onData(BufferEqual(body2.get()), true));
EXPECT_CALL(stream_callbacks_, onComplete());

stream->sendData(*body2, true);
response_decoder_->decodeData(*body2, true);
Expand Down
1 change: 1 addition & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ class MockAsyncClientStreamCallbacks : public AsyncClient::StreamCallbacks {
MOCK_METHOD2(onHeaders_, void(HeaderMap& headers, bool end_stream));
MOCK_METHOD2(onData, void(Buffer::Instance& data, bool end_stream));
MOCK_METHOD1(onTrailers_, void(HeaderMap& headers));
MOCK_METHOD0(onComplete, void());
MOCK_METHOD0(onReset, void());
};

Expand Down