Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/ray/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
FileShuffleConfig,
ReadTask,
RowBasedFileDatasink,
SaveMode,
)
from ray.data.iterator import DataIterator, DatasetIterator
from ray.data.preprocessor import Preprocessor
Expand Down Expand Up @@ -131,6 +132,7 @@
"NodeIdStr",
"ReadTask",
"RowBasedFileDatasink",
"SaveMode",
"Schema",
"SinkMode",
"TaskPoolStrategy",
Expand Down
320 changes: 256 additions & 64 deletions python/ray/data/_internal/datasource/iceberg_datasink.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import replace
from typing import Dict, List, TypeVar
from typing import TYPE_CHECKING, Dict, List, TypeVar

from ray.data.expressions import (
AliasExpr,
Expand All @@ -8,6 +8,7 @@
DownloadExpr,
Expr,
LiteralExpr,
Operation,
StarExpr,
UDFExpr,
UnaryExpr,
Expand All @@ -16,6 +17,9 @@

T = TypeVar("T")

if TYPE_CHECKING:
from pyiceberg.expressions import BooleanExpression


class _ExprVisitorBase(_ExprVisitor[None]):
"""Base visitor that provides automatic recursive traversal.
Expand Down Expand Up @@ -347,3 +351,136 @@ def visit_download(self, expr: "DownloadExpr") -> str:

def visit_star(self, expr: "StarExpr") -> str:
return self._make_tree_lines("COL(*)", expr=expr)


class _IcebergExpressionVisitor(_ExprVisitor["BooleanExpression"]):
"""
Visitor that converts Ray Data expressions to PyIceberg expressions.

This enables Ray Data users to write filters using the familiar col() syntax
while leveraging Iceberg's native filtering capabilities.

Example:
>>> from ray.data.expressions import col
>>> ray_expr = (col("date") >= "2024-01-01") & (col("status") == "active")
>>> iceberg_expr = _IcebergExpressionVisitor().visit(ray_expr)
>>> # iceberg_expr can now be used with PyIceberg's filter APIs
"""

def visit_column(self, expr: "ColumnExpr") -> "BooleanExpression":
"""Convert a column reference to an Iceberg reference."""
from pyiceberg.expressions import Reference

return Reference(expr.name)

def visit_literal(self, expr: "LiteralExpr") -> "BooleanExpression":
"""Convert a literal value to an Iceberg literal."""
from pyiceberg.expressions import literal

return literal(expr.value)

def visit_binary(self, expr: "BinaryExpr") -> "BooleanExpression":
"""Convert a binary operation to an Iceberg expression."""
from pyiceberg.expressions import (
And,
EqualTo,
GreaterThan,
GreaterThanOrEqual,
In,
LessThan,
LessThanOrEqual,
NotEqualTo,
NotIn,
Or,
)

# Handle IN/NOT_IN specially since they don't visit the right operand
# (the right operand is a list literal that can't be converted)
if expr.op == Operation.IN:
left = self.visit(expr.left)
# For IN operations, right should be a literal list
if isinstance(expr.right, LiteralExpr):
return In(left, expr.right.value)
else:
raise ValueError(
f"IN operation requires right operand to be a literal list, "
f"got {type(expr.right).__name__}"
)
elif expr.op == Operation.NOT_IN:
left = self.visit(expr.left)
if isinstance(expr.right, LiteralExpr):
return NotIn(left, expr.right.value)
else:
raise ValueError(
f"NOT_IN operation requires right operand to be a literal list, "
f"got {type(expr.right).__name__}"
)

# For all other operations, visit both operands
left = self.visit(expr.left)
right = self.visit(expr.right)

# Map Ray Data operations to Iceberg operations
operation_map = {
Operation.EQ: EqualTo,
Operation.NE: NotEqualTo,
Operation.GT: GreaterThan,
Operation.GE: GreaterThanOrEqual,
Operation.LT: LessThan,
Operation.LE: LessThanOrEqual,
Operation.AND: And,
Operation.OR: Or,
}

if expr.op in operation_map:
return operation_map[expr.op](left, right)
else:
# Arithmetic operations are not supported in filter expressions
raise ValueError(
f"Unsupported binary operation for Iceberg filters: {expr.op}. "
f"Iceberg filters support: EQ, NE, GT, GE, LT, LE, AND, OR, IN, NOT_IN. "
f"Arithmetic operations (ADD, SUB, MUL, DIV) cannot be used in filters."
)

def visit_unary(self, expr: "UnaryExpr") -> "BooleanExpression":
"""Convert a unary operation to an Iceberg expression."""
from pyiceberg.expressions import IsNull, Not, NotNull

operand = self.visit(expr.operand)

operation_map = {
Operation.IS_NULL: IsNull,
Operation.IS_NOT_NULL: NotNull,
Operation.NOT: Not,
}

if expr.op in operation_map:
return operation_map[expr.op](operand)
else:
raise ValueError(
f"Unsupported unary operation for Iceberg: {expr.op}. "
f"Supported operations: IS_NULL, IS_NOT_NULL, NOT"
)

def visit_alias(self, expr) -> "BooleanExpression":
"""Convert an aliased expression (just unwrap the alias)."""
return self.visit(expr.expr)

def visit_udf(self, expr) -> "BooleanExpression":
"""UDF expressions cannot be converted to Iceberg expressions."""
raise TypeError(
"UDF expressions cannot be converted to Iceberg expressions. "
"Iceberg filters must use simple column comparisons and boolean operations."
)

def visit_download(self, expr) -> "BooleanExpression":
"""Download expressions cannot be converted to Iceberg expressions."""
raise TypeError(
"Download expressions cannot be converted to Iceberg expressions."
)

def visit_star(self, expr) -> "BooleanExpression":
"""Star expressions cannot be converted to Iceberg expressions."""
raise TypeError(
"Star expressions cannot be converted to Iceberg filter expressions."
)
12 changes: 12 additions & 0 deletions python/ray/data/_internal/savemode.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,19 @@

@PublicAPI(stability="alpha")
class SaveMode(str, Enum):
"""Enum of possible modes for saving/writing data.
Attributes:
APPEND: Add new data without modifying existing data.
OVERWRITE: Replace all existing data with new data.
IGNORE: Don't write if data already exists.
ERROR: Raise an error if data already exists.
UPSERT: Update existing rows that match on key fields, or insert new rows.
Requires identifier/key fields to be specified.
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, use triple-quote-strings for each item.
This would work better with IDEs - you can hover an item to show its doc.


APPEND = "append"
OVERWRITE = "overwrite"
IGNORE = "ignore"
ERROR = "error"
UPSERT = "upsert"
71 changes: 65 additions & 6 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -4024,46 +4024,105 @@ def write_iceberg(
table_identifier: str,
catalog_kwargs: Optional[Dict[str, Any]] = None,
snapshot_properties: Optional[Dict[str, str]] = None,
mode: "SaveMode" = SaveMode.APPEND,
identifier_fields: Optional[List[str]] = None,
overwrite_filter: Optional["Expr"] = None,
ray_remote_args: Dict[str, Any] = None,
concurrency: Optional[int] = None,
) -> None:
"""Writes the :class:`~ray.data.Dataset` to an Iceberg table.

.. tip::
For more details on PyIceberg, see
- URI: https://py.iceberg.apache.org/
For more details on PyIceberg, see https://py.iceberg.apache.org/

Examples:
.. testcode::
:skipif: True

import ray
import pandas as pd
docs = [{"title": "Iceberg data sink test"} for key in range(4)]
from ray.data import SaveMode
from ray.data.expressions import col

# Basic append (current behavior)
docs = [{"id": i, "title": f"Doc {i}"} for i in range(4)]
ds = ray.data.from_pandas(pd.DataFrame(docs))
ds.write_iceberg(
table_identifier="db_name.table_name",
catalog_kwargs={"name": "default", "type": "sql"}
)

# Upsert mode - update existing rows or insert new ones
updated_docs = [{"id": 2, "title": "Updated Doc 2"}, {"id": 5, "title": "New Doc 5"}]
ds_updates = ray.data.from_pandas(pd.DataFrame(updated_docs))
ds_updates.write_iceberg(
table_identifier="db_name.table_name",
catalog_kwargs={"name": "default", "type": "sql"},
mode=SaveMode.UPSERT,
identifier_fields=["id"]
)

# Schema evolution is automatic - new columns are added automatically
enriched_docs = [{"id": i, "title": f"Doc {i}", "category": "new"} for i in range(3)]
ds_enriched = ray.data.from_pandas(pd.DataFrame(enriched_docs))
ds_enriched.write_iceberg(
table_identifier="db_name.table_name",
catalog_kwargs={"name": "default", "type": "sql"}
)

# Partial overwrite with Ray Data expressions
ds.write_iceberg(
table_identifier="events.user_activity",
catalog_kwargs={"name": "default", "type": "rest"},
mode=SaveMode.OVERWRITE,
overwrite_filter=col("date") >= "2024-10-28"
)

Args:
table_identifier: Fully qualified table identifier (``db_name.table_name``)
catalog_kwargs: Optional arguments to pass to PyIceberg's catalog.load_catalog()
function (e.g., name, type, etc.). For the function definition, see
function (such as name, type). For the function definition, see
`pyiceberg catalog
<https://py.iceberg.apache.org/reference/pyiceberg/catalog/\
#pyiceberg.catalog.load_catalog>`_.
snapshot_properties: custom properties write to snapshot when committing
snapshot_properties: Custom properties to write to snapshot when committing
to an iceberg table.
mode: Write mode using SaveMode enum. Options:

* SaveMode.APPEND (default): Add new data to the table without checking for duplicates.
* SaveMode.UPSERT: Update existing rows that match on identifier_fields, or insert
new rows if they don't exist in the table.
* SaveMode.OVERWRITE: Replace all existing data in the table with new data, or replace
data matching overwrite_filter if specified.

identifier_fields: List of column names to use as unique identifiers for upsert
operations. Required when mode is SaveMode.UPSERT. The system uses these columns
to determine which rows to update versus insert.
overwrite_filter: Optional filter for OVERWRITE mode to perform partial overwrites.
Must be a Ray Data expression from `ray.data.expressions`. Only rows matching
this filter are replaced. If None with OVERWRITE mode, replaces all table data.
Example: `col("date") >= "2024-01-01"` or `(col("region") == "US") & (col("status") == "active")`
ray_remote_args: kwargs passed to :func:`ray.remote` in the write tasks.
concurrency: The maximum number of Ray tasks to run concurrently. Set this
to control number of tasks to run concurrently. This doesn't change the
total number of tasks run. By default, concurrency is dynamically
decided based on the available resources.

Note:
Schema evolution is automatically enabled. New columns in the incoming data
are automatically added to the table schema.

Raises:
ValueError: If mode is SaveMode.UPSERT but identifier_fields isn't provided.
"""

datasink = IcebergDatasink(
table_identifier, catalog_kwargs, snapshot_properties
table_identifier=table_identifier,
catalog_kwargs=catalog_kwargs,
snapshot_properties=snapshot_properties,
mode=mode,
identifier_fields=identifier_fields,
overwrite_filter=overwrite_filter,
)

self.write_datasink(
Expand Down
26 changes: 26 additions & 0 deletions python/ray/data/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Any, Callable, Dict, Generic, List, TypeVar, Union

import pyarrow
from pyiceberg.expressions import BooleanExpression

from ray.data.block import BatchColumn
from ray.data.datatype import DataType
Expand Down Expand Up @@ -242,6 +243,31 @@ def to_pyarrow(self) -> "pyarrow.compute.Expression":
"""
return _PyArrowExpressionVisitor().visit(self)

def to_iceberg(self) -> "BooleanExpression":
"""
Convert a Ray Data expression to a PyIceberg expression.

Returns:
A PyIceberg BooleanExpression that can be used with PyIceberg's filter APIs

Raises:
ValueError: If the expression contains operations not supported by Iceberg
TypeError: If the expression type cannot be converted to Iceberg

Example:
>>> from ray.data.expressions import col
>>> ray_expr = (col("age") >= 18) & (col("status") == "active")
>>> iceberg_expr = ray_expr.to_iceberg()
>>> # Use with PyIceberg
>>> table.overwrite(df=data, overwrite_filter=iceberg_expr)
"""
from ray.data._internal.planner.plan_expression.expression_visitors import (
_IcebergExpressionVisitor,
)

visitor = _IcebergExpressionVisitor()
return visitor.visit(self)

def __repr__(self) -> str:
"""Return a tree-structured string representation of the expression.

Expand Down
Loading