diff --git a/source/common/conn_pool/conn_pool_base.cc b/source/common/conn_pool/conn_pool_base.cc index de35a059d3ad5..6a3f603148003 100644 --- a/source/common/conn_pool/conn_pool_base.cc +++ b/source/common/conn_pool/conn_pool_base.cc @@ -228,7 +228,7 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien if (client.state() == ActiveClient::State::DRAINING && client.numActiveStreams() == 0) { // Close out the draining client if we no longer have active streams. client.close(); - } else if (client.state() == ActiveClient::State::BUSY && client.currentUnusedCapacity() != 0) { + } else if (client.state() == ActiveClient::State::BUSY && client.currentUnusedCapacity() > 0) { transitionActiveClientState(client, ActiveClient::State::READY); if (!delay_attaching_stream) { onUpstreamReady(); diff --git a/source/common/http/conn_pool_base.cc b/source/common/http/conn_pool_base.cc index 295220d79db91..e43b369c0b01c 100644 --- a/source/common/http/conn_pool_base.cc +++ b/source/common/http/conn_pool_base.cc @@ -122,6 +122,9 @@ void MultiplexedActiveClientBase::onSettings(ReceivedSettings& settings) { ASSERT(std::numeric_limits::max() >= old_unused_capacity); concurrent_stream_limit_ = settings.maxConcurrentStreams().value(); int64_t delta = old_unused_capacity - currentUnusedCapacity(); + if (state() == ActiveClient::State::READY && currentUnusedCapacity() <= 0) { + parent_.transitionActiveClientState(*this, ActiveClient::State::BUSY); + } parent_.decrClusterStreamCapacity(delta); ENVOY_CONN_LOG(trace, "Decreasing stream capacity by {}", *codec_client_, delta); negative_capacity_ += delta; diff --git a/test/common/http/http2/conn_pool_test.cc b/test/common/http/http2/conn_pool_test.cc index 23c86c527b225..aaf674207d974 100644 --- a/test/common/http/http2/conn_pool_test.cc +++ b/test/common/http/http2/conn_pool_test.cc @@ -1436,32 +1436,63 @@ TEST_F(Http2ConnPoolImplTest, PreconnectWithoutMultiplexing) { TEST_F(Http2ConnPoolImplTest, DisconnectWithNegativeCapacity) { TestScopedRuntime scoped_runtime; - cluster_->http2_options_.mutable_max_concurrent_streams()->set_value(2); + cluster_->http2_options_.mutable_max_concurrent_streams()->set_value(6); ON_CALL(*cluster_, perUpstreamPreconnectRatio).WillByDefault(Return(1)); - // One stream results in one connection. Two streams result in two connections. expectClientsCreate(1); ActiveTestRequest r1(*this, 0, false); - CHECK_STATE(0 /*active*/, 1 /*pending*/, 2 /*capacity*/); + CHECK_STATE(0 /*active*/, 1 /*pending*/, 6 /*capacity*/); ActiveTestRequest r2(*this, 0, false); - CHECK_STATE(0 /*active*/, 2 /*pending*/, 2 /*capacity*/); + CHECK_STATE(0 /*active*/, 2 /*pending*/, 6 /*capacity*/); + ActiveTestRequest r3(*this, 0, false); + CHECK_STATE(0 /*active*/, 3 /*pending*/, 6 /*capacity*/); + ActiveTestRequest r4(*this, 0, false); + CHECK_STATE(0 /*active*/, 4 /*pending*/, 6 /*capacity*/); + ActiveTestRequest r5(*this, 0, false); + CHECK_STATE(0 /*active*/, 5 /*pending*/, 6 /*capacity*/); - // When the connection connects, there is zero spare capacity in this pool. + // When the connection connects, there is 1 spare capacity in this pool. EXPECT_CALL(*test_clients_[0].codec_, newStream(_)) .WillOnce(DoAll(SaveArgAddress(&r1.inner_decoder_), ReturnRef(r1.inner_encoder_))) - .WillOnce(DoAll(SaveArgAddress(&r2.inner_decoder_), ReturnRef(r2.inner_encoder_))); + .WillOnce(DoAll(SaveArgAddress(&r2.inner_decoder_), ReturnRef(r2.inner_encoder_))) + .WillOnce(DoAll(SaveArgAddress(&r3.inner_decoder_), ReturnRef(r3.inner_encoder_))) + .WillOnce(DoAll(SaveArgAddress(&r4.inner_decoder_), ReturnRef(r4.inner_encoder_))) + .WillOnce(DoAll(SaveArgAddress(&r5.inner_decoder_), ReturnRef(r5.inner_encoder_))); EXPECT_CALL(r1.callbacks_.pool_ready_, ready()); EXPECT_CALL(r2.callbacks_.pool_ready_, ready()); + EXPECT_CALL(r3.callbacks_.pool_ready_, ready()); + EXPECT_CALL(r4.callbacks_.pool_ready_, ready()); + EXPECT_CALL(r5.callbacks_.pool_ready_, ready()); expectClientConnect(0); - CHECK_STATE(2 /*active*/, 0 /*pending*/, 0 /*capacity*/); + CHECK_STATE(5 /*active*/, 0 /*pending*/, 1 /*capacity*/); + EXPECT_EQ(pool_->owningList(Envoy::ConnectionPool::ActiveClient::State::READY).size(), 1); - // Settings frame reducing capacity to one stream per connection results in -1 capacity. + // Settings frame results in -2 capacity. NiceMock settings; + settings.max_concurrent_streams_ = 3; + test_clients_[0].codec_client_->onSettings(settings); + CHECK_STATE(5 /*active*/, 0 /*pending*/, -2 /*capacity*/); + EXPECT_EQ(pool_->owningList(Envoy::ConnectionPool::ActiveClient::State::READY).size(), 0); + + // Close 1 stream, concurrency capacity goes to -1, still no ready client available. + completeRequest(r1); + EXPECT_EQ(pool_->owningList(Envoy::ConnectionPool::ActiveClient::State::READY).size(), 0); + + // Close 2 streams, concurrency capacity goes to 1, there should be one ready client. + completeRequest(r2); + completeRequest(r3); + EXPECT_EQ(pool_->owningList(Envoy::ConnectionPool::ActiveClient::State::READY).size(), 1); + CHECK_STATE(2 /*active*/, 0 /*pending*/, 1 /*capacity*/); + + // Another settings frame results in -1 capacity. settings.max_concurrent_streams_ = 1; test_clients_[0].codec_client_->onSettings(settings); CHECK_STATE(2 /*active*/, 0 /*pending*/, -1 /*capacity*/); + EXPECT_EQ(pool_->owningList(Envoy::ConnectionPool::ActiveClient::State::READY).size(), 0); + // Close connection with negative capacity. closeAllClients(); + CHECK_STATE(0 /*active*/, 0 /*pending*/, 0 /*capacity*/); } TEST_F(Http2ConnPoolImplTest, PreconnectWithMultiplexing) {