From 11426feab2df2feb5b0cceb0e2b9876840a65c5a Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Thu, 22 Mar 2018 01:54:20 -0700 Subject: [PATCH 01/10] http: delaying attach pending requests Signed-off-by: Lizan Zhou --- source/common/http/http1/conn_pool.cc | 26 +++++++++----- source/common/http/http1/conn_pool.h | 2 ++ test/integration/http_integration.cc | 51 +++++++++++++++++++++++++++ test/integration/http_integration.h | 1 + test/integration/integration_test.cc | 4 +++ 5 files changed, 76 insertions(+), 8 deletions(-) diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index 41f76e1f97e55..3f4ae82040f6b 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -213,19 +213,29 @@ void ConnPoolImpl::onResponseComplete(ActiveClient& client) { } } -void ConnPoolImpl::processIdleClient(ActiveClient& client) { - client.stream_wrapper_.reset(); - if (pending_requests_.empty()) { - // There is nothing to service so just move the connection into the ready list. - ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_); - client.moveBetweenLists(busy_clients_, ready_clients_); - } else { +void ConnPoolImpl::onUpstreamReady() { + upstream_ready_posted_ = false; + while (!pending_requests_.empty() && !ready_clients_.empty()) { + ActiveClient& client = *ready_clients_.front(); + ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_); // There is work to do so bind a request to the client and move it to the busy list. Pending // requests are pushed onto the front, so pull from the back. - ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_); attachRequestToClient(client, pending_requests_.back()->decoder_, pending_requests_.back()->callbacks_); pending_requests_.pop_back(); + client.moveBetweenLists(ready_clients_, busy_clients_); + } +} + +void ConnPoolImpl::processIdleClient(ActiveClient& client) { + client.stream_wrapper_.reset(); + // There is nothing to service so just move the connection into the ready list. + ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_); + client.moveBetweenLists(busy_clients_, ready_clients_); + + if (!pending_requests_.empty() && !upstream_ready_posted_) { + upstream_ready_posted_ = true; + dispatcher_.post([this]() { onUpstreamReady(); }); } checkForDrained(); diff --git a/source/common/http/http1/conn_pool.h b/source/common/http/http1/conn_pool.h index 223367402ec09..f70e66b33c610 100644 --- a/source/common/http/http1/conn_pool.h +++ b/source/common/http/http1/conn_pool.h @@ -123,6 +123,7 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: void onDownstreamReset(ActiveClient& client); void onPendingRequestCancel(PendingRequest& request); void onResponseComplete(ActiveClient& client); + void onUpstreamReady(); void processIdleClient(ActiveClient& client); Stats::TimespanPtr conn_connect_ms_; @@ -134,6 +135,7 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: std::list drained_callbacks_; Upstream::ResourcePriority priority_; const Network::ConnectionSocket::OptionsSharedPtr socket_options_; + bool upstream_ready_posted_{false}; }; /** diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index fec6c45aa3442..f2afee3380630 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -936,6 +936,57 @@ void HttpIntegrationTest::testIdleTimeoutWithTwoRequests() { test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_idle_timeout", 1); } +void HttpIntegrationTest::testUpstreamDisconnectWithTwoRequests() { + initialize(); + fake_upstreams_[0]->set_allow_unexpected_disconnects(true); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Request 1. + codec_client_->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}}, + 1024, *response_); + waitForNextUpstreamRequest(); + + // Request 2. + IntegrationStreamDecoderPtr response2{new IntegrationStreamDecoder(*dispatcher_)}; + IntegrationCodecClientPtr codec_client2 = makeHttpConnection(lookupPort("http")); + codec_client2->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}}, + 512, *response2); + + // Response 1. + upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(512, true); + fake_upstream_connection_->close(); + response_->waitForEndStream(); + + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_TRUE(response_->complete()); + EXPECT_STREQ("200", response_->headers().Status()->value().c_str()); + test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_total", 1); + test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_200", 1); + + // Response 2. + fake_upstream_connection_.reset(); + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(1024, true); + response2->waitForEndStream(); + + codec_client2->close(); + + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_TRUE(response2->complete()); + EXPECT_STREQ("200", response2->headers().Status()->value().c_str()); + test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_total", 2); + test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_200", 2); +} + void HttpIntegrationTest::testTwoRequests() { initialize(); diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index f4e6e365f9906..0850446708817 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -128,6 +128,7 @@ class HttpIntegrationTest : public BaseIntegrationTest { void testIdleTimeoutBasic(); void testIdleTimeoutWithTwoRequests(); void testIdleTimerDisabled(); + void testUpstreamDisconnectWithTwoRequests(); // HTTP/1 tests void testBadFirstline(); void testMissingDelimiter(); diff --git a/test/integration/integration_test.cc b/test/integration/integration_test.cc index f1e36779c16f5..27942d3de3ec0 100644 --- a/test/integration/integration_test.cc +++ b/test/integration/integration_test.cc @@ -131,6 +131,10 @@ TEST_P(IntegrationTest, EnvoyProxyingLate100ContinueWithEncoderFilter) { TEST_P(IntegrationTest, TwoRequests) { testTwoRequests(); } +TEST_P(IntegrationTest, UpstreamDisconnectWithTwoRequests) { + testUpstreamDisconnectWithTwoRequests(); +} + TEST_P(IntegrationTest, RetryHittingBufferLimit) { testRetryHittingBufferLimit(); } TEST_P(IntegrationTest, HittingDecoderFilterLimit) { testHittingDecoderFilterLimit(); } From 59ba6957d796a8a34cd8952de136e9f525576ca2 Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Fri, 23 Mar 2018 01:17:00 -0700 Subject: [PATCH 02/10] Use dedicated timer for conn pool Signed-off-by: Lizan Zhou --- source/common/http/http1/conn_pool.cc | 14 ++++++++++---- source/common/http/http1/conn_pool.h | 7 ++++--- test/common/http/http1/conn_pool_test.cc | 15 ++++++++++++--- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index 3f4ae82040f6b..c7ebfe3caefb8 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -20,6 +20,12 @@ namespace Envoy { namespace Http { namespace Http1 { +ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSharedPtr host, + Upstream::ResourcePriority priority, + const Network::ConnectionSocket::OptionsSharedPtr& options) + : dispatcher_(dispatcher), host_(host), priority_(priority), socket_options_(options), + upstream_ready_timer_(dispatcher_.createTimer([this]() { onUpstreamReady(); })) {} + ConnPoolImpl::~ConnPoolImpl() { while (!ready_clients_.empty()) { ready_clients_.front()->codec_client_->close(); @@ -214,7 +220,7 @@ void ConnPoolImpl::onResponseComplete(ActiveClient& client) { } void ConnPoolImpl::onUpstreamReady() { - upstream_ready_posted_ = false; + upstream_ready_enabled_ = false; while (!pending_requests_.empty() && !ready_clients_.empty()) { ActiveClient& client = *ready_clients_.front(); ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_); @@ -233,9 +239,9 @@ void ConnPoolImpl::processIdleClient(ActiveClient& client) { ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_); client.moveBetweenLists(busy_clients_, ready_clients_); - if (!pending_requests_.empty() && !upstream_ready_posted_) { - upstream_ready_posted_ = true; - dispatcher_.post([this]() { onUpstreamReady(); }); + if (!pending_requests_.empty() && !upstream_ready_enabled_) { + upstream_ready_enabled_ = true; + upstream_ready_timer_->enableTimer(std::chrono::milliseconds(0)); } checkForDrained(); diff --git a/source/common/http/http1/conn_pool.h b/source/common/http/http1/conn_pool.h index f70e66b33c610..4f6293618b448 100644 --- a/source/common/http/http1/conn_pool.h +++ b/source/common/http/http1/conn_pool.h @@ -31,8 +31,7 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: public: ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority, - const Network::ConnectionSocket::OptionsSharedPtr& options) - : dispatcher_(dispatcher), host_(host), priority_(priority), socket_options_(options) {} + const Network::ConnectionSocket::OptionsSharedPtr& options); ~ConnPoolImpl(); @@ -135,7 +134,9 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: std::list drained_callbacks_; Upstream::ResourcePriority priority_; const Network::ConnectionSocket::OptionsSharedPtr socket_options_; - bool upstream_ready_posted_{false}; + + Event::TimerPtr upstream_ready_timer_; + bool upstream_ready_enabled_{false}; }; /** diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index 20165a335f920..103f3ca114e2e 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -42,10 +42,15 @@ namespace Http1 { class ConnPoolImplForTest : public ConnPoolImpl { public: ConnPoolImplForTest(Event::MockDispatcher& dispatcher, - Upstream::ClusterInfoConstSharedPtr cluster) + Upstream::ClusterInfoConstSharedPtr cluster, + NiceMock* upstream_ready_timer) : ConnPoolImpl(dispatcher, Upstream::makeTestHost(cluster, "tcp://127.0.0.1:9000"), Upstream::ResourcePriority::Default, nullptr), - mock_dispatcher_(dispatcher) {} + mock_dispatcher_(dispatcher), mock_upstream_ready_timer_(upstream_ready_timer) { + ON_CALL(*mock_upstream_ready_timer_, enableTimer(std::chrono::milliseconds(0))) + .WillByDefault(Invoke( + [&](const std::chrono::milliseconds&) { mock_upstream_ready_timer_->callback_(); })); + } ~ConnPoolImplForTest() { EXPECT_EQ(0U, ready_clients_.size()); @@ -99,6 +104,7 @@ class ConnPoolImplForTest : public ConnPoolImpl { } Event::MockDispatcher& mock_dispatcher_; + NiceMock* mock_upstream_ready_timer_; std::vector test_clients_; }; @@ -107,7 +113,9 @@ class ConnPoolImplForTest : public ConnPoolImpl { */ class Http1ConnPoolImplTest : public testing::Test { public: - Http1ConnPoolImplTest() : conn_pool_(dispatcher_, cluster_) {} + Http1ConnPoolImplTest() + : upstream_ready_timer_(new NiceMock(&dispatcher_)), + conn_pool_(dispatcher_, cluster_, upstream_ready_timer_) {} ~Http1ConnPoolImplTest() { // Make sure all gauges are 0. @@ -118,6 +126,7 @@ class Http1ConnPoolImplTest : public testing::Test { NiceMock dispatcher_; std::shared_ptr cluster_{new NiceMock()}; + NiceMock* upstream_ready_timer_; ConnPoolImplForTest conn_pool_; NiceMock runtime_; }; From 8bc58a4dca69ebe3c9237ef86a0462645a9056eb Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Fri, 23 Mar 2018 02:26:09 -0700 Subject: [PATCH 03/10] fix tsan Signed-off-by: Lizan Zhou --- test/integration/http_integration.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index f2afee3380630..e6ade5e13505f 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -972,12 +972,12 @@ void HttpIntegrationTest::testUpstreamDisconnectWithTwoRequests() { test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_200", 1); // Response 2. + fake_upstream_connection_->waitForDisconnect(); fake_upstream_connection_.reset(); waitForNextUpstreamRequest(); upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); upstream_request_->encodeData(1024, true); response2->waitForEndStream(); - codec_client2->close(); EXPECT_TRUE(upstream_request_->complete()); From ea7e7d04d2822113e6dfe19b9dfc6a209fe2ba12 Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Tue, 27 Mar 2018 17:40:42 -0700 Subject: [PATCH 04/10] explicity test timer Signed-off-by: Lizan Zhou --- test/common/http/http1/conn_pool_test.cc | 36 ++++++++++++++++++++---- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index 103f3ca114e2e..ac199b61078c1 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -46,11 +46,7 @@ class ConnPoolImplForTest : public ConnPoolImpl { NiceMock* upstream_ready_timer) : ConnPoolImpl(dispatcher, Upstream::makeTestHost(cluster, "tcp://127.0.0.1:9000"), Upstream::ResourcePriority::Default, nullptr), - mock_dispatcher_(dispatcher), mock_upstream_ready_timer_(upstream_ready_timer) { - ON_CALL(*mock_upstream_ready_timer_, enableTimer(std::chrono::milliseconds(0))) - .WillByDefault(Invoke( - [&](const std::chrono::milliseconds&) { mock_upstream_ready_timer_->callback_(); })); - } + mock_dispatcher_(dispatcher), mock_upstream_ready_timer_(upstream_ready_timer) {} ~ConnPoolImplForTest() { EXPECT_EQ(0U, ready_clients_.size()); @@ -103,6 +99,19 @@ class ConnPoolImplForTest : public ConnPoolImpl { EXPECT_CALL(*test_client.connect_timer_, enableTimer(_)); } + void expectEnableUpstreamReady() { + EXPECT_FALSE(upstream_ready_enabled_); + EXPECT_CALL(*mock_upstream_ready_timer_, enableTimer(_)) + .Times(1) + .RetiresOnSaturation(); + } + + void expectAndRunUpstreamReady() { + EXPECT_TRUE(upstream_ready_enabled_); + mock_upstream_ready_timer_->callback_(); + EXPECT_FALSE(upstream_ready_enabled_); + } + Event::MockDispatcher& mock_dispatcher_; NiceMock* mock_upstream_ready_timer_; std::vector test_clients_; @@ -156,9 +165,11 @@ struct ActiveTestRequest { if (type == Type::CreateConnection) { EXPECT_CALL(*parent_.conn_pool_.test_clients_[client_index_].connect_timer_, disableTimer()); + parent.conn_pool_.expectEnableUpstreamReady(); expectNewStream(); parent.conn_pool_.test_clients_[client_index_].connection_->raiseEvent( Network::ConnectionEvent::Connected); + parent.conn_pool_.expectAndRunUpstreamReady(); } } @@ -392,6 +403,7 @@ TEST_F(Http1ConnPoolImplTest, DisconnectWhileBound) { NiceMock outer_decoder; ConnPoolCallbacks callbacks; conn_pool_.expectClientCreate(); + conn_pool_.expectEnableUpstreamReady(); Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder, callbacks); EXPECT_NE(nullptr, handle); @@ -402,6 +414,7 @@ TEST_F(Http1ConnPoolImplTest, DisconnectWhileBound) { EXPECT_CALL(callbacks.pool_ready_, ready()); conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); + conn_pool_.expectAndRunUpstreamReady(); // We should get a reset callback when the connection disconnects. Http::MockStreamCallbacks stream_callbacks; @@ -424,6 +437,7 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) { NiceMock outer_decoder1; ConnPoolCallbacks callbacks; conn_pool_.expectClientCreate(); + conn_pool_.expectEnableUpstreamReady(); Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder1, callbacks); EXPECT_NE(nullptr, handle); @@ -444,8 +458,10 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) { EXPECT_CALL(callbacks.pool_ready_, ready()); conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); + conn_pool_.expectAndRunUpstreamReady(); // Finishing request 1 will immediately bind to request 2. + conn_pool_.expectEnableUpstreamReady(); EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_)) .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); EXPECT_CALL(callbacks2.pool_ready_, ready()); @@ -454,6 +470,7 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) { Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); inner_decoder->decodeHeaders(std::move(response_headers), true); + conn_pool_.expectAndRunUpstreamReady(); callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); response_headers.reset(new TestHeaderMapImpl{{":status", "200"}}); inner_decoder->decodeHeaders(std::move(response_headers), true); @@ -474,6 +491,7 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseHeader) { NiceMock outer_decoder; ConnPoolCallbacks callbacks; conn_pool_.expectClientCreate(); + conn_pool_.expectEnableUpstreamReady(); Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder, callbacks); EXPECT_NE(nullptr, handle); @@ -485,6 +503,7 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseHeader) { EXPECT_CALL(callbacks.pool_ready_, ready()); conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); + conn_pool_.expectAndRunUpstreamReady(); callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); // Response with 'connection: close' which should cause the connection to go away. @@ -509,6 +528,7 @@ TEST_F(Http1ConnPoolImplTest, MaxRequestsPerConnection) { NiceMock outer_decoder; ConnPoolCallbacks callbacks; conn_pool_.expectClientCreate(); + conn_pool_.expectEnableUpstreamReady(); Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder, callbacks); EXPECT_NE(nullptr, handle); @@ -520,6 +540,7 @@ TEST_F(Http1ConnPoolImplTest, MaxRequestsPerConnection) { EXPECT_CALL(callbacks.pool_ready_, ready()); conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); + conn_pool_.expectAndRunUpstreamReady(); callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); // Response with 'connection: close' which should cause the connection to go away. @@ -546,8 +567,11 @@ TEST_F(Http1ConnPoolImplTest, ConcurrentConnections) { ActiveTestRequest r3(*this, 0, ActiveTestRequest::Type::Pending); // Finish r1, which gets r3 going. + conn_pool_.expectEnableUpstreamReady(); r3.expectNewStream(); + r1.completeResponse(false); + conn_pool_.expectAndRunUpstreamReady(); r3.startRequest(); r2.completeResponse(false); @@ -613,10 +637,12 @@ TEST_F(Http1ConnPoolImplTest, RemoteCloseToCompleteResponse) { NiceMock request_encoder; Http::StreamDecoder* inner_decoder; EXPECT_CALL(*conn_pool_.test_clients_[0].connect_timer_, disableTimer()); + conn_pool_.expectEnableUpstreamReady(); EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_)) .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); EXPECT_CALL(callbacks.pool_ready_, ready()); conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); + conn_pool_.expectAndRunUpstreamReady(); callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); From 0d931ce83e810d9a92c5e87f3d81d870b4fc6017 Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Tue, 27 Mar 2018 21:54:15 -0700 Subject: [PATCH 05/10] fix format Signed-off-by: Lizan Zhou --- test/common/http/http1/conn_pool_test.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index ac199b61078c1..c36d1e647fe85 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -101,9 +101,7 @@ class ConnPoolImplForTest : public ConnPoolImpl { void expectEnableUpstreamReady() { EXPECT_FALSE(upstream_ready_enabled_); - EXPECT_CALL(*mock_upstream_ready_timer_, enableTimer(_)) - .Times(1) - .RetiresOnSaturation(); + EXPECT_CALL(*mock_upstream_ready_timer_, enableTimer(_)).Times(1).RetiresOnSaturation(); } void expectAndRunUpstreamReady() { From 49e831ed72a8cfcf6d582930cec77f5ed393adcf Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Fri, 20 Apr 2018 17:54:10 -0700 Subject: [PATCH 06/10] Only delay onResponseComplete Signed-off-by: Lizan Zhou --- source/common/http/http1/conn_pool.cc | 23 ++++++++++++++++------- source/common/http/http1/conn_pool.h | 2 +- test/common/http/http1/conn_pool_test.cc | 12 ------------ 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index c7ebfe3caefb8..28ddb9813d96a 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -186,7 +186,7 @@ void ConnPoolImpl::onConnectionEvent(ActiveClient& client, Network::ConnectionEv // whether the client is in the ready list (connected) or the busy list (failed to connect). if (event == Network::ConnectionEvent::Connected) { conn_connect_ms_->complete(); - processIdleClient(client); + processIdleClient(client, false); } } @@ -215,7 +215,7 @@ void ConnPoolImpl::onResponseComplete(ActiveClient& client) { host_->cluster().stats().upstream_cx_max_requests_.inc(); onDownstreamReset(client); } else { - processIdleClient(client); + processIdleClient(client, true); } } @@ -233,13 +233,22 @@ void ConnPoolImpl::onUpstreamReady() { } } -void ConnPoolImpl::processIdleClient(ActiveClient& client) { +void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) { client.stream_wrapper_.reset(); - // There is nothing to service so just move the connection into the ready list. - ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_); - client.moveBetweenLists(busy_clients_, ready_clients_); + if (pending_requests_.empty() || delay) { + // There is nothing to service so just move the connection into the ready list. + ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_); + client.moveBetweenLists(busy_clients_, ready_clients_); + } else { + // There is work to do immediately so bind a request to the client and move it to the busy list. + // Pending requests are pushed onto the front, so pull from the back. + ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_); + attachRequestToClient(client, pending_requests_.back()->decoder_, + pending_requests_.back()->callbacks_); + pending_requests_.pop_back(); + } - if (!pending_requests_.empty() && !upstream_ready_enabled_) { + if (delay && !pending_requests_.empty() && !upstream_ready_enabled_) { upstream_ready_enabled_ = true; upstream_ready_timer_->enableTimer(std::chrono::milliseconds(0)); } diff --git a/source/common/http/http1/conn_pool.h b/source/common/http/http1/conn_pool.h index 4f6293618b448..fa793e984c97a 100644 --- a/source/common/http/http1/conn_pool.h +++ b/source/common/http/http1/conn_pool.h @@ -123,7 +123,7 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: void onPendingRequestCancel(PendingRequest& request); void onResponseComplete(ActiveClient& client); void onUpstreamReady(); - void processIdleClient(ActiveClient& client); + void processIdleClient(ActiveClient& client, bool delay); Stats::TimespanPtr conn_connect_ms_; Event::Dispatcher& dispatcher_; diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index c36d1e647fe85..3fe12b9569929 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -163,11 +163,9 @@ struct ActiveTestRequest { if (type == Type::CreateConnection) { EXPECT_CALL(*parent_.conn_pool_.test_clients_[client_index_].connect_timer_, disableTimer()); - parent.conn_pool_.expectEnableUpstreamReady(); expectNewStream(); parent.conn_pool_.test_clients_[client_index_].connection_->raiseEvent( Network::ConnectionEvent::Connected); - parent.conn_pool_.expectAndRunUpstreamReady(); } } @@ -401,7 +399,6 @@ TEST_F(Http1ConnPoolImplTest, DisconnectWhileBound) { NiceMock outer_decoder; ConnPoolCallbacks callbacks; conn_pool_.expectClientCreate(); - conn_pool_.expectEnableUpstreamReady(); Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder, callbacks); EXPECT_NE(nullptr, handle); @@ -412,7 +409,6 @@ TEST_F(Http1ConnPoolImplTest, DisconnectWhileBound) { EXPECT_CALL(callbacks.pool_ready_, ready()); conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); - conn_pool_.expectAndRunUpstreamReady(); // We should get a reset callback when the connection disconnects. Http::MockStreamCallbacks stream_callbacks; @@ -435,7 +431,6 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) { NiceMock outer_decoder1; ConnPoolCallbacks callbacks; conn_pool_.expectClientCreate(); - conn_pool_.expectEnableUpstreamReady(); Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder1, callbacks); EXPECT_NE(nullptr, handle); @@ -456,7 +451,6 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) { EXPECT_CALL(callbacks.pool_ready_, ready()); conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); - conn_pool_.expectAndRunUpstreamReady(); // Finishing request 1 will immediately bind to request 2. conn_pool_.expectEnableUpstreamReady(); @@ -489,7 +483,6 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseHeader) { NiceMock outer_decoder; ConnPoolCallbacks callbacks; conn_pool_.expectClientCreate(); - conn_pool_.expectEnableUpstreamReady(); Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder, callbacks); EXPECT_NE(nullptr, handle); @@ -501,7 +494,6 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseHeader) { EXPECT_CALL(callbacks.pool_ready_, ready()); conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); - conn_pool_.expectAndRunUpstreamReady(); callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); // Response with 'connection: close' which should cause the connection to go away. @@ -526,7 +518,6 @@ TEST_F(Http1ConnPoolImplTest, MaxRequestsPerConnection) { NiceMock outer_decoder; ConnPoolCallbacks callbacks; conn_pool_.expectClientCreate(); - conn_pool_.expectEnableUpstreamReady(); Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder, callbacks); EXPECT_NE(nullptr, handle); @@ -538,7 +529,6 @@ TEST_F(Http1ConnPoolImplTest, MaxRequestsPerConnection) { EXPECT_CALL(callbacks.pool_ready_, ready()); conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); - conn_pool_.expectAndRunUpstreamReady(); callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); // Response with 'connection: close' which should cause the connection to go away. @@ -635,12 +625,10 @@ TEST_F(Http1ConnPoolImplTest, RemoteCloseToCompleteResponse) { NiceMock request_encoder; Http::StreamDecoder* inner_decoder; EXPECT_CALL(*conn_pool_.test_clients_[0].connect_timer_, disableTimer()); - conn_pool_.expectEnableUpstreamReady(); EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_)) .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); EXPECT_CALL(callbacks.pool_ready_, ready()); conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); - conn_pool_.expectAndRunUpstreamReady(); callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); From c9a14d889469110ed52187fc5772c96b120aeded Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Mon, 23 Apr 2018 15:52:36 -0700 Subject: [PATCH 07/10] address comments, add unit test Signed-off-by: Lizan Zhou --- source/common/http/http1/conn_pool.cc | 6 ++- source/common/http/http1/conn_pool.h | 1 - test/common/http/http1/conn_pool_test.cc | 62 ++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 2 deletions(-) diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index 28ddb9813d96a..12a390c47c986 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -215,6 +215,9 @@ void ConnPoolImpl::onResponseComplete(ActiveClient& client) { host_->cluster().stats().upstream_cx_max_requests_.inc(); onDownstreamReset(client); } else { + // Upstream connection might be closed right after response is complete. Setting delay=true + // here to attach pending requests in next dispatcher loop to handle that case. + // https://github.com/envoyproxy/envoy/issues/2715 processIdleClient(client, true); } } @@ -236,7 +239,8 @@ void ConnPoolImpl::onUpstreamReady() { void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) { client.stream_wrapper_.reset(); if (pending_requests_.empty() || delay) { - // There is nothing to service so just move the connection into the ready list. + // There is nothing to service or delay processing is requested, so just move the connection + // into the ready list. ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_); client.moveBetweenLists(busy_clients_, ready_clients_); } else { diff --git a/source/common/http/http1/conn_pool.h b/source/common/http/http1/conn_pool.h index fa793e984c97a..5bcd22c874ffd 100644 --- a/source/common/http/http1/conn_pool.h +++ b/source/common/http/http1/conn_pool.h @@ -134,7 +134,6 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: std::list drained_callbacks_; Upstream::ResourcePriority priority_; const Network::ConnectionSocket::OptionsSharedPtr socket_options_; - Event::TimerPtr upstream_ready_timer_; bool upstream_ready_enabled_{false}; }; diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index 3fe12b9569929..35f8fb939fd1e 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -473,6 +473,68 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test when upstream closes connection without 'connection: close' like + * https://github.com/envoyproxy/envoy/pull/2715 + */ +TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) { + InSequence s; + + // Request 1 should kick off a new connection. + NiceMock outer_decoder1; + ConnPoolCallbacks callbacks; + conn_pool_.expectClientCreate(); + Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder1, callbacks); + + EXPECT_NE(nullptr, handle); + + // Request 2 should not kick off a new connection. + NiceMock outer_decoder2; + ConnPoolCallbacks callbacks2; + handle = conn_pool_.newStream(outer_decoder2, callbacks2); + EXPECT_EQ(1U, cluster_->stats_.upstream_cx_overflow_.value()); + + EXPECT_NE(nullptr, handle); + + // Connect event will bind to request 1. + NiceMock request_encoder; + Http::StreamDecoder* inner_decoder; + EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_)) + .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); + EXPECT_CALL(callbacks.pool_ready_, ready()); + + conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); + + // Finishing request 1 will schedule binding the connection to request 2. + conn_pool_.expectEnableUpstreamReady(); + + callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); + Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); + inner_decoder->decodeHeaders(std::move(response_headers), true); + + // Cause the connection to go away. + conn_pool_.expectClientCreate(); + EXPECT_CALL(conn_pool_, onClientDestroy()); + conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); + dispatcher_.clearDeferredDeleteList(); + + conn_pool_.expectAndRunUpstreamReady(); + conn_pool_. + + EXPECT_CALL(*conn_pool_.test_clients_[1].codec_, newStream(_)) + .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); + EXPECT_CALL(callbacks2.pool_ready_, ready()); + conn_pool_.test_clients_[1].connection_->raiseEvent(Network::ConnectionEvent::Connected); + + callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); + response_headers.reset(new TestHeaderMapImpl{{":status", "200"}}); + inner_decoder->decodeHeaders(std::move(response_headers), true); + + EXPECT_CALL(conn_pool_, onClientDestroy()); + conn_pool_.test_clients_[1].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); + dispatcher_.clearDeferredDeleteList(); +} + /** * Test when upstream sends us 'connection: close' */ From f531874aaed3d477433bccc3e8680cccb1815836 Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Mon, 23 Apr 2018 15:53:20 -0700 Subject: [PATCH 08/10] fix format Signed-off-by: Lizan Zhou --- test/common/http/http1/conn_pool_test.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index 35f8fb939fd1e..66225598bba59 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -519,9 +519,10 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) { dispatcher_.clearDeferredDeleteList(); conn_pool_.expectAndRunUpstreamReady(); - conn_pool_. + conn_pool_ + . - EXPECT_CALL(*conn_pool_.test_clients_[1].codec_, newStream(_)) + EXPECT_CALL(*conn_pool_.test_clients_[1].codec_, newStream(_)) .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); EXPECT_CALL(callbacks2.pool_ready_, ready()); conn_pool_.test_clients_[1].connection_->raiseEvent(Network::ConnectionEvent::Connected); From 0a8afec4ca960be5b10eb09b2b55fd77f3a212f0 Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Mon, 23 Apr 2018 16:17:54 -0700 Subject: [PATCH 09/10] fix Signed-off-by: Lizan Zhou --- test/common/http/http1/conn_pool_test.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index 66225598bba59..032cf7334a8bb 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -519,10 +519,8 @@ TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) { dispatcher_.clearDeferredDeleteList(); conn_pool_.expectAndRunUpstreamReady(); - conn_pool_ - . - EXPECT_CALL(*conn_pool_.test_clients_[1].codec_, newStream(_)) + EXPECT_CALL(*conn_pool_.test_clients_[1].codec_, newStream(_)) .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); EXPECT_CALL(callbacks2.pool_ready_, ready()); conn_pool_.test_clients_[1].connection_->raiseEvent(Network::ConnectionEvent::Connected); From 64e9f889d96e110466fccca51d02a0b239fd71cd Mon Sep 17 00:00:00 2001 From: Lizan Zhou Date: Tue, 24 Apr 2018 08:31:57 -0700 Subject: [PATCH 10/10] fix typo Signed-off-by: Lizan Zhou --- source/common/http/http1/conn_pool.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index 12a390c47c986..505d51ec859d2 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -239,7 +239,7 @@ void ConnPoolImpl::onUpstreamReady() { void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) { client.stream_wrapper_.reset(); if (pending_requests_.empty() || delay) { - // There is nothing to service or delay processing is requested, so just move the connection + // There is nothing to service or delayed processing is requested, so just move the connection // into the ready list. ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_); client.moveBetweenLists(busy_clients_, ready_clients_);