Spark 3.3: support write to WAP branch #7050
Conversation
```java
import org.junit.Before;
import org.junit.Test;

public class TestPartitionedWritesToWAPBranch extends PartitionedWritesTestBase {
```
I did not add TestUnpartitionedWritesToWAPBranch; it seems unnecessary to add more tests because it would not exercise any additional case in the code path.
I'll take a look on Monday.
```java
}

// commit target in WAP is just the table name
// should use table + branch name instead for read
```
I don't agree with this. I think it has to read the branch if it exists.
For example, a DELETE command is going to modify the state of a branch if it exists, and it could exist because branch WAP supports multiple writes. That means the reads for both the delete itself and any dynamic pruning must read the branch if it exists and main if it doesn't.
Yeah, we need to use the branch during planning of the write to handle the dynamic pruning case; that's what came up in https://github.com/apache/iceberg/pull/6651/files. If we pass the branch through the SparkTable, that should take care of this. Whether it's the WAP branch or not doesn't matter; it just needs to read the branch.
@rdblue @amogh-jahagirdar thanks for the suggestions. I have addressed all the nit comments, fixed the scan issue, and updated the tests to verify the behavior of multiple writes. Could you take another look?
I took another pass, @jackye1995. Just a nit, but this looks great to me overall!
```java
    branch == null,
    "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
    branch,
    wapBranch);
```
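To make the semantics of the check above concrete, here is a minimal, self-contained sketch. The class and method names are illustrative only, not the actual SparkWriteConf code, which performs this validation with Guava's `Preconditions.checkArgument`:

```java
// Illustrative sketch of the branch-vs-WAP-branch mutual-exclusion check.
// Names (WapBranchCheck, resolveWriteBranch) are hypothetical.
public class WapBranchCheck {

  // Returns the branch a write should target: the WAP branch when one is
  // set in the session, otherwise the explicitly requested branch.
  // Setting both is rejected, mirroring the error message under review.
  static String resolveWriteBranch(String branch, String wapBranch) {
    if (wapBranch != null && branch != null) {
      throw new IllegalArgumentException(
          String.format(
              "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
              branch, wapBranch));
    }
    return wapBranch != null ? wapBranch : branch;
  }

  public static void main(String[] args) {
    System.out.println(resolveWriteBranch(null, "audit")); // WAP branch wins when no explicit branch
    System.out.println(resolveWriteBranch("dev", null));   // explicit branch used when no WAP branch
    try {
      resolveWriteBranch("dev", "audit");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```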
I don't think this behavior is a blocker because it is strict, but I would expect to be able to write to another branch with the WAP branch set. I'm curious what other people think the long-term behavior should be.
I think this behavior does help ensure that there are no side-effects, which is good if you want people to trust the pattern. But that's undermined by enabling/disabling WAP on a per-table basis.
Thank you! I filed issue #7103, and we can discuss it there with the relevant people.
It seems like a reasonable starting point to me.
Thanks @jackye1995! Looks great.
```java
  return inputBranch;
}

boolean wapEnabled =
```
nit: I'd prefer a separate method called wapEnabled() like we have in SparkWriteConf. Then we could use the constant for the default value and it would simplify this method.
```java
public boolean wapEnabled() {
  return confParser
      .booleanConf()
      .tableProperty(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED)
      .defaultValue(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT)
      .parse();
}
```
```java
if (wapEnabled()) {
  String wapId = wapId();
  String wapBranch =
      confParser.stringConf().sessionConf(SparkSQLProperties.WAP_BRANCH).parseOptional();
```
nit: What about a separate method like we have for wapId()?
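Following the existing wapId() pattern, such an accessor might look like the fragment below. This is a sketch only, written against the same confParser field used in the surrounding class; it is not runnable on its own, and the method name wapBranch() is a suggestion rather than the committed code:

```java
// Sketch: a dedicated accessor mirroring wapId(), assuming the same confParser field.
private String wapBranch() {
  return confParser.stringConf().sessionConf(SparkSQLProperties.WAP_BRANCH).parseOptional();
}
```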
aokolnychyi left a comment:
Late +1 from me too.
Fixes #6774
Based on #6965; this is a separate PR for writing to the WAP branch. Will rebase once that is merged.
@rdblue @aokolnychyi @amogh-jahagirdar @namrathamyske