diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index 41f76e1f97e55..505d51ec859d2 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -20,6 +20,12 @@ namespace Envoy { namespace Http { namespace Http1 { +ConnPoolImpl::ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSharedPtr host, + Upstream::ResourcePriority priority, + const Network::ConnectionSocket::OptionsSharedPtr& options) + : dispatcher_(dispatcher), host_(host), priority_(priority), socket_options_(options), + upstream_ready_timer_(dispatcher_.createTimer([this]() { onUpstreamReady(); })) {} + ConnPoolImpl::~ConnPoolImpl() { while (!ready_clients_.empty()) { ready_clients_.front()->codec_client_->close(); @@ -180,7 +186,7 @@ void ConnPoolImpl::onConnectionEvent(ActiveClient& client, Network::ConnectionEv // whether the client is in the ready list (connected) or the busy list (failed to connect). if (event == Network::ConnectionEvent::Connected) { conn_connect_ms_->complete(); - processIdleClient(client); + processIdleClient(client, false); } } @@ -209,25 +215,48 @@ void ConnPoolImpl::onResponseComplete(ActiveClient& client) { host_->cluster().stats().upstream_cx_max_requests_.inc(); onDownstreamReset(client); } else { - processIdleClient(client); + // Upstream connection might be closed right after response is complete. Setting delay=true + // here to attach pending requests in next dispatcher loop to handle that case. + // https://github.com/envoyproxy/envoy/issues/2715 + processIdleClient(client, true); + } +} + +void ConnPoolImpl::onUpstreamReady() { + upstream_ready_enabled_ = false; + while (!pending_requests_.empty() && !ready_clients_.empty()) { + ActiveClient& client = *ready_clients_.front(); + ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_); + // There is work to do so bind a request to the client and move it to the busy list. Pending + // requests are pushed onto the front, so pull from the back. + attachRequestToClient(client, pending_requests_.back()->decoder_, + pending_requests_.back()->callbacks_); + pending_requests_.pop_back(); + client.moveBetweenLists(ready_clients_, busy_clients_); } } -void ConnPoolImpl::processIdleClient(ActiveClient& client) { +void ConnPoolImpl::processIdleClient(ActiveClient& client, bool delay) { client.stream_wrapper_.reset(); - if (pending_requests_.empty()) { - // There is nothing to service so just move the connection into the ready list. + if (pending_requests_.empty() || delay) { + // There is nothing to service or delayed processing is requested, so just move the connection + // into the ready list. ENVOY_CONN_LOG(debug, "moving to ready", *client.codec_client_); client.moveBetweenLists(busy_clients_, ready_clients_); } else { - // There is work to do so bind a request to the client and move it to the busy list. Pending - // requests are pushed onto the front, so pull from the back. + // There is work to do immediately so bind a request to the client and move it to the busy list. + // Pending requests are pushed onto the front, so pull from the back. ENVOY_CONN_LOG(debug, "attaching to next request", *client.codec_client_); attachRequestToClient(client, pending_requests_.back()->decoder_, pending_requests_.back()->callbacks_); pending_requests_.pop_back(); } + if (delay && !pending_requests_.empty() && !upstream_ready_enabled_) { + upstream_ready_enabled_ = true; + upstream_ready_timer_->enableTimer(std::chrono::milliseconds(0)); + } + checkForDrained(); } diff --git a/source/common/http/http1/conn_pool.h b/source/common/http/http1/conn_pool.h index 223367402ec09..5bcd22c874ffd 100644 --- a/source/common/http/http1/conn_pool.h +++ b/source/common/http/http1/conn_pool.h @@ -31,8 +31,7 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: public: ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority, - const Network::ConnectionSocket::OptionsSharedPtr& options) - : dispatcher_(dispatcher), host_(host), priority_(priority), socket_options_(options) {} + const Network::ConnectionSocket::OptionsSharedPtr& options); ~ConnPoolImpl(); @@ -123,7 +122,8 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: void onDownstreamReset(ActiveClient& client); void onPendingRequestCancel(PendingRequest& request); void onResponseComplete(ActiveClient& client); - void processIdleClient(ActiveClient& client); + void onUpstreamReady(); + void processIdleClient(ActiveClient& client, bool delay); Stats::TimespanPtr conn_connect_ms_; Event::Dispatcher& dispatcher_; @@ -134,6 +134,8 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: std::list drained_callbacks_; Upstream::ResourcePriority priority_; const Network::ConnectionSocket::OptionsSharedPtr socket_options_; + Event::TimerPtr upstream_ready_timer_; + bool upstream_ready_enabled_{false}; }; /** diff --git a/test/common/http/http1/conn_pool_test.cc b/test/common/http/http1/conn_pool_test.cc index 20165a335f920..032cf7334a8bb 100644 --- a/test/common/http/http1/conn_pool_test.cc +++ b/test/common/http/http1/conn_pool_test.cc @@ -42,10 +42,11 @@ namespace Http1 { class ConnPoolImplForTest : public ConnPoolImpl { public: ConnPoolImplForTest(Event::MockDispatcher& dispatcher, - Upstream::ClusterInfoConstSharedPtr cluster) + Upstream::ClusterInfoConstSharedPtr cluster, + NiceMock* upstream_ready_timer) : ConnPoolImpl(dispatcher, Upstream::makeTestHost(cluster, "tcp://127.0.0.1:9000"), Upstream::ResourcePriority::Default, nullptr), - mock_dispatcher_(dispatcher) {} + mock_dispatcher_(dispatcher), mock_upstream_ready_timer_(upstream_ready_timer) {} ~ConnPoolImplForTest() { EXPECT_EQ(0U, ready_clients_.size()); @@ -98,7 +99,19 @@ class ConnPoolImplForTest : public ConnPoolImpl { EXPECT_CALL(*test_client.connect_timer_, enableTimer(_)); } + void expectEnableUpstreamReady() { + EXPECT_FALSE(upstream_ready_enabled_); + EXPECT_CALL(*mock_upstream_ready_timer_, enableTimer(_)).Times(1).RetiresOnSaturation(); + } + + void expectAndRunUpstreamReady() { + EXPECT_TRUE(upstream_ready_enabled_); + mock_upstream_ready_timer_->callback_(); + EXPECT_FALSE(upstream_ready_enabled_); + } + Event::MockDispatcher& mock_dispatcher_; + NiceMock* mock_upstream_ready_timer_; std::vector test_clients_; }; @@ -107,7 +120,9 @@ class ConnPoolImplForTest : public ConnPoolImpl { */ class Http1ConnPoolImplTest : public testing::Test { public: - Http1ConnPoolImplTest() : conn_pool_(dispatcher_, cluster_) {} + Http1ConnPoolImplTest() + : upstream_ready_timer_(new NiceMock(&dispatcher_)), + conn_pool_(dispatcher_, cluster_, upstream_ready_timer_) {} ~Http1ConnPoolImplTest() { // Make sure all gauges are 0. @@ -118,6 +133,7 @@ class Http1ConnPoolImplTest : public testing::Test { NiceMock dispatcher_; std::shared_ptr cluster_{new NiceMock()}; + NiceMock* upstream_ready_timer_; ConnPoolImplForTest conn_pool_; NiceMock runtime_; }; @@ -437,6 +453,7 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) { conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); // Finishing request 1 will immediately bind to request 2. + conn_pool_.expectEnableUpstreamReady(); EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_)) .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); EXPECT_CALL(callbacks2.pool_ready_, ready()); @@ -445,6 +462,7 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) { Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); inner_decoder->decodeHeaders(std::move(response_headers), true); + conn_pool_.expectAndRunUpstreamReady(); callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); response_headers.reset(new TestHeaderMapImpl{{":status", "200"}}); inner_decoder->decodeHeaders(std::move(response_headers), true); @@ -455,6 +473,67 @@ TEST_F(Http1ConnPoolImplTest, MaxConnections) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test when upstream closes connection without 'connection: close' like + * https://github.com/envoyproxy/envoy/pull/2715 + */ +TEST_F(Http1ConnPoolImplTest, ConnectionCloseWithoutHeader) { + InSequence s; + + // Request 1 should kick off a new connection. + NiceMock outer_decoder1; + ConnPoolCallbacks callbacks; + conn_pool_.expectClientCreate(); + Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder1, callbacks); + + EXPECT_NE(nullptr, handle); + + // Request 2 should not kick off a new connection. + NiceMock outer_decoder2; + ConnPoolCallbacks callbacks2; + handle = conn_pool_.newStream(outer_decoder2, callbacks2); + EXPECT_EQ(1U, cluster_->stats_.upstream_cx_overflow_.value()); + + EXPECT_NE(nullptr, handle); + + // Connect event will bind to request 1. + NiceMock request_encoder; + Http::StreamDecoder* inner_decoder; + EXPECT_CALL(*conn_pool_.test_clients_[0].codec_, newStream(_)) + .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); + EXPECT_CALL(callbacks.pool_ready_, ready()); + + conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); + + // Finishing request 1 will schedule binding the connection to request 2. + conn_pool_.expectEnableUpstreamReady(); + + callbacks.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); + Http::HeaderMapPtr response_headers(new TestHeaderMapImpl{{":status", "200"}}); + inner_decoder->decodeHeaders(std::move(response_headers), true); + + // Cause the connection to go away. + conn_pool_.expectClientCreate(); + EXPECT_CALL(conn_pool_, onClientDestroy()); + conn_pool_.test_clients_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); + dispatcher_.clearDeferredDeleteList(); + + conn_pool_.expectAndRunUpstreamReady(); + + EXPECT_CALL(*conn_pool_.test_clients_[1].codec_, newStream(_)) + .WillOnce(DoAll(SaveArgAddress(&inner_decoder), ReturnRef(request_encoder))); + EXPECT_CALL(callbacks2.pool_ready_, ready()); + conn_pool_.test_clients_[1].connection_->raiseEvent(Network::ConnectionEvent::Connected); + + callbacks2.outer_encoder_->encodeHeaders(TestHeaderMapImpl{}, true); + response_headers.reset(new TestHeaderMapImpl{{":status", "200"}}); + inner_decoder->decodeHeaders(std::move(response_headers), true); + + EXPECT_CALL(conn_pool_, onClientDestroy()); + conn_pool_.test_clients_[1].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); + dispatcher_.clearDeferredDeleteList(); +} + /** * Test when upstream sends us 'connection: close' */ @@ -537,8 +616,11 @@ TEST_F(Http1ConnPoolImplTest, ConcurrentConnections) { ActiveTestRequest r3(*this, 0, ActiveTestRequest::Type::Pending); // Finish r1, which gets r3 going. + conn_pool_.expectEnableUpstreamReady(); r3.expectNewStream(); + r1.completeResponse(false); + conn_pool_.expectAndRunUpstreamReady(); r3.startRequest(); r2.completeResponse(false); diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index fec6c45aa3442..e6ade5e13505f 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -936,6 +936,57 @@ void HttpIntegrationTest::testIdleTimeoutWithTwoRequests() { test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_idle_timeout", 1); } +void HttpIntegrationTest::testUpstreamDisconnectWithTwoRequests() { + initialize(); + fake_upstreams_[0]->set_allow_unexpected_disconnects(true); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + // Request 1. + codec_client_->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}}, + 1024, *response_); + waitForNextUpstreamRequest(); + + // Request 2. + IntegrationStreamDecoderPtr response2{new IntegrationStreamDecoder(*dispatcher_)}; + IntegrationCodecClientPtr codec_client2 = makeHttpConnection(lookupPort("http")); + codec_client2->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}}, + 512, *response2); + + // Response 1. + upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(512, true); + fake_upstream_connection_->close(); + response_->waitForEndStream(); + + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_TRUE(response_->complete()); + EXPECT_STREQ("200", response_->headers().Status()->value().c_str()); + test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_total", 1); + test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_200", 1); + + // Response 2. + fake_upstream_connection_->waitForDisconnect(); + fake_upstream_connection_.reset(); + waitForNextUpstreamRequest(); + upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + upstream_request_->encodeData(1024, true); + response2->waitForEndStream(); + codec_client2->close(); + + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_TRUE(response2->complete()); + EXPECT_STREQ("200", response2->headers().Status()->value().c_str()); + test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_total", 2); + test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_200", 2); +} + void HttpIntegrationTest::testTwoRequests() { initialize(); diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index f4e6e365f9906..0850446708817 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -128,6 +128,7 @@ class HttpIntegrationTest : public BaseIntegrationTest { void testIdleTimeoutBasic(); void testIdleTimeoutWithTwoRequests(); void testIdleTimerDisabled(); + void testUpstreamDisconnectWithTwoRequests(); // HTTP/1 tests void testBadFirstline(); void testMissingDelimiter(); diff --git a/test/integration/integration_test.cc b/test/integration/integration_test.cc index f65519610550a..4e31c51c7d587 100644 --- a/test/integration/integration_test.cc +++ b/test/integration/integration_test.cc @@ -112,6 +112,10 @@ TEST_P(IntegrationTest, EnvoyProxyingLate100ContinueWithEncoderFilter) { TEST_P(IntegrationTest, TwoRequests) { testTwoRequests(); } +TEST_P(IntegrationTest, UpstreamDisconnectWithTwoRequests) { + testUpstreamDisconnectWithTwoRequests(); +} + TEST_P(IntegrationTest, RetryHittingBufferLimit) { testRetryHittingBufferLimit(); } TEST_P(IntegrationTest, HittingDecoderFilterLimit) { testHittingDecoderFilterLimit(); }