Skip to content
10 changes: 5 additions & 5 deletions bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,12 @@ REPOSITORY_LOCATIONS_SPEC = dict(
project_name = "liburing",
project_desc = "C helpers to set up and tear down io_uring instances",
project_url = "https://github.com/axboe/liburing",
version = "2.1",
sha256 = "f1e0500cb3934b0b61c5020c3999a973c9c93b618faff1eba75aadc95bb03e07",
strip_prefix = "liburing-liburing-{version}",
urls = ["https://github.com/axboe/liburing/archive/liburing-{version}.tar.gz"],
version = "1ddd58dab67ea21b94463326ddfda0388ca21ca2",
sha256 = "166fca1a8a65366b43901fb8498e2f16ecabda7e593e1c0c5cfd16bd35938df0",
strip_prefix = "liburing-{version}",
urls = ["https://github.com/axboe/liburing/archive/{version}.tar.gz"],
use_category = ["dataplane_core", "controlplane"],
release_date = "2021-09-09",
release_date = "2022-10-20",
cpe = "N/A",
),
# This dependency is built only when performance tracing is enabled with the
Expand Down
7 changes: 7 additions & 0 deletions source/common/io/io_uring.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ class IoUring {
*/
virtual IoUringResult prepareClose(os_fd_t fd, void* user_data) PURE;

/**
* Cancels a previously prepared request. The request is identified by the
* user data pointer that was attached to it when it was prepared.
* Returns IoUringResult::Failed if the cancellation fails and
* IoUringResult::Ok otherwise.
*/
virtual IoUringResult cancel(void* user_data) PURE;

/**
* Submits the entries in the submission queue to the kernel using the
* `io_uring_enter()` system call.
Expand Down
11 changes: 11 additions & 0 deletions source/common/io/io_uring_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,17 @@ IoUringResult IoUringImpl::prepareClose(os_fd_t fd, void* user_data) {
return IoUringResult::Ok;
}

// Synchronously asks the kernel to cancel the request whose user data equals
// `user_data`. A result of -ENOENT is treated as success; any other negative
// result is reported as IoUringResult::Failed.
// NOTE(review): every other field of the cancel descriptor, including its
// timeout, is left zero-initialized -- confirm against the liburing
// documentation that this is the intended wait behaviour.
IoUringResult IoUringImpl::cancel(void* user_data) {
  struct io_uring_sync_cancel_reg cancel_args {};
  cancel_args.addr = reinterpret_cast<uint64_t>(user_data);

  const int rc = io_uring_register_sync_cancel(&ring_, &cancel_args);
  const bool cancelled_or_absent = rc >= 0 || rc == -ENOENT;
  return cancelled_or_absent ? IoUringResult::Ok : IoUringResult::Failed;
}

IoUringResult IoUringImpl::submit() {
int res = io_uring_submit(&ring_);
RELEASE_ASSERT(res >= 0 || res == -EBUSY, "unable to submit io_uring queue entries");
Expand Down
1 change: 1 addition & 0 deletions source/common/io/io_uring_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class IoUringImpl : public IoUring, public ThreadLocal::ThreadLocalObject {
IoUringResult prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) override;
IoUringResult prepareClose(os_fd_t fd, void* user_data) override;
IoUringResult cancel(void* user_data) override;
IoUringResult submit() override;

private:
Expand Down
34 changes: 22 additions & 12 deletions source/common/network/io_uring_socket_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,23 @@ IoUringSocketHandleImpl::~IoUringSocketHandleImpl() {

Api::IoCallUint64Result IoUringSocketHandleImpl::close() {
ASSERT(SOCKET_VALID(fd_));
auto& uring = io_uring_factory_.get().ref();
if (read_req_) {
auto res = uring.cancel(read_req_);
RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to cancel");
}
read_req_ = nullptr;

auto req = new Request{absl::nullopt, RequestType::Close};
Io::IoUringResult res = io_uring_factory_.get().ref().prepareClose(fd_, req);
auto res = uring.prepareClose(fd_, req);
if (res == Io::IoUringResult::Failed) {
// Fall back to posix system call.
::close(fd_);
}
uring.submit();
if (isLeader()) {
if (io_uring_factory_.get().ref().isEventfdRegistered()) {
io_uring_factory_.get().ref().unregisterEventfd();
if (uring.isEventfdRegistered()) {
uring.unregisterEventfd();
}
file_event_adapter_.reset();
}
Expand All @@ -73,7 +81,7 @@ Api::IoCallUint64Result IoUringSocketHandleImpl::readv(uint64_t /* max_length */
num_bytes_to_read += slice_length;
}
ASSERT(num_bytes_to_read <= static_cast<uint64_t>(bytes_to_read_));
is_read_added_ = false;
read_req_ = nullptr;

