Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update any_sender to use tag_dispatch for execution customizations #5593

Merged
merged 1 commit into from
Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,9 @@ namespace hpx::execution::experimental {
any_operation_state& operator=(any_operation_state&&) = delete;
any_operation_state& operator=(any_operation_state const&) = delete;

void start() & noexcept;
HPX_CORE_EXPORT friend void tag_dispatch(
hpx::execution::experimental::start_t,
any_operation_state& os) noexcept;
};

template <typename... Ts>
Expand Down Expand Up @@ -367,7 +369,7 @@ namespace hpx::execution::experimental {
return true;
}

HPX_NORETURN void set_value(Ts...) && override
void set_value(Ts...) && override
{
throw_bad_any_call("any_receiver", "set_value");
}
Expand Down Expand Up @@ -456,33 +458,36 @@ namespace hpx::execution::experimental {
any_receiver& operator=(any_receiver&&) = default;
any_receiver& operator=(any_receiver const&) = delete;

void set_value(Ts... ts) &&
friend void tag_dispatch(hpx::execution::experimental::set_value_t,
any_receiver&& r, Ts... ts)
{
// We first move the storage to a temporary variable so that
// this any_receiver is empty after this set_value. Doing
// std::move(storage.get()).set_value(...) would leave us with a
// non-empty any_receiver holding a moved-from receiver.
auto moved_storage = std::move(storage);
auto moved_storage = std::move(r.storage);
std::move(moved_storage.get()).set_value(std::move(ts)...);
}

void set_error(std::exception_ptr ep) && noexcept
friend void tag_dispatch(hpx::execution::experimental::set_error_t,
any_receiver&& r, std::exception_ptr ep) noexcept
{
// We first move the storage to a temporary variable so that
// this any_receiver is empty after this set_error. Doing
// std::move(storage.get()).set_error(...) would leave us with a
// non-empty any_receiver holding a moved-from receiver.
auto moved_storage = std::move(storage);
auto moved_storage = std::move(r.storage);
std::move(moved_storage.get()).set_error(std::move(ep));
}

void set_done() && noexcept
friend void tag_dispatch(hpx::execution::experimental::set_done_t,
any_receiver&& r) noexcept
{
// We first move the storage to a temporary variable so that
// this any_receiver is empty after this set_done. Doing
// std::move(storage.get()).set_done(...) would leave us with a
// non-empty any_receiver holding a moved-from receiver.
auto moved_storage = std::move(storage);
auto moved_storage = std::move(r.storage);
std::move(moved_storage.get()).set_done();
}
};
Expand Down Expand Up @@ -684,13 +689,15 @@ namespace hpx::execution::experimental {
static constexpr bool sends_done = false;

template <typename R>
detail::any_operation_state connect(R&& r) &&
friend detail::any_operation_state tag_dispatch(
hpx::execution::experimental::connect_t, unique_any_sender&& s,
R&& r)
{
// We first move the storage to a temporary variable so that this
// any_sender is empty after this connect. Doing
// std::move(storage.get()).connect(...) would leave us with a
// non-empty any_sender holding a moved-from sender.
auto moved_storage = std::move(storage);
auto moved_storage = std::move(s.storage);
return std::move(moved_storage.get())
.connect(detail::any_receiver<Ts...>{std::forward<R>(r)});
}
Expand Down Expand Up @@ -755,20 +762,22 @@ namespace hpx::execution::experimental {
static constexpr bool sends_done = false;

template <typename R>
detail::any_operation_state connect(R&& r) &
friend detail::any_operation_state tag_dispatch(
hpx::execution::experimental::connect_t, any_sender& s, R&& r)
{
return storage.get().connect(
return s.storage.get().connect(
detail::any_receiver<Ts...>{std::forward<R>(r)});
}

template <typename R>
detail::any_operation_state connect(R&& r) &&
friend detail::any_operation_state tag_dispatch(
hpx::execution::experimental::connect_t, any_sender&& s, R&& r)
{
// We first move the storage to a temporary variable so that this
// any_sender is empty after this connect. Doing
// std::move(storage.get()).connect(...) would leave us with a
// non-empty any_sender holding a moved-from sender.
auto moved_storage = std::move(storage);
auto moved_storage = std::move(s.storage);
return std::move(moved_storage.get())
.connect(detail::any_receiver<Ts...>{std::forward<R>(r)});
}
Expand Down
5 changes: 3 additions & 2 deletions libs/core/execution_base/src/any_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ namespace hpx::execution::experimental::detail {
return true;
}

void any_operation_state::start() & noexcept
void tag_dispatch(
hpx::execution::experimental::start_t, any_operation_state& os) noexcept
{
storage.get().start();
os.storage.get().start();
}

void throw_bad_any_call(char const* class_name, char const* function_name)
Expand Down
63 changes: 38 additions & 25 deletions libs/core/execution_base/tests/unit/any_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,22 @@ struct non_copyable_sender
std::decay_t<R> r;
std::tuple<std::decay_t<Ts>...> ts;

void start() & noexcept
friend void tag_dispatch(
hpx::execution::experimental::start_t, operation_state& os) noexcept
{
hpx::util::invoke_fused(
hpx::util::bind_front(
hpx::execution::experimental::set_value, std::move(r)),
std::move(ts));
hpx::execution::experimental::set_value, std::move(os.r)),
std::move(os.ts));
};
};

