diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java deleted file mode 100644 index 6b2b9a1b8082..000000000000 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.spark; - -import java.util.Map; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public abstract class SparkCatalogTestBase extends SparkTestBaseWithCatalog { - - // these parameters are broken out to avoid changes that need to modify lots of test suites - @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") - public static Object[][] parameters() { - return new Object[][] { - { - SparkCatalogConfig.HIVE.catalogName(), - SparkCatalogConfig.HIVE.implementation(), - SparkCatalogConfig.HIVE.properties() - }, - { - SparkCatalogConfig.HADOOP.catalogName(), - SparkCatalogConfig.HADOOP.implementation(), - SparkCatalogConfig.HADOOP.properties() - }, - { - SparkCatalogConfig.SPARK.catalogName(), - SparkCatalogConfig.SPARK.implementation(), - SparkCatalogConfig.SPARK.properties() - }, - { - SparkCatalogConfig.REST.catalogName(), - SparkCatalogConfig.REST.implementation(), - ImmutableMap.builder() - .putAll(SparkCatalogConfig.REST.properties()) - .put(CatalogProperties.URI, REST_SERVER_RULE.uri()) - .build() - } - }; - } - - @Rule public TemporaryFolder temp = new TemporaryFolder(); - - public SparkCatalogTestBase(SparkCatalogConfig config) { - super(config); - } - - public SparkCatalogTestBase( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } -} diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java index 14e8fc34c3db..24a14bb64d86 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -18,6 +18,9 @@ */ package org.apache.iceberg.spark.source; +import static org.apache.iceberg.FileFormat.AVRO; +import static org.apache.iceberg.FileFormat.ORC; +import static org.apache.iceberg.FileFormat.PARQUET; import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ; import static 
org.apache.iceberg.TableProperties.AVRO_COMPRESSION; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; @@ -50,6 +53,9 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestReader; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.actions.SizeBasedFileRewritePlanner; @@ -58,8 +64,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.spark.CatalogTestBase; import org.apache.iceberg.spark.SparkCatalogConfig; -import org.apache.iceberg.spark.SparkCatalogTestBase; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.spark.actions.SparkActions; import org.apache.orc.OrcFile; @@ -69,73 +75,98 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestCompressionSettings extends SparkCatalogTestBase { +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestCompressionSettings extends CatalogTestBase { private static final Configuration CONF = new Configuration(); private static final String TABLE_NAME = "testWriteData"; private static SparkSession spark = null; - private final FileFormat format; - private final ImmutableMap properties; + @Parameter(index = 3) + private FileFormat format; + + @Parameter(index = 4) + private Map properties; - @Rule public TemporaryFolder temp = new TemporaryFolder(); + @TempDir private java.nio.file.Path temp; - @Parameterized.Parameters(name = "format = {0}, properties = {1}") + @Parameters( + name = + "catalogName = {0}, implementation = {1}, config = {2}, format = {3}, properties = {4}") public static Object[][] parameters() { return new Object[][] { - {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")}, - {"parquet", ImmutableMap.of(COMPRESSION_CODEC, "gzip")}, - {"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "speed")}, - {"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "compression")}, - {"avro", ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3")} + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties(), + PARQUET, + ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1") + }, + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties(), + PARQUET, + ImmutableMap.of(COMPRESSION_CODEC, "gzip") + }, + { + SparkCatalogConfig.SPARK.catalogName(), + 
SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties(), + ORC, + ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "speed") + }, + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties(), + ORC, + ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "compression") + }, + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties(), + AVRO, + ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3") + } }; } - @BeforeClass + @BeforeAll public static void startSpark() { TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate(); } - @Before + @BeforeEach public void resetSpecificConfigurations() { spark.conf().unset(COMPRESSION_CODEC); spark.conf().unset(COMPRESSION_LEVEL); spark.conf().unset(COMPRESSION_STRATEGY); } - @Parameterized.AfterParam - public static void clearSourceCache() { + @AfterEach + public void afterEach() { spark.sql(String.format("DROP TABLE IF EXISTS %s", TABLE_NAME)); } - @AfterClass + @AfterAll public static void stopSpark() { SparkSession currentSpark = TestCompressionSettings.spark; TestCompressionSettings.spark = null; currentSpark.stop(); } - public TestCompressionSettings(String format, ImmutableMap properties) { - super( - SparkCatalogConfig.SPARK.catalogName(), - SparkCatalogConfig.SPARK.implementation(), - SparkCatalogConfig.SPARK.properties()); - this.format = FileFormat.fromString(format); - this.properties = properties; - } - - @Test + @TestTemplate public void testWriteDataWithDifferentSetting() throws Exception { sql("CREATE TABLE %s (id int, data string) USING iceberg", TABLE_NAME); Map tableProperties = Maps.newHashMap(); @@ -168,6 +199,8 @@ public void testWriteDataWithDifferentSetting() throws Exception { spark.conf().set(entry.getKey(), entry.getValue()); } + assertSparkConf(); + df.select("id", "data") .writeTo(TABLE_NAME) .option(SparkWriteOptions.WRITE_FORMAT, format.toString()) @@ -230,4 +263,13 @@ private String getCompressionType(InputFile inputFile) throws Exception { return fileReader.getMetaString(DataFileConstants.CODEC); } } + + private void assertSparkConf() { + String[] propertiesToCheck = {COMPRESSION_CODEC, COMPRESSION_LEVEL, COMPRESSION_STRATEGY}; + for (String prop : propertiesToCheck) { + String expected = properties.getOrDefault(prop, null); + String actual = spark.conf().get(prop, null); + assertThat(actual).isEqualToIgnoringCase(expected); + } + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java index b669c91313f3..dc7d87b9036d 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java @@ -21,31 +21,28 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; -import java.util.Map; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.iceberg.spark.CatalogTestBase; import 
org.apache.iceberg.spark.SparkWriteOptions; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; -import org.junit.After; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; -public class TestRequiredDistributionAndOrdering extends SparkCatalogTestBase { +@ExtendWith(ParameterizedTestExtension.class) +public class TestRequiredDistributionAndOrdering extends CatalogTestBase { - public TestRequiredDistributionAndOrdering( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } - - @After + @AfterEach public void dropTestTable() { sql("DROP TABLE IF EXISTS %s", tableName); } - @Test + @TestTemplate public void testDefaultLocalSort() throws NoSuchTableException { sql( "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " @@ -74,7 +71,7 @@ public void testDefaultLocalSort() throws NoSuchTableException { sql("SELECT count(*) FROM %s", tableName)); } - @Test + @TestTemplate public void testPartitionColumnsArePrependedForRangeDistribution() throws NoSuchTableException { sql( "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " @@ -110,7 +107,7 @@ public void testPartitionColumnsArePrependedForRangeDistribution() throws NoSuch sql("SELECT count(*) FROM %s", tableName)); } - @Test + @TestTemplate public void testSortOrderIncludesPartitionColumns() throws NoSuchTableException { sql( "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " @@ -142,7 +139,7 @@ public void testSortOrderIncludesPartitionColumns() throws NoSuchTableException sql("SELECT count(*) FROM %s", tableName)); } - @Test + @TestTemplate public void testDisabledDistributionAndOrdering() { sql( "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " @@ -176,7 +173,7 @@ public void testDisabledDistributionAndOrdering() { + "and by partition within each spec. 
Either cluster the incoming records or switch to fanout writers."); } - @Test + @TestTemplate public void testHashDistribution() throws NoSuchTableException { sql( "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " @@ -212,7 +209,7 @@ public void testHashDistribution() throws NoSuchTableException { sql("SELECT count(*) FROM %s", tableName)); } - @Test + @TestTemplate public void testSortBucketTransformsWithoutExtensions() throws NoSuchTableException { sql( "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " @@ -238,7 +235,7 @@ public void testSortBucketTransformsWithoutExtensions() throws NoSuchTableExcept assertEquals("Rows must match", expected, sql("SELECT * FROM %s ORDER BY c1", tableName)); } - @Test + @TestTemplate public void testRangeDistributionWithQuotedColumnsNames() throws NoSuchTableException { sql( "CREATE TABLE %s (c1 INT, c2 STRING, `c.3` STRING) " @@ -274,7 +271,7 @@ public void testRangeDistributionWithQuotedColumnsNames() throws NoSuchTableExce sql("SELECT count(*) FROM %s", tableName)); } - @Test + @TestTemplate public void testHashDistributionWithQuotedColumnsNames() throws NoSuchTableException { sql( "CREATE TABLE %s (c1 INT, c2 STRING, `c``3` STRING) " diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java index b09c995b30fa..e7346e270f38 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java @@ -22,6 +22,7 @@ import static org.apache.iceberg.PlanningMode.LOCAL; import static org.apache.spark.sql.functions.date_add; import static org.apache.spark.sql.functions.expr; +import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; import java.io.UncheckedIOException; @@ -29,6 +30,9 @@ import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.PlanningMode; import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; @@ -36,38 +40,47 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.Sets; -import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.TestBaseWithCatalog; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; -import org.junit.After; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestRuntimeFiltering extends SparkTestBaseWithCatalog { - - @Parameterized.Parameters(name = "planningMode = {0}") - public static Object[] parameters() { - return new Object[] {LOCAL, DISTRIBUTED}; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestRuntimeFiltering extends TestBaseWithCatalog { + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, 
planningMode = {3}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties(), + LOCAL + }, + { + SparkCatalogConfig.HADOOP.catalogName(), + SparkCatalogConfig.HADOOP.implementation(), + SparkCatalogConfig.HADOOP.properties(), + DISTRIBUTED + } + }; } - private final PlanningMode planningMode; - - public TestRuntimeFiltering(PlanningMode planningMode) { - this.planningMode = planningMode; - } + @Parameter(index = 3) + private PlanningMode planningMode; - @After + @AfterEach public void removeTables() { sql("DROP TABLE IF EXISTS %s", tableName); sql("DROP TABLE IF EXISTS dim"); } - @Test + @TestTemplate public void testIdentityPartitionedTable() throws NoSuchTableException { sql( "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " @@ -106,7 +119,7 @@ public void testIdentityPartitionedTable() throws NoSuchTableException { sql(query)); } - @Test + @TestTemplate public void testBucketedTable() throws NoSuchTableException { sql( "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " @@ -145,7 +158,7 @@ public void testBucketedTable() throws NoSuchTableException { sql(query)); } - @Test + @TestTemplate public void testRenamedSourceColumnTable() throws NoSuchTableException { sql( "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " @@ -186,7 +199,7 @@ public void testRenamedSourceColumnTable() throws NoSuchTableException { sql(query)); } - @Test + @TestTemplate public void testMultipleRuntimeFilters() throws NoSuchTableException { sql( "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " @@ -229,7 +242,7 @@ public void testMultipleRuntimeFilters() throws NoSuchTableException { sql(query)); } - @Test + @TestTemplate public void testCaseSensitivityOfRuntimeFilters() throws NoSuchTableException { sql( "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) " @@ -273,7 +286,7 @@ public void testCaseSensitivityOfRuntimeFilters() throws NoSuchTableException { sql(caseInsensitiveQuery)); } - @Test + @TestTemplate public void testBucketedTableWithMultipleSpecs() throws NoSuchTableException { sql( "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg", @@ -325,7 +338,7 @@ public void testBucketedTableWithMultipleSpecs() throws NoSuchTableException { sql(query)); } - @Test + @TestTemplate public void testSourceColumnWithDots() throws NoSuchTableException { sql( "CREATE TABLE %s (`i.d` BIGINT, data STRING, date DATE, ts TIMESTAMP) " @@ -369,7 +382,7 @@ public void testSourceColumnWithDots() throws NoSuchTableException { sql(query)); } - @Test + @TestTemplate public void testSourceColumnWithBackticks() throws NoSuchTableException { sql( "CREATE TABLE %s (`i``d` BIGINT, data STRING, date DATE, ts TIMESTAMP) " @@ -410,7 +423,7 @@ public void testSourceColumnWithBackticks() throws NoSuchTableException { sql(query)); } - @Test + @TestTemplate public void testUnpartitionedTable() throws NoSuchTableException { sql( "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) USING iceberg", @@ -458,7 +471,7 @@ private void assertQueryContainsRuntimeFilters( List output = spark.sql("EXPLAIN EXTENDED " + query).collectAsList(); String plan = output.get(0).getString(0); int actualFilterCount = StringUtils.countMatches(plan, "dynamicpruningexpression"); - Assert.assertEquals(errorMessage, expectedFilterCount, actualFilterCount); + 
assertThat(actualFilterCount).as(errorMessage).isEqualTo(expectedFilterCount); } // delete files that don't match the filter to ensure dynamic filtering works and only required @@ -490,9 +503,8 @@ private void deleteNotMatchingFiles(Expression filter, int expectedDeletedFileCo throw new UncheckedIOException(e); } - Assert.assertEquals( - "Deleted unexpected number of files", - expectedDeletedFileCount, - deletedFileLocations.size()); + assertThat(deletedFileLocations) + .as("Deleted unexpected number of files") + .hasSize(expectedDeletedFileCount); } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java index c27671311374..fd155a6bcaf3 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java @@ -18,25 +18,28 @@ */ package org.apache.iceberg.spark.source; -import java.util.Map; +import static org.assertj.core.api.Assertions.assertThat; + import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.KryoHelpers; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.Table; import org.apache.iceberg.TestHelpers; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.CatalogTestBase; import org.apache.iceberg.spark.SparkCatalog; -import org.apache.iceberg.spark.SparkCatalogTestBase; import org.apache.iceberg.spark.SparkSessionCatalog; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; -public class TestSparkCatalogHadoopOverrides extends SparkCatalogTestBase { +@ExtendWith(ParameterizedTestExtension.class) +public class TestSparkCatalogHadoopOverrides extends CatalogTestBase { private static final String CONFIG_TO_OVERRIDE = "fs.s3a.buffer.dir"; // prepend "hadoop." so that the test base formats SQLConf correctly @@ -44,7 +47,7 @@ public class TestSparkCatalogHadoopOverrides extends SparkCatalogTestBase { private static final String HADOOP_PREFIXED_CONFIG_TO_OVERRIDE = "hadoop." 
+ CONFIG_TO_OVERRIDE; private static final String CONFIG_OVERRIDE_VALUE = "/tmp-overridden"; - @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") public static Object[][] parameters() { return new Object[][] { { @@ -77,41 +80,36 @@ public static Object[][] parameters() { }; } - public TestSparkCatalogHadoopOverrides( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } - - @Before + @BeforeEach public void createTable() { sql("CREATE TABLE IF NOT EXISTS %s (id bigint) USING iceberg", tableName(tableIdent.name())); } - @After + @AfterEach public void dropTable() { sql("DROP TABLE IF EXISTS %s", tableName(tableIdent.name())); } - @Test + @TestTemplate public void testTableFromCatalogHasOverrides() throws Exception { Table table = getIcebergTableFromSparkCatalog(); Configuration conf = ((Configurable) table.io()).getConf(); String actualCatalogOverride = conf.get(CONFIG_TO_OVERRIDE, "/whammies"); - Assert.assertEquals( - "Iceberg tables from spark should have the overridden hadoop configurations from the spark config", - CONFIG_OVERRIDE_VALUE, - actualCatalogOverride); + assertThat(actualCatalogOverride) + .as( + "Iceberg tables from spark should have the overridden hadoop configurations from the spark config") + .isEqualTo(CONFIG_OVERRIDE_VALUE); } - @Test + @TestTemplate public void ensureRoundTripSerializedTableRetainsHadoopConfig() throws Exception { Table table = getIcebergTableFromSparkCatalog(); Configuration originalConf = ((Configurable) table.io()).getConf(); String actualCatalogOverride = originalConf.get(CONFIG_TO_OVERRIDE, "/whammies"); - Assert.assertEquals( - "Iceberg tables from spark should have the overridden hadoop configurations from the spark config", - CONFIG_OVERRIDE_VALUE, - actualCatalogOverride); + assertThat(actualCatalogOverride) + .as( + "Iceberg tables from spark should have the overridden hadoop configurations from the spark config") + .isEqualTo(CONFIG_OVERRIDE_VALUE); // Now convert to SerializableTable and ensure overridden property is still present. 
Table serializableTable = SerializableTableWithSize.copyOf(table); @@ -119,19 +117,19 @@ public void ensureRoundTripSerializedTableRetainsHadoopConfig() throws Exception KryoHelpers.roundTripSerialize(SerializableTableWithSize.copyOf(table)); Configuration configFromKryoSerde = ((Configurable) kryoSerializedTable.io()).getConf(); String kryoSerializedCatalogOverride = configFromKryoSerde.get(CONFIG_TO_OVERRIDE, "/whammies"); - Assert.assertEquals( - "Tables serialized with Kryo serialization should retain overridden hadoop configuration properties", - CONFIG_OVERRIDE_VALUE, - kryoSerializedCatalogOverride); + assertThat(kryoSerializedCatalogOverride) + .as( + "Tables serialized with Kryo serialization should retain overridden hadoop configuration properties") + .isEqualTo(CONFIG_OVERRIDE_VALUE); // Do the same for Java based serde Table javaSerializedTable = TestHelpers.roundTripSerialize(serializableTable); Configuration configFromJavaSerde = ((Configurable) javaSerializedTable.io()).getConf(); String javaSerializedCatalogOverride = configFromJavaSerde.get(CONFIG_TO_OVERRIDE, "/whammies"); - Assert.assertEquals( - "Tables serialized with Java serialization should retain overridden hadoop configuration properties", - CONFIG_OVERRIDE_VALUE, - javaSerializedCatalogOverride); + assertThat(javaSerializedCatalogOverride) + .as( + "Tables serialized with Java serialization should retain overridden hadoop configuration properties") + .isEqualTo(CONFIG_OVERRIDE_VALUE); } @SuppressWarnings("ThrowSpecificity") diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java index 241293f367aa..b0029c09ab66 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java @@ -18,38 +18,35 @@ */ package org.apache.iceberg.spark.source; +import static org.assertj.core.api.Assertions.assertThat; + import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.UUID; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.spark.CatalogTestBase; import org.apache.iceberg.spark.ScanTaskSetManager; -import org.apache.iceberg.spark.SparkCatalogTestBase; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; -import org.junit.After; -import org.junit.Assert; -import org.junit.Test; - -public class TestSparkStagedScan extends SparkCatalogTestBase { +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; - public TestSparkStagedScan( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } +@ExtendWith(ParameterizedTestExtension.class) +public class TestSparkStagedScan extends CatalogTestBase { - @After + @AfterEach public void removeTables() { sql("DROP TABLE IF EXISTS %s", tableName); } - @Test + @TestTemplate public void testTaskSetLoading() throws NoSuchTableException, 
IOException { sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); @@ -59,7 +56,7 @@ public void testTaskSetLoading() throws NoSuchTableException, IOException { df.writeTo(tableName).append(); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should produce 1 snapshot", 1, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should produce 1 snapshot").hasSize(1); try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) { ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); @@ -84,7 +81,7 @@ public void testTaskSetLoading() throws NoSuchTableException, IOException { sql("SELECT * FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void testTaskSetPlanning() throws NoSuchTableException, IOException { sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); @@ -95,7 +92,7 @@ public void testTaskSetPlanning() throws NoSuchTableException, IOException { df.coalesce(1).writeTo(tableName).append(); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should produce 2 snapshots", 2, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should produce 2 snapshots").hasSize(2); try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) { ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); @@ -111,7 +108,9 @@ public void testTaskSetPlanning() throws NoSuchTableException, IOException { .option(SparkReadOptions.SCAN_TASK_SET_ID, setID) .option(SparkReadOptions.SPLIT_SIZE, tasks.get(0).file().fileSizeInBytes()) .load(tableName); - Assert.assertEquals("Num partitions should match", 2, scanDF.javaRDD().getNumPartitions()); + assertThat(scanDF.javaRDD().getNumPartitions()) + .as("Num partitions should match") + .isEqualTo(2); // load the staged file set and make sure we combine both files into a single split scanDF = @@ -121,7 +120,9 @@ public void testTaskSetPlanning() throws NoSuchTableException, IOException { .option(SparkReadOptions.SCAN_TASK_SET_ID, setID) .option(SparkReadOptions.SPLIT_SIZE, Long.MAX_VALUE) .load(tableName); - Assert.assertEquals("Num partitions should match", 1, scanDF.javaRDD().getNumPartitions()); + assertThat(scanDF.javaRDD().getNumPartitions()) + .as("Num partitions should match") + .isEqualTo(1); } } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java index 616a196872de..4a386ee861d6 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java @@ -18,34 +18,32 @@ */ package org.apache.iceberg.spark.source; -import java.util.Map; -import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.spark.CatalogTestBase; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.connector.catalog.CatalogManager; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; -public class TestSparkTable extends
SparkCatalogTestBase { +@ExtendWith(ParameterizedTestExtension.class) +public class TestSparkTable extends CatalogTestBase { - public TestSparkTable(String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } - - @Before + @BeforeEach public void createTable() { sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); } - @After + @AfterEach public void removeTable() { sql("DROP TABLE IF EXISTS %s", tableName); } - @Test + @TestTemplate public void testTableEquality() throws NoSuchTableException { CatalogManager catalogManager = spark.sessionState().catalogManager(); TableCatalog catalog = (TableCatalog) catalogManager.catalog(catalogName); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java index b7d415de3454..2544a8f73954 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java @@ -37,6 +37,7 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; @@ -49,7 +50,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.iceberg.spark.CatalogTestBase; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.sql.Dataset; @@ -59,20 +60,14 @@ import org.apache.spark.sql.streaming.DataStreamWriter; import org.apache.spark.sql.streaming.OutputMode; import org.apache.spark.sql.streaming.StreamingQuery; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; -@RunWith(Parameterized.class) -public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase { - public TestStructuredStreamingRead3( - String catalogName, String implementation, Map config) { - super(catalogName, implementation, config); - } +@ExtendWith(ParameterizedTestExtension.class) +public final class TestStructuredStreamingRead3 extends CatalogTestBase { private Table table; @@ -114,13 +109,13 @@ public TestStructuredStreamingRead3( Lists.newArrayList( new SimpleRecord(15, "fifteen"), new SimpleRecord(16, "sixteen")))); - @BeforeClass + @BeforeAll public static void setupSpark() { // disable AQE as tests assume that writes generate a particular number of files spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false"); } - @Before + @BeforeEach public void setupTable() { sql( "CREATE TABLE %s " @@ -132,19 +127,19 @@ public void setupTable() { microBatches.set(0); } - @After + @AfterEach public void stopStreams() throws TimeoutException { for 
(StreamingQuery query : spark.streams().active()) { query.stop(); } } - @After + @AfterEach public void removeTables() { sql("DROP TABLE IF EXISTS %s", tableName); sql("DROP TABLE IF EXISTS dim"); } - @Test + @TestTemplate public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception { List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; appendDataAsMultipleSnapshots(expected); @@ -155,37 +150,38 @@ assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); } - @Test + @TestTemplate public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1() throws Exception { appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); - Assert.assertEquals( - 6, - microBatchCount( - ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"))); + assertThat( + microBatchCount( + ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"))) + .isEqualTo(6); } - @Test + @TestTemplate public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2() throws Exception { appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); - Assert.assertEquals( - 3, - microBatchCount( - ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2"))); + assertThat( + microBatchCount( + ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2"))) + .isEqualTo(3); } - @Test + @TestTemplate public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1() throws Exception { appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); // only 1 micro-batch will be formed and we will read data partially - Assert.assertEquals( - 1, - microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1"))); + assertThat( + microBatchCount( + ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1"))) + .isEqualTo(1); StreamingQuery query = startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1"); @@ -196,17 +192,18 @@ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1 Lists.newArrayList(TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0))); } - @Test + @TestTemplate public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4() throws Exception { appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS); - Assert.assertEquals( - 2, - microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4"))); + assertThat( + microBatchCount( + ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4"))) + .isEqualTo(2); } - @Test + @TestTemplate public void testReadStreamOnIcebergThenAddData() throws Exception { List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; @@ -218,7 +215,7 @@ public void testReadStreamOnIcebergThenAddData() throws Exception { assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); } - @Test + @TestTemplate public void testReadingStreamFromTimestamp() throws Exception { List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList( @@ -245,7 +242,7 @@ public void testReadingStreamFromTimestamp() throws Exception { assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); } - @Test + @TestTemplate public void testReadingStreamFromFutureTimetsamp() throws Exception { long futureTimestamp = System.currentTimeMillis() + 10000; @@ -277,7 +274,7 @@
assertThat(actual).containsExactlyInAnyOrderElementsOf(data); } - @Test + @TestTemplate public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws Exception { List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList( @@ -290,7 +287,7 @@ public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws E StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp)); List<Row> actual = rowsAvailable(query); - Assert.assertEquals(Collections.emptyList(), actual); + assertThat(actual).isEmpty(); // Stream should contain data added after the timestamp elapses waitUntilAfter(streamStartTimestamp); @@ -300,7 +297,7 @@ public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws E .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); } - @Test + @TestTemplate public void testReadingStreamFromTimestampOfExistingSnapshot() throws Exception { List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; @@ -322,7 +319,7 @@ public void testReadingStreamFromTimestampOfExistingSnapshot() throws Exception assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); } - @Test + @TestTemplate public void testReadingStreamWithExpiredSnapshotFromTimestamp() throws TimeoutException { List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new SimpleRecord(1, "one")); @@ -351,11 +348,11 @@ public void testReadingStreamWithExpiredSnapshotFromTimestamp() throws TimeoutEx assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecordList); } - @Test + @TestTemplate public void testResumingStreamReadFromCheckpoint() throws Exception { - File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder"); + File writerCheckpointFolder = temp.resolve("writer-checkpoint-folder").toFile(); File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint"); - File output = temp.newFolder(); + File output = temp.resolve("junit").toFile(); DataStreamWriter<Row> querySource = spark @@ -391,11 +388,11 @@ public void testResumingStreamReadFromCheckpoint() throws Exception { } } - @Test + @TestTemplate public void testFailReadingCheckpointInvalidSnapshot() throws IOException, TimeoutException { - File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder"); + File writerCheckpointFolder = temp.resolve("writer-checkpoint-folder").toFile(); File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint"); - File output = temp.newFolder(); + File output = temp.resolve("junit").toFile(); DataStreamWriter<Row> querySource = spark @@ -431,7 +428,7 @@ public void testFailReadingCheckpointInvalidSnapshot() throws IOException, Timeo firstSnapshotid)); } - @Test + @TestTemplate public void testParquetOrcAvroDataInOneTable() throws Exception { List<SimpleRecord> parquetFileRecords = Lists.newArrayList( @@ -453,14 +450,14 @@ public void testParquetOrcAvroDataInOneTable() throws Exception { Iterables.concat(parquetFileRecords, orcFileRecords, avroFileRecords)); } - @Test + @TestTemplate public void testReadStreamFromEmptyTable() throws Exception { StreamingQuery stream = startStream(); List<Row> actual = rowsAvailable(stream); - Assert.assertEquals(Collections.emptyList(), actual); + assertThat(actual).isEmpty(); } - @Test + @TestTemplate public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception { // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE.
TableOperations ops = ((BaseTable) table).operations(); @@ -481,14 +478,14 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception DeleteFile eqDeletes = FileHelpers.writeDeleteFile( table, - Files.localOutput(temp.newFile()), + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema); DataFile dataFile = DataFiles.builder(table.spec()) - .withPath(temp.newFile().toString()) + .withPath(File.createTempFile("junit", null, temp.toFile()).getPath()) .withFileSizeInBytes(10) .withRecordCount(1) .withFormat(FileFormat.PARQUET) @@ -498,7 +495,7 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception // check pre-condition - that the above Delete file write - actually resulted in snapshot of // type OVERWRITE - Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation()); + assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE); StreamingQuery query = startStream(); @@ -508,7 +505,7 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception .hasMessageStartingWith("Cannot process overwrite snapshot"); } - @Test + @TestTemplate public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception { // fill table with some data List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS; @@ -518,14 +515,14 @@ public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Excepti table.rewriteManifests().clusterBy(f -> 1).commit(); // check pre-condition - Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation()); + assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.REPLACE); StreamingQuery query = startStream(); List<Row> actual = rowsAvailable(query); assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected)); } - @Test + @TestTemplate public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception { table.updateSpec().removeField("id_bucket").addField(ref("id")).commit(); @@ -538,7 +535,7 @@ public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception { // check pre-condition - that the above delete operation on table resulted in Snapshot of Type // DELETE. - Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation()); + assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.DELETE); StreamingQuery query = startStream(); @@ -548,7 +545,7 @@ public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception { .hasMessageStartingWith("Cannot process delete snapshot"); } - @Test + @TestTemplate public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exception { table.updateSpec().removeField("id_bucket").addField(ref("id")).commit(); @@ -561,14 +558,14 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exc // check pre-condition - that the above delete operation on table resulted in Snapshot of Type // DELETE.
- Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation()); + assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.DELETE); StreamingQuery query = startStream(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true"); assertThat(rowsAvailable(query)) .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots)); } - @Test + @TestTemplate public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws Exception { table.updateSpec().removeField("id_bucket").addField(ref("id")).commit(); @@ -578,7 +575,7 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws DataFile dataFile = DataFiles.builder(table.spec()) - .withPath(temp.newFile().toString()) + .withPath(File.createTempFile("junit", null, temp.toFile()).getPath()) .withFileSizeInBytes(10) .withRecordCount(1) .withFormat(FileFormat.PARQUET) @@ -593,7 +590,7 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws // check pre-condition - that the above delete operation on table resulted in Snapshot of Type // OVERWRITE. - Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation()); + assertThat(table.currentSnapshot().operation()).isEqualTo(DataOperations.OVERWRITE); StreamingQuery query = startStream(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, "true"); assertThat(rowsAvailable(query)) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java index 55fd2cefe2e6..5dbfc7fa6c0f 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java @@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -31,7 +32,9 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public class TestRequiredDistributionAndOrdering extends CatalogTestBase { @AfterEach diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java index c031f2991fed..fd155a6bcaf3 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.KryoHelpers; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; import org.apache.iceberg.Table; import org.apache.iceberg.TestHelpers; @@ -35,7 +36,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; 
+@ExtendWith(ParameterizedTestExtension.class) public class TestSparkCatalogHadoopOverrides extends CatalogTestBase { private static final String CONFIG_TO_OVERRIDE = "fs.s3a.buffer.dir"; diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java index 6ce2ce623835..e444b7cb1f7c 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.UUID; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -35,7 +36,9 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public class TestSparkStagedScan extends CatalogTestBase { @AfterEach diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java index 46ee484b39ea..d14b1a52cf82 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.spark.CatalogTestBase; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.connector.catalog.CatalogManager; @@ -28,7 +29,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public class TestSparkTable extends CatalogTestBase { @BeforeEach diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java index 55fd2cefe2e6..5dbfc7fa6c0f 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java @@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -31,7 +32,9 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public class TestRequiredDistributionAndOrdering extends CatalogTestBase { @AfterEach diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java index c031f2991fed..fd155a6bcaf3 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalogHadoopOverrides.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.KryoHelpers; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; import org.apache.iceberg.Table; import org.apache.iceberg.TestHelpers; @@ -35,7 +36,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public class TestSparkCatalogHadoopOverrides extends CatalogTestBase { private static final String CONFIG_TO_OVERRIDE = "fs.s3a.buffer.dir"; diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java index 6ce2ce623835..e444b7cb1f7c 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.UUID; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -35,7 +36,9 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public class TestSparkStagedScan extends CatalogTestBase { @AfterEach diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java index 46ee484b39ea..d14b1a52cf82 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.spark.CatalogTestBase; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.connector.catalog.CatalogManager; @@ -28,7 +29,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ParameterizedTestExtension.class) public class TestSparkTable extends CatalogTestBase { @BeforeEach