Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/common/conn_pool/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien
if (client.state() == ActiveClient::State::DRAINING && client.numActiveStreams() == 0) {
// Close out the draining client if we no longer have active streams.
client.close();
} else if (client.state() == ActiveClient::State::BUSY && client.currentUnusedCapacity() != 0) {
} else if (client.state() == ActiveClient::State::BUSY && client.currentUnusedCapacity() > 0) {
transitionActiveClientState(client, ActiveClient::State::READY);
if (!delay_attaching_stream) {
onUpstreamReady();
Expand Down
3 changes: 3 additions & 0 deletions source/common/http/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ void MultiplexedActiveClientBase::onSettings(ReceivedSettings& settings) {
ASSERT(std::numeric_limits<int32_t>::max() >= old_unused_capacity);
concurrent_stream_limit_ = settings.maxConcurrentStreams().value();
int64_t delta = old_unused_capacity - currentUnusedCapacity();
if (state() == ActiveClient::State::READY && currentUnusedCapacity() <= 0) {
parent_.transitionActiveClientState(*this, ActiveClient::State::BUSY);
}
parent_.decrClusterStreamCapacity(delta);
ENVOY_CONN_LOG(trace, "Decreasing stream capacity by {}", *codec_client_, delta);
negative_capacity_ += delta;
Expand Down
47 changes: 39 additions & 8 deletions test/common/http/http2/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1436,32 +1436,63 @@ TEST_F(Http2ConnPoolImplTest, PreconnectWithoutMultiplexing) {

TEST_F(Http2ConnPoolImplTest, DisconnectWithNegativeCapacity) {
TestScopedRuntime scoped_runtime;
cluster_->http2_options_.mutable_max_concurrent_streams()->set_value(2);
cluster_->http2_options_.mutable_max_concurrent_streams()->set_value(6);
ON_CALL(*cluster_, perUpstreamPreconnectRatio).WillByDefault(Return(1));

// One stream results in one connection. Two streams result in two connections.
expectClientsCreate(1);
ActiveTestRequest r1(*this, 0, false);
CHECK_STATE(0 /*active*/, 1 /*pending*/, 2 /*capacity*/);
CHECK_STATE(0 /*active*/, 1 /*pending*/, 6 /*capacity*/);
ActiveTestRequest r2(*this, 0, false);
CHECK_STATE(0 /*active*/, 2 /*pending*/, 2 /*capacity*/);
CHECK_STATE(0 /*active*/, 2 /*pending*/, 6 /*capacity*/);
ActiveTestRequest r3(*this, 0, false);
CHECK_STATE(0 /*active*/, 3 /*pending*/, 6 /*capacity*/);
ActiveTestRequest r4(*this, 0, false);
CHECK_STATE(0 /*active*/, 4 /*pending*/, 6 /*capacity*/);
ActiveTestRequest r5(*this, 0, false);
CHECK_STATE(0 /*active*/, 5 /*pending*/, 6 /*capacity*/);

// When the connection connects, there is zero spare capacity in this pool.
// When the connection connects, there is 1 spare capacity in this pool.
EXPECT_CALL(*test_clients_[0].codec_, newStream(_))
.WillOnce(DoAll(SaveArgAddress(&r1.inner_decoder_), ReturnRef(r1.inner_encoder_)))
.WillOnce(DoAll(SaveArgAddress(&r2.inner_decoder_), ReturnRef(r2.inner_encoder_)));
.WillOnce(DoAll(SaveArgAddress(&r2.inner_decoder_), ReturnRef(r2.inner_encoder_)))
.WillOnce(DoAll(SaveArgAddress(&r3.inner_decoder_), ReturnRef(r3.inner_encoder_)))
.WillOnce(DoAll(SaveArgAddress(&r4.inner_decoder_), ReturnRef(r4.inner_encoder_)))
.WillOnce(DoAll(SaveArgAddress(&r5.inner_decoder_), ReturnRef(r5.inner_encoder_)));
EXPECT_CALL(r1.callbacks_.pool_ready_, ready());
EXPECT_CALL(r2.callbacks_.pool_ready_, ready());
EXPECT_CALL(r3.callbacks_.pool_ready_, ready());
EXPECT_CALL(r4.callbacks_.pool_ready_, ready());
EXPECT_CALL(r5.callbacks_.pool_ready_, ready());
expectClientConnect(0);
CHECK_STATE(2 /*active*/, 0 /*pending*/, 0 /*capacity*/);
CHECK_STATE(5 /*active*/, 0 /*pending*/, 1 /*capacity*/);
EXPECT_EQ(pool_->owningList(Envoy::ConnectionPool::ActiveClient::State::READY).size(), 1);

// Settings frame reducing capacity to one stream per connection results in -1 capacity.
// Settings frame results in -2 capacity.
NiceMock<MockReceivedSettings> settings;
settings.max_concurrent_streams_ = 3;
test_clients_[0].codec_client_->onSettings(settings);
CHECK_STATE(5 /*active*/, 0 /*pending*/, -2 /*capacity*/);
EXPECT_EQ(pool_->owningList(Envoy::ConnectionPool::ActiveClient::State::READY).size(), 0);

// Close 1 stream, concurrency capacity goes to -1, still no ready client available.
completeRequest(r1);
EXPECT_EQ(pool_->owningList(Envoy::ConnectionPool::ActiveClient::State::READY).size(), 0);

// Close 2 streams, concurrency capacity goes to 1, there should be one ready client.
completeRequest(r2);
completeRequest(r3);
EXPECT_EQ(pool_->owningList(Envoy::ConnectionPool::ActiveClient::State::READY).size(), 1);
CHECK_STATE(2 /*active*/, 0 /*pending*/, 1 /*capacity*/);

// Another settings frame results in -1 capacity.
settings.max_concurrent_streams_ = 1;
test_clients_[0].codec_client_->onSettings(settings);
CHECK_STATE(2 /*active*/, 0 /*pending*/, -1 /*capacity*/);
EXPECT_EQ(pool_->owningList(Envoy::ConnectionPool::ActiveClient::State::READY).size(), 0);

// Close connection with negative capacity.
closeAllClients();
CHECK_STATE(0 /*active*/, 0 /*pending*/, 0 /*capacity*/);
}

TEST_F(Http2ConnPoolImplTest, PreconnectWithMultiplexing) {
Expand Down