Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ trait ProvidesHoodieConfig extends Logging {
*/
def buildHoodieInsertConfig(hoodieCatalogTable: HoodieCatalogTable,
sparkSession: SparkSession,
isOverwrite: Boolean,
isOverwritePartition: Boolean,
isOverwriteTable: Boolean,
insertPartitions: Map[String, Option[String]] = Map.empty,
extraOptions: Map[String, String]): Map[String, String] = {

Expand Down Expand Up @@ -139,24 +140,24 @@ trait ProvidesHoodieConfig extends Logging {
val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
val hasPrecombineColumn = hoodieCatalogTable.preCombineKey.nonEmpty
val operation =
(enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
case (true, _, _, false, _) =>
(enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
case (true, _, _, _, false, _) =>
throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
case (true, true, _, _, true) =>
case (true, true, _, _, _, true) =>
throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
case (true, _, true, _, _) =>
case (true, _, _, true, _, _) =>
throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
s" Please disable $INSERT_DROP_DUPS and try again.")
// if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
case (true, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
case (true, false, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
// insert overwrite table
case (false, true, _, _, false) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
case (false, false, true, _, _, _) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
// insert overwrite partition
case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
case (_, true, false, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
// disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode.
case (false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
case (false, false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
// if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
case (true, _, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
// for the rest case, use the insert operation
case _ => INSERT_OPERATION_OPT_VAL
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,22 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
refreshTable: Boolean = true,
extraOptions: Map[String, String] = Map.empty): Boolean = {
val catalogTable = new HoodieCatalogTable(sparkSession, table)
val config = buildHoodieInsertConfig(catalogTable, sparkSession, overwrite, partitionSpec, extraOptions)

// NOTE: In case of partitioned table we override specified "overwrite" parameter
// to instead append to the dataset
val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
SaveMode.Overwrite
var mode = SaveMode.Append
var isOverWriteTable = false
var isOverWritePartition = false
if (overwrite && catalogTable.partitionFields.isEmpty) {
// insert overwrite non-partition table
mode = SaveMode.Overwrite
isOverWriteTable = true
} else {
SaveMode.Append
// for insert into or insert overwrite partition we use append mode.
mode = SaveMode.Append
isOverWritePartition = overwrite
}

val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions)

val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf)

val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, Dataset.ofRows(sparkSession, alignedQuery))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
Expand Down Expand Up @@ -443,6 +444,109 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
})
}

// NOTE(review): this block was extracted from a PR diff page; the file's
// original indentation was lost in extraction, so code lines below are kept
// byte-for-byte as scraped.
//
// End-to-end SQL test of INSERT OVERWRITE behavior with schema-on-read
// ("hoodie.schema.on.read.enable" = true, i.e. the "V2" table path) enabled.
// Exercises: overwrite of the whole partitioned table, overwrite of a single
// static partition, overwrite fed from a second table, and overwrite of a
// non-partitioned table. Body is gated on Spark >= 3.2 and is a silent no-op
// on older Spark versions.
test("Test Insert Overwrite Table for V2 Table") {
withSQLConf("hoodie.schema.on.read.enable" -> "true") {
withRecordType()(withTempDir { tmp =>
if (HoodieSparkUtils.gteqSpark3_2) {
val tableName = generateTableName
// Create a partitioned table: primary key `id`, partition column `dt`,
// which also serves as the precombine field.
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
|) using hudi
| tblproperties (primaryKey = 'id', preCombineField='dt')
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)

// Seed the (empty) table via INSERT OVERWRITE TABLE.
spark.sql(
s"""
| insert overwrite table $tableName
| values(1, 'a1', 10.0, 1000, '2021-01-05')
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.0, 1000, "2021-01-05")
)

// Overwrite the whole table again: the previously written 2021-01-05 row
// must be gone, leaving only the new 2021-01-06 row.
spark.sql(
s"""
| insert overwrite table $tableName
| values (2, 'a2', 10.0, 1000, '2021-01-06')
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")(
Seq(2, "a2", 10.0, 1000, "2021-01-06")
)

// Overwrite a single static partition (dt = '2021-01-05'); the sibling
// partition 2021-01-06 must be left untouched.
spark.sql(
s"""
| insert overwrite table $tableName partition(dt = '2021-01-05')
| select * from (select 2 , 'a2', 12.0, 1000) limit 10
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")(
Seq(2, "a2", 12.0, 1000, "2021-01-05"),
Seq(2, "a2", 10.0, 1000, "2021-01-06")
)

// Overwrite a static partition with rows selected from a second,
// non-partitioned Hudi table.
val tblNonPartition = generateTableName
spark.sql(
s"""
| create table $tblNonPartition (
| id int,
| name string,
| price double,
| ts long
| ) using hudi
| tblproperties (primaryKey = 'id')
| location '${tmp.getCanonicalPath}/$tblNonPartition'
""".stripMargin)
spark.sql(s"insert into $tblNonPartition select 1, 'a1', 10.0, 1000")
spark.sql(
s"""
| insert overwrite table $tableName partition(dt ='2021-01-04')
| select * from $tblNonPartition limit 10
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName order by id,dt")(
Seq(1, "a1", 10.0, 1000, "2021-01-04"),
Seq(2, "a2", 12.0, 1000, "2021-01-05"),
Seq(2, "a2", 10.0, 1000, "2021-01-06")
)

// INSERT OVERWRITE TABLE (no partition clause) on the partitioned table:
// all existing partitions are truncated, so only the single freshly
// written 2021-01-04 row survives (asserted by the range query below).
spark.sql(
s"""
| insert overwrite table $tableName
| select id + 2, name, price, ts , '2021-01-04' from $tblNonPartition limit 10
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName " +
s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")(
Seq(3, "a1", 10.0, 1000, "2021-01-04")
)

// Overwrite of a non-partitioned table replaces its full contents.
spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 1000")
checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
Seq(2, "a2", 10.0, 1000)
)

// Same key, new ts: after a second overwrite only the latest row remains.
spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 2000")
checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
Seq(2, "a2", 10.0, 2000)
)
}
})
}
}


test("Test Different Type of Partition Column") {
withRecordType()(withTempDir { tmp =>
val typeAndValue = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,13 @@ class HoodieCatalog extends DelegatingCatalogExtension
DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
)
saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false, Map.empty, options))
saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwritePartition =false, isOverwriteTable = false, Map.empty, options))
CreateHoodieTableCommand.createTableInCatalog(spark, hoodieCatalogTable, ignoreIfExists = false)
} else if (sourceQuery.isEmpty) {
saveSourceDF(sourceQuery, tableDesc.properties)
new CreateHoodieTableCommand(tableDesc, false).run(spark)
} else {
saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwrite = false, Map.empty, Map.empty))
saveSourceDF(sourceQuery, tableDesc.properties ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, isOverwritePartition = false, isOverwriteTable = false, Map.empty, Map.empty))
new CreateHoodieTableCommand(tableDesc, false).run(spark)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,33 +89,27 @@ private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
spark: SparkSession)
extends SupportsTruncate with SupportsOverwrite with ProvidesHoodieConfig {

private var forceOverwrite = false
private var overwriteTable = false
private var overwritePartition = false

override def truncate(): HoodieV1WriteBuilder = {
forceOverwrite = true
overwriteTable = true
this
}

override def overwrite(filters: Array[Filter]): WriteBuilder = {
forceOverwrite = true
overwritePartition = true
this
}

override def build(): V1Write = new V1Write {
override def toInsertableRelation: InsertableRelation = {
new InsertableRelation {
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
val mode = if (forceOverwrite && hoodieCatalogTable.partitionFields.isEmpty) {
// insert overwrite non-partition table
SaveMode.Overwrite
} else {
// for insert into or insert overwrite partition we use append mode.
SaveMode.Append
}
alignOutputColumns(data).write.format("org.apache.hudi")
.mode(mode)
.mode(SaveMode.Append)
.options(buildHoodieConfig(hoodieCatalogTable) ++
buildHoodieInsertConfig(hoodieCatalogTable, spark, forceOverwrite, Map.empty, Map.empty))
buildHoodieInsertConfig(hoodieCatalogTable, spark, overwritePartition, overwriteTable, Map.empty, Map.empty))
.save()
}
}
Expand Down