diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 7dd32e2916ec..6edeac05a748 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -32,9 +32,13 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -46,7 +50,7 @@
  * Flink hoodie backed table metadata writer.
  */
 public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
-
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkHoodieBackedTableMetadataWriter.class);
   private transient BaseHoodieWriteClient writeClient;
 
   public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
@@ -118,33 +122,31 @@ private void doCommit(String instantTime, Map
-      Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
-      if (alreadyCompletedInstant.isPresent()) {
-        // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
-        // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
-        // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
-        // are upserts to metadata table and so only a new delta commit will be created.
-        // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
-        // already part of completed commit. So, we have to manually remove the completed instant and proceed.
-        // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
-        HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
-        metadataMetaClient.reloadActiveTimeline();
+      // this code path refers to a re-attempted commit that:
+      // 1. got committed to metadata table, but failed in datatable.
+      // 2. failed while committing to metadata table
+      // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+      // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+      // are upserts to metadata table and so only a new delta commit will be created.
+      // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
+      // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
+      Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime))
+          .lastInstant();
+      LOG.info(String.format("%s completed commit at %s being applied to MDT.",
+          alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));
+
+      // Rollback the previous commit
+      if (!writeClient.rollback(instantTime)) {
+        throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
       }
-      // If the alreadyCompletedInstant is empty, that means there is a requested or inflight
-      // instant with the same instant time. This happens for data table clean action which
-      // reuses the same instant time without rollback first. It is a no-op here as the
-      // clean plan is the same, so we don't need to delete the requested and inflight instant
-      // files in the active timeline.
-
-      // The metadata writer uses LAZY cleaning strategy without auto commit,
-      // write client then checks the heartbeat expiration when committing the instant,
-      // sets up the heartbeat explicitly to make the check pass.
-      writeClient.getHeartbeatClient().start(instantTime);
+      metadataMetaClient.reloadActiveTimeline();
     }
 
+    writeClient.startCommitWithTime(instantTime);
+    metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+
     List<WriteStatus> statuses = isInitializing
         ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, Option.empty())
         : writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index eab0c436248a..9a254409b8d7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -29,13 +29,13 @@
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.table.BulkInsertPartitioner;
@@ -144,27 +144,29 @@ private void commitInternal(String instantTime, Map
-      Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
-      if (alreadyCompletedInstant.isPresent()) {
-        // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
-        // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
-        // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
-        // are upserts to metadata table and so only a new delta commit will be created.
-        // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
-        // already part of completed commit. So, we have to manually remove the completed instant and proceed.
-        // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
-        HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
-        metadataMetaClient.reloadActiveTimeline();
+      // this code path refers to a re-attempted commit that:
+      // 1. got committed to metadata table, but failed in datatable.
+      // 2. failed while committing to metadata table
+      // for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+      // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+      // are upserts to metadata table and so only a new delta commit will be created.
+      // once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
+      // already part of completed commit. So, we have to manually rollback the completed instant and proceed.
+      Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime))
+          .lastInstant();
+      LOG.info(String.format("%s completed commit at %s being applied to MDT.",
+          alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));
+
+      // Rollback the previous commit
+      if (!writeClient.rollback(instantTime)) {
+        throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
       }
-      // If the alreadyCompletedInstant is empty, that means there is a requested or inflight
-      // instant with the same instant time. This happens for data table clean action which
-      // reuses the same instant time without rollback first. It is a no-op here as the
-      // clean plan is the same, so we don't need to delete the requested and inflight instant
-      // files in the active timeline.
+      metadataMetaClient.reloadActiveTimeline();
     }
 
+    writeClient.startCommitWithTime(instantTime);
     if (bulkInsertPartitioner.isPresent()) {
       writeClient.bulkInsertPreppedRecords(preppedRecordRDD, instantTime, bulkInsertPartitioner).collect();
     } else {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index b50c002468f5..84417fce9582 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client.functional;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.SparkRDDWriteClient;
@@ -2029,7 +2030,9 @@ public void testReattemptOfFailedClusteringCommit() throws Exception {
 
     HoodieWriteConfig newWriteConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
         .withAutoCommit(false)
-        .withClusteringConfig(clusteringConfig).build();
+        .withClusteringConfig(clusteringConfig)
+        .withRollbackUsingMarkers(false)
+        .build();
 
     // trigger clustering
     SparkRDDWriteClient newClient = getHoodieWriteClient(newWriteConfig);
@@ -2821,6 +2824,69 @@ public void testDuplicatesDuringRecordIndexBootstrap() throws Exception {
     }
   }
 
+  /**
+   * Test duplicate operation with same instant timestamp.
+   *
+   * This can happen if the commit on the MDT succeeds but fails on the dataset. For some table services like clean,
+   * compaction, replace commit, the operation will be retried with the same timestamp (taken from inflight). Hence,
+   * metadata table will see an additional commit with the same timestamp as a previously completed deltacommit.
+   */
+  @Test
+  public void testRepeatedActionWithSameInstantTime() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    Properties props = new Properties();
+    props.put(HoodieCleanConfig.ALLOW_MULTIPLE_CLEANS.key(), "false");
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false).withProps(props).build();
+
+    // Perform three writes so we can initiate a clean
+    int index = 0;
+    final String partition = "2015/03/16";
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      for (; index < 3; ++index) {
+        String newCommitTime = "00" + index;
+        List<HoodieRecord> records = index == 0 ? dataGen.generateInsertsForPartition(newCommitTime, 10, partition)
+            : dataGen.generateUniqueUpdates(newCommitTime, 5);
+        client.startCommitWithTime(newCommitTime);
+        client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      }
+    }
+    assertEquals(metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants(), 3);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // Perform a clean
+      String cleanInstantTime = "00" + index++;
+      HoodieCleanMetadata cleanMetadata = client.clean(cleanInstantTime);
+      // 1 partition should be cleaned
+      assertEquals(cleanMetadata.getPartitionMetadata().size(), 1);
+      // 1 file cleaned
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 0);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1);
+
+      // To simulate failed clean on the main dataset, we will delete the completed clean instant
+      String cleanInstantFileName = HoodieTimeline.makeCleanerFileName(cleanInstantTime);
+      assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
+          cleanInstantFileName), false));
+      assertEquals(metaClient.reloadActiveTimeline().getCleanerTimeline().filterInflights().countInstants(), 1);
+      assertEquals(metaClient.reloadActiveTimeline().getCleanerTimeline().filterCompletedInstants().countInstants(), 0);
+
+      // Initiate another clean. The previous leftover clean will be attempted and no other clean will be scheduled.
+      String newCleanInstantTime = "00" + index++;
+      cleanMetadata = client.clean(newCleanInstantTime);
+
+      // 1 partition should be cleaned
+      assertEquals(cleanMetadata.getPartitionMetadata().size(), 1);
+      // 1 file cleaned but was already deleted so will be a failed delete
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 0);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 1);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1);
+
+      validateMetadata(client);
+    }
+  }
+
   private void doPreBootstrapOperations(HoodieTestTable testTable) throws Exception {
     doPreBootstrapOperations(testTable, "0000001", "0000002");
   }
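Taken together, the Flink and Spark writer changes above converge on the same re-attempt handling for an instant that was already (fully or partially) applied to the metadata table. The fragment below is a standalone sketch of that flow, not code from the patch: the class and method names (MdtRetryCommitSketch, prepareInstantForReattempt) are hypothetical, the write client and meta client are passed in as parameters instead of being writer fields, and the branch that handles a brand-new instant is omitted.

// Hypothetical helper illustrating the commit flow the patch introduces; not part of the PR.
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieMetadataException;

final class MdtRetryCommitSketch {
  static void prepareInstantForReattempt(BaseHoodieWriteClient writeClient,
                                         HoodieTableMetaClient metadataMetaClient,
                                         String instantTime) {
    // A completed deltacommit at this time means the earlier attempt finished on the MDT but
    // failed on the data table; an absent one means the attempt failed while committing to the MDT.
    Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline()
        .filterCompletedInstants()
        .filter(entry -> entry.getTimestamp().equals(instantTime))
        .lastInstant();
    System.out.printf("%s completed commit at %s being applied to MDT.%n",
        alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime);

    // Instead of deleting the completed instant file (the previous behavior), roll back the earlier
    // deltacommit on the metadata table, reload the timeline, and start a fresh commit at the same time.
    if (!writeClient.rollback(instantTime)) {
      throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
    }
    metadataMetaClient.reloadActiveTimeline();
    writeClient.startCommitWithTime(instantTime);
  }
}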