@@ -21,12 +21,10 @@ import org.apache.hadoop.fs.Path
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.hive.util.ConfigUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
-import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
-import org.apache.spark.sql.hive.HiveClientUtils
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isEnableHive
+import org.apache.spark.sql.catalyst.catalog._

import scala.util.control.NonFatal

@@ -69,13 +67,13 @@ extends HoodieLeafRunnableCommand {
val catalog = sparkSession.sessionState.catalog

// Drop table in the catalog
-    val enableHive = isEnableHive(sparkSession)
-    if (enableHive) {
-      dropHiveDataSourceTable(sparkSession, hoodieCatalogTable)
+    if (HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
+      val (rtTableOpt, roTableOpt) = getTableRTAndRO(catalog, hoodieCatalogTable)
+      rtTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
+      roTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
+      catalog.dropTable(table.identifier.copy(table = hoodieCatalogTable.tableName), ifExists, purge)
    } else {
-      if (catalog.tableExists(tableIdentifier)) {
-        catalog.dropTable(tableIdentifier, ifExists, purge)
-      }
+      catalog.dropTable(table.identifier, ifExists, purge)
Contributor:

When purge is false and HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType, don't the ro and rt tables still need to be dropped?

Contributor Author (@jinxing64, May 16, 2022):

In the existing code, the RT and RO tables are dropped only when purging a MOR table; this PR keeps that behavior. From my point of view, this logic makes sense and is acceptable for the user.
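
To make the behavior concrete, here is a minimal sketch of the two drop paths (the table name mor_tbl and its companions are hypothetical):

// Hypothetical names; assumes mor_tbl is a MOR table whose _ro/_rt companion tables are registered in the catalog.
spark.sql("drop table mor_tbl")       // non-purge: drops only mor_tbl; mor_tbl_ro and mor_tbl_rt remain
spark.sql("drop table mor_tbl purge") // purge: mor_tbl_ro and mor_tbl_rt are dropped along with mor_tbl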

Contributor:

> In the existing code, the RT and RO tables are dropped only when purging a MOR table; this PR keeps that behavior. From my point of view, this logic makes sense and is acceptable for the user.

Will this leave the metadata of the ro and rt tables behind, causing a failure when re-creating the MOR table?

Contributor Author (@jinxing64):

I think it should be fine. The ro and rt tables are left behind only on a non-purge drop, and the user can still drop them separately, case by case, as in the test "Test Drop RO & RT table one by one."
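
A minimal sketch of that manual cleanup, with the same hypothetical names as above:

// After a non-purge drop of mor_tbl, the leftover companion tables can be dropped individually.
spark.sql("drop table mor_tbl_ro")
spark.sql("drop table mor_tbl_rt")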

Contributor:

> I think it should be fine. The ro and rt tables are left behind only on a non-purge drop, and the user can still drop them separately, case by case, as in the test "Test Drop RO & RT table one by one."

Well.

}

// Recursively delete table directories
@@ -88,42 +86,33 @@ extends HoodieLeafRunnableCommand {
}
}

-  private def dropHiveDataSourceTable(
-      sparkSession: SparkSession,
-      hoodieCatalogTable: HoodieCatalogTable): Unit = {
-    val table = hoodieCatalogTable.table
-    val dbName = table.identifier.database.get
-    val tableName = hoodieCatalogTable.tableName
-
-    // check database exists
-    val dbExists = sparkSession.sessionState.catalog.databaseExists(dbName)
-    if (!dbExists) {
-      throw new NoSuchDatabaseException(dbName)
-    }
-
-    if (HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
-      val snapshotTableName = tableName + MOR_SNAPSHOT_TABLE_SUFFIX
-      val roTableName = tableName + MOR_READ_OPTIMIZED_TABLE_SUFFIX
-
-      dropHiveTable(sparkSession, dbName, snapshotTableName)
-      dropHiveTable(sparkSession, dbName, roTableName)
-    }
-
-    dropHiveTable(sparkSession, dbName, tableName, purge)
-  }
-
-  private def dropHiveTable(
-      sparkSession: SparkSession,
-      dbName: String,
-      tableName: String,
-      purge: Boolean = false): Unit = {
-    // check table exists
-    if (sparkSession.sessionState.catalog.tableExists(new TableIdentifier(tableName, Option(dbName)))) {
-      val client = HiveClientUtils.newClientForMetadata(sparkSession.sparkContext.conf,
-        sparkSession.sessionState.newHadoopConf())
-
-      // drop hive table.
-      client.dropTable(dbName, tableName, ifExists, purge)
-    }
-  }
+  private def getTableRTAndRO(catalog: SessionCatalog,
+      hoodieTable: HoodieCatalogTable): (Option[CatalogTable], Option[CatalogTable]) = {
+    val rtIdt = hoodieTable.table.identifier.copy(
+      table = s"${hoodieTable.tableName}${MOR_SNAPSHOT_TABLE_SUFFIX}")
+    val roIdt = hoodieTable.table.identifier.copy(
+      table = s"${hoodieTable.tableName}${MOR_READ_OPTIMIZED_TABLE_SUFFIX}")
+
+    var rtTableOpt: Option[CatalogTable] = None
+    var roTableOpt: Option[CatalogTable] = None
+    if (catalog.tableExists(rtIdt)) {
+      val rtTable = catalog.getTableMetadata(rtIdt)
+      if (rtTable.storage.locationUri.equals(hoodieTable.table.storage.locationUri)) {
+        rtTable.storage.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match {
+          case Some(v) if v.equalsIgnoreCase("false") => rtTableOpt = Some(rtTable)
+          case _ => // do-nothing
+        }
+      }
+    }
+
+    if (catalog.tableExists(roIdt)) {
+      val roTable = catalog.getTableMetadata(roIdt)
+      if (roTable.storage.locationUri.equals(hoodieTable.table.storage.locationUri)) {
+        roTable.storage.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match {
+          case Some(v) if v.equalsIgnoreCase("true") => roTableOpt = Some(roTable)
+          case _ => // do-nothing
+        }
+      }
+    }
+    (rtTableOpt, roTableOpt)
+  }
}
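
To summarize the rule implemented by getTableRTAndRO: a catalog table is treated as the RT or RO companion of the base table only if it points at the same storage location and carries the matching hoodie.query.as.ro.table serde property ("false" marks the RT view, "true" the RO view). A condensed restatement of that filter (isCompanion is a hypothetical helper, not part of the PR):

import org.apache.hudi.hive.util.ConfigUtils
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Hypothetical condensation of the filter above; expectRO selects the RO ("true")
// or RT ("false") companion of `base`.
def isCompanion(candidate: CatalogTable, base: CatalogTable, expectRO: Boolean): Boolean =
  candidate.storage.locationUri.equals(base.storage.locationUri) &&
    candidate.storage.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
      .exists(_.equalsIgnoreCase(if (expectRO) "true" else "false"))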
@@ -17,6 +17,9 @@

package org.apache.spark.sql.hudi

+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog

class TestDropTable extends HoodieSparkSqlTestBase {

test("Test Drop Table") {
@@ -72,4 +75,167 @@ class TestDropTable extends HoodieSparkSqlTestBase {
}
}
}

test("Test Drop RO & RT table by purging base table.") {
withTempDir { tmp =>
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)

spark.sql(
s"""
|create table ${tableName}_ro using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
Map("hoodie.query.as.ro.table" -> "true"))

spark.sql(
s"""
|create table ${tableName}_rt using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
Map("hoodie.query.as.ro.table" -> "false"))

spark.sql(s"drop table ${tableName} purge")
checkAnswer("show tables")()
}
}

test("Test Drop RO & RT table by one by one.") {
withTempDir { tmp =>
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)

spark.sql(
s"""
|create table ${tableName}_ro using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
Map("hoodie.query.as.ro.table" -> "true"))

spark.sql(
s"""
|create table ${tableName}_rt using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
Map("hoodie.query.as.ro.table" -> "false"))

spark.sql(s"drop table ${tableName}_ro")
checkAnswer("show tables")(
Seq("default", tableName, false), Seq("default", s"${tableName}_rt", false))

spark.sql(s"drop table ${tableName}_rt")
checkAnswer("show tables")(Seq("default", tableName, false))

spark.sql(s"drop table ${tableName}")
checkAnswer("show tables")()
}
}

test("Test Drop RO table with purge") {
withTempDir { tmp =>
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)

spark.sql(
s"""
|create table ${tableName}_ro using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
Map("hoodie.query.as.ro.table" -> "true"))

spark.sql(
s"""
|create table ${tableName}_rt using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = 'mor',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
Map("hoodie.query.as.ro.table" -> "false"))

spark.sql(s"drop table ${tableName}_ro purge")
checkAnswer("show tables")()
}
}

  private def alterSerdeProperties(sessionCatalog: SessionCatalog, tableIdt: TableIdentifier,
      newProperties: Map[String, String]): Unit = {
    val catalogTable = sessionCatalog.getTableMetadata(tableIdt)
    val storage = catalogTable.storage
    val storageProperties = storage.properties ++ newProperties
    val newCatalogTable = catalogTable.copy(storage = storage.copy(properties = storageProperties))
    sessionCatalog.alterTable(newCatalogTable)
  }
}