diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index e27c15ebcf7c3..09981e845a108 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.ValidationUtils
+import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.spark.internal.Logging
@@ -91,6 +91,11 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
    */
   lazy val tableName: String = tableConfig.getTableName
 
+  /**
+   * The name of the database
+   */
+  lazy val databaseName: String = tableConfig.getDatabaseName
+
   /**
    * The name of type of table
    */
@@ -171,19 +176,23 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
     val properties = new Properties()
     properties.putAll(tableConfigs.asJava)
 
+    val catalogDatabaseName = formatName(spark,
+      table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
     if (hoodieTableExists) {
+      assert(StringUtils.isNullOrEmpty(databaseName) || databaseName == catalogDatabaseName,
+        "The database name from this hoodie path and the database name of this catalog table are not the same.")
       // just persist hoodie.table.create.schema
       HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(properties)
+        .setDatabaseName(catalogDatabaseName)
         .setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
         .initTable(hadoopConf, tableLocation)
     } else {
       val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
       val schema = SchemaConverters.toAvroType(finalSchema, false, recordName, namespace)
-      val hoodieDatabaseName = formatName(spark, table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
 
       HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(properties)
-        .setDatabaseName(hoodieDatabaseName)
+        .setDatabaseName(catalogDatabaseName)
         .setTableName(table.identifier.table)
         .setTableCreateSchema(schema.toString())
         .setPartitionFields(table.partitionColumnNames.mkString(","))
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index b43d3a3f857c9..6a93f0c7afb04 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 
 import scala.collection.JavaConverters._
@@ -62,8 +61,9 @@ case class CreateHoodieTableAsSelectCommand(
         s"Expect the table $tableName has been dropped when the save mode is Overwrite")
 
       if (mode == SaveMode.ErrorIfExists) {
-        throw new RuntimeException(s"Table $tableName already exists. You need to drop it first.")
+        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
       }
+
       if (mode == SaveMode.Ignore) {
         // Since the table already exists and the save mode is Ignore, we will just return.
         // scalastyle:off
@@ -92,8 +92,6 @@ case class CreateHoodieTableAsSelectCommand(
     val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTable)
     val tablePath = hoodieCatalogTable.tableLocation
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
-    assert(HoodieSqlCommonUtils.isEmptyPath(tablePath, hadoopConf),
-      s"Path '$tablePath' should be empty for CTAS")
 
     // Execute the insert query
     try {
@@ -108,7 +106,8 @@ case class CreateHoodieTableAsSelectCommand(
         DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
         DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
       )
-      val success = InsertIntoHoodieTableCommand.run(sparkSession, newTable, reOrderedQuery, Map.empty,
+      val partitionSpec = newTable.partitionColumnNames.map((_, None)).toMap
+      val success = InsertIntoHoodieTableCommand.run(sparkSession, newTable, reOrderedQuery, partitionSpec,
         mode == SaveMode.Overwrite, refreshTable = false, extraOptions = options)
       if (success) {
         // If write success, create the table in catalog if it has not synced to the
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 631b5d089401e..c7eef5bce4270 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -33,7 +33,7 @@ import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, concat, lit, udf}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -978,9 +978,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     df.write.format("hudi")
       .options(commonOpts)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
       .option("hoodie.insert.shuffle.parallelism", "4")
       .option("hoodie.upsert.shuffle.parallelism", "4")
       .option("hoodie.bulkinsert.shuffle.parallelism", "2")
@@ -1045,4 +1042,69 @@ class TestCOWDataSource extends HoodieClientTestBase {
       )
     }
   }
+
+  @Test
+  def testSaveAsTableInDifferentModes(): Unit = {
+    val options = scala.collection.mutable.Map.empty ++ commonOpts ++ Map("path" -> basePath)
+
+    // the first write uses the Append mode, which creates the table
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .partitionBy("partition")
+      .options(options)
+      .mode(SaveMode.Append)
+      .saveAsTable("hoodie_test")
+
+    // init metaClient
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(spark.sessionState.newHadoopConf)
+      .build()
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 5)
+
+    // use the Append mode
+    val records2 = recordsToStrings(dataGen.generateInserts("002", 6)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .partitionBy("partition")
+      .options(options)
+      .mode(SaveMode.Append)
+      .saveAsTable("hoodie_test")
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 11)
+
+    // use the Ignore mode
+    val records3 = recordsToStrings(dataGen.generateInserts("003", 7)).toList
+    val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
+    inputDF3.write.format("org.apache.hudi")
+      .partitionBy("partition")
+      .options(options)
+      .mode(SaveMode.Ignore)
+      .saveAsTable("hoodie_test")
+    // nothing should be written in the Ignore mode since the table already exists
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 11)
+
+    // use the ErrorIfExists mode
+    val records4 = recordsToStrings(dataGen.generateInserts("004", 8)).toList
+    val inputDF4 = spark.read.json(spark.sparkContext.parallelize(records4, 2))
+    try {
+      inputDF4.write.format("org.apache.hudi")
+        .partitionBy("partition")
+        .options(options)
+        .mode(SaveMode.ErrorIfExists)
+        .saveAsTable("hoodie_test")
+    } catch {
+      case _: Throwable => // expected: the table already exists
+    }
+
+    // use the Overwrite mode
+    val records5 = recordsToStrings(dataGen.generateInserts("005", 9)).toList
+    val inputDF5 = spark.read.json(spark.sparkContext.parallelize(records5, 2))
+    inputDF5.write.format("org.apache.hudi")
+      .partitionBy("partition")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .saveAsTable("hoodie_test")
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 9)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 70848529dcd65..d3dbf9a6e6aab 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -670,7 +670,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
     assertResult("dt")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
     assertResult("ts")(properties(HoodieTableConfig.PRECOMBINE_FIELD.key))
-    assertResult("")(metaClient.getTableConfig.getDatabaseName)
+    assertResult("hudi_database")(metaClient.getTableConfig.getDatabaseName)
     assertResult(s"original_$tableName")(metaClient.getTableConfig.getTableName)
 
     // Test insert into
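
Usage note (illustrative, not part of the patch): the new TestCOWDataSource case above exercises saveAsTable under each SaveMode through the patched CreateHoodieTableAsSelectCommand. The following is a minimal standalone sketch of that flow, assuming a local Spark session with the Hudi Spark bundle on the classpath; the application name, base path, table name, and column names below are made up for the example and are not taken from this change.

import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveAsTableSketch {
  def main(args: Array[String]): Unit = {
    // Hudi writes need Kryo serialization and the Hudi SQL extension.
    val spark = SparkSession.builder()
      .appName("hudi-save-as-table-sketch")
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .getOrCreate()
    import spark.implicits._

    // Illustrative values; any writable local path works.
    val basePath = "file:///tmp/hoodie_save_as_table_sketch"
    val hudiOpts = Map(
      "hoodie.table.name" -> "hoodie_test",
      "hoodie.datasource.write.recordkey.field" -> "id",
      "hoodie.datasource.write.partitionpath.field" -> "partition",
      "hoodie.datasource.write.precombine.field" -> "ts",
      "path" -> basePath
    )

    // Append creates the table on the first write and adds rows on later writes.
    Seq((1, "p1", 10L), (2, "p2", 20L)).toDF("id", "partition", "ts")
      .write.format("hudi")
      .options(hudiOpts)
      .partitionBy("partition")
      .mode(SaveMode.Append)
      .saveAsTable("hoodie_test")

    // Overwrite drops and recreates the table, so only the new rows remain.
    Seq((3, "p1", 30L)).toDF("id", "partition", "ts")
      .write.format("hudi")
      .options(hudiOpts)
      .partitionBy("partition")
      .mode(SaveMode.Overwrite)
      .saveAsTable("hoodie_test")

    assert(spark.table("hoodie_test").count() == 1)
    spark.stop()
  }
}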