Spark: Add positional and equality delete file count to ExpireSnapshot results #4629
Conversation
From what I can tell, all Spark versions share a similar interface.
Force-pushed from b1e2bfa to 6b452f1.
public void testExpireOlderThanWithDeleteFile() {
  table.updateProperties()
      .set(TableProperties.FORMAT_VERSION, "2")
      .set(TableProperties.MANIFEST_MERGE_ENABLED, "false")
[Question] Any specific reason to disable the manifest merge?
Yeah, since this test checks the exact list of files removed by ExpireSnapshots, I thought it would be simpler if we didn't have to deal with manifests auto-merging after a while; with merging turned off, I just need the manifests from the latest snapshot.
That said, I tried it and it seems unnecessary in this case, as auto-merge does not kick in (the default threshold is set high). I was thinking of keeping it in case the algorithm changes and manifests start merging, which would fail this test, but I'm open to removing it if that's preferred.
I see, makes sense to me to keep the manifest merge as false here.
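For context, a minimal sketch of the approach described above (names and exact signatures are assumptions, not the test's actual code): with manifest merging disabled, the manifests of the latest snapshot can be listed directly to build the expected set of removed files.
// Hypothetical helper; imports of Table, Snapshot, ManifestFile, and List assumed.
// With commit.manifest-merge.enabled=false, the latest snapshot's manifests are exactly
// the ones written by the test's own commits, so there are no merged manifests to untangle.
private List<ManifestFile> manifestsOfLatestSnapshot(Table table) {
  Snapshot latest = table.currentSnapshot();
  // allManifests(FileIO) in newer Iceberg releases; older releases expose a no-arg allManifests().
  return latest.allManifests(table.io());
}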
switch (manifest.content()) {
  case DATA:
    return CloseableIterator.transform(
        ManifestFiles.read(manifest, table.getValue().io(), table.getValue().specs()).iterator(),
This is not necessary; only the path is needed.
I'm not sure I follow the comment, but this needs the FileContent of each file so that the totals can be computed at the end, right?
In the other PR it was readPaths; does this method open the data file?
If I understand correctly, we can't use readPaths as we need to fetch the content file type. That being said, I think we have to add a projection. Right now, we will read all columns whereas we read only paths before.
Will something like this work?
public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean manifest) {
  FileIO io = table.getValue().io();
  Map<Integer, PartitionSpec> specs = table.getValue().specs();
  ImmutableList<String> projection = ImmutableList.of(DataFile.FILE_PATH.name(), DataFile.CONTENT.name());
  switch (manifest.content()) {
    case DATA:
      return CloseableIterator.transform(
          ManifestFiles.read(manifest, io, specs).select(projection).iterator(),
          ReadManifest::contentFileWithType);
    case DELETES:
      return CloseableIterator.transform(
          ManifestFiles.readDeleteManifest(manifest, io, specs).select(projection).iterator(),
          ReadManifest::contentFileWithType);
    default:
      throw new IllegalArgumentException("Unsupported manifest content type:" + manifest.content());
  }
}
Done, thanks for the suggestion.
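For completeness, the ReadManifest::contentFileWithType reference in the suggestion above could be a small helper roughly like this (a sketch, not necessarily the merged implementation):
// Hypothetical helper backing the ReadManifest::contentFileWithType reference.
// Returns (file path, content type name) per manifest entry; only these two fields
// are needed thanks to the FILE_PATH/CONTENT projection applied above.
private static Tuple2<String, String> contentFileWithType(ContentFile<?> file) {
  return new Tuple2<>(file.path().toString(), file.content().name());
}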
@aokolnychyi resolved all the comments, except the one about not having access to the ManifestFile 'content' field. This still works as-is (the ManifestContent=DELETES branch is just never taken, though delete files are still returned), but I can wait until that change goes in, either way.
Force-pushed from 26f6725 to 7975d94.
@aokolnychyi when you have a chance: this now correctly uses the 'content' field of the manifests metadata table when computing valid files.
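As an illustration of what using the content field could look like on the Spark side (column and method names here follow the all_manifests metadata table and BaseSparkAction as I understand them; treat this as an assumption-laden sketch, not the PR's code):
// Sketch: load the manifests metadata table including the 'content' column so the action
// can distinguish data manifests (content = 0) from delete manifests (content = 1).
Dataset<Row> manifestDF = loadMetadataTable(table, MetadataTableType.ALL_MANIFESTS)
    .select("content", "path", "length", "partition_spec_id", "added_snapshot_id");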
LGTM. I had a few mostly optional comments.
Great work, @szehon-ho! I think we should fix other actions next.
- code: "java.method.addedToInterface"
  new: "method ThisT org.apache.iceberg.SnapshotUpdate<ThisT>::scanManifestsWith(java.util.concurrent.ExecutorService)"
  justification: "Accept all changes prior to introducing API compatibility checks"
- code: "java.method.addedToInterface"
I am okay with this given other changes, upcoming 1.0 and that it is mostly Iceberg itself that provides implementations of this interface. Even if someone has custom actions, they probably reuse the provided result implementation.
If other folks are concerned, we could default the new methods.
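If defaulting were preferred, the new counters could be given default implementations so existing Result implementations keep compiling, along these lines (a sketch; the actual method names in the merged API may differ):
// Sketch only: defaults keep pre-existing Result implementations source-compatible.
interface Result {
  long deletedDataFilesCount();

  default long deletedPositionDeleteFilesCount() {
    return 0L; // older implementations did not track this
  }

  default long deletedEqualityDeleteFilesCount() {
    return 0L; // older implementations did not track this
  }

  long deletedManifestsCount();

  long deletedManifestListsCount();
}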
-  private void checkExpirationResults(long expectedDatafiles, long expectedManifestsDeleted,
-      long expectedManifestListsDeleted, ExpireSnapshots.Result results) {
+  private void checkExpirationResults(long expectedDatafiles, long expectedPosDeleteFiles, long expectedEqDeleteFiles,
Thanks for fixing the indentation!
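For reference, the reworked helper would then assert the new counters alongside the existing ones, roughly like this (accessor names are assumptions based on this PR's description, not verified against the merged code):
// Sketch of the updated assertion helper; result accessor names assumed.
private void checkExpirationResults(long expectedDatafiles, long expectedPosDeleteFiles, long expectedEqDeleteFiles,
    long expectedManifestsDeleted, long expectedManifestListsDeleted, ExpireSnapshots.Result results) {
  Assert.assertEquals("Incorrect number of data files deleted",
      expectedDatafiles, results.deletedDataFilesCount());
  Assert.assertEquals("Incorrect number of position delete files deleted",
      expectedPosDeleteFiles, results.deletedPositionDeleteFilesCount());
  Assert.assertEquals("Incorrect number of equality delete files deleted",
      expectedEqDeleteFiles, results.deletedEqualityDeleteFilesCount());
  Assert.assertEquals("Incorrect number of manifests deleted",
      expectedManifestsDeleted, results.deletedManifestsCount());
  Assert.assertEquals("Incorrect number of manifest lists deleted",
      expectedManifestListsDeleted, results.deletedManifestListsCount());
}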
Thanks @aokolnychyi, @zinking, @dramaticlly, @rajarshisarkar for the detailed reviews.
Currently both the ExpireSnapshots SparkAction and Procedure return 'deletedDataFileCount', but this count includes delete files.
This change reports deletedPositionalDeleteFiles and deletedEqualityDeleteFiles as separate values and excludes them from deletedDataFiles.
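As a usage sketch (the result accessor names below are assumptions based on this description and may differ in the merged API):
// Hypothetical usage of the enriched ExpireSnapshots result.
ExpireSnapshots.Result result = SparkActions.get(spark)
    .expireSnapshots(table)
    .expireOlderThan(tsToExpire)
    .execute();

long dataFiles = result.deletedDataFilesCount();             // no longer includes delete files
long posDeletes = result.deletedPositionDeleteFilesCount();  // new: positional delete files removed
long eqDeletes = result.deletedEqualityDeleteFilesCount();   // new: equality delete files removed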