@@ -250,22 +250,24 @@ protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient)
         rebootstrap = true;
       } else if (datasetMetaClient.getActiveTimeline().isBeforeTimelineStarts(latestMetadataInstant.get().getTimestamp())) {
         LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
-            + "latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
+            + " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
             + ", latestDatasetInstant=" + datasetMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
         rebootstrap = true;
       }
     }
 
     if (rebootstrap) {
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.REBOOTSTRAP_STR, 1));
       LOG.info("Deleting Metadata Table directory so that it can be re-bootstrapped");
       datasetMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath()), true);
       exists = false;
     }
 
     if (!exists) {
       // Initialize for the first time by listing partitions and files directly from the file system
-      bootstrapFromFilesystem(engineContext, datasetMetaClient);
-      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      if (bootstrapFromFilesystem(engineContext, datasetMetaClient)) {
+        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      }
     }
   }
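For intuition on the re-bootstrap condition above: instant times are fixed-width timestamps, so the archival check boils down to comparing the metadata table's latest synced instant against the earliest instant still on the dataset's active timeline. A minimal, self-contained sketch of that comparison (plain Java, not Hudi's API; names are illustrative):

import java.util.List;

public class RebootstrapCheckSketch {
  // Instant times order lexicographically, so a plain string comparison works.
  static boolean needsRebootstrap(String latestSyncedOnMetadataTable, List<String> datasetActiveTimeline) {
    if (datasetActiveTimeline.isEmpty()) {
      return false; // nothing to sync against
    }
    String earliestActiveOnDataset = datasetActiveTimeline.get(0);
    // If the metadata table's latest synced instant is older than the earliest
    // instant still on the dataset's active timeline, the instants in between
    // were archived before they could be synced; incremental sync is no longer
    // possible and the table must be rebuilt from a file listing.
    return latestSyncedOnMetadataTable.compareTo(earliestActiveOnDataset) < 0;
  }

  public static void main(String[] args) {
    // Synced up to 002, but the dataset archived everything before 004.
    System.out.println(needsRebootstrap("002", List.of("004", "005"))); // true
    // Synced up to 004, which is still on the active timeline.
    System.out.println(needsRebootstrap("004", List.of("004", "005"))); // false
  }
}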

@@ -274,22 +276,22 @@ protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient)
    *
    * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
    */
-  private void bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
+  private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
     ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
 
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
-    // Otherwise, we use the timestamp of the instant which does not have any non-completed instants before it.
-    Option<HoodieInstant> latestInstant = Option.empty();
-    boolean foundNonComplete = false;
-    for (HoodieInstant instant : datasetMetaClient.getActiveTimeline().getInstants().collect(Collectors.toList())) {
-      if (!instant.isCompleted()) {
-        foundNonComplete = true;
-      } else if (!foundNonComplete) {
-        latestInstant = Option.of(instant);
-      }
+    // We can only bootstrap if there are no pending operations on the dataset
+    Option<HoodieInstant> pendingInstantOption = Option.fromJavaOptional(datasetMetaClient.getActiveTimeline()
+        .getReverseOrderedInstants().filter(i -> !i.isCompleted()).findFirst());
+    if (pendingInstantOption.isPresent()) {
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
+      LOG.warn("Cannot bootstrap metadata table as operation is in progress: " + pendingInstantOption.get());
+      return false;
     }
 
-    String createInstantTime = latestInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+    // Otherwise, we use the latest commit timestamp.
+    String createInstantTime = datasetMetaClient.getActiveTimeline().getReverseOrderedInstants().findFirst()
+        .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
     LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
 
     HoodieTableMetaClient.withPropertyBuilder()
@@ -335,6 +337,7 @@ private void bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
 
     LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
     update(commitMetadata, createInstantTime);
+    return true;
   }
 
   /**
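The reworked bootstrapFromFilesystem replaces the old forward scan with two timeline queries: a guard that aborts when any instant is still in flight, and a pick of the newest instant's timestamp as the initial metadata commit time. A standalone sketch of that logic (plain Java collections standing in for Hudi's timeline; the SOLO_COMMIT_TIMESTAMP value below is an assumption, not taken from this diff):

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class BootstrapInstantSketch {
  record Instant(String timestamp, boolean completed) {}

  static final String SOLO_COMMIT_TIMESTAMP = "00000000000000"; // assumed placeholder value

  static Optional<String> pickCreateInstantTime(List<Instant> activeTimeline) {
    // Guard: a non-completed instant means files may still be written, so a
    // listing-based bootstrap could capture a half-finished state.
    Optional<Instant> pending = activeTimeline.stream()
        .filter(i -> !i.completed())
        .findFirst();
    if (pending.isPresent()) {
      return Optional.empty(); // caller logs a warning, bumps the error metric, returns false
    }
    // No pending work: the newest instant's timestamp (or the solo placeholder
    // for an empty dataset) becomes the metadata table's initial commit time.
    return Optional.of(activeTimeline.stream()
        .map(Instant::timestamp)
        .max(Comparator.naturalOrder())
        .orElse(SOLO_COMMIT_TIMESTAMP));
  }

  public static void main(String[] args) {
    List<Instant> clean = List.of(new Instant("001", true), new Instant("002", true));
    List<Instant> inflight = List.of(new Instant("001", true), new Instant("002", false));
    System.out.println(pickCreateInstantTime(clean));    // Optional[002]
    System.out.println(pickCreateInstantTime(inflight)); // Optional.empty
  }
}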
@@ -108,10 +108,10 @@ public void clean() throws IOException {
   }
 
   /**
-   * Metadata Table should not be created unless it is enabled in config.
+   * Metadata Table bootstrap scenarios.
    */
   @Test
-  public void testDefaultNoMetadataTable() throws Exception {
+  public void testMetadataTableBootstrap() throws Exception {
     init(HoodieTableType.COPY_ON_WRITE);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
@@ -120,46 +120,63 @@ public void testDefaultNoMetadataTable() throws Exception {
     assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
 
     // Metadata table is not created if disabled by config
+    String firstCommitTime = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      client.startCommitWithTime("001");
-      client.insert(jsc.emptyRDD(), "001");
+      client.startCommitWithTime(firstCommitTime);
+      client.insert(jsc.parallelize(dataGen.generateInserts(firstCommitTime, 5)), firstCommitTime);
[Review thread on the line above]

Contributor:
Would we remove calling syncTableMetadata() in SparkRDDWriteClient#preWrite? Since there will always be an in-progress instant, it will do nothing.

Member (author):
In the bootstrap path this will do nothing. But in the normal path (post bootstrap), this path also syncs the metadata table.

The metadata table sync is independent of the dataset commits. So it's possible that the dataset commit may have completed and the metadata table sync failed (due to some error or crash) in postWrite. Calling the sync again in preWrite ensures we sync again.

@vinothchandar Do you think the sync in preWrite can be removed as an optimization?

Member:
As @prashantwason mentioned, it's actually kind of important that we do that in preWrite() so we bring the metadata table in sync with the timeline before writes happen. So not sure if it can be removed.

We are moving towards a synchronous design anyway for updating metadata, so let's maybe revisit in that context?
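To make the preWrite/postWrite reasoning in this thread concrete, here is a hypothetical flow (the names and structure are assumptions for illustration, not SparkRDDWriteClient's actual code): if the process dies after the dataset commit but before the postWrite sync, the next write's preWrite sync repairs the metadata table before new data lands.

public class SyncPlacementSketch {
  interface MetadataTable { void syncWithDatasetTimeline(); }

  static void writeRound(MetadataTable metadata, Runnable datasetCommit) {
    metadata.syncWithDatasetTimeline(); // preWrite: repair any sync a previous
                                        // run committed data for but never ran
    datasetCommit.run();                // dataset commit completes here
    metadata.syncWithDatasetTimeline(); // postWrite: the process may crash
                                        // before this line; the next round's
                                        // preWrite sync then covers for it
  }
}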

       assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created");
       assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
     }
 
+    // Metadata table should not be created if any non-complete instants are present
+    String secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, true), true)) {
+      client.startCommitWithTime(secondCommitTime);
+      client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 2)), secondCommitTime);
+      // AutoCommit is false so no bootstrap
+      client.syncTableMetadata();
+      assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created");
+      assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
+      // rollback this commit
+      client.rollback(secondCommitTime);
+    }
+
     // Metadata table created when enabled by config & sync is called
+    secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
-      client.startCommitWithTime("002");
-      client.insert(jsc.emptyRDD(), "002");
+      client.startCommitWithTime(secondCommitTime);
+      client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 2)), secondCommitTime);
       client.syncTableMetadata();
       assertTrue(fs.exists(new Path(metadataTableBasePath)));
       validateMetadata(client);
     }
 
-    // Delete the 001 and 002 instants and introduce a 003. This should trigger a rebootstrap of the metadata
-    // table as un-synched instants have been "archived".
-    // Metadata Table should not have 001 and 002 delta-commits as it was re-bootstrapped
+    // Delete all existing instants on dataset to simulate archiving. This should trigger a re-bootstrap of the metadata
+    // table as last synched instant has been "archived".
     final String metadataTableMetaPath = metadataTableBasePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
-    assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName("001"))));
-    assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName("002"))));
-    Arrays.stream(fs.globStatus(new Path(metaClient.getMetaPath(), "{001,002}.*"))).forEach(s -> {
-      try {
-        fs.delete(s.getPath(), false);
-      } catch (IOException e) {
-        LOG.warn("Error when deleting instant " + s + ": " + e);
-      }
-    });
+    assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime))));
+
+    Arrays.stream(fs.listStatus(new Path(metaClient.getMetaPath()))).filter(status -> status.getPath().getName().matches("^\\d+\\..*"))
+        .forEach(status -> {
+          try {
+            fs.delete(status.getPath(), false);
+          } catch (IOException e) {
+            LOG.warn("Error when deleting instant " + status + ": " + e);
+          }
+        });
 
+    String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
-      client.startCommitWithTime("003");
-      client.insert(jsc.emptyRDD(), "003");
+      client.startCommitWithTime(thirdCommitTime);
+      client.insert(jsc.parallelize(dataGen.generateUpdates(thirdCommitTime, 2)), thirdCommitTime);
       client.syncTableMetadata();
       assertTrue(fs.exists(new Path(metadataTableBasePath)));
       validateMetadata(client);
 
-      // Metadata Table should not have 001 and 002 delta-commits as it was re-bootstrapped
-      assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName("001"))));
-      assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName("002"))));
+      // Metadata Table should not have previous delta-commits as it was re-bootstrapped
+      assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(firstCommitTime))));
+      assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime))));
+      assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(thirdCommitTime))));
     }
   }
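The test now deletes instant files by filtering a plain directory listing with the regex ^\d+\..* instead of a glob pinned to the hard-coded instant times 001 and 002. A quick standalone check of what that pattern matches (file names below are illustrative, assuming instant files are named <instant time>.<action>):

import java.util.List;
import java.util.regex.Pattern;

public class InstantFilePatternDemo {
  public static void main(String[] args) {
    Pattern instantFile = Pattern.compile("^\\d+\\..*");
    List<String> names = List.of(
        "20210315091230.commit",           // instant file: matches
        "20210315091230.commit.requested", // instant file: matches
        "hoodie.properties",               // table config: no match, survives
        "archived");                       // archived timeline dir: no match
    names.forEach(n -> System.out.println(n + " -> " + instantFile.matcher(n).matches()));
  }
}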

@@ -193,6 +210,7 @@ public void testOnlyValidPartitionsAdded() throws Exception {
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withDirectoryFilterRegex(filterDirRegex).build()).build();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
       client.startCommitWithTime("005");
+      client.insert(jsc.emptyRDD(), "005");
 
       List<String> partitions = metadataWriter(client).metadata().getAllPartitionPaths();
       assertFalse(partitions.contains(nonPartitionDirectory),
@@ -48,6 +48,8 @@ public class HoodieMetadataMetrics implements Serializable {
   public static final String BASEFILE_READ_STR = "basefile_read";
   public static final String INITIALIZE_STR = "initialize";
   public static final String SYNC_STR = "sync";
+  public static final String REBOOTSTRAP_STR = "rebootstrap";
+  public static final String BOOTSTRAP_ERR_STR = "bootstrap_error";
 
   // Stats names
   public static final String STAT_TOTAL_BASE_FILE_SIZE = "totalBaseFileSizeInBytes";
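The two new keys follow the existing *_STR naming pattern, and the writer changes above emit them via metrics.ifPresent(m -> m.updateMetrics(...)). As a rough sketch of what a registry consuming such keys might look like (an assumption for illustration, not Hudi's actual Metrics implementation):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class MetricsRegistrySketch {
  private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

  // Mirrors the updateMetrics(name, value) call shape used in the diff above.
  public void updateMetrics(String name, long value) {
    counters.computeIfAbsent(name, k -> new LongAdder()).add(value);
  }

  public long get(String name) {
    LongAdder adder = counters.get(name);
    return adder == null ? 0L : adder.sum();
  }
}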