Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
Expand Down Expand Up @@ -413,15 +412,33 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
// with logic above to avoid Stream.concat
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();

Option<HoodieInstant> oldestPendingCompactionAndReplaceInstant = table.getActiveTimeline()
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION))
.filter(s -> !s.isCompleted())
// Get the oldest inflight instant and a completed commit before this inflight instant.
Option<HoodieInstant> oldestPendingInstant = table.getActiveTimeline()
.getWriteTimeline()
.filter(instant -> !instant.isCompleted())
.firstInstant();

Option<HoodieInstant> oldestInflightCommitInstant =
table.getActiveTimeline()
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
.filterInflights().firstInstant();
// Oldest commit to retain is the greatest completed commit, that is less than the oldest pending instant.
// In some cases when inflight is the lowest commit then oldest commit to retain will be equal to oldest
// inflight commit.
Option<HoodieInstant> oldestCommitToRetain;
if (oldestPendingInstant.isPresent()) {
Option<HoodieInstant> completedCommitBeforeOldestPendingInstant =
Option.fromJavaOptional(commitTimeline.getReverseOrderedInstants()
.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(),
LESSER_THAN, oldestPendingInstant.get().getTimestamp())).findFirst());
// Check if the completed instant is higher than the oldest inflight instant
// in that case update the oldestCommitToRetain to oldestInflight commit time.
if (!completedCommitBeforeOldestPendingInstant.isPresent()
|| HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(),
LESSER_THAN, completedCommitBeforeOldestPendingInstant.get().getTimestamp())) {
oldestCommitToRetain = oldestPendingInstant;
} else {
oldestCommitToRetain = completedCommitBeforeOldestPendingInstant;
}
} else {
oldestCommitToRetain = Option.empty();
}

// NOTE: We cannot have any holes in the commit timeline.
// We cannot archive any commits which are made after the first savepoint present,
Expand Down Expand Up @@ -460,19 +477,12 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
}
}).filter(s -> {
// Ensure commits >= the oldest pending compaction/replace commit is retained
return oldestPendingCompactionAndReplaceInstant
// oldestCommitToRetain is the highest completed commit instant that is less than the oldest inflight instant.
// By filtering out any commit >= oldestCommitToRetain, we can ensure there are no gaps in the timeline
// when inflight commits are present.
return oldestCommitToRetain
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this simplifies a lot :)

Copy link
Contributor

@danny0405 danny0405 Jun 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it can fix #7738, although a little obscure to understand.

.map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
}).filter(s -> {
// We need this to ensure that when multiple writers are performing conflict resolution, eligible instants don't
// get archived, i.e, instants after the oldestInflight are retained on the timeline
if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) {
return oldestInflightCommitInstant.map(instant ->
compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
.orElse(true);
}
return true;
}).filter(s ->
oldestInstantToRetainForCompaction.map(instantToRetain ->
compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -913,19 +914,21 @@ public void testNoArchivalWithInflightCompactionInMiddle(boolean enableMetadata)
if (i != 7) {
assertEquals(originalCommits, commitsAfterArchival);
} else {
// on 7th commit, archival will kick in. but will archive only one commit since 2nd compaction commit is inflight.
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1);
// on 7th commit, archival will kick in, but cannot archive any commit,
// since 1st deltacommit is the greatest completed commit before an oldest inflight commit.
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 0);
}
} else {
if (i != 7) {
assertEquals(originalCommits, commitsAfterArchival);
} else {
// on 7th commit, archival will kick in. but will archive only one commit since 2nd compaction commit is inflight.
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1);
// on 7th commit, archival will kick in, but cannot archive any commit,
// since 1st deltacommit is the greatest completed commit before an oldest inflight commit.
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 0);
for (int j = 1; j <= 7; j++) {
if (j == 1) {
// first commit should be archived
assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)));
// first commit should not be archived
assertTrue(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)));
} else if (j == 2) {
// 2nd compaction should not be archived
assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "0000000" + j)));
Expand Down Expand Up @@ -1418,6 +1421,115 @@ public void testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean enable
}
}

