Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions source/common/io/io_uring.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,29 @@ class IoUring {
virtual ~IoUring() = default;

/**
* Registers an eventfd file descriptor for the ring and returns it.
* Registers an event for the ring.
* It can be used for integration with event loops.
*/
virtual os_fd_t registerEventfd() PURE;
virtual void registerEvent(Event::Dispatcher& dispatcher, Event::FileTriggerType trigger) PURE;

/**
* Resets the eventfd file descriptor for the ring.
* Resets the event for the ring.
*/
virtual void unregisterEventfd() PURE;
virtual void unregisterEvent() PURE;

/**
* Returns true if an eventfd file descriptor is registered with the ring.
* Returns true if an event is registered with the ring.
*/
virtual bool isEventfdRegistered() const PURE;

/**
* Iterates over entries in the completion queue, calls the given callback for
* every entry and marks them consumed.
*/
virtual void forEveryCompletion(CompletionCb completion_cb) PURE;
virtual bool isEventRegistered() const PURE;

/**
* Prepares an accept system call and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareAccept(os_fd_t fd, struct sockaddr* remote_addr,
socklen_t* remote_addr_len, void* user_data) PURE;
socklen_t* remote_addr_len, void* user_data,
CompletionCb cb) PURE;

/**
* Prepares a connect system call and puts it into the submission queue.
Expand All @@ -62,37 +57,45 @@ class IoUring {
*/
virtual IoUringResult prepareConnect(os_fd_t fd,
const Network::Address::InstanceConstSharedPtr& address,
void* user_data) PURE;
void* user_data, CompletionCb cb) PURE;

/**
* Prepares a readv system call and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) PURE;
off_t offset, void* user_data, CompletionCb cb) PURE;

/**
* Prepares a writev system call and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) PURE;
off_t offset, void* user_data, CompletionCb cb) PURE;

/**
* Prepares a close system call and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareClose(os_fd_t fd, void* user_data) PURE;
virtual IoUringResult prepareClose(os_fd_t fd, void* user_data, CompletionCb cb) PURE;

/**
* Prepares a cancellation and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareCancel(void* cancelling_user_data, void* user_data) PURE;
virtual IoUringResult prepareCancel(void* cancelling_user_data, void* user_data,
CompletionCb cb) PURE;

/**
* Prepares a nop and puts it into the cubmission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareNop(void* user_data, CompletionCb cb) PURE;

/**
* Submits the entries in the submission queue to the kernel using the
Expand All @@ -103,6 +106,17 @@ class IoUring {
* with the forEveryCompletion() method and try again.
*/
virtual IoUringResult submit() PURE;

/**
* Submits the entries in the submission queue to the kernel using the
* `io_uring_enter()` system call. The submission may be delayed if there is an incoming
* submission.
* Returns IoUringResult::Ok in case of success and may return
* IoUringResult::Busy if we over commit the number of requests. In the latter
* case the application should drain the completion queue by handling some completions
* with the forEveryCompletion() method and try again.
*/
virtual IoUringResult trySubmit() PURE;
};

