File: HoodieCLIUtils.scala
@@ -19,14 +19,14 @@

package org.apache.hudi

import org.apache.hudi.avro.model.HoodieClusteringGroup
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf

import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.immutable.Map
import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter}

object HoodieCLIUtils {

@@ -46,4 +46,15 @@ object HoodieCLIUtils {
DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath,
metaClient.getTableConfig.getTableName, finalParameters.asJava)
}

def extractPartitions(clusteringGroups: Seq[HoodieClusteringGroup]): String = {
var partitionPaths: Seq[String] = Seq.empty
clusteringGroups.foreach(g =>
g.getSlices.asScala.foreach(slice =>
partitionPaths = partitionPaths :+ slice.getPartitionPath
)
)

partitionPaths.sorted.mkString(",")
}
}
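
A minimal sketch of what the new extractPartitions helper produces, using plain strings in place of the Avro HoodieClusteringGroup/slice model (the input data here is made up for illustration):

// Partition paths of the file slices in two hypothetical clustering groups.
val slicePartitions: Seq[Seq[String]] = Seq(
  Seq("2022/01/02"),
  Seq("2022/01/01", "2022/01/02")
)
// Same flatten -> sort -> comma-join logic as extractPartitions; duplicates are kept.
val involvedPartitions: String = slicePartitions.flatten.sorted.mkString(",")
// involvedPartitions == "2022/01/01,2022/01/02,2022/01/02"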
File: CompactionHoodiePathCommand.scala
@@ -19,11 +19,9 @@ package org.apache.spark.sql.hudi.command

import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, RunCompactionProcedure}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.unsafe.types.UTF8String

@@ -50,10 +48,5 @@ case class CompactionHoodiePathCommand(path: String,
RunCompactionProcedure.builder.get().build.call(procedureArgs)
}

override val output: Seq[Attribute] = {
operation match {
case RUN => Seq.empty
case SCHEDULE => Seq(AttributeReference("instant", StringType, nullable = false)())
}
}
override val output: Seq[Attribute] = RunCompactionProcedure.builder.get().build.outputType.toAttributes
}
File: CompactionHoodieTableCommand.scala
@@ -18,10 +18,10 @@
package org.apache.spark.sql.hudi.command

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.CompactionOperation
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.hudi.command.procedures.RunCompactionProcedure
import org.apache.spark.sql.{Row, SparkSession}

@Deprecated
@@ -35,10 +35,5 @@ case class CompactionHoodieTableCommand(table: CatalogTable,
CompactionHoodiePathCommand(basePath, operation, instantTimestamp).run(sparkSession)
}

override val output: Seq[Attribute] = {
operation match {
case RUN => Seq.empty
case SCHEDULE => Seq(AttributeReference("instant", StringType, nullable = false)())
}
}
override val output: Seq[Attribute] = RunCompactionProcedure.builder.get().build.outputType.toAttributes
}
File: CompactionShowHoodiePathCommand.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql.hudi.command

import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedureUtils, ShowCompactionProcedure}
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.unsafe.types.UTF8String

@@ -42,11 +40,5 @@ case class CompactionShowHoodiePathCommand(path: String, limit: Int)
ShowCompactionProcedure.builder.get().build.call(procedureArgs)
}

override val output: Seq[Attribute] = {
Seq(
AttributeReference("instant", StringType, nullable = false)(),
AttributeReference("action", StringType, nullable = false)(),
AttributeReference("size", IntegerType, nullable = false)()
)
}
override val output: Seq[Attribute] = ShowCompactionProcedure.builder.get().build.outputType.toAttributes
}
File: CompactionShowHoodieTableCommand.scala
@@ -18,9 +18,9 @@
package org.apache.spark.sql.hudi.command

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.hudi.command.procedures.ShowCompactionProcedure
import org.apache.spark.sql.{Row, SparkSession}

@Deprecated
@@ -32,11 +32,5 @@ case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int)
CompactionShowHoodiePathCommand(basePath, limit).run(sparkSession)
}

override val output: Seq[Attribute] = {
Seq(
AttributeReference("timestamp", StringType, nullable = false)(),
AttributeReference("action", StringType, nullable = false)(),
AttributeReference("size", IntegerType, nullable = false)()
)
}
override val output: Seq[Attribute] = ShowCompactionProcedure.builder.get().build.outputType.toAttributes
}
File: RunClusteringProcedure.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.common.util.{ClusteringUtils, Option => HOption}
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.datasources.FileStatusCache
import org.apache.spark.sql.types._

import java.util.function.Supplier

import scala.collection.JavaConverters._

class RunClusteringProcedure extends BaseProcedure
@@ -50,13 +51,15 @@ class RunClusteringProcedure extends BaseProcedure
ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
ProcedureParameter.optional(2, "predicate", DataTypes.StringType, None),
ProcedureParameter.optional(3, "order", DataTypes.StringType, None)
ProcedureParameter.optional(3, "order", DataTypes.StringType, None),
ProcedureParameter.optional(4, "show_involved_partition", DataTypes.BooleanType, false)
)

private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("groups", DataTypes.IntegerType, nullable = true, Metadata.empty)
StructField("input_group_size", DataTypes.IntegerType, nullable = true, Metadata.empty),
StructField("state", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("involved_partitions", DataTypes.StringType, nullable = true, Metadata.empty)
))

def parameters: Array[ProcedureParameter] = PARAMETERS
@@ -70,6 +73,7 @@ class RunClusteringProcedure extends BaseProcedure
val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
val predicate = getArgValueOrDefault(args, PARAMETERS(2))
val orderColumns = getArgValueOrDefault(args, PARAMETERS(3))
val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean]

val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
@@ -114,7 +118,27 @@
pendingClustering.foreach(client.cluster(_, true))
logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
s" time cost: ${System.currentTimeMillis() - startTs}ms.")
Seq.empty[Row]

val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
.toSeq
.sortBy(f => f.getTimestamp)
.reverse

val clusteringPlans = clusteringInstants.map(instant =>
ClusteringUtils.getClusteringPlan(metaClient, instant)
)

if (showInvolvedPartitions) {
clusteringPlans.map { p =>
Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
}
} else {
clusteringPlans.map { p =>
Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*")
}
}
}

override def build: Procedure = new RunClusteringProcedure()
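
A hedged usage sketch of the new run_clustering output: it assumes the procedure is exposed to Spark SQL under the name run_clustering and that a Hudi table h1 exists, neither of which is shown in this diff.

import org.apache.spark.sql.SparkSession

// Minimal session sketch; the extension class name is the standard Hudi one (assumption for this example).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("run-clustering-example")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()

spark.sql("call run_clustering(table => 'h1', show_involved_partition => true)").show(false)
// Expected columns per the new OUTPUT_TYPE: timestamp, input_group_size, state, involved_partitions.
// With the default show_involved_partition => false, involved_partitions is reported as "*".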
File: RunCompactionProcedure.scala
@@ -20,10 +20,9 @@ package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.hudi.common.util.{HoodieTimer, Option => HOption}
import org.apache.hudi.common.util.{CompactionUtils, HoodieTimer, Option => HOption}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
@@ -47,7 +46,9 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
)

private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty)
StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("operation_size", DataTypes.IntegerType, nullable = true, Metadata.empty),
StructField("state", DataTypes.StringType, nullable = true, Metadata.empty)
))

def parameters: Array[ProcedureParameter] = PARAMETERS
@@ -66,13 +67,12 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty)

var willCompactionInstants: Seq[String] = Seq.empty
operation match {
case "schedule" =>
val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
Seq(Row(instantTime))
} else {
Seq.empty[Row]
willCompactionInstants = Seq(instantTime)
}
case "run" =>
// Do compaction
@@ -81,7 +81,7 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
.filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
.map(_.getTimestamp)
.toSeq.sortBy(f => f)
val willCompactionInstants = if (instantTimestamp.isEmpty) {
willCompactionInstants = if (instantTimestamp.isEmpty) {
if (pendingCompactionInstants.nonEmpty) {
pendingCompactionInstants
} else { // If there are no pending compaction, schedule to generate one.
@@ -102,9 +102,9 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
}
}

if (willCompactionInstants.isEmpty) {
logInfo(s"No need to compaction on $basePath")
Seq.empty[Row]
} else {
logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath")
val timer = new HoodieTimer
@@ -116,10 +116,21 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
}
logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
s" spend: ${timer.endTimer()}ms")
Seq.empty[Row]
}
case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
}

val compactionInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
.filter(instant => willCompactionInstants.contains(instant.getTimestamp))
.toSeq
.sortBy(p => p.getTimestamp)
.reverse

compactionInstants.map(instant =>
(instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
).map { case (instant, plan) =>
Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
}
}

private def handleResponse(metadata: HoodieCommitMetadata): Unit = {
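
A hedged sketch of calling the procedure with its new three-column output; the registered name run_compaction and the parameter names op and table are assumptions, since they are not visible in this diff. spark is the session from the run_clustering sketch above.

spark.sql("call run_compaction(op => 'run', table => 'h1')").show(false)
// Expected columns per the new OUTPUT_TYPE: timestamp, operation_size, state.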
File: ShowClusteringProcedure.scala
@@ -17,26 +17,31 @@

package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.util.ClusteringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

import java.util.function.Supplier

import scala.collection.JavaConverters._

class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20)
ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20),
ProcedureParameter.optional(3, "show_involved_partition", DataTypes.BooleanType, false)
)

private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("groups", DataTypes.IntegerType, nullable = true, Metadata.empty)
StructField("input_group_size", DataTypes.IntegerType, nullable = true, Metadata.empty),
StructField("state", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("involved_partitions", DataTypes.StringType, nullable = true, Metadata.empty)
))

def parameters: Array[ProcedureParameter] = PARAMETERS
@@ -49,12 +54,32 @@ class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with S
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
val showInvolvedPartitions = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]

val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
ClusteringUtils.getAllPendingClusteringPlans(metaClient).iterator().asScala.map { p =>
Row(p.getLeft.getTimestamp, p.getRight.getInputGroups.size())
}.toSeq.take(limit)
val clusteringInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
.toSeq
.sortBy(f => f.getTimestamp)
.reverse
.take(limit)

val clusteringPlans = clusteringInstants.map(instant =>
ClusteringUtils.getClusteringPlan(metaClient, instant)
)

if (showInvolvedPartitions) {
clusteringPlans.map { p =>
Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
}
} else {
clusteringPlans.map { p =>
Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
p.get().getLeft.getState.name(), "*")
}
}
}

override def build: Procedure = new ShowClusteringProcedure()
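
A hedged sketch of the new show_clustering output; the registered procedure name and the table h1 are assumptions, while the table, limit and show_involved_partition parameters appear in PARAMETERS above. spark is the session from the run_clustering sketch.

spark.sql("call show_clustering(table => 'h1', limit => 10, show_involved_partition => true)").show(false)
// Expected columns: timestamp, input_group_size, state, involved_partitions.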
File: ShowCompactionProcedure.scala
@@ -44,8 +44,8 @@ class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with S

private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("timestamp", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("size", DataTypes.IntegerType, nullable = true, Metadata.empty)
StructField("operation_size", DataTypes.IntegerType, nullable = true, Metadata.empty),
StructField("state", DataTypes.StringType, nullable = true, Metadata.empty)
))

def parameters: Array[ProcedureParameter] = PARAMETERS
@@ -64,17 +64,17 @@ class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with S

assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
s"Cannot show compaction on a Non Merge On Read table.")
val timeLine = metaClient.getActiveTimeline
val compactionInstants = timeLine.getInstants.iterator().asScala
val compactionInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
.toSeq
.sortBy(f => f.getTimestamp)
.reverse
.take(limit)
val compactionPlans = compactionInstants.map(instant =>
(instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp)))
compactionPlans.map { case (instant, plan) =>
Row(instant.getTimestamp, instant.getAction, plan.getOperations.size())

compactionInstants.map(instant =>
(instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
).map { case (instant, plan) =>
Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
}
}

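
A hedged sketch of the renamed show_compaction columns; the procedure name and its table/limit parameters are assumptions not shown in this hunk. spark is the session from the run_clustering sketch.

spark.sql("call show_compaction(table => 'h1', limit => 10)").show(false)
// Columns per the new OUTPUT_TYPE: timestamp, operation_size, state (previously timestamp, action, size).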