From 9b1aa17e64cb3f246b7748306a2fafeb4dfd8669 Mon Sep 17 00:00:00 2001 From: kazdy Date: Tue, 22 Nov 2022 00:39:26 +0100 Subject: [PATCH 1/3] Insert into sql command with strict sql insert mode and no preCombineFiled should not overwrite existing records with the same key but throw error. --- .../spark/sql/hudi/ProvidesHoodieConfig.scala | 4 ++ .../spark/sql/hudi/TestInsertTable.scala | 47 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index 332455ea217ef..8d99394f6ce02 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -165,10 +165,14 @@ trait ProvidesHoodieConfig extends Logging { // Only validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload // on reading. classOf[ValidateDuplicateKeyPayload].getCanonicalName + } else if (operation == INSERT_OPERATION_OPT_VAL && + tableType == COW_TABLE_TYPE_OPT_VAL && hasPrecombineColumn == false && insertMode == InsertMode.STRICT){ + classOf[ValidateDuplicateKeyPayload].getCanonicalName } else { classOf[OverwriteWithLatestAvroPayload].getCanonicalName } + logInfo(s"Insert statement use write operation type: $operation, payloadClass: $payloadClassName") withSparkConf(sparkSession, catalogProperties) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index c6eccac2bc50e..5f04a8b808281 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -267,6 +267,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { throw root } } + // Create table with dropDup is true val tableName2 = generateTableName spark.sql("set hoodie.datasource.write.insert.drop.duplicates = true") @@ -297,6 +298,52 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } + test("Test Insert Into None Partitioned Table strict mode with no preCombineField") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql(s"set hoodie.sql.insert.mode=strict") + // Create none partitioned cow table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = 'cow', + | primaryKey = 'id' + | ) + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10)") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) + spark.sql(s"insert into $tableName select 2, 'a2', 12") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0), + Seq(2, "a2", 12.0) + ) + + assertThrows[HoodieDuplicateKeyException] { + try { + spark.sql(s"insert into $tableName select 1, 'a1', 10") + } catch { + case e: Exception => + var root: Throwable = e + while (root.getCause != null) { + root = root.getCause + } + throw root + } + } + + // disable this config to avoid affect other test in this class. + spark.sql(s"set hoodie.sql.insert.mode=upsert") + } + } + test("Test Insert Overwrite") { withTempDir { tmp => val tableName = generateTableName From 7bfd0b1a4788f007dc63922771efa4fef46699dd Mon Sep 17 00:00:00 2001 From: kazdy Date: Tue, 22 Nov 2022 10:34:03 +0100 Subject: [PATCH 2/3] fix test for strict insert mode without precombine field --- .../scala/org/apache/spark/sql/hudi/TestInsertTable.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 5f04a8b808281..c5e1abbf09ab2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -317,11 +317,11 @@ class TestInsertTable extends HoodieSparkSqlTestBase { | ) """.stripMargin) spark.sql(s"insert into $tableName values(1, 'a1', 10)") - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 10.0, 1000) + checkAnswer(s"select id, name, price from $tableName")( + Seq(1, "a1", 10.0) ) spark.sql(s"insert into $tableName select 2, 'a2', 12") - checkAnswer(s"select id, name, price, ts from $tableName")( + checkAnswer(s"select id, name, price from $tableName")( Seq(1, "a1", 10.0), Seq(2, "a2", 12.0) ) From a17ba9d2cbeb5099cbed641ab1aa90f70e1ebec4 Mon Sep 17 00:00:00 2001 From: kazdy Date: Wed, 23 Nov 2022 18:27:01 +0100 Subject: [PATCH 3/3] For COW table STRICT insert mode, PK uniqueness should be honored irrespective of precombine field. --- .../org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index 8d99394f6ce02..61acdf866102b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -162,11 +162,12 @@ trait ProvidesHoodieConfig extends Logging { val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL && tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) { - // Only validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload + // Validate duplicate key for COW, for MOR it will do the merge with the DefaultHoodieRecordPayload // on reading. classOf[ValidateDuplicateKeyPayload].getCanonicalName - } else if (operation == INSERT_OPERATION_OPT_VAL && - tableType == COW_TABLE_TYPE_OPT_VAL && hasPrecombineColumn == false && insertMode == InsertMode.STRICT){ + } else if (operation == INSERT_OPERATION_OPT_VAL && tableType == COW_TABLE_TYPE_OPT_VAL && + insertMode == InsertMode.STRICT){ + // Validate duplicate key for inserts to COW table when using strict insert mode. classOf[ValidateDuplicateKeyPayload].getCanonicalName } else { classOf[OverwriteWithLatestAvroPayload].getCanonicalName