diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index 2aae77c040d90..2d486e3def6b9 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -37,6 +37,7 @@ AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::C new_request->moveIntoList(std::move(new_request), active_streams_); return async_request; } else { + new_request->cleanup(); return nullptr; } } @@ -44,14 +45,8 @@ AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::C AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks, const Optional& timeout) { std::unique_ptr new_stream{new AsyncStreamImpl(*this, callbacks, timeout)}; - - // The request may get immediately failed. If so, we will return nullptr. - if (!new_stream->remote_closed_) { - new_stream->moveIntoList(std::move(new_stream), active_streams_); - return active_streams_.front().get(); - } else { - return nullptr; - } + new_stream->moveIntoList(std::move(new_stream), active_streams_); + return active_streams_.front().get(); } AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks, @@ -156,7 +151,7 @@ AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent : AsyncStreamImpl(parent, *this, timeout), request_(std::move(request)), callbacks_(callbacks) { sendHeaders(request_->headers(), !request_->body()); - if (!complete() && request_->body()) { + if (!remoteClosed() && request_->body()) { sendData(*request_->body(), true); } // TODO: Support request trailers. diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index 5793dc06f5fdd..ccf8ab8a5aacd 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -66,7 +66,8 @@ class AsyncStreamImpl : public AsyncClient::Stream, void reset() override; protected: - bool complete() { return local_closed_ && remote_closed_; } + bool remoteClosed() { return remote_closed_; } + AsyncClientImpl& parent_; private: @@ -148,9 +149,9 @@ class AsyncStreamImpl : public AsyncClient::Stream, }; void cleanup(); - void closeLocal(bool end_stream); void closeRemote(bool end_stream); + bool complete() { return local_closed_ && remote_closed_; } // Http::StreamDecoderFilterCallbacks void addResetStreamCallback(std::function callback) override { diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index 34c6f326a9422..664e1dd1e3de8 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -610,6 +610,22 @@ TEST_F(AsyncClientImplTest, PoolFailure) { EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("upstream_rq_503").value()); } +TEST_F(AsyncClientImplTest, PoolFailureWithBody) { + EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](StreamDecoder&, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolFailure(ConnectionPool::PoolFailureReason::Overflow, nullptr); + return nullptr; + })); + + expectSuccess(503); + message_->body(Buffer::InstancePtr{new Buffer::OwnedImpl("hello")}); + EXPECT_EQ(nullptr, + client_.send(std::move(message_), callbacks_, Optional())); + + EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("upstream_rq_503").value()); +} + TEST_F(AsyncClientImplTest, StreamTimeout) { EXPECT_CALL(cm_.conn_pool_, newStream(_, _)) .WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks)