@@ -689,7 +689,7 @@ protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, String
String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
.get().getTimestamp();
List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
.findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
.findInstantsBefore(instantTime).getInstants().collect(Collectors.toList());
Contributor:
compactionInstantTime at line 703 has to be based off instantTime and not latestDeltaCommitTime. The latest delta commit time is not part of the compaction yet. Otherwise, we are changing the meaning of the current compaction timeline with this change.

nsivabalan (Contributor, Author), Jan 10, 2022:
Let me try to explain.
Let's say we have 10 commits, C1, C2, ..., C10.
Prior to this patch, we would compact immediately after C10, so the compaction commit would be C10 + "001".

With this patch, we compact just before C11 starts getting applied to the MDT.
So I am basing the compaction commit on the latest delta commit time, which is C10, and not the instant time, which is C11.
Hence it is still C10 + "001". If I went with instantTime, we might change the behavior. In fact, we can't do that, since the compaction time would be greater than the delta commit that will eventually be created when we apply C11 to the MDT.

Let me know if this makes sense.
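
A minimal sketch of the instant ordering being described (hypothetical timestamps; the + "001" suffix matches the metadataCompactionInstant = commitInstant + "001" convention visible in the tests below):

```java
// Hypothetical timestamps; Hudi orders instants lexicographically, so plain
// String comparison is all this illustration needs.
public class CompactionInstantOrdering {
  public static void main(String[] args) {
    String latestDeltaCommit = "0000010"; // C10, last delta commit completed on the MDT
    String nextInstant = "0000011";       // C11, about to be applied to the MDT

    // The compaction instant is suffixed off the latest delta commit, not the incoming instant.
    String compactionInstant = latestDeltaCommit + "001"; // "0000010001"

    // It sorts after C10 but before C11, so the delta commit eventually
    // created for C11 still has a greater timestamp than the compaction.
    System.out.println(compactionInstant.compareTo(latestDeltaCommit) > 0); // true
    System.out.println(compactionInstant.compareTo(nextInstant) < 0);       // true
  }
}
```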

Contributor:
Right, I was actually asking for the compaction time to be C10 and not C11. I misread line 689. Looks good then.


if (!pendingInstants.isEmpty()) {
LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
nsivabalan marked this conversation as resolved.
@@ -128,6 +128,13 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);

try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
if (canTriggerTableService) {
// trigger compaction before applying the delta commit. this guards against the case where this delta commit succeeds in the metadata table but fails in the data table:
Contributor:
Why is it the case that the MT commit could succeed while the data table commit could fail?
The MT should only be updated after we're done with the data table changes, and right before we complete the txn, right?

// had we compacted after that delta commit, the compaction could have included uncommitted data, which would never be ignored while reading from the metadata
// table (since the reader filters out uncommitted data only from delta commits)
compactIfNecessary(writeClient, instantTime);
}

if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
// if this is a new commit being applied to metadata for the first time
writeClient.startCommitWithTime(instantTime);
@@ -153,7 +160,6 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
// reload timeline
metadataMetaClient.reloadActiveTimeline();
if (canTriggerTableService) {
compactIfNecessary(writeClient, instantTime);
cleanIfNecessary(writeClient, instantTime);
writeClient.archive();
}
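
The comment thread above asks why the MDT commit could succeed while the data table commit fails. A toy model of the failure mode the new ordering avoids (plain Java, not Hudi APIs): if compaction ran after a delta commit that later turns out to be uncommitted in the data table, the bad records would be baked into the base file, where delta-commit-level filtering can no longer hide them.

```java
import java.util.ArrayList;
import java.util.List;

public class CompactBeforeCommitDemo {
  static List<String> baseFile = new ArrayList<>();             // compacted records
  static List<String> deltaCommits = new ArrayList<>();         // one entry per delta commit on the MDT
  static List<String> committedInDataTable = new ArrayList<>(); // commits that completed on the data table

  // fold all delta commits into the base file, losing commit boundaries
  static void compact() {
    baseFile.addAll(deltaCommits);
    deltaCommits.clear();
  }

  // readers can filter uncommitted data only at delta-commit granularity
  static List<String> read() {
    List<String> visible = new ArrayList<>(baseFile);
    for (String c : deltaCommits) {
      if (committedInDataTable.contains(c)) {
        visible.add(c);
      }
    }
    return visible;
  }

  public static void main(String[] args) {
    deltaCommits.add("C10");
    committedInDataTable.add("C10");
    deltaCommits.add("C11"); // succeeded on the MDT, but failed on the data table

    compact();                  // compacting AFTER C11's delta commit...
    System.out.println(read()); // [C10, C11] -> uncommitted C11 leaks to readers
  }
}
```

Compacting before the delta commit for instantTime means only instants already completed on both tables can be folded into base files.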
@@ -289,15 +289,15 @@ public void testMetadataTableArchival() throws Exception {
for (; i <= 2; i++) {
doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
}
// expected num commits = 1 (bootstrap) + 2 (writes) + 1 compaction.
// expected num commits = 1 (bootstrap) + 2 (writes)
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4);
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 3);

// trigger an async table service; archival should not kick in, even though conditions are met.
doCluster(testTable, "000000" + commitTime.getAndIncrement());
metadataTimeline = metadataMetaClient.reloadActiveTimeline();
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 5);
assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), 4);

// trigger a regular write operation. archival should kick in.
doWriteOperation(testTable, "000000" + (commitTime.getAndIncrement()), INSERT);
@@ -371,7 +371,7 @@ public void testMetadataTableServices() throws Exception {
// this should have triggered compaction in metadata table
tableMetadata = metadata(writeConfig, context);
assertTrue(tableMetadata.getLatestCompactionTime().isPresent());
assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001");
assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000003001");
}


@@ -402,7 +402,7 @@ public void testVirtualKeysInBaseFiles(boolean populateMetaFields) throws Except

HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
assertTrue(tableMetadata.getLatestCompactionTime().isPresent());
assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000004001");
assertEquals(tableMetadata.getLatestCompactionTime().get(), "0000003001");

HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
HoodieWriteConfig metadataTableWriteConfig = getMetadataWriteConfig(writeConfig);
@@ -447,6 +447,7 @@ public void testMetadataTableWithPendingCompaction(boolean simulateFailedCompact
// this new write is expected to trigger metadata table compaction
String commitInstant = "0000002";
doWriteOperation(testTable, commitInstant, INSERT);
doWriteOperation(testTable, "0000003", INSERT);

HoodieTableMetadata tableMetadata = metadata(writeConfig, context);
String metadataCompactionInstant = commitInstant + "001";
@@ -467,7 +468,7 @@

if (simulateFailedCompaction) {
// this should retry the compaction in metadata table.
doWriteOperation(testTable, "0000003", INSERT);
doWriteOperation(testTable, "0000004", INSERT);
} else {
// let the compaction succeed in metadata and validation should succeed.
FileCreateUtils.renameTempToMetaFile(tempFilePath, metaFilePath);
@@ -476,8 +477,8 @@
validateMetadata(testTable);

// add few more write and validate
doWriteOperation(testTable, "0000004", INSERT);
doWriteOperation(testTable, "0000005", UPSERT);
doWriteOperation(testTable, "0000005", INSERT);
doWriteOperation(testTable, "0000006", UPSERT);
validateMetadata(testTable);

if (simulateFailedCompaction) {
@@ -496,13 +497,13 @@ public void testMetadataTableWithPendingCompaction(boolean simulateFailedCompact
validateMetadata(testTable);

// this should retry the failed compaction in metadata table.
doWriteOperation(testTable, "0000006", INSERT);
doWriteOperation(testTable, "0000007", INSERT);

validateMetadata(testTable);

// add few more write and validate
doWriteOperation(testTable, "0000007", INSERT);
doWriteOperation(testTable, "0000008", UPSERT);
doWriteOperation(testTable, "0000008", INSERT);
doWriteOperation(testTable, "0000009", UPSERT);
validateMetadata(testTable);
}
}
@@ -310,21 +310,25 @@ public void testNoArchivalWithInflightCompactionInMiddle(boolean enableMetadata)
Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
List<HoodieInstant> originalCommits = commitsList.getKey();
List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
if (i != 6) {
if (enableMetadata) {
assertEquals(originalCommits, commitsAfterArchival);
} else {
// on 6th commit, archival will kick in. but will archive only one commit since 2nd compaction commit is inflight.
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1);
for (int j = 1; j <= 6; j++) {
if (j == 1) {
// first commit should be archived
assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)));
} else if (j == 2) {
// 2nd compaction should not be archived
assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "0000000" + j)));
} else {
// every other commit should not be archived
assertTrue(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)));
if (i != 6) {
assertEquals(originalCommits, commitsAfterArchival);
} else {
// on 7th commit, archival will kick in. but will archive only one commit since 2nd compaction commit is inflight.
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1);
for (int j = 1; j <= 6; j++) {
if (j == 1) {
// first commit should be archived
assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)));
} else if (j == 2) {
// 2nd compaction should not be archived
assertFalse(commitsAfterArchival.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "0000000" + j)));
} else {
// every other commit should not be archived
assertTrue(commitsAfterArchival.contains(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)));
}
}
}
}
@@ -586,37 +590,39 @@ public void testArchiveTableWithMetadataTableCompaction() throws Exception {
assertEquals(originalCommits, commitsAfterArchival);
}

// one more commit will trigger compaction in metadata table and will let archival move forward.
// two more commits will trigger compaction in metadata table and will let archival move forward.
testTable.doWriteOperation("00000006", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
testTable.doWriteOperation("00000007", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// trigger archival
Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
List<HoodieInstant> originalCommits = commitsList.getKey();
List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
// before archival 1,2,3,4,5,6
// after archival 5,6
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 4);
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004")), getActiveCommitInstants(Arrays.asList("00000005", "00000006")), commitsAfterArchival);
// before archival 1,2,3,4,5,6,7
// after archival 6,7
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 5);
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005")),
getActiveCommitInstants(Arrays.asList("00000006", "00000007")), commitsAfterArchival);

// 3 more commits, 5 and 6 will be archived. but will not move after 6 since compaction has to kick in in metadata table.
testTable.doWriteOperation("00000007", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// 3 more commits, 6 and 7 will be archived. but will not move after 6 since compaction has to kick in in metadata table.
testTable.doWriteOperation("00000008", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
testTable.doWriteOperation("00000009", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// trigger archival
commitsList = archiveAndGetCommitsList(writeConfig);
originalCommits = commitsList.getKey();
commitsAfterArchival = commitsList.getValue();
assertEquals(originalCommits, commitsAfterArchival);

// ideally, this will archive commits 5, 6, 7, but since compaction in metadata is until 6, only 5 and 6 will get archived,
testTable.doWriteOperation("00000009", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// ideally, this will archive commits 6, 7, 8 but since compaction in metadata is until 6, only 6 will get archived,
testTable.doWriteOperation("00000010", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
commitsList = archiveAndGetCommitsList(writeConfig);
originalCommits = commitsList.getKey();
commitsAfterArchival = commitsList.getValue();
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 2);
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 1);
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005", "00000006")),
getActiveCommitInstants(Arrays.asList("00000007", "00000008", "00000009")), commitsAfterArchival);
getActiveCommitInstants(Arrays.asList("00000007", "00000008", "00000009", "00000010")), commitsAfterArchival);

// and then 2nd compaction will take place at 12th commit
for (int i = 10; i < 13; i++) {
for (int i = 11; i < 14; i++) {
testTable.doWriteOperation("000000" + i, WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// trigger archival
commitsList = archiveAndGetCommitsList(writeConfig);
@@ -626,16 +632,16 @@ public void testArchiveTableWithMetadataTableCompaction() throws Exception {
}

// one more commit will trigger compaction in metadata table and will let archival move forward.
testTable.doWriteOperation("00000013", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
testTable.doWriteOperation("00000014", WriteOperationType.UPSERT, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
// trigger archival
commitsList = archiveAndGetCommitsList(writeConfig);
originalCommits = commitsList.getKey();
commitsAfterArchival = commitsList.getValue();
// before archival 5,6,7,8,9,10,11,12,13
// after archival 12,13
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 5);
// before archival 7,8,9,10,11,12,13,14
// after archival 13,14
assertEquals(originalCommits.size() - commitsAfterArchival.size(), 6);
verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005", "00000006", "00000007", "00000008",
"00000009", "00000010", "00000011")), getActiveCommitInstants(Arrays.asList("00000012", "00000013")), commitsAfterArchival);
"00000009", "00000010", "00000011", "00000012")), getActiveCommitInstants(Arrays.asList("00000013", "00000014")), commitsAfterArchival);
}
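
A toy illustration (plain Java, not Hudi APIs) of the rule this test exercises, under the assumption stated in its comments: data table archival cannot move past the latest compaction instant on the metadata table, so archiving stalls until an MDT compaction covers newer commits.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ArchivalCeilingDemo {
  // hypothetical helper: only commits strictly before the ceiling are archivable
  static List<String> archivable(List<String> activeCommits, String latestMdtCompaction) {
    return activeCommits.stream()
        .filter(c -> c.compareTo(latestMdtCompaction) < 0)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> active = List.of("00000006", "00000007", "00000008", "00000009", "00000010");
    // MDT compaction has only run up to commit 6 (instant "00000006" + "001")
    System.out.println(archivable(active, "00000006001")); // [00000006] -> only 6 can be archived
  }
}
```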

private Pair<List<HoodieInstant>, List<HoodieInstant>> archiveAndGetCommitsList(HoodieWriteConfig writeConfig) throws IOException {
@@ -79,8 +79,8 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce
client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);

// 2nd write batch; 3 commits for the 3rd partition; the 3rd commit to trigger archiving the replace commit
for (int i = 5; i < 8; i++) {
// 2nd write batch; 4 commits for the 3rd partition; the 3rd commit to trigger archiving the replace commit
for (int i = 5; i < 9; i++) {
String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
client.startCommitWithTime(instantTime);
client.insert(jsc().parallelize(dataGen.generateInsertsForPartition(instantTime, 1, DEFAULT_THIRD_PARTITION_PATH), 1), instantTime);
@@ -96,7 +96,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce

// verify records
final HoodieTimeline timeline2 = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals(4, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()),
assertEquals(5, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline2, Option.empty()),
"should only have the 4 records from the 3rd partition.");
}
}