diff --git a/source/extensions/quic_listeners/quiche/active_quic_listener.h b/source/extensions/quic_listeners/quiche/active_quic_listener.h index f1a5fd84c4c8e..9d92ec881701e 100644 --- a/source/extensions/quic_listeners/quiche/active_quic_listener.h +++ b/source/extensions/quic_listeners/quiche/active_quic_listener.h @@ -9,6 +9,7 @@ #include "common/protobuf/utility.h" #include "common/runtime/runtime_protos.h" +#include "server/active_udp_listener.h" #include "server/connection_handler_impl.h" #include "extensions/quic_listeners/quiche/envoy_quic_dispatcher.h" diff --git a/source/server/BUILD b/source/server/BUILD index 34d1a82a1db5f..b57723a4af175 100644 --- a/source/server/BUILD +++ b/source/server/BUILD @@ -61,6 +61,7 @@ envoy_cc_library( envoy_cc_library( name = "connection_handler_lib", deps = [ + ":active_udp_listener", ":connection_handler_impl", ], ) @@ -95,6 +96,26 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "active_udp_listener", + srcs = ["active_udp_listener.cc"], + hdrs = [ + "active_udp_listener.h", + ], + deps = [ + "//include/envoy/event:dispatcher_interface", + "//include/envoy/network:connection_handler_interface", + "//include/envoy/network:exception_interface", + "//include/envoy/network:filter_interface", + "//include/envoy/network:listen_socket_interface", + "//include/envoy/network:listener_interface", + "//include/envoy/server:active_udp_listener_config_interface", + "//include/envoy/server:listener_manager_interface", + "//source/extensions/transport_sockets:well_known_names", + "//source/server:connection_handler_impl", + ], +) + envoy_cc_library( name = "drain_manager_lib", srcs = ["drain_manager_impl.cc"], diff --git a/source/server/active_raw_udp_listener_config.cc b/source/server/active_raw_udp_listener_config.cc index cf2a947a08953..a8c872e7c6581 100644 --- a/source/server/active_raw_udp_listener_config.cc +++ b/source/server/active_raw_udp_listener_config.cc @@ -5,6 +5,7 @@ #include "envoy/api/v2/listener/listener.pb.h" +#include "server/active_udp_listener.h" #include "server/connection_handler_impl.h" #include "server/well_known_names.h" diff --git a/source/server/active_udp_listener.cc b/source/server/active_udp_listener.cc new file mode 100644 index 0000000000000..42ffa661522cc --- /dev/null +++ b/source/server/active_udp_listener.cc @@ -0,0 +1,135 @@ +#include "server/active_udp_listener.h" + +#include "envoy/network/exception.h" +#include "envoy/server/listener_manager.h" +#include "envoy/stats/scope.h" + +#include "spdlog/spdlog.h" + +namespace Envoy { +namespace Server { +ActiveUdpListenerBase::ActiveUdpListenerBase(uint32_t worker_index, uint32_t concurrency, + Network::UdpConnectionHandler& parent, + Network::Socket& listen_socket, + Network::UdpListenerPtr&& listener, + Network::ListenerConfig* config) + : ConnectionHandlerImpl::ActiveListenerImplBase(parent, config), worker_index_(worker_index), + concurrency_(concurrency), parent_(parent), listen_socket_(listen_socket), + udp_listener_(std::move(listener)) { + ASSERT(worker_index_ < concurrency_); + config_->udpListenerWorkerRouter()->get().registerWorkerForListener(*this); +} + +ActiveUdpListenerBase::~ActiveUdpListenerBase() { + config_->udpListenerWorkerRouter()->get().unregisterWorkerForListener(*this); +} + +void ActiveUdpListenerBase::post(Network::UdpRecvData&& data) { + ASSERT(!udp_listener_->dispatcher().isThreadSafe(), + "Shouldn't be posting if thread safe; use onWorkerData() instead."); + + // It is not possible to capture a unique_ptr because the post() API copies the lambda, so we must + // bundle the socket inside a shared_ptr that can be captured. + // TODO(mattklein123): It may be possible to change the post() API such that the lambda is only + // moved, but this is non-trivial and needs investigation. + auto data_to_post = std::make_shared(); + *data_to_post = std::move(data); + + udp_listener_->dispatcher().post( + [data_to_post, tag = config_->listenerTag(), &parent = parent_]() { + Network::UdpListenerCallbacksOptRef listener = parent.getUdpListenerCallbacks(tag); + if (listener.has_value()) { + listener->get().onDataWorker(std::move(*data_to_post)); + } + }); +} + +void ActiveUdpListenerBase::onData(Network::UdpRecvData&& data) { + uint32_t dest = worker_index_; + + // For concurrency == 1, the packet will always go to the current worker. + if (concurrency_ > 1) { + dest = destination(data); + ASSERT(dest < concurrency_); + } + + if (dest == worker_index_) { + onDataWorker(std::move(data)); + } else { + config_->udpListenerWorkerRouter()->get().deliver(dest, std::move(data)); + } +} + +ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::UdpConnectionHandler& parent, + Event::Dispatcher& dispatcher, + Network::ListenerConfig& config) + : ActiveRawUdpListener(worker_index, concurrency, parent, + config.listenSocketFactory().getListenSocket(), dispatcher, config) {} + +ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::UdpConnectionHandler& parent, + Network::SocketSharedPtr listen_socket_ptr, + Event::Dispatcher& dispatcher, + Network::ListenerConfig& config) + : ActiveRawUdpListener(worker_index, concurrency, parent, *listen_socket_ptr, listen_socket_ptr, + dispatcher, config) {} + +ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::UdpConnectionHandler& parent, + Network::Socket& listen_socket, + Network::SocketSharedPtr listen_socket_ptr, + Event::Dispatcher& dispatcher, + Network::ListenerConfig& config) + : ActiveRawUdpListener(worker_index, concurrency, parent, listen_socket, + dispatcher.createUdpListener(listen_socket_ptr, *this), config) {} + +ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::UdpConnectionHandler& parent, + Network::Socket& listen_socket, + Network::UdpListenerPtr&& listener, + Network::ListenerConfig& config) + : ActiveUdpListenerBase(worker_index, concurrency, parent, listen_socket, std::move(listener), + &config), + read_filter_(nullptr) { + // Create the filter chain on creating a new udp listener + config_->filterChainFactory().createUdpListenerFilterChain(*this, *this); + + // If filter is nullptr, fail the creation of the listener + if (read_filter_ == nullptr) { + throw Network::CreateListenerException( + fmt::format("Cannot create listener as no read filter registered for the udp listener: {} ", + config_->name())); + } + + // Create udp_packet_writer + udp_packet_writer_ = config.udpPacketWriterFactory()->get().createUdpPacketWriter( + listen_socket_.ioHandle(), config.listenerScope()); +} + +void ActiveRawUdpListener::onDataWorker(Network::UdpRecvData&& data) { read_filter_->onData(data); } + +void ActiveRawUdpListener::onReadReady() {} + +void ActiveRawUdpListener::onWriteReady(const Network::Socket&) { + // TODO(sumukhs): This is not used now. When write filters are implemented, this is a + // trigger to invoke the on write ready API on the filters which is when they can write + // data. + + // Clear write_blocked_ status for udpPacketWriter. + udp_packet_writer_->setWritable(); +} + +void ActiveRawUdpListener::onReceiveError(Api::IoError::IoErrorCode error_code) { + read_filter_->onReceiveError(error_code); +} + +void ActiveRawUdpListener::addReadFilter(Network::UdpListenerReadFilterPtr&& filter) { + ASSERT(read_filter_ == nullptr, "Cannot add a 2nd UDP read filter"); + read_filter_ = std::move(filter); +} + +Network::UdpListener& ActiveRawUdpListener::udpListener() { return *udp_listener_; } + +} // namespace Server +} // namespace Envoy diff --git a/source/server/active_udp_listener.h b/source/server/active_udp_listener.h new file mode 100644 index 0000000000000..f163bba113241 --- /dev/null +++ b/source/server/active_udp_listener.h @@ -0,0 +1,102 @@ +#pragma once + +#include +#include + +#include "envoy/network/connection_handler.h" +#include "envoy/network/filter.h" +#include "envoy/network/listen_socket.h" +#include "envoy/network/listener.h" +#include "envoy/server/active_udp_listener_config.h" + +// TODO(lambdai): remove connection_handler_impl after ActiveListenerImplBase is extracted from it. +#include "server/connection_handler_impl.h" + +namespace Envoy { +namespace Server { + +class ActiveUdpListenerBase : public ConnectionHandlerImpl::ActiveListenerImplBase, + public Network::ConnectionHandler::ActiveUdpListener { +public: + ActiveUdpListenerBase(uint32_t worker_index, uint32_t concurrency, + Network::UdpConnectionHandler& parent, Network::Socket& listen_socket, + Network::UdpListenerPtr&& listener, Network::ListenerConfig* config); + ~ActiveUdpListenerBase() override; + + // Network::UdpListenerCallbacks + void onData(Network::UdpRecvData&& data) final; + uint32_t workerIndex() const final { return worker_index_; } + void post(Network::UdpRecvData&& data) final; + + // ActiveListenerImplBase + Network::Listener* listener() override { return udp_listener_.get(); } + +protected: + uint32_t destination(const Network::UdpRecvData& /*data*/) const override { + // By default, route to the current worker. + return worker_index_; + } + + const uint32_t worker_index_; + const uint32_t concurrency_; + Network::UdpConnectionHandler& parent_; + Network::Socket& listen_socket_; + Network::UdpListenerPtr udp_listener_; +}; + +/** + * Wrapper for an active udp listener owned by this handler. + */ +class ActiveRawUdpListener : public ActiveUdpListenerBase, + public Network::UdpListenerFilterManager, + public Network::UdpReadFilterCallbacks { +public: + ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::UdpConnectionHandler& parent, Event::Dispatcher& dispatcher, + Network::ListenerConfig& config); + ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::UdpConnectionHandler& parent, + Network::SocketSharedPtr listen_socket_ptr, Event::Dispatcher& dispatcher, + Network::ListenerConfig& config); + ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::UdpConnectionHandler& parent, Network::Socket& listen_socket, + Network::SocketSharedPtr listen_socket_ptr, Event::Dispatcher& dispatcher, + Network::ListenerConfig& config); + ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, + Network::UdpConnectionHandler& parent, Network::Socket& listen_socket, + Network::UdpListenerPtr&& listener, Network::ListenerConfig& config); + + // Network::UdpListenerCallbacks + void onReadReady() override; + void onWriteReady(const Network::Socket& socket) override; + void onReceiveError(Api::IoError::IoErrorCode error_code) override; + Network::UdpPacketWriter& udpPacketWriter() override { return *udp_packet_writer_; } + + // Network::UdpWorker + void onDataWorker(Network::UdpRecvData&& data) override; + + // ActiveListenerImplBase + void pauseListening() override { udp_listener_->disable(); } + void resumeListening() override { udp_listener_->enable(); } + void shutdownListener() override { + // The read filter should be deleted before the UDP listener is deleted. + // The read filter refers to the UDP listener to send packets to downstream. + // If the UDP listener is deleted before the read filter, the read filter may try to use it + // after deletion. + read_filter_.reset(); + udp_listener_.reset(); + } + + // Network::UdpListenerFilterManager + void addReadFilter(Network::UdpListenerReadFilterPtr&& filter) override; + + // Network::UdpReadFilterCallbacks + Network::UdpListener& udpListener() override; + +private: + Network::UdpListenerReadFilterPtr read_filter_; + Network::UdpPacketWriterPtr udp_packet_writer_; +}; + +} // namespace Server +} // namespace Envoy diff --git a/source/server/connection_handler_impl.cc b/source/server/connection_handler_impl.cc index 1b6599d53ca8b..53d12903f9e1e 100644 --- a/source/server/connection_handler_impl.cc +++ b/source/server/connection_handler_impl.cc @@ -651,128 +651,5 @@ ConnectionHandlerImpl::ActiveTcpConnection::~ActiveTcpConnection() { listener.parent_.decNumConnections(); } -ActiveUdpListenerBase::ActiveUdpListenerBase(uint32_t worker_index, uint32_t concurrency, - Network::UdpConnectionHandler& parent, - Network::Socket& listen_socket, - Network::UdpListenerPtr&& listener, - Network::ListenerConfig* config) - : ConnectionHandlerImpl::ActiveListenerImplBase(parent, config), worker_index_(worker_index), - concurrency_(concurrency), parent_(parent), listen_socket_(listen_socket), - udp_listener_(std::move(listener)) { - ASSERT(worker_index_ < concurrency_); - config_->udpListenerWorkerRouter()->get().registerWorkerForListener(*this); -} - -ActiveUdpListenerBase::~ActiveUdpListenerBase() { - config_->udpListenerWorkerRouter()->get().unregisterWorkerForListener(*this); -} - -void ActiveUdpListenerBase::post(Network::UdpRecvData&& data) { - ASSERT(!udp_listener_->dispatcher().isThreadSafe(), - "Shouldn't be posting if thread safe; use onWorkerData() instead."); - - // It is not possible to capture a unique_ptr because the post() API copies the lambda, so we must - // bundle the socket inside a shared_ptr that can be captured. - // TODO(mattklein123): It may be possible to change the post() API such that the lambda is only - // moved, but this is non-trivial and needs investigation. - auto data_to_post = std::make_shared(); - *data_to_post = std::move(data); - - udp_listener_->dispatcher().post( - [data_to_post, tag = config_->listenerTag(), &parent = parent_]() { - Network::UdpListenerCallbacksOptRef listener = parent.getUdpListenerCallbacks(tag); - if (listener.has_value()) { - listener->get().onDataWorker(std::move(*data_to_post)); - } - }); -} - -void ActiveUdpListenerBase::onData(Network::UdpRecvData&& data) { - uint32_t dest = worker_index_; - - // For concurrency == 1, the packet will always go to the current worker. - if (concurrency_ > 1) { - dest = destination(data); - ASSERT(dest < concurrency_); - } - - if (dest == worker_index_) { - onDataWorker(std::move(data)); - } else { - config_->udpListenerWorkerRouter()->get().deliver(dest, std::move(data)); - } -} - -ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, - Network::UdpConnectionHandler& parent, - Event::Dispatcher& dispatcher, - Network::ListenerConfig& config) - : ActiveRawUdpListener(worker_index, concurrency, parent, - config.listenSocketFactory().getListenSocket(), dispatcher, config) {} - -ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, - Network::UdpConnectionHandler& parent, - Network::SocketSharedPtr listen_socket_ptr, - Event::Dispatcher& dispatcher, - Network::ListenerConfig& config) - : ActiveRawUdpListener(worker_index, concurrency, parent, *listen_socket_ptr, listen_socket_ptr, - dispatcher, config) {} - -ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, - Network::UdpConnectionHandler& parent, - Network::Socket& listen_socket, - Network::SocketSharedPtr listen_socket_ptr, - Event::Dispatcher& dispatcher, - Network::ListenerConfig& config) - : ActiveRawUdpListener(worker_index, concurrency, parent, listen_socket, - dispatcher.createUdpListener(listen_socket_ptr, *this), config) {} - -ActiveRawUdpListener::ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, - Network::UdpConnectionHandler& parent, - Network::Socket& listen_socket, - Network::UdpListenerPtr&& listener, - Network::ListenerConfig& config) - : ActiveUdpListenerBase(worker_index, concurrency, parent, listen_socket, std::move(listener), - &config), - read_filter_(nullptr) { - // Create the filter chain on creating a new udp listener - config_->filterChainFactory().createUdpListenerFilterChain(*this, *this); - - // If filter is nullptr, fail the creation of the listener - if (read_filter_ == nullptr) { - throw Network::CreateListenerException( - fmt::format("Cannot create listener as no read filter registered for the udp listener: {} ", - config_->name())); - } - - // Create udp_packet_writer - udp_packet_writer_ = config.udpPacketWriterFactory()->get().createUdpPacketWriter( - listen_socket_.ioHandle(), config.listenerScope()); -} - -void ActiveRawUdpListener::onDataWorker(Network::UdpRecvData&& data) { read_filter_->onData(data); } - -void ActiveRawUdpListener::onReadReady() {} - -void ActiveRawUdpListener::onWriteReady(const Network::Socket&) { - // TODO(sumukhs): This is not used now. When write filters are implemented, this is a - // trigger to invoke the on write ready API on the filters which is when they can write - // data. - - // Clear write_blocked_ status for udpPacketWriter. - udp_packet_writer_->setWritable(); -} - -void ActiveRawUdpListener::onReceiveError(Api::IoError::IoErrorCode error_code) { - read_filter_->onReceiveError(error_code); -} - -void ActiveRawUdpListener::addReadFilter(Network::UdpListenerReadFilterPtr&& filter) { - ASSERT(read_filter_ == nullptr, "Cannot add a 2nd UDP read filter"); - read_filter_ = std::move(filter); -} - -Network::UdpListener& ActiveRawUdpListener::udpListener() { return *udp_listener_; } - } // namespace Server } // namespace Envoy diff --git a/source/server/connection_handler_impl.h b/source/server/connection_handler_impl.h index 8e15b621704a6..e0352039b9a8a 100644 --- a/source/server/connection_handler_impl.h +++ b/source/server/connection_handler_impl.h @@ -375,87 +375,6 @@ class ConnectionHandlerImpl : public Network::TcpConnectionHandler, bool disable_listeners_; UnitFloat listener_reject_fraction_{UnitFloat::min()}; }; -class ActiveUdpListenerBase : public ConnectionHandlerImpl::ActiveListenerImplBase, - public Network::ConnectionHandler::ActiveUdpListener { -public: - ActiveUdpListenerBase(uint32_t worker_index, uint32_t concurrency, - Network::UdpConnectionHandler& parent, Network::Socket& listen_socket, - Network::UdpListenerPtr&& listener, Network::ListenerConfig* config); - ~ActiveUdpListenerBase() override; - - // Network::UdpListenerCallbacks - void onData(Network::UdpRecvData&& data) final; - uint32_t workerIndex() const final { return worker_index_; } - void post(Network::UdpRecvData&& data) final; - - // ActiveListenerImplBase - Network::Listener* listener() override { return udp_listener_.get(); } - -protected: - uint32_t destination(const Network::UdpRecvData& /*data*/) const override { - // By default, route to the current worker. - return worker_index_; - } - - const uint32_t worker_index_; - const uint32_t concurrency_; - Network::UdpConnectionHandler& parent_; - Network::Socket& listen_socket_; - Network::UdpListenerPtr udp_listener_; -}; - -/** - * Wrapper for an active udp listener owned by this handler. - */ -class ActiveRawUdpListener : public ActiveUdpListenerBase, - public Network::UdpListenerFilterManager, - public Network::UdpReadFilterCallbacks { -public: - ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, - Network::UdpConnectionHandler& parent, Event::Dispatcher& dispatcher, - Network::ListenerConfig& config); - ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, - Network::UdpConnectionHandler& parent, - Network::SocketSharedPtr listen_socket_ptr, Event::Dispatcher& dispatcher, - Network::ListenerConfig& config); - ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, - Network::UdpConnectionHandler& parent, Network::Socket& listen_socket, - Network::SocketSharedPtr listen_socket_ptr, Event::Dispatcher& dispatcher, - Network::ListenerConfig& config); - ActiveRawUdpListener(uint32_t worker_index, uint32_t concurrency, - Network::UdpConnectionHandler& parent, Network::Socket& listen_socket, - Network::UdpListenerPtr&& listener, Network::ListenerConfig& config); - - // Network::UdpListenerCallbacks - void onReadReady() override; - void onWriteReady(const Network::Socket& socket) override; - void onReceiveError(Api::IoError::IoErrorCode error_code) override; - Network::UdpPacketWriter& udpPacketWriter() override { return *udp_packet_writer_; } - - // Network::UdpWorker - void onDataWorker(Network::UdpRecvData&& data) override; - - // ActiveListenerImplBase - void pauseListening() override { udp_listener_->disable(); } - void resumeListening() override { udp_listener_->enable(); } - void shutdownListener() override { - // The read filter should be deleted before the UDP listener is deleted. - // The read filter refers to the UDP listener to send packets to downstream. - // If the UDP listener is deleted before the read filter, the read filter may try to use it - // after deletion. - read_filter_.reset(); - udp_listener_.reset(); - } - - // Network::UdpListenerFilterManager - void addReadFilter(Network::UdpListenerReadFilterPtr&& filter) override; - - // Network::UdpReadFilterCallbacks - Network::UdpListener& udpListener() override; -private: - Network::UdpListenerReadFilterPtr read_filter_; - Network::UdpPacketWriterPtr udp_packet_writer_; -}; } // namespace Server } // namespace Envoy