Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
fb4cb18
[Data][1/n] - Use visitor pattern for Ray Data Expressions
goutamvenkat-anyscale Sep 29, 2025
f99ae1c
Fix projection pushdown
goutamvenkat-anyscale Oct 1, 2025
275d919
Fix count
goutamvenkat-anyscale Oct 1, 2025
4c273e8
gemini comments
goutamvenkat-anyscale Oct 1, 2025
7aed3af
Clean up
goutamvenkat-anyscale Oct 1, 2025
40f724e
Merge from master
goutamvenkat-anyscale Oct 1, 2025
c879ad6
Fix filter + cleanup
goutamvenkat-anyscale Oct 1, 2025
c140ad7
Move test to fusion + fix projection fusion tests
goutamvenkat-anyscale Oct 1, 2025
f35cfd0
Fix operator fusion
goutamvenkat-anyscale Oct 1, 2025
30a7ab5
Fix parquet tests
goutamvenkat-anyscale Oct 1, 2025
6a626e8
Spelling mistake
goutamvenkat-anyscale Oct 1, 2025
f1b5d97
one more cleanup
goutamvenkat-anyscale Oct 1, 2025
73cb01b
Handle invalid orders in projection pushdown
goutamvenkat-anyscale Oct 1, 2025
dbf609a
Simplify and use pairwise fusion
goutamvenkat-anyscale Oct 1, 2025
73115b2
Merge branch 'master' into goutam/visitor_expr
goutamvenkat-anyscale Oct 2, 2025
7120d7f
Address some comments
goutamvenkat-anyscale Oct 2, 2025
4ec2d14
Address comments
goutamvenkat-anyscale Oct 2, 2025
15bbe1e
One more cleanup
goutamvenkat-anyscale Oct 2, 2025
762f27f
Test fix
goutamvenkat-anyscale Oct 2, 2025
a142194
Merge branch 'master' into goutam/visitor_expr
goutamvenkat-anyscale Oct 3, 2025
f488532
Refactor and add some clarifying comments
goutamvenkat-anyscale Oct 5, 2025
5ac304c
Fix one class of bugs
goutamvenkat-anyscale Oct 7, 2025
282c3d5
Merge from master
goutamvenkat-anyscale Oct 7, 2025
eb52554
remove old test suite
goutamvenkat-anyscale Oct 7, 2025
ebe2441
Fix download test + comments
goutamvenkat-anyscale Oct 8, 2025
5f814f9
Add all expression
goutamvenkat-anyscale Oct 8, 2025
fb9d380
doc fix
goutamvenkat-anyscale Oct 8, 2025
f46a40f
Fix test + api_policy_check
goutamvenkat-anyscale Oct 8, 2025
cd84a20
merge from master
goutamvenkat-anyscale Oct 8, 2025
8867156
Merge branch 'master' into goutam/visitor_expr
goutamvenkat-anyscale Oct 13, 2025
c675dd6
Address comments
goutamvenkat-anyscale Oct 14, 2025
aa2a9b5
doc fix
goutamvenkat-anyscale Oct 14, 2025
29feb0f
zero_div error
goutamvenkat-anyscale Oct 14, 2025
638b9c6
Fix test
goutamvenkat-anyscale Oct 14, 2025
62f354b
Add stub column back and remove arrow block change
goutamvenkat-anyscale Oct 14, 2025
b869cfe
use visitor in pushdown
goutamvenkat-anyscale Oct 15, 2025
3df14b3
Merge branch 'master' into goutam/visitor_expr
goutamvenkat-anyscale Oct 15, 2025
6ded04a
one more visitor
goutamvenkat-anyscale Oct 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion doc/source/data/api/expressions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Public API
:nosignatures:
:toctree: doc/

star
col
lit
udf
Expand All @@ -37,4 +38,5 @@ instantiate them directly, but you may encounter them when working with expressi
LiteralExpr
BinaryExpr
UnaryExpr
UDFExpr
UDFExpr
StarColumnsExpr
183 changes: 0 additions & 183 deletions python/ray/data/_expression_evaluator.py

This file was deleted.

7 changes: 5 additions & 2 deletions python/ray/data/_internal/arrow_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@


