diff --git a/bazel/external/quiche.BUILD b/bazel/external/quiche.BUILD index 9f5c7694fe8d9..92ad8293594de 100644 --- a/bazel/external/quiche.BUILD +++ b/bazel/external/quiche.BUILD @@ -2006,6 +2006,7 @@ envoy_cc_library( copts = quiche_copts, repository = "@envoy", tags = ["nofips"], + visibility = ["//visibility:public"], deps = [ ":quic_core_alarm_interface_lib", ":quic_core_crypto_encryption_lib", diff --git a/source/common/http/BUILD b/source/common/http/BUILD index d317963b33c1c..effd5cfa0d83a 100644 --- a/source/common/http/BUILD +++ b/source/common/http/BUILD @@ -48,8 +48,11 @@ envoy_cc_library( "//source/common/common:enum_to_int", "//source/common/common:linked_object", "//source/common/common:minimal_logger_lib", + "//source/common/config:utility_lib", "//source/common/http/http1:codec_lib", "//source/common/http/http2:codec_lib", + "//source/common/http/http3:quic_codec_factory_lib", + "//source/common/http/http3:well_known_names", "//source/common/network:filter_lib", ], ) @@ -180,8 +183,11 @@ envoy_cc_library( "//source/common/common:regex_lib", "//source/common/common:scope_tracker", "//source/common/common:utility_lib", + "//source/common/config:utility_lib", "//source/common/http/http1:codec_lib", "//source/common/http/http2:codec_lib", + "//source/common/http/http3:quic_codec_factory_lib", + "//source/common/http/http3:well_known_names", "//source/common/network:utility_lib", "//source/common/router:config_lib", "//source/common/runtime:uuid_util_lib", diff --git a/source/common/http/codec_client.cc b/source/common/http/codec_client.cc index c6c3c42f3db32..673212d3e69b7 100644 --- a/source/common/http/codec_client.cc +++ b/source/common/http/codec_client.cc @@ -6,9 +6,12 @@ #include "envoy/http/codec.h" #include "common/common/enum_to_int.h" +#include "common/config/utility.h" #include "common/http/exception.h" #include "common/http/http1/codec_impl.h" #include "common/http/http2/codec_impl.h" +#include "common/http/http3/quic_codec_factory.h" +#include "common/http/http3/well_known_names.h" #include "common/http/utility.h" namespace Envoy { @@ -158,8 +161,10 @@ CodecClientProd::CodecClientProd(Type type, Network::ClientConnectionPtr&& conne break; } case Type::HTTP3: { - // TODO(danzh) Add QUIC codec; - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + codec_ = std::unique_ptr( + Config::Utility::getAndCheckFactory( + Http::QuicCodecNames::get().Quiche) + .createQuicClientConnection(*connection_, *this)); } } } diff --git a/source/common/http/http3/BUILD b/source/common/http/http3/BUILD new file mode 100644 index 0000000000000..086a4f4b902d2 --- /dev/null +++ b/source/common/http/http3/BUILD @@ -0,0 +1,21 @@ +licenses(["notice"]) # Apache 2 + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_package", +) + +envoy_package() + +envoy_cc_library( + name = "quic_codec_factory_lib", + hdrs = ["quic_codec_factory.h"], + deps = ["//include/envoy/http:codec_interface"], +) + +envoy_cc_library( + name = "well_known_names", + hdrs = ["well_known_names.h"], + deps = ["//source/common/singleton:const_singleton"], +) diff --git a/source/common/http/http3/quic_codec_factory.h b/source/common/http/http3/quic_codec_factory.h new file mode 100644 index 0000000000000..21ca7cab87047 --- /dev/null +++ b/source/common/http/http3/quic_codec_factory.h @@ -0,0 +1,38 @@ +#pragma once + +#include + +#include "envoy/http/codec.h" +#include "envoy/network/connection.h" + +namespace Envoy { +namespace Http { + +// A factory to create Http::ServerConnection instance for QUIC. +class QuicHttpServerConnectionFactory { +public: + virtual ~QuicHttpServerConnectionFactory() {} + + virtual std::string name() const PURE; + + virtual std::unique_ptr + createQuicServerConnection(Network::Connection& connection, ConnectionCallbacks& callbacks) PURE; + + static std::string category() { return "quic_client_codec"; } +}; + +// A factory to create Http::ClientConnection instance for QUIC. +class QuicHttpClientConnectionFactory { +public: + virtual ~QuicHttpClientConnectionFactory() {} + + virtual std::string name() const PURE; + + virtual std::unique_ptr + createQuicClientConnection(Network::Connection& connection, ConnectionCallbacks& callbacks) PURE; + + static std::string category() { return "quic_server_codec"; } +}; + +} // namespace Http +} // namespace Envoy diff --git a/source/common/http/http3/well_known_names.h b/source/common/http/http3/well_known_names.h new file mode 100644 index 0000000000000..aace82a76d55f --- /dev/null +++ b/source/common/http/http3/well_known_names.h @@ -0,0 +1,19 @@ +#pragma once + +#include + +#include "common/singleton/const_singleton.h" + +namespace Envoy { +namespace Http { + +class QuicCodecNameValues { +public: + // QUICHE is the only QUIC implementation for now. + const std::string Quiche = "quiche"; +}; + +using QuicCodecNames = ConstSingleton; + +} // namespace Http +} // namespace Envoy diff --git a/source/extensions/filters/network/http_connection_manager/config.cc b/source/extensions/filters/network/http_connection_manager/config.cc index e491d36be307d..f7eed2e932aaf 100644 --- a/source/extensions/filters/network/http_connection_manager/config.cc +++ b/source/extensions/filters/network/http_connection_manager/config.cc @@ -18,6 +18,8 @@ #include "common/http/default_server_string.h" #include "common/http/http1/codec_impl.h" #include "common/http/http2/codec_impl.h" +#include "common/http/http3/quic_codec_factory.h" +#include "common/http/http3/well_known_names.h" #include "common/http/utility.h" #include "common/protobuf/utility.h" #include "common/router/rds_impl.h" @@ -415,8 +417,14 @@ HttpConnectionManagerConfig::createCodec(Network::Connection& connection, connection, callbacks, context_.scope(), http2_settings_, maxRequestHeadersKb(), maxRequestHeadersCount()); case CodecType::HTTP3: - // TODO(danzh) create QUIC specific codec. - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + // Hard code Quiche factory name here to instantiate a QUIC codec implemented. + // TODO(danzh) Add support to get the factory name from config, possibly + // from HttpConnectionManager protobuf. This is not essential till there are multiple + // implementations of QUIC. + return std::unique_ptr( + Config::Utility::getAndCheckFactory( + Http::QuicCodecNames::get().Quiche) + .createQuicServerConnection(connection, callbacks)); case CodecType::AUTO: return Http::ConnectionManagerUtility::autoCreateCodec( connection, data, callbacks, context_.scope(), http1_settings_, http2_settings_, diff --git a/source/extensions/quic_listeners/quiche/BUILD b/source/extensions/quic_listeners/quiche/BUILD index 27957ad472b9d..ae124d4c3cf76 100644 --- a/source/extensions/quic_listeners/quiche/BUILD +++ b/source/extensions/quic_listeners/quiche/BUILD @@ -111,8 +111,12 @@ envoy_cc_library( hdrs = ["codec_impl.h"], tags = ["nofips"], deps = [ + ":envoy_quic_client_session_lib", ":envoy_quic_server_session_lib", "//include/envoy/http:codec_interface", + "//include/envoy/registry", + "//source/common/http/http3:quic_codec_factory_lib", + "//source/common/http/http3:well_known_names", "@com_googlesource_quiche//:quic_core_http_spdy_session_lib", ], ) @@ -159,6 +163,30 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "envoy_quic_client_session_lib", + srcs = [ + "envoy_quic_client_session.cc", + "envoy_quic_client_stream.cc", + ], + hdrs = [ + "envoy_quic_client_session.h", + "envoy_quic_client_stream.h", + ], + tags = ["nofips"], + deps = [ + ":envoy_quic_client_connection_lib", + ":envoy_quic_stream_lib", + ":envoy_quic_utils_lib", + ":quic_filter_manager_connection_lib", + "//source/common/buffer:buffer_lib", + "//source/common/common:assert_lib", + "//source/common/http:header_map_lib", + "//source/extensions/quic_listeners/quiche/platform:quic_platform_mem_slice_storage_impl_lib", + "@com_googlesource_quiche//:quic_core_http_client_lib", + ], +) + envoy_cc_library( name = "quic_io_handle_wrapper_lib", hdrs = ["quic_io_handle_wrapper.h"], @@ -194,6 +222,19 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "envoy_quic_client_connection_lib", + srcs = ["envoy_quic_client_connection.cc"], + hdrs = ["envoy_quic_client_connection.h"], + tags = ["nofips"], + deps = [ + ":envoy_quic_connection_lib", + ":envoy_quic_packet_writer_lib", + "//include/envoy/event:dispatcher_interface", + "//source/common/network:socket_option_factory_lib", + ], +) + envoy_cc_library( name = "envoy_quic_dispatcher_lib", srcs = ["envoy_quic_dispatcher.cc"], @@ -258,6 +299,8 @@ envoy_cc_library( "//include/envoy/http:codec_interface", "//source/common/http:header_map_lib", "//source/common/network:address_lib", + "//source/common/network:listen_socket_lib", + "//source/common/network:socket_option_factory_lib", "@com_googlesource_quiche//:quic_core_http_header_list_lib", ], ) diff --git a/source/extensions/quic_listeners/quiche/active_quic_listener.cc b/source/extensions/quic_listeners/quiche/active_quic_listener.cc index 8a3c5a3803d1e..68c519841a768 100644 --- a/source/extensions/quic_listeners/quiche/active_quic_listener.cc +++ b/source/extensions/quic_listeners/quiche/active_quic_listener.cc @@ -34,6 +34,8 @@ ActiveQuicListener::ActiveQuicListener(Event::Dispatcher& dispatcher, quic::QuicRandom::GetInstance(), std::make_unique(), quic::KeyExchangeSource::Default()); auto connection_helper = std::make_unique(dispatcher_); + crypto_config_->AddDefaultConfig(random, connection_helper->GetClock(), + quic::QuicCryptoServerConfig::ConfigOptions()); auto alarm_factory = std::make_unique(dispatcher_, *connection_helper->GetClock()); quic_dispatcher_ = std::make_unique( @@ -43,6 +45,8 @@ ActiveQuicListener::ActiveQuicListener(Event::Dispatcher& dispatcher, quic_dispatcher_->InitializeWithWriter(new EnvoyQuicPacketWriter(listen_socket_)); } +ActiveQuicListener::~ActiveQuicListener() { onListenerShutdown(); } + void ActiveQuicListener::onListenerShutdown() { ENVOY_LOG(info, "Quic listener {} shutdown.", config_.name()); quic_dispatcher_->Shutdown(); @@ -55,7 +59,7 @@ void ActiveQuicListener::onData(Network::UdpRecvData& data) { envoyAddressInstanceToQuicSocketAddress(data.addresses_.local_)); quic::QuicTime timestamp = quic::QuicTime::Zero() + - quic::QuicTime::Delta::FromMilliseconds(std::chrono::duration_cast( + quic::QuicTime::Delta::FromMicroseconds(std::chrono::duration_cast( data.receive_time_.time_since_epoch()) .count()); uint64_t num_slice = data.buffer_->getRawSlices(nullptr, 0); @@ -64,7 +68,7 @@ void ActiveQuicListener::onData(Network::UdpRecvData& data) { data.buffer_->getRawSlices(&slice, 1); // TODO(danzh): pass in TTL and UDP header. quic::QuicReceivedPacket packet(reinterpret_cast(slice.mem_), slice.len_, timestamp, - /*owns_buffer=*/false, /*ttl=*/0, /*ttl_valid=*/true, + /*owns_buffer=*/false, /*ttl=*/0, /*ttl_valid=*/false, /*packet_headers=*/nullptr, /*headers_length=*/0, /*owns_header_buffer*/ false); quic_dispatcher_->ProcessPacket(self_address, peer_address, packet); diff --git a/source/extensions/quic_listeners/quiche/active_quic_listener.h b/source/extensions/quic_listeners/quiche/active_quic_listener.h index c4e52ae7d319e..9c5c390712fb2 100644 --- a/source/extensions/quic_listeners/quiche/active_quic_listener.h +++ b/source/extensions/quic_listeners/quiche/active_quic_listener.h @@ -26,6 +26,8 @@ class ActiveQuicListener : public Network::UdpListenerCallbacks, Network::SocketSharedPtr listen_socket, Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config); + ~ActiveQuicListener() override; + // TODO(#7465): Make this a callback. void onListenerShutdown(); diff --git a/source/extensions/quic_listeners/quiche/codec_impl.cc b/source/extensions/quic_listeners/quiche/codec_impl.cc index cd082427b0ffe..4af18bd949b3f 100644 --- a/source/extensions/quic_listeners/quiche/codec_impl.cc +++ b/source/extensions/quic_listeners/quiche/codec_impl.cc @@ -1,12 +1,39 @@ #include "extensions/quic_listeners/quiche/codec_impl.h" +#include "extensions/quic_listeners/quiche/envoy_quic_client_stream.h" #include "extensions/quic_listeners/quiche/envoy_quic_server_stream.h" namespace Envoy { namespace Quic { +// Converts a QuicStream instance to EnvoyQuicStream instance. The current stream implementation +// inherits from these two interfaces, with the former one providing Quic interface and the latter +// providing Envoy interface. +EnvoyQuicStream* quicStreamToEnvoyStream(quic::QuicStream* stream) { + return dynamic_cast(stream); +} + bool QuicHttpConnectionImplBase::wantsToWrite() { return quic_session_.HasDataToWrite(); } +void QuicHttpConnectionImplBase::runWatermarkCallbacksForEachStream( + quic::QuicSmallMap, 10>& stream_map, + bool high_watermark) { + for (auto& it : stream_map) { + if (!it.second->is_static()) { + // Only call watermark callbacks on non QUIC static streams which are + // crypto stream and Google QUIC headers stream. + auto stream = quicStreamToEnvoyStream(it.second.get()); + if (high_watermark) { + ENVOY_LOG(debug, "runHighWatermarkCallbacks on stream {}", it.first); + stream->runHighWatermarkCallbacks(); + } else { + ENVOY_LOG(debug, "runLowWatermarkCallbacks on stream {}", it.first); + stream->runLowWatermarkCallbacks(); + } + } + } +} + QuicHttpServerConnectionImpl::QuicHttpServerConnectionImpl( EnvoyQuicServerSession& quic_session, Http::ServerConnectionCallbacks& callbacks) : QuicHttpConnectionImplBase(quic_session), quic_server_session_(quic_session) { @@ -14,28 +41,63 @@ QuicHttpServerConnectionImpl::QuicHttpServerConnectionImpl( } void QuicHttpServerConnectionImpl::onUnderlyingConnectionAboveWriteBufferHighWatermark() { - for (auto& it : quic_server_session_.stream_map()) { - if (!it.second->is_static()) { - // Only call watermark callbacks on non QUIC static streams which are - // crypto stream and Google QUIC headers stream. - ENVOY_LOG(debug, "runHighWatermarkCallbacks on stream {}", it.first); - dynamic_cast(it.second.get())->runHighWatermarkCallbacks(); - } - } + runWatermarkCallbacksForEachStream(quic_server_session_.stream_map(), true); } void QuicHttpServerConnectionImpl::onUnderlyingConnectionBelowWriteBufferLowWatermark() { - for (const auto& it : quic_server_session_.stream_map()) { - if (!it.second->is_static()) { - ENVOY_LOG(debug, "runLowWatermarkCallbacks on stream {}", it.first); - dynamic_cast(it.second.get())->runLowWatermarkCallbacks(); - } - } + runWatermarkCallbacksForEachStream(quic_server_session_.stream_map(), false); } void QuicHttpServerConnectionImpl::goAway() { quic_server_session_.SendGoAway(quic::QUIC_PEER_GOING_AWAY, "server shutdown imminent"); } +QuicHttpClientConnectionImpl::QuicHttpClientConnectionImpl(EnvoyQuicClientSession& session, + Http::ConnectionCallbacks& callbacks) + : QuicHttpConnectionImplBase(session), quic_client_session_(session) { + session.setHttpConnectionCallbacks(callbacks); +} + +Http::StreamEncoder& +QuicHttpClientConnectionImpl::newStream(Http::StreamDecoder& response_decoder) { + EnvoyQuicStream* stream = + quicStreamToEnvoyStream(quic_client_session_.CreateOutgoingBidirectionalStream()); + // TODO(danzh) handle stream creation failure gracefully. This can happen when + // there are already 100 open streams. In such case, caller should hold back + // the stream creation till an existing stream is closed. + ASSERT(stream != nullptr, "Fail to create QUIC stream."); + stream->setDecoder(response_decoder); + if (quic_client_session_.aboveHighWatermark()) { + stream->runHighWatermarkCallbacks(); + } + return *stream; +} + +void QuicHttpClientConnectionImpl::onUnderlyingConnectionAboveWriteBufferHighWatermark() { + runWatermarkCallbacksForEachStream(quic_client_session_.stream_map(), true); +} + +void QuicHttpClientConnectionImpl::onUnderlyingConnectionBelowWriteBufferLowWatermark() { + runWatermarkCallbacksForEachStream(quic_client_session_.stream_map(), false); +} + +std::unique_ptr +QuicHttpClientConnectionFactoryImpl::createQuicClientConnection( + Network::Connection& connection, Http::ConnectionCallbacks& callbacks) { + return std::make_unique( + dynamic_cast(connection), callbacks); +} + +std::unique_ptr +QuicHttpServerConnectionFactoryImpl::createQuicServerConnection( + Network::Connection& connection, Http::ConnectionCallbacks& callbacks) { + return std::make_unique( + dynamic_cast(connection), + dynamic_cast(callbacks)); +} + +REGISTER_FACTORY(QuicHttpClientConnectionFactoryImpl, Http::QuicHttpClientConnectionFactory); +REGISTER_FACTORY(QuicHttpServerConnectionFactoryImpl, Http::QuicHttpServerConnectionFactory); + } // namespace Quic } // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/codec_impl.h b/source/extensions/quic_listeners/quiche/codec_impl.h index 07b2f2042ac14..9394ec11e1985 100644 --- a/source/extensions/quic_listeners/quiche/codec_impl.h +++ b/source/extensions/quic_listeners/quiche/codec_impl.h @@ -1,8 +1,12 @@ #include "envoy/http/codec.h" +#include "envoy/registry/registry.h" #include "common/common/assert.h" #include "common/common/logger.h" +#include "common/http/http3/quic_codec_factory.h" +#include "common/http/http3/well_known_names.h" +#include "extensions/quic_listeners/quiche/envoy_quic_client_session.h" #include "extensions/quic_listeners/quiche/envoy_quic_server_session.h" namespace Envoy { @@ -21,17 +25,15 @@ class QuicHttpConnectionImplBase : public virtual Http::Connection, // Bypassed. QUIC connection already hands all data to streams. NOT_REACHED_GCOVR_EXCL_LINE; } - Http::Protocol protocol() override { - // From HCM's view, QUIC should behave the same as Http2, only the stats - // should be different. - // TODO(danzh) add Http3 enum value for QUIC. - return Http::Protocol::Http2; - } - + Http::Protocol protocol() override { return Http::Protocol::Http3; } // Returns true if the session has data to send but queued in connection or // stream send buffer. bool wantsToWrite() override; + void runWatermarkCallbacksForEachStream( + quic::QuicSmallMap, 10>& stream_map, + bool high_watermark); + protected: quic::QuicSpdySession& quic_session_; }; @@ -55,5 +57,47 @@ class QuicHttpServerConnectionImpl : public QuicHttpConnectionImplBase, EnvoyQuicServerSession& quic_server_session_; }; +class QuicHttpClientConnectionImpl : public QuicHttpConnectionImplBase, + public Http::ClientConnection { +public: + QuicHttpClientConnectionImpl(EnvoyQuicClientSession& session, + Http::ConnectionCallbacks& callbacks); + + // Http::ClientConnection + Http::StreamEncoder& newStream(Http::StreamDecoder& response_decoder) override; + + // Http::Connection + void goAway() override { NOT_REACHED_GCOVR_EXCL_LINE; } + void shutdownNotice() override { NOT_REACHED_GCOVR_EXCL_LINE; } + void onUnderlyingConnectionAboveWriteBufferHighWatermark() override; + void onUnderlyingConnectionBelowWriteBufferLowWatermark() override; + +private: + EnvoyQuicClientSession& quic_client_session_; +}; + +// A factory to create QuicHttpClientConnection. +class QuicHttpClientConnectionFactoryImpl : public Http::QuicHttpClientConnectionFactory { +public: + std::unique_ptr + createQuicClientConnection(Network::Connection& connection, + Http::ConnectionCallbacks& callbacks) override; + + std::string name() const override { return Http::QuicCodecNames::get().Quiche; } +}; + +// A factory to create QuicHttpServerConnection. +class QuicHttpServerConnectionFactoryImpl : public Http::QuicHttpServerConnectionFactory { +public: + std::unique_ptr + createQuicServerConnection(Network::Connection& connection, + Http::ConnectionCallbacks& callbacks) override; + + std::string name() const override { return Http::QuicCodecNames::get().Quiche; } +}; + +DECLARE_FACTORY(QuicHttpClientConnectionFactoryImpl); +DECLARE_FACTORY(QuicHttpServerConnectionFactoryImpl); + } // namespace Quic } // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_alarm.h b/source/extensions/quic_listeners/quiche/envoy_quic_alarm.h index 4152f4c101c3f..115e68d1f882e 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_alarm.h +++ b/source/extensions/quic_listeners/quiche/envoy_quic_alarm.h @@ -20,7 +20,8 @@ class EnvoyQuicAlarm : public quic::QuicAlarm { EnvoyQuicAlarm(Event::Dispatcher& dispatcher, const quic::QuicClock& clock, quic::QuicArenaScopedPtr delegate); - ~EnvoyQuicAlarm() override { ASSERT(!IsSet()); }; + // TimerImpl destruction deletes in-flight alarm firing event. + ~EnvoyQuicAlarm() override {} // quic::QuicAlarm void CancelImpl() override; diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_client_connection.cc b/source/extensions/quic_listeners/quiche/envoy_quic_client_connection.cc new file mode 100644 index 0000000000000..a3bd4a92d394c --- /dev/null +++ b/source/extensions/quic_listeners/quiche/envoy_quic_client_connection.cc @@ -0,0 +1,116 @@ +#include "extensions/quic_listeners/quiche/envoy_quic_client_connection.h" + +#include "common/network/listen_socket_impl.h" +#include "common/network/socket_option_factory.h" + +#include "extensions/quic_listeners/quiche/envoy_quic_packet_writer.h" +#include "extensions/quic_listeners/quiche/envoy_quic_utils.h" +#include "extensions/transport_sockets/well_known_names.h" + +namespace Envoy { +namespace Quic { + +EnvoyQuicClientConnection::EnvoyQuicClientConnection( + const quic::QuicConnectionId& server_connection_id, + Network::Address::InstanceConstSharedPtr& initial_peer_address, + quic::QuicConnectionHelperInterface& helper, quic::QuicAlarmFactory& alarm_factory, + const quic::ParsedQuicVersionVector& supported_versions, + Network::Address::InstanceConstSharedPtr local_addr, Event::Dispatcher& dispatcher, + const Network::ConnectionSocket::OptionsSharedPtr& options) + : EnvoyQuicClientConnection(server_connection_id, helper, alarm_factory, supported_versions, + dispatcher, + createConnectionSocket(initial_peer_address, local_addr, options)) { +} + +EnvoyQuicClientConnection::EnvoyQuicClientConnection( + const quic::QuicConnectionId& server_connection_id, quic::QuicConnectionHelperInterface& helper, + quic::QuicAlarmFactory& alarm_factory, const quic::ParsedQuicVersionVector& supported_versions, + Event::Dispatcher& dispatcher, Network::ConnectionSocketPtr&& connection_socket) + : EnvoyQuicClientConnection(server_connection_id, helper, alarm_factory, + new EnvoyQuicPacketWriter(*connection_socket), true, + supported_versions, dispatcher, std::move(connection_socket)) {} + +EnvoyQuicClientConnection::EnvoyQuicClientConnection( + const quic::QuicConnectionId& server_connection_id, quic::QuicConnectionHelperInterface& helper, + quic::QuicAlarmFactory& alarm_factory, quic::QuicPacketWriter* writer, bool owns_writer, + const quic::ParsedQuicVersionVector& supported_versions, Event::Dispatcher& dispatcher, + Network::ConnectionSocketPtr&& connection_socket) + : EnvoyQuicConnection( + server_connection_id, + envoyAddressInstanceToQuicSocketAddress(connection_socket->remoteAddress()), helper, + alarm_factory, writer, owns_writer, quic::Perspective::IS_CLIENT, supported_versions, + std::move(connection_socket)), + dispatcher_(dispatcher) {} + +void EnvoyQuicClientConnection::processPacket( + Network::Address::InstanceConstSharedPtr local_address, + Network::Address::InstanceConstSharedPtr peer_address, Buffer::InstancePtr buffer, + MonotonicTime receive_time) { + if (!connected()) { + return; + } + quic::QuicTime timestamp = + quic::QuicTime::Zero() + + quic::QuicTime::Delta::FromMicroseconds( + std::chrono::duration_cast(receive_time.time_since_epoch()) + .count()); + uint64_t num_slice = buffer->getRawSlices(nullptr, 0); + ASSERT(num_slice == 1); + Buffer::RawSlice slice; + buffer->getRawSlices(&slice, 1); + quic::QuicReceivedPacket packet(reinterpret_cast(slice.mem_), slice.len_, timestamp, + /*owns_buffer=*/false, /*ttl=*/0, /*ttl_valid=*/false, + /*packet_headers=*/nullptr, /*headers_length=*/0, + /*owns_header_buffer*/ false); + ProcessUdpPacket(envoyAddressInstanceToQuicSocketAddress(local_address), + envoyAddressInstanceToQuicSocketAddress(peer_address), packet); +} + +uint64_t EnvoyQuicClientConnection::maxPacketSize() const { + // TODO(danzh) make this variable configurable to support jumbo frames. + return Network::MAX_UDP_PACKET_SIZE; +} + +void EnvoyQuicClientConnection::setUpConnectionSocket() { + if (connectionSocket()->ioHandle().isOpen()) { + file_event_ = dispatcher_.createFileEvent( + connectionSocket()->ioHandle().fd(), + [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge, + Event::FileReadyType::Read | Event::FileReadyType::Write); + + if (!Network::Socket::applyOptions(connectionSocket()->options(), *connectionSocket(), + envoy::api::v2::core::SocketOption::STATE_LISTENING)) { + ENVOY_CONN_LOG(error, "Fail to apply listening options", *this); + connectionSocket()->close(); + } + } + if (!connectionSocket()->ioHandle().isOpen()) { + CloseConnection(quic::QUIC_CONNECTION_CANCELLED, "Fail to set up connection socket.", + quic::ConnectionCloseBehavior::SILENT_CLOSE); + } +} + +void EnvoyQuicClientConnection::onFileEvent(uint32_t events) { + ENVOY_CONN_LOG(trace, "socket event: {}", *this, events); + ASSERT(events & (Event::FileReadyType::Read | Event::FileReadyType::Write)); + + if (events & Event::FileReadyType::Write) { + OnCanWrite(); + } + + // It's possible for a write event callback to close the connection, in such case ignore read + // event processing. + if (connected() && (events & Event::FileReadyType::Read)) { + Api::IoErrorPtr err = Network::Utility::readPacketsFromSocket( + connectionSocket()->ioHandle(), *connectionSocket()->localAddress(), *this, + dispatcher_.timeSource(), packets_dropped_); + // TODO(danzh): Handle no error when we limit the number of packets read. + if (err->getErrorCode() != Api::IoError::IoErrorCode::Again) { + ENVOY_CONN_LOG(error, "recvmsg result {}: {}", *this, static_cast(err->getErrorCode()), + err->getErrorDetails()); + } + } +} + +} // namespace Quic +} // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_client_connection.h b/source/extensions/quic_listeners/quiche/envoy_quic_client_connection.h new file mode 100644 index 0000000000000..8ee2f22dad2e1 --- /dev/null +++ b/source/extensions/quic_listeners/quiche/envoy_quic_client_connection.h @@ -0,0 +1,59 @@ +#pragma once + +#include "envoy/event/dispatcher.h" + +#include "common/network/utility.h" + +#include "extensions/quic_listeners/quiche/envoy_quic_connection.h" + +namespace Envoy { +namespace Quic { + +// A client QuicConnection instance managing its own file events. +class EnvoyQuicClientConnection : public EnvoyQuicConnection, public Network::UdpPacketProcessor { +public: + // A connection socket will be created with given |local_addr|. If binding + // port not provided in |local_addr|, pick up a random port. + EnvoyQuicClientConnection(const quic::QuicConnectionId& server_connection_id, + Network::Address::InstanceConstSharedPtr& initial_peer_address, + quic::QuicConnectionHelperInterface& helper, + quic::QuicAlarmFactory& alarm_factory, + const quic::ParsedQuicVersionVector& supported_versions, + Network::Address::InstanceConstSharedPtr local_addr, + Event::Dispatcher& dispatcher, + const Network::ConnectionSocket::OptionsSharedPtr& options); + + EnvoyQuicClientConnection(const quic::QuicConnectionId& server_connection_id, + quic::QuicConnectionHelperInterface& helper, + quic::QuicAlarmFactory& alarm_factory, quic::QuicPacketWriter* writer, + bool owns_writer, + const quic::ParsedQuicVersionVector& supported_versions, + Event::Dispatcher& dispatcher, + Network::ConnectionSocketPtr&& connection_socket); + + // Network::UdpPacketProcessor + void processPacket(Network::Address::InstanceConstSharedPtr local_address, + Network::Address::InstanceConstSharedPtr peer_address, + Buffer::InstancePtr buffer, MonotonicTime receive_time) override; + uint64_t maxPacketSize() const override; + + // Register file event and apply socket options. + void setUpConnectionSocket(); + +private: + EnvoyQuicClientConnection(const quic::QuicConnectionId& server_connection_id, + quic::QuicConnectionHelperInterface& helper, + quic::QuicAlarmFactory& alarm_factory, + const quic::ParsedQuicVersionVector& supported_versions, + Event::Dispatcher& dispatcher, + Network::ConnectionSocketPtr&& connection_socket); + + void onFileEvent(uint32_t events); + + uint32_t packets_dropped_{0}; + Event::Dispatcher& dispatcher_; + Event::FileEventPtr file_event_; +}; + +} // namespace Quic +} // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_client_session.cc b/source/extensions/quic_listeners/quiche/envoy_quic_client_session.cc new file mode 100644 index 0000000000000..d600055c5c8b0 --- /dev/null +++ b/source/extensions/quic_listeners/quiche/envoy_quic_client_session.cc @@ -0,0 +1,77 @@ +#include "extensions/quic_listeners/quiche/envoy_quic_client_session.h" + +namespace Envoy { +namespace Quic { + +EnvoyQuicClientSession::EnvoyQuicClientSession( + const quic::QuicConfig& config, const quic::ParsedQuicVersionVector& supported_versions, + std::unique_ptr connection, const quic::QuicServerId& server_id, + quic::QuicCryptoClientConfig* crypto_config, + quic::QuicClientPushPromiseIndex* push_promise_index, Event::Dispatcher& dispatcher, + uint32_t send_buffer_limit) + : QuicFilterManagerConnectionImpl(*connection, dispatcher, send_buffer_limit), + quic::QuicSpdyClientSession(config, supported_versions, connection.release(), server_id, + crypto_config, push_promise_index) {} + +EnvoyQuicClientSession::~EnvoyQuicClientSession() { + ASSERT(!connection()->connected()); + quic_connection_ = nullptr; +} + +absl::string_view EnvoyQuicClientSession::requestedServerName() const { + return {GetCryptoStream()->crypto_negotiated_params().sni}; +} + +void EnvoyQuicClientSession::connect() { + dynamic_cast(quic_connection_)->setUpConnectionSocket(); + // Start version negotiation and crypto handshake during which the connection may fail if server + // doesn't support the one and only supported version. + CryptoConnect(); + SetMaxAllowedPushId(0u); +} + +void EnvoyQuicClientSession::OnConnectionClosed(const quic::QuicConnectionCloseFrame& frame, + quic::ConnectionCloseSource source) { + quic::QuicSpdyClientSession::OnConnectionClosed(frame, source); + onConnectionCloseEvent(frame, source); +} + +void EnvoyQuicClientSession::Initialize() { + quic::QuicSpdyClientSession::Initialize(); + quic_connection_->setEnvoyConnection(*this); +} + +void EnvoyQuicClientSession::OnGoAway(const quic::QuicGoAwayFrame& frame) { + ENVOY_CONN_LOG(debug, "GOAWAY received with error {}: {}", *this, + quic::QuicErrorCodeToString(frame.error_code), frame.reason_phrase); + quic::QuicSpdyClientSession::OnGoAway(frame); + if (http_connection_callbacks_ != nullptr) { + http_connection_callbacks_->onGoAway(); + } +} + +void EnvoyQuicClientSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) { + quic::QuicSpdyClientSession::OnCryptoHandshakeEvent(event); + if (event == HANDSHAKE_CONFIRMED) { + raiseConnectionEvent(Network::ConnectionEvent::Connected); + } +} + +std::unique_ptr EnvoyQuicClientSession::CreateClientStream() { + return std::make_unique(GetNextOutgoingBidirectionalStreamId(), this, + quic::BIDIRECTIONAL); +} + +quic::QuicSpdyStream* EnvoyQuicClientSession::CreateIncomingStream(quic::QuicStreamId /*id*/) { + // Disallow server initiated stream. + NOT_REACHED_GCOVR_EXCL_LINE; +} + +quic::QuicSpdyStream* +EnvoyQuicClientSession::CreateIncomingStream(quic::PendingStream* /*pending*/) { + // Disallow server initiated stream. + NOT_REACHED_GCOVR_EXCL_LINE; +} + +} // namespace Quic +} // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_client_session.h b/source/extensions/quic_listeners/quiche/envoy_quic_client_session.h new file mode 100644 index 0000000000000..1bebce79ed2b9 --- /dev/null +++ b/source/extensions/quic_listeners/quiche/envoy_quic_client_session.h @@ -0,0 +1,76 @@ +#pragma once + +#pragma GCC diagnostic push +// QUICHE allows unused parameters. +#pragma GCC diagnostic ignored "-Wunused-parameter" +// QUICHE uses offsetof(). +#pragma GCC diagnostic ignored "-Winvalid-offsetof" +#pragma GCC diagnostic ignored "-Wtype-limits" + +#include "quiche/quic/core/http/quic_spdy_client_session.h" + +#pragma GCC diagnostic pop + +#include "extensions/quic_listeners/quiche/envoy_quic_client_stream.h" +#include "extensions/quic_listeners/quiche/envoy_quic_client_connection.h" +#include "extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h" + +namespace Envoy { +namespace Quic { + +// Act as a Network::ClientConnection to ClientCodec. +// TODO(danzh) This class doesn't need to inherit Network::FilterManager +// interface but need all other Network::Connection implementation in +// QuicFilterManagerConnectionImpl. Refactor QuicFilterManagerConnectionImpl to +// move FilterManager interface to EnvoyQuicServerSession. +class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl, + public quic::QuicSpdyClientSession, + public Network::ClientConnection { +public: + EnvoyQuicClientSession(const quic::QuicConfig& config, + const quic::ParsedQuicVersionVector& supported_versions, + std::unique_ptr connection, + const quic::QuicServerId& server_id, + quic::QuicCryptoClientConfig* crypto_config, + quic::QuicClientPushPromiseIndex* push_promise_index, + Event::Dispatcher& dispatcher, uint32_t send_buffer_limit); + + ~EnvoyQuicClientSession() override; + + // Called by QuicHttpClientConnectionImpl before creating data streams. + void setHttpConnectionCallbacks(Http::ConnectionCallbacks& callbacks) { + http_connection_callbacks_ = &callbacks; + } + + // Network::Connection + absl::string_view requestedServerName() const override; + + // Network::ClientConnection + // Set up socket and start handshake. + void connect() override; + + // quic::QuicSession + void OnConnectionClosed(const quic::QuicConnectionCloseFrame& frame, + quic::ConnectionCloseSource source) override; + void Initialize() override; + void OnGoAway(const quic::QuicGoAwayFrame& frame) override; + // quic::QuicSpdyClientSessionBase + void OnCryptoHandshakeEvent(CryptoHandshakeEvent event) override; + + using quic::QuicSpdyClientSession::stream_map; + +protected: + // quic::QuicSpdyClientSession + std::unique_ptr CreateClientStream() override; + // quic::QuicSpdySession + quic::QuicSpdyStream* CreateIncomingStream(quic::QuicStreamId id) override; + quic::QuicSpdyStream* CreateIncomingStream(quic::PendingStream* pending) override; + +private: + // These callbacks are owned by network filters and quic session should outlive + // them. + Http::ConnectionCallbacks* http_connection_callbacks_{nullptr}; +}; + +} // namespace Quic +} // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_client_stream.cc b/source/extensions/quic_listeners/quiche/envoy_quic_client_stream.cc new file mode 100644 index 0000000000000..b6fed6f10833f --- /dev/null +++ b/source/extensions/quic_listeners/quiche/envoy_quic_client_stream.cc @@ -0,0 +1,234 @@ +#include "extensions/quic_listeners/quiche/envoy_quic_client_stream.h" + +#pragma GCC diagnostic push +// QUICHE allows unused parameters. +#pragma GCC diagnostic ignored "-Wunused-parameter" +// QUICHE uses offsetof(). +#pragma GCC diagnostic ignored "-Winvalid-offsetof" + +#include "quiche/quic/core/quic_session.h" +#include "quiche/quic/core/http/quic_header_list.h" +#include "quiche/spdy/core/spdy_header_block.h" +#include "extensions/quic_listeners/quiche/platform/quic_mem_slice_span_impl.h" + +#pragma GCC diagnostic pop + +#include "extensions/quic_listeners/quiche/envoy_quic_utils.h" +#include "extensions/quic_listeners/quiche/envoy_quic_client_session.h" + +#include "common/buffer/buffer_impl.h" +#include "common/http/header_map_impl.h" +#include "common/common/assert.h" + +namespace Envoy { +namespace Quic { + +EnvoyQuicClientStream::EnvoyQuicClientStream(quic::QuicStreamId id, + quic::QuicSpdyClientSession* client_session, + quic::StreamType type) + : quic::QuicSpdyClientStream(id, client_session, type), + EnvoyQuicStream( + // This should be larger than 8k to fully utilize congestion control + // window. And no larger than the max stream flow control window for + // the stream to buffer all the data. + // Ideally this limit should also correlate to peer's receive window + // but not fully depends on that. + 16 * 1024, [this]() { runLowWatermarkCallbacks(); }, + [this]() { runHighWatermarkCallbacks(); }) {} + +EnvoyQuicClientStream::EnvoyQuicClientStream(quic::PendingStream* pending, + quic::QuicSpdyClientSession* client_session, + quic::StreamType type) + : quic::QuicSpdyClientStream(pending, client_session, type), + EnvoyQuicStream( + 16 * 1024, [this]() { runLowWatermarkCallbacks(); }, + [this]() { runHighWatermarkCallbacks(); }) {} + +void EnvoyQuicClientStream::encode100ContinueHeaders(const Http::HeaderMap& headers) { + ASSERT(headers.Status()->value() == "100"); + encodeHeaders(headers, false); +} + +void EnvoyQuicClientStream::encodeHeaders(const Http::HeaderMap& headers, bool end_stream) { + ENVOY_STREAM_LOG(debug, "encodeHeaders: (end_stream={}) {}.", *this, end_stream, headers); + WriteHeaders(envoyHeadersToSpdyHeaderBlock(headers), end_stream, nullptr); + local_end_stream_ = end_stream; +} + +void EnvoyQuicClientStream::encodeData(Buffer::Instance& data, bool end_stream) { + ENVOY_STREAM_LOG(debug, "encodeData (end_stream={}) of {} bytes.", *this, end_stream, + data.length()); + local_end_stream_ = end_stream; + // This is counting not serialized bytes in the send buffer. + uint64_t bytes_to_send_old = BufferedDataBytes(); + // QUIC stream must take all. + WriteBodySlices(quic::QuicMemSliceSpan(quic::QuicMemSliceSpanImpl(data)), end_stream); + if (data.length() > 0) { + // Send buffer didn't take all the data, threshold needs to be adjusted. + Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD); + return; + } + + uint64_t bytes_to_send_new = BufferedDataBytes(); + ASSERT(bytes_to_send_old <= bytes_to_send_new); + maybeCheckWatermark(bytes_to_send_old, bytes_to_send_new, *filterManagerConnection()); +} + +void EnvoyQuicClientStream::encodeTrailers(const Http::HeaderMap& trailers) { + ASSERT(!local_end_stream_); + local_end_stream_ = true; + ENVOY_STREAM_LOG(debug, "encodeTrailers: {}.", *this, trailers); + WriteTrailers(envoyHeadersToSpdyHeaderBlock(trailers), nullptr); +} + +void EnvoyQuicClientStream::encodeMetadata(const Http::MetadataMapVector& /*metadata_map_vector*/) { + // Metadata Frame is not supported in QUIC. + // TODO(danzh): add stats for metadata not supported error. +} + +void EnvoyQuicClientStream::resetStream(Http::StreamResetReason reason) { + // Higher layers expect calling resetStream() to immediately raise reset callbacks. + runResetCallbacks(reason); + Reset(envoyResetReasonToQuicRstError(reason)); +} + +void EnvoyQuicClientStream::switchStreamBlockState(bool should_block) { + ASSERT(FinishedReadingHeaders(), + "Upper stream buffer limit is reached before response body is delivered."); + if (should_block) { + sequencer()->SetBlockedUntilFlush(); + } else { + ASSERT(read_disable_counter_ == 0, "readDisable called in between."); + sequencer()->SetUnblocked(); + } +} + +void EnvoyQuicClientStream::OnInitialHeadersComplete(bool fin, size_t frame_len, + const quic::QuicHeaderList& header_list) { + quic::QuicSpdyStream::OnInitialHeadersComplete(fin, frame_len, header_list); + if (rst_sent()) { + return; + } + ASSERT(decoder() != nullptr); + ASSERT(headers_decompressed()); + decoder()->decodeHeaders(quicHeadersToEnvoyHeaders(header_list), /*end_stream=*/fin); + if (fin) { + end_stream_decoded_ = true; + } + ConsumeHeaderList(); +} + +void EnvoyQuicClientStream::OnBodyAvailable() { + ASSERT(FinishedReadingHeaders()); + ASSERT(read_disable_counter_ == 0); + ASSERT(!in_decode_data_callstack_); + in_decode_data_callstack_ = true; + + Buffer::InstancePtr buffer = std::make_unique(); + // TODO(danzh): check Envoy per stream buffer limit. + // Currently read out all the data. + while (HasBytesToRead()) { + struct iovec iov; + int num_regions = GetReadableRegions(&iov, 1); + ASSERT(num_regions > 0); + size_t bytes_read = iov.iov_len; + Buffer::RawSlice slice; + buffer->reserve(bytes_read, &slice, 1); + ASSERT(slice.len_ >= bytes_read); + slice.len_ = bytes_read; + memcpy(slice.mem_, iov.iov_base, iov.iov_len); + buffer->commit(&slice, 1); + MarkConsumed(bytes_read); + } + + // True if no trailer and FIN read. + bool finished_reading = IsDoneReading(); + bool empty_payload_with_fin = buffer->length() == 0 && fin_received(); + // If this call is triggered by an empty frame with FIN which is not from peer + // but synthesized by stream itself upon receiving HEADERS with FIN or + // TRAILERS, do not deliver end of stream here. Because either decodeHeaders + // already delivered it or decodeTrailers will be called. + bool skip_decoding = empty_payload_with_fin && (end_stream_decoded_ || !finished_reading); + if (!skip_decoding) { + decoder()->decodeData(*buffer, finished_reading); + if (finished_reading) { + end_stream_decoded_ = true; + } + } + + if (!sequencer()->IsClosed()) { + in_decode_data_callstack_ = false; + if (read_disable_counter_ > 0) { + // If readDisable() was ever called during decodeData() and it meant to disable + // reading from downstream, the call must have been deferred. Call it now. + switchStreamBlockState(true); + } + return; + } + + if (!quic::VersionUsesHttp3(transport_version()) && !FinishedReadingTrailers()) { + // For Google QUIC implementation, trailers may arrived earlier and wait to + // be consumed after reading all the body. Consume it here. + // IETF QUIC shouldn't reach here because trailers are sent on same stream. + decoder()->decodeTrailers(spdyHeaderBlockToEnvoyHeaders(received_trailers())); + MarkTrailersConsumed(); + } + OnFinRead(); + in_decode_data_callstack_ = false; +} + +void EnvoyQuicClientStream::OnTrailingHeadersComplete(bool fin, size_t frame_len, + const quic::QuicHeaderList& header_list) { + quic::QuicSpdyStream::OnTrailingHeadersComplete(fin, frame_len, header_list); + ASSERT(trailers_decompressed()); + if (session()->connection()->connected() && + (quic::VersionUsesHttp3(transport_version()) || sequencer()->IsClosed()) && + !FinishedReadingTrailers()) { + // Before QPack, trailers can arrive before body. Only decode trailers after finishing decoding + // body. + ASSERT(decoder() != nullptr); + decoder()->decodeTrailers(spdyHeaderBlockToEnvoyHeaders(received_trailers())); + MarkTrailersConsumed(); + } +} + +void EnvoyQuicClientStream::OnStreamReset(const quic::QuicRstStreamFrame& frame) { + quic::QuicSpdyClientStream::OnStreamReset(frame); + runResetCallbacks(quicRstErrorToEnvoyResetReason(frame.error_code)); +} + +void EnvoyQuicClientStream::OnConnectionClosed(quic::QuicErrorCode error, + quic::ConnectionCloseSource source) { + quic::QuicSpdyClientStream::OnConnectionClosed(error, source); + runResetCallbacks(quicErrorCodeToEnvoyResetReason(error)); +} + +void EnvoyQuicClientStream::OnClose() { + quic::QuicSpdyClientStream::OnClose(); + if (BufferedDataBytes() > 0) { + // If the stream is closed without sending out all buffered data, regard + // them as sent now and adjust connection buffer book keeping. + filterManagerConnection()->adjustBytesToSend(0 - BufferedDataBytes()); + } +} + +void EnvoyQuicClientStream::OnCanWrite() { + uint64_t buffered_data_old = BufferedDataBytes(); + quic::QuicSpdyClientStream::OnCanWrite(); + uint64_t buffered_data_new = BufferedDataBytes(); + // As long as OnCanWriteNewData() is no-op, data to sent in buffer shouldn't + // increase. + ASSERT(buffered_data_new <= buffered_data_old); + maybeCheckWatermark(buffered_data_old, buffered_data_new, *filterManagerConnection()); +} + +uint32_t EnvoyQuicClientStream::streamId() { return id(); } + +Network::Connection* EnvoyQuicClientStream::connection() { return filterManagerConnection(); } + +QuicFilterManagerConnectionImpl* EnvoyQuicClientStream::filterManagerConnection() { + return dynamic_cast(session()); +} + +} // namespace Quic +} // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_client_stream.h b/source/extensions/quic_listeners/quiche/envoy_quic_client_stream.h new file mode 100644 index 0000000000000..6f99a80980413 --- /dev/null +++ b/source/extensions/quic_listeners/quiche/envoy_quic_client_stream.h @@ -0,0 +1,60 @@ +#pragma once + +#pragma GCC diagnostic push +// QUICHE allows unused parameters. +#pragma GCC diagnostic ignored "-Wunused-parameter" +// QUICHE uses offsetof(). +#pragma GCC diagnostic ignored "-Winvalid-offsetof" +#include "quiche/quic/core/http/quic_spdy_client_stream.h" + +#pragma GCC diagnostic pop + +#include "extensions/quic_listeners/quiche/envoy_quic_stream.h" + +namespace Envoy { +namespace Quic { + +// This class is a quic stream and also a request encoder. +class EnvoyQuicClientStream : public quic::QuicSpdyClientStream, public EnvoyQuicStream { +public: + EnvoyQuicClientStream(quic::QuicStreamId id, quic::QuicSpdyClientSession* client_session, + quic::StreamType type); + EnvoyQuicClientStream(quic::PendingStream* pending, quic::QuicSpdyClientSession* client_session, + quic::StreamType type); + + // Http::StreamEncoder + void encode100ContinueHeaders(const Http::HeaderMap& headers) override; + void encodeHeaders(const Http::HeaderMap& headers, bool end_stream) override; + void encodeData(Buffer::Instance& data, bool end_stream) override; + void encodeTrailers(const Http::HeaderMap& trailers) override; + void encodeMetadata(const Http::MetadataMapVector& metadata_map_vector) override; + + // Http::Stream + void resetStream(Http::StreamResetReason reason) override; + // quic::QuicSpdyStream + void OnBodyAvailable() override; + void OnStreamReset(const quic::QuicRstStreamFrame& frame) override; + void OnClose() override; + void OnCanWrite() override; + // quic::Stream + void OnConnectionClosed(quic::QuicErrorCode error, quic::ConnectionCloseSource source) override; + +protected: + // EnvoyQuicStream + void switchStreamBlockState(bool should_block) override; + uint32_t streamId() override; + Network::Connection* connection() override; + + // quic::QuicSpdyStream + // Overridden to pass headers to decoder. + void OnInitialHeadersComplete(bool fin, size_t frame_len, + const quic::QuicHeaderList& header_list) override; + void OnTrailingHeadersComplete(bool fin, size_t frame_len, + const quic::QuicHeaderList& header_list) override; + +private: + QuicFilterManagerConnectionImpl* filterManagerConnection(); +}; + +} // namespace Quic +} // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_fake_proof_verifier.h b/source/extensions/quic_listeners/quiche/envoy_quic_fake_proof_verifier.h index 0861e09fb4d9b..c8355717bccb8 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_fake_proof_verifier.h +++ b/source/extensions/quic_listeners/quiche/envoy_quic_fake_proof_verifier.h @@ -48,7 +48,8 @@ class EnvoyQuicFakeProofVerifier : public quic::ProofVerifier { const quic::ProofVerifyContext* /*context*/, std::string* /*error_details*/, std::unique_ptr* /*details*/, std::unique_ptr /*callback*/) override { - if (cert_sct == "Fake timestamp" && certs.size() == 1 && certs[0] == "Fake cert") { + // Cert SCT support is not enabled for fake ProofSource. + if (cert_sct == "" && certs.size() == 1 && certs[0] == "Fake cert") { return quic::QUIC_SUCCESS; } return quic::QUIC_FAILURE; diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_server_session.cc b/source/extensions/quic_listeners/quiche/envoy_quic_server_session.cc index 9445f9c18767f..c12a32690231a 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_server_session.cc +++ b/source/extensions/quic_listeners/quiche/envoy_quic_server_session.cc @@ -23,10 +23,11 @@ EnvoyQuicServerSession::EnvoyQuicServerSession( uint32_t send_buffer_limit) : quic::QuicServerSessionBase(config, supported_versions, connection.get(), visitor, helper, crypto_config, compressed_certs_cache), - QuicFilterManagerConnectionImpl(connection.get(), dispatcher, send_buffer_limit), + QuicFilterManagerConnectionImpl(*connection, dispatcher, send_buffer_limit), quic_connection_(std::move(connection)) {} EnvoyQuicServerSession::~EnvoyQuicServerSession() { + ASSERT(!quic_connection_->connected()); QuicFilterManagerConnectionImpl::quic_connection_ = nullptr; } diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.cc b/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.cc index a815c2fbc4e89..ca3ff13ca47e8 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.cc +++ b/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.cc @@ -96,7 +96,7 @@ void EnvoyQuicServerStream::encodeTrailers(const Http::HeaderMap& trailers) { void EnvoyQuicServerStream::encodeMetadata(const Http::MetadataMapVector& /*metadata_map_vector*/) { // Metadata Frame is not supported in QUIC. - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + // TODO(danzh): add stats for metadata not supported error. } void EnvoyQuicServerStream::resetStream(Http::StreamResetReason reason) { diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_stream.h b/source/extensions/quic_listeners/quiche/envoy_quic_stream.h index 228d0cf645926..59a7341b59341 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_stream.h +++ b/source/extensions/quic_listeners/quiche/envoy_quic_stream.h @@ -47,7 +47,26 @@ class EnvoyQuicStream : public Http::StreamEncoder, // Avoid calling this while decoding data because transient disabling and // enabling reading may trigger another decoding data inside the // callstack which messes up stream state. - switchStreamBlockState(disable); + if (disable) { + // Block QUIC stream right away. And if there are queued switching + // state callback, update the desired state as well. + switchStreamBlockState(true); + if (unblock_posted_) { + should_block_ = true; + } + } else { + should_block_ = false; + if (!unblock_posted_) { + // If this is the first time unblocking stream is desired, post a + // callback to do it in next loop. This is because unblocking QUIC + // stream can lead to immediate upstream encoding. + unblock_posted_ = true; + connection()->dispatcher().post([this] { + unblock_posted_ = false; + switchStreamBlockState(should_block_); + }); + } + } } } @@ -109,6 +128,15 @@ class EnvoyQuicStream : public Http::StreamEncoder, // OnBodyDataAvailable() hands all the ready-to-use request data from stream sequencer to HCM // directly and buffers them in filters if needed. Itself doesn't buffer request data. EnvoyQuicSimulatedWatermarkBuffer send_buffer_simulation_; + + // True if there is posted unblocking QUIC stream callback. There should be + // only one such callback no matter how many times readDisable() is called. + bool unblock_posted_{false}; + // The latest state an unblocking QUIC stream callback should look at. As + // more readDisable() calls may happen between the callback is posted and it's + // executed, the stream might be unblocked and blocked several times. Only the + // latest desired state should be considered by the callback. + bool should_block_{false}; }; } // namespace Quic diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_utils.cc b/source/extensions/quic_listeners/quiche/envoy_quic_utils.cc index 6eaf4616ecb4b..b02f76c7b1ade 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_utils.cc +++ b/source/extensions/quic_listeners/quiche/envoy_quic_utils.cc @@ -1,5 +1,9 @@ #include "extensions/quic_listeners/quiche/envoy_quic_utils.h" +#include + +#include "common/network/socket_option_factory.h" + namespace Envoy { namespace Quic { @@ -105,5 +109,37 @@ Http::StreamResetReason quicErrorCodeToEnvoyResetReason(quic::QuicErrorCode erro } } +Network::ConnectionSocketPtr +createConnectionSocket(Network::Address::InstanceConstSharedPtr& peer_addr, + Network::Address::InstanceConstSharedPtr& local_addr, + const Network::ConnectionSocket::OptionsSharedPtr& options) { + Network::IoHandlePtr io_handle = peer_addr->socket(Network::Address::SocketType::Datagram); + auto connection_socket = + std::make_unique(std::move(io_handle), local_addr, peer_addr); + connection_socket->addOptions(Network::SocketOptionFactory::buildIpPacketInfoOptions()); + connection_socket->addOptions(Network::SocketOptionFactory::buildRxQueueOverFlowOptions()); + if (options != nullptr) { + connection_socket->addOptions(options); + } + if (!Network::Socket::applyOptions(connection_socket->options(), *connection_socket, + envoy::api::v2::core::SocketOption::STATE_PREBIND)) { + connection_socket->close(); + ENVOY_LOG_MISC(error, "Fail to apply pre-bind options"); + return connection_socket; + } + local_addr->bind(connection_socket->ioHandle().fd()); + ASSERT(local_addr->ip()); + if (local_addr->ip()->port() == 0) { + // Get ephemeral port number. + local_addr = Network::Address::addressFromFd(connection_socket->ioHandle().fd()); + } + if (!Network::Socket::applyOptions(connection_socket->options(), *connection_socket, + envoy::api::v2::core::SocketOption::STATE_BOUND)) { + ENVOY_LOG_MISC(error, "Fail to apply post-bind options"); + connection_socket->close(); + } + return connection_socket; +} + } // namespace Quic } // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_utils.h b/source/extensions/quic_listeners/quiche/envoy_quic_utils.h index 816c5c99d1b3f..4236c1deeea7e 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_utils.h +++ b/source/extensions/quic_listeners/quiche/envoy_quic_utils.h @@ -4,6 +4,7 @@ #include "common/common/assert.h" #include "common/http/header_map_impl.h" #include "common/network/address_impl.h" +#include "common/network/listen_socket_impl.h" #pragma GCC diagnostic push @@ -48,5 +49,12 @@ Http::StreamResetReason quicRstErrorToEnvoyResetReason(quic::QuicRstStreamErrorC // Called when underlying QUIC connection is closed either locally or by peer. Http::StreamResetReason quicErrorCodeToEnvoyResetReason(quic::QuicErrorCode error); +// Create a connection socket instance and apply given socket options to the +// socket. IP_PKTINFO and SO_RXQ_OVFL is always set if supported. +Network::ConnectionSocketPtr +createConnectionSocket(Network::Address::InstanceConstSharedPtr& peer_addr, + Network::Address::InstanceConstSharedPtr& local_addr, + const Network::ConnectionSocket::OptionsSharedPtr& options); + } // namespace Quic } // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc index 53d8671df2b29..13cb3829f0047 100644 --- a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc +++ b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.cc @@ -5,17 +5,17 @@ namespace Envoy { namespace Quic { -QuicFilterManagerConnectionImpl::QuicFilterManagerConnectionImpl(EnvoyQuicConnection* connection, +QuicFilterManagerConnectionImpl::QuicFilterManagerConnectionImpl(EnvoyQuicConnection& connection, Event::Dispatcher& dispatcher, uint32_t send_buffer_limit) // Using this for purpose other than logging is not safe. Because QUIC connection id can be // 18 bytes, so there might be collision when it's hashed to 8 bytes. - : Network::ConnectionImplBase(dispatcher, /*id=*/connection->connection_id().Hash()), - quic_connection_(connection), filter_manager_(*this), stream_info_(dispatcher.timeSource()), + : Network::ConnectionImplBase(dispatcher, /*id=*/connection.connection_id().Hash()), + quic_connection_(&connection), filter_manager_(*this), stream_info_(dispatcher.timeSource()), write_buffer_watermark_simulation_( send_buffer_limit / 2, send_buffer_limit, [this]() { onSendBufferLowWatermark(); }, [this]() { onSendBufferHighWatermark(); }, ENVOY_LOGGER()) { - stream_info_.protocol(Http::Protocol::Http2); + stream_info_.protocol(Http::Protocol::Http3); } void QuicFilterManagerConnectionImpl::addWriteFilter(Network::WriteFilterSharedPtr filter) { diff --git a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h index 58395e9b8b10f..a6b34cda9e798 100644 --- a/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h +++ b/source/extensions/quic_listeners/quiche/quic_filter_manager_connection_impl.h @@ -17,7 +17,7 @@ namespace Quic { // Act as a Network::Connection to HCM and a FilterManager to FilterFactoryCb. class QuicFilterManagerConnectionImpl : public Network::ConnectionImplBase { public: - QuicFilterManagerConnectionImpl(EnvoyQuicConnection* connection, Event::Dispatcher& dispatcher, + QuicFilterManagerConnectionImpl(EnvoyQuicConnection& connection, Event::Dispatcher& dispatcher, uint32_t send_buffer_limit); // Network::FilterManager @@ -105,7 +105,7 @@ class QuicFilterManagerConnectionImpl : public Network::ConnectionImplBase { void closeConnectionImmediately() override; - EnvoyQuicConnection* quic_connection_; + EnvoyQuicConnection* quic_connection_{nullptr}; private: // Called when aggregated buffered bytes across all the streams exceeds high watermark. diff --git a/test/config/utility.cc b/test/config/utility.cc index 17804e22de839..a7f75450ab0d6 100644 --- a/test/config/utility.cc +++ b/test/config/utility.cc @@ -116,6 +116,41 @@ const std::string ConfigHelper::HTTP_PROXY_CONFIG = BASE_CONFIG + R"EOF( name: route_config_0 )EOF"; +// TODO(danzh): For better compatibility with HTTP integration test framework, +// it's better to combine with HTTP_PROXY_CONFIG, and use config modifiers to +// specify quic specific things. +const std::string ConfigHelper::QUIC_HTTP_PROXY_CONFIG = BASE_UDP_LISTENER_CONFIG + R"EOF( + filter_chains: + transport_socket: + name: envoy.transport_sockets.quic + filters: + name: envoy.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager + stat_prefix: config_test + http_filters: + name: envoy.router + codec_type: HTTP3 + access_log: + name: envoy.file_access_log + filter: + not_health_check_filter: {} + config: + path: /dev/null + route_config: + virtual_hosts: + name: integration + routes: + route: + cluster: cluster_0 + match: + prefix: "/" + domains: "*" + name: route_config_0 + udp_listener_config: + udp_listener_name: "quiche_quic_listener" +)EOF"; + const std::string ConfigHelper::DEFAULT_BUFFER_FILTER = R"EOF( name: envoy.buffer diff --git a/test/config/utility.h b/test/config/utility.h index 8359fa0edeb1b..45a6dce7f7df8 100644 --- a/test/config/utility.h +++ b/test/config/utility.h @@ -78,7 +78,8 @@ class ConfigHelper { static const std::string TCP_PROXY_CONFIG; // A basic configuration for L7 proxying. static const std::string HTTP_PROXY_CONFIG; - + // A basic configuration for L7 proxying with QUIC transport. + static const std::string QUIC_HTTP_PROXY_CONFIG; // A string for a basic buffer filter, which can be used with addFilter() static const std::string DEFAULT_BUFFER_FILTER; // A string for a small buffer filter, which can be used with addFilter() diff --git a/test/extensions/quic_listeners/quiche/BUILD b/test/extensions/quic_listeners/quiche/BUILD index e225d839e0bee..2b34c79040947 100644 --- a/test/extensions/quic_listeners/quiche/BUILD +++ b/test/extensions/quic_listeners/quiche/BUILD @@ -69,6 +69,26 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "envoy_quic_client_stream_test", + srcs = ["envoy_quic_client_stream_test.cc"], + tags = ["nofips"], + deps = [ + ":quic_test_utils_for_envoy_lib", + ":test_utils_lib", + "//source/common/http:headers_lib", + "//source/extensions/quic_listeners/quiche:envoy_quic_alarm_factory_lib", + "//source/extensions/quic_listeners/quiche:envoy_quic_client_connection_lib", + "//source/extensions/quic_listeners/quiche:envoy_quic_client_session_lib", + "//source/extensions/quic_listeners/quiche:envoy_quic_connection_helper_lib", + "//test/mocks/http:http_mocks", + "//test/mocks/http:stream_decoder_mock", + "//test/mocks/network:network_mocks", + "//test/test_common:utility_lib", + "@com_googlesource_quiche//:quic_core_http_spdy_session_lib", + ], +) + envoy_cc_test( name = "envoy_quic_server_session_test", srcs = ["envoy_quic_server_session_test.cc"], @@ -96,6 +116,27 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "envoy_quic_client_session_test", + srcs = ["envoy_quic_client_session_test.cc"], + tags = ["nofips"], + deps = [ + ":quic_test_utils_for_envoy_lib", + "//include/envoy/stats:stats_macros", + "//source/extensions/quic_listeners/quiche:codec_lib", + "//source/extensions/quic_listeners/quiche:envoy_quic_alarm_factory_lib", + "//source/extensions/quic_listeners/quiche:envoy_quic_client_connection_lib", + "//source/extensions/quic_listeners/quiche:envoy_quic_client_session_lib", + "//source/extensions/quic_listeners/quiche:envoy_quic_connection_helper_lib", + "//test/mocks/http:http_mocks", + "//test/mocks/http:stream_decoder_mock", + "//test/mocks/network:network_mocks", + "//test/mocks/stats:stats_mocks", + "//test/test_common:logging_lib", + "//test/test_common:simulated_time_system_lib", + ], +) + envoy_cc_test( name = "active_quic_listener_test", srcs = ["active_quic_listener_test.cc"], diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_alarm_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_alarm_test.cc index 9ab3753ec8552..80578bd22f130 100644 --- a/test/extensions/quic_listeners/quiche/envoy_quic_alarm_test.cc +++ b/test/extensions/quic_listeners/quiche/envoy_quic_alarm_test.cc @@ -189,5 +189,17 @@ TEST_F(EnvoyQuicAlarmTest, CancelActiveAlarm) { EXPECT_FALSE(unowned_delegate->fired()); } +TEST_F(EnvoyQuicAlarmTest, CancelUponDestruction) { + auto unowned_delegate = new TestDelegate(); + quic::QuicAlarm* alarm = alarm_factory_.CreateAlarm(unowned_delegate); + // alarm becomes active upon Set(). + alarm->Set(clock_.Now() + QuicTime::Delta::FromMilliseconds(10)); + // delegate should be destroyed with alarm. + delete alarm; + // alarm firing callback should have been cancelled, otherwise the delegate + // would be used after free. + advanceMsAndLoop(10); +} + } // namespace Quic } // namespace Envoy diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_client_session_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_client_session_test.cc new file mode 100644 index 0000000000000..ea2c111e0a582 --- /dev/null +++ b/test/extensions/quic_listeners/quiche/envoy_quic_client_session_test.cc @@ -0,0 +1,236 @@ +#pragma GCC diagnostic push +// QUICHE allows unused parameters. +#pragma GCC diagnostic ignored "-Wunused-parameter" +// QUICHE uses offsetof(). +#pragma GCC diagnostic ignored "-Winvalid-offsetof" + +#include "quiche/quic/core/crypto/null_encrypter.h" +#include "quiche/quic/test_tools/crypto_test_utils.h" +#include "quiche/quic/test_tools/quic_test_utils.h" + +#pragma GCC diagnostic pop + +#include "extensions/quic_listeners/quiche/envoy_quic_client_session.h" +#include "extensions/quic_listeners/quiche/envoy_quic_client_connection.h" +#include "extensions/quic_listeners/quiche/codec_impl.h" +#include "extensions/quic_listeners/quiche/envoy_quic_connection_helper.h" +#include "extensions/quic_listeners/quiche/envoy_quic_alarm_factory.h" +#include "extensions/quic_listeners/quiche/envoy_quic_utils.h" + +#include "envoy/stats/stats_macros.h" +#include "test/mocks/event/mocks.h" +#include "test/mocks/http/stream_decoder.h" +#include "test/mocks/http/mocks.h" +#include "test/mocks/network/mocks.h" +#include "test/mocks/stats/mocks.h" +#include "test/test_common/logging.h" +#include "test/test_common/simulated_time_system.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::Invoke; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Quic { + +class TestEnvoyQuicClientConnection : public EnvoyQuicClientConnection { +public: + TestEnvoyQuicClientConnection(const quic::QuicConnectionId& server_connection_id, + quic::QuicConnectionHelperInterface& helper, + quic::QuicAlarmFactory& alarm_factory, + quic::QuicPacketWriter& writer, + const quic::ParsedQuicVersionVector& supported_versions, + Event::Dispatcher& dispatcher, + Network::ConnectionSocketPtr&& connection_socket) + : EnvoyQuicClientConnection(server_connection_id, helper, alarm_factory, &writer, false, + supported_versions, dispatcher, std::move(connection_socket)) { + SetDefaultEncryptionLevel(quic::ENCRYPTION_FORWARD_SECURE); + SetEncrypter(quic::ENCRYPTION_FORWARD_SECURE, + std::make_unique(quic::Perspective::IS_CLIENT)); + } + + MOCK_METHOD2(SendConnectionClosePacket, void(quic::QuicErrorCode, const std::string&)); + MOCK_METHOD1(SendControlFrame, bool(const quic::QuicFrame& frame)); + + using EnvoyQuicClientConnection::connectionStats; +}; + +class TestQuicCryptoClientStream : public quic::QuicCryptoClientStream { +public: + TestQuicCryptoClientStream(const quic::QuicServerId& server_id, quic::QuicSession* session, + std::unique_ptr verify_context, + quic::QuicCryptoClientConfig* crypto_config, + ProofHandler* proof_handler) + : quic::QuicCryptoClientStream(server_id, session, std::move(verify_context), crypto_config, + proof_handler) {} + + bool encryption_established() const override { return true; } +}; + +class TestEnvoyQuicClientSession : public EnvoyQuicClientSession { +public: + TestEnvoyQuicClientSession(const quic::QuicConfig& config, + const quic::ParsedQuicVersionVector& supported_versions, + std::unique_ptr connection, + const quic::QuicServerId& server_id, + quic::QuicCryptoClientConfig* crypto_config, + quic::QuicClientPushPromiseIndex* push_promise_index, + Event::Dispatcher& dispatcher, uint32_t send_buffer_limit) + : EnvoyQuicClientSession(config, supported_versions, std::move(connection), server_id, + crypto_config, push_promise_index, dispatcher, send_buffer_limit) {} + + std::unique_ptr CreateQuicCryptoStream() override { + return std::make_unique( + server_id(), this, crypto_config()->proof_verifier()->CreateDefaultContext(), + crypto_config(), this); + } +}; + +class EnvoyQuicClientSessionTest : public testing::TestWithParam { +public: + EnvoyQuicClientSessionTest() + : api_(Api::createApiForTest(time_system_)), dispatcher_(api_->allocateDispatcher()), + connection_helper_(*dispatcher_), + alarm_factory_(*dispatcher_, *connection_helper_.GetClock()), quic_version_([]() { + SetQuicReloadableFlag(quic_enable_version_99, GetParam()); + return quic::ParsedVersionOfIndex(quic::CurrentSupportedVersions(), 0); + }()), + peer_addr_(Network::Utility::getAddressWithPort(*Network::Utility::getIpv6LoopbackAddress(), + 12345)), + self_addr_(Network::Utility::getAddressWithPort(*Network::Utility::getIpv6LoopbackAddress(), + 54321)), + quic_connection_(new TestEnvoyQuicClientConnection( + quic::test::TestConnectionId(), connection_helper_, alarm_factory_, writer_, + quic_version_, *dispatcher_, createConnectionSocket(peer_addr_, self_addr_, nullptr))), + crypto_config_(quic::test::crypto_test_utils::ProofVerifierForTesting()), + envoy_quic_session_(quic_config_, quic_version_, + std::unique_ptr(quic_connection_), + quic::QuicServerId("example.com", 443, false), &crypto_config_, nullptr, + *dispatcher_, + /*send_buffer_limit*/ 1024 * 1024), + http_connection_(envoy_quic_session_, http_connection_callbacks_) { + EXPECT_EQ(time_system_.systemTime(), envoy_quic_session_.streamInfo().startTime()); + EXPECT_EQ(EMPTY_STRING, envoy_quic_session_.nextProtocol()); + EXPECT_EQ(Http::Protocol::Http3, http_connection_.protocol()); + + time_system_.sleep(std::chrono::milliseconds(1)); + ON_CALL(writer_, WritePacket(_, _, _, _, _)) + .WillByDefault(testing::Return(quic::WriteResult(quic::WRITE_STATUS_OK, 1))); + } + + void SetUp() override { + envoy_quic_session_.Initialize(); + envoy_quic_session_.addConnectionCallbacks(network_connection_callbacks_); + envoy_quic_session_.setConnectionStats( + {read_total_, read_current_, write_total_, write_current_, nullptr, nullptr}); + EXPECT_EQ(&read_total_, &quic_connection_->connectionStats().read_total_); + } + + void TearDown() override { + if (quic_connection_->connected()) { + EXPECT_CALL(*quic_connection_, + SendConnectionClosePacket(quic::QUIC_NO_ERROR, "Closed by application")); + EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); + envoy_quic_session_.close(Network::ConnectionCloseType::NoFlush); + } + } + + EnvoyQuicClientStream& sendGetRequest(Http::StreamDecoder& response_decoder, + Http::StreamCallbacks& stream_callbacks) { + auto& stream = + dynamic_cast(http_connection_.newStream(response_decoder)); + stream.getStream().addCallbacks(stream_callbacks); + + std::string host("www.abc.com"); + Http::TestHeaderMapImpl request_headers{ + {":authority", host}, {":method", "GET"}, {":path", "/"}}; + stream.encodeHeaders(request_headers, true); + return stream; + } + +protected: + Event::SimulatedTimeSystemHelper time_system_; + Api::ApiPtr api_; + Event::DispatcherPtr dispatcher_; + EnvoyQuicConnectionHelper connection_helper_; + EnvoyQuicAlarmFactory alarm_factory_; + quic::ParsedQuicVersionVector quic_version_; + testing::NiceMock writer_; + Network::Address::InstanceConstSharedPtr peer_addr_; + Network::Address::InstanceConstSharedPtr self_addr_; + TestEnvoyQuicClientConnection* quic_connection_; + quic::QuicConfig quic_config_; + quic::QuicCryptoClientConfig crypto_config_; + TestEnvoyQuicClientSession envoy_quic_session_; + Network::MockConnectionCallbacks network_connection_callbacks_; + Http::MockServerConnectionCallbacks http_connection_callbacks_; + testing::StrictMock read_total_; + testing::StrictMock read_current_; + testing::StrictMock write_total_; + testing::StrictMock write_current_; + QuicHttpClientConnectionImpl http_connection_; +}; + +INSTANTIATE_TEST_SUITE_P(EnvoyQuicClientSessionTests, EnvoyQuicClientSessionTest, + testing::ValuesIn({true, false})); + +TEST_P(EnvoyQuicClientSessionTest, NewStream) { + Http::MockStreamDecoder response_decoder; + Http::MockStreamCallbacks stream_callbacks; + EnvoyQuicClientStream& stream = sendGetRequest(response_decoder, stream_callbacks); + + quic::QuicHeaderList headers; + headers.OnHeaderBlockStart(); + headers.OnHeader(":status", "200"); + headers.OnHeaderBlockEnd(/*uncompressed_header_bytes=*/0, /*compressed_header_bytes=*/0); + // Response headers should be propagated to decoder. + EXPECT_CALL(response_decoder, decodeHeaders_(_, /*end_stream=*/true)) + .WillOnce(Invoke([](const Http::HeaderMapPtr& decoded_headers, bool) { + EXPECT_EQ("200", decoded_headers->Status()->value().getStringView()); + })); + stream.OnStreamHeaderList(/*fin=*/true, headers.uncompressed_header_bytes(), headers); +} + +TEST_P(EnvoyQuicClientSessionTest, OnResetFrame) { + Http::MockStreamDecoder response_decoder; + Http::MockStreamCallbacks stream_callbacks; + EnvoyQuicClientStream& stream = sendGetRequest(response_decoder, stream_callbacks); + + // G-QUIC or IETF bi-directional stream. + quic::QuicStreamId stream_id = stream.id(); + quic::QuicRstStreamFrame rst1(/*control_frame_id=*/1u, stream_id, + quic::QUIC_ERROR_PROCESSING_STREAM, /*bytes_written=*/0u); + EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::RemoteReset, _)); + stream.OnStreamReset(rst1); +} + +TEST_P(EnvoyQuicClientSessionTest, ConnectionClose) { + std::string error_details("dummy details"); + quic::QuicErrorCode error(quic::QUIC_INVALID_FRAME_DATA); + quic::QuicConnectionCloseFrame frame(quic_version_[0].transport_version, error, error_details, + /* transport_close_frame_type = */ 0); + EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::RemoteClose)); + quic_connection_->OnConnectionCloseFrame(frame); + EXPECT_EQ(absl::StrCat(quic::QuicErrorCodeToString(error), " with details: ", error_details), + envoy_quic_session_.transportFailureReason()); + EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); +} + +TEST_P(EnvoyQuicClientSessionTest, ConnectionCloseWithActiveStream) { + Http::MockStreamDecoder response_decoder; + Http::MockStreamCallbacks stream_callbacks; + EnvoyQuicClientStream& stream = sendGetRequest(response_decoder, stream_callbacks); + EXPECT_CALL(*quic_connection_, + SendConnectionClosePacket(quic::QUIC_NO_ERROR, "Closed by application")); + EXPECT_CALL(network_connection_callbacks_, onEvent(Network::ConnectionEvent::LocalClose)); + EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::ConnectionTermination, _)); + envoy_quic_session_.close(Network::ConnectionCloseType::NoFlush); + EXPECT_EQ(Network::Connection::State::Closed, envoy_quic_session_.state()); + EXPECT_TRUE(stream.write_side_closed() && stream.reading_stopped()); +} + +} // namespace Quic +} // namespace Envoy diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_client_stream_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_client_stream_test.cc new file mode 100644 index 0000000000000..bb56c21926d05 --- /dev/null +++ b/test/extensions/quic_listeners/quiche/envoy_quic_client_stream_test.cc @@ -0,0 +1,269 @@ +#include "extensions/quic_listeners/quiche/envoy_quic_alarm_factory.h" +#include "extensions/quic_listeners/quiche/envoy_quic_client_connection.h" +#include "extensions/quic_listeners/quiche/envoy_quic_client_stream.h" +#include "extensions/quic_listeners/quiche/envoy_quic_connection_helper.h" +#include "extensions/quic_listeners/quiche/envoy_quic_utils.h" + +#include "test/extensions/quic_listeners/quiche/test_utils.h" +#include "test/mocks/http/mocks.h" +#include "test/mocks/http/stream_decoder.h" +#include "test/mocks/network/mocks.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Quic { + +using testing::_; +using testing::Invoke; +using testing::Return; + +class EnvoyQuicClientStreamTest : public testing::TestWithParam { +public: + EnvoyQuicClientStreamTest() + : api_(Api::createApiForTest()), dispatcher_(api_->allocateDispatcher()), + connection_helper_(*dispatcher_), + alarm_factory_(*dispatcher_, *connection_helper_.GetClock()), quic_version_([]() { + SetQuicReloadableFlag(quic_enable_version_99, GetParam()); + return quic::CurrentSupportedVersions()[0]; + }()), + peer_addr_(Network::Utility::getAddressWithPort(*Network::Utility::getIpv6LoopbackAddress(), + 12345)), + self_addr_(Network::Utility::getAddressWithPort(*Network::Utility::getIpv6LoopbackAddress(), + 54321)), + quic_connection_(new EnvoyQuicClientConnection( + quic::test::TestConnectionId(), connection_helper_, alarm_factory_, &writer_, + /*owns_writer=*/false, {quic_version_}, *dispatcher_, + createConnectionSocket(peer_addr_, self_addr_, nullptr))), + quic_session_(quic_config_, {quic_version_}, quic_connection_, *dispatcher_, + quic_config_.GetInitialStreamFlowControlWindowToSend() * 2), + stream_id_(quic_version_.transport_version == quic::QUIC_VERSION_99 ? 4u : 5u), + quic_stream_(new EnvoyQuicClientStream(stream_id_, &quic_session_, quic::BIDIRECTIONAL)), + request_headers_{{":authority", host_}, {":method", "POST"}, {":path", "/"}} { + quic_stream_->setDecoder(stream_decoder_); + quic_stream_->addCallbacks(stream_callbacks_); + quic_session_.ActivateStream(std::unique_ptr(quic_stream_)); + EXPECT_CALL(quic_session_, WritevData(_, _, _, _, _)) + .WillRepeatedly(Invoke([](quic::QuicStream*, quic::QuicStreamId, size_t write_length, + quic::QuicStreamOffset, quic::StreamSendingState state) { + return quic::QuicConsumedData{write_length, state != quic::NO_FIN}; + })); + EXPECT_CALL(writer_, WritePacket(_, _, _, _, _)) + .WillRepeatedly(Invoke([](const char*, size_t buf_len, const quic::QuicIpAddress&, + const quic::QuicSocketAddress&, quic::PerPacketOptions*) { + return quic::WriteResult{quic::WRITE_STATUS_OK, static_cast(buf_len)}; + })); + } + + void SetUp() override { + quic_session_.Initialize(); + quic_connection_->setUpConnectionSocket(); + response_headers_.OnHeaderBlockStart(); + response_headers_.OnHeader(":status", "200"); + response_headers_.OnHeaderBlockEnd(/*uncompressed_header_bytes=*/0, + /*compressed_header_bytes=*/0); + + trailers_.OnHeaderBlockStart(); + trailers_.OnHeader("key1", "value1"); + if (quic_version_.transport_version != quic::QUIC_VERSION_99) { + // ":final-offset" is required and stripped off by quic. + trailers_.OnHeader(":final-offset", absl::StrCat("", response_body_.length())); + } + trailers_.OnHeaderBlockEnd(/*uncompressed_header_bytes=*/0, /*compressed_header_bytes=*/0); + } + + void TearDown() override { + if (quic_connection_->connected()) { + quic_connection_->CloseConnection( + quic::QUIC_NO_ERROR, "Closed by application", + quic::ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + } + } + +protected: + Api::ApiPtr api_; + Event::DispatcherPtr dispatcher_; + EnvoyQuicConnectionHelper connection_helper_; + EnvoyQuicAlarmFactory alarm_factory_; + testing::NiceMock writer_; + quic::ParsedQuicVersion quic_version_; + quic::QuicConfig quic_config_; + Network::Address::InstanceConstSharedPtr peer_addr_; + Network::Address::InstanceConstSharedPtr self_addr_; + EnvoyQuicClientConnection* quic_connection_; + MockEnvoyQuicClientSession quic_session_; + quic::QuicStreamId stream_id_; + EnvoyQuicClientStream* quic_stream_; + Http::MockStreamDecoder stream_decoder_; + Http::MockStreamCallbacks stream_callbacks_; + std::string host_{"www.abc.com"}; + Http::TestHeaderMapImpl request_headers_; + quic::QuicHeaderList response_headers_; + quic::QuicHeaderList trailers_; + Buffer::OwnedImpl request_body_{"Hello world"}; + std::string response_body_{"OK\n"}; +}; + +INSTANTIATE_TEST_SUITE_P(EnvoyQuicClientStreamTests, EnvoyQuicClientStreamTest, + testing::ValuesIn({true, false})); + +TEST_P(EnvoyQuicClientStreamTest, PostRequestAndResponse) { + quic_stream_->encodeHeaders(request_headers_, false); + quic_stream_->encodeData(request_body_, true); + + EXPECT_CALL(stream_decoder_, decodeHeaders_(_, /*end_stream=*/false)) + .WillOnce(Invoke([](const Http::HeaderMapPtr& headers, bool) { + EXPECT_EQ("200", headers->Status()->value().getStringView()); + })); + if (quic_version_.transport_version == quic::QUIC_VERSION_99) { + quic_stream_->OnHeadersDecoded(response_headers_); + } else { + quic_stream_->OnStreamHeaderList(/*fin=*/false, response_headers_.uncompressed_header_bytes(), + response_headers_); + } + EXPECT_TRUE(quic_stream_->FinishedReadingHeaders()); + + EXPECT_CALL(stream_decoder_, decodeData(_, _)) + .Times(testing::AtMost(2)) + .WillOnce(Invoke([&](Buffer::Instance& buffer, bool finished_reading) { + EXPECT_EQ(response_body_, buffer.toString()); + EXPECT_FALSE(finished_reading); + })) + // Depends on QUIC version, there may be an empty STREAM_FRAME with FIN. But + // since there is trailers, finished_reading should always be false. + .WillOnce(Invoke([](Buffer::Instance& buffer, bool finished_reading) { + EXPECT_FALSE(finished_reading); + EXPECT_EQ(0, buffer.length()); + })); + std::string data = response_body_; + if (quic_version_.transport_version == quic::QUIC_VERSION_99) { + std::unique_ptr data_buffer; + quic::QuicByteCount data_frame_header_length = + quic::HttpEncoder::SerializeDataFrameHeader(response_body_.length(), &data_buffer); + quic::QuicStringPiece data_frame_header(data_buffer.get(), data_frame_header_length); + data = absl::StrCat(data_frame_header, response_body_); + } + quic::QuicStreamFrame frame(stream_id_, false, 0, data); + quic_stream_->OnStreamFrame(frame); + + EXPECT_CALL(stream_decoder_, decodeTrailers_(_)) + .WillOnce(Invoke([](const Http::HeaderMapPtr& headers) { + Http::LowerCaseString key1("key1"); + Http::LowerCaseString key2(":final-offset"); + EXPECT_EQ("value1", headers->get(key1)->value().getStringView()); + EXPECT_EQ(nullptr, headers->get(key2)); + })); + quic_stream_->OnStreamHeaderList(/*fin=*/true, trailers_.uncompressed_header_bytes(), trailers_); +} + +TEST_P(EnvoyQuicClientStreamTest, OutOfOrderTrailers) { + if (quic::VersionUsesHttp3(quic_version_.transport_version)) { + EXPECT_CALL(stream_callbacks_, onResetStream(_, _)); + return; + } + quic_stream_->encodeHeaders(request_headers_, true); + EXPECT_CALL(stream_decoder_, decodeHeaders_(_, /*end_stream=*/false)) + .WillOnce(Invoke([](const Http::HeaderMapPtr& headers, bool) { + EXPECT_EQ("200", headers->Status()->value().getStringView()); + })); + if (quic_version_.transport_version == quic::QUIC_VERSION_99) { + quic_stream_->OnHeadersDecoded(response_headers_); + } else { + quic_stream_->OnStreamHeaderList(/*fin=*/false, response_headers_.uncompressed_header_bytes(), + response_headers_); + } + EXPECT_TRUE(quic_stream_->FinishedReadingHeaders()); + + // Trailer should be delivered to HCM later after body arrives. + quic_stream_->OnStreamHeaderList(/*fin=*/true, trailers_.uncompressed_header_bytes(), trailers_); + + std::string data = response_body_; + if (quic_version_.transport_version == quic::QUIC_VERSION_99) { + std::unique_ptr data_buffer; + quic::QuicByteCount data_frame_header_length = + quic::HttpEncoder::SerializeDataFrameHeader(response_body_.length(), &data_buffer); + quic::QuicStringPiece data_frame_header(data_buffer.get(), data_frame_header_length); + data = absl::StrCat(data_frame_header, response_body_); + } + quic::QuicStreamFrame frame(stream_id_, false, 0, data); + EXPECT_CALL(stream_decoder_, decodeData(_, _)) + .Times(testing::AtMost(2)) + .WillOnce(Invoke([this](Buffer::Instance& buffer, bool finished_reading) { + EXPECT_EQ(response_body_, buffer.toString()); + EXPECT_FALSE(finished_reading); + })) + // Depends on QUIC version, there may be an empty STREAM_FRAME with FIN. But + // since there is trailers, finished_reading should always be false. + .WillOnce(Invoke([](Buffer::Instance& buffer, bool finished_reading) { + EXPECT_FALSE(finished_reading); + EXPECT_EQ(0, buffer.length()); + })); + + EXPECT_CALL(stream_decoder_, decodeTrailers_(_)) + .WillOnce(Invoke([](const Http::HeaderMapPtr& headers) { + Http::LowerCaseString key1("key1"); + Http::LowerCaseString key2(":final-offset"); + EXPECT_EQ("value1", headers->get(key1)->value().getStringView()); + EXPECT_EQ(nullptr, headers->get(key2)); + })); + quic_stream_->OnStreamFrame(frame); +} + +TEST_P(EnvoyQuicClientStreamTest, WatermarkSendBuffer) { + // Bump connection flow control window large enough not to cause connection + // level flow control blocked. + quic::QuicWindowUpdateFrame window_update( + quic::kInvalidControlFrameId, + quic::QuicUtils::GetInvalidStreamId(quic_version_.transport_version), 1024 * 1024); + quic_session_.OnWindowUpdateFrame(window_update); + + request_headers_.addCopy(":content-length", "32770"); // 32KB + 2 byte + quic_stream_->encodeHeaders(request_headers_, /*end_stream=*/false); + // Encode 32kB request body. first 16KB should be written out right away. The + // rest should be buffered. The high watermark is 16KB, so this call should + // make the send buffer reach its high watermark. + std::string request(32 * 1024 + 1, 'a'); + Buffer::OwnedImpl buffer(request); + EXPECT_CALL(stream_callbacks_, onAboveWriteBufferHighWatermark()); + quic_stream_->encodeData(buffer, false); + + EXPECT_EQ(0u, buffer.length()); + EXPECT_TRUE(quic_stream_->flow_controller()->IsBlocked()); + + // Receive a WINDOW_UPDATE frame not large enough to drain half of the send + // buffer. + quic::QuicWindowUpdateFrame window_update1(quic::kInvalidControlFrameId, quic_stream_->id(), + 16 * 1024 + 8 * 1024); + quic_stream_->OnWindowUpdateFrame(window_update1); + EXPECT_FALSE(quic_stream_->flow_controller()->IsBlocked()); + quic_session_.OnCanWrite(); + EXPECT_TRUE(quic_stream_->flow_controller()->IsBlocked()); + + // Receive another WINDOW_UPDATE frame to drain the send buffer till below low + // watermark. + quic::QuicWindowUpdateFrame window_update2(quic::kInvalidControlFrameId, quic_stream_->id(), + 16 * 1024 + 8 * 1024 + 1024); + quic_stream_->OnWindowUpdateFrame(window_update2); + EXPECT_FALSE(quic_stream_->flow_controller()->IsBlocked()); + EXPECT_CALL(stream_callbacks_, onBelowWriteBufferLowWatermark()).WillOnce(Invoke([this]() { + std::string rest_request(1, 'a'); + Buffer::OwnedImpl buffer(rest_request); + quic_stream_->encodeData(buffer, true); + })); + quic_session_.OnCanWrite(); + EXPECT_TRUE(quic_stream_->flow_controller()->IsBlocked()); + + quic::QuicWindowUpdateFrame window_update3(quic::kInvalidControlFrameId, quic_stream_->id(), + 32 * 1024 + 1024); + quic_stream_->OnWindowUpdateFrame(window_update3); + quic_session_.OnCanWrite(); + + EXPECT_TRUE(quic_stream_->local_end_stream_); + EXPECT_TRUE(quic_stream_->write_side_closed()); + EXPECT_CALL(stream_callbacks_, onResetStream(_, _)); +} + +} // namespace Quic +} // namespace Envoy diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_proof_source_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_proof_source_test.cc index 4737a532f558b..57bdf94e9e1f0 100644 --- a/test/extensions/quic_listeners/quiche/envoy_quic_proof_source_test.cc +++ b/test/extensions/quic_listeners/quiche/envoy_quic_proof_source_test.cc @@ -62,8 +62,8 @@ TEST_F(EnvoyQuicFakeProofSourceTest, TestGetProof) { TEST_F(EnvoyQuicFakeProofSourceTest, TestVerifyProof) { EXPECT_EQ(quic::QUIC_SUCCESS, proof_verifier_.VerifyProof(hostname_, /*port=*/0, server_config_, version_, chlo_hash_, - expected_certs_, "Fake timestamp", expected_signature_, - nullptr, nullptr, nullptr, nullptr)); + expected_certs_, "", expected_signature_, nullptr, nullptr, + nullptr, nullptr)); std::vector wrong_certs{"wrong cert"}; EXPECT_EQ(quic::QUIC_FAILURE, proof_verifier_.VerifyProof(hostname_, /*port=*/0, server_config_, version_, chlo_hash_, diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc index 87c6e320091c1..aa9d2e3ff6fd1 100644 --- a/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc +++ b/test/extensions/quic_listeners/quiche/envoy_quic_server_session_test.cc @@ -47,8 +47,6 @@ using testing::Invoke; using testing::Return; using testing::ReturnRef; -#include - namespace Envoy { namespace Quic { @@ -131,7 +129,7 @@ class EnvoyQuicServerSessionTest : public testing::TestWithParam { bool installReadFilter() { // Setup read filter. envoy_quic_session_.addReadFilter(read_filter_); - EXPECT_EQ(Http::Protocol::Http2, + EXPECT_EQ(Http::Protocol::Http3, read_filter_->callbacks_->connection().streamInfo().protocol().value()); EXPECT_EQ(envoy_quic_session_.id(), read_filter_->callbacks_->connection().id()); EXPECT_EQ(&envoy_quic_session_, &read_filter_->callbacks_->connection()); @@ -143,7 +141,7 @@ class EnvoyQuicServerSessionTest : public testing::TestWithParam { // Create ServerConnection instance and setup callbacks for it. http_connection_ = std::make_unique(envoy_quic_session_, http_connection_callbacks_); - EXPECT_EQ(Http::Protocol::Http2, http_connection_->protocol()); + EXPECT_EQ(Http::Protocol::Http3, http_connection_->protocol()); // Stop iteration to avoid calling getRead/WriteBuffer(). return Network::FilterStatus::StopIteration; })); diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_server_stream_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_server_stream_test.cc index 54304dd722db4..f21b4558e9eda 100644 --- a/test/extensions/quic_listeners/quiche/envoy_quic_server_stream_test.cc +++ b/test/extensions/quic_listeners/quiche/envoy_quic_server_stream_test.cc @@ -52,8 +52,8 @@ class EnvoyQuicServerStreamTest : public testing::TestWithParam { quic_session_.ActivateStream(std::unique_ptr(quic_stream_)); EXPECT_CALL(quic_session_, WritevData(_, _, _, _, _)) .WillRepeatedly(Invoke([](quic::QuicStream*, quic::QuicStreamId, size_t write_length, - quic::QuicStreamOffset, quic::StreamSendingState) { - return quic::QuicConsumedData{write_length, true}; + quic::QuicStreamOffset, quic::StreamSendingState state) { + return quic::QuicConsumedData{write_length, state != quic::NO_FIN}; })); EXPECT_CALL(writer_, WritePacket(_, _, _, _, _)) .WillRepeatedly(Invoke([](const char*, size_t buf_len, const quic::QuicIpAddress&, @@ -247,6 +247,7 @@ TEST_P(EnvoyQuicServerStreamTest, ReadDisableUponLargePost) { // Re-enable reading just once shouldn't unblock stream. quic_stream_->readDisable(false); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); // This data frame should also be buffered. std::string last_part_request = bodyToStreamPayload("ccc"); @@ -264,6 +265,8 @@ TEST_P(EnvoyQuicServerStreamTest, ReadDisableUponLargePost) { EXPECT_TRUE(finished_reading); })); quic_stream_->readDisable(false); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_CALL(stream_callbacks_, onResetStream(_, _)); } @@ -314,10 +317,18 @@ TEST_P(EnvoyQuicServerStreamTest, ReadDisableAndReEnableImmediately) { TEST_P(EnvoyQuicServerStreamTest, WatermarkSendBuffer) { sendRequest(request_body_, true, request_body_.size() * 2); + // Bump connection flow control window large enough not to cause connection + // level flow control blocked. + quic::QuicWindowUpdateFrame window_update( + quic::kInvalidControlFrameId, + quic::QuicUtils::GetInvalidStreamId(quic_version_.transport_version), 1024 * 1024); + quic_session_.OnWindowUpdateFrame(window_update); + // 32KB + 2 byte. The initial stream flow control window is 16k. response_headers_.addCopy(":content-length", "32770"); quic_stream_->encodeHeaders(response_headers_, /*end_stream=*/false); - // encode 32kB response body. first 16KB should be written out right away. The + + // Encode 32kB response body. first 16KB should be written out right away. The // rest should be buffered. The high watermark is 16KB, so this call should // make the send buffer reach its high watermark. std::string response(32 * 1024 + 1, 'a'); @@ -327,12 +338,6 @@ TEST_P(EnvoyQuicServerStreamTest, WatermarkSendBuffer) { EXPECT_EQ(0u, buffer.length()); EXPECT_TRUE(quic_stream_->flow_controller()->IsBlocked()); - // Bump connection flow control window large enough not to cause connection - // level flow control blocked. - quic::QuicWindowUpdateFrame window_update( - quic::kInvalidControlFrameId, - quic::QuicUtils::GetInvalidStreamId(quic_version_.transport_version), 1024 * 1024); - quic_session_.OnWindowUpdateFrame(window_update); // Receive a WINDOW_UPDATE frame not large enough to drain half of the send // buffer. diff --git a/test/extensions/quic_listeners/quiche/integration/BUILD b/test/extensions/quic_listeners/quiche/integration/BUILD new file mode 100644 index 0000000000000..2f2a48b28ab88 --- /dev/null +++ b/test/extensions/quic_listeners/quiche/integration/BUILD @@ -0,0 +1,26 @@ +licenses(["notice"]) # Apache 2 + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test", + "envoy_package", +) + +envoy_package() + +envoy_cc_test( + name = "quic_http_integration_test", + srcs = ["quic_http_integration_test.cc"], + data = ["//test/config/integration/certs"], + tags = ["nofips"], + deps = [ + "//source/extensions/quic_listeners/quiche:active_quic_listener_config_lib", + "//source/extensions/quic_listeners/quiche:codec_lib", + "//source/extensions/quic_listeners/quiche:envoy_quic_client_connection_lib", + "//source/extensions/quic_listeners/quiche:envoy_quic_client_session_lib", + "//source/extensions/quic_listeners/quiche:envoy_quic_connection_helper_lib", + "//source/extensions/quic_listeners/quiche:envoy_quic_proof_verifier_lib", + "//source/extensions/quic_listeners/quiche:quic_transport_socket_factory_lib", + "//test/integration:http_integration_lib", + ], +) diff --git a/test/extensions/quic_listeners/quiche/integration/quic_http_integration_test.cc b/test/extensions/quic_listeners/quiche/integration/quic_http_integration_test.cc new file mode 100644 index 0000000000000..d65792479b980 --- /dev/null +++ b/test/extensions/quic_listeners/quiche/integration/quic_http_integration_test.cc @@ -0,0 +1,182 @@ +#include "test/config/utility.h" +#include "test/integration/http_integration.h" +#include "test/test_common/utility.h" + +#pragma GCC diagnostic push +// QUICHE allows unused parameters. +#pragma GCC diagnostic ignored "-Wunused-parameter" +// QUICHE uses offsetof(). +#pragma GCC diagnostic ignored "-Winvalid-offsetof" + +#include "quiche/quic/core/http/quic_client_push_promise_index.h" +#include "quiche/quic/core/quic_utils.h" + +#pragma GCC diagnostic pop + +#include "extensions/quic_listeners/quiche/envoy_quic_client_session.h" +#include "extensions/quic_listeners/quiche/envoy_quic_client_connection.h" +#include "extensions/quic_listeners/quiche/envoy_quic_fake_proof_verifier.h" +#include "extensions/quic_listeners/quiche/envoy_quic_connection_helper.h" +#include "extensions/quic_listeners/quiche/envoy_quic_alarm_factory.h" +#include "extensions/quic_listeners/quiche/envoy_quic_packet_writer.h" + +namespace Envoy { +namespace Quic { + +class CodecClientCallbacksForTest : public Http::CodecClientCallbacks { +public: + void onStreamDestroy() override {} + + void onStreamReset(Http::StreamResetReason reason) override { + last_stream_reset_reason_ = reason; + } + + Http::StreamResetReason last_stream_reset_reason_{Http::StreamResetReason::LocalReset}; +}; + +class QuicHttpIntegrationTest : public testing::TestWithParam, + public HttpIntegrationTest { +public: + QuicHttpIntegrationTest() + : HttpIntegrationTest(Http::CodecClient::Type::HTTP3, GetParam(), + ConfigHelper::QUIC_HTTP_PROXY_CONFIG), + supported_versions_(quic::CurrentSupportedVersions()), + crypto_config_(std::make_unique()), conn_helper_(*dispatcher_), + alarm_factory_(*dispatcher_, *conn_helper_.GetClock()) {} + + Network::ClientConnectionPtr makeClientConnection(uint32_t port) override { + Network::Address::InstanceConstSharedPtr server_addr = Network::Utility::resolveUrl( + fmt::format("udp://{}:{}", Network::Test::getLoopbackAddressUrlString(version_), port)); + Network::Address::InstanceConstSharedPtr local_addr = + Network::Test::getCanonicalLoopbackAddress(version_); + // Initiate a QUIC connection with the highest supported version. If not + // supported by server, this connection will fail. + // TODO(danzh) Implement retry upon version mismatch and modify test frame work to specify a + // different version set on server side to test that. + auto connection = std::make_unique( + getNextServerDesignatedConnectionId(), server_addr, conn_helper_, alarm_factory_, + quic::ParsedQuicVersionVector{supported_versions_[0]}, local_addr, *dispatcher_, nullptr); + auto session = std::make_unique( + quic_config_, supported_versions_, std::move(connection), server_id_, &crypto_config_, + &push_promise_index_, *dispatcher_, 0); + session->Initialize(); + return session; + } + + // This call may fail because of INVALID_VERSION, because QUIC connection doesn't support + // in-connection version negotiation. + // TODO(#8479) Propagate INVALID_VERSION error to caller and let caller to use server advertised + // version list to create a new connection with mutually supported version and make client codec + // again. + IntegrationCodecClientPtr makeRawHttpConnection(Network::ClientConnectionPtr&& conn) override { + IntegrationCodecClientPtr codec = HttpIntegrationTest::makeRawHttpConnection(std::move(conn)); + if (codec->disconnected()) { + // Connection may get closed during version negotiation or handshake. + ENVOY_LOG(error, "Fail to connect to server with error: {}", + codec->connection()->transportFailureReason()); + } else { + codec->setCodecClientCallbacks(client_codec_callback_); + } + return codec; + } + + quic::QuicConnectionId getNextServerDesignatedConnectionId() { + quic::QuicCryptoClientConfig::CachedState* cached = crypto_config_.LookupOrCreate(server_id_); + // If the cached state indicates that we should use a server-designated + // connection ID, then return that connection ID. + quic::QuicConnectionId conn_id = cached->has_server_designated_connection_id() + ? cached->GetNextServerDesignatedConnectionId() + : quic::EmptyQuicConnectionId(); + return conn_id.IsEmpty() ? quic::QuicUtils::CreateRandomConnectionId() : conn_id; + } + + void initialize() override { + config_helper_.addConfigModifier([](envoy::config::bootstrap::v2::Bootstrap& bootstrap) { + envoy::api::v2::auth::DownstreamTlsContext tls_context; + ConfigHelper::initializeTls({}, *tls_context.mutable_common_tls_context()); + auto* filter_chain = + bootstrap.mutable_static_resources()->mutable_listeners(0)->mutable_filter_chains(0); + auto* transport_socket = filter_chain->mutable_transport_socket(); + TestUtility::jsonConvert(tls_context, *transport_socket->mutable_config()); + }); + config_helper_.addConfigModifier( + [](envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager& + hcm) { + hcm.mutable_delayed_close_timeout()->set_nanos(0); + EXPECT_EQ(hcm.codec_type(), envoy::config::filter::network::http_connection_manager::v2:: + HttpConnectionManager::HTTP3); + }); + + HttpIntegrationTest::initialize(); + registerTestServerPorts({"http"}); + } + +protected: + quic::QuicConfig quic_config_; + quic::QuicServerId server_id_{"example.com", 443, false}; + quic::QuicClientPushPromiseIndex push_promise_index_; + quic::ParsedQuicVersionVector supported_versions_; + quic::QuicCryptoClientConfig crypto_config_; + EnvoyQuicConnectionHelper conn_helper_; + EnvoyQuicAlarmFactory alarm_factory_; + CodecClientCallbacksForTest client_codec_callback_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, QuicHttpIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +TEST_P(QuicHttpIntegrationTest, GetRequestAndEmptyResponse) { + testRouterHeaderOnlyRequestAndResponse(); +} + +TEST_P(QuicHttpIntegrationTest, GetRequestAndResponseWithBody) { + initialize(); + sendRequestAndVerifyResponse(default_request_headers_, /*request_size=*/0, + default_response_headers_, /*response_size=*/1024, + /*backend_index*/ 0); +} + +TEST_P(QuicHttpIntegrationTest, PostRequestAndResponseWithBody) { + testRouterRequestAndResponseWithBody(1024, 512, false); +} + +TEST_P(QuicHttpIntegrationTest, PostRequestWithBigHeadersAndResponseWithBody) { + testRouterRequestAndResponseWithBody(1024, 512, true); +} + +TEST_P(QuicHttpIntegrationTest, RouterUpstreamDisconnectBeforeRequestcomplete) { + testRouterUpstreamDisconnectBeforeRequestComplete(); +} + +TEST_P(QuicHttpIntegrationTest, RouterUpstreamDisconnectBeforeResponseComplete) { + testRouterUpstreamDisconnectBeforeResponseComplete(); + EXPECT_EQ(Http::StreamResetReason::RemoteReset, client_codec_callback_.last_stream_reset_reason_); +} + +TEST_P(QuicHttpIntegrationTest, RouterDownstreamDisconnectBeforeRequestComplete) { + testRouterDownstreamDisconnectBeforeRequestComplete(); +} + +TEST_P(QuicHttpIntegrationTest, RouterDownstreamDisconnectBeforeResponseComplete) { + testRouterDownstreamDisconnectBeforeResponseComplete(); +} + +TEST_P(QuicHttpIntegrationTest, RouterUpstreamResponseBeforeRequestComplete) { + testRouterUpstreamResponseBeforeRequestComplete(); +} + +TEST_P(QuicHttpIntegrationTest, Retry) { testRetry(); } + +TEST_P(QuicHttpIntegrationTest, UpstreamReadDisabledOnGiantResponseBody) { + config_helper_.setBufferLimits(/*upstream_buffer_limit=*/1024, /*downstream_buffer_limit=*/1024); + testRouterRequestAndResponseWithBody(/*request_size=*/512, /*response_size=*/1024 * 1024, false); +} + +TEST_P(QuicHttpIntegrationTest, DownstreamReadDisabledOnGiantPost) { + config_helper_.setBufferLimits(/*upstream_buffer_limit=*/1024, /*downstream_buffer_limit=*/1024); + testRouterRequestAndResponseWithBody(/*request_size=*/1024 * 1024, /*response_size=*/1024, false); +} + +} // namespace Quic +} // namespace Envoy diff --git a/test/extensions/quic_listeners/quiche/test_utils.h b/test/extensions/quic_listeners/quiche/test_utils.h index 6efb3678a5dcf..690191eda8c13 100644 --- a/test/extensions/quic_listeners/quiche/test_utils.h +++ b/test/extensions/quic_listeners/quiche/test_utils.h @@ -7,8 +7,10 @@ #pragma GCC diagnostic ignored "-Winvalid-offsetof" #include "quiche/quic/core/http/quic_spdy_session.h" +#include "quiche/quic/core/http/quic_spdy_client_session.h" #include "quiche/quic/test_tools/quic_test_utils.h" #include "quiche/quic/core/quic_utils.h" +#include "quiche/quic/test_tools/crypto_test_utils.h" #pragma GCC diagnostic pop @@ -22,7 +24,7 @@ class MockEnvoyQuicSession : public quic::QuicSpdySession, public QuicFilterMana EnvoyQuicConnection* connection, Event::Dispatcher& dispatcher, uint32_t send_buffer_limit) : quic::QuicSpdySession(connection, /*visitor=*/nullptr, config, supported_versions), - QuicFilterManagerConnectionImpl(connection, dispatcher, send_buffer_limit) { + QuicFilterManagerConnectionImpl(*connection, dispatcher, send_buffer_limit) { crypto_stream_ = std::make_unique(this); } @@ -53,5 +55,41 @@ class MockEnvoyQuicSession : public quic::QuicSpdySession, public QuicFilterMana std::unique_ptr crypto_stream_; }; +class MockEnvoyQuicClientSession : public quic::QuicSpdyClientSession, + public QuicFilterManagerConnectionImpl { +public: + MockEnvoyQuicClientSession(const quic::QuicConfig& config, + const quic::ParsedQuicVersionVector& supported_versions, + EnvoyQuicConnection* connection, Event::Dispatcher& dispatcher, + uint32_t send_buffer_limit) + : quic::QuicSpdyClientSession(config, supported_versions, connection, + quic::QuicServerId("example.com", 443, false), &crypto_config_, + nullptr), + QuicFilterManagerConnectionImpl(*connection, dispatcher, send_buffer_limit), + crypto_config_(quic::test::crypto_test_utils::ProofVerifierForTesting()) {} + + // From QuicSession. + MOCK_METHOD1(CreateIncomingStream, quic::QuicSpdyClientStream*(quic::QuicStreamId id)); + MOCK_METHOD1(CreateIncomingStream, quic::QuicSpdyClientStream*(quic::PendingStream* pending)); + MOCK_METHOD0(CreateOutgoingBidirectionalStream, quic::QuicSpdyClientStream*()); + MOCK_METHOD0(CreateOutgoingUnidirectionalStream, quic::QuicSpdyClientStream*()); + MOCK_METHOD1(ShouldCreateIncomingStream, bool(quic::QuicStreamId id)); + MOCK_METHOD0(ShouldCreateOutgoingBidirectionalStream, bool()); + MOCK_METHOD0(ShouldCreateOutgoingUnidirectionalStream, bool()); + MOCK_METHOD5(WritevData, + quic::QuicConsumedData(quic::QuicStream* stream, quic::QuicStreamId id, + size_t write_length, quic::QuicStreamOffset offset, + quic::StreamSendingState state)); + + absl::string_view requestedServerName() const override { + return {GetCryptoStream()->crypto_negotiated_params().sni}; + } + + using quic::QuicSpdySession::ActivateStream; + +private: + quic::QuicCryptoClientConfig crypto_config_; +}; + } // namespace Quic } // namespace Envoy diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index 14698039c2f24..97974a44fce7a 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -46,6 +46,9 @@ typeToCodecType(Http::CodecClient::Type type) { case Http::CodecClient::Type::HTTP2: return envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager:: HTTP2; + case Http::CodecClient::Type::HTTP3: + return envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager:: + HTTP3; default: RELEASE_ASSERT(0, ""); } @@ -180,6 +183,13 @@ void IntegrationCodecClient::ConnectionCallbacks::onEvent(Network::ConnectionEve parent_.disconnected_ = true; parent_.connection_->dispatcher().exit(); } else { + if (parent_.type() == CodecClient::Type::HTTP3 && !parent_.connected_) { + // Before handshake gets established, any connection failure should exit the loop. I.e. a QUIC + // connection may fail of INVALID_VERSION if both this client doesn't support any of the + // versions the server advertised before handshake established. In this case the connection is + // closed locally and this is in a blocking event loop. + parent_.connection_->dispatcher().exit(); + } parent_.disconnected_ = true; } } @@ -203,7 +213,7 @@ HttpIntegrationTest::makeRawHttpConnection(Network::ClientConnectionPtr&& conn) IntegrationCodecClientPtr HttpIntegrationTest::makeHttpConnection(Network::ClientConnectionPtr&& conn) { auto codec = makeRawHttpConnection(std::move(conn)); - EXPECT_TRUE(codec->connected()); + EXPECT_TRUE(codec->connected()) << codec->connection()->transportFailureReason(); return codec; } diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index 679c93fbce726..46b2ea2de3264 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -108,7 +108,7 @@ class HttpIntegrationTest : public BaseIntegrationTest { IntegrationCodecClientPtr makeHttpConnection(uint32_t port); // Makes a http connection object without checking its connected state. - IntegrationCodecClientPtr makeRawHttpConnection(Network::ClientConnectionPtr&& conn); + virtual IntegrationCodecClientPtr makeRawHttpConnection(Network::ClientConnectionPtr&& conn); // Makes a http connection object with asserting a connected state. IntegrationCodecClientPtr makeHttpConnection(Network::ClientConnectionPtr&& conn); diff --git a/test/integration/integration.h b/test/integration/integration.h index 66461a88a1c72..2e088e52390e8 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -138,7 +138,7 @@ struct ApiFilesystemConfig { /** * Test fixture for all integration tests. */ -class BaseIntegrationTest : Logger::Loggable { +class BaseIntegrationTest : protected Logger::Loggable { public: using TestTimeSystemPtr = std::unique_ptr; using InstanceConstSharedPtrFn = std::function; @@ -191,7 +191,7 @@ class BaseIntegrationTest : Logger::Loggable { void setUpstreamAddress(uint32_t upstream_index, envoy::api::v2::endpoint::LbEndpoint& endpoint) const; - Network::ClientConnectionPtr makeClientConnection(uint32_t port); + virtual Network::ClientConnectionPtr makeClientConnection(uint32_t port); void registerTestServerPorts(const std::vector& port_names); void createTestServer(const std::string& json_path, const std::vector& port_names); diff --git a/test/integration/xfcc_integration_test.cc b/test/integration/xfcc_integration_test.cc index e8f023302a263..56f4545c6f563 100644 --- a/test/integration/xfcc_integration_test.cc +++ b/test/integration/xfcc_integration_test.cc @@ -92,7 +92,7 @@ Network::TransportSocketFactoryPtr XfccIntegrationTest::createUpstreamSslContext std::move(cfg), *context_manager_, *upstream_stats_store, std::vector{}); } -Network::ClientConnectionPtr XfccIntegrationTest::makeClientConnection() { +Network::ClientConnectionPtr XfccIntegrationTest::makeTcpClientConnection() { Network::Address::InstanceConstSharedPtr address = Network::Utility::resolveUrl("tcp://" + Network::Test::getLoopbackAddressUrlString(version_) + ":" + std::to_string(lookupPort("http"))); @@ -147,7 +147,7 @@ void XfccIntegrationTest::initialize() { void XfccIntegrationTest::testRequestAndResponseWithXfccHeader(std::string previous_xfcc, std::string expected_xfcc) { - Network::ClientConnectionPtr conn = tls_ ? makeMtlsClientConnection() : makeClientConnection(); + Network::ClientConnectionPtr conn = tls_ ? makeMtlsClientConnection() : makeTcpClientConnection(); Http::TestHeaderMapImpl header_map; if (previous_xfcc.empty()) { header_map = Http::TestHeaderMapImpl{{":method", "GET"}, diff --git a/test/integration/xfcc_integration_test.h b/test/integration/xfcc_integration_test.h index 488ff98aad65d..5e1ad2082890d 100644 --- a/test/integration/xfcc_integration_test.h +++ b/test/integration/xfcc_integration_test.h @@ -46,7 +46,7 @@ class XfccIntegrationTest : public testing::TestWithParam