diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py
index a064ae3bdfdf..e7903e221cae 100644
--- a/python/pyiceberg/io/pyarrow.py
+++ b/python/pyiceberg/io/pyarrow.py
@@ -31,6 +31,7 @@
     TYPE_CHECKING,
     Any,
     Callable,
+    Dict,
     Iterable,
     List,
     Optional,
@@ -72,6 +73,7 @@
     OutputFile,
     OutputStream,
 )
+from pyiceberg.manifest import DataFileContent
 from pyiceberg.schema import (
     PartnerAccessor,
     Schema,
@@ -391,7 +393,7 @@ def visit_timestamp(self, _: TimestampType) -> pa.DataType:
         return pa.timestamp(unit="us")
 
     def visit_timestampz(self, _: TimestamptzType) -> pa.DataType:
-        return pa.timestamp(unit="us", tz="+00:00")
+        return pa.timestamp(unit="us", tz="UTC")
 
     def visit_string(self, _: StringType) -> pa.DataType:
         return pa.string()
@@ -470,6 +472,15 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
     return boolean_expression_visit(expr, _ConvertToArrowExpression())
 
 
+def _read_deletes(fs: FileSystem, file_path: str) -> Dict[str, pa.ChunkedArray]:
+    _, path = PyArrowFileIO.parse_location(file_path)
+    table = pq.read_table(
+        source=path, pre_buffer=True, buffer_size=8 * ONE_MEGABYTE, read_dictionary=["file_path"], filesystem=fs
+    )
+    table = table.unify_dictionaries()
+    return {file.as_py(): table.filter(pc.field("file_path") == file).column(1) for file in table.columns[0].chunks[0].dictionary}
+
+
 def _file_to_table(
     fs: FileSystem,
     task: FileScanTask,
@@ -477,6 +488,7 @@ def _file_to_table(
     projected_schema: Schema,
     projected_field_ids: Set[int],
     case_sensitive: bool,
+    positional_deletes: List[pa.ChunkedArray],
 ) -> pa.Table:
     _, path = PyArrowFileIO.parse_location(task.file.file_path)
@@ -515,6 +527,14 @@
     if pyarrow_filter is not None:
         arrow_table = arrow_table.filter(pyarrow_filter)
 
+    if len(positional_deletes) > 0:
+        # When there are positional deletes, create a filter mask
+        mask = [True] * len(arrow_table)
+        for buffer in positional_deletes:
+            for pos in buffer:
+                mask[pos.as_py()] = False
+        arrow_table = arrow_table.filter(mask)
+
     return to_requested_schema(projected_schema, file_project_schema, arrow_table)
 
 
@@ -546,11 +566,45 @@
         id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
     }.union(extract_field_ids(bound_row_filter))
 
+    tasks_data_files: List[FileScanTask] = []
+    tasks_positional_deletes: List[FileScanTask] = []
+    for task in tasks:
+        if task.file.content == DataFileContent.DATA:
+            tasks_data_files.append(task)
+        elif task.file.content == DataFileContent.POSITION_DELETES:
+            tasks_positional_deletes.append(task)
+        elif task.file.content == DataFileContent.EQUALITY_DELETES:
+            raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
+        else:
+            raise ValueError(f"Unknown file content: {task.file.content}")
+
     with ThreadPool() as pool:
+        positional_deletes_per_file: Dict[str, List[pa.ChunkedArray]] = {}
+        if tasks_positional_deletes:
+            # If there are any positional deletes, get those first
+            for delete_files in pool.starmap(
+                func=_read_deletes,
+                iterable=[(fs, task.file.file_path) for task in tasks_positional_deletes],
+            ):
+                for file, buffer in delete_files.items():
+                    positional_deletes_per_file[file] = positional_deletes_per_file.get(file, []) + [buffer]
+
         tables = pool.starmap(
             func=_file_to_table,
-            iterable=[(fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive) for task in tasks],
-            chunksize=None,  # we could use this to control how to materialize the generator of tasks (we should also make the expression above lazy)
+            iterable=[
+                (
+                    fs,
+                    task,
+                    bound_row_filter,
+                    projected_schema,
+                    projected_field_ids,
+                    case_sensitive,
+                    positional_deletes_per_file.get(task.file.file_path, []),
+                )
+                for task in tasks_data_files
+            ],
+            chunksize=None,
+            # we could use this to control how to materialize the generator of tasks (we should also make the expression above lazy)
         )
 
     if len(tables) > 1:
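
A note on the masking step in _file_to_table above: each entry in positional_deletes is a pa.ChunkedArray of row ordinals within the data file, and all buffers for one file are folded into a single boolean mask. A minimal standalone sketch of that logic (the table contents and delete positions below are invented for illustration):

import pyarrow as pa

arrow_table = pa.table({"idx": [0, 1, 2, 3, 4, 5], "letter": ["a", "b", "c", "d", "e", "f"]})

# One ChunkedArray of row positions per delete file, as _read_deletes would return them
positional_deletes = [pa.chunked_array([[1, 4]]), pa.chunked_array([[2]])]

# Start with every row kept, then flip each deleted position to False
mask = [True] * len(arrow_table)
for buffer in positional_deletes:
    for pos in buffer:
        mask[pos.as_py()] = False

print(arrow_table.filter(mask).to_pydict())
# {'idx': [0, 3, 5], 'letter': ['a', 'd', 'f']}
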
diff --git a/python/pyiceberg/table/__init__.py b/python/pyiceberg/table/__init__.py
index dc9cc9e03643..1a69f4f7e59b 100644
--- a/python/pyiceberg/table/__init__.py
+++ b/python/pyiceberg/table/__init__.py
@@ -47,12 +47,7 @@
 from pyiceberg.expressions.visitors import inclusive_projection
 from pyiceberg.io import FileIO
 from pyiceberg.io.pyarrow import project_table
-from pyiceberg.manifest import (
-    DataFile,
-    ManifestContent,
-    ManifestFile,
-    files,
-)
+from pyiceberg.manifest import DataFile, ManifestFile, files
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table.metadata import TableMetadata
@@ -282,21 +277,10 @@ def __init__(self, data_file: DataFile, start: Optional[int] = None, length: Opt
         self.length = length or data_file.file_size_in_bytes
 
 
-def _check_content(file: DataFile) -> DataFile:
-    try:
-        if file.content == ManifestContent.DELETES:
-            raise ValueError("PyIceberg does not support deletes: https://github.com/apache/iceberg/issues/6568")
-        return file
-    except AttributeError:
-        # If the attribute is not there, it is a V1 record
-        return file
-
-
 def _open_manifest(io: FileIO, manifest: ManifestFile, partition_filter: Callable[[DataFile], bool]) -> List[FileScanTask]:
     all_files = files(io.new_input(manifest.manifest_path))
     matching_partition_files = filter(partition_filter, all_files)
-    matching_partition_data_files = map(_check_content, matching_partition_files)
-    return [FileScanTask(file) for file in matching_partition_data_files]
+    return [FileScanTask(file) for file in matching_partition_files]
 
 
 class DataScan(TableScan["DataScan"]):
@@ -352,7 +336,6 @@ def plan_files(self) -> Iterator[FileScanTask]:
 
         # step 2: filter the data files in each manifest
         # this filter depends on the partition spec used to write the manifest file
-
         partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
 
         with ThreadPool() as pool:
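
For reference, the grouping done by _read_deletes (added in pyarrow.py above) can be reproduced standalone: the file_path column is read back dictionary-encoded, unify_dictionaries makes one dictionary cover all chunks, and the positions are then sliced out per distinct data file. A sketch with invented paths and positions:

import pyarrow as pa
import pyarrow.compute as pc

# Delete file contents: (file_path, pos) pairs; the paths are made up
table = pa.table(
    {
        "file_path": pa.array(["s3://b/a.parquet", "s3://b/a.parquet", "s3://b/b.parquet"]).dictionary_encode(),
        "pos": [3, 7, 1],
    }
)
# unify_dictionaries returns a new table (pa.Table is immutable), hence the reassignment
table = table.unify_dictionaries()

deletes = {
    file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
    for file in table.column("file_path").chunks[0].dictionary
}
# deletes maps 's3://b/a.parquet' -> positions [3, 7] and 's3://b/b.parquet' -> positions [1]

This is also why project_table merges buffers with positional_deletes_per_file.get(file, []) + [buffer]: several delete files may target the same data file, and each contributes its own ChunkedArray.
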
diff --git a/python/tests/io/test_pyarrow.py b/python/tests/io/test_pyarrow.py
index f894963d9421..c6ae6eea492a 100644
--- a/python/tests/io/test_pyarrow.py
+++ b/python/tests/io/test_pyarrow.py
@@ -55,11 +55,12 @@
     PyArrowFile,
     PyArrowFileIO,
     _ConvertToArrowSchema,
+    _read_deletes,
     expression_to_pyarrow,
     project_table,
     schema_to_pyarrow,
 )
-from pyiceberg.manifest import DataFile, FileFormat
+from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema, visit
 from pyiceberg.table import FileScanTask, Table
@@ -377,7 +378,7 @@ def test_timestamp_type_to_pyarrow() -> None:
 
 def test_timestamptz_type_to_pyarrow() -> None:
     iceberg_type = TimestamptzType()
-    assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us", tz="+00:00")
+    assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us", tz="UTC")
 
 
 def test_string_type_to_pyarrow() -> None:
@@ -749,7 +750,14 @@ def project(
     return project_table(
         [
             FileScanTask(
-                DataFile(file_path=file, file_format=FileFormat.PARQUET, partition={}, record_count=3, file_size_in_bytes=3)
+                DataFile(
+                    content=DataFileContent.DATA,
+                    file_path=file,
+                    file_format=FileFormat.PARQUET,
+                    partition={},
+                    record_count=3,
+                    file_size_in_bytes=3,
+                )
             )
             for file in files
         ],
@@ -1114,3 +1122,25 @@ def test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str
     _ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"), schema_int_str)
 
     assert "Could not find field with name unknown_field, case_sensitive=True" in str(exc_info.value)
+
+
+@pytest.fixture
+def deletes_file(tmp_path: str) -> str:
+    import pyarrow as pa
+    import pyarrow.parquet as pq
+
+    path = "s3://bucket/default.db/table/data.parquet"
+    table = pa.table({"file_path": [path, path, path], "pos": [19, 22, 25]})
+
+    deletes_file_path = f"{tmp_path}/deletes.parquet"
+    pq.write_table(table, deletes_file_path)
+
+    return deletes_file_path
+
+
+def test_read_deletes(deletes_file: str) -> None:
+    # None filesystem will default to a local filesystem
+    deletes = _read_deletes(None, deletes_file)
+
+    assert set(deletes.keys()) == {"s3://bucket/default.db/table/data.parquet"}
+    assert list(deletes.values())[0] == pa.chunked_array([[19, 22, 25]])
diff --git a/python/tests/table/test_init.py b/python/tests/table/test_init.py
index 506c6609f7f9..3c663ceeb442 100644
--- a/python/tests/table/test_init.py
+++ b/python/tests/table/test_init.py
@@ -26,10 +26,9 @@
     In,
 )
 from pyiceberg.io import load_file_io
-from pyiceberg.manifest import DataFile, ManifestContent
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
-from pyiceberg.table import Table, _check_content
+from pyiceberg.table import Table
 from pyiceberg.table.metadata import TableMetadataV2
 from pyiceberg.table.snapshots import (
     Operation,
@@ -44,7 +43,6 @@
     SortOrder,
 )
 from pyiceberg.transforms import BucketTransform, IdentityTransform
-from pyiceberg.typedef import Record
 from pyiceberg.types import LongType, NestedField
 
 
@@ -245,23 +243,3 @@ def test_table_scan_projection_unknown_column(table: Table) -> None:
         _ = scan.select("a").projection()
 
     assert "Could not find column: 'a'" in str(exc_info.value)
-
-
-def test_check_content_deletes() -> None:
-    with pytest.raises(ValueError) as exc_info:
-        _check_content(
-            DataFile(
-                content=ManifestContent.DELETES,
-            )
-        )
-    assert "PyIceberg does not support deletes: https://github.com/apache/iceberg/issues/6568" in str(exc_info.value)
-
-
-def test_check_content_data() -> None:
-    manifest_file = DataFile(content=ManifestContent.DATA)
-    assert _check_content(manifest_file) == manifest_file
-
-
-def test_check_content_missing_attr() -> None:
-    r = Record(*([None] * 15))
-    assert _check_content(r) == r  # type: ignore
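
Taken together, positional deletes are now applied transparently during a scan: planning no longer rejects delete files, project_table routes them to _read_deletes, and the resulting masks are applied per data file before the requested schema is projected. A hypothetical end-to-end read (the catalog name and table identifier are invented; equality deletes still raise a ValueError):

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("default.my_table")

# Rows covered by positional delete files are dropped before the result is returned
arrow_table = tbl.scan().to_arrow()
print(arrow_table.num_rows)
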