Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSha
upstream_ready_timer_(dispatcher_.createTimer([this]() { onUpstreamReady(); })) {}

ConnPoolImpl::~ConnPoolImpl() {
while (!delayed_clients_.empty()) {
delayed_clients_.front()->codec_client_->close();
}

while (!ready_clients_.empty()) {
ready_clients_.front()->codec_client_->close();
}
Expand All @@ -43,6 +47,10 @@ ConnPoolImpl::~ConnPoolImpl() {
}

void ConnPoolImpl::drainConnections() {
while (!delayed_clients_.empty()) {
delayed_clients_.front()->codec_client_->close();
}

while (!ready_clients_.empty()) {
ready_clients_.front()->codec_client_->close();
}
Expand Down Expand Up @@ -74,6 +82,10 @@ void ConnPoolImpl::attachRequestToClient(ActiveClient& client, StreamDecoder& re

void ConnPoolImpl::checkForDrained() {
if (!drained_callbacks_.empty() && pending_requests_.empty() && busy_clients_.empty()) {
while (!delayed_clients_.empty()) {
delayed_clients_.front()->codec_client_->close();
}

while (!ready_clients_.empty()) {
ready_clients_.front()->codec_client_->close();
}
Expand Down Expand Up @@ -147,7 +159,12 @@ void ConnPoolImpl::onConnectionEvent(ActiveClient& client, Network::ConnectionEv
} else if (!client.connect_timer_) {
// The connect timer is destroyed on connect. The lack of a connect timer means that this
// client is idle and in the ready pool.
removed = client.removeFromList(ready_clients_);
if (client.delayed_) {
client.delayed_ = 0;
removed = client.removeFromList(delayed_clients_);
} else {
removed = client.removeFromList(ready_clients_);
}
check_for_drained = false;
} else {
// The only time this happens is if we actually saw a connect failure.
Expand Down Expand Up @@ -220,6 +237,20 @@ void ConnPoolImpl::onResponseComplete(ActiveClient& client) {

void ConnPoolImpl::onUpstreamReady() {
upstream_ready_enabled_ = false;
auto it = delayed_clients_.begin();
while (it != delayed_clients_.end()) {
ActiveClient& client = **it;
it++; // Move forward before moveBetweenLists which would invalidate 'it'.
client.delayed_--;
if (client.delayed_ == 0) {
ENVOY_CONN_LOG(debug, "moving from delay to ready", *client.codec_client_);
client.moveBetweenLists(delayed_clients_, ready_clients_);
}
}
if (!delayed_clients_.empty()) {
upstream_ready_enabled_ = true;
upstream_ready_timer_->enableTimer(std::chrono::milliseconds(0));
}
while (!pending_requests_.empty() && !ready_clients_.empty()) {
ActiveClient& client = *ready_clients_.front();
ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_);
Expand All @@ -234,7 +265,13 @@ void ConnPoolImpl::onUpstreamReady() {

void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) {
client.stream_wrapper_.reset();
if (pending_requests_.empty() || delay) {
if (delay) {
ENVOY_CONN_LOG(debug, "moving to delay", *client.codec_client_);
// N.B. libevent does not guarantee ordering of events, so to ensure that the delayed client
// experiences a poll cycle before being made ready, delay for 2 event loops.
client.delayed_ = 2;
client.moveBetweenLists(busy_clients_, delayed_clients_);
} else if (pending_requests_.empty()) {
// There is nothing to service, so just move the connection into the ready list.
ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_);
Expand All @@ -248,7 +285,7 @@ void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) {
pending_requests_.pop_back();
}

if (delay && !pending_requests_.empty() && !upstream_ready_enabled_) {
if (!delayed_clients_.empty() && !upstream_ready_enabled_) {
upstream_ready_enabled_ = true;
upstream_ready_timer_->enableTimer(std::chrono::milliseconds(0));
}
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/http1/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class ConnPoolImpl : public ConnectionPool::Instance, public ConnPoolImplBase {
Event::TimerPtr connect_timer_;
Stats::TimespanPtr conn_length_;
uint64_t remaining_requests_;
int delayed_{0};
};

typedef std::unique_ptr<ActiveClient> ActiveClientPtr;
Expand All @@ -118,6 +119,7 @@ class ConnPoolImpl : public ConnectionPool::Instance, public ConnPoolImplBase {

Stats::TimespanPtr conn_connect_ms_;
Event::Dispatcher& dispatcher_;
std::list<ActiveClientPtr> delayed_clients_;
std::list<ActiveClientPtr> ready_clients_;
std::list<ActiveClientPtr> busy_clients_;
std::list<DrainedCb> drained_callbacks_;
Expand Down
30 changes: 23 additions & 7 deletions test/common/http/http1/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,13 @@ class ConnPoolImplForTest : public ConnPoolImpl {
ON_CALL(*test_client.codec_, protocol()).WillByDefault(Return(protocol));
}

void expectEnableUpstreamReady() {
void expectUpstreamReadyEnableTimer(int times = 1) {
EXPECT_CALL(*mock_upstream_ready_timer_, enableTimer(_)).Times(times).RetiresOnSaturation();
}

void expectEnableUpstreamReady(int times = 1) {
EXPECT_FALSE(upstream_ready_enabled_);
EXPECT_CALL(*mock_upstream_ready_timer_, enableTimer(_)).Times(1).RetiresOnSaturation();
expectUpstreamReadyEnableTimer(times);
}

void expectAndRunUpstreamReady() {
Expand All @@ -113,6 +117,12 @@ class ConnPoolImplForTest : public ConnPoolImpl {
EXPECT_FALSE(upstream_ready_enabled_);
}

void expectAndRunUpstreamReadyStillReady() {
EXPECT_TRUE(upstream_ready_enabled_);
mock_upstream_ready_timer_->callback_();
EXPECT_TRUE(upstream_ready_enabled_);
}

Api::ApiPtr api_;
Event::MockDispatcher& mock_dispatcher_;
NiceMock<Event::MockTimer>* mock_upstream_ready_timer_;
Expand Down Expand Up @@ -283,6 +293,9 @@ TEST_F(Http1ConnPoolImplTest, MultipleRequestAndResponse) {
r1.startRequest();
r1.completeResponse(false);

conn_pool_.expectAndRunUpstreamReadyStillReady();
conn_pool_.expectAndRunUpstreamReady();

// Request 2 should not.
ActiveTestRequest r2(*this, 0, ActiveTestRequest::Type::Immediate);
r2.startRequest();
Expand Down Expand Up @@ -469,8 +482,8 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {

conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected);

// Finishing request 1 will immediately bind to request 2.
conn_pool_.expectEnableUpstreamReady();
// Finishing request 1 will bind to request 2.
conn_pool_.expectEnableUpstreamReady(2);
EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_))
.WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder)));
EXPECT_CALL(callbacks2.pool_ready_, ready());
Expand All @@ -479,7 +492,10 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) {
Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}});
inner_decoder->decodeHeaders(std::move(response_headers), true);

conn_pool_.expectAndRunUpstreamReadyStillReady();
conn_pool_.expectAndRunUpstreamReady();

conn_pool_.expectUpstreamReadyEnableTimer(); // The connection will be added to the delay list.
callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true);
// N.B. clang-tidy insists that we use std::make_unique, which cannot infer std::initializer_list.
response_headers = std::make_unique<TestHeaderMapImpl>(
Expand Down Expand Up @@ -537,8 +553,6 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) {
conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

conn_pool_.expectAndRunUpstreamReady();

EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_))
.WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder)));
EXPECT_CALL(callbacks2.pool_ready_, ready());
Expand Down Expand Up @@ -702,14 +716,16 @@ TEST_F(Http1ConnPoolImplTest, ConcurrentConnections) {
ActiveTestRequest r3(*this, 0, ActiveTestRequest::Type::Pending);

// Finish r1, which gets r3 going.
conn_pool_.expectEnableUpstreamReady();
conn_pool_.expectEnableUpstreamReady(2);
r3.expectNewStream();

r1.completeResponse(false);
conn_pool_.expectAndRunUpstreamReadyStillReady();
conn_pool_.expectAndRunUpstreamReady();
r3.startRequest();
EXPECT_EQ(3U, cluster_->stats_.upstream_rq_total_.value());

conn_pool_.expectUpstreamReadyEnableTimer(); // The connections will be added to the delay list.
r2.completeResponse(false);
r3.completeResponse(false);

Expand Down