Spark 4.0: Refactor Spark procedures to consistently use ProcedureInput for parameter handling. #13913
Conversation
Force-pushed from 02799a1 to bcf5ba1 ("…t for parameter handling.")
dramaticlly left a comment:
Thanks for the refactoring; some nitpicks:
Resolved review comments (now outdated) on:
- ...4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java
- ...k/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
- ...v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
- ...k/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
- spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java
```java
Integer specId = args.isNullAt(2) ? null : args.getInt(2);
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
Identifier tableIdent = toIdentifier(input.asString(TABLE_PARAM), TABLE_PARAM.name());
Boolean useCaching = input.asBoolean(USE_CACHING_PARAM, false);
```
Let's use a primitive boolean here as well.
Resolved review comments (now outdated) on:
- ...4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
- ....0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
- ...4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
```java
public Long[] asLongArray(ProcedureParameter param) {
  Long[] value = asLongArray(param, null);
  Preconditions.checkArgument(value != null, "Parameter '%s' is not set", param.name());
  return value;
}
```
This doesn't look needed for this change; let's add it later when the need arises.
@dramaticlly Thank you very much for reviewing the code and for your valuable suggestions! I have adopted most of them. Regarding the suggestion to change the field types: this PR mainly focuses on improving input parameter parsing, and changing the field types might require adjustments to the execution logic, so to stay consistent with the original author's setup I would prefer not to make that change. Is that acceptable?
Thanks @slfan1989. Generally, splitting changes into multiple pull requests is acceptable. However, for Spark we want to keep the supported Spark versions in sync, so we will likely need backport PRs to make the same change for Spark 3.4 and 3.5, and many PRs with backports can fan out quickly. The suggested change from Boolean to boolean should be relatively straightforward, and our existing unit tests should catch any problem if coverage is right; otherwise there's a bigger problem in our procedure tests. Please let me know if this changes your mind.
@dramaticlly Thank you for your explanation! That makes sense; I will improve the code based on your suggestions.
@nastra Could you please help review this PR? Thank you very much! It unifies input parameter parsing using ProcedureInput to enhance consistency and maintainability. cc: @dramaticlly
@nastra Could you kindly review this PR? It standardizes input parameter parsing across all stored procedures, so parameter parsing will be consistent going forward. Thank you very much!
```java
  return value;
}

public Long asTimestampLong(ProcedureParameter param, Long defaultValue) {
```
This should be called asTimestampMillis.
Thank you for reviewing the code. I will make the adjustments based on your suggestions.
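As background on the rename, a minimal sketch of what a timestamp parameter helper typically does: it converts the user-supplied timestamp into epoch milliseconds, which is why asTimestampMillis names the behavior more precisely than asTimestampLong. The class and method here are illustrative, not Iceberg's actual implementation.

```java
import java.time.Instant;

public class TimestampParamDemo {

  // Hypothetical helper: parse an ISO-8601 timestamp argument into the
  // epoch-millisecond long that snapshot-expiration logic consumes.
  static long asTimestampMillis(String raw) {
    return Instant.parse(raw).toEpochMilli();
  }

  public static void main(String[] args) {
    // One second past the epoch is 1000 milliseconds.
    System.out.println(asTimestampMillis("1970-01-01T00:00:01Z")); // 1000
  }
}
```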
```java
  return args.isNullAt(ordinal) ? defaultValue : (Integer) args.getInt(ordinal);
}

public long asTimestampLong(ProcedureParameter param) {
```
Suggested change:
```diff
- public long asTimestampLong(ProcedureParameter param) {
+ public long asTimestampMillis(ProcedureParameter param) {
```
```java
Integer retainLastNum = input.asInt(RETAIN_LAST_PARAM, null);
Integer maxConcurrentDeletes = input.asInt(MAX_CONCURRENT_DELETES_PARAM, null);
boolean streamResult = input.asBoolean(STREAM_RESULTS_PARAM, false);
Long[] snapshotIds = input.asLongArray(SNAPSHOT_IDS_PARAM, null);
```
Why not make these return long[] instead of Long[]?
Thank you for your suggestion. I have adjusted the return value to long[] to maintain consistency with the original usage.
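The conversion under discussion can be sketched as a simple unboxing step: a boxed Long[] parameter is turned into the primitive long[] that downstream APIs expect. This is an illustrative stand-alone snippet, not the actual ProcedureInput code.

```java
import java.util.Arrays;

public class UnboxDemo {

  // Hypothetical helper: unbox Long[] to long[]. A null element would throw
  // a NullPointerException here, which is acceptable when the parameter is
  // validated to be fully populated before conversion.
  static long[] toPrimitive(Long[] boxed) {
    return Arrays.stream(boxed).mapToLong(Long::longValue).toArray();
  }

  public static void main(String[] args) {
    long[] ids = toPrimitive(new Long[] {1L, 2L, 3L});
    System.out.println(Arrays.toString(ids)); // [1, 2, 3]
  }
}
```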
```java
Integer maxConcurrentDeletes = input.asInt(MAX_CONCURRENT_DELETES_PARAM, null);
boolean streamResult = input.asBoolean(STREAM_RESULTS_PARAM, false);
Long[] snapshotIds = input.asLongArray(SNAPSHOT_IDS_PARAM, null);
boolean cleanExpiredMetadata = input.asBoolean(CLEAN_EXPIRED_METADATA_PARAM, false);
```
Generally speaking, we need to make sure we don't change return types, as this changes semantics. Previously the code expected a Boolean and now it gets a boolean. If the caller didn't define the parameter, we would rely on the API's default value instead of passing true/false explicitly. That's also why the null checks further below existed.
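The semantic difference being discussed can be sketched in isolation: with a boxed Boolean, null means "the caller didn't set the flag", so the procedure leaves the underlying API's own default untouched, whereas a primitive boolean with a false default always overrides it. The Action class below is a hypothetical stand-in, not Iceberg's actual API.

```java
public class BoxedFlagDemo {

  // Stand-in for a core action whose internal default must survive when the
  // user provides no input.
  static class Action {
    boolean streamResults = true; // the core API's own default

    Action streamResults(boolean value) {
      this.streamResults = value;
      return this;
    }
  }

  // Boxed Boolean: only forward an explicit user choice to the action.
  static Action configure(Boolean streamResults) {
    Action action = new Action();
    if (streamResults != null) {
      action.streamResults(streamResults);
    }
    return action;
  }

  public static void main(String[] args) {
    // Parameter unset: the core default (true) survives.
    System.out.println(configure(null).streamResults); // true
    // Explicit false: the caller's choice is forwarded.
    System.out.println(configure(false).streamResults); // false
  }
}
```

With a primitive boolean defaulting to false, the first case would silently flip to false, which is exactly the behavior change the review is flagging.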
Good catch on the boolean handling. I actually suggested @slfan1989 convert these to primitive booleans because all the procedures here use boolean parameters as explicit enablement flags, where null defaults to disabled. This gives us a chance to clean up the if (someBoolean != null) pattern below when setting the corresponding flags. Since we typically backport changes from the latest Spark version to older ones, making this change now should save us some backport work later, even though it means a bit more review effort upfront. Our unit tests should catch any behavioral changes from the semantic shift.
I would be in favor of using boolean instead of Boolean, but we do have cases where we don't want to pass an argument to the underlying core API when the user didn't provide any input, and thus rely on the core API's default behavior.
Thank you for the information! From my perspective, changing it to boolean is feasible. The unit tests have all passed, and the results are as expected. Shall we go ahead and accept this improvement?
We need to switch back to using Boolean where it was Boolean originally.
> but we do actually have cases where we want to not pass an argument to the underlying core API if the user didn't provide any input and thus rely on whatever the default behavior of the core API is

From my understanding, @nastra wants us to keep the original boxed Boolean to convey user intention: an unset value of null means the flag is not applied to the underlying Spark action, whereas a primitive boolean defaulting to false would always pass the flag to the action. Sorry I missed this initially; let's switch back to Boolean as before this change and keep the changes minimal to adopt ProcedureInput. It may also be worth adding a comment to convey this.
@nastra @dramaticlly Thank you for helping review the code! I have already reverted this part of the code back to Boolean.
```java
      defaultValue);
}

public DeleteOrphanFiles.PrefixMismatchMode asPrefixMismatchMode(ProcedureParameter param) {
```
I don't think we should put this method here, as it's only used in a single place.
This part of the code has already been improved.
```diff
- boolean prefixListing = args.isNullAt(9) ? false : args.getBoolean(9);
+ boolean prefixListing = input.asBoolean(PREFIX_LISTING_PARAM, false);

  Long finalOlderThanMillis = olderThanMillis;
```
Why is this one needed? It doesn't seem that olderThanMillis is being modified here.
I also suggested this earlier in #13913 (comment)
```java
Integer specId = args.isNullAt(2) ? null : args.getInt(2);
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
Identifier tableIdent = input.ident(TABLE_PARAM);
boolean useCaching = input.asBoolean(USE_CACHING_PARAM, false);
```
Semantics are being changed here.
nastra left a comment:
The semantics of parameter types are being changed in a few places, and we need to restore the original behavior.
Resolved review comments on:
- spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java
- .../v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java (outdated)
Commit "…ures/RewriteManifestsProcedure.java — Spark 4.0: Using the default value null." Co-authored-by: Eduard Tudenhoefner <[email protected]>
@nastra Thanks for reviewing and merging the code! @dramaticlly Thanks for taking the time to review it!
Merged: "…ut for parameter handling." (apache#13913)
Description

This pull request refactors the existing Spark procedures to consistently use ProcedureInput for parameter handling. Previously, many stored procedures manually handled parameter extraction from InternalRow, leading to inconsistent code across the project. By adopting ProcedureInput, we improve code uniformity and ensure that all stored procedures use a unified approach to parameter handling.

Changes:
- Adopted ProcedureInput in the relevant stored procedure code.
- Replaced manual parameter extraction with ProcedureInput methods like asString, asLong, asInt, etc.

Benefits:
- Consistent, maintainable parameter handling through ProcedureInput's built-in functions.
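The refactoring pattern described above can be sketched in miniature. This is an illustrative stand-in, not Iceberg's actual ProcedureInput (which resolves parameters against an InternalRow and a ProcedureParameter list): instead of every procedure indexing into the raw arguments by ordinal, a small wrapper resolves parameters by name, applies defaults, and validates required ones in one place.

```java
import java.util.HashMap;
import java.util.Map;

public class ProcedureInputSketch {
  // Named arguments replace ordinal-based access like args.getInt(2).
  private final Map<String, Object> argsByName = new HashMap<>();

  ProcedureInputSketch set(String name, Object value) {
    argsByName.put(name, value);
    return this;
  }

  // Required parameter: fail fast with a clear message when unset.
  String asString(String name) {
    Object value = argsByName.get(name);
    if (value == null) {
      throw new IllegalArgumentException("Parameter '" + name + "' is not set");
    }
    return (String) value;
  }

  // Optional parameter: fall back to the caller-supplied default.
  boolean asBoolean(String name, boolean defaultValue) {
    Object value = argsByName.get(name);
    return value == null ? defaultValue : (Boolean) value;
  }

  public static void main(String[] args) {
    ProcedureInputSketch input =
        new ProcedureInputSketch().set("table", "db.events");
    System.out.println(input.asString("table")); // db.events
    System.out.println(input.asBoolean("use_caching", false)); // false
  }
}
```

Centralizing the extraction this way is what makes the procedures uniform: each one declares what it needs by name and default, and the null-handling lives in a single class instead of being repeated per procedure.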