diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index 76474fde66eae..07ab5df9c7dbd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -113,7 +113,7 @@ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
 Object persistedOrderingVal = getNestedFieldVal((GenericRecord) currentValue,
 properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true);
 Comparable incomingOrderingVal = (Comparable) getNestedFieldVal((GenericRecord) incomingRecord,
- properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), false);
+ properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true);
 return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
 }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index ddbd7fc06a95b..d1094bb5cd3a7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -19,11 +19,13 @@ package org.apache.hudi
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
+
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.DataSourceOptionsHelper.{allAlternatives, translateConfigurations}
+import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
 import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
@@ -42,19 +44,21 @@ import org.apache.hudi.internal.DataSourceInternalWriterHelper
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.sync.common.AbstractSyncTool
 import org.apache.hudi.table.BulkInsertPartitioner
+
 import org.apache.log4j.LogManager
+
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
 import org.apache.spark.{SPARK_VERSION, SparkContext}
+
 import java.util
 import java.util.Properties
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import scala.collection.mutable.StringBuilder
 import scala.collection.mutable.ListBuffer
 object HoodieSparkSqlWriter {
@@ -141,7 +145,7 @@ object HoodieSparkSqlWriter {
 .setPartitionFields(partitionColumns)
 .setPopulateMetaFields(populateMetaFields)
 .setRecordKeyFields(hoodieConfig.getString(RECORDKEY_FIELD))
- .setKeyGeneratorClassProp(hoodieConfig.getString(KEYGENERATOR_CLASS_NAME))
+ .setKeyGeneratorClassProp(HoodieWriterUtils.getOriginKeyGenerator(parameters))
 .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
 .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
 .initTable(sparkContext.hadoopConfiguration, path)
@@ -713,22 +717,6 @@ object HoodieSparkSqlWriter {
 }
 }
- private def validateTableConfig(spark: SparkSession, params: Map[String, String],
- tableConfig: HoodieTableConfig): Unit = {
- val resolver = spark.sessionState.conf.resolver
- val diffConfigs = StringBuilder.newBuilder
- params.foreach { case (key, value) =>
- val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
- if (null != existingValue && !resolver(existingValue, value)) {
- diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
- }
- }
- if (diffConfigs.nonEmpty) {
- diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
- throw new HoodieException(diffConfigs.toString.trim)
- }
- }
-
 private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
 tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
 val mergedParams = mutable.Map.empty ++
@@ -745,16 +733,4 @@ object HoodieSparkSqlWriter {
 val params = mergedParams.toMap
 (params, HoodieWriterUtils.convertMapToHoodieConfig(params))
 }
-
- private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieTableConfig, key: String): String = {
- if (null == tableConfig) {
- null
- } else {
- if (allAlternatives.contains(key)) {
- tableConfig.getString(allAlternatives(key))
- } else {
- tableConfig.getString(key)
- }
- }
- }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 0e3ede1fe3ebc..c1223d9792273 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -17,15 +17,19 @@ package org.apache.hudi
+import java.util.Properties
+
+import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
 import org.apache.hudi.common.config.{HoodieConfig, TypedProperties}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.hudi.command.SqlKeyGenerator
-import java.util.Properties
 import scala.collection.JavaConversions.mapAsJavaMap