uint64_t len = bytes_to_read_;
bytes_to_read_ = 0;
Expand Down Expand Up @@ -107,7 +115,7 @@ Api::IoCallUint64Result IoUringSocketHandleImpl::read(Buffer::Instance& buffer,
delete this_fragment;
});
buffer.addBufferFragment(*fragment);
is_read_added_ = false;
read_req_ = nullptr;

uint64_t len = bytes_to_read_;
bytes_to_read_ = 0;
Expand Down Expand Up @@ -316,22 +324,21 @@ void IoUringSocketHandleImpl::resetFileEvents() { file_event_adapter_.reset(); }
// Forwards the shutdown request for this handle's fd to the OS syscall
// singleton and returns its result unchanged.
Api::SysCallIntResult IoUringSocketHandleImpl::shutdown(int how) {
  auto&& os_sys_calls = Api::OsSysCallsSingleton::get();
  return os_sys_calls.shutdown(fd_, how);
}

// Prepares a readv request for this socket on the io_uring instance.
// Returns without doing anything when reads are disabled, the fd is not
// valid, or a read request has already been added.
void IoUringSocketHandleImpl::addReadRequest() {
if (!is_read_enabled_ || !SOCKET_VALID(fd_) || is_read_added_) {
if (!is_read_enabled_ || !SOCKET_VALID(fd_) || read_req_) {
return;
}

// No read buffer may be outstanding when a new read is prepared.
ASSERT(read_buf_ == nullptr);
is_read_added_ = true; // don't add READ if it's been already added.
// Allocate a fresh buffer of read_buffer_size_ bytes and point the single
// iovec at it.
read_buf_ = std::unique_ptr<uint8_t[]>(new uint8_t[read_buffer_size_]);
iov_.iov_base = read_buf_.get();
iov_.iov_len = read_buffer_size_;
auto& uring = io_uring_factory_.get().ref();
auto req = new Request{*this, RequestType::Read};
auto res = uring.prepareReadv(fd_, &iov_, 1, 0, req);
read_req_ = new Request{*this, RequestType::Read};
auto res = uring.prepareReadv(fd_, &iov_, 1, 0, read_req_);
// If the first prepare attempt fails, submit what is already queued and
// retry once; a second failure trips the RELEASE_ASSERT below.
if (res == Io::IoUringResult::Failed) {
// TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped.
uring.submit();
res = uring.prepareReadv(fd_, &iov_, 1, 0, req);
res = uring.prepareReadv(fd_, &iov_, 1, 0, read_req_);
RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare readv");
}
}
Expand Down Expand Up @@ -496,13 +503,17 @@ void IoUringSocketHandleImpl::FileEventAdapter::onRequestCompletion(const Reques
break;
case RequestType::Read: {
ASSERT(req.iohandle_.has_value());
// Read is cancellable.
if (result == -ECANCELED) {
return;
}
auto& iohandle = req.iohandle_->get();
iohandle.bytes_to_read_ = result;
// This is a hacky fix; we should instead check whether the req is still valid.
if (iohandle.fd_ == -1) {
ENVOY_LOG_MISC(debug, "the uring's fd already closed");
break;
}
iohandle.bytes_to_read_ = result;

if (result == 0) {
iohandle.remote_closed_ = true;
Expand All @@ -522,7 +533,6 @@ void IoUringSocketHandleImpl::FileEventAdapter::onRequestCompletion(const Reques
ASSERT(req.iov_ != nullptr);
ASSERT(req.iohandle_.has_value());
auto& iohandle = req.iohandle_->get();

// This is a hacky fix; we should instead check whether the req is still valid.
if (iohandle.fd_ == -1) {
ENVOY_LOG_MISC(debug, "the uring's fd already closed");
Expand Down
2 changes: 1 addition & 1 deletion source/common/network/io_uring_socket_handle_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class IoUringSocketHandleImpl final : public IoHandle, protected Logger::Loggabl
struct iovec iov_;
std::unique_ptr<uint8_t[]> read_buf_{nullptr};
int32_t bytes_to_read_{0};
bool is_read_added_{false};
Request* read_req_{nullptr};
bool is_read_enabled_{true};
std::list<Buffer::SliceDataPtr> write_buf_{};
uint32_t vecs_to_write_{0};
Expand Down