diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index 4e488047d845e..b70c31f9253fb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -124,9 +124,7 @@ public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> execute() {
       Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
       // if there are full bootstrap to be performed, perform that too
       Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
-      // Delete the marker directory for the instant
-      WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
-          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+
       return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
@@ -148,17 +146,22 @@ protected Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataBootstrap
     }
 
     HoodieTableMetaClient metaClient = table.getMetaClient();
+    String bootstrapInstantTime = HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS;
     metaClient.getActiveTimeline().createNewInstant(
-        new HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(),
-            HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
+        new HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(), bootstrapInstantTime));
 
     table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
-        metaClient.getCommitActionType(), HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS), Option.empty());
+        metaClient.getCommitActionType(), bootstrapInstantTime), Option.empty());
 
     HoodieData<WriteStatus> bootstrapWriteStatuses = runMetadataBootstrap(partitionFilesList);
 
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
     updateIndexAndCommitIfNeeded(bootstrapWriteStatuses.map(w -> w), result);
+
+    // Delete the marker directory for the instant
+    WriteMarkersFactory.get(config.getMarkersType(), table, bootstrapInstantTime)
+        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+
     return Option.of(result);
   }
 
@@ -265,12 +268,20 @@ protected Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrap(Lis
         (JavaRDD<HoodieRecord>) inputProvider.generateInputRecords("bootstrap_source", config.getBootstrapSourceBasePath(),
             partitionFilesList);
     // Start Full Bootstrap
-    final HoodieInstant requested = new HoodieInstant(State.REQUESTED, table.getMetaClient().getCommitActionType(),
-        HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
+    String bootstrapInstantTime = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS;
+    final HoodieInstant requested = new HoodieInstant(
+        State.REQUESTED, table.getMetaClient().getCommitActionType(), bootstrapInstantTime);
     table.getActiveTimeline().createNewInstant(requested);
 
     // Setup correct schema and run bulk insert.
-    return Option.of(getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());
+    Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> writeMetadataOption =
+        Option.of(getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());
+
+    // Delete the marker directory for the instant
+    WriteMarkersFactory.get(config.getMarkersType(), table, bootstrapInstantTime)
+        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+
+    return writeMetadataOption;
   }
 
   protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index 6b54765a0b08b..93b25f8a6542f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -62,6 +62,7 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.JobConf;
@@ -346,6 +347,7 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
     assertEquals(expNumInstants, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
     assertEquals(instant, metaClient.getActiveTimeline()
         .getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+    verifyNoMarkerInTempFolder();
 
     Dataset<Row> bootstrapped = sqlContext.read().format("parquet").load(basePath);
     Dataset<Row> original = sqlContext.read().format("parquet").load(bootstrapBasePath);
@@ -463,7 +465,7 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
           jsc.hadoopConfiguration(),
           FSUtils.getAllPartitionPaths(context, basePath, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, false).stream()
               .map(f -> basePath + "/" + f).collect(Collectors.toList()),
-          basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true, 
+          basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true,
           Arrays.asList("_row_key"));
       assertEquals(totalRecords, records.size());
       for (GenericRecord r : records) {
@@ -473,6 +475,12 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
     assertEquals(totalRecords, seenKeys.size());
   }
 
+  private void verifyNoMarkerInTempFolder() throws IOException {
+    String tempFolderPath = metaClient.getTempFolderPath();
+    FileSystem fileSystem = FSUtils.getFs(tempFolderPath, jsc.hadoopConfiguration());
+    assertEquals(0, fileSystem.listStatus(new Path(tempFolderPath)).length);
+  }
+
   public static class TestFullBootstrapDataProvider extends FullRecordBootstrapDataProvider<JavaRDD<HoodieRecord>> {
 
     public TestFullBootstrapDataProvider(TypedProperties props, HoodieSparkEngineContext context) {
@@ -481,7 +489,7 @@ public TestFullBootstrapDataProvider(TypedProperties props, HoodieSparkEngineCon
 
     @Override
     public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath,
-                                                      List<Pair<String, List<HoodieFileStatus>>> partitionPaths) {
+        List<Pair<String, List<HoodieFileStatus>>> partitionPaths) {
       String filePath = FileStatusUtils.toPath(partitionPaths.stream().flatMap(p -> p.getValue().stream())
           .findAny().get().getPath()).toString();
       ParquetFileReader reader = null;