diff --git a/include/envoy/event/dispatcher.h b/include/envoy/event/dispatcher.h index 9fcf55707c99c..0e7865b656d83 100644 --- a/include/envoy/event/dispatcher.h +++ b/include/envoy/event/dispatcher.h @@ -161,7 +161,7 @@ class Dispatcher { * @param cb supplies the udp listener callbacks to invoke for listener events. * @return Network::ListenerPtr a new listener that is owned by the caller. */ - virtual Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr&& socket, + virtual Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr socket, Network::UdpListenerCallbacks& cb) PURE; /** * Allocates a timer. @see Timer for docs on how to use the timer. diff --git a/include/envoy/network/connection_handler.h b/include/envoy/network/connection_handler.h index 58f672c04641a..ea804eba5789e 100644 --- a/include/envoy/network/connection_handler.h +++ b/include/envoy/network/connection_handler.h @@ -52,6 +52,13 @@ class ConnectionHandler { */ virtual void removeListeners(uint64_t listener_tag) PURE; + /** + * Get the ``UdpListenerCallbacks`` associated with ``listener_tag``. This will be + * absl::nullopt for non-UDP listeners and for ``listener_tag`` values that have already been + * removed. + */ + virtual UdpListenerCallbacksOptRef getUdpListenerCallbacks(uint64_t listener_tag) PURE; + /** * Remove the filter chains and the connections in the listener. All connections owned * by the filter chains will be closed. Once all the connections are destroyed(connections @@ -126,6 +133,22 @@ class ConnectionHandler { }; using ActiveListenerPtr = std::unique_ptr; + + /** + * Used by ConnectionHandler to manage UDP listeners. + */ + class ActiveUdpListener : public virtual ActiveListener, public Network::UdpListenerCallbacks { + public: + ~ActiveUdpListener() override = default; + + /** + * Returns the worker index that ``data`` should be delivered to. The return value must be in + * the range [0, concurrency). 
+ */ + virtual uint32_t destination(const Network::UdpRecvData& data) const PURE; + }; + + using ActiveUdpListenerPtr = std::unique_ptr; }; using ConnectionHandlerPtr = std::unique_ptr; @@ -140,15 +163,16 @@ class ActiveUdpListenerFactory { /** * Creates an ActiveUdpListener object and a corresponding UdpListener * according to given config. + * @param worker_index The index of the worker this listener is being created on. * @param parent is the owner of the created ActiveListener objects. * @param dispatcher is used to create actual UDP listener. * @param config provides information needed to create ActiveUdpListener and * UdpListener objects. * @return the ActiveUdpListener created. */ - virtual ConnectionHandler::ActiveListenerPtr - createActiveUdpListener(ConnectionHandler& parent, Event::Dispatcher& disptacher, - Network::ListenerConfig& config) PURE; + virtual ConnectionHandler::ActiveUdpListenerPtr + createActiveUdpListener(uint32_t worker_index, ConnectionHandler& parent, + Event::Dispatcher& dispatcher, Network::ListenerConfig& config) PURE; /** * @return true if the UDP passing through listener doesn't form stateful connections. @@ -159,4 +183,4 @@ class ActiveUdpListenerFactory { using ActiveUdpListenerFactoryPtr = std::unique_ptr; } // namespace Network -} // namespace Envoy \ No newline at end of file +} // namespace Envoy diff --git a/include/envoy/network/listener.h b/include/envoy/network/listener.h index da708aa95d35b..f74d6416103ab 100644 --- a/include/envoy/network/listener.h +++ b/include/envoy/network/listener.h @@ -20,6 +20,10 @@ namespace Envoy { namespace Network { class ActiveUdpListenerFactory; +class UdpListenerWorkerRouter; + +using UdpListenerWorkerRouterOptRef = + absl::optional>; /** * ListenSocketFactory is a member of ListenConfig to provide listen socket. 
@@ -137,11 +141,17 @@ class ListenerConfig { virtual ActiveUdpListenerFactory* udpListenerFactory() PURE; /** - * @return factory pointer if writing on UDP socket, otherwise return - * nullptr. + * @return factory if writing on UDP socket, otherwise return + * nullopt. */ virtual UdpPacketWriterFactoryOptRef udpPacketWriterFactory() PURE; + /** + * @return the ``UdpListenerWorkerRouter`` for this listener. This will + * be non-empty iff this is a UDP listener. + */ + virtual UdpListenerWorkerRouterOptRef udpListenerWorkerRouter() PURE; + /** * @return traffic direction of the listener. */ @@ -246,7 +256,7 @@ class UdpListenerCallbacks { * * @param data UdpRecvData from the underlying socket. */ - virtual void onData(UdpRecvData& data) PURE; + virtual void onData(UdpRecvData&& data) PURE; /** * Called when the underlying socket is ready for read, before onData() is @@ -278,8 +288,26 @@ class UdpListenerCallbacks { * UdpListenerCallback */ virtual UdpPacketWriter& udpPacketWriter() PURE; + + /** + * Returns the index of this worker, in the range of [0, concurrency). + */ + virtual uint32_t workerIndex() const PURE; + + /** + * Called whenever data is received on the underlying udp socket, on + * the destination worker for the datagram according to ``destination()``. + */ + virtual void onDataWorker(Network::UdpRecvData&& data) PURE; + + /** + * Posts ``data`` to be delivered on this worker. + */ + virtual void post(Network::UdpRecvData&& data) PURE; }; +using UdpListenerCallbacksOptRef = absl::optional>; + /** * An abstract socket listener. Free the listener to stop listening on the socket. */ @@ -337,9 +365,44 @@ class UdpListener : public virtual Listener { * @return the error code of the underlying flush api. */ virtual Api::IoCallUint64Result flush() PURE; + + /** + * Make this listener readable at the beginning of the next event loop. 
+ * + * @note: it may become readable during the current loop if feature + * ``envoy.reloadable_features.activate_fds_next_event_loop`` is disabled. + */ + virtual void activateRead() PURE; }; using UdpListenerPtr = std::unique_ptr; +/** + * Handles delivering datagrams to the correct worker. + */ +class UdpListenerWorkerRouter { +public: + virtual ~UdpListenerWorkerRouter() = default; + + /** + * Registers a worker's callbacks for this listener. This worker must accept + * packets until it calls ``unregisterWorkerForListener``. + */ + virtual void registerWorkerForListener(UdpListenerCallbacks& listener) PURE; + + /** + * Unregisters a worker's callbacks for this listener. + */ + virtual void unregisterWorkerForListener(UdpListenerCallbacks& listener) PURE; + + /** + * Deliver ``data`` to the correct worker by calling ``onDataWorker()`` + * or ``post()`` on one of the registered workers. + */ + virtual void deliver(uint32_t dest_worker_index, UdpRecvData&& data) PURE; +}; + +using UdpListenerWorkerRouterPtr = std::unique_ptr; + } // namespace Network } // namespace Envoy diff --git a/include/envoy/server/worker.h b/include/envoy/server/worker.h index c2f273044150b..9d6ed578cfbe2 100644 --- a/include/envoy/server/worker.h +++ b/include/envoy/server/worker.h @@ -100,11 +100,12 @@ class WorkerFactory { virtual ~WorkerFactory() = default; /** + * @param index supplies the index of the worker, in the range of [0, concurrency). * @param overload_manager supplies the server's overload manager. * @param worker_name supplies the name of the worker, used for per-worker stats. * @return WorkerPtr a new worker. 
*/ - virtual WorkerPtr createWorker(OverloadManager& overload_manager, + virtual WorkerPtr createWorker(uint32_t index, OverloadManager& overload_manager, const std::string& worker_name) PURE; }; diff --git a/source/common/event/dispatcher_impl.cc b/source/common/event/dispatcher_impl.cc index 04b9cb63909c1..3ce0a54fdfb0e 100644 --- a/source/common/event/dispatcher_impl.cc +++ b/source/common/event/dispatcher_impl.cc @@ -144,7 +144,7 @@ Network::ListenerPtr DispatcherImpl::createListener(Network::SocketSharedPtr&& s backlog_size); } -Network::UdpListenerPtr DispatcherImpl::createUdpListener(Network::SocketSharedPtr&& socket, +Network::UdpListenerPtr DispatcherImpl::createUdpListener(Network::SocketSharedPtr socket, Network::UdpListenerCallbacks& cb) { ASSERT(isThreadSafe()); return std::make_unique(*this, std::move(socket), cb, timeSource()); diff --git a/source/common/event/dispatcher_impl.h b/source/common/event/dispatcher_impl.h index c81498c89f790..4b05b355410ca 100644 --- a/source/common/event/dispatcher_impl.h +++ b/source/common/event/dispatcher_impl.h @@ -62,7 +62,7 @@ class DispatcherImpl : Logger::Loggable, Network::ListenerPtr createListener(Network::SocketSharedPtr&& socket, Network::TcpListenerCallbacks& cb, bool bind_to_port, uint32_t backlog_size) override; - Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr&& socket, + Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr socket, Network::UdpListenerCallbacks& cb) override; TimerPtr createTimer(TimerCb cb) override; Event::SchedulableCallbackPtr createSchedulableCallback(std::function cb) override; diff --git a/source/common/network/udp_listener_impl.cc b/source/common/network/udp_listener_impl.cc index d2cbd0feb7337..e4f647196b2d8 100644 --- a/source/common/network/udp_listener_impl.cc +++ b/source/common/network/udp_listener_impl.cc @@ -93,7 +93,7 @@ void UdpListenerImpl::processPacket(Address::InstanceConstSharedPtr local_addres ASSERT(local_address != nullptr); 
UdpRecvData recvData{ {std::move(local_address), std::move(peer_address)}, std::move(buffer), receive_time}; - cb_.onData(recvData); + cb_.onData(std::move(recvData)); } void UdpListenerImpl::handleWriteCallback() { @@ -125,5 +125,39 @@ Api::IoCallUint64Result UdpListenerImpl::flush() { return cb_.udpPacketWriter().flush(); } +void UdpListenerImpl::activateRead() { file_event_->activate(Event::FileReadyType::Read); } + +UdpListenerWorkerRouterImpl::UdpListenerWorkerRouterImpl(uint32_t concurrency) + : workers_(concurrency) {} + +void UdpListenerWorkerRouterImpl::registerWorkerForListener(UdpListenerCallbacks& listener) { + absl::WriterMutexLock lock(&mutex_); + + ASSERT(listener.workerIndex() < workers_.size()); + ASSERT(workers_.at(listener.workerIndex()) == nullptr); + workers_.at(listener.workerIndex()) = &listener; +} + +void UdpListenerWorkerRouterImpl::unregisterWorkerForListener(UdpListenerCallbacks& listener) { + absl::WriterMutexLock lock(&mutex_); + + ASSERT(workers_.at(listener.workerIndex()) == &listener); + workers_.at(listener.workerIndex()) = nullptr; +} + +void UdpListenerWorkerRouterImpl::deliver(uint32_t dest_worker_index, UdpRecvData&& data) { + absl::ReaderMutexLock lock(&mutex_); + + ASSERT(dest_worker_index < workers_.size(), + "UdpListenerCallbacks::destination returned out-of-range value"); + auto* worker = workers_[dest_worker_index]; + + // When a listener is being removed, packets could be processed on some workers after the + // listener is removed from other workers, which could result in a nullptr for that worker. 
+ if (worker != nullptr) { + worker->post(std::move(data)); + } +} + } // namespace Network } // namespace Envoy diff --git a/source/common/network/udp_listener_impl.h b/source/common/network/udp_listener_impl.h index 67168fb1c7ee1..e857a0150f25d 100644 --- a/source/common/network/udp_listener_impl.h +++ b/source/common/network/udp_listener_impl.h @@ -36,6 +36,7 @@ class UdpListenerImpl : public BaseListenerImpl, const Address::InstanceConstSharedPtr& localAddress() const override; Api::IoCallUint64Result send(const UdpSendData& data) override; Api::IoCallUint64Result flush() override; + void activateRead() override; void processPacket(Address::InstanceConstSharedPtr local_address, Address::InstanceConstSharedPtr peer_address, Buffer::InstancePtr buffer, @@ -61,5 +62,19 @@ class UdpListenerImpl : public BaseListenerImpl, Event::FileEventPtr file_event_; }; +class UdpListenerWorkerRouterImpl : public UdpListenerWorkerRouter { +public: + explicit UdpListenerWorkerRouterImpl(uint32_t concurrency); + + // UdpListenerWorkerRouter + void registerWorkerForListener(UdpListenerCallbacks& listener) override; + void unregisterWorkerForListener(UdpListenerCallbacks& listener) override; + void deliver(uint32_t dest_worker_index, UdpRecvData&& data) override; + +private: + absl::Mutex mutex_; + std::vector workers_ ABSL_GUARDED_BY(mutex_); +}; + } // namespace Network } // namespace Envoy diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index c7d79484bbf62..3d8b03597149d 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -78,6 +78,7 @@ constexpr const char* runtime_features[] = { "envoy.reloadable_features.http2_skip_encoding_empty_trailers", "envoy.reloadable_features.listener_in_place_filterchain_update", "envoy.reloadable_features.overload_manager_disable_keepalive_drain_http2", + "envoy.reloadable_features.prefer_quic_kernel_bpf_packet_routing", 
"envoy.reloadable_features.preserve_query_string_in_path_redirects", "envoy.reloadable_features.preserve_upstream_date", "envoy.reloadable_features.stop_faking_paths", diff --git a/source/extensions/quic_listeners/quiche/BUILD b/source/extensions/quic_listeners/quiche/BUILD index 238792f4687d1..e259575024b18 100644 --- a/source/extensions/quic_listeners/quiche/BUILD +++ b/source/extensions/quic_listeners/quiche/BUILD @@ -409,10 +409,7 @@ envoy_cc_library( "//bazel:linux": ["udp_gso_batch_writer.cc"], "//conditions:default": [], }), - hdrs = select({ - "//bazel:linux": ["udp_gso_batch_writer.h"], - "//conditions:default": [], - }), + hdrs = ["udp_gso_batch_writer.h"], external_deps = ["quiche_quic_platform"], tags = ["nofips"], visibility = [ diff --git a/source/extensions/quic_listeners/quiche/active_quic_listener.cc b/source/extensions/quic_listeners/quiche/active_quic_listener.cc index 2d756875ad4fb..f4808adc52b0a 100644 --- a/source/extensions/quic_listeners/quiche/active_quic_listener.cc +++ b/source/extensions/quic_listeners/quiche/active_quic_listener.cc @@ -8,6 +8,7 @@ #include +#include "common/runtime/runtime_features.h" #include "extensions/quic_listeners/quiche/envoy_quic_alarm_factory.h" #include "extensions/quic_listeners/quiche/envoy_quic_connection_helper.h" #include "extensions/quic_listeners/quiche/envoy_quic_dispatcher.h" @@ -18,26 +19,27 @@ namespace Envoy { namespace Quic { -ActiveQuicListener::ActiveQuicListener(Event::Dispatcher& dispatcher, - Network::ConnectionHandler& parent, - Network::ListenerConfig& listener_config, - const quic::QuicConfig& quic_config, - Network::Socket::OptionsSharedPtr options, - const envoy::config::core::v3::RuntimeFeatureFlag& enabled) - : ActiveQuicListener(dispatcher, parent, +ActiveQuicListener::ActiveQuicListener( + uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher, + Network::ConnectionHandler& parent, Network::ListenerConfig& listener_config, + const quic::QuicConfig& quic_config, 
Network::Socket::OptionsSharedPtr options, + bool kernel_worker_routing, const envoy::config::core::v3::RuntimeFeatureFlag& enabled) + : ActiveQuicListener(worker_index, concurrency, dispatcher, parent, listener_config.listenSocketFactory().getListenSocket(), listener_config, - quic_config, std::move(options), enabled) {} - -ActiveQuicListener::ActiveQuicListener(Event::Dispatcher& dispatcher, - Network::ConnectionHandler& parent, - Network::SocketSharedPtr listen_socket, - Network::ListenerConfig& listener_config, - const quic::QuicConfig& quic_config, - Network::Socket::OptionsSharedPtr options, - const envoy::config::core::v3::RuntimeFeatureFlag& enabled) - : Server::ConnectionHandlerImpl::ActiveListenerImplBase(parent, &listener_config), + quic_config, std::move(options), kernel_worker_routing, enabled) {} + +ActiveQuicListener::ActiveQuicListener( + uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher, + Network::ConnectionHandler& parent, Network::SocketSharedPtr listen_socket, + Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config, + Network::Socket::OptionsSharedPtr options, bool kernel_worker_routing, + const envoy::config::core::v3::RuntimeFeatureFlag& enabled) + : Server::ActiveUdpListenerBase(worker_index, concurrency, parent, *listen_socket, + dispatcher.createUdpListener(listen_socket, *this), + &listener_config), dispatcher_(dispatcher), version_manager_(quic::CurrentSupportedVersions()), - listen_socket_(*listen_socket), enabled_(enabled, Runtime::LoaderSingleton::get()) { + kernel_worker_routing_(kernel_worker_routing), + enabled_(enabled, Runtime::LoaderSingleton::get()) { if (options != nullptr) { const bool ok = Network::Socket::applyOptions( options, listen_socket_, envoy::config::core::v3::SocketOption::STATE_BOUND); @@ -49,7 +51,7 @@ ActiveQuicListener::ActiveQuicListener(Event::Dispatcher& dispatcher, } listen_socket_.addOptions(options); } - udp_listener_ = 
dispatcher_.createUdpListener(std::move(listen_socket), *this); + quic::QuicRandom* const random = quic::QuicRandom::GetInstance(); random->RandBytes(random_seed_, sizeof(random_seed_)); crypto_config_ = std::make_unique( @@ -94,7 +96,11 @@ void ActiveQuicListener::onListenerShutdown() { udp_listener_.reset(); } -void ActiveQuicListener::onData(Network::UdpRecvData& data) { +void ActiveQuicListener::onDataWorker(Network::UdpRecvData&& data) { + if (!enabled_.enabled()) { + return; + } + quic::QuicSocketAddress peer_address( envoyIpAddressToQuicSocketAddress(data.addresses_.peer_->ip())); quic::QuicSocketAddress self_address( @@ -112,6 +118,12 @@ void ActiveQuicListener::onData(Network::UdpRecvData& data) { /*packet_headers=*/nullptr, /*headers_length=*/0, /*owns_header_buffer*/ false); quic_dispatcher_->ProcessPacket(self_address, peer_address, packet); + + if (quic_dispatcher_->HasChlosBuffered()) { + // If there are any buffered CHLOs, activate a read event for the next event loop to process + // them. + udp_listener_->activateRead(); + } } void ActiveQuicListener::onReadReady() { @@ -119,7 +131,17 @@ void ActiveQuicListener::onReadReady() { ENVOY_LOG(trace, "Quic listener {}: runtime disabled", config_->name()); return; } + + if (quic_dispatcher_->HasChlosBuffered()) { + event_loops_with_buffered_chlo_for_test_++; + } + quic_dispatcher_->ProcessBufferedChlos(kNumSessionsToCreatePerLoop); + + // If there were more buffered than the limit, schedule again for the next event loop. + if (quic_dispatcher_->HasChlosBuffered()) { + udp_listener_->activateRead(); + } } void ActiveQuicListener::onWriteReady(const Network::Socket& /*socket*/) { @@ -136,6 +158,49 @@ void ActiveQuicListener::shutdownListener() { quic_dispatcher_->StopAcceptingNewConnections(); } +uint32_t ActiveQuicListener::destination(const Network::UdpRecvData& data) const { + if (kernel_worker_routing_) { + // The kernel has already routed the packet correctly. Make it stay on the current worker. 
+ return worker_index_; + } + + // This implementation is not as performant as it could be. It will result in most packets being + // delivered by the kernel to the wrong worker, and then redirected to the correct worker. + // + // This could possibly be improved by keeping a global table of connection IDs, so that a new + // connection will add its connection ID to the table on the current worker, and so packets should + // be delivered to the correct worker by the kernel unless the client changes address. + + // This is a re-implementation of the same algorithm written in BPF in + // ``ActiveQuicListenerFactory::createActiveUdpListener`` + const uint64_t packet_length = data.buffer_->length(); + if (packet_length < 9) { + return worker_index_; + } + + uint8_t first_octet; + data.buffer_->copyOut(0, sizeof(first_octet), &first_octet); + + uint32_t connection_id_snippet; + if (first_octet & 0x80) { + // IETF QUIC long header. + // The connection id starts from 7th byte. + // Minimum length of a long header packet is 14. + if (packet_length < 14) { + return worker_index_; + } + + data.buffer_->copyOut(6, sizeof(connection_id_snippet), &connection_id_snippet); + } else { + // IETF QUIC short header, or gQUIC. + // The connection id starts from 2nd byte. 
+ data.buffer_->copyOut(1, sizeof(connection_id_snippet), &connection_id_snippet); + } + + connection_id_snippet = htonl(connection_id_snippet); + return connection_id_snippet % concurrency_; +} + ActiveQuicListenerFactory::ActiveQuicListenerFactory( const envoy::config::listener::v3::QuicProtocolOptions& config, uint32_t concurrency) : concurrency_(concurrency), enabled_(config.enabled()) { @@ -155,11 +220,12 @@ ActiveQuicListenerFactory::ActiveQuicListenerFactory( quic_config_.SetMaxUnidirectionalStreamsToSend(max_streams); } -Network::ConnectionHandler::ActiveListenerPtr -ActiveQuicListenerFactory::createActiveUdpListener(Network::ConnectionHandler& parent, - Event::Dispatcher& disptacher, - Network::ListenerConfig& config) { +Network::ConnectionHandler::ActiveUdpListenerPtr ActiveQuicListenerFactory::createActiveUdpListener( + uint32_t worker_index, Network::ConnectionHandler& parent, Event::Dispatcher& disptacher, + Network::ListenerConfig& config) { + bool kernel_worker_routing = false; std::unique_ptr options = std::make_unique(); + #if defined(SO_ATTACH_REUSEPORT_CBPF) && defined(__linux__) // This BPF filter reads the 1st word of QUIC connection id in the UDP payload and mods it by the // number of workers to get the socket index in the SO_REUSEPORT socket groups. QUIC packets @@ -194,32 +260,32 @@ ActiveQuicListenerFactory::createActiveUdpListener(Network::ConnectionHandler& p sock_fprog prog; // This option only needs to be applied once to any one of the sockets in SO_REUSEPORT socket // group. One of the listener will be created with this socket option. 
- absl::call_once(install_bpf_once_, [&]() { - if (concurrency_ > 1) { - prog.len = filter.size(); - prog.filter = filter.data(); - options->push_back(std::make_shared( - envoy::config::core::v3::SocketOption::STATE_BOUND, ENVOY_ATTACH_REUSEPORT_CBPF, - absl::string_view(reinterpret_cast(&prog), sizeof(prog)))); - } - }); -#else - if (concurrency_ > 1) { -#ifdef __APPLE__ - // Not support multiple listeners in Mac OS unless someone cares. This is because SO_REUSEPORT - // doesn't behave as expected in Mac OS.(#8794) - ENVOY_LOG(error, "Because SO_REUSEPORT doesn't guarantee stable hashing from network 5 tuple " - "to socket in Mac OS. QUIC connection is not stable with concurrency > 1"); + if (Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.prefer_quic_kernel_bpf_packet_routing")) { + absl::call_once(install_bpf_once_, [&]() { + if (concurrency_ > 1) { + prog.len = filter.size(); + prog.filter = filter.data(); + options->push_back(std::make_shared( + envoy::config::core::v3::SocketOption::STATE_BOUND, ENVOY_ATTACH_REUSEPORT_CBPF, + absl::string_view(reinterpret_cast(&prog), sizeof(prog)))); + } + }); + + kernel_worker_routing = true; + } + #else - ENVOY_LOG(warn, "BPF filter is not supported on this platform. QUIC won't support connection " - "migration and NAT port rebinding."); -#endif + if (concurrency_ != 1) { + ENVOY_LOG(warn, "Efficient routing of QUIC packets to the correct worker is not supported or " + "not implemented by Envoy on this platform. 
QUIC performance may be degraded."); } #endif - return std::make_unique(disptacher, parent, config, quic_config_, - std::move(options), enabled_); -} + return std::make_unique(worker_index, concurrency_, disptacher, parent, + config, quic_config_, std::move(options), + kernel_worker_routing, enabled_); +} } // namespace Quic } // namespace Envoy diff --git a/source/extensions/quic_listeners/quiche/active_quic_listener.h b/source/extensions/quic_listeners/quiche/active_quic_listener.h index 08b7807dfc4f6..878032406ea9a 100644 --- a/source/extensions/quic_listeners/quiche/active_quic_listener.h +++ b/source/extensions/quic_listeners/quiche/active_quic_listener.h @@ -18,39 +18,42 @@ namespace Quic { // QUIC specific UdpListenerCallbacks implementation which delegates incoming // packets, write signals and listener errors to QuicDispatcher. -class ActiveQuicListener : public Network::UdpListenerCallbacks, - public Server::ConnectionHandlerImpl::ActiveListenerImplBase, +class ActiveQuicListener : public Envoy::Server::ActiveUdpListenerBase, Logger::Loggable { public: // TODO(bencebeky): Tune this value. 
static const size_t kNumSessionsToCreatePerLoop = 16; - ActiveQuicListener(Event::Dispatcher& dispatcher, Network::ConnectionHandler& parent, - Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config, - Network::Socket::OptionsSharedPtr options, + ActiveQuicListener(uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher, + Network::ConnectionHandler& parent, Network::ListenerConfig& listener_config, + const quic::QuicConfig& quic_config, Network::Socket::OptionsSharedPtr options, + bool kernel_worker_routing, const envoy::config::core::v3::RuntimeFeatureFlag& enabled); - ActiveQuicListener(Event::Dispatcher& dispatcher, Network::ConnectionHandler& parent, - Network::SocketSharedPtr listen_socket, + ActiveQuicListener(uint32_t worker_index, uint32_t concurrency, Event::Dispatcher& dispatcher, + Network::ConnectionHandler& parent, Network::SocketSharedPtr listen_socket, Network::ListenerConfig& listener_config, const quic::QuicConfig& quic_config, - Network::Socket::OptionsSharedPtr options, + Network::Socket::OptionsSharedPtr options, bool kernel_worker_routing, const envoy::config::core::v3::RuntimeFeatureFlag& enabled); ~ActiveQuicListener() override; void onListenerShutdown(); + uint64_t eventLoopsWithBufferedChlosForTest() const { + return event_loops_with_buffered_chlo_for_test_; + } // Network::UdpListenerCallbacks - void onData(Network::UdpRecvData& data) override; void onReadReady() override; void onWriteReady(const Network::Socket& socket) override; void onReceiveError(Api::IoError::IoErrorCode /*error_code*/) override { // No-op. Quic can't do anything upon listener error. 
} Network::UdpPacketWriter& udpPacketWriter() override { return *udp_packet_writer_; } + void onDataWorker(Network::UdpRecvData&& data) override; + uint32_t destination(const Network::UdpRecvData& data) const override; // ActiveListenerImplBase - Network::Listener* listener() override { return udp_listener_.get(); } void pauseListening() override; void resumeListening() override; void shutdownListener() override; @@ -58,15 +61,18 @@ class ActiveQuicListener : public Network::UdpListenerCallbacks, private: friend class ActiveQuicListenerPeer; - Network::UdpListenerPtr udp_listener_; uint8_t random_seed_[16]; std::unique_ptr crypto_config_; Event::Dispatcher& dispatcher_; quic::QuicVersionManager version_manager_; std::unique_ptr quic_dispatcher_; - Network::Socket& listen_socket_; + const bool kernel_worker_routing_; Runtime::FeatureFlag enabled_; Network::UdpPacketWriter* udp_packet_writer_; + + // The number of runs of the event loop in which at least one CHLO was buffered. + // TODO(ggreenway): Consider making this a published stat, or some variation of this information. + uint64_t event_loops_with_buffered_chlo_for_test_{0}; }; using ActiveQuicListenerPtr = std::unique_ptr; @@ -79,9 +85,9 @@ class ActiveQuicListenerFactory : public Network::ActiveUdpListenerFactory, uint32_t concurrency); // Network::ActiveUdpListenerFactory. 
- Network::ConnectionHandler::ActiveListenerPtr - createActiveUdpListener(Network::ConnectionHandler& parent, Event::Dispatcher& disptacher, - Network::ListenerConfig& config) override; + Network::ConnectionHandler::ActiveUdpListenerPtr + createActiveUdpListener(uint32_t worker_index, Network::ConnectionHandler& parent, + Event::Dispatcher& disptacher, Network::ListenerConfig& config) override; bool isTransportConnectionless() const override { return false; } private: diff --git a/source/extensions/quic_listeners/quiche/udp_gso_batch_writer.h b/source/extensions/quic_listeners/quiche/udp_gso_batch_writer.h index 477ad8bdcdc7a..db4ebe9a3247c 100644 --- a/source/extensions/quic_listeners/quiche/udp_gso_batch_writer.h +++ b/source/extensions/quic_listeners/quiche/udp_gso_batch_writer.h @@ -1,5 +1,10 @@ #pragma once +#if !defined(__linux__) +#define UDP_GSO_BATCH_WRITER_COMPILETIME_SUPPORT 0 +#else +#define UDP_GSO_BATCH_WRITER_COMPILETIME_SUPPORT 1 + #pragma GCC diagnostic push // QUICHE allows unused parameters. 
#pragma GCC diagnostic ignored "-Wunused-parameter" @@ -122,3 +127,5 @@ class UdpGsoBatchWriterFactory : public Network::UdpPacketWriterFactory { } // namespace Quic } // namespace Envoy + +#endif // defined(__linux__) diff --git a/source/server/BUILD b/source/server/BUILD index dc82ca7ef502a..bc938e92819e3 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -328,6 +328,7 @@ envoy_cc_library( "//source/common/network:connection_balancer_lib", "//source/common/network:filter_matcher_lib", "//source/common/network:listen_socket_lib", + "//source/common/network:listener_lib", "//source/common/network:resolver_lib", "//source/common/network:socket_option_factory_lib", "//source/common/network:utility_lib", diff --git a/source/server/active_raw_udp_listener_config.cc b/source/server/active_raw_udp_listener_config.cc index f34abe2fcb0ef..a3aa7b71918ff 100644 --- a/source/server/active_raw_udp_listener_config.cc +++ b/source/server/active_raw_udp_listener_config.cc @@ -11,11 +11,16 @@ namespace Envoy { namespace Server { -Network::ConnectionHandler::ActiveListenerPtr -ActiveRawUdpListenerFactory::createActiveUdpListener(Network::ConnectionHandler& parent, +ActiveRawUdpListenerFactory::ActiveRawUdpListenerFactory(uint32_t concurrency) + : concurrency_(concurrency) {} + +Network::ConnectionHandler::ActiveUdpListenerPtr +ActiveRawUdpListenerFactory::createActiveUdpListener(uint32_t worker_index, + Network::ConnectionHandler& parent, Event::Dispatcher& dispatcher, Network::ListenerConfig& config) { - return std::make_unique(parent, dispatcher, config); + return std::make_unique(worker_index, concurrency_, parent, dispatcher, + config); } ProtobufTypes::MessagePtr ActiveRawUdpListenerConfigFactory::createEmptyConfigProto() { @@ -24,8 +29,8 @@ ProtobufTypes::MessagePtr ActiveRawUdpListenerConfigFactory::createEmptyConfigPr Network::ActiveUdpListenerFactoryPtr ActiveRawUdpListenerConfigFactory::createActiveUdpListenerFactory( - const Protobuf::Message& /*message*/, 
uint32_t /*concurrency*/) { - return std::make_unique(); + const Protobuf::Message& /*message*/, uint32_t concurrency) { + return std::make_unique(concurrency); } std::string ActiveRawUdpListenerConfigFactory::name() const { diff --git a/source/server/active_raw_udp_listener_config.h b/source/server/active_raw_udp_listener_config.h index da0216301de65..ce718dc8295d9 100644 --- a/source/server/active_raw_udp_listener_config.h +++ b/source/server/active_raw_udp_listener_config.h @@ -9,11 +9,16 @@ namespace Server { class ActiveRawUdpListenerFactory : public Network::ActiveUdpListenerFactory { public: - Network::ConnectionHandler::ActiveListenerPtr - createActiveUdpListener(Network::ConnectionHandler& parent, Event::Dispatcher& disptacher, - Network::ListenerConfig& config) override; + ActiveRawUdpListenerFactory(uint32_t concurrency); + + Network::ConnectionHandler::ActiveUdpListenerPtr + createActiveUdpListener(uint32_t worker_index, Network::ConnectionHandler& parent, + Event::Dispatcher& disptacher, Network::ListenerConfig& config) override; bool isTransportConnectionless() const override { return true; } + +private: + const uint32_t concurrency_; }; // This class uses a protobuf config to create a UDP listener factory which diff --git a/source/server/admin/admin.h b/source/server/admin/admin.h index 8ed948da85b13..945885ac39098 100644 --- a/source/server/admin/admin.h +++ b/source/server/admin/admin.h @@ -345,6 +345,9 @@ class AdminImpl : public Admin, Network::UdpPacketWriterFactoryOptRef udpPacketWriterFactory() override { NOT_REACHED_GCOVR_EXCL_LINE; } + Network::UdpListenerWorkerRouterOptRef udpListenerWorkerRouter() override { + NOT_REACHED_GCOVR_EXCL_LINE; + } envoy::config::core::v3::TrafficDirection direction() const override { return envoy::config::core::v3::UNSPECIFIED; } diff --git a/source/server/config_validation/server.h b/source/server/config_validation/server.h index 11fe1c5f84edb..a82364038ad94 100644 --- 
a/source/server/config_validation/server.h +++ b/source/server/config_validation/server.h @@ -156,7 +156,7 @@ class ValidationInstance final : Logger::Loggable, uint64_t nextListenerTag() override { return 0; } // Server::WorkerFactory - WorkerPtr createWorker(OverloadManager&, const std::string&) override { + WorkerPtr createWorker(uint32_t, OverloadManager&, const std::string&) override { // Returned workers are not currently used so we can return nothing here safely vs. a // validation mock. return nullptr; diff --git a/source/server/connection_handler_impl.cc b/source/server/connection_handler_impl.cc index 70f69b8185b86..4c3c431c42915 100644 --- a/source/server/connection_handler_impl.cc +++ b/source/server/connection_handler_impl.cc @@ -26,9 +26,10 @@ void emitLogs(Network::ListenerConfig& config, StreamInfo::StreamInfo& stream_in } } // namespace -ConnectionHandlerImpl::ConnectionHandlerImpl(Event::Dispatcher& dispatcher) - : dispatcher_(dispatcher), per_handler_stat_prefix_(dispatcher.name() + "."), - disable_listeners_(false) {} +ConnectionHandlerImpl::ConnectionHandlerImpl(Event::Dispatcher& dispatcher, + absl::optional worker_index) + : worker_index_(worker_index), dispatcher_(dispatcher), + per_handler_stat_prefix_(dispatcher.name() + "."), disable_listeners_(false) {} void ConnectionHandlerImpl::incNumConnections() { ++num_handler_connections_; } @@ -44,19 +45,23 @@ void ConnectionHandlerImpl::addListener(absl::optional overridden_list if (overridden_listener.has_value()) { for (auto& listener : listeners_) { if (listener.second.listener_->listenerTag() == overridden_listener) { - listener.second.tcp_listener_->get().updateListenerConfig(config); + listener.second.tcpListener()->get().updateListenerConfig(config); return; } } NOT_REACHED_GCOVR_EXCL_LINE; } auto tcp_listener = std::make_unique(*this, config); - details.tcp_listener_ = *tcp_listener; + details.typed_listener_ = *tcp_listener; details.listener_ = std::move(tcp_listener); } else { 
ASSERT(config.udpListenerFactory() != nullptr, "UDP listener factory is not initialized."); - details.listener_ = - config.udpListenerFactory()->createActiveUdpListener(*this, dispatcher_, config); + ASSERT(worker_index_.has_value()); + ConnectionHandler::ActiveUdpListenerPtr udp_listener = + config.udpListenerFactory()->createActiveUdpListener(*worker_index_, *this, dispatcher_, + config); + details.typed_listener_ = *udp_listener; + details.listener_ = std::move(udp_listener); } if (disable_listeners_) { details.listener_->pauseListening(); @@ -74,12 +79,39 @@ void ConnectionHandlerImpl::removeListeners(uint64_t listener_tag) { } } +ConnectionHandlerImpl::ActiveListenerDetailsOptRef +ConnectionHandlerImpl::findActiveListenerByTag(uint64_t listener_tag) { + // TODO(mattklein123): We should probably use a hash table here to lookup the tag + // instead of iterating through the listener list. + for (auto& listener : listeners_) { + if (listener.second.listener_->listener() != nullptr && + listener.second.listener_->listenerTag() == listener_tag) { + return listener.second; + } + } + + return absl::nullopt; +} + +Network::UdpListenerCallbacksOptRef +ConnectionHandlerImpl::getUdpListenerCallbacks(uint64_t listener_tag) { + auto listener = findActiveListenerByTag(listener_tag); + if (listener.has_value()) { + // If the tag matches this must be a UDP listener. 
+ auto udp_listener = listener->get().udpListener(); + ASSERT(udp_listener.has_value()); + return udp_listener; + } + + return absl::nullopt; +} + void ConnectionHandlerImpl::removeFilterChains( uint64_t listener_tag, const std::list& filter_chains, std::function completion) { for (auto& listener : listeners_) { if (listener.second.listener_->listenerTag() == listener_tag) { - listener.second.tcp_listener_->get().deferredRemoveFilterChains(filter_chains); + listener.second.tcpListener()->get().deferredRemoveFilterChains(filter_chains); // Completion is deferred because the above removeFilterChains() may defer delete connection. Event::DeferredTaskUtil::deferredRun(dispatcher_, std::move(completion)); return; @@ -205,15 +237,14 @@ ConnectionHandlerImpl::findActiveTcpListenerByAddress(const Network::Address::In // We do not return stopped listeners. auto listener_it = std::find_if( listeners_.begin(), listeners_.end(), - [&address]( - const std::pair& p) { - return p.second.tcp_listener_.has_value() && p.second.listener_->listener() != nullptr && + [&address](std::pair& p) { + return p.second.tcpListener().has_value() && p.second.listener_->listener() != nullptr && p.first->type() == Network::Address::Type::Ip && *(p.first) == address; }); // If there is exact address match, return the corresponding listener. if (listener_it != listeners_.end()) { - return listener_it->second.tcp_listener_; + return listener_it->second.tcpListener(); } // Otherwise, we need to look for the wild card match, i.e., 0.0.0.0:[address_port]. 
@@ -223,11 +254,16 @@ ConnectionHandlerImpl::findActiveTcpListenerByAddress(const Network::Address::In listeners_.begin(), listeners_.end(), [&address]( const std::pair& p) { - return p.second.tcp_listener_.has_value() && p.second.listener_->listener() != nullptr && + return absl::holds_alternative>( + p.second.typed_listener_) && + p.second.listener_->listener() != nullptr && p.first->type() == Network::Address::Type::Ip && p.first->ip()->port() == address.ip()->port() && p.first->ip()->isAnyAddress(); }); - return (listener_it != listeners_.end()) ? listener_it->second.tcp_listener_ : absl::nullopt; + return (listener_it != listeners_.end()) + ? ActiveTcpListenerOptRef(absl::get>( + listener_it->second.typed_listener_)) + : absl::nullopt; } void ConnectionHandlerImpl::ActiveTcpSocket::onTimeout() { @@ -477,21 +513,18 @@ void ConnectionHandlerImpl::ActiveTcpListener::post(Network::ConnectionSocketPtr parent_.dispatcher_.post( [socket_to_rebalance, tag = config_->listenerTag(), &parent = parent_]() { - // TODO(mattklein123): We should probably use a hash table here to lookup the tag instead of - // iterating through the listener list. - for (const auto& listener : parent.listeners_) { - if (listener.second.listener_->listener() != nullptr && - listener.second.listener_->listenerTag() == tag) { - // If the tag matches this must be a TCP listener. - ASSERT(listener.second.tcp_listener_.has_value()); - listener.second.tcp_listener_.value().get().onAcceptWorker( - std::move(socket_to_rebalance->socket), - listener.second.tcp_listener_.value() - .get() - .config_->handOffRestoredDestinationConnections(), - true); - return; - } + auto listener = parent.findActiveListenerByTag(tag); + if (listener.has_value()) { + // If the tag matches this must be a TCP listener. 
+ ASSERT(absl::holds_alternative>( + listener->get().typed_listener_)); + auto& tcp_listener = + absl::get>(listener->get().typed_listener_) + .get(); + tcp_listener.onAcceptWorker(std::move(socket_to_rebalance->socket), + tcp_listener.config_->handOffRestoredDestinationConnections(), + true); + return; } }); } @@ -542,33 +575,102 @@ ConnectionHandlerImpl::ActiveTcpConnection::~ActiveTcpConnection() { listener.parent_.decNumConnections(); } -ActiveRawUdpListener::ActiveRawUdpListener(Network::ConnectionHandler& parent, +ConnectionHandlerImpl::ActiveTcpListenerOptRef +ConnectionHandlerImpl::ActiveListenerDetails::tcpListener() { + auto* val = absl::get_if>(&typed_listener_); + return (val != nullptr) ? absl::make_optional(*val) : absl::nullopt; +} + +ConnectionHandlerImpl::UdpListenerCallbacksOptRef +ConnectionHandlerImpl::ActiveListenerDetails::udpListener() { + auto* val = absl::get_if>(&typed_listener_); + return (val != nullptr) ? absl::make_optional(*val) : absl::nullopt; +} + +ActiveUdpListenerBase::ActiveUdpListenerBase(uint32_t worker_index, uint32_t concurrency, + Network::ConnectionHandler& parent, + Network::Socket& listen_socket, + Network::UdpListenerPtr&& listener, + Network::ListenerConfig* config) + : ConnectionHandlerImpl::ActiveListenerImplBase(parent, config), worker_index_(worker_index), + concurrency_(concurrency), parent_(parent), listen_socket_(listen_socket), + udp_listener_(std::move(listener)) { + ASSERT(worker_index_ < concurrency_); + config_->udpListenerWorkerRouter()->get().registerWorkerForListener(*this); +} + +ActiveUdpListenerBase::~ActiveUdpListenerBase() { + config_->udpListenerWorkerRouter()->get().unregisterWorkerForListener(*this); +} + +void ActiveUdpListenerBase::post(Network::UdpRecvData&& data) { + ASSERT(!udp_listener_->dispatcher().isThreadSafe(), + "Shouldn't be post'ing if thread safe; use onWorkerData() instead."); + + // It is not possible to capture a unique_ptr because the post() API copies the lambda, so we must + 
// bundle the socket inside a shared_ptr that can be captured. + // TODO(mattklein123): It may be possible to change the post() API such that the lambda is only + // moved, but this is non-trivial and needs investigation. + auto data_to_post = std::make_shared(); + *data_to_post = std::move(data); + + udp_listener_->dispatcher().post( + [data_to_post, tag = config_->listenerTag(), &parent = parent_]() { + Network::UdpListenerCallbacksOptRef listener = parent.getUdpListenerCallbacks(tag); + if (listener.has_value()) { + listener->get().onDataWorker(std::move(*data_to_post)); + } + }); +} + +void ActiveUdpListenerBase::onData(Network::UdpRecvData&& data) { + uint32_t dest = worker_index_; + + // For concurrency == 1, the packet will always go to the current worker. + if (concurrency_ > 1) { + dest = destination(data); + ASSERT(dest < concurrency_); + } + + if (dest == worker_index_) { + onDataWorker(std::move(data)); + } else { + config_->udpListenerWorkerRouter()->get().deliver(dest, std::move(data)); + } +} + +ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::ConnectionHandler& parent, Event::Dispatcher& dispatcher, Network::ListenerConfig& config) - : ActiveRawUdpListener(parent, config.listenSocketFactory().getListenSocket(), dispatcher, - config) {} + : ActiveRawUdpListener(worker_index, concurrency, parent, + config.listenSocketFactory().getListenSocket(), dispatcher, config) {} -ActiveRawUdpListener::ActiveRawUdpListener(Network::ConnectionHandler& parent, +ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::ConnectionHandler& parent, Network::SocketSharedPtr listen_socket_ptr, Event::Dispatcher& dispatcher, Network::ListenerConfig& config) - : ActiveRawUdpListener(parent, *listen_socket_ptr, listen_socket_ptr, dispatcher, config) {} + : ActiveRawUdpListener(worker_index, concurrency, parent, *listen_socket_ptr, listen_socket_ptr, + dispatcher, config) {} 
-ActiveRawUdpListener::ActiveRawUdpListener(Network::ConnectionHandler& parent, +ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::ConnectionHandler& parent, Network::Socket& listen_socket, Network::SocketSharedPtr listen_socket_ptr, Event::Dispatcher& dispatcher, Network::ListenerConfig& config) - : ActiveRawUdpListener(parent, listen_socket, - dispatcher.createUdpListener(std::move(listen_socket_ptr), *this), - config) {} + : ActiveRawUdpListener(worker_index, concurrency, parent, listen_socket, + dispatcher.createUdpListener(listen_socket_ptr, *this), config) {} -ActiveRawUdpListener::ActiveRawUdpListener(Network::ConnectionHandler& parent, +ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::ConnectionHandler& parent, Network::Socket& listen_socket, Network::UdpListenerPtr&& listener, Network::ListenerConfig& config) - : ConnectionHandlerImpl::ActiveListenerImplBase(parent, &config), - udp_listener_(std::move(listener)), read_filter_(nullptr), listen_socket_(listen_socket) { + : ActiveUdpListenerBase(worker_index, concurrency, parent, listen_socket, std::move(listener), + &config), + read_filter_(nullptr) { // Create the filter chain on creating a new udp listener config_->filterChainFactory().createUdpListenerFilterChain(*this, *this); @@ -584,7 +686,7 @@ ActiveRawUdpListener::ActiveRawUdpListener(Network::ConnectionHandler& parent, listen_socket_.ioHandle(), config.listenerScope()); } -void ActiveRawUdpListener::onData(Network::UdpRecvData& data) { read_filter_->onData(data); } +void ActiveRawUdpListener::onDataWorker(Network::UdpRecvData&& data) { read_filter_->onData(data); } void ActiveRawUdpListener::onReadReady() {} diff --git a/source/server/connection_handler_impl.h b/source/server/connection_handler_impl.h index 3a6a252e3084f..3eb3e8e58c351 100644 --- a/source/server/connection_handler_impl.h +++ b/source/server/connection_handler_impl.h @@ -55,6 +55,8 @@ 
struct PerHandlerListenerStats { ALL_PER_HANDLER_LISTENER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) }; +class ActiveUdpListenerBase; + /** * Server side connection handler. This is used both by workers as well as the * main thread for non-threaded listeners. @@ -63,7 +65,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, NonCopyable, Logger::Loggable { public: - ConnectionHandlerImpl(Event::Dispatcher& dispatcher); + ConnectionHandlerImpl(Event::Dispatcher& dispatcher, absl::optional worker_index); // Network::ConnectionHandler uint64_t numConnections() const override { return num_handler_connections_; } @@ -72,6 +74,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, void addListener(absl::optional overridden_listener, Network::ListenerConfig& config) override; void removeListeners(uint64_t listener_tag) override; + Network::UdpListenerCallbacksOptRef getUdpListenerCallbacks(uint64_t listener_tag) override; void removeFilterChains(uint64_t listener_tag, const std::list& filter_chains, std::function completion) override; @@ -84,7 +87,7 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, /** * Wrapper for an active listener owned by this handler. */ - class ActiveListenerImplBase : public Network::ConnectionHandler::ActiveListener { + class ActiveListenerImplBase : public virtual Network::ConnectionHandler::ActiveListener { public: ActiveListenerImplBase(Network::ConnectionHandler& parent, Network::ListenerConfig* config); @@ -331,16 +334,28 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, }; using ActiveTcpListenerOptRef = absl::optional>; + using UdpListenerCallbacksOptRef = + absl::optional>; struct ActiveListenerDetails { // Strong pointer to the listener, whether TCP, UDP, QUIC, etc. Network::ConnectionHandler::ActiveListenerPtr listener_; - // Reference to the listener IFF this is a TCP listener. Null otherwise. 
- ActiveTcpListenerOptRef tcp_listener_; + + absl::variant, + std::reference_wrapper> + typed_listener_; + + // Helpers for accessing the data in the variant for cleaner code. + ActiveTcpListenerOptRef tcpListener(); + UdpListenerCallbacksOptRef udpListener(); }; + using ActiveListenerDetailsOptRef = absl::optional>; ActiveTcpListenerOptRef findActiveTcpListenerByAddress(const Network::Address::Instance& address); + ActiveListenerDetailsOptRef findActiveListenerByTag(uint64_t listener_tag); + // This has a value on worker threads, and no value on the main thread. + const absl::optional worker_index_; Event::Dispatcher& dispatcher_; const std::string per_handler_stat_prefix_; std::list> listeners_; @@ -348,34 +363,67 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler, bool disable_listeners_; }; +class ActiveUdpListenerBase : public ConnectionHandlerImpl::ActiveListenerImplBase, + public Network::ConnectionHandler::ActiveUdpListener { +public: + ActiveUdpListenerBase(uint32_t worker_index, uint32_t concurrency, + Network::ConnectionHandler& parent, Network::Socket& listen_socket, + Network::UdpListenerPtr&& listener, Network::ListenerConfig* config); + ~ActiveUdpListenerBase() override; + + // Network::UdpListenerCallbacks + void onData(Network::UdpRecvData&& data) final; + uint32_t workerIndex() const final { return worker_index_; } + void post(Network::UdpRecvData&& data) final; + + // ActiveListenerImplBase + Network::Listener* listener() override { return udp_listener_.get(); } + +protected: + uint32_t destination(const Network::UdpRecvData& /*data*/) const override { + // By default, route to the current worker. + return worker_index_; + } + + const uint32_t worker_index_; + const uint32_t concurrency_; + Network::ConnectionHandler& parent_; + Network::Socket& listen_socket_; + Network::UdpListenerPtr udp_listener_; +}; + /** * Wrapper for an active udp listener owned by this handler. 
*/ -class ActiveRawUdpListener : public Network::UdpListenerCallbacks, - public ConnectionHandlerImpl::ActiveListenerImplBase, +class ActiveRawUdpListener : public ActiveUdpListenerBase, public Network::UdpListenerFilterManager, public Network::UdpReadFilterCallbacks { public: - ActiveRawUdpListener(Network::ConnectionHandler& parent, Event::Dispatcher& dispatcher, + ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::ConnectionHandler& parent, Event::Dispatcher& dispatcher, Network::ListenerConfig& config); - ActiveRawUdpListener(Network::ConnectionHandler& parent, + ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::ConnectionHandler& parent, Network::SocketSharedPtr listen_socket_ptr, Event::Dispatcher& dispatcher, Network::ListenerConfig& config); - ActiveRawUdpListener(Network::ConnectionHandler& parent, Network::Socket& listen_socket, + ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::ConnectionHandler& parent, Network::Socket& listen_socket, Network::SocketSharedPtr listen_socket_ptr, Event::Dispatcher& dispatcher, Network::ListenerConfig& config); - ActiveRawUdpListener(Network::ConnectionHandler& parent, Network::Socket& listen_socket, + ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::ConnectionHandler& parent, Network::Socket& listen_socket, Network::UdpListenerPtr&& listener, Network::ListenerConfig& config); // Network::UdpListenerCallbacks - void onData(Network::UdpRecvData& data) override; void onReadReady() override; void onWriteReady(const Network::Socket& socket) override; void onReceiveError(Api::IoError::IoErrorCode error_code) override; Network::UdpPacketWriter& udpPacketWriter() override { return *udp_packet_writer_; } + // Network::UdpWorker + void onDataWorker(Network::UdpRecvData&& data) override; + // ActiveListenerImplBase - Network::Listener* listener() override { return udp_listener_.get(); } void pauseListening() override { 
udp_listener_->disable(); } void resumeListening() override { udp_listener_->enable(); } void shutdownListener() override { @@ -394,10 +442,8 @@ class ActiveRawUdpListener : public Network::UdpListenerCallbacks, Network::UdpListener& udpListener() override; private: - Network::UdpListenerPtr udp_listener_; Network::UdpListenerReadFilterPtr read_filter_; Network::UdpPacketWriterPtr udp_packet_writer_; - Network::Socket& listen_socket_; }; } // namespace Server diff --git a/source/server/listener_impl.cc b/source/server/listener_impl.cc index 70c13c4504ea2..3238108fa712a 100644 --- a/source/server/listener_impl.cc +++ b/source/server/listener_impl.cc @@ -19,6 +19,7 @@ #include "common/network/resolver_impl.h" #include "common/network/socket_option_factory.h" #include "common/network/socket_option_impl.h" +#include "common/network/udp_listener_impl.h" #include "common/network/utility.h" #include "common/protobuf/utility.h" #include "common/runtime/runtime_features.h" @@ -377,6 +378,9 @@ void ListenerImpl::buildUdpListenerFactory(Network::Socket::Type socket_type, ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig(udp_config, validation_visitor_, config_factory); udp_listener_factory_ = config_factory.createActiveUdpListenerFactory(*message, concurrency); + + udp_listener_worker_router_ = + std::make_unique(concurrency); } } diff --git a/source/server/listener_impl.h b/source/server/listener_impl.h index 1e9a26606628d..ec0119b32b2f1 100644 --- a/source/server/listener_impl.h +++ b/source/server/listener_impl.h @@ -305,6 +305,11 @@ class ListenerImpl final : public Network::ListenerConfig, Network::UdpPacketWriterFactoryOptRef udpPacketWriterFactory() override { return Network::UdpPacketWriterFactoryOptRef(std::ref(*udp_writer_factory_)); } + Network::UdpListenerWorkerRouterOptRef udpListenerWorkerRouter() override { + return udp_listener_worker_router_ + ? 
Network::UdpListenerWorkerRouterOptRef(*udp_listener_worker_router_) + : absl::nullopt; + } Network::ConnectionBalancer& connectionBalancer() override { return *connection_balancer_; } ResourceLimit& openConnections() override { return *open_connections_; } @@ -393,6 +398,7 @@ class ListenerImpl final : public Network::ListenerConfig, const bool continue_on_listener_filters_timeout_; Network::ActiveUdpListenerFactoryPtr udp_listener_factory_; Network::UdpPacketWriterFactoryPtr udp_writer_factory_; + Network::UdpListenerWorkerRouterPtr udp_listener_worker_router_; Network::ConnectionBalancerSharedPtr connection_balancer_; std::shared_ptr listener_factory_context_; FilterChainManagerImpl filter_chain_manager_; diff --git a/source/server/listener_manager_impl.cc b/source/server/listener_manager_impl.cc index 5a01aea28c473..8506d5cbc2862 100644 --- a/source/server/listener_manager_impl.cc +++ b/source/server/listener_manager_impl.cc @@ -261,7 +261,7 @@ ListenerManagerImpl::ListenerManagerImpl(Instance& server, enable_dispatcher_stats_(enable_dispatcher_stats) { for (uint32_t i = 0; i < server.options().concurrency(); i++) { workers_.emplace_back( - worker_factory.createWorker(server.overloadManager(), absl::StrCat("worker_", i))); + worker_factory.createWorker(i, server.overloadManager(), absl::StrCat("worker_", i))); } } diff --git a/source/server/listener_manager_impl.h b/source/server/listener_manager_impl.h index e502565dd910b..af4e333c1adb9 100644 --- a/source/server/listener_manager_impl.h +++ b/source/server/listener_manager_impl.h @@ -302,7 +302,7 @@ class ListenerManagerImpl : public ListenerManager, Logger::Loggable draining_listeners_; std::list draining_filter_chains_manager_; - std::list workers_; + std::vector workers_; bool workers_started_{}; absl::optional stop_listeners_type_; Stats::ScopePtr scope_; diff --git a/source/server/server.cc b/source/server/server.cc index abcc8e59f4311..1616e79c86dda 100644 --- a/source/server/server.cc +++ 
b/source/server/server.cc @@ -72,7 +72,7 @@ InstanceImpl::InstanceImpl( : absl::nullopt)), dispatcher_(api_->allocateDispatcher("main_thread")), singleton_manager_(new Singleton::ManagerImpl(api_->threadFactory())), - handler_(new ConnectionHandlerImpl(*dispatcher_)), + handler_(new ConnectionHandlerImpl(*dispatcher_, absl::nullopt)), random_generator_(std::move(random_generator)), listener_component_factory_(*this), worker_factory_(thread_local_, *api_, hooks), access_log_manager_(options.fileFlushIntervalMsec(), *api_, *dispatcher_, access_log_lock, diff --git a/source/server/worker_impl.cc b/source/server/worker_impl.cc index b5bbe8d91cbbc..1fd18f106618a 100644 --- a/source/server/worker_impl.cc +++ b/source/server/worker_impl.cc @@ -14,13 +14,12 @@ namespace Envoy { namespace Server { -WorkerPtr ProdWorkerFactory::createWorker(OverloadManager& overload_manager, +WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager, const std::string& worker_name) { Event::DispatcherPtr dispatcher(api_.allocateDispatcher(worker_name)); - return WorkerPtr{ - new WorkerImpl(tls_, hooks_, std::move(dispatcher), - Network::ConnectionHandlerPtr{new ConnectionHandlerImpl(*dispatcher)}, - overload_manager, api_)}; + return std::make_unique(tls_, hooks_, std::move(dispatcher), + std::make_unique(*dispatcher, index), + overload_manager, api_); } WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks, diff --git a/source/server/worker_impl.h b/source/server/worker_impl.h index 4161c1abcc0af..c4cb4a58c2b5b 100644 --- a/source/server/worker_impl.h +++ b/source/server/worker_impl.h @@ -23,7 +23,7 @@ class ProdWorkerFactory : public WorkerFactory, Logger::LoggableallocateDispatcher("test_thread")), socket_(std::make_shared( Network::Test::getCanonicalLoopbackAddress(GetParam()), nullptr, true)), - connection_handler_(new Server::ConnectionHandlerImpl(*dispatcher_)), name_("proxy"), - 
filter_chain_(Network::Test::createEmptyFilterChainWithRawBufferSockets()), + connection_handler_(new Server::ConnectionHandlerImpl(*dispatcher_, absl::nullopt)), + name_("proxy"), filter_chain_(Network::Test::createEmptyFilterChainWithRawBufferSockets()), init_manager_(nullptr) { EXPECT_CALL(socket_factory_, socketType()).WillOnce(Return(Network::Socket::Type::Stream)); EXPECT_CALL(socket_factory_, localAddress()).WillOnce(ReturnRef(socket_->localAddress())); @@ -72,6 +72,9 @@ class ProxyProtocolRegressionTest : public testing::TestWithParamallocateDispatcher("test_thread")), socket_(std::make_shared( Network::Test::getCanonicalLoopbackAddress(GetParam()), nullptr, true)), - connection_handler_(new Server::ConnectionHandlerImpl(*dispatcher_)), name_("proxy"), - filter_chain_(Network::Test::createEmptyFilterChainWithRawBufferSockets()), + connection_handler_(new Server::ConnectionHandlerImpl(*dispatcher_, absl::nullopt)), + name_("proxy"), filter_chain_(Network::Test::createEmptyFilterChainWithRawBufferSockets()), init_manager_(nullptr) { EXPECT_CALL(socket_factory_, socketType()).WillOnce(Return(Network::Socket::Type::Stream)); EXPECT_CALL(socket_factory_, localAddress()).WillOnce(ReturnRef(socket_->localAddress())); @@ -88,6 +88,9 @@ class ProxyProtocolTest : public testing::TestWithParamlocalAddress()->ip()->port())), - connection_handler_(new Server::ConnectionHandlerImpl(*dispatcher_)), name_("proxy"), - filter_chain_(Network::Test::createEmptyFilterChainWithRawBufferSockets()), + connection_handler_(new Server::ConnectionHandlerImpl(*dispatcher_, absl::nullopt)), + name_("proxy"), filter_chain_(Network::Test::createEmptyFilterChainWithRawBufferSockets()), init_manager_(nullptr) { EXPECT_CALL(socket_factory_, socketType()).WillOnce(Return(Network::Socket::Type::Stream)); EXPECT_CALL(socket_factory_, localAddress()).WillOnce(ReturnRef(socket_->localAddress())); @@ -1323,6 +1326,9 @@ class WildcardProxyProtocolTest : public 
testing::TestWithParamallocateDispatcher("test_thread")), clock_(*dispatcher_), local_address_(Network::Test::getCanonicalLoopbackAddress(version_)), - connection_handler_(*dispatcher_), quic_version_([]() { + connection_handler_(*dispatcher_, absl::nullopt), quic_version_([]() { if (GetParam().second == QuicVersionType::GquicQuicCrypto) { return quic::CurrentSupportedVersionsWithQuicCrypto(); } @@ -111,23 +112,27 @@ class ActiveQuicListenerTest : public QuicMultiVersionTest { ON_CALL(listener_config_, listenSocketFactory()).WillByDefault(ReturnRef(socket_factory_)); ON_CALL(socket_factory_, getListenSocket()).WillByDefault(Return(listen_socket_)); - // Use UdpGsoBatchWriter to perform non-batched writes for the purpose of this test + // Use UdpGsoBatchWriter to perform non-batched writes for the purpose of this test, if it is + // supported. ON_CALL(listener_config_, udpPacketWriterFactory()) .WillByDefault(Return( std::reference_wrapper(udp_packet_writer_factory_))); ON_CALL(udp_packet_writer_factory_, createUdpPacketWriter(_, _)) .WillByDefault(Invoke( [&](Network::IoHandle& io_handle, Stats::Scope& scope) -> Network::UdpPacketWriterPtr { - Network::UdpPacketWriterPtr udp_packet_writer = - std::make_unique(io_handle, scope); - return udp_packet_writer; +#if UDP_GSO_BATCH_WRITER_COMPILETIME_SUPPORT + return std::make_unique(io_handle, scope); +#else + UNREFERENCED_PARAMETER(scope); + return std::make_unique(io_handle); +#endif })); listener_factory_ = createQuicListenerFactory(yamlForQuicConfig()); EXPECT_CALL(listener_config_, filterChainManager()).WillOnce(ReturnRef(filter_chain_manager_)); quic_listener_ = staticUniquePointerCast(listener_factory_->createActiveUdpListener( - connection_handler_, *dispatcher_, listener_config_)); + 0, connection_handler_, *dispatcher_, listener_config_)); quic_dispatcher_ = ActiveQuicListenerPeer::quicDispatcher(*quic_listener_); quic::QuicCryptoServerConfig& crypto_config = 
ActiveQuicListenerPeer::cryptoConfig(*quic_listener_); @@ -137,6 +142,13 @@ class ActiveQuicListenerTest : public QuicMultiVersionTest { crypto_config_peer.ResetProofSource(std::move(proof_source)); simulated_time_system_.advanceTimeAndRun(std::chrono::milliseconds(100), *dispatcher_, Event::Dispatcher::RunType::NonBlock); + + // The state of whether client hellos can be buffered or not is different before and after + // the first packet processed by the listener. This only matters in tests. Force an event + // to get it into a consistent state. + dispatcher_->post([this]() { quic_listener_->onReadReady(); }); + + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); } Network::ActiveUdpListenerFactoryPtr createQuicListenerFactory(const std::string& yaml) { @@ -200,6 +212,14 @@ class ActiveQuicListenerTest : public QuicMultiVersionTest { auto send_rc = Network::Utility::writeToSocket(client_sockets_.back()->ioHandle(), slice.data(), 1, nullptr, *listen_socket_->localAddress()); ASSERT_EQ(slice[0].len_, send_rc.rc_); + +#if defined(__APPLE__) + // This sleep makes the tests pass more reliably. Some debugging showed that without this, + // no packet is received when the event loop is running. + // TODO(ggreenway): make tests more reliable, and handle packet loss during the tests, possibly + // by retransmitting on a timer. + ::usleep(1000); +#endif } void readFromClientSockets() { @@ -230,7 +250,9 @@ class ActiveQuicListenerTest : public QuicMultiVersionTest { } void TearDown() override { - quic_listener_->onListenerShutdown(); + if (quic_listener_ != nullptr) { + quic_listener_->onListenerShutdown(); + } // Trigger alarm to fire before listener destruction. 
dispatcher_->run(Event::Dispatcher::RunType::NonBlock); Runtime::LoaderSingleton::clear(); @@ -292,10 +314,11 @@ TEST_P(ActiveQuicListenerTest, FailSocketOptionUponCreation) { .WillOnce(Return(false)); auto options = std::make_shared>(); options->emplace_back(std::move(option)); + quic_listener_.reset(); EXPECT_THROW_WITH_REGEX( std::make_unique( - *dispatcher_, connection_handler_, listen_socket_, listener_config_, quic_config_, - options, + 0, 1, *dispatcher_, connection_handler_, listen_socket_, listener_config_, quic_config_, + options, false, ActiveQuicListenerFactoryPeer::runtimeEnabled( static_cast(listener_factory_.get()))), Network::CreateListenerException, "Failed to apply socket options."); @@ -315,50 +338,46 @@ TEST_P(ActiveQuicListenerTest, ReceiveCHLO) { TEST_P(ActiveQuicListenerTest, ProcessBufferedChlos) { quic::QuicBufferedPacketStore* const buffered_packets = quic::test::QuicDispatcherPeer::GetBufferedPackets(quic_dispatcher_); - maybeConfigureMocks(ActiveQuicListener::kNumSessionsToCreatePerLoop + 2); + const uint32_t count = (ActiveQuicListener::kNumSessionsToCreatePerLoop * 2) + 1; + maybeConfigureMocks(count); // Generate one more CHLO than can be processed immediately. - for (size_t i = 1; i <= ActiveQuicListener::kNumSessionsToCreatePerLoop + 1; ++i) { + for (size_t i = 1; i <= count; ++i) { sendCHLO(quic::test::TestConnectionId(i)); } dispatcher_->run(Event::Dispatcher::RunType::NonBlock); - // The first kNumSessionsToCreatePerLoop CHLOs are processed, - // the last one is buffered. 
- for (size_t i = 1; i <= ActiveQuicListener::kNumSessionsToCreatePerLoop; ++i) { - EXPECT_FALSE(buffered_packets->HasBufferedPackets(quic::test::TestConnectionId(i))); - } - EXPECT_TRUE(buffered_packets->HasBufferedPackets( - quic::test::TestConnectionId(ActiveQuicListener::kNumSessionsToCreatePerLoop + 1))); - EXPECT_TRUE(buffered_packets->HasChlosBuffered()); - EXPECT_FALSE(quic_dispatcher_->session_map().empty()); - - // Generate more data to trigger a socket read during the next event loop. - sendCHLO(quic::test::TestConnectionId(ActiveQuicListener::kNumSessionsToCreatePerLoop + 2)); - dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + // The first kNumSessionsToCreatePerLoop were processed immediately, the next + // kNumSessionsToCreatePerLoop were buffered for the next run of the event loop, and the last one + // was buffered to the subsequent event loop. + EXPECT_EQ(2, quic_listener_->eventLoopsWithBufferedChlosForTest()); - // The socket read results in processing all CHLOs. 
- for (size_t i = 1; i <= ActiveQuicListener::kNumSessionsToCreatePerLoop + 2; ++i) { + for (size_t i = 1; i <= count; ++i) { EXPECT_FALSE(buffered_packets->HasBufferedPackets(quic::test::TestConnectionId(i))); } EXPECT_FALSE(buffered_packets->HasChlosBuffered()); - + EXPECT_FALSE(quic_dispatcher_->session_map().empty()); readFromClientSockets(); } TEST_P(ActiveQuicListenerTest, QuicProcessingDisabledAndEnabled) { + maybeConfigureMocks(/* connection_count = */ 2); EXPECT_TRUE(ActiveQuicListenerPeer::enabled(*quic_listener_)); - Runtime::LoaderSingleton::getExisting()->mergeValues({{"quic.enabled", " false"}}); sendCHLO(quic::test::TestConnectionId(1)); dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_EQ(quic_dispatcher_->session_map().size(), 1); + + Runtime::LoaderSingleton::getExisting()->mergeValues({{"quic.enabled", " false"}}); + sendCHLO(quic::test::TestConnectionId(2)); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); // If listener was enabled, there should have been session created for active connection. 
- EXPECT_TRUE(quic_dispatcher_->session_map().empty()); + EXPECT_EQ(quic_dispatcher_->session_map().size(), 1); EXPECT_FALSE(ActiveQuicListenerPeer::enabled(*quic_listener_)); + Runtime::LoaderSingleton::getExisting()->mergeValues({{"quic.enabled", " true"}}); - maybeConfigureMocks(/* connection_count = */ 1); - sendCHLO(quic::test::TestConnectionId(1)); + sendCHLO(quic::test::TestConnectionId(2)); dispatcher_->run(Event::Dispatcher::RunType::NonBlock); - EXPECT_FALSE(quic_dispatcher_->session_map().empty()); + EXPECT_EQ(quic_dispatcher_->session_map().size(), 2); EXPECT_TRUE(ActiveQuicListenerPeer::enabled(*quic_listener_)); } diff --git a/test/extensions/quic_listeners/quiche/envoy_quic_dispatcher_test.cc b/test/extensions/quic_listeners/quiche/envoy_quic_dispatcher_test.cc index c8714642d0609..c5f97f59fc8f5 100644 --- a/test/extensions/quic_listeners/quiche/envoy_quic_dispatcher_test.cc +++ b/test/extensions/quic_listeners/quiche/envoy_quic_dispatcher_test.cc @@ -76,7 +76,7 @@ class EnvoyQuicDispatcherTest : public QuicMultiVersionTest, per_worker_stats_({ALL_PER_HANDLER_LISTENER_STATS( POOL_COUNTER_PREFIX(listener_config_.listenerScope(), "worker."), POOL_GAUGE_PREFIX(listener_config_.listenerScope(), "worker."))}), - connection_handler_(*dispatcher_), + connection_handler_(*dispatcher_, absl::nullopt), envoy_quic_dispatcher_( &crypto_config_, quic_config_, &version_manager_, std::make_unique(*dispatcher_), diff --git a/test/extensions/quic_listeners/quiche/integration/BUILD b/test/extensions/quic_listeners/quiche/integration/BUILD index a36af5d08dee5..09dab3f9b0029 100644 --- a/test/extensions/quic_listeners/quiche/integration/BUILD +++ b/test/extensions/quic_listeners/quiche/integration/BUILD @@ -31,6 +31,7 @@ envoy_cc_test( "//test/extensions/quic_listeners/quiche:quic_test_utils_for_envoy_lib", "//test/extensions/quic_listeners/quiche:test_utils_lib", "//test/integration:http_integration_lib", + "//test/test_common:test_runtime_lib", 
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", "@envoy_api//envoy/config/overload/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto", diff --git a/test/extensions/quic_listeners/quiche/integration/quic_http_integration_test.cc b/test/extensions/quic_listeners/quiche/integration/quic_http_integration_test.cc index 6268981d4c247..b9f29d36e2c70 100644 --- a/test/extensions/quic_listeners/quiche/integration/quic_http_integration_test.cc +++ b/test/extensions/quic_listeners/quiche/integration/quic_http_integration_test.cc @@ -10,6 +10,7 @@ #include "test/config/utility.h" #include "test/integration/http_integration.h" #include "test/integration/ssl_utility.h" +#include "test/test_common/test_runtime.h" #include "test/test_common/utility.h" #pragma GCC diagnostic push @@ -233,6 +234,48 @@ class QuicHttpIntegrationTest : public HttpIntegrationTest, public QuicMultiVers timeSystem())); } + void testMultipleQuicConnections() { + concurrency_ = 8; + set_reuse_port_ = true; + initialize(); + std::vector codec_clients; + for (size_t i = 1; i <= concurrency_; ++i) { + // The BPF filter and ActiveQuicListener::destination() look at the 1st word of connection id + // in the packet header. And currently all QUIC versions support 8 bytes connection id. So + // create connections with the first 4 bytes of connection id different from each + // other so they should be evenly distributed. 
+ designated_connection_ids_.push_back(quic::test::TestConnectionId(i << 32)); + codec_clients.push_back(makeHttpConnection(lookupPort("http"))); + } + constexpr auto timeout_first = std::chrono::seconds(15); + constexpr auto timeout_subsequent = std::chrono::milliseconds(10); + if (GetParam().first == Network::Address::IpVersion::v4) { + test_server_->waitForCounterEq("listener.0.0.0.0_0.downstream_cx_total", 8u, timeout_first); + } else { + test_server_->waitForCounterEq("listener.[__]_0.downstream_cx_total", 8u, timeout_first); + } + for (size_t i = 0; i < concurrency_; ++i) { + if (GetParam().first == Network::Address::IpVersion::v4) { + test_server_->waitForGaugeEq( + fmt::format("listener.0.0.0.0_0.worker_{}.downstream_cx_active", i), 1u, + timeout_subsequent); + test_server_->waitForCounterEq( + fmt::format("listener.0.0.0.0_0.worker_{}.downstream_cx_total", i), 1u, + timeout_subsequent); + } else { + test_server_->waitForGaugeEq( + fmt::format("listener.[__]_0.worker_{}.downstream_cx_active", i), 1u, + timeout_subsequent); + test_server_->waitForCounterEq( + fmt::format("listener.[__]_0.worker_{}.downstream_cx_total", i), 1u, + timeout_subsequent); + } + } + for (size_t i = 0; i < concurrency_; ++i) { + codec_clients[i]->close(); + } + } + protected: quic::QuicConfig quic_config_; quic::QuicServerId server_id_{"lyft.com", 443, false}; @@ -344,100 +387,15 @@ TEST_P(QuicHttpIntegrationTest, TestDelayedConnectionTeardownTimeoutTrigger) { 1); } -TEST_P(QuicHttpIntegrationTest, MultipleQuicListenersWithBPF) { -#if defined(SO_ATTACH_REUSEPORT_CBPF) && defined(__linux__) - concurrency_ = 8; - set_reuse_port_ = true; - initialize(); - std::vector codec_clients; - for (size_t i = 1; i <= concurrency_; ++i) { - // The BPF filter looks at the 1st word of connection id in the packet - // header. And currently all QUIC versions support 8 bytes connection id. 
So - // create connections with the first 4 bytes of connection id different from each - // other so they should be evenly distributed. - designated_connection_ids_.push_back(quic::test::TestConnectionId(i << 32)); - codec_clients.push_back(makeHttpConnection(lookupPort("http"))); - } - if (GetParam().first == Network::Address::IpVersion::v4) { - test_server_->waitForCounterEq("listener.0.0.0.0_0.downstream_cx_total", 8u); - } else { - test_server_->waitForCounterEq("listener.[__]_0.downstream_cx_total", 8u); - } - for (size_t i = 0; i < concurrency_; ++i) { - if (GetParam().first == Network::Address::IpVersion::v4) { - test_server_->waitForGaugeEq( - fmt::format("listener.0.0.0.0_0.worker_{}.downstream_cx_active", i), 1u); - test_server_->waitForCounterEq( - fmt::format("listener.0.0.0.0_0.worker_{}.downstream_cx_total", i), 1u); - } else { - test_server_->waitForGaugeEq(fmt::format("listener.[__]_0.worker_{}.downstream_cx_active", i), - 1u); - test_server_->waitForCounterEq( - fmt::format("listener.[__]_0.worker_{}.downstream_cx_total", i), 1u); - } - } - for (size_t i = 0; i < concurrency_; ++i) { - codec_clients[i]->close(); - } -#endif -} +TEST_P(QuicHttpIntegrationTest, MultipleQuicConnectionsWithBPF) { testMultipleQuicConnections(); } -#ifndef __APPLE__ -TEST_P(QuicHttpIntegrationTest, MultipleQuicListenersNoBPF) { - concurrency_ = 8; - set_reuse_port_ = true; - initialize(); -#ifdef SO_ATTACH_REUSEPORT_CBPF -#define SO_ATTACH_REUSEPORT_CBPF_TMP SO_ATTACH_REUSEPORT_CBPF -#undef SO_ATTACH_REUSEPORT_CBPF -#endif - std::vector codec_clients; - for (size_t i = 1; i <= concurrency_; ++i) { - // The BPF filter looks at the 1st byte of connection id in the packet - // header. And currently all QUIC versions support 8 bytes connection id. So - // create connections with the first 4 bytes of connection id different from each - // other so they should be evenly distributed. 
- designated_connection_ids_.push_back(quic::test::TestConnectionId(i << 32)); - codec_clients.push_back(makeHttpConnection(lookupPort("http"))); - } - if (GetParam().first == Network::Address::IpVersion::v4) { - test_server_->waitForCounterEq("listener.0.0.0.0_0.downstream_cx_total", 8u); - } else { - test_server_->waitForCounterEq("listener.[__]_0.downstream_cx_total", 8u); - } - // Even without BPF support, these connections should more or less distributed - // across different workers. - for (size_t i = 0; i < concurrency_; ++i) { - if (GetParam().first == Network::Address::IpVersion::v4) { - EXPECT_LT( - test_server_->gauge(fmt::format("listener.0.0.0.0_0.worker_{}.downstream_cx_active", i)) - ->value(), - 8u); - EXPECT_LT( - test_server_->counter(fmt::format("listener.0.0.0.0_0.worker_{}.downstream_cx_total", i)) - ->value(), - 8u); - } else { - EXPECT_LT( - test_server_->gauge(fmt::format("listener.[__]_0.worker_{}.downstream_cx_active", i)) - ->value(), - 8u); - EXPECT_LT( - test_server_->counter(fmt::format("listener.[__]_0.worker_{}.downstream_cx_total", i)) - ->value(), - 8u); - } - } - for (size_t i = 0; i < concurrency_; ++i) { - codec_clients[i]->close(); - } -#ifdef SO_ATTACH_REUSEPORT_CBPF_TMP -#define SO_ATTACH_REUSEPORT_CBPF SO_ATTACH_REUSEPORT_CBPF_TMP -#endif +TEST_P(QuicHttpIntegrationTest, MultipleQuicConnectionsNoBPF) { + config_helper_.addRuntimeOverride( + "envoy.reloadable_features.prefer_quic_kernel_bpf_packet_routing", "false"); + + testMultipleQuicConnections(); } -#endif -#if defined(SO_ATTACH_REUSEPORT_CBPF) && defined(__linux__) TEST_P(QuicHttpIntegrationTest, ConnectionMigration) { concurrency_ = 2; set_reuse_port_ = true; @@ -475,7 +433,6 @@ TEST_P(QuicHttpIntegrationTest, ConnectionMigration) { EXPECT_EQ(1024u * 2, upstream_request_->bodyLength()); cleanupUpstreamAndDownstream(); } -#endif TEST_P(QuicHttpIntegrationTest, StopAcceptingConnectionsWhenOverloaded) { initialize(); diff --git a/test/integration/fake_upstream.cc 
b/test/integration/fake_upstream.cc index 2dddab7160f13..bb47d05c74448 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -446,7 +446,7 @@ FakeUpstream::FakeUpstream(Network::TransportSocketFactoryPtr&& transport_socket socket_factory_(std::make_shared(socket_)), api_(Api::createApiForTest(stats_store_)), time_system_(time_system), dispatcher_(api_->allocateDispatcher("fake_upstream")), - handler_(new Server::ConnectionHandlerImpl(*dispatcher_)), + handler_(new Server::ConnectionHandlerImpl(*dispatcher_, 0)), read_disable_on_new_connection_(true), enable_half_close_(enable_half_close), listener_(*this), filter_chain_(Network::Test::createEmptyFilterChain(std::move(transport_socket_factory))) { diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 4d405ec129a05..3d113f9b34ccf 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -29,6 +29,7 @@ #include "common/network/filter_impl.h" #include "common/network/listen_socket_impl.h" #include "common/network/udp_default_writer_config.h" +#include "common/network/udp_listener_impl.h" #include "common/stats/isolated_store_impl.h" #include "server/active_raw_udp_listener_config.h" @@ -655,9 +656,9 @@ class FakeUpstream : Logger::Loggable, public: FakeListener(FakeUpstream& parent) : parent_(parent), name_("fake_upstream"), - udp_listener_factory_(std::make_unique()), + udp_listener_factory_(std::make_unique(1)), udp_writer_factory_(std::make_unique()), - init_manager_(nullptr) {} + udp_listener_worker_router_(1), init_manager_(nullptr) {} private: // Network::ListenerConfig @@ -680,6 +681,9 @@ class FakeUpstream : Logger::Loggable, Network::UdpPacketWriterFactoryOptRef udpPacketWriterFactory() override { return Network::UdpPacketWriterFactoryOptRef(std::ref(*udp_writer_factory_)); } + Network::UdpListenerWorkerRouterOptRef udpListenerWorkerRouter() override { + return udp_listener_worker_router_; + } 
Network::ConnectionBalancer& connectionBalancer() override { return connection_balancer_; } envoy::config::core::v3::TrafficDirection direction() const override { return envoy::config::core::v3::UNSPECIFIED; @@ -701,6 +705,7 @@ class FakeUpstream : Logger::Loggable, Network::NopConnectionBalancerImpl connection_balancer_; const Network::ActiveUdpListenerFactoryPtr udp_listener_factory_; const Network::UdpPacketWriterFactoryPtr udp_writer_factory_; + Network::UdpListenerWorkerRouterImpl udp_listener_worker_router_; BasicResourceLimitImpl connection_resource_; const std::vector empty_access_logs_; std::unique_ptr init_manager_; diff --git a/test/mocks/event/mocks.h b/test/mocks/event/mocks.h index c4f9d6c911fee..9f85e09ea2098 100644 --- a/test/mocks/event/mocks.h +++ b/test/mocks/event/mocks.h @@ -69,9 +69,9 @@ class MockDispatcher : public Dispatcher { return Network::ListenerPtr{createListener_(std::move(socket), cb, bind_to_port, backlog_size)}; } - Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr&& socket, + Network::UdpListenerPtr createUdpListener(Network::SocketSharedPtr socket, Network::UdpListenerCallbacks& cb) override { - return Network::UdpListenerPtr{createUdpListener_(std::move(socket), cb)}; + return Network::UdpListenerPtr{createUdpListener_(socket, cb)}; } Event::TimerPtr createTimer(Event::TimerCb cb) override { @@ -118,7 +118,7 @@ class MockDispatcher : public Dispatcher { (Network::SocketSharedPtr && socket, Network::TcpListenerCallbacks& cb, bool bind_to_port, uint32_t backlog_size)); MOCK_METHOD(Network::UdpListener*, createUdpListener_, - (Network::SocketSharedPtr && socket, Network::UdpListenerCallbacks& cb)); + (Network::SocketSharedPtr socket, Network::UdpListenerCallbacks& cb)); MOCK_METHOD(Timer*, createTimer_, (Event::TimerCb cb)); MOCK_METHOD(SchedulableCallback*, createSchedulableCallback_, (std::function cb)); MOCK_METHOD(void, deferredDelete_, (DeferredDeletable * to_delete)); diff --git a/test/mocks/network/mocks.cc 
b/test/mocks/network/mocks.cc index 3f203fe7e3439..db6e933ec19c8 100644 --- a/test/mocks/network/mocks.cc +++ b/test/mocks/network/mocks.cc @@ -7,6 +7,7 @@ #include "common/network/address_impl.h" #include "common/network/io_socket_handle_impl.h" +#include "common/network/udp_listener_impl.h" #include "common/network/utility.h" #include "test/test_common/printers.h" @@ -25,7 +26,8 @@ namespace Envoy { namespace Network { MockListenerConfig::MockListenerConfig() - : socket_(std::make_shared>()) { + : socket_(std::make_shared>()), + udp_listener_worker_router_(std::make_unique(1)) { ON_CALL(*this, filterChainFactory()).WillByDefault(ReturnRef(filter_chain_factory_)); ON_CALL(*this, listenSocketFactory()).WillByDefault(ReturnRef(socket_factory_)); ON_CALL(socket_factory_, localAddress()).WillByDefault(ReturnRef(socket_->localAddress())); @@ -34,6 +36,9 @@ MockListenerConfig::MockListenerConfig() .WillByDefault(Return(std::reference_wrapper(*socket_))); ON_CALL(*this, listenerScope()).WillByDefault(ReturnRef(scope_)); ON_CALL(*this, name()).WillByDefault(ReturnRef(name_)); + ON_CALL(*this, udpListenerWorkerRouter()).WillByDefault(Invoke([this]() { + return UdpListenerWorkerRouterOptRef(*udp_listener_worker_router_); + })); } MockListenerConfig::~MockListenerConfig() = default; @@ -168,6 +173,8 @@ MockConnectionHandler::~MockConnectionHandler() = default; MockIp::MockIp() = default; MockIp::~MockIp() = default; +MockResolvedAddress::MockResolvedAddress(const std::string& logical, const std::string& physical) + : logical_(logical), physical_(physical) {} MockResolvedAddress::~MockResolvedAddress() = default; MockTransportSocketCallbacks::MockTransportSocketCallbacks() { @@ -175,6 +182,9 @@ MockTransportSocketCallbacks::MockTransportSocketCallbacks() { } MockTransportSocketCallbacks::~MockTransportSocketCallbacks() = default; +MockUdpPacketWriter::MockUdpPacketWriter() = default; +MockUdpPacketWriter::~MockUdpPacketWriter() = default; + MockUdpListener::MockUdpListener() 
{ ON_CALL(*this, dispatcher()).WillByDefault(ReturnRef(dispatcher_)); } diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index 5b326e5d6de2b..88e580effa692 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -139,11 +139,14 @@ class MockUdpListenerCallbacks : public UdpListenerCallbacks { MockUdpListenerCallbacks(); ~MockUdpListenerCallbacks() override; - MOCK_METHOD(void, onData, (UdpRecvData & data)); + MOCK_METHOD(void, onData, (UdpRecvData && data)); MOCK_METHOD(void, onReadReady, ()); MOCK_METHOD(void, onWriteReady, (const Socket& socket)); MOCK_METHOD(void, onReceiveError, (Api::IoError::IoErrorCode err)); MOCK_METHOD(Network::UdpPacketWriter&, udpPacketWriter, ()); + MOCK_METHOD(uint32_t, workerIndex, (), (const)); + MOCK_METHOD(void, onDataWorker, (Network::UdpRecvData && data)); + MOCK_METHOD(void, post, (Network::UdpRecvData && data)); }; class MockDrainDecision : public DrainDecision { @@ -356,6 +359,7 @@ class MockListenerConfig : public ListenerConfig { MOCK_METHOD(const std::string&, name, (), (const)); MOCK_METHOD(Network::ActiveUdpListenerFactory*, udpListenerFactory, ()); MOCK_METHOD(Network::UdpPacketWriterFactoryOptRef, udpPacketWriterFactory, ()); + MOCK_METHOD(Network::UdpListenerWorkerRouterOptRef, udpListenerWorkerRouter, ()); MOCK_METHOD(ConnectionBalancer&, connectionBalancer, ()); MOCK_METHOD(ResourceLimit&, openConnections, ()); MOCK_METHOD(uint32_t, tcpBacklogSize, (), (const)); @@ -372,6 +376,7 @@ class MockListenerConfig : public ListenerConfig { testing::NiceMock filter_chain_factory_; MockListenSocketFactory socket_factory_; SocketSharedPtr socket_; + UdpListenerWorkerRouterPtr udp_listener_worker_router_; Stats::IsolatedStoreImpl scope_; std::string name_; const std::vector empty_access_logs_; @@ -398,6 +403,7 @@ class MockConnectionHandler : public ConnectionHandler { MOCK_METHOD(void, addListener, (absl::optional overridden_listener, ListenerConfig& config)); MOCK_METHOD(void, 
removeListeners, (uint64_t listener_tag)); + MOCK_METHOD(UdpListenerCallbacksOptRef, getUdpListenerCallbacks, (uint64_t listener_tag)); MOCK_METHOD(void, removeFilterChains, (uint64_t listener_tag, const std::list& filter_chains, std::function completion)); @@ -425,8 +431,7 @@ class MockIp : public Address::Ip { class MockResolvedAddress : public Address::Instance { public: - MockResolvedAddress(const std::string& logical, const std::string& physical) - : logical_(logical), physical_(physical) {} + MockResolvedAddress(const std::string& logical, const std::string& physical); ~MockResolvedAddress() override; bool operator==(const Address::Instance& other) const override { @@ -472,7 +477,8 @@ class MockTransportSocketCallbacks : public TransportSocketCallbacks { class MockUdpPacketWriter : public UdpPacketWriter { public: - MockUdpPacketWriter() = default; + MockUdpPacketWriter(); + ~MockUdpPacketWriter() override; MOCK_METHOD(Api::IoCallUint64Result, writePacket, (const Buffer::Instance& buffer, const Address::Ip* local_ip, @@ -498,6 +504,7 @@ class MockUdpListener : public UdpListener { MOCK_METHOD(Address::InstanceConstSharedPtr&, localAddress, (), (const)); MOCK_METHOD(Api::IoCallUint64Result, send, (const UdpSendData&)); MOCK_METHOD(Api::IoCallUint64Result, flush, ()); + MOCK_METHOD(void, activateRead, ()); Event::MockDispatcher dispatcher_; }; diff --git a/test/mocks/server/worker_factory.h b/test/mocks/server/worker_factory.h index 3c05ed76566c6..ca3ee983a3d4a 100644 --- a/test/mocks/server/worker_factory.h +++ b/test/mocks/server/worker_factory.h @@ -13,7 +13,7 @@ class MockWorkerFactory : public WorkerFactory { ~MockWorkerFactory() override; // Server::WorkerFactory - WorkerPtr createWorker(OverloadManager&, const std::string&) override { + WorkerPtr createWorker(uint32_t, OverloadManager&, const std::string&) override { return WorkerPtr{createWorker_()}; } diff --git a/test/per_file_coverage.sh b/test/per_file_coverage.sh index 2227ab30273c5..25bd84aa46cd2 
100755 --- a/test/per_file_coverage.sh +++ b/test/per_file_coverage.sh @@ -70,7 +70,7 @@ declare -a KNOWN_LOW_COVERAGE=( "source/extensions/watchdog/abort_action:42.9" # Death tests don't report LCOV "source/server:94.6" "source/server/config_validation:76.8" -"source/server/admin:95.5" +"source/server/admin:95.3" ) [[ -z "${SRCDIR}" ]] && SRCDIR="${PWD}" diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index 32396758f822a..47d58f94e256f 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -12,6 +12,7 @@ #include "common/network/io_socket_handle_impl.h" #include "common/network/raw_buffer_socket.h" #include "common/network/udp_default_writer_config.h" +#include "common/network/udp_listener_impl.h" #include "common/network/utility.h" #include "server/connection_handler_impl.h" @@ -43,7 +44,7 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable()), - handler_(new ConnectionHandlerImpl(dispatcher_)), + handler_(new ConnectionHandlerImpl(dispatcher_, 0)), filter_chain_(Network::Test::createEmptyFilterChainWithRawBufferSockets()), listener_filter_matcher_(std::make_shared>()), access_log_(std::make_shared()) { @@ -110,6 +111,9 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable udp_listener_factory_; std::unique_ptr udp_writer_factory_; + Network::UdpListenerWorkerRouterPtr udp_listener_worker_router_; Network::ConnectionBalancerSharedPtr connection_balancer_; BasicResourceLimitImpl open_connections_; const std::vector access_logs_; @@ -175,6 +180,7 @@ class ConnectionHandlerTest : public testing::Test, protected Logger::Loggable Network::UdpListener* { return dynamic_cast(listener); })); + listeners_.back()->udp_listener_worker_router_ = + std::make_unique(1); } if (balanced_connection_handler != nullptr) { diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index 
754813d39a586..9859b33b69b4f 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -867,6 +867,7 @@ pausable pcall pcap pclose +performant pfctl pipelined pipelining @@ -987,6 +988,7 @@ resolvers responder restarter resync +retransmitting retriable retriggers revalidated