diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala index 7da69b2c0bb03..bb682cf9b5ffe 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala @@ -162,18 +162,6 @@ object HoodieOptionConfig { .toMap } - /** - * Get the primary key from the table options. - * @param options - * @return - */ - def getPrimaryColumns(options: Map[String, String]): Array[String] = { - val params = mapSqlOptionsToDataSourceWriteConfigs(options) - params.get(DataSourceWriteOptions.RECORDKEY_FIELD.key) - .map(_.split(",").filter(_.nonEmpty)) - .getOrElse(Array.empty) - } - /** * Get the table type from the table options. * @param options diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index bf6b6509b9550..77ff939cf2623 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -17,8 +17,10 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceWriteOptions._ -import org.apache.hudi.common.config.TypedProperties +import org.apache.hudi.HoodieConversionUtils.toProperties +import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties} import org.apache.hudi.common.model.{OverwriteWithLatestAvroPayload, WriteOperationType} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME @@ -28,12 +30,13 @@ import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, MultiPartKeys import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.hudi.sql.InsertMode import org.apache.hudi.sync.common.HoodieSyncConfig -import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.hive.HiveExternalCatalog -import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isUsingHiveCatalog, withSparkConf} +import org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isHoodieConfigKey, isUsingHiveCatalog} +import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, withCombinedOptions} import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -45,7 +48,6 @@ trait ProvidesHoodieConfig extends Logging { def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = { val sparkSession: SparkSession = hoodieCatalogTable.spark - val catalogProperties = hoodieCatalogTable.catalogProperties val tableConfig = hoodieCatalogTable.tableConfig // NOTE: Here we fallback to "" to make sure that null value is not overridden with @@ -56,17 +58,14 @@ trait ProvidesHoodieConfig extends Logging { 
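The first hunk above removes the `HoodieOptionConfig.getPrimaryColumns` helper. Should a caller need the record-key columns again, an equivalent can be derived from the mapped write configs. A minimal sketch, not part of the patch, assuming `mapSqlOptionsToDataSourceWriteConfigs` stays accessible from the calling package:

```scala
import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs

// Equivalent of the removed getPrimaryColumns: map SQL options to data source
// write configs, then split the record-key field list and drop empty entries.
def primaryColumns(sqlOptions: Map[String, String]): Array[String] =
  mapSqlOptionsToDataSourceWriteConfigs(sqlOptions)
    .get(DataSourceWriteOptions.RECORDKEY_FIELD.key)
    .map(_.split(",").filter(_.nonEmpty))
    .getOrElse(Array.empty)
```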
require(hoodieCatalogTable.primaryKeys.nonEmpty, s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator") - val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf) + val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) - val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) - - withSparkConf(sparkSession, catalogProperties) { + withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) { Map.apply( "path" -> hoodieCatalogTable.tableLocation, RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), TBL_NAME.key -> hoodieCatalogTable.tableName, PRECOMBINE_FIELD.key -> preCombineField, - RECORD_MERGER_IMPLS.key -> hoodieProps.getString(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue), HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning, KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, @@ -81,10 +80,8 @@ trait ProvidesHoodieConfig extends Logging { HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp, HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS), HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString, - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"), SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL ) - .filter { case(_, v) => v != null } } } @@ -109,12 +106,9 @@ trait ProvidesHoodieConfig extends Logging { val path = hoodieCatalogTable.tableLocation val tableType = hoodieCatalogTable.tableTypeName val tableConfig = hoodieCatalogTable.tableConfig - val catalogProperties = hoodieCatalogTable.catalogProperties - - val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf, extraOptions) - val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) - val parameters = withSparkConf(sparkSession, catalogProperties)() + val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, extraOptions) + val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig, extraOptions) val partitionFieldsStr = hoodieCatalogTable.partitionFields.mkString(",") @@ -128,18 +122,21 @@ trait ProvidesHoodieConfig extends Logging { val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName) .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName) - val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key, - DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean || - parameters.get(DataSourceWriteOptions.OPERATION.key).exists(_.equalsIgnoreCase(WriteOperationType.BULK_INSERT.value)) + val enableBulkInsert = combinedOpts.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key, + DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean val dropDuplicate = sparkSession.conf .getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean - val insertMode = InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key, + val insertMode = 
InsertMode.of(combinedOpts.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key, DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue())) val isNonStrictMode = insertMode == InsertMode.NON_STRICT val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty val hasPrecombineColumn = hoodieCatalogTable.preCombineKey.nonEmpty - val operation = + + // NOTE: Target operation could be overridden by the user, therefore if it has been provided as an input + // we'd prefer that value over auto-deduced operation. Otherwise, we deduce target operation type + val operationOverride = combinedOpts.get(DataSourceWriteOptions.OPERATION.key) + val operation = operationOverride.getOrElse { (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, isNonStrictMode, isPartitionedTable) match { case (true, _, _, _, false, _) => throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.") @@ -161,6 +158,7 @@ trait ProvidesHoodieConfig extends Logging { // for the rest case, use the insert operation case _ => INSERT_OPERATION_OPT_VAL } + } val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL && tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) { @@ -176,10 +174,7 @@ trait ProvidesHoodieConfig extends Logging { classOf[OverwriteWithLatestAvroPayload].getCanonicalName } - - logInfo(s"Insert statement use write operation type: $operation, payloadClass: $payloadClassName") - - withSparkConf(sparkSession, catalogProperties) { + withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) { Map( "path" -> path, TABLE_TYPE.key -> tableType, @@ -193,8 +188,6 @@ trait ProvidesHoodieConfig extends Logging { PRECOMBINE_FIELD.key -> preCombineField, PARTITIONPATH_FIELD.key -> partitionFieldsStr, PAYLOAD_CLASS_NAME.key -> payloadClassName, - RECORD_MERGER_IMPLS.key -> hoodieProps.getString(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue), - ENABLE_ROW_WRITER.key -> enableBulkInsert.toString, HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn), HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr, HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key), @@ -204,26 +197,20 @@ trait ProvidesHoodieConfig extends Logging { HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME), HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString, HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS), - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"), SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL ) - .filter { case (_, v) => v != null } } } - def buildHoodieDropPartitionsConfig( - sparkSession: SparkSession, - hoodieCatalogTable: HoodieCatalogTable, - partitionsToDrop: String): Map[String, String] = { + def buildHoodieDropPartitionsConfig(sparkSession: SparkSession, + hoodieCatalogTable: HoodieCatalogTable, + partitionsToDrop: String): Map[String, String] = { val partitionFields = 
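With the `operationOverride` branch introduced above, an explicitly configured write operation now takes precedence over the deduced one, and bulk insert is driven solely by `hoodie.sql.bulk.insert.enable` instead of being implied by the operation setting. A hedged illustration of how the two knobs reach `buildHoodieInsertConfig` through the session conf (key names taken from `DataSourceWriteOptions`; the table name is hypothetical):

```scala
// Explicit operation override: picked up by combineOptions from the Spark SQL conf
// and returned as-is by operationOverride.getOrElse { ... }.
spark.sql("set hoodie.datasource.write.operation = bulk_insert")

// Alternatively, keep the deduced operation but opt into bulk insert via the SQL flag.
spark.sql("set hoodie.sql.bulk.insert.enable = true")

spark.sql("insert into h0 values (1, 'a1', 10, '2021-07-18')")
```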
hoodieCatalogTable.partitionFields.mkString(",") - val catalogProperties = hoodieCatalogTable.catalogProperties val tableConfig = hoodieCatalogTable.tableConfig - val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf) - val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) + val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) - withSparkConf(sparkSession, catalogProperties) { + withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) { Map( "path" -> hoodieCatalogTable.tableLocation, TBL_NAME.key -> hoodieCatalogTable.tableName, @@ -242,14 +229,12 @@ trait ProvidesHoodieConfig extends Logging { HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields, HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS) ) - .filter { case (_, v) => v != null } } } def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable, sparkSession: SparkSession): Map[String, String] = { val path = hoodieCatalogTable.tableLocation - val catalogProperties = hoodieCatalogTable.catalogProperties val tableConfig = hoodieCatalogTable.tableConfig val tableSchema = hoodieCatalogTable.tableSchema val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase(Locale.ROOT)) @@ -258,14 +243,9 @@ trait ProvidesHoodieConfig extends Logging { assert(hoodieCatalogTable.primaryKeys.nonEmpty, s"There are no primary key defined in table ${hoodieCatalogTable.table.identifier}, cannot execute delete operation") - val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf) - val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) + val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) - val options = hoodieCatalogTable.catalogProperties - val enableHive = isUsingHiveCatalog(sparkSession) - val partitionFields = hoodieCatalogTable.partitionFields.mkString(",") - - withSparkConf(sparkSession, options) { + withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) { Map( "path" -> path, RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), @@ -282,24 +262,20 @@ trait ProvidesHoodieConfig extends Logging { HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME), HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME), HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString, - HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields, + HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> hoodieCatalogTable.partitionFields.mkString(","), HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS), - HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"), SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL ) } } - def getHoodieProps(catalogProperties: Map[String, String], tableConfig: HoodieTableConfig, conf: SQLConf, extraOptions: Map[String, String] = Map.empty): TypedProperties = { - val options: Map[String, String] = catalogProperties ++ tableConfig.getProps.asScala.toMap ++ 
conf.getAllConfs ++ extraOptions - val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(options) - hoodieConfig.getProps - } + def buildHiveSyncConfig(sparkSession: SparkSession, + hoodieCatalogTable: HoodieCatalogTable, + tableConfig: HoodieTableConfig, + extraOptions: Map[String, String] = Map.empty): HiveSyncConfig = { + val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, extraOptions) + val props = new TypedProperties(toProperties(combinedOpts)) - def buildHiveSyncConfig( - props: TypedProperties, - hoodieCatalogTable: HoodieCatalogTable, - sparkSession: SparkSession = SparkSession.active): HiveSyncConfig = { // Enable the hive sync by default if spark have enable the hive metastore. val enableHive = isUsingHiveCatalog(sparkSession) val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig(props) @@ -325,3 +301,39 @@ trait ProvidesHoodieConfig extends Logging { hiveSyncConfig } } + +object ProvidesHoodieConfig { + + def filterNullValues(opts: Map[String, String]): Map[String, String] = + opts.filter { case (_, v) => v != null } + + def withCombinedOptions(catalogTable: HoodieCatalogTable, + tableConfig: HoodieTableConfig, + sqlConf: SQLConf)(optionOverrides: Map[String, String] = Map.empty): Map[String, String] = { + combineOptions(catalogTable, tableConfig, sqlConf, optionOverrides) + } + + private def combineOptions(catalogTable: HoodieCatalogTable, + tableConfig: HoodieTableConfig, + sqlConf: SQLConf, + optionOverrides: Map[String, String] = Map.empty): Map[String, String] = { + // NOTE: Properties are merged in the following order of priority (first has the highest priority, last has the + // lowest, which is inverse to the ordering in the code): + // 1. (Extra) Option overrides + // 2. Spark SQL configs + // 3. Persisted Hudi's Table configs + // 4. Table's properties in Spark Catalog + // 5. 
Global DFS properties + DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++ + // NOTE: Catalog table provided t/h `TBLPROPERTIES` clause might contain Spark SQL specific + // properties that need to be mapped into Hudi's conventional ones + mapSqlOptionsToDataSourceWriteConfigs(catalogTable.catalogProperties) ++ + tableConfig.getProps.asScala.toMap ++ + filterHoodieConfigs(sqlConf.getAllConfs) ++ + filterNullValues(optionOverrides) + } + + private def filterHoodieConfigs(opts: Map[String, String]): Map[String, String] = + opts.filterKeys(isHoodieConfigKey) + +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala index 1f8d009530146..0e19514e28c7b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.HiveSyncConfigHolder import org.apache.hudi.sql.InsertMode import org.apache.hudi.sync.common.util.ConfigUtils @@ -92,13 +93,11 @@ case class CreateHoodieTableAsSelectCommand( hoodieCatalogTable.initHoodieTable() val tableProperties = hoodieCatalogTable.catalogProperties - // NOTE: Users might be specifying write-configuration (inadvertently) as options or table properties - // in CTAS, therefore we need to make sure that these are appropriately propagated to the - // write operation - val options = tableProperties ++ Map( + val options = Map( HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString, HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tableProperties.asJava), HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(updatedTable.properties.asJava), + HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> "false", DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(), DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true" ) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index 7befb97c7b831..9099c7225e0f9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -20,11 +20,10 @@ package org.apache.spark.sql.hudi.command import org.apache.avro.Schema import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema import org.apache.hudi.DataSourceWriteOptions._ -import org.apache.hudi.common.util.StringUtils import org.apache.hudi.common.model.HoodieAvroRecordMerger +import org.apache.hudi.common.util.StringUtils import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TBL_NAME} -import 
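The precedence in `combineOptions` above follows directly from Scala's `Map#++`: on duplicate keys the right-hand operand wins, so later entries in the chain override earlier ones. A toy illustration of the merge semantics only, using the insert parallelism key whose hard-coded 200 default these builders no longer pin:

```scala
// Right-most map wins on key clashes, mirroring the priority order in combineOptions:
// global DFS props < catalog props < table config < Spark SQL conf < explicit overrides.
val globalProps  = Map("hoodie.insert.shuffle.parallelism" -> "1500")
val sqlConfProps = Map("hoodie.insert.shuffle.parallelism" -> "300")
val overrides    = Map("hoodie.insert.shuffle.parallelism" -> "64")

val combined = globalProps ++ sqlConfProps ++ overrides
assert(combined("hoodie.insert.shuffle.parallelism") == "64")
```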
org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.exception.HoodieException import org.apache.hudi.hive.HiveSyncConfigHolder import org.apache.hudi.sync.common.HoodieSyncConfig @@ -38,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId +import org.apache.spark.sql.hudi.ProvidesHoodieConfig.withCombinedOptions import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference import org.apache.spark.sql.hudi.command.payload.ExpressionPayload import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._ @@ -491,8 +491,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie val targetTableDb = targetTableIdentify.database.getOrElse("default") val targetTableName = targetTableIdentify.identifier val path = hoodieCatalogTable.tableLocation - // force to use ExpressionPayload as WRITE_PAYLOAD_CLASS_NAME in MergeIntoHoodieTableCommand - val catalogProperties = hoodieCatalogTable.catalogProperties + (PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName) val tableConfig = hoodieCatalogTable.tableConfig val tableSchema = hoodieCatalogTable.tableSchema val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase) @@ -503,10 +501,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie // TODO(HUDI-3456) clean up val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("") - val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf) - val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) + val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) - withSparkConf(sparkSession, catalogProperties) { + withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) { Map( "path" -> path, RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp, @@ -525,10 +522,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString, HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp, HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS), - HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), // set the default parallelism to 200 for sql - HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"), - HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"), SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL, + PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName, // NOTE: We have to explicitly override following configs to make sure no schema validation is performed // as schema of the incoming dataset might be diverging from the table's schema (full schemas' @@ -539,7 +534,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie RECONCILE_SCHEMA.key -> "false", 
"hoodie.datasource.write.schema.canonicalize" -> "false" ) - .filter { case (_, v) => v != null } } } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala index f237a1dcbb432..513f40a4c8c57 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala @@ -84,8 +84,7 @@ class HiveSyncProcedure extends BaseProcedure with ProcedureBuilder hiveConf.addResource(hadoopConf) val tableConfig = hoodieCatalogTable.tableConfig - val hoodieProps = getHoodieProps(hoodieCatalogTable.catalogProperties, tableConfig, sqlConf) - val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) + val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) var hiveSyncTool: HiveSyncTool = null try { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala index cf37f7436fd2b..6beed5cfc39a1 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala @@ -23,11 +23,14 @@ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.config.HoodieStorageConfig import org.apache.hudi.common.model.HoodieAvroRecordMerger import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.exception.ExceptionUtil.getRootCause import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex import org.apache.log4j.Level import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.checkMessageContains import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.util.Utils import org.joda.time.DateTimeZone @@ -144,8 +147,11 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { try { spark.sql(sql) } catch { - case e: Throwable if e.getMessage.trim.contains(errorMsg.trim) => hasException = true - case f: Throwable => fail("Exception should contain: " + errorMsg + ", error message: " + f.getMessage, f) + case e: Throwable if checkMessageContains(e, errorMsg) || checkMessageContains(getRootCause(e), errorMsg) => + hasException = true + + case f: Throwable => + fail("Exception should contain: " + errorMsg + ", error message: " + f.getMessage, f) } assertResult(true)(hasException) } @@ -219,3 +225,19 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { } } } + +object HoodieSparkSqlTestBase { + + def getLastCommitMetadata(spark: SparkSession, tablePath: String) = { + val metaClient = HoodieTableMetaClient.builder() + .setConf(spark.sparkContext.hadoopConfiguration) + .setBasePath(tablePath) + .build() + + metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getRight + } + + private def checkMessageContains(e: Throwable, text: String): Boolean = + e.getMessage.trim.contains(text.trim) + +} diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala index 9fc910c4f1f16..120c6adb6cdf4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieSparkUtils -import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName import org.apache.hudi.config.HoodieWriteConfig @@ -28,6 +28,7 @@ import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata import org.apache.spark.sql.types._ import org.junit.jupiter.api.Assertions.assertFalse @@ -299,6 +300,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase { | AS | select 1 as id, 'a1' as name, 10 as price, 1000 as ts """.stripMargin) + + assertResult(WriteOperationType.BULK_INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName1").getOperationType + } checkAnswer(s"select id, name, price, ts from $tableName1")( Seq(1, "a1", 10.0, 1000) ) @@ -318,6 +323,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase { | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt """.stripMargin ) + + assertResult(WriteOperationType.BULK_INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName2").getOperationType + } checkAnswer(s"select id, name, price, dt from $tableName2")( Seq(1, "a1", 10, "2021-04-01") ) @@ -356,9 +365,14 @@ class TestCreateTable extends HoodieSparkSqlTestBase { | price """.stripMargin ) + + assertResult(WriteOperationType.BULK_INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName3").getOperationType + } checkAnswer(s"select id, name, price, cast(dt as string) from $tableName3")( Seq(1, "a1", 10, "2021-05-06 00:00:00") ) + // Create table with date type partition val tableName4 = generateTableName spark.sql( @@ -375,6 +389,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase { | price """.stripMargin ) + + assertResult(WriteOperationType.BULK_INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName4").getOperationType + } checkAnswer(s"select id, name, price, cast(dt as string) from $tableName4")( Seq(1, "a1", 10, "2021-05-06") ) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 868a96ebc2cf7..a227a20b8b6d0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -22,11 +22,13 @@ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} 
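The `getOperationType` assertions added in these tests go through the new `getLastCommitMetadata` helper in `HoodieSparkSqlTestBase`; outside the test base the same check can be made against the meta client directly. A sketch under the assumption of an active `spark` session and a hypothetical local table path:

```scala
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.common.table.HoodieTableMetaClient

// Build a meta client for the table and read the operation type of the last
// commit carrying valid data (same call chain as the test helper).
val metaClient = HoodieTableMetaClient.builder()
  .setConf(spark.sparkContext.hadoopConfiguration)
  .setBasePath("/tmp/hudi/h0")  // hypothetical table location
  .build()

val lastOperation: WriteOperationType = metaClient.getActiveTimeline
  .getLastCommitMetadataWithValidData
  .get.getRight
  .getOperationType

// e.g., after a CTAS like the ones asserted above
assert(lastOperation == WriteOperationType.BULK_INSERT)
```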
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType +import org.apache.hudi.common.model.WriteOperationType import org.apache.spark.sql.hudi.command.HoodieSparkValidateDuplicateKeyRecordMerger import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieDuplicateKeyException import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata import java.io.File @@ -725,13 +727,20 @@ class TestInsertTable extends HoodieSparkSqlTestBase { spark.sql("set hoodie.sql.bulk.insert.enable = true") spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')") + assertResult(WriteOperationType.BULK_INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName").getOperationType + } checkAnswer(s"select id, name, price, dt from $tableName")( Seq(1, "a1", 10.0, "2021-07-18") ) + // Disable the bulk insert spark.sql("set hoodie.sql.bulk.insert.enable = false") spark.sql(s"insert into $tableName values(2, 'a2', 10, '2021-07-18')") + assertResult(WriteOperationType.INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName").getOperationType + } checkAnswer(s"select id, name, price, dt from $tableName order by id")( Seq(1, "a1", 10.0, "2021-07-18"), Seq(2, "a2", 10.0, "2021-07-18")
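For completeness, a hedged end-to-end sketch of the CTAS path these tests exercise: with the new defaults in `CreateHoodieTableAsSelectCommand` (non-strict insert mode, bulk insert enabled, combine-before-insert disabled), the initial write is expected to land as a `BULK_INSERT` commit. Table name and location are hypothetical; the DDL mirrors the tests above:

```scala
spark.sql(
  """
    |create table h_ctas using hudi
    |tblproperties (primaryKey = 'id')
    |location '/tmp/hudi/h_ctas'
    |as select 1 as id, 'a1' as name, 10 as price, 1000 as ts
    |""".stripMargin)

// The operation type of the resulting commit can then be checked with
// HoodieSparkSqlTestBase.getLastCommitMetadata(spark, "/tmp/hudi/h_ctas").getOperationType,
// which the TestCreateTable cases above expect to equal WriteOperationType.BULK_INSERT.
```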