diff --git a/bazel/foreign_cc/BUILD b/bazel/foreign_cc/BUILD index 582b7f21de268..bd74bf57c9385 100644 --- a/bazel/foreign_cc/BUILD +++ b/bazel/foreign_cc/BUILD @@ -6,6 +6,13 @@ licenses(["notice"]) # Apache 2 envoy_package() +configure_make( + name = "liburing", + configure_in_place = True, + lib_source = "@com_github_axboe_liburing//:all", + tags = ["skip_on_windows"], +) + # autotools packages are unusable on Windows as-is # TODO: Consider our own gperftools.BUILD file as we do with many other packages configure_make( diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index ae048e6006128..b463a499b4f10 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -156,6 +156,7 @@ def envoy_dependencies(skip_targets = []): # The long repo names (`com_github_fmtlib_fmt` instead of `fmtlib`) are # semi-standard in the Bazel community, intended to avoid both duplicate # dependencies and name conflicts. + _com_github_axboe_liburing() _com_github_c_ares_c_ares() _com_github_circonus_labs_libcircllhist() _com_github_cyan4973_xxhash() @@ -269,6 +270,16 @@ def _com_github_circonus_labs_libcircllhist(): actual = "@com_github_circonus_labs_libcircllhist//:libcircllhist", ) +def _com_github_axboe_liburing(): + external_http_archive( + name = "com_github_axboe_liburing", + build_file_content = BUILD_ALL_CONTENT, + ) + native.bind( + name = "uring", + actual = "@envoy//bazel/foreign_cc:liburing", + ) + def _com_github_c_ares_c_ares(): external_http_archive( name = "com_github_c_ares_c_ares", diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index 9238ba45c5348..f272d3bfc3e11 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -112,6 +112,18 @@ REPOSITORY_LOCATIONS_SPEC = dict( release_date = "2021-06-03", cpe = "N/A", ), + com_github_axboe_liburing = dict( + project_name = "liburing", + project_desc = "C helpers to set up and tear down io_uring instances", + project_url = "https://github.com/axboe/liburing", + version = "2.1", + sha256 = "f1e0500cb3934b0b61c5020c3999a973c9c93b618faff1eba75aadc95bb03e07", + strip_prefix = "liburing-liburing-{version}", + urls = ["https://github.com/axboe/liburing/archive/liburing-{version}.tar.gz"], + use_category = ["dataplane_core", "controlplane"], + release_date = "2021-09-09", + cpe = "N/A", + ), # This dependency is built only when performance tracing is enabled with the # option --define=perf_tracing=enabled. It's never built for releases. com_github_google_perfetto = dict( diff --git a/source/common/io/BUILD b/source/common/io/BUILD new file mode 100644 index 0000000000000..4edca499043a4 --- /dev/null +++ b/source/common/io/BUILD @@ -0,0 +1,33 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_cc_library( + name = "io_uring_interface", + hdrs = [ + "io_uring.h", + ], + deps = [ + "//source/common/network:address_lib", + ], +) + +envoy_cc_library( + name = "io_uring_impl_lib", + srcs = [ + "io_uring_impl.cc", + ], + hdrs = [ + "io_uring_impl.h", + ], + external_deps = ["uring"], + deps = [ + ":io_uring_interface", + ], +) diff --git a/source/common/io/io_uring.h b/source/common/io/io_uring.h new file mode 100644 index 0000000000000..c664cd4630a03 --- /dev/null +++ b/source/common/io/io_uring.h @@ -0,0 +1,121 @@ +#pragma once + +#include "envoy/common/pure.h" + +#include "source/common/network/address_impl.h" + +namespace Envoy { +namespace Io { + +/** + * Callback invoked when iterating over entries in the completion queue. + * @param user_data is any data attached to an entry submitted to the submission + * queue. + * @param result is a return code of submitted system call. + */ +using CompletionCb = std::function; + +enum class IoUringResult { Ok, Busy, Failed }; + +/** + * Abstract wrapper around `io_uring`. + */ +class IoUring { +public: + virtual ~IoUring() = default; + + /** + * Registers an eventfd file descriptor for the ring and returns it. + * It can be used for integration with event loops. + */ + virtual os_fd_t registerEventfd() PURE; + + /** + * Resets the eventfd file descriptor for the ring. + */ + virtual void unregisterEventfd() PURE; + + /** + * Returns true if an eventfd file descriptor is registered with the ring. + */ + virtual bool isEventfdRegistered() const PURE; + + /** + * Iterates over entries in the completion queue, calls the given callback for + * every entry and marks them consumed. + */ + virtual void forEveryCompletion(CompletionCb completion_cb) PURE; + + /** + * Prepares an accept system call and puts it into the submission queue. + * Returns IoUringResult::Failed in case the submission queue is full already + * and IoUringResult::Ok otherwise. + */ + virtual IoUringResult prepareAccept(os_fd_t fd, struct sockaddr* remote_addr, + socklen_t* remote_addr_len, void* user_data) PURE; + + /** + * Prepares a connect system call and puts it into the submission queue. + * Returns IoUringResult::Failed in case the submission queue is full already + * and IoUringResult::Ok otherwise. + */ + virtual IoUringResult prepareConnect(os_fd_t fd, + const Network::Address::InstanceConstSharedPtr& address, + void* user_data) PURE; + + /** + * Prepares a readv system call and puts it into the submission queue. + * Returns IoUringResult::Failed in case the submission queue is full already + * and IoUringResult::Ok otherwise. + */ + virtual IoUringResult prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, + off_t offset, void* user_data) PURE; + + /** + * Prepares a writev system call and puts it into the submission queue. + * Returns IoUringResult::Failed in case the submission queue is full already + * and IoUringResult::Ok otherwise. + */ + virtual IoUringResult prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, + off_t offset, void* user_data) PURE; + + /** + * Prepares a close system call and puts it into the submission queue. + * Returns IoUringResult::Failed in case the submission queue is full already + * and IoUringResult::Ok otherwise. + */ + virtual IoUringResult prepareClose(os_fd_t fd, void* user_data) PURE; + + /** + * Submits the entries in the submission queue to the kernel using the + * `io_uring_enter()` system call. + * Returns IoUringResult::Ok in case of success and may return + * IoUringResult::Busy if we over commit the number of requests. In the latter + * case the application should drain the completion queue by handling some completions + * with the forEveryCompletion() method and try again. + */ + virtual IoUringResult submit() PURE; +}; + +/** + * Abstract factory for IoUring wrappers. + */ +class IoUringFactory { +public: + virtual ~IoUringFactory() = default; + + /** + * Returns an instance of IoUring and creates it if needed for the current + * thread. + */ + virtual IoUring& getOrCreate() const PURE; + + /** + * Initializes a factory upon server readiness. For example this method can be + * used to set TLS. + */ + virtual void onServerInitialized() PURE; +}; + +} // namespace Io +} // namespace Envoy diff --git a/source/common/io/io_uring_impl.cc b/source/common/io/io_uring_impl.cc new file mode 100644 index 0000000000000..071c68c74d64a --- /dev/null +++ b/source/common/io/io_uring_impl.cc @@ -0,0 +1,135 @@ +#include "source/common/io/io_uring_impl.h" + +#include + +namespace Envoy { +namespace Io { + +IoUringFactoryImpl::IoUringFactoryImpl(uint32_t io_uring_size, bool use_submission_queue_polling, + ThreadLocal::SlotAllocator& tls) + : io_uring_size_(io_uring_size), use_submission_queue_polling_(use_submission_queue_polling), + tls_(tls) {} + +IoUring& IoUringFactoryImpl::getOrCreate() const { + return const_cast(tls_.get().ref()); +} + +void IoUringFactoryImpl::onServerInitialized() { + tls_.set([io_uring_size = io_uring_size_, + use_submission_queue_polling = use_submission_queue_polling_](Event::Dispatcher&) { + return std::make_shared(io_uring_size, use_submission_queue_polling); + }); +} + +IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling) + : io_uring_size_(io_uring_size), cqes_(io_uring_size_, nullptr) { + struct io_uring_params p {}; + if (use_submission_queue_polling) { + p.flags |= IORING_SETUP_SQPOLL; + } + int ret = io_uring_queue_init_params(io_uring_size_, &ring_, &p); + RELEASE_ASSERT(ret == 0, fmt::format("unable to initialize io_uring: {}", errorDetails(-ret))); +} + +IoUringImpl::~IoUringImpl() { io_uring_queue_exit(&ring_); } + +os_fd_t IoUringImpl::registerEventfd() { + ASSERT(!isEventfdRegistered()); + event_fd_ = eventfd(0, 0); + int res = io_uring_register_eventfd(&ring_, event_fd_); + RELEASE_ASSERT(res == 0, fmt::format("unable to register eventfd: {}", errorDetails(-res))); + return event_fd_; +} + +void IoUringImpl::unregisterEventfd() { + int res = io_uring_unregister_eventfd(&ring_); + RELEASE_ASSERT(res == 0, fmt::format("unable to unregister eventfd: {}", errorDetails(-res))); + SET_SOCKET_INVALID(event_fd_); +} + +bool IoUringImpl::isEventfdRegistered() const { return SOCKET_VALID(event_fd_); } + +void IoUringImpl::forEveryCompletion(CompletionCb completion_cb) { + ASSERT(SOCKET_VALID(event_fd_)); + + eventfd_t v; + int ret = eventfd_read(event_fd_, &v); + RELEASE_ASSERT(ret == 0, "unable to drain eventfd"); + + unsigned count = io_uring_peek_batch_cqe(&ring_, cqes_.data(), io_uring_size_); + + for (unsigned i = 0; i < count; ++i) { + struct io_uring_cqe* cqe = cqes_[i]; + completion_cb(reinterpret_cast(cqe->user_data), cqe->res); + } + io_uring_cq_advance(&ring_, count); +} + +IoUringResult IoUringImpl::prepareAccept(os_fd_t fd, struct sockaddr* remote_addr, + socklen_t* remote_addr_len, void* user_data) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (sqe == nullptr) { + return IoUringResult::Failed; + } + + io_uring_prep_accept(sqe, fd, remote_addr, remote_addr_len, 0); + io_uring_sqe_set_data(sqe, user_data); + return IoUringResult::Ok; +} + +IoUringResult IoUringImpl::prepareConnect(os_fd_t fd, + const Network::Address::InstanceConstSharedPtr& address, + void* user_data) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (sqe == nullptr) { + return IoUringResult::Failed; + } + + io_uring_prep_connect(sqe, fd, address->sockAddr(), address->sockAddrLen()); + io_uring_sqe_set_data(sqe, user_data); + return IoUringResult::Ok; +} + +IoUringResult IoUringImpl::prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, + off_t offset, void* user_data) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (sqe == nullptr) { + return IoUringResult::Failed; + } + + io_uring_prep_readv(sqe, fd, iovecs, nr_vecs, offset); + io_uring_sqe_set_data(sqe, user_data); + return IoUringResult::Ok; +} + +IoUringResult IoUringImpl::prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, + off_t offset, void* user_data) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (sqe == nullptr) { + return IoUringResult::Failed; + } + + io_uring_prep_writev(sqe, fd, iovecs, nr_vecs, offset); + io_uring_sqe_set_data(sqe, user_data); + return IoUringResult::Ok; +} + +IoUringResult IoUringImpl::prepareClose(os_fd_t fd, void* user_data) { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (sqe == nullptr) { + return IoUringResult::Failed; + } + + io_uring_prep_close(sqe, fd); + io_uring_sqe_set_data(sqe, user_data); + return IoUringResult::Ok; +} + +IoUringResult IoUringImpl::submit() { + int res = io_uring_submit(&ring_); + RELEASE_ASSERT(res >= 0 || res == -EBUSY, "unable to submit io_uring queue entries"); + return res == -EBUSY ? IoUringResult::Busy : IoUringResult::Ok; +} + +} // namespace Io +} // namespace Envoy diff --git a/source/common/io/io_uring_impl.h b/source/common/io/io_uring_impl.h new file mode 100644 index 0000000000000..44e2ecca125da --- /dev/null +++ b/source/common/io/io_uring_impl.h @@ -0,0 +1,55 @@ +#pragma once + +#include "envoy/thread_local/thread_local.h" + +#include "source/common/io/io_uring.h" + +#include "liburing.h" + +namespace Envoy { +namespace Io { + +class IoUringImpl : public IoUring, public ThreadLocal::ThreadLocalObject { +public: + IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling); + ~IoUringImpl() override; + + os_fd_t registerEventfd() override; + void unregisterEventfd() override; + bool isEventfdRegistered() const override; + void forEveryCompletion(CompletionCb completion_cb) override; + IoUringResult prepareAccept(os_fd_t fd, struct sockaddr* remote_addr, socklen_t* remote_addr_len, + void* user_data) override; + IoUringResult prepareConnect(os_fd_t fd, const Network::Address::InstanceConstSharedPtr& address, + void* user_data) override; + IoUringResult prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, off_t offset, + void* user_data) override; + IoUringResult prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs, + off_t offset, void* user_data) override; + IoUringResult prepareClose(os_fd_t fd, void* user_data) override; + IoUringResult submit() override; + +private: + const uint32_t io_uring_size_; + struct io_uring ring_; + std::vector cqes_; + os_fd_t event_fd_{INVALID_SOCKET}; +}; + +class IoUringFactoryImpl : public IoUringFactory { +public: + IoUringFactoryImpl(uint32_t io_uring_size, bool use_submission_queue_polling, + ThreadLocal::SlotAllocator& tls); + + // IoUringFactory + IoUring& getOrCreate() const override; + void onServerInitialized() override; + +private: + const uint32_t io_uring_size_{}; + const bool use_submission_queue_polling_{}; + ThreadLocal::TypedSlot tls_; +}; + +} // namespace Io +} // namespace Envoy diff --git a/source/exe/BUILD b/source/exe/BUILD index 0eb4078c164c3..994bc24231636 100644 --- a/source/exe/BUILD +++ b/source/exe/BUILD @@ -96,15 +96,20 @@ envoy_cc_library( envoy_cc_library( name = "envoy_common_with_core_extensions_lib", deps = [ - "//source/common/event:libevent_lib", - "//source/common/network:utility_lib", - "//source/common/stats:stats_lib", - "//source/common/stats:thread_local_store_lib", - "//source/server:drain_manager_lib", - "//source/server:options_lib", - "//source/server:server_lib", - "//source/server:listener_hooks_lib", - ] + envoy_all_core_extensions(), + "//source/common/event:libevent_lib", + "//source/common/network:utility_lib", + "//source/common/stats:stats_lib", + "//source/common/stats:thread_local_store_lib", + "//source/server:drain_manager_lib", + "//source/server:options_lib", + "//source/server:server_lib", + "//source/server:listener_hooks_lib", + ] + envoy_all_core_extensions() + + # TODO(rojkov): drop io_uring dependency when it's fully integrated. + select({ + "//bazel:linux": ["//source/common/io:io_uring_impl_lib"], + "//conditions:default": [], + }), ) envoy_cc_library( diff --git a/test/common/io/BUILD b/test/common/io/BUILD new file mode 100644 index 0000000000000..e877b555c656c --- /dev/null +++ b/test/common/io/BUILD @@ -0,0 +1,21 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test", + "envoy_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_cc_test( + name = "io_uring_impl_test", + srcs = ["io_uring_impl_test.cc"], + tags = ["skip_on_windows"], + deps = [ + "//source/common/io:io_uring_impl_lib", + "//test/mocks/server:server_mocks", + "//test/test_common:environment_lib", + "//test/test_common:utility_lib", + ], +) diff --git a/test/common/io/io_uring_impl_test.cc b/test/common/io/io_uring_impl_test.cc new file mode 100644 index 0000000000000..b34b75051a488 --- /dev/null +++ b/test/common/io/io_uring_impl_test.cc @@ -0,0 +1,242 @@ +#include "source/common/io/io_uring_impl.h" + +#include "test/mocks/server/mocks.h" +#include "test/test_common/environment.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace Io { +namespace { + +class IoUringBaseTest : public ::testing::Test { +public: + IoUringBaseTest() : api_(Api::createApiForTest()), factory_(2, false, context_.threadLocal()) { + factory_.onServerInitialized(); + } + + void TearDown() override { + auto& uring = factory_.getOrCreate(); + if (uring.isEventfdRegistered()) { + uring.unregisterEventfd(); + } + } + + Api::ApiPtr api_; + testing::NiceMock context_; + IoUringFactoryImpl factory_; +}; + +class IoUringImplParamTest + : public IoUringBaseTest, + public testing::WithParamInterface> {}; + +INSTANTIATE_TEST_SUITE_P(InvalidPrepareMethodParamsTest, IoUringImplParamTest, + testing::Values( + [](IoUring& uring, os_fd_t fd) -> IoUringResult { + return uring.prepareAccept(fd, nullptr, nullptr, nullptr); + }, + [](IoUring& uring, os_fd_t fd) -> IoUringResult { + auto address = + std::make_shared( + "test"); + return uring.prepareConnect(fd, address, nullptr); + }, + [](IoUring& uring, os_fd_t fd) -> IoUringResult { + return uring.prepareReadv(fd, nullptr, 0, 0, nullptr); + }, + [](IoUring& uring, os_fd_t fd) -> IoUringResult { + return uring.prepareWritev(fd, nullptr, 0, 0, nullptr); + }, + [](IoUring& uring, os_fd_t fd) -> IoUringResult { + return uring.prepareClose(fd, nullptr); + })); + +TEST_P(IoUringImplParamTest, InvalidParams) { + os_fd_t fd; + SET_SOCKET_INVALID(fd); + auto dispatcher = api_->allocateDispatcher("test_thread"); + + auto& uring = factory_.getOrCreate(); + + os_fd_t event_fd = uring.registerEventfd(); + const Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType; + int32_t completions_nr = 0; + auto file_event = dispatcher->createFileEvent( + event_fd, + [&uring, &completions_nr](uint32_t) { + uring.forEveryCompletion([&completions_nr](void*, int32_t res) { + EXPECT_TRUE(res < 0); + completions_nr++; + }); + }, + trigger, Event::FileReadyType::Read); + + auto prepare_method = GetParam(); + IoUringResult res = prepare_method(uring, fd); + EXPECT_EQ(res, IoUringResult::Ok); + res = prepare_method(uring, fd); + EXPECT_EQ(res, IoUringResult::Ok); + res = prepare_method(uring, fd); + EXPECT_EQ(res, IoUringResult::Failed); + res = uring.submit(); + EXPECT_EQ(res, IoUringResult::Ok); + res = uring.submit(); + EXPECT_EQ(res, IoUringResult::Ok); + + dispatcher->run(Event::Dispatcher::RunType::NonBlock); + EXPECT_EQ(completions_nr, 2); +} + +class IoUringImplTest : public IoUringBaseTest { +protected: + void SetUp() override { test_dir_ = TestEnvironment::temporaryDirectory(); } + + void TearDown() override { + TestEnvironment::removePath(test_dir_); + IoUringBaseTest::TearDown(); + } + + std::string test_dir_; +}; + +TEST_F(IoUringImplTest, Instantiate) { + auto& uring1 = factory_.getOrCreate(); + auto& uring2 = factory_.getOrCreate(); + EXPECT_EQ(&uring1, &uring2); +} + +TEST_F(IoUringImplTest, RegisterEventfd) { + auto& uring = factory_.getOrCreate(); + + EXPECT_FALSE(uring.isEventfdRegistered()); + uring.registerEventfd(); + EXPECT_TRUE(uring.isEventfdRegistered()); + uring.unregisterEventfd(); + EXPECT_FALSE(uring.isEventfdRegistered()); + EXPECT_DEATH(uring.unregisterEventfd(), "unable to unregister eventfd"); +} + +TEST_F(IoUringImplTest, PrepareReadvAllDataFitsOneChunk) { + std::string test_file = TestEnvironment::writeStringToFileForTest( + absl::StrCat(test_dir_, "prepare_readv"), "test text", true); + os_fd_t fd = open(test_file.c_str(), O_RDONLY); + ASSERT_TRUE(fd >= 0); + + auto dispatcher = api_->allocateDispatcher("test_thread"); + + uint8_t buffer[4096]{}; + struct iovec iov; + iov.iov_base = buffer; + iov.iov_len = 4096; + + auto& uring = factory_.getOrCreate(); + os_fd_t event_fd = uring.registerEventfd(); + + const Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType; + int32_t completions_nr = 0; + auto file_event = dispatcher->createFileEvent( + event_fd, + [&uring, &completions_nr, d = dispatcher.get()](uint32_t) { + uring.forEveryCompletion([&completions_nr](void*, int32_t res) { + completions_nr++; + EXPECT_EQ(res, strlen("test text")); + }); + d->exit(); + }, + trigger, Event::FileReadyType::Read); + + uring.prepareReadv(fd, &iov, 1, 0, nullptr); + EXPECT_STREQ(static_cast(iov.iov_base), ""); + uring.submit(); + + dispatcher->run(Event::Dispatcher::RunType::Block); + + // Check that the completion callback has been actually called. + EXPECT_EQ(completions_nr, 1); + // The file's content is in the read buffer now. + EXPECT_STREQ(static_cast(iov.iov_base), "test text"); +} + +TEST_F(IoUringImplTest, PrepareReadvQueueOverflow) { + std::string test_file = TestEnvironment::writeStringToFileForTest( + absl::StrCat(test_dir_, "prepare_readv"), "abcdefhg", true); + os_fd_t fd = open(test_file.c_str(), O_RDONLY); + ASSERT_TRUE(fd >= 0); + + auto dispatcher = api_->allocateDispatcher("test_thread"); + + uint8_t buffer1[2]{}; + struct iovec iov1; + iov1.iov_base = buffer1; + iov1.iov_len = 2; + uint8_t buffer2[2]{}; + struct iovec iov2; + iov2.iov_base = buffer2; + iov2.iov_len = 2; + uint8_t buffer3[2]{}; + struct iovec iov3; + iov3.iov_base = buffer3; + iov3.iov_len = 2; + + auto& uring = factory_.getOrCreate(); + + os_fd_t event_fd = uring.registerEventfd(); + const Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType; + int32_t completions_nr = 0; + auto file_event = dispatcher->createFileEvent( + event_fd, + [&uring, &completions_nr](uint32_t) { + uring.forEveryCompletion([&completions_nr](void* user_data, int32_t res) { + EXPECT_TRUE(user_data != nullptr); + EXPECT_EQ(res, 2); + completions_nr++; + // Note: generally events are not guaranteed to complete in the same order + // we submit them, but for this case of reading from a single file it's ok + // to expect the same order. + EXPECT_EQ(reinterpret_cast(user_data), completions_nr); + }); + }, + trigger, Event::FileReadyType::Read); + + IoUringResult res = uring.prepareReadv(fd, &iov1, 1, 0, reinterpret_cast(1)); + EXPECT_EQ(res, IoUringResult::Ok); + res = uring.prepareReadv(fd, &iov2, 1, 2, reinterpret_cast(2)); + EXPECT_EQ(res, IoUringResult::Ok); + res = uring.prepareReadv(fd, &iov3, 1, 4, reinterpret_cast(3)); + // Expect the submission queue overflow. + EXPECT_EQ(res, IoUringResult::Failed); + res = uring.submit(); + EXPECT_EQ(res, IoUringResult::Ok); + + // Even though we haven't been notified about ops completion the buffers + // are filled already. + EXPECT_EQ(static_cast(iov1.iov_base)[0], 'a'); + EXPECT_EQ(static_cast(iov1.iov_base)[1], 'b'); + EXPECT_EQ(static_cast(iov2.iov_base)[0], 'c'); + EXPECT_EQ(static_cast(iov2.iov_base)[1], 'd'); + + dispatcher->run(Event::Dispatcher::RunType::NonBlock); + + // Only 2 completions are expected because the completion queue can contain + // no more than 2 entries. + EXPECT_EQ(completions_nr, 2); + + // Check a new event gets handled in the next dispatcher run. + res = uring.prepareReadv(fd, &iov3, 1, 4, reinterpret_cast(3)); + EXPECT_EQ(res, IoUringResult::Ok); + res = uring.submit(); + EXPECT_EQ(res, IoUringResult::Ok); + + EXPECT_EQ(static_cast(iov3.iov_base)[0], 'e'); + EXPECT_EQ(static_cast(iov3.iov_base)[1], 'f'); + + dispatcher->run(Event::Dispatcher::RunType::NonBlock); + // Check the completion callback was called actually. + EXPECT_EQ(completions_nr, 3); +} + +} // namespace +} // namespace Io +} // namespace Envoy