Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan clean
? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
actionInstant.getAction(), actionInstant.getTimestamp())
: null))
.withLastCompletedCommitTimestamp(cleanerPlan.getLastCompletedCommitTimestamp())
.withDeletePathPattern(partitionCleanStat.deletePathPatterns())
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
.withFailedDeletes(partitionCleanStat.failedDeleteFiles())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.CleanFileInfo;
Expand Down Expand Up @@ -64,11 +65,16 @@ private int getCommitsSinceLastCleaning() {
Option<HoodieInstant> lastCleanInstant = table.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant();
HoodieTimeline commitTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();

String latestCleanTs;
int numCommits = 0;
if (lastCleanInstant.isPresent()) {
latestCleanTs = lastCleanInstant.get().getTimestamp();
numCommits = commitTimeline.findInstantsAfter(latestCleanTs).countInstants();
int numCommits;
if (lastCleanInstant.isPresent() && !table.getActiveTimeline().isEmpty(lastCleanInstant.get())) {
try {
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(lastCleanInstant.get()).get());
String lastCompletedCommitTimestamp = cleanMetadata.getLastCompletedCommitTimestamp();
numCommits = commitTimeline.findInstantsAfter(lastCompletedCommitTimestamp).countInstants();
} catch (IOException e) {
throw new HoodieIOException("Parsing of last clean instant " + lastCleanInstant.get() + " failed", e);
}
} else {
numCommits = commitTimeline.countInstants();
}
Expand Down Expand Up @@ -123,6 +129,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {

return new HoodieCleanerPlan(earliestInstant
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
planner.getLastCompletedCommitTimestamp(),
config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,17 @@ public Option<HoodieInstant> getEarliestCommitToRetain() {
return earliestCommitToRetain;
}

/**
* Returns the last completed commit timestamp before clean.
*/
public String getLastCompletedCommitTimestamp() {
if (commitTimeline.lastInstant().isPresent()) {
return commitTimeline.lastInstant().get().getTimestamp();
} else {
return "";
}
}

/**
* Determine if file slice needed to be preserved for pending compaction.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,16 @@ private void createReplace(String instantTime, WriteOperationType writeOperation
}

private void createCleanMetadata(String instantTime) throws IOException {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""),
"", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
HoodieCleanStat cleanStats = new HoodieCleanStat(
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
instantTime);
instantTime,
"");
HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ public void testMultiClean() {
HoodieWriteConfig writeConfig = getConfigBuilder()
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())

.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.allowMultipleCleans(false)
Expand Down Expand Up @@ -455,9 +454,10 @@ private void testInsertAndCleanByVersions(
/**
* Test Clean-By-Commits using insert/upsert API.
*/
@Test
public void testInsertAndCleanByCommits() throws Exception {
testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false);
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testInsertAndCleanByCommits(boolean isAsync) throws Exception {
testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false, isAsync);
}

/**
Expand All @@ -473,7 +473,8 @@ public void testFailedInsertAndCleanByCommits() throws Exception {
*/
@Test
public void testInsertPreppedAndCleanByCommits() throws Exception {
testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords, true);
testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords,
true, false);
}

/**
Expand All @@ -483,15 +484,15 @@ public void testInsertPreppedAndCleanByCommits() throws Exception {
public void testBulkInsertPreppedAndCleanByCommits() throws Exception {
testInsertAndCleanByCommits(
(client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
SparkRDDWriteClient::upsertPreppedRecords, true);
SparkRDDWriteClient::upsertPreppedRecords, true, false);
}

/**
* Test Clean-By-Commits using bulk-insert/upsert API.
*/
@Test
public void testBulkInsertAndCleanByCommits() throws Exception {
testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false);
testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false, false);
}

