Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add io_uring_context::open_listening_socket #531

Merged
merged 1 commit into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions include/unifex/file_concepts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

#include <unifex/tag_invoke.hpp>

#include <unifex/io_concepts.hpp>

#include <unifex/filesystem.hpp>

#include <unifex/detail/prologue.hpp>
Expand Down
1 change: 1 addition & 0 deletions include/unifex/linux/io_epoll_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <unifex/detail/atomic_intrusive_queue.hpp>
#include <unifex/detail/intrusive_heap.hpp>
#include <unifex/detail/intrusive_queue.hpp>
#include <unifex/io_concepts.hpp>
#include <unifex/pipe_concepts.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/manual_lifetime.hpp>
Expand Down
186 changes: 186 additions & 0 deletions include/unifex/linux/io_uring_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@
#include <unifex/detail/atomic_intrusive_queue.hpp>
#include <unifex/detail/intrusive_heap.hpp>
#include <unifex/detail/intrusive_queue.hpp>
#include <unifex/defer.hpp>
#include <unifex/file_concepts.hpp>
#include <unifex/filesystem.hpp>
#include <unifex/io_concepts.hpp>
#include <unifex/just_done.hpp>
#include <unifex/let_value_with.hpp>
#include <unifex/socket_concepts.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/manual_lifetime.hpp>
#include <unifex/receiver_concepts.hpp>
Expand All @@ -43,7 +48,10 @@

#include UNIFEX_LIBURING_HEADER

#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

#include <unifex/detail/prologue.hpp>

Expand All @@ -62,6 +70,8 @@ class io_uring_context {
class async_read_write_file;
class async_write_only_file;
class scheduler;
class accept_sender;
class accept_stream;

io_uring_context();

Expand Down Expand Up @@ -925,6 +935,10 @@ class io_uring_context::scheduler {
tag_t<open_file_write_only>,
scheduler s,
const filesystem::path& path);
friend accept_stream tag_invoke(
tag_t<open_listening_socket>,
scheduler s,
port_t port);

friend bool operator==(scheduler a, scheduler b) noexcept {
return a.context_ == b.context_;
Expand All @@ -942,6 +956,178 @@ inline io_uring_context::scheduler io_uring_context::get_scheduler() noexcept {
return scheduler{*this};
}

class io_uring_context::accept_sender {
using offset_t = std::int64_t;

template <typename Receiver>
class operation : private completion_base {
janondrusek marked this conversation as resolved.
Show resolved Hide resolved
friend io_uring_context;

public:
template <typename Receiver2>
explicit operation(const accept_sender& sender, Receiver2&& r) noexcept(
std::is_nothrow_constructible_v<Receiver, Receiver2>)
: context_(sender.context_)
, fd_(sender.fd_)
, receiver_((Receiver2 &&) r) {}

janondrusek marked this conversation as resolved.
Show resolved Hide resolved
operation(operation&&) = delete;

void start() noexcept {
if (!context_.is_running_on_io_thread()) {
this->execute_ = &operation::on_schedule_complete;
context_.schedule_remote(this);
} else {
start_io();
}
}
janondrusek marked this conversation as resolved.
Show resolved Hide resolved

private:
static void on_schedule_complete(operation_base* op) noexcept {
static_cast<operation*>(op)->start_io();
}

void start_io() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());

auto populateSqe = [this](io_uring_sqe& sqe) noexcept {
sqe.opcode = IORING_OP_ACCEPT;
sqe.accept_flags = SOCK_NONBLOCK;
sqe.fd = fd_;

sqe.user_data = reinterpret_cast<std::uintptr_t>(
static_cast<completion_base*>(this));

this->execute_ = &operation::on_accept;
};

if (!context_.try_submit_io(populateSqe)) {
this->execute_ = &operation::on_schedule_complete;
context_.schedule_pending_io(this);
}
}

static void on_accept(operation_base* op) noexcept {
auto& self = *static_cast<operation*>(op);
if (self.result_ >= 0) {
if constexpr (noexcept(unifex::set_value(
std::move(self.receiver_), async_read_write_file{self.context_, self.result_}))) {
unifex::set_value(std::move(self.receiver_), async_read_write_file{self.context_, self.result_});
} else {
UNIFEX_TRY {
unifex::set_value(std::move(self.receiver_), async_read_write_file{self.context_, self.result_});
}
UNIFEX_CATCH(...) {
unifex::set_error(
std::move(self.receiver_), std::current_exception());
}
}
} else if (self.result_ == -ECANCELED) {
unifex::set_done(std::move(self.receiver_));
} else {
unifex::set_error(
std::move(self.receiver_),
std::error_code{-self.result_, std::system_category()});
}
}

io_uring_context& context_;
int fd_;
Receiver receiver_;
};

public:
// Produces an open read-write file corresponding to the accepted connection.
template <
template <typename...>
class Variant,
template <typename...>
class Tuple>
using value_types = Variant<Tuple<async_read_write_file>>;

// Note: Only case it might complete with exception_ptr is if the
// receiver's set_value() exits with an exception.
template <template <typename...> class Variant>
using error_types = Variant<std::error_code, std::exception_ptr>;

static constexpr bool sends_done = true;
static constexpr blocking_kind blocking = blocking_kind::never;
// always completes on the io_uring context
static constexpr bool is_always_scheduler_affine = false;

janondrusek marked this conversation as resolved.
Show resolved Hide resolved
explicit accept_sender(io_uring_context& context, int fd) noexcept
: context_(context)
, fd_(fd) {}

template <typename Receiver>
operation<remove_cvref_t<Receiver>> connect(Receiver&& r) && {
return operation<remove_cvref_t<Receiver>>{*this, (Receiver &&) r};
}
janondrusek marked this conversation as resolved.
Show resolved Hide resolved

private:
io_uring_context& context_;
int fd_;
};

