diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index 266fa39cb986e..e6355526e5233 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -60,7 +60,7 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
       List<ListingBasedRollbackRequest> rollbackRequests = null;
       if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
         rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
-            table.getMetaClient().getBasePath(), config);
+            table.getMetaClient().getBasePath());
       } else {
         rollbackRequests = RollbackUtils
             .generateRollbackRequestsUsingFileListingMOR(instantToRollback, table, context);
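Note: the hunk above drops the HoodieWriteConfig parameter so that COW rollback no longer consults the metadata table when listing partitions. A minimal sketch of the intent, assuming the four-argument FSUtils.getAllPartitionPaths overload used in the RollbackUtils hunks below takes (useFileListingFromMetadata, assumeDatePartitioning) as its trailing flags; the flag names are an assumption, the patch itself only shows the literal false values:

    // Sketch: list partitions straight from storage. A partially failed commit can
    // create partitions that were never synced to the metadata table, so rollback
    // must not trust the metadata table while computing its requests.
    List<String> partitions = FSUtils.getAllPartitionPaths(
        engineContext,   // HoodieEngineContext from the caller
        basePath,        // table base path
        false,           // useFileListingFromMetadata (assumed name): bypass the metadata table
        false);          // assumeDatePartitioning (assumed name)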
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index 6ad4e1c986fb5..fccb992ef11f4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -18,10 +18,6 @@
 
 package org.apache.hudi.table.action.rollback;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -42,6 +38,10 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -56,7 +56,8 @@ public class RollbackUtils {
 
   /**
    * Get Latest version of Rollback plan corresponding to a clean instant.
-   * @param metaClient Hoodie Table Meta Client
+   *
+   * @param metaClient      Hoodie Table Meta Client
    * @param rollbackInstant Instant referring to rollback action
    * @return Rollback plan corresponding to rollback instant
    * @throws IOException
@@ -106,12 +107,10 @@ static HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRoll
    * Generate all rollback requests that needs rolling back this action without actually performing rollback for COW table type.
    * @param engineContext instance of {@link HoodieEngineContext} to use.
    * @param basePath base path of interest.
-   * @param config instance of {@link HoodieWriteConfig} to use.
    * @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected.
    */
-  public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext,
-      String basePath, HoodieWriteConfig config) {
-    return FSUtils.getAllPartitionPaths(engineContext, config.getMetadataConfig(), basePath).stream()
+  public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext, String basePath) {
+    return FSUtils.getAllPartitionPaths(engineContext, basePath, false, false).stream()
         .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
         .collect(Collectors.toList());
   }
@@ -127,7 +126,7 @@ public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListin
   public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, HoodieEngineContext context) throws IOException {
     String commit = instantToRollback.getTimestamp();
     HoodieWriteConfig config = table.getConfig();
-    List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
+    List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false);
     if (partitions.isEmpty()) {
       return new ArrayList<>();
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 18815b2e132da..1aebbf6b4c42d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -102,7 +102,7 @@ protected void recreateMarkers(final String commitInstantTime,
       // generate rollback stats
       List<ListingBasedRollbackRequest> rollbackRequests;
       if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath(), table.getConfig());
+        rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath());
       } else {
         rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
       }
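For reference, both production call sites above now share one pattern: every partition discovered on storage yields one delete-data-and-log-files rollback request. A sketch restating the updated COW path end to end:

    // One ListingBasedRollbackRequest per partition path; the rollback executor
    // later scans each partition and removes the files written by the instant
    // being rolled back.
    List<ListingBasedRollbackRequest> requests =
        FSUtils.getAllPartitionPaths(engineContext, basePath, false, false).stream()
            .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
            .collect(Collectors.toList());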
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 49eddc24ef0d8..59e12a1515ad5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -82,7 +82,6 @@
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -93,6 +92,7 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -307,9 +307,8 @@ public void testMetadataTableServices() throws Exception {
   /**
    * Test rollback of various table operations sync to Metadata Table correctly.
    */
-  //@ParameterizedTest
-  //@EnumSource(HoodieTableType.class)
-  @Disabled
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
   public void testRollbackOperations(HoodieTableType tableType) throws Exception {
     init(tableType);
     doWriteInsertAndUpsert(testTable);
@@ -1087,18 +1086,72 @@ public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, Inte
     assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime());
   }
 
+  /**
+   * Tests rollback of a commit which has new partitions that are not present in the hudi table prior to the commit being rolled back.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRollbackOfPartiallyFailedCommitWithNewPartitions() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
+        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
+        true)) {
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+      List<HoodieRecord> upsertRecords = new ArrayList<>();
+      for (HoodieRecord entry : records) {
+        if (entry.getPartitionPath().equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
+            || entry.getPartitionPath().equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)) {
+          upsertRecords.add(entry);
+        }
+      }
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(upsertRecords, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateInserts(newCommitTime, 20);
+      writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed
+      // instant so that only the inflight is left over.
+      String commitInstantFileName = HoodieTimeline.makeCommitFileName(newCommitTime);
+      assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
+          commitInstantFileName), false));
+    }
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
+        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
+        true)) {
+      String newCommitTime = client.startCommit();
+      // Next insert
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+    }
+  }
+
   /**
    * Test various error scenarios.
    */
-  //@Test
-  @Disabled
+  @Test
   public void testErrorCases() throws Exception {
     init(HoodieTableType.COPY_ON_WRITE);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
     // TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table
     // should be rolled back to last valid commit.
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
+        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
+        true)) {
       String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
       client.startCommitWithTime(newCommitTime);
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
@@ -1111,6 +1164,7 @@ public void testErrorCases() throws Exception {
       records = dataGen.generateInserts(newCommitTime, 5);
       writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
 
       // There is no way to simulate failed commit on the main dataset, hence we simply delete the completed
       // instant so that only the inflight is left over.
@@ -1119,7 +1173,9 @@ public void testErrorCases() throws Exception {
           commitInstantFileName), false));
     }
 
-    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
+        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
+        true)) {
       String newCommitTime = client.startCommit();
       // Next insert
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 5);
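Both tests above simulate a partially failed commit the same way: the completed instant file is deleted so only the inflight instant remains, and the next writer rolls the commit back eagerly on startup. A hypothetical helper capturing that step (the method name is illustrative; the tests inline these exact calls):

    // Deleting the completed commit file from the .hoodie meta folder leaves only
    // the inflight instant, which the next SparkRDDWriteClient treats as a failed
    // write and rolls back before accepting new commits.
    private void simulateFailedCommit(String instantTime) throws IOException {
      String commitInstantFileName = HoodieTimeline.makeCommitFileName(instantTime);
      assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
          commitInstantFileName), false));
    }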
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 5e4c4ba86632a..a91099976700e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -87,7 +87,8 @@ public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean
     initMetaClient(tableType);
     initTestDataGenerator();
     metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
-    writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, enableMetrics, enableFullScan).build();
+    writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, enableMetrics,
+        enableFullScan, true).build();
     initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable);
   }
 
@@ -265,11 +266,11 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, bo
 
   protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
                                                             boolean enableMetrics) {
-    return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true);
+    return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true, true);
   }
 
   protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
-                                                            boolean enableMetrics, boolean enableFullScan) {
+                                                            boolean enableMetrics, boolean enableFullScan, boolean useRollbackUsingMarkers) {
     Properties properties = new Properties();
     properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
@@ -292,6 +293,7 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesClea
             .withExecutorMetrics(true).build())
         .withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
             .usePrefix("unit-test").build())
+        .withRollbackUsingMarkers(useRollbackUsingMarkers)
         .withProperties(properties);
   }
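With the new trailing flag, individual tests can opt out of marker-based rollback and exercise the listing-based strategy this patch fixes. A sketch of the usage already visible in TestHoodieBackedMetadata above:

    // useRollbackUsingMarkers = false forces ListingBasedRollbackStrategy, the code
    // path updated in RollbackUtils to list partitions directly from storage.
    HoodieWriteConfig config = getWriteConfigBuilder(
        HoodieFailedWritesCleaningPolicy.EAGER,
        true,    // autoCommit
        true,    // useFileListingMetadata
        false,   // enableMetrics
        true,    // enableFullScan
        false)   // useRollbackUsingMarkers
        .build();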