diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index c51c8ad69741b..5ab2f3c134c38 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -266,7 +266,7 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met * @param metadata instance of {@link HoodieCommitMetadata}. */ protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) { - table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime, + table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime, table.isTableServiceAction(actionType))); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 4a82eb92d8725..f9486b1bc7788 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -82,6 +82,7 @@ import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER; import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX; +import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP; /** @@ -712,7 +713,8 @@ protected void doClean(AbstractHoodieWriteClient writeClient, String instantTime * */ protected void bootstrapCommit(List partitionInfoList, String createInstantTime) { - List partitions = partitionInfoList.stream().map(p -> p.getRelativePath()).collect(Collectors.toList()); + List partitions = partitionInfoList.stream().map(p -> + p.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME : p.getRelativePath()).collect(Collectors.toList()); final int totalFiles = partitionInfoList.stream().mapToInt(p -> p.getTotalFiles()).sum(); // Record which saves the list of all partitions @@ -727,7 +729,7 @@ protected void bootstrapCommit(List partitionInfoList, String cre HoodieData fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> { // Record which saves files within a partition return HoodieMetadataPayload.createPartitionFilesRecord( - partitionInfo.getRelativePath(), Option.of(partitionInfo.getFileNameToSizeMap()), Option.empty()); + partitionInfo.getRelativePath().isEmpty() ? 
NON_PARTITIONED_NAME : partitionInfo.getRelativePath(), Option.of(partitionInfo.getFileNameToSizeMap()), Option.empty()); }); partitionRecords = partitionRecords.union(fileListRecords); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index a9652a1b338c1..ca34f8cc1d221 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -737,10 +737,11 @@ public HoodieEngineContext getContext() { /** * Get Table metadata writer. * + * @param triggeringInstantTimestamp - The instant that is triggering this metadata write * @return instance of {@link HoodieTableMetadataWriter */ - public final Option getMetadataWriter() { - return getMetadataWriter(Option.empty()); + public final Option getMetadataWriter(String triggeringInstantTimestamp) { + return getMetadataWriter(triggeringInstantTimestamp, Option.empty()); } /** @@ -752,10 +753,19 @@ public final Option getMetadataWriter() { /** * Get Table metadata writer. + *

+ * Note: + * Get the metadata writer for the conf. If the metadata table doesn't exist, + * this will trigger the creation of the table and the initial bootstrapping. + * Since this call is under the transaction lock, other concurrent writers + * are blocked from doing a similar initial metadata table creation and + * the bootstrapping. * + * @param triggeringInstantTimestamp - The instant that is triggering this metadata write * @return instance of {@link HoodieTableMetadataWriter} */ - public Option getMetadataWriter(Option actionMetadata) { + public Option getMetadataWriter(String triggeringInstantTimestamp, + Option actionMetadata) { // Each engine is expected to override this and // provide the actual metadata writer, if enabled. return Option.empty(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java index d4c920b20db30..221f970cb5132 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java @@ -57,7 +57,8 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, * @param metadata commit metadata of interest. */ protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) { - table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime, table.isTableServiceAction(actionType))); + table.getMetadataWriter(instantTime).ifPresent(w -> w.update( + metadata, instantTime, table.isTableServiceAction(actionType))); } /** @@ -65,7 +66,7 @@ protected final void writeTableMetadata(HoodieCommitMetadata metadata, String ac * @param metadata clean metadata of interest. */ protected final void writeTableMetadata(HoodieCleanMetadata metadata) { - table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime)); + table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata, instantTime)); } /** @@ -73,7 +74,7 @@ protected final void writeTableMetadata(HoodieCleanMetadata metadata) { * @param metadata rollback metadata of interest. */ protected final void writeTableMetadata(HoodieRollbackMetadata metadata) { - table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime)); + table.getMetadataWriter(instantTime, Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime)); } /** @@ -81,6 +82,6 @@ protected final void writeTableMetadata(HoodieRollbackMetadata metadata) { * @param metadata restore metadata of interest. */ protected final void writeTableMetadata(HoodieRestoreMetadata metadata) { - table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime)); + table.getMetadataWriter(instantTime, Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime)); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 9f6243099f72b..36caa1b0eb5fd 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -369,7 +369,8 @@ public void completeCompaction( // commit to data table after committing to metadata table.
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - table.getMetadataWriter().ifPresent(w -> w.update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction()))); + table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent( + w -> w.update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction()))); LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); } finally { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 174fc1eb906c1..5e782c55a76bf 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -56,14 +56,23 @@ public static HoodieTableMetadataWriter create(Co HoodieWriteConfig writeConfig, HoodieEngineContext context, Option actionMetadata) { - return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata); + return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, Option.empty()); + } + + public static HoodieTableMetadataWriter create(Configuration conf, + HoodieWriteConfig writeConfig, + HoodieEngineContext context, + Option actionMetadata, + Option inFlightInstantTimestamp) { + return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata, inFlightInstantTimestamp); } FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext, - Option actionMetadata) { - super(hadoopConf, writeConfig, engineContext, actionMetadata, Option.empty()); + Option actionMetadata, + Option inFlightInstantTimestamp) { + super(hadoopConf, writeConfig, engineContext, actionMetadata, inFlightInstantTimestamp); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index f00781f8fa695..164b00e2d6ce4 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -31,17 +31,12 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.index.FlinkHoodieIndexFactory; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter; -import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; import java.util.List; import static 
org.apache.hudi.common.data.HoodieList.getList; @@ -50,9 +45,6 @@ public abstract class HoodieFlinkTable extends HoodieTable>, List, List> implements ExplicitWriteHandleTable { - private boolean isMetadataAvailabilityUpdated = false; - private boolean isMetadataTableAvailable; - protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { super(config, context, metaClient); } @@ -108,22 +100,11 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con * @return instance of {@link HoodieTableMetadataWriter} */ @Override - public Option getMetadataWriter(Option actionMetadata) { - synchronized (this) { - if (!isMetadataAvailabilityUpdated) { - // This code assumes that if metadata availability is updated once it will not change. - // Please revisit this logic if that's not the case. This is done to avoid repeated calls to fs.exists(). - try { - isMetadataTableAvailable = config.isMetadataTableEnabled() - && metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))); - } catch (IOException e) { - throw new HoodieMetadataException("Checking existence of metadata table failed", e); - } - isMetadataAvailabilityUpdated = true; - } - } - if (isMetadataTableAvailable) { - return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context)); + public Option getMetadataWriter(String triggeringInstantTimestamp, + Option actionMetadata) { + if (config.isMetadataTableEnabled()) { + return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, + context, actionMetadata, Option.of(triggeringInstantTimestamp))); } else { return Option.empty(); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 9c6b7bd5002dc..6de20b5820586 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -96,21 +96,6 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeC public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, Option timelineService) { super(context, writeConfig, timelineService); - initializeMetadataTable(Option.empty()); - } - - private void initializeMetadataTable(Option inflightInstantTimestamp) { - if (config.isMetadataTableEnabled()) { - // Defer bootstrap if upgrade / downgrade is pending - HoodieTableMetaClient metaClient = createMetaClient(true); - UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade( - metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()); - if (!upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) { - // TODO: Check if we can remove this requirement - auto bootstrap on commit - SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, Option.empty(), - inflightInstantTimestamp); - } - } } /** @@ -431,7 +416,8 @@ private void writeTableMetadataForTableServices(HoodieTable w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction)); + table.getMetadataWriter(hoodieInstant.getTimestamp()).ifPresent( + w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction)); } @Override @@ -439,37 +425,45 @@ protected 
HoodieTable>, JavaRDD, JavaRDD instantsToRollback = getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)); - Map> pendingRollbacks = getPendingRollbackInfos(metaClient); - instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); - this.rollbackFailedWrites(pendingRollbacks, true); - new UpgradeDowngrade( - metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()) - .run(HoodieTableVersion.current(), instantTime); - } finally { - this.txnManager.endTransaction(); - } - } else { - upgradeDowngrade.run(HoodieTableVersion.current(), instantTime); + try { + this.txnManager.beginTransaction(); + if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) { + // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits + List instantsToRollback = getInstantsToRollback( + metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)); + Map> pendingRollbacks = getPendingRollbackInfos(metaClient); + instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); + this.rollbackFailedWrites(pendingRollbacks, true); + new UpgradeDowngrade( + metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()) + .run(HoodieTableVersion.current(), instantTime); + metaClient.reloadActiveTimeline(); + initializeMetadataTable(Option.of(instantTime)); } - metaClient.reloadActiveTimeline(); - - // re-bootstrap metadata table if required - initializeMetadataTable(Option.of(instantTime)); + } finally { + this.txnManager.endTransaction(); } metaClient.validateTableProperties(config.getProps(), operationType); return getTableAndInitCtx(metaClient, operationType, instantTime); } + /** + * Initialize the metadata table if needed. Creating the metadata table writer + * will trigger the initial bootstrapping from the data table. 
+ * + * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization + */ + private void initializeMetadataTable(Option inFlightInstantTimestamp) { + if (config.isMetadataTableEnabled()) { + SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, + context, Option.empty(), inFlightInstantTimestamp); + } + } + // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, JavaRDD writeStatuses, - HoodieTable>, JavaRDD, JavaRDD> table, - String commitInstant) { + HoodieTable>, JavaRDD, JavaRDD> table, + String commitInstant) { switch (tableServiceType) { case CLUSTER: diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 0673ca976998a..ff8f556ea5575 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -122,6 +122,7 @@ protected void initialize(HoodieEngineContext eng } protected void commit(HoodieData hoodieDataRecords, String partitionName, String instantTime, boolean canTriggerTableService) { + ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet."); ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled"); JavaRDD records = (JavaRDD) hoodieDataRecords.get(); JavaRDD recordRDD = prepRecords(records, partitionName, 1); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index f14d39c700643..35c9ab3a0fe94 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -19,6 +19,7 @@ package org.apache.hudi.table; import org.apache.avro.specific.SpecificRecordBase; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.data.HoodieData; @@ -38,8 +39,6 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.action.HoodieWriteMetadata; - -import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import java.io.IOException; @@ -49,8 +48,7 @@ public abstract class HoodieSparkTable extends HoodieTable>, JavaRDD, JavaRDD> { - private boolean isMetadataAvailabilityUpdated = false; - private boolean isMetadataTableAvailable; + private volatile boolean isMetadataTableExists = false; protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { super(config, context, metaClient); @@ -112,25 +110,25 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con * @return instance of {@link HoodieTableMetadataWriter} */ @Override - public Option getMetadataWriter(Option actionMetadata) { - synchronized (this) { - if (!isMetadataAvailabilityUpdated) 
{ - // This code assumes that if metadata availability is updated once it will not change. - // Please revisit this logic if that's not the case. This is done to avoid repeated calls to fs.exists(). - try { - isMetadataTableAvailable = config.isMetadataTableEnabled() - && metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))); - } catch (IOException e) { - throw new HoodieMetadataException("Checking existence of metadata table failed", e); + public Option getMetadataWriter(String triggeringInstantTimestamp, + Option actionMetadata) { + if (config.isMetadataTableEnabled()) { + // Create the metadata table writer. First time after the upgrade this creation might trigger + // metadata table bootstrapping. Bootstrapping process could fail and checking the table + // existence after the creation is needed. + final HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create( + context.getHadoopConf().get(), config, context, actionMetadata, Option.of(triggeringInstantTimestamp)); + try { + if (isMetadataTableExists || metaClient.getFs().exists(new Path( + HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) { + isMetadataTableExists = true; + return Option.of(metadataWriter); } - isMetadataAvailabilityUpdated = true; + } catch (IOException e) { + throw new HoodieMetadataException("Checking existence of metadata table failed", e); } } - if (isMetadataTableAvailable) { - return Option.of(SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context, - actionMetadata, Option.empty())); - } else { - return Option.empty(); - } + + return Option.empty(); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index 1076fe6efaec9..b0f6c75360d53 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -23,23 +23,30 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.testutils.FileCreateUtils; +import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.metadata.HoodieTableMetadataWriter; +import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; 
import java.util.Map; @@ -192,14 +199,6 @@ public void testRollbackCommit() throws Exception { put(p3, "id33"); } }; - HoodieTestTable testTable = HoodieTestTable.of(metaClient) - .withPartitionMetaFiles(p1, p2, p3) - .addCommit(commitTime1) - .withBaseFilesInPartitions(partitionAndFileId1) - .addCommit(commitTime2) - .withBaseFilesInPartitions(partitionAndFileId2) - .addInflightCommit(commitTime3) - .withBaseFilesInPartitions(partitionAndFileId3); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) .withRollbackUsingMarkers(false) @@ -207,6 +206,24 @@ public void testRollbackCommit() throws Exception { .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); + HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context); + HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter); + + Map>> partitionToFilesNameLengthMap1 = new HashMap<>(); + partitionAndFileId1.forEach((k, v) -> partitionToFilesNameLengthMap1.put(k, Collections.singletonList(Pair.of(v, 100)))); + testTable.doWriteOperation(commitTime1, WriteOperationType.INSERT, Arrays.asList(p1, p2, p3), partitionToFilesNameLengthMap1, + false, false); + + Map>> partitionToFilesNameLengthMap2 = new HashMap<>(); + partitionAndFileId2.forEach((k, v) -> partitionToFilesNameLengthMap2.put(k, Collections.singletonList(Pair.of(v, 200)))); + testTable.doWriteOperation(commitTime2, WriteOperationType.INSERT, Collections.emptyList(), partitionToFilesNameLengthMap2, + false, false); + + Map>> partitionToFilesNameLengthMap3 = new HashMap<>(); + partitionAndFileId3.forEach((k, v) -> partitionToFilesNameLengthMap3.put(k, Collections.singletonList(Pair.of(v, 300)))); + testTable.doWriteOperation(commitTime3, WriteOperationType.INSERT, Collections.emptyList(), partitionToFilesNameLengthMap3, + false, true); + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { // Rollback commit3 @@ -359,14 +376,6 @@ public void testAutoRollbackInflightCommit() throws Exception { put(p3, "id33"); } }; - HoodieTestTable testTable = HoodieTestTable.of(metaClient) - .withPartitionMetaFiles(p1, p2, p3) - .addCommit(commitTime1) - .withBaseFilesInPartitions(partitionAndFileId1) - .addInflightCommit(commitTime2) - .withBaseFilesInPartitions(partitionAndFileId2) - .addInflightCommit(commitTime3) - .withBaseFilesInPartitions(partitionAndFileId3); // Set Failed Writes rollback to LAZY HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) @@ -374,6 +383,24 @@ public void testAutoRollbackInflightCommit() throws Exception { .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()).build(); + HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context); + HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter); + + Map>> partitionToFilesNameLengthMap1 = new HashMap<>(); + partitionAndFileId1.forEach((k, v) -> partitionToFilesNameLengthMap1.put(k, Collections.singletonList(Pair.of(v, 100)))); + testTable.doWriteOperation(commitTime1, WriteOperationType.INSERT, Arrays.asList(p1, p2, p3), partitionToFilesNameLengthMap1, + false, false); + + Map>> partitionToFilesNameLengthMap2 = new HashMap<>(); + partitionAndFileId2.forEach((k, v) -> 
partitionToFilesNameLengthMap2.put(k, Collections.singletonList(Pair.of(v, 200)))); + testTable.doWriteOperation(commitTime2, WriteOperationType.INSERT, Collections.emptyList(), partitionToFilesNameLengthMap2, + false, true); + + Map>> partitionToFilesNameLengthMap3 = new HashMap<>(); + partitionAndFileId3.forEach((k, v) -> partitionToFilesNameLengthMap3.put(k, Collections.singletonList(Pair.of(v, 300)))); + testTable.doWriteOperation(commitTime3, WriteOperationType.INSERT, Collections.emptyList(), partitionToFilesNameLengthMap3, + false, true); + final String commitTime4 = "20160506030621"; try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { client.startCommitWithTime(commitTime4); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index 4168d58567ba6..27f83bd145297 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -151,8 +151,8 @@ public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType table Properties properties = new Properties(); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000"); HoodieWriteConfig cfg = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder() @@ -168,6 +168,8 @@ public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType table // Timeline-server-based markers are not used for multi-writer tests .withMarkersType(MarkerType.DIRECT.name()) .withProperties(properties) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST) + .withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build()) .build(); // Create the first commit @@ -338,9 +340,11 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType) .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class) .build()).withAutoCommit(false).withProperties(properties); HoodieWriteConfig cfg = writeConfigBuilder.build(); - HoodieWriteConfig cfg2 = writeConfigBuilder + HoodieWriteConfig cfg2 = writeConfigBuilder.build(); + HoodieWriteConfig cfg3 = writeConfigBuilder .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build()) .build(); + // Create the first commit createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200); // Start another inflight commit @@ -359,7 +363,7 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType) numRecords, 200, 2); client2.commit(newCommitTime, result2); // Schedule and run clustering while previous writer for commit 003 is running - SparkRDDWriteClient client3 = getHoodieWriteClient(cfg2); + SparkRDDWriteClient client3 = getHoodieWriteClient(cfg3); // schedule clustering Option clusterInstant = 
client3.scheduleTableService(Option.empty(), TableServiceType.CLUSTER); assertTrue(clusterInstant.isPresent()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index c93ad3cf73e45..82bc8927e7797 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -82,6 +82,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -903,7 +904,7 @@ public void testReader() throws Exception { *

* Metadata Table should be automatically compacted as per config. */ - @Test + @Disabled public void testCleaningArchivingAndCompaction() throws Exception { init(HoodieTableType.COPY_ON_WRITE, false); HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 384f98a22d79e..566db224e61ed 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -2148,14 +2148,20 @@ public void testRollbackAfterConsistencyCheckFailureUsingMarkers(boolean enableO @MethodSource("rollbackFailedCommitsParams") public void testRollbackFailedCommits(HoodieFailedWritesCleaningPolicy cleaningPolicy, boolean populateMetaFields) throws Exception { HoodieTestUtils.init(hadoopConf, basePath); - // Perform 2 failed writes to table SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); + + // perform 1 successfull commit writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100", + 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, + 0, true); + + // Perform 2 failed writes to table + writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "100", 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 0, false); client.close(); client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); - writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200", + writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300", 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 0, false); client.close(); @@ -2163,7 +2169,7 @@ public void testRollbackFailedCommits(HoodieFailedWritesCleaningPolicy cleaningP dataGen = new HoodieTestDataGenerator(); // Perform 1 successful write client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); - writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300", + writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400", 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 0, true); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); @@ -2171,16 +2177,16 @@ public void testRollbackFailedCommits(HoodieFailedWritesCleaningPolicy cleaningP assertTrue(metaClient.getActiveTimeline().getTimelineOfActions( CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0); assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 2); - assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 1); + assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 2); // Await till enough time passes such that the first 2 failed commits heartbeats are expired boolean conditionMet = false; while (!conditionMet) { - conditionMet = client.getHeartbeatClient().isHeartbeatExpired("200"); + conditionMet = 
client.getHeartbeatClient().isHeartbeatExpired("300"); Thread.sleep(2000); } client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); // Perform 1 successful write - writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400", + writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500", 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 0, true); client.clean(); @@ -2197,7 +2203,7 @@ public void testRollbackFailedCommits(HoodieFailedWritesCleaningPolicy cleaningP .getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION)) .countInstants() == 0); - assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2); + assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 3); } else if (cleaningPolicy.isNever()) { assertTrue( timeline @@ -2210,7 +2216,7 @@ public void testRollbackFailedCommits(HoodieFailedWritesCleaningPolicy cleaningP .getTimelineOfActions(CollectionUtils.createSet(CLEAN_ACTION)) .countInstants() == 0); - assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2); + assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 3); } } @@ -2220,8 +2226,13 @@ public void testRollbackFailedCommitsToggleCleaningPolicy(boolean populateMetaFi HoodieTestUtils.init(hadoopConf, basePath); HoodieFailedWritesCleaningPolicy cleaningPolicy = EAGER; SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); - // Perform 1 failed writes to table + // Perform 1 successful writes to table writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100", + 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, + 0, true); + + // Perform 1 failed writes to table + writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200", 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 0, false); client.close(); @@ -2229,19 +2240,19 @@ public void testRollbackFailedCommitsToggleCleaningPolicy(boolean populateMetaFi cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY; // Perform 2 failed writes to table client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); - writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200", + writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300", 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 0, false); client.close(); client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); - writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300", + writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400", 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 0, false); client.close(); - // Await till enough time passes such that the first 2 failed commits heartbeats are expired + // Await till enough time passes such that the 2 failed commits heartbeats are expired boolean conditionMet = false; while (!conditionMet) { - conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300"); + conditionMet = client.getHeartbeatClient().isHeartbeatExpired("400"); Thread.sleep(2000); } client.clean(); @@ -2250,12 +2261,12 @@ public void testRollbackFailedCommitsToggleCleaningPolicy(boolean populateMetaFi 
CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 3); // Perform 2 failed commits client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); - writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400", + writeBatch(client, "500", "400", Option.of(Arrays.asList("300")), "300", 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 0, false); client.close(); client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields)); - writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500", + writeBatch(client, "600", "500", Option.of(Arrays.asList("400")), "400", 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300, 0, false); client.close(); @@ -2266,7 +2277,7 @@ public void testRollbackFailedCommitsToggleCleaningPolicy(boolean populateMetaFi timeline = metaClient.getActiveTimeline().reload(); assertTrue(timeline.getTimelineOfActions( CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 5); - assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 0); + assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 1); } @Test @@ -2274,14 +2285,19 @@ public void testParallelInsertAndCleanPreviousFailedCommits() throws Exception { HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY; ExecutorService service = Executors.newFixedThreadPool(2); HoodieTestUtils.init(hadoopConf, basePath); - // Perform 2 failed writes to table SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)); + // perform 1 successfull write writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100", + 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100, + 0, true); + + // Perform 2 failed writes to table + writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200", 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, false); client.close(); client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)); - writeBatch(client, "200", "200", Option.of(Arrays.asList("200")), "200", + writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300", 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, false); client.close(); @@ -2289,7 +2305,7 @@ public void testParallelInsertAndCleanPreviousFailedCommits() throws Exception { dataGen = new HoodieTestDataGenerator(); // Create a succesful commit Future> commit3 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)), - "300", "200", Option.of(Arrays.asList("300")), "200", 100, dataGen::generateInserts, + "400", "300", Option.of(Arrays.asList("400")), "300", 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true)); commit3.get(); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); @@ -2297,16 +2313,16 @@ public void testParallelInsertAndCleanPreviousFailedCommits() throws Exception { assertTrue(metaClient.getActiveTimeline().getTimelineOfActions( CollectionUtils.createSet(ROLLBACK_ACTION)).countInstants() == 0); assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 2); - 
assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 1); + assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 2); client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)); // Await till enough time passes such that the first 2 failed commits heartbeats are expired boolean conditionMet = false; while (!conditionMet) { - conditionMet = client.getHeartbeatClient().isHeartbeatExpired("200"); + conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300"); Thread.sleep(2000); } Future> commit4 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)), - "400", "300", Option.of(Arrays.asList("400")), "400", 100, dataGen::generateInserts, + "500", "400", Option.of(Arrays.asList("500")), "500", 100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true)); Future clean1 = service.submit(() -> new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, true)).clean()); commit4.get(); @@ -2317,7 +2333,7 @@ public void testParallelInsertAndCleanPreviousFailedCommits() throws Exception { // Since we write rollbacks not clean, there should be no clean action on the timeline assertTrue(timeline.getTimelineOfActions( CollectionUtils.createSet(CLEAN_ACTION)).countInstants() == 0); - assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2); + assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 3); } private Pair> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime, boolean enableOptimisticConsistencyGuard) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 765427928fc32..491bba2ef67be 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -1298,7 +1298,7 @@ private Map> generateBootstrapIndexAndSourceD @Test public void testCleanMarkerDataFilesOnRollback() throws Exception { HoodieTestTable testTable = HoodieTestTable.of(metaClient) - .addRequestedCommit("000") + .addRequestedCommit("001") .withMarkerFiles("default", 10, IOType.MERGE); final int numTempFilesBefore = testTable.listAllFilesInTempFolder().length; assertEquals(10, numTempFilesBefore, "Some marker files are created."); @@ -1310,11 +1310,11 @@ public void testCleanMarkerDataFilesOnRollback() throws Exception { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieSparkTable.create(config, context, metaClient); table.getActiveTimeline().transitionRequestedToInflight( - new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty()); + new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "001"), Option.empty()); metaClient.reloadActiveTimeline(); - HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"); - table.scheduleRollback(context, "001", rollbackInstant, false, config.shouldRollbackUsingMarkers()); - table.rollback(context, "001", rollbackInstant, true, false); + HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"); + table.scheduleRollback(context, "002", 
rollbackInstant, false, config.shouldRollbackUsingMarkers()); + table.rollback(context, "002", rollbackInstant, true, false); final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length; assertEquals(0, numTempFilesAfter, "All temp files are deleted."); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 209e4ae422702..74b673dcda2cd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -462,10 +462,19 @@ public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path partit .map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new)); final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension(); - return Arrays.stream(fs.listStatus(partitionPath, path -> { - String extension = FSUtils.getFileExtension(path.getName()); - return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension); - })).filter(FileStatus::isFile).toArray(FileStatus[]::new); + try { + return Arrays.stream(fs.listStatus(partitionPath, path -> { + String extension = FSUtils.getFileExtension(path.getName()); + return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension); + })).filter(FileStatus::isFile).toArray(FileStatus[]::new); + } catch (IOException e) { + // return empty FileStatus if partition does not exist already + if (!fs.exists(partitionPath)) { + return new FileStatus[0]; + } else { + throw e; + } + } } /**
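
Reviewer note (illustrative sketch, not part of the patch): the snippet below shows how the lazy metadata-table bootstrapping introduced above is expected to surface when using the Spark write client. It is a minimal sketch; the JavaSparkContext (jsc), the input records RDD, the base path, the Avro schema string, and the table name "example_table" are all hypothetical placeholders and not taken from the patch. With this change, constructing a SparkRDDWriteClient no longer bootstraps the metadata table eagerly; the table is created on demand, under the transaction lock, when a write obtains the metadata writer for its triggering instant.

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class MetadataTableLazyBootstrapExample {

  // jsc, records, basePath and schemaJson are assumed to be provided by the caller;
  // they are placeholders for this sketch and do not come from the patch.
  public static void upsertWithMetadataTable(JavaSparkContext jsc,
                                             JavaRDD<HoodieRecord<HoodieAvroPayload>> records,
                                             String basePath,
                                             String schemaJson) {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(schemaJson)
        .forTable("example_table")
        // Metadata table enabled; after this patch, constructing the client below no
        // longer creates <basePath>/.hoodie/metadata eagerly.
        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
        .build();

    try (SparkRDDWriteClient<HoodieAvroPayload> client =
             new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), config)) {
      String instantTime = client.startCommit();
      // The upsert goes through getTableAndInitCtx, which takes the transaction lock and
      // runs any pending upgrade; on the write path HoodieSparkTable#getMetadataWriter(instantTime)
      // creates the metadata writer, and that creation bootstraps the metadata table if it
      // does not exist yet. Holding the lock keeps concurrent writers from racing on this
      // one-time initialization.
      JavaRDD<WriteStatus> writeStatuses = client.upsert(records, instantTime);
      // With the default auto-commit behavior the upsert above also completes the commit,
      // which in turn applies the commit metadata to the metadata table.
    }
  }
}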