/**
* Test archival functionality when there are inflight instants on the timeline.
* Archival should hold on to the greatest completed commit that is less than the oldest inflight commit.
* @throws Exception if table setup or archival fails
*/
@Test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some java docs on what we are testing here.
simple illustration of timeline might also help

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

public void testGetCommitInstantsToArchiveDuringInflightCommits() throws Exception {
// Config: archival window min/max = 3/4, retain last 2 commits for cleaning.
HoodieWriteConfig cfg = initTestTableAndGetWriteConfig(false, 3, 4, 2);

// Tracks the instant timestamps expected to remain on the active timeline.
Set<String> expectedInstants = new HashSet<>();
// Create 3 completed commits: 1000, 1001, 1002.
for (int i = 0; i < 3; i++) {
String instantTime = "100" + i;
HoodieTestDataGenerator.createCommitFile(basePath, instantTime, wrapperFs.getConf());
expectedInstants.add(instantTime);
}
// Create an inflight (requested-only) replacecommit at 1003.
String replaceInstant = "1003";
HoodieTestDataGenerator.createReplaceCommitRequestedFile(basePath, replaceInstant, wrapperFs.getConf());
expectedInstants.add(replaceInstant);
// Create 3 more completed instants: 1004, 1005, 1006.
for (int i = 4; i < 7; i++) {
String instantTime = "100" + i;
HoodieTestDataGenerator.createCommitFile(basePath, instantTime, wrapperFs.getConf());
expectedInstants.add(instantTime);
}
// Create another inflight commit at 1007 (both requested and inflight meta files).
HoodieTestDataGenerator.createRequestedCommitFile(basePath, "1007", wrapperFs.getConf());
HoodieTestDataGenerator.createPendingCommitFile(basePath, "1007", wrapperFs.getConf());
expectedInstants.add("1007");
// Create 6 more completed instants: 1010..1015.
for (int i = 0; i < 6; i++) {
String instantTime = "101" + i;
HoodieTestDataGenerator.createCommitFile(basePath, instantTime, wrapperFs.getConf());
expectedInstants.add(instantTime);
}
HoodieTimeline timeline = metaClient.reloadActiveTimeline().getWriteTimeline();

// Check the count of instants.
assertEquals(expectedInstants.size(), timeline.countInstants(), "Loaded 14 commits and the count should match");

// Run archival. 1002 is the greatest completed commit below the pending 1003
// replacecommit, so only 1000 and 1001 are eligible for archival.
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
boolean result = archiver.archiveIfRequired(context);
expectedInstants.remove("1000");
expectedInstants.remove("1001");
assertTrue(result);
timeline = metaClient.reloadActiveTimeline().getWriteTimeline();

// Check the count of instants after archive it should have 2 less instants
// because the 1003 replacecommit's inflight state will block archival.
assertEquals(12, timeline.countInstants(), "After archival only first 2 commits should be archived");
assertEquals(expectedInstants.size(), timeline.countInstants(), "After archival only first 2 commits should be archived");

// Effectively-final copy so the timeline can be used inside the lambda below.
HoodieTimeline finalTimeline = timeline;
assertEquals(12, expectedInstants.stream().filter(instant -> finalTimeline.containsInstant(instant)).count());
assertEquals("1002", timeline.getInstantsAsStream().findFirst().get().getTimestamp());

// Delete the replacecommit requested meta file, so 1003 is no longer pending and
// 1007 becomes the oldest inflight instant on the timeline.
Path replaceCommitRequestedPath = new Path(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeRequestedReplaceFileName(replaceInstant));
metaClient.getFs().delete(replaceCommitRequestedPath);
metaClient.reloadActiveTimeline();

// Run archival again.
assertTrue(archiver.archiveIfRequired(context));
timeline = metaClient.reloadActiveTimeline().getWriteTimeline();
expectedInstants.removeAll(Arrays.asList("1002", "1003", "1004", "1005"));

// After this archival round the timeline should have 4 fewer instants (1002-1005 archived):
// this time the 1007 inflight commit blocks archival and 1006 is retained as the greatest
// completed commit below it.
// NOTE(review): the assertion messages below are stale copies from the first archival round.
assertEquals(8, timeline.countInstants(), "After archival only first 2 commits should be archived");
assertEquals(expectedInstants.size(), timeline.countInstants(), "After archival only first 2 commits should be archived");
HoodieTimeline refreshedTimeline = timeline;
assertEquals(8, expectedInstants.stream().filter(instant -> refreshedTimeline.containsInstant(instant)).count());
assertEquals("1006", timeline.getInstantsAsStream().findFirst().get().getTimestamp());
}

