diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index 332455ea217ef..61acdf866102b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -162,13 +162,18 @@ trait ProvidesHoodieConfig extends Logging { val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL && tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) { - // Only validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload + // Validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload // on reading. classOf[ValidateDuplicateKeyPayload].getCanonicalName + } else if (operation == INSERT_OPERATION_OPT_VAL && tableType == COW_TABLE_TYPE_OPT_VAL && + insertMode == InsertMode.STRICT){ + // Validate duplicate key for inserts to COW table when using strict insert mode. + classOf[ValidateDuplicateKeyPayload].getCanonicalName } else { classOf[OverwriteWithLatestAvroPayload].getCanonicalName } + logInfo(s"Insert statement use write operation type: $operation, payloadClass: $payloadClassName") withSparkConf(sparkSession, catalogProperties) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index c6eccac2bc50e..c5e1abbf09ab2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -267,6 +267,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { throw root } } + // Create table with dropDup is true val tableName2 = generateTableName spark.sql("set hoodie.datasource.write.insert.drop.duplicates = true") @@ -297,6 +298,52 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } + test("Test Insert Into None Partitioned Table strict mode with no preCombineField") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql(s"set hoodie.sql.insert.mode=strict") + // Create none partitioned cow table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = 'cow', + | primaryKey = 'id' + | ) + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10)") + checkAnswer(s"select id, name, price from $tableName")( + Seq(1, "a1", 10.0) + ) + spark.sql(s"insert into $tableName select 2, 'a2', 12") + checkAnswer(s"select id, name, price from $tableName")( + Seq(1, "a1", 10.0), + Seq(2, "a2", 12.0) + ) + + assertThrows[HoodieDuplicateKeyException] { + try { + spark.sql(s"insert into $tableName select 1, 'a1', 10") + } catch { + case e: Exception => + var root: Throwable = e + while (root.getCause != null) { + root = root.getCause + } + throw root + } + } + + // disable this config to avoid affect other test in this class. + spark.sql(s"set hoodie.sql.insert.mode=upsert") + } + } + test("Test Insert Overwrite") { withTempDir { tmp => val tableName = generateTableName