diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index 1f1057b237d47..4a4c4c479c5f2 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -176,12 +176,12 @@ REPOSITORY_LOCATIONS_SPEC = dict( project_name = "liburing", project_desc = "C helpers to set up and tear down io_uring instances", project_url = "https://github.com/axboe/liburing", - version = "2.1", - sha256 = "f1e0500cb3934b0b61c5020c3999a973c9c93b618faff1eba75aadc95bb03e07", - strip_prefix = "liburing-liburing-{version}", - urls = ["https://github.com/axboe/liburing/archive/liburing-{version}.tar.gz"], + version = "1ddd58dab67ea21b94463326ddfda0388ca21ca2", + sha256 = "166fca1a8a65366b43901fb8498e2f16ecabda7e593e1c0c5cfd16bd35938df0", + strip_prefix = "liburing-{version}", + urls = ["https://github.com/axboe/liburing/archive/{version}.tar.gz"], use_category = ["dataplane_core", "controlplane"], - release_date = "2021-09-09", + release_date = "2022-10-20", cpe = "N/A", ), # This dependency is built only when performance tracing is enabled with the diff --git a/source/common/io/io_uring.h b/source/common/io/io_uring.h index 26ef78ea9173f..bc1c455c45e4c 100644 --- a/source/common/io/io_uring.h +++ b/source/common/io/io_uring.h @@ -87,6 +87,13 @@ class IoUring { */ virtual IoUringResult prepareClose(os_fd_t fd, void* user_data) PURE; + /** + * Cancels an entry in the submission queue. + * Returns IoUringResult::Failed in case the cancellation is failed and + * IoUringResult::Ok otherwise. + */ + virtual IoUringResult cancel(void* user_data) PURE; + /** * Submits the entries in the submission queue to the kernel using the * `io_uring_enter()` system call. diff --git a/source/common/io/io_uring_impl.cc b/source/common/io/io_uring_impl.cc index 97258e524cf4f..1f406256c6a6c 100644 --- a/source/common/io/io_uring_impl.cc +++ b/source/common/io/io_uring_impl.cc @@ -143,6 +143,17 @@ IoUringResult IoUringImpl::prepareClose(os_fd_t fd, void* user_data) { return IoUringResult::Ok; } +IoUringResult IoUringImpl::cancel(void* user_data) { + struct io_uring_sync_cancel_reg reg {}; + reg.addr = reinterpret_cast(user_data); + int res = io_uring_register_sync_cancel(&ring_, ®); + if (res < 0 && res != -ENOENT) { + return IoUringResult::Failed; + } + + return IoUringResult::Ok; +} + IoUringResult IoUringImpl::submit() { int res = io_uring_submit(&ring_); RELEASE_ASSERT(res >= 0 || res == -EBUSY, "unable to submit io_uring queue entries"); diff --git a/source/common/io/io_uring_impl.h b/source/common/io/io_uring_impl.h index d97b5031e8aa3..7a8e08d4c5280 100644 --- a/source/common/io/io_uring_impl.h +++ b/source/common/io/io_uring_impl.h @@ -29,6 +29,7 @@ class IoUringImpl : public IoUring, public ThreadLocal::ThreadLocalObject { IoUringResult prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, off_t offset, void* user_data) override; IoUringResult prepareClose(os_fd_t fd, void* user_data) override; + IoUringResult cancel(void* user_data) override; IoUringResult submit() override; private: diff --git a/source/common/network/io_uring_socket_handle_impl.cc b/source/common/network/io_uring_socket_handle_impl.cc index 3044e43a296a4..bbce16989be75 100644 --- a/source/common/network/io_uring_socket_handle_impl.cc +++ b/source/common/network/io_uring_socket_handle_impl.cc @@ -38,15 +38,23 @@ IoUringSocketHandleImpl::~IoUringSocketHandleImpl() { Api::IoCallUint64Result IoUringSocketHandleImpl::close() { ASSERT(SOCKET_VALID(fd_)); + auto& uring = io_uring_factory_.get().ref(); + if (read_req_) { + auto res = uring.cancel(read_req_); + RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to cancel"); + } + read_req_ = nullptr; + auto req = new Request{absl::nullopt, RequestType::Close}; - Io::IoUringResult res = io_uring_factory_.get().ref().prepareClose(fd_, req); + auto res = uring.prepareClose(fd_, req); if (res == Io::IoUringResult::Failed) { // Fall back to posix system call. ::close(fd_); } + uring.submit(); if (isLeader()) { - if (io_uring_factory_.get().ref().isEventfdRegistered()) { - io_uring_factory_.get().ref().unregisterEventfd(); + if (uring.isEventfdRegistered()) { + uring.unregisterEventfd(); } file_event_adapter_.reset(); } @@ -73,7 +81,7 @@ Api::IoCallUint64Result IoUringSocketHandleImpl::readv(uint64_t /* max_length */ num_bytes_to_read += slice_length; } ASSERT(num_bytes_to_read <= static_cast(bytes_to_read_)); - is_read_added_ = false; + read_req_ = nullptr; uint64_t len = bytes_to_read_; bytes_to_read_ = 0; @@ -107,7 +115,7 @@ Api::IoCallUint64Result IoUringSocketHandleImpl::read(Buffer::Instance& buffer, delete this_fragment; }); buffer.addBufferFragment(*fragment); - is_read_added_ = false; + read_req_ = nullptr; uint64_t len = bytes_to_read_; bytes_to_read_ = 0; @@ -316,22 +324,21 @@ void IoUringSocketHandleImpl::resetFileEvents() { file_event_adapter_.reset(); } Api::SysCallIntResult IoUringSocketHandleImpl::shutdown(int how) { return Api::OsSysCallsSingleton::get().shutdown(fd_, how); } void IoUringSocketHandleImpl::addReadRequest() { - if (!is_read_enabled_ || !SOCKET_VALID(fd_) || is_read_added_) { + if (!is_read_enabled_ || !SOCKET_VALID(fd_) || read_req_) { return; } ASSERT(read_buf_ == nullptr); - is_read_added_ = true; // don't add READ if it's been already added. read_buf_ = std::unique_ptr(new uint8_t[read_buffer_size_]); iov_.iov_base = read_buf_.get(); iov_.iov_len = read_buffer_size_; auto& uring = io_uring_factory_.get().ref(); - auto req = new Request{*this, RequestType::Read}; - auto res = uring.prepareReadv(fd_, &iov_, 1, 0, req); + read_req_ = new Request{*this, RequestType::Read}; + auto res = uring.prepareReadv(fd_, &iov_, 1, 0, read_req_); if (res == Io::IoUringResult::Failed) { // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped. uring.submit(); - res = uring.prepareReadv(fd_, &iov_, 1, 0, req); + res = uring.prepareReadv(fd_, &iov_, 1, 0, read_req_); RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare readv"); } } @@ -496,13 +503,17 @@ void IoUringSocketHandleImpl::FileEventAdapter::onRequestCompletion(const Reques break; case RequestType::Read: { ASSERT(req.iohandle_.has_value()); + // Read is cancellable. + if (result == -ECANCELED) { + return; + } auto& iohandle = req.iohandle_->get(); - iohandle.bytes_to_read_ = result; // This is hacky fix, we should check the req is valid or not. if (iohandle.fd_ == -1) { ENVOY_LOG_MISC(debug, "the uring's fd already closed"); break; } + iohandle.bytes_to_read_ = result; if (result == 0) { iohandle.remote_closed_ = true; @@ -522,7 +533,6 @@ void IoUringSocketHandleImpl::FileEventAdapter::onRequestCompletion(const Reques ASSERT(req.iov_ != nullptr); ASSERT(req.iohandle_.has_value()); auto& iohandle = req.iohandle_->get(); - // This is hacky fix, we should check the req is valid or not. if (iohandle.fd_ == -1) { ENVOY_LOG_MISC(debug, "the uring's fd already closed"); diff --git a/source/common/network/io_uring_socket_handle_impl.h b/source/common/network/io_uring_socket_handle_impl.h index ae1119f9e087d..bb221019fe3ad 100644 --- a/source/common/network/io_uring_socket_handle_impl.h +++ b/source/common/network/io_uring_socket_handle_impl.h @@ -126,7 +126,7 @@ class IoUringSocketHandleImpl final : public IoHandle, protected Logger::Loggabl struct iovec iov_; std::unique_ptr read_buf_{nullptr}; int32_t bytes_to_read_{0}; - bool is_read_added_{false}; + Request* read_req_{nullptr}; bool is_read_enabled_{true}; std::list write_buf_{}; uint32_t vecs_to_write_{0};