diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 2bd9ab43a747f..f126e746c785d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -31,10 +31,15 @@
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieSparkTable;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -42,6 +47,7 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -92,6 +98,10 @@ public static class Config implements Serializable {
     @Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering @desperate soon please use \"--mode schedule\" instead")
     public Boolean runSchedule = false;
 
+    @Parameter(names = {"--retry-last-failed-clustering-job", "-rc"}, description = "Take effect when using --mode/-m scheduleAndExecute. Set true means "
+        + "check, rollback and re-execute the last failed clustering plan instead of planning a new clustering job directly.", required = false)
+    public Boolean retryLastFailedClusteringJob = false;
+
     @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" means make a cluster plan; "
         + "Set \"execute\" means execute a cluster plan at given instant which means --instant-time is needed here; "
         + "Set \"scheduleAndExecute\" means make a cluster plan first and execute that plan immediately", required = false)
@@ -100,6 +110,10 @@ public static class Config implements Serializable {
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;
 
+    @Parameter(names = {"--job-max-processing-time-ms", "-jt"}, description = "Take effect when using --mode/-m scheduleAndExecute and --retry-last-failed-clustering-job/-rc true. "
+        + "If maxProcessingTimeMs has passed but the clustering job is still unfinished, Hudi considers the job failed and relaunches it.", required = false)
+    public long maxProcessingTimeMs = 0;
+
     @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
         + "hoodie client for clustering")
     public String propsFilePath = null;
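A minimal driver sketch of how the two new options fit together (not part of the patch; `jsc`, the table path, and the properties file path are assumed/hypothetical):

    // Run clustering in scheduleAndExecute mode with retry enabled.
    HoodieClusteringJob.Config clusterCfg = new HoodieClusteringJob.Config();
    clusterCfg.basePath = "/tmp/hudi_table";                     // hypothetical table path
    clusterCfg.propsFilePath = "/tmp/clusteringjob.properties";  // hypothetical props file
    clusterCfg.runningMode = "scheduleAndExecute";
    clusterCfg.retryLastFailedClusteringJob = true;              // re-attempt a stuck inflight plan first
    clusterCfg.maxProcessingTimeMs = 30 * 60 * 1000L;            // treat plans inflight for >30 min as failed
    int exitCode = new HoodieClusteringJob(jsc, clusterCfg).cluster(0);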
" + + "If maxProcessingTimeMs passed but clustering job is still unfinished, hoodie would consider this job as failed and relaunch.", required = false) + public long maxProcessingTimeMs = 0; + @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for " + "hoodie client for clustering") public String propsFilePath = null; @@ -216,17 +230,32 @@ private Option doSchedule(SparkRDDWriteClient clien return client.scheduleClustering(Option.empty()); } - public int doScheduleAndCluster(JavaSparkContext jsc) throws Exception { + private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception { LOG.info("Step 1: Do schedule"); String schemaStr = getSchemaFromLatestInstant(); try (SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) { + Option instantTime = Option.empty(); + + if (cfg.retryLastFailedClusteringJob) { + HoodieSparkTable table = HoodieSparkTable.create(client.getConfig(), client.getEngineContext()); + HoodieTimeline inflightHoodieTimeline = table.getActiveTimeline().filterPendingReplaceTimeline().filterInflights(); + if (!inflightHoodieTimeline.empty()) { + HoodieInstant inflightClusteringInstant = inflightHoodieTimeline.lastInstant().get(); + Date clusteringStartTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse(inflightClusteringInstant.getTimestamp()); + if (clusteringStartTime.getTime() + cfg.maxProcessingTimeMs < System.currentTimeMillis()) { + // if there has failed clustering, then we will use the failed clustering instant-time to trigger next clustering action which will rollback and clustering. + LOG.info("Found failed clustering instant at : " + inflightClusteringInstant + "; Will rollback the failed clustering and re-trigger again."); + instantTime = Option.of(inflightHoodieTimeline.lastInstant().get().getTimestamp()); + } else { + LOG.info(inflightClusteringInstant + " might still be in progress, will trigger a new clustering job."); + } + } + } - Option instantTime = doSchedule(client); - int result = instantTime.isPresent() ? 0 : -1; - - if (result == -1) { + instantTime = instantTime.isPresent() ? 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 014a0c140d62e..667c853abe97d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -139,9 +139,13 @@ protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, i
     return new HoodieDeltaStreamer(cfg, jsc);
   }
 
-  protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, boolean runSchedule, String scheduleAndExecute) {
+  protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute) {
+    return initialHoodieClusteringJob(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute, null);
+  }
+
+  protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute, Boolean retryLastFailedClusteringJob) {
     HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
-        clusteringInstantTime, runSchedule, scheduleAndExecute);
+        clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);
     return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
   }
 
@@ -844,20 +848,24 @@ private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean,
 
   private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
                                                                      String clusteringInstantTime,
-                                                                     boolean runSchedule) {
-    return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule, null);
+                                                                     Boolean runSchedule) {
+    return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule, null, null);
   }
 
   private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
                                                                      String clusteringInstantTime,
-                                                                     boolean runSchedule,
-                                                                     String runningMode) {
+                                                                     Boolean runSchedule,
+                                                                     String runningMode,
+                                                                     Boolean retryLastFailedClusteringJob) {
     HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
     config.basePath = basePath;
    config.clusteringInstantTime = clusteringInstantTime;
     config.runSchedule = runSchedule;
     config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
     config.runningMode = runningMode;
+    if (retryLastFailedClusteringJob != null) {
+      config.retryLastFailedClusteringJob = retryLastFailedClusteringJob;
+    }
     return config;
   }
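The test added below simulates a crashed clustering job by transitioning the requested replace instant to inflight and leaving it there. For intuition, this is what the retry path in HoodieClusteringJob looks for on the timeline — a sketch (the table path is hypothetical; `jsc` is an existing JavaSparkContext):

    HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
        .setConf(jsc.hadoopConfiguration())
        .setBasePath("/tmp/hudi_table")  // hypothetical table path
        .build();
    // A non-empty pending-replace inflight timeline means a clustering plan
    // started but never completed, i.e. a retry candidate.
    boolean hasStuckClustering = !meta.getActiveTimeline()
        .filterPendingReplaceTimeline()
        .filterInflights()
        .empty();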
@@ -933,6 +941,52 @@ public void testAsyncClusteringServiceWithCompaction(String preserveCommitMetada
     });
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob) throws Exception {
+    String tableBasePath = dfsBasePath + "/asyncClustering3";
+
+    // ingest data
+    int totalRecords = 3000;
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
+    cfg.continuousMode = false;
+    cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "false", "0", "false", "0"));
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+
+    // assert ingest successful
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);
+
+    // schedule a clustering job to build a clustering plan
+    HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
+    schedule.cluster(0);
+
+    // do another ingestion
+    HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
+    ds2.sync();
+
+    // transition the clustering request to inflight, simulating a failed (crashed) clustering job
+    HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
+    List<HoodieInstant> hoodieClusteringInstants = meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
+    HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
+    HoodieInstant hoodieInflightInstant = meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest, Option.empty());
+
+    // trigger a scheduleAndExecute clustering job
+    // when retryLastFailedClusteringJob is true => roll back and re-execute the failed clustering plan with the same instant timestamp
+    // when retryLastFailedClusteringJob is false => make and execute a new clustering plan with a new instant timestamp
+    HoodieClusteringJob scheduleAndExecute = initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute", retryLastFailedClusteringJob);
+    scheduleAndExecute.cluster(0);
+
+    String completeClusteringTimeStamp = meta.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
+
+    if (retryLastFailedClusteringJob) {
+      assertEquals(clusteringRequest.getTimestamp(), completeClusteringTimeStamp);
+    } else {
+      assertFalse(clusteringRequest.getTimestamp().equalsIgnoreCase(completeClusteringTimeStamp));
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(strings = {"schedule", "execute", "scheduleAndExecute"})
   public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode) throws Exception {