Skip to content
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
fb4cb18
[Data][1/n] - Use visitor pattern for Ray Data Expressions
goutamvenkat-anyscale Sep 29, 2025
f99ae1c
Fix projection pushdown
goutamvenkat-anyscale Oct 1, 2025
275d919
Fix count
goutamvenkat-anyscale Oct 1, 2025
4c273e8
gemini comments
goutamvenkat-anyscale Oct 1, 2025
7aed3af
Clean up
goutamvenkat-anyscale Oct 1, 2025
40f724e
Merge from master
goutamvenkat-anyscale Oct 1, 2025
c879ad6
Fix filter + cleanup
goutamvenkat-anyscale Oct 1, 2025
c140ad7
Move test to fusion + fix projection fusion tests
goutamvenkat-anyscale Oct 1, 2025
f35cfd0
Fix operator fusion
goutamvenkat-anyscale Oct 1, 2025
30a7ab5
Fix parquet tests
goutamvenkat-anyscale Oct 1, 2025
6a626e8
Spelling mistake
goutamvenkat-anyscale Oct 1, 2025
f1b5d97
one more cleanup
goutamvenkat-anyscale Oct 1, 2025
73cb01b
Handle invalid orders in projection pushdown
goutamvenkat-anyscale Oct 1, 2025
dbf609a
Simplify and use pairwise fusion
goutamvenkat-anyscale Oct 1, 2025
73115b2
Merge branch 'master' into goutam/visitor_expr
goutamvenkat-anyscale Oct 2, 2025
7120d7f
Address some comments
goutamvenkat-anyscale Oct 2, 2025
4ec2d14
Address comments
goutamvenkat-anyscale Oct 2, 2025
15bbe1e
One more cleanup
goutamvenkat-anyscale Oct 2, 2025
762f27f
Test fix
goutamvenkat-anyscale Oct 2, 2025
a142194
Merge branch 'master' into goutam/visitor_expr
goutamvenkat-anyscale Oct 3, 2025
f488532
Refactor and add some clarifying comments
goutamvenkat-anyscale Oct 5, 2025
5ac304c
Fix one class of bugs
goutamvenkat-anyscale Oct 7, 2025
282c3d5
Merge from master
goutamvenkat-anyscale Oct 7, 2025
eb52554
remove old test suite
goutamvenkat-anyscale Oct 7, 2025
ebe2441
Fix download test + comments
goutamvenkat-anyscale Oct 8, 2025
5f814f9
Add all expression
goutamvenkat-anyscale Oct 8, 2025
fb9d380
doc fix
goutamvenkat-anyscale Oct 8, 2025
f46a40f
Fix test + api_policy_check
goutamvenkat-anyscale Oct 8, 2025
cd84a20
merge from master
goutamvenkat-anyscale Oct 8, 2025
8867156
Merge branch 'master' into goutam/visitor_expr
goutamvenkat-anyscale Oct 13, 2025
c675dd6
Address comments
goutamvenkat-anyscale Oct 14, 2025
aa2a9b5
doc fix
goutamvenkat-anyscale Oct 14, 2025
29feb0f
zero_div error
goutamvenkat-anyscale Oct 14, 2025
638b9c6
Fix test
goutamvenkat-anyscale Oct 14, 2025
62f354b
Add stub column back and remove arrow block change
goutamvenkat-anyscale Oct 14, 2025
b869cfe
use visitor in pushdown
goutamvenkat-anyscale Oct 15, 2025
3df14b3
Merge branch 'master' into goutam/visitor_expr
goutamvenkat-anyscale Oct 15, 2025
6ded04a
one more visitor
goutamvenkat-anyscale Oct 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 0 additions & 183 deletions python/ray/data/_expression_evaluator.py

This file was deleted.

29 changes: 24 additions & 5 deletions python/ray/data/_internal/arrow_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,24 @@ def _table_from_pydict(columns: Dict[str, List[Any]]) -> Block:
@staticmethod
def _combine_tables(tables: List[Block]) -> Block:
if len(tables) > 1:
# Check if we have 0-column tables to avoid losing rows during concat
# PyArrow's concat on 0-column tables returns a 0-row table
if all(table.num_columns == 0 for table in tables):
# Add stub column to preserve rows during concatenation
import pyarrow as pa

