RunCleanProcedure.scala
@@ -21,6 +21,7 @@ import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.util.JsonUtils
 import org.apache.hudi.config.HoodieCleanConfig
+import org.apache.hudi.table.action.clean.CleaningTriggerStrategy
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
@@ -33,8 +34,12 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
     ProcedureParameter.required(0, "table", DataTypes.StringType, None),
     ProcedureParameter.optional(1, "skip_locking", DataTypes.BooleanType, false),
     ProcedureParameter.optional(2, "schedule_in_line", DataTypes.BooleanType, true),
-    ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType, None),
-    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, 10)
+    ProcedureParameter.optional(3, "clean_policy", DataTypes.StringType, HoodieCleanConfig.CLEANER_POLICY.defaultValue()),
+    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, HoodieCleanConfig.CLEANER_COMMITS_RETAINED.defaultValue().toInt),
+    ProcedureParameter.optional(5, "hours_retained", DataTypes.IntegerType, HoodieCleanConfig.CLEANER_HOURS_RETAINED.defaultValue().toInt),
+    ProcedureParameter.optional(6, "file_versions_retained", DataTypes.IntegerType, HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.defaultValue().toInt),
+    ProcedureParameter.optional(7, "trigger_strategy", DataTypes.StringType, HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.defaultValue()),
+    ProcedureParameter.optional(8, "trigger_max_commits", DataTypes.IntegerType, HoodieCleanConfig.CLEAN_MAX_COMMITS.defaultValue().toInt)
   )

   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -64,16 +69,16 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
     val skipLocking = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
     val scheduleInLine = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Boolean]
-    val cleanPolicy = getArgValueOrDefault(args, PARAMETERS(3))
-    val retainCommits = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Integer]
     val basePath = getBasePath(tableName, Option.empty)
     val cleanInstantTime = HoodieActiveTimeline.createNewInstantTime()
-    var props: Map[String, String] = Map(
-      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> String.valueOf(retainCommits)
+    val props: Map[String, String] = Map(
+      HoodieCleanConfig.CLEANER_POLICY.key() -> getArgValueOrDefault(args, PARAMETERS(3)).get.toString,
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(4)).get.toString,
+      HoodieCleanConfig.CLEANER_HOURS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(5)).get.toString,
+      HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key() -> getArgValueOrDefault(args, PARAMETERS(6)).get.toString,
+      HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() -> getArgValueOrDefault(args, PARAMETERS(7)).get.toString,
+      HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> getArgValueOrDefault(args, PARAMETERS(8)).get.toString
     )
-    if (cleanPolicy.isDefined) {
-      props += (HoodieCleanConfig.CLEANER_POLICY.key() -> String.valueOf(cleanPolicy.get))
-    }
     val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
     val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)

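With this change the procedure forwards every cleaner knob into HoodieCleanConfig instead of special-casing clean_policy. A minimal sketch of the extended call surface, in the same spark.sql style the tests use (the table name h0 is illustrative, and KEEP_LATEST_BY_HOURS / NUM_COMMITS are assumed to be the matching HoodieCleaningPolicy and CleaningTriggerStrategy enum values, not something this diff adds):

  // Sketch only: each clean_policy pairs with its own retention argument.
  // h0 is a hypothetical table; the enum string values are assumptions.
  spark.sql("call run_clean(table => 'h0', clean_policy => 'KEEP_LATEST_COMMITS', retain_commits => 5)")
  spark.sql("call run_clean(table => 'h0', clean_policy => 'KEEP_LATEST_BY_HOURS', hours_retained => 24)")
  spark.sql("call run_clean(table => 'h0', clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 2)")
  // The trigger arguments gate whether the clean runs at all.
  spark.sql("call run_clean(table => 'h0', trigger_strategy => 'NUM_COMMITS', trigger_max_commits => 4)")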
TestCleanProcedure.scala
@@ -46,21 +46,75 @@ class TestCleanProcedure extends HoodieSparkProcedureTestBase {
spark.sql(s"update $tableName set price = 12 where id = 1")
spark.sql(s"update $tableName set price = 13 where id = 1")

// KEEP_LATEST_COMMITS
val result1 = spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
.collect()
.map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2), row.getString(3), row.getString(4), row.getInt(5)))

assertResult(1)(result1.length)
assertResult(2)(result1(0)(2))

val result11 = spark.sql(
s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
assertResult(2)(result11.length)

checkAnswer(s"select id, name, price, ts from $tableName order by id") (
Seq(1, "a1", 13, 1000)
)

val result2 = spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
.collect()
assertResult(0)(result2.length)

+      // KEEP_LATEST_FILE_VERSIONS
+      spark.sql(s"update $tableName set price = 14 where id = 1")
+
+      val result3 = spark.sql(
+        s"call run_clean(table => '$tableName', clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 3)").collect()
+      assertResult(0)(result3.length)
+
+      val result4 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
+      assertResult(3)(result4.length)
+
+      val result5 = spark.sql(
+        s"call run_clean(table => '$tableName', clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
+      assertResult(1)(result5.length)
+      assertResult(2)(result5(0)(2))
+
+      val result6 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
+      assertResult(1)(result6.length)
+
+      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+        Seq(1, "a1", 14, 1000)
+      )
+
+      // NUM_COMMITS trigger strategy
+      spark.sql(s"update $tableName set price = 15 where id = 1")
+
+      // not triggered: only 1 commit has accumulated since the last clean
+      val result7 = spark.sql(
+        s"call run_clean(table => '$tableName', trigger_max_commits => 2, clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
+      assertResult(0)(result7.length)
+
+      val result8 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
+      assertResult(2)(result8.length)
+
+      // triggered: the single new commit meets the threshold of 1
+      val result9 = spark.sql(
+        s"call run_clean(table => '$tableName', trigger_max_commits => 1, clean_policy => 'KEEP_LATEST_FILE_VERSIONS', file_versions_retained => 1)").collect()
+      assertResult(1)(result9.length)
+      assertResult(1)(result9(0)(2))
+
+      val result10 = spark.sql(
+        s"call show_fsview_all(table => '$tableName', path_regex => '', limit => 10)").collect()
+      assertResult(1)(result10.length)
+
+      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+        Seq(1, "a1", 15, 1000)
+      )
     }
   }

 }
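The trigger assertions pin down the semantics: with the NUM_COMMITS strategy, a clean is only scheduled once the commits accumulated since the last completed clean reach trigger_max_commits. A simplified model of that gate, assuming a hypothetical commitsSinceLastClean counter (Hudi derives this count from the timeline; this sketch is not its actual implementation):

  // Illustrative gate only; not Hudi's real trigger code.
  def shouldTriggerClean(commitsSinceLastClean: Int, triggerMaxCommits: Int): Boolean =
    commitsSinceLastClean >= triggerMaxCommits

  // Mirrors the test above: one commit since the last clean.
  assert(!shouldTriggerClean(commitsSinceLastClean = 1, triggerMaxCommits = 2)) // result7: nothing cleaned
  assert(shouldTriggerClean(commitsSinceLastClean = 1, triggerMaxCommits = 1))  // result9: clean runs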