Merged
@@ -46,14 +46,6 @@ public class HoodieInternalConfig extends HoodieConfig {
.withDocumentation("For SQL operations, if enables bulk_insert operation, "
+ "this configure will take effect to decide overwrite whole table or partitions specified");

public static final ConfigProperty<Boolean> SQL_MERGE_INTO_WRITES = ConfigProperty
.key("hoodie.internal.sql.merge.into.writes")
.defaultValue(false)
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("This internal config is used by Merge Into SQL logic only to mark such use case "
+ "and let the core components know it should handle the write differently");

/**
* Returns if partition records are sorted or not.
*
@@ -721,6 +721,8 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "The class must be a subclass of `org.apache.hudi.callback.HoodieClientInitCallback`."
+ "By default, no Hudi client init callback is executed.");

public static final String WRITE_PREPPED_MERGE_KEY = "_hoodie.datasource.merge.prepped";

private ConsistencyGuardConfig consistencyGuardConfig;
private FileSystemRetryConfig fileSystemRetryConfig;

@@ -21,7 +21,6 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
@@ -44,8 +43,7 @@
*/
public final class SparkHoodieIndexFactory {
public static HoodieIndex createIndex(HoodieWriteConfig config) {
boolean mergeIntoWrites = config.getProps().getBoolean(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
HoodieInternalConfig.SQL_MERGE_INTO_WRITES.defaultValue());
boolean mergeIntoWrites = config.getProps().getBoolean(HoodieWriteConfig.WRITE_PREPPED_MERGE_KEY, false);
if (mergeIntoWrites) {
return new HoodieInternalProxyIndex(config);
}
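
To illustrate how this flag reaches the factory: a minimal sketch, not part of this change, assuming the SQL layer has already put the internal prepped-merge key on the writer properties (the base path and table name below are hypothetical, and import locations follow the usual Hudi packages):

import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.index.SparkHoodieIndexFactory

object PreppedMergeIndexSketch {
  def main(args: Array[String]): Unit = {
    val props = new TypedProperties()
    // Internal flag set by the SQL MERGE INTO path; not meant to be set by end users.
    props.setProperty(HoodieWriteConfig.WRITE_PREPPED_MERGE_KEY, "true")

    val config = HoodieWriteConfig.newBuilder()
      .withPath("/tmp/hudi_trips")   // hypothetical base path
      .forTable("hudi_trips")        // hypothetical table name
      .withProps(props)
      .build()

    // With the flag present, the factory above should short-circuit to HoodieInternalProxyIndex
    // instead of resolving the index type configured for the table.
    val index = SparkHoodieIndexFactory.createIndex(config)
    println(index.getClass.getSimpleName)
  }
}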
@@ -45,7 +45,7 @@
import java.util.Locale;
import java.util.Map;

import static org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_PREPPED_MERGE_KEY;
import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE;
import static org.apache.hudi.keygen.KeyGenUtils.inferKeyGeneratorType;

@@ -80,7 +80,7 @@ public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOEx
boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props)
//Need to prevent overwriting the keygen for spark sql merge into because we need to extract
//the recordkey from the meta cols if it exists. Sql keygen will use pkless keygen if needed.
&& !props.getBoolean(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue());
&& !props.getBoolean(WRITE_PREPPED_MERGE_KEY, false);
try {
KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
if (autoRecordKeyGen) {
@@ -642,11 +642,18 @@ object DataSourceWriteOptions {
val DROP_PARTITION_COLUMNS: ConfigProperty[java.lang.Boolean] = HoodieTableConfig.DROP_PARTITION_COLUMNS

val ENABLE_OPTIMIZED_SQL_WRITES: ConfigProperty[String] = ConfigProperty
.key("hoodie.spark.sql.writes.optimized.enable")
.key("hoodie.spark.sql.optimized.writes.enable")
.defaultValue("true")
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("Controls whether spark sql optimized update is enabled.")
.withDocumentation("Controls whether spark sql prepped update and delete is enabled.")

val ENABLE_OPTIMIZED_SQL_MERGE_WRITES: ConfigProperty[String] = ConfigProperty
.key("hoodie.spark.sql.optimized.merge.enable")
.defaultValue("true")
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("Controls whether spark sql prepped merge is enabled.")

Contributor:
I am also thinking, from a user standpoint we should have just one config to enable or disable the optimized flow (irrespective of whether it's MIT or updates or deletes), but internally we can use different configs if we wish to differentiate MIT from the rest.

Contributor Author:
Fixed.

/** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */
@Deprecated
@@ -29,9 +29,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieInternalConfig
import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.config.HoodieWriteConfig.{WRITE_CONCURRENCY_MODE, WRITE_PREPPED_MERGE_KEY}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.util.PathUtils
import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -146,9 +144,7 @@ class DefaultSource extends RelationProvider
mode: SaveMode,
optParams: Map[String, String],
rawDf: DataFrame): BaseRelation = {
val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
optParams.getOrDefault(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue().toString))
.equalsIgnoreCase("true")) {
val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, "false").toBoolean || optParams.getOrDefault(WRITE_PREPPED_MERGE_KEY, "false").toBoolean) {
rawDf // Don't remove meta columns for prepped write.
} else {
rawDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)

Contributor:
Let's also rename the variable DATASOURCE_WRITE_PREPPED_KEY to SPARK_SQL_WRITE_PREPPED_KEY.

Contributor:
And the config key should be _hoodie.spark.sql.writes.prepped.

Contributor Author:
Fixed.
@@ -45,7 +45,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
import org.apache.hudi.config.HoodieWriteConfig.WRITE_PREPPED_MERGE_KEY
import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.index.HoodieIndex
@@ -91,6 +91,14 @@ object HoodieSparkSqlWriter {
ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable")
.defaultValue(true)

/**
* For merge into from spark-sql, we need some special handling. for eg, schema validation should be disabled
* for writes from merge into. This config is used for internal purposes.
*/
val SQL_MERGE_INTO_WRITES: ConfigProperty[Boolean] =
ConfigProperty.key("hoodie.internal.sql.merge.into.writes")
.defaultValue(false)

/**
* For spark streaming use-cases, holds the batch Id.
*/
@@ -349,8 +357,7 @@ object HoodieSparkSqlWriter {

// Remove meta columns from writerSchema if isPrepped is true.
val isPrepped = hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false)
val mergeIntoWrites = parameters.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
val mergeIntoWrites = parameters.getOrDefault(WRITE_PREPPED_MERGE_KEY, "false").toBoolean
val processedDataSchema = if (isPrepped || mergeIntoWrites) {
HoodieAvroUtils.removeMetadataFields(dataFileSchema)
} else {
@@ -453,7 +460,7 @@ object HoodieSparkSqlWriter {
// in the table's one we want to proceed aligning nullability constraints w/ the table's schema
val shouldCanonicalizeNullable = opts.getOrDefault(CANONICALIZE_NULLABLE.key,
CANONICALIZE_NULLABLE.defaultValue.toString).toBoolean
val mergeIntoWrites = opts.getOrDefault(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean

val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -342,7 +342,12 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
val tableMetaCols = mergeInto.targetTable.output.filter(a => isMetaField(a.name))
val joinData = sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable, mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
val incomingDataCols = joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
val projectedJoinPlan = Project(tableMetaCols ++ incomingDataCols, joinData)
val projectedJoinPlan = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_MERGE_WRITES.key(), ENABLE_OPTIMIZED_SQL_MERGE_WRITES.defaultValue()) == "true") {
Project(tableMetaCols ++ incomingDataCols, joinData)
} else {
Project(incomingDataCols, joinData)
}

val projectedJoinOutput = projectedJoinPlan.output

val requiredAttributesMap = recordKeyAttributeToConditionExpression ++ preCombineAttributeAssociatedExpression
@@ -617,6 +622,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie

val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)

val enableOptimizedMerge = sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_MERGE_WRITES.key(),
ENABLE_OPTIMIZED_SQL_MERGE_WRITES.defaultValue())
val overridingOpts = Map(
"path" -> path,
RECORDKEY_FIELD.key -> tableConfig.getRawRecordKeyFieldProp,
@@ -625,7 +632,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> classOf[MergeIntoKeyGenerator].getCanonicalName,
KEYGENERATOR_CLASS_NAME.key -> (if (enableOptimizedMerge == "true") {
classOf[MergeIntoKeyGenerator].getCanonicalName
} else {
classOf[SqlKeyGenerator].getCanonicalName
}),
SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
@@ -648,7 +659,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
RECONCILE_SCHEMA.key -> "false",
CANONICALIZE_NULLABLE.key -> "false",
SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true",
HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key -> "true",
HoodieWriteConfig.WRITE_PREPPED_MERGE_KEY -> enableOptimizedMerge,
HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key() -> (!StringUtils.isNullOrEmpty(preCombineField)).toString
)
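
For context on what exercises this code path: the command handles a standard Spark SQL MERGE INTO statement; a minimal sketch against hypothetical tables (names and columns are illustrative), assuming an active SparkSession named spark:

// Prepped merge on, so the command projects the target meta columns and uses MergeIntoKeyGenerator.
spark.sql("set hoodie.spark.sql.optimized.merge.enable=true")
spark.sql(
  """
    |merge into target_tbl t
    |using source_tbl s
    |on t.id = s.id
    |when matched then update set *
    |when not matched then insert *
    |""".stripMargin)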

@@ -47,7 +47,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
""".stripMargin)

// test with optimized sql writes enabled / disabled.
spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
spark.sql(s"set hoodie.spark.sql.optimized.writes.enable=$optimizedSqlEnabled")
Contributor:
Let's see if we can use the variable to avoid any missteps.

Contributor Author:
Fixed.


// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
@@ -97,7 +97,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
""".stripMargin)

// test with optimized sql writes enabled.
spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=true")
spark.sql(s"set hoodie.spark.sql.optimized.writes.enable=true")

// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
@@ -308,7 +308,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
|""".stripMargin)

// test with optimized sql writes enabled / disabled.
spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
spark.sql(s"set hoodie.spark.sql.optimized.writes.enable=$optimizedSqlEnabled")

// delete 2021-10-01 partition
if (urlencode) {