diff --git a/source/common/io/io_uring.h b/source/common/io/io_uring.h index 348f99db76f25..d0958c9cf695f 100644 --- a/source/common/io/io_uring.h +++ b/source/common/io/io_uring.h @@ -26,26 +26,20 @@ class IoUring { virtual ~IoUring() = default; /** - * Registers an eventfd file descriptor for the ring and returns it. + * Registers an event for the ring. * It can be used for integration with event loops. */ - virtual os_fd_t registerEventfd() PURE; + virtual void registerEvent(Event::Dispatcher& dispatcher, Event::FileTriggerType trigger) PURE; /** - * Resets the eventfd file descriptor for the ring. + * Resets the event for the ring. */ - virtual void unregisterEventfd() PURE; + virtual void unregisterEvent() PURE; /** - * Returns true if an eventfd file descriptor is registered with the ring. + * Returns true if an event is registered with the ring. */ - virtual bool isEventfdRegistered() const PURE; - - /** - * Iterates over entries in the completion queue, calls the given callback for - * every entry and marks them consumed. - */ - virtual void forEveryCompletion(CompletionCb completion_cb) PURE; + virtual bool isEventRegistered() const PURE; /** * Prepares an accept system call and puts it into the submission queue. @@ -53,7 +47,8 @@ class IoUring { * and IoUringResult::Ok otherwise. */ virtual IoUringResult prepareAccept(os_fd_t fd, struct sockaddr* remote_addr, - socklen_t* remote_addr_len, void* user_data) PURE; + socklen_t* remote_addr_len, void* user_data, + CompletionCb cb) PURE; /** * Prepares a connect system call and puts it into the submission queue. @@ -62,7 +57,7 @@ class IoUring { */ virtual IoUringResult prepareConnect(os_fd_t fd, const Network::Address::InstanceConstSharedPtr& address, - void* user_data) PURE; + void* user_data, CompletionCb cb) PURE; /** * Prepares a readv system call and puts it into the submission queue. @@ -70,7 +65,7 @@ class IoUring { * and IoUringResult::Ok otherwise. */ virtual IoUringResult prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, - off_t offset, void* user_data) PURE; + off_t offset, void* user_data, CompletionCb cb) PURE; /** * Prepares a writev system call and puts it into the submission queue. @@ -78,21 +73,29 @@ class IoUring { * and IoUringResult::Ok otherwise. */ virtual IoUringResult prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, - off_t offset, void* user_data) PURE; + off_t offset, void* user_data, CompletionCb cb) PURE; /** * Prepares a close system call and puts it into the submission queue. * Returns IoUringResult::Failed in case the submission queue is full already * and IoUringResult::Ok otherwise. */ - virtual IoUringResult prepareClose(os_fd_t fd, void* user_data) PURE; + virtual IoUringResult prepareClose(os_fd_t fd, void* user_data, CompletionCb cb) PURE; /** * Prepares a cancellation and puts it into the submission queue. * Returns IoUringResult::Failed in case the submission queue is full already * and IoUringResult::Ok otherwise. */ - virtual IoUringResult prepareCancel(void* cancelling_user_data, void* user_data) PURE; + virtual IoUringResult prepareCancel(void* cancelling_user_data, void* user_data, + CompletionCb cb) PURE; + + /** + * Prepares a nop and puts it into the cubmission queue. + * Returns IoUringResult::Failed in case the submission queue is full already + * and IoUringResult::Ok otherwise. + */ + virtual IoUringResult prepareNop(void* user_data, CompletionCb cb) PURE; /** * Submits the entries in the submission queue to the kernel using the @@ -103,6 +106,17 @@ class IoUring { * with the forEveryCompletion() method and try again. */ virtual IoUringResult submit() PURE; + + /** + * Submits the entries in the submission queue to the kernel using the + * `io_uring_enter()` system call. The submission may be delayed if there is an incoming + * submission. + * Returns IoUringResult::Ok in case of success and may return + * IoUringResult::Busy if we over commit the number of requests. In the latter + * case the application should drain the completion queue by handling some completions + * with the forEveryCompletion() method and try again. + */ + virtual IoUringResult trySubmit() PURE; }; /** diff --git a/source/common/io/io_uring_impl.cc b/source/common/io/io_uring_impl.cc index 14e3122dcc362..fd11fc1615072 100644 --- a/source/common/io/io_uring_impl.cc +++ b/source/common/io/io_uring_impl.cc @@ -32,8 +32,12 @@ OptRef IoUringFactoryImpl::get() const { void IoUringFactoryImpl::onServerInitialized() { tls_.set([io_uring_size = io_uring_size_, - use_submission_queue_polling = use_submission_queue_polling_](Event::Dispatcher&) { - return std::make_shared(io_uring_size, use_submission_queue_polling); + use_submission_queue_polling = + use_submission_queue_polling_](Event::Dispatcher& dispatcher) { + std::shared_ptr io_uring = + std::make_shared(io_uring_size, use_submission_queue_polling); + io_uring->registerEvent(dispatcher, Event::PlatformDefaultTriggerType); + return io_uring; }); } @@ -49,108 +53,140 @@ IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polli RELEASE_ASSERT(ret == 0, fmt::format("unable to initialize io_uring: {}", errorDetails(-ret))); } -IoUringImpl::~IoUringImpl() { io_uring_queue_exit(&ring_); } +IoUringImpl::~IoUringImpl() { + if (isEventRegistered()) { + unregisterEvent(); + } + io_uring_queue_exit(&ring_); +} -os_fd_t IoUringImpl::registerEventfd() { - ASSERT(!isEventfdRegistered()); +void IoUringImpl::registerEvent(Event::Dispatcher& dispatcher, Event::FileTriggerType trigger) { + ASSERT(!isEventRegistered()); event_fd_ = eventfd(0, 0); int res = io_uring_register_eventfd(&ring_, event_fd_); RELEASE_ASSERT(res == 0, fmt::format("unable to register eventfd: {}", errorDetails(-res))); - return event_fd_; + file_event_ = dispatcher.createFileEvent( + event_fd_, [this](uint32_t) { onFileEvent(); }, trigger, Event::FileReadyType::Read); } -void IoUringImpl::unregisterEventfd() { +void IoUringImpl::unregisterEvent() { int res = io_uring_unregister_eventfd(&ring_); RELEASE_ASSERT(res == 0, fmt::format("unable to unregister eventfd: {}", errorDetails(-res))); + file_event_.reset(); SET_SOCKET_INVALID(event_fd_); } -bool IoUringImpl::isEventfdRegistered() const { return SOCKET_VALID(event_fd_); } - -void IoUringImpl::forEveryCompletion(CompletionCb completion_cb) { - ASSERT(SOCKET_VALID(event_fd_)); - - eventfd_t v; - int ret = eventfd_read(event_fd_, &v); - RELEASE_ASSERT(ret == 0, "unable to drain eventfd"); - - unsigned count = io_uring_peek_batch_cqe(&ring_, cqes_.data(), io_uring_size_); - - for (unsigned i = 0; i < count; ++i) { - struct io_uring_cqe* cqe = cqes_[i]; - completion_cb(reinterpret_cast(cqe->user_data), cqe->res); - } - io_uring_cq_advance(&ring_, count); -} +bool IoUringImpl::isEventRegistered() const { return SOCKET_VALID(event_fd_); } IoUringResult IoUringImpl::prepareAccept(os_fd_t fd, struct sockaddr* remote_addr, - socklen_t* remote_addr_len, void* user_data) { + socklen_t* remote_addr_len, void* user_data, + CompletionCb cb) { struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); if (sqe == nullptr) { return IoUringResult::Failed; } io_uring_prep_accept(sqe, fd, remote_addr, remote_addr_len, 0); - io_uring_sqe_set_data(sqe, user_data); + if (user_data) { + io_uring_sqe_set_data(sqe, user_data); + } + + cbs_[reinterpret_cast(user_data)] = cb; return IoUringResult::Ok; } IoUringResult IoUringImpl::prepareConnect(os_fd_t fd, const Network::Address::InstanceConstSharedPtr& address, - void* user_data) { + void* user_data, CompletionCb cb) { struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); if (sqe == nullptr) { return IoUringResult::Failed; } io_uring_prep_connect(sqe, fd, address->sockAddr(), address->sockAddrLen()); - io_uring_sqe_set_data(sqe, user_data); + if (user_data) { + io_uring_sqe_set_data(sqe, user_data); + } + + cbs_[reinterpret_cast(user_data)] = cb; return IoUringResult::Ok; } IoUringResult IoUringImpl::prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, - off_t offset, void* user_data) { + off_t offset, void* user_data, CompletionCb cb) { struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); if (sqe == nullptr) { return IoUringResult::Failed; } io_uring_prep_readv(sqe, fd, iovecs, nr_vecs, offset); - io_uring_sqe_set_data(sqe, user_data); + if (user_data) { + io_uring_sqe_set_data(sqe, user_data); + } + + cbs_[reinterpret_cast(user_data)] = cb; return IoUringResult::Ok; } IoUringResult IoUringImpl::prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, - off_t offset, void* user_data) { + off_t offset, void* user_data, CompletionCb cb) { struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); if (sqe == nullptr) { return IoUringResult::Failed; } io_uring_prep_writev(sqe, fd, iovecs, nr_vecs, offset); - io_uring_sqe_set_data(sqe, user_data); + if (user_data) { + io_uring_sqe_set_data(sqe, user_data); + } + + cbs_[reinterpret_cast(user_data)] = cb; return IoUringResult::Ok; } -IoUringResult IoUringImpl::prepareClose(os_fd_t fd, void* user_data) { +IoUringResult IoUringImpl::prepareClose(os_fd_t fd, void* user_data, CompletionCb cb) { struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); if (sqe == nullptr) { return IoUringResult::Failed; } io_uring_prep_close(sqe, fd); - io_uring_sqe_set_data(sqe, user_data); + if (user_data) { + io_uring_sqe_set_data(sqe, user_data); + } + + cbs_[reinterpret_cast(user_data)] = cb; return IoUringResult::Ok; } -IoUringResult IoUringImpl::prepareCancel(void* cancelling_user_data, void* user_data) { +IoUringResult IoUringImpl::prepareCancel(void* cancelling_user_data, void* user_data, + CompletionCb cb) { struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); if (sqe == nullptr) { return IoUringResult::Failed; } io_uring_prep_cancel(sqe, cancelling_user_data, 0); - io_uring_sqe_set_data(sqe, user_data); + if (user_data) { + io_uring_sqe_set_data(sqe, user_data); + } + + cbs_[reinterpret_cast(user_data)] = cb; + return IoUringResult::Ok; +} + +IoUringResult IoUringImpl::prepareNop(void* user_data, CompletionCb cb) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (sqe == nullptr) { + return IoUringResult::Failed; + } + + io_uring_prep_nop(sqe); + if (user_data) { + io_uring_sqe_set_data(sqe, user_data); + } + + cbs_[reinterpret_cast(user_data)] = cb; return IoUringResult::Ok; } @@ -160,5 +196,39 @@ IoUringResult IoUringImpl::submit() { return res == -EBUSY ? IoUringResult::Busy : IoUringResult::Ok; } +IoUringResult IoUringImpl::trySubmit() { + if (delay_submit_) { + return IoUringResult::Ok; + } + + return submit(); +} + +void IoUringImpl::onFileEvent() { + ASSERT(SOCKET_VALID(event_fd_)); + delay_submit_ = true; + + eventfd_t v; + int ret = eventfd_read(event_fd_, &v); + RELEASE_ASSERT(ret == 0, "unable to drain eventfd"); + + unsigned count = io_uring_peek_batch_cqe(&ring_, cqes_.data(), io_uring_size_); + + for (unsigned i = 0; i < count; ++i) { + struct io_uring_cqe* cqe = cqes_[i]; + CompletionCb cb = cbs_[cqe->user_data]; + if (cb == nullptr) { + ENVOY_LOG_MISC(warn, "ignore CQ without correspoding callback"); + continue; + } + + cb(reinterpret_cast(cqe->user_data), cqe->res); + cbs_.erase(cqe->user_data); + } + io_uring_cq_advance(&ring_, count); + submit(); + delay_submit_ = false; +} + } // namespace Io } // namespace Envoy diff --git a/source/common/io/io_uring_impl.h b/source/common/io/io_uring_impl.h index 176080d551cfe..5cc192d74fdcb 100644 --- a/source/common/io/io_uring_impl.h +++ b/source/common/io/io_uring_impl.h @@ -16,27 +16,34 @@ class IoUringImpl : public IoUring, public ThreadLocal::ThreadLocalObject { IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling); ~IoUringImpl() override; - os_fd_t registerEventfd() override; - void unregisterEventfd() override; - bool isEventfdRegistered() const override; - void forEveryCompletion(CompletionCb completion_cb) override; + void registerEvent(Event::Dispatcher& dispatcher, Event::FileTriggerType trigger) override; + void unregisterEvent() override; + bool isEventRegistered() const override; IoUringResult prepareAccept(os_fd_t fd, struct sockaddr* remote_addr, socklen_t* remote_addr_len, - void* user_data) override; + void* user_data, CompletionCb cb) override; IoUringResult prepareConnect(os_fd_t fd, const Network::Address::InstanceConstSharedPtr& address, - void* user_data) override; + void* user_data, CompletionCb cb) override; IoUringResult prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, off_t offset, - void* user_data) override; + void* user_data, CompletionCb cb) override; IoUringResult prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, - off_t offset, void* user_data) override; - IoUringResult prepareClose(os_fd_t fd, void* user_data) override; - IoUringResult prepareCancel(void* cancelling_user_data, void* user_data) override; + off_t offset, void* user_data, CompletionCb cb) override; + IoUringResult prepareClose(os_fd_t fd, void* user_data, CompletionCb cb) override; + IoUringResult prepareCancel(void* cancelling_user_data, void* user_data, + CompletionCb cb) override; + IoUringResult prepareNop(void* user_data, CompletionCb cb) override; IoUringResult submit() override; + IoUringResult trySubmit() override; private: const uint32_t io_uring_size_; struct io_uring ring_ {}; std::vector cqes_; os_fd_t event_fd_{INVALID_SOCKET}; + Event::FileEventPtr file_event_{nullptr}; + absl::flat_hash_map cbs_; + bool delay_submit_{false}; + + void onFileEvent(); }; class IoUringFactoryImpl : public IoUringFactory { diff --git a/source/common/network/io_uring_socket_handle_impl.cc b/source/common/network/io_uring_socket_handle_impl.cc index 4b312d23366f8..7bc7b73abd394 100644 --- a/source/common/network/io_uring_socket_handle_impl.cc +++ b/source/common/network/io_uring_socket_handle_impl.cc @@ -10,6 +10,7 @@ #include "source/common/io/io_uring.h" #include "source/common/network/address_impl.h" #include "source/common/network/io_socket_error_impl.h" +#include "source/common/network/socket_interface_impl.h" namespace Envoy { namespace Network { @@ -37,31 +38,34 @@ IoUringSocketHandleImpl::~IoUringSocketHandleImpl() { Api::IoCallUint64Result IoUringSocketHandleImpl::close() { ASSERT(SOCKET_VALID(fd_)); - auto& uring = io_uring_factory_.get().ref(); if (read_req_) { - auto req = new Request{*this, RequestType::Cancel}; - auto res = uring.prepareCancel(read_req_, req); + auto req = new Request{RequestType::Cancel}; + auto res = ioUring().prepareCancel(read_req_, req, nullptr); if (res == Io::IoUringResult::Failed) { // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. - uring.submit(); - res = uring.prepareCancel(read_req_, req); + ioUring().submit(); + res = ioUring().prepareCancel(read_req_, req, nullptr); + RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare cancel"); + } + } + if (accept_req_) { + auto req = new Request{RequestType::Cancel}; + auto res = ioUring().prepareCancel(accept_req_, req, nullptr); + if (res == Io::IoUringResult::Failed) { + // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. + ioUring().submit(); + res = ioUring().prepareCancel(accept_req_, req, nullptr); RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare cancel"); } } - auto req = new Request{absl::nullopt, RequestType::Close}; - auto res = uring.prepareClose(fd_, req); + auto req = new Request{RequestType::Close}; + auto res = ioUring().prepareClose(fd_, req, nullptr); if (res == Io::IoUringResult::Failed) { // Fall back to posix system call. ::close(fd_); } - uring.submit(); - if (isLeader()) { - if (uring.isEventfdRegistered()) { - uring.unregisterEventfd(); - } - file_event_adapter_.reset(); - } + ioUring().trySubmit(); SET_SOCKET_INVALID(fd_); return Api::ioCallUint64ResultNoError(); } @@ -74,22 +78,22 @@ IoUringSocketHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices, ui return Api::ioCallUint64ResultNoError(); } - if (bytes_to_read_ < 0) { - return {0, Api::IoErrorPtr(new IoSocketError(-bytes_to_read_), IoSocketError::deleteIoError)}; + if (read_ret_ < 0) { + return {0, Api::IoErrorPtr(new IoSocketError(-read_ret_), IoSocketError::deleteIoError)}; } - if (bytes_to_read_ == 0 || read_req_ == nullptr) { + if (read_ret_ == 0 || read_req_ == nullptr) { return {0, Api::IoErrorPtr(IoSocketError::getIoSocketEagainInstance(), IoSocketError::deleteIoError)}; } - const uint64_t max_read_length = std::min(max_length, static_cast(bytes_to_read_)); + const uint64_t max_read_length = std::min(max_length, static_cast(read_ret_)); uint64_t num_bytes_to_read = read_buf_.copyOutToSlices(max_read_length, slices, num_slice); ASSERT(num_bytes_to_read <= max_read_length); read_buf_.drain(num_bytes_to_read); - bytes_to_read_ -= num_bytes_to_read; - if (bytes_to_read_ == 0) { - bytes_to_read_ = 0; + read_ret_ -= num_bytes_to_read; + if (read_ret_ == 0) { + read_ret_ = 0; read_req_ = nullptr; addReadRequest(); } @@ -138,14 +142,13 @@ Api::IoCallUint64Result IoUringSocketHandleImpl::writev(const Buffer::RawSlice* IoSocketError::deleteIoError)}; } - if (bytes_already_wrote_ < 0) { - return { - 0, Api::IoErrorPtr(new IoSocketError(-bytes_already_wrote_), IoSocketError::deleteIoError)}; + if (write_ret_ < 0) { + return {0, Api::IoErrorPtr(new IoSocketError(-write_ret_), IoSocketError::deleteIoError)}; } - if (bytes_already_wrote_ > 0) { - uint64_t len = bytes_already_wrote_; - bytes_already_wrote_ = 0; + if (write_ret_ > 0) { + uint64_t len = write_ret_; + write_ret_ = 0; return {len, Api::IoErrorPtr(nullptr, IoSocketError::deleteIoError)}; } @@ -162,17 +165,22 @@ Api::IoCallUint64Result IoUringSocketHandleImpl::writev(const Buffer::RawSlice* if (num_slices_to_write > 0) { is_write_added_ = true; // don't add WRITE if it's been already added. - auto req = new Request{*this, RequestType::Write, iovecs}; - auto& uring = io_uring_factory_.get().ref(); - auto res = uring.prepareWritev(fd_, iovecs, num_slice, 0, req); + auto req = new Request{RequestType::Write, iovecs}; + auto res = ioUring().prepareWritev( + fd_, iovecs, num_slice, 0, req, [this](void* user_data, int32_t result) { + this->onRequestCompletion(reinterpret_cast(user_data), result); + }); if (res == Io::IoUringResult::Failed) { // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. - uring.submit(); - res = uring.prepareWritev(fd_, iovecs, num_slice, 0, req); + ioUring().submit(); + res = ioUring().prepareWritev( + fd_, iovecs, num_slice, 0, req, [this](void* user_data, int32_t result) { + this->onRequestCompletion(reinterpret_cast(user_data), result); + }); RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare writev"); } // Need to ensure the write request submitted. - uring.submit(); + ioUring().trySubmit(); } return { @@ -223,29 +231,43 @@ Api::SysCallIntResult IoUringSocketHandleImpl::bind(Address::InstanceConstShared } Api::SysCallIntResult IoUringSocketHandleImpl::listen(int backlog) { - file_event_adapter_ = - std::make_unique(read_buffer_size_, io_uring_factory_, fd_); + is_listener_ = true; return Api::OsSysCallsSingleton::get().listen(fd_, backlog); } IoHandlePtr IoUringSocketHandleImpl::accept(struct sockaddr* addr, socklen_t* addrlen) { - return file_event_adapter_->accept(addr, addrlen); + if (accept_req_ == nullptr || SOCKET_INVALID(connection_fd_)) { + return nullptr; + } + + *addr = connection_addr_; + *addrlen = connection_addr_len_; + auto io_handle = std::make_unique(read_buffer_size_, io_uring_factory_, + connection_fd_); + io_handle->addReadRequest(); + SET_SOCKET_INVALID(connection_fd_); + accept_req_ = nullptr; + addAcceptRequest(); + return io_handle; } Api::SysCallIntResult IoUringSocketHandleImpl::connect(Address::InstanceConstSharedPtr address) { - auto& uring = io_uring_factory_.get().ref(); - auto req = new Request{*this, RequestType::Connect}; - auto res = uring.prepareConnect(fd_, address, req); + auto req = new Request{RequestType::Connect}; + auto res = ioUring().prepareConnect(fd_, address, req, [this](void* user_data, int32_t result) { + this->onRequestCompletion(reinterpret_cast(user_data), result); + }); if (res == Io::IoUringResult::Failed) { - res = uring.submit(); + res = ioUring().submit(); if (res == Io::IoUringResult::Busy) { return Api::SysCallIntResult{0, SOCKET_ERROR_AGAIN}; } - res = uring.prepareConnect(fd_, address, req); + res = ioUring().prepareConnect(fd_, address, req, [this](void* user_data, int32_t result) { + this->onRequestCompletion(reinterpret_cast(user_data), result); + }); RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare connect"); } // Need to ensure the connect request submitted. - uring.submit(); + ioUring().trySubmit(); return Api::SysCallIntResult{0, SOCKET_ERROR_IN_PROGRESS}; } @@ -256,6 +278,13 @@ Api::SysCallIntResult IoUringSocketHandleImpl::setOption(int level, int optname, Api::SysCallIntResult IoUringSocketHandleImpl::getOption(int level, int optname, void* optval, socklen_t* optlen) { + // ConnectionImpl will check connect result via getOption. + if (connect_ret_ < 0 && optname == SO_ERROR) { + int ret = connect_ret_; + connect_ret_ = 1; + return Api::SysCallIntResult{0, -ret}; + } + return Api::OsSysCallsSingleton::get().getsockopt(fd_, level, optname, optval, optlen); } @@ -312,28 +341,13 @@ Address::InstanceConstSharedPtr IoUringSocketHandleImpl::peerAddress() { return Address::addressFromSockAddrOrThrow(ss, ss_len, socket_v6only_); } -void IoUringSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, - Event::FileReadyCb cb, - Event::FileTriggerType trigger, uint32_t events) { - // Check if this is a server socket accepting new connections. - if (isLeader()) { - // Multiple listeners in single thread, there can be registered by other listener. - if (!io_uring_factory_.get().ref().isEventfdRegistered()) { - file_event_adapter_->initialize(dispatcher, cb, trigger, events); - } - file_event_adapter_->addAcceptRequest(); - io_uring_factory_.get().ref().submit(); - return; - } - - // Check if this is going to become a leading client socket. - if (!io_uring_factory_.get().ref().isEventfdRegistered()) { - file_event_adapter_ = - std::make_unique(read_buffer_size_, io_uring_factory_, fd_); - file_event_adapter_->initialize(dispatcher, cb, trigger, events); - } - +void IoUringSocketHandleImpl::initializeFileEvent(Event::Dispatcher&, Event::FileReadyCb cb, + Event::FileTriggerType, uint32_t) { cb_ = std::move(cb); + if (is_listener_) { + addAcceptRequest(); + ioUring().trySubmit(); + } } IoHandlePtr IoUringSocketHandleImpl::duplicate() { PANIC("not implemented"); } @@ -349,38 +363,26 @@ void IoUringSocketHandleImpl::enableFileEvents(uint32_t events) { if (events & Event::FileReadyType::Read) { is_read_enabled_ = true; addReadRequest(); - cb_(Event::FileReadyType::Read); + auto req = new Request{RequestType::Unknown}; + auto res = ioUring().prepareNop( + req, [this](void*, int32_t) { this->cb_(Event::FileReadyType::Read); }); + if (res == Io::IoUringResult::Failed) { + res = ioUring().submit(); + res = ioUring().prepareNop(req, + [this](void*, int32_t) { this->cb_(Event::FileReadyType::Read); }); + RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare nop"); + } } else { is_read_enabled_ = false; } } -void IoUringSocketHandleImpl::resetFileEvents() { file_event_adapter_.reset(); } +void IoUringSocketHandleImpl::resetFileEvents() {} Api::SysCallIntResult IoUringSocketHandleImpl::shutdown(int how) { return Api::OsSysCallsSingleton::get().shutdown(fd_, how); } -void IoUringSocketHandleImpl::addReadRequest() { - if (!is_read_enabled_ || !SOCKET_VALID(fd_) || read_req_) { - return; - } - - read_req_ = new Request{*this, RequestType::Read}; - read_req_->buf_ = std::make_unique(read_buffer_size_); - read_req_->iov_ = new struct iovec[1]; - read_req_->iov_->iov_base = read_req_->buf_.get(); - read_req_->iov_->iov_len = read_buffer_size_; - auto& uring = io_uring_factory_.get().ref(); - auto res = uring.prepareReadv(fd_, read_req_->iov_, 1, 0, read_req_); - if (res == Io::IoUringResult::Failed) { - // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. - uring.submit(); - res = uring.prepareReadv(fd_, read_req_->iov_, 1, 0, read_req_); - RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare readv"); - } -} - absl::optional IoUringSocketHandleImpl::interfaceName() { // TODO(rojkov): This is a copy-paste from Network::IoSocketHandleImpl. // Unification is needed. @@ -433,143 +435,117 @@ absl::optional IoUringSocketHandleImpl::interfaceName() { return selected_interface_name; } -IoHandlePtr IoUringSocketHandleImpl::FileEventAdapter::accept(struct sockaddr* addr, - socklen_t* addrlen) { - if (!is_accept_added_) { - return nullptr; +Io::IoUring& IoUringSocketHandleImpl::ioUring() { + if (io_uring_ == absl::nullopt) { + io_uring_ = io_uring_factory_.get(); } - ASSERT(SOCKET_VALID(connection_fd_)); - - is_accept_added_ = false; - *addr = remote_addr_; - *addrlen = remote_addr_len_; - auto io_handle = std::make_unique(read_buffer_size_, io_uring_factory_, - connection_fd_); - SET_SOCKET_INVALID(connection_fd_); - io_handle->addReadRequest(); - return io_handle; + return io_uring_.ref(); } -void IoUringSocketHandleImpl::FileEventAdapter::onRequestCompletion(const Request& req, - int32_t result) { - if (result < 0) { - ENVOY_LOG(debug, "async request failed: {}", errorDetails(-result)); +void IoUringSocketHandleImpl::addAcceptRequest() { + if (accept_req_) { + return; } - switch (req.type_) { - case RequestType::Accept: - ASSERT(!SOCKET_VALID(connection_fd_)); - addAcceptRequest(); - if (result >= 0) { - connection_fd_ = result; - cb_(Event::FileReadyType::Read); - } - break; - case RequestType::Read: { - // Read is cancellable. - if (result == -ECANCELED) { - return; - } - ASSERT(req.iohandle_.has_value()); - auto& iohandle = req.iohandle_->get(); - // This is hacky fix, we should check the req is valid or not. - if (iohandle.fd_ == -1) { - ENVOY_LOG_MISC(debug, "the uring's fd already closed"); - return; - } - - iohandle.bytes_to_read_ = result; - if (result == 0) { - iohandle.remote_closed_ = true; - } - if (result > 0) { - Buffer::BufferFragment* fragment = new Buffer::BufferFragmentImpl( - const_cast(req).buf_.release(), result, - [](const void* data, size_t /*len*/, const Buffer::BufferFragmentImpl* this_fragment) { - delete[] reinterpret_cast(data); - delete this_fragment; - }); - iohandle.read_buf_.addBufferFragment(*fragment); - } - iohandle.cb_(Event::FileReadyType::Read); - break; - } - case RequestType::Connect: { - ASSERT(req.iohandle_.has_value()); - auto& iohandle = req.iohandle_->get(); - if (result < 0) { - iohandle.cb_(Event::FileReadyType::Closed); - return; - } - - iohandle.cb_(Event::FileReadyType::Write); - iohandle.addReadRequest(); - break; - } - case RequestType::Write: { - ASSERT(req.iohandle_.has_value()); - auto& iohandle = req.iohandle_->get(); - // This is hacky fix, we should check the req is valid or not. - if (iohandle.fd_ == -1) { - ENVOY_LOG_MISC(debug, "the uring's fd already closed"); - return; - } + accept_req_ = new Request{RequestType::Accept}; + auto res = ioUring().prepareAccept( + fd_, &accept_req_->remote_addr_, &accept_req_->remote_addr_len_, accept_req_, + [this](void* user_data, int32_t result) { + this->onRequestCompletion(reinterpret_cast(user_data), result); + }); + if (res == Io::IoUringResult::Failed) { + // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. + ioUring().submit(); + res = ioUring().prepareAccept(fd_, &accept_req_->remote_addr_, &accept_req_->remote_addr_len_, + accept_req_, [this](void* user_data, int32_t result) { + this->onRequestCompletion(reinterpret_cast(user_data), + result); + }); + RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare readv"); + } +} - iohandle.bytes_already_wrote_ = result; - iohandle.is_write_added_ = false; - iohandle.cb_(Event::FileReadyType::Write); - break; +void IoUringSocketHandleImpl::addReadRequest() { + if (!is_read_enabled_ || SOCKET_INVALID(fd_) || read_req_) { + return; } - case RequestType::Close: - break; - case RequestType::Cancel: - break; - default: - PANIC("not implemented"); + + read_req_ = new Request{RequestType::Read}; + read_req_->buf_ = std::make_unique(read_buffer_size_); + read_req_->iov_ = new struct iovec[1]; + read_req_->iov_->iov_base = read_req_->buf_.get(); + read_req_->iov_->iov_len = read_buffer_size_; + auto res = ioUring().prepareReadv( + fd_, read_req_->iov_, 1, 0, read_req_, [this](void* user_data, int32_t result) { + this->onRequestCompletion(reinterpret_cast(user_data), result); + }); + if (res == Io::IoUringResult::Failed) { + // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. + ioUring().submit(); + res = ioUring().prepareReadv( + fd_, read_req_->iov_, 1, 0, read_req_, [this](void* user_data, int32_t result) { + this->onRequestCompletion(reinterpret_cast(user_data), result); + }); + RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare readv"); } } -void IoUringSocketHandleImpl::FileEventAdapter::onFileEvent() { - Io::IoUring& uring = io_uring_factory_.get().ref(); - uring.forEveryCompletion([this](void* user_data, int32_t result) { - auto req = static_cast(user_data); - onRequestCompletion(*req, result); - if (req->iov_) { - delete[] req->iov_; +void IoUringSocketHandleImpl::onRequestCompletion(Request* request, int32_t result) { + if (result < 0) { + ENVOY_LOG(debug, "async request failed: {}", errorDetails(-result)); + } + // TODO(zhxie): Cancel requests instead of escaping completion. + if (SOCKET_VALID(fd_)) { + switch (request->type_) { + case RequestType::Accept: + ASSERT(SOCKET_INVALID(connection_fd_)); + connection_fd_ = result; + connection_addr_ = request->remote_addr_; + connection_addr_len_ = request->remote_addr_len_; + cb_(Event::FileReadyType::Read); + break; + case RequestType::Connect: + connect_ret_ = result; + cb_(Event::FileReadyType::Write); + if (result >= 0) { + addReadRequest(); + } + break; + case RequestType::Read: + read_ret_ = result; + if (result == 0) { + remote_closed_ = true; + } else if (result > 0) { + Buffer::BufferFragment* fragment = new Buffer::BufferFragmentImpl( + request->buf_.release(), result, + [](const void* data, size_t /*len*/, const Buffer::BufferFragmentImpl* this_fragment) { + delete[] reinterpret_cast(data); + delete this_fragment; + }); + read_buf_.addBufferFragment(*fragment); + } + cb_(Event::FileReadyType::Read); + break; + case RequestType::Write: + write_ret_ = result; + is_write_added_ = false; + cb_(Event::FileReadyType::Write); + break; + case RequestType::Close: + break; + case RequestType::Cancel: + break; + default: + PANIC("not implemented"); } - delete req; - }); - uring.submit(); -} - -void IoUringSocketHandleImpl::FileEventAdapter::initialize(Event::Dispatcher& dispatcher, - Event::FileReadyCb cb, - Event::FileTriggerType trigger, - uint32_t) { - ASSERT(file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same " - "file descriptor. This is not allowed."); - - cb_ = std::move(cb); - Io::IoUring& uring = io_uring_factory_.get().ref(); - const os_fd_t event_fd = uring.registerEventfd(); - // We only care about the read event of Eventfd, since we only receive the - // event here. - file_event_ = dispatcher.createFileEvent( - event_fd, [this](uint32_t) { onFileEvent(); }, trigger, Event::FileReadyType::Read); -} + } -void IoUringSocketHandleImpl::FileEventAdapter::addAcceptRequest() { - is_accept_added_ = true; - auto& uring = io_uring_factory_.get().ref(); - auto req = new Request{absl::nullopt, RequestType::Accept}; - auto res = uring.prepareAccept(fd_, &remote_addr_, &remote_addr_len_, req); - if (res == Io::IoUringResult::Failed) { - // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. - uring.submit(); - res = uring.prepareAccept(fd_, &remote_addr_, &remote_addr_len_, req); - RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare accept"); + // Cleanup. + if (request->iov_) { + delete[] request->iov_; } + delete request; } } // namespace Network diff --git a/source/common/network/io_uring_socket_handle_impl.h b/source/common/network/io_uring_socket_handle_impl.h index 9e733fdbb504d..6f598d6991eea 100644 --- a/source/common/network/io_uring_socket_handle_impl.h +++ b/source/common/network/io_uring_socket_handle_impl.h @@ -19,10 +19,11 @@ using IoUringSocketHandleImplOptRef = absl::optional>; struct Request { - IoUringSocketHandleImplOptRef iohandle_{absl::nullopt}; RequestType type_{RequestType::Unknown}; struct iovec* iov_{nullptr}; std::unique_ptr buf_{}; + struct sockaddr remote_addr_ {}; + socklen_t remote_addr_len_{sizeof(remote_addr_)}; }; /** @@ -81,39 +82,10 @@ class IoUringSocketHandleImpl final : public IoHandle, protected Logger::Loggabl absl::optional interfaceName() override; private: - // FileEventAdapter adapts `io_uring` to libevent. - class FileEventAdapter { - public: - FileEventAdapter(const uint32_t read_buffer_size, const Io::IoUringFactory& io_uring_factory, - os_fd_t fd) - : read_buffer_size_(read_buffer_size), io_uring_factory_(io_uring_factory), fd_(fd) {} - void initialize(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, - Event::FileTriggerType trigger, uint32_t events); - IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen); - void addAcceptRequest(); - - private: - void onFileEvent(); - void onRequestCompletion(const Request& req, int32_t result); - - const uint32_t read_buffer_size_; - const Io::IoUringFactory& io_uring_factory_; - os_fd_t fd_; - Event::FileReadyCb cb_; - Event::FileEventPtr file_event_{nullptr}; - os_fd_t connection_fd_{INVALID_SOCKET}; - bool is_accept_added_{false}; - struct sockaddr remote_addr_; - socklen_t remote_addr_len_{sizeof(remote_addr_)}; - }; - + Io::IoUring& ioUring(); + void addAcceptRequest(); void addReadRequest(); - // Checks if the io handle is the one that registered eventfd with `io_uring`. - // An io handle can be a leader in two cases: - // 1. it's a server socket accepting new connections; - // 2. it's a client socket about to connect to a remote socket, but created - // in a thread without properly initialized `io_uring`. - bool isLeader() const { return file_event_adapter_ != nullptr; } + void onRequestCompletion(Request* request, int32_t result); const uint32_t read_buffer_size_; const Io::IoUringFactory& io_uring_factory_; @@ -121,14 +93,20 @@ class IoUringSocketHandleImpl final : public IoHandle, protected Logger::Loggabl int socket_v6only_; const absl::optional domain_; + OptRef io_uring_{absl::nullopt}; + bool is_listener_{false}; Event::FileReadyCb cb_; + os_fd_t connection_fd_{INVALID_SOCKET}; + struct sockaddr connection_addr_; + socklen_t connection_addr_len_; + Request* accept_req_{nullptr}; + int32_t connect_ret_{0}; Buffer::OwnedImpl read_buf_; - int32_t bytes_to_read_{0}; + int32_t read_ret_{0}; Request* read_req_{nullptr}; bool is_read_enabled_{true}; - int32_t bytes_already_wrote_{0}; + int32_t write_ret_{0}; bool is_write_added_{false}; - std::unique_ptr file_event_adapter_{nullptr}; bool remote_closed_{false}; };