diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index ab45d200b3734..3807ee2191b86 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -583,6 +583,13 @@ public void runAnyPendingCompactions() { tableServiceClient.runAnyPendingCompactions(createTable(config, hadoopConf)); } + /** + * Run any pending log compactions. + */ + public void runAnyPendingLogCompactions() { + tableServiceClient.runAnyPendingLogCompactions(createTable(config, hadoopConf)); + } + /** * Create a savepoint based on the latest commit action on the timeline. * diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index c8814406ce5a8..d615be0926d75 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -61,6 +61,12 @@ public class HoodieCompactionConfig extends HoodieConfig { + "but users are expected to trigger async job for execution. If `hoodie.compact.inline` is set to true, regular writers will do both scheduling and " + "execution inline for compaction"); + public static final ConfigProperty ENABLE_LOG_COMPACTION = ConfigProperty + .key("hoodie.log.compaction.enable") + .defaultValue("false") + .sinceVersion("0.14") + .withDocumentation("By enabling log compaction through this config, log compaction will also get enabled for the metadata table."); + public static final ConfigProperty INLINE_LOG_COMPACT = ConfigProperty .key("hoodie.log.compaction.inline") .defaultValue("false") @@ -432,8 +438,13 @@ public Builder withCompactionLogFileNumThreshold(int logFileNumThreshold) { return this; } - public Builder withLogCompactionBlocksThreshold(String logCompactionBlocksThreshold) { - compactionConfig.setValue(LOG_COMPACTION_BLOCKS_THRESHOLD, logCompactionBlocksThreshold); + public Builder withLogCompactionEnabled(boolean enableLogCompaction) { + compactionConfig.setValue(ENABLE_LOG_COMPACTION, Boolean.toString(enableLogCompaction)); + return this; + } + + public Builder withLogCompactionBlocksThreshold(int logCompactionBlocksThreshold) { + compactionConfig.setValue(LOG_COMPACTION_BLOCKS_THRESHOLD, String.valueOf(logCompactionBlocksThreshold)); return this; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9a7ee2fbaa737..fd19607f1fdca 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1414,6 +1414,10 @@ public boolean populateMetaFields() { * compaction properties. 
*/ + public boolean isLogCompactionEnabled() { + return getBoolean(HoodieCompactionConfig.ENABLE_LOG_COMPACTION); + } + public int getLogCompactionBlocksThreshold() { return getInt(HoodieCompactionConfig.LOG_COMPACTION_BLOCKS_THRESHOLD); } @@ -2335,6 +2339,14 @@ public boolean isMetadataAsyncIndex() { return getBooleanOrDefault(HoodieMetadataConfig.ASYNC_INDEX_ENABLE); } + public int getMetadataLogCompactBlocksThreshold() { + return getInt(HoodieMetadataConfig.LOG_COMPACT_BLOCKS_THRESHOLD); + } + + public boolean isLogCompactionEnabledOnMetadata() { + return getBoolean(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE); + } + /** * Hoodie Client Lock Configs. * diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java index 8aadd637f0e52..5c36eb3e8c1dd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java @@ -26,6 +26,8 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.table.HoodieTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Iterator; import java.util.Map; @@ -34,6 +36,7 @@ * Factory class for hoodie merge handle. */ public class HoodieMergeHandleFactory { + private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHandleFactory.class); /** * Creates a merge handle for normal write path. */ @@ -47,6 +50,7 @@ public static HoodieMergeHandle create( String fileId, TaskContextSupplier taskContextSupplier, Option keyGeneratorOpt) { + LOG.info("Create update handle for fileId {} and partition path {} at commit {}", fileId, partitionPath, instantTime); if (table.requireSortedRecords()) { if (table.getMetaClient().getTableConfig().isCDCEnabled()) { return new HoodieSortedMergeHandleWithChangeLog<>(writeConfig, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, @@ -79,6 +83,7 @@ public static HoodieMergeHandle create( HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier, Option keyGeneratorOpt) { + LOG.info("Get updateHandle for fileId {} and partitionPath {} at commit {}", fileId, partitionPath, instantTime); if (table.requireSortedRecords()) { return new HoodieSortedMergeHandle<>(writeConfig, instantTime, table, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 3614fcd3de1d4..40f5ff4c0a672 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -987,7 +987,6 @@ public void performTableServices(Option inFlightInstantTimestamp) { if (validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, latestDeltacommitTime)) { compactIfNecessary(writeClient, latestDeltacommitTime); } - writeClient.archive(); LOG.info("All the table services operations on MDT completed successfully"); } catch (Exception e) { @@ -1008,6 +1007,7 @@ public void performTableServices(Option 
inFlightInstantTimestamp) { private void runPendingTableServicesOperations(BaseHoodieWriteClient writeClient) { // finish off any pending log compaction or compactions operations if any from previous attempt. writeClient.runAnyPendingCompactions(); + writeClient.runAnyPendingLogCompactions(); } /** @@ -1025,13 +1025,27 @@ protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String late // delta commits synced over will not have an instant time lesser than the last completed instant on the // metadata table. final String compactionInstantTime = HoodieTableMetadataUtil.createCompactionTimestamp(latestDeltacommitTime); + // we need to avoid checking compaction w/ same instant again. // let's say we trigger compaction after C5 in MDT and so compaction completes with C4001. but C5 crashed before completing in MDT. // and again w/ C6, we will re-attempt compaction at which point latest delta commit is C4 in MDT. // and so we try compaction w/ instant C4001. So, we can avoid compaction if we already have compaction w/ same instant time. - if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime) - && writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) { + if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) { + LOG.info(String.format("Compaction with same instant time %s is already present in the timeline.", compactionInstantTime)); + } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) { + LOG.info("Compaction is scheduled for timestamp " + compactionInstantTime); writeClient.compact(compactionInstantTime); + } else if (metadataWriteConfig.isLogCompactionEnabled()) { + // Schedule and execute log compaction with suffixes based on the same instant time. This ensures that any future + // delta commits synced over will not have an instant time lesser than the last completed instant on the + // metadata table. + final String logCompactionInstantTime = HoodieTableMetadataUtil.createLogCompactionTimestamp(latestDeltacommitTime); + if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime)) { + LOG.info(String.format("Log compaction with same instant time %s is already present in the timeline.", logCompactionInstantTime)); + } else if (writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, Option.empty())) { + LOG.info("Log compaction is scheduled for timestamp " + logCompactionInstantTime); + writeClient.logCompact(logCompactionInstantTime); + } } } @@ -1063,7 +1077,7 @@ protected boolean validateTimelineBeforeSchedulingCompaction(Option inFl // we need to find if there are any inflights in data table timeline before or equal to the latest delta commit in metadata table. // Whenever you want to change this logic, please ensure all below scenarios are considered. // a. There could be a chance that latest delta commit in MDT is committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed - // b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, latest compaction instant time in MDT represents + // b. There could be DT inflights after latest delta commit in MDT and we are ok with it, because the contract is, the latest compaction instant time in MDT represents // any instants before that is already synced with metadata table. // c. Do consider out of order commits. 
For eg, c4 from DT could complete before c3. and we can't trigger compaction in MDT with c4 as base instant time, until every // instant before c4 is synced with metadata table. @@ -1078,6 +1092,17 @@ protected boolean validateTimelineBeforeSchedulingCompaction(Option inFl return false; } + // Check if there are any pending compaction or log compaction instants in the timeline. + // If pending compaction/log compaction operations are found, abort scheduling new compaction/log compaction operations. + Option pendingLogCompactionInstant = + metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant(); + Option pendingCompactionInstant = + metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant(); + if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) { + LOG.warn(String.format("Not scheduling compaction or log compaction, since a pending compaction instant %s or log compaction instant %s is present", + pendingCompactionInstant, pendingLogCompactionInstant)); + return false; + } return true; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java index 10d42444049a7..f431283ac7a50 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java @@ -111,6 +111,10 @@ public static HoodieWriteConfig createMetadataWriteConfig( // deltacommits having corresponding completed commits. Therefore, we need to compact all fileslices of all // partitions together requiring UnBoundedCompactionStrategy. .withCompactionStrategy(new UnBoundedCompactionStrategy()) + // Check if log compaction is enabled; this is needed for tables with a lot of records. + .withLogCompactionEnabled(writeConfig.isLogCompactionEnabledOnMetadata()) + // Below config is only used if isLogCompactionEnabled is set. 
+ .withLogCompactionBlocksThreshold(writeConfig.getMetadataLogCompactBlocksThreshold()) .build()) .withParallelism(parallelism, parallelism) .withDeleteParallelism(parallelism) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java index 800e6a4aceac4..c6fa1f4f2b2e9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java @@ -24,16 +24,21 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.InstantRange; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieCompactionException; +import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.table.HoodieTable; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Set; /** * Base class helps to perform compact. @@ -96,4 +101,20 @@ public void completeInflightLogCompaction(HoodieTable table, String logCompactio "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + logCompactionCommitTime, e); } } + + public Option getInstantRange(HoodieTableMetaClient metaClient) { + return HoodieTableMetadata.isMetadataTable(metaClient.getBasePathV2().toString()) + ? 
Option.of(getMetadataLogReaderInstantRange(metaClient)) : Option.empty(); + } + + private InstantRange getMetadataLogReaderInstantRange(HoodieTableMetaClient metadataMetaClient) { + HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder() + .setConf(metadataMetaClient.getHadoopConf()) + .setBasePath(HoodieTableMetadata.getDatasetBasePath(metadataMetaClient.getBasePathV2().toString())) + .build(); + Set validInstants = HoodieTableMetadataUtil.getValidInstantTimestamps(dataMetaClient, metadataMetaClient); + return InstantRange.builder() + .rangeType(InstantRange.RangeType.EXPLICIT_MATCH) + .explicitInstants(validInstants).build(); + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index 0566b39b478ae..906ea6473a4b1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -42,8 +42,6 @@ import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.utils.SerDeHelper; import org.apache.hudi.io.IOUtils; -import org.apache.hudi.metadata.HoodieBackedTableMetadata; -import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.table.HoodieCompactionHandler; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; @@ -60,7 +58,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.Set; import java.util.stream.StreamSupport; import static java.util.stream.Collectors.toList; @@ -131,8 +128,7 @@ public HoodieData compact( context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices: " + config.getTableName()); TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier(); // if this is a MDT, set up the instant range of log reader just like regular MDT snapshot reader. - Option instantRange = HoodieTableMetadata.isMetadataTable(metaClient.getBasePath()) - ? 
Option.of(getMetadataLogReaderInstantRange(metaClient)) : Option.empty(); + Option instantRange = CompactHelpers.getInstance().getInstantRange(metaClient); return context.parallelize(operations).map(operation -> compact( compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, instantRange, taskContextSupplier, executionHelper)) .flatMap(List::iterator); @@ -197,6 +193,7 @@ public List compact(HoodieCompactionHandler compactionHandler, .withLogFilePaths(logFiles) .withReaderSchema(readerSchema) .withLatestInstantTime(executionHelper.instantTimeToUseForScanning(instantTime, maxInstantTime)) + .withInstantRange(instantRange) .withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema())) .withMaxMemorySizeInBytes(maxMemoryPerCompaction) .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()) @@ -256,17 +253,6 @@ public List compact(HoodieCompactionHandler compactionHandler, }).collect(toList()); } - private InstantRange getMetadataLogReaderInstantRange(HoodieTableMetaClient metadataMetaClient) { - HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder() - .setConf(metadataMetaClient.getHadoopConf()) - .setBasePath(HoodieTableMetadata.getDatasetBasePath(metadataMetaClient.getBasePath())) - .build(); - Set validInstants = HoodieBackedTableMetadata.getValidInstantTimestamps(dataMetaClient, metadataMetaClient); - return InstantRange.builder() - .rangeType(InstantRange.RangeType.EXPLICIT_MATCH) - .explicitInstants(validInstants).build(); - } - public String getMaxInstantTime(HoodieTableMetaClient metaClient) { String maxInstantTime = metaClient .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java index 3f86df7e3ada2..5bd1894f26dee 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java @@ -108,6 +108,7 @@ public HoodieWriteMetadata> execute() { metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, schemaPair.getLeft().get()); metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaPair.getRight().get()); } + // Setting operationType, which is compact. 
metadata.setOperationType(operationType); compactionMetadata.setWriteStatuses(statuses); compactionMetadata.setCommitted(false); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java index 55740269a837e..e8d109f7b762e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.InstantRange; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.util.CollectionUtils; @@ -40,6 +41,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.compact.CompactHelpers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +84,6 @@ public HoodieCompactionPlan generateCompactionPlan() throws IOException { // filter the partition paths if needed to reduce list status partitionPaths = filterPartitionPathsByStrategy(writeConfig, partitionPaths); - if (partitionPaths.isEmpty()) { // In case no partitions could be picked, return no compaction plan return null; @@ -91,6 +92,7 @@ public HoodieCompactionPlan generateCompactionPlan() throws IOException { engineContext.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact: " + writeConfig.getTableName()); SyncableFileSystemView fileSystemView = (SyncableFileSystemView) this.hoodieTable.getSliceView(); + // Exclude file groups under compaction. Set fgIdsInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations() .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId()) .collect(Collectors.toSet()); @@ -98,6 +100,7 @@ public HoodieCompactionPlan generateCompactionPlan() throws IOException { // Exclude files in pending clustering from compaction. fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet())); + // Exclude files in pending logcompaction. 
if (filterLogCompactionOperations()) { fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations() .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId()) @@ -108,10 +111,12 @@ public HoodieCompactionPlan generateCompactionPlan() throws IOException { .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) .filterCompletedInstants().lastInstant().get().getTimestamp(); + LOG.info("Last completed instant time " + lastCompletedInstantTime); + Option instantRange = CompactHelpers.getInstance().getInstantRange(metaClient); List operations = engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView .getLatestFileSlices(partitionPath) - .filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering)) + .filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering, instantRange)) .map(s -> { List logFiles = s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList()); @@ -158,7 +163,8 @@ protected List filterPartitionPathsByStrategy(HoodieWriteConfig writeCon return partitionPaths; } - protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime, Set pendingFileGroupIds) { + protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime, + Set pendingFileGroupIds, Option instantRange) { return fileSlice.getLogFiles().count() > 0 && !pendingFileGroupIds.contains(fileSlice.getFileGroupId()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java index 920187e218f51..2b70472658023 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java @@ -28,7 +28,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner; +import org.apache.hudi.common.table.log.InstantRange; import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.compact.LogCompactionExecutionHelper; @@ -63,9 +65,10 @@ protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClien } @Override - protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime, Set pendingFileGroupIds) { - return isFileSliceEligibleForLogCompaction(fileSlice, lastCompletedInstantTime) - && super.filterFileSlice(fileSlice, lastCompletedInstantTime, pendingFileGroupIds); + protected boolean filterFileSlice(FileSlice fileSlice, String lastCompletedInstantTime, + Set pendingFileGroupIds, Option instantRange) { + return isFileSliceEligibleForLogCompaction(fileSlice, lastCompletedInstantTime, instantRange) + && super.filterFileSlice(fileSlice, lastCompletedInstantTime, pendingFileGroupIds, instantRange); } @Override @@ -78,7 +81,8 @@ protected boolean 
filterLogCompactionOperations() { * @param fileSlice File Slice under consideration. * @return Boolean value that determines whether log compaction will be scheduled or not. */ - private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice, String maxInstantTime) { + private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice, String maxInstantTime, + Option instantRange) { LOG.info("Checking if fileId " + fileSlice.getFileId() + " and partition " + fileSlice.getPartitionPath() + " eligible for log compaction."); HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); @@ -90,13 +94,15 @@ private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice, String .map(file -> file.getPath().toString()) .collect(Collectors.toList())) .withLatestInstantTime(maxInstantTime) + .withInstantRange(instantRange) .withBufferSize(writeConfig.getMaxDFSStreamBufferSize()) .withOptimizedLogBlocksScan(true) .withRecordMerger(writeConfig.getRecordMerger()) .build(); scanner.scan(true); int totalBlocks = scanner.getCurrentInstantLogBlocks().size(); - LOG.info("Total blocks seen are " + totalBlocks); + LOG.info("Total blocks seen are " + totalBlocks + ", log blocks threshold is " + + writeConfig.getLogCompactionBlocksThreshold()); // If total blocks in the file slice is > blocks threshold value(default value is 5). // Log compaction can be scheduled. diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 9378274a12fef..d779eea4cc664 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -186,4 +186,4 @@ public BaseHoodieWriteClient getWriteClient() { } return writeClient; } -} \ No newline at end of file +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java index a52af34429f8a..6079d339317b4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java @@ -31,12 +31,14 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieLogCompactException; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.HoodieSparkTable; @@ -53,8 +55,13 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN; + public class SparkRDDTableServiceClient extends BaseHoodieTableServiceClient> { private static final Logger LOG = LoggerFactory.getLogger(SparkRDDTableServiceClient.class); @@ -86,13 +93,29 @@ protected HoodieWriteMetadata> compact(String compactionIns @Override protected HoodieWriteMetadata> logCompact(String logCompactionInstantTime, boolean shouldComplete) { HoodieSparkTable table = HoodieSparkTable.create(config, context); + + // Check if a commit or compaction instant with a greater timestamp is on the timeline. + // If such an instant is found, abort log compaction, since it is no longer needed. + Set actions = CollectionUtils.createSet(COMMIT_ACTION, COMPACTION_ACTION); + Option compactionInstantWithGreaterTimestamp = + Option.fromJavaOptional(table.getActiveTimeline().getInstantsAsStream() + .filter(hoodieInstant -> actions.contains(hoodieInstant.getAction())) + .filter(hoodieInstant -> HoodieTimeline.compareTimestamps(hoodieInstant.getTimestamp(), + GREATER_THAN, logCompactionInstantTime)) + .findFirst()); + if (compactionInstantWithGreaterTimestamp.isPresent()) { + throw new HoodieLogCompactException(String.format("Cannot log compact since a compaction instant with a greater " + + "timestamp exists. Instant details: %s", compactionInstantWithGreaterTimestamp.get())); + } + HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getLogCompactionInflightInstant(logCompactionInstantTime); if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) { LOG.info("Found Log compaction inflight file. 
Rolling back the commit and exiting."); table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); table.getMetaClient().reloadActiveTimeline(); - throw new HoodieException("Inflight logcompaction file exists"); + throw new HoodieException("Execution is aborted since an inflight log compaction was found. " + + "Log compaction plans are mutable, so reschedule another log compaction."); } logCompactionTimer = metrics.getLogCompactionCtx(); WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime); @@ -131,7 +154,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab if (compactionTimer != null) { long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant -> - metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION) + metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, COMPACTION_ACTION) ); } LOG.info("Compacted successfully on commit " + compactionCommitTime); @@ -157,8 +180,8 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, } WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionCommitTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); - if (compactionTimer != null) { - long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); + if (logCompactionTimer != null) { + long durationInMs = metrics.getDurationInMs(logCompactionTimer.stop()); HoodieActiveTimeline.parseDateFromInstantTimeSafely(logCompactionCommitTime).ifPresent(parsedInstant -> metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.LOG_COMPACTION_ACTION) ); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 3b99535528f51..84e0671f8c776 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -150,7 +150,7 @@ private void commitInternal(String instantTime, Map ign } }); + // TODO: include validation for record_index partition here. 
LOG.info("Validation time=" + timer.endTimer()); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java index 9c1acc19453c9..4073e63bd12ca 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java @@ -63,7 +63,6 @@ import static org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase { @@ -143,7 +142,8 @@ public void testCompactionOnMORTable() throws Exception { @Test public void testLogCompactionOnMORTable() throws Exception { HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder() - .withLogCompactionBlocksThreshold("1") + .withMaxNumDeltaCommitsBeforeCompaction(1) + .withLogCompactionBlocksThreshold(1) .build(); HoodieWriteConfig config = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.INMEMORY).withAutoCommit(true).withCompactionConfig(compactionConfig).build(); @@ -200,8 +200,9 @@ public void testLogCompactionOnMORTable() throws Exception { @Test public void testLogCompactionOnMORTableWithoutBaseFile() throws Exception { HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder() - .withLogCompactionBlocksThreshold("1") .withEnableOptimizedLogBlocksScan("true") + .withMaxNumDeltaCommitsBeforeCompaction(1) + .withLogCompactionBlocksThreshold(1) .build(); HoodieWriteConfig config = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.INMEMORY).withAutoCommit(true).withCompactionConfig(compactionConfig).build(); @@ -244,7 +245,7 @@ public void testLogCompactionOnMORTableWithoutBaseFile() throws Exception { public void testSchedulingLogCompactionAfterSchedulingCompaction() throws Exception { HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder() .withMaxNumDeltaCommitsBeforeCompaction(1) - .withLogCompactionBlocksThreshold("1") + .withLogCompactionBlocksThreshold(1) .build(); HoodieWriteConfig config = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.INMEMORY).withAutoCommit(true).withCompactionConfig(compactionConfig).build(); @@ -279,7 +280,7 @@ public void testSchedulingLogCompactionAfterSchedulingCompaction() throws Except public void testSchedulingCompactionAfterSchedulingLogCompaction() throws Exception { HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder() .withMaxNumDeltaCommitsBeforeCompaction(1) - .withLogCompactionBlocksThreshold("1") + .withLogCompactionBlocksThreshold(1) .build(); HoodieWriteConfig config = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.INMEMORY).withAutoCommit(true) @@ -294,11 +295,10 @@ public void testSchedulingCompactionAfterSchedulingLogCompaction() throws Except .build()) .build(); SparkRDDWriteClient client = getHoodieWriteClient(config); - // First insert String newCommitTime = 
HoodieActiveTimeline.createNewInstantTime(); insertBatch(config, client, newCommitTime, "000", 100, - SparkRDDWriteClient::insert, false, false, 100, 100, + SparkRDDWriteClient::insert, false, false, 10, 100, 1, Option.empty()); String prevCommitTime = newCommitTime; @@ -306,17 +306,15 @@ public void testSchedulingCompactionAfterSchedulingLogCompaction() throws Except newCommitTime = HoodieActiveTimeline.createNewInstantTime(); updateBatch(config, client, newCommitTime, prevCommitTime, Option.of(Arrays.asList(prevCommitTime)), "000", 50, SparkRDDWriteClient::upsert, - false, false, 50, 100, 2, config.populateMetaFields()); + false, false, 50, 10, 2, config.populateMetaFields()); // Schedule log compaction Option logCompactionTimeStamp = client.scheduleLogCompaction(Option.empty()); assertTrue(logCompactionTimeStamp.isPresent()); - // Try scheduling compaction, it won't succeed + // Even if pending logcompaction plans are in the timeline, compaction plan can be created. Option compactionTimeStamp = client.scheduleCompaction(Option.empty()); assertTrue(compactionTimeStamp.isPresent()); - client.compact(compactionTimeStamp.get()); - assertThrows(Exception.class, () -> client.logCompact(logCompactionTimeStamp.get())); } @Test @@ -376,7 +374,8 @@ public void testCleanFunctionalityWhenCompactionRequestedInstantIsPresent() thro @Test public void testRollbackOnLogCompaction() throws Exception { HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder() - .withLogCompactionBlocksThreshold("1") + .withMaxNumDeltaCommitsBeforeCompaction(1) + .withLogCompactionBlocksThreshold(1) .build(); HoodieWriteConfig lcConfig = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.INMEMORY) .withAutoCommit(false).withCompactionConfig(compactionConfig).build(); @@ -475,7 +474,8 @@ private void validateBlockInstantsBeforeAndAfterRollback(HoodieWriteConfig confi @Test public void testArchivalOnLogCompaction() throws Exception { HoodieCompactionConfig logCompactionConfig = HoodieCompactionConfig.newBuilder() - .withLogCompactionBlocksThreshold("2") + .withMaxNumDeltaCommitsBeforeCompaction(1) + .withLogCompactionBlocksThreshold(1) .build(); HoodieWriteConfig lcWriteConfig = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.INMEMORY).withAutoCommit(true).withCompactionConfig(logCompactionConfig).build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 184c7880c9e34..43dea6d3b8301 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -313,8 +313,8 @@ public void testLogFileCountsAfterCompaction() throws Exception { public void testLogBlocksCountsAfterLogCompaction(boolean populateMetaFields) throws Exception { HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder() - .withInlineCompaction(false) - .withLogCompactionBlocksThreshold("1") + .withMaxNumDeltaCommitsBeforeCompaction(1) + .withLogCompactionBlocksThreshold(1) .build(); // insert 100 recordsx HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java 
index 6f9615578fa3d..4f1f4afaf74f0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -94,6 +94,18 @@ public final class HoodieMetadataConfig extends HoodieConfig { .sinceVersion("0.7.0") .withDocumentation("Controls how often the metadata table is compacted."); + public static final ConfigProperty ENABLE_LOG_COMPACTION_ON_METADATA_TABLE = ConfigProperty + .key(METADATA_PREFIX + ".log.compaction.enable") + .defaultValue("false") + .sinceVersion("0.14") + .withDocumentation("This config enables log compaction for the metadata table."); + + // Log blocks threshold; after a file slice crosses this threshold, a log compaction operation is scheduled. + public static final ConfigProperty LOG_COMPACT_BLOCKS_THRESHOLD = ConfigProperty + .key(METADATA_PREFIX + ".log.compaction.blocks.threshold") + .defaultValue(5) + .withDocumentation("Controls the criteria to log compact file groups in the metadata table."); + // Regex to filter out matching directories during bootstrap public static final ConfigProperty DIR_FILTER_REGEX = ConfigProperty .key(METADATA_PREFIX + ".dir.filter.regex") @@ -408,6 +420,16 @@ public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBefo return this; } + public Builder withLogCompactionEnabled(boolean enableLogCompaction) { + metadataConfig.setValue(ENABLE_LOG_COMPACTION_ON_METADATA_TABLE, Boolean.toString(enableLogCompaction)); + return this; + } + + public Builder withLogCompactBlocksThreshold(int logCompactBlocksThreshold) { + metadataConfig.setValue(LOG_COMPACT_BLOCKS_THRESHOLD, Integer.toString(logCompactBlocksThreshold)); + return this; + } + public Builder withFileListingParallelism(int parallelism) { metadataConfig.setValue(FILE_LISTING_PARALLELISM_VALUE, String.valueOf(parallelism)); return this; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index bf220ca784719..ef4eec1ba0783 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -332,7 +332,6 @@ public static class Builder extends AbstractHoodieLogRecordReader.Builder { private String keyFieldOverride; // By default, we're doing a full-scan private boolean forceFullScan = true; - // Use scanV2 method. private boolean enableOptimizedLogBlocksScan = false; private HoodieRecordMerger recordMerger; diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieLogCompactException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieLogCompactException.java new file mode 100644 index 0000000000000..888327bad5585 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieLogCompactException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +public class HoodieLogCompactException extends HoodieException { + + public HoodieLogCompactException(String msg) { + super(msg); + } + + public HoodieLogCompactException(String msg, Throwable e) { + super(msg, e); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 2639cd350d4fd..7ca9d6573e897 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -20,9 +20,6 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieMetadataRecord; -import org.apache.hudi.avro.model.HoodieRestoreMetadata; -import org.apache.hudi.avro.model.HoodieRollbackMetadata; -import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.data.HoodieData; @@ -37,10 +34,7 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.HoodieTimer; @@ -50,7 +44,6 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieSeekingFileReader; @@ -67,7 +60,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -83,7 +75,6 @@ import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_FILES; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getFileSystemView; -import static org.apache.hudi.metadata.HoodieTableMetadataUtil.isIndexingCommit; /** * Table metadata provided by an internal DFS backed Hudi metadata table. 
@@ -469,37 +460,6 @@ private Pair, Long> getBaseFileReader(FileSlice slice return Pair.of(baseFileReader, baseFileOpenMs); } - public static Set getValidInstantTimestamps(HoodieTableMetaClient dataMetaClient, HoodieTableMetaClient metadataMetaClient) { - // Only those log files which have a corresponding completed instant on the dataset should be read - // This is because the metadata table is updated before the dataset instants are committed. - HoodieActiveTimeline datasetTimeline = dataMetaClient.getActiveTimeline(); - Set validInstantTimestamps = datasetTimeline.filterCompletedInstants().getInstantsAsStream() - .map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); - - // We should also add completed indexing delta commits in the metadata table, as they do not - // have corresponding completed instant in the data table - validInstantTimestamps.addAll( - metadataMetaClient.getActiveTimeline() - .filter(instant -> instant.isCompleted() && isIndexingCommit(instant.getTimestamp())) - .getInstants().stream() - .map(HoodieInstant::getTimestamp) - .collect(Collectors.toList())); - - // For any rollbacks and restores, we cannot neglect the instants that they are rolling back. - // The rollback instant should be more recent than the start of the timeline for it to have rolled back any - // instant which we have a log block for. - final String earliestInstantTime = validInstantTimestamps.isEmpty() ? SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps); - datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream() - .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, earliestInstantTime)) - .forEach(instant -> { - validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline)); - }); - - // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid timestamp - validInstantTimestamps.add(SOLO_COMMIT_TIMESTAMP); - return validInstantTimestamps; - } - public Pair getLogRecordScanner(List logFiles, String partitionName, Option allowFullScanOverride) { @@ -511,7 +471,8 @@ public Pair getLogRecordScanner(List validInstantTimestamps = getValidInstantTimestamps(dataMetaClient, metadataMetaClient); + Set validInstantTimestamps = HoodieTableMetadataUtil + .getValidInstantTimestamps(dataMetaClient, metadataMetaClient); Option latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant(); String latestMetadataInstantTime = latestMetadataInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); @@ -559,46 +520,6 @@ private boolean isFullScanAllowedForPartition(String partitionName) { } } - /** - * Returns a list of commits which were rolled back as part of a Rollback or Restore operation. - * - * @param instant The Rollback operation to read - * @param timeline instant of timeline from dataset. 
- */ - private static List getRollbackedCommits(HoodieInstant instant, HoodieActiveTimeline timeline) { - try { - List commitsToRollback = null; - if (instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) { - try { - HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata( - timeline.getInstantDetails(instant).get()); - commitsToRollback = rollbackMetadata.getCommitsRollback(); - } catch (IOException e) { - // if file is empty, fetch the commits to rollback from rollback.requested file - HoodieRollbackPlan rollbackPlan = TimelineMetadataUtils.deserializeAvroMetadata( - timeline.readRollbackInfoAsBytes(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, - instant.getTimestamp())).get(), HoodieRollbackPlan.class); - commitsToRollback = Collections.singletonList(rollbackPlan.getInstantToRollback().getCommitTime()); - LOG.warn("Had to fetch rollback info from requested instant since completed file is empty " + instant.toString()); - } - return commitsToRollback; - } - - List rollbackedCommits = new LinkedList<>(); - if (instant.getAction().equals(HoodieTimeline.RESTORE_ACTION)) { - // Restore is made up of several rollbacks - HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata( - timeline.getInstantDetails(instant).get()); - restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> { - rms.forEach(rm -> rollbackedCommits.addAll(rm.getCommitsRollback())); - }); - } - return rollbackedCommits; - } catch (IOException e) { - throw new HoodieMetadataException("Error retrieving rollback commits for instant " + instant, e); - } - } - @Override public void close() { closePartitionReaders(); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 0308c79c9d331..45ef7b5ecc9bb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -30,6 +30,7 @@ import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -49,6 +50,7 @@ import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; @@ -96,6 +98,7 @@ import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper; import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME; import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; +import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP; /** * A utility to convert timeline information to metadata table records. 
@@ -111,6 +114,7 @@ public class HoodieTableMetadataUtil { // Suffix to use for compaction private static final String COMPACTION_TIMESTAMP_SUFFIX = "001"; + // Suffix to use for clean private static final String CLEAN_TIMESTAMP_SUFFIX = "002"; @@ -119,6 +123,9 @@ public class HoodieTableMetadataUtil { // to avoid collision. public static final String METADATA_INDEXER_TIME_SUFFIX = "004"; + // Suffix to use for log compaction + private static final String LOG_COMPACTION_TIMESTAMP_SUFFIX = "005"; + // This suffix and all after that are used for initialization of the various partitions. The unused suffixes lower than this value // are reserved for future operations on the MDT. public static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; // corresponds to "010"; @@ -1325,6 +1332,39 @@ public static Set getInflightAndCompletedMetadataPartitions(HoodieTableC return inflightAndCompletedPartitions; } + public static Set getValidInstantTimestamps(HoodieTableMetaClient dataMetaClient, + HoodieTableMetaClient metadataMetaClient) { + // Only those log files which have a corresponding completed instant on the dataset should be read + // This is because the metadata table is updated before the dataset instants are committed. + HoodieActiveTimeline datasetTimeline = dataMetaClient.getActiveTimeline(); + Set validInstantTimestamps = datasetTimeline.filterCompletedInstants().getInstantsAsStream() + .map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); + + // We should also add completed indexing delta commits in the metadata table, as they do not + // have corresponding completed instant in the data table + validInstantTimestamps.addAll( + metadataMetaClient.getActiveTimeline() + .filter(instant -> instant.isCompleted() + && (isIndexingCommit(instant.getTimestamp()) || isLogCompactionInstant(instant))) + .getInstantsAsStream() + .map(HoodieInstant::getTimestamp) + .collect(Collectors.toList())); + + // For any rollbacks and restores, we cannot neglect the instants that they are rolling back. + // The rollback instant should be more recent than the start of the timeline for it to have rolled back any + // instant which we have a log block for. + final String earliestInstantTime = validInstantTimestamps.isEmpty() ? SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps); + datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream() + .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, earliestInstantTime)) + .forEach(instant -> { + validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline)); + }); + + // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid timestamp + validInstantTimestamps.add(createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, PARTITION_INITIALIZATION_TIME_SUFFIX)); + return validInstantTimestamps; + } + /** * Checks if a delta commit in metadata table is written by async indexer. *

@@ -1339,6 +1379,58 @@ public static boolean isIndexingCommit(String instantTime) { && instantTime.endsWith(METADATA_INDEXER_TIME_SUFFIX); } + /** + * This method returns true if the provided instant is a log compaction instant. + * For the metadata table, log compaction instants are created with the suffix "005" provided in LOG_COMPACTION_TIMESTAMP_SUFFIX. + * @param instant Hoodie completed instant. + * @return true for log compaction instants, false otherwise. + */ + public static boolean isLogCompactionInstant(HoodieInstant instant) { + return instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION) + && instant.getTimestamp().length() == MILLIS_INSTANT_ID_LENGTH + LOG_COMPACTION_TIMESTAMP_SUFFIX.length() + && instant.getTimestamp().endsWith(LOG_COMPACTION_TIMESTAMP_SUFFIX); + } + + /** + * Returns a list of commits which were rolled back as part of a Rollback or Restore operation. + * + * @param instant The Rollback operation to read + * @param timeline instant of timeline from dataset. + */ + private static List getRollbackedCommits(HoodieInstant instant, HoodieActiveTimeline timeline) { + try { + List commitsToRollback; + if (instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) { + try { + HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata( + timeline.getInstantDetails(instant).get()); + commitsToRollback = rollbackMetadata.getCommitsRollback(); + } catch (IOException e) { + // if file is empty, fetch the commits to rollback from rollback.requested file + HoodieRollbackPlan rollbackPlan = TimelineMetadataUtils.deserializeAvroMetadata( + timeline.readRollbackInfoAsBytes(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, + instant.getTimestamp())).get(), HoodieRollbackPlan.class); + commitsToRollback = Collections.singletonList(rollbackPlan.getInstantToRollback().getCommitTime()); + LOG.warn("Had to fetch rollback info from requested instant since completed file is empty " + instant.toString()); + } + return commitsToRollback; + } + + List rollbackedCommits = new LinkedList<>(); + if (instant.getAction().equals(HoodieTimeline.RESTORE_ACTION)) { + // Restore is made up of several rollbacks + HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata( + timeline.getInstantDetails(instant).get()); + restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> { + rms.forEach(rm -> rollbackedCommits.addAll(rm.getCommitsRollback())); + }); + } + return rollbackedCommits; + } catch (IOException e) { + throw new HoodieMetadataException("Error retrieving rollback commits for instant " + instant, e); + } + } + /** * Delete the metadata table for the dataset and backup if required. * @@ -1521,4 +1613,11 @@ public static String createCompactionTimestamp(String timestamp) { public static String createIndexInitTimestamp(String timestamp, int offset) { return String.format("%s%03d", timestamp, PARTITION_INITIALIZATION_TIME_SUFFIX + offset); } + + /** + * Create the timestamp for a log compaction operation on the metadata table. + */ + public static String createLogCompactionTimestamp(String timestamp) { + return timestamp + LOG_COMPACTION_TIMESTAMP_SUFFIX; + } }
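For reference, below is a minimal usage sketch (not part of the patch) showing how a writer could opt into metadata table log compaction via the HoodieMetadataConfig builder methods added above. The enable(), withPath(), forTable() and withMetadataConfig() calls are pre-existing Hudi builder APIs; the class/method names and the base path are illustrative, and the threshold value simply mirrors the default of hoodie.metadata.log.compaction.blocks.threshold.

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class MetadataLogCompactionExample {
  // Builds a write config that turns on log compaction for the metadata table.
  public static HoodieWriteConfig buildWriteConfig(String basePath, String tableName) {
    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
        .enable(true)                        // the metadata table itself must be enabled
        .withLogCompactionEnabled(true)      // hoodie.metadata.log.compaction.enable
        .withLogCompactBlocksThreshold(5)    // hoodie.metadata.log.compaction.blocks.threshold
        .build();
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .forTable(tableName)
        .withMetadataConfig(metadataConfig)
        .build();
  }
}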