/**
Expand All @@ -505,12 +506,12 @@ public void testBulkInsertAndCleanByCommits() throws Exception {
*/
private void testInsertAndCleanByCommits(
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI)
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI, boolean isAsync)
throws Exception {
int maxCommits = 3; // keep upto 3 commits from the past
HoodieWriteConfig cfg = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(maxCommits).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.build();
Expand Down Expand Up @@ -539,6 +540,10 @@ private void testInsertAndCleanByCommits(
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table1 = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
HoodieInstant lastInstant = activeTimeline.lastInstant().get();
if (cfg.isAsyncClean()) {
activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp());
}
// NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
// commit
Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
Expand All @@ -560,6 +565,9 @@ private void testInsertAndCleanByCommits(
LOG.debug("Data File - " + value);
commitTimes.add(value.getCommitTime());
});
if (cfg.isAsyncClean()) {
commitTimes.remove(lastInstant.getTimestamp());
}
assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes,
"Only contain acceptable versions of file should be present");
}
Expand Down Expand Up @@ -677,7 +685,7 @@ protected List<HoodieCleanStat> runCleaner(
String dirPath = metaClient.getBasePath() + "/" + p.getPartitionPath();
p.getSuccessDeleteFiles().forEach(p2 -> {
try {
metaClient.getFs().create(new Path(dirPath, p2), true);
metaClient.getFs().create(new Path(dirPath, p2), true).close();
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
Expand Down Expand Up @@ -941,7 +949,7 @@ public void testCleanMetadataUpgradeDowngrade() {
// create partition1 clean stat.
HoodieCleanStat cleanStat1 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
partition1, deletePathPatterns1, successDeleteFiles1,
failedDeleteFiles1, instantTime);
failedDeleteFiles1, instantTime, "");

List<String> deletePathPatterns2 = new ArrayList<>();
List<String> successDeleteFiles2 = new ArrayList<>();
Expand All @@ -950,7 +958,7 @@ public void testCleanMetadataUpgradeDowngrade() {
// create partition2 empty clean stat.
HoodieCleanStat cleanStat2 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_COMMITS,
partition2, deletePathPatterns2, successDeleteFiles2,
failedDeleteFiles2, instantTime);
failedDeleteFiles2, instantTime, "");

// map with absolute file path.
Map<String, Tuple3> oldExpected = new HashMap<>();
Expand Down Expand Up @@ -1167,12 +1175,13 @@ public void testCleaningWithZeroPartitionPaths() throws Exception {
/**
* Test Keep Latest Commits when there are pending compactions.
*/
@Test
public void testKeepLatestCommitsWithPendingCompactions() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testKeepLatestCommitsWithPendingCompactions(boolean isAsync) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(2).build())
.build();
// Deletions:
// . FileId Base Logs Total Retained Commits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,8 @@ public HoodieInstant createEmptyCleanMetadata(String instantTime, boolean inflig
}

public HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly, boolean isEmptyForAll, boolean isEmptyCompleted) throws IOException {
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", "",
new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
if (inflightOnly) {
HoodieTestTable.of(metaClient).addInflightClean(instantTime, cleanerPlan);
} else {
Expand All @@ -739,7 +739,8 @@ HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEF
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
instantTime);
instantTime,
"");
HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata, isEmptyForAll, isEmptyCompleted);
}
Expand Down
1 change: 1 addition & 0 deletions hudi-common/src/main/avro/HoodieCleanMetadata.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
{"name": "timeTakenInMillis", "type": "long"},
{"name": "totalFilesDeleted", "type": "int"},
{"name": "earliestCommitToRetain", "type": "string"},
{"name": "lastCompletedCommitTimestamp", "type": "string", "default" : ""},
{"name": "partitionMetadata", "type": {
"type" : "map", "values" : "HoodieCleanPartitionMetadata"
}
Expand Down
5 changes: 5 additions & 0 deletions hudi-common/src/main/avro/HoodieCleanerPlan.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@
}],
"default" : null
},
{
"name": "lastCompletedCommitTimestamp",
"type": "string",
"default" : ""
},
{
"name": "policy",
"type": "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,22 @@ public class HoodieCleanStat implements Serializable {
private final List<String> failedDeleteBootstrapBaseFiles;
// Earliest commit that was retained in this clean
private final String earliestCommitToRetain;
// Last completed commit timestamp before clean
private final String lastCompletedCommitTimestamp;
// set to true if partition is deleted
private final boolean isPartitionDeleted;

public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
List<String> successDeleteFiles, List<String> failedDeleteFiles, String earliestCommitToRetain) {
List<String> successDeleteFiles, List<String> failedDeleteFiles, String earliestCommitToRetain,String lastCompletedCommitTimestamp) {
this(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, earliestCommitToRetain,
CollectionUtils.createImmutableList(), CollectionUtils.createImmutableList(),
lastCompletedCommitTimestamp, CollectionUtils.createImmutableList(), CollectionUtils.createImmutableList(),
CollectionUtils.createImmutableList(), false);
}

