Spark: Add an action to rewrite equality deletes #2364
Conversation
return new RewriteDeleteActionResult(Collections.emptyList(), Collections.emptyList());
}

CloseableIterable<FileScanTask> tasksWithEqDelete = CloseableIterable.filter(fileScanTasks, scan ->
Do we need to do a null check before filtering this fileScanTasks?
This is closeable, so I think we don't have to. There is an empty check after grouping the tasks, right?
@Override
protected RewriteDeletes self() {
  return null;
null? Shouldn't we return this here?
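For what it's worth, a minimal sketch of the fix being suggested here, assuming the usual fluent self-type pattern these actions use:

@Override
protected RewriteDeletes self() {
  return this;
}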
  return encryptionManager;
}

private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
Nit: I see there is another identical groupTasksByPartition in BaseRewriteDeletesSparkAction; maybe we could use the same method.
I think you mean BaseRewriteDataFilesSparkAction, right?
this.spec = table.spec();
this.schema = table.schema();
this.locations = table.locationProvider();
this.caseSensitive = caseSensitive;
this.io = io;
this.encryptionManager = encryptionManager;
this.properties = table.properties();
this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);

String formatString = table.properties().getOrDefault(
    TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
Nit: could we align the assignment order with the field definition order? That helps a lot when checking all those assignments. Thanks.
@chenjunjiedada Thanks for updating this patch. I've got #2294 merged; that patch extends the RewriteFiles API to rewrite both insert data files and delete files in Iceberg. I think we could rebase this patch on the latest commit. I will take a look at the patch again once you've rebased it. Thanks.
Force-pushed from 97d5bec to e6ebfc1.
Thanks for the review, @openinx!
- Spark: add position delete row reader
- minor refactor
- use subclass of stream filter for stream selector
- allow delete row reader to read all deleted rows
- remove reading all delete row once logic
- implement alternative delete row reader
- update keepRowsFromDeletes
- minor refactors
- remove useless code
Force-pushed from e6ebfc1 to f4a23c1.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConvertEqDeletesStrategy implements RewriteDeleteStrategy {
I think we should have an abstract ConvertEqDeletesStrategy and a Spark3ConvertEqDeletesStrategy.
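A rough sketch of how that split could look; the method name and division of responsibilities below are assumptions, not the PR's actual design:

public abstract class ConvertEqDeletesStrategy implements RewriteDeleteStrategy {
  // engine-agnostic parts: select equality deletes and group the file scan tasks to rewrite

  // hypothetical hook for the engine-specific rewrite of one group of tasks
  protected abstract Set<DeleteFile> rewriteGroup(List<FileScanTask> group);
}

public class Spark3ConvertEqDeletesStrategy extends ConvertEqDeletesStrategy {
  @Override
  protected Set<DeleteFile> rewriteGroup(List<FileScanTask> group) {
    // Spark 3 specific: run the equality-to-position delete conversion as a Spark job
    return Collections.emptySet();
  }
}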
@Override
public Iterable<DeleteFile> selectDeletes() {
  CloseableIterable<FileScanTask> fileScanTasks = null;
nit: can we simplify this block with something like the following?

try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().ignoreResiduals().planFiles()) {
  ...
} finally {
  ...
}

    scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
);

Set<DeleteFile> eqDeletes = Sets.newHashSet();
nit: I think we can do a flatMap from tasks to deletes, then filter and use forEach(eqDeletes::add).
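A sketch of what that could look like, assuming the fileScanTasks variable from the quoted code:

Set<DeleteFile> eqDeletes = Sets.newHashSet();
StreamSupport.stream(fileScanTasks.spliterator(), false)
    .flatMap(task -> task.deletes().stream())
    .filter(delete -> delete.content() == FileContent.EQUALITY_DELETES)
    .forEach(eqDeletes::add);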
And it seems a bit redundant that we are iterating through the tasks twice, at L119 and here; there should be a way to simplify the whole logic.
      BaseCombinedScanTask::new);
}

public static Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
Maybe I missed some other place, but I only see it used in the strategy class. Why is it not a private method?
Moving the common groupTasksByPartition shared by BaseRewriteDataFilesAction and ConvertEqDeletesStrategy into TableScanUtil is OK with me.
    TableScanUtil.groupTasksByPartition(table.spec(), tasksWithEqDelete.iterator());

// Split and combine tasks under each partition
List<Pair<StructLike, CombinedScanTask>> combinedScanTasks = groupedTasks.entrySet().stream()
After reading this, I think we can make the RewriteDeleteStrategy interface closer to the RewriteStrategy interface. What we have here is basically the equivalent of planFileGroups plus rewriteFiles in RewriteStrategy. So I would propose we have the following methods in RewriteDeleteStrategy to be more aligned:
Iterable<DeleteFile> selectDeletesToRewrite(Iterable<FileScanTask> dataFiles);
Iterable<List<FileScanTask>> planDeleteGroups(Iterable<DeleteFile> deleteFiles);
Set<DeleteFile> rewriteDeletes(List<DeleteFile> deleteFilesToRewrite);
And we can get the partition StructLike directly from the list of scan tasks instead of passing it through the task pair in EqualityDeleteRewriter. In this way, we can also enable partial progress for commits.
I'm updating this PR according to the API changes; the changes to selectDeletesToRewrite and rewriteDeletes are OK to me. But Iterable<List<FileScanTask>> planDeleteGroups(Iterable<DeleteFile> deleteFiles); is a bit weird since it returns groups of List<FileScanTask>, while a FileScanTask could contain several deletes that don't exist in the deleteFiles. So I prefer to return Iterable<List<DeleteFile>>. It is worth noting that one data file could have several deletes, so we cannot directly use FileScanTask to transfer the deletes. This is slightly different from the data file rewrite.
And we can get the partition StructLike directly from the list of scan tasks instead of passing it through the task pair in EqualityDeleteRewriter. In this way, we can also enable partial progress for commits.
The scan tasks in a group may belong to different partitions. So unless we group deletes by partition, it needs to know the partition values.
}

public static Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
    PartitionSpec spec,
I don't think it's correct to use the table's latest partition spec to group the FileScanTasks, because different FileScanTasks may have different partition specs; the correct way is to use FileScanTask#spec to group the tasks. We should remove the spec argument, otherwise it's introducing a bug...
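A rough sketch of grouping by each task's own spec, just to illustrate the point (the helper shape and names are assumptions):

private static Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
    CloseableIterator<FileScanTask> tasksIter) {
  ListMultimap<StructLikeWrapper, FileScanTask> grouped =
      Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
  try (CloseableIterator<FileScanTask> iterator = tasksIter) {
    iterator.forEachRemaining(task -> {
      // key on the task's own spec, since tasks may come from different partition specs
      StructLikeWrapper wrapper = StructLikeWrapper.forType(task.spec().partitionType())
          .set(task.file().partition());
      grouped.put(wrapper, task);
    });
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
  return grouped.asMap();
}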
Predicate<T> isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
isInDeleteSets.add(isInDeleteSet);
isDeleted = isDeleted == null ? record -> deleteSet.contains(projectRow.wrap(asStructLike(record))) :
Initializing isDeleted as a predicate like t -> false will simplify this if-else to:

isDeleted = isDeleted.or(record -> deleteSet.contains(projectRow.wrap(asStructLike(record))));
Done.
I found I wrote it this way first but changed to the current way because of a comment from Ryan, which sounded reasonable to me. FYI bfd0aeb#r603700530.
Just reverted it back.
} else {
  List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
  markedRecords = CloseableIterable.transform(Deletes.streamingDeletedRowMarker(records, this::pos,
We will always load the pos-deletes into an in-memory HashSet even if the row count of the positional delete files exceeds the given threshold, because in this buildPosDeletePredicate we've loaded all the file offsets into memory. I think that's not the expected behavior.
Updated to open one by one.
  return deleteMarkerIndex;
}

protected abstract Consumer<T> deleteMarker();
How about introducing a new interface named Setter to set the is_deleted flag (similar to org.apache.iceberg.Accessor) so that we could have a good abstraction to hide the delete marker logic:

interface Setter<T> extends Serializable {
  T set(T reuse);
}
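For illustration, a hypothetical Setter implementation that marks a simple mutable row as deleted (the Row type here is invented for the example):

import java.io.Serializable;

interface Setter<T> extends Serializable {
  T set(T reuse);
}

// purely illustrative row type carrying an is_deleted flag
class Row {
  boolean isDeleted;
}

// hides how the delete marker is applied behind the Setter abstraction
class DeleteMarkerSetter implements Setter<Row> {
  @Override
  public Row set(Row reuse) {
    reuse.isDeleted = true;
    return reuse;
  }
}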
I can try to make this a better abstraction in a follow-up PR; this PR contains too many changes now. I think we will have some follow-up minor changes and optimizations. Does that sound OK to you?
    .map(Predicate::negate)
    .reduce(Predicate::and)
    .orElse(t -> true);
Predicate<T> isDeleted = buildEqDeletePredicate();
Looks like we are separating the RewriteDeletes path and the normal read path into two branches:
For the RewriteDeletes path, we introduced three new methods:
- keepRowsFromDeletes
- keepRowsFromEqualityDeletes
- keepRowsFromPosDeletes
For the normal read path, we introduced another three methods:
- applyEqDeletes
- applyPosDeletes
- filter
I remember there's an issue where we discussed introducing the is_deleted meta column because we wanted to unify the rewrite path and the normal read path? (I cannot find the specific PR now...)
The related PR is #2372.
Thanks @openinx and @jackye1995 for the detailed reviews. Let me update the related PRs and I will ping you soon.
This is a sub-PR of #2216. It adds a Spark action to convert equality deletes to position deletes, which I think of as a minor compaction. The logic is:
This adds an API in RewriteFiles to rewrite equality deletes to position deletes. It should keep the same semantics as the current API: the rows must be the same before and after the rewrite. This may need some changes when #2294 gets merged.
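As a rough sketch of the kind of commit this implies, assuming the four-argument RewriteFiles overload added in #2294 (the variable names below are made up):

table.newRewrite()
    .rewriteFiles(
        Collections.emptySet(),   // no data files replaced
        eqDeletesToReplace,       // equality delete files being rewritten
        Collections.emptySet(),   // no data files added
        posDeletesAdded)          // newly written position delete files
    .commit();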
There are two follow-up items: