diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml index f1f178e1716b9..fc79fefd20658 100644 --- a/.github/workflows/bot.yml +++ b/.github/workflows/bot.yml @@ -24,6 +24,10 @@ jobs: spark: "spark3,spark3.0.x" - scala: "scala-2.12" spark: "spark3,spark3.0.x,spark-shade-unbundle-avro" + - scala: "scala-2.12" + spark: "spark3,spark3.1.x" + - scala: "scala-2.12" + spark: "spark3,spark3.1.x,spark-shade-unbundle-avro" - scala: "scala-2.12" spark: "spark3" - scala: "scala-2.12" diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index a791edfdf6183..3e5402565c151 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -47,8 +47,14 @@ import scala.collection.JavaConverters.asScalaBufferConverter object HoodieSparkUtils extends SparkAdapterSupport { + def isSpark2: Boolean = SPARK_VERSION.startsWith("2.") + def isSpark3: Boolean = SPARK_VERSION.startsWith("3.") + def isSpark3_0: Boolean = SPARK_VERSION.startsWith("3.0") + + def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2") + def getMetaSchema: StructType = { StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => { StructField(col, StringType, nullable = true) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala index fc2275bab1392..626b3c6ef0d4e 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.datasources import java.util.TimeZone import org.apache.hadoop.fs.Path -import org.apache.spark.sql.execution.datasources.PartitioningUtils.PartitionValues + +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.DataType trait SparkParsePartitionUtil extends Serializable { @@ -30,5 +31,5 @@ trait SparkParsePartitionUtil extends Serializable { typeInference: Boolean, basePaths: Set[Path], userSpecifiedDataTypes: Map[String, DataType], - timeZone: TimeZone): Option[PartitionValues] + timeZone: TimeZone): InternalRow } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 0ed1b4852838f..698d51ae78056 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -569,14 +569,10 @@ case class HoodieFileIndex( }.mkString("/") val pathWithPartitionName = new Path(basePath, partitionWithName) val partitionDataTypes = partitionSchema.fields.map(f => f.name -> f.dataType).toMap - val partitionValues = sparkParsePartitionUtil.parsePartition(pathWithPartitionName, + + sparkParsePartitionUtil.parsePartition(pathWithPartitionName, typeInference = false, Set(new Path(basePath)), partitionDataTypes, DateTimeUtils.getTimeZone(timeZoneId)) - - // Convert partitionValues to InternalRow - partitionValues.map(_.literals.map(_.value)) - 
.map(InternalRow.fromSeq) - .getOrElse(InternalRow.empty) } } PartitionRowPath(partitionRow, partitionPath) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 9e1fd43a27c29..d35cd49a482f4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -48,7 +48,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession} -import org.apache.spark.{SPARK_VERSION, SparkContext} +import org.apache.spark.SparkContext + import java.util.Properties import scala.collection.JavaConversions._ @@ -463,13 +464,13 @@ object HoodieSparkSqlWriter { } else { HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(df) } - if (SPARK_VERSION.startsWith("2.")) { + if (HoodieSparkUtils.isSpark2) { hoodieDF.write.format("org.apache.hudi.internal") .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime) .options(params) .mode(SaveMode.Append) .save() - } else if (SPARK_VERSION.startsWith("3.")) { + } else if (HoodieSparkUtils.isSpark3) { hoodieDF.write.format("org.apache.hudi.spark3.internal") .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime) .option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala index cab1e543ffc13..1678dc05da4f8 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala @@ -18,18 +18,30 @@ package org.apache.spark.sql.avro import org.apache.avro.Schema + +import org.apache.hudi.HoodieSparkUtils + import org.apache.spark.sql.types.DataType /** * This is to be compatible with the type returned by Spark 3.1 * and other spark versions for AvroDeserializer */ -case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) - extends AvroDeserializer(rootAvroType, rootCatalystType) { +case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) { + + private val avroDeserializer = if (HoodieSparkUtils.isSpark3_2) { + // SPARK-34404: As of Spark3.2, AvroDeserializer no longer has a constructor that takes only Schema and DataType arguments. + // So use reflection to get the AvroDeserializer instance. + val constructor = classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType], classOf[String]) + constructor.newInstance(rootAvroType, rootCatalystType, "EXCEPTION") + } else { + val constructor = classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType]) + constructor.newInstance(rootAvroType, rootCatalystType) + } def deserializeData(data: Any): Any = { - super.deserialize(data) match { - case Some(r) => r // spark 3.1 return type is Option, we fetch the data.
+ avroDeserializer.deserialize(data) match { + case Some(r) => r // As of spark 3.1, this will return data wrapped with Option, so we fetch the data. case o => o // for other spark version, return the data directly. } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Compaction.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Compaction.scala index 8d4ef7a016d8d..f445e7c8a0dab 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Compaction.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Compaction.scala @@ -22,17 +22,37 @@ import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.Compactio case class CompactionTable(table: LogicalPlan, operation: CompactionOperation, instantTimestamp: Option[Long]) extends Command { override def children: Seq[LogicalPlan] = Seq(table) + + def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionTable = { + copy(table = newChildren.head) + } } case class CompactionPath(path: String, operation: CompactionOperation, instantTimestamp: Option[Long]) - extends Command + extends Command { + override def children: Seq[LogicalPlan] = Seq.empty + + def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionPath = { + this + } +} case class CompactionShowOnTable(table: LogicalPlan, limit: Int = 20) extends Command { override def children: Seq[LogicalPlan] = Seq(table) + + def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionShowOnTable = { + copy(table = newChildren.head) + } } -case class CompactionShowOnPath(path: String, limit: Int = 20) extends Command +case class CompactionShowOnPath(path: String, limit: Int = 20) extends Command { + override def children: Seq[LogicalPlan] = Seq.empty + + def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionShowOnPath = { + this + } +} object CompactionOperation extends Enumeration { type CompactionOperation = Value diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/trees/HoodieLeafLike.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/trees/HoodieLeafLike.scala new file mode 100644 index 0000000000000..bde1ba29e8b63 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/trees/HoodieLeafLike.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.trees + +/** + * Similar to `LeafLike` in Spark3.2. 
+ */ +trait HoodieLeafLike[T <: TreeNode[T]] { self: TreeNode[T] => + + override final def children: Seq[T] = Nil + + override final def mapChildren(f: T => T): T = this.asInstanceOf[T] + + final def withNewChildrenInternal(newChildren: IndexedSeq[T]): T = this.asInstanceOf[T] +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala index aa622e14ec7dd..3b6436ee2e60a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala @@ -31,7 +31,7 @@ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator} -import org.apache.spark.SPARK_VERSION + import org.apache.spark.sql.{Column, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -282,8 +282,6 @@ object HoodieSqlUtils extends SparkAdapterSupport { .filterKeys(_.startsWith("hoodie.")) } - def isSpark3: Boolean = SPARK_VERSION.startsWith("3.") - def isEnableHive(sparkSession: SparkSession): Boolean = "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index 09164f1822fc7..92f73d503938e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -17,12 +17,13 @@ package org.apache.spark.sql.hudi.analysis +import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport} import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL -import org.apache.hudi.SparkAdapterSupport import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.HoodieTableMetaClient + import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule @@ -137,7 +138,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi // We can do this because under the normal case, we should not allow to update or set // the hoodie's meta field in sql statement, it is a system field, cannot set the value // by user. 
- if (HoodieSqlUtils.isSpark3) { + if (HoodieSparkUtils.isSpark3) { val assignmentFieldNames = assignments.map(_.key).map { case attr: AttributeReference => attr.name @@ -178,11 +179,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi .map { case (targetAttr, sourceAttr) => Assignment(targetAttr, sourceAttr) } } } else { - assignments.map(assignment => { + // For Spark3.2, InsertStarAction/UpdateStarAction's assignments will contain the meta fields. + val withoutMetaAttrs = assignments.filterNot { assignment => + if (assignment.key.isInstanceOf[Attribute]) { + HoodieSqlUtils.isMetaField(assignment.key.asInstanceOf[Attribute].name) + } else { + false + } + } + withoutMetaAttrs.map { assignment => val resolvedKey = resolveExpressionFrom(target)(assignment.key) val resolvedValue = resolveExpressionFrom(resolvedSource, Some(target))(assignment.value) Assignment(resolvedKey, resolvedValue) - }) + } } (resolvedCondition, resolvedAssignments) } @@ -242,6 +251,10 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi case DeleteAction(condition) => val resolvedCondition = condition.map(resolveExpressionFrom(resolvedSource)(_)) DeleteAction(resolvedCondition) + case action: MergeAction => + // SPARK-34962: use UpdateStarAction as the explicit representation of * in UpdateAction. + // So match and convert this in Spark3.2 env. + UpdateAction(action.condition, Seq.empty) } // Resolve the notMatchedActions val resolvedNotMatchedActions = notMatchedActions.map { @@ -249,6 +262,10 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi val (resolvedCondition, resolvedAssignments) = resolveConditionAssignments(condition, assignments) InsertAction(resolvedCondition, resolvedAssignments) + case action: MergeAction => + // SPARK-34962: use InsertStarAction as the explicit representation of * in InsertAction. + // So match and convert this in Spark3.2 env. + InsertAction(action.condition, Seq.empty) } // Return the resolved MergeIntoTable MergeIntoTable(target, resolvedSource, resolvedMergeCondition, @@ -426,9 +443,11 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic case AlterTableChangeColumnCommand(tableName, columnName, newColumn) if isHoodieTable(tableName, sparkSession) => AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn) - case ShowPartitionsCommand(tableName, specOpt) - if isHoodieTable(tableName, sparkSession) => - ShowHoodieTablePartitionsCommand(tableName, specOpt) + // SPARK-34238: the definition of ShowPartitionsCommand has been changed in Spark3.2. + // Match the class type instead of calling the `unapply` method.
+ case s: ShowPartitionsCommand + if isHoodieTable(s.tableName, sparkSession) => + ShowHoodieTablePartitionsCommand(s.tableName, s.spec) // Rewrite TruncateTableCommand to TruncateHoodieTableCommand case TruncateTableCommand(tableName, partitionSpec) if isHoodieTable(tableName, sparkSession) => diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala index a2fa1829b852b..c6c08da1e69c3 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala @@ -31,7 +31,7 @@ import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} -import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.util.SchemaUtils @@ -44,7 +44,7 @@ import scala.util.control.NonFatal case class AlterHoodieTableAddColumnsCommand( tableId: TableIdentifier, colsToAdd: Seq[StructField]) - extends RunnableCommand { + extends HoodieLeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { if (colsToAdd.nonEmpty) { @@ -74,7 +74,7 @@ case class AlterHoodieTableAddColumnsCommand( } private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable, - newSqlSchema: StructType): Unit = { + newSqlDataSchema: StructType): Unit = { try { sparkSession.catalog.uncacheTable(tableId.quotedString) } catch { @@ -84,12 +84,11 @@ case class AlterHoodieTableAddColumnsCommand( sparkSession.catalog.refreshTable(table.identifier.unquotedString) SchemaUtils.checkColumnNameDuplication( - newSqlSchema.map(_.name), + newSqlDataSchema.map(_.name), "in the table definition of " + table.identifier, conf.caseSensitiveAnalysis) - DDLUtils.checkDataColNames(table, colsToAdd.map(_.name)) - sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlSchema) + sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlDataSchema) } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala index 7aa1d83833251..b69c686f99bf8 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala @@ -27,7 +27,7 @@ import org.apache.hudi.exception.HoodieException import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable -import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{StructField, StructType} import scala.util.control.NonFatal @@ -39,7 
+39,7 @@ case class AlterHoodieTableChangeColumnCommand( tableIdentifier: TableIdentifier, columnName: String, newColumn: StructField) - extends RunnableCommand { + extends HoodieLeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala index d14783556868a..21f16275d1321 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala @@ -25,11 +25,12 @@ import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter} + import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable -import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand} +import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hudi.HoodieSqlUtils._ import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} @@ -39,7 +40,7 @@ case class AlterHoodieTableDropPartitionCommand( ifExists : Boolean, purge : Boolean, retainData : Boolean) -extends RunnableCommand { +extends HoodieLeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}" diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala index e310da673ed41..be66584778b4e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala @@ -24,12 +24,12 @@ import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeli import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.{HoodieTimer, Option => HOption} import org.apache.hudi.exception.HoodieException + import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation +import org.apache.spark.sql.catalyst.plans.logical.{CompactionOperation, LogicalPlan} import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE} -import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.hudi.HoodieSqlUtils import org.apache.spark.sql.types.StringType @@ -38,7 +38,7 @@ import scala.collection.JavaConverters._ case class CompactionHoodiePathCommand(path: String, 
operation: CompactionOperation, instantTimestamp: Option[Long] = None) - extends RunnableCommand { + extends HoodieLeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val metaClient = HoodieTableMetaClient.builder().setBasePath(path) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala index 631504d514154..27fb6e779a250 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala @@ -21,13 +21,13 @@ import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE} -import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation import org.apache.spark.sql.types.StringType case class CompactionHoodieTableCommand(table: CatalogTable, operation: CompactionOperation, instantTimestamp: Option[Long]) - extends RunnableCommand { + extends HoodieLeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val basePath = getTableLocation(table, sparkSession) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala index ca7891c5d69ee..44c57239703f3 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala @@ -22,14 +22,14 @@ import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.HoodieTimeline import org.apache.hudi.common.util.CompactionUtils import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.types.{IntegerType, StringType} import scala.collection.JavaConverters.asScalaIteratorConverter case class CompactionShowHoodiePathCommand(path: String, limit: Int) - extends RunnableCommand { + extends HoodieLeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val metaClient = HoodieTableMetaClient.builder().setBasePath(path.toString) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala index 0702e6bc2449b..7502bf7623aad 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala @@ -20,12 +20,12 @@ package org.apache.spark.sql.hudi.command import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation import org.apache.spark.sql.types.{IntegerType, StringType} case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int) - extends RunnableCommand { + extends HoodieLeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val basePath = getTableLocation(table, sparkSession) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala index ce6237ec99344..f1a344fb832f5 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala @@ -41,6 +41,10 @@ case class CreateHoodieTableAsSelectCommand( mode: SaveMode, query: LogicalPlan) extends DataWritingCommand { + def withNewChildInternal(newChild: LogicalPlan): CreateHoodieTableAsSelectCommand = { + this + } + override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { assert(table.tableType != CatalogTableType.VIEW) assert(table.provider.isDefined) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala index bbbecca8236e9..2608f9383601d 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala @@ -28,7 +28,7 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.hive.HiveClientUtils import org.apache.spark.sql.hive.HiveExternalCatalog._ import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils} @@ -46,7 +46,7 @@ import scala.util.control.NonFatal * Command for create hoodie table. */ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean) - extends RunnableCommand with SparkAdapterSupport { + extends HoodieLeafRunnableCommand with SparkAdapterSupport { override def run(sparkSession: SparkSession): Seq[Row] = { val tableIsExists = sparkSession.sessionState.catalog.tableExists(table.identifier) @@ -198,7 +198,7 @@ object CreateHoodieTableCommand { val schemaJsonString = schema.json // Split the JSON string. 
val parts = schemaJsonString.grouped(threshold).toSeq - properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString) + properties.put(DATASOURCE_SCHEMA_PREFIX + "numParts", parts.size.toString) parts.zipWithIndex.foreach { case (part, index) => properties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala index 8475b631c586d..a77acf066a76d 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala @@ -25,12 +25,11 @@ import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable -import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable -import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan} import org.apache.spark.sql.hudi.HoodieSqlUtils._ import org.apache.spark.sql.types.StructType -case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends RunnableCommand +case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends HoodieLeafRunnableCommand with SparkAdapterSupport { private val table = deleteTable.table diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala index 4b4426e1622fd..5e4a264a06322 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala @@ -18,25 +18,25 @@ package org.apache.spark.sql.hudi.command import org.apache.hadoop.fs.Path -import org.apache.hudi.SparkAdapterSupport + import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.fs.FSUtils + import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable} -import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.hive.HiveClientUtils import org.apache.spark.sql.hudi.HoodieSqlUtils.isEnableHive import scala.util.control.NonFatal case class DropHoodieTableCommand( - tableIdentifier: TableIdentifier, - ifExists: Boolean, - isView: Boolean, - purge: Boolean) extends RunnableCommand - with SparkAdapterSupport { + tableIdentifier: TableIdentifier, + ifExists: Boolean, + isView: Boolean, + purge: Boolean) +extends HoodieLeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}" diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/HoodieLeafRunnableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/HoodieLeafRunnableCommand.scala new file mode 100644 index 
0000000000000..47e884e962d4b --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/HoodieLeafRunnableCommand.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.trees.HoodieLeafLike +import org.apache.spark.sql.execution.command.RunnableCommand + +/** + * Similar to `LeafRunnableCommand` in Spark3.2. By mixing in `HoodieLeafLike`, + * `HoodieLeafRunnableCommand` spares subclasses of `RunnableCommand` from having to + * override the `withNewChildrenInternal` method repeatedly. + */ +trait HoodieLeafRunnableCommand extends RunnableCommand with HoodieLeafLike[LogicalPlan] diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 4b6d8e06f7489..61bc5777178c6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -36,7 +36,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} -import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hudi.HoodieSqlUtils._ import org.apache.spark.sql.internal.SQLConf @@ -54,7 +53,7 @@ case class InsertIntoHoodieTableCommand( query: LogicalPlan, partition: Map[String, Option[String]], overwrite: Boolean) - extends RunnableCommand { + extends HoodieLeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { assert(logicalRelation.catalogTable.isDefined, "Missing catalog table") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index 76c87158684f8..2d36c6c31fb3f 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.analysis.Resolver import
org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.hudi.HoodieSqlUtils._ import org.apache.spark.sql.hudi.command.payload.ExpressionPayload import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._ @@ -60,7 +59,7 @@ import java.util.Base64 * ExpressionPayload#getInsertValue. * */ -case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends RunnableCommand +case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends HoodieLeafRunnableCommand with SparkAdapterSupport { private var sparkSession: SparkSession = _ @@ -203,7 +202,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab sourceExpression match { case attr: AttributeReference if sourceColumnName.find(resolver(_, attr.name)).get.equals(targetColumnName) => true - case Cast(attr: AttributeReference, _, _) if sourceColumnName.find(resolver(_, attr.name)).get.equals(targetColumnName) => true + // SPARK-35857: the definition of Cast has been changed in Spark3.2. + // Match the class type instead of calling the `unapply` method. + case cast: Cast => + cast.child match { + case attr: AttributeReference if sourceColumnName.find(resolver(_, attr.name)).get.equals(targetColumnName) => true + case _ => false + } case _=> false } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala index d27ba6acd07a7..6a3eff8b4c356 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.types.StringType @@ -34,7 +34,7 @@ import org.apache.spark.sql.types.StringType case class ShowHoodieTablePartitionsCommand( tableIdentifier: TableIdentifier, specOpt: Option[TablePartitionSpec]) -extends RunnableCommand { +extends HoodieLeafRunnableCommand { override val output: Seq[Attribute] = { AttributeReference("partition", StringType, nullable = false)() :: Nil diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala index 7397b0dad942b..0ff7ffb45203a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala @@ -28,15 +28,14 @@ import
org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression} -import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable} -import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.catalyst.plans.logical.{Assignment, LogicalPlan, UpdateTable} import org.apache.spark.sql.hudi.HoodieSqlUtils._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StructField, StructType} import scala.collection.JavaConverters._ -case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCommand +case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends HoodieLeafRunnableCommand with SparkAdapterSupport { private val table = updateTable.table diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala index 3a4494c144adb..9b1b88d34ce18 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala @@ -19,6 +19,7 @@ package org.apache.hudi import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpressions import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpression + import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith} import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType} import org.junit.jupiter.api.Assertions.assertEquals @@ -68,22 +69,36 @@ class TestConvertFilterToCatalystExpression { } private def checkConvertFilter(filter: Filter, expectExpression: String): Unit = { + // [SPARK-25769][SPARK-34636][SPARK-34626][SQL] sql method in UnresolvedAttribute, + // AttributeReference and Alias don't quote qualified names properly + val removeQuotesIfNeed = if (expectExpression != null && HoodieSparkUtils.isSpark3_2) { + expectExpression.replace("`", "") + } else { + expectExpression + } val exp = convertToCatalystExpression(filter, tableSchema) - if (expectExpression == null) { + if (removeQuotesIfNeed == null) { assertEquals(exp.isEmpty, true) } else { assertEquals(exp.isDefined, true) - assertEquals(expectExpression, exp.get.sql) + assertEquals(removeQuotesIfNeed, exp.get.sql) } } private def checkConvertFilters(filters: Array[Filter], expectExpression: String): Unit = { + // [SPARK-25769][SPARK-34636][SPARK-34626][SQL] sql method in UnresolvedAttribute, + // AttributeReference and Alias don't quote qualified names properly + val removeQuotesIfNeed = if (expectExpression != null && HoodieSparkUtils.isSpark3_2) { + expectExpression.replace("`", "") + } else { + expectExpression + } val exp = convertToCatalystExpressions(filters, tableSchema) - if (expectExpression == null) { + if (removeQuotesIfNeed == null) { assertEquals(exp.isEmpty, true) } else { assertEquals(exp.isDefined, true) - assertEquals(expectExpression, exp.get.sql) + assertEquals(removeQuotesIfNeed, exp.get.sql) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index fa248e46bd721..35f4a61fe1bb2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -293,28 +293,26 @@ class TestHoodieSparkSqlWriter { */ @Test def testDisableAndEnableMetaFields(): Unit = { - try { - testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields = false) - //create a new table - val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4") - .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) - .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true") - .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name()) - .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true") + testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields = false) + //create a new table + val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4") + .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) + .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true") + .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name()) + .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true") - // generate the inserts - val schema = DataSourceTestUtils.getStructTypeExampleSchema - val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) - val inserts = DataSourceTestUtils.generateRandomRows(1000) - val df = spark.createDataFrame(sc.parallelize(inserts), structType) - try { - // write to Hudi - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df) - fail("Should have thrown exception") - } catch { - case e: HoodieException => assertTrue(e.getMessage.startsWith("Config conflict")) - case e: Exception => fail(e); - } + // generate the inserts + val schema = DataSourceTestUtils.getStructTypeExampleSchema + val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) + val inserts = DataSourceTestUtils.generateRandomRows(1000) + val df = spark.createDataFrame(sc.parallelize(inserts), structType) + try { + // write to Hudi + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df) + fail("Should have thrown exception") + } catch { + case e: HoodieException => assertTrue(e.getMessage.startsWith("Config conflict")) + case e: Exception => fail(e); } } @@ -711,51 +709,49 @@ class TestHoodieSparkSqlWriter { DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "", DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator", HoodieWriteConfig.TBL_NAME.key -> "hoodie_test") - try { - val df = spark.range(0, 1000).toDF("keyid") - .withColumn("col3", expr("keyid")) - .withColumn("age", lit(1)) - .withColumn("p", lit(2)) - - df.write.format("hudi") - .options(options) - .option(DataSourceWriteOptions.OPERATION.key, "insert") - .option("hoodie.insert.shuffle.parallelism", "4") - .mode(SaveMode.Overwrite).save(tempBasePath) - - df.write.format("hudi") - .options(options) - .option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table") - .option("hoodie.insert.shuffle.parallelism", "4") - .mode(SaveMode.Append).save(tempBasePath) - - val currentCommits = 
spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0)) - val incrementalKeyIdNum = spark.read.format("hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000") - .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommits(0)) - .load(tempBasePath).select("keyid").orderBy("keyid").count - assert(incrementalKeyIdNum == 1000) - - df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath) - spark.emptyDataFrame.write.format("hudi") - .options(options) - .option(HoodieBootstrapConfig.BASE_PATH.key, baseBootStrapPath) - .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getCanonicalName) - .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) - .option(HoodieBootstrapConfig.PARALLELISM_VALUE.key, "4") - .mode(SaveMode.Overwrite).save(tempBasePath) - df.write.format("hudi").options(options) - .option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table") - .option("hoodie.insert.shuffle.parallelism", "4").mode(SaveMode.Append).save(tempBasePath) - val currentCommitsBootstrap = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0)) - val incrementalKeyIdNumBootstrap = spark.read.format("hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000") - .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommitsBootstrap(0)) - .load(tempBasePath).select("keyid").orderBy("keyid").count - assert(incrementalKeyIdNumBootstrap == 1000) - } + val df = spark.range(0, 1000).toDF("keyid") + .withColumn("col3", expr("keyid")) + .withColumn("age", lit(1)) + .withColumn("p", lit(2)) + + df.write.format("hudi") + .options(options) + .option(DataSourceWriteOptions.OPERATION.key, "insert") + .option("hoodie.insert.shuffle.parallelism", "4") + .mode(SaveMode.Overwrite).save(tempBasePath) + + df.write.format("hudi") + .options(options) + .option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table") + .option("hoodie.insert.shuffle.parallelism", "4") + .mode(SaveMode.Append).save(tempBasePath) + + val currentCommits = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0)) + val incrementalKeyIdNum = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000") + .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommits(0)) + .load(tempBasePath).select("keyid").orderBy("keyid").count + assert(incrementalKeyIdNum == 1000) + + df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath) + spark.emptyDataFrame.write.format("hudi") + .options(options) + .option(HoodieBootstrapConfig.BASE_PATH.key, baseBootStrapPath) + .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getCanonicalName) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) + .option(HoodieBootstrapConfig.PARALLELISM_VALUE.key, "4") + .mode(SaveMode.Overwrite).save(tempBasePath) + df.write.format("hudi").options(options) + .option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table") + .option("hoodie.insert.shuffle.parallelism", 
"4").mode(SaveMode.Append).save(tempBasePath) + val currentCommitsBootstrap = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0)) + val incrementalKeyIdNumBootstrap = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000") + .option(DataSourceReadOptions.END_INSTANTTIME.key, currentCommitsBootstrap(0)) + .load(tempBasePath).select("keyid").orderBy("keyid").count + assert(incrementalKeyIdNumBootstrap == 1000) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala index a7abddaa0308f..a5b49cc3683d0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala @@ -61,14 +61,18 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { } override protected def test(testName: String, testTags: Tag*)(testFun: => Any /* Assertion */)(implicit pos: source.Position): Unit = { - try super.test(testName, testTags: _*)(try testFun finally { - val catalog = spark.sessionState.catalog - catalog.listDatabases().foreach{db => - catalog.listTables(db).foreach {table => - catalog.dropTable(table, true, true) + super.test(testName, testTags: _*)( + try { + testFun + } finally { + val catalog = spark.sessionState.catalog + catalog.listDatabases().foreach{db => + catalog.listTables(db).foreach {table => + catalog.dropTable(table, true, true) + } } } - }) + ) } protected def generateTableName: String = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala index 072a1257d4069..5041a543168bf 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.spark.sql.Row @@ -352,7 +353,7 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase { | when not matched and flag = '1' then insert * |""".stripMargin - if (HoodieSqlUtils.isSpark3) { + if (HoodieSparkUtils.isSpark3) { checkExceptionContain(mergeSql)("Columns aliases are not allowed in MERGE") } else { spark.sql(mergeSql) diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala index 5bf028408ccec..c3cbcc407587c 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala @@ -16,18 +16,26 @@ */ package org.apache.spark.sql.execution.datasources + import java.util.TimeZone import org.apache.hadoop.fs.Path -import 
org.apache.spark.sql.execution.datasources.PartitioningUtils.PartitionValues -import org.apache.spark.sql.types.DataType + +import org.apache.spark.sql.types._ +import org.apache.spark.sql.catalyst.InternalRow class Spark2ParsePartitionUtil extends SparkParsePartitionUtil { - override def parsePartition(path: Path, typeInference: Boolean, - basePaths: Set[Path], - userSpecifiedDataTypes: Map[String, DataType], - timeZone: TimeZone): Option[PartitionValues] = { - PartitioningUtils.parsePartition(path, typeInference, - basePaths, userSpecifiedDataTypes, timeZone)._1 + + override def parsePartition( + path: Path, + typeInference: Boolean, + basePaths: Set[Path], + userSpecifiedDataTypes: Map[String, DataType], + timeZone: TimeZone): InternalRow = { + val (partitionValues, _) = PartitioningUtils.parsePartition(path, typeInference, + basePaths, userSpecifiedDataTypes, timeZone) + + partitionValues.map(_.literals.map(_.value)).map(InternalRow.fromSeq) + .getOrElse(InternalRow.empty) } } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java index c7a70438fc3cd..236fbe933c85f 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java +++ b/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java @@ -17,20 +17,25 @@ package org.apache.hudi.spark3.internal; +import org.apache.hudi.HoodieSparkUtils; import org.apache.spark.sql.catalyst.plans.logical.InsertIntoStatement; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.catalyst.util.DateFormatter; + import scala.Option; import scala.collection.Seq; import scala.collection.immutable.Map; import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.time.ZoneId; public class ReflectUtil { - public static InsertIntoStatement createInsertInto(boolean isSpark30, LogicalPlan table, Map<String, Option<String>> partition, Seq<String> userSpecifiedCols, + public static InsertIntoStatement createInsertInto(LogicalPlan table, Map<String, Option<String>> partition, Seq<String> userSpecifiedCols, LogicalPlan query, boolean overwrite, boolean ifPartitionNotExists) { try { - if (isSpark30) { + if (HoodieSparkUtils.isSpark3_0()) { Constructor constructor = InsertIntoStatement.class.getConstructor( LogicalPlan.class, Map.class, LogicalPlan.class, boolean.class, boolean.class); return constructor.newInstance(table, partition, query, overwrite, ifPartitionNotExists); @@ -43,4 +48,23 @@ public static InsertIntoStatement createInsertInto(boolean isSpark30, LogicalPla throw new RuntimeException("Error in create InsertIntoStatement", e); } } + + public static DateFormatter getDateFormatter(ZoneId zoneId) { + try { + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + if (HoodieSparkUtils.isSpark3_2()) { + Class clazz = loader.loadClass(DateFormatter.class.getName()); + Method applyMethod = clazz.getDeclaredMethod("apply"); + applyMethod.setAccessible(true); + return (DateFormatter)applyMethod.invoke(null); + } else { + Class clazz = loader.loadClass(DateFormatter.class.getName()); + Method applyMethod = clazz.getDeclaredMethod("apply", ZoneId.class); + applyMethod.setAccessible(true); + return (DateFormatter)applyMethod.invoke(null, zoneId); + } + } catch (Exception e) { + throw new RuntimeException("Error in apply DateFormatter", e); + } + } } diff --git
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala index 87d80d0b42bf0..7e806f7407b2b 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.adapter import org.apache.hudi.Spark3RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.spark3.internal.ReflectUtil -import org.apache.spark.SPARK_VERSION + import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -79,7 +79,7 @@ class Spark3Adapter extends SparkAdapter { override def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]], query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = { - ReflectUtil.createInsertInto(SPARK_VERSION.startsWith("3.0"), table, partition, Seq.empty[String], query, overwrite, ifPartitionNotExists) + ReflectUtil.createInsertInto(table, partition, Seq.empty[String], query, overwrite, ifPartitionNotExists) } override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = { diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala index ea9cc788aff48..d993b980367fc 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala @@ -16,24 +16,259 @@ */ package org.apache.spark.sql.execution.datasources -import java.util.TimeZone + +import java.lang.{Double => JDouble, Long => JLong} +import java.math.{BigDecimal => JBigDecimal} +import java.time.ZoneId +import java.util.{Locale, TimeZone} import org.apache.hadoop.fs.Path + +import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH +import org.apache.hudi.spark3.internal.ReflectUtil + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName +import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} -import org.apache.spark.sql.execution.datasources.PartitioningUtils.{PartitionValues, timestampPartitionPattern} +import org.apache.spark.sql.execution.datasources.PartitioningUtils.timestampPartitionPattern import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.mutable.ArrayBuffer +import scala.util.Try +import scala.util.control.NonFatal class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil { - override def parsePartition(path: Path, typeInference: Boolean, - basePaths: Set[Path], userSpecifiedDataTypes: Map[String, DataType], - timeZone: TimeZone): Option[PartitionValues] = { - val dateFormatter = DateFormatter(timeZone.toZoneId) + /** + * The definition of 
PartitionValues has been changed by SPARK-34314 in Spark3.2. + * To solve the compatibility between 3.1 and 3.2, copy some codes from PartitioningUtils in Spark3.2 here. + * And this method will generate and return `InternalRow` directly instead of `PartitionValues`. + */ + override def parsePartition( + path: Path, + typeInference: Boolean, + basePaths: Set[Path], + userSpecifiedDataTypes: Map[String, DataType], + timeZone: TimeZone): InternalRow = { + val dateFormatter = ReflectUtil.getDateFormatter(timeZone.toZoneId) val timestampFormatter = TimestampFormatter(timestampPartitionPattern, timeZone.toZoneId, isParsing = true) - PartitioningUtils.parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, - conf.validatePartitionColumns, timeZone.toZoneId, dateFormatter, timestampFormatter)._1 + val (partitionValues, _) = parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, + conf.validatePartitionColumns, timeZone.toZoneId, dateFormatter, timestampFormatter) + + partitionValues.map { + case PartitionValues(columnNames: Seq[String], typedValues: Seq[TypedPartValue]) => + val rowValues = columnNames.zip(typedValues).map { case (columnName, typedValue) => + try { + castPartValueToDesiredType(typedValue.dataType, typedValue.value, timeZone.toZoneId) + } catch { + case NonFatal(_) => + if (conf.validatePartitionColumns) { + throw new RuntimeException(s"Failed to cast value `${typedValue.value}` to " + + s"`${typedValue.dataType}` for partition column `$columnName`") + } else null + } + } + InternalRow.fromSeq(rowValues) + }.getOrElse(InternalRow.empty) + } + + case class TypedPartValue(value: String, dataType: DataType) + + case class PartitionValues(columnNames: Seq[String], typedValues: Seq[TypedPartValue]) + { + require(columnNames.size == typedValues.size) } + + private def parsePartition( + path: Path, + typeInference: Boolean, + basePaths: Set[Path], + userSpecifiedDataTypes: Map[String, DataType], + validatePartitionColumns: Boolean, + zoneId: ZoneId, + dateFormatter: DateFormatter, + timestampFormatter: TimestampFormatter): (Option[PartitionValues], Option[Path]) = { + + val columns = ArrayBuffer.empty[(String, TypedPartValue)] + // Old Hadoop versions don't have `Path.isRoot` + var finished = path.getParent == null + // currentPath is the current path that we will use to parse partition column value. + var currentPath: Path = path + + while (!finished) { + // Sometimes (e.g., when speculative task is enabled), temporary directories may be left + // uncleaned. Here we simply ignore them. + if (currentPath.getName.toLowerCase(Locale.ROOT) == "_temporary") { + // scalastyle:off return + return (None, None) + // scalastyle:on return + } + + if (basePaths.contains(currentPath)) { + // If the currentPath is one of base paths. We should stop. + finished = true + } else { + // Let's say currentPath is a path of "/table/a=1/", currentPath.getName will give us a=1. + // Once we get the string, we try to parse it and find the partition column and value. + val maybeColumn = + parsePartitionColumn(currentPath.getName, typeInference, userSpecifiedDataTypes, + validatePartitionColumns, zoneId, dateFormatter, timestampFormatter) + maybeColumn.foreach(columns += _) + + // Now, we determine if we should stop. + // When we hit any of the following cases, we will stop: + // - In this iteration, we could not parse the value of partition column and value, + // i.e. maybeColumn is None, and columns is not empty. 
At here we check if columns is + // empty to handle cases like /table/a=1/_temporary/something (we need to find a=1 in + // this case). + // - After we get the new currentPath, this new currentPath represent the top level dir + // i.e. currentPath.getParent == null. For the example of "/table/a=1/", + // the top level dir is "/table". + finished = + (maybeColumn.isEmpty && !columns.isEmpty) || currentPath.getParent == null + + if (!finished) { + // For the above example, currentPath will be "/table/". + currentPath = currentPath.getParent + } + } + } + + if (columns.isEmpty) { + (None, Some(path)) + } else { + val (columnNames, values) = columns.reverse.unzip + (Some(PartitionValues(columnNames.toSeq, values.toSeq)), Some(currentPath)) + } + } + + private def parsePartitionColumn( + columnSpec: String, + typeInference: Boolean, + userSpecifiedDataTypes: Map[String, DataType], + validatePartitionColumns: Boolean, + zoneId: ZoneId, + dateFormatter: DateFormatter, + timestampFormatter: TimestampFormatter): Option[(String, TypedPartValue)] = { + val equalSignIndex = columnSpec.indexOf('=') + if (equalSignIndex == -1) { + None + } else { + val columnName = unescapePathName(columnSpec.take(equalSignIndex)) + assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'") + + val rawColumnValue = columnSpec.drop(equalSignIndex + 1) + assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'") + + val dataType = if (userSpecifiedDataTypes.contains(columnName)) { + // SPARK-26188: if user provides corresponding column schema, get the column value without + // inference, and then cast it as user specified data type. + userSpecifiedDataTypes(columnName) + } else { + inferPartitionColumnValue( + rawColumnValue, + typeInference, + zoneId, + dateFormatter, + timestampFormatter) + } + Some(columnName -> TypedPartValue(rawColumnValue, dataType)) + } + } + + private def inferPartitionColumnValue( + raw: String, + typeInference: Boolean, + zoneId: ZoneId, + dateFormatter: DateFormatter, + timestampFormatter: TimestampFormatter): DataType = { + val decimalTry = Try { + // `BigDecimal` conversion can fail when the `field` is not a form of number. + val bigDecimal = new JBigDecimal(raw) + // It reduces the cases for decimals by disallowing values having scale (e.g. `1.1`). + require(bigDecimal.scale <= 0) + // `DecimalType` conversion can fail when + // 1. The precision is bigger than 38. + // 2. scale is bigger than precision. + fromDecimal(Decimal(bigDecimal)) + } + + val dateTry = Try { + // try and parse the date, if no exception occurs this is a candidate to be resolved as + // DateType + dateFormatter.parse(raw) + // SPARK-23436: Casting the string to date may still return null if a bad Date is provided. + // This can happen since DateFormat.parse may not use the entire text of the given string: + // so if there are extra-characters after the date, it returns correctly. 
+ // We need to check that we can cast the raw string since we later can use Cast to get + // the partition values with the right DataType (see + // org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning) + val dateValue = Cast(Literal(raw), DateType, Some(zoneId.getId)).eval() + // Disallow DateType if the cast returned null + require(dateValue != null) + DateType + } + + val timestampTry = Try { + val unescapedRaw = unescapePathName(raw) + // the inferred data type is consistent with the default timestamp type + val timestampType = TimestampType + // try and parse the date, if no exception occurs this is a candidate to be resolved as TimestampType + timestampFormatter.parse(unescapedRaw) + + // SPARK-23436: see comment for date + val timestampValue = Cast(Literal(unescapedRaw), timestampType, Some(zoneId.getId)).eval() + // Disallow TimestampType if the cast returned null + require(timestampValue != null) + timestampType + } + + if (typeInference) { + // First tries integral types + Try({ Integer.parseInt(raw); IntegerType }) + .orElse(Try { JLong.parseLong(raw); LongType }) + .orElse(decimalTry) + // Then falls back to fractional types + .orElse(Try { JDouble.parseDouble(raw); DoubleType }) + // Then falls back to date/timestamp types + .orElse(timestampTry) + .orElse(dateTry) + // Then falls back to string + .getOrElse { + if (raw == DEFAULT_PARTITION_PATH) NullType else StringType + } + } else { + if (raw == DEFAULT_PARTITION_PATH) NullType else StringType + } + } + + def castPartValueToDesiredType( + desiredType: DataType, + value: String, + zoneId: ZoneId): Any = desiredType match { + case _ if value == DEFAULT_PARTITION_PATH => null + case NullType => null + case StringType => UTF8String.fromString(unescapePathName(value)) + case IntegerType => Integer.parseInt(value) + case LongType => JLong.parseLong(value) + case DoubleType => JDouble.parseDouble(value) + case _: DecimalType => Literal(new JBigDecimal(value)).value + case DateType => + Cast(Literal(value), DateType, Some(zoneId.getId)).eval() + // Timestamp types + case dt: TimestampType => + Try { + Cast(Literal(unescapePathName(value)), dt, Some(zoneId.getId)).eval() + }.getOrElse { + Cast(Cast(Literal(value), DateType, Some(zoneId.getId)), dt).eval() + } + case dt => throw new IllegalArgumentException(s"Unexpected type $dt") + } + + private def fromDecimal(d: Decimal): DecimalType = DecimalType(d.precision, d.scale) } diff --git a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java index f8b18289433a5..0d1867047847b 100644 --- a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java +++ b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestReflectUtil.java @@ -40,7 +40,6 @@ public void testDataSourceWriterExtraCommitMetadata() throws Exception { InsertIntoStatement statement = (InsertIntoStatement) spark.sessionState().sqlParser().parsePlan(insertIntoSql); InsertIntoStatement newStatment = ReflectUtil.createInsertInto( - spark.version().startsWith("3.0"), statement.table(), statement.partitionSpec(), scala.collection.immutable.List.empty(), diff --git a/pom.xml b/pom.xml index aa3ea59fabd0c..dc7e0957ac60c 100644 --- a/pom.xml +++ b/pom.xml @@ -118,7 +118,7 @@ ${spark2bundle.version} 1.13.1 2.4.4 - 3.1.2 + 3.2.0 3 hudi-spark2 @@ -1515,11 +1515,36 @@ + + spark3.1.x + + 
3.1.2 + ${spark3.version} + ${spark3bundle.version} + ${scala12.version} + 2.12 + hudi-spark3 + 3.1.0 + 2.4.1 + ${fasterxml.spark3.version} + ${fasterxml.spark3.version} + ${fasterxml.spark3.version} + ${fasterxml.spark3.version} + true + true + + + + spark3 + + + + spark3.0.x - 3.0.0 + 3.0.3 ${spark3.version} 3.0.1
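Reviewer note: the sketch below (not part of the patch) illustrates what the new partition-parsing entry point in Spark3ParsePartitionUtil produces, namely an InternalRow built directly from the partition path instead of the old PartitionValues. Only the class name and method signature come from this change; the table location, partition columns, and data types are hypothetical, and the example assumes the patched hudi-spark3 module and Spark 3.x are on the classpath.

```scala
import java.util.TimeZone

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.Spark3ParsePartitionUtil
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, IntegerType}

object ParsePartitionSketch {
  def main(args: Array[String]): Unit = {
    val util = new Spark3ParsePartitionUtil(SQLConf.get)

    // Hypothetical Hudi table at /tmp/hudi_table, partitioned by dt (date) and hh (int).
    val partitionPath = new Path("file:///tmp/hudi_table/dt=2021-10-01/hh=12")
    val basePath      = new Path("file:///tmp/hudi_table")

    // With typeInference disabled, the user-specified data types drive the cast
    // of each raw partition value (see castPartValueToDesiredType above).
    val row = util.parsePartition(
      partitionPath,
      typeInference = false,
      basePaths = Set(basePath),
      userSpecifiedDataTypes = Map("dt" -> DateType, "hh" -> IntegerType),
      timeZone = TimeZone.getTimeZone("UTC"))

    // row is an InternalRow holding the date as days-since-epoch and the hour as an Int.
    println(row)
  }
}
```

The column order in the returned row follows the directory hierarchy from the base path downward (dt first, then hh), since the parser walks up from the leaf directory and reverses the collected columns before building the row.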