-import scala.collection.JavaConverters.{mapAsScalaMapConverter, _}
-import scala.collection.JavaConverters.mapAsScalaMapConverter
-import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import scala.collection.JavaConverters._
 /**
 * WriterUtils to assist in write path in Datasource and tests.
@@ -102,4 +106,68 @@ object HoodieWriterUtils {
 properties.putAll(mapAsJavaMap(parameters))
 new HoodieConfig(properties)
 }
+
+ def getOriginKeyGenerator(parameters: Map[String, String]): String = {
+ val kg = parameters.getOrElse(KEYGENERATOR_CLASS_NAME.key(), null)
+ if (classOf[SqlKeyGenerator].getCanonicalName == kg) {
+ parameters.getOrElse(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
+ } else {
+ kg
+ }
+ }
+
+ /**
+ * Detects conflicts between new parameters and existing table configurations
+ */
+ def validateTableConfig(spark: SparkSession, params: Map[String, String],
+ tableConfig: HoodieConfig): Unit = {
+ val resolver = spark.sessionState.conf.resolver
+ val diffConfigs = StringBuilder.newBuilder
+ params.foreach { case (key, value) =>
+ val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
+ if (null != existingValue && !resolver(existingValue, value)) {
+ diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+ }
+ }
+
+ if (null != tableConfig) {
+ val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
+ val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
+ if (null != datasourceRecordKey && null != tableConfigRecordKey
+ && datasourceRecordKey != tableConfigRecordKey) {
+ diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
+ }
+
+ val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
+ val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
+ if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
+ && datasourcePreCombineKey != tableConfigPreCombineKey) {
+ diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
+ }
+
+ val datasourceKeyGen = getOriginKeyGenerator(params)
+ val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+ if (null != datasourceKeyGen && null != tableConfigKeyGen
+ && datasourceKeyGen != tableConfigKeyGen) {
+ diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
+ }
+ }
+
+ if (diffConfigs.nonEmpty) {
+ diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
+ throw new HoodieException(diffConfigs.toString.trim)
+ }
+ }
+
+ private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieConfig, key: String): String = {
+ if (null == tableConfig) {
+ null
+ } else {
+ if (allAlternatives.contains(key)) {
+ tableConfig.getString(allAlternatives(key))
+ } else {
+ tableConfig.getString(key)
+ }
+ }
+ }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 963035cb638d3..543a8b997004c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -20,6 +20,10 @@ package org.apache.spark.sql.hudi
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.util.ValidationUtils
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
 /**
@@ -43,6 +47,7 @@ object HoodieOptionConfig {
 .withSqlKey("primaryKey")
 .withHoodieKey(DataSourceWriteOptions.RECORDKEY_FIELD.key)
 .withTableConfigKey(HoodieTableConfig.RECORDKEY_FIELDS.key)
+ .defaultValue(DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
 .build()
 val SQL_KEY_TABLE_TYPE: HoodieOption[String] = buildConf()
@@ -102,6 +107,8 @@ object HoodieOptionConfig {
 private lazy val reverseValueMapping = valueMapping.map(f => f._2 -> f._1)
+ def withDefaultSqlOptions(options: Map[String, String]): Map[String, String] = defaultSqlOptions ++ options
+
 /**
 * Mapping the sql's short name key/value in the options to the hoodie's config key/value.
 * @param options
@@ -119,14 +126,13 @@
 * @return
 */
 def mappingSqlOptionToTableConfig(options: Map[String, String]): Map[String, String] = {
- defaultTableConfig ++
- options.map { case (k, v) =>
- if (keyTableConfigMapping.contains(k)) {
- keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v)
- } else {
- k -> v
- }
+ options.map { case (k, v) =>
+ if (keyTableConfigMapping.contains(k)) {
+ keyTableConfigMapping(k) -> valueMapping.getOrElse(v, v)
+ } else {
+ k -> v
 }
+ }
 }
 /**
@@ -136,16 +142,19 @@
 options.map(kv => tableConfigKeyToSqlKey.getOrElse(kv._1, kv._1) ->
 reverseValueMapping.getOrElse(kv._2, kv._2))
 }
- private lazy val defaultTableConfig: Map[String, String] = {
+ private lazy val defaultSqlOptions: Map[String, String] = {
 HoodieOptionConfig.getClass.getDeclaredFields
 .filter(f => f.getType == classOf[HoodieOption[_]])
 .map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieOption[_]]})
 .filter(option => option.tableConfigKey.isDefined && option.defaultValue.isDefined)
- .map(option => option.tableConfigKey.get ->
- valueMapping.getOrElse(option.defaultValue.get.toString, option.defaultValue.get.toString))
+ .map(option => option.sqlKeyName -> option.defaultValue.get.toString)
 .toMap
 }
+ private lazy val defaultTableConfig: Map[String, String] = {
+ mappingSqlOptionToHoodieParam(defaultSqlOptions)
+ }
+
 /**
 * Get the primary key from the table options.
 * @param options
@@ -154,7 +163,7 @@
 def getPrimaryColumns(options: Map[String, String]): Array[String] = {
 val params = mappingSqlOptionToHoodieParam(options)
 params.get(DataSourceWriteOptions.RECORDKEY_FIELD.key)
- .map(_.split(",").filter(_.length > 0))
+ .map(_.split(",").filter(_.nonEmpty))
 .getOrElse(Array.empty)
 }
@@ -171,7 +180,47 @@
 def getPreCombineField(options: Map[String, String]): Option[String] = {
 val params = mappingSqlOptionToHoodieParam(options)
- params.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)
+ params.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key).filter(_.nonEmpty)
+ }
+
+ def deleteHooideOptions(options: Map[String, String]): Map[String, String] = {
+ options.filterNot(_._1.startsWith("hoodie.")).filterNot(kv => keyMapping.contains(kv._1))
+ }
+
+ // extract primaryKey, preCombineField, type options
+ def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
+ val targetOptions = keyMapping.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName)
+ options.filterKeys(targetOptions.contains)
+ }
+
+ // validate primaryKey, preCombineField and type options
+ def validateTable(spark: SparkSession, schema: StructType, options: Map[String, String]): Unit = {
+ val resolver = spark.sessionState.conf.resolver
+
+ // validate primary key
+ val primaryKeys = options.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
+ .map(_.split(",").filter(_.length > 0))
+ ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.")
+ primaryKeys.get.foreach { primaryKey =>
+ ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, primaryKey)),
+ s"Can't find primary key `$primaryKey` in ${schema.treeString}.")
+ }
+
+ // validate precombine key
+ val precombineKey = options.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName)
+ if (precombineKey.isDefined && precombineKey.get.nonEmpty) {
+ ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, precombineKey.get)),
+ s"Can't find precombine key `${precombineKey.get}` in ${schema.treeString}.")
+ }
+
+ // validate table type
+ val tableType = options.get(SQL_KEY_TABLE_TYPE.sqlKeyName)
+ ValidationUtils.checkArgument(tableType.nonEmpty, "No `type` is specified.")
+ ValidationUtils.checkArgument(
+ tableType.get.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW) ||
+ tableType.get.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR),
+ s"'type' must be '${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW}' or " +
+ s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'")
 }
 def buildConf[T](): HoodieOptions[T] = {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
index cf9c49ef02a9c..8e490335080e0 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
@@ -90,7 +90,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
 val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
 val metadataConfig = {
 val properties = new Properties()
- properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties).asJava)
+ properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties ++ table.properties).asJava)
 HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
 }
 FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 87cbb8a7f0306..1446760a3eb69 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -202,8 +202,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
 val targetTableId = getMergeIntoTargetTableId(mergeInto)
 val targetTable = sparkSession.sessionState.catalog.getTableMetadata(targetTableId)
- val targetTableType = HoodieOptionConfig.getTableType(targetTable.storage.properties)
- val preCombineField = HoodieOptionConfig.getPreCombineField(targetTable.storage.properties)
+ val tblProperties = targetTable.storage.properties ++ targetTable.properties
+ val targetTableType = HoodieOptionConfig.getTableType(tblProperties)
+ val preCombineField = HoodieOptionConfig.getPreCombineField(tblProperties)
 // Get the map of target attribute to value of the update assignments.
 val target2Values = resolvedAssignments.map {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
index 4123ea9499a6d..e7d77e7598aec 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -105,8 +105,13 @@ object AlterHoodieTableAddColumnsCommand {
 val path = getTableLocation(table, sparkSession)
 val jsc = new JavaSparkContext(sparkSession.sparkContext)
- val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
- path, table.identifier.table, HoodieWriterUtils.parametersWithWriteDefaults(table.storage.properties).asJava)
+ val client = DataSourceUtils.createHoodieClient(
+ jsc,
+ schema.toString,
+ path,
+ table.identifier.table,
+ HoodieWriterUtils.parametersWithWriteDefaults(table.storage.properties ++ table.properties).asJava
+ )
 val hadoopConf = sparkSession.sessionState.newHadoopConf()
 val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
index 7c4d45649587b..f12f43d389c1f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala
@@ -92,7 +92,7 @@ extends RunnableCommand {
 .build()
 val tableConfig = metaClient.getTableConfig
- val optParams = withSparkConf(sparkSession, table.storage.properties) {
+ withSparkConf(sparkSession, table.storage.properties) {
 Map(
 "path" -> path,
 TBL_NAME.key -> tableIdentifier.table,
@@ -104,10 +104,6 @@
 PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp
 )
 }
-
- val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
- val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
- translatedOptions
 }
 def normalizePartitionSpec[T](
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index 38c7e290a2659..2244e72123483 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -19,9 +19,11 @@ package org.apache.spark.sql.hudi.command
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.hudi.sql.InsertMode
+
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -73,9 +75,10 @@ case class CreateHoodieTableAsSelectCommand(
 // Execute the insert query
 try {
+ val tblProperties = table.storage.properties ++ table.properties
 val options = Map(
 DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString,
- DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(table.storage.properties.asJava),
+ DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tblProperties.asJava),
 DataSourceWriteOptions.HIVE_TABLE_PROPERTIES.key -> ConfigUtils.configToString(table.properties.asJava),
 DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
 DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
@@ -88,7 +91,9 @@ case class CreateHoodieTableAsSelectCommand(
 if (!sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
 // Create the table
 val createTableCommand = CreateHoodieTableCommand(tableWithSchema, mode == SaveMode.Ignore)
- createTableCommand.createTableInCatalog(sparkSession, checkPathForManagedTable = false)
+ val path = getTableLocation(table, sparkSession)
+ val (finalSchema, _, tableSqlOptions) = createTableCommand.parseSchemaAndConfigs(sparkSession, path, ctas = true)
+ createTableCommand.createTableInCatalog(sparkSession, finalSchema, tableSqlOptions)
 }
 } else {
 // failed to insert data, clear table path
 clearTablePath(tablePath, hadoopConf)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index 8ac63126a4b93..d6c5160897e6e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -19,13 +19,17 @@ package org.apache.spark.sql.hudi.command
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+
+import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
+import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.common.model.HoodieFileFormat
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.ValidationUtils
 import org.apache.hudi.hadoop.HoodieParquetInputFormat
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
-import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -36,19 +40,17 @@ import org.apache.spark.sql.hive.HiveClientUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hudi.HoodieOptionConfig
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
-import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.{initTableIfNeed, isEmptyPath}
+import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.checkTableConfigEqual
 import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.{SPARK_VERSION, SparkConf}
-import java.util.{Locale, Properties}
-import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.keygen.ComplexKeyGenerator
-import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import java.util.{Locale, Properties}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.util.control.NonFatal
 /**
 * Command for create hoodie table.
@@ -56,9 +58,11 @@ import scala.collection.mutable
 case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
 extends RunnableCommand with SparkAdapterSupport {
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val tableName = table.identifier.unquotedString
+ val tableName = formatName(table.identifier.table)
+ val tblProperties = table.storage.properties ++ table.properties
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
 val tableIsExists = sparkSession.sessionState.catalog.tableExists(table.identifier)
 if (tableIsExists) {
 if (ignoreIfExists) {
@@ -66,64 +70,95 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
 return Seq.empty[Row]
 // scalastyle:on
 } else {
- throw new IllegalArgumentException(s"Table $tableName already exists.")
+ throw new IllegalArgumentException(s"Table ${table.identifier.unquotedString} already exists.")
 }
 }
- // Create table in the catalog
- val createTable = createTableInCatalog(sparkSession)
+
+ // get the schema with meta fields, the table config if the hudi table exists, and options including
+ // table configs and properties of the catalog table
+ val path = getTableLocation(table, sparkSession)
+ val (finalSchema, existingTableConfig, tableSqlOptions) = parseSchemaAndConfigs(sparkSession, path)
+
 // Init the hoodie.properties
- initTableIfNeed(sparkSession, createTable)
+ initTableIfNeed(sparkSession, path, finalSchema, existingTableConfig, tableSqlOptions)
+
+ try {
+ // Create table in the catalog
+ createTableInCatalog(sparkSession, finalSchema, tableSqlOptions)
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Failed to create catalog table in metastore: ${e.getMessage}")
+ }
+
 Seq.empty[Row]
 }
- def createTableInCatalog(sparkSession: SparkSession,
- checkPathForManagedTable: Boolean = true): CatalogTable = {
+ def parseSchemaAndConfigs(sparkSession: SparkSession, path: String, ctas: Boolean = false)
+ : (StructType, Map[String, String], Map[String, String]) = {
+ val resolver = sparkSession.sessionState.conf.resolver
+ val conf = sparkSession.sessionState.newHadoopConf
+ // if CTAS, we treat the table we just created as nonexistent
+ val isTableExists = if (ctas) false else tableExistsInPath(path, conf)
+ var existingTableConfig = Map.empty[String, String]
+ val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(tblProperties)
+ val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(tblProperties)
+
+ // get final schema and parameters
+ val (finalSchema, tableSqlOptions) = (table.tableType, isTableExists) match {
+ case (CatalogTableType.EXTERNAL, true) =>
+ // If this is an external table & the table already exists in the location,
+ // load the schema from the table meta.
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(path)
+ .setConf(conf)
+ .build()
+ val tableSchema = getTableSqlSchema(metaClient)
+ existingTableConfig = metaClient.getTableConfig.getProps.asScala.toMap
+ validateTableConfig(sparkSession, catalogTableProps, convertMapToHoodieConfig(existingTableConfig))
+
+ val options = extraTableConfig(sparkSession, isTableExists, existingTableConfig) ++
+ sqlOptions ++ HoodieOptionConfig.mappingTableConfigToSqlOption(existingTableConfig)
+
+ val userSpecifiedSchema = table.schema
+ val schema = if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
+ tableSchema.get
+ } else if (userSpecifiedSchema.nonEmpty) {
+ userSpecifiedSchema
+ } else {
+ throw new IllegalArgumentException(s"Missing schema for Create Table: $tableName")
+ }
+
+ (addMetaFields(schema), options)
+
+ case (_, false) =>
+ assert(table.schema.nonEmpty, s"Missing schema for Create Table: $tableName")
+ val schema = table.schema
+ val options = extraTableConfig(sparkSession, isTableExists = false) ++ sqlOptions
+ (addMetaFields(schema), options)
+
+ case (CatalogTableType.MANAGED, true) =>
+ throw new AnalysisException(s"Can not create the managed table('$tableName')" +
+ s". The associated location('$path') already exists.")
+ }
+ HoodieOptionConfig.validateTable(sparkSession, finalSchema, tableSqlOptions)
+
+ val dataSchema = finalSchema.filterNot { f =>
+ table.partitionColumnNames.exists(resolver(_, f.name))
+ }
+ verifyDataSchema(table.identifier, table.tableType, dataSchema)
+
+ (finalSchema, existingTableConfig, tableSqlOptions)
+ }
+
+ def createTableInCatalog(sparkSession: SparkSession, finalSchema: StructType,
+ options: Map[String, String]): Unit = {
 assert(table.tableType != CatalogTableType.VIEW)
 assert(table.provider.isDefined)
 val sessionState = sparkSession.sessionState
- val tableName = table.identifier.unquotedString
 val path = getTableLocation(table, sparkSession)
 val conf = sparkSession.sessionState.newHadoopConf()
- val isTableExists = tableExistsInPath(path, conf)
- // Get the schema & table options
- val (newSchema, tableOptions) = if (table.tableType == CatalogTableType.EXTERNAL &&
- isTableExists) {
- // If this is an external table & the table has already exists in the location,
- // load the schema from the table meta.
- val metaClient = HoodieTableMetaClient.builder()
- .setBasePath(path)
- .setConf(conf)
- .build()
- val tableSchema = getTableSqlSchema(metaClient)
-
- // Get options from the external table and append with the options in ddl.
- val originTableConfig = HoodieOptionConfig.mappingTableConfigToSqlOption(
- metaClient.getTableConfig.getProps.asScala.toMap)
- val extraConfig = extraTableConfig(sparkSession, isTableExists, originTableConfig)
- val options = originTableConfig ++ table.storage.properties ++ extraConfig
-
- val userSpecifiedSchema = table.schema
- if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) {
- (addMetaFields(tableSchema.get), options)
- } else if (userSpecifiedSchema.nonEmpty) {
- (addMetaFields(userSpecifiedSchema), options)
- } else {
- throw new IllegalArgumentException(s"Missing schema for Create Table: $tableName")
- }
- } else {
- assert(table.schema.nonEmpty, s"Missing schema for Create Table: $tableName")
- // SPARK-19724: the default location of a managed table should be non-existent or empty.
- if (checkPathForManagedTable && table.tableType == CatalogTableType.MANAGED
- && !isEmptyPath(path, conf)) {
- throw new AnalysisException(s"Can not create the managed table('$tableName')" +
- s". The associated location('$path') already exists.")
- }
- // Add the meta fields to the schema if this is a managed table or an empty external table.
- val options = table.storage.properties ++ extraTableConfig(sparkSession, false)
- (addMetaFields(table.schema), options)
- }
- val tableType = HoodieOptionConfig.getTableType(table.storage.properties)
+ val tableType = HoodieOptionConfig.getTableType(options)
 val inputFormat = tableType match {
 case DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL =>
 classOf[HoodieParquetInputFormat].getCanonicalName
@@ -134,31 +169,39 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
 val outputFormat = HoodieInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.PARQUET)
 val serdeFormat = HoodieInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET)
- val newStorage = new CatalogStorageFormat(Some(new Path(path).toUri),
- Some(inputFormat), Some(outputFormat), Some(serdeFormat),
- table.storage.compressed, tableOptions + ("path" -> path))
+ // only parameters irrelevant to Hudi can be set in storage.properties
+ val storageProperties = HoodieOptionConfig.deleteHooideOptions(options)
+ val newStorage = new CatalogStorageFormat(
+ Some(new Path(path).toUri),
+ Some(inputFormat),
+ Some(outputFormat),
+ Some(serdeFormat),
+ table.storage.compressed,
+ storageProperties + ("path" -> path))
 val newDatabaseName = formatName(table.identifier.database
 .getOrElse(sessionState.catalog.getCurrentDatabase))
- val newTableName = formatName(table.identifier.table)
 val newTableIdentifier = table.identifier
- .copy(table = newTableName, database = Some(newDatabaseName))
-
- val newTable = table.copy(identifier = newTableIdentifier,
- schema = newSchema, storage = newStorage, createVersion = SPARK_VERSION)
- // validate the table
- validateTable(newTable)
+ .copy(table = tableName, database = Some(newDatabaseName))
+
+ // append pk, preCombineKey and type to the table properties
+ val newTblProperties = table.storage.properties ++ table.properties ++ HoodieOptionConfig.extractSqlOptions(options)
+ val newTable = table.copy(
+ identifier = newTableIdentifier,
+ schema = finalSchema,
+ storage = newStorage,
+ createVersion = SPARK_VERSION,
+ properties = newTblProperties
+ )
 // Create table in the catalog
 val enableHive = isEnableHive(sparkSession)
 if (enableHive) {
 createHiveDataSourceTable(newTable, sparkSession)
 } else {
- sessionState.catalog.createTable(newTable, ignoreIfExists = false,
- validateLocation = checkPathForManagedTable)
+ sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false)
 }
- newTable
 }
 /**
@@ -170,8 +213,6 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
 * @param sparkSession
 */
 private def createHiveDataSourceTable(table: CatalogTable, sparkSession: SparkSession): Unit = {
- // check schema
- verifyDataSchema(table.identifier, table.tableType, table.schema)
 val dbName = table.identifier.database.get
 // check database
 val dbExists = sparkSession.sessionState.catalog.databaseExists(dbName)
@@ -186,7 +227,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
 val dataSourceProps = tableMetaToTableProps(sparkSession.sparkContext.conf,
 table, table.schema)
- val tableWithDataSourceProps = table.copy(properties = dataSourceProps)
+ val tableWithDataSourceProps = table.copy(properties = dataSourceProps ++ table.properties)
 val client = HiveClientUtils.newClientForMetadata(sparkSession.sparkContext.conf,
 sparkSession.sessionState.newHadoopConf())
 // create hive table.
@@ -198,9 +239,8 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
 }
 // This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#verifyDataSchema
