Commit bcc62e5

[HUDI-2811] Support Spark 3.2 and Parquet 1.12.x
1 parent 734c9f5 commit bcc62e5

28 files changed: +567 -156 lines changed

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala

Lines changed: 3 additions & 2 deletions
@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.datasources
 import java.util.TimeZone
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.execution.datasources.PartitioningUtils.PartitionValues
+
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.DataType
 
 trait SparkParsePartitionUtil extends Serializable {
@@ -30,5 +31,5 @@ trait SparkParsePartitionUtil extends Serializable {
       typeInference: Boolean,
       basePaths: Set[Path],
       userSpecifiedDataTypes: Map[String, DataType],
-      timeZone: TimeZone): Option[PartitionValues]
+      timeZone: TimeZone): InternalRow
 }

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala

Lines changed: 2 additions & 6 deletions
@@ -554,14 +554,10 @@ case class HoodieFileIndex(
           }.mkString("/")
           val pathWithPartitionName = new Path(basePath, partitionWithName)
           val partitionDataTypes = partitionSchema.fields.map(f => f.name -> f.dataType).toMap
-          val partitionValues = sparkParsePartitionUtil.parsePartition(pathWithPartitionName,
+
+          sparkParsePartitionUtil.parsePartition(pathWithPartitionName,
             typeInference = false, Set(new Path(basePath)), partitionDataTypes,
             DateTimeUtils.getTimeZone(timeZoneId))
-
-          // Convert partitionValues to InternalRow
-          partitionValues.map(_.literals.map(_.value))
-            .map(InternalRow.fromSeq)
-            .getOrElse(InternalRow.empty)
         }
       }
       PartitionRowPath(partitionRow, partitionPath)
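
Note: together with the SparkParsePartitionUtil change above, the Option[PartitionValues] -> InternalRow conversion no longer lives in HoodieFileIndex; each version-specific implementation of the trait is now expected to return an InternalRow directly. A minimal sketch of that conversion, reusing the logic deleted here (the helper name and the way the parsed values are obtained are illustrative, not part of this commit):

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.execution.datasources.PartitioningUtils.PartitionValues

  // Hypothetical helper: turn Spark's parsed partition values (if any) into the
  // InternalRow that SparkParsePartitionUtil.parsePartition now returns.
  def toPartitionRow(parsed: Option[PartitionValues]): InternalRow =
    parsed
      .map(values => InternalRow.fromSeq(values.literals.map(_.value)))
      .getOrElse(InternalRow.empty)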

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala

Lines changed: 11 additions & 3 deletions
@@ -18,17 +18,25 @@
 package org.apache.spark.sql.avro
 
 import org.apache.avro.Schema
+
 import org.apache.spark.sql.types.DataType
 
 /**
  * This is to be compatible with the type returned by Spark 3.1
  * and other spark versions for AvroDeserializer
  */
-case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
-  extends AvroDeserializer(rootAvroType, rootCatalystType) {
+case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
+
+  private val avroDeserializer = if (org.apache.spark.SPARK_VERSION.startsWith("3.2")) {
+    val constructor = classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType], classOf[String])
+    constructor.newInstance(rootAvroType, rootCatalystType, "EXCEPTION")
+  } else {
+    val constructor = classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType])
+    constructor.newInstance(rootAvroType, rootCatalystType)
+  }
 
   def deserializeData(data: Any): Any = {
-    super.deserialize(data) match {
+    avroDeserializer.deserialize(data) match {
       case Some(r) => r // spark 3.1 return type is Option, we fetch the data.
       case o => o // for other spark version, return the data directly.
     }
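
Note: Spark 3.2's AvroDeserializer takes an extra constructor argument (the datetime rebase mode, fixed to "EXCEPTION" here), so the wrapper now selects a constructor reflectively instead of extending the class. A minimal usage sketch, assuming the Avro schema matches the Catalyst type; the schema, record and field below are illustrative only:

  import org.apache.avro.Schema
  import org.apache.avro.generic.GenericData
  import org.apache.spark.sql.avro.{HoodieAvroDeserializer, SchemaConverters}
  import org.apache.spark.sql.catalyst.InternalRow

  val avroSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"r","fields":[{"name":"id","type":"int"}]}""")
  val catalystType = SchemaConverters.toSqlType(avroSchema).dataType

  val record = new GenericData.Record(avroSchema)
  record.put("id", 1)

  // deserializeData unwraps the Option returned on Spark 3.1/3.2 and passes other
  // return values through, so the result is the deserialized Catalyst row.
  val deserializer = HoodieAvroDeserializer(avroSchema, catalystType)
  val row = deserializer.deserializeData(record).asInstanceOf[InternalRow]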

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Compaction.scala

Lines changed: 22 additions & 2 deletions
@@ -22,17 +22,37 @@ import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.Compactio
 case class CompactionTable(table: LogicalPlan, operation: CompactionOperation, instantTimestamp: Option[Long])
   extends Command {
   override def children: Seq[LogicalPlan] = Seq(table)
+
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionTable = {
+    copy(table = newChildren.head)
+  }
 }
 
 case class CompactionPath(path: String, operation: CompactionOperation, instantTimestamp: Option[Long])
-  extends Command
+  extends Command {
+  override def children: Seq[LogicalPlan] = Seq.empty
+
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionPath = {
+    this
+  }
+}
 
 case class CompactionShowOnTable(table: LogicalPlan, limit: Int = 20)
   extends Command {
   override def children: Seq[LogicalPlan] = Seq(table)
+
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionShowOnTable = {
+    copy(table = newChildren.head)
+  }
 }
 
-case class CompactionShowOnPath(path: String, limit: Int = 20) extends Command
+case class CompactionShowOnPath(path: String, limit: Int = 20) extends Command {
+  override def children: Seq[LogicalPlan] = Seq.empty
+
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionShowOnPath = {
+    this
+  }
+}
 
 object CompactionOperation extends Enumeration {
   type CompactionOperation = Value
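
Note: the withNewChildrenInternal methods are added because Spark 3.2's TreeNode rebuilds every plan node from a new set of children through this method; they are deliberately declared without override so the same source still compiles against Spark 3.0/3.1, where the method does not exist. A minimal sketch of the pattern for a hypothetical single-child command node (not part of this commit):

  import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

  // Illustrative node: keeps one child and rebuilds itself when Spark hands back
  // transformed children (used by Spark 3.2, a harmless extra method on 3.0/3.1).
  case class ExampleSingleChildCommand(child: LogicalPlan) extends Command {
    override def children: Seq[LogicalPlan] = Seq(child)

    def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): ExampleSingleChildCommand =
      copy(child = newChildren.head)
  }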

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala

Lines changed: 22 additions & 7 deletions
@@ -21,13 +21,14 @@ import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
 import org.apache.hudi.SparkAdapterSupport
 
 import scala.collection.JavaConverters._
+
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.HoodieTableMetaClient
+
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -180,11 +181,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
             .map { case (targetAttr, sourceAttr) => Assignment(targetAttr, sourceAttr) }
         }
       } else {
-        assignments.map(assignment => {
+        // For Spark3.2, InsertStarAction/UpdateStarAction's assignments will contain the meta fields.
+        val withoutMetaAttrs = assignments.filterNot{ assignment =>
+          if (assignment.key.isInstanceOf[Attribute]) {
+            HoodieSqlUtils.isMetaField(assignment.key.asInstanceOf[Attribute].name)
+          } else {
+            false
+          }
+        }
+        withoutMetaAttrs.map { assignment =>
           val resolvedKey = resolveExpressionFrom(target)(assignment.key)
           val resolvedValue = resolveExpressionFrom(resolvedSource, Some(target))(assignment.value)
           Assignment(resolvedKey, resolvedValue)
-        })
+        }
       }
       (resolvedCondition, resolvedAssignments)
     }
@@ -244,13 +253,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
         case DeleteAction(condition) =>
           val resolvedCondition = condition.map(resolveExpressionFrom(resolvedSource)(_))
           DeleteAction(resolvedCondition)
+        case action: MergeAction =>
+          // ForSpark3.2, it's UpdateStarAction
+          UpdateAction(action.condition, Seq.empty)
       }
       // Resolve the notMatchedActions
       val resolvedNotMatchedActions = notMatchedActions.map {
         case InsertAction(condition, assignments) =>
           val (resolvedCondition, resolvedAssignments) =
             resolveConditionAssignments(condition, assignments)
           InsertAction(resolvedCondition, resolvedAssignments)
+        case action: MergeAction =>
+          // ForSpark3.2, it's InsertStarAction
+          InsertAction(action.condition, Seq.empty)
       }
       // Return the resolved MergeIntoTable
       MergeIntoTable(target, resolvedSource, resolvedMergeCondition,
@@ -424,9 +439,9 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
       case AlterTableChangeColumnCommand(tableName, columnName, newColumn)
         if isHoodieTable(tableName, sparkSession) =>
           AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn)
-      case ShowPartitionsCommand(tableName, specOpt)
-        if isHoodieTable(tableName, sparkSession) =>
-          ShowHoodieTablePartitionsCommand(tableName, specOpt)
+      case s: ShowPartitionsCommand
+        if isHoodieTable(s.tableName, sparkSession) =>
+          ShowHoodieTablePartitionsCommand(s.tableName, s.spec)
       // Rewrite TruncateTableCommand to TruncateHoodieTableCommand
       case TruncateTableCommand(tableName, partitionSpec)
         if isHoodieTable(tableName, sparkSession) =>
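
Note: Spark 3.2 expands INSERT * / UPDATE * in MERGE INTO to include every target column, so the generated assignments now also cover Hudi's metadata columns and are filtered out above before resolution. A rough sketch of the kind of check HoodieSqlUtils.isMetaField performs (the column list is Hudi's standard meta columns; the exact implementation is not part of this diff):

  // Hudi's metadata columns, all prefixed with "_hoodie_".
  val hoodieMetaFields: Set[String] = Set(
    "_hoodie_commit_time",
    "_hoodie_commit_seqno",
    "_hoodie_record_key",
    "_hoodie_partition_path",
    "_hoodie_file_name")

  def isMetaFieldSketch(name: String): Boolean = hoodieMetaFields.contains(name)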

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala

Lines changed: 13 additions & 6 deletions
@@ -31,6 +31,7 @@ import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.SchemaUtils
@@ -46,6 +47,10 @@ case class AlterHoodieTableAddColumnsCommand(
     colsToAdd: Seq[StructField])
   extends RunnableCommand {
 
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): AlterHoodieTableAddColumnsCommand = {
+    this
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     if (colsToAdd.nonEmpty) {
       val resolver = sparkSession.sessionState.conf.resolver
@@ -67,14 +72,13 @@ case class AlterHoodieTableAddColumnsCommand(
       AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, hoodieCatalogTable, sparkSession)
 
       // Refresh the new schema to meta
-      val newDataSchema = StructType(hoodieCatalogTable.dataSchema.fields ++ colsToAdd)
-      refreshSchemaInMeta(sparkSession, hoodieCatalogTable.table, newDataSchema)
+      refreshSchemaInMeta(sparkSession, hoodieCatalogTable.table, newSchema)
     }
     Seq.empty[Row]
   }
 
   private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable,
-      newSqlSchema: StructType): Unit = {
+      newSchema: Schema): Unit = {
     try {
       sparkSession.catalog.uncacheTable(tableId.quotedString)
     } catch {
@@ -83,13 +87,16 @@ case class AlterHoodieTableAddColumnsCommand(
     }
     sparkSession.catalog.refreshTable(table.identifier.unquotedString)
 
+    val newSqlSchema = AvroConversionUtils.convertAvroSchemaToStructType(newSchema)
+    val newTable = table.copy(schema = newSqlSchema)
     SchemaUtils.checkColumnNameDuplication(
-      newSqlSchema.map(_.name),
+      newTable.dataSchema.map(_.name),
       "in the table definition of " + table.identifier,
       conf.caseSensitiveAnalysis)
-    DDLUtils.checkDataColNames(table, colsToAdd.map(_.name))
 
-    sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlSchema)
+    DDLUtils.checkDataColNames(newTable)
+
+    sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newTable.dataSchema)
   }
 }
 
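
Note: the catalog refresh now derives the SQL schema from the committed Avro schema (which already contains the added columns) via AvroConversionUtils.convertAvroSchemaToStructType, instead of appending colsToAdd to the cached data schema by hand. A hedged usage sketch of the statement that exercises this command; the table and column names are hypothetical:

  // With an active SparkSession `spark` and an existing Hudi table, this DDL is
  // rewritten to AlterHoodieTableAddColumnsCommand, which commits the new schema to
  // the Hudi table and then refreshes the Spark catalog entry as shown above.
  spark.sql("ALTER TABLE hudi_demo_tbl ADD COLUMNS (ext_col STRING)")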

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala

Lines changed: 5 additions & 0 deletions
@@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -41,6 +42,10 @@ case class AlterHoodieTableChangeColumnCommand(
     newColumn: StructField)
   extends RunnableCommand {
 
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): AlterHoodieTableChangeColumnCommand = {
+    this
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala

Lines changed: 5 additions & 0 deletions
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
 
@@ -35,6 +36,10 @@ case class AlterHoodieTableDropPartitionCommand(
     specs: Seq[TablePartitionSpec])
   extends RunnableCommand {
 
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): AlterHoodieTableDropPartitionCommand = {
+    this
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
     DDLUtils.verifyAlterTableType(

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala

Lines changed: 6 additions & 1 deletion
@@ -24,9 +24,10 @@ import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeli
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.{HoodieTimer, Option => HOption}
 import org.apache.hudi.exception.HoodieException
+
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation
+import org.apache.spark.sql.catalyst.plans.logical.{CompactionOperation, LogicalPlan}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
 import org.apache.spark.sql.execution.command.RunnableCommand
@@ -40,6 +41,10 @@ case class CompactionHoodiePathCommand(path: String,
     operation: CompactionOperation, instantTimestamp: Option[Long] = None)
   extends RunnableCommand {
 
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionHoodiePathCommand = {
+    this
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
       .setConf(sparkSession.sessionState.newHadoopConf()).build()

hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala

Lines changed: 5 additions & 0 deletions
@@ -21,6 +21,7 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation
 import org.apache.spark.sql.types.StringType
@@ -29,6 +30,10 @@ case class CompactionHoodieTableCommand(table: CatalogTable,
     operation: CompactionOperation, instantTimestamp: Option[Long])
   extends RunnableCommand {
 
+  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionHoodieTableCommand = {
+    this
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val basePath = getTableLocation(table, sparkSession)
     CompactionHoodiePathCommand(basePath, operation, instantTimestamp).run(sparkSession)
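
Note: CompactionHoodieTableCommand simply resolves the table's base path and delegates to CompactionHoodiePathCommand. A hedged usage sketch of the SQL that drives these commands (table name hypothetical; assumes Hudi's compaction SQL syntax for merge-on-read tables):

  // With an active SparkSession `spark` and a merge-on-read Hudi table:
  spark.sql("schedule compaction on hudi_mor_tbl")  // schedule a new compaction instant
  spark.sql("run compaction on hudi_mor_tbl")       // execute pending/new compactions
  spark.sql("show compaction on hudi_mor_tbl")      // list compaction instants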
