diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 7bd6dd2244f4f..828e4decd3591 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
-import org.apache.hudi.DataSourceWriteOptions.OPERATION
+import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
@@ -198,6 +198,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
     }
 
     HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(tableConfigs.getOrElse(TABLE_TYPE.key, TABLE_TYPE.defaultValue()))
       .fromProperties(properties)
      .setDatabaseName(catalogDatabaseName)
       .setTableName(table.identifier.table)
@@ -290,12 +291,18 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
       extraConfig(URL_ENCODE_PARTITIONING.key) = URL_ENCODE_PARTITIONING.defaultValue()
     }
 
-    if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
+    if (catalogProperties.contains(KEYGENERATOR_CLASS_NAME.key)) {
+      extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
+        catalogProperties.get(KEYGENERATOR_CLASS_NAME.key).get
+    } else if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
       extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
         HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
           originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
     } else {
-      val primaryKeys = table.properties.getOrElse(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName, table.storage.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)).toString
+      var primaryKeys = table.properties.getOrElse(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName, table.storage.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)).toString
+      if (primaryKeys.equals("None")) {
+        primaryKeys = catalogProperties.get(RECORDKEY_FIELD.key).getOrElse(RECORDKEY_FIELD.defaultValue)
+      }
       val partitions = table.partitionColumnNames.mkString(",")
       extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
         DataSourceOptionsHelper.inferKeyGenClazz(primaryKeys, partitions)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 7efb60ae0b6a5..04070cf90d3c5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -52,7 +52,6 @@ object HoodieOptionConfig {
     .withSqlKey("type")
     .withHoodieKey(DataSourceWriteOptions.TABLE_TYPE.key)
     .withTableConfigKey(HoodieTableConfig.TYPE.key)
-    .defaultValue(SQL_VALUE_TABLE_TYPE_COW)
     .build()
 
   val SQL_KEY_PRECOMBINE_FIELD: HoodieSQLOption[String] = buildConf()
@@ -195,10 +194,11 @@ object HoodieOptionConfig {
     // validate primary key
     val primaryKeys = sqlOptions.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
       .map(_.split(",").filter(_.length > 0))
-    ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.")
-    primaryKeys.get.foreach { primaryKey =>
-      ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(primaryKey))),
-        s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
+    if (primaryKeys.nonEmpty) {
+      primaryKeys.get.foreach { primaryKey =>
+        ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(primaryKey))),
+          s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
+      }
     }
 
     // validate preCombine key
@@ -210,11 +210,13 @@ object HoodieOptionConfig {
 
     // validate table type
     val tableType = sqlOptions.get(SQL_KEY_TABLE_TYPE.sqlKeyName)
-    ValidationUtils.checkArgument(tableType.nonEmpty, "No `type` is specified.")
-    ValidationUtils.checkArgument(
-      tableType.get.equalsIgnoreCase(SQL_VALUE_TABLE_TYPE_COW) ||
-        tableType.get.equalsIgnoreCase(SQL_VALUE_TABLE_TYPE_MOR),
-      s"'type' must be '$SQL_VALUE_TABLE_TYPE_COW' or '$SQL_VALUE_TABLE_TYPE_MOR'")
+
+    if (tableType.nonEmpty) {
+      ValidationUtils.checkArgument(
+        tableType.get.equalsIgnoreCase(SQL_VALUE_TABLE_TYPE_COW) ||
+          tableType.get.equalsIgnoreCase(SQL_VALUE_TABLE_TYPE_MOR),
+        s"'type' must be '$SQL_VALUE_TABLE_TYPE_COW' or '$SQL_VALUE_TABLE_TYPE_MOR'")
+    }
   }
 
   def buildConf[T](): HoodieSQLOptionBuilder[T] = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 6a6b41da7fb73..de0b06fb057fe 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -1013,5 +1013,105 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
         """.stripMargin)
     checkKeyGenerator("org.apache.hudi.keygen.ComplexKeyGenerator", tableName)
     spark.sql(s"drop table $tableName")
+
+    import spark.implicits._
+    val partitionValue = "2022-11-05"
+    val df = Seq((1, "a1", 10, 1000, partitionValue)).toDF("id", "name", "value", "ts", "dt")
+
+    // Test key generator class with composite keys via DataFrame saveAsTable
+    df.write.format("hudi")
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+      .option(RECORDKEY_FIELD.key, "id,name")
+      .option(PRECOMBINE_FIELD.key, "ts")
+      .option(PARTITIONPATH_FIELD.key, "dt")
+      .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+      .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+      .partitionBy("dt")
+      .mode(SaveMode.Overwrite)
+      .saveAsTable(tableName)
+    checkKeyGenerator("org.apache.hudi.keygen.ComplexKeyGenerator", tableName)
+    spark.sql(s"drop table $tableName")
+  }
+
+  test("Test CTAS COW Table With DataFrame saveAsTable") {
+    import spark.implicits._
+    val partitionValue = "2022-11-05"
+    val df = Seq((1, "a1", 10, 1000, partitionValue)).toDF("id", "name", "value", "ts", "dt")
+
+    val tableName = generateTableName
+    // Write a table via DataFrame saveAsTable
+    // Test default table type and key generator class
+    df.write.format("hudi")
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+      .option(RECORDKEY_FIELD.key, "id")
+      .option(PRECOMBINE_FIELD.key, "ts")
+      .option(PARTITIONPATH_FIELD.key, "dt")
+      .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+      .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+      .partitionBy("dt")
+      .mode(SaveMode.Overwrite)
+      .saveAsTable(tableName)
+
+    val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+    val tablePath = table.storage.properties("path")
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(tablePath)
+      .setConf(spark.sessionState.newHadoopConf())
+      .build()
+    val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
+    assertResult(COW_TABLE_TYPE_OPT_VAL)(tableConfig(HoodieTableConfig.TYPE.key()))
+
+    checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
+      Seq("1", s"dt=$partitionValue", 1, "a1", 10, 1000, partitionValue)
+    )
+
+    // Test insert into
+    spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$partitionValue')")
+    checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
+      Seq("1", s"dt=$partitionValue", 1, "a1", 10, 1000, partitionValue),
+      Seq("2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
+    )
+  }
+
+  test("Test CTAS MOR Table With DataFrame saveAsTable") {
+    import spark.implicits._
+    val partitionValue = "2022-11-05"
+    val df = Seq((1, "a1", 10, 1000, partitionValue)).toDF("id", "name", "value", "ts", "dt")
+
+    val tableName = generateTableName
+    // Write a table via DataFrame saveAsTable
+    // Test that the table type and key generator class set through DataFrame options are also honored by Spark SQL
+    df.write.format("hudi")
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+      .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
+      .option(RECORDKEY_FIELD.key, "id")
+      .option(PRECOMBINE_FIELD.key, "ts")
+      .option(PARTITIONPATH_FIELD.key, "dt")
+      .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
+      .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+      .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+      .partitionBy("dt")
+      .mode(SaveMode.Overwrite)
+      .saveAsTable(tableName)
+
+    val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+    val tablePath = table.storage.properties("path")
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(tablePath)
+      .setConf(spark.sessionState.newHadoopConf())
+      .build()
+    val tableConfig = metaClient.getTableConfig.getProps.asScala.toMap
+    assertResult(MOR_TABLE_TYPE_OPT_VAL)(tableConfig(HoodieTableConfig.TYPE.key()))
+
+    checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
+      Seq("id:1", s"dt=$partitionValue", 1, "a1", 10, 1000, partitionValue)
+    )
+
+    // Test insert into
+    spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, '$partitionValue')")
+    checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id, name, value, ts, dt from $tableName order by id")(
+      Seq("id:1", s"dt=$partitionValue", 1, "a1", 10, 1000, partitionValue),
+      Seq("id:2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
+    )
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
index 44c23d146c68b..ef8d6d4d771a7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala
@@ -31,9 +31,8 @@ class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
   def testWithDefaultSqlOptions(): Unit = {
     val ops1 = Map("primaryKey" -> "id")
     val with1 = HoodieOptionConfig.withDefaultSqlOptions(ops1)
-    assertTrue(with1.size == 3)
+    assertTrue(with1.size == 2)
     assertTrue(with1("primaryKey") == "id")
-    assertTrue(with1("type") == "cow")
     assertTrue(with1("payloadClass") == classOf[OverwriteWithLatestAvroPayload].getName)
 
     val ops2 = Map("primaryKey" -> "id",
@@ -106,65 +105,45 @@ class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
       StructField("dt", StringType, true))
     )
 
-    // miss primaryKey parameter
+    // primary field not found
     val sqlOptions1 = baseSqlOptions ++ Map(
+      "primaryKey" -> "xxx",
       "type" -> "mor"
     )
-
     val e1 = intercept[IllegalArgumentException] {
       HoodieOptionConfig.validateTable(spark, schema, sqlOptions1)
     }
-    assertTrue(e1.getMessage.contains("No `primaryKey` is specified."))
-
-    // primary field not found
-    val sqlOptions2 = baseSqlOptions ++ Map(
-      "primaryKey" -> "xxx",
-      "type" -> "mor"
-    )
-    val e2 = intercept[IllegalArgumentException] {
-      HoodieOptionConfig.validateTable(spark, schema, sqlOptions2)
-    }
-    assertTrue(e2.getMessage.contains("Can't find primaryKey"))
+    assertTrue(e1.getMessage.contains("Can't find primaryKey"))
 
     // preCombine field not found
-    val sqlOptions3 = baseSqlOptions ++ Map(
+    val sqlOptions2 = baseSqlOptions ++ Map(
       "primaryKey" -> "id",
       "preCombineField" -> "ts",
       "type" -> "mor"
     )
-    val e3 = intercept[IllegalArgumentException] {
-      HoodieOptionConfig.validateTable(spark, schema, sqlOptions3)
-    }
-    assertTrue(e3.getMessage.contains("Can't find preCombineKey"))
-
-    // miss type parameter
-    val sqlOptions4 = baseSqlOptions ++ Map(
-      "primaryKey" -> "id",
-      "preCombineField" -> "timestamp"
-    )
-    val e4 = intercept[IllegalArgumentException] {
-      HoodieOptionConfig.validateTable(spark, schema, sqlOptions4)
+    val e2 = intercept[IllegalArgumentException] {
+      HoodieOptionConfig.validateTable(spark, schema, sqlOptions2)
     }
-    assertTrue(e4.getMessage.contains("No `type` is specified."))
+    assertTrue(e2.getMessage.contains("Can't find preCombineKey"))
 
     // type is invalid
-    val sqlOptions5 = baseSqlOptions ++ Map(
+    val sqlOptions3 = baseSqlOptions ++ Map(
      "primaryKey" -> "id",
       "preCombineField" -> "timestamp",
       "type" -> "abc"
     )
-    val e5 = intercept[IllegalArgumentException] {
-      HoodieOptionConfig.validateTable(spark, schema, sqlOptions5)
+    val e3 = intercept[IllegalArgumentException] {
+      HoodieOptionConfig.validateTable(spark, schema, sqlOptions3)
     }
-    assertTrue(e5.getMessage.contains("'type' must be 'cow' or 'mor'"))
+    assertTrue(e3.getMessage.contains("'type' must be 'cow' or 'mor'"))
 
     // right options and schema
-    val sqlOptions6 = baseSqlOptions ++ Map(
+    val sqlOptions4 = baseSqlOptions ++ Map(
       "primaryKey" -> "id",
       "preCombineField" -> "timestamp",
       "type" -> "cow"
     )
-    HoodieOptionConfig.validateTable(spark, schema, sqlOptions6)
+    HoodieOptionConfig.validateTable(spark, schema, sqlOptions4)
   }
 }
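Usage sketch (not part of the patch): with this change, a Hudi table created through the DataFrame writer keeps the table type and key generator it was written with in hoodie.properties, so later Spark SQL statements against the same table resolve a consistent definition instead of falling back to the COW defaults. A minimal example in the spirit of the new tests; it assumes an existing SparkSession named `spark` with the Hudi Spark bundle on the classpath, and `hudi_mor_tbl` is only an illustrative table name:

    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.spark.sql.SaveMode

    import spark.implicits._  // assumes a SparkSession called `spark`
    val df = Seq((1, "a1", 10, 1000, "2022-11-05")).toDF("id", "name", "value", "ts", "dt")

    // Create a MOR table via saveAsTable; the table type and key generator
    // supplied here are persisted to the table's hoodie.properties.
    df.write.format("hudi")
      .option(HoodieWriteConfig.TBL_NAME.key, "hudi_mor_tbl")  // illustrative name
      .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
      .option(RECORDKEY_FIELD.key, "id")
      .option(PRECOMBINE_FIELD.key, "ts")
      .option(PARTITIONPATH_FIELD.key, "dt")
      .partitionBy("dt")
      .mode(SaveMode.Overwrite)
      .saveAsTable("hudi_mor_tbl")

    // Spark SQL now sees the same table type and record key definition.
    spark.sql("insert into hudi_mor_tbl values(2, 'a2', 10, 1000, '2022-11-05')")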