diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 1eb82d97c5e95..1a9404d265a73 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -80,6 +80,7 @@ object HoodieProcedures {
     mapBuilder.put(RepairDeduplicateProcedure.NAME, RepairDeduplicateProcedure.builder)
     mapBuilder.put(RepairMigratePartitionMetaProcedure.NAME, RepairMigratePartitionMetaProcedure.builder)
     mapBuilder.put(RepairOverwriteHoodiePropsProcedure.NAME, RepairOverwriteHoodiePropsProcedure.builder)
+    mapBuilder.put(RunCleanProcedure.NAME, RunCleanProcedure.builder)
     mapBuilder.build
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
new file mode 100644
index 0000000000000..6e3d2e9dcbd71
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import java.util.function.Supplier
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.util.JsonUtils
+import org.apache.hudi.config.HoodieCleanConfig
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "skipLocking", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(2, "scheduleInLine", DataTypes.BooleanType, true),
+    ProcedureParameter.optional(3, "cleanPolicy", DataTypes.StringType, None),
+    ProcedureParameter.optional(4, "retainCommits", DataTypes.IntegerType, 10)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("start_clean_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("time_taken_in_millis", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_files_deleted", DataTypes.IntegerType, nullable = true, Metadata.empty),
+    StructField("earliest_commit_to_retain", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("bootstrap_part_metadata", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("version", DataTypes.IntegerType, nullable = true, Metadata.empty)
+  ))
+
+  override def build: Procedure = new RunCleanProcedure
+
+  /**
+   * Returns the input parameters of this procedure.
+   */
+  override def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  /**
+   * Returns the type of rows produced by this procedure.
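+   * Fields mirror the returned HoodieCleanMetadata: the clean start time, the time
+   * taken in millis, the total files deleted, the earliest commit to retain, the
+   * bootstrap partition metadata serialized to JSON, and the metadata version.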
+   */
+  override def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val skipLocking = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean]
+    val scheduleInLine = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Boolean]
+    val cleanPolicy = getArgValueOrDefault(args, PARAMETERS(3))
+    val retainCommits = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Integer]
+    val basePath = getBasePath(tableName, Option.empty)
+    val cleanInstantTime = HoodieActiveTimeline.createNewInstantTime()
+    // translate procedure arguments into cleaner configs
+    var props: Map[String, String] = Map(
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> String.valueOf(retainCommits)
+    )
+    if (cleanPolicy.isDefined) {
+      props += (HoodieCleanConfig.CLEANER_POLICY.key() -> String.valueOf(cleanPolicy.get))
+    }
+    val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
+    val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)
+
+    // clean() may return null when there is nothing to clean
+    if (hoodieCleanMeta == null) Seq(Row.empty)
+    else Seq(Row(hoodieCleanMeta.getStartCleanTime,
+      hoodieCleanMeta.getTimeTakenInMillis,
+      hoodieCleanMeta.getTotalFilesDeleted,
+      hoodieCleanMeta.getEarliestCommitToRetain,
+      JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
+      hoodieCleanMeta.getVersion))
+  }
+}
+
+object RunCleanProcedure {
+  val NAME = "run_clean"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new RunCleanProcedure
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
new file mode 100644
index 0000000000000..e0d61cbb070ff
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestCleanProcedure extends HoodieSparkSqlTestBase {
+
+  test("Test Call run_clean Procedure by Table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+           |""".stripMargin)
+
+      spark.sql("set hoodie.parquet.max.file.size = 10000")
+      // COW table: each update below rewrites the file group, adding a base file version
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"update $tableName set price = 11 where id = 1")
+      spark.sql(s"update $tableName set price = 12 where id = 1")
+      spark.sql(s"update $tableName set price = 13 where id = 1")
+
+      val result1 = spark.sql(s"call run_clean(table => '$tableName', retainCommits => 1)")
+        .collect()
+        .map(row => Seq(row.getString(0), row.getLong(1), row.getInt(2), row.getString(3), row.getString(4), row.getInt(5)))
+
+      assertResult(1)(result1.length)
+      // 4 commits with retainCommits = 1: the cleaner keeps the latest file slice
+      // plus one earlier version, so the 2 oldest base files are deleted
+      assertResult(2)(result1(0)(2))
+
+      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+        Seq(1, "a1", 13, 1000)
+      )
+    }
+  }
+
+}
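
Usage sketch (illustrative, not part of the patch): once the builder is registered in
HoodieProcedures, the procedure is callable through Spark SQL. The table name h0 is
hypothetical; parameter names and defaults follow PARAMETERS above, and the policy
value assumes the KEEP_LATEST_COMMITS name from Hudi's cleaning-policy enum.

  // minimal sketch, assuming an active SparkSession `spark` with the Hudi SQL
  // extensions enabled and an existing Hudi table named h0 (hypothetical);
  // defaults apply: skipLocking = false, scheduleInLine = true, retainCommits = 10
  spark.sql("call run_clean(table => 'h0')")

  // tighten retention: the procedure maps these arguments onto
  // HoodieCleanConfig.CLEANER_POLICY and HoodieCleanConfig.CLEANER_COMMITS_RETAINED
  spark.sql("call run_clean(table => 'h0', cleanPolicy => 'KEEP_LATEST_COMMITS', retainCommits => 1)")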