diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 332455ea217ef..dd2f5ab85ce33 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -45,6 +45,7 @@ trait ProvidesHoodieConfig extends Logging {
 
   def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
     val sparkSession: SparkSession = hoodieCatalogTable.spark
+    val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
     val catalogProperties = hoodieCatalogTable.catalogProperties
     val tableConfig = hoodieCatalogTable.tableConfig
 
@@ -71,18 +72,10 @@ trait ProvidesHoodieConfig extends Logging {
         KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
         SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
         OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+        PARTITIONPATH_FIELD.key -> partitionFields,
         HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
-      )
+      ) ++ commonHiveSyncConfig(hiveSyncConfig, partitionFields)
         .filter { case(_, v) => v != null }
     }
   }
@@ -114,7 +107,7 @@ trait ProvidesHoodieConfig extends Logging {
 
     val parameters = withSparkConf(sparkSession, catalogProperties)()
 
-    val partitionFieldsStr = hoodieCatalogTable.partitionFields.mkString(",")
+    val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
 
     // NOTE: Here we fallback to "" to make sure that null value is not overridden with
     // default value ("ts")
@@ -183,22 +176,53 @@ trait ProvidesHoodieConfig extends Logging {
         SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
         RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
         PRECOMBINE_FIELD.key -> preCombineField,
-        PARTITIONPATH_FIELD.key -> partitionFieldsStr,
+        PARTITIONPATH_FIELD.key -> partitionFields,
         PAYLOAD_CLASS_NAME.key -> payloadClassName,
         ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
         HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
         HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"),
         HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
-      )
+      ) ++ commonHiveSyncConfig(hiveSyncConfig, partitionFields)
+        .filter { case (_, v) => v != null }
+    }
+  }
+
+  /**
+   * Build the default config for mergeInto.
+   */
+  def buildMergeIntoConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
+    val sparkSession: SparkSession = hoodieCatalogTable.spark
+    val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
+    val path = hoodieCatalogTable.tableLocation
+
+    val catalogProperties = hoodieCatalogTable.catalogProperties
+    val tableConfig = hoodieCatalogTable.tableConfig
+
+    // NOTE: Here we fallback to "" to make sure that null value is not overridden with
+    // default value ("ts")
+    // TODO(HUDI-3456) clean up
+    val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("")
+
+    val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
+    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
+
+    withSparkConf(sparkSession, catalogProperties) {
+      Map(
+        "path" -> path,
+        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+        PRECOMBINE_FIELD.key -> preCombineField,
+        TBL_NAME.key -> hoodieCatalogTable.tableName,
+        PARTITIONPATH_FIELD.key -> partitionFields,
+        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+        HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), // set the default parallelism to 200 for sql
+        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
+        HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
+        SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
+      ) ++ commonHiveSyncConfig(hiveSyncConfig, partitionFields)
         .filter { case (_, v) => v != null }
     }
   }
@@ -223,16 +247,8 @@ trait ProvidesHoodieConfig extends Logging {
         PARTITIONS_TO_DELETE.key -> partitionsToDrop,
         RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
         PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
-        PARTITIONPATH_FIELD.key -> partitionFields,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
-      )
+        PARTITIONPATH_FIELD.key -> partitionFields
+      ) ++ commonHiveSyncConfig(hiveSyncConfig, partitionFields)
         .filter { case (_, v) => v != null }
     }
   }
@@ -253,7 +269,6 @@ trait ProvidesHoodieConfig extends Logging {
     val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
 
     val options = hoodieCatalogTable.catalogProperties
-    val enableHive = isUsingHiveCatalog(sparkSession)
     val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
 
     withSparkConf(sparkSession, options) {
@@ -266,21 +281,27 @@ trait ProvidesHoodieConfig extends Logging {
         KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
         SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
         OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
+        PARTITIONPATH_FIELD.key -> partitionFields,
         HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
-      )
+      ) ++ commonHiveSyncConfig(hiveSyncConfig, partitionFields)
     }
   }
 
+  def commonHiveSyncConfig(hiveSyncConfig: HiveSyncConfig, partitionFields: String): Map[String, String] = {
+    Map(
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+    )
+
+  }
+
   def getHoodieProps(catalogProperties: Map[String, String], tableConfig: HoodieTableConfig, conf: SQLConf, extraOptions: Map[String, String] = Map.empty): TypedProperties = {
     val options: Map[String, String] = catalogProperties ++ tableConfig.getProps.asScala.toMap ++ conf.getAllConfs ++ extraOptions
     val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(options)
@@ -296,6 +317,7 @@ trait ProvidesHoodieConfig extends Logging {
     val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig(props)
     hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_ENABLED.key, enableHive.toString)
     hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key, enableHive.toString)
+    // The default value of HIVE_SYNC_MODE is HMS. Be careful when modifying
     hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, props.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name()))
     hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_PATH, hoodieCatalogTable.tableLocation)
     hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT, hoodieCatalogTable.baseFileFormat)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 9919062cac70d..52f239dcf4e0c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -21,9 +21,6 @@ import org.apache.avro.Schema
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
-import org.apache.hudi.hive.HiveSyncConfigHolder
-import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport}
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.MatchCast
 import org.apache.spark.sql._
@@ -183,8 +180,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     this.sparkSession = sparkSession
+    // force to use ExpressionPayload as WRITE_PAYLOAD_CLASS_NAME in MergeIntoHoodieTableCommand
+    val payloadClassName = Map(PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName)
     // Create the write parameters
-    val parameters = buildMergeIntoConfig(hoodieCatalogTable)
+    val parameters = buildMergeIntoConfig(hoodieCatalogTable) ++ payloadClassName
 
     executeUpsert(sourceDF, parameters)
     sparkSession.catalog.refreshTable(targetTableIdentify.unquotedString)
@@ -469,57 +468,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       })
     })
   }
-
-  /**
-   * Create the config for hoodie writer.
-   */
-  private def buildMergeIntoConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
-
-    val targetTableDb = targetTableIdentify.database.getOrElse("default")
-    val targetTableName = targetTableIdentify.identifier
-    val path = hoodieCatalogTable.tableLocation
-    // force to use ExpressionPayload as WRITE_PAYLOAD_CLASS_NAME in MergeIntoHoodieTableCommand
-    val catalogProperties = hoodieCatalogTable.catalogProperties + (PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName)
-    val tableConfig = hoodieCatalogTable.tableConfig
-    val tableSchema = hoodieCatalogTable.tableSchema
-    val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
-    val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
-
-    // NOTE: Here we fallback to "" to make sure that null value is not overridden with
-    // default value ("ts")
-    // TODO(HUDI-3456) clean up
-    val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("")
-
-    val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
-    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
-
-    withSparkConf(sparkSession, catalogProperties) {
-      Map(
-        "path" -> path,
-        RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
-        PRECOMBINE_FIELD.key -> preCombineField,
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), // set the default parallelism to 200 for sql
-        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
-        HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
-        SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
-      )
-        .filter { case (_, v) => v != null }
-    }
-  }
 }
 
 object MergeIntoHoodieTableCommand {
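Note for reviewers: below is a minimal, self-contained sketch (plain Scala, outside the Hudi codebase; the object name, method names, and config keys are illustrative placeholders, not Hudi APIs) of the pattern this refactor applies. Every per-operation config builder now appends one shared Hive-sync map and then drops null-valued entries, so the sync keys are defined once in commonHiveSyncConfig instead of being repeated in each builder.

object ConfigMergeSketch {
  // Stand-in for commonHiveSyncConfig: the shared options appended by every
  // operation-specific builder; a null value models an unset sync property.
  def commonSyncOptions(partitionFields: String): Map[String, String] = Map(
    "sync.enabled" -> "false",
    "sync.partition.fields" -> partitionFields,
    "sync.partition.extractor" -> null
  )

  // Operation-specific options plus the shared block, with null values filtered out,
  // mirroring `Map(...) ++ commonHiveSyncConfig(...) .filter { case (_, v) => v != null }`.
  def buildUpsertOptions(partitionFields: String): Map[String, String] =
    (Map(
      "write.operation" -> "upsert",
      "write.partitionpath.field" -> partitionFields
    ) ++ commonSyncOptions(partitionFields))
      .filter { case (_, v) => v != null }

  def main(args: Array[String]): Unit =
    buildUpsertOptions("dt").foreach { case (k, v) => println(s"$k=$v") }
}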