diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 96d89fcc3a6e7..59acbb21719a3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -449,11 +449,9 @@ protected void postCommit(HoodieTable table, HoodieCommitMetadata me
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
       autoCleanOnCommit();
-      // We cannot have unbounded commit files. Archive commits if we have to archive
-      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
-      archiveLog.archiveIfRequired(context);
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
+      if (config.isAutoArchive()) {
+        archive(table);
+      }
     } finally {
       this.heartbeatClient.stop(instantTime);
     }
@@ -743,6 +741,31 @@ public HoodieCleanMetadata clean(boolean skipLocking) {
     return clean(HoodieActiveTimeline.createNewInstantTime(), skipLocking);
   }
 
+  /**
+   * Trigger archival for the table. This ensures that the number of commits does not explode
+   * and keep increasing unbounded over time.
+   * @param table table to trigger archival on.
+   */
+  protected void archive(HoodieTable table) {
+    try {
+      // We cannot have unbounded commit files. Archive commits if we have to archive
+      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
+      archiveLog.archiveIfRequired(context);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to archive", ioe);
+    }
+  }
+
+  /**
+   * Trigger archival for the table. This ensures that the number of commits does not explode
+   * and keep increasing unbounded over time.
+   */
+  public void archive() {
+    // Create a Hoodie table which encapsulates the commits and files visible
+    HoodieTable table = createTable(config, hadoopConf);
+    archive(table);
+  }
+
   /**
    * Provides a new commit time for a write operation (insert/update/delete).
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index e11d06098fc77..4694b03212214 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -57,6 +57,13 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage"
           + " growth is bounded.");
 
+  public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
+      .key("hoodie.archive.automatic")
+      .defaultValue("true")
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+          + " to archive commits once the configured maximum number of commits is crossed."
+          + " It's recommended to enable this, to ensure the number of active commits is bounded.");
+
   public static final ConfigProperty<String> ASYNC_CLEAN = ConfigProperty
       .key("hoodie.clean.async")
       .defaultValue("false")
@@ -487,6 +494,11 @@ public Builder withAsyncClean(Boolean asyncClean) {
       return this;
     }
 
+    public Builder withAutoArchive(Boolean autoArchive) {
+      compactionConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive));
+      return this;
+    }
+
     public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
       compactionConfig.setValue(CLEANER_INCREMENTAL_MODE_ENABLE, String.valueOf(incrementalCleaningMode));
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9ae22e810e1ec..f69e3f5eba061 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1101,6 +1101,10 @@ public boolean isAutoClean() {
     return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
   }
 
+  public boolean isAutoArchive() {
+    return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE);
+  }
+
   public boolean isAsyncClean() {
     return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN);
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f9486b1bc7788..54284fc48149c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -204,7 +204,9 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
             .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
             // we will trigger compaction manually, to control the instant times
             .withInlineCompaction(false)
-            .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
+            .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
+            // we will trigger archive manually, to ensure only the regular writer invokes it
+            .withAutoArchive(false).build())
         .withParallelism(parallelism, parallelism)
         .withDeleteParallelism(parallelism)
         .withRollbackParallelism(parallelism)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 36caa1b0eb5fd..374dd1226ca25 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -40,7 +40,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.FlinkHoodieIndexFactory;
 import org.apache.hudi.index.HoodieIndex;
@@ -57,7 +56,6 @@
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -332,11 +330,10 @@ protected void postCommit(HoodieTable table,
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-      // We cannot have unbounded commit files. Archive commits if we have to archive
-      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
-      archiveLog.archiveIfRequired(context);
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
+      if (config.isAutoArchive()) {
+        // We cannot have unbounded commit files. Archive commits if we have to archive
+        archive(table);
+      }
     } finally {
       this.heartbeatClient.stop(instantTime);
     }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 5e782c55a76bf..0dcfcfc925ffa 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -140,6 +140,7 @@ protected void commit(HoodieData hoodieDataRecords, String partiti
     if (canTriggerTableService) {
       compactIfNecessary(writeClient, instantTime);
       doClean(writeClient, instantTime);
+      writeClient.archive();
     }
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index ff8f556ea5575..65ade82875a9d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -155,6 +155,7 @@ protected void commit(HoodieData hoodieDataRecords, String partiti
     if (canTriggerTableService) {
      compactIfNecessary(writeClient, instantTime);
       doClean(writeClient, instantTime);
+      writeClient.archive();
     }
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 82bc8927e7797..73b78118bcd09 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -107,6 +107,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
@@ -250,6 +251,43 @@ public void testTableOperations(HoodieTableType tableType, boolean enableFullSca
     validateMetadata(testTable, emptyList(), true);
   }
 
+  @Test
+  public void testMetadataTableArchival() throws Exception {
+    init(COPY_ON_WRITE, false);
+    writeConfig = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .enableFullScan(true)
+            .enableMetrics(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(3)
+            .archiveCommitsWith(3, 4)
+            .retainCommits(1)
+            .build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build()).build();
+    initWriteConfigAndMetatableWriter(writeConfig, true);
+
+    AtomicInteger commitTime = new AtomicInteger(1);
+    // trigger 2 regular writes (plus 1 bootstrap commit); still 1 commit short of archival getting triggered.
+    int i = 1;
+    for (; i <= 2; i++) {
+      doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
+    }
+    // expected num commits = 1 (bootstrap) + 2 (writes) + 1 compaction.
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
+    HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
+    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4);
+
+    // trigger an async table service; archival should not kick in, even though conditions are met.
+    doCluster(testTable, "000000" + commitTime.getAndIncrement());
+    metadataTimeline = metadataMetaClient.reloadActiveTimeline();
+    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 5);
+
+    // trigger a regular write operation. archival should kick in.
+    doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
+    metadataTimeline = metadataMetaClient.reloadActiveTimeline();
+    assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3);
+  }
+
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
   public void testMetadataInsertUpsertClean(HoodieTableType tableType) throws Exception {
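For context, a minimal usage sketch (not part of the patch): with hoodie.archive.automatic left at its default of true, postCommit keeps archiving exactly as before. The snippet below shows the opposite setup, where a Spark writer disables auto-archival through the new withAutoArchive(false) builder option and later triggers archival on demand through the new public archive() API. The table path, table name, local Spark setup and archival thresholds are made-up example values, and the sketch assumes the Hudi table already exists at that path.

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ManualArchivalExample {
  public static void main(String[] args) {
    // Local Spark context just for the example.
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setAppName("manual-archival-example").setMaster("local[1]"));

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample_table")            // example base path
        .forTable("sample_table")                        // example table name
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            // keep an archival window, but stop postCommit from invoking archival
            .archiveCommitsWith(20, 30)
            .withAutoArchive(false)                      // i.e. hoodie.archive.automatic=false
            .build())
        .build();

    SparkRDDWriteClient<HoodieAvroPayload> client =
        new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), writeConfig);
    try {
      // ... regular ingestion would happen here; archival is now skipped after each commit ...
      // Trigger archival explicitly, e.g. from a scheduled maintenance job.
      client.archive();
    } finally {
      client.close();
      jsc.close();
    }
  }
}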