diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index 48bb46f81b1b0..6f13dbc82f4d9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -26,14 +26,14 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.TimestampBasedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
-import org.apache.hudi.keygen.{ComplexKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.{Disabled, Tag}
+import org.junit.jupiter.api.Tag
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
@@ -51,31 +51,33 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
-    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false"
   )
 
   val verificationCol: String = "driver"
   val updatedVerificationVal: String = "driver_update"
 
   @ParameterizedTest
-  @CsvSource(Array(
-    "true,org.apache.hudi.keygen.SimpleKeyGenerator",
-    "true,org.apache.hudi.keygen.ComplexKeyGenerator",
-    "true,org.apache.hudi.keygen.TimestampBasedKeyGenerator",
-    "false,org.apache.hudi.keygen.SimpleKeyGenerator",
-    "false,org.apache.hudi.keygen.ComplexKeyGenerator",
-    "false,org.apache.hudi.keygen.TimestampBasedKeyGenerator"
-  ))
-  def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String): Unit = {
-    commonOpts += DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass
-    if (classOf[ComplexKeyGenerator].getName.equals(keyGenClass)) {
-      commonOpts += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key, pii_col"
-    }
-    if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
-      commonOpts += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
-      commonOpts += DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "current_ts"
-      commonOpts += Config.TIMESTAMP_TYPE_FIELD_PROP -> "EPOCHMILLISECONDS"
-      commonOpts += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
+  @CsvSource(value = Array(
+    "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
+    "true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,nation.bytes",
+    "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key",
+    "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
+    "false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,nation.bytes",
+    "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
+  ), delimiter = '|')
+  def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, recordKeys: String): Unit = {
+    var options: Map[String, String] = commonOpts +
+      (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled)) +
+      (DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass) +
+      (DataSourceWriteOptions.RECORDKEY_FIELD.key() -> recordKeys)
+    val isTimestampBasedKeyGen: Boolean = classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
+    if (isTimestampBasedKeyGen) {
+      options += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
+      options += Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING"
+      options += Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd"
+      options += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
     }
     val dataGen = new HoodieTestDataGenerator(0xDEED)
     val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
@@ -83,14 +85,12 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val records0 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
     val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
     inputDF0.write.format("org.apache.hudi")
-      .options(commonOpts)
+      .options(options)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
-    val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
 
     // Snapshot query
     val snapshotDF1 = spark.read.format("org.apache.hudi")
@@ -102,7 +102,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     val verificationRowKey = inputDF1.limit(1).select("_row_key").first.getString(0)
     var updateDf: DataFrame = null
-    if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
+    if (isTimestampBasedKeyGen) {
       // update current_ts to be same as original record so that partition path does not change with timestamp based key gen
       val originalRow = snapshotDF1.filter(col("_row_key") === verificationRowKey).collectAsList().get(0)
       updateDf = inputDF1.filter(col("_row_key") === verificationRowKey)
@@ -116,8 +116,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     }
 
     updateDf.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
     val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
@@ -132,7 +131,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val records2 = recordsToStrings(dataGen.generateUpdates("002", 100)).toList
     var inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
 
-    if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
+    if (isTimestampBasedKeyGen) {
       // incase of Timestamp based key gen, current_ts should not be updated. but dataGen.generateUpdates() would have updated
       // the value of current_ts. So, we need to revert it back to original value.
       // here is what we are going to do. Copy values to temp columns, join with original df and update the current_ts
@@ -152,8 +151,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
 
     inputDF2.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -191,8 +189,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val emptyRecords = recordsToStrings(dataGen.generateUpdates("003", 0)).toList
     val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
     emptyDF.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -211,9 +208,10 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2)
-      .option(DataSourceReadOptions.INCR_PATH_GLOB.key, "/2016/*/*/*")
+      .option(DataSourceReadOptions.INCR_PATH_GLOB.key, if (isTimestampBasedKeyGen) "/2016*/*" else "/2016/*/*/*")
       .load(basePath)
-    assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count())
+    assertEquals(hoodieIncViewDF2
+      .filter(col("_hoodie_partition_path").startsWith("2016")).count(), hoodieIncViewDF3.count())
 
     val timeTravelDF = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
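A note on the `@CsvSource` change in the first hunk: the `ComplexKeyGenerator` rows now carry a comma-separated record-key list (`_row_key,nation.bytes`), which the annotation's default comma delimiter would split into separate test arguments. Switching to `delimiter = '|'` keeps the list intact as one parameter. A minimal sketch of the pattern (hypothetical class and key-generator name, assuming JUnit 5 on the classpath):

```scala
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

class PipeDelimitedCsvSourceSketch {
  // With '|' as the delimiter, the embedded comma is plain data, not a separator,
  // so the row still splits into exactly three arguments.
  @ParameterizedTest
  @CsvSource(value = Array("true|org.example.KeyGen|_row_key,nation.bytes"), delimiter = '|')
  def splitsOnPipeOnly(flag: Boolean, keyGenClass: String, recordKeys: String): Unit = {
    assertEquals("_row_key,nation.bytes", recordKeys)
  }
}
```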
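The `DATE_STRING` settings also explain the incremental path glob change in the last hunk: with `TIMESTAMP_INPUT_DATE_FORMAT_PROP = "yyyy/MM/dd"` and `TIMESTAMP_OUTPUT_DATE_FORMAT_PROP = "yyyyMMdd"`, a partition value like the test generator's `2016/03/15` is reformatted into a single path segment such as `20160315` (and with hive-style partitioning disabled there is no `field=` prefix), so the timestamp-based runs match `/2016*/*` rather than the three-level `/2016/*/*/*`. An illustrative sketch of that reformatting under the two configured patterns, using `java.time` rather than the key generator's actual code path:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object TimestampKeyGenFormatSketch extends App {
  // The two formats configured above for TimestampBasedKeyGenerator.
  val inputFormat  = DateTimeFormatter.ofPattern("yyyy/MM/dd") // TIMESTAMP_INPUT_DATE_FORMAT_PROP
  val outputFormat = DateTimeFormatter.ofPattern("yyyyMMdd")   // TIMESTAMP_OUTPUT_DATE_FORMAT_PROP

  // A yyyy/MM/dd value would span three path segments if used verbatim;
  // reformatted, it collapses to a single segment.
  val partitionPath = LocalDate.parse("2016/03/15", inputFormat).format(outputFormat)
  assert(partitionPath == "20160315") // hence the single-level "/2016*/*" glob
}
```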