diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle index 5b07a3c3847d..d4cb67a2bbf3 100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -166,6 +166,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer testImplementation libs.avro.avro testImplementation libs.parquet.hadoop + testImplementation libs.awaitility // Required because we remove antlr plugin dependencies from the compile configuration, see note above runtimeOnly libs.antlr.runtime diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java index e4605a765823..ea1040dcf0ac 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java @@ -45,11 +45,15 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PlanningMode; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Snapshot; @@ -69,41 +73,30 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.execution.SparkPlan; -import org.junit.Assert; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; +import org.junit.jupiter.api.extension.ExtendWith; -@RunWith(Parameterized.class) -public abstract class SparkRowLevelOperationsTestBase extends SparkExtensionsTestBase { +@ExtendWith(ParameterizedTestExtension.class) +public abstract class SparkRowLevelOperationsTestBase extends ExtensionsTestBase { private static final Random RANDOM = ThreadLocalRandom.current(); - protected final String fileFormat; - protected final boolean vectorized; - protected final String distributionMode; - protected final boolean fanoutEnabled; - protected final String branch; - protected final PlanningMode planningMode; - - public SparkRowLevelOperationsTestBase( - String catalogName, - String implementation, - Map config, - String fileFormat, - boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super(catalogName, implementation, config); - this.fileFormat = fileFormat; - this.vectorized = vectorized; - this.distributionMode = distributionMode; - this.fanoutEnabled = fanoutEnabled; - this.branch = branch; - this.planningMode = planningMode; - } + @Parameter(index = 3) + protected FileFormat fileFormat; + + @Parameter(index = 4) + protected boolean vectorized; + + @Parameter(index = 5) + protected String distributionMode; + + @Parameter(index = 6) + protected boolean fanoutEnabled; + + @Parameter(index = 7) + protected String branch; + + @Parameter(index = 8) + protected PlanningMode planningMode; @Parameters( name = @@ -118,7 +111,7 @@ public static Object[][] parameters() { ImmutableMap.of( "type", "hive", "default-namespace", "default"), - "orc", + FileFormat.ORC, true, WRITE_DISTRIBUTION_MODE_NONE, true, @@ -131,7 +124,7 @@ public static Object[][] parameters() { ImmutableMap.of( "type", "hive", "default-namespace", "default"), - "parquet", + FileFormat.PARQUET, true, WRITE_DISTRIBUTION_MODE_NONE, false, @@ -142,7 +135,7 @@ public static Object[][] parameters() { "testhadoop", SparkCatalog.class.getName(), ImmutableMap.of("type", "hadoop"), - "parquet", + FileFormat.PARQUET, RANDOM.nextBoolean(), WRITE_DISTRIBUTION_MODE_HASH, true, @@ -160,7 +153,7 @@ public static Object[][] parameters() { "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync ), - "avro", + FileFormat.AVRO, false, WRITE_DISTRIBUTION_MODE_RANGE, false, @@ -188,18 +181,18 @@ protected void initTable() { planningMode.modeName()); switch (fileFormat) { - case "parquet": + case PARQUET: sql( "ALTER TABLE %s SET TBLPROPERTIES('%s' '%b')", tableName, PARQUET_VECTORIZATION_ENABLED, vectorized); break; - case "orc": + case ORC: sql( "ALTER TABLE %s SET TBLPROPERTIES('%s' '%b')", tableName, ORC_VECTORIZATION_ENABLED, vectorized); break; - case "avro": - Assert.assertFalse(vectorized); + case AVRO: + assertThat(vectorized).isFalse(); break; } @@ -303,7 +296,7 @@ protected void validateSnapshot( String deletedDataFiles, String addedDeleteFiles, String addedDataFiles) { - Assert.assertEquals("Operation must match", operation, snapshot.operation()); + assertThat(snapshot.operation()).as("Operation must match").isEqualTo(operation); validateProperty(snapshot, CHANGED_PARTITION_COUNT_PROP, changedPartitionCount); validateProperty(snapshot, DELETED_FILES_PROP, deletedDataFiles); validateProperty(snapshot, ADDED_DELETE_FILES_PROP, addedDeleteFiles); @@ -312,20 +305,22 @@ protected void validateSnapshot( protected void validateProperty(Snapshot snapshot, String property, Set expectedValues) { String actual = snapshot.summary().get(property); - Assert.assertTrue( - "Snapshot property " - + property - + " has unexpected value, actual = " - + actual - + ", expected one of : " - + String.join(",", expectedValues), - expectedValues.contains(actual)); + assertThat(actual) + .as( + "Snapshot property " + + property + + " has unexpected value, actual = " + + actual + + ", expected one of : " + + String.join(",", expectedValues)) + .isIn(expectedValues); } protected void validateProperty(Snapshot snapshot, String property, String expectedValue) { String actual = snapshot.summary().get(property); - Assert.assertEquals( - "Snapshot property " + property + " has unexpected value.", expectedValue, actual); + assertThat(actual) + .as("Snapshot property " + property + " has unexpected value.") + .isEqualTo(expectedValue); } protected void sleep(long millis) { @@ -338,7 +333,9 @@ protected void sleep(long millis) { protected DataFile writeDataFile(Table table, List records) { try { - OutputFile file = Files.localOutput(temp.newFile()); + OutputFile file = + Files.localOutput( + temp.resolve(fileFormat.addExtension(UUID.randomUUID().toString())).toFile()); DataWriter dataWriter = Parquet.writeData(file) @@ -384,7 +381,7 @@ protected boolean supportsVectorization() { } private boolean isParquet() { - return fileFormat.equalsIgnoreCase(FileFormat.PARQUET.name()); + return fileFormat.equals(FileFormat.PARQUET); } private boolean isCopyOnWrite() { diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java index aaa275db9c16..cc2bd91cf802 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java @@ -19,8 +19,8 @@ package org.apache.iceberg.spark.extensions; import java.util.List; -import java.util.Map; import org.apache.iceberg.IsolationLevel; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -30,18 +30,15 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.functions; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; -public class TestConflictValidation extends SparkExtensionsTestBase { +@ExtendWith(ParameterizedTestExtension.class) +public class TestConflictValidation extends ExtensionsTestBase { - public TestConflictValidation( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } - - @Before + @BeforeEach public void createTables() { sql( "CREATE TABLE %s (id int, data string) USING iceberg " @@ -53,12 +50,12 @@ public void createTables() { sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName); } - @After + @AfterEach public void removeTables() { sql("DROP TABLE IF EXISTS %s", tableName); } - @Test + @TestTemplate public void testOverwriteFilterSerializableIsolation() throws Exception { Table table = validationCatalog.loadTable(tableIdent); long snapshotId = table.currentSnapshot().snapshotId(); @@ -90,7 +87,7 @@ public void testOverwriteFilterSerializableIsolation() throws Exception { .overwrite(functions.col("id").equalTo(1)); } - @Test + @TestTemplate public void testOverwriteFilterSerializableIsolation2() throws Exception { List records = Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(1, "b")); @@ -127,7 +124,7 @@ public void testOverwriteFilterSerializableIsolation2() throws Exception { .overwrite(functions.col("id").equalTo(1)); } - @Test + @TestTemplate public void testOverwriteFilterSerializableIsolation3() throws Exception { Table table = validationCatalog.loadTable(tableIdent); long snapshotId = table.currentSnapshot().snapshotId(); @@ -161,7 +158,7 @@ public void testOverwriteFilterSerializableIsolation3() throws Exception { .overwrite(functions.col("id").equalTo(1)); } - @Test + @TestTemplate public void testOverwriteFilterNoSnapshotIdValidation() throws Exception { Table table = validationCatalog.loadTable(tableIdent); @@ -192,7 +189,7 @@ public void testOverwriteFilterNoSnapshotIdValidation() throws Exception { .overwrite(functions.col("id").equalTo(1)); } - @Test + @TestTemplate public void testOverwriteFilterSnapshotIsolation() throws Exception { List records = Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(1, "b")); @@ -229,7 +226,7 @@ public void testOverwriteFilterSnapshotIsolation() throws Exception { .overwrite(functions.col("id").equalTo(1)); } - @Test + @TestTemplate public void testOverwriteFilterSnapshotIsolation2() throws Exception { Table table = validationCatalog.loadTable(tableIdent); long snapshotId = table.currentSnapshot().snapshotId(); @@ -246,7 +243,7 @@ public void testOverwriteFilterSnapshotIsolation2() throws Exception { .overwrite(functions.col("id").equalTo(1)); } - @Test + @TestTemplate public void testOverwritePartitionSerializableIsolation() throws Exception { Table table = validationCatalog.loadTable(tableIdent); final long snapshotId = table.currentSnapshot().snapshotId(); @@ -278,7 +275,7 @@ public void testOverwritePartitionSerializableIsolation() throws Exception { .overwritePartitions(); } - @Test + @TestTemplate public void testOverwritePartitionSnapshotIsolation() throws Exception { List records = Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(1, "b")); @@ -313,7 +310,7 @@ public void testOverwritePartitionSnapshotIsolation() throws Exception { .overwritePartitions(); } - @Test + @TestTemplate public void testOverwritePartitionSnapshotIsolation2() throws Exception { Table table = validationCatalog.loadTable(tableIdent); final long snapshotId = table.currentSnapshot().snapshotId(); @@ -347,7 +344,7 @@ public void testOverwritePartitionSnapshotIsolation2() throws Exception { .overwritePartitions(); } - @Test + @TestTemplate public void testOverwritePartitionSnapshotIsolation3() throws Exception { Table table = validationCatalog.loadTable(tableIdent); final long snapshotId = table.currentSnapshot().snapshotId(); @@ -364,7 +361,7 @@ public void testOverwritePartitionSnapshotIsolation3() throws Exception { .overwritePartitions(); } - @Test + @TestTemplate public void testOverwritePartitionNoSnapshotIdValidation() throws Exception { Table table = validationCatalog.loadTable(tableIdent); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java index 9ebe73da3323..edddb3bbb8fd 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java @@ -19,6 +19,8 @@ package org.apache.iceberg.spark.extensions; import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; import java.util.Collections; import java.util.Map; @@ -32,7 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; -import org.apache.iceberg.PlanningMode; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; @@ -40,7 +42,6 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkSQLProperties; @@ -49,45 +50,23 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.internal.SQLConf; import org.assertj.core.api.Assertions; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public class TestCopyOnWriteDelete extends TestDelete { - public TestCopyOnWriteDelete( - String catalogName, - String implementation, - Map config, - String fileFormat, - Boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super( - catalogName, - implementation, - config, - fileFormat, - vectorized, - distributionMode, - fanoutEnabled, - branch, - planningMode); - } - @Override protected Map extraTableProperties() { return ImmutableMap.of( TableProperties.DELETE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName()); } - @Test + @TestTemplate public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception { // this test can only be run with Hive tables as it requires a reliable lock // also, the table cache must be enabled so that the same table instance can be reused - Assume.assumeTrue(catalogName.equalsIgnoreCase("testhive")); + assumeThat(catalogName).isEqualToIgnoringCase("testhive"); createAndInitUnpartitionedTable(); createOrReplaceView("deleted_id", Collections.singletonList(1), Encoders.INT()); @@ -167,10 +146,10 @@ public synchronized void testDeleteWithConcurrentTableRefresh() throws Exception } executorService.shutdown(); - Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES)); + assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue(); } - @Test + @TestTemplate public void testRuntimeFilteringWithPreservedDataGrouping() throws NoSuchTableException { createAndInitPartitionedTable(); @@ -188,7 +167,7 @@ public void testRuntimeFilteringWithPreservedDataGrouping() throws NoSuchTableEx withSQLConf(sqlConf, () -> sql("DELETE FROM %s WHERE id = 2", commitTarget())); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 3 snapshots").hasSize(3); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); validateCopyOnWrite(currentSnapshot, "1", "1", "1"); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java index 6b6819a924e8..dfd22e96ddc1 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java @@ -19,6 +19,8 @@ package org.apache.iceberg.spark.extensions; import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; import java.util.Collections; import java.util.Map; @@ -31,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.DataFile; -import org.apache.iceberg.PlanningMode; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; @@ -39,7 +40,6 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkSQLProperties; @@ -47,45 +47,21 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.internal.SQLConf; import org.assertj.core.api.Assertions; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Test; +import org.junit.jupiter.api.TestTemplate; public class TestCopyOnWriteMerge extends TestMerge { - public TestCopyOnWriteMerge( - String catalogName, - String implementation, - Map config, - String fileFormat, - boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super( - catalogName, - implementation, - config, - fileFormat, - vectorized, - distributionMode, - fanoutEnabled, - branch, - planningMode); - } - @Override protected Map extraTableProperties() { return ImmutableMap.of( TableProperties.MERGE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName()); } - @Test + @TestTemplate public synchronized void testMergeWithConcurrentTableRefresh() throws Exception { // this test can only be run with Hive tables as it requires a reliable lock // also, the table cache must be enabled so that the same table instance can be reused - Assume.assumeTrue(catalogName.equalsIgnoreCase("testhive")); + assumeThat(catalogName).isEqualToIgnoringCase("testhive"); createAndInitTable("id INT, dep STRING"); createOrReplaceView("source", Collections.singletonList(1), Encoders.INT()); @@ -165,10 +141,10 @@ public synchronized void testMergeWithConcurrentTableRefresh() throws Exception } executorService.shutdown(); - Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES)); + assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue(); } - @Test + @TestTemplate public void testRuntimeFilteringWithReportedPartitioning() { createAndInitTable("id INT, dep STRING"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -199,7 +175,7 @@ public void testRuntimeFilteringWithReportedPartitioning() { commitTarget())); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 3 snapshots").hasSize(3); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); validateCopyOnWrite(currentSnapshot, "1", "1", "1"); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java index 4354a1019c69..e9767c66bafc 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java @@ -19,6 +19,8 @@ package org.apache.iceberg.spark.extensions; import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -31,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; -import org.apache.iceberg.PlanningMode; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; @@ -39,52 +40,27 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkSQLProperties; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.internal.SQLConf; import org.assertj.core.api.Assertions; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Test; +import org.junit.jupiter.api.TestTemplate; public class TestCopyOnWriteUpdate extends TestUpdate { - public TestCopyOnWriteUpdate( - String catalogName, - String implementation, - Map config, - String fileFormat, - boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super( - catalogName, - implementation, - config, - fileFormat, - vectorized, - distributionMode, - fanoutEnabled, - branch, - planningMode); - } - @Override protected Map extraTableProperties() { return ImmutableMap.of( TableProperties.UPDATE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName()); } - @Test + @TestTemplate public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception { // this test can only be run with Hive tables as it requires a reliable lock // also, the table cache must be enabled so that the same table instance can be reused - Assume.assumeTrue(catalogName.equalsIgnoreCase("testhive")); + assumeThat(catalogName).isEqualToIgnoringCase("testhive"); createAndInitTable("id INT, dep STRING"); @@ -163,10 +139,10 @@ public synchronized void testUpdateWithConcurrentTableRefresh() throws Exception } executorService.shutdown(); - Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES)); + assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue(); } - @Test + @TestTemplate public void testRuntimeFilteringWithReportedPartitioning() { createAndInitTable("id INT, dep STRING"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -187,7 +163,7 @@ public void testRuntimeFilteringWithReportedPartitioning() { withSQLConf(sqlConf, () -> sql("UPDATE %s SET id = -1 WHERE id = 2", commitTarget())); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 3 snapshots").hasSize(3); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); validateCopyOnWrite(currentSnapshot, "1", "1", "1"); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java index 9bc46d05713f..e9a8c13be56a 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java @@ -27,6 +27,7 @@ import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; import static org.apache.iceberg.TableProperties.SPLIT_SIZE; import static org.apache.spark.sql.functions.lit; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; import java.util.Arrays; @@ -45,8 +46,9 @@ import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.PlanningMode; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotRef; @@ -57,7 +59,6 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.spark.Spark3Util; @@ -77,42 +78,21 @@ import org.apache.spark.sql.execution.datasources.v2.OptimizeMetadataOnlyDeleteFromTable; import org.apache.spark.sql.internal.SQLConf; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public abstract class TestDelete extends SparkRowLevelOperationsTestBase { - public TestDelete( - String catalogName, - String implementation, - Map config, - String fileFormat, - Boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super( - catalogName, - implementation, - config, - fileFormat, - vectorized, - distributionMode, - fanoutEnabled, - branch, - planningMode); - } - - @BeforeClass + @BeforeAll public static void setupSparkConf() { spark.conf().set("spark.sql.shuffle.partitions", "4"); } - @After + @AfterEach public void removeTables() { sql("DROP TABLE IF EXISTS %s", tableName); sql("DROP TABLE IF EXISTS deleted_id"); @@ -120,7 +100,7 @@ public void removeTables() { sql("DROP TABLE IF EXISTS parquet_table"); } - @Test + @TestTemplate public void testDeleteWithVectorizedReads() throws NoSuchTableException { assumeThat(supportsVectorization()).isTrue(); @@ -135,7 +115,7 @@ public void testDeleteWithVectorizedReads() throws NoSuchTableException { assertAllBatchScansVectorized(plan); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 3 snapshots").hasSize(3); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); if (mode(table) == COPY_ON_WRITE) { @@ -150,7 +130,7 @@ public void testDeleteWithVectorizedReads() throws NoSuchTableException { sql("SELECT * FROM %s ORDER BY id ASC", selectTarget())); } - @Test + @TestTemplate public void testCoalesceDelete() throws Exception { createAndInitUnpartitionedTable(); @@ -193,7 +173,7 @@ public void testCoalesceDelete() throws Exception { () -> { SparkPlan plan = executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget()); - Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL"); + assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL"); }); Table table = validationCatalog.loadTable(tableIdent); @@ -214,11 +194,12 @@ public void testCoalesceDelete() throws Exception { validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "1"); } - Assert.assertEquals( - "Row count must match", 200L, scalarSql("SELECT COUNT(*) FROM %s", commitTarget())); + assertThat(scalarSql("SELECT COUNT(*) FROM %s", commitTarget())) + .as("Row count must match") + .isEqualTo(200L); } - @Test + @TestTemplate public void testSkewDelete() throws Exception { createAndInitPartitionedTable(); @@ -261,7 +242,7 @@ public void testSkewDelete() throws Exception { () -> { SparkPlan plan = executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget()); - Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL"); + assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL"); }); Table table = validationCatalog.loadTable(tableIdent); @@ -283,11 +264,12 @@ public void testSkewDelete() throws Exception { validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4"); } - Assert.assertEquals( - "Row count must match", 200L, scalarSql("SELECT COUNT(*) FROM %s", commitTarget())); + assertThat(scalarSql("SELECT COUNT(*) FROM %s", commitTarget())) + .as("Row count must match") + .isEqualTo(200L); } - @Test + @TestTemplate public void testDeleteWithoutScanningTable() throws Exception { createAndInitPartitionedTable(); @@ -308,10 +290,10 @@ public void testDeleteWithoutScanningTable() throws Exception { LogicalPlan parsed = parsePlan("DELETE FROM %s WHERE dep = 'hr'", commitTarget()); LogicalPlan analyzed = spark.sessionState().analyzer().execute(parsed); - Assertions.assertThat(analyzed).isInstanceOf(RowLevelWrite.class); + assertThat(analyzed).isInstanceOf(RowLevelWrite.class); LogicalPlan optimized = OptimizeMetadataOnlyDeleteFromTable.apply(analyzed); - Assertions.assertThat(optimized).isInstanceOf(DeleteFromTableWithFilters.class); + assertThat(optimized).isInstanceOf(DeleteFromTableWithFilters.class); }); sql("DELETE FROM %s WHERE dep = 'hr'", commitTarget()); @@ -322,9 +304,11 @@ public void testDeleteWithoutScanningTable() throws Exception { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testDeleteFileThenMetadataDelete() throws Exception { - Assume.assumeFalse("Avro does not support metadata delete", fileFormat.equals("avro")); + assumeThat(fileFormat) + .as("Avro does not support metadata delete") + .isNotEqualTo(FileFormat.AVRO); createAndInitUnpartitionedTable(); createBranchIfNeeded(); sql("INSERT INTO TABLE %s VALUES (1, 'hr'), (2, 'hardware'), (null, 'hr')", commitTarget()); @@ -339,8 +323,9 @@ public void testDeleteFileThenMetadataDelete() throws Exception { sql("DELETE FROM %s AS t WHERE t.id = 1", commitTarget()); List dataFilesAfter = TestHelpers.dataFiles(table, branch); - Assert.assertTrue( - "Data file should have been removed", dataFilesBefore.size() > dataFilesAfter.size()); + assertThat(dataFilesAfter) + .as("Data file should have been removed") + .hasSizeLessThan(dataFilesBefore.size()); assertEquals( "Should have expected rows", @@ -348,7 +333,7 @@ public void testDeleteFileThenMetadataDelete() throws Exception { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testDeleteWithPartitionedTable() throws Exception { createAndInitPartitionedTable(); @@ -364,8 +349,9 @@ public void testDeleteWithPartitionedTable() throws Exception { sql("SELECT * FROM %s ORDER BY id", tableName)); List rowLevelDeletePartitions = spark.sql("SELECT * FROM " + tableName + ".partitions ").collectAsList(); - Assert.assertEquals( - "row level delete does not reduce number of partition", 2, rowLevelDeletePartitions.size()); + assertThat(rowLevelDeletePartitions) + .as("row level delete does not reduce number of partition") + .hasSize(2); // partition aligned delete sql("DELETE FROM %s WHERE dep = 'hr'", tableName); @@ -376,11 +362,10 @@ public void testDeleteWithPartitionedTable() throws Exception { sql("SELECT * FROM %s ORDER BY id", tableName)); List actualPartitions = spark.sql("SELECT * FROM " + tableName + ".partitions ").collectAsList(); - Assert.assertEquals( - "partition aligned delete results in 1 partition", 1, actualPartitions.size()); + assertThat(actualPartitions).as("partition aligned delete results in 1 partition").hasSize(1); } - @Test + @TestTemplate public void testDeleteWithFalseCondition() { createAndInitUnpartitionedTable(); @@ -390,7 +375,7 @@ public void testDeleteWithFalseCondition() { sql("DELETE FROM %s WHERE id = 1 AND id > 20", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 2 snapshots").hasSize(2); assertEquals( "Should have expected rows", @@ -398,16 +383,16 @@ public void testDeleteWithFalseCondition() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testDeleteFromEmptyTable() { - Assume.assumeFalse("Custom branch does not exist for empty table", "test".equals(branch)); + assumeThat(branch).as("Custom branch does not exist for empty table").isNotEqualTo("test"); createAndInitUnpartitionedTable(); sql("DELETE FROM %s WHERE id IN (1)", commitTarget()); sql("DELETE FROM %s WHERE dep = 'hr'", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 2 snapshots").hasSize(2); assertEquals( "Should have expected rows", @@ -415,9 +400,9 @@ public void testDeleteFromEmptyTable() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testDeleteFromNonExistingCustomBranch() { - Assume.assumeTrue("Test only applicable to custom branch", "test".equals(branch)); + assumeThat(branch).as("Test only applicable to custom branch").isEqualTo("test"); createAndInitUnpartitionedTable(); Assertions.assertThatThrownBy(() -> sql("DELETE FROM %s WHERE id IN (1)", commitTarget())) @@ -425,7 +410,7 @@ public void testDeleteFromNonExistingCustomBranch() { .hasMessage("Cannot use branch (does not exist): test"); } - @Test + @TestTemplate public void testExplain() { createAndInitUnpartitionedTable(); @@ -437,7 +422,7 @@ public void testExplain() { sql("EXPLAIN DELETE FROM %s WHERE true", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 1 snapshot", 1, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 1 snapshot").hasSize(1); assertEquals( "Should have expected rows", @@ -445,7 +430,7 @@ public void testExplain() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", commitTarget())); } - @Test + @TestTemplate public void testDeleteWithAlias() { createAndInitUnpartitionedTable(); @@ -460,7 +445,7 @@ public void testDeleteWithAlias() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testDeleteWithDynamicFileFiltering() throws NoSuchTableException { createAndInitPartitionedTable(); @@ -471,7 +456,7 @@ public void testDeleteWithDynamicFileFiltering() throws NoSuchTableException { sql("DELETE FROM %s WHERE id = 2", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 3 snapshots").hasSize(3); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); if (mode(table) == COPY_ON_WRITE) { @@ -486,7 +471,7 @@ public void testDeleteWithDynamicFileFiltering() throws NoSuchTableException { sql("SELECT * FROM %s ORDER BY id, dep", selectTarget())); } - @Test + @TestTemplate public void testDeleteNonExistingRecords() { createAndInitPartitionedTable(); @@ -496,11 +481,11 @@ public void testDeleteNonExistingRecords() { sql("DELETE FROM %s AS t WHERE t.id > 10", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 2 snapshots").hasSize(2); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); - if (fileFormat.equals("orc") || fileFormat.equals("parquet")) { + if (fileFormat.equals(FileFormat.ORC) || fileFormat.equals(FileFormat.PARQUET)) { validateDelete(currentSnapshot, "0", null); } else { if (mode(table) == COPY_ON_WRITE) { @@ -516,7 +501,7 @@ public void testDeleteNonExistingRecords() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testDeleteWithoutCondition() { createAndInitPartitionedTable(); @@ -528,7 +513,7 @@ public void testDeleteWithoutCondition() { sql("DELETE FROM %s", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 4 snapshots", 4, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 4 snapshots").hasSize(4); // should be a delete instead of an overwrite as it is done through a metadata operation Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); @@ -538,7 +523,7 @@ public void testDeleteWithoutCondition() { "Should have expected rows", ImmutableList.of(), sql("SELECT * FROM %s", commitTarget())); } - @Test + @TestTemplate public void testDeleteUsingMetadataWithComplexCondition() { createAndInitPartitionedTable(); @@ -552,7 +537,7 @@ public void testDeleteUsingMetadataWithComplexCondition() { commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 4 snapshots", 4, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 4 snapshots").hasSize(4); // should be a delete instead of an overwrite as it is done through a metadata operation Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); @@ -564,7 +549,7 @@ public void testDeleteUsingMetadataWithComplexCondition() { sql("SELECT * FROM %s", selectTarget())); } - @Test + @TestTemplate public void testDeleteWithArbitraryPartitionPredicates() { createAndInitPartitionedTable(); @@ -577,7 +562,7 @@ public void testDeleteWithArbitraryPartitionPredicates() { sql("DELETE FROM %s WHERE id = 10 OR dep LIKE '%%ware'", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 4 snapshots", 4, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 4 snapshots").hasSize(4); // should be an overwrite since cannot be executed using a metadata operation Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); @@ -593,7 +578,7 @@ public void testDeleteWithArbitraryPartitionPredicates() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testDeleteWithNonDeterministicCondition() { createAndInitPartitionedTable(); @@ -606,7 +591,7 @@ public void testDeleteWithNonDeterministicCondition() { .hasMessageContaining("The operator expects a deterministic expression"); } - @Test + @TestTemplate public void testDeleteWithFoldableConditions() { createAndInitPartitionedTable(); @@ -642,10 +627,10 @@ public void testDeleteWithFoldableConditions() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 2 snapshots").hasSize(2); } - @Test + @TestTemplate public void testDeleteWithNullConditions() { createAndInitPartitionedTable(); @@ -677,13 +662,13 @@ public void testDeleteWithNullConditions() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 3 snapshots").hasSize(3); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); validateDelete(currentSnapshot, "1", "1"); } - @Test + @TestTemplate public void testDeleteWithInAndNotInConditions() { createAndInitUnpartitionedTable(); @@ -709,9 +694,9 @@ public void testDeleteWithInAndNotInConditions() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testDeleteWithMultipleRowGroupsParquet() throws NoSuchTableException { - Assume.assumeTrue(fileFormat.equalsIgnoreCase("parquet")); + assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET); createAndInitPartitionedTable(); @@ -732,15 +717,15 @@ public void testDeleteWithMultipleRowGroupsParquet() throws NoSuchTableException df.coalesce(1).writeTo(tableName).append(); createBranchIfNeeded(); - Assert.assertEquals(200, spark.table(commitTarget()).count()); + assertThat(spark.table(commitTarget()).count()).isEqualTo(200); // delete a record from one of two row groups and copy over the second one sql("DELETE FROM %s WHERE id IN (200, 201)", commitTarget()); - Assert.assertEquals(199, spark.table(commitTarget()).count()); + assertThat(spark.table(commitTarget()).count()).isEqualTo(199); } - @Test + @TestTemplate public void testDeleteWithConditionOnNestedColumn() { createAndInitNestedColumnsTable(); @@ -759,7 +744,7 @@ public void testDeleteWithConditionOnNestedColumn() { "Should have expected rows", ImmutableList.of(), sql("SELECT id FROM %s", selectTarget())); } - @Test + @TestTemplate public void testDeleteWithInSubquery() throws NoSuchTableException { createAndInitUnpartitionedTable(); @@ -806,7 +791,7 @@ public void testDeleteWithInSubquery() throws NoSuchTableException { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testDeleteWithMultiColumnInSubquery() throws NoSuchTableException { createAndInitUnpartitionedTable(); @@ -824,7 +809,7 @@ public void testDeleteWithMultiColumnInSubquery() throws NoSuchTableException { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testDeleteWithNotInSubquery() throws NoSuchTableException { createAndInitUnpartitionedTable(); @@ -884,9 +869,9 @@ public void testDeleteWithNotInSubquery() throws NoSuchTableException { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testDeleteOnNonIcebergTableNotSupported() { - Assume.assumeTrue(catalogName.equalsIgnoreCase("spark_catalog")); + assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog"); sql("CREATE TABLE parquet_table (c1 INT, c2 INT) USING parquet"); @@ -895,7 +880,7 @@ public void testDeleteOnNonIcebergTableNotSupported() { .hasMessageContaining("does not support DELETE"); } - @Test + @TestTemplate public void testDeleteWithExistSubquery() throws NoSuchTableException { createAndInitUnpartitionedTable(); @@ -940,7 +925,7 @@ public void testDeleteWithExistSubquery() throws NoSuchTableException { sql("SELECT * FROM %s", selectTarget())); } - @Test + @TestTemplate public void testDeleteWithNotExistsSubquery() throws NoSuchTableException { createAndInitUnpartitionedTable(); @@ -976,7 +961,7 @@ public void testDeleteWithNotExistsSubquery() throws NoSuchTableException { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testDeleteWithScalarSubquery() throws NoSuchTableException { createAndInitUnpartitionedTable(); @@ -997,7 +982,7 @@ public void testDeleteWithScalarSubquery() throws NoSuchTableException { }); } - @Test + @TestTemplate public void testDeleteThatRequiresGroupingBeforeWrite() throws NoSuchTableException { createAndInitPartitionedTable(); @@ -1015,20 +1000,21 @@ public void testDeleteThatRequiresGroupingBeforeWrite() throws NoSuchTableExcept spark.conf().set("spark.sql.shuffle.partitions", "1"); sql("DELETE FROM %s t WHERE id IN (SELECT * FROM deleted_id)", commitTarget()); - Assert.assertEquals( - "Should have expected num of rows", 8L, spark.table(commitTarget()).count()); + assertThat(spark.table(commitTarget()).count()) + .as("Should have expected num of rows") + .isEqualTo(8L); } finally { spark.conf().set("spark.sql.shuffle.partitions", originalNumOfShufflePartitions); } } - @Test + @TestTemplate public synchronized void testDeleteWithSerializableIsolation() throws InterruptedException { // cannot run tests with concurrency for Hadoop tables without atomic renames - Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop")); + assumeThat(catalogName).isNotEqualToIgnoringCase("testhadoop"); // if caching is off, the table is eagerly refreshed during runtime filtering // this can cause a validation exception as concurrent changes would be visible - Assume.assumeTrue(cachingCatalogEnabled()); + assumeThat(cachingCatalogEnabled()).isTrue(); createAndInitUnpartitionedTable(); createOrReplaceView("deleted_id", Collections.singletonList(1), Encoders.INT()); @@ -1052,9 +1038,11 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte executorService.submit( () -> { for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { - while (barrier.get() < numOperations * 2) { - sleep(10); - } + int currentNumOperations = numOperations; + Awaitility.await() + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> barrier.get() >= currentNumOperations * 2); sql("DELETE FROM %s WHERE id IN (SELECT * FROM deleted_id)", commitTarget()); @@ -1074,9 +1062,11 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte record.set(1, "hr"); // dep for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { - while (shouldAppend.get() && barrier.get() < numOperations * 2) { - sleep(10); - } + int currentNumOperations = numOperations; + Awaitility.await() + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> !shouldAppend.get() || barrier.get() >= currentNumOperations * 2); if (!shouldAppend.get()) { return; @@ -1090,7 +1080,6 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte } appendFiles.commit(); - sleep(10); } barrier.incrementAndGet(); @@ -1109,17 +1098,17 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte } executorService.shutdown(); - Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES)); + assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue(); } - @Test + @TestTemplate public synchronized void testDeleteWithSnapshotIsolation() throws InterruptedException, ExecutionException { // cannot run tests with concurrency for Hadoop tables without atomic renames - Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop")); + assumeThat(catalogName).isNotEqualToIgnoringCase("testhadoop"); // if caching is off, the table is eagerly refreshed during runtime filtering // this can cause a validation exception as concurrent changes would be visible - Assume.assumeTrue(cachingCatalogEnabled()); + assumeThat(cachingCatalogEnabled()).isTrue(); createAndInitUnpartitionedTable(); createOrReplaceView("deleted_id", Collections.singletonList(1), Encoders.INT()); @@ -1143,9 +1132,11 @@ public synchronized void testDeleteWithSnapshotIsolation() executorService.submit( () -> { for (int numOperations = 0; numOperations < 20; numOperations++) { - while (barrier.get() < numOperations * 2) { - sleep(10); - } + int currentNumOperations = numOperations; + Awaitility.await() + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> barrier.get() >= currentNumOperations * 2); sql("DELETE FROM %s WHERE id IN (SELECT * FROM deleted_id)", commitTarget()); @@ -1165,9 +1156,11 @@ public synchronized void testDeleteWithSnapshotIsolation() record.set(1, "hr"); // dep for (int numOperations = 0; numOperations < 20; numOperations++) { - while (shouldAppend.get() && barrier.get() < numOperations * 2) { - sleep(10); - } + int currentNumOperations = numOperations; + Awaitility.await() + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> !shouldAppend.get() || barrier.get() >= currentNumOperations * 2); if (!shouldAppend.get()) { return; @@ -1181,7 +1174,6 @@ public synchronized void testDeleteWithSnapshotIsolation() } appendFiles.commit(); - sleep(10); } barrier.incrementAndGet(); @@ -1196,10 +1188,10 @@ public synchronized void testDeleteWithSnapshotIsolation() } executorService.shutdown(); - Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES)); + assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue(); } - @Test + @TestTemplate public void testDeleteRefreshesRelationCache() throws NoSuchTableException { createAndInitPartitionedTable(); @@ -1220,7 +1212,7 @@ public void testDeleteRefreshesRelationCache() throws NoSuchTableException { sql("DELETE FROM %s WHERE id = 1", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 3 snapshots").hasSize(3); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); if (mode(table) == COPY_ON_WRITE) { @@ -1241,7 +1233,7 @@ public void testDeleteRefreshesRelationCache() throws NoSuchTableException { spark.sql("UNCACHE TABLE tmp"); } - @Test + @TestTemplate public void testDeleteWithMultipleSpecs() { createAndInitTable("id INT, dep STRING, category STRING"); @@ -1267,7 +1259,7 @@ public void testDeleteWithMultipleSpecs() { sql("DELETE FROM %s WHERE id IN (1, 3, 5, 7)", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 5 snapshots", 5, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 5 snapshots").hasSize(5); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); if (mode(table) == COPY_ON_WRITE) { @@ -1282,9 +1274,9 @@ public void testDeleteWithMultipleSpecs() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testDeleteToWapBranch() throws NoSuchTableException { - Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null); + assumeThat(branch).as("WAP branch only works for table identifier without branch").isNull(); createAndInitPartitionedTable(); sql( @@ -1296,40 +1288,36 @@ public void testDeleteToWapBranch() throws NoSuchTableException { ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"), () -> { sql("DELETE FROM %s t WHERE id=0", tableName); - Assert.assertEquals( - "Should have expected num of rows when reading table", - 2L, - spark.table(tableName).count()); - Assert.assertEquals( - "Should have expected num of rows when reading WAP branch", - 2L, - spark.table(tableName + ".branch_wap").count()); - Assert.assertEquals( - "Should not modify main branch", 3L, spark.table(tableName + ".branch_main").count()); + assertThat(spark.table(tableName).count()) + .as("Should have expected num of rows when reading table") + .isEqualTo(2L); + assertThat(spark.table(tableName + ".branch_wap").count()) + .as("Should have expected num of rows when reading WAP branch") + .isEqualTo(2L); + assertThat(spark.table(tableName + ".branch_main").count()) + .as("Should not modify main branch") + .isEqualTo(3L); }); withSQLConf( ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"), () -> { sql("DELETE FROM %s t WHERE id=1", tableName); - Assert.assertEquals( - "Should have expected num of rows when reading table with multiple writes", - 1L, - spark.table(tableName).count()); - Assert.assertEquals( - "Should have expected num of rows when reading WAP branch with multiple writes", - 1L, - spark.table(tableName + ".branch_wap").count()); - Assert.assertEquals( - "Should not modify main branch with multiple writes", - 3L, - spark.table(tableName + ".branch_main").count()); + assertThat(spark.table(tableName).count()) + .as("Should have expected num of rows when reading table with multiple writes") + .isEqualTo(1L); + assertThat(spark.table(tableName + ".branch_wap").count()) + .as("Should have expected num of rows when reading WAP branch with multiple writes") + .isEqualTo(1L); + assertThat(spark.table(tableName + ".branch_main").count()) + .as("Should not modify main branch with multiple writes") + .isEqualTo(3L); }); } - @Test + @TestTemplate public void testDeleteToWapBranchWithTableBranchIdentifier() throws NoSuchTableException { - Assume.assumeTrue("Test must have branch name part in table identifier", branch != null); + assumeThat(branch).as("Test must have branch name part in table identifier").isNotNull(); createAndInitPartitionedTable(); sql( @@ -1349,7 +1337,7 @@ public void testDeleteToWapBranchWithTableBranchIdentifier() throws NoSuchTableE branch))); } - @Test + @TestTemplate public void testDeleteToCustomWapBranchWithoutWhereClause() throws NoSuchTableException { assumeThat(branch) .as("Run only if custom WAP branch is not main") @@ -1367,9 +1355,9 @@ public void testDeleteToCustomWapBranchWithoutWhereClause() throws NoSuchTableEx ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, branch), () -> { sql("DELETE FROM %s t WHERE id=1", tableName); - Assertions.assertThat(spark.table(tableName).count()).isEqualTo(2L); - Assertions.assertThat(spark.table(tableName + ".branch_" + branch).count()).isEqualTo(2L); - Assertions.assertThat(spark.table(tableName + ".branch_main").count()) + assertThat(spark.table(tableName).count()).isEqualTo(2L); + assertThat(spark.table(tableName + ".branch_" + branch).count()).isEqualTo(2L); + assertThat(spark.table(tableName + ".branch_main").count()) .as("Should not modify main branch") .isEqualTo(3L); }); @@ -1377,9 +1365,9 @@ public void testDeleteToCustomWapBranchWithoutWhereClause() throws NoSuchTableEx ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, branch), () -> { sql("DELETE FROM %s t", tableName); - Assertions.assertThat(spark.table(tableName).count()).isEqualTo(0L); - Assertions.assertThat(spark.table(tableName + ".branch_" + branch).count()).isEqualTo(0L); - Assertions.assertThat(spark.table(tableName + ".branch_main").count()) + assertThat(spark.table(tableName).count()).isEqualTo(0L); + assertThat(spark.table(tableName + ".branch_" + branch).count()).isEqualTo(0L); + assertThat(spark.table(tableName + ".branch_main").count()) .as("Should not modify main branch") .isEqualTo(3L); }); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java index 2694d522c913..9f8ac9c993c2 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java @@ -28,6 +28,7 @@ import static org.apache.iceberg.TableProperties.SPLIT_SIZE; import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE; import static org.apache.spark.sql.functions.lit; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; import java.util.Arrays; @@ -45,7 +46,7 @@ import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.DistributionMode; -import org.apache.iceberg.PlanningMode; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotSummary; @@ -69,48 +70,25 @@ import org.apache.spark.sql.execution.SparkPlan; import org.apache.spark.sql.internal.SQLConf; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; public abstract class TestMerge extends SparkRowLevelOperationsTestBase { - public TestMerge( - String catalogName, - String implementation, - Map config, - String fileFormat, - boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super( - catalogName, - implementation, - config, - fileFormat, - vectorized, - distributionMode, - fanoutEnabled, - branch, - planningMode); - } - - @BeforeClass + @BeforeAll public static void setupSparkConf() { spark.conf().set("spark.sql.shuffle.partitions", "4"); } - @After + @AfterEach public void removeTables() { sql("DROP TABLE IF EXISTS %s", tableName); sql("DROP TABLE IF EXISTS source"); } - @Test + @TestTemplate public void testMergeWithAllClauses() { createAndInitTable( "id INT, dep STRING", @@ -152,7 +130,7 @@ public void testMergeWithAllClauses() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithOneNotMatchedBySourceClause() { createAndInitTable( "id INT, dep STRING", @@ -180,7 +158,7 @@ public void testMergeWithOneNotMatchedBySourceClause() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeNotMatchedBySourceClausesPartitionedTable() { createAndInitTable( "id INT, dep STRING", @@ -209,7 +187,7 @@ public void testMergeNotMatchedBySourceClausesPartitionedTable() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithVectorizedReads() { assumeThat(supportsVectorization()).isTrue(); @@ -251,7 +229,7 @@ public void testMergeWithVectorizedReads() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testCoalesceMerge() { createAndInitTable("id INT, salary INT, dep STRING"); @@ -322,13 +300,12 @@ public void testCoalesceMerge() { validateProperty(currentSnapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "1"); } - Assert.assertEquals( - "Row count must match", - 400L, - scalarSql("SELECT COUNT(*) FROM %s WHERE salary = -1", commitTarget())); + assertThat(scalarSql("SELECT COUNT(*) FROM %s WHERE salary = -1", commitTarget())) + .as("Row count must match") + .isEqualTo(400L); } - @Test + @TestTemplate public void testSkewMerge() { createAndInitTable("id INT, salary INT, dep STRING"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -382,7 +359,7 @@ public void testSkewMerge() { + "WHEN MATCHED THEN " + " UPDATE SET salary = -1 ", commitTarget()); - Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL"); + assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL"); }); Table table = validationCatalog.loadTable(tableIdent); @@ -406,13 +383,12 @@ public void testSkewMerge() { validateProperty(currentSnapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4"); } - Assert.assertEquals( - "Row count must match", - 400L, - scalarSql("SELECT COUNT(*) FROM %s WHERE salary = -1", commitTarget())); + assertThat(scalarSql("SELECT COUNT(*) FROM %s WHERE salary = -1", commitTarget())) + .as("Row count must match") + .isEqualTo(400L); } - @Test + @TestTemplate public void testMergeConditionSplitIntoTargetPredicateAndJoinCondition() { createAndInitTable( "id INT, salary INT, dep STRING, sub_dep STRING", @@ -461,7 +437,7 @@ public void testMergeConditionSplitIntoTargetPredicateAndJoinCondition() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithStaticPredicatePushDown() { createAndInitTable("id BIGINT, dep STRING"); @@ -478,7 +454,7 @@ public void testMergeWithStaticPredicatePushDown() { Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch); String dataFilesCount = snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP); - Assert.assertEquals("Must have 2 files before MERGE", "2", dataFilesCount); + assertThat(dataFilesCount).as("Must have 2 files before MERGE").isEqualTo("2"); createOrReplaceView( "source", "{ \"id\": 1, \"dep\": \"finance\" }\n" + "{ \"id\": 2, \"dep\": \"hardware\" }"); @@ -516,9 +492,9 @@ public void testMergeWithStaticPredicatePushDown() { sql("SELECT * FROM %s ORDER BY id, dep", selectTarget())); } - @Test + @TestTemplate public void testMergeIntoEmptyTargetInsertAllNonMatchingRows() { - Assume.assumeFalse("Custom branch does not exist for empty table", "test".equals(branch)); + assumeThat(branch).as("Custom branch does not exist for empty table").isNotEqualTo("test"); createAndInitTable("id INT, dep STRING"); createOrReplaceView( @@ -547,9 +523,9 @@ public void testMergeIntoEmptyTargetInsertAllNonMatchingRows() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeIntoEmptyTargetInsertOnlyMatchingRows() { - Assume.assumeFalse("Custom branch does not exist for empty table", "test".equals(branch)); + assumeThat(branch).as("Custom branch does not exist for empty table").isNotEqualTo("test"); createAndInitTable("id INT, dep STRING"); createOrReplaceView( @@ -577,7 +553,7 @@ public void testMergeIntoEmptyTargetInsertOnlyMatchingRows() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithOnlyUpdateClause() { createAndInitTable( "id INT, dep STRING", @@ -608,7 +584,7 @@ public void testMergeWithOnlyUpdateClause() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithOnlyUpdateNullUnmatchedValues() { createAndInitTable( "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, \"value\": null }"); @@ -634,7 +610,7 @@ public void testMergeWithOnlyUpdateNullUnmatchedValues() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithOnlyUpdateSingleFieldNullUnmatchedValues() { createAndInitTable( "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, \"value\": null }"); @@ -660,7 +636,7 @@ public void testMergeWithOnlyUpdateSingleFieldNullUnmatchedValues() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithOnlyDeleteNullUnmatchedValues() { createAndInitTable( "id INT, value INT", "{ \"id\": 1, \"value\": 2 }\n" + "{ \"id\": 6, \"value\": null }"); @@ -680,7 +656,7 @@ public void testMergeWithOnlyDeleteNullUnmatchedValues() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithOnlyUpdateClauseAndNullValues() { createAndInitTable( "id INT, dep STRING", @@ -713,7 +689,7 @@ public void testMergeWithOnlyUpdateClauseAndNullValues() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithOnlyDeleteClause() { createAndInitTable( "id INT, dep STRING", @@ -743,7 +719,7 @@ public void testMergeWithOnlyDeleteClause() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithMatchedAndNotMatchedClauses() { createAndInitTable( "id INT, dep STRING", @@ -778,7 +754,7 @@ public void testMergeWithMatchedAndNotMatchedClauses() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithAllCausesWithExplicitColumnSpecification() { createAndInitTable( "id INT, dep STRING", @@ -813,7 +789,7 @@ public void testMergeWithAllCausesWithExplicitColumnSpecification() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithSourceCTE() { createAndInitTable( "id INT, dep STRING", @@ -849,7 +825,7 @@ public void testMergeWithSourceCTE() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithSourceFromSetOps() { createAndInitTable( "id INT, dep STRING", @@ -889,7 +865,7 @@ public void testMergeWithSourceFromSetOps() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithOneMatchingBranchButMultipleSourceRowsForTargetRow() { createAndInitTable( "id INT, dep STRING", @@ -924,7 +900,7 @@ public void testMergeWithOneMatchingBranchButMultipleSourceRowsForTargetRow() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSource() { createAndInitTable( "id INT, dep STRING", @@ -961,7 +937,7 @@ public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSource() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceEnabledHashShuffleJoin() { createAndInitTable( @@ -1003,7 +979,7 @@ public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSource() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoEqualityCondition() { createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }"); @@ -1042,7 +1018,7 @@ public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoEqua sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoNotMatchedActions() { createAndInitTable( "id INT, dep STRING", @@ -1076,7 +1052,7 @@ public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoNotM sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoNotMatchedActionsNoEqualityCondition() { createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"emp-id-one\" }"); @@ -1110,7 +1086,7 @@ public void testMergeWithMultipleUpdatesForTargetRowSmallTargetLargeSourceNoNotM sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testMergeWithMultipleUpdatesForTargetRow() { createAndInitTable( "id INT, dep STRING", @@ -1148,7 +1124,7 @@ public void testMergeWithMultipleUpdatesForTargetRow() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testMergeWithUnconditionalDelete() { createAndInitTable( "id INT, dep STRING", @@ -1181,7 +1157,7 @@ public void testMergeWithUnconditionalDelete() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithSingleConditionalDelete() { createAndInitTable( "id INT, dep STRING", @@ -1217,7 +1193,7 @@ public void testMergeWithSingleConditionalDelete() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testMergeWithIdentityTransform() { for (DistributionMode mode : DistributionMode.values()) { createAndInitTable("id INT, dep STRING"); @@ -1263,7 +1239,7 @@ public void testMergeWithIdentityTransform() { } } - @Test + @TestTemplate public void testMergeWithDaysTransform() { for (DistributionMode mode : DistributionMode.values()) { createAndInitTable("id INT, ts TIMESTAMP"); @@ -1311,7 +1287,7 @@ public void testMergeWithDaysTransform() { } } - @Test + @TestTemplate public void testMergeWithBucketTransform() { for (DistributionMode mode : DistributionMode.values()) { createAndInitTable("id INT, dep STRING"); @@ -1357,7 +1333,7 @@ public void testMergeWithBucketTransform() { } } - @Test + @TestTemplate public void testMergeWithTruncateTransform() { for (DistributionMode mode : DistributionMode.values()) { createAndInitTable("id INT, dep STRING"); @@ -1403,7 +1379,7 @@ public void testMergeWithTruncateTransform() { } } - @Test + @TestTemplate public void testMergeIntoPartitionedAndOrderedTable() { for (DistributionMode mode : DistributionMode.values()) { createAndInitTable("id INT, dep STRING"); @@ -1450,7 +1426,7 @@ public void testMergeIntoPartitionedAndOrderedTable() { } } - @Test + @TestTemplate public void testSelfMerge() { createAndInitTable( "id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n" + "{ \"id\": 2, \"v\": \"v2\" }"); @@ -1473,7 +1449,7 @@ public void testSelfMerge() { "Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testSelfMergeWithCaching() { createAndInitTable( "id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n" + "{ \"id\": 2, \"v\": \"v2\" }"); @@ -1498,7 +1474,7 @@ public void testSelfMergeWithCaching() { "Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", commitTarget())); } - @Test + @TestTemplate public void testMergeWithSourceAsSelfSubquery() { createAndInitTable( "id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n" + "{ \"id\": 2, \"v\": \"v2\" }"); @@ -1523,13 +1499,13 @@ public void testMergeWithSourceAsSelfSubquery() { "Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public synchronized void testMergeWithSerializableIsolation() throws InterruptedException { // cannot run tests with concurrency for Hadoop tables without atomic renames - Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop")); + assumeThat(catalogName).isNotEqualToIgnoringCase("testhadoop"); // if caching is off, the table is eagerly refreshed during runtime filtering // this can cause a validation exception as concurrent changes would be visible - Assume.assumeTrue(cachingCatalogEnabled()); + assumeThat(cachingCatalogEnabled()).isTrue(); createAndInitTable("id INT, dep STRING"); createOrReplaceView("source", Collections.singletonList(1), Encoders.INT()); @@ -1553,9 +1529,11 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted executorService.submit( () -> { for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { - while (barrier.get() < numOperations * 2) { - sleep(10); - } + int currentNumOperations = numOperations; + Awaitility.await() + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> barrier.get() >= currentNumOperations * 2); sql( "MERGE INTO %s t USING source s " @@ -1580,9 +1558,11 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted record.set(1, "hr"); // dep for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { - while (shouldAppend.get() && barrier.get() < numOperations * 2) { - sleep(10); - } + int currentNumOperations = numOperations; + Awaitility.await() + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> !shouldAppend.get() || barrier.get() >= currentNumOperations * 2); if (!shouldAppend.get()) { return; @@ -1595,7 +1575,6 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted appendFiles.toBranch(branch); } appendFiles.commit(); - sleep(10); } barrier.incrementAndGet(); @@ -1614,17 +1593,17 @@ public synchronized void testMergeWithSerializableIsolation() throws Interrupted } executorService.shutdown(); - Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES)); + assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue(); } - @Test + @TestTemplate public synchronized void testMergeWithSnapshotIsolation() throws InterruptedException, ExecutionException { // cannot run tests with concurrency for Hadoop tables without atomic renames - Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop")); + assumeThat(catalogName).isNotEqualToIgnoringCase("testhadoop"); // if caching is off, the table is eagerly refreshed during runtime filtering // this can cause a validation exception as concurrent changes would be visible - Assume.assumeTrue(cachingCatalogEnabled()); + assumeThat(cachingCatalogEnabled()).isTrue(); createAndInitTable("id INT, dep STRING"); createOrReplaceView("source", Collections.singletonList(1), Encoders.INT()); @@ -1648,9 +1627,11 @@ public synchronized void testMergeWithSnapshotIsolation() executorService.submit( () -> { for (int numOperations = 0; numOperations < 20; numOperations++) { - while (barrier.get() < numOperations * 2) { - sleep(10); - } + int currentNumOperations = numOperations; + Awaitility.await() + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> barrier.get() >= currentNumOperations * 2); sql( "MERGE INTO %s t USING source s " @@ -1675,9 +1656,11 @@ public synchronized void testMergeWithSnapshotIsolation() record.set(1, "hr"); // dep for (int numOperations = 0; numOperations < 20; numOperations++) { - while (shouldAppend.get() && barrier.get() < numOperations * 2) { - sleep(10); - } + int currentNumOperations = numOperations; + Awaitility.await() + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> !shouldAppend.get() || barrier.get() >= currentNumOperations * 2); if (!shouldAppend.get()) { return; @@ -1691,7 +1674,6 @@ public synchronized void testMergeWithSnapshotIsolation() } appendFiles.commit(); - sleep(10); } barrier.incrementAndGet(); @@ -1706,10 +1688,10 @@ public synchronized void testMergeWithSnapshotIsolation() } executorService.shutdown(); - Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES)); + assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue(); } - @Test + @TestTemplate public void testMergeWithExtraColumnsInSource() { createAndInitTable( "id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n" + "{ \"id\": 2, \"v\": \"v2\" }"); @@ -1739,7 +1721,7 @@ public void testMergeWithExtraColumnsInSource() { "Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithNullsInTargetAndSource() { createAndInitTable( "id INT, v STRING", "{ \"id\": null, \"v\": \"v1\" }\n" + "{ \"id\": 2, \"v\": \"v2\" }"); @@ -1767,7 +1749,7 @@ public void testMergeWithNullsInTargetAndSource() { "Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", selectTarget())); } - @Test + @TestTemplate public void testMergeWithNullSafeEquals() { createAndInitTable( "id INT, v STRING", "{ \"id\": null, \"v\": \"v1\" }\n" + "{ \"id\": 2, \"v\": \"v2\" }"); @@ -1794,7 +1776,7 @@ public void testMergeWithNullSafeEquals() { "Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", selectTarget())); } - @Test + @TestTemplate public void testMergeWithNullCondition() { createAndInitTable( "id INT, v STRING", "{ \"id\": null, \"v\": \"v1\" }\n" + "{ \"id\": 2, \"v\": \"v2\" }"); @@ -1822,7 +1804,7 @@ public void testMergeWithNullCondition() { "Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", selectTarget())); } - @Test + @TestTemplate public void testMergeWithNullActionConditions() { createAndInitTable( "id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n" + "{ \"id\": 2, \"v\": \"v2\" }"); @@ -1873,7 +1855,7 @@ public void testMergeWithNullActionConditions() { "Output should match", expectedRows2, sql("SELECT * FROM %s ORDER BY v", selectTarget())); } - @Test + @TestTemplate public void testMergeWithMultipleMatchingActions() { createAndInitTable( "id INT, v STRING", "{ \"id\": 1, \"v\": \"v1\" }\n" + "{ \"id\": 2, \"v\": \"v2\" }"); @@ -1902,9 +1884,9 @@ public void testMergeWithMultipleMatchingActions() { "Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY v", selectTarget())); } - @Test + @TestTemplate public void testMergeWithMultipleRowGroupsParquet() throws NoSuchTableException { - Assume.assumeTrue(fileFormat.equalsIgnoreCase("parquet")); + assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET); createAndInitTable("id INT, dep STRING"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -1928,7 +1910,7 @@ public void testMergeWithMultipleRowGroupsParquet() throws NoSuchTableException df.coalesce(1).writeTo(tableName).append(); createBranchIfNeeded(); - Assert.assertEquals(200, spark.table(commitTarget()).count()); + assertThat(spark.table(commitTarget()).count()).isEqualTo(200); // update a record from one of two row groups and copy over the second one sql( @@ -1938,10 +1920,10 @@ public void testMergeWithMultipleRowGroupsParquet() throws NoSuchTableException + " UPDATE SET dep = 'x'", commitTarget()); - Assert.assertEquals(200, spark.table(commitTarget()).count()); + assertThat(spark.table(commitTarget()).count()).isEqualTo(200); } - @Test + @TestTemplate public void testMergeInsertOnly() { createAndInitTable( "id STRING, v STRING", @@ -1973,7 +1955,7 @@ public void testMergeInsertOnly() { "Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeInsertOnlyWithCondition() { createAndInitTable("id INTEGER, v INTEGER", "{ \"id\": 1, \"v\": 1 }"); createOrReplaceView( @@ -1999,7 +1981,7 @@ public void testMergeInsertOnlyWithCondition() { "Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeAlignsUpdateAndInsertActions() { createAndInitTable("id INT, a INT, b STRING", "{ \"id\": 1, \"a\": 2, \"b\": \"str\" }"); createOrReplaceView( @@ -2022,7 +2004,7 @@ public void testMergeAlignsUpdateAndInsertActions() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeMixedCaseAlignsUpdateAndInsertActions() { createAndInitTable("id INT, a INT, b STRING", "{ \"id\": 1, \"a\": 2, \"b\": \"str\" }"); createOrReplaceView( @@ -2054,7 +2036,7 @@ public void testMergeMixedCaseAlignsUpdateAndInsertActions() { sql("SELECT * FROM %s WHERE b = 'new_str_2'ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeUpdatesNestedStructFields() { createAndInitTable( "id INT, s STRUCT,m:MAP>>", @@ -2101,7 +2083,7 @@ public void testMergeUpdatesNestedStructFields() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithInferredCasts() { createAndInitTable("id INT, s STRING", "{ \"id\": 1, \"s\": \"value\" }"); createOrReplaceView("source", "{ \"id\": 1, \"c1\": -2}"); @@ -2120,7 +2102,7 @@ public void testMergeWithInferredCasts() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeModifiesNullStruct() { createAndInitTable("id INT, s STRUCT", "{ \"id\": 1, \"s\": null }"); createOrReplaceView("source", "{ \"id\": 1, \"n1\": -10 }"); @@ -2138,7 +2120,7 @@ public void testMergeModifiesNullStruct() { sql("SELECT * FROM %s", selectTarget())); } - @Test + @TestTemplate public void testMergeRefreshesRelationCache() { createAndInitTable("id INT, name STRING", "{ \"id\": 1, \"name\": \"n1\" }"); createOrReplaceView("source", "{ \"id\": 1, \"name\": \"n2\" }"); @@ -2164,7 +2146,7 @@ public void testMergeRefreshesRelationCache() { spark.sql("UNCACHE TABLE tmp"); } - @Test + @TestTemplate public void testMergeWithMultipleNotMatchedActions() { createAndInitTable("id INT, dep STRING", "{ \"id\": 0, \"dep\": \"emp-id-0\" }"); @@ -2197,7 +2179,7 @@ public void testMergeWithMultipleNotMatchedActions() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithMultipleConditionalNotMatchedActions() { createAndInitTable("id INT, dep STRING", "{ \"id\": 0, \"dep\": \"emp-id-0\" }"); @@ -2229,7 +2211,7 @@ public void testMergeWithMultipleConditionalNotMatchedActions() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeResolvesColumnsByName() { createAndInitTable( "id INT, badge INT, dep STRING", @@ -2264,7 +2246,7 @@ public void testMergeResolvesColumnsByName() { sql("SELECT id, badge, dep FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeShouldResolveWhenThereAreNoUnresolvedExpressionsOrColumns() { // ensures that MERGE INTO will resolve into the correct action even if no columns // or otherwise unresolved expressions exist in the query (testing SPARK-34962) @@ -2299,7 +2281,7 @@ public void testMergeShouldResolveWhenThereAreNoUnresolvedExpressionsOrColumns() sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithTableWithNonNullableColumn() { createAndInitTable( "id INT NOT NULL, dep STRING", @@ -2333,7 +2315,7 @@ public void testMergeWithTableWithNonNullableColumn() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithNonExistingColumns() { createAndInitTable( "id INT, c STRUCT>", @@ -2377,7 +2359,7 @@ public void testMergeWithNonExistingColumns() { "A column or function parameter with name `invalid_col` cannot be resolved"); } - @Test + @TestTemplate public void testMergeWithInvalidColumnsInInsert() { createAndInitTable( "id INT, c STRUCT> NOT NULL", @@ -2420,7 +2402,7 @@ public void testMergeWithInvalidColumnsInInsert() { .hasMessageContaining("No assignment for 'c'"); } - @Test + @TestTemplate public void testMergeWithMissingOptionalColumnsInInsert() { createAndInitTable("id INT, value LONG", "{ \"id\": 1, \"value\": 100}"); createOrReplaceView("source", "{ \"c1\": 2, \"c2\": 200 }"); @@ -2440,7 +2422,7 @@ public void testMergeWithMissingOptionalColumnsInInsert() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testMergeWithInvalidUpdates() { createAndInitTable( "id INT, a ARRAY>, m MAP", @@ -2469,7 +2451,7 @@ public void testMergeWithInvalidUpdates() { .hasMessageContaining("Updating nested fields is only supported for StructType"); } - @Test + @TestTemplate public void testMergeWithConflictingUpdates() { createAndInitTable( "id INT, c STRUCT>", @@ -2508,7 +2490,7 @@ public void testMergeWithConflictingUpdates() { .hasMessageContaining("Conflicting assignments for 'c'"); } - @Test + @TestTemplate public void testMergeWithInvalidAssignmentsAnsi() { createAndInitTable( "id INT NOT NULL, s STRUCT> NOT NULL", @@ -2575,7 +2557,7 @@ public void testMergeWithInvalidAssignmentsAnsi() { }); } - @Test + @TestTemplate public void testMergeWithInvalidAssignmentsStrict() { createAndInitTable( "id INT NOT NULL, s STRUCT> NOT NULL", @@ -2643,7 +2625,7 @@ public void testMergeWithInvalidAssignmentsStrict() { }); } - @Test + @TestTemplate public void testMergeWithNonDeterministicConditions() { createAndInitTable( "id INT, c STRUCT>", @@ -2697,7 +2679,7 @@ public void testMergeWithNonDeterministicConditions() { "MERGE operation contains unsupported INSERT condition. Non-deterministic expressions are not allowed"); } - @Test + @TestTemplate public void testMergeWithAggregateExpressions() { createAndInitTable( "id INT, c STRUCT>", @@ -2752,7 +2734,7 @@ public void testMergeWithAggregateExpressions() { "MERGE operation contains unsupported INSERT condition. Aggregates are not allowed"); } - @Test + @TestTemplate public void testMergeWithSubqueriesInConditions() { createAndInitTable( "id INT, c STRUCT>", @@ -2807,7 +2789,7 @@ public void testMergeWithSubqueriesInConditions() { "MERGE operation contains unsupported INSERT condition. Subqueries are not allowed"); } - @Test + @TestTemplate public void testMergeWithTargetColumnsInInsertConditions() { createAndInitTable("id INT, c2 INT", "{ \"id\": 1, \"c2\": 2 }"); createOrReplaceView("source", "{ \"id\": 1, \"value\": 11 }"); @@ -2824,7 +2806,7 @@ public void testMergeWithTargetColumnsInInsertConditions() { .hasMessageContaining("A column or function parameter with name `c2` cannot be resolved"); } - @Test + @TestTemplate public void testMergeWithNonIcebergTargetTableNotSupported() { createOrReplaceView("target", "{ \"c1\": -100, \"c2\": -200 }"); createOrReplaceView("source", "{ \"c1\": -100, \"c2\": -200 }"); @@ -2844,7 +2826,7 @@ public void testMergeWithNonIcebergTargetTableNotSupported() { * Tests a merge where both the source and target are evaluated to be partitioned by * SingePartition at planning time but DynamicFileFilterExec will return an empty target. */ - @Test + @TestTemplate public void testMergeSinglePartitionPartitioning() { // This table will only have a single file and a single partition createAndInitTable("id INT", "{\"id\": -1}"); @@ -2865,9 +2847,9 @@ public void testMergeSinglePartitionPartitioning() { assertEquals("Should correctly add the non-matching rows", expectedRows, result); } - @Test + @TestTemplate public void testMergeEmptyTable() { - Assume.assumeFalse("Custom branch does not exist for empty table", "test".equals(branch)); + assumeThat(branch).as("Custom branch does not exist for empty table").isNotEqualTo("test"); // This table will only have a single file and a single partition createAndInitTable("id INT", null); @@ -2886,9 +2868,9 @@ public void testMergeEmptyTable() { assertEquals("Should correctly add the non-matching rows", expectedRows, result); } - @Test + @TestTemplate public void testMergeNonExistingBranch() { - Assume.assumeTrue("Test only applicable to custom branch", "test".equals(branch)); + assumeThat(branch).as("Test only applicable to custom branch").isEqualTo("test"); createAndInitTable("id INT", null); // Coalesce forces our source into a SinglePartition distribution @@ -2904,9 +2886,9 @@ public void testMergeNonExistingBranch() { .hasMessage("Cannot use branch (does not exist): test"); } - @Test + @TestTemplate public void testMergeToWapBranch() { - Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null); + assumeThat(branch).as("WAP branch only works for table identifier without branch").isNull(); createAndInitTable("id INT", "{\"id\": -1}"); ImmutableList originalRows = ImmutableList.of(row(-1)); @@ -2965,9 +2947,9 @@ public void testMergeToWapBranch() { }); } - @Test + @TestTemplate public void testMergeToWapBranchWithTableBranchIdentifier() { - Assume.assumeTrue("Test must have branch name part in table identifier", branch != null); + assumeThat(branch).as("Test must have branch name part in table identifier").isNotNull(); createAndInitTable("id INT", "{\"id\": -1}"); sql( @@ -3004,9 +2986,9 @@ private void checkJoinAndFilterConditions(String query, String join, String iceb SparkPlan sparkPlan = executeAndKeepPlan(() -> sql(query)); String planAsString = sparkPlan.toString().replaceAll("#(\\d+L?)", ""); - Assertions.assertThat(planAsString).as("Join should match").contains(join + "\n"); + assertThat(planAsString).as("Join should match").contains(join + "\n"); - Assertions.assertThat(planAsString) + assertThat(planAsString) .as("Pushed filters must match") .contains("[filters=" + icebergFilters + ","); }); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java index 91600d4df08d..0d6f3f395f01 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import org.apache.iceberg.PlanningMode; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.RowDelta; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Snapshot; @@ -44,34 +44,13 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.connector.catalog.Identifier; import org.assertj.core.api.Assertions; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public class TestMergeOnReadDelete extends TestDelete { - public TestMergeOnReadDelete( - String catalogName, - String implementation, - Map config, - String fileFormat, - Boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super( - catalogName, - implementation, - config, - fileFormat, - vectorized, - distributionMode, - fanoutEnabled, - branch, - planningMode); - } - @Override protected Map extraTableProperties() { return ImmutableMap.of( @@ -81,12 +60,12 @@ protected Map extraTableProperties() { RowLevelOperationMode.MERGE_ON_READ.modeName()); } - @Parameterized.AfterParam - public static void clearTestSparkCatalogCache() { + @BeforeEach + public void clearTestSparkCatalogCache() { TestSparkCatalog.clearTables(); } - @Test + @TestTemplate public void testDeleteWithExecutorCacheLocality() throws NoSuchTableException { createAndInitPartitionedTable(); @@ -110,12 +89,12 @@ public void testDeleteWithExecutorCacheLocality() throws NoSuchTableException { }); } - @Test + @TestTemplate public void testDeleteFileGranularity() throws NoSuchTableException { checkDeleteFileGranularity(DeleteGranularity.FILE); } - @Test + @TestTemplate public void testDeletePartitionGranularity() throws NoSuchTableException { checkDeleteFileGranularity(DeleteGranularity.PARTITION); } @@ -150,7 +129,7 @@ private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity) sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget())); } - @Test + @TestTemplate public void testCommitUnknownException() { createAndInitTable("id INT, dep STRING, category STRING"); @@ -208,7 +187,7 @@ public void testCommitUnknownException() { sql("SELECT * FROM %s ORDER BY id", "dummy_catalog.default.table")); } - @Test + @TestTemplate public void testAggregatePushDownInMergeOnReadDelete() { createAndInitTable("id LONG, data INT"); sql( @@ -228,8 +207,9 @@ public void testAggregatePushDownInMergeOnReadDelete() { explainContainsPushDownAggregates = true; } - Assert.assertFalse( - "min/max/count not pushed down for deleted", explainContainsPushDownAggregates); + assertThat(explainContainsPushDownAggregates) + .as("min/max/count not pushed down for deleted") + .isFalse(); List actual = sql(select, selectTarget()); List expected = Lists.newArrayList(); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java index f9c13d828cd3..71ca3421f28d 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java @@ -21,7 +21,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Map; -import org.apache.iceberg.PlanningMode; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; @@ -31,32 +30,10 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.Encoders; -import org.junit.Test; +import org.junit.jupiter.api.TestTemplate; public class TestMergeOnReadMerge extends TestMerge { - public TestMergeOnReadMerge( - String catalogName, - String implementation, - Map config, - String fileFormat, - boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super( - catalogName, - implementation, - config, - fileFormat, - vectorized, - distributionMode, - fanoutEnabled, - branch, - planningMode); - } - @Override protected Map extraTableProperties() { return ImmutableMap.of( @@ -66,12 +43,12 @@ protected Map extraTableProperties() { RowLevelOperationMode.MERGE_ON_READ.modeName()); } - @Test + @TestTemplate public void testMergeDeleteFileGranularity() { checkMergeDeleteGranularity(DeleteGranularity.FILE); } - @Test + @TestTemplate public void testMergeDeletePartitionGranularity() { checkMergeDeleteGranularity(DeleteGranularity.PARTITION); } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java index 45ef343b2dfe..391fae4ea696 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java @@ -21,7 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Map; -import org.apache.iceberg.PlanningMode; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; @@ -30,32 +30,12 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.util.SnapshotUtil; -import org.junit.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public class TestMergeOnReadUpdate extends TestUpdate { - public TestMergeOnReadUpdate( - String catalogName, - String implementation, - Map config, - String fileFormat, - boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super( - catalogName, - implementation, - config, - fileFormat, - vectorized, - distributionMode, - fanoutEnabled, - branch, - planningMode); - } - @Override protected Map extraTableProperties() { return ImmutableMap.of( @@ -65,12 +45,12 @@ protected Map extraTableProperties() { RowLevelOperationMode.MERGE_ON_READ.modeName()); } - @Test + @TestTemplate public void testUpdateFileGranularity() { checkUpdateFileGranularity(DeleteGranularity.FILE); } - @Test + @TestTemplate public void testUpdatePartitionGranularity() { checkUpdateFileGranularity(DeleteGranularity.PARTITION); } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index 4cc8845befee..09ec61a6ad4e 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -31,6 +31,7 @@ import static org.apache.iceberg.TableProperties.UPDATE_MODE; import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT; import static org.apache.spark.sql.functions.lit; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; import java.util.Arrays; @@ -47,7 +48,8 @@ import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.DistributionMode; -import org.apache.iceberg.PlanningMode; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.RowLevelOperationMode; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotSummary; @@ -72,42 +74,21 @@ import org.apache.spark.sql.execution.SparkPlan; import org.apache.spark.sql.internal.SQLConf; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public abstract class TestUpdate extends SparkRowLevelOperationsTestBase { - public TestUpdate( - String catalogName, - String implementation, - Map config, - String fileFormat, - boolean vectorized, - String distributionMode, - boolean fanoutEnabled, - String branch, - PlanningMode planningMode) { - super( - catalogName, - implementation, - config, - fileFormat, - vectorized, - distributionMode, - fanoutEnabled, - branch, - planningMode); - } - - @BeforeClass + @BeforeAll public static void setupSparkConf() { spark.conf().set("spark.sql.shuffle.partitions", "4"); } - @After + @AfterEach public void removeTables() { sql("DROP TABLE IF EXISTS %s", tableName); sql("DROP TABLE IF EXISTS updated_id"); @@ -115,7 +96,7 @@ public void removeTables() { sql("DROP TABLE IF EXISTS deleted_employee"); } - @Test + @TestTemplate public void testUpdateWithVectorizedReads() { assumeThat(supportsVectorization()).isTrue(); @@ -134,7 +115,7 @@ public void testUpdateWithVectorizedReads() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testCoalesceUpdate() { createAndInitTable("id INT, dep STRING"); @@ -177,7 +158,7 @@ public void testCoalesceUpdate() { () -> { SparkPlan plan = executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget()); - Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL"); + assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL"); }); Table table = validationCatalog.loadTable(tableIdent); @@ -198,13 +179,12 @@ public void testCoalesceUpdate() { validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "1"); } - Assert.assertEquals( - "Row count must match", - 200L, - scalarSql("SELECT COUNT(*) FROM %s WHERE id = -1", commitTarget())); + assertThat(scalarSql("SELECT COUNT(*) FROM %s WHERE id = -1", commitTarget())) + .as("Row count must match") + .isEqualTo(200L); } - @Test + @TestTemplate public void testSkewUpdate() { createAndInitTable("id INT, dep STRING"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -248,7 +228,7 @@ public void testSkewUpdate() { () -> { SparkPlan plan = executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget()); - Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL"); + assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL"); }); Table table = validationCatalog.loadTable(tableIdent); @@ -270,13 +250,12 @@ public void testSkewUpdate() { validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4"); } - Assert.assertEquals( - "Row count must match", - 200L, - scalarSql("SELECT COUNT(*) FROM %s WHERE id = -1", commitTarget())); + assertThat(scalarSql("SELECT COUNT(*) FROM %s WHERE id = -1", commitTarget())) + .as("Row count must match") + .isEqualTo(200L); } - @Test + @TestTemplate public void testExplain() { createAndInitTable("id INT, dep STRING"); @@ -288,7 +267,7 @@ public void testExplain() { sql("EXPLAIN UPDATE %s SET dep = 'invalid' WHERE true", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 1 snapshot", 1, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 1 snapshot").hasSize(1); assertEquals( "Should have expected rows", @@ -296,16 +275,16 @@ public void testExplain() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testUpdateEmptyTable() { - Assume.assumeFalse("Custom branch does not exist for empty table", "test".equals(branch)); + assumeThat(branch).as("Custom branch does not exist for empty table").isNotEqualTo("test"); createAndInitTable("id INT, dep STRING"); sql("UPDATE %s SET dep = 'invalid' WHERE id IN (1)", commitTarget()); sql("UPDATE %s SET id = -1 WHERE dep = 'hr'", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 2 snapshots").hasSize(2); assertEquals( "Should have expected rows", @@ -313,9 +292,9 @@ public void testUpdateEmptyTable() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testUpdateNonExistingCustomBranch() { - Assume.assumeTrue("Test only applicable to custom branch", "test".equals(branch)); + assumeThat(branch).as("Test only applicable to custom branch").isEqualTo("test"); createAndInitTable("id INT, dep STRING"); Assertions.assertThatThrownBy( @@ -324,7 +303,7 @@ public void testUpdateNonExistingCustomBranch() { .hasMessage("Cannot use branch (does not exist): test"); } - @Test + @TestTemplate public void testUpdateWithAlias() { createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"a\" }"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -332,7 +311,7 @@ public void testUpdateWithAlias() { sql("UPDATE %s AS t SET t.dep = 'invalid'", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 2 snapshots").hasSize(2); assertEquals( "Should have expected rows", @@ -340,7 +319,7 @@ public void testUpdateWithAlias() { sql("SELECT * FROM %s", selectTarget())); } - @Test + @TestTemplate public void testUpdateAlignsAssignments() { createAndInitTable("id INT, c1 INT, c2 INT"); @@ -355,7 +334,7 @@ public void testUpdateAlignsAssignments() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testUpdateWithUnsupportedPartitionPredicate() { createAndInitTable("id INT, dep STRING"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -371,7 +350,7 @@ public void testUpdateWithUnsupportedPartitionPredicate() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testUpdateWithDynamicFileFiltering() { createAndInitTable("id INT, dep STRING"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -385,7 +364,7 @@ public void testUpdateWithDynamicFileFiltering() { sql("UPDATE %s SET id = cast('-1' AS INT) WHERE id = 2", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 3 snapshots").hasSize(3); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); if (mode(table) == COPY_ON_WRITE) { @@ -400,7 +379,7 @@ public void testUpdateWithDynamicFileFiltering() { sql("SELECT * FROM %s ORDER BY id, dep", commitTarget())); } - @Test + @TestTemplate public void testUpdateNonExistingRecords() { createAndInitTable("id INT, dep STRING"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -411,7 +390,7 @@ public void testUpdateNonExistingRecords() { sql("UPDATE %s SET id = -1 WHERE id > 10", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 2 snapshots", 2, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 2 snapshots").hasSize(2); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); if (mode(table) == COPY_ON_WRITE) { @@ -426,7 +405,7 @@ public void testUpdateNonExistingRecords() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testUpdateWithoutCondition() { createAndInitTable("id INT, dep STRING"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -448,13 +427,13 @@ public void testUpdateWithoutCondition() { }); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 4 snapshots", 4, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 4 snapshots").hasSize(4); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); - Assert.assertEquals("Operation must match", OVERWRITE, currentSnapshot.operation()); + assertThat(currentSnapshot.operation()).as("Operation must match").isEqualTo(OVERWRITE); if (mode(table) == COPY_ON_WRITE) { - Assert.assertEquals("Operation must match", OVERWRITE, currentSnapshot.operation()); + assertThat(currentSnapshot.operation()).as("Operation must match").isEqualTo(OVERWRITE); validateProperty(currentSnapshot, CHANGED_PARTITION_COUNT_PROP, "2"); validateProperty(currentSnapshot, DELETED_FILES_PROP, "3"); validateProperty(currentSnapshot, ADDED_FILES_PROP, ImmutableSet.of("2", "3")); @@ -468,7 +447,7 @@ public void testUpdateWithoutCondition() { sql("SELECT * FROM %s ORDER BY dep ASC", selectTarget())); } - @Test + @TestTemplate public void testUpdateWithNullConditions() { createAndInitTable("id INT, dep STRING"); @@ -501,7 +480,7 @@ public void testUpdateWithNullConditions() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test + @TestTemplate public void testUpdateWithInAndNotInConditions() { createAndInitTable("id INT, dep STRING"); @@ -531,9 +510,9 @@ public void testUpdateWithInAndNotInConditions() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST, dep", selectTarget())); } - @Test + @TestTemplate public void testUpdateWithMultipleRowGroupsParquet() throws NoSuchTableException { - Assume.assumeTrue(fileFormat.equalsIgnoreCase("parquet")); + assumeThat(fileFormat).isEqualTo(FileFormat.PARQUET); createAndInitTable("id INT, dep STRING"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -555,15 +534,15 @@ public void testUpdateWithMultipleRowGroupsParquet() throws NoSuchTableException df.coalesce(1).writeTo(tableName).append(); createBranchIfNeeded(); - Assert.assertEquals(200, spark.table(commitTarget()).count()); + assertThat(spark.table(commitTarget()).count()).isEqualTo(200); // update a record from one of two row groups and copy over the second one sql("UPDATE %s SET id = -1 WHERE id IN (200, 201)", commitTarget()); - Assert.assertEquals(200, spark.table(commitTarget()).count()); + assertThat(spark.table(commitTarget()).count()).isEqualTo(200); } - @Test + @TestTemplate public void testUpdateNestedStructFields() { createAndInitTable( "id INT, s STRUCT,m:MAP>>", @@ -596,7 +575,7 @@ public void testUpdateNestedStructFields() { sql("SELECT * FROM %s", selectTarget())); } - @Test + @TestTemplate public void testUpdateWithUserDefinedDistribution() { createAndInitTable("id INT, c2 INT, c3 INT"); sql("ALTER TABLE %s ADD PARTITION FIELD bucket(8, c3)", tableName); @@ -633,13 +612,13 @@ public void testUpdateWithUserDefinedDistribution() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public synchronized void testUpdateWithSerializableIsolation() throws InterruptedException { // cannot run tests with concurrency for Hadoop tables without atomic renames - Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop")); + assumeThat(catalogName).isNotEqualToIgnoringCase("testhadoop"); // if caching is off, the table is eagerly refreshed during runtime filtering // this can cause a validation exception as concurrent changes would be visible - Assume.assumeTrue(cachingCatalogEnabled()); + assumeThat(cachingCatalogEnabled()).isTrue(); createAndInitTable("id INT, dep STRING"); @@ -662,9 +641,11 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte executorService.submit( () -> { for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { - while (barrier.get() < numOperations * 2) { - sleep(10); - } + int currentNumOperations = numOperations; + Awaitility.await() + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> barrier.get() >= currentNumOperations * 2); sql("UPDATE %s SET id = -1 WHERE id = 1", commitTarget()); @@ -684,9 +665,11 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte record.set(1, "hr"); // dep for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) { - while (shouldAppend.get() && barrier.get() < numOperations * 2) { - sleep(10); - } + int currentNumOperations = numOperations; + Awaitility.await() + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> !shouldAppend.get() || barrier.get() >= currentNumOperations * 2); if (!shouldAppend.get()) { return; @@ -700,7 +683,6 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte } appendFiles.commit(); - sleep(10); } barrier.incrementAndGet(); @@ -719,17 +701,17 @@ public synchronized void testUpdateWithSerializableIsolation() throws Interrupte } executorService.shutdown(); - Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES)); + assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue(); } - @Test + @TestTemplate public synchronized void testUpdateWithSnapshotIsolation() throws InterruptedException, ExecutionException { // cannot run tests with concurrency for Hadoop tables without atomic renames - Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop")); + assumeThat(catalogName).isNotEqualToIgnoringCase("testhadoop"); // if caching is off, the table is eagerly refreshed during runtime filtering // this can cause a validation exception as concurrent changes would be visible - Assume.assumeTrue(cachingCatalogEnabled()); + assumeThat(cachingCatalogEnabled()).isTrue(); createAndInitTable("id INT, dep STRING"); @@ -752,9 +734,11 @@ public synchronized void testUpdateWithSnapshotIsolation() executorService.submit( () -> { for (int numOperations = 0; numOperations < 20; numOperations++) { - while (barrier.get() < numOperations * 2) { - sleep(10); - } + int currentNumOperations = numOperations; + Awaitility.await() + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> barrier.get() >= currentNumOperations * 2); sql("UPDATE %s SET id = -1 WHERE id = 1", tableName); @@ -774,9 +758,11 @@ public synchronized void testUpdateWithSnapshotIsolation() record.set(1, "hr"); // dep for (int numOperations = 0; numOperations < 20; numOperations++) { - while (shouldAppend.get() && barrier.get() < numOperations * 2) { - sleep(10); - } + int currentNumOperations = numOperations; + Awaitility.await() + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> !shouldAppend.get() || barrier.get() >= currentNumOperations * 2); if (!shouldAppend.get()) { return; @@ -790,7 +776,6 @@ public synchronized void testUpdateWithSnapshotIsolation() } appendFiles.commit(); - sleep(10); } barrier.incrementAndGet(); @@ -805,10 +790,10 @@ public synchronized void testUpdateWithSnapshotIsolation() } executorService.shutdown(); - Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES)); + assertThat(executorService.awaitTermination(2, TimeUnit.MINUTES)).as("Timeout").isTrue(); } - @Test + @TestTemplate public void testUpdateWithInferredCasts() { createAndInitTable("id INT, s STRING", "{ \"id\": 1, \"s\": \"value\" }"); @@ -820,7 +805,7 @@ public void testUpdateWithInferredCasts() { sql("SELECT * FROM %s", selectTarget())); } - @Test + @TestTemplate public void testUpdateModifiesNullStruct() { createAndInitTable("id INT, s STRUCT", "{ \"id\": 1, \"s\": null }"); @@ -832,7 +817,7 @@ public void testUpdateModifiesNullStruct() { sql("SELECT * FROM %s", selectTarget())); } - @Test + @TestTemplate public void testUpdateRefreshesRelationCache() { createAndInitTable("id INT, dep STRING"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -857,7 +842,7 @@ public void testUpdateRefreshesRelationCache() { sql("UPDATE %s SET id = -1 WHERE id = 1", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 3 snapshots").hasSize(3); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); if (mode(table) == COPY_ON_WRITE) { @@ -879,7 +864,7 @@ public void testUpdateRefreshesRelationCache() { spark.sql("UNCACHE TABLE tmp"); } - @Test + @TestTemplate public void testUpdateWithInSubquery() { createAndInitTable("id INT, dep STRING"); @@ -929,7 +914,7 @@ public void testUpdateWithInSubquery() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST, dep", selectTarget())); } - @Test + @TestTemplate public void testUpdateWithInSubqueryAndDynamicFileFiltering() { createAndInitTable("id INT, dep STRING"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -947,7 +932,7 @@ public void testUpdateWithInSubqueryAndDynamicFileFiltering() { sql("UPDATE %s SET id = -1 WHERE id IN (SELECT * FROM updated_id)", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 3 snapshots", 3, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 3 snapshots").hasSize(3); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); if (mode(table) == COPY_ON_WRITE) { @@ -962,7 +947,7 @@ public void testUpdateWithInSubqueryAndDynamicFileFiltering() { sql("SELECT * FROM %s ORDER BY id, dep", commitTarget())); } - @Test + @TestTemplate public void testUpdateWithSelfSubquery() { createAndInitTable("id INT, dep STRING"); @@ -998,7 +983,7 @@ public void testUpdateWithSelfSubquery() { sql("SELECT * FROM %s ORDER BY id, dep", selectTarget())); } - @Test + @TestTemplate public void testUpdateWithMultiColumnInSubquery() { createAndInitTable("id INT, dep STRING"); @@ -1022,7 +1007,7 @@ public void testUpdateWithMultiColumnInSubquery() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", selectTarget())); } - @Test + @TestTemplate public void testUpdateWithNotInSubquery() { createAndInitTable("id INT, dep STRING"); @@ -1060,7 +1045,7 @@ public void testUpdateWithNotInSubquery() { sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST, dep", selectTarget())); } - @Test + @TestTemplate public void testUpdateWithExistSubquery() { createAndInitTable("id INT, dep STRING"); @@ -1112,7 +1097,7 @@ public void testUpdateWithExistSubquery() { sql("SELECT * FROM %s ORDER BY id, dep", selectTarget())); } - @Test + @TestTemplate public void testUpdateWithNotExistsSubquery() { createAndInitTable("id INT, dep STRING"); @@ -1155,7 +1140,7 @@ public void testUpdateWithNotExistsSubquery() { sql("SELECT * FROM %s ORDER BY id, dep", selectTarget())); } - @Test + @TestTemplate public void testUpdateWithScalarSubquery() { createAndInitTable("id INT, dep STRING"); @@ -1182,7 +1167,7 @@ public void testUpdateWithScalarSubquery() { }); } - @Test + @TestTemplate public void testUpdateThatRequiresGroupingBeforeWrite() { createAndInitTable("id INT, dep STRING"); sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName); @@ -1220,14 +1205,15 @@ public void testUpdateThatRequiresGroupingBeforeWrite() { spark.conf().set("spark.sql.shuffle.partitions", "1"); sql("UPDATE %s t SET id = -1 WHERE id IN (SELECT * FROM updated_id)", commitTarget()); - Assert.assertEquals( - "Should have expected num of rows", 12L, spark.table(commitTarget()).count()); + assertThat(spark.table(commitTarget()).count()) + .as("Should have expected num of rows") + .isEqualTo(12L); } finally { spark.conf().set("spark.sql.shuffle.partitions", originalNumOfShufflePartitions); } } - @Test + @TestTemplate public void testUpdateWithVectorization() { createAndInitTable("id INT, dep STRING"); @@ -1250,7 +1236,7 @@ public void testUpdateWithVectorization() { }); } - @Test + @TestTemplate public void testUpdateModifyPartitionSourceField() throws NoSuchTableException { createAndInitTable("id INT, dep STRING, country STRING"); @@ -1290,10 +1276,10 @@ public void testUpdateModifyPartitionSourceField() throws NoSuchTableException { sql( "UPDATE %s SET id = -1 WHERE id IN (10, 11, 12, 13, 14, 15, 16, 17, 18, 19)", commitTarget()); - Assert.assertEquals(30L, scalarSql("SELECT count(*) FROM %s WHERE id = -1", selectTarget())); + assertThat(scalarSql("SELECT count(*) FROM %s WHERE id = -1", selectTarget())).isEqualTo(30L); } - @Test + @TestTemplate public void testUpdateWithStaticPredicatePushdown() { createAndInitTable("id INT, dep STRING"); @@ -1310,7 +1296,7 @@ public void testUpdateWithStaticPredicatePushdown() { Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch); String dataFilesCount = snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP); - Assert.assertEquals("Must have 2 files before UPDATE", "2", dataFilesCount); + assertThat(dataFilesCount).as("Must have 2 files before UPDATE").isEqualTo("2"); // remove the data file from the 'hr' partition to ensure it is not scanned DataFile dataFile = Iterables.getOnlyElement(snapshot.addedDataFiles(table.io())); @@ -1326,7 +1312,7 @@ public void testUpdateWithStaticPredicatePushdown() { }); } - @Test + @TestTemplate public void testUpdateWithInvalidUpdates() { createAndInitTable( "id INT, a ARRAY>, m MAP", @@ -1341,7 +1327,7 @@ public void testUpdateWithInvalidUpdates() { .hasMessageContaining("Updating nested fields is only supported for StructType"); } - @Test + @TestTemplate public void testUpdateWithConflictingAssignments() { createAndInitTable( "id INT, c STRUCT>", "{ \"id\": 0, \"s\": null }"); @@ -1365,7 +1351,7 @@ public void testUpdateWithConflictingAssignments() { .hasMessageContaining("Conflicting assignments for 'c'"); } - @Test + @TestTemplate public void testUpdateWithInvalidAssignmentsAnsi() { createAndInitTable( "id INT NOT NULL, s STRUCT> NOT NULL", @@ -1401,7 +1387,7 @@ public void testUpdateWithInvalidAssignmentsAnsi() { }); } - @Test + @TestTemplate public void testUpdateWithInvalidAssignmentsStrict() { createAndInitTable( "id INT NOT NULL, s STRUCT> NOT NULL", @@ -1437,7 +1423,7 @@ public void testUpdateWithInvalidAssignmentsStrict() { }); } - @Test + @TestTemplate public void testUpdateWithNonDeterministicCondition() { createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }"); @@ -1447,7 +1433,7 @@ public void testUpdateWithNonDeterministicCondition() { .hasMessageContaining("The operator expects a deterministic expression"); } - @Test + @TestTemplate public void testUpdateOnNonIcebergTableNotSupported() { createOrReplaceView("testtable", "{ \"c1\": -100, \"c2\": -200 }"); @@ -1456,9 +1442,9 @@ public void testUpdateOnNonIcebergTableNotSupported() { .hasMessage("UPDATE TABLE is not supported temporarily."); } - @Test + @TestTemplate public void testUpdateToWAPBranch() { - Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null); + assumeThat(branch).as("WAP branch only works for table identifier without branch").isNull(); createAndInitTable( "id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, \"dep\": \"a\" }"); @@ -1470,42 +1456,36 @@ public void testUpdateToWAPBranch() { ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"), () -> { sql("UPDATE %s SET dep='hr' WHERE dep='a'", tableName); - Assert.assertEquals( - "Should have expected num of rows when reading table", - 2L, - sql("SELECT * FROM %s WHERE dep='hr'", tableName).size()); - Assert.assertEquals( - "Should have expected num of rows when reading WAP branch", - 2L, - sql("SELECT * FROM %s.branch_wap WHERE dep='hr'", tableName).size()); - Assert.assertEquals( - "Should not modify main branch", - 1L, - sql("SELECT * FROM %s.branch_main WHERE dep='hr'", tableName).size()); + assertThat(sql("SELECT * FROM %s WHERE dep='hr'", tableName)) + .as("Should have expected num of rows when reading table") + .hasSize(2); + assertThat(sql("SELECT * FROM %s.branch_wap WHERE dep='hr'", tableName)) + .as("Should have expected num of rows when reading WAP branch") + .hasSize(2); + assertThat(sql("SELECT * FROM %s.branch_main WHERE dep='hr'", tableName)) + .as("Should not modify main branch") + .hasSize(1); }); withSQLConf( ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"), () -> { sql("UPDATE %s SET dep='b' WHERE dep='hr'", tableName); - Assert.assertEquals( - "Should have expected num of rows when reading table with multiple writes", - 2L, - sql("SELECT * FROM %s WHERE dep='b'", tableName).size()); - Assert.assertEquals( - "Should have expected num of rows when reading WAP branch with multiple writes", - 2L, - sql("SELECT * FROM %s.branch_wap WHERE dep='b'", tableName).size()); - Assert.assertEquals( - "Should not modify main branch with multiple writes", - 0L, - sql("SELECT * FROM %s.branch_main WHERE dep='b'", tableName).size()); + assertThat(sql("SELECT * FROM %s WHERE dep='b'", tableName)) + .as("Should have expected num of rows when reading table with multiple writes") + .hasSize(2); + assertThat(sql("SELECT * FROM %s.branch_wap WHERE dep='b'", tableName)) + .as("Should have expected num of rows when reading WAP branch with multiple writes") + .hasSize(2); + assertThat(sql("SELECT * FROM %s.branch_main WHERE dep='b'", tableName)) + .as("Should not modify main branch with multiple writes") + .hasSize(0); }); } - @Test + @TestTemplate public void testUpdateToWapBranchWithTableBranchIdentifier() { - Assume.assumeTrue("Test must have branch name part in table identifier", branch != null); + assumeThat(branch).as("Test must have branch name part in table identifier").isNotNull(); createAndInitTable("id INT, dep STRING", "{ \"id\": 1, \"dep\": \"hr\" }"); sql( diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java index 4d87099572b8..256e654b7775 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java @@ -22,6 +22,8 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.BulkDeletionFailureException; @@ -38,15 +40,14 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.assertj.core.api.Assertions; -import org.junit.After; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; -public class TestWriteAborts extends SparkExtensionsTestBase { +@ExtendWith(ParameterizedTestExtension.class) +public class TestWriteAborts extends ExtensionsTestBase { - @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") public static Object[][] parameters() { return new Object[][] { { @@ -74,20 +75,14 @@ public static Object[][] parameters() { }; } - @Rule public TemporaryFolder temp = new TemporaryFolder(); - - public TestWriteAborts(String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } - - @After + @AfterEach public void removeTables() { sql("DROP TABLE IF EXISTS %s", tableName); } - @Test + @TestTemplate public void testBatchAppend() throws Exception { - String dataLocation = temp.newFolder().toString(); + String dataLocation = temp.toFile().toString(); sql( "CREATE TABLE %s (id INT, data STRING) "