benchmarks/bloom_filter/add_bench.cu (25 changes: 17 additions & 8 deletions)
@@ -29,6 +29,7 @@

#include <cstdint>
#include <exception>
+#include <limits>

using namespace cuco::benchmark; // defaults, dist_from_state, rebind_hasher_t, add_fpr_summary
using namespace cuco::utility; // key_generator, distribution
@@ -40,31 +41,38 @@ template <typename Key, typename Hash, typename Word, nvbench::int32_t WordsPerB
void bloom_filter_add(nvbench::state& state,
nvbench::type_list<Key, Hash, Word, nvbench::enum_type<WordsPerBlock>, Dist>)
{
+  using size_type = std::uint32_t;
[Review thread on the added `size_type` line above]

Member:
The runtime difference brought by the 100% occupancy is not worth the change IMO.

@sleeepyjack (Collaborator, Author), Feb 13, 2025:
size_type = uint32_t should be the default size type for bloom_filter IMO. The only things preventing us from setting this default in the public API are (1) we use size_t everywhere else in our library to denote sizes (I could start my usual rant about how it was a mistake for the STL to choose uint64_t as the default size type), and (2) a user might run into the inconvenience of a narrowing conversion if their input value is a size_t.

From an algorithmic standpoint, there is little to no need to use a wider type.

The benefit is that the kernel now achieves near-optimal occupancy, although it doesn't show any significant end-to-end effect for our particular benchmark setup. However, let's say a user wants to use a hasher that needs more registers than xxhash64; then, with the four additional registers available, the compiler has more options to optimize the code before running out of resources.

Member:
We are using size_t mainly to match the STL. Where possible, cudf and the upstream CCCL algorithms use signed integers like int32_t/int64_t as the size type, since signed types avoid silent unsigned overflow wraparound.
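A minimal host-side sketch of the narrowing concern raised in this thread, assuming a caller holds a std::size_t count that must be narrowed to the filter's 32-bit size_type. The helper name is illustrative (not part of the PR); the guard mirrors the one added to the benchmarks below:

```cpp
#include <cstdint>
#include <limits>
#include <stdexcept>

using size_type = std::uint32_t;

// Hypothetical helper (not from the PR): validate a std::size_t block count
// before narrowing it to the filter's 32-bit size type, mirroring the
// benchmark guard that skips configurations where the count would overflow.
size_type checked_num_blocks(std::size_t num_blocks)
{
  if (num_blocks > std::numeric_limits<size_type>::max()) {
    throw std::invalid_argument{"num_blocks too large for size_type"};
  }
  return static_cast<size_type>(num_blocks);  // safe narrowing after the check
}
```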

using policy_type = cuco::default_filter_policy<rebind_hasher_t<Hash, Key>,
Word,
static_cast<std::uint32_t>(WordsPerBlock)>;
using filter_type =
-    cuco::bloom_filter<Key, cuco::extent<size_t>, cuda::thread_scope_device, policy_type>;
+    cuco::bloom_filter<Key, cuco::extent<size_type>, cuda::thread_scope_device, policy_type>;

+  constexpr auto filter_block_size =
+    sizeof(typename filter_type::word_type) * filter_type::words_per_block;

auto const num_keys = state.get_int64("NumInputs");
auto const filter_size_mb = state.get_int64("FilterSizeMB");
auto const pattern_bits = WordsPerBlock;

try {
-    auto const policy = policy_type{static_cast<uint32_t>(pattern_bits)};
+    auto const policy = policy_type{static_cast<std::uint32_t>(pattern_bits)};
} catch (std::exception const& e) {
state.skip(e.what()); // skip invalid configurations
}

-  std::size_t const num_sub_filters =
-    (filter_size_mb * 1024 * 1024) /
-    (sizeof(typename filter_type::word_type) * filter_type::words_per_block);
+  std::size_t const num_sub_filters = (filter_size_mb * 1024 * 1024) / filter_block_size;

+  if (num_sub_filters > std::numeric_limits<size_type>::max()) {
+    state.skip("num_sub_filters too large for size_type");  // skip invalid configurations
+  }

thrust::counting_iterator<Key> keys(0);

state.add_element_count(num_keys);

-  filter_type filter{num_sub_filters, {}, {static_cast<uint32_t>(pattern_bits)}};
+  filter_type filter{
+    static_cast<size_type>(num_sub_filters), {}, {static_cast<std::uint32_t>(pattern_bits)}};

state.collect_dram_throughput();
state.collect_l2_hit_rates();
@@ -83,9 +91,10 @@ void bloom_filter_add(nvbench::state& state,
template <typename Key, typename Dist>
void arrow_bloom_filter_add(nvbench::state& state, nvbench::type_list<Key, Dist>)
{
+  using size_type = std::uint32_t;
using policy_type = cuco::arrow_filter_policy<Key>;
using filter_type =
-    cuco::bloom_filter<Key, cuco::extent<size_t>, cuda::thread_scope_device, policy_type>;
+    cuco::bloom_filter<Key, cuco::extent<size_type>, cuda::thread_scope_device, policy_type>;

auto const num_keys = state.get_int64("NumInputs");
auto const filter_size_mb = state.get_int64("FilterSizeMB");
@@ -103,7 +112,7 @@ void arrow_bloom_filter_add(nvbench::state& state, nvbench::type_list<Key, Dist>

state.add_element_count(num_keys);

-  filter_type filter{num_sub_filters};
+  filter_type filter{static_cast<size_type>(num_sub_filters)};

state.collect_dram_throughput();
state.collect_l2_hit_rates();
benchmarks/bloom_filter/contains_bench.cu (30 changes: 21 additions & 9 deletions)
@@ -29,6 +29,7 @@
#include <thrust/iterator/counting_iterator.h>

#include <exception>
+#include <limits>

using namespace cuco::benchmark; // defaults, dist_from_state, rebind_hasher_t, add_fpr_summary
using namespace cuco::utility; // key_generator, distribution
@@ -41,13 +42,20 @@ void bloom_filter_contains(
nvbench::state& state,
nvbench::type_list<Key, Hash, Word, nvbench::enum_type<WordsPerBlock>, Dist>)
{
-  // cudaDeviceSetLimit(cudaLimitMaxL2FetchGranularity, 32); // slightly improves performance if
-  // filter block fits into a 32B sector
+  using size_type = std::uint32_t;
using policy_type = cuco::default_filter_policy<rebind_hasher_t<Hash, Key>,
Word,
static_cast<std::uint32_t>(WordsPerBlock)>;
using filter_type =
-    cuco::bloom_filter<Key, cuco::extent<size_t>, cuda::thread_scope_device, policy_type>;
+    cuco::bloom_filter<Key, cuco::extent<size_type>, cuda::thread_scope_device, policy_type>;

+  constexpr auto filter_block_size =
+    sizeof(typename filter_type::word_type) * filter_type::words_per_block;
+
+  // if (filter_block_size <= 32) {
+  //   cudaDeviceSetLimit(cudaLimitMaxL2FetchGranularity, 32); // slightly improves performance if
+  //   filter block fits into a 32B sector
+  // }

auto const num_keys = state.get_int64("NumInputs");
auto const filter_size_mb = state.get_int64("FilterSizeMB");
@@ -59,16 +67,19 @@
state.skip(e.what()); // skip invalid configurations
}

-  std::size_t const num_sub_filters =
-    (filter_size_mb * 1024 * 1024) /
-    (sizeof(typename filter_type::word_type) * filter_type::words_per_block);
+  std::size_t const num_sub_filters = (filter_size_mb * 1024 * 1024) / filter_block_size;

+  if (num_sub_filters > std::numeric_limits<size_type>::max()) {
+    state.skip("num_sub_filters too large for size_type");  // skip invalid configurations
+  }

thrust::counting_iterator<Key> keys(0);
thrust::device_vector<bool> result(num_keys, false);

state.add_element_count(num_keys);

-  filter_type filter{num_sub_filters, {}, {static_cast<uint32_t>(pattern_bits)}};
+  filter_type filter{
+    static_cast<size_type>(num_sub_filters), {}, {static_cast<uint32_t>(pattern_bits)}};

state.collect_dram_throughput();
state.collect_l2_hit_rates();
@@ -91,9 +102,10 @@ void arrow_bloom_filter_contains(nvbench::state& state, nvbench::type_list<Key,
{
  // cudaDeviceSetLimit(cudaLimitMaxL2FetchGranularity, 32); // slightly improves performance if
  // filter block fits into a 32B sector
+  using size_type = std::uint32_t;
using policy_type = cuco::arrow_filter_policy<Key>;
using filter_type =
-    cuco::bloom_filter<Key, cuco::extent<size_t>, cuda::thread_scope_device, policy_type>;
+    cuco::bloom_filter<Key, cuco::extent<size_type>, cuda::thread_scope_device, policy_type>;

auto const num_keys = state.get_int64("NumInputs");
auto const filter_size_mb = state.get_int64("FilterSizeMB");
@@ -112,7 +124,7 @@ void arrow_bloom_filter_contains(nvbench::state& state, nvbench::type_list<Key,

state.add_element_count(num_keys);

-  filter_type filter{num_sub_filters};
+  filter_type filter{static_cast<size_type>(num_sub_filters)};

state.collect_dram_throughput();
state.collect_l2_hit_rates();
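The commented-out `cudaDeviceSetLimit` lines kept in this file point at a possible L2 tuning experiment. A hedged sketch of what enabling it could look like, under the assumption stated in the comments (the whole filter block fits into a 32-byte sector); the function name is illustrative and this is not enabled by the PR:

```cuda
#include <cuda_runtime.h>

#include <cstddef>

// Sketch only: cap the maximum L2 fetch granularity at 32 bytes when an
// entire filter block fits into a single 32B sector, which the commented-out
// benchmark code above suggests slightly improves performance.
void maybe_tune_l2_fetch_granularity(std::size_t filter_block_size)
{
  if (filter_block_size <= 32) {
    cudaDeviceSetLimit(cudaLimitMaxL2FetchGranularity, 32);
  }
}
```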
include/cuco/bloom_filter_ref.cuh (18 changes: 17 additions & 1 deletion)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -134,6 +134,22 @@ class bloom_filter_ref {
template <class CG, class ProbeKey>
__device__ void add(CG const& group, ProbeKey const& key);

+  /**
+   * @brief Device function that adds all keys in the range `[first, last)` to the filter.
+   *
+   * @note Best performance is achieved if the size of the CG is larger than or equal to
+   * `words_per_block`.
+   *
+   * @tparam CG Cooperative Group type
+   * @tparam InputIt Device-accessible random access input key iterator
+   *
+   * @param group The Cooperative Group this operation is executed with
+   * @param first Beginning of the sequence of keys
+   * @param last End of the sequence of keys
+   */
+  template <class CG, class InputIt>
+  __device__ void add(CG const& group, InputIt first, InputIt last);
+
/**
* @brief Adds all keys in the range `[first, last)` to the filter.
*
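A hedged usage sketch of the new device-side ranged `add` declared above: a kernel partitions its input among cooperative-group tiles, and each tile inserts its contiguous chunk. The kernel name, launch shape, and chunking scheme are illustrative assumptions; per the `@note`, choose `TileSize >= words_per_block` for best performance:

```cuda
#include <cooperative_groups.h>

#include <cstddef>

namespace cg = cooperative_groups;

// Hypothetical kernel (not part of the PR): each tile of TileSize threads
// cooperatively inserts one contiguous chunk of keys via the ranged CG add.
template <int TileSize, class FilterRef, class InputIt>
__global__ void bulk_add_kernel(FilterRef ref, InputIt first, std::size_t n)
{
  auto const tile = cg::tiled_partition<TileSize>(cg::this_thread_block());

  std::size_t const num_tiles =
    (static_cast<std::size_t>(gridDim.x) * blockDim.x) / TileSize;
  std::size_t const tile_id =
    (static_cast<std::size_t>(blockIdx.x) * blockDim.x + threadIdx.x) / TileSize;
  std::size_t const chunk = (n + num_tiles - 1) / num_tiles;  // keys per tile, rounded up

  std::size_t const begin = tile_id * chunk;
  if (begin >= n) { return; }  // this tile has no work
  std::size_t const end = (begin + chunk < n) ? begin + chunk : n;

  ref.add(tile, first + begin, first + end);  // ranged CG overload from this PR
}
```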
include/cuco/detail/bloom_filter/bloom_filter_impl.cuh (130 changes: 114 additions & 16 deletions)
@@ -35,6 +35,8 @@
#include <cuda/stream_ref>
#include <thrust/iterator/constant_iterator.h>

+#include <cooperative_groups.h>
+
#include <cstdint>

namespace cuco::detail {
@@ -121,14 +123,28 @@
__device__ void add(ProbeKey const& key)
{
auto const hash_value = policy_.hash(key);
-    auto const idx        = policy_.block_index(hash_value, num_blocks_);
+    this->add_impl(hash_value, policy_.block_index(hash_value, num_blocks_));
}

+  template <class InputIt>
+  __device__ void add(InputIt first, InputIt last)
+  {
+    auto const num_keys = cuco::detail::distance(first, last);
+    for (decltype(num_keys) i = 0; i < num_keys; ++i) {
+      auto const hash_value = policy_.hash(*(first + i));
+      this->add_impl(hash_value, policy_.block_index(hash_value, num_blocks_));
+    }
+  }
+
+  template <class HashValue, class BlockIndex>
+  __device__ void add_impl(HashValue const& hash_value, BlockIndex block_index)
+  {
#pragma unroll words_per_block
for (uint32_t i = 0; i < words_per_block; ++i) {
auto const word = policy_.word_pattern(hash_value, i);
if (word != 0) {
-        auto atom_word =
-          cuda::atomic_ref<word_type, thread_scope>{*(words_ + (idx * words_per_block + i))};
+        auto atom_word = cuda::atomic_ref<word_type, thread_scope>{
+          *(words_ + (block_index * words_per_block + i))};
atom_word.fetch_or(word, cuda::memory_order_relaxed);
}
}
@@ -139,24 +155,97 @@
{
constexpr auto num_threads = tile_size_v<CG>;
constexpr auto optimal_num_threads = add_optimal_cg_size();
-    constexpr auto words_per_thread = words_per_block / optimal_num_threads;
+    constexpr auto worker_num_threads =
+      (num_threads < optimal_num_threads) ? num_threads : optimal_num_threads;

// If single thread is optimal, use scalar add
-    if constexpr (num_threads == 1 or optimal_num_threads == 1) {
+    if constexpr (worker_num_threads == 1) {
this->add(key);
} else {
-      auto const rank = group.thread_rank();
-
       auto const hash_value = policy_.hash(key);
-      auto const idx        = policy_.block_index(hash_value, num_blocks_);
+      this->add_impl(group, hash_value, policy_.block_index(hash_value, num_blocks_));
}
}

-#pragma unroll
-      for (uint32_t i = rank; i < optimal_num_threads; i += num_threads) {
-        auto const word = policy_.word_pattern(hash_value, rank);
-
-        auto atom_word =
-          cuda::atomic_ref<word_type, thread_scope>{*(words_ + (idx * words_per_block + rank))};
-        atom_word.fetch_or(word, cuda::memory_order_relaxed);
+  template <class CG, class InputIt>
+  __device__ void add(CG const& group, InputIt first, InputIt last)
+  {
+    namespace cg = cooperative_groups;
+    constexpr auto num_threads         = tile_size_v<CG>;
+    constexpr auto optimal_num_threads = add_optimal_cg_size();
+    constexpr auto worker_num_threads =
+      (num_threads < optimal_num_threads) ? num_threads : optimal_num_threads;

+    auto const num_keys = cuco::detail::distance(first, last);
+    if (num_keys == 0) { return; }
+
+    auto const rank = group.thread_rank();
+
+    // If single thread is optimal, use scalar add
+    if constexpr (worker_num_threads == 1) {
+      for (auto i = rank; i < num_keys; i += num_threads) {
+        typename std::iterator_traits<InputIt>::value_type const& insert_element{*(first + i)};
+        this->add(insert_element);
+      }
+    } else if constexpr (num_threads == worker_num_threads) {  // given CG is optimal CG
+      typename policy_type::hash_result_type hash_value;
+      size_type block_index;
+
+      auto const group_iters = cuco::detail::int_div_ceil(num_keys, num_threads);
+      for (size_type i = 0; (i / num_threads) < group_iters; i += num_threads) {
+        if (i + rank < num_keys) {
+          typename std::iterator_traits<InputIt>::value_type const& insert_element{
+            *(first + i + rank)};
+          hash_value  = policy_.hash(insert_element);
+          block_index = policy_.block_index(hash_value, num_blocks_);
+        }
+
+        for (uint32_t j = 0; (j < num_threads) and (i + j < num_keys); ++j) {
+          this->add_impl(group, group.shfl(hash_value, j), group.shfl(block_index, j));
+        }
+      }
+    } else {  // subdivide given CG into multiple optimal CGs
+      typename policy_type::hash_result_type hash_value;
+      size_type block_index;
+
+      auto const worker_group  = cg::tiled_partition<worker_num_threads>(group);
+      auto const worker_offset = worker_num_threads * worker_group.meta_group_rank();
+
+      auto const group_iters = cuco::detail::int_div_ceil(num_keys, num_threads);
+
+      for (size_type i = 0; (i / num_threads) < group_iters; i += num_threads) {
+        if (i + rank < num_keys) {
+          typename std::iterator_traits<InputIt>::value_type const& key{*(first + i + rank)};
+          hash_value  = policy_.hash(key);
+          block_index = policy_.block_index(hash_value, num_blocks_);
+        }
+
+        for (uint32_t j = 0; (j < worker_num_threads) and (i + worker_offset + j < num_keys); ++j) {
+          this->add_impl(
+            worker_group, worker_group.shfl(hash_value, j), worker_group.shfl(block_index, j));
+        }
+      }
+    }
+  }

+  template <class CG, class HashValue, class BlockIndex>
+  __device__ void add_impl(CG const& group, HashValue const& hash_value, BlockIndex block_index)
+  {
+    constexpr auto num_threads = tile_size_v<CG>;
+
+    auto const rank = group.thread_rank();
+
+    if constexpr (num_threads == words_per_block) {
+      auto atom_word = cuda::atomic_ref<word_type, thread_scope>{
+        *(words_ + (block_index * words_per_block + rank))};
+      atom_word.fetch_or(policy_.word_pattern(hash_value, rank), cuda::memory_order_relaxed);
+    } else {
+#pragma unroll
+      for (auto i = rank; i < words_per_block; i += num_threads) {
+        auto atom_word = cuda::atomic_ref<word_type, thread_scope>{
+          *(words_ + (block_index * words_per_block + i))};
+        atom_word.fetch_or(policy_.word_pattern(hash_value, i), cuda::memory_order_relaxed);
+      }
+    }
+  }
@@ -181,8 +270,17 @@
[*this] __device__(key_type const key) mutable { this->add(key); },
stream.get()));
} else {
-      auto const always_true = thrust::constant_iterator<bool>{true};
-      this->add_if_async(first, last, always_true, cuda::std::identity{}, stream);
+      auto const num_keys = cuco::detail::distance(first, last);
+      if (num_keys == 0) { return; }
+
+      auto constexpr cg_size    = add_optimal_cg_size();
+      auto constexpr block_size = cuco::detail::default_block_size();
+      void const* kernel        = reinterpret_cast<void const*>(
+        detail::bloom_filter_ns::add<cg_size, block_size, InputIt, bloom_filter_impl>);
+      auto const grid_size = cuco::detail::max_occupancy_grid_size(block_size, kernel);
+
+      detail::bloom_filter_ns::add<cg_size, block_size>
+        <<<grid_size, block_size, 0, stream.get()>>>(first, num_keys, *this);
}
}

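With this change, the host bulk path no longer routes through `add_if_async` with a constant-true stencil; it launches a dedicated bulk-add kernel sized for full occupancy. A hedged host-side fragment, reusing names defined in the benchmark diffs above (`filter_type`, `size_type`, `num_sub_filters`, `pattern_bits`, `keys`, `num_keys`, and `stream` are assumptions drawn from that code, not a standalone program):

```cuda
#include <cuco/bloom_filter.cuh>

#include <cuda/stream_ref>

// Sketch only: construct the filter as in add_bench.cu, then bulk-insert.
void bulk_insert_example(cuda::stream_ref stream)
{
  filter_type filter{
    static_cast<size_type>(num_sub_filters), {}, {static_cast<std::uint32_t>(pattern_bits)}};

  filter.add(keys, keys + num_keys);                // synchronous bulk add
  filter.add_async(keys, keys + num_keys, stream);  // async; launches the new kernel
}
```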
include/cuco/detail/bloom_filter/bloom_filter_ref.inl (11 changes: 10 additions & 1 deletion)
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -72,6 +72,15 @@ __device__ void bloom_filter_ref<Key, Extent, Scope, Policy>::add(CG const& grou
impl_.add(group, key);
}

+template <class Key, class Extent, cuda::thread_scope Scope, class Policy>
+template <class CG, class InputIt>
+__device__ void bloom_filter_ref<Key, Extent, Scope, Policy>::add(CG const& group,
+                                                                  InputIt first,
+                                                                  InputIt last)
+{
+  impl_.add(group, first, last);
+}
+
template <class Key, class Extent, cuda::thread_scope Scope, class Policy>
template <class InputIt>
__host__ constexpr void bloom_filter_ref<Key, Extent, Scope, Policy>::add(InputIt first,