diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java
index 99c3dbd90a838..0b6e5a7781899 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java
@@ -31,6 +31,9 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+/**
+ * A bootstrap mode selector which chooses the bootstrap mode for each partition based on a regex match on the partition path.
+ */
 public class BootstrapRegexModeSelector extends BootstrapModeSelector {
 
   private static final long serialVersionUID = 1L;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index eae577d8b9dab..d93401c2247bf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -26,7 +26,6 @@
 import org.apache.hudi.client.bootstrap.HoodieBootstrapSchemaProvider;
 import org.apache.hudi.client.bootstrap.HoodieSparkBootstrapSchemaProvider;
 import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
-import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
 import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -73,7 +72,6 @@
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -115,10 +113,6 @@ private void validate() {
         "Ensure Bootstrap Source Path is set");
     checkArgument(config.getBootstrapModeSelectorClass() != null,
         "Ensure Bootstrap Partition Selector is set");
-    if (METADATA_ONLY.name().equals(config.getBootstrapModeSelectorRegex())) {
-      checkArgument(!config.getBootstrapModeSelectorClass().equals(FullRecordBootstrapModeSelector.class.getCanonicalName()),
-          "FullRecordBootstrapModeSelector cannot be used with METADATA_ONLY bootstrap mode");
-    }
   }
 
   @Override
@@ -320,18 +314,8 @@ private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndPr
     BootstrapModeSelector selector =
         (BootstrapModeSelector) ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);
 
-    Map<BootstrapMode, List<String>> result = new HashMap<>();
-    // for FULL_RECORD mode, original record along with metadata fields are needed
-    if (FULL_RECORD.equals(config.getBootstrapModeForRegexMatch())) {
-      if (!(selector instanceof FullRecordBootstrapModeSelector)) {
-        FullRecordBootstrapModeSelector fullRecordBootstrapModeSelector = new FullRecordBootstrapModeSelector(config);
-        result.putAll(fullRecordBootstrapModeSelector.select(folders));
-      } else {
-        result.putAll(selector.select(folders));
-      }
-    } else {
-      result = selector.select(folders);
-    }
+    Map<BootstrapMode, List<String>> result = selector.select(folders);
+
     Map<String, List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(
         Collectors.toMap(Pair::getKey, Pair::getValue));
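Note on the simplification above: `listAndProcessSourcePartitions` no longer inspects the regex-mode config and silently swaps in a `FullRecordBootstrapModeSelector`; the selector configured via `HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME` now decides the mode for every partition on its own. A minimal sketch of a custom selector under that contract, as used by the executor above (the class name and partition policy here are hypothetical; only the Hudi types are real):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

/** Hypothetical selector: full-record bootstrap for recent partitions, metadata-only for the rest. */
public class RecentPartitionFullRecordSelector extends BootstrapModeSelector {

  public RecentPartitionFullRecordSelector(HoodieWriteConfig writeConfig) {
    super(writeConfig);
  }

  @Override
  public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
    Map<BootstrapMode, List<String>> result = new HashMap<>();
    for (Pair<String, List<HoodieFileStatus>> partition : partitions) {
      // Illustrative policy only: rewrite 2023 partitions fully, link older files in place.
      BootstrapMode mode = partition.getKey().startsWith("datestr=2023")
          ? BootstrapMode.FULL_RECORD
          : BootstrapMode.METADATA_ONLY;
      result.computeIfAbsent(mode, k -> new ArrayList<>()).add(partition.getKey());
    }
    return result;
  }
}
```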
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index d94f065ee0ace..b398ea82aa986 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -272,7 +272,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     metaClient.reloadActiveTimeline();
     assertEquals(0, metaClient.getCommitsTimeline().countInstants());
     assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath, context)
-        .stream().flatMap(f -> f.getValue().stream()).count());
+        .stream().mapToLong(f -> f.getValue().size()).sum());
 
     BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
     assertFalse(index.useIndex());
@@ -295,7 +295,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
 
     // Upsert case
     long updateTimestamp = Instant.now().toEpochMilli();
-    String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2";
+    String updateSPath = tmpFolder.toAbsolutePath() + "/data2";
     generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSPath);
     JavaRDD<HoodieRecord> updateBatch =
         generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), updateSPath, context),
@@ -390,7 +390,6 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
     Dataset<Row> missingBootstrapped = sqlContext.sql("select a._hoodie_record_key from bootstrapped a "
         + "where a._hoodie_record_key not in (select _row_key from original)");
     assertEquals(0, missingBootstrapped.count());
-    //sqlContext.sql("select * from bootstrapped").show(10, false);
   }
 
   // RO Input Format Read
@@ -410,7 +409,7 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
     }
     assertEquals(totalRecords, seenKeys.size());
 
-    //RT Input Format Read
+    // RT Input Format Read
     reloadInputFormats();
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
@@ -475,7 +474,7 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
     }
     assertEquals(totalRecords, seenKeys.size());
 
-    //RT Input Format Read - Project only non-hoodie column
+    // RT Input Format Read - Project only non-hoodie column
     reloadInputFormats();
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
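Aside on the first `TestBootstrap` hunk: the two stream pipelines count the same files across all leaf folders, but the new form sums list sizes instead of flattening every element into a stream just to count it. A self-contained sketch with stand-in data (plain `java.util` types in place of the Hudi pair/status types):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class CountFilesSketch {
  public static void main(String[] args) {
    // Stand-in for List<Pair<String, List<HoodieFileStatus>>>: partition -> files.
    List<Map.Entry<String, List<String>>> folders = Arrays.asList(
        new SimpleEntry<>("p1", Arrays.asList("f1", "f2")),
        new SimpleEntry<>("p2", Arrays.asList("f3")));

    long viaFlatMap = folders.stream().flatMap(f -> f.getValue().stream()).count();
    long viaSum = folders.stream().mapToLong(f -> f.getValue().size()).sum();
    System.out.println(viaFlatMap + " == " + viaSum); // 3 == 3
  }
}
```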
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
index 80fc792ad54c2..bbce1c61f0f26 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
@@ -108,8 +108,8 @@ private static Stream<Arguments> testArgs() {
     for (Boolean dash : dashPartitions) {
       for (String bt : bootstrapType) {
         for (Integer n : nPartitions) {
-          //can't be mixed bootstrap if it's nonpartitioned
-          //don't need to test slash partitions if it's nonpartitioned
+          // can't be mixed bootstrap if it's nonpartitioned
+          // don't need to test slash partitions if it's nonpartitioned
           if ((!bt.equals("mixed") && dash) || n > 0) {
             b.add(Arguments.of(bt, dash, tt, n));
           }
@@ -129,7 +129,7 @@ public void runTests(String bootstrapType, Boolean dashPartitions, HoodieTableTy
     this.nPartitions = nPartitions;
     setupDirs();
 
-    //do bootstrap
+    // do bootstrap
     Map<String, String> options = setBootstrapOptions();
     Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
     bootstrapDf.write().format("hudi")
@@ -139,7 +139,7 @@ public void runTests(String bootstrapType, Boolean dashPartitions, HoodieTableTy
     compareTables();
     verifyMetaColOnlyRead(0);
 
-    //do upserts
+    // do upserts
     options = basicOptions();
     doUpdate(options, "001");
     compareTables();
@@ -224,8 +224,8 @@ protected void doUpsert(Map<String, String> options, Dataset<Row> df) {
         .mode(SaveMode.Append)
         .save(hudiBasePath);
     if (bootstrapType.equals("mixed")) {
-      //mixed tables have a commit for each of the metadata and full bootstrap modes
-      //so to align with the regular hudi table, we need to compact after 4 commits instead of 3
+      // mixed tables have a commit for each of the metadata and full bootstrap modes
+      // so to align with the regular hudi table, we need to compact after 4 commits instead of 3
       nCompactCommits = "4";
     }
     df.write().format("hudi")
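The `doUpsert` comment above encodes a bit of timeline arithmetic: a mixed-mode bootstrap writes two bootstrap commits (one metadata-only, one full-record) where a plain table has one, so the inline-compaction trigger is pushed from 3 to 4 delta commits to fire at the same point in both tables. Roughly, in option form (both keys are standard Hudi config names; the helper itself is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class CompactionAlignmentSketch {
  // Sketch: mixed-bootstrap tables start with one extra bootstrap commit, so inline
  // compaction fires one delta commit later to line up with a regular table.
  static Map<String, String> compactionOpts(boolean mixedBootstrap) {
    Map<String, String> opts = new HashMap<>();
    opts.put("hoodie.compact.inline", "true");
    opts.put("hoodie.compact.inline.max.delta.commits", mixedBootstrap ? "4" : "3");
    return opts;
  }
}
```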
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 12974d133a84b..9949b396abf10 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -17,9 +17,8 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
-import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
+import org.apache.hudi.client.bootstrap.selector.{FullRecordBootstrapModeSelector, MetadataOnlyBootstrapModeSelector}
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
@@ -30,9 +29,11 @@ import org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols, sort
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.HoodieClientTestUtils
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkRecordMerger}
+
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.functions.{col, lit}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.io.TempDir
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -75,12 +76,14 @@ class TestDataSourceForBootstrap {
   val verificationCol: String = "driver"
   val originalVerificationVal: String = "driver_0"
   val updatedVerificationVal: String = "driver_update"
+  val metadataOnlySelector: String = classOf[MetadataOnlyBootstrapModeSelector].getCanonicalName
+  val fullRecordSelector: String = classOf[FullRecordBootstrapModeSelector].getCanonicalName
 
   /**
    * TODO rebase onto existing test base-class to avoid duplication
    */
   @BeforeEach
-  def initialize(@TempDir tempDir: java.nio.file.Path) {
+  def initialize(@TempDir tempDir: java.nio.file.Path): Unit = {
     val sparkConf = HoodieClientTestUtils.getSparkConfForTest(getClass.getSimpleName)
     spark = SparkSession.builder.config(sparkConf).getOrCreate
@@ -119,7 +122,7 @@ class TestDataSourceForBootstrap {
     // Perform bootstrap
     val bootstrapKeygenClass = classOf[NonpartitionedKeyGenerator].getName
     val options = commonOpts.-(DataSourceWriteOptions.PARTITIONPATH_FIELD.key)
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       extraOpts = options ++ Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
       bootstrapKeygenClass = bootstrapKeygenClass
@@ -166,13 +169,13 @@ class TestDataSourceForBootstrap {
 
   @ParameterizedTest
   @CsvSource(value = Array(
-    "METADATA_ONLY,AVRO",
+    "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector,AVRO",
     // TODO(HUDI-5807) enable for spark native records
-    /* "METADATA_ONLY,SPARK", */
-    "FULL_RECORD,AVRO",
-    "FULL_RECORD,SPARK"
+    /* "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector,SPARK", */
+    "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,AVRO",
+    "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,SPARK"
   ))
-  def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String, recordType: HoodieRecordType): Unit = {
+  def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapSelector: String, recordType: HoodieRecordType): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
@@ -189,11 +192,11 @@ class TestDataSourceForBootstrap {
     val readOpts = commonOpts ++ Map(
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr",
       DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
-      HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key -> bootstrapMode
+      HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key -> bootstrapSelector
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       readOpts ++ getRecordTypeOpts(recordType),
       classOf[SimpleKeyGenerator].getName)
@@ -201,10 +204,10 @@ class TestDataSourceForBootstrap {
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
 
-    val expectedDF = bootstrapMode match {
-      case "METADATA_ONLY" =>
+    val expectedDF = bootstrapSelector match {
+      case `metadataOnlySelector` =>
         sort(sourceDF)
-      case "FULL_RECORD" =>
+      case `fullRecordSelector` =>
         sort(sourceDF)
     }
@@ -271,7 +274,7 @@ class TestDataSourceForBootstrap {
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       writeOpts,
       classOf[SimpleKeyGenerator].getName)
@@ -346,7 +349,7 @@ class TestDataSourceForBootstrap {
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
       writeOpts,
       classOf[SimpleKeyGenerator].getName)
@@ -414,7 +417,7 @@ class TestDataSourceForBootstrap {
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
       writeOpts,
       classOf[SimpleKeyGenerator].getName)
@@ -481,7 +484,7 @@ class TestDataSourceForBootstrap {
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
       writeOpts,
       classOf[SimpleKeyGenerator].getName)
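The Scala test changes above all follow from one config migration: tests used to pick the bootstrap mode through the regex-mode string (`HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE`, values `METADATA_ONLY`/`FULL_RECORD`) and now name the selector class directly. Side by side, as a sketch (both config constants are the Hudi ones appearing in the diff; the wrapper class is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
import org.apache.hudi.config.HoodieBootstrapConfig;

public class BootstrapOptionMigrationSketch {
  static Map<String, String> options() {
    Map<String, String> opts = new HashMap<>();
    // Before this change: mode chosen by a regex-mode string.
    // opts.put(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key(), "FULL_RECORD");
    // After: the selector class is configured directly.
    opts.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(),
        FullRecordBootstrapModeSelector.class.getCanonicalName());
    return opts;
  }
}
```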
@@ -616,9 +619,9 @@ class TestDataSourceForBootstrap {
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true)
   }
 
-  def runMetadataBootstrapAndVerifyCommit(tableType: String,
-                                          extraOpts: Map[String, String] = Map.empty,
-                                          bootstrapKeygenClass: String): String = {
+  def runBootstrapAndVerifyCommit(tableType: String,
+                                  extraOpts: Map[String, String] = Map.empty,
+                                  bootstrapKeygenClass: String): String = {
     val bootstrapDF = spark.emptyDataFrame
     bootstrapDF.write
       .format("hudi")
@@ -632,7 +635,8 @@ class TestDataSourceForBootstrap {
     val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     val expectedBootstrapInstant =
-      if ("FULL_RECORD".equals(extraOpts.getOrElse(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key, HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.defaultValue)))
+      if (fullRecordSelector.equals(extraOpts.getOrElse(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key,
+        HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.defaultValue)))
         HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS
       else HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS
     assertEquals(expectedBootstrapInstant, commitInstantTime1)
@@ -689,9 +693,9 @@ class TestDataSourceForBootstrap {
 
 object TestDataSourceForBootstrap {
 
-  def sort(df: DataFrame) = df.sort("_row_key")
+  def sort(df: DataFrame): Dataset[Row] = df.sort("_row_key")
 
-  def dropMetaCols(df: DataFrame) =
+  def dropMetaCols(df: DataFrame): DataFrame =
     df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
 }
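For reference, the check that `runBootstrapAndVerifyCommit` performs: the timestamp of the first commit reveals which bootstrap path ran, and the expected value is now derived from the configured selector class rather than the old regex-mode string. A sketch of that decision in isolation (the timeline constants are the Hudi ones used in the test; the wrapper class is illustrative):

```java
import java.util.Map;

import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieBootstrapConfig;

public class ExpectedBootstrapInstantSketch {
  static String expectedInstant(Map<String, String> opts) {
    String selector = opts.getOrDefault(
        HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(),
        HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.defaultValue());
    // A full-record selector commits at the reserved full-bootstrap instant;
    // any other selector commits at the metadata-bootstrap instant.
    return FullRecordBootstrapModeSelector.class.getCanonicalName().equals(selector)
        ? HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS
        : HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS;
  }
}
```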