From 2f1df287092329c4ef19eaa9b93daa44745137fd Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 6 Jul 2022 05:04:57 +0000 Subject: [PATCH 1/8] Add Dmitry's implementation and move it to the core Signed-off-by: Dmitry Rozhkov Signed-off-by: He Jie Xu --- source/common/network/BUILD | 3 + .../network/io_uring_socket_handle_impl.cc | 471 ++++++++++++++++++ .../network/io_uring_socket_handle_impl.h | 136 +++++ .../common/network/socket_interface_impl.cc | 43 +- source/common/network/socket_interface_impl.h | 22 +- source/server/server.cc | 21 +- 6 files changed, 682 insertions(+), 14 deletions(-) create mode 100644 source/common/network/io_uring_socket_handle_impl.cc create mode 100644 source/common/network/io_uring_socket_handle_impl.h diff --git a/source/common/network/BUILD b/source/common/network/BUILD index c4680bcd8f5cf..872db56c8015a 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -233,11 +233,13 @@ envoy_cc_library( name = "default_socket_interface_lib", srcs = [ "io_socket_handle_impl.cc", + "io_uring_socket_handle_impl.cc", "socket_interface_impl.cc", "win32_socket_handle_impl.cc", ], hdrs = [ "io_socket_handle_impl.h", + "io_uring_socket_handle_impl.h", "socket_interface_impl.h", "win32_socket_handle_impl.h", ], @@ -251,6 +253,7 @@ envoy_cc_library( "//source/common/api:os_sys_calls_lib", "//source/common/buffer:buffer_lib", "//source/common/event:dispatcher_includes", + "//source/common/io:io_uring_impl_lib", "@envoy_api//envoy/extensions/network/socket_interface/v3:pkg_cc_proto", ], ) diff --git a/source/common/network/io_uring_socket_handle_impl.cc b/source/common/network/io_uring_socket_handle_impl.cc new file mode 100644 index 0000000000000..30d597dceec82 --- /dev/null +++ b/source/common/network/io_uring_socket_handle_impl.cc @@ -0,0 +1,471 @@ +#include "source/common/network/io_uring_socket_handle_impl.h" + +#include "envoy/buffer/buffer.h" +#include "envoy/common/exception.h" +#include "envoy/event/dispatcher.h" + +#include 
"source/common/api/os_sys_calls_impl.h" +#include "source/common/buffer/buffer_impl.h" +#include "source/common/common/assert.h" +#include "source/common/common/utility.h" +#include "source/common/io/io_uring.h" +#include "source/common/network/address_impl.h" +#include "source/common/network/io_socket_error_impl.h" + +namespace Envoy { +namespace Network { + +namespace { + +constexpr socklen_t udsAddressLength() { return sizeof(sa_family_t); } + +} // namespace + +IoUringSocketHandleImpl::IoUringSocketHandleImpl(const uint32_t read_buffer_size, + const Io::IoUringFactory& io_uring_factory, + os_fd_t fd, bool socket_v6only, + absl::optional domain) + : read_buffer_size_(read_buffer_size), io_uring_factory_(io_uring_factory), fd_(fd), + socket_v6only_(socket_v6only), domain_(domain) {} + +IoUringSocketHandleImpl::~IoUringSocketHandleImpl() { + if (SOCKET_VALID(fd_)) { + // The TLS slot has been shut down by this moment with IoUring wiped out, thus + // better use this posix system call instead of IoUringSocketHandleImpl::close(). + ::close(fd_); + } +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::close() { + ASSERT(SOCKET_VALID(fd_)); + auto req = new Request{absl::nullopt, RequestType::Close}; + Io::IoUringResult res = io_uring_factory_.getOrCreate().prepareClose(fd_, req); + if (res == Io::IoUringResult::Failed) { + // Fall back to posix system call. 
+ ::close(fd_); + } + if (isLeader()) { + io_uring_factory_.getOrCreate().unregisterEventfd(); + file_event_adapter_.reset(); + } + SET_SOCKET_INVALID(fd_); + return Api::ioCallUint64ResultNoError(); +} + +bool IoUringSocketHandleImpl::isOpen() const { return SOCKET_VALID(fd_); } +Api::IoCallUint64Result IoUringSocketHandleImpl::readv(uint64_t /* max_length */, + Buffer::RawSlice* /* slices */, + uint64_t /* num_slice */) { + PANIC("not implemented"); +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::read(Buffer::Instance& buffer, + absl::optional max_length_opt) { + const uint64_t max_length = max_length_opt.value_or(UINT64_MAX); + if (max_length == 0) { + return Api::ioCallUint64ResultNoError(); + } + + if (bytes_to_read_ == 0) { + return Api::IoCallUint64Result(0, Api::IoErrorPtr(IoSocketError::getIoSocketEagainInstance(), + IoSocketError::deleteIoError)); + } + + ASSERT(read_buf_ != nullptr); + auto fragment = new Buffer::BufferFragmentImpl( + read_buf_.release(), bytes_to_read_, + [](const void* data, size_t /*len*/, const Buffer::BufferFragmentImpl* this_fragment) { + delete[] reinterpret_cast(data); + delete this_fragment; + }); + buffer.addBufferFragment(*fragment); + is_read_added_ = false; + + uint64_t len = bytes_to_read_; + bytes_to_read_ = 0; + return Api::IoCallUint64Result(len, Api::IoErrorPtr(nullptr, IoSocketError::deleteIoError)); +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::writev(const Buffer::RawSlice* /*slices */, + uint64_t /*num_slice*/) { + PANIC("not implemented"); +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::write(Buffer::Instance& buffer) { + auto length = buffer.length(); + ASSERT(length > 0); + + std::list slices; + while (buffer.length() > 0) { + // The buffer must not own the data after it has been extracted and put into + // the `io-uring` submission queue to avoid freeing it before the writev + // operation is completed. 
+ Buffer::SliceDataPtr data = buffer.extractMutableFrontSlice(); + slices.push_back(std::move(data)); + } + + uint32_t nr_vecs = slices.size(); + struct iovec* iovecs = new struct iovec[slices.size()]; + struct iovec* iov = iovecs; + for (auto& slice : slices) { + absl::Span mdata = slice->getMutableData(); + iov->iov_base = mdata.data(); + iov->iov_len = mdata.size(); + iov++; + } + + auto req = new Request{*this, RequestType::Write, iovecs, std::move(slices)}; + auto& uring = io_uring_factory_.getOrCreate(); + auto res = uring.prepareWritev(fd_, iovecs, nr_vecs, 0, req); + if (res == Io::IoUringResult::Failed) { + // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. + uring.submit(); + res = uring.prepareWritev(fd_, iovecs, nr_vecs, 0, req); + RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare writev"); + } + return Api::IoCallUint64Result(length, Api::IoErrorPtr(nullptr, IoSocketError::deleteIoError)); +} + +Api::IoCallUint64Result +IoUringSocketHandleImpl::sendmsg(const Buffer::RawSlice* /*slices*/, uint64_t /*num_slice*/, + int /*flags*/, const Address::Ip* /*self_ip*/, + const Address::Instance& /*peer_address*/) { + PANIC("not implemented"); +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::recvmsg(Buffer::RawSlice* /*slices*/, + const uint64_t /*num_slice*/, + uint32_t /*self_port*/, + RecvMsgOutput& /*output*/) { + PANIC("not implemented"); +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::recvmmsg(RawSliceArrays& /*slices*/, + uint32_t /*self_port*/, + RecvMsgOutput& /*output*/) { + PANIC("not implemented"); +} + +Api::IoCallUint64Result IoUringSocketHandleImpl::recv(void* /*buffer*/, size_t /*length*/, + int /*flags*/) { + PANIC("not implemented"); +} + +bool IoUringSocketHandleImpl::supportsMmsg() const { PANIC("not implemented"); } + +bool IoUringSocketHandleImpl::supportsUdpGro() const { PANIC("not implemented"); } + +Api::SysCallIntResult IoUringSocketHandleImpl::bind(Address::InstanceConstSharedPtr 
address) { + return Api::OsSysCallsSingleton::get().bind(fd_, address->sockAddr(), address->sockAddrLen()); +} + +Api::SysCallIntResult IoUringSocketHandleImpl::listen(int backlog) { + file_event_adapter_ = + std::make_unique(read_buffer_size_, io_uring_factory_, fd_); + return Api::OsSysCallsSingleton::get().listen(fd_, backlog); +} + +IoHandlePtr IoUringSocketHandleImpl::accept(struct sockaddr* addr, socklen_t* addrlen) { + return file_event_adapter_->accept(addr, addrlen); +} + +Api::SysCallIntResult IoUringSocketHandleImpl::connect(Address::InstanceConstSharedPtr address) { + auto& uring = io_uring_factory_.getOrCreate(); + auto req = new Request{*this, RequestType::Connect}; + auto res = uring.prepareConnect(fd_, address, req); + if (res == Io::IoUringResult::Failed) { + res = uring.submit(); + if (res == Io::IoUringResult::Busy) { + return Api::SysCallIntResult{0, SOCKET_ERROR_AGAIN}; + } + res = uring.prepareConnect(fd_, address, req); + RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare connect"); + } + if (isLeader()) { + // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. 
+ uring.submit(); + } + return Api::SysCallIntResult{0, SOCKET_ERROR_IN_PROGRESS}; +} + +Api::SysCallIntResult IoUringSocketHandleImpl::setOption(int level, int optname, const void* optval, + socklen_t optlen) { + return Api::OsSysCallsSingleton::get().setsockopt(fd_, level, optname, optval, optlen); +} + +Api::SysCallIntResult IoUringSocketHandleImpl::getOption(int level, int optname, void* optval, + socklen_t* optlen) { + return Api::OsSysCallsSingleton::get().getsockopt(fd_, level, optname, optval, optlen); +} + +Api::SysCallIntResult IoUringSocketHandleImpl::ioctl(unsigned long, void*, unsigned long, void*, + unsigned long, unsigned long*) { + PANIC("not implemented"); +} + +Api::SysCallIntResult IoUringSocketHandleImpl::setBlocking(bool /*blocking*/) { + PANIC("not implemented"); +} + +absl::optional IoUringSocketHandleImpl::domain() { return domain_; } + +Address::InstanceConstSharedPtr IoUringSocketHandleImpl::localAddress() { + // TODO(rojkov): This is a copy-paste from Network::IoSocketHandleImpl. + // Unification is needed. + sockaddr_storage ss; + socklen_t ss_len = sizeof(ss); + auto& os_sys_calls = Api::OsSysCallsSingleton::get(); + Api::SysCallIntResult result = + os_sys_calls.getsockname(fd_, reinterpret_cast(&ss), &ss_len); + if (result.return_value_ != 0) { + throw EnvoyException(fmt::format("getsockname failed for '{}': ({}) {}", fd_, result.errno_, + errorDetails(result.errno_))); + } + return Address::addressFromSockAddrOrThrow(ss, ss_len, socket_v6only_); +} + +Address::InstanceConstSharedPtr IoUringSocketHandleImpl::peerAddress() { + // TODO(rojkov): This is a copy-paste from Network::IoSocketHandleImpl. + // Unification is needed. 
+ sockaddr_storage ss; + socklen_t ss_len = sizeof ss; + auto& os_sys_calls = Api::OsSysCallsSingleton::get(); + Api::SysCallIntResult result = + os_sys_calls.getpeername(fd_, reinterpret_cast(&ss), &ss_len); + if (result.return_value_ != 0) { + throw EnvoyException( + fmt::format("getpeername failed for '{}': {}", fd_, errorDetails(result.errno_))); + } + + if (ss_len == udsAddressLength() && ss.ss_family == AF_UNIX) { + // For Unix domain sockets, can't find out the peer name, but it should match our own + // name for the socket (i.e. the path should match, barring any namespace or other + // mechanisms to hide things, of which there are many). + ss_len = sizeof ss; + result = os_sys_calls.getsockname(fd_, reinterpret_cast(&ss), &ss_len); + if (result.return_value_ != 0) { + throw EnvoyException( + fmt::format("getsockname failed for '{}': {}", fd_, errorDetails(result.errno_))); + } + } + return Address::addressFromSockAddrOrThrow(ss, ss_len, socket_v6only_); +} + +void IoUringSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, + Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events) { + // Check if this is a server socket accepting new connections. + if (isLeader()) { + file_event_adapter_->initialize(dispatcher, cb, trigger, events); + file_event_adapter_->addAcceptRequest(); + io_uring_factory_.getOrCreate().submit(); + return; + } + + // Check if this is going to become a leading client socket. 
+ if (!io_uring_factory_.getOrCreate().isEventfdRegistered()) { + file_event_adapter_ = + std::make_unique(read_buffer_size_, io_uring_factory_, fd_); + file_event_adapter_->initialize(dispatcher, cb, trigger, events); + } + + cb_ = std::move(cb); +} + +IoHandlePtr IoUringSocketHandleImpl::duplicate() { PANIC("not implemented"); } + +void IoUringSocketHandleImpl::activateFileEvents(uint32_t events) { + if (events & Event::FileReadyType::Write) { + addReadRequest(); + cb_(Event::FileReadyType::Write); + } +} + +void IoUringSocketHandleImpl::enableFileEvents(uint32_t events) { + if (events & Event::FileReadyType::Read) { + is_read_enabled_ = true; + addReadRequest(); + } else { + is_read_enabled_ = false; + } +} + +void IoUringSocketHandleImpl::resetFileEvents() { file_event_adapter_.reset(); } + +Api::SysCallIntResult IoUringSocketHandleImpl::shutdown(int /*how*/) { PANIC("not implemented"); } + +void IoUringSocketHandleImpl::addReadRequest() { + if (!is_read_enabled_ || !SOCKET_VALID(fd_) || is_read_added_) { + return; + } + + ASSERT(read_buf_ == nullptr); + is_read_added_ = true; // don't add READ if it's been already added. + read_buf_ = std::unique_ptr(new uint8_t[read_buffer_size_]); + iov_.iov_base = read_buf_.get(); + iov_.iov_len = read_buffer_size_; + auto& uring = io_uring_factory_.getOrCreate(); + auto req = new Request{*this, RequestType::Read}; + auto res = uring.prepareReadv(fd_, &iov_, 1, 0, req); + if (res == Io::IoUringResult::Failed) { + // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. + uring.submit(); + res = uring.prepareReadv(fd_, &iov_, 1, 0, req); + RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare readv"); + } +} + +absl::optional IoUringSocketHandleImpl::interfaceName() { + // TODO(rojkov): This is a copy-paste from Network::IoSocketHandleImpl. + // Unification is needed. 
+ auto& os_syscalls_singleton = Api::OsSysCallsSingleton::get(); + if (!os_syscalls_singleton.supportsGetifaddrs()) { + return absl::nullopt; + } + + Address::InstanceConstSharedPtr socket_address = localAddress(); + if (!socket_address || socket_address->type() != Address::Type::Ip) { + return absl::nullopt; + } + + Api::InterfaceAddressVector interface_addresses{}; + const Api::SysCallIntResult rc = os_syscalls_singleton.getifaddrs(interface_addresses); + RELEASE_ASSERT(!rc.return_value_, fmt::format("getiffaddrs error: {}", rc.errno_)); + + absl::optional selected_interface_name{}; + for (const auto& interface_address : interface_addresses) { + if (!interface_address.interface_addr_) { + continue; + } + + if (socket_address->ip()->version() == interface_address.interface_addr_->ip()->version()) { + // Compare address _without port_. + // TODO: create common addressAsStringWithoutPort method to simplify code here. + absl::uint128 socket_address_value; + absl::uint128 interface_address_value; + switch (socket_address->ip()->version()) { + case Address::IpVersion::v4: + socket_address_value = socket_address->ip()->ipv4()->address(); + interface_address_value = interface_address.interface_addr_->ip()->ipv4()->address(); + break; + case Address::IpVersion::v6: + socket_address_value = socket_address->ip()->ipv6()->address(); + interface_address_value = interface_address.interface_addr_->ip()->ipv6()->address(); + break; + default: + ENVOY_BUG(false, fmt::format("unexpected IP family {}", + static_cast(socket_address->ip()->version()))); + } + + if (socket_address_value == interface_address_value) { + selected_interface_name = interface_address.interface_name_; + break; + } + } + } + + return selected_interface_name; +} + +IoHandlePtr IoUringSocketHandleImpl::FileEventAdapter::accept(struct sockaddr* addr, + socklen_t* addrlen) { + if (!is_accept_added_) { + return nullptr; + } + + ASSERT(SOCKET_VALID(connection_fd_)); + + is_accept_added_ = false; + *addr = 
remote_addr_; + *addrlen = remote_addr_len_; + auto io_handle = std::make_unique(read_buffer_size_, io_uring_factory_, + connection_fd_); + SET_SOCKET_INVALID(connection_fd_); + io_handle->addReadRequest(); + return io_handle; +} + +void IoUringSocketHandleImpl::FileEventAdapter::onRequestCompletion(const Request& req, + int32_t result) { + if (result < 0) { + ENVOY_LOG(debug, "async request failed: {}", errorDetails(-result)); + } + + switch (req.type_) { + case RequestType::Accept: + ASSERT(!SOCKET_VALID(connection_fd_)); + addAcceptRequest(); + if (result >= 0) { + connection_fd_ = result; + cb_(Event::FileReadyType::Read); + } + break; + case RequestType::Read: { + ASSERT(req.iohandle_.has_value()); + auto& iohandle = req.iohandle_->get(); + iohandle.bytes_to_read_ = result; + iohandle.cb_(result > 0 ? Event::FileReadyType::Read : Event::FileReadyType::Closed); + if (result > 0) { + iohandle.addReadRequest(); + } + break; + } + case RequestType::Connect: + ASSERT(req.iohandle_.has_value()); + req.iohandle_->get().cb_(result < 0 ? 
Event::FileReadyType::Closed + : Event::FileReadyType::Write); + break; + case RequestType::Write: + ASSERT(req.iov_ != nullptr); + ASSERT(req.iohandle_.has_value()); + delete[] req.iov_; + if (result < 0) { + req.iohandle_->get().cb_(Event::FileReadyType::Closed); + } + break; + case RequestType::Close: + break; + default: + PANIC("not implemented"); + } +} + +void IoUringSocketHandleImpl::FileEventAdapter::onFileEvent() { + Io::IoUring& uring = io_uring_factory_.getOrCreate(); + uring.forEveryCompletion([this](void* user_data, int32_t result) { + auto req = static_cast(user_data); + onRequestCompletion(*req, result); + delete req; + }); + uring.submit(); +} + +void IoUringSocketHandleImpl::FileEventAdapter::initialize(Event::Dispatcher& dispatcher, + Event::FileReadyCb cb, + Event::FileTriggerType trigger, + uint32_t events) { + ASSERT(file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same " + "file descriptor. This is not allowed."); + + cb_ = std::move(cb); + Io::IoUring& uring = io_uring_factory_.getOrCreate(); + const os_fd_t event_fd = uring.registerEventfd(); + file_event_ = dispatcher.createFileEvent( + event_fd, [this](uint32_t) { onFileEvent(); }, trigger, events); +} + +void IoUringSocketHandleImpl::FileEventAdapter::addAcceptRequest() { + is_accept_added_ = true; + auto& uring = io_uring_factory_.getOrCreate(); + auto req = new Request{absl::nullopt, RequestType::Accept}; + auto res = uring.prepareAccept(fd_, &remote_addr_, &remote_addr_len_, req); + if (res == Io::IoUringResult::Failed) { + // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. 
+ uring.submit(); + res = uring.prepareAccept(fd_, &remote_addr_, &remote_addr_len_, req); + RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare accept"); + } +} + +} // namespace Network +} // namespace Envoy diff --git a/source/common/network/io_uring_socket_handle_impl.h b/source/common/network/io_uring_socket_handle_impl.h new file mode 100644 index 0000000000000..737584bcff2e7 --- /dev/null +++ b/source/common/network/io_uring_socket_handle_impl.h @@ -0,0 +1,136 @@ +#pragma once + +#include "envoy/buffer/buffer.h" +#include "envoy/network/io_handle.h" + +#include "source/common/common/logger.h" + +namespace Envoy { + +namespace Io { +class IoUringFactory; +} // namespace Io + +namespace Network { + +class IoUringSocketHandleImpl; + +enum class RequestType { Accept, Connect, Read, Write, Close, Unknown }; + +using IoUringSocketHandleImplOptRef = + absl::optional>; + +struct Request { + IoUringSocketHandleImplOptRef iohandle_{absl::nullopt}; + RequestType type_{RequestType::Unknown}; + struct iovec* iov_{nullptr}; + std::list slices_{}; +}; + +/** + * IoHandle derivative for sockets. + */ +class IoUringSocketHandleImpl final : public IoHandle, protected Logger::Loggable { +public: + IoUringSocketHandleImpl(const uint32_t read_buffer_size, const Io::IoUringFactory&, + os_fd_t fd = INVALID_SOCKET, bool socket_v6only = false, + absl::optional domain = absl::nullopt); + ~IoUringSocketHandleImpl() override; + + // Network::IoHandle + // TODO(rojkov) To be removed when the fd is fully abstracted from clients. 
+ os_fd_t fdDoNotUse() const override { return fd_; } + Api::IoCallUint64Result close() override; + bool isOpen() const override; + Api::IoCallUint64Result readv(uint64_t max_length, Buffer::RawSlice* slices, + uint64_t num_slice) override; + Api::IoCallUint64Result read(Buffer::Instance& buffer, + absl::optional max_length_opt) override; + Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override; + Api::IoCallUint64Result write(Buffer::Instance& buffer) override; + Api::IoCallUint64Result sendmsg(const Buffer::RawSlice* slices, uint64_t num_slice, int flags, + const Address::Ip* self_ip, + const Address::Instance& peer_address) override; + Api::IoCallUint64Result recvmsg(Buffer::RawSlice* slices, const uint64_t num_slice, + uint32_t self_port, RecvMsgOutput& output) override; + Api::IoCallUint64Result recvmmsg(RawSliceArrays& slices, uint32_t self_port, + RecvMsgOutput& output) override; + Api::IoCallUint64Result recv(void* buffer, size_t length, int flags) override; + bool supportsMmsg() const override; + bool supportsUdpGro() const override; + Api::SysCallIntResult bind(Address::InstanceConstSharedPtr address) override; + Api::SysCallIntResult listen(int backlog) override; + IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen) override; + Api::SysCallIntResult connect(Address::InstanceConstSharedPtr address) override; + Api::SysCallIntResult setOption(int level, int optname, const void* optval, + socklen_t optlen) override; + Api::SysCallIntResult getOption(int level, int optname, void* optval, socklen_t* optlen) override; + Api::SysCallIntResult ioctl(unsigned long, void*, unsigned long, void*, unsigned long, + unsigned long*) override; + Api::SysCallIntResult setBlocking(bool blocking) override; + absl::optional domain() override; + Address::InstanceConstSharedPtr localAddress() override; + Address::InstanceConstSharedPtr peerAddress() override; + void initializeFileEvent(Event::Dispatcher& dispatcher, 
Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events) override; + IoHandlePtr duplicate() override; + void activateFileEvents(uint32_t events) override; + void enableFileEvents(uint32_t events) override; + void resetFileEvents() override; + Api::SysCallIntResult shutdown(int how) override; + absl::optional lastRoundTripTime() override { return absl::nullopt; } + absl::optional congestionWindowInBytes() const override { return absl::nullopt; } + absl::optional interfaceName() override; + +private: + // FileEventAdapter adapts `io_uring` to libevent. + class FileEventAdapter { + public: + FileEventAdapter(const uint32_t read_buffer_size, const Io::IoUringFactory& io_uring_factory, + os_fd_t fd) + : read_buffer_size_(read_buffer_size), io_uring_factory_(io_uring_factory), fd_(fd) {} + void initialize(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events); + IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen); + void addAcceptRequest(); + + private: + void onFileEvent(); + void onRequestCompletion(const Request& req, int32_t result); + + const uint32_t read_buffer_size_; + const Io::IoUringFactory& io_uring_factory_; + os_fd_t fd_; + Event::FileReadyCb cb_; + Event::FileEventPtr file_event_{nullptr}; + os_fd_t connection_fd_{INVALID_SOCKET}; + bool is_accept_added_{false}; + struct sockaddr remote_addr_; + socklen_t remote_addr_len_{sizeof(remote_addr_)}; + }; + + void addReadRequest(); + // Checks if the io handle is the one that registered eventfd with `io_uring`. + // An io handle can be a leader in two cases: + // 1. it's a server socket accepting new connections; + // 2. it's a client socket about to connect to a remote socket, but created + // in a thread without properly initialized `io_uring`. 
+ bool isLeader() const { return file_event_adapter_ != nullptr; } + + const uint32_t read_buffer_size_; + const Io::IoUringFactory& io_uring_factory_; + os_fd_t fd_; + int socket_v6only_; + const absl::optional domain_; + + Event::FileReadyCb cb_; + struct iovec iov_; + std::unique_ptr read_buf_{nullptr}; + int32_t bytes_to_read_{0}; + bool is_read_added_{false}; + bool is_read_enabled_{true}; + std::unique_ptr file_event_adapter_{nullptr}; +}; + +} // namespace Network +} // namespace Envoy diff --git a/source/common/network/socket_interface_impl.cc b/source/common/network/socket_interface_impl.cc index b296488c0f048..7b62b0d57e2ec 100644 --- a/source/common/network/socket_interface_impl.cc +++ b/source/common/network/socket_interface_impl.cc @@ -6,18 +6,43 @@ #include "source/common/api/os_sys_calls_impl.h" #include "source/common/common/assert.h" #include "source/common/common/utility.h" +#include "source/common/io/io_uring_impl.h" #include "source/common/network/io_socket_handle_impl.h" +#include "source/common/network/io_uring_socket_handle_impl.h" #include "source/common/network/win32_socket_handle_impl.h" namespace Envoy { namespace Network { -IoHandlePtr SocketInterfaceImpl::makePlatformSpecificSocket(int socket_fd, bool socket_v6only, - absl::optional domain) { +namespace { + +// TODO (soulxu): making those configurable if needed. 
+constexpr uint32_t DefaultIoUringSize = 300; +constexpr uint32_t DefaultReadBufferSize = 8192; +constexpr bool UseSubmissionQueuePolling = false; + +} // namespace + +void DefaultSocketInterfaceExtension::onServerInitialized() { + if (io_uring_factory_ != nullptr) { + io_uring_factory_->onServerInitialized(); + } +} + +IoHandlePtr +SocketInterfaceImpl::makePlatformSpecificSocket(int socket_fd, bool socket_v6only, + absl::optional domain, + const Io::IoUringFactory* io_uring_factory) { if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) { return std::make_unique(socket_fd, socket_v6only, domain); } - return std::make_unique(socket_fd, socket_v6only, domain); + + if (io_uring_factory == nullptr) { + return std::make_unique(socket_fd, socket_v6only, domain); + } else { + return std::make_unique(DefaultReadBufferSize, *io_uring_factory, + socket_fd, socket_v6only, domain); + } } IoHandlePtr SocketInterfaceImpl::makeSocket(int socket_fd, bool socket_v6only, @@ -112,10 +137,14 @@ bool SocketInterfaceImpl::ipFamilySupported(int domain) { return SOCKET_VALID(result.return_value_); } -Server::BootstrapExtensionPtr -SocketInterfaceImpl::createBootstrapExtension(const Protobuf::Message&, - Server::Configuration::ServerFactoryContext&) { - return std::make_unique(*this); +Server::BootstrapExtensionPtr SocketInterfaceImpl::createBootstrapExtension( + const Protobuf::Message&, Server::Configuration::ServerFactoryContext& context) { + // TODO (soulxu): Add runtime flag here. 
+ if (Io::isIoUringSupported()) { + io_uring_factory_ = std::make_unique( + DefaultIoUringSize, UseSubmissionQueuePolling, context.threadLocal()); + } + return std::make_unique(*this, io_uring_factory_); } ProtobufTypes::MessagePtr SocketInterfaceImpl::createEmptyConfigProto() { diff --git a/source/common/network/socket_interface_impl.h b/source/common/network/socket_interface_impl.h index 780c0dc97b71d..68c030d070fab 100644 --- a/source/common/network/socket_interface_impl.h +++ b/source/common/network/socket_interface_impl.h @@ -2,11 +2,25 @@ #include "envoy/network/socket.h" +#include "source/common/io/io_uring.h" #include "source/common/network/socket_interface.h" namespace Envoy { namespace Network { +class DefaultSocketInterfaceExtension : public Network::SocketInterfaceExtension { +public: + DefaultSocketInterfaceExtension(Network::SocketInterface& sock_interface, + std::unique_ptr& io_uring_factory) + : Network::SocketInterfaceExtension(sock_interface), io_uring_factory_(io_uring_factory) {} + + // Server::BootstrapExtension + void onServerInitialized() override; + +protected: + std::unique_ptr& io_uring_factory_; +}; + class SocketInterfaceImpl : public SocketInterfaceBase { public: // SocketInterface @@ -26,12 +40,16 @@ class SocketInterfaceImpl : public SocketInterfaceBase { return "envoy.extensions.network.socket_interface.default_socket_interface"; }; - static IoHandlePtr makePlatformSpecificSocket(int socket_fd, bool socket_v6only, - absl::optional domain); + static IoHandlePtr + makePlatformSpecificSocket(int socket_fd, bool socket_v6only, absl::optional domain, + const Io::IoUringFactory* io_uring_factory = nullptr); protected: virtual IoHandlePtr makeSocket(int socket_fd, bool socket_v6only, absl::optional domain) const; + +private: + std::unique_ptr io_uring_factory_; }; DECLARE_FACTORY(SocketInterfaceImpl); diff --git a/source/server/server.cc b/source/server/server.cc index 411a579ebde77..b4623df7124da 100644 --- a/source/server/server.cc +++ 
b/source/server/server.cc @@ -583,15 +583,26 @@ void InstanceImpl::initialize(Network::Address::InstanceConstSharedPtr local_add std::move(safe_actions), std::move(unsafe_actions), api_->threadFactory()); } + Network::SocketInterface* sock = nullptr; + if (!bootstrap_.default_socket_interface().empty()) { auto& sock_name = bootstrap_.default_socket_interface(); - auto sock = const_cast(Network::socketInterface(sock_name)); - if (sock != nullptr) { - Network::SocketInterfaceSingleton::clear(); - Network::SocketInterfaceSingleton::initialize(sock); - } + sock = const_cast(Network::socketInterface(sock_name)); } + if (bootstrap_.default_socket_interface().empty() || sock == nullptr) { + auto factory = + Registry::FactoryRegistry::getFactory( + "envoy.extensions.network.socket_interface.default_socket_interface"); + bootstrap_extensions_.push_back(factory->createBootstrapExtension( + *factory->createEmptyConfigProto(), serverFactoryContext())); + sock = dynamic_cast(factory); + } + + ASSERT(sock != nullptr); + Network::SocketInterfaceSingleton::clear(); + Network::SocketInterfaceSingleton::initialize(sock); + // Initialize the regex engine and inject to singleton. 
if (bootstrap_.has_default_regex_engine()) { const auto& default_regex_engine = bootstrap_.default_regex_engine(); From 46477a877d42f3c4c03c5951f292a992970e4381 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 6 Jul 2022 07:51:24 +0000 Subject: [PATCH 2/8] pass the io uring factory Signed-off-by: He Jie Xu --- source/common/network/socket_interface_impl.cc | 8 +++++--- source/common/network/socket_interface_impl.h | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/source/common/network/socket_interface_impl.cc b/source/common/network/socket_interface_impl.cc index 7b62b0d57e2ec..55189fbbe749c 100644 --- a/source/common/network/socket_interface_impl.cc +++ b/source/common/network/socket_interface_impl.cc @@ -46,8 +46,9 @@ SocketInterfaceImpl::makePlatformSpecificSocket(int socket_fd, bool socket_v6onl } IoHandlePtr SocketInterfaceImpl::makeSocket(int socket_fd, bool socket_v6only, - absl::optional domain) const { - return makePlatformSpecificSocket(socket_fd, socket_v6only, domain); + absl::optional domain, + const Io::IoUringFactory* io_uring_factory) const { + return makePlatformSpecificSocket(socket_fd, socket_v6only, domain, io_uring_factory); } IoHandlePtr SocketInterfaceImpl::socket(Socket::Type socket_type, Address::Type addr_type, @@ -94,7 +95,8 @@ IoHandlePtr SocketInterfaceImpl::socket(Socket::Type socket_type, Address::Type Api::OsSysCallsSingleton::get().socket(domain, flags, protocol); RELEASE_ASSERT(SOCKET_VALID(result.return_value_), fmt::format("socket(2) failed, got error: {}", errorDetails(result.errno_))); - IoHandlePtr io_handle = makeSocket(result.return_value_, socket_v6only, domain); + IoHandlePtr io_handle = + makeSocket(result.return_value_, socket_v6only, domain, io_uring_factory_.get()); #if defined(__APPLE__) || defined(WIN32) // Cannot set SOCK_NONBLOCK as a ::socket flag. 
diff --git a/source/common/network/socket_interface_impl.h b/source/common/network/socket_interface_impl.h index 68c030d070fab..916dbd79868fb 100644 --- a/source/common/network/socket_interface_impl.h +++ b/source/common/network/socket_interface_impl.h @@ -45,8 +45,8 @@ class SocketInterfaceImpl : public SocketInterfaceBase { const Io::IoUringFactory* io_uring_factory = nullptr); protected: - virtual IoHandlePtr makeSocket(int socket_fd, bool socket_v6only, - absl::optional domain) const; + virtual IoHandlePtr makeSocket(int socket_fd, bool socket_v6only, absl::optional domain, + const Io::IoUringFactory* io_uring_factory = nullptr) const; private: std::unique_ptr io_uring_factory_; From 28b9d796173e98d008b789b4b8960f152ccacc2b Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Thu, 14 Jul 2022 05:36:43 +0000 Subject: [PATCH 3/8] not set non-blocking flag for iouring --- source/common/network/socket_interface_impl.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/common/network/socket_interface_impl.cc b/source/common/network/socket_interface_impl.cc index 55189fbbe749c..1a401519c97e6 100644 --- a/source/common/network/socket_interface_impl.cc +++ b/source/common/network/socket_interface_impl.cc @@ -61,6 +61,10 @@ IoHandlePtr SocketInterfaceImpl::socket(Socket::Type socket_type, Address::Type #else int flags = SOCK_NONBLOCK; + if (io_uring_factory_ != nullptr) { + flags = 0; + } + if (options.mptcp_enabled_) { ASSERT(socket_type == Socket::Type::Stream); ASSERT(addr_type == Address::Type::Ip); From 5e8ce5ca6517e18d44cd43f86c2ac8e7e91ec807 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 31 Aug 2022 06:32:49 +0000 Subject: [PATCH 4/8] test socket interface Signed-off-by: He Jie Xu --- test/integration/filters/test_socket_interface.cc | 3 ++- test/integration/filters/test_socket_interface.h | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/test/integration/filters/test_socket_interface.cc 
b/test/integration/filters/test_socket_interface.cc index 618a27e257423..aec9395202d2f 100644 --- a/test/integration/filters/test_socket_interface.cc +++ b/test/integration/filters/test_socket_interface.cc @@ -59,7 +59,8 @@ IoHandlePtr TestIoSocketHandle::duplicate() { } IoHandlePtr TestSocketInterface::makeSocket(int socket_fd, bool socket_v6only, - absl::optional domain) const { + absl::optional domain, + const Io::IoUringFactory*) const { return std::make_unique(write_override_proc_, socket_fd, socket_v6only, domain); } diff --git a/test/integration/filters/test_socket_interface.h b/test/integration/filters/test_socket_interface.h index af28e319ce04a..4d3d89f04f0f9 100644 --- a/test/integration/filters/test_socket_interface.h +++ b/test/integration/filters/test_socket_interface.h @@ -98,7 +98,7 @@ class TestSocketInterface : public SocketInterfaceImpl { private: // SocketInterfaceImpl IoHandlePtr makeSocket(int socket_fd, bool socket_v6only, - absl::optional domain) const override; + absl::optional domain, const Io::IoUringFactory*) const override; const TestIoSocketHandle::WriteOverrideProc write_override_proc_; }; From 80c0b7ea4e7516d8a06c86fa5df1594c511da91b Mon Sep 17 00:00:00 2001 From: Xie Zhihao Date: Tue, 13 Sep 2022 09:12:19 +0000 Subject: [PATCH 5/8] chore: move constants to headers Signed-off-by: Xie Zhihao --- source/common/network/socket_interface_impl.cc | 9 --------- source/common/network/socket_interface_impl.h | 5 +++++ 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/source/common/network/socket_interface_impl.cc b/source/common/network/socket_interface_impl.cc index 1a401519c97e6..25edd1a3d354d 100644 --- a/source/common/network/socket_interface_impl.cc +++ b/source/common/network/socket_interface_impl.cc @@ -14,15 +14,6 @@ namespace Envoy { namespace Network { -namespace { - -// TODO (soulxu): making those configurable if needed. 
-constexpr uint32_t DefaultIoUringSize = 300; -constexpr uint32_t DefaultReadBufferSize = 8192; -constexpr bool UseSubmissionQueuePolling = false; - -} // namespace - void DefaultSocketInterfaceExtension::onServerInitialized() { if (io_uring_factory_ != nullptr) { io_uring_factory_->onServerInitialized(); diff --git a/source/common/network/socket_interface_impl.h b/source/common/network/socket_interface_impl.h index 916dbd79868fb..4467ae1bfed9c 100644 --- a/source/common/network/socket_interface_impl.h +++ b/source/common/network/socket_interface_impl.h @@ -44,6 +44,11 @@ class SocketInterfaceImpl : public SocketInterfaceBase { makePlatformSpecificSocket(int socket_fd, bool socket_v6only, absl::optional domain, const Io::IoUringFactory* io_uring_factory = nullptr); + // TODO (soulxu): making those configurable if needed. + static constexpr uint32_t DefaultIoUringSize = 300; + static constexpr uint32_t DefaultReadBufferSize = 8192; + static constexpr bool UseSubmissionQueuePolling = false; + protected: virtual IoHandlePtr makeSocket(int socket_fd, bool socket_v6only, absl::optional domain, const Io::IoUringFactory* io_uring_factory = nullptr) const; From aa1ce451333fae223782595ab6cde07156f6ff82 Mon Sep 17 00:00:00 2001 From: Xie Zhihao Date: Tue, 13 Sep 2022 09:24:38 +0000 Subject: [PATCH 6/8] network: handle io_uring factory safely Signed-off-by: Xie Zhihao --- source/common/network/socket_interface_impl.cc | 15 ++++++++++----- source/common/network/socket_interface_impl.h | 6 +++--- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/source/common/network/socket_interface_impl.cc b/source/common/network/socket_interface_impl.cc index 25edd1a3d354d..29b052688a35d 100644 --- a/source/common/network/socket_interface_impl.cc +++ b/source/common/network/socket_interface_impl.cc @@ -52,7 +52,7 @@ IoHandlePtr SocketInterfaceImpl::socket(Socket::Type socket_type, Address::Type #else int flags = SOCK_NONBLOCK; - if (io_uring_factory_ != nullptr) { + if 
(io_uring_factory_.lock() != nullptr) { flags = 0; } @@ -91,7 +91,7 @@ IoHandlePtr SocketInterfaceImpl::socket(Socket::Type socket_type, Address::Type RELEASE_ASSERT(SOCKET_VALID(result.return_value_), fmt::format("socket(2) failed, got error: {}", errorDetails(result.errno_))); IoHandlePtr io_handle = - makeSocket(result.return_value_, socket_v6only, domain, io_uring_factory_.get()); + makeSocket(result.return_value_, socket_v6only, domain, io_uring_factory_.lock().get()); #if defined(__APPLE__) || defined(WIN32) // Cannot set SOCK_NONBLOCK as a ::socket flag. @@ -138,10 +138,15 @@ Server::BootstrapExtensionPtr SocketInterfaceImpl::createBootstrapExtension( const Protobuf::Message&, Server::Configuration::ServerFactoryContext& context) { // TODO (soulxu): Add runtime flag here. if (Io::isIoUringSupported()) { - io_uring_factory_ = std::make_unique( - DefaultIoUringSize, UseSubmissionQueuePolling, context.threadLocal()); + std::shared_ptr io_uring_factory = + std::make_unique(DefaultIoUringSize, UseSubmissionQueuePolling, + context.threadLocal()); + io_uring_factory_ = io_uring_factory; + + return std::make_unique(*this, io_uring_factory); + } else { + return std::make_unique(*this, nullptr); } - return std::make_unique(*this, io_uring_factory_); } ProtobufTypes::MessagePtr SocketInterfaceImpl::createEmptyConfigProto() { diff --git a/source/common/network/socket_interface_impl.h b/source/common/network/socket_interface_impl.h index 4467ae1bfed9c..17df52282ec55 100644 --- a/source/common/network/socket_interface_impl.h +++ b/source/common/network/socket_interface_impl.h @@ -11,14 +11,14 @@ namespace Network { class DefaultSocketInterfaceExtension : public Network::SocketInterfaceExtension { public: DefaultSocketInterfaceExtension(Network::SocketInterface& sock_interface, - std::unique_ptr& io_uring_factory) + std::shared_ptr io_uring_factory) : Network::SocketInterfaceExtension(sock_interface), io_uring_factory_(io_uring_factory) {} // Server::BootstrapExtension 
void onServerInitialized() override; protected: - std::unique_ptr& io_uring_factory_; + std::shared_ptr io_uring_factory_; }; class SocketInterfaceImpl : public SocketInterfaceBase { @@ -54,7 +54,7 @@ class SocketInterfaceImpl : public SocketInterfaceBase { const Io::IoUringFactory* io_uring_factory = nullptr) const; private: - std::unique_ptr io_uring_factory_; + std::weak_ptr io_uring_factory_; }; DECLARE_FACTORY(SocketInterfaceImpl); From 6f63b8429f6d18c68140d4f38c5cd168c789dac3 Mon Sep 17 00:00:00 2001 From: Xie Zhihao Date: Tue, 13 Sep 2022 11:10:30 +0000 Subject: [PATCH 7/8] server: fix duplicate socket interface Signed-off-by: Xie Zhihao --- source/server/server.cc | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/source/server/server.cc b/source/server/server.cc index b7591b35b6bd0..9cd42ec3a5816 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -600,26 +600,15 @@ void InstanceImpl::initialize(Network::Address::InstanceConstSharedPtr local_add std::move(safe_actions), std::move(unsafe_actions), api_->threadFactory()); } - Network::SocketInterface* sock = nullptr; - if (!bootstrap_.default_socket_interface().empty()) { auto& sock_name = bootstrap_.default_socket_interface(); - sock = const_cast(Network::socketInterface(sock_name)); - } - - if (bootstrap_.default_socket_interface().empty() || sock == nullptr) { - auto factory = - Registry::FactoryRegistry::getFactory( - "envoy.extensions.network.socket_interface.default_socket_interface"); - bootstrap_extensions_.push_back(factory->createBootstrapExtension( - *factory->createEmptyConfigProto(), serverFactoryContext())); - sock = dynamic_cast(factory); + auto sock = const_cast(Network::socketInterface(sock_name)); + if (sock != nullptr) { + Network::SocketInterfaceSingleton::clear(); + Network::SocketInterfaceSingleton::initialize(sock); + } } - ASSERT(sock != nullptr); - Network::SocketInterfaceSingleton::clear(); - 
Network::SocketInterfaceSingleton::initialize(sock); - // Workers get created first so they register for thread local updates. listener_manager_ = std::make_unique(*this, listener_component_factory_, worker_factory_, From fee498b3449cb3989e2909b92412cf2fae902515 Mon Sep 17 00:00:00 2001 From: Xie Zhihao Date: Tue, 13 Sep 2022 16:27:57 +0000 Subject: [PATCH 8/8] chore: remove unused class definitions Signed-off-by: Xie Zhihao --- source/common/network/io_uring_socket_handle_impl.h | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/source/common/network/io_uring_socket_handle_impl.h b/source/common/network/io_uring_socket_handle_impl.h index 737584bcff2e7..87721a67a9a76 100644 --- a/source/common/network/io_uring_socket_handle_impl.h +++ b/source/common/network/io_uring_socket_handle_impl.h @@ -4,13 +4,10 @@ #include "envoy/network/io_handle.h" #include "source/common/common/logger.h" +#include "source/common/io/io_uring.h" namespace Envoy { -namespace Io { -class IoUringFactory; -} // namespace Io - namespace Network { class IoUringSocketHandleImpl;