Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions bazel/foreign_cc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ licenses(["notice"]) # Apache 2

envoy_package()

configure_make(
name = "liburing",
configure_in_place = True,
lib_source = "@com_github_axboe_liburing//:all",
tags = ["skip_on_windows"],
)

# autotools packages are unusable on Windows as-is
# TODO: Consider our own gperftools.BUILD file as we do with many other packages
configure_make(
Expand Down
11 changes: 11 additions & 0 deletions bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def envoy_dependencies(skip_targets = []):
# The long repo names (`com_github_fmtlib_fmt` instead of `fmtlib`) are
# semi-standard in the Bazel community, intended to avoid both duplicate
# dependencies and name conflicts.
_com_github_axboe_liburing()
_com_github_c_ares_c_ares()
_com_github_circonus_labs_libcircllhist()
_com_github_cyan4973_xxhash()
Expand Down Expand Up @@ -269,6 +270,16 @@ def _com_github_circonus_labs_libcircllhist():
actual = "@com_github_circonus_labs_libcircllhist//:libcircllhist",
)

def _com_github_axboe_liburing():
external_http_archive(
name = "com_github_axboe_liburing",
build_file_content = BUILD_ALL_CONTENT,
)
native.bind(
name = "uring",
actual = "@envoy//bazel/foreign_cc:liburing",
)

