Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions source/common/io/io_uring.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ class IoUring {
*/
virtual IoUringResult prepareClose(os_fd_t fd, void* user_data) PURE;

/**
* Prepares a cancellation and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareCancel(void* cancelling_user_data, void* user_data) PURE;

/**
* Submits the entries in the submission queue to the kernel using the
* `io_uring_enter()` system call.
Expand Down
11 changes: 11 additions & 0 deletions source/common/io/io_uring_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,17 @@ IoUringResult IoUringImpl::prepareClose(os_fd_t fd, void* user_data) {
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareCancel(void* cancelling_user_data, void* user_data) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_cancel(sqe, cancelling_user_data, 0);
io_uring_sqe_set_data(sqe, user_data);
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::submit() {
int res = io_uring_submit(&ring_);
RELEASE_ASSERT(res >= 0 || res == -EBUSY, "unable to submit io_uring queue entries");
Expand Down
1 change: 1 addition & 0 deletions source/common/io/io_uring_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class IoUringImpl : public IoUring, public ThreadLocal::ThreadLocalObject {
IoUringResult prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) override;
IoUringResult prepareClose(os_fd_t fd, void* user_data) override;
IoUringResult prepareCancel(void* cancelling_user_data, void* user_data) override;
IoUringResult submit() override;

private:
Expand Down
38 changes: 28 additions & 10 deletions source/common/network/io_uring_socket_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,28 @@ IoUringSocketHandleImpl::~IoUringSocketHandleImpl() {

Api::IoCallUint64Result IoUringSocketHandleImpl::close() {
ASSERT(SOCKET_VALID(fd_));
auto& uring = io_uring_factory_.get().ref();
if (read_req_) {
auto req = new Request{*this, RequestType::Cancel};
auto res = uring.prepareCancel(read_req_, req);
if (res == Io::IoUringResult::Failed) {
// TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped.
uring.submit();
res = uring.prepareCancel(read_req_, req);
RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare cancel");
}
}

auto req = new Request{absl::nullopt, RequestType::Close};
Io::IoUringResult res = io_uring_factory_.get().ref().prepareClose(fd_, req);
auto res = uring.prepareClose(fd_, req);
if (res == Io::IoUringResult::Failed) {
// Fall back to posix system call.
::close(fd_);
}
uring.submit();
if (isLeader()) {
if (io_uring_factory_.get().ref().isEventfdRegistered()) {
io_uring_factory_.get().ref().unregisterEventfd();
if (uring.isEventfdRegistered()) {
uring.unregisterEventfd();
}
file_event_adapter_.reset();
}
Expand All @@ -73,7 +86,7 @@ Api::IoCallUint64Result IoUringSocketHandleImpl::readv(uint64_t /* max_length */
num_bytes_to_read += slice_length;
}
ASSERT(num_bytes_to_read <= static_cast<uint64_t>(bytes_to_read_));
is_read_added_ = false;
read_req_ = nullptr;

uint64_t len = bytes_to_read_;
bytes_to_read_ = 0;
Expand Down Expand Up @@ -107,7 +120,7 @@ Api::IoCallUint64Result IoUringSocketHandleImpl::read(Buffer::Instance& buffer,
delete this_fragment;
});
buffer.addBufferFragment(*fragment);
is_read_added_ = false;
read_req_ = nullptr;

uint64_t len = bytes_to_read_;
bytes_to_read_ = 0;
Expand Down Expand Up @@ -316,22 +329,21 @@ void IoUringSocketHandleImpl::resetFileEvents() { file_event_adapter_.reset(); }
Api::SysCallIntResult IoUringSocketHandleImpl::shutdown(int how) { return Api::OsSysCallsSingleton::get().shutdown(fd_, how); }

void IoUringSocketHandleImpl::addReadRequest() {
if (!is_read_enabled_ || !SOCKET_VALID(fd_) || is_read_added_) {
if (!is_read_enabled_ || !SOCKET_VALID(fd_) || read_req_) {
return;
}

ASSERT(read_buf_ == nullptr);
is_read_added_ = true; // don't add READ if it's been already added.
read_buf_ = std::unique_ptr<uint8_t[]>(new uint8_t[read_buffer_size_]);
iov_.iov_base = read_buf_.get();
iov_.iov_len = read_buffer_size_;
auto& uring = io_uring_factory_.get().ref();
auto req = new Request{*this, RequestType::Read};
auto res = uring.prepareReadv(fd_, &iov_, 1, 0, req);
read_req_ = new Request{*this, RequestType::Read};
auto res = uring.prepareReadv(fd_, &iov_, 1, 0, read_req_);
if (res == Io::IoUringResult::Failed) {
// TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped.
uring.submit();
res = uring.prepareReadv(fd_, &iov_, 1, 0, req);
res = uring.prepareReadv(fd_, &iov_, 1, 0, read_req_);
RELEASE_ASSERT(res == Io::IoUringResult::Ok, "unable to prepare readv");
}
}
Expand Down Expand Up @@ -496,6 +508,10 @@ void IoUringSocketHandleImpl::FileEventAdapter::onRequestCompletion(const Reques
break;
case RequestType::Read: {
ASSERT(req.iohandle_.has_value());
// Read is cancellable.
if (result == -ECANCELED) {
return;
}
auto& iohandle = req.iohandle_->get();
iohandle.bytes_to_read_ = result;
// This is hacky fix, we should check the req is valid or not.
Expand Down Expand Up @@ -539,6 +555,8 @@ void IoUringSocketHandleImpl::FileEventAdapter::onRequestCompletion(const Reques
}
case RequestType::Close:
break;
case RequestType::Cancel:
break;
default:
PANIC("not implemented");
}
Expand Down
4 changes: 2 additions & 2 deletions source/common/network/io_uring_socket_handle_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Network {

class IoUringSocketHandleImpl;

enum class RequestType { Accept, Connect, Read, Write, Close, Unknown };
enum class RequestType { Accept, Connect, Read, Write, Close, Cancel, Unknown };

using IoUringSocketHandleImplOptRef =
absl::optional<std::reference_wrapper<IoUringSocketHandleImpl>>;
Expand Down Expand Up @@ -126,7 +126,7 @@ class IoUringSocketHandleImpl final : public IoHandle, protected Logger::Loggabl
struct iovec iov_;
std::unique_ptr<uint8_t[]> read_buf_{nullptr};
int32_t bytes_to_read_{0};
bool is_read_added_{false};
Request* read_req_{nullptr};
bool is_read_enabled_{true};
std::list<Buffer::SliceDataPtr> write_buf_{};
uint32_t vecs_to_write_{0};
Expand Down