Spark: Refactor action for expiring snapshots #2314
@RussellSpitzer @rdblue @rymurr @shardulm94 @karuppayya @flyrain, could you please review?
| public ExpireSnapshotsAction expireSnapshots() { | ||
| return new ExpireSnapshotsAction(spark, table); | ||
| BaseExpireSnapshotsSparkAction delegate = new BaseExpireSnapshotsSparkAction(spark, table); | ||
| return new ExpireSnapshotsAction(delegate); |
Are we still using ExpireSnapshotsAction, given that it is deprecated in this PR?
This is a user-facing API so we cannot break it without deprecating first.
I'm not sure about Iceberg's deprecation process. My understanding is that we need to create a new method with a new return type for this user-facing API, which users can adopt going forward. Meanwhile, we mark this method deprecated so that users stop relying on it.
Oh, yeah, we will mark Actions deprecated once we have a new alternative. We will have a totally new entry point called SparkActions in another package. Right now, I am migrating action by action to reduce the amount of changes in a single PR. Once all actions are done, I'll create a new public API for users and deprecate the rest.
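For readers following the thread, the migration approach described above (keep the old user-facing type, but have it delegate to the new implementation until it can be deprecated) can be sketched roughly as follows. This is a minimal illustration only: every class and method name ending in `Sketch`, the integer snapshot count, and the `long` return value are assumptions made for the example, not the actual Iceberg classes or signatures.

```java
// Hypothetical, simplified stand-ins for illustration; these are NOT the Iceberg classes.

/** New-style action contract (assumed shape). */
interface ExpireSnapshotsSketch {
  ExpireSnapshotsSketch retainLast(int numSnapshots);

  /** Returns how many snapshots were expired (a simplification for this sketch). */
  long execute();
}

/** New implementation that holds the actual logic. */
final class NewExpireSnapshotsSketch implements ExpireSnapshotsSketch {
  private final int totalSnapshots;
  private int retainLast = 1;

  NewExpireSnapshotsSketch(int totalSnapshots) {
    this.totalSnapshots = totalSnapshots;
  }

  @Override
  public ExpireSnapshotsSketch retainLast(int numSnapshots) {
    if (numSnapshots < 1) {
      throw new IllegalArgumentException("Must retain at least one snapshot");
    }
    this.retainLast = numSnapshots;
    return this;
  }

  @Override
  public long execute() {
    // Everything beyond the retained snapshots counts as expired.
    return Math.max(0, totalSnapshots - retainLast);
  }
}

/**
 * Old user-facing type. It keeps its public shape but forwards every call
 * to the new implementation, so existing callers keep working.
 *
 * @deprecated kept only for backward compatibility in this sketch
 */
@Deprecated
final class LegacyExpireSnapshotsSketch {
  private final ExpireSnapshotsSketch delegate;

  LegacyExpireSnapshotsSketch(ExpireSnapshotsSketch delegate) {
    this.delegate = delegate;
  }

  LegacyExpireSnapshotsSketch retainLast(int numSnapshots) {
    delegate.retainLast(numSnapshots);
    return this;
  }

  long execute() {
    return delegate.execute();
  }
}

/** Old entry point: same method name and return type as before, new internals. */
final class LegacyActionsSketch {
  private LegacyActionsSketch() {}

  @SuppressWarnings("deprecation")
  static LegacyExpireSnapshotsSketch expireSnapshots(int totalSnapshots) {
    return new LegacyExpireSnapshotsSketch(new NewExpireSnapshotsSketch(totalSnapshots));
  }
}
```

With this shape, a call such as `LegacyActionsSketch.expireSnapshots(5).retainLast(2).execute()` still compiles against the old type while the work is done by the new class, which is what allows the old API to be deprecated later without breaking users first.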
| import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS; | ||
| abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> { | ||
| public abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> { |
Do we need it to be public? Looks like there is no scope change needed in this PR.
Temporarily yes since we are using it in another package. Once we get rid of the old actions, we should be able to move this and make it non-public.
If this is public but will be removed, we should mark it deprecated so people know not to rely on it.
This class will be used even after refactoring. It may be moved to another package, though.
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.spark.actions; |
The package here is important. I am going to add other actions here too.
flyrain
left a comment
LGTM
| package org.apache.iceberg.actions; | ||
| public class BaseExpireSnapshotsActionResult implements ExpireSnapshots.Result { |
I'm grateful that the ExpireSnapshotsAction refactoring takes other compute engines, such as Flink, into account. This way, on the Flink side we don't have to abstract the common action logic between Flink and Spark ourselves; we could just extend those BaseXXAction classes to implement our own Flink actions. That's really important!
About the ExpireSnapshotsAction, I think both Flink and Spark will have the same Result. So maybe we could just use ExpireSnapshotsActionResult directly (I don't see a reason for Flink or Spark to extend this BaseExpireSnapshotsActionResult, so maybe we could just remove the Base prefix).
Here, I followed the naming we have in core like BaseFile, BaseTable, BaseRewriteManifests.
Also, there is a class called ExpireSnapshotsActionResult in this package in Spark already. We deprecate it with the introduction of this new one.
I agree query engines will most likely use the same result classes (unless they are doing something really specific).
What about making ExpireSnapshots.Result a class instead of an interface? Then we would always have the same default implementation.
I am fine converting Result into a class but I am not sure we will gain much by doing that. The current class is in the core module so it is accessible to everyone. Making Result a class may make inheritance a bit harder. I do like our current pattern of having BaseXXX classes in core and interfaces in the API module.
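The pattern being defended here (an interface in the API module, a plain `BaseXXX` implementation in core) might look roughly like the sketch below. Only `deletedDataFilesCount` appears in the diff under review; the other two counters, and all type names ending in `Sketch`, are assumptions added for illustration.

```java
// Hypothetical sketch of the "interface in API, Base class in core" pattern.
// Names ending in Sketch are illustrative, not the Iceberg types.

/** Would live in the API module: engines and users depend only on this. */
interface ExpireSnapshotsApiSketch {
  interface Result {
    long deletedDataFilesCount();

    // The two counters below are assumptions for this example.
    long deletedManifestsCount();

    long deletedManifestListsCount();
  }
}

/** Would live in core: a default, immutable implementation any engine can reuse. */
class BaseExpireSnapshotsResultSketch implements ExpireSnapshotsApiSketch.Result {
  private final long deletedDataFilesCount;
  private final long deletedManifestsCount;
  private final long deletedManifestListsCount;

  BaseExpireSnapshotsResultSketch(
      long deletedDataFilesCount, long deletedManifestsCount, long deletedManifestListsCount) {
    this.deletedDataFilesCount = deletedDataFilesCount;
    this.deletedManifestsCount = deletedManifestsCount;
    this.deletedManifestListsCount = deletedManifestListsCount;
  }

  @Override
  public long deletedDataFilesCount() {
    return deletedDataFilesCount;
  }

  @Override
  public long deletedManifestsCount() {
    return deletedManifestsCount;
  }

  @Override
  public long deletedManifestListsCount() {
    return deletedManifestListsCount;
  }
}
```

Because callers only see `ExpireSnapshotsApiSketch.Result`, an engine with unusual needs could supply its own implementation without touching the default one, which is the flexibility that turning `Result` into a concrete class would reduce.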
| public class BaseExpireSnapshotsActionResult implements ExpireSnapshots.Result { | ||
| private final long deletedDataFilesCount; |
It seems we don't consider format v2 in this patch, right? That's OK if so, because we could support v2 in a separate issue. For the RewriteDataFilesAction, @chenjunjiedada and I have prepared a few PRs to support rewrite actions for format v2.
Yep, you are correct. I kept the scope of this PR to the existing logic.
@RussellSpitzer is also working on redesigning the rewrite data files action to support sort-based compactions and per-partition compactions. Shall the four of us meet next week to discuss this?
It would be easy to extend the interface and this implementation to include other counts once we support them.
| * Deletes are still performed locally after retrieving the results from the Spark executors. | ||
| */ | ||
| @SuppressWarnings("UnnecessaryAnonymousClass") | ||
| public class BaseExpireSnapshotsSparkAction |
OK, it seems we will still need to abstract the expireSnapshotAction core logic for Flink.
Yeah, it seems like a lot of this is not Spark-specific. Could we have a more generic BaseExpireSnapshotsAction that this class extends?
I am not sure it is a good idea to build a common hierarchy for different query engines. We tried that in the past, and it led to a really weird situation with our Spark actions. For example, we can no longer use our BaseSparkAction in some cases because Java allows only single class inheritance. So some Spark actions extend it and some don't. That forced us to create more static methods in places where we don't need them. There will be more problems like this, I guess.
I agree about sharing code wherever possible, though. I'd prefer to do that using utility classes instead of building a common hierarchy. Refactoring common code into utility classes is beyond this PR. I tried to basically create a new action while keeping the backward compatibility. Also, designing utility classes seems easier when we know what parts Flink can reuse. This action heavily depends on Spark Row and metadata tables, for example.
I agree with @aokolnychyi's reasoning here. I had to write a lot of those static methods because some of our actions could descend from the base class and others couldn't. I'd prefer that each framework be allowed to have its own base class, while shared logic belongs to a class in the "core module" and is accessed either through composition or through pure functions.
I like this idea a lot more. On reflection, I can definitely see that sharing a base class across engines could be an issue. Sharing through composition and util classes makes a lot of sense to me.
+1 for utility classes rather than using the class hierarchy.
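As a rough illustration of the direction the thread converges on, the sketch below keeps a separate base class per engine and puts the shared logic in a stateless utility class instead of a common parent. All names are hypothetical, and the "files reachable before expiration minus files reachable after" computation is only an assumed example of logic that engines might share; it is not taken from this PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical shared helper that could live in a core module.
// It is a pure function: no engine types, no state.
final class ExpireSnapshotsUtilSketch {
  private ExpireSnapshotsUtilSketch() {}

  /** Returns files that were reachable before expiration but are not reachable after it. */
  static List<String> filesToDelete(Iterable<String> reachableBefore, Iterable<String> reachableAfter) {
    Set<String> stillReachable = new HashSet<>();
    for (String file : reachableAfter) {
      stillReachable.add(file);
    }
    List<String> toDelete = new ArrayList<>();
    for (String file : reachableBefore) {
      if (!stillReachable.contains(file)) {
        toDelete.add(file);
      }
    }
    return toDelete;
  }
}

// Each engine keeps its own base class, so single inheritance is never contested.
abstract class SparkBaseActionSketch {
  String engine() {
    return "spark";
  }
}

abstract class FlinkBaseActionSketch {
  String engine() {
    return "flink";
  }
}

final class SparkExpireSnapshotsSketch extends SparkBaseActionSketch {
  List<String> execute(List<String> before, List<String> after) {
    // Engine-specific planning would happen here; the shared step is a plain call.
    return ExpireSnapshotsUtilSketch.filesToDelete(before, after);
  }
}

final class FlinkExpireSnapshotsSketch extends FlinkBaseActionSketch {
  List<String> execute(List<String> before, List<String> after) {
    return ExpireSnapshotsUtilSketch.filesToDelete(before, after);
  }
}
```

Neither action needs a common superclass to reuse `filesToDelete`, so each engine remains free to extend whatever base type it requires.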
|
Overall, this looks good to me. It might be a good idea to make the |
|
Thanks everyone for reviewing! There is one open question about converting |
This PR refactors the action for expiring snapshots and moves it to the new API.