diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java index a1c575c4f414b..797df196441a7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java @@ -46,14 +46,6 @@ public class HoodieInternalConfig extends HoodieConfig { .withDocumentation("For SQL operations, if enables bulk_insert operation, " + "this configure will take effect to decide overwrite whole table or partitions specified"); - public static final ConfigProperty SQL_MERGE_INTO_WRITES = ConfigProperty - .key("hoodie.internal.sql.merge.into.writes") - .defaultValue(false) - .markAdvanced() - .sinceVersion("0.14.0") - .withDocumentation("This internal config is used by Merge Into SQL logic only to mark such use case " - + "and let the core components know it should handle the write differently"); - /** * Returns if partition records are sorted or not. * diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 5f594e553af0f..1581e21c070be 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -721,6 +721,12 @@ public class HoodieWriteConfig extends HoodieConfig { + "The class must be a subclass of `org.apache.hudi.callback.HoodieClientInitCallback`." + "By default, no Hudi client init callback is executed."); + /** + * Config key with boolean value that indicates whether records being written during a MERGE INTO Spark SQL + * operation are already prepped. + */ + public static final String SPARK_SQL_MERGE_INTO_PREPPED_KEY = "_hoodie.spark.sql.merge.into.prepped"; + private ConsistencyGuardConfig consistencyGuardConfig; private FileSystemRetryConfig fileSystemRetryConfig; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java index c908f40e4f091..eebaf0f05bac8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java @@ -21,7 +21,6 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.config.HoodieInternalConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIndexException; @@ -44,9 +43,8 @@ */ public final class SparkHoodieIndexFactory { public static HoodieIndex createIndex(HoodieWriteConfig config) { - boolean mergeIntoWrites = config.getProps().getBoolean(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(), - HoodieInternalConfig.SQL_MERGE_INTO_WRITES.defaultValue()); - if (mergeIntoWrites) { + boolean sqlMergeIntoPrepped = config.getProps().getBoolean(HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY, false); + if (sqlMergeIntoPrepped) { return new HoodieInternalProxyIndex(config); } // first use index class config to create index.
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java index 235bd63b99f40..cceaad0a78507 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java @@ -45,7 +45,7 @@ import java.util.Locale; import java.util.Map; -import static org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES; +import static org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY; import static org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_TYPE; import static org.apache.hudi.keygen.KeyGenUtils.inferKeyGeneratorType; @@ -80,7 +80,7 @@ public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOEx boolean autoRecordKeyGen = KeyGenUtils.enableAutoGenerateRecordKeys(props) //Need to prevent overwriting the keygen for spark sql merge into because we need to extract //the recordkey from the meta cols if it exists. Sql keygen will use pkless keygen if needed. - && !props.getBoolean(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue()); + && !props.getBoolean(SPARK_SQL_MERGE_INTO_PREPPED_KEY, false); try { KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props); if (autoRecordKeyGen) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 841f1088db6dc..6fb84932c1374 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -311,9 +311,10 @@ object DataSourceWriteOptions { .withDocumentation("The table type for the underlying data, for this write. This can’t change between writes.") /** - * Config key with boolean value that indicates whether record being written is already prepped. + * Config key with boolean value that indicates whether records being written during UPDATE or DELETE Spark SQL + * operations are already prepped. */ - val DATASOURCE_WRITE_PREPPED_KEY = "_hoodie.datasource.write.prepped"; + val SPARK_SQL_WRITES_PREPPED_KEY = "_hoodie.spark.sql.writes.prepped"; /** * May be derive partition path from incoming df if not explicitly set.
@@ -641,12 +642,12 @@ object DataSourceWriteOptions { val DROP_PARTITION_COLUMNS: ConfigProperty[java.lang.Boolean] = HoodieTableConfig.DROP_PARTITION_COLUMNS - val ENABLE_OPTIMIZED_SQL_WRITES: ConfigProperty[String] = ConfigProperty - .key("hoodie.spark.sql.writes.optimized.enable") + val SPARK_SQL_OPTIMIZED_WRITES: ConfigProperty[String] = ConfigProperty + .key("hoodie.spark.sql.optimized.writes.enable") .defaultValue("true") .markAdvanced() .sinceVersion("0.14.0") - .withDocumentation("Controls whether spark sql optimized update is enabled.") + .withDocumentation("Controls whether spark sql prepped update, delete, and merge are enabled.") /** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */ @Deprecated diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 1b961ba411a22..8fe36ec71becb 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -19,7 +19,7 @@ package org.apache.hudi import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceReadOptions._ -import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, DATASOURCE_WRITE_PREPPED_KEY, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER} +import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, SPARK_SQL_WRITES_PREPPED_KEY, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER} import org.apache.hudi.cdc.CDCRelation import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ} @@ -29,9 +29,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.ConfigUtils import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY -import org.apache.hudi.config.HoodieInternalConfig -import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES -import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE +import org.apache.hudi.config.HoodieWriteConfig.{WRITE_CONCURRENCY_MODE, SPARK_SQL_MERGE_INTO_PREPPED_KEY} import org.apache.hudi.exception.HoodieException import org.apache.hudi.util.PathUtils import org.apache.spark.sql.execution.streaming.{Sink, Source} @@ -146,9 +144,7 @@ class DefaultSource extends RelationProvider mode: SaveMode, optParams: Map[String, String], rawDf: DataFrame): BaseRelation = { - val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, - optParams.getOrDefault(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue().toString)) - .equalsIgnoreCase("true")) { + val df = if (optParams.getOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, "false").toBoolean || optParams.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean) { rawDf // Don't remove meta columns for prepped write. 
} else { rawDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala index 09d6c7a106a21..d59edc64bf894 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala @@ -57,7 +57,7 @@ object HoodieCreateRecordUtils { operation: WriteOperationType, instantTime: String, isPrepped: Boolean, - mergeIntoWrites: Boolean) + sqlMergeIntoPrepped: Boolean) def createHoodieRecordRdd(args: createHoodieRecordRddArgs) = { val df = args.df @@ -70,7 +70,7 @@ object HoodieCreateRecordUtils { val operation = args.operation val instantTime = args.instantTime val isPrepped = args.isPrepped - val mergeIntoWrites = args.mergeIntoWrites + val sqlMergeIntoPrepped = args.sqlMergeIntoPrepped val shouldDropPartitionColumns = config.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS) val recordType = config.getRecordMerger.getRecordType @@ -127,8 +127,8 @@ object HoodieCreateRecordUtils { } val (hoodieKey: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = HoodieCreateRecordUtils.getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator, avroRec, - isPrepped, mergeIntoWrites) - val avroRecWithoutMeta: GenericRecord = if (isPrepped || mergeIntoWrites) { + isPrepped, sqlMergeIntoPrepped) + val avroRecWithoutMeta: GenericRecord = if (isPrepped || sqlMergeIntoPrepped) { HoodieAvroUtils.rewriteRecord(avroRec, HoodieAvroUtils.removeMetadataFields(dataFileSchema)) } else { avroRec @@ -184,7 +184,7 @@ object HoodieCreateRecordUtils { validateMetaFieldsInSparkRecords(sourceStructType) validatePreppedRecord = false } - val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, isPrepped, mergeIntoWrites) + val (key: HoodieKey, recordLocation: Option[HoodieRecordLocation]) = HoodieCreateRecordUtils.getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator, sourceRow, sourceStructType, isPrepped, sqlMergeIntoPrepped) val targetRow = finalStructTypeRowWriter(sourceRow) var hoodieSparkRecord = new HoodieSparkRecord(key, targetRow, dataFileStructType, false) @@ -220,8 +220,8 @@ object HoodieCreateRecordUtils { } def getHoodieKeyAndMaybeLocationFromAvroRecord(keyGenerator: Option[BaseKeyGenerator], avroRec: GenericRecord, - isPrepped: Boolean, mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = { - //use keygen for mergeIntoWrites recordKey and partitionPath because the keygenerator handles + isPrepped: Boolean, sqlMergeIntoPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = { + //use keygen for sqlMergeIntoPrepped recordKey and partitionPath because the keygenerator handles //fetching from the meta fields if they are populated and otherwise doing keygen val recordKey = if (isPrepped) { avroRec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString @@ -236,13 +236,13 @@ object HoodieCreateRecordUtils { } val hoodieKey = new HoodieKey(recordKey, partitionPath) - val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) { + val instantTime: Option[String] = if (isPrepped || sqlMergeIntoPrepped) { Option(avroRec.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).map(_.toString) } else { None } - 
val fileName: Option[String] = if (isPrepped || mergeIntoWrites) { + val fileName: Option[String] = if (isPrepped || sqlMergeIntoPrepped) { Option(avroRec.get(HoodieRecord.FILENAME_METADATA_FIELD)).map(_.toString) } else { @@ -259,8 +259,8 @@ object HoodieCreateRecordUtils { def getHoodieKeyAndMayBeLocationFromSparkRecord(sparkKeyGenerator: Option[SparkKeyGeneratorInterface], sourceRow: InternalRow, schema: StructType, - isPrepped: Boolean, mergeIntoWrites: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = { - //use keygen for mergeIntoWrites recordKey and partitionPath because the keygenerator handles + isPrepped: Boolean, sqlMergeIntoPrepped: Boolean): (HoodieKey, Option[HoodieRecordLocation]) = { + //use keygen for sqlMergeIntoPrepped recordKey and partitionPath because the keygenerator handles //fetching from the meta fields if they are populated and otherwise doing keygen val recordKey = if (isPrepped) { sourceRow.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD) @@ -274,13 +274,13 @@ sparkKeyGenerator.get.getPartitionPath(sourceRow, schema).toString } - val instantTime: Option[String] = if (isPrepped || mergeIntoWrites) { + val instantTime: Option[String] = if (isPrepped || sqlMergeIntoPrepped) { Option(sourceRow.getString(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD)) } else { None } - val fileName: Option[String] = if (isPrepped || mergeIntoWrites) { + val fileName: Option[String] = if (isPrepped || sqlMergeIntoPrepped) { Option(sourceRow.getString(HoodieRecord.FILENAME_META_FIELD_ORD)) } else { None diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 272b55b7f2084..ca1359578d1b1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -45,7 +45,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME} import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} -import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES +import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException} import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool} import org.apache.hudi.index.HoodieIndex @@ -91,6 +91,14 @@ object HoodieSparkSqlWriter { ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable") .defaultValue(true) + /** + * For merge into from Spark SQL, we need some special handling. For example, schema validation should be disabled + * for writes from merge into. This config is used for internal purposes. + */ + val SQL_MERGE_INTO_WRITES: ConfigProperty[Boolean] = + ConfigProperty.key("hoodie.internal.sql.merge.into.writes") + .defaultValue(false) + /** * For spark streaming use-cases, holds the batch Id.
*/ @@ -250,7 +258,7 @@ object HoodieSparkSqlWriter { case WriteOperationType.DELETE | WriteOperationType.DELETE_PREPPED => val genericRecords = HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace) // Convert to RDD[HoodieKey] - val isPrepped = hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false) + val isPrepped = hoodieConfig.getBooleanOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, false) val keyGenerator: Option[BaseKeyGenerator] = if (isPrepped) { None } else { @@ -348,10 +356,9 @@ object HoodieSparkSqlWriter { } // Remove meta columns from writerSchema if isPrepped is true. - val isPrepped = hoodieConfig.getBooleanOrDefault(DATASOURCE_WRITE_PREPPED_KEY, false) - val mergeIntoWrites = parameters.getOrDefault(SQL_MERGE_INTO_WRITES.key(), - SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean - val processedDataSchema = if (isPrepped || mergeIntoWrites) { + val isPrepped = hoodieConfig.getBooleanOrDefault(SPARK_SQL_WRITES_PREPPED_KEY, false) + val sqlMergeIntoPrepped = parameters.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean + val processedDataSchema = if (isPrepped || sqlMergeIntoPrepped) { HoodieAvroUtils.removeMetadataFields(dataFileSchema) } else { dataFileSchema @@ -388,7 +395,7 @@ object HoodieSparkSqlWriter { val hoodieRecords = HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema, - processedDataSchema, operation, instantTime, isPrepped, mergeIntoWrites)) + processedDataSchema, operation, instantTime, isPrepped, sqlMergeIntoPrepped)) val dedupedHoodieRecords = if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) { @@ -453,7 +460,7 @@ object HoodieSparkSqlWriter { // in the table's one we want to proceed aligning nullability constraints w/ the table's schema val shouldCanonicalizeNullable = opts.getOrDefault(CANONICALIZE_NULLABLE.key, CANONICALIZE_NULLABLE.defaultValue.toString).toBoolean - val mergeIntoWrites = opts.getOrDefault(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(), + val mergeIntoWrites = opts.getOrDefault(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index cf8d85f704ef8..405e761635a22 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -82,7 +82,6 @@ object HoodieWriterUtils { hoodieConfig.setDefaultValue(RECONCILE_SCHEMA) hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS) hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED) - hoodieConfig.setDefaultValue(ENABLE_OPTIMIZED_SQL_WRITES) Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala index d10b3d529f510..55f2ebb8ac679 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.DataSourceWriteOptions.{DATASOURCE_WRITE_PREPPED_KEY, ENABLE_OPTIMIZED_SQL_WRITES} +import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_WRITES_PREPPED_KEY, SPARK_SQL_OPTIMIZED_WRITES} import org.apache.hudi.SparkAdapterSupport import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable @@ -40,8 +40,8 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunn val condition = sparkAdapter.extractDeleteCondition(dft) - val targetLogicalPlan = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key() - , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") { + val targetLogicalPlan = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key() + , SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") { dft.table } else { stripMetaFieldAttributes(dft.table) @@ -53,9 +53,9 @@ case class DeleteHoodieTableCommand(dft: DeleteFromTable) extends HoodieLeafRunn targetLogicalPlan } - val config = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key() - , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") { - buildHoodieDeleteTableConfig(catalogTable, sparkSession) + (DATASOURCE_WRITE_PREPPED_KEY -> "true") + val config = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key() + , SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") { + buildHoodieDeleteTableConfig(catalogTable, sparkSession) + (SPARK_SQL_WRITES_PREPPED_KEY -> "true") } else { buildHoodieDeleteTableConfig(catalogTable, sparkSession) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index d4c72e5bfb464..eba75c95452b5 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -25,7 +25,7 @@ import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.model.HoodieAvroRecordMerger import org.apache.hudi.common.util.StringUtils import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, TBL_NAME} -import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} +import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hive.HiveSyncConfigHolder import org.apache.hudi.sync.common.HoodieSyncConfig @@ -342,7 +342,12 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie val tableMetaCols = mergeInto.targetTable.output.filter(a => isMetaField(a.name)) val joinData = sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable, mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE") val incomingDataCols = joinData.output.filterNot(mergeInto.targetTable.outputSet.contains) - val projectedJoinPlan = Project(tableMetaCols ++ incomingDataCols, joinData) + val projectedJoinPlan = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") { + 
Project(tableMetaCols ++ incomingDataCols, joinData) + } else { + Project(incomingDataCols, joinData) + } + val projectedJoinOutput = projectedJoinPlan.output val requiredAttributesMap = recordKeyAttributeToConditionExpression ++ preCombineAttributeAssociatedExpression @@ -617,6 +622,15 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) + val enableOptimizedMerge = sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), + SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) + + val keyGeneratorClassName = if (enableOptimizedMerge == "true") { + classOf[MergeIntoKeyGenerator].getCanonicalName + } else { + classOf[SqlKeyGenerator].getCanonicalName + } + val overridingOpts = Map( "path" -> path, RECORDKEY_FIELD.key -> tableConfig.getRawRecordKeyFieldProp, @@ -625,7 +639,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp, HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning, - KEYGENERATOR_CLASS_NAME.key -> classOf[MergeIntoKeyGenerator].getCanonicalName, + KEYGENERATOR_CLASS_NAME.key -> keyGeneratorClassName, SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName, HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key), HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key), @@ -648,7 +662,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie RECONCILE_SCHEMA.key -> "false", CANONICALIZE_NULLABLE.key -> "false", SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true", - HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key -> "true", + HoodieSparkSqlWriter.SQL_MERGE_INTO_WRITES.key -> "true", + HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY -> enableOptimizedMerge, HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key() -> (!StringUtils.isNullOrEmpty(preCombineField)).toString ) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala index 7d6d5f39bb161..e35e4939f0492 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.DataSourceWriteOptions.{DATASOURCE_WRITE_PREPPED_KEY, ENABLE_OPTIMIZED_SQL_WRITES} +import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_WRITES_PREPPED_KEY, SPARK_SQL_OPTIMIZED_WRITES} import org.apache.hudi.SparkAdapterSupport import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals import org.apache.spark.sql._ @@ -44,8 +44,8 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends HoodieLeafRunnableC case Assignment(attr: AttributeReference, value) => attr -> value } - val filteredOutput = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key() - , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") { + val filteredOutput = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key() 
+ , SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") { ut.table.output } else { removeMetaFields(ut.table.output) @@ -63,10 +63,10 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends HoodieLeafRunnableC val condition = ut.condition.getOrElse(TrueLiteral) val filteredPlan = Filter(condition, Project(targetExprs, ut.table)) - val config = if (sparkSession.sqlContext.conf.getConfString(ENABLE_OPTIMIZED_SQL_WRITES.key() - , ENABLE_OPTIMIZED_SQL_WRITES.defaultValue()) == "true") { + val config = if (sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key() + , SPARK_SQL_OPTIMIZED_WRITES.defaultValue()) == "true") { // Set config to show that this is a prepped write. - buildHoodieConfig(catalogTable) + (DATASOURCE_WRITE_PREPPED_KEY -> "true") + buildHoodieConfig(catalogTable) + (SPARK_SQL_WRITES_PREPPED_KEY -> "true") } else { buildHoodieConfig(catalogTable) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java index 448215a9b0a76..844f134ed8356 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java @@ -161,7 +161,7 @@ public void run() throws Exception { .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "false") .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true") .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true") - .option(DataSourceWriteOptions.ENABLE_OPTIMIZED_SQL_WRITES().key(), "true") + .option(DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES().key(), DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES().defaultValue()) // This will remove any existing data at path below, and create a .mode(SaveMode.Overwrite); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala index 6233884a63e9f..bc87405b9f918 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala @@ -26,7 +26,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { test("Test Delete Table") { withTempDir { tmp => - Seq(true, false).foreach { optimizedSqlEnabled => + Seq(true, false).foreach { sparkSqlOptimizedWrites => Seq("cow", "mor").foreach { tableType => val tableName = generateTableName // create table @@ -47,7 +47,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { """.stripMargin) // test with optimized sql writes enabled / disabled. - spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled") + spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites") // insert data to table spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") @@ -97,7 +97,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { """.stripMargin) // test with optimized sql writes enabled. 
- spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=true") + spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true") // insert data to table spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") @@ -279,7 +279,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { Seq(false, true).foreach { urlencode => test(s"Test Delete single-partition table' partitions, urlencode: $urlencode") { - Seq(true, false).foreach { optimizedSqlEnabled => + Seq(true, false).foreach { sparkSqlOptimizedWrites => withTempDir { tmp => val tableName = generateTableName val tablePath = s"${tmp.getCanonicalPath}/$tableName" @@ -308,7 +308,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { |""".stripMargin) // test with optimized sql writes enabled / disabled. - spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled") + spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites") // delete 2021-10-01 partition if (urlencode) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala index 7ee3e838a2ec5..63adacbf1292c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, HoodieSparkUtils, ScalaAssertionSupport} import org.apache.spark.sql.internal.SQLConf @@ -24,94 +25,99 @@ import org.apache.spark.sql.internal.SQLConf class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSupport { test("Test MergeInto Basic") { - withRecordType()(withTempDir { tmp => - spark.sql("set hoodie.payload.combined.schema.validate = true") - val tableName = generateTableName - // Create table - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | location '${tmp.getCanonicalPath}' - | tblproperties ( - | primaryKey ='id', - | preCombineField = 'ts' - | ) + Seq(true, false).foreach { sparkSqlOptimizedWrites => + withRecordType()(withTempDir { tmp => + spark.sql("set hoodie.payload.combined.schema.validate = false") + val tableName = generateTableName + // Create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}' + | tblproperties ( + | primaryKey ='id', + | preCombineField = 'ts' + | ) """.stripMargin) - // First merge with a extra input field 'flag' (insert a new record) - spark.sql( - s""" - | merge into $tableName - | using ( - | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as flag - | ) s0 - | on s0.id = $tableName.id - | when matched and flag = '1' then update set - | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts - | when not matched and flag = '1' then insert * + // test with optimized sql merge enabled / disabled. 
+ spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites") + + // First merge with a extra input field 'flag' (insert a new record) + spark.sql( + s""" + | merge into $tableName + | using ( + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as flag + | ) s0 + | on s0.id = $tableName.id + | when matched and flag = '1' then update set + | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts + | when not matched and flag = '1' then insert * """.stripMargin) - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 10.0, 1000) - ) + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) - // Second merge (update the record) - spark.sql( - s""" - | merge into $tableName - | using ( - | select 1 as id, 'a1' as name, 10 as price, 1001 as ts - | ) s0 - | on s0.id = $tableName.id - | when matched then update set - | id = s0.id, name = s0.name, price = s0.price + $tableName.price, ts = s0.ts - | when not matched then insert * + // Second merge (update the record) + spark.sql( + s""" + | merge into $tableName + | using ( + | select 1 as id, 'a1' as name, 10 as price, 1001 as ts + | ) s0 + | on s0.id = $tableName.id + | when matched then update set + | id = s0.id, name = s0.name, price = s0.price + $tableName.price, ts = s0.ts + | when not matched then insert * """.stripMargin) - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 20.0, 1001) - ) + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 20.0, 1001) + ) - // the third time merge (update & insert the record) - spark.sql( - s""" - | merge into $tableName - | using ( - | select * from ( - | select 1 as id, 'a1' as name, 10 as price, 1002 as ts - | union all - | select 2 as id, 'a2' as name, 12 as price, 1001 as ts - | ) - | ) s0 - | on s0.id = $tableName.id - | when matched then update set - | id = s0.id, name = s0.name, price = s0.price + $tableName.price, ts = s0.ts - | when not matched and s0.id % 2 = 0 then insert * + // the third time merge (update & insert the record) + spark.sql( + s""" + | merge into $tableName + | using ( + | select * from ( + | select 1 as id, 'a1' as name, 10 as price, 1002 as ts + | union all + | select 2 as id, 'a2' as name, 12 as price, 1001 as ts + | ) + | ) s0 + | on s0.id = $tableName.id + | when matched then update set + | id = s0.id, name = s0.name, price = s0.price + $tableName.price, ts = s0.ts + | when not matched and s0.id % 2 = 0 then insert * """.stripMargin) - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 30.0, 1002), - Seq(2, "a2", 12.0, 1001) - ) + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 30.0, 1002), + Seq(2, "a2", 12.0, 1001) + ) - // the fourth merge (delete the record) - spark.sql( - s""" - | merge into $tableName - | using ( - | select 1 as id, 'a1' as name, 12 as price, 1003 as ts - | ) s0 - | on s0.id = $tableName.id - | when matched and s0.id != 1 then update set - | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts - | when matched and s0.id = 1 then delete - | when not matched then insert * + // the fourth merge (delete the record) + spark.sql( + s""" + | merge into $tableName + | using ( + | select 1 as id, 'a1' as name, 12 as price, 1003 as ts + | ) s0 + | on s0.id = $tableName.id + | when matched and s0.id != 1 then update set + | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts + | when matched and s0.id = 1 then delete + | when not matched then insert * """.stripMargin) 
- val cnt = spark.sql(s"select * from $tableName where id = 1").count() - assertResult(0)(cnt) - }) + val cnt = spark.sql(s"select * from $tableName where id = 1").count() + assertResult(0)(cnt) + }) + } } /** @@ -1187,41 +1193,47 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo } test("Test MergeInto with partial insert") { - withRecordType()(withTempDir {tmp => - spark.sql("set hoodie.payload.combined.schema.validate = true") - // Create a partitioned mor table - val tableName = generateTableName - spark.sql( - s""" - | create table $tableName ( - | id bigint, - | name string, - | price double, - | dt string - | ) using hudi - | tblproperties ( - | type = 'mor', - | primaryKey = 'id' - | ) - | partitioned by(dt) - | location '${tmp.getCanonicalPath}' + Seq(true, false).foreach { sparkSqlOptimizedWrites => + withRecordType()(withTempDir { tmp => + spark.sql("set hoodie.payload.combined.schema.validate = true") + // Create a partitioned mor table + val tableName = generateTableName + spark.sql( + s""" + | create table $tableName ( + | id bigint, + | name string, + | price double, + | dt string + | ) using hudi + | tblproperties ( + | type = 'mor', + | primaryKey = 'id' + | ) + | partitioned by(dt) + | location '${tmp.getCanonicalPath}' """.stripMargin) - spark.sql(s"insert into $tableName select 1, 'a1', 10, '2021-03-21'") - spark.sql( - s""" - | merge into $tableName as t0 - | using ( - | select 2 as id, 'a2' as name, 10 as price, '2021-03-20' as dt - | ) s0 - | on s0.id = t0.id - | when not matched and s0.id % 2 = 0 then insert (id, name, dt) - | values(s0.id, s0.name, s0.dt) + spark.sql(s"insert into $tableName select 1, 'a1', 10, '2021-03-21'") + + // test with optimized sql merge enabled / disabled. + spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites") + + spark.sql( + s""" + | merge into $tableName as t0 + | using ( + | select 2 as id, 'a2' as name, 10 as price, '2021-03-20' as dt + | ) s0 + | on s0.id = t0.id + | when not matched and s0.id % 2 = 0 then insert (id, name, dt) + | values(s0.id, s0.name, s0.dt) """.stripMargin) - checkAnswer(s"select id, name, price, dt from $tableName order by id")( - Seq(1, "a1", 10, "2021-03-21"), - Seq(2, "a2", null, "2021-03-20") - ) - }) + checkAnswer(s"select id, name, price, dt from $tableName order by id")( + Seq(1, "a1", 10, "2021-03-21"), + Seq(2, "a2", null, "2021-03-20") + ) + }) + } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala index 69a40e868ac4a..dd1d00580dc74 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala @@ -17,127 +17,134 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES import org.apache.hudi.{HoodieSparkUtils, ScalaAssertionSupport} class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase with ScalaAssertionSupport { test("Test Merge into extra cond") { - withTempDir { tmp => - val tableName = generateTableName - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | location 
'${tmp.getCanonicalPath}/$tableName' - | tblproperties ( - | primaryKey ='id', - | preCombineField = 'ts' - | ) + Seq(true, false).foreach { sparkSqlOptimizedWrites => + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey ='id', + | preCombineField = 'ts' + | ) """.stripMargin) - val tableName2 = generateTableName - spark.sql( - s""" - |create table $tableName2 ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | location '${tmp.getCanonicalPath}/$tableName2' - | tblproperties ( - | primaryKey ='id', - | preCombineField = 'ts' - | ) + val tableName2 = generateTableName + spark.sql( + s""" + |create table $tableName2 ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName2' + | tblproperties ( + | primaryKey ='id', + | preCombineField = 'ts' + | ) """.stripMargin) - spark.sql( - s""" - |insert into $tableName values - | (1, 'a1', 10, 100), - | (2, 'a2', 20, 200), - | (3, 'a3', 20, 100) - |""".stripMargin) - spark.sql( - s""" - |insert into $tableName2 values - | (1, 'u1', 10, 999), - | (3, 'u3', 30, 9999), - | (4, 'u4', 40, 99999) - |""".stripMargin) + spark.sql( + s""" + |insert into $tableName values + | (1, 'a1', 10, 100), + | (2, 'a2', 20, 200), + | (3, 'a3', 20, 100) + |""".stripMargin) + spark.sql( + s""" + |insert into $tableName2 values + | (1, 'u1', 10, 999), + | (3, 'u3', 30, 9999), + | (4, 'u4', 40, 99999) + |""".stripMargin) - spark.sql( - s""" - |merge into $tableName as oldData - |using $tableName2 - |on oldData.id = $tableName2.id - |when matched and oldData.price = $tableName2.price then update set oldData.name = $tableName2.name - | - |""".stripMargin) + // test with optimized sql merge enabled / disabled. + spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites") - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "u1", 10.0, 100), - Seq(3, "a3", 20.0, 100), - Seq(2, "a2", 20.0, 200) - ) + spark.sql( + s""" + |merge into $tableName as oldData + |using $tableName2 + |on oldData.id = $tableName2.id + |when matched and oldData.price = $tableName2.price then update set oldData.name = $tableName2.name + | + |""".stripMargin) - val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) { - "Only simple conditions of the form `t.id = s.id` using primary key or partition path " + - "columns are allowed on tables with primary key. (illegal column(s) used: `price`" - } else { - "Only simple conditions of the form `t.id = s.id` using primary key or partition path " + - "columns are allowed on tables with primary key. (illegal column(s) used: `price`;" - } + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "u1", 10.0, 100), + Seq(3, "a3", 20.0, 100), + Seq(2, "a2", 20.0, 200) + ) - checkException( - s""" - |merge into $tableName as oldData - |using $tableName2 - |on oldData.id = $tableName2.id and oldData.price = $tableName2.price - |when matched then update set oldData.name = $tableName2.name - |when not matched then insert * - |""".stripMargin)(errorMessage) + val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) { + "Only simple conditions of the form `t.id = s.id` using primary key or partition path " + + "columns are allowed on tables with primary key. 
(illegal column(s) used: `price`" + } else { + "Only simple conditions of the form `t.id = s.id` using primary key or partition path " + + "columns are allowed on tables with primary key. (illegal column(s) used: `price`;" + } - //test with multiple pks - val tableName3 = generateTableName - spark.sql( - s""" - |create table $tableName3 ( - | id int, - | name string, - | price double, - | ts long - |) using hudi - | location '${tmp.getCanonicalPath}/$tableName3' - | tblproperties ( - | primaryKey ='id,name', - | preCombineField = 'ts' - | ) + checkException( + s""" + |merge into $tableName as oldData + |using $tableName2 + |on oldData.id = $tableName2.id and oldData.price = $tableName2.price + |when matched then update set oldData.name = $tableName2.name + |when not matched then insert * + |""".stripMargin)(errorMessage) + + //test with multiple pks + val tableName3 = generateTableName + spark.sql( + s""" + |create table $tableName3 ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName3' + | tblproperties ( + | primaryKey ='id,name', + | preCombineField = 'ts' + | ) """.stripMargin) - val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) { - "Hudi tables with primary key are required to match on all primary key colums. Column: 'name' not found" - } else { - "Hudi tables with primary key are required to match on all primary key colums. Column: 'name' not found;" - } + val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) { + "Hudi tables with primary key are required to match on all primary key colums. Column: 'name' not found" + } else { + "Hudi tables with primary key are required to match on all primary key colums. Column: 'name' not found;" + } - checkException( - s""" - |merge into $tableName3 as oldData - |using $tableName2 - |on oldData.id = $tableName2.id - |when matched then update set oldData.name = $tableName2.name - |when not matched then insert * - |""".stripMargin)(errorMessage2) + checkException( + s""" + |merge into $tableName3 as oldData + |using $tableName2 + |on oldData.id = $tableName2.id + |when matched then update set oldData.name = $tableName2.name + |when not matched then insert * + |""".stripMargin)(errorMessage2) + } } } test("Test pkless complex merge cond") { withRecordType()(withTempDir { tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") + spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true") val tableName = generateTableName // Create table spark.sql( @@ -212,6 +219,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit for (withPrecombine <- Seq(true, false)) { withRecordType()(withTempDir { tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") + spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true") val tableName = generateTableName val prekstr = if (withPrecombine) "tblproperties (preCombineField = 'ts')" else "" @@ -264,6 +272,7 @@ class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase wit test("Test MergeInto Basic pkless") { withRecordType()(withTempDir { tmp => spark.sql("set hoodie.payload.combined.schema.validate = true") + spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true") val tableName = generateTableName // Create table spark.sql( diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala index 23d192ba099a5..f244167d14244 
100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala @@ -17,13 +17,14 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES import org.apache.hudi.HoodieSparkUtils.isSpark2 class TestUpdateTable extends HoodieSparkSqlTestBase { test("Test Update Table") { withRecordType()(withTempDir { tmp => - Seq(true, false).foreach { optimizedSqlEnabled => + Seq(true, false).foreach { sparkSqlOptimizedWrites => Seq("cow", "mor").foreach { tableType => val tableName = generateTableName // create table @@ -50,7 +51,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { ) // test with optimized sql writes enabled / disabled. - spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled") + spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites") // update data spark.sql(s"update $tableName set price = 20 where id = 1") @@ -95,7 +96,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { ) // test with optimized sql writes enabled. - spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=true") + spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true") // update data spark.sql(s"update $tableName set price = 20 where id = 1") @@ -256,7 +257,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { test("Test decimal type") { withTempDir { tmp => - Seq(true, false).foreach { optimizedSqlEnabled => + Seq(true, false).foreach { sparkSqlOptimizedWrites => val tableName = generateTableName // create table spark.sql( @@ -283,7 +284,7 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { ) // test with optimized sql writes enabled / disabled. - spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled") + spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites") spark.sql(s"update $tableName set price = 22 where id = 1") checkAnswer(s"select id, name, price, ts from $tableName")(