diff --git a/source/common/http/codec_helper.h b/source/common/http/codec_helper.h index 4a304c159e56c..b0689be22562e 100644 --- a/source/common/http/codec_helper.h +++ b/source/common/http/codec_helper.h @@ -1,5 +1,7 @@ #pragma once +#include "envoy/event/dispatcher.h" +#include "envoy/event/timer.h" #include "envoy/http/codec.h" #include "source/common/common/assert.h" @@ -82,5 +84,55 @@ class StreamCallbackHelper { uint32_t high_watermark_callbacks_{}; }; +// A base class shared between Http2 codec and Http3 codec to set a timeout for locally ended stream +// with buffered data. +class MultiplexedStreamImplBase : public Stream, public StreamCallbackHelper { +public: + MultiplexedStreamImplBase(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} + ~MultiplexedStreamImplBase() override { ASSERT(stream_idle_timer_ == nullptr); } + // TODO(mattklein123): Optimally this would be done in the destructor but there are currently + // deferred delete lifetime issues that need sorting out if the destructor of the stream is + // going to be able to refer to the parent connection. + virtual void destroy() { disarmStreamIdleTimer(); } + + void onLocalEndStream() { + ASSERT(local_end_stream_); + if (hasPendingData()) { + createPendingFlushTimer(); + } + } + + void disarmStreamIdleTimer() { + if (stream_idle_timer_ != nullptr) { + // To ease testing and the destructor assertion. + stream_idle_timer_->disableTimer(); + stream_idle_timer_.reset(); + } + } + +protected: + void setFlushTimeout(std::chrono::milliseconds timeout) override { + stream_idle_timeout_ = timeout; + } + + void createPendingFlushTimer() { + ASSERT(stream_idle_timer_ == nullptr); + if (stream_idle_timeout_.count() > 0) { + stream_idle_timer_ = dispatcher_.createTimer([this] { onPendingFlushTimer(); }); + stream_idle_timer_->enableTimer(stream_idle_timeout_); + } + } + + virtual void onPendingFlushTimer() { stream_idle_timer_.reset(); } + + virtual bool hasPendingData() PURE; + +private: + Event::Dispatcher& dispatcher_; + // See HttpConnectionManager.stream_idle_timeout. + std::chrono::milliseconds stream_idle_timeout_{}; + Event::TimerPtr stream_idle_timer_; +}; + } // namespace Http } // namespace Envoy diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index f782d64d5daf5..31f5d75e1d44e 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -130,7 +130,7 @@ template static T* removeConst(const void* object) { } ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_limit) - : parent_(parent), + : MultiplexedStreamImplBase(parent.connection_.dispatcher()), parent_(parent), pending_recv_data_(parent_.connection_.dispatcher().getWatermarkFactory().createBuffer( [this]() -> void { this->pendingRecvBufferLowWatermark(); }, [this]() -> void { this->pendingRecvBufferHighWatermark(); }, @@ -149,10 +149,8 @@ ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_l } } -ConnectionImpl::StreamImpl::~StreamImpl() { ASSERT(stream_idle_timer_ == nullptr); } - void ConnectionImpl::StreamImpl::destroy() { - disarmStreamIdleTimer(); + MultiplexedStreamImplBase::destroy(); parent_.stats_.streams_active_.dec(); parent_.stats_.pending_send_bytes_.sub(pending_send_data_->length()); } @@ -287,7 +285,7 @@ void ConnectionImpl::StreamImpl::encodeTrailersBase(const HeaderMap& trailers) { trailers.empty() && parent_.skip_encoding_empty_trailers_; if (!skip_encoding_empty_trailers) { pending_trailers_to_encode_ = cloneTrailers(trailers); - createPendingFlushTimer(); + onLocalEndStream(); } } else { submitTrailers(trailers); @@ -491,18 +489,9 @@ void ConnectionImpl::ServerStreamImpl::submitHeaders(const std::vector 0) { - stream_idle_timer_ = - parent_.connection_.dispatcher().createTimer([this] { onPendingFlushTimer(); }); - stream_idle_timer_->enableTimer(stream_idle_timeout_); - } -} - void ConnectionImpl::StreamImpl::onPendingFlushTimer() { ENVOY_CONN_LOG(debug, "pending stream flush timeout", parent_.connection_); - stream_idle_timer_.reset(); + MultiplexedStreamImplBase::onPendingFlushTimer(); parent_.stats_.tx_flush_timeout_.inc(); ASSERT(local_end_stream_ && !local_end_stream_sent_); // This will emit a reset frame for this stream and close the stream locally. No reset callbacks @@ -541,8 +530,8 @@ void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool e // Intended to check through coverage that this error case is tested return; } - if (local_end_stream_ && pending_send_data_->length() > 0) { - createPendingFlushTimer(); + if (local_end_stream_) { + onLocalEndStream(); } } diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index 95758a1a91193..dc048a884fefc 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -181,25 +181,16 @@ class ConnectionImpl : public virtual Connection, * Base class for client and server side streams. */ struct StreamImpl : public virtual StreamEncoder, - public Stream, public LinkedObject, public Event::DeferredDeletable, - public StreamCallbackHelper, + public Http::MultiplexedStreamImplBase, public ScopeTrackedObject { StreamImpl(ConnectionImpl& parent, uint32_t buffer_limit); - ~StreamImpl() override; - // TODO(mattklein123): Optimally this would be done in the destructor but there are currently - // deferred delete lifetime issues that need sorting out if the destructor of the stream is - // going to be able to refer to the parent connection. - virtual void destroy(); - void disarmStreamIdleTimer() { - if (stream_idle_timer_ != nullptr) { - // To ease testing and the destructor assertion. - stream_idle_timer_->disableTimer(); - stream_idle_timer_.reset(); - } - } + + // Http::MultiplexedStreamImplBase + void destroy() override; + void onPendingFlushTimer() override; StreamImpl* base() { return this; } ssize_t onDataSourceRead(uint64_t length, uint32_t* data_flags); @@ -217,8 +208,6 @@ class ConnectionImpl : public virtual Connection, virtual HeaderMap& headers() PURE; virtual void allocTrailers() PURE; virtual HeaderMapPtr cloneTrailers(const HeaderMap& trailers) PURE; - virtual void createPendingFlushTimer() PURE; - void onPendingFlushTimer(); // Http::StreamEncoder void encodeData(Buffer::Instance& data, bool end_stream) override; @@ -236,9 +225,6 @@ class ConnectionImpl : public virtual Connection, return parent_.connection_.addressProvider().localAddress(); } absl::string_view responseDetails() override { return details_; } - void setFlushTimeout(std::chrono::milliseconds timeout) override { - stream_idle_timeout_ = timeout; - } void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override; // ScopeTrackedObject @@ -317,9 +303,12 @@ class ConnectionImpl : public virtual Connection, bool pending_send_buffer_high_watermark_called_ : 1; bool reset_due_to_messaging_error_ : 1; absl::string_view details_; - // See HttpConnectionManager.stream_idle_timeout. - std::chrono::milliseconds stream_idle_timeout_{}; - Event::TimerPtr stream_idle_timer_; + + protected: + // Http::MultiplexedStreamImplBase + bool hasPendingData() override { + return pending_send_data_->length() > 0 || pending_trailers_to_encode_ != nullptr; + } }; using StreamImplPtr = std::unique_ptr; @@ -333,6 +322,11 @@ class ConnectionImpl : public virtual Connection, : StreamImpl(parent, buffer_limit), response_decoder_(response_decoder), headers_or_trailers_(ResponseHeaderMapImpl::create()) {} + // Http::MultiplexedStreamImplBase + void setFlushTimeout(std::chrono::milliseconds /*timeout*/) override { + // Client streams do not need a flush timer because we currently assume that any failure + // to flush would be covered by a request/stream/etc. timeout. + } // StreamImpl void submitHeaders(const std::vector& final_headers, nghttp2_data_provider* provider) override; @@ -358,10 +352,6 @@ class ConnectionImpl : public virtual Connection, HeaderMapPtr cloneTrailers(const HeaderMap& trailers) override { return createHeaderMap(trailers); } - void createPendingFlushTimer() override { - // Client streams do not create a flush timer because we currently assume that any failure - // to flush would be covered by a request/stream/etc. timeout. - } // RequestEncoder Status encodeHeaders(const RequestHeaderMap& headers, bool end_stream) override; @@ -407,7 +397,6 @@ class ConnectionImpl : public virtual Connection, HeaderMapPtr cloneTrailers(const HeaderMap& trailers) override { return createHeaderMap(trailers); } - void createPendingFlushTimer() override; void resetStream(StreamResetReason reason) override; // ResponseEncoder diff --git a/source/common/http/http3/codec_stats.h b/source/common/http/http3/codec_stats.h index fc8355ed1b317..9e7ca5859b2ff 100644 --- a/source/common/http/http3/codec_stats.h +++ b/source/common/http/http3/codec_stats.h @@ -18,7 +18,8 @@ namespace Http3 { COUNTER(rx_reset) \ COUNTER(tx_reset) \ COUNTER(metadata_not_supported_error) \ - COUNTER(quic_version_rfc_v1) + COUNTER(quic_version_rfc_v1) \ + COUNTER(tx_flush_timeout) /** * Wrapper struct for the HTTP/3 codec stats. @see stats_macros.h diff --git a/source/common/quic/envoy_quic_client_stream.cc b/source/common/quic/envoy_quic_client_stream.cc index 314747fe9c236..6dcbd735f3897 100644 --- a/source/common/quic/envoy_quic_client_stream.cc +++ b/source/common/quic/envoy_quic_client_stream.cc @@ -66,6 +66,9 @@ Http::Status EnvoyQuicClientStream::encodeHeaders(const Http::RequestHeaderMap& } } WriteHeaders(std::move(spdy_headers), end_stream, nullptr); + if (local_end_stream_) { + onLocalEndStream(); + } return Http::okStatus(); } @@ -85,6 +88,9 @@ void EnvoyQuicClientStream::encodeData(Buffer::Instance& data, bool end_stream) Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD); return; } + if (local_end_stream_) { + onLocalEndStream(); + } } void EnvoyQuicClientStream::encodeTrailers(const Http::RequestTrailerMap& trailers) { @@ -93,6 +99,7 @@ void EnvoyQuicClientStream::encodeTrailers(const Http::RequestTrailerMap& traile ENVOY_STREAM_LOG(debug, "encodeTrailers: {}.", *this, trailers); ScopedWatermarkBufferUpdater updater(this, this); WriteTrailers(envoyHeadersToSpdyHeaderBlock(trailers), nullptr); + onLocalEndStream(); } void EnvoyQuicClientStream::encodeMetadata(const Http::MetadataMapVector& /*metadata_map_vector*/) { @@ -271,6 +278,7 @@ void EnvoyQuicClientStream::OnConnectionClosed(quic::QuicErrorCode error, } void EnvoyQuicClientStream::OnClose() { + destroy(); quic::QuicSpdyClientStream::OnClose(); if (isDoingWatermarkAccounting()) { // This is called in the scope of a watermark buffer updater. Clear the @@ -321,5 +329,7 @@ void EnvoyQuicClientStream::onStreamError(absl::optional should_close_conn } } +bool EnvoyQuicClientStream::hasPendingData() { return BufferedDataBytes() > 0; } + } // namespace Quic } // namespace Envoy diff --git a/source/common/quic/envoy_quic_client_stream.h b/source/common/quic/envoy_quic_client_stream.h index 624cdc8ef8959..89b35d8e51650 100644 --- a/source/common/quic/envoy_quic_client_stream.h +++ b/source/common/quic/envoy_quic_client_stream.h @@ -70,6 +70,9 @@ class EnvoyQuicClientStream : public quic::QuicSpdyClientStream, void OnTrailingHeadersComplete(bool fin, size_t frame_len, const quic::QuicHeaderList& header_list) override; + // Http::MultiplexedStreamImplBase + bool hasPendingData() override; + private: QuicFilterManagerConnectionImpl* filterManagerConnection(); diff --git a/source/common/quic/envoy_quic_server_stream.cc b/source/common/quic/envoy_quic_server_stream.cc index a4a25af3de75b..10f4e379a351e 100644 --- a/source/common/quic/envoy_quic_server_stream.cc +++ b/source/common/quic/envoy_quic_server_stream.cc @@ -60,6 +60,9 @@ void EnvoyQuicServerStream::encodeHeaders(const Http::ResponseHeaderMap& headers local_end_stream_ = end_stream; SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this); WriteHeaders(envoyHeadersToSpdyHeaderBlock(headers), end_stream, nullptr); + if (local_end_stream_) { + onLocalEndStream(); + } } void EnvoyQuicServerStream::encodeData(Buffer::Instance& data, bool end_stream) { @@ -78,6 +81,9 @@ void EnvoyQuicServerStream::encodeData(Buffer::Instance& data, bool end_stream) Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD); return; } + if (local_end_stream_) { + onLocalEndStream(); + } } void EnvoyQuicServerStream::encodeTrailers(const Http::ResponseTrailerMap& trailers) { @@ -86,6 +92,7 @@ void EnvoyQuicServerStream::encodeTrailers(const Http::ResponseTrailerMap& trail ENVOY_STREAM_LOG(debug, "encodeTrailers: {}.", *this, trailers); SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this); WriteTrailers(envoyHeadersToSpdyHeaderBlock(trailers), nullptr); + onLocalEndStream(); } void EnvoyQuicServerStream::encodeMetadata(const Http::MetadataMapVector& /*metadata_map_vector*/) { @@ -269,8 +276,10 @@ void EnvoyQuicServerStream::OnStreamReset(const quic::QuicRstStreamFrame& frame) void EnvoyQuicServerStream::Reset(quic::QuicRstStreamErrorCode error) { ENVOY_STREAM_LOG(debug, "sending reset code={}", *this, error); stats_.tx_reset_.inc(); - // Upper layers expect calling resetStream() to immediately raise reset callbacks. - runResetCallbacks(quicRstErrorToEnvoyLocalResetReason(error)); + if (!local_end_stream_) { + // Upper layers expect calling resetStream() to immediately raise reset callbacks. + runResetCallbacks(quicRstErrorToEnvoyLocalResetReason(error)); + } quic::QuicSpdyServerStreamBase::Reset(error); } @@ -302,6 +311,7 @@ void EnvoyQuicServerStream::CloseWriteSide() { } void EnvoyQuicServerStream::OnClose() { + destroy(); quic::QuicSpdyServerStreamBase::OnClose(); if (isDoingWatermarkAccounting()) { return; @@ -367,5 +377,21 @@ void EnvoyQuicServerStream::onStreamError(absl::optional should_close_conn } } +void EnvoyQuicServerStream::onPendingFlushTimer() { + ENVOY_STREAM_LOG(debug, "pending stream flush timeout", *this); + Http::MultiplexedStreamImplBase::onPendingFlushTimer(); + stats_.tx_flush_timeout_.inc(); + ASSERT(local_end_stream_ && !fin_sent()); + // Reset the stream locally. But no reset callbacks will be run because higher layers think the + // stream is already finished. + Reset(quic::QUIC_STREAM_CANCELLED); +} + +bool EnvoyQuicServerStream::hasPendingData() { + // Quic stream sends headers and trailers on the same stream, and buffers them in the same sending + // buffer if needed. So checking this buffer is sufficient. + return BufferedDataBytes() > 0; +} + } // namespace Quic } // namespace Envoy diff --git a/source/common/quic/envoy_quic_server_stream.h b/source/common/quic/envoy_quic_server_stream.h index 0f0f81f7cf3e3..ae3d431a3e5f7 100644 --- a/source/common/quic/envoy_quic_server_stream.h +++ b/source/common/quic/envoy_quic_server_stream.h @@ -45,9 +45,6 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase, // Http::Stream void resetStream(Http::StreamResetReason reason) override; - void setFlushTimeout(std::chrono::milliseconds) override { - // TODO(mattklein123): Actually implement this for HTTP/3 similar to HTTP/2. - } // quic::QuicSpdyStream void OnBodyAvailable() override; @@ -79,6 +76,10 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase, const quic::QuicHeaderList& header_list) override; void OnHeadersTooLarge() override; + // Http::MultiplexedStreamImplBase + void onPendingFlushTimer() override; + bool hasPendingData() override; + private: QuicFilterManagerConnectionImpl* filterManagerConnection(); diff --git a/source/common/quic/envoy_quic_stream.h b/source/common/quic/envoy_quic_stream.h index 3136ad93b4c86..b50bca4e2d551 100644 --- a/source/common/quic/envoy_quic_stream.h +++ b/source/common/quic/envoy_quic_stream.h @@ -16,8 +16,7 @@ namespace Quic { // Base class for EnvoyQuicServer|ClientStream. class EnvoyQuicStream : public virtual Http::StreamEncoder, - public Http::Stream, - public Http::StreamCallbackHelper, + public Http::MultiplexedStreamImplBase, public SendBufferMonitor, public HeaderValidator, protected Logger::Loggable { @@ -28,7 +27,8 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder, std::function below_low_watermark, std::function above_high_watermark, Http::Http3::CodecStats& stats, const envoy::config::core::v3::Http3ProtocolOptions& http3_options) - : stats_(stats), http3_options_(http3_options), + : Http::MultiplexedStreamImplBase(filter_manager_connection.dispatcher()), stats_(stats), + http3_options_(http3_options), send_buffer_simulation_(buffer_limit / 2, buffer_limit, std::move(below_low_watermark), std::move(above_high_watermark), ENVOY_LOGGER()), filter_manager_connection_(filter_manager_connection), diff --git a/test/common/quic/envoy_quic_server_session_test.cc b/test/common/quic/envoy_quic_server_session_test.cc index ce493e33da91c..f44b359cc2b78 100644 --- a/test/common/quic/envoy_quic_server_session_test.cc +++ b/test/common/quic/envoy_quic_server_session_test.cc @@ -1005,8 +1005,8 @@ TEST_F(EnvoyQuicServerSessionTest, SendBufferWatermark) { EXPECT_TRUE(stream2->IsFlowControlBlocked()); // Resetting stream3 should lower the buffered bytes, but callbacks will not - // be triggered because reset callback has been already triggered. - EXPECT_CALL(stream_callbacks3, onResetStream(Http::StreamResetReason::LocalReset, "")); + // be triggered because end stream is already encoded. + EXPECT_CALL(stream_callbacks3, onResetStream(Http::StreamResetReason::LocalReset, "")).Times(0); // Connection buffered data book keeping should also be updated. EXPECT_CALL(network_connection_callbacks_, onBelowWriteBufferLowWatermark()); stream3->resetStream(Http::StreamResetReason::LocalReset); diff --git a/test/integration/multiplexed_integration_test.cc b/test/integration/multiplexed_integration_test.cc index f139528b9723e..5f3fed78b6e29 100644 --- a/test/integration/multiplexed_integration_test.cc +++ b/test/integration/multiplexed_integration_test.cc @@ -3,6 +3,10 @@ #include #include +#ifdef ENVOY_ENABLE_QUIC +#include "source/common/quic/client_connection_factory_impl.h" +#endif + #include "envoy/config/bootstrap/v3/bootstrap.pb.h" #include "envoy/config/cluster/v3/cluster.pb.h" #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" @@ -123,7 +127,6 @@ TEST_P(Http2IntegrationTest, LargeRequestTrailersRejected) { testLargeRequestTra // Verify downstream codec stream flush timeout. TEST_P(Http2IntegrationTest, CodecStreamIdleTimeout) { - EXCLUDE_DOWNSTREAM_HTTP3; // Need to support stream_idle_timeout. config_helper_.setBufferLimits(1024, 1024); config_helper_.addConfigModifier( [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& @@ -133,16 +136,29 @@ TEST_P(Http2IntegrationTest, CodecStreamIdleTimeout) { hcm.mutable_stream_idle_timeout()->set_nanos(IdleTimeoutMs * 1000 * 1000); }); initialize(); + const size_t stream_flow_control_window = + downstream_protocol_ == Http::CodecType::HTTP3 ? 32 * 1024 : 65535; envoy::config::core::v3::Http2ProtocolOptions http2_options = ::Envoy::Http2::Utility::initializeAndValidateOptions( envoy::config::core::v3::Http2ProtocolOptions()); - http2_options.mutable_initial_stream_window_size()->set_value(65535); + http2_options.mutable_initial_stream_window_size()->set_value(stream_flow_control_window); +#ifdef ENVOY_ENABLE_QUIC + if (downstream_protocol_ == Http::CodecType::HTTP3) { + dynamic_cast(*quic_connection_persistent_info_) + .quic_config_.SetInitialStreamFlowControlWindowToSend(stream_flow_control_window); + dynamic_cast(*quic_connection_persistent_info_) + .quic_config_.SetInitialSessionFlowControlWindowToSend(stream_flow_control_window); + } +#endif codec_client_ = makeRawHttpConnection(makeClientConnection(lookupPort("http")), http2_options); auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); waitForNextUpstreamRequest(); upstream_request_->encodeHeaders(default_response_headers_, false); - upstream_request_->encodeData(70000, true); - test_server_->waitForCounterEq("http2.tx_flush_timeout", 1); + upstream_request_->encodeData(stream_flow_control_window + 2000, true); + std::string flush_timeout_counter(downstreamProtocol() == Http::CodecType::HTTP3 + ? "http3.tx_flush_timeout" + : "http2.tx_flush_timeout"); + test_server_->waitForCounterEq(flush_timeout_counter, 1); ASSERT_TRUE(response->waitForReset()); }