diff --git a/source/common/conn_pool/conn_pool_base.cc b/source/common/conn_pool/conn_pool_base.cc index 096f27a206833..de35a059d3ad5 100644 --- a/source/common/conn_pool/conn_pool_base.cc +++ b/source/common/conn_pool/conn_pool_base.cc @@ -492,7 +492,9 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view client.conn_connect_ms_->complete(); client.conn_connect_ms_.reset(); ASSERT(client.state() == ActiveClient::State::CONNECTING); - transitionActiveClientState(client, ActiveClient::State::READY); + bool streams_available = client.currentUnusedCapacity() > 0; + transitionActiveClientState(client, streams_available ? ActiveClient::State::READY + : ActiveClient::State::BUSY); // Now that the active client is ready, set up a timer for max connection duration. const absl::optional max_connection_duration = @@ -506,7 +508,9 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view // At this point, for the mixed ALPN pool, the client may be deleted. Do not // refer to client after this point. onConnected(client); - onUpstreamReady(); + if (streams_available) { + onUpstreamReady(); + } checkForIdleAndCloseIdleConnsIfDraining(); } } diff --git a/source/common/http/http3/conn_pool.cc b/source/common/http/http3/conn_pool.cc index c4a53797c747e..8ded31e8cb679 100644 --- a/source/common/http/http3/conn_pool.cc +++ b/source/common/http/http3/conn_pool.cc @@ -36,6 +36,9 @@ void ActiveClient::onMaxStreamsChanged(uint32_t num_streams) { parent_.transitionActiveClientState(*this, ActiveClient::State::READY); // If there's waiting streams, make sure the pool will now serve them. parent_.onUpstreamReady(); + } else if (currentUnusedCapacity() == 0 && state() == ActiveClient::State::READY) { + // With HTTP/3 this can only happen during a rejected 0-RTT handshake. + parent_.transitionActiveClientState(*this, ActiveClient::State::BUSY); } } diff --git a/source/common/quic/codec_impl.h b/source/common/quic/codec_impl.h index b5136bb1b6313..399cf5dde889f 100644 --- a/source/common/quic/codec_impl.h +++ b/source/common/quic/codec_impl.h @@ -53,6 +53,8 @@ class QuicHttpServerConnectionImpl : public QuicHttpConnectionImplBase, void onUnderlyingConnectionAboveWriteBufferHighWatermark() override; void onUnderlyingConnectionBelowWriteBufferLowWatermark() override; + EnvoyQuicServerSession& quicServerSession() { return quic_server_session_; } + private: EnvoyQuicServerSession& quic_server_session_; }; diff --git a/source/common/quic/envoy_quic_client_session.cc b/source/common/quic/envoy_quic_client_session.cc index 4cf14c31db3b9..22191444ba358 100644 --- a/source/common/quic/envoy_quic_client_session.cc +++ b/source/common/quic/envoy_quic_client_session.cc @@ -101,9 +101,7 @@ void EnvoyQuicClientSession::OnCanCreateNewOutgoingStream(bool unidirectional) { return; } uint32_t streams_available = streamsAvailable(); - if (streams_available > 0) { - http_connection_callbacks_->onMaxStreamsChanged(streams_available); - } + http_connection_callbacks_->onMaxStreamsChanged(streams_available); } std::unique_ptr EnvoyQuicClientSession::CreateClientStream() { @@ -144,12 +142,11 @@ uint64_t EnvoyQuicClientSession::streamsAvailable() { void EnvoyQuicClientSession::OnTlsHandshakeComplete() { quic::QuicSpdyClientSession::OnTlsHandshakeComplete(); - // TODO(alyssawilk) support the case where a connection starts with 0 max streams. - ASSERT(streamsAvailable()); - if (streamsAvailable() > 0) { - OnCanCreateNewOutgoingStream(false); - raiseConnectionEvent(Network::ConnectionEvent::Connected); - } + // Fake this to make sure we set the connection pool stream limit correctly + // before use. This may result in OnCanCreateNewOutgoingStream with zero + // available streams. + OnCanCreateNewOutgoingStream(false); + raiseConnectionEvent(Network::ConnectionEvent::Connected); } std::unique_ptr EnvoyQuicClientSession::CreateQuicCryptoStream() { diff --git a/test/common/conn_pool/conn_pool_base_test.cc b/test/common/conn_pool/conn_pool_base_test.cc index 90ec7ceb64569..ba0f4a39f883d 100644 --- a/test/common/conn_pool/conn_pool_base_test.cc +++ b/test/common/conn_pool/conn_pool_base_test.cc @@ -35,8 +35,15 @@ class TestActiveClient : public ActiveClient { ASSERT_TRUE(testClient != nullptr); testClient->active_streams_++; } + int64_t currentUnusedCapacity() const override { + if (capacity_override_.has_value()) { + return capacity_override_.value(); + } + return ActiveClient::currentUnusedCapacity(); + } uint32_t active_streams_{}; + absl::optional capacity_override_; }; class TestPendingStream : public PendingStream { @@ -417,6 +424,26 @@ TEST_F(ConnPoolImplDispatcherBaseTest, MaxConnectionDurationCallbackWhileConnect pool_.destructAllConnections(); } +// Test the behavior of a client created with 0 zero streams available. +TEST_F(ConnPoolImplDispatcherBaseTest, NoAvailableStreams) { + // Start with a concurrent stream limit of 0. + stream_limit_ = 1; + newConnectingClient(); + clients_.back()->capacity_override_ = 0; + pool_.decrClusterStreamCapacity(stream_limit_); + + // Make sure that when the connected event is raised, there is no call to + // onPoolReady, and the client is marked as busy. + EXPECT_CALL(pool_, onPoolReady).Times(0); + clients_.back()->onEvent(Network::ConnectionEvent::Connected); + EXPECT_EQ(ActiveClient::State::BUSY, clients_.back()->state()); + + // Clean up. + EXPECT_CALL(pool_, instantiateActiveClient); + EXPECT_CALL(pool_, onPoolFailure); + pool_.destructAllConnections(); +} + // Remote close simulates the peer closing the connection. TEST_F(ConnPoolImplBaseTest, PoolIdleCallbackTriggeredRemoteClose) { EXPECT_CALL(dispatcher_, createTimer_(_)).Times(AnyNumber()); diff --git a/test/integration/BUILD b/test/integration/BUILD index 7861e4f63a6c5..8335179d0b88c 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -742,6 +742,7 @@ envoy_cc_test_library( ] + envoy_select_enable_http3([ "//source/common/quic:active_quic_listener_lib", "//source/common/quic:quic_factory_lib", + "@com_github_google_quiche//:quic_test_tools_session_peer_lib", ]), ) diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 2f33f3c3f519c..e72b03a52fe75 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -17,6 +17,7 @@ #ifdef ENVOY_ENABLE_QUIC #include "source/common/quic/codec_impl.h" +#include "quiche/quic/test_tools/quic_session_peer.h" #endif #include "source/server/connection_handler_impl.h" @@ -396,6 +397,21 @@ void FakeHttpConnection::encodeGoAway() { postToConnectionThread([this]() { codec_->goAway(); }); } +void FakeHttpConnection::updateConcurrentStreams(uint64_t max_streams) { + ASSERT(type_ >= Http::CodecType::HTTP3); + +#ifdef ENVOY_ENABLE_QUIC + postToConnectionThread([this, max_streams]() { + auto codec = dynamic_cast(codec_.get()); + quic::test::QuicSessionPeer::SetMaxOpenIncomingBidirectionalStreams(&codec->quicServerSession(), + max_streams); + codec->quicServerSession().SendMaxStreams(1, false); + }); +#else + UNREFERENCED_PARAMETER(max_streams); +#endif +} + void FakeHttpConnection::encodeProtocolError() { ASSERT(type_ >= Http::CodecType::HTTP2); diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index f667bcb2a2258..c282ad207ee0a 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -462,6 +462,10 @@ class FakeHttpConnection : public Http::ServerConnectionCallbacks, public FakeCo // Should only be called for HTTP2 or above, sends a GOAWAY frame with ENHANCE_YOUR_CALM. void encodeProtocolError(); + // Update the maximum number of concurrent streams. This is currently only + // supported for HTTP/3 + void updateConcurrentStreams(uint64_t max_streams); + private: struct ReadFilter : public Network::ReadFilterBaseImpl { ReadFilter(FakeHttpConnection& parent) : parent_(parent) {} diff --git a/test/integration/quic_http_integration_test.cc b/test/integration/quic_http_integration_test.cc index 843d4c1b945ca..7d2eec47ad105 100644 --- a/test/integration/quic_http_integration_test.cc +++ b/test/integration/quic_http_integration_test.cc @@ -787,6 +787,62 @@ TEST_P(QuicHttpIntegrationTest, Http3DownstreamKeepalive) { ASSERT_TRUE(response->complete()); } +TEST_P(QuicHttpIntegrationTest, NoInitialStreams) { + // Set the fake upstream to start with 0 streams available. + setUpstreamProtocol(Http::CodecType::HTTP3); + envoy::config::listener::v3::QuicProtocolOptions options; + options.mutable_quic_protocol_options()->mutable_max_concurrent_streams()->set_value(0); + mergeOptions(options); + initialize(); + + // Create the client connection and send a request. + codec_client_ = makeRawHttpConnection(makeClientConnection(lookupPort("http")), absl::nullopt); + IntegrationStreamDecoderPtr response = + codec_client_->makeHeaderOnlyRequest(default_request_headers_); + + // There should now be an upstream connection, but no upstream stream. + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_FALSE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_, + std::chrono::milliseconds(100))); + + // Update the upstream to have 1 stream available. Now Envoy should ship the + // original request upstream. + fake_upstream_connection_->updateConcurrentStreams(1); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + + // Make sure the standard request/response pipeline works as expected. + upstream_request_->encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); +} + +TEST_P(QuicHttpIntegrationTest, NoStreams) { + // Tighten the stream idle timeout, as it defaults to 5m + config_helper_.addConfigModifier( + [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) -> void { + hcm.mutable_stream_idle_timeout()->set_seconds(0); + hcm.mutable_stream_idle_timeout()->set_nanos(400 * 1000 * 1000); + }); + + // Set the fake upstream to start with 0 streams available. + setUpstreamProtocol(Http::CodecType::HTTP3); + envoy::config::listener::v3::QuicProtocolOptions options; + options.mutable_quic_protocol_options()->mutable_max_concurrent_streams()->set_value(0); + mergeOptions(options); + initialize(); + + // Create the client connection and send a request. + codec_client_ = makeRawHttpConnection(makeClientConnection(lookupPort("http")), absl::nullopt); + IntegrationStreamDecoderPtr response = + codec_client_->makeHeaderOnlyRequest(default_request_headers_); + + // Make sure the time out closes the stream. + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(response->complete()); +} + class QuicInplaceLdsIntegrationTest : public QuicHttpIntegrationTest { public: void inplaceInitialize(bool add_default_filter_chain = false) {