@@ -31,6 +31,9 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+/**
+ * A bootstrap selector which employs bootstrap mode by specified partitions.
+ */
 public class BootstrapRegexModeSelector extends BootstrapModeSelector {
 
   private static final long serialVersionUID = 1L;
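The regex selector above is driven entirely by write options. A minimal sketch of how a writer might engage it, assuming the option keys resolve as in HoodieBootstrapConfig (the base path and regex are illustrative, not from this PR):

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds bootstrap write options that route partitions
// matching the regex to one bootstrap mode and all others to the opposite mode.
public class RegexSelectorOptionsSketch {
  public static Map<String, String> bootstrapOpts() {
    Map<String, String> opts = new HashMap<>();
    opts.put("hoodie.bootstrap.base.path", "s3://bucket/source_table"); // pre-existing data to bootstrap
    opts.put("hoodie.bootstrap.mode.selector",
        "org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector");
    opts.put("hoodie.bootstrap.mode.selector.regex", "2023/0[1-6]/.*");   // partition-path pattern
    opts.put("hoodie.bootstrap.mode.selector.regex.mode", "FULL_RECORD"); // mode for matching partitions
    return opts;
  }
}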
@@ -26,7 +26,6 @@
 import org.apache.hudi.client.bootstrap.HoodieBootstrapSchemaProvider;
 import org.apache.hudi.client.bootstrap.HoodieSparkBootstrapSchemaProvider;
 import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
-import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
 import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -73,7 +72,6 @@
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -115,10 +113,6 @@ private void validate() {
         "Ensure Bootstrap Source Path is set");
     checkArgument(config.getBootstrapModeSelectorClass() != null,
         "Ensure Bootstrap Partition Selector is set");
-    if (METADATA_ONLY.name().equals(config.getBootstrapModeSelectorRegex())) {
-      checkArgument(!config.getBootstrapModeSelectorClass().equals(FullRecordBootstrapModeSelector.class.getCanonicalName()),
-          "FullRecordBootstrapModeSelector cannot be used with METADATA_ONLY bootstrap mode");
-    }
   }
 
   @Override
@@ -320,18 +314,8 @@ private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndPr
     BootstrapModeSelector selector =
         (BootstrapModeSelector) ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);
 
-    Map<BootstrapMode, List<String>> result = new HashMap<>();
-    // for FULL_RECORD mode, original record along with metadata fields are needed
-    if (FULL_RECORD.equals(config.getBootstrapModeForRegexMatch())) {
-      if (!(selector instanceof FullRecordBootstrapModeSelector)) {
-        FullRecordBootstrapModeSelector fullRecordBootstrapModeSelector = new FullRecordBootstrapModeSelector(config);
-        result.putAll(fullRecordBootstrapModeSelector.select(folders));
-      } else {
-        result.putAll(selector.select(folders));
-      }
-    } else {
-      result = selector.select(folders);
-    }
+    Map<BootstrapMode, List<String>> result = selector.select(folders);
 
     Map<String, List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(
         Collectors.toMap(Pair::getKey, Pair::getValue));
 
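With the FULL_RECORD special case gone, the executor relies on a single contract: select() buckets every partition under exactly one BootstrapMode. A self-contained toy of that contract, mirroring the regex selector's split (a local enum stands in for Hudi's BootstrapMode; nothing below is Hudi API):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class SelectorContractToy {
  enum Mode { METADATA_ONLY, FULL_RECORD }

  // Buckets each partition path under exactly one mode, like a mode selector's select().
  static Map<Mode, List<String>> select(List<String> partitions, Pattern regex, Mode onMatch) {
    Mode onMiss = (onMatch == Mode.FULL_RECORD) ? Mode.METADATA_ONLY : Mode.FULL_RECORD;
    return partitions.stream()
        .collect(Collectors.groupingBy(p -> regex.matcher(p).matches() ? onMatch : onMiss));
  }

  public static void main(String[] args) {
    Map<Mode, List<String>> result = select(
        Arrays.asList("2023/01/05", "2023/07/11"),
        Pattern.compile("2023/0[1-6]/.*"), Mode.FULL_RECORD);
    // e.g. {FULL_RECORD=[2023/01/05], METADATA_ONLY=[2023/07/11]} (map order may vary)
    System.out.println(result);
  }
}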
@@ -272,7 +272,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     metaClient.reloadActiveTimeline();
     assertEquals(0, metaClient.getCommitsTimeline().countInstants());
     assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath, context)
-        .stream().flatMap(f -> f.getValue().stream()).count());
+        .stream().mapToLong(f -> f.getValue().size()).sum());
 
     BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
     assertFalse(index.useIndex());
@@ -295,7 +295,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
 
     // Upsert case
     long updateTimestamp = Instant.now().toEpochMilli();
-    String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2";
+    String updateSPath = tmpFolder.toAbsolutePath() + "/data2";
     generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSPath);
     JavaRDD<HoodieRecord> updateBatch =
         generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), updateSPath, context),
@@ -390,7 +390,6 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
     Dataset<Row> missingBootstrapped = sqlContext.sql("select a._hoodie_record_key from bootstrapped a "
         + "where a._hoodie_record_key not in (select _row_key from original)");
     assertEquals(0, missingBootstrapped.count());
-    //sqlContext.sql("select * from bootstrapped").show(10, false);
   }
 
   // RO Input Format Read
@@ -410,7 +409,7 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
     }
     assertEquals(totalRecords, seenKeys.size());
 
-    //RT Input Format Read
+    // RT Input Format Read
     reloadInputFormats();
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
@@ -475,7 +474,7 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
     }
     assertEquals(totalRecords, seenKeys.size());
 
-    //RT Input Format Read - Project only non-hoodie column
+    // RT Input Format Read - Project only non-hoodie column
     reloadInputFormats();
     seenKeys = new HashSet<>();
     records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
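The first hunk above swaps a flatMap-then-count for a mapToLong-then-sum. Both compute the same total file count; the new form sums per-partition sizes directly instead of streaming (and boxing) every element. A plain-JDK illustration with toy data, not Hudi's HoodieFileStatus types:

import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class CountVsSum {
  public static void main(String[] args) {
    // Stand-in for "partition -> list of files" pairs.
    List<Map.Entry<String, List<String>>> folders = Arrays.asList(
        new SimpleEntry<>("p1", Arrays.asList("f1", "f2")),
        new SimpleEntry<>("p2", Arrays.asList("f3")));

    long viaFlatMap = folders.stream().flatMap(f -> f.getValue().stream()).count();
    long viaSum = folders.stream().mapToLong(f -> f.getValue().size()).sum();

    System.out.println(viaFlatMap + " == " + viaSum); // 3 == 3
  }
}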
@@ -108,8 +108,8 @@ private static Stream<Arguments> testArgs() {
       for (Boolean dash : dashPartitions) {
         for (String bt : bootstrapType) {
           for (Integer n : nPartitions) {
-            //can't be mixed bootstrap if it's nonpartitioned
-            //don't need to test slash partitions if it's nonpartitioned
+            // can't be mixed bootstrap if it's nonpartitioned
+            // don't need to test slash partitions if it's nonpartitioned
             if ((!bt.equals("mixed") && dash) || n > 0) {
               b.add(Arguments.of(bt, dash, tt, n));
             }
@@ -129,7 +129,7 @@ public void runTests(String bootstrapType, Boolean dashPartitions, HoodieTableTy
     this.nPartitions = nPartitions;
     setupDirs();
 
-    //do bootstrap
+    // do bootstrap
     Map<String, String> options = setBootstrapOptions();
     Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
     bootstrapDf.write().format("hudi")
@@ -139,7 +139,7 @@ public void runTests(String bootstrapType, Boolean dashPartitions, HoodieTableTy
     compareTables();
     verifyMetaColOnlyRead(0);
 
-    //do upserts
+    // do upserts
     options = basicOptions();
     doUpdate(options, "001");
     compareTables();
@@ -224,8 +224,8 @@ protected void doUpsert(Map<String,String> options, Dataset<Row> df) {
         .mode(SaveMode.Append)
         .save(hudiBasePath);
     if (bootstrapType.equals("mixed")) {
-      //mixed tables have a commit for each of the metadata and full bootstrap modes
-      //so to align with the regular hudi table, we need to compact after 4 commits instead of 3
+      // mixed tables have a commit for each of the metadata and full bootstrap modes
+      // so to align with the regular hudi table, we need to compact after 4 commits instead of 3
       nCompactCommits = "4";
     }
     df.write().format("hudi")
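The nCompactCommits bump is about where inline compaction fires: a mixed bootstrap lays down two bootstrap commits (one METADATA_ONLY, one FULL_RECORD), so the delta-commit budget must sit one higher than for a regular table. A hedged sketch of that alignment, assuming the stock inline-compaction option keys:

import java.util.HashMap;
import java.util.Map;

// Sketch only: key names assumed to match Hudi's inline-compaction configs;
// the 4-vs-3 values mirror the test's nCompactCommits logic above.
public class CompactionAlignmentSketch {
  public static Map<String, String> compactionOpts(boolean mixedBootstrap) {
    Map<String, String> opts = new HashMap<>();
    opts.put("hoodie.compact.inline", "true");
    // Mixed bootstrap spends two commits on bootstrap itself, so compact one commit later.
    opts.put("hoodie.compact.inline.max.delta.commits", mixedBootstrap ? "4" : "3");
    return opts;
  }
}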
@@ -17,9 +17,8 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
-import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
+import org.apache.hudi.client.bootstrap.selector.{FullRecordBootstrapModeSelector, MetadataOnlyBootstrapModeSelector}
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
@@ -30,9 +29,11 @@ import org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols, sort
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.HoodieClientTestUtils
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkRecordMerger}
+
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.functions.{col, lit}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.io.TempDir
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -75,12 +76,14 @@ class TestDataSourceForBootstrap {
   val verificationCol: String = "driver"
   val originalVerificationVal: String = "driver_0"
   val updatedVerificationVal: String = "driver_update"
+  val metadataOnlySelector: String = classOf[MetadataOnlyBootstrapModeSelector].getCanonicalName
+  val fullRecordSelector: String = classOf[FullRecordBootstrapModeSelector].getCanonicalName
 
   /**
    * TODO rebase onto existing test base-class to avoid duplication
    */
   @BeforeEach
-  def initialize(@TempDir tempDir: java.nio.file.Path) {
+  def initialize(@TempDir tempDir: java.nio.file.Path): Unit = {
     val sparkConf = HoodieClientTestUtils.getSparkConfForTest(getClass.getSimpleName)
 
     spark = SparkSession.builder.config(sparkConf).getOrCreate
@@ -119,7 +122,7 @@
     // Perform bootstrap
     val bootstrapKeygenClass = classOf[NonpartitionedKeyGenerator].getName
     val options = commonOpts.-(DataSourceWriteOptions.PARTITIONPATH_FIELD.key)
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       extraOpts = options ++ Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
       bootstrapKeygenClass = bootstrapKeygenClass
@@ -166,13 +169,13 @@
 
   @ParameterizedTest
   @CsvSource(value = Array(
-    "METADATA_ONLY,AVRO",
+    "org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector,AVRO",
     // TODO(HUDI-5807) enable for spark native records
-    /* "METADATA_ONLY,SPARK", */
-    "FULL_RECORD,AVRO",
-    "FULL_RECORD,SPARK"
+    /* "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,SPARK", */
+    "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,AVRO",
+    "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,SPARK"
   ))
-  def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String, recordType: HoodieRecordType): Unit = {
+  def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapSelector: String, recordType: HoodieRecordType): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
@@ -189,22 +192,22 @@
     val readOpts = commonOpts ++ Map(
       DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr",
       DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
-      HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key -> bootstrapMode
+      HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key -> bootstrapSelector
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       readOpts ++ getRecordTypeOpts(recordType),
       classOf[SimpleKeyGenerator].getName)
 
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
 
-    val expectedDF = bootstrapMode match {
-      case "METADATA_ONLY" =>
+    val expectedDF = bootstrapSelector match {
+      case `metadataOnlySelector` =>
         sort(sourceDF)
-      case "FULL_RECORD" =>
+      case `fullRecordSelector` =>
         sort(sourceDF)
     }
 
@@ -271,7 +274,7 @@
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       writeOpts,
       classOf[SimpleKeyGenerator].getName)
@@ -346,7 +349,7 @@
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
       writeOpts,
       classOf[SimpleKeyGenerator].getName)
@@ -414,7 +417,7 @@
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
       writeOpts,
       classOf[SimpleKeyGenerator].getName)
@@ -481,7 +484,7 @@
     )
 
     // Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
       writeOpts,
       classOf[SimpleKeyGenerator].getName)
@@ -616,9 +619,9 @@
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true)
   }
 
-  def runMetadataBootstrapAndVerifyCommit(tableType: String,
-                                          extraOpts: Map[String, String] = Map.empty,
-                                          bootstrapKeygenClass: String): String = {
+  def runBootstrapAndVerifyCommit(tableType: String,
+                                  extraOpts: Map[String, String] = Map.empty,
+                                  bootstrapKeygenClass: String): String = {
     val bootstrapDF = spark.emptyDataFrame
     bootstrapDF.write
       .format("hudi")
@@ -632,7 +635,8 @@
 
     val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     val expectedBootstrapInstant =
-      if ("FULL_RECORD".equals(extraOpts.getOrElse(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key, HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.defaultValue)))
+      if (fullRecordSelector.equals(extraOpts.getOrElse(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key,
+        HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.defaultValue)))
         HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS
       else HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS
     assertEquals(expectedBootstrapInstant, commitInstantTime1)
@@ -689,9 +693,9 @@
 
 object TestDataSourceForBootstrap {
 
-  def sort(df: DataFrame) = df.sort("_row_key")
+  def sort(df: DataFrame): Dataset[Row] = df.sort("_row_key")
 
-  def dropMetaCols(df: DataFrame) =
+  def dropMetaCols(df: DataFrame): DataFrame =
     df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
 
 }
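The Scala test migration above is the user-visible shape of this PR: bootstrap behavior is now chosen by selector class name rather than by a regex-mode string. A hedged before/after sketch of the write options (key strings assumed to match HoodieBootstrapConfig; only the changed keys shown):

import java.util.Map;

public class SelectorOptionMigration {
  // Old style exercised by the tests before this PR: one selector,
  // behavior switched by a mode name.
  static final Map<String, String> before = Map.of(
      "hoodie.bootstrap.mode.selector.regex.mode", "FULL_RECORD");

  // New style: the selector class itself names the behavior.
  static final Map<String, String> after = Map.of(
      "hoodie.bootstrap.mode.selector",
      "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector");
}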