Spark: support replace equality deletes to position deletes #2216
Conversation
Hi @rdblue @aokolnychyi @openinx, this is a draft for replacing deletes. Could you please take a look and check whether this is the right direction? I'd also like to add another API in the action to compact multiple position delete files into one.
I'll try to make time this week. Thanks for working on this, @chenjunjiedada!
Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(tasksWithEqDelete.iterator());

// Split and combine tasks under each partition
// TODO: can we split task?
Yes, we can split the task based on DataFile(s) here. But that introduces another issue: the current balance policy (for splitting tasks) only considers the DataFile, while the ideal approach would consider both the insert file size and the delete file size. I think there should be a separate issue to address this.
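A minimal sketch of that idea, assuming the split weight of a task is its data file size plus the total size of its delete files (the class and method names below are hypothetical, not the existing planner API):

```java
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;

class SplitWeights {
  // Hypothetical helper: weight a scan task by its data file size plus the total
  // size of its delete files, so the balancing policy considers both sides.
  static long taskWeight(FileScanTask task) {
    long deleteBytes = task.deletes().stream()
        .mapToLong(DeleteFile::fileSizeInBytes)
        .sum();
    return task.file().fileSizeInBytes() + deleteBytes;
  }
}
```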
Filed an issue for this: #2298
Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());

DeleteRewriter deleteRewriter = new DeleteRewriter(table, caseSensitive, io, encryption);
List<DeleteFile> posDeletes = deleteRewriter.toPosDeletes(taskRDD);
I'd like to move the RDD chaining out of the DeleteRewriter class, so that we could reuse that class for other compute engines' ReplaceDeleteAction:
List<DeleteFile> posDeletes = taskRDD.map(deleteRewriter::toPosDeletes)
    .collect()
    .stream()
    .flatMap(Collection::stream)
    .collect(Collectors.toList());
OK, the DeleteRewriter still uses a few other Spark classes such as SparkAppenderFactory. We may need to abstract that part of the logic so that we can reuse the rewrite logic between different engines.
+1
scan.deletes().stream().anyMatch(delete -> delete.content().equals(FileContent.EQUALITY_DELETES))
);

List<DeleteFile> eqDeletes = Lists.newArrayList();
Nit: I think eqDeletes would be better defined as a HashSet, because different FileScanTasks will share the same equality delete files. (Though we already use a HashSet to deduplicate the equality delete files in RewriteFiles, I still think it's better to do this before calling that API.)
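A minimal sketch of that suggestion, assuming tasksWithEqDelete is the already-filtered task collection from the action:

```java
// Use a Set so that equality delete files shared by several FileScanTasks are
// only collected once before calling the RewriteFiles API.
Set<DeleteFile> eqDeletes = Sets.newHashSet();
for (FileScanTask task : tasksWithEqDelete) {
  task.deletes().stream()
      .filter(delete -> delete.content() == FileContent.EQUALITY_DELETES)
      .forEach(eqDeletes::add);
}
```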
CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
    deleteSchema.asStruct());

matchedRecords = CloseableIterable.concat(Lists.newArrayList(matchedRecords, Deletes.match(remainRecords,
Here I'm concerned it may not be worth taking on such high complexity. Let's define the whole data set as S: for the first equality field ids <1,2> the delete set is S1, for the second equality field ids <1,2> the delete set is S2, and for the third equality field ids <2,5> the delete set is S3.
Finally, the concatenated matchedRecords will be
(S ∩ S1) ∪ ((S − S1) ∩ S2) ∪ ((S − S1 − S2) ∩ S3)
(here S − S1 means all elements that are in set S but not in set S1).
Though the current code returns the correct converted positional deletes, won't it iterate the big data set S three times? That overhead would be very large...
I think it would not iterate the data set several times since these are iterable chains and should be computed lazily.
For a filter chain over a data set of N elements with filters (F1, F2, F3, F4) that filter out (N1, N2, N3, N4) items respectively, I think it iterates the data set once and the number of filter calls should be:
- F1: N times
- F2: (N - N1) times
- F3: (N - N1 - N2) times
- F4: (N - N1 - N2 - N3) times

For a matching chain over a data set of N elements with filters (F1, F2, F3, F4) that match out (N1, N2, N3, N4) items respectively, I think it iterates the data set once and the number of filter calls should be:
- F1: 2N times (filter and match)
- F2: 2(N - N1) times (filter and match)
- F3: 2(N - N1 - N2) times (filter and match)
- F4: 2(N - N1 - N2 - N3) times (filter and match)
@rdblue Could you please help to correct me if I am wrong?
Here is an alternative implementation that collects all delete sets in a list and does the projection in the filter. It doesn't depend on temporary iterables and looks a bit more straightforward. I could change to this one if you like it.
public static <T> CloseableIterable<T> match(CloseableIterable<T> rows,
                                             BiFunction<T, StructProjection, StructLike> rowToDeleteKey,
                                             List<Pair<StructProjection, StructLikeSet>> unprojectedDeleteSets) {
  if (unprojectedDeleteSets.isEmpty()) {
    return rows;
  }

  EqualitySetDeleteMatcher<T> equalityFilter = new EqualitySetDeleteMatcher<>(rowToDeleteKey, unprojectedDeleteSets);
  return equalityFilter.filter(rows);
}

private static class EqualitySetDeleteMatcher<T> extends Filter<T> {
  private final List<Pair<StructProjection, StructLikeSet>> deleteSets;
  private final BiFunction<T, StructProjection, StructLike> extractEqStruct;

  protected EqualitySetDeleteMatcher(BiFunction<T, StructProjection, StructLike> extractEq,
                                     List<Pair<StructProjection, StructLikeSet>> deleteSets) {
    this.extractEqStruct = extractEq;
    this.deleteSets = deleteSets;
  }

  @Override
  protected boolean shouldKeep(T row) {
    // Keep (i.e. emit as "matched") any row that hits at least one delete set.
    for (Pair<StructProjection, StructLikeSet> deleteSet : deleteSets) {
      if (deleteSet.second().contains(extractEqStruct.apply(row, deleteSet.first()))) {
        return true;
      }
    }
    return false;
  }
}

PS: For delete files with the same equality field IDs we will collect the deletes into one set.
Let me post it first.
> I think it would not iterate the data set several times since these are iterable chains and should be computed lazily.
That's incorrect. To analyze the complexity, we only need to consider the key statement:

Deletes.match(remainRecords, record -> projectRow.wrap(asStructLike(record)), deleteSet)

The final returned matchedRecords is composed of several of the above Iterables. When we iterate this Iterable, we will scan all the elements in remainRecords, so in the end we will scan the original data set multiple times. That's why I said the complexity is too high.
Thanks @openinx for reviewing! I will update ASAP.
 * @param deletesToAdd files that will be added, cannot be null or empty.
 * @return this for method chaining
 */
RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd);
Before we start replacing equality deletes with position deletes, I think we need to refactor the RewriteFiles API to cover more cases (a rough sketch of such an API follows this list):
- Rewriting data files and removing all the delete rows: the files to delete will be a set of data files and a set of delete files, and the files to add will be a set of data files.
- Replacing equality deletes with position deletes: the files to delete will be a set of equality delete files (will we need to ensure that all delete files are equality delete files?), and the files to add will be a set of position delete files.
- Merging small delete files into bigger delete files: the files to delete will be a set of equality/position delete files, and the files to add will be a set of equality/position delete files.
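A rough sketch of what such an extended interface could look like; the exact method names and signatures here are assumptions, not the final API:

```java
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.SnapshotUpdate;

public interface RewriteFiles extends SnapshotUpdate<RewriteFiles> {
  // Case 1: rewrite data files and drop the delete rows that applied to them.
  RewriteFiles rewriteFiles(Set<DataFile> dataFilesToDelete, Set<DeleteFile> deleteFilesToDelete,
                            Set<DataFile> dataFilesToAdd);

  // Cases 2 and 3: replace equality deletes with position deletes, or merge
  // small delete files into bigger ones.
  RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd);
}
```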
That makes sense to me. I think we could parallelize the API refactoring and the implementation.
return filter.filter(rows);
}

public static <T> CloseableIterable<T> match(CloseableIterable<T> rows,
match doesn't seem like a good name to me for expressing the meaning of finding the existing row data that hits the equality delete sets. I may need a better name for this.
openinx left a comment:
The design looks good to me overall now. I think it's time to split the bigger PR into several smaller PRs for review. FYI @rdblue @aokolnychyi.
try (CloseableIterator<FileScanTask> iterator = tasksIter) {
  iterator.forEachRemaining(task -> {
    StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition());
    if (TableScanUtil.hasDeletes(task)) {
Here the task must have at least one delete file, because the Collection<FileScanTask> was obtained by filtering for EQUALITY_DELETES delete files.
deleteRowReader.close();
deleteRowReader = null;

posDeleteWriter.close();
We don't have to close the posDeleteWriter here because the following complete() will close it internally.
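A minimal sketch of the suggested flow, assuming complete() flushes and closes the underlying writer before returning the produced files (variable names taken from the quoted diff):

```java
deleteRowReader.close();
deleteRowReader = null;

// No explicit posDeleteWriter.close() is needed: complete() closes the writer
// internally and returns the position delete files that were written.
List<DeleteFile> completedDeletes = posDeleteWriter.complete();
```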
PartitionKey key = new PartitionKey(spec, schema);
key.partition(task.first());
Could we just pass the PartitionKey when calling groupTasksByPartition in ReplaceDeleteAction? Then we wouldn't have to compute the partition again here; task.first() is actually already the partition value for the current task.
You are right! The original logic here has a problem: it passes the partition value to PartitionKey, which expects a data row. I updated the writer constructor to accept a StructLike instead of a PartitionKey to fix this. Let me update the unit tests as well to cover this.
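A hypothetical sketch of that constructor change (the class name is made up for illustration): accept the already-computed partition value as a StructLike instead of re-deriving a PartitionKey from it.

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

class PartitionedDeleteWriter {
  private final PartitionSpec spec;
  private final StructLike partition;

  // The caller passes task.first() (the grouped partition value) straight through,
  // instead of wrapping it in a PartitionKey that expects a full data row.
  PartitionedDeleteWriter(PartitionSpec spec, StructLike partition) {
    this.spec = spec;
    this.partition = partition;
  }
}
```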
public RewriteFiles rewriteDeletes(Set<DeleteFile> deletesToDelete, Set<DeleteFile> deletesToAdd) {
  Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(),
      "Files to delete cannot be null or empty");
  Preconditions.checkArgument(deletesToAdd != null && !deletesToAdd.isEmpty(),
This check is incorrect, because if none of the equality deletes hits the data files, then there will be no position deletes to produce.
I would suggest adding a unit test for this.
I understand your concern. The check is used to discard an invalid rewrite; we don't want to continue the rewrite if there are no position deletes produced, do we?
This kind of rewrite is actually valid because it replaces all of the useless equality delete files with empty position delete files. After the rewrite action, the normal read path doesn't have to filter against the useless equality deletes again, which is a great performance improvement. So we do have to commit the RewriteFiles transaction here.
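A minimal sketch of the relaxed validation being discussed (not necessarily the exact check in the linked PR): the set of deletes to remove must be non-empty, but the replacement set may be empty when no equality delete matches any row.

```java
Preconditions.checkArgument(deletesToDelete != null && !deletesToDelete.isEmpty(),
    "Files to delete cannot be null or empty");
// An empty deletesToAdd is allowed: the rewrite still drops useless equality deletes.
Preconditions.checkArgument(deletesToAdd != null, "Files to add cannot be null");
```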
You could see the validation in the extended RewriteFiles API here ( https://github.com/apache/iceberg/pull/2294/files#diff-b92a78b7fb207d4979d503a442189d9d096e4d19519a4b83eed9e1e779843810R68)
Makes sense to me! I will update then.
}

private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
public CloseableIterable<T> matchEqDeletes(CloseableIterable<T> records) {
Looks like this method is mostly the same as applyEqDeletes except for the predicate evaluation, do we want to abstract the common logic out?
Yes, in this separate PR, we've abstracted them into a single method. https://github.com/apache/iceberg/pull/2320/files#diff-a6641d31cdfd66835b3447bef04be87786849126b07761e47b852837f67a988aR151
deleteSetFilters.add(predicate);
}

Filter<T> findDeleteRows = new ChainOrFilter<>(deleteSetFilters);
Do we need an extra class for this? This seems to be achievable via something like
return CloseableIterable.filter(records, record ->
    deleteSetFilters.stream().anyMatch(filter -> filter.test(record)));
We've removed the ChainOrFilter in the committed PR #2320; the reviewed patch should have addressed your concern.
DeleteRewriter deleteRewriter = new DeleteRewriter(table, caseSensitive, io, encryption);
List<DeleteFile> posDeletes = deleteRewriter.toPosDeletes(taskRDD);

if (!eqDeletes.isEmpty() && !posDeletes.isEmpty()) {
I think the comment "This kind of rewrite is actually valid because it replaces all the useless equality files with empty position delete files" also applies here, so we don't need to check for empty posDeletes?
Yeah, you're correct!
// update the current file for Spark's filename() function
InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());

return matches.matchEqDeletes(open(task, requiredSchema, idToConstant)).iterator();
Looks like this is mostly the only line that differs between this class and RowDataReader, so I think we can abstract a lot of the code out.
The newly introduced EqualityDeleteReader should have addressed your comment: https://github.com/apache/iceberg/pull/2320/files#diff-6dc9ab9ec3abcb1972bc39e5c0f0fa95b00a822c0b8996b3c94d2dc702381fe4R34
Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());

DeleteRewriter deleteRewriter = new DeleteRewriter(table, caseSensitive, io, encryption);
List<DeleteFile> posDeletes = deleteRewriter.toPosDeletes(taskRDD);
+1
@yyanyy Thanks a lot for your review! I will update ASAP.
@chenjunjiedada any update? Thanks.
Why was this MR closed?
This adds a Spark action to replace equality deletes with position deletes, which I think of as a minor compaction. The logic is:
This adds an API in RewriteFiles to rewrite equality deletes to position deletes. It should keep the same semantics as the current API: the rows must be the same before and after the rewrite. This could also be used to combine position deletes to reduce the number of small files.
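A minimal usage sketch, assuming the rewriteDeletes API proposed in this PR, where eqDeletes are the equality delete files being replaced and posDeletes are the newly written position delete files:

```java
table.newRewrite()
    .rewriteDeletes(Sets.newHashSet(eqDeletes), Sets.newHashSet(posDeletes))
    .commit();
```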