tables_with_stub = []
for table in tables:
if table.num_rows > 0:
table = table.append_column(
"__concat_stub", pa.nulls(table.num_rows)
)
tables_with_stub.append(table)
result = transform_pyarrow.concat(tables_with_stub, promote_types=True)
# Remove stub column after concatenation
if result.num_columns > 0:
result = result.select([])
return result
return transform_pyarrow.concat(tables, promote_types=True)
else:
return tables[0]
Expand Down Expand Up @@ -221,7 +239,7 @@ def fill_column(self, name: str, value: Any) -> Block:

array = pyarrow.nulls(len(self._table), type=type)
array = pc.fill_null(array, value)
return self._table.append_column(name, array)
return self.upsert_column(name, array)

@classmethod
def from_bytes(cls, data: bytes) -> "ArrowBlockAccessor":
Expand Down Expand Up @@ -321,9 +339,8 @@ def to_arrow(self) -> "pyarrow.Table":
return self._table

def num_rows(self) -> int:
# Arrow may represent an empty table via an N > 0 row, 0-column table, e.g. when
# slicing an empty table, so we return 0 if num_columns == 0.
return self._table.num_rows if self._table.num_columns > 0 else 0
# Arrow may represent an empty table via an N > 0 row, 0-column table
return self._table.num_rows
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Zero-Column Table Row Count Error

The ArrowBlockAccessor.num_rows() method now unconditionally returns self._table.num_rows, allowing 0-column tables to report a non-zero row count. This contradicts the existing comment and can break downstream logic expecting 0-column tables to have 0 rows. The removal of a slicing assertion also removes a critical check for data integrity.

Additional Locations (1)

Fix in Cursor Fix in Web

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to change this in this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this change, the count() operation fails. In count() we call Project(exprs=[]), an empty selection that produces an Arrow table with k rows but 0 columns. With the old check, num_rows() would return 0 for that table, which is wrong.


def size_bytes(self) -> int:
return self._table.nbytes
Expand Down Expand Up @@ -469,7 +486,9 @@ def filter(self, predicate_expr: "Expr") -> "pyarrow.Table":
if self._table.num_rows == 0:
return self._table

from ray.data._expression_evaluator import eval_expr
from ray.data._internal.planner.plan_expression.expression_evaluator import (
eval_expr,
)

# Evaluate the expression to get a boolean mask
mask = eval_expr(predicate_expr, self._table)
Expand Down
36 changes: 14 additions & 22 deletions python/ray/data/_internal/logical/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,16 +268,13 @@ def can_modify_num_rows(self) -> bool:


class Project(AbstractMap):
"""Logical operator for select_columns."""
"""Logical operator for select_columns, with_column, rename_columns."""

def __init__(
self,
input_op: LogicalOperator,
cols: Optional[List[str]] = None,
cols_rename: Optional[Dict[str, str]] = None,
exprs: Optional[
Dict[str, "Expr"]
] = None, # TODO Remove cols and cols_rename and replace them with corresponding exprs
exprs: List["Expr"],
preserve_existing: bool = False,
compute: Optional[ComputeStrategy] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
Expand All @@ -288,30 +285,25 @@ def __init__(
compute=compute,
)
self._batch_size = None
self._cols = cols
self._cols_rename = cols_rename
self._exprs = exprs
self._batch_format = "pyarrow"
self._zero_copy_batch = True
self._preserve_existing = preserve_existing

if exprs is not None:
# Validate that all values are expressions
for name, expr in exprs.items():
if not isinstance(expr, Expr):
raise TypeError(
f"Expected Expr for column '{name}', got {type(expr)}"
)
for expr in self._exprs:
if not isinstance(expr, Expr):
raise TypeError(f"Expected Expr got {expr} with type: {type(expr)}")
if expr.name is None:
raise TypeError(
"All Project expressions must be named; use .alias(name) or col(name)."
)

@property
def cols(self) -> Optional[List[str]]:
return self._cols
def preserve_existing(self) -> bool:
return self._preserve_existing

@property
def cols_rename(self) -> Optional[Dict[str, str]]:
return self._cols_rename

@property
def exprs(self) -> Optional[Dict[str, "Expr"]]:
def exprs(self) -> List["Expr"]:
return self._exprs

def can_modify_num_rows(self) -> bool:
Expand Down
Loading