Merged
@@ -48,6 +48,8 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
     val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
     val basePath = getBasePath(tableName)
+    val instantTimes = instantTime.split(",")
+    var currentInstant = ""
 
     var client: SparkRDDWriteClient[_] = null
     val result = Try {
@@ -56,14 +58,17 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log
       val config = client.getConfig
       val context = client.getEngineContext
       val table = HoodieSparkTable.create(config, context)
-      WriteMarkersFactory.get(config.getMarkersType, table, instantTime)
-        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism)
+      for (it <- instantTimes) {
+        currentInstant = it
+        WriteMarkersFactory.get(config.getMarkersType, table, it)
+          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism)
+      }
     } match {
       case Success(_) =>
         logInfo(s"Marker $instantTime deleted.")
         true
       case Failure(e) =>
-        logWarning(s"Failed: Could not clean marker instantTime: $instantTime.", e)
+        logWarning(s"Failed: Could not clean marker instantTime: $currentInstant.", e)
         false
     }
 
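With this change, the delete_marker procedure's instant_time argument accepts a comma-separated list of instants rather than a single instant. A minimal usage sketch (the table name and instant values are illustrative; the call shape mirrors the batch-mode test added below):

    // Delete the marker directories of two instants in one call;
    // the procedure returns a single true/false row.
    spark.sql("call delete_marker(table => 'hudi_table', instant_time => '101,102')")
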
@@ -58,25 +58,28 @@ class DeleteSavepointProcedure extends BaseProcedure with ProcedureBuilder with
     if (StringUtils.isNullOrEmpty(instantTime)) {
       instantTime = completedInstants.lastInstant.get.getTimestamp
     }
-    val savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, instantTime)
-
-    if (!completedInstants.containsInstant(savePoint)) {
-      throw new HoodieException("Commit " + instantTime + " not found in Commits " + completedInstants)
-    }
-
+    val instantTimes = instantTime.split(",")
     val client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, Map.empty,
       tableName.asInstanceOf[Option[String]])
-    var result = false
-
-    try {
-      client.deleteSavepoint(instantTime)
-      logInfo(s"The commit $instantTime has been deleted savepoint.")
-      result = true
-    } catch {
-      case _: HoodieSavepointException =>
-        logWarning(s"Failed: Could not delete savepoint $instantTime.")
-    } finally {
-      client.close()
+    var result = true
+    var currentInstant = ""
+    for (it <- instantTimes) {
+      val savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, it)
+      currentInstant = it
+      if (!completedInstants.containsInstant(savePoint)) {
+        throw new HoodieException("Commit " + it + " not found in Commits " + completedInstants)
+      }
+
+      try {
+        client.deleteSavepoint(it)
+        logInfo(s"The commit $instantTime has been deleted savepoint.")
+      } catch {
+        case _: HoodieSavepointException =>
+          logWarning(s"Failed: Could not delete savepoint $currentInstant.")
+          result = false
+      } finally {
+        client.close()
+      }
     }
 
     Seq(Row(result))
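delete_savepoint likewise accepts a comma-separated list of savepoint instants; per the loop above, the single returned Boolean is false if deleting any of the listed savepoints fails. A minimal usage sketch (the table name and instant values are illustrative; see also the batch-mode test added below):

    // Delete the savepoints created at two instants in one call.
    spark.sql("call delete_savepoint('hudi_table', '20220101000000,20220102000000')")
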
@@ -209,6 +209,50 @@ class TestCallProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call delete_marker Procedure with batch mode") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      // Check required fields
+      checkExceptionContain(s"""call delete_marker(table => '$tableName')""")(
+        s"Argument: instant_time is required")
+
+      var instantTime = "101"
+      FileCreateUtils.createMarkerFile(tablePath, "", instantTime, "f0", IOType.APPEND)
+      assertResult(1) {
+        FileCreateUtils.getTotalMarkerFileCount(tablePath, "", instantTime, IOType.APPEND)
+      }
+      instantTime = "102"
+      FileCreateUtils.createMarkerFile(tablePath, "", instantTime, "f0", IOType.APPEND)
+      assertResult(1) {
+        FileCreateUtils.getTotalMarkerFileCount(tablePath, "", instantTime, IOType.APPEND)
+      }
+
+      instantTime = "101,102"
+      checkAnswer(s"""call delete_marker(table => '$tableName', instant_time => '$instantTime')""")(Seq(true))
+
+      assertResult(0) {
+        FileCreateUtils.getTotalMarkerFileCount(tablePath, "", instantTime, IOType.APPEND)
+      }
+    }
+  }
+
   test("Test Call show_rollbacks Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
@@ -214,6 +214,77 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call delete_savepoint Procedure with batch mode") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
+      spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
+
+      val commits = spark.sql(s"""call show_commits(table => '$tableName')""").collect()
+      assertResult(4) {
+        commits.length
+      }
+
+      // create 4 savepoints
+      commits.foreach(r => {
+        checkAnswer(s"""call create_savepoint('$tableName', '${r.getString(0)}')""")(Seq(true))
+      })
+
+      // Delete 2 savepoint with table name and instant time
+      val toDeleteInstant = s"${commits.apply(1).getString(0)},${commits.apply(0).getString(0)}"
+      checkAnswer(s"""call delete_savepoint('$tableName', '${toDeleteInstant}')""")(Seq(true))
+
+      // show_savepoints should return two savepoint
+      var savepoints = spark.sql(s"""call show_savepoints(table => '$tableName')""").collect()
+      assertResult(2) {
+        savepoints.length
+      }
+
+      assertResult(commits(2).getString(0))(savepoints(0).getString(0))
+      assertResult(commits(3).getString(0))(savepoints(1).getString(0))
+
+      // Delete a savepoint with table name and latest savepoint time
+      checkAnswer(s"""call delete_savepoint('$tableName', '')""")(Seq(true))
+
+      // show_savepoints should return one savepoint
+      savepoints = spark.sql(s"""call show_savepoints(table => '$tableName')""").collect()
+      assertResult(1) {
+        savepoints.length
+      }
+
+      assertResult(commits(3).getString(0))(savepoints(0).getString(0))
+
+      // Delete a savepoint with table base path and latest savepoint time
+      checkAnswer(s"""call delete_savepoint(path => '$tablePath')""".stripMargin)(Seq(true))
+
+      // show_savepoints should return zero savepoint
+      savepoints = spark.sql(s"""call show_savepoints(table => '$tableName')""").collect()
+      assertResult(0) {
+        savepoints.length
+      }
+    }
+  }
+
   test("Test Call rollback_to_savepoint Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName