diff --git a/source/common/network/connection_impl_base.cc b/source/common/network/connection_impl_base.cc index 708038bf9d977..8cfd2f42000d3 100644 --- a/source/common/network/connection_impl_base.cc +++ b/source/common/network/connection_impl_base.cc @@ -16,6 +16,7 @@ void ConnectionImplBase::setConnectionStats(const ConnectionStats& stats) { "with the configured filter chain."); connection_stats_ = std::make_unique(stats); } + void ConnectionImplBase::setDelayedCloseTimeout(std::chrono::milliseconds timeout) { // Validate that this is only called prior to issuing a close() or closeSocket(). ASSERT(delayed_close_timer_ == nullptr && state() == State::Open); diff --git a/source/common/network/connection_impl_base.h b/source/common/network/connection_impl_base.h index e174de4981d2b..8e6bd3ed1388b 100644 --- a/source/common/network/connection_impl_base.h +++ b/source/common/network/connection_impl_base.h @@ -12,7 +12,6 @@ class ConnectionImplBase : public FilterManagerConnection, protected Logger::Loggable { public: ConnectionImplBase(Event::Dispatcher& dispatcher, uint64_t id); - ~ConnectionImplBase() override {} // Network::Connection void addConnectionCallbacks(ConnectionCallbacks& cb) override; diff --git a/source/extensions/quic_listeners/quiche/codec_impl.cc b/source/extensions/quic_listeners/quiche/codec_impl.cc index 4af18bd949b3f..49b7128aaa3a4 100644 --- a/source/extensions/quic_listeners/quiche/codec_impl.cc +++ b/source/extensions/quic_listeners/quiche/codec_impl.cc @@ -13,7 +13,7 @@ EnvoyQuicStream* quicStreamToEnvoyStream(quic::QuicStream* stream) { return dynamic_cast(stream); } -bool QuicHttpConnectionImplBase::wantsToWrite() { return quic_session_.HasDataToWrite(); } +bool QuicHttpConnectionImplBase::wantsToWrite() { return quic_session_.bytesToSend() > 0; } void QuicHttpConnectionImplBase::runWatermarkCallbacksForEachStream( quic::QuicSmallMap, 10>& stream_map, diff --git a/source/extensions/quic_listeners/quiche/codec_impl.h b/source/extensions/quic_listeners/quiche/codec_impl.h index 9394ec11e1985..3fabff7c3ae56 100644 --- a/source/extensions/quic_listeners/quiche/codec_impl.h +++ b/source/extensions/quic_listeners/quiche/codec_impl.h @@ -18,7 +18,8 @@ namespace Quic { class QuicHttpConnectionImplBase : public virtual Http::Connection, protected Logger::Loggable { public: - QuicHttpConnectionImplBase(quic::QuicSpdySession& quic_session) : quic_session_(quic_session) {} + QuicHttpConnectionImplBase(QuicFilterManagerConnectionImpl& quic_session) + : quic_session_(quic_session) {} // Http::Connection void dispatch(Buffer::Instance& /*data*/) override { @@ -35,7 +36,7 @@ class QuicHttpConnectionImplBase : public virtual Http::Connection, bool high_watermark); protected: - quic::QuicSpdySession& quic_session_; + QuicFilterManagerConnectionImpl& quic_session_; }; class QuicHttpServerConnectionImpl : public QuicHttpConnectionImplBase, diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_client_session.cc b/source/extensions/quic_listeners/quiche/envoy_quic_client_session.cc index d600055c5c8b0..646540ca982e0 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_client_session.cc +++ b/source/extensions/quic_listeners/quiche/envoy_quic_client_session.cc @@ -41,6 +41,11 @@ void EnvoyQuicClientSession::Initialize() { quic_connection_->setEnvoyConnection(*this); } +void EnvoyQuicClientSession::OnCanWrite() { + quic::QuicSpdyClientSession::OnCanWrite(); + maybeApplyDelayClosePolicy(); +} + void EnvoyQuicClientSession::OnGoAway(const quic::QuicGoAwayFrame& frame) { ENVOY_CONN_LOG(debug, "GOAWAY received with error {}: {}", *this, quic::QuicErrorCodeToString(frame.error_code), frame.reason_phrase); @@ -73,5 +78,7 @@ EnvoyQuicClientSession::CreateIncomingStream(quic::PendingStream* /*pending*/) { NOT_REACHED_GCOVR_EXCL_LINE; } +bool EnvoyQuicClientSession::hasDataToWrite() { return HasDataToWrite(); } + } // namespace Quic } // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_client_session.h b/source/extensions/quic_listeners/quiche/envoy_quic_client_session.h index 1bebce79ed2b9..364478c0413a8 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_client_session.h +++ b/source/extensions/quic_listeners/quiche/envoy_quic_client_session.h @@ -53,6 +53,7 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl, void OnConnectionClosed(const quic::QuicConnectionCloseFrame& frame, quic::ConnectionCloseSource source) override; void Initialize() override; + void OnCanWrite() override; void OnGoAway(const quic::QuicGoAwayFrame& frame) override; // quic::QuicSpdyClientSessionBase void OnCryptoHandshakeEvent(CryptoHandshakeEvent event) override; @@ -66,6 +67,9 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl, quic::QuicSpdyStream* CreateIncomingStream(quic::QuicStreamId id) override; quic::QuicSpdyStream* CreateIncomingStream(quic::PendingStream* pending) override; + // QuicFilterManagerConnectionImpl + bool hasDataToWrite() override; + private: // These callbacks are owned by network filters and quic session should outlive // them. diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_server_session.cc b/source/extensions/quic_listeners/quiche/envoy_quic_server_session.cc index c12a32690231a..d5ba4433fe085 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_server_session.cc +++ b/source/extensions/quic_listeners/quiche/envoy_quic_server_session.cc @@ -93,6 +93,13 @@ void EnvoyQuicServerSession::SendGoAway(quic::QuicErrorCode error_code, const st } } +void EnvoyQuicServerSession::OnCanWrite() { + quic::QuicServerSessionBase::OnCanWrite(); + // Do not update delay close state according to connection level packet egress because that is + // equivalent to TCP transport layer egress. But only do so if the session gets chance to write. + maybeApplyDelayClosePolicy(); +} + void EnvoyQuicServerSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) { quic::QuicServerSessionBase::OnCryptoHandshakeEvent(event); if (event == HANDSHAKE_CONFIRMED) { @@ -100,5 +107,7 @@ void EnvoyQuicServerSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) } } +bool EnvoyQuicServerSession::hasDataToWrite() { return HasDataToWrite(); } + } // namespace Quic } // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_server_session.h b/source/extensions/quic_listeners/quiche/envoy_quic_server_session.h index e6cd850ea486f..0e6415e0df2f0 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_server_session.h +++ b/source/extensions/quic_listeners/quiche/envoy_quic_server_session.h @@ -50,6 +50,7 @@ class EnvoyQuicServerSession : public quic::QuicServerSessionBase, quic::ConnectionCloseSource source) override; void Initialize() override; void SendGoAway(quic::QuicErrorCode error_code, const std::string& reason) override; + void OnCanWrite() override; // quic::QuicSpdySession void OnCryptoHandshakeEvent(CryptoHandshakeEvent event) override; @@ -68,6 +69,9 @@ class EnvoyQuicServerSession : public quic::QuicServerSessionBase, quic::QuicSpdyStream* CreateOutgoingBidirectionalStream() override; quic::QuicSpdyStream* CreateOutgoingUnidirectionalStream() override; + // QuicFilterManagerConnectionImpl + bool hasDataToWrite() override; + private: void setUpRequestDecoder(EnvoyQuicStream& stream); diff --git a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc index 13cb3829f0047..8a871fcf186a3 100644 --- a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc +++ b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc @@ -50,20 +50,46 @@ bool QuicFilterManagerConnectionImpl::aboveHighWatermark() const { } void QuicFilterManagerConnectionImpl::close(Network::ConnectionCloseType type) { - if (type != Network::ConnectionCloseType::NoFlush) { - // TODO(danzh): Implement FlushWrite and FlushWriteAndDelay mode. - } if (quic_connection_ == nullptr) { // Already detached from quic connection. return; } - closeConnectionImmediately(); -} - -void QuicFilterManagerConnectionImpl::setDelayedCloseTimeout(std::chrono::milliseconds timeout) { - if (timeout != std::chrono::milliseconds::zero()) { - // TODO(danzh) support delayed close of connection. - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + const bool delayed_close_timeout_configured = delayed_close_timeout_.count() > 0; + if (hasDataToWrite() && type != Network::ConnectionCloseType::NoFlush) { + if (delayed_close_timeout_configured) { + // QUIC connection has unsent data and caller wants to flush them. Wait for flushing or + // timeout. + if (!inDelayedClose()) { + // Only set alarm if not in delay close mode yet. + initializeDelayedCloseTimer(); + } + // Update delay close state according to current call. + if (type == Network::ConnectionCloseType::FlushWriteAndDelay) { + delayed_close_state_ = DelayedCloseState::CloseAfterFlushAndWait; + } else { + ASSERT(type == Network::ConnectionCloseType::FlushWrite); + delayed_close_state_ = DelayedCloseState::CloseAfterFlush; + } + } else { + delayed_close_state_ = DelayedCloseState::CloseAfterFlush; + } + } else if (hasDataToWrite()) { + // Quic connection has unsent data but caller wants to close right away. + ASSERT(type == Network::ConnectionCloseType::NoFlush); + quic_connection_->OnCanWrite(); + closeConnectionImmediately(); + } else { + // Quic connection doesn't have unsent data. It's up to the caller and + // the configuration whether to wait or not before closing. + if (delayed_close_timeout_configured && + type == Network::ConnectionCloseType::FlushWriteAndDelay) { + if (!inDelayedClose()) { + initializeDelayedCloseTimer(); + } + delayed_close_state_ = DelayedCloseState::CloseAfterFlushAndWait; + } else { + closeConnectionImmediately(); + } } } @@ -102,6 +128,21 @@ void QuicFilterManagerConnectionImpl::adjustBytesToSend(int64_t delta) { write_buffer_watermark_simulation_.checkLowWatermark(bytes_to_send_); } +void QuicFilterManagerConnectionImpl::maybeApplyDelayClosePolicy() { + if (!inDelayedClose()) { + return; + } + if (hasDataToWrite() || delayed_close_state_ == DelayedCloseState::CloseAfterFlushAndWait) { + if (delayed_close_timer_ != nullptr) { + // Re-arm delay close timer on every write event if there are still data + // buffered or the connection close is supposed to be delayed. + delayed_close_timer_->enableTimer(delayed_close_timeout_); + } + } else { + closeConnectionImmediately(); + } +} + void QuicFilterManagerConnectionImpl::onConnectionCloseEvent( const quic::QuicConnectionCloseFrame& frame, quic::ConnectionCloseSource source) { transport_failure_reason_ = absl::StrCat(quic::QuicErrorCodeToString(frame.quic_error_code), @@ -115,6 +156,9 @@ void QuicFilterManagerConnectionImpl::onConnectionCloseEvent( } void QuicFilterManagerConnectionImpl::closeConnectionImmediately() { + if (quic_connection_ == nullptr) { + return; + } quic_connection_->CloseConnection(quic::QUIC_NO_ERROR, "Closed by application", quic::ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); quic_connection_ = nullptr; diff --git a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h index a6b34cda9e798..9ba0282bf2394 100644 --- a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h +++ b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h @@ -40,7 +40,6 @@ class QuicFilterManagerConnectionImpl : public Network::ConnectionImplBase { void noDelay(bool /*enable*/) override { // No-op. TCP_NODELAY doesn't apply to UDP. } - void setDelayedCloseTimeout(std::chrono::milliseconds timeout) override; void readDisable(bool /*disable*/) override { NOT_REACHED_GCOVR_EXCL_LINE; } void detectEarlyCloseWhenReadDisabled(bool /*value*/) override { NOT_REACHED_GCOVR_EXCL_LINE; } bool readEnabled() const override { return true; } @@ -98,6 +97,11 @@ class QuicFilterManagerConnectionImpl : public Network::ConnectionImplBase { // streams, and run watermark check. void adjustBytesToSend(int64_t delta); + // Called after each write when a previous connection close call is postponed. + void maybeApplyDelayClosePolicy(); + + uint32_t bytesToSend() { return bytes_to_send_; } + protected: // Propagate connection close to network_connection_callbacks_. void onConnectionCloseEvent(const quic::QuicConnectionCloseFrame& frame, @@ -105,6 +109,8 @@ class QuicFilterManagerConnectionImpl : public Network::ConnectionImplBase { void closeConnectionImmediately() override; + virtual bool hasDataToWrite() PURE; + EnvoyQuicConnection* quic_connection_{nullptr}; private: diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc index aa9d2e3ff6fd1..b77508765d30a 100644 --- a/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc +++ b/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc @@ -83,6 +83,13 @@ class TestEnvoyQuicServerSession : public EnvoyQuicServerSession { } }; +class TestQuicCryptoServerStream : public quic::QuicCryptoServerStream { +public: + using quic::QuicCryptoServerStream::QuicCryptoServerStream; + + bool encryption_established() const override { return true; } +}; + class EnvoyQuicServerSessionTest : public testing::TestWithParam { public: EnvoyQuicServerSessionTest() @@ -148,11 +155,27 @@ class EnvoyQuicServerSessionTest : public testing::TestWithParam { return envoy_quic_session_.initializeReadFilters(); } + quic::QuicStream* createNewStream(Http::MockStreamDecoder& request_decoder, + Http::MockStreamCallbacks& stream_callbacks) { + EXPECT_CALL(http_connection_callbacks_, newStream(_, false)) + .WillOnce(Invoke([&request_decoder, &stream_callbacks](Http::StreamEncoder& encoder, + bool) -> Http::StreamDecoder& { + encoder.getStream().addCallbacks(stream_callbacks); + return request_decoder; + })); + quic::QuicStreamId stream_id = + quic_version_[0].transport_version == quic::QUIC_VERSION_99 ? 4u : 5u; + return envoy_quic_session_.GetOrCreateStream(stream_id); + } + void TearDown() override { if (quic_connection_->connected()) { EXPECT_CALL(*quic_connection_, SendConnectionClosePacket(quic::QUIC_NO_ERROR, "Closed by application")); EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); + EXPECT_CALL(*quic_connection_, SendControlFrame(_)) + .Times(testing::AtMost(1)) + .WillOnce(Invoke([](const quic::QuicFrame&) { return false; })); envoy_quic_session_.close(Network::ConnectionCloseType::NoFlush); } } @@ -234,7 +257,6 @@ TEST_P(EnvoyQuicServerSessionTest, InvalidIncomingStreamId) { } TEST_P(EnvoyQuicServerSessionTest, NoNewStreamForInvalidIncomingStream) { - quic::SetVerbosityLogThreshold(1); installReadFilter(); Http::MockStreamDecoder request_decoder; Http::MockStreamCallbacks stream_callbacks; @@ -254,24 +276,13 @@ TEST_P(EnvoyQuicServerSessionTest, OnResetFrame) { installReadFilter(); Http::MockStreamDecoder request_decoder; Http::MockStreamCallbacks stream_callbacks; - EXPECT_CALL(http_connection_callbacks_, newStream(_, false)) - .WillRepeatedly(Invoke([&request_decoder, &stream_callbacks](Http::StreamEncoder& encoder, - bool) -> Http::StreamDecoder& { - encoder.getStream().addCallbacks(stream_callbacks); - return request_decoder; - })); - - // G-QUIC or IETF bi-directional stream. - quic::QuicStreamId stream_id = - quic_version_[0].transport_version == quic::QUIC_VERSION_99 ? 4u : 5u; - - quic::QuicStream* stream1 = envoy_quic_session_.GetOrCreateStream(stream_id); + quic::QuicStream* stream1 = createNewStream(request_decoder, stream_callbacks); quic::QuicRstStreamFrame rst1(/*control_frame_id=*/1u, stream1->id(), quic::QUIC_ERROR_PROCESSING_STREAM, /*bytes_written=*/0u); EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::RemoteReset, _)); if (quic_version_[0].transport_version < quic::QUIC_VERSION_99) { EXPECT_CALL(*quic_connection_, SendControlFrame(_)) - .WillOnce(Invoke([stream_id](const quic::QuicFrame& frame) { + .WillOnce(Invoke([stream_id = stream1->id()](const quic::QuicFrame& frame) { EXPECT_EQ(stream_id, frame.rst_stream_frame->stream_id); EXPECT_EQ(quic::QUIC_RST_ACKNOWLEDGEMENT, frame.rst_stream_frame->error_code); return false; @@ -279,8 +290,13 @@ TEST_P(EnvoyQuicServerSessionTest, OnResetFrame) { } stream1->OnStreamReset(rst1); - // G-QUIC bi-directional stream or IETF read uni-directional stream. - quic::QuicStream* stream2 = envoy_quic_session_.GetOrCreateStream(stream_id + 4u); + EXPECT_CALL(http_connection_callbacks_, newStream(_, false)) + .WillOnce(Invoke([&request_decoder, &stream_callbacks](Http::StreamEncoder& encoder, + bool) -> Http::StreamDecoder& { + encoder.getStream().addCallbacks(stream_callbacks); + return request_decoder; + })); + quic::QuicStream* stream2 = envoy_quic_session_.GetOrCreateStream(stream1->id() + 4u); quic::QuicRstStreamFrame rst2(/*control_frame_id=*/1u, stream2->id(), quic::QUIC_REFUSED_STREAM, /*bytes_written=*/0u); EXPECT_CALL(stream_callbacks, @@ -307,31 +323,360 @@ TEST_P(EnvoyQuicServerSessionTest, ConnectionCloseWithActiveStream) { Http::MockStreamDecoder request_decoder; Http::MockStreamCallbacks stream_callbacks; - EXPECT_CALL(http_connection_callbacks_, newStream(_, false)) - .WillOnce(Invoke([&request_decoder, &stream_callbacks](Http::StreamEncoder& encoder, - bool) -> Http::StreamDecoder& { - encoder.getStream().addCallbacks(stream_callbacks); - return request_decoder; - })); - quic::QuicStreamId stream_id = - quic_version_[0].transport_version == quic::QUIC_VERSION_99 ? 4u : 5u; - quic::QuicStream* stream = envoy_quic_session_.GetOrCreateStream(stream_id); + quic::QuicStream* stream = createNewStream(request_decoder, stream_callbacks); + EXPECT_CALL(*quic_connection_, + SendConnectionClosePacket(quic::QUIC_NO_ERROR, "Closed by application")); + EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); + EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::ConnectionTermination, _)); + envoy_quic_session_.close(Network::ConnectionCloseType::NoFlush); + EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); + EXPECT_TRUE(stream->write_side_closed() && stream->reading_stopped()); +} + +TEST_P(EnvoyQuicServerSessionTest, NoFlushWithDataToWrite) { + installReadFilter(); + + Http::MockStreamDecoder request_decoder; + Http::MockStreamCallbacks stream_callbacks; + quic::QuicStream* stream = createNewStream(request_decoder, stream_callbacks); + envoy_quic_session_.MarkConnectionLevelWriteBlocked(stream->id()); EXPECT_CALL(*quic_connection_, SendConnectionClosePacket(quic::QUIC_NO_ERROR, "Closed by application")); EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::ConnectionTermination, _)); + // Even though the stream is write blocked, connection should be closed + // immediately. envoy_quic_session_.close(Network::ConnectionCloseType::NoFlush); EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); EXPECT_TRUE(stream->write_side_closed() && stream->reading_stopped()); } -TEST_P(EnvoyQuicServerSessionTest, FlushCloseNotSupported) { +TEST_P(EnvoyQuicServerSessionTest, FlushCloseWithDataToWrite) { + installReadFilter(); + Http::MockStreamDecoder request_decoder; + Http::MockStreamCallbacks stream_callbacks; + quic::QuicStream* stream = createNewStream(request_decoder, stream_callbacks); + + envoy_quic_session_.MarkConnectionLevelWriteBlocked(stream->id()); + EXPECT_TRUE(envoy_quic_session_.HasDataToWrite()); + // Connection shouldn't be closed right away as there is a stream write blocked. + envoy_quic_session_.close(Network::ConnectionCloseType::FlushWrite); + EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); + EXPECT_CALL(*quic_connection_, + SendConnectionClosePacket(quic::QUIC_NO_ERROR, "Closed by application")); + EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); + EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::ConnectionTermination, _)); + // Unblock that stream to trigger actual connection close. + envoy_quic_session_.OnCanWrite(); + EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); + EXPECT_FALSE(quic_connection_->connected()); +} + +// Tests that a write event after flush close should update the delay close +// timer. +TEST_P(EnvoyQuicServerSessionTest, WriteUpdatesDelayCloseTimer) { + installReadFilter(); + // Switch to a encryption forward secure crypto stream. + quic::test::QuicServerSessionBasePeer::SetCryptoStream(&envoy_quic_session_, nullptr); + quic::test::QuicServerSessionBasePeer::SetCryptoStream( + &envoy_quic_session_, + new TestQuicCryptoServerStream(&crypto_config_, &compressed_certs_cache_, + &envoy_quic_session_, &crypto_stream_helper_)); + quic_connection_->SetDefaultEncryptionLevel(quic::ENCRYPTION_FORWARD_SECURE); + quic_connection_->SetEncrypter( + quic::ENCRYPTION_FORWARD_SECURE, + std::make_unique(quic::Perspective::IS_SERVER)); + // Drive congestion control manually. + auto send_algorithm = new testing::NiceMock; + quic::test::QuicConnectionPeer::SetSendAlgorithm(quic_connection_, send_algorithm); + EXPECT_CALL(*send_algorithm, CanSend(_)).WillRepeatedly(Return(true)); + EXPECT_CALL(*send_algorithm, GetCongestionWindow()).WillRepeatedly(Return(quic::kDefaultTCPMSS)); + EXPECT_CALL(*send_algorithm, PacingRate(_)).WillRepeatedly(Return(quic::QuicBandwidth::Zero())); + EXPECT_CALL(*send_algorithm, BandwidthEstimate()) + .WillRepeatedly(Return(quic::QuicBandwidth::Zero())); + + EXPECT_CALL(*quic_connection_, SendControlFrame(_)).Times(AnyNumber()); + + // Bump connection flow control window large enough not to interfere + // stream writing. + envoy_quic_session_.flow_controller()->UpdateSendWindowOffset( + 10 * quic::kDefaultFlowControlSendWindow); + + envoy_quic_session_.setDelayedCloseTimeout(std::chrono::milliseconds(100)); + Http::MockStreamDecoder request_decoder; + Http::MockStreamCallbacks stream_callbacks; + // Create a stream and write enough data to make it blocked. + auto stream = + dynamic_cast(createNewStream(request_decoder, stream_callbacks)); + + // Receive a GET request on created stream. + quic::QuicHeaderList request_headers; + request_headers.OnHeaderBlockStart(); + std::string host("www.abc.com"); + request_headers.OnHeader(":authority", host); + request_headers.OnHeader(":method", "GET"); + request_headers.OnHeader(":path", "/"); + request_headers.OnHeaderBlockEnd(/*uncompressed_header_bytes=*/0, /*compressed_header_bytes=*/0); + // Request headers should be propagated to decoder. + EXPECT_CALL(request_decoder, decodeHeaders_(_, /*end_stream=*/true)) + .WillOnce(Invoke([&host](const Http::HeaderMapPtr& decoded_headers, bool) { + EXPECT_EQ(host, decoded_headers->Host()->value().getStringView()); + EXPECT_EQ("/", decoded_headers->Path()->value().getStringView()); + EXPECT_EQ(Http::Headers::get().MethodValues.Get, + decoded_headers->Method()->value().getStringView()); + })); + stream->OnStreamHeaderList(/*fin=*/true, request_headers.uncompressed_header_bytes(), + request_headers); + + Http::TestHeaderMapImpl response_headers{{":status", "200"}, + {":content-length", "32770"}}; // 32KB + 2 bytes + + stream->encodeHeaders(response_headers, false); + std::string response(32 * 1024 + 1, 'a'); + Buffer::OwnedImpl buffer(response); + EXPECT_CALL(stream_callbacks, onAboveWriteBufferHighWatermark()); + stream->encodeData(buffer, false); + // Stream become write blocked. + EXPECT_TRUE(envoy_quic_session_.HasDataToWrite()); + EXPECT_TRUE(stream->flow_controller()->IsBlocked()); + EXPECT_FALSE(envoy_quic_session_.IsConnectionFlowControlBlocked()); + + // Connection shouldn't be closed right away as there is a stream write blocked. + envoy_quic_session_.close(Network::ConnectionCloseType::FlushWrite); + EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); + + time_system_.sleep(std::chrono::milliseconds(10)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + // Another write event without updating flow control window shouldn't trigger + // connection close, but it should update the timer. + envoy_quic_session_.OnCanWrite(); + EXPECT_TRUE(envoy_quic_session_.HasDataToWrite()); + + // Timer shouldn't fire at original deadline. + time_system_.sleep(std::chrono::milliseconds(90)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); + + EXPECT_CALL(*quic_connection_, + SendConnectionClosePacket(quic::QUIC_NO_ERROR, "Closed by application")); + EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); + EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::ConnectionTermination, _)); + // Advance the time to fire connection close timer. + time_system_.sleep(std::chrono::milliseconds(10)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); + EXPECT_FALSE(quic_connection_->connected()); +} + +// Tests that if delay close timeout is not configured, flush close will not act +// based on timeout. +TEST_P(EnvoyQuicServerSessionTest, FlushCloseNoTimeout) { + installReadFilter(); + // Switch to a encryption forward secure crypto stream. + quic::test::QuicServerSessionBasePeer::SetCryptoStream(&envoy_quic_session_, nullptr); + quic::test::QuicServerSessionBasePeer::SetCryptoStream( + &envoy_quic_session_, + new TestQuicCryptoServerStream(&crypto_config_, &compressed_certs_cache_, + &envoy_quic_session_, &crypto_stream_helper_)); + quic_connection_->SetDefaultEncryptionLevel(quic::ENCRYPTION_FORWARD_SECURE); + quic_connection_->SetEncrypter( + quic::ENCRYPTION_FORWARD_SECURE, + std::make_unique(quic::Perspective::IS_SERVER)); + // Drive congestion control manually. + auto send_algorithm = new testing::NiceMock; + quic::test::QuicConnectionPeer::SetSendAlgorithm(quic_connection_, send_algorithm); + EXPECT_CALL(*send_algorithm, CanSend(_)).WillRepeatedly(Return(true)); + EXPECT_CALL(*send_algorithm, GetCongestionWindow()).WillRepeatedly(Return(quic::kDefaultTCPMSS)); + EXPECT_CALL(*send_algorithm, PacingRate(_)).WillRepeatedly(Return(quic::QuicBandwidth::Zero())); + EXPECT_CALL(*send_algorithm, BandwidthEstimate()) + .WillRepeatedly(Return(quic::QuicBandwidth::Zero())); + + EXPECT_CALL(*quic_connection_, SendControlFrame(_)).Times(AnyNumber()); + + // Bump connection flow control window large enough not to interfere + // stream writing. + envoy_quic_session_.flow_controller()->UpdateSendWindowOffset( + 10 * quic::kDefaultFlowControlSendWindow); + + Http::MockStreamDecoder request_decoder; + Http::MockStreamCallbacks stream_callbacks; + // Create a stream and write enough data to make it blocked. + auto stream = + dynamic_cast(createNewStream(request_decoder, stream_callbacks)); + + // Receive a GET request on created stream. + quic::QuicHeaderList request_headers; + request_headers.OnHeaderBlockStart(); + std::string host("www.abc.com"); + request_headers.OnHeader(":authority", host); + request_headers.OnHeader(":method", "GET"); + request_headers.OnHeader(":path", "/"); + request_headers.OnHeaderBlockEnd(/*uncompressed_header_bytes=*/0, /*compressed_header_bytes=*/0); + // Request headers should be propagated to decoder. + EXPECT_CALL(request_decoder, decodeHeaders_(_, /*end_stream=*/true)) + .WillOnce(Invoke([&host](const Http::HeaderMapPtr& decoded_headers, bool) { + EXPECT_EQ(host, decoded_headers->Host()->value().getStringView()); + EXPECT_EQ("/", decoded_headers->Path()->value().getStringView()); + EXPECT_EQ(Http::Headers::get().MethodValues.Get, + decoded_headers->Method()->value().getStringView()); + })); + stream->OnStreamHeaderList(/*fin=*/true, request_headers.uncompressed_header_bytes(), + request_headers); + + Http::TestHeaderMapImpl response_headers{{":status", "200"}, + {":content-length", "32770"}}; // 32KB + 2 bytes + + stream->encodeHeaders(response_headers, false); + std::string response(32 * 1024 + 1, 'a'); + Buffer::OwnedImpl buffer(response); + stream->encodeData(buffer, true); + // Stream become write blocked. + EXPECT_TRUE(envoy_quic_session_.HasDataToWrite()); + EXPECT_TRUE(stream->flow_controller()->IsBlocked()); + EXPECT_FALSE(envoy_quic_session_.IsConnectionFlowControlBlocked()); + + // Connection shouldn't be closed right away as there is a stream write blocked. + envoy_quic_session_.close(Network::ConnectionCloseType::FlushWrite); + EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); + // Another write event without updating flow control window shouldn't trigger + // connection close. + envoy_quic_session_.OnCanWrite(); + EXPECT_TRUE(envoy_quic_session_.HasDataToWrite()); + + // No timeout set, so alarm shouldn't fire. + time_system_.sleep(std::chrono::milliseconds(100)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); + + // Force close connection. + EXPECT_CALL(*quic_connection_, + SendConnectionClosePacket(quic::QUIC_NO_ERROR, "Closed by application")); + EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); + EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::ConnectionTermination, _)); + EXPECT_CALL(*quic_connection_, SendControlFrame(_)) + .Times(testing::AtMost(1)) + .WillOnce(Invoke([](const quic::QuicFrame&) { return false; })); + envoy_quic_session_.close(Network::ConnectionCloseType::NoFlush); +} + +TEST_P(EnvoyQuicServerSessionTest, FlushCloseWithTimeout) { installReadFilter(); + envoy_quic_session_.setDelayedCloseTimeout(std::chrono::milliseconds(100)); + Http::MockStreamDecoder request_decoder; + Http::MockStreamCallbacks stream_callbacks; + quic::QuicStream* stream = createNewStream(request_decoder, stream_callbacks); + + envoy_quic_session_.MarkConnectionLevelWriteBlocked(stream->id()); + EXPECT_TRUE(envoy_quic_session_.HasDataToWrite()); + // Connection shouldn't be closed right away as there is a stream write blocked. + envoy_quic_session_.close(Network::ConnectionCloseType::FlushWrite); + EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); + + // Advance the time a bit and try to close again. The delay close timer + // shouldn't be rescheduled by this call. + time_system_.sleep(std::chrono::milliseconds(10)); + envoy_quic_session_.close(Network::ConnectionCloseType::FlushWriteAndDelay); + EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); EXPECT_CALL(*quic_connection_, SendConnectionClosePacket(quic::QUIC_NO_ERROR, "Closed by application")); EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); + EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::ConnectionTermination, _)); + // Advance the time to fire connection close timer. + time_system_.sleep(std::chrono::milliseconds(90)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); + EXPECT_FALSE(quic_connection_->connected()); +} + +TEST_P(EnvoyQuicServerSessionTest, FlushAndWaitForCloseWithTimeout) { + installReadFilter(); + envoy_quic_session_.setDelayedCloseTimeout(std::chrono::milliseconds(100)); + Http::MockStreamDecoder request_decoder; + Http::MockStreamCallbacks stream_callbacks; + quic::QuicStream* stream = createNewStream(request_decoder, stream_callbacks); + + envoy_quic_session_.MarkConnectionLevelWriteBlocked(stream->id()); + EXPECT_TRUE(envoy_quic_session_.HasDataToWrite()); + // Connection shouldn't be closed right away as there is a stream write blocked. + envoy_quic_session_.close(Network::ConnectionCloseType::FlushWriteAndDelay); + EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); + // Unblocking the stream shouldn't close the connection as it should be + // delayed. + time_system_.sleep(std::chrono::milliseconds(10)); + envoy_quic_session_.OnCanWrite(); + // delay close alarm should have been rescheduled. + time_system_.sleep(std::chrono::milliseconds(90)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); + + EXPECT_CALL(*quic_connection_, + SendConnectionClosePacket(quic::QUIC_NO_ERROR, "Closed by application")); + EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); + EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::ConnectionTermination, _)); + // Advance the time to fire connection close timer. + time_system_.sleep(std::chrono::milliseconds(10)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); + EXPECT_FALSE(quic_connection_->connected()); +} + +TEST_P(EnvoyQuicServerSessionTest, FlusWriteTransitToFlushWriteWithDelay) { + installReadFilter(); + envoy_quic_session_.setDelayedCloseTimeout(std::chrono::milliseconds(100)); + Http::MockStreamDecoder request_decoder; + Http::MockStreamCallbacks stream_callbacks; + quic::QuicStream* stream = createNewStream(request_decoder, stream_callbacks); + + envoy_quic_session_.MarkConnectionLevelWriteBlocked(stream->id()); + EXPECT_TRUE(envoy_quic_session_.HasDataToWrite()); + // Connection shouldn't be closed right away as there is a stream write blocked. envoy_quic_session_.close(Network::ConnectionCloseType::FlushWrite); + EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); + + time_system_.sleep(std::chrono::milliseconds(10)); + // The closing behavior should be changed. + envoy_quic_session_.close(Network::ConnectionCloseType::FlushWriteAndDelay); + // Unblocking the stream shouldn't close the connection as it should be + // delayed. + envoy_quic_session_.OnCanWrite(); + + // delay close alarm should have been rescheduled. + time_system_.sleep(std::chrono::milliseconds(90)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); + + EXPECT_CALL(*quic_connection_, + SendConnectionClosePacket(quic::QUIC_NO_ERROR, "Closed by application")); + EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); + EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::ConnectionTermination, _)); + // Advance the time to fire connection close timer. + time_system_.sleep(std::chrono::milliseconds(10)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); + EXPECT_FALSE(quic_connection_->connected()); +} + +TEST_P(EnvoyQuicServerSessionTest, FlushAndWaitForCloseWithNoPendingData) { + installReadFilter(); + envoy_quic_session_.setDelayedCloseTimeout(std::chrono::milliseconds(100)); + // This close should be delayed as configured. + envoy_quic_session_.close(Network::ConnectionCloseType::FlushWriteAndDelay); + EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); + + // Advance the time a bit and try to close again. The delay close timer + // shouldn't be rescheduled by this call. + time_system_.sleep(std::chrono::milliseconds(10)); + envoy_quic_session_.close(Network::ConnectionCloseType::FlushWriteAndDelay); + EXPECT_EQ(Network::Connection::State::Open, envoy_quic_session_.state()); + + EXPECT_CALL(*quic_connection_, + SendConnectionClosePacket(quic::QUIC_NO_ERROR, "Closed by application")); + EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); + // Advance the time to fire connection close timer. + time_system_.sleep(std::chrono::milliseconds(90)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); } TEST_P(EnvoyQuicServerSessionTest, ShutdownNotice) { @@ -339,7 +684,6 @@ TEST_P(EnvoyQuicServerSessionTest, ShutdownNotice) { // Not verifying dummy implementation, just to have coverage. EXPECT_DEATH(envoy_quic_session_.enableHalfClose(true), ""); EXPECT_EQ(nullptr, envoy_quic_session_.ssl()); - EXPECT_DEATH(envoy_quic_session_.setDelayedCloseTimeout(std::chrono::milliseconds(1)), ""); http_connection_->shutdownNotice(); } @@ -418,13 +762,6 @@ TEST_P(EnvoyQuicServerSessionTest, NetworkConnectionInterface) { EXPECT_TRUE(envoy_quic_session_.readEnabled()); } -class TestQuicCryptoServerStream : public quic::QuicCryptoServerStream { -public: - using quic::QuicCryptoServerStream::QuicCryptoServerStream; - - bool encryption_established() const override { return true; } -}; - TEST_P(EnvoyQuicServerSessionTest, SendBufferWatermark) { // Switch to a encryption forward secure crypto stream. quic::test::QuicServerSessionBasePeer::SetCryptoStream(&envoy_quic_session_, nullptr); diff --git a/test/extensions/quic_listeners/quiche/integration/BUILD b/test/extensions/quic_listeners/quiche/integration/BUILD index 2f2a48b28ab88..361609bcdcfc7 100644 --- a/test/extensions/quic_listeners/quiche/integration/BUILD +++ b/test/extensions/quic_listeners/quiche/integration/BUILD @@ -14,6 +14,7 @@ envoy_cc_test( data = ["//test/config/integration/certs"], tags = ["nofips"], deps = [ + "//source/extensions/filters/http/dynamo:config", "//source/extensions/quic_listeners/quiche:active_quic_listener_config_lib", "//source/extensions/quic_listeners/quiche:codec_lib", "//source/extensions/quic_listeners/quiche:envoy_quic_client_connection_lib", diff --git a/test/extensions/quic_listeners/quiche/integration/quic_http_integration_test.cc b/test/extensions/quic_listeners/quiche/integration/quic_http_integration_test.cc index d65792479b980..a652c5e9d40e8 100644 --- a/test/extensions/quic_listeners/quiche/integration/quic_http_integration_test.cc +++ b/test/extensions/quic_listeners/quiche/integration/quic_http_integration_test.cc @@ -102,7 +102,6 @@ class QuicHttpIntegrationTest : public testing::TestWithParamset_nanos(0); EXPECT_EQ(hcm.codec_type(), envoy::config::filter::network::http_connection_manager::v2:: HttpConnectionManager::HTTP3); }); @@ -178,5 +177,42 @@ TEST_P(QuicHttpIntegrationTest, DownstreamReadDisabledOnGiantPost) { testRouterRequestAndResponseWithBody(/*request_size=*/1024 * 1024, /*response_size=*/1024, false); } +// Tests that a connection idle times out after 1s and starts delayed close. +TEST_P(QuicHttpIntegrationTest, TestDelayedConnectionTeardownTimeoutTrigger) { + config_helper_.addFilter("{ name: envoy.http_dynamo_filter, typed_config: { \"@type\": " + "type.googleapis.com/google.protobuf.Empty } }"); + config_helper_.setBufferLimits(1024, 1024); + config_helper_.addConfigModifier( + [](envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager& hcm) { + // 200ms. + hcm.mutable_delayed_close_timeout()->set_nanos(200000000); + hcm.mutable_drain_timeout()->set_seconds(1); + hcm.mutable_common_http_protocol_options()->mutable_idle_timeout()->set_seconds(1); + }); + + initialize(); + + fake_upstreams_[0]->set_allow_unexpected_disconnects(true); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + auto encoder_decoder = + codec_client_->startRequest(Http::TestHeaderMapImpl{{":method", "POST"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}}); + request_encoder_ = &encoder_decoder.first; + auto response = std::move(encoder_decoder.second); + + codec_client_->sendData(*request_encoder_, 1024 * 65, false); + + response->waitForEndStream(); + // The delayed close timeout should trigger since client is not closing the connection. + EXPECT_TRUE(codec_client_->waitForDisconnect(std::chrono::milliseconds(5000))); + EXPECT_EQ(codec_client_->last_connection_event(), Network::ConnectionEvent::RemoteClose); + EXPECT_EQ(test_server_->counter("http.config_test.downstream_cx_delayed_close_timeout")->value(), + 1); +} + } // namespace Quic } // namespace Envoy diff --git a/test/extensions/quic_listeners/quiche/test_utils.h b/test/extensions/quic_listeners/quiche/test_utils.h index 690191eda8c13..1c17e3ab87732 100644 --- a/test/extensions/quic_listeners/quiche/test_utils.h +++ b/test/extensions/quic_listeners/quiche/test_utils.h @@ -51,6 +51,9 @@ class MockEnvoyQuicSession : public quic::QuicSpdySession, public QuicFilterMana using quic::QuicSpdySession::ActivateStream; +protected: + bool hasDataToWrite() override { return HasDataToWrite(); } + private: std::unique_ptr crypto_stream_; }; @@ -87,6 +90,9 @@ class MockEnvoyQuicClientSession : public quic::QuicSpdyClientSession, using quic::QuicSpdySession::ActivateStream; +protected: + bool hasDataToWrite() override { return HasDataToWrite(); } + private: quic::QuicCryptoClientConfig crypto_config_; };