Extend and simplify API for calculation of range-based rolling window offsets #17807
base: branch-25.04
Conversation
In draft because waiting for #17787

This is brilliant.
signposts
```cpp
std::unique_ptr<table> grouped_range_rolling_window(
  table_view const& group_keys,
  column_view const& orderby,
  order order,
  null_order null_order,
  range_window_type preceding,
  range_window_type following,
  size_type min_periods,
  std::vector<std::pair<column_view const&, rolling_aggregation const&>> requests,
  rmm::cuda_stream_view stream       = cudf::get_default_stream(),
  rmm::device_async_resource_ref mr  = cudf::get_current_device_resource_ref());
```
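For illustration, here is a self-contained sketch of what the strongly-typed window description might look like. The four struct names mirror the PR's tags, but the `int delta` payload is a stand-in for the real scalar-based bound, and `describe` is a hypothetical helper showing how an implementation might visit the variant:

```cpp
#include <string>
#include <type_traits>
#include <variant>

// Hypothetical stand-ins for the cudf types; the real API carries a
// cudf::scalar in the bounded variants rather than a plain int.
struct unbounded {};
struct current_row {};
struct bounded_closed { int delta; };  // endpoint included
struct bounded_open   { int delta; };  // endpoint excluded

using range_window_type =
  std::variant<unbounded, current_row, bounded_closed, bounded_open>;

// Name the alternative held by the variant; mirrors how an implementation
// might dispatch on the requested window type.
inline std::string describe(range_window_type const& w)
{
  return std::visit(
    [](auto const& v) -> std::string {
      using T = std::decay_t<decltype(v)>;
      if constexpr (std::is_same_v<T, unbounded>) { return "unbounded"; }
      else if constexpr (std::is_same_v<T, current_row>) { return "current_row"; }
      else if constexpr (std::is_same_v<T, bounded_closed>) { return "bounded_closed"; }
      else { return "bounded_open"; }
    },
    w);
}
```

The variant makes the "no scalar needed" cases (`unbounded`, `current_row`) states of the type system rather than default-constructed placeholder scalars.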
I haven't yet added tests of this function; I want to bikeshed the interface first. For example, should `min_periods` be part of the request? Do we want a `rolling_request` object similar to the `groupby_request` we have for grouped aggregations?
These are no longer required.
```cpp
enum class window_tag : std::int8_t {
  BOUNDED_OPEN,    ///< Window is bounded by a value-based endpoint, endpoint is excluded.
  BOUNDED_CLOSED,  ///< Window is bounded by a value-based endpoint, endpoint is included.
  UNBOUNDED,       ///< Window runs to beginning (or end) of the group the row is in.
  CURRENT_ROW,     ///< Window contains all rows that compare equal to the current row.
};
```
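To make the open/closed distinction concrete, here is a minimal host-side sketch (hypothetical helper names, plain `int` values, a sorted ascending orderby column) of how the endpoint treatment changes the preceding-window extent:

```cpp
#include <algorithm>
#include <vector>

// Number of preceding rows (inclusive of row i) for a range window of width
// delta over a sorted ascending column. Closed keeps rows with value
// >= v[i] - delta; open keeps only rows with value > v[i] - delta.
int preceding_closed(std::vector<int> const& v, int i, int delta)
{
  auto it = std::lower_bound(v.begin(), v.begin() + i + 1, v[i] - delta);
  return static_cast<int>((v.begin() + i + 1) - it);
}

int preceding_open(std::vector<int> const& v, int i, int delta)
{
  auto it = std::upper_bound(v.begin(), v.begin() + i + 1, v[i] - delta);
  return static_cast<int>((v.begin() + i + 1) - it);
}
```

For `v = {1, 2, 4, 5}`, row 3 (value 5) and `delta = 3`: the closed window covers values in `[2, 5]`, the open window only `(2, 5]`, so the row with value 2 is dropped in the open case.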
I suppose I can actually use the public types and do constexpr dispatch via `std::is_same_v`, rather than comparison to a compile-time known enum tag.
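A minimal sketch of that suggestion: dispatch at compile time on the public tag types with `std::is_same_v` inside `if constexpr`, instead of comparing against a runtime enum value. The tag types and the comparison logic here are illustrative, not the PR's actual code:

```cpp
#include <type_traits>

struct bounded_open {};
struct bounded_closed {};

// Compile-time dispatch on the tag type: only the endpoint comparison
// differs between the open and closed variants, and the unused branch is
// discarded at compile time.
template <typename WindowTag>
constexpr bool in_window(int row_value, int endpoint)
{
  if constexpr (std::is_same_v<WindowTag, bounded_closed>) {
    return row_value <= endpoint;  // endpoint included
  } else {
    static_assert(std::is_same_v<WindowTag, bounded_open>);
    return row_value < endpoint;   // endpoint excluded
  }
}
```

This keeps the public tag types as the single source of truth, with no parallel enum to keep in sync.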
```cpp
[[nodiscard]] __device__ constexpr cuda::std::
  tuple<size_type, size_type, size_type, size_type, size_type, size_type, size_type>
  row_info(size_type i) const noexcept
{
  if (nulls_at_start) {
    return {null_count, 0, num_rows, 0, null_count, null_count, num_rows};
  } else {
    return {null_count,
            num_rows,
            null_count,
            num_rows - null_count,
            num_rows,
            0,
            num_rows - null_count};
  }
}
```
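To show why the all-at-once shape is convenient for callers, here is a simplified host-side analogue with fewer fields than the real seven-tuple; the struct, its members, and the tuple layout are hypothetical:

```cpp
#include <tuple>

// Simplified analogue of the "return everything at once" pattern: callers
// unpack the per-row info in one structured binding rather than making
// several separate device-function calls.
struct nulls_info {
  int null_count;
  int num_rows;
  bool nulls_at_start;

  // (start of nulls, end of nulls, start of non-nulls, end of non-nulls);
  // the real function returns seven values, elided here for brevity.
  constexpr std::tuple<int, int, int, int> row_info() const noexcept
  {
    if (nulls_at_start) { return {0, null_count, null_count, num_rows}; }
    return {num_rows - null_count, num_rows, 0, num_rows - null_count};
  }
};
```

A caller then writes `auto [ns, ne, vs, ve] = info.row_info();` and the compiler sees all the values produced together, which sidesteps the per-call codegen the comment above describes.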
I should go back and try again, but I had what seemed to be miscompilation issues if I exposed all of these values via individual function calls instead of returning everything all at once.
```cpp
/*
 * Spark requires that orderby columns with floating point type have a
 * total order on floats where all NaNs compare equal to one another,
 * and greater than any non-NaN value. These structs implement that logic.
 */
```
@mythrocks to confirm that I have this behaviour correct.
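For reference, the described total order can be expressed as a comparator like the following host-side sketch; this is illustrative, not the PR's struct:

```cpp
#include <cmath>

// Total order on doubles per the Spark requirement quoted above: all NaNs
// compare equal to one another and greater than every non-NaN value;
// otherwise the usual ordering applies.
struct nan_aware_less {
  bool operator()(double a, double b) const
  {
    bool const a_nan = std::isnan(a);
    bool const b_nan = std::isnan(b);
    if (a_nan) { return false; }  // NaN is never less than anything
    if (b_nan) { return true; }   // any non-NaN is less than NaN
    return a < b;
  }
};
```

Under this comparator `!less(a, b) && !less(b, a)` holds for any pair of NaNs, which is exactly "all NaNs compare equal".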
```cpp
auto get_window_type = [](range_window_bounds const& bound) -> range_window_type {
  if (bound.is_unbounded()) {
    return unbounded{};
  } else if (bound.is_current_row()) {
    return current_row{};
  } else {
    return bounded_closed{bound.range_scalar()};
  }
};
auto [preceding_column, following_column] =
  make_range_windows(group_keys,
                     order_by_column,
                     order,
                     get_window_type(preceding),
                     get_window_type(following),
                     stream,
                     cudf::get_current_device_resource_ref());
```
Polyfill to migrate the old interface to the new one. I propose deprecating this API in favour of the one where one passes `range_window_type`s and, additionally, the `null_order`.
```cpp
[[nodiscard]] null_order deduce_null_order(column_view const& orderby,
                                           order order,
                                           rmm::device_uvector<size_type> const& offsets,
                                           rmm::device_uvector<size_type> const& per_group_nulls,
                                           rmm::cuda_stream_view stream)
```
This is needed to polyfill the old interface to the new one.
```cpp
{
  auto d_orderby        = column_device_view::create(orderby, stream);
  auto const num_groups = offsets.size() - 1;
  std::size_t bytes{0};
  auto is_null_it = cudf::detail::make_counting_transform_iterator(
    cudf::size_type{0}, [orderby = *d_orderby] __device__(size_type i) -> size_type {
      return static_cast<size_type>(orderby.is_null_nocheck(i));
    });
  rmm::device_uvector<cudf::size_type> null_counts{num_groups, stream};
  cub::DeviceSegmentedReduce::Sum(nullptr,
                                  bytes,
                                  is_null_it,
                                  null_counts.begin(),
                                  num_groups,
                                  offsets.begin(),
                                  offsets.begin() + 1,
                                  stream.value());
  auto tmp = rmm::device_buffer(bytes, stream);
  cub::DeviceSegmentedReduce::Sum(tmp.data(),
                                  bytes,
                                  is_null_it,
                                  null_counts.begin(),
                                  num_groups,
                                  offsets.begin(),
                                  offsets.begin() + 1,
                                  stream.value());
  return null_counts;
}
```
This is the major performance improvement in this PR: it makes it much faster to find the nulls per group in a grouped rolling window with a low-cardinality group key and nulls in the orderby column. In bad cases the old code would take more than a second; this new code takes a few milliseconds.
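The idea can be illustrated with a host-side analogue of the segmented sum (the device code above uses `cub::DeviceSegmentedReduce::Sum`; types and names here are simplified):

```cpp
#include <cstddef>
#include <vector>

// Host-side analogue of the segmented reduce: one linear pass over the null
// mask, summed per group via the group offsets, instead of launching a
// per-group search for the null/non-null partition point.
std::vector<int> per_group_null_counts(std::vector<bool> const& is_null,
                                       std::vector<int> const& offsets)
{
  std::vector<int> counts(offsets.size() - 1, 0);
  for (std::size_t g = 0; g + 1 < offsets.size(); ++g) {
    for (int i = offsets[g]; i < offsets[g + 1]; ++i) {
      counts[g] += is_null[i] ? 1 : 0;
    }
  }
  return counts;
}
```

Since the nulls are sorted to one end of each group, the per-group null count directly gives the partition point, which is why the segmented sum can replace the `thrust::partition_point` search.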
```cpp
std::pair<std::unique_ptr<column>, std::unique_ptr<column>> make_range_windows(
  table_view const& group_keys,
  column_view const& orderby,
  order order,
  range_window_type preceding,
  range_window_type following,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr)
```
This function goes away once the old APIs are deprecated.
```cpp
auto deduced_null_order = [&]() {
  if (null_order.has_value()) { return null_order.value(); }
  if (!orderby.has_nulls()) {
    // Doesn't matter in this case
    return null_order::BEFORE;
  }
  return deduce_null_order(orderby, order, offsets, per_group_nulls, stream);
```
This also goes away.
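For completeness, a host-side sketch of the deduction the polyfill performs: find a group that mixes nulls and non-nulls, and check which end the nulls landed at. Types and names are illustrative, and it assumes (as the polyfill does) that the caller sorted nulls consistently across groups:

```cpp
#include <cstddef>
#include <vector>

enum class null_order { BEFORE, AFTER };

// If any group contains both nulls and non-nulls, whether its first row is
// null tells us how the caller sorted the nulls. Groups that are all-null or
// all-valid are uninformative.
null_order deduce_null_order(std::vector<bool> const& is_null,
                             std::vector<int> const& offsets)
{
  for (std::size_t g = 0; g + 1 < offsets.size(); ++g) {
    int const len = offsets[g + 1] - offsets[g];
    int nulls     = 0;
    for (int i = offsets[g]; i < offsets[g + 1]; ++i) { nulls += is_null[i] ? 1 : 0; }
    if (nulls > 0 && nulls < len) {
      return is_null[offsets[g]] ? null_order::BEFORE : null_order::AFTER;
    }
  }
  return null_order::BEFORE;  // no mixed group: either answer is consistent
}
```

Having the caller pass the `null_order` directly, as the new interface does, makes this whole inspection (and its kernel launches) unnecessary.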
Description

In both cudf.pandas and cudf-polars we would like to use the existing libcudf functionality for computing the `preceding` and `following` columns for range-based (grouped) rolling windows. The current functionality is designed with Spark in mind and supports calculations with slightly different requirements compared to pandas and polars. In this PR, we unify the construction of these offset column calculations to satisfy all uses.
Specifically:

- We add support for a `BOUNDED_OPEN` window. This is a range-based window where the endpoint of the range is not included.
- The proposed interface for describing the requested window type going forward is a strongly-typed one using `std::variant`. This removes the need for default-constructed scalars when specifying `UNBOUNDED` and `CURRENT_ROW` windows.

Performance improvements
Spark permits nulls in the `orderby` column. In the grouped-rolling case, these nulls must be sorted at one end of each group. The current implementation finds the partition point in each group using `thrust::for_each` over each group and `thrust::partition_point` to find the break. For low-cardinality grouped rolling aggregations this can be very slow, since we have only a single thread searching over each group. We replace this with a segmented sum with CUB to find the per-group null count (and hence the break).

The dispatched functor for computing the bound given a row value has constexpr dispatch for the common case, required for pandas and polars, that the `orderby` column has no nulls. This shaves some milliseconds.

In the case that the `orderby` column does have nulls, we extend the interface such that the caller (who must have arranged that it is sorted correctly) tells us whether nulls were sorted `BEFORE` or `AFTER`. This avoids multiple kernel launches to deduce the null order, saving some milliseconds and, in the grouped case, memory footprint. We polyfill this deduction so that the existing interface still works until we can deprecate and remove it.

Guide for review
The core logic is implemented in `range_utils.cuh`, specifically the `range_window_clamper`, dispatched to by `make_range_window`. To keep compile times under control, the template instantiations for computing preceding and following windows are moved into separate translation units: locally, compilation takes ~10 mins/TU. Things could be split out further if deemed necessary.

Checklist