From 1378f6ce813b4fea31f1408c615e2449198ac970 Mon Sep 17 00:00:00 2001 From: shibei Date: Wed, 18 May 2022 11:51:37 +0800 Subject: [PATCH 1/2] Unify clustering/compaction related procedures' output type --- .../org/apache/hudi/HoodieCLIUtils.scala | 15 +++- .../command/CompactionHoodiePathCommand.scala | 11 +-- .../CompactionHoodieTableCommand.scala | 13 ++-- .../CompactionShowHoodiePathCommand.scala | 12 +--- .../CompactionShowHoodieTableCommand.scala | 12 +--- .../procedures/RunClusteringProcedure.scala | 34 +++++++-- .../procedures/RunCompactionProcedure.scala | 29 +++++--- .../procedures/ShowClusteringProcedure.scala | 37 ++++++++-- .../procedures/ShowCompactionProcedure.scala | 16 ++--- .../procedure/TestClusteringProcedure.scala | 64 ++++++++++------- .../procedure/TestCompactionProcedure.scala | 70 ++++++++++++++----- 11 files changed, 203 insertions(+), 110 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala index 58c33248234c2..552e3cfc9b9c3 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala @@ -19,14 +19,14 @@ package org.apache.hudi +import org.apache.hudi.avro.model.HoodieClusteringGroup import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.SparkSession import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf -import scala.collection.JavaConverters.mapAsJavaMapConverter -import scala.collection.immutable.Map +import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter} object HoodieCLIUtils { @@ -46,4 +46,15 @@ object HoodieCLIUtils { DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath, metaClient.getTableConfig.getTableName, finalParameters.asJava) } + + def extractPartitions(clusteringGroups: Seq[HoodieClusteringGroup]): String = { + var partitionPaths: Seq[String] = Seq.empty + clusteringGroups.foreach(g => + g.getSlices.asScala.foreach(slice => + partitionPaths = partitionPaths :+ slice.getPartitionPath + ) + ) + + partitionPaths.sorted.mkString(",") + } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala index 5b513f7500c10..57aff092b7429 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala @@ -19,11 +19,9 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.HoodieTableMetaClient - -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE} import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, RunCompactionProcedure} -import org.apache.spark.sql.types.StringType import 
org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.unsafe.types.UTF8String @@ -50,10 +48,5 @@ case class CompactionHoodiePathCommand(path: String, RunCompactionProcedure.builder.get().build.call(procedureArgs) } - override val output: Seq[Attribute] = { - operation match { - case RUN => Seq.empty - case SCHEDULE => Seq(AttributeReference("instant", StringType, nullable = false)()) - } - } + override val output: Seq[Attribute] = RunCompactionProcedure.builder.get().build.outputType.toAttributes } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala index 5e362314c2df7..adaaeae9e55c9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala @@ -18,10 +18,10 @@ package org.apache.spark.sql.hudi.command import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.CompactionOperation import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation -import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.hudi.command.procedures.RunCompactionProcedure import org.apache.spark.sql.{Row, SparkSession} @Deprecated @@ -35,10 +35,5 @@ case class CompactionHoodieTableCommand(table: CatalogTable, CompactionHoodiePathCommand(basePath, operation, instantTimestamp).run(sparkSession) } - override val output: Seq[Attribute] = { - operation match { - case RUN => Seq.empty - case SCHEDULE => Seq(AttributeReference("instant", StringType, nullable = false)()) - } - } + override val output: Seq[Attribute] = RunCompactionProcedure.builder.get().build.outputType.toAttributes } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala index 965724163b96c..95a4ecf7800e6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodiePathCommand.scala @@ -19,10 +19,8 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.HoodieTableMetaClient - -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, ShowCompactionProcedure} -import org.apache.spark.sql.types.{IntegerType, StringType} import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.unsafe.types.UTF8String @@ -42,11 +40,5 @@ case class CompactionShowHoodiePathCommand(path: String, limit: Int) ShowCompactionProcedure.builder.get().build.call(procedureArgs) } - override val output: Seq[Attribute] = { - Seq( - 
AttributeReference("instant", StringType, nullable = false)(), - AttributeReference("action", StringType, nullable = false)(), - AttributeReference("size", IntegerType, nullable = false)() - ) - } + override val output: Seq[Attribute] = ShowCompactionProcedure.builder.get().build.outputType.toAttributes } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala index f3f0a8e529be9..afd15d5153db6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala @@ -18,9 +18,9 @@ package org.apache.spark.sql.hudi.command import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation -import org.apache.spark.sql.types.{IntegerType, StringType} +import org.apache.spark.sql.hudi.command.procedures.ShowCompactionProcedure import org.apache.spark.sql.{Row, SparkSession} @Deprecated @@ -32,11 +32,5 @@ case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int) CompactionShowHoodiePathCommand(basePath, limit).run(sparkSession) } - override val output: Seq[Attribute] = { - Seq( - AttributeReference("timestamp", StringType, nullable = false)(), - AttributeReference("action", StringType, nullable = false)(), - AttributeReference("size", IntegerType, nullable = false)() - ) - } + override val output: Seq[Attribute] = ShowCompactionProcedure.builder.get().build.outputType.toAttributes } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala index 231d0939cc2e7..b353aebe50ac9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hudi.command.procedures import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL} -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline +import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.ValidationUtils.checkArgument import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption} @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.datasources.FileStatusCache import org.apache.spark.sql.types._ import java.util.function.Supplier + import scala.collection.JavaConverters._ class RunClusteringProcedure extends BaseProcedure @@ -50,13 +51,15 @@ class RunClusteringProcedure extends BaseProcedure ProcedureParameter.optional(0, "table", DataTypes.StringType, None), ProcedureParameter.optional(1, "path", DataTypes.StringType, None), ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None), - 
ProcedureParameter.optional(3, "order", DataTypes.StringType, None) + ProcedureParameter.optional(3, "order", DataTypes.StringType, None), + ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false) ) private val OUTPUT_TYPE = new StructType(Array[StructField]( StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty), - StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty), - StructField("groups", DataTypes.IntegerType, nullable = true, Metadata.empty) + StructField("input_group_size", DataTypes.IntegerType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("involved_partitions", DataTypes.StringType, nullable = true, Metadata.empty) )) def parameters: Array[ProcedureParameter] = PARAMETERS @@ -70,6 +73,7 @@ class RunClusteringProcedure extends BaseProcedure val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) val predicate = getArgValueOrDefault(args, PARAMETERS(2)) val orderColumns = getArgValueOrDefault(args, PARAMETERS(3)) + val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean] val basePath: String = getBasePath(tableName, tablePath) val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build @@ -114,7 +118,27 @@ class RunClusteringProcedure extends BaseProcedure pendingClustering.foreach(client.cluster(_, true)) logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," + s" time cost: ${System.currentTimeMillis() - startTs}ms.") - Seq.empty[Row] + + val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp)) + .toSeq + .sortBy(f => f.getTimestamp) + .reverse + + val clusteringPlans = clusteringInstants.map(instant => + ClusteringUtils.getClusteringPlan(metaClient, instant) + ) + + if (showInvolvedPartitions) { + clusteringPlans.map { p => + Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), + p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala)) + } + } else { + clusteringPlans.map { p => + Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*") + } + } } override def build: Procedure = new RunClusteringProcedure() diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala index 9bca33f3882d4..3e5a7e29e4022 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala @@ -20,10 +20,9 @@ package org.apache.spark.sql.hudi.command.procedures import org.apache.hudi.common.model.HoodieCommitMetadata import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} -import org.apache.hudi.common.util.{HoodieTimer, Option => HOption} +import org.apache.hudi.common.util.{CompactionUtils, HoodieTimer, Option => HOption} import org.apache.hudi.exception.HoodieException import 
org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport} - import org.apache.spark.internal.Logging import org.apache.spark.sql.Row import org.apache.spark.sql.types._ @@ -47,7 +46,9 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp ) private val OUTPUT_TYPE = new StructType(Array[StructField]( - StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty) + StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation_size", DataTypes.IntegerType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty) )) def parameters: Array[ProcedureParameter] = PARAMETERS @@ -66,13 +67,12 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty) + var willCompactionInstants: Seq[String] = Seq.empty operation match { case "schedule" => val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime) if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) { - Seq(Row(instantTime)) - } else { - Seq.empty[Row] + willCompactionInstants = Seq(instantTime) } case "run" => // Do compaction @@ -81,7 +81,7 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION) .map(_.getTimestamp) .toSeq.sortBy(f => f) - val willCompactionInstants = if (instantTimestamp.isEmpty) { + willCompactionInstants = if (instantTimestamp.isEmpty) { if (pendingCompactionInstants.nonEmpty) { pendingCompactionInstants } else { // If there are no pending compaction, schedule to generate one. 
@@ -102,9 +102,9 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ") } } + if (willCompactionInstants.isEmpty) { logInfo(s"No need to compaction on $basePath") - Seq.empty[Row] } else { logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath") val timer = new HoodieTimer @@ -116,10 +116,21 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp } logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," + s" spend: ${timer.endTimer()}ms") - Seq.empty[Row] } case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation") } + + val compactionInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala + .filter(instant => willCompactionInstants.contains(instant.getTimestamp)) + .toSeq + .sortBy(p => p.getTimestamp) + .reverse + + compactionInstants.map(instant => + (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp)) + ).map { case (instant, plan) => + Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name()) + } } private def handleResponse(metadata: HoodieCommitMetadata): Unit = { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala index a9d808217c0a9..092610119e606 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala @@ -17,26 +17,31 @@ package org.apache.spark.sql.hudi.command.procedures -import org.apache.hudi.SparkAdapterSupport +import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport} import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.HoodieTimeline import org.apache.hudi.common.util.ClusteringUtils import org.apache.spark.internal.Logging import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import java.util.function.Supplier + import scala.collection.JavaConverters._ class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging { private val PARAMETERS = Array[ProcedureParameter]( ProcedureParameter.optional(0, "table", DataTypes.StringType, None), ProcedureParameter.optional(1, "path", DataTypes.StringType, None), - ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20) + ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20), + ProcedureParameter.optional(3, "show_involved_partition", DataTypes.BooleanType, false) ) private val OUTPUT_TYPE = new StructType(Array[StructField]( StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty), - StructField("groups", DataTypes.IntegerType, nullable = true, Metadata.empty) + StructField("input_group_size", DataTypes.IntegerType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("involved_partitions", DataTypes.StringType, nullable = true, Metadata.empty) )) def parameters: Array[ProcedureParameter] = PARAMETERS @@ -49,12 +54,32 @@ class ShowClusteringProcedure extends 
BaseProcedure with ProcedureBuilder with S val tableName = getArgValueOrDefault(args, PARAMETERS(0)) val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int] + val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean] val basePath: String = getBasePath(tableName, tablePath) val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build - ClusteringUtils.getAllPendingClusteringPlans(metaClient).iterator().asScala.map { p => - Row(p.getLeft.getTimestamp, p.getRight.getInputGroups.size()) - }.toSeq.take(limit) + val clusteringInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + .sortBy(f => f.getTimestamp) + .reverse + .take(limit) + + val clusteringPlans = clusteringInstants.map(instant => + ClusteringUtils.getClusteringPlan(metaClient, instant) + ) + + if (showInvolvedPartitions) { + clusteringPlans.map { p => + Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), + p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala)) + } + } else { + clusteringPlans.map { p => + Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), + p.get().getLeft.getState.name(), "*") + } + } } override def build: Procedure = new ShowClusteringProcedure() diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala index d484d65323447..7a7bb2cf9d996 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala @@ -44,8 +44,8 @@ class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with S private val OUTPUT_TYPE = new StructType(Array[StructField]( StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty), - StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), - StructField("size", DataTypes.IntegerType, nullable = true, Metadata.empty) + StructField("operation_size", DataTypes.IntegerType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty) )) def parameters: Array[ProcedureParameter] = PARAMETERS @@ -64,17 +64,17 @@ class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with S assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ, s"Cannot show compaction on a Non Merge On Read table.") - val timeLine = metaClient.getActiveTimeline - val compactionInstants = timeLine.getInstants.iterator().asScala + val compactionInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION) .toSeq .sortBy(f => f.getTimestamp) .reverse .take(limit) - val compactionPlans = compactionInstants.map(instant => - (instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))) - compactionPlans.map { case (instant, plan) => - Row(instant.getTimestamp, instant.getAction, plan.getOperations.size()) + + compactionInstants.map(instant => + (instant, 
CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp)) + ).map { case (instant, plan) => + Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name()) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala index f975651bd7527..25c0dd20bb66f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala @@ -20,10 +20,9 @@ package org.apache.spark.sql.hudi.procedure import org.apache.hadoop.fs.Path -import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} +import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline} import org.apache.hudi.common.util.{Option => HOption} import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers} - import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase import scala.collection.JavaConverters.asScalaIteratorConverter @@ -64,28 +63,22 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { val secondScheduleInstant = HoodieActiveTimeline.createNewInstantTime client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty()) checkAnswer(s"call show_clustering('$tableName')")( - Seq(firstScheduleInstant, 3), - Seq(secondScheduleInstant, 1) + Seq(secondScheduleInstant, 1, HoodieInstant.State.REQUESTED.name(), "*"), + Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "*") ) // Do clustering for all clustering plan generated above, and no new clustering // instant will be generated because of there is no commit after the second // clustering plan generated - spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") + checkAnswer(s"call run_clustering(table => '$tableName', order => 'ts', show_involved_partition => true)")( + Seq(secondScheduleInstant, 1, HoodieInstant.State.COMPLETED.name(), "ts=1003"), + Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001,ts=1002") + ) // No new commits val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, secondScheduleInstant)) - checkAnswer(s"select id, name, price, ts from $tableName order by id")( - Seq(1, "a1", 10.0, 1000), - Seq(2, "a2", 10.0, 1001), - Seq(3, "a3", 10.0, 1002), - Seq(4, "a4", 10.0, 1003) - ) - // After clustering there should be no pending clustering. 
- checkAnswer(s"call show_clustering(table => '$tableName')")() - // Check the number of finished clustering instants val finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) .getInstants @@ -94,10 +87,23 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { .toSeq assertResult(2)(finishedClustering.size) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003) + ) + + // After clustering there should be no pending clustering and all clustering instants should be completed + checkAnswer(s"call show_clustering(table => '$tableName')")( + Seq(secondScheduleInstant, 1, HoodieInstant.State.COMPLETED.name(), "*"), + Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "*") + ) + // Do clustering without manual schedule(which will do the schedule if no pending clustering exists) spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") spark.sql(s"insert into $tableName values(6, 'a6', 10, 1005)") - spark.sql(s"call run_clustering(table => '$tableName', order => 'ts')") + spark.sql(s"call run_clustering(table => '$tableName', order => 'ts', show_involved_partition => true)").show() val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) .findInstantsAfter(secondScheduleInstant) @@ -142,7 +148,7 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { | location '$basePath' """.stripMargin) - spark.sql(s"call run_clustering(path => '$basePath')") + spark.sql(s"call run_clustering(path => '$basePath')").show() checkAnswer(s"call show_clustering(path => '$basePath')")() spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") @@ -152,18 +158,22 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { // Generate the first clustering plan val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty()) - checkAnswer(s"call show_clustering(path => '$basePath')")( - Seq(firstScheduleInstant, 3) + checkAnswer(s"call show_clustering(path => '$basePath', show_involved_partition => true)")( + Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "ts=1000,ts=1001,ts=1002") ) // Do clustering for all the clustering plan - spark.sql(s"call run_clustering(path => '$basePath', order => 'ts')") + checkAnswer(s"call run_clustering(path => '$basePath', order => 'ts')")( + Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), "*") + ) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 10.0, 1000), Seq(2, "a2", 10.0, 1001), Seq(3, "a3", 10.0, 1002) ) + val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) - HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant) + assertResult(false)(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, firstScheduleInstant)) // Check the number of finished clustering instants var finishedClustering = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) @@ -176,7 +186,7 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { // Do clustering without manual schedule(which will do the schedule if no pending clustering exists) spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") - spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 
1003L')") + spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L')").show() checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 10.0, 1000), Seq(2, "a2", 10.0, 1001), @@ -230,7 +240,7 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { )("Only partition predicates are allowed") // Do clustering table with partition predicate - spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts')") + spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts')").show() // There is 1 completed clustering instant val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) @@ -247,7 +257,7 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { assertResult(2)(clusteringPlan.get().getInputGroups.size()) // No pending clustering instant - checkAnswer(s"call show_clustering(table => '$tableName')")() + spark.sql(s"call show_clustering(table => '$tableName')").show() checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 10.0, 1000), @@ -267,7 +277,7 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { )("Only partition predicates are allowed") // Do clustering table with partition predicate - spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L', order => 'ts')") + spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L', order => 'ts')").show() // There are 2 completed clustering instants val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) @@ -284,7 +294,7 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { assertResult(4)(clusteringPlan.get().getInputGroups.size()) // No pending clustering instant - checkAnswer(s"call show_clustering(table => '$tableName')")() + spark.sql(s"call show_clustering(table => '$tableName')").show() checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 10.0, 1000), @@ -308,7 +318,7 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { )("Only partition predicates are allowed") // Do clustering table with partition predicate - spark.sql(s"call run_clustering(table => '$tableName', predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts')") + spark.sql(s"call run_clustering(table => '$tableName', predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts')").show() // There are 3 completed clustering instants val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) @@ -325,7 +335,7 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { assertResult(3)(clusteringPlan.get().getInputGroups.size()) // No pending clustering instant - checkAnswer(s"call show_clustering(table => '$tableName')")() + spark.sql(s"call show_clustering(table => '$tableName')").show() checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 10.0, 1000), diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala index 0f6f96f91196f..a5c09f9349f24 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala @@ -48,22 +48,48 @@ class TestCompactionProcedure extends HoodieSparkSqlTestBase { spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)") spark.sql(s"update $tableName set price = 11 where id = 1") - spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')") + // Schedule the first compaction + val firstResult = spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')") + .collect() + .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2))) + spark.sql(s"update $tableName set price = 12 where id = 2") - spark.sql(s"call run_compaction('schedule', '$tableName')") - val compactionRows = spark.sql(s"call show_compaction(table => '$tableName', limit => 10)").collect() - val timestamps = compactionRows.map(_.getString(0)) + + // Schedule the second compaction + val secondResult = spark.sql(s"call run_compaction('schedule', '$tableName')") + .collect() + .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2))) + + assertResult(1)(firstResult.length) + assertResult(1)(secondResult.length) + val showCompactionSql: String = s"call show_compaction(table => '$tableName', limit => 10)" + checkAnswer(showCompactionSql)( + firstResult(0), + secondResult(0) + ) + + val compactionRows = spark.sql(showCompactionSql).collect() + val timestamps = compactionRows.map(_.getString(0)).sorted assertResult(2)(timestamps.length) - spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(1)})") + // Execute the second scheduled compaction instant actually + spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(1)})").show() checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 11.0, 1000), Seq(2, "a2", 12.0, 1000), Seq(3, "a3", 10.0, 1000), Seq(4, "a4", 10.0, 1000) ) - assertResult(1)(spark.sql(s"call show_compaction('$tableName')").collect().length) - spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})") + + // A compaction action eventually becomes commit when completed, so show_compaction + // can only see the first scheduled compaction instant + val thirdResult = spark.sql(s"call show_compaction('$tableName')") + .collect() + .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2))) + assertResult(1)(thirdResult.length) + assertResult(firstResult)(thirdResult) + + spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})").show() checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 11.0, 1000), Seq(2, "a2", 12.0, 1000), @@ -98,25 +124,37 @@ class TestCompactionProcedure extends HoodieSparkSqlTestBase { spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)") spark.sql(s"update $tableName set price = 11 where id = 1") - spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')") + spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')").show() checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 11.0, 1000), Seq(2, "a2", 10.0, 1000), Seq(3, "a3", 10.0, 1000) ) assertResult(0)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length) - // schedule compaction first + spark.sql(s"update $tableName set price = 12 where id = 1") - spark.sql(s"call run_compaction(op=> 'schedule', path => 
'${tmp.getCanonicalPath}')") - // schedule compaction second + // Schedule the first compaction + val firstResult = spark.sql(s"call run_compaction(op=> 'schedule', path => '${tmp.getCanonicalPath}')") + .collect() + .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2))) + spark.sql(s"update $tableName set price = 12 where id = 2") - spark.sql(s"call run_compaction(op => 'schedule', path => '${tmp.getCanonicalPath}')") - // show compaction - assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length) - // run compaction for all the scheduled compaction - spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')") + // Schedule the second compaction + val secondResult = spark.sql(s"call run_compaction(op => 'schedule', path => '${tmp.getCanonicalPath}')") + .collect() + .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2))) + + assertResult(1)(firstResult.length) + assertResult(1)(secondResult.length) + checkAnswer(s"call show_compaction(path => '${tmp.getCanonicalPath}')")( + firstResult(0), + secondResult(0) + ) + + // Run compaction for all the scheduled compaction + spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')").show() checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 12.0, 1000), From 21b8e9510a4feed0c19f8ebdac906a23fad8b202 Mon Sep 17 00:00:00 2001 From: shibei Date: Wed, 18 May 2022 22:19:27 +0800 Subject: [PATCH 2/2] Address review comments --- .../procedure/TestClusteringProcedure.scala | 53 +++++++++++++++---- .../procedure/TestCompactionProcedure.scala | 46 +++++++++------- 2 files changed, 70 insertions(+), 29 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala index 25c0dd20bb66f..df4d8c90e2e6f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala @@ -186,7 +186,12 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { // Do clustering without manual schedule(which will do the schedule if no pending clustering exists) spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") - spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L')").show() + val resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts >= 1003L', show_involved_partition => true)") + .collect() + .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3))) + assertResult(1)(resultA.length) + assertResult("ts=1003,ts=1004")(resultA(0)(3)) + checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 10.0, 1000), Seq(2, "a2", 10.0, 1001), @@ -230,6 +235,8 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) // Test partition pruning with single predicate + var resultA: Array[Seq[Any]] = Array.empty + { spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") @@ -240,7 +247,11 @@ class TestClusteringProcedure extends 
HoodieSparkSqlTestBase { )("Only partition predicates are allowed") // Do clustering table with partition predicate - spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts')").show() + resultA = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts <= 1001L', order => 'ts', show_involved_partition => true)") + .collect() + .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3))) + assertResult(1)(resultA.length) + assertResult("ts=1000,ts=1001")(resultA(0)(3)) // There is 1 completed clustering instant val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) @@ -255,9 +266,12 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { val clusteringPlan = HoodieDataSourceHelpers.getClusteringPlan(fs, basePath, clusteringInstant.getTimestamp) assertResult(true)(clusteringPlan.isPresent) assertResult(2)(clusteringPlan.get().getInputGroups.size()) + assertResult(resultA(0)(1))(clusteringPlan.get().getInputGroups.size()) - // No pending clustering instant - spark.sql(s"call show_clustering(table => '$tableName')").show() + // All clustering instants are completed + checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")( + Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001") + ) checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 10.0, 1000), @@ -267,6 +281,8 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { } // Test partition pruning with {@code And} predicates + var resultB: Array[Seq[Any]] = Array.empty + { spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") @@ -277,7 +293,11 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { )("Only partition predicates are allowed") // Do clustering table with partition predicate - spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L', order => 'ts')").show() + resultB = spark.sql(s"call run_clustering(table => '$tableName', predicate => 'ts > 1001L and ts <= 1005L', order => 'ts', show_involved_partition => true)") + .collect() + .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3))) + assertResult(1)(resultB.length) + assertResult("ts=1002,ts=1003,ts=1004,ts=1005")(resultB(0)(3)) // There are 2 completed clustering instants val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) @@ -293,8 +313,11 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { assertResult(true)(clusteringPlan.isPresent) assertResult(4)(clusteringPlan.get().getInputGroups.size()) - // No pending clustering instant - spark.sql(s"call show_clustering(table => '$tableName')").show() + // All clustering instants are completed + checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")( + Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001"), + Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1002,ts=1003,ts=1004,ts=1005") + ) checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 10.0, 1000), @@ -307,6 +330,8 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { } // Test partition pruning with {@code And}-{@code Or} predicates + var resultC: Array[Seq[Any]] = Array.empty + { 
spark.sql(s"insert into $tableName values(7, 'a7', 10, 1006)") spark.sql(s"insert into $tableName values(8, 'a8', 10, 1007)") @@ -318,7 +343,11 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { )("Only partition predicates are allowed") // Do clustering table with partition predicate - spark.sql(s"call run_clustering(table => '$tableName', predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts')").show() + resultC = spark.sql(s"call run_clustering(table => '$tableName', predicate => '(ts >= 1006L and ts < 1008L) or ts >= 1009L', order => 'ts', show_involved_partition => true)") + .collect() + .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3))) + assertResult(1)(resultC.length) + assertResult("ts=1006,ts=1007,ts=1009")(resultC(0)(3)) // There are 3 completed clustering instants val clusteringInstants = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) @@ -334,8 +363,12 @@ class TestClusteringProcedure extends HoodieSparkSqlTestBase { assertResult(true)(clusteringPlan.isPresent) assertResult(3)(clusteringPlan.get().getInputGroups.size()) - // No pending clustering instant - spark.sql(s"call show_clustering(table => '$tableName')").show() + // All clustering instants are completed + checkAnswer(s"call show_clustering(table => '$tableName', show_involved_partition => true)")( + Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1000,ts=1001"), + Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1002,ts=1003,ts=1004,ts=1005"), + Seq(resultC(0).head, resultC(0)(1), HoodieInstant.State.COMPLETED.name(), "ts=1006,ts=1007,ts=1009") + ) checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 10.0, 1000), diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala index a5c09f9349f24..39332d859171d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.procedure +import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase class TestCompactionProcedure extends HoodieSparkSqlTestBase { @@ -49,23 +50,23 @@ class TestCompactionProcedure extends HoodieSparkSqlTestBase { spark.sql(s"update $tableName set price = 11 where id = 1") // Schedule the first compaction - val firstResult = spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')") + val resultA = spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')") .collect() .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2))) spark.sql(s"update $tableName set price = 12 where id = 2") // Schedule the second compaction - val secondResult = spark.sql(s"call run_compaction('schedule', '$tableName')") + val resultB = spark.sql(s"call run_compaction('schedule', '$tableName')") .collect() .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2))) - assertResult(1)(firstResult.length) - assertResult(1)(secondResult.length) + assertResult(1)(resultA.length) + assertResult(1)(resultB.length) val showCompactionSql: String = s"call show_compaction(table => '$tableName', 
limit => 10)" checkAnswer(showCompactionSql)( - firstResult(0), - secondResult(0) + resultA(0), + resultB(0) ) val compactionRows = spark.sql(showCompactionSql).collect() @@ -73,7 +74,9 @@ class TestCompactionProcedure extends HoodieSparkSqlTestBase { assertResult(2)(timestamps.length) // Execute the second scheduled compaction instant actually - spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(1)})").show() + checkAnswer(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(1)})")( + Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name()) + ) checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 11.0, 1000), Seq(2, "a2", 12.0, 1000), @@ -83,13 +86,15 @@ class TestCompactionProcedure extends HoodieSparkSqlTestBase { // A compaction action eventually becomes commit when completed, so show_compaction // can only see the first scheduled compaction instant - val thirdResult = spark.sql(s"call show_compaction('$tableName')") + val resultC = spark.sql(s"call show_compaction('$tableName')") .collect() .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2))) - assertResult(1)(thirdResult.length) - assertResult(firstResult)(thirdResult) + assertResult(1)(resultC.length) + assertResult(resultA)(resultC) - spark.sql(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})").show() + checkAnswer(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})")( + Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name()) + ) checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 11.0, 1000), Seq(2, "a2", 12.0, 1000), @@ -124,7 +129,7 @@ class TestCompactionProcedure extends HoodieSparkSqlTestBase { spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)") spark.sql(s"update $tableName set price = 11 where id = 1") - spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')").show() + checkAnswer(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')")() checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 11.0, 1000), Seq(2, "a2", 10.0, 1000), @@ -135,26 +140,29 @@ class TestCompactionProcedure extends HoodieSparkSqlTestBase { spark.sql(s"update $tableName set price = 12 where id = 1") // Schedule the first compaction - val firstResult = spark.sql(s"call run_compaction(op=> 'schedule', path => '${tmp.getCanonicalPath}')") + val resultA = spark.sql(s"call run_compaction(op=> 'schedule', path => '${tmp.getCanonicalPath}')") .collect() .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2))) spark.sql(s"update $tableName set price = 12 where id = 2") // Schedule the second compaction - val secondResult = spark.sql(s"call run_compaction(op => 'schedule', path => '${tmp.getCanonicalPath}')") + val resultB = spark.sql(s"call run_compaction(op => 'schedule', path => '${tmp.getCanonicalPath}')") .collect() .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2))) - assertResult(1)(firstResult.length) - assertResult(1)(secondResult.length) + assertResult(1)(resultA.length) + assertResult(1)(resultB.length) checkAnswer(s"call show_compaction(path => '${tmp.getCanonicalPath}')")( - firstResult(0), - secondResult(0) + resultA(0), + resultB(0) ) // Run compaction for all the scheduled compaction - spark.sql(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')").show() + 
checkAnswer(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}')")( + Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name()), + Seq(resultB(0).head, resultB(0)(1), HoodieInstant.State.COMPLETED.name()) + ) checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 12.0, 1000),