Skip to content

Commit

Permalink
add io_uring_context::open_listening_socket
Browse files Browse the repository at this point in the history
* opens a socket on a specified port
* exposes a _Stream_ of `async_read_write_file` to handle accept(s)
  • Loading branch information
janondrusek committed May 25, 2023
1 parent 6d87373 commit ccdfe4a
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 4 deletions.
2 changes: 0 additions & 2 deletions include/unifex/file_concepts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

#include <unifex/tag_invoke.hpp>

#include <unifex/io_concepts.hpp>

#include <unifex/filesystem.hpp>

#include <unifex/detail/prologue.hpp>
Expand Down
1 change: 1 addition & 0 deletions include/unifex/linux/io_epoll_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <unifex/detail/atomic_intrusive_queue.hpp>
#include <unifex/detail/intrusive_heap.hpp>
#include <unifex/detail/intrusive_queue.hpp>
#include <unifex/io_concepts.hpp>
#include <unifex/pipe_concepts.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/manual_lifetime.hpp>
Expand Down
186 changes: 186 additions & 0 deletions include/unifex/linux/io_uring_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@
#include <unifex/detail/atomic_intrusive_queue.hpp>
#include <unifex/detail/intrusive_heap.hpp>
#include <unifex/detail/intrusive_queue.hpp>
#include <unifex/defer.hpp>
#include <unifex/file_concepts.hpp>
#include <unifex/filesystem.hpp>
#include <unifex/io_concepts.hpp>
#include <unifex/just_done.hpp>
#include <unifex/let_value_with.hpp>
#include <unifex/socket_concepts.hpp>
#include <unifex/get_stop_token.hpp>
#include <unifex/manual_lifetime.hpp>
#include <unifex/receiver_concepts.hpp>
Expand All @@ -43,7 +48,10 @@

#include UNIFEX_LIBURING_HEADER

#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

#include <unifex/detail/prologue.hpp>

