@@ -30,10 +30,12 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;

import org.apache.hudi.table.HoodieSparkTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -91,6 +93,10 @@ public static class Config implements Serializable {
@Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering @desperate soon please use \"--mode schedule\" instead")
public Boolean runSchedule = false;

@Parameter(names = {"--retry-last-failed-clustering-job", "-rc"}, description = "Take effect when using --mode/-m scheduleAndExecute. Set true means "
+ "check, rollback and execute last failed clustering plan instead of planing a new clustering job directly.", required = false)
public Boolean retryLastFailedClusteringJob = false;
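// Illustrative CLI usage for this flag (other required arguments omitted; the exact spark-submit
// invocation is an assumption and not part of this change):
//   spark-submit --class org.apache.hudi.utilities.HoodieClusteringJob <hudi-utilities-bundle.jar> \
//     --mode scheduleAndExecute --retry-last-failed-clustering-job ...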

@Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" means make a cluster plan; "
+ "Set \"execute\" means execute a cluster plan at given instant which means --instant-time is needed here; "
+ "Set \"scheduleAndExecute\" means make a cluster plan first and execute that plan immediately", required = false)
@@ -215,12 +221,26 @@ private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> clien
return client.scheduleClustering(Option.empty());
}

public int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
LOG.info("Step 1: Do schedule");
String schemaStr = getSchemaFromLatestInstant();
try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
Option<String> instantTime;

if (cfg.retryLastFailedClusteringJob) {
HoodieSparkTable<HoodieRecordPayload> table = HoodieSparkTable.create(client.getConfig(), client.getEngineContext());
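// inflight pending replacecommit instants indicate a clustering plan that started executing but did not complete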
HoodieTimeline inflightHoodieTimeline = table.getActiveTimeline().filterPendingReplaceTimeline().filterInflights();
if (inflightHoodieTimeline.empty()) {
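// no failed clustering found, fall back to scheduling a brand-new clustering plan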
instantTime = doSchedule(client);
} else {
// if the last clustering failed, reuse its instant time to trigger the next clustering action, which will roll back the failed attempt and re-run the clustering.
LOG.info("Found failed clustering instant at: " + inflightHoodieTimeline.lastInstant().get() + "; will rollback the failed clustering and re-trigger it.");
instantTime = Option.of(inflightHoodieTimeline.lastInstant().get().getTimestamp());
}
} else {
instantTime = doSchedule(client);
}

Option<String> instantTime = doSchedule(client);
int result = instantTime.isPresent() ? 0 : -1;

if (result == -1) {
@@ -171,9 +171,13 @@ protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, i
return new HoodieDeltaStreamer(cfg, jsc);
}

protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, boolean runSchedule, String scheduleAndExecute) {
protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute) {
return initialHoodieClusteringJob(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute, null);
}

protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute, Boolean retryLastFailedClusteringJob) {
HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
clusteringInstantTime, runSchedule, scheduleAndExecute);
clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);
return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
}

@@ -1088,20 +1092,24 @@ private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean,

private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
String clusteringInstantTime,
boolean runSchedule) {
return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule, null);
Boolean runSchedule) {
return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule, null, null);
}

private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
String clusteringInstantTime,
boolean runSchedule,
String runningMode) {
Boolean runSchedule,
String runningMode,
Boolean retryLastFailedClusteringJob) {
HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
config.basePath = basePath;
config.clusteringInstantTime = clusteringInstantTime;
config.runSchedule = runSchedule;
config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
config.runningMode = runningMode;
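// only override the config default (false) when a value is explicitly supplied by the test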
if (retryLastFailedClusteringJob != null) {
config.retryLastFailedClusteringJob = retryLastFailedClusteringJob;
}
return config;
}

@@ -1176,6 +1184,52 @@ public void testAsyncClusteringServiceWithCompaction() throws Exception {
});
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob) throws Exception {
String tableBasePath = dfsBasePath + "/asyncClustering3";

// ingest data
int totalRecords = 3000;
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
cfg.continuousMode = false;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "false", "0", "false", "0"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();

// assert ingest successful
TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);

// schedule a clustering job to build a clustering plan
HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
schedule.cluster(0);

// do another ingestion
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
ds2.sync();

// transition the clustering request to inflight to simulate a scenario where the last clustering job failed
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
List<HoodieInstant> hoodieClusteringInstants = meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
HoodieInstant hoodieInflightInstant = meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest, Option.empty());
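// the clustering instant is now left inflight on the timeline with no completed replacecommit, mimicking a job that died mid-execution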

// trigger a scheduleAndExecute clustering job
// when retryLastFailedClusteringJob is true  => rollback and re-execute the failed clustering plan with the same instant timestamp.
// when retryLastFailedClusteringJob is false => schedule and execute a new clustering plan with a new instant timestamp.
HoodieClusteringJob scheduleAndExecute = initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute", retryLastFailedClusteringJob);
scheduleAndExecute.cluster(0);

String completeClusteringTimeStamp = meta.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();

if (retryLastFailedClusteringJob) {
assertEquals(clusteringRequest.getTimestamp(), completeClusteringTimeStamp);
} else {
assertFalse(clusteringRequest.getTimestamp().equalsIgnoreCase(completeClusteringTimeStamp));
}
}

@ParameterizedTest
@ValueSource(strings = {"schedule", "execute", "scheduleAndExecute"})
public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode) throws Exception {