Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,16 @@ AsyncClient::Request* AsyncClientImpl::send(MessagePtr&& request, AsyncClient::C
new_request->moveIntoList(std::move(new_request), active_streams_);
return async_request;
} else {
new_request->cleanup();
return nullptr;
}
}

AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks,
const Optional<std::chrono::milliseconds>& timeout) {
std::unique_ptr<AsyncStreamImpl> new_stream{new AsyncStreamImpl(*this, callbacks, timeout)};

// The request may get immediately failed. If so, we will return nullptr.
if (!new_stream->remote_closed_) {
new_stream->moveIntoList(std::move(new_stream), active_streams_);
return active_streams_.front().get();
} else {
return nullptr;
}
new_stream->moveIntoList(std::move(new_stream), active_streams_);
return active_streams_.front().get();
}

AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
Expand Down Expand Up @@ -156,7 +151,7 @@ AsyncRequestImpl::AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent
: AsyncStreamImpl(parent, *this, timeout), request_(std::move(request)), callbacks_(callbacks) {

sendHeaders(request_->headers(), !request_->body());
if (!complete() && request_->body()) {
if (!remoteClosed() && request_->body()) {
sendData(*request_->body(), true);
}
// TODO: Support request trailers.
Expand Down
5 changes: 3 additions & 2 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class AsyncStreamImpl : public AsyncClient::Stream,
void reset() override;

protected:
bool complete() { return local_closed_ && remote_closed_; }
bool remoteClosed() { return remote_closed_; }

AsyncClientImpl& parent_;

private:
Expand Down Expand Up @@ -148,9 +149,9 @@ class AsyncStreamImpl : public AsyncClient::Stream,
};

void cleanup();

void closeLocal(bool end_stream);
void closeRemote(bool end_stream);
bool complete() { return local_closed_ && remote_closed_; }

// Http::StreamDecoderFilterCallbacks
void addResetStreamCallback(std::function<void()> callback) override {
Expand Down
16 changes: 16 additions & 0 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,22 @@ TEST_F(AsyncClientImplTest, PoolFailure) {
EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("upstream_rq_503").value());
}

TEST_F(AsyncClientImplTest, PoolFailureWithBody) {
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](StreamDecoder&,
ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* {
callbacks.onPoolFailure(ConnectionPool::PoolFailureReason::Overflow, nullptr);
return nullptr;
}));

expectSuccess(503);
message_->body(Buffer::InstancePtr{new Buffer::OwnedImpl("hello")});
EXPECT_EQ(nullptr,
client_.send(std::move(message_), callbacks_, Optional<std::chrono::milliseconds>()));

EXPECT_EQ(1UL, cm_.cluster_.info_->stats_store_.counter("upstream_rq_503").value());
}

TEST_F(AsyncClientImplTest, StreamTimeout) {
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](StreamDecoder&, ConnectionPool::Callbacks& callbacks)
Expand Down