diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
index 578a63b194b14..6c94c1c54d71d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
@@ -33,6 +33,8 @@
   protected HoodieFileReader newParquetFileReader(Configuration conf, Path path) {
     conf.setIfUnset(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString());
     conf.setIfUnset(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString());
     conf.setIfUnset(SQLConf.CASE_SENSITIVE().key(), SQLConf.CASE_SENSITIVE().defaultValueString());
+    // Refer to this conf by its string key to stay compatible across Spark versions; the matching SQLConf constant is absent in older releases.
+    conf.setIfUnset("spark.sql.legacy.parquet.nanosAsLong", "false");
     return new HoodieSparkParquetReader(conf, path);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 290b232598cc6..f5e48f948fc37 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -58,6 +58,7 @@ private[hudi] trait SparkVersionsSupport {
   def gteqSpark3_2_1: Boolean = getSparkVersion >= "3.2.1"
   def gteqSpark3_2_2: Boolean = getSparkVersion >= "3.2.2"
   def gteqSpark3_3: Boolean = getSparkVersion >= "3.3"
+  def gteqSpark3_3_2: Boolean = getSparkVersion >= "3.3.2"
 }
 
 object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport with Logging {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 1ab2a23cd04bc..d2832362ba9cd 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -75,7 +75,10 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
     super.imbueConfigs(sqlContext)
-    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
+    // TODO: there is an issue with forcing this conf to "true" on Spark 3.3.2, so skip the override there until it is resolved
+    if (!HoodieSparkUtils.gteqSpark3_3_2) {
+      sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
+    }
   }
 
   protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
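Note: the nanosAsLong hunks deliberately address the conf by its raw string key. A minimal illustrative sketch of why that is safe, not part of the patch (it assumes an active `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Looking the conf up by its string key works on every Spark version:
// releases that predate the conf simply return the supplied default,
// while newer releases return whatever the user configured.
val nanosAsLong: Boolean =
  spark.conf.get("spark.sql.legacy.parquet.nanosAsLong", "false").toBoolean
```

Referencing the corresponding `SQLConf` constant instead would not compile against the older Spark versions Hudi supports, which is why both hunks fall back to the literal key.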
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
index 112f55cfc16a7..5026809fb42ad 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala
@@ -95,7 +95,11 @@ class Spark32PlusHoodieParquetFileFormat(private val shouldAppendPartitionValues
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
-
+    // Refer to this conf by its string key to stay compatible across Spark versions; the matching SQLConf constant is absent in older releases.
+    hadoopConf.setBoolean(
+      "spark.sql.legacy.parquet.nanosAsLong",
+      sparkSession.sessionState.conf.getConfString("spark.sql.legacy.parquet.nanosAsLong", "false").toBoolean
+    )
     val internalSchemaStr = hadoopConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
     // For Spark DataSource v1, there's no Physical Plan projection/schema pruning w/in Spark itself,
     // therefore it's safe to do schema projection here
diff --git a/pom.xml b/pom.xml
index 015ba78b29f75..ab52bdf35f529 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2213,6 +2213,7 @@
         <spark3.version>${spark33.version}</spark3.version>
         <spark.version>${spark3.version}</spark.version>
         <sparkbundle.version>3</sparkbundle.version>
+        <scala12.version>2.12.15</scala12.version>
         <scala.version>${scala12.version}</scala.version>
         <scala.binary.version>2.12</scala.binary.version>
         <hudi.spark.module>hudi-spark3.3.x</hudi.spark.module>
@@ -2337,6 +2338,7 @@
         <spark3.version>${spark33.version}</spark3.version>
         <spark.version>${spark3.version}</spark.version>
         <sparkbundle.version>3.3</sparkbundle.version>
+        <scala12.version>2.12.15</scala12.version>
         <scala.version>${scala12.version}</scala.version>
         <scala.binary.version>2.12</scala.binary.version>
         <hudi.spark.module>hudi-spark3.3.x</hudi.spark.module>
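For completeness, a small illustrative sketch (not part of the patch) of how the new `gteqSpark3_3_2` guard used in `BaseFileOnlyRelation` resolves. `SparkVersionsSupport` compares version strings lexicographically, which is sound for the single-digit patch releases involved here:

```scala
// Mirrors the comparison in SparkVersionsSupport: plain lexicographic ordering.
def gteqSpark3_3_2(sparkVersion: String): Boolean = sparkVersion >= "3.3.2"

assert(gteqSpark3_3_2("3.3.2"))  // 3.3.2: leave the vectorized reader at the user's setting
assert(!gteqSpark3_3_2("3.3.1")) // older 3.3.x: keep forcing enableVectorizedReader to "true"
assert(gteqSpark3_3_2("3.4.0"))  // later versions also take the new code path
```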