From 7e4bcb9d3c18b7c948cc43489e7181915b160dda Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 20 Apr 2022 13:14:40 -0700 Subject: [PATCH 01/15] Re-use `ReflectionUtils` --- .../parquet/Spark312HoodieParquetFileFormat.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala index 769373866ff34..6061edd522855 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala @@ -25,7 +25,7 @@ import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.client.utils.SparkInternalSchemaConverter import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.util.StringUtils.isNullOrEmpty -import org.apache.hudi.common.util.{InternalSchemaCache, StringUtils} +import org.apache.hudi.common.util.{InternalSchemaCache, ReflectionUtils, StringUtils} import org.apache.hudi.common.util.collection.Pair import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.action.InternalSchemaMerger @@ -343,10 +343,9 @@ object Spark312HoodieParquetFileFormat { } } - private def createParquetFilters(arg: Any*): ParquetFilters = { - val clazz = Class.forName(PARQUET_FILTERS_CLASS_NAME, true, Thread.currentThread().getContextClassLoader) - val ctor = clazz.getConstructors.head - ctor.newInstance(arg.map(_.asInstanceOf[AnyRef]): _*).asInstanceOf[ParquetFilters] + private def createParquetFilters(args: Any*): ParquetFilters = { + val instance = ReflectionUtils.loadClass(PARQUET_FILTERS_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*) + instance.asInstanceOf[ParquetFilters] } private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = { From f2e5fd6cd232f8a5a5fe8bbe7a01c1c97d576993 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 20 Apr 2022 14:27:42 -0700 Subject: [PATCH 02/15] Handle incompatibilities b/w Spark 3.2.0 and 3.2.1 in `Spark32HoodieParquetFileFormat` --- .../org/apache/hudi/HoodieSparkUtils.scala | 8 +- .../Spark32HoodieParquetFileFormat.scala | 183 +++++++++++++++--- 2 files changed, 160 insertions(+), 31 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 54bc06bd76201..7a8f8a1580d97 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -53,13 +53,15 @@ object HoodieSparkUtils extends SparkAdapterSupport { def isSpark3_1: Boolean = SPARK_VERSION.startsWith("3.1") + def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1" + + def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3" + def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2") def gteqSpark3_2: Boolean = SPARK_VERSION > "3.2" - def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1" - - def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3" + def gteqSpark3_2_1: Boolean = SPARK_VERSION >= "3.2.1" def getMetaSchema: 
StructType = { StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => { diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala index f2a0a21df830f..dfeedd7ae4c91 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala @@ -22,11 +22,12 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.FileSplit import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.client.utils.SparkInternalSchemaConverter import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.util.InternalSchemaCache import org.apache.hudi.common.util.StringUtils.isNullOrEmpty import org.apache.hudi.common.util.collection.Pair +import org.apache.hudi.common.util.{InternalSchemaCache, ReflectionUtils} import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.action.InternalSchemaMerger import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} @@ -35,17 +36,18 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader} import org.apache.spark.TaskContext -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet} +import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType} -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession} +import org.apache.spark.util.{SerializableConfiguration, Utils} import java.net.URI @@ -158,21 +160,38 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo lazy val footerFileMetaData = ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData - val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, - datetimeRebaseModeInRead) // Try to push down filters when filter push-down is enabled. 
val pushed = if (enableParquetFilterPushDown) { val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters( - parquetSchema, - pushDownDate, - pushDownTimestamp, - pushDownDecimal, - pushDownStringStartWith, - pushDownInFilterThreshold, - isCaseSensitive, - datetimeRebaseSpec) + val parquetFilters = if (HoodieSparkUtils.gteqSpark3_2_1) { + // NOTE: Below code could only be compiled against >= Spark 3.2.1, + // and unfortunately won't compile against Spark 3.2.0 + // However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1 + val datetimeRebaseSpec = + DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseSpec) + } else { + // Spark 3.2.0 + val datetimeRebaseMode = + Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + createParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseMode) + } filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null))) // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` @@ -198,10 +217,6 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo None } - val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, - int96RebaseModeInRead) - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) // Clone new conf @@ -225,6 +240,10 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo if (enableVectorizedReader) { val vectorizedReader = if (shouldUseInternalSchema) { + val int96RebaseSpec = + DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + val datetimeRebaseSpec = + DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) new Spark32HoodieVectorizedParquetRecordReader( convertTz.orNull, datetimeRebaseSpec.mode.toString, @@ -234,7 +253,14 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo enableOffHeapColumnVector && taskContext.isDefined, capacity, typeChangeInfos) - } else { + } else if (HoodieSparkUtils.gteqSpark3_2_1) { + // NOTE: Below code could only be compiled against >= Spark 3.2.1, + // and unfortunately won't compile against Spark 3.2.0 + // However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1 + val int96RebaseSpec = + DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + val datetimeRebaseSpec = + DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) new VectorizedParquetRecordReader( convertTz.orNull, datetimeRebaseSpec.mode.toString, @@ -243,7 +269,20 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo int96RebaseSpec.timeZone, enableOffHeapColumnVector && taskContext.isDefined, capacity) + } else { + // Spark 3.2.0 + val datetimeRebaseMode = + 
Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + val int96RebaseMode = + Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + createVectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseMode.toString, + int96RebaseMode.toString, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) } + // SPARK-37089: We cannot register a task completion listener to close this iterator here // because downstream exec nodes have already registered their listeners. Since listeners // are executed in reverse order of registration, a listener registered here would close the @@ -279,12 +318,32 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo } } else { logDebug(s"Falling back to parquet-mr") - // ParquetRecordReader returns InternalRow - val readSupport = new ParquetReadSupport( - convertTz, - enableVectorizedReader = false, - datetimeRebaseSpec, - int96RebaseSpec) + val readSupport = if (HoodieSparkUtils.gteqSpark3_2_1) { + // ParquetRecordReader returns InternalRow + // NOTE: Below code could only be compiled against >= Spark 3.2.1, + // and unfortunately won't compile against Spark 3.2.0 + // However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1 + val int96RebaseSpec = + DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + val datetimeRebaseSpec = + DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + new ParquetReadSupport( + convertTz, + enableVectorizedReader = false, + datetimeRebaseSpec, + int96RebaseSpec) + } else { + val datetimeRebaseMode = + Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + val int96RebaseMode = + Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + createParquetReadSupport( + convertTz, + /* enableVectorizedReader = */ false, + datetimeRebaseMode, + int96RebaseMode) + } + val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) @@ -332,10 +391,78 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo } } } + } object Spark32HoodieParquetFileFormat { + private val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters" + private val PARQUET_VECTORIZED_READER_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader" + private val PARQUET_READ_SUPPORT_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport" + + private def createParquetFilters(args: Any*): ParquetFilters = { + val parquetFiltersInstance = ReflectionUtils.loadClass(PARQUET_FILTERS_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*) + parquetFiltersInstance.asInstanceOf[ParquetFilters] + } + + private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = { + val vectorizedRecordReader = + ReflectionUtils.loadClass(PARQUET_VECTORIZED_READER_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*) + vectorizedRecordReader.asInstanceOf[VectorizedParquetRecordReader] + } + + private def createParquetReadSupport(args: Any*): ParquetReadSupport = { + val parquetReadSupport = + 
ReflectionUtils.loadClass(PARQUET_READ_SUPPORT_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*) + parquetReadSupport.asInstanceOf[ParquetReadSupport] + } + + // TODO scala-doc + // Spark 3.2.0 + // scalastyle:off + def int96RebaseMode(lookupFileMeta: String => String, + modeByConfig: String): LegacyBehaviorPolicy.Value = { + if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { + return LegacyBehaviorPolicy.CORRECTED + } + // If there is no version, we return the mode specified by the config. + Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version => + // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to + // rebase the INT96 timestamp values. + // Files written by Spark 3.1 and latter may also need the rebase if they were written with + // the "LEGACY" rebase mode. + if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) { + LegacyBehaviorPolicy.LEGACY + } else { + LegacyBehaviorPolicy.CORRECTED + } + }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) + } + // scalastyle:on + + // TODO scala-doc + // Spark 3.2.0 + // scalastyle:off + def datetimeRebaseMode(lookupFileMeta: String => String, + modeByConfig: String): LegacyBehaviorPolicy.Value = { + if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { + return LegacyBehaviorPolicy.CORRECTED + } + // If there is no version, we return the mode specified by the config. + Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version => + // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to + // rebase the datetime values. + // Files written by Spark 3.0 and latter may also need the rebase if they were written with + // the "LEGACY" rebase mode. + if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) { + LegacyBehaviorPolicy.LEGACY + } else { + LegacyBehaviorPolicy.CORRECTED + } + }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) + } + // scalastyle:on + def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = { val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) if (querySchemaOption.isPresent && requiredSchema.nonEmpty) { From 07435f47abf78d3e4fb6f821ff2df4f8c9834028 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 20 Apr 2022 14:44:46 -0700 Subject: [PATCH 03/15] Extracted `int96RebaseMode`, `datetimeRebaseMode` into `Spark32DataSourceUtils` --- .../parquet/Spark32DataSourceUtils.scala | 77 +++++++++++++++++++ .../Spark32HoodieParquetFileFormat.scala | 56 ++------------ 2 files changed, 82 insertions(+), 51 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala new file mode 100644 index 0000000000000..6d1c76380f216 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import org.apache.spark.util.Utils + +object Spark32DataSourceUtils { + + /** + * NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime + * compatibility against Spark 3.2.0 + */ + // scalastyle:off + def int96RebaseMode(lookupFileMeta: String => String, + modeByConfig: String): LegacyBehaviorPolicy.Value = { + if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { + return LegacyBehaviorPolicy.CORRECTED + } + // If there is no version, we return the mode specified by the config. + Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version => + // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to + // rebase the INT96 timestamp values. + // Files written by Spark 3.1 and latter may also need the rebase if they were written with + // the "LEGACY" rebase mode. + if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) { + LegacyBehaviorPolicy.LEGACY + } else { + LegacyBehaviorPolicy.CORRECTED + } + }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) + } + // scalastyle:on + + /** + * NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime + * compatibility against Spark 3.2.0 + */ + // scalastyle:off + def datetimeRebaseMode(lookupFileMeta: String => String, + modeByConfig: String): LegacyBehaviorPolicy.Value = { + if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { + return LegacyBehaviorPolicy.CORRECTED + } + // If there is no version, we return the mode specified by the config. + Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version => + // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to + // rebase the datetime values. + // Files written by Spark 3.0 and latter may also need the rebase if they were written with + // the "LEGACY" rebase mode. 
+ if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) { + LegacyBehaviorPolicy.LEGACY + } else { + LegacyBehaviorPolicy.CORRECTED + } + }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) + } + // scalastyle:on + +} diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala index dfeedd7ae4c91..99cb83cf511ae 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala @@ -181,7 +181,7 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo } else { // Spark 3.2.0 val datetimeRebaseMode = - Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) createParquetFilters( parquetSchema, pushDownDate, @@ -272,9 +272,9 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo } else { // Spark 3.2.0 val datetimeRebaseMode = - Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) val int96RebaseMode = - Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) createVectorizedParquetRecordReader( convertTz.orNull, datetimeRebaseMode.toString, @@ -334,9 +334,9 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo int96RebaseSpec) } else { val datetimeRebaseMode = - Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) val int96RebaseMode = - Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) createParquetReadSupport( convertTz, /* enableVectorizedReader = */ false, @@ -417,52 +417,6 @@ object Spark32HoodieParquetFileFormat { parquetReadSupport.asInstanceOf[ParquetReadSupport] } - // TODO scala-doc - // Spark 3.2.0 - // scalastyle:off - def int96RebaseMode(lookupFileMeta: String => String, - modeByConfig: String): LegacyBehaviorPolicy.Value = { - if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { - return LegacyBehaviorPolicy.CORRECTED - } - // If there is no version, we return the mode specified by the config. - Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version => - // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to - // rebase the INT96 timestamp values. 
- // Files written by Spark 3.1 and latter may also need the rebase if they were written with - // the "LEGACY" rebase mode. - if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) { - LegacyBehaviorPolicy.LEGACY - } else { - LegacyBehaviorPolicy.CORRECTED - } - }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) - } - // scalastyle:on - - // TODO scala-doc - // Spark 3.2.0 - // scalastyle:off - def datetimeRebaseMode(lookupFileMeta: String => String, - modeByConfig: String): LegacyBehaviorPolicy.Value = { - if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { - return LegacyBehaviorPolicy.CORRECTED - } - // If there is no version, we return the mode specified by the config. - Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version => - // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to - // rebase the datetime values. - // Files written by Spark 3.0 and latter may also need the rebase if they were written with - // the "LEGACY" rebase mode. - if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) { - LegacyBehaviorPolicy.LEGACY - } else { - LegacyBehaviorPolicy.CORRECTED - } - }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) - } - // scalastyle:on - def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = { val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) if (querySchemaOption.isPresent && requiredSchema.nonEmpty) { From 9252bdbed234bb67d9225c5a734d83cfe016414a Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 20 Apr 2022 15:19:58 -0700 Subject: [PATCH 04/15] Added new reflection utils to instantiate already loaded class --- .../hudi/common/util/ReflectionUtils.java | 44 +++++++++++++++---- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java index a4ef09641d50c..ec361d9f9a718 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java @@ -38,6 +38,8 @@ import java.util.Objects; import java.util.stream.Stream; +import static org.apache.hudi.TypeUtils.unsafeCast; + /** * A utility class for reflection. */ @@ -85,11 +87,14 @@ public static T loadPayload(String recordPayload * Creates an instance of the given class. Use this version when dealing with interface types as constructor args. */ public static Object loadClass(String clazz, Class[] constructorArgTypes, Object... constructorArgs) { - try { - return getClass(clazz).getConstructor(constructorArgTypes).newInstance(constructorArgs); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new HoodieException("Unable to instantiate class " + clazz, e); - } + return newInstanceUnchecked(getClass(clazz), constructorArgTypes, constructorArgs); + } + + /** + * Creates an instance of the given class. Constructor arg types are inferred. + */ + public static Object loadClass(String clazz, Object... constructorArgs) { + return newInstanceUnchecked(getClass(clazz), constructorArgs); } /** @@ -111,11 +116,32 @@ public static boolean hasConstructor(String clazz, Class[] constructorArgType } /** - * Creates an instance of the given class. Constructor arg types are inferred.
+ * Creates a new instance of provided {@link Class} by invoking ctor identified by the + * provided specific arguments + * + * @param klass target class to instantiate + * @param ctorArgs specific constructor arguments + * @return new instance of the class */ - public static Object loadClass(String clazz, Object... constructorArgs) { - Class[] constructorArgTypes = Arrays.stream(constructorArgs).map(Object::getClass).toArray(Class[]::new); - return loadClass(clazz, constructorArgTypes, constructorArgs); + public static T newInstanceUnchecked(Class klass, Object ...ctorArgs) { + Class[] ctorArgTypes = Arrays.stream(ctorArgs).map(Object::getClass).toArray(Class[]::new); + return newInstanceUnchecked(klass, ctorArgTypes, ctorArgs); + } + + /** + * Creates a new instance of provided {@link Class} by invoking ctor identified by the + * provided specific arguments + * + * @param klass target class to instantiate + * @param ctorArgs specific constructor arguments + * @return new instance of the class + */ + public static T newInstanceUnchecked(Class klass, Class[] ctorArgTypes, Object ...ctorArgs) { + try { + return unsafeCast(klass.getConstructor(ctorArgTypes).newInstance(ctorArgs)); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new HoodieException(String.format("Unable to instantiate class %s", klass.getSimpleName()), e); + } } /** From a881e382c7e31ee145c5436567b3902436f719f6 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 20 Apr 2022 15:20:40 -0700 Subject: [PATCH 05/15] Fixed instantiation of the components t/h reflection --- .../hudi/common/util/ReflectionUtils.java | 4 +-- .../Spark312HoodieParquetFileFormat.scala | 8 ++--- .../Spark32HoodieParquetFileFormat.scala | 33 +++++++++---------- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java index ec361d9f9a718..13228c440c003 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java @@ -123,7 +123,7 @@ public static boolean hasConstructor(String clazz, Class[] constructorArgType * @param ctorArgs specific constructor arguments * @return new instance of the class */ - public static T newInstanceUnchecked(Class klass, Object ...ctorArgs) { + public static T newInstanceUnchecked(Class klass, Object... ctorArgs) { Class[] ctorArgTypes = Arrays.stream(ctorArgs).map(Object::getClass).toArray(Class[]::new); return newInstanceUnchecked(klass, ctorArgTypes, ctorArgs); } @@ -136,7 +136,7 @@ public static T newInstanceUnchecked(Class klass, Object ...ctorArgs) { * @param ctorArgs specific constructor arguments * @return new instance of the class */ - public static T newInstanceUnchecked(Class klass, Class[] ctorArgTypes, Object ...ctorArgs) { + public static T newInstanceUnchecked(Class klass, Class[] ctorArgTypes, Object... 
ctorArgs) { try { return unsafeCast(klass.getConstructor(ctorArgTypes).newInstance(ctorArgs)); } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala index 6061edd522855..4c9902a3c4995 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala @@ -331,8 +331,6 @@ class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: B object Spark312HoodieParquetFileFormat { - val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters" - def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = { val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) if (querySchemaOption.isPresent && requiredSchema.nonEmpty) { @@ -344,8 +342,10 @@ object Spark312HoodieParquetFileFormat { } private def createParquetFilters(args: Any*): ParquetFilters = { - val instance = ReflectionUtils.loadClass(PARQUET_FILTERS_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*) - instance.asInstanceOf[ParquetFilters] + // ParquetFilters bears a single ctor (in Spark 3.1) + val ctor = classOf[ParquetFilters].getConstructors.head + ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*) + .asInstanceOf[ParquetFilters] } private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = { diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala index 99cb83cf511ae..351203ca582ad 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala @@ -36,6 +36,7 @@ import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader} import org.apache.spark.TaskContext +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow} @@ -43,11 +44,9 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType} -import 
org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession} -import org.apache.spark.util.{SerializableConfiguration, Utils} +import org.apache.spark.util.SerializableConfiguration import java.net.URI @@ -396,27 +395,25 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo object Spark32HoodieParquetFileFormat { - private val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters" - private val PARQUET_VECTORIZED_READER_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader" - private val PARQUET_READ_SUPPORT_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport" - private def createParquetFilters(args: Any*): ParquetFilters = { - val parquetFiltersInstance = ReflectionUtils.loadClass(PARQUET_FILTERS_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*) - parquetFiltersInstance.asInstanceOf[ParquetFilters] - } - - private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = { - val vectorizedRecordReader = - ReflectionUtils.loadClass(PARQUET_VECTORIZED_READER_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*) - vectorizedRecordReader.asInstanceOf[VectorizedParquetRecordReader] + // NOTE: ParquetFilters ctor args contain Scala enum, therefore we can't look it + // up by arg types, and have to instead rely on relative order of ctors + val ctor = classOf[ParquetFilters].getConstructors.head + ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*) + .asInstanceOf[ParquetFilters] } private def createParquetReadSupport(args: Any*): ParquetReadSupport = { - val parquetReadSupport = - ReflectionUtils.loadClass(PARQUET_READ_SUPPORT_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*) - parquetReadSupport.asInstanceOf[ParquetReadSupport] + // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it + // up by arg types, and have to instead rely on relative order of ctors + val ctor = classOf[ParquetReadSupport].getConstructors.head + ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*) + .asInstanceOf[ParquetReadSupport] } + private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = + ReflectionUtils.newInstanceUnchecked(classOf[VectorizedParquetRecordReader], args.map(_.asInstanceOf[AnyRef]): _*) + def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = { val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) if (querySchemaOption.isPresent && requiredSchema.nonEmpty) { From c42106594eb8b75d215041119c099fa73fe1016a Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 20 Apr 2022 15:29:18 -0700 Subject: [PATCH 06/15] Fixing NPEs --- .../java/org/apache/hudi/common/util/ReflectionUtils.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java index 13228c440c003..8ac4b0fa58600 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java @@ -124,7 +124,9 @@ public static boolean hasConstructor(String clazz, Class[] constructorArgType * @return new instance of the class */ public static T newInstanceUnchecked(Class klass, Object... 
ctorArgs) { - Class[] ctorArgTypes = Arrays.stream(ctorArgs).map(Object::getClass).toArray(Class[]::new); + Class[] ctorArgTypes = Arrays.stream(ctorArgs) + .map(arg -> Objects.requireNonNull(arg).getClass()) + .toArray(Class[]::new); return newInstanceUnchecked(klass, ctorArgTypes, ctorArgs); } From 69c7be01e3a86c05647bab831073e2aea9265c69 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 20 Apr 2022 15:32:48 -0700 Subject: [PATCH 07/15] Fallback to pick ctors positionally as opposed to looking them up --- .../parquet/Spark32HoodieParquetFileFormat.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala index 351203ca582ad..ccd93b6fd348e 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala @@ -411,8 +411,11 @@ object Spark32HoodieParquetFileFormat { .asInstanceOf[ParquetReadSupport] } - private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = - ReflectionUtils.newInstanceUnchecked(classOf[VectorizedParquetRecordReader], args.map(_.asInstanceOf[AnyRef]): _*) + private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = { + val ctor = classOf[VectorizedParquetRecordReader].getConstructors.head + ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*) + .asInstanceOf[VectorizedParquetRecordReader] + } def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = { val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) From cff4952461b74a79ae4e280d101c13b0dc6b2926 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 20 Apr 2022 15:34:05 -0700 Subject: [PATCH 08/15] `Spark312HoodieParquetFileFormat` > `Spark31HoodieParquetFileFormat` --- .../org/apache/spark/sql/adapter/Spark3_1Adapter.scala | 4 ++-- ...ileFormat.scala => Spark31HoodieParquetFileFormat.scala} | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) rename hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/{Spark312HoodieParquetFileFormat.scala => Spark31HoodieParquetFileFormat.scala} (98%) diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala index cd5cd9c82fbec..22431cb2574a3 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala @@ -23,7 +23,7 @@ import org.apache.spark.SPARK_VERSION import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark312HoodieParquetFileFormat} +import 
org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31HoodieParquetFileFormat} import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.types.DataType import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieSpark3_1CatalystExpressionUtils, SparkSession} @@ -55,6 +55,6 @@ class Spark3_1Adapter extends BaseSpark3Adapter { } override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { - Some(new Spark312HoodieParquetFileFormat(appendPartitionValues)) + Some(new Spark31HoodieParquetFileFormat(appendPartitionValues)) } } diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala similarity index 98% rename from hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala rename to hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala index 4c9902a3c4995..30e7b9c78a4c1 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.parquet.Spark312HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet} +import org.apache.spark.sql.execution.datasources.parquet.Spark31HoodieParquetFileFormat.{createParquetFilters, pruneInternalSchema, rebuildFilterFromParquet} import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ @@ -61,7 +61,7 @@ import java.net.URI *
  • Schema on-read
  • * */ -class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { +class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat { override def buildReaderWithPartitionValues(sparkSession: SparkSession, dataSchema: StructType, @@ -329,7 +329,7 @@ class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: B } } -object Spark312HoodieParquetFileFormat { +object Spark31HoodieParquetFileFormat { def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = { val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) From c9ab84e0ff2904b4ecad8ac88190510fcdf3635f Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 20 Apr 2022 22:22:47 -0700 Subject: [PATCH 09/15] Revert "Cleaning up `ReflectionUtils`" This reverts commit 6775bded7861bef7930124477440abe72e004ad4. --- .../hudi/common/util/ReflectionUtils.java | 46 ++++--------------- 1 file changed, 9 insertions(+), 37 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java index 8ac4b0fa58600..a4ef09641d50c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java @@ -38,8 +38,6 @@ import java.util.Objects; import java.util.stream.Stream; -import static org.apache.hudi.TypeUtils.unsafeCast; - /** * A utility class for reflection. */ @@ -87,14 +85,11 @@ public static T loadPayload(String recordPayload * Creates an instance of the given class. Use this version when dealing with interface types as constructor args. */ public static Object loadClass(String clazz, Class[] constructorArgTypes, Object... constructorArgs) { - return newInstanceUnchecked(getClass(clazz), constructorArgTypes, constructorArgs); - } - - /** - * Creates an instance of the given class. Constructor arg types are inferred. - */ - public static Object loadClass(String clazz, Object... constructorArgs) { - return newInstanceUnchecked(getClass(clazz), constructorArgs); + try { + return getClass(clazz).getConstructor(constructorArgTypes).newInstance(constructorArgs); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new HoodieException("Unable to instantiate class " + clazz, e); + } } /** @@ -116,34 +111,11 @@ public static boolean hasConstructor(String clazz, Class[] constructorArgType } /** - * Creates a new instance of provided {@link Class} by invoking ctor identified by the - * provided specific arguments - * - * @param klass target class to instantiate - * @param ctorArgs specific constructor arguments - * @return new instance of the class - */ - public static T newInstanceUnchecked(Class klass, Object... ctorArgs) { - Class[] ctorArgTypes = Arrays.stream(ctorArgs) - .map(arg -> Objects.requireNonNull(arg).getClass()) - .toArray(Class[]::new); - return newInstanceUnchecked(klass, ctorArgTypes, ctorArgs); - } - - /** - * Creates a new instance of provided {@link Class} by invoking ctor identified by the - * provided specific arguments - * - * @param klass target class to instantiate - * @param ctorArgs specific constructor arguments - * @return new instance of the class + * Creates an instance of the given class. Constructor arg types are inferred. 
*/ - public static T newInstanceUnchecked(Class klass, Class[] ctorArgTypes, Object... ctorArgs) { - try { - return unsafeCast(klass.getConstructor(ctorArgTypes).newInstance(ctorArgs)); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new HoodieException(String.format("Unable to instantiate class %s", klass.getSimpleName()), e); - } + public static Object loadClass(String clazz, Object... constructorArgs) { + Class[] constructorArgTypes = Arrays.stream(constructorArgs).map(Object::getClass).toArray(Class[]::new); + return loadClass(clazz, constructorArgTypes, constructorArgs); } /** From 7b815f2c3649d15e56fc62569728dd8f8a3f9f74 Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Thu, 21 Apr 2022 20:45:22 +0800 Subject: [PATCH 10/15] use tail to get VectorizedParquetRecordReader ctor --- .../parquet/Spark32HoodieParquetFileFormat.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala index ccd93b6fd348e..5d9d2a737db83 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala @@ -412,7 +412,11 @@ object Spark32HoodieParquetFileFormat { } private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = { - val ctor = classOf[VectorizedParquetRecordReader].getConstructors.head + // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it + // up by arg types, and have to instead rely on relative order of ctors + // NOTE: VectorizedParquetRecordReader has 2 ctors and the one we need is 2nd on the array + // This is a hacky workaround for the fixed version of Class. 
+ val ctor = classOf[VectorizedParquetRecordReader].getConstructors.tail ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*) .asInstanceOf[VectorizedParquetRecordReader] } From 29f20a1125935e8746c0639d53c2eb28c872399a Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Thu, 21 Apr 2022 21:36:04 +0800 Subject: [PATCH 11/15] fix tail misuse --- .../datasources/parquet/Spark32HoodieParquetFileFormat.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala index 5d9d2a737db83..3ba5d38623db9 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala @@ -416,7 +416,7 @@ object Spark32HoodieParquetFileFormat { // up by arg types, and have to instead rely on relative order of ctors // NOTE: VectorizedParquetRecordReader has 2 ctors and the one we need is 2nd on the array // This is a hacky workaround for the fixed version of Class. - val ctor = classOf[VectorizedParquetRecordReader].getConstructors.tail + val ctor = classOf[VectorizedParquetRecordReader].getConstructors.last ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*) .asInstanceOf[VectorizedParquetRecordReader] } From 5372ecf0ba92c500665f9545a20fcdb292ebd66f Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Thu, 21 Apr 2022 22:05:03 +0800 Subject: [PATCH 12/15] fix non-deterministic ctor order --- .../Spark32HoodieParquetFileFormat.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala index 3ba5d38623db9..ec522fb31b2d5 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala @@ -397,26 +397,27 @@ object Spark32HoodieParquetFileFormat { private def createParquetFilters(args: Any*): ParquetFilters = { // NOTE: ParquetFilters ctor args contain Scala enum, therefore we can't look it - // up by arg types, and have to instead rely on relative order of ctors - val ctor = classOf[ParquetFilters].getConstructors.head + // up by arg types, and have to instead rely on the number of args based on individual class; + // the ctor order is not guaranteed + val ctor = classOf[ParquetFilters].getConstructors.maxBy(_.getParameterCount) ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*) .asInstanceOf[ParquetFilters] } private def createParquetReadSupport(args: Any*): ParquetReadSupport = { // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it - // up by
arg types, and have to instead rely on the number of args based on individual class; + // the ctor order is not guaranteed + val ctor = classOf[ParquetReadSupport].getConstructors.maxBy(_.getParameterCount) ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*) .asInstanceOf[ParquetReadSupport] } private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = { // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it - // up by arg types, and have to instead rely on relative order of ctors - // NOTE: VectorizedParquetRecordReader has 2 ctors and the one we need is 2nd on the array - // This is a hacky workaround for the fixed version of Class. - val ctor = classOf[VectorizedParquetRecordReader].getConstructors.last + // up by arg types, and have to instead rely on the number of args based on individual class; + // the ctor order is not guaranteed + val ctor = classOf[VectorizedParquetRecordReader].getConstructors.maxBy(_.getParameterCount) ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*) .asInstanceOf[VectorizedParquetRecordReader] } From 92dac49724ee148dcadbfadd6e4e637d9c483521 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 21 Apr 2022 13:00:09 -0700 Subject: [PATCH 13/15] Fixing assumption that HoodieParquetFileFormat could only be used w/ Hudi tables --- .../datasources/parquet/Spark32HoodieParquetFileFormat.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala index ec522fb31b2d5..85f89181376c8 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala @@ -149,8 +149,8 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH) - val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong; val fileSchema = if (shouldUseInternalSchema) { + val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong; val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST) InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits) } else { From ed24c34019a3f5d36606c87f979b0fc26b5f31be Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 21 Apr 2022 13:00:23 -0700 Subject: [PATCH 14/15] Hardening the code --- .../Spark32HoodieParquetFileFormat.scala | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala index 85f89181376c8..7135f19e95e2d 100644 --- 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala @@ -25,9 +25,9 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.client.utils.SparkInternalSchemaConverter import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.util.InternalSchemaCache import org.apache.hudi.common.util.StringUtils.isNullOrEmpty import org.apache.hudi.common.util.collection.Pair -import org.apache.hudi.common.util.{InternalSchemaCache, ReflectionUtils} import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.action.InternalSchemaMerger import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} @@ -220,13 +220,17 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo // Clone new conf val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value) - var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap() - if (shouldUseInternalSchema) { + val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) { val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema() val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema) - typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema) + hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json) + + SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema) + } else { + new java.util.HashMap() } + val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopAttemptConf, attemptId) @@ -395,6 +399,9 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo object Spark32HoodieParquetFileFormat { + /** + * NOTE: This method is specific to Spark 3.2.0 + */ private def createParquetFilters(args: Any*): ParquetFilters = { // NOTE: ParquetFilters ctor args contain Scala enum, therefore we can't look it // up by arg types, and have to instead rely on the number of args based on individual class; @@ -404,6 +411,9 @@ object Spark32HoodieParquetFileFormat { .asInstanceOf[ParquetFilters] } + /** + * NOTE: This method is specific to Spark 3.2.0 + */ private def createParquetReadSupport(args: Any*): ParquetReadSupport = { // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it // up by arg types, and have to instead rely on the number of args based on individual class; @@ -413,6 +423,9 @@ object Spark32HoodieParquetFileFormat { .asInstanceOf[ParquetReadSupport] } + /** + * NOTE: This method is specific to Spark 3.2.0 + */ private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = { // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it // up by arg types, and have to instead rely on the number of args based on individual class; From c03a0c5c5c3b0072888b6ca80ca6d6f84e508a99 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 21 Apr 2022 13:01:08 -0700 Subject: [PATCH 15/15] Replicating to Spark 3.1 --- 
.../parquet/Spark31HoodieParquetFileFormat.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala index 30e7b9c78a4c1..e99850bef06b8 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala @@ -154,8 +154,8 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH) - val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong; val fileSchema = if (shouldUseInternalSchema) { + val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong; val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST) InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits) } else { @@ -223,13 +223,17 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo // Clone new conf val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value) - var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap() - if (shouldUseInternalSchema) { + var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) { val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema() val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema) - typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema) + hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json) + + SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema) + } else { + new java.util.HashMap() } + val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)