Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
5b57402
Add Dimitry's implementation and move it to the core
soulxu Jul 6, 2022
13760ab
pass the io uring factory
soulxu Jul 6, 2022
302a7f0
not set non-blocking flag for iouring
soulxu Jul 14, 2022
466f9fe
test socket interface
soulxu Aug 31, 2022
435d9ef
Not using IoUringSocketHandle when the current thread isn't registered
soulxu Sep 19, 2022
85effee
network: handle io_uring factory safely (#4)
zhxie Sep 20, 2022
a5edb42
iouring: change the iouring:getOrCreate to just get method
soulxu Sep 26, 2022
c7aa1f0
SocketInterface: only create IoUringHandle when the iouring factory
soulxu Sep 26, 2022
2ccb461
Only register Read event for eventfd
soulxu Sep 26, 2022
6c94104
network: write sequentially (#7)
zhxie Oct 11, 2022
38e970b
initialize bootstrap ext before add listener
soulxu Oct 11, 2022
0eb8777
skip eventfd registration when it is already registered
soulxu Oct 11, 2022
a4fcd42
network: force starting read after write (#9)
zhxie Oct 17, 2022
7e08174
network: implement readv and writev (#8)
zhxie Oct 17, 2022
0eb7119
Ensure the write request submitted
soulxu Oct 17, 2022
2df8192
Skip invoking the callback if the fd is already closed when the uring ready event fires
soulxu Oct 17, 2022
5181812
network: fix incorrect ptr assignment (#11)
zhxie Oct 20, 2022
43b6048
Using return zero instead of raise close event for halfclose
soulxu Oct 20, 2022
4e84781
Return error when result is negative
soulxu Oct 20, 2022
a37f079
hacky fix for invalid fd for a write completion
soulxu Oct 20, 2022
4d32b11
add simple shutdown implement
soulxu Oct 20, 2022
79cc8fb
network: cancel submitted read before close (#13)
zhxie Oct 27, 2022
9ceda00
network: delay write result (#14)
zhxie Oct 27, 2022
75bdfd6
network: fix (#15)
zhxie Oct 27, 2022
395fa4b
network: add read request when read or connect (#16)
zhxie Oct 31, 2022
b39c113
network: fix write result handling (#17)
zhxie Oct 31, 2022
eda48cd
network: support max read length (#18)
zhxie Nov 1, 2022
cc9fef1
network: activate read event after enabling (#19)
zhxie Nov 1, 2022
d919c92
network: ensure connect request will get submitted (#20)
zhxie Nov 10, 2022
2b5939e
[backport] network: stop adding read request on read (#23)
zhxie Dec 2, 2022
1cc8331
[backport] network: drain after writing (#24)
zhxie Dec 2, 2022
d2d3959
[backport] network: limit write slice size (#25)
zhxie Dec 2, 2022
2f925e8
[backport] network: move instead of copy (#26)
zhxie Dec 2, 2022
e677b6b
network: separate adapter from IO handle (#22)
zhxie Dec 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 43 additions & 20 deletions source/common/io/io_uring.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "envoy/common/optref.h"
#include "envoy/common/pure.h"

#include "source/common/network/address_impl.h"
Expand All @@ -25,34 +26,29 @@ class IoUring {
virtual ~IoUring() = default;

/**
* Registers an eventfd file descriptor for the ring and returns it.
* Registers an event for the ring.
* It can be used for integration with event loops.
*/
virtual os_fd_t registerEventfd() PURE;
virtual void registerEvent(Event::Dispatcher& dispatcher, Event::FileTriggerType trigger) PURE;

/**
* Resets the eventfd file descriptor for the ring.
* Resets the event for the ring.
*/
virtual void unregisterEventfd() PURE;
virtual void unregisterEvent() PURE;

/**
* Returns true if an eventfd file descriptor is registered with the ring.
* Returns true if an event is registered with the ring.
*/
virtual bool isEventfdRegistered() const PURE;

/**
* Iterates over entries in the completion queue, calls the given callback for
* every entry and marks them consumed.
*/
virtual void forEveryCompletion(CompletionCb completion_cb) PURE;
virtual bool isEventRegistered() const PURE;

/**
* Prepares an accept system call and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareAccept(os_fd_t fd, struct sockaddr* remote_addr,
socklen_t* remote_addr_len, void* user_data) PURE;
socklen_t* remote_addr_len, void* user_data,
CompletionCb cb) PURE;

/**
* Prepares a connect system call and puts it into the submission queue.
Expand All @@ -61,30 +57,45 @@ class IoUring {
*/
virtual IoUringResult prepareConnect(os_fd_t fd,
const Network::Address::InstanceConstSharedPtr& address,
void* user_data) PURE;
void* user_data, CompletionCb cb) PURE;

/**
* Prepares a readv system call and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) PURE;
off_t offset, void* user_data, CompletionCb cb) PURE;

/**
* Prepares a writev system call and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) PURE;
off_t offset, void* user_data, CompletionCb cb) PURE;

/**
* Prepares a close system call and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareClose(os_fd_t fd, void* user_data) PURE;
virtual IoUringResult prepareClose(os_fd_t fd, void* user_data, CompletionCb cb) PURE;

/**
* Prepares a cancellation and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareCancel(void* cancelling_user_data, void* user_data,
CompletionCb cb) PURE;

/**
   * Prepares a nop and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareNop(void* user_data, CompletionCb cb) PURE;

/**
* Submits the entries in the submission queue to the kernel using the
Expand All @@ -95,6 +106,17 @@ class IoUring {
   * and try again.
*/
virtual IoUringResult submit() PURE;

/**
* Submits the entries in the submission queue to the kernel using the
* `io_uring_enter()` system call. The submission may be delayed if there is an incoming
* submission.
* Returns IoUringResult::Ok in case of success and may return
* IoUringResult::Busy if we over commit the number of requests. In the latter
   * case the application should drain the completion queue by handling some
   * completions and try again.
*/
virtual IoUringResult trySubmit() PURE;
};

/**
Expand All @@ -105,16 +127,17 @@ class IoUringFactory {
virtual ~IoUringFactory() = default;

/**
* Returns an instance of IoUring and creates it if needed for the current
* thread.
* Returns an instance of IoUring for the current thread.
*/
virtual IoUring& getOrCreate() const PURE;
virtual OptRef<IoUring> get() const PURE;

/**
* Initializes a factory upon server readiness. For example this method can be
* used to set TLS.
*/
virtual void onServerInitialized() PURE;

virtual bool currentThreadRegistered() PURE;
};

} // namespace Io
Expand Down
159 changes: 123 additions & 36 deletions source/common/io/io_uring_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,27 @@ IoUringFactoryImpl::IoUringFactoryImpl(uint32_t io_uring_size, bool use_submissi
: io_uring_size_(io_uring_size), use_submission_queue_polling_(use_submission_queue_polling),
tls_(tls) {}

IoUring& IoUringFactoryImpl::getOrCreate() const {
return const_cast<IoUringImpl&>(tls_.get().ref());
OptRef<IoUring> IoUringFactoryImpl::get() const {
  // The ring is stored in a thread-local slot; a thread that was never
  // registered (see onServerInitialized) has no instance to hand out.
  auto thread_local_ring = tls_.get();
  if (thread_local_ring == absl::nullopt) {
    return absl::nullopt;
  }
  return thread_local_ring.ref();
}

void IoUringFactoryImpl::onServerInitialized() {
tls_.set([io_uring_size = io_uring_size_,
use_submission_queue_polling = use_submission_queue_polling_](Event::Dispatcher&) {
return std::make_shared<IoUringImpl>(io_uring_size, use_submission_queue_polling);
use_submission_queue_polling =
use_submission_queue_polling_](Event::Dispatcher& dispatcher) {
std::shared_ptr<IoUringImpl> io_uring =
std::make_shared<IoUringImpl>(io_uring_size, use_submission_queue_polling);
io_uring->registerEvent(dispatcher, Event::PlatformDefaultTriggerType);
return io_uring;
});
}

// Delegates to the thread-local slot's own registration check.
bool IoUringFactoryImpl::currentThreadRegistered() {
  return tls_.currentThreadRegistered();
}

IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling)
: io_uring_size_(io_uring_size), cqes_(io_uring_size_, nullptr) {
struct io_uring_params p {};
Expand All @@ -43,97 +53,140 @@ IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polli
RELEASE_ASSERT(ret == 0, fmt::format("unable to initialize io_uring: {}", errorDetails(-ret)));
}

IoUringImpl::~IoUringImpl() { io_uring_queue_exit(&ring_); }
IoUringImpl::~IoUringImpl() {
  // Detach the eventfd/file event from the dispatcher first, so no completion
  // event can fire while the ring is being torn down below.
  if (isEventRegistered()) {
    unregisterEvent();
  }
  io_uring_queue_exit(&ring_);
}

os_fd_t IoUringImpl::registerEventfd() {
ASSERT(!isEventfdRegistered());
void IoUringImpl::registerEvent(Event::Dispatcher& dispatcher, Event::FileTriggerType trigger) {
ASSERT(!isEventRegistered());
event_fd_ = eventfd(0, 0);
int res = io_uring_register_eventfd(&ring_, event_fd_);
RELEASE_ASSERT(res == 0, fmt::format("unable to register eventfd: {}", errorDetails(-res)));
return event_fd_;
file_event_ = dispatcher.createFileEvent(
event_fd_, [this](uint32_t) { onFileEvent(); }, trigger, Event::FileReadyType::Read);
}

void IoUringImpl::unregisterEventfd() {
void IoUringImpl::unregisterEvent() {
  // Detach the eventfd from the ring before dropping the dispatcher file
  // event, so the kernel stops signaling a descriptor we are about to forget.
  int res = io_uring_unregister_eventfd(&ring_);
  RELEASE_ASSERT(res == 0, fmt::format("unable to unregister eventfd: {}", errorDetails(-res)));
  file_event_.reset();
  // NOTE(review): event_fd_ is invalidated but never close()d here or in the
  // destructor path visible in this change — confirm ownership, this looks
  // like a file-descriptor leak.
  SET_SOCKET_INVALID(event_fd_);
}

bool IoUringImpl::isEventfdRegistered() const { return SOCKET_VALID(event_fd_); }

void IoUringImpl::forEveryCompletion(CompletionCb completion_cb) {
ASSERT(SOCKET_VALID(event_fd_));

eventfd_t v;
int ret = eventfd_read(event_fd_, &v);
RELEASE_ASSERT(ret == 0, "unable to drain eventfd");

unsigned count = io_uring_peek_batch_cqe(&ring_, cqes_.data(), io_uring_size_);

for (unsigned i = 0; i < count; ++i) {
struct io_uring_cqe* cqe = cqes_[i];
completion_cb(reinterpret_cast<void*>(cqe->user_data), cqe->res);
}
io_uring_cq_advance(&ring_, count);
}
bool IoUringImpl::isEventRegistered() const {
  // A valid eventfd only exists between registerEvent() and unregisterEvent().
  return SOCKET_VALID(event_fd_);
}

IoUringResult IoUringImpl::prepareAccept(os_fd_t fd, struct sockaddr* remote_addr,
socklen_t* remote_addr_len, void* user_data) {
socklen_t* remote_addr_len, void* user_data,
CompletionCb cb) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_accept(sqe, fd, remote_addr, remote_addr_len, 0);
io_uring_sqe_set_data(sqe, user_data);
if (user_data) {
io_uring_sqe_set_data(sqe, user_data);
}

cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareConnect(os_fd_t fd,
const Network::Address::InstanceConstSharedPtr& address,
void* user_data) {
void* user_data, CompletionCb cb) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_connect(sqe, fd, address->sockAddr(), address->sockAddrLen());
io_uring_sqe_set_data(sqe, user_data);
if (user_data) {
io_uring_sqe_set_data(sqe, user_data);
}

cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) {
off_t offset, void* user_data, CompletionCb cb) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_readv(sqe, fd, iovecs, nr_vecs, offset);
io_uring_sqe_set_data(sqe, user_data);
if (user_data) {
io_uring_sqe_set_data(sqe, user_data);
}

cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) {
off_t offset, void* user_data, CompletionCb cb) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_writev(sqe, fd, iovecs, nr_vecs, offset);
io_uring_sqe_set_data(sqe, user_data);
if (user_data) {
io_uring_sqe_set_data(sqe, user_data);
}

cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareClose(os_fd_t fd, void* user_data) {
IoUringResult IoUringImpl::prepareClose(os_fd_t fd, void* user_data, CompletionCb cb) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_close(sqe, fd);
io_uring_sqe_set_data(sqe, user_data);
if (user_data) {
io_uring_sqe_set_data(sqe, user_data);
}

cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareCancel(void* cancelling_user_data, void* user_data,
                                         CompletionCb cb) {
  // Acquire a submission-queue slot; a null slot means the queue is full.
  struct io_uring_sqe* entry = io_uring_get_sqe(&ring_);
  if (entry == nullptr) {
    return IoUringResult::Failed;
  }

  // Ask the kernel to cancel the in-flight request identified by
  // `cancelling_user_data`.
  io_uring_prep_cancel(entry, cancelling_user_data, 0);
  if (user_data != nullptr) {
    io_uring_sqe_set_data(entry, user_data);
  }

  // Remember the completion callback keyed by this request's user data.
  cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
  return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareNop(void* user_data, CompletionCb cb) {
  // Acquire a submission-queue slot; a null slot means the queue is full.
  struct io_uring_sqe* entry = io_uring_get_sqe(&ring_);
  if (entry == nullptr) {
    return IoUringResult::Failed;
  }

  io_uring_prep_nop(entry);
  if (user_data != nullptr) {
    io_uring_sqe_set_data(entry, user_data);
  }

  // Remember the completion callback keyed by this request's user data.
  cbs_[reinterpret_cast<uint64_t>(user_data)] = cb;
  return IoUringResult::Ok;
}

Expand All @@ -143,5 +196,39 @@ IoUringResult IoUringImpl::submit() {
return res == -EBUSY ? IoUringResult::Busy : IoUringResult::Ok;
}

IoUringResult IoUringImpl::trySubmit() {
  // While completions are being dispatched (delay_submit_ is set in
  // onFileEvent), defer the actual submit; onFileEvent performs one batched
  // submit() at the end of the event.
  if (!delay_submit_) {
    return submit();
  }
  return IoUringResult::Ok;
}

void IoUringImpl::onFileEvent() {
  ASSERT(SOCKET_VALID(event_fd_));
  // Defer submits requested from completion callbacks (see trySubmit) so they
  // are coalesced into the single submit() at the bottom of this handler.
  delay_submit_ = true;

  // Drain the eventfd so it can signal again for the next completion batch.
  eventfd_t v;
  int ret = eventfd_read(event_fd_, &v);
  RELEASE_ASSERT(ret == 0, "unable to drain eventfd");

  // Reap everything currently available in the completion queue.
  unsigned count = io_uring_peek_batch_cqe(&ring_, cqes_.data(), io_uring_size_);

  for (unsigned i = 0; i < count; ++i) {
    struct io_uring_cqe* cqe = cqes_[i];
    // Use find() instead of operator[]: operator[] default-inserts a null
    // callback for unknown user data and would leak map entries on `continue`.
    auto it = cbs_.find(cqe->user_data);
    if (it == cbs_.end() || it->second == nullptr) {
      ENVOY_LOG_MISC(warn, "ignore CQ without corresponding callback");
      continue;
    }

    // Detach the callback and erase its entry *before* invoking it, so a
    // callback that re-submits a request under the same user data does not
    // have the freshly registered entry erased afterwards.
    CompletionCb cb = std::move(it->second);
    cbs_.erase(it);
    cb(reinterpret_cast<void*>(cqe->user_data), cqe->res);
  }
  io_uring_cq_advance(&ring_, count);
  // Flush any submissions that were queued while delay_submit_ was set.
  submit();
  delay_submit_ = false;
}

} // namespace Io
} // namespace Envoy
Loading