diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java index 5cdb6fc193398..fcc3274031b4d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; @@ -68,21 +69,30 @@ public static Stream> getAllPendingClu .filter(Option::isPresent).map(Option::get); } - public static Option> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant requestedReplaceInstant) { + public static Option> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) { try { - Option content = metaClient.getActiveTimeline().getInstantDetails(requestedReplaceInstant); + final HoodieInstant requestedInstant; + if (!pendingReplaceInstant.isRequested()) { + // inflight replacecommit files don't have clustering plan. + // This is because replacecommit inflight can have workload profile for 'insert_overwrite'. + // Get the plan from corresponding requested instant. + requestedInstant = HoodieTimeline.getReplaceCommitRequestedInstant(pendingReplaceInstant.getTimestamp()); + } else { + requestedInstant = pendingReplaceInstant; + } + Option content = metaClient.getActiveTimeline().getInstantDetails(requestedInstant); if (!content.isPresent() || content.get().length == 0) { // few operations create requested file without any content. Assume these are not clustering - LOG.warn("No content found in requested file for instant " + requestedReplaceInstant); + LOG.warn("No content found in requested file for instant " + pendingReplaceInstant); return Option.empty(); } HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadta(content.get()); if (WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.getOperationType())) { - return Option.of(Pair.of(requestedReplaceInstant, requestedReplaceMetadata.getClusteringPlan())); + return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.getClusteringPlan())); } return Option.empty(); } catch (IOException e) { - throw new HoodieIOException("Error reading clustering plan " + requestedReplaceInstant.getTimestamp(), e); + throw new HoodieIOException("Error reading clustering plan " + pendingReplaceInstant.getTimestamp(), e); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java index e4933cf4e983b..5400dc40110af 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.common.HoodieCleanStat; @@ -35,6 +36,7 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; @@ -857,11 +859,20 @@ private List addInstant(HoodieTableMetaClient metaClient, String instant private List addReplaceInstant(HoodieTableMetaClient metaClient, String instant, List> writeStats, Map> partitionToReplaceFileIds) throws IOException { + // created requested + HoodieInstant newRequestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, instant); + HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder() + .setOperationType(WriteOperationType.UNKNOWN.name()).build(); + metaClient.getActiveTimeline().saveToPendingReplaceCommit(newRequestedInstant, + TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata)); + + metaClient.reloadActiveTimeline(); + // transition to inflight + HoodieInstant inflightInstant = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(newRequestedInstant, Option.empty()); + // transition to replacecommit HoodieReplaceCommitMetadata replaceCommitMetadata = new HoodieReplaceCommitMetadata(); writeStats.forEach(e -> replaceCommitMetadata.addWriteStat(e.getKey(), e.getValue())); replaceCommitMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds); - HoodieInstant inflightInstant = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, instant); - metaClient.getActiveTimeline().createNewInstant(inflightInstant); metaClient.getActiveTimeline().saveAsComplete(inflightInstant, Option.of(replaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); return writeStats.stream().map(e -> e.getValue().getPath()).collect(Collectors.toList()); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java index 5d82bbce734ca..54ca072651e07 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java @@ -98,6 +98,22 @@ public void testClusteringPlanMultipleInstants() throws Exception { validateClusteringInstant(fileIds3, partitionPath1, clusterTime, fileGroupToInstantMap); } + // replacecommit.inflight doesnt have clustering plan. + // Verify that getClusteringPlan fetches content from corresponding requested file. + @Test + public void testClusteringPlanInflight() throws Exception { + String partitionPath1 = "partition1"; + List fileIds1 = new ArrayList<>(); + fileIds1.add(UUID.randomUUID().toString()); + fileIds1.add(UUID.randomUUID().toString()); + String clusterTime1 = "1"; + HoodieInstant requestedInstant = createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1); + HoodieInstant inflightInstant = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant, Option.empty()); + HoodieClusteringPlan requestedClusteringPlan = ClusteringUtils.getClusteringPlan(metaClient, requestedInstant).get().getRight(); + HoodieClusteringPlan inflightClusteringPlan = ClusteringUtils.getClusteringPlan(metaClient, inflightInstant).get().getRight(); + assertEquals(requestedClusteringPlan, inflightClusteringPlan); + } + private void validateClusteringInstant(List fileIds, String partitionPath, String expectedInstantTime, Map fileGroupToInstantMap) { for (String fileId : fileIds) {