@@ -64,6 +64,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -76,12 +77,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;

/**
* Archiver to bound the growth of files under .hoodie meta path.
@@ -409,9 +412,11 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
.filterInflights().firstInstant();

// We cannot have any holes in the commit timeline. We cannot archive any commits which are
// made after the first savepoint present.
// NOTE: We cannot have any holes in the commit timeline.
// We cannot archive any commits which are made after the first savepoint present,
// unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
Set<String> savepointTimestamps = table.getSavepointTimestamps();
if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
// For Merge-On-Read table, inline or async compaction is enabled
// We need to make sure that there are enough delta commits in the active timeline
@@ -428,28 +433,33 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
// Actually do the commits
Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
.filter(s -> {
// if no savepoint present, then don't filter
return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
if (config.shouldArchiveBeyondSavepoint()) {
// skip savepoint commits and proceed further
return !savepointTimestamps.contains(s.getTimestamp());
} else {
// if no savepoint present, then don't filter
// stop at first savepoint commit
return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
}
}).filter(s -> {
// Ensure commits >= oldest pending compaction commit are retained
return oldestPendingCompactionAndReplaceInstant
.map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
}).filter(s -> {
// We need this to ensure that when multiple writers are performing conflict resolution, eligible instants don't
// get archived, i.e., instants after the oldestInflight are retained on the timeline
if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) {
return oldestInflightCommitInstant.map(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
}
return true;
}).filter(s ->
oldestInstantToRetainForCompaction.map(instantToRetain ->
HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
.orElse(true)
);

return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
} else {
return Stream.empty();
@@ -479,26 +489,37 @@ private Stream<HoodieInstant> getInstantsToArchive() {
instants = Stream.empty();
} else {
LOG.info("Limiting archiving of instants to latest compaction on metadata table at " + latestCompactionTime.get());
instants = instants.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
instants = instants.filter(instant -> compareTimestamps(instant.getTimestamp(), LESSER_THAN,
latestCompactionTime.get()));
}
} catch (Exception e) {
throw new HoodieException("Error limiting instant archival based on metadata table", e);
}
}

// If this is a metadata table, do not archive the commits that live in data set
// active timeline. This is required by metadata table,
// see HoodieTableMetadataUtil#processRollbackMetadata for details.
if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
.setConf(metaClient.getHadoopConf())
.build();
Option<String> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp);
if (earliestActiveDatasetCommit.isPresent()) {
instants = instants.filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get()));
Option<HoodieInstant> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant();

if (config.shouldArchiveBeyondSavepoint()) {
// There are chances that there could be holes in the timeline due to archival and savepoint interplay.
// So, the first non-savepoint commit in the data timeline is considered as beginning of the active timeline.
Option<HoodieInstant> firstNonSavepointCommit = dataMetaClient.getActiveTimeline().getFirstNonSavepointCommit();
if (firstNonSavepointCommit.isPresent()) {
String firstNonSavepointCommitTime = firstNonSavepointCommit.get().getTimestamp();
instants = instants.filter(instant ->
compareTimestamps(instant.getTimestamp(), LESSER_THAN, firstNonSavepointCommitTime));
}
} else {
// Do not archive the commits that live in data set active timeline.
// This is required by metadata table, see HoodieTableMetadataUtil#processRollbackMetadata for details.
if (earliestActiveDatasetCommit.isPresent()) {
instants = instants.filter(instant ->
compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get().getTimestamp()));
}
}
}

@@ -589,7 +610,7 @@ private boolean deleteAllInstantsOlderOrEqualsInAuxMetaFolder(HoodieInstant thre
}

List<HoodieInstant> instantsToBeDeleted =
instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps(instant1.getTimestamp(),
instants.stream().filter(instant1 -> compareTimestamps(instant1.getTimestamp(),
LESSER_THAN_OR_EQUALS, thresholdInstant.getTimestamp())).collect(Collectors.toList());

for (HoodieInstant deleteInstant : instantsToBeDeleted) {
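To make the archiver change above concrete, here is a minimal, self-contained sketch of the two filtering modes (illustrative class and method names, not the actual HoodieTimelineArchiver code): when archival beyond savepoints is enabled, only the savepointed instants themselves are skipped; otherwise archival still stops at the first savepoint.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class SavepointFilterSketch {

  // Decide which commit timestamps are eligible for archival, mirroring the two branches
  // of the new filter: skip savepointed commits vs. stop at the first savepoint.
  static List<String> archivableInstants(List<String> commits,
                                         Set<String> savepointTimestamps,
                                         boolean archiveBeyondSavepoint) {
    Optional<String> firstSavepoint = savepointTimestamps.stream().sorted().findFirst();
    return commits.stream()
        .filter(ts -> archiveBeyondSavepoint
            // skip savepointed commits but keep archiving commits made after them
            ? !savepointTimestamps.contains(ts)
            // retain the first savepoint and everything at or after it
            : firstSavepoint.map(sp -> ts.compareTo(sp) < 0).orElse(true))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> commits = List.of("001", "002", "003", "004");
    Set<String> savepoints = Set.of("002");
    System.out.println(archivableInstants(commits, savepoints, true));  // [001, 003, 004]
    System.out.println(archivableInstants(commits, savepoints, false)); // [001]
  }
}
```

The real archiver additionally caps the stream at commitTimeline.countInstants() - minInstantsToKeep and applies the pending-compaction and inflight-commit guards shown in the diff.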
@@ -24,6 +24,7 @@
import org.apache.hudi.common.config.HoodieConfig;

import javax.annotation.concurrent.Immutable;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
@@ -34,8 +35,8 @@
*/
@Immutable
@ConfigClassProperty(name = "Archival Configs",
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "Configurations that control archival.")
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "Configurations that control archival.")
public class HoodieArchivalConfig extends HoodieConfig {

public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
@@ -92,6 +93,13 @@ public class HoodieArchivalConfig extends HoodieConfig {
.withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. It's"
+ " useful when storage scheme doesn't support append operation.");

public static final ConfigProperty<Boolean> ARCHIVE_BEYOND_SAVEPOINT = ConfigProperty
.key("hoodie.archive.beyond.savepoint")
.defaultValue(false)
.sinceVersion("0.12.0")
.withDocumentation("If enabled, archival will proceed beyond savepoint, skipping savepoint commits. "
+ "If disabled, archival will stop at the earliest savepoint commit.");

/**
* @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
*/
@@ -107,7 +115,9 @@ public class HoodieArchivalConfig extends HoodieConfig {
*/
@Deprecated
public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = COMMITS_ARCHIVAL_BATCH_SIZE.key();
/** @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */
/**
* @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
*/
@Deprecated
private static final String DEFAULT_MAX_COMMITS_TO_KEEP = MAX_COMMITS_TO_KEEP.defaultValue();
/**
@@ -186,6 +196,11 @@ public HoodieArchivalConfig.Builder withCommitsArchivalBatchSize(int batchSize)
return this;
}

public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) {
archivalConfig.setValue(ARCHIVE_BEYOND_SAVEPOINT, String.valueOf(archiveBeyondSavepoint));
return this;
}

public HoodieArchivalConfig build() {
archivalConfig.setDefaults(HoodieArchivalConfig.class.getName());
return archivalConfig;
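A hedged usage sketch of the new builder method follows; the base path and the retained commit counts are illustrative, and the surrounding builder calls (withPath, withArchivalConfig, archiveCommitsWith) are assumed from the existing Hudi write-config API rather than defined in this change.

```java
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class ArchiveBeyondSavepointConfigExample {
  public static void main(String[] args) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/example_table")              // hypothetical base path
        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
            .archiveCommitsWith(20, 30)                   // min/max commits to keep (illustrative)
            .withArchiveBeyondSavepoint(true)             // sets hoodie.archive.beyond.savepoint
            .build())
        .build();

    // The getter added in this change reads the flag back with its default applied.
    System.out.println(writeConfig.shouldArchiveBeyondSavepoint()); // true
  }
}
```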
@@ -1209,7 +1209,11 @@ public boolean isAutoClean() {
}

public boolean getArchiveMergeEnable() {
return getBoolean(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE);
return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE);
}

public boolean shouldArchiveBeyondSavepoint() {
return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT);
}

public long getArchiveMergeSmallFileLimitBytes() {
@@ -368,10 +368,10 @@ public HoodieTimeline getCompletedSavepointTimeline() {
}

/**
* Get the list of savepoints in this table.
* Get the list of savepoint timestamps in this table.
*/
public List<String> getSavepoints() {
return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
public Set<String> getSavepointTimestamps() {
return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
}

public HoodieActiveTimeline getActiveTimeline() {
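The rename from getSavepoints() to getSavepointTimestamps() also switches the return type from List to Set, so the per-instant membership test in the archiver becomes a cheap lookup. A minimal standalone sketch (illustrative names, not the Hudi classes):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SavepointLookupSketch {
  public static void main(String[] args) {
    // Savepoint timestamps collected once from the completed savepoint timeline.
    Set<String> savepointTimestamps = new HashSet<>(List.of("20220101010101", "20220202020202"));

    // Each candidate instant is tested with Set#contains instead of scanning a List.
    String candidateInstant = "20220101010101";
    boolean skipFromArchival = savepointTimestamps.contains(candidateInstant);
    System.out.println("skip archiving " + candidateInstant + "? " + skipFromArchival); // true
  }
}
```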
@@ -104,7 +104,7 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
* Get the list of data file names savepointed.
*/
public Stream<String> getSavepointedDataFiles(String savepointTime) {
if (!hoodieTable.getSavepoints().contains(savepointTime)) {
if (!hoodieTable.getSavepointTimestamps().contains(savepointTime)) {
throw new HoodieSavepointException(
"Could not get data files for savepoint " + savepointTime + ". No such savepoint.");
}
@@ -227,7 +227,7 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
+ " file versions. ");
List<CleanFileInfo> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
.flatMap(this::getSavepointedDataFiles)
.collect(Collectors.toList());

@@ -295,7 +295,7 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
List<CleanFileInfo> deletePaths = new ArrayList<>();

// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
.flatMap(this::getSavepointedDataFiles)
.collect(Collectors.toList());
