diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala index 87d58fa6ed099..8d73a753cf4c2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala @@ -48,6 +48,8 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log val tableName = getArgValueOrDefault(args, PARAMETERS(0)) val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String] val basePath = getBasePath(tableName) + val instantTimes = instantTime.split(",") + var currentInstant = "" var client: SparkRDDWriteClient[_] = null val result = Try { @@ -56,14 +58,17 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log val config = client.getConfig val context = client.getEngineContext val table = HoodieSparkTable.create(config, context) - WriteMarkersFactory.get(config.getMarkersType, table, instantTime) - .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism) + for (it <- instantTimes) { + currentInstant = it + WriteMarkersFactory.get(config.getMarkersType, table, it) + .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism) + } } match { case Success(_) => logInfo(s"Marker $instantTime deleted.") true case Failure(e) => - logWarning(s"Failed: Could not clean marker instantTime: $instantTime.", e) + logWarning(s"Failed: Could not clean marker instantTime: $currentInstant.", e) false } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala index 0e92abc497768..d568566e55469 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala @@ -58,25 +58,30 @@ class DeleteSavepointProcedure extends BaseProcedure with ProcedureBuilder with if (StringUtils.isNullOrEmpty(instantTime)) { instantTime = completedInstants.lastInstant.get.getTimestamp } - val savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, instantTime) - - if (!completedInstants.containsInstant(savePoint)) { - throw new HoodieException("Commit " + instantTime + " not found in Commits " + completedInstants) - } - + val instantTimes = instantTime.split(",") val client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, Map.empty, tableName.asInstanceOf[Option[String]]) - var result = false - - try { - client.deleteSavepoint(instantTime) - logInfo(s"The commit $instantTime has been deleted savepoint.") - result = true - } catch { - case _: HoodieSavepointException => - logWarning(s"Failed: Could not delete savepoint $instantTime.") - } finally { - client.close() + var result = true + var currentInstant = "" + try { + for (it <- instantTimes) { + val savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, it) + currentInstant = it + if (!completedInstants.containsInstant(savePoint)) { + throw new HoodieException("Commit " + it + " not found in Commits " + completedInstants) + } + + try { + client.deleteSavepoint(it) + logInfo(s"The commit $it has been deleted savepoint.") + } catch { + case _: HoodieSavepointException => + logWarning(s"Failed: Could not delete savepoint $currentInstant.") + result = false + } + } + } finally { + client.close() } Seq(Row(result)) diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala index 5b90e26681972..30bec0f8a9ceb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala @@ -209,6 +209,53 @@ class TestCallProcedure extends HoodieSparkProcedureTestBase { } } + test("Test Call delete_marker Procedure with batch mode") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // Check required fields + checkExceptionContain(s"""call delete_marker(table => '$tableName')""")( + s"Argument: instant_time is required") + + var instantTime = "101" + FileCreateUtils.createMarkerFile(tablePath, "", instantTime, "f0", IOType.APPEND) + assertResult(1) { + FileCreateUtils.getTotalMarkerFileCount(tablePath, "", instantTime, IOType.APPEND) + } + instantTime = "102" + FileCreateUtils.createMarkerFile(tablePath, "", instantTime, "f0", IOType.APPEND) + assertResult(1) { + FileCreateUtils.getTotalMarkerFileCount(tablePath, "", instantTime, IOType.APPEND) + } + + instantTime = "101,102" + checkAnswer(s"""call delete_marker(table => '$tableName', instant_time => '$instantTime')""")(Seq(true)) + + assertResult(0) { + FileCreateUtils.getTotalMarkerFileCount(tablePath, "", "101", IOType.APPEND) + } + assertResult(0) { + FileCreateUtils.getTotalMarkerFileCount(tablePath, "", "102", IOType.APPEND) + } + } + } + test("Test Call show_rollbacks Procedure") { withTempDir { tmp => val tableName = generateTableName diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala index c8fa10bde2c67..af31cd4bb2c4a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala @@ -214,6 +214,77 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase { } } + test("Test Call delete_savepoint Procedure with batch mode") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = tmp.getCanonicalPath + "/" + tableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // insert data to table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500") + spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000") + spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500") + + val commits = spark.sql(s"""call show_commits(table => '$tableName')""").collect() + assertResult(4) { + commits.length + } + + // create 4 savepoints + commits.foreach(r => { + checkAnswer(s"""call create_savepoint('$tableName', '${r.getString(0)}')""")(Seq(true)) + }) + + // Delete 2 savepoint with table name and instant time + val toDeleteInstant = s"${commits.apply(1).getString(0)},${commits.apply(0).getString(0)}" + checkAnswer(s"""call delete_savepoint('$tableName', '${toDeleteInstant}')""")(Seq(true)) + + // show_savepoints should return two savepoint + var savepoints = spark.sql(s"""call show_savepoints(table => '$tableName')""").collect() + 
assertResult(2) { + savepoints.length + } + + assertResult(commits(2).getString(0))(savepoints(0).getString(0)) + assertResult(commits(3).getString(0))(savepoints(1).getString(0)) + + // Delete a savepoint with table name and latest savepoint time + checkAnswer(s"""call delete_savepoint('$tableName', '')""")(Seq(true)) + + // show_savepoints should return one savepoint + savepoints = spark.sql(s"""call show_savepoints(table => '$tableName')""").collect() + assertResult(1) { + savepoints.length + } + + assertResult(commits(3).getString(0))(savepoints(0).getString(0)) + + // Delete a savepoint with table base path and latest savepoint time + checkAnswer(s"""call delete_savepoint(path => '$tablePath')""".stripMargin)(Seq(true)) + + // show_savepoints should return zero savepoint + savepoints = spark.sql(s"""call show_savepoints(table => '$tableName')""").collect() + assertResult(0) { + savepoints.length + } + } + } + test("Test Call rollback_to_savepoint Procedure") { withTempDir { tmp => val tableName = generateTableName