public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<String> deletePathPatterns,
List<String> successDeleteFiles, List<String> failedDeleteFiles,
String earliestCommitToRetain, List<String> deleteBootstrapBasePathPatterns,
String earliestCommitToRetain,String lastCompletedCommitTimestamp,
List<String> deleteBootstrapBasePathPatterns,
List<String> successDeleteBootstrapBaseFiles,
List<String> failedDeleteBootstrapBaseFiles,
boolean isPartitionDeleted) {
Expand All @@ -69,6 +72,7 @@ public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List<S
this.successDeleteFiles = successDeleteFiles;
this.failedDeleteFiles = failedDeleteFiles;
this.earliestCommitToRetain = earliestCommitToRetain;
this.lastCompletedCommitTimestamp = lastCompletedCommitTimestamp;
this.deleteBootstrapBasePathPatterns = deleteBootstrapBasePathPatterns;
this.successDeleteBootstrapBaseFiles = successDeleteBootstrapBaseFiles;
this.failedDeleteBootstrapBaseFiles = failedDeleteBootstrapBaseFiles;
Expand Down Expand Up @@ -111,11 +115,15 @@ public String getEarliestCommitToRetain() {
return earliestCommitToRetain;
}

public String getLastCompletedCommitTimestamp() {
return lastCompletedCommitTimestamp;
}

public boolean isPartitionDeleted() {
return isPartitionDeleted;
}

public static HoodieCleanStat.Builder newBuilder() {
public static Builder newBuilder() {
return new Builder();
}

Expand All @@ -130,6 +138,7 @@ public static class Builder {
private List<String> failedDeleteFiles;
private String partitionPath;
private String earliestCommitToRetain;
private String lastCompletedCommitTimestamp;
private List<String> deleteBootstrapBasePathPatterns;
private List<String> successDeleteBootstrapBaseFiles;
private List<String> failedDeleteBootstrapBaseFiles;
Expand Down Expand Up @@ -181,15 +190,20 @@ public Builder withEarliestCommitRetained(Option<HoodieInstant> earliestCommitTo
return this;
}

public Builder withLastCompletedCommitTimestamp(String lastCompletedCommitTimestamp) {
this.lastCompletedCommitTimestamp = lastCompletedCommitTimestamp;
return this;
}

public Builder isPartitionDeleted(boolean isPartitionDeleted) {
this.isPartitionDeleted = isPartitionDeleted;
return this;
}

public HoodieCleanStat build() {
return new HoodieCleanStat(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles,
earliestCommitToRetain, deleteBootstrapBasePathPatterns, successDeleteBootstrapBaseFiles,
failedDeleteBootstrapBaseFiles, isPartitionDeleted);
earliestCommitToRetain, lastCompletedCommitTimestamp, deleteBootstrapBasePathPatterns,
successDeleteBootstrapBaseFiles, failedDeleteBootstrapBaseFiles, isPartitionDeleted);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public HoodieCleanMetadata downgradeFrom(HoodieCleanMetadata input) {

return HoodieCleanMetadata.newBuilder()
.setEarliestCommitToRetain(input.getEarliestCommitToRetain())
.setLastCompletedCommitTimestamp(input.getLastCompletedCommitTimestamp())
.setStartCleanTime(input.getStartCleanTime())
.setTimeTakenInMillis(input.getTimeTakenInMillis())
.setTotalFilesDeleted(input.getTotalFilesDeleted())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ public Integer getManagedVersion() {
public HoodieCleanMetadata upgradeFrom(HoodieCleanMetadata input) {
ValidationUtils.checkArgument(input.getVersion() == 1,
"Input version is " + input.getVersion() + ". Must be 1");
HoodieCleanMetadata metadata = new HoodieCleanMetadata();
metadata.setEarliestCommitToRetain(input.getEarliestCommitToRetain());
metadata.setTimeTakenInMillis(input.getTimeTakenInMillis());
metadata.setStartCleanTime(input.getStartCleanTime());
metadata.setTotalFilesDeleted(input.getTotalFilesDeleted());
metadata.setVersion(getManagedVersion());

Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = input.getPartitionMetadata()
.entrySet()
Expand All @@ -80,6 +74,7 @@ public HoodieCleanMetadata upgradeFrom(HoodieCleanMetadata input) {

return HoodieCleanMetadata.newBuilder()
.setEarliestCommitToRetain(input.getEarliestCommitToRetain())
.setLastCompletedCommitTimestamp(input.getLastCompletedCommitTimestamp())
.setStartCleanTime(input.getStartCleanTime())
.setTimeTakenInMillis(input.getTimeTakenInMillis())
.setTotalFilesDeleted(input.getTotalFilesDeleted())
Expand Down
Loading