Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,10 @@ trait ProvidesHoodieConfig extends Logging {
val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)

withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
// operation can not be overwrite
val options = hoodieCatalogTable.catalogProperties.-(OPERATION.key())

withSparkConf(sparkSession, options) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,51 @@ class TestDeleteTable extends TestHoodieSqlBase {
}
}
}

test("Test Delete Table with op upsert") {
withTempDir { tmp =>
Seq("cow", "mor").foreach {tableType =>
val tableName = generateTableName
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.datasource.write.operation = 'upsert'
| )
""".stripMargin)
// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)
)

// delete data from table
spark.sql(s"delete from $tableName where id = 1")
checkAnswer(s"select count(1) from $tableName") (
Seq(0)
)

spark.sql(s"insert into $tableName select 2, 'a2', 10, 1000")
spark.sql(s"delete from $tableName where id = 1")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(2, "a2", 10.0, 1000)
)

spark.sql(s"delete from $tableName")
checkAnswer(s"select count(1) from $tableName")(
Seq(0)
)
}
}
}
}