Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,15 @@ object DataSourceWriteOptions {
.sinceVersion("0.14.0")
.withDocumentation("Controls whether spark sql prepped update, delete, and merge are enabled.")

// Selects STATIC vs DYNAMIC semantics for INSERT OVERWRITE on partitioned tables.
// When unset, the Spark session config spark.sql.sources.partitionOverwriteMode is used instead
// (see deduceIsOverwriteTable in ProvidesHoodieConfig).
val OVERWRITE_MODE: ConfigProperty[String] = ConfigProperty
  .key("hoodie.datasource.overwrite.mode")
  .noDefaultValue()
  .withValidValues("STATIC", "DYNAMIC")
  .markAdvanced()
  .sinceVersion("0.14.0")
  .withDocumentation("Controls whether INSERT OVERWRITE uses dynamic or static mode. If not configured, " +
    "the value of spark.sql.sources.partitionOverwriteMode is respected.")

/** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */
@Deprecated
val HIVE_ASSUME_DATE_PARTITION_OPT_KEY = HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION.key()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.hudi

import org.apache.hudi.{DataSourceWriteOptions, config}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties}
Expand All @@ -39,6 +39,7 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isHoodieConfigKey, isUsin
import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PARTITION_OVERWRITE_MODE
import org.apache.spark.sql.types.StructType

import java.util.Locale
Expand Down Expand Up @@ -108,9 +109,9 @@ trait ProvidesHoodieConfig extends Logging {
/**
 * Resolves the effective write operation for a SQL INSERT.
 *
 * Overwrite flags take precedence over the supplied SQL write operation:
 * a table-level overwrite maps to INSERT_OVERWRITE_TABLE, a partition-level
 * overwrite maps to INSERT_OVERWRITE, otherwise the caller-provided
 * operation is used unchanged.
 */
private def deduceSqlWriteOperation(isOverwritePartition: Boolean, isOverwriteTable: Boolean,
                                    sqlWriteOperation: String): String = {
  (isOverwriteTable, isOverwritePartition) match {
    case (true, _) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
    case (_, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
    case _         => sqlWriteOperation
  }
}
Expand Down Expand Up @@ -208,7 +209,7 @@ trait ProvidesHoodieConfig extends Logging {
// or when both configs are set, or when only sql write operation is set), we honor sql write operation and ignore
// the insert mode.
val useLegacyInsertModeFlow = insertModeSet && !sqlWriteOperationSet
val operation = combinedOpts.getOrElse(OPERATION.key,
var operation = combinedOpts.getOrElse(OPERATION.key,
if (useLegacyInsertModeFlow) {
// NOTE: Target operation could be overridden by the user, therefore if it has been provided as an input
// we'd prefer that value over auto-deduced operation. Otherwise, we deduce target operation type
Expand All @@ -219,6 +220,32 @@ trait ProvidesHoodieConfig extends Logging {
}
)

val overwriteTableOpts = if (operation.equals(BULK_INSERT_OPERATION_OPT_VAL)) {
if (isOverwriteTable) {
Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE_TABLE.value())
} else if (isOverwritePartition) {
Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE.value())
} else {
Map()
}
} else if (operation.equals(INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)) {
if (sqlWriteOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) {
operation = BULK_INSERT_OPERATION_OPT_VAL
Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE_TABLE.value())
} else {
Map()
}
} else if (operation.equals(INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
if (sqlWriteOperation.equals(BULK_INSERT_OPERATION_OPT_VAL) || enableBulkInsert) {
operation = BULK_INSERT_OPERATION_OPT_VAL
Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE.value())
} else {
Map()
}
} else {
Map()
}

// try to use new insert dup policy instead of legacy insert mode to deduce payload class. If only insert mode is explicitly specified,
// w/o specifying any value for insert dup policy, legacy configs will be honored. But on all other cases (i.e when neither of the configs is set,
// or when both configs are set, or when only insert dup policy is set), we honor insert dup policy and ignore the insert mode.
Expand Down Expand Up @@ -257,17 +284,6 @@ trait ProvidesHoodieConfig extends Logging {
null
}

val overwriteTableOpts = if (operation.equals(BULK_INSERT_OPERATION_OPT_VAL)) {
if (isOverwriteTable) {
Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE_TABLE.value())
} else if (isOverwritePartition) {
Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE.value())
} else {
Map()
}
} else {
Map()
}
val overridingOpts = extraOptions ++ Map(
"path" -> path,
TABLE_TYPE.key -> tableType,
Expand Down Expand Up @@ -300,6 +316,44 @@ trait ProvidesHoodieConfig extends Logging {
}
}

