Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.table.HoodieSparkTable
import org.apache.hudi.table.marker.WriteMarkersFactory
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -47,8 +48,9 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log
val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
val basePath = getBasePath(tableName)

var client: SparkRDDWriteClient[_] = null
val result = Try {
val client = createHoodieClient(jsc, basePath)
client = createHoodieClient(jsc, basePath)
val config = client.getConfig
val context = client.getEngineContext
val table = HoodieSparkTable.create(config, context)
Expand All @@ -63,6 +65,10 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log
false
}

if (client != null) {
client.close()
}

Seq(Row(result))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.HoodieCLIUtils
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
Expand Down Expand Up @@ -52,28 +53,35 @@ class RollbackToInstantTimeProcedure extends BaseProcedure with ProcedureBuilder

val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
val basePath = hoodieCatalogTable.tableLocation
val client = createHoodieClient(jsc, basePath)
client.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false")
val config = getWriteConfig(basePath)
val metaClient = HoodieTableMetaClient.builder
.setConf(jsc.hadoopConfiguration)
.setBasePath(config.getBasePath)
.setLoadActiveTimelineOnLoad(false)
.setConsistencyGuardConfig(config.getConsistencyGuardConfig)
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion)))
.build

val activeTimeline = metaClient.getActiveTimeline
val completedTimeline: HoodieTimeline = activeTimeline.getCommitsTimeline.filterCompletedInstants
val filteredTimeline = completedTimeline.containsInstant(instantTime)
if (!filteredTimeline) {
throw new HoodieException(s"Commit $instantTime not found in Commits $completedTimeline")
var client: SparkRDDWriteClient[_] = null
try {
client = createHoodieClient(jsc, basePath)
client.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false")
val config = getWriteConfig(basePath)
val metaClient = HoodieTableMetaClient.builder
.setConf(jsc.hadoopConfiguration)
.setBasePath(config.getBasePath)
.setLoadActiveTimelineOnLoad(false)
.setConsistencyGuardConfig(config.getConsistencyGuardConfig)
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion)))
.build

val activeTimeline = metaClient.getActiveTimeline
val completedTimeline: HoodieTimeline = activeTimeline.getCommitsTimeline.filterCompletedInstants
val filteredTimeline = completedTimeline.containsInstant(instantTime)
if (!filteredTimeline) {
throw new HoodieException(s"Commit $instantTime not found in Commits $completedTimeline")
}

val result = if (client.rollback(instantTime)) true else false
val outputRow = Row(result)

Seq(outputRow)
} finally {
if (client != null) {
client.close()
}
}

val result = if (client.rollback(instantTime)) true else false
val outputRow = Row(result)

Seq(outputRow)
}

/** Builds a fresh [[RollbackToInstantTimeProcedure]] and returns it as a [[Procedure]]. */
override def build: Procedure = {
  new RollbackToInstantTimeProcedure()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.HoodieCLIUtils
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.util.JsonUtils
import org.apache.hudi.config.HoodieCleanConfig
Expand Down Expand Up @@ -79,16 +80,24 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY.key() -> getArgValueOrDefault(args, PARAMETERS(7)).get.toString,
HoodieCleanConfig.CLEAN_MAX_COMMITS.key() -> getArgValueOrDefault(args, PARAMETERS(8)).get.toString
)
val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)

if (hoodieCleanMeta == null) Seq.empty
else Seq(Row(hoodieCleanMeta.getStartCleanTime,
hoodieCleanMeta.getTimeTakenInMillis,
hoodieCleanMeta.getTotalFilesDeleted,
hoodieCleanMeta.getEarliestCommitToRetain,
JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
hoodieCleanMeta.getVersion))
var client: SparkRDDWriteClient[_] = null
try {
client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)

