[Data] - Add Predicate Pushdown Rule #58150
First changed file (the Parquet datasource):

@@ -56,6 +56,7 @@
     _has_file_extension,
     _resolve_paths_and_filesystem,
 )
+from ray.data.expressions import Expr
 from ray.util.debug import log_once

 if TYPE_CHECKING:

@@ -286,7 +287,7 @@ def __init__(
         self._file_metadata_shuffler = None
         self._include_paths = include_paths
         self._partitioning = partitioning
+        self._predicate_expr: Optional[Expr] = None
         if shuffle == "files":
             self._file_metadata_shuffler = np.random.default_rng()
         elif isinstance(shuffle, FileShuffleConfig):

@@ -362,6 +363,12 @@ def get_read_tasks(
         )

         read_tasks = []
+        filter_expr = (
+            self._predicate_expr.to_pyarrow()
+            if self._predicate_expr is not None
+            else None
+        )
+
         for fragments, paths in zip(
             np.array_split(pq_fragments, parallelism),
             np.array_split(pq_paths, parallelism),

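For context on what this conversion produces: `to_pyarrow()` turns the Ray Data expression held by the datasource into a `pyarrow.dataset.Expression` that Arrow can evaluate inside the scan. A minimal sketch of the correspondence — the column names and literal values are illustrative, and the exact output shape of `to_pyarrow()` is an assumption rather than something shown in this diff:

```python
import pyarrow.dataset as ds

from ray.data.expressions import col

# Hypothetical Ray Data predicate built with the expression API.
ray_predicate = (col("year") >= 2020) & (col("country") == "US")

# Roughly the equivalent pyarrow.dataset.Expression that to_pyarrow() would
# need to produce so Arrow can apply the predicate during the Parquet scan.
arrow_predicate = (ds.field("year") >= 2020) & (ds.field("country") == "US")
```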
@@ -411,6 +418,7 @@ def get_read_tasks(
                     f,
                     include_paths,
                     partitioning,
+                    filter_expr,
                 ),
                 meta,
                 schema=target_schema,

@@ -434,6 +442,9 @@ def supports_distributed_reads(self) -> bool:
     def supports_projection_pushdown(self) -> bool:
         return True

+    def supports_predicate_pushdown(self) -> bool:
+        return True
+
     def get_current_projection(self) -> Optional[List[str]]:
         # NOTE: In case there's no projection both file and partition columns
         # will be none

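These two lines mirror the existing projection-pushdown hooks (`supports_projection_pushdown` / `apply_projection`). Judging from how the new rule uses them, the interface the datasource is opting into looks roughly like the sketch below; the real `LogicalOperatorSupportsPredicatePushdown` lives in `ray.data._internal.logical.interfaces` and may differ in detail:

```python
from ray.data.expressions import Expr


class SupportsPredicatePushdownSketch:
    """Sketch of the capability the PredicatePushdown rule probes for."""

    def supports_predicate_pushdown(self) -> bool:
        # Opt-in flag; ParquetDatasource returns True in this PR.
        return False

    def apply_predicate(self, predicate_expr: Expr) -> "SupportsPredicatePushdownSketch":
        # Return a copy of the operator/datasource that carries the predicate.
        raise NotImplementedError
```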
@@ -456,6 +467,35 @@ def apply_projection(
         return clone

+    # TODO: This should be moved to the Datasource class
+    def apply_predicate(
+        self,
+        predicate_expr: Expr,
+    ) -> "ParquetDatasource":
+        from ray.data._internal.planner.plan_expression.expression_visitors import (
+            _ColumnRefRebindingVisitor,
+        )
+        from ray.data.expressions import col
+
+        clone = copy.copy(self)
+        # Handle column renaming for Ray Data expressions
+        if self._data_columns_rename_map:
+            # Create mapping from new column names to old column names
+            column_mapping = {
+                new_col: col(old_col)
+                for old_col, new_col in self._data_columns_rename_map.items()
+            }
+            visitor = _ColumnRefRebindingVisitor(column_mapping)
+            predicate_expr = visitor.visit(predicate_expr)
+
+        # Combine with existing predicate using AND
+        if clone._predicate_expr is not None:
+            clone._predicate_expr = clone._predicate_expr & predicate_expr
+        else:
+            clone._predicate_expr = predicate_expr
+
+        return clone
+
     def _estimate_in_mem_size(self, fragments: List[_ParquetFragment]) -> int:
         in_mem_size = sum([f.file_size for f in fragments]) * self._encoding_ratio

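To make the rename handling above concrete (the column names below are made up): if the datasource exposes the on-disk column `price` under the name `cost`, a user predicate written against `cost` has to be rebound to `price` before it can be pushed into the Parquet scan.

```python
from ray.data.expressions import col

# Hypothetical rename map on the datasource: on-disk "price" is exposed as "cost".
data_columns_rename_map = {"price": "cost"}

# User-facing predicate, written against the renamed column.
user_predicate = col("cost") > 100

# Build the new-name -> old-column mapping exactly as in apply_predicate;
# _ColumnRefRebindingVisitor.visit() then rewrites the predicate so that it
# reads col("price") > 100 and matches the physical Parquet schema.
column_mapping = {new: col(old) for old, new in data_columns_rename_map.items()}
```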
@@ -473,6 +513,7 @@ def read_fragments(
     fragments: List[_ParquetFragment],
     include_paths: bool,
     partitioning: Partitioning,
+    filter_expr: Optional["pyarrow.dataset.Expression"] = None,
 ) -> Iterator["pyarrow.Table"]:
     # This import is necessary to load the tensor extension type.
     from ray.data.extensions.tensor_extension import ArrowTensorType  # noqa

@@ -494,6 +535,7 @@ def read_fragments(
             partition_columns=partition_columns,
             partitioning=partitioning,
             include_path=include_paths,
+            filter_expr=filter_expr,
             batch_size=default_read_batch_size_rows,
             to_batches_kwargs=to_batches_kwargs,
         ),

@@ -532,7 +574,14 @@ def _read_batches_from(
     # NOTE: Passed in kwargs overrides always take precedence
     # TODO deprecate to_batches_kwargs
     use_threads = to_batches_kwargs.pop("use_threads", use_threads)
-    filter_expr = to_batches_kwargs.pop("filter", filter_expr)
+    # TODO: We should deprecate filter through the read_parquet API and only allow through dataset.filter()
+    if to_batches_kwargs.get("filter") is not None:
+        filter_from_kwargs = to_batches_kwargs.get("filter")
+        if filter_expr is not None:
+            filter_expr = filter_expr & filter_from_kwargs
+        else:
+            filter_expr = filter_from_kwargs
+        to_batches_kwargs.pop("filter")
     # NOTE: Arrow's ``to_batches`` expects ``batch_size`` as an int
     if batch_size is not None:
         to_batches_kwargs.setdefault("batch_size", batch_size)

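Downstream of this merge, the combined expression is what gets handed to Arrow for each fragment. A rough sketch of the end state — the dataset path, field names, and batch size are illustrative, not taken from this diff:

```python
import pyarrow.dataset as ds

# Predicate pushed down from the logical plan (already converted via to_pyarrow()).
pushed_down = ds.field("year") >= 2020
# Legacy filter passed through the read_parquet(...) kwargs, if any.
legacy_kwarg = ds.field("country") == "US"

# Mirror the branch above: AND the two together, then let Arrow apply it.
combined = pushed_down & legacy_kwarg

dataset = ds.dataset("example.parquet", format="parquet")  # illustrative path
for fragment in dataset.get_fragments():
    for batch in fragment.to_batches(filter=combined, batch_size=1024):
        ...  # only rows satisfying the combined predicate are materialized
```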
Second changed file (the new predicate-pushdown rule; entirely new):

@@ -0,0 +1,68 @@
from ray.data._internal.logical.interfaces import (
    LogicalOperator,
    LogicalOperatorSupportsPredicatePushdown,
    LogicalPlan,
    Rule,
)
from ray.data._internal.logical.operators.map_operator import Filter


class PredicatePushdown(Rule):
    """Pushes down predicates across the graph.

    This rule performs the following optimizations:
    1. Combines chained Filter operators with compatible expressions
    2. Pushes filter expressions down to operators that support predicate pushdown
    """

    def apply(self, plan: LogicalPlan) -> LogicalPlan:
        """Apply predicate pushdown optimization to the logical plan."""
        dag = plan.dag
        while True:
            new_dag = dag._apply_transform(self._try_fuse_filters)
            new_dag = new_dag._apply_transform(self._try_push_down_predicate)
            if new_dag is dag:
                break
            dag = new_dag
        return LogicalPlan(dag, plan.context)

    @classmethod
    def _try_fuse_filters(cls, op: LogicalOperator) -> LogicalOperator:
        """Fuse consecutive Filter operators with compatible expressions."""
        if not isinstance(op, Filter) or not op.is_expression_based():
            return op

        input_op = op.input_dependencies[0]
        if not isinstance(input_op, Filter) or not input_op.is_expression_based():
            return op

        # Check if predicates are of the same type
        if type(op._predicate_expr) is not type(input_op._predicate_expr):
            return op

        # Combine predicates
        combined_predicate = op._predicate_expr & input_op._predicate_expr

        # Create new filter on the input of the lower filter
        return Filter(
            input_op.input_dependencies[0],
            predicate_expr=combined_predicate,
        )

    @classmethod
    def _try_push_down_predicate(cls, op: LogicalOperator) -> LogicalOperator:
        """Push Filter down to any operator that supports predicate pushdown."""
        if not isinstance(op, Filter) or not op.is_expression_based():
            return op

        input_op = op.input_dependencies[0]

        # Check if the input operator supports predicate pushdown
        if (
            isinstance(input_op, LogicalOperatorSupportsPredicatePushdown)
            and input_op.supports_predicate_pushdown()
        ):
            # Push the predicate down and return the result without the filter
            return input_op.apply_predicate(op._predicate_expr)

        return op
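To see the intended effect of the rule end to end, here is a sketch of the rewrite on a simple pipeline. Whether `Dataset.filter` accepts `Expr` objects directly, and the exact read path, are assumptions for illustration rather than something this diff shows:

```python
import ray
from ray.data.expressions import col

# Hypothetical pipeline with two chained expression-based filters.
ds = (
    ray.data.read_parquet("s3://bucket/table/")  # illustrative path
    .filter(expr=(col("year") >= 2020))
    .filter(expr=(col("country") == "US"))
)

# Logical plan before the rule:
#   Read(Parquet) -> Filter(year >= 2020) -> Filter(country == "US")
#
# After _try_fuse_filters (downstream predicate AND-ed with upstream one):
#   Read(Parquet) -> Filter((country == "US") & (year >= 2020))
#
# After _try_push_down_predicate:
#   Read(Parquet) with _predicate_expr = (country == "US") & (year >= 2020)
# The Filter disappears from the plan and the predicate is evaluated by Arrow
# inside the Parquet scan via to_pyarrow().
```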
How much work would it actually be to push our expr all the way into the reader itself?
If it's not a lot, let's do the right thing right away (otherwise do it in a stacked PR).
I'm already pushing this into the reader.
`apply_predicate` should change the `_predicate_expr`, which then calls `to_pyarrow()` to convert it to a `pyarrow.dataset.Expression`, which then gets sent to `fragment.to_batches()`.
I meant threading our expressions through, instead of PA ones.
I'm not sure I follow. How will pyarrow accept Ray Data's Expressions? At some point we have to convert before calling `to_batches()`, right?
Discussed offline. As part of the next PR, I'll refactor the remaining two functions that are not managed by PyArrow so that they only take Ray Data's `Expr`.