@@ -23,7 +23,7 @@ import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.common.config.DFSPropertiesConfiguration
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.ValidationUtils
+import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.spark.internal.Logging
@@ -91,6 +91,11 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
    */
   lazy val tableName: String = tableConfig.getTableName
 
+  /**
+   * The name of the database.
+   */
+  lazy val databaseName: String = tableConfig.getDatabaseName
+
   /**
    * The name of the table type.
    */
@@ -171,19 +176,23 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
     val properties = new Properties()
     properties.putAll(tableConfigs.asJava)
 
+    val catalogDatabaseName = formatName(spark,
+      table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
     if (hoodieTableExists) {
+      assert(StringUtils.isNullOrEmpty(databaseName) || databaseName == catalogDatabaseName,
+        "The database names from this hoodie path and this catalog table are not the same.")
       // just persist hoodie.table.create.schema
       HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(properties)
+        .setDatabaseName(catalogDatabaseName)
         .setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
         .initTable(hadoopConf, tableLocation)
     } else {
       val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
       val schema = SchemaConverters.toAvroType(finalSchema, false, recordName, namespace)
-      val hoodieDatabaseName = formatName(spark, table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
       HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(properties)
-        .setDatabaseName(hoodieDatabaseName)
+        .setDatabaseName(catalogDatabaseName)
         .setTableName(table.identifier.table)
         .setTableCreateSchema(schema.toString())
         .setPartitionFields(table.partitionColumnNames.mkString(","))
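
Note: with this change the catalog database name is persisted to hoodie.properties on both branches, guarded by the consistency assert on the pre-existing-table path. A minimal read-back sketch, assuming an existing Hudi table at a placeholder basePath:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.common.table.HoodieTableMetaClient

    // Placeholder location; any existing Hudi table base path works here.
    val basePath = "/tmp/hudi/hoodie_test"
    val metaClient = HoodieTableMetaClient.builder()
      .setBasePath(basePath)
      .setConf(new Configuration())
      .build()
    // Returns the database recorded at create time (empty for tables
    // created before this change).
    val databaseName = metaClient.getTableConfig.getDatabaseName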
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 
 import scala.collection.JavaConverters._
@@ -62,8 +61,9 @@ case class CreateHoodieTableAsSelectCommand(
s"Expect the table $tableName has been dropped when the save mode is Overwrite")

if (mode == SaveMode.ErrorIfExists) {
throw new RuntimeException(s"Table $tableName already exists. You need to drop it first.")
throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
}

if (mode == SaveMode.Ignore) {
// Since the table already exists and the save mode is Ignore, we will just return.
// scalastyle:off
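
Since callers previously had to catch a bare RuntimeException, a failing CTAS can now be handled like any other Spark analysis error. A minimal caller-side sketch; the helper, df, and tableName are illustrative assumptions, not part of this PR:

    import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode}

    // Hypothetical helper: create the table only if it is absent.
    def createTableOnce(df: DataFrame, tableName: String): Unit = {
      try {
        df.write.format("hudi").mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
      } catch {
        // With this change the "already exists" failure surfaces as an
        // AnalysisException, consistent with Spark's native CTAS behavior.
        case e: AnalysisException => println(s"Skipped existing table: ${e.getMessage}")
      }
    }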
@@ -92,8 +92,6 @@ case class CreateHoodieTableAsSelectCommand(
     val hoodieCatalogTable = HoodieCatalogTable(sparkSession, newTable)
     val tablePath = hoodieCatalogTable.tableLocation
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
-    assert(HoodieSqlCommonUtils.isEmptyPath(tablePath, hadoopConf),
-      s"Path '$tablePath' should be empty for CTAS")
 
     // Execute the insert query
     try {
@@ -108,7 +106,8 @@
         DataSourceWriteOptions.SQL_INSERT_MODE.key -> InsertMode.NON_STRICT.value(),
         DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
       )
-      val success = InsertIntoHoodieTableCommand.run(sparkSession, newTable, reOrderedQuery, Map.empty,
+      val partitionSpec = newTable.partitionColumnNames.map((_, None)).toMap
+      val success = InsertIntoHoodieTableCommand.run(sparkSession, newTable, reOrderedQuery, partitionSpec,
         mode == SaveMode.Overwrite, refreshTable = false, extraOptions = options)
       if (success) {
         // If the write succeeds, create the table in the catalog if it has not synced to the
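
For reference, the spec handed to InsertIntoHoodieTableCommand now names every partition column with no fixed value, i.e. a fully dynamic partition spec, instead of Map.empty. A standalone sketch; "dt" and "hh" are assumed column names, while the command derives them from newTable.partitionColumnNames:

    // Assumed partition columns, for illustration only.
    val partitionColumnNames = Seq("dt", "hh")
    val partitionSpec: Map[String, Option[String]] = partitionColumnNames.map((_, None)).toMap
    // partitionSpec == Map("dt" -> None, "hh" -> None): all partitions are dynamic.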
@@ -33,7 +33,7 @@ import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, concat, lit, udf}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -978,9 +978,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     df.write.format("hudi")
       .options(commonOpts)
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
       .option("hoodie.insert.shuffle.parallelism", "4")
       .option("hoodie.upsert.shuffle.parallelism", "4")
       .option("hoodie.bulkinsert.shuffle.parallelism", "2")
@@ -1045,4 +1042,69 @@ class TestCOWDataSource extends HoodieClientTestBase {
       )
     }
   }
+
+  @Test
+  def testSaveAsTableInDifferentModes(): Unit = {
+    val options = scala.collection.mutable.Map.empty ++ commonOpts ++ Map("path" -> basePath)
+
+    // first write with the Append mode, which also creates the table
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .partitionBy("partition")
+      .options(options)
+      .mode(SaveMode.Append)
+      .saveAsTable("hoodie_test")
+
+    // init metaClient
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(spark.sessionState.newHadoopConf)
+      .build()
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 5)
+
+    // use the Append mode again
+    val records2 = recordsToStrings(dataGen.generateInserts("002", 6)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .partitionBy("partition")
+      .options(options)
+      .mode(SaveMode.Append)
+      .saveAsTable("hoodie_test")
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 11)
+
+    // use the Ignore mode
+    val records3 = recordsToStrings(dataGen.generateInserts("003", 7)).toList
+    val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
+    inputDF3.write.format("org.apache.hudi")
+      .partitionBy("partition")
+      .options(options)
+      .mode(SaveMode.Ignore)
+      .saveAsTable("hoodie_test")
+    // nothing happens in the Ignore mode since the table already exists
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 11)
+
+    // use the ErrorIfExists mode
+    val records4 = recordsToStrings(dataGen.generateInserts("004", 8)).toList
+    val inputDF4 = spark.read.json(spark.sparkContext.parallelize(records4, 2))
+    try {
+      inputDF4.write.format("org.apache.hudi")
+        .partitionBy("partition")
+        .options(options)
+        .mode(SaveMode.ErrorIfExists)
+        .saveAsTable("hoodie_test")
+    } catch {
+      case _: Throwable => // expected: the table already exists
+    }
+
+    // use the Overwrite mode
+    val records5 = recordsToStrings(dataGen.generateInserts("005", 9)).toList
+    val inputDF5 = spark.read.json(spark.sparkContext.parallelize(records5, 2))
+    inputDF5.write.format("org.apache.hudi")
+      .partitionBy("partition")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .saveAsTable("hoodie_test")
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 9)
+  }
 }
@@ -670,7 +670,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
     assertResult("dt")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
     assertResult("ts")(properties(HoodieTableConfig.PRECOMBINE_FIELD.key))
-    assertResult("")(metaClient.getTableConfig.getDatabaseName)
+    assertResult("hudi_database")(metaClient.getTableConfig.getDatabaseName)
     assertResult(s"original_$tableName")(metaClient.getTableConfig.getTableName)
 
     // Test insert into
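
The updated assertion reflects that SQL-created tables now record their owning database in hoodie.properties. A hedged sketch of DDL that would yield these table-config values; the database, table, and column names are assumptions mirroring the test:

    // spark is an existing SparkSession with the Hudi SQL extension enabled.
    spark.sql("create database if not exists hudi_database")
    spark.sql(
      """
        |create table hudi_database.original_hoodie_test (id int, name string, ts long, dt string)
        |using hudi
        |partitioned by (dt)
        |tblproperties (primaryKey = 'id', preCombineField = 'ts')
        |""".stripMargin)
    // metaClient.getTableConfig.getDatabaseName then returns "hudi_database".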