From 38433f8184b92c7134e0dbd0fdffa8d399bd93a6 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Wed, 12 Jan 2022 14:07:00 +0530 Subject: [PATCH] [HUDI-2943] Complete pending clustering before deltastreamer sync. Add unit test. Fix the test and address review comments. Remove unused import. Rebase and fix test. --- .../utilities/deltastreamer/DeltaSync.java | 22 +++++++++++++ .../deltastreamer/HoodieDeltaStreamer.java | 3 ++ .../functional/TestHoodieDeltaStreamer.java | 32 +++++++++++++++++++ 3 files changed, 57 insertions(+) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index c7b29c9f0f520..5c2b69293c383 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -190,6 +190,9 @@ public class DeltaSync implements Serializable { */ private transient Option commitTimelineOpt; + // timeline from getAllCommitsTimeline(); searched for the last pending replace (clustering) instant before writing to sink + private transient Option allCommitsTimelineOpt; + /** * Tracks whether new schema is being seen and creates client accordingly. 
*/ @@ -245,15 +248,18 @@ public void refreshTimeline() throws IOException { switch (meta.getTableType()) { case COPY_ON_WRITE: this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); + this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline()); break; case MERGE_ON_READ: this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()); + this.allCommitsTimelineOpt = Option.of(meta.getActiveTimeline().getAllCommitsTimeline()); break; default: throw new HoodieException("Unsupported table type :" + meta.getTableType()); } } else { this.commitTimelineOpt = Option.empty(); + this.allCommitsTimelineOpt = Option.empty(); String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props); HoodieTableMetaClient.withPropertyBuilder() .setTableType(cfg.tableType) @@ -306,6 +312,14 @@ public Pair, JavaRDD> syncOnce() throws IOException } } + // complete the pending clustering before writing to sink + if (cfg.retryLastPendingInlineClusteringJob && getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) { + Option pendingClusteringInstant = getLastPendingClusteringInstant(allCommitsTimelineOpt); + if (pendingClusteringInstant.isPresent()) { + writeClient.cluster(pendingClusteringInstant.get(), true); + } + } + result = writeToSink(srcRecordsWithCkpt.getRight().getRight(), srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext); } @@ -317,6 +331,14 @@ public Pair, JavaRDD> syncOnce() throws IOException return result; } + private Option getLastPendingClusteringInstant(Option commitTimelineOpt) { + if (commitTimelineOpt.isPresent()) { + Option pendingClusteringInstant = commitTimelineOpt.get().filterPendingReplaceTimeline().lastInstant(); + return pendingClusteringInstant.isPresent() ? 
Option.of(pendingClusteringInstant.get().getTimestamp()) : Option.empty(); + } + return Option.empty(); + } + /** * Read from Upstream Source and apply transformation if needed. * diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 3ceb0028751a2..3e4f00930ae6d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -372,6 +372,9 @@ public static class Config implements Serializable { @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; + @Parameter(names = {"--retry-last-pending-inline-clustering", "-rc"}, description = "Retry last pending inline clustering plan before writing to sink.") + public Boolean retryLastPendingInlineClusteringJob = false; + public boolean isAsyncCompactionEnabled() { return continuousMode && !forceDisableCompaction && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType)); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 1874991888cbf..d63bce6584d90 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -754,6 +754,38 @@ public void testInlineClustering(String preserveCommitMetadata) throws Exception }); } + @Test + public void testDeltaSyncWithPendingClustering() throws Exception { + String tableBasePath = dfsBasePath + "/inlineClusteringPending"; + // ingest data + int totalRecords = 2000; + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT); + cfg.continuousMode = false; + cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); + HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); + ds.sync(); + // assert ingest successful + TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs); + + // schedule a clustering job to build a clustering plan and transition to inflight + HoodieClusteringJob clusteringJob = initialHoodieClusteringJob(tableBasePath, null, false, "schedule"); + clusteringJob.cluster(0); + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build(); + List hoodieClusteringInstants = meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList()); + HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0); + meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest, Option.empty()); + + // do another ingestion with inline clustering enabled + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", "")); + cfg.retryLastPendingInlineClusteringJob = true; + HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc); + ds2.sync(); + String completeClusteringTimeStamp = meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get().getTimestamp(); + assertEquals(clusteringRequest.getTimestamp(), completeClusteringTimeStamp); + TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws Exception {