diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index 8b353d64c4f5a..3c876e75c28f5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -77,6 +77,7 @@
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroParquetReader;
@@ -144,6 +145,9 @@ public HoodieBootstrapWriteMetadata execute() {
       Option<HoodieWriteMetadata<JavaRDD<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
       // if there are full bootstrap to be performed, perform that too
       Option<HoodieWriteMetadata<JavaRDD<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
+      // Delete the marker directory for the instant
+      WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
+          .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
       return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index a4c98665ebdba..eeed5fe75b84a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -17,7 +17,7 @@

 package org.apache.hudi.functional

-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
 import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
 import org.apache.hudi.common.fs.FSUtils
@@ -31,9 +31,9 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.io.TempDir
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-
 import java.time.Instant
 import java.util.Collections
+
 import scala.collection.JavaConverters._

 class TestDataSourceForBootstrap {
@@ -106,6 +106,8 @@
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       extraOpts = Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
     )
+    // check marker directory cleanup
+    assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))

     // Read bootstrapped table and verify count
     var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -161,6 +163,9 @@
       Some("datestr"),
       Map(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true"))

+    // check marker directory cleanup
+    assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
+
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
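
What the patch does: once both bootstrap passes (metadata-only and full-record) have run,
the executor now deletes the transient marker directory for the bootstrap instant, and the
new test assertions verify that .hoodie/.temp/00000000000001 is gone afterwards. Below is a
minimal sketch of the cleanup call in isolation, assuming the same config, table, context,
and instantTime values the executor has in scope; the wrapper class and method name are
hypothetical, added only for illustration.

    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.marker.WriteMarkersFactory;

    final class MarkerCleanupSketch {
      // Resolve the configured marker mechanism (direct or timeline-server based)
      // for this instant, then delete .hoodie/.temp/<instantTime> with the
      // configured parallelism. The "quiet" variant logs failures instead of
      // throwing, since the commit has already completed by the time cleanup runs.
      static void deleteMarkerDir(HoodieWriteConfig config, HoodieTable table,
                                  HoodieEngineContext context, String instantTime) {
        WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
            .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
      }
    }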