From 50705a022d1d6e40e5526a2f0d4b2d8794f177f7 Mon Sep 17 00:00:00 2001 From: Yann Date: Mon, 14 Feb 2022 22:28:55 +0800 Subject: [PATCH 1/4] [HUDI-3423] upgrade spark to 3.2.1 --- README.md | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index af11e6a14d5df..6d3475755ff87 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ mvn clean package -DskipTests -Dscala-2.12 The default Spark version supported is 2.4.4. To build for different Spark 3 versions, use the corresponding profile ``` -# Build against Spark 3.2.0 (the default build shipped with the public Spark 3 bundle) +# Build against Spark 3.2.1 (the default build shipped with the public Spark 3 bundle) mvn clean package -DskipTests -Dspark3 # Build against Spark 3.1.2 diff --git a/pom.xml b/pom.xml index 8ebf7a9ef80de..acd32dc14369e 100644 --- a/pom.xml +++ b/pom.xml @@ -119,7 +119,7 @@ 1.14.3 2.4.4 - 3.2.0 + 3.2.1 hudi-spark2 hudi-spark2-common 1.8.2 From 4e192eaff556265d038da9fdbaae0bf4a89153aa Mon Sep 17 00:00:00 2001 From: Yann Date: Tue, 15 Feb 2022 10:29:20 +0800 Subject: [PATCH 2/4] [MINOR] upgrade parquet/orc version --- pom.xml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index acd32dc14369e..37f38f11df9fc 100644 --- a/pom.xml +++ b/pom.xml @@ -1586,8 +1586,9 @@ hudi-spark3-common 3.1.0 2.4.1 - 1.12.1 + 1.12.2 1.10.2 + 1.6.12 ${fasterxml.spark3.version} ${fasterxml.spark3.version} ${fasterxml.spark3.version} From 2ca25230209c0233a35efb3465266cace83f9103 Mon Sep 17 00:00:00 2001 From: Yann Date: Tue, 15 Feb 2022 17:43:05 +0800 Subject: [PATCH 3/4] [MINOR] fix spark-datasource UTs --- .../main/scala/org/apache/hudi/HoodieSparkUtils.scala | 2 ++ .../scala/org/apache/hudi/TestHoodieSparkUtils.scala | 2 +- .../org/apache/hudi/functional/TestMORDataSource.scala | 10 +++++++--- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index c763c264cf8f9..360632fe0a71e 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -57,6 +57,8 @@ object HoodieSparkUtils extends SparkAdapterSupport { def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2") + def gteqSpark3_2: Boolean = SPARK_VERSION > "3.2" + def beforeSpark3_2(): Boolean = { if (isSpark2 || isSpark3_0 || isSpark3_1) { true diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala index 28cd59cb08cd8..acab7dae0a44f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala @@ -191,7 +191,7 @@ class TestHoodieSparkUtils { val genRecRDD3 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true, org.apache.hudi.common.util.Option.of(schema2)) assert(genRecRDD3.collect()(0).getSchema.equals(schema2)) - genRecRDD3.foreach(entry => assertNull(entry.get("nonNullableInnerStruct2"))) + genRecRDD3.foreach(entry => assertNull(entry.get("nullableInnerStruct2"))) val innerStruct3 = new StructType().add("innerKey","string",false).add("innerValue", "long", true) .add("new_nested_col","string",true) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 7b51e5a623543..2ef59872a0ae8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -30,8 +30,7 @@ import org.apache.hudi.index.HoodieIndex.IndexType import org.apache.hudi.keygen.NonpartitionedKeyGenerator import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase} -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} - +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils} import org.apache.log4j.LogManager import org.apache.spark.sql._ @@ -557,7 +556,12 @@ class TestMORDataSource extends HoodieClientTestBase { assertEquals(sampleRow.getLong(1), sampleRow.get(1)) assertEquals(sampleRow.getString(2), sampleRow.get(2)) assertEquals(sampleRow.getSeq(3), sampleRow.get(3)) - assertEquals(sampleRow.getStruct(4), sampleRow.get(4)) + if (HoodieSparkUtils.gteqSpark3_2) { + // Since Spark3.2, the `nation` column is parsed as String, not Struct. + assertEquals(sampleRow.getString(4), sampleRow.get(4)) + } else { + assertEquals(sampleRow.getStruct(4), sampleRow.get(4)) + } } def verifyShow(df: DataFrame): Unit = { From 298b53fa3cf691a84e274ee081f29428aa5968b0 Mon Sep 17 00:00:00 2001 From: Yann Date: Mon, 21 Feb 2022 21:38:06 +0800 Subject: [PATCH 4/4] [MINOR] combine gteqSpark3_2 and beforeSpark3_2 in HoodieSparkUtils --- .../src/main/scala/org/apache/hudi/HoodieSparkUtils.scala | 8 -------- .../apache/spark/sql/hudi/analysis/HoodieAnalysis.scala | 4 ++-- .../scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala | 2 +- .../org/apache/spark/sql/hudi/TestHoodieSqlBase.scala | 2 +- 4 files changed, 4 insertions(+), 12 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 360632fe0a71e..2db0cf2269213 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -59,14 +59,6 @@ object HoodieSparkUtils extends SparkAdapterSupport { def gteqSpark3_2: Boolean = SPARK_VERSION > "3.2" - def beforeSpark3_2(): Boolean = { - if (isSpark2 || isSpark3_0 || isSpark3_1) { - true - } else { - false - } - } - def getMetaSchema: StructType = { StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => { StructField(col, StringType, nullable = true) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index 5b44bc6d8971a..12bbd64851001 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -50,7 +50,7 @@ object HoodieAnalysis { ) ++ extraPostHocResolutionRules() def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = { - if (!HoodieSparkUtils.beforeSpark3_2()) { + if (HoodieSparkUtils.gteqSpark3_2) { val spark3AnalysisClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis" val spark3Analysis: SparkSession => Rule[LogicalPlan] = session => ReflectionUtils.loadClass(spark3AnalysisClass, session).asInstanceOf[Rule[LogicalPlan]] @@ -66,7 +66,7 @@ object HoodieAnalysis { } def extraPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = - if (!HoodieSparkUtils.beforeSpark3_2()) { + if (HoodieSparkUtils.gteqSpark3_2) { val spark3PostHocResolutionClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3PostAnalysisRule" val spark3PostHocResolution: SparkSession => Rule[LogicalPlan] = session => ReflectionUtils.loadClass(spark3PostHocResolutionClass, session).asInstanceOf[Rule[LogicalPlan]] diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index edcefb531832d..beabfbe9bfd14 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -95,7 +95,7 @@ class TestHoodieSparkSqlWriter { */ def initSparkContext(): Unit = { val sparkConf = new SparkConf() - if (!HoodieSparkUtils.beforeSpark3_2()) { + if (HoodieSparkUtils.gteqSpark3_2) { sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala index a222b91d49783..ca3919599b6fa 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala @@ -58,7 +58,7 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { def sparkConf(): SparkConf = { val sparkConf = new SparkConf() - if (!HoodieSparkUtils.beforeSpark3_2()) { + if (HoodieSparkUtils.gteqSpark3_2) { sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") }