From edb0803691023e502011e270ee83b69bc87c1ffb Mon Sep 17 00:00:00 2001 From: XuQianJin-Stars Date: Sat, 18 Dec 2021 17:09:49 +0800 Subject: [PATCH 1/3] [HUDI-3060] DROP TABLE for spark sql --- .../sql/hudi/analysis/HoodieAnalysis.scala | 18 +-- .../hudi/command/DropHoodieTableCommand.scala | 108 ++++++++++++++++++ .../apache/spark/sql/hudi/TestDropTable.scala | 75 ++++++++++++ .../spark/sql/hudi/TestHoodieSqlBase.scala | 10 +- 4 files changed, 202 insertions(+), 9 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index 1446760a3eb69..a8e074603708e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -19,24 +19,22 @@ package org.apache.spark.sql.hudi.analysis import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL import org.apache.hudi.SparkAdapterSupport - -import scala.collection.JavaConverters._ import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.spark.sql.{AnalysisException, SparkSession} -import org.apache.spark.sql.catalyst.analysis.UnresolvedStar -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} -import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils} import org.apache.spark.sql.hudi.HoodieSqlUtils._ import org.apache.spark.sql.hudi.command._ +import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils} import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.{AnalysisException, SparkSession} + +import scala.collection.JavaConverters._ object HoodieAnalysis { def customResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = @@ -407,6 +405,10 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic case CreateDataSourceTableCommand(table, ignoreIfExists) if isHoodieTable(table) => CreateHoodieTableCommand(table, ignoreIfExists) + // Rewrite the DropTableCommand to DropHoodieTableCommand + case DropTableCommand(tableName, ifExists, isView, purge) + if isHoodieTable(tableName, sparkSession) => + DropHoodieTableCommand(tableName, ifExists, isView, purge) // Rewrite the AlterTableDropPartitionCommand to AlterHoodieTableDropPartitionCommand case AlterTableDropPartitionCommand(tableName, specs, _, _, _) if isHoodieTable(tableName, sparkSession) => diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala new file mode 100644 index 0000000000000..ff9699f4c86b9 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command + +import org.apache.hadoop.fs.Path +import org.apache.hudi.SparkAdapterSupport +import org.apache.hudi.common.fs.FSUtils +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable} +import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.hive.HiveClientUtils +import org.apache.spark.sql.hudi.HoodieSqlUtils.isEnableHive + +import scala.util.control.NonFatal + +case class DropHoodieTableCommand( + tableIdentifier: TableIdentifier, + ifExists: Boolean, + isView: Boolean, + purge: Boolean) extends RunnableCommand + with SparkAdapterSupport { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}" + logInfo(s"start execute drop table command for $fullTableName") + + try { + // drop catalog table for this hoodie table + dropTableInCatalog(sparkSession, tableIdentifier, ifExists, purge) + } catch { + case NonFatal(e) => + logWarning(s"Failed to drop catalog table in metastore: ${e.getMessage}") + } + + logInfo(s"Finish execute drop table command for $fullTableName") + Seq.empty[Row] + } + + def dropTableInCatalog(sparkSession: SparkSession, + tableIdentifier: TableIdentifier, + ifExists: Boolean, + purge: Boolean): Unit = { + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier) + val table = hoodieCatalogTable.table + assert(table.tableType != CatalogTableType.VIEW) + + val basePath = hoodieCatalogTable.tableLocation + val catalog = sparkSession.sessionState.catalog + + // Drop table in the catalog + val enableHive = isEnableHive(sparkSession) + if (enableHive) { + dropHiveDataSourceTable(sparkSession, table, ifExists, purge) + } else { + catalog.dropTable(tableIdentifier, ifExists, purge) + } + + // Recursively delete table directories + if (purge) { + logInfo("Clean up " + basePath) + val targetPath = new Path(basePath) + val fs = FSUtils.getFs(basePath, sparkSession.sparkContext.hadoopConfiguration) + if (fs.exists(targetPath)) { + fs.delete(targetPath, true) + } + } + } + + private def dropHiveDataSourceTable( + sparkSession: SparkSession, + table: CatalogTable, + ifExists: Boolean, + purge: Boolean): Unit = { + val dbName = table.identifier.database.get + val tableName = table.identifier.table + // check database exists + val dbExists = sparkSession.sessionState.catalog.databaseExists(dbName) + if (!dbExists) { + throw new NoSuchDatabaseException(dbName) + } + // check table exists + if (!sparkSession.sessionState.catalog.tableExists(table.identifier)) { + throw new NoSuchTableException(dbName, table.identifier.table) + } + + val client = HiveClientUtils.newClientForMetadata(sparkSession.sparkContext.conf, + sparkSession.sessionState.newHadoopConf()) + // drop hive table. + client.dropTable(dbName, tableName, ifExists, purge) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala new file mode 100644 index 0000000000000..c53eb9127c887 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +class TestDropTable extends TestHoodieSqlBase { + + test("Test Drop Table") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + spark.sql(s"DROP TABLE $tableName") + checkAnswer(s"show tables like '$tableName'")() + assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName")) + } + } + } + + test("Test Drop Table with purge") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + spark.sql(s"DROP TABLE $tableName PURGE") + checkAnswer(s"show tables like '$tableName'")() + assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName")) + } + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala index 5413bf4044892..a7abddaa0308f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.hudi -import java.io.File +import org.apache.hadoop.fs.Path +import org.apache.hudi.common.fs.FSUtils import org.apache.log4j.Level import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.{Row, SparkSession} @@ -25,6 +26,7 @@ import org.apache.spark.util.Utils import org.scalactic.source import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag} +import java.io.File import java.util.TimeZone class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { @@ -115,4 +117,10 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { case _=> value } } + + protected def existsPath(filePath: String): Boolean = { + val path = new Path(filePath) + val fs = FSUtils.getFs(filePath, spark.sparkContext.hadoopConfiguration) + fs.exists(path) + } } From 04947e115398fd98e5ffe9f111886ea0fe9d53c1 Mon Sep 17 00:00:00 2001 From: XuQianJin-Stars Date: Tue, 21 Dec 2021 20:23:19 +0800 Subject: [PATCH 2/3] [HUDI-3060] DROP TABLE for spark sql --- .../apache/spark/sql/hudi/command/DropHoodieTableCommand.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala index ff9699f4c86b9..3eb0ef4c85dbe 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala @@ -49,6 +49,7 @@ case class DropHoodieTableCommand( logWarning(s"Failed to drop catalog table in metastore: ${e.getMessage}") } + sparkSession.catalog.refreshTable(tableIdentifier.unquotedString) logInfo(s"Finish execute drop table command for $fullTableName") Seq.empty[Row] } From 3aa814b0ca085a630674231012faed9f36d3bedd Mon Sep 17 00:00:00 2001 From: XuQianJin-Stars Date: Tue, 21 Dec 2021 20:24:29 +0800 Subject: [PATCH 3/3] [HUDI-3060] DROP TABLE for spark sql --- .../apache/spark/sql/hudi/command/DropHoodieTableCommand.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala index 3eb0ef4c85dbe..ed61153f54da4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala @@ -40,6 +40,7 @@ case class DropHoodieTableCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}" logInfo(s"start execute drop table command for $fullTableName") + sparkSession.catalog.refreshTable(tableIdentifier.unquotedString) try { // drop catalog table for this hoodie table @@ -49,7 +50,6 @@ case class DropHoodieTableCommand( logWarning(s"Failed to drop catalog table in metastore: ${e.getMessage}") } - sparkSession.catalog.refreshTable(tableIdentifier.unquotedString) logInfo(s"Finish execute drop table command for $fullTableName") Seq.empty[Row] }