diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index c86b1615ba58d..3e286486471be 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -151,7 +151,9 @@ object HoodieSparkSqlWriter {
         .setBaseFileFormat(baseFileFormat)
         .setArchiveLogFolder(archiveLogFolder)
         .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
-        .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
+        // We can't fetch the preCombine field from the hoodieConfig object, since it falls back to "ts"
+        // as the default value; we want what the user explicitly set, hence we fetch it from optParams.
+        .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
         .setPartitionFields(partitionColumns)
         .setPopulateMetaFields(populateMetaFields)
         .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 000004ace9ad4..5db98f7a3744c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -96,6 +96,26 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
   }
 
+  @Test def testNoPrecombine() {
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    val commonOptsNoPreCombine = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    )
+    inputDF.write.format("hudi")
+      .options(commonOptsNoPreCombine)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    spark.read.format("org.apache.hudi").load(basePath).count()
+  }
 
   @Test def testHoodieIsDeletedNonBooleanField() {
     // Insert Operation
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index d8ebe5cbcd8b0..e3ead19753608 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -499,6 +499,28 @@ class TestMORDataSource extends HoodieClientTestBase {
     hudiSnapshotDF2.show(1)
   }
 
+  @Test def testNoPrecombine() {
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    val commonOptsNoPreCombine = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    )
+    inputDF.write.format("hudi")
+      .options(commonOptsNoPreCombine)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key(), "MERGE_ON_READ")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    spark.read.format("org.apache.hudi").load(basePath).count()
+  }
+
   @Test def testPreCombineFiledForReadMOR(): Unit = {
     writeData((1, "a0", 10, 100, false))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 8ebef198af458..6b8efb84e32f1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -263,83 +263,100 @@ class TestCreateTable extends TestHoodieSqlBase {
 
   test("Test Create Table As Select") {
     withTempDir { tmp =>
-      // Create Non-Partitioned table
-      val tableName1 = generateTableName
-      spark.sql(
-        s"""
-           | create table $tableName1 using hudi
-           | tblproperties(primaryKey = 'id')
-           | location '${tmp.getCanonicalPath}/$tableName1'
-           | AS
-           | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+      Seq("cow", "mor").foreach { tableType =>
+        // Create Non-Partitioned table
+        val tableName1 = generateTableName
+        spark.sql(
+          s"""
+             | create table $tableName1 using hudi
+             | tblproperties(
+             |  primaryKey = 'id',
+             |  type = '$tableType'
+             | )
+             | location '${tmp.getCanonicalPath}/$tableName1'
+             | AS
+             | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
         """.stripMargin)
-      checkAnswer(s"select id, name, price, ts from $tableName1")(
-        Seq(1, "a1", 10.0, 1000)
-      )
+        checkAnswer(s"select id, name, price, ts from $tableName1")(
+          Seq(1, "a1", 10.0, 1000)
+        )
 
-      // Create Partitioned table
-      val tableName2 = generateTableName
-      spark.sql(
-        s"""
-           | create table $tableName2 using hudi
-           | partitioned by (dt)
-           | tblproperties(primaryKey = 'id')
-           | location '${tmp.getCanonicalPath}/$tableName2'
-           | AS
-           | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
+        // Create Partitioned table
+        val tableName2 = generateTableName
+        spark.sql(
+          s"""
+             | create table $tableName2 using hudi
+             | partitioned by (dt)
+             | tblproperties(
+             |  primaryKey = 'id',
+             |  type = '$tableType'
+             | )
+             | location '${tmp.getCanonicalPath}/$tableName2'
+             | AS
+             | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
         """.stripMargin
-      )
-      checkAnswer(s"select id, name, price, dt from $tableName2")(
-        Seq(1, "a1", 10, "2021-04-01")
-      )
+        )
+        checkAnswer(s"select id, name, price, dt from $tableName2")(
+          Seq(1, "a1", 10, "2021-04-01")
+        )
 
-      // Create Partitioned table with timestamp data type
-      val tableName3 = generateTableName
-      // CTAS failed with null primaryKey
-      assertThrows[Exception] {
+        // Create Partitioned table with timestamp data type
+        val tableName3 = generateTableName
+        // CTAS failed with null primaryKey
+        assertThrows[Exception] {
+          spark.sql(
+            s"""
+               | create table $tableName3 using hudi
+               | partitioned by (dt)
+               | tblproperties(
+               |  primaryKey = 'id',
+               |  type = '$tableType'
+               | )
+               | location '${tmp.getCanonicalPath}/$tableName3'
+               | AS
+               | select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt
+               |
+          """.stripMargin
+          )
+        }
+        // Create table with timestamp type partition
         spark.sql(
           s"""
              | create table $tableName3 using hudi
              | partitioned by (dt)
-             | tblproperties(primaryKey = 'id')
+             | tblproperties(
+             |  primaryKey = 'id',
+             |  type = '$tableType'
+             | )
              | location '${tmp.getCanonicalPath}/$tableName3'
              | AS
-             | select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt
-             |
-       """.stripMargin
-        )
-      }
-      // Create table with timestamp type partition
-      spark.sql(
-        s"""
-           | create table $tableName3 using hudi
-           | partitioned by (dt)
-           | tblproperties(primaryKey = 'id')
-           | location '${tmp.getCanonicalPath}/$tableName3'
-           | AS
-           | select cast('2021-05-06 00:00:00' as timestamp) as dt, 1 as id, 'a1' as name, 10 as
-           | price
+             | select cast('2021-05-06 00:00:00' as timestamp) as dt, 1 as id, 'a1' as name, 10 as
+             | price
        """.stripMargin
-      )
-      checkAnswer(s"select id, name, price, cast(dt as string) from $tableName3")(
-        Seq(1, "a1", 10, "2021-05-06 00:00:00")
-      )
-      // Create table with date type partition
-      val tableName4 = generateTableName
-      spark.sql(
-        s"""
-           | create table $tableName4 using hudi
-           | partitioned by (dt)
-           | tblproperties(primaryKey = 'id')
-           | location '${tmp.getCanonicalPath}/$tableName4'
-           | AS
-           | select cast('2021-05-06' as date) as dt, 1 as id, 'a1' as name, 10 as
-           | price
+        )
+        checkAnswer(s"select id, name, price, cast(dt as string) from $tableName3")(
+          Seq(1, "a1", 10, "2021-05-06 00:00:00")
+        )
+        // Create table with date type partition
+        val tableName4 = generateTableName
+        spark.sql(
+          s"""
+             | create table $tableName4 using hudi
+             | partitioned by (dt)
+             | tblproperties(
+             |  primaryKey = 'id',
+             |  type = '$tableType'
+             | )
+             | location '${tmp.getCanonicalPath}/$tableName4'
+             | AS
+             | select cast('2021-05-06' as date) as dt, 1 as id, 'a1' as name, 10 as
+             | price
        """.stripMargin
-      )
-      checkAnswer(s"select id, name, price, cast(dt as string) from $tableName4")(
-        Seq(1, "a1", 10, "2021-05-06")
-      )
+        )
+        checkAnswer(s"select id, name, price, cast(dt as string) from $tableName4")(
+          Seq(1, "a1", 10, "2021-05-06")
+        )
+      }
     }
   }
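
The crux of the writer change is the distinction between the defaulted config view (`HoodieConfig`, whose `PRECOMBINE_FIELD` falls back to "ts") and the raw user-supplied options (`optParams`). Below is a minimal standalone sketch of that distinction; it is not Hudi code, and the names `defaults`, `userOpts`, and `lookupWithDefault` are hypothetical stand-ins for `HoodieConfig`'s fallback behavior. Only the config key and its "ts" default are taken from Hudi.

```scala
// Sketch only: illustrates why the writer must consult the raw user options
// rather than the defaulted config view when persisting the preCombine field.
object PreCombineLookupSketch extends App {
  // Real key for DataSourceWriteOptions.PRECOMBINE_FIELD; its default is "ts".
  val precombineKey = "hoodie.datasource.write.precombine.field"

  // HoodieConfig-style lookup (hypothetical stand-in): falls back to the
  // documented default when the user set nothing.
  val defaults = Map(precombineKey -> "ts")
  def lookupWithDefault(userOpts: Map[String, String], key: String): String =
    userOpts.getOrElse(key, defaults.getOrElse(key, null))

  val userOpts = Map.empty[String, String] // the user never set a precombine field

  // The defaulted view reports "ts" even though nothing was set ...
  assert(lookupWithDefault(userOpts, precombineKey) == "ts")
  // ... while the raw options correctly report "no precombine field set",
  // which is what the patched writer persists into the table config.
  assert(userOpts.getOrElse(precombineKey, null) == null)
  println("precombine lookup sketch passed")
}
```

The design point, as the patch's comment states, is that the table config should record only what the user actually set; the `testNoPrecombine` tests above then verify that both COW and MOR tables can be written and read back when no precombine field is supplied.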