diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f1948d80c817f..037d2b09c749e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -39,6 +39,8 @@ import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRo
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
 import org.apache.hudi.index.SparkHoodieIndexFactory
 import org.apache.hudi.internal.DataSourceInternalWriterHelper
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -51,11 +53,6 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.{SPARK_VERSION, SparkContext}
-import org.apache.spark.SparkContext
-
-import java.util.Properties
-import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -92,6 +89,8 @@ object HoodieSparkSqlWriter {
     val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
     val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestmapBasedKeyGenerator(
       originKeyGeneratorClassName, parameters)
+    // validate that the datasource key generator is compatible with the one in the table config
+    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig)
     val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
     val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
       s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 81199dbca9f25..5c379e43be5c5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -18,14 +18,16 @@
 package org.apache.hudi
 
 import java.util.Properties
-import org.apache.hudi.config.HoodieWriteConfig
+
 import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieConfig, TypedProperties}
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfig
+import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.hudi.command.SqlKeyGenerator
@@ -173,6 +175,29 @@
     }
   }
 
+  /**
+   * Detects conflicts between the datasource key generator and the key generator in the existing table config.
+   */
+  def validateKeyGeneratorConfig(datasourceKeyGen: String, tableConfig: HoodieConfig): Unit = {
+    val diffConfigs = StringBuilder.newBuilder
+
+    if (null != tableConfig) {
+      val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+      if (null != tableConfigKeyGen && null != datasourceKeyGen) {
+        val nonPartitionedTableConfig = tableConfigKeyGen.equals(classOf[NonpartitionedKeyGenerator].getCanonicalName)
+        val simpleKeyDataSourceConfig = datasourceKeyGen.equals(classOf[SimpleKeyGenerator].getCanonicalName)
+        if (nonPartitionedTableConfig && simpleKeyDataSourceConfig) {
+          diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
+        }
+      }
+    }
+
+    if (diffConfigs.nonEmpty) {
+      diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
+      throw new HoodieException(diffConfigs.toString.trim)
+    }
+  }
+
   private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieConfig, key: String): String = {
     if (null == tableConfig) {
       null
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index c14d0bb063d7b..111a46261c769 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -17,11 +17,13 @@
 
 package org.apache.hudi
 
+import java.io.IOException
+import java.time.Instant
+import java.util.{Collections, Date, UUID}
+
 import org.apache.commons.io.FileUtils
-import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.config.HoodieConfig
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -29,7 +31,6 @@ import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
 import org.apache.hudi.functional.TestBootstrap
-import org.apache.hudi.hive.HiveSyncConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.spark.api.java.JavaSparkContext
@@ -37,7 +38,6 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{expr, lit}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
 import org.apache.spark.sql.hudi.command.SqlKeyGenerator
-import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -46,11 +46,8 @@ import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.Assertions.assertThrows
-import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, intercept}
+import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept}
 
-import java.io.IOException
-import java.time.Instant
-import java.util.{Collections, Date, UUID}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters
@@ -887,6 +884,139 @@
     assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "2021-10-16")
   }
 
+  @Test
+  def testNonpartitionedToDefaultKeyGen(): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+    val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+    val options = Map(
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+    )
+
+    // commit C1 explicitly specifies a key generator; commit C2 does not
+    val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")
+
+    // the first write explicitly sets KEYGENERATOR_CLASS_NAME to NonpartitionedKeyGenerator
+    df.write.format("hudi")
+      .options(options)
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
+      .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
+      .mode(SaveMode.Overwrite).save(tablePath1)
+
+    val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+    // no KEYGENERATOR_CLASS_NAME set: the datasource defaults to SimpleKeyGenerator, which conflicts with the table's NonpartitionedKeyGenerator
+    val configConflictException = intercept[HoodieException] {
+      df2.write.format("hudi")
+        .options(options)
+        .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
+        .mode(SaveMode.Append).save(tablePath1)
+    }
+    assert(configConflictException.getMessage.contains("Config conflict"))
+    assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[SimpleKeyGenerator].getName}\t${classOf[NonpartitionedKeyGenerator].getName}"))
+  }
+
+  @Test
+  def testDefaultKeyGenToNonpartitioned(): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+    val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+    val options = Map(
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+    )
+
+    // commit C1 does not specify a key generator; commit C2 does
+    val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")
+
+    // the first write does not set KEYGENERATOR_CLASS_NAME, so it defaults to SimpleKeyGenerator
+    df.write.format("hudi")
+      .options(options)
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
+      .mode(SaveMode.Overwrite).save(tablePath1)
+
+    val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+    // specifying NonpartitionedKeyGenerator now conflicts with the table config and must raise an exception
+    val configConflictException = intercept[HoodieException] {
+      df2.write.format("hudi")
+        .options(options)
+        .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
+        .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
+        .mode(SaveMode.Append).save(tablePath1)
+    }
+    assert(configConflictException.getMessage.contains("Config conflict"))
+    assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[NonpartitionedKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
+  }
+
+
+  @Test
+  def testNoKeyGenToSimpleKeyGen(): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+    val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+    val options = Map(
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+    )
+
+    // commit C1 does not specify a key generator; commit C2 explicitly specifies SimpleKeyGenerator
+    val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")
+
+    // the first write does not set KEYGENERATOR_CLASS_NAME, so it defaults to SimpleKeyGenerator
+    df.write.format("hudi")
+      .options(options)
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
+      .mode(SaveMode.Overwrite).save(tablePath1)
+
+    val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+    // no exception should be raised
+    try {
+      df2.write.format("hudi")
+        .options(options)
+        .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
+        .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+        .mode(SaveMode.Append).save(tablePath1)
+    } catch {
+      case e: Throwable => fail("Switching from no keygen to explicit SimpleKeyGenerator should not fail", e)
+    }
+  }
+
+  @Test
+  def testSimpleKeyGenToNoKeyGen(): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+    val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+    val options = Map(
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+    )
+
+    // commit C1 explicitly specifies SimpleKeyGenerator; commit C2 does not specify a key generator
+    val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")
+
+    // the first write explicitly sets KEYGENERATOR_CLASS_NAME to SimpleKeyGenerator
+    df.write.format("hudi")
+      .options(options)
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
+      .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+      .mode(SaveMode.Overwrite).save(tablePath1)
+
+    val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
+    // no exception should be raised when the default keygen is used
+    try {
+      df2.write.format("hudi")
+        .options(options)
+        .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
+        .mode(SaveMode.Append).save(tablePath1)
+    } catch {
+      case e: Throwable => fail("Switching from explicit SimpleKeyGenerator to default keygen should not fail", e)
+    }
+  }
+
   @Test
   def testGetOriginKeyGenerator(): Unit = {
     // for dataframe write
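
Reviewer note: the standalone sketch below (not part of the patch) illustrates how the new HoodieWriterUtils.validateKeyGeneratorConfig guard is expected to behave. It assumes the HoodieConfig, TypedProperties and HoodieTableConfig APIs already referenced in the patch; the object name and main driver are purely illustrative.

import org.apache.hudi.HoodieWriterUtils
import org.apache.hudi.common.config.{HoodieConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}

// Hypothetical driver, for illustration only.
object ValidateKeyGenSketch {
  def main(args: Array[String]): Unit = {
    // Simulate a table whose hoodie.properties recorded NonpartitionedKeyGenerator.
    val props = new TypedProperties()
    props.setProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key,
      classOf[NonpartitionedKeyGenerator].getName)
    val tableConfig = new HoodieConfig(props)

    // A later write that falls back to the default SimpleKeyGenerator should be rejected.
    try {
      HoodieWriterUtils.validateKeyGeneratorConfig(classOf[SimpleKeyGenerator].getName, tableConfig)
    } catch {
      case e: HoodieException => println(e.getMessage) // prints the "Config conflict" message
    }

    // A write that keeps NonpartitionedKeyGenerator passes without throwing.
    HoodieWriterUtils.validateKeyGeneratorConfig(classOf[NonpartitionedKeyGenerator].getName, tableConfig)
  }
}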