/**
 * Deletes the metadata table for one or more Hudi tables.
 *
 * The `table` argument may be a single table name or a comma-separated list of
 * table names; the metadata table of each listed table is deleted.
 *
 * @param args procedure arguments; PARAMETERS(0) is the (possibly comma-separated) table name(s)
 * @return a single Row summarizing, per table, either the deleted metadata
 *         table path or a "File not found" message
 */
override def call(args: ProcedureArgs): Seq[Row] = {
  super.checkArgs(PARAMETERS, args)

  val tableName = getArgValueOrDefault(args, PARAMETERS(0))
  // Support a comma-separated list so several metadata tables can be
  // deleted in one call; ignore empty entries from stray commas.
  val tableNames = tableName.get.asInstanceOf[String].split(",").map(_.trim).filter(_.nonEmpty)

  // Map each table to its per-table outcome instead of accumulating into a
  // mutable string (the previous version produced a leading comma and
  // discarded the per-table results built inside the loop).
  val results = tableNames.toSeq.map { tb =>
    val basePath = getBasePath(Option.apply(tb))
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    try {
      val metadataTableBasePath = deleteMetadataTable(metaClient, new HoodieSparkEngineContext(jsc), false)
      s"Deleted Metadata Table at '$metadataTableBasePath'"
    } catch {
      // Keep the failure visible in the summary rather than silently
      // dropping it: a missing metadata table for one entry must not be
      // reported as a successful deletion.
      case e: FileNotFoundException =>
        "File not found: " + e.getMessage
    }
  }

  // Preserve the single-row result shape of the original procedure.
  Seq(Row(results.mkString("; ")))
}

override def build = new DeleteMetadataTableProcedure()
// Verifies that delete_metadata_table accepts a comma-separated list of
// tables, then that create_metadata_table can rebuild the metadata afterwards.
test("Test Call delete_metadata_table then create_metadata_table with multiple tables") {
  withTempDir { tmp =>
    val tableName = generateTableName
    // create the first table
    spark.sql(
      s"""
         |create table $tableName (
         |  id int,
         |  name string,
         |  price double,
         |  ts long
         |) using hudi
         | location '${tmp.getCanonicalPath}/$tableName'
         | tblproperties (
         |  primaryKey = 'id',
         |  preCombineField = 'ts'
         | )
       """.stripMargin)
    // insert data into the first table
    spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
    spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")

    val tableName_1 = generateTableName
    // create the second table
    spark.sql(
      s"""
         |create table $tableName_1 (
         |  id int,
         |  name string,
         |  price double,
         |  ts long
         |) using hudi
         | location '${tmp.getCanonicalPath}/$tableName_1'
         | tblproperties (
         |  primaryKey = 'id',
         |  preCombineField = 'ts'
         | )
       """.stripMargin)
    // BUG FIX: the original test inserted into $tableName again here,
    // leaving the second table empty; insert into the second table instead.
    spark.sql(s"insert into $tableName_1 select 1, 'a1', 10, 1000")
    spark.sql(s"insert into $tableName_1 select 2, 'a2', 20, 1500")

    val tables = s"$tableName,$tableName_1"

    // Step 1: delete the metadata tables of both tables in a single call.
    val deleteResult = spark.sql(s"""call delete_metadata_table(table => '$tables')""").collect()
    assertResult(1) {
      deleteResult.length
    }

    // Step 2: re-create the metadata table for the first table.
    val createResult = spark.sql(s"""call create_metadata_table(table => '$tableName')""").collect()
    assertResult(1) {
      createResult.length
    }
  }
}