diff --git a/ci/lint/pydoclint-baseline.txt b/ci/lint/pydoclint-baseline.txt index e26015af166b..fa7ba88f5314 100644 --- a/ci/lint/pydoclint-baseline.txt +++ b/ci/lint/pydoclint-baseline.txt @@ -1018,10 +1018,6 @@ python/ray/data/_internal/block_batching/util.py DOC402: Function `finalize_batches` has "yield" statements, but the docstring does not have a "Yields" section DOC404: Function `finalize_batches` yield type(s) in docstring not consistent with the return annotation. Return annotation exists, but docstring "yields" section does not exist or has 0 type(s). -------------------- -python/ray/data/_internal/datasource/iceberg_datasink.py - DOC102: Method `IcebergDatasink.__init__`: Docstring contains more arguments than in function signature. - DOC103: Method `IcebergDatasink.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the docstring but not in the function signature: [to an iceberg table, e.g. {"commit_time": ]. --------------------- python/ray/data/_internal/datasource/lance_datasink.py DOC101: Method `LanceDatasink.__init__`: Docstring contains fewer arguments than in function signature. DOC103: Method `LanceDatasink.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**kwargs: , *args: , max_rows_per_file: int, min_rows_per_file: int, mode: Literal['create', 'append', 'overwrite'], schema: Optional[pa.Schema], storage_options: Optional[Dict[str, Any]], uri: str]. Arguments in the docstring but not in the function signature: [max_rows_per_file : , min_rows_per_file : , mode : , schema : , storage_options : , uri : ]. diff --git a/python/ray/data/__init__.py b/python/ray/data/__init__.py index b0a7a188e053..88a188faf8ac 100644 --- a/python/ray/data/__init__.py +++ b/python/ray/data/__init__.py @@ -22,6 +22,7 @@ FileShuffleConfig, ReadTask, RowBasedFileDatasink, + SaveMode, ) from ray.data.iterator import DataIterator, DatasetIterator from ray.data.preprocessor import Preprocessor @@ -131,6 +132,7 @@ "NodeIdStr", "ReadTask", "RowBasedFileDatasink", + "SaveMode", "Schema", "SinkMode", "TaskPoolStrategy", diff --git a/python/ray/data/_internal/datasource/iceberg_datasink.py b/python/ray/data/_internal/datasource/iceberg_datasink.py index 1e04b12279fe..8fb43c4912d9 100644 --- a/python/ray/data/_internal/datasource/iceberg_datasink.py +++ b/python/ray/data/_internal/datasource/iceberg_datasink.py @@ -2,26 +2,27 @@ Module to write a Ray Dataset into an iceberg table, by using the Ray Datasink API. 
""" import logging -import uuid from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional -from packaging import version - from ray.data._internal.execution.interfaces import TaskContext +from ray.data._internal.savemode import SaveMode from ray.data.block import Block, BlockAccessor from ray.data.datasource.datasink import Datasink, WriteResult from ray.util.annotations import DeveloperAPI if TYPE_CHECKING: + import pyarrow as pa from pyiceberg.catalog import Catalog - from pyiceberg.manifest import DataFile + from pyiceberg.table import Table + + from ray.data.expressions import Expr logger = logging.getLogger(__name__) @DeveloperAPI -class IcebergDatasink(Datasink[List["DataFile"]]): +class IcebergDatasink(Datasink[List["pa.Table"]]): """ Iceberg datasink to write a Ray Dataset into an existing Iceberg table. This module heavily uses PyIceberg to write to iceberg table. All the routines in this class override @@ -34,137 +35,223 @@ def __init__( table_identifier: str, catalog_kwargs: Optional[Dict[str, Any]] = None, snapshot_properties: Optional[Dict[str, str]] = None, + mode: SaveMode = SaveMode.APPEND, + overwrite_filter: Optional["Expr"] = None, + upsert_kwargs: Optional[Dict[str, Any]] = None, + overwrite_kwargs: Optional[Dict[str, Any]] = None, ): """ Initialize the IcebergDatasink Args: - table_identifier: The identifier of the table to read e.g. `default.taxi_dataset` + table_identifier: The identifier of the table to read such as `default.taxi_dataset` catalog_kwargs: Optional arguments to use when setting up the Iceberg catalog - snapshot_properties: custom properties write to snapshot when committing - to an iceberg table, e.g. {"commit_time": "2021-01-01T00:00:00Z"} + snapshot_properties: Custom properties to write to snapshot summary, such as commit metadata + mode: Write mode - APPEND, UPSERT, or OVERWRITE. Defaults to APPEND. + - APPEND: Add new data without checking for duplicates + - UPSERT: Update existing rows or insert new ones based on a join condition + - OVERWRITE: Replace table data (all data or filtered subset) + overwrite_filter: Optional filter for OVERWRITE mode to perform partial overwrites. + Must be a Ray Data expression from `ray.data.expressions`. Only rows matching + this filter are replaced. If None with OVERWRITE mode, replaces all table data. + upsert_kwargs: Optional arguments to pass through to PyIceberg's table.upsert() + method. Supported parameters include join_cols (List[str]), + when_matched_update_all (bool), when_not_matched_insert_all (bool), + case_sensitive (bool), branch (str). See PyIceberg documentation for details. + overwrite_kwargs: Optional arguments to pass through to PyIceberg's table.overwrite() + method. Supported parameters include case_sensitive (bool) and branch (str). + See PyIceberg documentation for details. + + Note: + Schema evolution is automatically enabled. New columns in the incoming data + are automatically added to the table schema. 
""" - from pyiceberg.io import FileIO - from pyiceberg.table import Transaction - from pyiceberg.table.metadata import TableMetadata - self.table_identifier = table_identifier - self._catalog_kwargs = catalog_kwargs if catalog_kwargs is not None else {} - self._snapshot_properties = ( - snapshot_properties if snapshot_properties is not None else {} - ) + self._catalog_kwargs = (catalog_kwargs or {}).copy() + self._snapshot_properties = snapshot_properties or {} + self._mode = mode + self._overwrite_filter = overwrite_filter + self._upsert_kwargs = (upsert_kwargs or {}).copy() + self._overwrite_kwargs = (overwrite_kwargs or {}).copy() + + # Validate kwargs are only set for relevant modes + if self._upsert_kwargs and self._mode != SaveMode.UPSERT: + raise ValueError( + f"upsert_kwargs can only be specified when mode is SaveMode.UPSERT, " + f"but mode is {self._mode}" + ) + if self._overwrite_kwargs and self._mode != SaveMode.OVERWRITE: + raise ValueError( + f"overwrite_kwargs can only be specified when mode is SaveMode.OVERWRITE, " + f"but mode is {self._mode}" + ) if "name" in self._catalog_kwargs: self._catalog_name = self._catalog_kwargs.pop("name") else: self._catalog_name = "default" - self._uuid: str = None - self._io: FileIO = None - self._txn: Transaction = None - self._table_metadata: TableMetadata = None + self._table: "Table" = None - # Since iceberg transaction is not pickle-able, because of the table and catalog properties - # we need to exclude the transaction object during serialization and deserialization during pickle + # Since iceberg table is not pickle-able, we need to exclude it during serialization def __getstate__(self) -> dict: - """Exclude `_txn` during pickling.""" + """Exclude `_table` during pickling.""" state = self.__dict__.copy() - del state["_txn"] + state.pop("_table", None) return state def __setstate__(self, state: dict) -> None: self.__dict__.update(state) - self._txn = None + self._table = None def _get_catalog(self) -> "Catalog": from pyiceberg import catalog return catalog.load_catalog(self._catalog_name, **self._catalog_kwargs) - def on_write_start(self) -> None: - """Prepare for the transaction""" - import pyiceberg - from pyiceberg.table import TableProperties + def _update_schema(self, incoming_schema: "pa.Schema") -> None: + """ + Update the table schema to accommodate incoming data using union-by-name semantics. - if version.parse(pyiceberg.__version__) >= version.parse("0.9.0"): - from pyiceberg.utils.properties import property_as_bool - else: - from pyiceberg.table import PropertyUtil + This automatically handles: + - Adding new columns from the incoming schema + - Type promotion (e.g., int32 -> int64) where compatible + - Preserving existing columns not in the incoming schema - property_as_bool = PropertyUtil.property_as_bool + Args: + incoming_schema: The PyArrow schema from the incoming data + """ + # Use PyIceberg's update_schema API + with self._table.update_schema() as update: + update.union_by_name(incoming_schema) + # Reload table completely after schema evolution catalog = self._get_catalog() - table = catalog.load_table(self.table_identifier) - self._txn = table.transaction() - self._io = self._txn._table.io - self._table_metadata = self._txn.table_metadata - self._uuid = uuid.uuid4() - - if unsupported_partitions := [ - field - for field in self._table_metadata.spec().fields - if not field.transform.supports_pyarrow_transform - ]: - raise ValueError( - f"Not all partition types are supported for writes. 
Following partitions cannot be written using pyarrow: {unsupported_partitions}." - ) + self._table = catalog.load_table(self.table_identifier) - self._manifest_merge_enabled = property_as_bool( - self._table_metadata.properties, - TableProperties.MANIFEST_MERGE_ENABLED, - TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, - ) + def on_write_start(self) -> None: + """Initialize table for writing.""" + catalog = self._get_catalog() + self._table = catalog.load_table(self.table_identifier) - def write( - self, blocks: Iterable[Block], ctx: TaskContext - ) -> WriteResult[List["DataFile"]]: - from pyiceberg.io.pyarrow import ( - _check_pyarrow_schema_compatible, - _dataframe_to_data_files, - ) - from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE - from pyiceberg.utils.config import Config + def _collect_tables_from_blocks(self, blocks: Iterable[Block]) -> List["pa.Table"]: + """Collect PyArrow tables from blocks.""" + collected_tables = [] - data_files_list: WriteResult[List["DataFile"]] = [] for block in blocks: pa_table = BlockAccessor.for_block(block).to_arrow() - downcast_ns_timestamp_to_us = ( - Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False - ) - _check_pyarrow_schema_compatible( - self._table_metadata.schema(), - provided_schema=pa_table.schema, - downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, - ) + if pa_table.num_rows > 0: + collected_tables.append(pa_table) - if pa_table.shape[0] <= 0: - continue + return collected_tables - task_uuid = uuid.uuid4() - data_files = _dataframe_to_data_files( - self._table_metadata, pa_table, self._io, task_uuid - ) - data_files_list.extend(data_files) + def write(self, blocks: Iterable[Block], ctx: TaskContext) -> List["pa.Table"]: + """Collect blocks as PyArrow tables for all write modes.""" + return self._collect_tables_from_blocks(blocks) + + def _collect_and_concat_tables( + self, write_result: WriteResult[List["pa.Table"]] + ) -> Optional["pa.Table"]: + """Collect and concatenate all PyArrow tables from write results.""" + import pyarrow as pa + + all_tables = [] + for tables_batch in write_result.write_returns: + all_tables.extend(tables_batch) - return data_files_list + if not all_tables: + logger.warning("No data to write") + return None - def on_write_complete(self, write_result: WriteResult[List["DataFile"]]): - update_snapshot = self._txn.update_snapshot( - snapshot_properties=self._snapshot_properties + return pa.concat_tables(all_tables) + + def _complete_append(self, combined_table: "pa.Table") -> None: + """Complete APPEND mode write using PyIceberg's append API.""" + self._table.append( + df=combined_table, + snapshot_properties=self._snapshot_properties, ) - append_method = ( - update_snapshot.merge_append - if self._manifest_merge_enabled - else update_snapshot.fast_append + logger.info( + f"Appended {combined_table.num_rows} rows to {self.table_identifier}" ) - with append_method() as append_files: - append_files.commit_uuid = self._uuid - for data_files in write_result.write_returns: - for data_file in data_files: - append_files.append_data_file(data_file) + def _complete_upsert(self, combined_table: "pa.Table") -> None: + """Complete UPSERT mode write using PyIceberg's upsert API.""" + self._table.upsert(df=combined_table, **self._upsert_kwargs) - self._txn.commit_transaction() + join_cols = self._upsert_kwargs.get("join_cols") + if join_cols: + logger.info( + f"Upserted {combined_table.num_rows} rows to {self.table_identifier} " + f"using join columns: {join_cols}" + ) + else: + logger.info( + 
f"Upserted {combined_table.num_rows} rows to {self.table_identifier} " + f"using table-defined identifier-field-ids" + ) + + def _complete_overwrite(self, combined_table: "pa.Table") -> None: + """Complete OVERWRITE mode write using PyIceberg's overwrite API.""" + # Warn if user passed overwrite_filter via overwrite_kwargs + if "overwrite_filter" in self._overwrite_kwargs: + self._overwrite_kwargs.pop("overwrite_filter") + logger.warning( + "Use Ray Data's Expressions for overwrite filter instead of passing " + "it via PyIceberg's overwrite_filter parameter" + ) + + if self._overwrite_filter: + # Partial overwrite with filter + from ray.data._internal.datasource.iceberg_datasource import ( + _IcebergExpressionVisitor, + ) + + iceberg_filter = _IcebergExpressionVisitor().visit(self._overwrite_filter) + self._table.overwrite( + df=combined_table, + overwrite_filter=iceberg_filter, + snapshot_properties=self._snapshot_properties, + **self._overwrite_kwargs, + ) + logger.info( + f"Overwrote {combined_table.num_rows} rows in {self.table_identifier} " + f"matching filter: {self._overwrite_filter}" + ) + else: + # Full table overwrite + self._table.overwrite( + df=combined_table, + snapshot_properties=self._snapshot_properties, + **self._overwrite_kwargs, + ) + logger.info( + f"Overwrote entire table {self.table_identifier} " + f"with {combined_table.num_rows} rows" + ) + + def on_write_complete(self, write_result: WriteResult[List["pa.Table"]]) -> None: + """Complete the write operation based on the configured mode.""" + # Collect and concatenate all PyArrow tables + combined_table = self._collect_and_concat_tables(write_result) + if combined_table is None: + return + + # Apply schema evolution for all modes (PyIceberg doesn't handle this automatically) + self._update_schema(combined_table.schema) + + # Execute the appropriate write operation + if self._mode == SaveMode.APPEND: + self._complete_append(combined_table) + elif self._mode == SaveMode.UPSERT: + self._complete_upsert(combined_table) + elif self._mode == SaveMode.OVERWRITE: + self._complete_overwrite(combined_table) + else: + raise ValueError( + f"Unsupported write mode: {self._mode}. " + f"Supported modes are: APPEND, UPSERT, OVERWRITE" + ) diff --git a/python/ray/data/_internal/savemode.py b/python/ray/data/_internal/savemode.py index 8317904e420f..355b8b9fa8b2 100644 --- a/python/ray/data/_internal/savemode.py +++ b/python/ray/data/_internal/savemode.py @@ -5,7 +5,21 @@ @PublicAPI(stability="alpha") class SaveMode(str, Enum): + """Enum of possible modes for saving/writing data.""" + APPEND = "append" + """Add new data without modifying existing data.""" + OVERWRITE = "overwrite" + """Replace all existing data with new data.""" + IGNORE = "ignore" + """Don't write if data already exists.""" + ERROR = "error" + """Raise an error if data already exists.""" + + UPSERT = "upsert" + """Update existing rows that match on key fields, or insert new rows. + Requires identifier/key fields to be specified. 
+ """ diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index f6b630c34d09..5a6a5156251a 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4034,14 +4034,17 @@ def write_iceberg( table_identifier: str, catalog_kwargs: Optional[Dict[str, Any]] = None, snapshot_properties: Optional[Dict[str, str]] = None, + mode: "SaveMode" = SaveMode.APPEND, + overwrite_filter: Optional["Expr"] = None, + upsert_kwargs: Optional[Dict[str, Any]] = None, + overwrite_kwargs: Optional[Dict[str, Any]] = None, ray_remote_args: Dict[str, Any] = None, concurrency: Optional[int] = None, ) -> None: """Writes the :class:`~ray.data.Dataset` to an Iceberg table. .. tip:: - For more details on PyIceberg, see - - URI: https://py.iceberg.apache.org/ + For more details on PyIceberg, see https://py.iceberg.apache.org/ Examples: .. testcode:: @@ -4049,31 +4052,92 @@ def write_iceberg( import ray import pandas as pd - docs = [{"title": "Iceberg data sink test"} for key in range(4)] + from ray.data import SaveMode + from ray.data.expressions import col + + # Basic append (current behavior) + docs = [{"id": i, "title": f"Doc {i}"} for i in range(4)] ds = ray.data.from_pandas(pd.DataFrame(docs)) ds.write_iceberg( table_identifier="db_name.table_name", catalog_kwargs={"name": "default", "type": "sql"} ) + # Upsert mode - update existing rows or insert new ones + updated_docs = [{"id": 2, "title": "Updated Doc 2"}, {"id": 5, "title": "New Doc 5"}] + ds_updates = ray.data.from_pandas(pd.DataFrame(updated_docs)) + ds_updates.write_iceberg( + table_identifier="db_name.table_name", + catalog_kwargs={"name": "default", "type": "sql"}, + mode=SaveMode.UPSERT, + upsert_kwargs={"join_cols": ["id"]}, + ) + + # Schema evolution is automatic - new columns are added automatically + enriched_docs = [{"id": i, "title": f"Doc {i}", "category": "new"} for i in range(3)] + ds_enriched = ray.data.from_pandas(pd.DataFrame(enriched_docs)) + ds_enriched.write_iceberg( + table_identifier="db_name.table_name", + catalog_kwargs={"name": "default", "type": "sql"} + ) + + # Partial overwrite with Ray Data expressions + ds.write_iceberg( + table_identifier="events.user_activity", + catalog_kwargs={"name": "default", "type": "rest"}, + mode=SaveMode.OVERWRITE, + overwrite_filter=col("date") >= "2024-10-28" + ) + Args: table_identifier: Fully qualified table identifier (``db_name.table_name``) catalog_kwargs: Optional arguments to pass to PyIceberg's catalog.load_catalog() - function (e.g., name, type, etc.). For the function definition, see + function (such as name, type). For the function definition, see `pyiceberg catalog `_. - snapshot_properties: custom properties write to snapshot when committing + snapshot_properties: Custom properties to write to snapshot when committing to an iceberg table. + mode: Write mode using SaveMode enum. Options: + + * SaveMode.APPEND (default): Add new data to the table without checking for duplicates. + * SaveMode.UPSERT: Update existing rows that match on the join condition (``join_cols`` in ``upsert_kwargs``), + or insert new rows if they don't exist in the table. + * SaveMode.OVERWRITE: Replace all existing data in the table with new data, or replace + data matching overwrite_filter if specified. + + overwrite_filter: Optional filter for OVERWRITE mode to perform partial overwrites. + Must be a Ray Data expression from `ray.data.expressions`. Only rows matching + this filter are replaced. If None with OVERWRITE mode, replaces all table data. 
+ Example: `col("date") >= "2024-01-01"` or `(col("region") == "US") & (col("status") == "active")` + upsert_kwargs: Optional arguments to pass through to PyIceberg's table.upsert() method. + Supported parameters: join_cols (List[str]), when_matched_update_all (bool), when_not_matched_insert_all (bool), + case_sensitive (bool), branch (str). See PyIceberg documentation for details. + overwrite_kwargs: Optional arguments to pass through to PyIceberg's table.overwrite() method. + Supported parameters: case_sensitive (bool), branch (str). See PyIceberg documentation + for details. ray_remote_args: kwargs passed to :func:`ray.remote` in the write tasks. concurrency: The maximum number of Ray tasks to run concurrently. Set this to control number of tasks to run concurrently. This doesn't change the total number of tasks run. By default, concurrency is dynamically decided based on the available resources. + + Note: + Schema evolution is automatically enabled. New columns in the incoming data + are automatically added to the table schema. + + Raises: + ValueError: If `mode` is `SaveMode.UPSERT`, `join_cols` is not provided in `upsert_kwargs`, and the table has no identifier fields. """ datasink = IcebergDatasink( - table_identifier, catalog_kwargs, snapshot_properties + table_identifier=table_identifier, + catalog_kwargs=catalog_kwargs, + snapshot_properties=snapshot_properties, + mode=mode, + overwrite_filter=overwrite_filter, + upsert_kwargs=upsert_kwargs, + overwrite_kwargs=overwrite_kwargs, ) self.write_datasink( diff --git a/python/ray/data/tests/test_iceberg.py b/python/ray/data/tests/test_iceberg.py index bd0be727653a..b7d203d017d8 100644 --- a/python/ray/data/tests/test_iceberg.py +++ b/python/ray/data/tests/test_iceberg.py @@ -1,5 +1,6 @@ import os import random +from typing import Any, Dict, Generator, List, Optional, Tuple, Type, Union import numpy as np import pandas as pd @@ -12,8 +13,10 @@ schema as pyi_schema, types as pyi_types, ) +from pyiceberg.catalog import Catalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.table import Table from pyiceberg.transforms import IdentityTransform import ray @@ -23,6 +26,7 @@ from ray.data._internal.logical.interfaces import LogicalPlan from ray.data._internal.logical.operators.map_operator import Filter, Project from ray.data._internal.logical.optimizers import LogicalOptimizer +from ray.data._internal.savemode import SaveMode from ray.data._internal.util import rows_same from ray.data.expressions import col @@ -218,9 +222,8 @@ def test_read_basic(): ) table: pa.Table = pa.concat_tables((ray.get(ref) for ref in ray_ds.to_arrow_refs())) - # string -> large_string because pyiceberg by default chooses large_string expected_schema = pa.schema( - [pa.field("col_a", pa.int32()), pa.field("col_b", pa.large_string())] + [pa.field("col_a", pa.int32()), pa.field("col_b", pa.string())] ) assert table.schema.equals(expected_schema) @@ -744,6 +747,611 @@ def test_predicate_pushdown_complex_expression(): assert rows_same(result, expected_table) +# Helper functions and fixtures for write mode tests +@pytest.fixture +def clean_table() -> Generator[Tuple[Catalog, Table], None, None]: + """Pytest fixture to get a clean Iceberg table by deleting all data.""" + sql_catalog = pyi_catalog.load_catalog(**_CATALOG_KWARGS) + table = sql_catalog.load_table(f"{_DB_NAME}.{_TABLE_NAME}") + table.delete() + yield sql_catalog, table + + +def _create_typed_dataframe(data_dict: Dict[str, 
List[Any]]) -> pd.DataFrame: + """Create a pandas DataFrame with proper int32 dtypes for col_a and col_c.""" + df = pd.DataFrame(data_dict) + if "col_a" in df.columns: + df["col_a"] = df["col_a"].astype(np.int32) + if "col_c" in df.columns: + df["col_c"] = df["col_c"].astype(np.int32) + return df + + +def _write_to_iceberg( + df: pd.DataFrame, mode: Optional[SaveMode] = None, **kwargs: Any +) -> None: + """Write a DataFrame to the test Iceberg table.""" + ds = ray.data.from_pandas(df) + write_kwargs: Dict[str, Any] = { + "table_identifier": f"{_DB_NAME}.{_TABLE_NAME}", + "catalog_kwargs": _CATALOG_KWARGS.copy(), + } + if mode is not None: + write_kwargs["mode"] = mode + write_kwargs.update(kwargs) + ds.write_iceberg(**write_kwargs) + + +def _read_from_iceberg(sort_by: Optional[Union[str, List[str]]] = None) -> pd.DataFrame: + """Read data from the test Iceberg table and optionally sort.""" + ds = ray.data.read_iceberg( + table_identifier=f"{_DB_NAME}.{_TABLE_NAME}", + catalog_kwargs=_CATALOG_KWARGS.copy(), + ) + result_df = ds.to_pandas() + if sort_by: + result_df = result_df.sort_values(sort_by).reset_index(drop=True) + return result_df + + +def _verify_schema(expected_fields: Dict[str, Type[pyi_types.IcebergType]]) -> None: + """ + Verify the Iceberg table schema matches expected fields. + + Args: + expected_fields: Dict mapping field names to PyIceberg type classes + e.g., {"col_a": pyi_types.IntegerType, "col_b": pyi_types.StringType} + """ + sql_catalog = pyi_catalog.load_catalog(**_CATALOG_KWARGS) + table = sql_catalog.load_table(f"{_DB_NAME}.{_TABLE_NAME}") + schema = {field.name: field.field_type for field in table.schema().fields} + + assert len(schema) == len( + expected_fields + ), f"Expected {len(expected_fields)} fields, got {len(schema)}" + + for field_name, expected_type in expected_fields.items(): + assert field_name in schema, f"Field {field_name} not found in schema" + assert isinstance(schema[field_name], expected_type), ( + f"Field {field_name} expected type {expected_type}, " + f"got {type(schema[field_name])}" + ) + + +@pytest.mark.skipif( + get_pyarrow_version() < parse_version("14.0.0"), + reason="PyIceberg 0.7.0 fails on pyarrow <= 14.0.0", +) +class TestBasicWriteModes: + """Test basic write operations for APPEND, UPSERT, and OVERWRITE modes.""" + + def test_append_basic(self, clean_table): + """Test basic APPEND mode - add new rows without schema changes.""" + initial_data = _create_typed_dataframe( + {"col_a": [1, 2], "col_b": ["row_1", "row_2"], "col_c": [1, 2]} + ) + _write_to_iceberg(initial_data) + + append_data = _create_typed_dataframe( + {"col_a": [3, 4], "col_b": ["row_3", "row_4"], "col_c": [3, 4]} + ) + _write_to_iceberg(append_data, mode=SaveMode.APPEND) + + result_df = _read_from_iceberg(sort_by="col_a") + expected = _create_typed_dataframe( + { + "col_a": [1, 2, 3, 4], + "col_b": ["row_1", "row_2", "row_3", "row_4"], + "col_c": [1, 2, 3, 4], + } + ) + assert rows_same(result_df, expected) + + def test_upsert_basic(self, clean_table): + """Test basic upsert - update existing rows and insert new ones.""" + initial_data = _create_typed_dataframe( + { + "col_a": [1, 2, 3], + "col_b": ["initial_1", "initial_2", "initial_3"], + "col_c": [1, 2, 3], + } + ) + _write_to_iceberg(initial_data) + + upsert_data = _create_typed_dataframe( + { + "col_a": [2, 3, 4], + "col_b": ["updated_2", "updated_3", "new_4"], + "col_c": [2, 3, 4], + } + ) + _write_to_iceberg( + upsert_data, mode=SaveMode.UPSERT, upsert_kwargs={"join_cols": ["col_a"]} + ) + + result_df = 
_read_from_iceberg(sort_by="col_a") + expected = _create_typed_dataframe( + { + "col_a": [1, 2, 3, 4], + "col_b": ["initial_1", "updated_2", "updated_3", "new_4"], + "col_c": [1, 2, 3, 4], + } + ) + assert rows_same(result_df, expected) + + def test_upsert_composite_key(self, clean_table): + """Test upsert with composite key (multiple identifier fields).""" + initial_data = _create_typed_dataframe( + { + "col_a": [1, 1, 2, 2], + "col_b": ["A", "B", "A", "B"], + "col_c": [10, 20, 30, 40], + } + ) + _write_to_iceberg(initial_data) + + # Update (1, "B") and (2, "A"), insert (3, "A") + upsert_data = _create_typed_dataframe( + {"col_a": [1, 2, 3], "col_b": ["B", "A", "A"], "col_c": [999, 888, 777]} + ) + _write_to_iceberg( + upsert_data, + mode=SaveMode.UPSERT, + upsert_kwargs={"join_cols": ["col_a", "col_b"]}, + ) + + result_df = _read_from_iceberg(sort_by=["col_a", "col_b"]) + expected = _create_typed_dataframe( + { + "col_a": [1, 1, 2, 2, 3], + "col_b": ["A", "B", "A", "B", "A"], + "col_c": [10, 999, 888, 40, 777], + } + ) + assert rows_same(result_df, expected) + + def test_overwrite_full_table(self, clean_table): + """Test full table overwrite - replace all data.""" + initial_data = _create_typed_dataframe( + { + "col_a": [1, 2, 3, 4, 5], + "col_b": ["old_1", "old_2", "old_3", "old_4", "old_5"], + "col_c": [1, 2, 3, 4, 5], + } + ) + _write_to_iceberg(initial_data) + + new_data = _create_typed_dataframe( + { + "col_a": [10, 20, 30], + "col_b": ["new_10", "new_20", "new_30"], + "col_c": [100, 200, 300], + } + ) + _write_to_iceberg(new_data, mode=SaveMode.OVERWRITE) + + result_df = _read_from_iceberg(sort_by="col_a") + expected = _create_typed_dataframe( + { + "col_a": [10, 20, 30], + "col_b": ["new_10", "new_20", "new_30"], + "col_c": [100, 200, 300], + } + ) + assert rows_same(result_df, expected) + + def test_overwrite_with_filter(self, clean_table): + """Test partial overwrite using filter expression.""" + initial_data = _create_typed_dataframe( + { + "col_a": [1, 2, 3, 4, 5], + "col_b": ["data_1", "data_2", "data_3", "data_4", "data_5"], + "col_c": [1, 1, 2, 2, 3], + } + ) + _write_to_iceberg(initial_data) + + # Replace only rows where col_c == 2 + overwrite_data = _create_typed_dataframe( + { + "col_a": [10, 20], + "col_b": ["replaced_10", "replaced_20"], + "col_c": [2, 2], + } + ) + _write_to_iceberg( + overwrite_data, mode=SaveMode.OVERWRITE, overwrite_filter=col("col_c") == 2 + ) + + result_df = _read_from_iceberg(sort_by="col_a") + expected = _create_typed_dataframe( + { + "col_a": [1, 2, 5, 10, 20], + "col_b": ["data_1", "data_2", "data_5", "replaced_10", "replaced_20"], + "col_c": [1, 1, 3, 2, 2], + } + ) + assert rows_same(result_df, expected) + + +@pytest.mark.skipif( + get_pyarrow_version() < parse_version("14.0.0"), + reason="PyIceberg 0.7.0 fails on pyarrow <= 14.0.0", +) +class TestSchemaEvolution: + """Test schema evolution across different write modes.""" + + @pytest.mark.parametrize("mode", [SaveMode.APPEND, SaveMode.OVERWRITE]) + def test_schema_evolution_by_mode(self, clean_table, mode): + """Test adding new columns works for APPEND and OVERWRITE.""" + initial_data = _create_typed_dataframe( + {"col_a": [1, 2], "col_b": ["row_1", "row_2"], "col_c": [1, 2]} + ) + _write_to_iceberg(initial_data) + + new_data = _create_typed_dataframe( + { + "col_a": [3, 4], + "col_b": ["row_3", "row_4"], + "col_c": [3, 4], + "col_d": ["extra_3", "extra_4"], + } + ) + _write_to_iceberg(new_data, mode=mode) + + _verify_schema( + { + "col_a": pyi_types.IntegerType, + "col_b": 
pyi_types.StringType, + "col_c": pyi_types.IntegerType, + "col_d": pyi_types.StringType, + } + ) + + result_df = _read_from_iceberg(sort_by="col_a") + if mode == SaveMode.APPEND: + expected = _create_typed_dataframe( + { + "col_a": [1, 2, 3, 4], + "col_b": ["row_1", "row_2", "row_3", "row_4"], + "col_c": [1, 2, 3, 4], + "col_d": [None, None, "extra_3", "extra_4"], + } + ) + else: # OVERWRITE + expected = _create_typed_dataframe( + { + "col_a": [3, 4], + "col_b": ["row_3", "row_4"], + "col_c": [3, 4], + "col_d": ["extra_3", "extra_4"], + } + ) + assert rows_same(result_df, expected) + + def test_multiple_schema_evolutions(self, clean_table): + """Test multiple sequential schema evolutions.""" + initial_data = _create_typed_dataframe( + {"col_a": [1], "col_b": ["row_1"], "col_c": [10]} + ) + _write_to_iceberg(initial_data) + + # First evolution: add col_d + data_with_d = _create_typed_dataframe( + {"col_a": [2], "col_b": ["row_2"], "col_c": [20], "col_d": ["extra_2"]} + ) + _write_to_iceberg(data_with_d, mode=SaveMode.APPEND) + + _verify_schema( + { + "col_a": pyi_types.IntegerType, + "col_b": pyi_types.StringType, + "col_c": pyi_types.IntegerType, + "col_d": pyi_types.StringType, + } + ) + + # Second evolution: add col_e + data_with_e = _create_typed_dataframe( + { + "col_a": [3], + "col_b": ["row_3"], + "col_c": [30], + "col_d": ["extra_3"], + "col_e": ["bonus_3"], + } + ) + _write_to_iceberg(data_with_e, mode=SaveMode.APPEND) + + _verify_schema( + { + "col_a": pyi_types.IntegerType, + "col_b": pyi_types.StringType, + "col_c": pyi_types.IntegerType, + "col_d": pyi_types.StringType, + "col_e": pyi_types.StringType, + } + ) + + result_df = _read_from_iceberg(sort_by="col_a") + expected = _create_typed_dataframe( + { + "col_a": [1, 2, 3], + "col_b": ["row_1", "row_2", "row_3"], + "col_c": [10, 20, 30], + "col_d": [None, "extra_2", "extra_3"], + "col_e": [None, None, "bonus_3"], + } + ) + assert rows_same(result_df, expected) + + def test_column_order_independence(self, clean_table): + """Test writing data with columns in different order works.""" + initial_data = _create_typed_dataframe( + {"col_a": [1, 2], "col_b": ["row_1", "row_2"], "col_c": [1, 2]} + ) + _write_to_iceberg(initial_data) + + # Append data with columns in different order + reordered_data = _create_typed_dataframe( + {"col_c": [3, 4], "col_a": [3, 4], "col_b": ["row_3", "row_4"]} + ) + _write_to_iceberg(reordered_data, mode=SaveMode.APPEND) + + result_df = _read_from_iceberg(sort_by="col_a") + expected = _create_typed_dataframe( + { + "col_a": [1, 2, 3, 4], + "col_b": ["row_1", "row_2", "row_3", "row_4"], + "col_c": [1, 2, 3, 4], + } + ) + assert rows_same(result_df, expected) + + +@pytest.mark.skipif( + get_pyarrow_version() < parse_version("14.0.0"), + reason="PyIceberg 0.7.0 fails on pyarrow <= 14.0.0", +) +class TestUpsertScenarios: + """Test various upsert matching scenarios.""" + + @pytest.mark.parametrize( + "upsert_keys,upsert_col_b,upsert_col_c,expected_data", + [ + # No matching rows (behaves like append) + ( + [4, 5, 6], + ["new_4", "new_5", "new_6"], + [40, 50, 60], + { + "col_a": [1, 2, 3, 4, 5, 6], + "col_b": ["row_1", "row_2", "row_3", "new_4", "new_5", "new_6"], + "col_c": [10, 20, 30, 40, 50, 60], + }, + ), + # All rows match (behaves like update) + ( + [1, 2, 3], + ["updated_1", "updated_2", "updated_3"], + [100, 200, 300], + { + "col_a": [1, 2, 3], + "col_b": ["updated_1", "updated_2", "updated_3"], + "col_c": [100, 200, 300], + }, + ), + # Partial match (mixed update and insert) + ( + [2, 3, 4], + 
["updated_2", "updated_3", "new_4"], + [200, 300, 40], + { + "col_a": [1, 2, 3, 4], + "col_b": ["row_1", "updated_2", "updated_3", "new_4"], + "col_c": [10, 200, 300, 40], + }, + ), + ], + ) + def test_upsert_matching_scenarios( + self, clean_table, upsert_keys, upsert_col_b, upsert_col_c, expected_data + ): + """Test upsert with different row matching patterns.""" + initial_data = _create_typed_dataframe( + { + "col_a": [1, 2, 3], + "col_b": ["row_1", "row_2", "row_3"], + "col_c": [10, 20, 30], + } + ) + _write_to_iceberg(initial_data) + + upsert_data = _create_typed_dataframe( + {"col_a": upsert_keys, "col_b": upsert_col_b, "col_c": upsert_col_c} + ) + _write_to_iceberg( + upsert_data, mode=SaveMode.UPSERT, upsert_kwargs={"join_cols": ["col_a"]} + ) + + result_df = _read_from_iceberg(sort_by="col_a") + expected = _create_typed_dataframe(expected_data) + assert rows_same(result_df, expected) + + +@pytest.mark.skipif( + get_pyarrow_version() < parse_version("14.0.0"), + reason="PyIceberg 0.7.0 fails on pyarrow <= 14.0.0", +) +class TestOverwriteScenarios: + """Test various overwrite filter scenarios.""" + + @pytest.mark.parametrize( + "filter_expr,overwrite_col_c,expected_data", + [ + # Filter matches nothing (behaves like append) + ( + col("col_c") == 999, + [999, 999], + { + "col_a": [1, 2, 3, 4, 5], + "col_b": ["row_1", "row_2", "row_3", "row_4", "row_5"], + "col_c": [10, 20, 30, 999, 999], + }, + ), + # Filter matches some rows + ( + col("col_c") >= 20, + [200, 300], + { + "col_a": [1, 4, 5], + "col_b": ["row_1", "row_4", "row_5"], + "col_c": [10, 200, 300], + }, + ), + # Filter matches all rows (full overwrite) + ( + col("col_c") < 100, + [40, 50], + { + "col_a": [4, 5], + "col_b": ["row_4", "row_5"], + "col_c": [40, 50], + }, + ), + ], + ) + def test_overwrite_filter_scenarios( + self, clean_table, filter_expr, overwrite_col_c, expected_data + ): + """Test partial overwrite with different filter matching patterns.""" + initial_data = _create_typed_dataframe( + { + "col_a": [1, 2, 3], + "col_b": ["row_1", "row_2", "row_3"], + "col_c": [10, 20, 30], + } + ) + _write_to_iceberg(initial_data) + + overwrite_data = _create_typed_dataframe( + {"col_a": [4, 5], "col_b": ["row_4", "row_5"], "col_c": overwrite_col_c} + ) + _write_to_iceberg( + overwrite_data, mode=SaveMode.OVERWRITE, overwrite_filter=filter_expr + ) + + result_df = _read_from_iceberg(sort_by="col_a") + expected = _create_typed_dataframe(expected_data) + assert rows_same(result_df, expected) + + +@pytest.mark.skipif( + get_pyarrow_version() < parse_version("14.0.0"), + reason="PyIceberg 0.7.0 fails on pyarrow <= 14.0.0", +) +class TestEdgeCases: + """Test edge cases and special scenarios.""" + + @pytest.mark.parametrize( + "mode", [SaveMode.APPEND, SaveMode.UPSERT, SaveMode.OVERWRITE] + ) + def test_write_empty_dataset(self, clean_table, mode): + """Test writing empty dataset doesn't fail or modify table.""" + initial_data = _create_typed_dataframe( + {"col_a": [1, 2], "col_b": ["row_1", "row_2"], "col_c": [1, 2]} + ) + _write_to_iceberg(initial_data) + + empty_data = _create_typed_dataframe({"col_a": [], "col_b": [], "col_c": []}) + + write_kwargs = {} + if mode == SaveMode.UPSERT: + write_kwargs["upsert_kwargs"] = {"join_cols": ["col_a"]} + elif mode == SaveMode.OVERWRITE: + write_kwargs["overwrite_filter"] = col("col_c") == 999 + + _write_to_iceberg(empty_data, mode=mode, **write_kwargs) + + result_df = _read_from_iceberg(sort_by="col_a") + assert rows_same(result_df, initial_data) + + def test_overwrite_empty_table(self, 
clean_table): + """Test overwriting an empty table.""" + data = _create_typed_dataframe( + { + "col_a": [1, 2, 3], + "col_b": ["row_1", "row_2", "row_3"], + "col_c": [10, 20, 30], + } + ) + _write_to_iceberg(data, mode=SaveMode.OVERWRITE) + + result_df = _read_from_iceberg(sort_by="col_a") + assert rows_same(result_df, data) + + @pytest.mark.parametrize("mode", [SaveMode.APPEND, SaveMode.OVERWRITE]) + def test_snapshot_properties(self, clean_table, mode): + """Test snapshot_properties are passed through for APPEND and OVERWRITE.""" + # Note: UPSERT doesn't support snapshot_properties in PyIceberg + data = _create_typed_dataframe( + { + "col_a": [1, 2, 3], + "col_b": ["row_1", "row_2", "row_3"], + "col_c": [10, 20, 30], + } + ) + + snapshot_props = {"test_property": "test_value", "author": "ray_data_test"} + + ds = ray.data.from_pandas(data) + ds.write_iceberg( + table_identifier=f"{_DB_NAME}.{_TABLE_NAME}", + catalog_kwargs=_CATALOG_KWARGS.copy(), + mode=mode, + snapshot_properties=snapshot_props, + ) + + sql_catalog = pyi_catalog.load_catalog(**_CATALOG_KWARGS) + table = sql_catalog.load_table(f"{_DB_NAME}.{_TABLE_NAME}") + latest_snapshot = table.current_snapshot() + + assert latest_snapshot is not None + assert latest_snapshot.summary.get("test_property") == "test_value" + assert latest_snapshot.summary.get("author") == "ray_data_test" + + def test_mixed_mode_operations(self, clean_table): + """Test mixing different write modes in sequence.""" + # Start with APPEND + data1 = _create_typed_dataframe( + {"col_a": [1, 2], "col_b": ["a", "b"], "col_c": [10, 20]} + ) + _write_to_iceberg(data1, mode=SaveMode.APPEND) + + # Then UPSERT (update row 2, add row 3) + data2 = _create_typed_dataframe( + {"col_a": [2, 3], "col_b": ["updated_b", "c"], "col_c": [200, 30]} + ) + _write_to_iceberg( + data2, mode=SaveMode.UPSERT, upsert_kwargs={"join_cols": ["col_a"]} + ) + + # Then OVERWRITE with filter (deletes rows 2 and 3, adds rows 4 and 5) + data3 = _create_typed_dataframe( + {"col_a": [4, 5], "col_b": ["d", "e"], "col_c": [40, 50]} + ) + _write_to_iceberg( + data3, mode=SaveMode.OVERWRITE, overwrite_filter=col("col_c") >= 30 + ) + + # Verify final state: rows 2 and 3 deleted, rows 4 and 5 added + result_df = _read_from_iceberg(sort_by="col_a") + expected = _create_typed_dataframe( + {"col_a": [1, 4, 5], "col_b": ["a", "d", "e"], "col_c": [10, 40, 50]} + ) + assert rows_same(result_df, expected) + + if __name__ == "__main__": import sys diff --git a/python/requirements/ml/data-test-requirements.txt b/python/requirements/ml/data-test-requirements.txt index 14d2e9c2dfe9..d7a37d221e40 100644 --- a/python/requirements/ml/data-test-requirements.txt +++ b/python/requirements/ml/data-test-requirements.txt @@ -19,7 +19,7 @@ deltalake==0.9.0 pytest-mock decord snowflake-connector-python>=3.15.0 -pyiceberg[sql-sqlite]==0.9.0 +pyiceberg[sql-sqlite]==0.10.0 clickhouse-connect pybase64 hudi==0.4.0 diff --git a/python/requirements_compiled.txt b/python/requirements_compiled.txt index 42e5bcf044dc..b029bd508982 100644 --- a/python/requirements_compiled.txt +++ b/python/requirements_compiled.txt @@ -1684,7 +1684,7 @@ pygments==2.18.0 # nbconvert # rich # sphinx -pyiceberg==0.9.0 +pyiceberg==0.10.0 # via -r python/requirements/ml/data-test-requirements.txt pyjwt==2.8.0 # via @@ -1734,6 +1734,8 @@ pyro-ppl==1.9.1 # via botorch pyro4==4.82 # via hpbandster +pyroaring==1.0.3 + # via pyiceberg pysocks==1.7.1 # via requests pyspark==3.4.1
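
Usage sketch: how the new write modes combine end to end, mirroring the examples and tests above. The SQLite catalog URI, warehouse path, and table identifier are placeholders, and the table is assumed to already exist in the catalog.

    import pandas as pd

    import ray
    from ray.data import SaveMode
    from ray.data.expressions import col

    catalog_kwargs = {
        "name": "default",
        "type": "sql",
        "uri": "sqlite:////tmp/warehouse/catalog.db",  # placeholder
        "warehouse": "file:///tmp/warehouse",          # placeholder
    }
    table = "db_name.table_name"  # assumed to already exist

    # APPEND: add new rows without touching existing data.
    docs = pd.DataFrame(
        {"id": [1, 2], "title": ["Doc 1", "Doc 2"], "date": ["2024-10-27", "2024-10-28"]}
    )
    ray.data.from_pandas(docs).write_iceberg(
        table_identifier=table,
        catalog_kwargs=catalog_kwargs,
        mode=SaveMode.APPEND,
    )

    # UPSERT on "id": the row with id=2 is updated, id=3 is inserted.
    updates = pd.DataFrame(
        {"id": [2, 3], "title": ["Doc 2 v2", "Doc 3"], "date": ["2024-10-28", "2024-10-29"]}
    )
    ray.data.from_pandas(updates).write_iceberg(
        table_identifier=table,
        catalog_kwargs=catalog_kwargs,
        mode=SaveMode.UPSERT,
        upsert_kwargs={"join_cols": ["id"]},
    )

    # OVERWRITE with a filter: only rows on or after 2024-10-28 are replaced.
    fresh = pd.DataFrame({"id": [10], "title": ["Reload"], "date": ["2024-10-28"]})
    ray.data.from_pandas(fresh).write_iceberg(
        table_identifier=table,
        catalog_kwargs=catalog_kwargs,
        mode=SaveMode.OVERWRITE,
        overwrite_filter=col("date") >= "2024-10-28",
    )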