
Commit 08b5e4f

Author: Alexey Kudinkin (committed)

Handle incompatibilities b/w Spark 3.2.0 and 3.2.1 in Spark32HoodieParquetFileFormat
1 parent 6d6c9a0 commit 08b5e4f

2 files changed: 160 additions & 31 deletions

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala

Lines changed: 5 additions & 3 deletions
@@ -53,13 +53,15 @@ object HoodieSparkUtils extends SparkAdapterSupport {
 
   def isSpark3_1: Boolean = SPARK_VERSION.startsWith("3.1")
 
+  def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"
+
+  def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"
+
   def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2")
 
   def gteqSpark3_2: Boolean = SPARK_VERSION > "3.2"
 
-  def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"
-
-  def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"
+  def gteqSpark3_2_1: Boolean = SPARK_VERSION >= "3.2.1"
 
   def getMetaSchema: StructType = {
     StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
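
A side note on the comparison style used by these helpers: SPARK_VERSION is compared lexicographically as a plain string, which behaves as intended for the 3.x version strings involved here. A standalone sketch (not part of the commit; the hard-coded version value is made up):

  object VersionCheckSketch extends App {
    // Stand-in for org.apache.spark.SPARK_VERSION; the value here is hypothetical.
    val sparkVersion = "3.2.0"

    // Mirrors the helpers above: plain lexicographic string comparison.
    val gteqSpark3_2 = sparkVersion > "3.2"       // "3.2.0" > "3.2"    => true
    val gteqSpark3_2_1 = sparkVersion >= "3.2.1"  // "3.2.0" >= "3.2.1" => false

    println(s"gteqSpark3_2=$gteqSpark3_2, gteqSpark3_2_1=$gteqSpark3_2_1")
  }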

hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala

Lines changed: 155 additions & 28 deletions
@@ -22,11 +22,12 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.common.util.{InternalSchemaCache, ReflectionUtils}
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -35,17 +36,18 @@ import org.apache.parquet.filter2.predicate.FilterApi
3536
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
3637
import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
3738
import org.apache.spark.TaskContext
38-
import org.apache.spark.sql.SparkSession
3939
import org.apache.spark.sql.catalyst.InternalRow
40-
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
4140
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
41+
import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
4242
import org.apache.spark.sql.catalyst.util.DateTimeUtils
43-
import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet}
43+
import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._
4444
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
4545
import org.apache.spark.sql.internal.SQLConf
46+
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
4647
import org.apache.spark.sql.sources._
4748
import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
48-
import org.apache.spark.util.SerializableConfiguration
49+
import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
50+
import org.apache.spark.util.{SerializableConfiguration, Utils}
4951

5052
import java.net.URI
5153

@@ -158,21 +160,38 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
 
       lazy val footerFileMetaData =
         ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-      val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
-        footerFileMetaData.getKeyValueMetaData.get,
-        datetimeRebaseModeInRead)
       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = footerFileMetaData.getSchema
-        val parquetFilters = new ParquetFilters(
-          parquetSchema,
-          pushDownDate,
-          pushDownTimestamp,
-          pushDownDecimal,
-          pushDownStringStartWith,
-          pushDownInFilterThreshold,
-          isCaseSensitive,
-          datetimeRebaseSpec)
+        val parquetFilters = if (HoodieSparkUtils.gteqSpark3_2_1) {
+          // NOTE: Below code could only be compiled against >= Spark 3.2.1,
+          //       and unfortunately won't compile against Spark 3.2.0
+          //       However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
+          val datetimeRebaseSpec =
+            DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          new ParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive,
+            datetimeRebaseSpec)
+        } else {
+          // Spark 3.2.0
+          val datetimeRebaseMode =
+            Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive,
+            datetimeRebaseMode)
+        }
         filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
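
Why the branching above is needed, as far as the two code paths themselves indicate (signatures paraphrased here, not quoted from this commit): between 3.2.0 and 3.2.1 Spark changed the last argument of the ParquetFilters constructor from a rebase mode enum value to a rebase spec object, so a direct constructor call compiled against one patch release is not guaranteed to link against the other. Roughly:

  // Sketch of the binary incompatibility being worked around (paraphrased signatures):
  //   Spark 3.2.0:  new ParquetFilters(..., isCaseSensitive, datetimeRebaseMode /* LegacyBehaviorPolicy.Value */)
  //   Spark 3.2.1+: new ParquetFilters(..., isCaseSensitive, datetimeRebaseSpec /* rebase spec object */)
  // Hudi compiles against 3.2.1+, gates the direct call behind HoodieSparkUtils.gteqSpark3_2_1 at runtime,
  // and routes Spark 3.2.0 through the reflective createParquetFilters helper added to the companion object below.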
@@ -198,10 +217,6 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
198217
None
199218
}
200219

201-
val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
202-
footerFileMetaData.getKeyValueMetaData.get,
203-
int96RebaseModeInRead)
204-
205220
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
206221

207222
// Clone new conf
@@ -225,6 +240,10 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
225240
if (enableVectorizedReader) {
226241
val vectorizedReader =
227242
if (shouldUseInternalSchema) {
243+
val int96RebaseSpec =
244+
DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
245+
val datetimeRebaseSpec =
246+
DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
228247
new Spark32HoodieVectorizedParquetRecordReader(
229248
convertTz.orNull,
230249
datetimeRebaseSpec.mode.toString,
@@ -234,7 +253,14 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
234253
enableOffHeapColumnVector && taskContext.isDefined,
235254
capacity,
236255
typeChangeInfos)
237-
} else {
256+
} else if (HoodieSparkUtils.gteqSpark3_2_1) {
257+
// NOTE: Below code could only be compiled against >= Spark 3.2.1,
258+
// and unfortunately won't compile against Spark 3.2.0
259+
// However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
260+
val int96RebaseSpec =
261+
DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
262+
val datetimeRebaseSpec =
263+
DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
238264
new VectorizedParquetRecordReader(
239265
convertTz.orNull,
240266
datetimeRebaseSpec.mode.toString,
@@ -243,7 +269,20 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
243269
int96RebaseSpec.timeZone,
244270
enableOffHeapColumnVector && taskContext.isDefined,
245271
capacity)
272+
} else {
273+
// Spark 3.2.0
274+
val datetimeRebaseMode =
275+
Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
276+
val int96RebaseMode =
277+
Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
278+
createVectorizedParquetRecordReader(
279+
convertTz.orNull,
280+
datetimeRebaseMode.toString,
281+
int96RebaseMode.toString,
282+
enableOffHeapColumnVector && taskContext.isDefined,
283+
capacity)
246284
}
285+
247286
// SPARK-37089: We cannot register a task completion listener to close this iterator here
248287
// because downstream exec nodes have already registered their listeners. Since listeners
249288
// are executed in reverse order of registration, a listener registered here would close the
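
The createVectorizedParquetRecordReader(...) call in the Spark 3.2.0 branch above is one of the reflective factories added to the companion object later in this diff. A minimal standalone sketch of that technique, using a made-up SketchReader class in place of the Spark reader (an illustration of the idea under those assumptions, not Hudi's actual ReflectionUtils.loadClass implementation):

  // A stand-in class whose constructor we only want to resolve at runtime.
  class SketchReader(val mode: String, val capacity: Int)

  object ReflectiveFactorySketch extends App {
    // Pick a constructor by arity and instantiate it, so the concrete signature available
    // on the classpath is only looked up when the code actually runs.
    def instantiate[T](clazz: Class[T], args: AnyRef*): T = {
      val ctor = clazz.getConstructors
        .find(_.getParameterCount == args.length)
        .getOrElse(throw new NoSuchMethodException(s"${clazz.getName} with ${args.length} args"))
      ctor.newInstance(args: _*).asInstanceOf[T]
    }

    val reader = instantiate(classOf[SketchReader], "CORRECTED", Int.box(8192))
    println(s"mode=${reader.mode}, capacity=${reader.capacity}") // mode=CORRECTED, capacity=8192
  }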
@@ -279,12 +318,32 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
279318
}
280319
} else {
281320
logDebug(s"Falling back to parquet-mr")
282-
// ParquetRecordReader returns InternalRow
283-
val readSupport = new ParquetReadSupport(
284-
convertTz,
285-
enableVectorizedReader = false,
286-
datetimeRebaseSpec,
287-
int96RebaseSpec)
321+
val readSupport = if (HoodieSparkUtils.gteqSpark3_2_1) {
322+
// ParquetRecordReader returns InternalRow
323+
// NOTE: Below code could only be compiled against >= Spark 3.2.1,
324+
// and unfortunately won't compile against Spark 3.2.0
325+
// However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
326+
val int96RebaseSpec =
327+
DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
328+
val datetimeRebaseSpec =
329+
DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
330+
new ParquetReadSupport(
331+
convertTz,
332+
enableVectorizedReader = false,
333+
datetimeRebaseSpec,
334+
int96RebaseSpec)
335+
} else {
336+
val datetimeRebaseMode =
337+
Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
338+
val int96RebaseMode =
339+
Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
340+
createParquetReadSupport(
341+
convertTz,
342+
/* enableVectorizedReader = */ false,
343+
datetimeRebaseMode,
344+
int96RebaseMode)
345+
}
346+
288347
val reader = if (pushed.isDefined && enableRecordFilter) {
289348
val parquetFilter = FilterCompat.get(pushed.get, null)
290349
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
@@ -332,10 +391,78 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       }
     }
   }
+
 }
 
 object Spark32HoodieParquetFileFormat {
 
+  private val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
+  private val PARQUET_VECTORIZED_READER_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader"
+  private val PARQUET_READ_SUPPORT_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport"
+
+  private def createParquetFilters(args: Any*): ParquetFilters = {
+    val parquetFiltersInstance = ReflectionUtils.loadClass(PARQUET_FILTERS_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
+    parquetFiltersInstance.asInstanceOf[ParquetFilters]
+  }
+
+  private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = {
+    val vectorizedRecordReader =
+      ReflectionUtils.loadClass(PARQUET_VECTORIZED_READER_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
+    vectorizedRecordReader.asInstanceOf[VectorizedParquetRecordReader]
+  }
+
+  private def createParquetReadSupport(args: Any*): ParquetReadSupport = {
+    val parquetReadSupport =
+      ReflectionUtils.loadClass(PARQUET_READ_SUPPORT_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
+    parquetReadSupport.asInstanceOf[ParquetReadSupport]
+  }
+
+  // TODO scala-doc
+  // Spark 3.2.0
+  // scalastyle:off
+  def int96RebaseMode(lookupFileMeta: String => String,
+                      modeByConfig: String): LegacyBehaviorPolicy.Value = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return LegacyBehaviorPolicy.CORRECTED
+    }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the INT96 timestamp values.
+      // Files written by Spark 3.1 and later may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+  // scalastyle:on
+
+  // TODO scala-doc
+  // Spark 3.2.0
+  // scalastyle:off
+  def datetimeRebaseMode(lookupFileMeta: String => String,
+                         modeByConfig: String): LegacyBehaviorPolicy.Value = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return LegacyBehaviorPolicy.CORRECTED
+    }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the datetime values.
+      // Files written by Spark 3.0 and later may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+  // scalastyle:on
+
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
     if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
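
To make the behaviour of the copied Spark 3.2.0 rebase-mode resolution above concrete, here is a standalone, simplified walk-through (my own illustration: plain strings instead of LegacyBehaviorPolicy values, the test-only shortcut omitted, and the metadata maps made up; "org.apache.spark.version" is assumed to be the value behind SPARK_VERSION_METADATA_KEY):

  object RebaseModeSketch extends App {
    val sparkVersionKey = "org.apache.spark.version"
    val legacyDateTimeKey = "org.apache.spark.legacyDateTime"

    // Simplified mirror of datetimeRebaseMode above, returning plain strings.
    def datetimeRebaseMode(lookupFileMeta: String => String, modeByConfig: String): String =
      Option(lookupFileMeta(sparkVersionKey)).map { version =>
        if (version < "3.0.0" || lookupFileMeta(legacyDateTimeKey) != null) "LEGACY" else "CORRECTED"
      }.getOrElse(modeByConfig)

    val writtenBySpark24 = Map(sparkVersionKey -> "2.4.8")
    val writtenBySpark32 = Map(sparkVersionKey -> "3.2.0")

    println(datetimeRebaseMode(k => writtenBySpark24.get(k).orNull, "EXCEPTION")) // LEGACY
    println(datetimeRebaseMode(k => writtenBySpark32.get(k).orNull, "EXCEPTION")) // CORRECTED
    println(datetimeRebaseMode(_ => null, "CORRECTED"))                           // falls back to the configured mode
  }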
