diff --git a/envoy/http/codec.h b/envoy/http/codec.h index a1d5fc829ae0a..9a336b267fd6c 100644 --- a/envoy/http/codec.h +++ b/envoy/http/codec.h @@ -394,6 +394,14 @@ class ConnectionCallbacks { * @param ReceivedSettings the settings received from the peer. */ virtual void onSettings(ReceivedSettings& settings) { UNREFERENCED_PARAMETER(settings); } + + /** + * Fires when the MAX_STREAMS frame is received from the peer. + * This is an HTTP/3 frame, indicating the new maximum stream ID which can be opened. + * This may occur multiple times across the lifetime of an HTTP/3 connection. + * @param num_streams the number of streams now allowed to be opened. + */ + virtual void onMaxStreamsChanged(uint32_t num_streams) { UNREFERENCED_PARAMETER(num_streams); } }; /** diff --git a/source/common/conn_pool/conn_pool_base.cc b/source/common/conn_pool/conn_pool_base.cc index 4cd623df5061b..096f27a206833 100644 --- a/source/common/conn_pool/conn_pool_base.cc +++ b/source/common/conn_pool/conn_pool_base.cc @@ -175,18 +175,23 @@ void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient& } ENVOY_CONN_LOG(debug, "creating stream", client); + // Latch capacity before updating remaining streams. + uint64_t capacity = client.currentUnusedCapacity(); client.remaining_streams_--; if (client.remaining_streams_ == 0) { ENVOY_CONN_LOG(debug, "maximum streams per connection, DRAINING", client); host_->cluster().stats().upstream_cx_max_requests_.inc(); transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::DRAINING); - } else if (client.numActiveStreams() + 1 >= client.concurrent_stream_limit_) { + } else if (capacity == 1) { // As soon as the new stream is created, the client will be maxed out. transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::BUSY); } // Decrement the capacity, as there's one less stream available for serving. - state_.decrConnectingAndConnectedStreamCapacity(1); + // For HTTP/3, the capacity is updated in newStreamEncoder. + if (trackStreamCapacity()) { + state_.decrConnectingAndConnectedStreamCapacity(1); + } // Track the new active stream. state_.incrActiveStreams(1); num_active_streams_++; @@ -213,14 +218,17 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien // If the effective client capacity was limited by concurrency, increase connecting capacity. // If the effective client capacity was limited by max total streams, this will not result in an // increment as no capacity is freed up. - if (client.remaining_streams_ > client.concurrent_stream_limit_ - client.numActiveStreams() - 1 || - had_negative_capacity) { + // We don't update the capacity for HTTP/3 as the stream count should only + // increase when a MAX_STREAMS frame is received. + if (trackStreamCapacity() && (client.remaining_streams_ > client.concurrent_stream_limit_ - + client.numActiveStreams() - 1 || + had_negative_capacity)) { state_.incrConnectingAndConnectedStreamCapacity(1); } if (client.state() == ActiveClient::State::DRAINING && client.numActiveStreams() == 0) { // Close out the draining client if we no longer have active streams. client.close(); - } else if (client.state() == ActiveClient::State::BUSY) { + } else if (client.state() == ActiveClient::State::BUSY && client.currentUnusedCapacity() != 0) { transitionActiveClientState(client, ActiveClient::State::READY); if (!delay_attaching_stream) { onUpstreamReady(); @@ -296,6 +304,9 @@ void ConnPoolImplBase::onUpstreamReady() { state_.decrPendingStreams(1); pending_streams_.pop_back(); } + if (!pending_streams_.empty()) { + tryCreateNewConnections(); + } } std::list& ConnPoolImplBase::owningList(ActiveClient::State state) { diff --git a/source/common/conn_pool/conn_pool_base.h b/source/common/conn_pool/conn_pool_base.h index 0c47346f5725c..29a714410f8cb 100644 --- a/source/common/conn_pool/conn_pool_base.h +++ b/source/common/conn_pool/conn_pool_base.h @@ -56,7 +56,7 @@ class ActiveClient : public LinkedObject, // Returns the application protocol, or absl::nullopt for TCP. virtual absl::optional protocol() const PURE; - int64_t currentUnusedCapacity() const { + virtual int64_t currentUnusedCapacity() const { int64_t remaining_concurrent_streams = static_cast(concurrent_stream_limit_) - numActiveStreams(); @@ -102,6 +102,11 @@ class ActiveClient : public LinkedObject, virtual void drain(); ConnPoolImplBase& parent_; + // The count of remaining streams allowed for this connection. + // This will start out as the total number of streams per connection if capped + // by configuration, or it will be set to std::numeric_limits::max() to be + // (functionally) unlimited. + // TODO: this could be moved to an optional to make it actually unlimited. uint32_t remaining_streams_; uint32_t concurrent_stream_limit_; Upstream::HostDescriptionConstSharedPtr real_host_description_; @@ -148,6 +153,10 @@ class ConnPoolImplBase : protected Logger::Loggable { virtual ~ConnPoolImplBase(); void deleteIsPendingImpl(); + // By default, the connection pool will track connected and connecting stream + // capacity as streams are created and destroyed. QUIC does custom stream + // accounting so will override this to false. + virtual bool trackStreamCapacity() { return true; } // A helper function to get the specific context type from the base class context. template T& typedContext(AttachContext& context) { @@ -234,6 +243,9 @@ class ConnPoolImplBase : protected Logger::Loggable { void decrClusterStreamCapacity(uint32_t delta) { state_.decrConnectingAndConnectedStreamCapacity(delta); } + void incrClusterStreamCapacity(uint32_t delta) { + state_.incrConnectingAndConnectedStreamCapacity(delta); + } void dumpState(std::ostream& os, int indent_level = 0) const { const char* spaces = spacesForLevel(indent_level); os << spaces << "ConnPoolImplBase " << this << DUMP_MEMBER(ready_clients_.size()) @@ -255,6 +267,9 @@ class ConnPoolImplBase : protected Logger::Loggable { connecting_stream_capacity_ -= delta; } + // Called when an upstream is ready to serve pending streams. + void onUpstreamReady(); + protected: virtual void onConnected(Envoy::ConnectionPool::ActiveClient&) {} @@ -265,7 +280,6 @@ class ConnPoolImplBase : protected Logger::Loggable { NoConnectionRateLimited, CreatedButRateLimited, }; - // Creates up to 3 connections, based on the preconnect ratio. // Returns the ConnectionResult of the last attempt. ConnectionResult tryCreateNewConnections(); @@ -342,7 +356,6 @@ class ConnPoolImplBase : protected Logger::Loggable { // True iff this object is in the deferred delete list. bool deferred_deleting_{false}; - void onUpstreamReady(); Event::SchedulableCallbackPtr upstream_ready_cb_; }; diff --git a/source/common/http/codec_client.h b/source/common/http/codec_client.h index 684c962380f32..093e28304402c 100644 --- a/source/common/http/codec_client.h +++ b/source/common/http/codec_client.h @@ -158,6 +158,11 @@ class CodecClient : protected Logger::Loggable, codec_callbacks_->onSettings(settings); } } + void onMaxStreamsChanged(uint32_t num_streams) override { + if (codec_callbacks_) { + codec_callbacks_->onMaxStreamsChanged(num_streams); + } + } void onIdleTimeout() { host_->cluster().stats().upstream_cx_idle_timeout_.inc(); diff --git a/source/common/http/http3/conn_pool.cc b/source/common/http/http3/conn_pool.cc index fbc3e45036078..2080e7811c6d4 100644 --- a/source/common/http/http3/conn_pool.cc +++ b/source/common/http/http3/conn_pool.cc @@ -30,6 +30,15 @@ ActiveClient::ActiveClient(Envoy::Http::HttpConnPoolImplBase& parent, parent.host()->cluster().stats().upstream_cx_http3_total_, data) { } +void ActiveClient::onMaxStreamsChanged(uint32_t num_streams) { + updateCapacity(num_streams); + if (state() == ActiveClient::State::BUSY && currentUnusedCapacity() != 0) { + parent_.transitionActiveClientState(*this, ActiveClient::State::READY); + // If there's waiting streams, make sure the pool will now serve them. + parent_.onUpstreamReady(); + } +} + void Http3ConnPoolImpl::setQuicConfigFromClusterConfig(const Upstream::ClusterInfo& cluster, quic::QuicConfig& quic_config) { // TODO(alyssawilk) use and test other defaults. diff --git a/source/common/http/http3/conn_pool.h b/source/common/http/http3/conn_pool.h index 6ae87420207e0..2db0426630d67 100644 --- a/source/common/http/http3/conn_pool.h +++ b/source/common/http/http3/conn_pool.h @@ -23,6 +23,65 @@ class ActiveClient : public MultiplexedActiveClientBase { public: ActiveClient(Envoy::Http::HttpConnPoolImplBase& parent, Upstream::Host::CreateConnectionData& data); + + // Http::ConnectionCallbacks + void onMaxStreamsChanged(uint32_t num_streams) override; + + RequestEncoder& newStreamEncoder(ResponseDecoder& response_decoder) override { + ASSERT(quiche_capacity_ != 0); + // Each time a quic stream is allocated the quic capacity needs to get + // decremented. See comments by quiche_capacity_. + updateCapacity(quiche_capacity_ - 1); + return MultiplexedActiveClientBase::newStreamEncoder(response_decoder); + } + + // Overload the default capacity calculations to return the quic capacity + // (modified by any stream limits in Envoy config) + int64_t currentUnusedCapacity() const override { + return std::min(quiche_capacity_, effectiveConcurrentStreamLimit()); + } + + void updateCapacity(uint64_t new_quiche_capacity) { + // Each time we update the capacity make sure to reflect the update in the + // connection pool. + // + // Due to interplay between the max number of concurrent streams Envoy will + // allow and the max number of streams per connection this is not as simple + // as just updating based on the delta between quiche_capacity_ and + // new_quiche_capacity, so we use the delta between the actual calculated + // capacity before and after the update. + uint64_t old_capacity = currentUnusedCapacity(); + quiche_capacity_ = new_quiche_capacity; + uint64_t new_capacity = currentUnusedCapacity(); + + if (new_capacity < old_capacity) { + parent_.decrClusterStreamCapacity(old_capacity - new_capacity); + } else if (old_capacity < new_capacity) { + parent_.incrClusterStreamCapacity(new_capacity - old_capacity); + } + } + + // Unlike HTTP/2 and HTTP/1, rather than having a cap on the number of active + // streams, QUIC has a fixed number of streams available which is updated via + // the MAX_STREAMS frame. + // + // As such each time we create a new stream for QUIC, the capacity goes down + // by one, but unlike the other two codecs it is _not_ restored on stream + // closure. + // + // We track the QUIC capacity here, and overload currentUnusedCapacity so the + // connection pool can accurately keep track of when it is safe to create new + // streams. + // + // Though HTTP/3 should arguably start out with 0 stream capacity until the + // initial handshake is complete and MAX_STREAMS frame has been received, + // assume optimistically it will get ~100 streams, so that the connection pool + // won't fetch a connection for each incoming stream but will assume that the + // first connection will likely be able to serve 100. + // This number will be updated to the correct value before the connection is + // deemed connected, at which point further connections will be established if + // necessary. + uint64_t quiche_capacity_ = 100; }; // Http3 subclass of FixedHttpConnPoolImpl which exists to store quic data. @@ -45,6 +104,9 @@ class Http3ConnPoolImpl : public FixedHttpConnPoolImpl { quic::QuicConfig& quic_config); Quic::PersistentQuicInfoImpl& quicInfo() { return *quic_info_; } + // For HTTP/3 the base connection pool does not track stream capacity, rather + // the HTTP3 active client does. + bool trackStreamCapacity() override { return false; } private: // Store quic helpers which can be shared between connections and must live diff --git a/source/common/quic/envoy_quic_client_session.cc b/source/common/quic/envoy_quic_client_session.cc index 6879f5aec3268..07964d75f6757 100644 --- a/source/common/quic/envoy_quic_client_session.cc +++ b/source/common/quic/envoy_quic_client_session.cc @@ -5,6 +5,21 @@ #include "quic_filter_manager_connection_impl.h" +namespace quic { +namespace test { + +// TODO(alyssawilk) add the necessary accessors to quiche and remove this. +class QuicSessionPeer { +public: + static quic::QuicStreamIdManager& + getStreamIdManager(Envoy::Quic::EnvoyQuicClientSession* session) { + return session->ietf_streamid_manager_.bidirectional_stream_id_manager_; + } +}; + +} // namespace test +} // namespace quic + namespace Envoy { namespace Quic { @@ -82,6 +97,16 @@ void EnvoyQuicClientSession::OnRstStream(const quic::QuicRstStreamFrame& frame) /*from_self*/ false, /*is_upstream*/ true); } +void EnvoyQuicClientSession::OnCanCreateNewOutgoingStream(bool unidirectional) { + if (!http_connection_callbacks_ || unidirectional) { + return; + } + uint32_t streams_available = streamsAvailable(); + if (streams_available > 0) { + http_connection_callbacks_->onMaxStreamsChanged(streams_available); + } +} + std::unique_ptr EnvoyQuicClientSession::CreateClientStream() { ASSERT(codec_stats_.has_value() && http3_options_.has_value()); return std::make_unique(GetNextOutgoingBidirectionalStreamId(), this, @@ -110,9 +135,22 @@ quic::QuicConnection* EnvoyQuicClientSession::quicConnection() { return initialized_ ? connection() : nullptr; } +uint64_t EnvoyQuicClientSession::streamsAvailable() { + quic::QuicStreamIdManager& manager = quic::test::QuicSessionPeer::getStreamIdManager(this); + ASSERT(manager.outgoing_max_streams() >= manager.outgoing_stream_count()); + uint32_t streams_available = manager.outgoing_max_streams() - manager.outgoing_stream_count(); + return streams_available; +} + void EnvoyQuicClientSession::OnTlsHandshakeComplete() { quic::QuicSpdyClientSession::OnTlsHandshakeComplete(); - raiseConnectionEvent(Network::ConnectionEvent::Connected); + + // TODO(alyssawilk) support the case where a connection starts with 0 max streams. + ASSERT(streamsAvailable()); + if (streamsAvailable() > 0) { + OnCanCreateNewOutgoingStream(false); + raiseConnectionEvent(Network::ConnectionEvent::Connected); + } } std::unique_ptr EnvoyQuicClientSession::CreateQuicCryptoStream() { diff --git a/source/common/quic/envoy_quic_client_session.h b/source/common/quic/envoy_quic_client_session.h index 36ed64bec3f5f..58485b4f07b92 100644 --- a/source/common/quic/envoy_quic_client_session.h +++ b/source/common/quic/envoy_quic_client_session.h @@ -11,6 +11,8 @@ namespace Envoy { namespace Quic { +class EnvoyQuicClientSession; + // Act as a Network::ClientConnection to ClientCodec. // TODO(danzh) This class doesn't need to inherit Network::FilterManager // interface but need all other Network::Connection implementation in @@ -58,6 +60,7 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl, void MaybeSendRstStreamFrame(quic::QuicStreamId id, quic::QuicResetStreamError error, quic::QuicStreamOffset bytes_written) override; void OnRstStream(const quic::QuicRstStreamFrame& frame) override; + // quic::QuicSpdyClientSessionBase bool ShouldKeepConnectionAlive() const override; // quic::ProofHandler @@ -73,6 +76,9 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl, // QuicFilterManagerConnectionImpl void setHttp3Options(const envoy::config::core::v3::Http3ProtocolOptions& http3_options) override; + // Notify any registered connection pool when new streams are available. + void OnCanCreateNewOutgoingStream(bool) override; + using quic::QuicSpdyClientSession::PerformActionOnActiveStreams; protected: @@ -95,6 +101,8 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl, quic::QuicConnection* quicConnection() override; private: + uint64_t streamsAvailable(); + // These callbacks are owned by network filters and quic session should outlive // them. Http::ConnectionCallbacks* http_connection_callbacks_{nullptr}; diff --git a/source/common/quic/envoy_quic_client_stream.cc b/source/common/quic/envoy_quic_client_stream.cc index ad26cf9e17d77..222f35f56e78f 100644 --- a/source/common/quic/envoy_quic_client_stream.cc +++ b/source/common/quic/envoy_quic_client_stream.cc @@ -286,7 +286,9 @@ void EnvoyQuicClientStream::ResetWithError(quic::QuicResetStreamError error) { stats_.tx_reset_.inc(); // Upper layers expect calling resetStream() to immediately raise reset callbacks. runResetCallbacks(quicRstErrorToEnvoyLocalResetReason(error.internal_code())); - quic::QuicSpdyClientStream::ResetWithError(error); + if (session()->connection()->connected()) { + quic::QuicSpdyClientStream::ResetWithError(error); + } } void EnvoyQuicClientStream::OnConnectionClosed(quic::QuicErrorCode error, diff --git a/test/integration/base_integration_test.h b/test/integration/base_integration_test.h index 303efca46b250..528b1c0cc2af7 100644 --- a/test/integration/base_integration_test.h +++ b/test/integration/base_integration_test.h @@ -362,6 +362,9 @@ class BaseIntegrationTest : protected Logger::Loggable { void mergeOptions(envoy::config::core::v3::Http2ProtocolOptions& options) { upstream_config_.http2_options_.MergeFrom(options); } + void mergeOptions(envoy::config::listener::v3::QuicProtocolOptions& options) { + upstream_config_.quic_options_.MergeFrom(options); + } std::unique_ptr upstream_stats_store_; diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 2129334241ae7..2f33f3c3f519c 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -510,7 +510,7 @@ FakeUpstream::FakeUpstream(Network::TransportSocketFactoryPtr&& transport_socket FakeUpstream::FakeUpstream(Network::TransportSocketFactoryPtr&& transport_socket_factory, Network::SocketPtr&& listen_socket, const FakeUpstreamConfig& config) : http_type_(config.upstream_protocol_), http2_options_(config.http2_options_), - http3_options_(config.http3_options_), + http3_options_(config.http3_options_), quic_options_(config.quic_options_), socket_(Network::SocketSharedPtr(listen_socket.release())), socket_factory_(std::make_unique(socket_)), api_(Api::createApiForTest(stats_store_)), time_system_(config.time_system_), diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 83bbb6db124da..f667bcb2a2258 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -579,6 +579,7 @@ struct FakeUpstreamConfig { absl::optional udp_fake_upstream_; envoy::config::core::v3::Http2ProtocolOptions http2_options_; envoy::config::core::v3::Http3ProtocolOptions http3_options_; + envoy::config::listener::v3::QuicProtocolOptions quic_options_; uint32_t max_request_headers_kb_ = Http::DEFAULT_MAX_REQUEST_HEADERS_KB; uint32_t max_request_headers_count_ = Http::DEFAULT_MAX_HEADERS_COUNT; envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction @@ -760,7 +761,7 @@ class FakeUpstream : Logger::Loggable, if (is_quic) { #if defined(ENVOY_ENABLE_QUIC) udp_listener_config_.listener_factory_ = std::make_unique( - envoy::config::listener::v3::QuicProtocolOptions(), 1, parent_.quic_stat_names_); + parent_.quic_options_, 1, parent_.quic_stat_names_); // Initialize QUICHE flags. quiche::FlagRegistry::getInstance(); #else @@ -823,6 +824,7 @@ class FakeUpstream : Logger::Loggable, const envoy::config::core::v3::Http2ProtocolOptions http2_options_; const envoy::config::core::v3::Http3ProtocolOptions http3_options_; + envoy::config::listener::v3::QuicProtocolOptions quic_options_; Network::SocketSharedPtr socket_; Network::ListenSocketFactoryPtr socket_factory_; ConditionalInitializer server_initialized_; diff --git a/test/integration/multiplexed_upstream_integration_test.cc b/test/integration/multiplexed_upstream_integration_test.cc index d47abf5f0ea20..4acf747d777ce 100644 --- a/test/integration/multiplexed_upstream_integration_test.cc +++ b/test/integration/multiplexed_upstream_integration_test.cc @@ -205,7 +205,6 @@ TEST_P(Http2UpstreamIntegrationTest, LargeSimultaneousRequestWithBufferLimits) { void Http2UpstreamIntegrationTest::manySimultaneousRequests(uint32_t request_bytes, uint32_t max_response_bytes, uint32_t num_requests) { - autonomous_allow_incomplete_streams_ = true; TestRandomGenerator rand; std::vector encoders; std::vector responses; @@ -236,11 +235,8 @@ void Http2UpstreamIntegrationTest::manySimultaneousRequests(uint32_t request_byt ASSERT_TRUE(responses[i]->waitForEndStream()); if (i % 2 != 0) { EXPECT_TRUE(responses[i]->complete()); - // TODO(18160) remove this if and always check for 200 and body length. - if (num_requests <= 100 || upstreamProtocol() != Http::CodecType::HTTP3) { - EXPECT_EQ("200", responses[i]->headers().getStatusValue()); - EXPECT_EQ(response_bytes[i], responses[i]->body().length()); - } + EXPECT_EQ("200", responses[i]->headers().getStatusValue()); + EXPECT_EQ(response_bytes[i], responses[i]->body().length()); } else { // Upstream stream reset. EXPECT_EQ("503", responses[i]->headers().getStatusValue()); @@ -255,13 +251,23 @@ TEST_P(Http2UpstreamIntegrationTest, ManySimultaneousRequest) { manySimultaneousRequests(1024, 1024, 100); } -#ifdef NDEBUG -// TODO(alyssawilk) this causes crashes in debug mode for QUIC due to a race -// condition between Envoy's stream accounting and QUICE's. Debug and fix. TEST_P(Http2UpstreamIntegrationTest, TooManySimultaneousRequests) { manySimultaneousRequests(1024, 1024, 200); } -#endif + +TEST_P(Http2UpstreamIntegrationTest, ManySimultaneousRequestsTightUpstreamLimits) { + if (upstreamProtocol() == Http::CodecType::HTTP2) { + return; + } + envoy::config::core::v3::Http2ProtocolOptions config; + config.mutable_max_concurrent_streams()->set_value(1); + mergeOptions(config); + envoy::config::listener::v3::QuicProtocolOptions options; + options.mutable_quic_protocol_options()->mutable_max_concurrent_streams()->set_value(1); + mergeOptions(options); + + manySimultaneousRequests(1024, 1024, 10); +} TEST_P(Http2UpstreamIntegrationTest, ManyLargeSimultaneousRequestWithBufferLimits) { config_helper_.setBufferLimits(1024, 1024); // Set buffer limits upstream and downstream.