diff --git a/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java b/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java index 59652bab98..a65ee4dfa3 100644 --- a/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java +++ b/api/src/test/java/org/apache/iceberg/ParameterizedTestExtension.java @@ -20,7 +20,9 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -70,18 +72,16 @@ public boolean supportsTestTemplate(ExtensionContext context) { public Stream provideTestTemplateInvocationContexts( ExtensionContext context) { - // Search method annotated with @Parameters + // Search for methods annotated with @Parameters, preferring the most specific class final List parameterProviders = AnnotationSupport.findAnnotatedMethods( context.getRequiredTestClass(), Parameters.class, HierarchyTraversalMode.TOP_DOWN); - if (parameterProviders.isEmpty()) { + Method parameterProvider = + resolveParameterProvider(context.getRequiredTestClass(), parameterProviders); + if (parameterProvider == null) { throw new IllegalStateException("Cannot find any parameter provider"); } - if (parameterProviders.size() > 1) { - throw new IllegalStateException("Multiple parameter providers are found"); - } - Method parameterProvider = parameterProviders.get(0); // Get potential test name String testNameTemplate = parameterProvider.getAnnotation(Parameters.class).name(); @@ -97,18 +97,30 @@ public Stream provideTestTemplateInvocationContex Preconditions.checkState(parameterValues != null, "Parameter values cannot be null"); - // Parameter values could be Object[][] - if (parameterValues instanceof Object[][]) { - Object[][] typedParameterValues = (Object[][]) parameterValues; - return createContextForParameters( - Arrays.stream(typedParameterValues), testNameTemplate, context); + List allParameters = new ArrayList<>(); + normalizeParameters(parameterValues).forEach(allParameters::add); + + try { + Method extraMethod = context.getRequiredTestClass().getMethod("getExtraParameters"); + if (Modifier.isStatic(extraMethod.getModifiers())) { + Object extra = extraMethod.invoke(null); + normalizeParameters(extra).forEach(allParameters::add); + } + } catch (NoSuchMethodException e) { + // ignore + } catch (Exception e) { + throw new RuntimeException("Failed to invoke getExtraParameters", e); } - // or a Collection - if (parameterValues instanceof Collection) { - final Collection typedParameterValues = (Collection) parameterValues; - final Stream parameterValueStream = - typedParameterValues.stream() + return createContextForParameters(allParameters.stream(), testNameTemplate, context); + } + + private Stream normalizeParameters(Object parameters) { + if (parameters instanceof Object[][]) { + return Arrays.stream((Object[][]) parameters); + } else if (parameters instanceof Collection) { + return ((Collection) parameters) + .stream() .map( (Function) parameterValue -> { @@ -118,13 +130,36 @@ public Stream provideTestTemplateInvocationContex return new Object[] {parameterValue}; } }); - return createContextForParameters(parameterValueStream, testNameTemplate, context); } - throw new IllegalStateException( String.format( - "Return type of @Parameters annotated method \"%s\" should be either Object[][] or Collection", - parameterProvider)); + "Return type of @Parameters annotated method should be 
either Object[][] or Collection, but was %s", + parameters.getClass().getName())); + } + + /** + * Resolves the parameter provider method, preferring the most specific (child) class when multiple + * @Parameters methods exist in the hierarchy. + */ + private static Method resolveParameterProvider( + Class testClass, List parameterProviders) { + if (parameterProviders.isEmpty()) { + return null; + } + if (parameterProviders.size() == 1) { + return parameterProviders.get(0); + } + + // Walk up the hierarchy, return the first match (most specific class) + for (Class current = testClass; current != null; current = current.getSuperclass()) { + for (Method candidate : parameterProviders) { + if (candidate.getDeclaringClass().equals(current)) { + return candidate; + } + } + } + + return parameterProviders.get(0); } private static class FieldInjectingInvocationContext implements TestTemplateInvocationContext { diff --git a/build.gradle b/build.gradle index 90675cfb98..5c8c6d871c 100644 --- a/build.gradle +++ b/build.gradle @@ -48,6 +48,12 @@ buildscript { String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") String sparkVersionsString = System.getProperty("sparkVersions") != null ? System.getProperty("sparkVersions") : System.getProperty("defaultSparkVersions") List sparkVersions = sparkVersionsString != null && !sparkVersionsString.isEmpty() ? sparkVersionsString.split(",") : [] +// OpenHouse compatibility testing configuration +String openHouseCompatibilitySparkMajorVersion = findProperty("openhouseCompatibilitySparkMajorVersion") ?: '3.5' +ext { + openHouseCompatibilityCoordinate = findProperty("openhouseCompatibilityCoordinate") ?: + 'com.linkedin.openhouse:tables-test-fixtures-iceberg-1.5_2.12:0.0.+:uber' +} try { // apply these plugins in a try-catch block so that we can handle cases without .git directory @@ -128,6 +134,10 @@ allprojects { repositories { mavenCentral() mavenLocal() + // LinkedIn OpenHouse artifacts for compatibility testing + maven { + url "https://linkedin.jfrog.io/artifactory/openhouse" + } } } @@ -335,9 +345,17 @@ project(':iceberg-common') { } project(':iceberg-core') { + configurations { + openhouseCompatibilityRuntime { + canBeConsumed = false + canBeResolved = true + } + } + test { useJUnitPlatform() } + dependencies { api project(':iceberg-api') implementation project(':iceberg-common') @@ -370,6 +388,63 @@ project(':iceberg-core') { testImplementation libs.esotericsoftware.kryo testImplementation libs.guava.testlib testImplementation libs.awaitility + + openhouseCompatibilityRuntime(rootProject.openHouseCompatibilityCoordinate) { + transitive = false + } + openhouseCompatibilityRuntime project(':iceberg-aws') + openhouseCompatibilityRuntime 'com.google.code.gson:gson:2.10.1' + openhouseCompatibilityRuntime 'com.zaxxer:HikariCP:4.0.3' + } + + tasks.register('openhouseCompatibilityTest', Test) { + useJUnitPlatform() + group = 'verification' + description = 'Runs OpenHouse compatibility tests for iceberg-core' + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath + configurations.openhouseCompatibilityRuntime + + systemProperty 'iceberg.test.table.provider', + 'org.apache.iceberg.openhouse.OpenHouseTestTableProvider' + + // Test filter - initially exclude everything, then include from list + filter { + failOnNoMatchingTests = false + } + + // Load inclusions from the fixtures jar at execution time + doFirst { + println "DEBUG: 
Resolving openhouseCompatibilityRuntime..." + configurations.openhouseCompatibilityRuntime.resolve().each { file -> + if (file.name.contains('tables-test-fixtures')) { + println "DEBUG: Found fixtures jar: ${file.name}" + zipTree(file).matching { include 'openhouse-iceberg-compatibility-tests.txt' }.each { f -> + println "DEBUG: Found inclusions file: ${f}" + f.eachLine { line -> + line = line.trim() + if (line && !line.startsWith('#')) { + // Replace # with . for Gradle test filter pattern + line = line.replace('#', '.') + + // Append wildcard to handle parameterized tests + if (!line.endsWith('*')) { + line = line + '*' + } + println "DEBUG: Including ${line}" + filter.includeTestsMatching line + } + } + } + } + } + } + + + testLogging { + events "passed", "skipped", "failed", "standardOut", "standardError" + showStandardStreams = true + exceptionFormat = 'full' + } } } @@ -1030,6 +1105,14 @@ apply from: 'baseline.gradle' apply from: 'deploy.gradle' apply from: 'tasks.gradle' +tasks.register('openhouseCompatibility') { + group = "verification" + description = "Runs all OpenHouse compatibility tests" + dependsOn project(':iceberg-core').tasks.openhouseCompatibilityTest + dependsOn project(":iceberg-spark:iceberg-spark-${openHouseCompatibilitySparkMajorVersion}_${scalaVersion}").tasks.openhouseCompatibilityTest + dependsOn project(":iceberg-spark:iceberg-spark-extensions-${openHouseCompatibilitySparkMajorVersion}_${scalaVersion}").tasks.openhouseCompatibilityTest +} + project(':iceberg-bom') { apply plugin: 'java-platform' diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index c3db859101..bff0d73183 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -43,6 +43,7 @@ import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Types; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -171,6 +172,10 @@ public class TableTestBase { protected File metadataDir = null; public TestTables.TestTable table = null; + // External table provider loaded via reflection to avoid compile-time dependency + private static final Object EXTERNAL_TABLE_PROVIDER = initializeExternalTableProvider(); + private static final java.lang.reflect.Method CREATE_TABLE_METHOD = getCreateTableMethod(); + protected final int formatVersion; @SuppressWarnings("checkstyle:MemberName") @@ -199,26 +204,50 @@ public void cleanupTables() { TestTables.clearTables(); } + @AfterClass + public static void shutdownExternalTableProvider() { + if (EXTERNAL_TABLE_PROVIDER != null) { + try { + java.lang.reflect.Method afterAll = + EXTERNAL_TABLE_PROVIDER.getClass().getMethod("afterAll"); + afterAll.invoke(EXTERNAL_TABLE_PROVIDER); + } catch (Exception e) { + throw new RuntimeException( + "Failed to shutdown external table provider: " + + EXTERNAL_TABLE_PROVIDER.getClass().getName(), + e); + } + } + } + List listManifestFiles() { return listManifestFiles(tableDir); } List listManifestFiles(File tableDirToList) { - return Lists.newArrayList( + File[] files = new File(tableDirToList, "metadata") .listFiles( (dir, name) -> !name.startsWith("snap") - && Files.getFileExtension(name).equalsIgnoreCase("avro"))); + && Files.getFileExtension(name).equalsIgnoreCase("avro")); + if (files == null) { + return Lists.newArrayList(); + } + return Lists.newArrayList(files); } List listManifestLists(String 
tableDirToList) { - return Lists.newArrayList( + File[] files = new File(tableDirToList, "metadata") .listFiles( (dir, name) -> name.startsWith("snap") - && Files.getFileExtension(name).equalsIgnoreCase("avro"))); + && Files.getFileExtension(name).equalsIgnoreCase("avro")); + if (files == null) { + return Lists.newArrayList(); + } + return Lists.newArrayList(files); } public static long countAllMetadataFiles(File tableDir) { @@ -228,19 +257,60 @@ public static long countAllMetadataFiles(File tableDir) { } protected TestTables.TestTable create(Schema schema, PartitionSpec spec) { + if (EXTERNAL_TABLE_PROVIDER != null && CREATE_TABLE_METHOD != null) { + try { + Object result = + CREATE_TABLE_METHOD.invoke( + EXTERNAL_TABLE_PROVIDER, tableDir, "test", schema, spec, formatVersion); + return (TestTables.TestTable) result; + } catch (Exception e) { + throw new RuntimeException("Failed to create table via external provider", e); + } + } return TestTables.create(tableDir, "test", schema, spec, formatVersion); } + private static Object initializeExternalTableProvider() { + String providerClass = System.getProperty("iceberg.test.table.provider"); + if (providerClass == null || providerClass.isEmpty()) { + return null; + } + + try { + Object provider = Class.forName(providerClass).getDeclaredConstructor().newInstance(); + // Call beforeAll() via reflection + java.lang.reflect.Method beforeAll = provider.getClass().getMethod("beforeAll"); + beforeAll.invoke(provider); + return provider; + } catch (Exception e) { + throw new RuntimeException("Failed to initialize " + providerClass, e); + } + } + + private static java.lang.reflect.Method getCreateTableMethod() { + if (EXTERNAL_TABLE_PROVIDER == null) { + return null; + } + try { + return EXTERNAL_TABLE_PROVIDER + .getClass() + .getMethod("createTable", File.class, String.class, Schema.class, PartitionSpec.class, int.class); + } catch (NoSuchMethodException e) { + throw new RuntimeException( + "External table provider must have createTable(File, String, Schema, PartitionSpec, int) method", e); + } + } + TestTables.TestTable load() { - return TestTables.load(tableDir, "test"); + return TestTables.load(tableDir, table.name()); } Integer version() { - return TestTables.metadataVersion("test"); + return TestTables.metadataVersion(table.name()); } public TableMetadata readMetadata() { - return TestTables.readMetadata("test"); + return TestTables.readMetadata(table.name()); } ManifestFile writeManifest(DataFile... 
files) throws IOException { diff --git a/spark/openhouse.gradle b/spark/openhouse.gradle new file mode 100644 index 0000000000..2c27531a45 --- /dev/null +++ b/spark/openhouse.gradle @@ -0,0 +1,90 @@ +// OpenHouse compatibility test configuration for Spark modules +// Applied to spark modules that need OpenHouse integration testing + +configurations { + openhouseCompatibilityRuntime { + canBeConsumed = false + canBeResolved = true + description = 'Runtime dependencies for OpenHouse compatibility tests' + } +} + +dependencies { + // Uber jar with transitive=false to avoid classpath conflicts + openhouseCompatibilityRuntime(rootProject.openHouseCompatibilityCoordinate) { + transitive = false + } + openhouseCompatibilityRuntime project(':iceberg-aws') + openhouseCompatibilityRuntime 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.4' + openhouseCompatibilityRuntime 'com.google.code.gson:gson:2.10.1' + openhouseCompatibilityRuntime 'com.zaxxer:HikariCP:4.0.3' +} + +tasks.register('openhouseCompatibilityTest', Test) { + group = 'verification' + description = "Run Iceberg tests against OpenHouse catalog (Spark ${sparkMajorVersion})" + + useJUnitPlatform() + maxHeapSize '2560m' + + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath + configurations.openhouseCompatibilityRuntime + + // Configure SPI providers + systemProperty 'iceberg.test.spark.session.provider', + 'org.apache.iceberg.spark.openhouse.OpenHouseSparkITestProvider' + systemProperty 'iceberg.test.catalog.provider', + 'org.apache.iceberg.spark.openhouse.OpenHouseSparkITestProvider' + systemProperty 'iceberg.test.catalog.skip.defaults', 'true' + + // Spark configuration + systemProperty 'spark.sql.sources.partitionOverwriteMode', 'dynamic' + systemProperty 'spark.sql.legacy.respectNullabilityInTextDatasetConversion', 'true' + systemProperty 'spark.driver.bindAddress', '127.0.0.1' + + // JVM args for Java 17+ compatibility + jvmArgs += [ + '--add-opens=java.base/java.lang=ALL-UNNAMED', + '--add-opens=java.base/java.lang.invoke=ALL-UNNAMED', + '--add-opens=java.base/java.lang.reflect=ALL-UNNAMED', + '--add-opens=java.base/java.io=ALL-UNNAMED', + '--add-opens=java.base/java.net=ALL-UNNAMED', + '--add-opens=java.base/java.nio=ALL-UNNAMED', + '--add-opens=java.base/java.util=ALL-UNNAMED', + '--add-opens=java.base/java.util.concurrent=ALL-UNNAMED', + '--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED', + '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED', + '--add-opens=java.base/sun.nio.cs=ALL-UNNAMED', + '--add-opens=java.base/sun.security.action=ALL-UNNAMED', + '--add-opens=java.base/sun.util.calendar=ALL-UNNAMED', + '--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED' + ] + + // Test filter - initially exclude everything, then include from list + filter { + failOnNoMatchingTests = false + } + + // Load inclusions from the fixtures jar at execution time + doFirst { + configurations.openhouseCompatibilityRuntime.resolve().each { file -> + if (file.name.contains('tables-test-fixtures')) { + zipTree(file).matching { include 'openhouse-iceberg-compatibility-tests.txt' }.each { f -> + f.eachLine { line -> + line = line.trim() + if (line && !line.startsWith('#')) { + // Replace # with . 
for Gradle test filter pattern + line = line.replace('#', '.') + + // Append wildcard to handle parameterized tests + if (!line.endsWith('*')) { + line = line + '*' + } + filter.includeTestsMatching line + } + } + } + } + } + } +} diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle index b29ba6761e..7913b43d73 100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -39,6 +39,9 @@ configure(sparkProjects) { } project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { + // Set property on the SUB-PROJECT so it is visible to the applied script + ext.sparkMajorVersion = sparkMajorVersion + apply plugin: 'scala' apply plugin: 'com.github.alisiikh.scalastyle' @@ -110,13 +113,17 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { useJUnitPlatform() } - tasks.withType(Test) { + tasks.withType(Test).configureEach { // Vectorized reads need more memory maxHeapSize '2560m' } + + apply from: "$projectDir/../../openhouse.gradle" } project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") { + ext.sparkMajorVersion = sparkMajorVersion + apply from: "$projectDir/../../openhouse.gradle" apply plugin: 'java-library' apply plugin: 'scala' apply plugin: 'com.github.alisiikh.scalastyle' @@ -292,7 +299,9 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio useJUnitPlatform() description = "Test Spark3 Runtime Jar against Spark ${sparkMajorVersion}" group = "verification" - jvmArgs += project.property('extraJvmArgs') + if (project.hasProperty('extraJvmArgs')) { + jvmArgs += project.property('extraJvmArgs') + } testClassesDirs = sourceSets.integration.output.classesDirs classpath = sourceSets.integration.runtimeClasspath + files(shadowJar.archiveFile.get().asFile.path) inputs.file(shadowJar.archiveFile.get().asFile.path) @@ -304,4 +313,3 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio enabled = false } } - diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java index 8e167b7f73..a52c322fb8 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java @@ -42,7 +42,7 @@ public static void startMetastoreAndSpark() { metastore.start(); TestBase.hiveConf = metastore.hiveConf(); - TestBase.spark = + SparkSession.Builder builder = SparkSession.builder() .master("local[2]") .config("spark.testing", "true") @@ -54,8 +54,9 @@ public static void startMetastoreAndSpark() { .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") .config( SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), String.valueOf(RANDOM.nextBoolean())) - .enableHiveSupport() - .getOrCreate(); + .enableHiveSupport(); + + TestBase.spark = buildSparkSession(builder, hiveConf); TestBase.catalog = (HiveCatalog) diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java index ea1040dcf0..ad824dbcbc 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java +++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -67,6 +68,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.SparkCatalog; import org.apache.iceberg.spark.SparkSessionCatalog; +import org.apache.iceberg.spark.TestCatalogProvider; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; @@ -104,63 +106,95 @@ public abstract class SparkRowLevelOperationsTestBase extends ExtensionsTestBase + " format = {3}, vectorized = {4}, distributionMode = {5}," + " fanout = {6}, branch = {7}, planningMode = {8}") public static Object[][] parameters() { - return new Object[][] { - { - "testhive", - SparkCatalog.class.getName(), - ImmutableMap.of( - "type", "hive", - "default-namespace", "default"), - FileFormat.ORC, - true, - WRITE_DISTRIBUTION_MODE_NONE, - true, - SnapshotRef.MAIN_BRANCH, - LOCAL - }, - { - "testhive", - SparkCatalog.class.getName(), - ImmutableMap.of( - "type", "hive", - "default-namespace", "default"), - FileFormat.PARQUET, - true, - WRITE_DISTRIBUTION_MODE_NONE, - false, - "test", - DISTRIBUTED - }, - { - "testhadoop", - SparkCatalog.class.getName(), - ImmutableMap.of("type", "hadoop"), - FileFormat.PARQUET, - RANDOM.nextBoolean(), - WRITE_DISTRIBUTION_MODE_HASH, - true, - null, - LOCAL - }, - { - "spark_catalog", - SparkSessionCatalog.class.getName(), - ImmutableMap.of( - "type", "hive", - "default-namespace", "default", - "clients", "1", - "parquet-enabled", "false", - "cache-enabled", + List params = new ArrayList<>(); + + if (!Boolean.getBoolean("iceberg.test.catalog.skip.defaults")) { + params.add( + new Object[] { + "testhive", + SparkCatalog.class.getName(), + ImmutableMap.of("type", "hive", "default-namespace", "default"), + FileFormat.ORC, + true, + WRITE_DISTRIBUTION_MODE_NONE, + true, + SnapshotRef.MAIN_BRANCH, + LOCAL + }); + params.add( + new Object[] { + "testhive", + SparkCatalog.class.getName(), + ImmutableMap.of("type", "hive", "default-namespace", "default"), + FileFormat.PARQUET, + true, + WRITE_DISTRIBUTION_MODE_NONE, + false, + "test", + DISTRIBUTED + }); + params.add( + new Object[] { + "testhadoop", + SparkCatalog.class.getName(), + ImmutableMap.of("type", "hadoop"), + FileFormat.PARQUET, + RANDOM.nextBoolean(), + WRITE_DISTRIBUTION_MODE_HASH, + true, + null, + LOCAL + }); + params.add( + new Object[] { + "spark_catalog", + SparkSessionCatalog.class.getName(), + ImmutableMap.of( + "type", + "hive", + "default-namespace", + "default", + "clients", + "1", + "parquet-enabled", + "false", + "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync - ), - FileFormat.AVRO, - false, - WRITE_DISTRIBUTION_MODE_RANGE, - false, - "test", - DISTRIBUTED - } - }; + ), + FileFormat.AVRO, + false, + WRITE_DISTRIBUTION_MODE_RANGE, + false, + "test", + DISTRIBUTED + }); + } + + loadExternalCatalogProviders() + .forEach( + provider -> { + Object[][] configs = provider.getCatalogConfigurations(); + for (Object[] config : configs) { + String name = (String) config[0]; + String impl = (String) config[1]; + Map props = (Map) config[2]; + + params.add( + new Object[] { + name, + impl, + props, + FileFormat.PARQUET, + true, + WRITE_DISTRIBUTION_MODE_HASH, + true, + null, + LOCAL + }); + } 
+ }); + + return params.toArray(new Object[0][]); } protected abstract Map extraTableProperties(); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java index 022edecc31..a35e0e6d41 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java @@ -29,6 +29,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.TestBaseWithCatalog; import org.apache.iceberg.spark.source.SimpleRecord; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 7357a4683b..901c0225d2 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -149,6 +149,11 @@ protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap opti Configuration conf = SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name); Map optionsMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); optionsMap.putAll(options.asCaseSensitiveMap()); + + if (optionsMap.containsKey(CatalogProperties.CATALOG_IMPL)) { + optionsMap.remove(CatalogUtil.ICEBERG_CATALOG_TYPE); + } + optionsMap.put(CatalogProperties.APP_ID, SparkSession.active().sparkContext().applicationId()); optionsMap.put(CatalogProperties.USER, SparkSession.active().sparkContext().sparkUser()); return CatalogUtil.buildIcebergCatalog(name, optionsMap, conf); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java index 33384e3eff..8f4b169dcf 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java @@ -308,7 +308,7 @@ public final void initialize(String name, CaseInsensitiveStringMap options) { && options .get(CatalogUtil.ICEBERG_CATALOG_TYPE) .equalsIgnoreCase(CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE)) { - validateHmsUri(options.get(CatalogProperties.URI)); + // validateHmsUri(options.get(CatalogProperties.URI)); } this.catalogName = name; diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java index 6203bf89bf..5695943eab 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java @@ -19,17 +19,17 @@ package org.apache.iceberg.spark; import java.nio.file.Path; -import org.apache.iceberg.ParameterizedTestExtension; -import org.apache.iceberg.Parameters; -import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; -@ExtendWith(ParameterizedTestExtension.class) public abstract class CatalogTestBase extends TestBaseWithCatalog { - // these parameters are broken out to avoid changes that need to modify lots of test suites - @Parameters(name = 
"catalogName = {0}, implementation = {1}, config = {2}") - protected static Object[][] parameters() { + @TempDir protected Path temp; + + /** + * Override to include HIVE and SPARK catalogs in addition to HADOOP. + * External catalog providers are loaded by the parent class. + */ + protected static Object[][] defaultCatalogParameters() { return new Object[][] { { SparkCatalogConfig.HIVE.catalogName(), @@ -48,6 +48,4 @@ protected static Object[][] parameters() { } }; } - - @TempDir protected Path temp; } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java index abfd7da0c7..9d129b908e 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java @@ -59,6 +59,31 @@ public enum SparkCatalogConfig { private final String implementation; private final Map properties; + private static final Object[] DYNAMIC_CONFIG = loadDynamicConfig(); + + private static Object[] loadDynamicConfig() { + String providerClass = System.getProperty("iceberg.test.catalog.provider"); + if (providerClass != null && !providerClass.isEmpty()) { + try { + TestCatalogProvider provider = + (TestCatalogProvider) + Class.forName(providerClass).getDeclaredConstructor().newInstance(); + Object[][] configs = provider.getCatalogConfigurations(); + if (configs != null && configs.length > 0) { + for (Object[] config : configs) { + if ("spark_catalog".equals(config[0])) { + return config; + } + } + return configs[0]; + } + } catch (Exception e) { + throw new RuntimeException("Failed to load catalog provider: " + providerClass, e); + } + } + return null; + } + SparkCatalogConfig(String catalogName, String implementation, Map properties) { this.catalogName = catalogName; this.implementation = implementation; @@ -70,10 +95,16 @@ public String catalogName() { } public String implementation() { + if (this == SPARK && DYNAMIC_CONFIG != null) { + return (String) DYNAMIC_CONFIG[1]; + } return implementation; } public Map properties() { + if (this == SPARK && DYNAMIC_CONFIG != null) { + return (Map) DYNAMIC_CONFIG[2]; + } return properties; } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java index e7d5a0f039..46c3a1e6d5 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java @@ -27,8 +27,10 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.ServiceLoader; import java.util.TimeZone; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -59,11 +61,15 @@ public abstract class TestBase extends SparkTestHelperBase { + private static final String SPARK_SESSION_PROVIDER_PROPERTY = + "iceberg.test.spark.session.provider"; + protected static TestHiveMetastore metastore = null; protected static HiveConf hiveConf = null; protected static SparkSession spark = null; protected static JavaSparkContext sparkContext = null; protected static HiveCatalog catalog = null; + private static TestSparkSessionProvider sparkSessionProvider = null; @BeforeAll public static void startMetastoreAndSpark() { @@ -71,14 +77,8 @@ public static void 
startMetastoreAndSpark() { metastore.start(); TestBase.hiveConf = metastore.hiveConf(); - TestBase.spark = - SparkSession.builder() - .master("local[2]") - .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") - .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) - .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") - .enableHiveSupport() - .getOrCreate(); + SparkSession.Builder defaultBuilder = createDefaultSparkBuilder(hiveConf); + TestBase.spark = buildSparkSession(defaultBuilder, hiveConf); TestBase.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -106,6 +106,57 @@ public static void stopMetastoreAndSpark() throws Exception { TestBase.spark = null; TestBase.sparkContext = null; } + if (sparkSessionProvider != null) { + try { + sparkSessionProvider.afterAll(); + } catch (Exception e) { + throw new RuntimeException("Failed to shutdown custom Spark session provider", e); + } finally { + sparkSessionProvider = null; + } + } + } + + protected static SparkSession.Builder createDefaultSparkBuilder(HiveConf conf) { + return SparkSession.builder() + .master("local[2]") + .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") + .config("spark.hadoop." + METASTOREURIS.varname, conf.get(METASTOREURIS.varname)) + .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") + .enableHiveSupport(); + } + + protected static SparkSession buildSparkSession(SparkSession.Builder builder, HiveConf conf) { + sparkSessionProvider = loadSparkSessionProvider(); + if (sparkSessionProvider == null) { + return builder.getOrCreate(); + } + + try { + sparkSessionProvider.beforeAll(); + SparkSession session = sparkSessionProvider.createSparkSession(builder, conf); + return session != null ? session : builder.getOrCreate(); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize custom Spark session provider", e); + } + } + + private static TestSparkSessionProvider loadSparkSessionProvider() { + // Check system property first for explicit override + String providerClass = System.getProperty(SPARK_SESSION_PROVIDER_PROPERTY); + if (providerClass != null && !providerClass.isEmpty()) { + try { + return (TestSparkSessionProvider) + Class.forName(providerClass).getDeclaredConstructor().newInstance(); + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Failed to instantiate " + providerClass, e); + } + } + + // Fall back to SPI discovery + Iterator iterator = + ServiceLoader.load(TestSparkSessionProvider.class).iterator(); + return iterator.hasNext() ? 
iterator.next() : null; } protected long waitUntilAfter(long timestampMillis) { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java index c3c958abf0..2a79983111 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java @@ -22,10 +22,16 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.ServiceLoader; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; @@ -37,6 +43,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.util.PropertyUtil; +import org.apache.spark.sql.internal.SQLConf; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -45,10 +52,33 @@ @ExtendWith(ParameterizedTestExtension.class) public abstract class TestBaseWithCatalog extends TestBase { + + private static final String CATALOG_PROVIDER_PROPERTY = "iceberg.test.catalog.provider"; + private static final String SKIP_DEFAULTS_PROPERTY = "iceberg.test.catalog.skip.defaults"; + protected static File warehouse = null; @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") protected static Object[][] parameters() { + List params = new ArrayList<>(); + + if (!Boolean.getBoolean(SKIP_DEFAULTS_PROPERTY)) { + params.addAll(Arrays.asList(defaultCatalogParameters())); + } + + loadExternalCatalogProviders().forEach(provider -> { + try { + provider.beforeAll(); + params.addAll(Arrays.asList(provider.getCatalogConfigurations())); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize catalog provider: " + provider, e); + } + }); + + return params.toArray(new Object[0][]); + } + + protected static Object[][] defaultCatalogParameters() { return new Object[][] { { SparkCatalogConfig.HADOOP.catalogName(), @@ -58,6 +88,27 @@ protected static Object[][] parameters() { }; } + protected static List loadExternalCatalogProviders() { + List providers = new ArrayList<>(); + + // System property takes precedence + String providerClass = System.getProperty(CATALOG_PROVIDER_PROPERTY); + if (providerClass != null && !providerClass.isEmpty()) { + try { + providers.add( + (TestCatalogProvider) + Class.forName(providerClass).getDeclaredConstructor().newInstance()); + } catch (ReflectiveOperationException e) { + throw new RuntimeException("Failed to instantiate " + providerClass, e); + } + } + + // SPI discovery + ServiceLoader.load(TestCatalogProvider.class).forEach(providers::add); + + return providers; + } + @BeforeAll public static void createWarehouse() throws IOException { TestBaseWithCatalog.warehouse = File.createTempFile("warehouse", null); @@ -91,24 +142,49 @@ public static void dropWarehouse() throws IOException { @BeforeEach public void before() { - this.validationCatalog = - catalogName.equals("testhadoop") - ? 
new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse) - : catalog; + this.validationCatalog = createValidationCatalog(); this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog; - spark.conf().set("spark.sql.catalog." + catalogName, implementation); + configureCatalog(); + + this.tableName = + (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + "default.table"; + + sql("CREATE NAMESPACE IF NOT EXISTS default"); + } + + private Catalog createValidationCatalog() { + String catalogImpl = catalogConfig.get(CatalogProperties.CATALOG_IMPL); + if (catalogImpl != null && !catalogImpl.isEmpty()) { + return CatalogUtil.loadCatalog( + catalogImpl, + catalogName + "-validation", + new HashMap<>(catalogConfig), + spark.sessionState().newHadoopConf()); + } + + return catalogName.equals("testhadoop") + ? new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse) + : catalog; + } + + private void configureCatalog() { + setSparkConf("spark.sql.catalog." + catalogName, implementation); catalogConfig.forEach( - (key, value) -> spark.conf().set("spark.sql.catalog." + catalogName + "." + key, value)); + (key, value) -> setSparkConf("spark.sql.catalog." + catalogName + "." + key, value)); if ("hadoop".equalsIgnoreCase(catalogConfig.get("type"))) { spark.conf().set("spark.sql.catalog." + catalogName + ".warehouse", "file:" + warehouse); } + } - this.tableName = - (catalogName.equals("spark_catalog") ? "" : catalogName + ".") + "default.table"; - - sql("CREATE NAMESPACE IF NOT EXISTS default"); + private void setSparkConf(String key, String value) { + spark.conf().set(key, value); + try { + SQLConf.get().setConfString(key, value); + } catch (IllegalArgumentException ignored) { + // Some keys may not be valid SQLConf keys + } } protected String tableName(String name) { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestCatalogProvider.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestCatalogProvider.java new file mode 100644 index 0000000000..27e480ac23 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestCatalogProvider.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +/** + * SPI interface for external catalog providers to inject catalog configurations + * into Iceberg's parameterized tests. + * + *

<p>Implementations can be registered via:
+ *
+ * <ul>
+ *   <li>System property: -Diceberg.test.catalog.provider=com.example.Provider
+ *   <li>ServiceLoader: META-INF/services/org.apache.iceberg.spark.TestCatalogProvider
+ * </ul>
+ */ +public interface TestCatalogProvider { + + /** + * Returns catalog configurations to test against. + * Each array contains: [catalogName, implementation, configMap] + * + * @return array of catalog configurations + */ + Object[][] getCatalogConfigurations(); + + /** + * Called before any tests run. Use for setup (e.g., starting servers). + * + * @throws Exception if setup fails + */ + default void beforeAll() throws Exception {} + + /** + * Called after all tests complete. Use for cleanup (e.g., stopping servers). + * + * @throws Exception if cleanup fails + */ + default void afterAll() throws Exception {} +} + diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionProvider.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionProvider.java new file mode 100644 index 0000000000..66c7b768af --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkSessionProvider.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.spark.sql.SparkSession; + +/** + * Service Provider Interface that allows downstream integrations (e.g. catalog servers) + * to supply custom {@link SparkSession}s for Iceberg's Spark-based test suites. + * + *
<p>Implementations can be wired using either the {@code iceberg.test.spark.session.provider} + * system property or {@link java.util.ServiceLoader} registration at + * {@code META-INF/services/org.apache.iceberg.spark.TestSparkSessionProvider}. + * + *
<p>
The lifecycle methods mirror JUnit constructs so providers can eagerly start/stop the + * infrastructure they need (embedded services, custom Spark extensions, etc.). + */ +public interface TestSparkSessionProvider { + + /** + * Gives implementations a chance to perform any global setup before a Spark session is created. + */ + default void beforeAll() throws Exception {} + + /** + * Implementations must return the {@link SparkSession} Iceberg tests should use. + * + *
<p>
The method receives the default builder Iceberg would normally use along with the HiveConf + * backing the embedded metastore. Implementations can either mutate & invoke the builder or + * ignore it entirely and construct their own session. + * + * @param defaultBuilder pre-configured builder that points at Iceberg's embedded HMS + * @param hiveConf Hive configuration backing the test metastore + * @return a SparkSession ready for running Iceberg's tests + */ + SparkSession createSparkSession(SparkSession.Builder defaultBuilder, HiveConf hiveConf) + throws Exception; + + /** + * Gives implementations a chance to clean up resources after the test Spark session stops. + */ + default void afterAll() throws Exception {} +} + +
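For illustration, below is a minimal sketch of how a downstream integration might implement the two new Spark test SPIs introduced above. The package, class names, catalog name, and endpoint are assumptions invented for this example; only the interface methods, the [catalogName, implementation, configMap] row shape, and the iceberg.test.catalog.provider / iceberg.test.spark.session.provider properties come from this change.

package com.example.compat; // hypothetical package, not part of this change

import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.TestCatalogProvider;
import org.apache.iceberg.spark.TestSparkSessionProvider;
import org.apache.spark.sql.SparkSession;

// One class can serve both system properties, the way the build wires a single
// provider class for both catalog and Spark session setup.
public class ExampleCatalogITestProvider implements TestCatalogProvider, TestSparkSessionProvider {

  // both interfaces declare default beforeAll()/afterAll(), so a class implementing
  // both must override them explicitly
  @Override
  public void beforeAll() throws Exception {
    // start the external catalog service here (embedded server, container, etc.)
  }

  @Override
  public Object[][] getCatalogConfigurations() {
    // each row is [catalogName, implementation, configMap]
    Map<String, String> props =
        ImmutableMap.of(
            CatalogProperties.CATALOG_IMPL, "com.example.compat.ExampleIcebergCatalog",
            CatalogProperties.URI, "http://localhost:8080");
    return new Object[][] {{"example", SparkCatalog.class.getName(), props}};
  }

  @Override
  public SparkSession createSparkSession(SparkSession.Builder defaultBuilder, HiveConf hiveConf)
      throws Exception {
    // reuse Iceberg's pre-configured builder and only layer on the extra catalog settings
    return defaultBuilder
        .config("spark.sql.catalog.example", SparkCatalog.class.getName())
        .config("spark.sql.catalog.example.catalog-impl", "com.example.compat.ExampleIcebergCatalog")
        .config("spark.sql.catalog.example.uri", "http://localhost:8080")
        .getOrCreate();
  }

  @Override
  public void afterAll() throws Exception {
    // stop the external catalog service
  }
}

Such a provider is activated with -Diceberg.test.catalog.provider=com.example.compat.ExampleCatalogITestProvider and -Diceberg.test.spark.session.provider=com.example.compat.ExampleCatalogITestProvider, or through META-INF/services registration files named after each interface.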
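The iceberg-core hook is reflective rather than interface-based: TableTestBase instantiates the class named by iceberg.test.table.provider and expects a public no-arg constructor plus beforeAll(), afterAll(), and createTable(File, String, Schema, PartitionSpec, int). A sketch of a conforming provider follows; the class name and its pass-through behavior are invented for illustration, whereas a real provider (such as the OpenHouse fixture configured in the build) would route the table through its own catalog.

package org.apache.iceberg; // kept in the same package as TestTables for this sketch

import java.io.File;

public class ExampleTestTableProvider {

  public void beforeAll() {
    // bring up whatever backing service the tables need
  }

  public TestTables.TestTable createTable(
      File tableDir, String name, Schema schema, PartitionSpec spec, int formatVersion) {
    // simplest conforming behavior: fall back to the in-memory TestTables implementation
    return TestTables.create(tableDir, name, schema, spec, formatVersion);
  }

  public void afterAll() {
    // tear down the backing service
  }
}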
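The openhouseCompatibilityTest tasks build their Gradle test filter from a plain-text resource named openhouse-iceberg-compatibility-tests.txt inside the fixtures jar: blank lines and lines starting with '#' are skipped, a '#' separating class and method is rewritten to '.', and a trailing '*' is appended so parameterized display names still match. The entries below are invented examples of what such a file could contain:

# classes or Class#method patterns to include, one per line
org.apache.iceberg.spark.extensions.TestBranchDDL
org.apache.iceberg.spark.sql.TestCreateTable#testCreateTable

which the tasks translate into the filters org.apache.iceberg.spark.extensions.TestBranchDDL* and org.apache.iceberg.spark.sql.TestCreateTable.testCreateTable*.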