-
Notifications
You must be signed in to change notification settings - Fork 3k
Spark 3.4: Fixup for RewritePositionDeleteFilesSparkAction #7565
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Spark 3.4: Fixup for RewritePositionDeleteFilesSparkAction #7565
Conversation
...rk/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
Outdated
Show resolved
Hide resolved
| fileGroupsByPartition.put(partition, groups); | ||
| } | ||
| }); | ||
| StructLikeMap<List<PositionDeletesScanTask>> filesPerPartition = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: filesPerPartition -> filesByPartition to match other variables?
I also wonder whether we should call it tasksByPartition but that is up to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Honestly its a bit confusing. The API's refer to files(), FileGroups, etc but the class is FileTask. At some point we will need to change from task to file terminology. I end up calling them fileTasks, not sure its the best way.
| StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition = | ||
| StructLikeMap.create(partitionType); | ||
|
|
||
| filesByPartition.forEach( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: It seems we would benefit from an easy way to transform StructLikeMap values. It is used here and below when we construct the context. Would adding something like this to StructLikeMap be an overkill?
public <U> StructLikeMap<U> transformValues(Function<T, U> func) {
StructLikeMap<U> map = create(type);
for (Entry<StructLikeWrapper, T> entry : wrapperMap.entrySet()) {
U newValue = func.apply(entry.getValue());
map.put(entry.getKey().get(), newValue);
}
return map;
}
Then the entire method fileGroupsByPartition can be expressed in one line:
tasksByPartition.transformValues(this::planFileGroups);
...
private List<List<PositionDeletesScanTask>> planFileGroups(List<PositionDeletesScanTask> tasks) {
return ImmutableList.copyOf(rewriter.planFileGroups(tasks));
}
And RewriteExuctionContext would call:
this.numGroupsByPartition = groupsByPartition.transformValues(List::size);
We would need to filter out empty file group lists but we can do that in toGroupStream. I am not sure it is worth it so I leave it up to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this idea, implemented it. I was wondering initially as its not the pattern of the static method like CloseableIterable.transform(), but I think putting on instance level makes the code shorter and even more functional and composable.
...rk/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
Outdated
Show resolved
Hide resolved
...rk/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
Show resolved
Hide resolved
| } | ||
| } | ||
|
|
||
| public <U> StructLikeMap<U> transformValues(Function<T, U> func) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your idea of making this a static method also makes sense to me. It would be more in line with Guava and other similar libs. If you decide to make it static, keep in mind the ordering of methods. Static methods should be right after the constructor.
public static <T, U> StructLikeMap<U> transformValues(StructLikeMap<T> map, Function<T, U> func) {
...
}
| } | ||
| } | ||
|
|
||
| private Map<StructLike, List<List<PositionDeletesScanTask>>> planFileGroups() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional: A note to the above snippet. What about getting rid of commitManager var and just calling the method directly?
if (partialProgressEnabled) {
return doExecuteWithPartialProgress(ctx, groupStream, commitManager());
} else {
return doExecute(ctx, groupStream, commitManager());
}
|
|
||
| return CloseableIterable.transform( | ||
| deletesTable.newBatchScan().ignoreResiduals().planFiles(), | ||
| t -> (PositionDeletesScanTask) t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: t -> task?
| t -> (PositionDeletesScanTask) t); | ||
| } | ||
|
|
||
| private StructLikeMap<List<PositionDeletesScanTask>> filesByPartition( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional: What about calling it groupByPartition?
...rk/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
Show resolved
Hide resolved
...rk/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java
Show resolved
Hide resolved
| return scanGroups.stream().map(tasks -> newRewriteGroup(ctx, partition, tasks)); | ||
| }); | ||
|
|
||
| return rewriteFileGroupStream.sorted(RewritePositionDeletesGroup.comparator(rewriteJobOrder)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional: What about getting rid of the temp var completely?
return groupsByPartition.entrySet().stream()
.filter(...)
.flatMap(...)
.sorted(RewritePositionDeletesGroup.comparator(rewriteJobOrder));
aokolnychyi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me. I like your idea of having transformValues as static but it is up to you. Please, merge whenever you are ready.
Could you also mirror these changes to the action for rewriting data files, @szehon-ho?
|
Made #7625 to mirror changes in RewriteDataFilesSparkAction |
Fixup for #7389
Ref: #7389 (review)