diff --git a/CODEOWNERS b/CODEOWNERS index 5e900f8c4d4b4..7b12732d6e02c 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -163,6 +163,8 @@ extensions/filters/http/oauth2 @rgs1 @derekargueta @snowp /*/extensions/matching/common_inputs/environment @snowp @donyu # user space socket pair, event, connection and listener /*/extensions/io_socket/user_space @lambdai @antoniovicente +# io_uring-based IO socket +/*/extensions/io_socket/io_uring @rojkov @antoniovicente /*/extensions/bootstrap/internal_listener @lambdai @adisuissa # Default UUID4 request ID extension /*/extensions/request_id/uuid @mattklein123 @alyssawilk diff --git a/api/envoy/extensions/network/socket_interface/v3/io_uring_socket_interface.proto b/api/envoy/extensions/network/socket_interface/v3/io_uring_socket_interface.proto new file mode 100644 index 0000000000000..9a7ac412af758 --- /dev/null +++ b/api/envoy/extensions/network/socket_interface/v3/io_uring_socket_interface.proto @@ -0,0 +1,39 @@ +syntax = "proto3"; + +package envoy.extensions.network.socket_interface.v3; + +import "google/protobuf/wrappers.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.network.socket_interface.v3"; +option java_outer_classname = "IoUringSocketInterfaceProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/network/socket_interface/v3;socket_interfacev3"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: ``io_uring`` Socket Interface configuration] + +// Configuration for a socket interface that relies on Linux specific ``io_uring`` API to create +// sockets. +message IoUringSocketInterface { + // The size of read buffer. If not set, defaults to 8192. + google.protobuf.UInt32Value read_buffer_size = 1 [(validate.rules).uint32 = {gte: 4096}]; + + // The size of both submission and completion queues in queue entries. For heavily loaded + // processes 300 queue entries is a good enough value. If the load is not high and memory + // is a constraint then it's safe to have smaller queues. If not set, defaults to 300 + // queue entries. + google.protobuf.UInt32Value io_uring_size = 2 [(validate.rules).uint32 = {gte: 16}]; + + // When this flag is specified, a kernel thread is created to perform submission queue + // polling. An ``io_uring`` instance configured in this way enables ``io_uring`` sockets to + // issue I/O without ever context switching into the kernel and with better latency. + // + // Please note that the polling kernel thread will waste CPU cycles after the ``io_uring`` + // instance becomes inactive for a grace period which is set to 1 second currently. The + // polling kernel thread will be started automatically as soon as the ``io_uring`` instance + // becomes active again. + bool use_submission_queue_polling = 3; +} diff --git a/docs/root/api-v3/bootstrap/bootstrap.rst b/docs/root/api-v3/bootstrap/bootstrap.rst index 8082028c72817..52633bf19c478 100644 --- a/docs/root/api-v3/bootstrap/bootstrap.rst +++ b/docs/root/api-v3/bootstrap/bootstrap.rst @@ -6,6 +6,7 @@ Bootstrap :maxdepth: 2 ../config/bootstrap/v3/bootstrap.proto + ../extensions/network/socket_interface/v3/io_uring_socket_interface.proto ../config/metrics/v3/stats.proto ../config/metrics/v3/metrics_service.proto ../config/overload/v3/overload.proto diff --git a/docs/root/configuration/other_features/io_uring.rst b/docs/root/configuration/other_features/io_uring.rst new file mode 100644 index 0000000000000..0d772e3ad500c --- /dev/null +++ b/docs/root/configuration/other_features/io_uring.rst @@ -0,0 +1,26 @@ +.. _config_sock_interface_io_uring: + +io_uring Socket Interface +========================= + +* :ref:`v3 API reference ` + +.. attention:: + + The io_uring socket interface extension is experimental and is currently under active development. + +io_uring is an asynchronous I/O API implemented in the Linux kernel. +This socket interface uses [liburing](https://github.com/axboe/liburing) to integrate io_uring +with Envoy. + +Example configuration +--------------------- + +.. code-block:: yaml + + bootstrap_extensions: + - name: envoy.extensions.io_socket.io_uring + typed_config: + "@type": type.googleapis.com/envoy.extensions.network.socket_interface.v3.IoUringSocketInterface + default_socket_interface: "envoy.extensions.network.socket_interface.io_uring" + diff --git a/docs/root/configuration/other_features/other_features.rst b/docs/root/configuration/other_features/other_features.rst index c93d34e2303e0..65c42185525be 100644 --- a/docs/root/configuration/other_features/other_features.rst +++ b/docs/root/configuration/other_features/other_features.rst @@ -5,6 +5,7 @@ Other features :maxdepth: 2 internal_listener + io_uring rate_limit vcl wasm diff --git a/docs/root/faq/windows/win_not_supported_features.rst b/docs/root/faq/windows/win_not_supported_features.rst index 76c1b8da8467a..b5fcb41f580f3 100644 --- a/docs/root/faq/windows/win_not_supported_features.rst +++ b/docs/root/faq/windows/win_not_supported_features.rst @@ -10,6 +10,7 @@ The most notable features that are not supported on Windows are: * :ref:`Hot restart ` * :ref:`Signed Exchange Filter ` * :ref:`VCL Socket Interface ` +* :ref:`io_uring Socket Interface ` There are certain Envoy features that require newer versions of Windows. These features explicitly document the required version. diff --git a/source/common/event/file_event_impl.cc b/source/common/event/file_event_impl.cc index 4c68e6193fc42..138f27c7d3b75 100644 --- a/source/common/event/file_event_impl.cc +++ b/source/common/event/file_event_impl.cc @@ -3,7 +3,6 @@ #include #include "source/common/common/assert.h" -#include "source/common/event/dispatcher_impl.h" #include "event2/event.h" diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index c8a7d11807317..a975a9d2e7680 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -272,6 +272,7 @@ EXTENSIONS = { "envoy.io_socket.user_space": "//source/extensions/io_socket/user_space:config", "envoy.bootstrap.internal_listener": "//source/extensions/bootstrap/internal_listener:config", + "envoy.io_socket.io_uring": "//source/extensions/io_socket/io_uring:config", # # TLS peer certification validators diff --git a/source/extensions/extensions_metadata.yaml b/source/extensions/extensions_metadata.yaml index 3a337763da094..0ab66d443d1b0 100644 --- a/source/extensions/extensions_metadata.yaml +++ b/source/extensions/extensions_metadata.yaml @@ -508,6 +508,11 @@ envoy.internal_redirect_predicates.safe_cross_scheme: - envoy.internal_redirect_predicates security_posture: robust_to_untrusted_downstream_and_upstream status: stable +envoy.io_socket.io_uring: + categories: + - envoy.io_socket + security_posture: unknown + status: wip envoy.io_socket.user_space: categories: - envoy.io_socket diff --git a/source/extensions/io_socket/io_uring/BUILD b/source/extensions/io_socket/io_uring/BUILD new file mode 100644 index 0000000000000..eb0fdeb3f8fe9 --- /dev/null +++ b/source/extensions/io_socket/io_uring/BUILD @@ -0,0 +1,43 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_extension", + "envoy_cc_library", + "envoy_extension_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_extension_package() + +envoy_cc_extension( + name = "config", + srcs = ["config.cc"], + hdrs = ["config.h"], + deps = [ + ":io_handle_impl_lib", + "//source/common/network:socket_interface_lib", + "@envoy_api//envoy/extensions/network/socket_interface/v3:pkg_cc_proto", + ] + select({ + "//bazel:linux": ["//source/common/io:io_uring_impl_lib"], + "//conditions:default": [], + }), +) + +envoy_cc_library( + name = "io_handle_impl_lib", + srcs = [ + "io_handle_impl.cc", + ], + hdrs = [ + "io_handle_impl.h", + ], + deps = [ + "//envoy/event:dispatcher_interface", + "//envoy/network:io_handle_interface", + "//source/common/api:os_sys_calls_lib", + "//source/common/buffer:buffer_lib", + "//source/common/io:io_uring_interface", + "//source/common/network:address_lib", + "//source/common/network:io_socket_error_lib", + ], +) diff --git a/source/extensions/io_socket/io_uring/config.cc b/source/extensions/io_socket/io_uring/config.cc new file mode 100644 index 0000000000000..dcbc66aad95c0 --- /dev/null +++ b/source/extensions/io_socket/io_uring/config.cc @@ -0,0 +1,123 @@ +#include "source/extensions/io_socket/io_uring/config.h" + +#include "envoy/common/platform.h" +#include "envoy/extensions/network/socket_interface/v3/io_uring_socket_interface.pb.validate.h" + +#include "source/common/api/os_sys_calls_impl.h" +#include "source/common/io/io_uring_impl.h" +#include "source/extensions/io_socket/io_uring/io_handle_impl.h" + +namespace Envoy { +namespace Extensions { +namespace IoSocket { +namespace IoUring { + +namespace { + +constexpr uint32_t DefaultIoUringSize = 300; +constexpr uint32_t DefaultReadBufferSize = 8192; + +} // namespace + +void SocketInterfaceExtension::onServerInitialized() { factory_.onServerInitialized(); } + +Network::IoHandlePtr +SocketInterfaceImpl::socket(Network::Socket::Type socket_type, Network::Address::Type addr_type, + Network::Address::IpVersion version, bool socket_v6only, + const Network::SocketCreationOptions& options) const { + int protocol = 0; + int flags = 0; + + if (options.mptcp_enabled_) { + ASSERT(socket_type == Network::Socket::Type::Stream); + ASSERT(addr_type == Network::Address::Type::Ip); + protocol = IPPROTO_MPTCP; + } + + if (socket_type == Network::Socket::Type::Stream) { + flags |= SOCK_STREAM; + } else { + flags |= SOCK_DGRAM; + } + + int domain; + if (addr_type == Network::Address::Type::Ip) { + if (version == Network::Address::IpVersion::v6) { + domain = AF_INET6; + } else { + ASSERT(version == Network::Address::IpVersion::v4); + domain = AF_INET; + } + } else if (addr_type == Network::Address::Type::Pipe) { + domain = AF_UNIX; + } else { + PANIC("not implemented"); + } + + const Api::SysCallSocketResult result = + Api::OsSysCallsSingleton::get().socket(domain, flags, protocol); + RELEASE_ASSERT(SOCKET_VALID(result.return_value_), + fmt::format("socket(2) failed, got error: {}", errorDetails(result.errno_))); + + ASSERT(io_uring_factory_ != nullptr); + return std::make_unique(read_buffer_size_, *io_uring_factory_, + result.return_value_, socket_v6only, domain); +} + +Network::IoHandlePtr +SocketInterfaceImpl::socket(Network::Socket::Type socket_type, + const Network::Address::InstanceConstSharedPtr addr, + const Network::SocketCreationOptions& options) const { + Network::Address::IpVersion ip_version = + addr->ip() ? addr->ip()->version() : Network::Address::IpVersion::v4; + int v6only = 0; + if (addr->type() == Network::Address::Type::Ip && ip_version == Network::Address::IpVersion::v6) { + v6only = addr->ip()->ipv6()->v6only(); + } + + Network::IoHandlePtr io_handle = + SocketInterfaceImpl::socket(socket_type, addr->type(), ip_version, v6only, options); + if (addr->type() == Network::Address::Type::Ip && ip_version == Network::Address::IpVersion::v6) { + // Setting IPV6_V6ONLY restricts the IPv6 socket to IPv6 connections only. + const Api::SysCallIntResult result = io_handle->setOption( + IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast(&v6only), sizeof(v6only)); + RELEASE_ASSERT(!SOCKET_FAILURE(result.return_value_), ""); + } + return io_handle; +} + +bool SocketInterfaceImpl::ipFamilySupported(int domain) { + Api::OsSysCalls& os_sys_calls = Api::OsSysCallsSingleton::get(); + const Api::SysCallSocketResult result = os_sys_calls.socket(domain, SOCK_STREAM, 0); + if (SOCKET_VALID(result.return_value_)) { + RELEASE_ASSERT( + os_sys_calls.close(result.return_value_).return_value_ == 0, + fmt::format("Fail to close fd: response code {}", errorDetails(result.return_value_))); + } + return SOCKET_VALID(result.return_value_); +} + +Server::BootstrapExtensionPtr SocketInterfaceImpl::createBootstrapExtension( + const Protobuf::Message& message, Server::Configuration::ServerFactoryContext& context) { + auto config = MessageUtil::downcastAndValidate< + const envoy::extensions::network::socket_interface::v3::IoUringSocketInterface&>( + message, context.messageValidationContext().staticValidationVisitor()); + read_buffer_size_ = + PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, read_buffer_size, DefaultReadBufferSize); + io_uring_factory_ = std::make_unique( + PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, io_uring_size, DefaultIoUringSize), + config.use_submission_queue_polling(), context.threadLocal()); + return std::make_unique(*this, *io_uring_factory_); +} + +ProtobufTypes::MessagePtr SocketInterfaceImpl::createEmptyConfigProto() { + return std::make_unique< + envoy::extensions::network::socket_interface::v3::IoUringSocketInterface>(); +} + +REGISTER_FACTORY(SocketInterfaceImpl, Server::Configuration::BootstrapExtensionFactory); + +} // namespace IoUring +} // namespace IoSocket +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/io_socket/io_uring/config.h b/source/extensions/io_socket/io_uring/config.h new file mode 100644 index 0000000000000..f1e52ad3dec0b --- /dev/null +++ b/source/extensions/io_socket/io_uring/config.h @@ -0,0 +1,60 @@ +#pragma once + +#include "envoy/extensions/network/socket_interface/v3/io_uring_socket_interface.pb.h" + +#include "source/common/network/socket_interface.h" + +namespace Envoy { + +namespace Io { +class IoUringFactory; +} // namespace Io + +namespace Extensions { +namespace IoSocket { +namespace IoUring { + +class SocketInterfaceExtension : public Network::SocketInterfaceExtension { +public: + SocketInterfaceExtension(Network::SocketInterface& sock_interface, Io::IoUringFactory& factory) + : Network::SocketInterfaceExtension(sock_interface), factory_(factory) {} + + // Server::BootstrapExtension + void onServerInitialized() override; + +protected: + Io::IoUringFactory& factory_; +}; + +class SocketInterfaceImpl : public Network::SocketInterfaceBase { +public: + // SocketInterface + Network::IoHandlePtr socket(Network::Socket::Type socket_type, Network::Address::Type addr_type, + Network::Address::IpVersion version, bool socket_v6only, + const Network::SocketCreationOptions& options) const override; + Network::IoHandlePtr socket(Network::Socket::Type socket_type, + const Network::Address::InstanceConstSharedPtr addr, + const Network::SocketCreationOptions& options) const override; + bool ipFamilySupported(int domain) override; + + // Server::Configuration::BootstrapExtensionFactory + Server::BootstrapExtensionPtr + createBootstrapExtension(const Protobuf::Message& message, + Server::Configuration::ServerFactoryContext& context) override; + + ProtobufTypes::MessagePtr createEmptyConfigProto() override; + std::string name() const override { + return "envoy.extensions.network.socket_interface.io_uring"; + }; + +private: + uint32_t read_buffer_size_; + std::unique_ptr io_uring_factory_; +}; + +DECLARE_FACTORY(SocketInterfaceImpl); + +} // namespace IoUring +} // namespace IoSocket +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/io_socket/io_uring/io_handle_impl.cc b/source/extensions/io_socket/io_uring/io_handle_impl.cc new file mode 100644 index 0000000000000..4699a6836df23 --- /dev/null +++ b/source/extensions/io_socket/io_uring/io_handle_impl.cc @@ -0,0 +1,480 @@ +#include "source/extensions/io_socket/io_uring/io_handle_impl.h" + +#include "envoy/buffer/buffer.h" +#include "envoy/common/exception.h" +#include "envoy/event/dispatcher.h" + +#include "source/common/api/os_sys_calls_impl.h" +#include "source/common/buffer/buffer_impl.h" +#include "source/common/common/assert.h" +#include "source/common/common/utility.h" +#include "source/common/io/io_uring.h" +#include "source/common/network/address_impl.h" +#include "source/common/network/io_socket_error_impl.h" + +namespace Envoy { +namespace Extensions { +namespace IoSocket { +namespace IoUring { + +namespace { + +constexpr socklen_t udsAddressLength() { return sizeof(sa_family_t); } + +} // namespace + +IoUringSocketHandleImpl::IoUringSocketHandleImpl(const uint32_t read_buffer_size, + const Io::IoUringFactory& io_uring_factory, + os_fd_t fd, bool socket_v6only, + absl::optional domain) + : read_buffer_size_(read_buffer_size), io_uring_factory_(io_uring_factory), fd_(fd), + socket_v6only_(socket_v6only), domain_(domain) {} + +IoUringSocketHandleImpl::~IoUringSocketHandleImpl() { + if (SOCKET_VALID(fd_)) { + // The TLS slot has been shut down by this moment with IoUring wiped out, thus + // better use this posix system call instead of IoUringSocketHandleImpl::close(). + ::close(fd_); + } +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::close() { + ASSERT(SOCKET_VALID(fd_)); + auto req = new Request{absl::nullopt, RequestType::Close}; + Io::IoUringResult res = io_uring_factory_.getOrCreate().prepareClose(fd_, req); + if (res == Io::IoUringResult::Failed) { + // Fall back to posix system call. + ::close(fd_); + } + if (isLeader()) { + io_uring_factory_.getOrCreate().unregisterEventfd(); + file_event_adapter_.reset(); + } + SET_SOCKET_INVALID(fd_); + return Api::ioCallUint64ResultNoError(); +} + +bool IoUringSocketHandleImpl::isOpen() const { return SOCKET_VALID(fd_); } +Api::IoCallUint64Result IoUringSocketHandleImpl::readv(uint64_t /* max_length */, + Buffer::RawSlice* /* slices */, + uint64_t /* num_slice */) { + PANIC("not implemented"); +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::read(Buffer::Instance& buffer, + absl::optional max_length_opt) { + const uint64_t max_length = max_length_opt.value_or(UINT64_MAX); + if (max_length == 0) { + return Api::ioCallUint64ResultNoError(); + } + + if (bytes_to_read_ == 0) { + return Api::IoCallUint64Result( + 0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), + Network::IoSocketError::deleteIoError)); + } + + ASSERT(read_buf_ != nullptr); + auto fragment = new Buffer::BufferFragmentImpl( + read_buf_.release(), bytes_to_read_, + [](const void* data, size_t /*len*/, const Buffer::BufferFragmentImpl* this_fragment) { + delete[] reinterpret_cast(data); + delete this_fragment; + }); + buffer.addBufferFragment(*fragment); + is_read_added_ = false; + + uint64_t len = bytes_to_read_; + bytes_to_read_ = 0; + return Api::IoCallUint64Result(len, + Api::IoErrorPtr(nullptr, Network::IoSocketError::deleteIoError)); +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::writev(const Buffer::RawSlice* /*slices */, + uint64_t /*num_slice*/) { + PANIC("not implemented"); +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::write(Buffer::Instance& buffer) { + auto length = buffer.length(); + ASSERT(length > 0); + + std::list slices; + while (buffer.length() > 0) { + // The buffer must not own the data after it has been extracted and put into + // the `io-uring` submission queue to avoid freeing it before the writev + // operation is completed. + Buffer::SliceDataPtr data = buffer.extractMutableFrontSlice(); + slices.push_back(std::move(data)); + } + + uint32_t nr_vecs = slices.size(); + struct iovec* iovecs = new struct iovec[slices.size()]; + struct iovec* iov = iovecs; + for (auto& slice : slices) { + absl::Span mdata = slice->getMutableData(); + iov->iov_base = mdata.data(); + iov->iov_len = mdata.size(); + iov++; + } + + auto req = new Request{*this, RequestType::Write, iovecs, std::move(slices)}; + auto& uring = io_uring_factory_.getOrCreate(); + auto res = uring.prepareWritev(fd_, iovecs, nr_vecs, 0, req); + if (res == Io::IoUringResult::Failed) { + // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. + uring.submit(); + res = uring.prepareWritev(fd_, iovecs, nr_vecs, 0, req); + RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare writev"); + } + return Api::IoCallUint64Result(length, + Api::IoErrorPtr(nullptr, Network::IoSocketError::deleteIoError)); +} + +Api::IoCallUint64Result +IoUringSocketHandleImpl::sendmsg(const Buffer::RawSlice* /*slices*/, uint64_t /*num_slice*/, + int /*flags*/, const Network::Address::Ip* /*self_ip*/, + const Network::Address::Instance& /*peer_address*/) { + PANIC("not implemented"); +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::recvmsg(Buffer::RawSlice* /*slices*/, + const uint64_t /*num_slice*/, + uint32_t /*self_port*/, + RecvMsgOutput& /*output*/) { + PANIC("not implemented"); +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::recvmmsg(RawSliceArrays& /*slices*/, + uint32_t /*self_port*/, + RecvMsgOutput& /*output*/) { + PANIC("not implemented"); +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::recv(void* /*buffer*/, size_t /*length*/, + int /*flags*/) { + PANIC("not implemented"); +} + +bool IoUringSocketHandleImpl::supportsMmsg() const { PANIC("not implemented"); } + +bool IoUringSocketHandleImpl::supportsUdpGro() const { PANIC("not implemented"); } + +Api::SysCallIntResult +IoUringSocketHandleImpl::bind(Network::Address::InstanceConstSharedPtr address) { + return Api::OsSysCallsSingleton::get().bind(fd_, address->sockAddr(), address->sockAddrLen()); +} + +Api::SysCallIntResult IoUringSocketHandleImpl::listen(int backlog) { + file_event_adapter_ = + std::make_unique(read_buffer_size_, io_uring_factory_, fd_); + return Api::OsSysCallsSingleton::get().listen(fd_, backlog); +} + +Network::IoHandlePtr IoUringSocketHandleImpl::accept(struct sockaddr* addr, socklen_t* addrlen) { + return file_event_adapter_->accept(addr, addrlen); +} + +Api::SysCallIntResult +IoUringSocketHandleImpl::connect(Network::Address::InstanceConstSharedPtr address) { + auto& uring = io_uring_factory_.getOrCreate(); + auto req = new Request{*this, RequestType::Connect}; + auto res = uring.prepareConnect(fd_, address, req); + if (res == Io::IoUringResult::Failed) { + res = uring.submit(); + if (res == Io::IoUringResult::Busy) { + return Api::SysCallIntResult{0, SOCKET_ERROR_AGAIN}; + } + res = uring.prepareConnect(fd_, address, req); + RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare connect"); + } + if (isLeader()) { + // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. + uring.submit(); + } + return Api::SysCallIntResult{0, SOCKET_ERROR_IN_PROGRESS}; +} + +Api::SysCallIntResult IoUringSocketHandleImpl::setOption(int level, int optname, const void* optval, + socklen_t optlen) { + return Api::OsSysCallsSingleton::get().setsockopt(fd_, level, optname, optval, optlen); +} + +Api::SysCallIntResult IoUringSocketHandleImpl::getOption(int level, int optname, void* optval, + socklen_t* optlen) { + return Api::OsSysCallsSingleton::get().getsockopt(fd_, level, optname, optval, optlen); +} + +Api::SysCallIntResult IoUringSocketHandleImpl::ioctl(unsigned long, void*, unsigned long, void*, + unsigned long, unsigned long*) { + PANIC("not implemented"); +} + +Api::SysCallIntResult IoUringSocketHandleImpl::setBlocking(bool /*blocking*/) { + PANIC("not implemented"); +} + +absl::optional IoUringSocketHandleImpl::domain() { return domain_; } + +Network::Address::InstanceConstSharedPtr IoUringSocketHandleImpl::localAddress() { + // TODO(rojkov): This is a copy-paste from Network::IoSocketHandleImpl. + // Unification is needed. + sockaddr_storage ss; + socklen_t ss_len = sizeof(ss); + auto& os_sys_calls = Api::OsSysCallsSingleton::get(); + Api::SysCallIntResult result = + os_sys_calls.getsockname(fd_, reinterpret_cast(&ss), &ss_len); + if (result.return_value_ != 0) { + throw EnvoyException(fmt::format("getsockname failed for '{}': ({}) {}", fd_, result.errno_, + errorDetails(result.errno_))); + } + return Network::Address::addressFromSockAddrOrThrow(ss, ss_len, socket_v6only_); +} + +Network::Address::InstanceConstSharedPtr IoUringSocketHandleImpl::peerAddress() { + // TODO(rojkov): This is a copy-paste from Network::IoSocketHandleImpl. + // Unification is needed. + sockaddr_storage ss; + socklen_t ss_len = sizeof ss; + auto& os_sys_calls = Api::OsSysCallsSingleton::get(); + Api::SysCallIntResult result = + os_sys_calls.getpeername(fd_, reinterpret_cast(&ss), &ss_len); + if (result.return_value_ != 0) { + throw EnvoyException( + fmt::format("getpeername failed for '{}': {}", errorDetails(result.errno_))); + } + + if (ss_len == udsAddressLength() && ss.ss_family == AF_UNIX) { + // For Unix domain sockets, can't find out the peer name, but it should match our own + // name for the socket (i.e. the path should match, barring any namespace or other + // mechanisms to hide things, of which there are many). + ss_len = sizeof ss; + result = os_sys_calls.getsockname(fd_, reinterpret_cast(&ss), &ss_len); + if (result.return_value_ != 0) { + throw EnvoyException( + fmt::format("getsockname failed for '{}': {}", fd_, errorDetails(result.errno_))); + } + } + return Network::Address::addressFromSockAddrOrThrow(ss, ss_len, socket_v6only_); +} + +void IoUringSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, + Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events) { + // Check if this is a server socket accepting new connections. + if (isLeader()) { + file_event_adapter_->initialize(dispatcher, cb, trigger, events); + file_event_adapter_->addAcceptRequest(); + io_uring_factory_.getOrCreate().submit(); + return; + } + + // Check if this is going to become a leading client socket. + if (!io_uring_factory_.getOrCreate().isEventfdRegistered()) { + file_event_adapter_ = + std::make_unique(read_buffer_size_, io_uring_factory_, fd_); + file_event_adapter_->initialize(dispatcher, cb, trigger, events); + } + + cb_ = std::move(cb); +} + +Network::IoHandlePtr IoUringSocketHandleImpl::duplicate() { PANIC("not implemented"); } + +void IoUringSocketHandleImpl::activateFileEvents(uint32_t events) { + if (events & Event::FileReadyType::Write) { + addReadRequest(); + cb_(Event::FileReadyType::Write); + } +} + +void IoUringSocketHandleImpl::enableFileEvents(uint32_t events) { + if (events & Event::FileReadyType::Read) { + is_read_enabled_ = true; + addReadRequest(); + } else { + is_read_enabled_ = false; + } +} + +void IoUringSocketHandleImpl::resetFileEvents() { file_event_adapter_.reset(); } + +Api::SysCallIntResult IoUringSocketHandleImpl::shutdown(int /*how*/) { PANIC("not implemented"); } + +void IoUringSocketHandleImpl::addReadRequest() { + if (!is_read_enabled_ || !SOCKET_VALID(fd_) || is_read_added_) { + return; + } + + ASSERT(read_buf_ == nullptr); + is_read_added_ = true; // don't add READ if it's been already added. + read_buf_ = std::unique_ptr(new uint8_t[read_buffer_size_]); + iov_.iov_base = read_buf_.get(); + iov_.iov_len = read_buffer_size_; + auto& uring = io_uring_factory_.getOrCreate(); + auto req = new Request{*this, RequestType::Read}; + auto res = uring.prepareReadv(fd_, &iov_, 1, 0, req); + if (res == Io::IoUringResult::Failed) { + // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. + uring.submit(); + res = uring.prepareReadv(fd_, &iov_, 1, 0, req); + RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare readv"); + } +} + +absl::optional IoUringSocketHandleImpl::interfaceName() { + // TODO(rojkov): This is a copy-paste from Network::IoSocketHandleImpl. + // Unification is needed. + auto& os_syscalls_singleton = Api::OsSysCallsSingleton::get(); + if (!os_syscalls_singleton.supportsGetifaddrs()) { + return absl::nullopt; + } + + Network::Address::InstanceConstSharedPtr socket_address = localAddress(); + if (!socket_address || socket_address->type() != Network::Address::Type::Ip) { + return absl::nullopt; + } + + Api::InterfaceAddressVector interface_addresses{}; + const Api::SysCallIntResult rc = os_syscalls_singleton.getifaddrs(interface_addresses); + RELEASE_ASSERT(!rc.return_value_, fmt::format("getiffaddrs error: {}", rc.errno_)); + + absl::optional selected_interface_name{}; + for (const auto& interface_address : interface_addresses) { + if (!interface_address.interface_addr_) { + continue; + } + + if (socket_address->ip()->version() == interface_address.interface_addr_->ip()->version()) { + // Compare address _without port_. + // TODO: create common addressAsStringWithoutPort method to simplify code here. + absl::uint128 socket_address_value; + absl::uint128 interface_address_value; + switch (socket_address->ip()->version()) { + case Network::Address::IpVersion::v4: + socket_address_value = socket_address->ip()->ipv4()->address(); + interface_address_value = interface_address.interface_addr_->ip()->ipv4()->address(); + break; + case Network::Address::IpVersion::v6: + socket_address_value = socket_address->ip()->ipv6()->address(); + interface_address_value = interface_address.interface_addr_->ip()->ipv6()->address(); + break; + default: + ENVOY_BUG(false, fmt::format("unexpected IP family {}", + static_cast(socket_address->ip()->version()))); + } + + if (socket_address_value == interface_address_value) { + selected_interface_name = interface_address.interface_name_; + break; + } + } + } + + return selected_interface_name; +} + +Network::IoHandlePtr IoUringSocketHandleImpl::FileEventAdapter::accept(struct sockaddr* addr, + socklen_t* addrlen) { + if (!is_accept_added_) { + return nullptr; + } + + ASSERT(SOCKET_VALID(connection_fd_)); + + is_accept_added_ = false; + *addr = remote_addr_; + *addrlen = remote_addr_len_; + auto io_handle = std::make_unique(read_buffer_size_, io_uring_factory_, + connection_fd_); + SET_SOCKET_INVALID(connection_fd_); + io_handle->addReadRequest(); + return io_handle; +} + +void IoUringSocketHandleImpl::FileEventAdapter::onRequestCompletion(const Request& req, + int32_t result) { + if (result < 0) { + ENVOY_LOG(debug, "async request failed: {}", errorDetails(-result)); + } + + switch (req.type_) { + case RequestType::Accept: + ASSERT(!SOCKET_VALID(connection_fd_)); + addAcceptRequest(); + if (result >= 0) { + connection_fd_ = result; + cb_(Event::FileReadyType::Read); + } + break; + case RequestType::Read: { + ASSERT(req.iohandle_.has_value()); + auto& iohandle = req.iohandle_->get(); + iohandle.bytes_to_read_ = result; + iohandle.cb_(result > 0 ? Event::FileReadyType::Read : Event::FileReadyType::Closed); + if (result > 0) { + iohandle.addReadRequest(); + } + break; + } + case RequestType::Connect: + ASSERT(req.iohandle_.has_value()); + req.iohandle_->get().cb_(result < 0 ? Event::FileReadyType::Closed + : Event::FileReadyType::Write); + break; + case RequestType::Write: + ASSERT(req.iov_ != nullptr); + ASSERT(req.iohandle_.has_value()); + delete[] req.iov_; + if (result < 0) { + req.iohandle_->get().cb_(Event::FileReadyType::Closed); + } + break; + case RequestType::Close: + break; + default: + PANIC("not implemented"); + } +} + +void IoUringSocketHandleImpl::FileEventAdapter::onFileEvent() { + Io::IoUring& uring = io_uring_factory_.getOrCreate(); + uring.forEveryCompletion([this](void* user_data, int32_t result) { + auto req = static_cast(user_data); + onRequestCompletion(*req, result); + delete req; + }); + uring.submit(); +} + +void IoUringSocketHandleImpl::FileEventAdapter::initialize(Event::Dispatcher& dispatcher, + Event::FileReadyCb cb, + Event::FileTriggerType trigger, + uint32_t events) { + ASSERT(file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same " + "file descriptor. This is not allowed."); + + cb_ = std::move(cb); + Io::IoUring& uring = io_uring_factory_.getOrCreate(); + const os_fd_t event_fd = uring.registerEventfd(); + file_event_ = dispatcher.createFileEvent( + event_fd, [this](uint32_t) { onFileEvent(); }, trigger, events); +} + +void IoUringSocketHandleImpl::FileEventAdapter::addAcceptRequest() { + is_accept_added_ = true; + auto& uring = io_uring_factory_.getOrCreate(); + auto req = new Request{absl::nullopt, RequestType::Accept}; + auto res = uring.prepareAccept(fd_, &remote_addr_, &remote_addr_len_, req); + if (res == Io::IoUringResult::Failed) { + // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. + uring.submit(); + res = uring.prepareAccept(fd_, &remote_addr_, &remote_addr_len_, req); + RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare accept"); + } +} + +} // namespace IoUring +} // namespace IoSocket +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/io_socket/io_uring/io_handle_impl.h b/source/extensions/io_socket/io_uring/io_handle_impl.h new file mode 100644 index 0000000000000..3db61b8872789 --- /dev/null +++ b/source/extensions/io_socket/io_uring/io_handle_impl.h @@ -0,0 +1,141 @@ +#pragma once + +#include "envoy/buffer/buffer.h" +#include "envoy/network/io_handle.h" + +#include "source/common/common/logger.h" + +namespace Envoy { + +namespace Io { +class IoUringFactory; +} // namespace Io + +namespace Extensions { +namespace IoSocket { +namespace IoUring { + +class IoUringSocketHandleImpl; + +enum class RequestType { Accept, Connect, Read, Write, Close, Unknown }; + +using IoUringSocketHandleImplOptRef = + absl::optional>; + +struct Request { + IoUringSocketHandleImplOptRef iohandle_{absl::nullopt}; + RequestType type_{RequestType::Unknown}; + struct iovec* iov_{nullptr}; + std::list slices_{}; +}; + +/** + * IoHandle derivative for sockets. + */ +class IoUringSocketHandleImpl final : public Network::IoHandle, + protected Logger::Loggable { +public: + IoUringSocketHandleImpl(const uint32_t read_buffer_size, const Io::IoUringFactory&, + os_fd_t fd = INVALID_SOCKET, bool socket_v6only = false, + absl::optional domain = absl::nullopt); + ~IoUringSocketHandleImpl() override; + + // Network::IoHandle + // TODO(rojkov) To be removed when the fd is fully abstracted from clients. + os_fd_t fdDoNotUse() const override { return fd_; } + Api::IoCallUint64Result close() override; + bool isOpen() const override; + Api::IoCallUint64Result readv(uint64_t max_length, Buffer::RawSlice* slices, + uint64_t num_slice) override; + Api::IoCallUint64Result read(Buffer::Instance& buffer, + absl::optional max_length_opt) override; + Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override; + Api::IoCallUint64Result write(Buffer::Instance& buffer) override; + Api::IoCallUint64Result sendmsg(const Buffer::RawSlice* slices, uint64_t num_slice, int flags, + const Network::Address::Ip* self_ip, + const Network::Address::Instance& peer_address) override; + Api::IoCallUint64Result recvmsg(Buffer::RawSlice* slices, const uint64_t num_slice, + uint32_t self_port, RecvMsgOutput& output) override; + Api::IoCallUint64Result recvmmsg(RawSliceArrays& slices, uint32_t self_port, + RecvMsgOutput& output) override; + Api::IoCallUint64Result recv(void* buffer, size_t length, int flags) override; + bool supportsMmsg() const override; + bool supportsUdpGro() const override; + Api::SysCallIntResult bind(Network::Address::InstanceConstSharedPtr address) override; + Api::SysCallIntResult listen(int backlog) override; + Network::IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen) override; + Api::SysCallIntResult connect(Network::Address::InstanceConstSharedPtr address) override; + Api::SysCallIntResult setOption(int level, int optname, const void* optval, + socklen_t optlen) override; + Api::SysCallIntResult getOption(int level, int optname, void* optval, socklen_t* optlen) override; + Api::SysCallIntResult ioctl(unsigned long, void*, unsigned long, void*, unsigned long, + unsigned long*) override; + Api::SysCallIntResult setBlocking(bool blocking) override; + absl::optional domain() override; + Network::Address::InstanceConstSharedPtr localAddress() override; + Network::Address::InstanceConstSharedPtr peerAddress() override; + void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events) override; + Network::IoHandlePtr duplicate() override; + void activateFileEvents(uint32_t events) override; + void enableFileEvents(uint32_t events) override; + void resetFileEvents() override; + Api::SysCallIntResult shutdown(int how) override; + absl::optional lastRoundTripTime() override { return absl::nullopt; } + absl::optional congestionWindowInBytes() const override { return absl::nullopt; } + absl::optional interfaceName() override; + +private: + // FileEventAdapter adapts `io_uring` to libevent. + class FileEventAdapter { + public: + FileEventAdapter(const uint32_t read_buffer_size, const Io::IoUringFactory& io_uring_factory, + os_fd_t fd) + : read_buffer_size_(read_buffer_size), io_uring_factory_(io_uring_factory), fd_(fd) {} + void initialize(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events); + Network::IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen); + void addAcceptRequest(); + + private: + void onFileEvent(); + void onRequestCompletion(const Request& req, int32_t result); + + const uint32_t read_buffer_size_; + const Io::IoUringFactory& io_uring_factory_; + os_fd_t fd_; + Event::FileReadyCb cb_; + Event::FileEventPtr file_event_{nullptr}; + os_fd_t connection_fd_{INVALID_SOCKET}; + bool is_accept_added_{false}; + struct sockaddr remote_addr_; + socklen_t remote_addr_len_{sizeof(remote_addr_)}; + }; + + void addReadRequest(); + // Checks if the io handle is the one that registered eventfd with `io_uring`. + // An io handle can be a leader in two cases: + // 1. it's a server socket accepting new connections; + // 2. it's a client socket about to connect to a remote socket, but created + // in a thread without properly initialized `io_uring`. + bool isLeader() const { return file_event_adapter_ != nullptr; } + + const uint32_t read_buffer_size_; + const Io::IoUringFactory& io_uring_factory_; + os_fd_t fd_; + int socket_v6only_; + const absl::optional domain_; + + Event::FileReadyCb cb_; + struct iovec iov_; + std::unique_ptr read_buf_{nullptr}; + int32_t bytes_to_read_{0}; + bool is_read_added_{false}; + bool is_read_enabled_{true}; + std::unique_ptr file_event_adapter_{nullptr}; +}; + +} // namespace IoUring +} // namespace IoSocket +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/io_socket/io_uring/BUILD b/test/extensions/io_socket/io_uring/BUILD new file mode 100644 index 0000000000000..f278211d912ea --- /dev/null +++ b/test/extensions/io_socket/io_uring/BUILD @@ -0,0 +1,19 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test", + "envoy_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_cc_test( + name = "socket_interface_integration_test", + srcs = ["socket_interface_integration_test.cc"], + deps = [ + "//source/extensions/filters/network/echo:config", + "//source/extensions/io_socket/io_uring:config", + "//test/integration:http_integration_lib", + ], +) diff --git a/test/extensions/io_socket/io_uring/socket_interface_integration_test.cc b/test/extensions/io_socket/io_uring/socket_interface_integration_test.cc new file mode 100644 index 0000000000000..51b9226ca3bb0 --- /dev/null +++ b/test/extensions/io_socket/io_uring/socket_interface_integration_test.cc @@ -0,0 +1,76 @@ +#include "source/common/buffer/buffer_impl.h" +#include "source/common/network/address_impl.h" +#include "source/common/network/socket_interface.h" + +#include "test/integration/integration.h" +#include "test/test_common/environment.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace { + +class SocketInterfaceIntegrationTest : public BaseIntegrationTest, + public testing::TestWithParam { +public: + SocketInterfaceIntegrationTest() : BaseIntegrationTest(GetParam(), config()) { + use_lds_ = false; + }; + + static std::string config() { + // At least one empty filter chain needs to be specified. + return absl::StrCat(echoConfig(), R"EOF( +bootstrap_extensions: + - name: envoy.extensions.network.socket_interface.default_socket_interface + typed_config: + "@type": type.googleapis.com/envoy.extensions.network.socket_interface.v3.IoUringSocketInterface + - name: envoy.extensions.io.io_uring + typed_config: + "@type": type.googleapis.com/envoy.extensions.io.io_uring.v3.IoUring + io_uring_size: 20 +default_socket_interface: "envoy.extensions.network.socket_interface.io_uring" + )EOF"); + } + static std::string echoConfig() { + return absl::StrCat(ConfigHelper::baseConfig(), R"EOF( + filter_chains: + filters: + name: ratelimit + typed_config: + "@type": type.googleapis.com/envoy.config.filter.network.rate_limit.v2.RateLimit + domain: foo + stats_prefix: name + descriptors: [{"key": "foo", "value": "bar"}] + filters: + name: envoy.filters.network.echo + )EOF"); + } +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, SocketInterfaceIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +#if defined(__linux__) +TEST_P(SocketInterfaceIntegrationTest, Basic) { + BaseIntegrationTest::initialize(); + const Network::SocketInterface* factory = + Network::socketInterface("envoy.extensions.network.socket_interface.io_uring"); + ASSERT_TRUE(factory != nullptr); + ASSERT_TRUE(Network::SocketInterfaceSingleton::getExisting() == factory); + + std::string response; + auto connection = createConnectionDriver( + lookupPort("listener_0"), "hello", + [&response](Network::ClientConnection& conn, const Buffer::Instance& data) -> void { + response.append(data.toString()); + conn.close(Network::ConnectionCloseType::FlushWrite); + }); + connection->run(); + EXPECT_EQ("hello", response); +} +#endif + +} // namespace +} // namespace Envoy