diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 7bd6dd2244f4f..826c0090b212c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -23,14 +23,15 @@ import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hudi.HoodieOptionConfig
-import org.apache.spark.sql.hudi.HoodieOptionConfig.SQL_KEY_TABLE_PRIMARY_KEY
+import org.apache.spark.sql.hudi.HoodieOptionConfig._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -216,20 +217,21 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
   private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = {
     val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
     val globalTableConfigs = mappingSparkDatasourceConfigsToTableConfigs(globalProps)
-    val globalSqlOptions = HoodieOptionConfig.mappingTableConfigToSqlOption(globalTableConfigs)
+    val globalSqlOptions = mapTableConfigsToSqlOptions(globalTableConfigs)
 
-    val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(globalSqlOptions ++ catalogProperties)
+    val sqlOptions = withDefaultSqlOptions(globalSqlOptions ++
+      mapDataSourceWriteOptionsToSqlOptions(catalogProperties) ++ catalogProperties)
 
     // get final schema and parameters
     val (finalSchema, tableConfigs) = (table.tableType, hoodieTableExists) match {
       case (CatalogTableType.EXTERNAL, true) =>
         val existingTableConfig = tableConfig.getProps.asScala.toMap
         val currentTableConfig = globalTableConfigs ++ existingTableConfig
-        val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(catalogProperties)
+        val catalogTableProps = mapSqlOptionsToTableConfigs(catalogProperties)
         validateTableConfig(spark, catalogTableProps, convertMapToHoodieConfig(existingTableConfig))
 
         val options = extraTableConfig(hoodieTableExists, currentTableConfig) ++
-          HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) ++ currentTableConfig
+          mapSqlOptionsToTableConfigs(sqlOptions) ++ currentTableConfig
 
         val schemaFromMetaOpt = loadTableSchemaByMetaClient()
         val schema = if (schemaFromMetaOpt.nonEmpty) {
@@ -243,11 +245,11 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
         (schema, options)
 
       case (_, false) =>
-        ValidationUtils.checkArgument(table.schema.nonEmpty,
+        checkArgument(table.schema.nonEmpty,
           s"Missing schema for Create Table: $catalogTableName")
         val schema = table.schema
         val options = extraTableConfig(tableExists = false, globalTableConfigs) ++
-          HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
+          mapSqlOptionsToTableConfigs(sqlOptions)
         (addMetaFields(schema), options)
 
       case (CatalogTableType.MANAGED, true) =>
@@ -255,7 +257,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
           s". The associated location('$tableLocation') already exists.")
     }
     HoodieOptionConfig.validateTable(spark, finalSchema,
-      HoodieOptionConfig.mappingTableConfigToSqlOption(tableConfigs))
+      mapTableConfigsToSqlOptions(tableConfigs))
 
     val resolver = spark.sessionState.conf.resolver
     val dataSchema = finalSchema.filterNot { f =>
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index df75b60f5494e..7219af6c4af0e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -79,7 +79,7 @@ object HoodieOptionConfig {
   /**
    * The mapping of the sql short name key to the hoodie's config key.
    */
-  private lazy val keyMapping: Map[String, String] = {
+  private lazy val sqlOptionKeyToWriteConfigKey: Map[String, String] = {
     HoodieOptionConfig.getClass.getDeclaredFields
       .filter(f => f.getType == classOf[HoodieSQLOption[_]])
       .map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]})
@@ -87,11 +87,14 @@ object HoodieOptionConfig {
       .toMap
   }
 
+  private lazy val writeConfigKeyToSqlOptionKey: Map[String, String] =
+    sqlOptionKeyToWriteConfigKey.map(f => f._2 -> f._1)
+
   /**
    * The mapping of the sql short name key to the hoodie table config key
    * defined in HoodieTableConfig.
    */
-  private lazy val keyTableConfigMapping: Map[String, String] = {
+  private lazy val sqlOptionKeyToTableConfigKey: Map[String, String] = {
     HoodieOptionConfig.getClass.getDeclaredFields
       .filter(f => f.getType == classOf[HoodieSQLOption[_]])
       .map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]})
@@ -100,41 +103,43 @@ object HoodieOptionConfig {
       .toMap
   }
 
-  private lazy val tableConfigKeyToSqlKey: Map[String, String] =
-    keyTableConfigMapping.map(f => f._2 -> f._1)
+  private lazy val tableConfigKeyToSqlOptionKey: Map[String, String] =
+    sqlOptionKeyToTableConfigKey.map(f => f._2 -> f._1)
 
   /**
    * Mapping of the short sql value to the hoodie's config value
    */
