diff --git a/python/cudf_polars/cudf_polars/dsl/expressions/base.py b/python/cudf_polars/cudf_polars/dsl/expressions/base.py
index 8ba3f9f407c..680d176f83f 100644
--- a/python/cudf_polars/cudf_polars/dsl/expressions/base.py
+++ b/python/cudf_polars/cudf_polars/dsl/expressions/base.py
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
 # SPDX-License-Identifier: Apache-2.0
 # TODO: remove need for this
 # ruff: noqa: D101
@@ -18,6 +18,8 @@ if TYPE_CHECKING:
     from collections.abc import Mapping
 
+    from typing_extensions import Self
+
     from cudf_polars.containers import Column, DataFrame
 
 __all__ = ["AggInfo", "Col", "ColRef", "ExecutionContext", "Expr", "NamedExpr"]
@@ -237,6 +239,24 @@ def collect_agg(self, *, depth: int) -> AggInfo:
         """Collect information about aggregations in groupbys."""
         return self.value.collect_agg(depth=depth)
 
+    def reconstruct(self, expr: Expr) -> Self:
+        """
+        Rebuild with a new `Expr` value.
+
+        Parameters
+        ----------
+        expr
+            New `Expr` value.
+
+        Returns
+        -------
+        New `NamedExpr` with `expr` as the underlying expression.
+        The name of the original `NamedExpr` is preserved.
+        """
+        if expr is self.value:
+            return self
+        return type(self)(self.name, expr)
+
 
 class Col(Expr):
     __slots__ = ("name",)
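Because `reconstruct` returns `self` whenever the expression is unchanged, graph rewrites that leave a node untouched allocate nothing new. A minimal usage sketch (the dtypes are illustrative and mirror the `test_reconstruct_named_expr` test added later in this diff):

```python
import pylibcudf as plc

from cudf_polars.dsl.expressions.base import Col, NamedExpr

ne = NamedExpr("a", Col(plc.DataType(plc.TypeId.INT8), "a"))

# Unchanged value: no new object is allocated.
assert ne.reconstruct(ne.value) is ne

# New value: a new NamedExpr is returned, preserving the name.
ne2 = ne.reconstruct(Col(plc.DataType(plc.TypeId.INT16), "a"))
assert ne2.name == ne.name and ne2 is not ne
```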
diff --git a/python/cudf_polars/cudf_polars/dsl/traversal.py b/python/cudf_polars/cudf_polars/dsl/traversal.py
index 9c45a68812a..a34b62ad119 100644
--- a/python/cudf_polars/cudf_polars/dsl/traversal.py
+++ b/python/cudf_polars/cudf_polars/dsl/traversal.py
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
 # SPDX-License-Identifier: Apache-2.0
 
 """Traversal and visitor utilities for nodes."""
@@ -23,7 +23,9 @@
 ]
 
 
-def traversal(nodes: Sequence[NodeT]) -> Generator[NodeT, None, None]:
+def traversal(
+    nodes: Sequence[NodeT], *, cutoff_types: tuple[type[NodeT], ...] = ()
+) -> Generator[NodeT, None, None]:
     """
     Pre-order traversal of nodes in an expression.
 
@@ -31,6 +33,9 @@ def traversal(nodes: Sequence[NodeT]) -> Generator[NodeT, None, None]:
     ----------
     nodes
         Roots of expressions to traverse.
+    cutoff_types
+        Types at which to terminate traversal. If a node's type is in
+        this tuple, we do not yield any of its children.
 
     Yields
     ------
@@ -43,6 +48,8 @@ def traversal(nodes: Sequence[NodeT]) -> Generator[NodeT, None, None]:
     while lifo:
         node = lifo.pop()
         yield node
+        if cutoff_types and isinstance(node, cutoff_types):
+            continue
        for child in reversed(node.children):
             if child not in seen:
                 seen.add(child)
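The cutoff behaves like a pre-order prune: a matching node is still yielded, but its subtree is not. A small sketch against the expression DSL (assuming the test-style import `from cudf_polars.dsl import expr` and the `BinOp(dtype, op, left, right)` constructor shape used elsewhere in this PR):

```python
import pylibcudf as plc

from cudf_polars.dsl import expr
from cudf_polars.dsl.traversal import traversal

dt = plc.DataType(plc.TypeId.INT8)
e = expr.BinOp(
    dt, plc.binaryop.BinaryOperator.ADD, expr.Col(dt, "a"), expr.Col(dt, "b")
)

# Default traversal yields the BinOp root and both Col children.
assert len(list(traversal([e]))) == 3

# Cutting off at BinOp yields only the root; its children are skipped.
assert list(traversal([e], cutoff_types=(expr.BinOp,))) == [e]
```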
+ """ + + __slots__ = ("kind", "sub_expr") + _non_child = ("dtype", "sub_expr", "kind") + + def __init__( + self, + dtype: plc.DataType, + sub_expr: Expr, + kind: str | None, + *children: FusedExpr, + ): + self.dtype = dtype + self.sub_expr = sub_expr + self.kind = kind + self.children = children + self.is_pointwise = self.kind == "pointwise" + assert all(isinstance(c, FusedExpr) for c in children) + assert kind in ("pointwise", "shuffle", "aggregation") + + def do_evaluate( + self, + df: DataFrame, + *, + context: ExecutionContext = ExecutionContext.FRAME, + mapping: Mapping[Expr, Column] | None = None, + ) -> Column: # pragma: no cover + """Evaluate this expression given a dataframe for context.""" + return self.sub_expr.evaluate(df, context=context, mapping=mapping) + + +class NoOp(Expr): + """No-op expression.""" + + __slots__ = () + _non_child = ("dtype",) + + def __init__(self, dtype: plc.DataType, value: Expr) -> None: + self.dtype = dtype + self.children = (value,) + self.is_pointwise = True + + def do_evaluate( + self, + df: DataFrame, + *, + context: ExecutionContext = ExecutionContext.FRAME, + mapping: Mapping[Expr, Column] | None = None, + ) -> Column: # pragma: no cover + """Evaluate this expression given a dataframe for context.""" + (child,) = self.children + return child.evaluate(df, context=context, mapping=mapping) + + +class ShuffleColumn(Expr): + """Shuffle expression.""" + + __slots__ = ("config_options",) + _non_child = ("dtype", "config_options") + + def __init__( + self, + dtype: plc.DataType, + config_options: ConfigOptions, + value: Expr, + ) -> None: + self.dtype = dtype + self.config_options = config_options + self.children = (value,) + self.is_pointwise = False + + def do_evaluate( + self, + df: DataFrame, + *, + context: ExecutionContext = ExecutionContext.FRAME, + mapping: Mapping[Expr, Column] | None = None, + ) -> Column: + """Evaluate this expression given a dataframe for context.""" + (child,) = self.children + return child.evaluate(df, context=context, mapping=mapping) + + +def extract_partition_counts( + exprs: Sequence[Expr], + input_ir_count: int, + *, + update: MutableMapping[Expr, int] | None = None, + skip_fused_exprs: bool = False, +) -> MutableMapping[Expr, int]: + """ + Extract a partition-count mapping for Expr nodes. + + Parameters + ---------- + exprs + Sequence of root expressions to traverse and + get partition counts. + input_ir_count + Partition count for the child-IR node. + update + Existing mapping to update. + skip_fused_exprs + Whether to skip over FusedExpr objects. This + can be used to stay within a local FusedExpr + sub-expression. + + Returns + ------- + Mapping between Expr nodes and partition counts. 
+ """ + expr_partition_counts: MutableMapping[Expr, int] = update or {} + cutoff_types = (FusedExpr,) if skip_fused_exprs else () + for expr in exprs: + for node in list(traversal([expr], cutoff_types=cutoff_types))[::-1]: + if isinstance(node, FusedExpr): + # Process the fused sub-expression graph first + expr_partition_counts = extract_partition_counts( + [node.sub_expr], + input_ir_count, + update=expr_partition_counts, + skip_fused_exprs=True, + ) + expr_partition_counts[node] = expr_partition_counts[node.sub_expr] + elif isinstance(node, (Agg, Literal)): + # Assume all aggregations produce 1 partition + expr_partition_counts[node] = 1 + elif node.is_pointwise or isinstance(node, ShuffleColumn): + # Pointwise expressions should preserve child partition count + if node.children: + # Assume maximum child partition count + expr_partition_counts[node] = max( + [expr_partition_counts[c] for c in node.children] + ) + else: + # If no children, we are preserving the child-IR partition count + expr_partition_counts[node] = input_ir_count + else: # pragma: no cover + raise NotImplementedError( + f"{type(node)} not supported for multiple partitions." + ) + + return expr_partition_counts + + +def _decompose(expr: Expr, rec: ExprTransformer) -> FusedExpr: + # Used by `decompose_expr_graph` + + # We are translating our original expression graph to + # comprise only of FusedExpr nodes. We use a depth-first + # traversal, so that we start with FusedExpr leaf nodes. + # As we work our way back towards the root node, the + # existing FusedExpr nodes will "absorb" their parents + # as long as all sub-expression nodes are "pointwise". + # As soon as a FusedExpr object contains a non-pointwise + # sub-expression node, that object may no-longer absorb + # its parents. + + # Transform child expressions first + new_children = tuple(map(rec, expr.children)) + fused_children: list[FusedExpr] = [] + if new_children: + # Non-leaf node. + # Construct child lists for new expressions + # (both the fused expression and the sub-expression) + sub_expr_children: list[Expr] = [] + for child in new_children: + # All children should be FusedExpr + assert isinstance(child, FusedExpr), "FusedExpr children must be FusedExpr." + if child.is_pointwise: + # Pointwise children must be fused into the + # "new" FusedExpr node with root `expr` + for c in child.children: + assert isinstance(c, FusedExpr), ( + "FusedExpr children must be FusedExpr." + ) + fused_children.append(c) + sub_expr_children.append(child.sub_expr) + else: + # Non-pointwise children must remain as + # distinct FusedExpr nodes + fused_children.append(child) + sub_expr_children.append(child) + sub_expr = expr.reconstruct(sub_expr_children) + else: + # Leaf node. + # Convert to simple FusedExpr with no children + sub_expr = expr + + return construct_fused_expr(sub_expr, fused_children, rec.state["config_options"]) + + +def construct_fused_expr( + sub_expr: Expr, fused_children: list[FusedExpr], config_options: ConfigOptions +) -> FusedExpr: + """ + Construct new FusedExpr object. + + Parameters + ---------- + sub_expr + Expression to be wrapped in a ``FusedExpr`` class. + fused_children + Children of ``sub_expr`` that are already ``FusedExpr`` nodes. + config_options + GPUEngine configuration options. + + Returns + ------- + New ``FusedExpr`` object. + """ + if sub_expr.is_pointwise: + # Pointwise expressions are always supported. 
+ kind = "pointwise" + final_expr = sub_expr + elif isinstance(sub_expr, Agg) and sub_expr.name in _SUPPORTED_AGGS: + # This is a supported Agg expression. + kind = "aggregation" + agg = sub_expr + agg_name = agg.name + chunk_expr: Expr + if agg_name == "count": + # Chunkwise + chunk_expr = agg + # Combine + combine_expr = Agg( + agg.dtype, + "sum", + None, + chunk_expr, + ) + # Finalize + final_expr = NoOp(agg.dtype, combine_expr) + elif agg_name == "mean": + # Chunkwise + chunk_exprs = [ + Agg(agg.dtype, "sum", None, *agg.children), + Agg(agg.dtype, "count", None, *agg.children), + ] + # Combine + combine_exprs = [ + Agg( + agg.dtype, + "sum", + None, + chunk_expr, + ) + for chunk_expr in chunk_exprs + ] + # Finalize + final_expr = BinOp( + agg.dtype, + plc.binaryop.BinaryOperator.DIV, + *combine_exprs, + ) + elif agg_name == "n_unique": + # Inject shuffle + # TODO: Avoid shuffle if possible + (child,) = agg.children + shuffled = FusedExpr( + child.dtype, + ShuffleColumn(child.dtype, config_options, child), + "shuffle", + *fused_children, + ) + fused_children = [shuffled] + # Chunkwise + chunk_expr = Cast( + agg.dtype, + Agg(agg.dtype, "n_unique", agg.options, shuffled), + ) + # Combine + combine_expr = Agg(agg.dtype, "sum", None, chunk_expr) + # Finalize + final_expr = NoOp(agg.dtype, combine_expr) + else: + # Chunkwise + chunk_expr = agg + # Combine + combine_expr = Agg( + agg.dtype, + agg.name, + agg.options, + chunk_expr, + ) + # Finalize + final_expr = NoOp(agg.dtype, combine_expr) + else: + # This is an un-supported expression - raise. + raise NotImplementedError( + f"{type(sub_expr)} not supported for multiple partitions." + ) + + return FusedExpr(final_expr.dtype, final_expr, kind, *fused_children) + + +def decompose_expr_graph(expr: Expr, config_options: ConfigOptions) -> Expr: + """Transform an Expr into a graph of FusedExpr nodes.""" + mapper = CachingVisitor(_decompose, state={"config_options": config_options}) + return mapper(expr) + + +def evaluate_chunk( + df: DataFrame, + expr: Expr, + children: tuple[Expr, ...], + *references: Column, +) -> Column: + """Evaluate the sub-expression of a simple FusedExpr node.""" + return expr.evaluate(df, mapping=dict(zip(children, references, strict=True))) + + +def evaluate_chunk_multi_agg( + df: DataFrame, + exprs: Sequence[Expr], + children: tuple[Expr, ...], + *references: Column, +) -> tuple[Column, ...]: + """Evaluate multiple aggregations.""" + mapping = dict(zip(children, references, strict=True)) + return tuple(expr.evaluate(df, mapping=mapping) for expr in exprs) + + +def combine_chunks_multi_agg( + column_chunks: Sequence[tuple[Column, ...]], + combine_aggs: Sequence[Agg], + finalize: Expr, + name: str, +) -> Column: + """Aggregate Column chunks.""" + column_chunk_lists = zip(*column_chunks, strict=True) + + combined = [ + agg.op( + Column( + plc.concatenate.concatenate([col.obj for col in column_chunk_list]), + name=column_chunk_list[0].name, + ) + ) + for agg, column_chunk_list in zip(combine_aggs, column_chunk_lists, strict=True) + ] + + if isinstance(finalize, NoOp): + (col,) = combined + else: + col = finalize.evaluate( + DataFrame([]), # Placeholder DataFrame + mapping=dict(zip(finalize.children, combined, strict=True)), + ) + + return col.rename(name) + + +def make_agg_graph( + named_expr: NamedExpr, + expr_partition_counts: MutableMapping[Expr, int], + input_ir: IR, +) -> MutableMapping[Any, Any]: + """Build a FusedExpr aggregation graph.""" + expr = named_expr.value + assert isinstance(expr, FusedExpr) + assert 
expr.kind == "aggregation" + + # Define aggregation steps + final_expr = expr.sub_expr + combine_exprs = final_expr.children + chunkwise_exprs = tuple( + chain.from_iterable(combine_expr.children for combine_expr in combine_exprs) + ) + (chunkwise_child,) = chunkwise_exprs[0].children + if isinstance(chunkwise_child, Agg): + # We may have needed a cast after the chunkwise agg + (chunkwise_child,) = chunkwise_child.children + + # NOTE: This algorithm assumes we are doing nested + # aggregations, or we are only aggregating a single + # column. If we are performing aligned aggregations + # across multiple columns at once, we should perform + # our reduction at the DataFrame level instead. + + key_name = get_key_name(expr, input_ir) + expr_child_names = [get_key_name(c, input_ir) for c in expr.children] + expr_bcast = [expr_partition_counts[c] == 1 for c in expr.children] + input_count = expr_partition_counts[chunkwise_child] + + graph: MutableMapping[Any, Any] = {} + + # Pointwise stage + pointwise_keys = [] + key_name = get_key_name(expr, input_ir) + inpit_ir_name = get_key_name(input_ir) + chunk_name = f"chunk-{key_name}" + for i in range(input_count): + pointwise_keys.append((chunk_name, i)) + graph[pointwise_keys[-1]] = ( + evaluate_chunk_multi_agg, + (inpit_ir_name, i), + chunkwise_exprs, + expr.children, + *( + (name, 0) if bcast else (name, i) + for name, bcast in zip(expr_child_names, expr_bcast, strict=True) + ), + ) + + # Combine and finalize + graph[(key_name, 0)] = ( + combine_chunks_multi_agg, + pointwise_keys, + combine_exprs, + final_expr, + named_expr.name, + ) + + return graph + + +def _project(df: DataFrame, selection: NamedExpr) -> DataFrame: + # Select a Column from a DataFrame. + # (Used by `make_shuffle_graph`) + return DataFrame([selection.evaluate(df)]) + + +def _df_to_column(df: DataFrame, name: str) -> Column: + # Convert a DataFrame to a Column. 
+
+
+def _project(df: DataFrame, selection: NamedExpr) -> DataFrame:
+    # Select a Column from a DataFrame.
+    # (Used by `make_shuffle_graph`)
+    return DataFrame([selection.evaluate(df)])
+
+
+def _df_to_column(df: DataFrame, name: str) -> Column:
+    # Convert a DataFrame to a Column.
+    # (Used by `make_shuffle_graph`)
+    (column,) = df.columns
+    return column.rename(name)
+
+
+def make_shuffle_graph(
+    named_expr: NamedExpr,
+    expr_partition_counts: MutableMapping[Expr, int],
+    input_ir: IR,
+    ir_partition_info: PartitionInfo,
+) -> MutableMapping[Any, Any]:
+    """Build a FusedExpr shuffle graph."""
+    from cudf_polars.experimental.shuffle import (
+        _SHUFFLE_METHODS,
+        RMPIntegration,
+        _simple_shuffle_graph,
+    )
+
+    expr = named_expr.value
+    col_name = named_expr.name
+    assert isinstance(expr, FusedExpr)
+    assert isinstance(expr.sub_expr, ShuffleColumn)
+    (child,) = expr.sub_expr.children
+
+    key_name = get_key_name(expr, input_ir)
+    name_ir_in = get_key_name(input_ir)
+    name_projected = f"projected-{key_name}"
+    name_shuffled = f"shuffled-{key_name}"
+    partition_count = ir_partition_info.count
+
+    # Check if we can avoid the shuffle
+    if partition_count == 1 or (
+        ir_partition_info.partitioned_on == (named_expr.reconstruct(child),)
+        and ir_partition_info.count == partition_count
+    ):
+        return make_pointwise_graph(
+            named_expr,
+            expr_partition_counts,
+            input_ir,
+        )
+
+    # Select the column needed for shuffling
+    selection = NamedExpr(col_name, child)
+    graph: MutableMapping[Any, Any] = {
+        (name_projected, i): (_project, (name_ir_in, i), selection)
+        for i in range(partition_count)
+    }
+
+    shuffle_on = NamedExpr(col_name, Col(child.dtype, col_name))
+    if (
+        shuffle_method := expr.sub_expr.config_options.get(
+            "executor_options.shuffle_method",
+            default=None,
+        )
+    ) not in (*_SHUFFLE_METHODS, None):  # pragma: no cover
+        raise ValueError(
+            f"{shuffle_method} is not a supported shuffle method. "
+            f"Expected one of: {_SHUFFLE_METHODS}."
+        )
+
+    if shuffle_method in (None, "rapidsmp"):  # pragma: no cover
+        try:
+            from rapidsmp.integrations.dask import rapidsmp_shuffle_graph
+
+            graph.update(
+                rapidsmp_shuffle_graph(
+                    name_projected,
+                    name_shuffled,
+                    [col_name],
+                    [col_name],
+                    partition_count,
+                    partition_count,
+                    RMPIntegration,
+                )
+            )
+        except (ImportError, ValueError) as err:
+            # ImportError: rapidsmp is not installed
+            # ValueError: rapidsmp couldn't find a distributed client
+            if shuffle_method == "rapidsmp":
+                # Only raise an error if the user specifically
+                # set the shuffle method to "rapidsmp"
+                raise ValueError(
+                    "Rapidsmp is not installed correctly or the current "
+                    "Dask cluster does not support rapidsmp shuffling."
+                ) from err
+
+    graph.update(
+        _simple_shuffle_graph(
+            name_projected,
+            name_shuffled,
+            (shuffle_on,),
+            partition_count,
+            partition_count,
+        )
+    )
+
+    # Convert DataFrame back to Column
+    # TODO: Use fusion/nesting?
+    graph.update(
+        {
+            (key_name, i): (_df_to_column, (name_shuffled, i), col_name)
+            for i in range(partition_count)
+        }
+    )
+
+    return graph
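The shuffle stage chains three key families per partition `i`; schematically (names illustrative, produced by `get_key_name` plus the `projected-`/`shuffled-` prefixes):

```python
# ("projected-<key>", i) <- _project(("<input-ir>", i))          # one-column DataFrame
# ("shuffled-<key>", i)  <- rapidsmp or task-based shuffle of all projected parts
# ("<key>", i)           <- _df_to_column(("shuffled-<key>", i)) # back to a Column
```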
+
+
+def make_pointwise_graph(
+    named_expr: NamedExpr,
+    expr_partition_counts: MutableMapping[Expr, int],
+    input_ir: IR,
+) -> MutableMapping[Any, Any]:
+    """Build a simple pointwise FusedExpr graph."""
+    expr = named_expr.value
+    input_ir_name = get_key_name(input_ir)
+    key_name = get_key_name(expr, input_ir)
+    expr_child_names = [get_key_name(c, input_ir) for c in expr.children]
+    expr_bcast = [expr_partition_counts[c] == 1 for c in expr.children]
+    count = expr_partition_counts[expr]
+    assert isinstance(expr, FusedExpr)
+    sub_expr = named_expr.reconstruct(expr.sub_expr)
+    return {
+        (key_name, i): (
+            evaluate_chunk,
+            (input_ir_name, i),
+            sub_expr,
+            expr.children,
+            *(
+                (name, 0) if bcast else (name, i)
+                for name, bcast in zip(expr_child_names, expr_bcast, strict=True)
+            ),
+        )
+        for i in range(count)
+    }
+
+
+def make_fusedexpr_graph(
+    named_expr: NamedExpr,
+    expr_partition_counts: MutableMapping[Expr, int],
+    input_ir: IR,
+    input_ir_partition_info: PartitionInfo,
+) -> MutableMapping[Any, Any]:
+    """Build the task graph for a FusedExpr node."""
+    expr = named_expr.value
+    assert isinstance(expr, FusedExpr)
+    if expr.kind == "pointwise":
+        return make_pointwise_graph(named_expr, expr_partition_counts, input_ir)
+    elif expr.kind == "shuffle":
+        return make_shuffle_graph(
+            named_expr, expr_partition_counts, input_ir, input_ir_partition_info
+        )
+    elif expr.kind == "aggregation":
+        return make_agg_graph(named_expr, expr_partition_counts, input_ir)
+    else:  # pragma: no cover
+        raise ValueError(f"{expr.kind} is not a supported `FusedExpr.kind` value.")
diff --git a/python/cudf_polars/cudf_polars/experimental/parallel.py b/python/cudf_polars/cudf_polars/experimental/parallel.py
index 2170183bbb1..a8b2fd5b5ba 100644
--- a/python/cudf_polars/cudf_polars/experimental/parallel.py
+++ b/python/cudf_polars/cudf_polars/experimental/parallel.py
@@ -14,7 +14,7 @@
 import cudf_polars.experimental.join
 import cudf_polars.experimental.select
 import cudf_polars.experimental.shuffle  # noqa: F401
-from cudf_polars.dsl.ir import IR, Cache, Filter, HStack, Projection, Select, Union
+from cudf_polars.dsl.ir import IR, Cache, Filter, HStack, Projection, Union
 from cudf_polars.dsl.traversal import CachingVisitor, traversal
 from cudf_polars.experimental.base import PartitionInfo, get_key_name
 from cudf_polars.experimental.dispatch import (
@@ -300,4 +300,3 @@ def _generate_ir_tasks_pwise(
 generate_ir_tasks.register(Cache, _generate_ir_tasks_pwise)
 generate_ir_tasks.register(Filter, _generate_ir_tasks_pwise)
 generate_ir_tasks.register(HStack, _generate_ir_tasks_pwise)
-generate_ir_tasks.register(Select, _generate_ir_tasks_pwise)
diff --git a/python/cudf_polars/cudf_polars/experimental/select.py b/python/cudf_polars/cudf_polars/experimental/select.py
index 74e5506bad5..cd3330b6641 100644
--- a/python/cudf_polars/cudf_polars/experimental/select.py
+++ b/python/cudf_polars/cudf_polars/experimental/select.py
@@ -4,19 +4,54 @@
 
 from __future__ import annotations
 
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
+from cudf_polars.containers import DataFrame
 from cudf_polars.dsl.ir import Select
 from cudf_polars.dsl.traversal import traversal
-from cudf_polars.experimental.dispatch import lower_ir_node
+from cudf_polars.experimental.base import PartitionInfo, get_key_name
+from cudf_polars.experimental.dispatch import generate_ir_tasks, lower_ir_node
+from cudf_polars.experimental.expressions import (
+    FusedExpr,
+    decompose_expr_graph,
+    extract_partition_counts,
+    make_fusedexpr_graph,
+)
 from cudf_polars.experimental.utils import _lower_ir_fallback
 
 if TYPE_CHECKING:
     from collections.abc import MutableMapping
 
+    from cudf_polars.dsl.expressions.base import Expr
     from cudf_polars.dsl.ir import IR
-    from cudf_polars.experimental.base import PartitionInfo
     from cudf_polars.experimental.parallel import LowerIRTransformer
+    from cudf_polars.utils.config import ConfigOptions
+
+
+def decompose_select(
+    ir: Select,
+    child: IR,
+    partition_info: MutableMapping[IR, PartitionInfo],
+    config_options: ConfigOptions,
+) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
+    """Decompose a Select expression (if possible)."""
+    named_exprs = [
+        e.reconstruct(decompose_expr_graph(e.value, config_options)) for e in ir.exprs
+    ]
+    new_node = Select(
+        ir.schema,
+        named_exprs,
+        ir.should_broadcast,
+        child,
+    )
+    expr_partition_info = extract_partition_counts(
+        [ne.value for ne in named_exprs],
+        partition_info[child].count,
+    )
+    partition_info[new_node] = PartitionInfo(
+        count=max(expr_partition_info[ne.value] for ne in named_exprs)
+    )
+    return new_node, partition_info
 
 
 @lower_ir_node.register(Select)
@@ -28,10 +63,92 @@ def _(
     if pi.count > 1 and not all(
         expr.is_pointwise for expr in traversal([e.value for e in ir.exprs])
     ):
-        # TODO: Handle non-pointwise expressions.
-        return _lower_ir_fallback(
-            ir, rec, msg="This selection not support for multiple partitions."
-        )
+        try:
+            # Try decomposing the underlying expressions
+            return decompose_select(
+                ir, child, partition_info, rec.state["config_options"]
+            )
+        except NotImplementedError:
+            # TODO: Handle more non-pointwise expressions.
+            return _lower_ir_fallback(
+                ir, rec, msg="This selection is not supported for multiple partitions."
+            )
 
     new_node = ir.reconstruct([child])
     partition_info[new_node] = pi
     return new_node, partition_info
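End to end, the decomposition lets the streaming executor run multi-partition selections with supported aggregations instead of falling back to a single partition. A minimal usage example, mirroring the `test_select_aggs` cases added later in this diff:

```python
import polars as pl

engine = pl.GPUEngine(
    raise_on_fail=True,
    executor="dask-experimental",
    executor_options={"max_rows_per_partition": 3},
)

df = pl.LazyFrame({"a": [1, 2, 3, 4, 5, 6, 7], "b": [1, 1, 1, 1, 1, 1, 1]})

# Non-pointwise selection: lowered via decompose_select into FusedExpr stages.
q = df.select(pl.col("a").min() + pl.col("b").max())
result = q.collect(engine=engine)
```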
+ ) + new_node = ir.reconstruct([child]) partition_info[new_node] = pi return new_node, partition_info + + +def build_fusedexpr_select_graph( + ir: Select, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + """Build complex Select graph.""" + (child,) = ir.children + child_count = partition_info[child].count + + # Build Graph to produce a column for each + # NamedExpr element of ir.exprs + graph: MutableMapping[Any, Any] = {} + expr_partition_counts: MutableMapping[Expr, int] = {} + roots = [] + for ne in ir.exprs: + assert isinstance(ne.value, FusedExpr), f"{ne.value} is not a FusedExpr" + roots.append(ne) + expr_partition_counts = extract_partition_counts( + [ne.value], + child_count, + update=expr_partition_counts, + ) + for node in traversal([ne.value]): + assert isinstance(node, FusedExpr), f"{node} is not a FusedExpr" + graph.update( + make_fusedexpr_graph( + ne.reconstruct(node), + expr_partition_counts, + child, + partition_info[child], + ) + ) + + # Add task(s) to select the final columns + name = get_key_name(ir) + count = max(expr_partition_counts[root.value] for root in roots) + expr_names = [get_key_name(root.value, child) for root in roots] + expr_bcast = [expr_partition_counts[root.value] == 1 for root in roots] + for i in range(count): + graph[(name, i)] = ( + DataFrame, + [ + (name, 0) if bcast else (name, i) + for name, bcast in zip(expr_names, expr_bcast, strict=True) + ], + ) + + return graph + + +@generate_ir_tasks.register(Select) +def _( + ir: Select, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + # TODO: If we are doing aligned aggregations on multiple + # columns at once, we should build a task graph that + # evaluates the aligned expressions at the same time + # (rather than each task operating on an individual column). + + fused_exprs = [isinstance(ne.value, FusedExpr) for ne in ir.exprs] + if any(fused_exprs): + # Handle FusedExpr-based graph construction + assert all(fused_exprs), "Partial fusion is not supported" + return build_fusedexpr_select_graph(ir, partition_info) + else: + # Simple point-wise graph + child_name = get_key_name(ir.children[0]) + return { + key: ( + ir.do_evaluate, + *ir._non_child_args, + (child_name, i), + ) + for i, key in enumerate(partition_info[ir].keys(ir)) + } diff --git a/python/cudf_polars/cudf_polars/experimental/shuffle.py b/python/cudf_polars/cudf_polars/experimental/shuffle.py index e0dcedf8921..58947618a44 100644 --- a/python/cudf_polars/cudf_polars/experimental/shuffle.py +++ b/python/cudf_polars/cudf_polars/experimental/shuffle.py @@ -146,6 +146,10 @@ def _partition_dataframe( A dictionary mapping between int partition indices and DataFrame fragments. """ + if df.num_rows == 0: + # Fast path for empty DataFrame + return {i: df for i in range(count)} + # Hash the specified keys to calculate the output # partition for each row partition_map = plc.binaryop.binary_operation( diff --git a/python/cudf_polars/tests/dsl/test_expr.py b/python/cudf_polars/tests/dsl/test_expr.py index de8fec301fe..ced18f1b3bc 100644 --- a/python/cudf_polars/tests/dsl/test_expr.py +++ b/python/cudf_polars/tests/dsl/test_expr.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. 
diff --git a/python/cudf_polars/tests/dsl/test_expr.py b/python/cudf_polars/tests/dsl/test_expr.py
index de8fec301fe..ced18f1b3bc 100644
--- a/python/cudf_polars/tests/dsl/test_expr.py
+++ b/python/cudf_polars/tests/dsl/test_expr.py
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
 # SPDX-License-Identifier: Apache-2.0
 
 from __future__ import annotations
@@ -95,3 +95,12 @@ def make_expr(n1, n2):
     assert e1 == e2
     assert e1 != e3
     assert e2 != e3
+
+
+def test_reconstruct_named_expr():
+    ne1 = expr.NamedExpr("a", expr.Col(plc.DataType(plc.TypeId.INT8), "a"))
+    new_value = expr.Col(plc.DataType(plc.TypeId.INT16), "a")
+    ne2 = ne1.reconstruct(new_value)
+    assert ne1.name == ne2.name
+    assert ne1 != ne2
+    assert ne2.value == new_value
diff --git a/python/cudf_polars/tests/dsl/test_traversal.py b/python/cudf_polars/tests/dsl/test_traversal.py
index c60abe50bab..15c9fc3ceb2 100644
--- a/python/cudf_polars/tests/dsl/test_traversal.py
+++ b/python/cudf_polars/tests/dsl/test_traversal.py
@@ -52,6 +52,12 @@ def test_traversal_unique():
     assert set(unique_exprs) == {expr.Col(dt, "a"), expr.Col(dt, "b"), e3}
     assert unique_exprs == [e3, expr.Col(dt, "b"), expr.Col(dt, "a")]
 
+    e4 = make_expr(dt, "b", "a")
+    unique_exprs = list(traversal([e4], cutoff_types=(expr.BinOp,)))
+
+    assert len(unique_exprs) == 1
+    assert unique_exprs == [e4]
+
 
 def rename(e, rec):
     mapping = rec.state["mapping"]
diff --git a/python/cudf_polars/tests/experimental/test_join.py b/python/cudf_polars/tests/experimental/test_join.py
index 609ca896756..b5eba0f7060 100644
--- a/python/cudf_polars/tests/experimental/test_join.py
+++ b/python/cudf_polars/tests/experimental/test_join.py
@@ -13,11 +13,33 @@
 from cudf_polars.testing.asserts import assert_gpu_result_equal
 
 
+@pytest.fixture(scope="module")
+def left():
+    return pl.LazyFrame(
+        {
+            "x": range(15),
+            "y": [1, 2, 3] * 5,
+            "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 3,
+        }
+    )
+
+
+@pytest.fixture(scope="module")
+def right():
+    return pl.LazyFrame(
+        {
+            "xx": range(9),
+            "y": [2, 4, 3] * 3,
+            "zz": [1, 2, 3] * 3,
+        }
+    )
+
+
 @pytest.mark.parametrize("how", ["inner", "left", "right", "full", "semi", "anti"])
 @pytest.mark.parametrize("reverse", [True, False])
 @pytest.mark.parametrize("max_rows_per_partition", [1, 5, 10, 15])
 @pytest.mark.parametrize("broadcast_join_limit", [1, 16])
-def test_join(how, reverse, max_rows_per_partition, broadcast_join_limit):
+def test_join(left, right, how, reverse, max_rows_per_partition, broadcast_join_limit):
     engine = pl.GPUEngine(
         raise_on_fail=True,
         executor="dask-experimental",
@@ -26,20 +48,6 @@ def test_join(how, reverse, max_rows_per_partition, broadcast_join_limit):
             "broadcast_join_limit": broadcast_join_limit,
         },
     )
-    left = pl.LazyFrame(
-        {
-            "x": range(15),
-            "y": [1, 2, 3] * 5,
-            "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 3,
-        }
-    )
-    right = pl.LazyFrame(
-        {
-            "xx": range(6),
-            "y": [2, 4, 3] * 2,
-            "zz": [1, 2] * 3,
-        }
-    )
     if reverse:
         left, right = right, left
 
@@ -62,7 +70,7 @@ def test_join(left, right, how, reverse, max_rows_per_partition, broadcast_join
 
 
 @pytest.mark.parametrize("broadcast_join_limit", [1, 2, 3, 4])
-def test_broadcast_join_limit(broadcast_join_limit):
+def test_broadcast_join_limit(left, right, broadcast_join_limit):
     engine = pl.GPUEngine(
         raise_on_fail=True,
         executor="dask-experimental",
@@ -102,3 +110,22 @@ def test_broadcast_join_limit(left, right, broadcast_join_limit):
     else:
         # Expect broadcast join
         assert len(shuffle_nodes) == 0
+
+
+def test_join_then_shuffle(left, right):
+    engine = pl.GPUEngine(
+        raise_on_fail=True,
+        executor="dask-experimental",
+        executor_options={
+            "max_rows_per_partition": 2,
+            "broadcast_join_limit": 1,
+        },
+    )
+    q = left.join(right, on="y", how="inner").select(
+        pl.col("x").sum(),
+        pl.col("xx").mean(),
+        pl.col("y").n_unique(),
+        (pl.col("y") * pl.col("y")).n_unique().alias("y2"),
+    )
+
+    assert_gpu_result_equal(q, engine=engine, check_row_order=False)
diff --git a/python/cudf_polars/tests/experimental/test_select.py b/python/cudf_polars/tests/experimental/test_select.py
index 47982e9d1fe..e83c98ef833 100644
--- a/python/cudf_polars/tests/experimental/test_select.py
+++ b/python/cudf_polars/tests/experimental/test_select.py
@@ -27,6 +27,7 @@ def df():
         {
             "a": [1, 2, 3, 4, 5, 6, 7],
             "b": [1, 1, 1, 1, 1, 1, 1],
+            "c": [2, 4, 6, 8, 10, 12, 14],
         }
     )
 
@@ -48,11 +49,12 @@ def test_select_reduce_fallback(df, fallback_mode):
             "max_rows_per_partition": 3,
         },
     )
-    match = "This selection not support for multiple partitions."
+    match = "This selection is not supported for multiple partitions."
     query = df.select(
         (pl.col("a") + pl.col("b")).max(),
-        (pl.col("a") * 2 + pl.col("b")).alias("d").mean(),
+        # NOTE: We don't support `median` yet
+        (pl.col("a") * 2 + pl.col("b")).alias("d").median(),
     )
 
     if fallback_mode == "silent":
@@ -61,7 +63,7 @@ def test_select_reduce_fallback(df, fallback_mode):
         ctx = pytest.raises(pl.exceptions.ComputeError, match=match)
     elif fallback_mode == "foo":
         ctx = pytest.raises(
-            pl.exceptions.ComputeError, match="not a supported 'fallback_mode' option"
+            pl.exceptions.ComputeError, match="not a supported 'fallback_mode'"
         )
     else:
         ctx = pytest.warns(UserWarning, match=match)
@@ -69,6 +71,29 @@ def test_select_reduce_fallback(df, fallback_mode):
         assert_gpu_result_equal(query, engine=engine)
 
 
+@pytest.mark.parametrize(
+    "aggs",
+    [
+        (
+            (pl.col("a") + pl.col("b")).sum(),
+            (pl.col("a") * 2 + pl.col("b")).alias("d").min(),
+        ),
+        (pl.col("a").min() + pl.col("b").max(),),
+        (pl.col("a") - (pl.col("b") + pl.col("c").max()).sum(),),
+        (pl.col("b").len(),),
+        (pl.col("a") - (pl.col("b") + pl.col("c").max()).mean(),),
+        (
+            pl.col("b").n_unique().cast(pl.Int32),
+            (pl.col("c").n_unique() + 1).cast(pl.Int32),
+        ),
+    ],
+)
+def test_select_aggs(df, engine, aggs):
+    # Test supported aggs (e.g. "min", "max", "mean", "n_unique")
+    query = df.select(*aggs)
+    assert_gpu_result_equal(query, engine=engine)
+
+
 def test_select_with_cse_no_agg(df, engine):
     expr = pl.col("a") + pl.col("a")
     query = df.select(expr, (expr * 2).alias("b"), ((expr * 2) + 10).alias("c"))