diff --git a/bazel/external/quiche.BUILD b/bazel/external/quiche.BUILD index a7cd337706cea..1b9e08f22145d 100644 --- a/bazel/external/quiche.BUILD +++ b/bazel/external/quiche.BUILD @@ -3115,6 +3115,20 @@ envoy_cc_test_library( ], ) +envoy_cc_test_library( + name = "quic_test_tools_server_session_base_peer", + hdrs = [ + "quiche/quic/test_tools/quic_server_session_base_peer.h", + ], + copts = quiche_copts, + repository = "@envoy", + tags = ["nofips"], + deps = [ + ":quic_core_http_spdy_session_lib", + ":quic_core_utils_lib", + ], +) + envoy_cc_test_library( name = "quic_test_tools_simple_quic_framer_lib", srcs = ["quiche/quic/test_tools/simple_quic_framer.cc"], diff --git a/source/common/common/logger.h b/source/common/common/logger.h index 5efa9accd3404..80c9f57e574e3 100644 --- a/source/common/common/logger.h +++ b/source/common/common/logger.h @@ -49,6 +49,7 @@ namespace Logger { FUNCTION(misc) \ FUNCTION(mongo) \ FUNCTION(quic) \ + FUNCTION(quic_stream) \ FUNCTION(pool) \ FUNCTION(rbac) \ FUNCTION(redis) \ diff --git a/source/docs/quiche_integration.md b/source/docs/quiche_integration.md new file mode 100644 index 0000000000000..45de216cc6b44 --- /dev/null +++ b/source/docs/quiche_integration.md @@ -0,0 +1,33 @@ +### Overview + +QUICHE is integrated in the way described below: + +A combination of QUIC session and connection serves as a Network::Connection instance. More than that, QUIC session manages all the QUIC streams. The QUIC codec is a very thin layer between QUIC session and HCM. It doesn't do de-multiplexing or stream management, but only provides interfaces for the HCM to communicate with the QUIC session. + +QUIC's Http::StreamEncoder and Http::StreamDecoder implementation is decoupled. The encoder is implemented by EnvoyQuicStream which is a QUIC stream and owned by session. The HCM owns the decoder which can be accessed by QUIC stream instances. And the decoder also knows about stream encoder. + +### Request pipeline + +The QUIC stream calls decodeHeaders() to deliver request headers. The request body needs to be delivered after headers are delivered as some QUICHE implementations allow the body to arrive earlier than headers. If not read disabled, it always deliver available data via decodeData(). The stream doesn't buffer any readable data in QUICHE stream buffers. + +### Response pipeline + +The HCM will call encoder's encodeHeaders() to write response headers, and then encodeData() and encodeTrailers(). encodeData() calls WriteBodySlices() to write out response body. The quic stream in QUICHE configures its send buffer threshold QuicStream::buffered_data_threshold_ to be high enough to take all the data passed in, so stream functionally has unlimited buffer. + +### Flow Control + +#### Receive buffer + +All arrived out-of-order data is buffered in QUICHE stream. This buffer is capped by max stream flow control window in QUICHE which is 16MB. Once bytes are put in sequence and ready to be used, OnBodyDataAvailable() is called. The stream implementation overrides this call and calls StreamDecoder::decodeData() in it. Request and response body are buffered in each L7 filter if desired, and the stream itself doesn't buffer any of them unless set as read blocked. + +When upstream or any L7 filter reaches its buffer limit, it will call Http::Stream::readDisable() with false to set QUIC stream to be read blocked. In this state, even if more request/response body is available to be delivered, OnBodyDataAvailable() will not be called. As a result, downstream flow control will not shift as no data will be consumed. As both filters and upstream buffers can call readDisable(), each stream has a counter of how many times the HCM blocks the stream. When the counter is cleared, the stream will set its state to unblocked and thus deliver any new and existing available data buffered in the QUICHE stream. + +#### Send buffer + +We use the unlimited stream send buffer in QUICHE along with a book keeping data structure `EnvoyQuicSimulatedWatermarkBuffer` to serve the function of WatermarkBuffer in Envoy to prevent buffering too much in QUICHE. + +When the bytes buffered in a stream's send buffer exceeds its high watermark, its inherited method StreamCallbackHelper::runHighWatermarkCallbacks() is called. The buffered bytes will go below stream's low watermark as the stream writes out data gradually via QuicStream::OnCanWrite(). In this case StreamCallbackHelper::runLowWatermarkCallbacks() will be called. QUICHE buffers all the data upon QuicSpdyStream::WriteBodySlices(), assuming `buffered_data_threshold_` is set high enough, and then writes all or part of them according to flow control window and congestion control window. To prevent transient changes in buffered bytes from triggering these two watermark callbacks one immediately after the other, encodeData() and OnCanWrite() only update the watermark bookkeeping once at the end if buffered bytes are changed. + +QUICHE doesn't buffer data at the local connection layer. All the data is buffered in the respective streams.To prevent the case where all streams collectively buffers a lot of data, there is also a simulated watermark buffer for each QUIC connection which is updated upon each stream write. + +When the aggregated buffered bytes goes above high watermark, its registered network callbacks will call Network::ConnectionCallbacks::onAboveWriteBufferHighWatermark(). The HCM will notify each stream via QUIC codec Http::Connection::onUnderlyingConnectionAboveWriteBufferHighWatermark() which will call each stream's StreamCallbackHelper::runHighWatermarkCallbacks(). There might be a way to simply the call stack as Quic connection already knows about all the stream, there is no need to call to HCM and notify each stream via codec. But here we just follow the same logic as HTTP2 codec does. In the same way, any QuicStream::OnCanWrite() may change the aggregated buffered bytes in the connection level bookkeeping as well. If the buffered bytes goes down below the low watermark, the same calls will be triggered to propagate onBelowWriteBufferLowWatermark() to each stream. diff --git a/source/extensions/quic_listeners/quiche/BUILD b/source/extensions/quic_listeners/quiche/BUILD index 63f9046607ea3..91d8eb333b5a5 100644 --- a/source/extensions/quic_listeners/quiche/BUILD +++ b/source/extensions/quic_listeners/quiche/BUILD @@ -97,26 +97,14 @@ envoy_cc_library( hdrs = ["envoy_quic_stream.h"], tags = ["nofips"], deps = [ + ":envoy_quic_simulated_watermark_buffer_lib", + ":quic_filter_manager_connection_lib", + "//include/envoy/event:dispatcher_interface", "//include/envoy/http:codec_interface", "//source/common/http:codec_helper_lib", ], ) -envoy_cc_library( - name = "envoy_quic_server_stream_lib", - srcs = ["envoy_quic_server_stream.cc"], - hdrs = ["envoy_quic_server_stream.h"], - tags = ["nofips"], - deps = [ - ":envoy_quic_stream_lib", - ":envoy_quic_utils_lib", - "//source/common/buffer:buffer_lib", - "//source/common/common:assert_lib", - "//source/common/http:header_map_lib", - "@com_googlesource_quiche//:quic_core_http_spdy_session_lib", - ], -) - envoy_cc_library( name = "codec_lib", srcs = ["codec_impl.cc"], @@ -136,9 +124,13 @@ envoy_cc_library( tags = ["nofips"], deps = [ ":envoy_quic_connection_lib", + ":envoy_quic_simulated_watermark_buffer_lib", "//include/envoy/event:dispatcher_interface", "//include/envoy/network:connection_interface", + "//source/common/buffer:buffer_lib", + "//source/common/common:assert_lib", "//source/common/common:empty_string", + "//source/common/http:header_map_lib", "//source/common/network:filter_manager_lib", "//source/common/stream_info:stream_info_lib", ], @@ -146,12 +138,23 @@ envoy_cc_library( envoy_cc_library( name = "envoy_quic_server_session_lib", - srcs = ["envoy_quic_server_session.cc"], - hdrs = ["envoy_quic_server_session.h"], + srcs = [ + "envoy_quic_server_session.cc", + "envoy_quic_server_stream.cc", + ], + hdrs = [ + "envoy_quic_server_session.h", + "envoy_quic_server_stream.h", + ], tags = ["nofips"], deps = [ - ":envoy_quic_server_stream_lib", + ":envoy_quic_stream_lib", + ":envoy_quic_utils_lib", ":quic_filter_manager_connection_lib", + "//source/common/buffer:buffer_lib", + "//source/common/common:assert_lib", + "//source/common/http:header_map_lib", + "//source/extensions/quic_listeners/quiche/platform:quic_platform_mem_slice_storage_impl_lib", "@com_googlesource_quiche//:quic_core_http_spdy_session_lib", ], ) @@ -207,6 +210,12 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "envoy_quic_simulated_watermark_buffer_lib", + hdrs = ["envoy_quic_simulated_watermark_buffer.h"], + deps = ["//source/common/common:assert_lib"], +) + envoy_cc_library( name = "active_quic_listener_lib", srcs = ["active_quic_listener.cc"], diff --git a/source/extensions/quic_listeners/quiche/codec_impl.cc b/source/extensions/quic_listeners/quiche/codec_impl.cc index fdb060cb7c159..cd082427b0ffe 100644 --- a/source/extensions/quic_listeners/quiche/codec_impl.cc +++ b/source/extensions/quic_listeners/quiche/codec_impl.cc @@ -1,18 +1,36 @@ #include "extensions/quic_listeners/quiche/codec_impl.h" +#include "extensions/quic_listeners/quiche/envoy_quic_server_stream.h" + namespace Envoy { namespace Quic { bool QuicHttpConnectionImplBase::wantsToWrite() { return quic_session_.HasDataToWrite(); } -// TODO(danzh): modify QUIC stack to react based on aggregated bytes across all -// the streams. And call StreamCallbackHelper::runHighWatermarkCallbacks() for each stream. -void QuicHttpConnectionImplBase::onUnderlyingConnectionAboveWriteBufferHighWatermark() { - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; +QuicHttpServerConnectionImpl::QuicHttpServerConnectionImpl( + EnvoyQuicServerSession& quic_session, Http::ServerConnectionCallbacks& callbacks) + : QuicHttpConnectionImplBase(quic_session), quic_server_session_(quic_session) { + quic_session.setHttpConnectionCallbacks(callbacks); +} + +void QuicHttpServerConnectionImpl::onUnderlyingConnectionAboveWriteBufferHighWatermark() { + for (auto& it : quic_server_session_.stream_map()) { + if (!it.second->is_static()) { + // Only call watermark callbacks on non QUIC static streams which are + // crypto stream and Google QUIC headers stream. + ENVOY_LOG(debug, "runHighWatermarkCallbacks on stream {}", it.first); + dynamic_cast(it.second.get())->runHighWatermarkCallbacks(); + } + } } -void QuicHttpConnectionImplBase::onUnderlyingConnectionBelowWriteBufferLowWatermark() { - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; +void QuicHttpServerConnectionImpl::onUnderlyingConnectionBelowWriteBufferLowWatermark() { + for (const auto& it : quic_server_session_.stream_map()) { + if (!it.second->is_static()) { + ENVOY_LOG(debug, "runLowWatermarkCallbacks on stream {}", it.first); + dynamic_cast(it.second.get())->runLowWatermarkCallbacks(); + } + } } void QuicHttpServerConnectionImpl::goAway() { diff --git a/source/extensions/quic_listeners/quiche/codec_impl.h b/source/extensions/quic_listeners/quiche/codec_impl.h index debff738cb044..07b2f2042ac14 100644 --- a/source/extensions/quic_listeners/quiche/codec_impl.h +++ b/source/extensions/quic_listeners/quiche/codec_impl.h @@ -27,11 +27,10 @@ class QuicHttpConnectionImplBase : public virtual Http::Connection, // TODO(danzh) add Http3 enum value for QUIC. return Http::Protocol::Http2; } + // Returns true if the session has data to send but queued in connection or // stream send buffer. bool wantsToWrite() override; - void onUnderlyingConnectionAboveWriteBufferHighWatermark() override; - void onUnderlyingConnectionBelowWriteBufferLowWatermark() override; protected: quic::QuicSpdySession& quic_session_; @@ -41,10 +40,7 @@ class QuicHttpServerConnectionImpl : public QuicHttpConnectionImplBase, public Http::ServerConnection { public: QuicHttpServerConnectionImpl(EnvoyQuicServerSession& quic_session, - Http::ServerConnectionCallbacks& callbacks) - : QuicHttpConnectionImplBase(quic_session), quic_server_session_(quic_session) { - quic_session.setHttpConnectionCallbacks(callbacks); - } + Http::ServerConnectionCallbacks& callbacks); // Http::Connection void goAway() override; @@ -52,6 +48,8 @@ class QuicHttpServerConnectionImpl : public QuicHttpConnectionImplBase, // TODO(danzh): Add double-GOAWAY support in QUIC. ENVOY_CONN_LOG(error, "Shutdown notice is not propagated to QUIC.", quic_server_session_); } + void onUnderlyingConnectionAboveWriteBufferHighWatermark() override; + void onUnderlyingConnectionBelowWriteBufferLowWatermark() override; private: EnvoyQuicServerSession& quic_server_session_; diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_dispatcher.cc b/source/extensions/quic_listeners/quiche/envoy_quic_dispatcher.cc index cc353f0f7eba5..eab481cbe8c50 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_dispatcher.cc +++ b/source/extensions/quic_listeners/quiche/envoy_quic_dispatcher.cc @@ -25,6 +25,17 @@ EnvoyQuicDispatcher::EnvoyQuicDispatcher( // Network::UdpListenerCallbacks which should be called at the beginning // of HandleReadEvent(). And this callback should call quic::Dispatcher::ProcessBufferedChlos(). SetQuicFlag(FLAGS_quic_allow_chlo_buffering, false); + // Set send buffer twice of max flow control window to ensure that stream send + // buffer always takes all the data. + // The max amount of data buffered is the per-stream high watermark + the max + // flow control window of upstream. The per-stream high watermark should be + // smaller than max flow control window to make sure upper stream can be flow + // control blocked early enough not to send more than the threshold allows. + // TODO(#8826) Ideally we should use the negotiated value from upstream which is not accessible + // for now. 512MB is way to large, but the actual bytes buffered should be bound by the negotiated + // upstream flow control window. + SetQuicFlag(FLAGS_quic_buffered_data_threshold, + 2 * Http::Http2Settings::DEFAULT_INITIAL_STREAM_WINDOW_SIZE); // 512MB } void EnvoyQuicDispatcher::OnConnectionClosed(quic::QuicConnectionId connection_id, @@ -44,7 +55,8 @@ quic::QuicSession* EnvoyQuicDispatcher::CreateQuicSession( listener_stats_); auto quic_session = new EnvoyQuicServerSession( config(), quic::ParsedQuicVersionVector{version}, std::move(quic_connection), this, - session_helper(), crypto_config(), compressed_certs_cache(), dispatcher_); + session_helper(), crypto_config(), compressed_certs_cache(), dispatcher_, + listener_config_.perConnectionBufferLimitBytes()); quic_session->Initialize(); // Filter chain can't be retrieved here as self address is unknown at this // point. diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_server_session.cc b/source/extensions/quic_listeners/quiche/envoy_quic_server_session.cc index 021c390e474d5..a1a47634ad810 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_server_session.cc +++ b/source/extensions/quic_listeners/quiche/envoy_quic_server_session.cc @@ -9,6 +9,7 @@ #include "quiche/quic/core/quic_crypto_server_stream.h" #pragma GCC diagnostic pop +#include "common/common/assert.h" #include "extensions/quic_listeners/quiche/envoy_quic_server_stream.h" namespace Envoy { @@ -18,10 +19,16 @@ EnvoyQuicServerSession::EnvoyQuicServerSession( const quic::QuicConfig& config, const quic::ParsedQuicVersionVector& supported_versions, std::unique_ptr connection, quic::QuicSession::Visitor* visitor, quic::QuicCryptoServerStream::Helper* helper, const quic::QuicCryptoServerConfig* crypto_config, - quic::QuicCompressedCertsCache* compressed_certs_cache, Event::Dispatcher& dispatcher) + quic::QuicCompressedCertsCache* compressed_certs_cache, Event::Dispatcher& dispatcher, + uint32_t send_buffer_limit) : quic::QuicServerSessionBase(config, supported_versions, connection.get(), visitor, helper, crypto_config, compressed_certs_cache), - QuicFilterManagerConnectionImpl(std::move(connection), dispatcher) {} + QuicFilterManagerConnectionImpl(connection.get(), dispatcher, send_buffer_limit), + quic_connection_(std::move(connection)) {} + +EnvoyQuicServerSession::~EnvoyQuicServerSession() { + QuicFilterManagerConnectionImpl::quic_connection_ = nullptr; +} absl::string_view EnvoyQuicServerSession::requestedServerName() const { return {GetCryptoStream()->crypto_negotiated_params().sni}; @@ -41,6 +48,9 @@ quic::QuicSpdyStream* EnvoyQuicServerSession::CreateIncomingStream(quic::QuicStr auto stream = new EnvoyQuicServerStream(id, this, quic::BIDIRECTIONAL); ActivateStream(absl::WrapUnique(stream)); setUpRequestDecoder(*stream); + if (aboveHighWatermark()) { + stream->runHighWatermarkCallbacks(); + } return stream; } diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_server_session.h b/source/extensions/quic_listeners/quiche/envoy_quic_server_session.h index d849aa0161cac..e6cd850ea486f 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_server_session.h +++ b/source/extensions/quic_listeners/quiche/envoy_quic_server_session.h @@ -20,6 +20,9 @@ namespace Envoy { namespace Quic { // Act as a Network::Connection to HCM and a FilterManager to FilterFactoryCb. +// TODO(danzh) Lifetime of quic connection and filter manager connection can be +// simplified by changing the inheritance to a member variable instantiated +// before quic_connection_. class EnvoyQuicServerSession : public quic::QuicServerSessionBase, public QuicFilterManagerConnectionImpl { public: @@ -30,7 +33,9 @@ class EnvoyQuicServerSession : public quic::QuicServerSessionBase, quic::QuicCryptoServerStream::Helper* helper, const quic::QuicCryptoServerConfig* crypto_config, quic::QuicCompressedCertsCache* compressed_certs_cache, - Event::Dispatcher& dispatcher); + Event::Dispatcher& dispatcher, uint32_t send_buffer_limit); + + ~EnvoyQuicServerSession() override; // Network::Connection absl::string_view requestedServerName() const override; @@ -48,6 +53,8 @@ class EnvoyQuicServerSession : public quic::QuicServerSessionBase, // quic::QuicSpdySession void OnCryptoHandshakeEvent(CryptoHandshakeEvent event) override; + using quic::QuicSession::stream_map; + protected: // quic::QuicServerSessionBase quic::QuicCryptoServerStreamBase* @@ -64,6 +71,7 @@ class EnvoyQuicServerSession : public quic::QuicServerSessionBase, private: void setUpRequestDecoder(EnvoyQuicStream& stream); + std::unique_ptr quic_connection_; // These callbacks are owned by network filters and quic session should out live // them. Http::ServerConnectionCallbacks* http_connection_callbacks_{nullptr}; diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.cc b/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.cc index 35bd63f811355..b4ac61223b178 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.cc +++ b/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.cc @@ -3,7 +3,7 @@ #include #include -#include +#include #pragma GCC diagnostic push // QUICHE allows unused parameters. @@ -14,10 +14,12 @@ #include "quiche/quic/core/http/quic_header_list.h" #include "quiche/quic/core/quic_session.h" #include "quiche/spdy/core/spdy_header_block.h" - +#include "extensions/quic_listeners/quiche/platform/quic_mem_slice_span_impl.h" #pragma GCC diagnostic pop #include "extensions/quic_listeners/quiche/envoy_quic_utils.h" +#include "extensions/quic_listeners/quiche/envoy_quic_server_session.h" + #include "common/buffer/buffer_impl.h" #include "common/http/header_map_impl.h" #include "common/common/assert.h" @@ -25,28 +27,101 @@ namespace Envoy { namespace Quic { +EnvoyQuicServerStream::EnvoyQuicServerStream(quic::QuicStreamId id, quic::QuicSpdySession* session, + quic::StreamType type) + : quic::QuicSpdyServerStreamBase(id, session, type), + EnvoyQuicStream( + // This should be larger than 8k to fully utilize congestion control + // window. And no larger than the max stream flow control window for + // the stream to buffer all the data. + // Ideally this limit should also correlate to peer's receive window + // but not fully depends on that. + 16 * 1024, [this]() { runLowWatermarkCallbacks(); }, + [this]() { runHighWatermarkCallbacks(); }) {} + +EnvoyQuicServerStream::EnvoyQuicServerStream(quic::PendingStream* pending, + quic::QuicSpdySession* session, quic::StreamType type) + : quic::QuicSpdyServerStreamBase(pending, session, type), + EnvoyQuicStream( + // This should be larger than 8k to fully utilize congestion control + // window. And no larger than the max stream flow control window for + // the stream to buffer all the data. + 16 * 1024, [this]() { runLowWatermarkCallbacks(); }, + [this]() { runHighWatermarkCallbacks(); }) {} + void EnvoyQuicServerStream::encode100ContinueHeaders(const Http::HeaderMap& headers) { ASSERT(headers.Status()->value() == "100"); encodeHeaders(headers, false); } -void EnvoyQuicServerStream::encodeHeaders(const Http::HeaderMap& /*headers*/, bool /*end_stream*/) { - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + +void EnvoyQuicServerStream::encodeHeaders(const Http::HeaderMap& headers, bool end_stream) { + ENVOY_STREAM_LOG(debug, "encodeHeaders (end_stream={}) {}.", *this, end_stream, headers); + // QUICHE guarantees to take all the headers. This could cause infinite data to + // be buffered on headers stream in Google QUIC implementation because + // headers stream doesn't have upper bound for its send buffer. But in IETF + // QUIC implementation this is safe as headers are sent on data stream which + // is bounded by max concurrent streams limited. + // Same vulnerability exists in crypto stream which can infinitely buffer data + // if handshake implementation goes wrong. + // TODO(#8826) Modify QUICHE to have an upper bound for header stream send buffer. + WriteHeaders(envoyHeadersToSpdyHeaderBlock(headers), end_stream, nullptr); + local_end_stream_ = end_stream; } -void EnvoyQuicServerStream::encodeData(Buffer::Instance& /*data*/, bool /*end_stream*/) { - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + +void EnvoyQuicServerStream::encodeData(Buffer::Instance& data, bool end_stream) { + ENVOY_STREAM_LOG(debug, "encodeData (end_stream={}) of {} bytes.", *this, end_stream, + data.length()); + local_end_stream_ = end_stream; + // This is counting not serialized bytes in the send buffer. + uint64_t bytes_to_send_old = BufferedDataBytes(); + // QUIC stream must take all. + WriteBodySlices(quic::QuicMemSliceSpan(quic::QuicMemSliceSpanImpl(data)), end_stream); + if (data.length() > 0) { + // Send buffer didn't take all the data, threshold needs to be adjusted. + Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD); + return; + } + + uint64_t bytes_to_send_new = BufferedDataBytes(); + ASSERT(bytes_to_send_old <= bytes_to_send_new); + maybeCheckWatermark(bytes_to_send_old, bytes_to_send_new, *filterManagerConnection()); } -void EnvoyQuicServerStream::encodeTrailers(const Http::HeaderMap& /*trailers*/) { - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + +void EnvoyQuicServerStream::encodeTrailers(const Http::HeaderMap& trailers) { + ASSERT(!local_end_stream_); + local_end_stream_ = true; + ENVOY_STREAM_LOG(debug, "encodeTrailers: {}.", *this, trailers); + WriteTrailers(envoyHeadersToSpdyHeaderBlock(trailers), nullptr); } + void EnvoyQuicServerStream::encodeMetadata(const Http::MetadataMapVector& /*metadata_map_vector*/) { + // Metadata Frame is not supported in QUIC. NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } -void EnvoyQuicServerStream::resetStream(Http::StreamResetReason /*reason*/) { - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; +void EnvoyQuicServerStream::resetStream(Http::StreamResetReason reason) { + // Upper layers expect calling resetStream() to immediately raise reset callbacks. + runResetCallbacks(reason); + if (local_end_stream_ && !reading_stopped()) { + // This is after 200 early response. Reset with QUIC_STREAM_NO_ERROR instead + // of propagating original reset reason. In QUICHE if a stream stops reading + // before FIN or RESET received, it resets the steam with QUIC_STREAM_NO_ERROR. + StopReading(); + } else { + Reset(envoyResetReasonToQuicRstError(reason)); + } } -void EnvoyQuicServerStream::readDisable(bool /*disable*/) { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } +void EnvoyQuicServerStream::switchStreamBlockState(bool should_block) { + ASSERT(FinishedReadingHeaders(), + "Upperstream buffer limit is reached before request body is delivered."); + if (should_block) { + sequencer()->SetBlockedUntilFlush(); + } else { + ASSERT(read_disable_counter_ == 0, "readDisable called in between."); + sequencer()->SetUnblocked(); + } +} void EnvoyQuicServerStream::OnInitialHeadersComplete(bool fin, size_t frame_len, const quic::QuicHeaderList& header_list) { @@ -54,10 +129,18 @@ void EnvoyQuicServerStream::OnInitialHeadersComplete(bool fin, size_t frame_len, ASSERT(decoder() != nullptr); ASSERT(headers_decompressed()); decoder()->decodeHeaders(quicHeadersToEnvoyHeaders(header_list), /*end_stream=*/fin); + if (fin) { + end_stream_decoded_ = true; + } ConsumeHeaderList(); } void EnvoyQuicServerStream::OnBodyAvailable() { + ASSERT(FinishedReadingHeaders()); + ASSERT(read_disable_counter_ == 0); + ASSERT(!in_decode_data_callstack_); + in_decode_data_callstack_ = true; + Buffer::InstancePtr buffer = std::make_unique(); // TODO(danzh): check Envoy per stream buffer limit. // Currently read out all the data. @@ -77,18 +160,39 @@ void EnvoyQuicServerStream::OnBodyAvailable() { // True if no trailer and FIN read. bool finished_reading = IsDoneReading(); - // If this is the last stream data, set end_stream if there is no - // trailers. - ASSERT(decoder() != nullptr); - decoder()->decodeData(*buffer, finished_reading); - if (!quic::VersionUsesQpack(transport_version()) && sequencer()->IsClosed() && - !FinishedReadingTrailers()) { + bool empty_payload_with_fin = buffer->length() == 0 && fin_received(); + // If this call is triggered by an empty frame with FIN which is not from peer + // but synthesized by stream itself upon receiving HEADERS with FIN or + // TRAILERS, do not deliver end of stream here. Because either decodeHeaders + // already delivered it or decodeTrailers will be called. + bool skip_decoding = empty_payload_with_fin && (end_stream_decoded_ || !finished_reading); + if (!skip_decoding) { + ASSERT(decoder() != nullptr); + decoder()->decodeData(*buffer, finished_reading); + if (finished_reading) { + end_stream_decoded_ = true; + } + } + + if (!sequencer()->IsClosed()) { + in_decode_data_callstack_ = false; + if (read_disable_counter_ > 0) { + // If readDisable() was ever called during decodeData() and it meant to disable + // reading from downstream, the call must have been deferred. Call it now. + switchStreamBlockState(true); + } + return; + } + + if (!quic::VersionUsesQpack(transport_version()) && !FinishedReadingTrailers()) { // For Google QUIC implementation, trailers may arrived earlier and wait to // be consumed after reading all the body. Consume it here. // IETF QUIC shouldn't reach here because trailers are sent on same stream. decoder()->decodeTrailers(spdyHeaderBlockToEnvoyHeaders(received_trailers())); MarkTrailersConsumed(); } + OnFinRead(); + in_decode_data_callstack_ = false; } void EnvoyQuicServerStream::OnTrailingHeadersComplete(bool fin, size_t frame_len, @@ -107,25 +211,40 @@ void EnvoyQuicServerStream::OnTrailingHeadersComplete(bool fin, size_t frame_len void EnvoyQuicServerStream::OnStreamReset(const quic::QuicRstStreamFrame& frame) { quic::QuicSpdyServerStreamBase::OnStreamReset(frame); - Http::StreamResetReason reason; - if (frame.error_code == quic::QUIC_REFUSED_STREAM) { - reason = Http::StreamResetReason::RemoteRefusedStreamReset; - } else { - reason = Http::StreamResetReason::RemoteReset; - } - runResetCallbacks(reason); + runResetCallbacks(quicRstErrorToEnvoyResetReason(frame.error_code)); } void EnvoyQuicServerStream::OnConnectionClosed(quic::QuicErrorCode error, quic::ConnectionCloseSource source) { quic::QuicSpdyServerStreamBase::OnConnectionClosed(error, source); - Http::StreamResetReason reason; - if (error == quic::QUIC_NO_ERROR) { - reason = Http::StreamResetReason::ConnectionTermination; - } else { - reason = Http::StreamResetReason::ConnectionFailure; + runResetCallbacks(quicErrorCodeToEnvoyResetReason(error)); +} + +void EnvoyQuicServerStream::OnClose() { + quic::QuicSpdyServerStreamBase::OnClose(); + if (BufferedDataBytes() > 0) { + // If the stream is closed without sending out all buffered data, regard + // them as sent now and adjust connection buffer book keeping. + filterManagerConnection()->adjustBytesToSend(0 - BufferedDataBytes()); } - runResetCallbacks(reason); +} + +void EnvoyQuicServerStream::OnCanWrite() { + uint64_t buffered_data_old = BufferedDataBytes(); + quic::QuicSpdyServerStreamBase::OnCanWrite(); + uint64_t buffered_data_new = BufferedDataBytes(); + // As long as OnCanWriteNewData() is no-op, data to sent in buffer shouldn't + // increase. + ASSERT(buffered_data_new <= buffered_data_old); + maybeCheckWatermark(buffered_data_old, buffered_data_new, *filterManagerConnection()); +} + +uint32_t EnvoyQuicServerStream::streamId() { return id(); } + +Network::Connection* EnvoyQuicServerStream::connection() { return filterManagerConnection(); } + +QuicFilterManagerConnectionImpl* EnvoyQuicServerStream::filterManagerConnection() { + return dynamic_cast(session()); } } // namespace Quic diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.h b/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.h index 047970f4fdbed..e79ac3d081ed2 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.h +++ b/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.h @@ -18,11 +18,10 @@ namespace Quic { class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase, public EnvoyQuicStream { public: EnvoyQuicServerStream(quic::QuicStreamId id, quic::QuicSpdySession* session, - quic::StreamType type) - : quic::QuicSpdyServerStreamBase(id, session, type) {} + quic::StreamType type); + EnvoyQuicServerStream(quic::PendingStream* pending, quic::QuicSpdySession* session, - quic::StreamType type) - : quic::QuicSpdyServerStreamBase(pending, session, type) {} + quic::StreamType type); // Http::StreamEncoder void encode100ContinueHeaders(const Http::HeaderMap& headers) override; @@ -33,19 +32,28 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase, public Envo // Http::Stream void resetStream(Http::StreamResetReason reason) override; - void readDisable(bool disable) override; // quic::QuicSpdyStream void OnBodyAvailable() override; void OnStreamReset(const quic::QuicRstStreamFrame& frame) override; + void OnClose() override; + void OnCanWrite() override; // quic::QuicServerSessionBase void OnConnectionClosed(quic::QuicErrorCode error, quic::ConnectionCloseSource source) override; protected: + // EnvoyQuicStream + void switchStreamBlockState(bool should_block) override; + uint32_t streamId() override; + Network::Connection* connection() override; + // quic::QuicSpdyStream void OnInitialHeadersComplete(bool fin, size_t frame_len, const quic::QuicHeaderList& header_list) override; void OnTrailingHeadersComplete(bool fin, size_t frame_len, const quic::QuicHeaderList& header_list) override; + +private: + QuicFilterManagerConnectionImpl* filterManagerConnection(); }; } // namespace Quic diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_simulated_watermark_buffer.h b/source/extensions/quic_listeners/quiche/envoy_quic_simulated_watermark_buffer.h new file mode 100644 index 0000000000000..ef795cd57aaa4 --- /dev/null +++ b/source/extensions/quic_listeners/quiche/envoy_quic_simulated_watermark_buffer.h @@ -0,0 +1,70 @@ +#pragma once + +#include + +#include "common/common/assert.h" + +#include "spdlog/spdlog.h" + +namespace Envoy { +namespace Quic { + +// A class, together with a stand alone buffer, used to achieve the purpose of WatermarkBuffer. +// Itself doesn't have buffer or bookkeep buffered bytes. But provided with buffered_bytes, +// it reacts upon crossing high/low watermarks. +// It's no-op if provided low and high watermark are 0. +class EnvoyQuicSimulatedWatermarkBuffer { +public: + EnvoyQuicSimulatedWatermarkBuffer(uint32_t low_watermark, uint32_t high_watermark, + std::function below_low_watermark, + std::function above_high_watermark, + spdlog::logger& logger) + : low_watermark_(low_watermark), high_watermark_(high_watermark), + below_low_watermark_(std::move(below_low_watermark)), + above_high_watermark_(std::move(above_high_watermark)), logger_(logger) { + ASSERT((high_watermark == 0 && low_watermark == 0) || (high_watermark_ > low_watermark_)); + } + + uint32_t highWatermark() const { return high_watermark_; } + + void checkHighWatermark(uint32_t bytes_buffered) { + if (high_watermark_ > 0 && !is_full_ && bytes_buffered > high_watermark_) { + // Transitioning from below low watermark to above high watermark. + ENVOY_LOG_TO_LOGGER(logger_, debug, "Buffered {} bytes, cross high watermark {}", + bytes_buffered, high_watermark_); + is_full_ = true; + above_high_watermark_(); + } + } + + void checkLowWatermark(uint32_t bytes_buffered) { + if (low_watermark_ > 0 && is_full_ && bytes_buffered < low_watermark_) { + // Transitioning from above high watermark to below low watermark. + ENVOY_LOG_TO_LOGGER(logger_, debug, "Buffered {} bytes, cross low watermark {}", + bytes_buffered, low_watermark_); + is_full_ = false; + below_low_watermark_(); + } + } + + // True after the buffer goes above high watermark and hasn't come down below low + // watermark yet, even though the buffered data might be between high and low + // watermarks. + bool isAboveHighWatermark() const { return is_full_; } + + // True before the buffer crosses the high watermark for the first time and after the buffer goes + // below low watermark and hasn't come up above high watermark yet, even though the buffered data + // might be between high and low watermarks. + bool isBelowLowWatermark() const { return !is_full_; } + +private: + uint32_t low_watermark_{0}; + uint32_t high_watermark_{0}; + bool is_full_{false}; + std::function below_low_watermark_; + std::function above_high_watermark_; + spdlog::logger& logger_; +}; + +} // namespace Quic +} // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_stream.h b/source/extensions/quic_listeners/quiche/envoy_quic_stream.h index a6126224cc688..228d0cf645926 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_stream.h +++ b/source/extensions/quic_listeners/quiche/envoy_quic_stream.h @@ -1,41 +1,114 @@ #pragma once +#include "envoy/event/dispatcher.h" #include "envoy/http/codec.h" #include "common/http/codec_helper.h" +#include "extensions/quic_listeners/quiche/envoy_quic_simulated_watermark_buffer.h" +#include "extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h" + namespace Envoy { namespace Quic { // Base class for EnvoyQuicServer|ClientStream. class EnvoyQuicStream : public Http::StreamEncoder, public Http::Stream, - public Http::StreamCallbackHelper { + public Http::StreamCallbackHelper, + protected Logger::Loggable { public: + // |buffer_limit| is the high watermark of the stream send buffer, and the low + // watermark will be half of it. + EnvoyQuicStream(uint32_t buffer_limit, std::function below_low_watermark, + std::function above_high_watermark) + : send_buffer_simulation_(buffer_limit / 2, buffer_limit, std::move(below_low_watermark), + std::move(above_high_watermark), ENVOY_LOGGER()) {} + // Http::StreamEncoder Stream& getStream() override { return *this; } // Http::Stream + void readDisable(bool disable) override { + bool status_changed{false}; + if (disable) { + ++read_disable_counter_; + if (read_disable_counter_ == 1) { + status_changed = true; + } + } else { + ASSERT(read_disable_counter_ > 0); + --read_disable_counter_; + if (read_disable_counter_ == 0) { + status_changed = true; + } + } + + if (status_changed && !in_decode_data_callstack_) { + // Avoid calling this while decoding data because transient disabling and + // enabling reading may trigger another decoding data inside the + // callstack which messes up stream state. + switchStreamBlockState(disable); + } + } + void addCallbacks(Http::StreamCallbacks& callbacks) override { ASSERT(!local_end_stream_); addCallbacks_(callbacks); } void removeCallbacks(Http::StreamCallbacks& callbacks) override { removeCallbacks_(callbacks); } - uint32_t bufferLimit() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } + uint32_t bufferLimit() override { return send_buffer_simulation_.highWatermark(); } // Needs to be called during quic stream creation before the stream receives // any headers and data. void setDecoder(Http::StreamDecoder& decoder) { decoder_ = &decoder; } + void maybeCheckWatermark(uint64_t buffered_data_old, uint64_t buffered_data_new, + QuicFilterManagerConnectionImpl& connection) { + if (buffered_data_new == buffered_data_old) { + return; + } + // If buffered bytes changed, update stream and session's watermark book + // keeping. + if (buffered_data_new > buffered_data_old) { + send_buffer_simulation_.checkHighWatermark(buffered_data_new); + } else { + send_buffer_simulation_.checkLowWatermark(buffered_data_new); + } + connection.adjustBytesToSend(buffered_data_new - buffered_data_old); + } + protected: + virtual void switchStreamBlockState(bool should_block) PURE; + + // Needed for ENVOY_STREAM_LOG. + virtual uint32_t streamId() PURE; + virtual Network::Connection* connection() PURE; + Http::StreamDecoder* decoder() { ASSERT(decoder_ != nullptr); return decoder_; } + // True once end of stream is propagated to Envoy. Envoy doesn't expect to be + // notified more than once about end of stream. So once this is true, no need + // to set it in the callback to Envoy stream any more. + bool end_stream_decoded_{false}; + uint32_t read_disable_counter_{0u}; + // If true, switchStreamBlockState() should be deferred till this variable + // becomes false. + bool in_decode_data_callstack_{false}; + private: // Not owned. Http::StreamDecoder* decoder_{nullptr}; + // Keeps track of bytes buffered in the stream send buffer in QUICHE and reacts + // upon crossing high and low watermarks. + // Its high watermark is also the buffer limit of stream read/write filters in + // HCM. + // There is no receive buffer simulation because Quic stream's + // OnBodyDataAvailable() hands all the ready-to-use request data from stream sequencer to HCM + // directly and buffers them in filters if needed. Itself doesn't buffer request data. + EnvoyQuicSimulatedWatermarkBuffer send_buffer_simulation_; }; } // namespace Quic diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_utils.cc b/source/extensions/quic_listeners/quiche/envoy_quic_utils.cc index 33e0c43fc035b..6be73faef9546 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_utils.cc +++ b/source/extensions/quic_listeners/quiche/envoy_quic_utils.cc @@ -62,5 +62,50 @@ Http::HeaderMapImplPtr spdyHeaderBlockToEnvoyHeaders(const spdy::SpdyHeaderBlock return headers; } +spdy::SpdyHeaderBlock envoyHeadersToSpdyHeaderBlock(const Http::HeaderMap& headers) { + spdy::SpdyHeaderBlock header_block; + headers.iterate( + [](const Http::HeaderEntry& header, void* context) -> Http::HeaderMap::Iterate { + auto spdy_headers = static_cast(context); + // The key-value pairs are copied. + spdy_headers->insert({header.key().getStringView(), header.value().getStringView()}); + return Http::HeaderMap::Iterate::Continue; + }, + &header_block); + return header_block; +} + +quic::QuicRstStreamErrorCode envoyResetReasonToQuicRstError(Http::StreamResetReason reason) { + switch (reason) { + case Http::StreamResetReason::LocalRefusedStreamReset: + return quic::QUIC_REFUSED_STREAM; + case Http::StreamResetReason::ConnectionFailure: + return quic::QUIC_STREAM_CONNECTION_ERROR; + case Http::StreamResetReason::LocalReset: + return quic::QUIC_STREAM_CANCELLED; + case Http::StreamResetReason::ConnectionTermination: + return quic::QUIC_STREAM_NO_ERROR; + default: + return quic::QUIC_BAD_APPLICATION_PAYLOAD; + } +} + +Http::StreamResetReason quicRstErrorToEnvoyResetReason(quic::QuicRstStreamErrorCode rst_err) { + switch (rst_err) { + case quic::QUIC_REFUSED_STREAM: + return Http::StreamResetReason::RemoteRefusedStreamReset; + default: + return Http::StreamResetReason::RemoteReset; + } +} + +Http::StreamResetReason quicErrorCodeToEnvoyResetReason(quic::QuicErrorCode error) { + if (error == quic::QUIC_NO_ERROR) { + return Http::StreamResetReason::ConnectionTermination; + } else { + return Http::StreamResetReason::ConnectionFailure; + } +} + } // namespace Quic } // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_utils.h b/source/extensions/quic_listeners/quiche/envoy_quic_utils.h index 54b1bf07f6036..6505b22a66e28 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_utils.h +++ b/source/extensions/quic_listeners/quiche/envoy_quic_utils.h @@ -25,5 +25,16 @@ Http::HeaderMapImplPtr quicHeadersToEnvoyHeaders(const quic::QuicHeaderList& hea Http::HeaderMapImplPtr spdyHeaderBlockToEnvoyHeaders(const spdy::SpdyHeaderBlock& header_block); +spdy::SpdyHeaderBlock envoyHeadersToSpdyHeaderBlock(const Http::HeaderMap& headers); + +// Called when Envoy wants to reset the underlying QUIC stream. +quic::QuicRstStreamErrorCode envoyResetReasonToQuicRstError(Http::StreamResetReason reason); + +// Called when a RST_STREAM frame is received. +Http::StreamResetReason quicRstErrorToEnvoyResetReason(quic::QuicRstStreamErrorCode rst_err); + +// Called when underlying QUIC connection is closed either locally or by peer. +Http::StreamResetReason quicErrorCodeToEnvoyResetReason(quic::QuicErrorCode error); + } // namespace Quic } // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc index d964414470467..e48374fe49847 100644 --- a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc +++ b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc @@ -5,13 +5,16 @@ namespace Envoy { namespace Quic { -QuicFilterManagerConnectionImpl::QuicFilterManagerConnectionImpl( - std::unique_ptr connection, Event::Dispatcher& dispatcher) - : quic_connection_(std::move(connection)), filter_manager_(*this), dispatcher_(dispatcher), +QuicFilterManagerConnectionImpl::QuicFilterManagerConnectionImpl(EnvoyQuicConnection* connection, + Event::Dispatcher& dispatcher, + uint32_t send_buffer_limit) + : quic_connection_(connection), dispatcher_(dispatcher), filter_manager_(*this), // QUIC connection id can be 18 bytes. It's easier to use hash value instead // of trying to map it into a 64-bit space. - stream_info_(dispatcher.timeSource()), id_(quic_connection_->connection_id().Hash()) { - // TODO(danzh): Use QUIC specific enum value. + stream_info_(dispatcher.timeSource()), id_(quic_connection_->connection_id().Hash()), + write_buffer_watermark_simulation_( + send_buffer_limit / 2, send_buffer_limit, [this]() { onSendBufferLowWatermark(); }, + [this]() { onSendBufferHighWatermark(); }, ENVOY_LOGGER()) { stream_info_.protocol(Http::Protocol::Http2); } @@ -40,18 +43,27 @@ void QuicFilterManagerConnectionImpl::enableHalfClose(bool enabled) { } void QuicFilterManagerConnectionImpl::setBufferLimits(uint32_t /*limit*/) { - // TODO(danzh): add interface to quic for connection level buffer throttling. - // Currently read buffer is capped by connection level flow control. And - // write buffer is not capped. + // Currently read buffer is capped by connection level flow control. And write buffer limit is set + // during construction. Changing the buffer limit during the life time of the connection is not + // supported. NOT_REACHED_GCOVR_EXCL_LINE; } +bool QuicFilterManagerConnectionImpl::aboveHighWatermark() const { + return write_buffer_watermark_simulation_.isAboveHighWatermark(); +} + void QuicFilterManagerConnectionImpl::close(Network::ConnectionCloseType type) { if (type != Network::ConnectionCloseType::NoFlush) { // TODO(danzh): Implement FlushWrite and FlushWriteAndDelay mode. } + if (quic_connection_ == nullptr) { + // Already detached from quic connection. + return; + } quic_connection_->CloseConnection(quic::QUIC_NO_ERROR, "Closed by application", quic::ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + quic_connection_ = nullptr; } void QuicFilterManagerConnectionImpl::setDelayedCloseTimeout(std::chrono::milliseconds timeout) { @@ -90,14 +102,22 @@ void QuicFilterManagerConnectionImpl::rawWrite(Buffer::Instance& /*data*/, bool NOT_REACHED_GCOVR_EXCL_LINE; } +void QuicFilterManagerConnectionImpl::adjustBytesToSend(int64_t delta) { + bytes_to_send_ += delta; + write_buffer_watermark_simulation_.checkHighWatermark(bytes_to_send_); + write_buffer_watermark_simulation_.checkLowWatermark(bytes_to_send_); +} + void QuicFilterManagerConnectionImpl::onConnectionCloseEvent( const quic::QuicConnectionCloseFrame& frame, quic::ConnectionCloseSource source) { - // Tell network callbacks about connection close. - raiseEvent(source == quic::ConnectionCloseSource::FROM_PEER - ? Network::ConnectionEvent::RemoteClose - : Network::ConnectionEvent::LocalClose); transport_failure_reason_ = absl::StrCat(quic::QuicErrorCodeToString(frame.quic_error_code), " with details: ", frame.error_details); + if (quic_connection_ != nullptr) { + // Tell network callbacks about connection close if not detached yet. + raiseEvent(source == quic::ConnectionCloseSource::FROM_PEER + ? Network::ConnectionEvent::RemoteClose + : Network::ConnectionEvent::LocalClose); + } } void QuicFilterManagerConnectionImpl::raiseEvent(Network::ConnectionEvent event) { @@ -106,5 +126,19 @@ void QuicFilterManagerConnectionImpl::raiseEvent(Network::ConnectionEvent event) } } +void QuicFilterManagerConnectionImpl::onSendBufferHighWatermark() { + ENVOY_CONN_LOG(trace, "onSendBufferHighWatermark", *this); + for (auto callback : network_connection_callbacks_) { + callback->onAboveWriteBufferHighWatermark(); + } +} + +void QuicFilterManagerConnectionImpl::onSendBufferLowWatermark() { + ENVOY_CONN_LOG(trace, "onSendBufferLowWatermark", *this); + for (auto callback : network_connection_callbacks_) { + callback->onBelowWriteBufferLowWatermark(); + } +} + } // namespace Quic } // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h index b7f1d2b50bd30..0db77afce6e5a 100644 --- a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h +++ b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h @@ -9,6 +9,7 @@ #include "common/stream_info/stream_info_impl.h" #include "extensions/quic_listeners/quiche/envoy_quic_connection.h" +#include "extensions/quic_listeners/quiche/envoy_quic_simulated_watermark_buffer.h" namespace Envoy { namespace Quic { @@ -17,8 +18,8 @@ namespace Quic { class QuicFilterManagerConnectionImpl : public Network::FilterManagerConnection, protected Logger::Loggable { public: - QuicFilterManagerConnectionImpl(std::unique_ptr connection, - Event::Dispatcher& dispatcher); + QuicFilterManagerConnectionImpl(EnvoyQuicConnection* connection, Event::Dispatcher& dispatcher, + uint32_t send_buffer_limit); // Network::FilterManager // Overridden to delegate calls to filter_manager_. @@ -37,6 +38,8 @@ class QuicFilterManagerConnectionImpl : public Network::FilterManagerConnection, void enableHalfClose(bool enabled) override; void close(Network::ConnectionCloseType type) override; Event::Dispatcher& dispatcher() override { return dispatcher_; } + // Using this for purpose other than logging is not safe. Because QUIC connection id can be + // 18 bytes, so there might be collision when it's hashed to 8 bytes. uint64_t id() const override { return id_; } std::string nextProtocol() const override { return EMPTY_STRING; } void noDelay(bool /*enable*/) override { @@ -59,8 +62,10 @@ class QuicFilterManagerConnectionImpl : public Network::FilterManagerConnection, } Ssl::ConnectionInfoConstSharedPtr ssl() const override; Network::Connection::State state() const override { - return quic_connection_->connected() ? Network::Connection::State::Open - : Network::Connection::State::Closed; + if (quic_connection_ != nullptr && quic_connection_->connected()) { + return Network::Connection::State::Open; + } + return Network::Connection::State::Closed; } void write(Buffer::Instance& /*data*/, bool /*end_stream*/) override { // All writes should be handled by Quic internally. @@ -75,11 +80,8 @@ class QuicFilterManagerConnectionImpl : public Network::FilterManagerConnection, // SO_ORIGINAL_DST not supported by QUIC. NOT_REACHED_GCOVR_EXCL_LINE; } - bool aboveHighWatermark() const override { - // TODO(danzh) Aggregate the write buffer usage cross all the streams and - // add an upper limit for this connection. - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; - } + bool aboveHighWatermark() const override; + const Network::ConnectionSocket::OptionsSharedPtr& socketOptions() const override; StreamInfo::StreamInfo& streamInfo() override { return stream_info_; } const StreamInfo::StreamInfo& streamInfo() const override { return stream_info_; } @@ -96,6 +98,10 @@ class QuicFilterManagerConnectionImpl : public Network::FilterManagerConnection, // Network::WriteBufferSource Network::StreamBuffer getWriteBuffer() override { NOT_REACHED_GCOVR_EXCL_LINE; } + // Update the book keeping of the aggregated buffered bytes cross all the + // streams, and run watermark check. + void adjustBytesToSend(int64_t delta); + protected: // Propagate connection close to network_connection_callbacks_. void onConnectionCloseEvent(const quic::QuicConnectionCloseFrame& frame, @@ -103,23 +109,35 @@ class QuicFilterManagerConnectionImpl : public Network::FilterManagerConnection, void raiseEvent(Network::ConnectionEvent event); - std::unique_ptr quic_connection_; + EnvoyQuicConnection* quic_connection_; // TODO(danzh): populate stats. std::unique_ptr stats_; + Event::Dispatcher& dispatcher_; private: + // Called when aggregated buffered bytes across all the streams exceeds high watermark. + void onSendBufferHighWatermark(); + // Called when aggregated buffered bytes across all the streams declines to low watermark. + void onSendBufferLowWatermark(); + // Currently ConnectionManagerImpl is the one and only filter. If more network // filters are added, ConnectionManagerImpl should always be the last one. // Its onRead() is only called once to trigger ReadFilter::onNewConnection() // and the rest incoming data bypasses these filters. Network::FilterManagerImpl filter_manager_; - Event::Dispatcher& dispatcher_; + StreamInfo::StreamInfoImpl stream_info_; // These callbacks are owned by network filters and quic session should out live // them. std::list network_connection_callbacks_; std::string transport_failure_reason_; const uint64_t id_; + uint32_t bytes_to_send_{0}; + // Keeps the buffer state of the connection, and react upon the changes of how many bytes are + // buffered cross all streams' send buffer. The state is evaluated and may be changed upon each + // stream write. QUICHE doesn't buffer data in connection, all the data is buffered in stream's + // send buffer. + EnvoyQuicSimulatedWatermarkBuffer write_buffer_watermark_simulation_; }; } // namespace Quic diff --git a/test/extensions/quic_listeners/quiche/BUILD b/test/extensions/quic_listeners/quiche/BUILD index 418fb2676fe22..ad57d8c3ef6a4 100644 --- a/test/extensions/quic_listeners/quiche/BUILD +++ b/test/extensions/quic_listeners/quiche/BUILD @@ -58,11 +58,13 @@ envoy_cc_test( tags = ["nofips"], deps = [ ":quic_test_utils_for_envoy_lib", + ":test_utils_lib", "//source/common/http:headers_lib", "//source/extensions/quic_listeners/quiche:envoy_quic_alarm_factory_lib", "//source/extensions/quic_listeners/quiche:envoy_quic_connection_helper_lib", "//source/extensions/quic_listeners/quiche:envoy_quic_server_connection_lib", - "//source/extensions/quic_listeners/quiche:envoy_quic_server_stream_lib", + "//source/extensions/quic_listeners/quiche:envoy_quic_server_session_lib", + "//test/mocks/http:http_mocks", "//test/mocks/http:stream_decoder_mock", "//test/mocks/network:network_mocks", "//test/test_common:utility_lib", @@ -92,6 +94,8 @@ envoy_cc_test( "//test/test_common:global_lib", "//test/test_common:logging_lib", "//test/test_common:simulated_time_system_lib", + "@com_googlesource_quiche//:quic_test_tools_server_session_base_peer", + "@com_googlesource_quiche//:quic_test_tools_test_utils_interface_lib", ], ) @@ -180,3 +184,17 @@ envoy_cc_test( "@envoy_api//envoy/api/v2/listener:pkg_cc_proto", ], ) + +envoy_cc_test( + name = "envoy_quic_simulated_watermark_buffer_test", + srcs = ["envoy_quic_simulated_watermark_buffer_test.cc"], + tags = ["nofips"], + deps = ["//source/extensions/quic_listeners/quiche:envoy_quic_simulated_watermark_buffer_lib"], +) + +envoy_cc_test_library( + name = "test_utils_lib", + hdrs = ["test_utils.h"], + tags = ["nofips"], + deps = ["@com_googlesource_quiche//:quic_core_http_spdy_session_lib"], +) diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_dispatcher_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_dispatcher_test.cc index 29ab8847b3f9d..4dd6f1e5ad422 100644 --- a/test/extensions/quic_listeners/quiche/envoy_quic_dispatcher_test.cc +++ b/test/extensions/quic_listeners/quiche/envoy_quic_dispatcher_test.cc @@ -84,6 +84,8 @@ class EnvoyQuicDispatcherTest : public testing::TestWithParam read_filter(new Network::MockReadFilter()); Network::MockConnectionCallbacks network_connection_callbacks; + testing::StrictMock read_total; + testing::StrictMock read_current; + testing::StrictMock write_total; + testing::StrictMock write_current; + std::vector filter_factory( {[&](Network::FilterManager& filter_manager) { filter_manager.addReadFilter(read_filter); read_filter->callbacks_->connection().addConnectionCallbacks(network_connection_callbacks); + read_filter->callbacks_->connection().setConnectionStats( + {read_total, read_current, write_total, write_current, nullptr, nullptr}); }}); EXPECT_CALL(filter_chain, networkFilterFactories()).WillOnce(ReturnRef(filter_factory)); EXPECT_CALL(listener_config_, filterChainFactory()); diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc index d103382931759..290e44cb43e8b 100644 --- a/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc +++ b/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc @@ -4,8 +4,13 @@ // QUICHE uses offsetof(). #pragma GCC diagnostic ignored "-Winvalid-offsetof" +#include "quiche/quic/core/crypto/null_encrypter.h" +#include "quiche/quic/core/quic_utils.h" #include "quiche/quic/core/quic_versions.h" #include "quiche/quic/test_tools/crypto_test_utils.h" +#include "quiche/quic/test_tools/quic_config_peer.h" +#include "quiche/quic/test_tools/quic_connection_peer.h" +#include "quiche/quic/test_tools/quic_server_session_base_peer.h" #include "quiche/quic/test_tools/quic_test_utils.h" #pragma GCC diagnostic pop @@ -13,6 +18,7 @@ #include #include "extensions/quic_listeners/quiche/envoy_quic_server_session.h" +#include "extensions/quic_listeners/quiche/envoy_quic_server_stream.h" #include "extensions/quic_listeners/quiche/envoy_quic_server_connection.h" #include "extensions/quic_listeners/quiche/codec_impl.h" #include "extensions/quic_listeners/quiche/envoy_quic_connection_helper.h" @@ -36,6 +42,7 @@ #include "gtest/gtest.h" using testing::_; +using testing::AnyNumber; using testing::Invoke; using testing::Return; using testing::ReturnRef; @@ -66,6 +73,18 @@ class TestEnvoyQuicServerConnection : public EnvoyQuicServerConnection { MOCK_METHOD1(SendControlFrame, bool(const quic::QuicFrame& frame)); }; +// Derive to have simpler priority mechanism. +class TestEnvoyQuicServerSession : public EnvoyQuicServerSession { +public: + using EnvoyQuicServerSession::EnvoyQuicServerSession; + + bool ShouldYield(quic::QuicStreamId /*stream_id*/) override { + // Never yield to other stream so that it's easier to predict stream write + // behavior. + return false; + } +}; + class EnvoyQuicServerSessionTest : public testing::TestWithParam { public: EnvoyQuicServerSessionTest() @@ -87,13 +106,18 @@ class EnvoyQuicServerSessionTest : public testing::TestWithParam { envoy_quic_session_(quic_config_, quic_version_, std::unique_ptr(quic_connection_), /*visitor=*/nullptr, &crypto_stream_helper_, &crypto_config_, - &compressed_certs_cache_, *dispatcher_), + &compressed_certs_cache_, *dispatcher_, + /*send_buffer_limit*/ quic::kDefaultFlowControlSendWindow * 1.5), read_filter_(new Network::MockReadFilter()) { + EXPECT_EQ(time_system_.systemTime(), envoy_quic_session_.streamInfo().startTime()); EXPECT_EQ(EMPTY_STRING, envoy_quic_session_.nextProtocol()); time_system_.sleep(std::chrono::milliseconds(1)); ON_CALL(writer_, WritePacket(_, _, _, _, _)) - .WillByDefault(testing::Return(quic::WriteResult(quic::WRITE_STATUS_OK, 1))); + .WillByDefault(Invoke([](const char*, size_t buf_len, const quic::QuicIpAddress&, + const quic::QuicSocketAddress&, quic::PerPacketOptions*) { + return quic::WriteResult{quic::WRITE_STATUS_OK, static_cast(buf_len)}; + })); ON_CALL(crypto_stream_helper_, CanAcceptClientHello(_, _, _, _, _)).WillByDefault(Return(true)); } @@ -144,7 +168,7 @@ class EnvoyQuicServerSessionTest : public testing::TestWithParam { quic::QuicConfig quic_config_; quic::QuicCryptoServerConfig crypto_config_; testing::NiceMock crypto_stream_helper_; - EnvoyQuicServerSession envoy_quic_session_; + TestEnvoyQuicServerSession envoy_quic_session_; quic::QuicCompressedCertsCache compressed_certs_cache_{100}; std::shared_ptr read_filter_; Network::MockConnectionCallbacks network_connection_callbacks_; @@ -185,9 +209,6 @@ TEST_P(EnvoyQuicServerSessionTest, NewStream) { EXPECT_EQ(Http::Headers::get().MethodValues.Get, decoded_headers->Method()->value().getStringView()); })); - EXPECT_CALL(request_decoder, decodeData(_, true)) - .Times(testing::AtMost(1)) - .WillOnce(Invoke([](Buffer::Instance& buffer, bool) { EXPECT_EQ(0, buffer.length()); })); stream->OnStreamHeaderList(/*fin=*/true, headers.uncompressed_header_bytes(), headers); } @@ -314,7 +335,6 @@ TEST_P(EnvoyQuicServerSessionTest, ShutdownNotice) { // Not verifying dummy implementation, just to have coverage. EXPECT_DEATH(envoy_quic_session_.enableHalfClose(true), ""); EXPECT_EQ(nullptr, envoy_quic_session_.ssl()); - EXPECT_DEATH(envoy_quic_session_.aboveHighWatermark(), ""); EXPECT_DEATH(envoy_quic_session_.setDelayedCloseTimeout(std::chrono::milliseconds(1)), ""); http_connection_->shutdownNotice(); } @@ -362,6 +382,8 @@ TEST_P(EnvoyQuicServerSessionTest, InitializeFilterChain) { Network::FilterManager& filter_manager) { filter_manager.addReadFilter(read_filter_); read_filter_->callbacks_->connection().addConnectionCallbacks(network_connection_callbacks_); + read_filter_->callbacks_->connection().setConnectionStats( + {read_total_, read_current_, write_total_, write_current_, nullptr, nullptr}); }}; EXPECT_CALL(filter_chain, networkFilterFactories()).WillOnce(ReturnRef(filter_factory)); EXPECT_CALL(*read_filter_, onNewConnection()) @@ -392,5 +414,200 @@ TEST_P(EnvoyQuicServerSessionTest, NetworkConnectionInterface) { EXPECT_TRUE(envoy_quic_session_.readEnabled()); } +class TestQuicCryptoServerStream : public quic::QuicCryptoServerStream { +public: + using quic::QuicCryptoServerStream::QuicCryptoServerStream; + + bool encryption_established() const override { return true; } +}; + +TEST_P(EnvoyQuicServerSessionTest, SendBufferWatermark) { + // Switch to a encryption forward secure crypto stream. + quic::test::QuicServerSessionBasePeer::SetCryptoStream(&envoy_quic_session_, nullptr); + quic::test::QuicServerSessionBasePeer::SetCryptoStream( + &envoy_quic_session_, + new TestQuicCryptoServerStream(&crypto_config_, &compressed_certs_cache_, + &envoy_quic_session_, &crypto_stream_helper_)); + quic_connection_->SetDefaultEncryptionLevel(quic::ENCRYPTION_FORWARD_SECURE); + quic_connection_->SetEncrypter( + quic::ENCRYPTION_FORWARD_SECURE, + std::make_unique(quic::Perspective::IS_SERVER)); + // Drive congestion control manually. + auto send_algorithm = new testing::NiceMock; + quic::test::QuicConnectionPeer::SetSendAlgorithm(quic_connection_, send_algorithm); + EXPECT_CALL(*send_algorithm, CanSend(_)).WillRepeatedly(Return(true)); + EXPECT_CALL(*send_algorithm, GetCongestionWindow()).WillRepeatedly(Return(quic::kDefaultTCPMSS)); + EXPECT_CALL(*send_algorithm, PacingRate(_)).WillRepeatedly(Return(quic::QuicBandwidth::Zero())); + EXPECT_CALL(*send_algorithm, BandwidthEstimate()) + .WillRepeatedly(Return(quic::QuicBandwidth::Zero())); + EXPECT_CALL(*quic_connection_, SendControlFrame(_)).Times(AnyNumber()); + + // Bump connection flow control window large enough not to interfere + // stream writing. + envoy_quic_session_.flow_controller()->UpdateSendWindowOffset( + 10 * quic::kDefaultFlowControlSendWindow); + installReadFilter(); + Http::MockStreamDecoder request_decoder; + Http::MockStreamCallbacks stream_callbacks; + EXPECT_CALL(http_connection_callbacks_, newStream(_, false)) + .WillOnce(Invoke([&request_decoder, &stream_callbacks](Http::StreamEncoder& encoder, + bool) -> Http::StreamDecoder& { + encoder.getStream().addCallbacks(stream_callbacks); + return request_decoder; + })); + quic::QuicStreamId stream_id = + quic_version_[0].transport_version == quic::QUIC_VERSION_99 ? 4u : 5u; + auto stream1 = + dynamic_cast(envoy_quic_session_.GetOrCreateStream(stream_id)); + + // Receive a GET request on created stream. + quic::QuicHeaderList request_headers; + request_headers.OnHeaderBlockStart(); + std::string host("www.abc.com"); + request_headers.OnHeader(":authority", host); + request_headers.OnHeader(":method", "GET"); + request_headers.OnHeader(":path", "/"); + request_headers.OnHeaderBlockEnd(/*uncompressed_header_bytes=*/0, /*compressed_header_bytes=*/0); + // Request headers should be propagated to decoder. + EXPECT_CALL(request_decoder, decodeHeaders_(_, /*end_stream=*/true)) + .WillOnce(Invoke([&host](const Http::HeaderMapPtr& decoded_headers, bool) { + EXPECT_EQ(host, decoded_headers->Host()->value().getStringView()); + EXPECT_EQ("/", decoded_headers->Path()->value().getStringView()); + EXPECT_EQ(Http::Headers::get().MethodValues.Get, + decoded_headers->Method()->value().getStringView()); + })); + stream1->OnStreamHeaderList(/*fin=*/true, request_headers.uncompressed_header_bytes(), + request_headers); + + Http::TestHeaderMapImpl response_headers{{":status", "200"}, + {":content-length", "32770"}}; // 32KB + 2 bytes + + stream1->encodeHeaders(response_headers, false); + std::string response(32 * 1024 + 1, 'a'); + Buffer::OwnedImpl buffer(response); + EXPECT_CALL(stream_callbacks, onAboveWriteBufferHighWatermark()); + stream1->encodeData(buffer, false); + EXPECT_TRUE(stream1->flow_controller()->IsBlocked()); + EXPECT_FALSE(envoy_quic_session_.IsConnectionFlowControlBlocked()); + + // Receive another request and send back response to trigger connection level + // send buffer watermark. + Http::MockStreamDecoder request_decoder2; + Http::MockStreamCallbacks stream_callbacks2; + EXPECT_CALL(http_connection_callbacks_, newStream(_, false)) + .WillOnce(Invoke([&request_decoder2, &stream_callbacks2](Http::StreamEncoder& encoder, + bool) -> Http::StreamDecoder& { + encoder.getStream().addCallbacks(stream_callbacks2); + return request_decoder2; + })); + auto stream2 = + dynamic_cast(envoy_quic_session_.GetOrCreateStream(stream_id + 4)); + EXPECT_CALL(request_decoder2, decodeHeaders_(_, /*end_stream=*/true)) + .WillOnce(Invoke([&host](const Http::HeaderMapPtr& decoded_headers, bool) { + EXPECT_EQ(host, decoded_headers->Host()->value().getStringView()); + EXPECT_EQ("/", decoded_headers->Path()->value().getStringView()); + EXPECT_EQ(Http::Headers::get().MethodValues.Get, + decoded_headers->Method()->value().getStringView()); + })); + stream2->OnStreamHeaderList(/*fin=*/true, request_headers.uncompressed_header_bytes(), + request_headers); + stream2->encodeHeaders(response_headers, false); + // This response will trigger both stream and connection's send buffer watermark upper limits. + Buffer::OwnedImpl buffer2(response); + EXPECT_CALL(network_connection_callbacks_, onAboveWriteBufferHighWatermark) + .WillOnce(Invoke( + [this]() { http_connection_->onUnderlyingConnectionAboveWriteBufferHighWatermark(); })); + EXPECT_CALL(stream_callbacks2, onAboveWriteBufferHighWatermark()).Times(2); + EXPECT_CALL(stream_callbacks, onAboveWriteBufferHighWatermark()); + stream2->encodeData(buffer2, false); + + // Receive another request, the new stream should be notified about connection + // high watermark reached upon creation. + Http::MockStreamDecoder request_decoder3; + Http::MockStreamCallbacks stream_callbacks3; + EXPECT_CALL(http_connection_callbacks_, newStream(_, false)) + .WillOnce(Invoke([&request_decoder3, &stream_callbacks3](Http::StreamEncoder& encoder, + bool) -> Http::StreamDecoder& { + encoder.getStream().addCallbacks(stream_callbacks3); + return request_decoder3; + })); + EXPECT_CALL(stream_callbacks3, onAboveWriteBufferHighWatermark()); + auto stream3 = + dynamic_cast(envoy_quic_session_.GetOrCreateStream(stream_id + 8)); + EXPECT_CALL(request_decoder3, decodeHeaders_(_, /*end_stream=*/true)) + .WillOnce(Invoke([&host](const Http::HeaderMapPtr& decoded_headers, bool) { + EXPECT_EQ(host, decoded_headers->Host()->value().getStringView()); + EXPECT_EQ("/", decoded_headers->Path()->value().getStringView()); + EXPECT_EQ(Http::Headers::get().MethodValues.Get, + decoded_headers->Method()->value().getStringView()); + })); + stream3->OnStreamHeaderList(/*fin=*/true, request_headers.uncompressed_header_bytes(), + request_headers); + + // Update flow control window for stream1. + quic::QuicWindowUpdateFrame window_update1(quic::kInvalidControlFrameId, stream1->id(), + 32 * 1024); + stream1->OnWindowUpdateFrame(window_update1); + EXPECT_CALL(stream_callbacks, onBelowWriteBufferLowWatermark()).WillOnce(Invoke([stream1]() { + // Write rest response to stream1. + std::string rest_response(1, 'a'); + Buffer::OwnedImpl buffer(rest_response); + stream1->encodeData(buffer, true); + })); + envoy_quic_session_.OnCanWrite(); + EXPECT_TRUE(stream1->flow_controller()->IsBlocked()); + + // Update flow control window for stream2. + quic::QuicWindowUpdateFrame window_update2(quic::kInvalidControlFrameId, stream2->id(), + 32 * 1024); + stream2->OnWindowUpdateFrame(window_update2); + EXPECT_CALL(stream_callbacks2, onBelowWriteBufferLowWatermark()).WillOnce(Invoke([stream2]() { + // Write rest response to stream2. + std::string rest_response(1, 'a'); + Buffer::OwnedImpl buffer(rest_response); + stream2->encodeData(buffer, true); + })); + // Writing out another 16k on stream2 will trigger connection's send buffer + // come down below low watermark. + EXPECT_CALL(network_connection_callbacks_, onBelowWriteBufferLowWatermark) + .WillOnce(Invoke([this]() { + // This call shouldn't be propagate to stream1 and stream2 because they both wrote to the + // end of stream. + http_connection_->onUnderlyingConnectionBelowWriteBufferLowWatermark(); + })); + EXPECT_CALL(stream_callbacks3, onBelowWriteBufferLowWatermark()).WillOnce(Invoke([=]() { + std::string super_large_response(40 * 1024, 'a'); + Buffer::OwnedImpl buffer(super_large_response); + // This call will buffer 24k on stream3, raise the buffered bytes above + // high watermarks of the stream and connection. + // But callback will not propagate to stream_callback3 as the steam is + // ended locally. + stream3->encodeData(buffer, true); + })); + EXPECT_CALL(network_connection_callbacks_, onAboveWriteBufferHighWatermark()); + envoy_quic_session_.OnCanWrite(); + EXPECT_TRUE(stream2->flow_controller()->IsBlocked()); + + // Resetting stream3 should lower the buffered bytes, but callbacks will not + // be triggered because reset callback has been already triggered. + EXPECT_CALL(stream_callbacks3, onResetStream(Http::StreamResetReason::LocalReset, "")); + // Connection buffered data book keeping should also be updated. + EXPECT_CALL(network_connection_callbacks_, onBelowWriteBufferLowWatermark()); + stream3->resetStream(Http::StreamResetReason::LocalReset); + + // Update flow control window for stream1. + quic::QuicWindowUpdateFrame window_update3(quic::kInvalidControlFrameId, stream1->id(), + 48 * 1024); + stream1->OnWindowUpdateFrame(window_update3); + // Update flow control window for stream2. + quic::QuicWindowUpdateFrame window_update4(quic::kInvalidControlFrameId, stream2->id(), + 48 * 1024); + stream2->OnWindowUpdateFrame(window_update4); + envoy_quic_session_.OnCanWrite(); + + EXPECT_TRUE(stream1->write_side_closed()); + EXPECT_TRUE(stream2->write_side_closed()); +} + } // namespace Quic } // namespace Envoy diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_server_stream_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_server_stream_test.cc index b77714985cc17..e3bbaac8d0e4a 100644 --- a/test/extensions/quic_listeners/quiche/envoy_quic_server_stream_test.cc +++ b/test/extensions/quic_listeners/quiche/envoy_quic_server_stream_test.cc @@ -1,59 +1,30 @@ -#include "extensions/quic_listeners/quiche/envoy_quic_server_stream.h" - -#pragma GCC diagnostic push -// QUICHE allows unused parameters. -#pragma GCC diagnostic ignored "-Wunused-parameter" -// QUICHE uses offsetof(). -#pragma GCC diagnostic ignored "-Winvalid-offsetof" - -#include "quiche/quic/core/quic_versions.h" -#include "quiche/quic/core/http/quic_server_session_base.h" -#include "quiche/quic/test_tools/quic_test_utils.h" - -#pragma GCC diagnostic pop - #include #include "common/event/libevent_scheduler.h" #include "common/http/headers.h" -#include "test/test_common/utility.h" + #include "extensions/quic_listeners/quiche/envoy_quic_alarm_factory.h" -#include "extensions/quic_listeners/quiche/envoy_quic_server_connection.h" #include "extensions/quic_listeners/quiche/envoy_quic_connection_helper.h" +#include "extensions/quic_listeners/quiche/envoy_quic_server_connection.h" +#include "extensions/quic_listeners/quiche/envoy_quic_server_session.h" +#include "extensions/quic_listeners/quiche/envoy_quic_server_stream.h" + +#include "test/extensions/quic_listeners/quiche/test_utils.h" +#include "test/mocks/http/mocks.h" #include "test/mocks/http/stream_decoder.h" #include "test/mocks/network/mocks.h" +#include "test/test_common/utility.h" + #include "gmock/gmock.h" #include "gtest/gtest.h" using testing::_; using testing::Invoke; +using testing::Return; namespace Envoy { namespace Quic { -class MockQuicServerSession : public quic::QuicServerSessionBase { -public: - MockQuicServerSession(const quic::QuicConfig& config, - const quic::ParsedQuicVersionVector& supported_versions, - quic::QuicConnection* connection, quic::QuicSession::Visitor* visitor, - quic::QuicCryptoServerStream::Helper* helper, - const quic::QuicCryptoServerConfig* crypto_config, - quic::QuicCompressedCertsCache* compressed_certs_cache) - : quic::QuicServerSessionBase(config, supported_versions, connection, visitor, helper, - crypto_config, compressed_certs_cache) {} - - MOCK_METHOD1(CreateIncomingStream, quic::QuicSpdyStream*(quic::QuicStreamId id)); - MOCK_METHOD1(CreateIncomingStream, quic::QuicSpdyStream*(quic::PendingStream* pending)); - MOCK_METHOD0(CreateOutgoingBidirectionalStream, quic::QuicSpdyStream*()); - MOCK_METHOD0(CreateOutgoingUnidirectionalStream, quic::QuicSpdyStream*()); - MOCK_METHOD1(ShouldCreateIncomingStream, bool(quic::QuicStreamId id)); - MOCK_METHOD0(ShouldCreateOutgoingBidirectionalStream, bool()); - MOCK_METHOD0(ShouldCreateOutgoingUnidirectionalStream, bool()); - MOCK_METHOD2(CreateQuicCryptoServerStream, - quic::QuicCryptoServerStream*(const quic::QuicCryptoServerConfig*, - quic::QuicCompressedCertsCache*)); -}; - class EnvoyQuicServerStreamTest : public testing::TestWithParam { public: EnvoyQuicServerStreamTest() @@ -70,32 +41,93 @@ class EnvoyQuicServerStreamTest : public testing::TestWithParam { quic::QuicSocketAddress(quic::QuicIpAddress::Any6(), 12345), connection_helper_, alarm_factory_, &writer_, /*owns_writer=*/false, {quic_version_}, listener_config_, listener_stats_), - quic_session_(quic_config_, {quic_version_}, &quic_connection_, /*visitor=*/nullptr, - /*helper=*/nullptr, /*crypto_config=*/nullptr, - /*compressed_certs_cache=*/nullptr), - stream_id_(quic_version_.transport_version == quic::QUIC_VERSION_99 ? 4u : 5u), - quic_stream_(stream_id_, &quic_session_, quic::BIDIRECTIONAL) { - quic::SetVerbosityLogThreshold(3); - - quic_stream_.setDecoder(stream_decoder_); + quic_session_(quic_config_, {quic_version_}, &quic_connection_, *dispatcher_, + quic_config_.GetInitialStreamFlowControlWindowToSend() * 2), + stream_id_(VersionUsesQpack(quic_version_.transport_version) ? 4u : 5u), + quic_stream_(new EnvoyQuicServerStream(stream_id_, &quic_session_, quic::BIDIRECTIONAL)), + response_headers_{{":status", "200"}} { + quic_stream_->setDecoder(stream_decoder_); + quic_stream_->addCallbacks(stream_callbacks_); + quic_session_.ActivateStream(std::unique_ptr(quic_stream_)); + EXPECT_CALL(quic_session_, WritevData(_, _, _, _, _)) + .WillRepeatedly(Invoke([](quic::QuicStream*, quic::QuicStreamId, size_t write_length, + quic::QuicStreamOffset, quic::StreamSendingState) { + return quic::QuicConsumedData{write_length, true}; + })); + EXPECT_CALL(writer_, WritePacket(_, _, _, _, _)) + .WillRepeatedly(Invoke([](const char*, size_t buf_len, const quic::QuicIpAddress&, + const quic::QuicSocketAddress&, quic::PerPacketOptions*) { + return quic::WriteResult{quic::WRITE_STATUS_OK, static_cast(buf_len)}; + })); } void SetUp() override { - headers_.OnHeaderBlockStart(); - headers_.OnHeader(":authority", host_); - headers_.OnHeader(":method", "GET"); - headers_.OnHeader(":path", "/"); - headers_.OnHeaderBlockEnd(/*uncompressed_header_bytes=*/0, /*compressed_header_bytes=*/0); + quic_session_.Initialize(); + request_headers_.OnHeaderBlockStart(); + request_headers_.OnHeader(":authority", host_); + request_headers_.OnHeader(":method", "POST"); + request_headers_.OnHeader(":path", "/"); + request_headers_.OnHeaderBlockEnd(/*uncompressed_header_bytes=*/0, + /*compressed_header_bytes=*/0); trailers_.OnHeaderBlockStart(); trailers_.OnHeader("key1", "value1"); - if (quic_version_.transport_version != quic::QUIC_VERSION_99) { + if (!quic::VersionUsesQpack(quic_version_.transport_version)) { // ":final-offset" is required and stripped off by quic. trailers_.OnHeader(":final-offset", absl::StrCat("", request_body_.length())); } trailers_.OnHeaderBlockEnd(/*uncompressed_header_bytes=*/0, /*compressed_header_bytes=*/0); } + void TearDown() override { + if (quic_connection_.connected()) { + quic_session_.close(Network::ConnectionCloseType::NoFlush); + } + } + + std::string bodyToStreamPayload(const std::string& body) { + std::string data = body; + if (quic::VersionUsesQpack(quic_version_.transport_version)) { + std::unique_ptr data_buffer; + quic::HttpEncoder encoder; + quic::QuicByteCount data_frame_header_length = + encoder.SerializeDataFrameHeader(body.length(), &data_buffer); + quic::QuicStringPiece data_frame_header(data_buffer.get(), data_frame_header_length); + data = absl::StrCat(data_frame_header, body); + } + return data; + } + + size_t sendRequest(const std::string& payload, bool fin, size_t decoder_buffer_high_watermark) { + EXPECT_CALL(stream_decoder_, decodeHeaders_(_, /*end_stream=*/false)) + .WillOnce(Invoke([this](const Http::HeaderMapPtr& headers, bool) { + EXPECT_EQ(host_, headers->Host()->value().getStringView()); + EXPECT_EQ("/", headers->Path()->value().getStringView()); + EXPECT_EQ(Http::Headers::get().MethodValues.Post, + headers->Method()->value().getStringView()); + })); + if (quic::VersionUsesQpack(quic_version_.transport_version)) { + quic_stream_->OnHeadersDecoded(request_headers_); + } else { + quic_stream_->OnStreamHeaderList(/*fin=*/false, request_headers_.uncompressed_header_bytes(), + request_headers_); + } + EXPECT_TRUE(quic_stream_->FinishedReadingHeaders()); + + EXPECT_CALL(stream_decoder_, decodeData(_, _)) + .WillOnce(Invoke([&](Buffer::Instance& buffer, bool finished_reading) { + EXPECT_EQ(payload, buffer.toString()); + EXPECT_EQ(fin, finished_reading); + if (!finished_reading && buffer.length() > decoder_buffer_high_watermark) { + quic_stream_->readDisable(true); + } + })); + std::string data = bodyToStreamPayload(payload); + quic::QuicStreamFrame frame(stream_id_, fin, 0, data); + quic_stream_->OnStreamFrame(frame); + return data.length(); + } + protected: Api::ApiPtr api_; Event::DispatcherPtr dispatcher_; @@ -107,11 +139,13 @@ class EnvoyQuicServerStreamTest : public testing::TestWithParam { testing::NiceMock listener_config_; Server::ListenerStats listener_stats_; EnvoyQuicServerConnection quic_connection_; - MockQuicServerSession quic_session_; + MockEnvoyQuicSession quic_session_; quic::QuicStreamId stream_id_; - EnvoyQuicServerStream quic_stream_; + EnvoyQuicServerStream* quic_stream_; Http::MockStreamDecoder stream_decoder_; - quic::QuicHeaderList headers_; + Http::MockStreamCallbacks stream_callbacks_; + quic::QuicHeaderList request_headers_; + Http::TestHeaderMapImpl response_headers_; quic::QuicHeaderList trailers_; std::string host_{"www.abc.com"}; std::string request_body_{"Hello world"}; @@ -120,74 +154,35 @@ class EnvoyQuicServerStreamTest : public testing::TestWithParam { INSTANTIATE_TEST_SUITE_P(EnvoyQuicServerStreamTests, EnvoyQuicServerStreamTest, testing::ValuesIn({true, false})); -TEST_P(EnvoyQuicServerStreamTest, DecodeHeadersAndBody) { - EXPECT_CALL(stream_decoder_, decodeHeaders_(_, /*end_stream=*/false)) +TEST_P(EnvoyQuicServerStreamTest, GetRequestAndResponse) { + quic::QuicHeaderList request_headers; + request_headers.OnHeaderBlockStart(); + request_headers.OnHeader(":authority", host_); + request_headers.OnHeader(":method", "GET"); + request_headers.OnHeader(":path", "/"); + request_headers.OnHeaderBlockEnd(/*uncompressed_header_bytes=*/0, + /*compressed_header_bytes=*/0); + + EXPECT_CALL(stream_decoder_, decodeHeaders_(_, /*end_stream=*/true)) .WillOnce(Invoke([this](const Http::HeaderMapPtr& headers, bool) { EXPECT_EQ(host_, headers->Host()->value().getStringView()); EXPECT_EQ("/", headers->Path()->value().getStringView()); EXPECT_EQ(Http::Headers::get().MethodValues.Get, headers->Method()->value().getStringView()); })); - if (quic_version_.transport_version == quic::QUIC_VERSION_99) { - quic_stream_.OnHeadersDecoded(headers_); - } else { - quic_stream_.OnStreamHeaderList(/*fin=*/false, headers_.uncompressed_header_bytes(), headers_); - } - EXPECT_TRUE(quic_stream_.FinishedReadingHeaders()); + quic_stream_->OnStreamHeaderList(/*fin=*/true, request_headers.uncompressed_header_bytes(), + request_headers); + EXPECT_TRUE(quic_stream_->FinishedReadingHeaders()); + quic_stream_->encodeHeaders(response_headers_, /*end_stream=*/true); +} - EXPECT_CALL(stream_decoder_, decodeData(_, _)) - .WillOnce(Invoke([this](Buffer::Instance& buffer, bool finished_reading) { - EXPECT_EQ(request_body_, buffer.toString()); - EXPECT_TRUE(finished_reading); - })); - std::string data = request_body_; - if (quic_version_.transport_version == quic::QUIC_VERSION_99) { - std::unique_ptr data_buffer; - quic::HttpEncoder encoder; - quic::QuicByteCount data_frame_header_length = - encoder.SerializeDataFrameHeader(request_body_.length(), &data_buffer); - quic::QuicStringPiece data_frame_header(data_buffer.get(), data_frame_header_length); - data = absl::StrCat(data_frame_header, request_body_); - } - quic::QuicStreamFrame frame(stream_id_, true, 0, data); - quic_stream_.OnStreamFrame(frame); +TEST_P(EnvoyQuicServerStreamTest, PostRequestAndResponse) { + sendRequest(request_body_, true, request_body_.size() * 2); + quic_stream_->encodeHeaders(response_headers_, /*end_stream=*/true); } TEST_P(EnvoyQuicServerStreamTest, DecodeHeadersBodyAndTrailers) { - EXPECT_CALL(stream_decoder_, decodeHeaders_(_, /*end_stream=*/false)) - .WillOnce(Invoke([this](const Http::HeaderMapPtr& headers, bool) { - EXPECT_EQ(host_, headers->Host()->value().getStringView()); - EXPECT_EQ("/", headers->Path()->value().getStringView()); - EXPECT_EQ(Http::Headers::get().MethodValues.Get, - headers->Method()->value().getStringView()); - })); - quic_stream_.OnStreamHeaderList(/*fin=*/false, headers_.uncompressed_header_bytes(), headers_); - EXPECT_TRUE(quic_stream_.FinishedReadingHeaders()); - - std::string data = request_body_; - if (quic_version_.transport_version == quic::QUIC_VERSION_99) { - std::unique_ptr data_buffer; - quic::HttpEncoder encoder; - quic::QuicByteCount data_frame_header_length = - encoder.SerializeDataFrameHeader(request_body_.length(), &data_buffer); - quic::QuicStringPiece data_frame_header(data_buffer.get(), data_frame_header_length); - data = absl::StrCat(data_frame_header, request_body_); - } - quic::QuicStreamFrame frame(stream_id_, false, 0, data); - EXPECT_CALL(stream_decoder_, decodeData(_, _)) - .Times(testing::AtMost(2)) - .WillOnce(Invoke([this](Buffer::Instance& buffer, bool finished_reading) { - EXPECT_EQ(request_body_, buffer.toString()); - EXPECT_FALSE(finished_reading); - })) - // Depends on QUIC version, there may be an empty STREAM_FRAME with FIN. But - // since there is trailers, finished_reading should always be false. - .WillOnce(Invoke([](Buffer::Instance& buffer, bool finished_reading) { - EXPECT_FALSE(finished_reading); - EXPECT_EQ(0, buffer.length()); - })); - quic_stream_.OnStreamFrame(frame); - + sendRequest(request_body_, false, request_body_.size() * 2); EXPECT_CALL(stream_decoder_, decodeTrailers_(_)) .WillOnce(Invoke([](const Http::HeaderMapPtr& headers) { Http::LowerCaseString key1("key1"); @@ -195,10 +190,12 @@ TEST_P(EnvoyQuicServerStreamTest, DecodeHeadersBodyAndTrailers) { EXPECT_EQ("value1", headers->get(key1)->value().getStringView()); EXPECT_EQ(nullptr, headers->get(key2)); })); - quic_stream_.OnStreamHeaderList(/*fin=*/true, trailers_.uncompressed_header_bytes(), trailers_); + quic_stream_->OnStreamHeaderList(/*fin=*/true, trailers_.uncompressed_header_bytes(), trailers_); + EXPECT_CALL(stream_callbacks_, onResetStream(_, _)); } TEST_P(EnvoyQuicServerStreamTest, OutOfOrderTrailers) { + EXPECT_CALL(stream_callbacks_, onResetStream(_, _)); if (quic::VersionUsesQpack(quic_version_.transport_version)) { return; } @@ -206,36 +203,22 @@ TEST_P(EnvoyQuicServerStreamTest, OutOfOrderTrailers) { .WillOnce(Invoke([this](const Http::HeaderMapPtr& headers, bool) { EXPECT_EQ(host_, headers->Host()->value().getStringView()); EXPECT_EQ("/", headers->Path()->value().getStringView()); - EXPECT_EQ(Http::Headers::get().MethodValues.Get, + EXPECT_EQ(Http::Headers::get().MethodValues.Post, headers->Method()->value().getStringView()); })); - quic_stream_.OnStreamHeaderList(/*fin=*/false, headers_.uncompressed_header_bytes(), headers_); - EXPECT_TRUE(quic_stream_.FinishedReadingHeaders()); + quic_stream_->OnStreamHeaderList(/*fin=*/false, request_headers_.uncompressed_header_bytes(), + request_headers_); + EXPECT_TRUE(quic_stream_->FinishedReadingHeaders()); // Trailer should be delivered to HCM later after body arrives. - quic_stream_.OnStreamHeaderList(/*fin=*/true, trailers_.uncompressed_header_bytes(), trailers_); - - std::string data = request_body_; - if (quic_version_.transport_version == quic::QUIC_VERSION_99) { - std::unique_ptr data_buffer; - quic::HttpEncoder encoder; - quic::QuicByteCount data_frame_header_length = - encoder.SerializeDataFrameHeader(request_body_.length(), &data_buffer); - quic::QuicStringPiece data_frame_header(data_buffer.get(), data_frame_header_length); - data = absl::StrCat(data_frame_header, request_body_); - } + quic_stream_->OnStreamHeaderList(/*fin=*/true, trailers_.uncompressed_header_bytes(), trailers_); + + std::string data = bodyToStreamPayload(request_body_); quic::QuicStreamFrame frame(stream_id_, false, 0, data); EXPECT_CALL(stream_decoder_, decodeData(_, _)) - .Times(testing::AtMost(2)) .WillOnce(Invoke([this](Buffer::Instance& buffer, bool finished_reading) { EXPECT_EQ(request_body_, buffer.toString()); EXPECT_FALSE(finished_reading); - })) - // Depends on QUIC version, there may be an empty STREAM_FRAME with FIN. But - // since there is trailers, finished_reading should always be false. - .WillOnce(Invoke([](Buffer::Instance& buffer, bool finished_reading) { - EXPECT_FALSE(finished_reading); - EXPECT_EQ(0, buffer.length()); })); EXPECT_CALL(stream_decoder_, decodeTrailers_(_)) @@ -245,7 +228,142 @@ TEST_P(EnvoyQuicServerStreamTest, OutOfOrderTrailers) { EXPECT_EQ("value1", headers->get(key1)->value().getStringView()); EXPECT_EQ(nullptr, headers->get(key2)); })); - quic_stream_.OnStreamFrame(frame); + quic_stream_->OnStreamFrame(frame); +} + +TEST_P(EnvoyQuicServerStreamTest, ReadDisableUponLargePost) { + std::string large_request(1024, 'a'); + // Sending such large request will cause read to be disabled. + size_t payload_offset = sendRequest(large_request, false, 512); + EXPECT_FALSE(quic_stream_->HasBytesToRead()); + // Disable reading one more time. + quic_stream_->readDisable(true); + std::string second_part_request = bodyToStreamPayload("bbb"); + // Receiving more data shouldn't push the receiving pipe line as the stream + // should have been marked blocked. + quic::QuicStreamFrame frame(stream_id_, false, payload_offset, second_part_request); + EXPECT_CALL(stream_decoder_, decodeData(_, _)).Times(0); + quic_stream_->OnStreamFrame(frame); + + // Re-enable reading just once shouldn't unblock stream. + quic_stream_->readDisable(false); + + // This data frame should also be buffered. + std::string last_part_request = bodyToStreamPayload("ccc"); + quic::QuicStreamFrame frame2(stream_id_, true, payload_offset + second_part_request.length(), + last_part_request); + quic_stream_->OnStreamFrame(frame2); + + // Unblock stream now. The remaining data in the receiving buffer should be + // pushed to upstream. + EXPECT_CALL(stream_decoder_, decodeData(_, _)) + .WillOnce(Invoke([](Buffer::Instance& buffer, bool finished_reading) { + std::string rest_request = "bbbccc"; + EXPECT_EQ(rest_request.size(), buffer.length()); + EXPECT_EQ(rest_request, buffer.toString()); + EXPECT_TRUE(finished_reading); + })); + quic_stream_->readDisable(false); + EXPECT_CALL(stream_callbacks_, onResetStream(_, _)); +} + +// Tests that ReadDisable() doesn't cause re-entry of OnBodyAvailable(). +TEST_P(EnvoyQuicServerStreamTest, ReadDisableAndReEnableImmediately) { + EXPECT_CALL(stream_decoder_, decodeHeaders_(_, /*end_stream=*/false)) + .WillOnce(Invoke([this](const Http::HeaderMapPtr& headers, bool) { + EXPECT_EQ(host_, headers->Host()->value().getStringView()); + EXPECT_EQ("/", headers->Path()->value().getStringView()); + EXPECT_EQ(Http::Headers::get().MethodValues.Post, + headers->Method()->value().getStringView()); + })); + if (quic::VersionUsesQpack(quic_version_.transport_version)) { + quic_stream_->OnHeadersDecoded(request_headers_); + } else { + quic_stream_->OnStreamHeaderList(/*fin=*/false, request_headers_.uncompressed_header_bytes(), + request_headers_); + } + EXPECT_TRUE(quic_stream_->FinishedReadingHeaders()); + + std::string payload(1024, 'a'); + EXPECT_CALL(stream_decoder_, decodeData(_, _)) + .WillOnce(Invoke([&](Buffer::Instance& buffer, bool finished_reading) { + EXPECT_EQ(payload, buffer.toString()); + EXPECT_FALSE(finished_reading); + quic_stream_->readDisable(true); + // Re-enable reading should not trigger another decodeData. + quic_stream_->readDisable(false); + })); + std::string data = bodyToStreamPayload(payload); + quic::QuicStreamFrame frame(stream_id_, false, 0, data); + quic_stream_->OnStreamFrame(frame); + + std::string last_part_request = bodyToStreamPayload("bbb"); + quic::QuicStreamFrame frame2(stream_id_, true, data.length(), last_part_request); + EXPECT_CALL(stream_decoder_, decodeData(_, _)) + .WillOnce(Invoke([&](Buffer::Instance& buffer, bool finished_reading) { + EXPECT_EQ("bbb", buffer.toString()); + EXPECT_TRUE(finished_reading); + })); + + quic_stream_->OnStreamFrame(frame2); + EXPECT_CALL(stream_callbacks_, onResetStream(_, _)); +} + +// Tests that the stream with a send buffer whose high limit is 16k and low +// limit is 8k sends over 32kB response. +TEST_P(EnvoyQuicServerStreamTest, WatermarkSendBuffer) { + sendRequest(request_body_, true, request_body_.size() * 2); + + // 32KB + 2 byte. The initial stream flow control window is 16k. + response_headers_.addCopy(":content-length", "32770"); + quic_stream_->encodeHeaders(response_headers_, /*end_stream=*/false); + // encode 32kB response body. first 16KB should be written out right away. The + // rest should be buffered. The high watermark is 16KB, so this call should + // make the send buffer reach its high watermark. + std::string response(32 * 1024 + 1, 'a'); + Buffer::OwnedImpl buffer(response); + EXPECT_CALL(stream_callbacks_, onAboveWriteBufferHighWatermark()); + quic_stream_->encodeData(buffer, false); + + EXPECT_EQ(0u, buffer.length()); + EXPECT_TRUE(quic_stream_->flow_controller()->IsBlocked()); + // Bump connection flow control window large enough not to cause connection + // level flow control blocked. + quic::QuicWindowUpdateFrame window_update( + quic::kInvalidControlFrameId, + quic::QuicUtils::GetInvalidStreamId(quic_version_.transport_version), 1024 * 1024); + quic_session_.OnWindowUpdateFrame(window_update); + + // Receive a WINDOW_UPDATE frame not large enough to drain half of the send + // buffer. + quic::QuicWindowUpdateFrame window_update1(quic::kInvalidControlFrameId, quic_stream_->id(), + 16 * 1024 + 8 * 1024); + quic_stream_->OnWindowUpdateFrame(window_update1); + EXPECT_FALSE(quic_stream_->flow_controller()->IsBlocked()); + quic_session_.OnCanWrite(); + EXPECT_TRUE(quic_stream_->flow_controller()->IsBlocked()); + + // Receive another WINDOW_UPDATE frame to drain the send buffer till below low + // watermark. + quic::QuicWindowUpdateFrame window_update2(quic::kInvalidControlFrameId, quic_stream_->id(), + 16 * 1024 + 8 * 1024 + 1024); + quic_stream_->OnWindowUpdateFrame(window_update2); + EXPECT_FALSE(quic_stream_->flow_controller()->IsBlocked()); + EXPECT_CALL(stream_callbacks_, onBelowWriteBufferLowWatermark()).WillOnce(Invoke([this]() { + std::string rest_response(1, 'a'); + Buffer::OwnedImpl buffer(rest_response); + quic_stream_->encodeData(buffer, true); + })); + quic_session_.OnCanWrite(); + EXPECT_TRUE(quic_stream_->flow_controller()->IsBlocked()); + + quic::QuicWindowUpdateFrame window_update3(quic::kInvalidControlFrameId, quic_stream_->id(), + 32 * 1024 + 1024); + quic_stream_->OnWindowUpdateFrame(window_update3); + quic_session_.OnCanWrite(); + + EXPECT_TRUE(quic_stream_->local_end_stream_); + EXPECT_TRUE(quic_stream_->write_side_closed()); } } // namespace Quic diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_simulated_watermark_buffer_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_simulated_watermark_buffer_test.cc new file mode 100644 index 0000000000000..2d78dd7bf65f4 --- /dev/null +++ b/test/extensions/quic_listeners/quiche/envoy_quic_simulated_watermark_buffer_test.cc @@ -0,0 +1,81 @@ +#include "extensions/quic_listeners/quiche/envoy_quic_simulated_watermark_buffer.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Quic { + +class EnvoyQuicSimulatedWatermarkBufferTest : public ::testing::Test, + protected Logger::Loggable { +public: + EnvoyQuicSimulatedWatermarkBufferTest() + : simulated_watermark_buffer_( + low_watermark_, high_watermark_, [this]() { onBelowLowWatermark(); }, + [this]() { onAboveHighWatermark(); }, ENVOY_LOGGER()) {} + + void onAboveHighWatermark() { ++above_high_watermark_; } + + void onBelowLowWatermark() { ++below_low_watermark_; } + +protected: + size_t above_high_watermark_{0}; + size_t below_low_watermark_{0}; + uint32_t high_watermark_{100}; + uint32_t low_watermark_{60}; + EnvoyQuicSimulatedWatermarkBuffer simulated_watermark_buffer_; +}; + +TEST_F(EnvoyQuicSimulatedWatermarkBufferTest, InitialState) { + EXPECT_TRUE(simulated_watermark_buffer_.isBelowLowWatermark()); + EXPECT_FALSE(simulated_watermark_buffer_.isAboveHighWatermark()); + EXPECT_EQ(high_watermark_, simulated_watermark_buffer_.highWatermark()); +} + +TEST_F(EnvoyQuicSimulatedWatermarkBufferTest, GoAboveHighWatermarkAndComeDown) { + simulated_watermark_buffer_.checkHighWatermark(low_watermark_ + 1); + EXPECT_EQ(0U, above_high_watermark_); + // Even though the buffered data is above low watermark, the buffer is still regarded + // as below watermark because it didn't reach high watermark. + EXPECT_TRUE(simulated_watermark_buffer_.isBelowLowWatermark()); + simulated_watermark_buffer_.checkLowWatermark(low_watermark_ - 1); + // Going down below low watermark shouldn't trigger callback as it never + // reached high watermark. + EXPECT_EQ(0U, below_low_watermark_); + + simulated_watermark_buffer_.checkHighWatermark(high_watermark_ + 1); + EXPECT_EQ(1U, above_high_watermark_); + EXPECT_TRUE(simulated_watermark_buffer_.isAboveHighWatermark()); + EXPECT_FALSE(simulated_watermark_buffer_.isBelowLowWatermark()); + + simulated_watermark_buffer_.checkHighWatermark(high_watermark_ + 10); + EXPECT_EQ(1U, above_high_watermark_); + + simulated_watermark_buffer_.checkLowWatermark(low_watermark_); + EXPECT_EQ(0U, below_low_watermark_); + + simulated_watermark_buffer_.checkHighWatermark(high_watermark_ + 10); + // Crossing high watermark continuously shouldn't trigger callback. + EXPECT_EQ(1U, above_high_watermark_); + + // Crossing low watermark after coming down from high watermark should trigger + // callback and change status. + simulated_watermark_buffer_.checkLowWatermark(low_watermark_ - 1); + EXPECT_EQ(1U, below_low_watermark_); + EXPECT_TRUE(simulated_watermark_buffer_.isBelowLowWatermark()); + EXPECT_FALSE(simulated_watermark_buffer_.isAboveHighWatermark()); +} + +TEST_F(EnvoyQuicSimulatedWatermarkBufferTest, NoWatermarkSpecified) { + EnvoyQuicSimulatedWatermarkBuffer buffer( + 0, 0, [this]() { onBelowLowWatermark(); }, [this]() { onAboveHighWatermark(); }, + ENVOY_LOGGER()); + buffer.checkHighWatermark(10); + EXPECT_EQ(0U, above_high_watermark_); + + simulated_watermark_buffer_.checkLowWatermark(0); + EXPECT_EQ(0U, below_low_watermark_); + EXPECT_TRUE(simulated_watermark_buffer_.isBelowLowWatermark()); +} + +} // namespace Quic +} // namespace Envoy diff --git a/test/extensions/quic_listeners/quiche/test_utils.h b/test/extensions/quic_listeners/quiche/test_utils.h new file mode 100644 index 0000000000000..6efb3678a5dcf --- /dev/null +++ b/test/extensions/quic_listeners/quiche/test_utils.h @@ -0,0 +1,57 @@ +#include "extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h" + +#pragma GCC diagnostic push +// QUICHE allows unused parameters. +#pragma GCC diagnostic ignored "-Wunused-parameter" +// QUICHE uses offsetof(). +#pragma GCC diagnostic ignored "-Winvalid-offsetof" + +#include "quiche/quic/core/http/quic_spdy_session.h" +#include "quiche/quic/test_tools/quic_test_utils.h" +#include "quiche/quic/core/quic_utils.h" + +#pragma GCC diagnostic pop + +namespace Envoy { +namespace Quic { + +class MockEnvoyQuicSession : public quic::QuicSpdySession, public QuicFilterManagerConnectionImpl { +public: + MockEnvoyQuicSession(const quic::QuicConfig& config, + const quic::ParsedQuicVersionVector& supported_versions, + EnvoyQuicConnection* connection, Event::Dispatcher& dispatcher, + uint32_t send_buffer_limit) + : quic::QuicSpdySession(connection, /*visitor=*/nullptr, config, supported_versions), + QuicFilterManagerConnectionImpl(connection, dispatcher, send_buffer_limit) { + crypto_stream_ = std::make_unique(this); + } + + // From QuicSession. + MOCK_METHOD1(CreateIncomingStream, quic::QuicSpdyStream*(quic::QuicStreamId id)); + MOCK_METHOD1(CreateIncomingStream, quic::QuicSpdyStream*(quic::PendingStream* pending)); + MOCK_METHOD0(CreateOutgoingBidirectionalStream, quic::QuicSpdyStream*()); + MOCK_METHOD0(CreateOutgoingUnidirectionalStream, quic::QuicSpdyStream*()); + MOCK_METHOD1(ShouldCreateIncomingStream, bool(quic::QuicStreamId id)); + MOCK_METHOD0(ShouldCreateOutgoingBidirectionalStream, bool()); + MOCK_METHOD0(ShouldCreateOutgoingUnidirectionalStream, bool()); + MOCK_METHOD5(WritevData, + quic::QuicConsumedData(quic::QuicStream* stream, quic::QuicStreamId id, + size_t write_length, quic::QuicStreamOffset offset, + quic::StreamSendingState state)); + + absl::string_view requestedServerName() const override { + return {GetCryptoStream()->crypto_negotiated_params().sni}; + } + + quic::QuicCryptoStream* GetMutableCryptoStream() override { return crypto_stream_.get(); } + + const quic::QuicCryptoStream* GetCryptoStream() const override { return crypto_stream_.get(); } + + using quic::QuicSpdySession::ActivateStream; + +private: + std::unique_ptr crypto_stream_; +}; + +} // namespace Quic +} // namespace Envoy