HoodieSparkSqlWriter.scala
@@ -39,6 +39,8 @@ import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRo
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.index.SparkHoodieIndexFactory
import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -51,11 +53,6 @@ import org.apache.spark.sql._
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.apache.spark.SparkContext

import java.util.Properties
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}

import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -92,6 +89,8 @@ object HoodieSparkSqlWriter {
val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestmapBasedKeyGenerator(
originKeyGeneratorClassName, parameters)
// validate that the datasource key generator matches the key generator recorded in the table config
validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig)
val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
...
HoodieWriterUtils.scala
@@ -18,14 +18,16 @@
package org.apache.hudi

import java.util.Properties
import org.apache.hudi.config.HoodieWriteConfig

import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.command.SqlKeyGenerator
@@ -173,6 +175,29 @@ object HoodieWriterUtils {
}
}

/**
* Detects conflicts between the datasource key generator and the key generator
* recorded in the existing table config.
*/
def validateKeyGeneratorConfig(datasourceKeyGen: String, tableConfig: HoodieConfig): Unit = {
val diffConfigs = StringBuilder.newBuilder

if (null != tableConfig) {
val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
if (null != tableConfigKeyGen && null != datasourceKeyGen) {
val nonPartitionedTableConfig = tableConfigKeyGen.equals(classOf[NonpartitionedKeyGenerator].getCanonicalName)
val simpleKeyDataSourceConfig = datasourceKeyGen.equals(classOf[SimpleKeyGenerator].getCanonicalName)
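// For now this flags only the specific switch exercised below: a table created
// with NonpartitionedKeyGenerator being written with the (default) SimpleKeyGenerator,
// which would silently change record keys and partitioning.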
if (nonPartitionedTableConfig && simpleKeyDataSourceConfig) {
diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
Comment on lines +186 to +190

Member:
I wonder if there are more cases we need to catch here, as this check is very specific. What about the cases where users subclass the built-in key generators? Is there any generic way to prevent the discrepancy?

Contributor:
Yes, I also prefer to check all possible combinations of switches. I have created a follow-up task: https://issues.apache.org/jira/browse/HUDI-3820
@rkkalluri: there are 2 work items in there. a: adding validations and tests for switching between different key generators. b: with the insert_overwrite_table operation, we should not do the validation and should overwrite the table config if the key generator is changed.

(A sketch of one possible subclass-aware check follows the end of this function.)
}
}
}

if (diffConfigs.nonEmpty) {
diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
throw new HoodieException(diffConfigs.toString.trim)
}
}
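
Following up on the review thread above: a minimal, hypothetical sketch of a more generic, subclass-aware comparison (the helper isKeyGenCompatible is invented here for illustration; nothing like it is part of this PR, and HUDI-3820 tracks the actual follow-up):

// Hypothetical sketch only, not part of this PR. Treats the datasource key
// generator as compatible when it is the same class as, or a subclass of, the
// key generator recorded in the table config, so user subclasses of built-in
// key generators would not trip the validation.
def isKeyGenCompatible(datasourceKeyGen: String, tableConfigKeyGen: String): Boolean = {
  try {
    val dsClass = Class.forName(datasourceKeyGen)
    val tblClass = Class.forName(tableConfigKeyGen)
    tblClass.isAssignableFrom(dsClass)
  } catch {
    // If either class cannot be loaded, fall back to a plain name comparison.
    case _: ClassNotFoundException => datasourceKeyGen == tableConfigKeyGen
  }
}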

private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieConfig, key: String): String = {
if (null == tableConfig) {
null
...
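A quick usage sketch of the new validation (a hedged illustration, not part of the diff; it assumes the PR branch is on the classpath):

// Illustrative only: a table config recorded with NonpartitionedKeyGenerator
// while the datasource resolves to SimpleKeyGenerator triggers the conflict.
import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}

val tableConfig = new HoodieConfig()
tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME,
  classOf[NonpartitionedKeyGenerator].getCanonicalName)

// Throws HoodieException with a message like:
//   Config conflict(key\tcurrent value\texisting value):
//   KeyGenerator:\torg.apache.hudi.keygen.SimpleKeyGenerator\torg.apache.hudi.keygen.NonpartitionedKeyGenerator
HoodieWriterUtils.validateKeyGeneratorConfig(
  classOf[SimpleKeyGenerator].getCanonicalName, tableConfig)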
TestHoodieSparkSqlWriter.scala
@@ -17,27 +17,27 @@

package org.apache.hudi

import java.io.IOException
import java.time.Instant
import java.util.{Collections, Date, UUID}

import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import org.apache.hudi.functional.TestBootstrap
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{expr, lit}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.hudi.command.SqlKeyGenerator
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -46,11 +46,8 @@ import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.Assertions.assertThrows
import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, intercept}
import org.scalatest.Matchers.{be, convertToAnyShouldWrapper, intercept}

import java.io.IOException
import java.time.Instant
import java.util.{Collections, Date, UUID}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters

@@ -887,6 +884,139 @@ class TestHoodieSparkSqlWriter {
assert(data.select("_hoodie_partition_path").map(_.getString(0)).distinct.collect.head == "2021-10-16")
}

@Test
def testNonpartitionedToDefaultKeyGen(): Unit = {
val _spark = spark
import _spark.implicits._
val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
val options = Map(
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
)

// Case: commit C1 specifies NonpartitionedKeyGenerator and commit C2 specifies no key generator
val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")

// the first write explicitly sets KEYGENERATOR_CLASS_NAME
df.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
.mode(SaveMode.Overwrite).save(tablePath1)

val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
// an exception is expected: with no KEYGENERATOR_CLASS_NAME specified, the writer defaults to
// SimpleKeyGenerator, which conflicts with the table's NonpartitionedKeyGenerator
val configConflictException = intercept[HoodieException] {
df2.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.mode(SaveMode.Append).save(tablePath1)
}
assert(configConflictException.getMessage.contains("Config conflict"))
assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[SimpleKeyGenerator].getName}\t${classOf[NonpartitionedKeyGenerator].getName}"))
}
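
For reference, a hedged sketch of how a writer could resolve this conflict, reusing the names from the test above (illustrative only, not part of the test): pass the key generator already recorded in the table config instead of relying on the default.

// Hypothetical remedy: align the append with the table's existing key generator.
df2.write.format("hudi")
  .options(options)
  .option(HoodieWriteConfig.TBL_NAME.key, tableName1)
  .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
  .mode(SaveMode.Append).save(tablePath1)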

@Test
def testDefaultKeyGenToNonpartitioned(): Unit = {
val _spark = spark
import _spark.implicits._
val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
val options = Map(
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
)

// Case: commit C1 does not specify a key generator and commit C2 specifies NonpartitionedKeyGenerator
val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")

// the first write does not set KEYGENERATOR_CLASS_NAME, so it defaults to SimpleKeyGenerator
df.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.mode(SaveMode.Overwrite).save(tablePath1)

val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
// an exception is expected: the explicit NonpartitionedKeyGenerator conflicts with the table's
// recorded SimpleKeyGenerator
val configConflictException = intercept[HoodieException] {
df2.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
.mode(SaveMode.Append).save(tablePath1)
}
assert(configConflictException.getMessage.contains("Config conflict"))
assert(configConflictException.getMessage.contains(s"KeyGenerator:\t${classOf[NonpartitionedKeyGenerator].getName}\t${classOf[SimpleKeyGenerator].getName}"))
}


@Test
def testNoKeyGenToSimpleKeyGen(): Unit = {
val _spark = spark
import _spark.implicits._
val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
val options = Map(
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
)

// Case: commit C1 does not specify a key generator and commit C2 explicitly specifies SimpleKeyGenerator (the default)
val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")

// the first write does not set KEYGENERATOR_CLASS_NAME, so it defaults to SimpleKeyGenerator
df.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.mode(SaveMode.Overwrite).save(tablePath1)

val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
// no exception should be raised: the explicit SimpleKeyGenerator matches the default recorded at C1
try {
df2.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
.mode(SaveMode.Append).save(tablePath1)
} catch {
case _: Exception => fail("Switching from no key generator to an explicit SimpleKeyGenerator should not fail")
}
}

@Test
def testSimpleKeyGenToNoKeyGen(): Unit = {
val _spark = spark
import _spark.implicits._
val df = Seq((1, "a1", 10, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
val options = Map(
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
)

// Case: commit C1 explicitly specifies SimpleKeyGenerator and commit C2 specifies no key generator
val (tableName1, tablePath1) = ("hoodie_test_params_1", s"$tempBasePath" + "_1")

// the first write explicitly sets KEYGENERATOR_CLASS_NAME to SimpleKeyGenerator
df.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
.mode(SaveMode.Overwrite).save(tablePath1)

val df2 = Seq((2, "a2", 20, 1000, "2021-10-16")).toDF("id", "name", "value", "ts", "dt")
// no exception should be raised: the default key generator resolves to the SimpleKeyGenerator recorded at C1
try {
df2.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
.mode(SaveMode.Append).save(tablePath1)
} catch {
case _: Exception => fail("Switching from an explicit SimpleKeyGenerator to the default key generator should not fail")
}
}

@Test
def testGetOriginKeyGenerator(): Unit = {
// for dataframe write
...