diff --git a/benchmarks/static_map/insert_or_apply_bench.cu b/benchmarks/static_map/insert_or_apply_bench.cu
index a9ec10662..4633a8b0f 100644
--- a/benchmarks/static_map/insert_or_apply_bench.cu
+++ b/benchmarks/static_map/insert_or_apply_bench.cu
@@ -19,6 +19,7 @@
 #include
 #include
+#include
 #include
@@ -60,7 +61,7 @@ std::enable_if_t<(sizeof(Key) == sizeof(Value)), void> static_map_insert_or_appl
   state.exec(nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
     timer.start();
     map.insert_or_apply_async(
-      pairs.begin(), pairs.end(), cuco::op::reduce::sum, {launch.get_stream()});
+      pairs.begin(), pairs.end(), cuco::reduce::plus{}, {launch.get_stream()});
     timer.stop();
     map.clear_async({launch.get_stream()});
   });
diff --git a/include/cuco/detail/static_map/helpers.cuh b/include/cuco/detail/static_map/helpers.cuh
new file mode 100644
index 000000000..04d19f842
--- /dev/null
+++ b/include/cuco/detail/static_map/helpers.cuh
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include
+#include
+
+namespace cuco::static_map_ns::detail {
+
+/**
+ * @brief Dispatches to the shared memory map kernel if `num_elements_per_thread > 2`;
+ * otherwise falls back to the global memory map kernel.
+ *
+ * @tparam HasInit Boolean flag to dispatch based on the init parameter
+ * @tparam CGSize Number of threads in each CG
+ * @tparam Allocator Allocator type used to create the shared memory map
+ * @tparam InputIt Device accessible input iterator whose `value_type` is
+ * convertible to the `value_type` of the data structure
+ * @tparam Init Type of init value convertible to payload type
+ * @tparam Op Callable type used to perform the `apply` operation.
+ * @tparam Ref Type of non-owning device ref allowing access to storage
+ *
+ * @param first Beginning of the sequence of input elements
+ * @param last End of the sequence of input elements
+ * @param init The init value of the `op`
+ * @param op Callable object to perform the apply operation.
+ * @param ref Non-owning container device ref used to access the slot storage
+ * @param stream CUDA stream used for the insert_or_apply operation
+ */
+template
+void dispatch_insert_or_apply(
+  InputIt first, InputIt last, Init init, Op op, Ref ref, cuda::stream_ref stream)
+{
+  auto const num = cuco::detail::distance(first, last);
+  if (num == 0) { return; }
+
+  int32_t const default_grid_size = cuco::detail::grid_size(num, CGSize);
+
+  if constexpr (CGSize == 1) {
+    using shmem_size_type = int32_t;
+
+    int32_t constexpr shmem_block_size                = 1024;
+    shmem_size_type constexpr cardinality_threshold   = shmem_block_size;
+    shmem_size_type constexpr shared_map_num_elements = cardinality_threshold + shmem_block_size;
+    float constexpr load_factor                       = 0.7;
+    shmem_size_type constexpr shared_map_size =
+      static_cast<shmem_size_type>((1.0 / load_factor) * shared_map_num_elements);
+
+    using extent_type     = cuco::extent;
+    using shared_map_type = cuco::static_map>;
+
+    using shared_map_ref_type    = typename shared_map_type::ref_type<>;
+    auto constexpr window_extent = cuco::make_window_extent(extent_type{});
+
+    auto insert_or_apply_shmem_fn_ptr = insert_or_apply_shmem;
+
+    int32_t const max_op_grid_size =
+      cuco::detail::max_occupancy_grid_size(shmem_block_size, insert_or_apply_shmem_fn_ptr);
+
+    int32_t const shmem_default_grid_size =
+      cuco::detail::grid_size(num, CGSize, cuco::detail::default_stride(), shmem_block_size);
+
+    auto const shmem_grid_size         = std::min(shmem_default_grid_size, max_op_grid_size);
+    auto const num_elements_per_thread = num / (shmem_grid_size * shmem_block_size);
+
+    // use shared memory only if each thread has at least 3 elements to process
+    if (num_elements_per_thread > 2) {
+      insert_or_apply_shmem
+        <<>>(
+          first, num, init, op, ref, window_extent);
+    } else {
+      insert_or_apply
+        <<>>(
+          first, num, init, op, ref);
+    }
+  } else {
+    insert_or_apply
+      <<>>(
+        first, num, init, op, ref);
+  }
+}
+}  // namespace cuco::static_map_ns::detail
\ No newline at end of file
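Reviewer note: to make the dispatch path above concrete, here is a minimal host-side sketch (not part of this diff) of how the new init overload is expected to be reached. The container construction mirrors cuco's existing host-bulk example; the exact CTAD/extent spelling is an assumption, not something this PR adds.

```cpp
#include <cuco/static_map.cuh>
#include <cuco/utility/reduction_functors.cuh>

#include <thrust/device_vector.h>

#include <cstddef>
#include <vector>

int main()
{
  using Key   = int;
  using Value = int;

  // duplicate key 1 so that reduce::plus has something to aggregate
  std::vector<cuco::pair<Key, Value>> h_pairs{{1, 10}, {2, 20}, {1, 5}};
  thrust::device_vector<cuco::pair<Key, Value>> pairs(h_pairs.begin(), h_pairs.end());

  auto map = cuco::static_map{cuco::extent<std::size_t>{10},
                              cuco::empty_key<Key>{-1},
                              cuco::empty_value<Value>{0}};

  // init (0) equals the empty value sentinel, so dispatch can take the direct-apply path
  map.insert_or_apply(pairs.begin(), pairs.end(), Value{0}, cuco::reduce::plus{});
  // map now holds {1: 15, 2: 20}
  return 0;
}
```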
diff --git a/include/cuco/detail/static_map/kernels.cuh b/include/cuco/detail/static_map/kernels.cuh
index cbf1cef06..c05d0b28b 100644
--- a/include/cuco/detail/static_map/kernels.cuh
+++ b/include/cuco/detail/static_map/kernels.cuh
@@ -77,47 +77,101 @@ CUCO_KERNEL __launch_bounds__(BlockSize) void insert_or_assign(InputIt first,
  *
  * @note Callable object to perform binary operation should be able to invoke as
  * Op(cuda::atomic_ref, T)
+ * @note If `HasInit` is `true` and `init == empty_sentinel_value`, we directly `apply` the
+ * `op` instead of performing an atomic store and then waiting for the payload to
+ * materialize. This can yield speedups when the insert strategy is not `packed_cas`.
  *
+ * @tparam HasInit Boolean flag to dispatch based on the init parameter
  * @tparam CGSize Number of threads in each CG
  * @tparam BlockSize Number of threads in each block
  * @tparam InputIt Device accessible input iterator whose `value_type` is
  * convertible to the `value_type` of the data structure
+ * @tparam Init Type of init value convertible to payload type
  * @tparam Op Callable type used to perform the `apply` operation.
  * @tparam Ref Type of non-owning device ref allowing access to storage
  *
  * @param first Beginning of the sequence of input elements
  * @param n Number of input elements
+ * @param init The init value of the op
  * @param op Callable object to perform the apply operation.
  * @param ref Non-owning container device ref used to access the slot storage
  */
-template
-__global__ void insert_or_apply(InputIt first, cuco::detail::index_type n, Op op, Ref ref)
+template
+__global__ void insert_or_apply(
+  InputIt first, cuco::detail::index_type n, [[maybe_unused]] Init init, Op op, Ref ref)
 {
   auto const loop_stride = cuco::detail::grid_stride() / CGSize;
   auto idx               = cuco::detail::global_thread_id() / CGSize;

   while (idx < n) {
-    typename std::iterator_traits::value_type const& insert_pair = *(first + idx);
+    using value_type              = typename std::iterator_traits::value_type;
+    value_type const& insert_pair = *(first + idx);
     if constexpr (CGSize == 1) {
-      ref.insert_or_apply(insert_pair, op);
+      if constexpr (HasInit) {
+        ref.insert_or_apply(insert_pair, init, op);
+      } else {
+        ref.insert_or_apply(insert_pair, op);
+      }
     } else {
       auto const tile =
         cooperative_groups::tiled_partition(cooperative_groups::this_thread_block());
-      ref.insert_or_apply(tile, insert_pair, op);
+      if constexpr (HasInit) {
+        ref.insert_or_apply(tile, insert_pair, init, op);
+      } else {
+        ref.insert_or_apply(tile, insert_pair, op);
+      }
     }
     idx += loop_stride;
   }
 }

-template
+/**
+ * @brief Inserts key-value pairs into the map using a block-local shared memory map as an
+ * intermediate cache, falling back to the global memory map once a block's cardinality grows
+ * too large.
+ *
+ * @note Callable object to perform binary operation should be able to invoke as
+ * Op(cuda::atomic_ref, T)
+ * @note If `HasInit` is `true` and `init == empty_sentinel_value`, we directly `apply` the
+ * `op` instead of performing an atomic store and then waiting for the payload to
+ * materialize. This can yield speedups when the insert strategy is not `packed_cas`.
+ *
+ * @tparam HasInit Boolean flag to dispatch based on the init parameter
+ * @tparam CGSize Number of threads in each CG
+ * @tparam BlockSize Number of threads in each block
+ * @tparam SharedMapRefType Type of the shared memory map ref
+ * @tparam InputIt Device accessible input iterator whose `value_type` is
+ * convertible to the `value_type` of the data structure
+ * @tparam Init Type of init value convertible to payload type
+ * @tparam Op Callable type used to perform the `apply` operation.
+ * @tparam Ref Type of non-owning device ref allowing access to storage
+ *
+ * @param first Beginning of the sequence of input elements
+ * @param n Number of input elements
+ * @param init The init value of the op
+ * @param op Callable object to perform the apply operation.
+ * @param ref Non-owning container device ref used to access the slot storage
+ * @param window_extent Window extent used for the shared memory map slot storage
+ */
+template
 CUCO_KERNEL __launch_bounds__(BlockSize) void insert_or_apply_shmem(
   InputIt first,
   cuco::detail::index_type n,
+  [[maybe_unused]] Init init,
   Op op,
   Ref ref,
   typename SharedMapRefType::extent_type window_extent)
@@ -157,18 +211,22 @@ CUCO_KERNEL __launch_bounds__(BlockSize) void insert_or_apply_shmem(
   block.sync();

   while ((idx - thread_idx / CGSize) < n) {
-    int32_t inserted          = 0;
-    int32_t local_cardinality = 0;
+    int32_t inserted         = 0;
+    int32_t warp_cardinality = 0;
     // insert-or-apply into the shared map first
     if (idx < n) {
       value_type const& insert_pair = *(first + idx);
-      inserted                      = shared_map_ref.insert_or_apply(insert_pair, op);
+      if constexpr (HasInit) {
+        inserted = shared_map_ref.insert_or_apply(insert_pair, init, op);
+      } else {
+        inserted = shared_map_ref.insert_or_apply(insert_pair, op);
+      }
     }
     if (idx - warp_thread_idx < n) {  // all threads in warp participate
-      local_cardinality = cg::reduce(warp, inserted, cg::plus<int32_t>());
+      warp_cardinality = cg::reduce(warp, inserted, cg::plus<int32_t>());
     }
     if (warp_thread_idx == 0) {
-      block_cardinality.fetch_add(local_cardinality, cuda::memory_order_relaxed);
+      block_cardinality.fetch_add(warp_cardinality, cuda::memory_order_relaxed);
     }
     block.sync();
     if (block_cardinality > BlockSize) { break; }
@@ -180,7 +238,11 @@ CUCO_KERNEL __launch_bounds__(BlockSize) void insert_or_apply_shmem(
     while (window_idx < num_windows) {
       auto const slot = storage[window_idx][0];
       if (not cuco::detail::bitwise_compare(slot.first, ref.empty_key_sentinel())) {
-        ref.insert_or_apply(slot, op);
+        if constexpr (HasInit) {
+          ref.insert_or_apply(slot, init, op);
+        } else {
+          ref.insert_or_apply(slot, op);
+        }
       }
       window_idx += BlockSize;
     }
@@ -191,10 +253,13 @@ CUCO_KERNEL __launch_bounds__(BlockSize) void insert_or_apply_shmem(
     idx += loop_stride;
     while (idx < n) {
       value_type const& insert_pair = *(first + idx);
-      ref.insert_or_apply(insert_pair, op);
+      if constexpr (HasInit) {
+        ref.insert_or_apply(insert_pair, init, op);
+      } else {
+        ref.insert_or_apply(insert_pair, op);
+      }
       idx += loop_stride;
     }
   }
 }
-
 }  // namespace cuco::static_map_ns::detail
\ No newline at end of file
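Reviewer note: the cardinality bookkeeping inside `insert_or_apply_shmem` is easy to lose in the full kernel, so the following standalone sketch isolates the pattern: a warp-level reduction of per-thread "inserted" flags feeding a block-scoped atomic counter. All names and the threshold comment are local to this illustration, not library code.

```cpp
#include <cooperative_groups.h>
#include <cooperative_groups/reduce.h>

#include <cuda/atomic>

namespace cg = cooperative_groups;

// Counts how many threads of each block set their "inserted" flag, the same way the
// shared-memory kernel detects that a block has seen too many distinct keys.
__global__ void block_cardinality_sketch(int const* inserted_flags, int n, int* block_counts)
{
  __shared__ cuda::atomic<int, cuda::thread_scope_block> block_cardinality;
  auto const block = cg::this_thread_block();
  if (block.thread_rank() == 0) { block_cardinality = 0; }
  block.sync();

  auto const warp = cg::tiled_partition<32>(block);
  int const tid   = static_cast<int>(blockIdx.x * blockDim.x + threadIdx.x);
  int const flag  = (tid < n) ? inserted_flags[tid] : 0;

  // every thread in the warp participates in the reduction
  int const warp_cardinality = cg::reduce(warp, flag, cg::plus<int>());

  // one lane per warp publishes the warp's total
  if (warp.thread_rank() == 0) {
    block_cardinality.fetch_add(warp_cardinality, cuda::memory_order_relaxed);
  }
  block.sync();

  // the real kernel breaks out of its shared-memory loop once this exceeds BlockSize
  if (block.thread_rank() == 0) { block_counts[blockIdx.x] = block_cardinality.load(); }
}
```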
diff --git a/include/cuco/detail/static_map/static_map.inl b/include/cuco/detail/static_map/static_map.inl
index 8f7b58ab1..86b75507d 100644
--- a/include/cuco/detail/static_map/static_map.inl
+++ b/include/cuco/detail/static_map/static_map.inl
@@ -15,6 +15,7 @@
  */

 #include
+#include
 #include
 #include
 #include
@@ -304,6 +305,22 @@ void static_map
+template
+void static_map::
+  insert_or_apply(InputIt first, InputIt last, Init init, Op op, cuda::stream_ref stream)
+{
+  this->insert_or_apply_async(first, last, init, op, stream);
+  stream.wait();
+}
+
 template
 void static_map::
   insert_or_apply_async(InputIt first, InputIt last, Op op, cuda::stream_ref stream) noexcept
 {
-  auto const num = cuco::detail::distance(first, last);
-  if (num == 0) { return; }
+  auto constexpr has_init = false;
+  auto const init = this->empty_value_sentinel();  // use empty_sentinel as unused init value
+  static_map_ns::detail::dispatch_insert_or_apply(
+    first, last, init, op, ref(op::insert_or_apply), stream);
+}

-  using shmem_size_type = int32_t;
-
-  int32_t const default_grid_size = cuco::detail::grid_size(num, cg_size);
-
-  if constexpr (cg_size == 1) {
-    int32_t constexpr shmem_block_size                = 1024;
-    shmem_size_type constexpr cardinality_threshold   = shmem_block_size;
-    shmem_size_type constexpr shared_map_num_elements = cardinality_threshold + shmem_block_size;
-    float constexpr load_factor                       = 0.7;
-    shmem_size_type constexpr shared_map_size =
-      static_cast<shmem_size_type>((1.0 / load_factor) * shared_map_num_elements);
-
-    using extent_type     = cuco::extent;
-    using shared_map_type = cuco::static_map>;
-    using shared_map_ref_type = typename shared_map_type::ref_type<>;
-    auto constexpr window_extent = cuco::make_window_extent(extent_type{});
-
-    using ref_type = decltype(ref(op::insert_or_apply));
-
-    auto insert_or_apply_shmem_fn_ptr = static_map_ns::detail::
-      insert_or_apply_shmem;
-
-    int32_t const max_op_grid_size =
-      cuco::detail::max_occupancy_grid_size(shmem_block_size, insert_or_apply_shmem_fn_ptr);
-
-    int32_t const shmem_default_grid_size =
-      cuco::detail::grid_size(num, cg_size, cuco::detail::default_stride(), shmem_block_size);
-
-    auto const shmem_grid_size         = std::min(shmem_default_grid_size, max_op_grid_size);
-    auto const num_elements_per_thread = num / (shmem_grid_size * shmem_block_size);
-
-    // use shared memory only if each thread has at least 3 elements to process
-    if (num_elements_per_thread > 2) {
-      static_map_ns::detail::insert_or_apply_shmem
-        <<>>(
-          first, num, op, ref(op::insert_or_apply), window_extent);
-    } else {
-      static_map_ns::detail::insert_or_apply
-        <<>>(
-          first, num, op, ref(op::insert_or_apply));
-    }
-  } else {
-    static_map_ns::detail::insert_or_apply
-      <<>>(
-        first, num, op, ref(op::insert_or_apply));
-  }
+template
+template
+void static_map::
+  insert_or_apply_async(
+    InputIt first, InputIt last, Init init, Op op, cuda::stream_ref stream) noexcept
+{
+  using shared_map_type = cuco::static_map>;
+  auto constexpr has_init = true;
+  static_map_ns::detail::dispatch_insert_or_apply(
+    first, last, init, op, ref(op::insert_or_apply), stream);
 }

 template
diff --git a/include/cuco/detail/static_map/static_map_ref.inl b/include/cuco/detail/static_map/static_map_ref.inl
--- a/include/cuco/detail/static_map/static_map_ref.inl
+++ b/include/cuco/detail/static_map/static_map_ref.inl
@@
 #include
 #include
 #include
 #include
 #include
+#include
 #include
 #include
@@ -615,6 +617,190 @@ class operator_impl<
       cuda::std::is_invocable_v, T>,
       "insert_or_apply expects `Op` to be a callable as `Op(cuda::atomic_ref, T)`");

+    auto& ref_ = static_cast<ref_type&>(*this);
+
+    // directly dispatch to implementation if no init element is given
+    auto constexpr use_direct_apply = false;
+    return ref_.insert_or_apply_impl<use_direct_apply>(value, op);
+  }
+
+  /**
+   * @brief Inserts a key-value pair `{k, v}` if it's not present in the map. Otherwise, applies
+   * `Op` binary function to the mapped_type corresponding to the key `k` and the value `v`.
+   *
+   * @tparam Value Input type which is implicitly convertible to 'value_type'
+   * @tparam Init Type of init value convertible to payload type
+   * @tparam Op Callable type which is used as apply operation and can be
+   * called with arguments as Op(cuda::atomic_ref, T). Op strictly must
+   * have this signature to atomically apply the operation.
+   *
+   * @param value The element to insert
+   * @param init The init value of the op
+   * @param op The callable object to perform binary operation between existing value at the slot
+   * and the element to insert.
+   *
+   * @return Returns `true` if the given `value` is inserted successfully.
+ */ + template >> + __device__ bool insert_or_apply(Value const& value, Init init, Op op) + { + static_assert(cg_size == 1, "Non-CG operation is incompatible with the current probing scheme"); + + static_assert( + cuda::std::is_invocable_v, T>, + "insert_or_apply expects `Op` to be a callable as `Op(cuda::atomic_ref, T)`"); + + auto& ref_ = static_cast(*this); + return ref_.dispatch_insert_or_apply(value, init, op); + } + + /** + * @brief Inserts a key-value pair `{k, v}` if it's not present in the map. Otherwise, applies + * `Op` binary function to the mapped_type corresponding to the key `k` and the value `v`. + * + * @tparam Value Input type which is implicitly convertible to 'value_type' + * @tparam Op Callable type which is used as apply operation and can be + * called with arguments as Op(cuda::atomic_ref, T). Op strictly must + * have this signature to atomically apply the operation. + * + * @param group The Cooperative Group used to perform group insert + * @param value The element to insert + * @param op The callable object to perform binary operation between existing value at the slot + * and the element to insert. + * + * @return Returns `true` if the given `value` is inserted successfully. + */ + + template + __device__ bool insert_or_apply(cooperative_groups::thread_block_tile const& group, + Value const& value, + Op op) + { + static_assert( + cuda::std::is_invocable_v, T>, + "insert_or_apply expects `Op` to be a callable as `Op(cuda::atomic_ref, T)`"); + auto& ref_ = static_cast(*this); + + auto constexpr use_direct_apply = false; + return ref_.insert_or_apply_impl(group, value, op); + } + + /** + * @brief Inserts a key-value pair `{k, v}` if it's not present in the map. Otherwise, applies + * `Op` binary function to the mapped_type corresponding to the key `k` and the value `v`. + * + * @tparam Value Input type which is implicitly convertible to 'value_type' + * @tparam Init Type of init value convertible to payload type + * @tparam Op Callable type which is used as apply operation and can be + * called with arguments as Op(cuda::atomic_ref, T). Op strictly must + * have this signature to atomically apply the operation. + * + * @param group The Cooperative Group used to perform group insert + * @param value The element to insert + * @param init The init value of the op + * @param op The callable object to perform binary operation between existing value at the slot + * and the element to insert. + * + * @return Returns `true` if the given `value` is inserted successfully. + */ + template + __device__ bool insert_or_apply(cooperative_groups::thread_block_tile const& group, + Value const& value, + Init init, + Op op) + { + auto& ref_ = static_cast(*this); + static_assert( + cuda::std::is_invocable_v, T>, + "insert_or_apply expects `Op` to be a callable as `Op(cuda::atomic_ref, T)`"); + + return ref_.dispatch_insert_or_apply(group, value, init, op); + } + + private: + /** + * @brief dispatches `insert_or_apply_impl` based on condition `init == empty_value_sentinel`. + * + * @tparam Value Input type which is implicitly convertible to 'value_type' + * @tparam Init Type of init value convertible to payload type + * @tparam Op Callable type which is used as apply operation and can be + * called with arguments as Op(cuda::atomic_ref, T). Op strictly must + * have this signature to atomically apply the operation. 
+   *
+   * @param value The element to insert
+   * @param init The init value of the op
+   * @param op The callable object to perform binary operation between existing value at the slot
+   * and the element to insert.
+   *
+   * @return Returns `true` if the given `value` is inserted successfully.
+   */
+  template
+  __device__ bool dispatch_insert_or_apply(Value const& value, Init init, Op op)
+  {
+    ref_type& ref_ = static_cast<ref_type&>(*this);
+    // if init equals the sentinel value, we can directly `apply` the op instead of writing first
+    if (cuco::detail::bitwise_compare(init, ref_.empty_value_sentinel())) {
+      return ref_.insert_or_apply_impl<true>(value, op);
+    } else {
+      return ref_.insert_or_apply_impl<false>(value, op);
+    }
+  }
+
+  /**
+   * @brief Dispatches `insert_or_apply_impl` based on the condition `init == empty_value_sentinel`.
+   *
+   * @tparam Value Input type which is implicitly convertible to 'value_type'
+   * @tparam Init Type of init value convertible to payload type
+   * @tparam Op Callable type which is used as apply operation and can be
+   * called with arguments as Op(cuda::atomic_ref, T). Op strictly must
+   * have this signature to atomically apply the operation.
+   *
+   * @param group The Cooperative Group used to perform group insert
+   * @param value The element to insert
+   * @param init The init value of the op
+   * @param op The callable object to perform binary operation between existing value at the slot
+   * and the element to insert.
+   *
+   * @return Returns `true` if the given `value` is inserted successfully.
+   */
+  template
+  __device__ bool dispatch_insert_or_apply(
+    cooperative_groups::thread_block_tile<cg_size> const& group,
+    Value const& value,
+    Init init,
+    Op op)
+  {
+    ref_type& ref_ = static_cast<ref_type&>(*this);
+    // if init equals the sentinel value, we can directly `apply` the op instead of writing first
+    if (cuco::detail::bitwise_compare(init, ref_.empty_value_sentinel())) {
+      return ref_.insert_or_apply_impl<true>(group, value, op);
+    } else {
+      return ref_.insert_or_apply_impl<false>(group, value, op);
+    }
+  }
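Reviewer note: the sentinel comparison above is the crux of the optimization, so a tiny host-side model (plain ints, not library code) may help: when `init` equals the empty value sentinel, applying `op` to a sentinel-initialized payload is observably identical to storing the incoming value, which is why the store-then-wait step can be skipped.

```cpp
#include <cassert>

int main()
{
  int const sentinel = 0;   // map's empty_value sentinel
  int const init     = 0;   // caller-provided identity of the op (plus)
  int const v        = 42;  // incoming payload

  // fast path: directly apply op to the sentinel-initialized payload
  int direct = sentinel;
  direct += v;  // op == plus

  // regular path: store v on first insert
  int stored = v;

  assert(init == sentinel);
  assert(direct == stored);  // identical observable result

  // if the sentinel were -1 while init stayed 0, `direct` would be v - 1:
  // the fast path is only sound when the caller guarantees init == sentinel.
  return 0;
}
```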
+ */ + template + __device__ bool insert_or_apply_impl(Value const& value, Op op) + { ref_type& ref_ = static_cast(*this); auto const val = ref_.impl_.heterogeneous_value(value); @@ -622,7 +808,10 @@ class operator_impl< auto& probing_scheme = ref_.impl_.probing_scheme(); auto storage_ref = ref_.impl_.storage_ref(); auto probing_iter = probing_scheme(key, storage_ref.window_extent()); - auto const empty_value = ref_.impl_.empty_slot_sentinel().second; + auto const empty_value = ref_.empty_value_sentinel(); + + // wait for payload only when init != sentinel and insert strategy is not `packed_cas` + auto constexpr wait_for_payload = (not UseDirectApply) and (sizeof(value_type) > 8); while (true) { auto const window_slots = storage_ref[*probing_iter]; @@ -635,17 +824,19 @@ class operator_impl< // If the key is already in the container, update the payload and return if (eq_res == detail::equal_result::EQUAL) { - if constexpr (sizeof(value_type) > 8) { + // wait for payload only when performing insert operation + if constexpr (wait_for_payload) { ref_.impl_.wait_for_payload(slot_ptr->second, empty_value); } op(cuda::atomic_ref{slot_ptr->second}, val.second); return false; } if (eq_res == detail::equal_result::AVAILABLE) { - switch (ref_.impl_.attempt_insert_stable(slot_ptr, slot_content, val)) { + switch (ref_.attempt_insert_or_apply(slot_ptr, slot_content, val, op)) { case insert_result::SUCCESS: return true; case insert_result::DUPLICATE: { - if constexpr (sizeof(value_type) > 8) { + // wait for payload only when performing insert operation + if constexpr (wait_for_payload) { ref_.impl_.wait_for_payload(slot_ptr->second, empty_value); } op(cuda::atomic_ref{slot_ptr->second}, val.second); @@ -659,42 +850,31 @@ class operator_impl< } } - template - __device__ bool insert_or_apply(Value const& value, cuco::op::reduce::sum_tag) - { - auto& ref_ = static_cast(*this); - return ref_.insert_or_apply(value, - [](cuda::atomic_ref payload_ref, T const& payload) { - payload_ref.fetch_add(payload, cuda::memory_order_relaxed); - }); - } - /** * @brief Inserts a key-value pair `{k, v}` if it's not present in the map. Otherwise, applies * `Op` binary function to the mapped_type corresponding to the key `k` and the value `v`. * + * @tparam UseDirectApply Boolean condition which enables direct apply `op` instead of + * wait_for_payload with an atomic_store on an empty slot. * @tparam Value Input type which is implicitly convertible to 'value_type' + * @tparam Init Type of init value convertible to payload type * @tparam Op Callable type which is used as apply operation and can be * called with arguments as Op(cuda::atomic_ref, T). Op strictly must * have this signature to atomically apply the operation. * * @param group The Cooperative Group used to perform group insert * @param value The element to insert + * @param init The init value of the op * @param op The callable object to perform binary operation between existing value at the slot * and the element to insert. * * @return Returns `true` if the given `value` is inserted successfully. 
   */
-
-  template
-  __device__ bool insert_or_apply(cooperative_groups::thread_block_tile<cg_size> const& group,
-                                  Value const& value,
-                                  Op op)
+  template
+  __device__ bool insert_or_apply_impl(cooperative_groups::thread_block_tile<cg_size> const& group,
+                                       Value const& value,
+                                       Op op)
   {
-    static_assert(
-      cuda::std::is_invocable_v, T>,
-      "insert_or_apply expects `Op` to be a callable as `Op(cuda::atomic_ref, T)`");
-
     ref_type& ref_ = static_cast<ref_type&>(*this);

     auto const val = ref_.impl_.heterogeneous_value(value);
@@ -702,7 +882,10 @@ class operator_impl<
     auto& probing_scheme = ref_.impl_.probing_scheme();
     auto storage_ref     = ref_.impl_.storage_ref();
     auto probing_iter    = probing_scheme(group, key, storage_ref.window_extent());
-    auto const empty_value = ref_.impl_.empty_slot_sentinel().second;
+    auto const empty_value = ref_.empty_value_sentinel();
+
+    // wait for payload only when init != sentinel and the insert strategy is not `packed_cas`
+    auto constexpr wait_for_payload = (not UseDirectApply) and (sizeof(value_type) > 8);

     while (true) {
       auto const window_slots = storage_ref[*probing_iter];
@@ -725,7 +908,7 @@ class operator_impl<
       if (group_contains_equal) {
         auto const src_lane = __ffs(group_contains_equal) - 1;
         if (group.thread_rank() == src_lane) {
-          if constexpr (sizeof(value_type) > 8) {
+          if constexpr (wait_for_payload) {
             ref_.impl_.wait_for_payload(slot_ptr->second, empty_value);
           }
           op(cuda::atomic_ref{slot_ptr->second}, val.second);
@@ -738,14 +921,15 @@ class operator_impl<
         auto const src_lane = __ffs(group_contains_available) - 1;
         auto const status   = [&, target_idx = intra_window_index]() {
           if (group.thread_rank() != src_lane) { return insert_result::CONTINUE; }
-          return ref_.impl_.attempt_insert_stable(slot_ptr, window_slots[target_idx], val);
+          return ref_.attempt_insert_or_apply<UseDirectApply>(
+            slot_ptr, window_slots[target_idx], val, op);
         }();

         switch (group.shfl(status, src_lane)) {
           case insert_result::SUCCESS: return true;
           case insert_result::DUPLICATE: {
             if (group.thread_rank() == src_lane) {
-              if constexpr (sizeof(value_type) > 8) {
+              if constexpr (wait_for_payload) {
                 ref_.impl_.wait_for_payload(slot_ptr->second, empty_value);
               }
               op(cuda::atomic_ref{slot_ptr->second}, val.second);
@@ -760,16 +944,63 @@ class operator_impl<
     }
   }

-  template
-  __device__ bool insert_or_apply(cooperative_groups::thread_block_tile<cg_size> const& group,
-                                  Value const& value,
-                                  cuco::op::reduce::sum_tag)
+  /**
+   * @brief Attempts to insert a key-value pair `{k, v}` if it's not present in the map.
+   * Otherwise, applies `Op` binary function to the mapped_type corresponding to the key `k` and
+   * the value `v`.
+   *
+   * @tparam UseDirectApply Boolean flag which enables directly applying `op` instead of
+   * waiting for the payload after an atomic store on an empty slot when the insert strategy is
+   * not `packed_cas`.
+   * @tparam Value Input type which is implicitly convertible to 'value_type'
+   * @tparam Op Callable type which is used as apply operation and can be
+   * called with arguments as Op(cuda::atomic_ref, T). Op strictly must
+   * have this signature to atomically apply the operation.
+   *
+   * @param address Pointer to the slot in memory
+   * @param expected Element to compare against
+   * @param desired Element to insert
+   * @param op The callable object to perform binary operation between existing value at the slot
+   * and the element to insert.
+   *
+   * @return An `insert_result` indicating the outcome of the attempt.
+ */ + template + [[nodiscard]] __device__ insert_result attempt_insert_or_apply(value_type* address, + value_type const& expected, + Value const& desired, + Op op) noexcept { - auto& ref_ = static_cast(*this); - return ref_.insert_or_apply( - group, value, [](cuda::atomic_ref payload_ref, T const& payload) { - payload_ref.fetch_add(payload, cuda::memory_order_relaxed); - }); + ref_type& ref_ = static_cast(*this); + + if constexpr (sizeof(value_type) <= 8) { + return ref_.impl_.packed_cas(address, expected, desired); // no need to wait for payload + } else { + using mapped_type = T; + + cuda::atomic_ref key_ref(address->first); + auto expected_key = expected.first; + auto const success = key_ref.compare_exchange_strong( + expected_key, static_cast(desired.first), cuda::memory_order_relaxed); + + // if key success + if (success) { + cuda::atomic_ref payload_ref(address->second); + // if init values == sentinel then directly apply the `op` + if constexpr (UseDirectApply) { + op(payload_ref, desired.second); + } else { + payload_ref.store(desired.second, cuda::memory_order_relaxed); + } + return insert_result::SUCCESS; + } + + // Our key was already present in the slot, so our key is a duplicate + // Shouldn't use `predicate` operator directly since it includes a redundant bitwise compare + if (ref_.impl_.predicate_.equal_to(desired.first, expected_key) == + detail::equal_result::EQUAL) { + return insert_result::DUPLICATE; + } + + return insert_result::CONTINUE; + } } }; diff --git a/include/cuco/operator.hpp b/include/cuco/operator.hpp index cb4cfd4de..303124ec4 100644 --- a/include/cuco/operator.hpp +++ b/include/cuco/operator.hpp @@ -74,16 +74,6 @@ struct find_tag { struct for_each_tag { } inline constexpr for_each; ///< `cuco::for_each` operator -namespace reduce { - -/** - * @brief `sum` reduction operator tag - */ -struct sum_tag { -} inline constexpr sum; - -} // namespace reduce - } // namespace op } // namespace cuco diff --git a/include/cuco/static_map.cuh b/include/cuco/static_map.cuh index 2559f6427..9c87e45a9 100644 --- a/include/cuco/static_map.cuh +++ b/include/cuco/static_map.cuh @@ -488,6 +488,34 @@ class static_map { template void insert_or_apply(InputIt first, InputIt last, Op op, cuda::stream_ref stream = {}); + /** + * @brief For any key-value pair `{k, v}` in the range `[first, first + n)`, if a key equivalent + * to `k` already exists in the container, then binary operation is applied using `op` callable + * object on the existing value at slot and the element to insert. If the key does not exist, + * inserts the pair as if by insert. + * + * @note This function synchronizes the given stream. For asynchronous execution use + * `insert_or_apply_async`. + * @note Callable object to perform binary operation should be able to invoke as + * Op(cuda::atomic_ref, T>) + * @note There could be performance improvements if `init` value passed here equals to the + * `sentinel value` of the map. + * + * @tparam InputIt Device accessible random access input iterator where + * std::is_convertible::value_type, + * static_map::value_type> is `true` + * @tparam Init Type of init value convertible to payload type + * @tparam Op Callable type used to peform `apply` operation. + * + * @param first Beginning of the sequence of keys + * @param last End of the sequence of keys + * @param init The identity value of the op + * @param op Callable object to perform apply operation. 
+   * @param stream CUDA stream used for insert
+   */
+  template
+  void insert_or_apply(InputIt first, InputIt last, Init init, Op op, cuda::stream_ref stream = {});
+
   /**
    * @brief For any key-value pair `{k, v}` in the range `[first, first + n)`, if a key equivalent
    * to `k` already exists in the container, then binary operation is applied using `op` callable
    * object on the existing value at slot and the element to insert. If the key does not exist,
    * inserts the pair as if by insert.
@@ -513,6 +541,33 @@ class static_map {
     Op op,
     cuda::stream_ref stream = {}) noexcept;

+  /**
+   * @brief For any key-value pair `{k, v}` in the range `[first, first + n)`, if a key equivalent
+   * to `k` already exists in the container, then binary operation is applied using `op` callable
+   * object on the existing value at slot and the element to insert. If the key does not exist,
+   * inserts the pair as if by insert.
+   *
+   * @note Callable object to perform binary operation should be able to invoke as
+   * Op(cuda::atomic_ref, T)
+   * @note Performance may improve if the `init` value passed here equals the empty value
+   * sentinel of the map.
+   *
+   * @tparam InputIt Device accessible random access input iterator where
+   * std::is_convertible::value_type,
+   * static_map::value_type> is `true`
+   * @tparam Init Type of init value convertible to payload type
+   * @tparam Op Callable type used to perform the `apply` operation.
+   *
+   * @param first Beginning of the sequence of keys
+   * @param last End of the sequence of keys
+   * @param init The identity value of the op
+   * @param op Callable object to perform the apply operation.
+   * @param stream CUDA stream used for insert
+   */
+  template
+  void insert_or_apply_async(
+    InputIt first, InputIt last, Init init, Op op, cuda::stream_ref stream = {}) noexcept;
+
   /**
    * @brief Erases keys in the range `[first, last)`.
    *
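Reviewer note: a hedged usage sketch for the new host overloads declared above, pairing `insert_or_apply_async` with `cuco::reduce::max`. The map construction mirrors cuco's examples and the helper's name is made up for illustration; here `init` is the identity of max and differs from the empty value sentinel, so the store-then-wait insert path is taken.

```cpp
#include <cuco/static_map.cuh>
#include <cuco/utility/reduction_functors.cuh>

#include <thrust/device_vector.h>

#include <cuda/stream_ref>

#include <cstddef>
#include <cstdint>
#include <limits>

// Keeps the maximum value seen per key.
void max_per_key(thrust::device_vector<cuco::pair<std::int32_t, std::int32_t>> const& pairs,
                 cuda::stream_ref stream)
{
  auto map = cuco::static_map{cuco::extent<std::size_t>{2 * pairs.size()},
                              cuco::empty_key<std::int32_t>{-1},
                              cuco::empty_value<std::int32_t>{-1}};

  auto const init = std::numeric_limits<std::int32_t>::lowest();  // identity of max
  map.insert_or_apply_async(pairs.begin(), pairs.end(), init, cuco::reduce::max{}, stream);
  stream.wait();  // the async overload does not synchronize on its own
}
```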
+ */ + template + __device__ void operator()(cuda::atomic_ref payload_ref, const T& val) + { + payload_ref.fetch_add(val, cuda::memory_order_relaxed); + } +}; + +/** + * @brief Device functor performing max reduction, used with `insert-or-apply` + */ +struct max { + /** + * @brief Performs atomic fetch_max on payload and the new value to be inserted + * + * @tparam T The payload type + * @tparam Scope The cuda::thread_scope used for atomic_ref + * + * @param payload_ref The atomic_ref pointing to payload part of the slot + * @param val The new value to be applied as reduction to the current value + * in the payload. + */ + template + __device__ void operator()(cuda::atomic_ref payload_ref, const T& val) + { + payload_ref.fetch_max(val, cuda::memory_order_relaxed); + } +}; + +/** + * @brief Device functor performing min reduction, used with `insert-or-apply` + */ +struct min { + /** + * @brief Performs atomic fetch_min on payload and the new value to be inserted + * + * @tparam T The payload type + * @tparam Scope The cuda::thread_scope used for atomic_ref + * + * @param payload_ref The atomic_ref pointing to payload part of the slot + * @param val The new value to be applied as reduction to the current value + * in the payload. + */ + template + __device__ void operator()(cuda::atomic_ref payload_ref, const T& val) + { + payload_ref.fetch_min(val, cuda::memory_order_relaxed); + } +}; + +} // namespace cuco::reduce \ No newline at end of file diff --git a/tests/static_map/insert_or_apply_test.cu b/tests/static_map/insert_or_apply_test.cu index e27013949..1b818e38c 100644 --- a/tests/static_map/insert_or_apply_test.cu +++ b/tests/static_map/insert_or_apply_test.cu @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -29,20 +30,11 @@ #include #include -#include using size_type = std::size_t; -struct binary_plus_op { - template - __device__ void operator()(cuda::atomic_ref lhs, T rhs) - { - lhs.fetch_add(rhs, cuda::memory_order_relaxed); - } -}; - -template -void test_insert_or_apply(Map& map, size_type num_keys, size_type num_unique_keys) +template +void test_insert_or_apply(Map& map, size_type num_keys, size_type num_unique_keys, Init init) { REQUIRE((num_keys % num_unique_keys) == 0); @@ -56,7 +48,12 @@ void test_insert_or_apply(Map& map, size_type num_keys, size_type num_unique_key return cuco::pair{i % num_unique_keys, 1}; })); - map.insert_or_apply(pairs_begin, pairs_begin + num_keys, binary_plus_op{}); + auto constexpr plus_op = cuco::reduce::plus{}; + if constexpr (HasInit) { + map.insert_or_apply(pairs_begin, pairs_begin + num_keys, init, plus_op); + } else { + map.insert_or_apply(pairs_begin, pairs_begin + num_keys, plus_op); + } REQUIRE(map.size() == num_unique_keys); @@ -70,8 +67,8 @@ void test_insert_or_apply(Map& map, size_type num_keys, size_type num_unique_key thrust::equal_to{})); } -template -void test_insert_or_apply_shmem(Map& map, size_type num_keys, size_type num_unique_keys) +template +void test_insert_or_apply_shmem(Map& map, size_type num_keys, size_type num_unique_keys, Init init) { REQUIRE((num_keys % num_unique_keys) == 0); @@ -118,9 +115,14 @@ void test_insert_or_apply_shmem(Map& map, size_type num_keys, size_type num_uniq cuda::stream_ref stream{}; // launch the shmem kernel - cuco::static_map_ns::detail::insert_or_apply_shmem - <<>>( - pairs_begin, num_keys, binary_plus_op{}, map.ref(cuco::op::insert_or_apply), window_extent); + cuco::static_map_ns::detail:: + insert_or_apply_shmem + <<>>(pairs_begin, + num_keys, + init, + cuco::reduce::plus{}, + 
diff --git a/tests/static_map/insert_or_apply_test.cu b/tests/static_map/insert_or_apply_test.cu
index e27013949..1b818e38c 100644
--- a/tests/static_map/insert_or_apply_test.cu
+++ b/tests/static_map/insert_or_apply_test.cu
@@ -17,6 +17,7 @@
 #include
 #include
+#include
 #include
 #include
@@ -29,20 +30,11 @@
 #include
 #include

-#include

 using size_type = std::size_t;

-struct binary_plus_op {
-  template
-  __device__ void operator()(cuda::atomic_ref lhs, T rhs)
-  {
-    lhs.fetch_add(rhs, cuda::memory_order_relaxed);
-  }
-};
-
-template
-void test_insert_or_apply(Map& map, size_type num_keys, size_type num_unique_keys)
+template <bool HasInit, typename Map, typename Init>
+void test_insert_or_apply(Map& map, size_type num_keys, size_type num_unique_keys, Init init)
 {
   REQUIRE((num_keys % num_unique_keys) == 0);
@@ -56,7 +48,12 @@
       return cuco::pair{i % num_unique_keys, 1};
     }));

-  map.insert_or_apply(pairs_begin, pairs_begin + num_keys, binary_plus_op{});
+  auto constexpr plus_op = cuco::reduce::plus{};
+  if constexpr (HasInit) {
+    map.insert_or_apply(pairs_begin, pairs_begin + num_keys, init, plus_op);
+  } else {
+    map.insert_or_apply(pairs_begin, pairs_begin + num_keys, plus_op);
+  }

   REQUIRE(map.size() == num_unique_keys);
@@ -70,8 +67,8 @@
                       thrust::equal_to{}));
 }

-template
-void test_insert_or_apply_shmem(Map& map, size_type num_keys, size_type num_unique_keys)
+template
+void test_insert_or_apply_shmem(Map& map, size_type num_keys, size_type num_unique_keys, Init init)
 {
   REQUIRE((num_keys % num_unique_keys) == 0);
@@ -118,9 +115,14 @@
   cuda::stream_ref stream{};

   // launch the shmem kernel
-  cuco::static_map_ns::detail::insert_or_apply_shmem
-    <<>>(
-      pairs_begin, num_keys, binary_plus_op{}, map.ref(cuco::op::insert_or_apply), window_extent);
+  cuco::static_map_ns::detail::
+    insert_or_apply_shmem
+    <<>>(pairs_begin,
+            num_keys,
+            init,
+            cuco::reduce::plus{},
+            map.ref(cuco::op::insert_or_apply),
+            window_extent);

   REQUIRE(map.size() == num_unique_keys);
@@ -176,18 +178,25 @@ TEMPLATE_TEST_CASE_SIG(
                                     cuco::cuda_allocator,
                                     cuco::storage<2>>;

-  SECTION("Sentinel equals to identity")
+  SECTION("sentinel equals init; has_init = true")
   {
     auto map = map_type{num_keys, cuco::empty_key{-1}, cuco::empty_value{0}};
-
-    test_insert_or_apply(map, num_keys, num_unique_keys);
+    test_insert_or_apply<true>(map, num_keys, num_unique_keys, static_cast<Value>(0));
   }
-
-  SECTION("Sentinel not equals to identity")
+  SECTION("sentinel equals init; has_init = false")
   {
-    auto map = map_type{num_keys, cuco::empty_key{-1}, cuco::empty_value{-1}};
-
-    test_insert_or_apply(map, num_keys, num_unique_keys);
+    auto map = map_type{num_keys, cuco::empty_key{-1}, cuco::empty_value{0}};
+    test_insert_or_apply<false>(map, num_keys, num_unique_keys, static_cast<Value>(0));
+  }
+  SECTION("sentinel not equals init; has_init = true")
+  {
+    auto map = map_type{num_keys, cuco::empty_key{-1}, cuco::empty_value{0}};
+    test_insert_or_apply<true>(map, num_keys, num_unique_keys, static_cast<Value>(-1));
+  }
+  SECTION("sentinel not equals init; has_init = false")
+  {
+    auto map = map_type{num_keys, cuco::empty_key{-1}, cuco::empty_value{0}};
+    test_insert_or_apply<false>(map, num_keys, num_unique_keys, static_cast<Value>(-1));
   }
 }

@@ -198,17 +207,35 @@ TEMPLATE_TEST_CASE_SIG(
   constexpr size_type num_keys = 100;

-  auto map = cuco::static_map,
-                              cuda::thread_scope_device,
-                              thrust::equal_to,
-                              cuco::linear_probing<1, cuco::murmurhash3_32>,
-                              cuco::cuda_allocator,
-                              cuco::storage<2>>{
-    num_keys, cuco::empty_key{-1}, cuco::empty_value{0}};
-
-  test_insert_or_apply(map, num_keys, num_keys);
+  using map_type = cuco::static_map,
+                                    cuda::thread_scope_device,
+                                    thrust::equal_to,
+                                    cuco::linear_probing<2, cuco::murmurhash3_32>,
+                                    cuco::cuda_allocator,
+                                    cuco::storage<2>>;
+
+  SECTION("sentinel equals init; has_init = true")
+  {
+    auto map = map_type{num_keys, cuco::empty_key{-1}, cuco::empty_value{0}};
+    test_insert_or_apply<true>(map, num_keys, num_keys, static_cast<Value>(0));
+  }
+  SECTION("sentinel equals init; has_init = false")
+  {
+    auto map = map_type{num_keys, cuco::empty_key{-1}, cuco::empty_value{0}};
+    test_insert_or_apply<false>(map, num_keys, num_keys, static_cast<Value>(0));
+  }
+  SECTION("sentinel not equals init; has_init = true")
+  {
+    auto map = map_type{num_keys, cuco::empty_key{-1}, cuco::empty_value{0}};
+    test_insert_or_apply<true>(map, num_keys, num_keys, static_cast<Value>(-1));
+  }
+  SECTION("sentinel not equals init; has_init = false")
+  {
+    auto map = map_type{num_keys, cuco::empty_key{-1}, cuco::empty_value{0}};
+    test_insert_or_apply<false>(map, num_keys, num_keys, static_cast<Value>(-1));
+  }
 }

TEMPLATE_TEST_CASE_SIG(
@@ -231,7 +258,7 @@ TEMPLATE_TEST_CASE_SIG(
     constexpr size_type num_unique_keys = 100;

     auto map = map_type{num_keys, cuco::empty_key{-1}, cuco::empty_value{0}};
-    test_insert_or_apply_shmem(map, num_keys, num_unique_keys);
+    test_insert_or_apply_shmem(map, num_keys, num_unique_keys, static_cast<Value>(0));
   }

   SECTION("unique keys")
@@ -240,6 +267,6 @@ TEMPLATE_TEST_CASE_SIG(
     constexpr size_type num_unique_keys = num_keys;

     auto map = map_type{num_keys, cuco::empty_key{-1}, cuco::empty_value{0}};
-    test_insert_or_apply_shmem(map, num_keys, num_unique_keys);
+    test_insert_or_apply_shmem(map, num_keys, num_unique_keys, static_cast<Value>(0));
   }
 }