diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index 8664cf7865d74..c14ad23cab91d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -20,10 +20,12 @@ package org.apache.hudi.testutils; import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieBaseFile; @@ -41,7 +43,6 @@ import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; -import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieIOException; @@ -108,6 +109,17 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe @TempDir protected java.nio.file.Path tempDir; + public static Map getSparkSqlConf() { + Map sqlConf = new HashMap<>(); + sqlConf.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"); + + if (HoodieSparkUtils.gteqSpark3_2()) { + sqlConf.put("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog"); + } + + return sqlConf; + } + public String basePath() { return tempDir.toAbsolutePath().toUri().toString(); } diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java index 96c73dd240bc3..8089f86928034 100644 --- a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java +++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java @@ -20,6 +20,7 @@ import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.testutils.providers.SparkProvider; import org.apache.spark.HoodieSparkKryoRegistrar$; @@ -56,6 +57,11 @@ public SparkSession spark() { return spark; } + @Override + public SparkConf conf() { + return conf(SparkClientFunctionalTestHarness.getSparkSqlConf()); + } + @Override public SQLContext sqlContext() { return sqlContext; diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala index cb807319d94a1..7ec65b779b7b6 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala @@ -29,7 +29,9 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.keygen.TimestampBasedKeyGenerator import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.spark.SparkConf import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, lit} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} @@ -58,6 +60,8 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness { val verificationCol: String = "driver" val updatedVerificationVal: String = "driver_update" + override def conf: SparkConf = conf(getSparkSqlConf) + @ParameterizedTest @CsvSource(value = Array( "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key", diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala index 8cf6b4174c9f2..107514c1de830 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala @@ -26,8 +26,10 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.common.util.StringUtils import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} import org.apache.log4j.LogManager +import org.apache.spark.SparkConf import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, lit} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} @@ -57,6 +59,8 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness { val verificationCol: String = "driver" val updatedVerificationVal: String = "driver_update" + override def conf: SparkConf = conf(getSparkSqlConf) + @ParameterizedTest @CsvSource(Array( "true,", diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala index 9942132aba807..995fa6adb6a74 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala @@ -20,14 +20,18 @@ package org.apache.hudi.functional import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.common.config.HoodieMetadataConfig -import org.apache.hudi.common.model.HoodieAvroRecordMerger import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf +import org.apache.spark.SparkConf import org.apache.spark.sql.SaveMode import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.{Tag, Test} +import org.junit.jupiter.api.Tag +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + import scala.collection.JavaConverters._ @Tag("functional") @@ -45,8 +49,11 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" ) - @Test - def testReadability(): Unit = { + override def conf: SparkConf = conf(getSparkSqlConf) + + @ParameterizedTest + @ValueSource(ints = Array(1, 5)) + def testReadability(compactNumDeltaCommits: Int): Unit = { val dataGen = new HoodieTestDataGenerator() val metadataOpts: Map[String, String] = Map( @@ -55,7 +62,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn ) val combinedOpts: Map[String, String] = commonOpts ++ metadataOpts ++ - Map(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1") + Map(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> compactNumDeltaCommits.toString) // Insert records val newRecords = dataGen.generateInserts("001", 100) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala index 66d63d4a871aa..732ad4ccc89bc 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala @@ -27,8 +27,10 @@ import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenera import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.keygen.NonpartitionedKeyGenerator import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD} import org.apache.parquet.hadoop.util.counters.BenchmarkCounter +import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.{Dataset, HoodieUnsafeUtils, Row, SaveMode} @@ -55,6 +57,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName ) + override def conf: SparkConf = conf(getSparkSqlConf) + @Disabled("Currently disabled b/c of the fallback to HadoopFsRelation") @Test def testBaseFileOnlyViewRelation(): Unit = {