diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 38a044799dfda..236426e3807b1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -250,13 +250,14 @@ protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableM
         rebootstrap = true;
       } else if (datasetMetaClient.getActiveTimeline().isBeforeTimelineStarts(latestMetadataInstant.get().getTimestamp())) {
         LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
-            + "latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
+            + " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
             + ", latestDatasetInstant=" + datasetMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
         rebootstrap = true;
       }
     }
 
     if (rebootstrap) {
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.REBOOTSTRAP_STR, 1));
       LOG.info("Deleting Metadata Table directory so that it can be re-bootstrapped");
       datasetMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath()), true);
       exists = false;
@@ -264,8 +265,9 @@ protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableM
 
     if (!exists) {
       // Initialize for the first time by listing partitions and files directly from the file system
-      bootstrapFromFilesystem(engineContext, datasetMetaClient);
-      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      if (bootstrapFromFilesystem(engineContext, datasetMetaClient)) {
+        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      }
     }
   }
 
@@ -274,22 +276,22 @@ protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableM
    *
    * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
    */
-  private void bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
+  private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) throws IOException {
     ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled");
 
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
-    // Otherwise, we use the timestamp of the instant which does not have any non-completed instants before it.
-    Option<HoodieInstant> latestInstant = Option.empty();
-    boolean foundNonComplete = false;
-    for (HoodieInstant instant : datasetMetaClient.getActiveTimeline().getInstants().collect(Collectors.toList())) {
-      if (!instant.isCompleted()) {
-        foundNonComplete = true;
-      } else if (!foundNonComplete) {
-        latestInstant = Option.of(instant);
-      }
+    // We can only bootstrap if there are no pending operations on the dataset
+    Option<HoodieInstant> pendingInstantOption = Option.fromJavaOptional(datasetMetaClient.getActiveTimeline()
+        .getReverseOrderedInstants().filter(i -> !i.isCompleted()).findFirst());
+    if (pendingInstantOption.isPresent()) {
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
+      LOG.warn("Cannot bootstrap metadata table as operation is in progress: " + pendingInstantOption.get());
+      return false;
     }
-    String createInstantTime = latestInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
 
+    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+    // Otherwise, we use the latest commit timestamp.
+    String createInstantTime = datasetMetaClient.getActiveTimeline().getReverseOrderedInstants().findFirst()
+        .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
     LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
 
     HoodieTableMetaClient.withPropertyBuilder()
@@ -335,6 +337,7 @@ private void bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTa
 
     LOG.info("Committing " + partitionToFileStatus.size() + " partitions and " + stats[0] + " files to metadata");
     update(commitMetadata, createInstantTime);
+    return true;
   }
 
   /**
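Reviewer note: the hunks above replace the old "latest instant with nothing pending before it" scan with a strict guard — bootstrap is simply deferred whenever any instant is still in flight, and the creation instant becomes the newest timestamp on the timeline (or SOLO_COMMIT_TIMESTAMP on an empty table). The sketch below restates that decision logic in isolation; the nested Instant record, the list-based timeline, and the SOLO_COMMIT_TIMESTAMP placeholder value are stand-ins for Hudi's HoodieInstant/HoodieActiveTimeline and are not part of this patch.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class BootstrapGuardSketch {
  // Stand-in for org.apache.hudi.common.table.timeline.HoodieInstant.
  record Instant(String timestamp, boolean completed) {}

  // Placeholder value for the sketch; the real constant lives in Hudi.
  static final String SOLO_COMMIT_TIMESTAMP = "00000000000000";

  // Mirrors the new bootstrapFromFilesystem(): refuse to pick a creation
  // instant while anything is pending, otherwise use the latest timestamp.
  static Optional<String> chooseCreateInstantTime(List<Instant> timeline) {
    if (timeline.stream().anyMatch(i -> !i.completed())) {
      return Optional.empty(); // caller warns and bumps the "bootstrap_error" counter
    }
    return Optional.of(timeline.stream()
        .max(Comparator.comparing(Instant::timestamp))
        .map(Instant::timestamp)
        .orElse(SOLO_COMMIT_TIMESTAMP));
  }

  public static void main(String[] args) {
    // A pending "002" blocks bootstrap entirely...
    System.out.println(chooseCreateInstantTime(
        List.of(new Instant("001", true), new Instant("002", false)))); // Optional.empty
    // ...and once it completes, the latest timestamp wins.
    System.out.println(chooseCreateInstantTime(
        List.of(new Instant("001", true), new Instant("002", true)))); // Optional[002]
  }
}
```

The change is deliberately conservative: rather than hunting for a "safe" older instant to bootstrap from (the removed loop), the writer now waits for a quiet timeline, which avoids baking files from an instant that may later be rolled back.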
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index f34f567b19948..331092e0e7705 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -108,10 +108,10 @@ public void clean() throws IOException {
   }
 
   /**
-   * Metadata Table should not be created unless it is enabled in config.
+   * Metadata Table bootstrap scenarios.
    */
   @Test
-  public void testDefaultNoMetadataTable() throws Exception {
+  public void testMetadataTableBootstrap() throws Exception {
     init(HoodieTableType.COPY_ON_WRITE);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
@@ -120,46 +120,63 @@ public void testDefaultNoMetadataTable() throws Exception {
     assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
 
     // Metadata table is not created if disabled by config
+    String firstCommitTime = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
-      client.startCommitWithTime("001");
-      client.insert(jsc.emptyRDD(), "001");
+      client.startCommitWithTime(firstCommitTime);
+      client.insert(jsc.parallelize(dataGen.generateInserts(firstCommitTime, 5)), firstCommitTime);
       assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created");
       assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
     }
 
+    // Metadata table should not be created if any non-complete instants are present
+    String secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(false, true), true)) {
+      client.startCommitWithTime(secondCommitTime);
+      client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 2)), secondCommitTime);
+      // AutoCommit is false so no bootstrap
+      client.syncTableMetadata();
+      assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created");
+      assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
+      // rollback this commit
+      client.rollback(secondCommitTime);
+    }
+
     // Metadata table created when enabled by config & sync is called
+    secondCommitTime = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
-      client.startCommitWithTime("002");
-      client.insert(jsc.emptyRDD(), "002");
+      client.startCommitWithTime(secondCommitTime);
+      client.insert(jsc.parallelize(dataGen.generateUpdates(secondCommitTime, 2)), secondCommitTime);
       client.syncTableMetadata();
       assertTrue(fs.exists(new Path(metadataTableBasePath)));
       validateMetadata(client);
     }
 
-    // Delete the 001 and 002 instants and introduce a 003. This should trigger a rebootstrap of the metadata
-    // table as un-synched instants have been "archived".
-    // Metadata Table should not have 001 and 002 delta-commits as it was re-bootstrapped
+    // Delete all existing instants on the dataset to simulate archiving. This should trigger a re-bootstrap of the
+    // metadata table as the last synced instant has been "archived".
     final String metadataTableMetaPath = metadataTableBasePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
-    assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName("001"))));
-    assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName("002"))));
-    Arrays.stream(fs.globStatus(new Path(metaClient.getMetaPath(), "{001,002}.*"))).forEach(s -> {
-      try {
-        fs.delete(s.getPath(), false);
-      } catch (IOException e) {
-        LOG.warn("Error when deleting instant " + s + ": " + e);
-      }
-    });
+    assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime))));
+
+    Arrays.stream(fs.listStatus(new Path(metaClient.getMetaPath()))).filter(status -> status.getPath().getName().matches("^\\d+\\..*"))
+        .forEach(status -> {
+          try {
+            fs.delete(status.getPath(), false);
+          } catch (IOException e) {
+            LOG.warn("Error when deleting instant " + status + ": " + e);
+          }
+        });
 
+    String thirdCommitTime = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
-      client.startCommitWithTime("003");
-      client.insert(jsc.emptyRDD(), "003");
+      client.startCommitWithTime(thirdCommitTime);
+      client.insert(jsc.parallelize(dataGen.generateUpdates(thirdCommitTime, 2)), thirdCommitTime);
       client.syncTableMetadata();
       assertTrue(fs.exists(new Path(metadataTableBasePath)));
       validateMetadata(client);
 
-      // Metadata Table should not have 001 and 002 delta-commits as it was re-bootstrapped
-      assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName("001"))));
-      assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName("002"))));
+      // Metadata Table should not have previous delta-commits as it was re-bootstrapped
+      assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(firstCommitTime))));
+      assertFalse(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(secondCommitTime))));
+      assertTrue(fs.exists(new Path(metadataTableMetaPath, HoodieTimeline.makeDeltaFileName(thirdCommitTime))));
    }
  }
 
@@ -193,6 +210,7 @@ public void testOnlyValidPartitionsAdded() throws Exception {
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withDirectoryFilterRegex(filterDirRegex).build()).build();
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
       client.startCommitWithTime("005");
+      client.insert(jsc.emptyRDD(), "005");
 
       List<String> partitions = metadataWriter(client).metadata().getAllPartitionPaths();
       assertFalse(partitions.contains(nonPartitionDirectory),
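Reviewer note: the test depends on a getWriteConfig(autoCommit, useFileListingMetadata) fixture helper that this diff does not show. The following is a minimal sketch of the wiring such a helper plausibly produces, using only builder calls that appear in this diff or are standard Hudi write-client setup; basePath, schema, engineContext, jsc, and dataGen are placeholders from the test fixture:

```java
// Sketch only: enable the metadata table so that syncTableMetadata() can
// bootstrap it. With autoCommit=false the commit stays inflight, so the new
// pending-instant guard makes sync a no-op (the second test block above).
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath(basePath)   // placeholder dataset base path
    .withSchema(schema)   // placeholder Avro schema string
    .withAutoCommit(true)
    .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
    .build();

try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, config, true)) {
  String commitTime = HoodieActiveTimeline.createNewInstantTime();
  client.startCommitWithTime(commitTime);
  client.insert(jsc.parallelize(dataGen.generateInserts(commitTime, 5)), commitTime);
  client.syncTableMetadata(); // first sync bootstraps the metadata table
}
```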
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
index 2bd773bc7819f..9f1b0a0799af0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
@@ -48,6 +48,8 @@ public class HoodieMetadataMetrics implements Serializable {
   public static final String BASEFILE_READ_STR = "basefile_read";
   public static final String INITIALIZE_STR = "initialize";
   public static final String SYNC_STR = "sync";
+  public static final String REBOOTSTRAP_STR = "rebootstrap";
+  public static final String BOOTSTRAP_ERR_STR = "bootstrap_error";
 
   // Stats names
   public static final String STAT_TOTAL_BASE_FILE_SIZE = "totalBaseFileSizeInBytes";
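Reviewer note: the two new keys are distinct on purpose — "rebootstrap" counts deliberate wipe-and-rebuild cycles, while "bootstrap_error" counts bootstrap attempts that were skipped because an instant was pending. Emission mirrors the writer calls earlier in this diff; the consumption side below is hypothetical, since this patch adds no reader API — metricValue is an assumed lookup used for illustration only:

```java
// Emission, as done in HoodieBackedTableMetadataWriter in this patch:
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.REBOOTSTRAP_STR, 1));   // table wiped and rebuilt
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1)); // bootstrap deferred

// Hypothetical consumption, e.g. in an operator dashboard or alert:
long skipped = metricValue(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR); // assumed helper, not in this patch
if (skipped > 0) {
  LOG.warn("Metadata table bootstrap deferred " + skipped + " time(s); check for stuck inflight instants");
}
```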