TestCOWDataSourceStorage.scala (merged, changes from all commits)
@@ -26,14 +26,14 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.TimestampBasedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
-import org.apache.hudi.keygen.{ComplexKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
-import org.junit.jupiter.api.{Disabled, Tag}
+import org.junit.jupiter.api.Tag
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, ValueSource}

@@ -51,46 +51,46 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
-    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false"
   )

   val verificationCol: String = "driver"
   val updatedVerificationVal: String = "driver_update"

   @ParameterizedTest
-  @CsvSource(Array(
-    "true,org.apache.hudi.keygen.SimpleKeyGenerator",
-    "true,org.apache.hudi.keygen.ComplexKeyGenerator",
-    "true,org.apache.hudi.keygen.TimestampBasedKeyGenerator",
-    "false,org.apache.hudi.keygen.SimpleKeyGenerator",
-    "false,org.apache.hudi.keygen.ComplexKeyGenerator",
-    "false,org.apache.hudi.keygen.TimestampBasedKeyGenerator"
-  ))
-  def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String): Unit = {
-    commonOpts += DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass
-    if (classOf[ComplexKeyGenerator].getName.equals(keyGenClass)) {
-      commonOpts += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key, pii_col"
-    }
-    if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
-      commonOpts += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
-      commonOpts += DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "current_ts"
-      commonOpts += Config.TIMESTAMP_TYPE_FIELD_PROP -> "EPOCHMILLISECONDS"
-      commonOpts += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
+  @CsvSource(value = Array(
+    "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
+    "true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,nation.bytes",
+    "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key",
+    "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
+    "false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,nation.bytes",
+    "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
+  ), delimiter = '|')
+  def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, recordKeys: String): Unit = {
+    var options: Map[String, String] = commonOpts +
+      (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled)) +
+      (DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass) +
+      (DataSourceWriteOptions.RECORDKEY_FIELD.key() -> recordKeys)
+    val isTimestampBasedKeyGen: Boolean = classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
+    if (isTimestampBasedKeyGen) {
+      options += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
+      options += Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING"
+      options += Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd"
+      options += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
     }
     val dataGen = new HoodieTestDataGenerator(0xDEED)
     val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
     // Insert Operation
     val records0 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
     val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
     inputDF0.write.format("org.apache.hudi")
-      .options(commonOpts)
+      .options(options)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
       .mode(SaveMode.Overwrite)
       .save(basePath)

     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
     val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)

     // Snapshot query
     val snapshotDF1 = spark.read.format("org.apache.hudi")
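
For context on the new parameters: the pipe delimiter lets a `@CsvSource` row carry a comma-separated record-key list ("_row_key,nation.bytes") as a third argument, and the TimestampBasedKeyGenerator rows switch from EPOCHMILLISECONDS to DATE_STRING parsing. Below is a minimal sketch of the effective writer options for that case, assembled only from keys that appear in this diff; the example partition value in the comment is an assumption inferred from the 2016-prefixed globs used later in the test.

```scala
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.keygen.TimestampBasedKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config

// Sketch: writer options for the timestamp-based case. With DATE_STRING,
// a partition field value such as "2016/03/15" is parsed with the input
// format and re-rendered with the output format, so the record lands under
// the flat path "20160315" instead of the nested "2016/03/15".
val tsKeyGenOpts: Map[String, String] = Map(
  DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
  DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[TimestampBasedKeyGenerator].getName,
  Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING",
  Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd",
  Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
)
```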
@@ -102,7 +102,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     val verificationRowKey = inputDF1.limit(1).select("_row_key").first.getString(0)
     var updateDf: DataFrame = null
-    if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
+    if (isTimestampBasedKeyGen) {
       // update current_ts to be same as original record so that partition path does not change with timestamp based key gen
       val originalRow = snapshotDF1.filter(col("_row_key") === verificationRowKey).collectAsList().get(0)
       updateDf = inputDF1.filter(col("_row_key") === verificationRowKey)
@@ -116,8 +116,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     }

     updateDf.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
     val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
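
The two-line pattern replaced here (`.options(commonOpts)` plus an explicit metadata `.option`) collapses into one call because the metadata flag was folded into `options` when the map was assembled; the same substitution recurs in the hunks below. A small self-contained sketch of that idiom, with a placeholder table name:

```scala
import org.apache.hudi.common.config.HoodieMetadataConfig

val commonOpts = Map("hoodie.table.name" -> "hoodie_test")
val isMetadataEnabled = true
// Scala's immutable Map "+" returns a new map including the extra entry, so
// every subsequent .options(options) call carries the metadata flag too.
val options: Map[String, String] =
  commonOpts + (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled))
```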
@@ -132,7 +131,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val records2 = recordsToStrings(dataGen.generateUpdates("002", 100)).toList
     var inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))

-    if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
+    if (isTimestampBasedKeyGen) {
       // incase of Timestamp based key gen, current_ts should not be updated. but dataGen.generateUpdates() would have updated
       // the value of current_ts. So, we need to revert it back to original value.
       // here is what we are going to do. Copy values to temp columns, join with original df and update the current_ts
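
The implementation of that revert is cut off by the hunk boundary. A hypothetical sketch of the approach the comment describes, assuming the test's `inputDF2` and `snapshotDF1` DataFrames and the column names used elsewhere in the file:

```scala
import org.apache.spark.sql.functions.col

// Hypothetical revert: drop the regenerated current_ts, then re-attach the
// original value by joining back to the first snapshot on the record key,
// so the timestamp-derived partition path stays identical across updates.
val reverted = inputDF2
  .drop("current_ts")
  .join(snapshotDF1.select(col("_row_key"), col("current_ts")), Seq("_row_key"))
```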
@@ -152,8 +151,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()

     inputDF2.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)

@@ -191,8 +189,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val emptyRecords = recordsToStrings(dataGen.generateUpdates("003", 0)).toList
     val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
     emptyDF.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)

@@ -211,9 +208,10 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2)
-      .option(DataSourceReadOptions.INCR_PATH_GLOB.key, "/2016/*/*/*")
+      .option(DataSourceReadOptions.INCR_PATH_GLOB.key, if (isTimestampBasedKeyGen) "/2016*/*" else "/2016/*/*/*")
       .load(basePath)
-    assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count())
+    assertEquals(hoodieIncViewDF2
+      .filter(col("_hoodie_partition_path").startsWith("2016")).count(), hoodieIncViewDF3.count())

     val timeTravelDF = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
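
Why the glob and the assertion change above: under the timestamp-based generator each partition path is a single flat yyyyMMdd segment, while the default layout nests year/month/day, so the incremental path glob needs one directory level instead of three, and the partition-path check can be a plain prefix match. An illustrative sketch, with partition values assumed from the 2016-prefixed paths in the test:

```scala
// Illustrative layouts for the same record under the two configurations:
//   default key generators:       <basePath>/2016/03/15/<data files>
//   TimestampBasedKeyGenerator:   <basePath>/20160315/<data files>
// Each glob ends in "/*" to match the data files inside the partition dir.
def incrPathGlob(isTimestampBasedKeyGen: Boolean): String =
  if (isTimestampBasedKeyGen) "/2016*/*" // one flat partition level
  else "/2016/*/*/*"                     // three nested partition levels
```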