- private def verifyDataSchema(tableName: TableIdentifier,
- tableType: CatalogTableType,
- dataSchema: StructType): Unit = {
+ private def verifyDataSchema(tableName: TableIdentifier, tableType: CatalogTableType,
+ dataSchema: Seq[StructField]): Unit = {
 if (tableType != CatalogTableType.VIEW) {
 val invalidChars = Seq(",", ":", ";")
 def verifyNestedColumnNames(schema: StructType): Unit = schema.foreach { f =>
@@ -230,10 +270,10 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
 }
 }
 }
+
 // This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#tableMetaToTableProps
- private def tableMetaToTableProps( sparkConf: SparkConf,
- table: CatalogTable,
- schema: StructType): Map[String, String] = {
+ private def tableMetaToTableProps(sparkConf: SparkConf, table: CatalogTable,
+ schema: StructType): Map[String, String] = {
 val partitionColumns = table.partitionColumnNames
 val bucketSpec = table.bucketSpec
@@ -280,24 +320,6 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
 properties.toMap
 }
- private def validateTable(table: CatalogTable): Unit = {
- val options = table.storage.properties
- // validate the pk if it exist in the table.
- HoodieOptionConfig.getPrimaryColumns(options).foreach(pk => table.schema.fieldIndex(pk))
- // validate the version column if it exist in the table.
- HoodieOptionConfig.getPreCombineField(options).foreach(v => table.schema.fieldIndex(v))
- // validate the partition columns
- table.partitionColumnNames.foreach(p => table.schema.fieldIndex(p))
- // validate table type
- options.get(HoodieOptionConfig.SQL_KEY_TABLE_TYPE.sqlKeyName).foreach { tableType =>
- ValidationUtils.checkArgument(
- tableType.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW) ||
- tableType.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR),
- s"'type' must be '${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW}' or " +
- s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'")
- }
- }
-
 def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean,
 originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
 val extraConfig = mutable.Map.empty[String, String]
@@ -322,10 +344,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
 extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) =
 HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue()
 }
