@@ -46,14 +46,6 @@ public class HoodieInternalConfig extends HoodieConfig {
.withDocumentation("For SQL operations, if the bulk_insert operation is enabled, "
+ "this config decides whether to overwrite the whole table or only the specified partitions");

public static final ConfigProperty<Boolean> SQL_MERGE_INTO_WRITES = ConfigProperty
.key("hoodie.internal.sql.merge.into.writes")
.defaultValue(false)
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("This internal config is used by Merge Into SQL logic only to mark such use case "
+ "and let the core components know it should handle the write differently");

/**
* Returns if partition records are sorted or not.
*
@@ -721,6 +721,12 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "The class must be a subclass of `org.apache.hudi.callback.HoodieClientInitCallback`."
+ "By default, no Hudi client init callback is executed.");

/**
* Config key with boolean value that indicates whether records being written during a MERGE INTO Spark SQL
* operation are already prepped.
*/
public static final String SPARK_SQL_MERGE_INTO_PREPPED_KEY = "_hoodie.spark.sql.merge.into.prepped";

private ConsistencyGuardConfig consistencyGuardConfig;
private FileSystemRetryConfig fileSystemRetryConfig;
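The new key is a plain string constant rather than a registered ConfigProperty, so the Spark SQL MERGE INTO path has to pass it explicitly as a write option. A minimal sketch of that hand-off, assuming a DataFrame that already carries the Hudi meta columns; the session setup, source table, table name, and base path below are placeholders, not anything defined in this change:

import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("merge-into-prepped-sketch").getOrCreate()

// Hypothetical already-prepped input: rows carry _hoodie_record_key and the other
// meta columns, as produced by the MERGE INTO planning step.
val preppedDf = spark.table("staged_merge_input")   // placeholder source

preppedDf.write
  .format("hudi")
  .option("hoodie.table.name", "merge_target")                         // placeholder table name
  .option(HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY, "true")  // internal flag introduced here
  .mode("append")
  .save("/tmp/hudi/merge_target")                                      // placeholder base path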

@@ -21,7 +21,6 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
@@ -44,9 +43,8 @@
*/
public final class SparkHoodieIndexFactory {
public static HoodieIndex createIndex(HoodieWriteConfig config) {
boolean mergeIntoWrites = config.getProps().getBoolean(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
HoodieInternalConfig.SQL_MERGE_INTO_WRITES.defaultValue());
if (mergeIntoWrites) {
boolean sqlMergeIntoPrepped = config.getProps().getBoolean(HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY, false);
if (sqlMergeIntoPrepped) {
return new HoodieInternalProxyIndex(config);
}
// first use index class config to create index.
@@ -45,7 +45,7 @@
import java.util.Locale;
import java.util.Map;

import static org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES;
import static org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY;
import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE;
import static org.apache.hudi.keygen.KeyGenUtils.inferKeyGeneratorType;

@@ -80,7 +80,7 @@ public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOEx
boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props)
//Need to prevent overwriting the keygen for spark sql merge into because we need to extract
//the recordkey from the meta cols if it exists. Sql keygen will use pkless keygen if needed.
&& !props.getBoolean(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue());
&& !props.getBoolean(SPARK_SQL_MERGE_INTO_PREPPED_KEY, false);
try {
KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
if (autoRecordKeyGen) {
@@ -311,9 +311,10 @@ object DataSourceWriteOptions {
.withDocumentation("The table type for the underlying data, for this write. This can’t change between writes.")

/**
* Config key with boolean value that indicates whether record being written is already prepped.
* Config key with boolean value that indicates whether records being written during UPDATE or DELETE Spark SQL
* operations are already prepped.
*/
val DATASOURCE_WRITE_PREPPED_KEY = "_hoodie.datasource.write.prepped";
val SPARK_SQL_WRITES_PREPPED_KEY = "_hoodie.spark.sql.writes.prepped";

/**
* May be derive partition path from incoming df if not explicitly set.
@@ -641,12 +642,12 @@

val DROP_PARTITION_COLUMNS: ConfigProperty[java.lang.Boolean] = HoodieTableConfig.DROP_PARTITION_COLUMNS

val ENABLE_OPTIMIZED_SQL_WRITES: ConfigProperty[String] = ConfigProperty
.key("hoodie.spark.sql.writes.optimized.enable")
val SPARK_SQL_OPTIMIZED_WRITES: ConfigProperty[String] = ConfigProperty
.key("hoodie.spark.sql.optimized.writes.enable")
.defaultValue("true")
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("Controls whether spark sql optimized update is enabled.")
.withDocumentation("Controls whether spark sql prepped update, delete, and merge are enabled.")

/** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */
@Deprecated
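The renamed switch is read back through the Spark SQL conf by the SQL command implementations later in this change, so it can be toggled per session rather than per write. A hedged sketch of turning the prepped path off for one session; the session setup is illustrative only:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-sql-writes-sketch").getOrCreate()

// The default is "true"; flipping it for the session falls back to the older,
// non-prepped UPDATE/DELETE/MERGE write path.
spark.conf.set("hoodie.spark.sql.optimized.writes.enable", "false")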
@@ -19,7 +19,7 @@ package org.apache.hudi

import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, DATASOURCE_WRITE_PREPPED_KEY, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, SPARK_SQL_WRITES_PREPPED_KEY, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
import org.apache.hudi.cdc.CDCRelation
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
@@ -29,9 +29,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieInternalConfig
import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.config.HoodieWriteConfig.{WRITE_CONCURRENCY_MODE, SPARK_SQL_MERGE_INTO_PREPPED_KEY}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.util.PathUtils
import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -146,9 +144,7 @@ class DefaultSource extends RelationProvider
mode: SaveMode,
optParams: Map[String, String],
rawDf: DataFrame): BaseRelation = {
val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
optParams.getOrDefault(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue().toString))
.equalsIgnoreCase("true")) {
val df = if (optParams.getOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, "false").toBoolean || optParams.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean) {
rawDf // Don't remove meta columns for prepped write.
} else {
rawDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
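Both prepped flags assume the incoming frame already carries the Hudi meta columns, which is why they are no longer dropped on this branch. A rough sketch of a guard that states that assumption explicitly; the helper itself is illustrative and not part of this change:

import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.DataFrame

import scala.collection.JavaConverters._

// Illustrative check: a frame handed to either prepped path must already contain
// _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,
// _hoodie_partition_path and _hoodie_file_name.
def requirePrepped(df: DataFrame): Unit = {
  val missing = HoodieRecord.HOODIE_META_COLUMNS.asScala.filterNot(df.columns.contains)
  require(missing.isEmpty, s"not a prepped frame, missing meta columns: ${missing.mkString(", ")}")
}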
@@ -57,7 +57,7 @@ object HoodieCreateRecordUtils {
operation: WriteOperationType,
instantTime: String,
isPrepped: Boolean,
mergeIntoWrites: Boolean)
sqlMergeIntoPrepped: Boolean)

def createHoodieRecordRdd(args: createHoodieRecordRddArgs) = {
val df = args.df
@@ -70,7 +70,7 @@
val operation = args.operation
val instantTime = args.instantTime
val isPrepped = args.isPrepped
val mergeIntoWrites = args.mergeIntoWrites
val sqlMergeIntoPrepped = args.sqlMergeIntoPrepped

val shouldDropPartitionColumns = config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
val recordType = config.getRecordMerger.getRecordType
@@ -127,8 +127,8 @@
}

val (hoodieKey: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator, avroRec,
isPrepped, mergeIntoWrites)
val avroRecWithoutMeta: GenericRecord = if (isPrepped || mergeIntoWrites) {
isPrepped, sqlMergeIntoPrepped)
val avroRecWithoutMeta: GenericRecord = if (isPrepped || sqlMergeIntoPrepped) {
HoodieAvroUtils.rewriteRecord(avroRec, HoodieAvroUtils.removeMetadataFields(dataFileSchema))
} else {
avroRec
@@ -184,7 +184,7 @@
validateMetaFieldsInSparkRecords(sourceStructType)
validatePreppedRecord = false
}
val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, isPrepped, mergeIntoWrites)
val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, isPrepped, sqlMergeIntoPrepped)

val targetRow = finalStructTypeRowWriter(sourceRow)
var hoodieSparkRecord = new HoodieSparkRecord(key, targetRow, dataFileStructType, false)
@@ -220,8 +220,8 @@
}

def getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator: Option[BaseKeyGenerator], avroRec: GenericRecord,
isPrepped: Boolean, mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
//use keygen for mergeIntoWrites recordKey and partitionPath because the keygenerator handles
isPrepped: Boolean, sqlMergeIntoPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
//use keygen for sqlMergeIntoPrepped recordKey and partitionPath because the keygenerator handles
//fetching from the meta fields if they are populated and otherwise doing keygen
val recordKey = if (isPrepped) {
avroRec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
Expand All @@ -236,13 +236,13 @@ object HoodieCreateRecordUtils {
}

val hoodieKey = new HoodieKey(recordKey, partitionPath)
val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) {
val instantTime: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
Option(avroRec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).map(_.toString)
}
else {
None
}
val fileName: Option[String] = if (isPrepped || mergeIntoWrites) {
val fileName: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
Option(avroRec.get(HoodieRecord.FILENAME_METADATA_FIELD)).map(_.toString)
}
else {
Expand All @@ -259,8 +259,8 @@ object HoodieCreateRecordUtils {

def getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator: Option[SparkKeyGeneratorInterface],
sourceRow: InternalRow, schema: StructType,
isPrepped: Boolean, mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
//use keygen for mergeIntoWrites recordKey and partitionPath because the keygenerator handles
isPrepped: Boolean, sqlMergeIntoPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = {
//use keygen for sqlMergeIntoPrepped recordKey and partitionPath because the keygenerator handles
//fetching from the meta fields if they are populated and otherwise doing keygen
val recordKey = if (isPrepped) {
sourceRow.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD)
Expand All @@ -274,13 +274,13 @@ object HoodieCreateRecordUtils {
sparkKeyGenerator.get.getPartitionPath(sourceRow, schema).toString
}

val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) {
val instantTime: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
Option(sourceRow.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD))
} else {
None
}

val fileName: Option[String] = if (isPrepped || mergeIntoWrites) {
val fileName: Option[String] = if (isPrepped || sqlMergeIntoPrepped) {
Option(sourceRow.getString(HoodieRecord.FILENAME_META_FIELD_ORD))
} else {
None
@@ -45,7 +45,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.index.HoodieIndex
@@ -91,6 +91,14 @@ object HoodieSparkSqlWriter {
ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable")
.defaultValue(true)

/**
* For MERGE INTO from Spark SQL, we need some special handling; for example, schema validation should be
* disabled for writes from MERGE INTO. This config is used for internal purposes.
*/
val SQL_MERGE_INTO_WRITES: ConfigProperty[Boolean] =
ConfigProperty.key("hoodie.internal.sql.merge.into.writes")
.defaultValue(false)

/**
* For spark streaming use-cases, holds the batch Id.
*/
@@ -250,7 +258,7 @@
case WriteOperationType.DELETE | WriteOperationType.DELETE_PREPPED =>
val genericRecords = HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace)
// Convert to RDD[HoodieKey]
val isPrepped = hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false)
val isPrepped = hoodieConfig.getBooleanOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, false)
val keyGenerator: Option[BaseKeyGenerator] = if (isPrepped) {
None
} else {
@@ -348,10 +356,9 @@
}

// Remove meta columns from writerSchema if isPrepped is true.
val isPrepped = hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false)
val mergeIntoWrites = parameters.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
val processedDataSchema = if (isPrepped || mergeIntoWrites) {
val isPrepped = hoodieConfig.getBooleanOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, false)
val sqlMergeIntoPrepped = parameters.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean
val processedDataSchema = if (isPrepped || sqlMergeIntoPrepped) {
HoodieAvroUtils.removeMetadataFields(dataFileSchema)
} else {
dataFileSchema
@@ -388,7 +395,7 @@
val hoodieRecords =
HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df,
writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema,
processedDataSchema, operation, instantTime, isPrepped, mergeIntoWrites))
processedDataSchema, operation, instantTime, isPrepped, sqlMergeIntoPrepped))

val dedupedHoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
@@ -453,7 +460,7 @@
// in the table's one we want to proceed aligning nullability constraints w/ the table's schema
val shouldCanonicalizeNullable = opts.getOrDefault(CANONICALIZE_NULLABLE.key,
CANONICALIZE_NULLABLE.defaultValue.toString).toBoolean
val mergeIntoWrites = opts.getOrDefault(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean

val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
@@ -82,7 +82,6 @@ object HoodieWriterUtils {
hoodieConfig.setDefaultValue(RECONCILE_SCHEMA)
hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED)
hoodieConfig.setDefaultValue(ENABLE_OPTIMIZED_SQL_WRITES)
Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
}

@@ -17,7 +17,7 @@

package org.apache.spark.sql.hudi.command

import org.apache.hudi.DataSourceWriteOptions.{DATASOURCE_WRITE_PREPPED_KEY, ENABLE_OPTIMIZED_SQL_WRITES}
import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_WRITES_PREPPED_KEY, SPARK_SQL_OPTIMIZED_WRITES}
import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
@@ -40,8 +40,8 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunn

val condition = sparkAdapter.extractDeleteCondition(dft)

val targetLogicalPlan = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
, ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
val targetLogicalPlan = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
, SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
dft.table
} else {
stripMetaFieldAttributes(dft.table)
Expand All @@ -53,9 +53,9 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunn
targetLogicalPlan
}

val config = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key()
, ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") {
buildHoodieDeleteTableConfig(catalogTable, sparkSession) + (DATASOURCE_WRITE_PREPPED_KEY -> "true")
val config = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key()
, SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") {
buildHoodieDeleteTableConfig(catalogTable, sparkSession) + (SPARK_SQL_WRITES_PREPPED_KEY -> "true")
} else {
buildHoodieDeleteTableConfig(catalogTable, sparkSession)
}
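With the renamed switch left at its default of "true", a plain SQL delete now takes the prepped route: the target plan keeps its meta columns and SPARK_SQL_WRITES_PREPPED_KEY is added to the write config. An illustrative end-to-end statement, with a placeholder table and predicate:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hudi-delete-sketch")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()

// Resolved by DeleteHoodieTableCommand; under the optimized-writes default the
// delete is executed as a prepped write against the records' existing locations.
spark.sql("delete from hudi_trips where rider = 'rider-1'")   // placeholder table and predicate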