@@ -583,6 +583,13 @@ public void runAnyPendingCompactions() {
tableServiceClient.runAnyPendingCompactions(createTable(config, hadoopConf));
}

/**
* Run any pending log compactions.
*/
public void runAnyPendingLogCompactions() {
tableServiceClient.runAnyPendingLogCompactions(createTable(config, hadoopConf));
}

/**
* Create a savepoint based on the latest commit action on the timeline.
*
@@ -61,6 +61,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ "but users are expected to trigger async job for execution. If `hoodie.compact.inline` is set to true, regular writers will do both scheduling and "
+ "execution inline for compaction");

public static final ConfigProperty<String> ENABLE_LOG_COMPACTION = ConfigProperty
.key("hoodie.log.compaction.enable")
.defaultValue("false")
.sinceVersion("0.14")
.withDocumentation("By enabling log compaction through this config, log compaction will also get enabled for the metadata table.");
@danny0405 (Contributor) commented on Jun 9, 2023:
It seems only the MDT uses log compaction. If that is true, I think this belongs in an MDT config.

The PR author (Contributor) replied:
Log compaction can also be done on a main table, if it is of MOR table type.

The PR author (Contributor) added:
I think enabling log compaction is still required, so I created another config in the HoodieMetadataConfig class.


public static final ConfigProperty<String> INLINE_LOG_COMPACT = ConfigProperty
.key("hoodie.log.compaction.inline")
.defaultValue("false")
@@ -432,8 +438,13 @@ public Builder withCompactionLogFileNumThreshold(int logFileNumThreshold) {
return this;
}

public Builder withLogCompactionBlocksThreshold(String logCompactionBlocksThreshold) {
compactionConfig.setValue(LOG_COMPACTION_BLOCKS_THRESHOLD, logCompactionBlocksThreshold);
public Builder withLogCompactionEnabled(boolean enableLogCompaction) {
compactionConfig.setValue(ENABLE_LOG_COMPACTION, Boolean.toString(enableLogCompaction));
return this;
}

public Builder withLogCompactionBlocksThreshold(int logCompactionBlocksThreshold) {
compactionConfig.setValue(LOG_COMPACTION_BLOCKS_THRESHOLD, String.valueOf(logCompactionBlocksThreshold));
return this;
}
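For illustration, a writer enabling log compaction on a MOR table could wire the two new builder methods into its write config like this (a minimal sketch; basePath and the threshold value are placeholders, and the blocks-threshold key name is an assumption — only hoodie.log.compaction.enable is shown in this patch):

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withLogCompactionEnabled(true)        // sets hoodie.log.compaction.enable
        .withLogCompactionBlocksThreshold(5)   // sets the blocks-threshold key (name assumed)
        .build())
    .build();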

@@ -1414,6 +1414,10 @@ public boolean populateMetaFields() {
* compaction properties.
*/

public boolean isLogCompactionEnabled() {
return getBoolean(HoodieCompactionConfig.ENABLE_LOG_COMPACTION);
}

public int getLogCompactionBlocksThreshold() {
return getInt(HoodieCompactionConfig.LOG_COMPACTION_BLOCKS_THRESHOLD);
}
@@ -2335,6 +2339,14 @@ public boolean isMetadataAsyncIndex() {
return getBooleanOrDefault(HoodieMetadataConfig.ASYNC_INDEX_ENABLE);
}

public int getMetadataLogCompactBlocksThreshold() {
return getInt(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD);
}

public boolean isLogCompactionEnabledOnMetadata() {
return getBoolean(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE);
}
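These getters surface the new HoodieMetadataConfig properties to the write path. A rough sketch of turning the feature on from the data table's write config (the metadata-table key name below is an assumption derived from the ENABLE_LOG_COMPACTION_ON_METADATA_TABLE constant; it is not shown in this hunk):

import java.util.Properties;

Properties props = new Properties();
props.setProperty("hoodie.metadata.enable", "true");
props.setProperty("hoodie.metadata.log.compaction.enable", "true"); // key name assumed; verify in HoodieMetadataConfig
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withProperties(props)
    .build();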

/**
* Hoodie Client Lock Configs.
*
@@ -26,6 +26,8 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Map;
@@ -34,6 +36,7 @@
* Factory class for hoodie merge handle.
*/
public class HoodieMergeHandleFactory {
private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHandleFactory.class);
/**
* Creates a merge handle for normal write path.
*/
@@ -47,6 +50,7 @@ public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
String fileId,
TaskContextSupplier taskContextSupplier,
Option<BaseKeyGenerator> keyGeneratorOpt) {
LOG.info("Create update handle for fileId {} and partition path {} at commit {}", fileId, partitionPath, instantTime);
if (table.requireSortedRecords()) {
A reviewer (Contributor) commented:
Can we just remove this log? It's verbose.

if (table.getMetaClient().getTableConfig().isCDCEnabled()) {
return new HoodieSortedMergeHandleWithChangeLog<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier,
@@ -79,6 +83,7 @@ public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
HoodieBaseFile dataFileToBeMerged,
TaskContextSupplier taskContextSupplier,
Option<BaseKeyGenerator> keyGeneratorOpt) {
LOG.info("Get updateHandle for fileId {} and partitionPath {} at commit {}", fileId, partitionPath, instantTime);
if (table.requireSortedRecords()) {
A reviewer (Contributor) commented:
Can we just remove this log? It's verbose.

A Member replied:
Sure, will change it to debug mode in a subsequent patch.

return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
@@ -987,7 +987,6 @@ public void performTableServices(Option<String> inFlightInstantTimestamp) {
if (validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, latestDeltacommitTime)) {
compactIfNecessary(writeClient, latestDeltacommitTime);
}

writeClient.archive();
LOG.info("All the table services operations on MDT completed successfully");
} catch (Exception e) {
@@ -1008,6 +1007,7 @@
private void runPendingTableServicesOperations(BaseHoodieWriteClient writeClient) {
// finish off any pending log compaction or compactions operations if any from previous attempt.
writeClient.runAnyPendingCompactions();
writeClient.runAnyPendingLogCompactions();
}

/**
@@ -1025,13 +1025,27 @@ protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String late
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
final String compactionInstantTime = HoodieTableMetadataUtil.createCompactionTimestamp(latestDeltacommitTime);

// We need to avoid re-attempting compaction with the same instant time.
// Say we trigger compaction after C5 in the MDT, and compaction completes with instant C4001, but C5 crashed before completing in the MDT.
// When we re-attempt with C6, the latest delta commit in the MDT is C4, so we would again try compaction with instant C4001.
// So, skip compaction if a compaction with the same instant time is already present on the timeline.
if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)
&& writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) {
LOG.info(String.format("Compaction with same %s time is already present in the timeline.", compactionInstantTime));
} else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
LOG.info("Compaction is scheduled for timestamp " + compactionInstantTime);
writeClient.compact(compactionInstantTime);
} else if (metadataWriteConfig.isLogCompactionEnabled()) {
// Schedule and execute log compaction with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
final String logCompactionInstantTime = HoodieTableMetadataUtil.createLogCompactionTimestamp(latestDeltacommitTime);
if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime)) {
LOG.info(String.format("Log compaction with same %s time is already present in the timeline.", logCompactionInstantTime));
} else if (writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, Option.empty())) {
LOG.info("Log compaction is scheduled for timestamp " + logCompactionInstantTime);
writeClient.logCompact(logCompactionInstantTime);
}
}
}
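To make the suffixing scheme from the comments above concrete (a sketch; the literal timestamp is an example, and the exact suffix values live in HoodieTableMetadataUtil, which is not shown in this diff):

String latestDeltaCommitTime = "20230609153000000"; // example value
String compactionInstantTime = HoodieTableMetadataUtil.createCompactionTimestamp(latestDeltaCommitTime);
String logCompactionInstantTime = HoodieTableMetadataUtil.createLogCompactionTimestamp(latestDeltaCommitTime);
// Per the C4 -> C4001 example above, each helper appends a fixed suffix to the delta commit time;
// the two helpers are expected to use distinct suffixes (an assumption) so that compaction and log
// compaction scheduled off the same delta commit never collide on an instant time.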

@@ -1063,7 +1077,7 @@ protected boolean validateTimelineBeforeSchedulingCompaction(Option<String> inFl
// we need to find if there are any inflights in data table timeline before or equal to the latest delta commit in metadata table.
// Whenever you want to change this logic, please ensure all below scenarios are considered.
// a. There could be a chance that latest delta commit in MDT is committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed
// b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, latest compaction instant time in MDT represents
// b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, the latest compaction instant time in MDT represents
// any instants before that is already synced with metadata table.
// c. Do consider out of order commits. For eg, c4 from DT could complete before c3. and we can't trigger compaction in MDT with c4 as base instant time, until every
// instant before c4 is synced with metadata table.
@@ -1078,6 +1092,17 @@ protected boolean validateTimelineBeforeSchedulingCompaction(Option<String> inFl
return false;
}

// Check if there are any pending compaction or log compaction instants in the timeline.
// If any are found, abort scheduling new compaction or log compaction operations.
Option<HoodieInstant> pendingLogCompactionInstant =
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
Option<HoodieInstant> pendingCompactionInstant =
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) {
LOG.warn(String.format("Not scheduling compaction or logcompaction, since a pending compaction instant %s or logcompaction %s instant is present",
pendingCompactionInstant, pendingLogCompactionInstant));
return false;
}
return true;
}

@@ -111,6 +111,10 @@ public static HoodieWriteConfig createMetadataWriteConfig(
// deltacommits having corresponding completed commits. Therefore, we need to compact all fileslices of all
// partitions together requiring UnBoundedCompactionStrategy.
.withCompactionStrategy(new UnBoundedCompactionStrategy())
// Check if log compaction is enabled; this is needed for tables with a lot of records.
.withLogCompactionEnabled(writeConfig.isLogCompactionEnabledOnMetadata())
// The config below is only used when log compaction is enabled.
.withLogCompactionBlocksThreshold(writeConfig.getMetadataLogCompactBlocksThreshold())
.build())
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
@@ -24,16 +24,21 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.table.HoodieTable;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;

/**
* Base class helps to perform compact.
@@ -96,4 +101,20 @@ public void completeInflightLogCompaction(HoodieTable table, String logCompactio
"Failed to commit " + table.getMetaClient().getBasePath() + " at time " + logCompactionCommitTime, e);
}
}

public Option<InstantRange> getInstantRange(HoodieTableMetaClient metaClient) {
return HoodieTableMetadata.isMetadataTable(metaClient.getBasePathV2().toString())
? Option.of(getMetadataLogReaderInstantRange(metaClient)) : Option.empty();
}

private InstantRange getMetadataLogReaderInstantRange(HoodieTableMetaClient metadataMetaClient) {
HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
.setConf(metadataMetaClient.getHadoopConf())
.setBasePath(HoodieTableMetadata.getDatasetBasePath(metadataMetaClient.getBasePathV2().toString()))
.build();
Set<String> validInstants = HoodieTableMetadataUtil.getValidInstantTimestamps(dataMetaClient, metadataMetaClient);
return InstantRange.builder()
.rangeType(InstantRange.RangeType.EXPLICIT_MATCH)
.explicitInstants(validInstants).build();
}
}
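Moving this lookup into CompactHelpers lets the compaction runner and the compaction plan generator (both updated later in this patch) share one implementation: for a metadata table the compactor replays only log blocks whose instant time is in the valid set, mirroring the MDT snapshot reader, while for a regular table the range is empty and behavior is unchanged. A minimal usage sketch:

// Returns an EXPLICIT_MATCH range over valid instants for an MDT metaClient, Option.empty() otherwise.
Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);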
@@ -42,8 +42,6 @@
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
@@ -60,7 +58,6 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.StreamSupport;

import static java.util.stream.Collectors.toList;
@@ -131,8 +128,7 @@ public HoodieData<WriteStatus> compact(
context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices: " + config.getTableName());
TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
// if this is a MDT, set up the instant range of log reader just like regular MDT snapshot reader.
Option<InstantRange> instantRange = HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
? Option.of(getMetadataLogReaderInstantRange(metaClient)) : Option.empty();
Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);
return context.parallelize(operations).map(operation -> compact(
compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, instantRange, taskContextSupplier, executionHelper))
.flatMap(List::iterator);
@@ -197,6 +193,7 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
.withLogFilePaths(logFiles)
.withReaderSchema(readerSchema)
.withLatestInstantTime(executionHelper.instantTimeToUseForScanning(instantTime, maxInstantTime))
.withInstantRange(instantRange)
.withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
.withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
@@ -256,17 +253,6 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
}).collect(toList());
}

private InstantRange getMetadataLogReaderInstantRange(HoodieTableMetaClient metadataMetaClient) {
HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
.setConf(metadataMetaClient.getHadoopConf())
.setBasePath(HoodieTableMetadata.getDatasetBasePath(metadataMetaClient.getBasePath()))
.build();
Set<String> validInstants = HoodieBackedTableMetadata.getValidInstantTimestamps(dataMetaClient, metadataMetaClient);
return InstantRange.builder()
.rangeType(InstantRange.RangeType.EXPLICIT_MATCH)
.explicitInstants(validInstants).build();
}

public String getMaxInstantTime(HoodieTableMetaClient metaClient) {
String maxInstantTime = metaClient
.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
@@ -108,6 +108,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, schemaPair.getLeft().get());
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaPair.getRight().get());
}
// Setting operationType, which is compact.
metadata.setOperationType(operationType);
compactionMetadata.setWriteStatuses(statuses);
compactionMetadata.setCommitted(false);
@@ -30,6 +30,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
@@ -40,6 +41,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

import org.apache.hudi.table.action.compact.CompactHelpers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -82,7 +84,6 @@ public HoodieCompactionPlan generateCompactionPlan() throws IOException {

// filter the partition paths if needed to reduce list status
partitionPaths = filterPartitionPathsByStrategy(writeConfig, partitionPaths);

if (partitionPaths.isEmpty()) {
// In case no partitions could be picked, return no compaction plan
return null;
@@ -91,13 +92,15 @@ public HoodieCompactionPlan generateCompactionPlan() throws IOException {
engineContext.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact: " + writeConfig.getTableName());

SyncableFileSystemView fileSystemView = (SyncableFileSystemView) this.hoodieTable.getSliceView();
// Exclude file groups under compaction.
Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
.map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
.collect(Collectors.toSet());

// Exclude files in pending clustering from compaction.
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));

// Exclude files in pending log compaction.
if (filterLogCompactionOperations()) {
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations()
.map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
@@ -108,10 +111,12 @@
.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
LOG.info("Last completed instant time " + lastCompletedInstantTime);
Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);

List<HoodieCompactionOperation> operations = engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
.getLatestFileSlices(partitionPath)
.filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering))
.filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering, instantRange))
.map(s -> {
List<HoodieLogFile> logFiles =
s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList());
@@ -158,7 +163,8 @@ protected List<String> filterPartitionPathsByStrategy(HoodieWriteConfig writeCon
return partitionPaths;
}

protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime, Set<HoodieFileGroupId> pendingFileGroupIds) {
protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime,
Set<HoodieFileGroupId> pendingFileGroupIds, Option<InstantRange> instantRange) {
return fileSlice.getLogFiles().count() > 0 && !pendingFileGroupIds.contains(fileSlice.getFileGroupId());
}