- val primaryColumns = HoodieOptionConfig.getPrimaryColumns(originTableConfig ++ table.storage.properties)
- if (primaryColumns.isEmpty) {
- extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[UuidKeyGenerator].getCanonicalName
- } else if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
+ if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
 extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
 HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
 originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
@@ -334,31 +353,20 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
 }
 extraConfig.toMap
 }
-}
-
-object CreateHoodieTableCommand extends Logging {
 /**
 * Init the hoodie.properties.
 */
- def initTableIfNeed(sparkSession: SparkSession, table: CatalogTable): Unit = {
- val location = getTableLocation(table, sparkSession)
+ def initTableIfNeed(sparkSession: SparkSession,
+ location: String,
+ schema: StructType,
+ originTableConfig: Map[String, String],
+ sqlOptions: Map[String, String]): Unit = {
+ logInfo(s"Init hoodie.properties for $tableName")
 val conf = sparkSession.sessionState.newHadoopConf()
- // Init the hoodie table
- val originTableConfig = if (tableExistsInPath(location, conf)) {
- val metaClient = HoodieTableMetaClient.builder()
- .setBasePath(location)
- .setConf(conf)
- .build()
- metaClient.getTableConfig.getProps.asScala.toMap
- } else {
- Map.empty[String, String]
- }
- val tableName = table.identifier.table
- logInfo(s"Init hoodie.properties for $tableName")
- val tableOptions = HoodieOptionConfig.mappingSqlOptionToTableConfig(table.storage.properties)
+ val tableOptions = HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
 checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PRECOMBINE_FIELD.key)
 checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PARTITION_FIELDS.key)
 checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.RECORDKEY_FIELDS.key)
@@ -372,10 +380,13 @@ object CreateHoodieTableCommand extends Logging {
 HoodieTableMetaClient.withPropertyBuilder()
 .fromProperties(properties)
 .setTableName(tableName)
- .setTableCreateSchema(SchemaConverters.toAvroType(table.schema).toString())
+ .setTableCreateSchema(SchemaConverters.toAvroType(schema).toString())
 .setPartitionFields(table.partitionColumnNames.mkString(","))
 .initTable(conf, location)
 }
