@@ -31,17 +31,23 @@
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;

import org.apache.hudi.table.HoodieSparkTable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.jetbrains.annotations.TestOnly;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

@@ -92,6 +98,10 @@ public static class Config implements Serializable {
@Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering @desperate soon please use \"--mode schedule\" instead")
public Boolean runSchedule = false;

@Parameter(names = {"--retry-last-failed-clustering-job", "-rc"}, description = "Take effect when using --mode/-m scheduleAndExecute. Set true means "
+ "check, rollback and execute last failed clustering plan instead of planing a new clustering job directly.", required = false)
public Boolean retryLastFailedClusteringJob = false;

@Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" means make a cluster plan; "
+ "Set \"execute\" means execute a cluster plan at given instant which means --instant-time is needed here; "
+ "Set \"scheduleAndExecute\" means make a cluster plan first and execute that plan immediately", required = false)
@@ -100,6 +110,10 @@ public static class Config implements Serializable {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;

@Parameter(names = {"--job-max-processing-time-ms", "-jt"}, description = "Take effect when using --mode/-m scheduleAndExecute and --retry-last-failed-clustering-job/-rc true. "
+ "If maxProcessingTimeMs passed but clustering job is still unfinished, hoodie would consider this job as failed and relaunch.", required = false)
public long maxProcessingTimeMs = 0;

@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ "hoodie client for clustering")
public String propsFilePath = null;
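
The two new flags are plain fields on HoodieClusteringJob.Config, so they can also be set programmatically. A minimal sketch (not part of this diff), assuming an existing JavaSparkContext named jsc; the paths are hypothetical and only field names from the declarations above are used:

// Sketch only: field names mirror the @Parameter declarations above; paths are hypothetical.
HoodieClusteringJob.Config clusteringConfig = new HoodieClusteringJob.Config();
clusteringConfig.basePath = "/tmp/hudi/my_table";                       // hypothetical base path
clusteringConfig.propsFilePath = "/tmp/hudi/clusteringjob.properties";  // hypothetical props file
clusteringConfig.runningMode = "scheduleAndExecute";                    // retry only takes effect in this mode
clusteringConfig.retryLastFailedClusteringJob = true;                   // re-run the last failed plan instead of scheduling a new one
clusteringConfig.maxProcessingTimeMs = 30 * 60 * 1000L;                 // treat an inflight plan older than 30 minutes as failed
HoodieClusteringJob clusteringJob = new HoodieClusteringJob(jsc, clusteringConfig);
clusteringJob.cluster(0);                                               // same call pattern as in the tests below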
@@ -216,17 +230,32 @@ private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> clien
return client.scheduleClustering(Option.empty());
}

public int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
LOG.info("Step 1: Do schedule");
String schemaStr = getSchemaFromLatestInstant();
try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
Option<String> instantTime = Option.empty();

if (cfg.retryLastFailedClusteringJob) {
HoodieSparkTable<HoodieRecordPayload> table = HoodieSparkTable.create(client.getConfig(), client.getEngineContext());
HoodieTimeline inflightHoodieTimeline = table.getActiveTimeline().filterPendingReplaceTimeline().filterInflights();
if (!inflightHoodieTimeline.empty()) {
HoodieInstant inflightClusteringInstant = inflightHoodieTimeline.lastInstant().get();
Date clusteringStartTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse(inflightClusteringInstant.getTimestamp());
if (clusteringStartTime.getTime() + cfg.maxProcessingTimeMs < System.currentTimeMillis()) {
// If there is a failed clustering, reuse its instant time to trigger the next clustering action, which will roll back the failed plan and re-execute it.
LOG.info("Found failed clustering instant at: " + inflightClusteringInstant + "; will rollback the failed clustering and re-trigger it.");
instantTime = Option.of(inflightHoodieTimeline.lastInstant().get().getTimestamp());
} else {
LOG.info(inflightClusteringInstant + " might still be in progress, will trigger a new clustering job.");
}
}
}

Option<String> instantTime = doSchedule(client);
int result = instantTime.isPresent() ? 0 : -1;

if (result == -1) {
instantTime = instantTime.isPresent() ? instantTime : doSchedule(client);
if (!instantTime.isPresent()) {
LOG.info("Couldn't generate cluster plan");
return result;
return -1;
}

LOG.info("The schedule instant time is " + instantTime.get());
@@ -139,9 +139,13 @@ protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, i
return new HoodieDeltaStreamer(cfg, jsc);
}

protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, boolean runSchedule, String scheduleAndExecute) {
protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute) {
return initialHoodieClusteringJob(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute, null);
}

protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute, Boolean retryLastFailedClusteringJob) {
HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
clusteringInstantTime, runSchedule, scheduleAndExecute);
clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);
return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
}

@@ -844,20 +848,24 @@ private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean,

private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
String clusteringInstantTime,
boolean runSchedule) {
return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule, null);
Boolean runSchedule) {
return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule, null, null);
}

private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
String clusteringInstantTime,
boolean runSchedule,
String runningMode) {
Boolean runSchedule,
String runningMode,
Boolean retryLastFailedClusteringJob) {
HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
config.basePath = basePath;
config.clusteringInstantTime = clusteringInstantTime;
config.runSchedule = runSchedule;
config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
config.runningMode = runningMode;
if (retryLastFailedClusteringJob != null) {
config.retryLastFailedClusteringJob = retryLastFailedClusteringJob;
}
return config;
}

@@ -933,6 +941,52 @@ public void testAsyncClusteringServiceWithCompaction(String preserveCommitMetada
});
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob) throws Exception {
String tableBasePath = dfsBasePath + "/asyncClustering3";

// ingest data
int totalRecords = 3000;
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
cfg.continuousMode = false;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "false", "0", "false", "0"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();

// assert ingest successful
TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);

// schedule a clustering job to build a clustering plan
HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
schedule.cluster(0);

// do another ingestion
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
ds2.sync();

// transition the clustering request to inflight to simulate a failed last clustering run
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
List<HoodieInstant> hoodieClusteringInstants = meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
HoodieInstant hoodieInflightInstant = meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest, Option.empty());

// trigger a scheduleAndExecute clustering job
// when retryLastFailedClusteringJob is true  => roll back and re-execute the failed clustering plan with the same instant timestamp.
// when retryLastFailedClusteringJob is false => schedule and execute a new clustering plan with a new instant timestamp.
HoodieClusteringJob scheduleAndExecute = initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute", retryLastFailedClusteringJob);
scheduleAndExecute.cluster(0);

String completeClusteringTimeStamp = meta.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();

if (retryLastFailedClusteringJob) {
assertEquals(clusteringRequest.getTimestamp(), completeClusteringTimeStamp);
} else {
assertFalse(clusteringRequest.getTimestamp().equalsIgnoreCase(completeClusteringTimeStamp));
}
}

@ParameterizedTest
@ValueSource(strings = {"schedule", "execute", "scheduleAndExecute"})
public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode) throws Exception {