Spark 3.3 write to branch snapshot #6651
Changes from 27 commits
```diff
@@ -27,6 +27,7 @@
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -324,4 +325,12 @@ public boolean caseSensitive() {
         .defaultValue(SQLConf.CASE_SENSITIVE().defaultValueString())
         .parse();
   }
+
+  public String branch() {
+    return confParser
+        .stringConf()
+        .option(SparkWriteOptions.BRANCH)
+        .defaultValue(SnapshotRef.MAIN_BRANCH)
+        .parse();
+  }
 }
```
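As a usage sketch (hedged; the dataset df, table path, and branch name below are placeholders, not taken from this PR), the new option is the write-side counterpart of the read branch option: when SparkWriteOptions.BRANCH is not supplied, the branch() conf added above falls back to SnapshotRef.MAIN_BRANCH, so existing writes keep targeting main.

```java
// Assumes an active SparkSession and an existing Dataset<Row> named df whose schema
// matches the target Iceberg table; the path and branch name are illustrative only.

// Explicit branch write: the branch() conf resolves to "test-branch".
df.write()
    .format("iceberg")
    .option(SparkWriteOptions.BRANCH, "test-branch")
    .mode("append")
    .save("/tmp/warehouse/db/table");

// No branch option: branch() falls back to SnapshotRef.MAIN_BRANCH ("main"),
// so the write behaves exactly as it did before this change.
df.write()
    .format("iceberg")
    .mode("append")
    .save("/tmp/warehouse/db/table");
```

The test changes further down in this diff use the same pattern through DataFrameWriter#option.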
```diff
@@ -52,6 +52,7 @@
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
```
```diff
@@ -250,7 +251,9 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
   @Override
   public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
     Preconditions.checkArgument(
-        snapshotId == null, "Cannot write to table at a specific snapshot: %s", snapshotId);
```
Contributor: Why is this no longer valid? I think that we do not want to write to a specific snapshot. Is the branch somehow passed as the snapshot ID?

Contributor: After looking into this a bit more, I think this is incorrect. The …

Contributor (Author): @rdblue Can we add more checks so that if the snapshot ID is the tip of the branch, then writing to the branch is supported? I believe when we do …, we are calling …, and when passing …, the snapshotId() is getting set.

Contributor: Looks like this isn't an issue. I reverted this change and ran …

Contributor (Author): @rdblue @amogh-jahagirdar If the bug fix for reading by snapshot ref (#6717) gets merged, then writing to a branch snapshot will fail, as per the test in TestDeleteFrom.java. That's because of the above condition. I feel we have to tweak the condition if it is going to stay.

Contributor: Actually it seems the issue is that …

Contributor: @namrathamyske Yeah, just updated to use the name.

Contributor (Author): But I think we can't disregard calling loadTable with respect to the ref passed. Later in the future, when we implement session configs, for testing:
iceberg/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java Line 260 in 32a8ef5
iceberg/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java Line 424 in 32a8ef5
iceberg/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java Line 393 in 32a8ef5

Contributor: Good point @namrathamyske, I was a bit short-sighted: we actually do want to leverage the statistics for the specific snapshot for writes. These statistics would be used during the scan itself (for example, MERGE INTO a branch). So either we 1) find a good way to differentiate between a time-travel query, where the write shouldn't be applied, and an intentional write to a branch, or 2) just relax the check that a snapshot is set, as you did earlier.

Contributor (Author): @rdblue @amogh-jahagirdar @jackye1995 This is still an open item for this PR to get merged. I would prefer to go with the second option, but let me know otherwise!
```diff
+        snapshotId == null || info.options().get(SparkWriteOptions.BRANCH) != null,
+        "Cannot write to table at a specific snapshot: %s",
+        snapshotId);
     return new SparkWriteBuilder(sparkSession(), icebergTable, info);
   }
```
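To make the relaxed guard concrete, here is a minimal standalone sketch (illustrative only; the real check lives in newWriteBuilder above and reads the option from LogicalWriteInfo#options()): a pinned snapshot still rejects the write unless the caller explicitly asked for a branch, in which case the snapshot ID is just the resolved branch tip.

```java
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkWriteOptions;

// Standalone sketch of the relaxed write guard discussed in the thread above; the class
// and method names here are made up for illustration.
public class BranchWriteGuardSketch {

  static void checkWriteAllowed(Long snapshotId, Map<String, String> writeOptions) {
    // A pinned snapshot normally means time travel, which must not accept writes.
    // An explicit branch option is the exception: the snapshot is only the branch tip.
    Preconditions.checkArgument(
        snapshotId == null || writeOptions.get(SparkWriteOptions.BRANCH) != null,
        "Cannot write to table at a specific snapshot: %s",
        snapshotId);
  }

  public static void main(String[] args) {
    checkWriteAllowed(null, Map.of()); // plain write, no snapshot pinned: allowed
    checkWriteAllowed(123L, Map.of(SparkWriteOptions.BRANCH, "audit-branch")); // branch write: allowed
    checkWriteAllowed(123L, Map.of()); // time travel without a branch option: throws
  }
}
```

Option 2 from the thread (just relaxing the check) is exactly what the added condition does; distinguishing a genuine time-travel write attempt would require more state than this sketch models.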
```diff
@@ -36,6 +36,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
```
```diff
@@ -44,13 +45,15 @@
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.spark.data.AvroDataTest;
 import org.apache.iceberg.spark.data.RandomData;
 import org.apache.iceberg.spark.data.SparkAvroReader;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.SparkException;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaRDD;
```
```diff
@@ -156,6 +159,18 @@ public void testWriteWithCustomDataLocation() throws IOException {
     writeAndValidateWithLocations(table, location, tablePropertyDataLocation);
   }

+  @Test
+  public void testBranchWriteWithCustomDataLocation() throws IOException {
+    File location = createTableFolder();
+    File tablePropertyDataLocation = temp.newFolder("test-table-property-data-dir");
+    Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()), location);
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath())
+        .commit();
+    writeAndValidateWithLocations(table, location, tablePropertyDataLocation, "test-branch");
+  }
+
   private File createTableFolder() throws IOException {
     File parent = temp.newFolder("parquet");
     File location = new File(parent, "test");
```
```diff
@@ -170,16 +185,21 @@ private Table createTable(Schema schema, File location) {
   private void writeAndValidateWithLocations(Table table, File location, File expectedDataDir)
       throws IOException {
+    writeAndValidateWithLocations(table, location, expectedDataDir, SnapshotRef.MAIN_BRANCH);
+  }
+
+  private void writeAndValidateWithLocations(
+      Table table, File location, File expectedDataDir, String branch) throws IOException {
     Schema tableSchema = table.schema(); // use the table schema because ids are reassigned

     table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();

     Iterable<Record> expected = RandomData.generate(tableSchema, 100, 0L);
-    writeData(expected, tableSchema, location.toString());
+    writeData(expected, tableSchema, location.toString(), branch);

     table.refresh();

-    List<Row> actual = readTable(location.toString());
+    List<Row> actual = readTable(location.toString(), branch);

     Iterator<Record> expectedIter = expected.iterator();
     Iterator<Row> actualIter = actual.iterator();
```
```diff
@@ -189,8 +209,7 @@ private void writeAndValidateWithLocations(Table table, File location, File expe
     Assert.assertEquals(
         "Both iterators should be exhausted", expectedIter.hasNext(), actualIter.hasNext());

-    table
-        .currentSnapshot()
+    SnapshotUtil.latestSnapshot(table, branch)
         .addedDataFiles(table.io())
         .forEach(
             dataFile ->
```
```diff
@@ -204,15 +223,26 @@ private void writeAndValidateWithLocations(Table table, File location, File expe
   }

   private List<Row> readTable(String location) {
-    Dataset<Row> result = spark.read().format("iceberg").load(location);
+    return readTable(location, SnapshotRef.MAIN_BRANCH);
+  }
+
+  private List<Row> readTable(String location, String branch) {
+    Dataset<Row> result =
+        spark.read().format("iceberg").option(SparkReadOptions.BRANCH, branch).load(location);
```
Contributor: Minor: in tests, I'd generally prefer not setting the branch option when we intend to write to main; otherwise we're not testing the default case. I know that it is currently equivalent, but it seems like a gap that could eventually introduce errors if all of our tests use a specific branch.

Contributor: Good point. Is it alright if I get this in a follow-on PR before the release? That way that PR can focus on how we want to organize branch tests, which I think is a more fundamental question.
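A hedged sketch of what that follow-up could look like (the helper below is an assumption, not part of this PR; it is written to slot into the test class above, which already imports Dataset, Row, DataFrameWriter, SnapshotRef, and SparkWriteOptions): only set the branch option when a test really targets a non-main branch, so the option-less default path stays covered.

```java
// Assumed helper (not in this PR): skip SparkWriteOptions.BRANCH when the target is main,
// so writes to main keep exercising the default code path.
private DataFrameWriter<Row> newBranchAwareWriter(Dataset<Row> df, String branch) {
  DataFrameWriter<Row> writer = df.write().format("iceberg").mode("append");
  if (!SnapshotRef.MAIN_BRANCH.equals(branch)) {
    writer = writer.option(SparkWriteOptions.BRANCH, branch);
  }
  return writer;
}
```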
```diff

     return result.collectAsList();
   }

   private void writeData(Iterable<Record> records, Schema schema, String location)
       throws IOException {
+    writeData(records, schema, location, SnapshotRef.MAIN_BRANCH);
+  }
+
+  private void writeData(Iterable<Record> records, Schema schema, String location, String branch)
+      throws IOException {
     Dataset<Row> df = createDataset(records, schema);
-    DataFrameWriter<?> writer = df.write().format("iceberg").mode("append");
+    DataFrameWriter<?> writer =
+        df.write().format("iceberg").option(SparkWriteOptions.BRANCH, branch).mode("append");
     writer.save(location);
   }
```
@namrathamyske @rdblue @aokolnychyi @jackye1995 I'm removing this check because it prevents writing to new branches. Catalog#loadTable gets called in Spark when planning the write, and we fail the validation check that the branch snapshot exists. I added a test to validate that if a read on an invalid branch is performed, we still fail (albeit later, when trying to build the scan).
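A hedged sketch of the kind of test described here (the test name, branch name, table location, and asserted exception are assumptions, not the PR's actual code): reading a branch that does not exist should still fail once the scan is built, even with the write-path check removed.

```java
// Assumed test shape (illustrative only): the concrete exception type and message that
// Iceberg raises for an unknown ref may differ from what is asserted below.
@Test
public void testReadFromNonExistingBranchFails() {
  org.assertj.core.api.Assertions.assertThatThrownBy(
          () ->
              spark
                  .read()
                  .format("iceberg")
                  .option(SparkReadOptions.BRANCH, "branch-that-does-not-exist")
                  .load(tableLocation) // tableLocation: path to an existing test table
                  .collectAsList())
      .isInstanceOf(Exception.class)
      .hasMessageContaining("branch-that-does-not-exist");
}
```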