diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 5e42c2dabb41..08d1b7ccbbcf 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -63,6 +63,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -151,12 +152,15 @@ public void testCoalesceDelete() throws Exception {
 
     // set the open file cost large enough to produce a separate scan task per file
     // use range distribution to trigger a shuffle
+    // set partition-scoped deletes so that 1 delete file is written as part of the output task
     Map<String, String> tableProps =
         ImmutableMap.of(
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             DELETE_DISTRIBUTION_MODE,
-            DistributionMode.RANGE.modeName());
+            DistributionMode.RANGE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
 
     createBranchIfNeeded();
@@ -1306,10 +1310,8 @@ public void testDeleteWithMultipleSpecs() {
     Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
     if (mode(table) == COPY_ON_WRITE) {
       validateCopyOnWrite(currentSnapshot, "3", "4", "1");
-    } else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
-      validateMergeOnRead(currentSnapshot, "3", "4", null);
     } else {
-      validateMergeOnRead(currentSnapshot, "3", "3", null);
+      validateMergeOnRead(currentSnapshot, "3", "4", null);
     }
 
     assertEquals(
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 5304e6f752df..11bd6bebd66d 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
 import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
 import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
 import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE;
@@ -56,6 +57,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -233,7 +235,6 @@ public void testMergeWithVectorizedReads() {
 
   @TestTemplate
   public void testCoalesceMerge() {
-    assumeThat(formatVersion).isLessThan(3);
     createAndInitTable("id INT, salary INT, dep STRING");
 
     String[] records = new String[100];
@@ -252,7 +253,9 @@ public void testCoalesceMerge() {
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             MERGE_DISTRIBUTION_MODE,
-            DistributionMode.NONE.modeName());
+            DistributionMode.NONE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
 
     createBranchIfNeeded();
@@ -295,6 +298,9 @@ public void testCoalesceMerge() {
       // AQE detects that all shuffle blocks are small and processes them in 1 task
       // otherwise, there would be 200 tasks writing to the table
       validateProperty(currentSnapshot, SnapshotSummary.ADDED_FILES_PROP, "1");
+    } else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
+      validateProperty(currentSnapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+      validateProperty(currentSnapshot, SnapshotSummary.ADDED_DVS_PROP, "4");
     } else {
       // MoR MERGE would perform a join on `id`
       // every task has data for each of 200 reducers
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index bb82b63d208d..eae9208022a1 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -49,7 +49,7 @@ private void createTable(boolean partitioned) throws Exception {
     String partitionStmt = partitioned ? "PARTITIONED BY (id)" : "";
     sql(
         "CREATE TABLE %s (id bigint, data string) USING iceberg %s TBLPROPERTIES"
-            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read', 'write.delete.granularity'='partition')",
         tableName, partitionStmt);
 
     List<SimpleRecord> records =
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 6332e303ad67..b28ea0e286e9 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -58,6 +58,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -131,12 +132,15 @@ public void testCoalesceUpdate() {
 
     // set the open file cost large enough to produce a separate scan task per file
     // use range distribution to trigger a shuffle
+    // set partition-scoped deletes so that 1 delete file is written as part of the output task
     Map<String, String> tableProps =
         ImmutableMap.of(
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             UPDATE_DISTRIBUTION_MODE,
-            DistributionMode.RANGE.modeName());
+            DistributionMode.RANGE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
 
     createBranchIfNeeded();
@@ -440,10 +444,8 @@ public void testUpdateWithoutCondition() {
       validateProperty(currentSnapshot, CHANGED_PARTITION_COUNT_PROP, "2");
       validateProperty(currentSnapshot, DELETED_FILES_PROP, "3");
       validateProperty(currentSnapshot, ADDED_FILES_PROP, ImmutableSet.of("2", "3"));
-    } else if (mode(table) == MERGE_ON_READ && formatVersion >= 3) {
-      validateMergeOnRead(currentSnapshot, "2", "3", "2");
     } else {
-      validateMergeOnRead(currentSnapshot, "2", "2", "2");
+      validateMergeOnRead(currentSnapshot, "2", "3", "2");
     }
 
     assertEquals(
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 7ba8d558d5e7..f9fb904db394 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -714,14 +714,12 @@ private double shuffleCompressionRatio(FileFormat outputFileFormat, String outpu
   }
 
   public DeleteGranularity deleteGranularity() {
-    String valueAsString =
-        confParser
-            .stringConf()
-            .option(SparkWriteOptions.DELETE_GRANULARITY)
-            .tableProperty(TableProperties.DELETE_GRANULARITY)
-            .defaultValue(TableProperties.DELETE_GRANULARITY_DEFAULT)
-            .parse();
-    return DeleteGranularity.fromString(valueAsString);
+    return confParser
+        .enumConf(DeleteGranularity::fromString)
+        .option(SparkWriteOptions.DELETE_GRANULARITY)
+        .tableProperty(TableProperties.DELETE_GRANULARITY)
+        .defaultValue(DeleteGranularity.FILE)
+        .parse();
   }
 
   public boolean useDVs() {
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index c2df62697882..42d697410377 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -142,7 +142,7 @@ public void testDeleteGranularityDefault() {
     SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
 
     DeleteGranularity value = writeConf.deleteGranularity();
-    assertThat(value).isEqualTo(DeleteGranularity.PARTITION);
+    assertThat(value).isEqualTo(DeleteGranularity.FILE);
   }
 
   @TestTemplate
@@ -151,13 +151,13 @@ public void testDeleteGranularityTableProperty() {
 
     table
         .updateProperties()
-        .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.FILE.toString())
+        .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString())
         .commit();
 
     SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
 
     DeleteGranularity value = writeConf.deleteGranularity();
-    assertThat(value).isEqualTo(DeleteGranularity.FILE);
+    assertThat(value).isEqualTo(DeleteGranularity.PARTITION);
   }
 
   @TestTemplate
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index 8ecec5ac2d42..a2da5cc447df 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -757,7 +757,9 @@ private Map<String, String> tableProperties() {
         TableProperties.FORMAT_VERSION,
         "2",
         TableProperties.DEFAULT_FILE_FORMAT,
-        format.toString());
+        format.toString(),
+        TableProperties.DELETE_GRANULARITY,
+        DeleteGranularity.PARTITION.toString());
   }
 
   private void writeRecords(Table table, int files, int numRecords) {
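
Reviewer note: with this patch the default resolved by SparkWriteConf#deleteGranularity() flips from partition-scoped to file-scoped deletes, so tables that depend on the old grouping must opt back in explicitly. A minimal sketch of that opt-in, using only the property key and enum values already exercised by TestSparkWriteConf above (the `Table` handle is assumed to be an already-loaded Iceberg table; the class wrapper is illustrative only):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.deletes.DeleteGranularity;

    class DeleteGranularityOptIn {
      // Pin partition-scoped position deletes on one table; subsequent
      // merge-on-read DELETE/UPDATE/MERGE writes then group deletes per
      // partition instead of per referenced data file.
      static void usePartitionGranularity(Table table) {
        table
            .updateProperties()
            .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString())
            .commit();
      }
    }

The SQL equivalent is what TestRewritePositionDeleteFilesProcedure now sets at CREATE TABLE time: 'write.delete.granularity'='partition' in TBLPROPERTIES. A per-write override can still go through the SparkWriteOptions.DELETE_GRANULARITY write option, which the new enumConf-based parsing resolves the same way.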