Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ object HoodieSparkSqlWriter {
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)

val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
originKeyGeneratorClassName, parameters)
Expand Down Expand Up @@ -408,9 +408,9 @@ object HoodieSparkSqlWriter {
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)

val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
Expand Down Expand Up @@ -734,14 +734,14 @@ object HoodieSparkSqlWriter {
}

private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams)
val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
&& mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) {
mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(KEYGENERATOR_CLASS_NAME.key)
}
if (null != tableConfig) {
if (null != tableConfig && mode != SaveMode.Overwrite) {
tableConfig.getProps.foreach { case (key, value) =>
mergedParams(key) = value
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,47 +119,56 @@ object HoodieWriterUtils {
}
}

def validateTableConfig(spark: SparkSession, params: Map[String, String],
tableConfig: HoodieConfig): Unit = {
validateTableConfig(spark, params, tableConfig, false)
}

/**
* Detects conflicts between new parameters and existing table configurations
*/
def validateTableConfig(spark: SparkSession, params: Map[String, String],
tableConfig: HoodieConfig): Unit = {
val resolver = spark.sessionState.conf.resolver
val diffConfigs = StringBuilder.newBuilder
params.foreach { case (key, value) =>
val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
if (null != existingValue && !resolver(existingValue, value)) {
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
tableConfig: HoodieConfig, isOverWriteMode: Boolean): Unit = {
// If Overwrite is set as save mode, we don't need to do table config validation.
if (!isOverWriteMode) {
val resolver = spark.sessionState.conf.resolver
val diffConfigs = StringBuilder.newBuilder
params.foreach { case (key, value) =>
val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
if (null != existingValue && !resolver(existingValue, value)) {
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
}
}
}

if (null != tableConfig) {
val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
if (null != datasourceRecordKey && null != tableConfigRecordKey
if (null != tableConfig) {
val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
if (null != datasourceRecordKey && null != tableConfigRecordKey
&& datasourceRecordKey != tableConfigRecordKey) {
diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
}
diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
}

val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
&& datasourcePreCombineKey != tableConfigPreCombineKey) {
diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
}
diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
}

val datasourceKeyGen = getOriginKeyGenerator(params)
val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
if (null != datasourceKeyGen && null != tableConfigKeyGen
val datasourceKeyGen = getOriginKeyGenerator(params)
val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
if (null != datasourceKeyGen && null != tableConfigKeyGen
&& datasourceKeyGen != tableConfigKeyGen) {
diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
}
}
}

if (diffConfigs.nonEmpty) {
diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
throw new HoodieException(diffConfigs.toString.trim)
if (diffConfigs.nonEmpty) {
diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
throw new HoodieException(diffConfigs.toString.trim)
}
}

// Check schema evolution for bootstrap table.
// now we do not support bootstrap table.
if (params.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,29 @@ class TestHoodieSparkSqlWriter {
assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
}

/**
* Test case for Do not validate table config if save mode is set to Overwrite
*/
@Test
def testValidateTableConfigWithOverwriteSaveMode(): Unit = {
//create a new table
val tableModifier1 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
"hoodie.datasource.write.recordkey.field" -> "uuid")
val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame)

//on same path try write with different RECORDKEY_FIELD_NAME and Append SaveMode should throw an exception
val tableModifier2 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
"hoodie.datasource.write.recordkey.field" -> "ts")
val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
val hoodieException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableModifier2, dataFrame2))
assert(hoodieException.getMessage.contains("Config conflict"))
assert(hoodieException.getMessage.contains(s"RecordKey:\tts\tuuid"))

//on same path try write with different RECORDKEY_FIELD_NAME and Overwrite SaveMode should be successful.
assert(HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier2, dataFrame2)._1)
}

/**
* Test case for each bulk insert sort mode
*
Expand Down