template <typename R>
operation_state<R> connect(R&& r) && noexcept
friend operation_state<R> tag_dispatch(
hpx::execution::experimental::connect_t, non_copyable_sender&& s,
R&& r) noexcept
{
return {std::forward<R>(r), std::move(ts)};
return {std::forward<R>(r), std::move(s.ts)};
}
};

Expand Down Expand Up @@ -123,25 +126,28 @@ struct sender
std::decay_t<R> r;
std::tuple<std::decay_t<Ts>...> ts;

void start() & noexcept
friend void tag_dispatch(
hpx::execution::experimental::start_t, operation_state& os) noexcept
{
hpx::util::invoke_fused(
hpx::util::bind_front(
hpx::execution::experimental::set_value, std::move(r)),
std::move(ts));
hpx::execution::experimental::set_value, std::move(os.r)),
std::move(os.ts));
};
};

template <typename R>
operation_state<R> connect(R&& r) && noexcept
friend operation_state<R> tag_dispatch(
hpx::execution::experimental::connect_t, sender&& s, R&& r)
{
return {std::forward<R>(r), std::move(ts)};
return {std::forward<R>(r), std::move(s.ts)};
}

template <typename R>
operation_state<R> connect(R&& r) & noexcept
friend operation_state<R> tag_dispatch(
hpx::execution::experimental::connect_t, sender& s, R&& r)
{
return {std::forward<R>(r), ts};
return {std::forward<R>(r), s.ts};
}
};

Expand Down Expand Up @@ -215,7 +221,8 @@ struct error_sender
struct operation_state
{
std::decay_t<R> r;
void start() noexcept
friend void tag_dispatch(
hpx::execution::experimental::start_t, operation_state& os) noexcept
{
try
{
Expand All @@ -224,13 +231,14 @@ struct error_sender
catch (...)
{
hpx::execution::experimental::set_error(
std::move(r), std::current_exception());
std::move(os.r), std::current_exception());
}
}
};

template <typename R>
operation_state<R> connect(R&& r)
friend operation_state<R> tag_dispatch(
hpx::execution::experimental::connect_t, error_sender, R&& r)
{
return {std::forward<R>(r)};
}
Expand All @@ -243,30 +251,33 @@ struct callback_receiver
std::atomic<bool>& set_value_called;

template <typename E>
void set_error(E&&) && noexcept
friend void tag_dispatch(hpx::execution::experimental::set_error_t,
callback_receiver&&, E&&) noexcept
{
HPX_TEST(false);
}

void set_done() && noexcept
friend void tag_dispatch(
hpx::execution::experimental::set_done_t, callback_receiver&&) noexcept
{
HPX_TEST(false);
};

template <typename... Ts>
auto set_value(Ts&&... ts) && noexcept
-> decltype(HPX_INVOKE(f, std::forward<Ts>(ts)...), void())
friend auto tag_dispatch(hpx::execution::experimental::set_value_t,
callback_receiver&& r, Ts&&... ts) noexcept
{
HPX_INVOKE(f, std::forward<Ts>(ts)...);
set_value_called = true;
HPX_INVOKE(std::move(r.f), std::forward<Ts>(ts)...);
r.set_value_called = true;
}
};

struct error_receiver
{
std::atomic<bool>& set_error_called;

void set_error(std::exception_ptr&& e) noexcept
friend void tag_dispatch(hpx::execution::experimental::set_error_t,
error_receiver&& r, std::exception_ptr&& e) noexcept
{
try
{
Expand All @@ -280,16 +291,18 @@ struct error_receiver
{
HPX_TEST(false);
}
set_error_called = true;
r.set_error_called = true;
}

void set_done() noexcept
friend void tag_dispatch(
hpx::execution::experimental::set_done_t, error_receiver&&) noexcept
{
HPX_TEST(false);
};

template <typename... Ts>
void set_value(Ts&&...) noexcept
friend void tag_dispatch(hpx::execution::experimental::set_value_t,
error_receiver&&, Ts&&...) noexcept
{
HPX_TEST(false);
}
Expand Down