-  private val valueMapping: Map[String, String] = Map (
+  private val sqlOptionValueToWriteConfigValue: Map[String, String] = Map (
     SQL_VALUE_TABLE_TYPE_COW -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
     SQL_VALUE_TABLE_TYPE_MOR -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
   )
 
-  private lazy val reverseValueMapping = valueMapping.map(f => f._2 -> f._1)
+  private lazy val writeConfigValueToSqlOptionValue = sqlOptionValueToWriteConfigValue.map(f => f._2 -> f._1)
 
   def withDefaultSqlOptions(options: Map[String, String]): Map[String, String] = defaultSqlOptions ++ options
 
   /**
-   * Mapping the sql's short name key/value in the options to the hoodie's config key/value.
-   * @param options
-   * @return
+   * Map SQL options to data source write configs.
    */
-  def mappingSqlOptionToHoodieParam(options: Map[String, String]): Map[String, String] = {
+  def mapSqlOptionsToDataSourceWriteConfigs(options: Map[String, String]): Map[String, String] = {
     options.map (kv =>
-      keyMapping.getOrElse(kv._1, kv._1) -> valueMapping.getOrElse(kv._2, kv._2))
+      sqlOptionKeyToWriteConfigKey.getOrElse(kv._1, kv._1) -> sqlOptionValueToWriteConfigValue.getOrElse(kv._2, kv._2))
   }
 
   /**
-   * Mapping the sql options to the hoodie table config which used to store to the hoodie
-   * .properties when create the table.
-   * @param options
-   * @return
+   * Mapping the data source write configs to SQL options.
+   */
+  def mapDataSourceWriteOptionsToSqlOptions(options: Map[String, String]): Map[String, String] = {
+    options.map(kv => writeConfigKeyToSqlOptionKey.getOrElse(kv._1, kv._1) -> writeConfigValueToSqlOptionValue.getOrElse(kv._2, kv._2))
+  }
+
+  /**
+   * Map SQL options to table configs.
    */
-  def mappingSqlOptionToTableConfig(options: Map[String, String]): Map[String, String] = {
+  def mapSqlOptionsToTableConfigs(options: Map[String, String]): Map[String, String] = {
     options.map { case (k, v) =>
-      if (keyTableConfigMapping.contains(k)) {
-        keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v)
+      if (sqlOptionKeyToTableConfigKey.contains(k)) {
+        sqlOptionKeyToTableConfigKey(k) -> sqlOptionValueToWriteConfigValue.getOrElse(v, v)
       } else {
         k -> v
       }
@@ -142,10 +147,10 @@ object HoodieOptionConfig {
   }
 
   /**
-   * Mapping the table config (loaded from the hoodie.properties) to the sql options.
+   * Map table configs to SQL options.
    */
-  def mappingTableConfigToSqlOption(options: Map[String, String]): Map[String, String] = {
-    options.map(kv => tableConfigKeyToSqlKey.getOrElse(kv._1, kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2))
+  def mapTableConfigsToSqlOptions(options: Map[String, String]): Map[String, String] = {
+    options.map(kv => tableConfigKeyToSqlOptionKey.getOrElse(kv._1, kv._1) -> writeConfigValueToSqlOptionValue.getOrElse(kv._2, kv._2))
   }
 
   val defaultSqlOptions: Map[String, String] = {
@@ -163,7 +168,7 @@ object HoodieOptionConfig {
    * @return
    */
   def getPrimaryColumns(options: Map[String, String]): Array[String] = {
-    val params = mappingSqlOptionToHoodieParam(options)
+    val params = mapSqlOptionsToDataSourceWriteConfigs(options)
     params.get(DataSourceWriteOptions.RECORDKEY_FIELD.key)
       .map(_.split(",").filter(_.nonEmpty))
       .getOrElse(Array.empty)
@@ -175,24 +180,24 @@ object HoodieOptionConfig {
    * @return
    */
   def getTableType(options: Map[String, String]): String = {
-    val params = mappingSqlOptionToHoodieParam(options)
+    val params = mapSqlOptionsToDataSourceWriteConfigs(options)
     params.getOrElse(DataSourceWriteOptions.TABLE_TYPE.key,
       DataSourceWriteOptions.TABLE_TYPE.defaultValue)
   }
 
   def getPreCombineField(options: Map[String, String]): Option[String] = {
-    val params = mappingSqlOptionToHoodieParam(options)
+    val params = mapSqlOptionsToDataSourceWriteConfigs(options)
     params.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key).filter(_.nonEmpty)
   }
 
   def deleteHoodieOptions(options: Map[String, String]): Map[String, String] = {
-    options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv => keyMapping.contains(kv._1))
+    options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv => sqlOptionKeyToWriteConfigKey.contains(kv._1))
   }
 
   // extract primaryKey, preCombineField, type options
   def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
-    val sqlOptions = mappingTableConfigToSqlOption(options)
-    val targetOptions = keyMapping.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_MERGER_STRATEGY.sqlKeyName)
+    val sqlOptions = mapTableConfigsToSqlOptions(options)
+    val targetOptions = sqlOptionKeyToWriteConfigKey.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_MERGER_STRATEGY.sqlKeyName)
     sqlOptions.filterKeys(targetOptions.contains)
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index aff65672c5203..eb26ef52d34d8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -263,7 +263,7 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
   def withSparkConf(spark: SparkSession, options: Map[String, String])
                    (baseConfig: Map[String, String] = Map.empty): Map[String, String] = {
     baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // Table options has the highest priority
-      (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options))
+      (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs(options))
       .filterKeys(isHoodieConfigKey)
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index ef84eb2c89c0a..2cb9c98878b29 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -56,7 +56,7 @@ class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
       "hoodie.index.type" -> "INMEMORY",
       "hoodie.compact.inline" -> "true"
     )
-    val tableConfigs = HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
+    val tableConfigs = HoodieOptionConfig.mapSqlOptionsToTableConfigs(sqlOptions)
     assertTrue(tableConfigs.size == 5)
     assertTrue(tableConfigs(HoodieTableConfig.RECORDKEY_FIELDS.key) == "id,addr")