diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 31c8bbd6d30d2..9ca6e495e06d7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -166,7 +166,7 @@ protected void runPrecommitValidators(HoodieWriteMetadata writeMetadata) {
     }
     throw new HoodieIOException("Precommit validation not implemented for all engines yet");
   }
-  
+
   protected void commitOnAutoCommit(HoodieWriteMetadata result) {
     // validate commit action before committing result
     runPrecommitValidators(result);
@@ -249,7 +249,6 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieC
     HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
     writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
     writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
-    validateWriteResult(clusteringPlan, writeMetadata);
     commitOnAutoCommit(writeMetadata);
     if (!writeMetadata.getCommitMetadata().isPresent()) {
       HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index a142fd80d4bf8..5dfbdadca8409 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.TransactionUtils;
@@ -37,7 +39,9 @@
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
@@ -356,6 +360,8 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
     LOG.info("Starting clustering at " + clusteringInstant);
     HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
+    // Validation has to be done after cloning. If not, it could result in dereferencing the write status twice, which means clustering could get executed twice.
+    validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
     // TODO : Where is shouldComplete used ?
     if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
       completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
@@ -403,6 +409,19 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
     LOG.info("Clustering successfully on commit " + clusteringCommitTime);
   }
 
+  private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
+    if (clusteringMetadata.getWriteStatuses().isEmpty()) {
+      HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
+          table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringCommitTime))
+          .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+              "Unable to read clustering plan for instant: " + clusteringCommitTime));
+      throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringCommitTime
+          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+          + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+          + " write statuses");
+    }
+  }
+
   private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
                                    HoodieInstant hoodieInstant) {
     boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 3f9bda49e8ffe..d1b1b117c213f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -45,6 +45,7 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -106,6 +107,7 @@
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -1456,6 +1458,42 @@ public void testSimpleClustering(boolean populateMetaFields, boolean preserveCom
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
   }
 
+  @Test
+  public void testAndValidateClusteringOutputFiles() throws IOException {
+    String partitionPath = "2015/03/16";
+    testInsertTwoBatches(true, partitionPath);
+
+    // Trigger clustering
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withEmbeddedTimelineServerEnabled(false).withAutoCommit(false)
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(2).build());
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build())) {
+      int numRecords = 200;
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records1 = dataGen.generateInserts(newCommitTime, numRecords);
+      client.startCommitWithTime(newCommitTime);
+      JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records1, 2);
+      JavaRDD<WriteStatus> statuses = client.insert(insertRecordsRDD1, newCommitTime);
+      client.commit(newCommitTime, statuses);
+      List<WriteStatus> statusList = statuses.collect();
+      assertNoWriteErrors(statusList);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieInstant replaceCommitInstant = metaClient.getActiveTimeline().getCompletedReplaceTimeline().firstInstant().get();
+      HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
+          .fromBytes(metaClient.getActiveTimeline().getInstantDetails(replaceCommitInstant).get(), HoodieReplaceCommitMetadata.class);
+
+      List<String> filesFromReplaceCommit = new ArrayList<>();
+      replaceCommitMetadata.getPartitionToWriteStats()
+          .forEach((k,v) -> v.forEach(entry -> filesFromReplaceCommit.add(entry.getPath())));
+
+      // Find all parquet files created as part of clustering. Verify it matches w/ what's found in replace commit metadata.
+      FileStatus[] fileStatuses = fs.listStatus(new Path(basePath + "/" + partitionPath));
+      List<String> clusteredFiles = Arrays.stream(fileStatuses).filter(entry -> entry.getPath().getName().contains(replaceCommitInstant.getTimestamp()))
+          .map(fileStatus -> partitionPath + "/" + fileStatus.getPath().getName()).collect(Collectors.toList());
+      assertEquals(clusteredFiles, filesFromReplaceCommit);
+    }
+  }
+
   @Test
   public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
@@ -1707,18 +1745,22 @@ private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig cluste
     return allRecords.getLeft().getLeft();
   }
 
+  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
+    return testInsertTwoBatches(populateMetaFields, "2015/03/16");
+  }
+
   /**
    * This method returns following three items:
    * 1. List of all HoodieRecord written in the two batches of insert.
    * 2. Commit instants of the two batches.
    * 3. List of new file group ids that were written in the two batches.
    */
-  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
+  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException {
     // create config to not update small files.
     HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
         populateMetaFields ? new Properties() : getPropertiesForKeyGen());
     SparkRDDWriteClient client = getHoodieWriteClient(config);
-    dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"});
+    dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
     String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
     List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
     List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields);
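
Side note on the reasoning captured in the new comment in SparkRDDWriteClient#cluster: the write statuses come back as a lazily evaluated RDD, so every action on an uncached result replays its whole lineage, and validating the result as a separate action before the commit path consumes it can therefore trigger the clustering job a second time. The sketch below is not Hudi code; it is a minimal plain-Spark illustration of that double-execution hazard, and all class and variable names in it (LazyRddValidationSketch, executions, result) are invented for the example.

// Minimal sketch, not Hudi code: shows the lazy-evaluation hazard the added comment warns
// about. Each action on an uncached RDD re-runs its lineage, so "validate, then consume"
// issued as two separate actions performs the upstream work twice. The accumulator counts
// how many times the map function actually runs.
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

public class LazyRddValidationSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("lazy-validation-sketch");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      LongAccumulator executions = jsc.sc().longAccumulator("executions");

      JavaRDD<Integer> result = jsc.parallelize(Arrays.asList(1, 2, 3), 2)
          .map(x -> {
            executions.add(1); // stands in for the per-record clustering work
            return x * 10;
          });

      // "Validation" dereferences the RDD once: this first action runs the map over all records...
      if (result.count() == 0) {
        throw new IllegalStateException("no output produced");
      }
      // ...and the subsequent "commit" dereferences it again, re-running the map.
      result.collect();

      // Prints 6 here: 3 records processed twice. Persisting the RDD before the first action,
      // or issuing only a single action against it, avoids the duplicate execution.
      System.out.println("map invocations: " + executions.value());
    }
  }
}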