From d4a4f823699a3b22a7b13e7c5189a4887440569f Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Tue, 5 Oct 2021 08:40:04 -0400 Subject: [PATCH 01/12] WIP Signed-off-by: Alyssa Wilk --- test/integration/multiplexed_upstream_integration_test.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/test/integration/multiplexed_upstream_integration_test.cc b/test/integration/multiplexed_upstream_integration_test.cc index d47abf5f0ea20..f8043f5ebe3d1 100644 --- a/test/integration/multiplexed_upstream_integration_test.cc +++ b/test/integration/multiplexed_upstream_integration_test.cc @@ -205,7 +205,6 @@ TEST_P(Http2UpstreamIntegrationTest, LargeSimultaneousRequestWithBufferLimits) { void Http2UpstreamIntegrationTest::manySimultaneousRequests(uint32_t request_bytes, uint32_t max_response_bytes, uint32_t num_requests) { - autonomous_allow_incomplete_streams_ = true; TestRandomGenerator rand; std::vector encoders; std::vector responses; From 7390eef6dfaf7210f49362206806e2ad9f4a0269 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Wed, 13 Oct 2021 14:24:48 -0400 Subject: [PATCH 02/12] quic: (mostly) fixing stream limit bug Signed-off-by: Alyssa Wilk --- test/integration/multiplexed_upstream_integration_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/test/integration/multiplexed_upstream_integration_test.cc b/test/integration/multiplexed_upstream_integration_test.cc index f8043f5ebe3d1..d47abf5f0ea20 100644 --- a/test/integration/multiplexed_upstream_integration_test.cc +++ b/test/integration/multiplexed_upstream_integration_test.cc @@ -205,6 +205,7 @@ TEST_P(Http2UpstreamIntegrationTest, LargeSimultaneousRequestWithBufferLimits) { void Http2UpstreamIntegrationTest::manySimultaneousRequests(uint32_t request_bytes, uint32_t max_response_bytes, uint32_t num_requests) { + autonomous_allow_incomplete_streams_ = true; TestRandomGenerator rand; std::vector encoders; std::vector responses; From b48d11958c35713f280c8887b6f310ff4abf8f68 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Thu, 14 Oct 2021 15:02:31 -0400 Subject: [PATCH 03/12] quic: fixing stream limits bug Signed-off-by: Alyssa Wilk --- source/common/conn_pool/conn_pool_base.cc | 18 +++++++---- source/common/conn_pool/conn_pool_base.h | 6 +++- source/common/http/codec_client.h | 2 ++ source/common/http/http3/conn_pool.cc | 13 ++++++++ source/common/http/http3/conn_pool.h | 28 +++++++++++++++++ .../common/quic/envoy_quic_client_session.cc | 30 +++++++++++++++++++ .../common/quic/envoy_quic_client_session.h | 24 +++++++++++++++ .../common/quic/envoy_quic_client_stream.cc | 4 ++- .../multiplexed_upstream_integration_test.cc | 12 ++------ 9 files changed, 120 insertions(+), 17 deletions(-) diff --git a/source/common/conn_pool/conn_pool_base.cc b/source/common/conn_pool/conn_pool_base.cc index 4cd623df5061b..13ea296d3d087 100644 --- a/source/common/conn_pool/conn_pool_base.cc +++ b/source/common/conn_pool/conn_pool_base.cc @@ -175,18 +175,23 @@ void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient& } ENVOY_CONN_LOG(debug, "creating stream", client); + // Latch capacity before updating remaining streams. + uint64_t capacity = client.currentUnusedCapacity(); client.remaining_streams_--; if (client.remaining_streams_ == 0) { ENVOY_CONN_LOG(debug, "maximum streams per connection, DRAINING", client); host_->cluster().stats().upstream_cx_max_requests_.inc(); transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::DRAINING); - } else if (client.numActiveStreams() + 1 >= client.concurrent_stream_limit_) { + } else if (capacity <= 1) { // As soon as the new stream is created, the client will be maxed out. transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::BUSY); } // Decrement the capacity, as there's one less stream available for serving. - state_.decrConnectingAndConnectedStreamCapacity(1); + // For HTTP/3, the capacity is updated in newStreamEncoder. + if (!quic()) { + state_.decrConnectingAndConnectedStreamCapacity(1); + } // Track the new active stream. state_.incrActiveStreams(1); num_active_streams_++; @@ -213,14 +218,17 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien // If the effective client capacity was limited by concurrency, increase connecting capacity. // If the effective client capacity was limited by max total streams, this will not result in an // increment as no capacity is freed up. - if (client.remaining_streams_ > client.concurrent_stream_limit_ - client.numActiveStreams() - 1 || - had_negative_capacity) { + // We don't update the capacity for HTTP/3 as the stream count should only + // increase when a MAX_STREAMS frame is received. + if (!quic() && (client.remaining_streams_ > + client.concurrent_stream_limit_ - client.numActiveStreams() - 1 || + had_negative_capacity)) { state_.incrConnectingAndConnectedStreamCapacity(1); } if (client.state() == ActiveClient::State::DRAINING && client.numActiveStreams() == 0) { // Close out the draining client if we no longer have active streams. client.close(); - } else if (client.state() == ActiveClient::State::BUSY) { + } else if (client.state() == ActiveClient::State::BUSY && client.currentUnusedCapacity() != 0) { transitionActiveClientState(client, ActiveClient::State::READY); if (!delay_attaching_stream) { onUpstreamReady(); diff --git a/source/common/conn_pool/conn_pool_base.h b/source/common/conn_pool/conn_pool_base.h index 0c47346f5725c..26bd4191b0ac1 100644 --- a/source/common/conn_pool/conn_pool_base.h +++ b/source/common/conn_pool/conn_pool_base.h @@ -56,7 +56,7 @@ class ActiveClient : public LinkedObject, // Returns the application protocol, or absl::nullopt for TCP. virtual absl::optional protocol() const PURE; - int64_t currentUnusedCapacity() const { + virtual int64_t currentUnusedCapacity() const { int64_t remaining_concurrent_streams = static_cast(concurrent_stream_limit_) - numActiveStreams(); @@ -148,6 +148,7 @@ class ConnPoolImplBase : protected Logger::Loggable { virtual ~ConnPoolImplBase(); void deleteIsPendingImpl(); + virtual bool quic() { return false; } // A helper function to get the specific context type from the base class context. template T& typedContext(AttachContext& context) { @@ -234,6 +235,9 @@ class ConnPoolImplBase : protected Logger::Loggable { void decrClusterStreamCapacity(uint32_t delta) { state_.decrConnectingAndConnectedStreamCapacity(delta); } + void incrClusterStreamCapacity(uint32_t delta) { + state_.incrConnectingAndConnectedStreamCapacity(delta); + } void dumpState(std::ostream& os, int indent_level = 0) const { const char* spaces = spacesForLevel(indent_level); os << spaces << "ConnPoolImplBase " << this << DUMP_MEMBER(ready_clients_.size()) diff --git a/source/common/http/codec_client.h b/source/common/http/codec_client.h index 684c962380f32..977d06f59e78a 100644 --- a/source/common/http/codec_client.h +++ b/source/common/http/codec_client.h @@ -131,6 +131,8 @@ class CodecClient : protected Logger::Loggable, // Note this is the L4 stream info, not L7. const StreamInfo::StreamInfo& streamInfo() { return connection_->streamInfo(); } + const Network::ClientConnectionPtr& connection() { return connection_; } + protected: /** * Create a codec client and connect to a remote host/port. diff --git a/source/common/http/http3/conn_pool.cc b/source/common/http/http3/conn_pool.cc index fbc3e45036078..e5555416e714e 100644 --- a/source/common/http/http3/conn_pool.cc +++ b/source/common/http/http3/conn_pool.cc @@ -28,6 +28,19 @@ ActiveClient::ActiveClient(Envoy::Http::HttpConnPoolImplBase& parent, Upstream::Host::CreateConnectionData& data) : MultiplexedActiveClientBase(parent, getMaxStreams(parent.host()->cluster()), parent.host()->cluster().stats().upstream_cx_http3_total_, data) { + auto& connection = dynamic_cast(this)->codec_client_->connection(); + Quic::EnvoyQuicClientSession* session = const_cast( + dynamic_cast(connection.get())); + ASSERT(session != nullptr); + notifier_ = std::make_unique( + [this](int32_t s) -> void { onMaxStreamsChanged(s); }, *session); +} + +void ActiveClient::onMaxStreamsChanged(uint32_t num_streams) { + updateCapacity(num_streams); + if (state() == ActiveClient::State::BUSY && currentUnusedCapacity() != 0) { + parent_.transitionActiveClientState(*this, ActiveClient::State::READY); + } } void Http3ConnPoolImpl::setQuicConfigFromClusterConfig(const Upstream::ClusterInfo& cluster, diff --git a/source/common/http/http3/conn_pool.h b/source/common/http/http3/conn_pool.h index 6ae87420207e0..bfb2df78a34b5 100644 --- a/source/common/http/http3/conn_pool.h +++ b/source/common/http/http3/conn_pool.h @@ -23,6 +23,33 @@ class ActiveClient : public MultiplexedActiveClientBase { public: ActiveClient(Envoy::Http::HttpConnPoolImplBase& parent, Upstream::Host::CreateConnectionData& data); + + void onMaxStreamsChanged(uint32_t num_streams); + + RequestEncoder& newStreamEncoder(ResponseDecoder& response_decoder) override { + ASSERT(quiche_capacity_ != 0); + updateCapacity(quiche_capacity_ - 1); + return MultiplexedActiveClientBase::newStreamEncoder(response_decoder); + } + + int64_t currentUnusedCapacity() const override { + return std::min(quiche_capacity_, effectiveConcurrentStreamLimit()); + } + + void updateCapacity(uint64_t new_quiche_capacity) { + uint64_t old_capacity = currentUnusedCapacity(); + quiche_capacity_ = new_quiche_capacity; + uint64_t new_capacity = currentUnusedCapacity(); + + if (new_capacity < old_capacity) { + parent_.decrClusterStreamCapacity(old_capacity - new_capacity); + } else if (old_capacity < new_capacity) { + parent_.incrClusterStreamCapacity(new_capacity - old_capacity); + } + } + + std::unique_ptr notifier_; + uint64_t quiche_capacity_ = 100; }; // Http3 subclass of FixedHttpConnPoolImpl which exists to store quic data. @@ -45,6 +72,7 @@ class Http3ConnPoolImpl : public FixedHttpConnPoolImpl { quic::QuicConfig& quic_config); Quic::PersistentQuicInfoImpl& quicInfo() { return *quic_info_; } + bool quic() override { return true; } private: // Store quic helpers which can be shared between connections and must live diff --git a/source/common/quic/envoy_quic_client_session.cc b/source/common/quic/envoy_quic_client_session.cc index 2329e871ad5d5..9847a18fbd11e 100644 --- a/source/common/quic/envoy_quic_client_session.cc +++ b/source/common/quic/envoy_quic_client_session.cc @@ -4,9 +4,31 @@ #include "quic_filter_manager_connection_impl.h" +namespace quic { +namespace test { + +// TODO(alyssawilk) add the necessary accessors to quiche and remove this. +class QuicSessionPeer { +public: + static quic::QuicStreamIdManager& getStream(Envoy::Quic::EnvoyQuicClientSession* session) { + return session->ietf_streamid_manager_.bidirectional_stream_id_manager_; + } +}; + +} // namespace test +} // namespace quic + namespace Envoy { namespace Quic { +ScopedStreamNotifier::ScopedStreamNotifier(std::function notify, + EnvoyQuicClientSession& session) + : on_can_create_streams_(notify), session_(session) { + session.setNotifier(*this); +} + +ScopedStreamNotifier::~ScopedStreamNotifier() { session_.clearNotifier(); } + EnvoyQuicClientSession::EnvoyQuicClientSession( const quic::QuicConfig& config, const quic::ParsedQuicVersionVector& supported_versions, std::unique_ptr connection, const quic::QuicServerId& server_id, @@ -87,6 +109,14 @@ void EnvoyQuicClientSession::SetDefaultEncryptionLevel(quic::EncryptionLevel lev } } +void EnvoyQuicClientSession::OnCanCreateNewOutgoingStream(bool) { + if (notifier_.has_value()) { + quic::QuicStreamIdManager& manager = quic::test::QuicSessionPeer::getStream(this); + uint32_t streams_available = manager.outgoing_max_streams() - manager.outgoing_stream_count(); + notifier_->notify(streams_available); + } +} + std::unique_ptr EnvoyQuicClientSession::CreateClientStream() { ASSERT(codec_stats_.has_value() && http3_options_.has_value()); return std::make_unique(GetNextOutgoingBidirectionalStreamId(), this, diff --git a/source/common/quic/envoy_quic_client_session.h b/source/common/quic/envoy_quic_client_session.h index 30462fa77feed..cfcb6624368e5 100644 --- a/source/common/quic/envoy_quic_client_session.h +++ b/source/common/quic/envoy_quic_client_session.h @@ -11,6 +11,18 @@ namespace Envoy { namespace Quic { +class EnvoyQuicClientSession; + +struct ScopedStreamNotifier { + ScopedStreamNotifier(std::function notify, EnvoyQuicClientSession& session); + ~ScopedStreamNotifier(); + + void notify(uint32_t streams_available) { on_can_create_streams_(streams_available); } + + std::function on_can_create_streams_; + EnvoyQuicClientSession& session_; +}; + // Act as a Network::ClientConnection to ClientCodec. // TODO(danzh) This class doesn't need to inherit Network::FilterManager // interface but need all other Network::Connection implementation in @@ -71,6 +83,17 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl, // QuicFilterManagerConnectionImpl void setHttp3Options(const envoy::config::core::v3::Http3ProtocolOptions& http3_options) override; + bool QuicheShouldCreateOutgoingBidirectionalStream() { + return quic::QuicSpdyClientSession::ShouldCreateOutgoingBidirectionalStream(); + } + + // Notify any registered connection pool when new streams are available. + void OnCanCreateNewOutgoingStream(bool) override; + + void setNotifier(ScopedStreamNotifier& notifier) { notifier_ = notifier; } + + void clearNotifier() { notifier_.reset(); } + using quic::QuicSpdyClientSession::PerformActionOnActiveStreams; protected: @@ -102,6 +125,7 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl, EnvoyQuicCryptoClientStreamFactoryInterface& crypto_stream_factory_; QuicStatNames& quic_stat_names_; Stats::Scope& scope_; + OptRef notifier_; }; } // namespace Quic diff --git a/source/common/quic/envoy_quic_client_stream.cc b/source/common/quic/envoy_quic_client_stream.cc index ad26cf9e17d77..222f35f56e78f 100644 --- a/source/common/quic/envoy_quic_client_stream.cc +++ b/source/common/quic/envoy_quic_client_stream.cc @@ -286,7 +286,9 @@ void EnvoyQuicClientStream::ResetWithError(quic::QuicResetStreamError error) { stats_.tx_reset_.inc(); // Upper layers expect calling resetStream() to immediately raise reset callbacks. runResetCallbacks(quicRstErrorToEnvoyLocalResetReason(error.internal_code())); - quic::QuicSpdyClientStream::ResetWithError(error); + if (session()->connection()->connected()) { + quic::QuicSpdyClientStream::ResetWithError(error); + } } void EnvoyQuicClientStream::OnConnectionClosed(quic::QuicErrorCode error, diff --git a/test/integration/multiplexed_upstream_integration_test.cc b/test/integration/multiplexed_upstream_integration_test.cc index d47abf5f0ea20..749fadc380e7f 100644 --- a/test/integration/multiplexed_upstream_integration_test.cc +++ b/test/integration/multiplexed_upstream_integration_test.cc @@ -205,7 +205,6 @@ TEST_P(Http2UpstreamIntegrationTest, LargeSimultaneousRequestWithBufferLimits) { void Http2UpstreamIntegrationTest::manySimultaneousRequests(uint32_t request_bytes, uint32_t max_response_bytes, uint32_t num_requests) { - autonomous_allow_incomplete_streams_ = true; TestRandomGenerator rand; std::vector encoders; std::vector responses; @@ -236,11 +235,8 @@ void Http2UpstreamIntegrationTest::manySimultaneousRequests(uint32_t request_byt ASSERT_TRUE(responses[i]->waitForEndStream()); if (i % 2 != 0) { EXPECT_TRUE(responses[i]->complete()); - // TODO(18160) remove this if and always check for 200 and body length. - if (num_requests <= 100 || upstreamProtocol() != Http::CodecType::HTTP3) { - EXPECT_EQ("200", responses[i]->headers().getStatusValue()); - EXPECT_EQ(response_bytes[i], responses[i]->body().length()); - } + EXPECT_EQ("200", responses[i]->headers().getStatusValue()); + EXPECT_EQ(response_bytes[i], responses[i]->body().length()); } else { // Upstream stream reset. EXPECT_EQ("503", responses[i]->headers().getStatusValue()); @@ -255,13 +251,9 @@ TEST_P(Http2UpstreamIntegrationTest, ManySimultaneousRequest) { manySimultaneousRequests(1024, 1024, 100); } -#ifdef NDEBUG -// TODO(alyssawilk) this causes crashes in debug mode for QUIC due to a race -// condition between Envoy's stream accounting and QUICE's. Debug and fix. TEST_P(Http2UpstreamIntegrationTest, TooManySimultaneousRequests) { manySimultaneousRequests(1024, 1024, 200); } -#endif TEST_P(Http2UpstreamIntegrationTest, ManyLargeSimultaneousRequestWithBufferLimits) { config_helper_.setBufferLimits(1024, 1024); // Set buffer limits upstream and downstream. From c38fc9a3849268cbf3eb12ae66b22b94b17e2432 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Wed, 20 Oct 2021 14:22:09 -0400 Subject: [PATCH 04/12] comments Signed-off-by: Alyssa Wilk --- source/common/http/http3/conn_pool.h | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/source/common/http/http3/conn_pool.h b/source/common/http/http3/conn_pool.h index bfb2df78a34b5..21f5e7736d179 100644 --- a/source/common/http/http3/conn_pool.h +++ b/source/common/http/http3/conn_pool.h @@ -24,19 +24,32 @@ class ActiveClient : public MultiplexedActiveClientBase { ActiveClient(Envoy::Http::HttpConnPoolImplBase& parent, Upstream::Host::CreateConnectionData& data); + // Update quiche_capacity_ when a MAX_STREAMS frame arrives. void onMaxStreamsChanged(uint32_t num_streams); RequestEncoder& newStreamEncoder(ResponseDecoder& response_decoder) override { ASSERT(quiche_capacity_ != 0); + // Each time a quic stream is allocated the quic capacity needs to get + // decremented. See comments by quiche_capacity_. updateCapacity(quiche_capacity_ - 1); return MultiplexedActiveClientBase::newStreamEncoder(response_decoder); } + // Overload the default capacity calculations to return the quic capacity + // (modified by any stream limits in Envoy config) int64_t currentUnusedCapacity() const override { return std::min(quiche_capacity_, effectiveConcurrentStreamLimit()); } void updateCapacity(uint64_t new_quiche_capacity) { + // Each time we update the capacity make sure to reflect the update in the + // connection pool. + // + // Due to interplay between the max number of concurrent streams Envoy will + // allow and the max number of streams per connection this is not as simple + // as just updating based on the delta between quiche_capacity_ and + // new_quiche_capacity, so we use the delta between the actual calculated + // capacity before and after the update. uint64_t old_capacity = currentUnusedCapacity(); quiche_capacity_ = new_quiche_capacity; uint64_t new_capacity = currentUnusedCapacity(); @@ -49,6 +62,17 @@ class ActiveClient : public MultiplexedActiveClientBase { } std::unique_ptr notifier_; + // Unlike HTTP/2 and HTTP/1, rather than having a cap on the number of active + // streams, QUIC has a fixed number of streams available which is updated via + // the MAX_STREAMS frame. + // + // As such each time we create a new stream for QUIC, the capacity goes down + // by one, but unlike the other two codecs it is _not_ restored on stream + // closure. + // + // We track the QUIC capacity here, and overload currentUnusedCapacity so the + // connection pool can accurately keep track of when it is safe to create new + // streams. uint64_t quiche_capacity_ = 100; }; From 98b584b4575ddc8d9a43e08d689cdb6a6b51fedf Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Wed, 20 Oct 2021 14:47:33 -0400 Subject: [PATCH 05/12] format Signed-off-by: Alyssa Wilk --- source/common/http/http3/conn_pool.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/http/http3/conn_pool.h b/source/common/http/http3/conn_pool.h index 21f5e7736d179..4a8bcbafa9e84 100644 --- a/source/common/http/http3/conn_pool.h +++ b/source/common/http/http3/conn_pool.h @@ -30,7 +30,7 @@ class ActiveClient : public MultiplexedActiveClientBase { RequestEncoder& newStreamEncoder(ResponseDecoder& response_decoder) override { ASSERT(quiche_capacity_ != 0); // Each time a quic stream is allocated the quic capacity needs to get - // decremented. See comments by quiche_capacity_. + // decremented. See comments by quiche_capacity_. updateCapacity(quiche_capacity_ - 1); return MultiplexedActiveClientBase::newStreamEncoder(response_decoder); } From c6d5bc43eacbe33e43197ca95d8aabc94835bd94 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Wed, 20 Oct 2021 15:46:56 -0400 Subject: [PATCH 06/12] comments Signed-off-by: Alyssa Wilk --- envoy/http/codec.h | 7 +++++++ source/common/conn_pool/conn_pool_base.cc | 8 ++++---- source/common/conn_pool/conn_pool_base.h | 9 ++++++++- source/common/http/codec_client.h | 7 +++++-- source/common/http/http3/conn_pool.cc | 6 ------ source/common/http/http3/conn_pool.h | 9 +++++---- .../common/quic/envoy_quic_client_session.cc | 20 +++++++------------ .../common/quic/envoy_quic_client_session.h | 15 -------------- 8 files changed, 36 insertions(+), 45 deletions(-) diff --git a/envoy/http/codec.h b/envoy/http/codec.h index a1d5fc829ae0a..48a422f48f4bc 100644 --- a/envoy/http/codec.h +++ b/envoy/http/codec.h @@ -394,6 +394,13 @@ class ConnectionCallbacks { * @param ReceivedSettings the settings received from the peer. */ virtual void onSettings(ReceivedSettings& settings) { UNREFERENCED_PARAMETER(settings); } + + /** + * Fires when the MAX_STREAMS frame is received from the peer. + * This may occur multiple times across the lifetime of an HTTP/3 connection. + * @param num_streams the number of streams now allowed to be opened. + */ + virtual void onMaxStreamsChanged(uint32_t num_streams) { UNREFERENCED_PARAMETER(num_streams); } }; /** diff --git a/source/common/conn_pool/conn_pool_base.cc b/source/common/conn_pool/conn_pool_base.cc index 13ea296d3d087..034a96710a579 100644 --- a/source/common/conn_pool/conn_pool_base.cc +++ b/source/common/conn_pool/conn_pool_base.cc @@ -189,7 +189,7 @@ void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient& // Decrement the capacity, as there's one less stream available for serving. // For HTTP/3, the capacity is updated in newStreamEncoder. - if (!quic()) { + if (trackStreamCapacity()) { state_.decrConnectingAndConnectedStreamCapacity(1); } // Track the new active stream. @@ -220,9 +220,9 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien // increment as no capacity is freed up. // We don't update the capacity for HTTP/3 as the stream count should only // increase when a MAX_STREAMS frame is received. - if (!quic() && (client.remaining_streams_ > - client.concurrent_stream_limit_ - client.numActiveStreams() - 1 || - had_negative_capacity)) { + if (trackStreamCapacity() && (client.remaining_streams_ > client.concurrent_stream_limit_ - + client.numActiveStreams() - 1 || + had_negative_capacity)) { state_.incrConnectingAndConnectedStreamCapacity(1); } if (client.state() == ActiveClient::State::DRAINING && client.numActiveStreams() == 0) { diff --git a/source/common/conn_pool/conn_pool_base.h b/source/common/conn_pool/conn_pool_base.h index 26bd4191b0ac1..64c1f63967c4f 100644 --- a/source/common/conn_pool/conn_pool_base.h +++ b/source/common/conn_pool/conn_pool_base.h @@ -102,6 +102,10 @@ class ActiveClient : public LinkedObject, virtual void drain(); ConnPoolImplBase& parent_; + // The count of remaining streams allowed for this connection. + // This will start out as the total number of streams per connection if capped + // by configuration, or it will be set to std::numeric_limits::max() to be + // (functionally) unlimited. uint32_t remaining_streams_; uint32_t concurrent_stream_limit_; Upstream::HostDescriptionConstSharedPtr real_host_description_; @@ -148,7 +152,10 @@ class ConnPoolImplBase : protected Logger::Loggable { virtual ~ConnPoolImplBase(); void deleteIsPendingImpl(); - virtual bool quic() { return false; } + // By default, the connection pool will track connected and connecting stream + // capacity as streams are created and destroyed. QUIC does custom stream + // accounting so will override this to false. + virtual bool trackStreamCapacity() { return true; } // A helper function to get the specific context type from the base class context. template T& typedContext(AttachContext& context) { diff --git a/source/common/http/codec_client.h b/source/common/http/codec_client.h index 977d06f59e78a..093e28304402c 100644 --- a/source/common/http/codec_client.h +++ b/source/common/http/codec_client.h @@ -131,8 +131,6 @@ class CodecClient : protected Logger::Loggable, // Note this is the L4 stream info, not L7. const StreamInfo::StreamInfo& streamInfo() { return connection_->streamInfo(); } - const Network::ClientConnectionPtr& connection() { return connection_; } - protected: /** * Create a codec client and connect to a remote host/port. @@ -160,6 +158,11 @@ class CodecClient : protected Logger::Loggable, codec_callbacks_->onSettings(settings); } } + void onMaxStreamsChanged(uint32_t num_streams) override { + if (codec_callbacks_) { + codec_callbacks_->onMaxStreamsChanged(num_streams); + } + } void onIdleTimeout() { host_->cluster().stats().upstream_cx_idle_timeout_.inc(); diff --git a/source/common/http/http3/conn_pool.cc b/source/common/http/http3/conn_pool.cc index e5555416e714e..c5008c2be3a3c 100644 --- a/source/common/http/http3/conn_pool.cc +++ b/source/common/http/http3/conn_pool.cc @@ -28,12 +28,6 @@ ActiveClient::ActiveClient(Envoy::Http::HttpConnPoolImplBase& parent, Upstream::Host::CreateConnectionData& data) : MultiplexedActiveClientBase(parent, getMaxStreams(parent.host()->cluster()), parent.host()->cluster().stats().upstream_cx_http3_total_, data) { - auto& connection = dynamic_cast(this)->codec_client_->connection(); - Quic::EnvoyQuicClientSession* session = const_cast( - dynamic_cast(connection.get())); - ASSERT(session != nullptr); - notifier_ = std::make_unique( - [this](int32_t s) -> void { onMaxStreamsChanged(s); }, *session); } void ActiveClient::onMaxStreamsChanged(uint32_t num_streams) { diff --git a/source/common/http/http3/conn_pool.h b/source/common/http/http3/conn_pool.h index 4a8bcbafa9e84..e4b1c2638f89a 100644 --- a/source/common/http/http3/conn_pool.h +++ b/source/common/http/http3/conn_pool.h @@ -24,8 +24,8 @@ class ActiveClient : public MultiplexedActiveClientBase { ActiveClient(Envoy::Http::HttpConnPoolImplBase& parent, Upstream::Host::CreateConnectionData& data); - // Update quiche_capacity_ when a MAX_STREAMS frame arrives. - void onMaxStreamsChanged(uint32_t num_streams); + // Http::ConnectionCallbacks + void onMaxStreamsChanged(uint32_t num_streams) override; RequestEncoder& newStreamEncoder(ResponseDecoder& response_decoder) override { ASSERT(quiche_capacity_ != 0); @@ -61,7 +61,6 @@ class ActiveClient : public MultiplexedActiveClientBase { } } - std::unique_ptr notifier_; // Unlike HTTP/2 and HTTP/1, rather than having a cap on the number of active // streams, QUIC has a fixed number of streams available which is updated via // the MAX_STREAMS frame. @@ -96,7 +95,9 @@ class Http3ConnPoolImpl : public FixedHttpConnPoolImpl { quic::QuicConfig& quic_config); Quic::PersistentQuicInfoImpl& quicInfo() { return *quic_info_; } - bool quic() override { return true; } + // For HTTP/3 the base connection pool does not track stream capacity, rather + // the HTTP3 active client does. + bool trackStreamCapacity() override { return false; } private: // Store quic helpers which can be shared between connections and must live diff --git a/source/common/quic/envoy_quic_client_session.cc b/source/common/quic/envoy_quic_client_session.cc index 9847a18fbd11e..82ae70ea6107a 100644 --- a/source/common/quic/envoy_quic_client_session.cc +++ b/source/common/quic/envoy_quic_client_session.cc @@ -10,7 +10,8 @@ namespace test { // TODO(alyssawilk) add the necessary accessors to quiche and remove this. class QuicSessionPeer { public: - static quic::QuicStreamIdManager& getStream(Envoy::Quic::EnvoyQuicClientSession* session) { + static quic::QuicStreamIdManager& + getStreamIdManager(Envoy::Quic::EnvoyQuicClientSession* session) { return session->ietf_streamid_manager_.bidirectional_stream_id_manager_; } }; @@ -21,14 +22,6 @@ class QuicSessionPeer { namespace Envoy { namespace Quic { -ScopedStreamNotifier::ScopedStreamNotifier(std::function notify, - EnvoyQuicClientSession& session) - : on_can_create_streams_(notify), session_(session) { - session.setNotifier(*this); -} - -ScopedStreamNotifier::~ScopedStreamNotifier() { session_.clearNotifier(); } - EnvoyQuicClientSession::EnvoyQuicClientSession( const quic::QuicConfig& config, const quic::ParsedQuicVersionVector& supported_versions, std::unique_ptr connection, const quic::QuicServerId& server_id, @@ -109,11 +102,12 @@ void EnvoyQuicClientSession::SetDefaultEncryptionLevel(quic::EncryptionLevel lev } } -void EnvoyQuicClientSession::OnCanCreateNewOutgoingStream(bool) { - if (notifier_.has_value()) { - quic::QuicStreamIdManager& manager = quic::test::QuicSessionPeer::getStream(this); +void EnvoyQuicClientSession::OnCanCreateNewOutgoingStream(bool unidirectional) { + if (http_connection_callbacks_ && !unidirectional) { + quic::QuicStreamIdManager& manager = quic::test::QuicSessionPeer::getStreamIdManager(this); + ASSERT(manager.outgoing_max_streams() >= manager.outgoing_stream_count()); uint32_t streams_available = manager.outgoing_max_streams() - manager.outgoing_stream_count(); - notifier_->notify(streams_available); + http_connection_callbacks_->onMaxStreamsChanged(streams_available); } } diff --git a/source/common/quic/envoy_quic_client_session.h b/source/common/quic/envoy_quic_client_session.h index cfcb6624368e5..1f0c5c6814568 100644 --- a/source/common/quic/envoy_quic_client_session.h +++ b/source/common/quic/envoy_quic_client_session.h @@ -13,16 +13,6 @@ namespace Quic { class EnvoyQuicClientSession; -struct ScopedStreamNotifier { - ScopedStreamNotifier(std::function notify, EnvoyQuicClientSession& session); - ~ScopedStreamNotifier(); - - void notify(uint32_t streams_available) { on_can_create_streams_(streams_available); } - - std::function on_can_create_streams_; - EnvoyQuicClientSession& session_; -}; - // Act as a Network::ClientConnection to ClientCodec. // TODO(danzh) This class doesn't need to inherit Network::FilterManager // interface but need all other Network::Connection implementation in @@ -90,10 +80,6 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl, // Notify any registered connection pool when new streams are available. void OnCanCreateNewOutgoingStream(bool) override; - void setNotifier(ScopedStreamNotifier& notifier) { notifier_ = notifier; } - - void clearNotifier() { notifier_.reset(); } - using quic::QuicSpdyClientSession::PerformActionOnActiveStreams; protected: @@ -125,7 +111,6 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl, EnvoyQuicCryptoClientStreamFactoryInterface& crypto_stream_factory_; QuicStatNames& quic_stat_names_; Stats::Scope& scope_; - OptRef notifier_; }; } // namespace Quic From ed2b45089777b8b9229d47f018d5014b7f684309 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Wed, 20 Oct 2021 17:04:46 -0400 Subject: [PATCH 07/12] tidy Signed-off-by: Alyssa Wilk --- source/common/quic/envoy_quic_client_session.h | 4 ---- 1 file changed, 4 deletions(-) diff --git a/source/common/quic/envoy_quic_client_session.h b/source/common/quic/envoy_quic_client_session.h index 1f0c5c6814568..1b3a2857502c2 100644 --- a/source/common/quic/envoy_quic_client_session.h +++ b/source/common/quic/envoy_quic_client_session.h @@ -73,10 +73,6 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl, // QuicFilterManagerConnectionImpl void setHttp3Options(const envoy::config::core::v3::Http3ProtocolOptions& http3_options) override; - bool QuicheShouldCreateOutgoingBidirectionalStream() { - return quic::QuicSpdyClientSession::ShouldCreateOutgoingBidirectionalStream(); - } - // Notify any registered connection pool when new streams are available. void OnCanCreateNewOutgoingStream(bool) override; From 4fca90ca0bde52be9bc4463fe6c559a2b129a2b6 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Thu, 21 Oct 2021 10:49:58 -0400 Subject: [PATCH 08/12] comments Signed-off-by: Alyssa Wilk --- source/common/http/http3/conn_pool.h | 6 ++++++ source/common/quic/envoy_quic_client_session.cc | 11 ++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/source/common/http/http3/conn_pool.h b/source/common/http/http3/conn_pool.h index e4b1c2638f89a..db8074e7427b0 100644 --- a/source/common/http/http3/conn_pool.h +++ b/source/common/http/http3/conn_pool.h @@ -72,6 +72,12 @@ class ActiveClient : public MultiplexedActiveClientBase { // We track the QUIC capacity here, and overload currentUnusedCapacity so the // connection pool can accurately keep track of when it is safe to create new // streams. + // + // Though HTTP/3 should arguably start out with 0 stream capacity until the + // initial handshake is complete and MAX_STREAMS frame has been received, + // assume optimistically it will get ~100 streams, so that the connection pool + // won't fetch a connection for each incoming stream but will assume that the + // first connection will likely be able to serve 100. uint64_t quiche_capacity_ = 100; }; diff --git a/source/common/quic/envoy_quic_client_session.cc b/source/common/quic/envoy_quic_client_session.cc index 82ae70ea6107a..b9853dcb350f9 100644 --- a/source/common/quic/envoy_quic_client_session.cc +++ b/source/common/quic/envoy_quic_client_session.cc @@ -103,12 +103,13 @@ void EnvoyQuicClientSession::SetDefaultEncryptionLevel(quic::EncryptionLevel lev } void EnvoyQuicClientSession::OnCanCreateNewOutgoingStream(bool unidirectional) { - if (http_connection_callbacks_ && !unidirectional) { - quic::QuicStreamIdManager& manager = quic::test::QuicSessionPeer::getStreamIdManager(this); - ASSERT(manager.outgoing_max_streams() >= manager.outgoing_stream_count()); - uint32_t streams_available = manager.outgoing_max_streams() - manager.outgoing_stream_count(); - http_connection_callbacks_->onMaxStreamsChanged(streams_available); + if (!http_connection_callbacks_ || unidirectional) { + return; } + quic::QuicStreamIdManager& manager = quic::test::QuicSessionPeer::getStreamIdManager(this); + ASSERT(manager.outgoing_max_streams() >= manager.outgoing_stream_count()); + uint32_t streams_available = manager.outgoing_max_streams() - manager.outgoing_stream_count(); + http_connection_callbacks_->onMaxStreamsChanged(streams_available); } std::unique_ptr EnvoyQuicClientSession::CreateClientStream() { From 8d2e36f56904d5bd42a70d5823832c0045570c19 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Thu, 21 Oct 2021 17:20:51 -0400 Subject: [PATCH 09/12] comments Signed-off-by: Alyssa Wilk --- source/common/conn_pool/conn_pool_base.cc | 5 ++- source/common/conn_pool/conn_pool_base.h | 10 +++--- source/common/http/http3/conn_pool.cc | 2 ++ source/common/http/http3/conn_pool.h | 5 ++- .../common/quic/envoy_quic_client_session.cc | 32 +++++++++++-------- .../common/quic/envoy_quic_client_session.h | 4 +-- test/integration/base_integration_test.h | 3 ++ test/integration/fake_upstream.cc | 2 +- test/integration/fake_upstream.h | 4 ++- .../multiplexed_upstream_integration_test.cc | 14 ++++++++ 10 files changed, 58 insertions(+), 23 deletions(-) diff --git a/source/common/conn_pool/conn_pool_base.cc b/source/common/conn_pool/conn_pool_base.cc index 034a96710a579..096f27a206833 100644 --- a/source/common/conn_pool/conn_pool_base.cc +++ b/source/common/conn_pool/conn_pool_base.cc @@ -182,7 +182,7 @@ void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient& ENVOY_CONN_LOG(debug, "maximum streams per connection, DRAINING", client); host_->cluster().stats().upstream_cx_max_requests_.inc(); transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::DRAINING); - } else if (capacity <= 1) { + } else if (capacity == 1) { // As soon as the new stream is created, the client will be maxed out. transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::BUSY); } @@ -304,6 +304,9 @@ void ConnPoolImplBase::onUpstreamReady() { state_.decrPendingStreams(1); pending_streams_.pop_back(); } + if (!pending_streams_.empty()) { + tryCreateNewConnections(); + } } std::list& ConnPoolImplBase::owningList(ActiveClient::State state) { diff --git a/source/common/conn_pool/conn_pool_base.h b/source/common/conn_pool/conn_pool_base.h index 64c1f63967c4f..2b87569cb643d 100644 --- a/source/common/conn_pool/conn_pool_base.h +++ b/source/common/conn_pool/conn_pool_base.h @@ -106,6 +106,7 @@ class ActiveClient : public LinkedObject, // This will start out as the total number of streams per connection if capped // by configuration, or it will be set to std::numeric_limits::max() to be // (functionally) unlimited. + // TODO: this could be moved to an optional to make it actually unlimited. uint32_t remaining_streams_; uint32_t concurrent_stream_limit_; Upstream::HostDescriptionConstSharedPtr real_host_description_; @@ -266,9 +267,10 @@ class ConnPoolImplBase : protected Logger::Loggable { connecting_stream_capacity_ -= delta; } -protected: - virtual void onConnected(Envoy::ConnectionPool::ActiveClient&) {} + // Called when an upstream is ready to serve pending streams. + void onUpstreamReady(); +protected: enum class ConnectionResult { FailedToCreateConnection, CreatedNewConnection, @@ -276,11 +278,12 @@ class ConnPoolImplBase : protected Logger::Loggable { NoConnectionRateLimited, CreatedButRateLimited, }; - // Creates up to 3 connections, based on the preconnect ratio. // Returns the ConnectionResult of the last attempt. ConnectionResult tryCreateNewConnections(); + virtual void onConnected(Envoy::ConnectionPool::ActiveClient&) {} + // Creates a new connection if there is sufficient demand, it is allowed by resourceManager, or // to avoid starving this pool. // Demand is determined either by perUpstreamPreconnectRatio() or global_preconnect_ratio @@ -353,7 +356,6 @@ class ConnPoolImplBase : protected Logger::Loggable { // True iff this object is in the deferred delete list. bool deferred_deleting_{false}; - void onUpstreamReady(); Event::SchedulableCallbackPtr upstream_ready_cb_; }; diff --git a/source/common/http/http3/conn_pool.cc b/source/common/http/http3/conn_pool.cc index c5008c2be3a3c..2080e7811c6d4 100644 --- a/source/common/http/http3/conn_pool.cc +++ b/source/common/http/http3/conn_pool.cc @@ -34,6 +34,8 @@ void ActiveClient::onMaxStreamsChanged(uint32_t num_streams) { updateCapacity(num_streams); if (state() == ActiveClient::State::BUSY && currentUnusedCapacity() != 0) { parent_.transitionActiveClientState(*this, ActiveClient::State::READY); + // If there's waiting streams, make sure the pool will now serve them. + parent_.onUpstreamReady(); } } diff --git a/source/common/http/http3/conn_pool.h b/source/common/http/http3/conn_pool.h index db8074e7427b0..63576132c5172 100644 --- a/source/common/http/http3/conn_pool.h +++ b/source/common/http/http3/conn_pool.h @@ -77,7 +77,10 @@ class ActiveClient : public MultiplexedActiveClientBase { // initial handshake is complete and MAX_STREAMS frame has been received, // assume optimistically it will get ~100 streams, so that the connection pool // won't fetch a connection for each incoming stream but will assume that the - // first connection will likely be able to serve 100. + // first connection will likely be ablie to serve 100. + // This number will be updated to the correct value before the connection is + // deemed connected, at which point further connections will be established if + // necessary. uint64_t quiche_capacity_ = 100; }; diff --git a/source/common/quic/envoy_quic_client_session.cc b/source/common/quic/envoy_quic_client_session.cc index b9853dcb350f9..fa5b8448f9cba 100644 --- a/source/common/quic/envoy_quic_client_session.cc +++ b/source/common/quic/envoy_quic_client_session.cc @@ -94,22 +94,14 @@ void EnvoyQuicClientSession::OnRstStream(const quic::QuicRstStreamFrame& frame) /*from_self*/ false, /*is_upstream*/ true); } -void EnvoyQuicClientSession::SetDefaultEncryptionLevel(quic::EncryptionLevel level) { - quic::QuicSpdyClientSession::SetDefaultEncryptionLevel(level); - if (level == quic::ENCRYPTION_FORWARD_SECURE) { - // This is only reached once, when handshake is done. - raiseConnectionEvent(Network::ConnectionEvent::Connected); - } -} - void EnvoyQuicClientSession::OnCanCreateNewOutgoingStream(bool unidirectional) { if (!http_connection_callbacks_ || unidirectional) { return; } - quic::QuicStreamIdManager& manager = quic::test::QuicSessionPeer::getStreamIdManager(this); - ASSERT(manager.outgoing_max_streams() >= manager.outgoing_stream_count()); - uint32_t streams_available = manager.outgoing_max_streams() - manager.outgoing_stream_count(); - http_connection_callbacks_->onMaxStreamsChanged(streams_available); + uint32_t streams_available = streamsAvailable(); + if (streams_available > 0) { + http_connection_callbacks_->onMaxStreamsChanged(streams_available); + } } std::unique_ptr EnvoyQuicClientSession::CreateClientStream() { @@ -140,9 +132,23 @@ quic::QuicConnection* EnvoyQuicClientSession::quicConnection() { return initialized_ ? connection() : nullptr; } +uint64_t EnvoyQuicClientSession::streamsAvailable() { + quic::QuicStreamIdManager& manager = quic::test::QuicSessionPeer::getStreamIdManager(this); + ASSERT(manager.outgoing_max_streams() >= manager.outgoing_stream_count()); + uint32_t streams_available = manager.outgoing_max_streams() - manager.outgoing_stream_count(); + return streams_available; +} + void EnvoyQuicClientSession::OnTlsHandshakeComplete() { quic::QuicSpdyClientSession::OnTlsHandshakeComplete(); - raiseConnectionEvent(Network::ConnectionEvent::Connected); + + // Arguably the peer could start a connection with 0 streams and increase open + // streams later but this is currently unsupported. + ASSERT(streamsAvailable()); + if (streamsAvailable() > 0) { + OnCanCreateNewOutgoingStream(false); + raiseConnectionEvent(Network::ConnectionEvent::Connected); + } } std::unique_ptr EnvoyQuicClientSession::CreateQuicCryptoStream() { diff --git a/source/common/quic/envoy_quic_client_session.h b/source/common/quic/envoy_quic_client_session.h index 1b3a2857502c2..d4dd01fe3ff1a 100644 --- a/source/common/quic/envoy_quic_client_session.h +++ b/source/common/quic/envoy_quic_client_session.h @@ -60,8 +60,6 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl, void MaybeSendRstStreamFrame(quic::QuicStreamId id, quic::QuicResetStreamError error, quic::QuicStreamOffset bytes_written) override; void OnRstStream(const quic::QuicRstStreamFrame& frame) override; - // quic::QuicSpdyClientSessionBase - void SetDefaultEncryptionLevel(quic::EncryptionLevel level) override; // PacketsToReadDelegate size_t numPacketsExpectedPerEventLoop() override { @@ -98,6 +96,8 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl, quic::QuicConnection* quicConnection() override; private: + uint64_t streamsAvailable(); + // These callbacks are owned by network filters and quic session should outlive // them. Http::ConnectionCallbacks* http_connection_callbacks_{nullptr}; diff --git a/test/integration/base_integration_test.h b/test/integration/base_integration_test.h index 303efca46b250..528b1c0cc2af7 100644 --- a/test/integration/base_integration_test.h +++ b/test/integration/base_integration_test.h @@ -362,6 +362,9 @@ class BaseIntegrationTest : protected Logger::Loggable { void mergeOptions(envoy::config::core::v3::Http2ProtocolOptions& options) { upstream_config_.http2_options_.MergeFrom(options); } + void mergeOptions(envoy::config::listener::v3::QuicProtocolOptions& options) { + upstream_config_.quic_options_.MergeFrom(options); + } std::unique_ptr upstream_stats_store_; diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 2129334241ae7..2f33f3c3f519c 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -510,7 +510,7 @@ FakeUpstream::FakeUpstream(Network::TransportSocketFactoryPtr&& transport_socket FakeUpstream::FakeUpstream(Network::TransportSocketFactoryPtr&& transport_socket_factory, Network::SocketPtr&& listen_socket, const FakeUpstreamConfig& config) : http_type_(config.upstream_protocol_), http2_options_(config.http2_options_), - http3_options_(config.http3_options_), + http3_options_(config.http3_options_), quic_options_(config.quic_options_), socket_(Network::SocketSharedPtr(listen_socket.release())), socket_factory_(std::make_unique(socket_)), api_(Api::createApiForTest(stats_store_)), time_system_(config.time_system_), diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index eb404facf5436..7dc98df2ac75e 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -579,6 +579,7 @@ struct FakeUpstreamConfig { absl::optional udp_fake_upstream_; envoy::config::core::v3::Http2ProtocolOptions http2_options_; envoy::config::core::v3::Http3ProtocolOptions http3_options_; + envoy::config::listener::v3::QuicProtocolOptions quic_options_; uint32_t max_request_headers_kb_ = Http::DEFAULT_MAX_REQUEST_HEADERS_KB; uint32_t max_request_headers_count_ = Http::DEFAULT_MAX_HEADERS_COUNT; envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction @@ -760,7 +761,7 @@ class FakeUpstream : Logger::Loggable, if (is_quic) { #if defined(ENVOY_ENABLE_QUIC) udp_listener_config_.listener_factory_ = std::make_unique( - envoy::config::listener::v3::QuicProtocolOptions(), 1, parent_.quic_stat_names_); + parent_.quic_options_, 1, parent_.quic_stat_names_); #else ASSERT(false, "Running a test that requires QUIC without compiling QUIC"); #endif @@ -821,6 +822,7 @@ class FakeUpstream : Logger::Loggable, const envoy::config::core::v3::Http2ProtocolOptions http2_options_; const envoy::config::core::v3::Http3ProtocolOptions http3_options_; + envoy::config::listener::v3::QuicProtocolOptions quic_options_; Network::SocketSharedPtr socket_; Network::ListenSocketFactoryPtr socket_factory_; ConditionalInitializer server_initialized_; diff --git a/test/integration/multiplexed_upstream_integration_test.cc b/test/integration/multiplexed_upstream_integration_test.cc index 749fadc380e7f..4acf747d777ce 100644 --- a/test/integration/multiplexed_upstream_integration_test.cc +++ b/test/integration/multiplexed_upstream_integration_test.cc @@ -255,6 +255,20 @@ TEST_P(Http2UpstreamIntegrationTest, TooManySimultaneousRequests) { manySimultaneousRequests(1024, 1024, 200); } +TEST_P(Http2UpstreamIntegrationTest, ManySimultaneousRequestsTightUpstreamLimits) { + if (upstreamProtocol() == Http::CodecType::HTTP2) { + return; + } + envoy::config::core::v3::Http2ProtocolOptions config; + config.mutable_max_concurrent_streams()->set_value(1); + mergeOptions(config); + envoy::config::listener::v3::QuicProtocolOptions options; + options.mutable_quic_protocol_options()->mutable_max_concurrent_streams()->set_value(1); + mergeOptions(options); + + manySimultaneousRequests(1024, 1024, 10); +} + TEST_P(Http2UpstreamIntegrationTest, ManyLargeSimultaneousRequestWithBufferLimits) { config_helper_.setBufferLimits(1024, 1024); // Set buffer limits upstream and downstream. manySimultaneousRequests(1024 * 20, 1024 * 20); From 2c508a9554ec970df8093b6756e077c4d544ecea Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Mon, 25 Oct 2021 09:27:53 -0400 Subject: [PATCH 10/12] comments Signed-off-by: Alyssa Wilk --- envoy/http/codec.h | 1 + source/common/quic/envoy_quic_client_session.cc | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/envoy/http/codec.h b/envoy/http/codec.h index 48a422f48f4bc..9a336b267fd6c 100644 --- a/envoy/http/codec.h +++ b/envoy/http/codec.h @@ -397,6 +397,7 @@ class ConnectionCallbacks { /** * Fires when the MAX_STREAMS frame is received from the peer. + * This is an HTTP/3 frame, indicating the new maximum stream ID which can be opened. * This may occur multiple times across the lifetime of an HTTP/3 connection. * @param num_streams the number of streams now allowed to be opened. */ diff --git a/source/common/quic/envoy_quic_client_session.cc b/source/common/quic/envoy_quic_client_session.cc index fa5b8448f9cba..bc82b70b9acb3 100644 --- a/source/common/quic/envoy_quic_client_session.cc +++ b/source/common/quic/envoy_quic_client_session.cc @@ -142,8 +142,7 @@ uint64_t EnvoyQuicClientSession::streamsAvailable() { void EnvoyQuicClientSession::OnTlsHandshakeComplete() { quic::QuicSpdyClientSession::OnTlsHandshakeComplete(); - // Arguably the peer could start a connection with 0 streams and increase open - // streams later but this is currently unsupported. + // TODO(alyssawilk) support the case where a connection starts with 0 max streams. ASSERT(streamsAvailable()); if (streamsAvailable() > 0) { OnCanCreateNewOutgoingStream(false); From b03f1118afad6de1f30137b55675a4499cc81b34 Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Mon, 25 Oct 2021 09:51:54 -0400 Subject: [PATCH 11/12] spelling Signed-off-by: Alyssa Wilk --- source/common/http/http3/conn_pool.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/http/http3/conn_pool.h b/source/common/http/http3/conn_pool.h index 63576132c5172..2db0426630d67 100644 --- a/source/common/http/http3/conn_pool.h +++ b/source/common/http/http3/conn_pool.h @@ -77,7 +77,7 @@ class ActiveClient : public MultiplexedActiveClientBase { // initial handshake is complete and MAX_STREAMS frame has been received, // assume optimistically it will get ~100 streams, so that the connection pool // won't fetch a connection for each incoming stream but will assume that the - // first connection will likely be ablie to serve 100. + // first connection will likely be able to serve 100. // This number will be updated to the correct value before the connection is // deemed connected, at which point further connections will be established if // necessary. From edd45cfa0cc2e9c6024065b5ff6da820d674705d Mon Sep 17 00:00:00 2001 From: Alyssa Wilk Date: Mon, 25 Oct 2021 15:42:13 -0400 Subject: [PATCH 12/12] revert move Signed-off-by: Alyssa Wilk --- source/common/conn_pool/conn_pool_base.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/conn_pool/conn_pool_base.h b/source/common/conn_pool/conn_pool_base.h index 2b87569cb643d..29a714410f8cb 100644 --- a/source/common/conn_pool/conn_pool_base.h +++ b/source/common/conn_pool/conn_pool_base.h @@ -271,6 +271,8 @@ class ConnPoolImplBase : protected Logger::Loggable { void onUpstreamReady(); protected: + virtual void onConnected(Envoy::ConnectionPool::ActiveClient&) {} + enum class ConnectionResult { FailedToCreateConnection, CreatedNewConnection, @@ -282,8 +284,6 @@ class ConnPoolImplBase : protected Logger::Loggable { // Returns the ConnectionResult of the last attempt. ConnectionResult tryCreateNewConnections(); - virtual void onConnected(Envoy::ConnectionPool::ActiveClient&) {} - // Creates a new connection if there is sufficient demand, it is allowed by resourceManager, or // to avoid starving this pool. // Demand is determined either by perUpstreamPreconnectRatio() or global_preconnect_ratio