diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 2b720bb94d2d8..7cfeaaa0b6cdb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -45,6 +45,8 @@ object HoodieProcedures {
     mapBuilder.put(ShowCommitsMetadataProcedure.NAME, ShowCommitsMetadataProcedure.builder)
     mapBuilder.put(ShowSavepointsProcedure.NAME, ShowSavepointsProcedure.builder)
     mapBuilder.put(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder)
+    mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
+    mapBuilder.put(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
     mapBuilder.build
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala
new file mode 100644
index 0000000000000..e5cacdb0626fc
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import java.io.IOException
+import java.util.function.Supplier
+
+import org.apache.hudi.avro.model.HoodieRollbackMetadata
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstant.State
+import org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline, TimelineMetadataUtils}
+import org.apache.hudi.common.util.CollectionUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Procedure implementation shared by `show_rollbacks` and `show_rollback_detail`.
+ *
+ * @param showDetails `true` to list the files of a single rollback instant (`show_rollback_detail`),
+ *                    `false` to list the completed rollbacks of the table (`show_rollbacks`).
+ */
+class ShowRollbacksProcedure(showDetails: Boolean) extends BaseProcedure with ProcedureBuilder {
+  private val ROLLBACKS_PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10)
+  )
+
+  private val ROLLBACK_PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
+    ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
+  )
+
+  private val ROLLBACKS_OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("rollback_instant", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("total_files_deleted", DataTypes.IntegerType, nullable = true, Metadata.empty),
+    StructField("time_taken_in_millis", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("total_partitions", DataTypes.IntegerType, nullable = true, Metadata.empty)
+  ))
+
+  private val ROLLBACK_OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("rollback_instant", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("deleted_file", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("succeeded", DataTypes.BooleanType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = if (showDetails) ROLLBACK_PARAMETERS else ROLLBACKS_PARAMETERS
+
+  def outputType: StructType = if (showDetails) ROLLBACK_OUTPUT_TYPE else ROLLBACKS_OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(parameters, args)
+
+    val tableName = getArgValueOrDefault(args, parameters(0))
+    val limit = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[Int]
+
+    val basePath = getBasePath(tableName)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val activeTimeline = new RollbackTimeline(metaClient)
+    if (showDetails) {
+      val instantTime = getArgValueOrDefault(args, parameters(2)).get.asInstanceOf[String]
+      getRollbackDetail(activeTimeline, instantTime, limit)
+    } else {
+      getRollbacks(activeTimeline, limit)
+    }
+  }
+
+  override def build: Procedure = new ShowRollbacksProcedure(showDetails)
+
+  class RollbackTimeline(metaClient: HoodieTableMetaClient) extends HoodieActiveTimeline(metaClient,
+    CollectionUtils.createImmutableSet(HoodieTimeline.ROLLBACK_EXTENSION)) {
+  }
+
+  /**
+   * Lists the files that one completed rollback instant tried to delete, one row per file.
+   *
+   * @param activeTimeline timeline the rollback metadata is read from
+   * @param instantTime    timestamp of the completed rollback instant to inspect
+   * @param limit          maximum number of rows to return
+   * @return rows laid out as ROLLBACK_OUTPUT_TYPE
+   */
+  def getRollbackDetail(activeTimeline: RollbackTimeline,
+                        instantTime: String,
+                        limit: Int): Seq[Row] = {
+    val metadata = TimelineMetadataUtils.deserializeAvroMetadata(activeTimeline.getInstantDetails(
+      new HoodieInstant(State.COMPLETED, ROLLBACK_ACTION, instantTime)).get, classOf[HoodieRollbackMetadata])
+
+    val rows = metadata.getPartitionMetadata.asScala.toSeq.flatMap { case (partition, partitionMetadata) =>
+      // Successful deletions come first, then the failed ones, each tagged with its outcome.
+      val deleted = partitionMetadata.getSuccessDeleteFiles.asScala.map(f => (f, true))
+      val failed = partitionMetadata.getFailedDeleteFiles.asScala.map(f => (f, false))
+      (deleted ++ failed).map { case (file, succeeded) =>
+        Row(metadata.getStartRollbackTime, metadata.getCommitsRollback.toString, partition, file, succeeded)
+      }
+    }
+    rows.take(limit).toList
+  }
+
+  /**
+   * Lists the completed rollbacks of the table, one row per rolled-back commit.
+   *
+   * @param activeTimeline timeline the rollback instants and their metadata are read from
+   * @param limit          maximum number of rows to return
+   * @return rows laid out as ROLLBACKS_OUTPUT_TYPE
+   * @throws HoodieException if the metadata of a rollback instant cannot be read
+   */
+  def getRollbacks(activeTimeline: RollbackTimeline,
+                   limit: Int): Seq[Row] = {
+    val rollback = activeTimeline.getRollbackTimeline.filterCompletedInstants
+
+    // The iterator is lazy, so instants beyond the requested limit are never deserialized.
+    val rows = rollback.getInstants.iterator().asScala.flatMap { instant =>
+      try {
+        val metadata = TimelineMetadataUtils.deserializeAvroMetadata(activeTimeline.getInstantDetails(instant).get,
+          classOf[HoodieRollbackMetadata])
+        val totalPartitions = Option(metadata.getPartitionMetadata).map(_.size).getOrElse(0)
+
+        metadata.getCommitsRollback.asScala.map(c =>
+          Row(metadata.getStartRollbackTime, c, metadata.getTotalFilesDeleted, metadata.getTimeTakenInMillis,
+            totalPartitions))
+      } catch {
+        case e: IOException =>
+          // Keep the original IOException as the cause so the root failure stays visible.
+          throw new HoodieException(s"Failed to get rollback's info from instant ${instant.getTimestamp}.", e)
+      }
+    }
+    rows.take(limit).toList
+  }
+}
+
+object ShowRollbacksProcedure {
+  val NAME = "show_rollbacks"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowRollbacksProcedure(false)
+  }
+}
+
+object ShowRollbackDetailProcedure {
+  val NAME = "show_rollback_detail"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowRollbacksProcedure(true)
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
index 848d09ab62bd0..02b0e930fb35b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
@@ -171,4 +171,109 @@ class TestCallProcedure extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Call show_rollbacks Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
+
+      // 3 commits are left before rollback
+      var commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
+      assertResult(3){commits.length}
+
+      // Call rollback_to_instant Procedure with Named Arguments
+      var instant_time = commits(0).get(0).toString
+      checkAnswer(s"""call rollback_to_instant(table => '$tableName', instant_time => '$instant_time')""")(Seq(true))
+      // Call rollback_to_instant Procedure with Positional Arguments
+      instant_time = commits(1).get(0).toString
+      checkAnswer(s"""call rollback_to_instant('$tableName', '$instant_time')""")(Seq(true))
+
+      // 1 commit is left after rollback
+      commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
+      assertResult(1){commits.length}
+
+      // collect rollbacks for table
+      val rollbacks = spark.sql(s"""call show_rollbacks(table => '$tableName', limit => 10)""").collect()
+      assertResult(2) {rollbacks.length}
+    }
+  }
+
+  test("Test Call show_rollback_detail Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
+
+      // 3 commits are left before rollback
+      var commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
+      assertResult(3) {
+        commits.length
+      }
+
+      // Call rollback_to_instant Procedure with Named Arguments
+      var instant_time = commits(0).get(0).toString
+      checkAnswer(s"""call rollback_to_instant(table => '$tableName', instant_time => '$instant_time')""")(Seq(true))
+      // Call rollback_to_instant Procedure with Positional Arguments
+      instant_time = commits(1).get(0).toString
+      checkAnswer(s"""call rollback_to_instant('$tableName', '$instant_time')""")(Seq(true))
+
+      // 1 commit is left after rollback
+      commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
+      assertResult(1) {
+        commits.length
+      }
+
+      // collect rollbacks for table
+      val rollbacks = spark.sql(s"""call show_rollbacks(table => '$tableName', limit => 10)""").collect()
+      assertResult(2) {
+        rollbacks.length
+      }
+
+      // Check required fields
+      checkExceptionContain(s"""call show_rollback_detail(table => '$tableName')""")(
+        s"Argument: instant_time is required")
+
+      // collect rollback's info for table
+      instant_time = rollbacks(1).get(0).toString
+      val rollback = spark.sql(s"""call show_rollback_detail(table => '$tableName', instant_time => '$instant_time')""").collect()
+      assertResult(1) {
+        rollback.length
+      }
+    }
+  }
 }