/**
 * When a pending (inflight) replacecommit is the oldest instant on the timeline — or, for
 * that matter, when any inflight instant is present — archival must be blocked from that
 * point onward. This test exercises that scenario with a pending replacecommit at 1001.
 */
@Test
public void testWithOldestReplaceCommit() throws Exception {
  HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2);

  // The very first instant (1001) is a pending replacecommit: requested + inflight only.
  HoodieTestDataGenerator.createReplaceCommitRequestedFile(basePath, "1001", wrapperFs.getConf());
  HoodieTestDataGenerator.createReplaceCommitInflightFile(basePath, "1001", wrapperFs.getConf());
  // Follow it with 8 completed commits: 1002..1009.
  for (int suffix = 2; suffix < 10; suffix++) {
    HoodieTestDataGenerator.createCommitFile(basePath, "100" + suffix, wrapperFs.getConf());
  }

  HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
  HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);

  HoodieTimeline beforeArchival = metaClient.reloadActiveTimeline();
  assertEquals(9, beforeArchival.countInstants(), "Loaded 9 commits and the count should match");

  assertTrue(archiver.archiveIfRequired(context));

  // Nothing should have been archived: the pending 1001 instant pins the whole timeline.
  HoodieTimeline afterArchival = metaClient.reloadActiveTimeline();
  assertEquals(9, afterArchival.countInstants(),
      "Since we have a pending replacecommit at 1001, we should never archive any commit after 1001");
  assertEquals("1001", afterArchival.getInstantsAsStream().findFirst().get().getTimestamp());
}

@Test
public void testArchivalAndCompactionInMetadataTable() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,12 @@ public static Option<HoodieInstant> getOldestInstantToRetainForClustering(
retainLowerBound = earliestInstantToRetain.getTimestamp();
} else {
// no earliestInstantToRetain, indicate KEEP_LATEST_FILE_VERSIONS clean policy,
// retain first instant after clean instant
// retain first instant after clean instant.
// For KEEP_LATEST_FILE_VERSIONS cleaner policy, file versions are only maintained for active file groups
// not for replaced file groups. So, last clean instant can be considered as a lower bound, since
// the cleaner would have removed all the file groups until then. But there is a catch to this logic,
// while cleaner is running if there is a pending replacecommit then those files are not cleaned.
// TODO: This case has to be handled. HUDI-6352
retainLowerBound = cleanInstant.getTimestamp();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,18 @@ public GenericRecord generateRecordForShortTripSchema(String rowKey, String ride
return rec;
}

/**
 * Creates an empty requested-commit marker file for {@code instantTime} under the table's
 * meta folder.
 */
public static void createRequestedCommitFile(String basePath, String instantTime, Configuration configuration) throws IOException {
  String requestedFileName = HoodieTimeline.makeRequestedCommitFileName(instantTime);
  Path requestedCommitPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + requestedFileName);
  createEmptyFile(basePath, requestedCommitPath, configuration);
}

/**
 * Creates an empty inflight-commit marker file for {@code instantTime} under the table's
 * meta folder.
 */
public static void createPendingCommitFile(String basePath, String instantTime, Configuration configuration) throws IOException {
  String inflightFileName = HoodieTimeline.makeInflightCommitFileName(instantTime);
  Path inflightCommitPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + inflightFileName);
  createEmptyFile(basePath, inflightCommitPath, configuration);
}

