Enable multi-partition Select operations containing basic aggregations #17941
rjzamora wants to merge 70 commits into rapidsai:branch-25.06
```python
def rename_agg(agg: Agg, new_name: str):
    """Modify the name of an aggregation expression."""
    return CachingVisitor(
        replace_sub_expr,
        state={"mapping": {agg: Agg(agg.dtype, new_name, agg.options, *agg.children)}},
    )(agg)
```
"Renaming" feels like the wrong thing, because the options for one agg might not apply to the options of another.
> the options for one agg might not apply to the options of another
Yeah, that's totally true. We definitely need to figure out how the options "plumbing" should work here. I was hoping the options would normally translate in a trivial way, but I don't have a great sense for the range of possibilities.
I think we just need pattern rules that map a global Agg into a pair of (local_aggs, finalise_agg) or whatever it looks like.
Yeah, for this iteration we can just pass in None for the "new" options by default (since that's actually the "correct" thing to do for the supported aggregations that require "renaming").
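For illustration, one way to express the pattern rules wence- describes (a global `Agg` mapping to a pair of local aggregations plus a finalising step) is a simple lookup table. This is a hypothetical toy sketch, not the cudf-polars API; all names here are made up:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AggSpec:
    """Decomposition rule for one 'global' aggregation."""
    local_aggs: tuple[str, ...]  # aggregations applied per partition
    finalise: str                # aggregation applied to the concatenated partials


# Note that "mean" cannot simply be "renamed" locally: it needs sum and
# count partials, and a finalising division. "count" partials are combined
# with a *sum*, which is where naive renaming breaks down.
DECOMPOSE: dict[str, AggSpec] = {
    "sum": AggSpec(("sum",), "sum"),
    "min": AggSpec(("min",), "min"),
    "max": AggSpec(("max",), "max"),
    "count": AggSpec(("count",), "sum"),
    "mean": AggSpec(("sum", "count"), "sum_over_count"),
}


def decompose(name: str) -> AggSpec:
    try:
        return DECOMPOSE[name]
    except KeyError:
        raise NotImplementedError(f"No decomposition rule for {name!r}")
```

A table like this also sidesteps the options question: each rule can decide explicitly which options (if any) carry over to the local and finalising aggregations.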
It will be useful to serialize individual columns during multi-GPU cudf-polars execution. For example, the `Expr`-decomposition approach proposed in #17941 may "require" `Column` serialization (or an ugly workaround).

Authors:
- Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
- Matthew Murray (https://github.com/Matt711)
- Lawrence Mitchell (https://github.com/wence-)

URL: #17990
```python
children = [child for child in traversal([old]) if isinstance(child, FusedExpr)]
new = FusedExpr(old.dtype, old, *children)
mapper = CachingVisitor(replace_sub_expr, state={"mapping": {old: new}})
root = mapper(root)
```
It seems like it should be possible to write this function as a single bottom-up visit, rather than this approach, which does a fixed-point iteration with three full tree-traversals per iteration.
Can you describe what rewrite patterns at a node are?
I think it's something like:
```python
def _decompose(expr: Expr, rec: ExprTransformer):
    new_children = tuple(map(rec, expr.children))
    fused_children = tuple(c for c in new_children if isinstance(c, FusedExpr))
    if fused_children:
        return FusedExpr(expr.dtype, expr, *fused_children)
    elif not expr.is_pointwise:
        # check for supported case of `Agg`...
        return FusedExpr(expr.dtype, expr)
    else:
        # pointwise, no fused children
        return expr


def decompose_expr_graph(expr):
    mapper = CachingVisitor(_decompose)
    return mapper(expr)
```
Do I have it right?
Yeah, kind of. I played with this for a bit, and I think I have something working now.
Update: This PR now depends on #18405
wence- left a comment
Lots of questions/comments. I think this is pretty good, but there are a few places where I'm confused about the logic.
```python
def collect_agg(self, *, depth: int) -> AggInfo:  # pragma: no cover
    """Collect information about aggregations in groupbys."""
    return self.sub_expr.collect_agg(depth=depth)


assert all(
```
These should be independent of the size of the graph, do you mean "large expressions"?
```python
child_ir_count
    Partition count for the child-IR node.
```
Do you mean "child" here, or do you mean the partition count of the input frame?
I use the term child-IR to mean the input dataframe in most places, since this IR node is the child of the overarching Select node (that owns the expression we are processing). It is, indeed, not an expression child.
Would you prefer "input IR"? I just want to distinguish this input IR node from the IR node that "owns" the expression. Otherwise, I don't care much about the naming.
```python
if skip_fused_exprs:
    continue  # Stay within the current sub expression
```
If you add:

```diff
diff --git a/python/cudf_polars/cudf_polars/dsl/traversal.py b/python/cudf_polars/cudf_polars/dsl/traversal.py
index 9c45a68812..50a643e2cc 100644
--- a/python/cudf_polars/cudf_polars/dsl/traversal.py
+++ b/python/cudf_polars/cudf_polars/dsl/traversal.py
@@ -49,6 +49,39 @@ def traversal(nodes: Sequence[NodeT]) -> Generator[NodeT, None, None]:
             lifo.append(child)
 
 
+def cutoff_traversal(
+    nodes: Sequence[NodeT], *, cutoff_types: tuple[type[NodeT], ...]
+) -> Generator[NodeT, None, None]:
+    """
+    Pre-order traversal of nodes in an expression.
+
+    Parameters
+    ----------
+    nodes
+        Roots of expressions to traverse.
+    cutoff_types
+        Types to terminate traversal at. If a type is in this tuple
+        then we do not yield any of its children.
+
+    Yields
+    ------
+    Unique nodes in the expressions, parent before child, children
+    in-order from left to right.
+    """
+    seen = set(nodes)
+    lifo = list(nodes)
+
+    while lifo:
+        node = lifo.pop()
+        yield node
+        if isinstance(node, cutoff_types):
+            continue
+        for child in reversed(node.children):
+            if child not in seen:
+                seen.add(child)
+                lifo.append(child)
+
+
 def reuse_if_unchanged(node: NodeT, fn: GenericTransformer[NodeT, NodeT]) -> NodeT:
     """
     Recipe for transforming nodes that returns the old object if unchanged.
```

and then use (at the top of this function):

```python
traverse = (
    partial(cutoff_traversal, cutoff_types=(FusedExpr,))
    if skip_fused_exprs
    else traversal
)
...
for e in exprs:
    for node in list(traverse([e]))[::-1]:
        ...
```
You won't needlessly traverse the full tree when skipping fused exprs in the children.
Yeah, I was tempted to do this before, but held off. Shall I just add an optional `cutoff_types` argument to `traversal` rather than duplicate most of the logic?
Yeah, I think that's reasonable.
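That merged form (a single `traversal` with an optional `cutoff_types` argument) can be sketched with toy stand-in node classes. The real signature lives in `cudf_polars.dsl.traversal` and may differ; `Node`/`Fused` here are hypothetical:

```python
class Node:
    """Minimal stand-in for an expression node (identity-hashed)."""

    def __init__(self, name, *children):
        self.name = name
        self.children = children


class Fused(Node):
    """Stand-in for FusedExpr: a boundary we may not want to cross."""


def traversal(nodes, *, cutoff_types=()):
    """Unique pre-order traversal; do not descend below cutoff_types."""
    seen = set(nodes)
    lifo = list(nodes)
    while lifo:
        node = lifo.pop()
        yield node
        if cutoff_types and isinstance(node, cutoff_types):
            continue  # yield the boundary node itself, but skip its children
        for child in reversed(node.children):
            if child not in seen:
                seen.add(child)
                lifo.append(child)
```

With `cutoff_types=(Fused,)` the traversal stays within the current sub-expression, which is exactly the `skip_fused_exprs` behavior without a second traversal of each fused subtree.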
```python
assert all(
    e.is_pointwise or isinstance(e, FusedExpr)
    for e in traversal(list(sub_expr.children))
), f"Invalid FusedExpr sub-expression: {sub_expr}"
```

I would prefer to validate this once, before lowering to a graph, rather than on construction. This is where it's "annoying" that FusedExprs have two types of children: the actual sub-expressions and the FusedExpr "children".
```python
pi: MutableMapping[IR, PartitionInfo] = {child: child_partition_info}
schema = {col.name: col.dtype for col in traversal([on]) if isinstance(col, Col)}
```
This seems very wrong: it's extracting all columns that are leaves of `on` (an arbitrary expression that we are shuffling on) and using those as the schema of the Select call, which only selects a single column (`shuffle_on`).
What are you trying to do here?
I suppose what you're trying to do is determine which columns of the child are required to evaluate shuffle_on?
I can avoid all this for now if you'd rather keep things simple (and there could definitely be a mistake).
The general idea is that we need to shuffle the input IR node to evaluate this expression, but the output is only a single column. So, in order to avoid shuffling a bunch of columns that we don't actually need to evaluate the expression, we (1) look for the underlying columns needed by the expression, (2) drop all other columns from the input IR node, and (3) shuffle only the columns we need.
That makes sense, but those columns need to be in the list of expressions you're selecting
> but those columns need to be in the list of expressions you're selecting
I don't think I understand this comment very well. Are you saying that we cannot drop columns because we might need it in the overarching Select, even if we don't need it for the current FusedExpr we are processing in this function?
When we drop columns and shuffle IR, the modified/shuffled IR is only used for this specific FusedExpr. If other FusedExpr nodes depended on other columns, they would still be referencing the original input IR.
As I understand it, you're shuffling on some expression A. This expression depends on some columns (B, C). The child input might have additional columns (D, E, ...). And you want to ensure that you only move around the data you need to evaluate A.
So I think this is:
```python
necessary_columns = [e for e in traversal([on]) if isinstance(e, expr.Col)]
input = Select(
    {c.name: c.dtype for c in necessary_columns},
    necessary_columns,
    False,
    child,
)
shuffled = Shuffle(..., shuffle_on, input)
```
No?
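The "necessary columns" step in that sketch can be reproduced with self-contained toy classes. `Col`/`BinOp` below are hypothetical stand-ins for the real expression nodes; the point is just that the leaf `Col` nodes of the shuffle-on expression are the only input columns that must survive the pre-shuffle Select:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Col:
    """Stand-in for a column-reference leaf."""
    name: str
    children: tuple = ()


@dataclass(frozen=True)
class BinOp:
    """Stand-in for a pointwise binary operation."""
    op: str
    children: tuple


def necessary_columns(on):
    # Collect leaf Col nodes reachable from the shuffle-on expression.
    out, stack, seen = [], [on], set()
    while stack:
        node = stack.pop()
        if id(node) in seen:
            continue
        seen.add(id(node))
        if isinstance(node, Col):
            out.append(node)
        stack.extend(node.children)
    return out
```

So for an expression like `uniques(B + C)` over an input with columns `B, C, D, E`, only `B` and `C` need to be projected and shuffled.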
```python
child = Select(
    schema,
    shuffle_on,
    False,  # noqa: FBT003
    child,
)
```
question: it seems like this Select might induce a shuffle itself (if, for example, shuffle_on contains an n_unique aggregation, or later a sort). Will that break anything? Or is it guaranteed not to be the case?
> Will that break anything? Or is it guaranteed not to be the case?
This is guaranteed not to be the case, because we have decomposed everything into a graph of FusedExpr nodes. A FusedExpr may only contain a single non-pointwise expression.
Actually, it is possible that two distinct FusedExpr nodes will end up shuffling the same input IR multiple times (on the same or different columns). I haven't thought through ways to optimize this case, but I don't think anything should "break".
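One hypothetical way to avoid the duplicate-shuffle case mentioned above is to memoise shuffled children by input IR and shuffle keys, so two FusedExpr nodes that shuffle the same input on the same keys share one Shuffle node. This is a sketch of the idea, not code from the PR:

```python
def shuffled_child(cache, child, keys, make_shuffle):
    """Return a shuffled version of `child`, reusing an existing shuffle
    if the same input was already shuffled on the same keys.

    `cache` maps (id(child), keys) -> shuffled node; `make_shuffle` is
    whatever constructor builds the actual Shuffle IR node.
    """
    key = (id(child), keys)
    if key not in cache:
        cache[key] = make_shuffle(child, keys)
    return cache[key]
```

Keying on object identity of the input IR is enough here because distinct FusedExpr nodes reference the same original child node when they need the same data.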
```python
if set(schema) != set(child.schema):
    # Drop unnecessary columns before the shuffle
    child = Select(
        schema,
```
The schema of this select is `{named_expr.name: named_expr.value.dtype}`
Right, but this function is creating a new IR node that is shuffled (and that the original Select can be called on in a pointwise fashion). This temporary schema may have multiple columns (e.g. if we are selecting the uniques of some binary op between two columns).
But that's not what this Select expression does: it produces a dataframe with a single output column that is the result of evaluating shuffle_on on the input.
Update: I decided that I don't like how much work we are doing during graph-construction time. I prefer handling the decomposition entirely during the lowering/re-write process. I'm exploring a slight refactoring of this PR in rjzamora:complex-aggregations-refactor. It may take me a few more hours tomorrow morning to clean that up and get shuffling implemented for …
Update: It is likely that #18492 will be superseding this PR. That PR accomplishes what we need without introducing any new (or …
Closing in favor of #18492
Description
The overall goal is to enable us to decompose arbitrary `Expr` graphs containing one or more "non-pointwise" nodes. In order to achieve this, I propose that we add an experimental `FusedExpr` class (and related `Expr`-graph decomposition utilities). The general idea is that we can iteratively traverse an `Expr` graph in reverse-topological order, and rewrite the graph until it is entirely composed of `FusedExpr` nodes. From there, it becomes relatively simple to build the task graph for each `FusedExpr` node independently.