Spark: Improve performance of expire snapshot by not double-scanning retained Snapshots #3457
Conversation
Force-pushed from 415c210 to f52ef9c
I'm going to try to do some simple benchmarks to validate it improves the perf, but putting the idea out here for any early feedback.
 */
public static List<String> manifestListLocations(Table table, Set<Long> snapshotIds) {
  Iterable<Snapshot> snapshots = table.snapshots();
  return StreamSupport.stream(snapshots.spliterator(), false)
Can you add a comment to clarify the false argument?
Done
return StreamSupport.stream(snapshots.spliterator(), false)
    .filter(s -> snapshotIds.contains(s.snapshotId()))
    .filter(s -> s.manifestListLocation() != null)
    .map(Snapshot::manifestListLocation)
Instead of calling manifestListLocation twice, why not map and then filter using Objects.nonNull?
Good suggestion, thanks
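For reference, a minimal sketch of the suggested ordering (map first, then filter nulls once), using the Iceberg Table/Snapshot API quoted in the diff above; the wrapping class is just for illustration:

```java
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class ManifestListLocationsSketch {
  // Map to the manifest list location first, then drop nulls once with Objects::nonNull.
  public static List<String> manifestListLocations(Table table, Set<Long> snapshotIds) {
    return StreamSupport.stream(table.snapshots().spliterator(), false) // false = sequential stream
        .filter(snapshot -> snapshotIds.contains(snapshot.snapshotId()))
        .map(Snapshot::manifestListLocation)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
  }
}
```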
expireSnapshots = expireSnapshots.retainLast(retainLastValue);
}

List<Snapshot> expired = expireSnapshots.apply();
There is no guarantee that apply followed by commit will produce the same result, so this is unsafe. It may work most of the time, but there will probably be leaks when expired here doesn't contain a snapshot that was actually removed. That's why we always refresh and compare against the version that was actually committed.
I see, changed.
};

private final Set<Long> expiredSnapshotIds = Sets.newHashSet();
private final Set<Long> snapshotIdsToExpire = Sets.newHashSet();
I think this is more correct, but I don't think it's worth making this change larger. I would just use a removedSnapshotIds variable later to avoid the name conflict.
OK, reverted
}

List<Snapshot> expired = expireSnapshots.apply();
Set<Long> expiredSnapshotIds = expired.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
I think this needs to be done by diffing the snapshot ID sets between metadata versions.
Done
I think this is actually promising; the performance gain is in line with expectations. Test: a table with 1000 small snapshots, expiring 1 at a time. The time and resources spent in expire-snapshot, and the input/shuffle sizes, fall by a little less than half, as most snapshots are not double-scanned. So I think it's worth continuing. @rdblue thanks for the early review, I will look at the comments and make the changes.
Patch should be ready for more review.
Force-pushed from e632307 to 5490d42
@szehon-ho sorry for not getting back to this sooner, let's finish this up when you are back online.
I'm not sure if people think these changes are too hacky. Another option I've thought of is to implement IncrementalScan (#4580) for the All_files table (to be added in #4694), which would allow snapshot filtering. Then rewrite ExpireSnapshotAction to use that table, with different filters, to avoid double-scanning all_files. It would be much cleaner that way, but a bigger refactor (and it would make #4674 much harder if we go that route). I wonder whether there was some reason initially to use the all_manifests table and flatMap the ReadManifest task, rather than relying on the all_files table? @rdblue @aokolnychyi for any thoughts/information.
Actually I think I get why: the all_files table does not parallelize the planning (reading each snapshot in a Spark task), so it is probably better to keep it this way (all_manifests table and the ReadManifest task). Will take a look to see if this can be cleaned up in another way.
Force-pushed from c27cac2 to 8232c8b
@szehon-ho, the reason why we use
@RussellSpitzer @aokolnychyi Rebased the PR; it's still using the manual way to filter out snapshots from the all_manifests table, if you guys have time to take a look. The idea I mentioned in the above few comments, to use the manifests table with snapshot filtering via time-travel (to make it cleaner), I tried to implement in #4736; I'd like to hear your thoughts on whether that is a better approach. The problem there is that I stopped when I realized manifest tables do not support time-travel, and it didn't look trivial to implement. Also FYI @ajantha-bhat as we were talking about this on #4674.
@rdblue yea thanks, I realized it after asking.
Force-pushed from 8d3c706 to 222423e
Ref: discussion on #4736. This implements the original idea to reduce the 'deleteCandidate' scan to just the files from deleted snapshots. There is another idea, to also remove current manifests during the delete-candidate scan, which is not done yet.
Force-pushed from bf44a0d to 343fa89
Set<Long> retainedSnapshots =
    updatedTable.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
Dataset<Row> validFiles = buildValidFileDF(updatedTable);
Dataset<Row> deleteCandidateFiles =
Am I correct that we are trying to build the reachability set of expired snapshots? Would it be easier to write this logic in terms of expired snapshots instead of retained snapshots? Right now, we pass snapshots to ignore, which takes a bit of time to wrap your head around. Would it be easier if we passed snapshots we are looking for?
Sure, it makes sense to me, I can make the change.
I had an optimization at the end to pass the lesser of the two sets to Spark, with either 'in' or 'not in', but I can still do it the other way.
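As a rough sketch of that optimization (the helper and column-name parameter here are hypothetical; isInCollection is the same Spark Column API used elsewhere in this PR), the filter can be built from whichever ID set is smaller:

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.not;

import java.util.Set;
import org.apache.spark.sql.Column;

public class SnapshotIdFilterSketch {
  // Build a filter on the referencing-snapshot-id column, pushing the smaller of the
  // two ID sets into the Spark plan: 'in' on expired IDs or 'not in' on retained IDs.
  static Column snapshotIdFilter(String refSnapshotIdColumn, Set<Long> expiredIds, Set<Long> retainedIds) {
    if (expiredIds.size() <= retainedIds.size()) {
      return col(refSnapshotIdColumn).isInCollection(expiredIds);
    } else {
      return not(col(refSnapshotIdColumn).isInCollection(retainedIds));
    }
  }
}
```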
private Dataset<Row> buildFilteredValidDataDF(
    TableMetadata metadata, Set<Long> snapshotsToExclude) {
  Table staticTable = newStaticTable(metadata, table.io());
  return buildValidContentFileWithTypeDF(staticTable, snapshotsToExclude)
Two questions.
- Will we scan ALL_MANIFESTS twice? Once for content files and once for manifests?
- Will we open extra manifests when building the reachability set for expired snapshots? I don't think the current implementation removes any manifests from the expired set that are currently in the table.
- Yes, I think there was a proposal for that by @ajantha-bhat in "Spark-3.2: Avoid duplicate computation of ALL_MANIFESTS metadata table for spark actions" #4674; we weren't sure that caching is worth the cost (it may involve an extra read/write).
- Yes, I gave it a try but decided to punt for now, as it always involves an extra Spark join (the all-manifests data already being in a dataset), so it is not beneficial in all cases. For example, in my test, where each manifest has a small number of data files, it is more expensive. But of course there are other cases, where a manifest has many data files, where it helps.

For these, maybe we can make it configurable? This change only includes the changes that I see will not hurt performance but only help it.
> Yes, I think there was a proposal for that by @ajantha-bhat #4674; we weren't sure that caching is worth the cost (it may involve an extra read/write)

I didn't find much performance difference when tested locally, hence I didn't proceed further on #4674. Maybe I need to test on S3 to really see the benefit of avoiding IO scans.
I was thinking about something like this:
- Cache/persist a dataset of unique manifests in still valid snapshots (via all_manifests)
- Cache/persist a dataset of unique manifests in expired snapshots (via all_manifests)
- Find expired manifests that are no longer referenced by still valid snapshots (anti-join)
- Read expired manifests to build a dataset of content files referenced by expired manifests
- Read still valid manifests to build a dataset of still valid content files
- Find expired content files (anti-join)
- Find expired manifest lists
- Union datasets for different expired results

If we do it this way, we read the all_manifests table only once and also skip still-live manifests when looking for expired content files. The extra shuffle is a valid concern but I think we can avoid it. Right now, we are doing round-robin partitioning when building unique manifests. Instead, we can assign a deterministic hash partitioning to both datasets to avoid a shuffle during anti-joins. Spark won't shuffle datasets if they are partitioned in the same way. We are shuffling those datasets anyway, as we deduplicate manifests before opening them up and then have an extra round-robin shuffle to distribute the load. I feel we can just use deterministic hash partitioning instead. We will have to set an explicit number of shuffle partitions to prevent AQE from coalescing tasks.
@karuppayya is also working on some benchmarks for actions, which would be really handy to evaluate the impact here.
Thoughts, @szehon-ho @RussellSpitzer @rdblue?
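A tiny sketch of the partitioning point above, assuming a hypothetical manifestDF with a "path" column: repartition(n) is round-robin, while repartition(n, col("path")) gives the deterministic hash partitioning that both sides of the anti-join could share.

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class ManifestPartitioningSketch {
  // Round-robin: spreads already-deduplicated manifests evenly, but the partitioning
  // cannot be reused by a later join, so the anti-join shuffles again.
  static Dataset<Row> roundRobin(Dataset<Row> manifestDF, int numShufflePartitions) {
    return manifestDF.repartition(numShufflePartitions);
  }

  // Deterministic hash partitioning on the join key: if both datasets are partitioned
  // like this, Spark can perform the anti-join without an extra shuffle.
  static Dataset<Row> hashPartitioned(Dataset<Row> manifestDF, int numShufflePartitions) {
    return manifestDF.repartition(numShufflePartitions, col("path"));
  }
}
```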
I've kinda been thinking we probably don't need the anti-joins distributed (mostly we just want distributed IO) for this operation, but I think all of your logic makes sense here. If I think this through, we have ALL_MANIFESTS broken into 3 subsets:

oldManifests: [manifests only used in expired snapshots]
commonManifests: [manifests used in both expired and unexpired snapshots]
newManifests: [manifests only used in unexpired snapshots]

To build this we read allManifests:

expiredManifests = allManifests where snapshot is in snapshots being expired
unexpiredManifests = allManifests where snapshot is not in snapshots being expired
oldManifests = expiredManifests -- unexpiredManifests
newManifests = unexpiredManifests -- expiredManifests

We read all the manifests in oldManifests and newManifests:

oldManifests -> flatMap read manifest and dedupe -> oldFiles
newManifests -> flatMap read manifest and dedupe -> newFiles
filesToDelete == (oldFiles -- newFiles) + oldManifests + oldManifestLists

Did I get that right?
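If I read it the same way, a sketch of the oldManifests/newManifests split in terms of Dataset set operations (the "path" column and the REF_SNAPSHOT_ID constant follow the code quoted in this thread; reading the manifests into oldFiles/newFiles is left out):

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.not;

import java.util.Set;
import org.apache.iceberg.AllManifestsTable;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class ManifestSplitSketch {
  // Split ALL_MANIFESTS by whether the referencing snapshot is being expired and
  // keep only the manifest paths that are exclusive to the expired side.
  static Dataset<Row> oldManifestPaths(Dataset<Row> allManifests, Set<Long> expiringSnapshotIds) {
    Column inExpiring = col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(expiringSnapshotIds);

    Dataset<Row> expiredManifests = allManifests.filter(inExpiring).select("path").distinct();
    Dataset<Row> unexpiredManifests = allManifests.filter(not(inExpiring)).select("path").distinct();

    // oldManifests = expiredManifests -- unexpiredManifests; newManifests is the mirror image.
    // filesToDelete would then be (oldFiles -- newFiles) + oldManifests + old manifest lists.
    return expiredManifests.except(unexpiredManifests);
  }
}
```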
Force-pushed from 51b3802 to 93f637b
 * @return the location of manifest Lists
 */
public static List<String> manifestListLocations(Table table, Set<Long> snapshots) {
  Stream<Snapshot> snapshotStream = StreamSupport.stream(table.snapshots().spliterator(), false);
Minor: rather than using Java's spliterator method and streams, we generally prefer to use Iterables.filter and Iterables.transform or a simple for loop.
Done, used Iterables.filter and restored the old for loop
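For reference, a minimal sketch of the plain-loop version that was restored (Iterables.filter/transform from the relocated Guava package would be equivalent):

```java
import java.util.List;
import java.util.Set;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class ManifestListLoopSketch {
  // Simple for loop over table.snapshots(), collecting non-null manifest list locations.
  public static List<String> manifestListLocations(Table table, Set<Long> snapshotIds) {
    List<String> locations = Lists.newArrayList();
    for (Snapshot snapshot : table.snapshots()) {
      if (snapshotIds.contains(snapshot.snapshotId()) && snapshot.manifestListLocation() != null) {
        locations.add(snapshot.manifestListLocation());
      }
    }
    return locations;
  }
}
```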
protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshots) {
  Broadcast<Table> tableBroadcast =
      sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
Minor: this was broken out into two lines (L138-L139) before. It would be good to keep it the same way to reduce churn.
Reverted
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java (outdated conversation, resolved)
  }
}

protected Dataset<Row> filterAllManifests(Dataset<Row> allManifestDF, Set<Long> snapshots) {
Nit: snapshots would be more clear if it were named snapshotIds.
Refactored snapshots -> snapshotIds in these files
protected Dataset<FileInfo> manifestDS(Table table, Set<Long> snapshots) {
  Dataset<Row> allManifests = loadMetadataTable(table, ALL_MANIFESTS);
  if (snapshots != null) {
    allManifests = filterAllManifests(allManifests, snapshots);
Nit: preserving the use of all makes this awkward after filtering.
Good point, renamed.
  testExpireFilesAreDeleted(2, 5);
}

public void testExpireFilesAreDeleted(int dataFilesExpired, int dataFilesRetained) {
I think this test is okay, but I think that it doesn't really test cases where the retained and deleted files are mixed together. That deserves a direct test, where the deleteCandidateFileDS actually contains files in the validFileDS.
You are right, added test below
}

public static Set<String> reachableManifestPaths(Table table) {
  return StreamSupport.stream(table.snapshots().spliterator(), false)
There's actually a method in ManifestFiles that more efficiently returns just the paths if you want to use it. @amogh-jahagirdar uses it in the non-Spark reachable file implementation: #5669 (comment)
Not sure if I'm missing something, but it looks like that is for the data files of a manifest, while this is for listing manifests?
Yeah it's for listing live data file paths for a given manifest.
rdblue left a comment
This looks good to me other than a few minor things. It would be great to get this in!
Force-pushed from 93f637b to 1109ae5
Force-pushed from 1109ae5 to aa2630a
szehon-ho left a comment
Thanks for looking at this! Addressed the comments
I'd love to take a quick look today too!
aokolnychyi left a comment
The change looks great. I left just a few suggestions.
 * @return the location of manifest Lists
 */
public static List<String> manifestListLocations(Table table, Set<Long> snapshotIds) {
  Iterable<Snapshot> tableSnapshots = table.snapshots();
nit: Is there a particular reason for calling it tableSnapshots instead of just snapshots?
Changed
 *
 * @param table table for which manifestList needs to be fetched
 * @param snapshotIds ids of snapshots for which manifest lists will be returned
 * @return the location of manifest Lists
nit: Lists -> lists?
It was a typo from the earlier method, changed on both.
}

protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
  Broadcast<Table> tableBroadcast =
nit: I'd prefer to keep the temp var so that we don't have to split the statement on multiple lines.
Table serializableTable = SerializableTableWithSize.copyOf(table);
Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
Done, missed this when changing the Spark 3.3 version.
Dataset<ManifestFileBean> allManifests =
    loadMetadataTable(table, ALL_MANIFESTS)
Dataset<Row> allManifests = loadMetadataTable(table, ALL_MANIFESTS);
if (snapshotIds != null) {
I feel like we have this piece that loads ALL_MANIFESTS with optional filtering in quite a few places. What about refactoring the entire logic into a separate method? We already have manifestDS, so we can simply add manifestDF.

private Dataset<Row> manifestDF(Table table, Set<Long> snapshotIds) {
  Dataset<Row> manifestDF = loadMetadataTable(table, ALL_MANIFESTS);
  if (snapshotIds != null) {
    Column filterCond = col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(snapshotIds);
    return manifestDF.filter(filterCond);
  } else {
    return manifestDF;
  }
}

Then we don't need filterAllManifests and can simply replace loadMetadataTable calls with manifestDF.

protected Dataset<FileInfo> manifestDS(Table table, Set<Long> snapshotIds) {
  return manifestDF(table, snapshotIds)
      .select(col("path"), lit(MANIFEST).as("type"))
      .as(FileInfo.ENCODER);
}

This should allow us to reduce the number of changes to a minimum.
Good point, made new method and changed two methods to use it.
Dataset<FileInfo> originalFileDS = validFileDS(ops.current());

// Save old metadata
TableMetadata originalTable = ops.current();
nit: What about calling it originalMetadata since it is TableMetadata, not Table?
Done
Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
// determine expired files
TableMetadata updatedTable = ops.refresh();
Set<Long> retainedSnapshots =
What about adding findExpiredSnapshotIds that would accept originalMetadata, updatedMetadata?
// fetch valid files after expiration
TableMetadata updatedMetadata = ops.refresh();
Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);
// fetch files referenced by expired snapshots
Set<Long> expiredSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata);
Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, expiredSnapshotIds);
// determine expired files
this.expiredFileDS = deleteCandidateFileDS.except(validFileDS);
Done
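A possible body for that helper, assuming TableMetadata.snapshots() and the relocated Guava Sets utility: the expired IDs are simply the IDs present before the commit but absent afterwards.

```java
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

public class ExpiredSnapshotIdsSketch {
  // Snapshot IDs that existed in the original metadata but not in the committed metadata.
  static Set<Long> findExpiredSnapshotIds(TableMetadata originalMetadata, TableMetadata updatedMetadata) {
    Set<Long> originalIds =
        originalMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
    Set<Long> retainedIds =
        updatedMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
    return Sets.difference(originalIds, retainedIds); // a view; copy it if it needs to outlive the inputs
  }
}
```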
// fetch metadata after expiration
Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
// determine expired files
nit: We have the same "determine expired files" comment on different blocks now.
Changed with your other code suggestion
// fetch metadata before expiration
Dataset<FileInfo> originalFileDS = validFileDS(ops.current());

// Save old metadata
nit: Is the comment change deliberate? The old comment seems OK.
Reverted
private Dataset<FileInfo> validFileDS(TableMetadata metadata) {
private Dataset<FileInfo> fileDS(TableMetadata metadata) {
  Table staticTable = newStaticTable(metadata, table.io());
Can we simply call fileDS(metadata, null)?
Good point, changed
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java (resolved)
Force-pushed from ae7f255 to b3d37b4
aokolnychyi left a comment
LGTM. I think we have an extra method left in BaseSparkAction in 3.3.
Otherwise, looks ready to go. Thanks, @szehon-ho!
  }
}

protected Dataset<Row> filterAllManifests(Dataset<Row> allManifestDF, Set<Long> snapshotIds) {
No longer needed?
Good catch, removed
Dataset<ManifestFileBean> allManifests =
    loadMetadataTable(table, ALL_MANIFESTS)
Dataset<Row> manifests = manifestDF(table, snapshotIds);
Dataset<ManifestFileBean> manifestsBean =
nit: Most other vars in this class use xxxDF for Dataset<Row> or xxxDS for Dataset of a specific type.
Dataset<ManifestFileBean> manifestBeanDS =
manifestDF(table, snapshotIds)
.select(...)
Changed var name and inlined the previous var as suggested.
Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);

// fetch files referenced by expired snapshots
Set<Long> removedSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata);
Just noticed this variable name doesn't match Spark 3.2, will change it. (Initially it was 'expiredSnapshotIds' but that hides the member variable name, so I made a last-minute change.)
aokolnychyi left a comment
Looks great. Excited to see how it is going to perform. Thanks, @szehon-ho!
Expire snapshots can take a long time for large tables with millions of files and thousands of snapshots/manifests.

One cause is the calculation of files to be deleted. The current algorithm is:
- scan all files reachable from every snapshot before expiration (the delete candidates)
- scan all files reachable from the snapshots retained after expiration (the valid files)
- delete the candidates that are not in the valid set

But this explores every retained snapshot twice. Example: any periodic expire-snapshot job that expires 1 snapshot needs to explore all n-1 retained snapshots twice.

Proposal: scan only the files reachable from the expired snapshot(s) as delete candidates, so retained snapshots are explored just once for the valid-file set.

Implementation: for the expired-snapshot scan, change the original Spark query over the metadata tables to custom Spark jobs that explore only the expired snapshot(s).

Note: The new expired-snapshot scan duplicates the manifest-list scan logic to handle the "write.manifest-lists.enabled"="false" flag, but unfortunately that functionality seems broken even without this change, so it is not currently possible to test. Added a test for demonstration purposes.
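To tie the review together, a condensed, hypothetical sketch of the resulting flow; fileDS and findExpiredSnapshotIds stand in for the helpers discussed above, and Row is used instead of the FileInfo bean purely to keep the sketch self-contained:

```java
import java.util.Set;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class ExpireFlowSketch {
  // Stand-ins for the action's helpers referenced in the review.
  interface FileHelpers {
    Dataset<Row> fileDS(TableMetadata metadata);                        // files reachable from all snapshots
    Dataset<Row> fileDS(TableMetadata metadata, Set<Long> snapshotIds); // files reachable via the given snapshots
    Set<Long> findExpiredSnapshotIds(TableMetadata original, TableMetadata updated);
  }

  static Dataset<Row> expiredFiles(TableOperations ops, ExpireSnapshots expireSnapshots, FileHelpers helpers) {
    TableMetadata originalMetadata = ops.current(); // metadata before expiration

    expireSnapshots.commit();                       // expire snapshots and commit
    TableMetadata updatedMetadata = ops.refresh();  // metadata that was actually committed

    // files still reachable from retained snapshots
    Dataset<Row> validFileDS = helpers.fileDS(updatedMetadata);

    // delete candidates: only files reachable through snapshots that were actually removed
    Set<Long> expiredSnapshotIds = helpers.findExpiredSnapshotIds(originalMetadata, updatedMetadata);
    Dataset<Row> deleteCandidateFileDS = helpers.fileDS(originalMetadata, expiredSnapshotIds);

    // candidates no longer referenced anywhere are safe to delete
    return deleteCandidateFileDS.except(validFileDS);
  }
}
```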