@@ -46,6 +46,13 @@ public class HoodieInternalConfig extends HoodieConfig {
.withDocumentation("For SQL operations, if enables bulk_insert operation, "
+ "this configure will take effect to decide overwrite whole table or partitions specified");

+ public static final ConfigProperty<Boolean> ENABLE_PREPPED_MERGE_WRITES = ConfigProperty
+ .key("hoodie.internal.sql.prepped.merge.enabled")
+ .defaultValue(true)
+ .markAdvanced()
+ .sinceVersion("0.14.0")
+ .withDocumentation("Controls whether spark sql optimized merge is enabled.");

public static final ConfigProperty<Boolean> SQL_MERGE_INTO_WRITES = ConfigProperty
.key("hoodie.internal.sql.merge.into.writes")
.defaultValue(false)
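For context on how this internal flag is consumed: downstream components resolve it through the ConfigProperty key with its declared default, exactly as the later hunks do. A minimal Scala sketch, assuming only that the Hudi config classes shown above are on the classpath:

    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.config.HoodieInternalConfig

    // Resolve the flag the way downstream callers do: fall back to the
    // declared default (true) when the writer has not set it explicitly.
    def preppedMergeEnabled(props: TypedProperties): Boolean =
      props.getBoolean(HoodieInternalConfig.ENABLE_PREPPED_MERGE_WRITES.key(),
        HoodieInternalConfig.ENABLE_PREPPED_MERGE_WRITES.defaultValue())
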
@@ -44,8 +44,8 @@
*/
public final class SparkHoodieIndexFactory {
public static HoodieIndex createIndex(HoodieWriteConfig config) {
- boolean mergeIntoWrites = config.getProps().getBoolean(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
- HoodieInternalConfig.SQL_MERGE_INTO_WRITES.defaultValue());
+ boolean mergeIntoWrites = config.getProps().getBoolean(HoodieInternalConfig.ENABLE_PREPPED_MERGE_WRITES.key(),
+ HoodieInternalConfig.ENABLE_PREPPED_MERGE_WRITES.defaultValue());
if (mergeIntoWrites) {
return new HoodieInternalProxyIndex(config);
}
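The factory returns HoodieInternalProxyIndex when prepped merge writes are enabled: the incoming records already carry their location in the meta columns, so the usual index lookup can be skipped. A self-contained toy sketch of that idea (deliberately not Hudi's HoodieIndex API, whose signatures are more involved):

    // Toy model: a record that may already know which file group it lives in.
    case class Record(key: String, partition: String, fileId: Option[String])

    trait Index {
      def tagLocation(records: Seq[Record]): Seq[Record]
    }

    // A regular index would look every record up against the table's file
    // groups; the proxy variant trusts the prepped input and passes it through.
    object ProxyIndex extends Index {
      override def tagLocation(records: Seq[Record]): Seq[Record] = records
    }
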
@@ -45,7 +45,7 @@
import java.util.Locale;
import java.util.Map;

- import static org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES;
+ import static org.apache.hudi.config.HoodieInternalConfig.ENABLE_PREPPED_MERGE_WRITES;
import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE;
import static org.apache.hudi.keygen.KeyGenUtils.inferKeyGeneratorType;

@@ -80,7 +80,7 @@ public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOEx
boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props)
// Need to prevent overwriting the keygen for spark sql merge into, because we need to extract
// the record key from the meta cols if it exists. The sql keygen will use the pk-less keygen if needed.
- && !props.getBoolean(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue());
+ && !props.getBoolean(ENABLE_PREPPED_MERGE_WRITES.key(), ENABLE_PREPPED_MERGE_WRITES.defaultValue());
try {
KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
if (autoRecordKeyGen) {
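The guard exists because, for merge-into writes, the record key should be taken from the _hoodie_record_key meta column when present rather than auto-generated. A hypothetical helper showing that preference (the function and its fallback parameter are ours, not Hudi API; it assumes the Row carries its schema, as rows from a DataFrame do):

    import org.apache.spark.sql.Row

    // Prefer the key already stored in the meta column; fall back to a
    // generated key only when the column is absent or null.
    def resolveRecordKey(row: Row, generateKey: => String): String = {
      val idx = row.schema.fieldNames.indexOf("_hoodie_record_key")
      if (idx >= 0 && !row.isNullAt(idx)) row.getString(idx) else generateKey
    }
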
@@ -613,7 +613,14 @@ object DataSourceWriteOptions {
.defaultValue("true")
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("Controls whether spark sql optimized update is enabled.")
.withDocumentation("Controls whether spark sql prepped update and delete is enabled.")

+ val ENABLE_OPTIMIZED_MERGE_WRITES: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.spark.sql.optimized.merge.enable")
+ .defaultValue("true")
+ .markAdvanced()
+ .sinceVersion("0.14.0")
+ .withDocumentation("Controls whether spark sql prepped merge is enabled.")

/** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */
@Deprecated
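Because the new key is read as a Spark session conf (see the getConfString call in the MergeIntoHoodieTableCommand hunk below), users can opt out per session before issuing a MERGE. A usage sketch, assuming a SparkSession named spark and placeholder tables target and source:

    // Disable the optimized (prepped) merge path for this session only;
    // subsequent MERGE INTO statements take the regular write path.
    spark.sql("SET hoodie.spark.sql.optimized.merge.enable = false")

    spark.sql(
      """MERGE INTO target t
        |USING source s
        |ON t.id = s.id
        |WHEN MATCHED THEN UPDATE SET *
        |WHEN NOT MATCHED THEN INSERT *
        |""".stripMargin)
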
@@ -30,7 +30,7 @@ import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieInternalConfig
- import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
+ import org.apache.hudi.config.HoodieInternalConfig.{ENABLE_PREPPED_MERGE_WRITES, SQL_MERGE_INTO_WRITES}
import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.util.PathUtils
@@ -146,8 +146,10 @@ class DefaultSource extends RelationProvider
mode: SaveMode,
optParams: Map[String, String],
rawDf: DataFrame): BaseRelation = {
+ // AKL_TODO: check whether this function can be called before ENABLE_PREPPED_MERGE_WRITES is set; for now
+ // the default is always true, so the ordering should not make a difference.
val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
- optParams.getOrDefault(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue().toString))
+ optParams.getOrDefault(ENABLE_PREPPED_MERGE_WRITES.key(), ENABLE_PREPPED_MERGE_WRITES.defaultValue().toString))
.equalsIgnoreCase("true")) {
rawDf // Don't remove meta columns for prepped write.
} else {
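The branch above keeps the Hudi meta columns for prepped writes and strips them otherwise. A minimal DataFrame-level sketch of the same behavior (the helper name is ours; the column names are Hudi's five standard meta fields):

    import org.apache.spark.sql.DataFrame

    // Hudi's standard meta columns, which a prepped write must retain.
    val metaColumns = Seq("_hoodie_commit_time", "_hoodie_commit_seqno",
      "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name")

    def prepareForWrite(rawDf: DataFrame, prepped: Boolean): DataFrame =
      if (prepped) rawDf                // keep meta columns for prepped writes
      else rawDf.drop(metaColumns: _*)  // regular writes regenerate them later
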
@@ -45,7 +45,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
- import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
+ import org.apache.hudi.config.HoodieInternalConfig.{ENABLE_PREPPED_MERGE_WRITES, SQL_MERGE_INTO_WRITES}
import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.index.HoodieIndex
@@ -349,8 +349,7 @@ object HoodieSparkSqlWriter {

// Remove meta columns from writerSchema if isPrepped is true.
val isPrepped = hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false)
- val mergeIntoWrites = parameters.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
- SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
+ val mergeIntoWrites = parameters.getOrDefault(ENABLE_PREPPED_MERGE_WRITES.key(), ENABLE_PREPPED_MERGE_WRITES.defaultValue.toString).toBoolean
val processedDataSchema = if (isPrepped || mergeIntoWrites) {
HoodieAvroUtils.removeMetadataFields(dataFileSchema)
} else {
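HoodieAvroUtils.removeMetadataFields strips the meta fields from the Avro writer schema before the prepped data is handed to the writer. A self-contained approximation of what such a strip does, assuming a flat Avro record schema (a sketch, not the library's implementation):

    import org.apache.avro.Schema
    import scala.collection.JavaConverters._

    val metaFields = Set("_hoodie_commit_time", "_hoodie_commit_seqno",
      "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name")

    // Rebuild the record schema without the meta fields. Avro Field objects
    // cannot be reused across schemas, so each surviving field is copied.
    def stripMetaFields(schema: Schema): Schema = {
      val fields = schema.getFields.asScala
        .filterNot(f => metaFields.contains(f.name))
        .map(f => new Schema.Field(f.name, f.schema, f.doc, f.defaultVal))
        .asJava
      Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace,
        schema.isError, fields)
    }
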
@@ -342,7 +342,12 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
val tableMetaCols = mergeInto.targetTable.output.filter(a => isMetaField(a.name))
val joinData = sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable, mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
val incomingDataCols = joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
- val projectedJoinPlan = Project(tableMetaCols ++ incomingDataCols, joinData)
+ val projectedJoinPlan = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_MERGE_WRITES.key(), ENABLE_OPTIMIZED_MERGE_WRITES.defaultValue()) == "true") {
+   Project(tableMetaCols ++ incomingDataCols, joinData)
+ } else {
+   Project(incomingDataCols, joinData)
+ }

val projectedJoinOutput = projectedJoinPlan.output

val requiredAttributesMap = recordKeyAttributeToConditionExpression ++ preCombineAttributeAssociatedExpression
@@ -649,6 +654,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
CANONICALIZE_NULLABLE.key -> "false",
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true",
HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key -> "true",
+ HoodieInternalConfig.ENABLE_PREPPED_MERGE_WRITES.key -> sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_MERGE_WRITES.key(), ENABLE_OPTIMIZED_MERGE_WRITES.defaultValue()),
HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key() -> (!StringUtils.isNullOrEmpty(preCombineField)).toString
)

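The conditional Project is the crux of the change: with the optimization enabled, the target's meta columns travel alongside the incoming data so the subsequent write can run fully prepped; with it disabled, only the incoming columns flow through. A DataFrame-level analogue of that projection (helper and parameter names are ours):

    import org.apache.spark.sql.DataFrame

    // Mirror of the logical-plan projection above: meta columns are carried
    // along only when the optimized (prepped) merge path is enabled.
    def projectJoinOutput(joined: DataFrame,
                          metaCols: Seq[String],
                          incomingCols: Seq[String],
                          optimizedMerge: Boolean): DataFrame = {
      val selected = if (optimizedMerge) metaCols ++ incomingCols else incomingCols
      joined.select(selected.map(joined(_)): _*)
    }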