From 7d703b617c66156a11c23b61d9307a28a4bd5b1f Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Fri, 31 May 2019 10:49:14 -0700 Subject: [PATCH 1/2] Delay connection reuse for a poll cycle to catch closed connections. Signed-off-by: John Plevyak --- source/common/http/codec_client.h | 5 ++ source/common/http/http1/conn_pool.cc | 55 ++++++++++--- source/common/http/http1/conn_pool.h | 3 + test/common/http/http1/conn_pool_test.cc | 98 ++++++++++++++++++++---- 4 files changed, 134 insertions(+), 27 deletions(-) diff --git a/source/common/http/codec_client.h b/source/common/http/codec_client.h index 0f4dff8bd8ab8..caae81485fccd 100644 --- a/source/common/http/codec_client.h +++ b/source/common/http/codec_client.h @@ -78,6 +78,11 @@ class CodecClient : Logger::Loggable, */ uint64_t id() { return connection_->id(); } + /** + * @return the underlying codec protocol. + */ + Protocol protocol() { return codec_->protocol(); } + /** * @return the underlying connection error. */ diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index d8ad735f9d249..e2ce5549d1263 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -30,6 +30,10 @@ ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSha upstream_ready_timer_(dispatcher_.createTimer([this]() { onUpstreamReady(); })) {} ConnPoolImpl::~ConnPoolImpl() { + while (!delayed_clients_.empty()) { + delayed_clients_.front()->codec_client_->close(); + } + while (!ready_clients_.empty()) { ready_clients_.front()->codec_client_->close(); } @@ -147,7 +151,12 @@ void ConnPoolImpl::onConnectionEvent(ActiveClient& client, Network::ConnectionEv } else if (!client.connect_timer_) { // The connect timer is destroyed on connect. The lack of a connect timer means that this // client is idle and in the ready pool. 
- removed = client.removeFromList(ready_clients_); + if (client.delayed_) { + client.delayed_ = 0; + removed = client.removeFromList(delayed_clients_); + } else { + removed = client.removeFromList(ready_clients_); + } check_for_drained = false; } else { // The only time this happens is if we actually saw a connect failure. @@ -203,8 +212,10 @@ void ConnPoolImpl::onResponseComplete(ActiveClient& client) { if (!client.stream_wrapper_->encode_complete_) { ENVOY_CONN_LOG(debug, "response before request complete", *client.codec_client_); onDownstreamReset(client); - } else if (client.stream_wrapper_->saw_close_header_ || client.codec_client_->remoteClosed()) { - ENVOY_CONN_LOG(debug, "saw upstream connection: close", *client.codec_client_); + } else if (client.stream_wrapper_->saw_close_header_ || client.codec_client_->remoteClosed() || + (client.codec_client_->protocol() == Protocol::Http10 && + !client.stream_wrapper_->saw_keep_alive_header_)) { + ENVOY_CONN_LOG(debug, "saw upstream close connection", *client.codec_client_); onDownstreamReset(client); } else if (client.remaining_requests_ > 0 && --client.remaining_requests_ == 0) { ENVOY_CONN_LOG(debug, "maximum requests per connection", *client.codec_client_); @@ -220,6 +231,20 @@ void ConnPoolImpl::onResponseComplete(ActiveClient& client) { void ConnPoolImpl::onUpstreamReady() { upstream_ready_enabled_ = false; + auto it = delayed_clients_.begin(); + while (it != delayed_clients_.end()) { + ActiveClient& client = **it; + it++; // Move forward before moveBetweenLists which would invalidate 'it'. 
+ client.delayed_--; + if (client.delayed_ == 0) { + ENVOY_CONN_LOG(debug, "moving from delay to ready", *client.codec_client_); + client.moveBetweenLists(delayed_clients_, ready_clients_); + } + } + if (!delayed_clients_.empty()) { + upstream_ready_enabled_ = true; + upstream_ready_timer_->enableTimer(std::chrono::milliseconds(0)); + } while (!pending_requests_.empty() && !ready_clients_.empty()) { ActiveClient& client = *ready_clients_.front(); ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_); @@ -234,7 +259,13 @@ void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) { client.stream_wrapper_.reset(); - if (pending_requests_.empty() || delay) { + if (delay) { + ENVOY_CONN_LOG(debug, "moving to delay", *client.codec_client_); + // N.B. libevent does not guarantee ordering of events, so to ensure that the delayed client + // experiences a poll cycle before being made ready, delay for 2 event loops. + client.delayed_ = 2; + client.moveBetweenLists(busy_clients_, delayed_clients_); + } else if (pending_requests_.empty()) { // There is nothing to service, so just move the connection // into the ready list.
ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_); @@ -248,7 +279,7 @@ void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) { pending_requests_.pop_back(); } - if (delay && !pending_requests_.empty() && !upstream_ready_enabled_) { + if (!delayed_clients_.empty() && !upstream_ready_enabled_) { upstream_ready_enabled_ = true; upstream_ready_timer_->enableTimer(std::chrono::milliseconds(0)); } @@ -273,11 +304,15 @@ ConnPoolImpl::StreamWrapper::~StreamWrapper() { void ConnPoolImpl::StreamWrapper::onEncodeComplete() { encode_complete_ = true; } void ConnPoolImpl::StreamWrapper::decodeHeaders(HeaderMapPtr&& headers, bool end_stream) { - if (headers->Connection() && - absl::EqualsIgnoreCase(headers->Connection()->value().getStringView(), - Headers::get().ConnectionValues.Close)) { - saw_close_header_ = true; - parent_.parent_.host_->cluster().stats().upstream_cx_close_notify_.inc(); + if (headers->Connection()) { + if (absl::EqualsIgnoreCase(headers->Connection()->value().getStringView(), + Headers::get().ConnectionValues.Close)) { + saw_close_header_ = true; + parent_.parent_.host_->cluster().stats().upstream_cx_close_notify_.inc(); + } else if (absl::EqualsIgnoreCase(headers->Connection()->value().getStringView(), + Headers::get().ConnectionValues.KeepAlive)) { + saw_keep_alive_header_ = true; + } } if (!saw_close_header_ && headers->ProxyConnection() && absl::EqualsIgnoreCase(headers->ProxyConnection()->value().getStringView(), diff --git a/source/common/http/http1/conn_pool.h b/source/common/http/http1/conn_pool.h index 9f28b0ac8c587..47209233d347a 100644 --- a/source/common/http/http1/conn_pool.h +++ b/source/common/http/http1/conn_pool.h @@ -74,6 +74,7 @@ class ConnPoolImpl : public ConnectionPool::Instance, public ConnPoolImplBase { ActiveClient& parent_; bool encode_complete_{}; bool saw_close_header_{}; + bool saw_keep_alive_header_{}; bool decode_complete_{}; }; @@ -101,6 +102,7 @@ class ConnPoolImpl : public 
ConnectionPool::Instance, public ConnPoolImplBase { Event::TimerPtr connect_timer_; Stats::TimespanPtr conn_length_; uint64_t remaining_requests_; + int delayed_{0}; }; typedef std::unique_ptr ActiveClientPtr; @@ -117,6 +119,7 @@ class ConnPoolImpl : public ConnectionPool::Instance, public ConnPoolImplBase { Stats::TimespanPtr conn_connect_ms_; Event::Dispatcher& dispatcher_; + std::list delayed_clients_; std::list ready_clients_; std::list busy_clients_; std::list drained_callbacks_; diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index 51ccfbabc07c8..8de6da39aca31 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -101,9 +101,13 @@ class ConnPoolImplForTest : public ConnPoolImpl { EXPECT_CALL(*test_client.connect_timer_, enableTimer(_)); } - void expectEnableUpstreamReady() { + void expectUpstreamReadyEnableTimer(int times = 1) { + EXPECT_CALL(*mock_upstream_ready_timer_, enableTimer(_)).Times(times).RetiresOnSaturation(); + } + + void expectEnableUpstreamReady(int times = 1) { EXPECT_FALSE(upstream_ready_enabled_); - EXPECT_CALL(*mock_upstream_ready_timer_, enableTimer(_)).Times(1).RetiresOnSaturation(); + expectUpstreamReadyEnableTimer(times); } void expectAndRunUpstreamReady() { @@ -112,6 +116,12 @@ class ConnPoolImplForTest : public ConnPoolImpl { EXPECT_FALSE(upstream_ready_enabled_); } + void expectAndRunUpstreamReadyStillReady() { + EXPECT_TRUE(upstream_ready_enabled_); + mock_upstream_ready_timer_->callback_(); + EXPECT_TRUE(upstream_ready_enabled_); + } + Api::ApiPtr api_; Event::MockDispatcher& mock_dispatcher_; NiceMock* mock_upstream_ready_timer_; @@ -186,6 +196,18 @@ struct ActiveTestRequest { } } + void completeKeepAliveResponse(bool with_body) { + // Test additional metric writes also. 
+ Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{ + {"connection", "keep-alive"}, {":status", "200"}, {"x-envoy-upstream-canary", "true"}}); + + inner_decoder_->decodeHeaders(std::move(response_headers), !with_body); + if (with_body) { + Buffer::OwnedImpl data; + inner_decoder_->decodeData(data, true); + } + } + void expectNewStream() { EXPECT_CALL(*parent_.conn_pool_.test_clients_[client_index_].codec_, newStream(_)) .WillOnce(DoAll(SaveArgAddress(&inner_decoder_), ReturnRef(request_encoder_))); @@ -275,12 +297,15 @@ TEST_F(Http1ConnPoolImplTest, MultipleRequestAndResponse) { // Request 1 should kick off a new connection. ActiveTestRequest r1(*this, 0, ActiveTestRequest::Type::CreateConnection); r1.startRequest(); - r1.completeResponse(false); + r1.completeKeepAliveResponse(false); + + conn_pool_.expectAndRunUpstreamReadyStillReady(); + conn_pool_.expectAndRunUpstreamReady(); // Request 2 should not. ActiveTestRequest r2(*this, 0, ActiveTestRequest::Type::Immediate); r2.startRequest(); - r2.completeResponse(true); + r2.completeKeepAliveResponse(true); // Cause the connection to go away. EXPECT_CALL(conn_pool_, onClientDestroy()); @@ -463,19 +488,23 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) { conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); - // Finishing request 1 will immediately bind to request 2. - conn_pool_.expectEnableUpstreamReady(); + // Finishing request 1 will bind to request 2. 
+ conn_pool_.expectEnableUpstreamReady(2); EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_)) .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); EXPECT_CALL(callbacks2.pool_ready_, ready()); callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); - Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); + Http::HeaderMapPtr response_headers( + new TestHeaderMapImpl{{":status", "200"}, {"connection", "keep-alive"}}); inner_decoder->decodeHeaders(std::move(response_headers), true); + conn_pool_.expectAndRunUpstreamReadyStillReady(); conn_pool_.expectAndRunUpstreamReady(); + + conn_pool_.expectUpstreamReadyEnableTimer(); // The connection will be added to the delay list. callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); - response_headers.reset(new TestHeaderMapImpl{{":status", "200"}}); + response_headers.reset(new TestHeaderMapImpl{{":status", "200"}, {"connection", "keep-alive"}}); inner_decoder->decodeHeaders(std::move(response_headers), true); // Cause the connection to go away. @@ -520,7 +549,8 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) { conn_pool_.expectEnableUpstreamReady(); callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); - Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); + Http::HeaderMapPtr response_headers( + new TestHeaderMapImpl{{":status", "200"}, {"connection", "keep-alive"}}); inner_decoder->decodeHeaders(std::move(response_headers), true); // Cause the connection to go away. 
@@ -529,15 +559,13 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) { conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); dispatcher_.clearDeferredDeleteList(); - conn_pool_.expectAndRunUpstreamReady(); - EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_)) .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); EXPECT_CALL(callbacks2.pool_ready_, ready()); conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); - response_headers.reset(new TestHeaderMapImpl{{":status", "200"}}); + response_headers.reset(new TestHeaderMapImpl{{":status", "200"}, {"connection", "keep-alive"}}); inner_decoder->decodeHeaders(std::move(response_headers), true); EXPECT_CALL(conn_pool_, onClientDestroy()); @@ -611,6 +639,39 @@ TEST_F(Http1ConnPoolImplTest, ProxyConnectionCloseHeader) { EXPECT_EQ(0U, cluster_->stats_.upstream_cx_destroy_with_active_rq_.value()); } +/** + * Test when upstream is HTTP/1.0 and does not send 'connection: keep-alive' + */ +TEST_F(Http1ConnPoolImplTest, NoConnectionKeepAlive) { + InSequence s; + + // Request 1 should kick off a new connection. + NiceMock outer_decoder; + ConnPoolCallbacks callbacks; + conn_pool_.expectClientCreate(); + Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder, callbacks); + + EXPECT_NE(nullptr, handle); + + NiceMock request_encoder; + Http::StreamDecoder* inner_decoder; + EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_)) + .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); + EXPECT_CALL(callbacks.pool_ready_, ready()); + + conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); + callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); + + // Response without 'connection: keep-alive' which should cause the connection to go away. 
+ EXPECT_CALL(conn_pool_, onClientDestroy()); + Http::HeaderMapPtr response_headers( + new TestHeaderMapImpl{{":protocol", "HTTP/1.0"}, {":status", "200"}}); + inner_decoder->decodeHeaders(std::move(response_headers), true); + dispatcher_.clearDeferredDeleteList(); + + EXPECT_EQ(0U, cluster_->stats_.upstream_cx_destroy_with_active_rq_.value()); +} + /** * Test when we reach max requests per connection. */ @@ -638,7 +699,8 @@ TEST_F(Http1ConnPoolImplTest, MaxRequestsPerConnection) { // Response with 'connection: close' which should cause the connection to go away. EXPECT_CALL(conn_pool_, onClientDestroy()); - Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); + Http::HeaderMapPtr response_headers( + new TestHeaderMapImpl{{":status", "200"}, {"connection", "keep-alive"}}); inner_decoder->decodeHeaders(std::move(response_headers), true); dispatcher_.clearDeferredDeleteList(); @@ -659,16 +721,18 @@ TEST_F(Http1ConnPoolImplTest, ConcurrentConnections) { ActiveTestRequest r3(*this, 0, ActiveTestRequest::Type::Pending); // Finish r1, which gets r3 going. - conn_pool_.expectEnableUpstreamReady(); + conn_pool_.expectEnableUpstreamReady(2); r3.expectNewStream(); - r1.completeResponse(false); + r1.completeKeepAliveResponse(false); + conn_pool_.expectAndRunUpstreamReadyStillReady(); conn_pool_.expectAndRunUpstreamReady(); r3.startRequest(); EXPECT_EQ(3U, cluster_->stats_.upstream_rq_total_.value()); - r2.completeResponse(false); - r3.completeResponse(false); + conn_pool_.expectUpstreamReadyEnableTimer(); // The connections will be added to the delay list. + r2.completeKeepAliveResponse(false); + r3.completeKeepAliveResponse(false); // Disconnect both clients. EXPECT_CALL(conn_pool_, onClientDestroy()).Times(2); From 6f02b78b00dca184e3fe2eadf6137ff29f03200a Mon Sep 17 00:00:00 2001 From: John Plevyak Date: Wed, 5 Jun 2019 15:53:18 -0700 Subject: [PATCH 2/2] Merge. 
Signed-off-by: John Plevyak --- source/common/http/http1/conn_pool.cc | 8 ++++++ test/common/http/http1/conn_pool_test.cc | 31 ++++++------------------ 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index 4c299b155db51..d91c536739213 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -47,6 +47,10 @@ ConnPoolImpl::~ConnPoolImpl() { } void ConnPoolImpl::drainConnections() { + while (!delayed_clients_.empty()) { + delayed_clients_.front()->codec_client_->close(); + } + while (!ready_clients_.empty()) { ready_clients_.front()->codec_client_->close(); } @@ -78,6 +82,10 @@ void ConnPoolImpl::attachRequestToClient(ActiveClient& client, StreamDecoder& re void ConnPoolImpl::checkForDrained() { if (!drained_callbacks_.empty() && pending_requests_.empty() && busy_clients_.empty()) { + while (!delayed_clients_.empty()) { + delayed_clients_.front()->codec_client_->close(); + } + while (!ready_clients_.empty()) { ready_clients_.front()->codec_client_->close(); } diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index c0667865d2a24..01a60c7e8080f 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -197,18 +197,6 @@ struct ActiveTestRequest { } } - void completeKeepAliveResponse(bool with_body) { - // Test additional metric writes also. 
- Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{ - {"connection", "keep-alive"}, {":status", "200"}, {"x-envoy-upstream-canary", "true"}}); - - inner_decoder_->decodeHeaders(std::move(response_headers), !with_body); - if (with_body) { - Buffer::OwnedImpl data; - inner_decoder_->decodeData(data, true); - } - } - void expectNewStream() { EXPECT_CALL(*parent_.conn_pool_.test_clients_[client_index_].codec_, newStream(_)) .WillOnce(DoAll(SaveArgAddress(&inner_decoder_), ReturnRef(request_encoder_))); @@ -303,7 +291,7 @@ TEST_F(Http1ConnPoolImplTest, MultipleRequestAndResponse) { // Request 1 should kick off a new connection. ActiveTestRequest r1(*this, 0, ActiveTestRequest::Type::CreateConnection); r1.startRequest(); - r1.completeKeepAliveResponse(false); + r1.completeResponse(false); conn_pool_.expectAndRunUpstreamReadyStillReady(); conn_pool_.expectAndRunUpstreamReady(); @@ -311,7 +299,7 @@ TEST_F(Http1ConnPoolImplTest, MultipleRequestAndResponse) { // Request 2 should not. ActiveTestRequest r2(*this, 0, ActiveTestRequest::Type::Immediate); r2.startRequest(); - r2.completeKeepAliveResponse(true); + r2.completeResponse(true); // Cause the connection to go away. 
EXPECT_CALL(conn_pool_, onClientDestroy()); @@ -501,8 +489,7 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) { EXPECT_CALL(callbacks2.pool_ready_, ready()); callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); - Http::HeaderMapPtr response_headers( - new TestHeaderMapImpl{{":status", "200"}, {"connection", "keep-alive"}}); + Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); inner_decoder->decodeHeaders(std::move(response_headers), true); conn_pool_.expectAndRunUpstreamReadyStillReady(); @@ -557,8 +544,7 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) { conn_pool_.expectEnableUpstreamReady(); callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); - Http::HeaderMapPtr response_headers( - new TestHeaderMapImpl{{":status", "200"}, {"connection", "keep-alive"}}); + Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); inner_decoder->decodeHeaders(std::move(response_headers), true); // Cause the connection to go away. @@ -709,8 +695,7 @@ TEST_F(Http1ConnPoolImplTest, MaxRequestsPerConnection) { // Response with 'connection: close' which should cause the connection to go away. 
EXPECT_CALL(conn_pool_, onClientDestroy()); - Http::HeaderMapPtr response_headers( - new TestHeaderMapImpl{{":status", "200"}, {"connection", "keep-alive"}}); + Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); inner_decoder->decodeHeaders(std::move(response_headers), true); dispatcher_.clearDeferredDeleteList(); @@ -734,15 +719,15 @@ TEST_F(Http1ConnPoolImplTest, ConcurrentConnections) { conn_pool_.expectEnableUpstreamReady(2); r3.expectNewStream(); - r1.completeKeepAliveResponse(false); + r1.completeResponse(false); conn_pool_.expectAndRunUpstreamReadyStillReady(); conn_pool_.expectAndRunUpstreamReady(); r3.startRequest(); EXPECT_EQ(3U, cluster_->stats_.upstream_rq_total_.value()); conn_pool_.expectUpstreamReadyEnableTimer(); // The connections will be added to the delay list. - r2.completeKeepAliveResponse(false); - r3.completeKeepAliveResponse(false); + r2.completeResponse(false); + r3.completeResponse(false); // Disconnect both clients. EXPECT_CALL(conn_pool_, onClientDestroy()).Times(2);