diff --git a/benchmarks/bloom_filter/add_bench.cu b/benchmarks/bloom_filter/add_bench.cu index 72322bc21..e7ae4cb68 100644 --- a/benchmarks/bloom_filter/add_bench.cu +++ b/benchmarks/bloom_filter/add_bench.cu @@ -29,6 +29,7 @@ #include #include +#include using namespace cuco::benchmark; // defaults, dist_from_state, rebind_hasher_t, add_fpr_summary using namespace cuco::utility; // key_generator, distribution @@ -40,31 +41,38 @@ template , Dist>) { + using size_type = std::uint32_t; using policy_type = cuco::default_filter_policy, Word, static_cast(WordsPerBlock)>; using filter_type = - cuco::bloom_filter, cuda::thread_scope_device, policy_type>; + cuco::bloom_filter, cuda::thread_scope_device, policy_type>; + + constexpr auto filter_block_size = + sizeof(typename filter_type::word_type) * filter_type::words_per_block; auto const num_keys = state.get_int64("NumInputs"); auto const filter_size_mb = state.get_int64("FilterSizeMB"); auto const pattern_bits = WordsPerBlock; try { - auto const policy = policy_type{static_cast(pattern_bits)}; + auto const policy = policy_type{static_cast(pattern_bits)}; } catch (std::exception const& e) { state.skip(e.what()); // skip invalid configurations } - std::size_t const num_sub_filters = - (filter_size_mb * 1024 * 1024) / - (sizeof(typename filter_type::word_type) * filter_type::words_per_block); + std::size_t const num_sub_filters = (filter_size_mb * 1024 * 1024) / filter_block_size; + + if (num_sub_filters > std::numeric_limits::max()) { + state.skip("num_sub_filters too large for size_type"); // skip invalid configurations + } thrust::counting_iterator keys(0); state.add_element_count(num_keys); - filter_type filter{num_sub_filters, {}, {static_cast(pattern_bits)}}; + filter_type filter{ + static_cast(num_sub_filters), {}, {static_cast(pattern_bits)}}; state.collect_dram_throughput(); state.collect_l2_hit_rates(); @@ -83,9 +91,10 @@ void bloom_filter_add(nvbench::state& state, template void arrow_bloom_filter_add(nvbench::state& state, nvbench::type_list) { + using size_type = std::uint32_t; using policy_type = cuco::arrow_filter_policy; using filter_type = - cuco::bloom_filter, cuda::thread_scope_device, policy_type>; + cuco::bloom_filter, cuda::thread_scope_device, policy_type>; auto const num_keys = state.get_int64("NumInputs"); auto const filter_size_mb = state.get_int64("FilterSizeMB"); @@ -103,7 +112,7 @@ void arrow_bloom_filter_add(nvbench::state& state, nvbench::type_list state.add_element_count(num_keys); - filter_type filter{num_sub_filters}; + filter_type filter{static_cast(num_sub_filters)}; state.collect_dram_throughput(); state.collect_l2_hit_rates(); diff --git a/benchmarks/bloom_filter/contains_bench.cu b/benchmarks/bloom_filter/contains_bench.cu index 1eae4e13f..29561ec85 100644 --- a/benchmarks/bloom_filter/contains_bench.cu +++ b/benchmarks/bloom_filter/contains_bench.cu @@ -29,6 +29,7 @@ #include #include +#include using namespace cuco::benchmark; // defaults, dist_from_state, rebind_hasher_t, add_fpr_summary using namespace cuco::utility; // key_generator, distribution @@ -41,13 +42,20 @@ void bloom_filter_contains( nvbench::state& state, nvbench::type_list, Dist>) { - // cudaDeviceSetLimit(cudaLimitMaxL2FetchGranularity, 32); // slightly improves peformance if - // filter block fits into a 32B sector + using size_type = std::uint32_t; using policy_type = cuco::default_filter_policy, Word, static_cast(WordsPerBlock)>; using filter_type = - cuco::bloom_filter, cuda::thread_scope_device, policy_type>; + cuco::bloom_filter, cuda::thread_scope_device, policy_type>; + + constexpr auto filter_block_size = + sizeof(typename filter_type::word_type) * filter_type::words_per_block; + + // if (filter_block_size <= 32) { + // cudaDeviceSetLimit(cudaLimitMaxL2FetchGranularity, 32); // slightly improves peformance if + // filter block fits into a 32B sector + // } auto const num_keys = state.get_int64("NumInputs"); auto const filter_size_mb = state.get_int64("FilterSizeMB"); @@ -59,16 +67,19 @@ void bloom_filter_contains( state.skip(e.what()); // skip invalid configurations } - std::size_t const num_sub_filters = - (filter_size_mb * 1024 * 1024) / - (sizeof(typename filter_type::word_type) * filter_type::words_per_block); + std::size_t const num_sub_filters = (filter_size_mb * 1024 * 1024) / filter_block_size; + + if (num_sub_filters > std::numeric_limits::max()) { + state.skip("num_sub_filters too large for size_type"); // skip invalid configurations + } thrust::counting_iterator keys(0); thrust::device_vector result(num_keys, false); state.add_element_count(num_keys); - filter_type filter{num_sub_filters, {}, {static_cast(pattern_bits)}}; + filter_type filter{ + static_cast(num_sub_filters), {}, {static_cast(pattern_bits)}}; state.collect_dram_throughput(); state.collect_l2_hit_rates(); @@ -91,9 +102,10 @@ void arrow_bloom_filter_contains(nvbench::state& state, nvbench::type_list; using filter_type = - cuco::bloom_filter, cuda::thread_scope_device, policy_type>; + cuco::bloom_filter, cuda::thread_scope_device, policy_type>; auto const num_keys = state.get_int64("NumInputs"); auto const filter_size_mb = state.get_int64("FilterSizeMB"); @@ -112,7 +124,7 @@ void arrow_bloom_filter_contains(nvbench::state& state, nvbench::type_list(num_sub_filters)}; state.collect_dram_throughput(); state.collect_l2_hit_rates(); diff --git a/include/cuco/bloom_filter_ref.cuh b/include/cuco/bloom_filter_ref.cuh index ee65c52bb..2f3dcfa2b 100644 --- a/include/cuco/bloom_filter_ref.cuh +++ b/include/cuco/bloom_filter_ref.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -134,6 +134,22 @@ class bloom_filter_ref { template __device__ void add(CG const& group, ProbeKey const& key); + /** + * @brief Device function that adds all keys in the range `[first, last)` to the filter. + * + * @note Best performance is achieved if the size of the CG is larger than or equal to + * `words_per_block`. + * + * @tparam CG Cooperative Group type + * @tparam InputIt Device-accessible random access input key iterator + * + * @param group The Cooperative Group this operation is executed with + * @param first Beginning of the sequence of keys + * @param last End of the sequence of keys + */ + template + __device__ void add(CG const& group, InputIt first, InputIt last); + /** * @brief Adds all keys in the range `[first, last)` to the filter. * diff --git a/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh b/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh index 15b70b118..2a8f5b9c1 100644 --- a/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh +++ b/include/cuco/detail/bloom_filter/bloom_filter_impl.cuh @@ -35,6 +35,8 @@ #include #include +#include + #include namespace cuco::detail { @@ -121,14 +123,28 @@ class bloom_filter_impl { __device__ void add(ProbeKey const& key) { auto const hash_value = policy_.hash(key); - auto const idx = policy_.block_index(hash_value, num_blocks_); + this->add_impl(hash_value, policy_.block_index(hash_value, num_blocks_)); + } + + template + __device__ void add(InputIt first, InputIt last) + { + auto const num_keys = cuco::detail::distance(first, last); + for (decltype(num_keys) i = 0; i < num_keys; ++i) { + auto const hash_value = policy_.hash(*(first + i)); + this->add_impl(hash_value, policy_.block_index(hash_value, num_blocks_)); + } + } + template + __device__ void add_impl(HashValue const& hash_value, BlockIndex block_index) + { #pragma unroll words_per_block for (uint32_t i = 0; i < words_per_block; ++i) { auto const word = policy_.word_pattern(hash_value, i); if (word != 0) { - auto atom_word = - cuda::atomic_ref{*(words_ + (idx * words_per_block + i))}; + auto atom_word = cuda::atomic_ref{ + *(words_ + (block_index * words_per_block + i))}; atom_word.fetch_or(word, cuda::memory_order_relaxed); } } @@ -139,24 +155,97 @@ class bloom_filter_impl { { constexpr auto num_threads = tile_size_v; constexpr auto optimal_num_threads = add_optimal_cg_size(); - constexpr auto words_per_thread = words_per_block / optimal_num_threads; + constexpr auto worker_num_threads = + (num_threads < optimal_num_threads) ? num_threads : optimal_num_threads; // If single thread is optimal, use scalar add - if constexpr (num_threads == 1 or optimal_num_threads == 1) { + if constexpr (worker_num_threads == 1) { this->add(key); } else { - auto const rank = group.thread_rank(); - auto const hash_value = policy_.hash(key); - auto const idx = policy_.block_index(hash_value, num_blocks_); + this->add_impl(hash_value, policy_.block_index(hash_value, num_blocks_)); + } + } -#pragma unroll - for (uint32_t i = rank; i < optimal_num_threads; i += num_threads) { - auto const word = policy_.word_pattern(hash_value, rank); + template + __device__ void add(CG const& group, InputIt first, InputIt last) + { + namespace cg = cooperative_groups; - auto atom_word = - cuda::atomic_ref{*(words_ + (idx * words_per_block + rank))}; - atom_word.fetch_or(word, cuda::memory_order_relaxed); + constexpr auto num_threads = tile_size_v; + constexpr auto optimal_num_threads = add_optimal_cg_size(); + constexpr auto worker_num_threads = + (num_threads < optimal_num_threads) ? num_threads : optimal_num_threads; + + auto const num_keys = cuco::detail::distance(first, last); + if (num_keys == 0) { return; } + + auto const rank = group.thread_rank(); + + // If single thread is optimal, use scalar add + if constexpr (worker_num_threads == 1) { + for (auto i = rank; i < num_keys; i += num_threads) { + typename std::iterator_traits::value_type const& insert_element{*(first + i)}; + this->add(insert_element); + } + } else if constexpr (num_threads == worker_num_threads) { // given CG is optimal CG + typename policy_type::hash_result_type hash_value; + size_type block_index; + + auto const group_iters = cuco::detail::int_div_ceil(num_keys, num_threads); + for (size_type i = 0; (i / num_threads) < group_iters; i += num_threads) { + if (i + rank < num_keys) { + typename std::iterator_traits::value_type const& insert_element{ + *(first + i + rank)}; + hash_value = policy_.hash(insert_element); + block_index = policy_.block_index(hash_value, num_blocks_); + } + + for (uint32_t j = 0; (j < num_threads) and (i + j < num_keys); ++j) { + this->add_impl(group, group.shfl(hash_value, j), group.shfl(block_index, j)); + } + } + } else { // subdivide given CG into multiple optimal CGs + typename policy_type::hash_result_type hash_value; + size_type block_index; + + auto const worker_group = cg::tiled_partition(group); + auto const worker_offset = worker_num_threads * worker_group.meta_group_rank(); + + auto const group_iters = cuco::detail::int_div_ceil(num_keys, num_threads); + + for (size_type i = 0; (i / num_threads) < group_iters; i += num_threads) { + if (i + rank < num_keys) { + typename std::iterator_traits::value_type const& key{*(first + i + rank)}; + hash_value = policy_.hash(key); + block_index = policy_.block_index(hash_value, num_blocks_); + } + + for (uint32_t j = 0; (j < worker_num_threads) and (i + worker_offset + j < num_keys); ++j) { + this->add_impl( + worker_group, worker_group.shfl(hash_value, j), worker_group.shfl(block_index, j)); + } + } + } + } + + template + __device__ void add_impl(CG const& group, HashValue const& hash_value, BlockIndex block_index) + { + constexpr auto num_threads = tile_size_v; + + auto const rank = group.thread_rank(); + + if constexpr (num_threads == words_per_block) { + auto atom_word = cuda::atomic_ref{ + *(words_ + (block_index * words_per_block + rank))}; + atom_word.fetch_or(policy_.word_pattern(hash_value, rank), cuda::memory_order_relaxed); + } else { +#pragma unroll + for (auto i = rank; i < words_per_block; i += num_threads) { + auto atom_word = cuda::atomic_ref{ + *(words_ + (block_index * words_per_block + i))}; + atom_word.fetch_or(policy_.word_pattern(hash_value, i), cuda::memory_order_relaxed); } } } @@ -181,8 +270,17 @@ class bloom_filter_impl { [*this] __device__(key_type const key) mutable { this->add(key); }, stream.get())); } else { - auto const always_true = thrust::constant_iterator{true}; - this->add_if_async(first, last, always_true, cuda::std::identity{}, stream); + auto const num_keys = cuco::detail::distance(first, last); + if (num_keys == 0) { return; } + + auto constexpr cg_size = add_optimal_cg_size(); + auto constexpr block_size = cuco::detail::default_block_size(); + void const* kernel = reinterpret_cast( + detail::bloom_filter_ns::add); + auto const grid_size = cuco::detail::max_occupancy_grid_size(block_size, kernel); + + detail::bloom_filter_ns::add + <<>>(first, num_keys, *this); } } diff --git a/include/cuco/detail/bloom_filter/bloom_filter_ref.inl b/include/cuco/detail/bloom_filter/bloom_filter_ref.inl index ee99396db..96d2c0573 100644 --- a/include/cuco/detail/bloom_filter/bloom_filter_ref.inl +++ b/include/cuco/detail/bloom_filter/bloom_filter_ref.inl @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -72,6 +72,15 @@ __device__ void bloom_filter_ref::add(CG const& grou impl_.add(group, key); } +template +template +__device__ void bloom_filter_ref::add(CG const& group, + InputIt first, + InputIt last) +{ + impl_.add(group, first, last); +} + template template __host__ constexpr void bloom_filter_ref::add(InputIt first, diff --git a/include/cuco/detail/bloom_filter/kernels.cuh b/include/cuco/detail/bloom_filter/kernels.cuh index b0ef7b684..9e04b73c4 100644 --- a/include/cuco/detail/bloom_filter/kernels.cuh +++ b/include/cuco/detail/bloom_filter/kernels.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,28 @@ namespace cuco::detail::bloom_filter_ns { CUCO_SUPPRESS_KERNEL_WARNINGS +template +CUCO_KERNEL __launch_bounds__(BlockSize) void add(InputIt first, + cuco::detail::index_type n, + Ref ref) +{ + namespace cg = cooperative_groups; + + constexpr auto tile_size = cuco::detail::warp_size(); + + auto const tile_idx = cuco::detail::global_thread_id() / tile_size; + auto const n_tiles = gridDim.x * BlockSize / tile_size; + auto const items_per_tile = cuco::detail::int_div_ceil(n, n_tiles); + + auto const tile_start = tile_idx * items_per_tile; + if (tile_start >= n) { return; } + auto const tile_stop = (tile_start + items_per_tile < n) ? tile_start + items_per_tile : n; + + auto const tile = cg::tiled_partition(cg::this_thread_block()); + + ref.add(tile, first + tile_start, first + tile_stop); +} + template