HoodieCatalogTable.scala

@@ -23,14 +23,15 @@ import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hudi.HoodieOptionConfig
-import org.apache.spark.sql.hudi.HoodieOptionConfig.SQL_KEY_TABLE_PRIMARY_KEY
+import org.apache.spark.sql.hudi.HoodieOptionConfig._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -216,20 +217,21 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
   private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = {
     val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
     val globalTableConfigs = mappingSparkDatasourceConfigsToTableConfigs(globalProps)
-    val globalSqlOptions = HoodieOptionConfig.mappingTableConfigToSqlOption(globalTableConfigs)
+    val globalSqlOptions = mapTableConfigsToSqlOptions(globalTableConfigs)

-    val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(globalSqlOptions ++ catalogProperties)
+    val sqlOptions = withDefaultSqlOptions(globalSqlOptions ++
+      mapDataSourceWriteOptionsToSqlOptions(catalogProperties) ++ catalogProperties)
[Review comment from the PR author, on lines +222 to +223]
When saveAsTable() is used, the table has not been created yet, so no table configs are available and catalogProperties carries hoodie.datasource.write.* configs. Those configs should be converted to SQL options, which are later persisted as table configs. Within catalogProperties, an existing SQL option should take precedence over the corresponding hoodie.datasource.write.* config.
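A minimal sketch of that precedence rule, relying only on the fact that Scala's Map ++ is right-biased (the keys and values below are illustrative):

    // The right-hand operand of ++ wins on duplicate keys, so a SQL option
    // already present in catalogProperties overrides the value derived from
    // the corresponding write config.
    val fromWriteConfigs   = Map("primaryKey" -> "id")   // converted from hoodie.datasource.write.recordkey.field
    val explicitSqlOptions = Map("primaryKey" -> "uuid") // SQL option already in catalogProperties
    val merged = fromWriteConfigs ++ explicitSqlOptions
    assert(merged("primaryKey") == "uuid")               // the pre-existing SQL option wins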
     // get final schema and parameters
     val (finalSchema, tableConfigs) = (table.tableType, hoodieTableExists) match {
       case (CatalogTableType.EXTERNAL, true) =>
         val existingTableConfig = tableConfig.getProps.asScala.toMap
         val currentTableConfig = globalTableConfigs ++ existingTableConfig
-        val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(catalogProperties)
+        val catalogTableProps = mapSqlOptionsToTableConfigs(catalogProperties)
         validateTableConfig(spark, catalogTableProps, convertMapToHoodieConfig(existingTableConfig))

         val options = extraTableConfig(hoodieTableExists, currentTableConfig) ++
-          HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) ++ currentTableConfig
+          mapSqlOptionsToTableConfigs(sqlOptions) ++ currentTableConfig

         val schemaFromMetaOpt = loadTableSchemaByMetaClient()
         val schema = if (schemaFromMetaOpt.nonEmpty) {
@@ -243,19 +245,19 @@
         (schema, options)

       case (_, false) =>
-        ValidationUtils.checkArgument(table.schema.nonEmpty,
+        checkArgument(table.schema.nonEmpty,
           s"Missing schema for Create Table: $catalogTableName")
         val schema = table.schema
         val options = extraTableConfig(tableExists = false, globalTableConfigs) ++
-          HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
+          mapSqlOptionsToTableConfigs(sqlOptions)
         (addMetaFields(schema), options)

       case (CatalogTableType.MANAGED, true) =>
         throw new AnalysisException(s"Can not create the managed table('$catalogTableName')" +
           s". The associated location('$tableLocation') already exists.")
     }
     HoodieOptionConfig.validateTable(spark, finalSchema,
-      HoodieOptionConfig.mappingTableConfigToSqlOption(tableConfigs))
+      mapTableConfigsToSqlOptions(tableConfigs))

     val resolver = spark.sessionState.conf.resolver
     val dataSchema = finalSchema.filterNot { f =>
HoodieOptionConfig.scala

@@ -79,19 +79,22 @@ object HoodieOptionConfig {
   /**
    * The mapping of the sql short name key to the hoodie's config key.
    */
-  private lazy val keyMapping: Map[String, String] = {
+  private lazy val sqlOptionKeyToWriteConfigKey: Map[String, String] = {
     HoodieOptionConfig.getClass.getDeclaredFields
       .filter(f => f.getType == classOf[HoodieSQLOption[_]])
       .map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]})
       .map(option => option.sqlKeyName -> option.hoodieKeyName)
       .toMap
   }

+  private lazy val writeConfigKeyToSqlOptionKey: Map[String, String] =
+    sqlOptionKeyToWriteConfigKey.map(f => f._2 -> f._1)
+
   /**
    * The mapping of the sql short name key to the hoodie table config key
    * defined in HoodieTableConfig.
    */
-  private lazy val keyTableConfigMapping: Map[String, String] = {
+  private lazy val sqlOptionKeyToTableConfigKey: Map[String, String] = {
     HoodieOptionConfig.getClass.getDeclaredFields
       .filter(f => f.getType == classOf[HoodieSQLOption[_]])
       .map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]})
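As background for these renames: the object enumerates its own HoodieSQLOption fields via reflection to build the key mappings. A self-contained sketch of the same pattern, with simplified types and assumed illustrative key names:

    // Simplified stand-in for HoodieSQLOption; the write-config keys below
    // are assumptions for illustration only.
    case class SqlOpt[T](sqlKeyName: String, hoodieKeyName: String)

    object Opts {
      val PrimaryKey: SqlOpt[String] = SqlOpt("primaryKey", "hoodie.datasource.write.recordkey.field")
      val TableType: SqlOpt[String] = SqlOpt("type", "hoodie.datasource.write.table.type")

      // Collect every SqlOpt-typed field declared on this object, exactly as
      // the diff does for HoodieSQLOption fields.
      lazy val keyMap: Map[String, String] = Opts.getClass.getDeclaredFields
        .filter(f => f.getType == classOf[SqlOpt[_]])
        .map { f => f.setAccessible(true); f.get(Opts).asInstanceOf[SqlOpt[_]] }
        .map(o => o.sqlKeyName -> o.hoodieKeyName)
        .toMap
    }

    // Opts.keyMap("primaryKey") yields the mapped write-config key, and any
    // newly declared SqlOpt field is picked up without further registration.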
@@ -100,52 +103,54 @@
       .toMap
   }

-  private lazy val tableConfigKeyToSqlKey: Map[String, String] =
-    keyTableConfigMapping.map(f => f._2 -> f._1)
+  private lazy val tableConfigKeyToSqlOptionKey: Map[String, String] =
+    sqlOptionKeyToTableConfigKey.map(f => f._2 -> f._1)

   /**
    * Mapping of the short sql value to the hoodie's config value
    */
-  private val valueMapping: Map[String, String] = Map (
+  private val sqlOptionValueToWriteConfigValue: Map[String, String] = Map (
     SQL_VALUE_TABLE_TYPE_COW -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
     SQL_VALUE_TABLE_TYPE_MOR -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
   )

-  private lazy val reverseValueMapping = valueMapping.map(f => f._2 -> f._1)
+  private lazy val writeConfigValueToSqlOptionValue = sqlOptionValueToWriteConfigValue.map(f => f._2 -> f._1)

   def withDefaultSqlOptions(options: Map[String, String]): Map[String, String] = defaultSqlOptions ++ options

   /**
-   * Mapping the sql's short name key/value in the options to the hoodie's config key/value.
-   * @param options
-   * @return
+   * Map SQL options to data source write configs.
    */
-  def mappingSqlOptionToHoodieParam(options: Map[String, String]): Map[String, String] = {
+  def mapSqlOptionsToDataSourceWriteConfigs(options: Map[String, String]): Map[String, String] = {
     options.map (kv =>
-      keyMapping.getOrElse(kv._1, kv._1) -> valueMapping.getOrElse(kv._2, kv._2))
+      sqlOptionKeyToWriteConfigKey.getOrElse(kv._1, kv._1) -> sqlOptionValueToWriteConfigValue.getOrElse(kv._2, kv._2))
   }

   /**
-   * Mapping the sql options to the hoodie table config which used to store to the hoodie
-   * .properties when create the table.
-   * @param options
-   * @return
+   * Mapping the data source write configs to SQL options.
+   */
+  def mapDataSourceWriteOptionsToSqlOptions(options: Map[String, String]): Map[String, String] = {
+    options.map(kv => writeConfigKeyToSqlOptionKey.getOrElse(kv._1, kv._1) -> writeConfigValueToSqlOptionValue.getOrElse(kv._2, kv._2))
+  }
+
+  /**
+   * Map SQL options to table configs.
    */
-  def mappingSqlOptionToTableConfig(options: Map[String, String]): Map[String, String] = {
+  def mapSqlOptionsToTableConfigs(options: Map[String, String]): Map[String, String] = {
     options.map { case (k, v) =>
-      if (keyTableConfigMapping.contains(k)) {
-        keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v)
+      if (sqlOptionKeyToTableConfigKey.contains(k)) {
+        sqlOptionKeyToTableConfigKey(k) -> sqlOptionValueToWriteConfigValue.getOrElse(v, v)
       } else {
         k -> v
       }
     }
   }

   /**
-   * Mapping the table config (loaded from the hoodie.properties) to the sql options.
+   * Map table configs to SQL options.
    */
-  def mappingTableConfigToSqlOption(options: Map[String, String]): Map[String, String] = {
-    options.map(kv => tableConfigKeyToSqlKey.getOrElse(kv._1, kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2))
+  def mapTableConfigsToSqlOptions(options: Map[String, String]): Map[String, String] = {
+    options.map(kv => tableConfigKeyToSqlOptionKey.getOrElse(kv._1, kv._1) -> writeConfigValueToSqlOptionValue.getOrElse(kv._2, kv._2))
   }

   val defaultSqlOptions: Map[String, String] = {
@@ -163,7 +168,7 @@
    * @return
    */
   def getPrimaryColumns(options: Map[String, String]): Array[String] = {
-    val params = mappingSqlOptionToHoodieParam(options)
+    val params = mapSqlOptionsToDataSourceWriteConfigs(options)
     params.get(DataSourceWriteOptions.RECORDKEY_FIELD.key)
       .map(_.split(",").filter(_.nonEmpty))
       .getOrElse(Array.empty)
@@ -175,24 +180,24 @@
    * @return
    */
   def getTableType(options: Map[String, String]): String = {
-    val params = mappingSqlOptionToHoodieParam(options)
+    val params = mapSqlOptionsToDataSourceWriteConfigs(options)
     params.getOrElse(DataSourceWriteOptions.TABLE_TYPE.key,
       DataSourceWriteOptions.TABLE_TYPE.defaultValue)
   }

   def getPreCombineField(options: Map[String, String]): Option[String] = {
-    val params = mappingSqlOptionToHoodieParam(options)
+    val params = mapSqlOptionsToDataSourceWriteConfigs(options)
     params.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key).filter(_.nonEmpty)
   }

   def deleteHoodieOptions(options: Map[String, String]): Map[String, String] = {
-    options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv => keyMapping.contains(kv._1))
+    options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv => sqlOptionKeyToWriteConfigKey.contains(kv._1))
   }

   // extract primaryKey, preCombineField, type options
   def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
-    val sqlOptions = mappingTableConfigToSqlOption(options)
-    val targetOptions = keyMapping.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_MERGER_STRATEGY.sqlKeyName)
+    val sqlOptions = mapTableConfigsToSqlOptions(options)
+    val targetOptions = sqlOptionKeyToWriteConfigKey.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_MERGER_STRATEGY.sqlKeyName)
     sqlOptions.filterKeys(targetOptions.contains)
   }
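For orientation, a hypothetical round trip through the renamed helpers. The exact table-config keys come from HoodieTableConfig; the values in the comments are assumptions based on the COW/MOR value mapping in this diff and the test further below:

    val sqlOptions = Map("primaryKey" -> "id,addr", "type" -> "mor")
    val tableConfigs = HoodieOptionConfig.mapSqlOptionsToTableConfigs(sqlOptions)
    // e.g. Map("hoodie.table.recordkey.fields" -> "id,addr",
    //          "hoodie.table.type"             -> "MERGE_ON_READ")
    val roundTrip = HoodieOptionConfig.mapTableConfigsToSqlOptions(tableConfigs)
    // Both directions pass unrecognized keys through unchanged, so recognized
    // options round-trip back to their SQL-level keys and values.
    assert(roundTrip == sqlOptions)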
HoodieSqlCommonUtils.scala

@@ -263,7 +263,7 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
   def withSparkConf(spark: SparkSession, options: Map[String, String])
                    (baseConfig: Map[String, String] = Map.empty): Map[String, String] = {
     baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // Table options has the highest priority
-      (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
+      (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(options))
       .filterKeys(isHoodieConfigKey)
   }
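A small sketch of the filtering step above, assuming isHoodieConfigKey keeps hoodie.-prefixed keys (key names illustrative):

    // Only Hudi-related keys from the session-conf/options layer survive;
    // unrelated spark.* settings are not propagated into the write config.
    val sessionLayer = Map(
      "hoodie.insert.shuffle.parallelism" -> "8",
      "spark.sql.shuffle.partitions" -> "200")
    val hoodieOnly = sessionLayer.filterKeys(_.startsWith("hoodie.")).toMap
    assert(hoodieOnly == Map("hoodie.insert.shuffle.parallelism" -> "8"))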
TestHoodieOptionConfig.scala

@@ -56,7 +56,7 @@ class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
"hoodie.index.type" -> "INMEMORY",
"hoodie.compact.inline" -> "true"
)
val tableConfigs = HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
val tableConfigs = HoodieOptionConfig.mapSqlOptionsToTableConfigs(sqlOptions)

assertTrue(tableConfigs.size == 5)
assertTrue(tableConfigs(HoodieTableConfig.RECORDKEY_FIELDS.key) == "id,addr")