_MIN_PYARROW_VERSION_TO_NUMPY_ZERO_COPY_ONLY = parse_version("13.0.0")
_BATCH_SIZE_PRESERVING_STUB_COL_NAME = "__bsp_stub"


# Set the max chunk size in bytes for Arrow to Batches conversion in
Expand Down Expand Up @@ -221,7 +222,7 @@ def fill_column(self, name: str, value: Any) -> Block:

array = pyarrow.nulls(len(self._table), type=type)
array = pc.fill_null(array, value)
return self._table.append_column(name, array)
return self.upsert_column(name, array)

@classmethod
def from_bytes(cls, data: bytes) -> "ArrowBlockAccessor":
Expand Down Expand Up @@ -453,7 +454,9 @@ def filter(self, predicate_expr: "Expr") -> "pyarrow.Table":
if self._table.num_rows == 0:
return self._table

from ray.data._expression_evaluator import eval_expr
from ray.data._internal.planner.plan_expression.expression_evaluator import (
eval_expr,
)

# Evaluate the expression to get a boolean mask
mask = eval_expr(predicate_expr, self._table)
Expand Down
8 changes: 4 additions & 4 deletions python/ray/data/_internal/datasource/parquet_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@

import ray
from ray._private.arrow_utils import get_pyarrow_version
from ray.data._internal.arrow_block import ArrowBlockAccessor
from ray.data._internal.arrow_block import (
_BATCH_SIZE_PRESERVING_STUB_COL_NAME,
ArrowBlockAccessor,
)
from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.util import (
Expand Down Expand Up @@ -104,9 +107,6 @@
PARQUET_ENCODING_RATIO_ESTIMATE_NUM_ROWS = 1024


_BATCH_SIZE_PRESERVING_STUB_COL_NAME = "__bsp_stub"


class _ParquetFragment:
"""This wrapper class is created to avoid utilizing `ParquetFileFragment` original
serialization protocol that actually does network RPCs during serialization
Expand Down
37 changes: 13 additions & 24 deletions python/ray/data/_internal/logical/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.logical.operators.one_to_one_operator import AbstractOneToOne
from ray.data.block import UserDefinedFunction
from ray.data.expressions import Expr
from ray.data.expressions import Expr, StarColumnsExpr
from ray.data.preprocessor import Preprocessor

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -268,16 +268,12 @@ def can_modify_num_rows(self) -> bool:


class Project(AbstractMap):
"""Logical operator for select_columns."""
"""Logical operator for select_columns, with_column, rename_columns."""

def __init__(
self,
input_op: LogicalOperator,
cols: Optional[List[str]] = None,
cols_rename: Optional[Dict[str, str]] = None,
exprs: Optional[
Dict[str, "Expr"]
] = None, # TODO Remove cols and cols_rename and replace them with corresponding exprs
exprs: list["Expr"],
compute: Optional[ComputeStrategy] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
Expand All @@ -288,30 +284,23 @@ def __init__(
compute=compute,
)
self._batch_size = None
self._cols = cols
self._cols_rename = cols_rename
self._exprs = exprs
self._batch_format = "pyarrow"
self._zero_copy_batch = True

if exprs is not None:
# Validate that all values are expressions
for name, expr in exprs.items():
if not isinstance(expr, Expr):
raise TypeError(
f"Expected Expr for column '{name}', got {type(expr)}"
)
for expr in self._exprs:
if expr.name is None and not isinstance(expr, StarColumnsExpr):
raise TypeError(
"All Project expressions must be named (use .alias(name) or col(name)), "
"or be a star() expression."
)

@property
def cols(self) -> Optional[List[str]]:
return self._cols

@property
def cols_rename(self) -> Optional[Dict[str, str]]:
return self._cols_rename
def has_all_columns_expr(self) -> bool:
"""Check if this projection contains a star() expression."""
return any(isinstance(expr, StarColumnsExpr) for expr in self._exprs)

@property
def exprs(self) -> Optional[Dict[str, "Expr"]]:
def exprs(self) -> List["Expr"]:
return self._exprs

def can_modify_num_rows(self) -> bool:
Expand Down
Loading