Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,15 @@ object DataSourceWriteOptions {
.sinceVersion("0.14.0")
.withDocumentation("Controls whether spark sql prepped update, delete, and merge are enabled.")

val OVERWRITE_MODE: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.overwrite.mode")
.noDefaultValue()
.withValidValues("STATIC", "DYNAMIC")
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("Controls whether overwrite use dynamic or static mode, if not configured, " +
"respect spark.sql.sources.partitionOverwriteMode")

/** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */
@Deprecated
val HIVE_ASSUME_DATE_PARTITION_OPT_KEY = HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION.key()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.hudi

import org.apache.hudi.{DataSourceWriteOptions, config}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties}
Expand All @@ -39,6 +39,7 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isHoodieConfigKey, isUsin
import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
import org.apache.spark.sql.types.StructType

import java.util.Locale
Expand Down Expand Up @@ -108,9 +109,9 @@ trait ProvidesHoodieConfig extends Logging {
private def deduceSqlWriteOperation(isOverwritePartition: Boolean, isOverwriteTable: Boolean,
sqlWriteOperation: String): String = {
if (isOverwriteTable) {
WriteOperationType.INSERT_OVERWRITE_TABLE.name()
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
} else if (isOverwritePartition) {
WriteOperationType.INSERT_OVERWRITE.name()
INSERT_OVERWRITE_OPERATION_OPT_VAL
} else {
sqlWriteOperation
}
Expand Down Expand Up @@ -208,7 +209,7 @@ trait ProvidesHoodieConfig extends Logging {
// or when both configs are set, or when only sql write operation is set), we honor sql write operation and ignore
// the insert mode.
val useLegacyInsertModeFlow = insertModeSet && !sqlWriteOperationSet
val operation = combinedOpts.getOrElse(OPERATION.key,
var operation = combinedOpts.getOrElse(OPERATION.key,
if (useLegacyInsertModeFlow) {
// NOTE: Target operation could be overridden by the user, therefore if it has been provided as an input
// we'd prefer that value over auto-deduced operation. Otherwise, we deduce target operation type
Expand All @@ -219,6 +220,32 @@ trait ProvidesHoodieConfig extends Logging {
}
)

val overwriteTableOpts = if (operation.equals(BULK_INSERT_OPERATION_OPT_VAL)) {
if (isOverwriteTable) {
Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE_TABLE.value())
} else if (isOverwritePartition) {
Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE.value())
} else {
Map()
}
} else if (operation.equals(INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)) {
if (sqlWriteOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) {
operation = BULK_INSERT_OPERATION_OPT_VAL
Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE_TABLE.value())
} else {
Map()
}
} else if (operation.equals(INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
if (sqlWriteOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) {
operation = BULK_INSERT_OPERATION_OPT_VAL
Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE.value())
} else {
Map()
}
} else {
Map()
}

// try to use new insert dup policy instead of legacy insert mode to deduce payload class. If only insert mode is explicitly specified,
// w/o specifying any value for insert dup policy, legacy configs will be honored. But on all other cases (i.e when neither of the configs is set,
// or when both configs are set, or when only insert dup policy is set), we honor insert dup policy and ignore the insert mode.
Expand Down Expand Up @@ -257,17 +284,6 @@ trait ProvidesHoodieConfig extends Logging {
null
}

val overwriteTableOpts = if (operation.equals(BULK_INSERT_OPERATION_OPT_VAL)) {
if (isOverwriteTable) {
Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE_TABLE.value())
} else if (isOverwritePartition) {
Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE.value())
} else {
Map()
}
} else {
Map()
}
val overridingOpts = extraOptions ++ Map(
"path" -> path,
TABLE_TYPE.key -> tableType,
Expand Down Expand Up @@ -300,6 +316,45 @@ trait ProvidesHoodieConfig extends Logging {
}
}

def deduceIsOverwriteTable(sparkSession: SparkSession,
catalogTable: HoodieCatalogTable,
partitionSpec: Map[String, Option[String]]): Boolean = {
val operation = sparkSession.sqlContext.getConf(OPERATION.key, "")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better use combineOptions() here to keep the sources of HUDI configure consistent with others.

if (operation.nonEmpty && operation.equals(INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)) {
true
} else if (operation.nonEmpty && operation.equals(INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
false
} else {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Scala match expression can be more clear?

    operation match {
      case INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL =>
        true
      case INSERT_OPERATION_OPT_VAL =>
        false
      case _ =>
        // NonPartitioned table always insert overwrite whole table
        if (catalogTable.partitionFields.isEmpty) {
          true
        } else {
          // Insert overwrite partitioned table with PARTITION clause will always insert overwrite the specific partition
          if (partitionSpec.nonEmpty) {
            false
          } else {
            // If hoodie.datasource.overwrite.mode configured, respect it
            val hoodieOverwriteMode = sparkSession.sqlContext.getConf(OVERWRITE_MODE.key,
              sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
            
            hoodieOverwriteMode match {
              case "STATIC" =>
                true
              case "DYNAMIC" =>
                false
              case _ =>
                throw new IllegalArgumentException("xxx")
            }
          }
        }
    }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, I'll refine the code later

// NonPartitioned table always insert overwrite whole table
if (catalogTable.partitionFields.isEmpty) {
true
} else {
// Insert overwrite partitioned table with PARTITION clause will always insert overwrite the specific partition
if (partitionSpec.nonEmpty) {
false
} else {
// If hoodie.datasource.overwrite.mode configured, respect it
val hoodieOverwriteMode = sparkSession.sqlContext.getConf(OVERWRITE_MODE.key, "").toUpperCase()
if (hoodieOverwriteMode.isEmpty) {
val sparkOverwriteMode = sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key).toUpperCase()
if (sparkOverwriteMode.equals(PartitionOverwriteMode.STATIC.toString)) {
true
} else {
false
}
} else {
OVERWRITE_MODE.checkValues(hoodieOverwriteMode)
if (hoodieOverwriteMode.equals("STATIC")) {
true
} else {
false
}
}
}
}
}
}

def buildHoodieDropPartitionsConfig(sparkSession: SparkSession,
hoodieCatalogTable: HoodieCatalogTable,
partitionsToDrop: String): Map[String, String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
var mode = SaveMode.Append
var isOverWriteTable = false
var isOverWritePartition = false
if (overwrite && partitionSpec.isEmpty) {
// insert overwrite table
mode = SaveMode.Overwrite
isOverWriteTable = true
} else {
// for insert into or insert overwrite partition we use append mode.
mode = SaveMode.Append
isOverWritePartition = overwrite

if (overwrite) {
if (deduceIsOverwriteTable(sparkSession, catalogTable, partitionSpec)) {
isOverWriteTable = true
mode = SaveMode.Overwrite
} else {
isOverWritePartition = true
}
}

val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions)
Expand Down
Loading