+}
+
+object CreateHoodieTableCommand extends Logging {
 def checkTableConfigEqual(originTableConfig: Map[String, String], newTableConfig: Map[String, String],
 configKey: String): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index 987ce0e050be4..e8acebd787073 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -23,11 +23,12 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hudi.HoodieOptionConfig
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
+import org.apache.spark.sql.types.StructType
 case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends RunnableCommand with SparkAdapterSupport {
@@ -56,8 +57,8 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
 }
 private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = {
- val targetTable = sparkSession.sessionState.catalog
- .getTableMetadata(tableId)
+ val targetTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
+ val tblProperties = targetTable.storage.properties ++ targetTable.properties
 val path = getTableLocation(targetTable, sparkSession)
 val conf = sparkSession.sessionState.newHadoopConf()
 val metaClient = HoodieTableMetaClient.builder()
@@ -65,23 +66,27 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
 .setConf(conf)
 .build()
 val tableConfig = metaClient.getTableConfig
- val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
+ val tableSchema = getTableSqlSchema(metaClient).get
+ val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
+ val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
+ val primaryColumns = tableConfig.getRecordKeyFields.get()
 assert(primaryColumns.nonEmpty, s"There are no primary key defined in table $tableId, cannot execute delete operator")
- withSparkConf(sparkSession, targetTable.storage.properties) {
+ withSparkConf(sparkSession, tblProperties) {
 Map(
 "path" -> path,
 TBL_NAME.key -> tableId.table,
 HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
 URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
- KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
+ KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+ SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
 OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
- PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
+ PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
 HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
 HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
 HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
- SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
+ SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
 )
 }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 2b88373115b71..ac3fce5315680 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
+
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
@@ -30,7 +31,8 @@ import org.apache.hudi.hive.MultiPartKeysValueExtractor
 import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.sql.InsertMode
-import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
@@ -40,10 +42,13 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
 import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
 import java.util.Properties
+import scala.collection.JavaConverters._
+
 /**
 * Command for insert into hoodie table.
 */
@@ -194,46 +199,39 @@ object InsertIntoHoodieTableCommand extends Logging {
 s"[${insertPartitions.keys.mkString(" " )}]" +
 s" not equal to the defined partition in table[${table.partitionColumnNames.mkString(",")}]")
 }
- val options = table.storage.properties ++ extraOptions
- val parameters = withSparkConf(sparkSession, options)()
-
- val tableType = parameters.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue)
- val primaryColumns = HoodieOptionConfig.getPrimaryColumns(options)
- val partitionFields = table.partitionColumnNames.mkString(",")
-
 val path = getTableLocation(table, sparkSession)
 val conf = sparkSession.sessionState.newHadoopConf()
 val isTableExists = tableExistsInPath(path, conf)
- val tableConfig = if (isTableExists) {
- HoodieTableMetaClient.builder()
+ val (tableConfig, tableSchema) = if (isTableExists) {
+ val metaClient = HoodieTableMetaClient.builder()
 .setBasePath(path)
 .setConf(conf)
 .build()
- .getTableConfig
- } else {
- null
- }
- val hiveStylePartitioningEnable = if (null == tableConfig || null == tableConfig.getHiveStylePartitioningEnable) {
- "true"
+ (metaClient.getTableConfig, getTableSqlSchema(metaClient).get)
 } else {
- tableConfig.getHiveStylePartitioningEnable
+ (new HoodieTableConfig(), table.schema)
 }
- val urlEncodePartitioning = if (null == tableConfig || null == tableConfig.getUrlEncodePartitoning) {
- "false"
+ val partitionColumns = tableConfig.getPartitionFieldProp
+ val partitionSchema = if (null == partitionColumns || partitionColumns.isEmpty) {
+ table.partitionSchema
 } else {
- tableConfig.getUrlEncodePartitoning
- }
- val keyGeneratorClassName = if (null == tableConfig || null == tableConfig.getKeyGeneratorClassName) {
- if (primaryColumns.nonEmpty) {
- classOf[ComplexKeyGenerator].getCanonicalName
- } else {
- classOf[UuidKeyGenerator].getCanonicalName
- }
- } else {
- tableConfig.getKeyGeneratorClassName
+ StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
 }
- val tableSchema = table.schema
+ val options = table.storage.properties ++ table.properties ++ tableConfig.getProps.asScala.toMap ++ extraOptions
+ val parameters = withSparkConf(sparkSession, options)()
+
+ val tableName = Option(tableConfig.getTableName).getOrElse(table.identifier.table)
+ val tableType = Option(tableConfig.getTableType.name).getOrElse(TABLE_TYPE.defaultValue)
+ val primaryColumns = tableConfig.getRecordKeyFields.orElse(HoodieOptionConfig.getPrimaryColumns(options))
+ val preCombineColumn = Option(tableConfig.getPreCombineField)
+ .getOrElse(HoodieOptionConfig.getPreCombineField(options).getOrElse(""))
+ val partitionFields = Option(tableConfig.getPartitionFieldProp)
+ .getOrElse(table.partitionColumnNames.mkString(","))
+ val hiveStylePartitioningEnable = Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true")
+ val urlEncodePartitioning = Option(tableConfig.getUrlEncodePartitoning).getOrElse("false")
+ val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
+ .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
 val dropDuplicate = sparkSession.conf
 .getOption(INSERT_DROP_DUPS.key)
@@ -242,35 +240,33 @@ object InsertIntoHoodieTableCommand extends Logging {
 val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
 DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+ val hasPrecombineColumn = preCombineColumn.nonEmpty
 val isPartitionedTable = table.partitionColumnNames.nonEmpty
- val isPrimaryKeyTable = primaryColumns.nonEmpty
 val insertMode = InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
 DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
 val isNonStrictMode = insertMode == InsertMode.NON_STRICT
 val operation =
- (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
- case (true, true, _, _) if !isNonStrictMode =>
+ (enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
+ case (true, _, _, false, _) =>
 throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
- case (_, true, true, _) if isPartitionedTable =>
+ case (true, true, _, _, true) =>
 throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
- case (_, true, _, true) =>
+ case (true, _, true, _, _) =>
 throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
 s" Please disable $INSERT_DROP_DUPS and try again.")
 // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
- case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
- // insert overwrite partition
- case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+ case (true, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
 // insert overwrite table
- case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
- // if it is pk table and the dropDuplicate has disable, use the upsert operation for strict and upsert mode.
- case (true, false, false, false) if !isNonStrictMode => UPSERT_OPERATION_OPT_VAL
- // if enableBulkInsert is true and the table is non-primaryKeyed, use the bulk insert operation
- case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+ case (false, true, _, _, false) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+ // insert overwrite partition
+ case (_, true, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
+ // when dropDuplicate is disabled and a preCombine field is provided, use the upsert operation for strict and upsert mode.
+ case (false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
 // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
- case (true, true, _, _) if isNonStrictMode => BULK_INSERT_OPERATION_OPT_VAL
+ case (true, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
 // for the rest case, use the insert operation
- case (_, _, _, _) => INSERT_OPERATION_OPT_VAL
+ case _ => INSERT_OPERATION_OPT_VAL
 }
 val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
@@ -288,17 +284,18 @@ object InsertIntoHoodieTableCommand extends Logging {
 Map(
 "path" -> path,
 TABLE_TYPE.key -> tableType,
- TBL_NAME.key -> table.identifier.table,
- PRECOMBINE_FIELD.key -> tableSchema.fields.last.name,
+ TBL_NAME.key -> tableName,
+ PRECOMBINE_FIELD.key -> preCombineColumn,
 OPERATION.key -> operation,
 HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
 URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
- KEYGENERATOR_CLASS_NAME.key -> keyGeneratorClassName,
+ KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+ SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
 RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
 PARTITIONPATH_FIELD.key -> partitionFields,
 PAYLOAD_CLASS_NAME.key -> payloadClassName,
 ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
- HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> isPrimaryKeyTable.toString,
+ HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
 META_SYNC_ENABLED.key -> enableHive.toString,
 HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
 HIVE_USE_JDBC.key -> "false",
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 5ec15ce4d84fd..251ebc32d828b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command
 import org.apache.avro.Schema
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
@@ -80,8 +81,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
 private lazy val targetTable =
 sparkSession.sessionState.catalog.getTableMetadata(targetTableIdentify)
- private lazy val targetTableType =
- HoodieOptionConfig.getTableType(targetTable.storage.properties)
+ private lazy val tblProperties = targetTable.storage.properties ++ targetTable.properties
+
+ private lazy val targetTableType = HoodieOptionConfig.getTableType(tblProperties)
 /**
 *
@@ -124,7 +126,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
 assert(updateActions.size <= 1, s"Only support one updateAction currently, current update action count is: ${updateActions.size}")
 val updateAction = updateActions.headOption
- HoodieOptionConfig.getPreCombineField(targetTable.storage.properties).map(preCombineField => {
+ HoodieOptionConfig.getPreCombineField(tblProperties).map(preCombineField => {
 val sourcePreCombineField =
 updateAction.map(u => u.assignments.filter {
 case Assignment(key: AttributeReference, _) => key.name.equalsIgnoreCase(preCombineField)
@@ -242,8 +244,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
 // Append the table schema to the parameters. In the case of merge into, the schema of sourceDF
 // may be different from the target table, because the are transform logical in the update or
 // insert actions.
+ val operation = if (StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, ""))) {
+ INSERT_OPERATION_OPT_VAL
+ } else {
+ UPSERT_OPERATION_OPT_VAL
+ }
 var writeParams = parameters +
- (OPERATION.key -> UPSERT_OPERATION_OPT_VAL) +
+ (OPERATION.key -> operation) +
 (HoodieWriteConfig.WRITE_SCHEMA.key -> getTableSchema.toString) +
 (DataSourceWriteOptions.TABLE_TYPE.key -> targetTableType)
@@ -436,38 +443,38 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
 .setConf(conf)
 .build()
 val tableConfig = metaClient.getTableConfig
- val options = targetTable.storage.properties
- val definedPk = HoodieOptionConfig.getPrimaryColumns(options)
- // TODO Currently the mergeEqualConditionKeys must be the same the primary key.
- if (targetKey2SourceExpression.keySet != definedPk.toSet) {
- throw new IllegalArgumentException(s"Merge Key[${targetKey2SourceExpression.keySet.mkString(",")}] is not" +
- s" Equal to the defined primary key[${definedPk.mkString(",")}] in table $targetTableName")
- }
+ val tableSchema = getTableSqlSchema(metaClient).get
+ val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
+ val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
+ val options = tblProperties
+ val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
+
 // Enable the hive sync by default if spark have enable the hive metastore.
 val enableHive = isEnableHive(sparkSession)
 withSparkConf(sparkSession, options) {
 Map(
 "path" -> path,
- RECORDKEY_FIELD.key -> targetKey2SourceExpression.keySet.mkString(","),
- PRECOMBINE_FIELD.key -> targetKey2SourceExpression.keySet.head, // set a default preCombine field
+ RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
+ PRECOMBINE_FIELD.key -> preCombineColumn,
 TBL_NAME.key -> targetTableName,
- PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","),
+ PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
 PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
 HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
 URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning,
- KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName,
+ KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+ SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
 META_SYNC_ENABLED.key -> enableHive.toString,
 HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
 HIVE_USE_JDBC.key -> "false",
 HIVE_DATABASE.key -> targetTableDb,
 HIVE_TABLE.key -> targetTableName,
 HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
- HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","),
+ HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
 HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
 HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", // set the default parallelism to 200 for sql
 HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
 HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
- SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL
+ SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
 )
 }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index b1c8a04429e27..0c7d1ef0b4cbe 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -18,20 +18,20 @@ package org.apache.spark.sql.hudi.command
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.hive.MultiPartKeysValueExtractor
 import org.apache.hudi.hive.ddl.HiveSyncMode
-import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hudi.HoodieOptionConfig
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
-import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types.{StructField, StructType}
 import scala.collection.JavaConverters._
@@ -83,8 +83,8 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
 }
 private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = {
- val targetTable = sparkSession.sessionState.catalog
- .getTableMetadata(tableId)
+ val targetTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
+ val tblProperties = targetTable.storage.properties ++ targetTable.properties
 val path = getTableLocation(targetTable, sparkSession)
 val conf = sparkSession.sessionState.newHadoopConf()
 val metaClient = HoodieTableMetaClient.builder()
@@ -92,32 +92,37 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
 .setConf(conf)
 .build()
 val tableConfig = metaClient.getTableConfig
- val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
-
+ val tableSchema = getTableSqlSchema(metaClient).get
+ val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
+ val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
+ val primaryColumns = tableConfig.getRecordKeyFields.get()
+ val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("")
 assert(primaryColumns.nonEmpty, s"There are no primary key in table $tableId, cannot execute update operator")
 val enableHive = isEnableHive(sparkSession)
- withSparkConf(sparkSession, targetTable.storage.properties) {
+
+ withSparkConf(sparkSession, tblProperties) {
 Map(
 "path" -> path,
 RECORDKEY_FIELD.key -> primaryColumns.mkString(","),
- PRECOMBINE_FIELD.key -> primaryColumns.head, //set the default preCombine field.
+ PRECOMBINE_FIELD.key -> preCombineColumn, TBL_NAME.key -> tableId.table, HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning, - KEYGENERATOR_CLASS_NAME.key -> tableConfig.getKeyGeneratorClassName, - OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, - PARTITIONPATH_FIELD.key -> targetTable.partitionColumnNames.mkString(","), + KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, + SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName, + OPERATION.key -> UPSERT_OPERATION_OPT_VAL, + PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp, META_SYNC_ENABLED.key -> enableHive.toString, HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), HIVE_USE_JDBC.key -> "false", HIVE_DATABASE.key -> tableId.database.getOrElse("default"), HIVE_TABLE.key -> tableId.table, - HIVE_PARTITION_FIELDS.key -> targetTable.partitionColumnNames.mkString(","), + HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp, HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName, HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200", - SqlKeyGenerator.PARTITION_SCHEMA -> targetTable.partitionSchema.toDDL + SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL ) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java index 0557d70405c4c..9372a36f4d78e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java @@ -206,7 +206,7 @@ public void run() throws Exception { .option(DataSourceWriteOptions.OPERATION().key(), "delete") .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition") - .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "_row_key") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp") .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), nonPartitionedTable ? 
NonpartitionedKeyGenerator.class.getCanonicalName() : SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql index 280fde59ff30a..135c83b4b975e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql @@ -18,7 +18,7 @@ set hoodie.delete.shuffle.parallelism = 1; # CTAS -create table h0 using hudi options(type = '${tableType}') +create table h0 using hudi options(type = '${tableType}', primaryKey = 'id') as select 1 as id, 'a1' as name, 10 as price; +----------+ | ok | @@ -30,7 +30,7 @@ select id, name, price from h0; +-----------+ create table h0_p using hudi partitioned by(dt) -options(type = '${tableType}') +options(type = '${tableType}', primaryKey = 'id') as select cast('2021-05-07 00:00:00' as timestamp) as dt, 1 as id, 'a1' as name, 10 as price; +----------+ diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite2.scala new file mode 100644 index 0000000000000..e64f96ff8d0f4 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite2.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator} + +import org.apache.spark.sql.hudi.command.SqlKeyGenerator + +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test + +class HoodieSparkSqlWriterSuite2 { + + @Test + def testGetOriginKeyGenerator(): Unit = { + // for dataframe write + val m1 = Map( + HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[ComplexKeyGenerator].getName + ) + val kg1 = HoodieWriterUtils.getOriginKeyGenerator(m1) + assertTrue(kg1 == classOf[ComplexKeyGenerator].getName) + + // for sql write + val m2 = Map( + HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getName, + SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName + ) + val kg2 = HoodieWriterUtils.getOriginKeyGenerator(m2) + assertTrue(kg2 == classOf[SimpleKeyGenerator].getName) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala index d6ae80d09af58..a4c98665ebdba 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala @@ -102,7 +102,10 @@ class TestDataSourceForBootstrap { .save(srcPath) // Perform bootstrap - val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) + val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit( + DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, + extraOpts = Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator") + ) // Read bootstrapped table and verify count var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala index e1bc4a1f71764..e2521047f68cf 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala @@ -38,7 +38,7 @@ class TestAlterTable extends TestHoodieSqlBase { | ts long |) using hudi | location '$tablePath' - | options ( + | tblproperties ( | type = '$tableType', | primaryKey = 'id', | preCombineField = 'ts' @@ -127,7 +127,7 @@ class TestAlterTable extends TestHoodieSqlBase { | dt string |) using hudi | location '${tmp.getCanonicalPath}/$partitionedTable' - | options ( + | tblproperties ( | type = '$tableType', | primaryKey = 'id', | preCombineField = 'ts' diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala index b2ada77c21941..8cc092c2e5f08 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala @@ -38,7 +38,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { | dt string | ) | 
using hudi - | options ( + | tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' | ) @@ -77,7 +77,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { spark.sql( s""" |create table $tableName using hudi - | options ( + |tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' |) @@ -105,7 +105,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { | dt string | ) | using hudi - | options ( + | tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' | ) @@ -151,7 +151,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { spark.sql( s""" |create table $tableName using hudi - | options ( + |tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' |) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala index e40a484210188..20238a6e4318d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala @@ -31,7 +31,7 @@ class TestCompactionTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}' - | options ( + | tblproperties ( | primaryKey ='id', | type = 'mor', | preCombineField = 'ts' @@ -82,7 +82,7 @@ class TestCompactionTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}' - | options ( + | tblproperties ( | primaryKey ='id', | type = 'mor', | preCombineField = 'ts' diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala index 2af8fd782cb86..14bc70428df86 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala @@ -43,7 +43,7 @@ class TestCreateTable extends TestHoodieSqlBase { | price double, | ts long | ) using hudi - | options ( + | tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' | ) @@ -62,6 +62,53 @@ class TestCreateTable extends TestHoodieSqlBase { )(table.schema.fields) } + test("Test Create Hoodie Table With Options") { + val tableName = generateTableName + spark.sql( + s""" + | create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | dt string + | ) using hudi + | partitioned by (dt) + | options ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)) + assertResult(table.properties("type"))("cow") + assertResult(table.properties("primaryKey"))("id") + assertResult(table.properties("preCombineField"))("ts") + assertResult(tableName)(table.identifier.table) + assertResult("hudi")(table.provider.get) + assertResult(CatalogTableType.MANAGED)(table.tableType) + assertResult( + HoodieRecord.HOODIE_META_COLUMNS.asScala.map(StructField(_, StringType)) + ++ Seq( + StructField("id", IntegerType), + StructField("name", StringType), + StructField("price", DoubleType), + StructField("ts", LongType), + StructField("dt", StringType)) + )(table.schema.fields) + + val tablePath = table.storage.properties("path") + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(tablePath) + 
.setConf(spark.sessionState.newHadoopConf()) + .build() + val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap + assertResult(true)(tableConfig.contains(HoodieTableConfig.CREATE_SCHEMA.key)) + assertResult("dt")(tableConfig(HoodieTableConfig.PARTITION_FIELDS.key)) + assertResult("id")(tableConfig(HoodieTableConfig.RECORDKEY_FIELDS.key)) + assertResult("ts")(tableConfig(HoodieTableConfig.PRECOMBINE_FIELD.key)) + assertResult(classOf[ComplexKeyGenerator].getCanonicalName)(tableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) + } + test("Test Create External Hoodie Table") { withTempDir { tmp => // Test create cow table. @@ -74,7 +121,7 @@ class TestCreateTable extends TestHoodieSqlBase { | price double, | ts long |) using hudi - | options ( + | tblproperties ( | primaryKey = 'id,name', | type = 'cow' | ) @@ -93,8 +140,8 @@ class TestCreateTable extends TestHoodieSqlBase { StructField("price", DoubleType), StructField("ts", LongType)) )(table.schema.fields) - assertResult(table.storage.properties("type"))("cow") - assertResult(table.storage.properties("primaryKey"))("id,name") + assertResult(table.properties("type"))("cow") + assertResult(table.properties("primaryKey"))("id,name") spark.sql(s"drop table $tableName") // Test create mor partitioned table @@ -108,15 +155,15 @@ class TestCreateTable extends TestHoodieSqlBase { | dt string |) using hudi | partitioned by (dt) - | options ( + | tblproperties ( | primaryKey = 'id', | type = 'mor' | ) | location '${tmp.getCanonicalPath}/h0' """.stripMargin) val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)) - assertResult(table2.storage.properties("type"))("mor") - assertResult(table2.storage.properties("primaryKey"))("id") + assertResult(table2.properties("type"))("mor") + assertResult(table2.properties("primaryKey"))("id") assertResult(Seq("dt"))(table2.partitionColumnNames) assertResult(classOf[HoodieParquetRealtimeInputFormat].getCanonicalName)(table2.storage.inputFormat.get) @@ -129,8 +176,8 @@ class TestCreateTable extends TestHoodieSqlBase { |location '${tmp.getCanonicalPath}/h0' """.stripMargin) val table3 = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName3)) - assertResult(table3.storage.properties("type"))("mor") - assertResult(table3.storage.properties("primaryKey"))("id") + assertResult(table3.properties("type"))("mor") + assertResult(table3.properties("primaryKey"))("id") assertResult( HoodieRecord.HOODIE_META_COLUMNS.asScala.map(StructField(_, StringType)) ++ Seq( @@ -156,7 +203,7 @@ class TestCreateTable extends TestHoodieSqlBase { | price double, | ts long |) using hudi - | options ( + | tblproperties ( | primaryKey = 'id1', | type = 'cow' | ) @@ -173,7 +220,7 @@ class TestCreateTable extends TestHoodieSqlBase { | price double, | ts long |) using hudi - | options ( + | tblproperties ( | primaryKey = 'id', | preCombineField = 'ts1', | type = 'cow' @@ -191,7 +238,7 @@ class TestCreateTable extends TestHoodieSqlBase { | price double, | ts long |) using hudi - | options ( + | tblproperties ( | primaryKey = 'id', | preCombineField = 'ts', | type = 'cow1' @@ -208,7 +255,8 @@ class TestCreateTable extends TestHoodieSqlBase { val tableName1 = generateTableName spark.sql( s""" - |create table $tableName1 using hudi + | create table $tableName1 using hudi + | tblproperties(primaryKey = 'id') | location '${tmp.getCanonicalPath}/$tableName1' | AS | select 1 as id, 'a1' as name, 10 as price, 1000 as ts @@ -223,6 +271,7 @@ class TestCreateTable extends TestHoodieSqlBase { 
s""" | create table $tableName2 using hudi | partitioned by (dt) + | tblproperties(primaryKey = 'id') | location '${tmp.getCanonicalPath}/$tableName2' | AS | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt @@ -240,7 +289,7 @@ class TestCreateTable extends TestHoodieSqlBase { s""" | create table $tableName3 using hudi | partitioned by (dt) - | options(primaryKey = 'id') + | tblproperties(primaryKey = 'id') | location '${tmp.getCanonicalPath}/$tableName3' | AS | select null as id, 'a1' as name, 10 as price, '2021-05-07' as dt @@ -252,6 +301,7 @@ class TestCreateTable extends TestHoodieSqlBase { s""" | create table $tableName3 using hudi | partitioned by (dt) + | tblproperties(primaryKey = 'id') | location '${tmp.getCanonicalPath}/$tableName3' | AS | select cast('2021-05-06 00:00:00' as timestamp) as dt, 1 as id, 'a1' as name, 10 as @@ -267,6 +317,7 @@ class TestCreateTable extends TestHoodieSqlBase { s""" | create table $tableName4 using hudi | partitioned by (dt) + | tblproperties(primaryKey = 'id') | location '${tmp.getCanonicalPath}/$tableName4' | AS | select cast('2021-05-06' as date) as dt, 1 as id, 'a1' as name, 10 as @@ -303,7 +354,7 @@ class TestCreateTable extends TestHoodieSqlBase { spark.sql( s""" |create table $tableName using hudi - | options ( + |tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' |) @@ -380,7 +431,7 @@ class TestCreateTable extends TestHoodieSqlBase { spark.sql( s""" |create table $tableName using hudi - | options ( + |tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' |) @@ -455,7 +506,7 @@ class TestCreateTable extends TestHoodieSqlBase { spark.sql( s""" |create table $tableName using hudi - | options ( + |tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' |) @@ -514,6 +565,7 @@ class TestCreateTable extends TestHoodieSqlBase { | name string, | price double |) using hudi + |tblproperties(primaryKey = 'id') |""".stripMargin ) @@ -527,6 +579,7 @@ class TestCreateTable extends TestHoodieSqlBase { | name string, | price double |) using hudi + |tblproperties(primaryKey = 'id') |""".stripMargin ) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala index 9ad717aba457f..6137c4c6394b0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala @@ -33,7 +33,7 @@ class TestDeleteTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}/$tableName' - | options ( + | tblproperties ( | type = '$tableType', | primaryKey = 'id', | preCombineField = 'ts' diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala new file mode 100644 index 0000000000000..f91388ae9a10e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, OverwriteWithLatestAvroPayload} +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.testutils.HoodieClientTestBase + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types._ + +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.{BeforeEach, Test} + +import org.scalatest.Matchers.intercept + +class TestHoodieOptionConfig extends HoodieClientTestBase { + + var spark: SparkSession = _ + + /** + * Setup method running before each test. + */ + @BeforeEach override def setUp() { + initSparkContexts() + spark = sqlContext.sparkSession + } + + @Test + def testWithDefaultSqlOptions(): Unit = { + val ops1 = Map("primaryKey" -> "id") + val with1 = HoodieOptionConfig.withDefaultSqlOptions(ops1) + assertTrue(with1.size == 3) + assertTrue(with1("primaryKey") == "id") + assertTrue(with1("type") == "cow") + assertTrue(with1("payloadClass") == classOf[DefaultHoodieRecordPayload].getName) + + val ops2 = Map("primaryKey" -> "id", + "preCombineField" -> "timestamp", + "type" -> "mor", + "payloadClass" -> classOf[OverwriteWithLatestAvroPayload].getName + ) + val with2 = HoodieOptionConfig.withDefaultSqlOptions(ops2) + assertTrue(ops2 == with2) + } + + @Test + def testMappingSqlOptionToTableConfig(): Unit = { + val sqlOptions = Map("primaryKey" -> "id,addr", + "preCombineField" -> "timestamp", + "type" -> "mor", + "hoodie.index.type" -> "INMEMORY", + "hoodie.compact.inline" -> "true" + ) + val tableConfigs = HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) + + assertTrue(tableConfigs.size == 5) + assertTrue(tableConfigs(HoodieTableConfig.RECORDKEY_FIELDS.key) == "id,addr") + assertTrue(tableConfigs(HoodieTableConfig.PRECOMBINE_FIELD.key) == "timestamp") + assertTrue(tableConfigs(HoodieTableConfig.TYPE.key) == "MERGE_ON_READ") + assertTrue(tableConfigs("hoodie.index.type") == "INMEMORY") + assertTrue(tableConfigs("hoodie.compact.inline") == "true") + } + + @Test + def testDeleteHooideOptions(): Unit = { + val sqlOptions = Map("primaryKey" -> "id,addr", + "preCombineField" -> "timestamp", + "type" -> "mor", + "hoodie.index.type" -> "INMEMORY", + "hoodie.compact.inline" -> "true", + "key123" -> "value456" + ) + val tableConfigs = HoodieOptionConfig.deleteHooideOptions(sqlOptions) + assertTrue(tableConfigs.size == 1) + assertTrue(tableConfigs("key123") == "value456") + } + + @Test + def testExtractSqlOptions(): Unit = { + val sqlOptions = Map("primaryKey" -> "id,addr", + "preCombineField" -> "timestamp", + "type" -> "mor", + "hoodie.index.type" -> "INMEMORY", + "hoodie.compact.inline" -> "true", + "key123" -> "value456" + ) + val tableConfigs = HoodieOptionConfig.extractSqlOptions(sqlOptions) + assertTrue(tableConfigs.size == 3) + assertTrue(tableConfigs.keySet == Set("primaryKey", "preCombineField", "type")) + } + + @Test + def 
testValidateTable(): Unit = { + val baseSqlOptions = Map( + "hoodie.datasource.write.hive_style_partitioning" -> "true", + "hoodie.datasource.write.partitionpath.urlencode" -> "false", + "hoodie.table.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator" + ) + + val schema = StructType( + Seq(StructField("id", IntegerType, true), + StructField("name", StringType, true), + StructField("timestamp", TimestampType, true), + StructField("dt", StringType, true)) + ) + + // missing primaryKey parameter + val sqlOptions1 = baseSqlOptions ++ Map( + "type" -> "mor" + ) + + val e1 = intercept[IllegalArgumentException] { + HoodieOptionConfig.validateTable(spark, schema, sqlOptions1) + } + assertTrue(e1.getMessage.contains("No `primaryKey` is specified.")) + + // primary field not found + val sqlOptions2 = baseSqlOptions ++ Map( + "primaryKey" -> "xxx", + "type" -> "mor" + ) + val e2 = intercept[IllegalArgumentException] { + HoodieOptionConfig.validateTable(spark, schema, sqlOptions2) + } + assertTrue(e2.getMessage.contains("Can't find primary key")) + + // preCombine field not found + val sqlOptions3 = baseSqlOptions ++ Map( + "primaryKey" -> "id", + "preCombineField" -> "ts", + "type" -> "mor" + ) + val e3 = intercept[IllegalArgumentException] { + HoodieOptionConfig.validateTable(spark, schema, sqlOptions3) + } + assertTrue(e3.getMessage.contains("Can't find precombine key")) + + // missing type parameter + val sqlOptions4 = baseSqlOptions ++ Map( + "primaryKey" -> "id", + "preCombineField" -> "timestamp" + ) + val e4 = intercept[IllegalArgumentException] { + HoodieOptionConfig.validateTable(spark, schema, sqlOptions4) + } + assertTrue(e4.getMessage.contains("No `type` is specified.")) + + // type is invalid + val sqlOptions5 = baseSqlOptions ++ Map( + "primaryKey" -> "id", + "preCombineField" -> "timestamp", + "type" -> "abc" + ) + val e5 = intercept[IllegalArgumentException] { + HoodieOptionConfig.validateTable(spark, schema, sqlOptions5) + } + assertTrue(e5.getMessage.contains("'type' must be 'cow' or 'mor'")) + + // right options and schema + val sqlOptions6 = baseSqlOptions ++ Map( + "primaryKey" -> "id", + "preCombineField" -> "timestamp", + "type" -> "cow" + ) + HoodieOptionConfig.validateTable(spark, schema, sqlOptions6) + } + +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 337317b8ef7b9..09711cc8f1bec 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -36,6 +36,7 @@ class TestInsertTable extends TestHoodieSqlBase { | ts long, | dt string |) using hudi + | tblproperties (primaryKey = 'id') | partitioned by (dt) | location '${tmp.getCanonicalPath}' """.stripMargin) @@ -75,7 +76,7 @@ class TestInsertTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}/$tableName' - | options ( + | tblproperties ( | type = 'cow', | primaryKey = 'id', | preCombineField = 'ts' @@ -115,7 +116,7 @@ class TestInsertTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}/$tableName2' - | options ( + | tblproperties ( | type = 'mor', | primaryKey = 'id', | preCombineField = 'ts' @@ -146,6 +147,7 @@ class TestInsertTable extends TestHoodieSqlBase { | ts long, | dt string |) using hudi + | tblproperties
(primaryKey = 'id') | partitioned by (dt) | location '${tmp.getCanonicalPath}/$tableName' """.stripMargin) @@ -191,6 +193,7 @@ class TestInsertTable extends TestHoodieSqlBase { | price double, | ts long | ) using hudi + | tblproperties (primaryKey = 'id') | location '${tmp.getCanonicalPath}/$tblNonPartition' """.stripMargin) spark.sql(s"insert into $tblNonPartition select 1, 'a1', 10, 1000") @@ -245,6 +248,7 @@ class TestInsertTable extends TestHoodieSqlBase { | price double, | dt $partitionType |) using hudi + | tblproperties (primaryKey = 'id') | partitioned by (dt) | location '${tmp.getCanonicalPath}/$tableName' """.stripMargin) @@ -273,6 +277,7 @@ class TestInsertTable extends TestHoodieSqlBase { | name string, | price double |) using hudi + | tblproperties (primaryKey = 'id') | location '${tmp.getCanonicalPath}' """.stripMargin) @@ -293,6 +298,7 @@ class TestInsertTable extends TestHoodieSqlBase { | price double, | dt string |) using hudi + | tblproperties (primaryKey = 'id') | partitioned by (dt) """.stripMargin) checkException(s"insert into $tableName partition(dt = '2021-06-20')" + @@ -305,7 +311,7 @@ class TestInsertTable extends TestHoodieSqlBase { " count: 3,columns: (1,a1,10)" ) spark.sql("set hoodie.sql.bulk.insert.enable = true") - spark.sql("set hoodie.sql.insert.mode= strict") + spark.sql("set hoodie.sql.insert.mode = strict") val tableName2 = generateTableName spark.sql( @@ -316,7 +322,7 @@ class TestInsertTable extends TestHoodieSqlBase { | price double, | ts long |) using hudi - | options ( + | tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' | ) @@ -325,6 +331,7 @@ class TestInsertTable extends TestHoodieSqlBase { "Table with primaryKey can not use bulk insert in strict mode." ) + spark.sql("set hoodie.sql.insert.mode = non-strict") val tableName3 = generateTableName spark.sql( s""" @@ -334,16 +341,18 @@ class TestInsertTable extends TestHoodieSqlBase { | price double, | dt string |) using hudi + | tblproperties (primaryKey = 'id') | partitioned by (dt) """.stripMargin) checkException(s"insert overwrite table $tableName3 values(1, 'a1', 10, '2021-07-18')")( "Insert Overwrite Partition can not use bulk insert." 
) spark.sql("set hoodie.sql.bulk.insert.enable = false") - spark.sql("set hoodie.sql.insert.mode= upsert") + spark.sql("set hoodie.sql.insert.mode = upsert") } test("Test bulk insert") { + spark.sql("set hoodie.sql.insert.mode = non-strict") withTempDir { tmp => Seq("cow", "mor").foreach {tableType => // Test bulk insert for single partition @@ -356,8 +365,9 @@ class TestInsertTable extends TestHoodieSqlBase { | price double, | dt string |) using hudi - | options ( - | type = '$tableType' + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id' | ) | partitioned by (dt) | location '${tmp.getCanonicalPath}/$tableName' @@ -391,8 +401,9 @@ class TestInsertTable extends TestHoodieSqlBase { | dt string, | hh string |) using hudi - | options ( - | type = '$tableType' + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id' | ) | partitioned by (dt, hh) | location '${tmp.getCanonicalPath}/$tableMultiPartition' @@ -423,8 +434,9 @@ class TestInsertTable extends TestHoodieSqlBase { | name string, | price double |) using hudi - | options ( - | type = '$tableType' + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id' | ) | location '${tmp.getCanonicalPath}/$nonPartitionedTable' """.stripMargin) @@ -445,7 +457,7 @@ class TestInsertTable extends TestHoodieSqlBase { s""" |create table $tableName2 |using hudi - |options( + |tblproperties( | type = '$tableType', | primaryKey = 'id' |) @@ -459,9 +471,11 @@ class TestInsertTable extends TestHoodieSqlBase { ) } } + spark.sql("set hoodie.sql.insert.mode = upsert") } test("Test combine before insert") { + spark.sql("set hoodie.sql.bulk.insert.enable = false") withTempDir{tmp => val tableName = generateTableName spark.sql( @@ -473,7 +487,7 @@ class TestInsertTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}/$tableName' - | options ( + | tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' | ) @@ -495,6 +509,7 @@ class TestInsertTable extends TestHoodieSqlBase { } test("Test insert pk-table") { + spark.sql("set hoodie.sql.bulk.insert.enable = false") withTempDir{tmp => val tableName = generateTableName spark.sql( @@ -506,7 +521,7 @@ class TestInsertTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}/$tableName' - | options ( + | tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' | ) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoLogOnlyTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoLogOnlyTable.scala index d911ace62a23e..5139825f9428f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoLogOnlyTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoLogOnlyTable.scala @@ -34,7 +34,7 @@ class TestMergeIntoLogOnlyTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}' - | options ( + | tblproperties ( | primaryKey ='id', | type = 'mor', | preCombineField = 'ts', diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala index bd8558710d0fb..baac82f4bd153 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala @@ -35,7 +35,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}' - | options ( + | tblproperties ( | primaryKey ='id', | preCombineField = 'ts' | ) @@ -137,7 +137,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}/$targetTable' - | options ( + | tblproperties ( | primaryKey ='id', | preCombineField = 'ts' | ) @@ -203,7 +203,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase { | ts long, | dt string | ) using hudi - | options ( + | tblproperties ( | type = 'mor', | primaryKey = 'id', | preCombineField = 'ts' @@ -313,7 +313,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase { | price double, | dt string | ) using hudi - | options ( + | tblproperties ( | type = 'mor', | primaryKey = 'id' | ) @@ -369,7 +369,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase { | v long, | dt string | ) using hudi - | options ( + | tblproperties ( | type = '$tableType', | primaryKey = 'id', | preCombineField = 'v' @@ -439,7 +439,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase { | price double, | _ts long |) using hudi - |options( + |tblproperties( | type ='$tableType', | primaryKey = 'id', | preCombineField = '_ts' @@ -457,7 +457,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase { | price double, | _ts long |) using hudi - |options( + |tblproperties( | type ='$tableType', | primaryKey = 'id', | preCombineField = '_ts' @@ -553,7 +553,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase { | c $dataType |) using hudi | location '${tmp.getCanonicalPath}/$tableName' - | options ( + | tblproperties ( | primaryKey ='id', | preCombineField = 'c' | ) @@ -604,7 +604,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}' - | options ( + | tblproperties ( | primaryKey ='id', | type = 'mor', | preCombineField = 'ts', @@ -665,7 +665,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}/$tableName' - | options ( + | tblproperties ( | primaryKey ='id', | preCombineField = 'ts' | ) @@ -711,7 +711,7 @@ class TestMergeIntoTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}/$tableName' - | options ( + | tblproperties ( | primaryKey ='id', | preCombineField = 'ts' | ) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala index bf73251e947d7..072a1257d4069 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala @@ -35,7 +35,7 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase { | ts long, | dt string | ) using hudi - | options ( + | tblproperties ( | type = 'mor', | primaryKey = 'id', | preCombineField = 'ts' @@ -145,7 +145,7 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase { spark.sql( s""" |create table $tableName using hudi - |options(primaryKey = 'id') + |tblproperties(primaryKey = 'id') |location '${tmp.getCanonicalPath}' |as |select 1 as id, 'a1' as name @@ -187,7 +187,7 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase { | m_value map, | ts long | ) using hudi - | 
options ( + | tblproperties ( | type = 'mor', | primaryKey = 'id', | preCombineField = 'ts' @@ -251,7 +251,7 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase { | dt string |) using hudi | location '${tmp.getCanonicalPath}/$tableName' - | options ( + | tblproperties ( | primaryKey ='id', | preCombineField = 'ts' | ) @@ -333,7 +333,7 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}/$tableName' - | options ( + | tblproperties ( | primaryKey ='id', | preCombineField = 'ts' | ) @@ -376,7 +376,7 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}/$tableName' - | options ( + | tblproperties ( | primaryKey ='id', | preCombineField = 'ts' | ) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala index 357954ebb1d57..2524d04ec81fb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala @@ -32,7 +32,7 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase { | price double, | _ts long |) using hudi - |options( + |tblproperties( | type ='$tableType', | primaryKey = 'id', | preCombineField = '_ts' @@ -60,7 +60,7 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase { | name string, | price double |) using hudi - |options( + |tblproperties( | type ='$tableType', | primaryKey = 'id' |) @@ -92,7 +92,7 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase { | price double, | _ts long |) using hudi - |options( + |tblproperties( | type = 'cow', | primaryKey = 'id', | preCombineField = '_ts' @@ -117,7 +117,7 @@ class TestPartialUpdateForMergeInto extends TestHoodieSqlBase { | price double, | _ts long |) using hudi - |options( + |tblproperties( | type = 'mor', | primaryKey = 'id', | preCombineField = '_ts' diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala index 05ee61c4879fd..19d8d0345c4d4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala @@ -32,7 +32,7 @@ class TestShowPartitions extends TestHoodieSqlBase { | price double, | ts long |) using hudi - |options ( + |tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' ) @@ -59,7 +59,7 @@ class TestShowPartitions extends TestHoodieSqlBase { | dt string ) using hudi | partitioned by (dt) - | options ( + | tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' | ) @@ -109,7 +109,7 @@ class TestShowPartitions extends TestHoodieSqlBase { | day string | ) using hudi | partitioned by (year, month, day) - | options ( + | tblproperties ( | primaryKey = 'id', | preCombineField = 'ts' | ) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala index 8e9c81b12cb7a..2537c9c807a8f 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala @@ -33,7 +33,7 @@ class TestUpdateTable extends TestHoodieSqlBase { | ts long |) using hudi | location '${tmp.getCanonicalPath}/$tableName' - | options ( + | tblproperties ( | type = '$tableType', | primaryKey = 'id', | preCombineField = 'ts'