Expand All @@ -62,6 +70,8 @@ class io_uring_context {
class async_read_write_file;
class async_write_only_file;
class scheduler;
class accept_sender;
class accept_stream;

io_uring_context();

Expand Down Expand Up @@ -925,6 +935,10 @@ class io_uring_context::scheduler {
tag_t<open_file_write_only>,
scheduler s,
const filesystem::path& path);
friend accept_stream tag_invoke(
tag_t<open_listening_socket>,
scheduler s,
port_t port);

friend bool operator==(scheduler a, scheduler b) noexcept {
return a.context_ == b.context_;
Expand All @@ -942,6 +956,178 @@ inline io_uring_context::scheduler io_uring_context::get_scheduler() noexcept {
return scheduler{*this};
}

class io_uring_context::accept_sender {
using offset_t = std::int64_t;

template <typename Receiver>
class operation : private completion_base {
friend io_uring_context;

public:
template <typename Receiver2>
explicit operation(const accept_sender& sender, Receiver2&& r) noexcept(
std::is_nothrow_constructible_v<Receiver, Receiver2>)
: context_(sender.context_)
, fd_(sender.fd_)
, receiver_((Receiver2 &&) r) {}

operation(operation&&) = delete;

void start() noexcept {
if (!context_.is_running_on_io_thread()) {
this->execute_ = &operation::on_schedule_complete;
context_.schedule_remote(this);
} else {
start_io();
}
}

private:
static void on_schedule_complete(operation_base* op) noexcept {
static_cast<operation*>(op)->start_io();
}

void start_io() noexcept {
UNIFEX_ASSERT(context_.is_running_on_io_thread());

auto populateSqe = [this](io_uring_sqe& sqe) noexcept {
sqe.opcode = IORING_OP_ACCEPT;
sqe.accept_flags = SOCK_NONBLOCK;
sqe.fd = fd_;

sqe.user_data = reinterpret_cast<std::uintptr_t>(
static_cast<completion_base*>(this));

this->execute_ = &operation::on_accept;
};

if (!context_.try_submit_io(populateSqe)) {
this->execute_ = &operation::on_schedule_complete;
context_.schedule_pending_io(this);
}
}

static void on_accept(operation_base* op) noexcept {
auto& self = *static_cast<operation*>(op);
if (self.result_ >= 0) {
if constexpr (noexcept(unifex::set_value(
std::move(self.receiver_), async_read_write_file{self.context_, self.result_}))) {
unifex::set_value(std::move(self.receiver_), async_read_write_file{self.context_, self.result_});
} else {
UNIFEX_TRY {
unifex::set_value(std::move(self.receiver_), async_read_write_file{self.context_, self.result_});
}
UNIFEX_CATCH(...) {
unifex::set_error(
std::move(self.receiver_), std::current_exception());
}
}
} else if (self.result_ == -ECANCELED) {
unifex::set_done(std::move(self.receiver_));
} else {
unifex::set_error(
std::move(self.receiver_),
std::error_code{-self.result_, std::system_category()});
}
}

io_uring_context& context_;
int fd_;
Receiver receiver_;
};

public:
// Produces an open read-write file corresponding to the accepted connection.
template <
template <typename...>
class Variant,
template <typename...>
class Tuple>
using value_types = Variant<Tuple<async_read_write_file>>;

// Note: Only case it might complete with exception_ptr is if the
// receiver's set_value() exits with an exception.
template <template <typename...> class Variant>
using error_types = Variant<std::error_code, std::exception_ptr>;

static constexpr bool sends_done = true;
static constexpr blocking_kind blocking = blocking_kind::never;
// always completes on the io_uring context
static constexpr bool is_always_scheduler_affine = false;

explicit accept_sender(io_uring_context& context, int fd) noexcept
: context_(context)
, fd_(fd) {}

template <typename Receiver>
operation<remove_cvref_t<Receiver>> connect(Receiver&& r) && {
return operation<remove_cvref_t<Receiver>>{*this, (Receiver &&) r};
}

private:
io_uring_context& context_;
int fd_;
};

class io_uring_context::accept_stream {
public:
using offset_t = std::int64_t;

explicit accept_stream(io_uring_context& context, port_t port) noexcept
: context_(context)
, port_(port) {}

auto next() noexcept {
return let_value_with(
[this]() noexcept {
// TODO move to operation.start_io()
if (!fd_.valid()) {
open_socket();
}
return fd_.get();
},
[this](auto fd) noexcept { return accept_sender{context_, fd}; });
}

auto cleanup() noexcept {
return defer([this]() noexcept {
if (fd_.valid()) {
fd_.close();
}
return just_done();
});
}

private:
friend scheduler;

io_uring_context& context_;
port_t port_;
safe_file_descriptor fd_;

// TODO should this run on the io_context? If so, why?
void open_socket() noexcept {
// both IPv4 and IPv6
sockaddr_in6 addr;
fd_ = safe_file_descriptor{socket(
AF_INET6, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, IPPROTO_TCP)};
std::int32_t val = 1;
[[maybe_unused]] int ret;
ret = setsockopt(fd_.get(), SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val));
UNIFEX_ASSERT(ret != -1);
ret = setsockopt(fd_.get(), SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
UNIFEX_ASSERT(ret != -1);

addr.sin6_family = AF_INET6;
addr.sin6_port = htons(port_);
addr.sin6_addr = in6addr_any;
ret = bind(fd_.get(), (const struct sockaddr*)&addr, sizeof(addr));
UNIFEX_ASSERT(ret != -1);
ret = listen(fd_.get(), 128);
UNIFEX_ASSERT(ret != -1);
}
};

} // namespace linuxos
} // namespace unifex

Expand Down
2 changes: 0 additions & 2 deletions include/unifex/pipe_concepts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

#include <unifex/tag_invoke.hpp>

#include <unifex/io_concepts.hpp>

#include <unifex/detail/prologue.hpp>

namespace unifex {
Expand Down
40 changes: 40 additions & 0 deletions include/unifex/socket_concepts.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <unifex/tag_invoke.hpp>

#include <unifex/detail/prologue.hpp>

namespace unifex {
namespace _socket {
using port_t = std::uint16_t;

inline constexpr struct open_listening_socket_cpo final {
template <typename Scheduler>
constexpr auto operator()(Scheduler&& sched, port_t port) const noexcept(
is_nothrow_tag_invocable_v<open_listening_socket_cpo, Scheduler, port_t>)
-> tag_invoke_result_t<open_listening_socket_cpo, Scheduler, port_t> {
return tag_invoke(*this, static_cast<Scheduler&&>(sched), port);
}
} open_listening_socket{};
} // namespace _socket

using _socket::open_listening_socket;
using _socket::port_t;
} // namespace unifex

#include <unifex/detail/epilogue.hpp>
6 changes: 6 additions & 0 deletions source/linux/io_uring_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,12 @@ io_uring_context::async_read_write_file tag_invoke(
return io_uring_context::async_read_write_file{*scheduler.context_, result};
}

io_uring_context::accept_stream tag_invoke(
tag_t<open_listening_socket>,
io_uring_context::scheduler scheduler,
port_t port) {
return io_uring_context::accept_stream{*scheduler.context_, port};
}
} // namespace unifex::linuxos

#endif // UNIFEX_NO_LIBURING

0 comments on commit ccdfe4a

Please sign in to comment.