Support multi-partition Select operations with aggregations#18492
rapids-bot[bot] merged 20 commits into rapidsai:branch-25.06
wence- left a comment
This looks pretty good, I think. I have relatively minor comments.
    # Try decomposing the underlying expressions
    return decompose_select(
        ir, child, partition_info, rec.state["config_options"]
    )
I was thinking that you could do the rewrite as a pass over the IR representation before lowering and assigning partitioning (which I think would make things a bit simpler), but I guess you don't want to do the "complicated" thing if there's only a single partition so we need to do things here.
There is also the fact that we don't handle iterative lowering at the moment, so it would take a larger diff to make something like that work. We would run into Select(Agg) nodes during the lowering stage and need to know (or deduce) that those nodes are already decomposed.
However, I do think you're right that the key problem is that you don't want to do the decomposition unless you need to.
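The point made in this thread (only decompose when more than one partition is involved) can be sketched roughly as below. This is a toy illustration, not the code from the PR: `ToySelect`, `lower_select`, and the stage names are hypothetical, and the real lowering works on IR nodes and partition metadata rather than strings.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ToySelect:
    """Hypothetical stand-in for a Select IR node."""

    exprs: tuple[str, ...]
    has_aggregation: bool


def lower_select(node: ToySelect, child_partitions: int) -> list[str]:
    """Return the (toy) stages needed to evaluate ``node``."""
    if child_partitions == 1 or not node.has_aggregation:
        # A single partition, or only pointwise expressions:
        # the Select can be evaluated in one step.
        return ["select"]
    # Several partitions and a non-pointwise expression: decompose.
    return ["partial-select", "repartition", "combine-select"]


single = lower_select(ToySelect(("sum(a)",), True), child_partitions=1)
multi = lower_select(ToySelect(("sum(a)",), True), child_partitions=4)
```

In this sketch the single-partition case never pays for the extra stages, which is the reason given above for doing the rewrite during lowering rather than as an earlier pass.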
    See Also
    --------
    _add_select_ir
    _decompose_expr_node

    Notes
    -----
    This function is called by ``_decompose_expr_node`` to decompose
    an Agg node into multiple IR nodes. The new IR nodes are added
    with ``_add_select_ir``.
nit: As above, I am not sure these xrefs add much to locally understanding how to use this function/what it does, and would perhaps be better served by an overview module-level docstring/comment.
    columns, input_ir, partition_info = select(
        [Cast(agg.dtype, agg)],
        input_ir,
        partition_info,
        names=names,
        repartition=True,
    )

    # Combined stage
    (column,) = columns
    columns, input_ir, partition_info = select(
        [Agg(agg.dtype, "sum", None, column)],
        input_ir,
        partition_info,
        names=names,
    )
To understand this, is there a reason we can't put the cast into the final sum aggregation? Ah, it's the repartitioning step?
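For readers following the two stages in the snippet above, here is a minimal sketch of the general partial-then-combine pattern on plain Python lists. It assumes a count-style aggregation purely for illustration; `partial_counts` and `combine` are hypothetical helpers, the cast step is left out, and nothing here answers the question about where the cast has to live.

```python
def partial_counts(partitions):
    """Stage 1: compute one partial result per partition."""
    return [sum(1 for value in part if value is not None) for part in partitions]


def combine(partials):
    """Stage 2: once the partials sit in a single partition, sum them."""
    return sum(partials)


partitions = [[1, None, 3], [4, 5], []]
partials = partial_counts(partitions)
total = combine(partials)
```

The list of partials plays the role of the repartitioned intermediate result: it holds one value per input partition, and only the final sum sees all of them together.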
    schema: MutableMapping[str, Any] = {}
    for ir in unique_input_irs:
        schema.update(ir.schema)
nit (maybe a followup): should we check that none of the column names overlap?
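One possible shape for such a check, as a sketch only: `merge_schemas` is a hypothetical helper, not something in the PR, and whether overlapping names should raise or be tolerated is a design decision left open here.

```python
from collections.abc import Iterable, Mapping
from typing import Any


def merge_schemas(schemas: Iterable[Mapping[str, Any]]) -> dict[str, Any]:
    """Merge schemas, refusing silently overwritten column names."""
    merged: dict[str, Any] = {}
    for schema in schemas:
        # Key views support set intersection directly.
        overlap = merged.keys() & schema.keys()
        if overlap:
            raise ValueError(f"Overlapping column names: {sorted(overlap)}")
        merged.update(schema)
    return merged


merged = merge_schemas([{"a": "int64"}, {"b": "float64"}])
```

With a plain `schema.update(...)`, a later schema would quietly replace the dtype of an earlier column with the same name; the explicit check turns that into an error.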
Description
This PR supersedes #17941
In contrast to #17941, this PR does not introduce any new task-graph logic. Instead, complex expression graphs (expression graphs containing non-pointwise nodes) are decomposed into multiple IR nodes.
The design used in this PR is probably more intuitive than the FusedExpr concept.
Illustration
TODO:
FusedExpr.
Checklist