@@ -449,11 +449,9 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
       autoCleanOnCommit();
-      // We cannot have unbounded commit files. Archive commits if we have to archive
-      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
-      archiveLog.archiveIfRequired(context);
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
+      if (config.isAutoArchive()) {
+        archive(table);
+      }
     } finally {
       this.heartbeatClient.stop(instantTime);
     }
@@ -743,6 +741,31 @@ public HoodieCleanMetadata clean(boolean skipLocking) {
return clean(HoodieActiveTimeline.createNewInstantTime(), skipLocking);
}

/**
* Trigger archival for the table. This ensures that the number of commits does not grow
* unbounded over time.
* @param table table for which to trigger archival.
*/
protected void archive(HoodieTable<T, I, K, O> table) {
try {
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
archiveLog.archiveIfRequired(context);
} catch (IOException ioe) {
throw new HoodieIOException("Failed to archive", ioe);
}
}

/**
* Trigger archival for the table. This ensures that the number of commits does not grow
* unbounded over time.
*/
public void archive() {
// Create a Hoodie table which encapsulates the commits and files visible
HoodieTable table = createTable(config, hadoopConf);
archive(table);
}

/**
* Provides a new commit time for a write operation (insert/update/delete).
*/
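The new public archive() API above makes archival callable outside of postCommit(). Below is a minimal, hedged sketch (not part of this change) of a writer that disables auto archival and triggers it on demand; basePath and engineContext are assumed to exist, and everything other than withAutoArchive() and archive() is pre-existing Hudi client API.

// Sketch only: assumes a Spark engine context and table base path set up elsewhere.
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withAutoArchive(false)   // postCommit() will now skip archival
        .build())
    .build();
SparkRDDWriteClient<HoodieRecordPayload> client = new SparkRDDWriteClient<>(engineContext, cfg);
// ... perform writes with the client ...
client.archive();                 // archive eligible commits on demand, e.g. from a maintenance job
client.close();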
@@ -57,6 +57,13 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage"
+ " growth is bounded.");

public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
.key("hoodie.archive.automatic")
.defaultValue("true")
.withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+ " to archive commits once the configured maximum number of commits is exceeded."
+ " It's recommended to keep this enabled, to ensure the number of active commits is bounded.");

public static final ConfigProperty<String> ASYNC_CLEAN = ConfigProperty
.key("hoodie.clean.async")
.defaultValue("false")
@@ -487,6 +494,11 @@ public Builder withAsyncClean(Boolean asyncClean) {
return this;
}

public Builder withAutoArchive(Boolean autoArchive) {
compactionConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive));
return this;
}

public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
compactionConfig.setValue(CLEANER_INCREMENTAL_MODE_ENABLE, String.valueOf(incrementalCleaningMode));
return this;
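For jobs that pass Hudi settings as raw properties rather than through the typed builder, the same behavior can be controlled with the hoodie.archive.automatic key added above. A hedged sketch, assuming java.util.Properties and an existing basePath; withProps() is existing Hudi builder API, not part of this change.

// Sketch only: disable auto archival via the raw config key.
Properties props = new Properties();
props.setProperty("hoodie.archive.automatic", "false");
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withProps(props)
    .build();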
@@ -1101,6 +1101,10 @@ public boolean isAutoClean() {
return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
}

public boolean isAutoArchive() {
return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE);
}

public boolean isAsyncClean() {
return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN);
}
@@ -204,7 +204,9 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
.archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
// we will trigger compaction manually, to control the instant times
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
// we will trigger archival manually, to ensure only the regular writer invokes it
.withAutoArchive(false).build())
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
.withRollbackParallelism(parallelism)
@@ -40,7 +40,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.FlinkHoodieIndexFactory;
 import org.apache.hudi.index.HoodieIndex;
@@ -57,7 +56,6 @@
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.CompactHelpers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
@@ -332,11 +330,10 @@ protected void postCommit(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>,
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), createTable(config, hadoopConf), instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
-      // We cannot have unbounded commit files. Archive commits if we have to archive
-      HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
-      archiveLog.archiveIfRequired(context);
-    } catch (IOException ioe) {
-      throw new HoodieIOException(ioe.getMessage(), ioe);
+      if (config.isAutoArchive()) {
+        // We cannot have unbounded commit files. Archive commits if we have to archive
+        archive(table);
+      }
     } finally {
       this.heartbeatClient.stop(instantTime);
     }
@@ -140,6 +140,7 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
if (canTriggerTableService) {
compactIfNecessary(writeClient, instantTime);
doClean(writeClient, instantTime);
writeClient.archive();
}
}

@@ -155,6 +155,7 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
if (canTriggerTableService) {
compactIfNecessary(writeClient, instantTime);
doClean(writeClient, instantTime);
writeClient.archive();
}
}

@@ -107,6 +107,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
@@ -250,6 +251,43 @@ public void testTableOperations(HoodieTableType tableType, boolean enableFullSca
validateMetadata(testTable, emptyList(), true);
}

@Test
public void testMetadataTableArchival() throws Exception {
init(COPY_ON_WRITE, false);
writeConfig = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(true)
.enableFullScan(true)
.enableMetrics(false)
.withMaxNumDeltaCommitsBeforeCompaction(3)
.archiveCommitsWith(3, 4)
.retainCommits(1)
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build()).build();
initWriteConfigAndMetatableWriter(writeConfig, true);

AtomicInteger commitTime = new AtomicInteger(1);
// trigger 2 regular writes (in addition to the 1 bootstrap commit), one commit short of triggering archival.
for (int i = 1; i <= 2; i++) {
doWriteOperation(testTable, "000000" + commitTime.getAndIncrement(), INSERT);
}
// expected num of commits on the metadata timeline = 1 (bootstrap) + 2 (writes) + 1 (compaction) = 4.
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(4, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());

// trigger an async table service; archival should not kick in, even though the archival thresholds are met.
doCluster(testTable, "000000" + commitTime.getAndIncrement());
metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(5, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());

// trigger a regular write operation; archival should kick in and trim the active timeline down to the configured minimum of 3 instants.
doWriteOperation(testTable, "000000" + commitTime.getAndIncrement(), INSERT);
metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(3, metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants());
}

@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testMetadataInsertUpsertClean(HoodieTableType tableType) throws Exception {