Python: Add positional deletes #6775
Conversation
python/pyiceberg/io/pyarrow.py
Outdated
```
@@ -28,12 +28,16 @@
 import os
 from abc import ABC, abstractmethod
 from functools import lru_cache, singledispatch
+from heapq import merge
```
Thanks! Are there any specific changes that you would like to see in a separate PR? The `heapq` is used for merging the different positional deletes.
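To illustrate why `heapq.merge` fits here: each positional-delete file yields an already-sorted stream of deleted row positions, and `merge` lazily combines any number of such streams into one sorted iterator without materializing them. A minimal sketch (the delete positions below are made up):

```python
from heapq import merge

# Two sorted streams of deleted row positions, e.g. coming from two
# positional-delete files (values are hypothetical).
deletes_a = [1, 4, 9]
deletes_b = [2, 4, 7]

# merge() combines the already-sorted iterables lazily into a single
# sorted iterator; duplicates (here: 4) are preserved.
combined = list(merge(deletes_a, deletes_b))
print(combined)  # [1, 2, 4, 4, 7, 9]
```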
python/pyiceberg/io/pyarrow.py
Outdated
```python
for pos in range(fn_rows()):
    if deleted_pos == pos:
        try:
            deleted_pos = next(sorted_deleted).as_py()  # type: ignore
```
Oof, nice catch!
```python
>>> sorted_deleted = iter([1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 4, 5, 6])
>>> deleted_pos = next(sorted_deleted)
>>> for pos in range(10):
...     if deleted_pos == pos:
...         while deleted_pos == pos:
...             try:
...                 deleted_pos = next(sorted_deleted)
...             except StopIteration:
...                 deleted_pos = -1
...     else:
...         print(f"yield {pos}")
...
yield 0
yield 3
yield 7
yield 8
yield 9
```
python/pyiceberg/io/pyarrow.py
Outdated
```python
def _create_positional_deletes_indices(positional_deletes: List[pa.ChunkedArray], fn_rows: Callable[[], int]) -> pa.Array:
    sorted_deleted = merge(*positional_deletes)

    def generator() -> Generator[int, None, None]:
        deleted_pos = next(sorted_deleted).as_py()  # type: ignore
        for pos in range(fn_rows()):
            if deleted_pos == pos:
                try:
                    deleted_pos = next(sorted_deleted).as_py()  # type: ignore
                except StopIteration:
                    deleted_pos = -1
            else:
                yield pos

    # Filter on the positions
    return pa.array(generator(), type=pa.int64())
```
@jorisvandenbossche Let me know if you're interested in providing this from the PyArrow side :) Would be very welcome.
python/pyiceberg/io/pyarrow.py
Outdated
```
@@ -786,15 +865,39 @@ def project_table(
     rows_counter = multiprocessing.Value("i", 0)

     with ThreadPool() as pool:
         # Fetch the deletes
         deletes_per_file: Dict[str, List[ChunkedArray]] = {}
```
Good one, I like that a lot!
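As a rough illustration of the pattern in the diff above, the delete files can be read in a thread pool and grouped per data file. Everything here is hypothetical (paths, the reader function, the returned positions); it only sketches the grouping shape, not pyiceberg's actual API:

```python
from multiprocessing.pool import ThreadPool

# Hypothetical positional-delete file paths.
delete_files = ["s3://bucket/d1.parquet", "s3://bucket/d2.parquet"]

def read_delete_file(path: str):
    # Stand-in reader: in reality this would parse the Parquet delete
    # file into (data file path, deleted positions). Values fabricated.
    return "s3://bucket/data.parquet", [1, 4]

deletes_per_file: dict = {}
with ThreadPool() as pool:
    # Fetch all delete files concurrently, then group by data file.
    for data_path, positions in pool.map(read_delete_file, delete_files):
        deletes_per_file.setdefault(data_path, []).append(positions)

print(len(deletes_per_file["s3://bucket/data.parquet"]))  # 2
```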
```
@@ -259,7 +262,7 @@ def projection(self) -> Schema:
         return snapshot_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)

     @abstractmethod
-    def plan_files(self) -> Iterator[ScanTask]:
+    def plan_files(self) -> Iterable[ScanTask]:
```
It won't break the code of anyone using this, but it might alarm the type checker.
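To spell out why callers keep working: every `Iterator` is an `Iterable`, so widening the return annotation is backward compatible for iteration; only code that assumed it could call `next()` directly would be flagged. A small sketch (function names invented for illustration):

```python
from typing import Iterable, Iterator

def plan_files_old() -> Iterator[int]:
    # A generator is an Iterator (and therefore also an Iterable).
    yield 1

def plan_files_new() -> Iterable[int]:
    # A plain list satisfies Iterable but is NOT an Iterator.
    return [1, 2, 3]

# Iterating keeps working under either annotation:
print(list(plan_files_old()))  # [1]
print(list(plan_files_new()))  # [1, 2, 3]

# But calling next() on the result is only safe for an Iterator:
# next(plan_files_new())  # TypeError: 'list' object is not an iterator
```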
python/pyiceberg/table/__init__.py
Outdated
```
@@ -401,9 +423,38 @@ def plan_files(self) -> Iterator[FileScanTask]:
             metrics_evaluator,
         )
         for manifest in manifests
         if (manifest.content is None or manifest.content == ManifestContent.DATA)
```
I don't like this; I'm going to separate it out into its own function.
python/pyiceberg/table/__init__.py
Outdated
```python
    data_datafiles.append(datafile)
elif datafile.content == DataFileContent.POSITION_DELETES:
    deletes_positional.append(datafile)
elif datafile.content == DataFileContent.EQUALITY_DELETES:
```
I left it out for now since it is already quite a hefty PR.
python/pyiceberg/table/__init__.py
Outdated
```python
]

def _match_deletes_to_datafile(self, data_file: DataFile, positional_delete_files: List[DataFile]) -> Set[DataFile]:
    return set(
```
Ahh, I see. Yes. I agree. I also recall this from the Java implementation, but I was confused with the sequence numbers on the manifest-list level.
I've added a sorted list for collecting the deletes by sequence number, so we can efficiently bisect to select only the deletes that came after the data file.
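The bisect idea can be sketched as follows. This is a toy standing in for the real matching logic; the sequence numbers and file labels are invented, and the real predicate also checks partition overlap:

```python
from bisect import bisect_left

# Delete files kept in a list sorted by data sequence number
# (parallel lists here purely for illustration).
delete_seqs = [3, 5, 5, 8]
delete_files = ["d3", "d5a", "d5b", "d8"]

def deletes_for(data_sequence_number: int):
    # A positional delete applies to a data file when the delete's
    # sequence number is >= the data file's sequence number, so we
    # bisect to skip all older deletes in one step.
    idx = bisect_left(delete_seqs, data_sequence_number)
    return delete_files[idx:]

print(deletes_for(5))  # ['d5a', 'd5b', 'd8']
print(deletes_for(9))  # []
```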
python/pyiceberg/io/pyarrow.py
Outdated
```python
    columns=[col.name for col in file_project_schema.columns],
)

if positional_deletes:
    # In the case of a mask, it is a bit awkward because we first
```
If I understand correctly, the problem is that we are relying on the arrow result to correspond 1-to-1 with the records in the file so that we can use position in the DataFrame as the row position in the file.
Yes, this is correct.
But if we need to read deletes, we don't want to read the entire file, which could mean reading whole row groups that are unnecessary.
Based on the row group statistics, yes.
I don't know if Arrow supports this, but it would need to.
I don't think Arrow supports this today. I think we can even implement this on the PyIceberg side, but I don't think we should. Because:
- It is internal to PyArrow
- This would pull a lot of hot code into Python, where the GIL will slow us down.
The points that you raise above are correct. On the Arrow side we're looking into implementing this: apache/arrow#35301
The last comment there was about adding an internal index column that can be used for this purpose. That way we can combine the filters and push them down to PyArrow (and also simplify things on the PyIceberg end; all the if-else branches make the code error-prone).
…eberg into fd-positional-deletes
I was doing some work on the Python side: apache#6775. But I ran into an issue when creating some integration tests for the positional deletes. I ended up with double slashes:

s3://warehouse/default/test_positional_mor_deletes/data//00000-32-70be11f7-3c4b-40e0-b35a-334e97ef6554-00001-deletes.parquet

It looks like the struct is not-null, but the partition is not partitioned; it therefore creates a partitioned path, but with the empty struct we end up with a double slash `//` that MinIO doesn't like.

Outputfactory.java

```java
public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partition) {
    // partition is a StructCopy
    String newDataLocation = locations.newDataLocation(spec, partition, generateFilename());
    OutputFile rawOutputFile = io.newOutputFile(newDataLocation);
    return encryptionManager.encrypt(rawOutputFile);
}
```

ClusteredWriter.java

```java
// copy the partition key as the key object may be reused
this.currentPartition = StructCopy.copy(partition); // partition is a StructProjection
this.currentWriter = newWriter(currentSpec, currentPartition);
```

I still have to dig into why there is a StructProjection.

Resolves apache#7678
This PR is growing very big. Let's split some of it into smaller PRs to make reviewing easier. Let's start with support for
Overall, looks good to me. I'd say let's merge it and improve as we get new features into Arrow.
@rdblue Sounds like a plan! I'll follow up on this once we get
Closes #6568