-
Notifications
You must be signed in to change notification settings - Fork 7k
[Data] - Unify Project Operator to use Expressions #57076
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Data] - Unify Project Operator to use Expressions #57076
Conversation
Signed-off-by: Goutam V. <[email protected]>
Signed-off-by: Goutam V. <[email protected]>
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request is a significant refactoring to unify projection-like operations (select_columns, with_column, rename_columns) under a single Project logical operator that uses expressions. This is a great improvement for consistency and maintainability. The changes include moving the expression evaluator, implementing a visitor pattern for evaluation, and rewriting the projection pushdown logic to correctly fuse chains of projection operations. The new logic is more robust and handles complex cases like column swaps and chained renames correctly. I've found a few minor issues with type hints and docstrings that could be improved for clarity. Overall, this is a high-quality contribution.
python/ray/data/_internal/planner/plan_expression/expression_evaluator.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/planner/plan_expression/expression_evaluator.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/planner/plan_expression/expression_evaluator.py
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request is a significant and well-executed refactoring that unifies the select_columns, rename_columns, and with_column operations into a single expression-based Project operator. This change enhances the planner's optimization capabilities. The implementation is solid, introducing a visitor pattern for expression evaluation and rewriting the projection pushdown logic to correctly handle complex chains of operations. My review focuses on the new, complex logic in the planner and expression evaluator, with suggestions to improve type hint accuracy and simplify some logic for better long-term maintainability.
python/ray/data/_internal/planner/plan_expression/expression_evaluator.py
Show resolved
Hide resolved
Signed-off-by: Goutam V. <[email protected]>
Signed-off-by: Goutam V. <[email protected]>
Signed-off-by: Goutam V. <[email protected]>
Signed-off-by: Goutam V. <[email protected]>
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request is an excellent and significant refactoring that unifies the Project operator to be fully expression-driven. The changes, including moving the expression evaluator to an internal module, adopting the visitor pattern for evaluation, and overhauling the projection pushdown logic, greatly improve the code's structure, maintainability, and correctness. The new ProjectionPushdown implementation is particularly impressive in how it handles complex chains of select, with_column, and rename operations. The addition of comprehensive tests is also a major plus. I've found one minor issue related to error handling that could be improved.
python/ray/data/_internal/planner/plan_expression/expression_evaluator.py
Show resolved
Hide resolved
Signed-off-by: Goutam V. <[email protected]>
Signed-off-by: Goutam V. <[email protected]>
Signed-off-by: Goutam V. <[email protected]>
Signed-off-by: Goutam V. <[email protected]>
Signed-off-by: Goutam V. <[email protected]>
Signed-off-by: Goutam V. <[email protected]>
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request is a major and well-executed refactoring that unifies select_columns, with_column, and rename_columns under a single expression-based Project logical operator. This is a significant improvement that simplifies the logical plan and enables more powerful and robust optimizations, particularly for projection fusion and pushdown. The changes are extensive, touching the logical operators, the physical planner, the expression evaluation logic, and the public Dataset API. The new implementation of projection pushdown is much cleaner and correctly handles complex chains of operations. The introduction of a visitor-based expression evaluator is also a solid design choice. I've found one minor issue with error handling in a helper function, but overall this is an excellent refactoring.
python/ray/data/_internal/planner/plan_expression/expression_evaluator.py
Show resolved
Hide resolved
Signed-off-by: Goutam V. <[email protected]>
Signed-off-by: Goutam <[email protected]>
Signed-off-by: Goutam <[email protected]>
python/ray/data/_internal/planner/exchange/shuffle_task_spec.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/planner/plan_expression/expression_evaluator.py
Show resolved
Hide resolved
Signed-off-by: Goutam <[email protected]>
python/ray/data/_internal/planner/exchange/shuffle_task_spec.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Goutam <[email protected]>
python/ray/data/_internal/planner/exchange/shuffle_task_spec.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Goutam <[email protected]>
python/ray/data/_internal/planner/exchange/shuffle_task_spec.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Goutam <[email protected]>
Signed-off-by: Goutam <[email protected]>
python/ray/data/_internal/planner/exchange/shuffle_task_spec.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Goutam <[email protected]>
Signed-off-by: Goutam <[email protected]>
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request is a significant and well-executed refactoring that unifies various column operations (with_column, select_columns, rename_columns) under a single expression-based Project operator. Moving the expression evaluation logic to an internal module and adopting the visitor pattern is a great architectural improvement. The introduction of StarColumnsExpr and star() provides a clean mechanism for additive column operations. The rewrite of the projection pushdown rule is particularly impressive, enabling more powerful and robust fusion of consecutive Project operators. Overall, these changes greatly improve the clarity, consistency, and maintainability of the Ray Data API and its underlying implementation. I've included a few suggestions for further improvement.
python/ray/data/_internal/planner/exchange/shuffle_task_spec.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Goutam <[email protected]>
| elif _is_pa_string_like(x) and isinstance(x, (pa.Array, pa.ChunkedArray)): | ||
| x = _pa_decode_dict_string_array(x) | ||
| else: | ||
| raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Signed-off-by: Goutam <[email protected]>
Signed-off-by: Goutam <[email protected]>
| # If any expression is star(), we need all columns | ||
| if any(isinstance(expr, StarColumnsExpr) for expr in exprs): | ||
| return None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This encoding (none -> all) is implicit -- let's make it explicit.
To make it explicit we'd need to add an expression resolution (against schema) step. We don't need to do in this PR, but let's cut a ticket and link it here
| """ | ||
| referenced_columns: Set[str] = set() | ||
|
|
||
| def visit_expr(expr: Expr) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Make all traversals a corresponding Visitor (and make visitors w/o state singletons)
Signed-off-by: Goutam <[email protected]>
Signed-off-by: Goutam <[email protected]>
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| class _ColumnReferenceCollector(_ExprVisitor): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is impossible to review now
| operand: Expr | ||
|
|
||
| data_type: DataType = field(init=False) | ||
| data_type: DataType = field(default_factory=lambda: DataType(object), init=False) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this default mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comment in new PR. It's the default return dtype. For UnaryExpr it should actually be bool not object.
Without this defined, further expressions can't be chained. If I have expr1.is_not_null().alias('x') then we need to ensure that AliasExpr gets the datatype propagated from the UnaryExpr
python/ray/data/_internal/planner/plan_expression/expression_evaluator.py
Show resolved
Hide resolved
| if len(exprs) == 1 and isinstance(exprs[0], StarColumnsExpr): | ||
| return block |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Expand stars before continuing processing (will later move this into resolving stage)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean creating a list of col expressions?
- That requires the schema
- If I have 100k+ columns, isn't that slow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed this in new PR
| if isinstance(expr, StarColumnsExpr): | ||
| continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in new PR
|
Closing in favor of #57855 |
Why are these changes needed?
Makes the following changes:
rename_columns,with_column,select_columnsand removecolsandcols_renamein ProjectRelated issue number
Closes #56878, #57700
Checks
git commit -s) in this PR.method in Tune, I've added it in
doc/source/tune/api/under thecorresponding
.rstfile.Note
Switch Project/column ops to named expression lists with preserve_existing, move/evolve expression evaluation to internal native Block visitor, and implement robust Project fusion/pushdown; plus Arrow/Pandas block fixes and test updates.
_internal/planner/plan_expression/expression_evaluator.py; add native Block/BlockColumn visitor (NativeExpressionEvaluator) and centralize op maps.eval_expr(Arrow/Pandas blocks, UDF planner, tests).Projectto acceptexprs: List[Expr]with enforced names andpreserve_existingflag; removecols/cols_renamepaths.plan_project_op) to compute expressions, fill/upsert columns, and finalize schema based onpreserve_existing.with_column,select_columns,rename_columnsto build named expressions;countuses empty exprs withpreserve_existing=False.Projects (handles renames, substitution, and visibility viapreserve_existing).fill_columnusesupsert_column;num_rowsreturns rawnum_rows.UnaryExprdefaultdata_typeinitialization.Written by Cursor Bugbot for commit dbf609a. This will update automatically on new commits. Configure here.