diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index 5789ca5f0c699..d91c536739213 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -30,6 +30,10 @@ ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSha upstream_ready_timer_(dispatcher_.createTimer([this]() { onUpstreamReady(); })) {} ConnPoolImpl::~ConnPoolImpl() { + while (!delayed_clients_.empty()) { + delayed_clients_.front()->codec_client_->close(); + } + while (!ready_clients_.empty()) { ready_clients_.front()->codec_client_->close(); } @@ -43,6 +47,10 @@ ConnPoolImpl::~ConnPoolImpl() { } void ConnPoolImpl::drainConnections() { + while (!delayed_clients_.empty()) { + delayed_clients_.front()->codec_client_->close(); + } + while (!ready_clients_.empty()) { ready_clients_.front()->codec_client_->close(); } @@ -74,6 +82,10 @@ void ConnPoolImpl::attachRequestToClient(ActiveClient& client, StreamDecoder& re void ConnPoolImpl::checkForDrained() { if (!drained_callbacks_.empty() && pending_requests_.empty() && busy_clients_.empty()) { + while (!delayed_clients_.empty()) { + delayed_clients_.front()->codec_client_->close(); + } + while (!ready_clients_.empty()) { ready_clients_.front()->codec_client_->close(); } @@ -147,7 +159,12 @@ void ConnPoolImpl::onConnectionEvent(ActiveClient& client, Network::ConnectionEv } else if (!client.connect_timer_) { // The connect timer is destroyed on connect. The lack of a connect timer means that this // client is idle and in the ready pool. - removed = client.removeFromList(ready_clients_); + if (client.delayed_) { + client.delayed_ = 0; + removed = client.removeFromList(delayed_clients_); + } else { + removed = client.removeFromList(ready_clients_); + } check_for_drained = false; } else { // The only time this happens is if we actually saw a connect failure. 
@@ -220,6 +237,20 @@ void ConnPoolImpl::onResponseComplete(ActiveClient& client) { void ConnPoolImpl::onUpstreamReady() { upstream_ready_enabled_ = false; + auto it = delayed_clients_.begin(); + while (it != delayed_clients_.end()) { + ActiveClient& client = **it; + it++; // Move forward before moveBetweenLists which would invalidate 'it'. + client.delayed_--; + if (client.delayed_ == 0) { + ENVOY_CONN_LOG(debug, "moving from delay to ready", *client.codec_client_); + client.moveBetweenLists(delayed_clients_, ready_clients_); + } + } + if (!delayed_clients_.empty()) { + upstream_ready_enabled_ = true; + upstream_ready_timer_->enableTimer(std::chrono::milliseconds(0)); + } while (!pending_requests_.empty() && !ready_clients_.empty()) { ActiveClient& client = *ready_clients_.front(); ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_); @@ -234,7 +265,13 @@ void ConnPoolImpl::onUpstreamReady() { void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) { client.stream_wrapper_.reset(); - if (pending_requests_.empty() || delay) { + if (delay) { + ENVOY_CONN_LOG(debug, "moving to delay", *client.codec_client_); + // N.B. libevent does not guarantee ordering of events, so to ensure that the delayed client + // experiences a poll cycle before being made ready, delay for 2 event loops. + client.delayed_ = 2; + client.moveBetweenLists(busy_clients_, delayed_clients_); + } else if (pending_requests_.empty()) { // There is nothing to service or delayed processing is requested, so just move the connection // into the ready list. 
ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_); @@ -248,7 +285,7 @@ void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) { pending_requests_.pop_back(); } - if (delay && !pending_requests_.empty() && !upstream_ready_enabled_) { + if (!delayed_clients_.empty() && !upstream_ready_enabled_) { upstream_ready_enabled_ = true; upstream_ready_timer_->enableTimer(std::chrono::milliseconds(0)); } diff --git a/source/common/http/http1/conn_pool.h b/source/common/http/http1/conn_pool.h index fba5b83268e11..0f5b4c4f2faa6 100644 --- a/source/common/http/http1/conn_pool.h +++ b/source/common/http/http1/conn_pool.h @@ -102,6 +102,7 @@ class ConnPoolImpl : public ConnectionPool::Instance, public ConnPoolImplBase { Event::TimerPtr connect_timer_; Stats::TimespanPtr conn_length_; uint64_t remaining_requests_; + int delayed_{0}; }; typedef std::unique_ptr<ActiveClient> ActiveClientPtr; @@ -118,6 +119,7 @@ class ConnPoolImpl : public ConnectionPool::Instance, public ConnPoolImplBase { Stats::TimespanPtr conn_connect_ms_; Event::Dispatcher& dispatcher_; + std::list<ActiveClientPtr> delayed_clients_; std::list<ActiveClientPtr> ready_clients_; std::list<ActiveClientPtr> busy_clients_; std::list<DrainedCb> drained_callbacks_; diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index f06e06d4b02ec..01a60c7e8080f 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -102,9 +102,13 @@ class ConnPoolImplForTest : public ConnPoolImpl { ON_CALL(*test_client.codec_, protocol()).WillByDefault(Return(protocol)); } - void expectEnableUpstreamReady() { + void expectUpstreamReadyEnableTimer(int times = 1) { + EXPECT_CALL(*mock_upstream_ready_timer_, enableTimer(_)).Times(times).RetiresOnSaturation(); + } + + void expectEnableUpstreamReady(int times = 1) { EXPECT_FALSE(upstream_ready_enabled_); - EXPECT_CALL(*mock_upstream_ready_timer_, enableTimer(_)).Times(1).RetiresOnSaturation(); + expectUpstreamReadyEnableTimer(times); } void 
expectAndRunUpstreamReady() { @@ -113,6 +117,12 @@ class ConnPoolImplForTest : public ConnPoolImpl { EXPECT_FALSE(upstream_ready_enabled_); } + void expectAndRunUpstreamReadyStillReady() { + EXPECT_TRUE(upstream_ready_enabled_); + mock_upstream_ready_timer_->callback_(); + EXPECT_TRUE(upstream_ready_enabled_); + } + Api::ApiPtr api_; Event::MockDispatcher& mock_dispatcher_; NiceMock<Event::MockTimer>* mock_upstream_ready_timer_; @@ -283,6 +293,9 @@ TEST_F(Http1ConnPoolImplTest, MultipleRequestAndResponse) { r1.startRequest(); r1.completeResponse(false); + conn_pool_.expectAndRunUpstreamReadyStillReady(); + conn_pool_.expectAndRunUpstreamReady(); + // Request 2 should not. ActiveTestRequest r2(*this, 0, ActiveTestRequest::Type::Immediate); r2.startRequest(); @@ -469,8 +482,8 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) { conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); - // Finishing request 1 will immediately bind to request 2. - conn_pool_.expectEnableUpstreamReady(); + // Finishing request 1 will bind to request 2. + conn_pool_.expectEnableUpstreamReady(2); EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_)) .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); EXPECT_CALL(callbacks2.pool_ready_, ready()); @@ -479,7 +492,10 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) { Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); inner_decoder->decodeHeaders(std::move(response_headers), true); + conn_pool_.expectAndRunUpstreamReadyStillReady(); conn_pool_.expectAndRunUpstreamReady(); + + conn_pool_.expectUpstreamReadyEnableTimer(); // The connection will be added to the delay list. callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); // N.B. clang_tidy insists that we use std::make_unique which can not infer std::initializer_list. 
response_headers = std::make_unique<TestHeaderMapImpl>( @@ -537,8 +553,6 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) { conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); dispatcher_.clearDeferredDeleteList(); - conn_pool_.expectAndRunUpstreamReady(); - EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_)) .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); EXPECT_CALL(callbacks2.pool_ready_, ready()); @@ -702,14 +716,16 @@ TEST_F(Http1ConnPoolImplTest, ConcurrentConnections) { ActiveTestRequest r3(*this, 0, ActiveTestRequest::Type::Pending); // Finish r1, which gets r3 going. - conn_pool_.expectEnableUpstreamReady(); + conn_pool_.expectEnableUpstreamReady(2); r3.expectNewStream(); r1.completeResponse(false); + conn_pool_.expectAndRunUpstreamReadyStillReady(); conn_pool_.expectAndRunUpstreamReady(); r3.startRequest(); EXPECT_EQ(3U, cluster_->stats_.upstream_rq_total_.value()); + conn_pool_.expectUpstreamReadyEnableTimer(); // The connections will be added to the delay list. r2.completeResponse(false); r3.completeResponse(false);