if (hoodieCleanMeta == null) Seq.empty
else Seq(Row(hoodieCleanMeta.getStartCleanTime,
hoodieCleanMeta.getTimeTakenInMillis,
hoodieCleanMeta.getTotalFilesDeleted,
hoodieCleanMeta.getEarliestCommitToRetain,
JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
hoodieCleanMeta.getVersion))
} finally {
if (client != null) {
client.close()
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
Expand Down Expand Up @@ -64,70 +65,77 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp

val basePath = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty)

var willCompactionInstants: Seq[String] = Seq.empty
operation match {
case "schedule" =>
val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
willCompactionInstants = Seq(instantTime)
}
case "run" =>
// Do compaction
val timeLine = metaClient.getActiveTimeline
val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
.map(_.getTimestamp)
.toSeq.sortBy(f => f)
willCompactionInstants = if (instantTimestamp.isEmpty) {
if (pendingCompactionInstants.nonEmpty) {
pendingCompactionInstants
} else { // If there are no pending compaction, schedule to generate one.
// CompactionHoodiePathCommand will return instanceTime for SCHEDULE.
val instantTime = HoodieActiveTimeline.createNewInstantTime()
if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
Seq(instantTime)

var client: SparkRDDWriteClient[_] = null
try {
client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty)
var willCompactionInstants: Seq[String] = Seq.empty
operation match {
case "schedule" =>
val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime)
if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
willCompactionInstants = Seq(instantTime)
}
case "run" =>
// Do compaction
val timeLine = metaClient.getActiveTimeline
val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala
.filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
.map(_.getTimestamp)
.toSeq.sortBy(f => f)
willCompactionInstants = if (instantTimestamp.isEmpty) {
if (pendingCompactionInstants.nonEmpty) {
pendingCompactionInstants
} else { // If there are no pending compaction, schedule to generate one.
// CompactionHoodiePathCommand will return instanceTime for SCHEDULE.
val instantTime = HoodieActiveTimeline.createNewInstantTime()
if (client.scheduleCompactionAtInstant(instantTime, HOption.empty[java.util.Map[String, String]])) {
Seq(instantTime)
} else {
Seq.empty
}
}
} else {
// Check whether the requested compaction timestamp exists among the pending compactions
if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
Seq(instantTimestamp.get.toString)
} else {
Seq.empty
throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in " +
s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
}
}
} else {
// Check whether the requested compaction timestamp exists among the pending compactions
if (pendingCompactionInstants.contains(instantTimestamp.get.toString)) {
Seq(instantTimestamp.get.toString)

if (willCompactionInstants.isEmpty) {
logInfo(s"No need to compaction on $basePath")
} else {
throw new IllegalArgumentException(s"Compaction instant: ${instantTimestamp.get} is not found in " +
s"$basePath, Available pending compaction instants are: ${pendingCompactionInstants.mkString(",")} ")
}
}

if (willCompactionInstants.isEmpty) {
logInfo(s"No need to compaction on $basePath")
} else {
logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath")
val timer = HoodieTimer.start
willCompactionInstants.foreach { compactionInstant =>
val writeResponse = client.compact(compactionInstant)
handleResponse(writeResponse.getCommitMetadata.get())
client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty())
logInfo(s"Run compaction at instants: [${willCompactionInstants.mkString(",")}] on $basePath")
val timer = HoodieTimer.start
willCompactionInstants.foreach { compactionInstant =>
val writeResponse = client.compact(compactionInstant)
handleResponse(writeResponse.getCommitMetadata.get())
client.commitCompaction(compactionInstant, writeResponse.getCommitMetadata.get(), HOption.empty())
}
logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
s" spend: ${timer.endTimer()}ms")
}
logInfo(s"Finish Run compaction at instants: [${willCompactionInstants.mkString(",")}]," +
s" spend: ${timer.endTimer()}ms")
}
case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
}

val compactionInstants = metaClient.reloadActiveTimeline().getInstantsAsStream.iterator().asScala
.filter(instant => willCompactionInstants.contains(instant.getTimestamp))
.toSeq
.sortBy(p => p.getTimestamp)
.reverse

compactionInstants.map(instant =>
(instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
).map { case (instant, plan) =>
Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
case _ => throw new UnsupportedOperationException(s"Unsupported compaction operation: $operation")
}

val compactionInstants = metaClient.reloadActiveTimeline().getInstantsAsStream.iterator().asScala
.filter(instant => willCompactionInstants.contains(instant.getTimestamp))
.toSeq
.sortBy(p => p.getTimestamp)
.reverse

compactionInstants.map(instant =>
(instant, CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp))
).map { case (instant, plan) =>
Row(instant.getTimestamp, plan.getOperations.size(), instant.getState.name())
}
} finally {
if (client != null) {
client.close()
}
}
}

Expand Down