diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 92f73d503938..31af71994d0b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -254,7 +254,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
       case action: MergeAction =>
         // SPARK-34962: use UpdateStarAction as the explicit representation of * in UpdateAction.
         // So match and covert this in Spark3.2 env.
-        UpdateAction(action.condition, Seq.empty)
+        val (resolvedCondition, resolvedAssignments) =
+          resolveConditionAssignments(action.condition, Seq.empty)
+        UpdateAction(resolvedCondition, resolvedAssignments)
     }
     // Resolve the notMatchedActions
     val resolvedNotMatchedActions = notMatchedActions.map {
@@ -265,7 +267,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
       case action: MergeAction =>
         // SPARK-34962: use InsertStarAction as the explicit representation of * in InsertAction.
         // So match and covert this in Spark3.2 env.
-        InsertAction(action.condition, Seq.empty)
+        val (resolvedCondition, resolvedAssignments) =
+          resolveConditionAssignments(action.condition, Seq.empty)
+        InsertAction(resolvedCondition, resolvedAssignments)
     }
     // Return the resolved MergeIntoTable
     MergeIntoTable(target, resolvedSource, resolvedMergeCondition,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 61bc5777178c..37d30c813588 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -21,7 +21,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}

 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
@@ -254,7 +254,7 @@ object InsertIntoHoodieTableCommand extends Logging {
       // on reading.
       classOf[ValidateDuplicateKeyPayload].getCanonicalName
     } else {
-      classOf[DefaultHoodieRecordPayload].getCanonicalName
+      classOf[OverwriteWithLatestAvroPayload].getCanonicalName
     }
     logInfo(s"insert statement use write operation type: $operation, payloadClass: $payloadClassName")
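The two files above change how the Hudi Spark SQL layer resolves MERGE INTO star actions under Spark 3.2 and which payload class plain INSERTs use. Below is a minimal Scala sketch of the statements that exercise these paths, assuming a Spark 3.2 session with the Hudi SQL extension enabled and an existing Hudi table h0(id INT, name STRING, price DOUBLE); the session setup, table, and values are illustrative and are not part of this patch.

import org.apache.spark.sql.SparkSession

// Illustrative session setup: the extension class and Kryo serializer are the standard
// Hudi Spark SQL configuration; everything else is an assumption for the example.
val spark = SparkSession.builder()
  .master("local[1]")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()

// Spark 3.2 parses "UPDATE SET *" / "INSERT *" as UpdateStarAction / InsertStarAction
// (SPARK-34962); the HoodieAnalysis hunks resolve their condition and assignments instead
// of passing empty assignments through.
spark.sql(
  """MERGE INTO h0 AS t
    |USING (SELECT 1 AS id, 'a1' AS name, 10.0 AS price) AS s
    |ON t.id = s.id
    |WHEN MATCHED THEN UPDATE SET *
    |WHEN NOT MATCHED THEN INSERT *""".stripMargin)

// A plain INSERT that does not hit the strict duplicate-key validation path now uses
// OverwriteWithLatestAvroPayload instead of DefaultHoodieRecordPayload in the write config.
spark.sql("INSERT INTO h0 VALUES (2, 'a2', 12.0)")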
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
index 2090185f3150..12ec22499db7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.execution.command.TruncateTableCommand

+import scala.util.control.NonFatal
+
 /**
  * Command for truncate hudi table.
  */
@@ -36,10 +38,16 @@ class TruncateHoodieTableCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
     val properties = hoodieCatalogTable.tableConfig.getProps
-    val tablePath = hoodieCatalogTable.tableLocation

-    // Delete all data in the table directory
-    super.run(sparkSession)
+    try {
+      // Delete all data in the table directory
+      super.run(sparkSession)
+    } catch {
+      // TruncateTableCommand deletes the table directories first and then refreshes the table.
+      // That refresh fails because the Hudi meta directory (.hoodie) was deleted in the first step.
+      // So ignore the failure here and refresh the table explicitly at the end of this command.
+      case NonFatal(_) =>
+    }

     // If we have not specified the partition, truncate will delete all the data in the table path
     // include the hoodi.properties. In this case we should reInit the table.
@@ -50,6 +58,10 @@ class TruncateHoodieTableCommand(
         .fromProperties(properties)
         .initTable(hadoopConf, hoodieCatalogTable.tableLocation)
     }
+
+    // After deleting the data, refresh the table to make sure we don't keep around a stale
+    // file relation in the metastore cache and cached table data in the cache manager.
+    sparkSession.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString)
     Seq.empty[Row]
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
index 19d8d0345c4d..868bfc43d57f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
@@ -154,7 +154,7 @@ class TestShowPartitions extends TestHoodieSqlBase {
       Seq("year=2021/month=02/day=default"),
       Seq("year=2021/month=02/day=01")
     )
-    checkAnswer(s"show partitions $tableName partition(day=01)")(
+    checkAnswer(s"show partitions $tableName partition(day='01')")(
       Seq("year=2021/month=02/day=01"),
       Seq("year=2021/month=default/day=01"),
       Seq("year=2021/month=01/day=01"),
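The TruncateHoodieTableCommand and TestShowPartitions hunks make TRUNCATE TABLE tolerate the refresh failure inside Spark's TruncateTableCommand, re-initialize the table metadata, and refresh the table explicitly, and they quote the string partition value in the partition spec. A minimal sketch of the resulting behavior, assuming the session from the previous sketch and an existing Hudi table p0 partitioned by a string column day; the table name and data are illustrative and are not part of this patch.

// TRUNCATE deletes the table directory (including .hoodie); the command then re-initializes
// the table from the saved properties and calls refreshTable, so the stale cached relation
// is dropped and a follow-up query sees an empty table instead of failing on missing files.
spark.sql("TRUNCATE TABLE p0")
assert(spark.sql("SELECT * FROM p0").count() == 0)

// String partition values need to be quoted in the partition spec, which is why the test
// now uses partition(day='01') rather than partition(day=01).
spark.sql("SHOW PARTITIONS p0 PARTITION (day='01')").show(false)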