diff --git a/python/cudf_polars/cudf_polars/containers/column.py b/python/cudf_polars/cudf_polars/containers/column.py index e999d17e8fa..3af2de7ca7e 100644 --- a/python/cudf_polars/cudf_polars/containers/column.py +++ b/python/cudf_polars/cudf_polars/containers/column.py @@ -5,7 +5,6 @@ from __future__ import annotations -import functools from typing import TYPE_CHECKING import polars as pl @@ -28,6 +27,8 @@ if TYPE_CHECKING: from typing_extensions import Self + from rmm.pylibrmm.stream import Stream + from cudf_polars.typing import ( ColumnHeader, ColumnOptions, @@ -130,6 +131,7 @@ def deserialize_ctor_kwargs( def serialize( self, + stream: Stream, ) -> tuple[ColumnHeader, tuple[memoryview[bytes], plc.gpumemoryview]]: """ Serialize the Column into header and frames. @@ -149,7 +151,7 @@ def serialize( frames Two-tuple of frames suitable for passing to `plc.contiguous_split.unpack_from_memoryviews` """ - packed = plc.contiguous_split.pack(plc.Table([self.obj])) + packed = plc.contiguous_split.pack(plc.Table([self.obj]), stream=stream) header: ColumnHeader = { "column_kwargs": self.serialize_ctor_kwargs(), "frame_count": 2, @@ -175,11 +177,17 @@ def serialize_ctor_kwargs(self) -> ColumnOptions: "dtype": dtype, } - @functools.cached_property - def obj_scalar(self) -> plc.Scalar: + def obj_scalar(self, stream: Stream) -> plc.Scalar: """ A copy of the column object as a pylibcudf Scalar. + Parameters + ---------- + stream + CUDA stream used for device memory operations and kernel launches. + ``self.obj`` must be valid on this stream, and the result will be + valid on this stream. + Returns ------- pylibcudf Scalar object. @@ -191,7 +199,7 @@ def obj_scalar(self) -> plc.Scalar: """ if not self.is_scalar: raise ValueError(f"Cannot convert a column of length {self.size} to scalar") - return plc.copying.get_element(self.obj, 0) + return plc.copying.get_element(self.obj, 0, stream=stream) def rename(self, name: str | None, /) -> Self: """ @@ -241,6 +249,7 @@ def check_sorted( *, order: plc.types.Order, null_order: plc.types.NullOrder, + stream: Stream, ) -> bool: """ Check if the column is sorted. @@ -251,6 +260,9 @@ def check_sorted( The requested sort order. null_order Where nulls sort to. + stream + CUDA stream used for device memory operations and kernel launches + on this Column. The data in ``self.obj`` must be valid on this stream. Returns ------- @@ -267,14 +279,16 @@ def check_sorted( return self.order == order and ( self.null_count == 0 or self.null_order == null_order ) - if plc.sorting.is_sorted(plc.Table([self.obj]), [order], [null_order]): + if plc.sorting.is_sorted( + plc.Table([self.obj]), [order], [null_order], stream=stream + ): self.sorted = plc.types.Sorted.YES self.order = order self.null_order = null_order return True return False - def astype(self, dtype: DataType) -> Column: + def astype(self, dtype: DataType, stream: Stream) -> Column: """ Cast the column to as the requested dtype. @@ -282,6 +296,9 @@ def astype(self, dtype: DataType) -> Column: ---------- dtype Datatype to cast to. + stream + CUDA stream used for device memory operations and kernel launches + on this Column. The data in ``self.obj`` must be valid on this stream. 
Returns ------- @@ -305,11 +322,15 @@ def astype(self, dtype: DataType) -> Column: plc_dtype.id() == plc.TypeId.STRING or self.obj.type().id() == plc.TypeId.STRING ): - return Column(self._handle_string_cast(plc_dtype), dtype=dtype) + return Column( + self._handle_string_cast(plc_dtype, stream=stream), dtype=dtype + ) elif plc.traits.is_integral_not_bool( self.obj.type() ) and plc.traits.is_timestamp(plc_dtype): - upcasted = plc.unary.cast(self.obj, plc.DataType(plc.TypeId.INT64)) + upcasted = plc.unary.cast( + self.obj, plc.DataType(plc.TypeId.INT64), stream=stream + ) plc_col = plc.column.Column( plc_dtype, upcasted.size(), @@ -332,40 +353,44 @@ def astype(self, dtype: DataType) -> Column: self.obj.offset(), self.obj.children(), ) - return Column(plc.unary.cast(plc_col, plc_dtype), dtype=dtype).sorted_like( - self - ) + return Column( + plc.unary.cast(plc_col, plc_dtype, stream=stream), dtype=dtype + ).sorted_like(self) else: - result = Column(plc.unary.cast(self.obj, plc_dtype), dtype=dtype) + result = Column( + plc.unary.cast(self.obj, plc_dtype, stream=stream), dtype=dtype + ) if is_order_preserving_cast(self.obj.type(), plc_dtype): return result.sorted_like(self) return result - def _handle_string_cast(self, dtype: plc.DataType) -> plc.Column: + def _handle_string_cast(self, dtype: plc.DataType, stream: Stream) -> plc.Column: if dtype.id() == plc.TypeId.STRING: if is_floating_point(self.obj.type()): - return from_floats(self.obj) + return from_floats(self.obj, stream=stream) else: - return from_integers(self.obj) + return from_integers(self.obj, stream=stream) else: if is_floating_point(dtype): - floats = is_float(self.obj) + floats = is_float(self.obj, stream=stream) if not plc.reduce.reduce( floats, plc.aggregation.all(), plc.DataType(plc.TypeId.BOOL8), + stream=stream, ).to_py(): raise InvalidOperationError("Conversion from `str` failed.") return to_floats(self.obj, dtype) else: - integers = is_integer(self.obj) + integers = is_integer(self.obj, stream=stream) if not plc.reduce.reduce( integers, plc.aggregation.all(), plc.DataType(plc.TypeId.BOOL8), + stream=stream, ).to_py(): raise InvalidOperationError("Conversion from `str` failed.") - return to_integers(self.obj, dtype) + return to_integers(self.obj, dtype, stream=stream) def copy_metadata(self, from_: pl.Series, /) -> Self: """ @@ -452,28 +477,44 @@ def copy(self) -> Self: dtype=self.dtype, ) - def mask_nans(self) -> Self: + def mask_nans(self, stream: Stream) -> Self: """Return a shallow copy of self with nans masked out.""" if plc.traits.is_floating_point(self.obj.type()): old_count = self.null_count - mask, new_count = plc.transform.nans_to_nulls(self.obj) + mask, new_count = plc.transform.nans_to_nulls(self.obj, stream=stream) result = type(self)(self.obj.with_mask(mask, new_count), self.dtype) if old_count == new_count: return result.sorted_like(self) return result return self.copy() - @functools.cached_property - def nan_count(self) -> int: - """Return the number of NaN values in the column.""" + def nan_count(self, stream: Stream) -> int: + """ + Return the number of NaN values in the column. + + Parameters + ---------- + stream + CUDA stream used for device memory operations and kernel launches. + ``self.obj`` must be valid on this stream, and the result will be + valid on this stream. + + Returns + ------- + Number of NaN values in the column. 
+ """ + result: int if self.size > 0 and plc.traits.is_floating_point(self.obj.type()): # See https://github.com/rapidsai/cudf/issues/20202 for we type ignore - return plc.reduce.reduce( - plc.unary.is_nan(self.obj), + result = plc.reduce.reduce( # type: ignore[assignment] + plc.unary.is_nan(self.obj, stream=stream), plc.aggregation.sum(), plc.types.SIZE_TYPE, - ).to_py() # type: ignore[return-value] - return 0 + stream=stream, + ).to_py() + else: + result = 0 + return result @property def size(self) -> int: @@ -485,7 +526,7 @@ def null_count(self) -> int: """Return the number of Null values in the column.""" return self.obj.null_count() - def slice(self, zlice: Slice | None) -> Self: + def slice(self, zlice: Slice | None, stream: Stream) -> Self: """ Slice a column. @@ -494,6 +535,9 @@ def slice(self, zlice: Slice | None) -> Self: zlice optional, tuple of start and length, negative values of start treated as for python indexing. If not provided, returns self. + stream + CUDA stream used for device memory operations and kernel launches + on this Column. The data in ``self.obj`` must be valid on this stream. Returns ------- @@ -504,6 +548,7 @@ def slice(self, zlice: Slice | None) -> Self: (table,) = plc.copying.slice( plc.Table([self.obj]), conversion.from_polars_slice(zlice, num_rows=self.size), + stream=stream, ) (column,) = table.columns() return type(self)(column, name=self.name, dtype=self.dtype).sorted_like(self) diff --git a/python/cudf_polars/cudf_polars/containers/dataframe.py b/python/cudf_polars/cudf_polars/containers/dataframe.py index 2b950854ebd..89b5c05973e 100644 --- a/python/cudf_polars/cudf_polars/containers/dataframe.py +++ b/python/cudf_polars/cudf_polars/containers/dataframe.py @@ -260,6 +260,7 @@ def deserialize( def serialize( self, + stream: Stream | None = None, ) -> tuple[DataFrameHeader, tuple[memoryview[bytes], plc.gpumemoryview]]: """ Serialize the table into header and frames. @@ -272,6 +273,12 @@ def serialize( >>> from cudf_polars.experimental.dask_serialize import register >>> register() + Parameters + ---------- + stream + CUDA stream used for device memory operations and kernel launches + on this dataframe. + Returns ------- header @@ -279,7 +286,7 @@ def serialize( frames Two-tuple of frames suitable for passing to `plc.contiguous_split.unpack_from_memoryviews` """ - packed = plc.contiguous_split.pack(self.table, stream=self.stream) + packed = plc.contiguous_split.pack(self.table, stream=stream) # Keyword arguments for `Column.__init__`. 
columns_kwargs: list[ColumnOptions] = [ diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/aggregation.py b/python/cudf_polars/cudf_polars/dsl/expressions/aggregation.py index bc51058e5cb..99ac3ece406 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/aggregation.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/aggregation.py @@ -16,6 +16,8 @@ from cudf_polars.dsl.expressions.literal import Literal if TYPE_CHECKING: + from rmm.pylibrmm.stream import Stream + from cudf_polars.containers import DataFrame, DataType __all__ = ["Agg"] @@ -152,82 +154,100 @@ def agg_request(self) -> plc.aggregation.Aggregation: # noqa: D102 return self.request def _reduce( - self, column: Column, *, request: plc.aggregation.Aggregation + self, column: Column, *, request: plc.aggregation.Aggregation, stream: Stream ) -> Column: if ( self.name in {"mean", "median"} and plc.traits.is_fixed_point(column.dtype.plc_type) and self.dtype.plc_type.id() in {plc.TypeId.FLOAT32, plc.TypeId.FLOAT64} ): - column = column.astype(self.dtype) + column = column.astype(self.dtype, stream=stream) return Column( plc.Column.from_scalar( - plc.reduce.reduce(column.obj, request, self.dtype.plc_type), 1 + plc.reduce.reduce( + column.obj, request, self.dtype.plc_type, stream=stream + ), + 1, + stream=stream, ), name=column.name, dtype=self.dtype, ) - def _count(self, column: Column, *, include_nulls: bool) -> Column: + def _count(self, column: Column, *, include_nulls: bool, stream: Stream) -> Column: null_count = column.null_count if not include_nulls else 0 return Column( plc.Column.from_scalar( - plc.Scalar.from_py(column.size - null_count, self.dtype.plc_type), + plc.Scalar.from_py( + column.size - null_count, self.dtype.plc_type, stream=stream + ), 1, + stream=stream, ), name=column.name, dtype=self.dtype, ) - def _sum(self, column: Column) -> Column: + def _sum(self, column: Column, stream: Stream) -> Column: if column.size == 0 or column.null_count == column.size: return Column( plc.Column.from_scalar( - plc.Scalar.from_py(0, self.dtype.plc_type), + plc.Scalar.from_py(0, self.dtype.plc_type, stream=stream), 1, + stream=stream, ), name=column.name, dtype=self.dtype, ) - return self._reduce(column, request=plc.aggregation.sum()) + return self._reduce(column, request=plc.aggregation.sum(), stream=stream) - def _min(self, column: Column, *, propagate_nans: bool) -> Column: - if propagate_nans and column.nan_count > 0: + def _min(self, column: Column, *, propagate_nans: bool, stream: Stream) -> Column: + nan_count = column.nan_count(stream=stream) + if propagate_nans and nan_count > 0: return Column( plc.Column.from_scalar( - plc.Scalar.from_py(float("nan"), self.dtype.plc_type), + plc.Scalar.from_py( + float("nan"), self.dtype.plc_type, stream=stream + ), 1, + stream=stream, ), name=column.name, dtype=self.dtype, ) - if column.nan_count > 0: - column = column.mask_nans() - return self._reduce(column, request=plc.aggregation.min()) + if nan_count > 0: + column = column.mask_nans(stream=stream) + return self._reduce(column, request=plc.aggregation.min(), stream=stream) - def _max(self, column: Column, *, propagate_nans: bool) -> Column: - if propagate_nans and column.nan_count > 0: + def _max(self, column: Column, *, propagate_nans: bool, stream: Stream) -> Column: + nan_count = column.nan_count(stream=stream) + if propagate_nans and nan_count > 0: return Column( plc.Column.from_scalar( - plc.Scalar.from_py(float("nan"), self.dtype.plc_type), + plc.Scalar.from_py( + float("nan"), self.dtype.plc_type, 
stream=stream + ), 1, + stream=stream, ), name=column.name, dtype=self.dtype, ) - if column.nan_count > 0: - column = column.mask_nans() - return self._reduce(column, request=plc.aggregation.max()) + if nan_count > 0: + column = column.mask_nans(stream=stream) + return self._reduce(column, request=plc.aggregation.max(), stream=stream) - def _first(self, column: Column) -> Column: + def _first(self, column: Column, stream: Stream) -> Column: return Column( - plc.copying.slice(column.obj, [0, 1])[0], name=column.name, dtype=self.dtype + plc.copying.slice(column.obj, [0, 1], stream=stream)[0], + name=column.name, + dtype=self.dtype, ) - def _last(self, column: Column) -> Column: + def _last(self, column: Column, stream: Stream) -> Column: n = column.size return Column( - plc.copying.slice(column.obj, [n - 1, n])[0], + plc.copying.slice(column.obj, [n - 1, n], stream=stream)[0], name=column.name, dtype=self.dtype, ) @@ -244,4 +264,4 @@ def do_evaluate( # Aggregations like quantiles may have additional children that were # preprocessed into pylibcudf requests. child = self.children[0] - return self.op(child.evaluate(df, context=context)) + return self.op(child.evaluate(df, context=context), stream=df.stream) diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/binaryop.py b/python/cudf_polars/cudf_polars/dsl/expressions/binaryop.py index 86a0823f2aa..3a4ab017218 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/binaryop.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/binaryop.py @@ -91,16 +91,21 @@ def do_evaluate( rop: plc.Column | plc.Scalar = right.obj if left.size != right.size: if left.is_scalar: - lop = left.obj_scalar + lop = left.obj_scalar(stream=df.stream) elif right.is_scalar: - rop = right.obj_scalar + rop = right.obj_scalar(stream=df.stream) if plc.traits.is_integral_not_bool(self.dtype.plc_type) and self.op in { plc.binaryop.BinaryOperator.FLOOR_DIV, plc.binaryop.BinaryOperator.PYMOD, }: - if right.obj.size() == 1 and right.obj.to_scalar().to_py() == 0: + if ( + right.obj.size() == 1 + and right.obj.to_scalar(stream=df.stream).to_py() == 0 + ): return Column( - plc.Column.all_null_like(left.obj, left.obj.size()), + plc.Column.all_null_like( + left.obj, left.obj.size(), stream=df.stream + ), dtype=self.dtype, ) @@ -108,13 +113,24 @@ def do_evaluate( rop = plc.replace.find_and_replace_all( right.obj, plc.Column.from_scalar( - plc.Scalar.from_py(0, dtype=self.dtype.plc_type), 1 + plc.Scalar.from_py( + 0, dtype=self.dtype.plc_type, stream=df.stream + ), + 1, + stream=df.stream, ), plc.Column.from_scalar( - plc.Scalar.from_py(None, dtype=self.dtype.plc_type), 1 + plc.Scalar.from_py( + None, dtype=self.dtype.plc_type, stream=df.stream + ), + 1, + stream=df.stream, ), + stream=df.stream, ) return Column( - plc.binaryop.binary_operation(lop, rop, self.op, self.dtype.plc_type), + plc.binaryop.binary_operation( + lop, rop, self.op, self.dtype.plc_type, stream=df.stream + ), dtype=self.dtype, ) diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/boolean.py b/python/cudf_polars/cudf_polars/dsl/expressions/boolean.py index 9aabed45b02..c309c6f9db3 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/boolean.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/boolean.py @@ -24,6 +24,8 @@ import polars.type_aliases as pl_types from polars.polars import _expr_nodes as pl_expr + from rmm.pylibrmm.stream import Stream + from cudf_polars.containers import DataFrame __all__ = ["BooleanFunction"] @@ -101,6 +103,7 @@ def _distinct( keep: 
plc.stream_compaction.DuplicateKeepOption, source_value: plc.Scalar, target_value: plc.Scalar, + stream: Stream, ) -> Column: table = plc.Table([column.obj]) indices = plc.stream_compaction.distinct_indices( @@ -109,12 +112,20 @@ def _distinct( # TODO: polars doesn't expose options for these plc.types.NullEquality.EQUAL, plc.types.NanEquality.ALL_EQUAL, + stream=stream, ) return Column( plc.copying.scatter( [source_value], indices, - plc.Table([plc.Column.from_scalar(target_value, table.num_rows())]), + plc.Table( + [ + plc.Column.from_scalar( + target_value, table.num_rows(), stream=stream + ) + ] + ), + stream=stream, ).columns()[0], dtype=dtype, ) @@ -161,7 +172,9 @@ def do_evaluate( is_finite = self.name is BooleanFunction.Name.IsFinite if not is_float: base = plc.Column.from_scalar( - plc.Scalar.from_py(py_val=is_finite), values.size + plc.Scalar.from_py(py_val=is_finite, stream=df.stream), + values.size, + stream=df.stream, ) out = base.with_mask(values.obj.null_mask(), values.null_count) return Column(out, dtype=self.dtype) @@ -172,10 +185,13 @@ def do_evaluate( nonfinite_values = plc.Column.from_iterable_of_py( to_search, dtype=values.obj.type(), + stream=df.stream, ) - result = plc.search.contains(nonfinite_values, values.obj) + result = plc.search.contains(nonfinite_values, values.obj, stream=df.stream) if is_finite: - result = plc.unary.unary_operation(result, plc.unary.UnaryOperator.NOT) + result = plc.unary.unary_operation( + result, plc.unary.UnaryOperator.NOT, stream=df.stream + ) return Column( result.with_mask(values.obj.null_mask(), values.null_count), dtype=self.dtype, @@ -188,7 +204,9 @@ def do_evaluate( (column,) = columns is_any = self.name is BooleanFunction.Name.Any agg = plc.aggregation.any() if is_any else plc.aggregation.all() - scalar_result = plc.reduce.reduce(column.obj, agg, self.dtype.plc_type) + scalar_result = plc.reduce.reduce( + column.obj, agg, self.dtype.plc_type, stream=df.stream + ) if not ignore_nulls and column.null_count > 0: # Truth tables # Any All @@ -205,15 +223,23 @@ def do_evaluate( # Any All # False || Null => Null True && Null => Null return Column( - plc.Column.all_null_like(column.obj, 1), dtype=self.dtype + plc.Column.all_null_like(column.obj, 1, stream=df.stream), + dtype=self.dtype, ) - return Column(plc.Column.from_scalar(scalar_result, 1), dtype=self.dtype) + return Column( + plc.Column.from_scalar(scalar_result, 1, stream=df.stream), + dtype=self.dtype, + ) if self.name is BooleanFunction.Name.IsNull: (column,) = columns - return Column(plc.unary.is_null(column.obj), dtype=self.dtype) + return Column( + plc.unary.is_null(column.obj, stream=df.stream), dtype=self.dtype + ) elif self.name is BooleanFunction.Name.IsNotNull: (column,) = columns - return Column(plc.unary.is_valid(column.obj), dtype=self.dtype) + return Column( + plc.unary.is_valid(column.obj, stream=df.stream), dtype=self.dtype + ) elif self.name in (BooleanFunction.Name.IsNan, BooleanFunction.Name.IsNotNan): (column,) = columns is_float = column.obj.type().id() in ( @@ -230,9 +256,11 @@ def do_evaluate( else: base = plc.Column.from_scalar( plc.Scalar.from_py( - py_val=self.name is not BooleanFunction.Name.IsNan + py_val=self.name is not BooleanFunction.Name.IsNan, + stream=df.stream, ), column.size, + stream=df.stream, ) out = base.with_mask(column.obj.null_mask(), column.null_count) return Column(out, dtype=self.dtype) @@ -242,10 +270,13 @@ def do_evaluate( column, dtype=self.dtype, keep=plc.stream_compaction.DuplicateKeepOption.KEEP_FIRST, - 
source_value=plc.Scalar.from_py(py_val=True, dtype=self.dtype.plc_type), + source_value=plc.Scalar.from_py( + py_val=True, dtype=self.dtype.plc_type, stream=df.stream + ), target_value=plc.Scalar.from_py( - py_val=False, dtype=self.dtype.plc_type + py_val=False, dtype=self.dtype.plc_type, stream=df.stream ), + stream=df.stream, ) elif self.name is BooleanFunction.Name.IsLastDistinct: (column,) = columns @@ -253,10 +284,15 @@ def do_evaluate( column, dtype=self.dtype, keep=plc.stream_compaction.DuplicateKeepOption.KEEP_LAST, - source_value=plc.Scalar.from_py(py_val=True, dtype=self.dtype.plc_type), + source_value=plc.Scalar.from_py( + py_val=True, dtype=self.dtype.plc_type, stream=df.stream + ), target_value=plc.Scalar.from_py( - py_val=False, dtype=self.dtype.plc_type + py_val=False, + dtype=self.dtype.plc_type, + stream=df.stream, ), + stream=df.stream, ) elif self.name is BooleanFunction.Name.IsUnique: (column,) = columns @@ -264,10 +300,13 @@ def do_evaluate( column, dtype=self.dtype, keep=plc.stream_compaction.DuplicateKeepOption.KEEP_NONE, - source_value=plc.Scalar.from_py(py_val=True, dtype=self.dtype.plc_type), + source_value=plc.Scalar.from_py( + py_val=True, dtype=self.dtype.plc_type, stream=df.stream + ), target_value=plc.Scalar.from_py( - py_val=False, dtype=self.dtype.plc_type + py_val=False, dtype=self.dtype.plc_type, stream=df.stream ), + stream=df.stream, ) elif self.name is BooleanFunction.Name.IsDuplicated: (column,) = columns @@ -276,9 +315,12 @@ def do_evaluate( dtype=self.dtype, keep=plc.stream_compaction.DuplicateKeepOption.KEEP_NONE, source_value=plc.Scalar.from_py( - py_val=False, dtype=self.dtype.plc_type + py_val=False, dtype=self.dtype.plc_type, stream=df.stream + ), + target_value=plc.Scalar.from_py( + py_val=True, dtype=self.dtype.plc_type, stream=df.stream ), - target_value=plc.Scalar.from_py(py_val=True, dtype=self.dtype.plc_type), + stream=df.stream, ) elif self.name is BooleanFunction.Name.AllHorizontal: return Column( @@ -311,23 +353,30 @@ def do_evaluate( haystack = Column( haystack.obj.children()[1], dtype=DataType(haystack.dtype.polars_type.inner), - ).astype(needles.dtype) + ).astype(needles.dtype, stream=df.stream) if haystack.size: return Column( plc.search.contains( haystack.obj, needles.obj, + stream=df.stream, ), dtype=self.dtype, ) return Column( - plc.Column.from_scalar(plc.Scalar.from_py(py_val=False), needles.size), + plc.Column.from_scalar( + plc.Scalar.from_py(py_val=False, stream=df.stream), + needles.size, + stream=df.stream, + ), dtype=self.dtype, ) elif self.name is BooleanFunction.Name.Not: (column,) = columns return Column( - plc.unary.unary_operation(column.obj, plc.unary.UnaryOperator.NOT), + plc.unary.unary_operation( + column.obj, plc.unary.UnaryOperator.NOT, stream=df.stream + ), dtype=self.dtype, ) else: diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/datetime.py b/python/cudf_polars/cudf_polars/dsl/expressions/datetime.py index b36ab7da033..098e8f21673 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/datetime.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/datetime.py @@ -141,14 +141,18 @@ def do_evaluate( (column,) = columns if self.name is TemporalFunction.Name.CastTimeUnit: return Column( - plc.unary.cast(column.obj, self.dtype.plc_type), dtype=self.dtype + plc.unary.cast(column.obj, self.dtype.plc_type, stream=df.stream), + dtype=self.dtype, ) if self.name == TemporalFunction.Name.ToString: return Column( plc.strings.convert.convert_datetime.from_timestamps( column.obj, self.options[0], - 
plc.Column.from_iterable_of_py([], dtype=self.dtype.plc_type), + plc.Column.from_iterable_of_py( + [], dtype=self.dtype.plc_type, stream=df.stream + ), + stream=df.stream, ), dtype=self.dtype, ) @@ -158,10 +162,12 @@ def do_evaluate( column.obj, format="%V", input_strings_names=plc.Column.from_iterable_of_py( - [], dtype=plc.DataType(plc.TypeId.STRING) + [], dtype=plc.DataType(plc.TypeId.STRING), stream=df.stream ), + stream=df.stream, ), self.dtype.plc_type, + stream=df.stream, ) return Column(result, dtype=self.dtype) if self.name is TemporalFunction.Name.IsoYear: @@ -170,97 +176,117 @@ def do_evaluate( column.obj, format="%G", input_strings_names=plc.Column.from_iterable_of_py( - [], dtype=plc.DataType(plc.TypeId.STRING) + [], dtype=plc.DataType(plc.TypeId.STRING), stream=df.stream ), + stream=df.stream, ), self.dtype.plc_type, + stream=df.stream, ) return Column(result, dtype=self.dtype) if self.name is TemporalFunction.Name.MonthStart: - ends = plc.datetime.last_day_of_month(column.obj) - days_to_subtract = plc.datetime.days_in_month(column.obj) + ends = plc.datetime.last_day_of_month(column.obj, stream=df.stream) + days_to_subtract = plc.datetime.days_in_month(column.obj, stream=df.stream) # must subtract 1 to avoid rolling over to the previous month days_to_subtract = plc.binaryop.binary_operation( days_to_subtract, - plc.Scalar.from_py(1, plc.DataType(plc.TypeId.INT32)), + plc.Scalar.from_py(1, plc.DataType(plc.TypeId.INT32), stream=df.stream), plc.binaryop.BinaryOperator.SUB, plc.DataType(plc.TypeId.DURATION_DAYS), + stream=df.stream, ) result = plc.binaryop.binary_operation( ends, days_to_subtract, plc.binaryop.BinaryOperator.SUB, self.dtype.plc_type, + stream=df.stream, ) return Column(result, dtype=self.dtype) if self.name is TemporalFunction.Name.MonthEnd: return Column( plc.unary.cast( - plc.datetime.last_day_of_month(column.obj), self.dtype.plc_type + plc.datetime.last_day_of_month(column.obj, stream=df.stream), + self.dtype.plc_type, + stream=df.stream, ), dtype=self.dtype, ) if self.name is TemporalFunction.Name.IsLeapYear: return Column( - plc.datetime.is_leap_year(column.obj), + plc.datetime.is_leap_year(column.obj, stream=df.stream), dtype=self.dtype, ) if self.name is TemporalFunction.Name.OrdinalDay: - return Column(plc.datetime.day_of_year(column.obj), dtype=self.dtype) + return Column( + plc.datetime.day_of_year(column.obj, stream=df.stream), dtype=self.dtype + ) if self.name is TemporalFunction.Name.Microsecond: millis = plc.datetime.extract_datetime_component( - column.obj, plc.datetime.DatetimeComponent.MILLISECOND + column.obj, plc.datetime.DatetimeComponent.MILLISECOND, stream=df.stream ) micros = plc.datetime.extract_datetime_component( - column.obj, plc.datetime.DatetimeComponent.MICROSECOND + column.obj, plc.datetime.DatetimeComponent.MICROSECOND, stream=df.stream ) millis_as_micros = plc.binaryop.binary_operation( millis, - plc.Scalar.from_py(1_000, plc.DataType(plc.TypeId.INT32)), + plc.Scalar.from_py( + 1_000, plc.DataType(plc.TypeId.INT32), stream=df.stream + ), plc.binaryop.BinaryOperator.MUL, self.dtype.plc_type, + stream=df.stream, ) total_micros = plc.binaryop.binary_operation( micros, millis_as_micros, plc.binaryop.BinaryOperator.ADD, self.dtype.plc_type, + stream=df.stream, ) return Column(total_micros, dtype=self.dtype) elif self.name is TemporalFunction.Name.Nanosecond: millis = plc.datetime.extract_datetime_component( - column.obj, plc.datetime.DatetimeComponent.MILLISECOND + column.obj, plc.datetime.DatetimeComponent.MILLISECOND, 
stream=df.stream ) micros = plc.datetime.extract_datetime_component( - column.obj, plc.datetime.DatetimeComponent.MICROSECOND + column.obj, plc.datetime.DatetimeComponent.MICROSECOND, stream=df.stream ) nanos = plc.datetime.extract_datetime_component( - column.obj, plc.datetime.DatetimeComponent.NANOSECOND + column.obj, plc.datetime.DatetimeComponent.NANOSECOND, stream=df.stream ) millis_as_nanos = plc.binaryop.binary_operation( millis, - plc.Scalar.from_py(1_000_000, plc.DataType(plc.TypeId.INT32)), + plc.Scalar.from_py( + 1_000_000, plc.DataType(plc.TypeId.INT32), stream=df.stream + ), plc.binaryop.BinaryOperator.MUL, self.dtype.plc_type, + stream=df.stream, ) micros_as_nanos = plc.binaryop.binary_operation( micros, - plc.Scalar.from_py(1_000, plc.DataType(plc.TypeId.INT32)), + plc.Scalar.from_py( + 1_000, plc.DataType(plc.TypeId.INT32), stream=df.stream + ), plc.binaryop.BinaryOperator.MUL, self.dtype.plc_type, + stream=df.stream, ) total_nanos = plc.binaryop.binary_operation( nanos, millis_as_nanos, plc.binaryop.BinaryOperator.ADD, self.dtype.plc_type, + stream=df.stream, ) total_nanos = plc.binaryop.binary_operation( total_nanos, micros_as_nanos, plc.binaryop.BinaryOperator.ADD, self.dtype.plc_type, + stream=df.stream, ) return Column(total_nanos, dtype=self.dtype) @@ -268,6 +294,7 @@ def do_evaluate( plc.datetime.extract_datetime_component( column.obj, self._COMPONENT_MAP[self.name], + stream=df.stream, ), dtype=self.dtype, ) diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/literal.py b/python/cudf_polars/cudf_polars/dsl/expressions/literal.py index 315905bbb7d..20a18f0f646 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/literal.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/literal.py @@ -44,7 +44,9 @@ def do_evaluate( """Evaluate this expression given a dataframe for context.""" return Column( plc.Column.from_scalar( - plc.Scalar.from_py(self.value, self.dtype.plc_type), 1 + plc.Scalar.from_py(self.value, self.dtype.plc_type, stream=df.stream), + 1, + stream=df.stream, ), dtype=self.dtype, ) @@ -90,7 +92,9 @@ def do_evaluate( self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME ) -> Column: """Evaluate this expression given a dataframe for context.""" - return Column(plc.Column.from_arrow(self.value), dtype=self.dtype) + return Column( + plc.Column.from_arrow(self.value, stream=df.stream), dtype=self.dtype + ) @property def agg_request(self) -> NoReturn: # noqa: D102 diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/rolling.py b/python/cudf_polars/cudf_polars/dsl/expressions/rolling.py index bdff3470791..a60031e999f 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/rolling.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/rolling.py @@ -17,11 +17,17 @@ from cudf_polars.dsl import expr from cudf_polars.dsl.expressions.base import ExecutionContext, Expr from cudf_polars.dsl.utils.reshape import broadcast -from cudf_polars.dsl.utils.windows import offsets_to_windows, range_window_bounds +from cudf_polars.dsl.utils.windows import ( + duration_to_int, + offsets_to_windows, + range_window_bounds, +) if TYPE_CHECKING: from collections.abc import Sequence + from rmm.pylibrmm.stream import Stream + from cudf_polars.typing import ClosedInterval, Duration __all__ = ["GroupedRollingWindow", "RollingWindow", "to_request"] @@ -89,12 +95,12 @@ def to_request( class RollingWindow(Expr): __slots__ = ( "closed_window", - "following", + "following_ordinal", "offset", "orderby", "orderby_dtype", "period", - "preceding", + 
"preceding_ordinal", ) _non_child = ( "dtype", @@ -123,9 +129,11 @@ def __init__( # within `__init__`). self.offset = offset self.period = period - self.preceding, self.following = offsets_to_windows( - orderby_dtype, offset, period - ) + self.orderby_dtype = orderby_dtype + self.offset = offset + self.period = period + self.preceding_ordinal = duration_to_int(orderby_dtype, *offset) + self.following_ordinal = duration_to_int(orderby_dtype, *period) self.closed_window = closed_window self.orderby = orderby self.children = (agg,) @@ -153,18 +161,28 @@ def do_evaluate( # noqa: D102 plc.traits.is_integral(orderby.obj.type()) and orderby.obj.type().id() != plc.TypeId.INT64 ): - orderby_obj = plc.unary.cast(orderby.obj, plc.DataType(plc.TypeId.INT64)) + orderby_obj = plc.unary.cast( + orderby.obj, plc.DataType(plc.TypeId.INT64), stream=df.stream + ) else: orderby_obj = orderby.obj + preceding_scalar, following_scalar = offsets_to_windows( + self.orderby_dtype, + self.preceding_ordinal, + self.following_ordinal, + stream=df.stream, + ) preceding, following = range_window_bounds( - self.preceding, self.following, self.closed_window + preceding_scalar, following_scalar, self.closed_window ) if orderby.obj.null_count() != 0: raise RuntimeError( f"Index column '{self.orderby}' in rolling may not contain nulls" ) if not orderby.check_sorted( - order=plc.types.Order.ASCENDING, null_order=plc.types.NullOrder.BEFORE + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.BEFORE, + stream=df.stream, ): raise RuntimeError( f"Index column '{self.orderby}' in rolling is not sorted, please sort first" @@ -177,6 +195,7 @@ def do_evaluate( # noqa: D102 preceding, following, [to_request(agg, orderby, df)], + stream=df.stream, ).columns() return Column(result, dtype=self.dtype) @@ -306,6 +325,7 @@ def _( plc.Table([val_col]), order_index, plc.copying.OutOfBoundsPolicy.NULLIFY, + stream=df.stream, ).columns()[0] assert isinstance(rank_expr, expr.UnaryFunction) method_str, descending, _ = rank_expr.options @@ -364,6 +384,7 @@ def _( # type: ignore[no-untyped-def] plc.Table(plc_cols), op.order_index, plc.copying.OutOfBoundsPolicy.NULLIFY, + stream=df.stream, ) else: vals_tbl = plc.Table(plc_cols) @@ -404,6 +425,7 @@ def _( # type: ignore[no-untyped-def] plc.Table(plc_cols), order_index, plc.copying.OutOfBoundsPolicy.NULLIFY, + stream=df.stream, ).columns() else: val_cols = [ @@ -433,6 +455,7 @@ def _reorder_to_input( rank_out_dtypes: list[DataType], *, order_index: plc.Column | None = None, + stream: Stream, ) -> list[Column]: # Reorder scan results from grouped-order back to input row order if order_index is None: @@ -442,6 +465,7 @@ def _reorder_to_input( plc.Table([*(c.obj for c in by_cols), row_id]), [*key_orders, plc.types.Order.ASCENDING], [*key_nulls, plc.types.NullOrder.AFTER], + stream=stream, ) return [ @@ -452,11 +476,15 @@ def _reorder_to_input( plc.Table( [ plc.Column.from_scalar( - plc.Scalar.from_py(None, tbl.columns()[0].type()), + plc.Scalar.from_py( + None, tbl.columns()[0].type(), stream=stream + ), n_rows, + stream=stream, ) ] ), + stream=stream, ).columns()[0], name=name, dtype=dtype, @@ -495,6 +523,7 @@ def _build_window_order_index( ob_nulls_last: bool, value_col: plc.Column | None = None, value_desc: bool = False, + stream: Stream, ) -> plc.Column: """Compute a stable row ordering for unary operations in a grouped context.""" cols: list[plc.Column] = [c.obj for c in by_cols] @@ -527,17 +556,18 @@ def _build_window_order_index( orders.append(plc.types.Order.ASCENDING) 
nulls.append(plc.types.NullOrder.AFTER) - return plc.sorting.stable_sorted_order(plc.Table(cols), orders, nulls) + return plc.sorting.stable_sorted_order( + plc.Table(cols), orders, nulls, stream=stream + ) def _gather_columns( - self, - cols: Sequence[Column], - order_index: plc.Column, + self, cols: Sequence[Column], order_index: plc.Column, stream: Stream ) -> list[Column]: gathered_tbl = plc.copying.gather( plc.Table([c.obj for c in cols]), order_index, plc.copying.OutOfBoundsPolicy.NULLIFY, + stream=stream, ) return [ @@ -561,6 +591,7 @@ def _grouped_window_scan_setup( ob_desc: bool, ob_nulls_last: bool, grouper: plc.groupby.GroupBy, + stream: Stream, ) -> tuple[plc.Column | None, list[Column] | None, plc.groupby.GroupBy]: if order_by_col is None: # keep the original ordering @@ -571,8 +602,9 @@ def _grouped_window_scan_setup( order_by_col=order_by_col, ob_desc=ob_desc, ob_nulls_last=ob_nulls_last, + stream=stream, ) - by_cols_for_scan = self._gather_columns(by_cols, order_index) + by_cols_for_scan = self._gather_columns(by_cols, order_index, stream=stream) assert by_cols_for_scan is not None local = self._sorted_grouper(by_cols_for_scan) return order_index, by_cols_for_scan, local @@ -584,12 +616,13 @@ def _broadcast_agg_results( value_tbls: list[plc.Table], names: list[str], dtypes: list[DataType], + stream: Stream, ) -> list[Column]: # We do a left-join between the input keys to group-keys # so every input row appears exactly once. left_order is # returned un-ordered by libcudf. left_order, right_order = plc.join.left_join( - by_tbl, group_keys_tbl, plc.types.NullEquality.EQUAL + by_tbl, group_keys_tbl, plc.types.NullEquality.EQUAL, stream ) # Scatter the right order indices into an all-null table @@ -597,12 +630,13 @@ def _broadcast_agg_results( # have the map between rows and groups with the correct ordering. 
left_rows = left_order.size() target = plc.Column.from_scalar( - plc.Scalar.from_py(None, plc.types.SIZE_TYPE), left_rows + plc.Scalar.from_py(None, plc.types.SIZE_TYPE, stream), left_rows, stream ) aligned_map = plc.copying.scatter( plc.Table([right_order]), left_order, plc.Table([target]), + stream, ).columns()[0] # Broadcast each scalar aggregated result back to row-shape using @@ -614,6 +648,7 @@ def _broadcast_agg_results( plc.Table([col]), aligned_map, plc.copying.OutOfBoundsPolicy.NULLIFY, + stream, ).columns()[0], name=name, dtype=dtype, @@ -651,7 +686,10 @@ def _build_groupby_requests( if order_index is not None and eval_cols: eval_cols = plc.copying.gather( - plc.Table(eval_cols), order_index, plc.copying.OutOfBoundsPolicy.NULLIFY + plc.Table(eval_cols), + order_index, + plc.copying.OutOfBoundsPolicy.NULLIFY, + stream=df.stream, ).columns() gathered_iter = iter(eval_cols) @@ -680,9 +718,12 @@ def do_evaluate( # noqa: D102 by_cols = broadcast( *(b.evaluate(df) for b in by_exprs), target_length=df.num_rows, + stream=df.stream, ) order_by_col = ( - broadcast(order_by_expr.evaluate(df), target_length=df.num_rows)[0] + broadcast( + order_by_expr.evaluate(df), target_length=df.num_rows, stream=df.stream + )[0] if order_by_expr is not None else None ) @@ -724,14 +765,20 @@ def do_evaluate( # noqa: D102 group_keys_tbl, value_tables = grouper.aggregate(gb_requests) broadcasted_cols = self._broadcast_agg_results( - by_tbl, group_keys_tbl, value_tables, out_names, out_dtypes + by_tbl, + group_keys_tbl, + value_tables, + out_names, + out_dtypes, + df.stream, ) if order_sensitive: row_id = plc.filling.sequence( df.num_rows, - plc.Scalar.from_py(0, plc.types.SIZE_TYPE), - plc.Scalar.from_py(1, plc.types.SIZE_TYPE), + plc.Scalar.from_py(0, plc.types.SIZE_TYPE, stream=df.stream), + plc.Scalar.from_py(1, plc.types.SIZE_TYPE, stream=df.stream), + stream=df.stream, ) _, _, ob_desc, ob_nulls_last = self.options order_index, _, local = self._grouped_window_scan_setup( @@ -741,6 +788,7 @@ def do_evaluate( # noqa: D102 ob_desc=ob_desc, ob_nulls_last=ob_nulls_last, grouper=grouper, + stream=df.stream, ) assert order_index is not None @@ -756,13 +804,15 @@ def do_evaluate( # noqa: D102 value_tables_local, out_names, out_dtypes, + df.stream, ) ) row_id = plc.filling.sequence( df.num_rows, - plc.Scalar.from_py(0, plc.types.SIZE_TYPE), - plc.Scalar.from_py(1, plc.types.SIZE_TYPE), + plc.Scalar.from_py(0, plc.types.SIZE_TYPE, stream=df.stream), + plc.Scalar.from_py(1, plc.types.SIZE_TYPE, stream=df.stream), + stream=df.stream, ) if rank_named := unary_window_ops["rank"]: @@ -784,8 +834,11 @@ def do_evaluate( # noqa: D102 df, context=ExecutionContext.FRAME ).obj, value_desc=desc, + stream=df.stream, + ) + rank_by_cols_for_scan = self._gather_columns( + by_cols, order_index, stream=df.stream ) - rank_by_cols_for_scan = self._gather_columns(by_cols, order_index) local = GroupedRollingWindow._sorted_grouper(rank_by_cols_for_scan) names, dtypes, tables = self._apply_unary_op( RankOp( @@ -806,6 +859,7 @@ def do_evaluate( # noqa: D102 names, dtypes, order_index=order_index, + stream=df.stream, ) ) else: @@ -818,7 +872,13 @@ def do_evaluate( # noqa: D102 ) broadcasted_cols.extend( self._reorder_to_input( - row_id, by_cols, df.num_rows, tables, names, dtypes + row_id, + by_cols, + df.num_rows, + tables, + names, + dtypes, + stream=df.stream, ) ) @@ -837,6 +897,7 @@ def do_evaluate( # noqa: D102 if self._order_by_expr is not None else False, grouper=grouper, + stream=df.stream, ) ) @@ -872,6 +933,7 @@ def do_evaluate( 
# noqa: D102 names, dtypes, order_index=order_index, + stream=df.stream, ) ) @@ -890,6 +952,7 @@ def do_evaluate( # noqa: D102 if self._order_by_expr is not None else False, grouper=grouper, + stream=df.stream, ) ) names, dtypes, tables = self._apply_unary_op( @@ -911,6 +974,7 @@ def do_evaluate( # noqa: D102 names, dtypes, order_index=order_index, + stream=df.stream, ) ) diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/selection.py b/python/cudf_polars/cudf_polars/dsl/expressions/selection.py index 68af9c4227f..acc78913097 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/selection.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/selection.py @@ -36,19 +36,22 @@ def do_evaluate( child.evaluate(df, context=context) for child in self.children ) n = values.size - lo, hi = plc.reduce.minmax(indices.obj) + lo, hi = plc.reduce.minmax(indices.obj, stream=df.stream) if hi.to_py() >= n or lo.to_py() < -n: # type: ignore[operator] raise ValueError("gather indices are out of bounds") if indices.null_count: bounds_policy = plc.copying.OutOfBoundsPolicy.NULLIFY obj = plc.replace.replace_nulls( indices.obj, - plc.Scalar.from_py(n, dtype=indices.obj.type()), + plc.Scalar.from_py(n, dtype=indices.obj.type(), stream=df.stream), + stream=df.stream, ) else: bounds_policy = plc.copying.OutOfBoundsPolicy.DONT_CHECK obj = indices.obj - table = plc.copying.gather(plc.Table([values.obj]), obj, bounds_policy) + table = plc.copying.gather( + plc.Table([values.obj]), obj, bounds_policy, stream=df.stream + ) return Column(table.columns()[0], dtype=self.dtype) @@ -67,6 +70,6 @@ def do_evaluate( """Evaluate this expression given a dataframe for context.""" values, mask = (child.evaluate(df, context=context) for child in self.children) table = plc.stream_compaction.apply_boolean_mask( - plc.Table([values.obj]), mask.obj + plc.Table([values.obj]), mask.obj, stream=df.stream ) return Column(table.columns()[0], dtype=self.dtype).sorted_like(values) diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/slicing.py b/python/cudf_polars/cudf_polars/dsl/expressions/slicing.py index 66df8e5291f..83b5c81d5fc 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/slicing.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/slicing.py @@ -43,4 +43,4 @@ def do_evaluate( """Evaluate this expression given a dataframe for context.""" (child,) = self.children column = child.evaluate(df, context=context) - return column.slice((self.offset, self.length)) + return column.slice((self.offset, self.length), stream=df.stream) diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/string.py b/python/cudf_polars/cudf_polars/dsl/expressions/string.py index a40afc644ee..92b8a61d533 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/string.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/string.py @@ -315,7 +315,7 @@ def do_evaluate( columns = [ Column( child.evaluate(df, context=context).obj, dtype=child.dtype - ).astype(self.dtype) + ).astype(self.dtype, stream=df.stream) for child in self.children ] @@ -323,6 +323,7 @@ def do_evaluate( broadcasted = broadcast( *columns, target_length=max(non_unit_sizes) if non_unit_sizes else None, + stream=df.stream, ) delimiter, ignore_nulls = self.options @@ -330,26 +331,39 @@ def do_evaluate( return Column( plc.strings.combine.concatenate( plc.Table([col.obj for col in broadcasted]), - plc.Scalar.from_py(delimiter, self.dtype.plc_type), + plc.Scalar.from_py( + delimiter, self.dtype.plc_type, stream=df.stream + ), None if ignore_nulls - else 
plc.Scalar.from_py(None, self.dtype.plc_type), + else plc.Scalar.from_py( + None, self.dtype.plc_type, stream=df.stream + ), None, plc.strings.combine.SeparatorOnNulls.NO, + stream=df.stream, ), dtype=self.dtype, ) elif self.name is StringFunction.Name.ConcatVertical: (child,) = self.children - column = child.evaluate(df, context=context).astype(self.dtype) + column = child.evaluate(df, context=context).astype( + self.dtype, stream=df.stream + ) delimiter, ignore_nulls = self.options if column.null_count > 0 and not ignore_nulls: - return Column(plc.Column.all_null_like(column.obj, 1), dtype=self.dtype) + return Column( + plc.Column.all_null_like(column.obj, 1, stream=df.stream), + dtype=self.dtype, + ) return Column( plc.strings.combine.join_strings( column.obj, - plc.Scalar.from_py(delimiter, self.dtype.plc_type), - plc.Scalar.from_py(None, self.dtype.plc_type), + plc.Scalar.from_py( + delimiter, self.dtype.plc_type, stream=df.stream + ), + plc.Scalar.from_py(None, self.dtype.plc_type, stream=df.stream), + stream=df.stream, ), dtype=self.dtype, ) @@ -358,18 +372,24 @@ def do_evaluate( # polars pads based on bytes, libcudf by visual width # only pass chars if the visual width matches the byte length column = self.children[0].evaluate(df, context=context) - col_len_bytes = plc.strings.attributes.count_bytes(column.obj) - col_len_chars = plc.strings.attributes.count_characters(column.obj) + col_len_bytes = plc.strings.attributes.count_bytes( + column.obj, stream=df.stream + ) + col_len_chars = plc.strings.attributes.count_characters( + column.obj, stream=df.stream + ) equal = plc.binaryop.binary_operation( col_len_bytes, col_len_chars, plc.binaryop.BinaryOperator.NULL_EQUALS, plc.DataType(plc.TypeId.BOOL8), + stream=df.stream, ) if not plc.reduce.reduce( equal, plc.aggregation.all(), plc.DataType(plc.TypeId.BOOL8), + stream=df.stream, ).to_py(): raise InvalidOperationError( "zfill only supports ascii strings with no unicode characters" @@ -380,22 +400,31 @@ def do_evaluate( if width.value is None: return Column( plc.Column.from_scalar( - plc.Scalar.from_py(None, self.dtype.plc_type), + plc.Scalar.from_py( + None, self.dtype.plc_type, stream=df.stream + ), column.size, + stream=df.stream, ), self.dtype, ) return Column( - plc.strings.padding.zfill(column.obj, width.value), self.dtype + plc.strings.padding.zfill( + column.obj, width.value, stream=df.stream + ), + self.dtype, ) else: col_width = self.children[1].evaluate(df, context=context) assert isinstance(col_width, Column) all_gt_0 = plc.binaryop.binary_operation( col_width.obj, - plc.Scalar.from_py(0, plc.DataType(plc.TypeId.INT64)), + plc.Scalar.from_py( + 0, plc.DataType(plc.TypeId.INT64), stream=df.stream + ), plc.binaryop.BinaryOperator.GREATER_EQUAL, plc.DataType(plc.TypeId.BOOL8), + stream=df.stream, ) if ( @@ -404,12 +433,15 @@ def do_evaluate( all_gt_0, plc.aggregation.all(), plc.DataType(plc.TypeId.BOOL8), + stream=df.stream, ).to_py() ): # pragma: no cover raise InvalidOperationError("fill conversion failed.") return Column( - plc.strings.padding.zfill_by_widths(column.obj, col_width.obj), + plc.strings.padding.zfill_by_widths( + column.obj, col_width.obj, stream=df.stream + ), self.dtype, ) @@ -421,16 +453,19 @@ def do_evaluate( if literal: pat = arg.evaluate(df, context=context) pattern = ( - pat.obj_scalar + pat.obj_scalar(stream=df.stream) if pat.is_scalar and pat.size != column.size else pat.obj ) return Column( - plc.strings.find.contains(column.obj, pattern), dtype=self.dtype + plc.strings.find.contains(column.obj, 
pattern, stream=df.stream), + dtype=self.dtype, ) else: return Column( - plc.strings.contains.contains_re(column.obj, self._regex_program), + plc.strings.contains.contains_re( + column.obj, self._regex_program, stream=df.stream + ), dtype=self.dtype, ) elif self.name is StringFunction.Name.ContainsAny: @@ -439,16 +474,18 @@ def do_evaluate( plc_column = child.evaluate(df, context=context).obj plc_targets = arg.evaluate(df, context=context).obj if ascii_case_insensitive: - plc_column = plc.strings.case.to_lower(plc_column) - plc_targets = plc.strings.case.to_lower(plc_targets) + plc_column = plc.strings.case.to_lower(plc_column, stream=df.stream) + plc_targets = plc.strings.case.to_lower(plc_targets, stream=df.stream) contains = plc.strings.find_multiple.contains_multiple( plc_column, plc_targets, + stream=df.stream, ) binary_or = functools.partial( plc.binaryop.binary_operation, op=plc.binaryop.BinaryOperator.BITWISE_OR, output_type=self.dtype.plc_type, + stream=df.stream, ) return Column( functools.reduce(binary_or, contains.columns()), @@ -459,8 +496,11 @@ def do_evaluate( plc_column = child.evaluate(df, context=context).obj return Column( plc.unary.cast( - plc.strings.contains.count_re(plc_column, self._regex_program), + plc.strings.contains.count_re( + plc_column, self._regex_program, stream=df.stream + ), self.dtype.plc_type, + stream=df.stream, ), dtype=self.dtype, ) @@ -469,15 +509,14 @@ def do_evaluate( plc_column = self.children[0].evaluate(df, context=context).obj return Column( plc.strings.extract.extract_single( - plc_column, self._regex_program, group_index - 1 + plc_column, self._regex_program, group_index - 1, stream=df.stream ), dtype=self.dtype, ) elif self.name is StringFunction.Name.ExtractGroups: plc_column = self.children[0].evaluate(df, context=context).obj plc_table = plc.strings.extract.extract( - plc_column, - self._regex_program, + plc_column, self._regex_program, stream=df.stream ) return Column( plc.Column.struct_from_children(plc_table.columns()), @@ -491,33 +530,40 @@ def do_evaluate( assert isinstance(expr, Literal) plc_column = plc.strings.find.find( plc_column, - plc.Scalar.from_py(expr.value, expr.dtype.plc_type), + plc.Scalar.from_py( + expr.value, expr.dtype.plc_type, stream=df.stream + ), + stream=df.stream, ) else: plc_column = plc.strings.findall.find_re( - plc_column, - self._regex_program, + plc_column, self._regex_program, stream=df.stream ) # Polars returns None for not found, libcudf returns -1 new_mask, null_count = plc.transform.bools_to_mask( plc.binaryop.binary_operation( plc_column, - plc.Scalar.from_py(-1, plc_column.type()), + plc.Scalar.from_py(-1, plc_column.type(), stream=df.stream), plc.binaryop.BinaryOperator.NOT_EQUAL, plc.DataType(plc.TypeId.BOOL8), - ) + stream=df.stream, + ), + stream=df.stream, ) plc_column = plc.unary.cast( - plc_column.with_mask(new_mask, null_count), self.dtype.plc_type + plc_column.with_mask(new_mask, null_count), + self.dtype.plc_type, + stream=df.stream, ) return Column(plc_column, dtype=self.dtype) elif self.name is StringFunction.Name.JsonDecode: plc_column = self.children[0].evaluate(df, context=context).obj plc_table_with_metadata = plc.io.json.read_json_from_string_column( plc_column, - plc.Scalar.from_py("\n"), - plc.Scalar.from_py("NULL"), + plc.Scalar.from_py("\n", stream=df.stream), + plc.Scalar.from_py("NULL", stream=df.stream), _dtypes_for_json_decode(self.dtype), + stream=df.stream, ) return Column( plc.Column.struct_from_children(plc_table_with_metadata.columns), @@ -527,16 +573,20 @@ def 
do_evaluate( (child, expr) = self.children plc_column = child.evaluate(df, context=context).obj assert isinstance(expr, Literal) - json_path = plc.Scalar.from_py(expr.value, expr.dtype.plc_type) + json_path = plc.Scalar.from_py( + expr.value, expr.dtype.plc_type, stream=df.stream + ) return Column( - plc.json.get_json_object(plc_column, json_path), + plc.json.get_json_object(plc_column, json_path, stream=df.stream), dtype=self.dtype, ) elif self.name is StringFunction.Name.LenBytes: plc_column = self.children[0].evaluate(df, context=context).obj return Column( plc.unary.cast( - plc.strings.attributes.count_bytes(plc_column), self.dtype.plc_type + plc.strings.attributes.count_bytes(plc_column, stream=df.stream), + self.dtype.plc_type, + stream=df.stream, ), dtype=self.dtype, ) @@ -544,8 +594,11 @@ def do_evaluate( plc_column = self.children[0].evaluate(df, context=context).obj return Column( plc.unary.cast( - plc.strings.attributes.count_characters(plc_column), + plc.strings.attributes.count_characters( + plc_column, stream=df.stream + ), self.dtype.plc_type, + stream=df.stream, ), dtype=self.dtype, ) @@ -575,8 +628,13 @@ def do_evaluate( return Column( plc.strings.slice.slice_strings( column.obj, - plc.Scalar.from_py(start, plc.DataType(plc.TypeId.INT32)), - plc.Scalar.from_py(stop, plc.DataType(plc.TypeId.INT32)), + plc.Scalar.from_py( + start, plc.DataType(plc.TypeId.INT32), stream=df.stream + ), + plc.Scalar.from_py( + stop, plc.DataType(plc.TypeId.INT32), stream=df.stream + ), + stream=df.stream, ), dtype=self.dtype, ) @@ -600,7 +658,9 @@ def do_evaluate( ) else: assert isinstance(expr, Literal) - by = plc.Scalar.from_py(expr.value, expr.dtype.plc_type) + by = plc.Scalar.from_py( + expr.value, expr.dtype.plc_type, stream=df.stream + ) # See https://github.com/pola-rs/polars/issues/11640 # for SplitN vs SplitExact edge case behaviors max_splits = n if is_split_n else 0 @@ -608,13 +668,16 @@ def do_evaluate( column.obj, by, max_splits - 1, + stream=df.stream, ) children = plc_table.columns() ref_column = children[0] if (remainder := n - len(children)) > 0: # Reach expected number of splits by padding with nulls children.extend( - plc.Column.all_null_like(ref_column, ref_column.size()) + plc.Column.all_null_like( + ref_column, ref_column.size(), stream=df.stream + ) for _ in range(remainder + int(not is_split_n)) ) if not is_split_n: @@ -638,7 +701,9 @@ def do_evaluate( child, expr = self.children plc_column = child.evaluate(df, context=context).obj assert isinstance(expr, Literal) - target = plc.Scalar.from_py(expr.value, expr.dtype.plc_type) + target = plc.Scalar.from_py( + expr.value, expr.dtype.plc_type, stream=df.stream + ) if self.name == StringFunction.Name.StripPrefix: find = plc.strings.find.starts_with start = len(expr.value) @@ -648,17 +713,23 @@ def do_evaluate( start = 0 end = -len(expr.value) - mask = find(plc_column, target) + mask = find(plc_column, target, stream=df.stream) sliced = plc.strings.slice.slice_strings( plc_column, - plc.Scalar.from_py(start, plc.DataType(plc.TypeId.INT32)), - plc.Scalar.from_py(end, plc.DataType(plc.TypeId.INT32)), + plc.Scalar.from_py( + start, plc.DataType(plc.TypeId.INT32), stream=df.stream + ), + plc.Scalar.from_py( + end, plc.DataType(plc.TypeId.INT32), stream=df.stream + ), + stream=df.stream, ) return Column( plc.copying.copy_if_else( sliced, plc_column, mask, + stream=df.stream, ), dtype=self.dtype, ) @@ -675,7 +746,12 @@ def do_evaluate( else: side = plc.strings.SideType.BOTH return Column( - plc.strings.strip.strip(column.obj, side, 
chars.obj_scalar), + plc.strings.strip.strip( + column.obj, + side, + chars.obj_scalar(stream=df.stream), + stream=df.stream, + ), dtype=self.dtype, ) @@ -686,15 +762,17 @@ def do_evaluate( if self.children[1].value is None: return Column( plc.Column.from_scalar( - plc.Scalar.from_py(None, self.dtype.plc_type), + plc.Scalar.from_py(None, self.dtype.plc_type, stream=df.stream), column.size, + stream=df.stream, ), self.dtype, ) elif self.children[1].value == 0: result = plc.Column.from_scalar( - plc.Scalar.from_py("", self.dtype.plc_type), + plc.Scalar.from_py("", self.dtype.plc_type, stream=df.stream), column.size, + stream=df.stream, ) if column.obj.null_mask(): result = result.with_mask( @@ -708,9 +786,14 @@ def do_evaluate( return Column( plc.strings.slice.slice_strings( column.obj, - plc.Scalar.from_py(start, plc.DataType(plc.TypeId.INT32)), - plc.Scalar.from_py(end, plc.DataType(plc.TypeId.INT32)), + plc.Scalar.from_py( + start, plc.DataType(plc.TypeId.INT32), stream=df.stream + ), + plc.Scalar.from_py( + end, plc.DataType(plc.TypeId.INT32), stream=df.stream + ), None, + stream=df.stream, ), self.dtype, ) @@ -723,16 +806,22 @@ def do_evaluate( if end is None: return Column( plc.Column.from_scalar( - plc.Scalar.from_py(None, self.dtype.plc_type), + plc.Scalar.from_py(None, self.dtype.plc_type, stream=df.stream), column.size, + stream=df.stream, ), self.dtype, ) return Column( plc.strings.slice.slice_strings( column.obj, - plc.Scalar.from_py(0, plc.DataType(plc.TypeId.INT32)), - plc.Scalar.from_py(end, plc.DataType(plc.TypeId.INT32)), + plc.Scalar.from_py( + 0, plc.DataType(plc.TypeId.INT32), stream=df.stream + ), + plc.Scalar.from_py( + end, plc.DataType(plc.TypeId.INT32), stream=df.stream + ), + stream=df.stream, ), self.dtype, ) @@ -740,18 +829,25 @@ def do_evaluate( columns = [child.evaluate(df, context=context) for child in self.children] if self.name is StringFunction.Name.Lowercase: (column,) = columns - return Column(plc.strings.case.to_lower(column.obj), dtype=self.dtype) + return Column( + plc.strings.case.to_lower(column.obj, stream=df.stream), + dtype=self.dtype, + ) elif self.name is StringFunction.Name.Uppercase: (column,) = columns - return Column(plc.strings.case.to_upper(column.obj), dtype=self.dtype) + return Column( + plc.strings.case.to_upper(column.obj, stream=df.stream), + dtype=self.dtype, + ) elif self.name is StringFunction.Name.EndsWith: column, suffix = columns return Column( plc.strings.find.ends_with( column.obj, - suffix.obj_scalar + suffix.obj_scalar(stream=df.stream) if column.size != suffix.size and suffix.is_scalar else suffix.obj, + stream=df.stream, ), dtype=self.dtype, ) @@ -760,9 +856,10 @@ def do_evaluate( return Column( plc.strings.find.starts_with( column.obj, - prefix.obj_scalar + prefix.obj_scalar(stream=df.stream) if column.size != prefix.size and prefix.is_scalar else prefix.obj, + stream=df.stream, ), dtype=self.dtype, ) @@ -774,22 +871,27 @@ def do_evaluate( if plc_col.null_count() == plc_col.size(): return Column( plc.Column.from_scalar( - plc.Scalar.from_py(None, self.dtype.plc_type), + plc.Scalar.from_py(None, self.dtype.plc_type, stream=df.stream), plc_col.size(), + stream=df.stream, ), self.dtype, ) if format is None: # Polars begins inference with the first non null value if plc_col.null_mask() is not None: - boolmask = plc.unary.is_valid(plc_col) + boolmask = plc.unary.is_valid(plc_col, stream=df.stream) table = plc.stream_compaction.apply_boolean_mask( - plc.Table([plc_col]), boolmask + plc.Table([plc_col]), boolmask, 
stream=df.stream ) filtered = table.columns()[0] - first_valid_data = plc.copying.get_element(filtered, 0).to_py() + first_valid_data = plc.copying.get_element( + filtered, 0, stream=df.stream + ).to_py() else: - first_valid_data = plc.copying.get_element(plc_col, 0).to_py() + first_valid_data = plc.copying.get_element( + plc_col, 0, stream=df.stream + ).to_py() # See https://github.com/rapidsai/cudf/issues/20202 for we type ignore format = _infer_datetime_format(first_valid_data) # type: ignore[arg-type] @@ -799,27 +901,28 @@ def do_evaluate( ) is_timestamps = plc.strings.convert.convert_datetime.is_timestamp( - plc_col, format + plc_col, format, stream=df.stream ) if strict: if not plc.reduce.reduce( is_timestamps, plc.aggregation.all(), plc.DataType(plc.TypeId.BOOL8), + stream=df.stream, ).to_py(): raise InvalidOperationError("conversion from `str` failed.") else: not_timestamps = plc.unary.unary_operation( - is_timestamps, plc.unary.UnaryOperator.NOT + is_timestamps, plc.unary.UnaryOperator.NOT, stream=df.stream ) - null = plc.Scalar.from_py(None, plc_col.type()) + null = plc.Scalar.from_py(None, plc_col.type(), stream=df.stream) plc_col = plc.copying.boolean_mask_scatter( - [null], plc.Table([plc_col]), not_timestamps + [null], plc.Table([plc_col]), not_timestamps, stream=df.stream ).columns()[0] return Column( plc.strings.convert.convert_datetime.to_timestamps( - plc_col, self.dtype.plc_type, format + plc_col, self.dtype.plc_type, format, stream=df.stream ), dtype=self.dtype, ) @@ -829,9 +932,10 @@ def do_evaluate( return Column( plc.strings.replace.replace( col_column.obj, - col_target.obj_scalar, - col_repl.obj_scalar, + col_target.obj_scalar(stream=df.stream), + col_repl.obj_scalar(stream=df.stream), maxrepl=n, + stream=df.stream, ), dtype=self.dtype, ) @@ -839,7 +943,7 @@ def do_evaluate( col_column, col_target, col_repl = columns return Column( plc.strings.replace.replace_multiple( - col_column.obj, col_target.obj, col_repl.obj + col_column.obj, col_target.obj, col_repl.obj, stream=df.stream ), dtype=self.dtype, ) @@ -853,13 +957,14 @@ def do_evaluate( # TODO: Maybe accept a string scalar in # cudf::strings::pad to avoid DtoH transfer # See https://github.com/rapidsai/cudf/issues/20202 for we type ignore - width: int = width_col.obj.to_scalar().to_py() # type: ignore[no-redef] + width: int = width_col.obj.to_scalar(stream=df.stream).to_py() # type: ignore[no-redef] return Column( plc.strings.padding.pad( column.obj, width, # type: ignore[arg-type] plc.strings.SideType.LEFT, char, + stream=df.stream, ), dtype=self.dtype, ) @@ -872,22 +977,29 @@ def do_evaluate( (char,) = self.options # TODO: Maybe accept a string scalar in # cudf::strings::pad to avoid DtoH transfer - width: int = width_col.obj.to_scalar().to_py() # type: ignore[no-redef] + width: int = width_col.obj.to_scalar(stream=df.stream).to_py() # type: ignore[no-redef] return Column( plc.strings.padding.pad( column.obj, width, # type: ignore[arg-type] plc.strings.SideType.RIGHT, char, + stream=df.stream, ), dtype=self.dtype, ) elif self.name is StringFunction.Name.Reverse: (column,) = columns - return Column(plc.strings.reverse.reverse(column.obj), dtype=self.dtype) + return Column( + plc.strings.reverse.reverse(column.obj, stream=df.stream), + dtype=self.dtype, + ) elif self.name is StringFunction.Name.Titlecase: (column,) = columns - return Column(plc.strings.capitalize.title(column.obj), dtype=self.dtype) + return Column( + plc.strings.capitalize.title(column.obj, stream=df.stream), + dtype=self.dtype, + ) raise 
NotImplementedError( f"StringFunction {self.name}" ) # pragma: no cover; handled by init raising diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/struct.py b/python/cudf_polars/cudf_polars/dsl/expressions/struct.py index b9749a1bbba..daa28deb624 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/struct.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/struct.py @@ -125,9 +125,11 @@ def do_evaluate( .utf8_escaped(val=False) .build() ) - plc.io.json.write_json(options) + plc.io.json.write_json(options, stream=df.stream) return Column( - plc.Column.from_iterable_of_py(buff.getvalue().split()), + plc.Column.from_iterable_of_py( + buff.getvalue().split(), stream=df.stream + ), dtype=self.dtype, ) elif self.name in { diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/ternary.py b/python/cudf_polars/cudf_polars/dsl/expressions/ternary.py index 1bf6ea476f1..7a87cf2bba7 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/ternary.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/ternary.py @@ -41,9 +41,15 @@ def do_evaluate( when, then, otherwise = ( child.evaluate(df, context=context) for child in self.children ) - then_obj = then.obj_scalar if then.is_scalar else then.obj - otherwise_obj = otherwise.obj_scalar if otherwise.is_scalar else otherwise.obj + then_obj = then.obj_scalar(stream=df.stream) if then.is_scalar else then.obj + otherwise_obj = ( + otherwise.obj_scalar(stream=df.stream) + if otherwise.is_scalar + else otherwise.obj + ) return Column( - plc.copying.copy_if_else(then_obj, otherwise_obj, when.obj), + plc.copying.copy_if_else( + then_obj, otherwise_obj, when.obj, stream=df.stream + ), dtype=self.dtype, ) diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/unary.py b/python/cudf_polars/cudf_polars/dsl/expressions/unary.py index 4e1adb4f9de..4e184e6533e 100644 --- a/python/cudf_polars/cudf_polars/dsl/expressions/unary.py +++ b/python/cudf_polars/cudf_polars/dsl/expressions/unary.py @@ -43,7 +43,7 @@ def do_evaluate( """Evaluate this expression given a dataframe for context.""" (child,) = self.children column = child.evaluate(df, context=context) - return column.astype(self.dtype) + return column.astype(self.dtype, stream=df.stream) class Len(Expr): @@ -60,8 +60,9 @@ def do_evaluate( """Evaluate this expression given a dataframe for context.""" return Column( plc.Column.from_scalar( - plc.Scalar.from_py(df.num_rows, self.dtype.plc_type), + plc.Scalar.from_py(df.num_rows, self.dtype.plc_type, stream=df.stream), 1, + stream=df.stream, ), dtype=self.dtype, ) @@ -173,13 +174,16 @@ def do_evaluate( """Evaluate this expression given a dataframe for context.""" if self.name == "mask_nans": (child,) = self.children - return child.evaluate(df, context=context).mask_nans() + return child.evaluate(df, context=context).mask_nans(stream=df.stream) if self.name == "null_count": (column,) = (child.evaluate(df, context=context) for child in self.children) return Column( plc.Column.from_scalar( - plc.Scalar.from_py(column.null_count, self.dtype.plc_type), + plc.Scalar.from_py( + column.null_count, self.dtype.plc_type, stream=df.stream + ), 1, + stream=df.stream, ), dtype=self.dtype, ) @@ -199,6 +203,7 @@ def do_evaluate( if round_mode == "half_to_even" else plc.round.RoundingMethod.HALF_UP ), + stream=df.stream, ), dtype=self.dtype, ).sorted_like(values) # pragma: no cover @@ -215,6 +220,7 @@ def do_evaluate( [0], keep, plc.types.NullEquality.EQUAL, + stream=df.stream, ).columns() else: distinct = ( @@ -228,6 +234,7 @@ def do_evaluate( keep, 
plc.types.NullEquality.EQUAL, plc.types.NanEquality.ALL_EQUAL, + stream=df.stream, ).columns() column = Column(compacted, dtype=self.dtype) if maintain_order: @@ -244,9 +251,11 @@ def do_evaluate( null_order = plc.types.NullOrder.BEFORE if column.null_count > 0 and (n := column.size) > 1: # PERF: This invokes four stream synchronisations! - has_nulls_first = not plc.copying.get_element(column.obj, 0).is_valid() + has_nulls_first = not plc.copying.get_element( + column.obj, 0, stream=df.stream + ).is_valid() has_nulls_last = not plc.copying.get_element( - column.obj, n - 1 + column.obj, n - 1, stream=df.stream ).is_valid() if (order == plc.types.Order.DESCENDING and has_nulls_first) or ( order == plc.types.Order.ASCENDING and has_nulls_last @@ -274,27 +283,40 @@ def do_evaluate( counts_table, [plc.types.Order.DESCENDING], [plc.types.NullOrder.BEFORE], + stream=df.stream, ) counts_table = plc.copying.gather( - counts_table, sort_indices, plc.copying.OutOfBoundsPolicy.DONT_CHECK + counts_table, + sort_indices, + plc.copying.OutOfBoundsPolicy.DONT_CHECK, + stream=df.stream, ) keys_table = plc.copying.gather( - keys_table, sort_indices, plc.copying.OutOfBoundsPolicy.DONT_CHECK + keys_table, + sort_indices, + plc.copying.OutOfBoundsPolicy.DONT_CHECK, + stream=df.stream, ) keys_col = keys_table.columns()[0] counts_col = counts_table.columns()[0] if normalize: total_counts = plc.reduce.reduce( - counts_col, plc.aggregation.sum(), plc.DataType(plc.TypeId.UINT64) + counts_col, + plc.aggregation.sum(), + plc.DataType(plc.TypeId.UINT64), + stream=df.stream, ) counts_col = plc.binaryop.binary_operation( counts_col, total_counts, plc.binaryop.BinaryOperator.DIV, plc.DataType(plc.TypeId.FLOAT64), + stream=df.stream, ) elif counts_col.type().id() == plc.TypeId.INT32: - counts_col = plc.unary.cast(counts_col, plc.DataType(plc.TypeId.UINT32)) + counts_col = plc.unary.cast( + counts_col, plc.DataType(plc.TypeId.UINT32), stream=df.stream + ) plc_column = plc.Column( self.dtype.plc_type, @@ -312,7 +334,7 @@ def do_evaluate( return column return Column( plc.stream_compaction.drop_nulls( - plc.Table([column.obj]), [0], 1 + plc.Table([column.obj]), [0], 1, stream=df.stream ).columns()[0], dtype=self.dtype, ) @@ -322,19 +344,31 @@ def do_evaluate( return column fill_value = self.children[1] if isinstance(fill_value, Literal): - arg = plc.Scalar.from_py(fill_value.value, fill_value.dtype.plc_type) + arg = plc.Scalar.from_py( + fill_value.value, fill_value.dtype.plc_type, stream=df.stream + ) else: evaluated = fill_value.evaluate(df, context=context) - arg = evaluated.obj_scalar if evaluated.is_scalar else evaluated.obj + arg = ( + evaluated.obj_scalar(stream=df.stream) + if evaluated.is_scalar + else evaluated.obj + ) if isinstance(arg, plc.Scalar) and dtypes.can_cast( column.dtype.plc_type, arg.type() ): # pragma: no cover arg = ( - Column(plc.Column.from_scalar(arg, 1), dtype=fill_value.dtype) - .astype(column.dtype) - .obj.to_scalar() + Column( + plc.Column.from_scalar(arg, 1, stream=df.stream), + dtype=fill_value.dtype, + ) + .astype(column.dtype, stream=df.stream) + .obj.to_scalar(stream=df.stream) ) - return Column(plc.replace.replace_nulls(column.obj, arg), dtype=self.dtype) + return Column( + plc.replace.replace_nulls(column.obj, arg, stream=df.stream), + dtype=self.dtype, + ) elif self.name == "fill_null_with_strategy": column = self.children[0].evaluate(df, context=context) strategy, limit = self.options @@ -357,36 +391,48 @@ def do_evaluate( column.obj, plc.aggregation.min(), column.dtype.plc_type, + 
stream=df.stream, ) elif strategy == "max": replacement = plc.reduce.reduce( column.obj, plc.aggregation.max(), column.dtype.plc_type, + stream=df.stream, ) elif strategy == "mean": replacement = plc.reduce.reduce( column.obj, plc.aggregation.mean(), plc.DataType(plc.TypeId.FLOAT64), + stream=df.stream, ) elif strategy == "zero": - replacement = plc.scalar.Scalar.from_py(0, dtype=column.dtype.plc_type) + replacement = plc.scalar.Scalar.from_py( + 0, dtype=column.dtype.plc_type, stream=df.stream + ) elif strategy == "one": - replacement = plc.scalar.Scalar.from_py(1, dtype=column.dtype.plc_type) + replacement = plc.scalar.Scalar.from_py( + 1, dtype=column.dtype.plc_type, stream=df.stream + ) else: assert_never(strategy) # pragma: no cover if strategy == "mean": return Column( plc.replace.replace_nulls( - plc.unary.cast(column.obj, plc.DataType(plc.TypeId.FLOAT64)), + plc.unary.cast( + column.obj, + plc.DataType(plc.TypeId.FLOAT64), + stream=df.stream, + ), replacement, + stream=df.stream, ), dtype=self.dtype, - ).astype(self.dtype) + ).astype(self.dtype, stream=df.stream) return Column( - plc.replace.replace_nulls(column.obj, replacement), + plc.replace.replace_nulls(column.obj, replacement, stream=df.stream), dtype=self.dtype, ) elif self.name == "as_struct": @@ -428,6 +474,7 @@ def do_evaluate( plc.types.NullPolicy.EXCLUDE, plc.types.NullOrder.BEFORE if descending else plc.types.NullOrder.AFTER, percentage=False, + stream=df.stream, ) # Min/Max/Dense/Ordinal -> IDX_DTYPE @@ -436,11 +483,15 @@ def do_evaluate( dest = self.dtype.plc_type.id() src = ranked.type().id() if dest == plc.TypeId.UINT32 and src != plc.TypeId.UINT32: - ranked = plc.unary.cast(ranked, plc.DataType(plc.TypeId.UINT32)) + ranked = plc.unary.cast( + ranked, plc.DataType(plc.TypeId.UINT32), stream=df.stream + ) elif ( dest == plc.TypeId.UINT64 and src != plc.TypeId.UINT64 ): # pragma: no cover - ranked = plc.unary.cast(ranked, plc.DataType(plc.TypeId.UINT64)) + ranked = plc.unary.cast( + ranked, plc.DataType(plc.TypeId.UINT64), stream=df.stream + ) return Column(ranked, dtype=self.dtype) elif self.name == "top_k": @@ -455,17 +506,20 @@ def do_evaluate( plc.types.Order.ASCENDING if reverse else plc.types.Order.DESCENDING, + stream=df.stream, ), dtype=self.dtype, ) elif self.name in self._OP_MAPPING: column = self.children[0].evaluate(df, context=context) if column.dtype.plc_type.id() != self.dtype.id(): - arg = plc.unary.cast(column.obj, self.dtype.plc_type) + arg = plc.unary.cast(column.obj, self.dtype.plc_type, stream=df.stream) else: arg = column.obj return Column( - plc.unary.unary_operation(arg, self._OP_MAPPING[self.name]), + plc.unary.unary_operation( + arg, self._OP_MAPPING[self.name], stream=df.stream + ), dtype=self.dtype, ) elif self.name in UnaryFunction._supported_cum_aggs: @@ -492,12 +546,16 @@ def do_evaluate( and plc.traits.is_integral(col_type) and plc.types.size_of(col_type) <= 4 ): - plc_col = plc.unary.cast(plc_col, plc.DataType(plc.TypeId.INT64)) + plc_col = plc.unary.cast( + plc_col, plc.DataType(plc.TypeId.INT64), stream=df.stream + ) elif ( self.name == "cum_sum" and column.dtype.plc_type.id() == plc.TypeId.BOOL8 ): - plc_col = plc.unary.cast(plc_col, plc.DataType(plc.TypeId.UINT32)) + plc_col = plc.unary.cast( + plc_col, plc.DataType(plc.TypeId.UINT32), stream=df.stream + ) if self.name == "cum_sum": agg = plc.aggregation.sum() elif self.name == "cum_prod": @@ -508,7 +566,9 @@ def do_evaluate( agg = plc.aggregation.max() return Column( - plc.reduce.scan(plc_col, agg, plc.reduce.ScanType.INCLUSIVE), 
+ plc.reduce.scan( + plc_col, agg, plc.reduce.ScanType.INCLUSIVE, stream=df.stream + ), dtype=self.dtype, ) raise NotImplementedError( diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 936086b75c5..7f44e2bd0a6 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -38,7 +38,10 @@ from cudf_polars.dsl.to_ast import to_ast, to_parquet_filter from cudf_polars.dsl.tracing import log_do_evaluate, nvtx_annotate_cudf_polars from cudf_polars.dsl.utils.reshape import broadcast -from cudf_polars.dsl.utils.windows import range_window_bounds +from cudf_polars.dsl.utils.windows import ( + offsets_to_windows, + range_window_bounds, +) from cudf_polars.utils import dtypes from cudf_polars.utils.cuda_stream import ( get_cuda_stream, @@ -569,9 +572,16 @@ def add_file_paths( Each path is repeated according to the number of rows read from it. """ (filepaths,) = plc.filling.repeat( - plc.Table([plc.Column.from_arrow(pl.Series(values=map(str, paths)))]), + plc.Table( + [ + plc.Column.from_arrow( + pl.Series(values=map(str, paths)), stream=df.stream + ) + ] + ), plc.Column.from_arrow( - pl.Series(values=rows_per_path, dtype=pl.datatypes.Int32()) + pl.Series(values=rows_per_path, dtype=pl.datatypes.Int32()), + stream=df.stream, ), stream=df.stream, ).columns() @@ -729,7 +739,8 @@ def read_csv_header( _parquet_physical_types( schema, paths, with_columns or list(schema.keys()), stream ), - ) + ), + stream=stream, ) parquet_reader_options = plc.io.parquet.ParquetReaderOptions.builder( plc.io.SourceInfo(paths) @@ -844,7 +855,9 @@ def read_csv_header( if predicate is None: return df else: - (mask,) = broadcast(predicate.evaluate(df), target_length=df.num_rows) + (mask,) = broadcast( + predicate.evaluate(df), target_length=df.num_rows, stream=df.stream + ) return df.filter(mask) @@ -1348,7 +1361,7 @@ def do_evaluate( # Handle any broadcasting columns = [e.evaluate(df) for e in exprs] if should_broadcast: - columns = broadcast(*columns) + columns = broadcast(*columns, stream=df.stream) return DataFrame(columns, stream=df.stream) def evaluate( @@ -1436,7 +1449,7 @@ def do_evaluate( context: IRExecutionContext, ) -> DataFrame: # pragma: no cover; not exposed by polars yet """Evaluate and return a dataframe.""" - columns = broadcast(*(e.evaluate(df) for e in exprs)) + columns = broadcast(*(e.evaluate(df) for e in exprs), stream=df.stream) assert all(column.size == 1 for column in columns) return DataFrame(columns, stream=df.stream) @@ -1447,17 +1460,19 @@ class Rolling(IR): __slots__ = ( "agg_requests", "closed_window", - "following", + "following_ordinal", "index", + "index_dtype", "keys", - "preceding", + "preceding_ordinal", "zlice", ) _non_child = ( "schema", "index", - "preceding", - "following", + "index_dtype", + "preceding_ordinal", + "following_ordinal", "closed_window", "keys", "agg_requests", @@ -1465,10 +1480,12 @@ class Rolling(IR): ) index: expr.NamedExpr """Column being rolled over.""" - preceding: plc.Scalar - """Preceding window extent defining start of window.""" - following: plc.Scalar - """Following window extent defining end of window.""" + index_dtype: plc.DataType + """Datatype of the index column.""" + preceding_ordinal: int + """Preceding window extent defining start of window as a host integer.""" + following_ordinal: int + """Following window extent defining end of window as a host integer.""" closed_window: ClosedInterval """Treatment of window endpoints.""" keys: tuple[expr.NamedExpr, ...] 
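# --- Illustrative sketch (not part of the patch) ---------------------------
# A condensed restatement of the pattern in the UnaryFunction cumulative-
# aggregation changes above: every pylibcudf call that touches device memory
# now receives the evaluating DataFrame's stream. `plc_col` and `stream` are
# hypothetical inputs; the `stream=` keywords assume a pylibcudf build that
# exposes them, as this change does.
import pylibcudf as plc

def cum_sum_on_stream(plc_col: plc.Column, stream) -> plc.Column:
    if plc_col.type().id() == plc.TypeId.BOOL8:
        # Booleans are widened before summing, as in UnaryFunction.do_evaluate.
        plc_col = plc.unary.cast(
            plc_col, plc.DataType(plc.TypeId.UINT32), stream=stream
        )
    return plc.reduce.scan(
        plc_col, plc.aggregation.sum(), plc.reduce.ScanType.INCLUSIVE, stream=stream
    )
# ---------------------------------------------------------------------------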
@@ -1482,8 +1499,9 @@ def __init__( self, schema: Schema, index: expr.NamedExpr, - preceding: plc.Scalar, - following: plc.Scalar, + index_dtype: plc.DataType, + preceding_ordinal: int, + following_ordinal: int, closed_window: ClosedInterval, keys: Sequence[expr.NamedExpr], agg_requests: Sequence[expr.NamedExpr], @@ -1492,8 +1510,9 @@ def __init__( ): self.schema = schema self.index = index - self.preceding = preceding - self.following = following + self.index_dtype = index_dtype + self.preceding_ordinal = preceding_ordinal + self.following_ordinal = following_ordinal self.closed_window = closed_window self.keys = tuple(keys) self.agg_requests = tuple(agg_requests) @@ -1516,8 +1535,9 @@ def __init__( self.children = (df,) self._non_child_args = ( index, - preceding, - following, + index_dtype, + preceding_ordinal, + following_ordinal, closed_window, keys, agg_requests, @@ -1530,8 +1550,9 @@ def __init__( def do_evaluate( cls, index: expr.NamedExpr, - preceding: plc.Scalar, - following: plc.Scalar, + index_dtype: plc.DataType, + preceding_ordinal: int, + following_ordinal: int, closed_window: ClosedInterval, keys_in: Sequence[expr.NamedExpr], aggs: Sequence[expr.NamedExpr], @@ -1541,7 +1562,11 @@ def do_evaluate( context: IRExecutionContext, ) -> DataFrame: """Evaluate and return a dataframe.""" - keys = broadcast(*(k.evaluate(df) for k in keys_in), target_length=df.num_rows) + keys = broadcast( + *(k.evaluate(df) for k in keys_in), + target_length=df.num_rows, + stream=df.stream, + ) orderby = index.evaluate(df) # Polars casts integral orderby to int64, but only for calculating window bounds if ( @@ -1553,8 +1578,13 @@ def do_evaluate( ) else: orderby_obj = orderby.obj + + preceding_scalar, following_scalar = offsets_to_windows( + index_dtype, preceding_ordinal, following_ordinal, stream=df.stream + ) + preceding_window, following_window = range_window_bounds( - preceding, following, closed_window + preceding_scalar, following_scalar, closed_window ) if orderby.obj.null_count() != 0: raise RuntimeError( @@ -1573,7 +1603,9 @@ def do_evaluate( raise RuntimeError("Input for grouped rolling is not sorted") else: if not orderby.check_sorted( - order=plc.types.Order.ASCENDING, null_order=plc.types.NullOrder.BEFORE + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.BEFORE, + stream=df.stream, ): raise RuntimeError( f"Index column '{index.name}' in rolling is not sorted, please sort first" @@ -1673,7 +1705,11 @@ def do_evaluate( context: IRExecutionContext, ) -> DataFrame: """Evaluate and return a dataframe.""" - keys = broadcast(*(k.evaluate(df) for k in keys_in), target_length=df.num_rows) + keys = broadcast( + *(k.evaluate(df) for k in keys_in), + target_length=df.num_rows, + stream=df.stream, + ) sorted = ( plc.types.Sorted.YES if all(k.is_sorted for k in keys) @@ -1719,7 +1755,7 @@ def do_evaluate( Column(grouped_key, name=key.name, dtype=key.dtype) for key, grouped_key in zip(keys, group_keys.columns(), strict=True) ] - broadcasted = broadcast(*result_keys, *results) + broadcasted = broadcast(*result_keys, *results, stream=df.stream) # Handle order preservation of groups if maintain_order and not sorted: # The order we want @@ -1864,7 +1900,7 @@ def _apply_casts(df: DataFrame, casts: dict[str, DataType]) -> DataFrame: if target is None: columns.append(Column(col.obj, dtype=col.dtype, name=col.name)) else: - casted = col.astype(target) + casted = col.astype(target, stream=df.stream) columns.append(Column(casted.obj, dtype=casted.dtype, name=col.name)) return 
DataFrame(columns, stream=df.stream) @@ -1880,7 +1916,9 @@ class Predicate: def __init__(self, predicate: expr.Expr): self.predicate = predicate - ast_result = to_ast(predicate) + stream = get_cuda_stream() + ast_result = to_ast(predicate, stream=stream) + stream.synchronize() if ast_result is None: raise NotImplementedError( f"Conditional join with predicate {predicate}" @@ -1952,7 +1990,12 @@ def do_evaluate( context: IRExecutionContext, ) -> DataFrame: """Evaluate and return a dataframe.""" - stream = get_joined_cuda_stream(upstreams=(left.stream, right.stream)) + stream = get_joined_cuda_stream( + upstreams=( + left.stream, + right.stream, + ) + ) left_casts, right_casts = _collect_decimal_binop_casts( predicate_wrapper.predicate ) @@ -2276,10 +2319,12 @@ def do_evaluate( return DataFrame([*left_cols, *right_cols], stream=stream).slice(zlice) # TODO: Waiting on clarity based on https://github.com/pola-rs/polars/issues/17184 left_on = DataFrame( - broadcast(*(e.evaluate(left) for e in left_on_exprs)), stream=stream + broadcast(*(e.evaluate(left) for e in left_on_exprs), stream=stream), + stream=stream, ) right_on = DataFrame( - broadcast(*(e.evaluate(right) for e in right_on_exprs)), stream=stream + broadcast(*(e.evaluate(right) for e in right_on_exprs), stream=stream), + stream=stream, ) null_equality = ( plc.types.NullEquality.EQUAL @@ -2413,7 +2458,9 @@ def do_evaluate( columns = [c.evaluate(df) for c in exprs] if should_broadcast: columns = broadcast( - *columns, target_length=df.num_rows if df.num_columns != 0 else None + *columns, + target_length=df.num_rows if df.num_columns != 0 else None, + stream=df.stream, ) else: # Polars ensures this is true, but let's make sure nothing @@ -2492,6 +2539,7 @@ def do_evaluate( indices, keep, plc.types.NullEquality.EQUAL, + stream=df.stream, ) else: distinct = ( @@ -2575,7 +2623,9 @@ def do_evaluate( context: IRExecutionContext, ) -> DataFrame: """Evaluate and return a dataframe.""" - sort_keys = broadcast(*(k.evaluate(df) for k in by), target_length=df.num_rows) + sort_keys = broadcast( + *(k.evaluate(df) for k in by), target_length=df.num_rows, stream=df.stream + ) do_sort = plc.sorting.stable_sort_by_key if stable else plc.sorting.sort_by_key table = do_sort( df.table, @@ -2647,7 +2697,9 @@ def do_evaluate( cls, mask_expr: expr.NamedExpr, df: DataFrame, *, context: IRExecutionContext ) -> DataFrame: """Evaluate and return a dataframe.""" - (mask,) = broadcast(mask_expr.evaluate(df), target_length=df.num_rows) + (mask,) = broadcast( + mask_expr.evaluate(df), target_length=df.num_rows, stream=df.stream + ) return df.filter(mask) @@ -2671,7 +2723,9 @@ def do_evaluate( """Evaluate and return a dataframe.""" # This can reorder things. 
columns = broadcast( - *(df.column_map[name] for name in schema), target_length=df.num_rows + *(df.column_map[name] for name in schema), + target_length=df.num_rows, + stream=df.stream, ) return DataFrame(columns, stream=df.stream) @@ -2880,7 +2934,8 @@ def do_evaluate( plc.Column.from_arrow( pl.Series( values=pivotees, dtype=schema[variable_name].polars_type - ) + ), + stream=df.stream, ) ] ), @@ -2889,7 +2944,9 @@ def do_evaluate( ).columns() value_column = plc.concatenate.concatenate( [ - df.column_map[pivotee].astype(schema[value_name]).obj + df.column_map[pivotee] + .astype(schema[value_name], stream=df.stream) + .obj for pivotee in pivotees ], stream=df.stream, @@ -3021,7 +3078,10 @@ def do_evaluate( # Used to recombine decomposed expressions if should_broadcast: return DataFrame( - broadcast(*itertools.chain.from_iterable(df.columns for df in dfs)), + broadcast( + *itertools.chain.from_iterable(df.columns for df in dfs), + stream=stream, + ), stream=stream, ) diff --git a/python/cudf_polars/cudf_polars/dsl/to_ast.py b/python/cudf_polars/cudf_polars/dsl/to_ast.py index 320868a6c25..add5891444b 100644 --- a/python/cudf_polars/cudf_polars/dsl/to_ast.py +++ b/python/cudf_polars/cudf_polars/dsl/to_ast.py @@ -19,6 +19,8 @@ if TYPE_CHECKING: from collections.abc import Mapping + from rmm.pylibrmm.stream import Stream + # Can't merge these op-mapping dictionaries because scoped enum values # are exposed by cython with equality/hash based one their underlying @@ -103,6 +105,7 @@ class ASTState(TypedDict): """ for_parquet: bool + stream: Stream class ExprTransformerState(TypedDict): @@ -170,7 +173,9 @@ def _(node: expr.ColRef, self: Transformer) -> plc_expr.Expression: @_to_ast.register def _(node: expr.Literal, self: Transformer) -> plc_expr.Expression: - return plc_expr.Literal(plc.Scalar.from_py(node.value, node.dtype.plc_type)) + return plc_expr.Literal( + plc.Scalar.from_py(node.value, node.dtype.plc_type, stream=self.state["stream"]) + ) @_to_ast.register @@ -228,7 +233,9 @@ def _(node: expr.BooleanFunction, self: Transformer) -> plc_expr.Expression: else: plc_dtype = haystack.dtype.plc_type # pragma: no cover values = ( - plc_expr.Literal(plc.Scalar.from_py(val, plc_dtype)) + plc_expr.Literal( + plc.Scalar.from_py(val, plc_dtype, stream=self.state["stream"]) + ) for val in haystack.value ) return reduce( @@ -265,7 +272,7 @@ def _(node: expr.UnaryFunction, self: Transformer) -> plc_expr.Expression: ) -def to_parquet_filter(node: expr.Expr) -> plc_expr.Expression | None: +def to_parquet_filter(node: expr.Expr, stream: Stream) -> plc_expr.Expression | None: """ Convert an expression to libcudf AST nodes suitable for parquet filtering. @@ -273,19 +280,23 @@ def to_parquet_filter(node: expr.Expr) -> plc_expr.Expression | None: ---------- node Expression to convert. + stream + CUDA stream used for device memory operations and kernel launches. Returns ------- pylibcudf Expression if conversion is possible, otherwise None. """ - mapper: Transformer = CachingVisitor(_to_ast, state={"for_parquet": True}) + mapper: Transformer = CachingVisitor( + _to_ast, state={"for_parquet": True, "stream": stream} + ) try: return mapper(node) except (KeyError, NotImplementedError): return None -def to_ast(node: expr.Expr) -> plc_expr.Expression | None: +def to_ast(node: expr.Expr, stream: Stream) -> plc_expr.Expression | None: """ Convert an expression to libcudf AST nodes suitable for compute_column. 
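# --- Illustrative sketch (not part of the patch) ---------------------------
# Callers of to_ast/to_parquet_filter now own the stream on which any literal
# device scalars are created. `predicate_expr` and `table` are hypothetical
# stand-ins for an expr.Expr and a DataFrame; the pattern mirrors the test
# changes further below.
import pylibcudf as plc

from cudf_polars.dsl.to_ast import to_ast
from cudf_polars.utils.cuda_stream import get_cuda_stream

stream = get_cuda_stream()
ast = to_ast(predicate_expr, stream=stream)  # None if the expression is unsupported
if ast is not None:
    mask = plc.transform.compute_column(table.table, ast, stream=stream)
# ---------------------------------------------------------------------------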
@@ -293,6 +304,8 @@ def to_ast(node: expr.Expr) -> plc_expr.Expression | None: ---------- node Expression to convert. + stream + CUDA stream used for device memory operations and kernel launches. Notes ----- @@ -304,7 +317,9 @@ def to_ast(node: expr.Expr) -> plc_expr.Expression | None: ------- pylibcudf Expression if conversion is possible, otherwise None. """ - mapper: Transformer = CachingVisitor(_to_ast, state={"for_parquet": False}) + mapper: Transformer = CachingVisitor( + _to_ast, state={"for_parquet": False, "stream": stream} + ) try: return mapper(node) except (KeyError, NotImplementedError): diff --git a/python/cudf_polars/cudf_polars/dsl/utils/reshape.py b/python/cudf_polars/cudf_polars/dsl/utils/reshape.py index 511dc63e884..8143f9997b1 100644 --- a/python/cudf_polars/cudf_polars/dsl/utils/reshape.py +++ b/python/cudf_polars/cudf_polars/dsl/utils/reshape.py @@ -4,12 +4,19 @@ from __future__ import annotations +from typing import TYPE_CHECKING + import pylibcudf as plc from cudf_polars.containers import Column +if TYPE_CHECKING: + from rmm.pylibrmm.stream import Stream + -def broadcast(*columns: Column, target_length: int | None = None) -> list[Column]: +def broadcast( + *columns: Column, target_length: int | None = None, stream: Stream +) -> list[Column]: """ Broadcast a sequence of columns to a common length. @@ -20,6 +27,9 @@ def broadcast(*columns: Column, target_length: int | None = None) -> list[Column target_length Optional length to broadcast to. If not provided, uses the non-unit length of existing columns. + stream + CUDA stream used for device memory operations and kernel launches + on this dataframe. Returns ------- @@ -63,7 +73,9 @@ def broadcast(*columns: Column, target_length: int | None = None) -> list[Column column if column.size != 1 else Column( - plc.Column.from_scalar(column.obj_scalar, nrows), + plc.Column.from_scalar( + column.obj_scalar(stream=stream), nrows, stream=stream + ), is_sorted=plc.types.Sorted.YES, order=plc.types.Order.ASCENDING, null_order=plc.types.NullOrder.BEFORE, diff --git a/python/cudf_polars/cudf_polars/dsl/utils/rolling.py b/python/cudf_polars/cudf_polars/dsl/utils/rolling.py index 139720e17d5..729706d165e 100644 --- a/python/cudf_polars/cudf_polars/dsl/utils/rolling.py +++ b/python/cudf_polars/cudf_polars/dsl/utils/rolling.py @@ -13,7 +13,7 @@ from cudf_polars.dsl.expressions.base import ExecutionContext from cudf_polars.dsl.utils.aggregations import apply_pre_evaluation from cudf_polars.dsl.utils.naming import unique_names -from cudf_polars.dsl.utils.windows import offsets_to_windows +from cudf_polars.dsl.utils.windows import duration_to_int if TYPE_CHECKING: from collections.abc import Sequence @@ -95,9 +95,9 @@ def rewrite_rolling( else: rolling_schema = schema apply_post_evaluation = lambda inp: inp # noqa: E731 - preceding, following = offsets_to_windows( - plc_index_dtype, options.rolling.offset, options.rolling.period - ) + preceding_ordinal = duration_to_int(plc_index_dtype, *options.rolling.offset) + following_ordinal = duration_to_int(plc_index_dtype, *options.rolling.period) + if (n := len(keys)) > 0: # Grouped rolling in polars sorts the output by the groups. 
inp = ir.Sort( @@ -113,8 +113,9 @@ def rewrite_rolling( ir.Rolling( rolling_schema, index, - preceding, - following, + plc_index_dtype, + preceding_ordinal, + following_ordinal, options.rolling.closed_window, keys, aggs, diff --git a/python/cudf_polars/cudf_polars/dsl/utils/windows.py b/python/cudf_polars/cudf_polars/dsl/utils/windows.py index 49aebc57fdf..e7b99d66d0d 100644 --- a/python/cudf_polars/cudf_polars/dsl/utils/windows.py +++ b/python/cudf_polars/cudf_polars/dsl/utils/windows.py @@ -12,7 +12,9 @@ import pylibcudf as plc if TYPE_CHECKING: - from cudf_polars.typing import ClosedInterval, Duration + from rmm.pylibrmm.stream import Stream + + from cudf_polars.typing import ClosedInterval __all__ = [ @@ -75,7 +77,7 @@ def duration_to_int( return -value if negative else value -def duration_to_scalar(dtype: plc.DataType, value: int) -> plc.Scalar: +def duration_to_scalar(dtype: plc.DataType, value: int, stream: Stream) -> plc.Scalar: """ Convert a raw polars duration value to a pylibcudf scalar. @@ -86,6 +88,9 @@ def duration_to_scalar(dtype: plc.DataType, value: int) -> plc.Scalar: value The raw value as in integer. If `dtype` represents a timestamp type, this should be in nanoseconds. + stream + CUDA stream used for device memory operations and kernel launches + on this dataframe. The returned scalar will be valid on this stream. Returns ------- @@ -99,20 +104,28 @@ def duration_to_scalar(dtype: plc.DataType, value: int) -> plc.Scalar: """ tid = dtype.id() if tid == plc.TypeId.INT64: - return plc.Scalar.from_py(value, dtype) + return plc.Scalar.from_py(value, dtype, stream=stream) elif tid == plc.TypeId.TIMESTAMP_NANOSECONDS: - return plc.Scalar.from_py(value, plc.DataType(plc.TypeId.DURATION_NANOSECONDS)) + return plc.Scalar.from_py( + value, plc.DataType(plc.TypeId.DURATION_NANOSECONDS), stream=stream + ) elif tid == plc.TypeId.TIMESTAMP_MICROSECONDS: return plc.Scalar.from_py( - value // 1000, plc.DataType(plc.TypeId.DURATION_MICROSECONDS) + value // 1000, + plc.DataType(plc.TypeId.DURATION_MICROSECONDS), + stream=stream, ) elif tid == plc.TypeId.TIMESTAMP_MILLISECONDS: return plc.Scalar.from_py( - value // 1_000_000, plc.DataType(plc.TypeId.DURATION_MILLISECONDS) + value // 1_000_000, + plc.DataType(plc.TypeId.DURATION_MILLISECONDS), + stream=stream, ) elif tid == plc.TypeId.TIMESTAMP_DAYS: return plc.Scalar.from_py( - value // 86_400_000_000_000, plc.DataType(plc.TypeId.DURATION_DAYS) + value // 86_400_000_000_000, + plc.DataType(plc.TypeId.DURATION_DAYS), + stream=stream, ) else: raise NotImplementedError( @@ -122,8 +135,9 @@ def duration_to_scalar(dtype: plc.DataType, value: int) -> plc.Scalar: def offsets_to_windows( dtype: plc.DataType, - offset: Duration, - period: Duration, + offset_i: int, + period_i: int, + stream: Stream, ) -> tuple[plc.Scalar, plc.Scalar]: """ Convert polars offset/period pair to preceding/following windows. @@ -132,21 +146,22 @@ def offsets_to_windows( ---------- dtype Datatype of column defining windows - offset - Offset duration - period - Period of window + offset_i + Integer ordinal representing the offset of the window. + See :func:`duration_to_int` for more details. + period_i + Integer ordinal representing the period of the window. + See :func:`duration_to_int` for more details. + stream + CUDA stream used for device memory operations and kernel launches Returns ------- - tuple of preceding and following windows as pylibcudf scalars. + tuple of preceding and following windows as host integers. 
""" - offset_i = duration_to_int(dtype, *offset) - period_i = duration_to_int(dtype, *period) - # Polars uses current_row + offset, ..., current_row + offset + period - # Libcudf uses current_row - preceding, ..., current_row + following - return duration_to_scalar(dtype, -offset_i), duration_to_scalar( - dtype, offset_i + period_i + return ( + duration_to_scalar(dtype, -offset_i, stream=stream), + duration_to_scalar(dtype, offset_i + period_i, stream=stream), ) diff --git a/python/cudf_polars/cudf_polars/experimental/base.py b/python/cudf_polars/cudf_polars/experimental/base.py index 4ce273c4223..00e572df296 100644 --- a/python/cudf_polars/cudf_polars/experimental/base.py +++ b/python/cudf_polars/cudf_polars/experimental/base.py @@ -121,7 +121,10 @@ def row_count(self) -> ColumnStat[int]: # pragma: no cover """Data source row-count estimate.""" raise NotImplementedError("Sub-class must implement row_count.") - def unique_stats(self, column: str) -> UniqueStats: # pragma: no cover + def unique_stats( + self, + column: str, + ) -> UniqueStats: # pragma: no cover """Return unique-value statistics for a column.""" raise NotImplementedError("Sub-class must implement unique_stats.") diff --git a/python/cudf_polars/cudf_polars/experimental/dask_registers.py b/python/cudf_polars/cudf_polars/experimental/dask_registers.py index 2c4e155252f..fe623723187 100644 --- a/python/cudf_polars/cudf_polars/experimental/dask_registers.py +++ b/python/cudf_polars/cudf_polars/experimental/dask_registers.py @@ -78,7 +78,7 @@ def serialize_column_or_frame( DataFrameHeader | ColumnHeader, list[memoryview[bytes] | plc.gpumemoryview] ]: with log_errors(): - header, frames = x.serialize() + header, frames = x.serialize(stream=get_dask_cuda_stream()) # Dask expect a list of frames return header, list(frames) @@ -118,8 +118,9 @@ def dask_serialize_column_or_frame( ) -> tuple[ DataFrameHeader | ColumnHeader, tuple[memoryview[bytes], memoryview[bytes]] ]: + stream = get_dask_cuda_stream() with log_errors(): - header, (metadata, gpudata) = x.serialize() + header, (metadata, gpudata) = x.serialize(stream=stream) # For robustness, we check that the gpu data is contiguous cai = gpudata.__cuda_array_interface__ diff --git a/python/cudf_polars/cudf_polars/experimental/dispatch.py b/python/cudf_polars/cudf_polars/experimental/dispatch.py index 677bca8252b..35f2991b6d8 100644 --- a/python/cudf_polars/cudf_polars/experimental/dispatch.py +++ b/python/cudf_polars/cudf_polars/experimental/dispatch.py @@ -143,7 +143,9 @@ def initialize_column_stats( @singledispatch def update_column_stats( - ir: IR, stats: StatsCollector, config_options: ConfigOptions + ir: IR, + stats: StatsCollector, + config_options: ConfigOptions, ) -> None: """ Finalize local column statistics for an IR node. diff --git a/python/cudf_polars/cudf_polars/experimental/explain.py b/python/cudf_polars/cudf_polars/experimental/explain.py index 905ade6e9a0..12dedd5b38a 100644 --- a/python/cudf_polars/cudf_polars/experimental/explain.py +++ b/python/cudf_polars/cudf_polars/experimental/explain.py @@ -34,7 +34,10 @@ def explain_query( - q: pl.LazyFrame, engine: pl.GPUEngine, *, physical: bool = True + q: pl.LazyFrame, + engine: pl.GPUEngine, + *, + physical: bool = True, ) -> str: """ Return a formatted string representation of the IR plan. 
diff --git a/python/cudf_polars/cudf_polars/experimental/io.py b/python/cudf_polars/cudf_polars/experimental/io.py index bcc837fd94a..25496916513 100644 --- a/python/cudf_polars/cudf_polars/experimental/io.py +++ b/python/cudf_polars/cudf_polars/experimental/io.py @@ -39,6 +39,7 @@ get_key_name, ) from cudf_polars.experimental.dispatch import generate_ir_tasks, lower_ir_node +from cudf_polars.utils.cuda_stream import get_cuda_stream if TYPE_CHECKING: from collections.abc import Hashable, MutableMapping @@ -438,7 +439,9 @@ def _sink_to_parquet_file( plc.io.parquet.ChunkedParquetWriterOptions.builder(sink), options ) writer_options = builder.metadata(metadata).build() - writer = plc.io.parquet.ChunkedParquetWriter.from_options(writer_options) + writer = plc.io.parquet.ChunkedParquetWriter.from_options( + writer_options, stream=df.stream + ) # Append to the open Parquet file. assert isinstance(writer, plc.io.parquet.ChunkedParquetWriter), ( @@ -762,7 +765,8 @@ def _sample_row_groups(self) -> None: ).build() options.set_columns(key_columns) options.set_row_groups(list(samples.values())) - tbl_w_meta = plc.io.parquet.read_parquet(options) + stream = get_cuda_stream() + tbl_w_meta = plc.io.parquet.read_parquet(options, stream=stream) row_group_num_rows = tbl_w_meta.tbl.num_rows() for name, column in zip( tbl_w_meta.column_names(include_children=False), @@ -773,6 +777,7 @@ def _sample_row_groups(self) -> None: column, plc.types.NullPolicy.INCLUDE, plc.types.NanPolicy.NAN_IS_NULL, + stream=stream, ) fraction = row_group_unique_count / row_group_num_rows # Assume that if every row is unique then this is a @@ -792,6 +797,7 @@ def _sample_row_groups(self) -> None: ColumnStat[int](value=count, exact=exact), ColumnStat[float](value=fraction, exact=exact), ) + stream.synchronize() def _update_unique_stats(self, column: str) -> None: if column not in self._unique_stats and column in self.metadata.column_names: diff --git a/python/cudf_polars/cudf_polars/experimental/shuffle.py b/python/cudf_polars/cudf_polars/experimental/shuffle.py index 54c3f879612..531d21fcb47 100644 --- a/python/cudf_polars/cudf_polars/experimental/shuffle.py +++ b/python/cudf_polars/cudf_polars/experimental/shuffle.py @@ -212,11 +212,15 @@ def _hash_partition_dataframe( # partition for each row partition_map = plc.binaryop.binary_operation( plc.hashing.murmurhash3_x86_32( - DataFrame([expr.evaluate(df) for expr in on], stream=df.stream).table + DataFrame([expr.evaluate(df) for expr in on], stream=df.stream).table, + stream=df.stream, + ), + plc.Scalar.from_py( + partition_count, plc.DataType(plc.TypeId.UINT32), stream=df.stream ), - plc.Scalar.from_py(partition_count, plc.DataType(plc.TypeId.UINT32)), plc.binaryop.BinaryOperator.PYMOD, plc.types.DataType(plc.types.TypeId.UINT32), + stream=df.stream, ) # Apply partitioning @@ -224,6 +228,7 @@ def _hash_partition_dataframe( df.table, partition_map, partition_count, + stream=df.stream, ) splits = offsets[1:-1] @@ -235,7 +240,7 @@ def _hash_partition_dataframe( df.dtypes, df.stream, ) - for i, split in enumerate(plc.copying.split(t, splits)) + for i, split in enumerate(plc.copying.split(t, splits, stream=df.stream)) } diff --git a/python/cudf_polars/cudf_polars/experimental/sort.py b/python/cudf_polars/cudf_polars/experimental/sort.py index 0d5450e786d..f12c22704b5 100644 --- a/python/cudf_polars/cudf_polars/experimental/sort.py +++ b/python/cudf_polars/cudf_polars/experimental/sort.py @@ -23,10 +23,13 @@ from cudf_polars.experimental.shuffle import _simple_shuffle_graph from 
cudf_polars.experimental.utils import _concat, _fallback_inform, _lower_ir_fallback from cudf_polars.utils.config import ShuffleMethod +from cudf_polars.utils.cuda_stream import get_joined_cuda_stream if TYPE_CHECKING: from collections.abc import MutableMapping, Sequence + from rmm.pylibrmm.stream import Stream + from cudf_polars.dsl.expr import NamedExpr from cudf_polars.dsl.ir import IRExecutionContext from cudf_polars.experimental.dispatch import LowerIRTransformer @@ -39,6 +42,7 @@ def find_sort_splits( my_part_id: int, column_order: Sequence[plc.types.Order], null_order: Sequence[plc.types.NullOrder], + stream: Stream, ) -> list[int]: """ Find local sort splits given all (global) split candidates. @@ -61,6 +65,10 @@ def find_sort_splits( The order in which tbl is sorted. null_order The null order in which tbl is sorted. + stream + CUDA stream used for device memory operations and kernel launches. + The values in both ``tbl`` and ``sort_boundaries`` must be valid on + ``stream``. Returns ------- @@ -76,10 +84,18 @@ def find_sort_splits( # Now we find the first and last row in the local table corresponding to the split value # (first and last, because there may be multiple rows with the same split value) split_first_col = plc.search.lower_bound( - tbl, sort_boundaries, column_order, null_order + tbl, + sort_boundaries, + column_order, + null_order, + stream=stream, ) split_last_col = plc.search.upper_bound( - tbl, sort_boundaries, column_order, null_order + tbl, + sort_boundaries, + column_order, + null_order, + stream=stream, ) # And convert to list for final processing split_first_list = pl.Series(split_first_col).to_list() @@ -132,12 +148,16 @@ def _select_local_split_candidates( [ *df.columns, Column( - plc.column_factories.make_empty_column(part_id_dtype.plc_type), + plc.column_factories.make_empty_column( + part_id_dtype.plc_type, stream=df.stream + ), dtype=part_id_dtype, name=next(name_gen), ), Column( - plc.column_factories.make_empty_column(part_id_dtype.plc_type), + plc.column_factories.make_empty_column( + part_id_dtype.plc_type, stream=df.stream + ), dtype=part_id_dtype, name=next(name_gen), ), @@ -146,12 +166,17 @@ def _select_local_split_candidates( ) candidates = [i * df.num_rows // num_partitions for i in range(num_partitions)] - row_id = plc.Column.from_iterable_of_py(candidates, part_id_dtype.plc_type) + row_id = plc.Column.from_iterable_of_py( + candidates, part_id_dtype.plc_type, stream=df.stream + ) - res = plc.copying.gather(df.table, row_id, plc.copying.OutOfBoundsPolicy.DONT_CHECK) + res = plc.copying.gather( + df.table, row_id, plc.copying.OutOfBoundsPolicy.DONT_CHECK, stream=df.stream + ) part_id = plc.Column.from_scalar( - plc.Scalar.from_py(my_part_id, part_id_dtype.plc_type), + plc.Scalar.from_py(my_part_id, part_id_dtype.plc_type, stream=df.stream), len(candidates), + stream=df.stream, ) return DataFrame.from_table( @@ -194,16 +219,21 @@ def _get_final_sort_boundaries( # split candidates has the additional partition_id and row_number columns column_order + [plc.types.Order.ASCENDING] * 2, null_order + [plc.types.NullOrder.AFTER] * 2, + stream=sort_boundaries_candidates.stream, ) selected_candidates = plc.Column.from_iterable_of_py( [ i * sorted_candidates.num_rows() // num_partitions for i in range(1, num_partitions) - ] + ], + stream=sort_boundaries_candidates.stream, ) # Get the actual values at which we will split the data sort_boundaries = plc.copying.gather( - sorted_candidates, selected_candidates, plc.copying.OutOfBoundsPolicy.DONT_CHECK + 
sorted_candidates, + selected_candidates, + plc.copying.OutOfBoundsPolicy.DONT_CHECK, + stream=sort_boundaries_candidates.stream, ) return DataFrame.from_table( @@ -287,19 +317,24 @@ def insert_partition( by = options["by"] + stream = get_joined_cuda_stream(upstreams=(df.stream, sort_boundaries.stream)) + splits = find_sort_splits( df.select(by).table, sort_boundaries.table, partition_id, options["order"], options["null_order"], + stream=stream, ) packed_inputs = split_and_pack( df.table, splits=splits, br=context.br, - stream=DEFAULT_STREAM, + stream=stream, ) + # TODO: figure out handoff with rapidsmpf + # https://github.com/rapidsai/cudf/issues/20337 shuffler.insert_chunks(packed_inputs) @staticmethod @@ -330,6 +365,8 @@ def extract_partition( # TODO: When sorting, this step should finalize with a merge (unless we # require stability, as cudf merge is not stable). + # TODO: figure out handoff with rapidsmpf + # https://github.com/rapidsai/cudf/issues/20337 return DataFrame.from_table( unpack_and_concat( unspill_partitions( @@ -374,12 +411,15 @@ def _sort_partition_dataframe( # Fast path for empty DataFrame return dict.fromkeys(range(partition_count), df) + stream = get_joined_cuda_stream(upstreams=(df.stream, sort_boundaries.stream)) + splits = find_sort_splits( df.select(options["by"]).table, sort_boundaries.table, partition_id, options["order"], options["null_order"], + stream=stream, ) # Split and return the partitioned result @@ -390,7 +430,7 @@ def _sort_partition_dataframe( df.dtypes, stream=df.stream, ) - for i, split in enumerate(plc.copying.split(df.table, splits)) + for i, split in enumerate(plc.copying.split(df.table, splits, stream=stream)) } diff --git a/python/cudf_polars/cudf_polars/experimental/statistics.py b/python/cudf_polars/cudf_polars/experimental/statistics.py index ca80cb5d1f4..46c202568fb 100644 --- a/python/cudf_polars/cudf_polars/experimental/statistics.py +++ b/python/cudf_polars/cudf_polars/experimental/statistics.py @@ -37,6 +37,7 @@ from cudf_polars.experimental.expressions import _SUPPORTED_AGGS from cudf_polars.experimental.utils import _leaf_column_names from cudf_polars.utils import conversion +from cudf_polars.utils.cuda_stream import get_cuda_stream if TYPE_CHECKING: from collections.abc import Mapping, Sequence @@ -47,7 +48,10 @@ from cudf_polars.utils.config import ConfigOptions, StatsPlanningOptions -def collect_statistics(root: IR, config_options: ConfigOptions) -> StatsCollector: +def collect_statistics( + root: IR, + config_options: ConfigOptions, +) -> StatsCollector: """ Collect column statistics for a query. @@ -607,7 +611,12 @@ def _(ir: IR, stats: StatsCollector, config_options: ConfigOptions) -> None: @update_column_stats.register(DataFrameScan) -def _(ir: DataFrameScan, stats: StatsCollector, config_options: ConfigOptions) -> None: +def _( + ir: DataFrameScan, + stats: StatsCollector, + config_options: ConfigOptions, +) -> None: + stream = get_cuda_stream() # Use datasource row-count estimate. if stats.column_stats[ir]: stats.row_count[ir] = next( @@ -620,15 +629,23 @@ def _(ir: DataFrameScan, stats: StatsCollector, config_options: ConfigOptions) - for column_stats in stats.column_stats[ir].values(): if column_stats.source_info.implied_unique_count.value is None: # We don't have a unique-count estimate, so we need to sample the data. 
- source_unique_stats = column_stats.source_info.unique_stats(force=False) + source_unique_stats = column_stats.source_info.unique_stats( + force=False, + ) if source_unique_stats.count.value is not None: column_stats.unique_count = source_unique_stats.count else: column_stats.unique_count = column_stats.source_info.implied_unique_count + stream.synchronize() + @update_column_stats.register(Scan) -def _(ir: Scan, stats: StatsCollector, config_options: ConfigOptions) -> None: +def _( + ir: Scan, + stats: StatsCollector, + config_options: ConfigOptions, +) -> None: # Use datasource row-count estimate. if stats.column_stats[ir]: stats.row_count[ir] = next( @@ -649,7 +666,9 @@ def _(ir: Scan, stats: StatsCollector, config_options: ConfigOptions) -> None: for column_stats in stats.column_stats[ir].values(): if column_stats.source_info.implied_unique_count.value is None: # We don't have a unique-count estimate, so we need to sample the data. - source_unique_stats = column_stats.source_info.unique_stats(force=False) + source_unique_stats = column_stats.source_info.unique_stats( + force=False, + ) if source_unique_stats.count.value is not None: column_stats.unique_count = source_unique_stats.count elif ( diff --git a/python/cudf_polars/tests/containers/test_column.py b/python/cudf_polars/tests/containers/test_column.py index 7b0ea76aebb..2aef4ec7b5c 100644 --- a/python/cudf_polars/tests/containers/test_column.py +++ b/python/cudf_polars/tests/containers/test_column.py @@ -12,6 +12,7 @@ import cudf_polars.containers.column import cudf_polars.containers.datatype from cudf_polars.containers import Column, DataType +from cudf_polars.utils.cuda_stream import get_cuda_stream def test_non_scalar_access_raises(): @@ -23,17 +24,32 @@ def test_non_scalar_access_raises(): dtype=dtype, ) with pytest.raises(ValueError): - _ = column.obj_scalar + _ = column.obj_scalar(stream=get_cuda_stream()) + + +def test_obj_scalar_caching(): + stream = get_cuda_stream() + dtype = DataType(pl.Int8()) + column = Column( + plc.Column.from_iterable_of_py([1], dtype.plc_type), + dtype=dtype, + ) + assert column.obj_scalar(stream=stream).to_py() == 1 + # test caching behavior + assert column.obj_scalar(stream=stream).to_py() == 1 def test_check_sorted(): + stream = get_cuda_stream() dtype = DataType(pl.Int8()) column = Column( plc.Column.from_iterable_of_py([0, 1, 2], dtype.plc_type), dtype=dtype, ) assert column.check_sorted( - order=plc.types.Order.ASCENDING, null_order=plc.types.NullOrder.AFTER + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.AFTER, + stream=stream, ) column.set_sorted( is_sorted=plc.types.Sorted.YES, @@ -41,12 +57,15 @@ def test_check_sorted(): null_order=plc.types.NullOrder.AFTER, ) assert column.check_sorted( - order=plc.types.Order.ASCENDING, null_order=plc.types.NullOrder.AFTER + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.AFTER, + stream=stream, ) @pytest.mark.parametrize("length", [0, 1]) def test_length_leq_one_always_sorted(length): + stream = get_cuda_stream() dtype = DataType(pl.Int8()) column = Column( plc.column_factories.make_numeric_column( @@ -55,10 +74,14 @@ def test_length_leq_one_always_sorted(length): dtype=dtype, ) assert column.check_sorted( - order=plc.types.Order.ASCENDING, null_order=plc.types.NullOrder.AFTER + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.AFTER, + stream=stream, ) assert column.check_sorted( - order=plc.types.Order.DESCENDING, null_order=plc.types.NullOrder.AFTER + order=plc.types.Order.DESCENDING, + 
null_order=plc.types.NullOrder.AFTER, + stream=stream, ) column.set_sorted( @@ -67,10 +90,14 @@ def test_length_leq_one_always_sorted(length): null_order=plc.types.NullOrder.AFTER, ) assert column.check_sorted( - order=plc.types.Order.ASCENDING, null_order=plc.types.NullOrder.AFTER + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.AFTER, + stream=stream, ) assert column.check_sorted( - order=plc.types.Order.DESCENDING, null_order=plc.types.NullOrder.AFTER + order=plc.types.Order.DESCENDING, + null_order=plc.types.NullOrder.AFTER, + stream=stream, ) @@ -94,27 +121,32 @@ def test_shallow_copy(): @pytest.mark.parametrize("typeid", [pl.Int8(), pl.Float32()]) def test_mask_nans(typeid): + stream = get_cuda_stream() dtype = DataType(typeid) column = Column( plc.Column.from_iterable_of_py([0, 0, 0], dtype=dtype.plc_type), dtype=dtype ) - masked = column.mask_nans() + masked = column.mask_nans(stream=stream) assert column.null_count == masked.null_count def test_mask_nans_float(): + stream = get_cuda_stream() dtype = DataType(pl.Float32()) column = Column( plc.Column.from_iterable_of_py([0, 0, float("nan")], dtype=dtype.plc_type), dtype=dtype, ) - masked = column.mask_nans() - assert masked.nan_count == 0 - assert masked.slice((0, 2)).null_count == 0 - assert masked.slice((2, 1)).null_count == 1 + masked = column.mask_nans(stream=stream) + assert masked.nan_count(stream=stream) == 0 + # test caching behavior + assert masked.nan_count(stream=stream) == 0 + assert masked.slice((0, 2), stream=stream).null_count == 0 + assert masked.slice((2, 1), stream=stream).null_count == 1 def test_slice_none_returns_self(): + stream = get_cuda_stream() dtype = DataType(pl.Int8()) column = Column( plc.column_factories.make_numeric_column( @@ -122,7 +154,7 @@ def test_slice_none_returns_self(): ), dtype=dtype, ) - assert column.slice(None) is column + assert column.slice(None, stream=stream) is column def test_deserialize_ctor_kwargs_invalid_dtype(): @@ -158,6 +190,7 @@ def test_deserialize_ctor_kwargs_list_dtype(): def test_serialize_cache_miss(): + stream = get_cuda_stream() dtype = DataType(pl.Int8()) column = Column( plc.column_factories.make_numeric_column( @@ -165,7 +198,7 @@ def test_serialize_cache_miss(): ), dtype=dtype, ) - header, frames = column.serialize() + header, frames = column.serialize(stream=stream) assert header == {"column_kwargs": column.serialize_ctor_kwargs(), "frame_count": 2} assert len(frames) == 2 assert frames[0].nbytes > 0 diff --git a/python/cudf_polars/tests/dsl/test_to_ast.py b/python/cudf_polars/tests/dsl/test_to_ast.py index 30fe59f8f20..c25b29b8e14 100644 --- a/python/cudf_polars/tests/dsl/test_to_ast.py +++ b/python/cudf_polars/tests/dsl/test_to_ast.py @@ -78,7 +78,7 @@ def compute_column(e): ) with pytest.raises(NotImplementedError): e_with_colrefs.evaluate(table) - ast = to_ast(e_with_colrefs) + ast = to_ast(e_with_colrefs, stream=stream) if ast is not None: return NamedColumn( plc.transform.compute_column(table.table, ast, stream=stream), @@ -103,10 +103,11 @@ def test_invalid_colref_construction_raises(): def test_to_ast_without_colref_raises(): + stream = get_cuda_stream() col = expr_nodes.Col(DataType(pl.datatypes.Int8()), "a") - with pytest.raises(TypeError): - to_ast(col) + with pytest.raises(TypeError, match="Should always be wrapped"): + to_ast(col, stream=stream) def test_to_parquet_filter_with_colref_raises(): @@ -114,4 +115,4 @@ def test_to_parquet_filter_with_colref_raises(): colref = expr_nodes.ColRef(col.dtype, 0, 
plc.expressions.TableReference.LEFT, col) with pytest.raises(TypeError): - to_parquet_filter(colref) + to_parquet_filter(colref, stream=get_cuda_stream()) diff --git a/python/cudf_polars/tests/experimental/test_stats.py b/python/cudf_polars/tests/experimental/test_stats.py index c37764e0e79..a44fb7ca238 100644 --- a/python/cudf_polars/tests/experimental/test_stats.py +++ b/python/cudf_polars/tests/experimental/test_stats.py @@ -72,14 +72,20 @@ def test_base_stats_dataframescan(df, engine): # We need to use force=True to sample unique-value statistics, # because nothing in the query requires unique-value statistics. assert math.isclose( - source_info_x.unique_stats(force=True).count.value, row_count, rel_tol=5e-2 + source_info_x.unique_stats(force=True).count.value, + row_count, + rel_tol=5e-2, ) assert math.isclose( - source_info_x.unique_stats(force=True).fraction.value, 1.0, abs_tol=1e-2 + source_info_x.unique_stats(force=True).fraction.value, + 1.0, + abs_tol=1e-2, ) assert not source_info_x.unique_stats(force=True).count.exact assert math.isclose( - source_info_y.unique_stats(force=True).count.value, 3, rel_tol=5e-2 + source_info_y.unique_stats(force=True).count.value, + 3, + rel_tol=5e-2, ) assert math.isclose( source_info_y.unique_stats(force=True).fraction.value, @@ -88,7 +94,9 @@ def test_base_stats_dataframescan(df, engine): ) assert not source_info_y.unique_stats(force=True).count.exact assert math.isclose( - source_info_z.unique_stats(force=True).count.value, 5, rel_tol=5e-2 + source_info_z.unique_stats(force=True).count.value, + 5, + rel_tol=5e-2, ) assert math.isclose( source_info_z.unique_stats(force=True).fraction.value, diff --git a/python/cudf_polars/tests/utils/test_broadcast.py b/python/cudf_polars/tests/utils/test_broadcast.py index 15981aeffea..34feee74cd9 100644 --- a/python/cudf_polars/tests/utils/test_broadcast.py +++ b/python/cudf_polars/tests/utils/test_broadcast.py @@ -11,21 +11,23 @@ from cudf_polars.containers import Column, DataType from cudf_polars.dsl.ir import broadcast +from cudf_polars.utils.cuda_stream import get_cuda_stream @pytest.mark.parametrize("target", [4, None]) def test_broadcast_all_scalar(target): + stream = get_cuda_stream() columns = [ Column( plc.column_factories.make_numeric_column( - plc.DataType(plc.TypeId.INT8), 1, plc.MaskState.ALL_VALID + plc.DataType(plc.TypeId.INT8), 1, plc.MaskState.ALL_VALID, stream=stream ), name=f"col{i}", dtype=DataType(pl.Int8()), ) for i in range(3) ] - result = broadcast(*columns, target_length=target) + result = broadcast(*columns, target_length=target, stream=stream) expected = 1 if target is None else target assert [c.name for c in result] == [f"col{i}" for i in range(3)] @@ -33,11 +35,12 @@ def test_broadcast_all_scalar(target): def test_invalid_target_length(): + stream = get_cuda_stream() dtype = DataType(pl.Int8()) columns = [ Column( plc.column_factories.make_numeric_column( - dtype.plc_type, 4, plc.MaskState.ALL_VALID + dtype.plc_type, 4, plc.MaskState.ALL_VALID, stream=stream ), dtype=dtype, name=f"col{i}", @@ -45,15 +48,16 @@ def test_invalid_target_length(): for i in range(3) ] with pytest.raises(RuntimeError): - _ = broadcast(*columns, target_length=8) + _ = broadcast(*columns, target_length=8, stream=stream) def test_broadcast_mismatching_column_lengths(): + stream = get_cuda_stream() dtype = DataType(pl.Int8()) columns = [ Column( plc.column_factories.make_numeric_column( - dtype.plc_type, i + 1, plc.MaskState.ALL_VALID + dtype.plc_type, i + 1, plc.MaskState.ALL_VALID, stream=stream ), 
dtype=dtype, name=f"col{i}", @@ -61,11 +65,12 @@ def test_broadcast_mismatching_column_lengths(): for i in range(3) ] with pytest.raises(RuntimeError): - _ = broadcast(*columns) + _ = broadcast(*columns, stream=stream) @pytest.mark.parametrize("nrows", [0, 5]) def test_broadcast_with_scalars(nrows): + stream = get_cuda_stream() dtype = DataType(pl.Int8()) columns = [ Column( @@ -73,6 +78,7 @@ def test_broadcast_with_scalars(nrows): dtype.plc_type, nrows if i == 0 else 1, plc.MaskState.ALL_VALID, + stream=stream, ), dtype=dtype, name=f"col{i}", @@ -80,6 +86,6 @@ def test_broadcast_with_scalars(nrows): for i in range(3) ] - result = broadcast(*columns) + result = broadcast(*columns, stream=stream) assert [c.name for c in result] == [f"col{i}" for i in range(3)] assert all(column.size == nrows for column in result)
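# --- Illustrative sketch (not part of the patch) ---------------------------
# Minimal end-to-end illustration of the stream-threading convention adopted
# in this change, assuming a cudf_polars/pylibcudf build where these `stream`
# keywords exist. Column contents and names are illustrative only.
import polars as pl
import pylibcudf as plc

from cudf_polars.containers import Column, DataType
from cudf_polars.dsl.ir import broadcast
from cudf_polars.utils.cuda_stream import get_cuda_stream

stream = get_cuda_stream()
dtype = DataType(pl.Int8())
scalar_col = Column(
    plc.Column.from_iterable_of_py([1], dtype.plc_type), dtype=dtype, name="a"
)
data_col = Column(
    plc.Column.from_iterable_of_py([0, 1, 2, 3], dtype.plc_type), dtype=dtype, name="b"
)

# broadcast() expands the unit-length column via obj_scalar(stream=...) and
# plc.Column.from_scalar(..., stream=...); all device work is ordered on `stream`.
a, b = broadcast(scalar_col, data_col, stream=stream)

# Synchronize before host-side access, mirroring Predicate.__init__ and the
# experimental statistics/io code paths above.
stream.synchronize()
assert a.size == b.size == 4
# ---------------------------------------------------------------------------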