diff --git a/ci/run_cudf_polars_pytests.sh b/ci/run_cudf_polars_pytests.sh index c10612a065a..bf5a3ccee8e 100755 --- a/ci/run_cudf_polars_pytests.sh +++ b/ci/run_cudf_polars_pytests.sh @@ -8,4 +8,8 @@ set -euo pipefail # Support invoking run_cudf_polars_pytests.sh outside the script directory cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../python/cudf_polars/ +# Test the default "cudf" executor python -m pytest --cache-clear "$@" tests + +# Test the "dask-experimental" executor +python -m pytest --cache-clear "$@" tests --executor dask-experimental diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py index 7915c9e6b18..6f8d141d246 100644 --- a/python/cudf_polars/cudf_polars/callback.py +++ b/python/cudf_polars/cudf_polars/callback.py @@ -135,6 +135,7 @@ def _callback( *, device: int | None, memory_resource: int | None, + executor: str | None, ) -> pl.DataFrame: assert with_columns is None assert pyarrow_predicate is None @@ -145,7 +146,14 @@ def _callback( set_device(device), set_memory_resource(memory_resource), ): - return ir.evaluate(cache={}).to_polars() + if executor is None or executor == "cudf": + return ir.evaluate(cache={}).to_polars() + elif executor == "dask-experimental": + from cudf_polars.experimental.parallel import evaluate_dask + + return evaluate_dask(ir).to_polars() + else: + raise ValueError(f"Unknown executor '{executor}'") def validate_config_options(config: dict) -> None: @@ -162,13 +170,19 @@ def validate_config_options(config: dict) -> None: ValueError If the configuration contains unsupported options. 
""" - if unsupported := (config.keys() - {"raise_on_fail", "parquet_options"}): + if unsupported := ( + config.keys() + - {"raise_on_fail", "parquet_options", "parallel_options", "executor"} + ): raise ValueError( f"Engine configuration contains unsupported settings: {unsupported}" ) - assert {"chunked", "chunk_read_limit", "pass_read_limit"}.issuperset( + assert {"blocksize", "chunked", "chunk_read_limit", "pass_read_limit"}.issuperset( config.get("parquet_options", {}) ) + assert {"parquet_blocksize", "num_rows_threshold"}.issuperset( + config.get("parallel_options", {}) + ) def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None: @@ -197,6 +211,7 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None: device = config.device memory_resource = config.memory_resource raise_on_fail = config.config.get("raise_on_fail", False) + executor = config.config.get("executor", None) validate_config_options(config.config) with nvtx.annotate(message="ConvertIR", domain="cudf_polars"): @@ -226,5 +241,6 @@ def execute_with_cudf(nt: NodeTraverser, *, config: GPUEngine) -> None: ir, device=device, memory_resource=memory_resource, + executor=executor, ) ) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 62a2da9dcea..863e5336614 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -683,14 +683,16 @@ class DataFrameScan(IR): This typically arises from ``q.collect().lazy()`` """ - __slots__ = ("df", "projection", "predicate") - _non_child = ("schema", "df", "projection", "predicate") + __slots__ = ("df", "projection", "predicate", "config_options") + _non_child = ("schema", "df", "projection", "predicate", "config_options") df: Any """Polars LazyFrame object.""" projection: tuple[str, ...] 
| None """List of columns to project out.""" predicate: expr.NamedExpr | None """Mask to apply.""" + config_options: dict[str, Any] + """GPU-specific configuration options""" def __init__( self, @@ -698,11 +700,13 @@ def __init__( df: Any, projection: Sequence[str] | None, predicate: expr.NamedExpr | None, + config_options: dict[str, Any], ): self.schema = schema self.df = df self.projection = tuple(projection) if projection is not None else None self.predicate = predicate + self.config_options = config_options self._non_child_args = (schema, df, self.projection, predicate) self.children = () @@ -1599,13 +1603,15 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR): # polars requires that all to-explode columns have the # same sub-shapes raise NotImplementedError("Explode with more than one column") + self.options = (tuple(to_explode),) elif self.name == "rename": - old, new, _ = self.options + old, new, strict = self.options # TODO: perhaps polars should validate renaming in the IR? 
if len(new) != len(set(new)) or ( set(new) & (set(df.schema.keys()) - set(old)) ): raise NotImplementedError("Duplicate new names in rename.") + self.options = (tuple(old), tuple(new), strict) elif self.name == "unpivot": indices, pivotees, variable_name, value_name = self.options value_name = "value" if value_name is None else value_name @@ -1623,13 +1629,20 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR): self.options = ( tuple(indices), tuple(pivotees), - (variable_name, schema[variable_name]), - (value_name, schema[value_name]), + variable_name, + value_name, ) - self._non_child_args = (name, self.options) + self._non_child_args = (schema, name, self.options) + + def get_hashable(self) -> Hashable: # pragma: no cover; Needed by experimental + """Hashable representation of the node.""" + schema_hash = tuple(self.schema.items()) + return (type(self), schema_hash, self.name, self.options, *self.children) @classmethod - def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame: + def do_evaluate( + cls, schema: Schema, name: str, options: Any, df: DataFrame + ) -> DataFrame: """Evaluate and return a dataframe.""" if name == "rechunk": # No-op in our data model @@ -1651,8 +1664,8 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame: ( indices, pivotees, - (variable_name, variable_dtype), - (value_name, value_dtype), + variable_name, + value_name, ) = options npiv = len(pivotees) index_columns = [ @@ -1669,7 +1682,7 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame: plc.interop.from_arrow( pa.array( pivotees, - type=plc.interop.to_arrow(variable_dtype), + type=plc.interop.to_arrow(schema[variable_name]), ), ) ] @@ -1677,7 +1690,10 @@ def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame: df.num_rows, ).columns() value_column = plc.concatenate.concatenate( - [df.column_map[pivotee].astype(value_dtype).obj for pivotee in pivotees] + [ + 
df.column_map[pivotee].astype(schema[value_name]).obj + for pivotee in pivotees + ] ) return DataFrame( [ diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py index 12fc2a196cd..da506a09896 100644 --- a/python/cudf_polars/cudf_polars/dsl/translate.py +++ b/python/cudf_polars/cudf_polars/dsl/translate.py @@ -263,6 +263,7 @@ def _( translate_named_expr(translator, n=node.selection) if node.selection is not None else None, + translator.config.config.copy(), ) @@ -633,9 +634,10 @@ def _(node: pl_expr.Sort, translator: Translator, dtype: plc.DataType) -> expr.E @_translate_expr.register def _(node: pl_expr.SortBy, translator: Translator, dtype: plc.DataType) -> expr.Expr: + options = node.sort_options return expr.SortBy( dtype, - node.sort_options, + (options[0], tuple(options[1]), tuple(options[2])), translator.translate_expr(n=node.expr), *(translator.translate_expr(n=n) for n in node.by), ) diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/__init__.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/__init__.py new file mode 100644 index 00000000000..4e8ece354d8 --- /dev/null +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/__init__.py @@ -0,0 +1,8 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +"""Experimental benchmarks.""" + +from __future__ import annotations + +__all__: list[str] = [] diff --git a/python/cudf_polars/cudf_polars/experimental/benchmarks/tpch_bench.py b/python/cudf_polars/cudf_polars/experimental/benchmarks/tpch_bench.py new file mode 100644 index 00000000000..205bc86bc35 --- /dev/null +++ b/python/cudf_polars/cudf_polars/experimental/benchmarks/tpch_bench.py @@ -0,0 +1,257 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Experimental TPC-H benchmarks.""" + +from __future__ import annotations + +import argparse +import time +from datetime import date + +import polars as pl + +from cudf_polars.dsl.translate import Translator +from cudf_polars.experimental.parallel import evaluate_dask + +parser = argparse.ArgumentParser( + prog="Cudf-Polars TPC-H Benchmarks", + description="Experimental Dask-Executor benchmarks.", +) +parser.add_argument( + "query", + type=int, + choices=[1, 5, 10, 18], + help="Query number.", +) +parser.add_argument( + "--path", + type=str, + default="/datasets/rzamora/tpch-data/scale-30.0", + help="Root directory path.", +) +parser.add_argument( + "--suffix", + type=str, + default=".parquet", + help="Table file suffix.", +) +parser.add_argument( + "-e", + "--executor", + default="dask", + type=str, + choices=["dask", "dask-experimental", "cudf", "polars"], + help="Executor.", +) +parser.add_argument( + "--blocksize", + default=2 * 1024**3, + type=int, + help="Approx. 
partition size.", +) +parser.add_argument( + "--debug", + default=False, + action="store_true", + help="Debug run.", +) +args = parser.parse_args() + + +def get_data(path, table_name, suffix=""): + """Get table from dataset.""" + return pl.scan_parquet(f"{path}/{table_name}{suffix}") + + +def q1(args): + """Query 1.""" + lineitem = get_data(args.path, "lineitem", args.suffix) + + var1 = date(1998, 9, 2) + + return ( + lineitem.filter(pl.col("l_shipdate") <= var1) + .group_by("l_returnflag", "l_linestatus") + .agg( + pl.sum("l_quantity").alias("sum_qty"), + pl.sum("l_extendedprice").alias("sum_base_price"), + (pl.col("l_extendedprice") * (1.0 - pl.col("l_discount"))) + .sum() + .alias("sum_disc_price"), + ( + pl.col("l_extendedprice") + * (1.0 - pl.col("l_discount")) + * (1.0 + pl.col("l_tax")) + ) + .sum() + .alias("sum_charge"), + pl.mean("l_quantity").alias("avg_qty"), + pl.mean("l_extendedprice").alias("avg_price"), + pl.mean("l_discount").alias("avg_disc"), + pl.len().alias("count_order"), + ) + .sort("l_returnflag", "l_linestatus") + ) + + +def q5(args): + """Query 5.""" + path = args.path + suffix = args.suffix + customer = get_data(path, "customer", suffix) + lineitem = get_data(path, "lineitem", suffix) + nation = get_data(path, "nation", suffix) + orders = get_data(path, "orders", suffix) + region = get_data(path, "region", suffix) + supplier = get_data(path, "supplier", suffix) + + var1 = "ASIA" + var2 = date(1994, 1, 1) + var3 = date(1995, 1, 1) + + return ( + region.join(nation, left_on="r_regionkey", right_on="n_regionkey") + .join(customer, left_on="n_nationkey", right_on="c_nationkey") + .join(orders, left_on="c_custkey", right_on="o_custkey") + .join(lineitem, left_on="o_orderkey", right_on="l_orderkey") + .join( + supplier, + left_on=["l_suppkey", "n_nationkey"], + right_on=["s_suppkey", "s_nationkey"], + ) + .filter(pl.col("r_name") == var1) + .filter(pl.col("o_orderdate").is_between(var2, var3, closed="left")) + .with_columns( + 
(pl.col("l_extendedprice") * (1 - pl.col("l_discount"))).alias("revenue") + ) + .group_by("n_name") + .agg(pl.sum("revenue")) + .sort(by="revenue", descending=True) + ) + + +def q10(args): + """Query 10.""" + path = args.path + suffix = args.suffix + customer = get_data(path, "customer", suffix) + lineitem = get_data(path, "lineitem", suffix) + nation = get_data(path, "nation", suffix) + orders = get_data(path, "orders", suffix) + + var1 = date(1993, 10, 1) + var2 = date(1994, 1, 1) + + return ( + customer.join(orders, left_on="c_custkey", right_on="o_custkey") + .join(lineitem, left_on="o_orderkey", right_on="l_orderkey") + .join(nation, left_on="c_nationkey", right_on="n_nationkey") + .filter(pl.col("o_orderdate").is_between(var1, var2, closed="left")) + .filter(pl.col("l_returnflag") == "R") + .group_by( + "c_custkey", + "c_name", + "c_acctbal", + "c_phone", + "n_name", + "c_address", + "c_comment", + ) + .agg( + (pl.col("l_extendedprice") * (1 - pl.col("l_discount"))) + .sum() + # .round(2) # TODO: Support `round` + .alias("revenue") + ) + .select( + "c_custkey", + "c_name", + "revenue", + "c_acctbal", + "n_name", + "c_address", + "c_phone", + "c_comment", + ) + .sort(by="revenue", descending=True) + .head(20) + ) + + +def q18(args): + """Query 18.""" + path = args.path + suffix = args.suffix + customer = get_data(path, "customer", suffix) + lineitem = get_data(path, "lineitem", suffix) + orders = get_data(path, "orders", suffix) + + var1 = 300 + + q1 = ( + lineitem.group_by("l_orderkey") + .agg(pl.col("l_quantity").sum().alias("sum_quantity")) + .filter(pl.col("sum_quantity") > var1) + ) + + return ( + orders.join(q1, left_on="o_orderkey", right_on="l_orderkey", how="semi") + .join(lineitem, left_on="o_orderkey", right_on="l_orderkey") + .join(customer, left_on="o_custkey", right_on="c_custkey") + .group_by("c_name", "o_custkey", "o_orderkey", "o_orderdate", "o_totalprice") + .agg(pl.col("l_quantity").sum().alias("col6")) + .select( + pl.col("c_name"), + 
pl.col("o_custkey").alias("c_custkey"), + pl.col("o_orderkey"), + pl.col("o_orderdate").alias("o_orderdat"), + pl.col("o_totalprice"), + pl.col("col6"), + ) + .sort(by=["o_totalprice", "o_orderdat"], descending=[True, False]) + .head(100) + ) + + +def run(args): + """Run the benchmark once.""" + t0 = time.time() + + q_id = args.query + if q_id == 1: + q = q1(args) + elif q_id == 5: + q = q5(args) + elif q_id == 10: + q = q10(args) + elif q_id == 18: + q = q18(args) + else: + raise NotImplementedError(f"Query {q_id} not implemented.") + + executor = args.executor + if executor == "polars": + result = q.collect() + else: + engine = pl.GPUEngine( + raise_on_fail=True, + executor="dask-experimental" if executor.startswith("dask") else executor, + parquet_options={"blocksize": args.blocksize, "chunked": False}, + ) + if args.debug: + ir = Translator(q._ldf.visit(), engine).translate_ir() + if executor == "cudf": + result = ir.evaluate(cache={}).to_polars() + elif executor.startswith("dask"): + result = evaluate_dask(ir).to_polars() + else: + result = q.collect(engine=engine) + + t1 = time.time() + print(result) + print(f"time is {t1-t0}") + + +if __name__ == "__main__": + run(args) diff --git a/python/cudf_polars/cudf_polars/experimental/groupby.py b/python/cudf_polars/cudf_polars/experimental/groupby.py new file mode 100644 index 00000000000..282c16856bd --- /dev/null +++ b/python/cudf_polars/cudf_polars/experimental/groupby.py @@ -0,0 +1,217 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 +"""Parallel GroupBy Logic.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import pylibcudf as plc + +from cudf_polars.dsl.expr import Agg, BinOp, Cast, Col, Len, NamedExpr +from cudf_polars.dsl.ir import GroupBy, Select +from cudf_polars.experimental.parallel import ( + PartitionInfo, + _concat, + _default_lower_ir_node, + _lower_children, + _partitionwise_ir_tasks, + generate_ir_tasks, + get_key_name, +) + +if TYPE_CHECKING: + from collections.abc import MutableMapping + + from cudf_polars.dsl.ir import IR + from cudf_polars.experimental.parallel import LowerIRTransformer + + +class GroupByPart(GroupBy): + """Partitionwise groupby operation.""" + + +class GroupByTree(GroupBy): + """Groupby tree-reduction operation.""" + + +class GroupByFinalize(Select): + """Finalize a groupby aggregation.""" + + +_GB_AGG_SUPPORTED = ("sum", "count", "mean") + + +def lower_groupby_node( + ir: GroupBy, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + """Rewrite a GroupBy node with proper partitioning.""" + # Lower children first + children, partition_info = _lower_children(ir, rec) + + if partition_info[children[0]].count == 1: + # Single partition + return _default_lower_ir_node(ir, rec) + + # Check that we are grouping on element-wise + # keys (is this already guaranteed?) 
+ for ne in ir.keys: + if not isinstance(ne.value, Col): + return _default_lower_ir_node(ir, rec) + + name_map: MutableMapping[str, Any] = {} + agg_tree: Cast | Agg | None = None + agg_requests_pwise = [] + agg_requests_tree = [] + for ne in ir.agg_requests: + name = ne.name + agg = ne.value + if isinstance(agg, Cast) and isinstance(agg.children[0], Len): + # Len + agg_requests_pwise.append(ne) + agg_tree = Cast( + agg.dtype, Agg(agg.dtype, "sum", None, Col(agg.dtype, name)) + ) + agg_requests_tree.append(NamedExpr(name, agg_tree)) + elif isinstance(agg, Agg): + # Agg + if agg.name not in _GB_AGG_SUPPORTED: + return _default_lower_ir_node(ir, rec) + + if len(agg.children) > 1: + return _default_lower_ir_node(ir, rec) + + if agg.name == "sum": + # Partwise + agg_pwise = Agg(agg.dtype, "sum", agg.options, *agg.children) + agg_requests_pwise.append(NamedExpr(name, agg_pwise)) + # Tree + agg_tree = Agg(agg.dtype, "sum", agg.options, Col(agg.dtype, name)) + agg_requests_tree.append(NamedExpr(name, agg_tree)) + elif agg.name == "count": + # Partwise + agg_pwise = Agg(agg.dtype, "count", agg.options, *agg.children) + agg_requests_pwise.append(NamedExpr(name, agg_pwise)) + # Tree + agg_tree = Agg(agg.dtype, "sum", agg.options, Col(agg.dtype, name)) + agg_requests_tree.append(NamedExpr(name, agg_tree)) + elif agg.name == "mean": + name_map[name] = {agg.name: {}} + for sub in ["sum", "count"]: + # Partwise + tmp_name = f"{name}__{sub}" + name_map[name][agg.name][sub] = tmp_name + agg_pwise = Agg(agg.dtype, sub, agg.options, *agg.children) + agg_requests_pwise.append(NamedExpr(tmp_name, agg_pwise)) + # Tree + child = Col(agg.dtype, tmp_name) + agg_tree = Agg(agg.dtype, "sum", agg.options, child) + agg_requests_tree.append(NamedExpr(tmp_name, agg_tree)) + else: + # Unsupported + return _default_lower_ir_node(ir, rec) + + gb_pwise = GroupByPart( + ir.schema, + ir.keys, + agg_requests_pwise, + ir.maintain_order, + ir.options, + *children, + ) + child_count = 
partition_info[children[0]].count + partition_info[gb_pwise] = PartitionInfo(count=child_count) + + gb_tree = GroupByTree( + ir.schema, + ir.keys, + agg_requests_tree, + ir.maintain_order, + ir.options, + gb_pwise, + ) + partition_info[gb_tree] = PartitionInfo(count=1) + + schema = ir.schema + output_exprs = [] + for name, dtype in schema.items(): + agg_mapping = name_map.get(name, None) + if agg_mapping is None: + output_exprs.append(NamedExpr(name, Col(dtype, name))) + elif "mean" in agg_mapping: + mean_cols = agg_mapping["mean"] + output_exprs.append( + NamedExpr( + name, + BinOp( + dtype, + plc.binaryop.BinaryOperator.DIV, + Col(dtype, mean_cols["sum"]), + Col(dtype, mean_cols["count"]), + ), + ) + ) + should_broadcast: bool = False + new_node = GroupByFinalize( + schema, + output_exprs, + should_broadcast, + gb_tree, + ) + partition_info[new_node] = PartitionInfo(count=1) + return new_node, partition_info + + +@generate_ir_tasks.register(GroupByPart) +def _( + ir: GroupByPart, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + return _partitionwise_ir_tasks(ir, partition_info) + + +def _tree_node(do_evaluate, batch, *args): + return do_evaluate(*args, _concat(batch)) + + +@generate_ir_tasks.register(GroupByTree) +def _( + ir: GroupByTree, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + child = ir.children[0] + child_count = partition_info[child].count + child_name = get_key_name(child) + name = get_key_name(ir) + + # Simple tree reduction. 
+ j = 0 + graph: MutableMapping[Any, Any] = {} + split_every = 32 + keys: list[Any] = [(child_name, i) for i in range(child_count)] + while len(keys) > split_every: + new_keys: list[Any] = [] + for i, k in enumerate(range(0, len(keys), split_every)): + batch = keys[k : k + split_every] + graph[(name, j, i)] = ( + _tree_node, + ir.do_evaluate, + batch, + *ir._non_child_args, + ) + new_keys.append((name, j, i)) + j += 1 + keys = new_keys + graph[(name, 0)] = ( + _tree_node, + ir.do_evaluate, + keys, + *ir._non_child_args, + ) + return graph + + +@generate_ir_tasks.register(GroupByFinalize) +def _( + ir: GroupByFinalize, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + # TODO: Fuse with GroupByTree child task? + return _partitionwise_ir_tasks(ir, partition_info) diff --git a/python/cudf_polars/cudf_polars/experimental/io.py b/python/cudf_polars/cudf_polars/experimental/io.py new file mode 100644 index 00000000000..c5aa3535c39 --- /dev/null +++ b/python/cudf_polars/cudf_polars/experimental/io.py @@ -0,0 +1,290 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 +"""Parallel IO Logic.""" + +from __future__ import annotations + +import math +from functools import cached_property +from typing import TYPE_CHECKING, Any, ClassVar + +import pylibcudf as plc + +from cudf_polars.dsl.ir import DataFrameScan, Scan +from cudf_polars.experimental.parallel import ( + PartitionInfo, + _default_lower_ir_node, + generate_ir_tasks, + get_key_name, +) + +if TYPE_CHECKING: + from collections.abc import MutableMapping + + from cudf_polars.dsl.ir import IR + from cudf_polars.experimental.parallel import LowerIRTransformer + + +## +## DataFrameScan +## + + +class ParDataFrameScan(DataFrameScan): + """Parallel DataFrameScan.""" + + @property + def _max_n_rows(self) -> int: + """Row-count threshold for splitting a DataFrame.""" + parallel_options = self.config_options.get("parallel_options", {}) + return parallel_options.get("num_rows_threshold", 1_000_000) + + @property + def _count(self) -> int: + """Partition count.""" + total_rows, _ = self.df.shape() + return math.ceil(total_rows / self._max_n_rows) + + def _tasks(self) -> MutableMapping[Any, Any]: + """Task graph.""" + total_rows, _ = self.df.shape() + stride = math.ceil(total_rows / self._count) + key_name = get_key_name(self) + return { + (key_name, i): ( + self.do_evaluate, + self.schema, + self.df.slice(offset, stride), + self.projection, + self.predicate, + ) + for i, offset in enumerate(range(0, total_rows, stride)) + } + + +def lower_dataframescan_node( + ir: DataFrameScan, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + """Rewrite a DataFrameScan node with proper partitioning.""" + new_node = ParDataFrameScan( + ir.schema, + ir.df, + ir.projection, + ir.predicate, + ir.config_options, + ) + return new_node, {new_node: PartitionInfo(count=new_node._count)} + + +@generate_ir_tasks.register(ParDataFrameScan) +def _( + ir: ParDataFrameScan, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + 
return ir._tasks() + + +## +## Scan +## + + +class ParFileScan(Scan): + """Parallel scan over files.""" + + _STATS_CACHE: ClassVar[dict[int, dict[str, float]]] = {} + + def _sample_pq_statistics(self) -> dict[str, float]: + import numpy as np + import pyarrow.dataset as pa_ds + + n_sample = 3 + paths = self.paths[:n_sample] + key = hash(tuple(paths)) + try: + return self._STATS_CACHE[key] + except KeyError: + # Use average total_uncompressed_size of three files + # TODO: Use plc.io.parquet_metadata.read_parquet_metadata + column_sizes = {} + ds = pa_ds.dataset(paths, format="parquet") + for i, frag in enumerate(ds.get_fragments()): + md = frag.metadata + for rg in range(md.num_row_groups): + row_group = md.row_group(rg) + for col in range(row_group.num_columns): + column = row_group.column(col) + name = column.path_in_schema + if name not in column_sizes: + column_sizes[name] = np.zeros(n_sample, dtype="int64") + column_sizes[name][i] += column.total_uncompressed_size + + self._STATS_CACHE[key] = { + name: np.mean(sizes) for name, sizes in column_sizes.items() + } + return self._STATS_CACHE[key] + + @cached_property + def _plan(self) -> tuple[int, int]: + split, stride = 1, 1 + if self.typ == "parquet": + file_size: float = 0 + # TODO: Use system info to set default blocksize + parallel_options = self.config_options.get("parallel_options", {}) + blocksize: int = parallel_options.get("parquet_blocksize", 2 * 1024**3) + stats = self._sample_pq_statistics() + columns: list = self.with_columns or list(stats.keys()) + for name in columns: + file_size += float(stats[name]) + if file_size > 0: + if file_size > blocksize: + # Split large files + split = math.ceil(file_size / blocksize) + else: + # Aggregate small files + stride = max(int(blocksize / file_size), 1) + + # TODO: Use file sizes for csv/json? 
+ return (split, stride) + + +def lower_scan_node( + ir: Scan, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + """Rewrite a Scan node with proper partitioning.""" + if ir.typ in ("csv", "parquet", "ndjson") and ir.n_rows == -1 and ir.skip_rows == 0: + # TODO: mypy complains: ParFileScan(*ir._ctor_arguments([])) + new_node = ParFileScan( + ir.schema, + ir.typ, + ir.reader_options, + ir.cloud_options, + ir.config_options, + ir.paths, + ir.with_columns, + ir.skip_rows, + ir.n_rows, + ir.row_index, + ir.predicate, + ) + split, stride = new_node._plan + if split > 1: + count = len(new_node.paths) * split + else: + count = math.ceil(len(new_node.paths) / stride) + return new_node, {new_node: PartitionInfo(count=count)} + + return _default_lower_ir_node(ir, rec) + + +def _split_read( + do_evaluate, + split_index, + total_splits, + schema, + typ, + reader_options, + config_options, + paths, + with_columns, + skip_rows, + n_rows, + row_index, + predicate, +): + if typ != "parquet": + raise NotImplementedError() + + rowgroup_metadata = plc.io.parquet_metadata.read_parquet_metadata( + plc.io.SourceInfo(paths) + ).rowgroup_metadata() + total_row_groups = len(rowgroup_metadata) + if total_splits > total_row_groups: + # Don't bother aligning on row-groups + total_rows = sum(rg["num_rows"] for rg in rowgroup_metadata) + n_rows = int(total_rows / total_splits) + skip_rows = n_rows * split_index + else: + # Align split with row-groups + rg_stride = int(total_row_groups / total_splits) + skip_rgs = rg_stride * split_index + skip_rows = ( + sum(rg["num_rows"] for rg in rowgroup_metadata[:skip_rgs]) + if skip_rgs + else 0 + ) + n_rows = sum( + rg["num_rows"] for rg in rowgroup_metadata[skip_rgs : skip_rgs + rg_stride] + ) + + # Last split should always read to end of file + if split_index == (total_splits - 1): + n_rows = -1 + + return do_evaluate( + schema, + typ, + reader_options, + config_options, + paths, + with_columns, + skip_rows, + n_rows, + 
row_index, + predicate, + ) + + +@generate_ir_tasks.register(ParFileScan) +def _( + ir: ParFileScan, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + key_name = get_key_name(ir) + split, stride = ir._plan + paths = list(ir.paths) + if split > 1: + # Disable chunked reader + config_options = ir.config_options.copy() + config_options["parquet_options"] = config_options.get( + "parquet_options", {} + ).copy() + config_options["parquet_options"]["chunked"] = False + + graph = {} + count = 0 + for path in paths: + for sindex in range(split): + graph[(key_name, count)] = ( + _split_read, + ir.do_evaluate, + sindex, + split, + ir.schema, + ir.typ, + ir.reader_options, + config_options, + [path], + ir.with_columns, + ir.skip_rows, + ir.n_rows, + ir.row_index, + ir.predicate, + ) + count += 1 + return graph + else: + return { + (key_name, i): ( + ir.do_evaluate, + ir.schema, + ir.typ, + ir.reader_options, + ir.config_options, + paths[j : j + stride], + ir.with_columns, + ir.skip_rows, + ir.n_rows, + ir.row_index, + ir.predicate, + ) + for i, j in enumerate(range(0, len(paths), stride)) + } diff --git a/python/cudf_polars/cudf_polars/experimental/join.py b/python/cudf_polars/cudf_polars/experimental/join.py new file mode 100644 index 00000000000..71b29957786 --- /dev/null +++ b/python/cudf_polars/cudf_polars/experimental/join.py @@ -0,0 +1,240 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 +"""Parallel Join Logic.""" + +from __future__ import annotations + +import operator +from typing import TYPE_CHECKING, Any + +import pyarrow as pa + +import pylibcudf as plc + +from cudf_polars.containers import DataFrame +from cudf_polars.dsl.ir import Join +from cudf_polars.experimental.parallel import ( + _concat, + _default_lower_ir_node, + _lower_children, + generate_ir_tasks, + get_key_name, +) + +if TYPE_CHECKING: + from collections.abc import MutableMapping + + from cudf_polars.dsl.expr import NamedExpr + from cudf_polars.dsl.ir import IR + from cudf_polars.experimental.parallel import LowerIRTransformer, PartitionInfo + + +class BroadcastJoin(Join): + """Broadcast Join operation.""" + + +class LeftBroadcastJoin(BroadcastJoin): + """Left Broadcast Join operation.""" + + +class RightBroadcastJoin(BroadcastJoin): + """Right Broadcast Join operation.""" + + +def lower_join_node( + ir: Join, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + """Rewrite a Join node with proper partitioning.""" + # TODO: Add shuffle-based join. 
+ # (Currently using broadcast join in all cases) + + # Lower children first + children, partition_info = _lower_children(ir, rec) + + how = ir.options[0] + if how not in ("inner", "left", "right"): + # Not supported (yet) + return _default_lower_ir_node(ir, rec) + + assert len(children) == 2 + left, right = children + left_parts = partition_info[left] + right_parts = partition_info[right] + if left_parts.count == right_parts.count == 1: + # Single-partition case + return _default_lower_ir_node(ir, rec) + elif left_parts.count >= right_parts.count and how in ("inner", "left"): + # Broadcast right to every partition of left + new_node = RightBroadcastJoin( + ir.schema, + ir.left_on, + ir.right_on, + ir.options, + *children, + ) + partition_info[new_node] = partition_info[left] + else: + # Broadcast left to every partition of right + new_node = LeftBroadcastJoin( + ir.schema, + ir.left_on, + ir.right_on, + ir.options, + *children, + ) + partition_info[new_node] = partition_info[right] + return new_node, partition_info + + +def rearrange_by_column( + name_out: str, + name_in: str, + on: tuple[NamedExpr, ...], + count_in: int, + count_out: int, +) -> MutableMapping[Any, Any]: + """Shuffle on a list of columns.""" + # Simple all-to-all shuffle (for now) + split_name = f"split-{name_out}" + inter_name = f"inter-{name_out}" + + graph: MutableMapping[Any, Any] = {} + for part_out in range(count_out): + _concat_list = [] + for part_in in range(count_in): + graph[(split_name, part_in)] = ( + _split_by_column, + (name_in, part_in), + on, + count_out, + ) + _concat_list.append((inter_name, part_out, part_in)) + graph[_concat_list[-1]] = ( + operator.getitem, + (split_name, part_in), + part_out, + ) + graph[(name_out, part_out)] = (_concat, _concat_list) + return graph + + +def _split_by_column( + df: DataFrame, + on: tuple[NamedExpr, ...], + count: int, +) -> dict[int, DataFrame]: + # Extract the partition-map column + if len(on) == 1 and on[0].name == "_partitions": + # The 
`on` argument already contains the + # destination partition id for each row. + partition_map = on[0].evaluate(df).obj + else: + # Use murmurhash % count to choose the + # destination partition id for each row. + partition_map = plc.binaryop.binary_operation( + plc.hashing.murmurhash3_x86_32( + DataFrame([expr.evaluate(df) for expr in on]).table + ), + plc.interop.from_arrow(pa.scalar(count, type="uint32")), + plc.binaryop.BinaryOperator.PYMOD, + plc.types.DataType(plc.types.TypeId.UINT32), + ) + + # Split and return the partitioned result + return { + i: DataFrame.from_table( + split, + df.column_names, + ) + for i, split in enumerate( + plc.copying.split( + *plc.partitioning.partition( + df.table, + partition_map, + count, + ) + ) + ) + } + + +@generate_ir_tasks.register(BroadcastJoin) +def _( + ir: BroadcastJoin, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + how = ir.options[0] + left, right = ir.children + broadcast_side = "right" if isinstance(ir, RightBroadcastJoin) else "left" + if broadcast_side == "left": + bcast_name = get_key_name(left) + bcast_size = partition_info[left].count + other = get_key_name(right) + other_on = ir.right_on + bcast_on = ir.left_on + else: + bcast_name = get_key_name(right) + bcast_size = partition_info[right].count + other = get_key_name(left) + other_on = ir.left_on + bcast_on = ir.right_on + + graph: MutableMapping[Any, Any] = {} + + # Shuffle broadcasted side if necessary + if how != "inner" and bcast_size > 1: + shuffle_name = "shuffle-" + bcast_name + graph = rearrange_by_column( + shuffle_name, + bcast_name, + bcast_on, + bcast_size, + bcast_size, + ) + bcast_name = shuffle_name + + out_name = get_key_name(ir) + out_size = partition_info[ir].count + split_name = "split-" + out_name + inter_name = "inter-" + out_name + + for part_out in range(out_size): + if how != "inner": + graph[(split_name, part_out)] = ( + _split_by_column, + (other, part_out), + other_on, + bcast_size, + ) + + 
_concat_list = [] + for j in range(bcast_size): + _merge_args = [ + ( + ( + operator.getitem, + (split_name, part_out), + j, + ) + if how != "inner" + else (other, part_out) + ), + (bcast_name, j), + ] + if broadcast_side == "left": + _merge_args.reverse() + + inter_key = (inter_name, part_out, j) + graph[(inter_name, part_out, j)] = ( + ir.do_evaluate, + ir.left_on, + ir.right_on, + ir.options, + *_merge_args, + ) + _concat_list.append(inter_key) + if len(_concat_list) == 1: + graph[(out_name, part_out)] = graph.pop(_concat_list[0]) + else: + graph[(out_name, part_out)] = (_concat, _concat_list) + + return graph diff --git a/python/cudf_polars/cudf_polars/experimental/parallel.py b/python/cudf_polars/cudf_polars/experimental/parallel.py new file mode 100644 index 00000000000..878ce2056c9 --- /dev/null +++ b/python/cudf_polars/cudf_polars/experimental/parallel.py @@ -0,0 +1,350 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 +"""Partitioned LogicalPlan nodes.""" + +from __future__ import annotations + +import operator +from functools import reduce, singledispatch +from typing import TYPE_CHECKING, Any + +from cudf_polars.dsl.ir import ( + IR, + DataFrameScan, + Filter, + GroupBy, + HStack, + Join, + Projection, + Scan, + Select, + Union, +) +from cudf_polars.dsl.traversal import traversal + +if TYPE_CHECKING: + from collections.abc import MutableMapping, Sequence + from typing import TypeAlias + + from cudf_polars.containers import DataFrame + from cudf_polars.dsl.nodebase import Node + from cudf_polars.typing import GenericTransformer + + +class PartitionInfo: + """ + Partitioning information. + + This class only tracks the partition count (for now). 
+ """ + + __slots__ = ("count",) + + def __init__(self, count: int): + self.count = count + + +LowerIRTransformer: TypeAlias = ( + "GenericTransformer[IR, MutableMapping[IR, PartitionInfo]]" +) +"""Protocol for Lowering IR nodes.""" + + +def get_key_name(node: Node) -> str: + """Generate the key name for a Node.""" + return f"{type(node).__name__.lower()}-{hash(node)}" + + +@singledispatch +def lower_ir_node( + ir: IR, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + """Rewrite an IR node with proper partitioning.""" + raise AssertionError(f"Unhandled type {type(ir)}") + + +def _lower_children( + ir: IR, rec: LowerIRTransformer +) -> tuple[tuple[IR], MutableMapping[IR, PartitionInfo]]: + children, _partition_info = zip(*(rec(c) for c in ir.children), strict=False) + partition_info: MutableMapping[IR, PartitionInfo] = reduce( + operator.or_, _partition_info + ) + return children, partition_info + + +@lower_ir_node.register(IR) +def _default_lower_ir_node( + ir: IR, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + if len(ir.children) == 0: + # Default leaf node has single partition + return ir, {ir: PartitionInfo(count=1)} + + # Lower children + children, partition_info = _lower_children(ir, rec) + + # Check that child partitioning is supported + count = max(partition_info[c].count for c in children) + if count > 1: + raise NotImplementedError( + f"Class {type(ir)} does not support multiple partitions." 
+ ) # pragma: no cover + + # Return reconstructed node and partition-info mapping + partition = PartitionInfo(count=1) + new_node = ir.reconstruct(children) + partition_info[new_node] = partition + return new_node, partition_info + + +def _lower_ir_node_partitionwise( + ir: IR, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + # Simple partitionwise behavior + children, partition_info = _lower_children(ir, rec) + partition = PartitionInfo(count=max(partition_info[c].count for c in children)) + new_node = ir.reconstruct(children) + partition_info[new_node] = partition + return new_node, partition_info + + +def lower_ir_graph(ir: IR) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + """Rewrite an IR graph with proper partitioning.""" + from cudf_polars.dsl.traversal import CachingVisitor + + mapper = CachingVisitor(lower_ir_node) + return mapper(ir) + + +@singledispatch +def generate_ir_tasks( + ir: IR, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + """ + Generate tasks for an IR node. + + An IR node only needs to generate the graph for + the current IR logic (not including child IRs). + """ + raise AssertionError(f"Unhandled type {type(ir)}") + + +@generate_ir_tasks.register(IR) +def _default_ir_tasks( + ir: IR, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + # Single-partition default behavior. + # This is used by `generate_ir_tasks` for all unregistered IR sub-types. + if partition_info[ir].count > 1: + raise NotImplementedError( + f"Failed to generate multiple output tasks for {ir}." + ) # pragma: no cover + + child_names = [] + for child in ir.children: + child_names.append(get_key_name(child)) + if partition_info[child].count > 1: + raise NotImplementedError( + f"Failed to generate tasks for {ir} with child {child}."
+ ) # pragma: no cover + + key_name = get_key_name(ir) + return { + (key_name, 0): ( + ir.do_evaluate, + *ir._non_child_args, + *((child_name, 0) for child_name in child_names), + ) + } + + +def _partitionwise_ir_tasks( + ir: IR, + partition_info: MutableMapping[IR, PartitionInfo], +) -> MutableMapping[Any, Any]: + # Simple partitionwise behavior. + child_names = [] + counts = [] + for child in ir.children: + child_names.append(get_key_name(child)) + counts.append(partition_info[child].count) + counts = counts or [1] + if len(set(counts)) > 1: + raise NotImplementedError( + f"Mismatched partition counts not supported: {counts}" + ) + + key_name = get_key_name(ir) + return { + (key_name, i): ( + ir.do_evaluate, + *ir._non_child_args, + *((child_name, i) for child_name in child_names), + ) + for i in range(counts[0]) + } + + +def task_graph( + ir: IR, partition_info: MutableMapping[IR, PartitionInfo] +) -> tuple[MutableMapping[str, Any], str]: + """Construct a Dask-compatible task graph.""" + graph = reduce( + operator.or_, + [generate_ir_tasks(node, partition_info) for node in traversal(ir)], + ) + + key_name = get_key_name(ir) + partition_count = partition_info[ir].count + if partition_count > 1: # Concatenate all output partitions + graph[key_name] = (_concat, [(key_name, i) for i in range(partition_count)]) + else: # Single partition: alias the only output task + graph[key_name] = (key_name, 0) + + return graph, key_name + + +def evaluate_dask(ir: IR) -> DataFrame: + """Evaluate an IR graph with Dask.""" + from dask import get + + ir, partition_info = lower_ir_graph(ir) + + graph, key = task_graph(ir, partition_info) + return get(graph, key) + + +def _concat(dfs: Sequence[DataFrame]) -> DataFrame: + # Concatenate a sequence of DataFrames vertically + return Union.do_evaluate(None, *dfs) + + +## +## DataFrameScan +## + + +@lower_ir_node.register(DataFrameScan) +def _( + ir: DataFrameScan, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + import cudf_polars.experimental.io as _io + + return _io.lower_dataframescan_node(ir,
rec) + + +## +## Scan +## + + +@lower_ir_node.register(Scan) +def _( + ir: Scan, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + import cudf_polars.experimental.io as _io + + return _io.lower_scan_node(ir, rec) + + +## +## Select +## + + +@lower_ir_node.register(Select) +def _( + ir: Select, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + import cudf_polars.experimental.select as _select + + return _select.lower_select_node(ir, rec) + + +## +## HStack +## + + +@lower_ir_node.register(HStack) +def _( + ir: HStack, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + return _lower_ir_node_partitionwise(ir, rec) + + +@generate_ir_tasks.register(HStack) +def _( + ir: HStack, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + return _partitionwise_ir_tasks(ir, partition_info) + + +## +## Filter +## + + +## TODO: Can filter expressions include aggregations? + + +@lower_ir_node.register(Filter) +def _( + ir: Filter, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + return _lower_ir_node_partitionwise(ir, rec) + + +@generate_ir_tasks.register(Filter) +def _( + ir: Filter, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + return _partitionwise_ir_tasks(ir, partition_info) + + +## +## Projection +## + + +@lower_ir_node.register(Projection) +def _( + ir: Projection, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + return _lower_ir_node_partitionwise(ir, rec) + + +@generate_ir_tasks.register(Projection) +def _( + ir: Projection, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + return _partitionwise_ir_tasks(ir, partition_info) + + +## +## GroupBy +## + + +@lower_ir_node.register(GroupBy) +def _( + ir: GroupBy, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + import cudf_polars.experimental.groupby as _groupby 
+ + return _groupby.lower_groupby_node(ir, rec) + + +## +## Join +## + + +@lower_ir_node.register(Join) +def _( + ir: Join, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + import cudf_polars.experimental.join as _join + + return _join.lower_join_node(ir, rec) diff --git a/python/cudf_polars/cudf_polars/experimental/select.py b/python/cudf_polars/cudf_polars/experimental/select.py new file mode 100644 index 00000000000..1d5cebc8bed --- /dev/null +++ b/python/cudf_polars/cudf_polars/experimental/select.py @@ -0,0 +1,77 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 +"""Parallel Select Logic.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from cudf_polars.dsl.ir import Select +from cudf_polars.experimental.parallel import ( + PartitionInfo, + _default_lower_ir_node, + _lower_children, + _partitionwise_ir_tasks, + generate_ir_tasks, +) + +if TYPE_CHECKING: + from collections.abc import MutableMapping + + from cudf_polars.dsl.ir import IR + from cudf_polars.experimental.parallel import LowerIRTransformer + + +_PARTWISE = ( + "Literal", + "LiteralColumn", + "Col", + "ColRef", + "BooleanFunction", + "StringFunction", + "TemporalFunction", + "Filter", + "Cast", + "Ternary", + "BinOp", + "UnaryFunction", +) + + +class PartwiseSelect(Select): + """Partitionwise Select operation.""" + + +def lower_select_node( + ir: Select, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + """Rewrite a Select node with proper partitioning.""" + from cudf_polars.dsl.traversal import traversal + + # Lower children first + children, partition_info = _lower_children(ir, rec) + + # Search the expressions for "complex" operations + for ne in ir.exprs: + for expr in traversal(ne.value): + if type(expr).__name__ not in _PARTWISE: + return _default_lower_ir_node(ir, rec) + + # Remaining Select ops are partition-wise + new_node =
PartwiseSelect( + ir.schema, + ir.exprs, + ir.should_broadcast, + *children, + ) + partition_info[new_node] = PartitionInfo( + count=max(partition_info[c].count for c in children) + ) + return new_node, partition_info + + +@generate_ir_tasks.register(PartwiseSelect) +def _( + ir: PartwiseSelect, partition_info: MutableMapping[IR, PartitionInfo] +) -> MutableMapping[Any, Any]: + return _partitionwise_ir_tasks(ir, partition_info) diff --git a/python/cudf_polars/cudf_polars/testing/asserts.py b/python/cudf_polars/cudf_polars/testing/asserts.py index ba0bb12a0fb..d986f150b2e 100644 --- a/python/cudf_polars/cudf_polars/testing/asserts.py +++ b/python/cudf_polars/cudf_polars/testing/asserts.py @@ -20,6 +20,11 @@ __all__: list[str] = ["assert_gpu_result_equal", "assert_ir_translation_raises"] +# Will be overridden by `conftest.py` with the value from the `--executor` +# command-line argument +Executor = None + + def assert_gpu_result_equal( lazydf: pl.LazyFrame, *, @@ -34,6 +39,7 @@ def assert_gpu_result_equal( rtol: float = 1e-05, atol: float = 1e-08, categorical_as_str: bool = False, + executor: str | None = None, ) -> None: """ Assert that collection of a lazyframe on GPU produces correct results. @@ -71,6 +77,9 @@ def assert_gpu_result_equal( Absolute tolerance for float comparisons categorical_as_str Decat categoricals to strings before comparing + executor + The executor configuration to pass to `GPUEngine`. If not specified + uses the module level `Executor` attribute. Raises ------ @@ -80,7 +89,7 @@ def assert_gpu_result_equal( If GPU collection failed in some way.
""" if engine is None: - engine = GPUEngine(raise_on_fail=True) + engine = GPUEngine(raise_on_fail=True, executor=executor or Executor) final_polars_collect_kwargs, final_cudf_collect_kwargs = _process_kwargs( collect_kwargs, polars_collect_kwargs, cudf_collect_kwargs diff --git a/python/cudf_polars/tests/conftest.py b/python/cudf_polars/tests/conftest.py index 9bbce6bc080..c57e6f733cc 100644 --- a/python/cudf_polars/tests/conftest.py +++ b/python/cudf_polars/tests/conftest.py @@ -8,3 +8,19 @@ @pytest.fixture(params=[False, True], ids=["no_nulls", "nulls"], scope="session") def with_nulls(request): return request.param + + +def pytest_addoption(parser): + parser.addoption( + "--executor", + action="store", + default="cudf", + choices=("cudf", "dask-experimental"), + help="Executor to use for GPUEngine.", + ) + + +def pytest_configure(config): + import cudf_polars.testing.asserts + + cudf_polars.testing.asserts.Executor = config.getoption("--executor") diff --git a/python/cudf_polars/tests/dsl/test_traversal.py b/python/cudf_polars/tests/dsl/test_traversal.py index 2f4df9289f8..9755994c419 100644 --- a/python/cudf_polars/tests/dsl/test_traversal.py +++ b/python/cudf_polars/tests/dsl/test_traversal.py @@ -116,7 +116,11 @@ def test_rewrite_ir_node(): def replace_df(node, rec): if isinstance(node, ir.DataFrameScan): return ir.DataFrameScan( - node.schema, new_df._df, node.projection, node.predicate + node.schema, + new_df._df, + node.projection, + node.predicate, + node.config_options, ) return reuse_if_unchanged(node, rec) @@ -144,7 +148,11 @@ def test_rewrite_scan_node(tmp_path): def replace_scan(node, rec): if isinstance(node, ir.Scan): return ir.DataFrameScan( - node.schema, right._df, node.with_columns, node.predicate + node.schema, + right._df, + node.with_columns, + node.predicate, + node.config_options, ) return reuse_if_unchanged(node, rec) diff --git a/python/cudf_polars/tests/experimental/test_dataframescan.py 
b/python/cudf_polars/tests/experimental/test_dataframescan.py new file mode 100644 index 00000000000..0984e14cd01 --- /dev/null +++ b/python/cudf_polars/tests/experimental/test_dataframescan.py @@ -0,0 +1,43 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars import Translator +from cudf_polars.experimental.parallel import lower_ir_graph +from cudf_polars.testing.asserts import assert_gpu_result_equal + + +@pytest.fixture(scope="module") +def df(): + return pl.LazyFrame( + { + "x": range(30_000), + "y": ["cat", "dog", "fish"] * 10_000, + "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 6_000, + } + ) + + +@pytest.mark.parametrize("num_rows_threshold", [1_000, 1_000_000]) +def test_parquet_blocksize(df, num_rows_threshold): + total_row_count = len(df.collect()) + engine = pl.GPUEngine( + raise_on_fail=True, + parallel_options={"num_rows_threshold": num_rows_threshold}, + executor="dask-experimental", + ) + assert_gpu_result_equal(df, engine=engine) + + # Check partitioning + qir = Translator(df._ldf.visit(), engine).translate_ir() + ir, info = lower_ir_graph(qir) + count = info[ir].count + if num_rows_threshold < total_row_count: + assert count > 1 + else: + assert count == 1 diff --git a/python/cudf_polars/tests/experimental/test_groupby.py b/python/cudf_polars/tests/experimental/test_groupby.py new file mode 100644 index 00000000000..c8ecf7d78ea --- /dev/null +++ b/python/cudf_polars/tests/experimental/test_groupby.py @@ -0,0 +1,37 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars.testing.asserts import assert_gpu_result_equal + + +@pytest.fixture(scope="module") +def engine(): + return pl.GPUEngine( + raise_on_fail=True, + executor="dask-experimental", + parallel_options={"num_rows_threshold": 100}, + ) + + +@pytest.fixture(scope="module") +def df(): + return pl.LazyFrame( + { + "x": range(30), + "y": ["cat", "dog", "fish"] * 10, + "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 6, + } + ) + + +@pytest.mark.parametrize("op", ["sum", "mean", "len"]) +@pytest.mark.parametrize("keys", [("y",), ("y", "z")]) +def test_parallel_groupby(df, engine, op, keys): + q = getattr(df.group_by(*keys), op)() + assert_gpu_result_equal(q, engine=engine, check_row_order=False) diff --git a/python/cudf_polars/tests/experimental/test_join.py b/python/cudf_polars/tests/experimental/test_join.py new file mode 100644 index 00000000000..c2156638d6c --- /dev/null +++ b/python/cudf_polars/tests/experimental/test_join.py @@ -0,0 +1,41 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +import polars as pl + + +@pytest.mark.parametrize("how", ["inner", "left", "right"]) +@pytest.mark.parametrize("num_rows_threshold", [5, 10, 15]) +def test_parallel_join(how, num_rows_threshold): + engine = pl.GPUEngine( + raise_on_fail=True, + executor="dask-experimental", + parallel_options={"num_rows_threshold": num_rows_threshold}, + ) + left = pl.LazyFrame( + { + "x": range(15), + "y": ["cat", "dog", "fish"] * 5, + "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 3, + } + ) + right = pl.LazyFrame( + { + "xx": range(6), + "y": ["dog", "bird", "fish"] * 2, + "zz": [1, 2] * 3, + } + ) + q = left.join(right, on="y", how=how) + + from cudf_polars import Translator + from cudf_polars.experimental.parallel import evaluate_dask + + qir = Translator(q._ldf.visit(), engine).translate_ir() + evaluate_dask(qir) + + # assert_gpu_result_equal(q, engine=engine, check_row_order=False) diff --git a/python/cudf_polars/tests/experimental/test_parallel.py b/python/cudf_polars/tests/experimental/test_parallel.py new file mode 100644 index 00000000000..99d212f47d3 --- /dev/null +++ b/python/cudf_polars/tests/experimental/test_parallel.py @@ -0,0 +1,24 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import polars as pl +from polars import GPUEngine +from polars.testing import assert_frame_equal + +from cudf_polars import Translator +from cudf_polars.experimental.parallel import evaluate_dask +from cudf_polars.testing.asserts import Executor + + +def test_evaluate_dask(): + df = pl.LazyFrame({"a": [1, 2, 3], "b": [3, 4, 5], "c": [5, 6, 7], "d": [7, 9, 8]}) + q = df.select(pl.col("a") - (pl.col("b") + pl.col("c") * 2), pl.col("d")).sort("d") + + config = GPUEngine(raise_on_fail=True, executor=Executor) + qir = Translator(q._ldf.visit(), config).translate_ir() + + expected = qir.evaluate(cache={}).to_polars() + got = evaluate_dask(qir).to_polars() + assert_frame_equal(expected, got) diff --git a/python/cudf_polars/tests/experimental/test_scan.py b/python/cudf_polars/tests/experimental/test_scan.py new file mode 100644 index 00000000000..e22b5d236f2 --- /dev/null +++ b/python/cudf_polars/tests/experimental/test_scan.py @@ -0,0 +1,80 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars import Translator +from cudf_polars.experimental.parallel import lower_ir_graph +from cudf_polars.testing.asserts import assert_gpu_result_equal + + +@pytest.fixture(scope="module") +def df(): + return pl.DataFrame( + { + "x": range(30_000), + "y": ["cat", "dog", "fish"] * 10_000, + "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 6_000, + } + ) + + +def make_source(df, path, fmt, n_files=3): + n_rows = len(df) + stride = int(n_rows / n_files) + for i in range(n_files): + offset = stride * i + part = df.slice(offset, stride) + if fmt == "csv": + part.write_csv(path / f"part.{i}.csv") + elif fmt == "ndjson": + part.write_ndjson(path / f"part.{i}.ndjson") + else: + part.write_parquet( + path / f"part.{i}.parquet", + row_group_size=int(n_rows / 2), + ) + + +@pytest.mark.parametrize( + "fmt, scan_fn", + [ + ("csv", pl.scan_csv), + ("ndjson", pl.scan_ndjson), + ("parquet", pl.scan_parquet), + ], +) +def test_parallel_scan(tmp_path, df, fmt, scan_fn): + make_source(df, tmp_path, fmt) + q = scan_fn(tmp_path) + engine = pl.GPUEngine( + raise_on_fail=True, + executor="dask-experimental", + ) + assert_gpu_result_equal(q, engine=engine) + + +@pytest.mark.parametrize("blocksize", [8_000, 1_000_000]) +def test_parquet_blocksize(tmp_path, df, blocksize): + n_files = 3 + make_source(df, tmp_path, "parquet", n_files) + q = pl.scan_parquet(tmp_path) + engine = pl.GPUEngine( + raise_on_fail=True, + parallel_options={"parquet_blocksize": blocksize}, + executor="dask-experimental", + ) + assert_gpu_result_equal(q, engine=engine) + + # Check partitioning + qir = Translator(q._ldf.visit(), engine).translate_ir() + ir, info = lower_ir_graph(qir) + count = info[ir].count + if blocksize == 8_000: + assert count > n_files + else: + assert count < n_files diff --git a/python/cudf_polars/tests/test_executors.py b/python/cudf_polars/tests/test_executors.py new file mode 
100644 index 00000000000..2de06e47fa1 --- /dev/null +++ b/python/cudf_polars/tests/test_executors.py @@ -0,0 +1,72 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars.testing.asserts import assert_gpu_result_equal + + +@pytest.mark.parametrize("executor", [None, "cudf", "dask-experimental"]) +def test_executor_basics(executor): + """Test basics of each executor.""" + if executor == "dask-experimental": + pytest.importorskip("dask") + + df = pl.LazyFrame( + { + "a": pl.Series([[1, 2], [3]], dtype=pl.List(pl.Int8())), + "b": pl.Series([[1], [2]], dtype=pl.List(pl.UInt16())), + "c": pl.Series( + [ + [["1", "2", "3"], ["4", "567"]], + [["8", "9"], []], + ], + dtype=pl.List(pl.List(pl.String())), + ), + "d": pl.Series([[[1, 2]], []], dtype=pl.List(pl.List(pl.UInt16()))), + } + ) + + assert_gpu_result_equal(df, executor=executor) + + +def test_cudf_cache_evaluate(): + """Tests `cudf_polars.dsl.ir.Cache.evaluate()`.""" + ldf = pl.DataFrame( + { + "a": [1, 2, 3, 4, 5, 6, 7], + "b": [1, 1, 1, 1, 1, 1, 1], + } + ).lazy() + ldf2 = ldf.select((pl.col("a") + pl.col("b")).alias("c"), pl.col("a")) + query = pl.concat([ldf, ldf2], how="diagonal") + assert_gpu_result_equal(query, executor="cudf") + + +def test_dask_experimental_map_function_get_hashable(): + """Tests `cudf_polars.dsl.ir.MapFunction.get_hashable()`.""" + df = pl.LazyFrame( + { + "a": pl.Series([11, 12, 13], dtype=pl.UInt16), + "b": pl.Series([1, 3, 5], dtype=pl.Int16), + "c": pl.Series([2, 4, 6], dtype=pl.Float32), + "d": ["a", "b", "c"], + } + ) + q = df.unpivot(index="d") + assert_gpu_result_equal(q, executor="dask-experimental") + + +def test_unknown_executor(): + """Test invalid executor.""" + df = pl.LazyFrame({}) + + with pytest.raises( + pl.exceptions.ComputeError, + match="ValueError: Unknown executor 'unknown-executor'", + ): + 
assert_gpu_result_equal(df, executor="unknown-executor")