Merged · 26 commits
1d32cdc  Add CUDA streams to cudf-polars (TomAugspurger, Oct 16, 2025)
84d4fda  Remove stream singleton for duration_to_scalar (TomAugspurger, Oct 17, 2025)
61ad556  Remove get_stream_for_stats (TomAugspurger, Oct 17, 2025)
af6b7e0  Remove get_stream_for_conditional_join_predicate (TomAugspurger, Oct 17, 2025)
9bae4df  Merge remote-tracking branch 'upstream/branch-25.12' into tom/polars-… (TomAugspurger, Oct 20, 2025)
b63c00f  Merge remote-tracking branch 'upstream/main' into tom/polars-cuda-str… (TomAugspurger, Oct 21, 2025)
b3eaf7c  Merge remote-tracking branch 'upstream/main' into tom/polars-cuda-str… (TomAugspurger, Oct 21, 2025)
5b1e7ef  Merge remote-tracking branch 'upstream/main' into tom/polars-cuda-str… (TomAugspurger, Oct 22, 2025)
78839ac  Merge remote-tracking branch 'upstream/main' into tom/polars-cuda-str… (TomAugspurger, Oct 23, 2025)
b2d17e6  Synchronize when accessing cached scalars (TomAugspurger, Oct 23, 2025)
cc3bd58  Small fixes (TomAugspurger, Oct 23, 2025)
76569ec  add a note (TomAugspurger, Oct 23, 2025)
bbf37e9  Remove caching in Column.nan_count, obj_scalar (TomAugspurger, Oct 23, 2025)
80de395  Merge remote-tracking branch 'upstream/main' into tom/polars-cuda-str… (TomAugspurger, Oct 24, 2025)
617f450  Refactor ConditionalJoin.Predicate.ast (TomAugspurger, Oct 24, 2025)
4bce714  Defer offsets_to_windows computation (TomAugspurger, Oct 24, 2025)
9110f89  Revert "Defer offsets_to_windows computation" (TomAugspurger, Oct 24, 2025)
75dd586  Defer offsets_to_windows calls (TomAugspurger, Oct 24, 2025)
4610a1b  Merge remote-tracking branch 'upstream/main' into tom/polars-cuda-str… (TomAugspurger, Oct 24, 2025)
094795b  Merge remote-tracking branch 'upstream/main' into tom/polars-cuda-str… (TomAugspurger, Oct 24, 2025)
a7b12b0  Revert formatting changes (TomAugspurger, Oct 24, 2025)
064af86  Revert "Refactor ConditionalJoin.Predicate.ast" (TomAugspurger, Oct 24, 2025)
1cd5280  link issue (TomAugspurger, Oct 24, 2025)
316d6cf  fix missing stream (TomAugspurger, Oct 24, 2025)
ec90dde  Merge remote-tracking branch 'upstream/main' into tom/polars-cuda-str… (TomAugspurger, Oct 24, 2025)
c62e41d  Merge remote-tracking branch 'upstream/main' into tom/polars-cuda-str… (TomAugspurger, Oct 25, 2025)
93 changes: 61 additions & 32 deletions python/cudf_polars/cudf_polars/containers/column.py
@@ -5,7 +5,6 @@

from __future__ import annotations

import functools
from typing import TYPE_CHECKING

import polars as pl
@@ -28,6 +27,8 @@
if TYPE_CHECKING:
from typing_extensions import Self

from rmm.pylibrmm.stream import Stream

from cudf_polars.typing import (
ColumnHeader,
ColumnOptions,
@@ -82,6 +83,8 @@ def __init__(
self.name = name
self.dtype = dtype
self.set_sorted(is_sorted=is_sorted, order=order, null_order=null_order)
self._nan_count: int | None = None
self._obj_scalar: plc.Scalar | None = None
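
Context for these new attributes: `functools.cached_property` cannot accept a stream argument, so the PR replaces it with explicit per-instance caches that are filled on first use with whatever stream the caller passes. A minimal sketch of the pattern, assuming a generic compute callable rather than the actual pylibcudf calls:

    from typing import Callable

    class Cached:
        """Sketch: replace cached_property with an explicit, stream-aware cache."""

        def __init__(self, compute: Callable[[object], int]) -> None:
            self._compute = compute          # stand-in for a pylibcudf call
            self._value: int | None = None   # filled lazily, on the caller's stream

        def value(self, stream: object) -> int:
            if self._value is None:
                self._value = self._compute(stream)
            return self._value

Later calls return the cached host-side value regardless of stream, which is why this branch also synchronizes when accessing cached scalars.
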

@classmethod
def deserialize(
@@ -126,6 +129,7 @@ def deserialize_ctor_kwargs(

def serialize(
self,
stream: Stream,
) -> tuple[ColumnHeader, tuple[memoryview[bytes], plc.gpumemoryview]]:
"""
Serialize the Column into header and frames.
@@ -145,7 +149,7 @@
frames
Two-tuple of frames suitable for passing to `plc.contiguous_split.unpack_from_memoryviews`
"""
packed = plc.contiguous_split.pack(plc.Table([self.obj]))
packed = plc.contiguous_split.pack(plc.Table([self.obj]), stream=stream)
header: ColumnHeader = {
"column_kwargs": self.serialize_ctor_kwargs(),
"frame_count": 2,
@@ -162,8 +166,7 @@ def serialize_ctor_kwargs(self) -> ColumnOptions:
"dtype": pl.polars.dtype_str_repr(self.dtype.polars_type),
}

@functools.cached_property
def obj_scalar(self) -> plc.Scalar:
def obj_scalar(self, stream: Stream) -> plc.Scalar:
"""
A copy of the column object as a pylibcudf Scalar.

@@ -178,7 +181,9 @@ def obj_scalar(self) -> plc.Scalar:
"""
if not self.is_scalar:
raise ValueError(f"Cannot convert a column of length {self.size} to scalar")
return plc.copying.get_element(self.obj, 0)
if self._obj_scalar is None:
self._obj_scalar = plc.copying.get_element(self.obj, 0, stream=stream)
return self._obj_scalar

def rename(self, name: str | None, /) -> Self:
"""
@@ -228,6 +233,7 @@ def check_sorted(
*,
order: plc.types.Order,
null_order: plc.types.NullOrder,
stream: Stream,
) -> bool:
"""
Check if the column is sorted.
@@ -238,6 +244,9 @@
The requested sort order.
null_order
Where nulls sort to.
stream
CUDA stream used for device memory operations and kernel launches
on this column. The data in ``self.obj`` must be valid on this stream.

Returns
-------
@@ -254,21 +263,26 @@
return self.order == order and (
self.null_count == 0 or self.null_order == null_order
)
if plc.sorting.is_sorted(plc.Table([self.obj]), [order], [null_order]):
if plc.sorting.is_sorted(
plc.Table([self.obj]), [order], [null_order], stream=stream
):
self.sorted = plc.types.Sorted.YES
self.order = order
self.null_order = null_order
return True
return False

def astype(self, dtype: DataType) -> Column:
def astype(self, dtype: DataType, stream: Stream) -> Column:
"""
Cast the column to the requested dtype.

Parameters
----------
dtype
Datatype to cast to.
stream
CUDA stream used for device memory operations and kernel launches
on this column. The data in ``self.obj`` must be valid on this stream.

Returns
-------
@@ -292,11 +306,15 @@ def astype(self, dtype: DataType) -> Column:
plc_dtype.id() == plc.TypeId.STRING
or self.obj.type().id() == plc.TypeId.STRING
):
return Column(self._handle_string_cast(plc_dtype), dtype=dtype)
return Column(
self._handle_string_cast(plc_dtype, stream=stream), dtype=dtype
)
elif plc.traits.is_integral_not_bool(
self.obj.type()
) and plc.traits.is_timestamp(plc_dtype):
upcasted = plc.unary.cast(self.obj, plc.DataType(plc.TypeId.INT64))
upcasted = plc.unary.cast(
self.obj, plc.DataType(plc.TypeId.INT64), stream=stream
)
plc_col = plc.column.Column(
plc_dtype,
upcasted.size(),
@@ -319,40 +337,44 @@
self.obj.offset(),
self.obj.children(),
)
return Column(plc.unary.cast(plc_col, plc_dtype), dtype=dtype).sorted_like(
self
)
return Column(
plc.unary.cast(plc_col, plc_dtype, stream=stream), dtype=dtype
).sorted_like(self)
else:
result = Column(plc.unary.cast(self.obj, plc_dtype), dtype=dtype)
result = Column(
plc.unary.cast(self.obj, plc_dtype, stream=stream), dtype=dtype
)
if is_order_preserving_cast(self.obj.type(), plc_dtype):
return result.sorted_like(self)
return result
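
A usage sketch for the new `astype` signature; choosing `DEFAULT_STREAM` here is an assumption about the caller, not something this diff prescribes:

    from rmm.pylibrmm.stream import DEFAULT_STREAM

    casted = col.astype(dtype, DEFAULT_STREAM)  # was: col.astype(dtype)

The same stream is threaded through every pylibcudf call the cast makes, so all kernels for one logical operation stay ordered on one stream.
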

def _handle_string_cast(self, dtype: plc.DataType) -> plc.Column:
def _handle_string_cast(self, dtype: plc.DataType, stream: Stream) -> plc.Column:
if dtype.id() == plc.TypeId.STRING:
if is_floating_point(self.obj.type()):
return from_floats(self.obj)
return from_floats(self.obj, stream=stream)
else:
return from_integers(self.obj)
return from_integers(self.obj, stream=stream)
else:
if is_floating_point(dtype):
floats = is_float(self.obj)
floats = is_float(self.obj, stream=stream)
if not plc.reduce.reduce(
floats,
plc.aggregation.all(),
plc.DataType(plc.TypeId.BOOL8),
stream=stream,
).to_py():
raise InvalidOperationError("Conversion from `str` failed.")
return to_floats(self.obj, dtype)
else:
integers = is_integer(self.obj)
integers = is_integer(self.obj, stream=stream)
if not plc.reduce.reduce(
integers,
plc.aggregation.all(),
plc.DataType(plc.TypeId.BOOL8),
stream=stream,
).to_py():
raise InvalidOperationError("Conversion from `str` failed.")
return to_integers(self.obj, dtype)
return to_integers(self.obj, dtype, stream=stream)

def copy_metadata(self, from_: pl.Series, /) -> Self:
"""
@@ -439,28 +461,31 @@ def copy(self) -> Self:
dtype=self.dtype,
)

def mask_nans(self) -> Self:
def mask_nans(self, stream: Stream) -> Self:
"""Return a shallow copy of self with nans masked out."""
if plc.traits.is_floating_point(self.obj.type()):
old_count = self.null_count
mask, new_count = plc.transform.nans_to_nulls(self.obj)
mask, new_count = plc.transform.nans_to_nulls(self.obj, stream=stream)
result = type(self)(self.obj.with_mask(mask, new_count), self.dtype)
if old_count == new_count:
return result.sorted_like(self)
return result
return self.copy()

@functools.cached_property
def nan_count(self) -> int:
def nan_count(self, stream: Stream) -> int:
"""Return the number of NaN values in the column."""
if self.size > 0 and plc.traits.is_floating_point(self.obj.type()):
# See https://github.com/rapidsai/cudf/issues/20202 for why we type ignore
return plc.reduce.reduce(
plc.unary.is_nan(self.obj),
plc.aggregation.sum(),
plc.types.SIZE_TYPE,
).to_py() # type: ignore[return-value]
return 0
if self._nan_count is None:
if self.size > 0 and plc.traits.is_floating_point(self.obj.type()):
# See https://github.com/rapidsai/cudf/issues/20202 for why we type ignore
self._nan_count = plc.reduce.reduce( # type: ignore[assignment]
plc.unary.is_nan(self.obj, stream),
plc.aggregation.sum(),
plc.types.SIZE_TYPE,
stream=stream,
).to_py()
else:
self._nan_count = 0
return self._nan_count # type: ignore[return-value]
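
A hedged usage sketch; `.to_py()` copies the reduction result to the host, so the value cached in `self._nan_count` is a plain Python int:

    from rmm.pylibrmm.stream import DEFAULT_STREAM

    n = col.nan_count(DEFAULT_STREAM)  # first call: device reduction on this stream
    n = col.nan_count(DEFAULT_STREAM)  # later calls: return the cached int
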

@property
def size(self) -> int:
Expand All @@ -472,7 +497,7 @@ def null_count(self) -> int:
"""Return the number of Null values in the column."""
return self.obj.null_count()

def slice(self, zlice: Slice | None) -> Self:
def slice(self, zlice: Slice | None, stream: Stream) -> Self:
"""
Slice a column.

@@ -481,6 +506,9 @@ def slice(self, zlice: Slice | None) -> Self:
zlice
optional, tuple of start and length, negative values of start
treated as for python indexing. If not provided, returns self.
stream
CUDA stream used for device memory operations and kernel launches
on this column. The data in ``self.obj`` must be valid on this stream.

Returns
-------
@@ -491,6 +519,7 @@
(table,) = plc.copying.slice(
plc.Table([self.obj]),
conversion.from_polars_slice(zlice, num_rows=self.size),
stream=stream,
)
(column,) = table.columns()
return type(self)(column, name=self.name, dtype=self.dtype).sorted_like(self)
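
Taken together, every Column method that launches device work now takes the stream explicitly instead of assuming a global default. A hedged end-to-end sketch (a pre-existing Column `col` and the stream choice are assumptions, not part of the diff):

    import pylibcudf as plc
    from rmm.pylibrmm.stream import DEFAULT_STREAM

    sliced = col.slice((0, 10), DEFAULT_STREAM)  # was: col.slice((0, 10))
    is_sorted = col.check_sorted(
        order=plc.types.Order.ASCENDING,
        null_order=plc.types.NullOrder.AFTER,
        stream=DEFAULT_STREAM,
    )
    masked = col.mask_nans(DEFAULT_STREAM)
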
9 changes: 8 additions & 1 deletion python/cudf_polars/cudf_polars/containers/dataframe.py
@@ -252,6 +252,7 @@ def deserialize(

def serialize(
self,
stream: Stream | None = None,
) -> tuple[DataFrameHeader, tuple[memoryview[bytes], plc.gpumemoryview]]:
"""
Serialize the table into header and frames.
@@ -264,14 +265,20 @@
>>> from cudf_polars.experimental.dask_serialize import register
>>> register()

Parameters
----------
stream
CUDA stream used for device memory operations and kernel launches
on this dataframe.

Returns
-------
header
A dict containing any picklable metadata required to reconstruct the object.
frames
Two-tuple of frames suitable for passing to `plc.contiguous_split.unpack_from_memoryviews`
"""
packed = plc.contiguous_split.pack(self.table, stream=self.stream)
packed = plc.contiguous_split.pack(self.table, stream=stream)

# Keyword arguments for `Column.__init__`.
columns_kwargs: list[ColumnOptions] = [
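
A closing usage sketch for the stream-aware `DataFrame.serialize` (an existing cudf-polars DataFrame `df` is assumed; passing no stream keeps the `None` default from the new signature):

    from rmm.pylibrmm.stream import DEFAULT_STREAM

    header, frames = df.serialize(DEFAULT_STREAM)  # packs self.table on the given stream
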