4 changes: 4 additions & 0 deletions .github/workflows/bot.yml
@@ -24,6 +24,10 @@ jobs:
spark: "spark3,spark3.0.x"
- scala: "scala-2.12"
spark: "spark3,spark3.0.x,spark-shade-unbundle-avro"
- scala: "scala-2.12"
spark: "spark3,spark3.1.x"
- scala: "scala-2.12"
spark: "spark3,spark3.1.x,spark-shade-unbundle-avro"
- scala: "scala-2.12"
spark: "spark3"
- scala: "scala-2.12"
@@ -47,8 +47,14 @@ import scala.collection.JavaConverters.asScalaBufferConverter

object HoodieSparkUtils extends SparkAdapterSupport {

def isSpark2: Boolean = SPARK_VERSION.startsWith("2.")

def isSpark3: Boolean = SPARK_VERSION.startsWith("3.")

def isSpark3_0: Boolean = SPARK_VERSION.startsWith("3.0")

def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2")

def getMetaSchema: StructType = {
StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
StructField(col, StringType, nullable = true)
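A quick usage sketch for the predicates introduced above (the caller and the val name are illustrative, not part of the PR; the two format strings come from the HoodieSparkSqlWriter change later in this diff):

import org.apache.hudi.HoodieSparkUtils

// Pick the internal write format based on the running Spark version instead of
// parsing SPARK_VERSION inline at every call site.
val internalWriteFormat: String =
  if (HoodieSparkUtils.isSpark2) "org.apache.hudi.internal"
  else "org.apache.hudi.spark3.internal"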
@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.datasources
import java.util.TimeZone

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.PartitioningUtils.PartitionValues

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.DataType

trait SparkParsePartitionUtil extends Serializable {
@@ -30,5 +31,5 @@ trait SparkParsePartitionUtil extends Serializable {
typeInference: Boolean,
basePaths: Set[Path],
userSpecifiedDataTypes: Map[String, DataType],
timeZone: TimeZone): Option[PartitionValues]
timeZone: TimeZone): InternalRow
}
@@ -569,14 +569,10 @@ case class HoodieFileIndex(
}.mkString("/")
val pathWithPartitionName = new Path(basePath, partitionWithName)
val partitionDataTypes = partitionSchema.fields.map(f => f.name -> f.dataType).toMap
val partitionValues = sparkParsePartitionUtil.parsePartition(pathWithPartitionName,

sparkParsePartitionUtil.parsePartition(pathWithPartitionName,
typeInference = false, Set(new Path(basePath)), partitionDataTypes,
DateTimeUtils.getTimeZone(timeZoneId))

// Convert partitionValues to InternalRow
partitionValues.map(_.literals.map(_.value))
.map(InternalRow.fromSeq)
.getOrElse(InternalRow.empty)
}
}
PartitionRowPath(partitionRow, partitionPath)
@@ -48,7 +48,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.apache.spark.SparkContext

import java.util.Properties

import scala.collection.JavaConversions._
@@ -463,13 +464,13 @@ object HoodieSparkSqlWriter {
} else {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(df)
}
if (SPARK_VERSION.startsWith("2.")) {
if (HoodieSparkUtils.isSpark2) {
hoodieDF.write.format("org.apache.hudi.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
.options(params)
.mode(SaveMode.Append)
.save()
} else if (SPARK_VERSION.startsWith("3.")) {
} else if (HoodieSparkUtils.isSpark3) {
hoodieDF.write.format("org.apache.hudi.spark3.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
.option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)
@@ -18,18 +18,30 @@
package org.apache.spark.sql.avro

import org.apache.avro.Schema

import org.apache.hudi.HoodieSparkUtils

import org.apache.spark.sql.types.DataType

/**
* This exists to stay compatible with the return type of AvroDeserializer in Spark 3.1
* as well as in other Spark versions.
*/
case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
extends AvroDeserializer(rootAvroType, rootCatalystType) {
case class HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
Review comment (Contributor Author): SPARK-34404

private val avroDeserializer = if (HoodieSparkUtils.isSpark3_2) {
// SPARK-34404: as of Spark 3.2, AvroDeserializer no longer has a constructor taking only Schema and DataType arguments,
// so use reflection to obtain an AvroDeserializer instance.
val constructor = classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType], classOf[String])
constructor.newInstance(rootAvroType, rootCatalystType, "EXCEPTION")
} else {
val constructor = classOf[AvroDeserializer].getConstructor(classOf[Schema], classOf[DataType])
constructor.newInstance(rootAvroType, rootCatalystType)
}

def deserializeData(data: Any): Any = {
super.deserialize(data) match {
case Some(r) => r // spark 3.1 return type is Option, we fetch the data.
avroDeserializer.deserialize(data) match {
case Some(r) => r // As of Spark 3.1, the result comes back wrapped in an Option, so we unwrap the data.
case o => o // For other Spark versions, return the data directly.
}
}
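A minimal usage sketch for the reworked deserializer above, assuming the avro and spark-avro artifacts are on the classpath; the schema and record here are illustrative and not taken from the PR:

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, SchemaConverters}

// Build a toy Avro schema and record, then convert through the wrapper.
val avroSchema = SchemaBuilder.record("example").fields()
  .requiredString("name")
  .requiredInt("age")
  .endRecord()
val catalystType = SchemaConverters.toSqlType(avroSchema).dataType

val record = new GenericRecordBuilder(avroSchema)
  .set("name", "alice")
  .set("age", 30)
  .build()

// deserializeData hides both the constructor difference and the Option wrapping
// introduced in Spark 3.1, so the caller gets the converted row on any supported
// Spark version.
val row = HoodieAvroDeserializer(avroSchema, catalystType).deserializeData(record)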
@@ -22,17 +22,37 @@ import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.Compactio
case class CompactionTable(table: LogicalPlan, operation: CompactionOperation, instantTimestamp: Option[Long])
extends Command {
override def children: Seq[LogicalPlan] = Seq(table)

def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionTable = {
Review comment (Contributor Author): SPARK-34989
Reply (Contributor): Can we link the Spark JIRA in the codebase to help users understand the background?

copy(table = newChildren.head)
}
}

case class CompactionPath(path: String, operation: CompactionOperation, instantTimestamp: Option[Long])
extends Command
extends Command {
override def children: Seq[LogicalPlan] = Seq.empty

def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionPath = {
this
}
}

case class CompactionShowOnTable(table: LogicalPlan, limit: Int = 20)
extends Command {
override def children: Seq[LogicalPlan] = Seq(table)

def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionShowOnTable = {
copy(table = newChildren.head)
Review comment (Contributor): Would you please clarify why copy is used here while the others just return this?
Reply (Contributor Author): The other classes extending RunnableCommand, CompactionShowOnPath and CompactionPath, have no children, so their withNewChildrenInternal just returns the original object. In Spark 3.2, every class that extends RunnableCommand also extends LeafRunnableCommand (except the view-related ones), and LeafRunnableCommand overrides withNewChildrenInternal to return this. (See the sketch after this file's diff.)
Reply (Member): @YannByron thanks for the explanation; looks like it's worth noting down as a block comment there.

}
}

case class CompactionShowOnPath(path: String, limit: Int = 20) extends Command
case class CompactionShowOnPath(path: String, limit: Int = 20) extends Command {
override def children: Seq[LogicalPlan] = Seq.empty

def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): CompactionShowOnPath = {
this
}
}

object CompactionOperation extends Enumeration {
type CompactionOperation = Value
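As referenced in the review thread above, here is a minimal illustration (not part of the PR) of the copy-versus-this question: a node with children must be rewired to the possibly rewritten children, while a leaf node has nothing to replace.

import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

// A node with one child rebuilds itself via copy so it picks up the new child.
case class NodeWithChild(child: LogicalPlan) extends Command {
  override def children: Seq[LogicalPlan] = Seq(child)
  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): NodeWithChild =
    copy(child = newChildren.head)
}

// A leaf node can safely return this, which is what LeafRunnableCommand does in Spark 3.2.
case class LeafExample() extends Command {
  override def children: Seq[LogicalPlan] = Seq.empty
  def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): LeafExample = this
}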
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.trees

/**
* Similar to `LeafLike` in Spark3.2.
*/
trait HoodieLeafLike[T <: TreeNode[T]] { self: TreeNode[T] =>

override final def children: Seq[T] = Nil

override final def mapChildren(f: T => T): T = this.asInstanceOf[T]

final def withNewChildrenInternal(newChildren: IndexedSeq[T]): T = this.asInstanceOf[T]
}
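The command classes later in this diff switch their parent from RunnableCommand to a HoodieLeafRunnableCommand that is not shown here; presumably it simply combines RunnableCommand with the trait above, roughly as sketched below (EchoCommand is purely illustrative):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.HoodieLeafLike
import org.apache.spark.sql.execution.command.RunnableCommand

// A leaf RunnableCommand that compiles on both Spark 3.1 and 3.2 because
// children, mapChildren and withNewChildrenInternal all come from HoodieLeafLike.
trait HoodieLeafRunnableCommand extends RunnableCommand with HoodieLeafLike[LogicalPlan]

// Illustrative command showing how the trait is mixed in.
case class EchoCommand(message: String) extends HoodieLeafRunnableCommand {
  override def run(sparkSession: SparkSession): Seq[Row] = {
    // A real command would do its work here; this example just records the
    // message as the Spark job description.
    sparkSession.sparkContext.setJobDescription(message)
    Seq.empty
  }
}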
@@ -31,7 +31,7 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
import org.apache.spark.SPARK_VERSION

import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -282,8 +282,6 @@ object HoodieSqlUtils extends SparkAdapterSupport {
.filterKeys(_.startsWith("hoodie."))
}

def isSpark3: Boolean = SPARK_VERSION.startsWith("3.")

def isEnableHive(sparkSession: SparkSession): Boolean =
"hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)

@@ -17,12 +17,13 @@

package org.apache.spark.sql.hudi.analysis

import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient

import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
@@ -137,7 +138,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
// We can do this because under the normal case, we should not allow to update or set
// the hoodie's meta field in sql statement, it is a system field, cannot set the value
// by user.
if (HoodieSqlUtils.isSpark3) {
if (HoodieSparkUtils.isSpark3) {
val assignmentFieldNames = assignments.map(_.key).map {
case attr: AttributeReference =>
attr.name
@@ -178,11 +179,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
.map { case (targetAttr, sourceAttr) => Assignment(targetAttr, sourceAttr) }
}
} else {
assignments.map(assignment => {
// For Spark3.2, InsertStarAction/UpdateStarAction's assignments will contain the meta fields.
Review comment (Contributor, @leesf, Dec 11, 2021): So the meta fields here are hudi meta fields?
Reply (Contributor Author): Yep.

val withoutMetaAttrs = assignments.filterNot{ assignment =>
if (assignment.key.isInstanceOf[Attribute]) {
HoodieSqlUtils.isMetaField(assignment.key.asInstanceOf[Attribute].name)
} else {
false
}
}
withoutMetaAttrs.map { assignment =>
val resolvedKey = resolveExpressionFrom(target)(assignment.key)
val resolvedValue = resolveExpressionFrom(resolvedSource, Some(target))(assignment.value)
Assignment(resolvedKey, resolvedValue)
})
}
}
(resolvedCondition, resolvedAssignments)
}
@@ -242,13 +251,21 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
case DeleteAction(condition) =>
val resolvedCondition = condition.map(resolveExpressionFrom(resolvedSource)(_))
DeleteAction(resolvedCondition)
case action: MergeAction =>
Review comment (Contributor Author): SPARK-34962
Reply (Contributor): Ditto, link the Spark JIRA.

// SPARK-34962: UpdateStarAction is used as the explicit representation of * in UpdateAction,
// so match and convert it here for the Spark 3.2 environment.
UpdateAction(action.condition, Seq.empty)
Review comment (Contributor): The UpdateAction here is not aligned with the description?
Reply (Contributor Author): action is an instance of UpdateStarAction; here we convert UpdateStarAction to UpdateAction.

}
// Resolve the notMatchedActions
val resolvedNotMatchedActions = notMatchedActions.map {
case InsertAction(condition, assignments) =>
val (resolvedCondition, resolvedAssignments) =
resolveConditionAssignments(condition, assignments)
InsertAction(resolvedCondition, resolvedAssignments)
case action: MergeAction =>
// SPARK-34962: InsertStarAction is used as the explicit representation of * in InsertAction,
// so match and convert it here for the Spark 3.2 environment.
InsertAction(action.condition, Seq.empty)
}
// Return the resolved MergeIntoTable
MergeIntoTable(target, resolvedSource, resolvedMergeCondition,
Expand Down Expand Up @@ -426,9 +443,11 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
case AlterTableChangeColumnCommand(tableName, columnName, newColumn)
if isHoodieTable(tableName, sparkSession) =>
AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn)
case ShowPartitionsCommand(tableName, specOpt)
if isHoodieTable(tableName, sparkSession) =>
ShowHoodieTablePartitionsCommand(tableName, specOpt)
// SPARK-34238: the definition of ShowPartitionsCommand has been changed in Spark 3.2,
// so match on the class type instead of calling the `unapply` method.
case s: ShowPartitionsCommand
Review comment (Contributor Author): SPARK-34238

if isHoodieTable(s.tableName, sparkSession) =>
ShowHoodieTablePartitionsCommand(s.tableName, s.spec)
// Rewrite TruncateTableCommand to TruncateHoodieTableCommand
case TruncateTableCommand(tableName, partitionSpec)
if isHoodieTable(tableName, sparkSession) =>
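A small sketch of the SPARK-34238 point above: matching on the class type and reading fields by name keeps the rule compiling when Spark 3.2 adds a parameter to ShowPartitionsCommand, whereas a constructor pattern breaks with the arity change. The Hudi command and the isHoodieTable helper are taken from the diff above; their import paths here are assumed.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.ShowPartitionsCommand
import org.apache.spark.sql.hudi.HoodieSqlUtils.isHoodieTable
import org.apache.spark.sql.hudi.command.ShowHoodieTablePartitionsCommand

def rewriteShowPartitions(plan: LogicalPlan, spark: SparkSession): LogicalPlan =
  plan match {
    // Fields are accessed by name, so the extra constructor parameter added in
    // Spark 3.2 does not affect this case.
    case s: ShowPartitionsCommand if isHoodieTable(s.tableName, spark) =>
      ShowHoodieTablePartitionsCommand(s.tableName, s.spec)
    // A constructor pattern like `case ShowPartitionsCommand(tableName, specOpt) => ...`
    // would stop compiling once the case class gains a field.
    case other => other
  }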
@@ -31,7 +31,7 @@ import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils

@@ -44,7 +44,7 @@ import scala.util.control.NonFatal
case class AlterHoodieTableAddColumnsCommand(
tableId: TableIdentifier,
colsToAdd: Seq[StructField])
extends RunnableCommand {
extends HoodieLeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
if (colsToAdd.nonEmpty) {
@@ -74,7 +74,7 @@ case class AlterHoodieTableAddColumnsCommand(
}

private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable,
newSqlSchema: StructType): Unit = {
newSqlDataSchema: StructType): Unit = {
try {
sparkSession.catalog.uncacheTable(tableId.quotedString)
} catch {
@@ -84,12 +84,11 @@ case class AlterHoodieTableAddColumnsCommand(
sparkSession.catalog.refreshTable(table.identifier.unquotedString)

SchemaUtils.checkColumnNameDuplication(
newSqlSchema.map(_.name),
newSqlDataSchema.map(_.name),
"in the table definition of " + table.identifier,
conf.caseSensitiveAnalysis)
DDLUtils.checkDataColNames(table, colsToAdd.map(_.name))
Review comment (Contributor Author, @YannByron, Dec 11, 2021): The definition of checkDataColNames has been changed by SPARK-36201, and this method is meaningless for a hudi table.


sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlSchema)
sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlDataSchema)
}
}

@@ -27,7 +27,7 @@ import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{StructField, StructType}

import scala.util.control.NonFatal
@@ -39,7 +39,7 @@ case class AlterHoodieTableChangeColumnCommand(
tableIdentifier: TableIdentifier,
columnName: String,
newColumn: StructField)
extends RunnableCommand {
extends HoodieLeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
@@ -25,11 +25,12 @@ import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}

@@ -39,7 +40,7 @@ case class AlterHoodieTableDropPartitionCommand(
ifExists : Boolean,
purge : Boolean,
retainData : Boolean)
extends RunnableCommand {
extends HoodieLeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"