HoodieAnalysis.scala
@@ -19,24 +19,22 @@ package org.apache.spark.sql.hudi.analysis
 
 import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
 import org.apache.hudi.SparkAdapterSupport
-
-import scala.collection.JavaConverters._
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedStar}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
-import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
 import org.apache.spark.sql.hudi.command._
+import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
 import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+
+import scala.collection.JavaConverters._
 
 object HoodieAnalysis {
   def customResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
@@ -407,6 +405,10 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
       case CreateDataSourceTableCommand(table, ignoreIfExists)
         if isHoodieTable(table) =>
         CreateHoodieTableCommand(table, ignoreIfExists)
+      // Rewrite the DropTableCommand to DropHoodieTableCommand
+      case DropTableCommand(tableName, ifExists, isView, purge)
+        if isHoodieTable(tableName, sparkSession) =>
+        DropHoodieTableCommand(tableName, ifExists, isView, purge)
       // Rewrite the AlterTableDropPartitionCommand to AlterHoodieTableDropPartitionCommand
       case AlterTableDropPartitionCommand(tableName, specs, _, _, _)
         if isHoodieTable(tableName, sparkSession) =>
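For readers outside the PR: a post-analysis rule like this only fires once it is registered with the Spark session. A minimal sketch of the wiring, assuming org.apache.spark.sql.hudi.HoodieSparkSessionExtension is the extension entry point that injects HoodiePostAnalysisRule (the app name, local master, and table name below are illustrative only):

import org.apache.spark.sql.SparkSession

// Register the Hudi session extension so the analyzer rewrites DROP TABLE
// on Hudi tables to DropHoodieTableCommand via HoodiePostAnalysisRule.
val spark = SparkSession.builder()
  .appName("drop-table-demo")
  .master("local[1]")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()

// With the extension active, this resolves to DropHoodieTableCommand
// rather than Spark's built-in DropTableCommand.
spark.sql("DROP TABLE my_hudi_table PURGE")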
DropHoodieTableCommand.scala (new file)
@@ -0,0 +1,108 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi.command

import org.apache.hadoop.fs.Path
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.fs.FSUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hive.HiveClientUtils
import org.apache.spark.sql.hudi.HoodieSqlUtils.isEnableHive

import scala.util.control.NonFatal

case class DropHoodieTableCommand(
    tableIdentifier: TableIdentifier,
    ifExists: Boolean,
    isView: Boolean,
    purge: Boolean) extends RunnableCommand
  with SparkAdapterSupport {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    // unquotedString renders "db.table" (or just "table" when the database is
    // unset), avoiding the "Some(db).table" that interpolating the raw Option produces.
    val fullTableName = tableIdentifier.unquotedString
    logInfo(s"Start executing drop table command for $fullTableName")

[Review comment, Contributor] pls add sparkSession.catalog.refreshTable(xxx)
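A sketch of how that suggestion could be applied near the top of run (using quotedString to build the fully qualified name is an assumption; the reviewer left the exact argument as xxx):

// Sketch only, per the review comment above: refresh the catalog's view of
// the table so cached plans referencing it are invalidated before the drop.
sparkSession.catalog.refreshTable(tableIdentifier.quotedString)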

    try {
      // drop catalog table for this hoodie table
      dropTableInCatalog(sparkSession, tableIdentifier, ifExists, purge)
    } catch {
      case NonFatal(e) =>
        logWarning(s"Failed to drop catalog table in metastore: ${e.getMessage}")
    }

[Review comment, Contributor] pls remove this line, thanks

logInfo(s"Finish execute drop table command for $fullTableName")
Seq.empty[Row]
}

  def dropTableInCatalog(sparkSession: SparkSession,
                         tableIdentifier: TableIdentifier,
                         ifExists: Boolean,
                         purge: Boolean): Unit = {
    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
    val table = hoodieCatalogTable.table
    assert(table.tableType != CatalogTableType.VIEW)

    val basePath = hoodieCatalogTable.tableLocation
    val catalog = sparkSession.sessionState.catalog

    // Drop table in the catalog
    val enableHive = isEnableHive(sparkSession)
    if (enableHive) {
      dropHiveDataSourceTable(sparkSession, table, ifExists, purge)
    } else {
      catalog.dropTable(tableIdentifier, ifExists, purge)
    }

    // Recursively delete table directories
    if (purge) {
      logInfo("Clean up " + basePath)
      val targetPath = new Path(basePath)
      val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration)
      if (fs.exists(targetPath)) {
        fs.delete(targetPath, true)
      }
    }
  }

  private def dropHiveDataSourceTable(
      sparkSession: SparkSession,
      table: CatalogTable,
      ifExists: Boolean,
      purge: Boolean): Unit = {
    val dbName = table.identifier.database.get
    val tableName = table.identifier.table
    // Check that the database exists
    val dbExists = sparkSession.sessionState.catalog.databaseExists(dbName)
    if (!dbExists) {
      throw new NoSuchDatabaseException(dbName)
    }
    // Check that the table exists
    if (!sparkSession.sessionState.catalog.tableExists(table.identifier)) {
      throw new NoSuchTableException(dbName, tableName)
    }

    val client = HiveClientUtils.newClientForMetadata(sparkSession.sparkContext.conf,
      sparkSession.sessionState.newHadoopConf())
    // Drop the Hive table
    client.dropTable(dbName, tableName, ifExists, purge)
  }
}
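For reference, the end-to-end behavior this command provides, mirroring what TestDropTable below asserts (session setup as in the earlier sketch; the table name is hypothetical):

// A plain drop removes the catalog entry but keeps the Hudi data files.
spark.sql("DROP TABLE my_hudi_table")

// PURGE additionally deletes the table's base path recursively.
spark.sql("DROP TABLE my_hudi_table PURGE")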
TestDropTable.scala (new file)
@@ -0,0 +1,75 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi

class TestDropTable extends TestHoodieSqlBase {

  test("Test Drop Table") {
    withTempDir { tmp =>
      Seq("cow", "mor").foreach { tableType =>
        val tableName = generateTableName
        // Create the table
        spark.sql(
          s"""
             |create table $tableName (
             |  id int,
             |  name string,
             |  price double,
             |  ts long
             |) using hudi
             | location '${tmp.getCanonicalPath}/$tableName'
             | tblproperties (
             |  type = '$tableType',
             |  primaryKey = 'id',
             |  preCombineField = 'ts'
             | )
           """.stripMargin)
        // A plain DROP TABLE removes the table from the catalog ...
        spark.sql(s"DROP TABLE $tableName")
        checkAnswer(s"show tables like '$tableName'")()
        // ... but leaves the data files under the table location in place.
        assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName"))
      }
    }
  }

  test("Test Drop Table with purge") {
    withTempDir { tmp =>
      Seq("cow", "mor").foreach { tableType =>
        val tableName = generateTableName
        // Create the table
        spark.sql(
          s"""
             |create table $tableName (
             |  id int,
             |  name string,
             |  price double,
             |  ts long
             |) using hudi
             | location '${tmp.getCanonicalPath}/$tableName'
             | tblproperties (
             |  type = '$tableType',
             |  primaryKey = 'id',
             |  preCombineField = 'ts'
             | )
           """.stripMargin)
        // DROP TABLE ... PURGE removes the catalog entry and deletes the table directory.
        spark.sql(s"DROP TABLE $tableName PURGE")
        checkAnswer(s"show tables like '$tableName'")()
        assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName"))
      }
    }
  }
}
TestHoodieSqlBase.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.sql.hudi
 
-import java.io.File
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.log4j.Level
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.util.Utils
 import org.scalactic.source
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
 
+import java.io.File
 import java.util.TimeZone
 
 class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
@@ -115,4 +117,10 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
       case _ => value
     }
   }
+
+  protected def existsPath(filePath: String): Boolean = {
+    val path = new Path(filePath)
+    val fs = FSUtils.getFs(filePath, spark.sparkContext.hadoopConfiguration)
+    fs.exists(path)
+  }
 }