diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index da2736e59bdda..cf9767cd19548 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -83,9 +83,9 @@ object HoodieSparkSqlWriter {
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
     var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
-    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
 
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
     val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
     val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
       originKeyGeneratorClassName, parameters)
@@ -408,9 +408,9 @@ object HoodieSparkSqlWriter {
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
     val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
-    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
 
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
     val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
     val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
     val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
@@ -734,14 +734,14 @@ object HoodieSparkSqlWriter {
   }
 
   private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
-      tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
+      tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
     val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams)
     val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
     if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
       && mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) {
       mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(KEYGENERATOR_CLASS_NAME.key)
     }
-    if (null != tableConfig) {
+    if (null != tableConfig && mode != SaveMode.Overwrite) {
       tableConfig.getProps.foreach { case (key, value) =>
         mergedParams(key) = value
       }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 60428415861be..e202f9b8c44b0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -119,47 +119,56 @@ object HoodieWriterUtils {
     }
   }
 
+  def validateTableConfig(spark: SparkSession, params: Map[String, String],
+      tableConfig: HoodieConfig): Unit = {
+    validateTableConfig(spark, params, tableConfig, false)
+  }
+
   /**
    * Detects conflicts between new parameters and existing table configurations
    */
   def validateTableConfig(spark: SparkSession, params: Map[String, String],
-      tableConfig: HoodieConfig): Unit = {
-    val resolver = spark.sessionState.conf.resolver
-    val diffConfigs = StringBuilder.newBuilder
-    params.foreach { case (key, value) =>
-      val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
-      if (null != existingValue && !resolver(existingValue, value)) {
-        diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+      tableConfig: HoodieConfig, isOverWriteMode: Boolean): Unit = {
+    // If the save mode is Overwrite, skip table config validation altogether.
+    if (!isOverWriteMode) {
+      val resolver = spark.sessionState.conf.resolver
+      val diffConfigs = StringBuilder.newBuilder
+      params.foreach { case (key, value) =>
+        val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
+        if (null != existingValue && !resolver(existingValue, value)) {
+          diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+        }
       }
-    }
 
-    if (null != tableConfig) {
-      val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
-      val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
-      if (null != datasourceRecordKey && null != tableConfigRecordKey
+      if (null != tableConfig) {
+        val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
+        val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
+        if (null != datasourceRecordKey && null != tableConfigRecordKey
           && datasourceRecordKey != tableConfigRecordKey) {
-        diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
-      }
+          diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
+        }
 
-      val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
-      val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
-      if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
+        val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
+        val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
+        if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
           && datasourcePreCombineKey != tableConfigPreCombineKey) {
-        diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
-      }
+          diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
+        }
 
-      val datasourceKeyGen = getOriginKeyGenerator(params)
-      val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
-      if (null != datasourceKeyGen && null != tableConfigKeyGen
+        val datasourceKeyGen = getOriginKeyGenerator(params)
+        val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+        if (null != datasourceKeyGen && null != tableConfigKeyGen
           && datasourceKeyGen != tableConfigKeyGen) {
-        diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
+          diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
+        }
       }
-    }
 
-    if (diffConfigs.nonEmpty) {
-      diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
-      throw new HoodieException(diffConfigs.toString.trim)
+      if (diffConfigs.nonEmpty) {
+        diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
+        throw new HoodieException(diffConfigs.toString.trim)
+      }
     }
+
     // Check schema evolution for bootstrap table.
     // now we do not support bootstrap table.
     if (params.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 339dbb5c715ef..928b1b1a1eec7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -272,6 +272,29 @@ class TestHoodieSparkSqlWriter {
     assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
   }
 
+  /**
+   * Test case: table config is not validated when the save mode is Overwrite.
+   */
+  @Test
+  def testValidateTableConfigWithOverwriteSaveMode(): Unit = {
+    // Create a new table.
+    val tableModifier1 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.datasource.write.recordkey.field" -> "uuid")
+    val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame)
+
+    // On the same path, a write with a different record key field and Append save mode should throw an exception.
+    val tableModifier2 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.datasource.write.recordkey.field" -> "ts")
+    val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
+    val hoodieException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableModifier2, dataFrame2))
+    assert(hoodieException.getMessage.contains("Config conflict"))
+    assert(hoodieException.getMessage.contains(s"RecordKey:\tts\tuuid"))
+
+    // On the same path, a write with a different record key field and Overwrite save mode should succeed.
+    assert(HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier2, dataFrame2)._1)
+  }
+
   /**
    * Test case for each bulk insert sort mode
   *
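
Reviewer note: the sketch below illustrates the user-facing behavior this patch enables, via the Spark datasource API rather than HoodieSparkSqlWriter directly. It is a minimal, illustrative example and not part of the patch: the object name, table path, table name, and schema are made up, and it assumes a local Spark session with the hudi-spark bundle on the classpath. The option keys are the standard Hudi write configs exercised by the test above.

// Minimal sketch (not part of the patch); path, table name, and schema are illustrative.
import org.apache.spark.sql.{SaveMode, SparkSession}

object OverwriteSkipsValidationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-overwrite-sketch")
      .master("local[2]")
      // Hudi's write path requires Kryo serialization.
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._

    val basePath = "/tmp/hudi_overwrite_sketch"

    // Bootstrap the table with `uuid` as the record key.
    Seq(("id-1", 1L, "p1")).toDF("uuid", "ts", "part")
      .write.format("hudi")
      .option("hoodie.table.name", "sketch_tbl")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.partitionpath.field", "part")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    // Re-writing with a different record key: SaveMode.Append would still throw a
    // HoodieException listing the "Config conflict", but with this patch
    // SaveMode.Overwrite succeeds, since validation is skipped and the table
    // config is rebuilt from the new options.
    Seq(("id-2", 2L, "p1")).toDF("uuid", "ts", "part")
      .write.format("hudi")
      .option("hoodie.table.name", "sketch_tbl")
      .option("hoodie.datasource.write.recordkey.field", "ts")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.partitionpath.field", "part")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    spark.stop()
  }
}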