@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.catalog

import org.apache.hudi.DataSourceWriteOptions.OPERATION
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.common.config.DFSPropertiesConfiguration
import org.apache.hudi.common.model.HoodieTableType
@@ -198,6 +198,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
}

HoodieTableMetaClient.withPropertyBuilder()
.setTableType(tableConfigs.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue()))
.fromProperties(properties)
.setDatabaseName(catalogDatabaseName)
.setTableName(table.identifier.table)
@@ -290,12 +291,18 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
extraConfig(URL_ENCODE_PARTITIONING.key) = URL_ENCODE_PARTITIONING.defaultValue()
}

if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
if (catalogProperties.contains(KEYGENERATOR_CLASS_NAME.key)) {
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
catalogProperties.get(KEYGENERATOR_CLASS_NAME.key).get
} else if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
} else {
val primaryKeys = table.properties.getOrElse(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName, table.storage.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)).toString
var primaryKeys = table.properties.getOrElse(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName, table.storage.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)).toString
if (primaryKeys.equals("None")) {
primaryKeys = catalogProperties.get(RECORDKEY_FIELD.key).getOrElse(RECORDKEY_FIELD.defaultValue)
}
val partitions = table.partitionColumnNames.mkString(",")
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
DataSourceOptionsHelper.inferKeyGenClazz(primaryKeys, partitions)
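
The new else-if chain resolves the key generator in a fixed order: an explicit KEYGENERATOR_CLASS_NAME in the catalog properties wins, otherwise the class already recorded in the table config is reused, and only as a last resort is one inferred from the primary key and partition columns. A minimal sketch of that order, assuming the same symbols already imported by HoodieCatalogTable and skipping the convertToSparkKeyGenerator wrapping applied above:

// Sketch only, not the PR's code. KEYGENERATOR_CLASS_NAME, HoodieTableConfig and
// DataSourceOptionsHelper are the constants and helper used in the diff above.
def resolveKeyGeneratorClass(
    catalogProperties: Map[String, String],
    originTableConfig: Map[String, String],
    primaryKeys: String,
    partitions: String): String = {
  catalogProperties.get(KEYGENERATOR_CLASS_NAME.key)                                // 1. explicit DataFrame/catalog option
    .orElse(originTableConfig.get(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))  // 2. keep the table's existing generator
    .getOrElse(DataSourceOptionsHelper.inferKeyGenClazz(primaryKeys, partitions))   // 3. infer from keys and partitions
}
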
@@ -52,7 +52,6 @@ object HoodieOptionConfig {
.withSqlKey("type")
.withHoodieKey(DataSourceWriteOptions.TABLE_TYPE.key)
.withTableConfigKey(HoodieTableConfig.TYPE.key)
.defaultValue(SQL_VALUE_TABLE_TYPE_COW)
.build()

val SQL_KEY_PRECOMBINE_FIELD: HoodieSQLOption[String] = buildConf()
@@ -195,10 +194,11 @@ object HoodieOptionConfig {
// validate primary key
val primaryKeys = sqlOptions.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
.map(_.split(",").filter(_.length > 0))
ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.")
primaryKeys.get.foreach { primaryKey =>
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(primaryKey))),
s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
if (primaryKeys.nonEmpty) {
primaryKeys.get.foreach { primaryKey =>
ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(primaryKey))),
s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
}
}

// validate preCombine key
@@ -210,11 +210,13 @@

// validate table type
val tableType = sqlOptions.get(SQL_KEY_TABLE_TYPE.sqlKeyName)
ValidationUtils.checkArgument(tableType.nonEmpty, "No `type` is specified.")
ValidationUtils.checkArgument(
tableType.get.equalsIgnoreCase(SQL_VALUE_TABLE_TYPE_COW) ||
tableType.get.equalsIgnoreCase(SQL_VALUE_TABLE_TYPE_MOR),
s"'type' must be '$SQL_VALUE_TABLE_TYPE_COW' or '$SQL_VALUE_TABLE_TYPE_MOR'")

if (tableType.nonEmpty) {
ValidationUtils.checkArgument(
tableType.get.equalsIgnoreCase(SQL_VALUE_TABLE_TYPE_COW) ||
tableType.get.equalsIgnoreCase(SQL_VALUE_TABLE_TYPE_MOR),
s"'type' must be '$SQL_VALUE_TABLE_TYPE_COW' or '$SQL_VALUE_TABLE_TYPE_MOR'")
}
}

def buildConf[T](): HoodieSQLOptionBuilder[T] = {
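
With the SQL-level default for type removed and both the primaryKey and type checks made conditional, a DataFrame saveAsTable write that supplies neither SQL option now passes validateTable; the copy-on-write default is applied instead when the meta client is built (the setTableType line in HoodieCatalogTable above). A minimal sketch of that fallback, assuming Hudi's TABLE_TYPE write option, whose default value is COPY_ON_WRITE:

// Sketch: fall back to the DataSource write option default rather than a SQL-option default.
def resolveTableType(tableConfigs: Map[String, String]): String =
  tableConfigs.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue())  // "COPY_ON_WRITE" when nothing is set
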
@@ -1013,5 +1013,105 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
""".stripMargin)
checkKeyGenerator("org.apache.hudi.keygen.ComplexKeyGenerator", tableName)
spark.sql(s"drop table $tableName")

import spark.implicits._
val partitionValue = "2022-11-05"
val df = Seq((1, "a1", 10, 1000, partitionValue)).toDF("id", "name", "value", "ts", "dt")

// Test key generator class with composite keys via Spark DataFrame saveAsTable
df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
.option(RECORDKEY_FIELD.key, "id,name")
.option(PRECOMBINE_FIELD.key, "ts")
.option(PARTITIONPATH_FIELD.key, "dt")
.option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
.option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
.partitionBy("dt")
.mode(SaveMode.Overwrite)
.saveAsTable(tableName)
checkKeyGenerator("org.apache.hudi.keygen.ComplexKeyGenerator", tableName)
spark.sql(s"drop table $tableName")
}

test("Test CTAS COW Table With DataFrame saveAsTable") {
import spark.implicits._
val partitionValue = "2022-11-05"
val df = Seq((1, "a1", 10, 1000, partitionValue)).toDF("id", "name", "value", "ts", "dt")

val tableName = generateTableName
// Write a table via Spark DataFrame saveAsTable
// Test default table type and key generator class
df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "ts")
.option(PARTITIONPATH_FIELD.key, "dt")
.option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
.option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
.partitionBy("dt")
.mode(SaveMode.Overwrite)
.saveAsTable(tableName)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
val tablePath = table.storage.properties("path")
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(tablePath)
.setConf(spark.sessionState.newHadoopConf())
.build()
val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
assertResult(COW_TABLE_TYPE_OPT_VAL)(tableConfig(HoodieTableConfig.TYPE.key()))

checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
Seq("1", s"dt=$partitionValue", 1, "a1", 10, 1000, partitionValue)
)

// Test insert into
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$partitionValue')")
checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
Seq("1", s"dt=$partitionValue", 1, "a1", 10, 1000, partitionValue),
Seq("2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
)
}
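
Note that the expected _hoodie_record_key values intentionally differ between this test and the MOR test below: with a single record key field and no explicit key generator, the inferred SimpleKeyGenerator writes the bare value ("1"), while the MOR test configures ComplexKeyGenerator, which encodes keys as field:value pairs ("id:1").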

test("Test CTAS MOR Table With DataFrame saveAsTable") {
import spark.implicits._
val partitionValue = "2022-11-05"
val df = Seq((1, "a1", 10, 1000, partitionValue)).toDF("id", "name", "value", "ts", "dt")

val tableName = generateTableName
// Write a table via Spark DataFrame saveAsTable
// Test that the table type and key generator class set via DataFrame options stay compatible with SQL
df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
.option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "ts")
.option(PARTITIONPATH_FIELD.key, "dt")
.option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
.option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
.option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
.partitionBy("dt")
.mode(SaveMode.Overwrite)
.saveAsTable(tableName)

val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
val tablePath = table.storage.properties("path")
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(tablePath)
.setConf(spark.sessionState.newHadoopConf())
.build()
val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
assertResult(MOR_TABLE_TYPE_OPT_VAL)(tableConfig(HoodieTableConfig.TYPE.key()))

checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
Seq("id:1", s"dt=$partitionValue", 1, "a1", 10, 1000, partitionValue)
)

// Test insert into
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$partitionValue')")
checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
Seq("id:1", s"dt=$partitionValue", 1, "a1", 10, 1000, partitionValue),
Seq("id:2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
)
}
}
@@ -31,9 +31,8 @@ class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
def testWithDefaultSqlOptions(): Unit = {
val ops1 = Map("primaryKey" -> "id")
val with1 = HoodieOptionConfig.withDefaultSqlOptions(ops1)
assertTrue(with1.size == 3)
assertTrue(with1.size == 2)
assertTrue(with1("primaryKey") == "id")
assertTrue(with1("type") == "cow")
assertTrue(with1("payloadClass") == classOf[OverwriteWithLatestAvroPayload].getName)

val ops2 = Map("primaryKey" -> "id",
@@ -106,65 +105,45 @@ class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
StructField("dt", StringType, true))
)

// miss primaryKey parameter
// primary field not found
val sqlOptions1 = baseSqlOptions ++ Map(
"primaryKey" -> "xxx",
"type" -> "mor"
)

val e1 = intercept[IllegalArgumentException] {
HoodieOptionConfig.validateTable(spark, schema, sqlOptions1)
}
assertTrue(e1.getMessage.contains("No `primaryKey` is specified."))

// primary field not found
val sqlOptions2 = baseSqlOptions ++ Map(
"primaryKey" -> "xxx",
"type" -> "mor"
)
val e2 = intercept[IllegalArgumentException] {
HoodieOptionConfig.validateTable(spark, schema, sqlOptions2)
}
assertTrue(e2.getMessage.contains("Can't find primaryKey"))
assertTrue(e1.getMessage.contains("Can't find primaryKey"))

// preCombine field not found
val sqlOptions3 = baseSqlOptions ++ Map(
val sqlOptions2 = baseSqlOptions ++ Map(
"primaryKey" -> "id",
"preCombineField" -> "ts",
"type" -> "mor"
)
val e3 = intercept[IllegalArgumentException] {
HoodieOptionConfig.validateTable(spark, schema, sqlOptions3)
}
assertTrue(e3.getMessage.contains("Can't find preCombineKey"))

// miss type parameter
val sqlOptions4 = baseSqlOptions ++ Map(
"primaryKey" -> "id",
"preCombineField" -> "timestamp"
)
val e4 = intercept[IllegalArgumentException] {
HoodieOptionConfig.validateTable(spark, schema, sqlOptions4)
val e2 = intercept[IllegalArgumentException] {
HoodieOptionConfig.validateTable(spark, schema, sqlOptions2)
}
assertTrue(e4.getMessage.contains("No `type` is specified."))
assertTrue(e2.getMessage.contains("Can't find preCombineKey"))

// type is invalid
val sqlOptions5 = baseSqlOptions ++ Map(
val sqlOptions3 = baseSqlOptions ++ Map(
"primaryKey" -> "id",
"preCombineField" -> "timestamp",
"type" -> "abc"
)
val e5 = intercept[IllegalArgumentException] {
HoodieOptionConfig.validateTable(spark, schema, sqlOptions5)
val e3 = intercept[IllegalArgumentException] {
HoodieOptionConfig.validateTable(spark, schema, sqlOptions3)
}
assertTrue(e5.getMessage.contains("'type' must be 'cow' or 'mor'"))
assertTrue(e3.getMessage.contains("'type' must be 'cow' or 'mor'"))

// right options and schema
val sqlOptions6 = baseSqlOptions ++ Map(
val sqlOptions4 = baseSqlOptions ++ Map(
"primaryKey" -> "id",
"preCombineField" -> "timestamp",
"type" -> "cow"
)
HoodieOptionConfig.validateTable(spark, schema, sqlOptions6)
HoodieOptionConfig.validateTable(spark, schema, sqlOptions4)
}

}