diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 336d40cca0e3..359b229eda02 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -108,6 +108,67 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testCoalesceDelete() throws Exception {
+    createAndInitUnpartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use range distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.RANGE.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size big enough to trigger combining
+    // set the number of shuffle partitions to 200 to distribute the work across reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "200",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be range distributed by `_file`, `_pos`
+      // every task has data for each of 200 reducers
+      // AQE detects that all shuffle blocks are small and processes them in 1 task
+      // otherwise, there would be 200 tasks writing to the table
+      validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "1");
+    } else {
+      // MoR DELETE requests the deleted records to be range distributed by partition and `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 200 reducers
+      // AQE detects that all 4 shuffle blocks are small and processes them in 1 task
+      // otherwise, there would be 4 tasks processing 1 shuffle block each
+      validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "1");
+    }
+
+    Assert.assertEquals(
+        "Row count must match", 200L, scalarSql("SELECT COUNT(*) FROM %s", commitTarget()));
+  }
+
   @Test
   public void testSkewDelete() throws Exception {
     createAndInitPartitionedTable();
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 18d42ca6ae85..c008017d50af 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -97,6 +97,75 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS source");
   }
 
+  @Test
+  public void testCoalesceMerge() {
+    createAndInitTable("id INT, salary INT, dep STRING");
+
+    String[] records = new String[100];
+    for (int index = 0; index < 100; index++) {
+      records[index] = String.format("{ \"id\": %d, \"salary\": 100, \"dep\": \"hr\" }", index);
+    }
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // disable any write distribution
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            MERGE_DISTRIBUTION_MODE,
+            DistributionMode.NONE.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    spark.range(0, 100).createOrReplaceTempView("source");
+
+    // enable AQE and set the advisory partition big enough to trigger combining
+    // set the number of shuffle partitions to 200 to distribute the work across reducers
+    // disable broadcast joins to make sure the join triggers a shuffle
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "200",
+            SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(), "-1",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
+        () -> {
+          sql(
+              "MERGE INTO %s t USING source "
+                  + "ON t.id = source.id "
+                  + "WHEN MATCHED THEN "
+                  + " UPDATE SET salary = -1 ",
+              commitTarget());
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW MERGE would perform a join on `id`
+      // every task has data for each of 200 reducers
+      // AQE detects that all shuffle blocks are small and processes them in 1 task
+      // otherwise, there would be 200 tasks writing to the table
+      validateProperty(currentSnapshot, SnapshotSummary.ADDED_FILES_PROP, "1");
+    } else {
+      // MoR MERGE would perform a join on `id`
+      // every task has data for each of 200 reducers
+      // AQE detects that all shuffle blocks are small and processes them in 1 task
+      // otherwise, there would be 200 tasks writing to the table
+      validateProperty(currentSnapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "1");
+    }
+
+    Assert.assertEquals(
+        "Row count must match",
+        400L,
+        scalarSql("SELECT COUNT(*) FROM %s WHERE salary = -1", commitTarget()));
+  }
+
   @Test
   public void testSkewMerge() {
     createAndInitTable("id INT, salary INT, dep STRING");
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index ccfd83c73303..b05be6452a7e 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -102,6 +102,69 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS deleted_employee");
   }
 
+  @Test
+  public void testCoalesceUpdate() {
+    createAndInitTable("id INT, dep STRING");
+
+    String[] records = new String[100];
+    for (int index = 0; index < 100; index++) {
+      records[index] = String.format("{ \"id\": %d, \"dep\": \"hr\" }", index);
+    }
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+
+    // set the open file cost large enough to produce a separate scan task per file
+    // use range distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            UPDATE_DISTRIBUTION_MODE,
+            DistributionMode.RANGE.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size big enough to trigger combining
+    // set the number of shuffle partitions to 200 to distribute the work across reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "200",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget());
+          Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW UPDATE requests the updated records to be range distributed by `_file`, `_pos`
+      // every task has data for each of 200 reducers
+      // AQE detects that all shuffle blocks are small and processes them in 1 task
+      // otherwise, there would be 200 tasks writing to the table
+      validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "1");
+    } else {
+      // MoR UPDATE requests the deleted records to be range distributed by partition and `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle block
+      // that means 4 shuffle blocks are distributed among 200 reducers
+      // AQE detects that all 4 shuffle blocks are small and processes them in 1 task
+      // otherwise, there would be 4 tasks processing 1 shuffle block each
+      validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "1");
+    }
+
+    Assert.assertEquals(
+        "Row count must match",
+        200L,
+        scalarSql("SELECT COUNT(*) FROM %s WHERE id = -1", commitTarget()));
+  }
+
   @Test
   public void testSkewUpdate() {
     createAndInitTable("id INT, dep STRING");