@@ -151,7 +151,9 @@ object HoodieSparkSqlWriter {
.setBaseFileFormat(baseFileFormat)
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME))
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
// We can't fetch the preCombine field from the hoodieConfig object, since it falls back to "ts" as the
// default value; we are interested in what the user has actually set, hence fetching from optParams.
.setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
.setPartitionFields(partitionColumns)
.setPopulateMetaFields(populateMetaFields)
.setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
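The change above reads the precombine field straight from optParams instead of going through hoodieConfig, because the config object resolves an unset key to its declared default ("ts") and would therefore record a precombine field the user never supplied. A minimal, self-contained sketch of that difference, using a hypothetical DefaultingConfig stand-in (the class and its defaults are illustrative only, not Hudi's actual config API):

// Minimal sketch of the fallback behaviour described in the comment above.
// DefaultingConfig is a hypothetical stand-in for hoodieConfig: unset keys
// resolve to their declared default value.
object PreCombineLookupSketch extends App {
  val precombineKey = "hoodie.datasource.write.precombine.field"

  // What the user actually passed to the writer (no precombine field here).
  val optParams: Map[String, String] = Map(
    "hoodie.datasource.write.recordkey.field" -> "_row_key"
  )

  case class DefaultingConfig(userOpts: Map[String, String], defaults: Map[String, String]) {
    def getStringOrDefault(key: String): String =
      userOpts.getOrElse(key, defaults.getOrElse(key, null))
  }
  val hoodieConfigLike = DefaultingConfig(optParams, defaults = Map(precombineKey -> "ts"))

  // Reading through the defaulting config hides the fact that the user set nothing...
  println(hoodieConfigLike.getStringOrDefault(precombineKey)) // prints "ts"
  // ...whereas reading the raw options preserves "not set", which is what the
  // table-config builder needs.
  println(optParams.getOrElse(precombineKey, null))           // prints null
}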
@@ -96,6 +96,26 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
}

@Test def testNoPrecombine() {
// Insert Operation
val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))

val commonOptsNoPreCombine = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)
inputDF.write.format("hudi")
.options(commonOptsNoPreCombine)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)

spark.read.format("org.apache.hudi").load(basePath).count()
}

@Test def testHoodieIsDeletedNonBooleanField() {
// Insert Operation
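For the testNoPrecombine test added above, one way to make the intent explicit would be to also assert that nothing was persisted for the precombine field. This is only a sketch, not part of the patch: it reuses spark and basePath from the test, and assumes HoodieTableMetaClient and HoodieTableConfig#getPreCombineField are available in this Hudi version and that the getter returns null when the field was never set.

// Hypothetical follow-up assertion for testNoPrecombine (not in the patch).
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.junit.jupiter.api.Assertions.assertNull

val metaClient = HoodieTableMetaClient.builder()
  .setConf(spark.sparkContext.hadoopConfiguration)
  .setBasePath(basePath)
  .build()
// With the writer change above, no precombine entry should end up in hoodie.properties,
// so the table config is expected to report null here (assumption about the getter's
// behaviour when the field is unset).
assertNull(metaClient.getTableConfig.getPreCombineField)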
@@ -499,6 +499,28 @@ class TestMORDataSource extends HoodieClientTestBase {
hudiSnapshotDF2.show(1)
}

@Test def testNoPrecombine() {
// Insert Operation
val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))

val commonOptsNoPreCombine = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)
inputDF.write.format("hudi")
.options(commonOptsNoPreCombine)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key(), "MERGE_ON_READ")
.mode(SaveMode.Overwrite)
.save(basePath)

spark.read.format("org.apache.hudi").load(basePath).count()
}

@Test
def testPreCombineFiledForReadMOR(): Unit = {
writeData((1, "a0", 10, 100, false))
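As a usage sketch for the MOR variant (again not part of the patch): the table written by testNoPrecombine should stay queryable under both query types. It reuses spark and basePath from the test; the DataSourceReadOptions constants are assumed to carry these names in this Hudi version.

// Hypothetical read-back of the no-precombine MOR table (not in the patch).
import org.apache.hudi.DataSourceReadOptions

val snapshotCount = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load(basePath)
  .count()
val readOptimizedCount = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load(basePath)
  .count()

// The snapshot view should see all 100 inserted records; the read-optimized count is
// only printed, since whether fresh inserts land in base or log files depends on the
// table's layout at write time.
assert(snapshotCount == 100)
println(s"read-optimized count: $readOptimizedCount")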
@@ -263,83 +263,100 @@ class TestCreateTable extends TestHoodieSqlBase {

test("Test Create Table As Select") {
withTempDir { tmp =>
// Create Non-Partitioned table
val tableName1 = generateTableName
spark.sql(
s"""
| create table $tableName1 using hudi
| tblproperties(primaryKey = 'id')
| location '${tmp.getCanonicalPath}/$tableName1'
| AS
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts
Seq("cow", "mor").foreach { tableType =>
// Create Non-Partitioned table
val tableName1 = generateTableName
spark.sql(
s"""
| create table $tableName1 using hudi
| tblproperties(
| primaryKey = 'id',
| type = '$tableType'
| )
| location '${tmp.getCanonicalPath}/$tableName1'
| AS
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts
""".stripMargin)
checkAnswer(s"select id, name, price, ts from $tableName1")(
Seq(1, "a1", 10.0, 1000)
)
checkAnswer(s"select id, name, price, ts from $tableName1")(
Seq(1, "a1", 10.0, 1000)
)

// Create Partitioned table
val tableName2 = generateTableName
spark.sql(
s"""
| create table $tableName2 using hudi
| partitioned by (dt)
| tblproperties(primaryKey = 'id')
| location '${tmp.getCanonicalPath}/$tableName2'
| AS
| select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
// Create Partitioned table
val tableName2 = generateTableName
spark.sql(
s"""
| create table $tableName2 using hudi
| partitioned by (dt)
| tblproperties(
| primaryKey = 'id',
| type = '$tableType'
| )
| location '${tmp.getCanonicalPath}/$tableName2'
| AS
| select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
""".stripMargin
)
checkAnswer(s"select id, name, price, dt from $tableName2")(
Seq(1, "a1", 10, "2021-04-01")
)
)
checkAnswer(s"select id, name, price, dt from $tableName2")(
Seq(1, "a1", 10, "2021-04-01")
)

// Create Partitioned table with timestamp data type
val tableName3 = generateTableName
// CTAS failed with null primaryKey
assertThrows[Exception] {
// Create Partitioned table with timestamp data type
val tableName3 = generateTableName
// CTAS failed with null primaryKey
assertThrows[Exception] {
spark.sql(
s"""
| create table $tableName3 using hudi
| partitioned by (dt)
| tblproperties(
| primaryKey = 'id',
| type = '$tableType'
| )
| location '${tmp.getCanonicalPath}/$tableName3'
| AS
| select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt
|
""".stripMargin
)
}
// Create table with timestamp type partition
spark.sql(
s"""
| create table $tableName3 using hudi
| partitioned by (dt)
| tblproperties(primaryKey = 'id')
| tblproperties(
| primaryKey = 'id',
| type = '$tableType'
| )
| location '${tmp.getCanonicalPath}/$tableName3'
| AS
| select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt
|
""".stripMargin
)
}
// Create table with timestamp type partition
spark.sql(
s"""
| create table $tableName3 using hudi
| partitioned by (dt)
| tblproperties(primaryKey = 'id')
| location '${tmp.getCanonicalPath}/$tableName3'
| AS
| select cast('2021-05-06 00:00:00' as timestamp) as dt, 1 as id, 'a1' as name, 10 as
| price
| select cast('2021-05-06 00:00:00' as timestamp) as dt, 1 as id, 'a1' as name, 10 as
| price
""".stripMargin
)
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName3")(
Seq(1, "a1", 10, "2021-05-06 00:00:00")
)
// Create table with date type partition
val tableName4 = generateTableName
spark.sql(
s"""
| create table $tableName4 using hudi
| partitioned by (dt)
| tblproperties(primaryKey = 'id')
| location '${tmp.getCanonicalPath}/$tableName4'
| AS
| select cast('2021-05-06' as date) as dt, 1 as id, 'a1' as name, 10 as
| price
)
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName3")(
Seq(1, "a1", 10, "2021-05-06 00:00:00")
)
// Create table with date type partition
val tableName4 = generateTableName
spark.sql(
s"""
| create table $tableName4 using hudi
| partitioned by (dt)
| tblproperties(
| primaryKey = 'id',
| type = '$tableType'
| )
| location '${tmp.getCanonicalPath}/$tableName4'
| AS
| select cast('2021-05-06' as date) as dt, 1 as id, 'a1' as name, 10 as
| price
""".stripMargin
)
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName4")(
Seq(1, "a1", 10, "2021-05-06")
)
)
checkAnswer(s"select id, name, price, cast(dt as string) from $tableName4")(
Seq(1, "a1", 10, "2021-05-06")
)
}
}
}
