@@ -217,6 +217,7 @@ public String showLogFileRecords(
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
.build();
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
@@ -228,6 +228,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
.build();

Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
@@ -141,6 +141,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
protected transient Timer.Context writeTimer = null;
protected transient Timer.Context compactionTimer;
protected transient Timer.Context clusteringTimer;
protected transient Timer.Context logCompactionTimer;

protected transient AsyncCleanerService asyncCleanerService;
protected transient AsyncArchiveService asyncArchiveService;
@@ -366,7 +367,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
protected void rollbackFailedBootstrap() {
LOG.info("Rolling back pending bootstrap if present");
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();
Option<String> instant = Option.fromJavaOptional(
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
if (instant.isPresent() && HoodieTimeline.compareTimestamps(instant.get(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
@@ -562,6 +563,15 @@ protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata me
inlineScheduleCompaction(extraMetadata);
}

// Do an inline log compaction if enabled
if (config.inlineLogCompactionEnabled()) {
runAnyPendingLogCompactions(table);
metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
inlineLogCompact(extraMetadata);
} else {
metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
}

// Do an inline clustering if enabled
if (config.inlineClusteringEnabled()) {
runAnyPendingClustering(table);
@@ -589,6 +599,14 @@ protected void runAnyPendingCompactions(HoodieTable table) {
});
}

protected void runAnyPendingLogCompactions(HoodieTable table) {
table.getActiveTimeline().getWriteTimeline().filterPendingLogCompactionTimeline().getInstants()
.forEach(instant -> {
LOG.info("Running previously failed inflight log compaction at instant " + instant);
logCompact(instant.getTimestamp(), true);
});
}

protected void runAnyPendingClustering(HoodieTable table) {
table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
@@ -1077,13 +1095,60 @@ public abstract void commitCompaction(String compactionInstantTime, HoodieCommit
*/
protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime);

/**
* Schedules a new log compaction instant.
* @param extraMetadata Extra Metadata to be stored
*/
public Option<String> scheduleLogCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
return scheduleLogCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
}

/**
* Schedules a new log compaction instant with passed-in instant time.
* @param instantTime Log Compaction Instant Time
* @param extraMetadata Extra Metadata to be stored
*/
public boolean scheduleLogCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
return scheduleTableService(instantTime, extraMetadata, TableServiceType.LOG_COMPACT).isPresent();
}

/**
* Performs Log Compaction for the workload stored in instant-time.
*
* @param logCompactionInstantTime Log Compaction Instant Time
* @return Collection of WriteStatus to inspect errors and counts
*/
public HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime) {
return logCompact(logCompactionInstantTime, config.shouldAutoCommit());
}

/**
* Commit a log compaction operation. Allow passing additional meta-data to be stored in commit instant file.
*
* @param logCompactionInstantTime Log Compaction Instant Time
* @param metadata All the metadata that gets stored along with a commit
* @param extraMetadata Extra Metadata to be stored
*/
public void commitLogCompaction(String logCompactionInstantTime, HoodieCommitMetadata metadata,
Option<Map<String, String>> extraMetadata) {
throw new UnsupportedOperationException("Log compaction is not supported yet.");
}

/**
* Commit Log Compaction and track metrics.
*/
protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable<T, I, K, O> table, String logCompactionCommitTime) {
throw new UnsupportedOperationException("Log compaction is not supported yet.");
}

/**
* Get inflight timeline excluding compaction and clustering.
* @param metaClient
* @return
*/
private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTableMetaClient metaClient) {
HoodieTimeline inflightTimelineWithReplaceCommit = metaClient.getCommitsTimeline().filterPendingExcludingCompaction();
HoodieTimeline inflightTimelineWithReplaceCommit = metaClient.getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();
HoodieTimeline inflightTimelineExcludeClusteringCommit = inflightTimelineWithReplaceCommit.filter(instant -> {
if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, instant);
@@ -1139,7 +1204,7 @@ protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos
try {
String action = rollbackPlan.getInstantToRollback().getAction();
if (ignoreCompactionAndClusteringInstants) {
if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
if (!HoodieTimeline.COMPACTION_ACTION.equals(action) && !HoodieTimeline.LOG_COMPACTION_ACTION.equals(action)) {
boolean isClustering = HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action)
&& ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(true, rollbackPlan.getInstantToRollback().getAction(),
rollbackPlan.getInstantToRollback().getCommitTime())).isPresent();
@@ -1253,6 +1318,28 @@ protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> ex
return scheduleCompaction(extraMetadata);
}

/**
* Ensures compaction instant is in expected state and performs Log Compaction for the workload stored in instant-time.
*
* @param compactionInstantTime Compaction Instant Time
* @return Collection of Write Status
*/
protected HoodieWriteMetadata<O> logCompact(String compactionInstantTime, boolean shouldComplete) {
throw new UnsupportedOperationException("Log compaction is not supported yet.");
}

/**
* Performs a log compaction operation on a table, serially before or after an insert/upsert action.
*/
protected Option<String> inlineLogCompact(Option<Map<String, String>> extraMetadata) {
Option<String> logCompactionInstantTimeOpt = scheduleLogCompaction(extraMetadata);
logCompactionInstantTimeOpt.ifPresent(logCompactInstantTime -> {
// inline log compaction should auto commit as the user is never given control
logCompact(logCompactInstantTime, true);
});
return logCompactionInstantTimeOpt;
}

/**
* Schedules a new clustering instant.
* @param extraMetadata Extra Metadata to be stored
@@ -1348,6 +1435,11 @@ private Option<String> scheduleTableServiceInternal(String instantTime, Option<M
Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
.scheduleCompaction(context, instantTime, extraMetadata);
return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
case LOG_COMPACT:
LOG.info("Scheduling log compaction at instant time :" + instantTime);
Option<HoodieCompactionPlan> logCompactionPlan = createTable(config, hadoopConf)
.scheduleLogCompaction(context, instantTime, extraMetadata);
return logCompactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
case CLEAN:
LOG.info("Scheduling cleaning at instant time :" + instantTime);
Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
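As a point of reference, the new log compaction entry points above mirror the existing compaction API. Below is a minimal, hypothetical driver sketch; it assumes a concrete BaseHoodieWriteClient subclass bound to the variable writeClient and is not part of this diff:

// Hypothetical usage sketch -- not part of this PR. Assumes `writeClient` is a
// concrete BaseHoodieWriteClient implementation for a merge-on-read table.
Option<String> logCompactionInstant = writeClient.scheduleLogCompaction(Option.empty());
if (logCompactionInstant.isPresent()) {
  // Executes the scheduled plan; commits automatically when config.shouldAutoCommit() is true.
  writeClient.logCompact(logCompactionInstant.get());
}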
@@ -36,6 +36,7 @@
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LOG_COMPACTION_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static org.apache.hudi.common.util.CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord;

@@ -149,6 +150,7 @@ private void init(HoodieInstant instant) {
switch (getInstantActionType()) {
case COMMIT_ACTION:
case DELTA_COMMIT_ACTION:
case LOG_COMPACTION_ACTION:
this.mutatedFileIds = CommitUtils.getFileIdWithoutSuffixAndRelativePaths(this.metadataWrapper.getCommitMetadata().getPartitionToWriteStats()).keySet();
this.operationType = this.metadataWrapper.getCommitMetadata().getOperationType();
break;
@@ -94,8 +94,14 @@ public Option<HoodieCommitMetadata> resolveConflict(HoodieTable table,
// to consider it as conflict if we see overlapping file ids. Once concurrent updates are
// supported for CLUSTER (https://issues.apache.org/jira/browse/HUDI-1042),
// add that to the below check so that concurrent updates do not conflict.
if (otherOperation.getOperationType() == WriteOperationType.COMPACT
&& HoodieTimeline.compareTimestamps(otherOperation.getInstantTimestamp(), HoodieTimeline.LESSER_THAN, thisOperation.getInstantTimestamp())) {
if (otherOperation.getOperationType() == WriteOperationType.COMPACT) {
if (HoodieTimeline.compareTimestamps(otherOperation.getInstantTimestamp(), HoodieTimeline.LESSER_THAN, thisOperation.getInstantTimestamp())) {
return thisOperation.getCommitMetadataOption();
}
} else if (HoodieTimeline.LOG_COMPACTION_ACTION.equals(thisOperation.getInstantActionType())) {
// Since log compaction is a rewrite operation, it can be committed along with other delta commits.
// The ordering of the commits is taken care of by the AbstractHoodieLogRecordReader scan method.
// A conflict arises only if the log compaction commit has a lesser timestamp than the compaction commit.
return thisOperation.getCommitMetadataOption();
}
// just abort the current write if conflicts are found
@@ -118,6 +118,12 @@ public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInst
archivedMetaWrapper.setActionType(ActionType.compaction.name());
break;
}
case HoodieTimeline.LOG_COMPACTION_ACTION: {
HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, hoodieInstant.getTimestamp());
archivedMetaWrapper.setHoodieCompactionPlan(plan);
archivedMetaWrapper.setActionType(ActionType.logcompaction.name());
break;
}
default: {
throw new UnsupportedOperationException("Action not fully supported yet");
}
@@ -61,6 +61,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ "but users are expected to trigger async job for execution. If `hoodie.compact.inline` is set to true, regular writers will do both scheduling and "
+ "execution inline for compaction");

public static final ConfigProperty<String> INLINE_LOG_COMPACT = ConfigProperty
.key("hoodie.log.compaction.inline")
.defaultValue("false")
.withDocumentation("When set to true, logcompaction service is triggered after each write. While being "
+ " simpler operationally, this adds extra latency on the write path.");

public static final ConfigProperty<String> INLINE_COMPACT_NUM_DELTA_COMMITS = ConfigProperty
.key("hoodie.compact.inline.max.delta.commits")
.defaultValue("5")
@@ -173,6 +179,18 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ "record size estimate compute dynamically based on commit metadata. "
+ " This is critical in computing the insert parallelism and bin-packing inserts into small files.");

public static final ConfigProperty<String> LOG_COMPACTION_BLOCKS_THRESHOLD = ConfigProperty
.key("hoodie.log.compaction.blocks.threshold")
.defaultValue("5")
.withDocumentation("Log compaction can be scheduled if the no. of log blocks crosses this threshold value. "
+ "This is effective only when log compaction is enabled via " + INLINE_LOG_COMPACT.key());

public static final ConfigProperty<String> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty
.key("hoodie.log.record.reader.use.scanV2")
.defaultValue("false")
.sinceVersion("0.13.0")
.withDocumentation("ScanV2 logic address all the multiwriter challenges while appending to log files. "
+ "It also differentiates original blocks written by ingestion writers and compacted blocks written log compaction.");

/** @deprecated Use {@link #INLINE_COMPACT} and its methods instead */
@Deprecated
@@ -321,6 +339,11 @@ public Builder withScheduleInlineCompaction(Boolean scheduleAsyncCompaction) {
return this;
}

public Builder withInlineLogCompaction(Boolean inlineLogCompaction) {
compactionConfig.setValue(INLINE_LOG_COMPACT, String.valueOf(inlineLogCompaction));
return this;
}

public Builder withInlineCompactionTriggerStrategy(CompactionTriggerStrategy compactionTriggerStrategy) {
compactionConfig.setValue(INLINE_COMPACT_TRIGGER_STRATEGY, compactionTriggerStrategy.name());
return this;
@@ -401,6 +424,16 @@ public Builder withPreserveCommitMetadata(boolean preserveCommitMetadata) {
return this;
}

public Builder withLogCompactionBlocksThreshold(String logCompactionBlocksThreshold) {
compactionConfig.setValue(LOG_COMPACTION_BLOCKS_THRESHOLD, logCompactionBlocksThreshold);
return this;
}

public Builder withLogRecordReaderScanV2(String useLogRecordReaderScanV2) {
compactionConfig.setValue(USE_LOG_RECORD_READER_SCAN_V2, useLogRecordReaderScanV2);
return this;
}

public HoodieCompactionConfig build() {
compactionConfig.setDefaults(HoodieCompactionConfig.class.getName());
return compactionConfig;
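For illustration, a hedged sketch of wiring the new builder options into a write config; HoodieWriteConfig.newBuilder(), withPath(...) and withCompactionConfig(...) are pre-existing API outside this diff, and basePath is a placeholder variable:

// Illustrative only -- exercises the builder methods added above.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withInlineLogCompaction(true)            // hoodie.log.compaction.inline
        .withLogCompactionBlocksThreshold("4")    // hoodie.log.compaction.blocks.threshold
        .withLogRecordReaderScanV2("true")        // hoodie.log.record.reader.use.scanV2
        .build())
    .build();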
@@ -1148,6 +1148,15 @@ public boolean populateMetaFields() {
/**
* compaction properties.
*/

public int getLogCompactionBlocksThreshold() {
return getInt(HoodieCompactionConfig.LOG_COMPACTION_BLOCKS_THRESHOLD);
}

public boolean useScanV2ForLogRecordReader() {
return getBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2);
}

public HoodieCleaningPolicy getCleanerPolicy() {
return HoodieCleaningPolicy.valueOf(getString(CLEANER_POLICY));
}
@@ -1252,6 +1261,10 @@ public boolean scheduleInlineCompaction() {
return getBoolean(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT);
}

public boolean inlineLogCompactionEnabled() {
return getBoolean(HoodieCompactionConfig.INLINE_LOG_COMPACT);
}

public CompactionTriggerStrategy getInlineCompactTriggerStrategy() {
return CompactionTriggerStrategy.valueOf(getString(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY));
}
@@ -2095,7 +2108,7 @@ public Boolean doSkipDefaultPartitionValidation() {
*/
public Boolean areAnyTableServicesExecutedInline() {
return areTableServicesEnabled()
&& (inlineClusteringEnabled() || inlineCompactionEnabled()
&& (inlineClusteringEnabled() || inlineCompactionEnabled() || inlineLogCompactionEnabled()
|| (isAutoClean() && !isAsyncClean()) || (isAutoArchive() && !isAsyncArchive()));
}

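Equivalently, the same behavior could be driven through raw property keys; the sketch below assumes the existing properties-based entry point on HoodieWriteConfig.Builder (withProps(...)), which is outside this diff, and again treats basePath as a placeholder:

// Illustrative only -- key names come from HoodieCompactionConfig above; the
// withProps(...) entry point is assumed from the existing builder API.
Map<String, String> props = new HashMap<>();
props.put("hoodie.log.compaction.inline", "true");
props.put("hoodie.log.compaction.blocks.threshold", "4");
props.put("hoodie.log.record.reader.use.scanV2", "true");
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withProps(props)
    .build();
// With hoodie.log.compaction.inline=true, inlineLogCompactionEnabled() and
// areAnyTableServicesExecutedInline() above evaluate to true, so
// runTableServicesInline(...) schedules and runs log compaction after each write.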