@@ -254,7 +254,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
case action: MergeAction =>
// SPARK-34962: use UpdateStarAction as the explicit representation of * in UpdateAction.
// So match and convert this in Spark 3.2 env.
UpdateAction(action.condition, Seq.empty)
val (resolvedCondition, resolvedAssignments) =
resolveConditionAssignments(action.condition, Seq.empty)
UpdateAction(resolvedCondition, resolvedAssignments)
}
// Resolve the notMatchedActions
val resolvedNotMatchedActions = notMatchedActions.map {
Expand All @@ -265,7 +267,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
case action: MergeAction =>
// SPARK-34962: use InsertStarAction as the explicit representation of * in InsertAction.
// So match and convert this in Spark 3.2 env.
InsertAction(action.condition, Seq.empty)
val (resolvedCondition, resolvedAssignments) =
resolveConditionAssignments(action.condition, Seq.empty)
InsertAction(resolvedCondition, resolvedAssignments)
}
// Return the resolved MergeIntoTable
MergeIntoTable(target, resolvedSource, resolvedMergeCondition,
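
For illustration, a minimal sketch (assuming an active SparkSession `spark` with Hudi's SQL extensions; table names are made up) of a MERGE statement whose star branches are parsed into UpdateStarAction / InsertStarAction on Spark 3.2 and then rewritten by the rule above:

// Hypothetical tables; `update set *` and `insert *` take the star-action paths above.
spark.sql(
  """merge into hudi_target t
    |using source_updates s
    |on t.id = s.id
    |when matched then update set *
    |when not matched then insert *
    |""".stripMargin)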
@@ -21,7 +21,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
@@ -254,7 +254,7 @@ object InsertIntoHoodieTableCommand extends Logging {
// on reading.
classOf[ValidateDuplicateKeyPayload].getCanonicalName
} else {
classOf[DefaultHoodieRecordPayload].getCanonicalName
classOf[OverwriteWithLatestAvroPayload].getCanonicalName
Contributor: why is this change needed?

Contributor Author: Same as #4169

}
logInfo(s"insert statement use write operation type: $operation, payloadClass: $payloadClassName")

@@ -25,6 +25,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.TruncateTableCommand

import scala.util.control.NonFatal

/**
* Command for truncate hudi table.
*/
@@ -36,10 +38,16 @@ class TruncateHoodieTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
val properties = hoodieCatalogTable.tableConfig.getProps
val tablePath = hoodieCatalogTable.tableLocation

// Delete all data in the table directory
super.run(sparkSession)
try {
// Delete all data in the table directory
super.run(sparkSession)
Contributor: will this throw an exception and cause a failure?

Contributor Author: Following the existing flow, this calls Spark's TruncateTableCommand first and then rebuilds the Hudi table. Inside TruncateTableCommand, Spark deletes the related directories first and then refreshes the table; the refresh fails because the .hoodie directory is already gone and the Hudi relation can no longer be resolved. So the exception is caught and ignored here.

The related error is the following:

Cause: org.apache.hudi.exception.TableNotFoundException: Hoodie table not found in path Unable to find a hudi table for the user provided paths.
  at org.apache.hudi.DataSourceUtils.getTablePath(DataSourceUtils.java:85)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:103)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
  at org.apache.spark.sql.execution.datasources.FindDataSourceTable.$anonfun$readDataSourceTable$1(DataSourceStrategy.scala:252)

Member (@xushiyan, Jan 2, 2022): @YannByron looks like this deserves a log statement or comment in the catch block to help explain the try/catch scenario?

Contributor Author: done~

} catch {
// TruncateTableCommand will delete the related directories first, and then refresh the table.
// It will fail when refreshing the table, because the Hudi meta directory (.hoodie) was deleted in the first step.
// So ignore this failure here, and refresh the table later.
case NonFatal(_) =>
}

// If we have not specified the partition, truncate will delete all the data in the table path,
// including the hoodie.properties. In this case we should re-init the table.
@@ -50,6 +58,10 @@
.fromProperties(properties)
.initTable(hadoopConf, hoodieCatalogTable.tableLocation)
}

// After deleting the data, refresh the table to make sure we don't keep around a stale
// file relation in the metastore cache and cached table data in the cache manager.
sparkSession.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString)
Seq.empty[Row]
}
}
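
As a usage sketch of the behavior this change enables (assuming a SparkSession configured with Hudi's SQL extensions; the table name and schema are made up), a truncated Hudi table should remain readable afterwards instead of failing with TableNotFoundException:

spark.sql(
  """create table trunc_demo (id int, name string, ts long) using hudi
    |tblproperties (primaryKey = 'id', preCombineField = 'ts')""".stripMargin)
spark.sql("insert into trunc_demo values (1, 'a', 1000)")
spark.sql("truncate table trunc_demo")
// Expected to return 0 rather than fail to resolve the Hudi relation.
spark.sql("select count(*) from trunc_demo").show()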
@@ -154,7 +154,7 @@ class TestShowPartitions extends TestHoodieSqlBase {
Seq("year=2021/month=02/day=default"),
Seq("year=2021/month=02/day=01")
)
checkAnswer(s"show partitions $tableName partition(day=01)")(
checkAnswer(s"show partitions $tableName partition(day='01')")(
Contributor: here the partition value must be in string format?

Contributor Author: It's related to the original type of the field. In this case, day is a string type.

Seq("year=2021/month=02/day=01"),
Seq("year=2021/month=default/day=01"),
Seq("year=2021/month=01/day=01"),