/**
 * Decides whether an INSERT OVERWRITE statement should overwrite the whole table
 * (returns true) or only matching partitions (returns false).
 *
 * Resolution order:
 *   1. An explicitly configured write operation (hoodie.datasource.write.operation) wins.
 *   2. Non-partitioned tables always overwrite the whole table.
 *   3. An explicit PARTITION clause always targets just that partition.
 *   4. Otherwise hoodie.datasource.overwrite.mode is consulted; if unset, falls back to
 *      spark.sql.sources.partitionOverwriteMode (STATIC -> whole table, DYNAMIC -> partitions).
 *
 * @throws IllegalArgumentException if the resolved overwrite mode is neither STATIC nor DYNAMIC
 */
def deduceIsOverwriteTable(sparkSession: SparkSession,
                           catalogTable: HoodieCatalogTable,
                           partitionSpec: Map[String, Option[String]],
                           extraOptions: Map[String, String]): Boolean = {
  val combinedOpts: Map[String, String] = combineOptions(catalogTable, catalogTable.tableConfig, sparkSession.sqlContext.conf,
    defaultOpts = Map.empty, overridingOpts = extraOptions)
  // Prefer Option over a null sentinel: an absent operation falls through to deduction below.
  combinedOpts.get(OPERATION.key) match {
    case Some(INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL) =>
      true
    case Some(INSERT_OVERWRITE_OPERATION_OPT_VAL) =>
      false
    case _ =>
      if (catalogTable.partitionFields.isEmpty) {
        // A non-partitioned table can only ever be overwritten as a whole.
        true
      } else if (partitionSpec.nonEmpty) {
        // INSERT OVERWRITE ... PARTITION (...) always overwrites the specified partition only.
        false
      } else {
        // If hoodie.datasource.overwrite.mode is configured, respect it; otherwise respect
        // spark.sql.sources.partitionOverwriteMode. Use Locale.ROOT so config parsing is not
        // affected by the JVM default locale (e.g. Turkish dotless-i).
        val hoodieOverwriteMode = combinedOpts.getOrElse(OVERWRITE_MODE.key,
          sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase(Locale.ROOT)

        hoodieOverwriteMode match {
          case "STATIC" =>
            true
          case "DYNAMIC" =>
            false
          case other =>
            // Include the offending value so misconfiguration is easy to diagnose.
            throw new IllegalArgumentException(
              s"Config ${OVERWRITE_MODE.key} is illegal: '$other' (expected STATIC or DYNAMIC)")
        }
      }
  }
}

def buildHoodieDropPartitionsConfig(sparkSession: SparkSession,
hoodieCatalogTable: HoodieCatalogTable,
partitionsToDrop: String): Map[String, String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
var mode = SaveMode.Append
var isOverWriteTable = false
var isOverWritePartition = false
if (overwrite && partitionSpec.isEmpty) {
// insert overwrite table
mode = SaveMode.Overwrite
isOverWriteTable = true
} else {
// for insert into or insert overwrite partition we use append mode.
mode = SaveMode.Append
isOverWritePartition = overwrite

if (overwrite) {
if (deduceIsOverwriteTable(sparkSession, catalogTable, partitionSpec, extraOptions)) {
isOverWriteTable = true
mode = SaveMode.Overwrite
} else {
isOverWritePartition = true
}
}

val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions)
Expand Down
Loading