public static void createCommitFile(String basePath, String instantTime, Configuration configuration) {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
createCommitFile(basePath, instantTime, configuration, commitMetadata);
Expand Down Expand Up @@ -534,6 +546,20 @@ private static void createMetadataFile(String f, String basePath, Configuration
}
}

/**
 * Creates an empty requested-replacecommit marker file for {@code instantTime} under the
 * table's meta folder.
 */
public static void createReplaceCommitRequestedFile(String basePath, String instantTime, Configuration configuration)
    throws IOException {
  String requestedReplaceFileName = HoodieTimeline.makeRequestedReplaceFileName(instantTime);
  Path requestedReplacePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + requestedReplaceFileName);
  createEmptyFile(basePath, requestedReplacePath, configuration);
}

/**
 * Creates an empty inflight-replacecommit marker file for {@code instantTime} under the
 * table's meta folder.
 */
public static void createReplaceCommitInflightFile(String basePath, String instantTime, Configuration configuration)
    throws IOException {
  String inflightReplaceFileName = HoodieTimeline.makeInflightReplaceFileName(instantTime);
  Path inflightReplacePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + inflightReplaceFileName);
  createEmptyFile(basePath, inflightReplacePath, configuration);
}

private static void createPendingReplaceFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) {
Arrays.asList(HoodieTimeline.makeInflightReplaceFileName(instantTime),
HoodieTimeline.makeRequestedReplaceFileName(instantTime))
Expand All @@ -558,6 +584,13 @@ private static void createEmptyFile(String basePath, Path filePath, Configuratio
os.close();
}

/**
 * Creates an empty requested-compaction marker file for {@code instantTime} under the
 * table's meta folder.
 */
public static void createCompactionRequestedFile(String basePath, String instantTime, Configuration configuration)
    throws IOException {
  String requestedCompactionFileName = HoodieTimeline.makeRequestedCompactionFileName(instantTime);
  Path requestedCompactionPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + requestedCompactionFileName);
  createEmptyFile(basePath, requestedCompactionPath, configuration);
}

public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInstant instant,
Configuration configuration) throws IOException {
Path commitFile =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,41 @@ public void testSnapshotPreCommitValidate() throws IOException {
ensureFilesInCommit("Pulling 1 commit from 100, should get us the 10 files committed at 100", files, "100", 10);
}

/**
 * Snapshot read when an inflight commit sits between two completed commits: only files
 * belonging to completed commits may be listed.
 */
@Test
public void testSnapshotPreCommitValidateWithInflights() throws IOException {
  // Completed commit 000 with 5 base files.
  File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 5, "000");
  createCommitFile(basePath, "000", "2016/05/01");

  // Inflight commit 100 rewriting the same file id.
  InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, "fileId1", 5, "100");
  FileCreateUtils.createInflightCommit(basePath.toString(), "100");

  // Completed commit 200 that has no data files yet.
  createCommitFile(basePath, "200", "2016/05/01");

  FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());

  // The inflight 100 files must be filtered out, so the listing falls back to commit 000.
  FileStatus[] listedFiles = inputFormat.listStatus(jobConf);
  assertEquals(5, listedFiles.length, "Snapshot read must return all files in partition");
  ensureFilesInCommit("Should return base files from commit 000, inflight data files with "
      + "greater timestamp should be filtered", listedFiles, "000", 5);

  // Now materialize data files for the completed commit 200 with the same file id.
  InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, "fileId1", 5, "200");

  // This time the listing should switch over to commit 200's files.
  listedFiles = inputFormat.listStatus(jobConf);
  assertEquals(5, listedFiles.length, "Snapshot read must return all files in partition");
  ensureFilesInCommit("Only completed commits files should be returned.",
      listedFiles, "200", 5);
}

private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit,
int totalExpected) throws IOException {
int actualCount = 0;
Expand Down