@@ -39,9 +39,12 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

/**
* A wrapper of hoodie CatalogTable instance and hoodie Table.
* Table definition for SQL functionalities. Depending on how the data was generated, the meta
* of a Hudi table can come from the Spark catalog or from the meta directory on the filesystem.
* [[HoodieCatalogTable]] takes both meta sources into consideration when handling
* EXTERNAL and MANAGED tables.
*/
class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) extends Logging {
class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) extends Logging {

assert(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi", "It's not a Hudi table")
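A minimal SQL sketch of the two cases described in the new doc comment (table names and the path are hypothetical): for an EXTERNAL table the metadata is expected to come from the hoodie meta directory under the given location, while for a MANAGED table it is derived from the CREATE TABLE clause itself.

-- EXTERNAL: the location points at an existing Hudi table, so meta is read from the filesystem
create table h_ext using hudi location '/tmp/hudi/h_ext';

-- MANAGED: no location, so the schema and configs come from the clause itself
create table h_mng (id int, name string, price double) using hudi
tblproperties (primaryKey = 'id');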

@@ -117,23 +120,9 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
lazy val baseFileFormat: String = metaClient.getTableConfig.getBaseFileFormat.name()

/**
* The schema of table.
* Make StructField nullable and fill the comments in.
* Table schema
*/
lazy val tableSchema: StructType = {
val resolver = spark.sessionState.conf.resolver
val originSchema = getTableSqlSchema(metaClient, includeMetadataFields = true).getOrElse(table.schema)
val fields = originSchema.fields.map { f =>
val nullableField: StructField = f.copy(nullable = true)
val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
if (catalogField.isDefined) {
catalogField.get.getComment().map(nullableField.withComment).getOrElse(nullableField)
} else {
nullableField
}
}
StructType(fields)
}
lazy val tableSchema: StructType = table.schema

/**
* The schema without hoodie meta fields
@@ -168,12 +157,14 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
def isPartitionedTable: Boolean = table.partitionColumnNames.nonEmpty

/**
* init hoodie table for create table (as select)
* Initializes the table metadata on the filesystem when applying the CREATE TABLE (AS SELECT) clause.
*/
def initHoodieTable(): Unit = {
logInfo(s"Init hoodie.properties for ${table.identifier.unquotedString}")
val (finalSchema, tableConfigs) = parseSchemaAndConfigs()

table = table.copy(schema = finalSchema)

// Save all the table config to the hoodie.properties.
val properties = new Properties()
properties.putAll(tableConfigs.asJava)
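As a hedged illustration of what this initialization persists (path, table, and column names are made up), a CREATE TABLE like the following is expected to leave the assembled table configs in a hoodie.properties file under the table's meta directory:

create table h2 (id int, name string, price double) using hudi
location '/tmp/hudi/h2'
tblproperties (primaryKey = 'id');
-- the table configs written above are expected to end up in
-- /tmp/hudi/h2/.hoodie/hoodie.properties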
@@ -199,7 +190,10 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
}

/**
* @return schema, table parameters in which all parameters aren't sql-styled.
* Derives the SQL schema and configurations for a Hudi table:
* 1. Columns in the schema fall into two categories -- the data columns described in the
* CREATE TABLE clause and the meta columns enumerated in [[HoodieRecord#HOODIE_META_COLUMNS]];
* 2. The derived configurations come from the config file, and from PROPERTIES and OPTIONS in the CREATE TABLE clause.
*/
private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = {
val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
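A hedged illustration of the two points in the doc comment above (table and column names are made up; the meta column list follows HoodieRecord#HOODIE_META_COLUMNS):

create table h1 (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
-- the derived SQL schema is expected to list the Hudi meta columns first:
-- _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,
-- _hoodie_partition_path, _hoodie_file_name, followed by id, name, price, ts
-- the derived configurations merge the global config file with these properties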
@@ -216,24 +210,25 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(catalogProperties)
validateTableConfig(spark, catalogTableProps, convertMapToHoodieConfig(existingTableConfig))

val options = extraTableConfig(spark, hoodieTableExists, currentTableConfig) ++
val options = extraTableConfig(hoodieTableExists, currentTableConfig) ++
HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) ++ currentTableConfig

ValidationUtils.checkArgument(tableSchema.nonEmpty || table.schema.nonEmpty,
s"Missing schema for Create Table: $catalogTableName")
val schema = if (tableSchema.nonEmpty) {
tableSchema
} else {
val schemaFromMetaOpt = loadTableSchemaByMetaClient()
val schema = if (schemaFromMetaOpt.nonEmpty) {
schemaFromMetaOpt.get
} else if (table.schema.nonEmpty) {
addMetaFields(table.schema)
} else {
throw new AnalysisException(
s"Missing schema fields when applying CREATE TABLE clause for ${catalogTableName}")
}

(schema, options)

case (_, false) =>
ValidationUtils.checkArgument(table.schema.nonEmpty,
s"Missing schema for Create Table: $catalogTableName")
val schema = table.schema
val options = extraTableConfig(spark, isTableExists = false, globalTableConfigs) ++
val options = extraTableConfig(tableExists = false, globalTableConfigs) ++
HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
(addMetaFields(schema), options)

@@ -253,10 +248,10 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
(finalSchema, tableConfigs)
}

private def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean,
private def extraTableConfig(tableExists: Boolean,
originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
val extraConfig = mutable.Map.empty[String, String]
if (isTableExists) {
if (tableExists) {
val allPartitionPaths = getPartitionPaths
if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) {
extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
@@ -287,6 +282,24 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
extraConfig.toMap
}

private def loadTableSchemaByMetaClient(): Option[StructType] = {
val resolver = spark.sessionState.conf.resolver
getTableSqlSchema(metaClient, includeMetadataFields = true).map(originSchema => {
// Load table schema from meta on filesystem, and fill in 'comment'
// information from Spark catalog.
val fields = originSchema.fields.map { f =>
val nullableField: StructField = f.copy(nullable = true)
val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
if (catalogField.isDefined) {
catalogField.get.getComment().map(nullableField.withComment).getOrElse(nullableField)
} else {
nullableField
}
}
StructType(fields)
})
}

// This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#verifyDataSchema
private def verifyDataSchema(tableIdentifier: TableIdentifier, tableType: CatalogTableType,
dataSchema: Seq[StructField]): Unit = {
@@ -23,39 +23,44 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.sync.common.util.ConfigUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._

import scala.util.control.NonFatal

/**
* Physical plan node for dropping a table.
*/
case class DropHoodieTableCommand(
tableIdentifier: TableIdentifier,
ifExists: Boolean,
isView: Boolean,
purge: Boolean)
extends HoodieLeafRunnableCommand {
purge: Boolean) extends HoodieLeafRunnableCommand {

val MOR_SNAPSHOT_TABLE_SUFFIX = "_rt"
val MOR_READ_OPTIMIZED_TABLE_SUFFIX = "_ro"
private val MOR_SNAPSHOT_TABLE_SUFFIX = "_rt"
private val MOR_READ_OPTIMIZED_TABLE_SUFFIX = "_ro"

override def run(sparkSession: SparkSession): Seq[Row] = {
val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"
logInfo(s"start execute drop table command for $fullTableName")
sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)

try {
// drop catalog table for this hoodie table
dropTableInCatalog(sparkSession, tableIdentifier, ifExists, purge)
} catch {
case NonFatal(e) =>
logWarning(s"Failed to drop catalog table in metastore: ${e.getMessage}")
logInfo(s"Start executing 'DROP TABLE' on ${tableIdentifier.unquotedString}" +
s" (ifExists=${ifExists}, purge=${purge}).")
if (!sparkSession.catalog.tableExists(tableIdentifier.unquotedString)) {
sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
}
val qualifiedTableName = QualifiedTableName(
tableIdentifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
tableIdentifier.table)
sparkSession.sessionState.catalog.invalidateCachedTable(qualifiedTableName)

dropTableInCatalog(sparkSession, tableIdentifier, ifExists, purge)

logInfo(s"Finish execute drop table command for $fullTableName")
logInfo(s"Finished executing 'DROP TABLE' on ${tableIdentifier.unquotedString}.")
Seq.empty[Row]
}

def dropTableInCatalog(sparkSession: SparkSession,
/**
* Drops the table from the Spark catalog. Note that RO & RT tables could coexist with a MOR table.
* If `purge` is enabled, the RO & RT tables and the corresponding data directory on the
* filesystem are removed as well.
*/
private def dropTableInCatalog(sparkSession: SparkSession,
tableIdentifier: TableIdentifier,
ifExists: Boolean,
purge: Boolean): Unit = {
@@ -67,7 +72,8 @@ extends HoodieLeafRunnableCommand {
val catalog = sparkSession.sessionState.catalog

// Drop table in the catalog
if (HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
if (hoodieCatalogTable.hoodieTableExists &&
HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
val (rtTableOpt, roTableOpt) = getTableRTAndRO(catalog, hoodieCatalogTable)
rtTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
roTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
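A hedged usage sketch of the drop behaviour described above (the table name is hypothetical): for a MOR table whose read-optimized and real-time views were registered as companion tables, a purging drop is expected to remove those entries and the data directory as well.

-- assuming h_mor is a MOR table with companion tables h_mor_ro and h_mor_rt
drop table if exists h_mor purge;
-- h_mor_ro and h_mor_rt should no longer appear in SHOW TABLES afterwards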
@@ -34,6 +34,7 @@ set hoodie.delete.shuffle.parallelism = 1;
# CTAS

create table h0 using hudi options(type = '${tableType}', primaryKey = 'id')
location '${tmpDir}/h0'
as select 1 as id, 'a1' as name, 10 as price;
+----------+
| ok |
@@ -46,6 +47,7 @@ select id, name, price from h0;

create table h0_p using hudi partitioned by(dt)
options(type = '${tableType}', primaryKey = 'id')
location '${tmpDir}/h0_p'
as select cast('2021-05-07 00:00:00' as timestamp) as dt,
1 as id, 'a1' as name, 10 as price;
+----------+
@@ -17,6 +17,8 @@

package org.apache.spark.sql.hudi

import org.apache.hadoop.fs.{LocalFileSystem, Path}
import org.apache.hudi.common.fs.FSUtils
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog

@@ -230,6 +232,115 @@ class TestDropTable extends HoodieSparkSqlTestBase {
}
}

test("Drop an EXTERNAL table which path is lost.") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
val filesystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration)
spark.sql(
s"""
|create table $tableName (
|id int,
|ts int,
|value string
|) using hudi
| location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
|""".stripMargin)

assert(filesystem.exists(new Path(tablePath)), s"Table path doesn't exists (${tablePath}).")

filesystem.delete(new Path(tablePath), true)
spark.sql(s"drop table ${tableName}")
checkAnswer("show tables")()
}
}

test("Drop an MOR table and related RT & RO when path is lost.") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
val filesystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration)
spark.sql(
s"""
|create table $tableName (
|id int,
|ts int,
|value string
|) using hudi
| location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts',
| type = 'mor'
| )
|""".stripMargin)
assert(filesystem.exists(new Path(tablePath)), s"Table path doesn't exist (${tablePath}).")

spark.sql(
s"""
|create table ${tableName}_ro using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
Map("hoodie.query.as.ro.table" -> "true"))

spark.sql(
s"""
|create table ${tableName}_rt using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
Map("hoodie.query.as.ro.table" -> "false"))

filesystem.delete(new Path(tablePath), true)
spark.sql(s"drop table ${tableName}")
spark.sql(s"drop table ${tableName}_ro")
spark.sql(s"drop table ${tableName}_rt")
checkAnswer("show tables")()
}
}


test("Drop an MANAGED table which path is lost.") {
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
|id int,
|ts int,
|value string
|) using hudi
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
|""".stripMargin)

val tablePath = new Path(
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).location)

val filesystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration)
assert(filesystem.exists(tablePath), s"Table path doesn't exists ($tablePath).")

filesystem.delete(tablePath, true)
spark.sql(s"drop table ${tableName}")
checkAnswer("show tables")()
}

private def alterSerdeProperties(sessionCatalog: SessionCatalog, tableIdt: TableIdentifier,
newProperties: Map[String, String]): Unit = {
val catalogTable = spark.sessionState.catalog.getTableMetadata(tableIdt)