@@ -119,6 +119,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + " keep the metadata overhead constant, even as the table size grows. "
           + "This config controls the maximum number of instants to retain in the active timeline.");
 
+  public static final ConfigProperty<Integer> DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.archive.delete.parallelism")
+      .defaultValue(100)
+      .withDocumentation("Parallelism for deleting archived hoodie commits.");
+
   public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP = ConfigProperty
       .key("hoodie.keep.min.commits")
       .defaultValue("20")
@@ -568,6 +573,11 @@ public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBeforeCompaction) {
       return this;
     }
 
+    public Builder withArchiveDeleteParallelism(int archiveDeleteParallelism) {
+      compactionConfig.setValue(DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE, String.valueOf(archiveDeleteParallelism));
+      return this;
+    }
+
     public Builder withMaxDeltaSecondsBeforeCompaction(int maxDeltaSecondsBeforeCompaction) {
       compactionConfig.setValue(INLINE_COMPACT_TIME_DELTA_SECONDS, String.valueOf(maxDeltaSecondsBeforeCompaction));
       return this;
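
For reference, a minimal sketch of wiring the new setting through the write config. The surrounding builder calls (withPath, forTable) and all values are illustrative, not part of this diff:

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample_table")              // hypothetical base path
        .forTable("sample_table")                          // hypothetical table name
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withArchiveDeleteParallelism(200)             // overrides the default of 100
            .build())
        .build();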
@@ -1139,6 +1139,10 @@ public Boolean getCompactionReverseLogReadEnabled() {
     return getBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE);
   }
 
+  public int getArchiveDeleteParallelism() {
+    return getInt(HoodieCompactionConfig.DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE);
+  }
+
   public boolean inlineClusteringEnabled() {
     return getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING);
   }
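
Because hoodie.archive.delete.parallelism is an ordinary write config, it should also be settable as a per-write option from engine front ends. A hedged Spark datasource sketch, where df and the target path are placeholders:

    df.write().format("hudi")
        .option("hoodie.table.name", "sample_table")
        .option("hoodie.archive.delete.parallelism", "200")  // the key added in this PR
        .mode("append")
        .save("/tmp/hoodie/sample_table");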
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieArchivedLogFile;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -127,7 +129,7 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
       LOG.info("Archiving instants " + instantsToArchive);
       archive(context, instantsToArchive);
       LOG.info("Deleting archived instants " + instantsToArchive);
-      success = deleteArchivedInstants(instantsToArchive);
+      success = deleteArchivedInstants(instantsToArchive, context);
     } else {
       LOG.info("No Instants to archive");
     }
@@ -224,19 +226,34 @@ private Stream<HoodieInstant> getInstantsToArchive() {
             HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());
   }
 
-  private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) throws IOException {
+  private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
     LOG.info("Deleting instants " + archivedInstants);
     boolean success = true;
-    for (HoodieInstant archivedInstant : archivedInstants) {
-      Path commitFile = new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
-      try {
-        if (metaClient.getFs().exists(commitFile)) {
-          success &= metaClient.getFs().delete(commitFile, false);
-          LOG.info("Archived and deleted instant file " + commitFile);
-        }
-      } catch (IOException e) {
-        throw new HoodieIOException("Failed to delete archived instant " + archivedInstant, e);
-      }
+    List<String> instantFiles = archivedInstants.stream().map(archivedInstant -> {
+      return new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
+    }).map(Path::toString).collect(Collectors.toList());
+
+    context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
+    Map<String, Boolean> resultDeleteInstantFiles = FSUtils.parallelizeFilesProcess(context,
+        metaClient.getFs(),
+        config.getArchiveDeleteParallelism(),
+        pairOfSubPathAndConf -> {
+          Path commitFile = new Path(pairOfSubPathAndConf.getKey());
+          try {
+            FileSystem fs = commitFile.getFileSystem(pairOfSubPathAndConf.getValue().get());
+            if (fs.exists(commitFile)) {
+              return fs.delete(commitFile, false);
+            }
+            return true;
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to delete archived instant " + commitFile, e);
+          }
+        },
+        instantFiles);
+
+    for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
+      LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
+      success &= result.getValue();
     }
 
     // Remove older meta-data from auxiliary path too
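The rewritten deleteArchivedInstants above fans the per-file deletes out through the engine context instead of looping serially on the driver. Below is a self-contained sketch of the same pattern against the new helper, run on the single-JVM engine context; it assumes hudi-common's HoodieLocalEngineContext, and the listed paths are placeholders:

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.engine.HoodieLocalEngineContext;
    import org.apache.hudi.common.fs.FSUtils;
    import org.apache.hudi.exception.HoodieIOException;

    public class ParallelDeleteSketch {
      public static void main(String[] args) throws IOException {
        Configuration hadoopConf = new Configuration();
        FileSystem fs = FileSystem.get(hadoopConf);
        Map<String, Boolean> results = FSUtils.parallelizeFilesProcess(
            new HoodieLocalEngineContext(hadoopConf), fs, /* parallelism */ 10,
            pair -> {
              Path file = new Path(pair.getKey());
              try {
                // Each worker re-resolves the FileSystem from the serialized configuration.
                FileSystem executorFs = file.getFileSystem(pair.getValue().get());
                // Treat already-missing files as successfully deleted.
                return !executorFs.exists(file) || executorFs.delete(file, false);
              } catch (IOException e) {
                throw new HoodieIOException("Failed to delete " + file, e);
              }
            },
            Arrays.asList("/tmp/t1/.hoodie/001.commit", "/tmp/t1/.hoodie/002.commit"));
        results.forEach((path, ok) -> System.out.println(path + " deleted: " + ok));
      }
    }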
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java (18 additions, 7 deletions)
@@ -658,19 +658,30 @@ public static <T> Map<String, T> parallelizeSubPathProcess(
           .filter(subPathPredicate)
           .map(fileStatus -> fileStatus.getPath().toString())
           .collect(Collectors.toList());
-      if (subPaths.size() > 0) {
-        SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
-        int actualParallelism = Math.min(subPaths.size(), parallelism);
-        result = hoodieEngineContext.mapToPair(subPaths,
-            subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))),
-            actualParallelism);
-      }
+      result = parallelizeFilesProcess(hoodieEngineContext, fs, parallelism, pairFunction, subPaths);
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
     }
     return result;
   }
 
+  public static <T> Map<String, T> parallelizeFilesProcess(
+      HoodieEngineContext hoodieEngineContext,
+      FileSystem fs,
+      int parallelism,
+      SerializableFunction<Pair<String, SerializableConfiguration>, T> pairFunction,
+      List<String> subPaths) {
+    Map<String, T> result = new HashMap<>();
+    if (subPaths.size() > 0) {
+      SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
+      int actualParallelism = Math.min(subPaths.size(), parallelism);
+      result = hoodieEngineContext.mapToPair(subPaths,
+          subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))),
+          actualParallelism);
+    }
+    return result;
+  }
+
   /**
    * Deletes a sub-path.
    *
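Finally, a hedged usage sketch for the pre-existing wrapper, which after this change only lists the directory and delegates to the new helper. It reuses hadoopConf and fs from the previous sketch and assumes the parameters not visible in this hunk (a directory Path and a FileStatus predicate, between fs and parallelism); here it computes per-file sizes under a metadata folder:

    Map<String, Long> fileSizes = FSUtils.parallelizeSubPathProcess(
        new HoodieLocalEngineContext(hadoopConf), fs, new Path("/tmp/t1/.hoodie"), 10,
        fileStatus -> fileStatus.isFile(),   // only process plain files
        pair -> {
          Path subPath = new Path(pair.getKey());
          try {
            return subPath.getFileSystem(pair.getValue().get()).getFileStatus(subPath).getLen();
          } catch (IOException e) {
            throw new HoodieIOException("Failed to stat " + subPath, e);
          }
        });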