Merged
@@ -25,7 +25,6 @@
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieClusteringStrategy;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSliceInfo;
@@ -39,12 +38,9 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
@@ -61,13 +57,11 @@
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -100,7 +94,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@@ -110,7 +103,6 @@
import scala.Tuple3;

import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes;
import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
@@ -211,14 +203,6 @@ public static Pair<String, JavaRDD<WriteStatus>> insertFirstFailedBigBatchForCli
return Pair.of(newCommitTime, statuses);
}

/**
* Test Clean-By-Versions using insert/upsert API.
*/
@Test
public void testInsertAndCleanByVersions() throws Exception {
[Review comment, Member Author] this moved to TestCleanerInsertAndCleanByVersions
testInsertAndCleanByVersions(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false);
}

/**
* Test Clean-Failed-Writes when Cleaning policy is by VERSIONS using insert/upsert API.
*/
@@ -228,32 +212,63 @@ public void testInsertAndCleanFailedWritesByVersions() throws Exception {
}
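Note: the body of the public test above is unchanged by this PR, so the diff collapses it to its closing brace. As an assumption for orientation only (not shown in this diff), it presumably delegates to the private helper introduced below, exercising the plain insert API with isPreppedAPI set to false:

@Test
public void testInsertAndCleanFailedWritesByVersions() throws Exception {
  // Hypothetical collapsed body: plain (non-prepped) insert API, isPreppedAPI = false.
  testInsertAndCleanFailedWritesByVersions(SparkRDDWriteClient::insert, false);
}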

/**
* Test Clean-By-Versions using prepped versions of insert/upsert API.
* Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective.
*
* @param insertFn Insert API to be tested
* @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
* record generation to also tag the records (de-dupe is implicit as we use unique record-gen APIs)
* @throws Exception in case of errors
*/
@Test
public void testInsertPreppedAndCleanByVersions() throws Exception {
[Review comment, Member Author] this moved to TestCleanerInsertAndCleanByVersions
testInsertAndCleanByVersions(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords,
true);
}
private void testInsertAndCleanFailedWritesByVersions(
[Review comment, @xushiyan (Member Author), Nov 20, 2022] this was moved up from the bottom to here, below public void testInsertAndCleanFailedWritesByVersions()
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI)
throws Exception {
int maxVersions = 3; // keep up to 3 versions for each file
HoodieWriteConfig cfg = getConfigBuilder()
.withAutoCommit(false)
.withHeartbeatIntervalInMs(3000)
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {

/**
* Test Clean-By-Versions using bulk-insert/upsert API.
*/
@Test
public void testBulkInsertAndCleanByVersions() throws Exception {
[Review comment, Member Author] this moved to TestCleanerInsertAndCleanByVersions
testInsertAndCleanByVersions(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false);
}
final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);

/**
* Test Clean-By-Versions using prepped versions of bulk-insert/upsert API.
*/
@Test
public void testBulkInsertPreppedAndCleanByVersions() throws Exception {
[Review comment, Member Author] this moved to TestCleanerInsertAndCleanByVersions
testInsertAndCleanByVersions(
(client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
SparkRDDWriteClient::upsertPreppedRecords, true);
}
Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);

client.commit(result.getLeft(), result.getRight());

HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient);

assertTrue(table.getCompletedCleanTimeline().empty());

insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);

insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);

Pair<String, JavaRDD<WriteStatus>> ret =
insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);

// Await till enough time passes such that the last failed commits heartbeats are expired
await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient()
.isHeartbeatExpired(ret.getLeft()));

List<HoodieCleanStat> cleanStats = runCleaner(cfg);
assertEquals(0, cleanStats.size(), "Must not clean any files");
HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3);
Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant();
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata(
timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
// Rollback of one of the failed writes should have deleted 3 files
assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
}
}
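The helper above relies on Awaitility to block until the heartbeats of the failed writers (interval set to 3000 ms in the config above) are considered expired, and only then runs the cleaner. A minimal, self-contained sketch of that polling pattern, using a placeholder flag and thread rather than any Hudi API:

import static org.awaitility.Awaitility.await;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class HeartbeatWaitSketch {
  public static void main(String[] args) {
    AtomicBoolean heartbeatExpired = new AtomicBoolean(false);
    // Stand-in for a heartbeat that stops being refreshed and expires roughly 3 seconds later.
    new Thread(() -> {
      try {
        Thread.sleep(3000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      heartbeatExpired.set(true);
    }).start();
    // Poll the condition (every 100 ms by default) until it holds, failing if 10 seconds pass first.
    await().atMost(10, TimeUnit.SECONDS).until(heartbeatExpired::get);
    System.out.println("heartbeat expired; the cleaner may now roll back the failed write");
  }
}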

/**
* Tests no more than 1 clean is scheduled if hoodie.clean.allow.multiple config is set to false.
@@ -329,133 +344,6 @@ public void testMultiClean() {
}
}
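For context on the hoodie.clean.allow.multiple knob named in the javadoc above, here is a minimal sketch of forcing it to false through HoodieWriteConfig's generic property override. The base path and the map-based override are illustrative assumptions; a dedicated HoodieCleanConfig builder method for this flag may exist but is not shown in this diff:

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.config.HoodieWriteConfig;

public class SingleCleanConfigSketch {
  public static void main(String[] args) {
    Map<String, String> overrides = new HashMap<>();
    // Key taken from the javadoc above; "false" means at most one clean is scheduled at a time.
    overrides.put("hoodie.clean.allow.multiple", "false");
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table") // placeholder base path, an assumption
        .withProps(overrides)
        .build();
    System.out.println(cfg.getProps().getProperty("hoodie.clean.allow.multiple"));
  }
}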

/**
* Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
*
* @param insertFn Insert API to be tested
* @param upsertFn Upsert API to be tested
* @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
* record generation to also tag the records (de-dupe is implicit as we use unique record-gen APIs)
* @throws Exception in case of errors
*/
private void testInsertAndCleanByVersions(
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI)
throws Exception {
int maxVersions = 2; // keep up to 2 versions for each file
HoodieWriteConfig cfg = getConfigBuilder()
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
.retainFileVersions(maxVersions).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {

final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);

final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);

insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);

Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
for (String partitionPath : dataGen.getPartitionPaths()) {
TableFileSystemView fsView = table.getFileSystemView();
Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst().map(fg -> {
fg.getLatestFileSlice().map(fs -> compactionFileIdToLatestFileSlice.put(fg.getFileGroupId(), fs));
return true;
}));
if (added.isPresent()) {
// Select only one file-group for compaction
break;
}
}

// Create workload with selected file-slices
List<Pair<String, FileSlice>> partitionFileSlicePairs = compactionFileIdToLatestFileSlice.entrySet().stream()
.map(e -> Pair.of(e.getKey().getPartitionPath(), e.getValue())).collect(Collectors.toList());
HoodieCompactionPlan compactionPlan =
CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Option.empty(), Option.empty());
List<String> instantTimes = makeIncrementalCommitTimes(9, 1, 10);
String compactionTime = instantTimes.get(0);
table.getActiveTimeline().saveToCompactionRequested(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime),
TimelineMetadataUtils.serializeCompactionPlan(compactionPlan));

instantTimes = instantTimes.subList(1, instantTimes.size());
// Keep doing some writes and clean inline. Make sure we have expected number of files
// remaining.
for (String newInstantTime : instantTimes) {
try {
client.startCommitWithTime(newInstantTime);
List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newInstantTime, 100);

List<WriteStatus> statuses = upsertFn.apply(client, jsc.parallelize(records, 1), newInstantTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);

metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieSparkTable.create(getConfig(), context, metaClient);
HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();

TableFileSystemView fsView = table.getFileSystemView();
// Need to ensure the following
for (String partitionPath : dataGen.getPartitionPaths()) {
// compute all the versions of all files, from time 0
HashMap<String, TreeSet<String>> fileIdToVersions = new HashMap<>();
for (HoodieInstant entry : timeline.getInstants().collect(Collectors.toList())) {
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(entry).get(), HoodieCommitMetadata.class);

for (HoodieWriteStat wstat : commitMetadata.getWriteStats(partitionPath)) {
if (!fileIdToVersions.containsKey(wstat.getFileId())) {
fileIdToVersions.put(wstat.getFileId(), new TreeSet<>());
}
fileIdToVersions.get(wstat.getFileId()).add(FSUtils.getCommitTime(new Path(wstat.getPath()).getName()));
}
}

List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());

for (HoodieFileGroup fileGroup : fileGroups) {
if (compactionFileIdToLatestFileSlice.containsKey(fileGroup.getFileGroupId())) {
// Ensure latest file-slice selected for compaction is retained
Option<HoodieBaseFile> dataFileForCompactionPresent =
Option.fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> {
return compactionFileIdToLatestFileSlice.get(fileGroup.getFileGroupId()).getBaseInstantTime()
.equals(df.getCommitTime());
}).findAny());
assertTrue(dataFileForCompactionPresent.isPresent(),
"Data File selected for compaction is retained");
} else {
// file has no more than max versions
String fileId = fileGroup.getFileGroupId().getFileId();
List<HoodieBaseFile> dataFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList());

assertTrue(dataFiles.size() <= maxVersions,
"fileId " + fileId + " has more than " + maxVersions + " versions");

// Each file, has the latest N versions (i.e cleaning gets rid of older versions)
List<String> commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId));
for (int i = 0; i < dataFiles.size(); i++) {
assertEquals((dataFiles.get(i)).getCommitTime(),
commitedVersions.get(commitedVersions.size() - 1 - i),
"File " + fileId + " does not have latest versions on commits" + commitedVersions);
}
}
}
}
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
}
}

/**
* Test Clean-By-Commits using insert/upsert API.
*/
@@ -676,7 +564,7 @@ public void testCleanEmptyInstants() throws Exception {
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--instantClean, "%09d")));
}

@Test
public void testCleanWithReplaceCommits() throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
@@ -1200,65 +1088,6 @@ public void testRerunFailedClean(boolean simulateMetadataFailure) throws Excepti
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
}

/**
* Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective.
*
* @param insertFn Insert API to be tested
* @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
* record generation to also tag the records (de-dupe is implicit as we use unique record-gen APIs)
* @throws Exception in case of errors
*/
private void testInsertAndCleanFailedWritesByVersions(
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI)
throws Exception {
int maxVersions = 3; // keep up to 3 versions for each file
HoodieWriteConfig cfg = getConfigBuilder()
.withAutoCommit(false)
.withHeartbeatIntervalInMs(3000)
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {

final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);

Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);

client.commit(result.getLeft(), result.getRight());

HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient);

assertTrue(table.getCompletedCleanTimeline().empty());

insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);

insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);

Pair<String, JavaRDD<WriteStatus>> ret =
insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);

// Await till enough time passes such that the last failed commits heartbeats are expired
await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient()
.isHeartbeatExpired(ret.getLeft()));

List<HoodieCleanStat> cleanStats = runCleaner(cfg);
assertEquals(0, cleanStats.size(), "Must not clean any files");
HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3);
Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant();
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata(
timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
// Rollback of one of the failed writes should have deleted 3 files
assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
}
}

/**
* Common test method for validating pending compactions.
*