@@ -190,6 +190,9 @@ public class DeltaSync implements Serializable {
*/
private transient Option<HoodieTimeline> commitTimelineOpt;

/**
 * Timeline of all commit actions on the table, including pending instants; used to
 * find an incomplete clustering plan to retry.
 */
private transient Option<HoodieTimeline> allCommitsTimelineOpt;

/**
* Tracks whether new schema is being seen and creates client accordingly.
*/
@@ -245,15 +248,18 @@ public void refreshTimeline() throws IOException {
switch (meta.getTableType()) {
case COPY_ON_WRITE:
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
break;
case MERGE_ON_READ:
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants());
this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline());
break;
default:
throw new HoodieException("Unsupported table type :" + meta.getTableType());
}
} else {
this.commitTimelineOpt = Option.empty();
this.allCommitsTimelineOpt = Option.empty();
String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props);
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
@@ -306,6 +312,14 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
}
}

// complete the pending clustering before writing to sink
if (cfg.retryLastPendingInlineClusteringJob && getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
Option<String> pendingClusteringInstant = getLastPendingClusteringInstant(allCommitsTimelineOpt);
if (pendingClusteringInstant.isPresent()) {
writeClient.cluster(pendingClusteringInstant.get(), true);
}
}

result = writeToSink(srcRecordsWithCkpt.getRight().getRight(),
srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext);
}
@@ -317,6 +331,14 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
return result;
}

private Option<String> getLastPendingClusteringInstant(Option<HoodieTimeline> commitTimelineOpt) {
if (commitTimelineOpt.isPresent()) {
Option<HoodieInstant> pendingClusteringInstant = commitTimelineOpt.get().filterPendingReplaceTimeline().lastInstant();
return pendingClusteringInstant.isPresent() ? Option.of(pendingClusteringInstant.get().getTimestamp()) : Option.empty();
}
return Option.empty();
}

/**
* Read from Upstream Source and apply transformation if needed.
*
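Taken together, the DeltaSync changes above reduce to the retry path sketched below. This is a condensed illustration rather than a verbatim excerpt: the names (cfg, writeClient, schemaProvider, allCommitsTimelineOpt) are the fields used in the diff, and the surrounding sync logic and error handling are omitted.

// Hedged sketch of the retry path introduced above: before writing to the sink,
// finish the most recently scheduled but still incomplete clustering (replacecommit) plan.
if (cfg.retryLastPendingInlineClusteringJob
    && getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) {
  // allCommitsTimelineOpt is refreshed in refreshTimeline() for both COW and MOR tables
  Option<String> pendingClusteringInstant = getLastPendingClusteringInstant(allCommitsTimelineOpt);
  if (pendingClusteringInstant.isPresent()) {
    // cluster(instantTime, shouldComplete = true) executes and commits the pending plan
    writeClient.cluster(pendingClusteringInstant.get(), true);
  }
}
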
@@ -372,6 +372,9 @@ public static class Config implements Serializable {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;

@Parameter(names = {"--retry-last-pending-inline-clustering", "-rc"}, description = "Retry last pending inline clustering plan before writing to sink.")
public Boolean retryLastPendingInlineClusteringJob = false;

public boolean isAsyncCompactionEnabled() {
return continuousMode && !forceDisableCompaction
&& HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
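Besides passing --retry-last-pending-inline-clustering (or -rc) on the command line, the new flag can be set programmatically on the config, as the test below does. A minimal, hedged sketch follows; apart from retryLastPendingInlineClusteringJob, the field names are assumed from the existing Config class, and a real job additionally needs source, schema-provider and target-table settings that are omitted here.

HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = "/tmp/hudi/target_table";           // placeholder base path
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();    // same table type as the test below
cfg.retryLastPendingInlineClusteringJob = true;          // flag introduced by this change
new HoodieDeltaStreamer(cfg, jsc).sync();                // jsc: an existing JavaSparkContext
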
@@ -754,6 +754,38 @@ public void testInlineClustering(String preserveCommitMetadata) throws Exception
});
}

@Test
public void testDeltaSyncWithPendingClustering() throws Exception {
String tableBasePath = dfsBasePath + "/inlineClusteringPending";
// ingest data
int totalRecords = 2000;
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
cfg.continuousMode = false;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
// assert ingest successful
TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);

// schedule a clustering job to build a clustering plan and transition to inflight
HoodieClusteringJob clusteringJob = initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
clusteringJob.cluster(0);
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
List<HoodieInstant> hoodieClusteringInstants = meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest, Option.empty());

// do another ingestion with inline clustering enabled
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
cfg.retryLastPendingInlineClusteringJob = true;
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
ds2.sync();
String completeClusteringTimeStamp = meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
assertEquals(clusteringRequest.getTimestamp(), completeClusteringTimeStamp);
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws Exception {