diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 76cea362a3b5..3dbb358fbb05 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -39,9 +39,12 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 /**
- * A wrapper of hoodie CatalogTable instance and hoodie Table.
+ * Table definition for SQL functionality. Depending on how the data was generated,
+ * the meta of a Hudi table can come from the Spark catalog or from the meta directory
+ * on the filesystem. [[HoodieCatalogTable]] takes both meta sources into consideration
+ * when handling EXTERNAL and MANAGED tables.
  */
-class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) extends Logging {
+class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) extends Logging {
 
   assert(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi", "It's not a Hudi table")
 
@@ -117,23 +120,9 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
   lazy val baseFileFormat: String = metaClient.getTableConfig.getBaseFileFormat.name()
 
   /**
-   * The schema of table.
-   * Make StructField nullable and fill the comments in.
+   * Table schema
    */
-  lazy val tableSchema: StructType = {
-    val resolver = spark.sessionState.conf.resolver
-    val originSchema = getTableSqlSchema(metaClient, includeMetadataFields = true).getOrElse(table.schema)
-    val fields = originSchema.fields.map { f =>
-      val nullableField: StructField = f.copy(nullable = true)
-      val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
-      if (catalogField.isDefined) {
-        catalogField.get.getComment().map(nullableField.withComment).getOrElse(nullableField)
-      } else {
-        nullableField
-      }
-    }
-    StructType(fields)
-  }
+  lazy val tableSchema: StructType = table.schema
 
   /**
    * The schema without hoodie meta fields
@@ -168,12 +157,14 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
   def isPartitionedTable: Boolean = table.partitionColumnNames.nonEmpty
 
   /**
-   * init hoodie table for create table (as select)
+   * Initializes the table meta on the filesystem when a CREATE TABLE statement is executed.
    */
   def initHoodieTable(): Unit = {
     logInfo(s"Init hoodie.properties for ${table.identifier.unquotedString}")
     val (finalSchema, tableConfigs) = parseSchemaAndConfigs()
 
+    table = table.copy(schema = finalSchema)
+
     // Save all the table config to the hoodie.properties.
     val properties = new Properties()
     properties.putAll(tableConfigs.asJava)
@@ -199,7 +190,10 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
   }
 
   /**
-   * @return schema, table parameters in which all parameters aren't sql-styled.
+   * Derives the SQL schema and configurations for a Hudi table:
+   *   1. Columns in the schema fall under two categories -- the data columns described in the
+   *      CREATE TABLE clause and the meta columns enumerated in [[HoodieRecord#HOODIE_META_COLUMNS]];
+   *   2. The derived configurations come from the config file, and from the PROPERTIES and OPTIONS of the CREATE TABLE clause.
    */
   private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = {
     val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
@@ -216,24 +210,25 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
         val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(catalogProperties)
         validateTableConfig(spark, catalogTableProps, convertMapToHoodieConfig(existingTableConfig))
 
-        val options = extraTableConfig(spark, hoodieTableExists, currentTableConfig) ++
+        val options = extraTableConfig(hoodieTableExists, currentTableConfig) ++
           HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) ++ currentTableConfig
-        ValidationUtils.checkArgument(tableSchema.nonEmpty || table.schema.nonEmpty,
-          s"Missing schema for Create Table: $catalogTableName")
-        val schema = if (tableSchema.nonEmpty) {
-          tableSchema
-        } else {
+        val schemaFromMetaOpt = loadTableSchemaByMetaClient()
+        val schema = if (schemaFromMetaOpt.nonEmpty) {
+          schemaFromMetaOpt.get
+        } else if (table.schema.nonEmpty) {
           addMetaFields(table.schema)
+        } else {
+          throw new AnalysisException(
+            s"Missing schema fields when applying CREATE TABLE clause for ${catalogTableName}")
         }
-
         (schema, options)
 
       case (_, false) =>
         ValidationUtils.checkArgument(table.schema.nonEmpty,
           s"Missing schema for Create Table: $catalogTableName")
         val schema = table.schema
-        val options = extraTableConfig(spark, isTableExists = false, globalTableConfigs) ++
+        val options = extraTableConfig(tableExists = false, globalTableConfigs) ++
           HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
         (addMetaFields(schema), options)
 
@@ -253,10 +248,10 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
     (finalSchema, tableConfigs)
   }
 
-  private def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean,
+  private def extraTableConfig(tableExists: Boolean,
       originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
     val extraConfig = mutable.Map.empty[String, String]
-    if (isTableExists) {
+    if (tableExists) {
       val allPartitionPaths = getPartitionPaths
       if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) {
         extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
@@ -287,6 +282,24 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
     extraConfig.toMap
   }
 
+  private def loadTableSchemaByMetaClient(): Option[StructType] = {
+    val resolver = spark.sessionState.conf.resolver
+    getTableSqlSchema(metaClient, includeMetadataFields = true).map(originSchema => {
+      // Load table schema from meta on filesystem, and fill in 'comment'
+      // information from Spark catalog.
+      val fields = originSchema.fields.map { f =>
+        val nullableField: StructField = f.copy(nullable = true)
+        val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
+        if (catalogField.isDefined) {
+          catalogField.get.getComment().map(nullableField.withComment).getOrElse(nullableField)
+        } else {
+          nullableField
+        }
+      }
+      StructType(fields)
+    })
+  }
+
   // This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#verifyDataSchema
   private def verifyDataSchema(tableIdentifier: TableIdentifier, tableType: CatalogTableType,
       dataSchema: Seq[StructField]): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
index c24d0fd992d9..a0252861dbf6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
@@ -23,39 +23,44 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.sync.common.util.ConfigUtils
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 
-import scala.util.control.NonFatal
-
+/**
+ * Physical plan node for dropping a table.
+ */
 case class DropHoodieTableCommand(
     tableIdentifier: TableIdentifier,
     ifExists: Boolean,
     isView: Boolean,
-    purge: Boolean)
-extends HoodieLeafRunnableCommand {
+    purge: Boolean) extends HoodieLeafRunnableCommand {
 
-  val MOR_SNAPSHOT_TABLE_SUFFIX = "_rt"
-  val MOR_READ_OPTIMIZED_TABLE_SUFFIX = "_ro"
+  private val MOR_SNAPSHOT_TABLE_SUFFIX = "_rt"
+  private val MOR_READ_OPTIMIZED_TABLE_SUFFIX = "_ro"
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"
-    logInfo(s"start execute drop table command for $fullTableName")
-    sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
-
-    try {
-      // drop catalog table for this hoodie table
-      dropTableInCatalog(sparkSession, tableIdentifier, ifExists, purge)
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to drop catalog table in metastore: ${e.getMessage}")
+    logInfo(s"Start executing 'DROP TABLE' on ${tableIdentifier.unquotedString}" +
+      s" (ifExists=${ifExists}, purge=${purge}).")
+    if (!sparkSession.catalog.tableExists(tableIdentifier.unquotedString)) {
+      sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
     }
+    val qualifiedTableName = QualifiedTableName(
+      tableIdentifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
+      tableIdentifier.table)
+    sparkSession.sessionState.catalog.invalidateCachedTable(qualifiedTableName)
+
+    dropTableInCatalog(sparkSession, tableIdentifier, ifExists, purge)
 
-    logInfo(s"Finish execute drop table command for $fullTableName")
+    logInfo(s"Finished executing 'DROP TABLE' on ${tableIdentifier.unquotedString}.")
     Seq.empty[Row]
   }
 
-  def dropTableInCatalog(sparkSession: SparkSession,
+  /**
+   * Drops the table in the Spark catalog. Note that RO & RT tables can coexist with a MOR table.
+   * If `purge` is enabled, the RO & RT tables and the corresponding data directory on the
+   * filesystem will all be removed.
+   */
+  private def dropTableInCatalog(sparkSession: SparkSession,
       tableIdentifier: TableIdentifier,
       ifExists: Boolean,
       purge: Boolean): Unit = {
@@ -67,7 +72,8 @@ extends HoodieLeafRunnableCommand {
     val catalog = sparkSession.sessionState.catalog
 
     // Drop table in the catalog
-    if (HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
+    if (hoodieCatalogTable.hoodieTableExists &&
+      HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
       val (rtTableOpt, roTableOpt) = getTableRTAndRO(catalog, hoodieCatalogTable)
       rtTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
       roTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
index 3e92d31e3a3b..449ba2e2e67b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
@@ -34,6 +34,7 @@ set hoodie.delete.shuffle.parallelism = 1;
 # CTAS
 create table h0 using hudi
 options(type = '${tableType}', primaryKey = 'id')
+location '${tmpDir}/h0'
 as select 1 as id, 'a1' as name, 10 as price;
 +----------+
 | ok |
@@ -46,6 +47,7 @@ select id, name, price from h0;
 create table h0_p using hudi
 partitioned by(dt)
 options(type = '${tableType}', primaryKey = 'id')
+location '${tmpDir}/h0_p'
 as select cast('2021-05-07 00:00:00' as timestamp) as dt,
 1 as id, 'a1' as name, 10 as price;
 +----------+
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
index 174835cbac0b..1beb78e27e7f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 
@@ -230,6 +232,115 @@ class TestDropTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Drop an EXTERNAL table whose path is lost.") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      val filesystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration);
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |id int,
+           |ts int,
+           |value string
+           |)using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           |""".stripMargin)
+
+      assert(filesystem.exists(new Path(tablePath)), s"Table path doesn't exist (${tablePath}).")
+
+      filesystem.delete(new Path(tablePath), true)
+      spark.sql(s"drop table ${tableName}")
+      checkAnswer("show tables")()
+    }
+  }
+
+  test("Drop an MOR table and related RT & RO when path is lost.") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      val filesystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration);
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |id int,
+           |ts int,
+           |value string
+           |)using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts',
+           |  type = 'mor'
+           | )
+           |""".stripMargin)
+      assert(filesystem.exists(new Path(tablePath)), s"Table path doesn't exist (${tablePath}).")
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_ro using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+         """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+        Map("hoodie.query.as.ro.table" -> "true"))
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_rt using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+         """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+        Map("hoodie.query.as.ro.table" -> "false"))
+
+      filesystem.delete(new Path(tablePath), true)
+      spark.sql(s"drop table ${tableName}")
+      spark.sql(s"drop table ${tableName}_ro")
+      spark.sql(s"drop table ${tableName}_rt")
+      checkAnswer("show tables")()
+    }
+  }
+
+
+  test("Drop a MANAGED table whose path is lost.") {
+    val tableName = generateTableName
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |id int,
+         |ts int,
+         |value string
+         |)using hudi
+         | tblproperties (
+         |  primaryKey = 'id',
+         |  preCombineField = 'ts'
+         | )
+         |""".stripMargin)
+
+    val tablePath = new Path(
+      spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).location)
+
+    val filesystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration);
+    assert(filesystem.exists(tablePath), s"Table path doesn't exist ($tablePath).")
+
+    filesystem.delete(tablePath, true)
+    spark.sql(s"drop table ${tableName}")
+    checkAnswer("show tables")()
+  }
+
   private def alterSerdeProperties(sessionCatalog: SessionCatalog, tableIdt: TableIdentifier,
       newProperties: Map[String, String]): Unit = {
     val catalogTable = spark.sessionState.catalog.getTableMetadata(tableIdt)