diff --git a/README.md b/README.md index af11e6a14d5df..6d3475755ff87 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ mvn clean package -DskipTests -Dscala-2.12 The default Spark version supported is 2.4.4. To build for different Spark 3 versions, use the corresponding profile ``` -# Build against Spark 3.2.0 (the default build shipped with the public Spark 3 bundle) +# Build against Spark 3.2.1 (the default build shipped with the public Spark 3 bundle) mvn clean package -DskipTests -Dspark3 # Build against Spark 3.1.2 diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index c763c264cf8f9..2db0cf2269213 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -57,13 +57,7 @@ object HoodieSparkUtils extends SparkAdapterSupport { def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2") - def beforeSpark3_2(): Boolean = { - if (isSpark2 || isSpark3_0 || isSpark3_1) { - true - } else { - false - } - } + def gteqSpark3_2: Boolean = SPARK_VERSION >= "3.2" def getMetaSchema: StructType = { StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index 5b44bc6d8971a..12bbd64851001 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -50,7 +50,7 @@ object HoodieAnalysis { ) ++ extraPostHocResolutionRules() def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = { - if 
(!HoodieSparkUtils.beforeSpark3_2()) { + if (HoodieSparkUtils.gteqSpark3_2) { val spark3AnalysisClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis" val spark3Analysis: SparkSession => Rule[LogicalPlan] = session => ReflectionUtils.loadClass(spark3AnalysisClass, session).asInstanceOf[Rule[LogicalPlan]] @@ -66,7 +66,7 @@ object HoodieAnalysis { } def extraPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = - if (!HoodieSparkUtils.beforeSpark3_2()) { + if (HoodieSparkUtils.gteqSpark3_2) { val spark3PostHocResolutionClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3PostAnalysisRule" val spark3PostHocResolution: SparkSession => Rule[LogicalPlan] = session => ReflectionUtils.loadClass(spark3PostHocResolutionClass, session).asInstanceOf[Rule[LogicalPlan]] diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index edcefb531832d..beabfbe9bfd14 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -95,7 +95,7 @@ class TestHoodieSparkSqlWriter { */ def initSparkContext(): Unit = { val sparkConf = new SparkConf() - if (!HoodieSparkUtils.beforeSpark3_2()) { + if (HoodieSparkUtils.gteqSpark3_2) { sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala index 28cd59cb08cd8..acab7dae0a44f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala @@ -191,7 +191,7 @@ 
class TestHoodieSparkUtils { val genRecRDD3 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true, org.apache.hudi.common.util.Option.of(schema2)) assert(genRecRDD3.collect()(0).getSchema.equals(schema2)) - genRecRDD3.foreach(entry => assertNull(entry.get("nonNullableInnerStruct2"))) + genRecRDD3.foreach(entry => assertNull(entry.get("nullableInnerStruct2"))) val innerStruct3 = new StructType().add("innerKey","string",false).add("innerValue", "long", true) .add("new_nested_col","string",true) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 7b51e5a623543..2ef59872a0ae8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -30,8 +30,7 @@ import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.keygen.NonpartitionedKeyGenerator import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase} -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} - +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils} import org.apache.log4j.LogManager import org.apache.spark.sql._ @@ -557,7 +556,12 @@ class TestMORDataSource extends HoodieClientTestBase { assertEquals(sampleRow.getLong(1), sampleRow.get(1)) assertEquals(sampleRow.getString(2), sampleRow.get(2)) assertEquals(sampleRow.getSeq(3), sampleRow.get(3)) - assertEquals(sampleRow.getStruct(4), sampleRow.get(4)) + if (HoodieSparkUtils.gteqSpark3_2) { + // Since Spark3.2, the `nation` column is parsed as String, not Struct. 
+ assertEquals(sampleRow.getString(4), sampleRow.get(4)) + } else { + assertEquals(sampleRow.getStruct(4), sampleRow.get(4)) + } } def verifyShow(df: DataFrame): Unit = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala index a222b91d49783..ca3919599b6fa 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala @@ -58,7 +58,7 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { def sparkConf(): SparkConf = { val sparkConf = new SparkConf() - if (!HoodieSparkUtils.beforeSpark3_2()) { + if (HoodieSparkUtils.gteqSpark3_2) { sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") } diff --git a/pom.xml b/pom.xml index 8ebf7a9ef80de..37f38f11df9fc 100644 --- a/pom.xml +++ b/pom.xml @@ -119,7 +119,7 @@ 1.14.3 2.4.4 - 3.2.0 + 3.2.1 hudi-spark2 hudi-spark2-common 1.8.2 @@ -1586,8 +1586,9 @@ hudi-spark3-common 3.1.0 2.4.1 - 1.12.1 + 1.12.2 1.10.2 + 1.6.12 ${fasterxml.spark3.version} ${fasterxml.spark3.version} ${fasterxml.spark3.version}