diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java index 40154d8675cff..7c6f79f2bd330 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java @@ -23,6 +23,7 @@ import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.client.utils.SparkRowSerDe; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieKeyException; import org.apache.log4j.LogManager; @@ -480,6 +481,7 @@ private Object[] getNestedFieldValues(InternalRow row, HoodieUnsafeRowUtils.Nest private HoodieUnsafeRowUtils.NestedFieldPath[] resolveNestedFieldPaths(List fieldPaths, StructType schema, boolean returnNull) { try { return fieldPaths.stream() + .filter(fieldPath -> !StringUtils.isNullOrEmpty(fieldPath)) .map(fieldPath -> HoodieUnsafeRowUtils$.MODULE$.composeNestedFieldPath(schema, fieldPath)) .toArray(HoodieUnsafeRowUtils.NestedFieldPath[]::new); } catch (Exception e) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java index ea4e58cf76906..8ca07bbc01a21 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java @@ -50,7 +50,7 @@ public SimpleKeyGenerator(TypedProperties props) { SimpleKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) { super(props); // Make sure key-generator is configured properly - validateRecordKey(recordKeyField); + validateRecordKey(recordKeyField, autoGenerateRecordKeys); validatePartitionPath(partitionPathField); this.recordKeyFields = recordKeyField == null ? 
Collections.emptyList() : Collections.singletonList(recordKeyField);
@@ -126,8 +126,8 @@ private static void validatePartitionPath(String partitionPathField) {
         String.format("Single partition-path field is expected; provided (%s)", partitionPathField));
   }
 
-  private static void validateRecordKey(String recordKeyField) {
-    checkArgument(recordKeyField == null || !recordKeyField.isEmpty(),
+  private static void validateRecordKey(String recordKeyField, boolean isAutoGenerateRecordKeyEnabled) {
+    checkArgument(recordKeyField == null || !recordKeyField.isEmpty() || isAutoGenerateRecordKeyEnabled,
         "Record key field has to be non-empty!");
     checkArgument(recordKeyField == null || !recordKeyField.contains(FIELDS_SEP),
         String.format("Single record-key field is expected; provided (%s)", recordKeyField));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 90b10c60ed04b..2a5b3f4972af8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -531,8 +531,12 @@ public String getPreCombineField() {
   }
 
   public Option getRecordKeyFields() {
-    String keyFieldsValue = getStringOrDefault(RECORDKEY_FIELDS, HoodieRecord.RECORD_KEY_METADATA_FIELD);
-    return Option.of(Arrays.stream(keyFieldsValue.split(","))
+    return getRecordKeyFields(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  }
+
+  public Option getRecordKeyFields(String defaultValue) {
+    String keyFieldsValue = getStringOrDefault(RECORDKEY_FIELDS, defaultValue);
+    return keyFieldsValue == null ? Option.empty() : Option.of(Arrays.stream(keyFieldsValue.split(","))
         .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {}));
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieKeyGeneratorException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieKeyGeneratorException.java
similarity index 100%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieKeyGeneratorException.java
rename to hudi-common/src/main/java/org/apache/hudi/exception/HoodieKeyGeneratorException.java
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 5de9cc8a411d8..46edbb4bb30b1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -376,7 +376,7 @@ object DataSourceWriteOptions {
    * Key generator class, that implements will extract the key out of incoming record.
    */
   val keyGeneratorInferFunc = JFunction.toJavaFunction((config: HoodieConfig) => {
-    Option.of(DataSourceOptionsHelper.inferKeyGenClazz(config.getProps))
+    Option.ofNullable(DataSourceOptionsHelper.inferKeyGenClazz(config.getProps))
   })
 
   val KEYGENERATOR_CLASS_NAME: ConfigProperty[String] = ConfigProperty
@@ -862,6 +862,10 @@ object DataSourceOptionsHelper {
   def inferKeyGenClazz(recordsKeyFields: String, partitionFields: String): String = {
     if (!StringUtils.isNullOrEmpty(partitionFields)) {
       val numPartFields = partitionFields.split(",").length
+      // Inference may not work when auto generation of record keys is enabled.
+ if (StringUtils.isNullOrEmpty(recordsKeyFields)) { + return null + } val numRecordKeyFields = recordsKeyFields.split(",").length if (numPartFields == 1 && numRecordKeyFields == 1) { classOf[SimpleKeyGenerator].getName diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 52df84b17f8fb..9f63f2a7c1b28 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -117,7 +117,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, if (tableConfig.populateMetaFields()) { HoodieRecord.RECORD_KEY_METADATA_FIELD } else { - val keyFields = tableConfig.getRecordKeyFields.get() + val keyFields = tableConfig.getRecordKeyFields().get() checkState(keyFields.length == 1) keyFields.head } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 5dda7c9df78c3..39b93166fdeb7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -99,13 +99,13 @@ object HoodieSparkSqlWriter { val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt) - val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode) + var (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode) val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters) val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator( originKeyGeneratorClassName, parameters) // Validate datasource and tableconfig keygen are the same - validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig); + parameters = validateAndSetKeyGeneratorConfigs(originKeyGeneratorClassName, hoodieConfig, parameters, tableConfig); validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite); val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "") diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index 90535348ae662..f52ae7b7ffeb6 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -22,22 +22,30 @@ import org.apache.hudi.DataSourceWriteOptions.{RECORD_MERGER_IMPLS, _} import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig} import org.apache.hudi.common.table.HoodieTableConfig -import org.apache.hudi.exception.HoodieException +import org.apache.hudi.common.util.StringUtils +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.exception.{HoodieException, 
HoodieKeyGeneratorException}
 import org.apache.hudi.hive.HiveSyncConfigHolder
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.util.SparkKeyGenUtils
+import org.apache.log4j.LogManager
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.hudi.command.SqlKeyGenerator
+
 import java.util.Properties
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * WriterUtils to assist in write path in Datasource and tests.
  */
 object HoodieWriterUtils {
+  private val log = LogManager.getLogger(getClass)
+
   def javaParametersWithWriteDefaults(parameters: java.util.Map[String, String]): java.util.Map[String, String] = {
     mapAsJavaMap(parametersWithWriteDefaults(parameters.asScala.toMap))
   }
@@ -55,7 +63,10 @@ object HoodieWriterUtils {
     val hoodieConfig: HoodieConfig = new HoodieConfig(props)
     hoodieConfig.setDefaultValue(OPERATION)
     hoodieConfig.setDefaultValue(TABLE_TYPE)
-    hoodieConfig.setDefaultValue(PRECOMBINE_FIELD)
+    // When auto generation of record keys is enabled, do not set the default preCombine field, since that would end up applying the code-level default value.
+    if (!parameters.getOrElse(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(), KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.defaultValue()).toBoolean) {
+      hoodieConfig.setDefaultValue(PRECOMBINE_FIELD)
+    }
     hoodieConfig.setDefaultValue(PAYLOAD_CLASS_NAME)
     hoodieConfig.setDefaultValue(KEYGENERATOR_CLASS_NAME)
     hoodieConfig.setDefaultValue(ENABLE)
@@ -185,7 +196,9 @@ object HoodieWriterUtils {
   /**
    * Detects conflicts between datasourceKeyGen and existing table configuration keyGen
    */
-  def validateKeyGeneratorConfig(datasourceKeyGen: String, tableConfig: HoodieConfig): Unit = {
+  def validateAndSetKeyGeneratorConfigs(datasourceKeyGen: String, hoodieConfig: HoodieConfig,
+                                        inputParams: Map[String, String], tableConfig: HoodieConfig)
+  : Map[String, String] = {
     val diffConfigs = StringBuilder.newBuilder
 
     if (null != tableConfig) {
@@ -203,6 +216,51 @@ object HoodieWriterUtils {
       diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
       throw new HoodieException(diffConfigs.toString.trim)
     }
+
+    val parameters = mutable.Map() ++ inputParams
+    // Auto generation of record keys needs special handling of a few configs.
+
+    if (hoodieConfig.getBoolean(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS)) {
+      val autoGenerateRecordKeyConfigKey = KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key()
+      // If the user explicitly enables combine-before-insert, we fail: de-dup is not supported with auto generation of record keys.
+      if (hoodieConfig.getBoolean(HoodieWriteConfig.COMBINE_BEFORE_INSERT)) {
+        throw new HoodieKeyGeneratorException(s"Config $autoGenerateRecordKeyConfigKey can not be used when " +
+          s"${HoodieWriteConfig.COMBINE_BEFORE_INSERT.key()} is enabled")
+      }
+      // Enable concat handle to ensure duplicate records are not de-duped.
+      if (!hoodieConfig.getBoolean(HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE)) {
+        hoodieConfig.setValue(HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE, "true")
+        parameters += (HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "true")
+        log.warn(s"Enabling config {${HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key()}} when $autoGenerateRecordKeyConfigKey is used")
+      }
+      // MOR table type is not supported when auto generation of record keys is enabled, since auto generation is meant only for immutable workloads.
+      if (hoodieConfig.getString(DataSourceWriteOptions.TABLE_TYPE) == MOR_TABLE_TYPE_OPT_VAL) {
+        throw new HoodieKeyGeneratorException(s"Config ${DataSourceWriteOptions.TABLE_TYPE.key()} should be set to " +
+          s"$COW_TABLE_TYPE_OPT_VAL when $autoGenerateRecordKeyConfigKey is used")
+      }
+      // If OPERATION is explicitly set to UPSERT by the user, throw an exception. If the user relies on the default value,
+      // the operation is overridden to INSERT_OPERATION_OPT_VAL.
+      if (parameters.getOrElse(OPERATION.key(), StringUtils.EMPTY_STRING) == UPSERT_OPERATION_OPT_VAL) {
+        throw new HoodieKeyGeneratorException(s"Config ${OPERATION.key()} should not be set to $UPSERT_OPERATION_OPT_VAL" +
+          s" when $autoGenerateRecordKeyConfigKey is used")
+      } else if (hoodieConfig.getString(OPERATION) == UPSERT_OPERATION_OPT_VAL) {
+        hoodieConfig.setValue(OPERATION.key(), INSERT_OPERATION_OPT_VAL)
+        parameters += (OPERATION.key() -> INSERT_OPERATION_OPT_VAL)
+        log.warn(s"Setting config ${OPERATION.key()} to $INSERT_OPERATION_OPT_VAL when $autoGenerateRecordKeyConfigKey is used")
+      }
+      // preCombine does not make sense when auto generation of record keys is enabled.
+      if (!StringUtils.isNullOrEmpty(hoodieConfig.getString(DataSourceWriteOptions.PRECOMBINE_FIELD))) {
+        throw new HoodieKeyGeneratorException(s"Config ${DataSourceWriteOptions.PRECOMBINE_FIELD.key()} should not be set" +
+          s" when $autoGenerateRecordKeyConfigKey is used")
+      }
+      // The record key field config should not be set when auto generation of record keys is used.
+      if (!StringUtils.isNullOrEmpty(hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD))) {
+        throw new HoodieKeyGeneratorException(s"Config ${DataSourceWriteOptions.RECORDKEY_FIELD.key()} should not be set " +
+          s"when $autoGenerateRecordKeyConfigKey is used")
+      }
+    }
+
+    parameters.toMap
   }
 
   private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieConfig, key: String): String = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 17a1cab27feb7..d37f80dd45492 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -141,7 +141,7 @@ class HoodieCDCRDD(
   private lazy val recordKeyField: String = if (populateMetaFields) {
     HoodieRecord.RECORD_KEY_METADATA_FIELD
   } else {
-    val keyFields = metaClient.getTableConfig.getRecordKeyFields.get()
+    val keyFields = metaClient.getTableConfig.getRecordKeyFields().get()
     checkState(keyFields.length == 1)
     keyFields.head
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 826c0090b212c..f9422c700428b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -111,7 +111,8 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
   /**
    * Record Field List(Primary Key List)
   */
-  lazy val primaryKeys: Array[String] = tableConfig.getRecordKeyFields.orElse(Array.empty)
+  // Pass null as the default so that tables without record key fields (auto generated record keys) resolve to an empty array.
+ lazy val primaryKeys: Array[String] = tableConfig.getRecordKeyFields(null).orElse(Array.empty) /** * PreCombine Field diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala index bb682cf9b5ffe..f2a77af69d7c8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala @@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils.getRootLevelFieldName import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecordMerger} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.util.{StringUtils, ValidationUtils} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType @@ -193,13 +194,17 @@ object HoodieOptionConfig { def validateTable(spark: SparkSession, schema: StructType, sqlOptions: Map[String, String]): Unit = { val resolver = spark.sessionState.conf.resolver - // validate primary key - val primaryKeys = sqlOptions.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName) - .map(_.split(",").filter(_.length > 0)) - ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.") - primaryKeys.get.foreach { primaryKey => - ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(primaryKey))), - s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.") + // validate primary keys only when auto generation of record keys is not enabled. + if (!sqlOptions.getOrElse(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(), + KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.defaultValue()).toBoolean) { + // validate primary key + val primaryKeys = sqlOptions.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName) + .map(_.split(",").filter(_.length > 0)) + ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.") + primaryKeys.get.foreach { primaryKey => + ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(primaryKey))), + s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.") + } } // validate preCombine key diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index 77ff939cf2623..4d0176810ec51 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -28,6 +28,7 @@ import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, MultiPartKeysValueExtractor} import org.apache.hudi.keygen.ComplexKeyGenerator +import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.sql.InsertMode import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.spark.internal.Logging @@ -54,6 +55,8 @@ trait ProvidesHoodieConfig extends Logging { // default value ("ts") // TODO(HUDI-3456) clean up val preCombineField = 
Option(tableConfig.getPreCombineField).getOrElse("") + val autoGenerateRecordKeys = sparkSession.conf + .getOption(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key).getOrElse(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.defaultValue).toBoolean require(hoodieCatalogTable.primaryKeys.nonEmpty, s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator") @@ -61,9 +64,8 @@ trait ProvidesHoodieConfig extends Logging { val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) { - Map.apply( + val opts = Map.apply( "path" -> hoodieCatalogTable.tableLocation, - RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), TBL_NAME.key -> hoodieCatalogTable.tableName, PRECOMBINE_FIELD.key -> preCombineField, HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, @@ -82,6 +84,15 @@ trait ProvidesHoodieConfig extends Logging { HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString, SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL ) + handleAutoGenerateRecordKeys(hoodieCatalogTable, opts, autoGenerateRecordKeys) + } + } + + def handleAutoGenerateRecordKeys(hoodieCatalogTable: HoodieCatalogTable, opts: Map[String, String], autoGenerateRecordKeys: Boolean): Map[String, String] = { + if (autoGenerateRecordKeys) { + opts + } else { + (opts ++ Map(RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","))) } } @@ -122,6 +133,8 @@ trait ProvidesHoodieConfig extends Logging { val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName) .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName) + val autoGenerateRecordKeys = sparkSession.conf + .getOption(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key).getOrElse(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.defaultValue).toBoolean val enableBulkInsert = combinedOpts.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key, DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean val dropDuplicate = sparkSession.conf @@ -175,7 +188,7 @@ trait ProvidesHoodieConfig extends Logging { } withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) { - Map( + val opts = Map( "path" -> path, TABLE_TYPE.key -> tableType, TBL_NAME.key -> hoodieCatalogTable.tableName, @@ -184,7 +197,6 @@ trait ProvidesHoodieConfig extends Logging { URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning, KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName, - RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), PRECOMBINE_FIELD.key -> preCombineField, PARTITIONPATH_FIELD.key -> partitionFieldsStr, PAYLOAD_CLASS_NAME.key -> payloadClassName, @@ -199,6 +211,7 @@ trait ProvidesHoodieConfig extends Logging { HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS), SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL ) + handleAutoGenerateRecordKeys(hoodieCatalogTable, opts, autoGenerateRecordKeys) } } @@ -209,15 +222,16 @@ trait ProvidesHoodieConfig extends Logging { val tableConfig = hoodieCatalogTable.tableConfig val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) + val 
autoGenerateRecordKeys = sparkSession.conf + .getOption(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key).getOrElse(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.defaultValue).toBoolean withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) { - Map( + val opts = Map( "path" -> hoodieCatalogTable.tableLocation, TBL_NAME.key -> hoodieCatalogTable.tableName, TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName, OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL, PARTITIONS_TO_DELETE.key -> partitionsToDrop, - RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""), PARTITIONPATH_FIELD.key -> partitionFields, HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key), @@ -229,6 +243,7 @@ trait ProvidesHoodieConfig extends Logging { HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields, HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS) ) + handleAutoGenerateRecordKeys(hoodieCatalogTable, opts, autoGenerateRecordKeys) } } @@ -244,11 +259,12 @@ trait ProvidesHoodieConfig extends Logging { s"There are no primary key defined in table ${hoodieCatalogTable.table.identifier}, cannot execute delete operation") val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) + val autoGenerateRecordKeys = sparkSession.conf + .getOption(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key).getOrElse(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.defaultValue).toBoolean withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) { - Map( + val opts = Map( "path" -> path, - RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), TBL_NAME.key -> tableConfig.getTableName, HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning, @@ -266,6 +282,7 @@ trait ProvidesHoodieConfig extends Logging { HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS), SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL ) + handleAutoGenerateRecordKeys(hoodieCatalogTable, opts, autoGenerateRecordKeys) } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 2e4c1db099e38..1f1585c934ec2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.exception.HoodieException -import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport} +import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal, NamedExpression} diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index 9099c7225e0f9..6743e99d4fed1 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -26,6 +26,7 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, TBL_NAME} import org.apache.hudi.exception.HoodieException import org.apache.hudi.hive.HiveSyncConfigHolder +import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport} import org.apache.spark.sql.HoodieCatalystExpressionUtils.MatchCast @@ -500,13 +501,14 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie // default value ("ts") // TODO(HUDI-3456) clean up val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("") + val autoGenerateRecordKeys = sparkSession.conf + .getOption(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key).getOrElse(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.defaultValue).toBoolean val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig) withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) { - Map( + val opts = Map( "path" -> path, - RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp, PRECOMBINE_FIELD.key -> preCombineField, TBL_NAME.key -> hoodieCatalogTable.tableName, PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp, @@ -534,6 +536,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie RECONCILE_SCHEMA.key -> "false", "hoodie.datasource.write.schema.canonicalize" -> "false" ) + if (autoGenerateRecordKeys) { + opts + } else { + (opts ++ Map(RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp)) + } } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala index 543b4f31c197d..659b467ce1137 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala @@ -45,7 +45,6 @@ class TestAutoRecordKeyGeneration extends SparkClientFunctionalTestHarness { "hoodie.bulkinsert.shuffle.parallelism" -> "4", "hoodie.delete.shuffle.parallelism" -> "2", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", - DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" ) @@ -53,10 +52,7 @@ class TestAutoRecordKeyGeneration extends SparkClientFunctionalTestHarness { @CsvSource(value = Array( "COPY_ON_WRITE|org.apache.hudi.keygen.SimpleKeyGenerator", "COPY_ON_WRITE|org.apache.hudi.keygen.ComplexKeyGenerator", - "COPY_ON_WRITE|org.apache.hudi.keygen.TimestampBasedKeyGenerator", - "MERGE_ON_READ|org.apache.hudi.keygen.SimpleKeyGenerator", - "MERGE_ON_READ|org.apache.hudi.keygen.ComplexKeyGenerator", - 
"MERGE_ON_READ|org.apache.hudi.keygen.TimestampBasedKeyGenerator" + "COPY_ON_WRITE|org.apache.hudi.keygen.TimestampBasedKeyGenerator" ), delimiter = '|') def testRecordKeyGeneration(tableType: String, keyGenClass: String): Unit = { var options: Map[String, String] = commonOpts + diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index 657c7762c380e..91d2687ceb891 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -29,10 +29,13 @@ import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.common.model._ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.common.util.StringUtils import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieIndexConfig, HoodieWriteConfig} -import org.apache.hudi.exception.HoodieException +import org.apache.hudi.exception.{HoodieException, HoodieKeyGeneratorException} import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode import org.apache.hudi.functional.TestBootstrap +import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator} import org.apache.hudi.testutils.DataSourceTestUtils import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest @@ -42,7 +45,7 @@ import org.apache.spark.sql.functions.{expr, lit} import org.apache.spark.sql.hudi.HoodieSparkSessionExtension import org.apache.spark.sql.hudi.command.SqlKeyGenerator import org.apache.spark.{SparkConf, SparkContext} -import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail} +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertTrue, fail} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments.arguments @@ -52,9 +55,6 @@ import org.mockito.Mockito.{spy, times, verify} import org.scalatest.Assertions.assertThrows import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept} -import java.io.IOException -import java.time.Instant -import java.util.{Collections, Date, UUID} import scala.collection.JavaConversions._ import scala.collection.JavaConverters @@ -1177,6 +1177,144 @@ class TestHoodieSparkSqlWriter { assertTrue(kg2 == classOf[SimpleKeyGenerator].getName) } + @Test + def testAutoGenerationOfRecordKeysFailsWithIncompatibleConfigs(): Unit = { + val _spark = spark + import _spark.implicits._ + + val initialOpts = Map( + HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName, + "hoodie.insert.shuffle.parallelism" -> "1", + "hoodie.upsert.shuffle.parallelism" -> "1", + KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key() -> "true", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition", + HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() -> "false", + HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "true", + DataSourceWriteOptions.OPERATION.key() -> INSERT_OPERATION_OPT_VAL, + DataSourceWriteOptions.TABLE_TYPE.key -> COW_TABLE_TYPE_OPT_VAL + ) + + val 
incompatibleConfigList = List(
+      HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() -> "true",
+      // PRECOMBINE_FIELD should not be set
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+      // RECORDKEY_FIELD should not be set
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      // Only COW table is supported
+      DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+      // Explicitly setting the upsert operation is not supported
+      DataSourceWriteOptions.OPERATION.key() -> UPSERT_OPERATION_OPT_VAL
+    )
+
+    for (incompatibleOpt <- incompatibleConfigList) {
+      val opts = initialOpts + incompatibleOpt
+      try {
+        // verify that the write fails for each incompatible config
+        val tmpDF = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+        tmpDF.write.format("org.apache.hudi")
+          .options(opts)
+          .mode(SaveMode.Append)
+          .save(tempBasePath)
+        throw new Exception("Should fail for " + incompatibleOpt)
+      } catch {
+        case e: HoodieException => assertTrue(e.isInstanceOf[HoodieKeyGeneratorException],
+          "e is not an instance of HoodieKeyGeneratorException: " + e.getMessage)
+        case other => throw other
+      }
+    }
+  }
+
+  @Test
+  def testAutoGenerationOfRecordKeysOverridesConfigs(): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+
+    val initialOpts = Map(
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key() -> "true",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition",
+      HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() -> "false",
+      HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "true",
+      DataSourceWriteOptions.OPERATION.key() -> INSERT_OPERATION_OPT_VAL,
+      DataSourceWriteOptions.TABLE_TYPE.key -> COW_TABLE_TYPE_OPT_VAL
+    )
+
+    val overrideConfigList = List(
+      // user sets false, but the value should be overridden to true
+      HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "false",
+      // the default hoodie.datasource.write.operation (upsert) is overridden to INSERT_OPERATION_OPT_VAL
+      "" -> ""
+    )
+
+    for (overrideConfig <- overrideConfigList) {
+      val opts = initialOpts + overrideConfig
+      // the write should succeed, since the incompatible value is expected to be overridden
+      val tmpDF = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+      tmpDF.write.format("org.apache.hudi")
+        .options(opts)
+        .mode(SaveMode.Append)
+        .save(tempBasePath)
+
+      val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(tempBasePath).build()
+      if (metaClient.getTableConfig.contains(overrideConfig._1)) {
+        assertNotEquals(overrideConfig._2, metaClient.getTableConfig.getString(overrideConfig._1))
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("bulk_insert", "insert"))
+  def testAutoGenerationOfRecordKeys(opType: String): Unit = {
+
+    val dataGen = new HoodieTestDataGenerator()
+    val initialOpts = Map(
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.TABLE_TYPE.key -> COW_TABLE_TYPE_OPT_VAL,
+      KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key() -> "true",
+      DataSourceWriteOptions.OPERATION.key() -> opType,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition",
+      HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "true")
+
+    var totalRecs = 0
+    for (_ <- 1 to 2) {
+      val opts = initialOpts
+      for (x <- 1 to 3) {
+        val instantTime = "00" + x
+        val genRecsList = if (x == 1) {
+          totalRecs += 100 * 2
+          val inserts = dataGen.generateInserts(instantTime, 100)
+          // Add duplicate inserts by doubling the generated records.
+          // MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE matters in the second iteration of the outer loop: without it,
+          // the incoming inserts would be merged with the existing records instead of being appended.
+          inserts.addAll(inserts)
+          inserts
+        } else {
+          totalRecs += 10
+          dataGen.generateUniqueUpdates(instantTime, 10)
+        }
+        val records = recordsToStrings(genRecsList).toList
+        val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+        inputDF.write.format("org.apache.hudi")
+          .options(opts)
+          .mode(SaveMode.Append)
+          .save(tempBasePath)
+
+        val snapshotDF = spark.read.format("org.apache.hudi")
+          .load(tempBasePath)
+        assertEquals(totalRecs, snapshotDF.count())
+      }
+    }
+
+    val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(tempBasePath).build()
+    assertEquals(HoodieTableType.COPY_ON_WRITE, metaClient.getTableConfig.getTableType)
+    assertTrue(StringUtils.isNullOrEmpty(metaClient.getTableConfig.getPreCombineField),
+      metaClient.getTableConfig.getPreCombineField + " should be empty or null")
+  }
+
   /**
    *
    * Test that you can't have consistent hashing bucket index on a COW table
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 120c6adb6cdf4..fd23dd9a4c3db 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -1036,4 +1036,121 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     checkKeyGenerator("org.apache.hudi.keygen.ComplexKeyGenerator", tableName)
     spark.sql(s"drop table $tableName")
   }
+
+  test("Test Auto Generation Of Record Keys with CTAS") {
+    withTempDir { tmp =>
+      Seq("cow").foreach { tableType =>
+        // Create Non-Partitioned table
+        val tableName1 = generateTableName
+        spark.sql(
+          s"""
+             | create table $tableName1 using hudi
+             | tblproperties(
+             |    type = '$tableType',
+             |    hoodie.auto.generate.record.keys = 'true'
+             | )
+             | location '${tmp.getCanonicalPath}/$tableName1'
+             | AS
+             | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+           """.stripMargin)
+
+        assertResult(WriteOperationType.BULK_INSERT) {
+          getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName1").getOperationType
+        }
+        checkAnswer(s"select id, name, price, ts from $tableName1")(
+          Seq(1, "a1", 10.0, 1000)
+        )
+
+        // Create Partitioned table
+        val tableName2 = generateTableName
+        spark.sql(
+          s"""
+             | create table $tableName2 using hudi
+             | partitioned by (dt)
+             | tblproperties(
+             |    type = '$tableType',
+             |    hoodie.auto.generate.record.keys = 'true'
+             | )
+             | location '${tmp.getCanonicalPath}/$tableName2'
+             | AS
+             | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
+           """.stripMargin
+        )
+
+        assertResult(WriteOperationType.BULK_INSERT) {
+          getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName2").getOperationType
+        }
+        checkAnswer(s"select id, name, price, dt from $tableName2")(
+          Seq(1, "a1", 10, "2021-04-01")
+        )
+
+        // Create Partitioned table with timestamp data type
+        val tableName3 = generateTableName
+        // CTAS is still expected to fail when the select produces a null column
+
assertThrows[Exception] { + spark.sql( + s""" + | create table $tableName3 using hudi + | partitioned by (dt) + | tblproperties( + | type = '$tableType', + | hoodie.auto.generate.record.keys = 'true' + | ) + | location '${tmp.getCanonicalPath}/$tableName3' + | AS + | select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt + | + """.stripMargin + ) + } + // Create table with timestamp type partition + spark.sql( + s""" + | create table $tableName3 using hudi + | partitioned by (dt) + | tblproperties( + | type = '$tableType', + | hoodie.auto.generate.record.keys = 'true' + | ) + | location '${tmp.getCanonicalPath}/$tableName3' + | AS + | select cast('2021-05-06 00:00:00' as timestamp) as dt, 1 as id, 'a1' as name, 10 as + | price + """.stripMargin + ) + + assertResult(WriteOperationType.BULK_INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName3").getOperationType + } + checkAnswer(s"select id, name, price, cast(dt as string) from $tableName3")( + Seq(1, "a1", 10, "2021-05-06 00:00:00") + ) + + // Create table with date type partition + val tableName4 = generateTableName + spark.sql( + s""" + | create table $tableName4 using hudi + | partitioned by (dt) + | tblproperties( + | type = '$tableType', + | hoodie.auto.generate.record.keys = 'true' + | ) + | location '${tmp.getCanonicalPath}/$tableName4' + | AS + | select cast('2021-05-06' as date) as dt, 1 as id, 'a1' as name, 10 as + | price + """.stripMargin + ) + + assertResult(WriteOperationType.BULK_INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName4").getOperationType + } + checkAnswer(s"select id, name, price, cast(dt as string) from $tableName4")( + Seq(1, "a1", 10, "2021-05-06") + ) + spark.sql("set hoodie.auto.generate.record.keys = false") + } + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index a227a20b8b6d0..6b5d16a4f265d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -1159,4 +1159,40 @@ class TestInsertTable extends HoodieSparkSqlTestBase { ) } } + + test("Test Auto Generate Record keys using Insert Into with values") { + withRecordType()(withTempDir { tmp => + val tableName = generateTableName + // Create a partitioned table + spark.sql( + s""" + |create table $tableName ( + | id int, + | dt string, + | name string, + | price double, + | ts long + |) using hudi + | tblproperties (hoodie.auto.generate.record.keys = 'true') + | partitioned by (dt) + | location '${tmp.getCanonicalPath}' + """.stripMargin) + + spark.sql("set hoodie.datasource.write.operation = insert") + // Note: Do not write the field alias, the partition field must be placed last. 
+ spark.sql( + s""" + | insert into $tableName values + | (1, 'a1', 10, 1000, "2021-01-05"), + | (2, 'a2', 20, 2000, "2021-01-06"), + | (3, 'a3', 30, 3000, "2021-01-07") + """.stripMargin) + + checkAnswer(s"select id, name, price, ts, dt from $tableName")( + Seq(1, "a1", 10.0, 1000, "2021-01-05"), + Seq(2, "a2", 20.0, 2000, "2021-01-06"), + Seq(3, "a3", 30.0, 3000, "2021-01-07") + ) + }) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala index 0a8458063cd5a..8eef209728579 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala @@ -157,7 +157,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase { .setConf(spark.sessionState.newHadoopConf()) .build() // check record key in hoodie.properties - assertResult("id")(metaClient.getTableConfig.getRecordKeyFields.get().mkString(",")) + assertResult("id")(metaClient.getTableConfig.getRecordKeyFields().get().mkString(",")) spark.sql( s"""