From a6cf2db7c0a449293ac7b50894055b3abb010cde Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Wed, 31 Aug 2022 21:35:12 -0700
Subject: [PATCH 1/7] Fixing repeated trigger of clustering execution

---
 .../hudi/table/action/commit/BaseCommitActionExecutor.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 31c8bbd6d30d2..11512465f1289 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -70,6 +70,8 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
+
 public abstract class BaseCommitActionExecutor extends BaseActionExecutor {
@@ -166,7 +168,7 @@ protected void runPrecommitValidators(HoodieWriteMetadata writeMetadata) {
     }
     throw new HoodieIOException("Precommit validation not implemented for all engines yet");
   }
- 
+
   protected void commitOnAutoCommit(HoodieWriteMetadata result) {
     // validate commit action before committing result
     runPrecommitValidators(result);
@@ -247,8 +249,10 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieC
         .performClustering(clusteringPlan, schema, instantTime);
     HoodieData<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses();
     HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
+    statuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
     writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
     writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
+    // if we don't cache the write statuses above, validation will call isEmpty which might retrigger the execution again.
     validateWriteResult(clusteringPlan, writeMetadata);
     commitOnAutoCommit(writeMetadata);
     if (!writeMetadata.getCommitMetadata().isPresent()) {

From cb99645fd5d1ebbdd95dfb2503367b8f81eeb96b Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Sat, 3 Sep 2022 12:19:01 -0700
Subject: [PATCH 2/7] Fixing proper validation

---
 .../apache/hudi/client/SparkRDDWriteClient.java | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index a142fd80d4bf8..0569689685d01 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -18,6 +18,8 @@
 package org.apache.hudi.client;
 
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.TransactionUtils;
@@ -37,7 +39,9 @@
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
@@ -356,6 +360,16 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
     LOG.info("Starting clustering at " + clusteringInstant);
     HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
+    if (clusteringMetadata.getWriteStatuses().isEmpty()) {
+      HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
+          table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant))
+          .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+              "Unable to read clustering plan for instant: " + clusteringInstant));
+      throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringInstant
+          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+          + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+          + " write statuses");
+    }
     // TODO : Where is shouldComplete used ?
     if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
       completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);

From 402f9171361b89e8a0483ea9afd4f427d495efb8 Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Sat, 3 Sep 2022 14:11:17 -0700
Subject: [PATCH 3/7] adding tests

---
 .../commit/BaseCommitActionExecutor.java      |  4 --
 .../TestHoodieClientOnCopyOnWriteStorage.java | 45 ++++++++++++++++++-
 2 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 11512465f1289..1472e6a591e15 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -70,8 +70,6 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
-
 public abstract class BaseCommitActionExecutor extends BaseActionExecutor {
@@ -249,11 +247,9 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieC
         .performClustering(clusteringPlan, schema, instantTime);
     HoodieData<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses();
     HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
-    statuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
     writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
     writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
     // if we don't cache the write statuses above, validation will call isEmpty which might retrigger the execution again.
-    validateWriteResult(clusteringPlan, writeMetadata);
     commitOnAutoCommit(writeMetadata);
     if (!writeMetadata.getCommitMetadata().isPresent()) {
       HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 3f9bda49e8ffe..8023777f69ea5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -45,6 +45,7 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -106,6 +107,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -1456,6 +1458,41 @@ public void testSimpleClustering(boolean populateMetaFields, boolean preserveCom
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
   }
 
+  @Test
+  public void testAndValidateClusteringOutputFiles() throws IOException {
+    String partitionPath = "2015/03/16";
+    testInsertTwoBatches(true, partitionPath);
+
+    // Trigger clustering
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withEmbeddedTimelineServerEnabled(false).withAutoCommit(false)
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(2).build());
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build())) {
+      int numRecords = 200;
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records1 = dataGen.generateInserts(newCommitTime, numRecords);
+      client.startCommitWithTime(newCommitTime);
+      JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records1, 2);
+      JavaRDD<WriteStatus> statuses = client.insert(insertRecordsRDD1, newCommitTime);
+      client.commit(newCommitTime, statuses);
+      List<WriteStatus> statusList = statuses.collect();
+      assertNoWriteErrors(statusList);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieInstant replaceCommitInstant = metaClient.getActiveTimeline().getCompletedReplaceTimeline().firstInstant().get();
+      HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
+          .fromBytes(metaClient.getActiveTimeline().getInstantDetails(replaceCommitInstant).get(), HoodieReplaceCommitMetadata.class);
+
+      List<String> filesFromReplaceCommit = new ArrayList<>();
+      replaceCommitMetadata.getPartitionToWriteStats().forEach((k,v) -> v.forEach(entry -> filesFromReplaceCommit.add(entry.getPath())));
+
+      // find all parquet files created as part of clustering. Verify it matches w/ what's found in replace commit metadata.
+      FileStatus[] fileStatuses = fs.listStatus(new Path(basePath + "/" + partitionPath));
+      List<String> clusteredFiles = Arrays.stream(fileStatuses).filter(entry -> entry.getPath().getName().contains(replaceCommitInstant.getTimestamp()))
+          .map(fileStatus -> partitionPath + "/" + fileStatus.getPath().getName()).collect(Collectors.toList());
+      assertEquals(clusteredFiles, filesFromReplaceCommit);
+    }
+  }
+
   @Test
   public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
@@ -1707,18 +1744,22 @@ private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig cluste
     return allRecords.getLeft().getLeft();
   }
 
+  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
+    return testInsertTwoBatches(populateMetaFields, "2015/03/16");
+  }
+
   /**
    * This method returns following three items:
    * 1. List of all HoodieRecord written in the two batches of insert.
    * 2. Commit instants of the two batches.
    * 3. List of new file group ids that were written in the two batches.
    */
-  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
+  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException {
     // create config to not update small files.
     HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
         populateMetaFields ? new Properties() : getPropertiesForKeyGen());
     SparkRDDWriteClient client = getHoodieWriteClient(config);
-    dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"});
+    dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
     String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
     List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
     List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields);

From 7163ee63648dc1e4957e71a73452d652e6ee9235 Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Fri, 16 Sep 2022 00:40:29 -0700
Subject: [PATCH 4/7] addressing comments

---
 .../hudi/client/SparkRDDWriteClient.java      | 24 +++++++++++--------
 .../TestHoodieClientOnCopyOnWriteStorage.java |  4 +++-
 2 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 0569689685d01..d599a97ffb8ca 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -360,16 +360,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
     LOG.info("Starting clustering at " + clusteringInstant);
     HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
-    if (clusteringMetadata.getWriteStatuses().isEmpty()) {
-      HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
-          table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant))
-          .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
-              "Unable to read clustering plan for instant: " + clusteringInstant));
-      throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringInstant
-          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
-          + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
-          + " write statuses");
-    }
+    validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
     // TODO : Where is shouldComplete used ?
     if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
       completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
@@ -417,6 +408,19 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
     LOG.info("Clustering successfully on commit " + clusteringCommitTime);
   }
 
+  private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
+    if (clusteringMetadata.getWriteStatuses().isEmpty()) {
+      HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
+          table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringCommitTime))
+          .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+              "Unable to read clustering plan for instant: " + clusteringCommitTime));
+      throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringCommitTime
+          + " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+          + clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+          + " write statuses");
+    }
+  }
+
   private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
                                    HoodieInstant hoodieInstant) {
     boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 8023777f69ea5..944179bae51ab 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1483,7 +1483,9 @@ public void testAndValidateClusteringOutputFiles() throws IOException {
           .fromBytes(metaClient.getActiveTimeline().getInstantDetails(replaceCommitInstant).get(), HoodieReplaceCommitMetadata.class);
 
       List<String> filesFromReplaceCommit = new ArrayList<>();
-      replaceCommitMetadata.getPartitionToWriteStats().forEach((k,v) -> v.forEach(entry -> filesFromReplaceCommit.add(entry.getPath())));
+      replaceCommitMetadata.getPartitionToWriteStats().
+          forEach((k,v) ->
+              v.forEach(entry -> filesFromReplaceCommit.add(entry.getPath())));
 
       // find all parquet files created as part of clustering. Verify it matches w/ what's found in replace commit metadata.
       FileStatus[] fileStatuses = fs.listStatus(new Path(basePath + "/" + partitionPath));

From b4152f34dd521107f7a3ed82788a693fb3983cc9 Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Wed, 21 Sep 2022 23:07:37 -0700
Subject: [PATCH 5/7] removing unnecessary comments

---
 .../hudi/table/action/commit/BaseCommitActionExecutor.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 1472e6a591e15..9ca6e495e06d7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -249,7 +249,6 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieC
     HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
     writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
     writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
-    // if we don't cache the write statuses above, validation will call isEmpty which might retrigger the execution again.
     commitOnAutoCommit(writeMetadata);
     if (!writeMetadata.getCommitMetadata().isPresent()) {
       HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),

From cad92079d20c6725f5e563ef7da0157e1491194a Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Thu, 22 Sep 2022 15:56:55 -0700
Subject: [PATCH 6/7] fixing build failure

---
 .../functional/TestHoodieClientOnCopyOnWriteStorage.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 944179bae51ab..d1b1b117c213f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1483,9 +1483,8 @@ public void testAndValidateClusteringOutputFiles() throws IOException {
           .fromBytes(metaClient.getActiveTimeline().getInstantDetails(replaceCommitInstant).get(), HoodieReplaceCommitMetadata.class);
 
       List<String> filesFromReplaceCommit = new ArrayList<>();
-      replaceCommitMetadata.getPartitionToWriteStats().
-          forEach((k,v) ->
-              v.forEach(entry -> filesFromReplaceCommit.add(entry.getPath())));
+      replaceCommitMetadata.getPartitionToWriteStats()
+          .forEach((k,v) -> v.forEach(entry -> filesFromReplaceCommit.add(entry.getPath())));
 
       // find all parquet files created as part of clustering. Verify it matches w/ what's found in replace commit metadata.
       FileStatus[] fileStatuses = fs.listStatus(new Path(basePath + "/" + partitionPath));

From f9810e78fbc61c0dce902a74e426609c74788708 Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Fri, 23 Sep 2022 16:27:08 -0700
Subject: [PATCH 7/7] adding docs

---
 .../main/java/org/apache/hudi/client/SparkRDDWriteClient.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index d599a97ffb8ca..5dfbdadca8409 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -360,6 +360,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
     LOG.info("Starting clustering at " + clusteringInstant);
     HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
+    // Validation has to be done after cloning. If not, it could result in dereferencing the write statuses twice, which means clustering could get executed twice.
     validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
     // TODO : Where is shouldComplete used ?
     if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
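
Background note on the series (illustrative, not part of the patches): a JavaRDD-backed HoodieData is lazily evaluated, so every action on it (collect, isEmpty, ...) re-runs the upstream computation unless the data is cached. The isEmpty() check that validation performed inside BaseCommitActionExecutor could therefore trigger the clustering write a second time. PATCH 1 worked around this by persisting the statuses; the final shape of the series removes that extra action from the executor and validates the cloned, RDD-backed metadata in SparkRDDWriteClient before the replacecommit is completed (see the comment added in PATCH 7). The sketch below reproduces the re-execution behaviour with plain Spark only; the class name, accumulator, and sample data are made up for illustration and are not Hudi APIs.

// LazyReExecutionSketch.java -- illustrative only; plain Spark, no Hudi dependencies.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;

public class LazyReExecutionSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("lazy-re-execution").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Counts how many times the "expensive write" actually runs on the executors.
      LongAccumulator executions = jsc.sc().longAccumulator("executions");

      // Stand-in for the clustering write that produces the write statuses.
      JavaRDD<Integer> writeStatuses = jsc.parallelize(Arrays.asList(1, 2, 3), 2)
          .map(i -> {
            executions.add(1);
            return i;
          });

      writeStatuses.collect();   // first action: the pipeline runs once, executions == 3
      writeStatuses.isEmpty();   // second action: nothing is cached, so upstream work re-runs

      // Either cache the RDD before issuing another action (what PATCH 1 did via
      // WRITE_STATUS_STORAGE_LEVEL_VALUE) ...
      writeStatuses.persist(StorageLevel.MEMORY_AND_DISK());
      // ... or avoid the redundant action altogether, which is where the series ends up:
      // the executor no longer calls isEmpty(), and SparkRDDWriteClient validates the
      // cloned metadata before completing the replacecommit.
      System.out.println("write executed " + executions.value() + " times");
    }
  }
}

Running the sketch locally should show the accumulator exceeding the input size after the second action, which is the same double-execution the clustering fix guards against.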