diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 37bcc1f30af48..adcf394864634 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -218,6 +218,8 @@ protected void commit(HoodieTable table, String commitActionType, String instant HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); // Finalize write finalizeWrite(table, instantTime, stats); + // update Metadata table + writeTableMetadata(table, instantTime, commitActionType, metadata); activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); } @@ -244,16 +246,24 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String } /** - * Any pre-commit actions like conflict resolution or updating metadata table goes here. + * Any pre-commit actions like conflict resolution go here. * @param inflightInstant instant of inflight operation. * @param metadata commit metadata for which pre commit is being invoked. */ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) { - // Create a Hoodie table after starting the transaction which encapsulated the commits and files visible. - // Important to create this after the lock to ensure latest commits show up in the timeline without need for reload - HoodieTable table = createTable(config, hadoopConf); - table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(), - table.isTableServiceAction(inflightInstant.getAction()))); + // To be overridden by specific engines to perform conflict resolution if any. + } + + /** + * Write the HoodieCommitMetadata to the metadata table if available. + * @param table {@link HoodieTable} of interest. + * @param instantTime instant time of the commit. + * @param actionType action type of the commit. + * @param metadata instance of {@link HoodieCommitMetadata}.
+ */ + protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) { + table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime, + table.isTableServiceAction(actionType))); } /** diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index cb244ff6bad6f..1229f58d57182 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -258,10 +258,10 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp } @Override - protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) { + protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) { this.metadataWriterOption.ifPresent(w -> { w.initTableMetadata(); // refresh the timeline - w.update(metadata, inflightInstant.getTimestamp(), getHoodieTable().isTableServiceAction(inflightInstant.getAction())); + w.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType)); }); } @@ -362,9 +362,9 @@ public void completeCompaction( String compactionCommitTime) { this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction"); List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); - writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)); - // commit to data table after committing to metadata table. finalizeWrite(table, compactionCommitTime, writeStats); + // commit to data table after committing to metadata table. + writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)); LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 501ae9304f218..81da1fbed0aa5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -45,7 +45,6 @@ import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndexFactory; -import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.metrics.DistributedRegistry; import org.apache.hudi.table.BulkInsertPartitioner; @@ -314,9 +313,9 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD writeStats = writeStatuses.map(WriteStatus::getStat).collect(); - writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)); - // commit to data table after committing to metadata table. 
finalizeWrite(table, compactionCommitTime, writeStats); + writeTableMetadataForTableServices(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime)); + // commit to data table after committing to metadata table. LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata); CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime) @@ -386,8 +385,8 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(","))); } - writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime)); finalizeWrite(table, clusteringCommitTime, writeStats); + writeTableMetadataForTableServices(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime)); try { // try to save statistics info to hudi if (config.isDataSkippingEnabled() && config.isLayoutOptimizationEnabled() && !config.getClusteringSortColumns().isEmpty()) { @@ -415,8 +414,8 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD>, JavaRDD, JavaRDD> table, HoodieCommitMetadata commitMetadata, - HoodieInstant hoodieInstant) { + private void writeTableMetadataForTableServices(HoodieTable>, JavaRDD, JavaRDD> table, HoodieCommitMetadata commitMetadata, + HoodieInstant hoodieInstant) { try { this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty()); boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction()); @@ -497,8 +496,6 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met HoodieTable table = createTable(config, hadoopConf); TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(), Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner()); - table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(), - table.isTableServiceAction(inflightInstant.getAction()))); } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 005d031cb9df2..ce2de3d7dcec7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -96,6 +96,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -121,6 +122,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @Tag("functional") @@ -224,7 +226,7 @@ public void testOnlyValidPartitionsAdded(HoodieTableType tableType) throws Excep @ParameterizedTest @MethodSource("bootstrapAndTableOperationTestArgs") public 
void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception { - init(tableType, true, enableFullScan, false); + init(tableType, true, enableFullScan, false, false); doWriteInsertAndUpsert(testTable); // trigger an upsert @@ -482,7 +484,7 @@ private Long getNextCommitTime(long curCommitTime) { @ParameterizedTest @EnumSource(HoodieTableType.class) public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType) throws Exception { - init(tableType, true, true, true); + init(tableType, true, true, true, false); long baseCommitTime = Long.parseLong(HoodieActiveTimeline.createNewInstantTime()); for (int i = 1; i < 25; i += 7) { long commitTime1 = getNextCommitTime(baseCommitTime); @@ -541,6 +543,34 @@ public void testFirstCommitRollback(HoodieTableType tableType) throws Exception } } + /** + * Tests handling of spurious deletes in the metadata payload. + * Say a commit was applied to the metadata table and was later explicitly rolled back. Due to Spark task failures, there could be more files in the rollback + * metadata than in the original commit metadata. When the payload consistency check is enabled, this will throw an exception. If not, it will succeed. + * @throws Exception + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testMetadataPayloadSpuriousDeletes(boolean ignoreSpuriousDeletes) throws Exception { + tableType = COPY_ON_WRITE; + init(tableType, true, true, false, ignoreSpuriousDeletes); + doWriteInsertAndUpsert(testTable); + // trigger an upsert + doWriteOperationAndValidate(testTable, "0000003"); + + // trigger a commit and rollback + doWriteOperation(testTable, "0000004"); + // add extra files in rollback to check for payload consistency + Map<String, List<String>> extraFiles = new HashMap<>(); + extraFiles.put("p1", Collections.singletonList("f10")); + extraFiles.put("p2", Collections.singletonList("f12")); + testTable.doRollbackWithExtraFiles("0000004", "0000005", extraFiles); + if (!ignoreSpuriousDeletes) { + assertThrows(HoodieMetadataException.class, () -> validateMetadata(testTable)); + } else { + validateMetadata(testTable); + } + } /** * Test several table operations with restore. This test uses SparkRDDWriteClient. @@ -1101,7 +1131,7 @@ public void testRollbackOfPartiallyFailedCommitWithNewPartitions() throws Except HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, - getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(), + getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(), true)) { String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime); @@ -1132,7 +1162,7 @@ public void testRollbackOfPartiallyFailedCommitWithNewPartitions() throws Except } try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, - getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(), + getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(), true)) { String newCommitTime = client.startCommit(); // Next insert @@ -1154,7 +1184,7 @@ public void testErrorCases() throws Exception { // TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table // should be rolled back to last valid commit.
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, - getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(), + getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(), true)) { String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime); @@ -1178,7 +1208,7 @@ public void testErrorCases() throws Exception { } try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, - getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(), + getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(), true)) { String newCommitTime = client.startCommit(); // Next insert diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 30d59baece096..fbd93330b1763 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -2079,7 +2079,7 @@ public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsisten private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard, boolean populateMetaFields) throws Exception { - String instantTime = "000"; + String instantTime = "00000000000010"; HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); Properties properties = new Properties(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 5617058bb8af8..1e179ca59b610 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -75,10 +75,11 @@ public void init(HoodieTableType tableType) throws IOException { } public void init(HoodieTableType tableType, boolean enableMetadataTable) throws IOException { - init(tableType, enableMetadataTable, true, false); + init(tableType, enableMetadataTable, true, false, false); } - public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableMetrics) throws IOException { + public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableMetrics, boolean + validateMetadataPayloadStateConsistency) throws IOException { this.tableType = tableType; initPath(); initSparkContexts("TestHoodieMetadata"); @@ -88,7 +89,7 @@ public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean initTestDataGenerator(); metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, enableMetrics, - enableFullScan, true).build(); + enableFullScan, true, validateMetadataPayloadStateConsistency).build(); 
initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable); } @@ -266,11 +267,12 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, bo protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) { - return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true, true); + return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true, true, false); } protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, - boolean enableMetrics, boolean enableFullScan, boolean useRollbackUsingMarkers) { + boolean enableMetrics, boolean enableFullScan, boolean useRollbackUsingMarkers, + boolean validateMetadataPayloadConsistency) { Properties properties = new Properties(); properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()); return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA) @@ -290,6 +292,7 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesClea .enableFullScan(enableFullScan) .enableMetrics(enableMetrics) .withPopulateMetaFields(false) + .ignoreSpuriousDeletes(validateMetadataPayloadConsistency) .build()) .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics) .withExecutorMetrics(true).build()) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 0aa0593693e7c..810e475e5b460 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -138,6 +138,13 @@ public final class HoodieMetadataConfig extends HoodieConfig { .sinceVersion("0.10.0") .withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated."); + public static final ConfigProperty<Boolean> IGNORE_SPURIOUS_DELETES = ConfigProperty + .key("_" + METADATA_PREFIX + ".ignore.spurious.deletes") + .defaultValue(true) + .sinceVersion("0.10.0") + .withDocumentation("There are cases when extra files are requested to be deleted from metadata table which were never added before. 
This config " + "determines how to handle such spurious deletes"); + private HoodieMetadataConfig() { super(); } @@ -174,6 +181,10 @@ public boolean populateMetaFields() { return getBooleanOrDefault(HoodieMetadataConfig.POPULATE_META_FIELDS); } + public boolean ignoreSpuriousDeletes() { + return getBoolean(IGNORE_SPURIOUS_DELETES); + } + public static class Builder { private EngineType engineType = EngineType.SPARK; @@ -252,6 +263,11 @@ public Builder enableFullScan(boolean enableFullScan) { return this; } + public Builder ignoreSpuriousDeletes(boolean validateMetadataPayloadConsistency) { + metadataConfig.setValue(IGNORE_SPURIOUS_DELETES, String.valueOf(validateMetadataPayloadConsistency)); + return this; + } + public Builder withEngineType(EngineType engineType) { this.engineType = engineType; return this; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index b560b76941322..ccd421e677651 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -156,11 +156,7 @@ protected List<String> fetchAllPartitionPaths() throws IOException { List<String> partitions = Collections.emptyList(); if (hoodieRecord.isPresent()) { - if (!hoodieRecord.get().getData().getDeletions().isEmpty()) { - throw new HoodieMetadataException("Metadata partition list record is inconsistent: " - + hoodieRecord.get().getData()); - } - + mayBeHandleSpuriousDeletes(hoodieRecord, "\"all partitions\""); partitions = hoodieRecord.get().getData().getFilenames(); // Partition-less tables have a single empty partition if (partitions.contains(NON_PARTITIONED_NAME)) { @@ -190,10 +186,7 @@ FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException { FileStatus[] statuses = {}; if (hoodieRecord.isPresent()) { - if (!hoodieRecord.get().getData().getDeletions().isEmpty()) { - throw new HoodieMetadataException("Metadata record for partition " + partitionName + " is inconsistent: " - + hoodieRecord.get().getData()); - } + mayBeHandleSpuriousDeletes(hoodieRecord, partitionName); statuses = hoodieRecord.get().getData().getFileStatuses(hadoopConf.get(), partitionPath); } @@ -228,10 +221,7 @@ Map fetchAllFilesInPartitionPaths(List partitionPath for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry: partitionsFileStatus) { if (entry.getValue().isPresent()) { - if (!entry.getValue().get().getData().getDeletions().isEmpty()) { - throw new HoodieMetadataException("Metadata record for partition " + entry.getKey() + " is inconsistent: " - + entry.getValue().get().getData()); - } + mayBeHandleSpuriousDeletes(entry.getValue(), entry.getKey()); result.put(partitionInfo.get(entry.getKey()).toString(), entry.getValue().get().getData().getFileStatuses(hadoopConf.get(), partitionInfo.get(entry.getKey()))); } } @@ -240,6 +230,23 @@ Map fetchAllFilesInPartitionPaths(List partitionPath return result; } + /** + * Handle spurious deletes, if any. Depending on the config, either throw an exception or log a warning message. + * @param hoodieRecord instance of {@link HoodieRecord} of interest. + * @param partitionName partition name of interest.
+ */ + private void mayBeHandleSpuriousDeletes(Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord, String partitionName) { + if (!hoodieRecord.get().getData().getDeletions().isEmpty()) { + if (!metadataConfig.ignoreSpuriousDeletes()) { + throw new HoodieMetadataException("Metadata record for " + partitionName + " is inconsistent: " + + hoodieRecord.get().getData()); + } else { + LOG.warn("Metadata record for " + partitionName + " encountered some files to be deleted which were not added before. " + + "Ignoring the spurious deletes as the `" + HoodieMetadataConfig.IGNORE_SPURIOUS_DELETES.key() + "` config is set to true"); + } + } + } + protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName); protected abstract List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> key, String partitionName); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 3057fa065db86..50c5858edce68 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -704,6 +704,25 @@ public HoodieTestTable doRollback(String commitTimeToRollback, String commitTime return addRollback(commitTime, rollbackMetadata); } + public HoodieTestTable doRollbackWithExtraFiles(String commitTimeToRollback, String commitTime, Map<String, List<String>> extraFiles) throws Exception { + metaClient = HoodieTableMetaClient.reload(metaClient); + Option<HoodieCommitMetadata> commitMetadata = getMetadataForInstant(commitTimeToRollback); + if (!commitMetadata.isPresent()) { + throw new IllegalArgumentException("Instant to rollback not present in timeline: " + commitTimeToRollback); + } + Map<String, List<String>> partitionFiles = getPartitionFiles(commitMetadata.get()); + for (Map.Entry<String, List<String>> entry : partitionFiles.entrySet()) { + deleteFilesInPartition(entry.getKey(), entry.getValue()); + } + for (Map.Entry<String, List<String>> entry: extraFiles.entrySet()) { + if (partitionFiles.containsKey(entry.getKey())) { + partitionFiles.get(entry.getKey()).addAll(entry.getValue()); + } + } + HoodieRollbackMetadata rollbackMetadata = getRollbackMetadata(commitTimeToRollback, partitionFiles); + return addRollback(commitTime, rollbackMetadata); + } + public HoodieTestTable doRestore(String commitToRestoreTo, String restoreTime) throws Exception { metaClient = HoodieTableMetaClient.reload(metaClient); List commitsToRollback = metaClient.getActiveTimeline().getCommitsTimeline() diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java index 27502bfdf15c0..39ca00ac000a2 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java @@ -19,7 +19,6 @@ package org.apache.hudi.utilities.functional; -import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -28,7 +27,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.config.HoodieCompactionConfig; -import org.apache.hudi.config.HoodieWriteConfig; import 
org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; @@ -37,7 +35,6 @@ import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs; import org.apache.hadoop.fs.FileSystem; -import org.apache.spark.sql.SaveMode; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest;
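Reviewer note (illustration only, not part of the patch): a minimal sketch of how a writer could exercise the new knob introduced in HoodieMetadataConfig above, assuming the usual write-config setup; basePath is a placeholder and the remaining write options are elided.

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath) // placeholder table path
    .withMetadataConfig(HoodieMetadataConfig.newBuilder()
        .enable(true)
        // default is true: spurious deletes are logged and ignored;
        // false makes metadata reads fail fast with HoodieMetadataException
        .ignoreSpuriousDeletes(false)
        .build())
    .build();

The same flag can also be supplied as a raw property using HoodieMetadataConfig.IGNORE_SPURIOUS_DELETES.key() for jobs configured through key/value options.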