@@ -1045,8 +1045,15 @@ protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String inst
.filterCompletedInstants()
.lastInstant().orElseThrow(() -> new HoodieMetadataException("No completed deltacommit in metadata table"))
.getTimestamp();
// We need to check whether there are any inflight instants in the data table timeline before or equal to the latest delta commit in the metadata table.
// Whenever you want to change this logic, please ensure all of the scenarios below are considered:
// a. The latest delta commit in the MDT may be committed in the MDT but failed in the DT, so findInstantsBeforeOrEquals() should be employed.
// b. There could be DT inflights after the latest delta commit in the MDT, and that is fine, because the contract is that the latest compaction instant time in the MDT
//    guarantees that every instant before it is already synced with the metadata table.
// c. Do consider out-of-order commits. For example, c4 from the DT could complete before c3, and we cannot trigger compaction in the MDT with c4 as the base instant time
//    until every instant before c4 is synced with the metadata table.
List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
-    .findInstantsBefore(latestDeltaCommitTimeInMetadataTable).getInstants();
+    .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();

if (!pendingInstants.isEmpty()) {
LOG.info(String.format(
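For context, the comment block in the hunk above spells out the contract that drives this one-method-name change. A minimal standalone sketch of that guard, assuming a data-table `HoodieTableMetaClient` and the latest MDT delta-commit time are already in hand (the method name here is illustrative, not the exact surrounding code):

```java
import java.util.List;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;

// Sketch only: MDT compaction should be skipped while any data-table instant at or
// before the latest metadata-table delta commit is still inflight or requested.
private boolean canCompactMetadataTable(HoodieTableMetaClient dataMetaClient,
                                        String latestDeltaCommitTimeInMetadataTable) {
  List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline()
      .filterInflightsAndRequested()
      // BeforeOrEquals rather than Before: the latest MDT delta commit itself may have
      // failed in the data table (scenario a above), so it must be treated as pending too.
      .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable)
      .getInstants();
  return pendingInstants.isEmpty();
}
```

When the list is non-empty, the branch that the hunk truncates presumably just logs the pending instants and skips scheduling compaction for this round, which is exactly what scenarios a and c require.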
@@ -1949,6 +1949,69 @@ public void testReattemptOfFailedClusteringCommit() throws Exception {
validateMetadata(client);
}

/**
* Validates that if an instant completes in the MDT but the writer crashes before committing to the DT, MDT compaction should not be triggered based on that instant time,
* since it is not yet complete in the DT.
* @throws Exception
*/
@Test
public void testMDTCompactionWithFailedCommits() throws Exception {
tableType = HoodieTableType.COPY_ON_WRITE;
init(tableType);
context = new HoodieSparkEngineContext(jsc);
HoodieWriteConfig initialConfig = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withProperties(initialConfig.getProps())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(4).build()).build();
SparkRDDWriteClient client = getHoodieWriteClient(config);

// Write 1 (Bulk insert)
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
client.startCommitWithTime(newCommitTime);
List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);

// Write 2 (inserts)
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);

// set up clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns("_row_key").withInlineClustering(true)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();

HoodieWriteConfig newWriteConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
.withAutoCommit(false)
.withClusteringConfig(clusteringConfig).build();

// trigger clustering
SparkRDDWriteClient newClient = getHoodieWriteClient(newWriteConfig);
String clusteringCommitTime = newClient.scheduleClustering(Option.empty()).get().toString();
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = newClient.cluster(clusteringCommitTime, true);

// manually remove the completed clustering instant from the .hoodie folder to mimic clustering that succeeded in the metadata table but failed in the data table.
FileCreateUtils.deleteReplaceCommit(basePath, clusteringCommitTime);

metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieWriteConfig updatedWriteConfig = HoodieWriteConfig.newBuilder().withProperties(initialConfig.getProps())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(4).build())
.withRollbackUsingMarkers(false).build();

client = getHoodieWriteClient(updatedWriteConfig);

newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client, Option.of(clusteringCommitTime));
}

@Test
public void testMetadataReadWithNoCompletedCommits() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
@@ -2519,6 +2582,10 @@ public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, HoodieIndex.
}

private void validateMetadata(SparkRDDWriteClient testClient) throws IOException {
validateMetadata(testClient, Option.empty());
}

private void validateMetadata(SparkRDDWriteClient testClient, Option<String> ignoreFilesWithCommit) throws IOException {
HoodieWriteConfig config = testClient.getConfig();

SparkRDDWriteClient client;
@@ -2567,7 +2634,12 @@ private void validateMetadata(SparkRDDWriteClient testClient) throws IOException
} else {
partitionPath = new Path(basePath, partition);
}

FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(fs, partitionPath);
if (ignoreFilesWithCommit.isPresent()) {
fsStatuses = Arrays.stream(fsStatuses).filter(fileStatus -> !fileStatus.getPath().getName().contains(ignoreFilesWithCommit.get()))
.collect(Collectors.toList()).toArray(new FileStatus[0]);
}
FileStatus[] metaStatuses = tableMetadata.getAllFilesInPartition(partitionPath);
List<String> fsFileNames = Arrays.stream(fsStatuses)
.map(s -> s.getPath().getName()).collect(Collectors.toList());
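The filter added above can be read as a small helper: Hudi data file names embed the commit time that wrote them, so files produced by the ignored (failed) commit are dropped from the file-system side before it is compared against the metadata table listing. A minimal sketch of that idea (the helper name is illustrative, not part of the PR):

```java
import java.util.Arrays;
import org.apache.hadoop.fs.FileStatus;

// Illustrative helper mirroring the filter above: exclude data files whose names
// contain the given commit time (i.e. files written by the failed commit) so the
// raw listing matches what the metadata table is expected to know about.
private static FileStatus[] excludeFilesOfCommit(FileStatus[] fsStatuses, String commitTime) {
  return Arrays.stream(fsStatuses)
      .filter(fileStatus -> !fileStatus.getPath().getName().contains(commitTime))
      .toArray(FileStatus[]::new);
}
```

`toArray(FileStatus[]::new)` sidesteps the intermediate `List` the diff builds, but the behavior is the same.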