Skip to content

Commit

Permalink
Merge pull request #5593 from msimberg/any-sender-tag-dispatch
Browse files Browse the repository at this point in the history
Update any_sender to use tag_dispatch for execution customizations
  • Loading branch information
msimberg authored Oct 5, 2021
2 parents acfd9b3 + febd0f9 commit 92ab22e
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 41 deletions.
37 changes: 23 additions & 14 deletions libs/core/execution_base/include/hpx/execution_base/any_sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,9 @@ namespace hpx::execution::experimental {
any_operation_state& operator=(any_operation_state&&) = delete;
any_operation_state& operator=(any_operation_state const&) = delete;

void start() & noexcept;
HPX_CORE_EXPORT friend void tag_dispatch(
hpx::execution::experimental::start_t,
any_operation_state& os) noexcept;
};

template <typename... Ts>
Expand Down Expand Up @@ -367,7 +369,7 @@ namespace hpx::execution::experimental {
return true;
}

HPX_NORETURN void set_value(Ts...) && override
void set_value(Ts...) && override
{
throw_bad_any_call("any_receiver", "set_value");
}
Expand Down Expand Up @@ -456,33 +458,36 @@ namespace hpx::execution::experimental {
any_receiver& operator=(any_receiver&&) = default;
any_receiver& operator=(any_receiver const&) = delete;

void set_value(Ts... ts) &&
friend void tag_dispatch(hpx::execution::experimental::set_value_t,
any_receiver&& r, Ts... ts)
{
// We first move the storage to a temporary variable so that
// this any_receiver is empty after this set_value. Doing
// std::move(storage.get()).set_value(...) would leave us with a
// non-empty any_receiver holding a moved-from receiver.
auto moved_storage = std::move(storage);
auto moved_storage = std::move(r.storage);
std::move(moved_storage.get()).set_value(std::move(ts)...);
}

void set_error(std::exception_ptr ep) && noexcept
friend void tag_dispatch(hpx::execution::experimental::set_error_t,
any_receiver&& r, std::exception_ptr ep) noexcept
{
// We first move the storage to a temporary variable so that
// this any_receiver is empty after this set_error. Doing
// std::move(storage.get()).set_error(...) would leave us with a
// non-empty any_receiver holding a moved-from receiver.
auto moved_storage = std::move(storage);
auto moved_storage = std::move(r.storage);
std::move(moved_storage.get()).set_error(std::move(ep));
}

void set_done() && noexcept
friend void tag_dispatch(hpx::execution::experimental::set_done_t,
any_receiver&& r) noexcept
{
// We first move the storage to a temporary variable so that
// this any_receiver is empty after this set_done. Doing
// std::move(storage.get()).set_done(...) would leave us with a
// non-empty any_receiver holding a moved-from receiver.
auto moved_storage = std::move(storage);
auto moved_storage = std::move(r.storage);
std::move(moved_storage.get()).set_done();
}
};
Expand Down Expand Up @@ -684,13 +689,15 @@ namespace hpx::execution::experimental {
static constexpr bool sends_done = false;

template <typename R>
detail::any_operation_state connect(R&& r) &&
friend detail::any_operation_state tag_dispatch(
hpx::execution::experimental::connect_t, unique_any_sender&& s,
R&& r)
{
// We first move the storage to a temporary variable so that this
// any_sender is empty after this connect. Doing
// std::move(storage.get()).connect(...) would leave us with a
// non-empty any_sender holding a moved-from sender.
auto moved_storage = std::move(storage);
auto moved_storage = std::move(s.storage);
return std::move(moved_storage.get())
.connect(detail::any_receiver<Ts...>{std::forward<R>(r)});
}
Expand Down Expand Up @@ -755,20 +762,22 @@ namespace hpx::execution::experimental {
static constexpr bool sends_done = false;

template <typename R>
detail::any_operation_state connect(R&& r) &
friend detail::any_operation_state tag_dispatch(
hpx::execution::experimental::connect_t, any_sender& s, R&& r)
{
return storage.get().connect(
return s.storage.get().connect(
detail::any_receiver<Ts...>{std::forward<R>(r)});
}

template <typename R>
detail::any_operation_state connect(R&& r) &&
friend detail::any_operation_state tag_dispatch(
hpx::execution::experimental::connect_t, any_sender&& s, R&& r)
{
// We first move the storage to a temporary variable so that this
// any_sender is empty after this connect. Doing
// std::move(storage.get()).connect(...) would leave us with a
// non-empty any_sender holding a moved-from sender.
auto moved_storage = std::move(storage);
auto moved_storage = std::move(s.storage);
return std::move(moved_storage.get())
.connect(detail::any_receiver<Ts...>{std::forward<R>(r)});
}
Expand Down
5 changes: 3 additions & 2 deletions libs/core/execution_base/src/any_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ namespace hpx::execution::experimental::detail {
return true;
}

void any_operation_state::start() & noexcept
void tag_dispatch(
hpx::execution::experimental::start_t, any_operation_state& os) noexcept
{
storage.get().start();
os.storage.get().start();
}

void throw_bad_any_call(char const* class_name, char const* function_name)
Expand Down
63 changes: 38 additions & 25 deletions libs/core/execution_base/tests/unit/any_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,22 @@ struct non_copyable_sender
std::decay_t<R> r;
std::tuple<std::decay_t<Ts>...> ts;

void start() & noexcept
friend void tag_dispatch(
hpx::execution::experimental::start_t, operation_state& os) noexcept
{
hpx::util::invoke_fused(
hpx::util::bind_front(
hpx::execution::experimental::set_value, std::move(r)),
std::move(ts));
hpx::execution::experimental::set_value, std::move(os.r)),
std::move(os.ts));
};
};