class io_uring_context::accept_stream {
public:
using offset_t = std::int64_t;

explicit accept_stream(io_uring_context& context, port_t port) noexcept
: context_(context)
, port_(port) {}

auto next() noexcept {
return let_value_with(
[this]() noexcept {
// TODO move to operation.start_io()
if (!fd_.valid()) {
open_socket();
}
return fd_.get();
},
[this](auto fd) noexcept { return accept_sender{context_, fd}; });
}

auto cleanup() noexcept {
return defer([this]() noexcept {
if (fd_.valid()) {
fd_.close();
}
return just_done();
});
}

private:
friend scheduler;

io_uring_context& context_;
port_t port_;
safe_file_descriptor fd_;

// TODO should this run on the io_context? If so, why?
void open_socket() noexcept {
// both IPv4 and IPv6
sockaddr_in6 addr;
fd_ = safe_file_descriptor{socket(
AF_INET6, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, IPPROTO_TCP)};
std::int32_t val = 1;
[[maybe_unused]] int ret;
ret = setsockopt(fd_.get(), SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
UNIFEX_ASSERT(ret != -1);
ret = setsockopt(fd_.get(), SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
UNIFEX_ASSERT(ret != -1);

addr.sin6_family = AF_INET6;
addr.sin6_port = htons(port_);
addr.sin6_addr = in6addr_any;
ret = bind(fd_.get(), (const struct sockaddr*)&addr, sizeof(addr));
UNIFEX_ASSERT(ret != -1);
ret = listen(fd_.get(), 128);
UNIFEX_ASSERT(ret != -1);
}
};

} // namespace linuxos
} // namespace unifex

Expand Down
2 changes: 0 additions & 2 deletions include/unifex/pipe_concepts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

#include <unifex/tag_invoke.hpp>

#include <unifex/io_concepts.hpp>

#include <unifex/detail/prologue.hpp>

namespace unifex {
Expand Down
40 changes: 40 additions & 0 deletions include/unifex/socket_concepts.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <unifex/tag_invoke.hpp>

#include <unifex/detail/prologue.hpp>

namespace unifex {
namespace _socket {
using port_t = std::uint16_t;
janondrusek marked this conversation as resolved.
Show resolved Hide resolved

inline constexpr struct open_listening_socket_cpo final {
template <typename Scheduler>
constexpr auto operator()(Scheduler&& sched, port_t port) const noexcept(
is_nothrow_tag_invocable_v<open_listening_socket_cpo, Scheduler, port_t>)
-> tag_invoke_result_t<open_listening_socket_cpo, Scheduler, port_t> {
return tag_invoke(*this, static_cast<Scheduler&&>(sched), port);
}
} open_listening_socket{};
} // namespace _socket

using _socket::open_listening_socket;
using _socket::port_t;
} // namespace unifex

#include <unifex/detail/epilogue.hpp>
6 changes: 6 additions & 0 deletions source/linux/io_uring_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,12 @@ io_uring_context::async_read_write_file tag_invoke(
return io_uring_context::async_read_write_file{*scheduler.context_, result};
}

io_uring_context::accept_stream tag_invoke(
tag_t<open_listening_socket>,
io_uring_context::scheduler scheduler,
port_t port) {
return io_uring_context::accept_stream{*scheduler.context_, port};
}
} // namespace unifex::linuxos

#endif // UNIFEX_NO_LIBURING