/**
Expand Down
142 changes: 106 additions & 36 deletions source/common/io/io_uring_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ OptRef<IoUring> IoUringFactoryImpl::get() const {

void IoUringFactoryImpl::onServerInitialized() {
tls_.set([io_uring_size = io_uring_size_,
use_submission_queue_polling = use_submission_queue_polling_](Event::Dispatcher&) {
return std::make_shared<IoUringImpl>(io_uring_size, use_submission_queue_polling);
use_submission_queue_polling =
use_submission_queue_polling_](Event::Dispatcher& dispatcher) {
std::shared_ptr<IoUringImpl> io_uring =
std::make_shared<IoUringImpl>(io_uring_size, use_submission_queue_polling);
io_uring->registerEvent(dispatcher, Event::PlatformDefaultTriggerType);
return io_uring;
});
}

Expand All @@ -49,108 +53,140 @@ IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polli
RELEASE_ASSERT(ret == 0, fmt::format("unable to initialize io_uring: {}", errorDetails(-ret)));
}

IoUringImpl::~IoUringImpl() { io_uring_queue_exit(&ring_); }
IoUringImpl::~IoUringImpl() {
if (isEventRegistered()) {
unregisterEvent();
}
io_uring_queue_exit(&ring_);
}

os_fd_t IoUringImpl::registerEventfd() {
ASSERT(!isEventfdRegistered());
void IoUringImpl::registerEvent(Event::Dispatcher& dispatcher, Event::FileTriggerType trigger) {
ASSERT(!isEventRegistered());
event_fd_ = eventfd(0, 0);
int res = io_uring_register_eventfd(&ring_, event_fd_);
RELEASE_ASSERT(res == 0, fmt::format("unable to register eventfd: {}", errorDetails(-res)));
return event_fd_;
file_event_ = dispatcher.createFileEvent(
event_fd_, [this](uint32_t) { onFileEvent(); }, trigger, Event::FileReadyType::Read);
}

void IoUringImpl::unregisterEventfd() {
void IoUringImpl::unregisterEvent() {
int res = io_uring_unregister_eventfd(&ring_);
RELEASE_ASSERT(res == 0, fmt::format("unable to unregister eventfd: {}", errorDetails(-res)));
file_event_.reset();
SET_SOCKET_INVALID(event_fd_);
}

bool IoUringImpl::isEventfdRegistered() const { return SOCKET_VALID(event_fd_); }

void IoUringImpl::forEveryCompletion(CompletionCb completion_cb) {
ASSERT(SOCKET_VALID(event_fd_));

eventfd_t v;
int ret = eventfd_read(event_fd_, &v);
RELEASE_ASSERT(ret == 0, "unable to drain eventfd");

unsigned count = io_uring_peek_batch_cqe(&ring_, cqes_.data(), io_uring_size_);

for (unsigned i = 0; i < count; ++i) {
struct io_uring_cqe* cqe = cqes_[i];
completion_cb(reinterpret_cast<void*>(cqe->user_data), cqe->res);
}
io_uring_cq_advance(&ring_, count);
}
bool IoUringImpl::isEventRegistered() const { return SOCKET_VALID(event_fd_); }

IoUringResult IoUringImpl::prepareAccept(os_fd_t fd, struct sockaddr* remote_addr,
socklen_t* remote_addr_len, void* user_data) {
socklen_t* remote_addr_len, void* user_data,
CompletionCb cb) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_accept(sqe, fd, remote_addr, remote_addr_len, 0);
io_uring_sqe_set_data(sqe, user_data);
if (user_data) {
io_uring_sqe_set_data(sqe, user_data);
}

cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareConnect(os_fd_t fd,
const Network::Address::InstanceConstSharedPtr& address,
void* user_data) {
void* user_data, CompletionCb cb) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_connect(sqe, fd, address->sockAddr(), address->sockAddrLen());
io_uring_sqe_set_data(sqe, user_data);
if (user_data) {
io_uring_sqe_set_data(sqe, user_data);
}

cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) {
off_t offset, void* user_data, CompletionCb cb) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_readv(sqe, fd, iovecs, nr_vecs, offset);
io_uring_sqe_set_data(sqe, user_data);
if (user_data) {
io_uring_sqe_set_data(sqe, user_data);
}

cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) {
off_t offset, void* user_data, CompletionCb cb) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_writev(sqe, fd, iovecs, nr_vecs, offset);
io_uring_sqe_set_data(sqe, user_data);
if (user_data) {
io_uring_sqe_set_data(sqe, user_data);
}

cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareClose(os_fd_t fd, void* user_data) {
IoUringResult IoUringImpl::prepareClose(os_fd_t fd, void* user_data, CompletionCb cb) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_close(sqe, fd);
io_uring_sqe_set_data(sqe, user_data);
if (user_data) {
io_uring_sqe_set_data(sqe, user_data);
}

cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareCancel(void* cancelling_user_data, void* user_data) {
IoUringResult IoUringImpl::prepareCancel(void* cancelling_user_data, void* user_data,
CompletionCb cb) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_cancel(sqe, cancelling_user_data, 0);
io_uring_sqe_set_data(sqe, user_data);
if (user_data) {
io_uring_sqe_set_data(sqe, user_data);
}

cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareNop(void* user_data, CompletionCb cb) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_nop(sqe);
if (user_data) {
io_uring_sqe_set_data(sqe, user_data);
}

cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
return IoUringResult::Ok;
}

Expand All @@ -160,5 +196,39 @@ IoUringResult IoUringImpl::submit() {
return res == -EBUSY ? IoUringResult::Busy : IoUringResult::Ok;
}

IoUringResult IoUringImpl::trySubmit() {
if (delay_submit_) {
return IoUringResult::Ok;
}

return submit();
}

void IoUringImpl::onFileEvent() {
ASSERT(SOCKET_VALID(event_fd_));
delay_submit_ = true;

eventfd_t v;
int ret = eventfd_read(event_fd_, &v);
RELEASE_ASSERT(ret == 0, "unable to drain eventfd");

unsigned count = io_uring_peek_batch_cqe(&ring_, cqes_.data(), io_uring_size_);

for (unsigned i = 0; i < count; ++i) {
struct io_uring_cqe* cqe = cqes_[i];
CompletionCb cb = cbs_[cqe->user_data];
if (cb == nullptr) {
ENVOY_LOG_MISC(warn, "ignore CQ without correspoding callback");
continue;
}

cb(reinterpret_cast<void*>(cqe->user_data), cqe->res);
cbs_.erase(cqe->user_data);
}
io_uring_cq_advance(&ring_, count);
submit();
delay_submit_ = false;
}

} // namespace Io
} // namespace Envoy
27 changes: 17 additions & 10 deletions source/common/io/io_uring_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,34 @@ class IoUringImpl : public IoUring, public ThreadLocal::ThreadLocalObject {
IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling);
~IoUringImpl() override;

os_fd_t registerEventfd() override;
void unregisterEventfd() override;
bool isEventfdRegistered() const override;
void forEveryCompletion(CompletionCb completion_cb) override;
void registerEvent(Event::Dispatcher& dispatcher, Event::FileTriggerType trigger) override;
void unregisterEvent() override;
bool isEventRegistered() const override;
IoUringResult prepareAccept(os_fd_t fd, struct sockaddr* remote_addr, socklen_t* remote_addr_len,
void* user_data) override;
void* user_data, CompletionCb cb) override;
IoUringResult prepareConnect(os_fd_t fd, const Network::Address::InstanceConstSharedPtr& address,
void* user_data) override;
void* user_data, CompletionCb cb) override;
IoUringResult prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, off_t offset,
void* user_data) override;
void* user_data, CompletionCb cb) override;
IoUringResult prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) override;
IoUringResult prepareClose(os_fd_t fd, void* user_data) override;
IoUringResult prepareCancel(void* cancelling_user_data, void* user_data) override;
off_t offset, void* user_data, CompletionCb cb) override;
IoUringResult prepareClose(os_fd_t fd, void* user_data, CompletionCb cb) override;
IoUringResult prepareCancel(void* cancelling_user_data, void* user_data,
CompletionCb cb) override;
IoUringResult prepareNop(void* user_data, CompletionCb cb) override;
IoUringResult submit() override;
IoUringResult trySubmit() override;

private:
const uint32_t io_uring_size_;
struct io_uring ring_ {};
std::vector<struct io_uring_cqe*> cqes_;
os_fd_t event_fd_{INVALID_SOCKET};
Event::FileEventPtr file_event_{nullptr};
absl::flat_hash_map<uint64_t, CompletionCb> cbs_;
bool delay_submit_{false};

void onFileEvent();
};

class IoUringFactoryImpl : public IoUringFactory {
Expand Down
Loading