diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index fcba6e310dc97..db2b93eda08c5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -95,7 +95,8 @@ trait ProvidesHoodieConfig extends Logging { */ def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable, sparkSession: SparkSession, - isOverwrite: Boolean, + isOverwritePartition: Boolean, + isOverwriteTable: Boolean, insertPartitions: Map[String, Option[String]] = Map.empty, extraOptions: Map[String, String]): Map[String, String] = { @@ -139,24 +140,24 @@ trait ProvidesHoodieConfig extends Logging { val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty val hasPrecombineColumn = hoodieCatalogTable.preCombineKey.nonEmpty val operation = - (enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match { - case (true, _, _, false, _) => + (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, isNonStrictMode, isPartitionedTable) match { + case (true, _, _, _, false, _) => throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.") - case (true, true, _, _, true) => + case (true, true, _, _, _, true) => throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.") - case (true, _, true, _, _) => + case (true, _, _, true, _, _) => throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." + s" Please disable $INSERT_DROP_DUPS and try again.") // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table. 
- case (true, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL + case (true, false, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL // insert overwrite table - case (false, true, _, _, false) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL + case (false, false, true, _, _, _) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL // insert overwrite partition - case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL + case (_, true, false, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL // disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode. - case (false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL + case (false, false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode. - case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL + case (true, _, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL // for the rest case, use the insert operation case _ => INSERT_OPERATION_OPT_VAL } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 0228e5ddcf7c3..2e4c1db099e38 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -86,16 +86,22 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi refreshTable: Boolean = true, extraOptions: Map[String, String] = Map.empty): Boolean = { val catalogTable = new HoodieCatalogTable(sparkSession, table) - val config = buildHoodieInsertConfig(catalogTable, sparkSession, overwrite, partitionSpec, extraOptions) - // NOTE: In 
case of partitioned table we override specified "overwrite" parameter - // to instead append to the dataset - val mode = if (overwrite && catalogTable.partitionFields.isEmpty) { - SaveMode.Overwrite + var mode = SaveMode.Append + var isOverWriteTable = false + var isOverWritePartition = false + if (overwrite && catalogTable.partitionFields.isEmpty) { + // insert overwrite non-partition table + mode = SaveMode.Overwrite + isOverWriteTable = true } else { - SaveMode.Append + // for insert into or insert overwrite partition we use append mode. + mode = SaveMode.Append + isOverWritePartition = overwrite } + val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions) + val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf) val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, Dataset.ofRows(sparkSession, alignedQuery)) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 73aaabc0d8f8e..b6444b52b521d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} @@ -443,6 +444,109 @@ class TestInsertTable extends HoodieSparkSqlTestBase { }) } + test("Test Insert Overwrite Table for V2 Table") { + withSQLConf("hoodie.schema.on.read.enable" -> "true") { + withRecordType()(withTempDir { tmp => + if 
(HoodieSparkUtils.gteqSpark3_2) { + val tableName = generateTableName + // Create a partitioned table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | dt string + |) using hudi + | tblproperties (primaryKey = 'id', preCombineField='dt') + | partitioned by (dt) + | location '${tmp.getCanonicalPath}/$tableName' + """.stripMargin) + + // Test insert overwrite table + spark.sql( + s""" + | insert overwrite table $tableName + | values(1, 'a1', 10.0, 1000, '2021-01-05') + """.stripMargin) + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1", 10.0, 1000, "2021-01-05") + ) + + // Insert overwrite table + spark.sql( + s""" + | insert overwrite table $tableName + | values (2, 'a2', 10.0, 1000, '2021-01-06') + """.stripMargin) + checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")( + Seq(2, "a2", 10.0, 1000, "2021-01-06") + ) + + // Insert overwrite static partition + spark.sql( + s""" + | insert overwrite table $tableName partition(dt = '2021-01-05') + | select * from (select 2 , 'a2', 12.0, 1000) limit 10 + """.stripMargin) + checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")( + Seq(2, "a2", 12.0, 1000, "2021-01-05"), + Seq(2, "a2", 10.0, 1000, "2021-01-06") + ) + + // Insert data from another table + val tblNonPartition = generateTableName + spark.sql( + s""" + | create table $tblNonPartition ( + | id int, + | name string, + | price double, + | ts long + | ) using hudi + | tblproperties (primaryKey = 'id') + | location '${tmp.getCanonicalPath}/$tblNonPartition' + """.stripMargin) + spark.sql(s"insert into $tblNonPartition select 1, 'a1', 10.0, 1000") + spark.sql( + s""" + | insert overwrite table $tableName partition(dt ='2021-01-04') + | select * from $tblNonPartition limit 10 + """.stripMargin) + checkAnswer(s"select id, name, price, ts, dt from $tableName order by id,dt")( + Seq(1, "a1", 10.0, 1000, "2021-01-04"), + Seq(2, "a2", 12.0, 
1000, "2021-01-05"), + Seq(2, "a2", 10.0, 1000, "2021-01-06") + ) + + // Insert overwrite partitioned table, all partitions will be truncated + spark.sql( + s""" + | insert overwrite table $tableName + | select id + 2, name, price, ts , '2021-01-04' from $tblNonPartition limit 10 + """.stripMargin) + checkAnswer(s"select id, name, price, ts, dt from $tableName " + + s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")( + Seq(3, "a1", 10.0, 1000, "2021-01-04") + ) + + // Test insert overwrite non-partitioned table + spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 1000") + checkAnswer(s"select id, name, price, ts from $tblNonPartition")( + Seq(2, "a2", 10.0, 1000) + ) + + spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 2000") + checkAnswer(s"select id, name, price, ts from $tblNonPartition")( + Seq(2, "a2", 10.0, 2000) + ) + } + }) + } + } + + test("Test Different Type of Partition Column") { withRecordType()(withTempDir { tmp => val typeAndValue = Seq( diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala index eeef56d3cff74..6d3610db21eda 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala @@ -293,13 +293,13 @@ class HoodieCatalog extends DelegatingCatalogExtension DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(), DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true" ) - saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false, Map.empty, options)) + saveSourceDF(sourceQuery, tableDesc.properties ++ 
buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwritePartition = false, isOverwriteTable = false, Map.empty, options)) CreateHoodieTableCommand.createTableInCatalog(spark, hoodieCatalogTable, ignoreIfExists = false) } else if (sourceQuery.isEmpty) { saveSourceDF(sourceQuery, tableDesc.properties) new CreateHoodieTableCommand(tableDesc, false).run(spark) } else { - saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false, Map.empty, Map.empty)) + saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwritePartition = false, isOverwriteTable = false, Map.empty, Map.empty)) new CreateHoodieTableCommand(tableDesc, false).run(spark) } diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala index 9968095f3a5d3..b41c7456b7125 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala @@ -89,15 +89,16 @@ private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap, spark: SparkSession) extends SupportsTruncate with SupportsOverwrite with ProvidesHoodieConfig { - private var forceOverwrite = false + private var overwriteTable = false + private var overwritePartition = false override def truncate(): HoodieV1WriteBuilder = { - forceOverwrite = true + overwriteTable = true this } override def overwrite(filters: Array[Filter]): WriteBuilder = { - forceOverwrite = true + overwritePartition = true this } @@ -105,17 +106,10 @@ private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap, override def toInsertableRelation:
InsertableRelation = { new InsertableRelation { override def insert(data: DataFrame, overwrite: Boolean): Unit = { - val mode = if (forceOverwrite && hoodieCatalogTable.partitionFields.isEmpty) { - // insert overwrite non-partition table - SaveMode.Overwrite - } else { - // for insert into or insert overwrite partition we use append mode. - SaveMode.Append - } alignOutputColumns(data).write.format("org.apache.hudi") - .mode(mode) + .mode(SaveMode.Append) .options(buildHoodieConfig(hoodieCatalogTable) ++ - buildHoodieInsertConfig(hoodieCatalogTable, spark, forceOverwrite, Map.empty, Map.empty)) + buildHoodieInsertConfig(hoodieCatalogTable, spark, overwritePartition, overwriteTable, Map.empty, Map.empty)) .save() } }