@@ -767,21 +767,28 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline
     if (!tableServicesEnabled(config)) {
       return null;
     }
-    if (scheduleInline) {
-      scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
-    }
-    LOG.info("Cleaner started");
     final Timer.Context timerContext = metrics.getCleanCtx();
-    LOG.info("Cleaned failed attempts if any");
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
         HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
-    HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime, skipLocking);
-    if (timerContext != null && metadata != null) {
-      long durationMs = metrics.getDurationInMs(timerContext.stop());
-      metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
-      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
-          + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
-          + " cleanerElapsedMs" + durationMs);
-    }
+
+    HoodieCleanMetadata metadata = null;
+    HoodieTable table = createTable(config, hadoopConf);
+    if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
+      LOG.info("Cleaner started");
+      // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
+      if (scheduleInline) {
+        scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
+        table.getMetaClient().reloadActiveTimeline();
+      }
+
+      metadata = table.clean(context, cleanInstantTime, skipLocking);
+      if (timerContext != null && metadata != null) {
+        long durationMs = metrics.getDurationInMs(timerContext.stop());
+        metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
+        LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+            + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+            + " cleanerElapsedMs" + durationMs);
+      }
+    }
     return metadata;
   }
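
For context on how this new guard surfaces to callers, here is a minimal usage sketch (not part of the PR); the class, method, and the basePath/engineContext/cleanInstantTime parameters are placeholders, while the allowMultipleCleans builder option and the client.clean(String) API are the ones exercised by the test further down in this diff:

import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

class SingleCleanExample {
  static HoodieCleanMetadata cleanOnce(HoodieSparkEngineContext engineContext, String basePath, String cleanInstantTime) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath) // placeholder table base path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .allowMultipleCleans(false) // builder option added in this PR
            .retainCommits(1)
            .build())
        .build();
    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
      // With multiple cleans disallowed, clean() returns null when another clean is
      // already pending/inflight instead of scheduling and running a second one.
      return client.clean(cleanInstantTime);
    }
  }
}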
@@ -271,6 +271,13 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("The average record size. If not explicitly specified, hudi will compute the "
+ "record size estimate compute dynamically based on commit metadata. "
+ " This is critical in computing the insert parallelism and bin-packing inserts into small files.");

public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
.key("hoodie.clean.allow.multiple")
.defaultValue(true)
.sinceVersion("0.11.0")
.withDocumentation("Allows scheduling/executing multiple cleans by enabling this config. If users prefer to strictly ensure clean requests should be mutually exclusive, "
+ ".i.e. a 2nd clean will not be scheduled if another clean is not yet completed to avoid repeat cleaning of same files, they might want to disable this config.");

public static final ConfigProperty<Integer> ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty
.key("hoodie.archive.merge.files.batch.size")
@@ -630,6 +637,11 @@ public Builder approxRecordSize(int recordSizeEstimate) {
       return this;
     }
 
+    public Builder allowMultipleCleans(boolean allowMultipleCleanSchedules) {
+      compactionConfig.setValue(ALLOW_MULTIPLE_CLEANS, String.valueOf(allowMultipleCleanSchedules));
+      return this;
+    }
+
     public Builder withCleanerParallelism(int cleanerParallelism) {
       compactionConfig.setValue(CLEANER_PARALLELISM_VALUE, String.valueOf(cleanerParallelism));
       return this;
@@ -1110,6 +1110,10 @@ public int getCopyOnWriteRecordSizeEstimate() {
     return getInt(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE);
   }
 
+  public boolean allowMultipleCleans() {
+    return getBoolean(HoodieCompactionConfig.ALLOW_MULTIPLE_CLEANS);
+  }
+
   public boolean shouldAutoTuneInsertSplits() {
     return getBoolean(HoodieCompactionConfig.COPY_ON_WRITE_AUTO_SPLIT_INSERTS);
   }
@@ -65,7 +65,7 @@ protected final void writeTableMetadata(HoodieCommitMetadata metadata, String ac
    * Writes clean metadata to table metadata.
    * @param metadata clean metadata of interest.
    */
-  protected final void writeTableMetadata(HoodieCleanMetadata metadata) {
+  protected final void writeTableMetadata(HoodieCleanMetadata metadata, String instantTime) {
     table.getMetadataWriter(instantTime).ifPresent(w -> w.update(metadata, instantTime));
   }
 
@@ -208,7 +208,7 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
       if (!skipLocking) {
         this.txnManager.beginTransaction(Option.empty(), Option.empty());
       }
-      writeTableMetadata(metadata);
+      writeTableMetadata(metadata, inflightInstant.getTimestamp());
       table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
           TimelineMetadataUtils.serializeCleanMetadata(metadata));
       LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
@@ -240,9 +240,13 @@ public HoodieCleanMetadata execute() {
LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
}
}
table.getMetaClient().reloadActiveTimeline();
if (config.isMetadataTableEnabled()) {
table.getHoodieView().sync();
}
});
table.getMetaClient().reloadActiveTimeline();
}

// return the last clean metadata for now
// TODO (NA) : Clean only the earliest pending clean just like how we do for other table services
// This requires the CleanActionExecutor to be refactored as BaseCommitActionExecutor
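
To show where the added lines sit, an approximate sketch of the pending-clean replay loop after this change; the enclosing forEach, the runPendingClean helper, and the cleanMetadataList variable are inferred from the hunk context and the runClean signature above, so treat this as an illustration rather than the verbatim method body:

// Approximate shape of the leftover-clean handling in CleanActionExecutor.execute():
pendingCleanInstants.forEach(hoodieInstant -> {
  try {
    cleanMetadataList.add(runPendingClean(table, hoodieInstant));
  } catch (Exception e) {
    LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
  }
  // Added by this PR: refresh the active timeline after every replayed clean and,
  // when the metadata table is enabled, sync its file-system view so subsequent
  // iterations (and the clean for the current instant) see the completed work.
  table.getMetaClient().reloadActiveTimeline();
  if (config.isMetadataTableEnabled()) {
    table.getHoodieView().sync();
  }
});
table.getMetaClient().reloadActiveTimeline();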
@@ -122,6 +122,7 @@
 import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -254,6 +255,73 @@ public void testBulkInsertPreppedAndCleanByVersions() throws Exception {
         SparkRDDWriteClient::upsertPreppedRecords, true);
   }
 
+
+  /**
+   * Tests that no more than 1 clean is scheduled/executed if HoodieCompactionConfig.ALLOW_MULTIPLE_CLEANS config is disabled.
+   */
+  @Test
+  public void testMultiClean() {
+    HoodieWriteConfig writeConfig = getConfigBuilder()
+        .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+            .withEnableBackupForRemoteFileSystemView(false).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+            .allowMultipleCleans(false)
+            .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
+        .withEmbeddedTimelineServerEnabled(false).build();
+
+    int index = 0;
+    String cleanInstantTime;
+    final String partition = "2015/03/16";
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) {
+      // Three writes so we can initiate a clean
+      for (; index < 3; ++index) {
+        String newCommitTime = "00" + index;
+        List<HoodieRecord> records = dataGen.generateInsertsForPartition(newCommitTime, 1, partition);
+        client.startCommitWithTime(newCommitTime);
+        client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+      }
+    }
+
+    // mimic failed/leftover clean by scheduling a clean but not performing it
+    cleanInstantTime = "00" + index++;
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context);
+    Option<HoodieCleanerPlan> cleanPlan = table.scheduleCleaning(context, cleanInstantTime, Option.empty());
+    assertEquals(cleanPlan.get().getFilePathsToBeDeletedPerPartition().get(partition).size(), 1);
+    assertEquals(metaClient.reloadActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().countInstants(), 1);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig)) {
+      // Next commit. This is required so that there is an additional file version to clean.
+      String newCommitTime = "00" + index++;
+      List<HoodieRecord> records = dataGen.generateInsertsForPartition(newCommitTime, 1, partition);
+      client.startCommitWithTime(newCommitTime);
+      client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+
+      // Initiate another clean. The previous leftover clean will be attempted first, followed by another clean
+      // due to the commit above.
+      String newCleanInstantTime = "00" + index++;
+      HoodieCleanMetadata cleanMetadata = client.clean(newCleanInstantTime);
+      // subsequent clean should not be triggered since allowMultipleCleanSchedules is set to false
+      assertNull(cleanMetadata);
+
+      // let the old clean complete
+      table = HoodieSparkTable.create(writeConfig, context);
+      cleanMetadata = table.clean(context, cleanInstantTime, false);
+      assertNotNull(cleanMetadata);
+
+      // any new clean should go ahead
+      cleanMetadata = client.clean(newCleanInstantTime);
+      // this clean succeeds since the pending clean from before has now completed
+      assertNotNull(cleanMetadata);
+
+      // 1 file cleaned
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 0);
+      assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1);
+    }
+  }
+
   /**
    * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
    *
@@ -35,6 +35,9 @@
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataV2MigrationHandler;
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
 
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
@@ -43,6 +46,9 @@
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 
 public class CleanerUtils {
+
+  private static final Logger LOG = LogManager.getLogger(CleanerUtils.class);
+
   public static final Integer CLEAN_METADATA_VERSION_1 = CleanMetadataV1MigrationHandler.VERSION;
   public static final Integer CLEAN_METADATA_VERSION_2 = CleanMetadataV2MigrationHandler.VERSION;
   public static final Integer LATEST_CLEAN_METADATA_VERSION = CLEAN_METADATA_VERSION_2;
@@ -131,6 +137,7 @@ public static void rollbackFailedWrites(HoodieFailedWritesCleaningPolicy cleanin
           // No need to do any special cleanup for failed operations during clean
           return;
         } else if (cleaningPolicy.isLazy()) {
+          LOG.info("Cleaned failed attempts if any");
           // Perform rollback of failed operations for all types of actions during clean
           rollbackFailedWritesFunc.apply();
           return;
@@ -140,6 +147,7 @@ public static void rollbackFailedWrites(HoodieFailedWritesCleanin
       case COMMIT_ACTION:
         // For any other actions, perform rollback of failed writes
         if (cleaningPolicy.isEager()) {
+          LOG.info("Cleaned failed attempts if any");
           rollbackFailedWritesFunc.apply();
           return;
         }