@@ -47,6 +47,7 @@
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -55,6 +56,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.SparkException;
@@ -1066,6 +1068,73 @@ public void testDeleteWithMultipleSpecs() {
sql("SELECT * FROM %s ORDER BY id", selectTarget()));
}

@Test
public void testDeleteToWapBranch() throws NoSuchTableException {
Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);

createAndInitPartitionedTable();
sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
append(new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));

withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
() -> {
sql("DELETE FROM %s t WHERE id=0", tableName);
Assert.assertEquals(
"Should have expected num of rows when reading table",
2L,
spark.table(tableName).count());
Assert.assertEquals(
"Should have expected num of rows when reading WAP branch",
2L,
spark.table(tableName + ".branch_wap").count());
Assert.assertEquals(
"Should not modify main branch", 3L, spark.table(tableName + ".branch_main").count());
});

withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
() -> {
sql("DELETE FROM %s t WHERE id=1", tableName);
Assert.assertEquals(
"Should have expected num of rows when reading table with multiple writes",
1L,
spark.table(tableName).count());
Assert.assertEquals(
"Should have expected num of rows when reading WAP branch with multiple writes",
1L,
spark.table(tableName + ".branch_wap").count());
Assert.assertEquals(
"Should not modify main branch with multiple writes",
3L,
spark.table(tableName + ".branch_main").count());
});
}

@Test
public void testDeleteToWapBranchWithTableBranchIdentifier() throws NoSuchTableException {
Assume.assumeTrue("Test must have branch name part in table identifier", branch != null);

createAndInitPartitionedTable();
sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
createBranchIfNeeded();

withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
() ->
Assertions.assertThatThrownBy(() -> sql("DELETE FROM %s t WHERE id=0", commitTarget()))
.isInstanceOf(ValidationException.class)
.hasMessage(
String.format(
"Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [wap]",
branch)));
}

// TODO: multiple stripes for ORC

protected void createAndInitPartitionedTable() {
@@ -47,12 +47,14 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.SparkException;
import org.apache.spark.sql.AnalysisException;
@@ -2448,6 +2450,96 @@ public void testMergeNonExistingBranch() {
.hasMessage("Cannot use branch (does not exist): test");
}

@Test
public void testMergeToWapBranch() {
Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);

createAndInitTable("id INT", "{\"id\": -1}");
ImmutableList<Object[]> originalRows = ImmutableList.of(row(-1));
sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
spark.range(0, 5).coalesce(1).createOrReplaceTempView("source");
ImmutableList<Object[]> expectedRows =
ImmutableList.of(row(-1), row(0), row(1), row(2), row(3), row(4));

withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
() -> {
sql(
"MERGE INTO %s t USING source s ON t.id = s.id "
+ "WHEN MATCHED THEN UPDATE SET *"
+ "WHEN NOT MATCHED THEN INSERT *",
tableName);
assertEquals(
"Should have expected rows when reading table",
expectedRows,
sql("SELECT * FROM %s ORDER BY id", tableName));
assertEquals(
"Should have expected rows when reading WAP branch",
expectedRows,
sql("SELECT * FROM %s.branch_wap ORDER BY id", tableName));
assertEquals(
"Should not modify main branch",
originalRows,
sql("SELECT * FROM %s.branch_main ORDER BY id", tableName));
});

spark.range(3, 6).coalesce(1).createOrReplaceTempView("source2");
ImmutableList<Object[]> expectedRows2 =
ImmutableList.of(row(-1), row(0), row(1), row(2), row(5));
withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
() -> {
sql(
"MERGE INTO %s t USING source2 s ON t.id = s.id "
+ "WHEN MATCHED THEN DELETE "
+ "WHEN NOT MATCHED THEN INSERT *",
tableName);
assertEquals(
"Should have expected rows when reading table with multiple writes",
expectedRows2,
sql("SELECT * FROM %s ORDER BY id", tableName));
assertEquals(
"Should have expected rows when reading WAP branch with multiple writes",
expectedRows2,
sql("SELECT * FROM %s.branch_wap ORDER BY id", tableName));
assertEquals(
"Should not modify main branch with multiple writes",
originalRows,
sql("SELECT * FROM %s.branch_main ORDER BY id", tableName));
});
}

@Test
public void testMergeToWapBranchWithTableBranchIdentifier() {
Assume.assumeTrue("Test must have branch name part in table identifier", branch != null);

createAndInitTable("id INT", "{\"id\": -1}");
sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
spark.range(0, 5).coalesce(1).createOrReplaceTempView("source");
ImmutableList<Object[]> expectedRows =
ImmutableList.of(row(-1), row(0), row(1), row(2), row(3), row(4));

withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
() ->
Assertions.assertThatThrownBy(
() ->
sql(
"MERGE INTO %s t USING source s ON t.id = s.id "
+ "WHEN MATCHED THEN UPDATE SET *"
+ "WHEN NOT MATCHED THEN INSERT *",
commitTarget()))
.isInstanceOf(ValidationException.class)
.hasMessage(
String.format(
"Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [wap]",
branch)));
}

private void checkJoinAndFilterConditions(String query, String join, String icebergFilters) {
// disable runtime filtering for easier validation
withSQLConf(
@@ -48,6 +48,7 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -1257,6 +1258,74 @@ public void testUpdateOnNonIcebergTableNotSupported() {
() -> sql("UPDATE %s SET c1 = -1 WHERE c2 = 1", "testtable"));
}

@Test
public void testUpdateToWAPBranch() {
Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);

createAndInitTable(
"id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"a\" }");
sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);

withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
() -> {
sql("UPDATE %s SET dep='hr' WHERE dep='a'", tableName);
Assert.assertEquals(
"Should have expected num of rows when reading table",
2L,
sql("SELECT * FROM %s WHERE dep='hr'", tableName).size());
Assert.assertEquals(
"Should have expected num of rows when reading WAP branch",
2L,
sql("SELECT * FROM %s.branch_wap WHERE dep='hr'", tableName).size());
Assert.assertEquals(
"Should not modify main branch",
1L,
sql("SELECT * FROM %s.branch_main WHERE dep='hr'", tableName).size());
});

withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
() -> {
sql("UPDATE %s SET dep='b' WHERE dep='hr'", tableName);
Assert.assertEquals(
"Should have expected num of rows when reading table with multiple writes",
2L,
sql("SELECT * FROM %s WHERE dep='b'", tableName).size());
Assert.assertEquals(
"Should have expected num of rows when reading WAP branch with multiple writes",
2L,
sql("SELECT * FROM %s.branch_wap WHERE dep='b'", tableName).size());
Assert.assertEquals(
"Should not modify main branch with multiple writes",
0L,
sql("SELECT * FROM %s.branch_main WHERE dep='b'", tableName).size());
});
}

@Test
public void testUpdateToWapBranchWithTableBranchIdentifier() {
Assume.assumeTrue("Test must have branch name part in table identifier", branch != null);

createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }");
sql(
"ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);

withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
() ->
Assertions.assertThatThrownBy(
() -> sql("UPDATE %s SET dep='hr' WHERE dep='a'", commitTarget()))
.isInstanceOf(ValidationException.class)
.hasMessage(
String.format(
"Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [wap]",
branch)));
}

private RowLevelOperationMode mode(Table table) {
String modeName = table.properties().getOrDefault(UPDATE_MODE, UPDATE_MODE_DEFAULT);
return RowLevelOperationMode.fromName(modeName);
@@ -98,7 +98,22 @@ public String branch() {
+ "got [%s] in identifier and [%s] in options",
branch,
optionBranch);
return branch != null ? branch : optionBranch;
String inputBranch = branch != null ? branch : optionBranch;
if (inputBranch != null) {
return inputBranch;
}

boolean wapEnabled =
Contributor: nit: I'd prefer a separate method called wapEnabled() like we have in SparkWriteConf. Then we could use the constant for the default value and it would simplify this method.

public boolean wapEnabled() {
  return confParser
      .booleanConf()
      .tableProperty(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED)
      .defaultValue(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT)
      .parse();
}

PropertyUtil.propertyAsBoolean(
table.properties(), TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, false);
if (wapEnabled) {
String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null);
if (wapBranch != null && table.refs().containsKey(wapBranch)) {
return wapBranch;
}
}

return null;
}

public String tag() {
@@ -55,4 +55,12 @@ private SparkSQLProperties() {}

// Controls write distribution mode
public static final String DISTRIBUTION_MODE = "spark.sql.iceberg.distribution-mode";

// Controls the WAP ID used for write-audit-publish workflow.
// When set, new snapshots will be staged with this ID in snapshot summary.
public static final String WAP_ID = "spark.wap.id";

// Controls the WAP branch used for write-audit-publish workflow.
// When set, new snapshots will be committed to this branch.
public static final String WAP_BRANCH = "spark.wap.branch";
}
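
A minimal sketch of how the two new session properties are intended to be used together with the table-level WAP flag, following the pattern in the tests above; the table name db.tbl and branch name audit are placeholders, not values from this change:

// Stage row-level changes on a WAP branch instead of main (illustration only).
spark.sql("ALTER TABLE db.tbl SET TBLPROPERTIES ('write.wap.enabled' = 'true')");
spark.conf().set(SparkSQLProperties.WAP_BRANCH, "audit");
spark.sql("DELETE FROM db.tbl WHERE id = 0");          // commits to the 'audit' branch
spark.sql("SELECT * FROM db.tbl.branch_main").show();  // main branch is left untouched
spark.conf().unset(SparkSQLProperties.WAP_BRANCH);     // resume writing to main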
@@ -30,6 +30,7 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
@@ -128,7 +129,7 @@ public boolean wapEnabled() {
}

public String wapId() {
return sessionConf.get("spark.wap.id", null);
return sessionConf.get(SparkSQLProperties.WAP_ID, null);
}

public boolean mergeSchema() {
@@ -333,6 +334,28 @@ public boolean caseSensitive() {
}

public String branch() {
if (wapEnabled()) {
String wapId = wapId();
String wapBranch =
confParser.stringConf().sessionConf(SparkSQLProperties.WAP_BRANCH).parseOptional();
Contributor: nit: What about a separate method like we have for wapId()?

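A sketch of what such a helper might look like, mirroring wapId() and reusing the confParser call from this change; the method is only an illustration of the reviewer's suggestion, not code in this PR:

public String wapBranch() {
  return confParser.stringConf().sessionConf(SparkSQLProperties.WAP_BRANCH).parseOptional();
}
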

ValidationException.check(
wapId == null || wapBranch == null,
"Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
wapId,
wapBranch);

if (wapBranch != null) {
ValidationException.check(
branch == null,
"Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
branch,
wapBranch);
Contributor: I don't think this behavior is a blocker because it is strict, but I would expect to be able to write to another branch with the WAP branch set. I'm curious what other people think the long-term behavior should be.

I think this behavior does help ensure that there are no side effects, which is good if you want people to trust the pattern. But that's undermined by enabling/disabling WAP on a per-table basis.

Contributor Author: Thank you! I added issue #7103 so we can discuss it there with the relevant people.

Contributor: It seems like a reasonable starting point to me.


return wapBranch;
}
}

return branch;
}
}
@@ -67,6 +67,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.StringToFileURI;
import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
@@ -319,7 +320,7 @@ public void testWapFilesAreKept() throws InterruptedException {
// normal write
df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);

spark.conf().set("spark.wap.id", "1");
spark.conf().set(SparkSQLProperties.WAP_ID, "1");

// wap write
df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);