diff --git a/source/common/conn_pool/conn_pool_base.h b/source/common/conn_pool/conn_pool_base.h index 29a714410f8cb..802cff1d639c2 100644 --- a/source/common/conn_pool/conn_pool_base.h +++ b/source/common/conn_pool/conn_pool_base.h @@ -49,7 +49,7 @@ class ActiveClient : public LinkedObject, // Returns the concurrent stream limit, accounting for if the total stream limit // is less than the concurrent stream limit. - uint32_t effectiveConcurrentStreamLimit() const { + virtual uint32_t effectiveConcurrentStreamLimit() const { return std::min(remaining_streams_, concurrent_stream_limit_); } @@ -267,6 +267,11 @@ class ConnPoolImplBase : protected Logger::Loggable { connecting_stream_capacity_ -= delta; } + void incrConnectingAndConnectedStreamCapacity(uint32_t delta) { + state_.incrConnectingAndConnectedStreamCapacity(delta); + connecting_stream_capacity_ += delta; + } + // Called when an upstream is ready to serve pending streams. void onUpstreamReady(); @@ -309,11 +314,6 @@ class ConnPoolImplBase : protected Logger::Loggable { bool hasActiveStreams() const { return num_active_streams_ > 0; } - void incrConnectingAndConnectedStreamCapacity(uint32_t delta) { - state_.incrConnectingAndConnectedStreamCapacity(delta); - connecting_stream_capacity_ += delta; - } - Upstream::ClusterConnectivityState& state_; const Upstream::HostConstSharedPtr host_; diff --git a/source/common/http/http3/conn_pool.h b/source/common/http/http3/conn_pool.h index 2db0426630d67..d89c5fa69ab06 100644 --- a/source/common/http/http3/conn_pool.h +++ b/source/common/http/http3/conn_pool.h @@ -35,6 +35,11 @@ class ActiveClient : public MultiplexedActiveClientBase { return MultiplexedActiveClientBase::newStreamEncoder(response_decoder); } + uint32_t effectiveConcurrentStreamLimit() const override { + return std::min(MultiplexedActiveClientBase::effectiveConcurrentStreamLimit(), + quiche_capacity_); + } + // Overload the default capacity calculations to return the quic capacity // (modified by any stream limits in Envoy config) int64_t currentUnusedCapacity() const override { @@ -54,10 +59,18 @@ class ActiveClient : public MultiplexedActiveClientBase { quiche_capacity_ = new_quiche_capacity; uint64_t new_capacity = currentUnusedCapacity(); - if (new_capacity < old_capacity) { - parent_.decrClusterStreamCapacity(old_capacity - new_capacity); - } else if (old_capacity < new_capacity) { - parent_.incrClusterStreamCapacity(new_capacity - old_capacity); + if (connect_timer_) { + if (new_capacity < old_capacity) { + parent_.decrConnectingAndConnectedStreamCapacity(old_capacity - new_capacity); + } else if (old_capacity < new_capacity) { + parent_.incrConnectingAndConnectedStreamCapacity(new_capacity - old_capacity); + } + } else { + if (new_capacity < old_capacity) { + parent_.decrClusterStreamCapacity(old_capacity - new_capacity); + } else if (old_capacity < new_capacity) { + parent_.incrClusterStreamCapacity(new_capacity - old_capacity); + } } } diff --git a/test/integration/multiplexed_upstream_integration_test.cc b/test/integration/multiplexed_upstream_integration_test.cc index 1a30fda97d9db..ae1e3c8166fb0 100644 --- a/test/integration/multiplexed_upstream_integration_test.cc +++ b/test/integration/multiplexed_upstream_integration_test.cc @@ -288,6 +288,32 @@ TEST_P(MultiplexedUpstreamIntegrationTest, ManySimultaneousRequestsTightUpstream manySimultaneousRequests(1024, 1024, 10); } +TEST_P(MultiplexedUpstreamIntegrationTest, ManySimultaneousRequestsLaxUpstreamLimits) { + envoy::config::core::v3::Http2ProtocolOptions config; + config.mutable_max_concurrent_streams()->set_value(10000); + mergeOptions(config); + envoy::config::listener::v3::QuicProtocolOptions options; + options.mutable_quic_protocol_options()->mutable_max_concurrent_streams()->set_value(10000); + mergeOptions(options); + + if (upstreamProtocol() == Http::CodecType::HTTP3) { + config_helper_.addConfigModifier( + [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { + RELEASE_ASSERT(bootstrap.mutable_static_resources()->clusters_size() >= 1, ""); + ConfigHelper::HttpProtocolOptions protocol_options; + protocol_options.mutable_explicit_http_config() + ->mutable_http3_protocol_options() + ->mutable_quic_protocol_options() + ->mutable_max_concurrent_streams() + ->set_value(10000); + ConfigHelper::setProtocolOptions( + *bootstrap.mutable_static_resources()->mutable_clusters(0), protocol_options); + }); + } + + manySimultaneousRequests(1024, 1024, 10); +} + TEST_P(MultiplexedUpstreamIntegrationTest, ManyLargeSimultaneousRequestWithBufferLimits) { config_helper_.setBufferLimits(1024, 1024); // Set buffer limits upstream and downstream. manySimultaneousRequests(1024 * 20, 1024 * 20);