def _com_github_c_ares_c_ares():
external_http_archive(
name = "com_github_c_ares_c_ares",
Expand Down
12 changes: 12 additions & 0 deletions bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ REPOSITORY_LOCATIONS_SPEC = dict(
release_date = "2021-06-03",
cpe = "N/A",
),
com_github_axboe_liburing = dict(
project_name = "liburing",
project_desc = "C helpers to set up and tear down io_uring instances",
project_url = "https://github.com/axboe/liburing",
version = "2.1",
sha256 = "f1e0500cb3934b0b61c5020c3999a973c9c93b618faff1eba75aadc95bb03e07",
strip_prefix = "liburing-liburing-{version}",
urls = ["https://github.com/axboe/liburing/archive/liburing-{version}.tar.gz"],
use_category = ["dataplane_core", "controlplane"],
release_date = "2021-09-09",
cpe = "N/A",
),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note #19082 (review) which seems to apply here as well.

# This dependency is built only when performance tracing is enabled with the
# option --define=perf_tracing=enabled. It's never built for releases.
com_github_google_perfetto = dict(
Expand Down
33 changes: 33 additions & 0 deletions source/common/io/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_package",
)

licenses(["notice"]) # Apache 2

envoy_package()

envoy_cc_library(
name = "io_uring_interface",
hdrs = [
"io_uring.h",
],
deps = [
"//source/common/network:address_lib",
],
)

envoy_cc_library(
name = "io_uring_impl_lib",
srcs = [
"io_uring_impl.cc",
],
hdrs = [
"io_uring_impl.h",
],
external_deps = ["uring"],
deps = [
":io_uring_interface",
],
)
121 changes: 121 additions & 0 deletions source/common/io/io_uring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#pragma once

#include "envoy/common/pure.h"

#include "source/common/network/address_impl.h"

namespace Envoy {
namespace Io {

/**
* Callback invoked when iterating over entries in the completion queue.
* @param user_data is any data attached to an entry submitted to the submission
* queue.
* @param result is a return code of submitted system call.
*/
using CompletionCb = std::function<void(void* user_data, int32_t result)>;

enum class IoUringResult { Ok, Busy, Failed };

/**
* Abstract wrapper around `io_uring`.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the threading model assumed by IoUring? Which methods can be called from any thread and which can only be called from the IoRing thread?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

io_uring docs don't state any requirement regarding thread-safety though it probably should. In case of Envoy every IoUring is coupled with one libevent loop (it doesn't run its own) which is rarely posted from threads other than the one running it. So, I assumed that making IoUring thread local and calling its methods only from the thread it lives in simplifies everything. But perhaps it makes more sense to extend Dispatcher to own IoUrings.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may make sense for the dispatcher to own the io_urings eventually instead of providing a getOrCreate factory method. We can get to that as development continues.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From examples I believe that uring is thread safe. However conceptually there should only one thread emptying the queue. But adding more I/O operations (preparing) into the ring can be done from any thread.

*/
class IoUring {
public:
virtual ~IoUring() = default;

/**
* Registers an eventfd file descriptor for the ring and returns it.
* It can be used for integration with event loops.
*/
virtual os_fd_t registerEventfd() PURE;

/**
* Resets the eventfd file descriptor for the ring.
*/
virtual void unregisterEventfd() PURE;

/**
* Returns true if an eventfd file descriptor is registered with the ring.
*/
virtual bool isEventfdRegistered() const PURE;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added


/**
* Iterates over entries in the completion queue, calls the given callback for
* every entry and marks them consumed.
*/
virtual void forEveryCompletion(CompletionCb completion_cb) PURE;

/**
* Prepares an accept system call and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareAccept(os_fd_t fd, struct sockaddr* remote_addr,
socklen_t* remote_addr_len, void* user_data) PURE;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use of void* in the interface seems a bit too low level. What is the ultimate purpose of this interface? Implement an alternate dispatcher.h implementation? I wonder if use of an user_data interface would improve the usability of the resulting class as it would allow for the removal of the CompletionCb argument to forEveryCompletion

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Particularly this interface hides the io_uring syscalls and makes unit testing of the new iohandle (not in this PR) easier a bit. Originally I used a custom type containing the network iohandle instead of *void, but in case of Block IO a different type might be needed. Probably this *void in the interface could be avoided with a template.

I haven't thought of the possibility to implement an alternative Dispatcher, because its interface implies the file readiness model (i.e. Dispatcher::createFileEvent()), whereas io_uring is all about completeness of system operations. CompletionCb is an analog of FileReadyCb, but the former isn't called for a known file descriptor. A system operation and its context (including the arguments) are parts of a completeness event.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok to use void* for now, we may revisit this later as our understanding of this module improves.

I'ld like to better understand how io_uring and dispatcher should interact. I could see some threads being exclusively one or the other. I'm not sure about how they would coexist in a thread while avoiding starvation.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see no problem of multiple io_urings coexisting in one dispatcher: all updates in their completion queues are reflected in their respective event notification file descriptors (man eventfd). So, different callback instances are called for different eventfds, only one registered callback per io_uring. The dispatcher is supposed to call these callbacks in the same manner as in the case of connection callbacks.

I have rewritten the tests to run the callbacks through Dispatcher::run().

Though I fail to come up with a simple enough API for Dispatcher to accommodate multiple io_urings if go this way. If we assume only one io_uring per Dispatcher then extending Dispatcher with something like void Dispatcher::registerCompletionEvent(CompletionCb cb) and IoUring& Dispatcher::getIoUring() would be enough, IoUring::registerEventfd() could be dropped, since all manipulations with eventfd would be incapsulated in Dispatcher.

For the case of multiple io_urings Dispatcher::getIoUring() and Dispatcher::registerCompletionEvent() should distinguish io_urings somehow. Maybe something like the following could work:

using IoUringId = uint32_t;

class DispatcherImpl {
public:
  ...
  IoUringId addIoUring(const IoUringFactory& factory);
  void registerCompletionEvent(IoUringId id, CompletionCb cb);
  IoUring& getIoUring(IoUringId id);
  ...

private:
  std::vector<IoUring> io_urings_;
};


/**
* Prepares a connect system call and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareConnect(os_fd_t fd,
const Network::Address::InstanceConstSharedPtr& address,
void* user_data) PURE;

/**
* Prepares a readv system call and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) PURE;

/**
* Prepares a writev system call and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) PURE;

/**
* Prepares a close system call and puts it into the submission queue.
* Returns IoUringResult::Failed in case the submission queue is full already
* and IoUringResult::Ok otherwise.
*/
virtual IoUringResult prepareClose(os_fd_t fd, void* user_data) PURE;

/**
* Submits the entries in the submission queue to the kernel using the
* `io_uring_enter()` system call.
* Returns IoUringResult::Ok in case of success and may return
* IoUringResult::Busy if we over commit the number of requests. In the latter
* case the application should drain the completion queue by handling some completions
* with the forEveryCompletion() method and try again.
*/
virtual IoUringResult submit() PURE;
};

/**
* Abstract factory for IoUring wrappers.
*/
class IoUringFactory {
public:
virtual ~IoUringFactory() = default;

/**
* Returns an instance of IoUring and creates it if needed for the current
* thread.
*/
virtual IoUring& getOrCreate() const PURE;

/**
* Initializes a factory upon server readiness. For example this method can be
* used to set TLS.
*/
virtual void onServerInitialized() PURE;
};

} // namespace Io
} // namespace Envoy
135 changes: 135 additions & 0 deletions source/common/io/io_uring_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#include "source/common/io/io_uring_impl.h"

#include <sys/eventfd.h>

namespace Envoy {
namespace Io {

IoUringFactoryImpl::IoUringFactoryImpl(uint32_t io_uring_size, bool use_submission_queue_polling,
ThreadLocal::SlotAllocator& tls)
: io_uring_size_(io_uring_size), use_submission_queue_polling_(use_submission_queue_polling),
tls_(tls) {}

IoUring& IoUringFactoryImpl::getOrCreate() const {
return const_cast<IoUringImpl&>(tls_.get().ref());
}

void IoUringFactoryImpl::onServerInitialized() {
tls_.set([io_uring_size = io_uring_size_,
use_submission_queue_polling = use_submission_queue_polling_](Event::Dispatcher&) {
return std::make_shared<IoUringImpl>(io_uring_size, use_submission_queue_polling);
});
}

IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling)
: io_uring_size_(io_uring_size), cqes_(io_uring_size_, nullptr) {
struct io_uring_params p {};
if (use_submission_queue_polling) {
p.flags |= IORING_SETUP_SQPOLL;
}
int ret = io_uring_queue_init_params(io_uring_size_, &ring_, &p);
RELEASE_ASSERT(ret == 0, fmt::format("unable to initialize io_uring: {}", errorDetails(-ret)));
}

IoUringImpl::~IoUringImpl() { io_uring_queue_exit(&ring_); }

os_fd_t IoUringImpl::registerEventfd() {
ASSERT(!isEventfdRegistered());
event_fd_ = eventfd(0, 0);
int res = io_uring_register_eventfd(&ring_, event_fd_);
RELEASE_ASSERT(res == 0, fmt::format("unable to register eventfd: {}", errorDetails(-res)));
return event_fd_;
}

void IoUringImpl::unregisterEventfd() {
int res = io_uring_unregister_eventfd(&ring_);
RELEASE_ASSERT(res == 0, fmt::format("unable to unregister eventfd: {}", errorDetails(-res)));
SET_SOCKET_INVALID(event_fd_);
}

bool IoUringImpl::isEventfdRegistered() const { return SOCKET_VALID(event_fd_); }

void IoUringImpl::forEveryCompletion(CompletionCb completion_cb) {
ASSERT(SOCKET_VALID(event_fd_));

eventfd_t v;
int ret = eventfd_read(event_fd_, &v);
RELEASE_ASSERT(ret == 0, "unable to drain eventfd");

unsigned count = io_uring_peek_batch_cqe(&ring_, cqes_.data(), io_uring_size_);

for (unsigned i = 0; i < count; ++i) {
struct io_uring_cqe* cqe = cqes_[i];
completion_cb(reinterpret_cast<void*>(cqe->user_data), cqe->res);
}
io_uring_cq_advance(&ring_, count);
}

IoUringResult IoUringImpl::prepareAccept(os_fd_t fd, struct sockaddr* remote_addr,
socklen_t* remote_addr_len, void* user_data) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_accept(sqe, fd, remote_addr, remote_addr_len, 0);
io_uring_sqe_set_data(sqe, user_data);
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareConnect(os_fd_t fd,
const Network::Address::InstanceConstSharedPtr& address,
void* user_data) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_connect(sqe, fd, address->sockAddr(), address->sockAddrLen());
io_uring_sqe_set_data(sqe, user_data);
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_readv(sqe, fd, iovecs, nr_vecs, offset);
io_uring_sqe_set_data(sqe, user_data);
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
off_t offset, void* user_data) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_writev(sqe, fd, iovecs, nr_vecs, offset);
io_uring_sqe_set_data(sqe, user_data);
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareClose(os_fd_t fd, void* user_data) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
if (sqe == nullptr) {
return IoUringResult::Failed;
}

io_uring_prep_close(sqe, fd);
io_uring_sqe_set_data(sqe, user_data);
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::submit() {
int res = io_uring_submit(&ring_);
RELEASE_ASSERT(res >= 0 || res == -EBUSY, "unable to submit io_uring queue entries");
return res == -EBUSY ? IoUringResult::Busy : IoUringResult::Ok;
}

} // namespace Io
} // namespace Envoy
Loading