template <typename R>
operation_state<R> connect(R&& r) && noexcept
friend operation_state<R> tag_dispatch(
hpx::execution::experimental::connect_t, non_copyable_sender&& s,
R&& r) noexcept
{
return {std::forward<R>(r), std::move(ts)};
return {std::forward<R>(r), std::move(s.ts)};
}
};

Expand Down Expand Up @@ -123,25 +126,28 @@ struct sender
std::decay_t<R> r;
std::tuple<std::decay_t<Ts>...> ts;

void start() & noexcept
friend void tag_dispatch(
hpx::execution::experimental::start_t, operation_state& os) noexcept
{
hpx::util::invoke_fused(
hpx::util::bind_front(
hpx::execution::experimental::set_value, std::move(r)),
std::move(ts));
hpx::execution::experimental::set_value, std::move(os.r)),
std::move(os.ts));
};
};

template <typename R>
operation_state<R> connect(R&& r) && noexcept
friend operation_state<R> tag_dispatch(
hpx::execution::experimental::connect_t, sender&& s, R&& r)
{
return {std::forward<R>(r), std::move(ts)};
return {std::forward<R>(r), std::move(s.ts)};
}

template <typename R>
operation_state<R> connect(R&& r) & noexcept
friend operation_state<R> tag_dispatch(
hpx::execution::experimental::connect_t, sender& s, R&& r)
{
return {std::forward<R>(r), ts};
return {std::forward<R>(r), s.ts};
}
};

Expand Down Expand Up @@ -215,7 +221,8 @@ struct error_sender
struct operation_state
{
std::decay_t<R> r;
void start() noexcept
friend void tag_dispatch(
hpx::execution::experimental::start_t, operation_state& os) noexcept
{
try
{
Expand All @@ -224,13 +231,14 @@ struct error_sender
catch (...)
{
hpx::execution::experimental::set_error(
std::move(r), std::current_exception());
std::move(os.r), std::current_exception());
}
}
};

template <typename R>
operation_state<R> connect(R&& r)
friend operation_state<R> tag_dispatch(
hpx::execution::experimental::connect_t, error_sender, R&& r)
{
return {std::forward<R>(r)};
}
Expand All @@ -243,30 +251,33 @@ struct callback_receiver
std::atomic<bool>& set_value_called;

template <typename E>
void set_error(E&&) && noexcept
friend void tag_dispatch(hpx::execution::experimental::set_error_t,
callback_receiver&&, E&&) noexcept
{
HPX_TEST(false);
}

void set_done() && noexcept
friend void tag_dispatch(
hpx::execution::experimental::set_done_t, callback_receiver&&) noexcept
{
HPX_TEST(false);
};

template <typename... Ts>
auto set_value(Ts&&... ts) && noexcept
-> decltype(HPX_INVOKE(f, std::forward<Ts>(ts)...), void())
friend auto tag_dispatch(hpx::execution::experimental::set_value_t,
callback_receiver&& r, Ts&&... ts) noexcept
{
HPX_INVOKE(f, std::forward<Ts>(ts)...);
set_value_called = true;
HPX_INVOKE(std::move(r.f), std::forward<Ts>(ts)...);
r.set_value_called = true;
}
};

struct error_receiver
{
std::atomic<bool>& set_error_called;

void set_error(std::exception_ptr&& e) noexcept
friend void tag_dispatch(hpx::execution::experimental::set_error_t,
error_receiver&& r, std::exception_ptr&& e) noexcept
{
try
{
Expand All @@ -280,16 +291,18 @@ struct error_receiver
{
HPX_TEST(false);
}
set_error_called = true;
r.set_error_called = true;
}

void set_done() noexcept
friend void tag_dispatch(
hpx::execution::experimental::set_done_t, error_receiver&&) noexcept
{
HPX_TEST(false);
};

template <typename... Ts>
void set_value(Ts&&...) noexcept
friend void tag_dispatch(hpx::execution::experimental::set_value_t,
error_receiver&&, Ts&&...) noexcept
{
HPX_TEST(false);
}
Expand Down

0 comments on commit 92ab22e

Please sign in to comment.