@@ -18,7 +18,6 @@
 
 package org.apache.hudi.metadata;
 
-import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
@@ -33,6 +32,7 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieMetadataException;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -108,22 +108,28 @@ protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<
     List<HoodieRecord> preppedRecordList = HoodieList.getList(preppedRecords);
 
     try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
-      if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+      if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
         // if this is a new commit being applied to metadata for the first time
         writeClient.startCommitWithTime(instantTime);
         metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
       } else {
-        // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
-        // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
-        // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
-        // are upserts to metadata table and so only a new delta commit will be created.
-        // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
-        // already part of completed commit. So, we have to manually remove the completed instant and proceed.
-        // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
-        HoodieInstant alreadyCompletedInstant =
-            metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
-        HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant);
-        metadataMetaClient.reloadActiveTimeline();
+        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
+        if (alreadyCompletedInstant.isPresent()) {
+          // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
+          // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+          // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+          // are upserts to metadata table and so only a new delta commit will be created.
+          // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
+          // already part of completed commit. So, we have to manually remove the completed instant and proceed.
+          // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
+          HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
+          metadataMetaClient.reloadActiveTimeline();
+        }
+        // If the alreadyCompletedInstant is empty, that means there is a requested or inflight
+        // instant with the same instant time. This happens for data table clean action which
+        // reuses the same instant time without rollback first. It is a no-op here as the
+        // clean plan is the same, so we don't need to delete the requested and inflight instant
+        // files in the active timeline.
       }
 
       List<WriteStatus> statuses = preppedRecordList.size() > 0
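The guard in this hunk is loosened from `filterCompletedInstants().containsInstant(instantTime)` to `containsInstant(instantTime)`, so a duplicate instant that is only requested or inflight now also reaches the else branch, and the new `Option` check decides whether anything must be deleted before the commit is redone. A minimal sketch of that decision flow, using plain collections as hypothetical stand-ins for the Hudi timeline (the class and field names here are illustrative, not Hudi APIs):

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Illustrative stand-in for the metadata-table timeline; not a Hudi API.
class MetadataCommitSketch {
  private final Set<String> completedInstants = new HashSet<>();
  private final Set<String> pendingInstants = new HashSet<>(); // requested or inflight

  void commit(String instantTime) {
    if (!completedInstants.contains(instantTime) && !pendingInstants.contains(instantTime)) {
      // brand-new commit: start it on the metadata table as usual
      pendingInstants.add(instantTime);
    } else {
      Optional<String> alreadyCompleted = completedInstants.contains(instantTime)
          ? Optional.of(instantTime) : Optional.empty();
      if (alreadyCompleted.isPresent()) {
        // committed to the metadata table earlier but failed on the data table:
        // drop the completed instant and let the retry redo it
        completedInstants.remove(alreadyCompleted.get());
        pendingInstants.add(instantTime);
      }
      // else: only a requested/inflight instant exists (a clean action reusing
      // its instant time); the plan is unchanged, so proceed without deleting
    }
    // ... upsert records and complete the instant ...
    pendingInstants.remove(instantTime);
    completedInstants.add(instantTime);
  }
}
```

The point is that only a completed duplicate triggers deletion; a pending duplicate left behind by a failed clean is kept, since the regenerated clean plan is identical.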
@@ -136,21 +136,29 @@ protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<
         compactIfNecessary(writeClient, instantTime);
       }
 
-      if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+      if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
         // if this is a new commit being applied to metadata for the first time
         writeClient.startCommitWithTime(instantTime);
       } else {
-        // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
-        // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
-        // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
-        // are upserts to metadata table and so only a new delta commit will be created.
-        // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
-        // already part of completed commit. So, we have to manually remove the completed instant and proceed.
-        // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
-        HoodieInstant alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
-        HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant);
-        metadataMetaClient.reloadActiveTimeline();
+        Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
+        if (alreadyCompletedInstant.isPresent()) {
+          // this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
+          // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+          // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+          // are upserts to metadata table and so only a new delta commit will be created.
+          // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
+          // already part of completed commit. So, we have to manually remove the completed instant and proceed.
+          // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
+          HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
+          metadataMetaClient.reloadActiveTimeline();
+        }
+        // If the alreadyCompletedInstant is empty, that means there is a requested or inflight
+        // instant with the same instant time. This happens for data table clean action which
+        // reuses the same instant time without rollback first. It is a no-op here as the
+        // clean plan is the same, so we don't need to delete the requested and inflight instant
+        // files in the active timeline.
       }
 
       List<WriteStatus> statuses = writeClient.upsertPreppedRecords(preppedRecordRDD, instantTime).collect();
       statuses.forEach(writeStatus -> {
         if (writeStatus.hasErrors()) {
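The Spark metadata writer receives the same guard and no-op branch. Reconstructing the sequence the new comment describes, for a hypothetical clean with instant time t5:

1. The clean plan is applied to the metadata table as delta commit t5, which reaches requested/inflight state, but the clean fails before the data-table instant completes.
2. On retry, the clean action reuses t5 without rolling back first, so `containsInstant(t5)` is true while no completed instant with that timestamp exists.
3. `alreadyCompletedInstant` is therefore empty: nothing is deleted, and the writer upserts under the same instant time, which is safe because the regenerated clean plan is identical.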
@@ -75,6 +75,7 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.action.clean.CleanPlanner;
@@ -627,27 +628,34 @@ private void testFailedInsertAndCleanByCommits(
    * @param config HoodieWriteConfig
    */
   protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws IOException {
-    return runCleaner(config, false, 1, false);
+    return runCleaner(config, false, false, 1, false);
   }
 
   protected List<HoodieCleanStat> runCleanerWithInstantFormat(HoodieWriteConfig config, boolean needInstantInHudiFormat) throws IOException {
-    return runCleaner(config, false, 1, needInstantInHudiFormat);
+    return runCleaner(config, false, false, 1, needInstantInHudiFormat);
   }
 
   protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, int firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
-    return runCleaner(config, false, firstCommitSequence, needInstantInHudiFormat);
+    return runCleaner(config, false, false, firstCommitSequence, needInstantInHudiFormat);
   }
 
   protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException {
-    return runCleaner(config, simulateRetryFailure, 1, false);
+    return runCleaner(config, simulateRetryFailure, false, 1, false);
   }
 
+  protected List<HoodieCleanStat> runCleaner(
+      HoodieWriteConfig config, boolean simulateRetryFailure, boolean simulateMetadataFailure) throws IOException {
+    return runCleaner(config, simulateRetryFailure, simulateMetadataFailure, 1, false);
+  }
+
   /**
    * Helper to run cleaner and collect Clean Stats.
    *
    * @param config HoodieWriteConfig
    */
-  protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure, Integer firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
+  protected List<HoodieCleanStat> runCleaner(
+      HoodieWriteConfig config, boolean simulateRetryFailure, boolean simulateMetadataFailure,
+      Integer firstCommitSequence, boolean needInstantInHudiFormat) throws IOException {
     SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(config);
     String cleanInstantTs = needInstantInHudiFormat ? makeNewCommitTime(firstCommitSequence, "%014d") : makeNewCommitTime(firstCommitSequence, "%09d");
     HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs);
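With the extra `simulateMetadataFailure` flag threaded through, the overload chain above resolves as follows (a usage sketch; only signatures shown in this diff are used):

```java
runCleaner(config);                       // no failure simulation
runCleaner(config, true);                 // simulate a failed clean on the data table, then retry
runCleaner(config, true, true);           // also revert the metadata-table delta commit before the retry
runCleaner(config, true, true, 5, true);  // explicit first commit sequence, Hudi-format instant times
```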
@@ -670,6 +678,17 @@ protected List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean sim
       });
     });
     metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant);
+
+    if (config.isMetadataTableEnabled() && simulateMetadataFailure) {
+      // Simulate the failure of corresponding instant in the metadata table
+      HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder()
+          .setBasePath(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))
+          .setConf(metaClient.getHadoopConf())
+          .build();
+      HoodieInstant deltaCommit = new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, cleanInstantTs);
+      metadataMetaClient.reloadActiveTimeline().revertToInflight(deltaCommit);
+    }
+
     // retry clean operation again
     writeClient.clean();
     final HoodieCleanMetadata retriedCleanMetadata = CleanerUtils.getCleanerMetadata(HoodieTableMetaClient.reload(metaClient), completedCleanInstant);
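Reverting the metadata table's completed delta commit back to inflight before the retry reproduces the state the writer-side fix targets: on rerun, the clean reuses its instant time, the metadata timeline holds that time only as a pending instant, and the writer treats it as a no-op instead of taking the new-commit path, which is what the old completed-only check would have led to.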
@@ -1215,12 +1234,80 @@ public void testCleanPreviousCorruptedCleanFiles() throws IOException {
     assertEquals(0, cleanStats.size(), "Must not clean any files");
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testRerunFailedClean(boolean simulateMetadataFailure) throws Exception {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .withAssumeDatePartitioning(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
+        .build();
+
+    HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter);
+    String p0 = "2020/01/01";
+    String p1 = "2020/01/02";
+
+    // make 1 commit, with 1 file per partition
+    String file1P0C0 = UUID.randomUUID().toString();
+    String file1P1C0 = UUID.randomUUID().toString();
+    testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
+
+    HoodieCommitMetadata commitMetadata = generateCommitMetadata("00000000000001",
+        Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+          {
+            put(p0, CollectionUtils.createImmutableList(file1P0C0));
+            put(p1, CollectionUtils.createImmutableList(file1P1C0));
+          }
+        })
+    );
+    metadataWriter.update(commitMetadata, "00000000000001", false);
+    metaClient.getActiveTimeline().saveAsComplete(
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"),
+        Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    // make next replacecommit, with 1 clustering operation. logically delete p0. No change to p1
+    // notice that clustering generates empty inflight commit files
+    Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
+    String file2P0C1 = partitionAndFileId002.get(p0);
+    Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> replaceMetadata =
+        generateReplaceCommitMetadata("00000000000002", p0, file1P0C0, file2P0C1);
+    testTable.addReplaceCommit("00000000000002", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
+
+    // make next replacecommit, with 1 clustering operation. Replace data in p1. No change to p0
+    // notice that clustering generates empty inflight commit files
+    Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
+    String file3P1C2 = partitionAndFileId003.get(p1);
+    replaceMetadata = generateReplaceCommitMetadata("00000000000003", p1, file1P1C0, file3P1C2);
+    testTable.addReplaceCommit("00000000000003", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
+
+    // make next replacecommit, with 1 clustering operation. Replace data in p0 again
+    // notice that clustering generates empty inflight commit files
+    Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
+    String file4P0C3 = partitionAndFileId004.get(p0);
+    replaceMetadata = generateReplaceCommitMetadata("00000000000004", p0, file2P0C1, file4P0C3);
+    testTable.addReplaceCommit("00000000000004", Option.of(replaceMetadata.getKey()), Option.empty(), replaceMetadata.getValue());
+
+    // run cleaner with failures
+    List<HoodieCleanStat> hoodieCleanStats = runCleaner(config, true, simulateMetadataFailure, 5, true);
+    assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+    assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    // file1P1C0 still stays because its not replaced until 3 and its the only version available
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+  }
+
   /**
    * Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective.
    *
    * @param insertFn Insert API to be tested
    * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
    *                     record generation to also tag the records (de-dupe is implicit as we use unique record-gen APIs)
    * @throws Exception in case of errors
    */
   private void testInsertAndCleanFailedWritesByVersions(
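A further assertion one could append to `testRerunFailedClean` (a sketch, not part of this PR; it reuses only the builder and timeline calls shown above, and the instant time follows from `makeNewCommitTime(5, "%014d")`):

```java
// After the retried clean, the metadata table should hold the clean's
// delta commit as completed again, in both parameterizations of the test.
HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
    .setBasePath(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))
    .setConf(metaClient.getHadoopConf())
    .build();
assertTrue(mdtMetaClient.getActiveTimeline().filterCompletedInstants()
    .containsInstant("00000000000005"));
```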