diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle index eeef75be6334..5b07a3c3847d 100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -172,6 +172,10 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer antlr libs.antlr.antlr4 } + test { + useJUnitPlatform() + } + generateGrammarSource { maxHeapSize = "64m" arguments += ['-visitor', '-package', 'org.apache.spark.sql.catalyst.parser.extensions'] diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java new file mode 100644 index 000000000000..8e167b7f7320 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; + +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.TestHiveMetastore; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.TestBase; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.SQLConf; +import org.junit.jupiter.api.BeforeAll; + +public abstract class ExtensionsTestBase extends CatalogTestBase { + + private static final Random RANDOM = ThreadLocalRandom.current(); + + @BeforeAll + public static void startMetastoreAndSpark() { + TestBase.metastore = new TestHiveMetastore(); + metastore.start(); + TestBase.hiveConf = metastore.hiveConf(); + + TestBase.spark = + SparkSession.builder() + .master("local[2]") + .config("spark.testing", "true") + .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") + .config("spark.sql.extensions", IcebergSparkSessionExtensions.class.getName()) + .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) + .config("spark.sql.shuffle.partitions", "4") + .config("spark.sql.hive.metastorePartitionPruningFallbackOnException", "true") + .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") + .config( + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), String.valueOf(RANDOM.nextBoolean())) + .enableHiveSupport() + .getOrCreate(); + + TestBase.catalog = + (HiveCatalog) + CatalogUtil.loadCatalog( + HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf); + } +} diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java index eaa0a5894c85..91c74a6ad435 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java @@ -19,11 +19,11 @@ package org.apache.iceberg.spark.extensions; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; -import java.io.IOException; +import java.nio.file.Path; import java.util.List; -import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -35,6 +35,9 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.iceberg.DataFile; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.TableProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -49,17 +52,15 @@ import org.apache.spark.sql.types.StructType; import org.assertj.core.api.Assertions; import org.joda.time.DateTime; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runners.Parameterized.Parameters; - -public class TestAddFilesProcedure extends SparkExtensionsTestBase { +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestAddFilesProcedure extends ExtensionsTestBase { @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, formatVersion = {3}") public static Object[][] parameters() { @@ -85,34 +86,26 @@ public static Object[][] parameters() { }; } - private final int formatVersion; + @Parameter(index = 3) + private int formatVersion; + private final String sourceTableName = "source_table"; private File fileTableDir; - public TestAddFilesProcedure( - String catalogName, String implementation, Map config, int formatVersion) { - super(catalogName, implementation, config); - this.formatVersion = formatVersion; - } - - @Rule public TemporaryFolder temp = new TemporaryFolder(); + @TempDir private Path temp; - @Before + @BeforeEach public void setupTempDirs() { - try { - fileTableDir = temp.newFolder(); - } catch (IOException e) { - throw new RuntimeException(e); - } + fileTableDir = temp.toFile(); } - @After + @AfterEach public void dropTables() { sql("DROP TABLE IF EXISTS %s PURGE", sourceTableName); sql("DROP TABLE IF EXISTS %s", tableName); } - @Test + @TestTemplate public void addDataUnpartitioned() { createUnpartitionedFileTable("parquet"); @@ -131,7 +124,7 @@ public void addDataUnpartitioned() { sql("SELECT * FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void deleteAndAddBackUnpartitioned() { createUnpartitionedFileTable("parquet"); @@ -157,7 +150,7 @@ public void deleteAndAddBackUnpartitioned() { sql("SELECT * FROM %s ORDER BY id", tableName)); } - @Ignore // TODO Classpath issues prevent us from actually writing to a Spark ORC table + @Disabled // TODO Classpath issues prevent us from actually writing to a Spark ORC table public void addDataUnpartitionedOrc() { createUnpartitionedFileTable("orc"); @@ -171,7 +164,7 @@ public void addDataUnpartitionedOrc() { "CALL %s.system.add_files('%s', '`orc`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - Assert.assertEquals(2L, result); + assertThat(result).isEqualTo(2L); assertEquals( "Iceberg table contains correct data", @@ -179,11 +172,11 @@ public void addDataUnpartitionedOrc() { sql("SELECT * FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addAvroFile() throws Exception { // Spark Session Catalog cannot load metadata tables // with "The namespace in session catalog must have exactly one name part" - Assume.assumeFalse(catalogName.equals("spark_catalog")); + assumeThat(catalogName).isNotEqualTo("spark_catalog"); // Create an Avro file @@ -199,7 +192,7 @@ public void addAvroFile() throws Exception { GenericRecord record2 = new GenericData.Record(schema); record2.put("id", 2L); record2.put("data", "b"); - File outputFile = temp.newFile("test.avro"); + File outputFile = temp.resolve("test.avro").toFile(); DatumWriter datumWriter = new GenericDatumWriter(schema); DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); @@ -234,7 +227,7 @@ public void addAvroFile() throws Exception { } // TODO Adding spark-avro doesn't work in tests - @Ignore + @Disabled public void addDataUnpartitionedAvro() { createUnpartitionedFileTable("avro"); @@ -248,7 +241,7 @@ public void addDataUnpartitionedAvro() { "CALL %s.system.add_files('%s', '`avro`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - Assert.assertEquals(2L, result); + assertThat(result).isEqualTo(2L); assertEquals( "Iceberg table contains correct data", @@ -256,7 +249,7 @@ public void addDataUnpartitionedAvro() { sql("SELECT * FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addDataUnpartitionedHive() { createUnpartitionedHiveTable(); @@ -273,7 +266,7 @@ public void addDataUnpartitionedHive() { sql("SELECT * FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addDataUnpartitionedExtraCol() { createUnpartitionedFileTable("parquet"); @@ -292,7 +285,7 @@ public void addDataUnpartitionedExtraCol() { sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addDataUnpartitionedMissingCol() { createUnpartitionedFileTable("parquet"); @@ -311,7 +304,7 @@ public void addDataUnpartitionedMissingCol() { sql("SELECT * FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addDataPartitionedMissingCol() { createPartitionedFileTable("parquet"); @@ -330,7 +323,7 @@ public void addDataPartitionedMissingCol() { sql("SELECT * FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addDataPartitioned() { createPartitionedFileTable("parquet"); @@ -350,7 +343,7 @@ public void addDataPartitioned() { sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); } - @Ignore // TODO Classpath issues prevent us from actually writing to a Spark ORC table + @Disabled // TODO Classpath issues prevent us from actually writing to a Spark ORC table public void addDataPartitionedOrc() { createPartitionedFileTable("orc"); @@ -364,7 +357,7 @@ public void addDataPartitionedOrc() { "CALL %s.system.add_files('%s', '`parquet`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - Assert.assertEquals(8L, result); + assertThat(result).isEqualTo(8L); assertEquals( "Iceberg table contains correct data", @@ -373,7 +366,7 @@ public void addDataPartitionedOrc() { } // TODO Adding spark-avro doesn't work in tests - @Ignore + @Disabled public void addDataPartitionedAvro() { createPartitionedFileTable("avro"); @@ -387,7 +380,7 @@ public void addDataPartitionedAvro() { "CALL %s.system.add_files('%s', '`avro`.`%s`')", catalogName, tableName, fileTableDir.getAbsolutePath()); - Assert.assertEquals(8L, result); + assertThat(result).isEqualTo(8L); assertEquals( "Iceberg table contains correct data", @@ -395,7 +388,7 @@ public void addDataPartitionedAvro() { sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addDataPartitionedHive() { createPartitionedHiveTable(); @@ -413,7 +406,7 @@ public void addDataPartitionedHive() { sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addPartitionToPartitioned() { createPartitionedFileTable("parquet"); @@ -433,7 +426,7 @@ public void addPartitionToPartitioned() { sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void deleteAndAddBackPartitioned() { createPartitionedFileTable("parquet"); @@ -460,7 +453,7 @@ public void deleteAndAddBackPartitioned() { sql("SELECT * FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addPartitionToPartitionedSnapshotIdInheritanceEnabledInTwoRuns() { createPartitionedFileTable("parquet"); @@ -489,10 +482,10 @@ public void addPartitionToPartitionedSnapshotIdInheritanceEnabledInTwoRuns() { Pattern uuidPattern = Pattern.compile("[a-f0-9]{8}(?:-[a-f0-9]{4}){4}[a-f0-9]{8}"); Matcher matcher = uuidPattern.matcher(manifestPath); - Assert.assertTrue("verify manifest path has uuid", matcher.find()); + assertThat(matcher.find()).as("verify manifest path has uuid").isTrue(); } - @Test + @TestTemplate public void addDataPartitionedByDateToPartitioned() { createDatePartitionedFileTable("parquet"); @@ -511,7 +504,7 @@ public void addDataPartitionedByDateToPartitioned() { sql("SELECT id, name, date FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addDataPartitionedVerifyPartitionTypeInferredCorrectly() { createTableWithTwoPartitions("parquet"); @@ -530,7 +523,7 @@ public void addDataPartitionedVerifyPartitionTypeInferredCorrectly() { sql(sqlFormat, tableName)); } - @Test + @TestTemplate public void addFilteredPartitionsToPartitioned() { createCompositePartitionedTable("parquet"); @@ -550,7 +543,7 @@ public void addFilteredPartitionsToPartitioned() { sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addFilteredPartitionsToPartitioned2() { createCompositePartitionedTable("parquet"); @@ -572,7 +565,7 @@ public void addFilteredPartitionsToPartitioned2() { sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnId() { createCompositePartitionedTableWithNullValueInPartitionColumn("parquet"); @@ -592,7 +585,7 @@ public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnId() { sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnDept() { createCompositePartitionedTableWithNullValueInPartitionColumn("parquet"); @@ -614,7 +607,7 @@ public void addFilteredPartitionsToPartitionedWithNullValueFilteringOnDept() { sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addWeirdCaseHiveTable() { createWeirdCaseTable(); @@ -639,22 +632,20 @@ public void addWeirdCaseHiveTable() { .collect(Collectors.toList()); // TODO when this assert breaks Spark fixed the pushdown issue - Assert.assertEquals( - "If this assert breaks it means that Spark has fixed the pushdown issue", - 0, - sql( + assertThat( + sql( "SELECT id, `naMe`, dept, subdept from %s WHERE `naMe` = 'John Doe' ORDER BY id", - sourceTableName) - .size()); + sourceTableName)) + .as("If this assert breaks it means that Spark has fixed the pushdown issue") + .hasSize(0); // Pushdown works for iceberg - Assert.assertEquals( - "We should be able to pushdown mixed case partition keys", - 2, - sql( + assertThat( + sql( "SELECT id, `naMe`, dept, subdept FROM %s WHERE `naMe` = 'John Doe' ORDER BY id", - tableName) - .size()); + tableName)) + .as("We should be able to pushdown mixed case partition keys") + .hasSize(2); assertEquals( "Iceberg table contains correct data", @@ -662,7 +653,7 @@ public void addWeirdCaseHiveTable() { sql("SELECT id, `naMe`, dept, subdept FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void addPartitionToPartitionedHive() { createPartitionedHiveTable(); @@ -682,7 +673,7 @@ public void addPartitionToPartitionedHive() { sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void invalidDataImport() { createPartitionedFileTable("parquet"); @@ -705,7 +696,7 @@ public void invalidDataImport() { .hasMessageStartingWith("Cannot add partitioned files to an unpartitioned table"); } - @Test + @TestTemplate public void invalidDataImportPartitioned() { createUnpartitionedFileTable("parquet"); @@ -732,7 +723,7 @@ public void invalidDataImportPartitioned() { "specified partition filter refers to columns that are not partitioned"); } - @Test + @TestTemplate public void addTwice() { createPartitionedHiveTable(); @@ -767,7 +758,7 @@ public void addTwice() { sql("SELECT id, name, dept, subdept FROM %s WHERE id = 2 ORDER BY id", tableName)); } - @Test + @TestTemplate public void duplicateDataPartitioned() { createPartitionedHiveTable(); @@ -795,7 +786,7 @@ public void duplicateDataPartitioned() { + " exist within the target table"); } - @Test + @TestTemplate public void duplicateDataPartitionedAllowed() { createPartitionedHiveTable(); @@ -832,7 +823,7 @@ public void duplicateDataPartitionedAllowed() { sql("SELECT id, name, dept, subdept FROM %s", tableName, tableName)); } - @Test + @TestTemplate public void duplicateDataUnpartitioned() { createUnpartitionedHiveTable(); @@ -851,7 +842,7 @@ public void duplicateDataUnpartitioned() { + " exist within the target table"); } - @Test + @TestTemplate public void duplicateDataUnpartitionedAllowed() { createUnpartitionedHiveTable(); @@ -878,7 +869,7 @@ public void duplicateDataUnpartitionedAllowed() { sql("SELECT * FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void testEmptyImportDoesNotThrow() { createIcebergTable("id Integer, name String, dept String, subdept String"); @@ -907,7 +898,7 @@ public void testEmptyImportDoesNotThrow() { sql("SELECT * FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void testPartitionedImportFromEmptyPartitionDoesNotThrow() { createPartitionedHiveTable(); @@ -935,7 +926,7 @@ public void testPartitionedImportFromEmptyPartitionDoesNotThrow() { sql("SELECT * FROM %s ORDER BY id", tableName)); } - @Test + @TestTemplate public void testAddFilesWithParallelism() { createUnpartitionedHiveTable();