diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/base.py b/python/cudf_polars/cudf_polars/dsl/expressions/base.py
index 8ba3f9f407c..680d176f83f 100644
--- a/python/cudf_polars/cudf_polars/dsl/expressions/base.py
+++ b/python/cudf_polars/cudf_polars/dsl/expressions/base.py
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
 # SPDX-License-Identifier: Apache-2.0
 # TODO: remove need for this
 # ruff: noqa: D101
@@ -18,6 +18,8 @@
 if TYPE_CHECKING:
     from collections.abc import Mapping
 
+    from typing_extensions import Self
+
     from cudf_polars.containers import Column, DataFrame
 
 __all__ = ["AggInfo", "Col", "ColRef", "ExecutionContext", "Expr", "NamedExpr"]
@@ -237,6 +239,24 @@ def collect_agg(self, *, depth: int) -> AggInfo:
         """Collect information about aggregations in groupbys."""
         return self.value.collect_agg(depth=depth)
 
+    def reconstruct(self, expr: Expr) -> Self:
+        """
+        Rebuild with a new `Expr` value.
+
+        Parameters
+        ----------
+        expr
+            New `Expr` value.
+
+        Returns
+        -------
+        New `NamedExpr` with `expr` as the underlying expression.
+        The name of the original `NamedExpr` is preserved.
+        """
+        if expr is self.value:
+            return self
+        return type(self)(self.name, expr)
+
 
 class Col(Expr):
     __slots__ = ("name",)
diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py
index cbac8d21aa2..04daf7c6028 100644
--- a/python/cudf_polars/cudf_polars/dsl/ir.py
+++ b/python/cudf_polars/cudf_polars/dsl/ir.py
@@ -53,6 +53,7 @@
     "ConditionalJoin",
     "DataFrameScan",
     "Distinct",
+    "Empty",
     "ErrorNode",
     "Filter",
     "GroupBy",
@@ -1953,12 +1954,18 @@ def do_evaluate(cls, zlice: Zlice | None, *dfs: DataFrame) -> DataFrame:
 class HConcat(IR):
     """Concatenate dataframes horizontally."""
 
-    __slots__ = ()
-    _non_child = ("schema",)
+    __slots__ = ("should_broadcast",)
+    _non_child = ("schema", "should_broadcast")
 
-    def __init__(self, schema: Schema, *children: IR):
+    def __init__(
+        self,
+        schema: Schema,
+        should_broadcast: bool,  # noqa: FBT001
+        *children: IR,
+    ):
         self.schema = schema
-        self._non_child_args = ()
+        self.should_broadcast = should_broadcast
+        self._non_child_args = (should_broadcast,)
         self.children = children
 
     @staticmethod
@@ -1990,8 +1997,19 @@ def _extend_with_nulls(table: plc.Table, *, nrows: int) -> plc.Table:
         )
 
     @classmethod
-    def do_evaluate(cls, *dfs: DataFrame) -> DataFrame:
+    def do_evaluate(
+        cls,
+        should_broadcast: bool,  # noqa: FBT001
+        *dfs: DataFrame,
+    ) -> DataFrame:
         """Evaluate and return a dataframe."""
+        # Special should_broadcast case.
+        # Used to recombine decomposed expressions: the partial
+        # results may have different column lengths (e.g. a one-row
+        # aggregate next to a full-length column), so broadcast
+        # before concatenating.
+        if should_broadcast:
+            return DataFrame(
+                broadcast(*itertools.chain.from_iterable(df.columns for df in dfs))
+            )
+
         max_rows = max(df.num_rows for df in dfs)
         # Horizontal concatenation extends shorter tables with nulls
         return DataFrame(
@@ -2008,3 +2026,20 @@ def do_evaluate(cls, *dfs: DataFrame) -> DataFrame:
                 )
             )
         )
+
+
+class Empty(IR):
+    """Represents an empty DataFrame."""
+
+    __slots__ = ()
+    _non_child = ()
+
+    def __init__(self) -> None:
+        self.schema = {}
+        self._non_child_args = ()
+        self.children = ()
+
+    @classmethod
+    def do_evaluate(cls) -> DataFrame:
+        """Evaluate and return a dataframe."""
+        return DataFrame([])
diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py
index ed6fcd8ad45..6350853e3c4 100644
--- a/python/cudf_polars/cudf_polars/dsl/translate.py
+++ b/python/cudf_polars/cudf_polars/dsl/translate.py
@@ -491,7 +491,11 @@ def _(
 def _(
     node: pl_ir.HConcat, translator: Translator, schema: dict[str, plc.DataType]
 ) -> ir.IR:
-    return ir.HConcat(schema, *(translator.translate_ir(n=n) for n in node.inputs))
+    return ir.HConcat(
+        schema,
+        False,  # noqa: FBT003
+        *(translator.translate_ir(n=n) for n in node.inputs),
+    )
 
 
 def translate_named_expr(
diff --git a/python/cudf_polars/cudf_polars/dsl/utils/__init__.py b/python/cudf_polars/cudf_polars/dsl/utils/__init__.py
new file mode 100644
index 00000000000..7a20c682676
--- /dev/null
+++ b/python/cudf_polars/cudf_polars/dsl/utils/__init__.py
@@ -0,0 +1,8 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES.
+# SPDX-License-Identifier: Apache-2.0
+
+"""DSL utilities."""
+
+from __future__ import annotations
+
+__all__: list[str] = []
diff --git a/python/cudf_polars/cudf_polars/dsl/utils/naming.py b/python/cudf_polars/cudf_polars/dsl/utils/naming.py
new file mode 100644
index 00000000000..65eedbb1495
--- /dev/null
+++ b/python/cudf_polars/cudf_polars/dsl/utils/naming.py
@@ -0,0 +1,34 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
+# SPDX-License-Identifier: Apache-2.0
+
+"""Name generation utilities."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from collections.abc import Generator, Iterable
+
+
+__all__ = ["unique_names"]
+
+
+def unique_names(names: Iterable[str]) -> Generator[str, None, None]:
+    """
+    Generate unique names relative to some known names.
+
+    Parameters
+    ----------
+    names
+        Names we should be unique with respect to.
+
+    Yields
+    ------
+    Unique names (just using sequence numbers).
+    """
+    # Any generated name is strictly longer than every existing
+    # name, so collisions are impossible.
+    prefix = "_" * max(map(len, names))
+    i = 0
+    while True:
+        yield f"{prefix}{i}"
+        i += 1
diff --git a/python/cudf_polars/cudf_polars/experimental/expressions.py b/python/cudf_polars/cudf_polars/experimental/expressions.py
new file mode 100644
index 00000000000..ec0c8e3982b
--- /dev/null
+++ b/python/cudf_polars/cudf_polars/experimental/expressions.py
@@ -0,0 +1,439 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES.
+# SPDX-License-Identifier: Apache-2.0
+"""
+Multi-partition Expr classes and utilities.
+
+This module includes the necessary functionality to
+decompose a non-pointwise expression graph into stages
+that can each be mapped onto a simple partition-wise
+task graph at execution time.
+
+For example, if ``Select.exprs`` contains an ``expr.Agg``
+node, ``decompose_expr_graph`` will decompose the complex
+NamedExpr node into a sequence of three new IR nodes::
+
+    - Select: Partition-wise aggregation logic.
+    - Repartition: Concatenate the results of each partition.
+    - Select: Final aggregation on the combined results.
+
+In this example, the Select stages are mapped onto a simple
+partition-wise task graph at execution time, and the Repartition
+stage is used to capture the data movement required for a global
+aggregation. At the moment, data movement is always introduced
+by either repartitioning or shuffling the data.
+
+Since we are introducing intermediate IR nodes, we are also
+introducing a temporary column for each intermediate result.
+In order to avoid column-name collisions with the original
+input-IR node, we generate unique names for temporary columns
+and concatenate them to the input-IR node using ``HConcat``.
+"""
+
+from __future__ import annotations
+
+import operator
+from functools import reduce
+from typing import TYPE_CHECKING
+
+import pylibcudf as plc
+
+from cudf_polars.dsl.expressions.aggregation import Agg
+from cudf_polars.dsl.expressions.base import Col, Expr, NamedExpr
+from cudf_polars.dsl.expressions.binaryop import BinOp
+from cudf_polars.dsl.expressions.literal import Literal
+from cudf_polars.dsl.expressions.unary import Cast, UnaryFunction
+from cudf_polars.dsl.ir import Empty, HConcat, Select
+from cudf_polars.dsl.traversal import CachingVisitor
+from cudf_polars.dsl.utils.naming import unique_names
+from cudf_polars.experimental.base import PartitionInfo
+from cudf_polars.experimental.repartition import Repartition
+
+if TYPE_CHECKING:
+    from collections.abc import Generator, MutableMapping, Sequence
+    from typing import Any, TypeAlias
+
+    from cudf_polars.dsl.expressions.base import Expr
+    from cudf_polars.dsl.ir import IR
+    from cudf_polars.typing import GenericTransformer
+    from cudf_polars.utils.config import ConfigOptions
+
+
+ExprDecomposer: TypeAlias = (
+    "GenericTransformer[Expr, tuple[Expr, IR, MutableMapping[IR, PartitionInfo]]]"
+)
+
+
+def select(
+    exprs: Sequence[Expr],
+    input_ir: IR,
+    partition_info: MutableMapping[IR, PartitionInfo],
+    *,
+    names: Generator[str, None, None],
+    repartition: bool = False,
+) -> tuple[list[Col], IR, MutableMapping[IR, PartitionInfo]]:
+    """
+    Select expressions from an IR node, introducing temporaries.
+
+    Parameters
+    ----------
+    exprs
+        Expressions to select.
+    input_ir
+        The input IR node to select from.
+    partition_info
+        A mapping from all unique IR nodes to the
+        associated partitioning information.
+    names
+        Generator of unique names for temporaries.
+    repartition
+        Whether to add a Repartition node after the
+        new selection.
+
+    Returns
+    -------
+    columns
+        Expressions to select from the new IR output.
+    new_ir
+        The new IR node that will introduce temporaries.
+    partition_info
+        A mapping from unique nodes in the new graph to associated
+        partitioning information.
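+
+    Notes
+    -----
+    A sketch of the rewrite (temporary names come from ``names``,
+    so the exact names will differ)::
+
+        columns, new_ir, partition_info = select(
+            [Agg(dtype, "sum", None, Col(dtype, "x"))],
+            input_ir,
+            partition_info,
+            names=names,
+            repartition=True,
+        )
+
+    Here ``new_ir`` evaluates the sum partition-wise into a
+    temporary column (say ``"__0"``), a Repartition node collapses
+    the partial results to a single partition, and
+    ``columns == [Col(dtype, "__0")]`` refers to the temporary.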
+    """
+    output_names = [next(names) for _ in range(len(exprs))]
+    named_exprs = [
+        NamedExpr(name, expr) for name, expr in zip(output_names, exprs, strict=True)
+    ]
+    new_ir: IR = Select(
+        {ne.name: ne.value.dtype for ne in named_exprs},
+        named_exprs,
+        True,  # noqa: FBT003
+        input_ir,
+    )
+    partition_info[new_ir] = PartitionInfo(count=partition_info[input_ir].count)
+
+    # Optionally collapse into one output partition
+    if repartition:
+        new_ir = Repartition(new_ir.schema, new_ir)
+        partition_info[new_ir] = PartitionInfo(count=1)
+
+    columns = [Col(ne.value.dtype, ne.name) for ne in named_exprs]
+    return columns, new_ir, partition_info
+
+
+def _decompose_agg_node(
+    agg: Agg,
+    input_ir: IR,
+    partition_info: MutableMapping[IR, PartitionInfo],
+    config_options: ConfigOptions,
+    *,
+    names: Generator[str, None, None],
+) -> tuple[Expr, IR, MutableMapping[IR, PartitionInfo]]:
+    """
+    Decompose an agg expression into partition-wise stages.
+
+    Parameters
+    ----------
+    agg
+        The Agg node to decompose.
+    input_ir
+        The original input-IR node that ``agg`` will be evaluated on.
+    partition_info
+        A mapping from all unique IR nodes to the
+        associated partitioning information.
+    config_options
+        GPUEngine configuration options.
+    names
+        Generator of unique names for temporaries.
+
+    Returns
+    -------
+    expr
+        Decomposed Agg node.
+    input_ir
+        The rewritten ``input_ir`` to be evaluated by ``expr``.
+    partition_info
+        A mapping from unique nodes in the new graph to associated
+        partitioning information.
+    """
+    expr: Expr
+    exprs: list[Expr]
+    if agg.name == "count":
+        # Chunkwise stage
+        columns, input_ir, partition_info = select(
+            [agg],
+            input_ir,
+            partition_info,
+            names=names,
+            repartition=True,
+        )
+
+        # Combined stage
+        (column,) = columns
+        columns, input_ir, partition_info = select(
+            [Agg(agg.dtype, "sum", None, column)],
+            input_ir,
+            partition_info,
+            names=names,
+        )
+        (expr,) = columns
+    elif agg.name == "mean":
+        # Chunkwise stage
+        exprs = [
+            Agg(agg.dtype, "sum", None, *agg.children),
+            Agg(agg.dtype, "count", None, *agg.children),
+        ]
+        columns, input_ir, partition_info = select(
+            exprs,
+            input_ir,
+            partition_info,
+            names=names,
+            repartition=True,
+        )
+
+        # Combined stage
+        exprs = [
+            BinOp(
+                agg.dtype,
+                plc.binaryop.BinaryOperator.DIV,
+                *(Agg(agg.dtype, "sum", None, column) for column in columns),
+            )
+        ]
+        columns, input_ir, partition_info = select(
+            exprs,
+            input_ir,
+            partition_info,
+            names=names,
+            repartition=True,
+        )
+        (expr,) = columns
+    elif agg.name == "n_unique":
+        # Get uniques and shuffle (if necessary)
+        # TODO: Should this be a tree reduction by default?
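+        # A sketch of the strategy used below: drop duplicates within
+        # each partition, shuffle on the (locally unique) values so
+        # that equal values land in the same partition, then count
+        # the uniques in each partition and sum those counts.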
+        (child,) = agg.children
+        pi = partition_info[input_ir]
+        if pi.count > 1 and [ne.value for ne in pi.partitioned_on] != [child]:
+            from cudf_polars.experimental.shuffle import Shuffle
+
+            children, input_ir, partition_info = select(
+                [UnaryFunction(agg.dtype, "unique", (False,), child)],
+                input_ir,
+                partition_info,
+                names=names,
+            )
+            (child,) = children
+            agg = agg.reconstruct([child])
+            shuffle_on = (NamedExpr(next(names), child),)
+            input_ir = Shuffle(
+                input_ir.schema,
+                shuffle_on,
+                config_options,
+                input_ir,
+            )
+            partition_info[input_ir] = PartitionInfo(
+                count=pi.count,
+                partitioned_on=shuffle_on,
+            )
+
+        # Chunkwise stage
+        columns, input_ir, partition_info = select(
+            [Cast(agg.dtype, agg)],
+            input_ir,
+            partition_info,
+            names=names,
+            repartition=True,
+        )
+
+        # Combined stage
+        (column,) = columns
+        columns, input_ir, partition_info = select(
+            [Agg(agg.dtype, "sum", None, column)],
+            input_ir,
+            partition_info,
+            names=names,
+        )
+        (expr,) = columns
+    else:
+        # Chunkwise stage
+        columns, input_ir, partition_info = select(
+            [agg],
+            input_ir,
+            partition_info,
+            names=names,
+            repartition=True,
+        )
+
+        # Combined stage
+        (column,) = columns
+        columns, input_ir, partition_info = select(
+            [Agg(agg.dtype, agg.name, agg.options, column)],
+            input_ir,
+            partition_info,
+            names=names,
+        )
+        (expr,) = columns
+
+    return expr, input_ir, partition_info
+
+
+_SUPPORTED_AGGS = ("count", "min", "max", "sum", "mean", "n_unique")
+
+
+def _decompose_expr_node(
+    expr: Expr,
+    input_ir: IR,
+    partition_info: MutableMapping[IR, PartitionInfo],
+    config_options: ConfigOptions,
+    *,
+    names: Generator[str, None, None],
+) -> tuple[Expr, IR, MutableMapping[IR, PartitionInfo]]:
+    """
+    Decompose an expression into partition-wise stages.
+
+    Parameters
+    ----------
+    expr
+        The Expr node to decompose.
+    input_ir
+        The input-IR node that ``expr`` will be evaluated on.
+    partition_info
+        A mapping from all unique IR nodes to the
+        associated partitioning information.
+    config_options
+        GPUEngine configuration options.
+    names
+        Generator of unique names for temporaries.
+
+    Returns
+    -------
+    expr
+        Decomposed Expr node.
+    input_ir
+        The rewritten ``input_ir`` to be evaluated by ``expr``.
+    partition_info
+        A mapping from unique nodes in the new graph to associated
+        partitioning information.
+    """
+    if isinstance(expr, Literal):
+        # For Literal nodes, we don't actually want an
+        # input IR with real columns, because it will
+        # mess up the result of ``HConcat``.
+        input_ir = Empty()
+        partition_info[input_ir] = PartitionInfo(count=1)
+
+    partition_count = partition_info[input_ir].count
+    if partition_count == 1 or expr.is_pointwise:
+        # Single-partition and pointwise expressions are always supported.
+        return expr, input_ir, partition_info
+    elif isinstance(expr, Agg) and expr.name in _SUPPORTED_AGGS:
+        # This is a supported Agg expression.
+        return _decompose_agg_node(
+            expr, input_ir, partition_info, config_options, names=names
+        )
+    else:
+        # This is an unsupported expression; raise.
+        raise NotImplementedError(
+            f"{type(expr)} not supported for multiple partitions."
+        )
+
+
+def _decompose(
+    expr: Expr, rec: ExprDecomposer
+) -> tuple[Expr, IR, MutableMapping[IR, PartitionInfo]]:
+    # Used by ``decompose_expr_graph``.
+
+    if not expr.children:
+        # Leaf node
+        return _decompose_expr_node(
+            expr,
+            rec.state["input_ir"],
+            {rec.state["input_ir"]: rec.state["input_partition_info"]},
+            rec.state["config_options"],
+            names=rec.state["unique_names"],
+        )
+
+    # Process child Exprs first
+    children, input_irs, _partition_info = zip(
+        *(rec(c) for c in expr.children), strict=True
+    )
+    partition_info = reduce(operator.or_, _partition_info)
+
+    # Assume the partition count is the maximum input-IR partition count
+    input_ir: IR
+    assert len(input_irs) > 0  # Must have at least one input IR
+    partition_count = max(partition_info[ir].count for ir in input_irs)
+    unique_input_irs = list(dict.fromkeys(input_irs))
+    if len(unique_input_irs) > 1:
+        # Need to make sure we only have a single input IR
+        # TODO: Check that we aren't concatenating misaligned
+        # columns that cannot be broadcasted. For example, what
+        # if one of the columns is sorted?
+        schema: MutableMapping[str, Any] = {}
+        for ir in unique_input_irs:
+            schema.update(ir.schema)
+        input_ir = HConcat(
+            schema,
+            True,  # noqa: FBT003
+            *unique_input_irs,
+        )
+        partition_info[input_ir] = PartitionInfo(count=partition_count)
+    else:
+        input_ir = input_irs[0]
+
+    # Call into class-specific logic to decompose ``expr``
+    return _decompose_expr_node(
+        expr.reconstruct(children),
+        input_ir,
+        partition_info,
+        rec.state["config_options"],
+        names=rec.state["unique_names"],
+    )
+
+
+def decompose_expr_graph(
+    named_expr: NamedExpr,
+    input_ir: IR,
+    partition_info: MutableMapping[IR, PartitionInfo],
+    config_options: ConfigOptions,
+) -> tuple[NamedExpr, IR, MutableMapping[IR, PartitionInfo]]:
+    """
+    Decompose a NamedExpr into stages.
+
+    Parameters
+    ----------
+    named_expr
+        The original NamedExpr to decompose.
+    input_ir
+        The input-IR node that ``named_expr`` will be
+        evaluated on.
+    partition_info
+        A mapping from all unique IR nodes to the
+        associated partitioning information.
+    config_options
+        GPUEngine configuration options.
+
+    Returns
+    -------
+    named_expr
+        Decomposed NamedExpr object.
+    input_ir
+        The rewritten ``input_ir`` to be evaluated by ``named_expr``.
+    partition_info
+        A mapping from unique nodes in the new graph to associated
+        partitioning information.
+
+    Notes
+    -----
+    This function recursively decomposes ``named_expr.value`` and
+    ``input_ir`` into multiple partition-wise stages.
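+
+    As a rough sketch (temporary column names are generated, so the
+    exact names will differ), decomposing the multi-partition query
+    ``NamedExpr("y", Agg(dtype, "sum", None, Col(dtype, "x")))``
+    produces::
+
+        Select       # partition-wise sum -> temporary "__0"
+        Repartition  # concatenate the partial results
+        Select       # sum of "__0" -> temporary "__1"
+
+    and returns ``NamedExpr("y", Col(dtype, "__1"))``, which is
+    pointwise with respect to the rewritten ``input_ir``.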
+    """
+    state = {
+        "input_ir": input_ir,
+        "input_partition_info": partition_info[input_ir],
+        "config_options": config_options,
+        "unique_names": unique_names((named_expr.name, *input_ir.schema.keys())),
+    }
+    mapper = CachingVisitor(_decompose, state=state)
+    expr, input_ir, partition_info = mapper(named_expr.value)
+    return named_expr.reconstruct(expr), input_ir, partition_info
diff --git a/python/cudf_polars/cudf_polars/experimental/parallel.py b/python/cudf_polars/cudf_polars/experimental/parallel.py
index 1d9e8810ce7..44bb34a9e2a 100644
--- a/python/cudf_polars/cudf_polars/experimental/parallel.py
+++ b/python/cudf_polars/cudf_polars/experimental/parallel.py
@@ -19,6 +19,7 @@
     IR,
     Cache,
     Filter,
+    HConcat,
     HStack,
     MapFunction,
     Projection,
@@ -185,11 +186,15 @@ def _(
 ) -> MutableMapping[Any, Any]:
     # Generate pointwise (embarrassingly-parallel) tasks by default
     child_names = [get_key_name(c) for c in ir.children]
+    # Children with a single partition are broadcasted: every
+    # output task reads partition zero of that child.
+    bcast_child = [partition_info[c].count == 1 for c in ir.children]
     return {
         key: (
             ir.do_evaluate,
             *ir._non_child_args,
-            *[(child_name, i) for child_name in child_names],
+            *[
+                (child_name, 0 if bcast_child[j] else i)
+                for j, child_name in enumerate(child_names)
+            ],
         )
         for i, key in enumerate(partition_info[ir].keys(ir))
    }
@@ -274,3 +279,4 @@ def _lower_ir_pwise(
 lower_ir_node.register(Cache, _lower_ir_pwise)
 lower_ir_node.register(Filter, _lower_ir_pwise)
 lower_ir_node.register(HStack, _lower_ir_pwise)
+lower_ir_node.register(HConcat, _lower_ir_pwise)
diff --git a/python/cudf_polars/cudf_polars/experimental/select.py b/python/cudf_polars/cudf_polars/experimental/select.py
index 74e5506bad5..a0f986e7c5c 100644
--- a/python/cudf_polars/cudf_polars/experimental/select.py
+++ b/python/cudf_polars/cudf_polars/experimental/select.py
@@ -6,17 +6,95 @@
 
 from typing import TYPE_CHECKING
 
-from cudf_polars.dsl.ir import Select
+from cudf_polars.dsl.ir import HConcat, Select
 from cudf_polars.dsl.traversal import traversal
+from cudf_polars.experimental.base import PartitionInfo
 from cudf_polars.experimental.dispatch import lower_ir_node
+from cudf_polars.experimental.expressions import decompose_expr_graph
 from cudf_polars.experimental.utils import _lower_ir_fallback
 
 if TYPE_CHECKING:
     from collections.abc import MutableMapping
 
     from cudf_polars.dsl.ir import IR
-    from cudf_polars.experimental.base import PartitionInfo
     from cudf_polars.experimental.parallel import LowerIRTransformer
+    from cudf_polars.utils.config import ConfigOptions
+
+
+def decompose_select(
+    select_ir: Select,
+    input_ir: IR,
+    partition_info: MutableMapping[IR, PartitionInfo],
+    config_options: ConfigOptions,
+) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
+    """
+    Decompose a multi-partition Select operation.
+
+    Parameters
+    ----------
+    select_ir
+        The original Select operation to decompose.
+        This object has not been reconstructed with
+        ``input_ir`` as its child yet.
+    input_ir
+        The lowered child of ``select_ir``. Each element
+        of ``select_ir.exprs`` will be decomposed into a
+        "partial" selection evaluated against this object.
+    partition_info
+        A mapping from all unique IR nodes to the
+        associated partitioning information.
+    config_options
+        GPUEngine configuration options.
+
+    Returns
+    -------
+    new_ir, partition_info
+        The rewritten Select node, and a mapping from
+        unique nodes in the new graph to associated
+        partitioning information.
+
+    Notes
+    -----
+    This function uses ``decompose_expr_graph`` to further
+    decompose each element of ``select_ir.exprs``.
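+    Each expression yields its own (possibly multi-stage) IR
+    subgraph, and the partial results are recombined with a
+    broadcasting ``HConcat``. Schematically, for two decomposed
+    expressions ``e0`` and ``e1``::
+
+        HConcat(Select([e0], ...), Select([e1], ...))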
+
+    See Also
+    --------
+    decompose_expr_graph
+    """
+    # Collect partial selections
+    selections = []
+    for ne in select_ir.exprs:
+        # Decompose this partial expression
+        new_ne, partial_input_ir, _partition_info = decompose_expr_graph(
+            ne, input_ir, partition_info, config_options
+        )
+        pi = _partition_info[partial_input_ir]
+        partial_input_ir = Select(
+            {ne.name: ne.value.dtype},
+            [new_ne],
+            True,  # noqa: FBT003
+            partial_input_ir,
+        )
+        _partition_info[partial_input_ir] = pi
+        partition_info.update(_partition_info)
+        selections.append(partial_input_ir)
+
+    # Concatenate partial selections
+    new_ir: HConcat | Select
+    if len(selections) > 1:
+        new_ir = HConcat(
+            select_ir.schema,
+            True,  # noqa: FBT003
+            *selections,
+        )
+        partition_info[new_ir] = PartitionInfo(
+            count=max(partition_info[c].count for c in selections)
+        )
+    else:
+        new_ir = selections[0]
+
+    return new_ir, partition_info
 
 
 @lower_ir_node.register(Select)
@@ -28,10 +106,16 @@ def _(
     if pi.count > 1 and not all(
         expr.is_pointwise for expr in traversal([e.value for e in ir.exprs])
     ):
-        # TODO: Handle non-pointwise expressions.
-        return _lower_ir_fallback(
-            ir, rec, msg="This selection not support for multiple partitions."
-        )
+        try:
+            # Try decomposing the underlying expressions
+            return decompose_select(
+                ir, child, partition_info, rec.state["config_options"]
+            )
+        except NotImplementedError:
+            return _lower_ir_fallback(
+                ir, rec, msg="This selection is not supported for multiple partitions."
+            )
+
     new_node = ir.reconstruct([child])
     partition_info[new_node] = pi
     return new_node, partition_info
diff --git a/python/cudf_polars/cudf_polars/experimental/shuffle.py b/python/cudf_polars/cudf_polars/experimental/shuffle.py
index 1251768c65a..7f16610eea4 100644
--- a/python/cudf_polars/cudf_polars/experimental/shuffle.py
+++ b/python/cudf_polars/cudf_polars/experimental/shuffle.py
@@ -144,6 +144,10 @@ def _partition_dataframe(
         A dictionary mapping between int partition indices
         and DataFrame fragments.
     """
+    if df.num_rows == 0:
+        # Fast path for empty DataFrame
+        return {i: df for i in range(count)}
+
     # Hash the specified keys to calculate the output
     # partition for each row
     partition_map = plc.binaryop.binary_operation(
diff --git a/python/cudf_polars/tests/dsl/test_expr.py b/python/cudf_polars/tests/dsl/test_expr.py
index de8fec301fe..879bc98425f 100644
--- a/python/cudf_polars/tests/dsl/test_expr.py
+++ b/python/cudf_polars/tests/dsl/test_expr.py
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
 # SPDX-License-Identifier: Apache-2.0
 
 from __future__ import annotations
@@ -95,3 +95,15 @@ def make_expr(n1, n2):
     assert e1 == e2
     assert e1 != e3
     assert e2 != e3
+
+
+def test_reconstruct_named_expr():
+    ne1 = expr.NamedExpr("a", expr.Col(plc.DataType(plc.TypeId.INT8), "a"))
+    new_value = expr.Col(plc.DataType(plc.TypeId.INT16), "a")
+    ne2 = ne1.reconstruct(new_value)
+    assert ne1.name == ne2.name
+    assert ne1 != ne2
+    assert ne2.value == new_value
+
+    ne3 = ne2.reconstruct(ne2.value)
+    assert ne3 is ne2
diff --git a/python/cudf_polars/tests/experimental/test_join.py b/python/cudf_polars/tests/experimental/test_join.py
index 108f5ee559a..d2ad2114db2 100644
--- a/python/cudf_polars/tests/experimental/test_join.py
+++ b/python/cudf_polars/tests/experimental/test_join.py
@@ -14,11 +14,33 @@
 from cudf_polars.utils.config import ConfigOptions
 
 
+@pytest.fixture(scope="module")
+def left():
+    return pl.LazyFrame(
+        {
+            "x": range(15),
+            "y": [1, 2, 3] * 5,
+            "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 3,
+        }
+    )
+
+
+@pytest.fixture(scope="module")
+def right():
+    return pl.LazyFrame(
+        {
+            "xx": range(9),
+            "y": [2, 4, 3] * 3,
+            "zz": [1, 2, 3] * 3,
+        }
+    )
+
+
 @pytest.mark.parametrize("how", ["inner", "left", "right", "full", "semi", "anti"])
 @pytest.mark.parametrize("reverse", [True, False])
 @pytest.mark.parametrize("max_rows_per_partition", [1, 5, 10, 15])
 @pytest.mark.parametrize("broadcast_join_limit", [1, 16])
-def test_join(how, reverse, max_rows_per_partition, broadcast_join_limit):
+def test_join(left, right, how, reverse, max_rows_per_partition, broadcast_join_limit):
     engine = pl.GPUEngine(
         raise_on_fail=True,
         executor="streaming",
@@ -29,20 +51,6 @@ def test_join(how, reverse, max_rows_per_partition, broadcast_join_limit):
             "shuffle_method": "tasks",
         },
     )
-    left = pl.LazyFrame(
-        {
-            "x": range(15),
-            "y": [1, 2, 3] * 5,
-            "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 3,
-        }
-    )
-    right = pl.LazyFrame(
-        {
-            "xx": range(6),
-            "y": [2, 4, 3] * 2,
-            "zz": [1, 2] * 3,
-        }
-    )
     if reverse:
         left, right = right, left
 
@@ -65,7 +73,7 @@ def test_join(how, reverse, max_rows_per_partition, broadcast_join_limit):
 
 
 @pytest.mark.parametrize("broadcast_join_limit", [1, 2, 3, 4])
-def test_broadcast_join_limit(broadcast_join_limit):
+def test_broadcast_join_limit(left, right, broadcast_join_limit):
     engine = pl.GPUEngine(
         raise_on_fail=True,
         executor="streaming",
@@ -110,3 +118,23 @@ def test_broadcast_join_limit(broadcast_join_limit):
     else:
         # Expect broadcast join
         assert len(shuffle_nodes) == 0
+
+
+def test_join_then_shuffle(left, right):
+    engine = pl.GPUEngine(
+        raise_on_fail=True,
+        executor="streaming",
+        executor_options={
+            "scheduler": DEFAULT_SCHEDULER,
+            "max_rows_per_partition": 2,
+            "broadcast_join_limit": 1,
+        },
+    )
+    q = left.join(right, on="y", how="inner").select(
+        pl.col("x").sum(),
+        pl.col("xx").mean(),
+        pl.col("y").n_unique(),
+        (pl.col("y") * pl.col("y")).n_unique().alias("y2"),
+    )
+
+    assert_gpu_result_equal(q, engine=engine, check_row_order=False)
diff --git a/python/cudf_polars/tests/experimental/test_select.py b/python/cudf_polars/tests/experimental/test_select.py
index 97169976590..aed0e0ed277 100644
--- a/python/cudf_polars/tests/experimental/test_select.py
+++ b/python/cudf_polars/tests/experimental/test_select.py
@@ -27,6 +27,7 @@ def df():
         {
             "a": [1, 2, 3, 4, 5, 6, 7],
             "b": [1, 1, 1, 1, 1, 1, 1],
+            "c": [2, 4, 6, 8, 10, 12, 14],
         }
     )
 
@@ -49,11 +50,12 @@ def test_select_reduce_fallback(df, fallback_mode):
             "scheduler": DEFAULT_SCHEDULER,
         },
    )
-    match = "This selection not support for multiple partitions."
+    match = "This selection is not supported for multiple partitions."
 
     query = df.select(
         (pl.col("a") + pl.col("b")).max(),
-        (pl.col("a") * 2 + pl.col("b")).alias("d").mean(),
+        # NOTE: We don't support `median` yet
+        (pl.col("a") * 2 + pl.col("b")).alias("d").median(),
     )
 
     if fallback_mode == "silent":
@@ -71,6 +73,34 @@ def test_select_reduce_fallback(df, fallback_mode):
     assert_gpu_result_equal(query, engine=engine)
 
 
+@pytest.mark.parametrize(
+    "aggs",
+    [
+        (pl.col("a").sum(),),
+        (
+            (pl.col("a") + pl.col("b")).sum(),
+            (pl.col("a") * 2 + pl.col("b")).alias("d").min(),
+        ),
+        (pl.col("a").min() + pl.col("b").max(),),
+        (pl.col("a") - (pl.col("b") + pl.col("c").max()).sum(),),
+        (pl.col("b").len(),),
+        (pl.col("a") - (pl.col("b") + pl.col("c").max()).mean(),),
+        (
+            pl.col("b").sum(),
+            (pl.col("c").sum() + 1),
+        ),
+        (
+            pl.col("b").n_unique(),
+            (pl.col("c").n_unique() + 1),
+        ),
+    ],
+)
+def test_select_aggs(df, engine, aggs):
+    # Test supported aggs (e.g. "min", "max", "mean", "n_unique")
+    query = df.select(*aggs)
+    assert_gpu_result_equal(query, engine=engine)
+
+
 def test_select_with_cse_no_agg(df, engine):
     expr = pl.col("a") + pl.col("a")
     query = df.select(expr, (expr * 2).alias("b"), ((expr * 2) + 10).alias("c"))